diff --git a/src/get-peer-info.js b/src/get-peer-info.js index b74caa3e..01a6bc49 100644 --- a/src/get-peer-info.js +++ b/src/get-peer-info.js @@ -56,10 +56,7 @@ function getPeerInfoRemote (peer, libp2p) { try { peerInfo = getPeerInfo(peer, libp2p.peerStore) } catch (err) { - return Promise.reject(errCode( - new Error(`${peer} is not a valid peer type`), - 'ERR_INVALID_PEER_TYPE' - )) + throw errCode(err, 'ERR_INVALID_PEER_TYPE') } // If we don't have an address for the peer, attempt to find it @@ -67,7 +64,7 @@ function getPeerInfoRemote (peer, libp2p) { return libp2p.peerRouting.findPeer(peerInfo.id) } - return Promise.resolve(peerInfo) + return peerInfo } module.exports = { diff --git a/src/index.js b/src/index.js index 5efcc8fe..ed1d9794 100644 --- a/src/index.js +++ b/src/index.js @@ -1,21 +1,13 @@ 'use strict' -const FSM = require('fsm-event') const { EventEmitter } = require('events') const debug = require('debug') const log = debug('libp2p') log.error = debug('libp2p:error') -const errCode = require('err-code') -const promisify = require('promisify-es6') - -const each = require('async/each') const PeerInfo = require('peer-info') const multiaddr = require('multiaddr') -const Switch = require('./switch') -const Ping = require('./ping') -const { emitFirst } = require('./util') const peerRouting = require('./peer-routing') const contentRouting = require('./content-routing') const dht = require('./dht') @@ -34,20 +26,11 @@ const { multicodecs: IDENTIFY_PROTOCOLS } = require('./identify') -const notStarted = (action, state) => { - return errCode( - new Error(`libp2p cannot ${action} when not started; state is ${state}`), - codes.ERR_NODE_NOT_STARTED - ) -} - /** * @fires Libp2p#error Emitted when an error occurs * @fires Libp2p#peer:connect Emitted when a peer is connected to this node * @fires Libp2p#peer:disconnect Emitted when a peer disconnects from this node * @fires Libp2p#peer:discovery Emitted when a peer is discovered - * @fires Libp2p#start Emitted when the node and its services has started - * @fires Libp2p#stop Emitted when the node and its services has stopped */ class Libp2p extends EventEmitter { constructor (_options) { @@ -67,9 +50,6 @@ class Libp2p extends EventEmitter { this.peerStore = new PeerStore() - // create the switch, and listen for errors - this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch) - // Setup the Upgrader this.upgrader = new Upgrader({ localPeer: this.peerInfo.id, @@ -158,63 +138,7 @@ class Libp2p extends EventEmitter { this.contentRouting = contentRouting(this) this.dht = dht(this) - // Mount default protocols - Ping.mount(this._switch) - - this.state = new FSM('STOPPED', { - STOPPED: { - start: 'STARTING', - stop: 'STOPPED', - done: 'STOPPED' - }, - STARTING: { - done: 'STARTED', - abort: 'STOPPED', - stop: 'STOPPING' - }, - STARTED: { - stop: 'STOPPING', - start: 'STARTED' - }, - STOPPING: { - stop: 'STOPPING', - done: 'STOPPED' - } - }) - this.state.on('STARTING', () => { - log('libp2p is starting') - this._onStarting() - }) - this.state.on('STOPPING', () => { - log('libp2p is stopping') - }) - this.state.on('STARTED', () => { - log('libp2p has started') - this.emit('start') - }) - this.state.on('STOPPED', () => { - log('libp2p has stopped') - this.emit('stop') - }) - this.state.on('error', (err) => { - log.error(err) - this.emit('error', err) - }) - - // Once we start, emit and dial any peers we may have already discovered - this.state.on('STARTED', () => { - for (const peerInfo of this.peerStore.peers) { - this.emit('peer:discovery', peerInfo) - this._maybeConnect(peerInfo) - } - }) - this._peerDiscovered = this._peerDiscovered.bind(this) - - // promisify all instance methods - ;['start', 'hangUp', 'ping'].forEach(method => { - this[method] = promisify(this[method], { context: this }) - }) } /** @@ -233,14 +157,23 @@ class Libp2p extends EventEmitter { } /** - * Starts the libp2p node and all sub services + * Starts the libp2p node and all its subsystems * - * @param {function(Error)} callback - * @returns {void} + * @returns {Promise} */ - start (callback = () => {}) { - emitFirst(this, ['error', 'start'], callback) - this.state('start') + async start () { + log('libp2p is starting') + try { + await this._onStarting() + await this._onDidStart() + log('libp2p has started') + } catch (err) { + this.emit('error', err) + log.error('An error occurred starting libp2p', err) + await this.stop() + throw err + } + this._isStarted = true } /** @@ -249,23 +182,22 @@ class Libp2p extends EventEmitter { * @returns {void} */ async stop () { - this.state('stop') + log('libp2p is stopping') try { this.pubsub && await this.pubsub.stop() await this.transportManager.close() - await this._switch.stop() } catch (err) { if (err) { log.error(err) this.emit('error', err) } } - this.state('done') + log('libp2p has stopped') } isStarted () { - return this.state ? this.state._state === 'STARTED' : false + return this._isStarted } /** @@ -319,36 +251,30 @@ class Libp2p extends EventEmitter { } /** - * Disconnects from the given peer + * Disconnects all connections to the given `peer` * - * @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping - * @param {function(Error)} callback - * @returns {void} + * @param {PeerId} peer The PeerId to close connections to + * @returns {Promise} */ - hangUp (peer, callback) { - getPeerInfoRemote(peer, this) - .then(peerInfo => { - this._switch.hangUp(peerInfo, callback) - }, callback) + hangUp (peer) { + return Promise.all( + this.registrar.connections.get(peer.toB58String()).map(connection => { + return connection.close() + }) + ) } - /** - * Pings the provided peer - * - * @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping - * @param {function(Error, Ping)} callback - * @returns {void} - */ - ping (peer, callback) { - if (!this.isStarted()) { - return callback(notStarted('ping', this.state._state)) - } - - getPeerInfoRemote(peer, this) - .then(peerInfo => { - callback(null, new Ping(this._switch, peerInfo)) - }, callback) - } + // TODO: Update ping + // /** + // * Pings the provided peer + // * + // * @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to ping + // * @returns {Promise} + // */ + // ping (peer) { + // const peerInfo = await getPeerInfoRemote(peer, this) + // return new Ping(this._switch, peerInfo) + // } /** * Registers the `handler` for each protocol @@ -379,32 +305,25 @@ class Libp2p extends EventEmitter { } async _onStarting () { - if (!this._modules.transport) { - this.emit('error', new Error('no transports were present')) - return this.state('abort') - } - const multiaddrs = this.peerInfo.multiaddrs.toArray() - // Start parallel tasks - const tasks = [ - this.transportManager.listen(multiaddrs) - ] + await this.transportManager.listen(multiaddrs) if (this._config.pubsub.enabled) { this.pubsub && this.pubsub.start() } + } - try { - await Promise.all(tasks) - } catch (err) { - log.error(err) - this.emit('error', err) - return this.state('stop') + /** + * Called when libp2p has started and before it returns + * @private + */ + _onDidStart () { + // Once we start, emit and dial any peers we may have already discovered + for (const peerInfo of this.peerStore.peers.values()) { + this.emit('peer:discovery', peerInfo) + this._maybeConnect(peerInfo) } - - // libp2p has started - this.state('done') } /** @@ -435,15 +354,18 @@ class Libp2p extends EventEmitter { * @private * @param {PeerInfo} peerInfo */ - _maybeConnect (peerInfo) { - // If auto dialing is on, check if we should dial - if (this._config.peerDiscovery.autoDial === true && !peerInfo.isConnected()) { + async _maybeConnect (peerInfo) { + // If auto dialing is on and we have no connection to the peer, check if we should dial + if (this._config.peerDiscovery.autoDial === true && !this.registrar.connections.get(peerInfo)) { const minPeers = this._options.connectionManager.minPeers || 0 - if (minPeers > Object.keys(this._switch.connection.connections).length) { + // TODO: This does not account for multiple connections to a peer + if (minPeers > this.registrar.connections.size) { log('connecting to discovered peer') - this._switch.dialer.connect(peerInfo, (err) => { - err && log.error('could not connect to discovered peer', err) - }) + try { + await this.dialer.connectToPeer(peerInfo) + } catch (err) { + log.error('could not connect to discovered peer', err) + } } } } @@ -452,9 +374,9 @@ class Libp2p extends EventEmitter { * Initializes and starts peer discovery services * * @private - * @param {function(Error)} callback + * @returns {Promise} */ - _setupPeerDiscovery (callback) { + _setupPeerDiscovery () { for (const DiscoveryService of this._modules.peerDiscovery) { let config = { enabled: true // on by default @@ -480,9 +402,7 @@ class Libp2p extends EventEmitter { } } - each(this._discovery, (d, cb) => { - d.start(cb) - }, callback) + return this._discovery.map(d => d.start()) } } diff --git a/src/transport-manager.js b/src/transport-manager.js index 136dc8c8..9fe86349 100644 --- a/src/transport-manager.js +++ b/src/transport-manager.js @@ -119,6 +119,12 @@ class TransportManager { * @param {Multiaddr[]} addrs */ async listen (addrs) { + if (addrs.length === 0) { + log('no addresses were provided for listening, this node is dial only') + return + } + + const couldNotListen = [] for (const [key, transport] of this._transports.entries()) { const supportedAddrs = transport.filter(addrs) const tasks = [] @@ -133,6 +139,12 @@ class TransportManager { tasks.push(listener.listen(addr)) } + // Keep track of transports we had no addresses for + if (tasks.length === 0) { + couldNotListen.push(key) + continue + } + const results = await pSettle(tasks) // If we are listening on at least 1 address, succeed. // TODO: we should look at adding a retry (`p-retry`) here to better support @@ -143,6 +155,12 @@ class TransportManager { throw errCode(new Error(`Transport (${key}) could not listen on any available address`), codes.ERR_NO_VALID_ADDRESSES) } } + + // If no transports were able to listen, throw an error. This likely + // means we were given addresses we do not have transports for + if (couldNotListen.length === this._transports.size) { + throw errCode(new Error(`no valid addresses were provided for transports [${couldNotListen}]`), codes.ERR_NO_VALID_ADDRESSES) + } } /** diff --git a/src/util/index.js b/src/util/index.js index 0fdd80ca..bca13a45 100644 --- a/src/util/index.js +++ b/src/util/index.js @@ -1,34 +1,4 @@ 'use strict' -const once = require('once') - -/** - * Registers `handler` to each event in `events`. The `handler` - * will only be called for the first event fired, at which point - * the `handler` will be removed as a listener. - * - * Ensures `handler` is only called once. - * - * @example - * // will call `callback` when `start` or `error` is emitted by `this` - * emitFirst(this, ['error', 'start'], callback) - * - * @private - * @param {EventEmitter} emitter The emitter to listen on - * @param {Array} events The events to listen for - * @param {function(*)} handler The handler to call when an event is triggered - * @returns {void} - */ -function emitFirst (emitter, events, handler) { - handler = once(handler) - events.forEach((e) => { - emitter.once(e, (...args) => { - events.forEach((ev) => { - emitter.removeListener(ev, handler) - }) - handler.apply(emitter, args) - }) - }) -} /** * Converts BufferList messages to Buffers @@ -43,5 +13,4 @@ function toBuffer (source) { })() } -module.exports.emitFirst = emitFirst module.exports.toBuffer = toBuffer diff --git a/test/dialing/direct.node.js b/test/dialing/direct.node.js index d8acbcac..73eee1ba 100644 --- a/test/dialing/direct.node.js +++ b/test/dialing/direct.node.js @@ -231,6 +231,23 @@ describe('Dialing (direct, TCP)', () => { expect(libp2p.dialer.connectToMultiaddr.callCount).to.equal(1) }) + it('should be able to use hangup to close connections', async () => { + libp2p = new Libp2p({ + peerInfo, + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } + }) + + const connection = await libp2p.dial(remoteAddr) + expect(connection).to.exist() + expect(connection.stat.timeline.close).to.not.exist() + await libp2p.hangUp(connection.remotePeer) + expect(connection.stat.timeline.close).to.exist() + }) + it('should use the protectors when provided for connecting', async () => { const protector = new Protector(swarmKeyBuffer) libp2p = new Libp2p({ diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js index 1aca5987..8a16fbab 100644 --- a/test/dialing/direct.spec.js +++ b/test/dialing/direct.spec.js @@ -210,7 +210,7 @@ describe('Dialing (direct, WebSockets)', () => { sinon.spy(libp2p.dialer, 'connectToMultiaddr') - const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) + const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() const { stream, protocol } = await connection.newStream('/echo/1.0.0') expect(stream).to.exist() @@ -241,5 +241,22 @@ describe('Dialing (direct, WebSockets)', () => { expect(libp2p.peerStore.update.callCount).to.equal(1) }) + + it('should be able to use hangup to close connections', async () => { + libp2p = new Libp2p({ + peerInfo, + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } + }) + + const connection = await libp2p.dial(remoteAddr) + expect(connection).to.exist() + expect(connection.stat.timeline.close).to.not.exist() + await libp2p.hangUp(connection.remotePeer) + expect(connection.stat.timeline.close).to.exist() + }) }) }) diff --git a/test/peer-discovery/index.spec.js b/test/peer-discovery/index.spec.js new file mode 100644 index 00000000..2b64ad30 --- /dev/null +++ b/test/peer-discovery/index.spec.js @@ -0,0 +1,45 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai +const sinon = require('sinon') +const defer = require('p-defer') + +const Libp2p = require('../../src') +const baseOptions = require('../utils/base-options.browser') +const { createPeerInfoFromFixture } = require('../utils/creators/peer') + +describe('peer discovery', () => { + let peerInfo + let remotePeerInfo + let libp2p + + before(async () => { + [peerInfo, remotePeerInfo] = await createPeerInfoFromFixture(2) + }) + + afterEach(async () => { + libp2p && await libp2p.stop() + }) + + it('should dial know peers on startup', async () => { + libp2p = new Libp2p({ + ...baseOptions, + peerInfo + }) + libp2p.peerStore.add(remotePeerInfo) + const deferred = defer() + sinon.stub(libp2p.dialer, 'connectToPeer').callsFake((remotePeerInfo) => { + expect(remotePeerInfo).to.equal(remotePeerInfo) + deferred.resolve() + }) + const spy = sinon.spy() + libp2p.on('peer:discovery', spy) + + libp2p.start() + await deferred.promise + expect(spy.getCall(0).args).to.eql([remotePeerInfo]) + }) +})