Ephemeral networks core implementation (#160)

This commit is contained in:
Pavel
2022-08-05 16:43:19 +03:00
committed by GitHub
parent 83c587af34
commit fb50ec271c
31 changed files with 3314 additions and 5774 deletions

View File

@ -14,10 +14,10 @@
* limitations under the License.
*/
import { Multiaddr } from 'multiaddr';
import type { MultiaddrInput } from 'multiaddr';
import { CallServiceData, CallServiceResult, GenericCallServiceHandler, ResultCodes } from './commonTypes';
import { PeerIdB58 } from './commonTypes';
import { FluenceConnection } from './FluenceConnection';
import { RelayConnection, FluenceConnection } from './FluenceConnection';
import { Particle, ParticleExecutionStage, ParticleQueueItem } from './Particle';
import { KeyPair } from './KeyPair';
import { throwIfNotSupported, dataToString, jsonify, MarineLoglevel, marineLogLevelToEnvs } from './utils';
@ -42,6 +42,8 @@ type Node = {
const DEFAULT_TTL = 7000;
export type ConnectionOption = string | MultiaddrInput | Node;
/**
* Configuration used when initiating Fluence Peer
*/
@ -52,9 +54,10 @@ export interface PeerConfig {
* - string: multiaddr in string format
* - Multiaddr: multiaddr object, @see https://github.com/multiformats/js-multiaddr
* - Node: node structure, @see Node
* - Implementation of FluenceConnection class, @see FluenceConnection
* If not specified the will work locally and would not be able to send or receive particles.
*/
connectTo?: string | Multiaddr | Node;
connectTo?: ConnectionOption;
/**
* @deprecated. AVM run through marine-js infrastructure.
@ -141,7 +144,13 @@ export interface PeerConfig {
}
/**
* Information about Fluence Peer connection
* Information about Fluence Peer connection.
* Represented as object with the following keys:
* - `isInitialized`: Is the peer initialized or not.
* - `peerId`: Peer Id of the peer. Null if the peer is not initialized
* - `isConnected`: Is the peer connected to network or not
* - `relayPeerId`: Peer Id of the relay the peer is connected to. If the connection is direct relayPeerId is null
* - `isDirect`: True if the peer is connected to the network directly (not through relay)
*/
export type PeerStatus =
| {
@ -161,6 +170,13 @@ export type PeerStatus =
peerId: PeerIdB58;
isConnected: true;
relayPeerId: PeerIdB58;
}
| {
isInitialized: true;
peerId: PeerIdB58;
isConnected: true;
isDirect: true;
relayPeerId: null;
};
/**
@ -191,7 +207,7 @@ export class FluencePeer {
};
}
if (this._connection === undefined || this._relayPeerId === null) {
if (this._connection === undefined) {
return {
isInitialized: true,
peerId: this._keyPair.Libp2pPeerId.toB58String(),
@ -200,11 +216,21 @@ export class FluencePeer {
};
}
if (this._connection.relayPeerId === null) {
return {
isInitialized: true,
peerId: this._keyPair.Libp2pPeerId.toB58String(),
isConnected: true,
isDirect: true,
relayPeerId: null,
};
}
return {
isInitialized: true,
peerId: this._keyPair.Libp2pPeerId.toB58String(),
isConnected: true,
relayPeerId: this._relayPeerId,
relayPeerId: this._connection.relayPeerId,
};
}
@ -213,73 +239,17 @@ export class FluencePeer {
* and (optionally) connect to the Fluence network
* @param config - object specifying peer configuration
*/
async start(config?: PeerConfig): Promise<void> {
async start(config: PeerConfig = {}): Promise<void> {
throwIfNotSupported();
const keyPair = config.KeyPair ?? (await KeyPair.randomEd25519());
const newConfig = { ...config, KeyPair: keyPair };
const keyPair = config?.KeyPair ?? (await KeyPair.randomEd25519());
this._keyPair = keyPair;
await this.init(newConfig);
const peerId = keyPair.Libp2pPeerId.toB58String();
if (config?.debug?.printParticleId) {
this._printParticleId = true;
const conn = await configToConnection(newConfig.KeyPair, config?.connectTo, config?.dialTimeoutMs);
if (conn !== null) {
await this.connect(conn);
}
this._defaultTTL = config?.defaultTtlMs ?? DEFAULT_TTL;
if (config?.debug?.marineLogLevel) {
this._marineLogLevel = config.debug.marineLogLevel;
}
this._fluenceAppService = new FluenceAppService(config?.marineJS?.workerScriptPath);
const marineDeps = config?.marineJS
? await loadMarineAndAvm(config.marineJS.marineWasmPath, config.marineJS.avmWasmPath)
: await loadDefaults();
await this._fluenceAppService.init(marineDeps.marine);
await this._fluenceAppService.createService(
marineDeps.avm,
'avm',
undefined,
marineLogLevelToEnvs(this._marineLogLevel),
);
this._avmRunner = config?.avmRunner || new AVM(this._fluenceAppService);
await this._avmRunner.init(config?.avmLogLevel || 'off');
if (config?.connectTo) {
let connectToMultiAddr: Multiaddr;
const fromNode = (config.connectTo as any).multiaddr;
if (fromNode) {
connectToMultiAddr = new Multiaddr(fromNode);
} else {
connectToMultiAddr = new Multiaddr(config.connectTo as string);
}
this._relayPeerId = connectToMultiAddr.getPeerId();
if (this._connection) {
await this._connection.disconnect();
}
this._connection = await FluenceConnection.createConnection({
peerId: this._keyPair.Libp2pPeerId,
relayAddress: connectToMultiAddr,
dialTimeoutMs: config.dialTimeoutMs,
onIncomingParticle: (p) => this._incomingParticles.next({ particle: p, onStageChange: () => {} }),
});
await this._connect();
}
registerDefaultServices(this);
this._classServices = {
sig: new Sig(this._keyPair),
};
this._classServices.sig.securityGuard = defaultSigGuard(peerId);
registerSig(this, this._classServices.sig);
registerSig(this, peerId, this._classServices.sig);
this._startParticleProcessing();
}
getServices() {
@ -331,9 +301,8 @@ export class FluencePeer {
*/
async stop() {
this._keyPair = undefined; // This will set peer to non-initialized state and stop particle processing
this._relayPeerId = null;
this._stopParticleProcessing();
await this._disconnect();
await this.disconnect();
await this._avmRunner?.terminate();
await this._fluenceAppService?.terminate();
this._avmRunner = undefined;
@ -348,7 +317,7 @@ export class FluencePeer {
// internal api
/**
* Is not intended to be used manually. Subject to change
* @private Is not intended to be used manually. Subject to change
*/
get internals() {
return {
@ -453,22 +422,79 @@ export class FluencePeer {
};
}
/**
* @private Subject to change. Do not use this method directly
*/
async init(config: PeerConfig & Required<Pick<PeerConfig, 'KeyPair'>>) {
this._keyPair = config.KeyPair;
const peerId = this._keyPair.Libp2pPeerId.toB58String();
if (config?.debug?.printParticleId) {
this._printParticleId = true;
}
this._defaultTTL = config?.defaultTtlMs ?? DEFAULT_TTL;
if (config?.debug?.marineLogLevel) {
this._marineLogLevel = config.debug.marineLogLevel;
}
this._fluenceAppService = new FluenceAppService(config?.marineJS?.workerScriptPath);
const marineDeps = config?.marineJS
? await loadMarineAndAvm(config.marineJS.marineWasmPath, config.marineJS.avmWasmPath)
: await loadDefaults();
await this._fluenceAppService.init(marineDeps.marine);
await this._fluenceAppService.createService(
marineDeps.avm,
'avm',
undefined,
marineLogLevelToEnvs(this._marineLogLevel),
);
this._avmRunner = config?.avmRunner || new AVM(this._fluenceAppService);
await this._avmRunner.init(config?.avmLogLevel || 'off');
registerDefaultServices(this);
this._classServices = {
sig: new Sig(this._keyPair),
};
this._classServices.sig.securityGuard = defaultSigGuard(peerId);
registerSig(this, this._classServices.sig);
registerSig(this, peerId, this._classServices.sig);
this._startParticleProcessing();
}
/**
* @private Subject to change. Do not use this method directly
*/
async connect(connection: FluenceConnection): Promise<void> {
if (this._connection) {
await this._connection.disconnect();
}
this._connection = connection;
await this._connection.connect(this._onIncomingParticle.bind(this));
}
/**
* @private Subject to change. Do not use this method directly
*/
async disconnect(): Promise<void> {
if (this._connection) {
await this._connection.disconnect();
this._connection = undefined;
}
}
// private
// TODO:: make public when full connection\disconnection cycle is implemented properly
private _connect(): Promise<void> {
return this._connection?.connect() || Promise.resolve();
}
// TODO:: make public when full connection\disconnection cycle is implemented properly
private _disconnect(): Promise<void> {
return this._connection?.disconnect() || Promise.resolve();
}
// Queues for incoming and outgoing particles
private _incomingParticles = new Subject<ParticleQueueItem>();
private _outgoingParticles = new Subject<ParticleQueueItem>();
private _outgoingParticles = new Subject<ParticleQueueItem & { nextPeerIds: PeerIdB58[] }>();
// Call service handler
@ -489,7 +515,6 @@ export class FluencePeer {
private _printParticleId = false;
private _defaultTTL: number = DEFAULT_TTL;
private _relayPeerId: PeerIdB58 | null = null;
private _keyPair: KeyPair | undefined;
private _connection?: FluenceConnection;
@ -501,6 +526,11 @@ export class FluencePeer {
private _timeouts: Array<NodeJS.Timeout> = [];
private _particleQueues = new Map<string, Subject<ParticleQueueItem>>();
private _onIncomingParticle(p: string) {
const particle = Particle.fromString(p);
this._incomingParticles.next({ particle, onStageChange: () => {} });
}
private _startParticleProcessing() {
this._incomingParticles
.pipe(
@ -538,7 +568,8 @@ export class FluencePeer {
item.onStageChange({ stage: 'sendingError' });
return;
}
this._connection.sendParticle(item.particle).then(
item.particle.logTo('debug', 'sending particle:');
this._connection.sendParticle(item.nextPeerIds, item.particle.toString()).then(
() => {
item.onStageChange({ stage: 'sent' });
},
@ -612,7 +643,11 @@ export class FluencePeer {
if (item.result.nextPeerPks.length > 0) {
const newParticle = item.particle.clone();
newParticle.data = item.newData;
this._outgoingParticles.next({ ...item, particle: newParticle });
this._outgoingParticles.next({
...item,
particle: newParticle,
nextPeerIds: item.result.nextPeerPks,
});
}
// execute call requests if needed
@ -735,6 +770,38 @@ export class FluencePeer {
}
}
async function configToConnection(
keyPair: KeyPair,
connection?: ConnectionOption,
dialTimeoutMs?: number,
): Promise<FluenceConnection | null> {
if (!connection) {
return null;
}
if (connection instanceof FluenceConnection) {
return connection;
}
let connectToMultiAddr: MultiaddrInput;
// figuring out what was specified as input
const tmp = connection as any;
if (tmp.multiaddr !== undefined) {
// specified as FluenceNode (object with multiaddr and peerId props)
connectToMultiAddr = tmp.multiaddr;
} else {
// specified as MultiaddrInput
connectToMultiAddr = tmp;
}
const res = await RelayConnection.createConnection({
peerId: keyPair.Libp2pPeerId,
relayAddress: connectToMultiAddr,
dialTimeoutMs: dialTimeoutMs,
});
return res;
}
function isInterpretationSuccessful(result: InterpreterResult) {
return result.retCode === 0;
}