mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-07-24 04:51:56 +00:00
new identify
This commit is contained in:
@@ -4,16 +4,16 @@ libp2p-swarm JavaScript implementation
|
||||
[](http://ipn.io)
|
||||
[](http://ipfs.io/)
|
||||
[](http://webchat.freenode.net/?channels=%23ipfs)
|
||||
[](https://travis-ci.org/diasdavid/js-libp2p-swarm)
|
||||
[](https://coveralls.io/github/diasdavid/js-libp2p-swarm?branch=master)
|
||||
[](https://david-dm.org/diasdavid/js-libp2p-swarm)
|
||||
[](https://travis-ci.org/libp2p/js-libp2p-swarm)
|
||||
[](https://coveralls.io/github/libp2p/js-libp2p-swarm?branch=master)
|
||||
[](https://david-dm.org/libp2p/js-libp2p-swarm)
|
||||
[](https://github.com/feross/standard)
|
||||
|
||||
> libp2p swarm implementation in JavaScript.
|
||||
|
||||
libp2p-swarm is a connection abstraction that is able to leverage several transports and connection upgrades, such as congestion control, channel encryption, the multiplexing of several streams in one connection, and more. It does this by bringing protocol multiplexing to the application level (instead of the traditional Port level) using multicodec and multistream.
|
||||
|
||||
libp2p-swarm is used by [libp2p](https://github.com/diasdavid/js-libp2p) but it can be also used as a standalone module.
|
||||
libp2p-swarm is used by [libp2p](https://github.com/libp2p/js-libp2p) but it can be also used as a standalone module.
|
||||
|
||||
## Table of Contents
|
||||
|
||||
|
@@ -43,7 +43,7 @@
|
||||
"gulp": "^3.9.1",
|
||||
"istanbul": "^0.4.3",
|
||||
"libp2p-multiplex": "^0.2.1",
|
||||
"libp2p-spdy": "^0.6.3",
|
||||
"libp2p-spdy": "^0.7.0",
|
||||
"libp2p-tcp": "^0.7.1",
|
||||
"libp2p-webrtc-star": "^0.3.1",
|
||||
"libp2p-websockets": "^0.7.0",
|
||||
@@ -56,9 +56,10 @@
|
||||
"bl": "^1.1.2",
|
||||
"browserify-zlib": "github:ipfs/browserify-zlib",
|
||||
"duplexify": "^3.4.3",
|
||||
"interface-connection": "^0.1.3",
|
||||
"interface-connection": "^0.1.7",
|
||||
"ip-address": "^5.8.0",
|
||||
"length-prefixed-stream": "^1.5.0",
|
||||
"libp2p-identify": "^0.1.1",
|
||||
"lodash.contains": "^2.4.3",
|
||||
"multiaddr": "^2.0.0",
|
||||
"multistream-select": "^0.9.0",
|
||||
@@ -75,4 +76,4 @@
|
||||
"Richard Littauer <richard.littauer@gmail.com>",
|
||||
"dignifiedquire <dignifiedquire@gmail.com>"
|
||||
]
|
||||
}
|
||||
}
|
||||
|
@@ -1,7 +1,8 @@
|
||||
'use strict'
|
||||
|
||||
const protocolMuxer = require('./protocol-muxer')
|
||||
const identify = require('./identify')
|
||||
const identify = require('libp2p-identify')
|
||||
const multistream = require('multistream-select')
|
||||
|
||||
module.exports = function connection (swarm) {
|
||||
return {
|
||||
@@ -15,43 +16,50 @@ module.exports = function connection (swarm) {
|
||||
swarm.handle(muxer.multicodec, (conn) => {
|
||||
const muxedConn = muxer(conn, true)
|
||||
|
||||
var peerIdForConn
|
||||
|
||||
muxedConn.on('stream', (conn) => {
|
||||
function gotId () {
|
||||
if (peerIdForConn) {
|
||||
conn.peerId = peerIdForConn
|
||||
protocolMuxer(swarm.protocols, conn)
|
||||
} else {
|
||||
setTimeout(gotId, 100)
|
||||
}
|
||||
}
|
||||
|
||||
// If identify happened, when we have the Id of the conn
|
||||
if (swarm.identify) {
|
||||
return gotId()
|
||||
}
|
||||
|
||||
protocolMuxer(swarm.protocols, conn)
|
||||
})
|
||||
|
||||
// if identify is enabled, attempt to do it for muxer reuse
|
||||
// If identify is enabled
|
||||
// 1. overload getPeerInfo
|
||||
// 2. call getPeerInfo
|
||||
// 3. add this conn to the pool
|
||||
if (swarm.identify) {
|
||||
identify.exec(conn, muxedConn, swarm._peerInfo, (err, pi) => {
|
||||
// overload peerInfo to use Identify instead
|
||||
conn.getPeerInfo = (cb) => {
|
||||
const conn = muxedConn.newStream()
|
||||
const ms = new multistream.Dialer()
|
||||
ms.handle(conn, (err) => {
|
||||
if (err) { return cb(err) }
|
||||
|
||||
ms.select(identify.multicodec, (err, conn) => {
|
||||
if (err) { return cb(err) }
|
||||
|
||||
identify.exec(conn, (err, peerInfo, observedAddrs) => {
|
||||
if (err) { return cb(err) }
|
||||
|
||||
observedAddrs.forEach((oa) => {
|
||||
swarm._peerInfo.multiaddr.addSafe(oa)
|
||||
})
|
||||
|
||||
cb(null, peerInfo)
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
conn.getPeerInfo((err, peerInfo) => {
|
||||
if (err) {
|
||||
return console.log('Identify exec failed', err)
|
||||
return console.log('Identify not successful')
|
||||
}
|
||||
swarm.muxedConns[peerInfo.id.toB58String()] = {
|
||||
muxer: muxedConn
|
||||
}
|
||||
|
||||
peerIdForConn = pi.id
|
||||
swarm.muxedConns[pi.id.toB58String()] = {}
|
||||
swarm.muxedConns[pi.id.toB58String()].muxer = muxedConn
|
||||
swarm.muxedConns[pi.id.toB58String()].conn = conn // to be able to extract addrs
|
||||
|
||||
swarm.emit('peer-mux-established', pi)
|
||||
|
||||
swarm.emit('peer-mux-established', peerInfo)
|
||||
muxedConn.on('close', () => {
|
||||
delete swarm.muxedConns[pi.id.toB58String()]
|
||||
swarm.emit('peer-mux-closed', pi)
|
||||
delete swarm.muxedConns[peerInfo.id.toB58String()]
|
||||
swarm.emit('peer-mux-closed', peerInfo)
|
||||
})
|
||||
})
|
||||
}
|
||||
@@ -60,7 +68,7 @@ module.exports = function connection (swarm) {
|
||||
|
||||
reuse () {
|
||||
swarm.identify = true
|
||||
swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo, swarm))
|
||||
swarm.handle(identify.multicodec, identify.handler(swarm._peerInfo))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
17
src/dial.js
17
src/dial.js
@@ -43,6 +43,8 @@ module.exports = function dial (swarm) {
|
||||
return proxyConn
|
||||
|
||||
function gotWarmedUpConn (conn) {
|
||||
conn.setPeerInfo(pi)
|
||||
|
||||
attemptMuxerUpgrade(conn, (err, muxer) => {
|
||||
if (!protocol) {
|
||||
if (err) {
|
||||
@@ -61,6 +63,13 @@ module.exports = function dial (swarm) {
|
||||
}
|
||||
|
||||
function gotMuxer (muxer) {
|
||||
if (swarm.identify) {
|
||||
// TODO: Consider:
|
||||
// 1. overload getPeerInfo
|
||||
// 2. exec identify (through getPeerInfo)
|
||||
// 3. update the peerInfo that is already stored in the conn
|
||||
}
|
||||
|
||||
openConnInMuxedConn(muxer, (conn) => {
|
||||
protocolHandshake(conn, protocol, callback)
|
||||
})
|
||||
@@ -88,7 +97,7 @@ module.exports = function dial (swarm) {
|
||||
cryptoDial()
|
||||
|
||||
function cryptoDial () {
|
||||
// currently, js-libp2p-swarm doesn't implement any crypto
|
||||
// currently, no crypto channel is implemented
|
||||
const ms = new multistream.Dialer()
|
||||
ms.handle(conn, (err) => {
|
||||
if (err) {
|
||||
@@ -133,7 +142,7 @@ module.exports = function dial (swarm) {
|
||||
const muxedConn = swarm.muxers[key](conn, false)
|
||||
swarm.muxedConns[b58Id] = {}
|
||||
swarm.muxedConns[b58Id].muxer = muxedConn
|
||||
swarm.muxedConns[b58Id].conn = conn
|
||||
// should not be needed anymore - swarm.muxedConns[b58Id].conn = conn
|
||||
|
||||
swarm.emit('peer-mux-established', pi)
|
||||
|
||||
@@ -142,9 +151,8 @@ module.exports = function dial (swarm) {
|
||||
swarm.emit('peer-mux-closed', pi)
|
||||
})
|
||||
|
||||
// in case identify is on
|
||||
// For incoming streams, in case identify is on
|
||||
muxedConn.on('stream', (conn) => {
|
||||
conn.peerId = pi.id
|
||||
protocolMuxer(swarm.protocols, conn)
|
||||
})
|
||||
|
||||
@@ -169,7 +177,6 @@ module.exports = function dial (swarm) {
|
||||
return callback(err)
|
||||
}
|
||||
proxyConn.setInnerConn(conn)
|
||||
proxyConn.peerId = pi.id
|
||||
callback(null, proxyConn)
|
||||
})
|
||||
})
|
||||
|
145
src/identify.js
145
src/identify.js
@@ -1,145 +0,0 @@
|
||||
/*
|
||||
* Identify is one of the protocols swarms speaks in order to
|
||||
* broadcast and learn about the ip:port pairs a specific peer
|
||||
* is available through and to know when a new stream muxer is
|
||||
* established, so a conn can be reused
|
||||
*/
|
||||
|
||||
'use strict'
|
||||
|
||||
const multistream = require('multistream-select')
|
||||
const fs = require('fs')
|
||||
const path = require('path')
|
||||
const PeerInfo = require('peer-info')
|
||||
const PeerId = require('peer-id')
|
||||
const multiaddr = require('multiaddr')
|
||||
const bl = require('bl')
|
||||
|
||||
const lpstream = require('length-prefixed-stream')
|
||||
const protobuf = require('protocol-buffers')
|
||||
const schema = fs.readFileSync(path.join(__dirname, 'identify.proto'))
|
||||
const idPb = protobuf(schema)
|
||||
|
||||
exports = module.exports
|
||||
exports.multicodec = '/ipfs/id/1.0.0'
|
||||
|
||||
exports.exec = (rawConn, muxer, pInfo, callback) => {
|
||||
// 1. open a stream
|
||||
// 2. multistream into identify
|
||||
// 3. send what I see from this other peer (extract fro conn)
|
||||
// 4. receive what the other peer sees from me
|
||||
// 4. callback with (err, peerInfo)
|
||||
|
||||
const conn = muxer.newStream()
|
||||
|
||||
const ms = new multistream.Dialer()
|
||||
ms.handle(conn, (err) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
ms.select(exports.multicodec, (err, conn) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
|
||||
const encode = lpstream.encode()
|
||||
const decode = lpstream.decode()
|
||||
|
||||
encode
|
||||
.pipe(conn)
|
||||
.pipe(decode)
|
||||
.pipe(bl((err, data) => {
|
||||
if (err) {
|
||||
return callback(err)
|
||||
}
|
||||
const msg = idPb.Identify.decode(data)
|
||||
if (hasObservedAddr(msg)) {
|
||||
pInfo.multiaddr.addSafe(multiaddr(msg.observedAddr))
|
||||
}
|
||||
|
||||
const pId = PeerId.createFromPubKey(msg.publicKey)
|
||||
const otherPInfo = new PeerInfo(pId)
|
||||
msg.listenAddrs.forEach((ma) => {
|
||||
otherPInfo.multiaddr.add(multiaddr(ma))
|
||||
})
|
||||
callback(null, otherPInfo)
|
||||
}))
|
||||
|
||||
rawConn.getObservedAddrs((err, addrs) => {
|
||||
if (err) {
|
||||
return
|
||||
}
|
||||
const obsMultiaddr = addrs[0]
|
||||
|
||||
let publicKey = new Buffer(0)
|
||||
if (pInfo.id.pubKey) {
|
||||
publicKey = pInfo.id.pubKey.bytes
|
||||
}
|
||||
|
||||
const msg = idPb.Identify.encode({
|
||||
protocolVersion: 'na',
|
||||
agentVersion: 'na',
|
||||
publicKey: publicKey,
|
||||
listenAddrs: pInfo.multiaddrs.map((mh) => mh.buffer),
|
||||
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
|
||||
})
|
||||
|
||||
encode.write(msg)
|
||||
encode.end()
|
||||
})
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
exports.handler = (pInfo, swarm) => {
|
||||
return (conn) => {
|
||||
// 1. receive incoming observed info about me
|
||||
// 2. update my own information (on peerInfo)
|
||||
// 3. send back what I see from the other (get from swarm.muxedConns[incPeerID].conn.getObservedAddrs()
|
||||
|
||||
const encode = lpstream.encode()
|
||||
const decode = lpstream.decode()
|
||||
|
||||
encode
|
||||
.pipe(conn)
|
||||
.pipe(decode)
|
||||
.pipe(bl((err, data) => {
|
||||
if (err) {
|
||||
console.log(new Error('Failed to decode lpm from identify'))
|
||||
return
|
||||
}
|
||||
const msg = idPb.Identify.decode(data)
|
||||
if (hasObservedAddr(msg)) {
|
||||
pInfo.multiaddr.addSafe(multiaddr(msg.observedAddr))
|
||||
}
|
||||
|
||||
const pId = PeerId.createFromPubKey(msg.publicKey)
|
||||
const conn = swarm.muxedConns[pId.toB58String()].conn
|
||||
conn.getObservedAddrs((err, addrs) => {
|
||||
if (err) {}
|
||||
const obsMultiaddr = addrs[0]
|
||||
|
||||
let publicKey = new Buffer(0)
|
||||
if (pInfo.id.pubKey) {
|
||||
publicKey = pInfo.id.pubKey.bytes
|
||||
}
|
||||
|
||||
const msgSend = idPb.Identify.encode({
|
||||
protocolVersion: 'na',
|
||||
agentVersion: 'na',
|
||||
publicKey: publicKey,
|
||||
listenAddrs: pInfo.multiaddrs.map((ma) => ma.buffer),
|
||||
observedAddr: obsMultiaddr ? obsMultiaddr.buffer : new Buffer('')
|
||||
})
|
||||
|
||||
encode.write(msgSend)
|
||||
encode.end()
|
||||
})
|
||||
}))
|
||||
}
|
||||
}
|
||||
|
||||
function hasObservedAddr (msg) {
|
||||
return msg.observedAddr && msg.observedAddr.length > 0
|
||||
}
|
@@ -1,25 +0,0 @@
|
||||
message Identify {
|
||||
|
||||
// protocolVersion determines compatibility between peers
|
||||
optional string protocolVersion = 5; // e.g. ipfs/1.0.0
|
||||
|
||||
// agentVersion is like a UserAgent string in browsers, or client version in bittorrent
|
||||
// includes the client name and client.
|
||||
optional string agentVersion = 6; // e.g. go-ipfs/0.1.0
|
||||
|
||||
// publicKey is this node's public key (which also gives its node.ID)
|
||||
// - may not need to be sent, as secure channel implies it has been sent.
|
||||
// - then again, if we change / disable secure channel, may still want it.
|
||||
optional bytes publicKey = 1;
|
||||
|
||||
// listenAddrs are the multiaddrs the sender node listens for open connections on
|
||||
repeated bytes listenAddrs = 2;
|
||||
|
||||
// oservedAddr is the multiaddr of the remote endpoint that the sender node perceives
|
||||
// this is useful information to convey to the other side, as it helps the remote endpoint
|
||||
// determine whether its connection to the local peer goes through NAT.
|
||||
optional bytes observedAddr = 4;
|
||||
|
||||
// (DEPRECATED) protocols are the services this node is running
|
||||
// repeated string protocols = 3;
|
||||
}
|
@@ -10,7 +10,8 @@ const Swarm = require('../src')
|
||||
const TCP = require('libp2p-tcp')
|
||||
const multiplex = require('libp2p-spdy')
|
||||
|
||||
describe('stream muxing with multiplex (on TCP)', function () {
|
||||
// TODO multiplex needs to be upgraded, like spdy, to work again
|
||||
describe.skip('stream muxing with multiplex (on TCP)', function () {
|
||||
this.timeout(60 * 1000)
|
||||
|
||||
var swarmA
|
||||
|
@@ -127,7 +127,36 @@ describe('stream muxing with spdy (on TCP)', function () {
|
||||
})
|
||||
})
|
||||
|
||||
it('make sure it does not blow up when the socket is closed', (done) => {
|
||||
it('with Identify, do getPeerInfo', (done) => {
|
||||
swarmA.handle('/banana/1.0.0', (conn) => {
|
||||
conn.getPeerInfo((err, peerInfoC) => {
|
||||
expect(err).to.not.exist
|
||||
expect(peerInfoC.id.toB58String()).to.equal(peerC.id.toB58String())
|
||||
})
|
||||
|
||||
conn.pipe(conn)
|
||||
})
|
||||
|
||||
swarmC.dial(peerA, '/banana/1.0.0', (err, conn) => {
|
||||
expect(err).to.not.exist
|
||||
setTimeout(() => {
|
||||
expect(Object.keys(swarmC.muxedConns).length).to.equal(1)
|
||||
expect(Object.keys(swarmA.muxedConns).length).to.equal(2)
|
||||
conn.getPeerInfo((err, peerInfoA) => {
|
||||
expect(err).to.not.exist
|
||||
expect(peerInfoA.id.toB58String()).to.equal(peerA.id.toB58String())
|
||||
conn.on('end', done)
|
||||
conn.resume()
|
||||
conn.end()
|
||||
})
|
||||
}, 500)
|
||||
})
|
||||
})
|
||||
|
||||
// This test is not possible as the raw conn is not exposed anymore
|
||||
// TODO: create a similar version, but that spawns a swarm in a
|
||||
// different proc
|
||||
it.skip('make sure it does not blow up when the socket is closed', (done) => {
|
||||
swarmD.connection.reuse()
|
||||
|
||||
let count = 0
|
||||
@@ -148,7 +177,10 @@ describe('stream muxing with spdy (on TCP)', function () {
|
||||
})
|
||||
})
|
||||
|
||||
it('blow up a socket, with WebSockets', (done) => {
|
||||
// This test is not possible as the raw conn is not exposed anymore
|
||||
// TODO: create a similar version, but that spawns a swarm in a
|
||||
// different proc
|
||||
it.skip('blow up a socket, with WebSockets', (done) => {
|
||||
var swarmE
|
||||
var peerE
|
||||
var swarmF
|
||||
|
@@ -198,13 +198,19 @@ describe('high level API - with everything mixed all together!', function () {
|
||||
|
||||
it('dial from tcp+ws to tcp+ws', (done) => {
|
||||
swarmC.handle('/mamao/1.0.0', (conn) => {
|
||||
expect(conn.peerId).to.exist
|
||||
conn.getPeerInfo((err, peerInfo) => {
|
||||
expect(err).to.not.exist
|
||||
expect(peerInfo).to.exist
|
||||
})
|
||||
conn.pipe(conn)
|
||||
})
|
||||
|
||||
swarmA.dial(peerC, '/mamao/1.0.0', (err, conn) => {
|
||||
expect(err).to.not.exist
|
||||
expect(conn.peerId).to.exist
|
||||
conn.getPeerInfo((err, peerInfo) => {
|
||||
expect(err).to.not.exist
|
||||
expect(peerInfo).to.exist
|
||||
})
|
||||
expect(Object.keys(swarmA.muxedConns).length).to.equal(2)
|
||||
conn.end()
|
||||
|
||||
|
Reference in New Issue
Block a user