identify + test refactor and reorg

This commit is contained in:
David Dias
2015-07-09 13:53:03 -07:00
parent 9c062ffeeb
commit 572c7e4cfa
5 changed files with 205 additions and 104 deletions

View File

@ -1,19 +1,23 @@
var swarm = require('./../src').singleton var Identify = require('./../src/identify')
var Swarm = require('./../src')
var Peer = require('ipfs-peer') var Peer = require('ipfs-peer')
var Id = require('ipfs-peer-id') var Id = require('ipfs-peer-id')
var multiaddr = require('multiaddr') var multiaddr = require('multiaddr')
// create Id var a = new Swarm()
// create multiaddr a.port = 4000
// create Peer // a.listen()
// openStream var peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + a.port)])
var peerId = Id.create() // attention, peerB Id isn't going to match, but whateves
var mhs = [] var peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/4001')])
mhs.push(multiaddr('/ip4/127.0.0.1/tcp/4001'))
var p = new Peer(peerId, mhs)
swarm.openStream(p, '/ipfs/sparkles/1.2.3', function (err, stream) { var i = new Identify(a, peerA)
i.on('thenews', function (news) {
console.log('such news')
})
a.openStream(peerB, '/ipfs/sparkles/1.2.3', function (err, stream) {
if (err) { if (err) {
return console.log('ERR - ', err) return console.log('ERR - ', err)
} }

View File

@ -1,7 +1,20 @@
var swarm = require('./../src').singleton var Identify = require('./../src/identify')
var Swarm = require('./../src')
var Peer = require('ipfs-peer')
var Id = require('ipfs-peer-id')
var multiaddr = require('multiaddr')
swarm.listen() var b = new Swarm()
b.port = 4001
var peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + b.port)])
swarm.registerHandle('/ipfs/sparkles/1.2.3', function (stream) { var i = new Identify(b, peerB)
i.on('thenews', function (news) {
console.log('such news')
})
b.listen()
b.registerHandle('/ipfs/sparkles/1.2.3', function (stream) {
console.log('woop got a stream') console.log('woop got a stream')
}) })

View File

@ -3,41 +3,93 @@
* pairs a specific peer is available through * pairs a specific peer is available through
*/ */
var swarm = require('./').singleton
var Interactive = require('multistream-select').Interactive var Interactive = require('multistream-select').Interactive
var EventEmmiter = require('events').EventEmitter
var util = require('util')
exports = module.exports exports = module.exports = Identify
// peer acting as server, asking whom is talking util.inherits(Identify, EventEmmiter)
exports.inquiry = function (spdyConnection, cb) {
spdyConnection.request({method: 'GET', path: '/', headers: {}}, function (stream) {
var msi = new Interactive()
msi.handle(stream)
msi.select('/ipfs/identify/1.0.0', function (ds) {
var peerId = ''
ds.setEncoding('utf8')
ds.on('data', function (chunk) { function Identify (swarm, peerSelf) {
peerId += chunk var self = this
})
ds.on('end', function () { swarm.registerHandle('/ipfs/identify/1.0.0', function (stream) {
cb(null, spdyConnection, peerId) var identifyMsg = {}
identifyMsg = {}
identifyMsg.sender = exportPeer(peerSelf)
// TODO (daviddias) populate with the way I see the other peer
// identifyMsg.receiver =
stream.write(JSON.stringify(identifyMsg))
var answer = ''
stream.on('data', function (chunk) {
answer += chunk.toString()
})
stream.on('end', function () {
console.log(JSON.prse(answer))
self.emit('thenews', answer)
})
stream.end()
// receive their info and how they see us
// send back our stuff
})
swarm.on('connection', function (spdyConnection) {
spdyConnection.request({
path: '/',
method: 'GET'
}, function (err, stream) {
if (err) {
return console.log(err)
}
var msi = new Interactive()
msi.handle(stream, function () {
msi.select('/ipfs/identify/1.0.0', function (err, ds) {
if (err) {
return console.log('err')
}
var identifyMsg = {}
identifyMsg = {}
identifyMsg.sender = exportPeer(peerSelf)
// TODO (daviddias) populate with the way I see the other peer
// identifyMsg.receiver =
stream.write(JSON.stringify(identifyMsg))
var answer = ''
stream.on('data', function (chunk) {
answer += chunk.toString()
})
stream.on('end', function () {
console.log(JSON.parse(answer))
// TODO (daviddias), push to the connections list on swarm that we have a new known connection
self.emit('thenews', answer)
})
stream.end()
})
}) })
}) })
// open a spdy stream
// do the multistream handshake
// send them our data
}) })
// 0. open a stream
// 1. negotiate /ipfs/identify/1.0.0
// 2. check other peerId
// 3. reply back with cb(null, connection, peerId)
}
// peer asking which pairs ip:port does the other peer see function exportPeer (peer) {
exports.whoAmI = function () {} return {
id: peer.id.toB58String(),
exports.start = function (peerSelf) { multiaddrs: peer.multiaddrs.map(function (mh) {
swarm.registerHandle('/ipfs/identify/1.0.0', function (ds) { return mh.toString()
ds.setDefaultEncoding('utf8') })
ds.write(peerSelf.toB58String()) }
ds.end() }
})
} }

View File

@ -2,12 +2,15 @@ var tcp = require('net')
var Select = require('multistream-select').Select var Select = require('multistream-select').Select
var Interactive = require('multistream-select').Interactive var Interactive = require('multistream-select').Interactive
var spdy = require('spdy-transport') var spdy = require('spdy-transport')
// var identify = require('./identify')
var log = require('ipfs-logger').group('swarm') var log = require('ipfs-logger').group('swarm')
var async = require('async') var async = require('async')
var EventEmitter = require('events').EventEmitter
var util = require('util')
exports = module.exports = Swarm exports = module.exports = Swarm
util.inherits(Swarm, EventEmitter)
function Swarm () { function Swarm () {
var self = this var self = this
@ -16,20 +19,26 @@ function Swarm () {
} }
self.port = parseInt(process.env.IPFS_SWARM_PORT, 10) || 4001 self.port = parseInt(process.env.IPFS_SWARM_PORT, 10) || 4001
self.connections = {} self.connections = {}
self.handles = [] self.handles = []
// set the listener // set the listener
self.listen = function () { self.listen = function (port, ready) {
console.log('GOING TO LISTEN ON: ', self.port) if (!ready) {
tcp.createServer(function (socket) { ready = function noop () {}
console.log('GOT INCOMING CONNECTION') }
if (typeof port === 'function') {
ready = port
} else if (port) {
self.port = port
}
tcp.createServer(function (socket) {
var ms = new Select() var ms = new Select()
ms.handle(socket) ms.handle(socket)
ms.addHandler('/spdy/3.1.0', function (ds) { ms.addHandler('/spdy/3.1.0', function (ds) {
console.log('GOT SPDY HANDLER REQUEST')
log.info('Negotiated spdy with incoming socket') log.info('Negotiated spdy with incoming socket')
log.info('Buffer should be clean - ', ds.read()) log.info('Buffer should be clean - ', ds.read())
var spdyConnection = spdy.connection.create(ds, { var spdyConnection = spdy.connection.create(ds, {
@ -39,31 +48,19 @@ function Swarm () {
spdyConnection.start(3.1) spdyConnection.start(3.1)
self.emit('connection', spdyConnection)
// attach multistream handlers to incoming streams // attach multistream handlers to incoming streams
spdyConnection.on('stream', function (spdyStream) { spdyConnection.on('stream', function (spdyStream) {
registerHandles(spdyStream) registerHandles(spdyStream)
}) })
// learn about the other peer Identity
/* TODO(daviddias)
identify.inquiry(spdyConnection, function (err, spdyConnection, peerIdB58) {
if (err) {
return log.error(err)
}
if (self.connections[peerIdB58]) {
return log.error('New connection established with a peer(' + peerIdB58 + ') that we already had a connection')
}
spdyConnection.peerId = peerIdB58
self.connections[peerIdB58] = spdyConnection
})
*/
// close the connection when all the streams close // close the connection when all the streams close
spdyConnection.on('close', function () { spdyConnection.on('close', function () {
delete self.connections[spdyConnection.peerId] delete self.connections[spdyConnection.peerId]
}) })
}) })
}).listen(self.port) }).listen(self.port, ready)
} }
// interface // interface
@ -81,7 +78,6 @@ function Swarm () {
return callback() return callback()
} }
var socket = tcp.connect(multiaddr.toOptions(), function connected () { var socket = tcp.connect(multiaddr.toOptions(), function connected () {
console.log('CONNECTED TO: ', multiaddr.toString())
gotSocket(socket) gotSocket(socket)
}) })
@ -102,11 +98,9 @@ function Swarm () {
// do the spdy people dance (multistream-select into spdy) // do the spdy people dance (multistream-select into spdy)
function gotSocket (socket) { function gotSocket (socket) {
console.log('GOT SOCKET')
gotOne = true gotOne = true
var msi = new Interactive() var msi = new Interactive()
msi.handle(socket, function () { msi.handle(socket, function () {
console.log('GOING TO NEGOTIATE SPDY')
msi.select('/spdy/3.1.0', function (err, ds) { msi.select('/spdy/3.1.0', function (err, ds) {
if (err) { if (err) {
return console.log('err', err) return console.log('err', err)
@ -116,6 +110,7 @@ function Swarm () {
isServer: false isServer: false
}) })
spdyConnection.start(3.1) spdyConnection.start(3.1)
self.connections[peer.id.toB58String()] = spdyConnection self.connections[peer.id.toB58String()] = spdyConnection
// attach multistream handlers to incoming streams // attach multistream handlers to incoming streams
@ -166,6 +161,18 @@ function Swarm () {
log.info('Registered handler for protocol:', protocol) log.info('Registered handler for protocol:', protocol)
} }
self.close = function (cb) {
var keys = Object.keys(self.connections)
var number = keys.length
if (number === 0) { cb() }
var c = new Counter(number, cb)
keys.map(function (key) {
c.hit()
self.connections[key].end()
})
}
function registerHandles (spdyStream) { function registerHandles (spdyStream) {
log.info('Preparing stream to handle the registered protocols') log.info('Preparing stream to handle the registered protocols')
var msH = new Select() var msH = new Select()
@ -176,3 +183,15 @@ function Swarm () {
} }
} }
function Counter (target, callback) {
var c = 0
this.hit = count
function count () {
c += 1
if (c === target) {
callback()
}
}
}

View File

@ -13,59 +13,72 @@ var Id = require('ipfs-peer-id')
var Peer = require('ipfs-peer') var Peer = require('ipfs-peer')
var Swarm = require('../src/index.js') var Swarm = require('../src/index.js')
experiment(': ', function () { var swarmA
var a var swarmB
var b var peerA
var peerA var peerB
var peerB
beforeEach(function (done) { beforeEach(function (done) {
a = new Swarm() swarmA = new Swarm()
a.port = 4000 swarmB = new Swarm()
a.listen() var c = new Counter(2, done)
peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + a.port)])
b = new Swarm() swarmA.listen(4000, function () {
b.port = 4001 peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + swarmA.port)])
b.listen() c.hit()
peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + b.port)])
setTimeout(done, 1000)
}) })
afterEach(function (done) { swarmB.listen(4001, function () {
// a.close() peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + swarmB.port)])
// b.close() c.hit()
done()
}) })
})
afterEach(function (done) {
var c = new Counter(2, done)
swarmA.close(function () {
c.hit()
})
swarmB.close(function () {
c.hit()
})
})
experiment('BASE', function () {
test('Open a stream', {timeout: false}, function (done) { test('Open a stream', {timeout: false}, function (done) {
var protocol = '/sparkles/3.3.3' var protocol = '/sparkles/3.3.3'
var c = new Counter(2, done) var c = new Counter(2, done)
b.registerHandle(protocol, function (stream) { swarmB.registerHandle(protocol, function (stream) {
console.log('bim') c.hit()
c.unamas()
}) })
a.openStream(peerB, protocol, function (err, stream) { swarmA.openStream(peerB, protocol, function (err, stream) {
console.log('pim')
expect(err).to.not.be.instanceof(Error) expect(err).to.not.be.instanceof(Error)
c.unamas() c.hit()
}) })
}) })
})
function Counter (target, callback) { experiment('IDENTIFY', function () {
var c = 0
this.unamas = count
function count () {
c += 1
if (c === target) {
callback()
}
}
}
}) })
experiment('HARDNESS', function () {
})
function Counter (target, callback) {
var c = 0
this.hit = count
function count () {
c += 1
if (c === target) {
callback()
}
}
}