mirror of
https://github.com/fluencelabs/js-libp2p
synced 2025-06-21 04:51:33 +00:00
chore: address review
This commit is contained in:
@ -22,9 +22,9 @@ const PeerRecord = require('../record/peer-record')
|
||||
|
||||
const {
|
||||
MULTICODEC_IDENTIFY,
|
||||
MULTICODEC_IDENTIFY_LEGACY,
|
||||
MULTICODEC_IDENTIFY_1_0_0,
|
||||
MULTICODEC_IDENTIFY_PUSH,
|
||||
MULTICODEC_IDENTIFY_PUSH_LEGACY,
|
||||
MULTICODEC_IDENTIFY_PUSH_1_0_0,
|
||||
AGENT_VERSION,
|
||||
PROTOCOL_VERSION
|
||||
} = require('./consts')
|
||||
@ -97,26 +97,12 @@ class IdentifyService {
|
||||
push (connections) {
|
||||
const pushes = connections.map(async connection => {
|
||||
try {
|
||||
const { protocol, stream } = await connection.newStream([MULTICODEC_IDENTIFY_PUSH, MULTICODEC_IDENTIFY_PUSH_LEGACY])
|
||||
|
||||
// Handle Legacy
|
||||
if (protocol === MULTICODEC_IDENTIFY_PUSH_LEGACY) {
|
||||
return pipe(
|
||||
[{
|
||||
listenAddrs: this._libp2p.multiaddrs.map((ma) => ma.buffer),
|
||||
protocols: Array.from(this._protocols.keys())
|
||||
}],
|
||||
pb.encode(Message),
|
||||
stream,
|
||||
consume
|
||||
)
|
||||
}
|
||||
|
||||
const envelope = await this._getSelfPeerRecord()
|
||||
const signedPeerRecord = envelope.marshal()
|
||||
const { stream } = await connection.newStream([MULTICODEC_IDENTIFY_PUSH, MULTICODEC_IDENTIFY_PUSH_1_0_0])
|
||||
const signedPeerRecord = await this._getSelfPeerRecord()
|
||||
|
||||
await pipe(
|
||||
[{
|
||||
listenAddrs: this._libp2p.multiaddrs.map((ma) => ma.buffer),
|
||||
signedPeerRecord,
|
||||
protocols: Array.from(this._protocols.keys())
|
||||
}],
|
||||
@ -159,7 +145,7 @@ class IdentifyService {
|
||||
* @returns {Promise<void>}
|
||||
*/
|
||||
async identify (connection) {
|
||||
const { protocol, stream } = await connection.newStream([MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_LEGACY])
|
||||
const { protocol, stream } = await connection.newStream([MULTICODEC_IDENTIFY, MULTICODEC_IDENTIFY_1_0_0])
|
||||
const [data] = await pipe(
|
||||
[],
|
||||
stream,
|
||||
@ -198,7 +184,7 @@ class IdentifyService {
|
||||
observedAddr = IdentifyService.getCleanMultiaddr(observedAddr)
|
||||
|
||||
// LEGACY: differentiate message with SignedPeerRecord
|
||||
if (protocol === MULTICODEC_IDENTIFY_LEGACY) {
|
||||
if (protocol === MULTICODEC_IDENTIFY_1_0_0) {
|
||||
// Update peers data in PeerStore
|
||||
this.peerStore.addressBook.set(id, listenAddrs.map((addr) => multiaddr(addr)))
|
||||
this.peerStore.protoBook.set(id, protocols)
|
||||
@ -250,13 +236,11 @@ class IdentifyService {
|
||||
handleMessage ({ connection, stream, protocol }) {
|
||||
switch (protocol) {
|
||||
case MULTICODEC_IDENTIFY:
|
||||
case MULTICODEC_IDENTIFY_1_0_0:
|
||||
return this._handleIdentify({ connection, stream })
|
||||
case MULTICODEC_IDENTIFY_LEGACY:
|
||||
return this._handleIdentifyLegacy({ connection, stream })
|
||||
case MULTICODEC_IDENTIFY_PUSH:
|
||||
case MULTICODEC_IDENTIFY_PUSH_1_0_0:
|
||||
return this._handlePush({ connection, stream })
|
||||
case MULTICODEC_IDENTIFY_PUSH_LEGACY:
|
||||
return this._handlePushLegacy({ connection, stream })
|
||||
default:
|
||||
log.error('cannot handle unknown protocol %s', protocol)
|
||||
}
|
||||
@ -276,45 +260,14 @@ class IdentifyService {
|
||||
publicKey = this.peerId.pubKey.bytes
|
||||
}
|
||||
|
||||
const envelope = await this._getSelfPeerRecord()
|
||||
const signedPeerRecord = envelope.marshal()
|
||||
|
||||
const message = Message.encode({
|
||||
protocolVersion: PROTOCOL_VERSION,
|
||||
agentVersion: AGENT_VERSION,
|
||||
publicKey,
|
||||
signedPeerRecord,
|
||||
observedAddr: connection.remoteAddr.buffer,
|
||||
protocols: Array.from(this._protocols.keys())
|
||||
})
|
||||
|
||||
pipe(
|
||||
[message],
|
||||
lp.encode(),
|
||||
stream,
|
||||
consume
|
||||
)
|
||||
}
|
||||
|
||||
/**
|
||||
* Sends the `Identify` response with listen addresses (LEGACY)
|
||||
* to the requesting peer over the given `connection`
|
||||
* @private
|
||||
* @param {object} options
|
||||
* @param {*} options.stream
|
||||
* @param {Connection} options.connection
|
||||
*/
|
||||
_handleIdentifyLegacy ({ connection, stream }) {
|
||||
let publicKey = Buffer.alloc(0)
|
||||
if (this.peerId.pubKey) {
|
||||
publicKey = this.peerId.pubKey.bytes
|
||||
}
|
||||
const signedPeerRecord = await this._getSelfPeerRecord()
|
||||
|
||||
const message = Message.encode({
|
||||
protocolVersion: PROTOCOL_VERSION,
|
||||
agentVersion: AGENT_VERSION,
|
||||
publicKey,
|
||||
listenAddrs: this._libp2p.multiaddrs.map((ma) => ma.buffer),
|
||||
signedPeerRecord,
|
||||
observedAddr: connection.remoteAddr.buffer,
|
||||
protocols: Array.from(this._protocols.keys())
|
||||
})
|
||||
@ -354,6 +307,22 @@ class IdentifyService {
|
||||
return log.error('received invalid message', err)
|
||||
}
|
||||
|
||||
const id = connection.remotePeer
|
||||
|
||||
// Legacy
|
||||
if (!message.signedPeerRecord) {
|
||||
try {
|
||||
this.peerStore.addressBook.set(id, message.listenAddrs.map((addr) => multiaddr(addr)))
|
||||
} catch (err) {
|
||||
return log.error('received invalid listen addrs', err)
|
||||
}
|
||||
|
||||
// Update the protocols
|
||||
this.peerStore.protoBook.set(id, message.protocols)
|
||||
|
||||
return
|
||||
}
|
||||
|
||||
// Open envelope and verify if is authenticated
|
||||
let envelope
|
||||
try {
|
||||
@ -373,7 +342,6 @@ class IdentifyService {
|
||||
}
|
||||
|
||||
// Update peers data in PeerStore
|
||||
const id = connection.remotePeer
|
||||
try {
|
||||
// TODO: Store as certified record
|
||||
|
||||
@ -387,45 +355,8 @@ class IdentifyService {
|
||||
}
|
||||
|
||||
/**
|
||||
* Reads the Identify Push message from the given `connection`
|
||||
* with listen addresses (LEGACY)
|
||||
* @private
|
||||
* @param {object} options
|
||||
* @param {*} options.stream
|
||||
* @param {Connection} options.connection
|
||||
*/
|
||||
async _handlePushLegacy ({ connection, stream }) {
|
||||
const [data] = await pipe(
|
||||
[],
|
||||
stream,
|
||||
lp.decode(),
|
||||
take(1),
|
||||
toBuffer,
|
||||
collect
|
||||
)
|
||||
|
||||
let message
|
||||
try {
|
||||
message = Message.decode(data)
|
||||
} catch (err) {
|
||||
return log.error('received invalid message', err)
|
||||
}
|
||||
|
||||
// Update peers data in PeerStore
|
||||
const id = connection.remotePeer
|
||||
try {
|
||||
this.peerStore.addressBook.set(id, message.listenAddrs.map((addr) => multiaddr(addr)))
|
||||
} catch (err) {
|
||||
return log.error('received invalid listen addrs', err)
|
||||
}
|
||||
|
||||
// Update the protocols
|
||||
this.peerStore.protoBook.set(id, message.protocols)
|
||||
}
|
||||
|
||||
/**
|
||||
* Get self signed peer record envelope.
|
||||
* @return {Envelope}
|
||||
* Get self signed peer record raw envelope.
|
||||
* @return {Buffer}
|
||||
*/
|
||||
async _getSelfPeerRecord () {
|
||||
// TODO: Verify if updated
|
||||
@ -437,7 +368,9 @@ class IdentifyService {
|
||||
peerId: this.peerId,
|
||||
multiaddrs: this._libp2p.multiaddrs
|
||||
})
|
||||
this._selfRecord = await Envelope.seal(peerRecord, this.peerId)
|
||||
const envelope = await Envelope.seal(peerRecord, this.peerId)
|
||||
|
||||
this._selfRecord = envelope.marshal()
|
||||
|
||||
return this._selfRecord
|
||||
}
|
||||
@ -450,8 +383,8 @@ module.exports.IdentifyService = IdentifyService
|
||||
*/
|
||||
module.exports.multicodecs = {
|
||||
IDENTIFY: MULTICODEC_IDENTIFY,
|
||||
IDENTIFY_LEGACY: MULTICODEC_IDENTIFY_LEGACY,
|
||||
IDENTIFY_1_0_0: MULTICODEC_IDENTIFY_1_0_0,
|
||||
IDENTIFY_PUSH: MULTICODEC_IDENTIFY_PUSH,
|
||||
IDENTIFY_PUSH_LEGACY: MULTICODEC_IDENTIFY_PUSH_LEGACY
|
||||
IDENTIFY_PUSH_1_0_0: MULTICODEC_IDENTIFY_PUSH_1_0_0
|
||||
}
|
||||
module.exports.Message = Message
|
||||
|
Reference in New Issue
Block a user