stream multiplexing done, starting on identify refactor

This commit is contained in:
David Dias 2016-03-07 12:47:11 +00:00
parent 9d8ee67c61
commit f8e14e4ddf
5 changed files with 206 additions and 409 deletions

View File

@ -32,7 +32,7 @@
"bl": "^1.1.2", "bl": "^1.1.2",
"chai": "^3.5.0", "chai": "^3.5.0",
"istanbul": "^0.4.2", "istanbul": "^0.4.2",
"libp2p-spdy": "^0.1.0", "libp2p-spdy": "^0.2.3",
"libp2p-tcp": "^0.2.1", "libp2p-tcp": "^0.2.1",
"mocha": "^2.4.5", "mocha": "^2.4.5",
"multiaddr": "^1.1.1", "multiaddr": "^1.1.1",

View File

@ -1,30 +1,47 @@
/* /*
* Identify is one of the protocols swarms speaks in order to broadcast and learn * Identify is one of the protocols swarms speaks in order to
* about the ip:port pairs a specific peer is available through * broadcast and learn about the ip:port pairs a specific peer
* is available through
*/ */
var Interactive = require('multistream-select').Interactive // var multistream = require('multistream-select')
var protobufs = require('protocol-buffers-stream') // var protobufs = require('protocol-buffers-stream')
var fs = require('fs') // var fs = require('fs')
var path = require('path') // var path = require('path')
var schema = fs.readFileSync(path.join(__dirname, 'identify.proto')) // var protobufs = require('protocol-buffers-stream')
var Address6 = require('ip-address').Address6 // var schema = fs.readFileSync(path.join(__dirname, 'identify.proto'))
var Id = require('peer-id') // var Address6 = require('ip-address').Address6
var multiaddr = require('multiaddr') // var Id = require('peer-id')
// var multiaddr = require('multiaddr')
exports = module.exports = identify exports = module.exports
var protoId = '/ipfs/identify/1.0.0' exports.multicodec = '/ipfs/identify/1.0.0'
exports.protoId = protoId exports.exec = (muxedConn, callback) => {
var createProtoStream = protobufs(schema) // TODO
// 1. open a stream
// 2. multistream into identify
// 3. send what I see from this other peer
// 4. receive what the other peer sees from me
// 4. callback with (err, peerInfo)
}
exports.handler = (peerInfo) => {
return function (conn) {
// TODO
// 1. receive incoming observed info about me
// 2. send back what I see from the other
}
}
/*
function identify (muxedConns, peerInfoSelf, socket, conn, muxer) { function identify (muxedConns, peerInfoSelf, socket, conn, muxer) {
var msi = new Interactive() var msi = new Interactive()
msi.handle(conn, function () { msi.handle(conn, function () {
msi.select(protoId, function (err, ds) { msi.select(protoId, function (err, ds) {
if (err) { if (err) {
return console.log(err) // TODO Treat error return console.log(err)
} }
var ps = createProtoStream() var ps = createProtoStream()
@ -39,16 +56,6 @@ function identify (muxedConns, peerInfoSelf, socket, conn, muxer) {
socket: socket socket: socket
} }
// TODO: Pass the new discovered info about the peer that contacted us
// to something like the Kademlia Router, so the peerInfo for this peer
// is fresh
// - before this was exectued through a event emitter
// self.emit('peer-update', {
// peerId: peerId,
// listenAddrs: msg.listenAddrs.map(function (mhb) {return multiaddr(mhb)})
// })
})
var mh = getMultiaddr(socket) var mh = getMultiaddr(socket)
ps.identify({ ps.identify({
@ -156,4 +163,4 @@ function updateSelf (peerSelf, observedAddr) {
peerSelf.multiaddrs.push(omh) peerSelf.multiaddrs.push(omh)
} }
} }
} }*/

View File

@ -1,6 +1,6 @@
const multistream = require('multistream-select') const multistream = require('multistream-select')
// const async = require('async') // const async = require('async')
// const identify = require('./identify') const identify = require('./identify')
const PassThrough = require('stream').PassThrough const PassThrough = require('stream').PassThrough
exports = module.exports = Swarm exports = module.exports = Swarm
@ -130,16 +130,28 @@ function Swarm (peerInfo) {
// { muxerCodec: <muxer> } e.g { '/spdy/0.3.1': spdy } // { muxerCodec: <muxer> } e.g { '/spdy/0.3.1': spdy }
this.muxers = {} this.muxers = {}
this.connection.addStreamMuxer = (muxer) => { this.connection.addStreamMuxer = (muxer) => {
// TODO // for dialing
// .handle(protocol, () => { this.muxers[muxer.multicodec] = muxer
// after attaching the stream muxer, check if identify is enabled
// }) // for listening
// TODO add to the list of muxers available this.handle(muxer.multicodec, (conn) => {
const muxedConn = muxer(conn, true)
muxedConn.on('stream', connHandler)
if (this.identify) {
identify.exec(muxedConn, (err, pi) => {
if (err) {}
// TODO muxedConns[pi.id.toB58String()].muxer = muxedConn
})
}
})
} }
// enable the Identify protocol // enable the Identify protocol
this.identify = false
this.connection.reuse = () => { this.connection.reuse = () => {
// TODO identify this.identify = true
this.handle(identify.multicodec, identify.handler(peerInfo))
} }
const self = this // couldn't get rid of this const self = this // couldn't get rid of this
@ -224,14 +236,40 @@ function Swarm (peerInfo) {
} }
function attemptMuxerUpgrade (conn, cb) { function attemptMuxerUpgrade (conn, cb) {
if (Object.keys(self.muxers).length === 0) { const muxers = Object.keys(self.muxers)
if (muxers.length === 0) {
return cb(new Error('no muxers available')) return cb(new Error('no muxers available'))
} }
// TODO add muxer to the muxedConns object for the peerId
// TODO if it succeeds, add incomming open coons to connHandler // 1. try to handshake in one of the muxers available
// 2. if succeeds
// - add the muxedConn to the list of muxedConns
// - add incomming new streams to connHandler
nextMuxer(muxers.shift())
function nextMuxer (key) {
var msI = new multistream.Interactive()
msI.handle(conn, function () {
msI.select(key, (err, conn) => {
if (err) {
if (muxers.length === 0) {
cb(new Error('could not upgrade to stream muxing'))
} else {
nextMuxer(muxers.shift())
}
}
const muxedConn = self.muxers[key](conn, false)
self.muxedConns[b58Id] = {}
self.muxedConns[b58Id].muxer = muxedConn
cb(null, muxedConn)
})
})
}
} }
function openConnInMuxedConn (muxer, cb) { function openConnInMuxedConn (muxer, cb) {
// TODO open a conn in this muxer cb(muxer.newStream())
} }
function protocolHandshake (conn, protocol, cb) { function protocolHandshake (conn, protocol, cb) {
@ -255,6 +293,10 @@ function Swarm (peerInfo) {
this.close = (callback) => { this.close = (callback) => {
var count = 0 var count = 0
Object.keys(this.muxedConns).forEach((key) => {
this.muxedConns[key].muxer.end()
})
Object.keys(this.transports).forEach((key) => { Object.keys(this.transports).forEach((key) => {
this.transports[key].close(() => { this.transports[key].close(() => {
if (++count === Object.keys(this.transports).length) { if (++count === Object.keys(this.transports).length) {

View File

@ -1,354 +0,0 @@
/* eslint-env mocha */
var async = require('async')
var expect = require('chai').expect
var multiaddr = require('multiaddr')
var Id = require('peer-id')
var Peer = require('peer-info')
var Swarm = require('../src')
var tcp = require('libp2p-tcp')
var Spdy = require('libp2p-spdy')
// because of Travis-CI
process.on('uncaughtException', function (err) {
console.log('Caught exception: ' + err)
})
describe('Basics', function () {
it('enforces creation with new', function (done) {
expect(function () {
Swarm()
}).to.throw()
done()
})
it('it throws an exception without peerSelf', function (done) {
expect(function () {
var sw = new Swarm()
sw.close()
}).to.throw(Error)
done()
})
})
describe('When dialing', function () {
describe('if the swarm does add any of the peer transports', function () {
it('it returns an error', function (done) {
var peerOne = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/8090')])
var peerTwo = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/8091')])
var swarm = new Swarm(peerOne)
swarm.dial(peerTwo, {}, function (err) {
expect(err).to.exist
done()
})
})
})
})
describe('Without a Stream Muxer', function () {
describe('and one swarm over tcp', function () {
it('add the transport', function (done) {
var mh = multiaddr('/ip4/127.0.0.1/tcp/8010')
var p = new Peer(Id.create(), [])
var sw = new Swarm(p)
sw.addTransport('tcp', tcp, { multiaddr: mh }, {}, {port: 8010}, ready)
function ready () {
expect(sw.transports['tcp'].options).to.deep.equal({})
expect(sw.transports['tcp'].dialOptions).to.deep.equal({})
expect(sw.transports['tcp'].listenOptions).to.deep.equal({port: 8010})
expect(sw.transports['tcp'].transport).to.deep.equal(tcp)
sw.close(done)
}
})
})
describe('and two swarms over tcp', function () {
var mh1, p1, sw1, mh2, p2, sw2
beforeEach(function (done) {
mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010')
p1 = new Peer(Id.create(), [])
sw1 = new Swarm(p1)
mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020')
p2 = new Peer(Id.create(), [])
sw2 = new Swarm(p2)
async.parallel([
function (cb) {
sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb)
},
function (cb) {
sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb)
}
], done)
})
afterEach(function (done) {
async.parallel([sw1.close, sw2.close], done)
})
it('dial a conn', function (done) {
sw1.dial(p2, {}, function (err) {
expect(err).to.equal(undefined)
expect(Object.keys(sw1.conns).length).to.equal(1)
done()
})
})
it('dial a conn on a protocol', function (done) {
sw2.handleProtocol('/sparkles/1.0.0', function (conn) {
conn.end()
conn.on('end', done)
})
sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) {
expect(err).to.equal(null)
expect(Object.keys(sw1.conns).length).to.equal(0)
conn.end()
})
})
it('dial a protocol on a previous created conn', function (done) {
sw2.handleProtocol('/sparkles/1.0.0', function (conn) {
conn.end()
conn.on('end', done)
})
sw1.dial(p2, {}, function (err) {
expect(err).to.equal(undefined)
expect(Object.keys(sw1.conns).length).to.equal(1)
sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) {
expect(err).to.equal(null)
expect(Object.keys(sw1.conns).length).to.equal(0)
conn.end()
})
})
})
// it('add an upgrade', function (done) { done() })
// it('dial a conn on top of a upgrade', function (done) { done() })
// it('dial a conn on a protocol on top of a upgrade', function (done) { done() })
})
/* TODO
describe('udp', function () {
it('add the transport', function (done) { done() })
it('dial a conn', function (done) { done() })
it('dial a conn on a protocol', function (done) { done() })
it('add an upgrade', function (done) { done() })
it('dial a conn on top of a upgrade', function (done) { done() })
it('dial a conn on a protocol on top of a upgrade', function (done) { done() })
}) */
/* TODO
describe('udt', function () {
it('add the transport', function (done) { done() })
it('dial a conn', function (done) { done() })
it('dial a conn on a protocol', function (done) { done() })
it('add an upgrade', function (done) { done() })
it('dial a conn on top of a upgrade', function (done) { done() })
it('dial a conn on a protocol on top of a upgrade', function (done) { done() })
}) */
/* TODO
describe('utp', function () {
it('add the transport', function (done) { done() })
it('dial a conn', function (done) { done() })
it('dial a conn on a protocol', function (done) { done() })
it('add an upgrade', function (done) { done() })
it('dial a conn on top of a upgrade', function (done) { done() })
it('dial a conn on a protocol on top of a upgrade', function (done) { done() })
}) */
})
describe('With a SPDY Stream Muxer', function () {
describe('and one swarm over tcp', function () {
// TODO: What is the it here?
it('add Stream Muxer', function (done) {
// var mh = multiaddr('/ip4/127.0.0.1/tcp/8010')
var p = new Peer(Id.create(), [])
var sw = new Swarm(p)
sw.addStreamMuxer('spdy', Spdy, {})
done()
})
})
describe('and two swarms over tcp', function () {
var mh1, p1, sw1, mh2, p2, sw2
beforeEach(function (done) {
mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010')
p1 = new Peer(Id.create(), [])
sw1 = new Swarm(p1)
sw1.addStreamMuxer('spdy', Spdy, {})
mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020')
p2 = new Peer(Id.create(), [])
sw2 = new Swarm(p2)
sw2.addStreamMuxer('spdy', Spdy, {})
async.parallel([
function (cb) {
sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb)
},
function (cb) {
sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb)
}
], done)
})
function afterEach (done) {
var cleaningCounter = 0
sw1.closeConns(cleaningUp)
sw2.closeConns(cleaningUp)
sw1.closeListener('tcp', cleaningUp)
sw2.closeListener('tcp', cleaningUp)
function cleaningUp () {
cleaningCounter++
// TODO FIX: here should be 4, but because super wrapping of
// streams, it makes it so hard to properly close the muxed
// streams - https://github.com/indutny/spdy-transport/issues/14
if (cleaningCounter < 3) {
return
}
done()
}
}
it('dial a conn on a protocol', function (done) {
sw2.handleProtocol('/sparkles/1.0.0', function (conn) {
// formallity so that the conn starts flowing
conn.on('data', function (chunk) {})
conn.end()
conn.on('end', function () {
expect(Object.keys(sw1.muxedConns).length).to.equal(1)
expect(Object.keys(sw2.muxedConns).length).to.equal(0)
afterEach(done)
})
})
sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) {
conn.on('data', function () {})
expect(err).to.equal(null)
expect(Object.keys(sw1.conns).length).to.equal(0)
conn.end()
})
})
it('dial two conns (transport reuse)', function (done) {
sw2.handleProtocol('/sparkles/1.0.0', function (conn) {
// formality so that the conn starts flowing
conn.on('data', function (chunk) {})
conn.end()
conn.on('end', function () {
expect(Object.keys(sw1.muxedConns).length).to.equal(1)
expect(Object.keys(sw2.muxedConns).length).to.equal(0)
afterEach(done)
})
})
sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) {
// TODO Improve clarity
sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) {
conn.on('data', function () {})
expect(err).to.equal(null)
expect(Object.keys(sw1.conns).length).to.equal(0)
conn.end()
})
conn.on('data', function () {})
expect(err).to.equal(null)
expect(Object.keys(sw1.conns).length).to.equal(0)
conn.end()
})
})
})
describe('and two identity enabled swarms over tcp', function () {
var mh1, p1, sw1, mh2, p2, sw2
beforeEach(function (done) {
mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010')
p1 = new Peer(Id.create(), [])
sw1 = new Swarm(p1)
sw1.addStreamMuxer('spdy', Spdy, {})
sw1.enableIdentify()
mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020')
p2 = new Peer(Id.create(), [])
sw2 = new Swarm(p2)
sw2.addStreamMuxer('spdy', Spdy, {})
sw2.enableIdentify()
async.parallel([
function (cb) {
sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, cb)
},
function (cb) {
sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, cb)
}
], done)
})
afterEach(function (done) {
var cleaningCounter = 0
sw1.closeConns(cleaningUp)
sw2.closeConns(cleaningUp)
sw1.closeListener('tcp', cleaningUp)
sw2.closeListener('tcp', cleaningUp)
function cleaningUp () {
cleaningCounter++
// TODO FIX: here should be 4, but because super wrapping of
// streams, it makes it so hard to properly close the muxed
// streams - https://github.com/indutny/spdy-transport/issues/14
if (cleaningCounter < 3) {
return
}
// give time for identify to finish
setTimeout(function () {
expect(Object.keys(sw2.muxedConns).length).to.equal(1)
done()
}, 500)
}
})
it('identify', function (done) {
sw2.handleProtocol('/sparkles/1.0.0', function (conn) {
// formallity so that the conn starts flowing
conn.on('data', function (chunk) {})
conn.end()
conn.on('end', function () {
expect(Object.keys(sw1.muxedConns).length).to.equal(1)
done()
})
})
sw1.dial(p2, {}, '/sparkles/1.0.0', function (err, conn) {
conn.on('data', function () {})
expect(err).to.equal(null)
expect(Object.keys(sw1.conns).length).to.equal(0)
conn.end()
})
})
})
})

View File

@ -9,7 +9,7 @@ const Peer = require('peer-info')
const Swarm = require('../src') const Swarm = require('../src')
const TCP = require('libp2p-tcp') const TCP = require('libp2p-tcp')
const bl = require('bl') const bl = require('bl')
// var SPDY = require('libp2p-spdy') const spdy = require('libp2p-spdy')
describe('basics', () => { describe('basics', () => {
it('throws on missing peerInfo', (done) => { it('throws on missing peerInfo', (done) => {
@ -196,7 +196,7 @@ describe('transport - websockets', function () {
}) })
describe('high level API - 1st without stream multiplexing (on TCP)', function () { describe('high level API - 1st without stream multiplexing (on TCP)', function () {
this.timeout(10000) this.timeout(20000)
var swarmA var swarmA
var peerA var peerA
@ -278,7 +278,7 @@ describe('high level API - 1st without stream multiplexing (on TCP)', function (
}) })
describe('stream muxing (on TCP)', function () { describe('stream muxing (on TCP)', function () {
this.timeout(10000) this.timeout(20000)
describe('multiplex', () => { describe('multiplex', () => {
before((done) => { done() }) before((done) => { done() })
@ -291,31 +291,131 @@ describe('stream muxing (on TCP)', function () {
it.skip('enable identify to reuse incomming muxed conn', (done) => {}) it.skip('enable identify to reuse incomming muxed conn', (done) => {})
}) })
describe('spdy', () => { describe('spdy', () => {
before((done) => { done() }) var swarmA
after((done) => { done() }) var peerA
var swarmB
var peerB
var swarmC
var peerC
it.skip('add', (done) => {}) before((done) => {
it.skip('handle + dial on protocol', (done) => {}) peerA = new Peer()
it.skip('dial to warm conn', (done) => {}) peerB = new Peer()
it.skip('dial on protocol, reuse warmed conn', (done) => {}) peerC = new Peer()
it.skip('enable identify to reuse incomming muxed conn', (done) => {})
peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001'))
peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002'))
peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003'))
swarmA = new Swarm(peerA)
swarmB = new Swarm(peerB)
swarmC = new Swarm(peerC)
swarmA.transport.add('tcp', new TCP())
swarmA.transport.listen('tcp', {}, null, ready)
swarmB.transport.add('tcp', new TCP())
swarmB.transport.listen('tcp', {}, null, ready)
swarmC.transport.add('tcp', new TCP())
swarmC.transport.listen('tcp', {}, null, ready)
var counter = 0
function ready () {
if (++counter === 3) {
done()
}
}
})
after((done) => {
var counter = 0
swarmA.close(closed)
swarmB.close(closed)
swarmC.close(closed)
function closed () {
if (++counter === 3) {
done()
}
}
})
it('add', (done) => {
swarmA.connection.addStreamMuxer(spdy)
swarmB.connection.addStreamMuxer(spdy)
swarmC.connection.addStreamMuxer(spdy)
done()
})
it('handle + dial on protocol', (done) => {
swarmB.handle('/abacaxi/1.0.0', (conn) => {
conn.pipe(conn)
})
swarmA.dial(peerB, '/abacaxi/1.0.0', (err, conn) => {
expect(err).to.not.exist
expect(Object.keys(swarmA.muxedConns).length).to.equal(1)
conn.end()
conn.on('end', done)
})
})
it('dial to warm conn', (done) => {
swarmB.dial(peerA, (err) => {
expect(err).to.not.exist
expect(Object.keys(swarmB.conns).length).to.equal(0)
expect(Object.keys(swarmB.muxedConns).length).to.equal(1)
done()
})
})
it('dial on protocol, reuse warmed conn', (done) => {
swarmA.handle('/papaia/1.0.0', (conn) => {
conn.pipe(conn)
})
swarmB.dial(peerA, '/papaia/1.0.0', (err, conn) => {
expect(err).to.not.exist
expect(Object.keys(swarmB.conns).length).to.equal(0)
expect(Object.keys(swarmB.muxedConns).length).to.equal(1)
conn.end()
conn.on('end', done)
})
})
it.skip('enable identify to reuse incomming muxed conn', (done) => {
swarmA.connection.reuse()
swarmC.connection.reuse()
swarmC.dial(peerA, (err) => {
expect(err).to.not.exist
setTimeout(() => {
expect(Object.keys(swarmC.muxedConns).length).to.equal(1)
expect(Object.keys(swarmA.muxedConns).length).to.equal(2)
}, 100)
})
})
}) })
}) })
/*
describe('conn upgrades', function () { describe('conn upgrades', function () {
this.timeout(10000) this.timeout(20000)
describe('secio on tcp', () => { describe('secio on tcp', () => {
before((done) => { done() }) // before((done) => { done() })
after((done) => { done() }) // after((done) => { done() })
it.skip('add', (done) => {}) it.skip('add', (done) => {})
it.skip('dial', (done) => {}) it.skip('dial', (done) => {})
it.skip('tls on a muxed stream (not the full conn)', (done) => {}) it.skip('tls on a muxed stream (not the full conn)', (done) => {})
}) })
describe('tls on tcp', () => { describe('tls on tcp', () => {
before((done) => { done() }) // before((done) => { done() })
after((done) => { done() }) // after((done) => { done() })
it.skip('add', (done) => {}) it.skip('add', (done) => {})
it.skip('dial', (done) => {}) it.skip('dial', (done) => {})
@ -324,12 +424,14 @@ describe('conn upgrades', function () {
}) })
describe('high level API - with everything mixed all together!', function () { describe('high level API - with everything mixed all together!', function () {
this.timeout(10000) this.timeout(20000)
before((done) => { done() }) // before((done) => { done() })
// after((done) => { done() })
it.skip('add tcp', (done) => {}) it.skip('add tcp', (done) => {})
it.skip('add utp', (done) => {}) it.skip('add utp', (done) => {})
it.skip('add websockets', (done) => {}) it.skip('add websockets', (done) => {})
it.skip('dial', (done) => {}) it.skip('dial', (done) => {})
}) })
*/