Compare commits

...

8 Commits

Author SHA1 Message Date
87da0cd86d feat: discovery api 2020-10-02 11:54:11 +02:00
894d8e3cac chore: address review 2020-09-25 18:00:17 +02:00
6e94724074 chore: add identify test for multiaddr change 2020-09-25 18:00:17 +02:00
6b7b1d7714 chore: create signed peer record on new listen addresses in transport manager 2020-09-25 18:00:17 +02:00
1bb6735cd3 chore: use listening events to create self peer record on updates 2020-09-25 18:00:17 +02:00
4848313cb3 chore: _isStarted is false when stop starts 2020-09-25 18:00:17 +02:00
da5785d18e chore: auto relay multiaddr update push 2020-09-25 18:00:17 +02:00
72fe150e64 feat: auto relay (#723)
* feat: auto relay

* fix: leverage protoBook events to ask relay peers if they support hop

* chore: refactor disconnect

* chore: do not listen on a relayed conn

* chore: tweaks

* chore: improve _listenOnAvailableHopRelays logic

* chore: default value of 1 to maxListeners on auto-relay
2020-09-25 17:35:38 +02:00
24 changed files with 1167 additions and 80 deletions

View File

@ -20,6 +20,8 @@
* [`contentRouting.put`](#contentroutingput) * [`contentRouting.put`](#contentroutingput)
* [`contentRouting.get`](#contentroutingget) * [`contentRouting.get`](#contentroutingget)
* [`contentRouting.getMany`](#contentroutinggetmany) * [`contentRouting.getMany`](#contentroutinggetmany)
* [`discovery.advertise`](#discoveryadvertise)
* [`discovery.findPeers`](#discoveryfindpeers)
* [`peerRouting.findPeer`](#peerroutingfindpeer) * [`peerRouting.findPeer`](#peerroutingfindpeer)
* [`peerStore.addressBook.add`](#peerstoreaddressbookadd) * [`peerStore.addressBook.add`](#peerstoreaddressbookadd)
* [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete) * [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete)
@ -666,6 +668,64 @@ const key = '/key'
const { from, val } = await libp2p.contentRouting.get(key) const { from, val } = await libp2p.contentRouting.get(key)
``` ```
### discovery.advertise
Advertise services on the network.
This will use content routing modules and rendezvous if enabled.
`libp2p.discovery.advertise(namespace, options)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| namespace | `string` | namespace of the service to advertise |
| [options] | `object` | advertise options |
#### Returns
| Type | Description |
|------|-------------|
| `Promise<void>` | Promise resolves once advertise messages are sent |
#### Example
```js
// ...
const namespace = '/libp2p/relay'
await libp2p.discovery.advertise(namespace)
```
### discovery.findPeers
Discover peers providing a given service.
This will use content routing modules and rendezvous if enabled and will store the peers data in the PeerStore.
`libp2p.discovery.findPeers(namespace, options)`
#### Parameters
| Name | Type | Description |
|------|------|-------------|
| namespace | `string` | namespace of the service to find peers |
| [options] | `object` | find peers options |
| [options.limit] | `number` | number of distinct peers to find |
#### Returns
| Type | Description |
|------|-------------|
| `AsyncIterable<PeerId>}` | Async iterator for peers |
#### Example
```js
// Iterate over the peers found for the given namespace
for await (const peer of libp2p.discovery.findPeers(namespace)) {
console.log(peer)
}
```
### peerRouting.findPeer ### peerRouting.findPeer
Iterates over all peer routers in series to find the given peer. If the DHT is enabled, it will be tried first. Iterates over all peer routers in series to find the given peer. If the DHT is enabled, it will be tried first.

230
src/circuit/auto-relay.js Normal file
View File

@ -0,0 +1,230 @@
'use strict'
const debug = require('debug')
const log = debug('libp2p:auto-relay')
log.error = debug('libp2p:auto-relay:error')
const uint8ArrayFromString = require('uint8arrays/from-string')
const uint8ArrayToString = require('uint8arrays/to-string')
const multiaddr = require('multiaddr')
const PeerId = require('peer-id')
const { relay: multicodec } = require('./multicodec')
const { canHop } = require('./circuit/hop')
const circuitProtoCode = 290
const hopMetadataKey = 'hop_relay'
const hopMetadataValue = 'true'
class AutoRelay {
/**
* Creates an instance of AutoRelay.
* @constructor
* @param {object} props
* @param {Libp2p} props.libp2p
* @param {number} [props.maxListeners = 1] maximum number of relays to listen.
*/
constructor ({ libp2p, maxListeners = 1 }) {
this._libp2p = libp2p
this._peerId = libp2p.peerId
this._peerStore = libp2p.peerStore
this._connectionManager = libp2p.connectionManager
this._transportManager = libp2p.transportManager
this.maxListeners = maxListeners
/**
* @type {Set<string>}
*/
this._listenRelays = new Set()
this._onProtocolChange = this._onProtocolChange.bind(this)
this._onPeerDisconnected = this._onPeerDisconnected.bind(this)
this._peerStore.on('change:protocols', this._onProtocolChange)
this._connectionManager.on('peer:disconnect', this._onPeerDisconnected)
}
/**
* Check if a peer supports the relay protocol.
* If the protocol is not supported, check if it was supported before and remove it as a listen relay.
* If the protocol is supported, check if the peer supports **HOP** and add it as a listener if
* inside the threshold.
* @param {Object} props
* @param {PeerId} props.peerId
* @param {Array<string>} props.protocols
* @return {Promise<void>}
*/
async _onProtocolChange ({ peerId, protocols }) {
const id = peerId.toB58String()
// Check if it has the protocol
const hasProtocol = protocols.find(protocol => protocol === multicodec)
// If no protocol, check if we were keeping the peer before as a listenRelay
if (!hasProtocol && this._listenRelays.has(id)) {
this._removeListenRelay(id)
return
} else if (!hasProtocol || this._listenRelays.has(id)) {
return
}
// If protocol, check if can hop, store info in the metadataBook and listen on it
try {
const connection = this._connectionManager.get(peerId)
// Do not hop on a relayed connection
if (connection.remoteAddr.protoCodes().includes(circuitProtoCode)) {
log(`relayed connection to ${id} will not be used to hop on`)
return
}
const supportsHop = await canHop({ connection })
if (supportsHop) {
this._peerStore.metadataBook.set(peerId, hopMetadataKey, uint8ArrayFromString(hopMetadataValue))
await this._addListenRelay(connection, id)
}
} catch (err) {
log.error(err)
}
}
/**
* Peer disconnects.
* @param {Connection} connection connection to the peer
* @return {void}
*/
_onPeerDisconnected (connection) {
const peerId = connection.remotePeer
const id = peerId.toB58String()
// Not listening on this relay
if (!this._listenRelays.has(id)) {
return
}
this._removeListenRelay(id)
}
/**
* Attempt to listen on the given relay connection.
* @private
* @param {Connection} connection connection to the peer
* @param {string} id peer identifier string
* @return {Promise<void>}
*/
async _addListenRelay (connection, id) {
// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}
// Create relay listen addr
let listenAddr, remoteMultiaddr
try {
const remoteAddrs = this._peerStore.addressBook.get(connection.remotePeer)
// TODO: HOP Relays should avoid advertising private addresses!
remoteMultiaddr = remoteAddrs.find(a => a.isCertified).multiaddr // Get first announced address certified
} catch (_) {
log.error(`${id} does not have announced certified multiaddrs`)
return
}
if (!remoteMultiaddr.protoNames().includes('p2p')) {
listenAddr = `${remoteMultiaddr.toString()}/p2p/${connection.remotePeer.toB58String()}/p2p-circuit`
} else {
listenAddr = `${remoteMultiaddr.toString()}/p2p-circuit`
}
// Attempt to listen on relay
this._listenRelays.add(id)
try {
await this._transportManager.listen([multiaddr(listenAddr)])
// Announce multiaddrs will update on listen success by TransportManager event being triggered
} catch (err) {
log.error(err)
this._listenRelays.delete(id)
}
}
/**
* Remove listen relay.
* @private
* @param {string} id peer identifier string.
* @return {void}
*/
_removeListenRelay (id) {
if (this._listenRelays.delete(id)) {
// TODO: this should be responsibility of the connMgr
this._listenOnAvailableHopRelays([id])
}
}
/**
* Try to listen on available hop relay connections.
* The following order will happen while we do not have enough relays.
* 1. Check the metadata store for known relays, try to listen on the ones we are already connected.
* 2. Dial and try to listen on the peers we know that support hop but are not connected.
* 3. Search the network.
* @param {Array<string>} [peersToIgnore]
* @return {Promise<void>}
*/
async _listenOnAvailableHopRelays (peersToIgnore = []) {
// TODO: The peer redial issue on disconnect should be handled by connection gating
// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}
const knownHopsToDial = []
// Check if we have known hop peers to use and attempt to listen on the already connected
for (const [id, metadataMap] of this._peerStore.metadataBook.data.entries()) {
// Continue to next if listening on this or peer to ignore
if (this._listenRelays.has(id) || peersToIgnore.includes(id)) {
continue
}
const supportsHop = metadataMap.get(hopMetadataKey)
// Continue to next if it does not support Hop
if (!supportsHop || uint8ArrayToString(supportsHop) !== hopMetadataValue) {
continue
}
const peerId = PeerId.createFromCID(id)
const connection = this._connectionManager.get(peerId)
// If not connected, store for possible later use.
if (!connection) {
knownHopsToDial.push(peerId)
continue
}
await this._addListenRelay(connection, id)
// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}
}
// Try to listen on known peers that are not connected
for (const peerId of knownHopsToDial) {
const connection = await this._libp2p.dial(peerId)
await this._addListenRelay(connection, peerId.toB58String())
// Check if already listening on enough relays
if (this._listenRelays.size >= this.maxListeners) {
return
}
}
// TODO: Try to find relays to hop on the network
}
}
module.exports = AutoRelay

View File

@ -117,6 +117,33 @@ module.exports.hop = async function hop ({
throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED) throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED)
} }
/**
* Performs a CAN_HOP request to a relay peer, in order to understand its capabilities.
* @param {object} options
* @param {Connection} options.connection Connection to the relay
* @returns {Promise<boolean>}
*/
module.exports.canHop = async function canHop ({
connection
}) {
// Create a new stream to the relay
const { stream } = await connection.newStream([multicodec.relay])
// Send the HOP request
const streamHandler = new StreamHandler({ stream })
streamHandler.write({
type: CircuitPB.Type.CAN_HOP
})
const response = await streamHandler.read()
await streamHandler.close()
if (response.code !== CircuitPB.Status.SUCCESS) {
return false
}
return true
}
/** /**
* Creates an unencoded CAN_HOP response based on the Circuits configuration * Creates an unencoded CAN_HOP response based on the Circuits configuration
* @private * @private

View File

@ -1,16 +1,18 @@
'use strict' 'use strict'
const debug = require('debug')
const log = debug('libp2p:circuit')
log.error = debug('libp2p:circuit:error')
const mafmt = require('mafmt') const mafmt = require('mafmt')
const multiaddr = require('multiaddr') const multiaddr = require('multiaddr')
const PeerId = require('peer-id') const PeerId = require('peer-id')
const withIs = require('class-is') const withIs = require('class-is')
const { CircuitRelay: CircuitPB } = require('./protocol') const { CircuitRelay: CircuitPB } = require('./protocol')
const debug = require('debug')
const log = debug('libp2p:circuit')
log.error = debug('libp2p:circuit:error')
const toConnection = require('libp2p-utils/src/stream-to-ma-conn') const toConnection = require('libp2p-utils/src/stream-to-ma-conn')
const AutoRelay = require('./auto-relay')
const { relay: multicodec } = require('./multicodec') const { relay: multicodec } = require('./multicodec')
const createListener = require('./listener') const createListener = require('./listener')
const { handleCanHop, handleHop, hop } = require('./circuit/hop') const { handleCanHop, handleHop, hop } = require('./circuit/hop')
@ -35,11 +37,19 @@ class Circuit {
this._libp2p = libp2p this._libp2p = libp2p
this.peerId = libp2p.peerId this.peerId = libp2p.peerId
this._registrar.handle(multicodec, this._onProtocol.bind(this)) this._registrar.handle(multicodec, this._onProtocol.bind(this))
// Create autoRelay if enabled
this._autoRelay = this._options.autoRelay.enabled && new AutoRelay({ libp2p, ...this._options.autoRelay })
} }
async _onProtocol ({ connection, stream, protocol }) { async _onProtocol ({ connection, stream }) {
const streamHandler = new StreamHandler({ stream }) const streamHandler = new StreamHandler({ stream })
const request = await streamHandler.read() const request = await streamHandler.read()
if (!request) {
return
}
const circuit = this const circuit = this
let virtualConnection let virtualConnection
@ -163,7 +173,7 @@ class Circuit {
// Called on successful HOP and STOP requests // Called on successful HOP and STOP requests
this.handler = handler this.handler = handler
return createListener(this, options) return createListener(this._libp2p, options)
} }
/** /**

View File

@ -8,13 +8,23 @@ const log = debug('libp2p:circuit:listener')
log.err = debug('libp2p:circuit:error:listener') log.err = debug('libp2p:circuit:error:listener')
/** /**
* @param {*} circuit * @param {Libp2p} libp2p
* @returns {Listener} a transport listener * @returns {Listener} a transport listener
*/ */
module.exports = (circuit) => { module.exports = (libp2p) => {
const listener = new EventEmitter() const listener = new EventEmitter()
const listeningAddrs = new Map() const listeningAddrs = new Map()
// Remove listeningAddrs when a peer disconnects
libp2p.connectionManager.on('peer:disconnect', (connection) => {
const deleted = listeningAddrs.delete(connection.remotePeer.toB58String())
if (deleted) {
// Announce listen addresses change
listener.emit('close')
}
})
/** /**
* Add swarm handler and listen for incoming connections * Add swarm handler and listen for incoming connections
* *
@ -24,7 +34,7 @@ module.exports = (circuit) => {
listener.listen = async (addr) => { listener.listen = async (addr) => {
const addrString = String(addr).split('/p2p-circuit').find(a => a !== '') const addrString = String(addr).split('/p2p-circuit').find(a => a !== '')
const relayConn = await circuit._dialer.connectToPeer(multiaddr(addrString)) const relayConn = await libp2p.dial(multiaddr(addrString))
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit') const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')
listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr) listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr)

View File

@ -54,6 +54,10 @@ const DefaultConfig = {
hop: { hop: {
enabled: false, enabled: false,
active: false active: false
},
autoRelay: {
enabled: false,
maxListeners: 2
} }
}, },
transport: {} transport: {}

View File

@ -16,6 +16,7 @@ module.exports = (node) => {
} }
return { return {
routers,
/** /**
* Iterates over all content routers in series to find providers of the given key. * Iterates over all content routers in series to find providers of the given key.
* Once a content router succeeds, iteration will stop. * Once a content router succeeds, iteration will stop.
@ -31,6 +32,8 @@ module.exports = (node) => {
throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE') throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')
} }
// TODO: Abortables
const result = await pAny( const result = await pAny(
routers.map(async (router) => { routers.map(async (router) => {
const provs = await all(router.findProviders(key, options)) const provs = await all(router.findProviders(key, options))

96
src/discovery/index.js Normal file
View File

@ -0,0 +1,96 @@
'use strict'
const debug = require('debug')
const log = debug('libp2p:discovery')
log.error = debug('libp2p:discovery:error')
const errCode = require('err-code')
// const AbortController = require('abort-controller')
const { parallelMerge } = require('streaming-iterables')
const Rendezvous = require('./rendezvous')
const Routing = require('./routing')
const { codes } = require('../errors')
module.exports = (libp2p) => {
const addressBook = libp2p.peerStore.addressBook
const routing = Routing(libp2p)
const rendezvous = Rendezvous(libp2p)
const getDiscoveryAvailableIts = (namespace, options) => {
if (routing.isEnabled && rendezvous.isEnabled) {
return parallelMerge(
routing.findPeers(namespace, options),
rendezvous.findPeers(namespace, options)
)
} else if (routing.isEnabled) {
return routing.findPeers(namespace, options)
}
return rendezvous.findPeers(namespace, options)
}
return {
/**
* Advertise services on the network.
* @param {string} namespace
* @param {object} [options]
* @returns {Promise<void>}
*/
async advertise (namespace, options) {
if (!routing.isEnabled && !rendezvous.isEnabled) {
throw errCode(new Error('no discovery implementations available'), codes.ERR_NO_DISCOVERY_IMPLEMENTATIONS)
}
return Promise.all([
routing.isEnabled && routing.advertise(namespace, options),
rendezvous.isEnabled && rendezvous.advertise(namespace, options)
])
},
/**
* Discover peers providing a given service.
* @param {string} namespace
* @param {object} [options]
* @param {number} [options.limit] number of distinct peers to find
* @param {AsyncIterable<PeerId>}
*/
async * findPeers (namespace, options = {}) {
if (!routing.isEnabled && !rendezvous.isEnabled) {
throw errCode(new Error('no discovery implementations available'), codes.ERR_NO_DISCOVERY_IMPLEMENTATIONS)
}
const discoveredPeers = new Set()
// TODO: add abort controller
const discAsyncIt = getDiscoveryAvailableIts(namespace, options)
// Store in the AddressBook: signed record or uncertified
for await (const { signedPeerRecord, id, multiaddrs } of discAsyncIt) {
if (signedPeerRecord) {
const idStr = signedPeerRecord.peerId.toB58String()
const isNew = !discoveredPeers.has(idStr)
discoveredPeers.add(idStr)
// Consume peer record and yield if new
if (addressBook.consumePeerRecord(signedPeerRecord) && isNew) {
yield signedPeerRecord.peerId
}
} else if (id && multiaddrs) {
const idStr = id.toB58String()
const isNew = !discoveredPeers.has(idStr)
discoveredPeers.add(idStr)
addressBook.add(id, multiaddrs)
if (isNew) {
yield
}
}
// Abort if enough
if (options.limit && options.limit <= discoveredPeers.size) {
console.log('abort')
}
}
}
}
}
// TODO: who handles reprovide??

View File

@ -0,0 +1,38 @@
'use strict'
/**
* RendezvousDiscovery is an implementation of service discovery
* using Rendezvous. Namespaces represent services supported by other nodes.
* @param {Libp2p} libp2p
*/
module.exports = libp2p => {
const rendezvous = libp2p.rendezvous
const isEnabled = !!rendezvous
return {
isEnabled,
/**
* Advertise services on the network using the rendezvous module.
* @param {string} namespace
* @param {object} [options]
* @param {object} [options.ttl = 7200e3] registration ttl in ms (minimum 120)
* @returns {Promise<number>}
*/
advertise(namespace, options) {
return rendezvous.register(namespace, options)
},
/**
* Discover peers providing a given service.
* @param {string} namespace
* @param {object} [options]
* @param {number} [options.limit] limit of peers to discover
* @returns {AsyncIterable<{ signedPeerRecord: Envelope }>}
*/
async * findPeers(namespace, options) {
// TODO: Abortables + options
for await (const peer of rendezvous.discover(namespace, options)) {
yield peer
}
}
}
}

47
src/discovery/routing.js Normal file
View File

@ -0,0 +1,47 @@
'use strict'
const { namespaceToCid } = require('./utils')
/**
* RoutingDiscovery is an implementation of service discovery
* using ContentRouting. Namespaces represent services supported by other nodes.
* @param {Libp2p} libp2p
*/
module.exports = libp2p => {
const contentRouting = libp2p.contentRouting
const isEnabled = !!contentRouting.routers.length
return {
isEnabled,
/**
* Advertise services on the network using content routing modules.
* @param {string} namespace
* @param {object} [options]
* @returns {Promise<void>}
*/
async advertise(namespace, options) {
const cid = await namespaceToCid(namespace)
// TODO: options?
await contentRouting.provide(cid)
},
/**
* Discover peers providing a given service.
* @param {string} namespace
* @param {object} [options]
* @param {number} [options.limit] limit of peers to discover
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
*/
async * findPeers(namespace, options = {}) {
const cid = await namespaceToCid(namespace)
const providerOptions = {
maxNumProviders: options.limit
}
// TODO: Abortables + options
for await (const peer of contentRouting.findProviders(cid, providerOptions)) {
yield peer
}
}
}
}

16
src/discovery/utils.js Normal file
View File

@ -0,0 +1,16 @@
'use strict'
const CID = require('cids')
const multihashing = require('multihashing-async')
/**
* Convert a namespace string into a cid.
* @param {string} namespace
* @return {Promise<CID>}
*/
module.exports.namespaceToCid = async (namespace) => {
const bytes = new TextEncoder('utf8').encode(namespace)
const hash = await multihashing(bytes, 'sha2-256')
return new CID(hash)
}

View File

@ -16,6 +16,7 @@ exports.codes = {
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED', ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
ERR_ALREADY_ABORTED: 'ERR_ALREADY_ABORTED', ERR_ALREADY_ABORTED: 'ERR_ALREADY_ABORTED',
ERR_NO_VALID_ADDRESSES: 'ERR_NO_VALID_ADDRESSES', ERR_NO_VALID_ADDRESSES: 'ERR_NO_VALID_ADDRESSES',
ERR_NO_DISCOVERY_IMPLEMENTATIONS: 'ERR_NO_DISCOVERY_IMPLEMENTATIONS',
ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF', ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF',
ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT', ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT',
ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED', ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED',

View File

@ -63,12 +63,6 @@ class IdentifyService {
*/ */
this.connectionManager = libp2p.connectionManager this.connectionManager = libp2p.connectionManager
this.connectionManager.on('peer:connect', (connection) => {
const peerId = connection.remotePeer
this.identify(connection, peerId).catch(log.error)
})
/** /**
* @property {PeerId} * @property {PeerId}
*/ */
@ -82,6 +76,19 @@ class IdentifyService {
this._protocols = protocols this._protocols = protocols
this.handleMessage = this.handleMessage.bind(this) this.handleMessage = this.handleMessage.bind(this)
this.connectionManager.on('peer:connect', (connection) => {
const peerId = connection.remotePeer
this.identify(connection, peerId).catch(log.error)
})
// When self multiaddrs change, trigger identify-push
this.peerStore.on('change:multiaddrs', ({ peerId }) => {
if (peerId.toString() === this.peerId.toString()) {
this.pushToPeerStore()
}
})
} }
/** /**
@ -90,7 +97,7 @@ class IdentifyService {
* @returns {Promise<void>} * @returns {Promise<void>}
*/ */
async push (connections) { async push (connections) {
const signedPeerRecord = await this._getSelfPeerRecord() const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId)
const listenAddrs = this._libp2p.multiaddrs.map((ma) => ma.bytes) const listenAddrs = this._libp2p.multiaddrs.map((ma) => ma.bytes)
const protocols = Array.from(this._protocols.keys()) const protocols = Array.from(this._protocols.keys())
@ -119,12 +126,12 @@ class IdentifyService {
/** /**
* Calls `push` for all peers in the `peerStore` that are connected * Calls `push` for all peers in the `peerStore` that are connected
* @param {PeerStore} peerStore * @returns {void}
*/ */
pushToPeerStore (peerStore) { pushToPeerStore () {
const connections = [] const connections = []
let connection let connection
for (const peer of peerStore.peers.values()) { for (const peer of this.peerStore.peers.values()) {
if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.connectionManager.get(peer.id))) { if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.connectionManager.get(peer.id))) {
connections.push(connection) connections.push(connection)
} }
@ -239,7 +246,7 @@ class IdentifyService {
publicKey = this.peerId.pubKey.bytes publicKey = this.peerId.pubKey.bytes
} }
const signedPeerRecord = await this._getSelfPeerRecord() const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId)
const message = Message.encode({ const message = Message.encode({
protocolVersion: PROTOCOL_VERSION, protocolVersion: PROTOCOL_VERSION,
@ -308,33 +315,6 @@ class IdentifyService {
// Update the protocols // Update the protocols
this.peerStore.protoBook.set(id, message.protocols) this.peerStore.protoBook.set(id, message.protocols)
} }
/**
* Get self signed peer record raw envelope.
* @return {Uint8Array}
*/
async _getSelfPeerRecord () {
const selfSignedPeerRecord = this.peerStore.addressBook.getRawEnvelope(this.peerId)
// TODO: support invalidation when dynamic multiaddrs are supported
if (selfSignedPeerRecord) {
return selfSignedPeerRecord
}
try {
const peerRecord = new PeerRecord({
peerId: this.peerId,
multiaddrs: this._libp2p.multiaddrs
})
const envelope = await Envelope.seal(peerRecord, this.peerId)
this.peerStore.addressBook.consumePeerRecord(envelope)
return this.peerStore.addressBook.getRawEnvelope(this.peerId)
} catch (err) {
log.error('failed to get self peer record')
}
return null
}
} }
module.exports.IdentifyService = IdentifyService module.exports.IdentifyService = IdentifyService

View File

@ -247,6 +247,7 @@ class Libp2p extends EventEmitter {
log('libp2p is stopping') log('libp2p is stopping')
try { try {
this._isStarted = false
for (const service of this._discovery.values()) { for (const service of this._discovery.values()) {
service.removeListener('peer', this._onDiscoveryPeer) service.removeListener('peer', this._onDiscoveryPeer)
} }
@ -274,7 +275,6 @@ class Libp2p extends EventEmitter {
this.emit('error', err) this.emit('error', err)
} }
} }
this._isStarted = false
log('libp2p has stopped') log('libp2p has stopped')
} }
@ -424,7 +424,7 @@ class Libp2p extends EventEmitter {
// Only push if libp2p is running // Only push if libp2p is running
if (this.isStarted() && this.identifyService) { if (this.isStarted() && this.identifyService) {
this.identifyService.pushToPeerStore(this.peerStore) this.identifyService.pushToPeerStore()
} }
} }
@ -441,13 +441,14 @@ class Libp2p extends EventEmitter {
// Only push if libp2p is running // Only push if libp2p is running
if (this.isStarted() && this.identifyService) { if (this.isStarted() && this.identifyService) {
this.identifyService.pushToPeerStore(this.peerStore) this.identifyService.pushToPeerStore()
} }
} }
async _onStarting () { async _onStarting () {
// Listen on the provided transports // Listen on the provided transports for the provided addresses
await this.transportManager.listen() const addrs = this.addressManager.getListenAddrs()
await this.transportManager.listen(addrs)
// Start PeerStore // Start PeerStore
await this.peerStore.start() await this.peerStore.start()

View File

@ -260,7 +260,7 @@ class AddressBook extends Book {
* Get the known data of a provided peer. * Get the known data of a provided peer.
* @override * @override
* @param {PeerId} peerId * @param {PeerId} peerId
* @returns {Array<data>} * @returns {Array<Address>|undefined}
*/ */
get (peerId) { get (peerId) {
if (!PeerId.isPeerId(peerId)) { if (!PeerId.isPeerId(peerId)) {

20
src/record/utils.js Normal file
View File

@ -0,0 +1,20 @@
'use strict'
const Envelope = require('./envelope')
const PeerRecord = require('./peer-record')
/**
* Create (or update if existing) self peer record and store it in the AddressBook.
* @param {libp2p} libp2p
* @returns {Promise<void>}
*/
async function updateSelfPeerRecord (libp2p) {
const peerRecord = new PeerRecord({
peerId: libp2p.peerId,
multiaddrs: libp2p.multiaddrs
})
const envelope = await Envelope.seal(peerRecord, libp2p.peerId)
libp2p.peerStore.addressBook.consumePeerRecord(envelope)
}
module.exports.updateSelfPeerRecord = updateSelfPeerRecord

View File

@ -7,6 +7,8 @@ const debug = require('debug')
const log = debug('libp2p:transports') const log = debug('libp2p:transports')
log.error = debug('libp2p:transports:error') log.error = debug('libp2p:transports:error')
const { updateSelfPeerRecord } = require('./record/utils')
class TransportManager { class TransportManager {
/** /**
* @constructor * @constructor
@ -62,6 +64,8 @@ class TransportManager {
log('closing listeners for %s', key) log('closing listeners for %s', key)
while (listeners.length) { while (listeners.length) {
const listener = listeners.pop() const listener = listeners.pop()
listener.removeAllListeners('listening')
listener.removeAllListeners('close')
tasks.push(listener.close()) tasks.push(listener.close())
} }
} }
@ -131,11 +135,10 @@ class TransportManager {
/** /**
* Starts listeners for each listen Multiaddr. * Starts listeners for each listen Multiaddr.
* @async * @async
* @param {Array<Multiaddr>} addrs addresses to attempt to listen on
*/ */
async listen () { async listen (addrs) {
const addrs = this.libp2p.addressManager.getListenAddrs() if (!addrs || addrs.length === 0) {
if (addrs.length === 0) {
log('no addresses were provided for listening, this node is dial only') log('no addresses were provided for listening, this node is dial only')
return return
} }
@ -151,6 +154,10 @@ class TransportManager {
const listener = transport.createListener({}, this.onConnection) const listener = transport.createListener({}, this.onConnection)
this._listeners.get(key).push(listener) this._listeners.get(key).push(listener)
// Track listen/close events
listener.on('listening', () => updateSelfPeerRecord(this.libp2p))
listener.on('close', () => updateSelfPeerRecord(this.libp2p))
// We need to attempt to listen on everything // We need to attempt to listen on everything
tasks.push(listener.listen(addr)) tasks.push(listener.listen(addr))
} }
@ -195,6 +202,8 @@ class TransportManager {
if (this._listeners.has(key)) { if (this._listeners.has(key)) {
// Close any running listeners // Close any running listeners
for (const listener of this._listeners.get(key)) { for (const listener of this._listeners.get(key)) {
listener.removeAllListeners('listening')
listener.removeAllListeners('close')
await listener.close() await listener.close()
} }
} }

View File

@ -40,21 +40,28 @@ describe('Dialing (direct, TCP)', () => {
let peerStore let peerStore
let remoteAddr let remoteAddr
before(async () => { beforeEach(async () => {
const [remotePeerId] = await Promise.all([ const [localPeerId, remotePeerId] = await Promise.all([
PeerId.createFromJSON(Peers[0]) PeerId.createFromJSON(Peers[0]),
PeerId.createFromJSON(Peers[1])
]) ])
peerStore = new PeerStore({ peerId: remotePeerId })
remoteTM = new TransportManager({ remoteTM = new TransportManager({
libp2p: { libp2p: {
addressManager: new AddressManager({ listen: [listenAddr] }) addressManager: new AddressManager({ listen: [listenAddr] }),
peerId: remotePeerId,
peerStore
}, },
upgrader: mockUpgrader upgrader: mockUpgrader
}) })
remoteTM.add(Transport.prototype[Symbol.toStringTag], Transport) remoteTM.add(Transport.prototype[Symbol.toStringTag], Transport)
peerStore = new PeerStore({ peerId: remotePeerId })
localTM = new TransportManager({ localTM = new TransportManager({
libp2p: {}, libp2p: {
peerId: localPeerId,
peerStore: new PeerStore({ peerId: localPeerId })
},
upgrader: mockUpgrader upgrader: mockUpgrader
}) })
localTM.add(Transport.prototype[Symbol.toStringTag], Transport) localTM.add(Transport.prototype[Symbol.toStringTag], Transport)
@ -64,7 +71,7 @@ describe('Dialing (direct, TCP)', () => {
remoteAddr = remoteTM.getAddrs()[0].encapsulate(`/p2p/${remotePeerId.toB58String()}`) remoteAddr = remoteTM.getAddrs()[0].encapsulate(`/p2p/${remotePeerId.toB58String()}`)
}) })
after(() => remoteTM.close()) afterEach(() => remoteTM.close())
afterEach(() => { afterEach(() => {
sinon.restore() sinon.restore()
@ -110,7 +117,7 @@ describe('Dialing (direct, TCP)', () => {
peerStore peerStore
}) })
peerStore.addressBook.set(peerId, [remoteAddr]) peerStore.addressBook.set(peerId, remoteTM.getAddrs())
const connection = await dialer.connectToPeer(peerId) const connection = await dialer.connectToPeer(peerId)
expect(connection).to.exist() expect(connection).to.exist()

View File

@ -354,7 +354,6 @@ describe('Dialing (direct, WebSockets)', () => {
const connection = await libp2p.dial(remoteAddr) const connection = await libp2p.dial(remoteAddr)
expect(connection).to.exist() expect(connection).to.exist()
sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord')
sinon.spy(libp2p.peerStore.protoBook, 'set') sinon.spy(libp2p.peerStore.protoBook, 'set')
// Wait for onConnection to be called // Wait for onConnection to be called
@ -363,8 +362,6 @@ describe('Dialing (direct, WebSockets)', () => {
expect(libp2p.identifyService.identify.callCount).to.equal(1) expect(libp2p.identifyService.identify.callCount).to.equal(1)
await libp2p.identifyService.identify.firstCall.returnValue await libp2p.identifyService.identify.firstCall.returnValue
// Self + New peer
expect(libp2p.peerStore.addressBook.consumePeerRecord.callCount).to.equal(2)
expect(libp2p.peerStore.protoBook.set.callCount).to.equal(1) expect(libp2p.peerStore.protoBook.set.callCount).to.equal(1)
}) })

View File

@ -8,7 +8,6 @@ const { expect } = chai
const sinon = require('sinon') const sinon = require('sinon')
const { EventEmitter } = require('events') const { EventEmitter } = require('events')
const delay = require('delay')
const PeerId = require('peer-id') const PeerId = require('peer-id')
const duplexPair = require('it-pair/duplex') const duplexPair = require('it-pair/duplex')
const multiaddr = require('multiaddr') const multiaddr = require('multiaddr')
@ -22,6 +21,7 @@ const Libp2p = require('../../src')
const Envelope = require('../../src/record/envelope') const Envelope = require('../../src/record/envelope')
const PeerStore = require('../../src/peer-store') const PeerStore = require('../../src/peer-store')
const baseOptions = require('../utils/base-options.browser') const baseOptions = require('../utils/base-options.browser')
const { updateSelfPeerRecord } = require('../../src/record/utils')
const pkg = require('../../package.json') const pkg = require('../../package.json')
const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser') const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser')
@ -78,6 +78,9 @@ describe('Identify', () => {
sinon.spy(localIdentify.peerStore.addressBook, 'consumePeerRecord') sinon.spy(localIdentify.peerStore.addressBook, 'consumePeerRecord')
sinon.spy(localIdentify.peerStore.protoBook, 'set') sinon.spy(localIdentify.peerStore.protoBook, 'set')
// Transport Manager creates signed peer record
await updateSelfPeerRecord(remoteIdentify._libp2p)
// Run identify // Run identify
await Promise.all([ await Promise.all([
localIdentify.identify(localConnectionMock), localIdentify.identify(localConnectionMock),
@ -239,6 +242,10 @@ describe('Identify', () => {
sinon.spy(remoteIdentify.peerStore.addressBook, 'consumePeerRecord') sinon.spy(remoteIdentify.peerStore.addressBook, 'consumePeerRecord')
sinon.spy(remoteIdentify.peerStore.protoBook, 'set') sinon.spy(remoteIdentify.peerStore.protoBook, 'set')
// Transport Manager creates signed peer record
await updateSelfPeerRecord(localIdentify._libp2p)
await updateSelfPeerRecord(remoteIdentify._libp2p)
// Run identify // Run identify
await Promise.all([ await Promise.all([
localIdentify.push([localConnectionMock]), localIdentify.push([localConnectionMock]),
@ -249,7 +256,7 @@ describe('Identify', () => {
}) })
]) ])
expect(remoteIdentify.peerStore.addressBook.consumePeerRecord.callCount).to.equal(1) expect(remoteIdentify.peerStore.addressBook.consumePeerRecord.callCount).to.equal(2)
expect(remoteIdentify.peerStore.protoBook.set.callCount).to.equal(1) expect(remoteIdentify.peerStore.protoBook.set.callCount).to.equal(1)
const addresses = localIdentify.peerStore.addressBook.get(localPeer) const addresses = localIdentify.peerStore.addressBook.get(localPeer)
@ -359,8 +366,8 @@ describe('Identify', () => {
expect(connection).to.exist() expect(connection).to.exist()
// Wait for peer store to be updated // Wait for peer store to be updated
// Dialer._createDialTarget (add), Identify (consume), Create self (consume) // Dialer._createDialTarget (add), Identify (consume)
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 2 && peerStoreSpyAdd.callCount === 1) await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1)
expect(libp2p.identifyService.identify.callCount).to.equal(1) expect(libp2p.identifyService.identify.callCount).to.equal(1)
// The connection should have no open streams // The connection should have no open streams
@ -381,8 +388,6 @@ describe('Identify', () => {
const connection = await libp2p.dialer.connectToPeer(remoteAddr) const connection = await libp2p.dialer.connectToPeer(remoteAddr)
expect(connection).to.exist() expect(connection).to.exist()
// Wait for nextTick to trigger the identify call
await delay(1)
// Wait for identify to finish // Wait for identify to finish
await libp2p.identifyService.identify.firstCall.returnValue await libp2p.identifyService.identify.firstCall.returnValue
@ -404,5 +409,39 @@ describe('Identify', () => {
// Verify the streams close // Verify the streams close
await pWaitFor(() => connection.streams.length === 0) await pWaitFor(() => connection.streams.length === 0)
}) })
it('should push multiaddr updates to an already connected peer', async () => {
libp2p = new Libp2p({
...baseOptions,
peerId
})
await libp2p.start()
sinon.spy(libp2p.identifyService, 'identify')
sinon.spy(libp2p.identifyService, 'push')
const connection = await libp2p.dialer.connectToPeer(remoteAddr)
expect(connection).to.exist()
// Wait for identify to finish
await libp2p.identifyService.identify.firstCall.returnValue
sinon.stub(libp2p, 'isStarted').returns(true)
libp2p.peerStore.addressBook.add(libp2p.peerId, [multiaddr('/ip4/180.0.0.1/tcp/15001/ws')])
// Verify the remote peer is notified of change
expect(libp2p.identifyService.push.callCount).to.equal(1)
for (const call of libp2p.identifyService.push.getCalls()) {
const [connections] = call.args
expect(connections.length).to.equal(1)
expect(connections[0].remotePeer.toB58String()).to.equal(remoteAddr.getPeerId())
const results = await call.returnValue
expect(results.length).to.equal(1)
}
// Verify the streams close
await pWaitFor(() => connection.streams.length === 0)
})
}) })
}) })

View File

@ -0,0 +1,460 @@
'use strict'
/* eslint-env mocha */
const chai = require('chai')
chai.use(require('dirty-chai'))
const { expect } = chai
const delay = require('delay')
const pWaitFor = require('p-wait-for')
const sinon = require('sinon')
const multiaddr = require('multiaddr')
const Libp2p = require('../../src')
const { relay: relayMulticodec } = require('../../src/circuit/multicodec')
const { createPeerId } = require('../utils/creators/peer')
const baseOptions = require('../utils/base-options')
const listenAddr = '/ip4/0.0.0.0/tcp/0'
describe('auto-relay', () => {
describe('basics', () => {
let libp2p
let relayLibp2p
let autoRelay
beforeEach(async () => {
const peerIds = await createPeerId({ number: 2 })
// Create 2 nodes, and turn HOP on for the relay
;[libp2p, relayLibp2p] = peerIds.map((peerId, index) => {
const opts = {
...baseOptions,
config: {
...baseOptions.config,
relay: {
hop: {
enabled: index !== 0
},
autoRelay: {
enabled: true,
maxListeners: 1
}
}
}
}
return new Libp2p({
...opts,
addresses: {
listen: [listenAddr]
},
connectionManager: {
autoDial: false
},
peerDiscovery: {
autoDial: false
},
peerId
})
})
autoRelay = libp2p.transportManager._transports.get('Circuit')._autoRelay
expect(autoRelay.maxListeners).to.eql(1)
})
beforeEach(() => {
// Start each node
return Promise.all([libp2p, relayLibp2p].map(libp2p => libp2p.start()))
})
afterEach(() => {
// Stop each node
return Promise.all([libp2p, relayLibp2p].map(libp2p => libp2p.stop()))
})
it('should ask if node supports hop on protocol change (relay protocol) and add to listen multiaddrs', async () => {
// Spy if a connected peer is being added as listen relay
sinon.spy(autoRelay, '_addListenRelay')
const originalMultiaddrsLength = relayLibp2p.multiaddrs.length
// Discover relay
libp2p.peerStore.addressBook.add(relayLibp2p.peerId, relayLibp2p.multiaddrs)
await libp2p.dial(relayLibp2p.peerId)
// Wait for peer added as listen relay
await pWaitFor(() => autoRelay._addListenRelay.callCount === 1)
expect(autoRelay._listenRelays.size).to.equal(1)
// Wait for listen multiaddr update
await pWaitFor(() => libp2p.multiaddrs.length === originalMultiaddrsLength + 1)
expect(libp2p.multiaddrs[originalMultiaddrsLength].getPeerId()).to.eql(relayLibp2p.peerId.toB58String())
// Peer has relay multicodec
const knownProtocols = libp2p.peerStore.protoBook.get(relayLibp2p.peerId)
expect(knownProtocols).to.include(relayMulticodec)
})
})
describe('flows with 1 listener max', () => {
let libp2p
let relayLibp2p1
let relayLibp2p2
let relayLibp2p3
let autoRelay1
beforeEach(async () => {
const peerIds = await createPeerId({ number: 4 })
// Create 4 nodes, and turn HOP on for the relay
;[libp2p, relayLibp2p1, relayLibp2p2, relayLibp2p3] = peerIds.map((peerId, index) => {
let opts = baseOptions
if (index !== 0) {
opts = {
...baseOptions,
config: {
...baseOptions.config,
relay: {
hop: {
enabled: true
},
autoRelay: {
enabled: true,
maxListeners: 1
}
}
}
}
}
return new Libp2p({
...opts,
addresses: {
listen: [listenAddr]
},
connectionManager: {
autoDial: false
},
peerDiscovery: {
autoDial: false
},
peerId
})
})
autoRelay1 = relayLibp2p1.transportManager._transports.get('Circuit')._autoRelay
expect(autoRelay1.maxListeners).to.eql(1)
})
beforeEach(() => {
// Start each node
return Promise.all([libp2p, relayLibp2p1, relayLibp2p2, relayLibp2p3].map(libp2p => libp2p.start()))
})
afterEach(() => {
// Stop each node
return Promise.all([libp2p, relayLibp2p1, relayLibp2p2, relayLibp2p3].map(libp2p => libp2p.stop()))
})
it('should ask if node supports hop on protocol change (relay protocol) and add to listen multiaddrs', async () => {
// Spy if a connected peer is being added as listen relay
sinon.spy(autoRelay1, '_addListenRelay')
// Discover relay
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs)
const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length
const originalMultiaddrs2Length = relayLibp2p2.multiaddrs.length
await relayLibp2p1.dial(relayLibp2p2.peerId)
// Wait for peer added as listen relay
await pWaitFor(() => autoRelay1._addListenRelay.callCount === 1)
expect(autoRelay1._listenRelays.size).to.equal(1)
// Wait for listen multiaddr update
await Promise.all([
pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1),
pWaitFor(() => relayLibp2p2.multiaddrs.length === originalMultiaddrs2Length + 1)
])
expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String())
// Peer has relay multicodec
const knownProtocols = relayLibp2p1.peerStore.protoBook.get(relayLibp2p2.peerId)
expect(knownProtocols).to.include(relayMulticodec)
})
it('should be able to dial a peer from its relayed address previously added', async () => {
const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length
const originalMultiaddrs2Length = relayLibp2p2.multiaddrs.length
// Discover relay
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs)
await relayLibp2p1.dial(relayLibp2p2.peerId)
// Wait for listen multiaddr update
await Promise.all([
pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1),
pWaitFor(() => relayLibp2p2.multiaddrs.length === originalMultiaddrs2Length + 1)
])
expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String())
// Dial from the other through a relay
const relayedMultiaddr2 = multiaddr(`${relayLibp2p1.multiaddrs[0]}/p2p/${relayLibp2p1.peerId.toB58String()}/p2p-circuit`)
libp2p.peerStore.addressBook.add(relayLibp2p2.peerId, [relayedMultiaddr2])
await libp2p.dial(relayLibp2p2.peerId)
})
it('should only add maxListeners relayed addresses', async () => {
const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length
const originalMultiaddrs2Length = relayLibp2p2.multiaddrs.length
// Spy if a connected peer is being added as listen relay
sinon.spy(autoRelay1, '_addListenRelay')
sinon.spy(autoRelay1._listenRelays, 'add')
// Discover one relay and connect
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs)
await relayLibp2p1.dial(relayLibp2p2.peerId)
expect(relayLibp2p1.connectionManager.size).to.eql(1)
// Wait for peer added as listen relay
await pWaitFor(() => autoRelay1._addListenRelay.callCount === 1 && autoRelay1._listenRelays.add.callCount === 1)
expect(autoRelay1._listenRelays.size).to.equal(1)
// Wait for listen multiaddr update
await Promise.all([
pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1),
pWaitFor(() => relayLibp2p2.multiaddrs.length === originalMultiaddrs2Length + 1)
])
expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String())
// Relay2 has relay multicodec
const knownProtocols2 = relayLibp2p1.peerStore.protoBook.get(relayLibp2p2.peerId)
expect(knownProtocols2).to.include(relayMulticodec)
// Discover an extra relay and connect
relayLibp2p1.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs)
await relayLibp2p1.dial(relayLibp2p3.peerId)
// Wait to guarantee the dialed peer is not added as a listen relay
await delay(300)
expect(autoRelay1._addListenRelay.callCount).to.equal(2)
expect(autoRelay1._listenRelays.add.callCount).to.equal(1)
expect(autoRelay1._listenRelays.size).to.equal(1)
expect(relayLibp2p1.connectionManager.size).to.eql(2)
// Relay2 has relay multicodec
const knownProtocols3 = relayLibp2p1.peerStore.protoBook.get(relayLibp2p3.peerId)
expect(knownProtocols3).to.include(relayMulticodec)
})
it('should not listen on a relayed address if peer disconnects', async () => {
const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length
// Spy if identify push is fired on adding/removing listen addr
sinon.spy(relayLibp2p1.identifyService, 'pushToPeerStore')
// Discover one relay and connect
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs)
await relayLibp2p1.dial(relayLibp2p2.peerId)
// Wait for listenning on the relay
await pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1)
expect(autoRelay1._listenRelays.size).to.equal(1)
expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String())
// Identify push for adding listen relay multiaddr
expect(relayLibp2p1.identifyService.pushToPeerStore.callCount).to.equal(1)
// Disconnect from peer used for relay
await relayLibp2p1.hangUp(relayLibp2p2.peerId)
// Wait for removed listening on the relay
await pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length)
expect(autoRelay1._listenRelays.size).to.equal(0)
// Identify push for removing listen relay multiaddr
expect(relayLibp2p1.identifyService.pushToPeerStore.callCount).to.equal(2)
})
it('should try to listen on other connected peers relayed address if one used relay disconnects', async () => {
const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length
// Spy if a connected peer is being added as listen relay
sinon.spy(autoRelay1, '_addListenRelay')
sinon.spy(relayLibp2p1.transportManager, 'listen')
// Discover one relay and connect
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs)
await relayLibp2p1.dial(relayLibp2p2.peerId)
// Discover an extra relay and connect
relayLibp2p1.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs)
await relayLibp2p1.dial(relayLibp2p3.peerId)
// Wait for both peer to be attempted to added as listen relay
await pWaitFor(() => autoRelay1._addListenRelay.callCount === 1)
expect(autoRelay1._listenRelays.size).to.equal(1)
expect(relayLibp2p1.connectionManager.size).to.equal(2)
// Wait for listen multiaddr update
await pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1)
expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String())
// Only one will be used for listeninng
expect(relayLibp2p1.transportManager.listen.callCount).to.equal(1)
// Spy if relay from listen map was removed
sinon.spy(autoRelay1._listenRelays, 'delete')
// Disconnect from peer used for relay
await relayLibp2p1.hangUp(relayLibp2p2.peerId)
expect(autoRelay1._listenRelays.delete.callCount).to.equal(1)
expect(autoRelay1._addListenRelay.callCount).to.equal(1)
// Wait for other peer connected to be added as listen addr
await pWaitFor(() => relayLibp2p1.transportManager.listen.callCount === 2)
expect(autoRelay1._listenRelays.size).to.equal(1)
expect(relayLibp2p1.connectionManager.size).to.eql(1)
// Wait for listen multiaddr update
await pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1)
expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p3.peerId.toB58String())
})
it('should try to listen on stored peers relayed address if one used relay disconnects and there are not enough connected', async () => {
// Spy if a connected peer is being added as listen relay
sinon.spy(autoRelay1, '_addListenRelay')
sinon.spy(relayLibp2p1.transportManager, 'listen')
// Discover one relay and connect
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs)
await relayLibp2p1.dial(relayLibp2p2.peerId)
// Discover an extra relay and connect to gather its Hop support
relayLibp2p1.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs)
await relayLibp2p1.dial(relayLibp2p3.peerId)
// Wait for both peer to be attempted to added as listen relay
await pWaitFor(() => autoRelay1._addListenRelay.callCount === 2)
expect(autoRelay1._listenRelays.size).to.equal(1)
expect(relayLibp2p1.connectionManager.size).to.equal(2)
// Only one will be used for listeninng
expect(relayLibp2p1.transportManager.listen.callCount).to.equal(1)
// Disconnect not used listen relay
await relayLibp2p1.hangUp(relayLibp2p3.peerId)
expect(autoRelay1._listenRelays.size).to.equal(1)
expect(relayLibp2p1.connectionManager.size).to.equal(1)
// Spy on dial
sinon.spy(relayLibp2p1, 'dial')
// Remove peer used as relay from peerStore and disconnect it
relayLibp2p1.peerStore.delete(relayLibp2p2.peerId)
await relayLibp2p1.hangUp(relayLibp2p2.peerId)
expect(autoRelay1._listenRelays.size).to.equal(0)
expect(relayLibp2p1.connectionManager.size).to.equal(0)
// Wait for other peer connected to be added as listen addr
await pWaitFor(() => relayLibp2p1.transportManager.listen.callCount === 2)
expect(autoRelay1._listenRelays.size).to.equal(1)
expect(relayLibp2p1.connectionManager.size).to.eql(1)
})
})
describe('flows with 2 max listeners', () => {
let relayLibp2p1
let relayLibp2p2
let relayLibp2p3
let autoRelay1
let autoRelay2
beforeEach(async () => {
const peerIds = await createPeerId({ number: 3 })
// Create 3 nodes, and turn HOP on for the relay
;[relayLibp2p1, relayLibp2p2, relayLibp2p3] = peerIds.map((peerId) => {
return new Libp2p({
...baseOptions,
config: {
...baseOptions.config,
relay: {
...baseOptions.config.relay,
hop: {
enabled: true
},
autoRelay: {
enabled: true,
maxListeners: 2
}
}
},
addresses: {
listen: [listenAddr]
},
connectionManager: {
autoDial: false
},
peerDiscovery: {
autoDial: false
},
peerId
})
})
autoRelay1 = relayLibp2p1.transportManager._transports.get('Circuit')._autoRelay
autoRelay2 = relayLibp2p2.transportManager._transports.get('Circuit')._autoRelay
})
beforeEach(() => {
// Start each node
return Promise.all([relayLibp2p1, relayLibp2p2, relayLibp2p3].map(libp2p => libp2p.start()))
})
afterEach(() => {
// Stop each node
return Promise.all([relayLibp2p1, relayLibp2p2, relayLibp2p3].map(libp2p => libp2p.stop()))
})
it('should not add listener to a already relayed connection', async () => {
// Spy if a connected peer is being added as listen relay
sinon.spy(autoRelay1, '_addListenRelay')
sinon.spy(autoRelay2, '_addListenRelay')
// Relay 1 discovers Relay 3 and connect
relayLibp2p1.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs)
await relayLibp2p1.dial(relayLibp2p3.peerId)
// Wait for peer added as listen relay
await pWaitFor(() => autoRelay1._addListenRelay.callCount === 1)
expect(autoRelay1._listenRelays.size).to.equal(1)
// Relay 2 discovers Relay 3 and connect
relayLibp2p2.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs)
await relayLibp2p2.dial(relayLibp2p3.peerId)
// Wait for peer added as listen relay
await pWaitFor(() => autoRelay2._addListenRelay.callCount === 1)
expect(autoRelay2._listenRelays.size).to.equal(1)
// Relay 1 discovers Relay 2 relayed multiaddr via Relay 3
const ma2RelayedBy3 = relayLibp2p2.multiaddrs[relayLibp2p2.multiaddrs.length - 1]
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, [ma2RelayedBy3])
await relayLibp2p1.dial(relayLibp2p2.peerId)
// Peer not added as listen relay
expect(autoRelay1._addListenRelay.callCount).to.equal(1)
expect(autoRelay1._listenRelays.size).to.equal(1)
})
})
})

View File

@ -72,7 +72,7 @@ describe('Dialing (via relay, TCP)', () => {
const tcpAddrs = dstLibp2p.transportManager.getAddrs() const tcpAddrs = dstLibp2p.transportManager.getAddrs()
sinon.stub(dstLibp2p.addressManager, 'listen').value([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)]) sinon.stub(dstLibp2p.addressManager, 'listen').value([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
await dstLibp2p.transportManager.listen() await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs())
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')]) expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
const connection = await srcLibp2p.dial(dialAddr) const connection = await srcLibp2p.dial(dialAddr)
@ -157,7 +157,7 @@ describe('Dialing (via relay, TCP)', () => {
const tcpAddrs = dstLibp2p.transportManager.getAddrs() const tcpAddrs = dstLibp2p.transportManager.getAddrs()
sinon.stub(dstLibp2p.addressManager, 'getListenAddrs').returns([multiaddr(`${relayAddr}/p2p-circuit`)]) sinon.stub(dstLibp2p.addressManager, 'getListenAddrs').returns([multiaddr(`${relayAddr}/p2p-circuit`)])
await dstLibp2p.transportManager.listen() await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs())
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')]) expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
// Tamper with the our multiaddrs for the circuit message // Tamper with the our multiaddrs for the circuit message

View File

@ -7,9 +7,13 @@ const { expect } = chai
const AddressManager = require('../../src/address-manager') const AddressManager = require('../../src/address-manager')
const TransportManager = require('../../src/transport-manager') const TransportManager = require('../../src/transport-manager')
const PeerStore = require('../../src/peer-store')
const PeerRecord = require('../../src/record/peer-record')
const Transport = require('libp2p-tcp') const Transport = require('libp2p-tcp')
const PeerId = require('peer-id')
const multiaddr = require('multiaddr') const multiaddr = require('multiaddr')
const mockUpgrader = require('../utils/mockUpgrader') const mockUpgrader = require('../utils/mockUpgrader')
const Peers = require('../fixtures/peers')
const addrs = [ const addrs = [
multiaddr('/ip4/127.0.0.1/tcp/0'), multiaddr('/ip4/127.0.0.1/tcp/0'),
multiaddr('/ip4/127.0.0.1/tcp/0') multiaddr('/ip4/127.0.0.1/tcp/0')
@ -17,11 +21,19 @@ const addrs = [
describe('Transport Manager (TCP)', () => { describe('Transport Manager (TCP)', () => {
let tm let tm
let localPeer
before(() => { before(async () => {
localPeer = await PeerId.createFromJSON(Peers[0])
})
beforeEach(() => {
tm = new TransportManager({ tm = new TransportManager({
libp2p: { libp2p: {
addressManager: new AddressManager({ listen: addrs }) peerId: localPeer,
multiaddrs: addrs,
addressManager: new AddressManager({ listen: addrs }),
peerStore: new PeerStore({ peerId: localPeer })
}, },
upgrader: mockUpgrader, upgrader: mockUpgrader,
onConnection: () => {} onConnection: () => {}
@ -41,18 +53,38 @@ describe('Transport Manager (TCP)', () => {
it('should be able to listen', async () => { it('should be able to listen', async () => {
tm.add(Transport.prototype[Symbol.toStringTag], Transport) tm.add(Transport.prototype[Symbol.toStringTag], Transport)
await tm.listen() await tm.listen(addrs)
expect(tm._listeners).to.have.key(Transport.prototype[Symbol.toStringTag]) expect(tm._listeners).to.have.key(Transport.prototype[Symbol.toStringTag])
expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(addrs.length) expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(addrs.length)
// Ephemeral ip addresses may result in multiple listeners // Ephemeral ip addresses may result in multiple listeners
expect(tm.getAddrs().length).to.equal(addrs.length) expect(tm.getAddrs().length).to.equal(addrs.length)
await tm.close() await tm.close()
expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(0) expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(0)
}) })
it('should create self signed peer record on listen', async () => {
let signedPeerRecord = await tm.libp2p.peerStore.addressBook.getPeerRecord(localPeer)
expect(signedPeerRecord).to.not.exist()
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
await tm.listen(addrs)
// Should created Self Peer record on new listen address
signedPeerRecord = await tm.libp2p.peerStore.addressBook.getPeerRecord(localPeer)
expect(signedPeerRecord).to.exist()
const record = PeerRecord.createFromProtobuf(signedPeerRecord.payload)
expect(record).to.exist()
expect(record.multiaddrs.length).to.equal(addrs.length)
addrs.forEach((a, i) => {
expect(record.multiaddrs[i].equals(a)).to.be.true()
})
})
it('should be able to dial', async () => { it('should be able to dial', async () => {
tm.add(Transport.prototype[Symbol.toStringTag], Transport) tm.add(Transport.prototype[Symbol.toStringTag], Transport)
await tm.listen() await tm.listen(addrs)
const addr = tm.getAddrs().shift() const addr = tm.getAddrs().shift()
const connection = await tm.dial(addr) const connection = await tm.dial(addr)
expect(connection).to.exist() expect(connection).to.exist()

View File

@ -87,7 +87,7 @@ describe('Transport Manager (WebSockets)', () => {
it('should fail to listen with no valid address', async () => { it('should fail to listen with no valid address', async () => {
tm.add(Transport.prototype[Symbol.toStringTag], Transport) tm.add(Transport.prototype[Symbol.toStringTag], Transport)
await expect(tm.listen()) await expect(tm.listen([listenAddr]))
.to.eventually.be.rejected() .to.eventually.be.rejected()
.and.to.have.property('code', ErrorCodes.ERR_NO_VALID_ADDRESSES) .and.to.have.property('code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
}) })