187 lines
5.9 KiB
TypeScript
Raw Normal View History

2020-05-14 15:20:39 +03:00
/*
2020-05-14 17:30:17 +03:00
* Copyright 2020 Fluence Labs Limited
2020-05-14 15:20:39 +03:00
*
2020-05-14 17:30:17 +03:00
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
2020-05-14 15:20:39 +03:00
*
2020-05-14 17:30:17 +03:00
* http://www.apache.org/licenses/LICENSE-2.0
2020-05-14 15:20:39 +03:00
*
2020-05-14 17:30:17 +03:00
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
2020-05-14 15:20:39 +03:00
*/
import { PeerIdB58 } from '@fluencelabs/interfaces';
import { FluenceConnection, ParticleHandler } from '../interfaces/index.js';
import { pipe } from 'it-pipe';
import { encode, decode } from 'it-length-prefixed';
import type { PeerId } from '@libp2p/interface-peer-id';
import { createLibp2p, Libp2p } from 'libp2p';
import { noise } from '@chainsafe/libp2p-noise';
import { mplex } from '@libp2p/mplex';
import { webSockets } from '@libp2p/websockets';
import { all } from '@libp2p/websockets/filters';
import { multiaddr } from '@multiformats/multiaddr';
import type { MultiaddrInput, Multiaddr } from '@multiformats/multiaddr';
import type { Connection } from '@libp2p/interface-connection';
import map from 'it-map';
import { fromString } from 'uint8arrays/from-string';
import { toString } from 'uint8arrays/to-string';
import { logger } from '../util/logger.js';
const log = logger('connection');
2020-05-14 15:20:39 +03:00
2021-08-24 17:37:03 +03:00
export const PROTOCOL_NAME = '/fluence/particle/2.0.0';
2020-05-14 15:20:39 +03:00
/**
* Options to configure fluence connection
*/
export interface FluenceConnectionOptions {
/**
* Peer id of the Fluence Peer
*/
peerId: PeerId;
/**
* Multiaddress of the relay to make connection to
*/
relayAddress: MultiaddrInput;
/**
* The dialing timeout in milliseconds
*/
dialTimeoutMs?: number;
}
/**
* Implementation for JS peers which connects to Fluence through relay node
*/
export class RelayConnection extends FluenceConnection {
constructor(
public peerId: PeerIdB58,
private _lib2p2Peer: Libp2p,
private _relayAddress: Multiaddr,
public readonly relayPeerId: PeerIdB58,
) {
super();
}
2021-04-13 15:11:52 +03:00
private _connection?: Connection;
2021-04-13 15:11:52 +03:00
static async createConnection(options: FluenceConnectionOptions): Promise<RelayConnection> {
const lib2p2Peer = await createLibp2p({
peerId: options.peerId,
transports: [
webSockets({
filter: all,
}),
],
streamMuxers: [mplex()],
connectionEncryption: [noise()],
2020-05-14 15:20:39 +03:00
});
const relayMultiaddr = multiaddr(options.relayAddress);
const relayPeerId = relayMultiaddr.getPeerId();
if (relayPeerId === null) {
throw new Error('Specified multiaddr is invalid or missing peer id: ' + options.relayAddress);
}
2020-05-14 15:20:39 +03:00
return new RelayConnection(
// force new line
options.peerId.toString(),
lib2p2Peer,
relayMultiaddr,
relayPeerId,
);
2020-05-14 15:20:39 +03:00
}
async disconnect() {
await this._lib2p2Peer.unhandle(PROTOCOL_NAME);
await this._lib2p2Peer.stop();
2020-05-14 15:20:39 +03:00
}
async sendParticle(nextPeerIds: PeerIdB58[], particle: string): Promise<void> {
if (nextPeerIds.length !== 1 && nextPeerIds[0] !== this.relayPeerId) {
throw new Error(
`Relay connection only accepts peer id of the connected relay. Got: ${JSON.stringify(
nextPeerIds,
)} instead.`,
);
}
/*
TODO:: find out why this doesn't work and a new connection has to be established each time
if (this._connection.streams.length !== 1) {
throw new Error('Incorrect number of streams in FluenceConnection');
}
2020-09-28 17:01:49 +03:00
const sink = this._connection.streams[0].sink;
*/
2020-05-14 15:20:39 +03:00
const stream = await this._lib2p2Peer.dialProtocol(this._relayAddress, PROTOCOL_NAME);
const sink = stream.sink;
2020-05-14 15:20:39 +03:00
pipe(
[fromString(particle)],
// @ts-ignore
2020-05-14 15:20:39 +03:00
encode(),
sink,
2020-05-14 15:20:39 +03:00
);
}
async connect(onIncomingParticle: ParticleHandler) {
await this._lib2p2Peer.start();
// TODO: make it configurable
const handleOptions = {
maxInboundStreams: 1024,
maxOutboundStreams: 1024,
};
this._lib2p2Peer.handle(
[PROTOCOL_NAME],
async ({ connection, stream }) => {
pipe(
stream.source,
// @ts-ignore
decode(),
// @ts-ignore
(source) => map(source, (buf) => toString(buf.subarray())),
async (source) => {
try {
for await (const msg of source) {
try {
onIncomingParticle(msg);
} catch (e) {
log.error('error on handling a new incoming message: %j', e);
}
}
} catch (e) {
log.error('connection closed: %j', e);
}
},
);
},
handleOptions,
);
log.debug("dialing to the node with client's address: %s", this._lib2p2Peer.peerId.toString());
try {
this._connection = await this._lib2p2Peer.dial(this._relayAddress);
} catch (e: any) {
if (e.name === 'AggregateError' && e._errors?.length === 1) {
const error = e._errors[0];
throw new Error(`Error dialing node ${this._relayAddress}:\n${error.code}\n${error.message}`);
} else {
throw e;
}
}
}
2020-06-19 14:29:06 +03:00
}