From 990111980b2773b921538c663706cd9f15cdc00e Mon Sep 17 00:00:00 2001 From: David Dias Date: Thu, 10 Mar 2016 20:28:58 +0000 Subject: [PATCH] woot --- examples/peerA.js | 25 ------ examples/peerB.js | 14 ---- package.json | 6 +- src/identify.js | 183 +++++++++++++------------------------------- src/index.js | 34 +++++--- tests/swarm-test.js | 9 ++- 6 files changed, 89 insertions(+), 182 deletions(-) delete mode 100644 examples/peerA.js delete mode 100644 examples/peerB.js diff --git a/examples/peerA.js b/examples/peerA.js deleted file mode 100644 index 9bba4793..00000000 --- a/examples/peerA.js +++ /dev/null @@ -1,25 +0,0 @@ -// var Identify = require('./../src/identify') -var Swarm = require('./../src') -var Peer = require('ipfs-peer') -var Id = require('ipfs-peer-id') -var multiaddr = require('multiaddr') - -var a = new Swarm() -a.port = 4000 -// a.listen() -// var peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + a.port)]) - -// attention, peerB Id isn't going to match, but whateves -var peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/4001')]) - -// var i = new Identify(a, peerA) -// i.on('thenews', function (news) { -// console.log('such news') -// }) - -a.openStream(peerB, '/ipfs/sparkles/1.2.3', function (err, stream) { - if (err) { - return console.log(err) - } - console.log('WoHoo, dialed a stream') -}) diff --git a/examples/peerB.js b/examples/peerB.js deleted file mode 100644 index 95ca3ff6..00000000 --- a/examples/peerB.js +++ /dev/null @@ -1,14 +0,0 @@ -var Swarm = require('./../src') - -var Peer = require('peer-info') -var Id = require('peer-id') -var multiaddr = require('multiaddr') -var tcp = require('libp2p-tcp') - -var mh = multiaddr('/ip4/127.0.0.1/tcp/8010') -var p = new Peer(Id.create(), []) -var sw = new Swarm(p) - -sw.addTransport('tcp', tcp, { multiaddr: mh }, {}, {port: 8010}, function () { - console.log('transport added') -}) diff --git a/package.json b/package.json index 8c133589..e2e81a6c 100644 --- a/package.json +++ b/package.json @@ -33,11 +33,11 @@ "chai": "^3.5.0", "istanbul": "^0.4.2", "libp2p-spdy": "^0.2.3", - "libp2p-tcp": "^0.2.1", + "libp2p-tcp": "^0.3.0", "mocha": "^2.4.5", "multiaddr": "^1.1.1", - "peer-id": "^0.5.3", - "peer-info": "^0.5.2", + "peer-id": "^0.6.0", + "peer-info": "^0.6.0", "pre-commit": "^1.1.2", "standard": "^6.0.7", "stream-pair": "^1.0.3" diff --git a/src/identify.js b/src/identify.js index 7bad7eb7..0ef04ae9 100644 --- a/src/identify.js +++ b/src/identify.js @@ -1,166 +1,93 @@ /* * Identify is one of the protocols swarms speaks in order to * broadcast and learn about the ip:port pairs a specific peer - * is available through + * is available through and to know when a new stream muxer is + * established, so a conn can be reused */ -// var multistream = require('multistream-select') -// var protobufs = require('protocol-buffers-stream') -// var fs = require('fs') -// var path = require('path') -// var protobufs = require('protocol-buffers-stream') -// var schema = fs.readFileSync(path.join(__dirname, 'identify.proto')) -// var Address6 = require('ip-address').Address6 -// var Id = require('peer-id') -// var multiaddr = require('multiaddr') +const multistream = require('multistream-select') +const fs = require('fs') +const path = require('path') +const pbStream = require('protocol-buffers-stream')( + fs.readFileSync(path.join(__dirname, 'identify.proto'))) +const Info = require('peer-info') +const Id = require('peer-id') +const multiaddr = require('multiaddr') exports = module.exports - exports.multicodec = '/ipfs/identify/1.0.0' -exports.exec = (muxedConn, callback) => { - // TODO +exports.exec = (rawConn, muxer, peerInfo, callback) => { // 1. open a stream // 2. multistream into identify - // 3. send what I see from this other peer + // 3. send what I see from this other peer (extract fro conn) // 4. receive what the other peer sees from me // 4. callback with (err, peerInfo) -} -exports.handler = (peerInfo) => { - return function (conn) { - // TODO - // 1. receive incoming observed info about me - // 2. send back what I see from the other - } -} + const conn = muxer.newStream() -/* -function identify (muxedConns, peerInfoSelf, socket, conn, muxer) { - var msi = new Interactive() - msi.handle(conn, function () { - msi.select(protoId, function (err, ds) { + var msI = new multistream.Interactive() + msI.handle(conn, () => { + msI.select(exports.multicodec, (err, ds) => { if (err) { - return console.log(err) + return callback(err) } - var ps = createProtoStream() + var pbs = pbStream() - ps.on('identify', function (msg) { - var peerId = Id.createFromPubKey(msg.publicKey) + pbs.on('identify', (msg) => { + peerInfo.multiaddr.addSafe(msg.observedAddr) - updateSelf(peerInfoSelf, msg.observedAddr) + const peerId = Id.createFromPubKey(msg.publicKey) + const otherPeerInfo = new Info(peerId) + msg.listenAddrs.forEach((ma) => { + otherPeerInfo.multiaddr.add(multiaddr(ma)) + }) - muxedConns[peerId.toB58String()] = { - muxer: muxer, - socket: socket - } - - var mh = getMultiaddr(socket) - - ps.identify({ - protocolVersion: 'na', - agentVersion: 'na', - publicKey: peerInfoSelf.id.pubKey, - listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) { - return mh.buffer - }), - observedAddr: mh.buffer + callback(null, otherPeerInfo) }) - ps.pipe(ds).pipe(ps) - ps.finalize() + const obsMultiaddr = rawConn.getObservedAddrs()[0] + + pbs.identify({ + protocolVersion: 'na', + agentVersion: 'na', + publicKey: peerInfo.id.pubKey, + listenAddrs: peerInfo.multiaddrs.map((mh) => { return mh.buffer }), + observedAddr: obsMultiaddr ? obsMultiaddr.buffer : null + }) + + pbs.pipe(ds).pipe(pbs) + pbs.finalize() }) }) } -exports.getHandlerFunction = function (peerInfoSelf, muxedConns) { +exports.handler = (peerInfo, swarm) => { return function (conn) { - // wait for the other peer to identify itself - // update our multiaddr with observed addr list - // then get the socket from our list of muxedConns and send the reply back + // 1. receive incoming observed info about me + // 2. update my own information (on peerInfo) + // 3. send back what I see from the other (get from swarm.muxedConns[incPeerID].conn.getObservedAddrs() + var pbs = pbStream() - var ps = createProtoStream() + pbs.on('identify', function (msg) { + peerInfo.multiaddr.addSafe(msg.observedAddr) - ps.on('identify', function (msg) { - updateSelf(peerInfoSelf, msg.observedAddr) + const peerId = Id.createFromPubKey(msg.publicKey) + const conn = swarm.muxedConns[peerId.toB58String()].conn + const obsMultiaddr = conn.getObservedAddrs()[0] - var peerId = Id.createFromPubKey(msg.publicKey) - - var socket = muxedConns[peerId.toB58String()].socket - - var mh = getMultiaddr(socket) - - ps.identify({ + pbs.identify({ protocolVersion: 'na', agentVersion: 'na', - publicKey: peerInfoSelf.id.pubKey, - listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) { - return mh.buffer + publicKey: peerInfo.id.pubKey, + listenAddrs: peerInfo.multiaddrs.map(function (ma) { + return ma.buffer }), - observedAddr: mh.buffer + observedAddr: obsMultiaddr ? obsMultiaddr.buffer : null }) - - // TODO: Pass the new discovered info about the peer that contacted us - // to something like the Kademlia Router, so the peerInfo for this peer - // is fresh - // - before this was exectued through a event emitter - // self.emit('peer-update', { - // peerId: peerId, - // listenAddrs: msg.listenAddrs.map(function (mhb) { - // return multiaddr(mhb) - // }) - // }) - - ps.finalize() + pbs.finalize() }) - ps.pipe(conn).pipe(ps) + pbs.pipe(conn).pipe(pbs) } } - -function getMultiaddr (socket) { - var mh - if (socket.remoteFamily === 'IPv6') { - var addr = new Address6(socket.remoteAddress) - if (addr.v4) { - var ip4 = addr.to4().correctForm() - mh = multiaddr('/ip4/' + ip4 + '/tcp/' + socket.remotePort) - } else { - mh = multiaddr('/ip6/' + socket.remoteAddress + '/tcp/' + socket.remotePort) - } - } else { - mh = multiaddr('/ip4/' + socket.remoteAddress + '/tcp/' + socket.remotePort) - } - return mh -} - -function updateSelf (peerSelf, observedAddr) { - var omh = multiaddr(observedAddr) - - if (!peerSelf.previousObservedAddrs) { - peerSelf.previousObservedAddrs = [] - } - - for (var i = 0; i < peerSelf.previousObservedAddrs.length; i++) { - if (peerSelf.previousObservedAddrs[i].toString() === omh.toString()) { - peerSelf.previousObservedAddrs.splice(i, 1) - addToSelf() - return - } - } - - peerSelf.previousObservedAddrs.push(omh) - - function addToSelf () { - var isIn = false - peerSelf.multiaddrs.forEach(function (mh) { - if (mh.toString() === omh.toString()) { - isIn = true - } - }) - - if (!isIn) { - peerSelf.multiaddrs.push(omh) - } - } -}*/ diff --git a/src/index.js b/src/index.js index 4e3f3f6d..34427f55 100644 --- a/src/index.js +++ b/src/index.js @@ -42,7 +42,7 @@ function Swarm (peerInfo) { multiaddrs = [multiaddrs] } - // TODO a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) + // TODO a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that) // b) if multiaddrs.length = 1, return the conn from the // transport, otherwise, create a passthrough @@ -116,7 +116,7 @@ function Swarm (peerInfo) { // { // peerIdB58: { // muxer: - // rawSocket: socket // to abstract info required for the Identify Protocol + // conn: // to extract info required for the Identify Protocol // } // } this.muxedConns = {} @@ -136,12 +136,19 @@ function Swarm (peerInfo) { // for listening this.handle(muxer.multicodec, (conn) => { const muxedConn = muxer(conn, true) - muxedConn.on('stream', connHandler) + muxedConn.on('stream', (conn) => { + connHandler(conn) + }) + // if identify is enabled, attempt to do it for muxer reuse if (this.identify) { - identify.exec(muxedConn, (err, pi) => { - if (err) {} - // TODO muxedConns[pi.id.toB58String()].muxer = muxedConn + identify.exec(conn, muxedConn, peerInfo, (err, pi) => { + if (err) { + return console.log('Identify exec failed', err) + } + this.muxedConns[pi.id.toB58String()] = {} + this.muxedConns[pi.id.toB58String()].muxer = muxedConn + this.muxedConns[pi.id.toB58String()].conn = conn // to be able to extract addrs }) } }) @@ -151,18 +158,19 @@ function Swarm (peerInfo) { this.identify = false this.connection.reuse = () => { this.identify = true - this.handle(identify.multicodec, identify.handler(peerInfo)) + this.handle(identify.multicodec, identify.handler(peerInfo, this)) } - const self = this // couldn't get rid of this + const self = this // prefered this to bind // incomming connection handler function connHandler (conn) { var msS = new multistream.Select() - msS.handle(conn) - Object.keys(self.protocols).forEach(function (protocol) { + Object.keys(self.protocols).forEach((protocol) => { + if (!protocol) { return } msS.addHandler(protocol, self.protocols[protocol]) }) + msS.handle(conn) } // higher level (public) API @@ -258,11 +266,17 @@ function Swarm (peerInfo) { } else { nextMuxer(muxers.shift()) } + return } const muxedConn = self.muxers[key](conn, false) self.muxedConns[b58Id] = {} self.muxedConns[b58Id].muxer = muxedConn + self.muxedConns[b58Id].conn = conn + + // in case identify is on + muxedConn.on('stream', connHandler) + cb(null, muxedConn) }) }) diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 06e11612..13c28894 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -303,6 +303,10 @@ describe('stream muxing (on TCP)', function () { peerB = new Peer() peerC = new Peer() + // console.log('peer A', peerA.id.toB58String()) + // console.log('peer B', peerB.id.toB58String()) + // console.log('peer C', peerC.id.toB58String()) + peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001')) peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002')) peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003')) @@ -386,7 +390,7 @@ describe('stream muxing (on TCP)', function () { }) }) - it.skip('enable identify to reuse incomming muxed conn', (done) => { + it('enable identify to reuse incomming muxed conn', (done) => { swarmA.connection.reuse() swarmC.connection.reuse() @@ -395,7 +399,8 @@ describe('stream muxing (on TCP)', function () { setTimeout(() => { expect(Object.keys(swarmC.muxedConns).length).to.equal(1) expect(Object.keys(swarmA.muxedConns).length).to.equal(2) - }, 100) + done() + }, 500) }) }) })