handle errors on local particle call (#25)

This commit is contained in:
Dima
2021-02-24 14:43:21 +03:00
committed by GitHub
parent c10cd1942a
commit 3eacf708e6
6 changed files with 65 additions and 9 deletions

View File

@ -71,7 +71,7 @@ describe('Builtins usage suite', () => {
let base64 = 'MjNy';
await uploadModule(client, 'test_broken_module', base64, config);
await uploadModule(client, 'test_broken_module', base64, config, 10000);
});
it('add_blueprint', async function () {
@ -101,9 +101,9 @@ describe('Builtins usage suite', () => {
let buf = Buffer.from(key);
let r = Math.random().toString(36).substring(7);
await addProvider(client, buf, dev2peerId, r);
await addProvider(client, buf, dev2peerId, r, undefined, 10000);
let pr = await getProviders(client, buf);
let pr = await getProviders(client, buf, undefined, 10000);
console.log(pr);
console.log(r);
expect(r).toEqual(pr[0][0].service_id);

View File

@ -44,6 +44,30 @@ describe('== AIR suite', () => {
expect(res).toEqual(arg);
});
it('call broken script', async function () {
const client = await createLocalClient();
const script = `(htyth)`;
await expect(client.sendScript(script)).rejects.toContain("aqua script can't be parsed");
});
it('call script without ttl', async function () {
const client = await createLocalClient();
const script = `(call %init_peer_id% ("" "") [""])`;
await expect(client.sendScript(script, undefined, 1)).rejects.toContain("Particle expired");
});
it.skip('call broken script by fetch', async function () {
const client = await createLocalClient();
const script = `(htyth)`;
await expect(client.fetch(script, ['result'])).rejects.toContain("aqua script can't be parsed");
});
it('check particle arguments', async function () {
// arrange
const serviceId = 'test_service';

View File

@ -89,7 +89,7 @@ export abstract class FluenceClientBase {
async sendScript(script: string, data?: Map<string, any>, ttl?: number): Promise<string> {
const particle = await build(this.selfPeerIdFull, script, data, ttl);
this.processor.executeLocalParticle(particle);
await this.processor.executeLocalParticle(particle);
return particle.id;
}
}

View File

@ -73,10 +73,11 @@ export class FluenceClientImpl extends FluenceClientBase implements FluenceClien
script = wrapFetchCall(script, callBackId, resultArgNames);
const particle = await build(this.selfPeerIdFull, script, data, ttl, callBackId);
return new Promise<T>((resolve, reject) => {
const prFetch = new Promise<T>(async (resolve, reject) => {
this.fetchParticles.set(callBackId, { resolve, reject });
this.processor.executeLocalParticle(particle);
});
const prExec = this.processor.executeLocalParticle(particle);
return prExec.then(() => prFetch);
}
// TODO:: better naming probably?

View File

@ -79,8 +79,15 @@ export class ParticleProcessor {
async executeLocalParticle(particle: ParticleDto) {
this.strategy?.onLocalParticleRecieved(particle);
await this.handleParticle(particle).catch((err) => {
log.error('particle processing failed: ' + err);
return new Promise((resolve, reject) => {
const resolveCallback = function () {
resolve()
}
const rejectCallback = function (err: any) {
reject(err)
}
// we check by callbacks that the script passed through the interpreter without errors
this.handleParticle(particle, resolveCallback, rejectCallback)
});
}
@ -143,8 +150,10 @@ export class ParticleProcessor {
/**
* Pass a particle to a interpreter and send a result to other services.
* `resolve` will be completed if ret_code equals 0
* `reject` will be completed if ret_code not equals 0
*/
private async handleParticle(particle: ParticleDto): Promise<void> {
private async handleParticle(particle: ParticleDto, resolve?: () => void, reject?: (r: any) => any): Promise<void> {
// if a current particle is processing, add new particle to the queue
if (this.getCurrentParticleId() !== undefined && this.getCurrentParticleId() !== particle.id) {
this.enqueueParticle(particle);
@ -160,6 +169,7 @@ export class ParticleProcessor {
let actualTtl = particle.timestamp + particle.ttl - now;
if (actualTtl <= 0) {
this.strategy?.onParticleTimeout(particle, now);
if (reject) reject(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`)
} else {
// if there is no subscription yet, previous data is empty
let prevData: Uint8Array = Buffer.from([]);
@ -191,6 +201,26 @@ export class ParticleProcessor {
if (stepperOutcome.next_peer_pks.length > 0) {
this.strategy.sendParticleFurther(newParticle);
}
if (stepperOutcome.ret_code == 0) {
if (resolve) {
resolve()
}
} else {
const error = stepperOutcome.error_message;
if (reject) {
reject(error);
} else {
log.error("Unhandled error: ", error);
}
}
}
} catch (e) {
if (reject) {
reject(e);
} else {
log.error("Unhandled error: ", e)
throw e;
}
} finally {
// get last particle from the queue

View File

@ -30,6 +30,7 @@ export interface StepperOutcome {
ret_code: number;
data: Uint8Array;
next_peer_pks: string[];
error_message: string;
}
export interface ResolvedTriplet {