Integrate Marine JS into Fluence peer (#149)

This commit is contained in:
Pavel
2022-04-21 14:13:26 +03:00
committed by GitHub
parent 945c8f1bce
commit 954c20e2c0
29 changed files with 6378 additions and 1307 deletions

View File

@ -24,11 +24,13 @@ import { throwIfNotSupported, dataToString, jsonify } from './utils';
import { concatMap, filter, pipe, Subject, tap } from 'rxjs';
import log from 'loglevel';
import { builtInServices } from './builtins/common';
import { AvmRunner, InterpreterResult, LogLevel } from '@fluencelabs/avm-runner-interface';
import { AvmRunnerBackground } from '@fluencelabs/avm-runner-background';
import { defaultSigGuard, Sig } from './builtins/Sig';
import { registerSig } from './_aqua/services';
import Buffer from './Buffer';
import { FluenceAppService, loadDefaults, loadWasmFromFileSystem, loadWasmFromServer } from '@fluencelabs/marine-js';
import { AVM, AvmRunner } from './avm';
import { isBrowser, isNode } from 'browser-or-node';
import { InterpreterResult, LogLevel } from '@fluencelabs/avm';
/**
* Node of the Fluence network specified as a pair of node's multiaddr and it's peer id
@ -98,10 +100,34 @@ export interface PeerConfig {
defaultTtlMs?: number;
/**
* Plugable AVM runner implementation. If not specified AvmBackgroundRunner will be used
* @deprecated. AVM run through marine-js infrastructure.
* @see marineJS option to configure AVM
*/
avmRunner?: AvmRunner;
/**
* This option allows to specify the location of various dependencies needed for marine-js.
* Each key specifies the location of the corresponding dependency.
* If Fluence peer is started inside browser the location is treated as the path to the file relative to origin.
* IF Fluence peer is started in nodejs the location is treated as the full path to file on the file system.
*/
marineJS?: {
/**
* Configures path to the marine-js worker script.
*/
workerScriptPath: string;
/**
* Configures the path to marine-js control wasm module
*/
marineWasmPath: string;
/**
* Configures the path to AVM wasm module
*/
avmWasmPath: string;
};
/**
* Enables\disabled various debugging features
*/
@ -199,7 +225,13 @@ export class FluencePeer {
? config?.defaultTtlMs
: DEFAULT_TTL;
this._avmRunner = config?.avmRunner || new AvmRunnerBackground();
this._fluenceAppService = new FluenceAppService(config?.marineJS?.workerScriptPath);
const marineDeps = config?.marineJS
? await loadMarineAndAvm(config.marineJS.marineWasmPath, config.marineJS.avmWasmPath)
: await loadDefaults();
await this._fluenceAppService.init(marineDeps.marine);
await this._fluenceAppService.createService(marineDeps.avm, 'avm');
this._avmRunner = config?.avmRunner || new AVM(this._fluenceAppService);
await this._avmRunner.init(config?.avmLogLevel || 'off');
if (config?.connectTo) {
@ -245,6 +277,32 @@ export class FluencePeer {
};
}
/**
* Registers marine service within the Fluence peer from wasm file.
* Following helper functions can be used to load wasm files:
* * loadWasmFromFileSystem
* * loadWasmFromNpmPackage
* * loadWasmFromServer
* @param wasm - buffer with the wasm file for service
* @param serviceId - the service id by which the service can be accessed in aqua
*/
async registerMarineService(wasm: SharedArrayBuffer | Buffer, serviceId: string): Promise<void> {
if (this._containsService(serviceId)) {
throw new Error(`Service with '${serviceId}' id already exists`);
}
await this._fluenceAppService.createService(wasm, serviceId);
this._marineServices.add(serviceId);
}
/**
* Removes the specified marine service from the Fluence peer
* @param serviceId - the service id to remove
*/
removeMarineService(serviceId: string): void {
this._marineServices.delete(serviceId);
}
/**
* Un-initializes the peer: stops all the underlying workflows, stops the Aqua VM
* and disconnects from the Fluence network
@ -255,10 +313,13 @@ export class FluencePeer {
this._stopParticleProcessing();
await this._disconnect();
await this._avmRunner?.terminate();
await this._fluenceAppService?.terminate();
this._avmRunner = undefined;
this._fluenceAppService = undefined;
this._particleSpecificHandlers.clear();
this._commonHandlers.clear();
this._marineServices.clear();
}
// internal api
@ -357,6 +418,7 @@ export class FluencePeer {
// Call service handler
private _marineServices = new Set<string>();
private _particleSpecificHandlers = new Map<string, Map<string, GenericCallServiceHandler>>();
private _commonHandlers = new Map<string, GenericCallServiceHandler>();
@ -364,6 +426,10 @@ export class FluencePeer {
sig: Sig;
};
private _containsService(serviceId: string): boolean {
return this._marineServices.has(serviceId) || this._commonHandlers.has(serviceId);
}
// Internal peer state
private _printParticleId: boolean = false;
@ -371,7 +437,12 @@ export class FluencePeer {
private _relayPeerId: PeerIdB58 | null = null;
private _keyPair: KeyPair;
private _connection: FluenceConnection;
/**
* @deprecated. AVM run through marine-js infrastructure. This field is needed for backward compatibility with the previous API
*/
private _avmRunner: AvmRunner;
private _fluenceAppService: FluenceAppService;
private _timeouts: Array<NodeJS.Timeout> = [];
private _particleQueues = new Map<string, Subject<ParticleQueueItem>>();
@ -534,6 +605,32 @@ export class FluencePeer {
log.debug('executing call service handler', jsonify(req));
const particleId = req.particleContext.particleId;
if (this._marineServices.has(req.serviceId)) {
const args = JSON.stringify(req.args);
const rawResult = await this._fluenceAppService.callService(req.serviceId, req.fnName, args, undefined);
try {
const result = JSON.parse(rawResult);
if (typeof result.error === 'string' && result.error.length > 0) {
return {
retCode: ResultCodes.error,
result: result.error,
};
}
if (!result.result) {
throw 'Call to marine-js returned no error and empty result. Original request: ' + jsonify(req);
}
return {
retCode: ResultCodes.success,
result: result.result,
};
} catch (e) {
throw 'Call to marine-js. Result parsing error: ' + e + ', original text: ' + rawResult;
}
}
const key = serviceFnKey(req.serviceId, req.fnName);
const psh = this._particleSpecificHandlers.get(particleId);
let handler: GenericCallServiceHandler;
@ -637,3 +734,35 @@ function filterExpiredParticles(onParticleExpiration: (item: ParticleQueueItem)
filter((x: ParticleQueueItem) => !x.particle.hasExpired()),
);
}
async function loadMarineAndAvm(
marinePath: string,
avmPath: string,
): Promise<{
marine: SharedArrayBuffer | Buffer;
avm: SharedArrayBuffer | Buffer;
}> {
let promises: [Promise<SharedArrayBuffer | Buffer>, Promise<SharedArrayBuffer | Buffer>];
// check if we are running inside the browser and instantiate worker with the corresponding script
if (isBrowser) {
promises = [
// force new line
loadWasmFromServer(marinePath),
loadWasmFromServer(avmPath),
];
} else if (isNode) {
promises = [
// force new line
loadWasmFromFileSystem(marinePath),
loadWasmFromFileSystem(avmPath),
];
} else {
throw new Error('Unknown environment');
}
const [marine, avm] = await Promise.all(promises);
return {
marine,
avm,
};
}