diff --git a/src/__test__/integration/builtins.spec.ts b/src/__test__/integration/builtins.spec.ts index 18af9b33..db31fddc 100644 --- a/src/__test__/integration/builtins.spec.ts +++ b/src/__test__/integration/builtins.spec.ts @@ -71,7 +71,7 @@ describe('Builtins usage suite', () => { let base64 = 'MjNy'; - await uploadModule(client, 'test_broken_module', base64, config); + await uploadModule(client, 'test_broken_module', base64, config, 10000); }); it('add_blueprint', async function () { @@ -101,9 +101,9 @@ describe('Builtins usage suite', () => { let buf = Buffer.from(key); let r = Math.random().toString(36).substring(7); - await addProvider(client, buf, dev2peerId, r); + await addProvider(client, buf, dev2peerId, r, undefined, 10000); - let pr = await getProviders(client, buf); + let pr = await getProviders(client, buf, undefined, 10000); console.log(pr); console.log(r); expect(r).toEqual(pr[0][0].service_id); diff --git a/src/__test__/unit/air.spec.ts b/src/__test__/unit/air.spec.ts index 97547b0a..577fd390 100644 --- a/src/__test__/unit/air.spec.ts +++ b/src/__test__/unit/air.spec.ts @@ -44,6 +44,30 @@ describe('== AIR suite', () => { expect(res).toEqual(arg); }); + it('call broken script', async function () { + const client = await createLocalClient(); + + const script = `(htyth)`; + + await expect(client.sendScript(script)).rejects.toContain("aqua script can't be parsed"); + }); + + it('call script without ttl', async function () { + const client = await createLocalClient(); + + const script = `(call %init_peer_id% ("" "") [""])`; + + await expect(client.sendScript(script, undefined, 1)).rejects.toContain("Particle expired"); + }); + + it.skip('call broken script by fetch', async function () { + const client = await createLocalClient(); + + const script = `(htyth)`; + + await expect(client.fetch(script, ['result'])).rejects.toContain("aqua script can't be parsed"); + }); + it('check particle arguments', async function () { // arrange const serviceId = 'test_service'; diff --git a/src/internal/FluenceClientBase.ts b/src/internal/FluenceClientBase.ts index 804ae9c8..0602ebdb 100644 --- a/src/internal/FluenceClientBase.ts +++ b/src/internal/FluenceClientBase.ts @@ -89,7 +89,7 @@ export abstract class FluenceClientBase { async sendScript(script: string, data?: Map, ttl?: number): Promise { const particle = await build(this.selfPeerIdFull, script, data, ttl); - this.processor.executeLocalParticle(particle); + await this.processor.executeLocalParticle(particle); return particle.id; } } diff --git a/src/internal/FluenceClientImpl.ts b/src/internal/FluenceClientImpl.ts index eb28cc5c..cbcd5d7f 100644 --- a/src/internal/FluenceClientImpl.ts +++ b/src/internal/FluenceClientImpl.ts @@ -73,10 +73,11 @@ export class FluenceClientImpl extends FluenceClientBase implements FluenceClien script = wrapFetchCall(script, callBackId, resultArgNames); const particle = await build(this.selfPeerIdFull, script, data, ttl, callBackId); - return new Promise((resolve, reject) => { + const prFetch = new Promise(async (resolve, reject) => { this.fetchParticles.set(callBackId, { resolve, reject }); - this.processor.executeLocalParticle(particle); }); + const prExec = this.processor.executeLocalParticle(particle); + return prExec.then(() => prFetch); } // TODO:: better naming probably? diff --git a/src/internal/ParticleProcessor.ts b/src/internal/ParticleProcessor.ts index b485b474..7e909ca3 100644 --- a/src/internal/ParticleProcessor.ts +++ b/src/internal/ParticleProcessor.ts @@ -79,8 +79,15 @@ export class ParticleProcessor { async executeLocalParticle(particle: ParticleDto) { this.strategy?.onLocalParticleRecieved(particle); - await this.handleParticle(particle).catch((err) => { - log.error('particle processing failed: ' + err); + return new Promise((resolve, reject) => { + const resolveCallback = function () { + resolve() + } + const rejectCallback = function (err: any) { + reject(err) + } + // we check by callbacks that the script passed through the interpreter without errors + this.handleParticle(particle, resolveCallback, rejectCallback) }); } @@ -143,8 +150,10 @@ export class ParticleProcessor { /** * Pass a particle to a interpreter and send a result to other services. + * `resolve` will be completed if ret_code equals 0 + * `reject` will be completed if ret_code not equals 0 */ - private async handleParticle(particle: ParticleDto): Promise { + private async handleParticle(particle: ParticleDto, resolve?: () => void, reject?: (r: any) => any): Promise { // if a current particle is processing, add new particle to the queue if (this.getCurrentParticleId() !== undefined && this.getCurrentParticleId() !== particle.id) { this.enqueueParticle(particle); @@ -160,6 +169,7 @@ export class ParticleProcessor { let actualTtl = particle.timestamp + particle.ttl - now; if (actualTtl <= 0) { this.strategy?.onParticleTimeout(particle, now); + if (reject) reject(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`) } else { // if there is no subscription yet, previous data is empty let prevData: Uint8Array = Buffer.from([]); @@ -191,6 +201,26 @@ export class ParticleProcessor { if (stepperOutcome.next_peer_pks.length > 0) { this.strategy.sendParticleFurther(newParticle); } + + if (stepperOutcome.ret_code == 0) { + if (resolve) { + resolve() + } + } else { + const error = stepperOutcome.error_message; + if (reject) { + reject(error); + } else { + log.error("Unhandled error: ", error); + } + } + } + } catch (e) { + if (reject) { + reject(e); + } else { + log.error("Unhandled error: ", e) + throw e; } } finally { // get last particle from the queue diff --git a/src/internal/commonTypes.ts b/src/internal/commonTypes.ts index 71a0c566..068a581b 100644 --- a/src/internal/commonTypes.ts +++ b/src/internal/commonTypes.ts @@ -30,6 +30,7 @@ export interface StepperOutcome { ret_code: number; data: Uint8Array; next_peer_pks: string[]; + error_message: string; } export interface ResolvedTriplet {