mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-13 17:21:21 +00:00
refactor(async): update transports subsystem (#461)
* test: remove all tests for a clean slate The refactor will require a large number of updates to the tests. In order to ensure we have done a decent deduplication, and have a cleaner suite of tests we've removed all tests. This will also allow us to more easily see tests for the refactored systems. We have a record of the latest test suites in master, so we are not losing any history. * chore: update tcp and websockets * chore: remove other transports until they are converted * chore: use mafmt and multiaddr async versions * chore: add and fix dependencies * chore: clean up travis file * feat: add new transport manager * docs: add constructor jsdocs * refactor(config): check that transports exist This also removes the other logic, it can be added when those subsystems are refactored * chore(deps): use async peer-id and peer-info * feat: wire up the transport manager with libp2p * chore: remove superstruct dep
This commit is contained in:
168
src/index.js
168
src/index.js
@ -9,15 +9,12 @@ const errCode = require('err-code')
|
||||
const promisify = require('promisify-es6')
|
||||
|
||||
const each = require('async/each')
|
||||
const series = require('async/series')
|
||||
const parallel = require('async/parallel')
|
||||
const nextTick = require('async/nextTick')
|
||||
|
||||
const PeerBook = require('peer-book')
|
||||
const PeerInfo = require('peer-info')
|
||||
const Switch = require('./switch')
|
||||
const Ping = require('./ping')
|
||||
const WebSockets = require('libp2p-websockets')
|
||||
const ConnectionManager = require('./connection-manager')
|
||||
|
||||
const { emitFirst } = require('./util')
|
||||
@ -29,6 +26,8 @@ const { getPeerInfoRemote } = require('./get-peer-info')
|
||||
const validateConfig = require('./config').validate
|
||||
const { codes } = require('./errors')
|
||||
|
||||
const TransportManager = require('./transport-manager')
|
||||
|
||||
const notStarted = (action, state) => {
|
||||
return errCode(
|
||||
new Error(`libp2p cannot ${action} when not started; state is ${state}`),
|
||||
@ -67,6 +66,21 @@ class Libp2p extends EventEmitter {
|
||||
this.stats = this._switch.stats
|
||||
this.connectionManager = new ConnectionManager(this, this._options.connectionManager)
|
||||
|
||||
// Setup the transport manager
|
||||
this.transportManager = new TransportManager({
|
||||
libp2p: this,
|
||||
// TODO: set the actual upgrader
|
||||
upgrader: {
|
||||
upgradeInbound: (maConn) => maConn,
|
||||
upgradeOutbound: (maConn) => maConn
|
||||
},
|
||||
// TODO: Route incoming connections to a multiplex protocol router
|
||||
onConnection: () => {}
|
||||
})
|
||||
this._modules.transport.forEach((Transport) => {
|
||||
this.transportManager.add(Transport.prototype[Symbol.toStringTag], Transport)
|
||||
})
|
||||
|
||||
// Attach stream multiplexers
|
||||
if (this._modules.streamMuxer) {
|
||||
const muxers = this._modules.streamMuxer
|
||||
@ -336,146 +350,42 @@ class Libp2p extends EventEmitter {
|
||||
this._switch.unhandle(protocol)
|
||||
}
|
||||
|
||||
_onStarting () {
|
||||
async _onStarting () {
|
||||
if (!this._modules.transport) {
|
||||
this.emit('error', new Error('no transports were present'))
|
||||
return this.state('abort')
|
||||
}
|
||||
|
||||
let ws
|
||||
|
||||
// so that we can have webrtc-star addrs without adding manually the id
|
||||
const maOld = []
|
||||
const maNew = []
|
||||
this.peerInfo.multiaddrs.toArray().forEach((ma) => {
|
||||
if (!ma.getPeerId()) {
|
||||
maOld.push(ma)
|
||||
maNew.push(ma.encapsulate('/p2p/' + this.peerInfo.id.toB58String()))
|
||||
}
|
||||
})
|
||||
this.peerInfo.multiaddrs.replace(maOld, maNew)
|
||||
|
||||
const multiaddrs = this.peerInfo.multiaddrs.toArray()
|
||||
|
||||
this._modules.transport.forEach((Transport) => {
|
||||
let t
|
||||
// Start parallel tasks
|
||||
try {
|
||||
await Promise.all([
|
||||
this.transportManager.listen(multiaddrs)
|
||||
])
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
this.emit('error', err)
|
||||
return this.state('stop')
|
||||
}
|
||||
|
||||
if (typeof Transport === 'function') {
|
||||
t = new Transport({ libp2p: this })
|
||||
} else {
|
||||
t = Transport
|
||||
}
|
||||
|
||||
if (t.filter(multiaddrs).length > 0) {
|
||||
this._switch.transport.add(t.tag || t[Symbol.toStringTag], t)
|
||||
} else if (WebSockets.isWebSockets(t)) {
|
||||
// TODO find a cleaner way to signal that a transport is always used
|
||||
// for dialing, even if no listener
|
||||
ws = t
|
||||
}
|
||||
this._transport.push(t)
|
||||
})
|
||||
|
||||
series([
|
||||
(cb) => {
|
||||
this.connectionManager.start()
|
||||
this._switch.start(cb)
|
||||
},
|
||||
(cb) => {
|
||||
if (ws) {
|
||||
// always add dialing on websockets
|
||||
this._switch.transport.add(ws.tag || ws.constructor.name, ws)
|
||||
}
|
||||
|
||||
// detect which multiaddrs we don't have a transport for and remove them
|
||||
const multiaddrs = this.peerInfo.multiaddrs.toArray()
|
||||
|
||||
multiaddrs.forEach((multiaddr) => {
|
||||
if (!multiaddr.toString().match(/\/p2p-circuit($|\/)/) &&
|
||||
!this._transport.find((transport) => transport.filter(multiaddr).length > 0)) {
|
||||
this.peerInfo.multiaddrs.delete(multiaddr)
|
||||
}
|
||||
})
|
||||
cb()
|
||||
},
|
||||
(cb) => {
|
||||
if (this._dht) {
|
||||
this._dht.start(() => {
|
||||
this._dht.on('peer', this._peerDiscovered)
|
||||
cb()
|
||||
})
|
||||
} else {
|
||||
cb()
|
||||
}
|
||||
},
|
||||
(cb) => {
|
||||
if (this.pubsub) {
|
||||
return this.pubsub.start(cb)
|
||||
}
|
||||
cb()
|
||||
},
|
||||
// Peer Discovery
|
||||
(cb) => {
|
||||
if (this._modules.peerDiscovery) {
|
||||
this._setupPeerDiscovery(cb)
|
||||
} else {
|
||||
cb()
|
||||
}
|
||||
}
|
||||
], (err) => {
|
||||
if (err) {
|
||||
log.error(err)
|
||||
this.emit('error', err)
|
||||
return this.state('stop')
|
||||
}
|
||||
this.state('done')
|
||||
})
|
||||
// libp2p has started
|
||||
this.state('done')
|
||||
}
|
||||
|
||||
_onStopping () {
|
||||
series([
|
||||
(cb) => {
|
||||
// stop all discoveries before continuing with shutdown
|
||||
parallel(
|
||||
this._discovery.map((d) => {
|
||||
d.removeListener('peer', this._peerDiscovered)
|
||||
return (_cb) => d.stop((err) => {
|
||||
log.error('an error occurred stopping the discovery service', err)
|
||||
_cb()
|
||||
})
|
||||
}),
|
||||
cb
|
||||
)
|
||||
},
|
||||
(cb) => {
|
||||
if (this.pubsub) {
|
||||
return this.pubsub.stop(cb)
|
||||
}
|
||||
cb()
|
||||
},
|
||||
(cb) => {
|
||||
if (this._dht) {
|
||||
this._dht.removeListener('peer', this._peerDiscovered)
|
||||
return this._dht.stop(cb)
|
||||
}
|
||||
cb()
|
||||
},
|
||||
(cb) => {
|
||||
this.connectionManager.stop()
|
||||
this._switch.stop(cb)
|
||||
},
|
||||
(cb) => {
|
||||
// Ensures idempotent restarts, ignore any errors
|
||||
// from removeAll, they're not useful at this point
|
||||
this._switch.transport.removeAll(() => cb())
|
||||
}
|
||||
], (err) => {
|
||||
async _onStopping () {
|
||||
// Start parallel tasks
|
||||
try {
|
||||
await this.transportManager.close()
|
||||
} catch (err) {
|
||||
if (err) {
|
||||
log.error(err)
|
||||
this.emit('error', err)
|
||||
}
|
||||
this.state('done')
|
||||
})
|
||||
}
|
||||
|
||||
// libp2p has stopped
|
||||
this.state('done')
|
||||
}
|
||||
|
||||
/**
|
||||
|
Reference in New Issue
Block a user