fix: clean up peer discovery flow (#494)

* fix: clean up peer discovery flow

* test(fix): let libp2p start after connecting

* test(fix): dont auto dial in disco tests
This commit is contained in:
Jacob Heun
2019-12-03 20:14:15 +01:00
parent 9a6e07d70b
commit 12fc069873
8 changed files with 71 additions and 66 deletions

View File

@ -148,10 +148,9 @@ class IdentifyService {
*
* @async
* @param {Connection} connection
* @param {PeerID} expectedPeer The PeerId the identify response should match
* @returns {Promise<void>}
*/
async identify (connection, expectedPeer) {
async identify (connection) {
const { stream } = await connection.newStream(MULTICODEC_IDENTIFY)
const [data] = await pipe(
stream,
@ -181,7 +180,7 @@ class IdentifyService {
const id = await PeerId.createFromPubKey(publicKey)
const peerInfo = new PeerInfo(id)
if (expectedPeer && expectedPeer.toB58String() !== id.toB58String()) {
if (connection.remotePeer.toString() !== id.toString()) {
throw errCode(new Error('identified peer does not match the expected peer'), codes.ERR_INVALID_PEER)
}
@ -192,7 +191,7 @@ class IdentifyService {
IdentifyService.updatePeerAddresses(peerInfo, listenAddrs)
IdentifyService.updatePeerProtocols(peerInfo, protocols)
this.registrar.peerStore.update(peerInfo)
this.registrar.peerStore.replace(peerInfo)
// TODO: Track our observed address so that we can score it
log('received observed address of %s', observedAddr)
}
@ -283,7 +282,7 @@ class IdentifyService {
IdentifyService.updatePeerProtocols(peerInfo, message.protocols)
// Update the peer in the PeerStore
this.registrar.peerStore.update(peerInfo)
this.registrar.peerStore.replace(peerInfo)
}
}

View File

@ -54,9 +54,7 @@ class Libp2p extends EventEmitter {
this.upgrader = new Upgrader({
localPeer: this.peerInfo.id,
onConnection: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)
this.peerStore.put(peerInfo)
const peerInfo = this.peerStore.put(new PeerInfo(connection.remotePeer))
this.registrar.onConnect(peerInfo, connection)
this.emit('peer:connect', peerInfo)
},
@ -144,7 +142,7 @@ class Libp2p extends EventEmitter {
this.peerRouting = peerRouting(this)
this.contentRouting = contentRouting(this)
this._peerDiscovered = this._peerDiscovered.bind(this)
this._onDiscoveryPeer = this._onDiscoveryPeer.bind(this)
}
/**
@ -340,7 +338,7 @@ class Libp2p extends EventEmitter {
// TODO: this should be modified once random-walk is used as
// the other discovery modules
this._dht.on('peer', this._peerDiscovered)
this._dht.on('peer', this._onDiscoveryPeer)
}
}
@ -351,6 +349,11 @@ class Libp2p extends EventEmitter {
_onDidStart () {
this._isStarted = true
this.peerStore.on('peer', peerInfo => {
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
})
// Peer discovery
this._setupPeerDiscovery()
@ -362,24 +365,17 @@ class Libp2p extends EventEmitter {
}
/**
* Handles discovered peers. Each discovered peer will be emitted via
* the `peer:discovery` event. If auto dial is enabled for libp2p
* and the current connection count is under the low watermark, the
* peer will be dialed.
* Called whenever peer discovery services emit `peer` events.
* Known peers may be emitted.
* @private
* @param {PeerInfo} peerInfo
*/
_peerDiscovered (peerInfo) {
if (peerInfo.id.toB58String() === this.peerInfo.id.toB58String()) {
_onDiscoveryPeer (peerInfo) {
if (peerInfo.id.toString() === this.peerInfo.id.toString()) {
log.error(new Error(codes.ERR_DISCOVERED_SELF))
return
}
peerInfo = this.peerStore.put(peerInfo)
if (!this.isStarted()) return
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
this.peerStore.put(peerInfo)
}
/**
@ -432,7 +428,7 @@ class Libp2p extends EventEmitter {
discoveryService = DiscoveryService
}
discoveryService.on('peer', this._peerDiscovered)
discoveryService.on('peer', this._onDiscoveryPeer)
this._discovery.push(discoveryService)
}
}

View File

@ -45,7 +45,7 @@ class PeerStore extends EventEmitter {
let peer
// Already know the peer?
if (this.peers.has(peerInfo.id.toB58String())) {
if (this.has(peerInfo.id)) {
peer = this.update(peerInfo)
} else {
peer = this.add(peerInfo)
@ -118,15 +118,12 @@ class PeerStore extends EventEmitter {
if (multiaddrsIntersection.length !== peerInfo.multiaddrs.size ||
multiaddrsIntersection.length !== recorded.multiaddrs.size) {
// recorded.multiaddrs = peerInfo.multiaddrs
recorded.multiaddrs.clear()
for (const ma of peerInfo.multiaddrs.toArray()) {
recorded.multiaddrs.add(ma)
}
this.emit('change:multiaddrs', {
peerInfo: peerInfo,
peerInfo: recorded,
multiaddrs: recorded.multiaddrs.toArray()
})
}
@ -139,14 +136,12 @@ class PeerStore extends EventEmitter {
if (protocolsIntersection.size !== peerInfo.protocols.size ||
protocolsIntersection.size !== recorded.protocols.size) {
recorded.protocols.clear()
for (const protocol of peerInfo.protocols) {
recorded.protocols.add(protocol)
}
this.emit('change:protocols', {
peerInfo: peerInfo,
peerInfo: recorded,
protocols: Array.from(recorded.protocols)
})
}
@ -170,13 +165,7 @@ class PeerStore extends EventEmitter {
peerId = peerId.toB58String()
}
const peerInfo = this.peers.get(peerId)
if (peerInfo) {
return peerInfo
}
return undefined
return this.peers.get(peerId)
}
/**
@ -217,6 +206,16 @@ class PeerStore extends EventEmitter {
this.remove(peerInfo.id.toB58String())
this.add(peerInfo)
// This should be cleaned up in PeerStore v2
this.emit('change:multiaddrs', {
peerInfo,
multiaddrs: peerInfo.multiaddrs.toArray()
})
this.emit('change:protocols', {
peerInfo,
protocols: Array.from(peerInfo.protocols)
})
}
}