deps!: update all deps to support no-copy operations (#1335)

Updates all deps needed to support passing lists of byte arrays where they have been created from multiple input buffers.

When reading multiplexed data, all messages arrive in length-prefixed buffers, which means the first few bytes tell the consumer how many bytes long next chunk will be.

One length prefixed chunk can be delivered in several payloads from the underlying network transport. The first payload can also include the length prefix and some or all of the data, so we stitch these together in a `Uint8ArrayList` to avoid having to concatenate `Uint8Array`s together.

Previously once we'd received enough bytes to satisfy the length prefix we'd concatenate the bytes together, but this is a potentially expensive operation where transports have small message sizes so instead just pass the `Uint8ArrayList` to the consumer and let them decide wether to concatenate or not as some consumers will be smart enough to operate on lists of `Uint8Array`s instead of always requiring a contiguous block of memory.

BREAKING CHANGE: Streams are now `Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>`
This commit is contained in:
Alex Potsides 2022-08-11 13:21:04 +01:00 committed by GitHub
parent 564f4b8aa7
commit f439d9b589
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
40 changed files with 626 additions and 200 deletions

View File

@ -1,6 +1,6 @@
import { WebSockets } from '@libp2p/websockets' import { WebSockets } from '@libp2p/websockets'
import { Mplex } from '@libp2p/mplex' import { Mplex } from '@libp2p/mplex'
import { NOISE } from '@chainsafe/libp2p-noise' import { Noise } from '@chainsafe/libp2p-noise'
import { pipe } from 'it-pipe' import { pipe } from 'it-pipe'
import { createFromJSON } from '@libp2p/peer-id-factory' import { createFromJSON } from '@libp2p/peer-id-factory'
@ -31,7 +31,7 @@ export default {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE, new Noise(),
new Plaintext() new Plaintext()
], ],
relay: { relay: {

View File

@ -34,7 +34,7 @@ const createNode = async () => {
stream, stream,
async function (source) { async function (source) {
for await (const msg of source) { for await (const msg of source) {
console.log(uint8ArrayToString(msg)) console.log(uint8ArrayToString(msg.subarray()))
} }
} }
) )

View File

@ -3,13 +3,13 @@
"version": "0.1.0", "version": "0.1.0",
"private": true, "private": true,
"dependencies": { "dependencies": {
"@chainsafe/libp2p-noise": "^7.0.1", "@chainsafe/libp2p-noise": "^8.0.0",
"ipfs-core": "^0.14.1", "ipfs-core": "^0.15.4",
"libp2p": "../../", "libp2p": "../../",
"@libp2p/delegated-content-routing": "^2.0.1", "@libp2p/delegated-content-routing": "^2.0.1",
"@libp2p/delegated-peer-routing": "^2.0.1", "@libp2p/delegated-peer-routing": "^2.0.1",
"@libp2p/kad-dht": "^3.0.0", "@libp2p/kad-dht": "^3.0.0",
"@libp2p/mplex": "^4.0.2", "@libp2p/mplex": "^5.0.0",
"@libp2p/webrtc-star": "^3.0.0", "@libp2p/webrtc-star": "^3.0.0",
"@libp2p/websockets": "^3.0.0", "@libp2p/websockets": "^3.0.0",
"react": "^17.0.2", "react": "^17.0.2",

View File

@ -51,7 +51,7 @@ async function run() {
// For each chunk of data // For each chunk of data
for await (const data of source) { for await (const data of source) {
// Output the data // Output the data
console.log('received echo:', uint8ArrayToString(data)) console.log('received echo:', uint8ArrayToString(data.subarray()))
} }
} }
) )

View File

@ -9,9 +9,9 @@
}, },
"license": "ISC", "license": "ISC",
"dependencies": { "dependencies": {
"@chainsafe/libp2p-noise": "^7.0.1", "@chainsafe/libp2p-noise": "^8.0.0",
"@libp2p/bootstrap": "^2.0.0", "@libp2p/bootstrap": "^2.0.0",
"@libp2p/mplex": "^4.0.2", "@libp2p/mplex": "^5.0.0",
"@libp2p/webrtc-star": "^3.0.0", "@libp2p/webrtc-star": "^3.0.0",
"@libp2p/websockets": "^3.0.0", "@libp2p/websockets": "^3.0.0",
"libp2p": "../../" "libp2p": "../../"

View File

@ -37,7 +37,7 @@ generateKey(otherSwarmKey)
stream, stream,
async function (source) { async function (source) {
for await (const msg of source) { for await (const msg of source) {
console.log(uint8ArrayToString(msg)) console.log(uint8ArrayToString(msg.subarray()))
} }
} }
) )

View File

@ -36,7 +36,7 @@ const createNode = async () => {
stream, stream,
async function (source) { async function (source) {
for await (const msg of source) { for await (const msg of source) {
console.log(uint8ArrayToString(msg)) console.log(uint8ArrayToString(msg.subarray()))
} }
} }
) )

View File

@ -35,7 +35,7 @@ const createNode = async () => {
stream, stream,
async function (source) { async function (source) {
for await (const msg of source) { for await (const msg of source) {
console.log(`from: ${protocol}, msg: ${uint8ArrayToString(msg)}`) console.log(`from: ${protocol}, msg: ${uint8ArrayToString(msg.subarray())}`)
} }
} }
).finally(() => { ).finally(() => {

View File

@ -37,7 +37,7 @@ const createNode = async () => {
stream, stream,
async function (source) { async function (source) {
for await (const msg of source) { for await (const msg of source) {
console.log(uint8ArrayToString(msg)) console.log(uint8ArrayToString(msg.subarray()))
} }
} }
) )
@ -48,7 +48,7 @@ const createNode = async () => {
stream, stream,
async function (source) { async function (source) {
for await (const msg of source) { for await (const msg of source) {
console.log(uint8ArrayToString(msg)) console.log(uint8ArrayToString(msg.subarray()))
} }
} }
) )

View File

@ -42,6 +42,11 @@ function printAddrs (node, number) {
node2.handle('/print', async ({ stream }) => { node2.handle('/print', async ({ stream }) => {
const result = await pipe( const result = await pipe(
stream, stream,
async function * (source) {
for await (const list of source) {
yield list.subarray()
}
},
toBuffer toBuffer
) )
console.log(uint8ArrayToString(result)) console.log(uint8ArrayToString(result))

View File

@ -37,7 +37,7 @@ function print ({ stream }) {
stream, stream,
async function (source) { async function (source) {
for await (const msg of source) { for await (const msg of source) {
console.log(uint8ArrayToString(msg)) console.log(uint8ArrayToString(msg.subarray()))
} }
} }
) )

View File

@ -57,7 +57,7 @@ function print ({ stream }) {
stream, stream,
async function (source) { async function (source) {
for await (const msg of source) { for await (const msg of source) {
console.log(uint8ArrayToString(msg)) console.log(uint8ArrayToString(msg.subarray()))
} }
} }
) )

View File

@ -10,9 +10,9 @@
"license": "ISC", "license": "ISC",
"dependencies": { "dependencies": {
"@libp2p/webrtc-direct": "^2.0.0", "@libp2p/webrtc-direct": "^2.0.0",
"@chainsafe/libp2p-noise": "^7.0.3", "@chainsafe/libp2p-noise": "^8.0.0",
"@libp2p/bootstrap": "^2.0.0", "@libp2p/bootstrap": "^2.0.0",
"@libp2p/mplex": "^4.0.3", "@libp2p/mplex": "^5.0.0",
"libp2p": "../../", "libp2p": "../../",
"wrtc": "^0.4.7" "wrtc": "^0.4.7"
}, },

View File

@ -98,36 +98,36 @@
}, },
"dependencies": { "dependencies": {
"@achingbrain/nat-port-mapper": "^1.0.3", "@achingbrain/nat-port-mapper": "^1.0.3",
"@libp2p/components": "^2.0.1", "@libp2p/components": "^2.0.3",
"@libp2p/connection": "^4.0.0", "@libp2p/connection": "^4.0.1",
"@libp2p/crypto": "^1.0.0", "@libp2p/crypto": "^1.0.3",
"@libp2p/interface-address-manager": "^1.0.1", "@libp2p/interface-address-manager": "^1.0.2",
"@libp2p/interface-connection": "^2.0.0", "@libp2p/interface-connection": "^3.0.1",
"@libp2p/interface-connection-encrypter": "^1.0.2", "@libp2p/interface-connection-encrypter": "^2.0.1",
"@libp2p/interface-content-routing": "^1.0.1", "@libp2p/interface-content-routing": "^1.0.2",
"@libp2p/interface-dht": "^1.0.0", "@libp2p/interface-dht": "^1.0.1",
"@libp2p/interface-metrics": "^2.0.0", "@libp2p/interface-metrics": "^3.0.0",
"@libp2p/interface-peer-discovery": "^1.0.0", "@libp2p/interface-peer-discovery": "^1.0.1",
"@libp2p/interface-peer-id": "^1.0.2", "@libp2p/interface-peer-id": "^1.0.4",
"@libp2p/interface-peer-info": "^1.0.1", "@libp2p/interface-peer-info": "^1.0.2",
"@libp2p/interface-peer-routing": "^1.0.0", "@libp2p/interface-peer-routing": "^1.0.1",
"@libp2p/interface-peer-store": "^1.2.0", "@libp2p/interface-peer-store": "^1.2.1",
"@libp2p/interface-pubsub": "^2.0.0", "@libp2p/interface-pubsub": "^2.0.1",
"@libp2p/interface-registrar": "^2.0.0", "@libp2p/interface-registrar": "^2.0.3",
"@libp2p/interface-stream-muxer": "^2.0.1", "@libp2p/interface-stream-muxer": "^2.0.2",
"@libp2p/interface-transport": "^1.0.0", "@libp2p/interface-transport": "^1.0.3",
"@libp2p/interfaces": "^3.0.2", "@libp2p/interfaces": "^3.0.3",
"@libp2p/logger": "^2.0.0", "@libp2p/logger": "^2.0.0",
"@libp2p/multistream-select": "^2.0.1", "@libp2p/multistream-select": "^3.0.0",
"@libp2p/peer-collections": "^2.0.0", "@libp2p/peer-collections": "^2.0.0",
"@libp2p/peer-id": "^1.1.10", "@libp2p/peer-id": "^1.1.15",
"@libp2p/peer-id-factory": "^1.0.9", "@libp2p/peer-id-factory": "^1.0.18",
"@libp2p/peer-record": "^4.0.0", "@libp2p/peer-record": "^4.0.1",
"@libp2p/peer-store": "^3.0.0", "@libp2p/peer-store": "^3.1.2",
"@libp2p/tracked-map": "^2.0.1", "@libp2p/tracked-map": "^2.0.1",
"@libp2p/utils": "^3.0.0", "@libp2p/utils": "^3.0.1",
"@multiformats/mafmt": "^11.0.2", "@multiformats/mafmt": "^11.0.2",
"@multiformats/multiaddr": "^10.1.8", "@multiformats/multiaddr": "^10.3.3",
"abortable-iterator": "^4.0.2", "abortable-iterator": "^4.0.2",
"any-signal": "^3.0.0", "any-signal": "^3.0.0",
"datastore-core": "^7.0.0", "datastore-core": "^7.0.0",
@ -140,7 +140,7 @@
"it-filter": "^1.0.3", "it-filter": "^1.0.3",
"it-first": "^1.0.6", "it-first": "^1.0.6",
"it-foreach": "^0.1.1", "it-foreach": "^0.1.1",
"it-handshake": "^4.0.0", "it-handshake": "^4.1.2",
"it-length-prefixed": "^8.0.2", "it-length-prefixed": "^8.0.2",
"it-map": "^1.0.6", "it-map": "^1.0.6",
"it-merge": "^1.0.3", "it-merge": "^1.0.3",
@ -156,31 +156,31 @@
"p-retry": "^5.0.0", "p-retry": "^5.0.0",
"p-settle": "^5.0.0", "p-settle": "^5.0.0",
"private-ip": "^2.3.3", "private-ip": "^2.3.3",
"protons-runtime": "^2.0.2", "protons-runtime": "^3.0.1",
"retimer": "^3.0.0", "retimer": "^3.0.0",
"sanitize-filename": "^1.6.3", "sanitize-filename": "^1.6.3",
"set-delayed-interval": "^1.0.0", "set-delayed-interval": "^1.0.0",
"timeout-abort-controller": "^3.0.0", "timeout-abort-controller": "^3.0.0",
"uint8arraylist": "^2.0.0", "uint8arraylist": "^2.3.2",
"uint8arrays": "^3.0.0", "uint8arrays": "^3.0.0",
"wherearewe": "^1.0.0", "wherearewe": "^1.0.0",
"xsalsa20": "^1.1.0" "xsalsa20": "^1.1.0"
}, },
"devDependencies": { "devDependencies": {
"@chainsafe/libp2p-noise": "^7.0.2", "@chainsafe/libp2p-noise": "^8.0.0",
"@libp2p/bootstrap": "^2.0.0", "@libp2p/bootstrap": "^2.0.0",
"@libp2p/daemon-client": "^2.0.0", "@libp2p/daemon-client": "^2.0.4",
"@libp2p/daemon-server": "^2.0.0", "@libp2p/daemon-server": "^2.0.4",
"@libp2p/delegated-content-routing": "^2.0.1", "@libp2p/delegated-content-routing": "^2.0.1",
"@libp2p/delegated-peer-routing": "^2.0.1", "@libp2p/delegated-peer-routing": "^2.0.1",
"@libp2p/floodsub": "^3.0.0", "@libp2p/floodsub": "^3.0.0",
"@libp2p/interface-compliance-tests": "^3.0.1", "@libp2p/interface-compliance-tests": "^3.0.1",
"@libp2p/interface-connection-encrypter-compliance-tests": "^1.0.0", "@libp2p/interface-connection-encrypter-compliance-tests": "^2.0.1",
"@libp2p/interface-mocks": "^3.0.1", "@libp2p/interface-mocks": "^4.0.1",
"@libp2p/interop": "^2.0.0", "@libp2p/interop": "^2.0.0",
"@libp2p/kad-dht": "^3.0.0", "@libp2p/kad-dht": "^3.0.1",
"@libp2p/mdns": "^3.0.0", "@libp2p/mdns": "^3.0.0",
"@libp2p/mplex": "^4.0.2", "@libp2p/mplex": "^5.0.0",
"@libp2p/pubsub": "^3.0.1", "@libp2p/pubsub": "^3.0.1",
"@libp2p/tcp": "^3.0.0", "@libp2p/tcp": "^3.0.0",
"@libp2p/topology": "^3.0.0", "@libp2p/topology": "^3.0.0",
@ -205,7 +205,7 @@
"p-event": "^5.0.1", "p-event": "^5.0.1",
"p-times": "^4.0.0", "p-times": "^4.0.0",
"p-wait-for": "^5.0.0", "p-wait-for": "^5.0.0",
"protons": "^4.0.1", "protons": "^5.0.0",
"rimraf": "^3.0.2", "rimraf": "^3.0.2",
"sinon": "^14.0.0", "sinon": "^14.0.0",
"ts-sinon": "^2.0.2" "ts-sinon": "^2.0.2"

View File

@ -41,7 +41,7 @@ import { Multiaddr } from '@multiformats/multiaddr'
import Libp2p from 'libp2p' import Libp2p from 'libp2p'
import { TCP } from '@libp2p/tcp' import { TCP } from '@libp2p/tcp'
import { Mplex } from '@libp2p/mplex' import { Mplex } from '@libp2p/mplex'
import { NOISE } from '@chainsafe/libp2p-noise' import { Noise } from '@chainsafe/libp2p-noise'
const relayAddr = ... const relayAddr = ...
@ -56,7 +56,7 @@ const node = await createLibp2p({
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Noise()
] ]
}, },
config: { config: {

View File

@ -13,6 +13,7 @@ import type { Duplex } from 'it-stream-types'
import type { Circuit } from '../transport.js' import type { Circuit } from '../transport.js'
import type { ConnectionManager } from '@libp2p/interface-connection-manager' import type { ConnectionManager } from '@libp2p/interface-connection-manager'
import type { AbortOptions } from '@libp2p/interfaces' import type { AbortOptions } from '@libp2p/interfaces'
import type { Uint8ArrayList } from 'uint8arraylist'
const log = logger('libp2p:circuit:hop') const log = logger('libp2p:circuit:hop')
@ -24,7 +25,7 @@ export interface HopRequest {
connectionManager: ConnectionManager connectionManager: ConnectionManager
} }
export async function handleHop (hopRequest: HopRequest) { export async function handleHop (hopRequest: HopRequest): Promise<void> {
const { const {
connection, connection,
request, request,
@ -84,7 +85,7 @@ export async function handleHop (hopRequest: HopRequest) {
srcPeer: request.srcPeer srcPeer: request.srcPeer
} }
let destinationStream: Duplex<Uint8Array> let destinationStream: Duplex<Uint8ArrayList>
try { try {
log('performing STOP request') log('performing STOP request')
const result = await stop({ const result = await stop({
@ -128,7 +129,7 @@ export interface HopConfig extends AbortOptions {
* Performs a HOP request to a relay peer, to request a connection to another * Performs a HOP request to a relay peer, to request a connection to another
* peer. A new, virtual, connection will be created between the two via the relay. * peer. A new, virtual, connection will be created between the two via the relay.
*/ */
export async function hop (options: HopConfig): Promise<Duplex<Uint8Array>> { export async function hop (options: HopConfig): Promise<Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array>> {
const { const {
connection, connection,
request, request,
@ -151,6 +152,7 @@ export async function hop (options: HopConfig): Promise<Duplex<Uint8Array>> {
if (response.code === CircuitPB.Status.SUCCESS) { if (response.code === CircuitPB.Status.SUCCESS) {
log('hop request was successful') log('hop request was successful')
return streamHandler.rest() return streamHandler.rest()
} }

View File

@ -6,6 +6,7 @@ import { validateAddrs } from './utils.js'
import type { Connection } from '@libp2p/interface-connection' import type { Connection } from '@libp2p/interface-connection'
import type { Duplex } from 'it-stream-types' import type { Duplex } from 'it-stream-types'
import type { AbortOptions } from '@libp2p/interfaces' import type { AbortOptions } from '@libp2p/interfaces'
import type { Uint8ArrayList } from 'uint8arraylist'
const log = logger('libp2p:circuit:stop') const log = logger('libp2p:circuit:stop')
@ -18,7 +19,7 @@ export interface HandleStopOptions {
/** /**
* Handles incoming STOP requests * Handles incoming STOP requests
*/ */
export function handleStop (options: HandleStopOptions): Duplex<Uint8Array> | undefined { export function handleStop (options: HandleStopOptions): Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array> | undefined {
const { const {
connection, connection,
request, request,

View File

@ -22,7 +22,7 @@ export interface StreamHandlerOptions {
export class StreamHandler { export class StreamHandler {
private readonly stream: Stream private readonly stream: Stream
private readonly shake: Handshake private readonly shake: Handshake<Uint8ArrayList | Uint8Array>
private readonly decoder: Source<Uint8ArrayList> private readonly decoder: Source<Uint8ArrayList>
constructor (options: StreamHandlerOptions) { constructor (options: StreamHandlerOptions) {
@ -56,7 +56,7 @@ export class StreamHandler {
*/ */
write (msg: CircuitRelay) { write (msg: CircuitRelay) {
log('write message type %s', msg.type) log('write message type %s', msg.type)
this.shake.write(lp.encode.single(CircuitRelay.encode(msg)).slice()) this.shake.write(lp.encode.single(CircuitRelay.encode(msg)))
} }
/** /**

View File

@ -1,9 +1,9 @@
/* eslint-disable import/export */ /* eslint-disable import/export */
/* eslint-disable @typescript-eslint/no-namespace */ /* eslint-disable @typescript-eslint/no-namespace */
import { enumeration, encodeMessage, decodeMessage, message, bytes } from 'protons-runtime' import { enumeration, encodeMessage, decodeMessage, message } from 'protons-runtime'
import type { Codec } from 'protons-runtime'
import type { Uint8ArrayList } from 'uint8arraylist' import type { Uint8ArrayList } from 'uint8arraylist'
import type { Codec } from 'protons-runtime'
export interface CircuitRelay { export interface CircuitRelay {
type?: CircuitRelay.Type type?: CircuitRelay.Type
@ -53,7 +53,7 @@ export namespace CircuitRelay {
export namespace Status { export namespace Status {
export const codec = () => { export const codec = () => {
return enumeration<typeof Status>(__StatusValues) return enumeration<Status>(__StatusValues)
} }
} }
@ -73,7 +73,7 @@ export namespace CircuitRelay {
export namespace Type { export namespace Type {
export const codec = () => { export const codec = () => {
return enumeration<typeof Type>(__TypeValues) return enumeration<Type>(__TypeValues)
} }
} }
@ -83,14 +83,74 @@ export namespace CircuitRelay {
} }
export namespace Peer { export namespace Peer {
let _codec: Codec<Peer>
export const codec = (): Codec<Peer> => { export const codec = (): Codec<Peer> => {
return message<Peer>({ if (_codec == null) {
1: { name: 'id', codec: bytes }, _codec = message<Peer>((obj, writer, opts = {}) => {
2: { name: 'addrs', codec: bytes, repeats: true } if (opts.lengthDelimited !== false) {
}) writer.fork()
}
if (obj.id != null) {
writer.uint32(10)
writer.bytes(obj.id)
} else {
throw new Error('Protocol error: required field "id" was not found in object')
}
if (obj.addrs != null) {
for (const value of obj.addrs) {
writer.uint32(18)
writer.bytes(value)
}
} else {
throw new Error('Protocol error: required field "addrs" was not found in object')
}
if (opts.lengthDelimited !== false) {
writer.ldelim()
}
}, (reader, length) => {
const obj: any = {}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1:
obj.id = reader.bytes()
break
case 2:
obj.addrs = obj.addrs ?? []
obj.addrs.push(reader.bytes())
break
default:
reader.skipType(tag & 7)
break
}
}
obj.addrs = obj.addrs ?? []
if (obj.id == null) {
throw new Error('Protocol error: value for required field "id" was not found in protobuf')
}
if (obj.addrs == null) {
throw new Error('Protocol error: value for required field "addrs" was not found in protobuf')
}
return obj
})
}
return _codec
} }
export const encode = (obj: Peer): Uint8ArrayList => { export const encode = (obj: Peer): Uint8Array => {
return encodeMessage(obj, Peer.codec()) return encodeMessage(obj, Peer.codec())
} }
@ -99,16 +159,73 @@ export namespace CircuitRelay {
} }
} }
let _codec: Codec<CircuitRelay>
export const codec = (): Codec<CircuitRelay> => { export const codec = (): Codec<CircuitRelay> => {
return message<CircuitRelay>({ if (_codec == null) {
1: { name: 'type', codec: CircuitRelay.Type.codec(), optional: true }, _codec = message<CircuitRelay>((obj, writer, opts = {}) => {
2: { name: 'srcPeer', codec: CircuitRelay.Peer.codec(), optional: true }, if (opts.lengthDelimited !== false) {
3: { name: 'dstPeer', codec: CircuitRelay.Peer.codec(), optional: true }, writer.fork()
4: { name: 'code', codec: CircuitRelay.Status.codec(), optional: true } }
})
if (obj.type != null) {
writer.uint32(8)
CircuitRelay.Type.codec().encode(obj.type, writer)
}
if (obj.srcPeer != null) {
writer.uint32(18)
CircuitRelay.Peer.codec().encode(obj.srcPeer, writer)
}
if (obj.dstPeer != null) {
writer.uint32(26)
CircuitRelay.Peer.codec().encode(obj.dstPeer, writer)
}
if (obj.code != null) {
writer.uint32(32)
CircuitRelay.Status.codec().encode(obj.code, writer)
}
if (opts.lengthDelimited !== false) {
writer.ldelim()
}
}, (reader, length) => {
const obj: any = {}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1:
obj.type = CircuitRelay.Type.codec().decode(reader)
break
case 2:
obj.srcPeer = CircuitRelay.Peer.codec().decode(reader, reader.uint32())
break
case 3:
obj.dstPeer = CircuitRelay.Peer.codec().decode(reader, reader.uint32())
break
case 4:
obj.code = CircuitRelay.Status.codec().decode(reader)
break
default:
reader.skipType(tag & 7)
break
}
}
return obj
})
}
return _codec
} }
export const encode = (obj: CircuitRelay): Uint8ArrayList => { export const encode = (obj: CircuitRelay): Uint8Array => {
return encodeMessage(obj, CircuitRelay.codec()) return encodeMessage(obj, CircuitRelay.codec())
} }

View File

@ -21,6 +21,8 @@ import type { RelayConfig } from '../index.js'
import { abortableDuplex } from 'abortable-iterator' import { abortableDuplex } from 'abortable-iterator'
import { TimeoutController } from 'timeout-abort-controller' import { TimeoutController } from 'timeout-abort-controller'
import { setMaxListeners } from 'events' import { setMaxListeners } from 'events'
import type { Uint8ArrayList } from 'uint8arraylist'
import type { Duplex } from 'it-stream-types'
const log = logger('libp2p:circuit') const log = logger('libp2p:circuit')
@ -90,7 +92,7 @@ export class Circuit implements Transport, Initializable {
return return
} }
let virtualConnection let virtualConnection: Duplex<Uint8ArrayList, Uint8ArrayList | Uint8Array> | undefined
switch (request.type) { switch (request.type) {
case CircuitPB.Type.CAN_HOP: { case CircuitPB.Type.CAN_HOP: {
@ -100,7 +102,7 @@ export class Circuit implements Transport, Initializable {
} }
case CircuitPB.Type.HOP: { case CircuitPB.Type.HOP: {
log('received HOP request from %p', connection.remotePeer) log('received HOP request from %p', connection.remotePeer)
virtualConnection = await handleHop({ await handleHop({
connection, connection,
request, request,
streamHandler, streamHandler,

View File

@ -1,22 +1,64 @@
/* eslint-disable import/export */ /* eslint-disable import/export */
/* eslint-disable @typescript-eslint/no-namespace */ /* eslint-disable @typescript-eslint/no-namespace */
import { encodeMessage, decodeMessage, message, string, enumeration, bytes } from 'protons-runtime' import { encodeMessage, decodeMessage, message, enumeration } from 'protons-runtime'
import type { Codec } from 'protons-runtime'
import type { Uint8ArrayList } from 'uint8arraylist' import type { Uint8ArrayList } from 'uint8arraylist'
import type { Codec } from 'protons-runtime'
export interface FetchRequest { export interface FetchRequest {
identifier: string identifier: string
} }
export namespace FetchRequest { export namespace FetchRequest {
let _codec: Codec<FetchRequest>
export const codec = (): Codec<FetchRequest> => { export const codec = (): Codec<FetchRequest> => {
return message<FetchRequest>({ if (_codec == null) {
1: { name: 'identifier', codec: string } _codec = message<FetchRequest>((obj, writer, opts = {}) => {
}) if (opts.lengthDelimited !== false) {
writer.fork()
}
if (obj.identifier != null) {
writer.uint32(10)
writer.string(obj.identifier)
} else {
throw new Error('Protocol error: required field "identifier" was not found in object')
}
if (opts.lengthDelimited !== false) {
writer.ldelim()
}
}, (reader, length) => {
const obj: any = {}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1:
obj.identifier = reader.string()
break
default:
reader.skipType(tag & 7)
break
}
}
if (obj.identifier == null) {
throw new Error('Protocol error: value for required field "identifier" was not found in protobuf')
}
return obj
})
}
return _codec
} }
export const encode = (obj: FetchRequest): Uint8ArrayList => { export const encode = (obj: FetchRequest): Uint8Array => {
return encodeMessage(obj, FetchRequest.codec()) return encodeMessage(obj, FetchRequest.codec())
} }
@ -45,18 +87,73 @@ export namespace FetchResponse {
export namespace StatusCode { export namespace StatusCode {
export const codec = () => { export const codec = () => {
return enumeration<typeof StatusCode>(__StatusCodeValues) return enumeration<StatusCode>(__StatusCodeValues)
} }
} }
let _codec: Codec<FetchResponse>
export const codec = (): Codec<FetchResponse> => { export const codec = (): Codec<FetchResponse> => {
return message<FetchResponse>({ if (_codec == null) {
1: { name: 'status', codec: FetchResponse.StatusCode.codec() }, _codec = message<FetchResponse>((obj, writer, opts = {}) => {
2: { name: 'data', codec: bytes } if (opts.lengthDelimited !== false) {
}) writer.fork()
}
if (obj.status != null) {
writer.uint32(8)
FetchResponse.StatusCode.codec().encode(obj.status, writer)
} else {
throw new Error('Protocol error: required field "status" was not found in object')
}
if (obj.data != null) {
writer.uint32(18)
writer.bytes(obj.data)
} else {
throw new Error('Protocol error: required field "data" was not found in object')
}
if (opts.lengthDelimited !== false) {
writer.ldelim()
}
}, (reader, length) => {
const obj: any = {}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1:
obj.status = FetchResponse.StatusCode.codec().decode(reader)
break
case 2:
obj.data = reader.bytes()
break
default:
reader.skipType(tag & 7)
break
}
}
if (obj.status == null) {
throw new Error('Protocol error: value for required field "status" was not found in protobuf')
}
if (obj.data == null) {
throw new Error('Protocol error: value for required field "data" was not found in protobuf')
}
return obj
})
}
return _codec
} }
export const encode = (obj: FetchResponse): Uint8ArrayList => { export const encode = (obj: FetchResponse): Uint8Array => {
return encodeMessage(obj, FetchResponse.codec()) return encodeMessage(obj, FetchResponse.codec())
} }

View File

@ -26,7 +26,6 @@ import type { Components } from '@libp2p/components'
import { TimeoutController } from 'timeout-abort-controller' import { TimeoutController } from 'timeout-abort-controller'
import type { AbortOptions } from '@libp2p/interfaces' import type { AbortOptions } from '@libp2p/interfaces'
import { abortableDuplex } from 'abortable-iterator' import { abortableDuplex } from 'abortable-iterator'
import type { Duplex } from 'it-stream-types'
import { setMaxListeners } from 'events' import { setMaxListeners } from 'events'
const log = logger('libp2p:identify') const log = logger('libp2p:identify')
@ -179,7 +178,7 @@ export class IdentifyService implements Startable {
}) })
// make stream abortable // make stream abortable
const source: Duplex<Uint8Array> = abortableDuplex(stream, timeoutController.signal) const source = abortableDuplex(stream, timeoutController.signal)
await pipe( await pipe(
[Identify.encode({ [Identify.encode({
@ -418,7 +417,7 @@ export class IdentifyService implements Startable {
}) })
// make stream abortable // make stream abortable
const source: Duplex<Uint8Array> = abortableDuplex(stream, timeoutController.signal) const source = abortableDuplex(stream, timeoutController.signal)
await pipe( await pipe(
[message], [message],
@ -449,7 +448,7 @@ export class IdentifyService implements Startable {
let message: Identify | undefined let message: Identify | undefined
try { try {
// make stream abortable // make stream abortable
const source: Duplex<Uint8Array> = abortableDuplex(stream, timeoutController.signal) const source = abortableDuplex(stream, timeoutController.signal)
const data = await pipe( const data = await pipe(
[], [],

View File

@ -1,9 +1,9 @@
/* eslint-disable import/export */ /* eslint-disable import/export */
/* eslint-disable @typescript-eslint/no-namespace */ /* eslint-disable @typescript-eslint/no-namespace */
import { encodeMessage, decodeMessage, message, string, bytes } from 'protons-runtime' import { encodeMessage, decodeMessage, message } from 'protons-runtime'
import type { Codec } from 'protons-runtime'
import type { Uint8ArrayList } from 'uint8arraylist' import type { Uint8ArrayList } from 'uint8arraylist'
import type { Codec } from 'protons-runtime'
export interface Identify { export interface Identify {
protocolVersion?: string protocolVersion?: string
@ -16,19 +16,118 @@ export interface Identify {
} }
export namespace Identify { export namespace Identify {
let _codec: Codec<Identify>
export const codec = (): Codec<Identify> => { export const codec = (): Codec<Identify> => {
return message<Identify>({ if (_codec == null) {
5: { name: 'protocolVersion', codec: string, optional: true }, _codec = message<Identify>((obj, writer, opts = {}) => {
6: { name: 'agentVersion', codec: string, optional: true }, if (opts.lengthDelimited !== false) {
1: { name: 'publicKey', codec: bytes, optional: true }, writer.fork()
2: { name: 'listenAddrs', codec: bytes, repeats: true }, }
4: { name: 'observedAddr', codec: bytes, optional: true },
3: { name: 'protocols', codec: string, repeats: true }, if (obj.protocolVersion != null) {
8: { name: 'signedPeerRecord', codec: bytes, optional: true } writer.uint32(42)
}) writer.string(obj.protocolVersion)
}
if (obj.agentVersion != null) {
writer.uint32(50)
writer.string(obj.agentVersion)
}
if (obj.publicKey != null) {
writer.uint32(10)
writer.bytes(obj.publicKey)
}
if (obj.listenAddrs != null) {
for (const value of obj.listenAddrs) {
writer.uint32(18)
writer.bytes(value)
}
} else {
throw new Error('Protocol error: required field "listenAddrs" was not found in object')
}
if (obj.observedAddr != null) {
writer.uint32(34)
writer.bytes(obj.observedAddr)
}
if (obj.protocols != null) {
for (const value of obj.protocols) {
writer.uint32(26)
writer.string(value)
}
} else {
throw new Error('Protocol error: required field "protocols" was not found in object')
}
if (obj.signedPeerRecord != null) {
writer.uint32(66)
writer.bytes(obj.signedPeerRecord)
}
if (opts.lengthDelimited !== false) {
writer.ldelim()
}
}, (reader, length) => {
const obj: any = {}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 5:
obj.protocolVersion = reader.string()
break
case 6:
obj.agentVersion = reader.string()
break
case 1:
obj.publicKey = reader.bytes()
break
case 2:
obj.listenAddrs = obj.listenAddrs ?? []
obj.listenAddrs.push(reader.bytes())
break
case 4:
obj.observedAddr = reader.bytes()
break
case 3:
obj.protocols = obj.protocols ?? []
obj.protocols.push(reader.string())
break
case 8:
obj.signedPeerRecord = reader.bytes()
break
default:
reader.skipType(tag & 7)
break
}
}
obj.listenAddrs = obj.listenAddrs ?? []
obj.protocols = obj.protocols ?? []
if (obj.listenAddrs == null) {
throw new Error('Protocol error: value for required field "listenAddrs" was not found in protobuf')
}
if (obj.protocols == null) {
throw new Error('Protocol error: value for required field "protocols" was not found in protobuf')
}
return obj
})
}
return _codec
} }
export const encode = (obj: Identify): Uint8ArrayList => { export const encode = (obj: Identify): Uint8Array => {
return encodeMessage(obj, Identify.codec()) return encodeMessage(obj, Identify.codec())
} }

View File

@ -7,6 +7,7 @@ import type { PeerId } from '@libp2p/interface-peer-id'
import { peerIdFromBytes, peerIdFromKeys } from '@libp2p/peer-id' import { peerIdFromBytes, peerIdFromKeys } from '@libp2p/peer-id'
import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface-connection-encrypter' import type { ConnectionEncrypter, SecuredConnection } from '@libp2p/interface-connection-encrypter'
import type { Duplex } from 'it-stream-types' import type { Duplex } from 'it-stream-types'
import map from 'it-map'
const log = logger('libp2p:plaintext') const log = logger('libp2p:plaintext')
const PROTOCOL = '/plaintext/2.0.0' const PROTOCOL = '/plaintext/2.0.0'
@ -47,7 +48,7 @@ async function encrypt (localId: PeerId, conn: Duplex<Uint8Array>, remoteId?: Pe
// Get the Exchange message // Get the Exchange message
// @ts-expect-error needs to be generator // @ts-expect-error needs to be generator
const response = (await lp.decode.fromReader(shake.reader).next()).value const response = (await lp.decode.fromReader(shake.reader).next()).value
const id = Exchange.decode(response.slice()) const id = Exchange.decode(response)
log('read pubkey exchange from peer %p', remoteId) log('read pubkey exchange from peer %p', remoteId)
let peerId let peerId
@ -81,8 +82,12 @@ async function encrypt (localId: PeerId, conn: Duplex<Uint8Array>, remoteId?: Pe
log('plaintext key exchange completed successfully with peer %p', peerId) log('plaintext key exchange completed successfully with peer %p', peerId)
shake.rest() shake.rest()
return { return {
conn: shake.stream, conn: {
sink: shake.stream.sink,
source: map(shake.stream.source, (buf) => buf.subarray())
},
remotePeer: peerId, remotePeer: peerId,
remoteEarlyData: new Uint8Array() remoteEarlyData: new Uint8Array()
} }

View File

@ -1,9 +1,9 @@
/* eslint-disable import/export */ /* eslint-disable import/export */
/* eslint-disable @typescript-eslint/no-namespace */ /* eslint-disable @typescript-eslint/no-namespace */
import { encodeMessage, decodeMessage, message, bytes, enumeration } from 'protons-runtime' import { encodeMessage, decodeMessage, message, enumeration } from 'protons-runtime'
import type { Codec } from 'protons-runtime'
import type { Uint8ArrayList } from 'uint8arraylist' import type { Uint8ArrayList } from 'uint8arraylist'
import type { Codec } from 'protons-runtime'
export interface Exchange { export interface Exchange {
id?: Uint8Array id?: Uint8Array
@ -11,14 +11,57 @@ export interface Exchange {
} }
export namespace Exchange { export namespace Exchange {
let _codec: Codec<Exchange>
export const codec = (): Codec<Exchange> => { export const codec = (): Codec<Exchange> => {
return message<Exchange>({ if (_codec == null) {
1: { name: 'id', codec: bytes, optional: true }, _codec = message<Exchange>((obj, writer, opts = {}) => {
2: { name: 'pubkey', codec: PublicKey.codec(), optional: true } if (opts.lengthDelimited !== false) {
}) writer.fork()
}
if (obj.id != null) {
writer.uint32(10)
writer.bytes(obj.id)
}
if (obj.pubkey != null) {
writer.uint32(18)
PublicKey.codec().encode(obj.pubkey, writer)
}
if (opts.lengthDelimited !== false) {
writer.ldelim()
}
}, (reader, length) => {
const obj: any = {}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1:
obj.id = reader.bytes()
break
case 2:
obj.pubkey = PublicKey.codec().decode(reader, reader.uint32())
break
default:
reader.skipType(tag & 7)
break
}
}
return obj
})
}
return _codec
} }
export const encode = (obj: Exchange): Uint8ArrayList => { export const encode = (obj: Exchange): Uint8Array => {
return encodeMessage(obj, Exchange.codec()) return encodeMessage(obj, Exchange.codec())
} }
@ -43,7 +86,7 @@ enum __KeyTypeValues {
export namespace KeyType { export namespace KeyType {
export const codec = () => { export const codec = () => {
return enumeration<typeof KeyType>(__KeyTypeValues) return enumeration<KeyType>(__KeyTypeValues)
} }
} }
export interface PublicKey { export interface PublicKey {
@ -52,14 +95,69 @@ export interface PublicKey {
} }
export namespace PublicKey { export namespace PublicKey {
let _codec: Codec<PublicKey>
export const codec = (): Codec<PublicKey> => { export const codec = (): Codec<PublicKey> => {
return message<PublicKey>({ if (_codec == null) {
1: { name: 'Type', codec: KeyType.codec() }, _codec = message<PublicKey>((obj, writer, opts = {}) => {
2: { name: 'Data', codec: bytes } if (opts.lengthDelimited !== false) {
}) writer.fork()
}
if (obj.Type != null) {
writer.uint32(8)
KeyType.codec().encode(obj.Type, writer)
} else {
throw new Error('Protocol error: required field "Type" was not found in object')
}
if (obj.Data != null) {
writer.uint32(18)
writer.bytes(obj.Data)
} else {
throw new Error('Protocol error: required field "Data" was not found in object')
}
if (opts.lengthDelimited !== false) {
writer.ldelim()
}
}, (reader, length) => {
const obj: any = {}
const end = length == null ? reader.len : reader.pos + length
while (reader.pos < end) {
const tag = reader.uint32()
switch (tag >>> 3) {
case 1:
obj.Type = KeyType.codec().decode(reader)
break
case 2:
obj.Data = reader.bytes()
break
default:
reader.skipType(tag & 7)
break
}
}
if (obj.Type == null) {
throw new Error('Protocol error: value for required field "Type" was not found in protobuf')
}
if (obj.Data == null) {
throw new Error('Protocol error: value for required field "Data" was not found in protobuf')
}
return obj
})
}
return _codec
} }
export const encode = (obj: PublicKey): Uint8ArrayList => { export const encode = (obj: PublicKey): Uint8Array => {
return encodeMessage(obj, PublicKey.codec()) return encodeMessage(obj, PublicKey.codec())
} }

View File

@ -6,7 +6,6 @@ import { DefaultStats, StatsInit } from './stats.js'
import type { ComponentMetricsUpdate, Metrics, Stats, TrackedMetric, TrackStreamOptions } from '@libp2p/interface-metrics' import type { ComponentMetricsUpdate, Metrics, Stats, TrackedMetric, TrackStreamOptions } from '@libp2p/interface-metrics'
import type { PeerId } from '@libp2p/interface-peer-id' import type { PeerId } from '@libp2p/interface-peer-id'
import type { Startable } from '@libp2p/interfaces/startable' import type { Startable } from '@libp2p/interfaces/startable'
import type { Duplex } from 'it-stream-types'
const initialCounters: ['dataReceived', 'dataSent'] = [ const initialCounters: ['dataReceived', 'dataSent'] = [
'dataReceived', 'dataReceived',
@ -263,11 +262,11 @@ export class DefaultMetrics implements Metrics, Startable {
* When the `PeerId` is known, `Metrics.updatePlaceholder` should be called * When the `PeerId` is known, `Metrics.updatePlaceholder` should be called
* with the placeholder string returned from here, and the known `PeerId`. * with the placeholder string returned from here, and the known `PeerId`.
*/ */
trackStream <T extends Duplex<Uint8Array>> (opts: TrackStreamOptions<T>): T { trackStream (opts: TrackStreamOptions): void {
const { stream, remotePeer, protocol } = opts const { stream, remotePeer, protocol } = opts
if (!this.running) { if (!this.running) {
return stream return
} }
const source = stream.source const source = stream.source
@ -275,7 +274,7 @@ export class DefaultMetrics implements Metrics, Startable {
remotePeer, remotePeer,
protocol, protocol,
direction: 'in', direction: 'in',
dataLength: chunk.length dataLength: chunk.byteLength
})) }))
const sink = stream.sink const sink = stream.sink
@ -287,14 +286,12 @@ export class DefaultMetrics implements Metrics, Startable {
remotePeer, remotePeer,
protocol, protocol,
direction: 'out', direction: 'out',
dataLength: chunk.length dataLength: chunk.byteLength
}) })
}), }),
sink sink
) )
} }
return stream
} }
} }

View File

@ -113,7 +113,7 @@ export class PingService implements Startable {
) )
const end = Date.now() const end = Date.now()
if (result == null || !uint8ArrayEquals(data, result)) { if (result == null || !uint8ArrayEquals(data, result.subarray())) {
throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK) throw errCode(new Error('Received wrong ping ack'), codes.ERR_WRONG_PING_ACK)
} }

View File

@ -13,6 +13,7 @@ import {
import { handshake } from 'it-handshake' import { handshake } from 'it-handshake'
import { NONCE_LENGTH } from './key-generator.js' import { NONCE_LENGTH } from './key-generator.js'
import type { ConnectionProtector, MultiaddrConnection } from '@libp2p/interface-connection' import type { ConnectionProtector, MultiaddrConnection } from '@libp2p/interface-connection'
import map from 'it-map'
const log = logger('libp2p:pnet') const log = logger('libp2p:pnet')
@ -83,6 +84,7 @@ export class PreSharedKeyConnectionProtector implements ConnectionProtector {
// Encrypt all outbound traffic // Encrypt all outbound traffic
createBoxStream(localNonce, this.psk), createBoxStream(localNonce, this.psk),
shake.stream, shake.stream,
(source) => map(source, (buf) => buf.subarray()),
// Decrypt all inbound traffic // Decrypt all inbound traffic
createUnboxStream(remoteNonce, this.psk), createUnboxStream(remoteNonce, this.psk),
external external

View File

@ -1,6 +1,6 @@
import { logger } from '@libp2p/logger' import { logger } from '@libp2p/logger'
import errCode from 'err-code' import errCode from 'err-code'
import { Dialer, Listener } from '@libp2p/multistream-select' import * as mss from '@libp2p/multistream-select'
import { pipe } from 'it-pipe' import { pipe } from 'it-pipe'
// @ts-expect-error mutable-proxy does not export types // @ts-expect-error mutable-proxy does not export types
import mutableProxy from 'mutable-proxy' import mutableProxy from 'mutable-proxy'
@ -152,7 +152,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy()) ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy())
const idString = `${(Math.random() * 1e9).toString(36)}${Date.now()}` const idString = `${(Math.random() * 1e9).toString(36)}${Date.now()}`
setPeer({ toString: () => idString }) setPeer({ toString: () => idString })
maConn = metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) metrics.trackStream({ stream: maConn, remotePeer: proxyPeer })
} }
log('starting the inbound connection upgrade') log('starting the inbound connection upgrade')
@ -253,7 +253,7 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy()) ({ setTarget: setPeer, proxy: proxyPeer } = mutableProxy())
const idString = `${(Math.random() * 1e9).toString(36)}${Date.now()}` const idString = `${(Math.random() * 1e9).toString(36)}${Date.now()}`
setPeer({ toB58String: () => idString }) setPeer({ toB58String: () => idString })
maConn = metrics.trackStream({ stream: maConn, remotePeer: proxyPeer }) metrics.trackStream({ stream: maConn, remotePeer: proxyPeer })
} }
log('Starting the outbound connection upgrade') log('Starting the outbound connection upgrade')
@ -351,9 +351,8 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
void Promise.resolve() void Promise.resolve()
.then(async () => { .then(async () => {
const mss = new Listener(muxedStream)
const protocols = this.components.getRegistrar().getProtocols() const protocols = this.components.getRegistrar().getProtocols()
const { stream, protocol } = await mss.handle(protocols) const { stream, protocol } = await mss.handle(muxedStream, protocols)
log('%s: incoming stream opened on %s', direction, protocol) log('%s: incoming stream opened on %s', direction, protocol)
const metrics = this.components.getMetrics() const metrics = this.components.getMetrics()
@ -405,7 +404,6 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
log('%s: starting new stream on %s', direction, protocols) log('%s: starting new stream on %s', direction, protocols)
const muxedStream = muxer.newStream() const muxedStream = muxer.newStream()
const mss = new Dialer(muxedStream)
const metrics = this.components.getMetrics() const metrics = this.components.getMetrics()
let controller: TimeoutController | undefined let controller: TimeoutController | undefined
@ -422,10 +420,10 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
} catch {} } catch {}
} }
let { stream, protocol } = await mss.select(protocols, options) const { stream, protocol } = await mss.select(muxedStream, protocols, options)
if (metrics != null) { if (metrics != null) {
stream = metrics.trackStream({ stream, remotePeer, protocol }) metrics.trackStream({ stream, remotePeer, protocol })
} }
const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.getRegistrar()) const outgoingLimit = findOutgoingStreamLimit(protocol, this.components.getRegistrar())
@ -545,12 +543,13 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
* Attempts to encrypt the incoming `connection` with the provided `cryptos` * Attempts to encrypt the incoming `connection` with the provided `cryptos`
*/ */
async _encryptInbound (connection: Duplex<Uint8Array>): Promise<CryptoResult> { async _encryptInbound (connection: Duplex<Uint8Array>): Promise<CryptoResult> {
const mss = new Listener(connection)
const protocols = Array.from(this.connectionEncryption.keys()) const protocols = Array.from(this.connectionEncryption.keys())
log('handling inbound crypto protocol selection', protocols) log('handling inbound crypto protocol selection', protocols)
try { try {
const { stream, protocol } = await mss.handle(protocols) const { stream, protocol } = await mss.handle(connection, protocols, {
writeBytes: true
})
const encrypter = this.connectionEncryption.get(protocol) const encrypter = this.connectionEncryption.get(protocol)
if (encrypter == null) { if (encrypter == null) {
@ -573,12 +572,13 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
* The first `ConnectionEncrypter` module to succeed will be used * The first `ConnectionEncrypter` module to succeed will be used
*/ */
async _encryptOutbound (connection: MultiaddrConnection, remotePeerId: PeerId): Promise<CryptoResult> { async _encryptOutbound (connection: MultiaddrConnection, remotePeerId: PeerId): Promise<CryptoResult> {
const mss = new Dialer(connection)
const protocols = Array.from(this.connectionEncryption.keys()) const protocols = Array.from(this.connectionEncryption.keys())
log('selecting outbound crypto protocol', protocols) log('selecting outbound crypto protocol', protocols)
try { try {
const { stream, protocol } = await mss.select(protocols) const { stream, protocol } = await mss.select(connection, protocols, {
writeBytes: true
})
const encrypter = this.connectionEncryption.get(protocol) const encrypter = this.connectionEncryption.get(protocol)
if (encrypter == null) { if (encrypter == null) {
@ -601,11 +601,12 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
* muxer will be used for all future streams on the connection. * muxer will be used for all future streams on the connection.
*/ */
async _multiplexOutbound (connection: MultiaddrConnection, muxers: Map<string, StreamMuxerFactory>): Promise<{ stream: Duplex<Uint8Array>, muxerFactory?: StreamMuxerFactory}> { async _multiplexOutbound (connection: MultiaddrConnection, muxers: Map<string, StreamMuxerFactory>): Promise<{ stream: Duplex<Uint8Array>, muxerFactory?: StreamMuxerFactory}> {
const dialer = new Dialer(connection)
const protocols = Array.from(muxers.keys()) const protocols = Array.from(muxers.keys())
log('outbound selecting muxer %s', protocols) log('outbound selecting muxer %s', protocols)
try { try {
const { stream, protocol } = await dialer.select(protocols) const { stream, protocol } = await mss.select(connection, protocols, {
writeBytes: true
})
log('%s selected as muxer protocol', protocol) log('%s selected as muxer protocol', protocol)
const muxerFactory = muxers.get(protocol) const muxerFactory = muxers.get(protocol)
return { stream, muxerFactory } return { stream, muxerFactory }
@ -620,11 +621,12 @@ export class DefaultUpgrader extends EventEmitter<UpgraderEvents> implements Upg
* selected muxer will be used for all future streams on the connection. * selected muxer will be used for all future streams on the connection.
*/ */
async _multiplexInbound (connection: MultiaddrConnection, muxers: Map<string, StreamMuxerFactory>): Promise<{ stream: Duplex<Uint8Array>, muxerFactory?: StreamMuxerFactory}> { async _multiplexInbound (connection: MultiaddrConnection, muxers: Map<string, StreamMuxerFactory>): Promise<{ stream: Duplex<Uint8Array>, muxerFactory?: StreamMuxerFactory}> {
const listener = new Listener(connection)
const protocols = Array.from(muxers.keys()) const protocols = Array.from(muxers.keys())
log('inbound handling muxers %s', protocols) log('inbound handling muxers %s', protocols)
try { try {
const { stream, protocol } = await listener.handle(protocols) const { stream, protocol } = await mss.handle(connection, protocols, {
writeBytes: true
})
const muxerFactory = muxers.get(protocol) const muxerFactory = muxers.get(protocol)
return { stream, muxerFactory } return { stream, muxerFactory }
} catch (err: any) { } catch (err: any) {

View File

@ -10,7 +10,6 @@ import type { Libp2pInit, Libp2pOptions } from '../../src/index.js'
import type { PeerId } from '@libp2p/interface-peer-id' import type { PeerId } from '@libp2p/interface-peer-id'
import * as cborg from 'cborg' import * as cborg from 'cborg'
import { peerIdFromString } from '@libp2p/peer-id' import { peerIdFromString } from '@libp2p/peer-id'
import { Uint8ArrayList } from 'uint8arraylist'
const relayAddr = MULTIADDRS_WEBSOCKETS[0] const relayAddr = MULTIADDRS_WEBSOCKETS[0]
@ -33,16 +32,16 @@ class MockPubSub extends PubSubBaseProtocol {
return cborg.decode(bytes) return cborg.decode(bytes)
} }
encodeRpc (rpc: PubSubRPC): Uint8ArrayList { encodeRpc (rpc: PubSubRPC): Uint8Array {
return new Uint8ArrayList(cborg.encode(rpc)) return cborg.encode(rpc)
} }
decodeMessage (bytes: Uint8Array): PubSubRPCMessage { decodeMessage (bytes: Uint8Array): PubSubRPCMessage {
return cborg.decode(bytes) return cborg.decode(bytes)
} }
encodeMessage (rpc: PubSubRPCMessage): Uint8ArrayList { encodeMessage (rpc: PubSubRPCMessage): Uint8Array {
return new Uint8ArrayList(cborg.encode(rpc)) return cborg.encode(rpc)
} }
async publishMessage (from: PeerId, message: Message): Promise<PublishResult> { async publishMessage (from: PeerId, message: Message): Promise<PublishResult> {

View File

@ -1,7 +1,7 @@
/* eslint-env mocha */ /* eslint-env mocha */
import { WebSockets } from '@libp2p/websockets' import { WebSockets } from '@libp2p/websockets'
import { NOISE } from '@chainsafe/libp2p-noise' import { Plaintext } from '../../src/insecure/index.js'
import { createPeerId } from '../utils/creators/peer.js' import { createPeerId } from '../utils/creators/peer.js'
import { Multiaddr } from '@multiformats/multiaddr' import { Multiaddr } from '@multiformats/multiaddr'
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
@ -18,7 +18,7 @@ describe('Consume peer record', () => {
new WebSockets() new WebSockets()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
} }
libp2p = await createLibp2pNode(config) libp2p = await createLibp2pNode(config)

View File

@ -2,7 +2,7 @@
import { expect } from 'aegir/chai' import { expect } from 'aegir/chai'
import { WebSockets } from '@libp2p/websockets' import { WebSockets } from '@libp2p/websockets'
import { NOISE } from '@chainsafe/libp2p-noise' import { Plaintext } from '../../src/insecure/index.js'
import { createLibp2p, Libp2pOptions } from '../../src/index.js' import { createLibp2p, Libp2pOptions } from '../../src/index.js'
import { codes as ErrorCodes } from '../../src/errors.js' import { codes as ErrorCodes } from '../../src/errors.js'
import { createPeerId } from '../utils/creators/peer.js' import { createPeerId } from '../utils/creators/peer.js'
@ -46,7 +46,7 @@ describe('Connection encryption configuration', () => {
new WebSockets() new WebSockets()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
} }
await createLibp2p(config) await createLibp2p(config)

View File

@ -2,7 +2,7 @@
import { expect } from 'aegir/chai' import { expect } from 'aegir/chai'
import { WebSockets } from '@libp2p/websockets' import { WebSockets } from '@libp2p/websockets'
import { NOISE } from '@chainsafe/libp2p-noise' import { Plaintext } from '../../src/insecure/index.js'
import { createPeerId } from '../utils/creators/peer.js' import { createPeerId } from '../utils/creators/peer.js'
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
import type { Libp2pOptions } from '../../src/index.js' import type { Libp2pOptions } from '../../src/index.js'
@ -20,7 +20,7 @@ describe('getPublicKey', () => {
new WebSockets() new WebSockets()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
], ],
dht: new KadDHT() dht: new KadDHT()
} }

View File

@ -2,7 +2,7 @@
import { expect } from 'aegir/chai' import { expect } from 'aegir/chai'
import { TCP } from '@libp2p/tcp' import { TCP } from '@libp2p/tcp'
import { NOISE } from '@chainsafe/libp2p-noise' import { Plaintext } from '../../src/insecure/index.js'
import { createPeerId } from '../utils/creators/peer.js' import { createPeerId } from '../utils/creators/peer.js'
import type { PeerId } from '@libp2p/interface-peer-id' import type { PeerId } from '@libp2p/interface-peer-id'
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
@ -31,7 +31,7 @@ describe('Listening', () => {
new TCP() new TCP()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })

View File

@ -4,7 +4,7 @@ import { expect } from 'aegir/chai'
import sinon from 'sinon' import sinon from 'sinon'
import { TCP } from '@libp2p/tcp' import { TCP } from '@libp2p/tcp'
import { Mplex } from '@libp2p/mplex' import { Mplex } from '@libp2p/mplex'
import { NOISE } from '@chainsafe/libp2p-noise' import { Plaintext } from '../../src/insecure/index.js'
import { Multiaddr } from '@multiformats/multiaddr' import { Multiaddr } from '@multiformats/multiaddr'
import delay from 'delay' import delay from 'delay'
@ -245,7 +245,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
await remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => { await remoteLibp2p.handle('/echo/1.0.0', ({ stream }) => {
@ -278,7 +278,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -298,7 +298,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -325,7 +325,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -354,7 +354,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -418,7 +418,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -444,7 +444,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -467,7 +467,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
], ],
connectionProtector: new PreSharedKeyConnectionProtector({ connectionProtector: new PreSharedKeyConnectionProtector({
psk: swarmKeyBuffer psk: swarmKeyBuffer
@ -505,7 +505,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -541,7 +541,7 @@ describe('libp2p.dialer (direct, TCP)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })

View File

@ -7,7 +7,7 @@ import delay from 'delay'
import { WebSockets } from '@libp2p/websockets' import { WebSockets } from '@libp2p/websockets'
import * as filters from '@libp2p/websockets/filters' import * as filters from '@libp2p/websockets/filters'
import { Mplex } from '@libp2p/mplex' import { Mplex } from '@libp2p/mplex'
import { NOISE } from '@chainsafe/libp2p-noise' import { Plaintext } from '../../src/insecure/index.js'
import { Multiaddr } from '@multiformats/multiaddr' import { Multiaddr } from '@multiformats/multiaddr'
import { AbortError } from '@libp2p/interfaces/errors' import { AbortError } from '@libp2p/interfaces/errors'
import { MemoryDatastore } from 'datastore-core/memory' import { MemoryDatastore } from 'datastore-core/memory'
@ -360,7 +360,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -385,7 +385,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
], ],
connectionManager: { connectionManager: {
maxParallelDials: 10, maxParallelDials: 10,
@ -416,7 +416,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -450,7 +450,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -490,7 +490,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -518,7 +518,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -539,7 +539,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -576,7 +576,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -602,7 +602,7 @@ describe('libp2p.dialer (direct, WebSockets)', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })

View File

@ -4,7 +4,7 @@ import { expect } from 'aegir/chai'
import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js' import { createLibp2pNode, Libp2pNode } from '../../src/libp2p.js'
import { TCP } from '@libp2p/tcp' import { TCP } from '@libp2p/tcp'
import { Mplex } from '@libp2p/mplex' import { Mplex } from '@libp2p/mplex'
import { NOISE } from '@chainsafe/libp2p-noise' import { Plaintext } from '../../src/insecure/index.js'
import { createPeerId } from '../utils/creators/peer.js' import { createPeerId } from '../utils/creators/peer.js'
import { codes } from '../../src/errors.js' import { codes } from '../../src/errors.js'
import type { PeerId } from '@libp2p/interface-peer-id' import type { PeerId } from '@libp2p/interface-peer-id'
@ -22,7 +22,7 @@ async function createNode (peerId: PeerId) {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
} }

View File

@ -13,6 +13,7 @@ import type { Libp2pOptions } from '../../src/index.js'
import type { DefaultMetrics } from '../../src/metrics/index.js' import type { DefaultMetrics } from '../../src/metrics/index.js'
import pWaitFor from 'p-wait-for' import pWaitFor from 'p-wait-for'
import drain from 'it-drain' import drain from 'it-drain'
import map from 'it-map'
describe('libp2p.metrics', () => { describe('libp2p.metrics', () => {
let libp2p: Libp2pNode let libp2p: Libp2pNode
@ -97,7 +98,7 @@ describe('libp2p.metrics', () => {
const result = await pipe( const result = await pipe(
[bytes], [bytes],
stream, stream,
async (source) => await toBuffer(source) async (source) => await toBuffer(map(source, (list) => list.subarray()))
) )
// Flush the call stack // Flush the call stack

View File

@ -5,7 +5,7 @@ import sinon from 'sinon'
import { Multiaddr } from '@multiformats/multiaddr' import { Multiaddr } from '@multiformats/multiaddr'
import { WebSockets } from '@libp2p/websockets' import { WebSockets } from '@libp2p/websockets'
import * as filters from '@libp2p/websockets/filters' import * as filters from '@libp2p/websockets/filters'
import { NOISE } from '@chainsafe/libp2p-noise' import { Plaintext } from '../../src/insecure/index.js'
import { DefaultAddressManager } from '../../src/address-manager/index.js' import { DefaultAddressManager } from '../../src/address-manager/index.js'
import { DefaultTransportManager, FaultTolerance } from '../../src/transport-manager.js' import { DefaultTransportManager, FaultTolerance } from '../../src/transport-manager.js'
import { mockUpgrader } from '@libp2p/interface-mocks' import { mockUpgrader } from '@libp2p/interface-mocks'
@ -109,7 +109,7 @@ describe('libp2p.transportManager (dial only)', () => {
listen: ['/ip4/127.0.0.1/tcp/0'] listen: ['/ip4/127.0.0.1/tcp/0']
}, },
transports: [new WebSockets()], transports: [new WebSockets()],
connectionEncryption: [NOISE] connectionEncryption: [new Plaintext()]
}) })
await expect(libp2p.start()).to.eventually.be.rejected await expect(libp2p.start()).to.eventually.be.rejected
@ -129,7 +129,7 @@ describe('libp2p.transportManager (dial only)', () => {
new WebSockets() new WebSockets()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
@ -149,7 +149,7 @@ describe('libp2p.transportManager (dial only)', () => {
new WebSockets() new WebSockets()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })

View File

@ -8,7 +8,6 @@ import { pipe } from 'it-pipe'
import all from 'it-all' import all from 'it-all'
import pSettle from 'p-settle' import pSettle from 'p-settle'
import { WebSockets } from '@libp2p/websockets' import { WebSockets } from '@libp2p/websockets'
import { NOISE } from '@chainsafe/libp2p-noise'
import { PreSharedKeyConnectionProtector } from '../../src/pnet/index.js' import { PreSharedKeyConnectionProtector } from '../../src/pnet/index.js'
import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string' import { fromString as uint8ArrayFromString } from 'uint8arrays/from-string'
import swarmKey from '../fixtures/swarm.key.js' import swarmKey from '../fixtures/swarm.key.js'
@ -30,6 +29,7 @@ import { pEvent } from 'p-event'
import { TimeoutController } from 'timeout-abort-controller' import { TimeoutController } from 'timeout-abort-controller'
import delay from 'delay' import delay from 'delay'
import drain from 'it-drain' import drain from 'it-drain'
import { Uint8ArrayList } from 'uint8arraylist'
const addrs = [ const addrs = [
new Multiaddr('/ip4/127.0.0.1/tcp/0'), new Multiaddr('/ip4/127.0.0.1/tcp/0'),
@ -409,7 +409,7 @@ describe('Upgrader', () => {
source: (async function * () { source: (async function * () {
// longer than the timeout // longer than the timeout
await delay(1000) await delay(1000)
yield new Uint8Array() yield new Uint8ArrayList()
}()), }()),
sink: drain sink: drain
}) })
@ -479,7 +479,7 @@ describe('libp2p.upgrader', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
], ],
connectionProtector: new PreSharedKeyConnectionProtector({ connectionProtector: new PreSharedKeyConnectionProtector({
psk: uint8ArrayFromString(swarmKey) psk: uint8ArrayFromString(swarmKey)
@ -501,7 +501,7 @@ describe('libp2p.upgrader', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
await libp2p.start() await libp2p.start()
@ -517,7 +517,7 @@ describe('libp2p.upgrader', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
await remoteLibp2p.start() await remoteLibp2p.start()
@ -548,7 +548,7 @@ describe('libp2p.upgrader', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
await libp2p.start() await libp2p.start()
@ -562,7 +562,7 @@ describe('libp2p.upgrader', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
await remoteLibp2p.start() await remoteLibp2p.start()
@ -607,7 +607,7 @@ describe('libp2p.upgrader', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
await libp2p.start() await libp2p.start()
@ -621,7 +621,7 @@ describe('libp2p.upgrader', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
await remoteLibp2p.start() await remoteLibp2p.start()
@ -669,7 +669,7 @@ describe('libp2p.upgrader', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
await libp2p.start() await libp2p.start()
@ -683,7 +683,7 @@ describe('libp2p.upgrader', () => {
new Mplex() new Mplex()
], ],
connectionEncryption: [ connectionEncryption: [
NOISE new Plaintext()
] ]
}) })
await remoteLibp2p.start() await remoteLibp2p.start()