From c7a54f34f7dcd003c2a27465b1ef2ba3a4f4624a Mon Sep 17 00:00:00 2001 From: Jacob Heun Date: Thu, 7 Nov 2019 12:11:50 +0100 Subject: [PATCH] refactor: async identify and identify push (#473) * chore: add missing dep * feat: import from identify push branch https://github.com/libp2p/js-libp2p-identify/tree/feat/identify-push * feat: add the connection to stream handlers * refactor: identify to async/await * chore: fix lint * test: add identify tests * refactor: add identify to the dialer flow * feat: connect identify to the registrar * fix: resolve review feedback * fix: perform identify push when our protocols change --- package.json | 3 +- src/dialer.js | 29 +++ src/errors.js | 3 + src/identify/README.md | 28 +-- src/identify/consts.js | 6 + src/identify/dialer.js | 87 --------- src/identify/index.js | 302 ++++++++++++++++++++++++++++- src/identify/listener.js | 35 ---- src/index.js | 35 +++- src/upgrader.js | 19 +- src/util/index.js | 14 ++ test/dialing/direct.spec.js | 23 +++ test/identify/index.spec.js | 247 +++++++++++++++++++++++ test/upgrading/upgrader.spec.js | 4 +- test/utils/base-options.browser.js | 13 ++ 15 files changed, 674 insertions(+), 174 deletions(-) create mode 100644 src/identify/consts.js delete mode 100644 src/identify/dialer.js delete mode 100644 src/identify/listener.js create mode 100644 test/identify/index.spec.js create mode 100644 test/utils/base-options.browser.js diff --git a/package.json b/package.json index 910325e8..04970f95 100644 --- a/package.json +++ b/package.json @@ -52,6 +52,7 @@ "it-handshake": "^1.0.1", "it-length-prefixed": "jacobheun/pull-length-prefixed#feat/fromReader", "it-pipe": "^1.0.1", + "it-protocol-buffers": "^0.2.0", "latency-monitor": "~0.2.1", "libp2p-crypto": "^0.16.2", "libp2p-interfaces": "^0.1.3", @@ -68,9 +69,7 @@ "promisify-es6": "^1.0.3", "protons": "^1.0.1", "pull-cat": "^1.1.11", - "pull-defer": "~0.2.3", "pull-handshake": "^1.1.4", - "pull-reader": "^1.3.1", "pull-stream": "^3.6.9", "retimer": "^2.0.0", "xsalsa20": "^1.0.2" diff --git a/src/dialer.js b/src/dialer.js index 2177ccda..85ac6ae3 100644 --- a/src/dialer.js +++ b/src/dialer.js @@ -1,5 +1,6 @@ 'use strict' +const nextTick = require('async/nextTick') const multiaddr = require('multiaddr') const errCode = require('err-code') const { default: PQueue } = require('p-queue') @@ -31,6 +32,22 @@ class Dialer { this.concurrency = concurrency this.timeout = timeout this.queue = new PQueue({ concurrency, timeout, throwOnTimeout: true }) + + /** + * @property {IdentifyService} + */ + this._identifyService = null + } + + set identifyService (service) { + this._identifyService = service + } + + /** + * @type {IdentifyService} + */ + get identifyService () { + return this._identifyService } /** @@ -64,6 +81,18 @@ class Dialer { throw err } + // Perform a delayed Identify handshake + if (this.identifyService) { + nextTick(async () => { + try { + await this.identifyService.identify(conn, conn.remotePeer) + // TODO: Update the PeerStore with the information from identify + } catch (err) { + log.error(err) + } + }) + } + return conn } diff --git a/src/errors.js b/src/errors.js index de75f094..2a103db2 100644 --- a/src/errors.js +++ b/src/errors.js @@ -8,6 +8,7 @@ exports.messages = { exports.codes = { DHT_DISABLED: 'ERR_DHT_DISABLED', PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED', + ERR_CONNECTION_ENDED: 'ERR_CONNECTION_ENDED', ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED', ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED', ERR_NO_VALID_ADDRESSES: 'ERR_NO_VALID_ADDRESSES', @@ -15,6 +16,8 @@ exports.codes = { ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT', ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED', ERR_INVALID_KEY: 'ERR_INVALID_KEY', + ERR_INVALID_MESSAGE: 'ERR_INVALID_MESSAGE', + ERR_INVALID_PEER: 'ERR_INVALID_PEER', ERR_MUXER_UNAVAILABLE: 'ERR_MUXER_UNAVAILABLE', ERR_TIMEOUT: 'ERR_TIMEOUT', ERR_TRANSPORT_UNAVAILABLE: 'ERR_TRANSPORT_UNAVAILABLE', diff --git a/src/identify/README.md b/src/identify/README.md index 865a392c..d356abe3 100644 --- a/src/identify/README.md +++ b/src/identify/README.md @@ -6,32 +6,8 @@ ## Description -Identify is a STUN protocol, used by libp2p-swarm in order to broadcast and learn about the `ip:port` pairs a specific peer is available through and to know when a new stream muxer is established, so a conn can be reused. +Identify is a STUN protocol, used by libp2p in order to broadcast and learn about the `ip:port` pairs a specific peer is available through and to know when a new stream muxer is established, so a conn can be reused. ## How does it work -Best way to understand the current design is through this issue: https://github.com/libp2p/js-libp2p-swarm/issues/78 - -### This module uses `pull-streams` - -We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362). - -You can learn more about pull-streams at: - -- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ) -- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams) -- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple) -- [pull-streams documentation](https://pull-stream.github.io/) - -#### Converting `pull-streams` to Node.js Streams - -If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/pull-stream/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example: - -```js -const pullToStream = require('pull-stream-to-stream') - -const nodeStreamInstance = pullToStream(pullStreamInstance) -// nodeStreamInstance is an instance of a Node.js Stream -``` - -To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream. +The spec for Identify and Identify Push is at [libp2p/specs](https://github.com/libp2p/specs/tree/master/identify). diff --git a/src/identify/consts.js b/src/identify/consts.js new file mode 100644 index 00000000..ae8d8432 --- /dev/null +++ b/src/identify/consts.js @@ -0,0 +1,6 @@ +'use strict' + +module.exports.PROTOCOL_VERSION = 'ipfs/0.1.0' +module.exports.AGENT_VERSION = 'js-libp2p/0.1.0' +module.exports.MULTICODEC_IDENTIFY = '/ipfs/id/1.0.0' +module.exports.MULTICODEC_IDENTIFY_PUSH = '/ipfs/id/push/1.0.0' diff --git a/src/identify/dialer.js b/src/identify/dialer.js deleted file mode 100644 index ba9a524f..00000000 --- a/src/identify/dialer.js +++ /dev/null @@ -1,87 +0,0 @@ -'use strict' -const PeerInfo = require('peer-info') -const PeerId = require('peer-id') -const multiaddr = require('multiaddr') -const pull = require('pull-stream/pull') -const take = require('pull-stream/throughs/take') -const collect = require('pull-stream/sinks/collect') -const lp = require('pull-length-prefixed') - -const msg = require('./message') - -module.exports = (conn, expectedPeerInfo, callback) => { - if (typeof expectedPeerInfo === 'function') { - callback = expectedPeerInfo - expectedPeerInfo = null - // eslint-disable-next-line no-console - console.warn('WARNING: no expected peer info was given, identify will not be able to verify peer integrity') - } - - pull( - conn, - lp.decode(), - take(1), - collect((err, data) => { - if (err) { - return callback(err) - } - - // connection got closed graciously - if (data.length === 0) { - return callback(new Error('conn was closed, did not receive data')) - } - - const input = msg.decode(data[0]) - - PeerId.createFromPubKey(input.publicKey, (err, id) => { - if (err) { - return callback(err) - } - - const peerInfo = new PeerInfo(id) - if (expectedPeerInfo && expectedPeerInfo.id.toB58String() !== id.toB58String()) { - return callback(new Error('invalid peer')) - } - - try { - input.listenAddrs - .map(multiaddr) - .forEach((ma) => peerInfo.multiaddrs.add(ma)) - } catch (err) { - return callback(err) - } - - let observedAddr - - try { - observedAddr = getObservedAddrs(input) - } catch (err) { - return callback(err) - } - - // Copy the protocols - peerInfo.protocols = new Set(input.protocols) - - callback(null, peerInfo, observedAddr) - }) - }) - ) -} - -function getObservedAddrs (input) { - if (!hasObservedAddr(input)) { - return [] - } - - let addrs = input.observedAddr - - if (!Array.isArray(addrs)) { - addrs = [addrs] - } - - return addrs.map((oa) => multiaddr(oa)) -} - -function hasObservedAddr (input) { - return input.observedAddr && input.observedAddr.length > 0 -} diff --git a/src/identify/index.js b/src/identify/index.js index bcbdd641..ca1d70e4 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -1,7 +1,299 @@ 'use strict' -exports = module.exports -exports.multicodec = '/ipfs/id/1.0.0' -exports.listener = require('./listener') -exports.dialer = require('./dialer') -exports.message = require('./message') +const debug = require('debug') +const pb = require('it-protocol-buffers') +const lp = require('it-length-prefixed') +const pipe = require('it-pipe') +const { collect, take } = require('streaming-iterables') + +const PeerInfo = require('peer-info') +const PeerId = require('peer-id') +const multiaddr = require('multiaddr') +const { toBuffer } = require('../util') + +const Message = require('./message') + +const log = debug('libp2p:identify') +log.error = debug('libp2p:identify:error') + +const { + MULTICODEC_IDENTIFY, + MULTICODEC_IDENTIFY_PUSH, + AGENT_VERSION, + PROTOCOL_VERSION +} = require('./consts') + +const errCode = require('err-code') +const { codes } = require('../errors') + +class IdentifyService { + /** + * Replaces the multiaddrs on the given `peerInfo`, + * with the provided `multiaddrs` + * @param {PeerInfo} peerInfo + * @param {Array|Array} multiaddrs + */ + static updatePeerAddresses (peerInfo, multiaddrs) { + if (multiaddrs && multiaddrs.length > 0) { + peerInfo.multiaddrs.clear() + multiaddrs.forEach(ma => { + try { + peerInfo.multiaddrs.add(ma) + } catch (err) { + log.error('could not add multiaddr', err) + } + }) + } + } + + /** + * Replaces the protocols on the given `peerInfo`, + * with the provided `protocols` + * @static + * @param {PeerInfo} peerInfo + * @param {Array} protocols + */ + static updatePeerProtocols (peerInfo, protocols) { + if (protocols && protocols.length > 0) { + peerInfo.protocols.clear() + protocols.forEach(proto => peerInfo.protocols.add(proto)) + } + } + + /** + * Takes the `addr` and converts it to a Multiaddr if possible + * @param {Buffer|String} addr + * @returns {Multiaddr|null} + */ + static getCleanMultiaddr (addr) { + if (addr && addr.length > 0) { + try { + return multiaddr(addr) + } catch (_) { + return null + } + } + return null + } + + /** + * @constructor + * @param {object} options + * @param {Registrar} options.registrar + * @param {Map} options.protocols A reference to the protocols we support + * @param {PeerInfo} options.peerInfo The peer running the identify service + */ + constructor (options) { + /** + * @property {Registrar} + */ + this.registrar = options.registrar + /** + * @property {PeerInfo} + */ + this.peerInfo = options.peerInfo + + this._protocols = options.protocols + + this.handleMessage = this.handleMessage.bind(this) + } + + /** + * Send an Identify Push update to the list of connections + * @param {Array} connections + * @returns {Promise} + */ + push (connections) { + const pushes = connections.map(async connection => { + try { + const { stream } = await connection.newStream(MULTICODEC_IDENTIFY_PUSH) + + await pipe( + [{ + listenAddrs: this.peerInfo.multiaddrs.toArray().map((ma) => ma.buffer), + protocols: Array.from(this._protocols.keys()) + }], + pb.encode(Message), + stream + ) + } catch (err) { + // Just log errors + log.error('could not push identify update to peer', err) + } + }) + + return Promise.all(pushes) + } + + /** + * Calls `push` for all peers in the `peerStore` that are connected + * @param {PeerStore} peerStore + */ + pushToPeerStore (peerStore) { + const connections = [] + let connection + for (const peer of peerStore.peers.values()) { + if (peer.protocols.has(MULTICODEC_IDENTIFY_PUSH) && (connection = this.registrar.getConnection(peer))) { + connections.push(connection) + } + } + + this.push(connections) + } + + /** + * Requests the `Identify` message from peer associated with the given `connection`. + * If the identified peer does not match the `PeerId` associated with the connection, + * an error will be thrown. + * + * @async + * @param {Connection} connection + * @param {PeerID} expectedPeer The PeerId the identify response should match + * @returns {Promise} + */ + async identify (connection, expectedPeer) { + const { stream } = await connection.newStream(MULTICODEC_IDENTIFY) + const [data] = await pipe( + stream, + lp.decode(), + take(1), + toBuffer, + collect + ) + + if (!data) { + throw errCode(new Error('No data could be retrieved'), codes.ERR_CONNECTION_ENDED) + } + + let message + try { + message = Message.decode(data) + } catch (err) { + throw errCode(err, codes.ERR_INVALID_MESSAGE) + } + + let { + publicKey, + listenAddrs, + protocols, + observedAddr + } = message + + const id = await PeerId.createFromPubKey(publicKey) + const peerInfo = new PeerInfo(id) + if (expectedPeer && expectedPeer.toB58String() !== id.toB58String()) { + throw errCode(new Error('identified peer does not match the expected peer'), codes.ERR_INVALID_PEER) + } + + // Get the observedAddr if there is one + observedAddr = IdentifyService.getCleanMultiaddr(observedAddr) + + // Copy the listenAddrs and protocols + IdentifyService.updatePeerAddresses(peerInfo, listenAddrs) + IdentifyService.updatePeerProtocols(peerInfo, protocols) + + this.registrar.peerStore.update(peerInfo) + // TODO: Track our observed address so that we can score it + log('received observed address of %s', observedAddr) + } + + /** + * A handler to register with Libp2p to process identify messages. + * + * @param {object} options + * @param {String} options.protocol + * @param {*} options.stream + * @param {Connection} options.connection + * @returns {Promise} + */ + handleMessage ({ connection, stream, protocol }) { + switch (protocol) { + case MULTICODEC_IDENTIFY: + return this._handleIdentify({ connection, stream }) + case MULTICODEC_IDENTIFY_PUSH: + return this._handlePush({ connection, stream }) + default: + log.error('cannot handle unknown protocol %s', protocol) + } + } + + /** + * Sends the `Identify` response to the requesting peer over the + * given `connection` + * @private + * @param {object} options + * @param {*} options.stream + * @param {Connection} options.connection + */ + _handleIdentify ({ connection, stream }) { + let publicKey = Buffer.alloc(0) + if (this.peerInfo.id.pubKey) { + publicKey = this.peerInfo.id.pubKey.bytes + } + + const message = Message.encode({ + protocolVersion: PROTOCOL_VERSION, + agentVersion: AGENT_VERSION, + publicKey, + listenAddrs: this.peerInfo.multiaddrs.toArray().map((ma) => ma.buffer), + observedAddr: connection.remoteAddr.buffer, + protocols: Array.from(this._protocols.keys()) + }) + + pipe( + [message], + lp.encode(), + stream + ) + } + + /** + * Reads the Identify Push message from the given `connection` + * @private + * @param {object} options + * @param {*} options.stream + * @param {Connection} options.connection + */ + async _handlePush ({ connection, stream }) { + const [data] = await pipe( + stream, + lp.decode(), + take(1), + toBuffer, + collect + ) + + let message + try { + message = Message.decode(data) + } catch (err) { + return log.error('received invalid message', err) + } + + // Update the listen addresses + const peerInfo = new PeerInfo(connection.remotePeer) + + try { + IdentifyService.updatePeerAddresses(peerInfo, message.listenAddrs) + } catch (err) { + return log.error('received invalid listen addrs', err) + } + + // Update the protocols + IdentifyService.updatePeerProtocols(peerInfo, message.protocols) + + // Update the peer in the PeerStore + this.registrar.peerStore.update(peerInfo) + } +} + +module.exports.IdentifyService = IdentifyService +/** + * The protocols the IdentifyService supports + * @property multicodecs + */ +module.exports.multicodecs = { + IDENTIFY: MULTICODEC_IDENTIFY, + IDENTIFY_PUSH: MULTICODEC_IDENTIFY_PUSH +} +module.exports.Message = Message diff --git a/src/identify/listener.js b/src/identify/listener.js deleted file mode 100644 index 8f1d47bf..00000000 --- a/src/identify/listener.js +++ /dev/null @@ -1,35 +0,0 @@ -'use strict' - -const pull = require('pull-stream/pull') -const values = require('pull-stream/sources/values') -const lp = require('pull-length-prefixed') - -const msg = require('./message') - -module.exports = (conn, pInfoSelf) => { - // send what I see from the other + my Info - conn.getObservedAddrs((err, observedAddrs) => { - if (err) { return } - observedAddrs = observedAddrs[0] - - let publicKey = Buffer.alloc(0) - if (pInfoSelf.id.pubKey) { - publicKey = pInfoSelf.id.pubKey.bytes - } - - const msgSend = msg.encode({ - protocolVersion: 'ipfs/0.1.0', - agentVersion: 'na', - publicKey: publicKey, - listenAddrs: pInfoSelf.multiaddrs.toArray().map((ma) => ma.buffer), - observedAddr: observedAddrs ? observedAddrs.buffer : Buffer.from(''), - protocols: Array.from(pInfoSelf.protocols) - }) - - pull( - values([msgSend]), - lp.encode(), - conn - ) - }) -} diff --git a/src/index.js b/src/index.js index 998d4a6b..45648db0 100644 --- a/src/index.js +++ b/src/index.js @@ -30,6 +30,10 @@ const TransportManager = require('./transport-manager') const Upgrader = require('./upgrader') const PeerStore = require('./peer-store') const Registrar = require('./registrar') +const { + IdentifyService, + multicodecs: IDENTIFY_PROTOCOLS +} = require('./identify') const notStarted = (action, state) => { return errCode( @@ -83,6 +87,11 @@ class Libp2p extends EventEmitter { } }) + // Create the Registrar + this.registrar = new Registrar({ peerStore: this.peerStore }) + this.handle = this.handle.bind(this) + this.registrar.handle = this.handle + // Setup the transport manager this.transportManager = new TransportManager({ libp2p: this, @@ -100,22 +109,26 @@ class Libp2p extends EventEmitter { }) } + this.dialer = new Dialer({ + transportManager: this.transportManager + }) + // Attach stream multiplexers if (this._modules.streamMuxer) { const muxers = this._modules.streamMuxer muxers.forEach((muxer) => { this.upgrader.muxers.set(muxer.multicodec, muxer) }) + + // Add the identify service since we can multiplex + this.dialer.identifyService = new IdentifyService({ + registrar: this.registrar, + peerInfo: this.peerInfo, + protocols: this.upgrader.protocols + }) + this.handle(Object.values(IDENTIFY_PROTOCOLS), this.dialer.identifyService.handleMessage) } - this.dialer = new Dialer({ - transportManager: this.transportManager - }) - - this.registrar = new Registrar({ peerStore: this.peerStore }) - this.handle = this.handle.bind(this) - this.registrar.handle = this.handle - // Attach private network protector if (this._modules.connProtector) { this.upgrader.protector = this._modules.connProtector @@ -338,13 +351,15 @@ class Libp2p extends EventEmitter { /** * Registers the `handler` for each protocol * @param {string[]|string} protocols - * @param {function({ stream:*, protocol:string })} handler + * @param {function({ connection:*, stream:*, protocol:string })} handler */ handle (protocols, handler) { protocols = Array.isArray(protocols) ? protocols : [protocols] protocols.forEach(protocol => { this.upgrader.protocols.set(protocol, handler) }) + + this.dialer.identifyService.pushToPeerStore(this.peerStore) } /** @@ -357,6 +372,8 @@ class Libp2p extends EventEmitter { protocols.forEach(protocol => { this.upgrader.protocols.delete(protocol) }) + + this.dialer.identifyService.pushToPeerStore(this.peerStore) } async _onStarting () { diff --git a/src/upgrader.js b/src/upgrader.js index 7947d6b8..1699451a 100644 --- a/src/upgrader.js +++ b/src/upgrader.js @@ -182,10 +182,14 @@ class Upgrader { // Run anytime a remote stream is created onStream: async muxedStream => { const mss = new Multistream.Listener(muxedStream) - const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) - log('%s: incoming stream opened on %s', direction, protocol) - connection.addStream(stream, protocol) - this._onStream({ stream, protocol }) + try { + const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys())) + log('%s: incoming stream opened on %s', direction, protocol) + connection.addStream(stream, protocol) + this._onStream({ connection, stream, protocol }) + } catch (err) { + log.error(err) + } }, // Run anytime a stream closes onStreamEnd: muxedStream => { @@ -246,12 +250,13 @@ class Upgrader { * Routes incoming streams to the correct handler * @private * @param {object} options + * @param {Connection} options.connection The connection the stream belongs to * @param {Stream} options.stream - * @param {string} protocol + * @param {string} options.protocol */ - _onStream ({ stream, protocol }) { + _onStream ({ connection, stream, protocol }) { const handler = this.protocols.get(protocol) - handler({ stream, protocol }) + handler({ connection, stream, protocol }) } /** diff --git a/src/util/index.js b/src/util/index.js index bfee1875..0fdd80ca 100644 --- a/src/util/index.js +++ b/src/util/index.js @@ -30,4 +30,18 @@ function emitFirst (emitter, events, handler) { }) } +/** + * Converts BufferList messages to Buffers + * @param {*} source + * @returns {AsyncGenerator} + */ +function toBuffer (source) { + return (async function * () { + for await (const chunk of source) { + yield Buffer.isBuffer(chunk) ? chunk : chunk.slice() + } + })() +} + module.exports.emitFirst = emitFirst +module.exports.toBuffer = toBuffer diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js index b23d4f13..1aca5987 100644 --- a/test/dialing/direct.spec.js +++ b/test/dialing/direct.spec.js @@ -218,5 +218,28 @@ describe('Dialing (direct, WebSockets)', () => { await connection.close() expect(libp2p.dialer.connectToMultiaddr.callCount).to.equal(1) }) + + it('should run identify automatically after connecting', async () => { + libp2p = new Libp2p({ + peerInfo, + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } + }) + + sinon.spy(libp2p.dialer.identifyService, 'identify') + sinon.spy(libp2p.peerStore, 'update') + + const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) + expect(connection).to.exist() + // Wait for setImmediate to trigger the identify call + await delay(1) + expect(libp2p.dialer.identifyService.identify.callCount).to.equal(1) + await libp2p.dialer.identifyService.identify.firstCall.returnValue + + expect(libp2p.peerStore.update.callCount).to.equal(1) + }) }) }) diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js new file mode 100644 index 00000000..114b5156 --- /dev/null +++ b/test/identify/index.spec.js @@ -0,0 +1,247 @@ +'use strict' +/* eslint-env mocha */ + +const chai = require('chai') +chai.use(require('dirty-chai')) +const { expect } = chai +const sinon = require('sinon') + +const delay = require('delay') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const duplexPair = require('it-pair/duplex') +const multiaddr = require('multiaddr') + +const { codes: Errors } = require('../../src/errors') +const { IdentifyService, multicodecs } = require('../../src/identify') +const Peers = require('../fixtures/peers') +const Libp2p = require('../../src') +const baseOptions = require('../utils/base-options.browser') + +const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser') +const remoteAddr = MULTIADDRS_WEBSOCKETS[0] + +describe('Identify', () => { + let localPeer + let remotePeer + const protocols = new Map([ + [multicodecs.IDENTIFY, () => {}], + [multicodecs.IDENTIFY_PUSH, () => {}] + ]) + + before(async () => { + [localPeer, remotePeer] = (await Promise.all([ + PeerId.createFromJSON(Peers[0]), + PeerId.createFromJSON(Peers[1]) + ])).map(id => new PeerInfo(id)) + }) + + afterEach(() => { + sinon.restore() + }) + + it('should be able to identify another peer', async () => { + const localIdentify = new IdentifyService({ + peerInfo: localPeer, + protocols, + registrar: { + peerStore: { + update: () => {} + } + } + }) + const remoteIdentify = new IdentifyService({ + peerInfo: remotePeer, + protocols + }) + + const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') + const localConnectionMock = { newStream: () => {} } + const remoteConnectionMock = { remoteAddr: observedAddr } + + const [local, remote] = duplexPair() + sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY }) + + sinon.spy(localIdentify.registrar.peerStore, 'update') + + // Run identify + await Promise.all([ + localIdentify.identify(localConnectionMock, remotePeer.id), + remoteIdentify.handleMessage({ + connection: remoteConnectionMock, + stream: remote, + protocol: multicodecs.IDENTIFY + }) + ]) + + expect(localIdentify.registrar.peerStore.update.callCount).to.equal(1) + // Validate the remote peer gets updated in the peer store + const call = localIdentify.registrar.peerStore.update.firstCall + expect(call.args[0].id.bytes).to.equal(remotePeer.id.bytes) + }) + + it('should throw if identified peer is the wrong peer', async () => { + const localIdentify = new IdentifyService({ + peerInfo: localPeer, + protocols + }) + const remoteIdentify = new IdentifyService({ + peerInfo: remotePeer, + protocols + }) + + const observedAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') + const localConnectionMock = { newStream: () => {} } + const remoteConnectionMock = { remoteAddr: observedAddr } + + const [local, remote] = duplexPair() + sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY }) + + // Run identify + try { + await Promise.all([ + localIdentify.identify(localConnectionMock, localPeer.id), + remoteIdentify.handleMessage({ + connection: remoteConnectionMock, + stream: remote, + protocol: multicodecs.IDENTIFY + }) + ]) + expect.fail('should have thrown') + } catch (err) { + expect(err).to.exist() + expect(err.code).to.eql(Errors.ERR_INVALID_PEER) + } + }) + + describe('push', () => { + it('should be able to push identify updates to another peer', async () => { + const localIdentify = new IdentifyService({ + peerInfo: localPeer, + registrar: { getConnection: () => {} }, + protocols: new Map([ + [multicodecs.IDENTIFY], + [multicodecs.IDENTIFY_PUSH], + ['/echo/1.0.0'] + ]) + }) + const remoteIdentify = new IdentifyService({ + peerInfo: remotePeer, + registrar: { + peerStore: { + update: () => {} + } + } + }) + + // Setup peer protocols and multiaddrs + const localProtocols = new Set([multicodecs.IDENTIFY, multicodecs.IDENTIFY_PUSH, '/echo/1.0.0']) + const listeningAddr = multiaddr('/ip4/127.0.0.1/tcp/1234') + sinon.stub(localPeer.multiaddrs, 'toArray').returns([listeningAddr]) + sinon.stub(localPeer, 'protocols').value(localProtocols) + sinon.stub(remotePeer, 'protocols').value(new Set([multicodecs.IDENTIFY, multicodecs.IDENTIFY_PUSH])) + + const localConnectionMock = { newStream: () => {} } + const remoteConnectionMock = { remotePeer: localPeer.id } + + const [local, remote] = duplexPair() + sinon.stub(localConnectionMock, 'newStream').returns({ stream: local, protocol: multicodecs.IDENTIFY_PUSH }) + + sinon.spy(IdentifyService, 'updatePeerAddresses') + sinon.spy(IdentifyService, 'updatePeerProtocols') + sinon.spy(remoteIdentify.registrar.peerStore, 'update') + + // Run identify + await Promise.all([ + localIdentify.push([localConnectionMock]), + remoteIdentify.handleMessage({ + connection: remoteConnectionMock, + stream: remote, + protocol: multicodecs.IDENTIFY_PUSH + }) + ]) + + expect(IdentifyService.updatePeerAddresses.callCount).to.equal(1) + expect(IdentifyService.updatePeerProtocols.callCount).to.equal(1) + + expect(remoteIdentify.registrar.peerStore.update.callCount).to.equal(1) + const [peerInfo] = remoteIdentify.registrar.peerStore.update.firstCall.args + expect(peerInfo.id.bytes).to.eql(localPeer.id.bytes) + expect(peerInfo.multiaddrs.toArray()).to.eql([listeningAddr]) + expect(peerInfo.protocols).to.eql(localProtocols) + }) + }) + + describe('libp2p.dialer.identifyService', () => { + let peerInfo + let libp2p + let remoteLibp2p + + before(async () => { + const peerId = await PeerId.createFromJSON(Peers[0]) + peerInfo = new PeerInfo(peerId) + }) + + afterEach(async () => { + sinon.restore() + libp2p && await libp2p.stop() + libp2p = null + }) + + after(async () => { + remoteLibp2p && await remoteLibp2p.stop() + }) + + it('should run identify automatically after connecting', async () => { + libp2p = new Libp2p({ + ...baseOptions, + peerInfo + }) + + sinon.spy(libp2p.dialer.identifyService, 'identify') + sinon.spy(libp2p.peerStore, 'update') + + const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) + expect(connection).to.exist() + // Wait for nextTick to trigger the identify call + await delay(1) + expect(libp2p.dialer.identifyService.identify.callCount).to.equal(1) + await libp2p.dialer.identifyService.identify.firstCall.returnValue + + expect(libp2p.peerStore.update.callCount).to.equal(1) + await connection.close() + }) + + it('should push protocol updates to an already connected peer', async () => { + libp2p = new Libp2p({ + ...baseOptions, + peerInfo + }) + + sinon.spy(libp2p.dialer.identifyService, 'identify') + sinon.spy(libp2p.dialer.identifyService, 'push') + sinon.spy(libp2p.peerStore, 'update') + + const connection = await libp2p.dialer.connectToMultiaddr(remoteAddr) + expect(connection).to.exist() + // Wait for nextTick to trigger the identify call + await delay(1) + + // Wait for identify to finish + await libp2p.dialer.identifyService.identify.firstCall.returnValue + + libp2p.handle('/echo/2.0.0', () => {}) + libp2p.unhandle('/echo/2.0.0') + + // Verify the remote peer is notified of both changes + expect(libp2p.dialer.identifyService.push.callCount).to.equal(2) + for (const call of libp2p.dialer.identifyService.push.getCalls()) { + const [connections] = call.args + expect(connections.length).to.equal(1) + expect(connections[0].remotePeer.toB58String()).to.equal(remoteAddr.getPeerId()) + const results = await call.returnValue + expect(results.length).to.equal(1) + } + }) + }) +}) diff --git a/test/upgrading/upgrader.spec.js b/test/upgrading/upgrader.spec.js index ff30810b..69d0686f 100644 --- a/test/upgrading/upgrader.spec.js +++ b/test/upgrading/upgrader.spec.js @@ -361,16 +361,14 @@ describe('libp2p.upgrader', () => { } }) - expect(libp2p.upgrader.protocols.size).to.equal(0) + expect(libp2p.upgrader.protocols).to.not.have.any.keys(['/echo/1.0.0', '/echo/1.0.1']) const echoHandler = () => {} libp2p.handle(['/echo/1.0.0', '/echo/1.0.1'], echoHandler) - expect(libp2p.upgrader.protocols.size).to.equal(2) expect(libp2p.upgrader.protocols.get('/echo/1.0.0')).to.equal(echoHandler) expect(libp2p.upgrader.protocols.get('/echo/1.0.1')).to.equal(echoHandler) libp2p.unhandle(['/echo/1.0.0']) - expect(libp2p.upgrader.protocols.size).to.equal(1) expect(libp2p.upgrader.protocols.get('/echo/1.0.0')).to.equal(undefined) expect(libp2p.upgrader.protocols.get('/echo/1.0.1')).to.equal(echoHandler) }) diff --git a/test/utils/base-options.browser.js b/test/utils/base-options.browser.js new file mode 100644 index 00000000..3a1337a0 --- /dev/null +++ b/test/utils/base-options.browser.js @@ -0,0 +1,13 @@ +'use strict' + +const Transport = require('libp2p-websockets') +const Muxer = require('libp2p-mplex') +const Crypto = require('../../src/insecure/plaintext') + +module.exports = { + modules: { + transport: [Transport], + streamMuxer: [Muxer], + connEncryption: [Crypto] + } +}