mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-12 00:31:22 +00:00
dial a conn + test
This commit is contained in:
111
src/swarm.js
111
src/swarm.js
@ -1,4 +1,5 @@
|
||||
var multistream = require('multistream-select')
|
||||
var async = require('async')
|
||||
|
||||
exports = module.exports = Swarm
|
||||
|
||||
@ -27,6 +28,8 @@ function Swarm (peerInfo) {
|
||||
|
||||
self.protocols = {}
|
||||
|
||||
self.muxer
|
||||
|
||||
// public interface
|
||||
|
||||
self.addTransport = function (name, transport, options, dialOptions, listenOptions, callback) {
|
||||
@ -63,6 +66,112 @@ function Swarm (peerInfo) {
|
||||
|
||||
self.dial = function (peerInfo, options, protocol, callback) {
|
||||
// 1. check if we have transports we support
|
||||
// 2. check if we have a conn waiting
|
||||
// 3. check if we have a stream muxer available
|
||||
|
||||
if (typeof protocol === 'function') {
|
||||
callback = protocol
|
||||
protocol = undefined
|
||||
}
|
||||
|
||||
// check if a conn is waiting
|
||||
// if it is and protocol was selected, jump into multistreamHandshake
|
||||
// if it is and no protocol was selected, do nothing and call and empty callback
|
||||
|
||||
if (self.conns[peerInfo.id.toB58String()]) {
|
||||
if (protocol) {
|
||||
multistreamHandshake(self.conns[peerInfo.id.toB58String()])
|
||||
self.conns[peerInfo.id.toB58String()] = undefined
|
||||
return
|
||||
} else {
|
||||
return callback()
|
||||
}
|
||||
}
|
||||
|
||||
// check if a stream muxer for this peer is available
|
||||
if (self.muxedConns[peerInfo.id.toB58String()]) {
|
||||
return openMuxedStream()
|
||||
}
|
||||
|
||||
// Creating a new conn with this peer routine
|
||||
|
||||
// TODO - check if there is a preference in protocol to use on
|
||||
// options.protocol
|
||||
var supportedTransports = Object.keys(self.protocols)
|
||||
var multiaddrs = peerInfo.multiaddrs.filter(function (multiaddr) {
|
||||
return multiaddr.protoNames().some(function (proto) {
|
||||
return supportedTransports.indexOf(proto) >= 0
|
||||
})
|
||||
})
|
||||
|
||||
var conn
|
||||
|
||||
async.eachSeries(multiaddrs, function (multiaddr, next) {
|
||||
if (conn) {
|
||||
return next()
|
||||
}
|
||||
|
||||
var transportName = getTransportNameForMultiaddr(multiaddr)
|
||||
var transport = self.transports[transportName]
|
||||
var dialOptions = clone(transport.dialOptions)
|
||||
dialOptions.ready = connected
|
||||
|
||||
var connTry = transport.transport.dial(multiaddr, dialOptions)
|
||||
|
||||
connTry.once('error', function (err) {
|
||||
if (err) {
|
||||
return // TODO handle error
|
||||
}
|
||||
next() // try next multiaddr
|
||||
})
|
||||
|
||||
function connected () {
|
||||
conn = connTry
|
||||
next()
|
||||
}
|
||||
|
||||
function getTransportNameForMultiaddr (multiaddr) {
|
||||
// this works for all those ip + transport + port tripplets
|
||||
return multiaddr.protoNames()[1]
|
||||
}
|
||||
|
||||
function clone (obj) {
|
||||
var target = {}
|
||||
for (var i in obj) {
|
||||
if (obj.hasOwnProperty(i)) {
|
||||
target[i] = obj[i]
|
||||
}
|
||||
}
|
||||
return target
|
||||
}
|
||||
}, done)
|
||||
|
||||
function done () {
|
||||
// TODO apply upgrades
|
||||
// TODO apply stream muxer
|
||||
// if no protocol is selected, save it in the pool
|
||||
// if protocol is selected, multistream that protocol
|
||||
|
||||
if (protocol) {
|
||||
multistreamHandshake(conn)
|
||||
} else {
|
||||
self.conns[peerInfo.id.toB58String()] = conn
|
||||
callback()
|
||||
}
|
||||
}
|
||||
|
||||
function openMuxedStream () {
|
||||
// 1. create a new stream on this muxedConn and pass that to
|
||||
// multistreamHanshake
|
||||
}
|
||||
|
||||
function multistreamHandshake (conn) {
|
||||
var msS = new multistream.Select()
|
||||
msS.handle(conn)
|
||||
self.protocols.forEach(function (protocol) {
|
||||
msS.addHandler(protocol, self.protocols[protocol])
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
self.closeListener = function (transportName, callback) {
|
||||
@ -88,7 +197,7 @@ function Swarm (peerInfo) {
|
||||
function listen (conn) {
|
||||
console.log('Received new connection')
|
||||
// TODO apply upgrades
|
||||
// TODO then add StreamMuxer if available
|
||||
// TODO then add StreamMuxer if available (and point streams from muxer to userProtocolMuxer)
|
||||
|
||||
// if no stream muxer, then
|
||||
userProtocolMuxer(conn)
|
||||
|
@ -38,9 +38,32 @@ experiment('Without a Stream Muxer', function () {
|
||||
})
|
||||
|
||||
test('dial a conn', function (done) {
|
||||
done()
|
||||
var mh1 = multiaddr('/ip4/127.0.0.1/tcp/8010')
|
||||
var p1 = new Peer(Id.create(), [])
|
||||
var sw1 = new Swarm(p1)
|
||||
sw1.addTransport('tcp', tcp, { multiaddr: mh1 }, {}, {port: 8010}, ready)
|
||||
|
||||
var mh2 = multiaddr('/ip4/127.0.0.1/tcp/8020')
|
||||
var p2 = new Peer(Id.create(), [])
|
||||
var sw2 = new Swarm(p2)
|
||||
sw2.addTransport('tcp', tcp, { multiaddr: mh2 }, {}, {port: 8020}, ready)
|
||||
|
||||
var count = 0
|
||||
|
||||
function ready () {
|
||||
count++
|
||||
if (count < 2) {
|
||||
return
|
||||
}
|
||||
|
||||
sw1.dial(p2, {}, function () {
|
||||
expect(Object.keys(sw1.conns).length).to.equal(1)
|
||||
done()
|
||||
})
|
||||
}
|
||||
})
|
||||
test('dial a conn on a protocol', function (done) { done() })
|
||||
test('dial a protocol on a previous created conn', function (done) { done() })
|
||||
test('add an upgrade', function (done) { done() })
|
||||
test('dial a conn on top of a upgrade', function (done) { done() })
|
||||
test('dial a conn on a protocol on top of a upgrade', function (done) { done() })
|
||||
|
Reference in New Issue
Block a user