Extend error handling in FluencePeer (#98)

This commit is contained in:
Pavel
2021-11-09 14:37:44 +03:00
committed by GitHub
parent 337a3f45de
commit aa21abe465
12 changed files with 214 additions and 103 deletions

View File

@ -27,13 +27,14 @@ import { CallServiceData, CallServiceResult, GenericCallServiceHandler, ResultCo
import { CallServiceHandler as LegacyCallServiceHandler } from './compilerSupport/LegacyCallServiceHandler';
import { PeerIdB58 } from './commonTypes';
import { FluenceConnection } from './FluenceConnection';
import { Particle } from './Particle';
import { Particle, ParticleExecutionStage, ParticleQueueItem } from './Particle';
import { KeyPair } from './KeyPair';
import { createInterpreter, dataToString } from './utils';
import { filter, pipe, Subject, tap } from 'rxjs';
import { RequestFlow } from './compilerSupport/v1';
import log from 'loglevel';
import { defaultServices } from './defaultServices';
import { instanceOf } from 'ts-pattern';
/**
* Node of the Fluence network specified as a pair of node's multiaddr and it's peer id
@ -202,7 +203,7 @@ export class FluencePeer {
peerId: this._keyPair.Libp2pPeerId,
relayAddress: connectToMultiAddr,
dialTimeoutMs: config.dialTimeoutMs,
onIncomingParticle: (p) => this._incomingParticles.next(p),
onIncomingParticle: (p) => this._incomingParticles.next({ particle: p, onStageChange: () => {} }),
});
await this._connect();
@ -226,7 +227,6 @@ export class FluencePeer {
this._particleSpecificHandlers.clear();
this._commonHandlers.clear();
this._timeoutHandlers.clear();
}
// internal api
@ -240,7 +240,11 @@ export class FluencePeer {
* Initiates a new particle execution starting from local peer
* @param particle - particle to start execution of
*/
initiateParticle: (particle: Particle): void => {
initiateParticle: (particle: Particle, onStageChange: (stage: ParticleExecutionStage) => void): void => {
if (!this.getStatus().isInitialized) {
throw 'Cannot initiate new particle: peer is not initialized';
}
if (particle.initPeerId === undefined) {
particle.initPeerId = this.getStatus().peerId;
}
@ -249,8 +253,12 @@ export class FluencePeer {
particle.ttl = this._defaultTTL;
}
this._incomingParticles.next(particle);
this._incomingParticles.next({
particle: particle,
onStageChange: onStageChange,
});
},
/**
* Register Call Service handler functions
*/
@ -283,12 +291,6 @@ export class FluencePeer {
psh.set(serviceFnKey(serviceId, fnName), handler);
},
/**
* Register handler which will be called upon particle timeout
*/
timeout: (particleId: string, handler: () => void) => {
this._timeoutHandlers.set(particleId, handler);
},
},
/**
@ -303,7 +305,15 @@ export class FluencePeer {
timeout: request.timeout,
});
this.internals.initiateParticle(particle);
this.internals.initiateParticle(particle, (stage) => {
if (stage.stage === 'interpreterError') {
request?.error(stage.errorMessage);
}
if (stage.stage === 'expired') {
request?.timeout();
}
});
},
/**
@ -334,14 +344,13 @@ export class FluencePeer {
// Queues for incoming and outgoing particles
private _incomingParticles = new Subject<Particle>();
private _outgoingParticles = new Subject<Particle>();
private _incomingParticles = new Subject<ParticleQueueItem>();
private _outgoingParticles = new Subject<ParticleQueueItem>();
// Call service handler
private _particleSpecificHandlers = new Map<string, Map<string, GenericCallServiceHandler>>();
private _commonHandlers = new Map<string, GenericCallServiceHandler>();
private _timeoutHandlers = new Map<string, () => void>();
// Internal peer state
@ -351,15 +360,18 @@ export class FluencePeer {
private _connection: FluenceConnection;
private _interpreter: AirInterpreter;
private _timeouts: Array<NodeJS.Timeout> = [];
private _particleQueues = new Map<string, Subject<Particle>>();
private _particleQueues = new Map<string, Subject<ParticleQueueItem>>();
private _startParticleProcessing() {
this._incomingParticles
.pipe(
tap((x) => x.logTo('debug', 'particle received:')),
tap((x) => {
x.particle.logTo('debug', 'particle received:');
}),
filterExpiredParticles(this._expireParticle.bind(this)),
)
.subscribe((p) => {
.subscribe((item) => {
const p = item.particle;
let particlesQueue = this._particleQueues.get(p.id);
if (!particlesQueue) {
@ -367,34 +379,35 @@ export class FluencePeer {
this._particleQueues.set(p.id, particlesQueue);
const timeout = setTimeout(() => {
this._expireParticle(p.id);
this._expireParticle(item);
}, p.actualTtl());
this._timeouts.push(timeout);
}
particlesQueue.next(p);
particlesQueue.next(item);
});
this._outgoingParticles.subscribe((p) => {
this._connection.sendParticle(p);
this._outgoingParticles.subscribe(async (item) => {
await this._connection.sendParticle(item.particle);
item.onStageChange({ stage: 'sent' });
});
}
private _expireParticle(particleId: string) {
log.debug(`particle ${particleId} has expired. Deleting particle-related queues and handlers`);
private _expireParticle(item: ParticleQueueItem) {
const particleId = item.particle.id;
log.debug(
`particle ${particleId} has expired after ${item.particle.ttl}. Deleting particle-related queues and handlers`,
);
this._particleQueues.delete(particleId);
const timeoutHandler = this._timeoutHandlers.get(particleId);
if (timeoutHandler) {
timeoutHandler();
}
this._particleSpecificHandlers.delete(particleId);
this._timeoutHandlers.delete(particleId);
item.onStageChange({ stage: 'expired' });
}
private _createParticlesProcessingQueue() {
let particlesQueue = new Subject<Particle>();
let particlesQueue = new Subject<ParticleQueueItem>();
let prevData: Uint8Array = Buffer.from([]);
particlesQueue
@ -402,28 +415,42 @@ export class FluencePeer {
// force new line
filterExpiredParticles(this._expireParticle.bind(this)),
)
.subscribe((x) => {
const result = runInterpreter(this.getStatus().peerId, this._interpreter, x, prevData);
.subscribe((item) => {
const particle = item.particle;
const result = runInterpreter(this.getStatus().peerId, this._interpreter, particle, prevData);
prevData = Buffer.from(result.data);
// Do not continue if there was an error in particle interpretation
if (isInterpretationSuccessful(result)) {
item.onStageChange({ stage: 'interpreterError', errorMessage: result.errorMessage });
return;
}
setTimeout(() => {
item.onStageChange({ stage: 'interpreted' });
}, 0);
const newData = Buffer.from(result.data);
prevData = newData;
// send particle further if requested
if (result.nextPeerPks.length > 0) {
const newParticle = x.clone();
newParticle.data = prevData;
this._outgoingParticles.next(newParticle);
const newParticle = particle.clone();
newParticle.data = newData;
this._outgoingParticles.next({ ...item, particle: newParticle });
}
// execute call requests if needed
// and put particle with the results back to queue
if (result.callRequests.length > 0) {
this._execCallRequests(x, result.callRequests).then((callResults) => {
const newParticle = x.clone();
this._execCallRequests(particle, result.callRequests).then((callResults) => {
const newParticle = particle.clone();
newParticle.callResults = callResults;
newParticle.data = Buffer.from([]);
particlesQueue.next(newParticle);
particlesQueue.next({ ...item, particle: newParticle });
});
} else {
item.onStageChange({ stage: 'localWorkDone' });
}
});
@ -547,6 +574,10 @@ export class FluencePeer {
private _legacyCallServiceHandler: LegacyCallServiceHandler;
}
function isInterpretationSuccessful(result: InterpreterResult) {
return result.retCode !== 0 || result?.errorMessage?.length > 0;
}
function serviceFnKey(serviceId: string, fnName: string) {
return `${serviceId}/${fnName}`;
}
@ -582,17 +613,22 @@ function runInterpreter(
const toLog: any = { ...interpreterResult };
toLog.data = dataToString(toLog.data);
log.debug('Interpreter result: ', toLog);
if (isInterpretationSuccessful(interpreterResult)) {
log.debug('Interpreter result: ', toLog);
} else {
log.error('Interpreter failed: ', toLog);
}
return interpreterResult;
}
function filterExpiredParticles(onParticleExpiration: (particleId: string) => void) {
function filterExpiredParticles(onParticleExpiration: (item: ParticleQueueItem) => void) {
return pipe(
tap((p: Particle) => {
if (p.hasExpired()) {
onParticleExpiration(p.id);
tap((item: ParticleQueueItem) => {
if (item.particle.hasExpired()) {
onParticleExpiration(item);
}
}),
filter((x: Particle) => !x.hasExpired()),
filter((x: ParticleQueueItem) => !x.particle.hasExpired()),
);
}