From 43eda43f06c6b048fea9128c8877c2454aacf60b Mon Sep 17 00:00:00 2001 From: Vasco Santos Date: Wed, 23 Sep 2020 18:45:01 +0200 Subject: [PATCH] chore: create signed peer record on new listen addresses in transport manager --- doc/API.md | 9 ---- src/circuit/listener.js | 2 +- src/identify/index.js | 52 +++-------------------- src/transport-manager.js | 39 ++++++++++++----- test/dialing/direct.spec.js | 3 -- test/identify/index.spec.js | 39 +++++++++++------ test/transports/transport-manager.node.js | 18 +++++++- 7 files changed, 80 insertions(+), 82 deletions(-) diff --git a/doc/API.md b/doc/API.md index 11ddeb56..84d97151 100644 --- a/doc/API.md +++ b/doc/API.md @@ -73,7 +73,6 @@ * [`libp2p`](#libp2p) * [`libp2p.connectionManager`](#libp2pconnectionmanager) * [`libp2p.peerStore`](#libp2ppeerStore) - * [`libp2p.transportManager`](#libp2ptransportmanager) * [Types](#types) * [`Stats`](#stats) @@ -2020,14 +2019,6 @@ This event will be triggered anytime we are disconnected from another peer, rega - `peerId`: instance of [`PeerId`][peer-id] - `protocols`: array of known, supported protocols for the peer (string identifiers) -### libp2p.transportManager - -#### Listening addresses change - -This event will be triggered anytime the listening addresses change. - -`libp2p.transportManager.on('listening', () => {})` - ## Types ### Stats diff --git a/src/circuit/listener.js b/src/circuit/listener.js index 207150e8..02e371fb 100644 --- a/src/circuit/listener.js +++ b/src/circuit/listener.js @@ -21,7 +21,7 @@ module.exports = (libp2p) => { if (deleted) { // Announce listen addresses change - listener.emit('listening') + listener.emit('close') } }) diff --git a/src/identify/index.js b/src/identify/index.js index 44e2f630..58ab2e95 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -64,11 +64,6 @@ class IdentifyService { */ this.connectionManager = libp2p.connectionManager - /** - * @property {TransportManager} - */ - this.transportManager = libp2p.transportManager - /** * @property {PeerId} */ @@ -96,10 +91,11 @@ class IdentifyService { this.identify(connection).catch(log.error) }) - // When new addresses are used for listening, update self peer record - this.transportManager.on('listening', async () => { - await this._createSelfPeerRecord() - this.pushToPeerStore() + // When self multiaddrs change, trigger identify-push + this.peerStore.on('change:multiaddrs', ({ peerId }) => { + if (peerId.toString() === this.peerId.toString()) { + this.pushToPeerStore() + } }) } @@ -110,7 +106,7 @@ class IdentifyService { * @returns {Promise} */ async push (connections) { - const signedPeerRecord = await this._getSelfPeerRecord() + const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId) const listenAddrs = this._libp2p.multiaddrs.map((ma) => ma.bytes) const protocols = Array.from(this._protocols.keys()) @@ -260,7 +256,7 @@ class IdentifyService { publicKey = this.peerId.pubKey.bytes } - const signedPeerRecord = await this._getSelfPeerRecord() + const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId) const message = Message.encode({ protocolVersion: this._host.protocolVersion, @@ -330,40 +326,6 @@ class IdentifyService { // Update the protocols this.peerStore.protoBook.set(id, message.protocols) } - - /** - * Get self signed peer record raw envelope. - * @return {Promise} - */ - _getSelfPeerRecord () { - const selfSignedPeerRecord = this.peerStore.addressBook.getRawEnvelope(this.peerId) - - if (selfSignedPeerRecord) { - return selfSignedPeerRecord - } - - return this._createSelfPeerRecord() - } - - /** - * Create self signed peer record raw envelope. - * @return {Uint8Array} - */ - async _createSelfPeerRecord () { - try { - const peerRecord = new PeerRecord({ - peerId: this.peerId, - multiaddrs: this._libp2p.multiaddrs - }) - const envelope = await Envelope.seal(peerRecord, this.peerId) - this.peerStore.addressBook.consumePeerRecord(envelope) - - return this.peerStore.addressBook.getRawEnvelope(this.peerId) - } catch (err) { - log.error('failed to get self peer record') - } - return null - } } module.exports.IdentifyService = IdentifyService diff --git a/src/transport-manager.js b/src/transport-manager.js index d7b2af80..64aa18ec 100644 --- a/src/transport-manager.js +++ b/src/transport-manager.js @@ -1,6 +1,5 @@ 'use strict' -const { EventEmitter } = require('events') const pSettle = require('p-settle') const { codes } = require('./errors') const errCode = require('err-code') @@ -8,11 +7,10 @@ const debug = require('debug') const log = debug('libp2p:transports') log.error = debug('libp2p:transports:error') -/** - * Responsible for managing the transports and their listeners. - * @fires TransportManager#listening Emitted when listening addresses change. - */ -class TransportManager extends EventEmitter { +const Envelope = require('./record/envelope') +const PeerRecord = require('./record/peer-record') + +class TransportManager { /** * @class * @param {object} options @@ -21,8 +19,6 @@ class TransportManager extends EventEmitter { * @param {boolean} [options.faultTolerance = FAULT_TOLERANCE.FATAL_ALL] - Address listen error tolerance. */ constructor ({ libp2p, upgrader, faultTolerance = FAULT_TOLERANCE.FATAL_ALL }) { - super() - this.libp2p = libp2p this.upgrader = upgrader this._transports = new Map() @@ -73,6 +69,7 @@ class TransportManager extends EventEmitter { while (listeners.length) { const listener = listeners.pop() listener.removeAllListeners('listening') + listener.removeAllListeners('close') tasks.push(listener.close()) } } @@ -166,8 +163,9 @@ class TransportManager extends EventEmitter { const listener = transport.createListener(this._listenerOptions.get(key), this.onConnection) this._listeners.get(key).push(listener) - // Track listen events - listener.on('listening', () => this.emit('listening')) + // Track listen/close events + listener.on('listening', () => this._createSelfPeerRecord()) + listener.on('close', () => this._createSelfPeerRecord()) // We need to attempt to listen on everything tasks.push(listener.listen(addr)) @@ -214,6 +212,7 @@ class TransportManager extends EventEmitter { // Close any running listeners for (const listener of this._listeners.get(key)) { listener.removeAllListeners('listening') + listener.removeAllListeners('close') await listener.close() } } @@ -236,6 +235,26 @@ class TransportManager extends EventEmitter { await Promise.all(tasks) } + + /** + * Create self signed peer record raw envelope. + * @return {Uint8Array} + */ + async _createSelfPeerRecord () { + try { + const peerRecord = new PeerRecord({ + peerId: this.libp2p.peerId, + multiaddrs: this.libp2p.multiaddrs + }) + const envelope = await Envelope.seal(peerRecord, this.libp2p.peerId) + this.libp2p.peerStore.addressBook.consumePeerRecord(envelope) + + return this.libp2p.peerStore.addressBook.getRawEnvelope(this.libp2p.peerId) + } catch (err) { + log.error('failed to get self peer record') + } + return null + } } /** diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js index f7aa6d99..8ef78a91 100644 --- a/test/dialing/direct.spec.js +++ b/test/dialing/direct.spec.js @@ -349,7 +349,6 @@ describe('Dialing (direct, WebSockets)', () => { const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() - sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord') sinon.spy(libp2p.peerStore.protoBook, 'set') // Wait for onConnection to be called @@ -358,8 +357,6 @@ describe('Dialing (direct, WebSockets)', () => { expect(libp2p.identifyService.identify.callCount).to.equal(1) await libp2p.identifyService.identify.firstCall.returnValue - // Self + New peer - expect(libp2p.peerStore.addressBook.consumePeerRecord.callCount).to.equal(2) expect(libp2p.peerStore.protoBook.set.callCount).to.equal(1) }) diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index 3b3a631c..7356497f 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -20,6 +20,7 @@ const { IdentifyService, multicodecs } = require('../../src/identify') const Peers = require('../fixtures/peers') const Libp2p = require('../../src') const Envelope = require('../../src/record/envelope') +const PeerRecord = require('../../src/record/peer-record') const PeerStore = require('../../src/peer-store') const baseOptions = require('../utils/base-options.browser') const pkg = require('../../package.json') @@ -52,7 +53,6 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs, _options: { host: {} } @@ -64,7 +64,6 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: listenMaddrs, _options: { host: {} } @@ -82,6 +81,9 @@ describe('Identify', () => { sinon.spy(localIdentify.peerStore.addressBook, 'consumePeerRecord') sinon.spy(localIdentify.peerStore.protoBook, 'set') + // Transport Manager creates signed peer record + await _createSelfPeerRecord(remoteIdentify._libp2p) + // Run identify await Promise.all([ localIdentify.identify(localConnectionMock), @@ -109,7 +111,6 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs, _options: { host: {} } @@ -121,7 +122,6 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: listenMaddrs, _options: { host: {} } @@ -172,7 +172,6 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: [], _options: { host: {} } @@ -183,7 +182,6 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: [], _options: { host: {} } @@ -252,7 +250,6 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs, _options: { host: {} } @@ -267,7 +264,6 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager, - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: [], _options: { host: {} } @@ -285,6 +281,10 @@ describe('Identify', () => { sinon.spy(remoteIdentify.peerStore.addressBook, 'consumePeerRecord') sinon.spy(remoteIdentify.peerStore.protoBook, 'set') + // Transport Manager creates signed peer record + await _createSelfPeerRecord(localIdentify._libp2p) + await _createSelfPeerRecord(remoteIdentify._libp2p) + // Run identify await Promise.all([ localIdentify.push([localConnectionMock]), @@ -295,7 +295,7 @@ describe('Identify', () => { }) ]) - expect(remoteIdentify.peerStore.addressBook.consumePeerRecord.callCount).to.equal(1) + expect(remoteIdentify.peerStore.addressBook.consumePeerRecord.callCount).to.equal(2) expect(remoteIdentify.peerStore.protoBook.set.callCount).to.equal(1) const addresses = localIdentify.peerStore.addressBook.get(localPeer) @@ -317,7 +317,6 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs, _options: { host: {} } @@ -332,7 +331,6 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager, - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: [], _options: { host: {} } @@ -409,8 +407,8 @@ describe('Identify', () => { expect(connection).to.exist() // Wait for peer store to be updated - // Dialer._createDialTarget (add), Identify (consume), Create self (consume) - await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 2 && peerStoreSpyAdd.callCount === 1) + // Dialer._createDialTarget (add), Identify (consume) + await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1) expect(libp2p.identifyService.identify.callCount).to.equal(1) // The connection should have no open streams @@ -474,3 +472,18 @@ describe('Identify', () => { }) }) }) + +// Self peer record creating on Transport Manager simulation +const _createSelfPeerRecord = async (libp2p) => { + try { + const peerRecord = new PeerRecord({ + peerId: libp2p.peerId, + multiaddrs: libp2p.multiaddrs + }) + const envelope = await Envelope.seal(peerRecord, libp2p.peerId) + libp2p.peerStore.addressBook.consumePeerRecord(envelope) + + return libp2p.peerStore.addressBook.getRawEnvelope(libp2p.peerId) + } catch (_) {} + return null +} diff --git a/test/transports/transport-manager.node.js b/test/transports/transport-manager.node.js index 58bc8737..d88d20f5 100644 --- a/test/transports/transport-manager.node.js +++ b/test/transports/transport-manager.node.js @@ -4,13 +4,17 @@ const chai = require('chai') chai.use(require('dirty-chai')) const { expect } = chai +const sinon = require('sinon') const AddressManager = require('../../src/address-manager') const TransportManager = require('../../src/transport-manager') +const PeerStore = require('../../src/peer-store') const Transport = require('libp2p-tcp') +const PeerId = require('peer-id') const multiaddr = require('multiaddr') const mockUpgrader = require('../utils/mockUpgrader') const sinon = require('sinon') +const Peers = require('../fixtures/peers') const addrs = [ multiaddr('/ip4/127.0.0.1/tcp/0'), multiaddr('/ip4/127.0.0.1/tcp/0') @@ -18,11 +22,17 @@ const addrs = [ describe('Transport Manager (TCP)', () => { let tm + let localPeer + + before(async () => { + localPeer = await PeerId.createFromJSON(Peers[0]) + }) before(() => { tm = new TransportManager({ libp2p: { - addressManager: new AddressManager({ listen: addrs }) + addressManager: new AddressManager({ listen: addrs }), + PeerStore: new PeerStore({ peerId: localPeer }) }, upgrader: mockUpgrader, onConnection: () => {} @@ -41,12 +51,18 @@ describe('Transport Manager (TCP)', () => { }) it('should be able to listen', async () => { + sinon.spy(tm, '_createSelfPeerRecord') + tm.add(Transport.prototype[Symbol.toStringTag], Transport, { listenerOptions: { listen: 'carefully' } }) const transport = tm._transports.get(Transport.prototype[Symbol.toStringTag]) const spyListener = sinon.spy(transport, 'createListener') await tm.listen() expect(tm._listeners).to.have.key(Transport.prototype[Symbol.toStringTag]) expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(addrs.length) + + // Created Self Peer record on new listen address + expect(tm._createSelfPeerRecord.callCount).to.equal(addrs.length) + // Ephemeral ip addresses may result in multiple listeners expect(tm.getAddrs().length).to.equal(addrs.length) await tm.close()