chore: use topology interface

This commit is contained in:
Vasco Santos 2019-11-12 15:34:55 +01:00
parent cdfd02306f
commit 4cc9736485
4 changed files with 19 additions and 182 deletions

View File

@ -55,7 +55,7 @@
"it-protocol-buffers": "^0.2.0", "it-protocol-buffers": "^0.2.0",
"latency-monitor": "~0.2.1", "latency-monitor": "~0.2.1",
"libp2p-crypto": "^0.17.1", "libp2p-crypto": "^0.17.1",
"libp2p-interfaces": "^0.1.3", "libp2p-interfaces": "^0.1.4",
"mafmt": "^7.0.0", "mafmt": "^7.0.0",
"merge-options": "^1.0.1", "merge-options": "^1.0.1",
"moving-average": "^1.0.0", "moving-average": "^1.0.0",
@ -91,7 +91,7 @@
"libp2p-bootstrap": "^0.9.7", "libp2p-bootstrap": "^0.9.7",
"libp2p-delegated-content-routing": "^0.2.2", "libp2p-delegated-content-routing": "^0.2.2",
"libp2p-delegated-peer-routing": "^0.2.2", "libp2p-delegated-peer-routing": "^0.2.2",
"libp2p-floodsub": "libp2p/js-libp2p-floodsub#refactor/async", "libp2p-floodsub": "^0.19.0",
"libp2p-gossipsub": "ChainSafe/gossipsub-js#refactor/async", "libp2p-gossipsub": "ChainSafe/gossipsub-js#refactor/async",
"libp2p-kad-dht": "^0.15.3", "libp2p-kad-dht": "^0.15.3",
"libp2p-mdns": "^0.12.3", "libp2p-mdns": "^0.12.3",

View File

@ -1,108 +0,0 @@
'use strict'
const assert = require('assert')
class Topology {
/**
* @param {Object} props
* @param {number} props.min minimum needed connections (default: 0)
* @param {number} props.max maximum needed connections (default: Infinity)
* @param {Array<string>} props.multicodecs protocol multicodecs
* @param {Object} props.handlers
* @param {function} props.handlers.onConnect protocol "onConnect" handler
* @param {function} props.handlers.onDisconnect protocol "onDisconnect" handler
* @constructor
*/
constructor ({
min = 0,
max = Infinity,
multicodecs,
handlers
}) {
assert(multicodecs, 'one or more multicodec should be provided')
assert(handlers, 'the handlers should be provided')
assert(handlers.onConnect && typeof handlers.onConnect === 'function',
'the \'onConnect\' handler must be provided')
assert(handlers.onDisconnect && typeof handlers.onDisconnect === 'function',
'the \'onDisconnect\' handler must be provided')
this.multicodecs = Array.isArray(multicodecs) ? multicodecs : [multicodecs]
this.min = min
this.max = max
// Handlers
this._onConnect = handlers.onConnect
this._onDisconnect = handlers.onDisconnect
this.peers = new Map()
this._registrar = undefined
this._onProtocolChange = this._onProtocolChange.bind(this)
}
set registrar (registrar) {
this._registrar = registrar
this._registrar.peerStore.on('change:protocols', this._onProtocolChange)
// Update topology peers
this._updatePeers(this._registrar.peerStore.peers.values())
}
/**
* Update topology.
* @param {Array<PeerInfo>} peerInfoIterable
* @returns {void}
*/
_updatePeers (peerInfoIterable) {
for (const peerInfo of peerInfoIterable) {
if (this.multicodecs.filter(multicodec => peerInfo.protocols.has(multicodec))) {
// Add the peer regardless of whether or not there is currently a connection
this.peers.set(peerInfo.id.toB58String(), peerInfo)
// If there is a connection, call _onConnect
const connection = this._registrar.getConnection(peerInfo)
connection && this._onConnect(peerInfo, connection)
} else {
// Remove any peers we might be tracking that are no longer of value to us
this.peers.delete(peerInfo.id.toB58String())
}
}
}
/**
* Notify protocol of peer disconnected.
* @param {PeerInfo} peerInfo
* @param {Error} [error]
* @returns {void}
*/
disconnect (peerInfo, error) {
this._onDisconnect(peerInfo, error)
}
/**
* Check if a new peer support the multicodecs for this topology.
* @param {Object} props
* @param {PeerInfo} props.peerInfo
* @param {Array<string>} props.protocols
*/
_onProtocolChange ({ peerInfo, protocols }) {
const existingPeer = this.peers.get(peerInfo.id.toB58String())
const hasProtocol = protocols.filter(protocol => this.multicodecs.includes(protocol))
// Not supporting the protocol anymore?
if (existingPeer && hasProtocol.length === 0) {
this._onDisconnect({
peerInfo
})
}
// New to protocol support
for (const protocol of protocols) {
if (this.multicodecs.includes(protocol)) {
this._updatePeers([peerInfo])
return
}
}
}
}
module.exports = Topology

View File

@ -7,7 +7,6 @@ log.error = debug('libp2p:peer-store:error')
const { Connection } = require('libp2p-interfaces/src/connection') const { Connection } = require('libp2p-interfaces/src/connection')
const PeerInfo = require('peer-info') const PeerInfo = require('peer-info')
const Toplogy = require('./connection-manager/topology')
/** /**
* Responsible for notifying registered protocols of events in the network. * Responsible for notifying registered protocols of events in the network.
@ -106,17 +105,12 @@ class Registrar {
/** /**
* Register handlers for a set of multicodecs given * Register handlers for a set of multicodecs given
* @param {Object} topologyProps properties for topology * @param {Topology} topology protocol topology
* @param {Array<string>|string} topologyProps.multicodecs
* @param {Object} topologyProps.handlers
* @param {function} topologyProps.handlers.onConnect
* @param {function} topologyProps.handlers.onDisconnect
* @return {string} registrar identifier * @return {string} registrar identifier
*/ */
register (topologyProps) { register (topology) {
// Create multicodec topology // Create topology
const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now() const id = (parseInt(Math.random() * 1e9)).toString(36) + Date.now()
const topology = new Toplogy(topologyProps)
this.topologies.set(id, topology) this.topologies.set(id, topology)

View File

@ -7,6 +7,7 @@ const { expect } = chai
const pDefer = require('p-defer') const pDefer = require('p-defer')
const PeerInfo = require('peer-info') const PeerInfo = require('peer-info')
const Topology = require('libp2p-interfaces/src/topology/multicodec-topology')
const PeerStore = require('../../src/peer-store') const PeerStore = require('../../src/peer-store')
const Registrar = require('../../src/registrar') const Registrar = require('../../src/registrar')
const { createMockConnection } = require('./utils') const { createMockConnection } = require('./utils')
@ -32,53 +33,7 @@ describe('registrar', () => {
throw new Error('should fail to register a protocol if no multicodec is provided') throw new Error('should fail to register a protocol if no multicodec is provided')
}) })
it('should fail to register a protocol if no handlers are provided', () => { // TODO: not valid topology
const topologyProps = {
multicodecs: multicodec
}
try {
registrar.register(topologyProps)
} catch (err) {
expect(err).to.exist()
return
}
throw new Error('should fail to register a protocol if no handlers are provided')
})
it('should fail to register a protocol if the onConnect handler is not provided', () => {
const topologyProps = {
multicodecs: multicodec,
handlers: {
onDisconnect: () => { }
}
}
try {
registrar.register(topologyProps)
} catch (err) {
expect(err).to.exist()
return
}
throw new Error('should fail to register a protocol if the onConnect handler is not provided')
})
it('should fail to register a protocol if the onDisconnect handler is not provided', () => {
const topologyProps = {
multicodecs: multicodec,
handlers: {
onConnect: () => { }
}
}
try {
registrar.register(topologyProps)
} catch (err) {
expect(err).to.exist()
return
}
throw new Error('should fail to register a protocol if the onDisconnect handler is not provided')
})
}) })
describe('registration', () => { describe('registration', () => {
@ -88,13 +43,13 @@ describe('registrar', () => {
}) })
it('should be able to register a protocol', () => { it('should be able to register a protocol', () => {
const topologyProps = { const topologyProps = new Topology({
multicodecs: multicodec,
handlers: { handlers: {
onConnect: () => { }, onConnect: () => { },
onDisconnect: () => { } onDisconnect: () => { }
},
multicodecs: multicodec
} }
})
const identifier = registrar.register(topologyProps) const identifier = registrar.register(topologyProps)
@ -102,13 +57,13 @@ describe('registrar', () => {
}) })
it('should be able to unregister a protocol', () => { it('should be able to unregister a protocol', () => {
const topologyProps = { const topologyProps = new Topology({
multicodecs: multicodec,
handlers: { handlers: {
onConnect: () => { }, onConnect: () => { },
onDisconnect: () => { } onDisconnect: () => { }
},
multicodecs: multicodec
} }
})
const identifier = registrar.register(topologyProps) const identifier = registrar.register(topologyProps)
const success = registrar.unregister(identifier) const success = registrar.unregister(identifier)
@ -138,7 +93,7 @@ describe('registrar', () => {
registrar.onConnect(remotePeerInfo, conn) registrar.onConnect(remotePeerInfo, conn)
expect(registrar.connections.size).to.eql(1) expect(registrar.connections.size).to.eql(1)
const topologyProps = { const topologyProps = new Topology({
multicodecs: multicodec, multicodecs: multicodec,
handlers: { handlers: {
onConnect: (peerInfo, connection) => { onConnect: (peerInfo, connection) => {
@ -153,7 +108,7 @@ describe('registrar', () => {
onDisconnectDefer.resolve() onDisconnectDefer.resolve()
} }
} }
} })
// Register protocol // Register protocol
const identifier = registrar.register(topologyProps) const identifier = registrar.register(topologyProps)
@ -161,11 +116,9 @@ describe('registrar', () => {
// Topology created // Topology created
expect(topology).to.exist() expect(topology).to.exist()
expect(topology.peers.size).to.eql(1)
registrar.onDisconnect(remotePeerInfo) registrar.onDisconnect(remotePeerInfo)
expect(registrar.connections.size).to.eql(0) expect(registrar.connections.size).to.eql(0)
expect(topology.peers.size).to.eql(1) // topology should keep the peer
// Wait for handlers to be called // Wait for handlers to be called
return Promise.all([ return Promise.all([
@ -178,7 +131,7 @@ describe('registrar', () => {
const onConnectDefer = pDefer() const onConnectDefer = pDefer()
const onDisconnectDefer = pDefer() const onDisconnectDefer = pDefer()
const topologyProps = { const topologyProps = new Topology({
multicodecs: multicodec, multicodecs: multicodec,
handlers: { handlers: {
onConnect: () => { onConnect: () => {
@ -188,7 +141,7 @@ describe('registrar', () => {
onDisconnectDefer.resolve() onDisconnectDefer.resolve()
} }
} }
} })
// Register protocol // Register protocol
const identifier = registrar.register(topologyProps) const identifier = registrar.register(topologyProps)
@ -196,7 +149,6 @@ describe('registrar', () => {
// Topology created // Topology created
expect(topology).to.exist() expect(topology).to.exist()
expect(topology.peers.size).to.eql(0)
expect(registrar.connections.size).to.eql(0) expect(registrar.connections.size).to.eql(0)
// Setup connections before registrar // Setup connections before registrar
@ -212,7 +164,6 @@ describe('registrar', () => {
peerStore.put(peerInfo) peerStore.put(peerInfo)
await onConnectDefer.promise await onConnectDefer.promise
expect(topology.peers.size).to.eql(1)
// Remove protocol to peer and update it // Remove protocol to peer and update it
peerInfo.protocols.delete(multicodec) peerInfo.protocols.delete(multicodec)