mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-14 01:31:22 +00:00
refactor: core async (#478)
* refactor: cleanup core test: auto dial on startup * fix: make hangup work properly * chore: fix lint * chore: apply suggestions from code review Co-Authored-By: Vasco Santos <vasco.santos@moxy.studio>
This commit is contained in:
@ -56,10 +56,7 @@ function getPeerInfoRemote (peer, libp2p) {
|
||||
try {
|
||||
peerInfo = getPeerInfo(peer, libp2p.peerStore)
|
||||
} catch (err) {
|
||||
return Promise.reject(errCode(
|
||||
new Error(`${peer} is not a valid peer type`),
|
||||
'ERR_INVALID_PEER_TYPE'
|
||||
))
|
||||
throw errCode(err, 'ERR_INVALID_PEER_TYPE')
|
||||
}
|
||||
|
||||
// If we don't have an address for the peer, attempt to find it
|
||||
@ -67,7 +64,7 @@ function getPeerInfoRemote (peer, libp2p) {
|
||||
return libp2p.peerRouting.findPeer(peerInfo.id)
|
||||
}
|
||||
|
||||
return Promise.resolve(peerInfo)
|
||||
return peerInfo
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
|
204
src/index.js
204
src/index.js
@ -1,21 +1,13 @@
|
||||
'use strict'
|
||||
|
||||
const FSM = require('fsm-event')
|
||||
const { EventEmitter } = require('events')
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p')
|
||||
log.error = debug('libp2p:error')
|
||||
const errCode = require('err-code')
|
||||
const promisify = require('promisify-es6')
|
||||
|
||||
const each = require('async/each')
|
||||
|
||||
const PeerInfo = require('peer-info')
|
||||
const multiaddr = require('multiaddr')
|
||||
const Switch = require('./switch')
|
||||
const Ping = require('./ping')
|
||||
|
||||
const { emitFirst } = require('./util')
|
||||
const peerRouting = require('./peer-routing')
|
||||
const contentRouting = require('./content-routing')
|
||||
const dht = require('./dht')
|
||||
@ -34,20 +26,11 @@ const {
|
||||
multicodecs: IDENTIFY_PROTOCOLS
|
||||
} = require('./identify')
|
||||
|
||||
const notStarted = (action, state) => {
|
||||
return errCode(
|
||||
new Error(`libp2p cannot ${action} when not started; state is ${state}`),
|
||||
codes.ERR_NODE_NOT_STARTED
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* @fires Libp2p#error Emitted when an error occurs
|
||||
* @fires Libp2p#peer:connect Emitted when a peer is connected to this node
|
||||
* @fires Libp2p#peer:disconnect Emitted when a peer disconnects from this node
|
||||
* @fires Libp2p#peer:discovery Emitted when a peer is discovered
|
||||
* @fires Libp2p#start Emitted when the node and its services has started
|
||||
* @fires Libp2p#stop Emitted when the node and its services has stopped
|
||||
*/
|
||||
class Libp2p extends EventEmitter {
|
||||
constructor (_options) {
|
||||
@ -67,9 +50,6 @@ class Libp2p extends EventEmitter {
|
||||
|
||||
this.peerStore = new PeerStore()
|
||||
|
||||
// create the switch, and listen for errors
|
||||
this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch)
|
||||
|
||||
// Setup the Upgrader
|
||||
this.upgrader = new Upgrader({
|
||||
localPeer: this.peerInfo.id,
|
||||
@ -158,63 +138,7 @@ class Libp2p extends EventEmitter {
|
||||
this.contentRouting = contentRouting(this)
|
||||
this.dht = dht(this)
|
||||
|
||||
// Mount default protocols
|
||||
Ping.mount(this._switch)
|
||||
|
||||
this.state = new FSM('STOPPED', {
|
||||
STOPPED: {
|
||||
start: 'STARTING',
|
||||
stop: 'STOPPED',
|
||||
done: 'STOPPED'
|
||||
},
|
||||
STARTING: {
|
||||
done: 'STARTED',
|
||||
abort: 'STOPPED',
|
||||
stop: 'STOPPING'
|
||||
},
|
||||
STARTED: {
|
||||
stop: 'STOPPING',
|
||||
start: 'STARTED'
|
||||
},
|
||||
STOPPING: {
|
||||
stop: 'STOPPING',
|
||||
done: 'STOPPED'
|
||||
}
|
||||
})
|
||||
this.state.on('STARTING', () => {
|
||||
log('libp2p is starting')
|
||||
this._onStarting()
|
||||
})
|
||||
this.state.on('STOPPING', () => {
|
||||
log('libp2p is stopping')
|
||||
})
|
||||
this.state.on('STARTED', () => {
|
||||
log('libp2p has started')
|
||||
this.emit('start')
|
||||
})
|
||||
this.state.on('STOPPED', () => {
|
||||
log('libp2p has stopped')
|
||||
this.emit('stop')
|
||||
})
|
||||
this.state.on('error', (err) => {
|
||||
log.error(err)
|
||||
this.emit('error', err)
|
||||
})
|
||||
|
||||
// Once we start, emit and dial any peers we may have already discovered
|
||||
this.state.on('STARTED', () => {
|
||||
for (const peerInfo of this.peerStore.peers) {
|
||||
this.emit('peer:discovery', peerInfo)
|
||||
this._maybeConnect(peerInfo)
|
||||
}
|
||||
})
|
||||
|
||||
this._peerDiscovered = this._peerDiscovered.bind(this)
|
||||
|
||||
// promisify all instance methods
|
||||
;['start', 'hangUp', 'ping'].forEach(method => {
|
||||
this[method] = promisify(this[method], { context: this })
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
@ -233,14 +157,23 @@ class Libp2p extends EventEmitter {
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts the libp2p node and all sub services
|
||||
* Starts the libp2p node and all its subsystems
|
||||
*
|
||||
* @param {function(Error)} callback
|
||||
* @returns {void}
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
start (callback = () => {}) {
|
||||
emitFirst(this, ['error', 'start'], callback)
|
||||
this.state('start')
|
||||
async start () {
|
||||
log('libp2p is starting')
|
||||
try {
|
||||
await this._onStarting()
|
||||
await this._onDidStart()
|
||||
log('libp2p has started')
|
||||
} catch (err) {
|
||||
this.emit('error', err)
|
||||
log.error('An error occurred starting libp2p', err)
|
||||
await this.stop()
|
||||
throw err
|
||||
}
|
||||
this._isStarted = true
|
||||
}
|
||||
|
||||
/**
|
||||
@ -249,23 +182,22 @@ class Libp2p extends EventEmitter {
|
||||
* @returns {void}
|
||||
*/
|
||||
async stop () {
|
||||
this.state('stop')
|
||||
log('libp2p is stopping')
|
||||
|
||||
try {
|
||||
this.pubsub && await this.pubsub.stop()
|
||||
await this.transportManager.close()
|
||||
await this._switch.stop()
|
||||
} catch (err) {
|
||||
if (err) {
|
||||
log.error(err)
|
||||
this.emit('error', err)
|
||||
}
|
||||
}
|
||||
this.state('done')
|
||||
log('libp2p has stopped')
|
||||
}
|
||||
|
||||
isStarted () {
|
||||
return this.state ? this.state._state === 'STARTED' : false
|
||||
return this._isStarted
|
||||
}
|
||||
|
||||
/**
|
||||
@ -319,36 +251,30 @@ class Libp2p extends EventEmitter {
|
||||
}
|
||||
|
||||
/**
|
||||
* Disconnects from the given peer
|
||||
* Disconnects all connections to the given `peer`
|
||||
*
|
||||
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping
|
||||
* @param {function(Error)} callback
|
||||
* @returns {void}
|
||||
* @param {PeerId} peer The PeerId to close connections to
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
hangUp (peer, callback) {
|
||||
getPeerInfoRemote(peer, this)
|
||||
.then(peerInfo => {
|
||||
this._switch.hangUp(peerInfo, callback)
|
||||
}, callback)
|
||||
hangUp (peer) {
|
||||
return Promise.all(
|
||||
this.registrar.connections.get(peer.toB58String()).map(connection => {
|
||||
return connection.close()
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Pings the provided peer
|
||||
*
|
||||
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping
|
||||
* @param {function(Error, Ping)} callback
|
||||
* @returns {void}
|
||||
*/
|
||||
ping (peer, callback) {
|
||||
if (!this.isStarted()) {
|
||||
return callback(notStarted('ping', this.state._state))
|
||||
}
|
||||
|
||||
getPeerInfoRemote(peer, this)
|
||||
.then(peerInfo => {
|
||||
callback(null, new Ping(this._switch, peerInfo))
|
||||
}, callback)
|
||||
}
|
||||
// TODO: Update ping
|
||||
// /**
|
||||
// * Pings the provided peer
|
||||
// *
|
||||
// * @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping
|
||||
// * @returns {Promise<Ping>}
|
||||
// */
|
||||
// ping (peer) {
|
||||
// const peerInfo = await getPeerInfoRemote(peer, this)
|
||||
// return new Ping(this._switch, peerInfo)
|
||||
// }
|
||||
|
||||
/**
|
||||
* Registers the `handler` for each protocol
|
||||
@ -379,32 +305,25 @@ class Libp2p extends EventEmitter {
|
||||
}
|
||||
|
||||
async _onStarting () {
|
||||
if (!this._modules.transport) {
|
||||
this.emit('error', new Error('no transports were present'))
|
||||
return this.state('abort')
|
||||
}
|
||||
|
||||
const multiaddrs = this.peerInfo.multiaddrs.toArray()
|
||||
|
||||
// Start parallel tasks
|
||||
const tasks = [
|
||||
this.transportManager.listen(multiaddrs)
|
||||
]
|
||||
await this.transportManager.listen(multiaddrs)
|
||||
|
||||
if (this._config.pubsub.enabled) {
|
||||
this.pubsub && this.pubsub.start()
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
await Promise.all(tasks)
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
this.emit('error', err)
|
||||
return this.state('stop')
|
||||
/**
|
||||
* Called when libp2p has started and before it returns
|
||||
* @private
|
||||
*/
|
||||
_onDidStart () {
|
||||
// Once we start, emit and dial any peers we may have already discovered
|
||||
for (const peerInfo of this.peerStore.peers.values()) {
|
||||
this.emit('peer:discovery', peerInfo)
|
||||
this._maybeConnect(peerInfo)
|
||||
}
|
||||
|
||||
// libp2p has started
|
||||
this.state('done')
|
||||
}
|
||||
|
||||
/**
|
||||
@ -435,15 +354,18 @@ class Libp2p extends EventEmitter {
|
||||
* @private
|
||||
* @param {PeerInfo} peerInfo
|
||||
*/
|
||||
_maybeConnect (peerInfo) {
|
||||
// If auto dialing is on, check if we should dial
|
||||
if (this._config.peerDiscovery.autoDial === true && !peerInfo.isConnected()) {
|
||||
async _maybeConnect (peerInfo) {
|
||||
// If auto dialing is on and we have no connection to the peer, check if we should dial
|
||||
if (this._config.peerDiscovery.autoDial === true && !this.registrar.connections.get(peerInfo)) {
|
||||
const minPeers = this._options.connectionManager.minPeers || 0
|
||||
if (minPeers > Object.keys(this._switch.connection.connections).length) {
|
||||
// TODO: This does not account for multiple connections to a peer
|
||||
if (minPeers > this.registrar.connections.size) {
|
||||
log('connecting to discovered peer')
|
||||
this._switch.dialer.connect(peerInfo, (err) => {
|
||||
err && log.error('could not connect to discovered peer', err)
|
||||
})
|
||||
try {
|
||||
await this.dialer.connectToPeer(peerInfo)
|
||||
} catch (err) {
|
||||
log.error('could not connect to discovered peer', err)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -452,9 +374,9 @@ class Libp2p extends EventEmitter {
|
||||
* Initializes and starts peer discovery services
|
||||
*
|
||||
* @private
|
||||
* @param {function(Error)} callback
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
_setupPeerDiscovery (callback) {
|
||||
_setupPeerDiscovery () {
|
||||
for (const DiscoveryService of this._modules.peerDiscovery) {
|
||||
let config = {
|
||||
enabled: true // on by default
|
||||
@ -480,9 +402,7 @@ class Libp2p extends EventEmitter {
|
||||
}
|
||||
}
|
||||
|
||||
each(this._discovery, (d, cb) => {
|
||||
d.start(cb)
|
||||
}, callback)
|
||||
return this._discovery.map(d => d.start())
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -119,6 +119,12 @@ class TransportManager {
|
||||
* @param {Multiaddr[]} addrs
|
||||
*/
|
||||
async listen (addrs) {
|
||||
if (addrs.length === 0) {
|
||||
log('no addresses were provided for listening, this node is dial only')
|
||||
return
|
||||
}
|
||||
|
||||
const couldNotListen = []
|
||||
for (const [key, transport] of this._transports.entries()) {
|
||||
const supportedAddrs = transport.filter(addrs)
|
||||
const tasks = []
|
||||
@ -133,6 +139,12 @@ class TransportManager {
|
||||
tasks.push(listener.listen(addr))
|
||||
}
|
||||
|
||||
// Keep track of transports we had no addresses for
|
||||
if (tasks.length === 0) {
|
||||
couldNotListen.push(key)
|
||||
continue
|
||||
}
|
||||
|
||||
const results = await pSettle(tasks)
|
||||
// If we are listening on at least 1 address, succeed.
|
||||
// TODO: we should look at adding a retry (`p-retry`) here to better support
|
||||
@ -143,6 +155,12 @@ class TransportManager {
|
||||
throw errCode(new Error(`Transport (${key}) could not listen on any available address`), codes.ERR_NO_VALID_ADDRESSES)
|
||||
}
|
||||
}
|
||||
|
||||
// If no transports were able to listen, throw an error. This likely
|
||||
// means we were given addresses we do not have transports for
|
||||
if (couldNotListen.length === this._transports.size) {
|
||||
throw errCode(new Error(`no valid addresses were provided for transports [${couldNotListen}]`), codes.ERR_NO_VALID_ADDRESSES)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -1,34 +1,4 @@
|
||||
'use strict'
|
||||
const once = require('once')
|
||||
|
||||
/**
|
||||
* Registers `handler` to each event in `events`. The `handler`
|
||||
* will only be called for the first event fired, at which point
|
||||
* the `handler` will be removed as a listener.
|
||||
*
|
||||
* Ensures `handler` is only called once.
|
||||
*
|
||||
* @example
|
||||
* // will call `callback` when `start` or `error` is emitted by `this`
|
||||
* emitFirst(this, ['error', 'start'], callback)
|
||||
*
|
||||
* @private
|
||||
* @param {EventEmitter} emitter The emitter to listen on
|
||||
* @param {Array<string>} events The events to listen for
|
||||
* @param {function(*)} handler The handler to call when an event is triggered
|
||||
* @returns {void}
|
||||
*/
|
||||
function emitFirst (emitter, events, handler) {
|
||||
handler = once(handler)
|
||||
events.forEach((e) => {
|
||||
emitter.once(e, (...args) => {
|
||||
events.forEach((ev) => {
|
||||
emitter.removeListener(ev, handler)
|
||||
})
|
||||
handler.apply(emitter, args)
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts BufferList messages to Buffers
|
||||
@ -43,5 +13,4 @@ function toBuffer (source) {
|
||||
})()
|
||||
}
|
||||
|
||||
module.exports.emitFirst = emitFirst
|
||||
module.exports.toBuffer = toBuffer
|
||||
|
@ -231,6 +231,23 @@ describe('Dialing (direct, TCP)', () => {
|
||||
expect(libp2p.dialer.connectToMultiaddr.callCount).to.equal(1)
|
||||
})
|
||||
|
||||
it('should be able to use hangup to close connections', async () => {
|
||||
libp2p = new Libp2p({
|
||||
peerInfo,
|
||||
modules: {
|
||||
transport: [Transport],
|
||||
streamMuxer: [Muxer],
|
||||
connEncryption: [Crypto]
|
||||
}
|
||||
})
|
||||
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
expect(connection.stat.timeline.close).to.not.exist()
|
||||
await libp2p.hangUp(connection.remotePeer)
|
||||
expect(connection.stat.timeline.close).to.exist()
|
||||
})
|
||||
|
||||
it('should use the protectors when provided for connecting', async () => {
|
||||
const protector = new Protector(swarmKeyBuffer)
|
||||
libp2p = new Libp2p({
|
||||
|
@ -210,7 +210,7 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
|
||||
sinon.spy(libp2p.dialer, 'connectToMultiaddr')
|
||||
|
||||
const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr)
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
const { stream, protocol } = await connection.newStream('/echo/1.0.0')
|
||||
expect(stream).to.exist()
|
||||
@ -241,5 +241,22 @@ describe('Dialing (direct, WebSockets)', () => {
|
||||
|
||||
expect(libp2p.peerStore.update.callCount).to.equal(1)
|
||||
})
|
||||
|
||||
it('should be able to use hangup to close connections', async () => {
|
||||
libp2p = new Libp2p({
|
||||
peerInfo,
|
||||
modules: {
|
||||
transport: [Transport],
|
||||
streamMuxer: [Muxer],
|
||||
connEncryption: [Crypto]
|
||||
}
|
||||
})
|
||||
|
||||
const connection = await libp2p.dial(remoteAddr)
|
||||
expect(connection).to.exist()
|
||||
expect(connection.stat.timeline.close).to.not.exist()
|
||||
await libp2p.hangUp(connection.remotePeer)
|
||||
expect(connection.stat.timeline.close).to.exist()
|
||||
})
|
||||
})
|
||||
})
|
||||
|
45
test/peer-discovery/index.spec.js
Normal file
45
test/peer-discovery/index.spec.js
Normal file
@ -0,0 +1,45 @@
|
||||
'use strict'
|
||||
/* eslint-env mocha */
|
||||
|
||||
const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
const { expect } = chai
|
||||
const sinon = require('sinon')
|
||||
const defer = require('p-defer')
|
||||
|
||||
const Libp2p = require('../../src')
|
||||
const baseOptions = require('../utils/base-options.browser')
|
||||
const { createPeerInfoFromFixture } = require('../utils/creators/peer')
|
||||
|
||||
describe('peer discovery', () => {
|
||||
let peerInfo
|
||||
let remotePeerInfo
|
||||
let libp2p
|
||||
|
||||
before(async () => {
|
||||
[peerInfo, remotePeerInfo] = await createPeerInfoFromFixture(2)
|
||||
})
|
||||
|
||||
afterEach(async () => {
|
||||
libp2p && await libp2p.stop()
|
||||
})
|
||||
|
||||
it('should dial know peers on startup', async () => {
|
||||
libp2p = new Libp2p({
|
||||
...baseOptions,
|
||||
peerInfo
|
||||
})
|
||||
libp2p.peerStore.add(remotePeerInfo)
|
||||
const deferred = defer()
|
||||
sinon.stub(libp2p.dialer, 'connectToPeer').callsFake((remotePeerInfo) => {
|
||||
expect(remotePeerInfo).to.equal(remotePeerInfo)
|
||||
deferred.resolve()
|
||||
})
|
||||
const spy = sinon.spy()
|
||||
libp2p.on('peer:discovery', spy)
|
||||
|
||||
libp2p.start()
|
||||
await deferred.promise
|
||||
expect(spy.getCall(0).args).to.eql([remotePeerInfo])
|
||||
})
|
||||
})
|
Reference in New Issue
Block a user