From 1833ded0f730686aebcba108c177ee31b74f9419 Mon Sep 17 00:00:00 2001 From: David Dias Date: Mon, 21 Sep 2015 16:46:04 +0100 Subject: [PATCH] making progress --- README.md | 2 +- examples/{c.js => peerA.js} | 0 examples/{s.js => peerB.js} | 5 +- src/swarm-old.js | 189 +++++++++++++++++++++++++++++++++ src/swarm.js | 203 +++++++++--------------------------- 5 files changed, 242 insertions(+), 157 deletions(-) rename examples/{c.js => peerA.js} (100%) rename examples/{s.js => peerB.js} (87%) create mode 100644 src/swarm-old.js diff --git a/README.md b/README.md index 4fcf3e68..89103343 100644 --- a/README.md +++ b/README.md @@ -56,7 +56,7 @@ sw.addStreamMuxer(streamMuxer, [options]) ### Dial to another peer ```JavaScript -sw.dial(PeerInfo, protocol, options) +sw.dial(PeerInfo, options, protocol) sw.dial(PeerInfo, options) ``` diff --git a/examples/c.js b/examples/peerA.js similarity index 100% rename from examples/c.js rename to examples/peerA.js diff --git a/examples/s.js b/examples/peerB.js similarity index 87% rename from examples/s.js rename to examples/peerB.js index 856730d0..c480983a 100644 --- a/examples/s.js +++ b/examples/peerB.js @@ -1,6 +1,7 @@ -// var Identify = require('./../src/identify') var Swarm = require('./../src') -// var Peer = require('ipfs-peer') + + +var Peer = require('ipfs-peer') // var Id = require('ipfs-peer-id') // var multiaddr = require('multiaddr') diff --git a/src/swarm-old.js b/src/swarm-old.js new file mode 100644 index 00000000..9a27bdad --- /dev/null +++ b/src/swarm-old.js @@ -0,0 +1,189 @@ +var tcp = require('net') +var Select = require('multistream-select').Select +var Interactive = require('multistream-select').Interactive +var Muxer = require('./stream-muxer') +var log = require('ipfs-logger').group('swarm') +var async = require('async') +var EventEmitter = require('events').EventEmitter +var util = require('util') + +exports = module.exports = Swarm + +util.inherits(Swarm, EventEmitter) + +function Swarm () { + var self = this + + if (!(self instanceof Swarm)) { + throw new Error('Swarm must be called with new') + } + + self.port = parseInt(process.env.IPFS_SWARM_PORT, 10) || 4001 + self.connections = {} // {peerIdB58: {conn: <>, socket: <>} + self.handles = {} + + // set the listener + + self.listen = function (port, ready) { + if (!ready) { + ready = function noop () {} + } + if (typeof port === 'function') { + ready = port + } else if (port) { self.port = port } + + // + + self.listener = tcp.createServer(function (socket) { + errorUp(self, socket) + var ms = new Select() + ms.handle(socket) + ms.addHandler('/spdy/3.1.0', function (ds) { + log.info('Negotiated spdy with incoming socket') + + var conn = new Muxer().attach(ds, true) + + // attach multistream handlers to incoming streams + + conn.on('stream', registerHandles) + errorUp(self, conn) + + // FOR IDENTIFY + self.emit('connection-unknown', conn, socket) + + // IDENTIFY DOES THIS FOR US + // conn.on('close', function () { delete self.connections[conn.peerId] }) + }) + }).listen(self.port, ready) + errorUp(self, self.listener) + } + + // interface + + // open stream account for connection reuse + self.openConnection = function (peer, cb) { + // If no connection open yet, open it + if (!self.connections[peer.id.toB58String()]) { + // Establish a socket with one of the addresses + var socket + async.eachSeries(peer.multiaddrs, function (multiaddr, next) { + if (socket) { return next() } + + var tmp = tcp.connect(multiaddr.toOptions(), function () { + socket = tmp + errorUp(self, socket) + next() + }) + + tmp.once('error', function (err) { + log.warn(multiaddr.toString(), 'on', peer.id.toB58String(), 'not available', err) + next() + }) + + }, function done () { + if (!socket) { + return cb(new Error('Not able to open a scoket with peer - ', + peer.id.toB58String())) + } + gotSocket(socket) + }) + } else { + cb() + } + + // do the spdy people dance (multistream-select into spdy) + function gotSocket (socket) { + var msi = new Interactive() + msi.handle(socket, function () { + msi.select('/spdy/3.1.0', function (err, ds) { + if (err) { cb(err) } + + var conn = new Muxer().attach(ds, false) + conn.on('stream', registerHandles) + self.connections[peer.id.toB58String()] = { + conn: conn, + socket: socket + } + conn.on('close', function () { delete self.connections[peer.id.toB58String()]}) + errorUp(self, conn) + + cb() + }) + }) + } + } + + self.openStream = function (peer, protocol, cb) { + self.openConnection(peer, function (err) { + if (err) { + return cb(err) + } + // spawn new muxed stream + var conn = self.connections[peer.id.toB58String()].conn + conn.dialStream(function (err, stream) { + if (err) { return cb(err) } + errorUp(self, stream) + // negotiate desired protocol + var msi = new Interactive() + msi.handle(stream, function () { + msi.select(protocol, function (err, ds) { + if (err) { return cb(err) } + peer.lastSeen = new Date() + cb(null, ds) // return the stream + }) + }) + }) + }) + } + + self.registerHandler = function (protocol, handlerFunc) { + if (self.handles[protocol]) { + return handlerFunc(new Error('Handle for protocol already exists', protocol)) + } + self.handles[protocol] = handlerFunc + log.info('Registered handler for protocol:', protocol) + } + + self.closeConns = function (cb) { + var keys = Object.keys(self.connections) + var number = keys.length + if (number === 0) { cb() } + var c = new Counter(number, cb) + + keys.forEach(function (key) { + self.connections[key].conn.end() + c.hit() + }) + } + + self.closeListener = function (cb) { + self.listener.close(cb) + } + + function registerHandles (stream) { + log.info('Registering protocol handlers on new stream') + errorUp(self, stream) + var msH = new Select() + msH.handle(stream) + Object.keys(self.handles).forEach(function (protocol) { + msH.addHandler(protocol, self.handles[protocol]) + }) + } + +} + +function errorUp (self, emitter) { + emitter.on('error', function (err) { + self.emit('error', err) + }) +} + +function Counter (target, callback) { + var c = 0 + this.hit = count + + function count () { + c += 1 + if (c === target) { callback() } + } +} diff --git a/src/swarm.js b/src/swarm.js index 9a27bdad..4d792999 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -1,189 +1,84 @@ -var tcp = require('net') -var Select = require('multistream-select').Select -var Interactive = require('multistream-select').Interactive -var Muxer = require('./stream-muxer') -var log = require('ipfs-logger').group('swarm') -var async = require('async') -var EventEmitter = require('events').EventEmitter -var util = require('util') exports = module.exports = Swarm -util.inherits(Swarm, EventEmitter) - -function Swarm () { +function Swarm (peerInfo) { var self = this if (!(self instanceof Swarm)) { throw new Error('Swarm must be called with new') } - self.port = parseInt(process.env.IPFS_SWARM_PORT, 10) || 4001 - self.connections = {} // {peerIdB58: {conn: <>, socket: <>} - self.handles = {} + self.peerInfo = peerInfo - // set the listener + // peerIdB58: { conn: } + self.conns = {} - self.listen = function (port, ready) { - if (!ready) { - ready = function noop () {} - } - if (typeof port === 'function') { - ready = port - } else if (port) { self.port = port } + // peerIdB58: { muxer: } + self.muxedConns = {} - // + // transportName: { transport: transport, + // dialOptions: dialOptions, + // listenOptions: listenOptions, + // listeners: [] } + self.transports = {} - self.listener = tcp.createServer(function (socket) { - errorUp(self, socket) - var ms = new Select() - ms.handle(socket) - ms.addHandler('/spdy/3.1.0', function (ds) { - log.info('Negotiated spdy with incoming socket') + self.listeners = {} - var conn = new Muxer().attach(ds, true) + // public interface - // attach multistream handlers to incoming streams + self.addTransport = function (transport, options, dialOptions, listenOptions, callback) { + // set up the transport and add the list of incoming streams + // add transport to the list of transports - conn.on('stream', registerHandles) - errorUp(self, conn) + var listener = transport.createListener(options, listen) - // FOR IDENTIFY - self.emit('connection-unknown', conn, socket) - - // IDENTIFY DOES THIS FOR US - // conn.on('close', function () { delete self.connections[conn.peerId] }) - }) - }).listen(self.port, ready) - errorUp(self, self.listener) - } - - // interface - - // open stream account for connection reuse - self.openConnection = function (peer, cb) { - // If no connection open yet, open it - if (!self.connections[peer.id.toB58String()]) { - // Establish a socket with one of the addresses - var socket - async.eachSeries(peer.multiaddrs, function (multiaddr, next) { - if (socket) { return next() } - - var tmp = tcp.connect(multiaddr.toOptions(), function () { - socket = tmp - errorUp(self, socket) - next() - }) - - tmp.once('error', function (err) { - log.warn(multiaddr.toString(), 'on', peer.id.toB58String(), 'not available', err) - next() - }) - - }, function done () { - if (!socket) { - return cb(new Error('Not able to open a scoket with peer - ', - peer.id.toB58String())) - } - gotSocket(socket) - }) - } else { - cb() - } - - // do the spdy people dance (multistream-select into spdy) - function gotSocket (socket) { - var msi = new Interactive() - msi.handle(socket, function () { - msi.select('/spdy/3.1.0', function (err, ds) { - if (err) { cb(err) } - - var conn = new Muxer().attach(ds, false) - conn.on('stream', registerHandles) - self.connections[peer.id.toB58String()] = { - conn: conn, - socket: socket - } - conn.on('close', function () { delete self.connections[peer.id.toB58String()]}) - errorUp(self, conn) - - cb() - }) - }) - } - } - - self.openStream = function (peer, protocol, cb) { - self.openConnection(peer, function (err) { - if (err) { - return cb(err) + listener.listen(listenOptions, function ready () { + self.transports[options.name] = { + transport: transport, + dialOptions: dialOptions, + listenOptions: listenOptions, + listeners: [listener] } - // spawn new muxed stream - var conn = self.connections[peer.id.toB58String()].conn - conn.dialStream(function (err, stream) { - if (err) { return cb(err) } - errorUp(self, stream) - // negotiate desired protocol - var msi = new Interactive() - msi.handle(stream, function () { - msi.select(protocol, function (err, ds) { - if (err) { return cb(err) } - peer.lastSeen = new Date() - cb(null, ds) // return the stream - }) - }) - }) + + // If a known multiaddr is passed, then add to our list of multiaddrs + if (options.multiaddr) { + self.peerInfo.multiaddrs.push(options.multiaddr) + } + + callback() }) } - self.registerHandler = function (protocol, handlerFunc) { - if (self.handles[protocol]) { - return handlerFunc(new Error('Handle for protocol already exists', protocol)) - } - self.handles[protocol] = handlerFunc - log.info('Registered handler for protocol:', protocol) + self.addUpgrade = function (ConnUpgrade, options) { + } - self.closeConns = function (cb) { - var keys = Object.keys(self.connections) - var number = keys.length - if (number === 0) { cb() } - var c = new Counter(number, cb) + self.addStreamMuxer = function (StreamMuxer, options) { - keys.forEach(function (key) { - self.connections[key].conn.end() - c.hit() - }) } - self.closeListener = function (cb) { - self.listener.close(cb) + self.dial = function (peerInfo, options, protocol, callback) { + // 1. check if we have transports we support } - function registerHandles (stream) { - log.info('Registering protocol handlers on new stream') - errorUp(self, stream) - var msH = new Select() - msH.handle(stream) - Object.keys(self.handles).forEach(function (protocol) { - msH.addHandler(protocol, self.handles[protocol]) - }) + self.closeListener = function (transportName, callback) { + // close a specific listener + // remove it from available transports } -} + self.close = function (callback) { + // close everything + } -function errorUp (self, emitter) { - emitter.on('error', function (err) { - self.emit('error', err) - }) -} + self.handleProtocol = function (protocol, handlerFunction) { -function Counter (target, callback) { - var c = 0 - this.hit = count + } - function count () { - c += 1 - if (c === target) { callback() } + // internals + + function listen (conn) { + console.log('Received new connection') + // apply upgrades + // then add it to the multistreamHandler } }