feat: registrar (#471)

* feat: peer-store v0

* feat: registrar

* chore: apply suggestions from code review

Co-Authored-By: Jacob Heun <jacobheun@gmail.com>

* chore: address review

* chore: support multiple conns

* chore: address review

* fix: no remote peer from topology on disconnect
This commit is contained in:
Vasco Santos 2019-11-06 15:47:11 +01:00 committed by Jacob Heun
parent 582094a834
commit 9d52b80c45
8 changed files with 594 additions and 8 deletions

View File

@ -0,0 +1,108 @@
'use strict'
const assert = require('assert')
class Topology {
/**
* @param {Object} props
* @param {number} props.min minimum needed connections (default: 0)
* @param {number} props.max maximum needed connections (default: Infinity)
* @param {Array<string>} props.multicodecs protocol multicodecs
* @param {Object} props.handlers
* @param {function} props.handlers.onConnect protocol "onConnect" handler
* @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler
* @constructor
*/
constructor ({
min = 0,
max = Infinity,
multicodecs,
handlers
}) {
assert(multicodecs, 'one or more multicodec should be provided')
assert(handlers, 'the handlers should be provided')
assert(handlers.onConnect && typeof handlers.onConnect === 'function',
'the \'onConnect\' handler must be provided')
assert(handlers.onDisconnect && typeof handlers.onDisconnect === 'function',
'the \'onDisconnect\' handler must be provided')
this.multicodecs = Array.isArray(multicodecs) ? multicodecs : [multicodecs]
this.min = min
this.max = max
// Handlers
this._onConnect = handlers.onConnect
this._onDisconnect = handlers.onDisconnect
this.peers = new Map()
this._registrar = undefined
this._onProtocolChange = this._onProtocolChange.bind(this)
}
set registrar (registrar) {
this._registrar = registrar
this._registrar.peerStore.on('change:protocols', this._onProtocolChange)
// Update topology peers
this._updatePeers(this._registrar.peerStore.peers.values())
}
/**
* Update topology.
* @param {Array<PeerInfo>} peerInfoIterable
* @returns {void}
*/
_updatePeers (peerInfoIterable) {
for (const peerInfo of peerInfoIterable) {
if (this.multicodecs.filter(multicodec => peerInfo.protocols.has(multicodec))) {
// Add the peer regardless of whether or not there is currently a connection
this.peers.set(peerInfo.id.toB58String(), peerInfo)
// If there is a connection, call _onConnect
const connection = this._registrar.getConnection(peerInfo)
connection && this._onConnect(peerInfo, connection)
} else {
// Remove any peers we might be tracking that are no longer of value to us
this.peers.delete(peerInfo.id.toB58String())
}
}
}
/**
* Notify protocol of peer disconnected.
* @param {PeerInfo} peerInfo
* @param {Error} [error]
* @returns {void}
*/
disconnect (peerInfo, error) {
this._onDisconnect(peerInfo, error)
}
/**
* Check if a new peer support the multicodecs for this topology.
* @param {Object} props
* @param {PeerInfo} props.peerInfo
* @param {Array<string>} props.protocols
*/
_onProtocolChange ({ peerInfo, protocols }) {
const existingPeer = this.peers.get(peerInfo.id.toB58String())
const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol))
// Not supporting the protocol anymore?
if (existingPeer && hasProtocol.length === 0) {
this._onDisconnect({
peerInfo
})
}
// New to protocol support
for (const protocol of protocols) {
if (this.multicodecs.includes(protocol)) {
this._updatePeers([peerInfo])
return
}
}
}
}
module.exports = Topology

View File

@ -7,14 +7,14 @@ const errCode = require('err-code')
/**
* Converts the given `peer` to a `PeerInfo` instance.
* The `PeerBook` will be checked for the resulting peer, and
* the peer will be updated in the `PeerBook`.
* The `PeerStore` will be checked for the resulting peer, and
* the peer will be updated in the `PeerStore`.
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer
* @param {PeerBook} peerBook
* @param {PeerStore} peerStore
* @returns {PeerInfo}
*/
function getPeerInfo (peer, peerBook) {
function getPeerInfo (peer, peerStore) {
if (typeof peer === 'string') {
peer = multiaddr(peer)
}
@ -38,7 +38,7 @@ function getPeerInfo (peer, peerBook) {
addr && peer.multiaddrs.add(addr)
return peerBook ? peerBook.put(peer) : peer
return peerStore ? peerStore.put(peer) : peer
}
/**
@ -54,7 +54,7 @@ function getPeerInfoRemote (peer, libp2p) {
let peerInfo
try {
peerInfo = getPeerInfo(peer, libp2p.peerBook)
peerInfo = getPeerInfo(peer, libp2p.peerStore)
} catch (err) {
return Promise.reject(errCode(
new Error(`${peer} is not a valid peer type`),

View File

@ -29,6 +29,7 @@ const Dialer = require('./dialer')
const TransportManager = require('./transport-manager')
const Upgrader = require('./upgrader')
const PeerStore = require('./peer-store')
const Registrar = require('./registrar')
const notStarted = (action, state) => {
return errCode(
@ -71,10 +72,13 @@ class Libp2p extends EventEmitter {
const peerInfo = getPeerInfo(connection.remotePeer)
this.peerStore.put(peerInfo)
this.registrar.onConnect(peerInfo, connection)
this.emit('peer:connect', peerInfo)
},
onConnectionEnd: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)
this.registrar.onDisconnect(peerInfo, connection)
this.emit('peer:disconnect', peerInfo)
}
})
@ -108,6 +112,10 @@ class Libp2p extends EventEmitter {
transportManager: this.transportManager
})
this.registrar = new Registrar({ peerStore: this.peerStore })
this.handle = this.handle.bind(this)
this.registrar.handle = this.handle
// Attach private network protector
if (this._modules.connProtector) {
this.upgrader.protector = this._modules.connProtector

139
src/registrar.js Normal file
View File

@ -0,0 +1,139 @@
'use strict'
const assert = require('assert')
const debug = require('debug')
const log = debug('libp2p:peer-store')
log.error = debug('libp2p:peer-store:error')
const { Connection } = require('libp2p-interfaces/src/connection')
const PeerInfo = require('peer-info')
const Toplogy = require('./connection-manager/topology')
/**
* Responsible for notifying registered protocols of events in the network.
*/
class Registrar {
/**
* @param {Object} props
* @param {PeerStore} props.peerStore
* @constructor
*/
constructor ({ peerStore }) {
this.peerStore = peerStore
/**
* Map of connections per peer
* TODO: this should be handled by connectionManager
* @type {Map<string, Array<conn>>}
*/
this.connections = new Map()
/**
* Map of topologies
*
* @type {Map<string, object>}
*/
this.topologies = new Map()
this._handle = undefined
}
get handle () {
return this._handle
}
set handle (handle) {
this._handle = handle
}
/**
* Add a new connected peer to the record
* TODO: this should live in the ConnectionManager
* @param {PeerInfo} peerInfo
* @param {Connection} conn
* @returns {void}
*/
onConnect (peerInfo, conn) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
assert(Connection.isConnection(conn), 'conn must be an instance of interface-connection')
const id = peerInfo.id.toB58String()
const storedConn = this.connections.get(id)
if (storedConn) {
storedConn.push(conn)
} else {
this.connections.set(id, [conn])
}
}
/**
* Remove a disconnected peer from the record
* TODO: this should live in the ConnectionManager
* @param {PeerInfo} peerInfo
* @param {Connection} connection
* @param {Error} [error]
* @returns {void}
*/
onDisconnect (peerInfo, connection, error) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
const id = peerInfo.id.toB58String()
let storedConn = this.connections.get(id)
if (storedConn && storedConn.length > 1) {
storedConn = storedConn.filter((conn) => conn.id === connection.id)
} else if (storedConn) {
for (const [, topology] of this.topologies) {
topology.disconnect(peerInfo, error)
}
this.connections.delete(peerInfo.id.toB58String())
}
}
/**
* Get a connection with a peer.
* @param {PeerInfo} peerInfo
* @returns {Connection}
*/
getConnection (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
// TODO: what should we return
return this.connections.get(peerInfo.id.toB58String())[0]
}
/**
* Register handlers for a set of multicodecs given
* @param {Object} topologyProps properties for topology
* @param {Array<string>|string} topologyProps.multicodecs
* @param {Object} topologyProps.handlers
* @param {function} topologyProps.handlers.onConnect
* @param {function} topologyProps.handlers.onDisconnect
* @return {string} registrar identifier
*/
register (topologyProps) {
// Create multicodec topology
const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
const topology = new Toplogy(topologyProps)
this.topologies.set(id, topology)
// Set registrar
topology.registrar = this
return id
}
/**
* Unregister topology.
* @param {string} id registrar identifier
* @return {boolean} unregistered successfully
*/
unregister (id) {
return this.topologies.delete(id)
}
}
module.exports = Registrar

View File

@ -0,0 +1,57 @@
'use strict'
/* eslint-env mocha */
const chai = require('chai')
chai.use(require('dirty-chai'))
const { expect } = chai
const sinon = require('sinon')
const mergeOptions = require('merge-options')
const multiaddr = require('multiaddr')
const Libp2p = require('../../src')
const baseOptions = require('../utils/base-options')
const peerUtils = require('../utils/creators/peer')
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
describe('registrar on dial', () => {
let peerInfo
let remotePeerInfo
let libp2p
let remoteLibp2p
let remoteAddr
before(async () => {
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2)
remoteLibp2p = new Libp2p(mergeOptions(baseOptions, {
peerInfo: remotePeerInfo
}))
await remoteLibp2p.transportManager.listen([listenAddr])
remoteAddr = remoteLibp2p.transportManager.getAddrs()[0]
})
after(async () => {
sinon.restore()
await remoteLibp2p.stop()
libp2p && await libp2p.stop()
})
it('should inform registrar of a new connection', async () => {
libp2p = new Libp2p(mergeOptions(baseOptions, {
peerInfo
}))
sinon.spy(remoteLibp2p.registrar, 'onConnect')
await libp2p.dial(remoteAddr)
expect(remoteLibp2p.registrar.onConnect.callCount).to.equal(1)
const libp2pConn = libp2p.registrar.getConnection(remotePeerInfo)
expect(libp2pConn).to.exist()
const remoteConn = remoteLibp2p.registrar.getConnection(peerInfo)
expect(remoteConn).to.exist()
})
})

View File

@ -0,0 +1,224 @@
'use strict'
/* eslint-env mocha */
const chai = require('chai')
chai.use(require('dirty-chai'))
const { expect } = chai
const pDefer = require('p-defer')
const PeerInfo = require('peer-info')
const PeerStore = require('../../src/peer-store')
const Registrar = require('../../src/registrar')
const { createMockConnection } = require('./utils')
const multicodec = '/test/1.0.0'
describe('registrar', () => {
let peerStore, registrar
describe('errors', () => {
beforeEach(() => {
peerStore = new PeerStore()
registrar = new Registrar({ peerStore })
})
it('should fail to register a protocol if no multicodec is provided', () => {
try {
registrar.register()
} catch (err) {
expect(err).to.exist()
return
}
throw new Error('should fail to register a protocol if no multicodec is provided')
})
it('should fail to register a protocol if no handlers are provided', () => {
const topologyProps = {
multicodecs: multicodec
}
try {
registrar.register(topologyProps)
} catch (err) {
expect(err).to.exist()
return
}
throw new Error('should fail to register a protocol if no handlers are provided')
})
it('should fail to register a protocol if the onConnect handler is not provided', () => {
const topologyProps = {
multicodecs: multicodec,
handlers: {
onDisconnect: () => { }
}
}
try {
registrar.register(topologyProps)
} catch (err) {
expect(err).to.exist()
return
}
throw new Error('should fail to register a protocol if the onConnect handler is not provided')
})
it('should fail to register a protocol if the onDisconnect handler is not provided', () => {
const topologyProps = {
multicodecs: multicodec,
handlers: {
onConnect: () => { }
}
}
try {
registrar.register(topologyProps)
} catch (err) {
expect(err).to.exist()
return
}
throw new Error('should fail to register a protocol if the onDisconnect handler is not provided')
})
})
describe('registration', () => {
beforeEach(() => {
peerStore = new PeerStore()
registrar = new Registrar({ peerStore })
})
it('should be able to register a protocol', () => {
const topologyProps = {
handlers: {
onConnect: () => { },
onDisconnect: () => { }
},
multicodecs: multicodec
}
const identifier = registrar.register(topologyProps)
expect(identifier).to.exist()
})
it('should be able to unregister a protocol', () => {
const topologyProps = {
handlers: {
onConnect: () => { },
onDisconnect: () => { }
},
multicodecs: multicodec
}
const identifier = registrar.register(topologyProps)
const success = registrar.unregister(identifier)
expect(success).to.eql(true)
})
it('should fail to unregister if no register was made', () => {
const success = registrar.unregister('bad-identifier')
expect(success).to.eql(false)
})
it('should call onConnect handler for connected peers after register', async () => {
const onConnectDefer = pDefer()
const onDisconnectDefer = pDefer()
// Setup connections before registrar
const conn = await createMockConnection()
const remotePeerInfo = await PeerInfo.create(conn.remotePeer)
// Add protocol to peer
remotePeerInfo.protocols.add(multicodec)
// Add connected peer to peerStore and registrar
peerStore.put(remotePeerInfo)
registrar.onConnect(remotePeerInfo, conn)
expect(registrar.connections.size).to.eql(1)
const topologyProps = {
multicodecs: multicodec,
handlers: {
onConnect: (peerInfo, connection) => {
expect(peerInfo.id.toB58String()).to.eql(remotePeerInfo.id.toB58String())
expect(connection.id).to.eql(conn.id)
onConnectDefer.resolve()
},
onDisconnect: (peerInfo) => {
expect(peerInfo.id.toB58String()).to.eql(remotePeerInfo.id.toB58String())
onDisconnectDefer.resolve()
}
}
}
// Register protocol
const identifier = registrar.register(topologyProps)
const topology = registrar.topologies.get(identifier)
// Topology created
expect(topology).to.exist()
expect(topology.peers.size).to.eql(1)
registrar.onDisconnect(remotePeerInfo)
expect(registrar.connections.size).to.eql(0)
expect(topology.peers.size).to.eql(1) // topology should keep the peer
// Wait for handlers to be called
return Promise.all([
onConnectDefer.promise,
onDisconnectDefer.promise
])
})
it('should call onConnect handler after register, once a peer is connected and protocols are updated', async () => {
const onConnectDefer = pDefer()
const onDisconnectDefer = pDefer()
const topologyProps = {
multicodecs: multicodec,
handlers: {
onConnect: () => {
onConnectDefer.resolve()
},
onDisconnect: () => {
onDisconnectDefer.resolve()
}
}
}
// Register protocol
const identifier = registrar.register(topologyProps)
const topology = registrar.topologies.get(identifier)
// Topology created
expect(topology).to.exist()
expect(topology.peers.size).to.eql(0)
expect(registrar.connections.size).to.eql(0)
// Setup connections before registrar
const conn = await createMockConnection()
const peerInfo = await PeerInfo.create(conn.remotePeer)
// Add connected peer to peerStore and registrar
peerStore.put(peerInfo)
registrar.onConnect(peerInfo, conn)
// Add protocol to peer and update it
peerInfo.protocols.add(multicodec)
peerStore.put(peerInfo)
await onConnectDefer.promise
expect(topology.peers.size).to.eql(1)
// Remove protocol to peer and update it
peerInfo.protocols.delete(multicodec)
peerStore.put(peerInfo)
await onDisconnectDefer.promise
})
})
})

50
test/registrar/utils.js Normal file
View File

@ -0,0 +1,50 @@
'use strict'
const { Connection } = require('libp2p-interfaces/src/connection')
const multiaddr = require('multiaddr')
const pair = require('it-pair')
const peerUtils = require('../utils/creators/peer')
module.exports.createMockConnection = async (properties = {}) => {
const localAddr = multiaddr('/ip4/127.0.0.1/tcp/8080')
const remoteAddr = multiaddr('/ip4/127.0.0.1/tcp/8081')
const [localPeer, remotePeer] = await peerUtils.createPeerInfoFromFixture(2)
const openStreams = []
let streamId = 0
return new Connection({
localPeer: localPeer.id,
remotePeer: remotePeer.id,
localAddr,
remoteAddr,
stat: {
timeline: {
open: Date.now() - 10,
upgraded: Date.now()
},
direction: 'outbound',
encryption: '/secio/1.0.0',
multiplexer: '/mplex/6.7.0'
},
newStream: (protocols) => {
const id = streamId++
const stream = pair()
stream.close = () => stream.sink([])
stream.id = id
openStreams.push(stream)
return {
stream,
protocol: protocols[0]
}
},
close: () => { },
getStreams: () => openStreams,
...properties
})
}

View File

@ -2,12 +2,12 @@
const Transport = require('libp2p-tcp')
const Muxer = require('libp2p-mplex')
const mockCrypto = require('../utils/mockCrypto')
const Crypto = require('../../src/insecure/plaintext')
module.exports = {
modules: {
transport: [Transport],
streamMuxer: [Muxer],
connEncryption: [mockCrypto]
connEncryption: [Crypto]
}
}