mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-04-25 10:32:14 +00:00
refactor: peristent peer-store extended class and disabled by defaul
This commit is contained in:
parent
da7ddbb160
commit
b880d5de97
@ -45,7 +45,7 @@ const after = async () => {
|
||||
}
|
||||
|
||||
module.exports = {
|
||||
bundlesize: { maxSize: '179kB' },
|
||||
bundlesize: { maxSize: '185kB' },
|
||||
hooks: {
|
||||
pre: before,
|
||||
post: after
|
||||
|
@ -74,6 +74,7 @@ Creates an instance of Libp2p.
|
||||
| [options.dialer] | `object` | libp2p Dialer configuration
|
||||
| [options.metrics] | `object` | libp2p Metrics configuration
|
||||
| [options.peerId] | [`PeerId`][peer-id] | peerId instance (it will be created if not provided) |
|
||||
| [options.peerStore] | [`PeerId`][peer-id] | libp2p PeerStore configuration |
|
||||
|
||||
For Libp2p configurations and modules details read the [Configuration Document](./CONFIGURATION.md).
|
||||
|
||||
|
@ -506,6 +506,32 @@ const node = await Libp2p.create({
|
||||
})
|
||||
```
|
||||
|
||||
#### Configuring PeerStore
|
||||
|
||||
PeerStore persistence is disabled in libp2p by default. You can enable and configure it as follows. Aside from enabled being `false` by default, it will need an implementation of a [datastore](https://github.com/ipfs/interface-datastore).
|
||||
|
||||
```js
|
||||
const Libp2p = require('libp2p')
|
||||
const TCP = require('libp2p-tcp')
|
||||
const MPLEX = require('libp2p-mplex')
|
||||
const SECIO = require('libp2p-secio')
|
||||
|
||||
const LevelStore = require('datastore-level')
|
||||
|
||||
const node = await Libp2p.create({
|
||||
modules: {
|
||||
transport: [TCP],
|
||||
streamMuxer: [MPLEX],
|
||||
connEncryption: [SECIO]
|
||||
},
|
||||
datastore: new LevelStore('path/to/store'),
|
||||
peerStore: {
|
||||
persistence: true,
|
||||
threshold: 5
|
||||
}
|
||||
})
|
||||
```
|
||||
|
||||
#### Customizing Transports
|
||||
|
||||
Some Transports can be passed additional options when they are created. For example, `libp2p-webrtc-star` accepts an optional, custom `wrtc` implementation. In addition to libp2p passing itself and an `Upgrader` to handle connection upgrading, libp2p will also pass the options, if they are provided, from `config.transport`.
|
||||
|
@ -21,7 +21,8 @@ const DefaultConfig = {
|
||||
enabled: false
|
||||
},
|
||||
peerStore: {
|
||||
persistence: true
|
||||
persistence: false,
|
||||
threshold: 5
|
||||
},
|
||||
config: {
|
||||
dht: {
|
||||
|
20
src/index.js
20
src/index.js
@ -6,7 +6,6 @@ const globalThis = require('ipfs-utils/src/globalthis')
|
||||
const log = debug('libp2p')
|
||||
log.error = debug('libp2p:error')
|
||||
|
||||
const { MemoryDatastore } = require('interface-datastore')
|
||||
const PeerId = require('peer-id')
|
||||
|
||||
const peerRouting = require('./peer-routing')
|
||||
@ -24,6 +23,7 @@ const Metrics = require('./metrics')
|
||||
const TransportManager = require('./transport-manager')
|
||||
const Upgrader = require('./upgrader')
|
||||
const PeerStore = require('./peer-store')
|
||||
const PersistentPeerStore = require('./peer-store/persistent')
|
||||
const Registrar = require('./registrar')
|
||||
const ping = require('./ping')
|
||||
const {
|
||||
@ -45,11 +45,14 @@ class Libp2p extends EventEmitter {
|
||||
this._options = validateConfig(_options)
|
||||
|
||||
this.peerId = this._options.peerId
|
||||
this.datastore = this._options.datastore || new MemoryDatastore()
|
||||
this.peerStore = new PeerStore({
|
||||
datastore: this.datastore,
|
||||
...this._options.peerStore
|
||||
})
|
||||
this.datastore = this._options.datastore
|
||||
|
||||
this.peerStore = !(this.datastore && this._options.peerStore.persistence)
|
||||
? new PeerStore()
|
||||
: new PersistentPeerStore({
|
||||
datastore: this.datastore,
|
||||
...this._options.peerStore
|
||||
})
|
||||
|
||||
// Addresses {listen, announce, noAnnounce}
|
||||
this.addresses = this._options.addresses
|
||||
@ -223,7 +226,8 @@ class Libp2p extends EventEmitter {
|
||||
|
||||
this._discovery = new Map()
|
||||
|
||||
this.connectionManager.stop()
|
||||
await this.peerStore.stop()
|
||||
await this.connectionManager.stop()
|
||||
|
||||
await Promise.all([
|
||||
this.pubsub && this.pubsub.stop(),
|
||||
@ -398,7 +402,7 @@ class Libp2p extends EventEmitter {
|
||||
await this.transportManager.listen()
|
||||
|
||||
// Start PeerStore
|
||||
await this.peerStore.load()
|
||||
await this.peerStore.start()
|
||||
|
||||
if (this._config.pubsub.enabled) {
|
||||
this.pubsub && this.pubsub.start()
|
||||
|
@ -85,15 +85,11 @@ Access to its underlying books:
|
||||
|
||||
## Data Persistence
|
||||
|
||||
The data stored in the PeerStore will be persisted by default. Keeping a record of the peers already discovered by the peer, as well as their known data aims to improve the efficiency of peers joining the network after being offline.
|
||||
The data stored in the PeerStore can be persisted if configured appropriately. Keeping a record of the peers already discovered by the peer, as well as their known data aims to improve the efficiency of peers joining the network after being offline.
|
||||
|
||||
---
|
||||
TODO: Discuss if we should make it persisted by default now. Taking into consideration that we will use a MemoryDatastore by default, unless the user configures a datastore to use, it will be worthless. It might make sense to make it disabled by default until we work on improving configuration and provide good defauls for each environment.
|
||||
---
|
||||
The libp2p node will need to receive a [datastore](https://github.com/ipfs/interface-datastore), in order to store this data in a persistent way. A [datastore](https://github.com/ipfs/interface-datastore) stores its data in a key-value fashion. As a result, we need coherent keys so that we do not overwrite data.
|
||||
|
||||
The libp2p node will need to receive a [datastore](https://github.com/ipfs/interface-datastore), in order to store this data in a persistent way. Otherwise, it will be stored on a [memory datastore](https://github.com/ipfs/interface-datastore/blob/master/src/memory.js).
|
||||
|
||||
A [datastore](https://github.com/ipfs/interface-datastore) stores its data in a key-value fashion. As a result, we need coherent keys so that we do not overwrite data.
|
||||
The PeerStore should not be continuously updating the datastore with the new data observed. Accordingly, it should only store new data after reaching a certain threshold of "dirty" peers, as well as when the node is stopped.
|
||||
|
||||
Taking into account that a datastore allows queries using a key prefix, we can find all the information if we define a consistent namespace that allow us to find the content without having any information. The namespaces were defined as follows:
|
||||
|
||||
|
@ -9,7 +9,6 @@ const multiaddr = require('multiaddr')
|
||||
const PeerId = require('peer-id')
|
||||
|
||||
const Book = require('./book')
|
||||
const Protobuf = require('./pb/address-book.proto')
|
||||
|
||||
const {
|
||||
codes: { ERR_INVALID_PARAMETERS }
|
||||
@ -18,8 +17,6 @@ const {
|
||||
/**
|
||||
* The AddressBook is responsible for keeping the known multiaddrs
|
||||
* of a peer.
|
||||
* This data will be persisted in the PeerStore datastore as follows:
|
||||
* /peers/addrs/<b32 peer id no padding>
|
||||
*/
|
||||
class AddressBook extends Book {
|
||||
/**
|
||||
@ -40,24 +37,9 @@ class AddressBook extends Book {
|
||||
*/
|
||||
super({
|
||||
peerStore,
|
||||
event: {
|
||||
name: 'change:multiaddrs',
|
||||
property: 'multiaddrs',
|
||||
transformer: (data) => data.map((address) => address.multiaddr)
|
||||
},
|
||||
ds: {
|
||||
prefix: '/peers/addrs/',
|
||||
setTransformer: (data) => Protobuf.encode({
|
||||
addrs: data.map((address) => address.multiaddr.buffer)
|
||||
}),
|
||||
getTransformer: (encData) => {
|
||||
const data = Protobuf.decode(encData)
|
||||
|
||||
return data.addrs.map((a) => ({
|
||||
multiaddr: multiaddr(a)
|
||||
}))
|
||||
}
|
||||
}
|
||||
eventName: 'change:multiaddrs',
|
||||
eventProperty: 'multiaddrs',
|
||||
eventTransformer: (data) => data.map((address) => address.multiaddr)
|
||||
})
|
||||
|
||||
/**
|
||||
@ -145,6 +127,7 @@ class AddressBook extends Book {
|
||||
}
|
||||
|
||||
this._setData(peerId, addresses)
|
||||
|
||||
log(`added provided multiaddrs for ${id}`)
|
||||
|
||||
// Notify the existance of a new peer
|
||||
|
@ -1,11 +1,6 @@
|
||||
'use strict'
|
||||
|
||||
const errcode = require('err-code')
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:peer-store:book')
|
||||
log.error = debug('libp2p:peer-store:book:error')
|
||||
|
||||
const { Key } = require('interface-datastore')
|
||||
const PeerId = require('peer-id')
|
||||
|
||||
const {
|
||||
@ -16,31 +11,21 @@ const passthrough = data => data
|
||||
|
||||
/**
|
||||
* The Book is the skeleton for the PeerStore books.
|
||||
* It handles the PeerStore persistence and events.
|
||||
*/
|
||||
class Book {
|
||||
/**
|
||||
* @constructor
|
||||
* @param {Object} properties
|
||||
* @param {PeerStore} properties.peerStore PeerStore instance.
|
||||
* @param {Object} [properties.event] Event properties. If not provided, no events will be emitted.
|
||||
* @param {string} [properties.event.name] Name of the event to emit by the PeerStore.
|
||||
* @param {string} [properties.event.property] Name of the property to emit by the PeerStore.
|
||||
* @param {function} [properties.events.transformer] Transformer function of the provided data for being emitted.
|
||||
* @param {Object} [properties.ds] Datastore properties. If not provided, no data will be persisted.
|
||||
* @param {String} [properties.ds.prefix] Prefix of the Datastore Key
|
||||
* @param {String} [properties.ds.suffix = ''] Suffix of the Datastore Key
|
||||
* @param {function} [properties.ds.setTransformer] Transformer function of the provided data for being persisted.
|
||||
* @param {function} [properties.ds.getTransformer] Transformer function of the persisted data to be loaded.
|
||||
* @param {string} properties.eventName Name of the event to emit by the PeerStore.
|
||||
* @param {string} properties.eventProperty Name of the property to emit by the PeerStore.
|
||||
* @param {function} [properties.eventTransformer] Transformer function of the provided data for being emitted.
|
||||
*/
|
||||
constructor ({
|
||||
peerStore,
|
||||
event,
|
||||
ds
|
||||
}) {
|
||||
constructor ({ peerStore, eventName, eventProperty, eventTransformer = passthrough }) {
|
||||
this._ps = peerStore
|
||||
this.event = event
|
||||
this.ds = ds
|
||||
this.eventName = eventName
|
||||
this.eventProperty = eventProperty
|
||||
this.eventTransformer = eventTransformer
|
||||
|
||||
/**
|
||||
* Map known peers to their data.
|
||||
@ -50,38 +35,12 @@ class Book {
|
||||
}
|
||||
|
||||
/**
|
||||
* Load data from peerStore datastore into the books datastructures.
|
||||
* This will not persist the replicated data nor emit modify events.
|
||||
* @private
|
||||
* @return {Promise<void>}
|
||||
* Set known data of a provided peer.
|
||||
* @param {PeerId} peerId
|
||||
* @param {Array<Data>|Data} data
|
||||
*/
|
||||
async _loadData () {
|
||||
if (!this._ps._datastore || !this._ps._enabledPersistance || !this.ds) {
|
||||
return
|
||||
}
|
||||
|
||||
const prefix = this.ds.prefix || ''
|
||||
const suffix = this.ds.suffix || ''
|
||||
const transformer = this.ds.getTransformer || passthrough
|
||||
|
||||
for await (const { key, value } of this._ps._datastore.query({ prefix })) {
|
||||
try {
|
||||
// PeerId to add to the book
|
||||
const b32key = key.toString()
|
||||
.replace(prefix, '') // remove prefix from key
|
||||
.replace(suffix, '') // remove suffix from key
|
||||
const peerId = PeerId.createFromCID(b32key)
|
||||
// Data in the format to add to the book
|
||||
const data = transformer(value)
|
||||
// Add the book without persist the replicated data and emit modify
|
||||
this._setData(peerId, data, {
|
||||
persist: false,
|
||||
emit: false
|
||||
})
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
}
|
||||
}
|
||||
set (peerId, data) {
|
||||
throw errcode(new Error('set must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
|
||||
}
|
||||
|
||||
/**
|
||||
@ -90,11 +49,10 @@ class Book {
|
||||
* @param {PeerId} peerId peerId of the data to store
|
||||
* @param {Array<*>} data data to store.
|
||||
* @param {Object} [options] storing options.
|
||||
* @param {boolean} [options.persist = true] persist the provided data.
|
||||
* @param {boolean} [options.emit = true] emit the provided data.
|
||||
* @return {Promise<void>}
|
||||
* @return {void}
|
||||
*/
|
||||
async _setData (peerId, data, { persist = true, emit = true } = {}) {
|
||||
_setData (peerId, data, { emit = true } = {}) {
|
||||
const b58key = peerId.toB58String()
|
||||
|
||||
// Store data in memory
|
||||
@ -102,53 +60,10 @@ class Book {
|
||||
this._setPeerId(peerId)
|
||||
|
||||
// Emit event
|
||||
if (this.event && emit) {
|
||||
const transformer = this.event.transformer || passthrough
|
||||
|
||||
this._ps.emit(this.event.name, {
|
||||
peerId,
|
||||
[this.event.property]: transformer(data)
|
||||
})
|
||||
}
|
||||
|
||||
// Add to Persistence datastore
|
||||
persist && await this._persistData(peerId, data)
|
||||
}
|
||||
|
||||
/**
|
||||
* Persist data on the datastore
|
||||
* @private
|
||||
* @param {PeerId} peerId peerId of the data to persist
|
||||
* @param {Array<*>} data data to persist
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async _persistData (peerId, data) {
|
||||
if (!this._ps._datastore || !this._ps._enabledPersistance || !this.ds) {
|
||||
return
|
||||
}
|
||||
|
||||
const prefix = this.ds.prefix || ''
|
||||
const suffix = this.ds.suffix || ''
|
||||
const transformer = this.ds.setTransformer || passthrough
|
||||
|
||||
const b32key = peerId.toString()
|
||||
const k = `${prefix}${b32key}${suffix}`
|
||||
try {
|
||||
const value = transformer(data)
|
||||
|
||||
await this._ps._datastore.put(new Key(k), value)
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Set known data of a provided peer.
|
||||
* @param {PeerId} peerId
|
||||
* @param {Array<Data>|Data} data
|
||||
*/
|
||||
set (peerId, data) {
|
||||
throw errcode(new Error('set must be implemented by the subclass'), 'ERR_NOT_IMPLEMENTED')
|
||||
emit && this._ps.emit(this.eventName, {
|
||||
peerId,
|
||||
[this.eventProperty]: this.eventTransformer(data)
|
||||
})
|
||||
}
|
||||
|
||||
/**
|
||||
@ -189,22 +104,11 @@ class Book {
|
||||
return false
|
||||
}
|
||||
|
||||
// Emit event
|
||||
this.event && this._ps.emit(this.event.name, {
|
||||
this._ps.emit(this.eventName, {
|
||||
peerId,
|
||||
[this.event.property]: []
|
||||
[this.eventProperty]: []
|
||||
})
|
||||
|
||||
// Update Persistence datastore
|
||||
if (this._ps._datastore && this._ps._enabledPersistance && this.ds) {
|
||||
const prefix = this.ds.prefix || ''
|
||||
const suffix = this.ds.suffix || ''
|
||||
const b32key = peerId.toString()
|
||||
|
||||
const k = `${prefix}${b32key}${suffix}`
|
||||
this._ps._datastore.delete(new Key(k))
|
||||
}
|
||||
|
||||
return true
|
||||
}
|
||||
|
||||
|
@ -32,18 +32,10 @@ class PeerStore extends EventEmitter {
|
||||
|
||||
/**
|
||||
* @constructor
|
||||
* @param {Object} properties
|
||||
* @param {Datastore} [properties.datastore] Datastore to persist data.
|
||||
* @param {boolean} [properties.persistance = true] Persist peerstore data.
|
||||
*/
|
||||
constructor ({ datastore, persistance = true } = {}) {
|
||||
constructor () {
|
||||
super()
|
||||
|
||||
/**
|
||||
* Backend datastore used to persist data.
|
||||
*/
|
||||
this._datastore = datastore
|
||||
|
||||
/**
|
||||
* AddressBook containing a map of peerIdStr to Address.
|
||||
*/
|
||||
@ -60,19 +52,17 @@ class PeerStore extends EventEmitter {
|
||||
* @type {Map<string, Array<PeerId>}
|
||||
*/
|
||||
this.peerIds = new Map()
|
||||
|
||||
this._enabledPersistance = persistance
|
||||
}
|
||||
|
||||
/**
|
||||
* Load data from the datastore to populate the PeerStore.
|
||||
* Start the PeerStore.
|
||||
*/
|
||||
async load () {
|
||||
if (this._enabledPersistance) {
|
||||
await this.addressBook._loadData()
|
||||
await this.protoBook._loadData()
|
||||
}
|
||||
}
|
||||
start () {}
|
||||
|
||||
/**
|
||||
* Stop the PeerStore.
|
||||
*/
|
||||
stop () {}
|
||||
|
||||
/**
|
||||
* Get all the stored information of every peer.
|
||||
|
9
src/peer-store/persistent/consts.js
Normal file
9
src/peer-store/persistent/consts.js
Normal file
@ -0,0 +1,9 @@
|
||||
'use strict'
|
||||
|
||||
module.exports.COMMON_NAMESPACE = '/peers/'
|
||||
|
||||
// /peers/protos/<b32 peer id no padding>
|
||||
module.exports.ADDRESS_NAMESPACE = '/peers/addrs/'
|
||||
|
||||
// /peers/addrs/<b32 peer id no padding>
|
||||
module.exports.PROTOCOL_NAMESPACE = '/peers/protos/'
|
225
src/peer-store/persistent/index.js
Normal file
225
src/peer-store/persistent/index.js
Normal file
@ -0,0 +1,225 @@
|
||||
'use strict'
|
||||
|
||||
const debug = require('debug')
|
||||
const log = debug('libp2p:persistent-peer-store')
|
||||
log.error = debug('libp2p:persistent-peer-store:error')
|
||||
|
||||
const { Key } = require('interface-datastore')
|
||||
const multiaddr = require('multiaddr')
|
||||
const PeerId = require('peer-id')
|
||||
|
||||
const PeerStore = require('..')
|
||||
|
||||
const {
|
||||
ADDRESS_NAMESPACE,
|
||||
COMMON_NAMESPACE,
|
||||
PROTOCOL_NAMESPACE
|
||||
} = require('./consts')
|
||||
|
||||
const Addresses = require('./pb/address-book.proto')
|
||||
const Protocols = require('./pb/proto-book.proto')
|
||||
|
||||
/**
|
||||
* Responsible for managing the persistence of data in the PeerStore.
|
||||
*/
|
||||
class PersistentPeerStore extends PeerStore {
|
||||
/**
|
||||
* @constructor
|
||||
* @param {Object} properties
|
||||
* @param {Datastore} properties.datastore Datastore to persist data.
|
||||
* @param {number} [properties.threshold = 5] Number of dirty peers allowed before commit data.
|
||||
*/
|
||||
constructor ({ datastore, threshold = 5 }) {
|
||||
super()
|
||||
|
||||
/**
|
||||
* Backend datastore used to persist data.
|
||||
*/
|
||||
this._datastore = datastore
|
||||
|
||||
/**
|
||||
* Peers modified after the latest data persisted.
|
||||
*/
|
||||
this._dirtyPeers = new Set()
|
||||
|
||||
this.threshold = threshold
|
||||
this._addDirtyPeer = this._addDirtyPeer.bind(this)
|
||||
}
|
||||
|
||||
/**
|
||||
* Start Persistent PeerStore.
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async start () {
|
||||
log('Persistent PeerStore is starting')
|
||||
|
||||
// Handlers for dirty peers
|
||||
this.on('change:protocols', this._addDirtyPeer)
|
||||
this.on('change:multiaddrs', this._addDirtyPeer)
|
||||
|
||||
// Load data
|
||||
for await (const entry of this._datastore.query({ prefix: COMMON_NAMESPACE })) {
|
||||
this._processDatastoreEntry(entry)
|
||||
}
|
||||
|
||||
log('Persistent PeerStore started')
|
||||
}
|
||||
|
||||
async stop () {
|
||||
log('Persistent PeerStore is stopping')
|
||||
this.removeAllListeners()
|
||||
await this._commitData()
|
||||
log('Persistent PeerStore stopped')
|
||||
}
|
||||
|
||||
/**
|
||||
* Add modified peer to the dirty set
|
||||
* @private
|
||||
* @param {Object} params
|
||||
* @param {PeerId} params.peerId
|
||||
*/
|
||||
_addDirtyPeer ({ peerId }) {
|
||||
const peerIdstr = peerId.toB58String()
|
||||
|
||||
log('add dirty peer', peerIdstr)
|
||||
this._dirtyPeers.add(peerIdstr)
|
||||
|
||||
if (this._dirtyPeers.size >= this.threshold) {
|
||||
// Commit current data
|
||||
this._commitData()
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add all the peers current data to a datastore batch and commit it.
|
||||
* @private
|
||||
* @param {Array<string>} peers
|
||||
* @return {Promise<void>}
|
||||
*/
|
||||
async _commitData () {
|
||||
const commitPeers = Array.from(this._dirtyPeers)
|
||||
|
||||
if (!commitPeers.length) {
|
||||
return
|
||||
}
|
||||
|
||||
// Clear Dirty Peers set
|
||||
this._dirtyPeers.clear()
|
||||
|
||||
log('create batch commit')
|
||||
const batch = this._datastore.batch()
|
||||
for (const peerIdStr of commitPeers) {
|
||||
// PeerId (replace by keyBook)
|
||||
const peerId = this.peerIds.get(peerIdStr)
|
||||
|
||||
// Address Book
|
||||
this._batchAddressBook(peerId, batch)
|
||||
|
||||
// Proto Book
|
||||
this._batchProtoBook(peerId, batch)
|
||||
}
|
||||
|
||||
await batch.commit()
|
||||
log('batch committed')
|
||||
}
|
||||
|
||||
/**
|
||||
* Add address book data of the peer to the batch.
|
||||
* @private
|
||||
* @param {PeerId} peerId
|
||||
* @param {Object} batch
|
||||
*/
|
||||
_batchAddressBook (peerId, batch) {
|
||||
const b32key = peerId.toString()
|
||||
const key = new Key(`${ADDRESS_NAMESPACE}${b32key}`)
|
||||
|
||||
const addresses = this.addressBook.get(peerId)
|
||||
|
||||
try {
|
||||
// Deleted from the book
|
||||
if (!addresses) {
|
||||
batch.delete(key)
|
||||
return
|
||||
}
|
||||
|
||||
const encodedData = Addresses.encode({
|
||||
addrs: addresses.map((address) => ({
|
||||
multiaddr: address.multiaddr.buffer
|
||||
}))
|
||||
})
|
||||
|
||||
batch.put(key, encodedData)
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Add proto book data of the peer to the batch.
|
||||
* @private
|
||||
* @param {PeerId} peerId
|
||||
* @param {Object} batch
|
||||
*/
|
||||
_batchProtoBook (peerId, batch) {
|
||||
const b32key = peerId.toString()
|
||||
const key = new Key(`${PROTOCOL_NAMESPACE}${b32key}`)
|
||||
|
||||
const protocols = this.protoBook.get(peerId)
|
||||
|
||||
try {
|
||||
// Deleted from the book
|
||||
if (!protocols) {
|
||||
batch.delete(key)
|
||||
return
|
||||
}
|
||||
|
||||
const encodedData = Protocols.encode({ protocols })
|
||||
|
||||
batch.put(key, encodedData)
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Process datastore entry and add its data to the correct book.
|
||||
* @private
|
||||
* @param {Object} params
|
||||
* @param {string} params.key datastore key
|
||||
* @param {Buffer} params.value datastore value stored
|
||||
*/
|
||||
_processDatastoreEntry ({ key, value }) {
|
||||
try {
|
||||
const keyParts = key.toString().split('/')
|
||||
const peerId = PeerId.createFromCID(keyParts[3])
|
||||
|
||||
let decoded
|
||||
switch (keyParts[2]) {
|
||||
case 'addrs':
|
||||
decoded = Addresses.decode(value)
|
||||
|
||||
this.addressBook._setData(
|
||||
peerId,
|
||||
decoded.addrs.map((address) => ({
|
||||
multiaddr: multiaddr(address.multiaddr)
|
||||
})),
|
||||
{ emit: false })
|
||||
break
|
||||
case 'protos':
|
||||
decoded = Protocols.decode(value)
|
||||
|
||||
this.protoBook._setData(
|
||||
peerId,
|
||||
new Set(decoded.protocols),
|
||||
{ emit: false })
|
||||
break
|
||||
default:
|
||||
log('invalid data persisted for: ', key.toString())
|
||||
}
|
||||
} catch (err) {
|
||||
log.error(err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
module.exports = PersistentPeerStore
|
@ -2,10 +2,13 @@
|
||||
|
||||
const protons = require('protons')
|
||||
|
||||
/* eslint-disable no-tabs */
|
||||
const message = `
|
||||
message Addresses {
|
||||
repeated bytes addrs = 1;
|
||||
message Address {
|
||||
required bytes multiaddr = 1;
|
||||
}
|
||||
|
||||
repeated Address addrs = 1;
|
||||
}
|
||||
`
|
||||
|
@ -8,7 +8,6 @@ log.error = debug('libp2p:peer-store:proto-book:error')
|
||||
const PeerId = require('peer-id')
|
||||
|
||||
const Book = require('./book')
|
||||
const Protobuf = require('./pb/proto-book.proto')
|
||||
|
||||
const {
|
||||
codes: { ERR_INVALID_PARAMETERS }
|
||||
@ -17,8 +16,7 @@ const {
|
||||
/**
|
||||
* The ProtoBook is responsible for keeping the known supported
|
||||
* protocols of a peer.
|
||||
* This data will be persisted in the PeerStore datastore as follows:
|
||||
* /peers/protos/<b32 peer id no padding>
|
||||
* @fires ProtoBook#change:protocols
|
||||
*/
|
||||
class ProtoBook extends Book {
|
||||
/**
|
||||
@ -32,22 +30,9 @@ class ProtoBook extends Book {
|
||||
*/
|
||||
super({
|
||||
peerStore,
|
||||
event: {
|
||||
name: 'change:protocols',
|
||||
property: 'protocols',
|
||||
transformer: (data) => Array.from(data)
|
||||
},
|
||||
ds: {
|
||||
prefix: '/peers/protos/',
|
||||
setTransformer: (data) => Protobuf.encode({
|
||||
protocols: Array.from(data)
|
||||
}),
|
||||
getTransformer: (encData) => {
|
||||
const data = Protobuf.decode(encData)
|
||||
|
||||
return new Set(data.protocols)
|
||||
}
|
||||
}
|
||||
eventName: 'change:protocols',
|
||||
eventProperty: 'protocols',
|
||||
eventTransformer: (data) => Array.from(data)
|
||||
})
|
||||
|
||||
/**
|
||||
@ -124,6 +109,8 @@ class ProtoBook extends Book {
|
||||
return this
|
||||
}
|
||||
|
||||
protocols = [...newSet]
|
||||
|
||||
this._setData(peerId, newSet)
|
||||
log(`added provided protocols for ${id}`)
|
||||
|
||||
|
@ -4,11 +4,9 @@
|
||||
const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
const { expect } = chai
|
||||
const sinon = require('sinon')
|
||||
|
||||
const PeerStore = require('../../src/peer-store')
|
||||
const multiaddr = require('multiaddr')
|
||||
const { MemoryDatastore } = require('interface-datastore')
|
||||
|
||||
const peerUtils = require('../utils/creators/peer')
|
||||
|
||||
@ -152,192 +150,3 @@ describe('peer-store', () => {
|
||||
})
|
||||
})
|
||||
})
|
||||
|
||||
describe('libp2p.peerStore', () => {
|
||||
let libp2p
|
||||
let memoryDatastore
|
||||
|
||||
beforeEach(async () => {
|
||||
memoryDatastore = new MemoryDatastore()
|
||||
;[libp2p] = await peerUtils.createPeer({
|
||||
started: false,
|
||||
config: {
|
||||
datastore: memoryDatastore,
|
||||
peerStore: {
|
||||
persistance: true
|
||||
}
|
||||
}
|
||||
})
|
||||
})
|
||||
|
||||
it('should try to load content from an empty datastore', async () => {
|
||||
const spyPeerStore = sinon.spy(libp2p.peerStore, 'load')
|
||||
const spyDs = sinon.spy(memoryDatastore, 'query')
|
||||
|
||||
await libp2p.start()
|
||||
expect(spyPeerStore).to.have.property('callCount', 1)
|
||||
// Should be called for AddressBook and ProtoBook
|
||||
expect(spyDs).to.have.property('callCount', 2)
|
||||
// No data to populate
|
||||
expect(libp2p.peerStore.peers.size).to.eq(0)
|
||||
})
|
||||
|
||||
it('should store peerStore content on datastore', async () => {
|
||||
const [peer] = await peerUtils.createPeerId({ number: 2 })
|
||||
const multiaddrs = [multiaddr('/ip4/156.10.1.22/tcp/1000')]
|
||||
const protocols = ['/ping/1.0.0']
|
||||
const spyDs = sinon.spy(memoryDatastore, 'put')
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
// AddressBook
|
||||
await libp2p.peerStore.addressBook.set(peer, multiaddrs)
|
||||
|
||||
expect(spyDs).to.have.property('callCount', 1)
|
||||
|
||||
// ProtoBook
|
||||
await libp2p.peerStore.protoBook.set(peer, protocols)
|
||||
|
||||
expect(spyDs).to.have.property('callCount', 2)
|
||||
|
||||
// Should have two peer records stored in the datastore
|
||||
const queryParams = {
|
||||
prefix: '/peers/'
|
||||
}
|
||||
let count = 0
|
||||
for await (const _ of memoryDatastore.query(queryParams)) { // eslint-disable-line
|
||||
count++
|
||||
}
|
||||
expect(count).to.equal(2)
|
||||
})
|
||||
|
||||
it('should load content to the peerStore when restart but not put in datastore again', async () => {
|
||||
const spyDs = sinon.spy(memoryDatastore, 'put')
|
||||
const peers = await peerUtils.createPeerId({ number: 2 })
|
||||
const multiaddrs = [
|
||||
multiaddr('/ip4/156.10.1.22/tcp/1000'),
|
||||
multiaddr('/ip4/156.10.1.23/tcp/1000')
|
||||
]
|
||||
const protocols = ['/ping/1.0.0']
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
// AddressBook
|
||||
await libp2p.peerStore.addressBook.set(peers[0], [multiaddrs[0]])
|
||||
await libp2p.peerStore.addressBook.set(peers[1], [multiaddrs[1]])
|
||||
|
||||
// ProtoBook
|
||||
await libp2p.peerStore.protoBook.set(peers[0], protocols)
|
||||
await libp2p.peerStore.protoBook.set(peers[1], protocols)
|
||||
|
||||
expect(spyDs).to.have.property('callCount', 4)
|
||||
expect(libp2p.peerStore.peers.size).to.equal(2)
|
||||
|
||||
await libp2p.stop()
|
||||
|
||||
// Load on restart
|
||||
const spyAb = sinon.spy(libp2p.peerStore.addressBook, '_loadData')
|
||||
const spyPb = sinon.spy(libp2p.peerStore.protoBook, '_loadData')
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
expect(spyAb).to.have.property('callCount', 1)
|
||||
expect(spyPb).to.have.property('callCount', 1)
|
||||
expect(spyDs).to.have.property('callCount', 4)
|
||||
expect(libp2p.peerStore.peers.size).to.equal(2)
|
||||
})
|
||||
|
||||
it('should load content to the peerStore when a new node is started with the same datastore', async () => {
|
||||
const peers = await peerUtils.createPeerId({ number: 2 })
|
||||
const multiaddrs = [
|
||||
multiaddr('/ip4/156.10.1.22/tcp/1000'),
|
||||
multiaddr('/ip4/156.10.1.23/tcp/1000')
|
||||
]
|
||||
const protocols = ['/ping/1.0.0']
|
||||
|
||||
await libp2p.start()
|
||||
|
||||
// AddressBook
|
||||
await libp2p.peerStore.addressBook.set(peers[0], [multiaddrs[0]])
|
||||
await libp2p.peerStore.addressBook.set(peers[1], [multiaddrs[1]])
|
||||
|
||||
// ProtoBook
|
||||
await libp2p.peerStore.protoBook.set(peers[0], protocols)
|
||||
await libp2p.peerStore.protoBook.set(peers[1], protocols)
|
||||
|
||||
expect(libp2p.peerStore.peers.size).to.equal(2)
|
||||
|
||||
await libp2p.stop()
|
||||
|
||||
// Use a new node with the previously populated datastore
|
||||
const [newNode] = await peerUtils.createPeer({
|
||||
started: false,
|
||||
config: {
|
||||
datastore: memoryDatastore,
|
||||
peerStore: {
|
||||
persistance: true
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
expect(newNode.peerStore.peers.size).to.equal(0)
|
||||
|
||||
const spyAb = sinon.spy(newNode.peerStore.addressBook, '_loadData')
|
||||
const spyPb = sinon.spy(newNode.peerStore.protoBook, '_loadData')
|
||||
|
||||
await newNode.start()
|
||||
|
||||
expect(spyAb).to.have.property('callCount', 1)
|
||||
expect(spyPb).to.have.property('callCount', 1)
|
||||
|
||||
expect(newNode.peerStore.peers.size).to.equal(2)
|
||||
|
||||
// Validate data
|
||||
const peer0 = newNode.peerStore.get(peers[0])
|
||||
expect(peer0.id.toB58String()).to.eql(peers[0].toB58String())
|
||||
expect(peer0.protocols).to.have.members(protocols)
|
||||
expect(peer0.addresses.map((a) => a.multiaddr.toString())).to.have.members([multiaddrs[0].toString()])
|
||||
|
||||
const peer1 = newNode.peerStore.get(peers[1])
|
||||
expect(peer1.id.toB58String()).to.eql(peers[1].toB58String())
|
||||
expect(peer1.protocols).to.have.members(protocols)
|
||||
expect(peer1.addresses.map((a) => a.multiaddr.toString())).to.have.members([multiaddrs[1].toString()])
|
||||
|
||||
await newNode.stop()
|
||||
})
|
||||
|
||||
it('should delete content from the datastore on delete', async () => {
  // Only a single peer id is needed here (previously two were created and one discarded)
  const [peer] = await peerUtils.createPeerId()
  const multiaddrs = [multiaddr('/ip4/156.10.1.22/tcp/1000')]
  const protocols = ['/ping/1.0.0']
  const spyDs = sinon.spy(memoryDatastore, 'delete')
  const spyAddressBook = sinon.spy(libp2p.peerStore.addressBook, 'delete')
  const spyProtoBook = sinon.spy(libp2p.peerStore.protoBook, 'delete')

  await libp2p.start()

  // AddressBook
  await libp2p.peerStore.addressBook.set(peer, multiaddrs)
  // ProtoBook
  await libp2p.peerStore.protoBook.set(peer, protocols)

  // Nothing has been deleted from the datastore yet
  expect(spyDs).to.have.property('callCount', 0)

  // Delete from PeerStore
  libp2p.peerStore.delete(peer)
  await libp2p.stop()

  // One delete per book, two datastore deletions (address + proto records)
  expect(spyAddressBook).to.have.property('callCount', 1)
  expect(spyProtoBook).to.have.property('callCount', 1)
  expect(spyDs).to.have.property('callCount', 2)

  // Should have zero peer records stored in the datastore
  const queryParams = {
    prefix: '/peers/'
  }

  for await (const _ of memoryDatastore.query(queryParams)) { // eslint-disable-line
    throw new Error('Datastore should be empty')
  }
})
|
||||
})
|
||||
|
374
test/peer-store/persisted-peer-store.spec.js
Normal file
374
test/peer-store/persisted-peer-store.spec.js
Normal file
@ -0,0 +1,374 @@
|
||||
'use strict'
|
||||
/* eslint-env mocha */
|
||||
|
||||
const chai = require('chai')
|
||||
chai.use(require('dirty-chai'))
|
||||
const { expect } = chai
|
||||
const sinon = require('sinon')
|
||||
|
||||
const PeerStore = require('../../src/peer-store/persistent')
|
||||
const multiaddr = require('multiaddr')
|
||||
const { MemoryDatastore } = require('interface-datastore')
|
||||
|
||||
const peerUtils = require('../utils/creators/peer')
|
||||
|
||||
describe('Persisted PeerStore', () => {
|
||||
let datastore, peerStore
|
||||
|
||||
describe('start and stop flows', () => {
  beforeEach(() => {
    datastore = new MemoryDatastore()
    peerStore = new PeerStore({ datastore })
  })

  afterEach(() => peerStore.stop())

  it('should try to load content from an empty datastore on start', async () => {
    const querySpy = sinon.spy(datastore, 'query')
    const processEntrySpy = sinon.spy(peerStore, '_processDatastoreEntry')

    await peerStore.start()

    // Exactly one datastore query is issued, and it yields no entries to process
    expect(querySpy).to.have.property('callCount', 1)
    expect(processEntrySpy).to.have.property('callCount', 0)

    // No data to populate
    expect(peerStore.peers.size).to.eq(0)
  })

  it('should try to commit data on stop but should not add to batch if not exists', async () => {
    const commitSpy = sinon.spy(peerStore, '_commitData')
    const batchSpy = sinon.spy(datastore, 'batch')

    await peerStore.start()
    expect(commitSpy).to.have.property('callCount', 0)

    // Stop attempts a commit, but with no dirty peers no batch is ever created
    await peerStore.stop()
    expect(batchSpy).to.have.property('callCount', 0)
    expect(commitSpy).to.have.property('callCount', 1)
  })
})
|
||||
|
||||
describe('simple setup with content stored per change (threshold 1)', () => {
  beforeEach(() => {
    datastore = new MemoryDatastore()
    // threshold 1: every dirty peer triggers an immediate commit
    peerStore = new PeerStore({ datastore, threshold: 1 })
  })

  afterEach(() => peerStore.stop())

  it('should store peerStore content on datastore after peer marked as dirty (threshold 1)', async () => {
    // Only a single peer id is needed here (previously two were created and one discarded)
    const [peer] = await peerUtils.createPeerId()
    const multiaddrs = [multiaddr('/ip4/156.10.1.22/tcp/1000')]
    const protocols = ['/ping/1.0.0']
    const spyDirty = sinon.spy(peerStore, '_addDirtyPeer')
    const spyDs = sinon.spy(datastore, 'batch')

    await peerStore.start()

    // AddressBook: first change marks the peer dirty and commits right away
    peerStore.addressBook.set(peer, multiaddrs)

    expect(spyDirty).to.have.property('callCount', 1)
    expect(spyDs).to.have.property('callCount', 1)

    // ProtoBook: second change repeats the dirty/commit cycle
    peerStore.protoBook.set(peer, protocols)

    expect(spyDirty).to.have.property('callCount', 2)
    expect(spyDs).to.have.property('callCount', 2)

    // Should have two peer records stored in the datastore
    const queryParams = {
      prefix: '/peers/'
    }

    let count = 0
    for await (const _ of datastore.query(queryParams)) { // eslint-disable-line
      count++
    }
    expect(count).to.equal(2)

    // Validate data
    const storedPeer = peerStore.get(peer)
    expect(storedPeer.id.toB58String()).to.eql(peer.toB58String())
    expect(storedPeer.protocols).to.have.members(protocols)
    expect(storedPeer.addresses.map((a) => a.multiaddr.toString())).to.have.members([multiaddrs[0].toString()])
  })

  it('should load content to the peerStore when restart but not put in datastore again', async () => {
    const spyDs = sinon.spy(datastore, 'batch')
    const peers = await peerUtils.createPeerId({ number: 2 })
    const multiaddrs = [
      multiaddr('/ip4/156.10.1.22/tcp/1000'),
      multiaddr('/ip4/156.10.1.23/tcp/1000')
    ]
    const protocols = ['/ping/1.0.0']

    await peerStore.start()

    // AddressBook
    peerStore.addressBook.set(peers[0], [multiaddrs[0]])
    peerStore.addressBook.set(peers[1], [multiaddrs[1]])

    // ProtoBook
    peerStore.protoBook.set(peers[0], protocols)
    peerStore.protoBook.set(peers[1], protocols)

    expect(spyDs).to.have.property('callCount', 4)
    expect(peerStore.peers.size).to.equal(2)

    // Wipe the in-memory state so the restart has to reload from the datastore
    await peerStore.stop()
    peerStore.peerIds.clear()
    peerStore.addressBook.data.clear()
    peerStore.protoBook.data.clear()

    // Load on restart
    const spy = sinon.spy(peerStore, '_processDatastoreEntry')

    await peerStore.start()

    expect(spy).to.have.property('callCount', 4) // 4 datastore entries
    expect(spyDs).to.have.property('callCount', 4) // 4 previous operations

    expect(peerStore.peers.size).to.equal(2)
    expect(peerStore.addressBook.data.size).to.equal(2)
    expect(peerStore.protoBook.data.size).to.equal(2)
  })

  it('should delete content from the datastore on delete', async () => {
    const [peer] = await peerUtils.createPeerId()
    const multiaddrs = [multiaddr('/ip4/156.10.1.22/tcp/1000')]
    const protocols = ['/ping/1.0.0']

    await peerStore.start()

    // AddressBook
    peerStore.addressBook.set(peer, multiaddrs)
    // ProtoBook
    peerStore.protoBook.set(peer, protocols)

    // Spies are attached only after population, so they count deletions only
    const spyDs = sinon.spy(datastore, 'batch')
    const spyAddressBook = sinon.spy(peerStore.addressBook, 'delete')
    const spyProtoBook = sinon.spy(peerStore.protoBook, 'delete')

    // Delete from PeerStore
    peerStore.delete(peer)
    await peerStore.stop()

    expect(spyAddressBook).to.have.property('callCount', 1)
    expect(spyProtoBook).to.have.property('callCount', 1)
    expect(spyDs).to.have.property('callCount', 2)

    // Should have zero peer records stored in the datastore
    const queryParams = {
      prefix: '/peers/'
    }

    for await (const _ of datastore.query(queryParams)) { // eslint-disable-line
      throw new Error('Datastore should be empty')
    }
  })
})
|
||||
|
||||
describe('setup with content not stored per change (threshold 2)', () => {
  beforeEach(() => {
    datastore = new MemoryDatastore()
    // threshold 2: commits only fire once two distinct peers are dirty
    peerStore = new PeerStore({ datastore, threshold: 2 })
  })

  afterEach(() => peerStore.stop())

  it('should not commit until threshold is reached', async () => {
    const dirtySpy = sinon.spy(peerStore, '_addDirtyPeer')
    const batchSpy = sinon.spy(datastore, 'batch')

    const peers = await peerUtils.createPeerId({ number: 2 })

    const multiaddrs = [multiaddr('/ip4/156.10.1.22/tcp/1000')]
    const protocols = ['/ping/1.0.0']

    await peerStore.start()

    expect(dirtySpy).to.have.property('callCount', 0)
    expect(batchSpy).to.have.property('callCount', 0)

    // Add Peer0 data in multiple books
    peerStore.addressBook.set(peers[0], multiaddrs)
    peerStore.protoBook.set(peers[0], protocols)

    // Remove data from the same Peer
    peerStore.addressBook.delete(peers[0])

    // Three changes, but all on one peer: still below the two-peer threshold
    expect(dirtySpy).to.have.property('callCount', 3)
    expect(peerStore._dirtyPeers.size).to.equal(1)
    expect(batchSpy).to.have.property('callCount', 0)

    const queryParams = {
      prefix: '/peers/'
    }
    for await (const _ of datastore.query(queryParams)) { // eslint-disable-line
      throw new Error('Datastore should be empty')
    }

    // Add data for second book
    peerStore.addressBook.set(peers[1], multiaddrs)

    // Second dirty peer reaches the threshold and triggers a single commit
    expect(dirtySpy).to.have.property('callCount', 4)
    expect(batchSpy).to.have.property('callCount', 1)
    expect(peerStore._dirtyPeers.size).to.equal(0) // Reset

    // Should have two peer records stored in the datastore
    let count = 0
    for await (const _ of datastore.query(queryParams)) { // eslint-disable-line
      count++
    }
    expect(count).to.equal(2)
    expect(peerStore.peers.size).to.equal(2)
  })

  it('should commit on stop if threshold was not reached', async () => {
    const dirtySpy = sinon.spy(peerStore, '_addDirtyPeer')
    const batchSpy = sinon.spy(datastore, 'batch')

    const protocols = ['/ping/1.0.0']
    const [peer] = await peerUtils.createPeerId()

    await peerStore.start()

    // Add data for a single peer in one book
    peerStore.protoBook.set(peer, protocols)

    // One dirty peer is below the threshold, so nothing is committed yet
    expect(batchSpy).to.have.property('callCount', 0)
    expect(dirtySpy).to.have.property('callCount', 1)
    expect(peerStore._dirtyPeers.size).to.equal(1)

    const queryParams = {
      prefix: '/peers/'
    }
    for await (const _ of datastore.query(queryParams)) { // eslint-disable-line
      throw new Error('Datastore should be empty')
    }

    await peerStore.stop()

    // Stop flushes the pending below-threshold peer to the datastore
    expect(dirtySpy).to.have.property('callCount', 1)
    expect(batchSpy).to.have.property('callCount', 1)
    expect(peerStore._dirtyPeers.size).to.equal(0) // Reset

    // Should have one peer record stored in the datastore
    let count = 0
    for await (const _ of datastore.query(queryParams)) { // eslint-disable-line
      count++
    }
    expect(count).to.equal(1)
    expect(peerStore.peers.size).to.equal(1)
  })
})
|
||||
})
|
||||
|
||||
describe('libp2p.peerStore (Persisted)', () => {
  describe('disabled by default', () => {
    let libp2p

    // NOTE(review): `before` is paired with `afterEach`; that is fine while
    // there is only one test, but a second test would receive a stopped node.
    before(async () => {
      [libp2p] = await peerUtils.createPeer({
        started: false
      })
    })

    afterEach(() => libp2p.stop())

    // Fix: test title previously read "should not have have ..."
    it('should not have persistence capabilities', async () => {
      await libp2p.start()
      // Persistence-only internals must be absent on the plain PeerStore
      expect(libp2p.peerStore._dirtyPeers).to.not.exist()
      expect(libp2p.peerStore.threshold).to.not.exist()
    })
  })

  describe('enabled', () => {
    let libp2p
    let memoryDatastore

    beforeEach(async () => {
      memoryDatastore = new MemoryDatastore()
      ;[libp2p] = await peerUtils.createPeer({
        started: false,
        config: {
          datastore: memoryDatastore,
          peerStore: {
            persistence: true,
            threshold: 2 // trigger on second peer changed
          }
        }
      })
    })

    afterEach(() => libp2p.stop())

    it('should start on libp2p start and load content', async () => {
      const spyPeerStore = sinon.spy(libp2p.peerStore, 'start')
      const spyDs = sinon.spy(memoryDatastore, 'query')

      await libp2p.start()
      // Node start must start the peer store, which loads from the datastore
      expect(spyPeerStore).to.have.property('callCount', 1)
      expect(spyDs).to.have.property('callCount', 1)
    })

    it('should load content to the peerStore when a new node is started with the same datastore', async () => {
      const peers = await peerUtils.createPeerId({ number: 2 })
      const multiaddrs = [
        multiaddr('/ip4/156.10.1.22/tcp/1000'),
        multiaddr('/ip4/156.10.1.23/tcp/1000')
      ]
      const protocols = ['/ping/1.0.0']

      await libp2p.start()

      // AddressBook
      libp2p.peerStore.addressBook.set(peers[0], [multiaddrs[0]])
      libp2p.peerStore.addressBook.set(peers[1], [multiaddrs[1]])

      // ProtoBook
      libp2p.peerStore.protoBook.set(peers[0], protocols)
      libp2p.peerStore.protoBook.set(peers[1], protocols)

      expect(libp2p.peerStore.peers.size).to.equal(2)

      await libp2p.stop()

      // Use a new node with the previously populated datastore
      const [newNode] = await peerUtils.createPeer({
        started: false,
        config: {
          datastore: memoryDatastore,
          peerStore: {
            persistence: true
          }
        }
      })

      expect(newNode.peerStore.peers.size).to.equal(0)

      const spy = sinon.spy(newNode.peerStore, '_processDatastoreEntry')

      await newNode.start()

      expect(spy).to.have.property('callCount', 4) // 4 datastore entries

      expect(newNode.peerStore.peers.size).to.equal(2)

      // Validate data
      const peer0 = newNode.peerStore.get(peers[0])
      expect(peer0.id.toB58String()).to.eql(peers[0].toB58String())
      expect(peer0.protocols).to.have.members(protocols)
      expect(peer0.addresses.map((a) => a.multiaddr.toString())).to.have.members([multiaddrs[0].toString()])

      const peer1 = newNode.peerStore.get(peers[1])
      expect(peer1.id.toB58String()).to.eql(peers[1].toB58String())
      expect(peer1.protocols).to.have.members(protocols)
      expect(peer1.addresses.map((a) => a.multiaddr.toString())).to.have.members([multiaddrs[1].toString()])

      await newNode.stop()
    })
  })
})
|
@ -28,9 +28,6 @@ async function createPeer ({ number = 1, fixture = true, started = true, populat
|
||||
const peers = await pTimes(number, (i) => Libp2p.create({
|
||||
peerId: peerIds[i],
|
||||
addresses,
|
||||
peerStore: {
|
||||
persistence: false
|
||||
},
|
||||
...defaultOptions,
|
||||
...config
|
||||
}))
|
||||
|
Loading…
x
Reference in New Issue
Block a user