From defe9614136b03015f031fee5769ef8419379979 Mon Sep 17 00:00:00 2001 From: Pavel Date: Wed, 17 Nov 2021 09:21:32 +0300 Subject: [PATCH] Implement peer.timeout built-in function (#101) --- package-lock.json | 10 ++- src/__test__/integration/avm.spec.ts | 86 +++++++++++++++++++++++- src/__test__/unit/builtInHandler.spec.ts | 59 ++++++++-------- src/internal/FluencePeer.ts | 80 ++++++++++------------ src/internal/Particle.ts | 1 + src/internal/compilerSupport/v2.ts | 4 ++ src/internal/defaultServices.ts | 17 +++++ 7 files changed, 180 insertions(+), 77 deletions(-) diff --git a/package-lock.json b/package-lock.json index 2de2e938..3e245613 100644 --- a/package-lock.json +++ b/package-lock.json @@ -5580,7 +5580,10 @@ "name": "@achingbrain/node-fetch", "version": "2.6.7", "resolved": "https://registry.npmjs.org/@achingbrain/node-fetch/-/node-fetch-2.6.7.tgz", - "integrity": "sha512-iTASGs+HTFK5E4ZqcMsHmeJ4zodyq8L38lZV33jwqcBJYoUt3HjN4+ot+O9/0b+ke8ddE7UgOtVuZN/OkV19/g==" + "integrity": "sha512-iTASGs+HTFK5E4ZqcMsHmeJ4zodyq8L38lZV33jwqcBJYoUt3HjN4+ot+O9/0b+ke8ddE7UgOtVuZN/OkV19/g==", + "engines": { + "node": "4.x || >=6.0.0" + } }, "node_modules/node-forge": { "version": "0.10.0", @@ -7252,11 +7255,6 @@ "safer-buffer": "^2.0.2", "tweetnacl": "~0.14.0" }, - "bin": { - "sshpk-conv": "bin/sshpk-conv", - "sshpk-sign": "bin/sshpk-sign", - "sshpk-verify": "bin/sshpk-verify" - }, "engines": { "node": ">=0.10.0" } diff --git a/src/__test__/integration/avm.spec.ts b/src/__test__/integration/avm.spec.ts index a9b90e27..46f6dc12 100644 --- a/src/__test__/integration/avm.spec.ts +++ b/src/__test__/integration/avm.spec.ts @@ -1,4 +1,4 @@ -import { FluencePeer } from '../../index'; +import { FluencePeer, setLogLevel } from '../../index'; import { Particle } from '../../internal/Particle'; import { handleTimeout } from '../../internal/utils'; import { registerHandlersHelper } from '../util'; @@ -72,4 +72,88 @@ describe('Avm spec', () => { await peer.stop(); }); + + it('Timeout in par call: race', async () => { + // arrange + const peer = new FluencePeer(); + await peer.start(); + + // act + const promise = new Promise((resolve, reject) => { + const script = ` + (seq + (call %init_peer_id% ("op" "identity") ["slow_result"] arg) + (seq + (par + (call %init_peer_id% ("peer" "timeout") [1000 arg] $result) + (call %init_peer_id% ("op" "identity") ["fast_result"] $result) + ) + (call %init_peer_id% ("return" "return") [$result.$[0]]) + ) + ) + `; + const particle = Particle.createNew(script); + registerHandlersHelper(peer, particle, { + return: { + return: (args) => { + resolve(args[0]); + }, + }, + }); + + peer.internals.initiateParticle(particle, handleTimeout(reject)); + }); + + // assert + const res = await promise; + expect(res).toBe('fast_result'); + + await peer.stop(); + }); + + it('Timeout in par call: wait', async () => { + // arrange + const peer = new FluencePeer(); + await peer.start(); + + // act + const promise = new Promise((resolve, reject) => { + const script = ` + (seq + (call %init_peer_id% ("op" "identity") ["timeout_msg"] arg) + (seq + (seq + (par + (call %init_peer_id% ("peer" "timeout") [1000 arg] $ok_or_err) + (call "invalid_peer" ("op" "identity") ["never"] $ok_or_err) + ) + (xor + (match $ok_or_err.$[0] "timeout_msg" + (ap "failed_with_timeout" $result) + ) + (ap "impossible happened" $result) + ) + ) + (call %init_peer_id% ("return" "return") [$result.$[0]]) + ) + ) + `; + const particle = Particle.createNew(script); + registerHandlersHelper(peer, particle, { + return: { + return: (args) => { + resolve(args[0]); + }, + }, + }); + + peer.internals.initiateParticle(particle, handleTimeout(reject)); + }); + + // assert + const res = await promise; + expect(res).toBe('failed_with_timeout'); + + await peer.stop(); + }); }); diff --git a/src/__test__/unit/builtInHandler.spec.ts b/src/__test__/unit/builtInHandler.spec.ts index ef3a4336..50cb4943 100644 --- a/src/__test__/unit/builtInHandler.spec.ts +++ b/src/__test__/unit/builtInHandler.spec.ts @@ -5,39 +5,44 @@ import { defaultServices } from '../../internal/defaultServices'; describe('Tests for default handler', () => { // prettier-ignore each` - fnName | args | retCode | result - ${'identity'} | ${[]} | ${0} | ${{}} - ${'identity'} | ${[1]} | ${0} | ${1} - ${'identity'} | ${[1, 2]} | ${1} | ${'identity accepts up to 1 arguments, received 2 arguments'} - - ${'noop'} | ${[1, 2]} | ${0} | ${{}} - - ${'array'} | ${[1, 2, 3]} | ${0} | ${[1, 2, 3]} - - ${'concat'} | ${[[1, 2], [3, 4], [5, 6]]} | ${0} | ${[1, 2, 3, 4, 5, 6]} - ${'concat'} | ${[[1, 2]]} | ${0} | ${[1, 2]} - ${'concat'} | ${[]} | ${0} | ${[]} - ${'concat'} | ${[1, [1, 2], 1]} | ${1} | ${"All arguments of 'concat' must be arrays: arguments 0, 2 are not"} + serviceId | fnName | args | retCode | result + ${'op'} | ${'identity'} | ${[]} | ${0} | ${{}} + ${'op'} | ${'identity'} | ${[1]} | ${0} | ${1} + ${'op'} | ${'identity'} | ${[1, 2]} | ${1} | ${'identity accepts up to 1 arguments, received 2 arguments'} - ${'string_to_b58'} | ${["test"]} | ${0} | ${"3yZe7d"} - ${'string_to_b58'} | ${["test", 1]} | ${1} | ${"string_to_b58 accepts only one string argument"} - - ${'string_from_b58'} | ${["3yZe7d"]} | ${0} | ${"test"} - ${'string_from_b58'} | ${["3yZe7d", 1]} | ${1} | ${"string_from_b58 accepts only one string argument"} - - ${'bytes_to_b58'} | ${[[116, 101, 115, 116]]} | ${0} | ${"3yZe7d"} - ${'bytes_to_b58'} | ${[[116, 101, 115, 116], 1]} | ${1} | ${"bytes_to_b58 accepts only single argument: array of numbers"} - - ${'bytes_from_b58'} | ${["3yZe7d"]} | ${0} | ${[116, 101, 115, 116]} - ${'bytes_from_b58'} | ${["3yZe7d", 1]} | ${1} | ${"bytes_from_b58 accepts only one string argument"} + ${'op'} | ${'noop'} | ${[1, 2]} | ${0} | ${{}} -`.test( + ${'op'} | ${'array'} | ${[1, 2, 3]} | ${0} | ${[1, 2, 3]} + + ${'op'} | ${'concat'} | ${[[1, 2], [3, 4], [5, 6]]} | ${0} | ${[1, 2, 3, 4, 5, 6]} + ${'op'} | ${'concat'} | ${[[1, 2]]} | ${0} | ${[1, 2]} + ${'op'} | ${'concat'} | ${[]} | ${0} | ${[]} + ${'op'} | ${'concat'} | ${[1, [1, 2], 1]} | ${1} | ${"All arguments of 'concat' must be arrays: arguments 0, 2 are not"} + + ${'op'} | ${'string_to_b58'} | ${["test"]} | ${0} | ${"3yZe7d"} + ${'op'} | ${'string_to_b58'} | ${["test", 1]} | ${1} | ${"string_to_b58 accepts only one string argument"} + + ${'op'} | ${'string_from_b58'} | ${["3yZe7d"]} | ${0} | ${"test"} + ${'op'} | ${'string_from_b58'} | ${["3yZe7d", 1]} | ${1} | ${"string_from_b58 accepts only one string argument"} + + ${'op'} | ${'bytes_to_b58'} | ${[[116, 101, 115, 116]]} | ${0} | ${"3yZe7d"} + ${'op'} | ${'bytes_to_b58'} | ${[[116, 101, 115, 116], 1]} | ${1} | ${"bytes_to_b58 accepts only single argument: array of numbers"} + + ${'op'} | ${'bytes_from_b58'} | ${["3yZe7d"]} | ${0} | ${[116, 101, 115, 116]} + ${'op'} | ${'bytes_from_b58'} | ${["3yZe7d", 1]} | ${1} | ${"bytes_from_b58 accepts only one string argument"} + + ${'peer'} | ${'timeout'} | ${[200, []]} | ${0} | ${[]}} + ${'peer'} | ${'timeout'} | ${[200, ['test']]} | ${0} | ${['test']}} + ${'peer'} | ${'timeout'} | ${[]} | ${1} | ${'timeout accepts exactly two arguments: timeout duration in ms and an optional message string'}} + ${'peer'} | ${'timeout'} | ${[200, 'test', 1]} | ${1} | ${'timeout accepts exactly two arguments: timeout duration in ms and an optional message string'}} + + `.test( // '$fnName with $args expected retcode: $retCode and result: $result', - async ({ fnName, args, retCode, result }) => { + async ({ serviceId, fnName, args, retCode, result }) => { // arrange const req: CallServiceData = { - serviceId: 'op', + serviceId: serviceId, fnName: fnName, args: args, tetraplets: [], diff --git a/src/internal/FluencePeer.ts b/src/internal/FluencePeer.ts index 60ea10ea..d795072a 100644 --- a/src/internal/FluencePeer.ts +++ b/src/internal/FluencePeer.ts @@ -389,6 +389,11 @@ export class FluencePeer { }); this._outgoingParticles.subscribe(async (item) => { + if (!this._connection) { + item.particle.logTo('error', 'cannot send particle, peer is not connected'); + item.onStageChange({ stage: 'sendingError' }); + return; + } await this._connection.sendParticle(item.particle); item.onStageChange({ stage: 'sent' }); }); @@ -442,13 +447,37 @@ export class FluencePeer { // execute call requests if needed // and put particle with the results back to queue if (result.callRequests.length > 0) { - this._execCallRequests(particle, result.callRequests).then((callResults) => { - const newParticle = particle.clone(); - newParticle.callResults = callResults; - newParticle.data = Buffer.from([]); + for (let [key, cr] of result.callRequests) { + const req = { + fnName: cr.functionName, + args: cr.arguments, + serviceId: cr.serviceId, + tetraplets: cr.tetraplets, + particleContext: particle.getParticleContext(), + }; - particlesQueue.next({ ...item, particle: newParticle }); - }); + this._execSingleCallRequest(req) + .catch( + (err): CallServiceResult => ({ + retCode: ResultCodes.exceptionInHandler, + result: `Handler failed. fnName="${req.fnName}" serviceId="${ + req.serviceId + }" error: ${err.toString()}`, + }), + ) + .then((res) => { + const serviceResult = { + result: JSON.stringify(res.result), + retCode: res.retCode, + }; + + const newParticle = particle.clone(); + newParticle.callResults = [[key, serviceResult]]; + newParticle.data = Buffer.from([]); + + particlesQueue.next({ ...item, particle: newParticle }); + }); + } } else { item.onStageChange({ stage: 'localWorkDone' }); } @@ -457,44 +486,8 @@ export class FluencePeer { return particlesQueue; } - private async _execCallRequests(p: Particle, callRequests: CallRequestsArray): Promise { - // execute all requests asynchronously - const promises = callRequests.map(([key, callRequest]) => { - const req = { - fnName: callRequest.functionName, - args: callRequest.arguments, - serviceId: callRequest.serviceId, - tetraplets: callRequest.tetraplets, - particleContext: p.getParticleContext(), - }; - - // execute single requests and catch possible errors - const promise = this._execSingleCallRequest(req) - .catch( - (err): CallServiceResult => ({ - retCode: ResultCodes.exceptionInHandler, - result: `Handler failed. fnName="${req.fnName}" serviceId="${ - req.serviceId - }" error: ${err.toString()}`, - }), - ) - .then( - (res): AvmCallServiceResult => ({ - result: JSON.stringify(res.result), - retCode: res.retCode, - }), - ) - .then((res): [key: number, res: AvmCallServiceResult] => [key, res]); - - return promise; - }); - // don't block - const res = await Promise.all(promises); - log.debug(`Executed call service for particle id=${p.id}, Call service results: `, res); - return res; - } - private async _execSingleCallRequest(req: CallServiceData): Promise { + log.debug('executing call service handler', req); const particleId = req.particleContext.particleId; // trying particle-specific handler @@ -545,6 +538,7 @@ export class FluencePeer { res.result = null; } + log.debug('executed call service handler, req and res are: ', req, res); return res; } diff --git a/src/internal/Particle.ts b/src/internal/Particle.ts index f22c4e37..dba8747d 100644 --- a/src/internal/Particle.ts +++ b/src/internal/Particle.ts @@ -146,6 +146,7 @@ export type ParticleExecutionStage = | { stage: 'interpreterError'; errorMessage: string } | { stage: 'localWorkDone' } | { stage: 'sent' } + | { stage: 'sendingError' } | { stage: 'expired' }; export interface ParticleQueueItem { diff --git a/src/internal/compilerSupport/v2.ts b/src/internal/compilerSupport/v2.ts index ffc17c2f..c2fa351f 100644 --- a/src/internal/compilerSupport/v2.ts +++ b/src/internal/compilerSupport/v2.ts @@ -328,6 +328,10 @@ export function callFunction(rawFnArgs: Array, def: FunctionCallDef, script resolve(undefined); } + if (stage.stage === 'sendingError') { + reject(`Could not send particle for ${def.functionName}: not connected`); + } + if (stage.stage === 'expired') { reject(`Request timed out after ${particle.ttl} for ${def.functionName}`); } diff --git a/src/internal/defaultServices.ts b/src/internal/defaultServices.ts index 8fd71214..2f19ffc7 100644 --- a/src/internal/defaultServices.ts +++ b/src/internal/defaultServices.ts @@ -99,6 +99,23 @@ export const defaultServices: { [serviceId in string]: { [fnName in string]: Gen }, peer: { + timeout: (req) => { + if (req.args.length !== 2) { + return error( + 'timeout accepts exactly two arguments: timeout duration in ms and an optional message string', + ); + } + const durationMs = req.args[0]; + const message = req.args[1]; + + return new Promise((resolve) => { + setTimeout(() => { + const res = success(message); + resolve(res); + }, durationMs); + }); + }, + identify: (req) => { return error('The JS implementation of Peer does not support identify'); },