feat: peer store (#470)

* feat: peer-store v0

* chore: apply suggestions from code review

Co-Authored-By: Jacob Heun <jacobheun@gmail.com>
This commit is contained in:
Vasco Santos
2019-11-06 15:11:13 +01:00
committed by Jacob Heun
parent fe2a8eddbb
commit 582094a834
8 changed files with 519 additions and 16 deletions

View File

@ -11,7 +11,6 @@ const promisify = require('promisify-es6')
const each = require('async/each')
const nextTick = require('async/nextTick')
const PeerBook = require('peer-book')
const PeerInfo = require('peer-info')
const multiaddr = require('multiaddr')
const Switch = require('./switch')
@ -29,6 +28,7 @@ const { codes } = require('./errors')
const Dialer = require('./dialer')
const TransportManager = require('./transport-manager')
const Upgrader = require('./upgrader')
const PeerStore = require('./peer-store')
const notStarted = (action, state) => {
return errCode(
@ -54,7 +54,7 @@ class Libp2p extends EventEmitter {
this.datastore = this._options.datastore
this.peerInfo = this._options.peerInfo
this.peerBook = this._options.peerBook || new PeerBook()
this.peerStore = new PeerStore()
this._modules = this._options.modules
this._config = this._options.config
@ -62,13 +62,15 @@ class Libp2p extends EventEmitter {
this._discovery = [] // Discovery service instances/references
// create the switch, and listen for errors
this._switch = new Switch(this.peerInfo, this.peerBook, this._options.switch)
this._switch = new Switch(this.peerInfo, this.peerStore, this._options.switch)
// Setup the Upgrader
this.upgrader = new Upgrader({
localPeer: this.peerInfo.id,
onConnection: (connection) => {
const peerInfo = getPeerInfo(connection.remotePeer)
this.peerStore.put(peerInfo)
this.emit('peer:connect', peerInfo)
},
onConnectionEnd: (connection) => {
@ -179,10 +181,10 @@ class Libp2p extends EventEmitter {
// Once we start, emit and dial any peers we may have already discovered
this.state.on('STARTED', () => {
this.peerBook.getAllArray().forEach((peerInfo) => {
for (const peerInfo of this.peerStore.peers) {
this.emit('peer:discovery', peerInfo)
this._maybeConnect(peerInfo)
})
}
})
this._peerDiscovered = this._peerDiscovered.bind(this)
@ -245,7 +247,7 @@ class Libp2p extends EventEmitter {
/**
* Dials to the provided peer. If successful, the `PeerInfo` of the
* peer will be added to the nodes `PeerBook`
* peer will be added to the nodes `peerStore`
*
* @param {PeerInfo|PeerId|Multiaddr|string} peer The peer to dial
* @param {object} options
@ -258,7 +260,7 @@ class Libp2p extends EventEmitter {
/**
* Dials to the provided peer and handshakes with the given protocol.
* If successful, the `PeerInfo` of the peer will be added to the nodes `PeerBook`,
* If successful, the `PeerInfo` of the peer will be added to the nodes `peerStore`,
* and the `Connection` will be sent in the callback
*
* @async
@ -277,11 +279,19 @@ class Libp2p extends EventEmitter {
connection = await this.dialer.connectToPeer(peer, options)
}
const peerInfo = getPeerInfo(connection.remotePeer)
// If a protocol was provided, create a new stream
if (protocols) {
return connection.newStream(protocols)
const stream = await connection.newStream(protocols)
peerInfo.protocols.add(stream.protocol)
this.peerStore.put(peerInfo)
return stream
}
this.peerStore.put(peerInfo)
return connection
}
@ -369,12 +379,6 @@ class Libp2p extends EventEmitter {
* the `peer:discovery` event. If auto dial is enabled for libp2p
* and the current connection count is under the low watermark, the
* peer will be dialed.
*
* TODO: If `peerBook.put` becomes centralized, https://github.com/libp2p/js-libp2p/issues/345,
* it would be ideal if only new peers were emitted. Currently, with
* other modules adding peers to the `PeerBook` we have no way of knowing
* if a peer is new or not, so it has to be emitted.
*
* @private
* @param {PeerInfo} peerInfo
*/
@ -383,7 +387,7 @@ class Libp2p extends EventEmitter {
log.error(new Error(codes.ERR_DISCOVERED_SELF))
return
}
peerInfo = this.peerBook.put(peerInfo)
peerInfo = this.peerStore.put(peerInfo)
if (!this.isStarted()) return

3
src/peer-store/README.md Normal file
View File

@ -0,0 +1,3 @@
# Peerstore
WIP

190
src/peer-store/index.js Normal file
View File

@ -0,0 +1,190 @@
'use strict'
const assert = require('assert')
const debug = require('debug')
const log = debug('libp2p:peer-store')
log.error = debug('libp2p:peer-store:error')
const { EventEmitter } = require('events')
const PeerInfo = require('peer-info')
/**
* Responsible for managing known peers, as well as their addresses and metadata
* @fires PeerStore#peer Emitted when a peer is connected to this node
* @fires PeerStore#change:protocols
* @fires PeerStore#change:multiaddrs
*/
class PeerStore extends EventEmitter {
constructor () {
super()
/**
* Map of peers
*
* @type {Map<string, PeerInfo>}
*/
this.peers = new Map()
// TODO: Track ourselves. We should split `peerInfo` up into its pieces so we get better
// control and observability. This will be the initial step for removing PeerInfo
// https://github.com/libp2p/go-libp2p-core/blob/master/peerstore/peerstore.go
// this.addressBook = new Map()
// this.protoBook = new Map()
}
/**
* Stores the peerInfo of a new peer.
* If already exist, its info is updated.
* @param {PeerInfo} peerInfo
*/
put (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
// Already know the peer?
if (this.peers.has(peerInfo.id.toB58String())) {
this.update(peerInfo)
} else {
this.add(peerInfo)
// Emit the new peer found
this.emit('peer', peerInfo)
}
}
/**
* Add a new peer to the store.
* @param {PeerInfo} peerInfo
*/
add (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
// Create new instance and add values to it
const newPeerInfo = new PeerInfo(peerInfo.id)
peerInfo.multiaddrs.forEach((ma) => newPeerInfo.multiaddrs.add(ma))
peerInfo.protocols.forEach((p) => newPeerInfo.protocols.add(p))
const connectedMa = peerInfo.isConnected()
connectedMa && newPeerInfo.connect(connectedMa)
const peerProxy = new Proxy(newPeerInfo, {
set: (obj, prop, value) => {
if (prop === 'multiaddrs') {
this.emit('change:multiaddrs', {
peerInfo: obj,
multiaddrs: value.toArray()
})
} else if (prop === 'protocols') {
this.emit('change:protocols', {
peerInfo: obj,
protocols: Array.from(value)
})
}
return Reflect.set(...arguments)
}
})
this.peers.set(peerInfo.id.toB58String(), peerProxy)
}
/**
* Updates an already known peer.
* @param {PeerInfo} peerInfo
*/
update (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
const id = peerInfo.id.toB58String()
const recorded = this.peers.get(id)
// pass active connection state
const ma = peerInfo.isConnected()
if (ma) {
recorded.connect(ma)
}
// Verify new multiaddrs
// TODO: better track added and removed multiaddrs
const multiaddrsIntersection = [
...recorded.multiaddrs.toArray()
].filter((m) => peerInfo.multiaddrs.has(m))
if (multiaddrsIntersection.length !== peerInfo.multiaddrs.size ||
multiaddrsIntersection.length !== recorded.multiaddrs.size) {
// recorded.multiaddrs = peerInfo.multiaddrs
recorded.multiaddrs.clear()
for (const ma of peerInfo.multiaddrs.toArray()) {
recorded.multiaddrs.add(ma)
}
this.emit('change:multiaddrs', {
peerInfo: peerInfo,
multiaddrs: recorded.multiaddrs.toArray()
})
}
// Update protocols
// TODO: better track added and removed protocols
const protocolsIntersection = new Set(
[...recorded.protocols].filter((p) => peerInfo.protocols.has(p))
)
if (protocolsIntersection.size !== peerInfo.protocols.size ||
protocolsIntersection.size !== recorded.protocols.size) {
recorded.protocols.clear()
for (const protocol of peerInfo.protocols) {
recorded.protocols.add(protocol)
}
this.emit('change:protocols', {
peerInfo: peerInfo,
protocols: Array.from(recorded.protocols)
})
}
// Add the public key if missing
if (!recorded.id.pubKey && peerInfo.id.pubKey) {
recorded.id.pubKey = peerInfo.id.pubKey
}
}
/**
* Get the info to the given id.
* @param {string} peerId b58str id
* @returns {PeerInfo}
*/
get (peerId) {
const peerInfo = this.peers.get(peerId)
if (peerInfo) {
return peerInfo
}
return undefined
}
/**
* Removes the Peer with the matching `peerId` from the PeerStore
* @param {string} peerId b58str id
* @returns {boolean} true if found and removed
*/
remove (peerId) {
return this.peers.delete(peerId)
}
/**
* Completely replaces the existing peers metadata with the given `peerInfo`
* @param {PeerInfo} peerInfo
* @returns {void}
*/
replace (peerInfo) {
assert(PeerInfo.isPeerInfo(peerInfo), 'peerInfo must be an instance of peer-info')
this.remove(peerInfo.id.toB58String())
this.add(peerInfo)
}
}
module.exports = PeerStore