mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-25 23:11:35 +00:00
refactor: async identify and identify push (#473)
* chore: add missing dep * feat: import from identify push branch https://github.com/libp2p/js-libp2p-identify/tree/feat/identify-push * feat: add the connection to stream handlers * refactor: identify to async/await * chore: fix lint * test: add identify tests * refactor: add identify to the dialer flow * feat: connect identify to the registrar * fix: resolve review feedback * fix: perform identify push when our protocols change
This commit is contained in:
@ -1,5 +1,6 @@
|
||||
'use strict'
|
||||
|
||||
const nextTick = require('async/nextTick')
|
||||
const multiaddr = require('multiaddr')
|
||||
const errCode = require('err-code')
|
||||
const { default: PQueue } = require('p-queue')
|
||||
@ -31,6 +32,22 @@ class Dialer {
|
||||
this.concurrency = concurrency
|
||||
this.timeout = timeout
|
||||
this.queue = new PQueue({ concurrency, timeout, throwOnTimeout: true })
|
||||
|
||||
/**
|
||||
* @property {IdentifyService}
|
||||
*/
|
||||
this._identifyService = null
|
||||
}
|
||||
|
||||
set identifyService (service) {
|
||||
this._identifyService = service
|
||||
}
|
||||
|
||||
/**
|
||||
* @type {IdentifyService}
|
||||
*/
|
||||
get identifyService () {
|
||||
return this._identifyService
|
||||
}
|
||||
|
||||
/**
|
||||
@ -64,6 +81,18 @@ class Dialer {
|
||||
throw err
|
||||
}
|
||||
|
||||
// Perform a delayed Identify handshake
|
||||
if (this.identifyService) {
|
||||
nextTick(async () => {
|
||||
try {
|
||||
await this.identifyService.identify(conn, conn.remotePeer)
|
||||
// TODO: Update the PeerStore with the information from identify
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
return conn
|
||||
}
|
||||
|
||||
|
@ -8,6 +8,7 @@ exports.messages = {
|
||||
exports.codes = {
|
||||
DHT_DISABLED: 'ERR_DHT_DISABLED',
|
||||
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED',
|
||||
ERR_CONNECTION_ENDED: 'ERR_CONNECTION_ENDED',
|
||||
ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED',
|
||||
ERR_NODE_NOT_STARTED: 'ERR_NODE_NOT_STARTED',
|
||||
ERR_NO_VALID_ADDRESSES: 'ERR_NO_VALID_ADDRESSES',
|
||||
@ -15,6 +16,8 @@ exports.codes = {
|
||||
ERR_DUPLICATE_TRANSPORT: 'ERR_DUPLICATE_TRANSPORT',
|
||||
ERR_ENCRYPTION_FAILED: 'ERR_ENCRYPTION_FAILED',
|
||||
ERR_INVALID_KEY: 'ERR_INVALID_KEY',
|
||||
ERR_INVALID_MESSAGE: 'ERR_INVALID_MESSAGE',
|
||||
ERR_INVALID_PEER: 'ERR_INVALID_PEER',
|
||||
ERR_MUXER_UNAVAILABLE: 'ERR_MUXER_UNAVAILABLE',
|
||||
ERR_TIMEOUT: 'ERR_TIMEOUT',
|
||||
ERR_TRANSPORT_UNAVAILABLE: 'ERR_TRANSPORT_UNAVAILABLE',
|
||||
|
@ -6,32 +6,8 @@
|
||||
|
||||
## Description
|
||||
|
||||
Identify is a STUN protocol, used by libp2p-swarm in order to broadcast and learn about the `ip:port` pairs a specific peer is available through and to know when a new stream muxer is established, so a conn can be reused.
|
||||
Identify is a STUN protocol, used by libp2p in order to broadcast and learn about the `ip:port` pairs a specific peer is available through and to know when a new stream muxer is established, so a conn can be reused.
|
||||
|
||||
## How does it work
|
||||
|
||||
Best way to understand the current design is through this issue: https://github.com/libp2p/js-libp2p-swarm/issues/78
|
||||
|
||||
### This module uses `pull-streams`
|
||||
|
||||
We expose a streaming interface based on `pull-streams`, rather then on the Node.js core streams implementation (aka Node.js streams). `pull-streams` offers us a better mechanism for error handling and flow control guarantees. If you would like to know more about why we did this, see the discussion at this [issue](https://github.com/ipfs/js-ipfs/issues/362).
|
||||
|
||||
You can learn more about pull-streams at:
|
||||
|
||||
- [The history of Node.js streams, nodebp April 2014](https://www.youtube.com/watch?v=g5ewQEuXjsQ)
|
||||
- [The history of streams, 2016](http://dominictarr.com/post/145135293917/history-of-streams)
|
||||
- [pull-streams, the simple streaming primitive](http://dominictarr.com/post/149248845122/pull-streams-pull-streams-are-a-very-simple)
|
||||
- [pull-streams documentation](https://pull-stream.github.io/)
|
||||
|
||||
#### Converting `pull-streams` to Node.js Streams
|
||||
|
||||
If you are a Node.js streams user, you can convert a pull-stream to a Node.js stream using the module [`pull-stream-to-stream`](https://github.com/pull-stream/pull-stream-to-stream), giving you an instance of a Node.js stream that is linked to the pull-stream. For example:
|
||||
|
||||
```js
|
||||
const pullToStream = require('pull-stream-to-stream')
|
||||
|
||||
const nodeStreamInstance = pullToStream(pullStreamInstance)
|
||||
// nodeStreamInstance is an instance of a Node.js Stream
|
||||
```
|
||||
|
||||
To learn more about this utility, visit https://pull-stream.github.io/#pull-stream-to-stream.
|
||||
The spec for Identify and Identify Push is at [libp2p/specs](https://github.com/libp2p/specs/tree/master/identify).
|
||||
|
6
src/identify/consts.js
Normal file
6
src/identify/consts.js
Normal file
@ -0,0 +1,6 @@
|
||||
'use strict'
|
||||
|
||||
module.exports.PROTOCOL_VERSION = 'ipfs/0.1.0'
|
||||
module.exports.AGENT_VERSION = 'js-libp2p/0.1.0'
|
||||
module.exports.MULTICODEC_IDENTIFY = '/ipfs/id/1.0.0'
|
||||
module.exports.MULTICODEC_IDENTIFY_PUSH = '/ipfs/id/push/1.0.0'
|
@ -1,87 +0,0 @@
|
||||
'use strict'
|
||||
const PeerInfo = require('peer-info')
|
||||
const PeerId = require('peer-id')
|
||||
const multiaddr = require('multiaddr')
|
||||
const pull = require('pull-stream/pull')
|
||||
const take = require('pull-stream/throughs/take')
|
||||
const collect = require('pull-stream/sinks/collect')
|
||||
const lp = require('pull-length-prefixed')
|
||||
|
||||
const msg = require('./message')
|
||||
|
||||
module.exports = (conn, expectedPeerInfo, callback) => {
|
||||
if (typeof expectedPeerInfo === 'function') {
|
||||
callback = expectedPeerInfo
|
||||
expectedPeerInfo = null
|
||||
// eslint-disable-next-line no-console
|
||||
console.warn('WARNING: no expected peer info was given, identify will not be able to verify peer integrity')
|
||||
}
|
||||
|
||||
pull(
|
||||
conn,
|
||||
lp.decode(),
|
||||
take(1),
|
||||
collect((err, data) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
// connection got closed graciously
|
||||
if (data.length === 0) {
|
||||
return callback(new Error('conn was closed, did not receive data'))
|
||||
}
|
||||
|
||||
const input = msg.decode(data[0])
|
||||
|
||||
PeerId.createFromPubKey(input.publicKey, (err, id) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
const peerInfo = new PeerInfo(id)
|
||||
if (expectedPeerInfo && expectedPeerInfo.id.toB58String() !== id.toB58String()) {
|
||||
return callback(new Error('invalid peer'))
|
||||
}
|
||||
|
||||
try {
|
||||
input.listenAddrs
|
||||
.map(multiaddr)
|
||||
.forEach((ma) => peerInfo.multiaddrs.add(ma))
|
||||
} catch (err) {
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
let observedAddr
|
||||
|
||||
try {
|
||||
observedAddr = getObservedAddrs(input)
|
||||
} catch (err) {
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
// Copy the protocols
|
||||
peerInfo.protocols = new Set(input.protocols)
|
||||
|
||||
callback(null, peerInfo, observedAddr)
|
||||
})
|
||||
})
|
||||
)
|
||||
}
|
||||
|
||||
function getObservedAddrs (input) {
|
||||
if (!hasObservedAddr(input)) {
|
||||
return []
|
||||
}
|
||||
|
||||
let addrs = input.observedAddr
|
||||
|
||||
if (!Array.isArray(addrs)) {
|
||||
addrs = [addrs]
|
||||
}
|
||||
|
||||
return addrs.map((oa) => multiaddr(oa))
|
||||
}
|
||||
|
||||
function hasObservedAddr (input) {
|
||||
return input.observedAddr && input.observedAddr.length > 0
|
||||
}
|
@ -1,7 +1,299 @@
|
||||
'use strict'
|
||||
|
||||
exports = module.exports
|
||||
exports.multicodec = '/ipfs/id/1.0.0'
|
||||
exports.listener = require('./listener')
|
||||
exports.dialer = require('./dialer')
|
||||
exports.message = require('./message')
|
||||
const debug = require('debug')
|
||||
const pb = require('it-protocol-buffers')
|
||||
const lp = require('it-length-prefixed')
|
||||
const pipe = require('it-pipe')
|
||||
const { collect, take } = require('streaming-iterables')
|
||||
|
||||
const PeerInfo = require('peer-info')
|
||||
const PeerId = require('peer-id')
|
||||
const multiaddr = require('multiaddr')
|
||||
const { toBuffer } = require('../util')
|
||||
|
||||
const Message = require('./message')
|
||||
|
||||
const log = debug('libp2p:identify')
|
||||
log.error = debug('libp2p:identify:error')
|
||||
|
||||
const {
|
||||
MULTICODEC_IDENTIFY,
|
||||
MULTICODEC_IDENTIFY_PUSH,
|
||||
AGENT_VERSION,
|
||||
PROTOCOL_VERSION
|
||||
} = require('./consts')
|
||||
|
||||
const errCode = require('err-code')
|
||||
const { codes } = require('../errors')
|
||||
|
||||
class IdentifyService {
|
||||
/**
|
||||
* Replaces the multiaddrs on the given `peerInfo`,
|
||||
* with the provided `multiaddrs`
|
||||
* @param {PeerInfo} peerInfo
|
||||
* @param {Array<Multiaddr>|Array<Buffer>} multiaddrs
|
||||
*/
|
||||
static updatePeerAddresses (peerInfo, multiaddrs) {
|
||||
if (multiaddrs && multiaddrs.length > 0) {
|
||||
peerInfo.multiaddrs.clear()
|
||||
multiaddrs.forEach(ma => {
|
||||
try {
|
||||
peerInfo.multiaddrs.add(ma)
|
||||
} catch (err) {
|
||||
log.error('could not add multiaddr', err)
|
||||
}
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Replaces the protocols on the given `peerInfo`,
|
||||
* with the provided `protocols`
|
||||
* @static
|
||||
* @param {PeerInfo} peerInfo
|
||||
* @param {Array<string>} protocols
|
||||
*/
|
||||
static updatePeerProtocols (peerInfo, protocols) {
|
||||
if (protocols && protocols.length > 0) {
|
||||
peerInfo.protocols.clear()
|
||||
protocols.forEach(proto => peerInfo.protocols.add(proto))
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Takes the `addr` and converts it to a Multiaddr if possible
|
||||
* @param {Buffer|String} addr
|
||||
* @returns {Multiaddr|null}
|
||||
*/
|
||||
static getCleanMultiaddr (addr) {
|
||||
if (addr && addr.length > 0) {
|
||||
try {
|
||||
return multiaddr(addr)
|
||||
} catch (_) {
|
||||
return null
|
||||
}
|
||||
}
|
||||
return null
|
||||
}
|
||||
|
||||
/**
|
||||
* @constructor
|
||||
* @param {object} options
|
||||
* @param {Registrar} options.registrar
|
||||
* @param {Map<string, handler>} options.protocols A reference to the protocols we support
|
||||
* @param {PeerInfo} options.peerInfo The peer running the identify service
|
||||
*/
|
||||
constructor (options) {
|
||||
/**
|
||||
* @property {Registrar}
|
||||
*/
|
||||
this.registrar = options.registrar
|
||||
/**
|
||||
* @property {PeerInfo}
|
||||
*/
|
||||
this.peerInfo = options.peerInfo
|
||||
|
||||
this._protocols = options.protocols
|
||||
|
||||
this.handleMessage = this.handleMessage.bind(this)
|
||||
}
|
||||
|
||||
/**
|
||||
* Send an Identify Push update to the list of connections
|
||||
* @param {Array<Connection>} connections
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
push (connections) {
|
||||
const pushes = connections.map(async connection => {
|
||||
try {
|
||||
const { stream } = await connection.newStream(MULTICODEC_IDENTIFY_PUSH)
|
||||
|
||||
await pipe(
|
||||
[{
|
||||
listenAddrs: this.peerInfo.multiaddrs.toArray().map((ma) => ma.buffer),
|
||||
protocols: Array.from(this._protocols.keys())
|
||||
}],
|
||||
pb.encode(Message),
|
||||
stream
|
||||
)
|
||||
} catch (err) {
|
||||
// Just log errors
|
||||
log.error('could not push identify update to peer', err)
|
||||
}
|
||||
})
|
||||
|
||||
return Promise.all(pushes)
|
||||
}
|
||||
|
||||
/**
|
||||
* Calls `push` for all peers in the `peerStore` that are connected
|
||||
* @param {PeerStore} peerStore
|
||||
*/
|
||||
pushToPeerStore (peerStore) {
|
||||
const connections = []
|
||||
let connection
|
||||
for (const peer of peerStore.peers.values()) {
|
||||
if (peer.protocols.has(MULTICODEC_IDENTIFY_PUSH) && (connection = this.registrar.getConnection(peer))) {
|
||||
connections.push(connection)
|
||||
}
|
||||
}
|
||||
|
||||
this.push(connections)
|
||||
}
|
||||
|
||||
/**
|
||||
* Requests the `Identify` message from peer associated with the given `connection`.
|
||||
* If the identified peer does not match the `PeerId` associated with the connection,
|
||||
* an error will be thrown.
|
||||
*
|
||||
* @async
|
||||
* @param {Connection} connection
|
||||
* @param {PeerID} expectedPeer The PeerId the identify response should match
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async identify (connection, expectedPeer) {
|
||||
const { stream } = await connection.newStream(MULTICODEC_IDENTIFY)
|
||||
const [data] = await pipe(
|
||||
stream,
|
||||
lp.decode(),
|
||||
take(1),
|
||||
toBuffer,
|
||||
collect
|
||||
)
|
||||
|
||||
if (!data) {
|
||||
throw errCode(new Error('No data could be retrieved'), codes.ERR_CONNECTION_ENDED)
|
||||
}
|
||||
|
||||
let message
|
||||
try {
|
||||
message = Message.decode(data)
|
||||
} catch (err) {
|
||||
throw errCode(err, codes.ERR_INVALID_MESSAGE)
|
||||
}
|
||||
|
||||
let {
|
||||
publicKey,
|
||||
listenAddrs,
|
||||
protocols,
|
||||
observedAddr
|
||||
} = message
|
||||
|
||||
const id = await PeerId.createFromPubKey(publicKey)
|
||||
const peerInfo = new PeerInfo(id)
|
||||
if (expectedPeer && expectedPeer.toB58String() !== id.toB58String()) {
|
||||
throw errCode(new Error('identified peer does not match the expected peer'), codes.ERR_INVALID_PEER)
|
||||
}
|
||||
|
||||
// Get the observedAddr if there is one
|
||||
observedAddr = IdentifyService.getCleanMultiaddr(observedAddr)
|
||||
|
||||
// Copy the listenAddrs and protocols
|
||||
IdentifyService.updatePeerAddresses(peerInfo, listenAddrs)
|
||||
IdentifyService.updatePeerProtocols(peerInfo, protocols)
|
||||
|
||||
this.registrar.peerStore.update(peerInfo)
|
||||
// TODO: Track our observed address so that we can score it
|
||||
log('received observed address of %s', observedAddr)
|
||||
}
|
||||
|
||||
/**
|
||||
* A handler to register with Libp2p to process identify messages.
|
||||
*
|
||||
* @param {object} options
|
||||
* @param {String} options.protocol
|
||||
* @param {*} options.stream
|
||||
* @param {Connection} options.connection
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
handleMessage ({ connection, stream, protocol }) {
|
||||
switch (protocol) {
|
||||
case MULTICODEC_IDENTIFY:
|
||||
return this._handleIdentify({ connection, stream })
|
||||
case MULTICODEC_IDENTIFY_PUSH:
|
||||
return this._handlePush({ connection, stream })
|
||||
default:
|
||||
log.error('cannot handle unknown protocol %s', protocol)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the `Identify` response to the requesting peer over the
|
||||
* given `connection`
|
||||
* @private
|
||||
* @param {object} options
|
||||
* @param {*} options.stream
|
||||
* @param {Connection} options.connection
|
||||
*/
|
||||
_handleIdentify ({ connection, stream }) {
|
||||
let publicKey = Buffer.alloc(0)
|
||||
if (this.peerInfo.id.pubKey) {
|
||||
publicKey = this.peerInfo.id.pubKey.bytes
|
||||
}
|
||||
|
||||
const message = Message.encode({
|
||||
protocolVersion: PROTOCOL_VERSION,
|
||||
agentVersion: AGENT_VERSION,
|
||||
publicKey,
|
||||
listenAddrs: this.peerInfo.multiaddrs.toArray().map((ma) => ma.buffer),
|
||||
observedAddr: connection.remoteAddr.buffer,
|
||||
protocols: Array.from(this._protocols.keys())
|
||||
})
|
||||
|
||||
pipe(
|
||||
[message],
|
||||
lp.encode(),
|
||||
stream
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the Identify Push message from the given `connection`
|
||||
* @private
|
||||
* @param {object} options
|
||||
* @param {*} options.stream
|
||||
* @param {Connection} options.connection
|
||||
*/
|
||||
async _handlePush ({ connection, stream }) {
|
||||
const [data] = await pipe(
|
||||
stream,
|
||||
lp.decode(),
|
||||
take(1),
|
||||
toBuffer,
|
||||
collect
|
||||
)
|
||||
|
||||
let message
|
||||
try {
|
||||
message = Message.decode(data)
|
||||
} catch (err) {
|
||||
return log.error('received invalid message', err)
|
||||
}
|
||||
|
||||
// Update the listen addresses
|
||||
const peerInfo = new PeerInfo(connection.remotePeer)
|
||||
|
||||
try {
|
||||
IdentifyService.updatePeerAddresses(peerInfo, message.listenAddrs)
|
||||
} catch (err) {
|
||||
return log.error('received invalid listen addrs', err)
|
||||
}
|
||||
|
||||
// Update the protocols
|
||||
IdentifyService.updatePeerProtocols(peerInfo, message.protocols)
|
||||
|
||||
// Update the peer in the PeerStore
|
||||
this.registrar.peerStore.update(peerInfo)
|
||||
}
|
||||
}
|
||||
|
||||
module.exports.IdentifyService = IdentifyService
|
||||
/**
|
||||
* The protocols the IdentifyService supports
|
||||
* @property multicodecs
|
||||
*/
|
||||
module.exports.multicodecs = {
|
||||
IDENTIFY: MULTICODEC_IDENTIFY,
|
||||
IDENTIFY_PUSH: MULTICODEC_IDENTIFY_PUSH
|
||||
}
|
||||
module.exports.Message = Message
|
||||
|
@ -1,35 +0,0 @@
|
||||
'use strict'
|
||||
|
||||
const pull = require('pull-stream/pull')
|
||||
const values = require('pull-stream/sources/values')
|
||||
const lp = require('pull-length-prefixed')
|
||||
|
||||
const msg = require('./message')
|
||||
|
||||
module.exports = (conn, pInfoSelf) => {
|
||||
// send what I see from the other + my Info
|
||||
conn.getObservedAddrs((err, observedAddrs) => {
|
||||
if (err) { return }
|
||||
observedAddrs = observedAddrs[0]
|
||||
|
||||
let publicKey = Buffer.alloc(0)
|
||||
if (pInfoSelf.id.pubKey) {
|
||||
publicKey = pInfoSelf.id.pubKey.bytes
|
||||
}
|
||||
|
||||
const msgSend = msg.encode({
|
||||
protocolVersion: 'ipfs/0.1.0',
|
||||
agentVersion: 'na',
|
||||
publicKey: publicKey,
|
||||
listenAddrs: pInfoSelf.multiaddrs.toArray().map((ma) => ma.buffer),
|
||||
observedAddr: observedAddrs ? observedAddrs.buffer : Buffer.from(''),
|
||||
protocols: Array.from(pInfoSelf.protocols)
|
||||
})
|
||||
|
||||
pull(
|
||||
values([msgSend]),
|
||||
lp.encode(),
|
||||
conn
|
||||
)
|
||||
})
|
||||
}
|
35
src/index.js
35
src/index.js
@ -30,6 +30,10 @@ const TransportManager = require('./transport-manager')
|
||||
const Upgrader = require('./upgrader')
|
||||
const PeerStore = require('./peer-store')
|
||||
const Registrar = require('./registrar')
|
||||
const {
|
||||
IdentifyService,
|
||||
multicodecs: IDENTIFY_PROTOCOLS
|
||||
} = require('./identify')
|
||||
|
||||
const notStarted = (action, state) => {
|
||||
return errCode(
|
||||
@ -83,6 +87,11 @@ class Libp2p extends EventEmitter {
|
||||
}
|
||||
})
|
||||
|
||||
// Create the Registrar
|
||||
this.registrar = new Registrar({ peerStore: this.peerStore })
|
||||
this.handle = this.handle.bind(this)
|
||||
this.registrar.handle = this.handle
|
||||
|
||||
// Setup the transport manager
|
||||
this.transportManager = new TransportManager({
|
||||
libp2p: this,
|
||||
@ -100,22 +109,26 @@ class Libp2p extends EventEmitter {
|
||||
})
|
||||
}
|
||||
|
||||
this.dialer = new Dialer({
|
||||
transportManager: this.transportManager
|
||||
})
|
||||
|
||||
// Attach stream multiplexers
|
||||
if (this._modules.streamMuxer) {
|
||||
const muxers = this._modules.streamMuxer
|
||||
muxers.forEach((muxer) => {
|
||||
this.upgrader.muxers.set(muxer.multicodec, muxer)
|
||||
})
|
||||
|
||||
// Add the identify service since we can multiplex
|
||||
this.dialer.identifyService = new IdentifyService({
|
||||
registrar: this.registrar,
|
||||
peerInfo: this.peerInfo,
|
||||
protocols: this.upgrader.protocols
|
||||
})
|
||||
this.handle(Object.values(IDENTIFY_PROTOCOLS), this.dialer.identifyService.handleMessage)
|
||||
}
|
||||
|
||||
this.dialer = new Dialer({
|
||||
transportManager: this.transportManager
|
||||
})
|
||||
|
||||
this.registrar = new Registrar({ peerStore: this.peerStore })
|
||||
this.handle = this.handle.bind(this)
|
||||
this.registrar.handle = this.handle
|
||||
|
||||
// Attach private network protector
|
||||
if (this._modules.connProtector) {
|
||||
this.upgrader.protector = this._modules.connProtector
|
||||
@ -338,13 +351,15 @@ class Libp2p extends EventEmitter {
|
||||
/**
|
||||
* Registers the `handler` for each protocol
|
||||
* @param {string[]|string} protocols
|
||||
* @param {function({ stream:*, protocol:string })} handler
|
||||
* @param {function({ connection:*, stream:*, protocol:string })} handler
|
||||
*/
|
||||
handle (protocols, handler) {
|
||||
protocols = Array.isArray(protocols) ? protocols : [protocols]
|
||||
protocols.forEach(protocol => {
|
||||
this.upgrader.protocols.set(protocol, handler)
|
||||
})
|
||||
|
||||
this.dialer.identifyService.pushToPeerStore(this.peerStore)
|
||||
}
|
||||
|
||||
/**
|
||||
@ -357,6 +372,8 @@ class Libp2p extends EventEmitter {
|
||||
protocols.forEach(protocol => {
|
||||
this.upgrader.protocols.delete(protocol)
|
||||
})
|
||||
|
||||
this.dialer.identifyService.pushToPeerStore(this.peerStore)
|
||||
}
|
||||
|
||||
async _onStarting () {
|
||||
|
@ -182,10 +182,14 @@ class Upgrader {
|
||||
// Run anytime a remote stream is created
|
||||
onStream: async muxedStream => {
|
||||
const mss = new Multistream.Listener(muxedStream)
|
||||
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
|
||||
log('%s: incoming stream opened on %s', direction, protocol)
|
||||
connection.addStream(stream, protocol)
|
||||
this._onStream({ stream, protocol })
|
||||
try {
|
||||
const { stream, protocol } = await mss.handle(Array.from(this.protocols.keys()))
|
||||
log('%s: incoming stream opened on %s', direction, protocol)
|
||||
connection.addStream(stream, protocol)
|
||||
this._onStream({ connection, stream, protocol })
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
}
|
||||
},
|
||||
// Run anytime a stream closes
|
||||
onStreamEnd: muxedStream => {
|
||||
@ -246,12 +250,13 @@ class Upgrader {
|
||||
* Routes incoming streams to the correct handler
|
||||
* @private
|
||||
* @param {object} options
|
||||
* @param {Connection} options.connection The connection the stream belongs to
|
||||
* @param {Stream} options.stream
|
||||
* @param {string} protocol
|
||||
* @param {string} options.protocol
|
||||
*/
|
||||
_onStream ({ stream, protocol }) {
|
||||
_onStream ({ connection, stream, protocol }) {
|
||||
const handler = this.protocols.get(protocol)
|
||||
handler({ stream, protocol })
|
||||
handler({ connection, stream, protocol })
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -30,4 +30,18 @@ function emitFirst (emitter, events, handler) {
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
* Converts BufferList messages to Buffers
|
||||
* @param {*} source
|
||||
* @returns {AsyncGenerator}
|
||||
*/
|
||||
function toBuffer (source) {
|
||||
return (async function * () {
|
||||
for await (const chunk of source) {
|
||||
yield Buffer.isBuffer(chunk) ? chunk : chunk.slice()
|
||||
}
|
||||
})()
|
||||
}
|
||||
|
||||
module.exports.emitFirst = emitFirst
|
||||
module.exports.toBuffer = toBuffer
|
||||
|
Reference in New Issue
Block a user