Integrate async AquaVM into fluence-js (#88)

This commit is contained in:
Pavel
2021-10-20 22:20:43 +03:00
committed by GitHub
parent 727d59fb61
commit fe52648103
35 changed files with 2758 additions and 1739 deletions

View File

@ -1,19 +1,42 @@
import { AirInterpreter, CallServiceResult, LogLevel, ParticleHandler, SecurityTetraplet } from '@fluencelabs/avm';
import log from 'loglevel';
/*
* Copyright 2021 Fluence Labs Limited
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import {
AirInterpreter,
CallRequestsArray,
CallResultsArray,
InterpreterResult,
LogLevel,
CallServiceResult as AvmCallServiceResult,
} from '@fluencelabs/avm';
import { Multiaddr } from 'multiaddr';
import PeerId from 'peer-id';
import { CallServiceHandler } from './CallServiceHandler';
import { CallServiceData, CallServiceResult, GenericCallServiceHandler, ResultCodes } from './commonTypes';
import { CallServiceHandler as LegacyCallServiceHandler } from './compilerSupport/LegacyCallServiceHandler';
import { PeerIdB58 } from './commonTypes';
import makeDefaultClientHandler from './defaultClientHandler';
import { FluenceConnection, FluenceConnectionOptions } from './FluenceConnection';
import { logParticle, Particle } from './particle';
import { FluenceConnection } from './FluenceConnection';
import { Particle } from './Particle';
import { KeyPair } from './KeyPair';
import { RequestFlow } from './RequestFlow';
import { loadRelayFn, loadVariablesService } from './RequestFlowBuilder';
import { createInterpreter } from './utils';
import { createInterpreter, dataToString } from './utils';
import { filter, pipe, Subject, tap } from 'rxjs';
import { RequestFlow } from './compilerSupport/v1';
import log from 'loglevel';
import { defaultServices } from './defaultServices';
/**
* Node of the Fluence detwork specified as a pair of node's multiaddr and it's peer id
* Node of the Fluence network specified as a pair of node's multiaddr and it's peer id
*/
type Node = {
peerId: PeerIdB58;
@ -26,6 +49,8 @@ type Node = {
*/
export type AvmLoglevel = LogLevel;
const DEFAULT_TTL = 7000;
/**
* Configuration used when initiating Fluence Peer
*/
@ -123,15 +148,11 @@ export class FluencePeer {
* Get the peer's status
*/
getStatus(): PeerStatus {
let isConnected = false;
if (this._connection) {
isConnected = this._connection?.isConnected();
}
const hasKeyPair = this._keyPair !== undefined;
return {
isInitialized: hasKeyPair,
isConnected: isConnected,
peerId: this._selfPeerId,
isConnected: this._connection !== undefined,
peerId: this._keyPair?.Libp2pPeerId?.toB58String() || null,
relayPeerId: this._relayPeerId || null,
};
}
@ -148,30 +169,52 @@ export class FluencePeer {
this._keyPair = await KeyPair.randomEd25519();
}
await this._initAirInterpreter(config?.avmLogLevel || 'off');
this._callServiceHandler = makeDefaultClientHandler();
this._interpreter = await createInterpreter(config?.avmLogLevel || 'off');
if (config?.connectTo) {
let theAddress: Multiaddr;
let connectToMultiAddr: Multiaddr;
let fromNode = (config.connectTo as any).multiaddr;
if (fromNode) {
theAddress = new Multiaddr(fromNode);
connectToMultiAddr = new Multiaddr(fromNode);
} else {
theAddress = new Multiaddr(config.connectTo as string);
connectToMultiAddr = new Multiaddr(config.connectTo as string);
}
await this._connect(theAddress);
this._relayPeerId = connectToMultiAddr.getPeerId();
if (this._connection) {
await this._connection.disconnect();
}
this._connection = await FluenceConnection.createConnection({
peerId: this._keyPair.Libp2pPeerId,
relayAddress: connectToMultiAddr,
dialTimeoutMs: config.dialTimeoutMs,
onIncomingParticle: (p) => this._incomingParticles.next(p),
});
await this._connect();
}
this._legacyCallServiceHandler = new LegacyCallServiceHandler();
registerDefaultServices(this);
this._startParticleProcessing();
}
/**
* Uninitializes the peer: stops all the underltying workflows, stops the Aqua VM
* Un-initializes the peer: stops all the underlying workflows, stops the Aqua VM
* and disconnects from the Fluence network
*/
async stop() {
this._stopParticleProcessing();
await this._disconnect();
this._callServiceHandler = null;
this._relayPeerId = null;
this._legacyCallServiceHandler = null;
this._particleSpecificHandlers.clear();
this._commonHandlers.clear();
this._timeoutHandlers.clear();
}
// internal api
@ -181,8 +224,80 @@ export class FluencePeer {
*/
get internals() {
return {
initiateFlow: this._initiateFlow.bind(this),
callServiceHandler: this._callServiceHandler,
/**
* Initiates a new particle execution starting from local peer
* @param particle - particle to start execution of
*/
initiateParticle: (particle: Particle): void => {
if (particle.initPeerId === undefined) {
particle.initPeerId = this.getStatus().peerId;
}
if (particle.ttl === undefined) {
particle.ttl = DEFAULT_TTL;
}
this._incomingParticles.next(particle);
},
/**
* Register Call Service handler functions
*/
regHandler: {
/**
* Register handler for all particles
*/
common: (
// force new line
serviceId: string,
fnName: string,
handler: GenericCallServiceHandler,
) => {
this._commonHandlers.set(serviceFnKey(serviceId, fnName), handler);
},
/**
* Register handler which will be called only for particle with the specific id
*/
forParticle: (
particleId: string,
serviceId: string,
fnName: string,
handler: GenericCallServiceHandler,
) => {
let psh = this._particleSpecificHandlers.get(particleId);
if (psh === undefined) {
psh = new Map<string, GenericCallServiceHandler>();
this._particleSpecificHandlers.set(particleId, psh);
}
psh.set(serviceFnKey(serviceId, fnName), handler);
},
/**
* Register handler which will be called upon particle timeout
*/
timeout: (particleId: string, handler: () => void) => {
this._timeoutHandlers.set(particleId, handler);
},
},
/**
* @deprecated
*/
initiateFlow: (request: RequestFlow): void => {
const particle = request.particle;
this._legacyParticleSpecificHandlers.set(particle.id, {
handler: request.handler,
error: request.error,
timeout: request.timeout,
});
this.internals.initiateParticle(particle);
},
/**
* @deprecated
*/
callServiceHandler: this._legacyCallServiceHandler,
};
}
@ -193,153 +308,274 @@ export class FluencePeer {
*/
private _isFluenceAwesome = true;
private async _initiateFlow(request: RequestFlow): Promise<void> {
// setting `relayVariableName` here. If the client is not connected (i.e it is created as local) then there is no relay
request.handler.on(loadVariablesService, loadRelayFn, () => {
return this._relayPeerId || '';
});
await request.initState(this._keyPair.Libp2pPeerId);
logParticle(log.debug, 'executing local particle', request.getParticle());
request.handler.combineWith(this._callServiceHandler);
this._requests.set(request.id, request);
this._processRequest(request);
}
private _callServiceHandler: CallServiceHandler;
private _keyPair: KeyPair;
private _requests: Map<string, RequestFlow> = new Map();
private _currentRequestId: string | null = null;
private _watchdog;
private _connection: FluenceConnection;
private _interpreter: AirInterpreter;
private async _initAirInterpreter(logLevel: AvmLoglevel): Promise<void> {
this._interpreter = await createInterpreter(this._interpreterCallback.bind(this), this._selfPeerId, logLevel);
}
private async _connect(multiaddr: Multiaddr, options?: FluenceConnectionOptions): Promise<void> {
const nodePeerId = multiaddr.getPeerId();
if (!nodePeerId) {
throw Error("'multiaddr' did not contain a valid peer id");
}
if (this._connection) {
await this._connection.disconnect();
}
const node = PeerId.createFromB58String(nodePeerId);
const connection = new FluenceConnection(
multiaddr,
node,
this._keyPair.Libp2pPeerId,
this._executeIncomingParticle.bind(this),
);
await connection.connect(options);
this._connection = connection;
this._initWatchDog();
// TODO:: make public when full connection\disconnection cycle is implemented properly
private async _connect(): Promise<void> {
return this._connection?.connect();
}
// TODO:: make public when full connection\disconnection cycle is implemented properly
private async _disconnect(): Promise<void> {
if (this._connection) {
await this._connection.disconnect();
return this._connection.disconnect();
}
this._clearWathcDog();
this._requests.forEach((r) => {
r.cancel();
}
// Queues for incoming and outgoing particles
private _incomingParticles = new Subject<Particle>();
private _outgoingParticles = new Subject<Particle>();
// Call service handler
private _particleSpecificHandlers = new Map<string, Map<string, GenericCallServiceHandler>>();
private _commonHandlers = new Map<string, GenericCallServiceHandler>();
private _timeoutHandlers = new Map<string, () => void>();
// Internal peer state
private _relayPeerId: PeerIdB58 | null = null;
private _keyPair: KeyPair;
private _connection: FluenceConnection;
private _interpreter: AirInterpreter;
private _timeouts: Array<NodeJS.Timeout> = [];
private _startParticleProcessing() {
const particleQueues = new Map<string, Subject<Particle>>();
this._incomingParticles
.pipe(
tap((x) => x.logTo('debug', 'particle received:')),
filterExpiredParticles(),
)
.subscribe((p) => {
let particlesQueue = particleQueues.get(p.id);
if (!particlesQueue) {
particlesQueue = this._createParticlesProcessingQueue();
particleQueues.set(p.id, particlesQueue);
const timeout = setTimeout(() => {
log.debug(`particle ${p.id} has expired. Deleting particle-related queues and handlers`);
particleQueues.delete(p.id);
const timeoutHandler = this._timeoutHandlers.get(p.id);
if (timeoutHandler) {
timeoutHandler();
}
this._particleSpecificHandlers.delete(p.id);
this._timeoutHandlers.delete(p.id);
}, p.actualTtl());
this._timeouts.push(timeout);
}
particlesQueue.next(p);
});
this._outgoingParticles.subscribe((p) => {
this._connection.sendParticle(p);
});
}
private get _selfPeerId(): PeerIdB58 | null {
return this._keyPair?.Libp2pPeerId?.toB58String() || null;
private _createParticlesProcessingQueue() {
let particlesQueue = new Subject<Particle>();
let prevData: Uint8Array = Buffer.from([]);
particlesQueue
.pipe(
// force new line
filterExpiredParticles(),
)
.subscribe((x) => {
const result = runInterpreter(this.getStatus().peerId, this._interpreter, x, prevData);
prevData = Buffer.from(result.data);
// send particle further if requested
if (result.nextPeerPks.length > 0) {
const newParticle = x.clone();
newParticle.data = prevData;
this._outgoingParticles.next(newParticle);
}
// execute call requests if needed
// and put particle with the results back to queue
if (result.callRequests.length > 0) {
this._execCallRequests(x, result.callRequests).then((callResults) => {
const newParticle = x.clone();
newParticle.callResults = callResults;
newParticle.data = Buffer.from([]);
particlesQueue.next(newParticle);
});
}
});
return particlesQueue;
}
private get _relayPeerId(): PeerIdB58 | null {
return this._connection?.nodePeerId?.toB58String() || null;
}
private async _execCallRequests(p: Particle, callRequests: CallRequestsArray): Promise<CallResultsArray> {
// execute all requests asynchronously
const promises = callRequests.map(([key, callRequest]) => {
const req = {
fnName: callRequest.functionName,
args: callRequest.arguments,
serviceId: callRequest.serviceId,
tetraplets: callRequest.tetraplets,
particleContext: p.getParticleContext(),
};
private async _executeIncomingParticle(particle: Particle) {
logParticle(log.debug, 'incoming particle received', particle);
// execute single requests and catch possible errors
const promise = this._execSingleCallRequest(req)
.catch(
(err): CallServiceResult => ({
retCode: ResultCodes.exceptionInHandler,
result: `Handler failed. fnName="${req.fnName}" serviceId="${
req.serviceId
}" error: ${err.toString()}`,
}),
)
.then(
(res): AvmCallServiceResult => ({
result: JSON.stringify(res.result),
retCode: res.retCode,
}),
)
.then((res): [key: number, res: AvmCallServiceResult] => [key, res]);
let request = this._requests.get(particle.id);
if (request) {
await request.receiveUpdate(particle);
} else {
request = RequestFlow.createExternal(particle);
request.handler.combineWith(this._callServiceHandler);
}
this._requests.set(request.id, request);
await this._processRequest(request);
}
private _processRequest(request: RequestFlow) {
try {
this._currentRequestId = request.id;
request.execute(this._interpreter, this._connection, this._relayPeerId);
} catch (err) {
log.error('particle processing failed: ' + err);
} finally {
this._currentRequestId = null;
}
}
private _interpreterCallback: ParticleHandler = (
serviceId: string,
fnName: string,
args: any[],
tetraplets: SecurityTetraplet[][],
): CallServiceResult => {
if (this._currentRequestId === null) {
throw Error('current request can`t be null here');
}
const request = this._requests.get(this._currentRequestId);
const particle = request.getParticle();
if (particle === null) {
throw new Error("particle can't be null here, current request id: " + this._currentRequestId);
}
const res = request.handler.execute({
serviceId,
fnName,
args,
tetraplets,
particleContext: {
particleId: request.id,
initPeerId: particle.init_peer_id,
timestamp: particle.timestamp,
ttl: particle.ttl,
signature: particle.signature,
},
return promise;
});
// don't block
const res = await Promise.all(promises);
log.debug(`Executed call service for particle id=${p.id}, Call service results: `, res);
return res;
}
private async _execSingleCallRequest(req: CallServiceData): Promise<CallServiceResult> {
const particleId = req.particleContext.particleId;
// trying particle-specific handler
const lh = this._legacyParticleSpecificHandlers.get(particleId);
let res: CallServiceResult = {
result: undefined,
retCode: undefined,
};
if (lh !== undefined) {
res = lh.handler.execute(req);
}
// if it didn't return any result trying to run the common handler
if (res?.result === undefined) {
res = this._legacyCallServiceHandler.execute(req);
}
// No result from legacy handler.
// Trying to execute async handler
if (res.retCode === undefined) {
const key = serviceFnKey(req.serviceId, req.fnName);
const psh = this._particleSpecificHandlers.get(particleId);
let handler: GenericCallServiceHandler;
// we should prioritize handler for this particle if there is one
// if particle-specific handlers exist for this particle try getting handler there
if (psh !== undefined) {
handler = psh.get(key);
}
// then try to find a common handler for all particles with this service-fn key
// if there is no particle-specific handler, get one from common map
if (handler === undefined) {
handler = this._commonHandlers.get(key);
}
// if we found a handler, execute it
// otherwise return useful error message to AVM
res = handler
? await handler(req)
: {
retCode: ResultCodes.unknownError,
result: `No handler has been registered for serviceId='${req.serviceId}' fnName='${req.fnName}' args='${req.args}'`,
};
}
if (res.result === undefined) {
log.error(
`Call to serviceId=${serviceId} fnName=${fnName} unexpectedly returned undefined result, falling back to null. Particle id=${request.id}`,
);
res.result = null;
}
return {
ret_code: res.retCode,
result: JSON.stringify(res.result),
};
};
private _initWatchDog() {
this._watchdog = setInterval(() => {
for (let key in this._requests.keys) {
if (this._requests.get(key).hasExpired()) {
this._requests.delete(key);
}
}
}, 5000); // TODO: make configurable
return res;
}
private _clearWathcDog() {
clearInterval(this._watchdog);
private _stopParticleProcessing() {
// do not hang if the peer has been stopped while some of the timeouts are still being executed
for (let item of this._timeouts) {
clearTimeout(item);
}
}
/**
* @deprecated
*/
private _legacyParticleSpecificHandlers = new Map<
string,
{
handler: LegacyCallServiceHandler;
timeout?: () => void;
error?: (reason?: any) => void;
}
>();
/**
* @deprecated
*/
private _legacyCallServiceHandler: LegacyCallServiceHandler;
}
function serviceFnKey(serviceId: string, fnName: string) {
return `${serviceId}/${fnName}`;
}
function registerDefaultServices(peer: FluencePeer) {
for (let serviceId in defaultServices) {
for (let fnName in defaultServices[serviceId]) {
const h = defaultServices[serviceId][fnName];
peer.internals.regHandler.common(serviceId, fnName, h);
}
}
}
function runInterpreter(
currentPeerId: PeerIdB58,
interpreter: AirInterpreter,
particle: Particle,
prevData: Uint8Array,
): InterpreterResult {
particle.logTo('debug', 'Sending particle to interpreter');
log.debug('prevData: ', dataToString(prevData));
log.debug('data: ', dataToString(particle.data));
const interpreterResult = interpreter.invoke(
particle.script,
prevData,
particle.data,
{
initPeerId: particle.initPeerId,
currentPeerId: currentPeerId,
},
particle.callResults,
);
const toLog: any = { ...interpreterResult };
toLog.data = dataToString(toLog.data);
log.debug('Interpreter result: ', toLog);
return interpreterResult;
}
function filterExpiredParticles() {
return pipe(
tap((p: Particle) => {
if (p.hasExpired) {
log.debug(`particle ${p.id} has expired`);
}
}),
filter((x: Particle) => !x.hasExpired()),
);
}