feat: connection gater (#1142)

Port of https://github.com/libp2p/go-libp2p-core/blob/master/connmgr/gater.go

Adds a new configuration key `connectionGater` which allows denying the dialing of certain peers, individual multiaddrs and the creation of connections at certain points in the connection flow.

Fixes: https://github.com/libp2p/js-libp2p/issues/175
Refs: https://github.com/libp2p/js-libp2p/issues/744
Refs: https://github.com/libp2p/js-libp2p/issues/769

Co-authored-by: mzdws <8580712+mzdws@user.noreply.gitee.com>
This commit is contained in:
Alex Potsides
2022-01-25 16:27:01 +00:00
committed by GitHub
parent 9b22c6e2f9
commit ff32eba6a0
21 changed files with 770 additions and 98 deletions

View File

@@ -13,6 +13,7 @@ const { FaultTolerance } = require('./transport-manager')
/**
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('./types').ConnectionGater} ConnectionGater
* @typedef {import('.').Libp2pOptions} Libp2pOptions
* @typedef {import('.').constructorOptions} constructorOptions
*/
@@ -27,6 +28,7 @@ const DefaultConfig = {
connectionManager: {
minConnections: 25
},
connectionGater: /** @type {ConnectionGater} */ {},
transportManager: {
faultTolerance: FaultTolerance.FATAL_ALL
},

View File

@@ -1,6 +1,9 @@
'use strict'
const debug = require('debug')
const all = require('it-all')
const filter = require('it-filter')
const { pipe } = require('it-pipe')
const log = Object.assign(debug('libp2p:dialer'), {
error: debug('libp2p:dialer:err')
})
@@ -33,12 +36,14 @@ const METRICS_PENDING_DIAL_TARGETS = 'pending-dial-targets'
* @typedef {import('../peer-store/types').PeerStore} PeerStore
* @typedef {import('../peer-store/types').Address} Address
* @typedef {import('../transport-manager')} TransportManager
* @typedef {import('../types').ConnectionGater} ConnectionGater
*/
/**
* @typedef {Object} DialerProperties
* @property {PeerStore} peerStore
* @property {TransportManager} transportManager
* @property {ConnectionGater} connectionGater
*
* @typedef {(addr:Multiaddr) => Promise<string[]>} Resolver
*
@@ -70,6 +75,7 @@ class Dialer {
constructor ({
transportManager,
peerStore,
connectionGater,
addressSorter = publicAddressesFirst,
maxParallelDials = MAX_PARALLEL_DIALS,
maxAddrsToDial = MAX_ADDRS_TO_DIAL,
@@ -78,6 +84,7 @@ class Dialer {
resolvers = {},
metrics
}) {
this.connectionGater = connectionGater
this.transportManager = transportManager
this.peerStore = peerStore
this.addressSorter = addressSorter
@@ -136,6 +143,12 @@ class Dialer {
* @returns {Promise<Connection>}
*/
async connectToPeer (peer, options = {}) {
const { id } = getPeer(peer)
if (await this.connectionGater.denyDialPeer(id)) {
throw errCode(new Error('The dial request is blocked by gater.allowDialPeer'), codes.ERR_PEER_DIAL_INTERCEPTED)
}
const dialTarget = await this._createCancellableDialTarget(peer)
if (!dialTarget.addrs.length) {
@@ -203,7 +216,13 @@ class Dialer {
await this.peerStore.addressBook.add(id, multiaddrs)
}
let knownAddrs = await this.peerStore.addressBook.getMultiaddrsForPeer(id, this.addressSorter) || []
let knownAddrs = await pipe(
await this.peerStore.addressBook.getMultiaddrsForPeer(id, this.addressSorter),
(source) => filter(source, async (multiaddr) => {
return !(await this.connectionGater.denyDialMultiaddr(id, multiaddr))
}),
(source) => all(source)
)
// If received a multiaddr to dial, it should be the first to use
// But, if we know other multiaddrs for the peer, we should try them too.

View File

@@ -12,6 +12,8 @@ exports.codes = {
PUBSUB_NOT_STARTED: 'ERR_PUBSUB_NOT_STARTED',
DHT_NOT_STARTED: 'ERR_DHT_NOT_STARTED',
CONN_ENCRYPTION_REQUIRED: 'ERR_CONN_ENCRYPTION_REQUIRED',
ERR_PEER_DIAL_INTERCEPTED: 'ERR_PEER_DIAL_INTERCEPTED',
ERR_CONNECTION_INTERCEPTED: 'ERR_CONNECTION_INTERCEPTED',
ERR_INVALID_PROTOCOLS_FOR_STREAM: 'ERR_INVALID_PROTOCOLS_FOR_STREAM',
ERR_CONNECTION_ENDED: 'ERR_CONNECTION_ENDED',
ERR_CONNECTION_FAILED: 'ERR_CONNECTION_FAILED',

View File

@@ -48,6 +48,7 @@ const { updateSelfPeerRecord } = require('./record/utils')
* @typedef {import('libp2p-interfaces/src/pubsub').PubsubOptions} PubsubOptions
* @typedef {import('interface-datastore').Datastore} Datastore
* @typedef {import('./pnet')} Protector
* @typedef {import('./types').ConnectionGater} ConnectionGater
* @typedef {Object} PersistentPeerStoreOptions
* @property {number} [threshold]
*/
@@ -106,6 +107,7 @@ const { updateSelfPeerRecord } = require('./record/utils')
* @property {Libp2pModules} modules libp2p modules to use
* @property {import('./address-manager').AddressManagerOptions} [addresses]
* @property {import('./connection-manager').ConnectionManagerOptions} [connectionManager]
* @property {Partial<import('./types').ConnectionGater>} [connectionGater]
* @property {Datastore} [datastore]
* @property {import('./dialer').DialerOptions} [dialer]
* @property {import('./identify/index').HostProperties} [host] libp2p host
@@ -172,10 +174,25 @@ class Libp2p extends EventEmitter {
this.metrics = metrics
}
/** @type {ConnectionGater} */
this.connectionGater = {
denyDialPeer: async () => Promise.resolve(false),
denyDialMultiaddr: async () => Promise.resolve(false),
denyInboundConnection: async () => Promise.resolve(false),
denyOutboundConnection: async () => Promise.resolve(false),
denyInboundEncryptedConnection: async () => Promise.resolve(false),
denyOutboundEncryptedConnection: async () => Promise.resolve(false),
denyInboundUpgradedConnection: async () => Promise.resolve(false),
denyOutboundUpgradedConnection: async () => Promise.resolve(false),
filterMultiaddrForPeer: async () => Promise.resolve(true),
...this._options.connectionGater
}
/** @type {import('./peer-store/types').PeerStore} */
this.peerStore = new PeerStore({
peerId: this.peerId,
datastore: (this.datastore && this._options.peerStore.persistence) ? this.datastore : new MemoryDatastore()
datastore: (this.datastore && this._options.peerStore.persistence) ? this.datastore : new MemoryDatastore(),
addressFilter: this.connectionGater.filterMultiaddrForPeer
})
// Addresses {listen, announce, noAnnounce}
@@ -220,6 +237,7 @@ class Libp2p extends EventEmitter {
// Setup the Upgrader
this.upgrader = new Upgrader({
connectionGater: this.connectionGater,
localPeer: this.peerId,
metrics: this.metrics,
onConnection: (connection) => this.connectionManager.onConnect(connection),
@@ -262,6 +280,7 @@ class Libp2p extends EventEmitter {
this.dialer = new Dialer({
transportManager: this.transportManager,
connectionGater: this.connectionGater,
peerStore: this.peerStore,
metrics: this.metrics,
...this._options.dialer

View File

@@ -7,6 +7,11 @@ const PeerId = require('peer-id')
const { codes } = require('../errors')
const PeerRecord = require('../record/peer-record')
const Envelope = require('../record/envelope')
const { pipe } = require('it-pipe')
const all = require('it-all')
const filter = require('it-filter')
const map = require('it-map')
const each = require('it-foreach')
/**
* @typedef {import('./types').PeerStore} PeerStore
@@ -27,10 +32,12 @@ class PeerStoreAddressBook {
/**
* @param {PeerStore["emit"]} emit
* @param {import('./types').Store} store
* @param {(peerId: PeerId, multiaddr: Multiaddr) => Promise<boolean>} addressFilter
*/
constructor (emit, store) {
constructor (emit, store, addressFilter) {
this._emit = emit
this._store = store
this._addressFilter = addressFilter
}
/**
@@ -88,7 +95,7 @@ class PeerStoreAddressBook {
// Replace unsigned addresses by the new ones from the record
// TODO: Once we have ttls for the addresses, we should merge these in
updatedPeer = await this._store.patchOrCreate(peerId, {
addresses: convertMultiaddrsToAddresses(multiaddrs, true),
addresses: await filterMultiaddrs(peerId, multiaddrs, this._addressFilter, true),
peerRecordEnvelope: envelope.marshal()
})
@@ -180,6 +187,11 @@ class PeerStoreAddressBook {
throw errcode(new Error('peerId must be an instance of peer-id'), codes.ERR_INVALID_PARAMETERS)
}
if (!Array.isArray(multiaddrs)) {
log.error('multiaddrs must be an array of Multiaddrs')
throw errcode(new Error('multiaddrs must be an array of Multiaddrs'), codes.ERR_INVALID_PARAMETERS)
}
log('set await write lock')
const release = await this._store.lock.writeLock()
log('set got write lock')
@@ -188,7 +200,7 @@ class PeerStoreAddressBook {
let updatedPeer
try {
const addresses = convertMultiaddrsToAddresses(multiaddrs)
const addresses = await filterMultiaddrs(peerId, multiaddrs, this._addressFilter)
// No valid addresses found
if (!addresses.length) {
@@ -238,6 +250,11 @@ class PeerStoreAddressBook {
throw errcode(new Error('peerId must be an instance of peer-id'), codes.ERR_INVALID_PARAMETERS)
}
if (!Array.isArray(multiaddrs)) {
log.error('multiaddrs must be an array of Multiaddrs')
throw errcode(new Error('multiaddrs must be an array of Multiaddrs'), codes.ERR_INVALID_PARAMETERS)
}
log('add await write lock')
const release = await this._store.lock.writeLock()
log('add got write lock')
@@ -246,7 +263,7 @@ class PeerStoreAddressBook {
let updatedPeer
try {
const addresses = convertMultiaddrsToAddresses(multiaddrs)
const addresses = await filterMultiaddrs(peerId, multiaddrs, this._addressFilter)
// No valid addresses found
if (!addresses.length) {
@@ -337,33 +354,29 @@ class PeerStoreAddressBook {
}
/**
* Transforms received multiaddrs into Address.
*
* @private
* @param {PeerId} peerId
* @param {Multiaddr[]} multiaddrs
* @param {boolean} [isCertified]
* @returns {Address[]}
* @param {(peerId: PeerId, multiaddr: Multiaddr) => Promise<boolean>} addressFilter
* @param {boolean} isCertified
*/
function convertMultiaddrsToAddresses (multiaddrs, isCertified = false) {
if (!multiaddrs) {
log.error('multiaddrs must be provided to store data')
throw errcode(new Error('multiaddrs must be provided'), codes.ERR_INVALID_PARAMETERS)
}
// create Address for each address with no duplicates
return Array.from(
new Set(multiaddrs.map(ma => ma.toString()))
)
.map(addr => {
try {
return {
multiaddr: new Multiaddr(addr),
isCertified
}
} catch (err) {
throw errcode(err, codes.ERR_INVALID_PARAMETERS)
function filterMultiaddrs (peerId, multiaddrs, addressFilter, isCertified = false) {
return pipe(
multiaddrs,
(source) => each(source, (multiaddr) => {
if (!Multiaddr.isMultiaddr(multiaddr)) {
log.error('multiaddr must be an instance of Multiaddr')
throw errcode(new Error('multiaddr must be an instance of Multiaddr'), codes.ERR_INVALID_PARAMETERS)
}
})
}),
(source) => filter(source, (multiaddr) => addressFilter(peerId, multiaddr)),
(source) => map(source, (multiaddr) => {
return {
multiaddr: new Multiaddr(multiaddr.toString()),
isCertified
}
}),
(source) => all(source)
)
}
module.exports = PeerStoreAddressBook

View File

@@ -12,6 +12,7 @@ const Store = require('./store')
* @typedef {import('./types').PeerStore} PeerStore
* @typedef {import('./types').Peer} Peer
* @typedef {import('peer-id')} PeerId
* @typedef {import('multiaddr').Multiaddr} Multiaddr
*/
const log = Object.assign(debug('libp2p:peer-store'), {
@@ -28,14 +29,15 @@ class DefaultPeerStore extends EventEmitter {
* @param {object} properties
* @param {PeerId} properties.peerId
* @param {import('interface-datastore').Datastore} properties.datastore
* @param {(peerId: PeerId, multiaddr: Multiaddr) => Promise<boolean>} properties.addressFilter
*/
constructor ({ peerId, datastore }) {
constructor ({ peerId, datastore, addressFilter }) {
super()
this._peerId = peerId
this._store = new Store(datastore)
this.addressBook = new AddressBook(this.emit.bind(this), this._store)
this.addressBook = new AddressBook(this.emit.bind(this), this._store, addressFilter)
this.keyBook = new KeyBook(this.emit.bind(this), this._store)
this.metadataBook = new MetadataBook(this.emit.bind(this), this._store)
this.protoBook = new ProtoBook(this.emit.bind(this), this._store)

View File

@@ -100,13 +100,26 @@ class PersistentStore {
throw errcode(new Error('publicKey bytes do not match peer id publicKey bytes'), codes.ERR_INVALID_PARAMETERS)
}
// dedupe addresses
const addressSet = new Set()
const buf = PeerPB.encode({
addresses: peer.addresses.sort((a, b) => {
return a.multiaddr.toString().localeCompare(b.multiaddr.toString())
}).map(({ multiaddr, isCertified }) => ({
multiaddr: multiaddr.bytes,
isCertified
})),
addresses: peer.addresses
.filter(address => {
if (addressSet.has(address.multiaddr.toString())) {
return false
}
addressSet.add(address.multiaddr.toString())
return true
})
.sort((a, b) => {
return a.multiaddr.toString().localeCompare(b.multiaddr.toString())
})
.map(({ multiaddr, isCertified }) => ({
multiaddr: multiaddr.bytes,
isCertified
})),
protocols: peer.protocols.sort(),
pubKey: peer.pubKey ? marshalPublicKey(peer.pubKey) : undefined,
metadata: [...peer.metadata.keys()].sort().map(key => ({ key, value: peer.metadata.get(key) })),

98
src/types.ts Normal file
View File

@@ -0,0 +1,98 @@
import type PeerId from 'peer-id'
import type { Multiaddr } from 'multiaddr'
import type { MultiaddrConnection } from 'libp2p-interfaces/src/transport/types'
export interface ConnectionGater {
/**
* denyDialMultiaddr tests whether we're permitted to Dial the
* specified peer.
*
* This is called by the dialer.connectToPeer implementation before
* dialling a peer.
*
* Return true to prevent dialing the passed peer.
*/
denyDialPeer: (peerId: PeerId) => Promise<boolean>
/**
* denyDialMultiaddr tests whether we're permitted to dial the specified
* multiaddr for the given peer.
*
* This is called by the dialer.connectToPeer implementation after it has
* resolved the peer's addrs, and prior to dialling each.
*
* Return true to prevent dialing the passed peer on the passed multiaddr.
*/
denyDialMultiaddr: (peerId: PeerId, multiaddr: Multiaddr) => Promise<boolean>
/**
* denyInboundConnection tests whether an incipient inbound connection is allowed.
*
* This is called by the upgrader, or by the transport directly (e.g. QUIC,
* Bluetooth), straight after it has accepted a connection from its socket.
*
* Return true to deny the incoming passed connection.
*/
denyInboundConnection: (maConn: MultiaddrConnection) => Promise<boolean>
/**
* denyOutboundConnection tests whether an incipient outbound connection is allowed.
*
* This is called by the upgrader, or by the transport directly (e.g. QUIC,
* Bluetooth), straight after it has created a connection with its socket.
*
* Return true to deny the incoming passed connection.
*/
denyOutboundConnection: (peerId: PeerId, maConn: MultiaddrConnection) => Promise<boolean>
/**
* denyInboundEncryptedConnection tests whether a given connection, now encrypted,
* is allowed.
*
* This is called by the upgrader, after it has performed the security
* handshake, and before it negotiates the muxer, or by the directly by the
* transport, at the exact same checkpoint.
*
* Return true to deny the passed secured connection.
*/
denyInboundEncryptedConnection: (peerId: PeerId, maConn: MultiaddrConnection) => Promise<boolean>
/**
* denyOutboundEncryptedConnection tests whether a given connection, now encrypted,
* is allowed.
*
* This is called by the upgrader, after it has performed the security
* handshake, and before it negotiates the muxer, or by the directly by the
* transport, at the exact same checkpoint.
*
* Return true to deny the passed secured connection.
*/
denyOutboundEncryptedConnection: (peerId: PeerId, maConn: MultiaddrConnection) => Promise<boolean>
/**
* denyInboundUpgradedConnection tests whether a fully capable connection is allowed.
*
* This is called after encryption has been negotiated and the connection has been
* multiplexed, if a multiplexer is configured.
*
* Return true to deny the passed upgraded connection.
*/
denyInboundUpgradedConnection: (peerId: PeerId, maConn: MultiaddrConnection) => Promise<boolean>
/**
* denyOutboundUpgradedConnection tests whether a fully capable connection is allowed.
*
* This is called after encryption has been negotiated and the connection has been
* multiplexed, if a multiplexer is configured.
*
* Return true to deny the passed upgraded connection.
*/
denyOutboundUpgradedConnection: (peerId: PeerId, maConn: MultiaddrConnection) => Promise<boolean>
/**
* Used by the address book to filter passed addresses.
*
* Return true to allow storing the passed multiaddr for the passed peer.
*/
filterMultiaddrForPeer: (peer: PeerId, multiaddr: Multiaddr) => Promise<boolean>
}

View File

@@ -22,6 +22,7 @@ const { codes } = require('./errors')
* @typedef {import('libp2p-interfaces/src/crypto/types').Crypto} Crypto
* @typedef {import('libp2p-interfaces/src/connection').Connection} Connection
* @typedef {import('multiaddr').Multiaddr} Multiaddr
* @typedef {import('./types').ConnectionGater} ConnectionGater
*/
/**
@@ -35,6 +36,8 @@ class Upgrader {
/**
* @param {object} options
* @param {PeerId} options.localPeer
* @param {ConnectionGater} options.connectionGater
*
* @param {import('./metrics')} [options.metrics]
* @param {Map<string, Crypto>} [options.cryptos]
* @param {Map<string, MuxerFactory>} [options.muxers]
@@ -44,11 +47,13 @@ class Upgrader {
constructor ({
localPeer,
metrics,
connectionGater,
cryptos = new Map(),
muxers = new Map(),
onConnectionEnd = () => {},
onConnection = () => {}
}) {
this.connectionGater = connectionGater
this.localPeer = localPeer
this.metrics = metrics
this.cryptos = cryptos
@@ -76,6 +81,10 @@ class Upgrader {
let setPeer
let proxyPeer
if (await this.connectionGater.denyInboundConnection(maConn)) {
throw errCode(new Error('The multiaddr connection is blocked by gater.acceptConnection'), codes.ERR_CONNECTION_INTERCEPTED)
}
if (this.metrics) {
({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy())
const idString = (Math.random() * 1e9).toString(36) + Date.now()
@@ -99,6 +108,10 @@ class Upgrader {
protocol: cryptoProtocol
} = await this._encryptInbound(this.localPeer, protectedConn, this.cryptos))
if (await this.connectionGater.denyInboundEncryptedConnection(remotePeer, encryptedConn)) {
throw errCode(new Error('The multiaddr connection is blocked by gater.acceptEncryptedConnection'), codes.ERR_CONNECTION_INTERCEPTED)
}
// Multiplex the connection
if (this.muxers.size) {
({ stream: upgradedConn, Muxer } = await this._multiplexInbound(encryptedConn, this.muxers))
@@ -111,6 +124,10 @@ class Upgrader {
throw err
}
if (await this.connectionGater.denyInboundUpgradedConnection(remotePeer, encryptedConn)) {
throw errCode(new Error('The multiaddr connection is blocked by gater.acceptEncryptedConnection'), codes.ERR_CONNECTION_INTERCEPTED)
}
if (this.metrics) {
this.metrics.updatePlaceholder(proxyPeer, remotePeer)
setPeer(remotePeer)
@@ -143,6 +160,10 @@ class Upgrader {
const remotePeerId = PeerId.createFromB58String(idStr)
if (await this.connectionGater.denyOutboundConnection(remotePeerId, maConn)) {
throw errCode(new Error('The multiaddr connection is blocked by connectionGater.denyOutboundConnection'), codes.ERR_CONNECTION_INTERCEPTED)
}
let encryptedConn
let remotePeer
let upgradedConn
@@ -174,6 +195,10 @@ class Upgrader {
protocol: cryptoProtocol
} = await this._encryptOutbound(this.localPeer, protectedConn, remotePeerId, this.cryptos))
if (await this.connectionGater.denyOutboundEncryptedConnection(remotePeer, encryptedConn)) {
throw errCode(new Error('The multiaddr connection is blocked by gater.acceptEncryptedConnection'), codes.ERR_CONNECTION_INTERCEPTED)
}
// Multiplex the connection
if (this.muxers.size) {
({ stream: upgradedConn, Muxer } = await this._multiplexOutbound(encryptedConn, this.muxers))
@@ -186,6 +211,10 @@ class Upgrader {
throw err
}
if (await this.connectionGater.denyOutboundUpgradedConnection(remotePeer, encryptedConn)) {
throw errCode(new Error('The multiaddr connection is blocked by gater.acceptEncryptedConnection'), codes.ERR_CONNECTION_INTERCEPTED)
}
if (this.metrics) {
this.metrics.updatePlaceholder(proxyPeer, remotePeer)
setPeer(remotePeer)