diff --git a/src/index.js b/src/index.js index 1f1b745f..f5e0f6c0 100644 --- a/src/index.js +++ b/src/index.js @@ -29,9 +29,9 @@ const Upgrader = require('./upgrader') const PeerStore = require('./peer-store') const PubsubAdapter = require('./pubsub-adapter') const Registrar = require('./registrar') -const ping = require('./ping') const IdentifyService = require('./identify') const FetchService = require('./fetch') +const PingService = require('./ping') const NatManager = require('./nat-manager') const { updateSelfPeerRecord } = require('./record/utils') @@ -339,12 +339,10 @@ class Libp2p extends EventEmitter { this.peerRouting = new PeerRouting(this) this.contentRouting = new ContentRouting(this) - // Mount default protocols - ping.mount(this) - this._onDiscoveryPeer = this._onDiscoveryPeer.bind(this) this.fetchService = new FetchService(this) + this.pingService = new PingService(this) } /** @@ -382,6 +380,10 @@ class Libp2p extends EventEmitter { await this.handle(FetchService.PROTOCOL, this.fetchService.handleMessage) } + if (this.pingService) { + await this.handle(PingService.getProtocolStr(this), this.pingService.handleMessage) + } + try { await this._onStarting() await this._onDidStart() @@ -433,9 +435,9 @@ class Libp2p extends EventEmitter { await this.natManager.stop() await this.transportManager.close() - this.unhandle(FetchService.PROTOCOL) + await this.unhandle(FetchService.PROTOCOL) + await this.unhandle(PingService.getProtocolStr(this)) - ping.unmount(this) this.dialer.destroy() } catch (/** @type {any} */ err) { if (err) { @@ -609,10 +611,10 @@ class Libp2p extends EventEmitter { // If received multiaddr, ping it if (multiaddrs) { - return ping(this, multiaddrs[0]) + return this.pingService.ping(multiaddrs[0]) } - return ping(this, id) + return this.pingService.ping(id) } /** diff --git a/src/ping/index.js b/src/ping/index.js index 75e13b9a..46f87e3a 100644 --- a/src/ping/index.js +++ b/src/ping/index.js @@ -22,64 +22,63 @@ const { PROTOCOL_NAME, PING_LENGTH, PROTOCOL_VERSION } = require('./constants') * @typedef {import('libp2p-interfaces/src/stream-muxer/types').MuxedStream} MuxedStream */ -/** - * Ping a given peer and wait for its response, getting the operation latency. - * - * @param {Libp2p} node - * @param {PeerId|Multiaddr} peer - * @returns {Promise} - */ -async function ping (node, peer) { - const protocol = `/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` - // @ts-ignore multiaddr might not have toB58String - log('dialing %s to %s', protocol, peer.toB58String ? peer.toB58String() : peer) - - const connection = await node.dial(peer) - const { stream } = await connection.newStream(protocol) - - const start = Date.now() - const data = crypto.randomBytes(PING_LENGTH) - - const [result] = await pipe( - [data], - stream, - (/** @type {MuxedStream} */ stream) => take(1, stream), - toBuffer, - collect - ) - const end = Date.now() - - if (!equals(data, result)) { - throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK) +class PingService { + /** + * @param {import('../')} libp2p + */ + static getProtocolStr (libp2p) { + return `/${libp2p._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` } - return end - start + /** + * @param {Libp2p} libp2p + */ + constructor (libp2p) { + this._libp2p = libp2p + } + + /** + * A handler to register with Libp2p to process ping messages + * + * @param {Object} options + * @param {MuxedStream} options.stream + */ + handleMessage ({ stream }) { + return pipe(stream, stream) + } + + /** + * Ping a given peer and wait for its response, getting the operation latency. + * + * @param {PeerId|Multiaddr} peer + * @returns {Promise} + */ + async ping (peer) { + const protocol = `/${this._libp2p._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}` + // @ts-ignore multiaddr might not have toB58String + log('dialing %s to %s', protocol, peer.toB58String ? peer.toB58String() : peer) + + const connection = await this._libp2p.dial(peer) + const { stream } = await connection.newStream(protocol) + + const start = Date.now() + const data = crypto.randomBytes(PING_LENGTH) + + const [result] = await pipe( + [data], + stream, + (/** @type {MuxedStream} */ stream) => take(1, stream), + toBuffer, + collect + ) + const end = Date.now() + + if (!equals(data, result)) { + throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK) + } + + return end - start + } } -/** - * Subscribe ping protocol handler. - * - * @param {Libp2p} node - */ -function mount (node) { - node.handle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`, ({ stream }) => pipe(stream, stream)) - .catch(err => { - log.error(err) - }) -} - -/** - * Unsubscribe ping protocol handler. - * - * @param {Libp2p} node - */ -function unmount (node) { - node.unhandle(`/${node._config.protocolPrefix}/${PROTOCOL_NAME}/${PROTOCOL_VERSION}`) - .catch(err => { - log.error(err) - }) -} - -exports = module.exports = ping -exports.mount = mount -exports.unmount = unmount +module.exports = PingService diff --git a/test/upgrading/upgrader.spec.js b/test/upgrading/upgrader.spec.js index 176c8c0a..6726b747 100644 --- a/test/upgrading/upgrader.spec.js +++ b/test/upgrading/upgrader.spec.js @@ -371,7 +371,7 @@ describe('libp2p.upgrader', () => { expect(libp2p.upgrader).to.equal(libp2p.transportManager.upgrader) }) - it('should be able to register and unregister a handler', () => { + it('should be able to register and unregister a handler', async () => { libp2p = new Libp2p({ peerId: peers[0], modules: { @@ -384,11 +384,11 @@ describe('libp2p.upgrader', () => { expect(libp2p.upgrader.protocols).to.not.have.any.keys(['/echo/1.0.0', '/echo/1.0.1']) const echoHandler = () => {} - libp2p.handle(['/echo/1.0.0', '/echo/1.0.1'], echoHandler) + await libp2p.handle(['/echo/1.0.0', '/echo/1.0.1'], echoHandler) expect(libp2p.upgrader.protocols.get('/echo/1.0.0')).to.equal(echoHandler) expect(libp2p.upgrader.protocols.get('/echo/1.0.1')).to.equal(echoHandler) - libp2p.unhandle(['/echo/1.0.0']) + await libp2p.unhandle(['/echo/1.0.0']) expect(libp2p.upgrader.protocols.get('/echo/1.0.0')).to.equal(undefined) expect(libp2p.upgrader.protocols.get('/echo/1.0.1')).to.equal(echoHandler) })