From 5a4d9ee4ede900b0b10d41945a83815cba1a9193 Mon Sep 17 00:00:00 2001 From: David Dias Date: Fri, 17 Jul 2015 12:05:02 -0700 Subject: [PATCH] Identify working with protobufs and observed addrs --- package.json | 2 + src/identify.js | 89 ------------------------------- src/identify/identify.proto | 2 +- src/identify/index.js | 102 ++++++++++++++++++++++++++---------- src/identify/proto-test.js | 2 +- src/swarm.js | 13 +++-- tests/swarm-test.js | 6 ++- 7 files changed, 90 insertions(+), 126 deletions(-) delete mode 100644 src/identify.js diff --git a/package.json b/package.json index 461f8f47..f7b879aa 100644 --- a/package.json +++ b/package.json @@ -34,9 +34,11 @@ }, "dependencies": { "async": "^1.3.0", + "ip-address": "^4.0.0", "multiaddr": "^1.0.0", "multiplex-stream-muxer": "^0.2.0", "multistream-select": "^0.6.1", + "protocol-buffers-stream": "^1.2.0", "spdy-stream-muxer": "^0.2.0" } } diff --git a/src/identify.js b/src/identify.js deleted file mode 100644 index 5746fa95..00000000 --- a/src/identify.js +++ /dev/null @@ -1,89 +0,0 @@ -/* - * Identify is one of the protocols swarms speaks in order to broadcast and learn - * about the ip:port pairs a specific peer is available through - */ - -var Interactive = require('multistream-select').Interactive -var EventEmmiter = require('events').EventEmitter -var util = require('util') - -exports = module.exports = Identify - -util.inherits(Identify, EventEmmiter) - -function Identify (swarm, peerSelf) { - var self = this - - swarm.registerHandler('/ipfs/identify/1.0.0', function (stream) { - var identifyMsg = {} - identifyMsg = {} - identifyMsg.sender = exportPeer(peerSelf) - // TODO (daviddias) populate with the way I see the other peer - // identifyMsg.receiver = - - stream.write(JSON.stringify(identifyMsg)) - - var answer = '' - - stream.on('data', function (chunk) { - answer += chunk.toString() - }) - - stream.on('end', function () { - self.emit('peer-update', answer) - }) - - stream.end() - - // receive their info and how they see us - // send back our stuff - }) - - swarm.on('connection-unknown', function (conn) { - conn.dialStream(function (err, stream) { - if (err) { - return console.log(err) - } - var msi = new Interactive() - msi.handle(stream, function () { - msi.select('/ipfs/identify/1.0.0', function (err, ds) { - if (err) { return console.log(err) } - var identifyMsg = {} - identifyMsg = {} - identifyMsg.sender = exportPeer(peerSelf) - // TODO (daviddias) populate with the way I see the other peer - - stream.write(JSON.stringify(identifyMsg)) - - var answer = '' - - stream.on('data', function (chunk) { - answer = answer + chunk.toString() - }) - - stream.on('end', function () { - answer = JSON.parse(answer) - - swarm.connections[answer.sender.id] = conn - - self.emit('peer-update', answer) - }) - - stream.end() - }) - }) - }) - // open a spdy stream - // do the multistream handshake - // send them our data - }) - - function exportPeer (peer) { - return { - id: peer.id.toB58String(), - multiaddrs: peer.multiaddrs.map(function (mh) { - return mh.toString() - }) - } - } -} diff --git a/src/identify/identify.proto b/src/identify/identify.proto index 280bc401..e4845aaf 100644 --- a/src/identify/identify.proto +++ b/src/identify/identify.proto @@ -18,7 +18,7 @@ message Identify { // oservedAddr is the multiaddr of the remote endpoint that the sender node perceives // this is useful information to convey to the other side, as it helps the remote endpoint // determine whether its connection to the local peer goes through NAT. - // optional bytes observedAddr = 4; + optional bytes observedAddr = 4; // (DEPRECATED) protocols are the services this node is running // repeated string protocols = 3; diff --git a/src/identify/index.js b/src/identify/index.js index 606c2216..4fbca87a 100644 --- a/src/identify/index.js +++ b/src/identify/index.js @@ -9,6 +9,9 @@ var util = require('util') var protobufs = require('protocol-buffers-stream') var fs = require('fs') var schema = fs.readFileSync(__dirname + '/identify.proto') +var v6 = require('ip-address').v6 +var Id = require('ipfs-peer-id') +var multiaddr = require('multiaddr') exports = module.exports = Identify @@ -22,28 +25,32 @@ function Identify (swarm, peerSelf) { var ps = self.createProtoStream() ps.on('identify', function (msg) { - console.log('RECEIVED PROTOBUF - ', msg) - // 1. wrap the msg - // 2. create a Peer obj using the publick key to derive the ID - // 3. populate it with observedAddr - // 4. maybe emit 2 peers update to update the other peer and also ourselfs? - self.emit('peer-update', {}) - }) + // console.log('RECEIVED PROTOBUF - ', msg) + updateSelf(peerSelf, msg.observedAddr) - ps.identify({ - protocolVersion: 'na', - agentVersion: 'na', - publicKey: peerSelf.id.pubKey, - listenAddrs: peerSelf.multiaddrs - // observedAddr: new Buffer() - }) + var peerId = Id.createFromPubKey(msg.publicKey) + var socket = swarm.connections[peerId.toB58String()].socket + var mh = getMultiaddr(socket) + ps.identify({ + protocolVersion: 'na', + agentVersion: 'na', + publicKey: peerSelf.id.pubKey, + listenAddrs: peerSelf.multiaddrs.map(function (mh) {return mh.buffer}), + observedAddr: mh.buffer + }) + + self.emit('peer-update', { + peerId: peerId, + listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) + }) + + ps.finalize() + }) ps.pipe(stream).pipe(ps) - - // TODO(daviddias) ps.end() based on https://github.com/mafintosh/protocol-buffers-stream/issues/1 }) - swarm.on('connection-unknown', function (conn) { + swarm.on('connection-unknown', function (conn, socket) { conn.dialStream(function (err, stream) { if (err) { return console.log(err) } var msi = new Interactive() @@ -54,28 +61,67 @@ function Identify (swarm, peerSelf) { var ps = self.createProtoStream() ps.on('identify', function (msg) { - console.log('RECEIVED PROTOBUF - ', msg) - // 1. wrap the msg - // 2. create a Peer obj using the publick key to derive the ID - // 3. populate it with observedAddr - // 4. maybe emit 2 peers update to update the other peer and also ourselfs? - // 5. add the conn to connections list -> swarm.connections[otherPeerId] = conn - self.emit('peer-update', {}) + // console.log('RECEIVED PROTOBUF - SIDE ZZ ', msg) + var peerId = Id.createFromPubKey(msg.publicKey) + + updateSelf(peerSelf, msg.observedAddr) + + swarm.connections[peerId.toB58String()] = { + conn: conn, + socket: socket + } + + self.emit('peer-update', { + peerId: peerId, + listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) + }) }) + var mh = getMultiaddr(socket) + ps.identify({ protocolVersion: 'na', agentVersion: 'na', publicKey: peerSelf.id.pubKey, - listenAddrs: peerSelf.multiaddrs - // observedAddr: new Buffer() + listenAddrs: peerSelf.multiaddrs.map(function (mh) {return mh.buffer}), + observedAddr: mh.buffer }) ps.pipe(ds).pipe(ps) - - // TODO(daviddias) ps.end() based on https://github.com/mafintosh/protocol-buffers-stream/issues/1 + ps.finalize() }) }) }) }) } + +function getMultiaddr (socket) { + var mh + if (~socket.remoteAddress.indexOf(':')) { + var addr = new v6.Address(socket.remoteAddress) + if (addr.v4) { + var ip4 = socket.remoteAddress.split(':')[3] + mh = multiaddr('/ip4/' + ip4 + '/tcp/' + socket.remotePort) + } else { + mh = multiaddr('/ip6/' + socket.remoteAddress + '/tcp/' + socket.remotePort) + } + } else { + mh = multiaddr('/ip4/' + socket.remoteAddress + '/tcp/' + socket.remotePort) + } + return mh +} + +function updateSelf (peerSelf, observedAddr) { + var omh = multiaddr(observedAddr) + var isIn = false + peerSelf.multiaddrs.forEach(function (mh) { + if (mh.toString() === omh.toString()) { + isIn = true + } + }) + + if (!isIn) { + peerSelf.multiaddrs.push(omh) + } +} + diff --git a/src/identify/proto-test.js b/src/identify/proto-test.js index 2fd4d88a..f3c4b2c6 100644 --- a/src/identify/proto-test.js +++ b/src/identify/proto-test.js @@ -21,4 +21,4 @@ ps.identify({ ps.pipe(ps) -// ps.end() +ps.end() diff --git a/src/swarm.js b/src/swarm.js index bf5ae5ce..af686386 100644 --- a/src/swarm.js +++ b/src/swarm.js @@ -19,7 +19,7 @@ function Swarm () { } self.port = parseInt(process.env.IPFS_SWARM_PORT, 10) || 4001 - self.connections = {} + self.connections = {} // {conn: <>, socket: <>} self.handles = [] // set the listener @@ -49,7 +49,7 @@ function Swarm () { errorUp(self, conn) // FOR IDENTIFY - self.emit('connection-unknown', conn) + self.emit('connection-unknown', conn, socket) // IDENTIFY DOES THIS FOR US // conn.on('close', function () { delete self.connections[conn.peerId] }) @@ -100,7 +100,10 @@ function Swarm () { var conn = new Muxer().attach(ds, false) conn.on('stream', registerHandles) - self.connections[peer.id.toB58String()] = conn + self.connections[peer.id.toB58String()] = { + conn: conn, + socket: socket + } conn.on('close', function () { delete self.connections[peer.id.toB58String()] }) errorUp(self, conn) @@ -111,7 +114,7 @@ function Swarm () { function createStream (peer, protocol, cb) { // spawn new muxed stream - var conn = self.connections[peer.id.toB58String()] + var conn = self.connections[peer.id.toB58String()].conn conn.dialStream(function (err, stream) { if (err) { return cb(err) } errorUp(self, stream) @@ -143,7 +146,7 @@ function Swarm () { keys.map(function (key) { c.hit() - self.connections[key].end() + self.connections[key].conn.end() }) } diff --git a/tests/swarm-test.js b/tests/swarm-test.js index b696f94a..f3f3d489 100644 --- a/tests/swarm-test.js +++ b/tests/swarm-test.js @@ -103,6 +103,8 @@ experiment('IDENTIFY', function () { }) test('Attach Identify, open a stream, reuse stream', function (done) { + console.log('\n\n\n') + var protocol = '/sparkles/3.3.3' var identifyA = new Identify(swarmA, peerA) @@ -111,13 +113,13 @@ experiment('IDENTIFY', function () { swarmA.registerHandler(protocol, function (stream) {}) swarmB.registerHandler(protocol, function (stream) {}) - swarmA.openStream(peerB, protocol, function theOTHER (err, stream) { + swarmA.openStream(peerB, protocol, function (err, stream) { expect(err).to.not.be.instanceof(Error) }) identifyB.on('peer-update', function (answer) { expect(Object.keys(swarmB.connections).length).to.equal(1) - swarmB.openStream(peerA, protocol, function theCALLBACK (err, stream) { + swarmB.openStream(peerA, protocol, function (err, stream) { expect(err).to.not.be.instanceof(Error) expect(Object.keys(swarmB.connections).length).to.equal(1) done()