mirror of
https://github.com/fluencelabs/fluence-js.git
synced 2025-07-31 23:11:56 +00:00
fix: Fire and forget [fixes DXJ-446] (#336)
* Add test for particle and try to fix it * Fix build * Fix tests * Fix stop order * FluencePeer refactoring * mplex to yamux * Small fixes * Refactor connections * Update packages/core/js-client/src/jsPeer/FluencePeer.ts Co-authored-by: shamsartem <shamsartem@gmail.com> * Remove redundant checks * Update packages/core/js-client/src/jsPeer/FluencePeer.ts Co-authored-by: shamsartem <shamsartem@gmail.com> * Suppress very long output of raw data * Test for parallel execution * Fix test * Misc optimization * Add minRepr * Fix reset error * Latest default nox image --------- Co-authored-by: shamsartem <shamsartem@gmail.com>
This commit is contained in:
@@ -27,10 +27,10 @@
|
||||
"license": "Apache-2.0",
|
||||
"dependencies": {
|
||||
"@chainsafe/libp2p-noise": "13.0.0",
|
||||
"@chainsafe/libp2p-yamux": "5.0.0",
|
||||
"@fluencelabs/interfaces": "workspace:*",
|
||||
"@libp2p/crypto": "2.0.3",
|
||||
"@libp2p/interface": "0.1.2",
|
||||
"@libp2p/mplex": "9.0.4",
|
||||
"@libp2p/peer-id": "3.0.2",
|
||||
"@libp2p/peer-id-factory": "3.0.3",
|
||||
"@libp2p/websockets": "7.0.4",
|
||||
|
@@ -55,20 +55,13 @@ export const makeClientPeerConfig = async (
|
||||
};
|
||||
|
||||
export class ClientPeer extends FluencePeer implements IFluenceClient {
|
||||
private relayPeerId: PeerIdB58;
|
||||
private relayConnection: RelayConnection;
|
||||
|
||||
constructor(
|
||||
peerConfig: PeerConfig,
|
||||
relayConfig: RelayConnectionConfig,
|
||||
keyPair: KeyPair,
|
||||
marine: IMarineHost,
|
||||
) {
|
||||
const relayConnection = new RelayConnection(relayConfig);
|
||||
|
||||
super(peerConfig, keyPair, marine, new JsServiceHost(), relayConnection);
|
||||
this.relayPeerId = relayConnection.getRelayPeerId();
|
||||
this.relayConnection = relayConnection;
|
||||
super(peerConfig, keyPair, marine, new JsServiceHost(), new RelayConnection(relayConfig));
|
||||
}
|
||||
|
||||
getPeerId(): string {
|
||||
@@ -83,7 +76,7 @@ export class ClientPeer extends FluencePeer implements IFluenceClient {
|
||||
connectionStateChangeHandler: (state: ConnectionState) => void = () => {};
|
||||
|
||||
getRelayPeerId(): string {
|
||||
return this.relayPeerId;
|
||||
return this.internals.getRelayPeerId();
|
||||
}
|
||||
|
||||
onConnectionStateChange(handler: (state: ConnectionState) => void): ConnectionState {
|
||||
@@ -115,7 +108,6 @@ export class ClientPeer extends FluencePeer implements IFluenceClient {
|
||||
log.trace('connecting to Fluence network');
|
||||
this.changeConnectionState('connecting');
|
||||
await super.start();
|
||||
await this.relayConnection.start();
|
||||
// TODO: check connection (`checkConnection` function) here
|
||||
this.changeConnectionState('connected');
|
||||
log.trace('connected');
|
||||
@@ -124,7 +116,6 @@ export class ClientPeer extends FluencePeer implements IFluenceClient {
|
||||
async stop(): Promise<void> {
|
||||
log.trace('disconnecting from Fluence network');
|
||||
this.changeConnectionState('disconnecting');
|
||||
await this.relayConnection.stop();
|
||||
await super.stop();
|
||||
this.changeConnectionState('disconnected');
|
||||
log.trace('disconnected');
|
||||
|
@@ -20,7 +20,7 @@ import type { PeerId } from '@libp2p/interface/peer-id';
|
||||
import { createLibp2p, Libp2p } from 'libp2p';
|
||||
|
||||
import { noise } from '@chainsafe/libp2p-noise';
|
||||
import { mplex } from '@libp2p/mplex';
|
||||
import { yamux } from '@chainsafe/libp2p-yamux';
|
||||
import { webSockets } from '@libp2p/websockets';
|
||||
import { all } from '@libp2p/websockets/filters';
|
||||
import { multiaddr } from '@multiformats/multiaddr';
|
||||
@@ -36,7 +36,8 @@ import { throwIfHasNoPeerId } from '../util/libp2pUtils.js';
|
||||
import { IConnection } from './interfaces.js';
|
||||
import { IParticle } from '../particle/interfaces.js';
|
||||
import { Particle, serializeToString } from '../particle/Particle.js';
|
||||
import { IStartable } from '../util/commonTypes.js';
|
||||
import { identifyService } from 'libp2p/identify';
|
||||
import { pingService } from 'libp2p/ping';
|
||||
|
||||
const log = logger('connection');
|
||||
|
||||
@@ -77,7 +78,7 @@ export interface RelayConnectionConfig {
|
||||
/**
|
||||
* Implementation for JS peers which connects to Fluence through relay node
|
||||
*/
|
||||
export class RelayConnection implements IStartable, IConnection {
|
||||
export class RelayConnection implements IConnection {
|
||||
private relayAddress: Multiaddr;
|
||||
private lib2p2Peer: Libp2p | null = null;
|
||||
|
||||
@@ -110,7 +111,7 @@ export class RelayConnection implements IStartable, IConnection {
|
||||
filter: all,
|
||||
}),
|
||||
],
|
||||
streamMuxers: [mplex()],
|
||||
streamMuxers: [yamux()],
|
||||
connectionEncryption: [noise()],
|
||||
connectionManager: {
|
||||
dialTimeout: this.config.dialTimeoutMs,
|
||||
@@ -118,6 +119,12 @@ export class RelayConnection implements IStartable, IConnection {
|
||||
connectionGater: {
|
||||
// By default, this function forbids connections to private peers. For example multiaddr with ip 127.0.0.1 isn't allowed
|
||||
denyDialMultiaddr: () => Promise.resolve(false)
|
||||
},
|
||||
services: {
|
||||
identify: identifyService({
|
||||
runOnConnectionOpen: false,
|
||||
}),
|
||||
ping: pingService()
|
||||
}
|
||||
});
|
||||
|
||||
@@ -158,15 +165,17 @@ export class RelayConnection implements IStartable, IConnection {
|
||||
const sink = this._connection.streams[0].sink;
|
||||
*/
|
||||
|
||||
log.trace('sending particle...');
|
||||
const stream = await this.lib2p2Peer.dialProtocol(this.relayAddress, PROTOCOL_NAME);
|
||||
log.trace('created stream with id ', stream.id);
|
||||
const sink = stream.sink;
|
||||
|
||||
pipe(
|
||||
await pipe(
|
||||
[fromString(serializeToString(particle))],
|
||||
// @ts-ignore
|
||||
encode(),
|
||||
sink,
|
||||
);
|
||||
log.trace('data written to sink');
|
||||
}
|
||||
|
||||
private async connect() {
|
||||
@@ -174,7 +183,7 @@ export class RelayConnection implements IStartable, IConnection {
|
||||
throw new Error('Relay connection is not started');
|
||||
}
|
||||
|
||||
this.lib2p2Peer.handle(
|
||||
await this.lib2p2Peer.handle(
|
||||
[PROTOCOL_NAME],
|
||||
async ({ connection, stream }) => {
|
||||
pipe(
|
||||
@@ -188,6 +197,7 @@ export class RelayConnection implements IStartable, IConnection {
|
||||
for await (const msg of source) {
|
||||
try {
|
||||
const particle = Particle.fromString(msg);
|
||||
log.trace('got particle from stream with id %s and particle id %s', stream.id, particle.id);
|
||||
this.particleSource.next(particle);
|
||||
} catch (e) {
|
||||
log.error('error on handling a new incoming message: %j', e);
|
||||
|
@@ -16,11 +16,12 @@
|
||||
import type { PeerIdB58 } from '@fluencelabs/interfaces';
|
||||
import type { Subscribable } from 'rxjs';
|
||||
import { IParticle } from '../particle/interfaces.js';
|
||||
import { IStartable } from '../util/commonTypes.js';
|
||||
|
||||
/**
|
||||
* Interface for connection used in Fluence Peer.
|
||||
*/
|
||||
export interface IConnection {
|
||||
export interface IConnection extends IStartable {
|
||||
/**
|
||||
* Observable that emits particles received from the connection.
|
||||
*/
|
||||
|
@@ -129,13 +129,21 @@ export interface IEphemeralConnection extends IConnection {
|
||||
receiveParticle(particle: Particle): void;
|
||||
}
|
||||
|
||||
export class EphemeralConnection implements IConnection, IEphemeralConnection {
|
||||
export class EphemeralConnection implements IEphemeralConnection {
|
||||
readonly selfPeerId: PeerIdB58;
|
||||
readonly connections: Map<PeerIdB58, IEphemeralConnection> = new Map();
|
||||
|
||||
constructor(selfPeerId: PeerIdB58) {
|
||||
this.selfPeerId = selfPeerId;
|
||||
}
|
||||
|
||||
start(): Promise<void> {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
stop(): Promise<void> {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
connectToOther(other: IEphemeralConnection) {
|
||||
if (other.selfPeerId === this.selfPeerId) {
|
||||
|
@@ -25,9 +25,23 @@ import {
|
||||
ParticleExecutionStage,
|
||||
ParticleQueueItem,
|
||||
} from '../particle/Particle.js';
|
||||
import { defaultCallParameters } from "@fluencelabs/marine-js/dist/types"
|
||||
import { defaultCallParameters } from '@fluencelabs/marine-js/dist/types'
|
||||
import { jsonify, isString } from '../util/utils.js';
|
||||
import { concatMap, filter, pipe, Subject, tap, Unsubscribable } from 'rxjs';
|
||||
import {
|
||||
concatAll,
|
||||
concatMap,
|
||||
filter,
|
||||
from,
|
||||
groupBy,
|
||||
lastValueFrom,
|
||||
mergeAll,
|
||||
mergeMap,
|
||||
Observable,
|
||||
pipe,
|
||||
Subject,
|
||||
tap,
|
||||
Unsubscribable
|
||||
} from 'rxjs';
|
||||
import { defaultSigGuard, Sig } from '../services/Sig.js';
|
||||
import { registerSig } from '../services/_aqua/services.js';
|
||||
import { registerSrv } from '../services/_aqua/single-module-srv.js';
|
||||
@@ -105,6 +119,7 @@ export abstract class FluencePeer {
|
||||
|
||||
this._startParticleProcessing();
|
||||
this.isInitialized = true;
|
||||
await this.connection.start();
|
||||
log_peer.trace('started Fluence peer');
|
||||
}
|
||||
|
||||
@@ -114,10 +129,17 @@ export abstract class FluencePeer {
|
||||
*/
|
||||
async stop() {
|
||||
log_peer.trace('stopping Fluence peer');
|
||||
|
||||
this._particleSourceSubscription?.unsubscribe();
|
||||
|
||||
log_peer.trace('Waiting for all particles to finish execution');
|
||||
this._incomingParticles.complete();
|
||||
await this._incomingParticlePromise;
|
||||
log_peer.trace('All particles finished execution');
|
||||
|
||||
this._stopParticleProcessing();
|
||||
await this.marineHost.stop();
|
||||
|
||||
await this.connection.stop();
|
||||
this.isInitialized = false;
|
||||
log_peer.trace('stopped Fluence peer');
|
||||
}
|
||||
@@ -202,6 +224,7 @@ export abstract class FluencePeer {
|
||||
/**
|
||||
* Initiates a new particle execution starting from local peer
|
||||
* @param particle - particle to start execution of
|
||||
* @param onStageChange - callback for reacting on particle state changes
|
||||
*/
|
||||
initiateParticle: (particle: IParticle, onStageChange: (stage: ParticleExecutionStage) => void): void => {
|
||||
if (!this.isInitialized) {
|
||||
@@ -238,10 +261,9 @@ export abstract class FluencePeer {
|
||||
// Queues for incoming and outgoing particles
|
||||
|
||||
private _incomingParticles = new Subject<ParticleQueueItem>();
|
||||
private _outgoingParticles = new Subject<ParticleQueueItem & { nextPeerIds: PeerIdB58[] }>();
|
||||
private _timeouts: Array<NodeJS.Timeout> = [];
|
||||
private _particleSourceSubscription?: Unsubscribable;
|
||||
private _particleQueues = new Map<string, Subject<ParticleQueueItem>>();
|
||||
private _incomingParticlePromise?: Promise<void>;
|
||||
|
||||
// Internal peer state
|
||||
|
||||
@@ -280,7 +302,7 @@ export abstract class FluencePeer {
|
||||
},
|
||||
});
|
||||
|
||||
this._incomingParticles
|
||||
this._incomingParticlePromise = lastValueFrom(this._incomingParticles
|
||||
.pipe(
|
||||
tap((item) => {
|
||||
log_particle.debug('id %s. received:', item.particle.id);
|
||||
@@ -295,47 +317,188 @@ export abstract class FluencePeer {
|
||||
log_particle.trace('id %s. call results: %j', item.particle.id, item.callResults);
|
||||
}),
|
||||
filterExpiredParticles(this._expireParticle.bind(this)),
|
||||
)
|
||||
.subscribe((item) => {
|
||||
const p = item.particle;
|
||||
let particlesQueue = this._particleQueues.get(p.id);
|
||||
groupBy(item => item.particle.id),
|
||||
mergeMap(group$ => {
|
||||
let prevData: Uint8Array = Buffer.from([]);
|
||||
let firstRun = true;
|
||||
|
||||
return group$.pipe(
|
||||
concatMap(async (item) => {
|
||||
if (firstRun) {
|
||||
const timeout = setTimeout(() => {
|
||||
this._expireParticle(item);
|
||||
}, getActualTTL(item.particle));
|
||||
|
||||
if (!particlesQueue) {
|
||||
particlesQueue = this._createParticlesProcessingQueue();
|
||||
this._particleQueues.set(p.id, particlesQueue);
|
||||
this._timeouts.push(timeout);
|
||||
firstRun = false;
|
||||
}
|
||||
|
||||
if (!this.isInitialized || this.marineHost === undefined) {
|
||||
// If `.stop()` was called return null to stop particle processing immediately
|
||||
return null;
|
||||
}
|
||||
|
||||
const timeout = setTimeout(() => {
|
||||
this._expireParticle(item);
|
||||
}, getActualTTL(p));
|
||||
// IMPORTANT!
|
||||
// AVM runner execution and prevData <-> newData swapping
|
||||
// MUST happen sequentially (in a critical section).
|
||||
// Otherwise the race might occur corrupting the prevData
|
||||
|
||||
this._timeouts.push(timeout);
|
||||
}
|
||||
log_particle.debug('id %s. sending particle to interpreter', item.particle.id);
|
||||
log_particle.trace('id %s. prevData: %s', item.particle.id, this.decodeAvmData(prevData).slice(0, 50));
|
||||
|
||||
particlesQueue.next(item);
|
||||
});
|
||||
const args = serializeAvmArgs(
|
||||
{
|
||||
initPeerId: item.particle.initPeerId,
|
||||
currentPeerId: this.keyPair.getPeerId(),
|
||||
timestamp: item.particle.timestamp,
|
||||
ttl: item.particle.ttl,
|
||||
keyFormat: KeyPairFormat.Ed25519,
|
||||
particleId: item.particle.id,
|
||||
secretKeyBytes: this.keyPair.toEd25519PrivateKey(),
|
||||
},
|
||||
item.particle.script,
|
||||
prevData,
|
||||
item.particle.data,
|
||||
item.callResults,
|
||||
);
|
||||
|
||||
this._outgoingParticles.subscribe((item) => {
|
||||
// Do not send particle after the peer has been stopped
|
||||
if (!this.isInitialized) {
|
||||
return;
|
||||
}
|
||||
let avmCallResult: InterpreterResult | Error;
|
||||
try {
|
||||
const res = await this.marineHost.callService('avm', 'invoke', args, defaultCallParameters);
|
||||
avmCallResult = deserializeAvmResult(res);
|
||||
} catch (e) {
|
||||
avmCallResult = e instanceof Error ? e : new Error(String(e));
|
||||
}
|
||||
|
||||
log_particle.debug(
|
||||
'id %s. sending particle into network. Next peer ids: %s',
|
||||
item.particle.id,
|
||||
item.nextPeerIds.toString(),
|
||||
);
|
||||
if (!(avmCallResult instanceof Error) && avmCallResult.retCode === 0) {
|
||||
const newData = Buffer.from(avmCallResult.data);
|
||||
prevData = newData;
|
||||
}
|
||||
|
||||
this.connection
|
||||
?.sendParticle(item.nextPeerIds, item.particle)
|
||||
.then(() => {
|
||||
item.onStageChange({ stage: 'sent' });
|
||||
return {
|
||||
...item,
|
||||
result: avmCallResult,
|
||||
};
|
||||
}),
|
||||
filter((item): item is NonNullable<typeof item> => item !== null),
|
||||
filterExpiredParticles<ParticleQueueItem & {result: Error | InterpreterResult }>(this._expireParticle.bind(this)),
|
||||
mergeMap(async (item) => {
|
||||
// If peer was stopped, do not proceed further
|
||||
if (!this.isInitialized) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Do not continue if there was an error in particle interpretation
|
||||
if (item.result instanceof Error) {
|
||||
log_particle.error('id %s. interpreter failed: %s', item.particle.id, item.result.message);
|
||||
item.onStageChange({ stage: 'interpreterError', errorMessage: item.result.message });
|
||||
return;
|
||||
}
|
||||
|
||||
if (item.result.retCode !== 0) {
|
||||
log_particle.error(
|
||||
'id %s. interpreter failed: retCode: %d, message: %s',
|
||||
item.particle.id,
|
||||
item.result.retCode,
|
||||
item.result.errorMessage,
|
||||
);
|
||||
log_particle.trace('id %s. avm data: %s', item.particle.id, this.decodeAvmData(item.result.data));
|
||||
item.onStageChange({ stage: 'interpreterError', errorMessage: item.result.errorMessage });
|
||||
return;
|
||||
}
|
||||
|
||||
log_particle.trace(
|
||||
'id %s. interpreter result: retCode: %d, avm data: %s',
|
||||
item.particle.id,
|
||||
item.result.retCode,
|
||||
this.decodeAvmData(item.result.data)
|
||||
);
|
||||
|
||||
setTimeout(() => {
|
||||
item.onStageChange({ stage: 'interpreted' });
|
||||
}, 0);
|
||||
|
||||
let connectionPromise: Promise<void> = Promise.resolve();
|
||||
|
||||
// send particle further if requested
|
||||
if (item.result.nextPeerPks.length > 0) {
|
||||
const newParticle = cloneWithNewData(item.particle, Buffer.from(item.result.data));
|
||||
|
||||
// Do not send particle after the peer has been stopped
|
||||
if (!this.isInitialized) {
|
||||
return;
|
||||
}
|
||||
|
||||
log_particle.debug(
|
||||
'id %s. sending particle into network. Next peer ids: %s',
|
||||
newParticle.id,
|
||||
item.result.nextPeerPks.toString(),
|
||||
);
|
||||
|
||||
connectionPromise = this.connection
|
||||
?.sendParticle(item.result.nextPeerPks, newParticle)
|
||||
.then(() => {
|
||||
log_particle.trace('id %s. send successful', newParticle.id);
|
||||
item.onStageChange({ stage: 'sent' });
|
||||
})
|
||||
.catch((e: any) => {
|
||||
log_particle.error('id %s. send failed %j', newParticle.id, e);
|
||||
item.onStageChange({ stage: 'sendingError', errorMessage: e.toString() });
|
||||
});
|
||||
}
|
||||
|
||||
// execute call requests if needed
|
||||
// and put particle with the results back to queue
|
||||
if (item.result.callRequests.length > 0) {
|
||||
for (const [key, cr] of item.result.callRequests) {
|
||||
const req = {
|
||||
fnName: cr.functionName,
|
||||
args: cr.arguments,
|
||||
serviceId: cr.serviceId,
|
||||
tetraplets: cr.tetraplets,
|
||||
particleContext: getParticleContext(item.particle),
|
||||
};
|
||||
|
||||
this._execSingleCallRequest(req)
|
||||
.catch((err): CallServiceResult => {
|
||||
if (err instanceof ServiceError) {
|
||||
return {
|
||||
retCode: ResultCodes.error,
|
||||
result: err.message,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
retCode: ResultCodes.error,
|
||||
result: `Service call failed. fnName="${req.fnName}" serviceId="${
|
||||
req.serviceId
|
||||
}" error: ${err.toString()}`,
|
||||
};
|
||||
})
|
||||
.then((res) => {
|
||||
const serviceResult = {
|
||||
result: jsonify(res.result),
|
||||
retCode: res.retCode,
|
||||
};
|
||||
|
||||
const newParticle = cloneWithNewData(item.particle, Buffer.from([]));
|
||||
this._incomingParticles.next({
|
||||
...item,
|
||||
particle: newParticle,
|
||||
callResults: [[key, serviceResult]],
|
||||
});
|
||||
});
|
||||
}
|
||||
} else {
|
||||
item.onStageChange({ stage: 'localWorkDone' });
|
||||
}
|
||||
|
||||
return connectionPromise;
|
||||
}),
|
||||
|
||||
)
|
||||
})
|
||||
.catch((e: any) => {
|
||||
log_particle.error('id %s. send failed %j', item.particle.id, e);
|
||||
item.onStageChange({ stage: 'sendingError', errorMessage: e.toString() });
|
||||
});
|
||||
});
|
||||
), { defaultValue: undefined });
|
||||
}
|
||||
|
||||
private _expireParticle(item: ParticleQueueItem) {
|
||||
@@ -345,179 +508,16 @@ export abstract class FluencePeer {
|
||||
item.particle.id,
|
||||
item.particle.ttl,
|
||||
);
|
||||
|
||||
this._particleQueues.delete(particleId);
|
||||
|
||||
this.jsServiceHost.removeParticleScopeHandlers(particleId);
|
||||
|
||||
item.onStageChange({ stage: 'expired' });
|
||||
}
|
||||
|
||||
|
||||
private decodeAvmData(data: Uint8Array) {
|
||||
return new TextDecoder().decode(data.buffer);
|
||||
}
|
||||
|
||||
private _createParticlesProcessingQueue() {
|
||||
const particlesQueue = new Subject<ParticleQueueItem>();
|
||||
let prevData: Uint8Array = Buffer.from([]);
|
||||
|
||||
particlesQueue
|
||||
.pipe(
|
||||
filterExpiredParticles(this._expireParticle.bind(this)),
|
||||
|
||||
concatMap(async (item) => {
|
||||
if (!this.isInitialized || this.marineHost === undefined) {
|
||||
// If `.stop()` was called return null to stop particle processing immediately
|
||||
return null;
|
||||
}
|
||||
|
||||
// IMPORTANT!
|
||||
// AVM runner execution and prevData <-> newData swapping
|
||||
// MUST happen sequentially (in a critical section).
|
||||
// Otherwise the race might occur corrupting the prevData
|
||||
|
||||
log_particle.debug('id %s. sending particle to interpreter', item.particle.id);
|
||||
log_particle.trace('id %s. prevData: %s', item.particle.id, this.decodeAvmData(prevData));
|
||||
|
||||
const args = serializeAvmArgs(
|
||||
{
|
||||
initPeerId: item.particle.initPeerId,
|
||||
currentPeerId: this.keyPair.getPeerId(),
|
||||
timestamp: item.particle.timestamp,
|
||||
ttl: item.particle.ttl,
|
||||
keyFormat: KeyPairFormat.Ed25519,
|
||||
particleId: item.particle.id,
|
||||
secretKeyBytes: this.keyPair.toEd25519PrivateKey(),
|
||||
},
|
||||
item.particle.script,
|
||||
prevData,
|
||||
item.particle.data,
|
||||
item.callResults,
|
||||
);
|
||||
|
||||
let avmCallResult: InterpreterResult | Error;
|
||||
try {
|
||||
const res = await this.marineHost.callService('avm', 'invoke', args, defaultCallParameters);
|
||||
avmCallResult = deserializeAvmResult(res);
|
||||
} catch (e) {
|
||||
avmCallResult = e instanceof Error ? e : new Error(String(e));
|
||||
}
|
||||
|
||||
if (!(avmCallResult instanceof Error) && avmCallResult.retCode === 0) {
|
||||
const newData = Buffer.from(avmCallResult.data);
|
||||
prevData = newData;
|
||||
}
|
||||
|
||||
return {
|
||||
...item,
|
||||
result: avmCallResult,
|
||||
};
|
||||
}),
|
||||
)
|
||||
.subscribe((item) => {
|
||||
// If peer was stopped, do not proceed further
|
||||
if (item === null || !this.isInitialized) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Do not proceed further if the particle is expired
|
||||
if (hasExpired(item.particle)) {
|
||||
return;
|
||||
}
|
||||
|
||||
// Do not continue if there was an error in particle interpretation
|
||||
if (item.result instanceof Error) {
|
||||
log_particle.error('id %s. interpreter failed: %s', item.particle.id, item.result.message);
|
||||
item.onStageChange({ stage: 'interpreterError', errorMessage: item.result.message });
|
||||
return;
|
||||
}
|
||||
|
||||
if (item.result.retCode !== 0) {
|
||||
log_particle.error(
|
||||
'id %s. interpreter failed: retCode: %d, message: %s',
|
||||
item.particle.id,
|
||||
item.result.retCode,
|
||||
item.result.errorMessage,
|
||||
);
|
||||
log_particle.trace('id %s. avm data: %s', item.particle.id, this.decodeAvmData(item.result.data));
|
||||
item.onStageChange({ stage: 'interpreterError', errorMessage: item.result.errorMessage });
|
||||
return;
|
||||
}
|
||||
|
||||
log_particle.trace(
|
||||
'id %s. interpreter result: retCode: %d, avm data: %s',
|
||||
item.particle.id,
|
||||
item.result.retCode,
|
||||
this.decodeAvmData(item.result.data)
|
||||
);
|
||||
|
||||
setTimeout(() => {
|
||||
item.onStageChange({ stage: 'interpreted' });
|
||||
}, 0);
|
||||
|
||||
// send particle further if requested
|
||||
if (item.result.nextPeerPks.length > 0) {
|
||||
const newParticle = cloneWithNewData(item.particle, Buffer.from(item.result.data));
|
||||
this._outgoingParticles.next({
|
||||
...item,
|
||||
particle: newParticle,
|
||||
nextPeerIds: item.result.nextPeerPks,
|
||||
});
|
||||
}
|
||||
|
||||
// execute call requests if needed
|
||||
// and put particle with the results back to queue
|
||||
if (item.result.callRequests.length > 0) {
|
||||
for (const [key, cr] of item.result.callRequests) {
|
||||
const req = {
|
||||
fnName: cr.functionName,
|
||||
args: cr.arguments,
|
||||
serviceId: cr.serviceId,
|
||||
tetraplets: cr.tetraplets,
|
||||
particleContext: getParticleContext(item.particle),
|
||||
};
|
||||
|
||||
if (hasExpired(item.particle)) {
|
||||
// just in case do not call any services if the particle is already expired
|
||||
return;
|
||||
}
|
||||
this._execSingleCallRequest(req)
|
||||
.catch((err): CallServiceResult => {
|
||||
if (err instanceof ServiceError) {
|
||||
return {
|
||||
retCode: ResultCodes.error,
|
||||
result: err.message,
|
||||
};
|
||||
}
|
||||
|
||||
return {
|
||||
retCode: ResultCodes.error,
|
||||
result: `Service call failed. fnName="${req.fnName}" serviceId="${
|
||||
req.serviceId
|
||||
}" error: ${err.toString()}`,
|
||||
};
|
||||
})
|
||||
.then((res) => {
|
||||
const serviceResult = {
|
||||
result: jsonify(res.result),
|
||||
retCode: res.retCode,
|
||||
};
|
||||
|
||||
const newParticle = cloneWithNewData(item.particle, Buffer.from([]));
|
||||
particlesQueue.next({
|
||||
...item,
|
||||
particle: newParticle,
|
||||
callResults: [[key, serviceResult]],
|
||||
});
|
||||
});
|
||||
}
|
||||
} else {
|
||||
item.onStageChange({ stage: 'localWorkDone' });
|
||||
}
|
||||
});
|
||||
|
||||
return particlesQueue;
|
||||
}
|
||||
|
||||
private async _execSingleCallRequest(req: CallServiceData): Promise<CallServiceResult> {
|
||||
const particleId = req.particleContext.particleId;
|
||||
log_particle.trace('id %s. executing call service handler %j', particleId, req);
|
||||
@@ -552,17 +552,16 @@ export abstract class FluencePeer {
|
||||
this._timeouts.forEach((timeout) => {
|
||||
clearTimeout(timeout);
|
||||
});
|
||||
this._particleQueues.clear();
|
||||
}
|
||||
}
|
||||
|
||||
function filterExpiredParticles(onParticleExpiration: (item: ParticleQueueItem) => void) {
|
||||
function filterExpiredParticles<T extends ParticleQueueItem>(onParticleExpiration: (item: T) => void) {
|
||||
return pipe(
|
||||
tap((item: ParticleQueueItem) => {
|
||||
tap((item: T) => {
|
||||
if (hasExpired(item.particle)) {
|
||||
onParticleExpiration(item);
|
||||
}
|
||||
}),
|
||||
filter((x: ParticleQueueItem) => !hasExpired(x.particle)),
|
||||
filter((x) => !hasExpired(x.particle)),
|
||||
);
|
||||
}
|
||||
|
85
packages/core/js-client/src/jsPeer/__test__/par.spec.ts
Normal file
85
packages/core/js-client/src/jsPeer/__test__/par.spec.ts
Normal file
@@ -0,0 +1,85 @@
|
||||
/*
|
||||
* Copyright 2023 Fluence Labs Limited
|
||||
*
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
import { describe, expect, it } from 'vitest';
|
||||
import { registerHandlersHelper, withPeer } from '../../util/testUtils.js';
|
||||
import { handleTimeout } from '../../particle/Particle.js';
|
||||
import { CallServiceData, ResultCodes } from '../../jsServiceHost/interfaces.js';
|
||||
|
||||
describe('FluencePeer flow tests', () => {
|
||||
it('should execute par instruction in parallel', async function () {
|
||||
await withPeer(async (peer) => {
|
||||
const res = await new Promise<any>((resolve, reject) => {
|
||||
const script = `
|
||||
(par
|
||||
(seq
|
||||
(call %init_peer_id% ("flow" "timeout") [1000 "test1"] res1)
|
||||
(call %init_peer_id% ("callback" "callback1") [res1])
|
||||
)
|
||||
(seq
|
||||
(call %init_peer_id% ("flow" "timeout") [1000 "test2"] res2)
|
||||
(call %init_peer_id% ("callback" "callback2") [res2])
|
||||
)
|
||||
)
|
||||
`;
|
||||
|
||||
const particle = peer.internals.createNewParticle(script);
|
||||
|
||||
peer.internals.regHandler.forParticle(particle.id, 'flow', 'timeout', (req: CallServiceData) => {
|
||||
const [timeout, message] = req.args;
|
||||
|
||||
return new Promise((resolve) => {
|
||||
setTimeout(() => {
|
||||
const res = {
|
||||
result: message,
|
||||
retCode: ResultCodes.success,
|
||||
};
|
||||
resolve(res);
|
||||
}, timeout);
|
||||
});
|
||||
});
|
||||
|
||||
if (particle instanceof Error) {
|
||||
return reject(particle.message);
|
||||
}
|
||||
|
||||
const values: any[] = [];
|
||||
|
||||
registerHandlersHelper(peer, particle, {
|
||||
callback: {
|
||||
callback1: (args: any) => {
|
||||
const [val] = args;
|
||||
values.push(val);
|
||||
if (values.length === 2) {
|
||||
resolve(values);
|
||||
}
|
||||
},
|
||||
callback2: (args: any) => {
|
||||
const [val] = args;
|
||||
values.push(val);
|
||||
if (values.length === 2) {
|
||||
resolve(values);
|
||||
}
|
||||
},
|
||||
},
|
||||
});
|
||||
|
||||
peer.internals.initiateParticle(particle, handleTimeout(reject));
|
||||
});
|
||||
|
||||
await expect(res).toEqual(expect.arrayContaining(["test1", "test1"]));
|
||||
});
|
||||
}, 1500);
|
||||
});
|
@@ -80,6 +80,14 @@ export const compileAqua = async (aquaFile: string): Promise<CompiledFile> => {
|
||||
};
|
||||
|
||||
class NoopConnection implements IConnection {
|
||||
start(): Promise<void> {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
stop(): Promise<void> {
|
||||
return Promise.resolve();
|
||||
}
|
||||
|
||||
getRelayPeerId(): string {
|
||||
return 'nothing_here';
|
||||
}
|
||||
|
Reference in New Issue
Block a user