add multistream and muxer tests

This commit is contained in:
David Dias
2015-07-15 11:34:37 -07:00
parent 3e56559396
commit a2a7df870b
8 changed files with 242 additions and 60 deletions

View File

@ -1,4 +1,4 @@
var Identify = require('./../src/identify') // var Identify = require('./../src/identify')
var Swarm = require('./../src') var Swarm = require('./../src')
var Peer = require('ipfs-peer') var Peer = require('ipfs-peer')
var Id = require('ipfs-peer-id') var Id = require('ipfs-peer-id')
@ -7,19 +7,19 @@ var multiaddr = require('multiaddr')
var a = new Swarm() var a = new Swarm()
a.port = 4000 a.port = 4000
// a.listen() // a.listen()
var peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + a.port)]) // var peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + a.port)])
// attention, peerB Id isn't going to match, but whateves // attention, peerB Id isn't going to match, but whateves
var peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/4001')]) var peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/4001')])
var i = new Identify(a, peerA) // var i = new Identify(a, peerA)
i.on('thenews', function (news) { // i.on('thenews', function (news) {
console.log('such news') // console.log('such news')
}) // })
a.openStream(peerB, '/ipfs/sparkles/1.2.3', function (err, stream) { a.openStream(peerB, '/ipfs/sparkles/1.2.3', function (err, stream) {
if (err) { if (err) {
return console.log('ERR - ', err) return console.log(err)
} }
console.log('WoHoo, dialed a stream') console.log('WoHoo, dialed a stream')
}) })

View File

@ -1,17 +1,17 @@
var Identify = require('./../src/identify') // var Identify = require('./../src/identify')
var Swarm = require('./../src') var Swarm = require('./../src')
var Peer = require('ipfs-peer') // var Peer = require('ipfs-peer')
var Id = require('ipfs-peer-id') // var Id = require('ipfs-peer-id')
var multiaddr = require('multiaddr') // var multiaddr = require('multiaddr')
var b = new Swarm() var b = new Swarm()
b.port = 4001 b.port = 4001
var peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + b.port)]) // var peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + b.port)])
var i = new Identify(b, peerB) // var i = new Identify(b, peerB)
i.on('thenews', function (news) { // i.on('thenews', function (news) {
console.log('such news') // console.log('such news')
}) // })
b.on('error', function (err) { b.on('error', function (err) {
console.log(err) console.log(err)
@ -19,6 +19,10 @@ b.on('error', function (err) {
b.listen() b.listen()
b.registerHandle('/ipfs/sparkles/1.2.3', function (err, stream) { b.registerHandler('/ipfs/sparkles/1.2.3', function (stream) {
// if (err) {
// return console.log(err)
// }
console.log('woop got a stream') console.log('woop got a stream')
}) })

View File

@ -30,12 +30,13 @@
"lab": "^5.13.0", "lab": "^5.13.0",
"precommit-hook": "^3.0.0", "precommit-hook": "^3.0.0",
"standard": "^4.5.2", "standard": "^4.5.2",
"stream-pair": "^1.0.2" "stream-pair": "^1.0.3"
}, },
"dependencies": { "dependencies": {
"async": "^1.3.0", "async": "^1.3.0",
"multiaddr": "^1.0.0", "multiaddr": "^1.0.0",
"multiplex-stream-muxer": "^0.2.0",
"multistream-select": "^0.6.1", "multistream-select": "^0.6.1",
"spdy-transport": "indutny/spdy-transport" "spdy-stream-muxer": "^0.2.0"
} }
} }

View File

@ -1,6 +1,6 @@
/* /*
* Identify is one of the protocols swarms speaks in order to broadcast and learn about the ip:port * Identify is one of the protocols swarms speaks in order to broadcast and learn
* pairs a specific peer is available through * about the ip:port pairs a specific peer is available through
*/ */
var Interactive = require('multistream-select').Interactive var Interactive = require('multistream-select').Interactive
@ -15,6 +15,8 @@ function Identify (swarm, peerSelf) {
var self = this var self = this
swarm.registerHandler('/ipfs/identify/1.0.0', function (stream) { swarm.registerHandler('/ipfs/identify/1.0.0', function (stream) {
console.log('DO I EVER GET CALLED?')
var identifyMsg = {} var identifyMsg = {}
identifyMsg = {} identifyMsg = {}
identifyMsg.sender = exportPeer(peerSelf) identifyMsg.sender = exportPeer(peerSelf)
@ -39,19 +41,25 @@ function Identify (swarm, peerSelf) {
// send back our stuff // send back our stuff
}) })
swarm.on('connection-unknown', function (spdyConnection) { swarm.on('connection-unknown', function (conn) {
spdyConnection.request({ console.log('IDENTIFY - DIALING STREAM FROM SERVER')
path: '/',
method: 'GET' conn.on('error', function (err) {
}, function (err, stream) { console.log('CAPUT-A', err)
})
conn.dialStream(function (err, stream) {
if (err) { if (err) {
return console.log(err) return console.log(err)
} }
stream.on('error', function (err) {
console.log('CAPUT-B', err)
})
console.log('GOT STREAM')
var msi = new Interactive() var msi = new Interactive()
msi.handle(stream, function () { msi.handle(stream, function () {
console.log('HANDLE GOOD')
msi.select('/ipfs/identify/1.0.0', function (err, ds) { msi.select('/ipfs/identify/1.0.0', function (err, ds) {
if (err) { return console.log('err') } if (err) { return console.log(err) }
var identifyMsg = {} var identifyMsg = {}
identifyMsg = {} identifyMsg = {}
identifyMsg.sender = exportPeer(peerSelf) identifyMsg.sender = exportPeer(peerSelf)
@ -68,8 +76,9 @@ function Identify (swarm, peerSelf) {
stream.on('end', function () { stream.on('end', function () {
answer = JSON.parse(answer) answer = JSON.parse(answer)
swarm.connections[answer.sender.id] = spdyConnection swarm.connections[answer.sender.id] = conn
console.log('BAM')
self.emit('peer-update', answer) self.emit('peer-update', answer)
}) })

2
src/stream-muxer.js Normal file
View File

@ -0,0 +1,2 @@
exports = module.exports = require('spdy-stream-muxer')
// exports = module.exports = require('multiplex-stream-muxer')

View File

@ -1,7 +1,7 @@
var tcp = require('net') var tcp = require('net')
var Select = require('multistream-select').Select var Select = require('multistream-select').Select
var Interactive = require('multistream-select').Interactive var Interactive = require('multistream-select').Interactive
var spdy = require('spdy-transport') var Muxer = require('./stream-muxer')
var log = require('ipfs-logger').group('swarm') var log = require('ipfs-logger').group('swarm')
var async = require('async') var async = require('async')
var EventEmitter = require('events').EventEmitter var EventEmitter = require('events').EventEmitter
@ -38,18 +38,25 @@ function Swarm () {
ms.addHandler('/spdy/3.1.0', function (ds) { ms.addHandler('/spdy/3.1.0', function (ds) {
log.info('Negotiated spdy with incoming socket') log.info('Negotiated spdy with incoming socket')
var conn = spdy.connection.create(ds, { protocol: 'spdy', isServer: true }) var conn = new Muxer().attach(ds, false)
conn.start(3.1)
self.emit('connection-unknown', conn)
// attach multistream handlers to incoming streams // attach multistream handlers to incoming streams
conn.on('stream', function () {
console.log('HERE')
})
conn.on('error', function () {
console.log('error here')
})
conn.on('stream', registerHandles) conn.on('stream', registerHandles)
errorUp(self, conn) errorUp(self, conn)
// IDENTIFY DOES THAT FOR US // FOR IDENTIFY
// conn.on('close', function () { delete self.connections[conn.peerId] }) self.emit('connection-unknown', conn)
// IDENTIFY DOES THIS FOR US
// conn.on('close', function () { delete self.connections[conn.peerId] })
}) })
}).listen(self.port, ready) }).listen(self.port, ready)
errorUp(self, self.listener) errorUp(self, self.listener)
@ -95,11 +102,15 @@ function Swarm () {
msi.select('/spdy/3.1.0', function (err, ds) { msi.select('/spdy/3.1.0', function (err, ds) {
if (err) { cb(err) } if (err) { cb(err) }
var conn = spdy.connection.create(ds, { protocol: 'spdy', isServer: false }) var conn = new Muxer().attach(ds, false)
conn.start(3.1) conn.on('stream', function () {
console.log('WOOHO NEW STREAM')
})
conn.on('error', function () {
console.log('BADUM TSS')
})
conn.on('stream', registerHandles) conn.on('stream', registerHandles)
self.connections[peer.id.toB58String()] = conn self.connections[peer.id.toB58String()] = conn
conn.on('close', function () { delete self.connections[peer.id.toB58String()] }) conn.on('close', function () { delete self.connections[peer.id.toB58String()] })
errorUp(self, conn) errorUp(self, conn)
@ -109,12 +120,14 @@ function Swarm () {
} }
function createStream (peer, protocol, cb) { function createStream (peer, protocol, cb) {
// spawn new stream // spawn new muxed stream
var conn = self.connections[peer.id.toB58String()] var conn = self.connections[peer.id.toB58String()]
conn.request({path: '/', method: 'GET'}, function (err, stream) { conn.dialStream(function (err, stream) {
if (err) { return cb(err) } if (err) { return cb(err) }
errorUp(self, stream) errorUp(self, stream)
stream.on('error', function (err) {
console.log('error here - ', err)
})
// negotiate desired protocol // negotiate desired protocol
var msi = new Interactive() var msi = new Interactive()
msi.handle(stream, function () { msi.handle(stream, function () {
@ -128,7 +141,9 @@ function Swarm () {
} }
self.registerHandler = function (protocol, handlerFunc) { self.registerHandler = function (protocol, handlerFunc) {
console.log('new handler coming in for - ', protocol)
if (self.handles[protocol]) { if (self.handles[protocol]) {
console.log('here already - ', protocol)
return handlerFunc(new Error('Handle for protocol already exists', protocol)) return handlerFunc(new Error('Handle for protocol already exists', protocol))
} }
self.handles.push({ protocol: protocol, func: handlerFunc }) self.handles.push({ protocol: protocol, func: handlerFunc })
@ -153,10 +168,12 @@ function Swarm () {
function registerHandles (stream) { function registerHandles (stream) {
log.info('Registering protocol handlers on new stream') log.info('Registering protocol handlers on new stream')
console.log('REGISTERING HANDLES')
errorUp(self, stream) errorUp(self, stream)
var msH = new Select() var msH = new Select()
msH.handle(stream) msH.handle(stream)
self.handles.forEach(function (handle) { self.handles.forEach(function (handle) {
console.log(' ->', handle.protocol)
msH.addHandler(handle.protocol, handle.func) msH.addHandler(handle.protocol, handle.func)
}) })
} }

View File

@ -0,0 +1,139 @@
var Lab = require('lab')
var Code = require('code')
var lab = exports.lab = Lab.script()
var experiment = lab.experiment
var test = lab.test
var beforeEach = lab.beforeEach
var afterEach = lab.afterEach
var expect = Code.expect
var Muxer = require('./../src/stream-muxer.js')
var multistream = require('multistream-select')
var Interactive = multistream.Interactive
var Select = multistream.Select
var streamPair = require('stream-pair')
beforeEach(function (done) {
done()
})
afterEach(function (done) {
done()
})
experiment('MULTISTREAM AND STREAM MUXER', function () {
test('Open a socket and multistream-select it into spdy', function (done) {
var pair = streamPair.create()
var msI = new Interactive()
var msS = new Select()
var dialerMuxer = new Muxer()
var listenerMuxer = new Muxer()
msS.handle(pair.other)
msS.addHandler('/spdy/0.3.1', function (stream) {
var listenerConn = listenerMuxer.attach(stream, true)
expect(typeof listenerConn).to.be.equal('object')
done()
})
msI.handle(pair, function () {
msI.select('/spdy/0.3.1', function (err, stream) {
expect(err).to.not.be.instanceof(Error)
var dialerConn = dialerMuxer.attach(stream, false)
expect(typeof dialerConn).to.be.equal('object')
})
})
})
test('socket->ms-select into spdy->stream from dialer->ms-select into other protocol', function (done) {
var pair = streamPair.create()
var msI = new Interactive()
var msS = new Select()
var dialerMuxer = new Muxer()
var listenerMuxer = new Muxer()
msS.handle(pair.other)
msS.addHandler('/spdy/0.3.1', function (stream) {
var listenerConn = listenerMuxer.attach(stream, true)
listenerConn.on('stream', function (stream) {
stream.on('data', function (chunk) {
expect(chunk.toString()).to.equal('mux all the streams')
done()
})
})
})
msI.handle(pair, function () {
msI.select('/spdy/0.3.1', function (err, stream) {
expect(err).to.not.be.instanceof(Error)
var dialerConn = dialerMuxer.attach(stream, false)
dialerConn.dialStream(function (err, stream) {
expect(err).to.not.be.instanceof(Error)
stream.write('mux all the streams')
})
})
})
})
test('socket->ms-select into spdy->stream from listener->ms-select into another protocol', function (done) {
var pair = streamPair.create()
var msI = new Interactive()
var msS = new Select()
var dialerMuxer = new Muxer()
var listenerMuxer = new Muxer()
msS.handle(pair.other)
msS.addHandler('/spdy/0.3.1', function (stream) {
var listenerConn = listenerMuxer.attach(stream, true)
listenerConn.on('stream', function (stream) {
stream.on('data', function (chunk) {
expect(chunk.toString()).to.equal('mux all the streams')
listenerConn.dialStream(function (err, stream) {
expect(err).to.not.be.instanceof(Error)
var msI2 = new Interactive()
msI2.handle(stream, function () {
msI2.select('/other/protocol', function (err, stream) {
expect(err).to.not.be.instanceof(Error)
stream.write('the other protocol')
})
})
})
})
})
})
msI.handle(pair, function () {
msI.select('/spdy/0.3.1', function (err, stream) {
expect(err).to.not.be.instanceof(Error)
var dialerConn = dialerMuxer.attach(stream, false)
dialerConn.dialStream(function (err, stream) {
expect(err).to.not.be.instanceof(Error)
stream.write('mux all the streams')
})
dialerConn.on('stream', function (stream) {
var msS2 = new Select()
msS2.handle(stream)
msS2.addHandler('/other/protocol', function (stream) {
stream.on('data', function (chunk) {
expect(chunk.toString()).to.equal('the other protocol')
done()
})
})
})
})
})
})
})

View File

@ -41,7 +41,7 @@ afterEach(function (done) {
swarmB.closeListener() swarmB.closeListener()
done() done()
}) })
/*
experiment('BASE', function () { experiment('BASE', function () {
test('Open a stream', function (done) { test('Open a stream', function (done) {
var protocol = '/sparkles/3.3.3' var protocol = '/sparkles/3.3.3'
@ -60,9 +60,7 @@ experiment('BASE', function () {
test('Reuse connection (from dialer)', function (done) { test('Reuse connection (from dialer)', function (done) {
var protocol = '/sparkles/3.3.3' var protocol = '/sparkles/3.3.3'
swarmB.registerHandler(protocol, function (err, stream) { swarmB.registerHandler(protocol, function (stream) {})
expect(err).to.not.be.instanceof(Error)
})
swarmA.openStream(peerB, protocol, function (err, stream) { swarmA.openStream(peerB, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error) expect(err).to.not.be.instanceof(Error)
@ -75,26 +73,37 @@ experiment('BASE', function () {
}) })
}) })
}) })
*/
experiment('IDENTIFY', function () { experiment('IDENTIFY', function () {
test('Attach Identify, open a stream, see a peer update', function (done) { test('Attach Identify, open a stream, see a peer update', function (done) {
swarmA.on('error', function (err) {
console.log('A - ', err)
})
swarmB.on('error', function (err) {
console.log('B - ', err)
})
var protocol = '/sparkles/3.3.3' var protocol = '/sparkles/3.3.3'
var identifyA = new Identify(swarmA, peerA) var identifyA = new Identify(swarmA, peerA)
var identifyB = new Identify(swarmB, peerB) var identifyB = new Identify(swarmB, peerB)
setTimeout(function () {
swarmB.registerHandler(protocol, function (stream) {})
swarmB.registerHandler(protocol, function (stream) {}) swarmA.openStream(peerB, protocol, function (err, stream) {
expect(err).to.not.be.instanceof(Error)
})
swarmA.openStream(peerB, protocol, function (err, stream) { identifyB.on('peer-update', function (answer) {
expect(err).to.not.be.instanceof(Error) console.log('SUCH PEER-UPDATE')
}) done()
})
identifyB.on('peer-update', function (answer) { identifyA.on('peer-update', function (answer) {})
done() }, 500)
})
identifyA.on('peer-update', function (answer) {})
}) })
/*
test('Attach Identify, open a stream, reuse stream', function (done) { test('Attach Identify, open a stream, reuse stream', function (done) {
var protocol = '/sparkles/3.3.3' var protocol = '/sparkles/3.3.3'
@ -104,13 +113,13 @@ experiment('IDENTIFY', function () {
swarmA.registerHandler(protocol, function (stream) {}) swarmA.registerHandler(protocol, function (stream) {})
swarmB.registerHandler(protocol, function (stream) {}) swarmB.registerHandler(protocol, function (stream) {})
swarmA.openStream(peerB, protocol, function (err, stream) { swarmA.openStream(peerB, protocol, function theOTHER (err, stream) {
expect(err).to.not.be.instanceof(Error) expect(err).to.not.be.instanceof(Error)
}) })
identifyB.on('peer-update', function (answer) { identifyB.on('peer-update', function (answer) {
expect(Object.keys(swarmB.connections).length).to.equal(1) expect(Object.keys(swarmB.connections).length).to.equal(1)
swarmB.openStream(peerA, protocol, function (err, stream) { swarmB.openStream(peerA, protocol, function theCALLBACK (err, stream) {
expect(err).to.not.be.instanceof(Error) expect(err).to.not.be.instanceof(Error)
expect(Object.keys(swarmB.connections).length).to.equal(1) expect(Object.keys(swarmB.connections).length).to.equal(1)
done() done()
@ -118,6 +127,7 @@ experiment('IDENTIFY', function () {
}) })
identifyA.on('peer-update', function (answer) {}) identifyA.on('peer-update', function (answer) {})
}) })
*/
}) })
experiment('HARDNESS', function () {}) experiment('HARDNESS', function () {})