feat(js-client): Add fire-and-forget flag [DXJ-562] (#400)

* Add fire-and-forget flag

* Fix test

* Store particle behavior in queue item

* Fix tests
This commit is contained in:
Akim 2023-12-16 01:21:31 +07:00 committed by GitHub
parent 9b629eef2e
commit 86a73027e5
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
7 changed files with 20 additions and 2 deletions

View File

@ -80,6 +80,7 @@ describe("User API methods", () => {
args: {}, args: {},
peer, peer,
script, script,
fireAndForget: false,
}); });
expect(res).toBe(7); expect(res).toBe(7);

View File

@ -134,6 +134,7 @@ export const v5_callFunction = async (
peer: peerOrArg, peer: peerOrArg,
args: callArgs, args: callArgs,
config, config,
fireAndForget: returnTypeVoid,
}); });
if (returnTypeVoid) { if (returnTypeVoid) {

View File

@ -55,7 +55,7 @@ describe("FluenceClient usage test suite", () => {
}, },
}); });
peer.internals.initiateParticle(particle, resolve, reject); peer.internals.initiateParticle(particle, resolve, reject, false);
}); });
await expect(promise).rejects.toThrow(ExpirationError); await expect(promise).rejects.toThrow(ExpirationError);

View File

@ -49,6 +49,7 @@ export type CallAquaFunctionArgs = {
config?: CallAquaFunctionConfig | undefined; config?: CallAquaFunctionConfig | undefined;
peer: FluencePeer; peer: FluencePeer;
args: { [key: string]: JSONValue | ArgCallbackFunction }; args: { [key: string]: JSONValue | ArgCallbackFunction };
fireAndForget: boolean;
}; };
export type CallAquaFunctionConfig = { export type CallAquaFunctionConfig = {
@ -60,6 +61,7 @@ export const callAquaFunction = async ({
config = {}, config = {},
peer, peer,
args, args,
fireAndForget,
}: CallAquaFunctionArgs) => { }: CallAquaFunctionArgs) => {
log.trace("calling aqua function %j", { script, config, args }); log.trace("calling aqua function %j", { script, config, args });
@ -85,6 +87,6 @@ export const callAquaFunction = async ({
registerParticleScopeService(peer, particle, errorHandlingService(reject)); registerParticleScopeService(peer, particle, errorHandlingService(reject));
peer.internals.initiateParticle(particle, resolve, reject); peer.internals.initiateParticle(particle, resolve, reject, fireAndForget);
}); });
}; };

View File

@ -255,11 +255,13 @@ export abstract class FluencePeer {
* @param particle - particle to start execution of * @param particle - particle to start execution of
* @param onSuccess - callback which is called when particle execution succeed * @param onSuccess - callback which is called when particle execution succeed
* @param onError - callback which is called when particle execution fails * @param onError - callback which is called when particle execution fails
* @param fireAndForget - determines whether particle has fire-and-forget behavior
*/ */
initiateParticle: ( initiateParticle: (
particle: IParticle, particle: IParticle,
onSuccess: (result: JSONValue) => void, onSuccess: (result: JSONValue) => void,
onError: (error: Error) => void, onError: (error: Error) => void,
fireAndForget: boolean = true,
): void => { ): void => {
if (!this.isInitialized) { if (!this.isInitialized) {
throw new Error( throw new Error(
@ -278,6 +280,7 @@ export abstract class FluencePeer {
callResults: [], callResults: [],
onSuccess, onSuccess,
onError, onError,
fireAndForget,
}); });
}, },
@ -607,6 +610,9 @@ export abstract class FluencePeer {
if (item.result.callRequests.length > 0) { if (item.result.callRequests.length > 0) {
// TS doesn't allow to pass just 'item' // TS doesn't allow to pass just 'item'
void this.execCallRequests({ ...item, result: item.result }); void this.execCallRequests({ ...item, result: item.result });
} else if (item.fireAndForget === true) {
// Local work done.
item.onSuccess(null);
} }
return connectionPromise; return connectionPromise;

View File

@ -187,6 +187,7 @@ export interface ParticleQueueItem {
callResults: CallResultsArray; callResults: CallResultsArray;
onSuccess: (result: JSONValue) => void; onSuccess: (result: JSONValue) => void;
onError: (error: Error) => void; onError: (error: Error) => void;
fireAndForget?: boolean;
} }
/** /**

View File

@ -103,11 +103,18 @@ export const compileAqua = async (aquaFile: string): Promise<CompiledFile> => {
const functions = Object.entries(compilationResult.functions) const functions = Object.entries(compilationResult.functions)
.map(([name, fnInfo]: [string, FunctionInfo]) => { .map(([name, fnInfo]: [string, FunctionInfo]) => {
const callFn = (peer: FluencePeer, args: PassedArgs) => { const callFn = (peer: FluencePeer, args: PassedArgs) => {
const def = fnInfo.funcDef;
const isReturnTypeVoid =
def.arrow.codomain.tag === "nil" ||
def.arrow.codomain.items.length === 0;
return callAquaFunction({ return callAquaFunction({
script: fnInfo.script, script: fnInfo.script,
config: {}, config: {},
peer: peer, peer: peer,
args, args,
fireAndForget: isReturnTypeVoid,
}); });
}; };