This commit is contained in:
David Dias 2016-03-10 20:28:58 +00:00
parent 366b6ef382
commit 990111980b
6 changed files with 89 additions and 182 deletions

View File

@ -1,25 +0,0 @@
// var Identify = require('./../src/identify')
var Swarm = require('./../src')
var Peer = require('ipfs-peer')
var Id = require('ipfs-peer-id')
var multiaddr = require('multiaddr')
var a = new Swarm()
a.port = 4000
// a.listen()
// var peerA = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/' + a.port)])
// attention, peerB Id isn't going to match, but whateves
var peerB = new Peer(Id.create(), [multiaddr('/ip4/127.0.0.1/tcp/4001')])
// var i = new Identify(a, peerA)
// i.on('thenews', function (news) {
// console.log('such news')
// })
a.openStream(peerB, '/ipfs/sparkles/1.2.3', function (err, stream) {
if (err) {
return console.log(err)
}
console.log('WoHoo, dialed a stream')
})

View File

@ -1,14 +0,0 @@
var Swarm = require('./../src')
var Peer = require('peer-info')
var Id = require('peer-id')
var multiaddr = require('multiaddr')
var tcp = require('libp2p-tcp')
var mh = multiaddr('/ip4/127.0.0.1/tcp/8010')
var p = new Peer(Id.create(), [])
var sw = new Swarm(p)
sw.addTransport('tcp', tcp, { multiaddr: mh }, {}, {port: 8010}, function () {
console.log('transport added')
})

View File

@ -33,11 +33,11 @@
"chai": "^3.5.0",
"istanbul": "^0.4.2",
"libp2p-spdy": "^0.2.3",
"libp2p-tcp": "^0.2.1",
"libp2p-tcp": "^0.3.0",
"mocha": "^2.4.5",
"multiaddr": "^1.1.1",
"peer-id": "^0.5.3",
"peer-info": "^0.5.2",
"peer-id": "^0.6.0",
"peer-info": "^0.6.0",
"pre-commit": "^1.1.2",
"standard": "^6.0.7",
"stream-pair": "^1.0.3"

View File

@ -1,166 +1,93 @@
/*
* Identify is one of the protocols swarms speaks in order to
* broadcast and learn about the ip:port pairs a specific peer
* is available through
* is available through and to know when a new stream muxer is
* established, so a conn can be reused
*/
// var multistream = require('multistream-select')
// var protobufs = require('protocol-buffers-stream')
// var fs = require('fs')
// var path = require('path')
// var protobufs = require('protocol-buffers-stream')
// var schema = fs.readFileSync(path.join(__dirname, 'identify.proto'))
// var Address6 = require('ip-address').Address6
// var Id = require('peer-id')
// var multiaddr = require('multiaddr')
const multistream = require('multistream-select')
const fs = require('fs')
const path = require('path')
const pbStream = require('protocol-buffers-stream')(
fs.readFileSync(path.join(__dirname, 'identify.proto')))
const Info = require('peer-info')
const Id = require('peer-id')
const multiaddr = require('multiaddr')
exports = module.exports
exports.multicodec = '/ipfs/identify/1.0.0'
exports.exec = (muxedConn, callback) => {
// TODO
exports.exec = (rawConn, muxer, peerInfo, callback) => {
// 1. open a stream
// 2. multistream into identify
// 3. send what I see from this other peer
// 3. send what I see from this other peer (extract fro conn)
// 4. receive what the other peer sees from me
// 4. callback with (err, peerInfo)
}
exports.handler = (peerInfo) => {
return function (conn) {
// TODO
// 1. receive incoming observed info about me
// 2. send back what I see from the other
}
}
const conn = muxer.newStream()
/*
function identify (muxedConns, peerInfoSelf, socket, conn, muxer) {
var msi = new Interactive()
msi.handle(conn, function () {
msi.select(protoId, function (err, ds) {
var msI = new multistream.Interactive()
msI.handle(conn, () => {
msI.select(exports.multicodec, (err, ds) => {
if (err) {
return console.log(err)
return callback(err)
}
var ps = createProtoStream()
var pbs = pbStream()
ps.on('identify', function (msg) {
var peerId = Id.createFromPubKey(msg.publicKey)
pbs.on('identify', (msg) => {
peerInfo.multiaddr.addSafe(msg.observedAddr)
updateSelf(peerInfoSelf, msg.observedAddr)
const peerId = Id.createFromPubKey(msg.publicKey)
const otherPeerInfo = new Info(peerId)
msg.listenAddrs.forEach((ma) => {
otherPeerInfo.multiaddr.add(multiaddr(ma))
})
muxedConns[peerId.toB58String()] = {
muxer: muxer,
socket: socket
}
var mh = getMultiaddr(socket)
ps.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: peerInfoSelf.id.pubKey,
listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) {
return mh.buffer
}),
observedAddr: mh.buffer
callback(null, otherPeerInfo)
})
ps.pipe(ds).pipe(ps)
ps.finalize()
const obsMultiaddr = rawConn.getObservedAddrs()[0]
pbs.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: peerInfo.id.pubKey,
listenAddrs: peerInfo.multiaddrs.map((mh) => { return mh.buffer }),
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : null
})
pbs.pipe(ds).pipe(pbs)
pbs.finalize()
})
})
}
exports.getHandlerFunction = function (peerInfoSelf, muxedConns) {
exports.handler = (peerInfo, swarm) => {
return function (conn) {
// wait for the other peer to identify itself
// update our multiaddr with observed addr list
// then get the socket from our list of muxedConns and send the reply back
// 1. receive incoming observed info about me
// 2. update my own information (on peerInfo)
// 3. send back what I see from the other (get from swarm.muxedConns[incPeerID].conn.getObservedAddrs()
var pbs = pbStream()
var ps = createProtoStream()
pbs.on('identify', function (msg) {
peerInfo.multiaddr.addSafe(msg.observedAddr)
ps.on('identify', function (msg) {
updateSelf(peerInfoSelf, msg.observedAddr)
const peerId = Id.createFromPubKey(msg.publicKey)
const conn = swarm.muxedConns[peerId.toB58String()].conn
const obsMultiaddr = conn.getObservedAddrs()[0]
var peerId = Id.createFromPubKey(msg.publicKey)
var socket = muxedConns[peerId.toB58String()].socket
var mh = getMultiaddr(socket)
ps.identify({
pbs.identify({
protocolVersion: 'na',
agentVersion: 'na',
publicKey: peerInfoSelf.id.pubKey,
listenAddrs: peerInfoSelf.multiaddrs.map(function (mh) {
return mh.buffer
publicKey: peerInfo.id.pubKey,
listenAddrs: peerInfo.multiaddrs.map(function (ma) {
return ma.buffer
}),
observedAddr: mh.buffer
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : null
})
// TODO: Pass the new discovered info about the peer that contacted us
// to something like the Kademlia Router, so the peerInfo for this peer
// is fresh
// - before this was exectued through a event emitter
// self.emit('peer-update', {
// peerId: peerId,
// listenAddrs: msg.listenAddrs.map(function (mhb) {
// return multiaddr(mhb)
// })
// })
ps.finalize()
pbs.finalize()
})
ps.pipe(conn).pipe(ps)
pbs.pipe(conn).pipe(pbs)
}
}
function getMultiaddr (socket) {
var mh
if (socket.remoteFamily === 'IPv6') {
var addr = new Address6(socket.remoteAddress)
if (addr.v4) {
var ip4 = addr.to4().correctForm()
mh = multiaddr('/ip4/' + ip4 + '/tcp/' + socket.remotePort)
} else {
mh = multiaddr('/ip6/' + socket.remoteAddress + '/tcp/' + socket.remotePort)
}
} else {
mh = multiaddr('/ip4/' + socket.remoteAddress + '/tcp/' + socket.remotePort)
}
return mh
}
function updateSelf (peerSelf, observedAddr) {
var omh = multiaddr(observedAddr)
if (!peerSelf.previousObservedAddrs) {
peerSelf.previousObservedAddrs = []
}
for (var i = 0; i < peerSelf.previousObservedAddrs.length; i++) {
if (peerSelf.previousObservedAddrs[i].toString() === omh.toString()) {
peerSelf.previousObservedAddrs.splice(i, 1)
addToSelf()
return
}
}
peerSelf.previousObservedAddrs.push(omh)
function addToSelf () {
var isIn = false
peerSelf.multiaddrs.forEach(function (mh) {
if (mh.toString() === omh.toString()) {
isIn = true
}
})
if (!isIn) {
peerSelf.multiaddrs.push(omh)
}
}
}*/

View File

@ -42,7 +42,7 @@ function Swarm (peerInfo) {
multiaddrs = [multiaddrs]
}
// TODO a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself)
// TODO a) filter the multiaddrs that are actually valid for this transport (use a func from the transport itself) (maybe even make the transport do that)
// b) if multiaddrs.length = 1, return the conn from the
// transport, otherwise, create a passthrough
@ -116,7 +116,7 @@ function Swarm (peerInfo) {
// {
// peerIdB58: {
// muxer: <muxer>
// rawSocket: socket // to abstract info required for the Identify Protocol
// conn: <transport socket> // to extract info required for the Identify Protocol
// }
// }
this.muxedConns = {}
@ -136,12 +136,19 @@ function Swarm (peerInfo) {
// for listening
this.handle(muxer.multicodec, (conn) => {
const muxedConn = muxer(conn, true)
muxedConn.on('stream', connHandler)
muxedConn.on('stream', (conn) => {
connHandler(conn)
})
// if identify is enabled, attempt to do it for muxer reuse
if (this.identify) {
identify.exec(muxedConn, (err, pi) => {
if (err) {}
// TODO muxedConns[pi.id.toB58String()].muxer = muxedConn
identify.exec(conn, muxedConn, peerInfo, (err, pi) => {
if (err) {
return console.log('Identify exec failed', err)
}
this.muxedConns[pi.id.toB58String()] = {}
this.muxedConns[pi.id.toB58String()].muxer = muxedConn
this.muxedConns[pi.id.toB58String()].conn = conn // to be able to extract addrs
})
}
})
@ -151,18 +158,19 @@ function Swarm (peerInfo) {
this.identify = false
this.connection.reuse = () => {
this.identify = true
this.handle(identify.multicodec, identify.handler(peerInfo))
this.handle(identify.multicodec, identify.handler(peerInfo, this))
}
const self = this // couldn't get rid of this
const self = this // prefered this to bind
// incomming connection handler
function connHandler (conn) {
var msS = new multistream.Select()
msS.handle(conn)
Object.keys(self.protocols).forEach(function (protocol) {
Object.keys(self.protocols).forEach((protocol) => {
if (!protocol) { return }
msS.addHandler(protocol, self.protocols[protocol])
})
msS.handle(conn)
}
// higher level (public) API
@ -258,11 +266,17 @@ function Swarm (peerInfo) {
} else {
nextMuxer(muxers.shift())
}
return
}
const muxedConn = self.muxers[key](conn, false)
self.muxedConns[b58Id] = {}
self.muxedConns[b58Id].muxer = muxedConn
self.muxedConns[b58Id].conn = conn
// in case identify is on
muxedConn.on('stream', connHandler)
cb(null, muxedConn)
})
})

View File

@ -303,6 +303,10 @@ describe('stream muxing (on TCP)', function () {
peerB = new Peer()
peerC = new Peer()
// console.log('peer A', peerA.id.toB58String())
// console.log('peer B', peerB.id.toB58String())
// console.log('peer C', peerC.id.toB58String())
peerA.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9001'))
peerB.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9002'))
peerC.multiaddr.add(multiaddr('/ip4/127.0.0.1/tcp/9003'))
@ -386,7 +390,7 @@ describe('stream muxing (on TCP)', function () {
})
})
it.skip('enable identify to reuse incomming muxed conn', (done) => {
it('enable identify to reuse incomming muxed conn', (done) => {
swarmA.connection.reuse()
swarmC.connection.reuse()
@ -395,7 +399,8 @@ describe('stream muxing (on TCP)', function () {
setTimeout(() => {
expect(Object.keys(swarmC.muxedConns).length).to.equal(1)
expect(Object.keys(swarmA.muxedConns).length).to.equal(2)
}, 100)
done()
}, 500)
})
})
})