diff --git a/doc/API.md b/doc/API.md index 38886c69..b3199614 100644 --- a/doc/API.md +++ b/doc/API.md @@ -10,6 +10,7 @@ * [`hangUp`](#hangUp) * [`handle`](#handle) * [`unhandle`](#unhandle) + * [`ping`](#ping) * [`peerRouting.findPeer`](#peerRouting.findPeer) * [`contentRouting.findProviders`](#contentRouting.findProviders) * [`contentRouting.provide`](#contentRouting.provide) @@ -307,6 +308,31 @@ Unregisters all handlers with the given protocols libp2p.unhandle(['/echo/1.0.0']) ``` +### ping + +Pings a given peer and get the operation's latency. + +`libp2p.ping(peer)` + +#### Parameters + +| Name | Type | Description | +|------|------|-------------| +| peer | `PeerInfo|PeerId|Multiaddr|string` | peer to ping | + +#### Returns + +| Type | Description | +|------|-------------| +| `Promise` | Latency of the operation in ms | + +#### Example + +```js +// ... +const latency = await libp2p.ping(otherPeerId) +``` + ### peerRouting.findPeer Iterates over all peer routers in series to find the given peer. If the DHT is enabled, it will be tried first. diff --git a/package.json b/package.json index eaddaabb..11e2be02 100644 --- a/package.json +++ b/package.json @@ -50,6 +50,7 @@ "err-code": "^1.1.2", "hashlru": "^2.3.0", "it-all": "^1.0.1", + "it-buffer": "^0.1.1", "it-handshake": "^1.0.1", "it-length-prefixed": "^3.0.0", "it-pipe": "^1.1.0", diff --git a/src/identify/index.js b/src/identify/index.js index 6f349037..df517295 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -9,7 +9,7 @@ const { collect, take } = require('streaming-iterables') const PeerInfo = require('peer-info') const PeerId = require('peer-id') const multiaddr = require('multiaddr') -const { toBuffer } = require('../util') +const { toBuffer } = require('it-buffer') const Message = require('./message') diff --git a/src/index.js b/src/index.js index c54e3fb8..4ba23ce5 100644 --- a/src/index.js +++ b/src/index.js @@ -21,6 +21,7 @@ const TransportManager = require('./transport-manager') const Upgrader = require('./upgrader') const PeerStore = require('./peer-store') const Registrar = require('./registrar') +const ping = require('./ping') const { IdentifyService, multicodecs: IDENTIFY_PROTOCOLS @@ -148,6 +149,9 @@ class Libp2p extends EventEmitter { this.peerRouting = peerRouting(this) this.contentRouting = contentRouting(this) + // Mount default protocols + ping.mount(this) + this._onDiscoveryPeer = this._onDiscoveryPeer.bind(this) } @@ -203,6 +207,8 @@ class Libp2p extends EventEmitter { await this.transportManager.close() await this.registrar.close() + + ping.unmount(this) } catch (err) { if (err) { log.error(err) @@ -280,17 +286,16 @@ class Libp2p extends EventEmitter { ) } - // TODO: Update ping - // /** - // * Pings the provided peer - // * - // * @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping - // * @returns {Promise} - // */ - // ping (peer) { - // const peerInfo = await getPeerInfoRemote(peer, this) - // return new Ping(this._switch, peerInfo) - // } + /** + * Pings the given peer + * @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping + * @returns {Promise} + */ + async ping (peer) { + const peerInfo = await getPeerInfo(peer, this.peerStore) + + return ping(this, peerInfo) + } /** * Registers the `handler` for each protocol diff --git a/src/ping/README.md b/src/ping/README.md index b0ace043..079e9562 100644 --- a/src/ping/README.md +++ b/src/ping/README.md @@ -8,16 +8,11 @@ libp2p-ping JavaScript Implementation ## Usage ```javascript -var Ping = require('libp2p-ping') +var Ping = require('libp2p/src/ping') -Ping.mount(swarm) // Enable this peer to echo Ping requests +Ping.mount(libp2p) // Enable this peer to echo Ping requests -var p = new Ping(swarm, peerDst) // Ping peerDst, peerDst must be a peer-info object +const latency = await Ping(libp2p, peerDst) -p.on('ping', function (time) { - console.log(time + 'ms') - p.stop() // stop sending pings -}) - -p.start() +Ping.unmount(libp2p) ``` diff --git a/src/ping/handler.js b/src/ping/handler.js deleted file mode 100644 index 12f1793a..00000000 --- a/src/ping/handler.js +++ /dev/null @@ -1,50 +0,0 @@ -'use strict' - -const pull = require('pull-stream/pull') -const handshake = require('pull-handshake') -const constants = require('./constants') -const PROTOCOL = constants.PROTOCOL -const PING_LENGTH = constants.PING_LENGTH - -const debug = require('debug') -const log = debug('libp2p-ping') -log.error = debug('libp2p-ping:error') - -function mount (swarm) { - swarm.handle(PROTOCOL, (protocol, conn) => { - const stream = handshake({ timeout: 0 }) - const shake = stream.handshake - - // receive and echo back - function next () { - shake.read(PING_LENGTH, (err, buf) => { - if (err === true) { - // stream closed - return - } - if (err) { - return log.error(err) - } - - shake.write(buf) - return next() - }) - } - - pull( - conn, - stream, - conn - ) - - next() - }) -} - -function unmount (swarm) { - swarm.unhandle(PROTOCOL) -} - -exports = module.exports -exports.mount = mount -exports.unmount = unmount diff --git a/src/ping/index.js b/src/ping/index.js index f23b21ce..9a13bb35 100644 --- a/src/ping/index.js +++ b/src/ping/index.js @@ -1,7 +1,62 @@ 'use strict' -const handler = require('./handler') +const debug = require('debug') +const log = debug('libp2p-ping') +log.error = debug('libp2p-ping:error') +const errCode = require('err-code') -exports = module.exports = require('./ping') -exports.mount = handler.mount -exports.unmount = handler.unmount +const crypto = require('libp2p-crypto') +const pipe = require('it-pipe') +const { toBuffer } = require('it-buffer') +const { collect } = require('streaming-iterables') + +const { PROTOCOL, PING_LENGTH } = require('./constants') + +/** + * Ping a given peer and wait for its response, getting the operation latency. + * @param {Libp2p} node + * @param {PeerInfo} peer + * @returns {Promise} + */ +async function ping (node, peer) { + log('dialing %s to %s', PROTOCOL, peer.id.toB58String()) + + const { stream } = await node.dialProtocol(peer, PROTOCOL) + + const start = new Date() + const data = crypto.randomBytes(PING_LENGTH) + + const [result] = await pipe( + [data], + stream, + toBuffer, + collect + ) + const end = Date.now() + + if (!data.equals(result)) { + throw errCode(new Error('Received wrong ping ack'), 'ERR_WRONG_PING_ACK') + } + + return end - start +} + +/** + * Subscribe ping protocol handler. + * @param {Libp2p} node + */ +function mount (node) { + node.handle(PROTOCOL, ({ stream }) => pipe(stream, stream)) +} + +/** + * Unsubscribe ping protocol handler. + * @param {Libp2p} node + */ +function unmount (node) { + node.unhandle(PROTOCOL) +} + +exports = module.exports = ping +exports.mount = mount +exports.unmount = unmount diff --git a/src/ping/ping.js b/src/ping/ping.js deleted file mode 100644 index 5c9f7f03..00000000 --- a/src/ping/ping.js +++ /dev/null @@ -1,83 +0,0 @@ -'use strict' - -const EventEmitter = require('events').EventEmitter -const pull = require('pull-stream/pull') -const empty = require('pull-stream/sources/empty') -const handshake = require('pull-handshake') -const constants = require('./constants') -const util = require('./util') -const rnd = util.rnd -const debug = require('debug') -const log = debug('libp2p-ping') -log.error = debug('libp2p-ping:error') - -const PROTOCOL = constants.PROTOCOL -const PING_LENGTH = constants.PING_LENGTH - -class Ping extends EventEmitter { - constructor (swarm, peer) { - super() - - this._stopped = false - this.peer = peer - this.swarm = swarm - } - - start () { - log('dialing %s to %s', PROTOCOL, this.peer.id.toB58String()) - - this.swarm.dial(this.peer, PROTOCOL, (err, conn) => { - if (err) { - return this.emit('error', err) - } - - const stream = handshake({ timeout: 0 }) - this.shake = stream.handshake - - pull( - stream, - conn, - stream - ) - - // write and wait to see ping back - const self = this - function next () { - const start = new Date() - const buf = rnd(PING_LENGTH) - self.shake.write(buf) - self.shake.read(PING_LENGTH, (err, bufBack) => { - const end = new Date() - if (err || !buf.equals(bufBack)) { - const err = new Error('Received wrong ping ack') - return self.emit('error', err) - } - - self.emit('ping', end - start) - - if (self._stopped) { - return - } - next() - }) - } - - next() - }) - } - - stop () { - if (this._stopped || !this.shake) { - return - } - - this._stopped = true - - pull( - empty(), - this.shake.rest() - ) - } -} - -module.exports = Ping diff --git a/src/util/index.js b/src/util/index.js deleted file mode 100644 index bca13a45..00000000 --- a/src/util/index.js +++ /dev/null @@ -1,16 +0,0 @@ -'use strict' - -/** - * Converts BufferList messages to Buffers - * @param {*} source - * @returns {AsyncGenerator} - */ -function toBuffer (source) { - return (async function * () { - for await (const chunk of source) { - yield Buffer.isBuffer(chunk) ? chunk : chunk.slice() - } - })() -} - -module.exports.toBuffer = toBuffer diff --git a/test/core/ping.node.js b/test/core/ping.node.js new file mode 100644 index 00000000..97c78603 --- /dev/null +++ b/test/core/ping.node.js @@ -0,0 +1,35 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai + +const pTimes = require('p-times') + +const peerUtils = require('../utils/creators/peer') +const baseOptions = require('../utils/base-options') + +describe('ping', () => { + let nodes + + beforeEach(async () => { + nodes = await peerUtils.createPeer({ + number: 2, + config: baseOptions + }) + }) + + it('ping once from peer0 to peer1', async () => { + const latency = await nodes[0].ping(nodes[1].peerInfo) + + expect(latency).to.be.a('Number') + }) + + it('ping several times for getting an average', async () => { + const latencies = await pTimes(5, () => nodes[1].ping(nodes[0].peerInfo)) + + const averageLatency = latencies.reduce((p, c) => p + c, 0) / latencies.length + expect(averageLatency).to.be.a('Number') + }) +})