feat: (BREAKING CHANGE) overhaul libp2p config and constructor

* docs: update chat example and add info to its readme
* docs: update echo example
* docs: update libp2p in browser example
* docs: update pubsub example
* docs: update peer and content routing examples
* docs: update discovery mechanisms example
* docs: update encrypted comms example
* docs: update protocol and stream muxing example
* feat: add config validation
* test: update CI configs, use only node 8
This commit is contained in:
David Dias
2018-06-28 10:06:25 +02:00
committed by GitHub
parent b80e89269c
commit 6905f1ba41
56 changed files with 1401 additions and 789 deletions

View File

@ -3,9 +3,9 @@
const EventEmitter = require('events').EventEmitter
const assert = require('assert')
const setImmediate = require('async/setImmediate')
const each = require('async/each')
const series = require('async/series')
const parallel = require('async/parallel')
const PeerBook = require('peer-book')
const Switch = require('libp2p-switch')
@ -18,88 +18,88 @@ const contentRouting = require('./content-routing')
const dht = require('./dht')
const pubsub = require('./pubsub')
const getPeerInfo = require('./get-peer-info')
const validateConfig = require('./config').validate
exports = module.exports
const NOT_STARTED_ERROR_MESSAGE = 'The libp2p node is not started yet'
class Node extends EventEmitter {
constructor (_modules, _peerInfo, _peerBook, _options) {
constructor (_options) {
super()
assert(_modules, 'requires modules to equip libp2p with features')
assert(_peerInfo, 'requires a PeerInfo instance')
// validateConfig will ensure the config is correct,
// and add default values where appropriate
_options = validateConfig(_options)
this.modules = _modules
this.peerInfo = _peerInfo
this.peerBook = _peerBook || new PeerBook()
_options = _options || {}
this.peerInfo = _options.peerInfo
this.peerBook = _options.peerBook || new PeerBook()
this._modules = _options.modules
this._config = _options.config
this._isStarted = false
this._transport = [] // Transport instances/references
this._discovery = [] // Discovery service instances/references
this.switch = new Switch(this.peerInfo, this.peerBook, _options.switch)
this.stats = this.switch.stats
this._switch = new Switch(this.peerInfo, this.peerBook, _options.switch)
this.stats = this._switch.stats
this.connectionManager = new ConnectionManager(this, _options.connectionManager)
// Attach stream multiplexers
if (this.modules.connection && this.modules.connection.muxer) {
let muxers = this.modules.connection.muxer
muxers = Array.isArray(muxers) ? muxers : [muxers]
muxers.forEach((muxer) => this.switch.connection.addStreamMuxer(muxer))
if (this._modules.streamMuxer) {
let muxers = this._modules.streamMuxer
muxers.forEach((muxer) => this._switch.connection.addStreamMuxer(muxer))
// If muxer exists, we can use Identify
this.switch.connection.reuse()
// If muxer exists
// we can use Identify
this._switch.connection.reuse()
// we can use Relay for listening/dialing
this._switch.connection.enableCircuitRelay(this._config.relay)
// If muxer exists, we can use Relay for listening/dialing
this.switch.connection.enableCircuitRelay(_options.relay)
// Received incommind dial and muxer upgrade happened,
// Received incomming dial and muxer upgrade happened,
// reuse this muxed connection
this.switch.on('peer-mux-established', (peerInfo) => {
this._switch.on('peer-mux-established', (peerInfo) => {
this.emit('peer:connect', peerInfo)
this.peerBook.put(peerInfo)
})
this.switch.on('peer-mux-closed', (peerInfo) => {
this._switch.on('peer-mux-closed', (peerInfo) => {
this.emit('peer:disconnect', peerInfo)
})
}
// Attach crypto channels
if (this.modules.connection && this.modules.connection.crypto) {
let cryptos = this.modules.connection.crypto
cryptos = Array.isArray(cryptos) ? cryptos : [cryptos]
if (this._modules.connEncryption) {
let cryptos = this._modules.connEncryption
cryptos.forEach((crypto) => {
this.switch.connection.crypto(crypto.tag, crypto.encrypt)
})
}
// Attach discovery mechanisms
if (this.modules.discovery) {
let discoveries = this.modules.discovery
discoveries = Array.isArray(discoveries) ? discoveries : [discoveries]
discoveries.forEach((discovery) => {
discovery.on('peer', (peerInfo) => this.emit('peer:discovery', peerInfo))
this._switch.connection.crypto(crypto.tag, crypto.encrypt)
})
}
// dht provided components (peerRouting, contentRouting, dht)
if (_modules.DHT) {
this._dht = new this.modules.DHT(this.switch, {
kBucketSize: 20,
datastore: _options.DHT && _options.DHT.datastore
if (this._config.EXPERIMENTAL.dht) {
const DHT = this._modules.dht
this._dht = new DHT(this._switch, {
kBucketSize: this._config.dht.kBucketSize || 20,
// TODO make datastore an option of libp2p itself so
// that other things can use it as well
datastore: dht.datastore
})
}
// enable/disable pubsub
if (this._config.EXPERIMENTAL && this._config.EXPERIMENTAL.pubsub) {
this.pubsub = pubsub(this)
}
// Attach remaining APIs
this.peerRouting = peerRouting(this)
this.contentRouting = contentRouting(this)
this.dht = dht(this)
this.pubsub = pubsub(this)
this._getPeerInfo = getPeerInfo(this)
// Mount default protocols
Ping.mount(this.switch)
Ping.mount(this._switch)
}
/*
@ -107,14 +107,11 @@ class Node extends EventEmitter {
* - create listeners on the multiaddrs the Peer wants to listen
*/
start (callback) {
if (!this.modules.transport) {
if (!this._modules.transport) {
return callback(new Error('no transports were present'))
}
let ws
let transports = this.modules.transport
transports = Array.isArray(transports) ? transports : [transports]
// so that we can have webrtc-star addrs without adding manually the id
const maOld = []
@ -128,30 +125,58 @@ class Node extends EventEmitter {
this.peerInfo.multiaddrs.replace(maOld, maNew)
const multiaddrs = this.peerInfo.multiaddrs.toArray()
transports.forEach((transport) => {
if (transport.filter(multiaddrs).length > 0) {
this.switch.transport.add(
transport.tag || transport.constructor.name, transport)
} else if (WebSockets.isWebSockets(transport)) {
// TODO find a cleaner way to signal that a transport is always
// used for dialing, even if no listener
ws = transport
this._modules.transport.forEach((Transport) => {
let t
if (typeof Transport === 'function') {
t = new Transport()
} else {
t = Transport
}
if (t.filter(multiaddrs).length > 0) {
this._switch.transport.add(t.tag || t.constructor.name, t)
} else if (WebSockets.isWebSockets(t)) {
// TODO find a cleaner way to signal that a transport is always used
// for dialing, even if no listener
ws = t
}
this._transport.push(t)
})
series([
(cb) => this.switch.start(cb),
(cb) => this._switch.start(cb),
(cb) => {
if (ws) {
// always add dialing on websockets
this.switch.transport.add(ws.tag || ws.constructor.name, ws)
this._switch.transport.add(ws.tag || ws.constructor.name, ws)
}
// all transports need to be setup before discover starts
if (this.modules.discovery) {
return each(this.modules.discovery, (d, cb) => d.start(cb), cb)
if (this._modules.peerDiscovery && this._config.peerDiscovery) {
each(this._modules.peerDiscovery, (D, _cb) => {
// If enabled then start it
if (this._config.peerDiscovery[D.tag].enabled) {
let d
if (typeof D === 'function') {
this._config.peerDiscovery[D.tag].peerInfo = this.peerInfo
d = new D(this._config.peerDiscovery[D.tag])
} else {
d = D
}
d.on('peer', (peerInfo) => this.emit('peer:discovery', peerInfo))
this._discovery.push(d)
d.start(_cb)
} else {
_cb()
}
}, cb)
} else {
cb()
}
cb()
},
(cb) => {
// TODO: chicken-and-egg problem #1:
@ -166,7 +191,7 @@ class Node extends EventEmitter {
(cb) => {
// TODO: chicken-and-egg problem #2:
// have to set started here because FloodSub requires libp2p is already started
if (this._options !== false) {
if (this._floodSub) {
this._floodSub.start(cb)
} else {
cb()
@ -177,13 +202,11 @@ class Node extends EventEmitter {
// detect which multiaddrs we don't have a transport for and remove them
const multiaddrs = this.peerInfo.multiaddrs.toArray()
transports.forEach((transport) => {
multiaddrs.forEach((multiaddr) => {
if (!multiaddr.toString().match(/\/p2p-circuit($|\/)/) &&
!transports.find((transport) => transport.filter(multiaddr).length > 0)) {
this.peerInfo.multiaddrs.delete(multiaddr)
}
})
multiaddrs.forEach((multiaddr) => {
if (!multiaddr.toString().match(/\/p2p-circuit($|\/)/) &&
!this._transport.find((transport) => transport.filter(multiaddr).length > 0)) {
this.peerInfo.multiaddrs.delete(multiaddr)
}
})
cb()
},
@ -198,17 +221,24 @@ class Node extends EventEmitter {
* Stop the libp2p node by closing its listeners and open connections
*/
stop (callback) {
if (this.modules.discovery) {
this.modules.discovery.forEach((discovery) => {
setImmediate(() => discovery.stop(() => {}))
})
}
series([
(cb) => {
if (this._floodSub.started) {
this._floodSub.stop(cb)
if (this._modules.peerDiscovery) {
// stop all discoveries before continuing with shutdown
return parallel(
this._discovery.map((d) => {
return (_cb) => d.stop(() => { _cb() })
}),
cb
)
}
cb()
},
(cb) => {
if (this._floodSub) {
return this._floodSub.stop(cb)
}
cb()
},
(cb) => {
if (this._dht) {
@ -216,7 +246,7 @@ class Node extends EventEmitter {
}
cb()
},
(cb) => this.switch.stop(cb),
(cb) => this._switch.stop(cb),
(cb) => {
this.emit('stop')
cb()
@ -237,7 +267,7 @@ class Node extends EventEmitter {
this._getPeerInfo(peer, (err, peerInfo) => {
if (err) { return callback(err) }
this.switch.dial(peerInfo, (err) => {
this._switch.dial(peerInfo, (err) => {
if (err) { return callback(err) }
this.peerBook.put(peerInfo)
@ -257,7 +287,7 @@ class Node extends EventEmitter {
this._getPeerInfo(peer, (err, peerInfo) => {
if (err) { return callback(err) }
this.switch.dial(peerInfo, protocol, (err, conn) => {
this._switch.dial(peerInfo, protocol, (err, conn) => {
if (err) { return callback(err) }
this.peerBook.put(peerInfo)
callback(null, conn)
@ -271,25 +301,28 @@ class Node extends EventEmitter {
this._getPeerInfo(peer, (err, peerInfo) => {
if (err) { return callback(err) }
this.switch.hangUp(peerInfo, callback)
this._switch.hangUp(peerInfo, callback)
})
}
ping (peer, callback) {
assert(this.isStarted(), NOT_STARTED_ERROR_MESSAGE)
if (!this.isStarted()) {
return callback(new Error(NOT_STARTED_ERROR_MESSAGE))
}
this._getPeerInfo(peer, (err, peerInfo) => {
if (err) { return callback(err) }
callback(null, new Ping(this.switch, peerInfo))
callback(null, new Ping(this._switch, peerInfo))
})
}
handle (protocol, handlerFunc, matchFunc) {
this.switch.handle(protocol, handlerFunc, matchFunc)
this._switch.handle(protocol, handlerFunc, matchFunc)
}
unhandle (protocol) {
this.switch.unhandle(protocol)
this._switch.unhandle(protocol)
}
}