From 5d0b25cb00f640de3ddaa8b7558c9ffbf1cfdd9c Mon Sep 17 00:00:00 2001 From: David Dias Date: Sat, 26 Nov 2016 03:07:52 +0100 Subject: [PATCH] feat: create base libp2p.Node class --- package.json | 8 ++ src/index.js | 241 ++++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 239 insertions(+), 10 deletions(-) diff --git a/package.json b/package.json index de29273e..45c8fc34 100644 --- a/package.json +++ b/package.json @@ -37,5 +37,13 @@ "chai": "^3.5.0", "aegir": "^9.1.2", "pre-commit": "^1.1.1" + }, + "dependencies": { + "libp2p-swarm": "^0.26.3", + "mafmt": "^2.1.2", + "multiaddr": "^2.1.1", + "peer-book": "^0.3.0", + "peer-id": "^0.8.0", + "peer-info": "^0.8.1" } } diff --git a/src/index.js b/src/index.js index d46f08e6..b70e1233 100644 --- a/src/index.js +++ b/src/index.js @@ -1,19 +1,240 @@ 'use strict' -exports = module.exports = Libp2p +const Swarm = require('libp2p-swarm') +const PeerId = require('peer-id') +const PeerInfo = require('peer-info') +const PeerBook = require('peer-book') +const multiaddr = require('multiaddr') +const mafmt = require('mafmt') +const EE = require('events').EventEmitter +const assert = require('assert') -function Libp2p (swarm, peerRouting, recordStore) { - var self = this +exports = module.exports - if (!(self instanceof Libp2p)) { - throw new Error('Must be called with new') +const OFFLINE_ERROR_MESSAGE = 'The libp2p node is not started yet' +const IPFS_CODE = 421 + +class Node { + constructor (_modules, _peerInfo, _peerBook) { + assert(_modules, 'requires modules to equip libp2p with features') + assert(_peerInfo, 'requires a PeerInfo instance') + + this.modules = _modules + this.peerInfo = _peerInfo + this.peerBook = _peerBook || new PeerBook() + this.isOnline = false + + this.discovery = new EE() + + this.swarm = new Swarm(this.peerInfo) + + // Attach stream multiplexers + if (this.modules.connection.muxer) { + let muxers = this.modules.connection.muxer + muxers = Array.isArray(muxers) ? muxers : [muxers] + muxers.forEach((muxer) => { + this.swarm.connection.addStreamMuxer(muxer) + }) + + // If muxer exists, we can use Identify + this.swarm.connection.reuse() + + // Received incommind dial and muxer upgrade happened, reuse this + // muxed connection + this.swarm.on('peer-mux-established', (peerInfo) => { + this.peerBook.put(peerInfo) + }) + + this.swarm.on('peer-mux-closed', (peerInfo) => { + this.peerBook.removeByB58String(peerInfo.id.toB58String()) + }) + } + + // Attach crypto channels + if (this.modules.connection.crypto) { + let cryptos = this.modules.connection.crypto + cryptos = Array.isArray(cryptos) ? cryptos : [cryptos] + cryptos.forEach((crypto) => { + this.swarm.connection.crypto(crypto.tag, crypto.encrypt) + }) + } + + // Attach discovery mechanisms + if (this.discovery) { + let discoveries = this.modules.discovery + discoveries = Array.isArray(discoveries) ? discoveries : [discoveries] + discoveries.forEach((discovery) => { + discovery.on('peer', (peerInfo) => { + this.discovery.emit('peer', peerInfo) + }) + }) + } + + // Not fully implemented in js-libp2p yet + this.routing = undefined + this.records = undefined } - if (!swarm) { - throw new Error('a swarm must be passed') + /* + * Start the libp2p node + * - create listeners on the multiaddrs the Peer wants to listen + */ + start (callback) { + if (!this.modules.transport) { + return callback(new Error('no transports were present')) + } + let transports = this.modules.transport + transports = Array.isArray(transports) ? transports : [transports] + const multiaddrs = this.peerInfo.multiaddrs + transports.forEach((transport) => { + if (transport.filter(multiaddrs).length > 0) { + this.swarm.transport.add( + transport.tag || transport.constructor.name, transport) + } + }) + + this.swarm.listen((err) => { + if (err) { + return callback(err) + } + + this.isOnline = true + callback() + }) } - self.swarm = swarm - self.routing = peerRouting - self.record = recordStore + /* + * Stop the libp2p node by closing its listeners and open connections + */ + stop (callback) { + this.isOnline = false + this.swarm.close(callback) + } + + // + // Dialing methods + // + + // TODO + dialById (id, protocol, callback) { + // NOTE: dialById only works if a previous dial was made. This will + // change once we have PeerRouting + + assert(this.isOnline, OFFLINE_ERROR_MESSAGE) + + if (typeof protocol === 'function') { + callback = protocol + protocol = undefined + } + + callback(new Error('not implemented yet')) + } + + dialByMultiaddr (maddr, protocol, callback) { + assert(this.isOnline, OFFLINE_ERROR_MESSAGE) + + if (typeof protocol === 'function') { + callback = protocol + protocol = undefined + } + + if (typeof maddr === 'string') { + maddr = multiaddr(maddr) + } + + if (!mafmt.IPFS.matches(maddr.toString())) { + return callback(new Error('multiaddr not valid')) + } + + const ipfsIdB58String = maddr.stringTuples().filter((tuple) => { + if (tuple[0] === IPFS_CODE) { + return true + } + })[0][1] + + let peer + try { + peer = this.peerBook.getByB58String(ipfsIdB58String) + } catch (err) { + peer = new PeerInfo(PeerId.createFromB58String(ipfsIdB58String)) + } + + peer.multiaddr.add(maddr) + this.dialByPeerInfo(peer, protocol, callback) + } + + dialByPeerInfo (peer, protocol, callback) { + assert(this.isOnline, OFFLINE_ERROR_MESSAGE) + + if (typeof protocol === 'function') { + callback = protocol + protocol = undefined + } + + this.swarm.dial(peer, protocol, (err, conn) => { + if (err) { + return callback(err) + } + this.peerBook.put(peer) + callback(null, conn) + }) + } + + // + // Disconnecting (hangUp) methods + // + + hangUpById (id, callback) { + // TODO + callback(new Error('not implemented yet')) + } + + hangUpByMultiaddr (maddr, callback) { + assert(this.isOnline, OFFLINE_ERROR_MESSAGE) + + if (typeof maddr === 'string') { + maddr = multiaddr(maddr) + } + + if (!mafmt.IPFS.matches(maddr.toString())) { + return callback(new Error('multiaddr not valid')) + } + + const ipfsIdB58String = maddr.stringTuples().filter((tuple) => { + if (tuple[0] === IPFS_CODE) { + return true + } + })[0][1] + + try { + const pi = this.peerBook.getByB58String(ipfsIdB58String) + this.hangUpByPeerInfo(pi, callback) + } catch (err) { + // already disconnected + callback() + } + } + + hangUpByPeerInfo (peer, callback) { + assert(this.isOnline, OFFLINE_ERROR_MESSAGE) + + this.peerBook.removeByB58String(peer.id.toB58String()) + this.swarm.hangUp(peer, callback) + } + + // + // Protocol multiplexing handling + // + + handle (protocol, handlerFunc, matchFunc) { + this.swarm.handle(protocol, handlerFunc, matchFunc) + } + + unhandle (protocol) { + this.swarm.unhandle(protocol) + } +} + +module.exports = { + Node: Node }