diff --git a/doc/API.md b/doc/API.md index 84d97151..11ddeb56 100644 --- a/doc/API.md +++ b/doc/API.md @@ -73,6 +73,7 @@ * [`libp2p`](#libp2p) * [`libp2p.connectionManager`](#libp2pconnectionmanager) * [`libp2p.peerStore`](#libp2ppeerStore) + * [`libp2p.transportManager`](#libp2ptransportmanager) * [Types](#types) * [`Stats`](#stats) @@ -2019,6 +2020,14 @@ This event will be triggered anytime we are disconnected from another peer, rega - `peerId`: instance of [`PeerId`][peer-id] - `protocols`: array of known, supported protocols for the peer (string identifiers) +### libp2p.transportManager + +#### Listening addresses change + +This event will be triggered anytime the listening addresses change. + +`libp2p.transportManager.on('listening', () => {})` + ## Types ### Stats diff --git a/src/circuit/auto-relay.js b/src/circuit/auto-relay.js index 71e95131..bec8c3a8 100644 --- a/src/circuit/auto-relay.js +++ b/src/circuit/auto-relay.js @@ -143,8 +143,7 @@ class AutoRelay { try { await this._transportManager.listen([multiaddr(listenAddr)]) - // Announce multiaddrs update on listen success - await this._libp2p.identifyService.pushToPeerStore() + // Announce multiaddrs will update on listen success by TransportManager event being triggered } catch (err) { log.error(err) this._listenRelays.delete(id) diff --git a/src/circuit/listener.js b/src/circuit/listener.js index 59ca0cec..207150e8 100644 --- a/src/circuit/listener.js +++ b/src/circuit/listener.js @@ -20,8 +20,8 @@ module.exports = (libp2p) => { const deleted = listeningAddrs.delete(connection.remotePeer.toB58String()) if (deleted) { - // Announce multiaddrs update on listen success - libp2p.identifyService.pushToPeerStore() + // Announce listen addresses change + listener.emit('listening') } }) diff --git a/src/identify/index.js b/src/identify/index.js index 3e3b01ab..44e2f630 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -64,11 +64,10 @@ class IdentifyService { */ this.connectionManager = libp2p.connectionManager - this.connectionManager.on('peer:connect', (connection) => { - const peerId = connection.remotePeer - - this.identify(connection, peerId).catch(log.error) - }) + /** + * @property {TransportManager} + */ + this.transportManager = libp2p.transportManager /** * @property {PeerId} @@ -93,6 +92,15 @@ class IdentifyService { this.peerStore.metadataBook.set(this.peerId, 'AgentVersion', uint8ArrayFromString(this._host.agentVersion)) this.peerStore.metadataBook.set(this.peerId, 'ProtocolVersion', uint8ArrayFromString(this._host.protocolVersion)) + this.connectionManager.on('peer:connect', (connection) => { + this.identify(connection).catch(log.error) + }) + + // When new addresses are used for listening, update self peer record + this.transportManager.on('listening', async () => { + await this._createSelfPeerRecord() + this.pushToPeerStore() + }) } /** @@ -325,34 +333,23 @@ class IdentifyService { /** * Get self signed peer record raw envelope. - * - * @returns {Uint8Array} + * @return {Promise} */ - async _getSelfPeerRecord () { - // Update self peer record if needed - await this._createOrUpdateSelfPeerRecord() + _getSelfPeerRecord () { + const selfSignedPeerRecord = this.peerStore.addressBook.getRawEnvelope(this.peerId) - return this.peerStore.addressBook.getRawEnvelope(this.peerId) + if (selfSignedPeerRecord) { + return selfSignedPeerRecord + } + + return this._createSelfPeerRecord() } /** - * Creates or updates the self peer record if it exists and is outdated. - * @return {Promise} + * Create self signed peer record raw envelope. + * @return {Uint8Array} */ - async _createOrUpdateSelfPeerRecord () { - const selfPeerRecordEnvelope = await this.peerStore.addressBook.getPeerRecord(this.peerId) - - if (selfPeerRecordEnvelope) { - const peerRecord = PeerRecord.createFromProtobuf(selfPeerRecordEnvelope.payload) - - const mIntersection = peerRecord.multiaddrs.filter((m) => this._libp2p.multiaddrs.some((newM) => m.equals(newM))) - if (mIntersection.length === this._libp2p.multiaddrs.length) { - // Same multiaddrs as already existing in the record, no need to proceed - return - } - } - - // Create / Update Peer record + async _createSelfPeerRecord () { try { const peerRecord = new PeerRecord({ peerId: this.peerId, @@ -360,9 +357,12 @@ class IdentifyService { }) const envelope = await Envelope.seal(peerRecord, this.peerId) this.peerStore.addressBook.consumePeerRecord(envelope) + + return this.peerStore.addressBook.getRawEnvelope(this.peerId) } catch (err) { - log.error('failed to create self peer record') + log.error('failed to get self peer record') } + return null } } diff --git a/src/index.js b/src/index.js index b62df6ee..5c5456af 100644 --- a/src/index.js +++ b/src/index.js @@ -269,7 +269,6 @@ class Libp2p extends EventEmitter { await this.transportManager.close() ping.unmount(this) - this.dialer.destroy() } catch (err) { if (err) { diff --git a/src/transport-manager.js b/src/transport-manager.js index a324f07a..d7b2af80 100644 --- a/src/transport-manager.js +++ b/src/transport-manager.js @@ -1,5 +1,6 @@ 'use strict' +const { EventEmitter } = require('events') const pSettle = require('p-settle') const { codes } = require('./errors') const errCode = require('err-code') @@ -7,7 +8,11 @@ const debug = require('debug') const log = debug('libp2p:transports') log.error = debug('libp2p:transports:error') -class TransportManager { +/** + * Responsible for managing the transports and their listeners. + * @fires TransportManager#listening Emitted when listening addresses change. + */ +class TransportManager extends EventEmitter { /** * @class * @param {object} options @@ -16,6 +21,8 @@ class TransportManager { * @param {boolean} [options.faultTolerance = FAULT_TOLERANCE.FATAL_ALL] - Address listen error tolerance. */ constructor ({ libp2p, upgrader, faultTolerance = FAULT_TOLERANCE.FATAL_ALL }) { + super() + this.libp2p = libp2p this.upgrader = upgrader this._transports = new Map() @@ -65,6 +72,7 @@ class TransportManager { log('closing listeners for %s', key) while (listeners.length) { const listener = listeners.pop() + listener.removeAllListeners('listening') tasks.push(listener.close()) } } @@ -158,6 +166,9 @@ class TransportManager { const listener = transport.createListener(this._listenerOptions.get(key), this.onConnection) this._listeners.get(key).push(listener) + // Track listen events + listener.on('listening', () => this.emit('listening')) + // We need to attempt to listen on everything tasks.push(listener.listen(addr)) } @@ -202,6 +213,7 @@ class TransportManager { if (this._listeners.has(key)) { // Close any running listeners for (const listener of this._listeners.get(key)) { + listener.removeAllListeners('listening') await listener.close() } } diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index 74394dd1..3b3a631c 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -52,6 +52,7 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs, _options: { host: {} } @@ -63,6 +64,7 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: listenMaddrs, _options: { host: {} } @@ -107,6 +109,7 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs, _options: { host: {} } @@ -118,6 +121,7 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: listenMaddrs, _options: { host: {} } @@ -168,6 +172,7 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: [], _options: { host: {} } @@ -178,6 +183,7 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: [], _options: { host: {} } @@ -246,6 +252,7 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs, _options: { host: {} } @@ -260,6 +267,7 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager, + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: [], _options: { host: {} } @@ -309,6 +317,7 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs, _options: { host: {} } @@ -323,6 +332,7 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager, + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: [], _options: { host: {} }