From aa21abe465dcd0ec2e45a55c73cb749220afdde1 Mon Sep 17 00:00:00 2001 From: Pavel Date: Tue, 9 Nov 2021 14:37:44 +0300 Subject: [PATCH] Extend error handling in FluencePeer (#98) --- package-lock.json | 14 +- package.json | 2 +- src/__test__/integration/avm.spec.ts | 7 +- .../integration/compiler/compiler.spec.ts | 37 ++++- src/__test__/integration/peer.spec.ts | 38 ++++-- src/__test__/util.ts | 10 +- src/index.ts | 2 +- src/internal/FluencePeer.ts | 126 +++++++++++------- src/internal/Particle.ts | 13 ++ src/internal/commonTypes.ts | 18 ++- src/internal/compilerSupport/v2.ts | 29 ++-- src/internal/utils.ts | 21 ++- 12 files changed, 214 insertions(+), 103 deletions(-) diff --git a/package-lock.json b/package-lock.json index 20a34f12..6730dfc1 100644 --- a/package-lock.json +++ b/package-lock.json @@ -10,7 +10,7 @@ "license": "Apache-2.0", "dependencies": { "@chainsafe/libp2p-noise": "4.0.0", - "@fluencelabs/avm": "0.15.4", + "@fluencelabs/avm": "0.16.6", "async": "3.2.0", "base64-js": "1.5.1", "bs58": "4.0.1", @@ -646,9 +646,9 @@ } }, "node_modules/@fluencelabs/avm": { - "version": "0.15.4", - "resolved": "https://registry.npmjs.org/@fluencelabs/avm/-/avm-0.15.4.tgz", - "integrity": "sha512-NLZDq83ocJ1Helm0D8kPMSSkjxH0y+Tujg0px773zjIShbh3jgiJOjAW1xCYgTt9K0LqepjP0bWX4/8nUZfr7g==", + "version": "0.16.6", + "resolved": "https://registry.npmjs.org/@fluencelabs/avm/-/avm-0.16.6.tgz", + "integrity": "sha512-RDNXW/VYAXh+E7B7+S4pTTc/1IcvtlID2xyBs/3TDlxkjbVxM7+vMcFL6cJZOzZZl+3oAWXL3ibDhE5Elcq6ug==", "dependencies": { "base64-js": "1.5.1" } @@ -8689,9 +8689,9 @@ } }, "@fluencelabs/avm": { - "version": "0.15.4", - "resolved": "https://registry.npmjs.org/@fluencelabs/avm/-/avm-0.15.4.tgz", - "integrity": "sha512-NLZDq83ocJ1Helm0D8kPMSSkjxH0y+Tujg0px773zjIShbh3jgiJOjAW1xCYgTt9K0LqepjP0bWX4/8nUZfr7g==", + "version": "0.16.6", + "resolved": "https://registry.npmjs.org/@fluencelabs/avm/-/avm-0.16.6.tgz", + "integrity": "sha512-RDNXW/VYAXh+E7B7+S4pTTc/1IcvtlID2xyBs/3TDlxkjbVxM7+vMcFL6cJZOzZZl+3oAWXL3ibDhE5Elcq6ug==", "requires": { "base64-js": "1.5.1" } diff --git a/package.json b/package.json index 59c65c56..080a12ad 100644 --- a/package.json +++ b/package.json @@ -21,7 +21,7 @@ "license": "Apache-2.0", "dependencies": { "@chainsafe/libp2p-noise": "4.0.0", - "@fluencelabs/avm": "0.15.4", + "@fluencelabs/avm": "0.16.6", "async": "3.2.0", "base64-js": "1.5.1", "bs58": "4.0.1", diff --git a/src/__test__/integration/avm.spec.ts b/src/__test__/integration/avm.spec.ts index 82532711..a9b90e27 100644 --- a/src/__test__/integration/avm.spec.ts +++ b/src/__test__/integration/avm.spec.ts @@ -1,5 +1,6 @@ import { FluencePeer } from '../../index'; import { Particle } from '../../internal/Particle'; +import { handleTimeout } from '../../internal/utils'; import { registerHandlersHelper } from '../util'; describe('Avm spec', () => { @@ -21,10 +22,9 @@ describe('Avm spec', () => { resolve(res); }, }, - _timeout: reject, }); - peer.internals.initiateParticle(particle); + peer.internals.initiateParticle(particle, handleTimeout(reject)); }); // assert @@ -61,10 +61,9 @@ describe('Avm spec', () => { } }, }, - _timeout: reject, }); - peer.internals.initiateParticle(particle); + peer.internals.initiateParticle(particle, handleTimeout(reject)); }); // assert diff --git a/src/__test__/integration/compiler/compiler.spec.ts b/src/__test__/integration/compiler/compiler.spec.ts index cfa6a139..771867ca 100644 --- a/src/__test__/integration/compiler/compiler.spec.ts +++ b/src/__test__/integration/compiler/compiler.spec.ts @@ -2,6 +2,8 @@ import { Fluence, FluencePeer } from '../../..'; import { Particle } from '../../../internal/Particle'; import { registerHandlersHelper } from '../../util'; import { callMeBack, registerHelloWorld } from './gen1'; +import { callFunction } from '../../../internal/compilerSupport/v2'; +import { handleTimeout } from '../../../internal/utils'; describe('Compiler support infrastructure tests', () => { it('Compiled code for function should work', async () => { @@ -78,10 +80,9 @@ describe('Compiler support infrastructure tests', () => { resolve(val); }, }, - _timeout: reject, }); - Fluence.getPeer().internals.initiateParticle(particle); + Fluence.getPeer().internals.initiateParticle(particle, handleTimeout(reject)); }); // assert @@ -166,9 +167,8 @@ describe('Compiler support infrastructure tests', () => { resolve(val); }, }, - _timeout: reject, }); - anotherPeer.internals.initiateParticle(particle); + anotherPeer.internals.initiateParticle(particle, handleTimeout(reject)); }); // assert @@ -177,4 +177,33 @@ describe('Compiler support infrastructure tests', () => { await anotherPeer.stop(); }); + + it('Should throw error if particle with incorrect AIR script is initiated', async () => { + // arrange; + const anotherPeer = new FluencePeer(); + await anotherPeer.start(); + + // act + const action = callFunction( + [anotherPeer], + { + functionName: 'dontcare', + argDefs: [], + returnType: { tag: 'void' }, + names: { + relay: '-relay-', + getDataSrv: 'getDataSrv', + callbackSrv: 'callbackSrv', + responseSrv: 'callbackSrv', + responseFnName: 'response', + errorHandlingSrv: 'errorHandlingSrv', + errorFnName: 'error', + }, + }, + 'incorrect air script', + ); + + // assert + await expect(action).rejects.toMatch(/incorrect air script/); + }); }); diff --git a/src/__test__/integration/peer.spec.ts b/src/__test__/integration/peer.spec.ts index 69a62f29..fe5e100f 100644 --- a/src/__test__/integration/peer.spec.ts +++ b/src/__test__/integration/peer.spec.ts @@ -1,7 +1,7 @@ import { Multiaddr } from 'multiaddr'; import { nodes } from '../connection'; import { Fluence, FluencePeer, setLogLevel } from '../../index'; -import { checkConnection } from '../../internal/utils'; +import { checkConnection, doNothing, handleTimeout } from '../../internal/utils'; import { Particle } from '../../internal/Particle'; import { registerHandlersHelper } from '../util'; @@ -121,10 +121,9 @@ describe('Typescript usage suite', () => { reject(error); }, }, - _timeout: reject, }); - anotherPeer.internals.initiateParticle(particle); + anotherPeer.internals.initiateParticle(particle, handleTimeout(reject)); }); // assert @@ -169,7 +168,7 @@ describe('Typescript usage suite', () => { ) `; const particle = Particle.createNew(script); - await peer1.internals.initiateParticle(particle); + await peer1.internals.initiateParticle(particle, doNothing); // assert const res = await resMakingPromise; @@ -309,10 +308,9 @@ describe('Typescript usage suite', () => { resolve(res); }, }, - _timeout: reject, }); - anotherPeer.internals.initiateParticle(particle); + anotherPeer.internals.initiateParticle(particle, handleTimeout(reject)); }); // assert @@ -368,10 +366,9 @@ describe('Typescript usage suite', () => { reject(error); }, }, - _timeout: reject, }); - anotherPeer.internals.initiateParticle(particle); + anotherPeer.internals.initiateParticle(particle, handleTimeout(reject)); }); // assert @@ -381,7 +378,6 @@ describe('Typescript usage suite', () => { it('Should not crash if an error ocurred in user-defined handler', async () => { // arrange; - setLogLevel('trace'); await anotherPeer.start(); // act @@ -405,10 +401,9 @@ describe('Typescript usage suite', () => { reject(error); }, }, - _timeout: reject, }); - anotherPeer.internals.initiateParticle(particle); + anotherPeer.internals.initiateParticle(particle, handleTimeout(reject)); }); // assert @@ -417,6 +412,22 @@ describe('Typescript usage suite', () => { }); }); + it('Should throw error if particle is initiated on a stopped peer', async () => { + // arrange; + const stoppedPeer = new FluencePeer(); + + // act + const action = () => { + const script = `(null)`; + const particle = Particle.createNew(script); + + stoppedPeer.internals.initiateParticle(particle, doNothing); + }; + + // assert + await expect(action).toThrow('Cannot initiate new particle: peer is not initialized'); + }); + it.skip('Should throw correct error when the client tries to send a particle not to the relay', async () => { // arrange; await anotherPeer.start({ connectTo: nodes[0] }); @@ -439,7 +450,7 @@ describe('Typescript usage suite', () => { }, }); - anotherPeer.internals.initiateParticle(particle); + anotherPeer.internals.initiateParticle(particle, doNothing); }); // assert @@ -468,10 +479,9 @@ async function callIncorrectService(peer: FluencePeer): Promise { reject(error); }, }, - _timeout: reject, }); - peer.internals.initiateParticle(particle); + peer.internals.initiateParticle(particle, handleTimeout(reject)); }); return promise; diff --git a/src/__test__/util.ts b/src/__test__/util.ts index 770152dc..d7e3e895 100644 --- a/src/__test__/util.ts +++ b/src/__test__/util.ts @@ -3,14 +3,10 @@ import { Particle } from '../internal/Particle'; import { MakeServiceCall } from '../internal/utils'; export const registerHandlersHelper = (peer: FluencePeer, particle: Particle, handlers) => { - const { _timeout, ...rest } = handlers; - if (_timeout) { - peer.internals.regHandler.timeout(particle.id, _timeout); - } - for (let serviceId in rest) { - for (let fnName in rest[serviceId]) { + for (let serviceId in handlers) { + for (let fnName in handlers[serviceId]) { // of type [args] => result - const h = rest[serviceId][fnName]; + const h = handlers[serviceId][fnName]; peer.internals.regHandler.forParticle(particle.id, serviceId, fnName, MakeServiceCall(h)); } } diff --git a/src/index.ts b/src/index.ts index f75af068..92a0b7ab 100644 --- a/src/index.ts +++ b/src/index.ts @@ -44,7 +44,7 @@ export const Fluence = { }, /** - * Uninitializes the default peer: stops all the underltying workflows, stops the Aqua VM + * Un-initializes the default peer: stops all the underlying workflows, stops the Aqua VM * and disconnects from the Fluence network */ stop: (): Promise => { diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index 285db03a..f2bc58e9 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -27,13 +27,14 @@ import { CallServiceData, CallServiceResult, GenericCallServiceHandler, ResultCo import { CallServiceHandler as LegacyCallServiceHandler } from './compilerSupport/LegacyCallServiceHandler'; import { PeerIdB58 } from './commonTypes'; import { FluenceConnection } from './FluenceConnection'; -import { Particle } from './Particle'; +import { Particle, ParticleExecutionStage, ParticleQueueItem } from './Particle'; import { KeyPair } from './KeyPair'; import { createInterpreter, dataToString } from './utils'; import { filter, pipe, Subject, tap } from 'rxjs'; import { RequestFlow } from './compilerSupport/v1'; import log from 'loglevel'; import { defaultServices } from './defaultServices'; +import { instanceOf } from 'ts-pattern'; /** * Node of the Fluence network specified as a pair of node's multiaddr and it's peer id @@ -202,7 +203,7 @@ export class FluencePeer { peerId: this._keyPair.Libp2pPeerId, relayAddress: connectToMultiAddr, dialTimeoutMs: config.dialTimeoutMs, - onIncomingParticle: (p) => this._incomingParticles.next(p), + onIncomingParticle: (p) => this._incomingParticles.next({ particle: p, onStageChange: () => {} }), }); await this._connect(); @@ -226,7 +227,6 @@ export class FluencePeer { this._particleSpecificHandlers.clear(); this._commonHandlers.clear(); - this._timeoutHandlers.clear(); } // internal api @@ -240,7 +240,11 @@ export class FluencePeer { * Initiates a new particle execution starting from local peer * @param particle - particle to start execution of */ - initiateParticle: (particle: Particle): void => { + initiateParticle: (particle: Particle, onStageChange: (stage: ParticleExecutionStage) => void): void => { + if (!this.getStatus().isInitialized) { + throw 'Cannot initiate new particle: peer is not initialized'; + } + if (particle.initPeerId === undefined) { particle.initPeerId = this.getStatus().peerId; } @@ -249,8 +253,12 @@ export class FluencePeer { particle.ttl = this._defaultTTL; } - this._incomingParticles.next(particle); + this._incomingParticles.next({ + particle: particle, + onStageChange: onStageChange, + }); }, + /** * Register Call Service handler functions */ @@ -283,12 +291,6 @@ export class FluencePeer { psh.set(serviceFnKey(serviceId, fnName), handler); }, - /** - * Register handler which will be called upon particle timeout - */ - timeout: (particleId: string, handler: () => void) => { - this._timeoutHandlers.set(particleId, handler); - }, }, /** @@ -303,7 +305,15 @@ export class FluencePeer { timeout: request.timeout, }); - this.internals.initiateParticle(particle); + this.internals.initiateParticle(particle, (stage) => { + if (stage.stage === 'interpreterError') { + request?.error(stage.errorMessage); + } + + if (stage.stage === 'expired') { + request?.timeout(); + } + }); }, /** @@ -334,14 +344,13 @@ export class FluencePeer { // Queues for incoming and outgoing particles - private _incomingParticles = new Subject(); - private _outgoingParticles = new Subject(); + private _incomingParticles = new Subject(); + private _outgoingParticles = new Subject(); // Call service handler private _particleSpecificHandlers = new Map>(); private _commonHandlers = new Map(); - private _timeoutHandlers = new Map void>(); // Internal peer state @@ -351,15 +360,18 @@ export class FluencePeer { private _connection: FluenceConnection; private _interpreter: AirInterpreter; private _timeouts: Array = []; - private _particleQueues = new Map>(); + private _particleQueues = new Map>(); private _startParticleProcessing() { this._incomingParticles .pipe( - tap((x) => x.logTo('debug', 'particle received:')), + tap((x) => { + x.particle.logTo('debug', 'particle received:'); + }), filterExpiredParticles(this._expireParticle.bind(this)), ) - .subscribe((p) => { + .subscribe((item) => { + const p = item.particle; let particlesQueue = this._particleQueues.get(p.id); if (!particlesQueue) { @@ -367,34 +379,35 @@ export class FluencePeer { this._particleQueues.set(p.id, particlesQueue); const timeout = setTimeout(() => { - this._expireParticle(p.id); + this._expireParticle(item); }, p.actualTtl()); this._timeouts.push(timeout); } - particlesQueue.next(p); + particlesQueue.next(item); }); - this._outgoingParticles.subscribe((p) => { - this._connection.sendParticle(p); + this._outgoingParticles.subscribe(async (item) => { + await this._connection.sendParticle(item.particle); + item.onStageChange({ stage: 'sent' }); }); } - private _expireParticle(particleId: string) { - log.debug(`particle ${particleId} has expired. Deleting particle-related queues and handlers`); + private _expireParticle(item: ParticleQueueItem) { + const particleId = item.particle.id; + log.debug( + `particle ${particleId} has expired after ${item.particle.ttl}. Deleting particle-related queues and handlers`, + ); this._particleQueues.delete(particleId); - const timeoutHandler = this._timeoutHandlers.get(particleId); - if (timeoutHandler) { - timeoutHandler(); - } this._particleSpecificHandlers.delete(particleId); - this._timeoutHandlers.delete(particleId); + + item.onStageChange({ stage: 'expired' }); } private _createParticlesProcessingQueue() { - let particlesQueue = new Subject(); + let particlesQueue = new Subject(); let prevData: Uint8Array = Buffer.from([]); particlesQueue @@ -402,28 +415,42 @@ export class FluencePeer { // force new line filterExpiredParticles(this._expireParticle.bind(this)), ) - .subscribe((x) => { - const result = runInterpreter(this.getStatus().peerId, this._interpreter, x, prevData); + .subscribe((item) => { + const particle = item.particle; + const result = runInterpreter(this.getStatus().peerId, this._interpreter, particle, prevData); - prevData = Buffer.from(result.data); + // Do not continue if there was an error in particle interpretation + if (isInterpretationSuccessful(result)) { + item.onStageChange({ stage: 'interpreterError', errorMessage: result.errorMessage }); + return; + } + + setTimeout(() => { + item.onStageChange({ stage: 'interpreted' }); + }, 0); + + const newData = Buffer.from(result.data); + prevData = newData; // send particle further if requested if (result.nextPeerPks.length > 0) { - const newParticle = x.clone(); - newParticle.data = prevData; - this._outgoingParticles.next(newParticle); + const newParticle = particle.clone(); + newParticle.data = newData; + this._outgoingParticles.next({ ...item, particle: newParticle }); } // execute call requests if needed // and put particle with the results back to queue if (result.callRequests.length > 0) { - this._execCallRequests(x, result.callRequests).then((callResults) => { - const newParticle = x.clone(); + this._execCallRequests(particle, result.callRequests).then((callResults) => { + const newParticle = particle.clone(); newParticle.callResults = callResults; newParticle.data = Buffer.from([]); - particlesQueue.next(newParticle); + particlesQueue.next({ ...item, particle: newParticle }); }); + } else { + item.onStageChange({ stage: 'localWorkDone' }); } }); @@ -547,6 +574,10 @@ export class FluencePeer { private _legacyCallServiceHandler: LegacyCallServiceHandler; } +function isInterpretationSuccessful(result: InterpreterResult) { + return result.retCode !== 0 || result?.errorMessage?.length > 0; +} + function serviceFnKey(serviceId: string, fnName: string) { return `${serviceId}/${fnName}`; } @@ -582,17 +613,22 @@ function runInterpreter( const toLog: any = { ...interpreterResult }; toLog.data = dataToString(toLog.data); - log.debug('Interpreter result: ', toLog); + + if (isInterpretationSuccessful(interpreterResult)) { + log.debug('Interpreter result: ', toLog); + } else { + log.error('Interpreter failed: ', toLog); + } return interpreterResult; } -function filterExpiredParticles(onParticleExpiration: (particleId: string) => void) { +function filterExpiredParticles(onParticleExpiration: (item: ParticleQueueItem) => void) { return pipe( - tap((p: Particle) => { - if (p.hasExpired()) { - onParticleExpiration(p.id); + tap((item: ParticleQueueItem) => { + if (item.particle.hasExpired()) { + onParticleExpiration(item); } }), - filter((x: Particle) => !x.hasExpired()), + filter((x: ParticleQueueItem) => !x.particle.hasExpired()), ); } diff --git a/src/internal/Particle.ts b/src/internal/Particle.ts index 519ec66b..f22c4e37 100644 --- a/src/internal/Particle.ts +++ b/src/internal/Particle.ts @@ -140,6 +140,19 @@ export class Particle { } } +export type ParticleExecutionStage = + | { stage: 'received' } + | { stage: 'interpreted' } + | { stage: 'interpreterError'; errorMessage: string } + | { stage: 'localWorkDone' } + | { stage: 'sent' } + | { stage: 'expired' }; + +export interface ParticleQueueItem { + particle: Particle; + onStageChange: (state: ParticleExecutionStage) => void; +} + function genUUID() { return uuidv4(); } diff --git a/src/internal/commonTypes.ts b/src/internal/commonTypes.ts index 34ba66ef..02d2e92d 100644 --- a/src/internal/commonTypes.ts +++ b/src/internal/commonTypes.ts @@ -68,12 +68,28 @@ export enum ResultCodes { */ export interface ParticleContext { /** - * The particle ID + * The identifier of particle which triggered the call */ particleId: string; + + /** + * The peer id which created the particle + */ initPeerId: PeerIdB58; + + /** + * Particle's timestamp when it was created + */ timestamp: number; + + /** + * Time to live in milliseconds. The time after the particle should be expired + */ ttl: number; + + /** + * Particle's signature + */ signature: string; } diff --git a/src/internal/compilerSupport/v2.ts b/src/internal/compilerSupport/v2.ts index 0a7d8088..5c2fccd9 100644 --- a/src/internal/compilerSupport/v2.ts +++ b/src/internal/compilerSupport/v2.ts @@ -320,22 +320,25 @@ export function callFunction(rawFnArgs: Array, def: FunctionCallDef, script }; }); - // registering handler for particle timeout - peer.internals.regHandler.timeout(particle.id, () => { - reject(`Request timed out for ${def.functionName}`); - }); + peer.internals.initiateParticle(particle, (stage) => { + // If function is void, then it's completed when one of the two conditions is met: + // 1. The particle is sent to the network (state 'sent') + // 2. All CallRequests are executed, e.g., all variable loading and local function calls are completed (state 'localWorkDone') + if (def.returnType.tag === 'void' && (stage.stage === 'sent' || stage.stage === 'localWorkDone')) { + resolve(undefined); + } - peer.internals.initiateParticle(particle); + if (stage.stage === 'expired') { + reject(`Request timed out after ${particle.ttl} for ${def.functionName}`); + } + + if (stage.stage === 'interpreterError') { + reject(`Script interpretation failed for ${def.functionName}: ${stage.errorMessage}`); + } + }); }); - // if the function has void type we should resolve immediately for API symmetry with non-void types - // to help with debugging we are returning a promise which can be used to track particle errors - // we cannot return a bare promise because JS will lift it, so returning an array with the promise - if (def.returnType.tag === 'void') { - return Promise.resolve([promise]); - } else { - return promise; - } + return promise; } /** diff --git a/src/internal/utils.ts b/src/internal/utils.ts index aa3b198b..2146dc6b 100644 --- a/src/internal/utils.ts +++ b/src/internal/utils.ts @@ -18,7 +18,7 @@ import { AirInterpreter, LogLevel as AvmLogLevel } from '@fluencelabs/avm'; import log from 'loglevel'; import { CallServiceData, CallServiceResult, CallServiceResultType, ResultCodes } from './commonTypes'; import { AvmLoglevel, FluencePeer } from './FluencePeer'; -import { Particle } from './Particle'; +import { Particle, ParticleExecutionStage } from './Particle'; export const createInterpreter = (logLevel: AvmLoglevel): Promise => { const logFn = (level: AvmLogLevel, msg: string) => { @@ -53,6 +53,14 @@ export const MakeServiceCall = (fn: (args: any[]) => CallServiceResultType) => { }; }; +export const handleTimeout = (fn: Function) => (stage: ParticleExecutionStage) => { + if (stage.stage === 'expired') { + fn(); + } +}; + +export const doNothing = (stage: ParticleExecutionStage) => {}; + /** * Checks the network connection by sending a ping-like request to relay node * @param { FluenceClient } peer - The Fluence Client instance. @@ -127,11 +135,12 @@ export const checkConnection = async (peer: FluencePeer, ttl?: number): Promise< }), ); - peer.internals.regHandler.timeout(particle.id, () => { - reject('particle timed out'); - }); - - peer.internals.initiateParticle(particle); + peer.internals.initiateParticle( + particle, + handleTimeout(() => { + reject('particle timed out'); + }), + ); }); try {