mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-25 10:32:14 +00:00
chore: use listening events to create self peer record on updates
This commit is contained in:
parent
971655ff27
commit
ee8ee5b49b
@ -73,6 +73,7 @@
|
||||
* [`libp2p`](#libp2p)
|
||||
* [`libp2p.connectionManager`](#libp2pconnectionmanager)
|
||||
* [`libp2p.peerStore`](#libp2ppeerStore)
|
||||
* [`libp2p.transportManager`](#libp2ptransportmanager)
|
||||
* [Types](#types)
|
||||
* [`Stats`](#stats)
|
||||
|
||||
@ -1987,6 +1988,14 @@ This event will be triggered anytime we are disconnected from another peer, rega
|
||||
- `peerId`: instance of [`PeerId`][peer-id]
|
||||
- `protocols`: array of known, supported protocols for the peer (string identifiers)
|
||||
|
||||
### libp2p.transportManager
|
||||
|
||||
#### Listening addresses change
|
||||
|
||||
This event will be triggered anytime the listening addresses change.
|
||||
|
||||
`libp2p.transportManager.on('listening', () => {})`
|
||||
|
||||
## Types
|
||||
|
||||
### Stats
|
||||
|
@ -143,8 +143,7 @@ class AutoRelay {
|
||||
|
||||
try {
|
||||
await this._transportManager.listen([multiaddr(listenAddr)])
|
||||
// Announce multiaddrs update on listen success
|
||||
await this._libp2p.identifyService.pushToPeerStore()
|
||||
// Announce multiaddrs will update on listen success by TransportManager event being triggered
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
this._listenRelays.delete(id)
|
||||
|
@ -20,8 +20,8 @@ module.exports = (libp2p) => {
|
||||
const deleted = listeningAddrs.delete(connection.remotePeer.toB58String())
|
||||
|
||||
if (deleted) {
|
||||
// Announce multiaddrs update on listen success
|
||||
libp2p.identifyService.pushToPeerStore()
|
||||
// Announce listen addresses change
|
||||
listener.emit('listening')
|
||||
}
|
||||
})
|
||||
|
||||
|
@ -64,11 +64,10 @@ class IdentifyService {
|
||||
*/
|
||||
this.connectionManager = libp2p.connectionManager
|
||||
|
||||
this.connectionManager.on('peer:connect', (connection) => {
|
||||
const peerId = connection.remotePeer
|
||||
|
||||
this.identify(connection, peerId).catch(log.error)
|
||||
})
|
||||
/**
|
||||
* @property {TransportManager}
|
||||
*/
|
||||
this.transportManager = libp2p.transportManager
|
||||
|
||||
/**
|
||||
* @property {PeerId}
|
||||
@ -83,6 +82,18 @@ class IdentifyService {
|
||||
this._protocols = protocols
|
||||
|
||||
this.handleMessage = this.handleMessage.bind(this)
|
||||
|
||||
this.connectionManager.on('peer:connect', (connection) => {
|
||||
const peerId = connection.remotePeer
|
||||
|
||||
this.identify(connection, peerId).catch(log.error)
|
||||
})
|
||||
|
||||
// When new addresses are used for listening, update self peer record
|
||||
this.transportManager.on('listening', async () => {
|
||||
await this._createSelfPeerRecord()
|
||||
this.pushToPeerStore()
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
@ -315,34 +326,23 @@ class IdentifyService {
|
||||
|
||||
/**
|
||||
* Get self signed peer record raw envelope.
|
||||
*
|
||||
* @returns {Uint8Array}
|
||||
* @return {Promise<Uint8Array>}
|
||||
*/
|
||||
async _getSelfPeerRecord () {
|
||||
// Update self peer record if needed
|
||||
await this._createOrUpdateSelfPeerRecord()
|
||||
_getSelfPeerRecord () {
|
||||
const selfSignedPeerRecord = this.peerStore.addressBook.getRawEnvelope(this.peerId)
|
||||
|
||||
return this.peerStore.addressBook.getRawEnvelope(this.peerId)
|
||||
if (selfSignedPeerRecord) {
|
||||
return selfSignedPeerRecord
|
||||
}
|
||||
|
||||
return this._createSelfPeerRecord()
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates or updates the self peer record if it exists and is outdated.
|
||||
* @return {Promise<void>}
|
||||
* Create self signed peer record raw envelope.
|
||||
* @return {Uint8Array}
|
||||
*/
|
||||
async _createOrUpdateSelfPeerRecord () {
|
||||
const selfPeerRecordEnvelope = await this.peerStore.addressBook.getPeerRecord(this.peerId)
|
||||
|
||||
if (selfPeerRecordEnvelope) {
|
||||
const peerRecord = PeerRecord.createFromProtobuf(selfPeerRecordEnvelope.payload)
|
||||
|
||||
const mIntersection = peerRecord.multiaddrs.filter((m) => this._libp2p.multiaddrs.some((newM) => m.equals(newM)))
|
||||
if (mIntersection.length === this._libp2p.multiaddrs.length) {
|
||||
// Same multiaddrs as already existing in the record, no need to proceed
|
||||
return
|
||||
}
|
||||
}
|
||||
|
||||
// Create / Update Peer record
|
||||
async _createSelfPeerRecord () {
|
||||
try {
|
||||
const peerRecord = new PeerRecord({
|
||||
peerId: this.peerId,
|
||||
@ -350,9 +350,12 @@ class IdentifyService {
|
||||
})
|
||||
const envelope = await Envelope.seal(peerRecord, this.peerId)
|
||||
this.peerStore.addressBook.consumePeerRecord(envelope)
|
||||
|
||||
return this.peerStore.addressBook.getRawEnvelope(this.peerId)
|
||||
} catch (err) {
|
||||
log.error('failed to create self peer record')
|
||||
log.error('failed to get self peer record')
|
||||
}
|
||||
return null
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -269,7 +269,6 @@ class Libp2p extends EventEmitter {
|
||||
await this.transportManager.close()
|
||||
|
||||
ping.unmount(this)
|
||||
|
||||
this.dialer.destroy()
|
||||
} catch (err) {
|
||||
if (err) {
|
||||
|
@ -1,5 +1,6 @@
|
||||
'use strict'
|
||||
|
||||
const { EventEmitter } = require('events')
|
||||
const pSettle = require('p-settle')
|
||||
const { codes } = require('./errors')
|
||||
const errCode = require('err-code')
|
||||
@ -7,7 +8,11 @@ const debug = require('debug')
|
||||
const log = debug('libp2p:transports')
|
||||
log.error = debug('libp2p:transports:error')
|
||||
|
||||
class TransportManager {
|
||||
/**
|
||||
* Responsible for managing the transports and their listeners.
|
||||
* @fires TransportManager#listening Emitted when listening addresses change.
|
||||
*/
|
||||
class TransportManager extends EventEmitter {
|
||||
/**
|
||||
* @class
|
||||
* @param {object} options
|
||||
@ -16,6 +21,8 @@ class TransportManager {
|
||||
* @param {boolean} [options.faultTolerance = FAULT_TOLERANCE.FATAL_ALL] - Address listen error tolerance.
|
||||
*/
|
||||
constructor ({ libp2p, upgrader, faultTolerance = FAULT_TOLERANCE.FATAL_ALL }) {
|
||||
super()
|
||||
|
||||
this.libp2p = libp2p
|
||||
this.upgrader = upgrader
|
||||
this._transports = new Map()
|
||||
@ -63,6 +70,7 @@ class TransportManager {
|
||||
log('closing listeners for %s', key)
|
||||
while (listeners.length) {
|
||||
const listener = listeners.pop()
|
||||
listener.removeAllListeners('listening')
|
||||
tasks.push(listener.close())
|
||||
}
|
||||
}
|
||||
@ -156,6 +164,9 @@ class TransportManager {
|
||||
const listener = transport.createListener({}, this.onConnection)
|
||||
this._listeners.get(key).push(listener)
|
||||
|
||||
// Track listen events
|
||||
listener.on('listening', () => this.emit('listening'))
|
||||
|
||||
// We need to attempt to listen on everything
|
||||
tasks.push(listener.listen(addr))
|
||||
}
|
||||
@ -200,6 +211,7 @@ class TransportManager {
|
||||
if (this._listeners.has(key)) {
|
||||
// Close any running listeners
|
||||
for (const listener of this._listeners.get(key)) {
|
||||
listener.removeAllListeners('listening')
|
||||
await listener.close()
|
||||
}
|
||||
}
|
||||
|
@ -52,6 +52,7 @@ describe('Identify', () => {
|
||||
libp2p: {
|
||||
peerId: localPeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
transportManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: localPeer }),
|
||||
multiaddrs: listenMaddrs
|
||||
},
|
||||
@ -62,6 +63,7 @@ describe('Identify', () => {
|
||||
libp2p: {
|
||||
peerId: remotePeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
transportManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: remotePeer }),
|
||||
multiaddrs: listenMaddrs
|
||||
},
|
||||
@ -105,6 +107,7 @@ describe('Identify', () => {
|
||||
libp2p: {
|
||||
peerId: localPeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
transportManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: localPeer }),
|
||||
multiaddrs: listenMaddrs
|
||||
},
|
||||
@ -115,6 +118,7 @@ describe('Identify', () => {
|
||||
libp2p: {
|
||||
peerId: remotePeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
transportManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: remotePeer }),
|
||||
multiaddrs: listenMaddrs
|
||||
},
|
||||
@ -164,6 +168,7 @@ describe('Identify', () => {
|
||||
libp2p: {
|
||||
peerId: localPeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
transportManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: localPeer }),
|
||||
multiaddrs: []
|
||||
},
|
||||
@ -173,6 +178,7 @@ describe('Identify', () => {
|
||||
libp2p: {
|
||||
peerId: remotePeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
transportManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: remotePeer }),
|
||||
multiaddrs: []
|
||||
},
|
||||
@ -210,6 +216,7 @@ describe('Identify', () => {
|
||||
libp2p: {
|
||||
peerId: localPeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
transportManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: localPeer }),
|
||||
multiaddrs: listenMaddrs
|
||||
},
|
||||
@ -223,6 +230,7 @@ describe('Identify', () => {
|
||||
libp2p: {
|
||||
peerId: remotePeer,
|
||||
connectionManager,
|
||||
transportManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: remotePeer }),
|
||||
multiaddrs: []
|
||||
}
|
||||
@ -271,6 +279,7 @@ describe('Identify', () => {
|
||||
libp2p: {
|
||||
peerId: localPeer,
|
||||
connectionManager: new EventEmitter(),
|
||||
transportManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: localPeer }),
|
||||
multiaddrs: listenMaddrs
|
||||
},
|
||||
@ -284,6 +293,7 @@ describe('Identify', () => {
|
||||
libp2p: {
|
||||
peerId: remotePeer,
|
||||
connectionManager,
|
||||
transportManager: new EventEmitter(),
|
||||
peerStore: new PeerStore({ peerId: remotePeer }),
|
||||
multiaddrs: []
|
||||
}
|
||||
|
Loading…
x
Reference in New Issue
Block a user