diff --git a/src/identify/index.js b/src/identify/index.js index 08a09bb4..a37b4a04 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -4,93 +4,111 @@ */ var Interactive = require('multistream-select').Interactive -var EventEmmiter = require('events').EventEmitter -var util = require('util') var protobufs = require('protocol-buffers-stream') var fs = require('fs') var schema = fs.readFileSync(__dirname + '/identify.proto') var v6 = require('ip-address').v6 -var Id = require('ipfs-peer-id') +var Id = require('peer-id') var multiaddr = require('multiaddr') -exports = module.exports = Identify +exports = module.exports = identify -util.inherits(Identify, EventEmmiter) +var protoId = '/ipfs/identify/1.0.0' -function Identify (swarm, peerSelf) { - var self = this - self.createProtoStream = protobufs(schema) +exports.protoId = protoId +var createProtoStream = protobufs(schema) - swarm.registerHandler('/ipfs/identify/1.0.0', function (stream) { - var ps = self.createProtoStream() +function identify (muxedConns, peerInfoSelf, socket, conn, muxer) { + var msi = new Interactive() + msi.handle(conn, function () { + msi.select(protoId, function (err, ds) { + if (err) { + return console.log(err) // TODO Treat error + } - ps.on('identify', function (msg) { - updateSelf(peerSelf, msg.observedAddr) + var ps = createProtoStream() - var peerId = Id.createFromPubKey(msg.publicKey) + ps.on('identify', function (msg) { + var peerId = Id.createFromPubKey(msg.publicKey) + + updateSelf(peerInfoSelf, msg.observedAddr) + + muxedConns[peerId.toB58String()] = { + muxer: muxer, + socket: socket + } + console.log('do I get back') + + // TODO: Pass the new discovered info about the peer that contacted us + // to something like the Kademlia Router, so the peerInfo for this peer + // is fresh + // - before this was exectued through a event emitter + // self.emit('peer-update', { + // peerId: peerId, + // listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) + // }) + }) - var socket = swarm.connections[peerId.toB58String()].socket var mh = getMultiaddr(socket) + ps.identify({ protocolVersion: 'na', agentVersion: 'na', - publicKey: peerSelf.id.pubKey, - listenAddrs: peerSelf.multiaddrs.map(function (mh) {return mh.buffer}), + publicKey: peerInfoSelf.id.pubKey, + listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) { + return mh.buffer + }), observedAddr: mh.buffer }) - self.emit('peer-update', { - peerId: peerId, - listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) + ps.pipe(ds).pipe(ps) + ps.finalize() + }) + }) +} + +exports.getHandlerFunction = function (peerInfoSelf, muxedConns) { + return function (conn) { + // wait for the other peer to identify itself + // update our multiaddr with observed addr list + // then get the socket from our list of muxedConns and send the reply back + + var ps = createProtoStream() + + ps.on('identify', function (msg) { + updateSelf(peerInfoSelf, msg.observedAddr) + + var peerId = Id.createFromPubKey(msg.publicKey) + + var socket = muxedConns[peerId.toB58String()].socket + + var mh = getMultiaddr(socket) + + ps.identify({ + protocolVersion: 'na', + agentVersion: 'na', + publicKey: peerInfoSelf.id.pubKey, + listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) { + return mh.buffer + }), + observedAddr: mh.buffer }) + // TODO: Pass the new discovered info about the peer that contacted us + // to something like the Kademlia Router, so the peerInfo for this peer + // is fresh + // - before this was exectued through a event emitter + // self.emit('peer-update', { + // peerId: peerId, + // listenAddrs: msg.listenAddrs.map(function (mhb) { + // return multiaddr(mhb) + // }) + // }) + ps.finalize() }) - ps.pipe(stream).pipe(ps) - }) - - swarm.on('connection-unknown', function (conn, socket) { - conn.dialStream(function (err, stream) { - if (err) { return console.log(err) } - var msi = new Interactive() - msi.handle(stream, function () { - msi.select('/ipfs/identify/1.0.0', function (err, ds) { - if (err) { return console.log(err) } - - var ps = self.createProtoStream() - - ps.on('identify', function (msg) { - var peerId = Id.createFromPubKey(msg.publicKey) - - updateSelf(peerSelf, msg.observedAddr) - - swarm.connections[peerId.toB58String()] = { - conn: conn, - socket: socket - } - - self.emit('peer-update', { - peerId: peerId, - listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) - }) - }) - - var mh = getMultiaddr(socket) - - ps.identify({ - protocolVersion: 'na', - agentVersion: 'na', - publicKey: peerSelf.id.pubKey, - listenAddrs: peerSelf.multiaddrs.map(function (mh) {return mh.buffer}), - observedAddr: mh.buffer - }) - - ps.pipe(ds).pipe(ps) - ps.finalize() - }) - }) - }) - }) + ps.pipe(conn).pipe(ps) + } } function getMultiaddr (socket) { diff --git a/src/swarm.js b/src/swarm.js index 873bb8e9..7284738b 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -1,5 +1,6 @@ var multistream = require('multistream-select') var async = require('async') +var identify = require('./identify') exports = module.exports = Swarm @@ -15,7 +16,10 @@ function Swarm (peerInfo) { // peerIdB58: { conn: } self.conns = {} - // peerIdB58: { muxer: } + // peerIdB58: { + // muxer: , + // socket: socket // so we can extract the info we need for identify + // } self.muxedConns = {} // transportName: { transport: transport, @@ -107,7 +111,7 @@ function Swarm (peerInfo) { // check if a stream muxer for this peer is available if (self.muxedConns[peerInfo.id.toB58String()]) { if (protocol) { - return openMuxedStream(self.muxedConns[peerInfo.id.toB58String()]) + return openMuxedStream(self.muxedConns[peerInfo.id.toB58String()].muxer) } else { return callback() } @@ -184,7 +188,10 @@ function Swarm (peerInfo) { muxer.on('stream', userProtocolMuxer) - self.muxedConns[peerInfo.id.toB58String()] = muxer + self.muxedConns[peerInfo.id.toB58String()] = { + muxer: muxer, + socket: conn + } if (protocol) { openMuxedStream(muxer) @@ -245,6 +252,17 @@ function Swarm (peerInfo) { // close everything } + self.enableIdentify = function () { + // set flag to true + // add identify to the list of handled protocols + self.identify = true + + // we pass muxedConns so that identify can access the socket of the other + // peer + self.handleProtocol(identify.protoId, + identify.getHandlerFunction(self.peerInfo, self.muxedConns)) + } + self.handleProtocol = function (protocol, handlerFunction) { self.protocols[protocol] = handlerFunction } @@ -253,7 +271,7 @@ function Swarm (peerInfo) { function listen (conn) { // TODO apply upgrades - // TODO then add StreamMuxer if available (and point streams from muxer to userProtocolMuxer) + // add StreamMuxer if available (and point streams from muxer to userProtocolMuxer) if (self.muxers['spdy']) { var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options) @@ -263,6 +281,21 @@ function Swarm (peerInfo) { } // TODO This muxer has to be identified! + // pass to identify a reference of + // our muxedConn list + // ourselves (peerInfo) + // the conn, which is the socket + // and a stream it can send stuff + if (self.identify) { + muxer.dialStream(function (err, stream) { + if (err) { + return console.log(err) // TODO Treat error + } + // conn === socket at this point + console.log('bimbas') + identify(self.muxedConns, self.peerInfo, conn, stream, muxer) + }) + } muxer.on('stream', userProtocolMuxer) }) diff --git a/tests/swarm-test.js b/tests/swarm-test.js index 6ea7a45b..ddf64824 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -358,8 +358,69 @@ experiment('With a SPDY Stream Muxer', function () { }) } }) + test('identify', function (done) { - done() + var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010') + var p1 = new Peer(Id.create(), []) + var sw1 = new Swarm(p1) + sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready) + sw1.addStreamMuxer('spdy', Spdy, {}) + sw1.enableIdentify() + + var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020') + var p2 = new Peer(Id.create(), []) + var sw2 = new Swarm(p2) + sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready) + sw2.addStreamMuxer('spdy', Spdy, {}) + sw2.enableIdentify() + + sw2.handleProtocol('/sparkles/1.0.0', function (conn) { + // formallity so that the conn starts flowing + conn.on('data', function (chunk) {}) + + conn.end() + conn.on('end', function () { + expect(Object.keys(sw1.muxedConns).length).to.equal(1) + + var cleaningCounter = 0 + sw1.closeConns(cleaningUp) + sw2.closeConns(cleaningUp) + + sw1.closeListener('tcp', cleaningUp) + sw2.closeListener('tcp', cleaningUp) + + function cleaningUp () { + cleaningCounter++ + // TODO FIX: here should be 4, but because super wrapping of + // streams, it makes it so hard to properly close the muxed + // streams - https://github.com/indutny/spdy-transport/issues/14 + if (cleaningCounter < 3) { + return + } + // give time for identify to finish + setTimeout(function () { + expect(Object.keys(sw2.muxedConns).length).to.equal(1) + done() + }, 500) + } + }) + }) + + var count = 0 + + function ready () { + count++ + if (count < 2) { + return + } + + sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) { + conn.on('data', function () {}) + expect(err).to.equal(null) + expect(Object.keys(sw1.conns).length).to.equal(0) + conn.end() + }) + } }) }) })