|
|
|
@ -18,13 +18,17 @@ log.error = debug('libp2p:identify:error')
|
|
|
|
|
|
|
|
|
|
const {
|
|
|
|
|
MULTICODEC_IDENTIFY,
|
|
|
|
|
MULTICODEC_IDENTIFY_LEGACY,
|
|
|
|
|
MULTICODEC_IDENTIFY_PUSH,
|
|
|
|
|
MULTICODEC_IDENTIFY_PUSH_LEGACY,
|
|
|
|
|
AGENT_VERSION,
|
|
|
|
|
PROTOCOL_VERSION
|
|
|
|
|
} = require('./consts')
|
|
|
|
|
|
|
|
|
|
const errCode = require('err-code')
|
|
|
|
|
const { codes } = require('../errors')
|
|
|
|
|
const { messages, codes } = require('../errors')
|
|
|
|
|
const Envelope = require('../record-manager/envelope')
|
|
|
|
|
const PeerRecord = require('../record-manager/peer-record')
|
|
|
|
|
|
|
|
|
|
class IdentifyService {
|
|
|
|
|
/**
|
|
|
|
@ -89,11 +93,27 @@ class IdentifyService {
|
|
|
|
|
push (connections) {
|
|
|
|
|
const pushes = connections.map(async connection => {
|
|
|
|
|
try {
|
|
|
|
|
const { stream } = await connection.newStream(MULTICODEC_IDENTIFY_PUSH)
|
|
|
|
|
const { protocol, stream } = await connection.newStream([MULTICODEC_IDENTIFY_PUSH, MULTICODEC_IDENTIFY_PUSH_LEGACY])
|
|
|
|
|
|
|
|
|
|
// Handle Legacy
|
|
|
|
|
if (protocol === MULTICODEC_IDENTIFY_PUSH_LEGACY) {
|
|
|
|
|
return pipe(
|
|
|
|
|
[{
|
|
|
|
|
listenAddrs: this._libp2p.multiaddrs.map((ma) => ma.buffer),
|
|
|
|
|
protocols: Array.from(this._protocols.keys())
|
|
|
|
|
}],
|
|
|
|
|
pb.encode(Message),
|
|
|
|
|
stream,
|
|
|
|
|
consume
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const envelope = this._libp2p.recordManager.getPeerRecord()
|
|
|
|
|
const signedPeerRecord = envelope.marshal()
|
|
|
|
|
|
|
|
|
|
await pipe(
|
|
|
|
|
[{
|
|
|
|
|
listenAddrs: this._libp2p.multiaddrs.map((ma) => ma.buffer),
|
|
|
|
|
signedPeerRecord,
|
|
|
|
|
protocols: Array.from(this._protocols.keys())
|
|
|
|
|
}],
|
|
|
|
|
pb.encode(Message),
|
|
|
|
@ -135,7 +155,7 @@ class IdentifyService {
|
|
|
|
|
* @returns {Promise<void>}
|
|
|
|
|
*/
|
|
|
|
|
async identify (connection) {
|
|
|
|
|
const { stream } = await connection.newStream(MULTICODEC_IDENTIFY)
|
|
|
|
|
const { protocol, stream } = await connection.newStream([MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_LEGACY])
|
|
|
|
|
const [data] = await pipe(
|
|
|
|
|
[],
|
|
|
|
|
stream,
|
|
|
|
@ -160,7 +180,8 @@ class IdentifyService {
|
|
|
|
|
publicKey,
|
|
|
|
|
listenAddrs,
|
|
|
|
|
protocols,
|
|
|
|
|
observedAddr
|
|
|
|
|
observedAddr,
|
|
|
|
|
signedPeerRecord
|
|
|
|
|
} = message
|
|
|
|
|
|
|
|
|
|
const id = await PeerId.createFromPubKey(publicKey)
|
|
|
|
@ -172,8 +193,40 @@ class IdentifyService {
|
|
|
|
|
// Get the observedAddr if there is one
|
|
|
|
|
observedAddr = IdentifyService.getCleanMultiaddr(observedAddr)
|
|
|
|
|
|
|
|
|
|
// LEGACY: differentiate message with SignedPeerRecord
|
|
|
|
|
if (protocol === MULTICODEC_IDENTIFY_LEGACY) {
|
|
|
|
|
// Update peers data in PeerStore
|
|
|
|
|
this.peerStore.addressBook.set(id, listenAddrs.map((addr) => multiaddr(addr)))
|
|
|
|
|
this.peerStore.protoBook.set(id, protocols)
|
|
|
|
|
|
|
|
|
|
// TODO: Track our observed address so that we can score it
|
|
|
|
|
log('received observed address of %s', observedAddr)
|
|
|
|
|
|
|
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Open envelope and verify if is authenticated
|
|
|
|
|
let envelope
|
|
|
|
|
try {
|
|
|
|
|
envelope = await Envelope.openAndCertify(signedPeerRecord, PeerRecord.DOMAIN)
|
|
|
|
|
} catch (err) {
|
|
|
|
|
log('received invalid envelope, discard it')
|
|
|
|
|
throw errCode(new Error(messages.ERR_INVALID_ENVELOPE), codes.ERR_INVALID_ENVELOPE)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Decode peer record
|
|
|
|
|
let peerRecord
|
|
|
|
|
try {
|
|
|
|
|
peerRecord = await PeerRecord.createFromProtobuf(envelope.payload)
|
|
|
|
|
} catch (err) {
|
|
|
|
|
log('received invalid peer record, discard it')
|
|
|
|
|
throw errCode(new Error(messages.ERR_INVALID_PEER_RECORD), codes.ERR_INVALID_PEER_RECORD)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// TODO: Store as certified record
|
|
|
|
|
|
|
|
|
|
// Update peers data in PeerStore
|
|
|
|
|
this.peerStore.addressBook.set(id, listenAddrs.map((addr) => multiaddr(addr)))
|
|
|
|
|
this.peerStore.addressBook.set(id, peerRecord.multiaddrs.map((addr) => multiaddr(addr)))
|
|
|
|
|
this.peerStore.protoBook.set(id, protocols)
|
|
|
|
|
this.peerStore.metadataBook.set(id, 'AgentVersion', Buffer.from(message.agentVersion))
|
|
|
|
|
|
|
|
|
@ -194,16 +247,20 @@ class IdentifyService {
|
|
|
|
|
switch (protocol) {
|
|
|
|
|
case MULTICODEC_IDENTIFY:
|
|
|
|
|
return this._handleIdentify({ connection, stream })
|
|
|
|
|
case MULTICODEC_IDENTIFY_LEGACY:
|
|
|
|
|
return this._handleIdentifyLegacy({ connection, stream })
|
|
|
|
|
case MULTICODEC_IDENTIFY_PUSH:
|
|
|
|
|
return this._handlePush({ connection, stream })
|
|
|
|
|
case MULTICODEC_IDENTIFY_PUSH_LEGACY:
|
|
|
|
|
return this._handlePushLegacy({ connection, stream })
|
|
|
|
|
default:
|
|
|
|
|
log.error('cannot handle unknown protocol %s', protocol)
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Sends the `Identify` response to the requesting peer over the
|
|
|
|
|
* given `connection`
|
|
|
|
|
* Sends the `Identify` response with the Signed Peer Record
|
|
|
|
|
* to the requesting peer over the given `connection`
|
|
|
|
|
* @private
|
|
|
|
|
* @param {object} options
|
|
|
|
|
* @param {*} options.stream
|
|
|
|
@ -215,6 +272,40 @@ class IdentifyService {
|
|
|
|
|
publicKey = this.peerId.pubKey.bytes
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const envelope = this._libp2p.recordManager.getPeerRecord()
|
|
|
|
|
const signedPeerRecord = envelope.marshal()
|
|
|
|
|
|
|
|
|
|
const message = Message.encode({
|
|
|
|
|
protocolVersion: PROTOCOL_VERSION,
|
|
|
|
|
agentVersion: AGENT_VERSION,
|
|
|
|
|
publicKey,
|
|
|
|
|
signedPeerRecord,
|
|
|
|
|
observedAddr: connection.remoteAddr.buffer,
|
|
|
|
|
protocols: Array.from(this._protocols.keys())
|
|
|
|
|
})
|
|
|
|
|
|
|
|
|
|
pipe(
|
|
|
|
|
[message],
|
|
|
|
|
lp.encode(),
|
|
|
|
|
stream,
|
|
|
|
|
consume
|
|
|
|
|
)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Sends the `Identify` response with listen addresses (LEGACY)
|
|
|
|
|
* to the requesting peer over the given `connection`
|
|
|
|
|
* @private
|
|
|
|
|
* @param {object} options
|
|
|
|
|
* @param {*} options.stream
|
|
|
|
|
* @param {Connection} options.connection
|
|
|
|
|
*/
|
|
|
|
|
_handleIdentifyLegacy ({ connection, stream }) {
|
|
|
|
|
let publicKey = Buffer.alloc(0)
|
|
|
|
|
if (this.peerId.pubKey) {
|
|
|
|
|
publicKey = this.peerId.pubKey.bytes
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
const message = Message.encode({
|
|
|
|
|
protocolVersion: PROTOCOL_VERSION,
|
|
|
|
|
agentVersion: AGENT_VERSION,
|
|
|
|
@ -259,6 +350,63 @@ class IdentifyService {
|
|
|
|
|
return log.error('received invalid message', err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Open envelope and verify if is authenticated
|
|
|
|
|
let envelope
|
|
|
|
|
try {
|
|
|
|
|
envelope = await Envelope.openAndCertify(message.signedPeerRecord, PeerRecord.DOMAIN)
|
|
|
|
|
} catch (err) {
|
|
|
|
|
log('received invalid envelope, discard it')
|
|
|
|
|
throw errCode(new Error(messages.ERR_INVALID_ENVELOPE), codes.ERR_INVALID_ENVELOPE)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Decode peer record
|
|
|
|
|
let peerRecord
|
|
|
|
|
try {
|
|
|
|
|
peerRecord = await PeerRecord.createFromProtobuf(envelope.payload)
|
|
|
|
|
} catch (err) {
|
|
|
|
|
log('received invalid peer record, discard it')
|
|
|
|
|
throw errCode(new Error(messages.ERR_INVALID_PEER_RECORD), codes.ERR_INVALID_PEER_RECORD)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update peers data in PeerStore
|
|
|
|
|
const id = connection.remotePeer
|
|
|
|
|
try {
|
|
|
|
|
// TODO: Store as certified record
|
|
|
|
|
|
|
|
|
|
this.peerStore.addressBook.set(id, peerRecord.multiaddrs.map((addr) => multiaddr(addr)))
|
|
|
|
|
} catch (err) {
|
|
|
|
|
return log.error('received invalid listen addrs', err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update the protocols
|
|
|
|
|
this.peerStore.protoBook.set(id, message.protocols)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Reads the Identify Push message from the given `connection`
|
|
|
|
|
* with listen addresses (LEGACY)
|
|
|
|
|
* @private
|
|
|
|
|
* @param {object} options
|
|
|
|
|
* @param {*} options.stream
|
|
|
|
|
* @param {Connection} options.connection
|
|
|
|
|
*/
|
|
|
|
|
async _handlePushLegacy ({ connection, stream }) {
|
|
|
|
|
const [data] = await pipe(
|
|
|
|
|
[],
|
|
|
|
|
stream,
|
|
|
|
|
lp.decode(),
|
|
|
|
|
take(1),
|
|
|
|
|
toBuffer,
|
|
|
|
|
collect
|
|
|
|
|
)
|
|
|
|
|
|
|
|
|
|
let message
|
|
|
|
|
try {
|
|
|
|
|
message = Message.decode(data)
|
|
|
|
|
} catch (err) {
|
|
|
|
|
return log.error('received invalid message', err)
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Update peers data in PeerStore
|
|
|
|
|
const id = connection.remotePeer
|
|
|
|
|
try {
|
|
|
|
@ -279,6 +427,8 @@ module.exports.IdentifyService = IdentifyService
|
|
|
|
|
*/
|
|
|
|
|
module.exports.multicodecs = {
|
|
|
|
|
IDENTIFY: MULTICODEC_IDENTIFY,
|
|
|
|
|
IDENTIFY_PUSH: MULTICODEC_IDENTIFY_PUSH
|
|
|
|
|
IDENTIFY_LEGACY: MULTICODEC_IDENTIFY_LEGACY,
|
|
|
|
|
IDENTIFY_PUSH: MULTICODEC_IDENTIFY_PUSH,
|
|
|
|
|
IDENTIFY_PUSH_LEGACY: MULTICODEC_IDENTIFY_PUSH_LEGACY
|
|
|
|
|
}
|
|
|
|
|
module.exports.Message = Message
|
|
|
|
|