mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-25 02:22:14 +00:00
refactor: ping (#505)
* refactor: ping * chore: ping is now a function * chore: address review
This commit is contained in:
parent
71f46bf4a6
commit
4f7586886c
26
doc/API.md
26
doc/API.md
@ -10,6 +10,7 @@
|
||||
* [`hangUp`](#hangUp)
|
||||
* [`handle`](#handle)
|
||||
* [`unhandle`](#unhandle)
|
||||
* [`ping`](#ping)
|
||||
* [`peerRouting.findPeer`](#peerRouting.findPeer)
|
||||
* [`contentRouting.findProviders`](#contentRouting.findProviders)
|
||||
* [`contentRouting.provide`](#contentRouting.provide)
|
||||
@ -307,6 +308,31 @@ Unregisters all handlers with the given protocols
|
||||
libp2p.unhandle(['/echo/1.0.0'])
|
||||
```
|
||||
|
||||
### ping
|
||||
|
||||
Pings a given peer and get the operation's latency.
|
||||
|
||||
`libp2p.ping(peer)`
|
||||
|
||||
#### Parameters
|
||||
|
||||
| Name | Type | Description |
|
||||
|------|------|-------------|
|
||||
| peer | `PeerInfo|PeerId|Multiaddr|string` | peer to ping |
|
||||
|
||||
#### Returns
|
||||
|
||||
| Type | Description |
|
||||
|------|-------------|
|
||||
| `Promise<number>` | Latency of the operation in ms |
|
||||
|
||||
#### Example
|
||||
|
||||
```js
|
||||
// ...
|
||||
const latency = await libp2p.ping(otherPeerId)
|
||||
```
|
||||
|
||||
### peerRouting.findPeer
|
||||
|
||||
Iterates over all peer routers in series to find the given peer. If the DHT is enabled, it will be tried first.
|
||||
|
@ -50,6 +50,7 @@
|
||||
"err-code": "^1.1.2",
|
||||
"hashlru": "^2.3.0",
|
||||
"it-all": "^1.0.1",
|
||||
"it-buffer": "^0.1.1",
|
||||
"it-handshake": "^1.0.1",
|
||||
"it-length-prefixed": "^3.0.0",
|
||||
"it-pipe": "^1.1.0",
|
||||
|
@ -9,7 +9,7 @@ const { collect, take } = require('streaming-iterables')
|
||||
const PeerInfo = require('peer-info')
|
||||
const PeerId = require('peer-id')
|
||||
const multiaddr = require('multiaddr')
|
||||
const { toBuffer } = require('../util')
|
||||
const { toBuffer } = require('it-buffer')
|
||||
|
||||
const Message = require('./message')
|
||||
|
||||
|
27
src/index.js
27
src/index.js
@ -21,6 +21,7 @@ const TransportManager = require('./transport-manager')
|
||||
const Upgrader = require('./upgrader')
|
||||
const PeerStore = require('./peer-store')
|
||||
const Registrar = require('./registrar')
|
||||
const ping = require('./ping')
|
||||
const {
|
||||
IdentifyService,
|
||||
multicodecs: IDENTIFY_PROTOCOLS
|
||||
@ -148,6 +149,9 @@ class Libp2p extends EventEmitter {
|
||||
this.peerRouting = peerRouting(this)
|
||||
this.contentRouting = contentRouting(this)
|
||||
|
||||
// Mount default protocols
|
||||
ping.mount(this)
|
||||
|
||||
this._onDiscoveryPeer = this._onDiscoveryPeer.bind(this)
|
||||
}
|
||||
|
||||
@ -203,6 +207,8 @@ class Libp2p extends EventEmitter {
|
||||
|
||||
await this.transportManager.close()
|
||||
await this.registrar.close()
|
||||
|
||||
ping.unmount(this)
|
||||
} catch (err) {
|
||||
if (err) {
|
||||
log.error(err)
|
||||
@ -280,17 +286,16 @@ class Libp2p extends EventEmitter {
|
||||
)
|
||||
}
|
||||
|
||||
// TODO: Update ping
|
||||
// /**
|
||||
// * Pings the provided peer
|
||||
// *
|
||||
// * @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping
|
||||
// * @returns {Promise<Ping>}
|
||||
// */
|
||||
// ping (peer) {
|
||||
// const peerInfo = await getPeerInfoRemote(peer, this)
|
||||
// return new Ping(this._switch, peerInfo)
|
||||
// }
|
||||
/**
|
||||
* Pings the given peer
|
||||
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping
|
||||
* @returns {Promise<number>}
|
||||
*/
|
||||
async ping (peer) {
|
||||
const peerInfo = await getPeerInfo(peer, this.peerStore)
|
||||
|
||||
return ping(this, peerInfo)
|
||||
}
|
||||
|
||||
/**
|
||||
* Registers the `handler` for each protocol
|
||||
|
@ -8,16 +8,11 @@ libp2p-ping JavaScript Implementation
|
||||
## Usage
|
||||
|
||||
```javascript
|
||||
var Ping = require('libp2p-ping')
|
||||
var Ping = require('libp2p/src/ping')
|
||||
|
||||
Ping.mount(swarm) // Enable this peer to echo Ping requests
|
||||
Ping.mount(libp2p) // Enable this peer to echo Ping requests
|
||||
|
||||
var p = new Ping(swarm, peerDst) // Ping peerDst, peerDst must be a peer-info object
|
||||
const latency = await Ping(libp2p, peerDst)
|
||||
|
||||
p.on('ping', function (time) {
|
||||
console.log(time + 'ms')
|
||||
p.stop() // stop sending pings
|
||||
})
|
||||
|
||||
p.start()
|
||||
Ping.unmount(libp2p)
|
||||
```
|
||||
|
@ -1,50 +0,0 @@
|
||||
'use strict'
|
||||
|
||||
const pull = require('pull-stream/pull')
|
||||
const handshake = require('pull-handshake')
|
||||
const constants = require('./constants')
|
||||
const PROTOCOL = constants.PROTOCOL
|
||||
const PING_LENGTH = constants.PING_LENGTH
|
||||
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p-ping')
|
||||
log.error = debug('libp2p-ping:error')
|
||||
|
||||
function mount (swarm) {
|
||||
swarm.handle(PROTOCOL, (protocol, conn) => {
|
||||
const stream = handshake({ timeout: 0 })
|
||||
const shake = stream.handshake
|
||||
|
||||
// receive and echo back
|
||||
function next () {
|
||||
shake.read(PING_LENGTH, (err, buf) => {
|
||||
if (err === true) {
|
||||
// stream closed
|
||||
return
|
||||
}
|
||||
if (err) {
|
||||
return log.error(err)
|
||||
}
|
||||
|
||||
shake.write(buf)
|
||||
return next()
|
||||
})
|
||||
}
|
||||
|
||||
pull(
|
||||
conn,
|
||||
stream,
|
||||
conn
|
||||
)
|
||||
|
||||
next()
|
||||
})
|
||||
}
|
||||
|
||||
function unmount (swarm) {
|
||||
swarm.unhandle(PROTOCOL)
|
||||
}
|
||||
|
||||
exports = module.exports
|
||||
exports.mount = mount
|
||||
exports.unmount = unmount
|
@ -1,7 +1,62 @@
|
||||
'use strict'
|
||||
|
||||
const handler = require('./handler')
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p-ping')
|
||||
log.error = debug('libp2p-ping:error')
|
||||
const errCode = require('err-code')
|
||||
|
||||
exports = module.exports = require('./ping')
|
||||
exports.mount = handler.mount
|
||||
exports.unmount = handler.unmount
|
||||
const crypto = require('libp2p-crypto')
|
||||
const pipe = require('it-pipe')
|
||||
const { toBuffer } = require('it-buffer')
|
||||
const { collect } = require('streaming-iterables')
|
||||
|
||||
const { PROTOCOL, PING_LENGTH } = require('./constants')
|
||||
|
||||
/**
|
||||
* Ping a given peer and wait for its response, getting the operation latency.
|
||||
* @param {Libp2p} node
|
||||
* @param {PeerInfo} peer
|
||||
* @returns {Promise<Number>}
|
||||
*/
|
||||
async function ping (node, peer) {
|
||||
log('dialing %s to %s', PROTOCOL, peer.id.toB58String())
|
||||
|
||||
const { stream } = await node.dialProtocol(peer, PROTOCOL)
|
||||
|
||||
const start = new Date()
|
||||
const data = crypto.randomBytes(PING_LENGTH)
|
||||
|
||||
const [result] = await pipe(
|
||||
[data],
|
||||
stream,
|
||||
toBuffer,
|
||||
collect
|
||||
)
|
||||
const end = Date.now()
|
||||
|
||||
if (!data.equals(result)) {
|
||||
throw errCode(new Error('Received wrong ping ack'), 'ERR_WRONG_PING_ACK')
|
||||
}
|
||||
|
||||
return end - start
|
||||
}
|
||||
|
||||
/**
|
||||
* Subscribe ping protocol handler.
|
||||
* @param {Libp2p} node
|
||||
*/
|
||||
function mount (node) {
|
||||
node.handle(PROTOCOL, ({ stream }) => pipe(stream, stream))
|
||||
}
|
||||
|
||||
/**
|
||||
* Unsubscribe ping protocol handler.
|
||||
* @param {Libp2p} node
|
||||
*/
|
||||
function unmount (node) {
|
||||
node.unhandle(PROTOCOL)
|
||||
}
|
||||
|
||||
exports = module.exports = ping
|
||||
exports.mount = mount
|
||||
exports.unmount = unmount
|
||||
|
@ -1,83 +0,0 @@
|
||||
'use strict'
|
||||
|
||||
const EventEmitter = require('events').EventEmitter
|
||||
const pull = require('pull-stream/pull')
|
||||
const empty = require('pull-stream/sources/empty')
|
||||
const handshake = require('pull-handshake')
|
||||
const constants = require('./constants')
|
||||
const util = require('./util')
|
||||
const rnd = util.rnd
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p-ping')
|
||||
log.error = debug('libp2p-ping:error')
|
||||
|
||||
const PROTOCOL = constants.PROTOCOL
|
||||
const PING_LENGTH = constants.PING_LENGTH
|
||||
|
||||
class Ping extends EventEmitter {
|
||||
constructor (swarm, peer) {
|
||||
super()
|
||||
|
||||
this._stopped = false
|
||||
this.peer = peer
|
||||
this.swarm = swarm
|
||||
}
|
||||
|
||||
start () {
|
||||
log('dialing %s to %s', PROTOCOL, this.peer.id.toB58String())
|
||||
|
||||
this.swarm.dial(this.peer, PROTOCOL, (err, conn) => {
|
||||
if (err) {
|
||||
return this.emit('error', err)
|
||||
}
|
||||
|
||||
const stream = handshake({ timeout: 0 })
|
||||
this.shake = stream.handshake
|
||||
|
||||
pull(
|
||||
stream,
|
||||
conn,
|
||||
stream
|
||||
)
|
||||
|
||||
// write and wait to see ping back
|
||||
const self = this
|
||||
function next () {
|
||||
const start = new Date()
|
||||
const buf = rnd(PING_LENGTH)
|
||||
self.shake.write(buf)
|
||||
self.shake.read(PING_LENGTH, (err, bufBack) => {
|
||||
const end = new Date()
|
||||
if (err || !buf.equals(bufBack)) {
|
||||
const err = new Error('Received wrong ping ack')
|
||||
return self.emit('error', err)
|
||||
}
|
||||
|
||||
self.emit('ping', end - start)
|
||||
|
||||
if (self._stopped) {
|
||||
return
|
||||
}
|
||||
next()
|
||||
})
|
||||
}
|
||||
|
||||
next()
|
||||
})
|
||||
}
|
||||
|
||||
stop () {
|
||||
if (this._stopped || !this.shake) {
|
||||
return
|
||||
}
|
||||
|
||||
this._stopped = true
|
||||
|
||||
pull(
|
||||
empty(),
|
||||
this.shake.rest()
|
||||
)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = Ping
|
@ -1,16 +0,0 @@
|
||||
'use strict'
|
||||
|
||||
/**
|
||||
* Converts BufferList messages to Buffers
|
||||
* @param {*} source
|
||||
* @returns {AsyncGenerator}
|
||||
*/
|
||||
function toBuffer (source) {
|
||||
return (async function * () {
|
||||
for await (const chunk of source) {
|
||||
yield Buffer.isBuffer(chunk) ? chunk : chunk.slice()
|
||||
}
|
||||
})()
|
||||
}
|
||||
|
||||
module.exports.toBuffer = toBuffer
|
35
test/core/ping.node.js
Normal file
35
test/core/ping.node.js
Normal file
@ -0,0 +1,35 @@
|
||||
'use strict'
|
||||
/* eslint-env mocha */
|
||||
|
||||
const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
const { expect } = chai
|
||||
|
||||
const pTimes = require('p-times')
|
||||
|
||||
const peerUtils = require('../utils/creators/peer')
|
||||
const baseOptions = require('../utils/base-options')
|
||||
|
||||
describe('ping', () => {
|
||||
let nodes
|
||||
|
||||
beforeEach(async () => {
|
||||
nodes = await peerUtils.createPeer({
|
||||
number: 2,
|
||||
config: baseOptions
|
||||
})
|
||||
})
|
||||
|
||||
it('ping once from peer0 to peer1', async () => {
|
||||
const latency = await nodes[0].ping(nodes[1].peerInfo)
|
||||
|
||||
expect(latency).to.be.a('Number')
|
||||
})
|
||||
|
||||
it('ping several times for getting an average', async () => {
|
||||
const latencies = await pTimes(5, () => nodes[1].ping(nodes[0].peerInfo))
|
||||
|
||||
const averageLatency = latencies.reduce((p, c) => p + c, 0) / latencies.length
|
||||
expect(averageLatency).to.be.a('Number')
|
||||
})
|
||||
})
|
Loading…
x
Reference in New Issue
Block a user