mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-05-22 07:01:21 +00:00
feat: peer store (#470)
* feat: peer-store v0 * chore: apply suggestions from code review Co-Authored-By: Jacob Heun <jacobheun@gmail.com>
This commit is contained in:
parent
fba058370a
commit
53bcd2d745
@ -63,7 +63,6 @@
|
|||||||
"once": "^1.4.0",
|
"once": "^1.4.0",
|
||||||
"p-queue": "^6.1.1",
|
"p-queue": "^6.1.1",
|
||||||
"p-settle": "^3.1.0",
|
"p-settle": "^3.1.0",
|
||||||
"peer-book": "^0.9.1",
|
|
||||||
"peer-id": "^0.13.3",
|
"peer-id": "^0.13.3",
|
||||||
"peer-info": "^0.17.0",
|
"peer-info": "^0.17.0",
|
||||||
"promisify-es6": "^1.0.3",
|
"promisify-es6": "^1.0.3",
|
||||||
|
34
src/index.js
34
src/index.js
@ -11,7 +11,6 @@ const promisify = require('promisify-es6')
|
|||||||
const each = require('async/each')
|
const each = require('async/each')
|
||||||
const nextTick = require('async/nextTick')
|
const nextTick = require('async/nextTick')
|
||||||
|
|
||||||
const PeerBook = require('peer-book')
|
|
||||||
const PeerInfo = require('peer-info')
|
const PeerInfo = require('peer-info')
|
||||||
const multiaddr = require('multiaddr')
|
const multiaddr = require('multiaddr')
|
||||||
const Switch = require('./switch')
|
const Switch = require('./switch')
|
||||||
@ -29,6 +28,7 @@ const { codes } = require('./errors')
|
|||||||
const Dialer = require('./dialer')
|
const Dialer = require('./dialer')
|
||||||
const TransportManager = require('./transport-manager')
|
const TransportManager = require('./transport-manager')
|
||||||
const Upgrader = require('./upgrader')
|
const Upgrader = require('./upgrader')
|
||||||
|
const PeerStore = require('./peer-store')
|
||||||
|
|
||||||
const notStarted = (action, state) => {
|
const notStarted = (action, state) => {
|
||||||
return errCode(
|
return errCode(
|
||||||
@ -54,7 +54,7 @@ class Libp2p extends EventEmitter {
|
|||||||
|
|
||||||
this.datastore = this._options.datastore
|
this.datastore = this._options.datastore
|
||||||
this.peerInfo = this._options.peerInfo
|
this.peerInfo = this._options.peerInfo
|
||||||
this.peerBook = this._options.peerBook || new PeerBook()
|
this.peerStore = new PeerStore()
|
||||||
|
|
||||||
this._modules = this._options.modules
|
this._modules = this._options.modules
|
||||||
this._config = this._options.config
|
this._config = this._options.config
|
||||||
@ -62,13 +62,15 @@ class Libp2p extends EventEmitter {
|
|||||||
this._discovery = [] // Discovery service instances/references
|
this._discovery = [] // Discovery service instances/references
|
||||||
|
|
||||||
// create the switch, and listen for errors
|
// create the switch, and listen for errors
|
||||||
this._switch = new Switch(this.peerInfo, this.peerBook, this._options.switch)
|
this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch)
|
||||||
|
|
||||||
// Setup the Upgrader
|
// Setup the Upgrader
|
||||||
this.upgrader = new Upgrader({
|
this.upgrader = new Upgrader({
|
||||||
localPeer: this.peerInfo.id,
|
localPeer: this.peerInfo.id,
|
||||||
onConnection: (connection) => {
|
onConnection: (connection) => {
|
||||||
const peerInfo = getPeerInfo(connection.remotePeer)
|
const peerInfo = getPeerInfo(connection.remotePeer)
|
||||||
|
|
||||||
|
this.peerStore.put(peerInfo)
|
||||||
this.emit('peer:connect', peerInfo)
|
this.emit('peer:connect', peerInfo)
|
||||||
},
|
},
|
||||||
onConnectionEnd: (connection) => {
|
onConnectionEnd: (connection) => {
|
||||||
@ -179,10 +181,10 @@ class Libp2p extends EventEmitter {
|
|||||||
|
|
||||||
// Once we start, emit and dial any peers we may have already discovered
|
// Once we start, emit and dial any peers we may have already discovered
|
||||||
this.state.on('STARTED', () => {
|
this.state.on('STARTED', () => {
|
||||||
this.peerBook.getAllArray().forEach((peerInfo) => {
|
for (const peerInfo of this.peerStore.peers) {
|
||||||
this.emit('peer:discovery', peerInfo)
|
this.emit('peer:discovery', peerInfo)
|
||||||
this._maybeConnect(peerInfo)
|
this._maybeConnect(peerInfo)
|
||||||
})
|
}
|
||||||
})
|
})
|
||||||
|
|
||||||
this._peerDiscovered = this._peerDiscovered.bind(this)
|
this._peerDiscovered = this._peerDiscovered.bind(this)
|
||||||
@ -245,7 +247,7 @@ class Libp2p extends EventEmitter {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Dials to the provided peer. If successful, the `PeerInfo` of the
|
* Dials to the provided peer. If successful, the `PeerInfo` of the
|
||||||
* peer will be added to the nodes `PeerBook`
|
* peer will be added to the nodes `peerStore`
|
||||||
*
|
*
|
||||||
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
|
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
|
||||||
* @param {object} options
|
* @param {object} options
|
||||||
@ -258,7 +260,7 @@ class Libp2p extends EventEmitter {
|
|||||||
|
|
||||||
/**
|
/**
|
||||||
* Dials to the provided peer and handshakes with the given protocol.
|
* Dials to the provided peer and handshakes with the given protocol.
|
||||||
* If successful, the `PeerInfo` of the peer will be added to the nodes `PeerBook`,
|
* If successful, the `PeerInfo` of the peer will be added to the nodes `peerStore`,
|
||||||
* and the `Connection` will be sent in the callback
|
* and the `Connection` will be sent in the callback
|
||||||
*
|
*
|
||||||
* @async
|
* @async
|
||||||
@ -277,11 +279,19 @@ class Libp2p extends EventEmitter {
|
|||||||
connection = await this.dialer.connectToPeer(peer, options)
|
connection = await this.dialer.connectToPeer(peer, options)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
const peerInfo = getPeerInfo(connection.remotePeer)
|
||||||
|
|
||||||
// If a protocol was provided, create a new stream
|
// If a protocol was provided, create a new stream
|
||||||
if (protocols) {
|
if (protocols) {
|
||||||
return connection.newStream(protocols)
|
const stream = await connection.newStream(protocols)
|
||||||
|
|
||||||
|
peerInfo.protocols.add(stream.protocol)
|
||||||
|
this.peerStore.put(peerInfo)
|
||||||
|
|
||||||
|
return stream
|
||||||
}
|
}
|
||||||
|
|
||||||
|
this.peerStore.put(peerInfo)
|
||||||
return connection
|
return connection
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -369,12 +379,6 @@ class Libp2p extends EventEmitter {
|
|||||||
* the `peer:discovery` event. If auto dial is enabled for libp2p
|
* the `peer:discovery` event. If auto dial is enabled for libp2p
|
||||||
* and the current connection count is under the low watermark, the
|
* and the current connection count is under the low watermark, the
|
||||||
* peer will be dialed.
|
* peer will be dialed.
|
||||||
*
|
|
||||||
* TODO: If `peerBook.put` becomes centralized, https://github.com/libp2p/js-libp2p/issues/345,
|
|
||||||
* it would be ideal if only new peers were emitted. Currently, with
|
|
||||||
* other modules adding peers to the `PeerBook` we have no way of knowing
|
|
||||||
* if a peer is new or not, so it has to be emitted.
|
|
||||||
*
|
|
||||||
* @private
|
* @private
|
||||||
* @param {PeerInfo} peerInfo
|
* @param {PeerInfo} peerInfo
|
||||||
*/
|
*/
|
||||||
@ -383,7 +387,7 @@ class Libp2p extends EventEmitter {
|
|||||||
log.error(new Error(codes.ERR_DISCOVERED_SELF))
|
log.error(new Error(codes.ERR_DISCOVERED_SELF))
|
||||||
return
|
return
|
||||||
}
|
}
|
||||||
peerInfo = this.peerBook.put(peerInfo)
|
peerInfo = this.peerStore.put(peerInfo)
|
||||||
|
|
||||||
if (!this.isStarted()) return
|
if (!this.isStarted()) return
|
||||||
|
|
||||||
|
3
src/peer-store/README.md
Normal file
3
src/peer-store/README.md
Normal file
@ -0,0 +1,3 @@
|
|||||||
|
# Peerstore
|
||||||
|
|
||||||
|
WIP
|
190
src/peer-store/index.js
Normal file
190
src/peer-store/index.js
Normal file
@ -0,0 +1,190 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const assert = require('assert')
|
||||||
|
const debug = require('debug')
|
||||||
|
const log = debug('libp2p:peer-store')
|
||||||
|
log.error = debug('libp2p:peer-store:error')
|
||||||
|
|
||||||
|
const { EventEmitter } = require('events')
|
||||||
|
|
||||||
|
const PeerInfo = require('peer-info')
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Responsible for managing known peers, as well as their addresses and metadata
|
||||||
|
* @fires PeerStore#peer Emitted when a peer is connected to this node
|
||||||
|
* @fires PeerStore#change:protocols
|
||||||
|
* @fires PeerStore#change:multiaddrs
|
||||||
|
*/
|
||||||
|
class PeerStore extends EventEmitter {
|
||||||
|
constructor () {
|
||||||
|
super()
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Map of peers
|
||||||
|
*
|
||||||
|
* @type {Map<string, PeerInfo>}
|
||||||
|
*/
|
||||||
|
this.peers = new Map()
|
||||||
|
|
||||||
|
// TODO: Track ourselves. We should split `peerInfo` up into its pieces so we get better
|
||||||
|
// control and observability. This will be the initial step for removing PeerInfo
|
||||||
|
// https://github.com/libp2p/go-libp2p-core/blob/master/peerstore/peerstore.go
|
||||||
|
// this.addressBook = new Map()
|
||||||
|
// this.protoBook = new Map()
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Stores the peerInfo of a new peer.
|
||||||
|
* If already exist, its info is updated.
|
||||||
|
* @param {PeerInfo} peerInfo
|
||||||
|
*/
|
||||||
|
put (peerInfo) {
|
||||||
|
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
|
||||||
|
|
||||||
|
// Already know the peer?
|
||||||
|
if (this.peers.has(peerInfo.id.toB58String())) {
|
||||||
|
this.update(peerInfo)
|
||||||
|
} else {
|
||||||
|
this.add(peerInfo)
|
||||||
|
|
||||||
|
// Emit the new peer found
|
||||||
|
this.emit('peer', peerInfo)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Add a new peer to the store.
|
||||||
|
* @param {PeerInfo} peerInfo
|
||||||
|
*/
|
||||||
|
add (peerInfo) {
|
||||||
|
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
|
||||||
|
|
||||||
|
// Create new instance and add values to it
|
||||||
|
const newPeerInfo = new PeerInfo(peerInfo.id)
|
||||||
|
|
||||||
|
peerInfo.multiaddrs.forEach((ma) => newPeerInfo.multiaddrs.add(ma))
|
||||||
|
peerInfo.protocols.forEach((p) => newPeerInfo.protocols.add(p))
|
||||||
|
|
||||||
|
const connectedMa = peerInfo.isConnected()
|
||||||
|
connectedMa && newPeerInfo.connect(connectedMa)
|
||||||
|
|
||||||
|
const peerProxy = new Proxy(newPeerInfo, {
|
||||||
|
set: (obj, prop, value) => {
|
||||||
|
if (prop === 'multiaddrs') {
|
||||||
|
this.emit('change:multiaddrs', {
|
||||||
|
peerInfo: obj,
|
||||||
|
multiaddrs: value.toArray()
|
||||||
|
})
|
||||||
|
} else if (prop === 'protocols') {
|
||||||
|
this.emit('change:protocols', {
|
||||||
|
peerInfo: obj,
|
||||||
|
protocols: Array.from(value)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return Reflect.set(...arguments)
|
||||||
|
}
|
||||||
|
})
|
||||||
|
|
||||||
|
this.peers.set(peerInfo.id.toB58String(), peerProxy)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Updates an already known peer.
|
||||||
|
* @param {PeerInfo} peerInfo
|
||||||
|
*/
|
||||||
|
update (peerInfo) {
|
||||||
|
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
|
||||||
|
const id = peerInfo.id.toB58String()
|
||||||
|
const recorded = this.peers.get(id)
|
||||||
|
|
||||||
|
// pass active connection state
|
||||||
|
const ma = peerInfo.isConnected()
|
||||||
|
if (ma) {
|
||||||
|
recorded.connect(ma)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Verify new multiaddrs
|
||||||
|
// TODO: better track added and removed multiaddrs
|
||||||
|
const multiaddrsIntersection = [
|
||||||
|
...recorded.multiaddrs.toArray()
|
||||||
|
].filter((m) => peerInfo.multiaddrs.has(m))
|
||||||
|
|
||||||
|
if (multiaddrsIntersection.length !== peerInfo.multiaddrs.size ||
|
||||||
|
multiaddrsIntersection.length !== recorded.multiaddrs.size) {
|
||||||
|
// recorded.multiaddrs = peerInfo.multiaddrs
|
||||||
|
recorded.multiaddrs.clear()
|
||||||
|
|
||||||
|
for (const ma of peerInfo.multiaddrs.toArray()) {
|
||||||
|
recorded.multiaddrs.add(ma)
|
||||||
|
}
|
||||||
|
|
||||||
|
this.emit('change:multiaddrs', {
|
||||||
|
peerInfo: peerInfo,
|
||||||
|
multiaddrs: recorded.multiaddrs.toArray()
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update protocols
|
||||||
|
// TODO: better track added and removed protocols
|
||||||
|
const protocolsIntersection = new Set(
|
||||||
|
[...recorded.protocols].filter((p) => peerInfo.protocols.has(p))
|
||||||
|
)
|
||||||
|
|
||||||
|
if (protocolsIntersection.size !== peerInfo.protocols.size ||
|
||||||
|
protocolsIntersection.size !== recorded.protocols.size) {
|
||||||
|
recorded.protocols.clear()
|
||||||
|
|
||||||
|
for (const protocol of peerInfo.protocols) {
|
||||||
|
recorded.protocols.add(protocol)
|
||||||
|
}
|
||||||
|
|
||||||
|
this.emit('change:protocols', {
|
||||||
|
peerInfo: peerInfo,
|
||||||
|
protocols: Array.from(recorded.protocols)
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add the public key if missing
|
||||||
|
if (!recorded.id.pubKey && peerInfo.id.pubKey) {
|
||||||
|
recorded.id.pubKey = peerInfo.id.pubKey
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the info to the given id.
|
||||||
|
* @param {string} peerId b58str id
|
||||||
|
* @returns {PeerInfo}
|
||||||
|
*/
|
||||||
|
get (peerId) {
|
||||||
|
const peerInfo = this.peers.get(peerId)
|
||||||
|
|
||||||
|
if (peerInfo) {
|
||||||
|
return peerInfo
|
||||||
|
}
|
||||||
|
|
||||||
|
return undefined
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Removes the Peer with the matching `peerId` from the PeerStore
|
||||||
|
* @param {string} peerId b58str id
|
||||||
|
* @returns {boolean} true if found and removed
|
||||||
|
*/
|
||||||
|
remove (peerId) {
|
||||||
|
return this.peers.delete(peerId)
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Completely replaces the existing peers metadata with the given `peerInfo`
|
||||||
|
* @param {PeerInfo} peerInfo
|
||||||
|
* @returns {void}
|
||||||
|
*/
|
||||||
|
replace (peerInfo) {
|
||||||
|
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
|
||||||
|
|
||||||
|
this.remove(peerInfo.id.toB58String())
|
||||||
|
this.add(peerInfo)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports = PeerStore
|
220
test/peer-store/peer-store.spec.js
Normal file
220
test/peer-store/peer-store.spec.js
Normal file
@ -0,0 +1,220 @@
|
|||||||
|
'use strict'
|
||||||
|
/* eslint-env mocha */
|
||||||
|
|
||||||
|
const chai = require('chai')
|
||||||
|
chai.use(require('dirty-chai'))
|
||||||
|
const { expect } = chai
|
||||||
|
const sinon = require('sinon')
|
||||||
|
|
||||||
|
const pDefer = require('p-defer')
|
||||||
|
const mergeOptions = require('merge-options')
|
||||||
|
|
||||||
|
const Libp2p = require('../../src')
|
||||||
|
const PeerStore = require('../../src/peer-store')
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
|
|
||||||
|
const baseOptions = require('../utils/base-options')
|
||||||
|
const peerUtils = require('../utils/creators/peer')
|
||||||
|
const mockConnection = require('../utils/mockConnection')
|
||||||
|
|
||||||
|
const addr = multiaddr('/ip4/127.0.0.1/tcp/8000')
|
||||||
|
const listenAddr = multiaddr('/ip4/127.0.0.1/tcp/0')
|
||||||
|
|
||||||
|
describe('peer-store', () => {
|
||||||
|
let peerStore
|
||||||
|
|
||||||
|
beforeEach(() => {
|
||||||
|
peerStore = new PeerStore()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should add a new peer and emit it when it does not exist', async () => {
|
||||||
|
const defer = pDefer()
|
||||||
|
|
||||||
|
sinon.spy(peerStore, 'put')
|
||||||
|
sinon.spy(peerStore, 'add')
|
||||||
|
sinon.spy(peerStore, 'update')
|
||||||
|
|
||||||
|
const [peerInfo] = await peerUtils.createPeerInfo(1)
|
||||||
|
|
||||||
|
peerStore.on('peer', (peer) => {
|
||||||
|
expect(peer).to.exist()
|
||||||
|
defer.resolve()
|
||||||
|
})
|
||||||
|
peerStore.put(peerInfo)
|
||||||
|
|
||||||
|
// Wait for peerStore to emit the peer
|
||||||
|
await defer.promise
|
||||||
|
|
||||||
|
expect(peerStore.put.callCount).to.equal(1)
|
||||||
|
expect(peerStore.add.callCount).to.equal(1)
|
||||||
|
expect(peerStore.update.callCount).to.equal(0)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should update peer when it is already in the store', async () => {
|
||||||
|
const [peerInfo] = await peerUtils.createPeerInfo(1)
|
||||||
|
|
||||||
|
// Put the peer in the store
|
||||||
|
peerStore.put(peerInfo)
|
||||||
|
|
||||||
|
sinon.spy(peerStore, 'put')
|
||||||
|
sinon.spy(peerStore, 'add')
|
||||||
|
sinon.spy(peerStore, 'update')
|
||||||
|
|
||||||
|
// When updating, peer event must not be emitted
|
||||||
|
peerStore.on('peer', () => {
|
||||||
|
throw new Error('should not emit twice')
|
||||||
|
})
|
||||||
|
// If no multiaddrs change, the event should not be emitted
|
||||||
|
peerStore.on('change:multiaddrs', () => {
|
||||||
|
throw new Error('should not emit change:multiaddrs')
|
||||||
|
})
|
||||||
|
// If no protocols change, the event should not be emitted
|
||||||
|
peerStore.on('change:protocols', () => {
|
||||||
|
throw new Error('should not emit change:protocols')
|
||||||
|
})
|
||||||
|
|
||||||
|
peerStore.put(peerInfo)
|
||||||
|
|
||||||
|
expect(peerStore.put.callCount).to.equal(1)
|
||||||
|
expect(peerStore.add.callCount).to.equal(0)
|
||||||
|
expect(peerStore.update.callCount).to.equal(1)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should emit the "change:multiaddrs" event when a peer has new multiaddrs', async () => {
|
||||||
|
const defer = pDefer()
|
||||||
|
const [createdPeerInfo] = await peerUtils.createPeerInfo(1)
|
||||||
|
|
||||||
|
// Put the peer in the store
|
||||||
|
peerStore.put(createdPeerInfo)
|
||||||
|
|
||||||
|
// When updating, "change:multiaddrs" event must not be emitted
|
||||||
|
peerStore.on('change:multiaddrs', ({ peerInfo, multiaddrs }) => {
|
||||||
|
expect(peerInfo).to.exist()
|
||||||
|
expect(peerInfo.id).to.eql(createdPeerInfo.id)
|
||||||
|
expect(peerInfo.protocols).to.eql(createdPeerInfo.protocols)
|
||||||
|
expect(multiaddrs).to.exist()
|
||||||
|
expect(multiaddrs).to.eql(createdPeerInfo.multiaddrs.toArray())
|
||||||
|
defer.resolve()
|
||||||
|
})
|
||||||
|
// If no protocols change, the event should not be emitted
|
||||||
|
peerStore.on('change:protocols', () => {
|
||||||
|
throw new Error('should not emit change:protocols')
|
||||||
|
})
|
||||||
|
|
||||||
|
createdPeerInfo.multiaddrs.add(addr)
|
||||||
|
peerStore.put(createdPeerInfo)
|
||||||
|
|
||||||
|
// Wait for peerStore to emit the event
|
||||||
|
await defer.promise
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should emit the "change:protocols" event when a peer has new protocols', async () => {
|
||||||
|
const defer = pDefer()
|
||||||
|
const [createdPeerInfo] = await peerUtils.createPeerInfo(1)
|
||||||
|
|
||||||
|
// Put the peer in the store
|
||||||
|
peerStore.put(createdPeerInfo)
|
||||||
|
|
||||||
|
// If no multiaddrs change, the event should not be emitted
|
||||||
|
peerStore.on('change:multiaddrs', () => {
|
||||||
|
throw new Error('should not emit change:multiaddrs')
|
||||||
|
})
|
||||||
|
// When updating, "change:protocols" event must be emitted
|
||||||
|
peerStore.on('change:protocols', ({ peerInfo, protocols }) => {
|
||||||
|
expect(peerInfo).to.exist()
|
||||||
|
expect(peerInfo.id).to.eql(createdPeerInfo.id)
|
||||||
|
expect(peerInfo.multiaddrs).to.eql(createdPeerInfo.multiaddrs)
|
||||||
|
expect(protocols).to.exist()
|
||||||
|
expect(protocols).to.eql(Array.from(createdPeerInfo.protocols))
|
||||||
|
defer.resolve()
|
||||||
|
})
|
||||||
|
|
||||||
|
createdPeerInfo.protocols.add('/new-protocol/1.0.0')
|
||||||
|
peerStore.put(createdPeerInfo)
|
||||||
|
|
||||||
|
// Wait for peerStore to emit the event
|
||||||
|
await defer.promise
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should be able to retrieve a peer from store through its b58str id', async () => {
|
||||||
|
const [peerInfo] = await peerUtils.createPeerInfo(1)
|
||||||
|
const id = peerInfo.id.toB58String()
|
||||||
|
|
||||||
|
let retrievedPeer = peerStore.get(id)
|
||||||
|
expect(retrievedPeer).to.not.exist()
|
||||||
|
|
||||||
|
// Put the peer in the store
|
||||||
|
peerStore.put(peerInfo)
|
||||||
|
|
||||||
|
retrievedPeer = peerStore.get(id)
|
||||||
|
expect(retrievedPeer).to.exist()
|
||||||
|
expect(retrievedPeer.id).to.equal(peerInfo.id)
|
||||||
|
expect(retrievedPeer.multiaddrs).to.eql(peerInfo.multiaddrs)
|
||||||
|
expect(retrievedPeer.protocols).to.eql(peerInfo.protocols)
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should be able to remove a peer from store through its b58str id', async () => {
|
||||||
|
const [peerInfo] = await peerUtils.createPeerInfo(1)
|
||||||
|
const id = peerInfo.id.toB58String()
|
||||||
|
|
||||||
|
let removed = peerStore.remove(id)
|
||||||
|
expect(removed).to.eql(false)
|
||||||
|
|
||||||
|
// Put the peer in the store
|
||||||
|
peerStore.put(peerInfo)
|
||||||
|
expect(peerStore.peers.size).to.equal(1)
|
||||||
|
|
||||||
|
removed = peerStore.remove(id)
|
||||||
|
expect(removed).to.eql(true)
|
||||||
|
expect(peerStore.peers.size).to.equal(0)
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('peer-store on dial', () => {
|
||||||
|
let peerInfo
|
||||||
|
let remotePeerInfo
|
||||||
|
let libp2p
|
||||||
|
let remoteLibp2p
|
||||||
|
|
||||||
|
before(async () => {
|
||||||
|
[peerInfo, remotePeerInfo] = await peerUtils.createPeerInfoFromFixture(2)
|
||||||
|
remoteLibp2p = new Libp2p(mergeOptions(baseOptions, {
|
||||||
|
peerInfo: remotePeerInfo
|
||||||
|
}))
|
||||||
|
})
|
||||||
|
|
||||||
|
after(async () => {
|
||||||
|
sinon.restore()
|
||||||
|
await remoteLibp2p.stop()
|
||||||
|
libp2p && await libp2p.stop()
|
||||||
|
})
|
||||||
|
|
||||||
|
it('should put the remote peerInfo after dial and emit event', async () => {
|
||||||
|
const remoteId = remotePeerInfo.id.toB58String()
|
||||||
|
|
||||||
|
libp2p = new Libp2p(mergeOptions(baseOptions, {
|
||||||
|
peerInfo
|
||||||
|
}))
|
||||||
|
|
||||||
|
sinon.spy(libp2p.peerStore, 'put')
|
||||||
|
sinon.spy(libp2p.peerStore, 'add')
|
||||||
|
sinon.spy(libp2p.peerStore, 'update')
|
||||||
|
sinon.stub(libp2p.dialer, 'connectToMultiaddr').returns(mockConnection({
|
||||||
|
remotePeer: remotePeerInfo.id
|
||||||
|
}))
|
||||||
|
|
||||||
|
const connection = await libp2p.dial(listenAddr)
|
||||||
|
await connection.close()
|
||||||
|
|
||||||
|
expect(libp2p.peerStore.put.callCount).to.equal(1)
|
||||||
|
expect(libp2p.peerStore.add.callCount).to.equal(1)
|
||||||
|
expect(libp2p.peerStore.update.callCount).to.equal(0)
|
||||||
|
|
||||||
|
const storedPeer = libp2p.peerStore.get(remoteId)
|
||||||
|
expect(storedPeer).to.exist()
|
||||||
|
})
|
||||||
|
})
|
||||||
|
|
||||||
|
describe('peer-store on discovery', () => {
|
||||||
|
// TODO: implement with discovery
|
||||||
|
})
|
13
test/utils/base-options.js
Normal file
13
test/utils/base-options.js
Normal file
@ -0,0 +1,13 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const Transport = require('libp2p-tcp')
|
||||||
|
const Muxer = require('libp2p-mplex')
|
||||||
|
const mockCrypto = require('../utils/mockCrypto')
|
||||||
|
|
||||||
|
module.exports = {
|
||||||
|
modules: {
|
||||||
|
transport: [Transport],
|
||||||
|
streamMuxer: [Muxer],
|
||||||
|
connEncryption: [mockCrypto]
|
||||||
|
}
|
||||||
|
}
|
24
test/utils/creators/peer.js
Normal file
24
test/utils/creators/peer.js
Normal file
@ -0,0 +1,24 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const PeerId = require('peer-id')
|
||||||
|
const PeerInfo = require('peer-info')
|
||||||
|
|
||||||
|
const Peers = require('../../fixtures/peers')
|
||||||
|
|
||||||
|
module.exports.createPeerInfo = async (length) => {
|
||||||
|
const peers = await Promise.all(
|
||||||
|
Array.from({ length })
|
||||||
|
.map((_, i) => PeerId.create())
|
||||||
|
)
|
||||||
|
|
||||||
|
return peers.map((peer) => new PeerInfo(peer))
|
||||||
|
}
|
||||||
|
|
||||||
|
module.exports.createPeerInfoFromFixture = async (length) => {
|
||||||
|
const peers = await Promise.all(
|
||||||
|
Array.from({ length })
|
||||||
|
.map((_, i) => PeerId.createFromJSON(Peers[i]))
|
||||||
|
)
|
||||||
|
|
||||||
|
return peers.map((peer) => new PeerInfo(peer))
|
||||||
|
}
|
50
test/utils/mockConnection.js
Normal file
50
test/utils/mockConnection.js
Normal file
@ -0,0 +1,50 @@
|
|||||||
|
'use strict'
|
||||||
|
|
||||||
|
const { Connection } = require('libp2p-interfaces/src/connection')
|
||||||
|
const multiaddr = require('multiaddr')
|
||||||
|
|
||||||
|
const pair = require('it-pair')
|
||||||
|
|
||||||
|
const peerUtils = require('./creators/peer')
|
||||||
|
|
||||||
|
module.exports = async (properties = {}) => {
|
||||||
|
const localAddr = multiaddr('/ip4/127.0.0.1/tcp/8080')
|
||||||
|
const remoteAddr = multiaddr('/ip4/127.0.0.1/tcp/8081')
|
||||||
|
|
||||||
|
const [localPeer, remotePeer] = await peerUtils.createPeerInfoFromFixture(2)
|
||||||
|
const openStreams = []
|
||||||
|
let streamId = 0
|
||||||
|
|
||||||
|
return new Connection({
|
||||||
|
localPeer: localPeer.id,
|
||||||
|
remotePeer: remotePeer.id,
|
||||||
|
localAddr,
|
||||||
|
remoteAddr,
|
||||||
|
stat: {
|
||||||
|
timeline: {
|
||||||
|
open: Date.now() - 10,
|
||||||
|
upgraded: Date.now()
|
||||||
|
},
|
||||||
|
direction: 'outbound',
|
||||||
|
encryption: '/secio/1.0.0',
|
||||||
|
multiplexer: '/mplex/6.7.0'
|
||||||
|
},
|
||||||
|
newStream: (protocols) => {
|
||||||
|
const id = streamId++
|
||||||
|
const stream = pair()
|
||||||
|
|
||||||
|
stream.close = () => stream.sink([])
|
||||||
|
stream.id = id
|
||||||
|
|
||||||
|
openStreams.push(stream)
|
||||||
|
|
||||||
|
return {
|
||||||
|
stream,
|
||||||
|
protocol: protocols[0]
|
||||||
|
}
|
||||||
|
},
|
||||||
|
close: () => { },
|
||||||
|
getStreams: () => openStreams,
|
||||||
|
...properties
|
||||||
|
})
|
||||||
|
}
|
Loading…
x
Reference in New Issue
Block a user