Stepper integration impl (#955)

This commit is contained in:
Dima
2020-10-12 14:07:28 +03:00
committed by GitHub
parent 0a49f84d08
commit 2792dcfc93
21 changed files with 569 additions and 136 deletions

View File

@ -16,13 +16,14 @@
import {Particle} from "./particle";
import {StepperOutcome} from "./stepperOutcome";
import * as PeerId from "peer-id";
import Multiaddr from "multiaddr"
import {FluenceConnection} from "./fluenceConnection";
import {Subscriptions} from "./subscriptions";
import * as stepper from "../stepper";
const WASM = stepper.loadWasm();
import {addParticle, getCurrentParticleId, popParticle, setCurrentParticleId} from "./globalState";
import {instantiateStepper, Stepper} from "./stepper";
import log from "loglevel";
export class FluenceClient {
readonly selfPeerId: PeerId;
@ -30,6 +31,7 @@ export class FluenceClient {
private nodePeerIdStr: string;
private subscriptions = new Subscriptions();
private stepper: Stepper = undefined;
connection: FluenceConnection;
@ -39,38 +41,63 @@ export class FluenceClient {
}
/**
* Waits a response that match the predicate.
*
* @param id
* @param ttl
* Pass a particle to a stepper and send a result to other services.
*/
waitResponse(id: string, ttl: number): Promise<Particle> {
return new Promise((resolve, reject) => {
// subscribe for responses, to handle response
// TODO if there's no conn, reject
this.subscriptions.subscribe(id, (particle: Particle) => {
resolve(particle);
}, ttl);
})
private handleParticle(particle: Particle): void {
// if a current particle is processing, add new particle to the queue
if (getCurrentParticleId() !== undefined) {
addParticle(particle);
} else {
if (this.stepper === undefined) {
throw new Error("Undefined. Stepper is not initialized. User 'Fluence.connect' to create a client.")
}
// start particle processing if queue is empty
try {
let stepperOutcomeStr = this.stepper(particle.init_peer_id, particle.script, JSON.stringify(particle.data))
let stepperOutcome: StepperOutcome = JSON.parse(stepperOutcomeStr);
log.info("inner stepper outcome:");
log.info(stepperOutcome);
// do nothing if there is no `next_peer_pks`
if (stepperOutcome.next_peer_pks.length > 0) {
let newParticle: Particle = {...particle};
newParticle.data = JSON.parse(stepperOutcome.data);
this.connection.sendParticle(newParticle).catch((reason) => {
console.error(`Error on sending particle with id ${particle.id}: ${reason}`)
});
}
} finally {
// get last particle from the queue
let nextParticle = popParticle();
// start the processing of a new particle if it exists
if (nextParticle) {
// update current particle
setCurrentParticleId(nextParticle.id);
this.handleParticle(nextParticle)
} else {
// wait for a new call (do nothing) if there is no new particle in a queue
setCurrentParticleId(undefined);
}
}
}
}
/**
* Handle incoming call.
* If FunctionCall returns - we should send it as a response.
* Handle incoming particle from a relay.
*/
handleParticle(): (particle: Particle) => void {
private handleExternalParticle(): (particle: Particle) => void {
let _this = this;
return (particle: Particle) => {
// call all subscriptions for a new call
if (!_this.subscriptions.applyToSubscriptions(particle)) {
// if there is no subscription, use Stepper
WASM.then((w) => {
let stepperOutcomeStr = w.invoke(particle.init_peer_id, particle.script, JSON.stringify(particle.data))
let stepperOutcome: StepperOutcome = JSON.parse(stepperOutcomeStr);
console.log(stepperOutcome)
})
let now = Date.now();
if (particle.timestamp + particle.ttl > now) {
_this.handleParticle(particle);
} else {
console.log(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`)
}
}
}
@ -102,15 +129,19 @@ export class FluenceClient {
}
let peerId = PeerId.createFromB58String(nodePeerId);
let connection = new FluenceConnection(multiaddr, peerId, this.selfPeerId, this.handleParticle());
this.stepper = await instantiateStepper(this.selfPeerId);
let connection = new FluenceConnection(multiaddr, peerId, this.selfPeerId, this.handleExternalParticle());
await connection.connect();
this.connection = connection;
}
async sendParticle(particle: Particle): Promise<Particle> {
await this.connection.sendParticle(particle);
return this.waitResponse(particle.id, particle.ttl);
sendParticle(particle: Particle): string {
this.handleParticle(particle);
this.subscriptions.subscribe(particle.id, particle.ttl);
return particle.id
}
}