Identify working with protobufs and observed addrs

This commit is contained in:
David Dias
2015-07-17 12:05:02 -07:00
parent 6f4011a3cf
commit 5a4d9ee4ed
7 changed files with 90 additions and 126 deletions

View File

@ -34,9 +34,11 @@
}, },
"dependencies": { "dependencies": {
"async": "^1.3.0", "async": "^1.3.0",
"ip-address": "^4.0.0",
"multiaddr": "^1.0.0", "multiaddr": "^1.0.0",
"multiplex-stream-muxer": "^0.2.0", "multiplex-stream-muxer": "^0.2.0",
"multistream-select": "^0.6.1", "multistream-select": "^0.6.1",
"protocol-buffers-stream": "^1.2.0",
"spdy-stream-muxer": "^0.2.0" "spdy-stream-muxer": "^0.2.0"
} }
} }

View File

@ -1,89 +0,0 @@
/*
* Identify is one of the protocols swarms speaks in order to broadcast and learn
* about the ip:port pairs a specific peer is available through
*/
var Interactive = require('multistream-select').Interactive
var EventEmmiter = require('events').EventEmitter
var util = require('util')
exports = module.exports = Identify
util.inherits(Identify, EventEmmiter)
function Identify (swarm, peerSelf) {
var self = this
swarm.registerHandler('/ipfs/identify/1.0.0', function (stream) {
var identifyMsg = {}
identifyMsg = {}
identifyMsg.sender = exportPeer(peerSelf)
// TODO (daviddias) populate with the way I see the other peer
// identifyMsg.receiver =
stream.write(JSON.stringify(identifyMsg))
var answer = ''
stream.on('data', function (chunk) {
answer += chunk.toString()
})
stream.on('end', function () {
self.emit('peer-update', answer)
})
stream.end()
// receive their info and how they see us
// send back our stuff
})
swarm.on('connection-unknown', function (conn) {
conn.dialStream(function (err, stream) {
if (err) {
return console.log(err)
}
var msi = new Interactive()
msi.handle(stream, function () {
msi.select('/ipfs/identify/1.0.0', function (err, ds) {
if (err) { return console.log(err) }
var identifyMsg = {}
identifyMsg = {}
identifyMsg.sender = exportPeer(peerSelf)
// TODO (daviddias) populate with the way I see the other peer
stream.write(JSON.stringify(identifyMsg))
var answer = ''
stream.on('data', function (chunk) {
answer = answer + chunk.toString()
})
stream.on('end', function () {
answer = JSON.parse(answer)
swarm.connections[answer.sender.id] = conn
self.emit('peer-update', answer)
})
stream.end()
})
})
})
// open a spdy stream
// do the multistream handshake
// send them our data
})
function exportPeer (peer) {
return {
id: peer.id.toB58String(),
multiaddrs: peer.multiaddrs.map(function (mh) {
return mh.toString()
})
}
}
}

View File

@ -18,7 +18,7 @@ message Identify {
// oservedAddr is the multiaddr of the remote endpoint that the sender node perceives // oservedAddr is the multiaddr of the remote endpoint that the sender node perceives
// this is useful information to convey to the other side, as it helps the remote endpoint // this is useful information to convey to the other side, as it helps the remote endpoint
// determine whether its connection to the local peer goes through NAT. // determine whether its connection to the local peer goes through NAT.
// optional bytes observedAddr = 4; optional bytes observedAddr = 4;
// (DEPRECATED) protocols are the services this node is running // (DEPRECATED) protocols are the services this node is running
// repeated string protocols = 3; // repeated string protocols = 3;

View File

@ -9,6 +9,9 @@ var util = require('util')
var protobufs = require('protocol-buffers-stream') var protobufs = require('protocol-buffers-stream')
var fs = require('fs') var fs = require('fs')
var schema = fs.readFileSync(__dirname + '/identify.proto') var schema = fs.readFileSync(__dirname + '/identify.proto')
var v6 = require('ip-address').v6
var Id = require('ipfs-peer-id')
var multiaddr = require('multiaddr')
exports = module.exports = Identify exports = module.exports = Identify
@ -22,28 +25,32 @@ function Identify (swarm, peerSelf) {
var ps = self.createProtoStream() var ps = self.createProtoStream()
ps.on('identify', function (msg) { ps.on('identify', function (msg) {
console.log('RECEIVED PROTOBUF - ', msg) // console.log('RECEIVED PROTOBUF - ', msg)
// 1. wrap the msg updateSelf(peerSelf, msg.observedAddr)
// 2. create a Peer obj using the publick key to derive the ID
// 3. populate it with observedAddr
// 4. maybe emit 2 peers update to update the other peer and also ourselfs?
self.emit('peer-update', {})
})
ps.identify({ var peerId = Id.createFromPubKey(msg.publicKey)
protocolVersion: 'na',
agentVersion: 'na',
publicKey: peerSelf.id.pubKey,
listenAddrs: peerSelf.multiaddrs
// observedAddr: new Buffer()
})
var socket = swarm.connections[peerId.toB58String()].socket
var mh = getMultiaddr(socket)
ps.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: peerSelf.id.pubKey,
listenAddrs: peerSelf.multiaddrs.map(function (mh) {return mh.buffer}),
observedAddr: mh.buffer
})
self.emit('peer-update', {
peerId: peerId,
listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)})
})
ps.finalize()
})
ps.pipe(stream).pipe(ps) ps.pipe(stream).pipe(ps)
// TODO(daviddias) ps.end() based on https://github.com/mafintosh/protocol-buffers-stream/issues/1
}) })
swarm.on('connection-unknown', function (conn) { swarm.on('connection-unknown', function (conn, socket) {
conn.dialStream(function (err, stream) { conn.dialStream(function (err, stream) {
if (err) { return console.log(err) } if (err) { return console.log(err) }
var msi = new Interactive() var msi = new Interactive()
@ -54,28 +61,67 @@ function Identify (swarm, peerSelf) {
var ps = self.createProtoStream() var ps = self.createProtoStream()
ps.on('identify', function (msg) { ps.on('identify', function (msg) {
console.log('RECEIVED PROTOBUF - ', msg) // console.log('RECEIVED PROTOBUF - SIDE ZZ ', msg)
// 1. wrap the msg var peerId = Id.createFromPubKey(msg.publicKey)
// 2. create a Peer obj using the publick key to derive the ID
// 3. populate it with observedAddr updateSelf(peerSelf, msg.observedAddr)
// 4. maybe emit 2 peers update to update the other peer and also ourselfs?
// 5. add the conn to connections list -> swarm.connections[otherPeerId] = conn swarm.connections[peerId.toB58String()] = {
self.emit('peer-update', {}) conn: conn,
socket: socket
}
self.emit('peer-update', {
peerId: peerId,
listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)})
})
}) })
var mh = getMultiaddr(socket)
ps.identify({ ps.identify({
protocolVersion: 'na', protocolVersion: 'na',
agentVersion: 'na', agentVersion: 'na',
publicKey: peerSelf.id.pubKey, publicKey: peerSelf.id.pubKey,
listenAddrs: peerSelf.multiaddrs listenAddrs: peerSelf.multiaddrs.map(function (mh) {return mh.buffer}),
// observedAddr: new Buffer() observedAddr: mh.buffer
}) })
ps.pipe(ds).pipe(ps) ps.pipe(ds).pipe(ps)
ps.finalize()
// TODO(daviddias) ps.end() based on https://github.com/mafintosh/protocol-buffers-stream/issues/1
}) })
}) })
}) })
}) })
} }
function getMultiaddr (socket) {
var mh
if (~socket.remoteAddress.indexOf(':')) {
var addr = new v6.Address(socket.remoteAddress)
if (addr.v4) {
var ip4 = socket.remoteAddress.split(':')[3]
mh = multiaddr('/ip4/' + ip4 + '/tcp/' + socket.remotePort)
} else {
mh = multiaddr('/ip6/' + socket.remoteAddress + '/tcp/' + socket.remotePort)
}
} else {
mh = multiaddr('/ip4/' + socket.remoteAddress + '/tcp/' + socket.remotePort)
}
return mh
}
function updateSelf (peerSelf, observedAddr) {
var omh = multiaddr(observedAddr)
var isIn = false
peerSelf.multiaddrs.forEach(function (mh) {
if (mh.toString() === omh.toString()) {
isIn = true
}
})
if (!isIn) {
peerSelf.multiaddrs.push(omh)
}
}

View File

@ -21,4 +21,4 @@ ps.identify({
ps.pipe(ps) ps.pipe(ps)
// ps.end() ps.end()

View File

@ -19,7 +19,7 @@ function Swarm () {
} }
self.port = parseInt(process.env.IPFS_SWARM_PORT, 10) || 4001 self.port = parseInt(process.env.IPFS_SWARM_PORT, 10) || 4001
self.connections = {} self.connections = {} // {conn: <>, socket: <>}
self.handles = [] self.handles = []
// set the listener // set the listener
@ -49,7 +49,7 @@ function Swarm () {
errorUp(self, conn) errorUp(self, conn)
// FOR IDENTIFY // FOR IDENTIFY
self.emit('connection-unknown', conn) self.emit('connection-unknown', conn, socket)
// IDENTIFY DOES THIS FOR US // IDENTIFY DOES THIS FOR US
// conn.on('close', function () { delete self.connections[conn.peerId] }) // conn.on('close', function () { delete self.connections[conn.peerId] })
@ -100,7 +100,10 @@ function Swarm () {
var conn = new Muxer().attach(ds, false) var conn = new Muxer().attach(ds, false)
conn.on('stream', registerHandles) conn.on('stream', registerHandles)
self.connections[peer.id.toB58String()] = conn self.connections[peer.id.toB58String()] = {
conn: conn,
socket: socket
}
conn.on('close', function () { delete self.connections[peer.id.toB58String()] }) conn.on('close', function () { delete self.connections[peer.id.toB58String()] })
errorUp(self, conn) errorUp(self, conn)
@ -111,7 +114,7 @@ function Swarm () {
function createStream (peer, protocol, cb) { function createStream (peer, protocol, cb) {
// spawn new muxed stream // spawn new muxed stream
var conn = self.connections[peer.id.toB58String()] var conn = self.connections[peer.id.toB58String()].conn
conn.dialStream(function (err, stream) { conn.dialStream(function (err, stream) {
if (err) { return cb(err) } if (err) { return cb(err) }
errorUp(self, stream) errorUp(self, stream)
@ -143,7 +146,7 @@ function Swarm () {
keys.map(function (key) { keys.map(function (key) {
c.hit() c.hit()
self.connections[key].end() self.connections[key].conn.end()
}) })
} }

View File

@ -103,6 +103,8 @@ experiment('IDENTIFY', function () {
}) })
test('Attach Identify, open a stream, reuse stream', function (done) { test('Attach Identify, open a stream, reuse stream', function (done) {
console.log('\n\n\n')
var protocol = '/sparkles/3.3.3' var protocol = '/sparkles/3.3.3'
var identifyA = new Identify(swarmA, peerA) var identifyA = new Identify(swarmA, peerA)
@ -111,13 +113,13 @@ experiment('IDENTIFY', function () {
swarmA.registerHandler(protocol, function (stream) {}) swarmA.registerHandler(protocol, function (stream) {})
swarmB.registerHandler(protocol, function (stream) {}) swarmB.registerHandler(protocol, function (stream) {})
swarmA.openStream(peerB, protocol, function theOTHER (err, stream) { swarmA.openStream(peerB, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error) expect(err).to.not.be.instanceof(Error)
}) })
identifyB.on('peer-update', function (answer) { identifyB.on('peer-update', function (answer) {
expect(Object.keys(swarmB.connections).length).to.equal(1) expect(Object.keys(swarmB.connections).length).to.equal(1)
swarmB.openStream(peerA, protocol, function theCALLBACK (err, stream) { swarmB.openStream(peerA, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error) expect(err).to.not.be.instanceof(Error)
expect(Object.keys(swarmB.connections).length).to.equal(1) expect(Object.keys(swarmB.connections).length).to.equal(1)
done() done()