refactor: ping (#505)

* refactor: ping

* chore: ping is now a function

* chore: address review
This commit is contained in:
Vasco Santos 2019-12-10 19:26:54 +01:00 committed by Jacob Heun
parent 7fc1900343
commit 64cbf90e02
No known key found for this signature in database
GPG Key ID: CA5A94C15809879F
10 changed files with 142 additions and 174 deletions

View File

@ -10,6 +10,7 @@
* [`hangUp`](#hangUp)
* [`handle`](#handle)
* [`unhandle`](#unhandle)
* [`ping`](#ping)
* [`peerRouting.findPeer`](#peerRouting.findPeer)
* [`contentRouting.findProviders`](#contentRouting.findProviders)
* [`contentRouting.provide`](#contentRouting.provide)
@ -307,6 +308,31 @@ Unregisters all handlers with the given protocols
libp2p.unhandle(['/echo/1.0.0'])
```
### ping
Pings a given peer and get the operation's latency.
`libp2p.ping(peer)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| peer | `PeerInfo|PeerId|Multiaddr|string` | peer to ping |
#### Returns
| Type | Description |
|------|-------------|
| `Promise<number>` | Latency of the operation in ms |
#### Example
```js
// ...
const latency = await libp2p.ping(otherPeerId)
```
### peerRouting.findPeer
Iterates over all peer routers in series to find the given peer. If the DHT is enabled, it will be tried first.

View File

@ -50,6 +50,7 @@
"err-code": "^1.1.2",
"hashlru": "^2.3.0",
"it-all": "^1.0.1",
"it-buffer": "^0.1.1",
"it-handshake": "^1.0.1",
"it-length-prefixed": "^3.0.0",
"it-pipe": "^1.1.0",

View File

@ -9,7 +9,7 @@ const { collect, take } = require('streaming-iterables')
const PeerInfo = require('peer-info')
const PeerId = require('peer-id')
const multiaddr = require('multiaddr')
const { toBuffer } = require('../util')
const { toBuffer } = require('it-buffer')
const Message = require('./message')

View File

@ -21,6 +21,7 @@ const TransportManager = require('./transport-manager')
const Upgrader = require('./upgrader')
const PeerStore = require('./peer-store')
const Registrar = require('./registrar')
const ping = require('./ping')
const {
IdentifyService,
multicodecs: IDENTIFY_PROTOCOLS
@ -148,6 +149,9 @@ class Libp2p extends EventEmitter {
this.peerRouting = peerRouting(this)
this.contentRouting = contentRouting(this)
// Mount default protocols
ping.mount(this)
this._onDiscoveryPeer = this._onDiscoveryPeer.bind(this)
}
@ -203,6 +207,8 @@ class Libp2p extends EventEmitter {
await this.transportManager.close()
await this.registrar.close()
ping.unmount(this)
} catch (err) {
if (err) {
log.error(err)
@ -280,17 +286,16 @@ class Libp2p extends EventEmitter {
)
}
// TODO: Update ping
// /**
// * Pings the provided peer
// *
// * @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping
// * @returns {Promise<Ping>}
// */
// ping (peer) {
// const peerInfo = await getPeerInfoRemote(peer, this)
// return new Ping(this._switch, peerInfo)
// }
/**
* Pings the given peer
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping
* @returns {Promise<number>}
*/
async ping (peer) {
const peerInfo = await getPeerInfo(peer, this.peerStore)
return ping(this, peerInfo)
}
/**
* Registers the `handler` for each protocol

View File

@ -8,16 +8,11 @@ libp2p-ping JavaScript Implementation
## Usage
```javascript
var Ping = require('libp2p-ping')
var Ping = require('libp2p/src/ping')
Ping.mount(swarm) // Enable this peer to echo Ping requests
Ping.mount(libp2p) // Enable this peer to echo Ping requests
var p = new Ping(swarm, peerDst) // Ping peerDst, peerDst must be a peer-info object
const latency = await Ping(libp2p, peerDst)
p.on('ping', function (time) {
console.log(time + 'ms')
p.stop() // stop sending pings
})
p.start()
Ping.unmount(libp2p)
```

View File

@ -1,50 +0,0 @@
'use strict'
const pull = require('pull-stream/pull')
const handshake = require('pull-handshake')
const constants = require('./constants')
const PROTOCOL = constants.PROTOCOL
const PING_LENGTH = constants.PING_LENGTH
const debug = require('debug')
const log = debug('libp2p-ping')
log.error = debug('libp2p-ping:error')
function mount (swarm) {
swarm.handle(PROTOCOL, (protocol, conn) => {
const stream = handshake({ timeout: 0 })
const shake = stream.handshake
// receive and echo back
function next () {
shake.read(PING_LENGTH, (err, buf) => {
if (err === true) {
// stream closed
return
}
if (err) {
return log.error(err)
}
shake.write(buf)
return next()
})
}
pull(
conn,
stream,
conn
)
next()
})
}
function unmount (swarm) {
swarm.unhandle(PROTOCOL)
}
exports = module.exports
exports.mount = mount
exports.unmount = unmount

View File

@ -1,7 +1,62 @@
'use strict'
const handler = require('./handler')
const debug = require('debug')
const log = debug('libp2p-ping')
log.error = debug('libp2p-ping:error')
const errCode = require('err-code')
exports = module.exports = require('./ping')
exports.mount = handler.mount
exports.unmount = handler.unmount
const crypto = require('libp2p-crypto')
const pipe = require('it-pipe')
const { toBuffer } = require('it-buffer')
const { collect } = require('streaming-iterables')
const { PROTOCOL, PING_LENGTH } = require('./constants')
/**
* Ping a given peer and wait for its response, getting the operation latency.
* @param {Libp2p} node
* @param {PeerInfo} peer
* @returns {Promise<Number>}
*/
async function ping (node, peer) {
log('dialing %s to %s', PROTOCOL, peer.id.toB58String())
const { stream } = await node.dialProtocol(peer, PROTOCOL)
const start = new Date()
const data = crypto.randomBytes(PING_LENGTH)
const [result] = await pipe(
[data],
stream,
toBuffer,
collect
)
const end = Date.now()
if (!data.equals(result)) {
throw errCode(new Error('Received wrong ping ack'), 'ERR_WRONG_PING_ACK')
}
return end - start
}
/**
* Subscribe ping protocol handler.
* @param {Libp2p} node
*/
function mount (node) {
node.handle(PROTOCOL, ({ stream }) => pipe(stream, stream))
}
/**
* Unsubscribe ping protocol handler.
* @param {Libp2p} node
*/
function unmount (node) {
node.unhandle(PROTOCOL)
}
exports = module.exports = ping
exports.mount = mount
exports.unmount = unmount

View File

@ -1,83 +0,0 @@
'use strict'
const EventEmitter = require('events').EventEmitter
const pull = require('pull-stream/pull')
const empty = require('pull-stream/sources/empty')
const handshake = require('pull-handshake')
const constants = require('./constants')
const util = require('./util')
const rnd = util.rnd
const debug = require('debug')
const log = debug('libp2p-ping')
log.error = debug('libp2p-ping:error')
const PROTOCOL = constants.PROTOCOL
const PING_LENGTH = constants.PING_LENGTH
class Ping extends EventEmitter {
constructor (swarm, peer) {
super()
this._stopped = false
this.peer = peer
this.swarm = swarm
}
start () {
log('dialing %s to %s', PROTOCOL, this.peer.id.toB58String())
this.swarm.dial(this.peer, PROTOCOL, (err, conn) => {
if (err) {
return this.emit('error', err)
}
const stream = handshake({ timeout: 0 })
this.shake = stream.handshake
pull(
stream,
conn,
stream
)
// write and wait to see ping back
const self = this
function next () {
const start = new Date()
const buf = rnd(PING_LENGTH)
self.shake.write(buf)
self.shake.read(PING_LENGTH, (err, bufBack) => {
const end = new Date()
if (err || !buf.equals(bufBack)) {
const err = new Error('Received wrong ping ack')
return self.emit('error', err)
}
self.emit('ping', end - start)
if (self._stopped) {
return
}
next()
})
}
next()
})
}
stop () {
if (this._stopped || !this.shake) {
return
}
this._stopped = true
pull(
empty(),
this.shake.rest()
)
}
}
module.exports = Ping

View File

@ -1,16 +0,0 @@
'use strict'
/**
* Converts BufferList messages to Buffers
* @param {*} source
* @returns {AsyncGenerator}
*/
function toBuffer (source) {
return (async function * () {
for await (const chunk of source) {
yield Buffer.isBuffer(chunk) ? chunk : chunk.slice()
}
})()
}
module.exports.toBuffer = toBuffer

35
test/core/ping.node.js Normal file
View File

@ -0,0 +1,35 @@
'use strict'
/* eslint-env mocha */
const chai = require('chai')
chai.use(require('dirty-chai'))
const { expect } = chai
const pTimes = require('p-times')
const peerUtils = require('../utils/creators/peer')
const baseOptions = require('../utils/base-options')
describe('ping', () => {
let nodes
beforeEach(async () => {
nodes = await peerUtils.createPeer({
number: 2,
config: baseOptions
})
})
it('ping once from peer0 to peer1', async () => {
const latency = await nodes[0].ping(nodes[1].peerInfo)
expect(latency).to.be.a('Number')
})
it('ping several times for getting an average', async () => {
const latencies = await pTimes(5, () => nodes[1].ping(nodes[0].peerInfo))
const averageLatency = latencies.reduce((p, c) => p + c, 0) / latencies.length
expect(averageLatency).to.be.a('Number')
})
})