diff --git a/doc/API.md b/doc/API.md index 636d57bb..69e0ca4f 100644 --- a/doc/API.md +++ b/doc/API.md @@ -73,7 +73,6 @@ * [`libp2p`](#libp2p) * [`libp2p.connectionManager`](#libp2pconnectionmanager) * [`libp2p.peerStore`](#libp2ppeerStore) - * [`libp2p.transportManager`](#libp2ptransportmanager) * [Types](#types) * [`Stats`](#stats) @@ -1988,14 +1987,6 @@ This event will be triggered anytime we are disconnected from another peer, rega - `peerId`: instance of [`PeerId`][peer-id] - `protocols`: array of known, supported protocols for the peer (string identifiers) -### libp2p.transportManager - -#### Listening addresses change - -This event will be triggered anytime the listening addresses change. - -`libp2p.transportManager.on('listening', () => {})` - ## Types ### Stats diff --git a/src/circuit/listener.js b/src/circuit/listener.js index 207150e8..02e371fb 100644 --- a/src/circuit/listener.js +++ b/src/circuit/listener.js @@ -21,7 +21,7 @@ module.exports = (libp2p) => { if (deleted) { // Announce listen addresses change - listener.emit('listening') + listener.emit('close') } }) diff --git a/src/identify/index.js b/src/identify/index.js index dad7e496..d7202ed9 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -64,11 +64,6 @@ class IdentifyService { */ this.connectionManager = libp2p.connectionManager - /** - * @property {TransportManager} - */ - this.transportManager = libp2p.transportManager - /** * @property {PeerId} */ @@ -89,10 +84,11 @@ class IdentifyService { this.identify(connection, peerId).catch(log.error) }) - // When new addresses are used for listening, update self peer record - this.transportManager.on('listening', async () => { - await this._createSelfPeerRecord() - this.pushToPeerStore() + // When self multiaddrs change, trigger identify-push + this.peerStore.on('change:multiaddrs', ({ peerId }) => { + if (peerId.toString() === this.peerId.toString()) { + this.pushToPeerStore() + } }) } @@ -103,7 +99,7 @@ class IdentifyService { * @returns {Promise} */ async push (connections) { - const signedPeerRecord = await this._getSelfPeerRecord() + const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId) const listenAddrs = this._libp2p.multiaddrs.map((ma) => ma.bytes) const protocols = Array.from(this._protocols.keys()) @@ -253,7 +249,7 @@ class IdentifyService { publicKey = this.peerId.pubKey.bytes } - const signedPeerRecord = await this._getSelfPeerRecord() + const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId) const message = Message.encode({ protocolVersion: PROTOCOL_VERSION, @@ -323,40 +319,6 @@ class IdentifyService { // Update the protocols this.peerStore.protoBook.set(id, message.protocols) } - - /** - * Get self signed peer record raw envelope. - * @return {Promise} - */ - _getSelfPeerRecord () { - const selfSignedPeerRecord = this.peerStore.addressBook.getRawEnvelope(this.peerId) - - if (selfSignedPeerRecord) { - return selfSignedPeerRecord - } - - return this._createSelfPeerRecord() - } - - /** - * Create self signed peer record raw envelope. - * @return {Uint8Array} - */ - async _createSelfPeerRecord () { - try { - const peerRecord = new PeerRecord({ - peerId: this.peerId, - multiaddrs: this._libp2p.multiaddrs - }) - const envelope = await Envelope.seal(peerRecord, this.peerId) - this.peerStore.addressBook.consumePeerRecord(envelope) - - return this.peerStore.addressBook.getRawEnvelope(this.peerId) - } catch (err) { - log.error('failed to get self peer record') - } - return null - } } module.exports.IdentifyService = IdentifyService diff --git a/src/transport-manager.js b/src/transport-manager.js index 8c743fe0..08256d9a 100644 --- a/src/transport-manager.js +++ b/src/transport-manager.js @@ -1,6 +1,5 @@ 'use strict' -const { EventEmitter } = require('events') const pSettle = require('p-settle') const { codes } = require('./errors') const errCode = require('err-code') @@ -8,11 +7,10 @@ const debug = require('debug') const log = debug('libp2p:transports') log.error = debug('libp2p:transports:error') -/** - * Responsible for managing the transports and their listeners. - * @fires TransportManager#listening Emitted when listening addresses change. - */ -class TransportManager extends EventEmitter { +const Envelope = require('./record/envelope') +const PeerRecord = require('./record/peer-record') + +class TransportManager { /** * @class * @param {object} options @@ -21,8 +19,6 @@ class TransportManager extends EventEmitter { * @param {boolean} [options.faultTolerance = FAULT_TOLERANCE.FATAL_ALL] - Address listen error tolerance. */ constructor ({ libp2p, upgrader, faultTolerance = FAULT_TOLERANCE.FATAL_ALL }) { - super() - this.libp2p = libp2p this.upgrader = upgrader this._transports = new Map() @@ -71,6 +67,7 @@ class TransportManager extends EventEmitter { while (listeners.length) { const listener = listeners.pop() listener.removeAllListeners('listening') + listener.removeAllListeners('close') tasks.push(listener.close()) } } @@ -164,8 +161,9 @@ class TransportManager extends EventEmitter { const listener = transport.createListener({}, this.onConnection) this._listeners.get(key).push(listener) - // Track listen events - listener.on('listening', () => this.emit('listening')) + // Track listen/close events + listener.on('listening', () => this._createSelfPeerRecord()) + listener.on('close', () => this._createSelfPeerRecord()) // We need to attempt to listen on everything tasks.push(listener.listen(addr)) @@ -212,6 +210,7 @@ class TransportManager extends EventEmitter { // Close any running listeners for (const listener of this._listeners.get(key)) { listener.removeAllListeners('listening') + listener.removeAllListeners('close') await listener.close() } } @@ -234,6 +233,26 @@ class TransportManager extends EventEmitter { await Promise.all(tasks) } + + /** + * Create self signed peer record raw envelope. + * @return {Uint8Array} + */ + async _createSelfPeerRecord () { + try { + const peerRecord = new PeerRecord({ + peerId: this.libp2p.peerId, + multiaddrs: this.libp2p.multiaddrs + }) + const envelope = await Envelope.seal(peerRecord, this.libp2p.peerId) + this.libp2p.peerStore.addressBook.consumePeerRecord(envelope) + + return this.libp2p.peerStore.addressBook.getRawEnvelope(this.libp2p.peerId) + } catch (err) { + log.error('failed to get self peer record') + } + return null + } } /** diff --git a/test/dialing/direct.spec.js b/test/dialing/direct.spec.js index 540f528b..0659ed9a 100644 --- a/test/dialing/direct.spec.js +++ b/test/dialing/direct.spec.js @@ -349,7 +349,6 @@ describe('Dialing (direct, WebSockets)', () => { const connection = await libp2p.dial(remoteAddr) expect(connection).to.exist() - sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord') sinon.spy(libp2p.peerStore.protoBook, 'set') // Wait for onConnection to be called @@ -358,8 +357,6 @@ describe('Dialing (direct, WebSockets)', () => { expect(libp2p.identifyService.identify.callCount).to.equal(1) await libp2p.identifyService.identify.firstCall.returnValue - // Self + New peer - expect(libp2p.peerStore.addressBook.consumePeerRecord.callCount).to.equal(2) expect(libp2p.peerStore.protoBook.set.callCount).to.equal(1) }) diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index 9443e2f5..9abbe9be 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -20,6 +20,7 @@ const { IdentifyService, multicodecs } = require('../../src/identify') const Peers = require('../fixtures/peers') const Libp2p = require('../../src') const Envelope = require('../../src/record/envelope') +const PeerRecord = require('../../src/record/peer-record') const PeerStore = require('../../src/peer-store') const baseOptions = require('../utils/base-options.browser') const pkg = require('../../package.json') @@ -52,7 +53,6 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs }, @@ -63,7 +63,6 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: listenMaddrs }, @@ -80,6 +79,9 @@ describe('Identify', () => { sinon.spy(localIdentify.peerStore.addressBook, 'consumePeerRecord') sinon.spy(localIdentify.peerStore.protoBook, 'set') + // Transport Manager creates signed peer record + await _createSelfPeerRecord(remoteIdentify._libp2p) + // Run identify await Promise.all([ localIdentify.identify(localConnectionMock), @@ -107,7 +109,6 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs }, @@ -118,7 +119,6 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: listenMaddrs }, @@ -168,7 +168,6 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: [] }, @@ -178,7 +177,6 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: [] }, @@ -216,7 +214,6 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs }, @@ -230,7 +227,6 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager, - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: [] } @@ -247,6 +243,10 @@ describe('Identify', () => { sinon.spy(remoteIdentify.peerStore.addressBook, 'consumePeerRecord') sinon.spy(remoteIdentify.peerStore.protoBook, 'set') + // Transport Manager creates signed peer record + await _createSelfPeerRecord(localIdentify._libp2p) + await _createSelfPeerRecord(remoteIdentify._libp2p) + // Run identify await Promise.all([ localIdentify.push([localConnectionMock]), @@ -257,7 +257,7 @@ describe('Identify', () => { }) ]) - expect(remoteIdentify.peerStore.addressBook.consumePeerRecord.callCount).to.equal(1) + expect(remoteIdentify.peerStore.addressBook.consumePeerRecord.callCount).to.equal(2) expect(remoteIdentify.peerStore.protoBook.set.callCount).to.equal(1) const addresses = localIdentify.peerStore.addressBook.get(localPeer) @@ -279,7 +279,6 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs }, @@ -293,7 +292,6 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager, - transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: [] } @@ -369,8 +367,8 @@ describe('Identify', () => { expect(connection).to.exist() // Wait for peer store to be updated - // Dialer._createDialTarget (add), Identify (consume), Create self (consume) - await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 2 && peerStoreSpyAdd.callCount === 1) + // Dialer._createDialTarget (add), Identify (consume) + await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1) expect(libp2p.identifyService.identify.callCount).to.equal(1) // The connection should have no open streams @@ -416,3 +414,18 @@ describe('Identify', () => { }) }) }) + +// Self peer record creating on Transport Manager simulation +const _createSelfPeerRecord = async (libp2p) => { + try { + const peerRecord = new PeerRecord({ + peerId: libp2p.peerId, + multiaddrs: libp2p.multiaddrs + }) + const envelope = await Envelope.seal(peerRecord, libp2p.peerId) + libp2p.peerStore.addressBook.consumePeerRecord(envelope) + + return libp2p.peerStore.addressBook.getRawEnvelope(libp2p.peerId) + } catch (_) {} + return null +} diff --git a/test/transports/transport-manager.node.js b/test/transports/transport-manager.node.js index 6e8cc12a..f8a1c633 100644 --- a/test/transports/transport-manager.node.js +++ b/test/transports/transport-manager.node.js @@ -4,12 +4,16 @@ const chai = require('chai') chai.use(require('dirty-chai')) const { expect } = chai +const sinon = require('sinon') const AddressManager = require('../../src/address-manager') const TransportManager = require('../../src/transport-manager') +const PeerStore = require('../../src/peer-store') const Transport = require('libp2p-tcp') +const PeerId = require('peer-id') const multiaddr = require('multiaddr') const mockUpgrader = require('../utils/mockUpgrader') +const Peers = require('../fixtures/peers') const addrs = [ multiaddr('/ip4/127.0.0.1/tcp/0'), multiaddr('/ip4/127.0.0.1/tcp/0') @@ -17,11 +21,17 @@ const addrs = [ describe('Transport Manager (TCP)', () => { let tm + let localPeer + + before(async () => { + localPeer = await PeerId.createFromJSON(Peers[0]) + }) before(() => { tm = new TransportManager({ libp2p: { - addressManager: new AddressManager({ listen: addrs }) + addressManager: new AddressManager({ listen: addrs }), + PeerStore: new PeerStore({ peerId: localPeer }) }, upgrader: mockUpgrader, onConnection: () => {} @@ -40,10 +50,16 @@ describe('Transport Manager (TCP)', () => { }) it('should be able to listen', async () => { + sinon.spy(tm, '_createSelfPeerRecord') + tm.add(Transport.prototype[Symbol.toStringTag], Transport) await tm.listen(addrs) expect(tm._listeners).to.have.key(Transport.prototype[Symbol.toStringTag]) expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(addrs.length) + + // Created Self Peer record on new listen address + expect(tm._createSelfPeerRecord.callCount).to.equal(addrs.length) + // Ephemeral ip addresses may result in multiple listeners expect(tm.getAddrs().length).to.equal(addrs.length) await tm.close()