From f439d9b589a0a6544b61aca3736e920943ce38b5 Mon Sep 17 00:00:00 2001 From: Alex Potsides Date: Thu, 11 Aug 2022 13:21:04 +0100 Subject: [PATCH] deps!: update all deps to support no-copy operations (#1335) Updates all deps needed to support passing lists of byte arrays where they have been created from multiple input buffers. When reading multiplexed data, all messages arrive in length-prefixed buffers, which means the first few bytes tell the consumer how many bytes long next chunk will be. One length prefixed chunk can be delivered in several payloads from the underlying network transport. The first payload can also include the length prefix and some or all of the data, so we stitch these together in a `Uint8ArrayList` to avoid having to concatenate `Uint8Array`s together. Previously once we'd received enough bytes to satisfy the length prefix we'd concatenate the bytes together, but this is a potentially expensive operation where transports have small message sizes so instead just pass the `Uint8ArrayList` to the consumer and let them decide wether to concatenate or not as some consumers will be smart enough to operate on lists of `Uint8Array`s instead of always requiring a contiguous block of memory. BREAKING CHANGE: Streams are now `Duplex` --- .aegir.js | 4 +- examples/connection-encryption/1.js | 2 +- examples/delegated-routing/package.json | 6 +- examples/echo/src/dialer.js | 2 +- examples/libp2p-in-the-browser/package.json | 4 +- examples/pnet/index.js | 2 +- examples/protocol-and-stream-muxing/1.js | 2 +- examples/protocol-and-stream-muxing/2.js | 2 +- examples/protocol-and-stream-muxing/3.js | 4 +- examples/transports/2.js | 5 + examples/transports/3.js | 2 +- examples/transports/4.js | 2 +- examples/webrtc-direct/package.json | 4 +- package.json | 74 +++++----- src/circuit/README.md | 4 +- src/circuit/circuit/hop.ts | 8 +- src/circuit/circuit/stop.ts | 3 +- src/circuit/circuit/stream-handler.ts | 4 +- src/circuit/pb/index.ts | 149 +++++++++++++++++--- src/circuit/transport.ts | 6 +- src/fetch/pb/proto.ts | 121 ++++++++++++++-- src/identify/index.ts | 7 +- src/identify/pb/message.ts | 123 ++++++++++++++-- src/insecure/index.ts | 9 +- src/insecure/pb/proto.ts | 124 ++++++++++++++-- src/metrics/index.ts | 11 +- src/ping/index.ts | 2 +- src/pnet/index.ts | 2 + src/upgrader.ts | 34 ++--- test/configuration/utils.ts | 9 +- test/core/consume-peer-record.spec.ts | 4 +- test/core/encryption.spec.ts | 4 +- test/core/get-public-key.spec.ts | 4 +- test/core/listening.node.ts | 4 +- test/dialing/direct.node.ts | 22 +-- test/dialing/direct.spec.ts | 20 +-- test/fetch/fetch.node.ts | 4 +- test/metrics/index.node.ts | 3 +- test/transports/transport-manager.spec.ts | 8 +- test/upgrading/upgrader.spec.ts | 22 +-- 40 files changed, 626 insertions(+), 200 deletions(-) diff --git a/.aegir.js b/.aegir.js index 66bccedf..c0a745f8 100644 --- a/.aegir.js +++ b/.aegir.js @@ -1,6 +1,6 @@ import { WebSockets } from '@libp2p/websockets' import { Mplex } from '@libp2p/mplex' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Noise } from '@chainsafe/libp2p-noise' import { pipe } from 'it-pipe' import { createFromJSON } from '@libp2p/peer-id-factory' @@ -31,7 +31,7 @@ export default { new Mplex() ], connectionEncryption: [ - NOISE, + new Noise(), new Plaintext() ], relay: { diff --git a/examples/connection-encryption/1.js b/examples/connection-encryption/1.js index 0e774fcb..4f03ff6d 100644 --- a/examples/connection-encryption/1.js +++ b/examples/connection-encryption/1.js @@ -34,7 +34,7 @@ const createNode = async () => { stream, async function (source) { for await (const msg of source) { - console.log(uint8ArrayToString(msg)) + console.log(uint8ArrayToString(msg.subarray())) } } ) diff --git a/examples/delegated-routing/package.json b/examples/delegated-routing/package.json index 04ff5e32..8c7d4302 100644 --- a/examples/delegated-routing/package.json +++ b/examples/delegated-routing/package.json @@ -3,13 +3,13 @@ "version": "0.1.0", "private": true, "dependencies": { - "@chainsafe/libp2p-noise": "^7.0.1", - "ipfs-core": "^0.14.1", + "@chainsafe/libp2p-noise": "^8.0.0", + "ipfs-core": "^0.15.4", "libp2p": "../../", "@libp2p/delegated-content-routing": "^2.0.1", "@libp2p/delegated-peer-routing": "^2.0.1", "@libp2p/kad-dht": "^3.0.0", - "@libp2p/mplex": "^4.0.2", + "@libp2p/mplex": "^5.0.0", "@libp2p/webrtc-star": "^3.0.0", "@libp2p/websockets": "^3.0.0", "react": "^17.0.2", diff --git a/examples/echo/src/dialer.js b/examples/echo/src/dialer.js index 435a9ea2..0982e69e 100644 --- a/examples/echo/src/dialer.js +++ b/examples/echo/src/dialer.js @@ -51,7 +51,7 @@ async function run() { // For each chunk of data for await (const data of source) { // Output the data - console.log('received echo:', uint8ArrayToString(data)) + console.log('received echo:', uint8ArrayToString(data.subarray())) } } ) diff --git a/examples/libp2p-in-the-browser/package.json b/examples/libp2p-in-the-browser/package.json index 57e4fe1a..f53ae096 100644 --- a/examples/libp2p-in-the-browser/package.json +++ b/examples/libp2p-in-the-browser/package.json @@ -9,9 +9,9 @@ }, "license": "ISC", "dependencies": { - "@chainsafe/libp2p-noise": "^7.0.1", + "@chainsafe/libp2p-noise": "^8.0.0", "@libp2p/bootstrap": "^2.0.0", - "@libp2p/mplex": "^4.0.2", + "@libp2p/mplex": "^5.0.0", "@libp2p/webrtc-star": "^3.0.0", "@libp2p/websockets": "^3.0.0", "libp2p": "../../" diff --git a/examples/pnet/index.js b/examples/pnet/index.js index fa8268df..6d3adf0a 100644 --- a/examples/pnet/index.js +++ b/examples/pnet/index.js @@ -37,7 +37,7 @@ generateKey(otherSwarmKey) stream, async function (source) { for await (const msg of source) { - console.log(uint8ArrayToString(msg)) + console.log(uint8ArrayToString(msg.subarray())) } } ) diff --git a/examples/protocol-and-stream-muxing/1.js b/examples/protocol-and-stream-muxing/1.js index f1ab2f20..024aaaab 100644 --- a/examples/protocol-and-stream-muxing/1.js +++ b/examples/protocol-and-stream-muxing/1.js @@ -36,7 +36,7 @@ const createNode = async () => { stream, async function (source) { for await (const msg of source) { - console.log(uint8ArrayToString(msg)) + console.log(uint8ArrayToString(msg.subarray())) } } ) diff --git a/examples/protocol-and-stream-muxing/2.js b/examples/protocol-and-stream-muxing/2.js index 2605938d..0b7a332b 100644 --- a/examples/protocol-and-stream-muxing/2.js +++ b/examples/protocol-and-stream-muxing/2.js @@ -35,7 +35,7 @@ const createNode = async () => { stream, async function (source) { for await (const msg of source) { - console.log(`from: ${protocol}, msg: ${uint8ArrayToString(msg)}`) + console.log(`from: ${protocol}, msg: ${uint8ArrayToString(msg.subarray())}`) } } ).finally(() => { diff --git a/examples/protocol-and-stream-muxing/3.js b/examples/protocol-and-stream-muxing/3.js index af63bdea..84de6b1b 100644 --- a/examples/protocol-and-stream-muxing/3.js +++ b/examples/protocol-and-stream-muxing/3.js @@ -37,7 +37,7 @@ const createNode = async () => { stream, async function (source) { for await (const msg of source) { - console.log(uint8ArrayToString(msg)) + console.log(uint8ArrayToString(msg.subarray())) } } ) @@ -48,7 +48,7 @@ const createNode = async () => { stream, async function (source) { for await (const msg of source) { - console.log(uint8ArrayToString(msg)) + console.log(uint8ArrayToString(msg.subarray())) } } ) diff --git a/examples/transports/2.js b/examples/transports/2.js index 9dee8878..2808c820 100644 --- a/examples/transports/2.js +++ b/examples/transports/2.js @@ -42,6 +42,11 @@ function printAddrs (node, number) { node2.handle('/print', async ({ stream }) => { const result = await pipe( stream, + async function * (source) { + for await (const list of source) { + yield list.subarray() + } + }, toBuffer ) console.log(uint8ArrayToString(result)) diff --git a/examples/transports/3.js b/examples/transports/3.js index 26abf36e..980570c7 100644 --- a/examples/transports/3.js +++ b/examples/transports/3.js @@ -37,7 +37,7 @@ function print ({ stream }) { stream, async function (source) { for await (const msg of source) { - console.log(uint8ArrayToString(msg)) + console.log(uint8ArrayToString(msg.subarray())) } } ) diff --git a/examples/transports/4.js b/examples/transports/4.js index 0e13c569..ad35181e 100644 --- a/examples/transports/4.js +++ b/examples/transports/4.js @@ -57,7 +57,7 @@ function print ({ stream }) { stream, async function (source) { for await (const msg of source) { - console.log(uint8ArrayToString(msg)) + console.log(uint8ArrayToString(msg.subarray())) } } ) diff --git a/examples/webrtc-direct/package.json b/examples/webrtc-direct/package.json index 664488dd..d5be295f 100644 --- a/examples/webrtc-direct/package.json +++ b/examples/webrtc-direct/package.json @@ -10,9 +10,9 @@ "license": "ISC", "dependencies": { "@libp2p/webrtc-direct": "^2.0.0", - "@chainsafe/libp2p-noise": "^7.0.3", + "@chainsafe/libp2p-noise": "^8.0.0", "@libp2p/bootstrap": "^2.0.0", - "@libp2p/mplex": "^4.0.3", + "@libp2p/mplex": "^5.0.0", "libp2p": "../../", "wrtc": "^0.4.7" }, diff --git a/package.json b/package.json index a8071c1e..1e31ab23 100644 --- a/package.json +++ b/package.json @@ -98,36 +98,36 @@ }, "dependencies": { "@achingbrain/nat-port-mapper": "^1.0.3", - "@libp2p/components": "^2.0.1", - "@libp2p/connection": "^4.0.0", - "@libp2p/crypto": "^1.0.0", - "@libp2p/interface-address-manager": "^1.0.1", - "@libp2p/interface-connection": "^2.0.0", - "@libp2p/interface-connection-encrypter": "^1.0.2", - "@libp2p/interface-content-routing": "^1.0.1", - "@libp2p/interface-dht": "^1.0.0", - "@libp2p/interface-metrics": "^2.0.0", - "@libp2p/interface-peer-discovery": "^1.0.0", - "@libp2p/interface-peer-id": "^1.0.2", - "@libp2p/interface-peer-info": "^1.0.1", - "@libp2p/interface-peer-routing": "^1.0.0", - "@libp2p/interface-peer-store": "^1.2.0", - "@libp2p/interface-pubsub": "^2.0.0", - "@libp2p/interface-registrar": "^2.0.0", - "@libp2p/interface-stream-muxer": "^2.0.1", - "@libp2p/interface-transport": "^1.0.0", - "@libp2p/interfaces": "^3.0.2", + "@libp2p/components": "^2.0.3", + "@libp2p/connection": "^4.0.1", + "@libp2p/crypto": "^1.0.3", + "@libp2p/interface-address-manager": "^1.0.2", + "@libp2p/interface-connection": "^3.0.1", + "@libp2p/interface-connection-encrypter": "^2.0.1", + "@libp2p/interface-content-routing": "^1.0.2", + "@libp2p/interface-dht": "^1.0.1", + "@libp2p/interface-metrics": "^3.0.0", + "@libp2p/interface-peer-discovery": "^1.0.1", + "@libp2p/interface-peer-id": "^1.0.4", + "@libp2p/interface-peer-info": "^1.0.2", + "@libp2p/interface-peer-routing": "^1.0.1", + "@libp2p/interface-peer-store": "^1.2.1", + "@libp2p/interface-pubsub": "^2.0.1", + "@libp2p/interface-registrar": "^2.0.3", + "@libp2p/interface-stream-muxer": "^2.0.2", + "@libp2p/interface-transport": "^1.0.3", + "@libp2p/interfaces": "^3.0.3", "@libp2p/logger": "^2.0.0", - "@libp2p/multistream-select": "^2.0.1", + "@libp2p/multistream-select": "^3.0.0", "@libp2p/peer-collections": "^2.0.0", - "@libp2p/peer-id": "^1.1.10", - "@libp2p/peer-id-factory": "^1.0.9", - "@libp2p/peer-record": "^4.0.0", - "@libp2p/peer-store": "^3.0.0", + "@libp2p/peer-id": "^1.1.15", + "@libp2p/peer-id-factory": "^1.0.18", + "@libp2p/peer-record": "^4.0.1", + "@libp2p/peer-store": "^3.1.2", "@libp2p/tracked-map": "^2.0.1", - "@libp2p/utils": "^3.0.0", + "@libp2p/utils": "^3.0.1", "@multiformats/mafmt": "^11.0.2", - "@multiformats/multiaddr": "^10.1.8", + "@multiformats/multiaddr": "^10.3.3", "abortable-iterator": "^4.0.2", "any-signal": "^3.0.0", "datastore-core": "^7.0.0", @@ -140,7 +140,7 @@ "it-filter": "^1.0.3", "it-first": "^1.0.6", "it-foreach": "^0.1.1", - "it-handshake": "^4.0.0", + "it-handshake": "^4.1.2", "it-length-prefixed": "^8.0.2", "it-map": "^1.0.6", "it-merge": "^1.0.3", @@ -156,31 +156,31 @@ "p-retry": "^5.0.0", "p-settle": "^5.0.0", "private-ip": "^2.3.3", - "protons-runtime": "^2.0.2", + "protons-runtime": "^3.0.1", "retimer": "^3.0.0", "sanitize-filename": "^1.6.3", "set-delayed-interval": "^1.0.0", "timeout-abort-controller": "^3.0.0", - "uint8arraylist": "^2.0.0", + "uint8arraylist": "^2.3.2", "uint8arrays": "^3.0.0", "wherearewe": "^1.0.0", "xsalsa20": "^1.1.0" }, "devDependencies": { - "@chainsafe/libp2p-noise": "^7.0.2", + "@chainsafe/libp2p-noise": "^8.0.0", "@libp2p/bootstrap": "^2.0.0", - "@libp2p/daemon-client": "^2.0.0", - "@libp2p/daemon-server": "^2.0.0", + "@libp2p/daemon-client": "^2.0.4", + "@libp2p/daemon-server": "^2.0.4", "@libp2p/delegated-content-routing": "^2.0.1", "@libp2p/delegated-peer-routing": "^2.0.1", "@libp2p/floodsub": "^3.0.0", "@libp2p/interface-compliance-tests": "^3.0.1", - "@libp2p/interface-connection-encrypter-compliance-tests": "^1.0.0", - "@libp2p/interface-mocks": "^3.0.1", + "@libp2p/interface-connection-encrypter-compliance-tests": "^2.0.1", + "@libp2p/interface-mocks": "^4.0.1", "@libp2p/interop": "^2.0.0", - "@libp2p/kad-dht": "^3.0.0", + "@libp2p/kad-dht": "^3.0.1", "@libp2p/mdns": "^3.0.0", - "@libp2p/mplex": "^4.0.2", + "@libp2p/mplex": "^5.0.0", "@libp2p/pubsub": "^3.0.1", "@libp2p/tcp": "^3.0.0", "@libp2p/topology": "^3.0.0", @@ -205,7 +205,7 @@ "p-event": "^5.0.1", "p-times": "^4.0.0", "p-wait-for": "^5.0.0", - "protons": "^4.0.1", + "protons": "^5.0.0", "rimraf": "^3.0.2", "sinon": "^14.0.0", "ts-sinon": "^2.0.2" diff --git a/src/circuit/README.md b/src/circuit/README.md index cbf1dd1d..330df42a 100644 --- a/src/circuit/README.md +++ b/src/circuit/README.md @@ -41,7 +41,7 @@ import { Multiaddr } from '@multiformats/multiaddr' import Libp2p from 'libp2p' import { TCP } from '@libp2p/tcp' import { Mplex } from '@libp2p/mplex' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Noise } from '@chainsafe/libp2p-noise' const relayAddr = ... @@ -56,7 +56,7 @@ const node = await createLibp2p({ new Mplex() ], connectionEncryption: [ - NOISE + new Noise() ] }, config: { diff --git a/src/circuit/circuit/hop.ts b/src/circuit/circuit/hop.ts index a7fd3885..1ec63e0c 100644 --- a/src/circuit/circuit/hop.ts +++ b/src/circuit/circuit/hop.ts @@ -13,6 +13,7 @@ import type { Duplex } from 'it-stream-types' import type { Circuit } from '../transport.js' import type { ConnectionManager } from '@libp2p/interface-connection-manager' import type { AbortOptions } from '@libp2p/interfaces' +import type { Uint8ArrayList } from 'uint8arraylist' const log = logger('libp2p:circuit:hop') @@ -24,7 +25,7 @@ export interface HopRequest { connectionManager: ConnectionManager } -export async function handleHop (hopRequest: HopRequest) { +export async function handleHop (hopRequest: HopRequest): Promise { const { connection, request, @@ -84,7 +85,7 @@ export async function handleHop (hopRequest: HopRequest) { srcPeer: request.srcPeer } - let destinationStream: Duplex + let destinationStream: Duplex try { log('performing STOP request') const result = await stop({ @@ -128,7 +129,7 @@ export interface HopConfig extends AbortOptions { * Performs a HOP request to a relay peer, to request a connection to another * peer. A new, virtual, connection will be created between the two via the relay. */ -export async function hop (options: HopConfig): Promise> { +export async function hop (options: HopConfig): Promise> { const { connection, request, @@ -151,6 +152,7 @@ export async function hop (options: HopConfig): Promise> { if (response.code === CircuitPB.Status.SUCCESS) { log('hop request was successful') + return streamHandler.rest() } diff --git a/src/circuit/circuit/stop.ts b/src/circuit/circuit/stop.ts index 1ad8f8e0..2e27d010 100644 --- a/src/circuit/circuit/stop.ts +++ b/src/circuit/circuit/stop.ts @@ -6,6 +6,7 @@ import { validateAddrs } from './utils.js' import type { Connection } from '@libp2p/interface-connection' import type { Duplex } from 'it-stream-types' import type { AbortOptions } from '@libp2p/interfaces' +import type { Uint8ArrayList } from 'uint8arraylist' const log = logger('libp2p:circuit:stop') @@ -18,7 +19,7 @@ export interface HandleStopOptions { /** * Handles incoming STOP requests */ -export function handleStop (options: HandleStopOptions): Duplex | undefined { +export function handleStop (options: HandleStopOptions): Duplex | undefined { const { connection, request, diff --git a/src/circuit/circuit/stream-handler.ts b/src/circuit/circuit/stream-handler.ts index 0935d5d6..0638733f 100644 --- a/src/circuit/circuit/stream-handler.ts +++ b/src/circuit/circuit/stream-handler.ts @@ -22,7 +22,7 @@ export interface StreamHandlerOptions { export class StreamHandler { private readonly stream: Stream - private readonly shake: Handshake + private readonly shake: Handshake private readonly decoder: Source constructor (options: StreamHandlerOptions) { @@ -56,7 +56,7 @@ export class StreamHandler { */ write (msg: CircuitRelay) { log('write message type %s', msg.type) - this.shake.write(lp.encode.single(CircuitRelay.encode(msg)).slice()) + this.shake.write(lp.encode.single(CircuitRelay.encode(msg))) } /** diff --git a/src/circuit/pb/index.ts b/src/circuit/pb/index.ts index ec47246a..44b0127c 100644 --- a/src/circuit/pb/index.ts +++ b/src/circuit/pb/index.ts @@ -1,9 +1,9 @@ /* eslint-disable import/export */ /* eslint-disable @typescript-eslint/no-namespace */ -import { enumeration, encodeMessage, decodeMessage, message, bytes } from 'protons-runtime' -import type { Codec } from 'protons-runtime' +import { enumeration, encodeMessage, decodeMessage, message } from 'protons-runtime' import type { Uint8ArrayList } from 'uint8arraylist' +import type { Codec } from 'protons-runtime' export interface CircuitRelay { type?: CircuitRelay.Type @@ -53,7 +53,7 @@ export namespace CircuitRelay { export namespace Status { export const codec = () => { - return enumeration(__StatusValues) + return enumeration(__StatusValues) } } @@ -73,7 +73,7 @@ export namespace CircuitRelay { export namespace Type { export const codec = () => { - return enumeration(__TypeValues) + return enumeration(__TypeValues) } } @@ -83,14 +83,74 @@ export namespace CircuitRelay { } export namespace Peer { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'id', codec: bytes }, - 2: { name: 'addrs', codec: bytes, repeats: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.id != null) { + writer.uint32(10) + writer.bytes(obj.id) + } else { + throw new Error('Protocol error: required field "id" was not found in object') + } + + if (obj.addrs != null) { + for (const value of obj.addrs) { + writer.uint32(18) + writer.bytes(value) + } + } else { + throw new Error('Protocol error: required field "addrs" was not found in object') + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.id = reader.bytes() + break + case 2: + obj.addrs = obj.addrs ?? [] + obj.addrs.push(reader.bytes()) + break + default: + reader.skipType(tag & 7) + break + } + } + + obj.addrs = obj.addrs ?? [] + + if (obj.id == null) { + throw new Error('Protocol error: value for required field "id" was not found in protobuf') + } + + if (obj.addrs == null) { + throw new Error('Protocol error: value for required field "addrs" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: Peer): Uint8ArrayList => { + export const encode = (obj: Peer): Uint8Array => { return encodeMessage(obj, Peer.codec()) } @@ -99,16 +159,73 @@ export namespace CircuitRelay { } } + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'type', codec: CircuitRelay.Type.codec(), optional: true }, - 2: { name: 'srcPeer', codec: CircuitRelay.Peer.codec(), optional: true }, - 3: { name: 'dstPeer', codec: CircuitRelay.Peer.codec(), optional: true }, - 4: { name: 'code', codec: CircuitRelay.Status.codec(), optional: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.type != null) { + writer.uint32(8) + CircuitRelay.Type.codec().encode(obj.type, writer) + } + + if (obj.srcPeer != null) { + writer.uint32(18) + CircuitRelay.Peer.codec().encode(obj.srcPeer, writer) + } + + if (obj.dstPeer != null) { + writer.uint32(26) + CircuitRelay.Peer.codec().encode(obj.dstPeer, writer) + } + + if (obj.code != null) { + writer.uint32(32) + CircuitRelay.Status.codec().encode(obj.code, writer) + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.type = CircuitRelay.Type.codec().decode(reader) + break + case 2: + obj.srcPeer = CircuitRelay.Peer.codec().decode(reader, reader.uint32()) + break + case 3: + obj.dstPeer = CircuitRelay.Peer.codec().decode(reader, reader.uint32()) + break + case 4: + obj.code = CircuitRelay.Status.codec().decode(reader) + break + default: + reader.skipType(tag & 7) + break + } + } + + return obj + }) + } + + return _codec } - export const encode = (obj: CircuitRelay): Uint8ArrayList => { + export const encode = (obj: CircuitRelay): Uint8Array => { return encodeMessage(obj, CircuitRelay.codec()) } diff --git a/src/circuit/transport.ts b/src/circuit/transport.ts index 5c87bd2e..eaa0f4a9 100644 --- a/src/circuit/transport.ts +++ b/src/circuit/transport.ts @@ -21,6 +21,8 @@ import type { RelayConfig } from '../index.js' import { abortableDuplex } from 'abortable-iterator' import { TimeoutController } from 'timeout-abort-controller' import { setMaxListeners } from 'events' +import type { Uint8ArrayList } from 'uint8arraylist' +import type { Duplex } from 'it-stream-types' const log = logger('libp2p:circuit') @@ -90,7 +92,7 @@ export class Circuit implements Transport, Initializable { return } - let virtualConnection + let virtualConnection: Duplex | undefined switch (request.type) { case CircuitPB.Type.CAN_HOP: { @@ -100,7 +102,7 @@ export class Circuit implements Transport, Initializable { } case CircuitPB.Type.HOP: { log('received HOP request from %p', connection.remotePeer) - virtualConnection = await handleHop({ + await handleHop({ connection, request, streamHandler, diff --git a/src/fetch/pb/proto.ts b/src/fetch/pb/proto.ts index 60855935..d4997ea4 100644 --- a/src/fetch/pb/proto.ts +++ b/src/fetch/pb/proto.ts @@ -1,22 +1,64 @@ /* eslint-disable import/export */ /* eslint-disable @typescript-eslint/no-namespace */ -import { encodeMessage, decodeMessage, message, string, enumeration, bytes } from 'protons-runtime' -import type { Codec } from 'protons-runtime' +import { encodeMessage, decodeMessage, message, enumeration } from 'protons-runtime' import type { Uint8ArrayList } from 'uint8arraylist' +import type { Codec } from 'protons-runtime' export interface FetchRequest { identifier: string } export namespace FetchRequest { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'identifier', codec: string } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.identifier != null) { + writer.uint32(10) + writer.string(obj.identifier) + } else { + throw new Error('Protocol error: required field "identifier" was not found in object') + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.identifier = reader.string() + break + default: + reader.skipType(tag & 7) + break + } + } + + if (obj.identifier == null) { + throw new Error('Protocol error: value for required field "identifier" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: FetchRequest): Uint8ArrayList => { + export const encode = (obj: FetchRequest): Uint8Array => { return encodeMessage(obj, FetchRequest.codec()) } @@ -45,18 +87,73 @@ export namespace FetchResponse { export namespace StatusCode { export const codec = () => { - return enumeration(__StatusCodeValues) + return enumeration(__StatusCodeValues) } } + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'status', codec: FetchResponse.StatusCode.codec() }, - 2: { name: 'data', codec: bytes } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.status != null) { + writer.uint32(8) + FetchResponse.StatusCode.codec().encode(obj.status, writer) + } else { + throw new Error('Protocol error: required field "status" was not found in object') + } + + if (obj.data != null) { + writer.uint32(18) + writer.bytes(obj.data) + } else { + throw new Error('Protocol error: required field "data" was not found in object') + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.status = FetchResponse.StatusCode.codec().decode(reader) + break + case 2: + obj.data = reader.bytes() + break + default: + reader.skipType(tag & 7) + break + } + } + + if (obj.status == null) { + throw new Error('Protocol error: value for required field "status" was not found in protobuf') + } + + if (obj.data == null) { + throw new Error('Protocol error: value for required field "data" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: FetchResponse): Uint8ArrayList => { + export const encode = (obj: FetchResponse): Uint8Array => { return encodeMessage(obj, FetchResponse.codec()) } diff --git a/src/identify/index.ts b/src/identify/index.ts index ba82aa81..a0f05142 100644 --- a/src/identify/index.ts +++ b/src/identify/index.ts @@ -26,7 +26,6 @@ import type { Components } from '@libp2p/components' import { TimeoutController } from 'timeout-abort-controller' import type { AbortOptions } from '@libp2p/interfaces' import { abortableDuplex } from 'abortable-iterator' -import type { Duplex } from 'it-stream-types' import { setMaxListeners } from 'events' const log = logger('libp2p:identify') @@ -179,7 +178,7 @@ export class IdentifyService implements Startable { }) // make stream abortable - const source: Duplex = abortableDuplex(stream, timeoutController.signal) + const source = abortableDuplex(stream, timeoutController.signal) await pipe( [Identify.encode({ @@ -418,7 +417,7 @@ export class IdentifyService implements Startable { }) // make stream abortable - const source: Duplex = abortableDuplex(stream, timeoutController.signal) + const source = abortableDuplex(stream, timeoutController.signal) await pipe( [message], @@ -449,7 +448,7 @@ export class IdentifyService implements Startable { let message: Identify | undefined try { // make stream abortable - const source: Duplex = abortableDuplex(stream, timeoutController.signal) + const source = abortableDuplex(stream, timeoutController.signal) const data = await pipe( [], diff --git a/src/identify/pb/message.ts b/src/identify/pb/message.ts index 1f9072d6..882b5b67 100644 --- a/src/identify/pb/message.ts +++ b/src/identify/pb/message.ts @@ -1,9 +1,9 @@ /* eslint-disable import/export */ /* eslint-disable @typescript-eslint/no-namespace */ -import { encodeMessage, decodeMessage, message, string, bytes } from 'protons-runtime' -import type { Codec } from 'protons-runtime' +import { encodeMessage, decodeMessage, message } from 'protons-runtime' import type { Uint8ArrayList } from 'uint8arraylist' +import type { Codec } from 'protons-runtime' export interface Identify { protocolVersion?: string @@ -16,19 +16,118 @@ export interface Identify { } export namespace Identify { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 5: { name: 'protocolVersion', codec: string, optional: true }, - 6: { name: 'agentVersion', codec: string, optional: true }, - 1: { name: 'publicKey', codec: bytes, optional: true }, - 2: { name: 'listenAddrs', codec: bytes, repeats: true }, - 4: { name: 'observedAddr', codec: bytes, optional: true }, - 3: { name: 'protocols', codec: string, repeats: true }, - 8: { name: 'signedPeerRecord', codec: bytes, optional: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.protocolVersion != null) { + writer.uint32(42) + writer.string(obj.protocolVersion) + } + + if (obj.agentVersion != null) { + writer.uint32(50) + writer.string(obj.agentVersion) + } + + if (obj.publicKey != null) { + writer.uint32(10) + writer.bytes(obj.publicKey) + } + + if (obj.listenAddrs != null) { + for (const value of obj.listenAddrs) { + writer.uint32(18) + writer.bytes(value) + } + } else { + throw new Error('Protocol error: required field "listenAddrs" was not found in object') + } + + if (obj.observedAddr != null) { + writer.uint32(34) + writer.bytes(obj.observedAddr) + } + + if (obj.protocols != null) { + for (const value of obj.protocols) { + writer.uint32(26) + writer.string(value) + } + } else { + throw new Error('Protocol error: required field "protocols" was not found in object') + } + + if (obj.signedPeerRecord != null) { + writer.uint32(66) + writer.bytes(obj.signedPeerRecord) + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 5: + obj.protocolVersion = reader.string() + break + case 6: + obj.agentVersion = reader.string() + break + case 1: + obj.publicKey = reader.bytes() + break + case 2: + obj.listenAddrs = obj.listenAddrs ?? [] + obj.listenAddrs.push(reader.bytes()) + break + case 4: + obj.observedAddr = reader.bytes() + break + case 3: + obj.protocols = obj.protocols ?? [] + obj.protocols.push(reader.string()) + break + case 8: + obj.signedPeerRecord = reader.bytes() + break + default: + reader.skipType(tag & 7) + break + } + } + + obj.listenAddrs = obj.listenAddrs ?? [] + obj.protocols = obj.protocols ?? [] + + if (obj.listenAddrs == null) { + throw new Error('Protocol error: value for required field "listenAddrs" was not found in protobuf') + } + + if (obj.protocols == null) { + throw new Error('Protocol error: value for required field "protocols" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: Identify): Uint8ArrayList => { + export const encode = (obj: Identify): Uint8Array => { return encodeMessage(obj, Identify.codec()) } diff --git a/src/insecure/index.ts b/src/insecure/index.ts index 3ffd6618..d7734156 100644 --- a/src/insecure/index.ts +++ b/src/insecure/index.ts @@ -7,6 +7,7 @@ import type { PeerId } from '@libp2p/interface-peer-id' import { peerIdFromBytes, peerIdFromKeys } from '@libp2p/peer-id' import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface-connection-encrypter' import type { Duplex } from 'it-stream-types' +import map from 'it-map' const log = logger('libp2p:plaintext') const PROTOCOL = '/plaintext/2.0.0' @@ -47,7 +48,7 @@ async function encrypt (localId: PeerId, conn: Duplex, remoteId?: Pe // Get the Exchange message // @ts-expect-error needs to be generator const response = (await lp.decode.fromReader(shake.reader).next()).value - const id = Exchange.decode(response.slice()) + const id = Exchange.decode(response) log('read pubkey exchange from peer %p', remoteId) let peerId @@ -81,8 +82,12 @@ async function encrypt (localId: PeerId, conn: Duplex, remoteId?: Pe log('plaintext key exchange completed successfully with peer %p', peerId) shake.rest() + return { - conn: shake.stream, + conn: { + sink: shake.stream.sink, + source: map(shake.stream.source, (buf) => buf.subarray()) + }, remotePeer: peerId, remoteEarlyData: new Uint8Array() } diff --git a/src/insecure/pb/proto.ts b/src/insecure/pb/proto.ts index ec843911..1ff2468f 100644 --- a/src/insecure/pb/proto.ts +++ b/src/insecure/pb/proto.ts @@ -1,9 +1,9 @@ /* eslint-disable import/export */ /* eslint-disable @typescript-eslint/no-namespace */ -import { encodeMessage, decodeMessage, message, bytes, enumeration } from 'protons-runtime' -import type { Codec } from 'protons-runtime' +import { encodeMessage, decodeMessage, message, enumeration } from 'protons-runtime' import type { Uint8ArrayList } from 'uint8arraylist' +import type { Codec } from 'protons-runtime' export interface Exchange { id?: Uint8Array @@ -11,14 +11,57 @@ export interface Exchange { } export namespace Exchange { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'id', codec: bytes, optional: true }, - 2: { name: 'pubkey', codec: PublicKey.codec(), optional: true } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.id != null) { + writer.uint32(10) + writer.bytes(obj.id) + } + + if (obj.pubkey != null) { + writer.uint32(18) + PublicKey.codec().encode(obj.pubkey, writer) + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.id = reader.bytes() + break + case 2: + obj.pubkey = PublicKey.codec().decode(reader, reader.uint32()) + break + default: + reader.skipType(tag & 7) + break + } + } + + return obj + }) + } + + return _codec } - export const encode = (obj: Exchange): Uint8ArrayList => { + export const encode = (obj: Exchange): Uint8Array => { return encodeMessage(obj, Exchange.codec()) } @@ -43,7 +86,7 @@ enum __KeyTypeValues { export namespace KeyType { export const codec = () => { - return enumeration(__KeyTypeValues) + return enumeration(__KeyTypeValues) } } export interface PublicKey { @@ -52,14 +95,69 @@ export interface PublicKey { } export namespace PublicKey { + let _codec: Codec + export const codec = (): Codec => { - return message({ - 1: { name: 'Type', codec: KeyType.codec() }, - 2: { name: 'Data', codec: bytes } - }) + if (_codec == null) { + _codec = message((obj, writer, opts = {}) => { + if (opts.lengthDelimited !== false) { + writer.fork() + } + + if (obj.Type != null) { + writer.uint32(8) + KeyType.codec().encode(obj.Type, writer) + } else { + throw new Error('Protocol error: required field "Type" was not found in object') + } + + if (obj.Data != null) { + writer.uint32(18) + writer.bytes(obj.Data) + } else { + throw new Error('Protocol error: required field "Data" was not found in object') + } + + if (opts.lengthDelimited !== false) { + writer.ldelim() + } + }, (reader, length) => { + const obj: any = {} + + const end = length == null ? reader.len : reader.pos + length + + while (reader.pos < end) { + const tag = reader.uint32() + + switch (tag >>> 3) { + case 1: + obj.Type = KeyType.codec().decode(reader) + break + case 2: + obj.Data = reader.bytes() + break + default: + reader.skipType(tag & 7) + break + } + } + + if (obj.Type == null) { + throw new Error('Protocol error: value for required field "Type" was not found in protobuf') + } + + if (obj.Data == null) { + throw new Error('Protocol error: value for required field "Data" was not found in protobuf') + } + + return obj + }) + } + + return _codec } - export const encode = (obj: PublicKey): Uint8ArrayList => { + export const encode = (obj: PublicKey): Uint8Array => { return encodeMessage(obj, PublicKey.codec()) } diff --git a/src/metrics/index.ts b/src/metrics/index.ts index 6c4429bb..80f1c5fa 100644 --- a/src/metrics/index.ts +++ b/src/metrics/index.ts @@ -6,7 +6,6 @@ import { DefaultStats, StatsInit } from './stats.js' import type { ComponentMetricsUpdate, Metrics, Stats, TrackedMetric, TrackStreamOptions } from '@libp2p/interface-metrics' import type { PeerId } from '@libp2p/interface-peer-id' import type { Startable } from '@libp2p/interfaces/startable' -import type { Duplex } from 'it-stream-types' const initialCounters: ['dataReceived', 'dataSent'] = [ 'dataReceived', @@ -263,11 +262,11 @@ export class DefaultMetrics implements Metrics, Startable { * When the `PeerId` is known, `Metrics.updatePlaceholder` should be called * with the placeholder string returned from here, and the known `PeerId`. */ - trackStream > (opts: TrackStreamOptions): T { + trackStream (opts: TrackStreamOptions): void { const { stream, remotePeer, protocol } = opts if (!this.running) { - return stream + return } const source = stream.source @@ -275,7 +274,7 @@ export class DefaultMetrics implements Metrics, Startable { remotePeer, protocol, direction: 'in', - dataLength: chunk.length + dataLength: chunk.byteLength })) const sink = stream.sink @@ -287,14 +286,12 @@ export class DefaultMetrics implements Metrics, Startable { remotePeer, protocol, direction: 'out', - dataLength: chunk.length + dataLength: chunk.byteLength }) }), sink ) } - - return stream } } diff --git a/src/ping/index.ts b/src/ping/index.ts index 8eb83fb1..e5f73865 100644 --- a/src/ping/index.ts +++ b/src/ping/index.ts @@ -113,7 +113,7 @@ export class PingService implements Startable { ) const end = Date.now() - if (result == null || !uint8ArrayEquals(data, result)) { + if (result == null || !uint8ArrayEquals(data, result.subarray())) { throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK) } diff --git a/src/pnet/index.ts b/src/pnet/index.ts index 2a14d52d..e1333083 100644 --- a/src/pnet/index.ts +++ b/src/pnet/index.ts @@ -13,6 +13,7 @@ import { import { handshake } from 'it-handshake' import { NONCE_LENGTH } from './key-generator.js' import type { ConnectionProtector, MultiaddrConnection } from '@libp2p/interface-connection' +import map from 'it-map' const log = logger('libp2p:pnet') @@ -83,6 +84,7 @@ export class PreSharedKeyConnectionProtector implements ConnectionProtector { // Encrypt all outbound traffic createBoxStream(localNonce, this.psk), shake.stream, + (source) => map(source, (buf) => buf.subarray()), // Decrypt all inbound traffic createUnboxStream(remoteNonce, this.psk), external diff --git a/src/upgrader.ts b/src/upgrader.ts index 1c5919fe..8d314bb1 100644 --- a/src/upgrader.ts +++ b/src/upgrader.ts @@ -1,6 +1,6 @@ import { logger } from '@libp2p/logger' import errCode from 'err-code' -import { Dialer, Listener } from '@libp2p/multistream-select' +import * as mss from '@libp2p/multistream-select' import { pipe } from 'it-pipe' // @ts-expect-error mutable-proxy does not export types import mutableProxy from 'mutable-proxy' @@ -152,7 +152,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy()) const idString = `${(Math.random() * 1e9).toString(36)}${Date.now()}` setPeer({ toString: () => idString }) - maConn = metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) + metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) } log('starting the inbound connection upgrade') @@ -253,7 +253,7 @@ export class DefaultUpgrader extends EventEmitter implements Upg ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy()) const idString = `${(Math.random() * 1e9).toString(36)}${Date.now()}` setPeer({ toB58String: () => idString }) - maConn = metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) + metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) } log('Starting the outbound connection upgrade') @@ -351,9 +351,8 @@ export class DefaultUpgrader extends EventEmitter implements Upg void Promise.resolve() .then(async () => { - const mss = new Listener(muxedStream) const protocols = this.components.getRegistrar().getProtocols() - const { stream, protocol } = await mss.handle(protocols) + const { stream, protocol } = await mss.handle(muxedStream, protocols) log('%s: incoming stream opened on %s', direction, protocol) const metrics = this.components.getMetrics() @@ -405,7 +404,6 @@ export class DefaultUpgrader extends EventEmitter implements Upg log('%s: starting new stream on %s', direction, protocols) const muxedStream = muxer.newStream() - const mss = new Dialer(muxedStream) const metrics = this.components.getMetrics() let controller: TimeoutController | undefined @@ -422,10 +420,10 @@ export class DefaultUpgrader extends EventEmitter implements Upg } catch {} } - let { stream, protocol } = await mss.select(protocols, options) + const { stream, protocol } = await mss.select(muxedStream, protocols, options) if (metrics != null) { - stream = metrics.trackStream({ stream, remotePeer, protocol }) + metrics.trackStream({ stream, remotePeer, protocol }) } const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.getRegistrar()) @@ -545,12 +543,13 @@ export class DefaultUpgrader extends EventEmitter implements Upg * Attempts to encrypt the incoming `connection` with the provided `cryptos` */ async _encryptInbound (connection: Duplex): Promise { - const mss = new Listener(connection) const protocols = Array.from(this.connectionEncryption.keys()) log('handling inbound crypto protocol selection', protocols) try { - const { stream, protocol } = await mss.handle(protocols) + const { stream, protocol } = await mss.handle(connection, protocols, { + writeBytes: true + }) const encrypter = this.connectionEncryption.get(protocol) if (encrypter == null) { @@ -573,12 +572,13 @@ export class DefaultUpgrader extends EventEmitter implements Upg * The first `ConnectionEncrypter` module to succeed will be used */ async _encryptOutbound (connection: MultiaddrConnection, remotePeerId: PeerId): Promise { - const mss = new Dialer(connection) const protocols = Array.from(this.connectionEncryption.keys()) log('selecting outbound crypto protocol', protocols) try { - const { stream, protocol } = await mss.select(protocols) + const { stream, protocol } = await mss.select(connection, protocols, { + writeBytes: true + }) const encrypter = this.connectionEncryption.get(protocol) if (encrypter == null) { @@ -601,11 +601,12 @@ export class DefaultUpgrader extends EventEmitter implements Upg * muxer will be used for all future streams on the connection. */ async _multiplexOutbound (connection: MultiaddrConnection, muxers: Map): Promise<{ stream: Duplex, muxerFactory?: StreamMuxerFactory}> { - const dialer = new Dialer(connection) const protocols = Array.from(muxers.keys()) log('outbound selecting muxer %s', protocols) try { - const { stream, protocol } = await dialer.select(protocols) + const { stream, protocol } = await mss.select(connection, protocols, { + writeBytes: true + }) log('%s selected as muxer protocol', protocol) const muxerFactory = muxers.get(protocol) return { stream, muxerFactory } @@ -620,11 +621,12 @@ export class DefaultUpgrader extends EventEmitter implements Upg * selected muxer will be used for all future streams on the connection. */ async _multiplexInbound (connection: MultiaddrConnection, muxers: Map): Promise<{ stream: Duplex, muxerFactory?: StreamMuxerFactory}> { - const listener = new Listener(connection) const protocols = Array.from(muxers.keys()) log('inbound handling muxers %s', protocols) try { - const { stream, protocol } = await listener.handle(protocols) + const { stream, protocol } = await mss.handle(connection, protocols, { + writeBytes: true + }) const muxerFactory = muxers.get(protocol) return { stream, muxerFactory } } catch (err: any) { diff --git a/test/configuration/utils.ts b/test/configuration/utils.ts index bc587e5c..a34385c9 100644 --- a/test/configuration/utils.ts +++ b/test/configuration/utils.ts @@ -10,7 +10,6 @@ import type { Libp2pInit, Libp2pOptions } from '../../src/index.js' import type { PeerId } from '@libp2p/interface-peer-id' import * as cborg from 'cborg' import { peerIdFromString } from '@libp2p/peer-id' -import { Uint8ArrayList } from 'uint8arraylist' const relayAddr = MULTIADDRS_WEBSOCKETS[0] @@ -33,16 +32,16 @@ class MockPubSub extends PubSubBaseProtocol { return cborg.decode(bytes) } - encodeRpc (rpc: PubSubRPC): Uint8ArrayList { - return new Uint8ArrayList(cborg.encode(rpc)) + encodeRpc (rpc: PubSubRPC): Uint8Array { + return cborg.encode(rpc) } decodeMessage (bytes: Uint8Array): PubSubRPCMessage { return cborg.decode(bytes) } - encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList { - return new Uint8ArrayList(cborg.encode(rpc)) + encodeMessage (rpc: PubSubRPCMessage): Uint8Array { + return cborg.encode(rpc) } async publishMessage (from: PeerId, message: Message): Promise { diff --git a/test/core/consume-peer-record.spec.ts b/test/core/consume-peer-record.spec.ts index 70dad598..0f7a126c 100644 --- a/test/core/consume-peer-record.spec.ts +++ b/test/core/consume-peer-record.spec.ts @@ -1,7 +1,7 @@ /* eslint-env mocha */ import { WebSockets } from '@libp2p/websockets' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { createPeerId } from '../utils/creators/peer.js' import { Multiaddr } from '@multiformats/multiaddr' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' @@ -18,7 +18,7 @@ describe('Consume peer record', () => { new WebSockets() ], connectionEncryption: [ - NOISE + new Plaintext() ] } libp2p = await createLibp2pNode(config) diff --git a/test/core/encryption.spec.ts b/test/core/encryption.spec.ts index 54d4d4c8..e8f52b80 100644 --- a/test/core/encryption.spec.ts +++ b/test/core/encryption.spec.ts @@ -2,7 +2,7 @@ import { expect } from 'aegir/chai' import { WebSockets } from '@libp2p/websockets' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { createLibp2p, Libp2pOptions } from '../../src/index.js' import { codes as ErrorCodes } from '../../src/errors.js' import { createPeerId } from '../utils/creators/peer.js' @@ -46,7 +46,7 @@ describe('Connection encryption configuration', () => { new WebSockets() ], connectionEncryption: [ - NOISE + new Plaintext() ] } await createLibp2p(config) diff --git a/test/core/get-public-key.spec.ts b/test/core/get-public-key.spec.ts index c72f695d..041d9c49 100644 --- a/test/core/get-public-key.spec.ts +++ b/test/core/get-public-key.spec.ts @@ -2,7 +2,7 @@ import { expect } from 'aegir/chai' import { WebSockets } from '@libp2p/websockets' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { createPeerId } from '../utils/creators/peer.js' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' import type { Libp2pOptions } from '../../src/index.js' @@ -20,7 +20,7 @@ describe('getPublicKey', () => { new WebSockets() ], connectionEncryption: [ - NOISE + new Plaintext() ], dht: new KadDHT() } diff --git a/test/core/listening.node.ts b/test/core/listening.node.ts index fdb7e662..41eeddf8 100644 --- a/test/core/listening.node.ts +++ b/test/core/listening.node.ts @@ -2,7 +2,7 @@ import { expect } from 'aegir/chai' import { TCP } from '@libp2p/tcp' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { createPeerId } from '../utils/creators/peer.js' import type { PeerId } from '@libp2p/interface-peer-id' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' @@ -31,7 +31,7 @@ describe('Listening', () => { new TCP() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) diff --git a/test/dialing/direct.node.ts b/test/dialing/direct.node.ts index 19091bf3..d4965adb 100644 --- a/test/dialing/direct.node.ts +++ b/test/dialing/direct.node.ts @@ -4,7 +4,7 @@ import { expect } from 'aegir/chai' import sinon from 'sinon' import { TCP } from '@libp2p/tcp' import { Mplex } from '@libp2p/mplex' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { Multiaddr } from '@multiformats/multiaddr' import delay from 'delay' @@ -245,7 +245,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => { @@ -278,7 +278,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -298,7 +298,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -325,7 +325,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -354,7 +354,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -418,7 +418,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -444,7 +444,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -467,7 +467,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ], connectionProtector: new PreSharedKeyConnectionProtector({ psk: swarmKeyBuffer @@ -505,7 +505,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -541,7 +541,7 @@ describe('libp2p.dialer (direct, TCP)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) diff --git a/test/dialing/direct.spec.ts b/test/dialing/direct.spec.ts index 846800a9..f20ad79c 100644 --- a/test/dialing/direct.spec.ts +++ b/test/dialing/direct.spec.ts @@ -7,7 +7,7 @@ import delay from 'delay' import { WebSockets } from '@libp2p/websockets' import * as filters from '@libp2p/websockets/filters' import { Mplex } from '@libp2p/mplex' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { Multiaddr } from '@multiformats/multiaddr' import { AbortError } from '@libp2p/interfaces/errors' import { MemoryDatastore } from 'datastore-core/memory' @@ -360,7 +360,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -385,7 +385,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ], connectionManager: { maxParallelDials: 10, @@ -416,7 +416,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -450,7 +450,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -490,7 +490,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -518,7 +518,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -539,7 +539,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -576,7 +576,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -602,7 +602,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) diff --git a/test/fetch/fetch.node.ts b/test/fetch/fetch.node.ts index 32a089d9..82a27d2a 100644 --- a/test/fetch/fetch.node.ts +++ b/test/fetch/fetch.node.ts @@ -4,7 +4,7 @@ import { expect } from 'aegir/chai' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' import { TCP } from '@libp2p/tcp' import { Mplex } from '@libp2p/mplex' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { createPeerId } from '../utils/creators/peer.js' import { codes } from '../../src/errors.js' import type { PeerId } from '@libp2p/interface-peer-id' @@ -22,7 +22,7 @@ async function createNode (peerId: PeerId) { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) } diff --git a/test/metrics/index.node.ts b/test/metrics/index.node.ts index 89296b1e..380989eb 100644 --- a/test/metrics/index.node.ts +++ b/test/metrics/index.node.ts @@ -13,6 +13,7 @@ import type { Libp2pOptions } from '../../src/index.js' import type { DefaultMetrics } from '../../src/metrics/index.js' import pWaitFor from 'p-wait-for' import drain from 'it-drain' +import map from 'it-map' describe('libp2p.metrics', () => { let libp2p: Libp2pNode @@ -97,7 +98,7 @@ describe('libp2p.metrics', () => { const result = await pipe( [bytes], stream, - async (source) => await toBuffer(source) + async (source) => await toBuffer(map(source, (list) => list.subarray())) ) // Flush the call stack diff --git a/test/transports/transport-manager.spec.ts b/test/transports/transport-manager.spec.ts index 8ca53aa9..348ef695 100644 --- a/test/transports/transport-manager.spec.ts +++ b/test/transports/transport-manager.spec.ts @@ -5,7 +5,7 @@ import sinon from 'sinon' import { Multiaddr } from '@multiformats/multiaddr' import { WebSockets } from '@libp2p/websockets' import * as filters from '@libp2p/websockets/filters' -import { NOISE } from '@chainsafe/libp2p-noise' +import { Plaintext } from '../../src/insecure/index.js' import { DefaultAddressManager } from '../../src/address-manager/index.js' import { DefaultTransportManager, FaultTolerance } from '../../src/transport-manager.js' import { mockUpgrader } from '@libp2p/interface-mocks' @@ -109,7 +109,7 @@ describe('libp2p.transportManager (dial only)', () => { listen: ['/ip4/127.0.0.1/tcp/0'] }, transports: [new WebSockets()], - connectionEncryption: [NOISE] + connectionEncryption: [new Plaintext()] }) await expect(libp2p.start()).to.eventually.be.rejected @@ -129,7 +129,7 @@ describe('libp2p.transportManager (dial only)', () => { new WebSockets() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) @@ -149,7 +149,7 @@ describe('libp2p.transportManager (dial only)', () => { new WebSockets() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) diff --git a/test/upgrading/upgrader.spec.ts b/test/upgrading/upgrader.spec.ts index 9a8bf95e..48ca372e 100644 --- a/test/upgrading/upgrader.spec.ts +++ b/test/upgrading/upgrader.spec.ts @@ -8,7 +8,6 @@ import { pipe } from 'it-pipe' import all from 'it-all' import pSettle from 'p-settle' import { WebSockets } from '@libp2p/websockets' -import { NOISE } from '@chainsafe/libp2p-noise' import { PreSharedKeyConnectionProtector } from '../../src/pnet/index.js' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import swarmKey from '../fixtures/swarm.key.js' @@ -30,6 +29,7 @@ import { pEvent } from 'p-event' import { TimeoutController } from 'timeout-abort-controller' import delay from 'delay' import drain from 'it-drain' +import { Uint8ArrayList } from 'uint8arraylist' const addrs = [ new Multiaddr('/ip4/127.0.0.1/tcp/0'), @@ -409,7 +409,7 @@ describe('Upgrader', () => { source: (async function * () { // longer than the timeout await delay(1000) - yield new Uint8Array() + yield new Uint8ArrayList() }()), sink: drain }) @@ -479,7 +479,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ], connectionProtector: new PreSharedKeyConnectionProtector({ psk: uint8ArrayFromString(swarmKey) @@ -501,7 +501,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await libp2p.start() @@ -517,7 +517,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await remoteLibp2p.start() @@ -548,7 +548,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await libp2p.start() @@ -562,7 +562,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await remoteLibp2p.start() @@ -607,7 +607,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await libp2p.start() @@ -621,7 +621,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await remoteLibp2p.start() @@ -669,7 +669,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await libp2p.start() @@ -683,7 +683,7 @@ describe('libp2p.upgrader', () => { new Mplex() ], connectionEncryption: [ - NOISE + new Plaintext() ] }) await remoteLibp2p.start()