fix: clean up peer discovery flow (#494)

* fix: clean up peer discovery flow

* test(fix): let libp2p start after connecting

* test(fix): dont auto dial in disco tests
This commit is contained in:
Jacob Heun 2019-12-03 20:14:15 +01:00
parent dbb9e57311
commit f3eb1f1201
No known key found for this signature in database
GPG Key ID: CA5A94C15809879F
8 changed files with 71 additions and 66 deletions

View File

@ -148,10 +148,9 @@ class IdentifyService {
* *
* @async * @async
* @param {Connection} connection * @param {Connection} connection
* @param {PeerID} expectedPeer The PeerId the identify response should match
* @returns {Promise<void>} * @returns {Promise<void>}
*/ */
async identify (connection, expectedPeer) { async identify (connection) {
const { stream } = await connection.newStream(MULTICODEC_IDENTIFY) const { stream } = await connection.newStream(MULTICODEC_IDENTIFY)
const [data] = await pipe( const [data] = await pipe(
stream, stream,
@ -181,7 +180,7 @@ class IdentifyService {
const id = await PeerId.createFromPubKey(publicKey) const id = await PeerId.createFromPubKey(publicKey)
const peerInfo = new PeerInfo(id) const peerInfo = new PeerInfo(id)
if (expectedPeer && expectedPeer.toB58String() !== id.toB58String()) { if (connection.remotePeer.toString() !== id.toString()) {
throw errCode(new Error('identified peer does not match the expected peer'), codes.ERR_INVALID_PEER) throw errCode(new Error('identified peer does not match the expected peer'), codes.ERR_INVALID_PEER)
} }
@ -192,7 +191,7 @@ class IdentifyService {
IdentifyService.updatePeerAddresses(peerInfo, listenAddrs) IdentifyService.updatePeerAddresses(peerInfo, listenAddrs)
IdentifyService.updatePeerProtocols(peerInfo, protocols) IdentifyService.updatePeerProtocols(peerInfo, protocols)
this.registrar.peerStore.update(peerInfo) this.registrar.peerStore.replace(peerInfo)
// TODO: Track our observed address so that we can score it // TODO: Track our observed address so that we can score it
log('received observed address of %s', observedAddr) log('received observed address of %s', observedAddr)
} }
@ -283,7 +282,7 @@ class IdentifyService {
IdentifyService.updatePeerProtocols(peerInfo, message.protocols) IdentifyService.updatePeerProtocols(peerInfo, message.protocols)
// Update the peer in the PeerStore // Update the peer in the PeerStore
this.registrar.peerStore.update(peerInfo) this.registrar.peerStore.replace(peerInfo)
} }
} }

View File

@ -54,9 +54,7 @@ class Libp2p extends EventEmitter {
this.upgrader = new Upgrader({ this.upgrader = new Upgrader({
localPeer: this.peerInfo.id, localPeer: this.peerInfo.id,
onConnection: (connection) => { onConnection: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer) const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer))
this.peerStore.put(peerInfo)
this.registrar.onConnect(peerInfo, connection) this.registrar.onConnect(peerInfo, connection)
this.emit('peer:connect', peerInfo) this.emit('peer:connect', peerInfo)
}, },
@ -144,7 +142,7 @@ class Libp2p extends EventEmitter {
this.peerRouting = peerRouting(this) this.peerRouting = peerRouting(this)
this.contentRouting = contentRouting(this) this.contentRouting = contentRouting(this)
this._peerDiscovered = this._peerDiscovered.bind(this) this._onDiscoveryPeer = this._onDiscoveryPeer.bind(this)
} }
/** /**
@ -340,7 +338,7 @@ class Libp2p extends EventEmitter {
// TODO: this should be modified once random-walk is used as // TODO: this should be modified once random-walk is used as
// the other discovery modules // the other discovery modules
this._dht.on('peer', this._peerDiscovered) this._dht.on('peer', this._onDiscoveryPeer)
} }
} }
@ -351,6 +349,11 @@ class Libp2p extends EventEmitter {
_onDidStart () { _onDidStart () {
this._isStarted = true this._isStarted = true
this.peerStore.on('peer', peerInfo => {
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
})
// Peer discovery // Peer discovery
this._setupPeerDiscovery() this._setupPeerDiscovery()
@ -362,24 +365,17 @@ class Libp2p extends EventEmitter {
} }
/** /**
* Handles discovered peers. Each discovered peer will be emitted via * Called whenever peer discovery services emit `peer` events.
* the `peer:discovery` event. If auto dial is enabled for libp2p * Known peers may be emitted.
* and the current connection count is under the low watermark, the
* peer will be dialed.
* @private * @private
* @param {PeerInfo} peerInfo * @param {PeerInfo} peerInfo
*/ */
_peerDiscovered (peerInfo) { _onDiscoveryPeer (peerInfo) {
if (peerInfo.id.toB58String() === this.peerInfo.id.toB58String()) { if (peerInfo.id.toString() === this.peerInfo.id.toString()) {
log.error(new Error(codes.ERR_DISCOVERED_SELF)) log.error(new Error(codes.ERR_DISCOVERED_SELF))
return return
} }
peerInfo = this.peerStore.put(peerInfo) this.peerStore.put(peerInfo)
if (!this.isStarted()) return
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
} }
/** /**
@ -432,7 +428,7 @@ class Libp2p extends EventEmitter {
discoveryService = DiscoveryService discoveryService = DiscoveryService
} }
discoveryService.on('peer', this._peerDiscovered) discoveryService.on('peer', this._onDiscoveryPeer)
this._discovery.push(discoveryService) this._discovery.push(discoveryService)
} }
} }

View File

@ -45,7 +45,7 @@ class PeerStore extends EventEmitter {
let peer let peer
// Already know the peer? // Already know the peer?
if (this.peers.has(peerInfo.id.toB58String())) { if (this.has(peerInfo.id)) {
peer = this.update(peerInfo) peer = this.update(peerInfo)
} else { } else {
peer = this.add(peerInfo) peer = this.add(peerInfo)
@ -118,15 +118,12 @@ class PeerStore extends EventEmitter {
if (multiaddrsIntersection.length !== peerInfo.multiaddrs.size || if (multiaddrsIntersection.length !== peerInfo.multiaddrs.size ||
multiaddrsIntersection.length !== recorded.multiaddrs.size) { multiaddrsIntersection.length !== recorded.multiaddrs.size) {
// recorded.multiaddrs = peerInfo.multiaddrs
recorded.multiaddrs.clear()
for (const ma of peerInfo.multiaddrs.toArray()) { for (const ma of peerInfo.multiaddrs.toArray()) {
recorded.multiaddrs.add(ma) recorded.multiaddrs.add(ma)
} }
this.emit('change:multiaddrs', { this.emit('change:multiaddrs', {
peerInfo: peerInfo, peerInfo: recorded,
multiaddrs: recorded.multiaddrs.toArray() multiaddrs: recorded.multiaddrs.toArray()
}) })
} }
@ -139,14 +136,12 @@ class PeerStore extends EventEmitter {
if (protocolsIntersection.size !== peerInfo.protocols.size || if (protocolsIntersection.size !== peerInfo.protocols.size ||
protocolsIntersection.size !== recorded.protocols.size) { protocolsIntersection.size !== recorded.protocols.size) {
recorded.protocols.clear()
for (const protocol of peerInfo.protocols) { for (const protocol of peerInfo.protocols) {
recorded.protocols.add(protocol) recorded.protocols.add(protocol)
} }
this.emit('change:protocols', { this.emit('change:protocols', {
peerInfo: peerInfo, peerInfo: recorded,
protocols: Array.from(recorded.protocols) protocols: Array.from(recorded.protocols)
}) })
} }
@ -170,13 +165,7 @@ class PeerStore extends EventEmitter {
peerId = peerId.toB58String() peerId = peerId.toB58String()
} }
const peerInfo = this.peers.get(peerId) return this.peers.get(peerId)
if (peerInfo) {
return peerInfo
}
return undefined
} }
/** /**
@ -217,6 +206,16 @@ class PeerStore extends EventEmitter {
this.remove(peerInfo.id.toB58String()) this.remove(peerInfo.id.toB58String())
this.add(peerInfo) this.add(peerInfo)
// This should be cleaned up in PeerStore v2
this.emit('change:multiaddrs', {
peerInfo,
multiaddrs: peerInfo.multiaddrs.toArray()
})
this.emit('change:protocols', {
peerInfo,
protocols: Array.from(peerInfo.protocols)
})
} }
} }

View File

@ -216,7 +216,8 @@ describe('Dialing (direct, WebSockets)', () => {
}) })
sinon.spy(libp2p.dialer.identifyService, 'identify') sinon.spy(libp2p.dialer.identifyService, 'identify')
sinon.spy(libp2p.peerStore, 'update') sinon.spy(libp2p.peerStore, 'replace')
sinon.spy(libp2p.upgrader, 'onConnection')
const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr)
expect(connection).to.exist() expect(connection).to.exist()
@ -225,7 +226,7 @@ describe('Dialing (direct, WebSockets)', () => {
expect(libp2p.dialer.identifyService.identify.callCount).to.equal(1) expect(libp2p.dialer.identifyService.identify.callCount).to.equal(1)
await libp2p.dialer.identifyService.identify.firstCall.returnValue await libp2p.dialer.identifyService.identify.firstCall.returnValue
expect(libp2p.peerStore.update.callCount).to.equal(1) expect(libp2p.peerStore.replace.callCount).to.equal(1)
}) })
it('should be able to use hangup to close connections', async () => { it('should be able to use hangup to close connections', async () => {

View File

@ -47,7 +47,7 @@ describe('Identify', () => {
protocols, protocols,
registrar: { registrar: {
peerStore: { peerStore: {
update: () => {} replace: () => {}
} }
} }
}) })
@ -57,17 +57,17 @@ describe('Identify', () => {
}) })
const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234')
const localConnectionMock = { newStream: () => {} } const localConnectionMock = { newStream: () => {}, remotePeer: remotePeer.id }
const remoteConnectionMock = { remoteAddr: observedAddr } const remoteConnectionMock = { remoteAddr: observedAddr }
const [local, remote] = duplexPair() const [local, remote] = duplexPair()
sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY }) sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY })
sinon.spy(localIdentify.registrar.peerStore, 'update') sinon.spy(localIdentify.registrar.peerStore, 'replace')
// Run identify // Run identify
await Promise.all([ await Promise.all([
localIdentify.identify(localConnectionMock, remotePeer.id), localIdentify.identify(localConnectionMock),
remoteIdentify.handleMessage({ remoteIdentify.handleMessage({
connection: remoteConnectionMock, connection: remoteConnectionMock,
stream: remote, stream: remote,
@ -75,9 +75,9 @@ describe('Identify', () => {
}) })
]) ])
expect(localIdentify.registrar.peerStore.update.callCount).to.equal(1) expect(localIdentify.registrar.peerStore.replace.callCount).to.equal(1)
// Validate the remote peer gets updated in the peer store // Validate the remote peer gets updated in the peer store
const call = localIdentify.registrar.peerStore.update.firstCall const call = localIdentify.registrar.peerStore.replace.firstCall
expect(call.args[0].id.bytes).to.equal(remotePeer.id.bytes) expect(call.args[0].id.bytes).to.equal(remotePeer.id.bytes)
}) })
@ -92,7 +92,7 @@ describe('Identify', () => {
}) })
const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234')
const localConnectionMock = { newStream: () => {} } const localConnectionMock = { newStream: () => {}, remotePeer }
const remoteConnectionMock = { remoteAddr: observedAddr } const remoteConnectionMock = { remoteAddr: observedAddr }
const [local, remote] = duplexPair() const [local, remote] = duplexPair()
@ -128,7 +128,7 @@ describe('Identify', () => {
peerInfo: remotePeer, peerInfo: remotePeer,
registrar: { registrar: {
peerStore: { peerStore: {
update: () => {} replace: () => {}
} }
} }
}) })
@ -148,7 +148,7 @@ describe('Identify', () => {
sinon.spy(IdentifyService, 'updatePeerAddresses') sinon.spy(IdentifyService, 'updatePeerAddresses')
sinon.spy(IdentifyService, 'updatePeerProtocols') sinon.spy(IdentifyService, 'updatePeerProtocols')
sinon.spy(remoteIdentify.registrar.peerStore, 'update') sinon.spy(remoteIdentify.registrar.peerStore, 'replace')
// Run identify // Run identify
await Promise.all([ await Promise.all([
@ -163,8 +163,8 @@ describe('Identify', () => {
expect(IdentifyService.updatePeerAddresses.callCount).to.equal(1) expect(IdentifyService.updatePeerAddresses.callCount).to.equal(1)
expect(IdentifyService.updatePeerProtocols.callCount).to.equal(1) expect(IdentifyService.updatePeerProtocols.callCount).to.equal(1)
expect(remoteIdentify.registrar.peerStore.update.callCount).to.equal(1) expect(remoteIdentify.registrar.peerStore.replace.callCount).to.equal(1)
const [peerInfo] = remoteIdentify.registrar.peerStore.update.firstCall.args const [peerInfo] = remoteIdentify.registrar.peerStore.replace.firstCall.args
expect(peerInfo.id.bytes).to.eql(localPeer.id.bytes) expect(peerInfo.id.bytes).to.eql(localPeer.id.bytes)
expect(peerInfo.multiaddrs.toArray()).to.eql([listeningAddr]) expect(peerInfo.multiaddrs.toArray()).to.eql([listeningAddr])
expect(peerInfo.protocols).to.eql(localProtocols) expect(peerInfo.protocols).to.eql(localProtocols)
@ -198,7 +198,7 @@ describe('Identify', () => {
}) })
sinon.spy(libp2p.dialer.identifyService, 'identify') sinon.spy(libp2p.dialer.identifyService, 'identify')
sinon.spy(libp2p.peerStore, 'update') sinon.spy(libp2p.peerStore, 'replace')
const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr)
expect(connection).to.exist() expect(connection).to.exist()
@ -207,7 +207,7 @@ describe('Identify', () => {
expect(libp2p.dialer.identifyService.identify.callCount).to.equal(1) expect(libp2p.dialer.identifyService.identify.callCount).to.equal(1)
await libp2p.dialer.identifyService.identify.firstCall.returnValue await libp2p.dialer.identifyService.identify.firstCall.returnValue
expect(libp2p.peerStore.update.callCount).to.equal(1) expect(libp2p.peerStore.replace.callCount).to.equal(1)
await connection.close() await connection.close()
}) })

View File

@ -48,6 +48,7 @@ describe('peer discovery scenarios', () => {
}, },
config: { config: {
peerDiscovery: { peerDiscovery: {
autoDial: false,
bootstrap: { bootstrap: {
enabled: true, enabled: true,
list: bootstrappers list: bootstrappers
@ -84,6 +85,7 @@ describe('peer discovery scenarios', () => {
}, },
config: { config: {
peerDiscovery: { peerDiscovery: {
autoDial: false,
mdns: { mdns: {
enabled: true, enabled: true,
interval: 200, // discover quickly interval: 200, // discover quickly
@ -111,9 +113,11 @@ describe('peer discovery scenarios', () => {
} }
}) })
await remoteLibp2p1.start() await Promise.all([
await remoteLibp2p2.start() remoteLibp2p1.start(),
await libp2p.start() remoteLibp2p2.start(),
libp2p.start()
])
await deferred.promise await deferred.promise
@ -130,11 +134,14 @@ describe('peer discovery scenarios', () => {
dht: KadDht dht: KadDht
}, },
config: { config: {
peerDiscovery: {
autoDial: false
},
dht: { dht: {
randomWalk: { randomWalk: {
enabled: true, enabled: true,
delay: 100, delay: 100, // start the first query quickly
interval: 200, // start the query sooner interval: 1000,
timeout: 3000 timeout: 3000
}, },
enabled: true enabled: true
@ -153,9 +160,10 @@ describe('peer discovery scenarios', () => {
} }
}) })
await remoteLibp2p1.start() await Promise.all([
await remoteLibp2p2.start() remoteLibp2p1.start(),
await libp2p.start() remoteLibp2p2.start()
])
// Topology: // Topology:
// A -> B // A -> B
@ -165,8 +173,12 @@ describe('peer discovery scenarios', () => {
remoteLibp2p2.dial(remotePeerInfo1) remoteLibp2p2.dial(remotePeerInfo1)
]) ])
libp2p.start()
await deferred.promise await deferred.promise
await remoteLibp2p1.stop() return Promise.all([
await remoteLibp2p2.stop() remoteLibp2p1.stop(),
remoteLibp2p2.stop()
])
}) })
}) })

View File

@ -56,7 +56,6 @@ describe('peer-store', () => {
// Put the peer in the store // Put the peer in the store
peerStore.put(peerInfo) peerStore.put(peerInfo)
sinon.spy(peerStore, 'put')
sinon.spy(peerStore, 'add') sinon.spy(peerStore, 'add')
sinon.spy(peerStore, 'update') sinon.spy(peerStore, 'update')
@ -75,7 +74,6 @@ describe('peer-store', () => {
peerStore.put(peerInfo) peerStore.put(peerInfo)
expect(peerStore.put.callCount).to.equal(1)
expect(peerStore.add.callCount).to.equal(0) expect(peerStore.add.callCount).to.equal(0)
expect(peerStore.update.callCount).to.equal(1) expect(peerStore.update.callCount).to.equal(1)
}) })

View File

@ -166,7 +166,7 @@ describe('registrar', () => {
// Remove protocol to peer and update it // Remove protocol to peer and update it
peerInfo.protocols.delete(multicodec) peerInfo.protocols.delete(multicodec)
peerStore.put(peerInfo) peerStore.replace(peerInfo)
await onDisconnectDefer.promise await onDisconnectDefer.promise
}) })