Implement peer.timeout built-in function (#101)

This commit is contained in:
Pavel
2021-11-17 09:21:32 +03:00
committed by GitHub
parent 2de819144a
commit defe961413
7 changed files with 180 additions and 77 deletions

10
package-lock.json generated
View File

@ -5580,7 +5580,10 @@
"name": "@achingbrain/node-fetch",
"version": "2.6.7",
"resolved": "https://registry.npmjs.org/@achingbrain/node-fetch/-/node-fetch-2.6.7.tgz",
"integrity": "sha512-iTASGs+HTFK5E4ZqcMsHmeJ4zodyq8L38lZV33jwqcBJYoUt3HjN4+ot+O9/0b+ke8ddE7UgOtVuZN/OkV19/g=="
"integrity": "sha512-iTASGs+HTFK5E4ZqcMsHmeJ4zodyq8L38lZV33jwqcBJYoUt3HjN4+ot+O9/0b+ke8ddE7UgOtVuZN/OkV19/g==",
"engines": {
"node": "4.x || >=6.0.0"
}
},
"node_modules/node-forge": {
"version": "0.10.0",
@ -7252,11 +7255,6 @@
"safer-buffer": "^2.0.2",
"tweetnacl": "~0.14.0"
},
"bin": {
"sshpk-conv": "bin/sshpk-conv",
"sshpk-sign": "bin/sshpk-sign",
"sshpk-verify": "bin/sshpk-verify"
},
"engines": {
"node": ">=0.10.0"
}

View File

@ -1,4 +1,4 @@
import { FluencePeer } from '../../index';
import { FluencePeer, setLogLevel } from '../../index';
import { Particle } from '../../internal/Particle';
import { handleTimeout } from '../../internal/utils';
import { registerHandlersHelper } from '../util';
@ -72,4 +72,88 @@ describe('Avm spec', () => {
await peer.stop();
});
it('Timeout in par call: race', async () => {
// arrange
const peer = new FluencePeer();
await peer.start();
// act
const promise = new Promise((resolve, reject) => {
const script = `
(seq
(call %init_peer_id% ("op" "identity") ["slow_result"] arg)
(seq
(par
(call %init_peer_id% ("peer" "timeout") [1000 arg] $result)
(call %init_peer_id% ("op" "identity") ["fast_result"] $result)
)
(call %init_peer_id% ("return" "return") [$result.$[0]])
)
)
`;
const particle = Particle.createNew(script);
registerHandlersHelper(peer, particle, {
return: {
return: (args) => {
resolve(args[0]);
},
},
});
peer.internals.initiateParticle(particle, handleTimeout(reject));
});
// assert
const res = await promise;
expect(res).toBe('fast_result');
await peer.stop();
});
it('Timeout in par call: wait', async () => {
// arrange
const peer = new FluencePeer();
await peer.start();
// act
const promise = new Promise((resolve, reject) => {
const script = `
(seq
(call %init_peer_id% ("op" "identity") ["timeout_msg"] arg)
(seq
(seq
(par
(call %init_peer_id% ("peer" "timeout") [1000 arg] $ok_or_err)
(call "invalid_peer" ("op" "identity") ["never"] $ok_or_err)
)
(xor
(match $ok_or_err.$[0] "timeout_msg"
(ap "failed_with_timeout" $result)
)
(ap "impossible happened" $result)
)
)
(call %init_peer_id% ("return" "return") [$result.$[0]])
)
)
`;
const particle = Particle.createNew(script);
registerHandlersHelper(peer, particle, {
return: {
return: (args) => {
resolve(args[0]);
},
},
});
peer.internals.initiateParticle(particle, handleTimeout(reject));
});
// assert
const res = await promise;
expect(res).toBe('failed_with_timeout');
await peer.stop();
});
});

View File

@ -5,39 +5,44 @@ import { defaultServices } from '../../internal/defaultServices';
describe('Tests for default handler', () => {
// prettier-ignore
each`
fnName | args | retCode | result
${'identity'} | ${[]} | ${0} | ${{}}
${'identity'} | ${[1]} | ${0} | ${1}
${'identity'} | ${[1, 2]} | ${1} | ${'identity accepts up to 1 arguments, received 2 arguments'}
serviceId | fnName | args | retCode | result
${'op'} | ${'identity'} | ${[]} | ${0} | ${{}}
${'op'} | ${'identity'} | ${[1]} | ${0} | ${1}
${'op'} | ${'identity'} | ${[1, 2]} | ${1} | ${'identity accepts up to 1 arguments, received 2 arguments'}
${'noop'} | ${[1, 2]} | ${0} | ${{}}
${'op'} | ${'noop'} | ${[1, 2]} | ${0} | ${{}}
${'array'} | ${[1, 2, 3]} | ${0} | ${[1, 2, 3]}
${'op'} | ${'array'} | ${[1, 2, 3]} | ${0} | ${[1, 2, 3]}
${'concat'} | ${[[1, 2], [3, 4], [5, 6]]} | ${0} | ${[1, 2, 3, 4, 5, 6]}
${'concat'} | ${[[1, 2]]} | ${0} | ${[1, 2]}
${'concat'} | ${[]} | ${0} | ${[]}
${'concat'} | ${[1, [1, 2], 1]} | ${1} | ${"All arguments of 'concat' must be arrays: arguments 0, 2 are not"}
${'op'} | ${'concat'} | ${[[1, 2], [3, 4], [5, 6]]} | ${0} | ${[1, 2, 3, 4, 5, 6]}
${'op'} | ${'concat'} | ${[[1, 2]]} | ${0} | ${[1, 2]}
${'op'} | ${'concat'} | ${[]} | ${0} | ${[]}
${'op'} | ${'concat'} | ${[1, [1, 2], 1]} | ${1} | ${"All arguments of 'concat' must be arrays: arguments 0, 2 are not"}
${'string_to_b58'} | ${["test"]} | ${0} | ${"3yZe7d"}
${'string_to_b58'} | ${["test", 1]} | ${1} | ${"string_to_b58 accepts only one string argument"}
${'op'} | ${'string_to_b58'} | ${["test"]} | ${0} | ${"3yZe7d"}
${'op'} | ${'string_to_b58'} | ${["test", 1]} | ${1} | ${"string_to_b58 accepts only one string argument"}
${'string_from_b58'} | ${["3yZe7d"]} | ${0} | ${"test"}
${'string_from_b58'} | ${["3yZe7d", 1]} | ${1} | ${"string_from_b58 accepts only one string argument"}
${'op'} | ${'string_from_b58'} | ${["3yZe7d"]} | ${0} | ${"test"}
${'op'} | ${'string_from_b58'} | ${["3yZe7d", 1]} | ${1} | ${"string_from_b58 accepts only one string argument"}
${'bytes_to_b58'} | ${[[116, 101, 115, 116]]} | ${0} | ${"3yZe7d"}
${'bytes_to_b58'} | ${[[116, 101, 115, 116], 1]} | ${1} | ${"bytes_to_b58 accepts only single argument: array of numbers"}
${'op'} | ${'bytes_to_b58'} | ${[[116, 101, 115, 116]]} | ${0} | ${"3yZe7d"}
${'op'} | ${'bytes_to_b58'} | ${[[116, 101, 115, 116], 1]} | ${1} | ${"bytes_to_b58 accepts only single argument: array of numbers"}
${'bytes_from_b58'} | ${["3yZe7d"]} | ${0} | ${[116, 101, 115, 116]}
${'bytes_from_b58'} | ${["3yZe7d", 1]} | ${1} | ${"bytes_from_b58 accepts only one string argument"}
${'op'} | ${'bytes_from_b58'} | ${["3yZe7d"]} | ${0} | ${[116, 101, 115, 116]}
${'op'} | ${'bytes_from_b58'} | ${["3yZe7d", 1]} | ${1} | ${"bytes_from_b58 accepts only one string argument"}
${'peer'} | ${'timeout'} | ${[200, []]} | ${0} | ${[]}}
${'peer'} | ${'timeout'} | ${[200, ['test']]} | ${0} | ${['test']}}
${'peer'} | ${'timeout'} | ${[]} | ${1} | ${'timeout accepts exactly two arguments: timeout duration in ms and an optional message string'}}
${'peer'} | ${'timeout'} | ${[200, 'test', 1]} | ${1} | ${'timeout accepts exactly two arguments: timeout duration in ms and an optional message string'}}
`.test(
//
'$fnName with $args expected retcode: $retCode and result: $result',
async ({ fnName, args, retCode, result }) => {
async ({ serviceId, fnName, args, retCode, result }) => {
// arrange
const req: CallServiceData = {
serviceId: 'op',
serviceId: serviceId,
fnName: fnName,
args: args,
tetraplets: [],

View File

@ -389,6 +389,11 @@ export class FluencePeer {
});
this._outgoingParticles.subscribe(async (item) => {
if (!this._connection) {
item.particle.logTo('error', 'cannot send particle, peer is not connected');
item.onStageChange({ stage: 'sendingError' });
return;
}
await this._connection.sendParticle(item.particle);
item.onStageChange({ stage: 'sent' });
});
@ -442,34 +447,16 @@ export class FluencePeer {
// execute call requests if needed
// and put particle with the results back to queue
if (result.callRequests.length > 0) {
this._execCallRequests(particle, result.callRequests).then((callResults) => {
const newParticle = particle.clone();
newParticle.callResults = callResults;
newParticle.data = Buffer.from([]);
particlesQueue.next({ ...item, particle: newParticle });
});
} else {
item.onStageChange({ stage: 'localWorkDone' });
}
});
return particlesQueue;
}
private async _execCallRequests(p: Particle, callRequests: CallRequestsArray): Promise<CallResultsArray> {
// execute all requests asynchronously
const promises = callRequests.map(([key, callRequest]) => {
for (let [key, cr] of result.callRequests) {
const req = {
fnName: callRequest.functionName,
args: callRequest.arguments,
serviceId: callRequest.serviceId,
tetraplets: callRequest.tetraplets,
particleContext: p.getParticleContext(),
fnName: cr.functionName,
args: cr.arguments,
serviceId: cr.serviceId,
tetraplets: cr.tetraplets,
particleContext: particle.getParticleContext(),
};
// execute single requests and catch possible errors
const promise = this._execSingleCallRequest(req)
this._execSingleCallRequest(req)
.catch(
(err): CallServiceResult => ({
retCode: ResultCodes.exceptionInHandler,
@ -478,23 +465,29 @@ export class FluencePeer {
}" error: ${err.toString()}`,
}),
)
.then(
(res): AvmCallServiceResult => ({
.then((res) => {
const serviceResult = {
result: JSON.stringify(res.result),
retCode: res.retCode,
}),
)
.then((res): [key: number, res: AvmCallServiceResult] => [key, res]);
};
return promise;
const newParticle = particle.clone();
newParticle.callResults = [[key, serviceResult]];
newParticle.data = Buffer.from([]);
particlesQueue.next({ ...item, particle: newParticle });
});
// don't block
const res = await Promise.all(promises);
log.debug(`Executed call service for particle id=${p.id}, Call service results: `, res);
return res;
}
} else {
item.onStageChange({ stage: 'localWorkDone' });
}
});
return particlesQueue;
}
private async _execSingleCallRequest(req: CallServiceData): Promise<CallServiceResult> {
log.debug('executing call service handler', req);
const particleId = req.particleContext.particleId;
// trying particle-specific handler
@ -545,6 +538,7 @@ export class FluencePeer {
res.result = null;
}
log.debug('executed call service handler, req and res are: ', req, res);
return res;
}

View File

@ -146,6 +146,7 @@ export type ParticleExecutionStage =
| { stage: 'interpreterError'; errorMessage: string }
| { stage: 'localWorkDone' }
| { stage: 'sent' }
| { stage: 'sendingError' }
| { stage: 'expired' };
export interface ParticleQueueItem {

View File

@ -328,6 +328,10 @@ export function callFunction(rawFnArgs: Array<any>, def: FunctionCallDef, script
resolve(undefined);
}
if (stage.stage === 'sendingError') {
reject(`Could not send particle for ${def.functionName}: not connected`);
}
if (stage.stage === 'expired') {
reject(`Request timed out after ${particle.ttl} for ${def.functionName}`);
}

View File

@ -99,6 +99,23 @@ export const defaultServices: { [serviceId in string]: { [fnName in string]: Gen
},
peer: {
timeout: (req) => {
if (req.args.length !== 2) {
return error(
'timeout accepts exactly two arguments: timeout duration in ms and an optional message string',
);
}
const durationMs = req.args[0];
const message = req.args[1];
return new Promise((resolve) => {
setTimeout(() => {
const res = success(message);
resolve(res);
}, durationMs);
});
},
identify: (req) => {
return error('The JS implementation of Peer does not support identify');
},