mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-05-10 01:38:06 +00:00
chore: store self protocols in protobook (#760)
This commit is contained in:
parent
20d9d98b3a
commit
186f9b758e
77
doc/API.md
77
doc/API.md
@ -37,6 +37,7 @@
|
||||
* [`peerStore.protoBook.add`](#peerstoreprotobookadd)
|
||||
* [`peerStore.protoBook.delete`](#peerstoreprotobookdelete)
|
||||
* [`peerStore.protoBook.get`](#peerstoreprotobookget)
|
||||
* [`peerStore.protoBook.remove`](#peerstoreprotobookremove)
|
||||
* [`peerStore.protoBook.set`](#peerstoreprotobookset)
|
||||
* [`peerStore.delete`](#peerstoredelete)
|
||||
* [`peerStore.get`](#peerstoreget)
|
||||
@ -875,32 +876,6 @@ Consider using `addressBook.add()` if you're not sure this is what you want to d
|
||||
peerStore.addressBook.add(peerId, multiaddr)
|
||||
```
|
||||
|
||||
### peerStore.protoBook.add
|
||||
|
||||
Add known `protocols` of a given peer.
|
||||
|
||||
`peerStore.protoBook.add(peerId, protocols)`
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Description |
|
||||
|------|------|-------------|
|
||||
| peerId | [`PeerId`][peer-id] | peerId to set |
|
||||
| protocols | `Array<string>` | protocols to add |
|
||||
|
||||
#### Returns
|
||||
|
||||
| Type | Description |
|
||||
|------|-------------|
|
||||
| `ProtoBook` | Returns the Proto Book component |
|
||||
|
||||
#### Example
|
||||
|
||||
```js
|
||||
peerStore.protoBook.add(peerId, protocols)
|
||||
```
|
||||
|
||||
|
||||
### peerStore.keyBook.delete
|
||||
|
||||
Delete the provided peer from the book.
|
||||
@ -1123,6 +1098,31 @@ Set known metadata of a given `peerId`.
|
||||
peerStore.metadataBook.set(peerId, 'location', uint8ArrayFromString('Berlin'))
|
||||
```
|
||||
|
||||
### peerStore.protoBook.add
|
||||
|
||||
Add known `protocols` of a given peer.
|
||||
|
||||
`peerStore.protoBook.add(peerId, protocols)`
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Description |
|
||||
|------|------|-------------|
|
||||
| peerId | [`PeerId`][peer-id] | peerId to set |
|
||||
| protocols | `Array<string>` | protocols to add |
|
||||
|
||||
#### Returns
|
||||
|
||||
| Type | Description |
|
||||
|------|-------------|
|
||||
| `ProtoBook` | Returns the Proto Book component |
|
||||
|
||||
#### Example
|
||||
|
||||
```js
|
||||
peerStore.protoBook.add(peerId, protocols)
|
||||
```
|
||||
|
||||
### peerStore.protoBook.delete
|
||||
|
||||
Delete the provided peer from the book.
|
||||
@ -1179,6 +1179,31 @@ peerStore.protoBook.get(peerId)
|
||||
// [ '/proto/1.0.0', '/proto/1.1.0' ]
|
||||
```
|
||||
|
||||
### peerStore.protoBook.remove
|
||||
|
||||
Remove given `protocols` of a given peer.
|
||||
|
||||
`peerStore.protoBook.remove(peerId, protocols)`
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Description |
|
||||
|------|------|-------------|
|
||||
| peerId | [`PeerId`][peer-id] | peerId to set |
|
||||
| protocols | `Array<string>` | protocols to remove |
|
||||
|
||||
#### Returns
|
||||
|
||||
| Type | Description |
|
||||
|------|-------------|
|
||||
| `ProtoBook` | Returns the Proto Book component |
|
||||
|
||||
#### Example
|
||||
|
||||
```js
|
||||
peerStore.protoBook.remove(peerId, protocols)
|
||||
```
|
||||
|
||||
### peerStore.protoBook.set
|
||||
|
||||
Set known `protocols` of a given peer.
|
||||
|
@ -51,9 +51,8 @@ class IdentifyService {
|
||||
* @class
|
||||
* @param {object} options
|
||||
* @param {Libp2p} options.libp2p
|
||||
* @param {Map<string, handler>} options.protocols - A reference to the protocols we support
|
||||
*/
|
||||
constructor ({ libp2p, protocols }) {
|
||||
constructor ({ libp2p }) {
|
||||
/**
|
||||
* @property {PeerStore}
|
||||
*/
|
||||
@ -74,8 +73,6 @@ class IdentifyService {
|
||||
*/
|
||||
this._libp2p = libp2p
|
||||
|
||||
this._protocols = protocols
|
||||
|
||||
this.handleMessage = this.handleMessage.bind(this)
|
||||
|
||||
// Store self host metadata
|
||||
@ -97,6 +94,13 @@ class IdentifyService {
|
||||
this.pushToPeerStore()
|
||||
}
|
||||
})
|
||||
|
||||
// When self protocols change, trigger identify-push
|
||||
this.peerStore.on('change:protocols', ({ peerId }) => {
|
||||
if (peerId.toString() === this.peerId.toString()) {
|
||||
this.pushToPeerStore()
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
@ -108,7 +112,7 @@ class IdentifyService {
|
||||
async push (connections) {
|
||||
const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId)
|
||||
const listenAddrs = this._libp2p.multiaddrs.map((ma) => ma.bytes)
|
||||
const protocols = Array.from(this._protocols.keys())
|
||||
const protocols = this.peerStore.protoBook.get(this.peerId) || []
|
||||
|
||||
const pushes = connections.map(async connection => {
|
||||
try {
|
||||
@ -139,6 +143,11 @@ class IdentifyService {
|
||||
* @returns {void}
|
||||
*/
|
||||
pushToPeerStore () {
|
||||
// Do not try to push if libp2p node is not running
|
||||
if (!this._libp2p.isStarted()) {
|
||||
return
|
||||
}
|
||||
|
||||
const connections = []
|
||||
let connection
|
||||
for (const peer of this.peerStore.peers.values()) {
|
||||
@ -258,6 +267,7 @@ class IdentifyService {
|
||||
}
|
||||
|
||||
const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId)
|
||||
const protocols = this.peerStore.protoBook.get(this.peerId) || []
|
||||
|
||||
const message = Message.encode({
|
||||
protocolVersion: this._host.protocolVersion,
|
||||
@ -266,7 +276,7 @@ class IdentifyService {
|
||||
listenAddrs: this._libp2p.multiaddrs.map((ma) => ma.bytes),
|
||||
signedPeerRecord,
|
||||
observedAddr: connection.remoteAddr.bytes,
|
||||
protocols: Array.from(this._protocols.keys())
|
||||
protocols
|
||||
})
|
||||
|
||||
try {
|
||||
|
17
src/index.js
17
src/index.js
@ -158,10 +158,7 @@ class Libp2p extends EventEmitter {
|
||||
})
|
||||
|
||||
// Add the identify service since we can multiplex
|
||||
this.identifyService = new IdentifyService({
|
||||
libp2p: this,
|
||||
protocols: this.upgrader.protocols
|
||||
})
|
||||
this.identifyService = new IdentifyService({ libp2p: this })
|
||||
this.handle(Object.values(IDENTIFY_PROTOCOLS), this.identifyService.handleMessage)
|
||||
}
|
||||
|
||||
@ -442,10 +439,8 @@ class Libp2p extends EventEmitter {
|
||||
this.upgrader.protocols.set(protocol, handler)
|
||||
})
|
||||
|
||||
// Only push if libp2p is running
|
||||
if (this.isStarted() && this.identifyService) {
|
||||
this.identifyService.pushToPeerStore()
|
||||
}
|
||||
// Add new protocols to self protocols in the Protobook
|
||||
this.peerStore.protoBook.add(this.peerId, protocols)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -460,10 +455,8 @@ class Libp2p extends EventEmitter {
|
||||
this.upgrader.protocols.delete(protocol)
|
||||
})
|
||||
|
||||
// Only push if libp2p is running
|
||||
if (this.isStarted() && this.identifyService) {
|
||||
this.identifyService.pushToPeerStore()
|
||||
}
|
||||
// Remove protocols from self protocols in the Protobook
|
||||
this.peerStore.protoBook.remove(this.peerId, protocols)
|
||||
}
|
||||
|
||||
async _onStarting () {
|
||||
|
@ -112,13 +112,50 @@ class ProtoBook extends Book {
|
||||
return this
|
||||
}
|
||||
|
||||
protocols = [...newSet]
|
||||
|
||||
this._setData(peerId, newSet)
|
||||
log(`added provided protocols for ${id}`)
|
||||
|
||||
return this
|
||||
}
|
||||
|
||||
/**
|
||||
* Removes known protocols of a provided peer.
|
||||
* If the protocols did not exist before, nothing will be done.
|
||||
*
|
||||
* @param {PeerId} peerId
|
||||
* @param {Array<string>} protocols
|
||||
* @returns {ProtoBook}
|
||||
*/
|
||||
remove (peerId, protocols) {
|
||||
if (!PeerId.isPeerId(peerId)) {
|
||||
log.error('peerId must be an instance of peer-id to store data')
|
||||
throw errcode(new Error('peerId must be an instance of peer-id'), ERR_INVALID_PARAMETERS)
|
||||
}
|
||||
|
||||
if (!protocols) {
|
||||
log.error('protocols must be provided to store data')
|
||||
throw errcode(new Error('protocols must be provided'), ERR_INVALID_PARAMETERS)
|
||||
}
|
||||
|
||||
const id = peerId.toB58String()
|
||||
const recSet = this.data.get(id)
|
||||
|
||||
if (recSet) {
|
||||
const newSet = new Set([
|
||||
...recSet
|
||||
].filter((p) => !protocols.includes(p)))
|
||||
|
||||
// Any protocol removed?
|
||||
if (recSet.size === newSet.size) {
|
||||
return this
|
||||
}
|
||||
|
||||
this._setData(peerId, newSet)
|
||||
log(`removed provided protocols for ${id}`)
|
||||
}
|
||||
|
||||
return this
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = ProtoBook
|
||||
|
@ -29,18 +29,21 @@ const remoteAddr = MULTIADDRS_WEBSOCKETS[0]
|
||||
const listenMaddrs = [multiaddr('/ip4/127.0.0.1/tcp/15002/ws')]
|
||||
|
||||
describe('Identify', () => {
|
||||
let localPeer
|
||||
let remotePeer
|
||||
const protocols = new Map([
|
||||
[multicodecs.IDENTIFY, () => {}],
|
||||
[multicodecs.IDENTIFY_PUSH, () => {}]
|
||||
])
|
||||
let localPeer, localPeerStore
|
||||
let remotePeer, remotePeerStore
|
||||
const protocols = [multicodecs.IDENTIFY, multicodecs.IDENTIFY_PUSH]
|
||||
|
||||
before(async () => {
|
||||
[localPeer, remotePeer] = (await Promise.all([
|
||||
PeerId.createFromJSON(Peers[0]),
|
||||
PeerId.createFromJSON(Peers[1])
|
||||
]))
|
||||
|
||||
localPeerStore = new PeerStore({ peerId: localPeer })
|
||||
localPeerStore.protoBook.set(localPeer, protocols)
|
||||
|
||||
remotePeerStore = new PeerStore({ peerId: remotePeer })
|
||||
remotePeerStore.protoBook.set(remotePeer, protocols)
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
@ -52,22 +55,19 @@ describe('Identify', () => {
|
||||
libp2p: {
|
||||
peerId: localPeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: localPeer }),
|
||||
peerStore: localPeerStore,
|
||||
multiaddrs: listenMaddrs,
|
||||
_options: { host: {} }
|
||||
},
|
||||
protocols
|
||||
isStarted: () => true
|
||||
}
|
||||
})
|
||||
|
||||
const remoteIdentify = new IdentifyService({
|
||||
libp2p: {
|
||||
peerId: remotePeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: remotePeer }),
|
||||
peerStore: remotePeerStore,
|
||||
multiaddrs: listenMaddrs,
|
||||
_options: { host: {} }
|
||||
},
|
||||
protocols
|
||||
isStarted: () => true
|
||||
}
|
||||
})
|
||||
|
||||
const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234')
|
||||
@ -110,22 +110,20 @@ describe('Identify', () => {
|
||||
libp2p: {
|
||||
peerId: localPeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: localPeer }),
|
||||
peerStore: localPeerStore,
|
||||
multiaddrs: listenMaddrs,
|
||||
_options: { host: {} }
|
||||
},
|
||||
protocols
|
||||
isStarted: () => true
|
||||
}
|
||||
})
|
||||
|
||||
const remoteIdentify = new IdentifyService({
|
||||
libp2p: {
|
||||
peerId: remotePeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: remotePeer }),
|
||||
peerStore: remotePeerStore,
|
||||
multiaddrs: listenMaddrs,
|
||||
_options: { host: {} }
|
||||
},
|
||||
protocols
|
||||
isStarted: () => true
|
||||
}
|
||||
})
|
||||
|
||||
const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234')
|
||||
@ -171,21 +169,17 @@ describe('Identify', () => {
|
||||
libp2p: {
|
||||
peerId: localPeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: localPeer }),
|
||||
multiaddrs: [],
|
||||
_options: { host: {} }
|
||||
},
|
||||
protocols
|
||||
peerStore: localPeerStore,
|
||||
multiaddrs: []
|
||||
}
|
||||
})
|
||||
const remoteIdentify = new IdentifyService({
|
||||
libp2p: {
|
||||
peerId: remotePeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: remotePeer }),
|
||||
multiaddrs: [],
|
||||
_options: { host: {} }
|
||||
},
|
||||
protocols
|
||||
peerStore: remotePeerStore,
|
||||
multiaddrs: []
|
||||
}
|
||||
})
|
||||
|
||||
const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234')
|
||||
@ -242,35 +236,38 @@ describe('Identify', () => {
|
||||
|
||||
describe('push', () => {
|
||||
it('should be able to push identify updates to another peer', async () => {
|
||||
const storedProtocols = [multicodecs.IDENTIFY, multicodecs.IDENTIFY_PUSH, '/echo/1.0.0']
|
||||
const connectionManager = new EventEmitter()
|
||||
connectionManager.getConnection = () => { }
|
||||
|
||||
const localPeerStore = new PeerStore({ peerId: localPeer })
|
||||
localPeerStore.protoBook.set(localPeer, storedProtocols)
|
||||
|
||||
const localIdentify = new IdentifyService({
|
||||
libp2p: {
|
||||
peerId: localPeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: localPeer }),
|
||||
peerStore: localPeerStore,
|
||||
multiaddrs: listenMaddrs,
|
||||
_options: { host: {} }
|
||||
},
|
||||
protocols: new Map([
|
||||
[multicodecs.IDENTIFY],
|
||||
[multicodecs.IDENTIFY_PUSH],
|
||||
['/echo/1.0.0']
|
||||
])
|
||||
isStarted: () => true
|
||||
}
|
||||
})
|
||||
|
||||
const remotePeerStore = new PeerStore({ peerId: remotePeer })
|
||||
remotePeerStore.protoBook.set(remotePeer, storedProtocols)
|
||||
|
||||
const remoteIdentify = new IdentifyService({
|
||||
libp2p: {
|
||||
peerId: remotePeer,
|
||||
connectionManager,
|
||||
peerStore: new PeerStore({ peerId: remotePeer }),
|
||||
peerStore: remotePeerStore,
|
||||
multiaddrs: [],
|
||||
_options: { host: {} }
|
||||
isStarted: () => true
|
||||
}
|
||||
})
|
||||
|
||||
// Setup peer protocols and multiaddrs
|
||||
const localProtocols = new Set([multicodecs.IDENTIFY, multicodecs.IDENTIFY_PUSH, '/echo/1.0.0'])
|
||||
const localProtocols = new Set(storedProtocols)
|
||||
const localConnectionMock = { newStream: () => { } }
|
||||
const remoteConnectionMock = { remotePeer: localPeer }
|
||||
|
||||
@ -309,35 +306,39 @@ describe('Identify', () => {
|
||||
|
||||
// LEGACY
|
||||
it('should be able to push identify updates to another peer with no certified peer records support', async () => {
|
||||
const storedProtocols = [multicodecs.IDENTIFY, multicodecs.IDENTIFY_PUSH, '/echo/1.0.0']
|
||||
const connectionManager = new EventEmitter()
|
||||
connectionManager.getConnection = () => { }
|
||||
|
||||
const localPeerStore = new PeerStore({ peerId: localPeer })
|
||||
localPeerStore.protoBook.set(localPeer, storedProtocols)
|
||||
|
||||
const localIdentify = new IdentifyService({
|
||||
libp2p: {
|
||||
peerId: localPeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: localPeer }),
|
||||
peerStore: localPeerStore,
|
||||
multiaddrs: listenMaddrs,
|
||||
_options: { host: {} }
|
||||
},
|
||||
protocols: new Map([
|
||||
[multicodecs.IDENTIFY],
|
||||
[multicodecs.IDENTIFY_PUSH],
|
||||
['/echo/1.0.0']
|
||||
])
|
||||
isStarted: () => true
|
||||
}
|
||||
})
|
||||
|
||||
const remotePeerStore = new PeerStore({ peerId: remotePeer })
|
||||
remotePeerStore.protoBook.set(remotePeer, storedProtocols)
|
||||
|
||||
const remoteIdentify = new IdentifyService({
|
||||
libp2p: {
|
||||
peerId: remotePeer,
|
||||
connectionManager,
|
||||
peerStore: new PeerStore({ peerId: remotePeer }),
|
||||
multiaddrs: [],
|
||||
_options: { host: {} }
|
||||
_options: { host: {} },
|
||||
isStarted: () => true
|
||||
}
|
||||
})
|
||||
|
||||
// Setup peer protocols and multiaddrs
|
||||
const localProtocols = new Set([multicodecs.IDENTIFY, multicodecs.IDENTIFY_PUSH, '/echo/1.0.0'])
|
||||
const localProtocols = new Set(storedProtocols)
|
||||
const localConnectionMock = { newStream: () => {} }
|
||||
const remoteConnectionMock = { remotePeer: localPeer }
|
||||
|
||||
|
@ -5,7 +5,9 @@ const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
const { expect } = chai
|
||||
|
||||
const sinon = require('sinon')
|
||||
const pDefer = require('p-defer')
|
||||
const pWaitFor = require('p-wait-for')
|
||||
|
||||
const PeerStore = require('../../src/peer-store')
|
||||
|
||||
@ -224,6 +226,96 @@ describe('protoBook', () => {
|
||||
})
|
||||
})
|
||||
|
||||
describe('protoBook.remove', () => {
|
||||
let peerStore, pb
|
||||
|
||||
beforeEach(() => {
|
||||
peerStore = new PeerStore({ peerId })
|
||||
pb = peerStore.protoBook
|
||||
})
|
||||
|
||||
afterEach(() => {
|
||||
peerStore.removeAllListeners()
|
||||
})
|
||||
|
||||
it('throws invalid parameters error if invalid PeerId is provided', () => {
|
||||
expect(() => {
|
||||
pb.remove('invalid peerId')
|
||||
}).to.throw(ERR_INVALID_PARAMETERS)
|
||||
})
|
||||
|
||||
it('throws invalid parameters error if no protocols provided', () => {
|
||||
expect(() => {
|
||||
pb.remove(peerId)
|
||||
}).to.throw(ERR_INVALID_PARAMETERS)
|
||||
})
|
||||
|
||||
it('removes the given protocol and emits change event', async () => {
|
||||
const spy = sinon.spy()
|
||||
|
||||
const supportedProtocols = ['protocol1', 'protocol2']
|
||||
const removedProtocols = ['protocol1']
|
||||
const finalProtocols = supportedProtocols.filter(p => !removedProtocols.includes(p))
|
||||
|
||||
peerStore.on('change:protocols', spy)
|
||||
|
||||
// Replace
|
||||
pb.set(peerId, supportedProtocols)
|
||||
let protocols = pb.get(peerId)
|
||||
expect(protocols).to.have.deep.members(supportedProtocols)
|
||||
|
||||
// Remove
|
||||
pb.remove(peerId, removedProtocols)
|
||||
protocols = pb.get(peerId)
|
||||
expect(protocols).to.have.deep.members(finalProtocols)
|
||||
|
||||
await pWaitFor(() => spy.callCount === 2)
|
||||
|
||||
const [firstCallArgs] = spy.firstCall.args
|
||||
const [secondCallArgs] = spy.secondCall.args
|
||||
expect(arraysAreEqual(firstCallArgs.protocols, supportedProtocols))
|
||||
expect(arraysAreEqual(secondCallArgs.protocols, finalProtocols))
|
||||
})
|
||||
|
||||
it('emits on remove if the content changes', () => {
|
||||
const spy = sinon.spy()
|
||||
|
||||
const supportedProtocols = ['protocol1', 'protocol2']
|
||||
const removedProtocols = ['protocol2']
|
||||
const finalProtocols = supportedProtocols.filter(p => !removedProtocols.includes(p))
|
||||
|
||||
peerStore.on('change:protocols', spy)
|
||||
|
||||
// set
|
||||
pb.set(peerId, supportedProtocols)
|
||||
|
||||
// remove (content already existing)
|
||||
pb.remove(peerId, removedProtocols)
|
||||
const protocols = pb.get(peerId)
|
||||
expect(protocols).to.have.deep.members(finalProtocols)
|
||||
|
||||
return pWaitFor(() => spy.callCount === 2)
|
||||
})
|
||||
|
||||
it('does not emit on remove if the content does not change', () => {
|
||||
const spy = sinon.spy()
|
||||
|
||||
const supportedProtocols = ['protocol1', 'protocol2']
|
||||
const removedProtocols = ['protocol3']
|
||||
|
||||
peerStore.on('change:protocols', spy)
|
||||
|
||||
// set
|
||||
pb.set(peerId, supportedProtocols)
|
||||
|
||||
// remove
|
||||
pb.remove(peerId, removedProtocols)
|
||||
|
||||
// Only one event
|
||||
expect(spy.callCount).to.eql(1)
|
||||
})
|
||||
})
|
||||
|
||||
describe('protoBook.get', () => {
|
||||
let peerStore, pb
|
||||
|
||||
|
Loading…
x
Reference in New Issue
Block a user