add identify

This commit is contained in:
David Dias 2015-09-23 19:14:29 +01:00
parent 0bcbe63005
commit 2000827273
3 changed files with 181 additions and 69 deletions

View File

@ -4,93 +4,111 @@
*/ */
var Interactive = require('multistream-select').Interactive var Interactive = require('multistream-select').Interactive
var EventEmmiter = require('events').EventEmitter
var util = require('util')
var protobufs = require('protocol-buffers-stream') var protobufs = require('protocol-buffers-stream')
var fs = require('fs') var fs = require('fs')
var schema = fs.readFileSync(__dirname + '/identify.proto') var schema = fs.readFileSync(__dirname + '/identify.proto')
var v6 = require('ip-address').v6 var v6 = require('ip-address').v6
var Id = require('ipfs-peer-id') var Id = require('peer-id')
var multiaddr = require('multiaddr') var multiaddr = require('multiaddr')
exports = module.exports = Identify exports = module.exports = identify
util.inherits(Identify, EventEmmiter) var protoId = '/ipfs/identify/1.0.0'
function Identify (swarm, peerSelf) { exports.protoId = protoId
var self = this var createProtoStream = protobufs(schema)
self.createProtoStream = protobufs(schema)
swarm.registerHandler('/ipfs/identify/1.0.0', function (stream) { function identify (muxedConns, peerInfoSelf, socket, conn, muxer) {
var ps = self.createProtoStream() var msi = new Interactive()
msi.handle(conn, function () {
msi.select(protoId, function (err, ds) {
if (err) {
return console.log(err) // TODO Treat error
}
ps.on('identify', function (msg) { var ps = createProtoStream()
updateSelf(peerSelf, msg.observedAddr)
var peerId = Id.createFromPubKey(msg.publicKey) ps.on('identify', function (msg) {
var peerId = Id.createFromPubKey(msg.publicKey)
updateSelf(peerInfoSelf, msg.observedAddr)
muxedConns[peerId.toB58String()] = {
muxer: muxer,
socket: socket
}
console.log('do I get back')
// TODO: Pass the new discovered info about the peer that contacted us
// to something like the Kademlia Router, so the peerInfo for this peer
// is fresh
// - before this was exectued through a event emitter
// self.emit('peer-update', {
// peerId: peerId,
// listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)})
// })
})
var socket = swarm.connections[peerId.toB58String()].socket
var mh = getMultiaddr(socket) var mh = getMultiaddr(socket)
ps.identify({ ps.identify({
protocolVersion: 'na', protocolVersion: 'na',
agentVersion: 'na', agentVersion: 'na',
publicKey: peerSelf.id.pubKey, publicKey: peerInfoSelf.id.pubKey,
listenAddrs: peerSelf.multiaddrs.map(function (mh) {return mh.buffer}), listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) {
return mh.buffer
}),
observedAddr: mh.buffer observedAddr: mh.buffer
}) })
self.emit('peer-update', { ps.pipe(ds).pipe(ps)
peerId: peerId, ps.finalize()
listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)}) })
})
}
exports.getHandlerFunction = function (peerInfoSelf, muxedConns) {
return function (conn) {
// wait for the other peer to identify itself
// update our multiaddr with observed addr list
// then get the socket from our list of muxedConns and send the reply back
var ps = createProtoStream()
ps.on('identify', function (msg) {
updateSelf(peerInfoSelf, msg.observedAddr)
var peerId = Id.createFromPubKey(msg.publicKey)
var socket = muxedConns[peerId.toB58String()].socket
var mh = getMultiaddr(socket)
ps.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: peerInfoSelf.id.pubKey,
listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) {
return mh.buffer
}),
observedAddr: mh.buffer
}) })
// TODO: Pass the new discovered info about the peer that contacted us
// to something like the Kademlia Router, so the peerInfo for this peer
// is fresh
// - before this was exectued through a event emitter
// self.emit('peer-update', {
// peerId: peerId,
// listenAddrs: msg.listenAddrs.map(function (mhb) {
// return multiaddr(mhb)
// })
// })
ps.finalize() ps.finalize()
}) })
ps.pipe(stream).pipe(ps) ps.pipe(conn).pipe(ps)
}) }
swarm.on('connection-unknown', function (conn, socket) {
conn.dialStream(function (err, stream) {
if (err) { return console.log(err) }
var msi = new Interactive()
msi.handle(stream, function () {
msi.select('/ipfs/identify/1.0.0', function (err, ds) {
if (err) { return console.log(err) }
var ps = self.createProtoStream()
ps.on('identify', function (msg) {
var peerId = Id.createFromPubKey(msg.publicKey)
updateSelf(peerSelf, msg.observedAddr)
swarm.connections[peerId.toB58String()] = {
conn: conn,
socket: socket
}
self.emit('peer-update', {
peerId: peerId,
listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)})
})
})
var mh = getMultiaddr(socket)
ps.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: peerSelf.id.pubKey,
listenAddrs: peerSelf.multiaddrs.map(function (mh) {return mh.buffer}),
observedAddr: mh.buffer
})
ps.pipe(ds).pipe(ps)
ps.finalize()
})
})
})
})
} }
function getMultiaddr (socket) { function getMultiaddr (socket) {

View File

@ -1,5 +1,6 @@
var multistream = require('multistream-select') var multistream = require('multistream-select')
var async = require('async') var async = require('async')
var identify = require('./identify')
exports = module.exports = Swarm exports = module.exports = Swarm
@ -15,7 +16,10 @@ function Swarm (peerInfo) {
// peerIdB58: { conn: <conn> } // peerIdB58: { conn: <conn> }
self.conns = {} self.conns = {}
// peerIdB58: { muxer: <muxer> } // peerIdB58: {
// muxer: <muxer>,
// socket: socket // so we can extract the info we need for identify
// }
self.muxedConns = {} self.muxedConns = {}
// transportName: { transport: transport, // transportName: { transport: transport,
@ -107,7 +111,7 @@ function Swarm (peerInfo) {
// check if a stream muxer for this peer is available // check if a stream muxer for this peer is available
if (self.muxedConns[peerInfo.id.toB58String()]) { if (self.muxedConns[peerInfo.id.toB58String()]) {
if (protocol) { if (protocol) {
return openMuxedStream(self.muxedConns[peerInfo.id.toB58String()]) return openMuxedStream(self.muxedConns[peerInfo.id.toB58String()].muxer)
} else { } else {
return callback() return callback()
} }
@ -184,7 +188,10 @@ function Swarm (peerInfo) {
muxer.on('stream', userProtocolMuxer) muxer.on('stream', userProtocolMuxer)
self.muxedConns[peerInfo.id.toB58String()] = muxer self.muxedConns[peerInfo.id.toB58String()] = {
muxer: muxer,
socket: conn
}
if (protocol) { if (protocol) {
openMuxedStream(muxer) openMuxedStream(muxer)
@ -245,6 +252,17 @@ function Swarm (peerInfo) {
// close everything // close everything
} }
self.enableIdentify = function () {
// set flag to true
// add identify to the list of handled protocols
self.identify = true
// we pass muxedConns so that identify can access the socket of the other
// peer
self.handleProtocol(identify.protoId,
identify.getHandlerFunction(self.peerInfo, self.muxedConns))
}
self.handleProtocol = function (protocol, handlerFunction) { self.handleProtocol = function (protocol, handlerFunction) {
self.protocols[protocol] = handlerFunction self.protocols[protocol] = handlerFunction
} }
@ -253,7 +271,7 @@ function Swarm (peerInfo) {
function listen (conn) { function listen (conn) {
// TODO apply upgrades // TODO apply upgrades
// TODO then add StreamMuxer if available (and point streams from muxer to userProtocolMuxer) // add StreamMuxer if available (and point streams from muxer to userProtocolMuxer)
if (self.muxers['spdy']) { if (self.muxers['spdy']) {
var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options) var spdy = new self.muxers['spdy'].Muxer(self.muxers['spdy'].options)
@ -263,6 +281,21 @@ function Swarm (peerInfo) {
} }
// TODO This muxer has to be identified! // TODO This muxer has to be identified!
// pass to identify a reference of
// our muxedConn list
// ourselves (peerInfo)
// the conn, which is the socket
// and a stream it can send stuff
if (self.identify) {
muxer.dialStream(function (err, stream) {
if (err) {
return console.log(err) // TODO Treat error
}
// conn === socket at this point
console.log('bimbas')
identify(self.muxedConns, self.peerInfo, conn, stream, muxer)
})
}
muxer.on('stream', userProtocolMuxer) muxer.on('stream', userProtocolMuxer)
}) })

View File

@ -358,8 +358,69 @@ experiment('With a SPDY Stream Muxer', function () {
}) })
} }
}) })
test('identify', function (done) { test('identify', function (done) {
done() var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010')
var p1 = new Peer(Id.create(), [])
var sw1 = new Swarm(p1)
sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready)
sw1.addStreamMuxer('spdy', Spdy, {})
sw1.enableIdentify()
var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020')
var p2 = new Peer(Id.create(), [])
var sw2 = new Swarm(p2)
sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready)
sw2.addStreamMuxer('spdy', Spdy, {})
sw2.enableIdentify()
sw2.handleProtocol('/sparkles/1.0.0', function (conn) {
// formallity so that the conn starts flowing
conn.on('data', function (chunk) {})
conn.end()
conn.on('end', function () {
expect(Object.keys(sw1.muxedConns).length).to.equal(1)
var cleaningCounter = 0
sw1.closeConns(cleaningUp)
sw2.closeConns(cleaningUp)
sw1.closeListener('tcp', cleaningUp)
sw2.closeListener('tcp', cleaningUp)
function cleaningUp () {
cleaningCounter++
// TODO FIX: here should be 4, but because super wrapping of
// streams, it makes it so hard to properly close the muxed
// streams - https://github.com/indutny/spdy-transport/issues/14
if (cleaningCounter < 3) {
return
}
// give time for identify to finish
setTimeout(function () {
expect(Object.keys(sw2.muxedConns).length).to.equal(1)
done()
}, 500)
}
})
})
var count = 0
function ready () {
count++
if (count < 2) {
return
}
sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) {
conn.on('data', function () {})
expect(err).to.equal(null)
expect(Object.keys(sw1.conns).length).to.equal(0)
conn.end()
})
}
}) })
}) })
}) })