feat: async peerstore backed by datastores (#1058)

We have a peerstore that keeps all data for all observed peers in memory with no eviction.

This is fine when you don't discover many peers but when using the DHT you encounter a significant number of peers so our peer storage grows and grows over time.

We have a persistent peer store, but it just periodically writes peers into the datastore to be read at startup, still keeping them in memory.

It also means a restart doesn't give you any temporary reprieve from the memory leak as the previously observed peer data is read into memory at startup.

This change refactors the peerstore to use a datastore by default, reading and writing peer info as it arrives.  It can be configured with a MemoryDatastore if desired.

It was necessary to change the peerstore and *book interfaces to be asynchronous since the datastore api is asynchronous.

BREAKING CHANGE: `libp2p.handle`, `libp2p.registrar.register` and the peerstore methods have become async
This commit is contained in:
Alex Potsides
2022-01-20 12:03:35 +00:00
committed by GitHub
parent 0a4dc54d08
commit 978eb3676f
94 changed files with 3263 additions and 4039 deletions

View File

@ -9,7 +9,7 @@ const arrayEquals = require('libp2p-utils/src/array-equals')
const addressSort = require('libp2p-utils/src/address-sort')
const PeerId = require('peer-id')
const pDefer = require('p-defer')
const { MemoryDatastore } = require('datastore-core/memory')
const PeerStore = require('../../src/peer-store')
const Envelope = require('../../src/record/envelope')
const PeerRecord = require('../../src/record/peer-record')
@ -19,6 +19,11 @@ const {
codes: { ERR_INVALID_PARAMETERS }
} = require('../../src/errors')
/**
* @typedef {import('../../src/peer-store/types').PeerStore} PeerStore
* @typedef {import('../../src/peer-store/types').AddressBook} AddressBook
*/
const addr1 = new Multiaddr('/ip4/127.0.0.1/tcp/8000')
const addr2 = new Multiaddr('/ip4/20.0.0.1/tcp/8001')
const addr3 = new Multiaddr('/ip4/127.0.0.1/tcp/8002')
@ -31,10 +36,16 @@ describe('addressBook', () => {
})
describe('addressBook.set', () => {
let peerStore, ab
/** @type {PeerStore} */
let peerStore
/** @type {AddressBook} */
let ab
beforeEach(() => {
peerStore = new PeerStore({ peerId })
peerStore = new PeerStore({
peerId,
datastore: new MemoryDatastore()
})
ab = peerStore.addressBook
})
@ -42,9 +53,9 @@ describe('addressBook', () => {
peerStore.removeAllListeners()
})
it('throwns invalid parameters error if invalid PeerId is provided', () => {
it('throws invalid parameters error if invalid PeerId is provided', async () => {
try {
ab.set('invalid peerId')
await ab.set('invalid peerId')
} catch (/** @type {any} */ err) {
expect(err.code).to.equal(ERR_INVALID_PARAMETERS)
return
@ -52,9 +63,9 @@ describe('addressBook', () => {
throw new Error('invalid peerId should throw error')
})
it('throwns invalid parameters error if no addresses provided', () => {
it('throws invalid parameters error if no addresses provided', async () => {
try {
ab.set(peerId)
await ab.set(peerId)
} catch (/** @type {any} */ err) {
expect(err.code).to.equal(ERR_INVALID_PARAMETERS)
return
@ -62,9 +73,9 @@ describe('addressBook', () => {
throw new Error('no addresses should throw error')
})
it('throwns invalid parameters error if invalid multiaddrs are provided', () => {
it('throws invalid parameters error if invalid multiaddrs are provided', async () => {
try {
ab.set(peerId, ['invalid multiaddr'])
await ab.set(peerId, ['invalid multiaddr'])
} catch (/** @type {any} */ err) {
expect(err.code).to.equal(ERR_INVALID_PARAMETERS)
return
@ -72,7 +83,7 @@ describe('addressBook', () => {
throw new Error('invalid multiaddrs should throw error')
})
it('replaces the stored content by default and emit change event', () => {
it('replaces the stored content by default and emit change event', async () => {
const defer = pDefer()
const supportedMultiaddrs = [addr1, addr2]
@ -82,8 +93,8 @@ describe('addressBook', () => {
defer.resolve()
})
ab.set(peerId, supportedMultiaddrs)
const addresses = ab.get(peerId)
await ab.set(peerId, supportedMultiaddrs)
const addresses = await ab.get(peerId)
const multiaddrs = addresses.map((mi) => mi.multiaddr)
expect(multiaddrs).to.have.deep.members(supportedMultiaddrs)
@ -105,11 +116,11 @@ describe('addressBook', () => {
})
// set 1
ab.set(peerId, supportedMultiaddrsA)
await ab.set(peerId, supportedMultiaddrsA)
// set 2 (same content)
ab.set(peerId, supportedMultiaddrsB)
const addresses = ab.get(peerId)
await ab.set(peerId, supportedMultiaddrsB)
const addresses = await ab.get(peerId)
const multiaddrs = addresses.map((mi) => mi.multiaddr)
expect(multiaddrs).to.have.deep.members(supportedMultiaddrsB)
@ -130,10 +141,10 @@ describe('addressBook', () => {
})
// set 1
ab.set(peerId, supportedMultiaddrs)
await ab.set(peerId, supportedMultiaddrs)
// set 2 (same content)
ab.set(peerId, supportedMultiaddrs)
await ab.set(peerId, supportedMultiaddrs)
// Wait 50ms for incorrect second event
setTimeout(() => {
@ -145,10 +156,16 @@ describe('addressBook', () => {
})
describe('addressBook.add', () => {
let peerStore, ab
/** @type {PeerStore} */
let peerStore
/** @type {AddressBook} */
let ab
beforeEach(() => {
peerStore = new PeerStore({ peerId })
peerStore = new PeerStore({
peerId,
datastore: new MemoryDatastore()
})
ab = peerStore.addressBook
})
@ -156,9 +173,9 @@ describe('addressBook', () => {
peerStore.removeAllListeners()
})
it('throwns invalid parameters error if invalid PeerId is provided', () => {
it('throws invalid parameters error if invalid PeerId is provided', async () => {
try {
ab.add('invalid peerId')
await ab.add('invalid peerId')
} catch (/** @type {any} */ err) {
expect(err.code).to.equal(ERR_INVALID_PARAMETERS)
return
@ -166,9 +183,9 @@ describe('addressBook', () => {
throw new Error('invalid peerId should throw error')
})
it('throwns invalid parameters error if no addresses provided', () => {
it('throws invalid parameters error if no addresses provided', async () => {
try {
ab.add(peerId)
await ab.add(peerId)
} catch (/** @type {any} */ err) {
expect(err.code).to.equal(ERR_INVALID_PARAMETERS)
return
@ -176,9 +193,9 @@ describe('addressBook', () => {
throw new Error('no addresses provided should throw error')
})
it('throwns invalid parameters error if invalid multiaddrs are provided', () => {
it('throws invalid parameters error if invalid multiaddrs are provided', async () => {
try {
ab.add(peerId, ['invalid multiaddr'])
await ab.add(peerId, ['invalid multiaddr'])
} catch (/** @type {any} */ err) {
expect(err.code).to.equal(ERR_INVALID_PARAMETERS)
return
@ -193,7 +210,7 @@ describe('addressBook', () => {
defer.reject()
})
ab.add(peerId, [])
await ab.add(peerId, [])
// Wait 50ms for incorrect second event
setTimeout(() => {
@ -203,7 +220,7 @@ describe('addressBook', () => {
await defer.promise
})
it('adds the new content and emits change event', () => {
it('adds the new content and emits change event', async () => {
const defer = pDefer()
const supportedMultiaddrsA = [addr1, addr2]
@ -219,14 +236,14 @@ describe('addressBook', () => {
})
// Replace
ab.set(peerId, supportedMultiaddrsA)
let addresses = ab.get(peerId)
await ab.set(peerId, supportedMultiaddrsA)
let addresses = await ab.get(peerId)
let multiaddrs = addresses.map((mi) => mi.multiaddr)
expect(multiaddrs).to.have.deep.members(supportedMultiaddrsA)
// Add
ab.add(peerId, supportedMultiaddrsB)
addresses = ab.get(peerId)
await ab.add(peerId, supportedMultiaddrsB)
addresses = await ab.get(peerId)
multiaddrs = addresses.map((mi) => mi.multiaddr)
expect(multiaddrs).to.have.deep.members(finalMultiaddrs)
@ -249,11 +266,11 @@ describe('addressBook', () => {
})
// set 1
ab.set(peerId, supportedMultiaddrsA)
await ab.set(peerId, supportedMultiaddrsA)
// set 2 (content already existing)
ab.add(peerId, supportedMultiaddrsB)
const addresses = ab.get(peerId)
await ab.add(peerId, supportedMultiaddrsB)
const addresses = await ab.get(peerId)
const multiaddrs = addresses.map((mi) => mi.multiaddr)
expect(multiaddrs).to.have.deep.members(finalMultiaddrs)
@ -275,10 +292,10 @@ describe('addressBook', () => {
})
// set 1
ab.set(peerId, supportedMultiaddrsA)
await ab.set(peerId, supportedMultiaddrsA)
// set 2 (content already existing)
ab.add(peerId, supportedMultiaddrsB)
await ab.add(peerId, supportedMultiaddrsB)
// Wait 50ms for incorrect second event
setTimeout(() => {
@ -288,26 +305,32 @@ describe('addressBook', () => {
await defer.promise
})
it('does not add replicated content', () => {
it('does not add replicated content', async () => {
// set 1
ab.set(peerId, [addr1, addr1])
await ab.set(peerId, [addr1, addr1])
const addresses = ab.get(peerId)
const addresses = await ab.get(peerId)
expect(addresses).to.have.lengthOf(1)
})
})
describe('addressBook.get', () => {
let peerStore, ab
/** @type {PeerStore} */
let peerStore
/** @type {AddressBook} */
let ab
beforeEach(() => {
peerStore = new PeerStore({ peerId })
peerStore = new PeerStore({
peerId,
datastore: new MemoryDatastore()
})
ab = peerStore.addressBook
})
it('throwns invalid parameters error if invalid PeerId is provided', () => {
it('throws invalid parameters error if invalid PeerId is provided', async () => {
try {
ab.get('invalid peerId')
await ab.get('invalid peerId')
} catch (/** @type {any} */ err) {
expect(err.code).to.equal(ERR_INVALID_PARAMETERS)
return
@ -315,34 +338,40 @@ describe('addressBook', () => {
throw new Error('invalid peerId should throw error')
})
it('returns undefined if no multiaddrs are known for the provided peer', () => {
const addresses = ab.get(peerId)
it('returns empty if no multiaddrs are known for the provided peer', async () => {
const addresses = await ab.get(peerId)
expect(addresses).to.not.exist()
expect(addresses).to.be.empty()
})
it('returns the multiaddrs stored', () => {
it('returns the multiaddrs stored', async () => {
const supportedMultiaddrs = [addr1, addr2]
ab.set(peerId, supportedMultiaddrs)
await ab.set(peerId, supportedMultiaddrs)
const addresses = ab.get(peerId)
const addresses = await ab.get(peerId)
const multiaddrs = addresses.map((mi) => mi.multiaddr)
expect(multiaddrs).to.have.deep.members(supportedMultiaddrs)
})
})
describe('addressBook.getMultiaddrsForPeer', () => {
let peerStore, ab
/** @type {PeerStore} */
let peerStore
/** @type {AddressBook} */
let ab
beforeEach(() => {
peerStore = new PeerStore({ peerId })
peerStore = new PeerStore({
peerId,
datastore: new MemoryDatastore()
})
ab = peerStore.addressBook
})
it('throwns invalid parameters error if invalid PeerId is provided', () => {
it('throws invalid parameters error if invalid PeerId is provided', async () => {
try {
ab.getMultiaddrsForPeer('invalid peerId')
await ab.getMultiaddrsForPeer('invalid peerId')
} catch (/** @type {any} */ err) {
expect(err.code).to.equal(ERR_INVALID_PARAMETERS)
return
@ -350,28 +379,28 @@ describe('addressBook', () => {
throw new Error('invalid peerId should throw error')
})
it('returns undefined if no multiaddrs are known for the provided peer', () => {
const addresses = ab.getMultiaddrsForPeer(peerId)
it('returns empty if no multiaddrs are known for the provided peer', async () => {
const addresses = await ab.getMultiaddrsForPeer(peerId)
expect(addresses).to.not.exist()
expect(addresses).to.be.empty()
})
it('returns the multiaddrs stored', () => {
it('returns the multiaddrs stored', async () => {
const supportedMultiaddrs = [addr1, addr2]
ab.set(peerId, supportedMultiaddrs)
await ab.set(peerId, supportedMultiaddrs)
const multiaddrs = ab.getMultiaddrsForPeer(peerId)
const multiaddrs = await ab.getMultiaddrsForPeer(peerId)
multiaddrs.forEach((m) => {
expect(m.getPeerId()).to.equal(peerId.toB58String())
})
})
it('can sort multiaddrs providing a sorter', () => {
it('can sort multiaddrs providing a sorter', async () => {
const supportedMultiaddrs = [addr1, addr2]
ab.set(peerId, supportedMultiaddrs)
await ab.set(peerId, supportedMultiaddrs)
const multiaddrs = ab.getMultiaddrsForPeer(peerId, addressSort.publicAddressesFirst)
const multiaddrs = await ab.getMultiaddrsForPeer(peerId, addressSort.publicAddressesFirst)
const sortedAddresses = addressSort.publicAddressesFirst(supportedMultiaddrs.map((m) => ({ multiaddr: m })))
multiaddrs.forEach((m, index) => {
@ -381,16 +410,22 @@ describe('addressBook', () => {
})
describe('addressBook.delete', () => {
let peerStore, ab
/** @type {PeerStore} */
let peerStore
/** @type {AddressBook} */
let ab
beforeEach(() => {
peerStore = new PeerStore({ peerId })
peerStore = new PeerStore({
peerId,
datastore: new MemoryDatastore()
})
ab = peerStore.addressBook
})
it('throwns invalid parameters error if invalid PeerId is provided', () => {
it('throws invalid parameters error if invalid PeerId is provided', async () => {
try {
ab.delete('invalid peerId')
await ab.delete('invalid peerId')
} catch (/** @type {any} */ err) {
expect(err.code).to.equal(ERR_INVALID_PARAMETERS)
return
@ -398,16 +433,14 @@ describe('addressBook', () => {
throw new Error('invalid peerId should throw error')
})
it('returns false if no records exist for the peer and no event is emitted', () => {
it('does not emit an event if no records exist for the peer', async () => {
const defer = pDefer()
peerStore.on('change:multiaddrs', () => {
defer.reject()
})
const deleted = ab.delete(peerId)
expect(deleted).to.equal(false)
await ab.delete(peerId)
// Wait 50ms for incorrect invalid event
setTimeout(() => {
@ -417,11 +450,11 @@ describe('addressBook', () => {
return defer.promise
})
it('returns true if the record exists and an event is emitted', () => {
it('emits an event if the record exists', async () => {
const defer = pDefer()
const supportedMultiaddrs = [addr1, addr2]
ab.set(peerId, supportedMultiaddrs)
await ab.set(peerId, supportedMultiaddrs)
// Listen after set
peerStore.on('change:multiaddrs', ({ multiaddrs }) => {
@ -429,20 +462,24 @@ describe('addressBook', () => {
defer.resolve()
})
const deleted = ab.delete(peerId)
expect(deleted).to.equal(true)
await ab.delete(peerId)
return defer.promise
})
})
describe('certified records', () => {
let peerStore, ab
/** @type {PeerStore} */
let peerStore
/** @type {AddressBook} */
let ab
describe('consumes a valid peer record and stores its data', () => {
beforeEach(() => {
peerStore = new PeerStore({ peerId })
peerStore = new PeerStore({
peerId,
datastore: new MemoryDatastore()
})
ab = peerStore.addressBook
})
@ -455,15 +492,11 @@ describe('addressBook', () => {
const envelope = await Envelope.seal(peerRecord, peerId)
// consume peer record
const consumed = ab.consumePeerRecord(envelope)
const consumed = await ab.consumePeerRecord(envelope)
expect(consumed).to.eql(true)
// Validate stored envelope
const storedEnvelope = await ab.getPeerRecord(peerId)
expect(envelope.equals(storedEnvelope)).to.eql(true)
// Validate AddressBook addresses
const addrs = ab.get(peerId)
const addrs = await ab.get(peerId)
expect(addrs).to.exist()
expect(addrs).to.have.lengthOf(multiaddrs.length)
addrs.forEach((addr, index) => {
@ -488,7 +521,7 @@ describe('addressBook', () => {
})
// consume peer record
const consumed = ab.consumePeerRecord(envelope)
const consumed = await ab.consumePeerRecord(envelope)
expect(consumed).to.eql(true)
return defer.promise
@ -499,10 +532,10 @@ describe('addressBook', () => {
const multiaddrs = [addr1, addr2]
// Set addressBook data
ab.set(peerId, multiaddrs)
await ab.set(peerId, multiaddrs)
// Validate data exists, but not certified
let addrs = ab.get(peerId)
let addrs = await ab.get(peerId)
expect(addrs).to.exist()
expect(addrs).to.have.lengthOf(multiaddrs.length)
@ -525,14 +558,14 @@ describe('addressBook', () => {
})
// consume peer record
const consumed = ab.consumePeerRecord(envelope)
const consumed = await ab.consumePeerRecord(envelope)
expect(consumed).to.eql(true)
// Wait event
await defer.promise
// Validate data exists and certified
addrs = ab.get(peerId)
addrs = await ab.get(peerId)
expect(addrs).to.exist()
expect(addrs).to.have.lengthOf(multiaddrs.length)
addrs.forEach((addr, index) => {
@ -546,10 +579,10 @@ describe('addressBook', () => {
const multiaddrs = [addr1, addr2]
// Set addressBook data
ab.set(peerId, [addr1])
await ab.set(peerId, [addr1])
// Validate data exists, but not certified
let addrs = ab.get(peerId)
let addrs = await ab.get(peerId)
expect(addrs).to.exist()
expect(addrs).to.have.lengthOf(1)
expect(addrs[0].isCertified).to.eql(false)
@ -569,14 +602,14 @@ describe('addressBook', () => {
})
// consume peer record
const consumed = ab.consumePeerRecord(envelope)
const consumed = await ab.consumePeerRecord(envelope)
expect(consumed).to.eql(true)
// Wait event
await defer.promise
// Validate data exists and certified
addrs = ab.get(peerId)
addrs = await ab.get(peerId)
expect(addrs).to.exist()
expect(addrs).to.have.lengthOf(multiaddrs.length)
addrs.forEach((addr, index) => {
@ -591,10 +624,10 @@ describe('addressBook', () => {
const multiaddrsCertified = [addr1, addr2]
// Set addressBook data
ab.set(peerId, multiaddrsUncertified)
await ab.set(peerId, multiaddrsUncertified)
// Validate data exists, but not certified
let addrs = ab.get(peerId)
let addrs = await ab.get(peerId)
expect(addrs).to.exist()
expect(addrs).to.have.lengthOf(multiaddrsUncertified.length)
addrs.forEach((addr, index) => {
@ -616,14 +649,14 @@ describe('addressBook', () => {
})
// consume peer record
const consumed = ab.consumePeerRecord(envelope)
const consumed = await ab.consumePeerRecord(envelope)
expect(consumed).to.eql(true)
// Wait event
await defer.promise
// Validate data exists and certified
addrs = ab.get(peerId)
addrs = await ab.get(peerId)
expect(addrs).to.exist()
expect(addrs).to.have.lengthOf(multiaddrsCertified.length)
addrs.forEach((addr, index) => {
@ -635,16 +668,19 @@ describe('addressBook', () => {
describe('fails to consume invalid peer records', () => {
beforeEach(() => {
peerStore = new PeerStore({ peerId })
peerStore = new PeerStore({
peerId,
datastore: new MemoryDatastore()
})
ab = peerStore.addressBook
})
it('invalid peer record', () => {
it('invalid peer record', async () => {
const invalidEnvelope = {
payload: Buffer.from('invalid-peerRecord')
}
const consumed = ab.consumePeerRecord(invalidEnvelope)
const consumed = await ab.consumePeerRecord(invalidEnvelope)
expect(consumed).to.eql(false)
})
@ -659,7 +695,7 @@ describe('addressBook', () => {
})
const envelope = await Envelope.seal(peerRecord, peerId)
const consumed = ab.consumePeerRecord(envelope)
const consumed = await ab.consumePeerRecord(envelope)
expect(consumed).to.eql(false)
})
@ -679,10 +715,10 @@ describe('addressBook', () => {
const envelope2 = await Envelope.seal(peerRecord2, peerId)
// Consume envelope1 (bigger seqNumber)
let consumed = ab.consumePeerRecord(envelope1)
let consumed = await ab.consumePeerRecord(envelope1)
expect(consumed).to.eql(true)
consumed = ab.consumePeerRecord(envelope2)
consumed = await ab.consumePeerRecord(envelope2)
expect(consumed).to.eql(false)
})
@ -693,7 +729,7 @@ describe('addressBook', () => {
})
const envelope = await Envelope.seal(peerRecord, peerId)
const consumed = ab.consumePeerRecord(envelope)
const consumed = await ab.consumePeerRecord(envelope)
expect(consumed).to.eql(false)
})
})