Switching from AVM interpreter to AVM runner with background execution (#111)

This commit is contained in:
Pavel
2021-12-28 20:53:25 +03:00
committed by GitHub
parent 4aefddecaa
commit 9d0c7b2bb8
22 changed files with 442 additions and 206 deletions

View File

@ -14,14 +14,6 @@
* limitations under the License.
*/
import {
AirInterpreter,
CallRequestsArray,
CallResultsArray,
InterpreterResult,
LogLevel,
CallServiceResult as AvmCallServiceResult,
} from '@fluencelabs/avm';
import { Multiaddr } from 'multiaddr';
import { CallServiceData, CallServiceResult, GenericCallServiceHandler, ResultCodes } from './commonTypes';
import { CallServiceHandler as LegacyCallServiceHandler } from './compilerSupport/LegacyCallServiceHandler';
@ -29,12 +21,13 @@ import { PeerIdB58 } from './commonTypes';
import { FluenceConnection } from './FluenceConnection';
import { Particle, ParticleExecutionStage, ParticleQueueItem } from './Particle';
import { KeyPair } from './KeyPair';
import { createInterpreter, dataToString } from './utils';
import { filter, pipe, Subject, tap } from 'rxjs';
import { dataToString, jsonify } from './utils';
import { concatMap, filter, pipe, Subject, tap } from 'rxjs';
import { RequestFlow } from './compilerSupport/v1';
import log from 'loglevel';
import { BuiltInServiceContext, builtInServices } from './builtInServices';
import { instanceOf } from 'ts-pattern';
import { AvmRunner, InterpreterResult, LogLevel } from '@fluencelabs/avm-runner-interface';
import { AvmRunnerBackground } from '@fluencelabs/avm-runner-background';
/**
* Node of the Fluence network specified as a pair of node's multiaddr and it's peer id
@ -102,6 +95,11 @@ export interface PeerConfig {
* If the option is not set default TTL will be 7000
*/
defaultTtlMs?: number;
/**
* Plugable AVM runner implementation. If not specified AvmBackgroundRunner will be used
*/
avmRunner?: AvmRunner;
}
/**
@ -158,6 +156,7 @@ export class FluencePeer {
getStatus(): PeerStatus {
const hasKeyPair = this._keyPair !== undefined;
return {
// TODO:: use explicit mechanism for peer's state
isInitialized: hasKeyPair,
isConnected: this._connection !== undefined,
peerId: this._keyPair?.Libp2pPeerId?.toB58String() || null,
@ -182,7 +181,8 @@ export class FluencePeer {
? config?.defaultTtlMs
: DEFAULT_TTL;
this._interpreter = await createInterpreter(config?.avmLogLevel || 'off');
this._avmRunner = config?.avmRunner || new AvmRunnerBackground();
await this._avmRunner.init(config?.avmLogLevel || 'off');
if (config?.connectTo) {
let connectToMultiAddr: Multiaddr;
@ -223,9 +223,12 @@ export class FluencePeer {
* and disconnects from the Fluence network
*/
async stop() {
this._keyPair = undefined; // This will set peer to non-initialized state and stop particle processing
this._relayPeerId = null;
this._stopParticleProcessing();
await this._disconnect();
this._relayPeerId = null;
await this._avmRunner?.terminate();
this._avmRunner = undefined;
this._legacyCallServiceHandler = null;
this._particleSpecificHandlers.clear();
@ -361,7 +364,7 @@ export class FluencePeer {
private _relayPeerId: PeerIdB58 | null = null;
private _keyPair: KeyPair;
private _connection: FluenceConnection;
private _interpreter: AirInterpreter;
private _avmRunner: AvmRunner;
private _timeouts: Array<NodeJS.Timeout> = [];
private _particleQueues = new Map<string, Subject<ParticleQueueItem>>();
@ -392,6 +395,11 @@ export class FluencePeer {
});
this._outgoingParticles.subscribe(async (item) => {
// Do not send particle after the peer has been stopped
if (!this.getStatus().isInitialized) {
return;
}
if (!this._connection) {
item.particle.logTo('error', 'cannot send particle, peer is not connected');
item.onStageChange({ stage: 'sendingError' });
@ -422,14 +430,43 @@ export class FluencePeer {
.pipe(
// force new line
filterExpiredParticles(this._expireParticle.bind(this)),
concatMap(async (item) => {
// Is `.stop()` was called we need to stop particle processing immediately
if (!this.getStatus().isInitialized) {
return null;
}
// IMPORTANT!
// AVM runner execution and prevData <-> newData swapping
// MUST happen sequentially (in a critical section).
// Otherwise the race between runner might occur corrupting the prevData
const result = await runAvmRunner(
this.getStatus().peerId,
this._avmRunner,
item.particle,
prevData,
);
const newData = Buffer.from(result.data);
prevData = newData;
return {
...item,
result: result,
newData: newData,
};
}),
)
.subscribe((item) => {
const particle = item.particle;
const result = runInterpreter(this.getStatus().peerId, this._interpreter, particle, prevData);
.subscribe(async (item) => {
// Is `.stop()` was called we need to stop particle processing immediately
if (!this.getStatus().isInitialized) {
return;
}
// Do not continue if there was an error in particle interpretation
if (!isInterpretationSuccessful(result)) {
item.onStageChange({ stage: 'interpreterError', errorMessage: result.errorMessage });
if (!isInterpretationSuccessful(item.result)) {
item.onStageChange({ stage: 'interpreterError', errorMessage: item.result.errorMessage });
return;
}
@ -437,26 +474,23 @@ export class FluencePeer {
item.onStageChange({ stage: 'interpreted' });
}, 0);
const newData = Buffer.from(result.data);
prevData = newData;
// send particle further if requested
if (result.nextPeerPks.length > 0) {
const newParticle = particle.clone();
newParticle.data = newData;
if (item.result.nextPeerPks.length > 0) {
const newParticle = item.particle.clone();
newParticle.data = item.newData;
this._outgoingParticles.next({ ...item, particle: newParticle });
}
// execute call requests if needed
// and put particle with the results back to queue
if (result.callRequests.length > 0) {
for (let [key, cr] of result.callRequests) {
if (item.result.callRequests.length > 0) {
for (let [key, cr] of item.result.callRequests) {
const req = {
fnName: cr.functionName,
args: cr.arguments,
serviceId: cr.serviceId,
tetraplets: cr.tetraplets,
particleContext: particle.getParticleContext(),
particleContext: item.particle.getParticleContext(),
};
this._execSingleCallRequest(req)
@ -470,11 +504,11 @@ export class FluencePeer {
)
.then((res) => {
const serviceResult = {
result: JSON.stringify(res.result),
result: jsonify(res.result),
retCode: res.retCode,
};
const newParticle = particle.clone();
const newParticle = item.particle.clone();
newParticle.callResults = [[key, serviceResult]];
newParticle.data = Buffer.from([]);
@ -490,7 +524,7 @@ export class FluencePeer {
}
private async _execSingleCallRequest(req: CallServiceData): Promise<CallServiceResult> {
log.debug('executing call service handler', req);
log.debug('executing call service handler', jsonify(req));
const particleId = req.particleContext.particleId;
// trying particle-specific handler
@ -533,7 +567,9 @@ export class FluencePeer {
? await handler(req)
: {
retCode: ResultCodes.error,
result: `No handler has been registered for serviceId='${req.serviceId}' fnName='${req.fnName}' args='${req.args}'`,
result: `No handler has been registered for serviceId='${req.serviceId}' fnName='${
req.fnName
}' args='${jsonify(req.args)}'`,
};
}
@ -541,7 +577,7 @@ export class FluencePeer {
res.result = null;
}
log.debug('executed call service handler, req and res are: ', req, res);
log.debug('executed call service handler, req and res are: ', jsonify(req), jsonify(res));
return res;
}
@ -589,16 +625,15 @@ function registerDefaultServices(peer: FluencePeer, context: BuiltInServiceConte
}
}
function runInterpreter(
async function runAvmRunner(
currentPeerId: PeerIdB58,
interpreter: AirInterpreter,
runner: AvmRunner,
particle: Particle,
prevData: Uint8Array,
): InterpreterResult {
): Promise<InterpreterResult> {
particle.logTo('debug', 'Sending particle to interpreter');
log.debug('prevData: ', dataToString(prevData));
log.debug('data: ', dataToString(particle.data));
const interpreterResult = interpreter.invoke(
const interpreterResult = await runner.run(
particle.script,
prevData,
particle.data,
@ -613,9 +648,9 @@ function runInterpreter(
toLog.data = dataToString(toLog.data);
if (isInterpretationSuccessful(interpreterResult)) {
log.debug('Interpreter result: ', toLog);
log.debug('Interpreter result: ', jsonify(toLog));
} else {
log.error('Interpreter failed: ', toLog);
log.error('Interpreter failed: ', jsonify(toLog));
}
return interpreterResult;
}