diff --git a/doc/API.md b/doc/API.md index 4b412dd9..cd7eedd3 100644 --- a/doc/API.md +++ b/doc/API.md @@ -71,6 +71,7 @@ * [`libp2p`](#libp2p) * [`libp2p.connectionManager`](#libp2pconnectionmanager) * [`libp2p.peerStore`](#libp2ppeerStore) + * [`libp2p.transportManager`](#libp2ptransportmanager) * [Types](#types) * [`Stats`](#stats) @@ -1985,6 +1986,14 @@ This event will be triggered anytime we are disconnected from another peer, rega - `peerId`: instance of [`PeerId`][peer-id] - `protocols`: array of known, supported protocols for the peer (string identifiers) +### libp2p.transportManager + +#### Listening addresses change + +This event will be triggered anytime the listening addresses change. + +`libp2p.transportManager.on('listening', () => {})` + ## Types ### Stats diff --git a/src/circuit/auto-relay.js b/src/circuit/auto-relay.js index 71e95131..bec8c3a8 100644 --- a/src/circuit/auto-relay.js +++ b/src/circuit/auto-relay.js @@ -143,8 +143,7 @@ class AutoRelay { try { await this._transportManager.listen([multiaddr(listenAddr)]) - // Announce multiaddrs update on listen success - await this._libp2p.identifyService.pushToPeerStore() + // Announce multiaddrs will update on listen success by TransportManager event being triggered } catch (err) { log.error(err) this._listenRelays.delete(id) diff --git a/src/circuit/listener.js b/src/circuit/listener.js index 427a8ffe..b309d0bb 100644 --- a/src/circuit/listener.js +++ b/src/circuit/listener.js @@ -20,8 +20,8 @@ module.exports = (libp2p) => { const deleted = listeningAddrs.delete(connection.remotePeer.toB58String()) if (deleted) { - // Announce multiaddrs update on listen success - libp2p.identifyService.pushToPeerStore() + // Announce listen addresses change + listener.emit('listening') } }) diff --git a/src/identify/index.js b/src/identify/index.js index 09e0c428..5b2c454a 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -63,11 +63,10 @@ class IdentifyService { */ this.connectionManager = libp2p.connectionManager - this.connectionManager.on('peer:connect', (connection) => { - const peerId = connection.remotePeer - - this.identify(connection, peerId).catch(log.error) - }) + /** + * @property {TransportManager} + */ + this.transportManager = libp2p.transportManager /** * @property {PeerId} @@ -82,6 +81,18 @@ class IdentifyService { this._protocols = protocols this.handleMessage = this.handleMessage.bind(this) + + this.connectionManager.on('peer:connect', (connection) => { + const peerId = connection.remotePeer + + this.identify(connection, peerId).catch(log.error) + }) + + // When new addresses are used for listening, update self peer record + this.transportManager.on('listening', async () => { + await this._createSelfPeerRecord() + this.pushToPeerStore() + }) } /** @@ -311,33 +322,23 @@ class IdentifyService { /** * Get self signed peer record raw envelope. - * @return {Uint8Array} + * @return {Promise} */ - async _getSelfPeerRecord () { - // Update self peer record if needed - await this._createOrUpdateSelfPeerRecord() + _getSelfPeerRecord () { + const selfSignedPeerRecord = this.peerStore.addressBook.getRawEnvelope(this.peerId) - return this.peerStore.addressBook.getRawEnvelope(this.peerId) + if (selfSignedPeerRecord) { + return selfSignedPeerRecord + } + + return this._createSelfPeerRecord() } /** - * Creates or updates the self peer record if it exists and is outdated. - * @return {Promise} + * Create self signed peer record raw envelope. + * @return {Uint8Array} */ - async _createOrUpdateSelfPeerRecord () { - const selfPeerRecordEnvelope = await this.peerStore.addressBook.getPeerRecord(this.peerId) - - if (selfPeerRecordEnvelope) { - const peerRecord = PeerRecord.createFromProtobuf(selfPeerRecordEnvelope.payload) - - const mIntersection = peerRecord.multiaddrs.filter((m) => this._libp2p.multiaddrs.some((newM) => m.equals(newM))) - if (mIntersection.length === this._libp2p.multiaddrs.length) { - // Same multiaddrs as already existing in the record, no need to proceed - return - } - } - - // Create / Update Peer record + async _createSelfPeerRecord () { try { const peerRecord = new PeerRecord({ peerId: this.peerId, @@ -345,9 +346,12 @@ class IdentifyService { }) const envelope = await Envelope.seal(peerRecord, this.peerId) this.peerStore.addressBook.consumePeerRecord(envelope) + + return this.peerStore.addressBook.getRawEnvelope(this.peerId) } catch (err) { - log.error('failed to create self peer record') + log.error('failed to get self peer record') } + return null } } diff --git a/src/index.js b/src/index.js index c48b26b3..806e80d4 100644 --- a/src/index.js +++ b/src/index.js @@ -268,7 +268,6 @@ class Libp2p extends EventEmitter { await this.transportManager.close() ping.unmount(this) - this.dialer.destroy() } catch (err) { if (err) { diff --git a/src/transport-manager.js b/src/transport-manager.js index 5030c594..4ebf9fff 100644 --- a/src/transport-manager.js +++ b/src/transport-manager.js @@ -1,5 +1,6 @@ 'use strict' +const { EventEmitter } = require('events') const pSettle = require('p-settle') const { codes } = require('./errors') const errCode = require('err-code') @@ -7,7 +8,11 @@ const debug = require('debug') const log = debug('libp2p:transports') log.error = debug('libp2p:transports:error') -class TransportManager { +/** + * Responsible for managing the transports and their listeners. + * @fires TransportManager#listening Emitted when listening addresses change. + */ +class TransportManager extends EventEmitter { /** * @constructor * @param {object} options @@ -16,6 +21,8 @@ class TransportManager { * @param {boolean} [options.faultTolerance = FAULT_TOLERANCE.FATAL_ALL] Address listen error tolerance. */ constructor ({ libp2p, upgrader, faultTolerance = FAULT_TOLERANCE.FATAL_ALL }) { + super() + this.libp2p = libp2p this.upgrader = upgrader this._transports = new Map() @@ -62,6 +69,7 @@ class TransportManager { log('closing listeners for %s', key) while (listeners.length) { const listener = listeners.pop() + listener.removeAllListeners('listening') tasks.push(listener.close()) } } @@ -150,6 +158,9 @@ class TransportManager { const listener = transport.createListener({}, this.onConnection) this._listeners.get(key).push(listener) + // Track listen events + listener.on('listening', () => this.emit('listening')) + // We need to attempt to listen on everything tasks.push(listener.listen(addr)) } @@ -194,6 +205,7 @@ class TransportManager { if (this._listeners.has(key)) { // Close any running listeners for (const listener of this._listeners.get(key)) { + listener.removeAllListeners('listening') await listener.close() } } diff --git a/test/identify/index.spec.js b/test/identify/index.spec.js index 799a3ebd..2a70e527 100644 --- a/test/identify/index.spec.js +++ b/test/identify/index.spec.js @@ -52,6 +52,7 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs }, @@ -62,6 +63,7 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: listenMaddrs }, @@ -105,6 +107,7 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs }, @@ -115,6 +118,7 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: listenMaddrs }, @@ -164,6 +168,7 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: [] }, @@ -173,6 +178,7 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: [] }, @@ -210,6 +216,7 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs }, @@ -223,6 +230,7 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager, + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: [] } @@ -271,6 +279,7 @@ describe('Identify', () => { libp2p: { peerId: localPeer, connectionManager: new EventEmitter(), + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: localPeer }), multiaddrs: listenMaddrs }, @@ -284,6 +293,7 @@ describe('Identify', () => { libp2p: { peerId: remotePeer, connectionManager, + transportManager: new EventEmitter(), peerStore: new PeerStore({ peerId: remotePeer }), multiaddrs: [] }