mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-07-09 22:01:34 +00:00
Compare commits
8 Commits
v0.37.0
...
feat/disco
Author | SHA1 | Date | |
---|---|---|---|
87da0cd86d | |||
894d8e3cac | |||
6e94724074 | |||
6b7b1d7714 | |||
1bb6735cd3 | |||
4848313cb3 | |||
da5785d18e | |||
72fe150e64 |
60
doc/API.md
60
doc/API.md
@ -20,6 +20,8 @@
|
|||||||
* [`contentRouting.put`](#contentroutingput)
|
* [`contentRouting.put`](#contentroutingput)
|
||||||
* [`contentRouting.get`](#contentroutingget)
|
* [`contentRouting.get`](#contentroutingget)
|
||||||
* [`contentRouting.getMany`](#contentroutinggetmany)
|
* [`contentRouting.getMany`](#contentroutinggetmany)
|
||||||
|
* [`discovery.advertise`](#discoveryadvertise)
|
||||||
|
* [`discovery.findPeers`](#discoveryfindpeers)
|
||||||
* [`peerRouting.findPeer`](#peerroutingfindpeer)
|
* [`peerRouting.findPeer`](#peerroutingfindpeer)
|
||||||
* [`peerStore.addressBook.add`](#peerstoreaddressbookadd)
|
* [`peerStore.addressBook.add`](#peerstoreaddressbookadd)
|
||||||
* [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete)
|
* [`peerStore.addressBook.delete`](#peerstoreaddressbookdelete)
|
||||||
@ -666,6 +668,64 @@ const key = '/key'
|
|||||||
const { from, val } = await libp2p.contentRouting.get(key)
|
const { from, val } = await libp2p.contentRouting.get(key)
|
||||||
```
|
```
|
||||||
|
|
||||||
|
### discovery.advertise
|
||||||
|
|
||||||
|
Advertise services on the network.
|
||||||
|
This will use content routing modules and rendezvous if enabled.
|
||||||
|
|
||||||
|
`libp2p.discovery.advertise(namespace, options)`
|
||||||
|
|
||||||
|
#### Parameters
|
||||||
|
|
||||||
|
| Name | Type | Description |
|
||||||
|
|------|------|-------------|
|
||||||
|
| namespace | `string` | namespace of the service to advertise |
|
||||||
|
| [options] | `object` | advertise options |
|
||||||
|
|
||||||
|
#### Returns
|
||||||
|
|
||||||
|
| Type | Description |
|
||||||
|
|------|-------------|
|
||||||
|
| `Promise<void>` | Promise resolves once advertise messages are sent |
|
||||||
|
|
||||||
|
#### Example
|
||||||
|
|
||||||
|
```js
|
||||||
|
// ...
|
||||||
|
const namespace = '/libp2p/relay'
|
||||||
|
await libp2p.discovery.advertise(namespace)
|
||||||
|
```
|
||||||
|
|
||||||
|
### discovery.findPeers
|
||||||
|
|
||||||
|
Discover peers providing a given service.
|
||||||
|
This will use content routing modules and rendezvous if enabled and will store the peers data in the PeerStore.
|
||||||
|
|
||||||
|
`libp2p.discovery.findPeers(namespace, options)`
|
||||||
|
|
||||||
|
#### Parameters
|
||||||
|
|
||||||
|
| Name | Type | Description |
|
||||||
|
|------|------|-------------|
|
||||||
|
| namespace | `string` | namespace of the service to find peers |
|
||||||
|
| [options] | `object` | find peers options |
|
||||||
|
| [options.limit] | `number` | number of distinct peers to find |
|
||||||
|
|
||||||
|
#### Returns
|
||||||
|
|
||||||
|
| Type | Description |
|
||||||
|
|------|-------------|
|
||||||
|
| `AsyncIterable<PeerId>}` | Async iterator for peers |
|
||||||
|
|
||||||
|
#### Example
|
||||||
|
|
||||||
|
```js
|
||||||
|
// Iterate over the peers found for the given namespace
|
||||||
|
for await (const peer of libp2p.discovery.findPeers(namespace)) {
|
||||||
|
console.log(peer)
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
### peerRouting.findPeer
|
### peerRouting.findPeer
|
||||||
|
|
||||||
Iterates over all peer routers in series to find the given peer. If the DHT is enabled, it will be tried first.
|
Iterates over all peer routers in series to find the given peer. If the DHT is enabled, it will be tried first.
|
||||||
|
230
src/circuit/auto-relay.js
Normal file
230
src/circuit/auto-relay.js
Normal file
@ -0,0 +1,230 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const debug = require('debug')
|
||||||
|
const log = debug('libp2p:auto-relay')
|
||||||
|
log.error = debug('libp2p:auto-relay:error')
|
||||||
|
|
||||||
|
const uint8ArrayFromString = require('uint8arrays/from-string')
|
||||||
|
const uint8ArrayToString = require('uint8arrays/to-string')
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
|
const PeerId = require('peer-id')
|
||||||
|
|
||||||
|
const { relay: multicodec } = require('./multicodec')
|
||||||
|
const { canHop } = require('./circuit/hop')
|
||||||
|
|
||||||
|
const circuitProtoCode = 290
|
||||||
|
const hopMetadataKey = 'hop_relay'
|
||||||
|
const hopMetadataValue = 'true'
|
||||||
|
|
||||||
|
class AutoRelay {
|
||||||
|
/**
|
||||||
|
* Creates an instance of AutoRelay.
|
||||||
|
* @constructor
|
||||||
|
* @param {object} props
|
||||||
|
* @param {Libp2p} props.libp2p
|
||||||
|
* @param {number} [props.maxListeners = 1] maximum number of relays to listen.
|
||||||
|
*/
|
||||||
|
constructor ({ libp2p, maxListeners = 1 }) {
|
||||||
|
this._libp2p = libp2p
|
||||||
|
this._peerId = libp2p.peerId
|
||||||
|
this._peerStore = libp2p.peerStore
|
||||||
|
this._connectionManager = libp2p.connectionManager
|
||||||
|
this._transportManager = libp2p.transportManager
|
||||||
|
|
||||||
|
this.maxListeners = maxListeners
|
||||||
|
|
||||||
|
/**
|
||||||
|
* @type {Set<string>}
|
||||||
|
*/
|
||||||
|
this._listenRelays = new Set()
|
||||||
|
|
||||||
|
this._onProtocolChange = this._onProtocolChange.bind(this)
|
||||||
|
this._onPeerDisconnected = this._onPeerDisconnected.bind(this)
|
||||||
|
|
||||||
|
this._peerStore.on('change:protocols', this._onProtocolChange)
|
||||||
|
this._connectionManager.on('peer:disconnect', this._onPeerDisconnected)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Check if a peer supports the relay protocol.
|
||||||
|
* If the protocol is not supported, check if it was supported before and remove it as a listen relay.
|
||||||
|
* If the protocol is supported, check if the peer supports **HOP** and add it as a listener if
|
||||||
|
* inside the threshold.
|
||||||
|
* @param {Object} props
|
||||||
|
* @param {PeerId} props.peerId
|
||||||
|
* @param {Array<string>} props.protocols
|
||||||
|
* @return {Promise<void>}
|
||||||
|
*/
|
||||||
|
async _onProtocolChange ({ peerId, protocols }) {
|
||||||
|
const id = peerId.toB58String()
|
||||||
|
|
||||||
|
// Check if it has the protocol
|
||||||
|
const hasProtocol = protocols.find(protocol => protocol === multicodec)
|
||||||
|
|
||||||
|
// If no protocol, check if we were keeping the peer before as a listenRelay
|
||||||
|
if (!hasProtocol && this._listenRelays.has(id)) {
|
||||||
|
this._removeListenRelay(id)
|
||||||
|
return
|
||||||
|
} else if (!hasProtocol || this._listenRelays.has(id)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// If protocol, check if can hop, store info in the metadataBook and listen on it
|
||||||
|
try {
|
||||||
|
const connection = this._connectionManager.get(peerId)
|
||||||
|
|
||||||
|
// Do not hop on a relayed connection
|
||||||
|
if (connection.remoteAddr.protoCodes().includes(circuitProtoCode)) {
|
||||||
|
log(`relayed connection to ${id} will not be used to hop on`)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const supportsHop = await canHop({ connection })
|
||||||
|
|
||||||
|
if (supportsHop) {
|
||||||
|
this._peerStore.metadataBook.set(peerId, hopMetadataKey, uint8ArrayFromString(hopMetadataValue))
|
||||||
|
await this._addListenRelay(connection, id)
|
||||||
|
}
|
||||||
|
} catch (err) {
|
||||||
|
log.error(err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Peer disconnects.
|
||||||
|
* @param {Connection} connection connection to the peer
|
||||||
|
* @return {void}
|
||||||
|
*/
|
||||||
|
_onPeerDisconnected (connection) {
|
||||||
|
const peerId = connection.remotePeer
|
||||||
|
const id = peerId.toB58String()
|
||||||
|
|
||||||
|
// Not listening on this relay
|
||||||
|
if (!this._listenRelays.has(id)) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
this._removeListenRelay(id)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Attempt to listen on the given relay connection.
|
||||||
|
* @private
|
||||||
|
* @param {Connection} connection connection to the peer
|
||||||
|
* @param {string} id peer identifier string
|
||||||
|
* @return {Promise<void>}
|
||||||
|
*/
|
||||||
|
async _addListenRelay (connection, id) {
|
||||||
|
// Check if already listening on enough relays
|
||||||
|
if (this._listenRelays.size >= this.maxListeners) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create relay listen addr
|
||||||
|
let listenAddr, remoteMultiaddr
|
||||||
|
|
||||||
|
try {
|
||||||
|
const remoteAddrs = this._peerStore.addressBook.get(connection.remotePeer)
|
||||||
|
// TODO: HOP Relays should avoid advertising private addresses!
|
||||||
|
remoteMultiaddr = remoteAddrs.find(a => a.isCertified).multiaddr // Get first announced address certified
|
||||||
|
} catch (_) {
|
||||||
|
log.error(`${id} does not have announced certified multiaddrs`)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if (!remoteMultiaddr.protoNames().includes('p2p')) {
|
||||||
|
listenAddr = `${remoteMultiaddr.toString()}/p2p/${connection.remotePeer.toB58String()}/p2p-circuit`
|
||||||
|
} else {
|
||||||
|
listenAddr = `${remoteMultiaddr.toString()}/p2p-circuit`
|
||||||
|
}
|
||||||
|
|
||||||
|
// Attempt to listen on relay
|
||||||
|
this._listenRelays.add(id)
|
||||||
|
|
||||||
|
try {
|
||||||
|
await this._transportManager.listen([multiaddr(listenAddr)])
|
||||||
|
// Announce multiaddrs will update on listen success by TransportManager event being triggered
|
||||||
|
} catch (err) {
|
||||||
|
log.error(err)
|
||||||
|
this._listenRelays.delete(id)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Remove listen relay.
|
||||||
|
* @private
|
||||||
|
* @param {string} id peer identifier string.
|
||||||
|
* @return {void}
|
||||||
|
*/
|
||||||
|
_removeListenRelay (id) {
|
||||||
|
if (this._listenRelays.delete(id)) {
|
||||||
|
// TODO: this should be responsibility of the connMgr
|
||||||
|
this._listenOnAvailableHopRelays([id])
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Try to listen on available hop relay connections.
|
||||||
|
* The following order will happen while we do not have enough relays.
|
||||||
|
* 1. Check the metadata store for known relays, try to listen on the ones we are already connected.
|
||||||
|
* 2. Dial and try to listen on the peers we know that support hop but are not connected.
|
||||||
|
* 3. Search the network.
|
||||||
|
* @param {Array<string>} [peersToIgnore]
|
||||||
|
* @return {Promise<void>}
|
||||||
|
*/
|
||||||
|
async _listenOnAvailableHopRelays (peersToIgnore = []) {
|
||||||
|
// TODO: The peer redial issue on disconnect should be handled by connection gating
|
||||||
|
// Check if already listening on enough relays
|
||||||
|
if (this._listenRelays.size >= this.maxListeners) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
const knownHopsToDial = []
|
||||||
|
|
||||||
|
// Check if we have known hop peers to use and attempt to listen on the already connected
|
||||||
|
for (const [id, metadataMap] of this._peerStore.metadataBook.data.entries()) {
|
||||||
|
// Continue to next if listening on this or peer to ignore
|
||||||
|
if (this._listenRelays.has(id) || peersToIgnore.includes(id)) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
const supportsHop = metadataMap.get(hopMetadataKey)
|
||||||
|
|
||||||
|
// Continue to next if it does not support Hop
|
||||||
|
if (!supportsHop || uint8ArrayToString(supportsHop) !== hopMetadataValue) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
const peerId = PeerId.createFromCID(id)
|
||||||
|
const connection = this._connectionManager.get(peerId)
|
||||||
|
|
||||||
|
// If not connected, store for possible later use.
|
||||||
|
if (!connection) {
|
||||||
|
knownHopsToDial.push(peerId)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
await this._addListenRelay(connection, id)
|
||||||
|
|
||||||
|
// Check if already listening on enough relays
|
||||||
|
if (this._listenRelays.size >= this.maxListeners) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to listen on known peers that are not connected
|
||||||
|
for (const peerId of knownHopsToDial) {
|
||||||
|
const connection = await this._libp2p.dial(peerId)
|
||||||
|
await this._addListenRelay(connection, peerId.toB58String())
|
||||||
|
|
||||||
|
// Check if already listening on enough relays
|
||||||
|
if (this._listenRelays.size >= this.maxListeners) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Try to find relays to hop on the network
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = AutoRelay
|
@ -117,6 +117,33 @@ module.exports.hop = async function hop ({
|
|||||||
throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED)
|
throw errCode(new Error(`HOP request failed with code ${response.code}`), Errors.ERR_HOP_REQUEST_FAILED)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Performs a CAN_HOP request to a relay peer, in order to understand its capabilities.
|
||||||
|
* @param {object} options
|
||||||
|
* @param {Connection} options.connection Connection to the relay
|
||||||
|
* @returns {Promise<boolean>}
|
||||||
|
*/
|
||||||
|
module.exports.canHop = async function canHop ({
|
||||||
|
connection
|
||||||
|
}) {
|
||||||
|
// Create a new stream to the relay
|
||||||
|
const { stream } = await connection.newStream([multicodec.relay])
|
||||||
|
// Send the HOP request
|
||||||
|
const streamHandler = new StreamHandler({ stream })
|
||||||
|
streamHandler.write({
|
||||||
|
type: CircuitPB.Type.CAN_HOP
|
||||||
|
})
|
||||||
|
|
||||||
|
const response = await streamHandler.read()
|
||||||
|
await streamHandler.close()
|
||||||
|
|
||||||
|
if (response.code !== CircuitPB.Status.SUCCESS) {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Creates an unencoded CAN_HOP response based on the Circuits configuration
|
* Creates an unencoded CAN_HOP response based on the Circuits configuration
|
||||||
* @private
|
* @private
|
||||||
|
@ -1,16 +1,18 @@
|
|||||||
'use strict'
|
'use strict'
|
||||||
|
|
||||||
|
const debug = require('debug')
|
||||||
|
const log = debug('libp2p:circuit')
|
||||||
|
log.error = debug('libp2p:circuit:error')
|
||||||
|
|
||||||
const mafmt = require('mafmt')
|
const mafmt = require('mafmt')
|
||||||
const multiaddr = require('multiaddr')
|
const multiaddr = require('multiaddr')
|
||||||
const PeerId = require('peer-id')
|
const PeerId = require('peer-id')
|
||||||
const withIs = require('class-is')
|
const withIs = require('class-is')
|
||||||
const { CircuitRelay: CircuitPB } = require('./protocol')
|
const { CircuitRelay: CircuitPB } = require('./protocol')
|
||||||
|
|
||||||
const debug = require('debug')
|
|
||||||
const log = debug('libp2p:circuit')
|
|
||||||
log.error = debug('libp2p:circuit:error')
|
|
||||||
const toConnection = require('libp2p-utils/src/stream-to-ma-conn')
|
const toConnection = require('libp2p-utils/src/stream-to-ma-conn')
|
||||||
|
|
||||||
|
const AutoRelay = require('./auto-relay')
|
||||||
const { relay: multicodec } = require('./multicodec')
|
const { relay: multicodec } = require('./multicodec')
|
||||||
const createListener = require('./listener')
|
const createListener = require('./listener')
|
||||||
const { handleCanHop, handleHop, hop } = require('./circuit/hop')
|
const { handleCanHop, handleHop, hop } = require('./circuit/hop')
|
||||||
@ -35,11 +37,19 @@ class Circuit {
|
|||||||
this._libp2p = libp2p
|
this._libp2p = libp2p
|
||||||
this.peerId = libp2p.peerId
|
this.peerId = libp2p.peerId
|
||||||
this._registrar.handle(multicodec, this._onProtocol.bind(this))
|
this._registrar.handle(multicodec, this._onProtocol.bind(this))
|
||||||
|
|
||||||
|
// Create autoRelay if enabled
|
||||||
|
this._autoRelay = this._options.autoRelay.enabled && new AutoRelay({ libp2p, ...this._options.autoRelay })
|
||||||
}
|
}
|
||||||
|
|
||||||
async _onProtocol ({ connection, stream, protocol }) {
|
async _onProtocol ({ connection, stream }) {
|
||||||
const streamHandler = new StreamHandler({ stream })
|
const streamHandler = new StreamHandler({ stream })
|
||||||
const request = await streamHandler.read()
|
const request = await streamHandler.read()
|
||||||
|
|
||||||
|
if (!request) {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
const circuit = this
|
const circuit = this
|
||||||
let virtualConnection
|
let virtualConnection
|
||||||
|
|
||||||
@ -163,7 +173,7 @@ class Circuit {
|
|||||||
// Called on successful HOP and STOP requests
|
// Called on successful HOP and STOP requests
|
||||||
this.handler = handler
|
this.handler = handler
|
||||||
|
|
||||||
return createListener(this, options)
|
return createListener(this._libp2p, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -8,13 +8,23 @@ const log = debug('libp2p:circuit:listener')
|
|||||||
log.err = debug('libp2p:circuit:error:listener')
|
log.err = debug('libp2p:circuit:error:listener')
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @param {*} circuit
|
* @param {Libp2p} libp2p
|
||||||
* @returns {Listener} a transport listener
|
* @returns {Listener} a transport listener
|
||||||
*/
|
*/
|
||||||
module.exports = (circuit) => {
|
module.exports = (libp2p) => {
|
||||||
const listener = new EventEmitter()
|
const listener = new EventEmitter()
|
||||||
const listeningAddrs = new Map()
|
const listeningAddrs = new Map()
|
||||||
|
|
||||||
|
// Remove listeningAddrs when a peer disconnects
|
||||||
|
libp2p.connectionManager.on('peer:disconnect', (connection) => {
|
||||||
|
const deleted = listeningAddrs.delete(connection.remotePeer.toB58String())
|
||||||
|
|
||||||
|
if (deleted) {
|
||||||
|
// Announce listen addresses change
|
||||||
|
listener.emit('close')
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add swarm handler and listen for incoming connections
|
* Add swarm handler and listen for incoming connections
|
||||||
*
|
*
|
||||||
@ -24,7 +34,7 @@ module.exports = (circuit) => {
|
|||||||
listener.listen = async (addr) => {
|
listener.listen = async (addr) => {
|
||||||
const addrString = String(addr).split('/p2p-circuit').find(a => a !== '')
|
const addrString = String(addr).split('/p2p-circuit').find(a => a !== '')
|
||||||
|
|
||||||
const relayConn = await circuit._dialer.connectToPeer(multiaddr(addrString))
|
const relayConn = await libp2p.dial(multiaddr(addrString))
|
||||||
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')
|
const relayedAddr = relayConn.remoteAddr.encapsulate('/p2p-circuit')
|
||||||
|
|
||||||
listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr)
|
listeningAddrs.set(relayConn.remotePeer.toB58String(), relayedAddr)
|
||||||
|
@ -54,6 +54,10 @@ const DefaultConfig = {
|
|||||||
hop: {
|
hop: {
|
||||||
enabled: false,
|
enabled: false,
|
||||||
active: false
|
active: false
|
||||||
|
},
|
||||||
|
autoRelay: {
|
||||||
|
enabled: false,
|
||||||
|
maxListeners: 2
|
||||||
}
|
}
|
||||||
},
|
},
|
||||||
transport: {}
|
transport: {}
|
||||||
|
@ -16,6 +16,7 @@ module.exports = (node) => {
|
|||||||
}
|
}
|
||||||
|
|
||||||
return {
|
return {
|
||||||
|
routers,
|
||||||
/**
|
/**
|
||||||
* Iterates over all content routers in series to find providers of the given key.
|
* Iterates over all content routers in series to find providers of the given key.
|
||||||
* Once a content router succeeds, iteration will stop.
|
* Once a content router succeeds, iteration will stop.
|
||||||
@ -31,6 +32,8 @@ module.exports = (node) => {
|
|||||||
throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')
|
throw errCode(new Error('No content routers available'), 'NO_ROUTERS_AVAILABLE')
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// TODO: Abortables
|
||||||
|
|
||||||
const result = await pAny(
|
const result = await pAny(
|
||||||
routers.map(async (router) => {
|
routers.map(async (router) => {
|
||||||
const provs = await all(router.findProviders(key, options))
|
const provs = await all(router.findProviders(key, options))
|
||||||
|
96
src/discovery/index.js
Normal file
96
src/discovery/index.js
Normal file
@ -0,0 +1,96 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const debug = require('debug')
|
||||||
|
const log = debug('libp2p:discovery')
|
||||||
|
log.error = debug('libp2p:discovery:error')
|
||||||
|
const errCode = require('err-code')
|
||||||
|
|
||||||
|
// const AbortController = require('abort-controller')
|
||||||
|
const { parallelMerge } = require('streaming-iterables')
|
||||||
|
|
||||||
|
const Rendezvous = require('./rendezvous')
|
||||||
|
const Routing = require('./routing')
|
||||||
|
const { codes } = require('../errors')
|
||||||
|
|
||||||
|
module.exports = (libp2p) => {
|
||||||
|
const addressBook = libp2p.peerStore.addressBook
|
||||||
|
const routing = Routing(libp2p)
|
||||||
|
const rendezvous = Rendezvous(libp2p)
|
||||||
|
|
||||||
|
const getDiscoveryAvailableIts = (namespace, options) => {
|
||||||
|
if (routing.isEnabled && rendezvous.isEnabled) {
|
||||||
|
return parallelMerge(
|
||||||
|
routing.findPeers(namespace, options),
|
||||||
|
rendezvous.findPeers(namespace, options)
|
||||||
|
)
|
||||||
|
} else if (routing.isEnabled) {
|
||||||
|
return routing.findPeers(namespace, options)
|
||||||
|
}
|
||||||
|
return rendezvous.findPeers(namespace, options)
|
||||||
|
}
|
||||||
|
|
||||||
|
return {
|
||||||
|
/**
|
||||||
|
* Advertise services on the network.
|
||||||
|
* @param {string} namespace
|
||||||
|
* @param {object} [options]
|
||||||
|
* @returns {Promise<void>}
|
||||||
|
*/
|
||||||
|
async advertise (namespace, options) {
|
||||||
|
if (!routing.isEnabled && !rendezvous.isEnabled) {
|
||||||
|
throw errCode(new Error('no discovery implementations available'), codes.ERR_NO_DISCOVERY_IMPLEMENTATIONS)
|
||||||
|
}
|
||||||
|
return Promise.all([
|
||||||
|
routing.isEnabled && routing.advertise(namespace, options),
|
||||||
|
rendezvous.isEnabled && rendezvous.advertise(namespace, options)
|
||||||
|
])
|
||||||
|
},
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Discover peers providing a given service.
|
||||||
|
* @param {string} namespace
|
||||||
|
* @param {object} [options]
|
||||||
|
* @param {number} [options.limit] number of distinct peers to find
|
||||||
|
* @param {AsyncIterable<PeerId>}
|
||||||
|
*/
|
||||||
|
async * findPeers (namespace, options = {}) {
|
||||||
|
if (!routing.isEnabled && !rendezvous.isEnabled) {
|
||||||
|
throw errCode(new Error('no discovery implementations available'), codes.ERR_NO_DISCOVERY_IMPLEMENTATIONS)
|
||||||
|
}
|
||||||
|
|
||||||
|
const discoveredPeers = new Set()
|
||||||
|
// TODO: add abort controller
|
||||||
|
const discAsyncIt = getDiscoveryAvailableIts(namespace, options)
|
||||||
|
|
||||||
|
// Store in the AddressBook: signed record or uncertified
|
||||||
|
for await (const { signedPeerRecord, id, multiaddrs } of discAsyncIt) {
|
||||||
|
if (signedPeerRecord) {
|
||||||
|
const idStr = signedPeerRecord.peerId.toB58String()
|
||||||
|
const isNew = !discoveredPeers.has(idStr)
|
||||||
|
discoveredPeers.add(idStr)
|
||||||
|
|
||||||
|
// Consume peer record and yield if new
|
||||||
|
if (addressBook.consumePeerRecord(signedPeerRecord) && isNew) {
|
||||||
|
yield signedPeerRecord.peerId
|
||||||
|
}
|
||||||
|
} else if (id && multiaddrs) {
|
||||||
|
const idStr = id.toB58String()
|
||||||
|
const isNew = !discoveredPeers.has(idStr)
|
||||||
|
discoveredPeers.add(idStr)
|
||||||
|
|
||||||
|
addressBook.add(id, multiaddrs)
|
||||||
|
|
||||||
|
if (isNew) {
|
||||||
|
yield
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Abort if enough
|
||||||
|
if (options.limit && options.limit <= discoveredPeers.size) {
|
||||||
|
console.log('abort')
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: who handles reprovide??
|
38
src/discovery/rendezvous.js
Normal file
38
src/discovery/rendezvous.js
Normal file
@ -0,0 +1,38 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RendezvousDiscovery is an implementation of service discovery
|
||||||
|
* using Rendezvous. Namespaces represent services supported by other nodes.
|
||||||
|
* @param {Libp2p} libp2p
|
||||||
|
*/
|
||||||
|
module.exports = libp2p => {
|
||||||
|
const rendezvous = libp2p.rendezvous
|
||||||
|
const isEnabled = !!rendezvous
|
||||||
|
|
||||||
|
return {
|
||||||
|
isEnabled,
|
||||||
|
/**
|
||||||
|
* Advertise services on the network using the rendezvous module.
|
||||||
|
* @param {string} namespace
|
||||||
|
* @param {object} [options]
|
||||||
|
* @param {object} [options.ttl = 7200e3] registration ttl in ms (minimum 120)
|
||||||
|
* @returns {Promise<number>}
|
||||||
|
*/
|
||||||
|
advertise(namespace, options) {
|
||||||
|
return rendezvous.register(namespace, options)
|
||||||
|
},
|
||||||
|
/**
|
||||||
|
* Discover peers providing a given service.
|
||||||
|
* @param {string} namespace
|
||||||
|
* @param {object} [options]
|
||||||
|
* @param {number} [options.limit] limit of peers to discover
|
||||||
|
* @returns {AsyncIterable<{ signedPeerRecord: Envelope }>}
|
||||||
|
*/
|
||||||
|
async * findPeers(namespace, options) {
|
||||||
|
// TODO: Abortables + options
|
||||||
|
for await (const peer of rendezvous.discover(namespace, options)) {
|
||||||
|
yield peer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
47
src/discovery/routing.js
Normal file
47
src/discovery/routing.js
Normal file
@ -0,0 +1,47 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const { namespaceToCid } = require('./utils')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* RoutingDiscovery is an implementation of service discovery
|
||||||
|
* using ContentRouting. Namespaces represent services supported by other nodes.
|
||||||
|
* @param {Libp2p} libp2p
|
||||||
|
*/
|
||||||
|
module.exports = libp2p => {
|
||||||
|
const contentRouting = libp2p.contentRouting
|
||||||
|
const isEnabled = !!contentRouting.routers.length
|
||||||
|
|
||||||
|
return {
|
||||||
|
isEnabled,
|
||||||
|
/**
|
||||||
|
* Advertise services on the network using content routing modules.
|
||||||
|
* @param {string} namespace
|
||||||
|
* @param {object} [options]
|
||||||
|
* @returns {Promise<void>}
|
||||||
|
*/
|
||||||
|
async advertise(namespace, options) {
|
||||||
|
const cid = await namespaceToCid(namespace)
|
||||||
|
|
||||||
|
// TODO: options?
|
||||||
|
await contentRouting.provide(cid)
|
||||||
|
},
|
||||||
|
/**
|
||||||
|
* Discover peers providing a given service.
|
||||||
|
* @param {string} namespace
|
||||||
|
* @param {object} [options]
|
||||||
|
* @param {number} [options.limit] limit of peers to discover
|
||||||
|
* @returns {AsyncIterable<{ id: PeerId, multiaddrs: Multiaddr[] }>}
|
||||||
|
*/
|
||||||
|
async * findPeers(namespace, options = {}) {
|
||||||
|
const cid = await namespaceToCid(namespace)
|
||||||
|
const providerOptions = {
|
||||||
|
maxNumProviders: options.limit
|
||||||
|
}
|
||||||
|
|
||||||
|
// TODO: Abortables + options
|
||||||
|
for await (const peer of contentRouting.findProviders(cid, providerOptions)) {
|
||||||
|
yield peer
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
16
src/discovery/utils.js
Normal file
16
src/discovery/utils.js
Normal file
@ -0,0 +1,16 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const CID = require('cids')
|
||||||
|
const multihashing = require('multihashing-async')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Convert a namespace string into a cid.
|
||||||
|
* @param {string} namespace
|
||||||
|
* @return {Promise<CID>}
|
||||||
|
*/
|
||||||
|
module.exports.namespaceToCid = async (namespace) => {
|
||||||
|
const bytes = new TextEncoder('utf8').encode(namespace)
|
||||||
|
const hash = await multihashing(bytes, 'sha2-256')
|
||||||
|
|
||||||
|
return new CID(hash)
|
||||||
|
}
|
@ -16,6 +16,7 @@ exports.codes = {
|
|||||||
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
|
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
|
||||||
ERR_ALREADY_ABORTED: 'ERR_ALREADY_ABORTED',
|
ERR_ALREADY_ABORTED: 'ERR_ALREADY_ABORTED',
|
||||||
ERR_NO_VALID_ADDRESSES: 'ERR_NO_VALID_ADDRESSES',
|
ERR_NO_VALID_ADDRESSES: 'ERR_NO_VALID_ADDRESSES',
|
||||||
|
ERR_NO_DISCOVERY_IMPLEMENTATIONS: 'ERR_NO_DISCOVERY_IMPLEMENTATIONS',
|
||||||
ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF',
|
ERR_DISCOVERED_SELF: 'ERR_DISCOVERED_SELF',
|
||||||
ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT',
|
ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT',
|
||||||
ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED',
|
ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED',
|
||||||
|
@ -63,12 +63,6 @@ class IdentifyService {
|
|||||||
*/
|
*/
|
||||||
this.connectionManager = libp2p.connectionManager
|
this.connectionManager = libp2p.connectionManager
|
||||||
|
|
||||||
this.connectionManager.on('peer:connect', (connection) => {
|
|
||||||
const peerId = connection.remotePeer
|
|
||||||
|
|
||||||
this.identify(connection, peerId).catch(log.error)
|
|
||||||
})
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @property {PeerId}
|
* @property {PeerId}
|
||||||
*/
|
*/
|
||||||
@ -82,6 +76,19 @@ class IdentifyService {
|
|||||||
this._protocols = protocols
|
this._protocols = protocols
|
||||||
|
|
||||||
this.handleMessage = this.handleMessage.bind(this)
|
this.handleMessage = this.handleMessage.bind(this)
|
||||||
|
|
||||||
|
this.connectionManager.on('peer:connect', (connection) => {
|
||||||
|
const peerId = connection.remotePeer
|
||||||
|
|
||||||
|
this.identify(connection, peerId).catch(log.error)
|
||||||
|
})
|
||||||
|
|
||||||
|
// When self multiaddrs change, trigger identify-push
|
||||||
|
this.peerStore.on('change:multiaddrs', ({ peerId }) => {
|
||||||
|
if (peerId.toString() === this.peerId.toString()) {
|
||||||
|
this.pushToPeerStore()
|
||||||
|
}
|
||||||
|
})
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -90,7 +97,7 @@ class IdentifyService {
|
|||||||
* @returns {Promise<void>}
|
* @returns {Promise<void>}
|
||||||
*/
|
*/
|
||||||
async push (connections) {
|
async push (connections) {
|
||||||
const signedPeerRecord = await this._getSelfPeerRecord()
|
const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId)
|
||||||
const listenAddrs = this._libp2p.multiaddrs.map((ma) => ma.bytes)
|
const listenAddrs = this._libp2p.multiaddrs.map((ma) => ma.bytes)
|
||||||
const protocols = Array.from(this._protocols.keys())
|
const protocols = Array.from(this._protocols.keys())
|
||||||
|
|
||||||
@ -119,12 +126,12 @@ class IdentifyService {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Calls `push` for all peers in the `peerStore` that are connected
|
* Calls `push` for all peers in the `peerStore` that are connected
|
||||||
* @param {PeerStore} peerStore
|
* @returns {void}
|
||||||
*/
|
*/
|
||||||
pushToPeerStore (peerStore) {
|
pushToPeerStore () {
|
||||||
const connections = []
|
const connections = []
|
||||||
let connection
|
let connection
|
||||||
for (const peer of peerStore.peers.values()) {
|
for (const peer of this.peerStore.peers.values()) {
|
||||||
if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.connectionManager.get(peer.id))) {
|
if (peer.protocols.includes(MULTICODEC_IDENTIFY_PUSH) && (connection = this.connectionManager.get(peer.id))) {
|
||||||
connections.push(connection)
|
connections.push(connection)
|
||||||
}
|
}
|
||||||
@ -239,7 +246,7 @@ class IdentifyService {
|
|||||||
publicKey = this.peerId.pubKey.bytes
|
publicKey = this.peerId.pubKey.bytes
|
||||||
}
|
}
|
||||||
|
|
||||||
const signedPeerRecord = await this._getSelfPeerRecord()
|
const signedPeerRecord = await this.peerStore.addressBook.getRawEnvelope(this.peerId)
|
||||||
|
|
||||||
const message = Message.encode({
|
const message = Message.encode({
|
||||||
protocolVersion: PROTOCOL_VERSION,
|
protocolVersion: PROTOCOL_VERSION,
|
||||||
@ -308,33 +315,6 @@ class IdentifyService {
|
|||||||
// Update the protocols
|
// Update the protocols
|
||||||
this.peerStore.protoBook.set(id, message.protocols)
|
this.peerStore.protoBook.set(id, message.protocols)
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
|
||||||
* Get self signed peer record raw envelope.
|
|
||||||
* @return {Uint8Array}
|
|
||||||
*/
|
|
||||||
async _getSelfPeerRecord () {
|
|
||||||
const selfSignedPeerRecord = this.peerStore.addressBook.getRawEnvelope(this.peerId)
|
|
||||||
|
|
||||||
// TODO: support invalidation when dynamic multiaddrs are supported
|
|
||||||
if (selfSignedPeerRecord) {
|
|
||||||
return selfSignedPeerRecord
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
|
||||||
const peerRecord = new PeerRecord({
|
|
||||||
peerId: this.peerId,
|
|
||||||
multiaddrs: this._libp2p.multiaddrs
|
|
||||||
})
|
|
||||||
const envelope = await Envelope.seal(peerRecord, this.peerId)
|
|
||||||
this.peerStore.addressBook.consumePeerRecord(envelope)
|
|
||||||
|
|
||||||
return this.peerStore.addressBook.getRawEnvelope(this.peerId)
|
|
||||||
} catch (err) {
|
|
||||||
log.error('failed to get self peer record')
|
|
||||||
}
|
|
||||||
return null
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
module.exports.IdentifyService = IdentifyService
|
module.exports.IdentifyService = IdentifyService
|
||||||
|
11
src/index.js
11
src/index.js
@ -247,6 +247,7 @@ class Libp2p extends EventEmitter {
|
|||||||
log('libp2p is stopping')
|
log('libp2p is stopping')
|
||||||
|
|
||||||
try {
|
try {
|
||||||
|
this._isStarted = false
|
||||||
for (const service of this._discovery.values()) {
|
for (const service of this._discovery.values()) {
|
||||||
service.removeListener('peer', this._onDiscoveryPeer)
|
service.removeListener('peer', this._onDiscoveryPeer)
|
||||||
}
|
}
|
||||||
@ -274,7 +275,6 @@ class Libp2p extends EventEmitter {
|
|||||||
this.emit('error', err)
|
this.emit('error', err)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
this._isStarted = false
|
|
||||||
log('libp2p has stopped')
|
log('libp2p has stopped')
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -424,7 +424,7 @@ class Libp2p extends EventEmitter {
|
|||||||
|
|
||||||
// Only push if libp2p is running
|
// Only push if libp2p is running
|
||||||
if (this.isStarted() && this.identifyService) {
|
if (this.isStarted() && this.identifyService) {
|
||||||
this.identifyService.pushToPeerStore(this.peerStore)
|
this.identifyService.pushToPeerStore()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -441,13 +441,14 @@ class Libp2p extends EventEmitter {
|
|||||||
|
|
||||||
// Only push if libp2p is running
|
// Only push if libp2p is running
|
||||||
if (this.isStarted() && this.identifyService) {
|
if (this.isStarted() && this.identifyService) {
|
||||||
this.identifyService.pushToPeerStore(this.peerStore)
|
this.identifyService.pushToPeerStore()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
async _onStarting () {
|
async _onStarting () {
|
||||||
// Listen on the provided transports
|
// Listen on the provided transports for the provided addresses
|
||||||
await this.transportManager.listen()
|
const addrs = this.addressManager.getListenAddrs()
|
||||||
|
await this.transportManager.listen(addrs)
|
||||||
|
|
||||||
// Start PeerStore
|
// Start PeerStore
|
||||||
await this.peerStore.start()
|
await this.peerStore.start()
|
||||||
|
@ -260,7 +260,7 @@ class AddressBook extends Book {
|
|||||||
* Get the known data of a provided peer.
|
* Get the known data of a provided peer.
|
||||||
* @override
|
* @override
|
||||||
* @param {PeerId} peerId
|
* @param {PeerId} peerId
|
||||||
* @returns {Array<data>}
|
* @returns {Array<Address>|undefined}
|
||||||
*/
|
*/
|
||||||
get (peerId) {
|
get (peerId) {
|
||||||
if (!PeerId.isPeerId(peerId)) {
|
if (!PeerId.isPeerId(peerId)) {
|
||||||
|
20
src/record/utils.js
Normal file
20
src/record/utils.js
Normal file
@ -0,0 +1,20 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const Envelope = require('./envelope')
|
||||||
|
const PeerRecord = require('./peer-record')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Create (or update if existing) self peer record and store it in the AddressBook.
|
||||||
|
* @param {libp2p} libp2p
|
||||||
|
* @returns {Promise<void>}
|
||||||
|
*/
|
||||||
|
async function updateSelfPeerRecord (libp2p) {
|
||||||
|
const peerRecord = new PeerRecord({
|
||||||
|
peerId: libp2p.peerId,
|
||||||
|
multiaddrs: libp2p.multiaddrs
|
||||||
|
})
|
||||||
|
const envelope = await Envelope.seal(peerRecord, libp2p.peerId)
|
||||||
|
libp2p.peerStore.addressBook.consumePeerRecord(envelope)
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports.updateSelfPeerRecord = updateSelfPeerRecord
|
@ -7,6 +7,8 @@ const debug = require('debug')
|
|||||||
const log = debug('libp2p:transports')
|
const log = debug('libp2p:transports')
|
||||||
log.error = debug('libp2p:transports:error')
|
log.error = debug('libp2p:transports:error')
|
||||||
|
|
||||||
|
const { updateSelfPeerRecord } = require('./record/utils')
|
||||||
|
|
||||||
class TransportManager {
|
class TransportManager {
|
||||||
/**
|
/**
|
||||||
* @constructor
|
* @constructor
|
||||||
@ -62,6 +64,8 @@ class TransportManager {
|
|||||||
log('closing listeners for %s', key)
|
log('closing listeners for %s', key)
|
||||||
while (listeners.length) {
|
while (listeners.length) {
|
||||||
const listener = listeners.pop()
|
const listener = listeners.pop()
|
||||||
|
listener.removeAllListeners('listening')
|
||||||
|
listener.removeAllListeners('close')
|
||||||
tasks.push(listener.close())
|
tasks.push(listener.close())
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -131,11 +135,10 @@ class TransportManager {
|
|||||||
/**
|
/**
|
||||||
* Starts listeners for each listen Multiaddr.
|
* Starts listeners for each listen Multiaddr.
|
||||||
* @async
|
* @async
|
||||||
|
* @param {Array<Multiaddr>} addrs addresses to attempt to listen on
|
||||||
*/
|
*/
|
||||||
async listen () {
|
async listen (addrs) {
|
||||||
const addrs = this.libp2p.addressManager.getListenAddrs()
|
if (!addrs || addrs.length === 0) {
|
||||||
|
|
||||||
if (addrs.length === 0) {
|
|
||||||
log('no addresses were provided for listening, this node is dial only')
|
log('no addresses were provided for listening, this node is dial only')
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
@ -151,6 +154,10 @@ class TransportManager {
|
|||||||
const listener = transport.createListener({}, this.onConnection)
|
const listener = transport.createListener({}, this.onConnection)
|
||||||
this._listeners.get(key).push(listener)
|
this._listeners.get(key).push(listener)
|
||||||
|
|
||||||
|
// Track listen/close events
|
||||||
|
listener.on('listening', () => updateSelfPeerRecord(this.libp2p))
|
||||||
|
listener.on('close', () => updateSelfPeerRecord(this.libp2p))
|
||||||
|
|
||||||
// We need to attempt to listen on everything
|
// We need to attempt to listen on everything
|
||||||
tasks.push(listener.listen(addr))
|
tasks.push(listener.listen(addr))
|
||||||
}
|
}
|
||||||
@ -195,6 +202,8 @@ class TransportManager {
|
|||||||
if (this._listeners.has(key)) {
|
if (this._listeners.has(key)) {
|
||||||
// Close any running listeners
|
// Close any running listeners
|
||||||
for (const listener of this._listeners.get(key)) {
|
for (const listener of this._listeners.get(key)) {
|
||||||
|
listener.removeAllListeners('listening')
|
||||||
|
listener.removeAllListeners('close')
|
||||||
await listener.close()
|
await listener.close()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -40,21 +40,28 @@ describe('Dialing (direct, TCP)', () => {
|
|||||||
let peerStore
|
let peerStore
|
||||||
let remoteAddr
|
let remoteAddr
|
||||||
|
|
||||||
before(async () => {
|
beforeEach(async () => {
|
||||||
const [remotePeerId] = await Promise.all([
|
const [localPeerId, remotePeerId] = await Promise.all([
|
||||||
PeerId.createFromJSON(Peers[0])
|
PeerId.createFromJSON(Peers[0]),
|
||||||
|
PeerId.createFromJSON(Peers[1])
|
||||||
])
|
])
|
||||||
|
|
||||||
|
peerStore = new PeerStore({ peerId: remotePeerId })
|
||||||
remoteTM = new TransportManager({
|
remoteTM = new TransportManager({
|
||||||
libp2p: {
|
libp2p: {
|
||||||
addressManager: new AddressManager({ listen: [listenAddr] })
|
addressManager: new AddressManager({ listen: [listenAddr] }),
|
||||||
|
peerId: remotePeerId,
|
||||||
|
peerStore
|
||||||
},
|
},
|
||||||
upgrader: mockUpgrader
|
upgrader: mockUpgrader
|
||||||
})
|
})
|
||||||
remoteTM.add(Transport.prototype[Symbol.toStringTag], Transport)
|
remoteTM.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||||
|
|
||||||
peerStore = new PeerStore({ peerId: remotePeerId })
|
|
||||||
localTM = new TransportManager({
|
localTM = new TransportManager({
|
||||||
libp2p: {},
|
libp2p: {
|
||||||
|
peerId: localPeerId,
|
||||||
|
peerStore: new PeerStore({ peerId: localPeerId })
|
||||||
|
},
|
||||||
upgrader: mockUpgrader
|
upgrader: mockUpgrader
|
||||||
})
|
})
|
||||||
localTM.add(Transport.prototype[Symbol.toStringTag], Transport)
|
localTM.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||||
@ -64,7 +71,7 @@ describe('Dialing (direct, TCP)', () => {
|
|||||||
remoteAddr = remoteTM.getAddrs()[0].encapsulate(`/p2p/${remotePeerId.toB58String()}`)
|
remoteAddr = remoteTM.getAddrs()[0].encapsulate(`/p2p/${remotePeerId.toB58String()}`)
|
||||||
})
|
})
|
||||||
|
|
||||||
after(() => remoteTM.close())
|
afterEach(() => remoteTM.close())
|
||||||
|
|
||||||
afterEach(() => {
|
afterEach(() => {
|
||||||
sinon.restore()
|
sinon.restore()
|
||||||
@ -110,7 +117,7 @@ describe('Dialing (direct, TCP)', () => {
|
|||||||
peerStore
|
peerStore
|
||||||
})
|
})
|
||||||
|
|
||||||
peerStore.addressBook.set(peerId, [remoteAddr])
|
peerStore.addressBook.set(peerId, remoteTM.getAddrs())
|
||||||
|
|
||||||
const connection = await dialer.connectToPeer(peerId)
|
const connection = await dialer.connectToPeer(peerId)
|
||||||
expect(connection).to.exist()
|
expect(connection).to.exist()
|
||||||
|
@ -354,7 +354,6 @@ describe('Dialing (direct, WebSockets)', () => {
|
|||||||
const connection = await libp2p.dial(remoteAddr)
|
const connection = await libp2p.dial(remoteAddr)
|
||||||
expect(connection).to.exist()
|
expect(connection).to.exist()
|
||||||
|
|
||||||
sinon.spy(libp2p.peerStore.addressBook, 'consumePeerRecord')
|
|
||||||
sinon.spy(libp2p.peerStore.protoBook, 'set')
|
sinon.spy(libp2p.peerStore.protoBook, 'set')
|
||||||
|
|
||||||
// Wait for onConnection to be called
|
// Wait for onConnection to be called
|
||||||
@ -363,8 +362,6 @@ describe('Dialing (direct, WebSockets)', () => {
|
|||||||
expect(libp2p.identifyService.identify.callCount).to.equal(1)
|
expect(libp2p.identifyService.identify.callCount).to.equal(1)
|
||||||
await libp2p.identifyService.identify.firstCall.returnValue
|
await libp2p.identifyService.identify.firstCall.returnValue
|
||||||
|
|
||||||
// Self + New peer
|
|
||||||
expect(libp2p.peerStore.addressBook.consumePeerRecord.callCount).to.equal(2)
|
|
||||||
expect(libp2p.peerStore.protoBook.set.callCount).to.equal(1)
|
expect(libp2p.peerStore.protoBook.set.callCount).to.equal(1)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
@ -8,7 +8,6 @@ const { expect } = chai
|
|||||||
const sinon = require('sinon')
|
const sinon = require('sinon')
|
||||||
|
|
||||||
const { EventEmitter } = require('events')
|
const { EventEmitter } = require('events')
|
||||||
const delay = require('delay')
|
|
||||||
const PeerId = require('peer-id')
|
const PeerId = require('peer-id')
|
||||||
const duplexPair = require('it-pair/duplex')
|
const duplexPair = require('it-pair/duplex')
|
||||||
const multiaddr = require('multiaddr')
|
const multiaddr = require('multiaddr')
|
||||||
@ -22,6 +21,7 @@ const Libp2p = require('../../src')
|
|||||||
const Envelope = require('../../src/record/envelope')
|
const Envelope = require('../../src/record/envelope')
|
||||||
const PeerStore = require('../../src/peer-store')
|
const PeerStore = require('../../src/peer-store')
|
||||||
const baseOptions = require('../utils/base-options.browser')
|
const baseOptions = require('../utils/base-options.browser')
|
||||||
|
const { updateSelfPeerRecord } = require('../../src/record/utils')
|
||||||
const pkg = require('../../package.json')
|
const pkg = require('../../package.json')
|
||||||
|
|
||||||
const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser')
|
const { MULTIADDRS_WEBSOCKETS } = require('../fixtures/browser')
|
||||||
@ -78,6 +78,9 @@ describe('Identify', () => {
|
|||||||
sinon.spy(localIdentify.peerStore.addressBook, 'consumePeerRecord')
|
sinon.spy(localIdentify.peerStore.addressBook, 'consumePeerRecord')
|
||||||
sinon.spy(localIdentify.peerStore.protoBook, 'set')
|
sinon.spy(localIdentify.peerStore.protoBook, 'set')
|
||||||
|
|
||||||
|
// Transport Manager creates signed peer record
|
||||||
|
await updateSelfPeerRecord(remoteIdentify._libp2p)
|
||||||
|
|
||||||
// Run identify
|
// Run identify
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
localIdentify.identify(localConnectionMock),
|
localIdentify.identify(localConnectionMock),
|
||||||
@ -239,6 +242,10 @@ describe('Identify', () => {
|
|||||||
sinon.spy(remoteIdentify.peerStore.addressBook, 'consumePeerRecord')
|
sinon.spy(remoteIdentify.peerStore.addressBook, 'consumePeerRecord')
|
||||||
sinon.spy(remoteIdentify.peerStore.protoBook, 'set')
|
sinon.spy(remoteIdentify.peerStore.protoBook, 'set')
|
||||||
|
|
||||||
|
// Transport Manager creates signed peer record
|
||||||
|
await updateSelfPeerRecord(localIdentify._libp2p)
|
||||||
|
await updateSelfPeerRecord(remoteIdentify._libp2p)
|
||||||
|
|
||||||
// Run identify
|
// Run identify
|
||||||
await Promise.all([
|
await Promise.all([
|
||||||
localIdentify.push([localConnectionMock]),
|
localIdentify.push([localConnectionMock]),
|
||||||
@ -249,7 +256,7 @@ describe('Identify', () => {
|
|||||||
})
|
})
|
||||||
])
|
])
|
||||||
|
|
||||||
expect(remoteIdentify.peerStore.addressBook.consumePeerRecord.callCount).to.equal(1)
|
expect(remoteIdentify.peerStore.addressBook.consumePeerRecord.callCount).to.equal(2)
|
||||||
expect(remoteIdentify.peerStore.protoBook.set.callCount).to.equal(1)
|
expect(remoteIdentify.peerStore.protoBook.set.callCount).to.equal(1)
|
||||||
|
|
||||||
const addresses = localIdentify.peerStore.addressBook.get(localPeer)
|
const addresses = localIdentify.peerStore.addressBook.get(localPeer)
|
||||||
@ -359,8 +366,8 @@ describe('Identify', () => {
|
|||||||
expect(connection).to.exist()
|
expect(connection).to.exist()
|
||||||
|
|
||||||
// Wait for peer store to be updated
|
// Wait for peer store to be updated
|
||||||
// Dialer._createDialTarget (add), Identify (consume), Create self (consume)
|
// Dialer._createDialTarget (add), Identify (consume)
|
||||||
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 2 && peerStoreSpyAdd.callCount === 1)
|
await pWaitFor(() => peerStoreSpyConsumeRecord.callCount === 1 && peerStoreSpyAdd.callCount === 1)
|
||||||
expect(libp2p.identifyService.identify.callCount).to.equal(1)
|
expect(libp2p.identifyService.identify.callCount).to.equal(1)
|
||||||
|
|
||||||
// The connection should have no open streams
|
// The connection should have no open streams
|
||||||
@ -381,8 +388,6 @@ describe('Identify', () => {
|
|||||||
|
|
||||||
const connection = await libp2p.dialer.connectToPeer(remoteAddr)
|
const connection = await libp2p.dialer.connectToPeer(remoteAddr)
|
||||||
expect(connection).to.exist()
|
expect(connection).to.exist()
|
||||||
// Wait for nextTick to trigger the identify call
|
|
||||||
await delay(1)
|
|
||||||
|
|
||||||
// Wait for identify to finish
|
// Wait for identify to finish
|
||||||
await libp2p.identifyService.identify.firstCall.returnValue
|
await libp2p.identifyService.identify.firstCall.returnValue
|
||||||
@ -404,5 +409,39 @@ describe('Identify', () => {
|
|||||||
// Verify the streams close
|
// Verify the streams close
|
||||||
await pWaitFor(() => connection.streams.length === 0)
|
await pWaitFor(() => connection.streams.length === 0)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('should push multiaddr updates to an already connected peer', async () => {
|
||||||
|
libp2p = new Libp2p({
|
||||||
|
...baseOptions,
|
||||||
|
peerId
|
||||||
|
})
|
||||||
|
|
||||||
|
await libp2p.start()
|
||||||
|
|
||||||
|
sinon.spy(libp2p.identifyService, 'identify')
|
||||||
|
sinon.spy(libp2p.identifyService, 'push')
|
||||||
|
|
||||||
|
const connection = await libp2p.dialer.connectToPeer(remoteAddr)
|
||||||
|
expect(connection).to.exist()
|
||||||
|
|
||||||
|
// Wait for identify to finish
|
||||||
|
await libp2p.identifyService.identify.firstCall.returnValue
|
||||||
|
sinon.stub(libp2p, 'isStarted').returns(true)
|
||||||
|
|
||||||
|
libp2p.peerStore.addressBook.add(libp2p.peerId, [multiaddr('/ip4/180.0.0.1/tcp/15001/ws')])
|
||||||
|
|
||||||
|
// Verify the remote peer is notified of change
|
||||||
|
expect(libp2p.identifyService.push.callCount).to.equal(1)
|
||||||
|
for (const call of libp2p.identifyService.push.getCalls()) {
|
||||||
|
const [connections] = call.args
|
||||||
|
expect(connections.length).to.equal(1)
|
||||||
|
expect(connections[0].remotePeer.toB58String()).to.equal(remoteAddr.getPeerId())
|
||||||
|
const results = await call.returnValue
|
||||||
|
expect(results.length).to.equal(1)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify the streams close
|
||||||
|
await pWaitFor(() => connection.streams.length === 0)
|
||||||
|
})
|
||||||
})
|
})
|
||||||
})
|
})
|
||||||
|
460
test/relay/auto-relay.node.js
Normal file
460
test/relay/auto-relay.node.js
Normal file
@ -0,0 +1,460 @@
|
|||||||
|
'use strict'
|
||||||
|
/* eslint-env mocha */
|
||||||
|
|
||||||
|
const chai = require('chai')
|
||||||
|
chai.use(require('dirty-chai'))
|
||||||
|
const { expect } = chai
|
||||||
|
|
||||||
|
const delay = require('delay')
|
||||||
|
const pWaitFor = require('p-wait-for')
|
||||||
|
const sinon = require('sinon')
|
||||||
|
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
|
const Libp2p = require('../../src')
|
||||||
|
const { relay: relayMulticodec } = require('../../src/circuit/multicodec')
|
||||||
|
|
||||||
|
const { createPeerId } = require('../utils/creators/peer')
|
||||||
|
const baseOptions = require('../utils/base-options')
|
||||||
|
|
||||||
|
const listenAddr = '/ip4/0.0.0.0/tcp/0'
|
||||||
|
|
||||||
|
describe('auto-relay', () => {
|
||||||
|
describe('basics', () => {
|
||||||
|
let libp2p
|
||||||
|
let relayLibp2p
|
||||||
|
let autoRelay
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
const peerIds = await createPeerId({ number: 2 })
|
||||||
|
// Create 2 nodes, and turn HOP on for the relay
|
||||||
|
;[libp2p, relayLibp2p] = peerIds.map((peerId, index) => {
|
||||||
|
const opts = {
|
||||||
|
...baseOptions,
|
||||||
|
config: {
|
||||||
|
...baseOptions.config,
|
||||||
|
relay: {
|
||||||
|
hop: {
|
||||||
|
enabled: index !== 0
|
||||||
|
},
|
||||||
|
autoRelay: {
|
||||||
|
enabled: true,
|
||||||
|
maxListeners: 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Libp2p({
|
||||||
|
...opts,
|
||||||
|
addresses: {
|
||||||
|
listen: [listenAddr]
|
||||||
|
},
|
||||||
|
connectionManager: {
|
||||||
|
autoDial: false
|
||||||
|
},
|
||||||
|
peerDiscovery: {
|
||||||
|
autoDial: false
|
||||||
|
},
|
||||||
|
peerId
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
autoRelay = libp2p.transportManager._transports.get('Circuit')._autoRelay
|
||||||
|
|
||||||
|
expect(autoRelay.maxListeners).to.eql(1)
|
||||||
|
})
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
// Start each node
|
||||||
|
return Promise.all([libp2p, relayLibp2p].map(libp2p => libp2p.start()))
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
// Stop each node
|
||||||
|
return Promise.all([libp2p, relayLibp2p].map(libp2p => libp2p.stop()))
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should ask if node supports hop on protocol change (relay protocol) and add to listen multiaddrs', async () => {
|
||||||
|
// Spy if a connected peer is being added as listen relay
|
||||||
|
sinon.spy(autoRelay, '_addListenRelay')
|
||||||
|
|
||||||
|
const originalMultiaddrsLength = relayLibp2p.multiaddrs.length
|
||||||
|
|
||||||
|
// Discover relay
|
||||||
|
libp2p.peerStore.addressBook.add(relayLibp2p.peerId, relayLibp2p.multiaddrs)
|
||||||
|
await libp2p.dial(relayLibp2p.peerId)
|
||||||
|
|
||||||
|
// Wait for peer added as listen relay
|
||||||
|
await pWaitFor(() => autoRelay._addListenRelay.callCount === 1)
|
||||||
|
expect(autoRelay._listenRelays.size).to.equal(1)
|
||||||
|
|
||||||
|
// Wait for listen multiaddr update
|
||||||
|
await pWaitFor(() => libp2p.multiaddrs.length === originalMultiaddrsLength + 1)
|
||||||
|
expect(libp2p.multiaddrs[originalMultiaddrsLength].getPeerId()).to.eql(relayLibp2p.peerId.toB58String())
|
||||||
|
|
||||||
|
// Peer has relay multicodec
|
||||||
|
const knownProtocols = libp2p.peerStore.protoBook.get(relayLibp2p.peerId)
|
||||||
|
expect(knownProtocols).to.include(relayMulticodec)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('flows with 1 listener max', () => {
|
||||||
|
let libp2p
|
||||||
|
let relayLibp2p1
|
||||||
|
let relayLibp2p2
|
||||||
|
let relayLibp2p3
|
||||||
|
let autoRelay1
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
const peerIds = await createPeerId({ number: 4 })
|
||||||
|
// Create 4 nodes, and turn HOP on for the relay
|
||||||
|
;[libp2p, relayLibp2p1, relayLibp2p2, relayLibp2p3] = peerIds.map((peerId, index) => {
|
||||||
|
let opts = baseOptions
|
||||||
|
|
||||||
|
if (index !== 0) {
|
||||||
|
opts = {
|
||||||
|
...baseOptions,
|
||||||
|
config: {
|
||||||
|
...baseOptions.config,
|
||||||
|
relay: {
|
||||||
|
hop: {
|
||||||
|
enabled: true
|
||||||
|
},
|
||||||
|
autoRelay: {
|
||||||
|
enabled: true,
|
||||||
|
maxListeners: 1
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return new Libp2p({
|
||||||
|
...opts,
|
||||||
|
addresses: {
|
||||||
|
listen: [listenAddr]
|
||||||
|
},
|
||||||
|
connectionManager: {
|
||||||
|
autoDial: false
|
||||||
|
},
|
||||||
|
peerDiscovery: {
|
||||||
|
autoDial: false
|
||||||
|
},
|
||||||
|
peerId
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
autoRelay1 = relayLibp2p1.transportManager._transports.get('Circuit')._autoRelay
|
||||||
|
|
||||||
|
expect(autoRelay1.maxListeners).to.eql(1)
|
||||||
|
})
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
// Start each node
|
||||||
|
return Promise.all([libp2p, relayLibp2p1, relayLibp2p2, relayLibp2p3].map(libp2p => libp2p.start()))
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
// Stop each node
|
||||||
|
return Promise.all([libp2p, relayLibp2p1, relayLibp2p2, relayLibp2p3].map(libp2p => libp2p.stop()))
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should ask if node supports hop on protocol change (relay protocol) and add to listen multiaddrs', async () => {
|
||||||
|
// Spy if a connected peer is being added as listen relay
|
||||||
|
sinon.spy(autoRelay1, '_addListenRelay')
|
||||||
|
|
||||||
|
// Discover relay
|
||||||
|
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs)
|
||||||
|
|
||||||
|
const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length
|
||||||
|
const originalMultiaddrs2Length = relayLibp2p2.multiaddrs.length
|
||||||
|
|
||||||
|
await relayLibp2p1.dial(relayLibp2p2.peerId)
|
||||||
|
|
||||||
|
// Wait for peer added as listen relay
|
||||||
|
await pWaitFor(() => autoRelay1._addListenRelay.callCount === 1)
|
||||||
|
expect(autoRelay1._listenRelays.size).to.equal(1)
|
||||||
|
|
||||||
|
// Wait for listen multiaddr update
|
||||||
|
await Promise.all([
|
||||||
|
pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1),
|
||||||
|
pWaitFor(() => relayLibp2p2.multiaddrs.length === originalMultiaddrs2Length + 1)
|
||||||
|
])
|
||||||
|
expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String())
|
||||||
|
|
||||||
|
// Peer has relay multicodec
|
||||||
|
const knownProtocols = relayLibp2p1.peerStore.protoBook.get(relayLibp2p2.peerId)
|
||||||
|
expect(knownProtocols).to.include(relayMulticodec)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should be able to dial a peer from its relayed address previously added', async () => {
|
||||||
|
const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length
|
||||||
|
const originalMultiaddrs2Length = relayLibp2p2.multiaddrs.length
|
||||||
|
|
||||||
|
// Discover relay
|
||||||
|
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs)
|
||||||
|
|
||||||
|
await relayLibp2p1.dial(relayLibp2p2.peerId)
|
||||||
|
|
||||||
|
// Wait for listen multiaddr update
|
||||||
|
await Promise.all([
|
||||||
|
pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1),
|
||||||
|
pWaitFor(() => relayLibp2p2.multiaddrs.length === originalMultiaddrs2Length + 1)
|
||||||
|
])
|
||||||
|
expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String())
|
||||||
|
|
||||||
|
// Dial from the other through a relay
|
||||||
|
const relayedMultiaddr2 = multiaddr(`${relayLibp2p1.multiaddrs[0]}/p2p/${relayLibp2p1.peerId.toB58String()}/p2p-circuit`)
|
||||||
|
libp2p.peerStore.addressBook.add(relayLibp2p2.peerId, [relayedMultiaddr2])
|
||||||
|
|
||||||
|
await libp2p.dial(relayLibp2p2.peerId)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should only add maxListeners relayed addresses', async () => {
|
||||||
|
const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length
|
||||||
|
const originalMultiaddrs2Length = relayLibp2p2.multiaddrs.length
|
||||||
|
|
||||||
|
// Spy if a connected peer is being added as listen relay
|
||||||
|
sinon.spy(autoRelay1, '_addListenRelay')
|
||||||
|
sinon.spy(autoRelay1._listenRelays, 'add')
|
||||||
|
|
||||||
|
// Discover one relay and connect
|
||||||
|
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs)
|
||||||
|
await relayLibp2p1.dial(relayLibp2p2.peerId)
|
||||||
|
|
||||||
|
expect(relayLibp2p1.connectionManager.size).to.eql(1)
|
||||||
|
|
||||||
|
// Wait for peer added as listen relay
|
||||||
|
await pWaitFor(() => autoRelay1._addListenRelay.callCount === 1 && autoRelay1._listenRelays.add.callCount === 1)
|
||||||
|
expect(autoRelay1._listenRelays.size).to.equal(1)
|
||||||
|
|
||||||
|
// Wait for listen multiaddr update
|
||||||
|
await Promise.all([
|
||||||
|
pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1),
|
||||||
|
pWaitFor(() => relayLibp2p2.multiaddrs.length === originalMultiaddrs2Length + 1)
|
||||||
|
])
|
||||||
|
expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String())
|
||||||
|
|
||||||
|
// Relay2 has relay multicodec
|
||||||
|
const knownProtocols2 = relayLibp2p1.peerStore.protoBook.get(relayLibp2p2.peerId)
|
||||||
|
expect(knownProtocols2).to.include(relayMulticodec)
|
||||||
|
|
||||||
|
// Discover an extra relay and connect
|
||||||
|
relayLibp2p1.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs)
|
||||||
|
await relayLibp2p1.dial(relayLibp2p3.peerId)
|
||||||
|
|
||||||
|
// Wait to guarantee the dialed peer is not added as a listen relay
|
||||||
|
await delay(300)
|
||||||
|
|
||||||
|
expect(autoRelay1._addListenRelay.callCount).to.equal(2)
|
||||||
|
expect(autoRelay1._listenRelays.add.callCount).to.equal(1)
|
||||||
|
expect(autoRelay1._listenRelays.size).to.equal(1)
|
||||||
|
expect(relayLibp2p1.connectionManager.size).to.eql(2)
|
||||||
|
|
||||||
|
// Relay2 has relay multicodec
|
||||||
|
const knownProtocols3 = relayLibp2p1.peerStore.protoBook.get(relayLibp2p3.peerId)
|
||||||
|
expect(knownProtocols3).to.include(relayMulticodec)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should not listen on a relayed address if peer disconnects', async () => {
|
||||||
|
const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length
|
||||||
|
|
||||||
|
// Spy if identify push is fired on adding/removing listen addr
|
||||||
|
sinon.spy(relayLibp2p1.identifyService, 'pushToPeerStore')
|
||||||
|
|
||||||
|
// Discover one relay and connect
|
||||||
|
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs)
|
||||||
|
await relayLibp2p1.dial(relayLibp2p2.peerId)
|
||||||
|
|
||||||
|
// Wait for listenning on the relay
|
||||||
|
await pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1)
|
||||||
|
expect(autoRelay1._listenRelays.size).to.equal(1)
|
||||||
|
expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String())
|
||||||
|
|
||||||
|
// Identify push for adding listen relay multiaddr
|
||||||
|
expect(relayLibp2p1.identifyService.pushToPeerStore.callCount).to.equal(1)
|
||||||
|
|
||||||
|
// Disconnect from peer used for relay
|
||||||
|
await relayLibp2p1.hangUp(relayLibp2p2.peerId)
|
||||||
|
|
||||||
|
// Wait for removed listening on the relay
|
||||||
|
await pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length)
|
||||||
|
expect(autoRelay1._listenRelays.size).to.equal(0)
|
||||||
|
|
||||||
|
// Identify push for removing listen relay multiaddr
|
||||||
|
expect(relayLibp2p1.identifyService.pushToPeerStore.callCount).to.equal(2)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should try to listen on other connected peers relayed address if one used relay disconnects', async () => {
|
||||||
|
const originalMultiaddrs1Length = relayLibp2p1.multiaddrs.length
|
||||||
|
|
||||||
|
// Spy if a connected peer is being added as listen relay
|
||||||
|
sinon.spy(autoRelay1, '_addListenRelay')
|
||||||
|
sinon.spy(relayLibp2p1.transportManager, 'listen')
|
||||||
|
|
||||||
|
// Discover one relay and connect
|
||||||
|
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs)
|
||||||
|
await relayLibp2p1.dial(relayLibp2p2.peerId)
|
||||||
|
|
||||||
|
// Discover an extra relay and connect
|
||||||
|
relayLibp2p1.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs)
|
||||||
|
await relayLibp2p1.dial(relayLibp2p3.peerId)
|
||||||
|
|
||||||
|
// Wait for both peer to be attempted to added as listen relay
|
||||||
|
await pWaitFor(() => autoRelay1._addListenRelay.callCount === 1)
|
||||||
|
expect(autoRelay1._listenRelays.size).to.equal(1)
|
||||||
|
expect(relayLibp2p1.connectionManager.size).to.equal(2)
|
||||||
|
|
||||||
|
// Wait for listen multiaddr update
|
||||||
|
await pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1)
|
||||||
|
expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p2.peerId.toB58String())
|
||||||
|
|
||||||
|
// Only one will be used for listeninng
|
||||||
|
expect(relayLibp2p1.transportManager.listen.callCount).to.equal(1)
|
||||||
|
|
||||||
|
// Spy if relay from listen map was removed
|
||||||
|
sinon.spy(autoRelay1._listenRelays, 'delete')
|
||||||
|
|
||||||
|
// Disconnect from peer used for relay
|
||||||
|
await relayLibp2p1.hangUp(relayLibp2p2.peerId)
|
||||||
|
expect(autoRelay1._listenRelays.delete.callCount).to.equal(1)
|
||||||
|
expect(autoRelay1._addListenRelay.callCount).to.equal(1)
|
||||||
|
|
||||||
|
// Wait for other peer connected to be added as listen addr
|
||||||
|
await pWaitFor(() => relayLibp2p1.transportManager.listen.callCount === 2)
|
||||||
|
expect(autoRelay1._listenRelays.size).to.equal(1)
|
||||||
|
expect(relayLibp2p1.connectionManager.size).to.eql(1)
|
||||||
|
|
||||||
|
// Wait for listen multiaddr update
|
||||||
|
await pWaitFor(() => relayLibp2p1.multiaddrs.length === originalMultiaddrs1Length + 1)
|
||||||
|
expect(relayLibp2p1.multiaddrs[originalMultiaddrs1Length].getPeerId()).to.eql(relayLibp2p3.peerId.toB58String())
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should try to listen on stored peers relayed address if one used relay disconnects and there are not enough connected', async () => {
|
||||||
|
// Spy if a connected peer is being added as listen relay
|
||||||
|
sinon.spy(autoRelay1, '_addListenRelay')
|
||||||
|
sinon.spy(relayLibp2p1.transportManager, 'listen')
|
||||||
|
|
||||||
|
// Discover one relay and connect
|
||||||
|
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, relayLibp2p2.multiaddrs)
|
||||||
|
await relayLibp2p1.dial(relayLibp2p2.peerId)
|
||||||
|
|
||||||
|
// Discover an extra relay and connect to gather its Hop support
|
||||||
|
relayLibp2p1.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs)
|
||||||
|
await relayLibp2p1.dial(relayLibp2p3.peerId)
|
||||||
|
|
||||||
|
// Wait for both peer to be attempted to added as listen relay
|
||||||
|
await pWaitFor(() => autoRelay1._addListenRelay.callCount === 2)
|
||||||
|
expect(autoRelay1._listenRelays.size).to.equal(1)
|
||||||
|
expect(relayLibp2p1.connectionManager.size).to.equal(2)
|
||||||
|
|
||||||
|
// Only one will be used for listeninng
|
||||||
|
expect(relayLibp2p1.transportManager.listen.callCount).to.equal(1)
|
||||||
|
|
||||||
|
// Disconnect not used listen relay
|
||||||
|
await relayLibp2p1.hangUp(relayLibp2p3.peerId)
|
||||||
|
|
||||||
|
expect(autoRelay1._listenRelays.size).to.equal(1)
|
||||||
|
expect(relayLibp2p1.connectionManager.size).to.equal(1)
|
||||||
|
|
||||||
|
// Spy on dial
|
||||||
|
sinon.spy(relayLibp2p1, 'dial')
|
||||||
|
|
||||||
|
// Remove peer used as relay from peerStore and disconnect it
|
||||||
|
relayLibp2p1.peerStore.delete(relayLibp2p2.peerId)
|
||||||
|
await relayLibp2p1.hangUp(relayLibp2p2.peerId)
|
||||||
|
expect(autoRelay1._listenRelays.size).to.equal(0)
|
||||||
|
expect(relayLibp2p1.connectionManager.size).to.equal(0)
|
||||||
|
|
||||||
|
// Wait for other peer connected to be added as listen addr
|
||||||
|
await pWaitFor(() => relayLibp2p1.transportManager.listen.callCount === 2)
|
||||||
|
expect(autoRelay1._listenRelays.size).to.equal(1)
|
||||||
|
expect(relayLibp2p1.connectionManager.size).to.eql(1)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('flows with 2 max listeners', () => {
|
||||||
|
let relayLibp2p1
|
||||||
|
let relayLibp2p2
|
||||||
|
let relayLibp2p3
|
||||||
|
let autoRelay1
|
||||||
|
let autoRelay2
|
||||||
|
|
||||||
|
beforeEach(async () => {
|
||||||
|
const peerIds = await createPeerId({ number: 3 })
|
||||||
|
// Create 3 nodes, and turn HOP on for the relay
|
||||||
|
;[relayLibp2p1, relayLibp2p2, relayLibp2p3] = peerIds.map((peerId) => {
|
||||||
|
return new Libp2p({
|
||||||
|
...baseOptions,
|
||||||
|
config: {
|
||||||
|
...baseOptions.config,
|
||||||
|
relay: {
|
||||||
|
...baseOptions.config.relay,
|
||||||
|
hop: {
|
||||||
|
enabled: true
|
||||||
|
},
|
||||||
|
autoRelay: {
|
||||||
|
enabled: true,
|
||||||
|
maxListeners: 2
|
||||||
|
}
|
||||||
|
}
|
||||||
|
},
|
||||||
|
addresses: {
|
||||||
|
listen: [listenAddr]
|
||||||
|
},
|
||||||
|
connectionManager: {
|
||||||
|
autoDial: false
|
||||||
|
},
|
||||||
|
peerDiscovery: {
|
||||||
|
autoDial: false
|
||||||
|
},
|
||||||
|
peerId
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
autoRelay1 = relayLibp2p1.transportManager._transports.get('Circuit')._autoRelay
|
||||||
|
autoRelay2 = relayLibp2p2.transportManager._transports.get('Circuit')._autoRelay
|
||||||
|
})
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
// Start each node
|
||||||
|
return Promise.all([relayLibp2p1, relayLibp2p2, relayLibp2p3].map(libp2p => libp2p.start()))
|
||||||
|
})
|
||||||
|
|
||||||
|
afterEach(() => {
|
||||||
|
// Stop each node
|
||||||
|
return Promise.all([relayLibp2p1, relayLibp2p2, relayLibp2p3].map(libp2p => libp2p.stop()))
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should not add listener to a already relayed connection', async () => {
|
||||||
|
// Spy if a connected peer is being added as listen relay
|
||||||
|
sinon.spy(autoRelay1, '_addListenRelay')
|
||||||
|
sinon.spy(autoRelay2, '_addListenRelay')
|
||||||
|
|
||||||
|
// Relay 1 discovers Relay 3 and connect
|
||||||
|
relayLibp2p1.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs)
|
||||||
|
await relayLibp2p1.dial(relayLibp2p3.peerId)
|
||||||
|
|
||||||
|
// Wait for peer added as listen relay
|
||||||
|
await pWaitFor(() => autoRelay1._addListenRelay.callCount === 1)
|
||||||
|
expect(autoRelay1._listenRelays.size).to.equal(1)
|
||||||
|
|
||||||
|
// Relay 2 discovers Relay 3 and connect
|
||||||
|
relayLibp2p2.peerStore.addressBook.add(relayLibp2p3.peerId, relayLibp2p3.multiaddrs)
|
||||||
|
await relayLibp2p2.dial(relayLibp2p3.peerId)
|
||||||
|
|
||||||
|
// Wait for peer added as listen relay
|
||||||
|
await pWaitFor(() => autoRelay2._addListenRelay.callCount === 1)
|
||||||
|
expect(autoRelay2._listenRelays.size).to.equal(1)
|
||||||
|
|
||||||
|
// Relay 1 discovers Relay 2 relayed multiaddr via Relay 3
|
||||||
|
const ma2RelayedBy3 = relayLibp2p2.multiaddrs[relayLibp2p2.multiaddrs.length - 1]
|
||||||
|
relayLibp2p1.peerStore.addressBook.add(relayLibp2p2.peerId, [ma2RelayedBy3])
|
||||||
|
await relayLibp2p1.dial(relayLibp2p2.peerId)
|
||||||
|
|
||||||
|
// Peer not added as listen relay
|
||||||
|
expect(autoRelay1._addListenRelay.callCount).to.equal(1)
|
||||||
|
expect(autoRelay1._listenRelays.size).to.equal(1)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
})
|
@ -72,7 +72,7 @@ describe('Dialing (via relay, TCP)', () => {
|
|||||||
const tcpAddrs = dstLibp2p.transportManager.getAddrs()
|
const tcpAddrs = dstLibp2p.transportManager.getAddrs()
|
||||||
sinon.stub(dstLibp2p.addressManager, 'listen').value([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
|
sinon.stub(dstLibp2p.addressManager, 'listen').value([multiaddr(`/p2p-circuit${relayAddr}/p2p/${relayIdString}`)])
|
||||||
|
|
||||||
await dstLibp2p.transportManager.listen()
|
await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs())
|
||||||
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
|
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
|
||||||
|
|
||||||
const connection = await srcLibp2p.dial(dialAddr)
|
const connection = await srcLibp2p.dial(dialAddr)
|
||||||
@ -157,7 +157,7 @@ describe('Dialing (via relay, TCP)', () => {
|
|||||||
const tcpAddrs = dstLibp2p.transportManager.getAddrs()
|
const tcpAddrs = dstLibp2p.transportManager.getAddrs()
|
||||||
sinon.stub(dstLibp2p.addressManager, 'getListenAddrs').returns([multiaddr(`${relayAddr}/p2p-circuit`)])
|
sinon.stub(dstLibp2p.addressManager, 'getListenAddrs').returns([multiaddr(`${relayAddr}/p2p-circuit`)])
|
||||||
|
|
||||||
await dstLibp2p.transportManager.listen()
|
await dstLibp2p.transportManager.listen(dstLibp2p.addressManager.getListenAddrs())
|
||||||
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
|
expect(dstLibp2p.transportManager.getAddrs()).to.have.deep.members([...tcpAddrs, dialAddr.decapsulate('p2p')])
|
||||||
|
|
||||||
// Tamper with the our multiaddrs for the circuit message
|
// Tamper with the our multiaddrs for the circuit message
|
@ -7,9 +7,13 @@ const { expect } = chai
|
|||||||
|
|
||||||
const AddressManager = require('../../src/address-manager')
|
const AddressManager = require('../../src/address-manager')
|
||||||
const TransportManager = require('../../src/transport-manager')
|
const TransportManager = require('../../src/transport-manager')
|
||||||
|
const PeerStore = require('../../src/peer-store')
|
||||||
|
const PeerRecord = require('../../src/record/peer-record')
|
||||||
const Transport = require('libp2p-tcp')
|
const Transport = require('libp2p-tcp')
|
||||||
|
const PeerId = require('peer-id')
|
||||||
const multiaddr = require('multiaddr')
|
const multiaddr = require('multiaddr')
|
||||||
const mockUpgrader = require('../utils/mockUpgrader')
|
const mockUpgrader = require('../utils/mockUpgrader')
|
||||||
|
const Peers = require('../fixtures/peers')
|
||||||
const addrs = [
|
const addrs = [
|
||||||
multiaddr('/ip4/127.0.0.1/tcp/0'),
|
multiaddr('/ip4/127.0.0.1/tcp/0'),
|
||||||
multiaddr('/ip4/127.0.0.1/tcp/0')
|
multiaddr('/ip4/127.0.0.1/tcp/0')
|
||||||
@ -17,11 +21,19 @@ const addrs = [
|
|||||||
|
|
||||||
describe('Transport Manager (TCP)', () => {
|
describe('Transport Manager (TCP)', () => {
|
||||||
let tm
|
let tm
|
||||||
|
let localPeer
|
||||||
|
|
||||||
before(() => {
|
before(async () => {
|
||||||
|
localPeer = await PeerId.createFromJSON(Peers[0])
|
||||||
|
})
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
tm = new TransportManager({
|
tm = new TransportManager({
|
||||||
libp2p: {
|
libp2p: {
|
||||||
addressManager: new AddressManager({ listen: addrs })
|
peerId: localPeer,
|
||||||
|
multiaddrs: addrs,
|
||||||
|
addressManager: new AddressManager({ listen: addrs }),
|
||||||
|
peerStore: new PeerStore({ peerId: localPeer })
|
||||||
},
|
},
|
||||||
upgrader: mockUpgrader,
|
upgrader: mockUpgrader,
|
||||||
onConnection: () => {}
|
onConnection: () => {}
|
||||||
@ -41,18 +53,38 @@ describe('Transport Manager (TCP)', () => {
|
|||||||
|
|
||||||
it('should be able to listen', async () => {
|
it('should be able to listen', async () => {
|
||||||
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
|
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||||
await tm.listen()
|
await tm.listen(addrs)
|
||||||
expect(tm._listeners).to.have.key(Transport.prototype[Symbol.toStringTag])
|
expect(tm._listeners).to.have.key(Transport.prototype[Symbol.toStringTag])
|
||||||
expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(addrs.length)
|
expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(addrs.length)
|
||||||
|
|
||||||
// Ephemeral ip addresses may result in multiple listeners
|
// Ephemeral ip addresses may result in multiple listeners
|
||||||
expect(tm.getAddrs().length).to.equal(addrs.length)
|
expect(tm.getAddrs().length).to.equal(addrs.length)
|
||||||
await tm.close()
|
await tm.close()
|
||||||
expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(0)
|
expect(tm._listeners.get(Transport.prototype[Symbol.toStringTag])).to.have.length(0)
|
||||||
})
|
})
|
||||||
|
|
||||||
|
it('should create self signed peer record on listen', async () => {
|
||||||
|
let signedPeerRecord = await tm.libp2p.peerStore.addressBook.getPeerRecord(localPeer)
|
||||||
|
expect(signedPeerRecord).to.not.exist()
|
||||||
|
|
||||||
|
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||||
|
await tm.listen(addrs)
|
||||||
|
|
||||||
|
// Should created Self Peer record on new listen address
|
||||||
|
signedPeerRecord = await tm.libp2p.peerStore.addressBook.getPeerRecord(localPeer)
|
||||||
|
expect(signedPeerRecord).to.exist()
|
||||||
|
|
||||||
|
const record = PeerRecord.createFromProtobuf(signedPeerRecord.payload)
|
||||||
|
expect(record).to.exist()
|
||||||
|
expect(record.multiaddrs.length).to.equal(addrs.length)
|
||||||
|
addrs.forEach((a, i) => {
|
||||||
|
expect(record.multiaddrs[i].equals(a)).to.be.true()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
it('should be able to dial', async () => {
|
it('should be able to dial', async () => {
|
||||||
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
|
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||||
await tm.listen()
|
await tm.listen(addrs)
|
||||||
const addr = tm.getAddrs().shift()
|
const addr = tm.getAddrs().shift()
|
||||||
const connection = await tm.dial(addr)
|
const connection = await tm.dial(addr)
|
||||||
expect(connection).to.exist()
|
expect(connection).to.exist()
|
||||||
|
@ -87,7 +87,7 @@ describe('Transport Manager (WebSockets)', () => {
|
|||||||
it('should fail to listen with no valid address', async () => {
|
it('should fail to listen with no valid address', async () => {
|
||||||
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
|
tm.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||||
|
|
||||||
await expect(tm.listen())
|
await expect(tm.listen([listenAddr]))
|
||||||
.to.eventually.be.rejected()
|
.to.eventually.be.rejected()
|
||||||
.and.to.have.property('code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
|
.and.to.have.property('code', ErrorCodes.ERR_NO_VALID_ADDRESSES)
|
||||||
})
|
})
|
||||||
|
Reference in New Issue
Block a user