fix: await unhandle of protocols (#1144)

To allow us to shut down cleanly, we must wait the unhandling of protocols - this is because they write the new list of protocols into the datastore which might also be in the process of shutting down.
This commit is contained in:
Alex Potsides 2022-01-25 19:56:56 +00:00 committed by GitHub
parent 831ed39701
commit d44bd9094f
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
3 changed files with 69 additions and 68 deletions

View File

@ -29,9 +29,9 @@ const Upgrader = require('./upgrader')
const PeerStore = require('./peer-store') const PeerStore = require('./peer-store')
const PubsubAdapter = require('./pubsub-adapter') const PubsubAdapter = require('./pubsub-adapter')
const Registrar = require('./registrar') const Registrar = require('./registrar')
const ping = require('./ping')
const IdentifyService = require('./identify') const IdentifyService = require('./identify')
const FetchService = require('./fetch') const FetchService = require('./fetch')
const PingService = require('./ping')
const NatManager = require('./nat-manager') const NatManager = require('./nat-manager')
const { updateSelfPeerRecord } = require('./record/utils') const { updateSelfPeerRecord } = require('./record/utils')
@ -339,12 +339,10 @@ class Libp2p extends EventEmitter {
this.peerRouting = new PeerRouting(this) this.peerRouting = new PeerRouting(this)
this.contentRouting = new ContentRouting(this) this.contentRouting = new ContentRouting(this)
// Mount default protocols
ping.mount(this)
this._onDiscoveryPeer = this._onDiscoveryPeer.bind(this) this._onDiscoveryPeer = this._onDiscoveryPeer.bind(this)
this.fetchService = new FetchService(this) this.fetchService = new FetchService(this)
this.pingService = new PingService(this)
} }
/** /**
@ -382,6 +380,10 @@ class Libp2p extends EventEmitter {
await this.handle(FetchService.PROTOCOL, this.fetchService.handleMessage) await this.handle(FetchService.PROTOCOL, this.fetchService.handleMessage)
} }
if (this.pingService) {
await this.handle(PingService.getProtocolStr(this), this.pingService.handleMessage)
}
try { try {
await this._onStarting() await this._onStarting()
await this._onDidStart() await this._onDidStart()
@ -433,9 +435,9 @@ class Libp2p extends EventEmitter {
await this.natManager.stop() await this.natManager.stop()
await this.transportManager.close() await this.transportManager.close()
this.unhandle(FetchService.PROTOCOL) await this.unhandle(FetchService.PROTOCOL)
await this.unhandle(PingService.getProtocolStr(this))
ping.unmount(this)
this.dialer.destroy() this.dialer.destroy()
} catch (/** @type {any} */ err) { } catch (/** @type {any} */ err) {
if (err) { if (err) {
@ -609,10 +611,10 @@ class Libp2p extends EventEmitter {
// If received multiaddr, ping it // If received multiaddr, ping it
if (multiaddrs) { if (multiaddrs) {
return ping(this, multiaddrs[0]) return this.pingService.ping(multiaddrs[0])
} }
return ping(this, id) return this.pingService.ping(id)
} }
/** /**

View File

@ -22,64 +22,63 @@ const { PROTOCOL_NAME, PING_LENGTH, PROTOCOL_VERSION } = require('./constants')
* @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream
*/ */
/** class PingService {
* Ping a given peer and wait for its response, getting the operation latency. /**
* * @param {import('../')} libp2p
* @param {Libp2p} node */
* @param {PeerId|Multiaddr} peer static getProtocolStr (libp2p) {
* @returns {Promise<number>} return `/${libp2p._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`
*/
async function ping (node, peer) {
const protocol = `/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`
// @ts-ignore multiaddr might not have toB58String
log('dialing %s to %s', protocol, peer.toB58String ? peer.toB58String() : peer)
const connection = await node.dial(peer)
const { stream } = await connection.newStream(protocol)
const start = Date.now()
const data = crypto.randomBytes(PING_LENGTH)
const [result] = await pipe(
[data],
stream,
(/** @type {MuxedStream} */ stream) => take(1, stream),
toBuffer,
collect
)
const end = Date.now()
if (!equals(data, result)) {
throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK)
} }
return end - start /**
* @param {Libp2p} libp2p
*/
constructor (libp2p) {
this._libp2p = libp2p
}
/**
* A handler to register with Libp2p to process ping messages
*
* @param {Object} options
* @param {MuxedStream} options.stream
*/
handleMessage ({ stream }) {
return pipe(stream, stream)
}
/**
* Ping a given peer and wait for its response, getting the operation latency.
*
* @param {PeerId|Multiaddr} peer
* @returns {Promise<number>}
*/
async ping (peer) {
const protocol = `/${this._libp2p._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`
// @ts-ignore multiaddr might not have toB58String
log('dialing %s to %s', protocol, peer.toB58String ? peer.toB58String() : peer)
const connection = await this._libp2p.dial(peer)
const { stream } = await connection.newStream(protocol)
const start = Date.now()
const data = crypto.randomBytes(PING_LENGTH)
const [result] = await pipe(
[data],
stream,
(/** @type {MuxedStream} */ stream) => take(1, stream),
toBuffer,
collect
)
const end = Date.now()
if (!equals(data, result)) {
throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK)
}
return end - start
}
} }
/** module.exports = PingService
* Subscribe ping protocol handler.
*
* @param {Libp2p} node
*/
function mount (node) {
node.handle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`, ({ stream }) => pipe(stream, stream))
.catch(err => {
log.error(err)
})
}
/**
* Unsubscribe ping protocol handler.
*
* @param {Libp2p} node
*/
function unmount (node) {
node.unhandle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`)
.catch(err => {
log.error(err)
})
}
exports = module.exports = ping
exports.mount = mount
exports.unmount = unmount

View File

@ -371,7 +371,7 @@ describe('libp2p.upgrader', () => {
expect(libp2p.upgrader).to.equal(libp2p.transportManager.upgrader) expect(libp2p.upgrader).to.equal(libp2p.transportManager.upgrader)
}) })
it('should be able to register and unregister a handler', () => { it('should be able to register and unregister a handler', async () => {
libp2p = new Libp2p({ libp2p = new Libp2p({
peerId: peers[0], peerId: peers[0],
modules: { modules: {
@ -384,11 +384,11 @@ describe('libp2p.upgrader', () => {
expect(libp2p.upgrader.protocols).to.not.have.any.keys(['/echo/1.0.0', '/echo/1.0.1']) expect(libp2p.upgrader.protocols).to.not.have.any.keys(['/echo/1.0.0', '/echo/1.0.1'])
const echoHandler = () => {} const echoHandler = () => {}
libp2p.handle(['/echo/1.0.0', '/echo/1.0.1'], echoHandler) await libp2p.handle(['/echo/1.0.0', '/echo/1.0.1'], echoHandler)
expect(libp2p.upgrader.protocols.get('/echo/1.0.0')).to.equal(echoHandler) expect(libp2p.upgrader.protocols.get('/echo/1.0.0')).to.equal(echoHandler)
expect(libp2p.upgrader.protocols.get('/echo/1.0.1')).to.equal(echoHandler) expect(libp2p.upgrader.protocols.get('/echo/1.0.1')).to.equal(echoHandler)
libp2p.unhandle(['/echo/1.0.0']) await libp2p.unhandle(['/echo/1.0.0'])
expect(libp2p.upgrader.protocols.get('/echo/1.0.0')).to.equal(undefined) expect(libp2p.upgrader.protocols.get('/echo/1.0.0')).to.equal(undefined)
expect(libp2p.upgrader.protocols.get('/echo/1.0.1')).to.equal(echoHandler) expect(libp2p.upgrader.protocols.get('/echo/1.0.1')).to.equal(echoHandler)
}) })