diff --git a/package-lock.json b/package-lock.json index 8a3878b2..20614f70 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1108,9 +1108,9 @@ } }, "@fluencelabs/aquamarine-interpreter": { - "version": "0.7.0", - "resolved": "https://registry.npmjs.org/@fluencelabs/aquamarine-interpreter/-/aquamarine-interpreter-0.7.0.tgz", - "integrity": "sha512-2GPsOXSakpRPJFiKAcylK6Q/UhYHrQgrs8a1GCgr/OlrQEYkC4PY4HxnrdErt8fzTUDBHH4veKHKGM+IByYhxA==" + "version": "0.7.2", + "resolved": "https://registry.npmjs.org/@fluencelabs/aquamarine-interpreter/-/aquamarine-interpreter-0.7.2.tgz", + "integrity": "sha512-4LrcpeG0ONb3/kTFgt1QNERn9e7aAJBJgqbqNnx81NqFFngTi2xypKIuyPOttcxSdZTH5mpbwwn3JKFimvOvNA==" }, "@istanbuljs/load-nyc-config": { "version": "1.1.0", diff --git a/package.json b/package.json index fb1cdd11..6404def6 100644 --- a/package.json +++ b/package.json @@ -16,7 +16,7 @@ "author": "Fluence Labs", "license": "Apache-2.0", "dependencies": { - "@fluencelabs/aquamarine-interpreter": "^0.7.0", + "@fluencelabs/aquamarine-interpreter": "0.7.2", "async": "3.2.0", "base64-js": "1.3.1", "bs58": "4.0.1", diff --git a/src/FluenceClient.ts b/src/FluenceClient.ts deleted file mode 100644 index 288a23f9..00000000 --- a/src/FluenceClient.ts +++ /dev/null @@ -1,35 +0,0 @@ -/* - * Copyright 2020 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { PeerIdB58 } from './internal/commonTypes'; -import Multiaddr from 'multiaddr'; - -export interface FluenceClient { - readonly relayPeerId: PeerIdB58; - readonly selfPeerId: PeerIdB58; - readonly isConnected: boolean; - - disconnect(): Promise; - - /** - * Establish a connection to the node. If the connection is already established, disconnect and reregister all services in a new connection. - * - * @param multiaddr - */ - connect(multiaddr: string | Multiaddr): Promise; - - sendScript(script: string, data?: Map, ttl?: number): Promise; -} diff --git a/src/__test__/connection.ts b/src/__test__/connection.ts index 04bce751..cb6fbe6a 100644 --- a/src/__test__/connection.ts +++ b/src/__test__/connection.ts @@ -1,7 +1,3 @@ -import { generatePeerId } from '..'; -import { createClient } from '../api'; -import { FluenceClientImpl } from '../internal/FluenceClientImpl'; - // Uncomment to test on dev nodes // export const nodes = [ // { @@ -14,22 +10,15 @@ import { FluenceClientImpl } from '../internal/FluenceClientImpl'; // }, // ]; -// start docker container to run integration tests locally -// > docker run --rm -e RUST_LOG="info" -p 1210:1210 -p 4310:4310 fluencelabs/fluence:freeze -t 1210 -w 4310 -k gKdiCSUr1TFGFEgu2t8Ch1XEUsrN5A2UfBLjSZvfci9SPR3NvZpACfcpPGC3eY4zma1pk7UvYv5zb1VjvPHwCjj +/* + * start docker container to run integration tests locally: + +docker run --rm -e RUST_LOG="info" -p 1210:1210 -p 4310:4310 fluencelabs/fluence -t 1210 -w 4310 -k gKdiCSUr1TFGFEgu2t8Ch1XEUsrN5A2UfBLjSZvfci9SPR3NvZpACfcpPGC3eY4zma1pk7UvYv5zb1VjvPHwCjj + + */ export const nodes = [ { multiaddr: '/ip4/127.0.0.1/tcp/4310/ws/p2p/12D3KooWKEprYXUXqoV5xSBeyqrWLpQLLH4PXfvVkDJtmcqmh5V3', peerId: '12D3KooWKEprYXUXqoV5xSBeyqrWLpQLLH4PXfvVkDJtmcqmh5V3', }, ]; - -export const createLocalClient = async () => { - const peerId = await generatePeerId(); - const client = new FluenceClientImpl(peerId); - await client.local(); - return client; -}; - -export const createConnectedClient = async (node: string) => { - return (await createClient(node)) as FluenceClientImpl; -}; diff --git a/src/__test__/integration/builtins.spec.ts b/src/__test__/integration/builtins.spec.ts index 016dda4e..0bcfbc2e 100644 --- a/src/__test__/integration/builtins.spec.ts +++ b/src/__test__/integration/builtins.spec.ts @@ -9,16 +9,22 @@ import { uploadModule, } from '../../internal/builtins'; import { ModuleConfig } from '../../internal/moduleConfig'; -import { checkConnection } from '../../api'; -import { generatePeerId } from '../..'; -import { FluenceClientImpl } from '../../internal/FluenceClientImpl'; -import { createConnectedClient, nodes } from '../connection'; +import { createClient, FluenceClient } from '../../api.unstable'; +import { nodes } from '../connection'; + +let client: FluenceClient; describe('Builtins usage suite', () => { + afterEach(async () => { + if (client) { + await client.disconnect(); + } + }); + jest.setTimeout(10000); it('get_modules', async function () { - const client = await createConnectedClient(nodes[0].multiaddr); + client = await createClient(nodes[0].multiaddr); let modulesList = await getModules(client); @@ -26,7 +32,7 @@ describe('Builtins usage suite', () => { }); it('get_interfaces', async function () { - const client = await createConnectedClient(nodes[0].multiaddr); + client = await createClient(nodes[0].multiaddr); let interfaces = await getInterfaces(client); @@ -34,28 +40,15 @@ describe('Builtins usage suite', () => { }); it('get_blueprints', async function () { - const client = await createConnectedClient(nodes[0].multiaddr); + client = await createClient(nodes[0].multiaddr); let bpList = await getBlueprints(client); expect(bpList).not.toBeUndefined; }); - it('check_connection', async function () { - const peerId = await generatePeerId(); - const client = new FluenceClientImpl(peerId); - await client.local(); - await client.connect(nodes[0].multiaddr); - - let isConnected = await checkConnection(client); - - expect(isConnected).toEqual(true); - }); - it('upload_modules', async function () { - const client = await createConnectedClient(nodes[0].multiaddr); - - console.log('peerid: ' + client.selfPeerId); + client = await createClient(nodes[0].multiaddr); let config: ModuleConfig = { name: 'test_broken_module', @@ -75,7 +68,7 @@ describe('Builtins usage suite', () => { }); it('add_blueprint', async function () { - const client = await createConnectedClient(nodes[0].multiaddr); + client = await createClient(nodes[0].multiaddr); let bpId = 'some'; @@ -84,20 +77,19 @@ describe('Builtins usage suite', () => { expect(bpIdReturned).toEqual(bpId); }); - // FIXME:: there is no error on broken blueprint from a node - it.skip('create_service', async function () { - const client = await createConnectedClient(nodes[0].multiaddr); + it('create broken blueprint', async function () { + client = await createClient(nodes[0].multiaddr); - let serviceId = await createService(client, 'test_broken_blueprint'); + let promise = createService(client, 'test_broken_blueprint'); - // TODO there is no error on broken blueprint from a node - expect(serviceId).not.toBeUndefined; + await expect(promise).rejects.toMatchObject({ + error: expect.stringContaining("Blueprint wasn't found at"), + instruction: expect.stringContaining('blueprint_id'), + }); }); it('add and remove script', async function () { - const client = await createConnectedClient(nodes[0].multiaddr); - - console.log('peerid: ' + client.selfPeerId); + client = await createClient(nodes[0].multiaddr); let script = ` (seq @@ -107,7 +99,7 @@ describe('Builtins usage suite', () => { `; let resMakingPromise = new Promise((resolve) => { - client.registerCallback('test', 'test1', (args, _) => { + client.aquaCallHandler.on('test', 'test1', (args, _) => { resolve([...args]); return {}; }); @@ -117,7 +109,6 @@ describe('Builtins usage suite', () => { await resMakingPromise .then((args) => { - console.log('final!'); expect(args as string[]).toEqual(['1', '2', '3']); }) .finally(() => { diff --git a/src/__test__/integration/client.spec.ts b/src/__test__/integration/client.spec.ts index a87ad633..fcdb2128 100644 --- a/src/__test__/integration/client.spec.ts +++ b/src/__test__/integration/client.spec.ts @@ -1,201 +1,54 @@ -import { encode } from 'bs58'; -import { generatePeerId, peerIdToSeed, seedToPeerId } from '../../internal/peerIdUtils'; -import { FluenceClientImpl } from '../../internal/FluenceClientImpl'; -import log from 'loglevel'; -import { createClient, subscribeForErrors } from '../../api'; +import { checkConnection, createClient, FluenceClient } from '../../api.unstable'; import Multiaddr from 'multiaddr'; -import { createConnectedClient, createLocalClient, nodes } from '../connection'; +import { nodes } from '../connection'; +import { RequestFlowBuilder } from '../../internal/RequestFlowBuilder'; + +let client: FluenceClient; describe('Typescript usage suite', () => { - it('should create private key from seed and back', async function () { - // prettier-ignore - let seed = [46, 188, 245, 171, 145, 73, 40, 24, 52, 233, 215, 163, 54, 26, 31, 221, 159, 179, 126, 106, 27, 199, 189, 194, 80, 133, 235, 42, 42, 247, 80, 201]; - let seedStr = encode(seed); - log.trace('SEED STR: ' + seedStr); - let pid = await seedToPeerId(seedStr); - expect(peerIdToSeed(pid)).toEqual(seedStr); + afterEach(async () => { + if (client) { + await client.disconnect(); + } }); - describe('should make connection to network', function () { - const testProcedure = async (client: FluenceClientImpl) => { - let resMakingPromise = new Promise((resolve) => { - client.registerCallback('test', 'test', (args, _) => { - resolve(args); - return {}; - }); - }); - - let script = ` - (seq - (call "${client.relayPeerId}" ("op" "identity") []) - (call "${client.selfPeerId}" ("test" "test") [hello]) - ) - `; - - let data: Map = new Map(); - data.set('hello', 'world'); - - await client.sendScript(script, data); - - return await resMakingPromise; - }; - - it('address as string', async function () { - // arrange - const addr = nodes[0].multiaddr; - - // act - const client = (await createClient(addr)) as FluenceClientImpl; - - // assert - const res = await testProcedure(client); - expect(res).toEqual(['world']); - }); - - it('address as multiaddr', async function () { - // arrange - const addr = new Multiaddr(nodes[0].multiaddr); - - // act - const client = (await createClient(addr)) as FluenceClientImpl; - - // assert - const res = await testProcedure(client); - expect(res).toEqual(['world']); - }); - - it('address as node', async function () { - // arrange - const addr = nodes[0]; - - // act - const client = (await createClient(addr)) as FluenceClientImpl; - - // assert - const res = await testProcedure(client); - expect(res).toEqual(['world']); - }); - - it('peerid as peer id', async function () { - // arrange - const addr = nodes[0].multiaddr; - const pid = await generatePeerId(); - - // act - const client = (await createClient(addr, pid)) as FluenceClientImpl; - - // assert - const res = await testProcedure(client); - expect(res).toEqual(['world']); - }); - - it('peerid as seed', async function () { - // arrange - const addr = nodes[0].multiaddr; - const pid = peerIdToSeed(await generatePeerId()); - - // act - const client = (await createClient(addr, pid)) as FluenceClientImpl; - - // assert - const res = await testProcedure(client); - expect(res).toEqual(['world']); - }); - }); - - it('should make a call through the network', async function () { + it('should make a call through network', async () => { // arrange - const client = await createConnectedClient(nodes[0].multiaddr); - - client.registerCallback('test', 'test', (args, _) => { - log.trace('should make a call through the network, called "test" "test" with args', args); - return {}; - }); - - let resMakingPromise = new Promise((resolve) => { - client.registerCallback('test', 'reverse_args', (args, _) => { - resolve([...args].reverse()); - return {}; - }); - }); + client = await createClient(); + await client.connect(nodes[0].multiaddr); // act - let script = ` - (seq - (call "${client.relayPeerId}" ("op" "identity") []) - (seq - (call "${client.selfPeerId}" ("test" "test") [a b c d] result) - (call "${client.selfPeerId}" ("test" "reverse_args") [a b c d]) - ) + const [request, promise] = new RequestFlowBuilder() + .withRawScript( + `(seq + (call init_relay ("op" "identity") ["hello world!"] result) + (call %init_peer_id% ("callback" "callback") [result]) + )`, ) - `; - - let data: Map = new Map(); - data.set('a', 'some a'); - data.set('b', 'some b'); - data.set('c', 'some c'); - data.set('d', 'some d'); - - await client.sendScript(script, data); + .buildAsFetch<[[string]]>('callback', 'callback'); + await client.initiateFlow(request); // assert - const res = await resMakingPromise; - expect(res).toEqual(['some d', 'some c', 'some b', 'some a']); + const [[result]] = await promise; + expect(result).toBe('hello world!'); }); - it('fireAndForget should work', async function () { + it('check connection should work', async function () { + client = await createClient(); + await client.connect(nodes[0].multiaddr); + + let isConnected = await checkConnection(client); + + expect(isConnected).toEqual(true); + }); + + it('two clients should work inside the same time browser', async () => { // arrange - const client = await createConnectedClient(nodes[0].multiaddr); + const client1 = await createClient(nodes[0].multiaddr); + const client2 = await createClient(nodes[0].multiaddr); let resMakingPromise = new Promise((resolve) => { - client.registerCallback('test', 'reverse_args', (args, _) => { - resolve([...args].reverse()); - return {}; - }); - }); - - // act - let script = ` - (call "${client.selfPeerId}" ("test" "reverse_args") [a b c d]) - `; - - let data: Map = new Map(); - data.set('a', 'some a'); - data.set('b', 'some b'); - data.set('c', 'some c'); - data.set('d', 'some d'); - - await client.fireAndForget(script, data); - - // assert - const res = await resMakingPromise; - expect(res).toEqual(['some d', 'some c', 'some b', 'some a']); - }); - - it('fetch should work', async function () { - // arrange - const client = await createConnectedClient(nodes[0].multiaddr); - - // act - let script = ` - (call "${client.relayPeerId}" ("peer" "identify") [] result) - `; - const data = new Map(); - data.set('__relay', client.relayPeerId); - - const [res] = await client.fetch(script, ['result'], data); - - // assert - expect(res.external_addresses).not.toBeUndefined; - }); - - it('two clients should work inside the same time browser', async function () { - // arrange - const client1 = await createConnectedClient(nodes[0].multiaddr); - const client2 = await createConnectedClient(nodes[0].multiaddr); - - let resMakingPromise = new Promise((resolve) => { - client2.registerCallback('test', 'test', (args, _) => { + client2.aquaCallHandler.onEvent('test', 'test', (args, _) => { resolve([...args]); return {}; }); @@ -214,29 +67,93 @@ describe('Typescript usage suite', () => { data.set('c', 'some c'); data.set('d', 'some d'); - await client1.sendScript(script, data); + await client1.initiateFlow(new RequestFlowBuilder().withRawScript(script).withVariables(data).build()); let res = await resMakingPromise; expect(res).toEqual(['some a', 'some b', 'some c', 'some d']); + + await client1.disconnect(); + await client2.disconnect(); + }); + + describe('should make connection to network', () => { + it('address as string', async () => { + // arrange + const addr = nodes[0].multiaddr; + + // act + client = await createClient(addr); + const isConnected = await checkConnection(client); + + // assert + expect(isConnected).toBeTruthy; + }); + + it('address as multiaddr', async () => { + // arrange + const addr = new Multiaddr(nodes[0].multiaddr); + + // act + client = await createClient(addr); + const isConnected = await checkConnection(client); + + // assert + expect(isConnected).toBeTruthy; + }); + + it('address as node', async () => { + // arrange + const addr = nodes[0]; + + // act + client = await createClient(addr); + const isConnected = await checkConnection(client); + + // assert + expect(isConnected).toBeTruthy; + }); + + it('peerid as peer id', async () => { + // arrange + const addr = nodes[0].multiaddr; + + // act + client = await createClient(addr); + const isConnected = await checkConnection(client); + + // assert + expect(isConnected).toBeTruthy; + }); + + it('peerid as seed', async () => { + // arrange + const addr = nodes[0].multiaddr; + + // act + client = await createClient(addr); + const isConnected = await checkConnection(client); + + // assert + expect(isConnected).toBeTruthy; + }); }); it('xor handling should work with connected client', async function () { // arrange - const client = await createConnectedClient(nodes[0].multiaddr); - log.setLevel('info'); + const [request, promise] = new RequestFlowBuilder() + .withRawScript( + ` + (seq + (call init_relay ("op" "identity") []) + (call init_relay ("incorrect" "service") ["incorrect_arg"]) + ) + `, + ) + .buildWithErrorHandling(); // act - let script = ` - (seq - (call relay ("op" "identity") []) - (call relay ("incorrect" "service") ["incorrect_arg"]) - ) - `; - const data = new Map(); - data.set('relay', client.relayPeerId); - - const promise = subscribeForErrors(client, 7000); - await client.sendScript(script, data); + client = await createClient(nodes[0].multiaddr); + await client.initiateFlow(request); // assert await expect(promise).rejects.toMatchObject({ @@ -247,18 +164,25 @@ describe('Typescript usage suite', () => { it('xor handling should work with local client', async function () { // arrange - const client = await createLocalClient(); + const [request, promise] = new RequestFlowBuilder() + .withRawScript( + ` + (call %init_peer_id% ("service" "fails") []) + `, + ) + .configHandler((h) => { + h.use((req, res, _) => { + res.retCode = 1; + res.result = 'service failed internally'; + }); + }) + .buildWithErrorHandling(); // act - let script = `(call %init_peer_id% ("incorrect" "service") ["incorrect_arg"])`; - - const promise = subscribeForErrors(client, 7000); - await client.sendScript(script); + client = await createClient(); + await client.initiateFlow(request); // assert - await expect(promise).rejects.toMatchObject({ - error: expect.stringContaining('There is no service: incorrect'), - instruction: expect.stringContaining('incorrect'), - }); + await expect(promise).rejects.toMatch('service failed internally'); }); }); diff --git a/src/__test__/integration/legacy.api.spec.ts b/src/__test__/integration/legacy.api.spec.ts new file mode 100644 index 00000000..c1cf6668 --- /dev/null +++ b/src/__test__/integration/legacy.api.spec.ts @@ -0,0 +1,127 @@ +import { + createClient, + Particle, + FluenceClient, + sendParticle, + registerServiceFunction, + subscribeToEvent, + sendParticleAsFetch, +} from '../../api'; +import { nodes } from '../connection'; + +let client: FluenceClient; + +describe('Legacy api suite', () => { + it('sendParticle', async () => { + client = await createClient(nodes[0]); + + const result = new Promise((resolve) => { + subscribeToEvent(client, 'callback', 'callback', (args) => { + resolve(args[0]); + }); + }); + + const script = `(seq + (call init_relay ("op" "identity") []) + (call %init_peer_id% ("callback" "callback") [arg]) + )`; + + const data = { + arg: 'hello world!', + }; + + await sendParticle(client, new Particle(script, data, 7000)); + + expect(await result).toBe('hello world!'); + }); + + it('sendParticle Error', async () => { + client = await createClient(nodes[0]); + + const script = ` + (call init_relay ("incorrect" "service") []) + `; + + const promise = new Promise((resolve, reject) => { + sendParticle(client, new Particle(script), reject); + }); + + await expect(promise).rejects.toMatchObject({ + error: expect.stringContaining("Service with id 'incorrect' not found"), + instruction: expect.stringContaining('incorrect'), + }); + }); + + it('sendParticleAsFetch', async () => { + client = await createClient(nodes[0]); + + const script = `(seq + (call init_relay ("op" "identity") []) + (call %init_peer_id% ("service" "fn") [arg]) + )`; + + const data = { + arg: 'hello world!', + }; + + const [result] = await sendParticleAsFetch<[string]>(client, new Particle(script, data, 7000), 'fn', 'service'); + + expect(result).toBe('hello world!'); + }); + + it('sendParticleAsFetch Error', async () => { + client = await createClient(nodes[0]); + + const script = ` + (call init_relay ("incorrect" "service") []) + `; + + const promise = sendParticleAsFetch<[string]>(client, new Particle(script), 'fn', 'service'); + + await expect(promise).rejects.toMatchObject({ + error: expect.stringContaining("Service with id 'incorrect' not found"), + instruction: expect.stringContaining('incorrect'), + }); + }); + + it('registerServiceFunction', async () => { + client = await createClient(nodes[0]); + + registerServiceFunction(client, 'service', 'fn', (args) => { + return { res: args[0] + ' world!' }; + }); + + const script = `(seq + (call %init_peer_id% ("service" "fn") ["hello"] result) + (call %init_peer_id% ("callback" "callback") [result]) + )`; + + const [result] = await sendParticleAsFetch<[string]>( + client, + new Particle(script, {}, 7000), + 'callback', + 'callback', + ); + + expect(result).toEqual({ res: 'hello world!' }); + }); + + it('subscribeToEvent', async () => { + client = await createClient(nodes[0]); + + const promise = new Promise((resolve) => { + subscribeToEvent(client, 'service', 'fn', (args) => { + resolve(args[0] + ' world!'); + }); + }); + + const script = ` + (call %init_peer_id% ("service" "fn") ["hello"]) + `; + + await sendParticle(client, new Particle(script, {}, 7000)); + + const result = await promise; + expect(result).toBe('hello world!'); + }); +}); diff --git a/src/__test__/unit/AquaHandler.spec.ts b/src/__test__/unit/AquaHandler.spec.ts new file mode 100644 index 00000000..ce8961c4 --- /dev/null +++ b/src/__test__/unit/AquaHandler.spec.ts @@ -0,0 +1,323 @@ +import { AquaCallHandler, errorHandler } from '../../internal/AquaHandler'; +import { ResultCodes } from '../../internal/commonTypes'; + +const req = () => ({ + serviceId: 'service', + fnName: 'fn name', + args: [], + tetraplets: [], + particleContext: { + particleId: 'id', + }, +}); + +const res = () => ({ + res, +}); + +describe('Aqua handler tests', () => { + it('Should work without middlewares', () => { + // arrange + const handler = new AquaCallHandler(); + + // act + const res = handler.execute(req()); + + // assert + expect(res).not.toBeUndefined(); + }); + + it('Should work with no-op middleware', () => { + // arrange + const handler = new AquaCallHandler(); + handler.use((req, res, next) => { + next(); + }); + + // act + const res = handler.execute(req()); + + // assert + expect(res).not.toBeUndefined(); + }); + + it('Should work with two overlapping middlewares', () => { + // arrange + const handler = new AquaCallHandler(); + handler + .use((req, res, next) => { + res.result = { hello: 'world' }; + }) + .use((req, res, next) => { + res.result = { hello: 'incorect' }; + next(); + }); + + // act + const res = handler.execute(req()); + + // assert + expect(res).toMatchObject({ + result: { hello: 'world' }, + }); + }); + + it('Should work with two NON-overlapping middlewares', () => { + // arrange + const handler = new AquaCallHandler(); + handler + .use((req, res, next) => { + res.result = {}; + next(); + }) + .use((req, res, next) => { + res.result.name = 'john'; + next(); + }) + .use((req, res, next) => { + res.result.color = 'red'; + next(); + }); + + // act + const res = handler.execute(req()); + + // assert + expect(res).toMatchObject({ + result: { name: 'john', color: 'red' }, + }); + }); + + it('Should work with provided error handling middleware', () => { + // arrange + const handler = new AquaCallHandler(); + + handler.use(errorHandler); + handler.use((req, res, next) => { + throw new Error('some error'); + }); + + // act + const res = handler.execute(req()); + + // assert + expect(res).toMatchObject({ + retCode: ResultCodes.exceptionInHandler, + result: 'Error: some error', + }); + }); + + describe('Service handler tests', () => { + it('Should register service function', () => { + // arrange + const handler = new AquaCallHandler(); + handler.on('service', 'function', (args) => { + return { called: args }; + }); + + // act + const res = handler.execute({ + ...req(), + serviceId: 'service', + fnName: 'function', + args: ['hello', 'world'], + }); + + // assert + expect(res).toMatchObject({ + retCode: ResultCodes.success, + result: { called: ['hello', 'world'] }, + }); + }); + + it('Should UNregister service function', () => { + // arrange + const handler = new AquaCallHandler(); + const unreg = handler.on('service', 'function', (args) => { + return { called: args }; + }); + unreg(); + + // act + const res = handler.execute({ + ...req(), + serviceId: 'service', + fnName: 'function', + args: ['hello', 'world'], + }); + + // assert + expect(res).toMatchObject({ + retCode: ResultCodes.unkownError, + }); + }); + + it('Should register event', async () => { + // arrange + const handler = new AquaCallHandler(); + const returnPromise = new Promise((resolve) => { + handler.onEvent('service', 'function', (args) => { + resolve({ called: args }); + }); + }); + handler.onEvent('service', 'function', (args) => { + return { called: args }; + }); + + // act + const res = handler.execute({ + ...req(), + serviceId: 'service', + fnName: 'function', + args: ['hello', 'world'], + }); + + // assert + await expect(returnPromise).resolves.toMatchObject({ called: ['hello', 'world'] }); + }); + + it('Should UNregister event', () => { + // arrange + const handler = new AquaCallHandler(); + const unreg = handler.onEvent('service', 'function', (args) => { + // don't care + }); + unreg(); + + // act + const res = handler.execute({ + ...req(), + serviceId: 'service', + fnName: 'function', + args: ['hello', 'world'], + }); + + // assert + expect(res).toMatchObject({ + retCode: ResultCodes.unkownError, + }); + }); + + it('Should register multiple service functions', () => { + // arrange + const handler = new AquaCallHandler(); + handler.on('service', 'function1', (args) => { + return 'called function1'; + }); + handler.on('service', 'function2', (args) => { + return 'called function2'; + }); + + // act + const res1 = handler.execute({ + ...req(), + serviceId: 'service', + fnName: 'function1', + }); + const res2 = handler.execute({ + ...req(), + serviceId: 'service', + fnName: 'function2', + }); + + // assert + expect(res1).toMatchObject({ + retCode: ResultCodes.success, + result: 'called function1', + }); + expect(res2).toMatchObject({ + retCode: ResultCodes.success, + result: 'called function2', + }); + }); + + it('Should override previous function registration', () => { + // arrange + const handler = new AquaCallHandler(); + handler.on('service', 'function', (args) => { + return { called: args }; + }); + handler.on('service', 'function', (args) => { + return 'overridden'; + }); + + // act + const res = handler.execute({ + ...req(), + serviceId: 'service', + fnName: 'function', + }); + + // assert + expect(res).toMatchObject({ + retCode: ResultCodes.success, + result: 'overridden', + }); + }); + }); + + describe('Middleware combination tests', () => { + it('Should work with NON overlapping function registration', () => { + // arrange + const base = new AquaCallHandler(); + base.on('service', 'function1', (args) => { + return 'called function1'; + }); + const another = new AquaCallHandler(); + base.on('service', 'function2', (args) => { + return 'called function2'; + }); + + base.combineWith(another); + + // act + const res1 = base.execute({ + ...req(), + serviceId: 'service', + fnName: 'function1', + }); + const res2 = base.execute({ + ...req(), + serviceId: 'service', + fnName: 'function2', + }); + + // assert + expect(res1).toMatchObject({ + retCode: ResultCodes.success, + result: 'called function1', + }); + expect(res2).toMatchObject({ + retCode: ResultCodes.success, + result: 'called function2', + }); + }); + + it('Should work with overlapping function registration', () => { + // arrange + const base = new AquaCallHandler(); + base.on('service', 'function', (args) => { + return { called: args }; + }); + const another = new AquaCallHandler(); + another.on('service', 'function', (args) => { + return 'overridden'; + }); + + base.combineWith(another); + + // act + const res = base.execute({ + ...req(), + serviceId: 'service', + fnName: 'function', + }); + + // assert + expect(res).toMatchObject({ + retCode: ResultCodes.success, + result: 'overridden', + }); + }); + }); +}); diff --git a/src/__test__/unit/RequestFlow.spec.ts b/src/__test__/unit/RequestFlow.spec.ts new file mode 100644 index 00000000..5bcc4432 --- /dev/null +++ b/src/__test__/unit/RequestFlow.spec.ts @@ -0,0 +1,31 @@ +import PeerId from 'peer-id'; +import { genUUID } from '../../internal/particle'; +import { seedToPeerId } from '../../internal/peerIdUtils'; +import { RequestFlow } from '../../internal/RequestFlow'; + +describe('Request flow tests', () => { + it('particle initiation should work', async () => { + // arrange + jest.useFakeTimers(); + const seed = '4vzv3mg6cnjpEK24TXXLA3Ye7QrvKWPKqfbDvAKAyLK6'; + const mockDate = new Date(Date.UTC(2021, 2, 14)).valueOf(); + Date.now = jest.fn(() => mockDate); + + const request = RequestFlow.createLocal('(null)', 10000); + const peerId = await seedToPeerId(seed); + + // act + await request.initState(peerId); + + // assert + const particle = request.getParticle(); + expect(particle).toMatchObject({ + init_peer_id: peerId.toB58String(), + script: '(null)', + signature: '', + timestamp: mockDate, + ttl: 10000, + }); + expect(setTimeout).toHaveBeenCalledTimes(1); + }); +}); diff --git a/src/__test__/unit/air.spec.ts b/src/__test__/unit/air.spec.ts index 7bb88153..738f868e 100644 --- a/src/__test__/unit/air.spec.ts +++ b/src/__test__/unit/air.spec.ts @@ -1,26 +1,34 @@ -import { createLocalClient } from '../connection'; -import {subscribeForErrors} from "../../api"; +import { createClient, FluenceClient } from '../../api.unstable'; +import { RequestFlow } from '../../internal/RequestFlow'; +import { RequestFlowBuilder } from '../../internal/RequestFlowBuilder'; + +let client: FluenceClient; describe('== AIR suite', () => { + afterEach(async () => { + if (client) { + await client.disconnect(); + } + }); + it('check init_peer_id', async function () { // arrange const serviceId = 'test_service'; const fnName = 'return_first_arg'; + const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [%init_peer_id%])`; - const client = await createLocalClient(); - - let res; - client.registerCallback(serviceId, fnName, (args, _) => { - res = args[0]; - return res; - }); + // prettier-ignore + const [request, promise] = new RequestFlowBuilder() + .withRawScript(script) + .buildAsFetch(serviceId, fnName); // act - const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [%init_peer_id%])`; - await client.sendScript(script); + client = await createClient(); + await client.initiateFlow(request); + const [result] = await promise; // assert - expect(res).toEqual(client.selfPeerId); + expect(result).toBe(client.selfPeerId); }); it('call local function', async function () { @@ -28,10 +36,10 @@ describe('== AIR suite', () => { const serviceId = 'test_service'; const fnName = 'return_first_arg'; - const client = await createLocalClient(); + client = await createClient(); let res; - client.registerCallback(serviceId, fnName, (args, _) => { + client.aquaCallHandler.on(serviceId, fnName, (args, _) => { res = args[0]; return res; }); @@ -39,69 +47,66 @@ describe('== AIR suite', () => { // act const arg = 'hello'; const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") ["${arg}"])`; - await client.sendScript(script); + await client.initiateFlow(RequestFlow.createLocal(script)); // assert expect(res).toEqual(arg); }); - it('call broken script', async function () { - // arrange - const client = await createLocalClient(); - const script = `(incorrect)`; + describe('error handling', () => { + it('call broken script', async function () { + // arrange + const script = `(incorrect)`; + // prettier-ignore + const [request, error] = new RequestFlowBuilder() + .withRawScript(script) + .buildWithErrorHandling(); - // act - const promise = client.sendScript(script); + // act + client = await createClient(); + await client.initiateFlow(request); - // assert - await expect(promise).rejects.toContain("aqua script can't be parsed"); - }); + // assert + await expect(error).rejects.toContain("aqua script can't be parsed"); + }); - it('call script without ttl', async function () { - // arrange - const client = await createLocalClient(); - const script = `(call %init_peer_id% ("op" "identity") [""])`; + it('call script without ttl', async function () { + // arrange + const script = `(null)`; + // prettier-ignore + const [request, promise] = new RequestFlowBuilder() + .withTTL(0) + .withRawScript(script) + .buildAsFetch(); - // act - const promise = client.sendScript(script, undefined, 0); + // act + client = await createClient(); + await client.initiateFlow(request); - // assert - await expect(promise).rejects.toContain('Particle expired'); - }); - - it.skip('call broken script by fetch', async function () { - // arrange - const client = await createLocalClient(); - const script = `(incorrect)`; - - // act - const promise = client.fetch(script, ['result']); - - // assert - await expect(promise).rejects.toContain("aqua script can't be parsed"); + // assert + await expect(promise).rejects.toContain('Timed out after'); + }); }); it('check particle arguments', async function () { // arrange const serviceId = 'test_service'; const fnName = 'return_first_arg'; + const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [arg1])`; - const client = await createLocalClient(); - - let res; - client.registerCallback(serviceId, fnName, (args, _) => { - res = args[0]; - return res; - }); + // prettier-ignore + const [request, promise] = new RequestFlowBuilder() + .withRawScript(script) + .withVariable('arg1', 'hello') + .buildAsFetch(serviceId, fnName); // act - const script = `(call %init_peer_id% ("${serviceId}" "${fnName}") [arg1])`; - const data = new Map(); - data.set('arg1', 'hello'); - await client.sendScript(script, data); + client = await createClient(); + await client.initiateFlow(request); + const [result] = await promise; // assert - expect(res).toEqual('hello'); + expect(result).toEqual('hello'); }); it('check security tetraplet', async function () { @@ -111,15 +116,15 @@ describe('== AIR suite', () => { const getDataServiceId = 'get_data_service'; const getDataFnName = 'get_data'; - const client = await createLocalClient(); + client = await createClient(); - client.registerCallback(makeDataServiceId, makeDataFnName, (args, _) => { + client.aquaCallHandler.on(makeDataServiceId, makeDataFnName, (args, _) => { return { field: 42, }; }); let res; - client.registerCallback(getDataServiceId, getDataFnName, (args, tetraplets) => { + client.aquaCallHandler.on(getDataServiceId, getDataFnName, (args, tetraplets) => { res = { args: args, tetraplets: tetraplets, @@ -133,7 +138,7 @@ describe('== AIR suite', () => { (call %init_peer_id% ("${makeDataServiceId}" "${makeDataFnName}") [] result) (call %init_peer_id% ("${getDataServiceId}" "${getDataFnName}") [result.$.field]) )`; - await client.sendScript(script); + await client.initiateFlow(new RequestFlowBuilder().withRawScript(script).build()); // assert const tetraplet = res.tetraplets[0][0]; @@ -146,12 +151,12 @@ describe('== AIR suite', () => { it('check chain of services work properly', async function () { // arrange - const client = await createLocalClient(); + client = await createClient(); const serviceId1 = 'check1'; const fnName1 = 'fn1'; let res1; - client.registerCallback(serviceId1, fnName1, (args, _) => { + client.aquaCallHandler.on(serviceId1, fnName1, (args, _) => { res1 = args[0]; return res1; }); @@ -159,7 +164,7 @@ describe('== AIR suite', () => { const serviceId2 = 'check2'; const fnName2 = 'fn2'; let res2; - client.registerCallback(serviceId2, fnName2, (args, _) => { + client.aquaCallHandler.on(serviceId2, fnName2, (args, _) => { res2 = args[0]; return res2; }); @@ -167,7 +172,7 @@ describe('== AIR suite', () => { const serviceId3 = 'check3'; const fnName3 = 'fn3'; let res3; - client.registerCallback(serviceId3, fnName3, (args, _) => { + client.aquaCallHandler.on(serviceId3, fnName3, (args, _) => { res3 = args; return res3; }); @@ -182,7 +187,7 @@ describe('== AIR suite', () => { (call %init_peer_id% ("${serviceId2}" "${fnName2}") ["${arg2}"] result2)) (call %init_peer_id% ("${serviceId3}" "${fnName3}") [result1 result2])) `; - await client.sendScript(script); + await client.initiateFlow(new RequestFlowBuilder().withRawScript(script).build()); // assert expect(res1).toEqual(arg1); diff --git a/src/__test__/unit/ast.spec.ts b/src/__test__/unit/ast.spec.ts index 855b309f..6e5b6735 100644 --- a/src/__test__/unit/ast.spec.ts +++ b/src/__test__/unit/ast.spec.ts @@ -1,8 +1,9 @@ -import { parseAIR } from '../../internal/stepper'; +import { AquamarineInterpreter } from '../../internal/aqua/interpreter'; describe('== AST parsing suite', () => { it('parse simple script and return ast', async function () { - let ast = await parseAIR(` + const interpreter = await AquamarineInterpreter.create({} as any); + let ast = interpreter.parseAir(` (call node ("service" "function") [1 2 3 arg] output) `); diff --git a/src/__test__/unit/peerId.spec.ts b/src/__test__/unit/peerId.spec.ts new file mode 100644 index 00000000..a1d351da --- /dev/null +++ b/src/__test__/unit/peerId.spec.ts @@ -0,0 +1,13 @@ +import { encode } from 'bs58'; +import { peerIdToSeed, seedToPeerId } from '../..'; + +describe('Peer Id utils', () => { + it('should create private key from seed and back', async function () { + // prettier-ignore + let seed = [46, 188, 245, 171, 145, 73, 40, 24, 52, 233, 215, 163, 54, 26, 31, 221, 159, 179, 126, 106, 27, 199, 189, 194, 80, 133, 235, 42, 42, 247, 80, 201]; + let seedStr = encode(seed); + + let pid = await seedToPeerId(seedStr); + expect(peerIdToSeed(pid)).toEqual(seedStr); + }); +}); diff --git a/src/api.ts b/src/api.ts index dab43749..00503d84 100644 --- a/src/api.ts +++ b/src/api.ts @@ -1,11 +1,42 @@ -import { FluenceClient } from './FluenceClient'; -import { SecurityTetraplet } from './internal/commonTypes'; -import { Particle } from './internal/particle'; import Multiaddr from 'multiaddr'; -import PeerId, { isPeerId } from 'peer-id'; -import { generatePeerId, seedToPeerId } from './internal/peerIdUtils'; -import { FluenceClientImpl } from './internal/FluenceClientImpl'; -import log from 'loglevel'; +import PeerId from 'peer-id'; +import { PeerIdB58, SecurityTetraplet } from './internal/commonTypes'; +import * as unstable from './api.unstable'; +import { ClientImpl } from './internal/ClientImpl'; +import { RequestFlowBuilder } from './internal/RequestFlowBuilder'; +import { RequestFlow } from './internal/RequestFlow'; + +/** + * The class represents interface to Fluence Platform. To create a client @see {@link createClient} function. + */ +export interface FluenceClient { + /** + * { string } Gets the base58 representation of the current peer id. Read only + */ + readonly relayPeerId: PeerIdB58; + + /** + * { string } Gets the base58 representation of the connected relay's peer id. Read only + */ + readonly selfPeerId: PeerIdB58; + + /** + * { string } True if the client is connected to network. False otherwise. Read only + */ + readonly isConnected: boolean; + + /** + * Disconnects the client from the network + */ + disconnect(): Promise; + + /** + * Establish a connection to the node. If the connection is already established, disconnect and reregister all services in a new connection. + * + * @param {string | Multiaddr} [multiaddr] - Address of the node in Fluence network. + */ + connect(multiaddr: string | Multiaddr): Promise; +} type Node = { peerId: string; @@ -22,44 +53,77 @@ export const createClient = async ( connectTo?: string | Multiaddr | Node, peerIdOrSeed?: PeerId | string, ): Promise => { - let peerId; - if (!peerIdOrSeed) { - peerId = await generatePeerId(); - } else if (isPeerId(peerIdOrSeed)) { - // keep unchanged - peerId = peerIdOrSeed; - } else { - // peerId is string, therefore seed - peerId = await seedToPeerId(peerIdOrSeed); - } - - const client = new FluenceClientImpl(peerId); - - if (connectTo) { - let theAddress: Multiaddr; - let fromNode = (connectTo as any).multiaddr; - if (fromNode) { - theAddress = new Multiaddr(fromNode); - } else { - theAddress = new Multiaddr(connectTo as string); - } - - await client.connect(theAddress); - if (!(await checkConnection(client))) { - throw new Error('Connection check failed. Check if the node is working or try to connect to another node'); - } - } - - return client; + const res = await unstable.createClient(connectTo, peerIdOrSeed); + return res as any; }; +export const checkConnection = async (client: FluenceClient): Promise => { + return unstable.checkConnection(client as any); +}; + +/** + * The class representing Particle - a data structure used to perform operations on Fluence Network. It originates on some peer in the network, travels the network through a predefined path, triggering function execution along its way. + */ +export class Particle { + script: string; + data: Map; + ttl: number; + + /** + * Creates a particle with specified parameters. + * @param { String }script - Air script which defines the execution of a particle – its path, functions it triggers on peers, and so on. + * @param { Map | Record } data - Variables passed to the particle in the form of either JS Map or JS object with keys representing variable names and values representing values correspondingly + * @param { [Number]=7000 } ttl - Time to live, a timout after which the particle execution is stopped by Aquamarine. + */ + constructor(script: string, data?: Map | Record, ttl?: number) { + this.script = script; + if (data === undefined) { + this.data = new Map(); + } else if (data instanceof Map) { + this.data = data; + } else { + this.data = new Map(); + for (let k in data) { + this.data.set(k, data[k]); + } + } + + this.ttl = ttl ?? 7000; + } +} + /** * Send a particle to Fluence Network using the specified Fluence Client. * @param { FluenceClient } client - The Fluence Client instance. * @param { Particle } particle - The particle to send. */ -export const sendParticle = async (client: FluenceClient, particle: Particle): Promise => { - return await client.sendScript(particle.script, particle.data, particle.ttl); +export const sendParticle = async ( + client: FluenceClient, + particle: Particle, + onError?: (err) => void, +): Promise => { + const c = client as ClientImpl; + const [req, errorPromise] = new RequestFlowBuilder() + .withRawScript(particle.script) + .withVariables(particle.data) + .withTTL(particle.ttl) + .buildWithErrorHandling(); + + errorPromise.catch(onError); + + await c.initiateFlow(req); + return req.id; +}; + +/* + This map stores functions which unregister callbacks registered by registerServiceFunction + The key sould be created with makeKey. The value is the unresitration function + This is only needed to support legacy api +*/ +const handlersUnregistratorsMap = new Map(); +const makeKey = (client: FluenceClient, serviceId: string, fnName: string) => { + const pid = client.selfPeerId || ''; + return `${pid}/${serviceId}/${fnName}`; }; /** @@ -75,7 +139,8 @@ export const registerServiceFunction = ( fnName: string, handler: (args: any[], tetraplets: SecurityTetraplet[][]) => object, ) => { - (client as FluenceClientImpl).registerCallback(serviceId, fnName, handler); + const unregister = (client as ClientImpl).aquaCallHandler.on(serviceId, fnName, handler); + handlersUnregistratorsMap.set(makeKey(client, serviceId, fnName), unregister); }; // prettier-ignore @@ -90,7 +155,12 @@ export const unregisterServiceFunction = ( serviceId: string, fnName: string ) => { - (client as FluenceClientImpl).unregisterCallback(serviceId, fnName); + const key = makeKey(client, serviceId, fnName); + const unuse = handlersUnregistratorsMap.get(key); + if(unuse) { + unuse(); + } + handlersUnregistratorsMap.delete(key); }; /** @@ -136,79 +206,13 @@ export const sendParticleAsFetch = async ( callbackFnName: string, callbackServiceId: string = '_callback', ): Promise => { - const serviceId = callbackServiceId; - const fnName = callbackFnName; + const [request, promise] = new RequestFlowBuilder() + .withRawScript(particle.script) + .withVariables(particle.data) + .withTTL(particle.ttl) + .buildAsFetch(callbackServiceId, callbackFnName); - let promise: Promise = new Promise(function (resolve, reject) { - const unsub = subscribeToEvent(client, serviceId, fnName, (args: any[], _) => { - unsub(); - resolve(args as any); - }); - - setTimeout(() => { - unsub(); - reject(new Error(`callback for ${callbackServiceId}/${callbackFnName} timed out after ${particle.ttl}`)); - }, particle.ttl); - }); - - await sendParticle(client, particle); + await (client as ClientImpl).initiateFlow(request); return promise; }; - -export const checkConnection = async (client: FluenceClient): Promise => { - let msg = Math.random().toString(36).substring(7); - let callbackFn = 'checkConnection'; - let callbackService = '_callback'; - - const particle = new Particle( - ` - (seq - (call __relay ("op" "identity") [msg] result) - (call myPeerId ("${callbackService}" "${callbackFn}") [result]) - ) - `, - { - __relay: client.relayPeerId, - myPeerId: client.selfPeerId, - msg, - }, - ); - - if (!client.isConnected) { - return false; - } - - try { - let result = await sendParticleAsFetch(client, particle, callbackFn, callbackService); - if (result[0][0] != msg) { - log.warn("unexpected behavior. 'identity' must return arguments the passed arguments."); - } - return true; - } catch (e) { - log.error('Error on establishing connection: ', e); - return false; - } -}; - -export const subscribeForErrors = (client: FluenceClient, ttl: number): Promise => { - return new Promise((resolve, reject) => { - registerServiceFunction(client, '__magic', 'handle_xor', (args, _) => { - setTimeout(() => { - try { - reject(JSON.parse(args[0])); - } catch { - reject(args); - } - }, 0); - - unregisterServiceFunction(client, '__magic', 'handle_xor'); - return {}; - }); - - setTimeout(() => { - unregisterServiceFunction(client, '__magic', 'handle_xor'); - resolve(); - }, ttl); - }); -}; diff --git a/src/api.unstable.ts b/src/api.unstable.ts new file mode 100644 index 00000000..aab8c930 --- /dev/null +++ b/src/api.unstable.ts @@ -0,0 +1,143 @@ +import Multiaddr from 'multiaddr'; +import PeerId, { isPeerId } from 'peer-id'; +import { generatePeerId, seedToPeerId } from './internal/peerIdUtils'; +import { ClientImpl } from './internal/ClientImpl'; +import log from 'loglevel'; +import { RequestFlowBuilder } from './internal/RequestFlowBuilder'; +import { PeerIdB58 } from './internal/commonTypes'; +import { AquaCallHandler } from './internal/AquaHandler'; +import { RequestFlow } from './internal/RequestFlow'; + +export { RequestFlowBuilder } from './internal/RequestFlowBuilder'; + +/** + * The class represents interface to Fluence Platform. To create a client use @see {@link createClient} function. + */ +export interface FluenceClient { + /** + * { string } Gets the base58 representation of the current peer id. Read only + */ + readonly relayPeerId: PeerIdB58; + + /** + * { string } Gets the base58 representation of the connected relay's peer id. Read only + */ + readonly selfPeerId: PeerIdB58; + + /** + * { string } True if the client is connected to network. False otherwise. Read only + */ + readonly isConnected: boolean; + + /** + * The base handler which is used by every RequestFlow executed by this FluenceClient. + * Please note, that the handler is combined with the handler from RequestFlow before the execution occures. + * After this combination, middlewares from RequestFlow are executed before client handler's middlewares. + */ + readonly aquaCallHandler: AquaCallHandler; + + /** + * Disconnects the client from the network + */ + disconnect(): Promise; + + /** + * Establish a connection to the node. If the connection is already established, disconnect and reregister all services in a new connection. + * + * @param multiaddr + */ + connect(multiaddr: string | Multiaddr): Promise; + + /** + * Initiates RequestFlow execution @see { @link RequestFlow } + * @param { RequestFlow } [ request ] - RequestFlow to start the execution of + */ + initiateFlow(request: RequestFlow): Promise; +} + +type Node = { + peerId: string; + multiaddr: string; +}; + +/** + * Creates a Fluence client. If the `connectTo` is specified connects the client to the network + * @param { string | Multiaddr | Node } [connectTo] - Node in Fluence network to connect to. If not specified client will not be connected to the n + * @param { PeerId | string } [peerIdOrSeed] - The Peer Id of the created client. Specified either as PeerId structure or as seed string. Will be generated randomly if not specified + * @returns { Promise } Promise which will be resolved with the created FluenceClient + */ +export const createClient = async ( + connectTo?: string | Multiaddr | Node, + peerIdOrSeed?: PeerId | string, +): Promise => { + let peerId; + if (!peerIdOrSeed) { + peerId = await generatePeerId(); + } else if (isPeerId(peerIdOrSeed)) { + // keep unchanged + peerId = peerIdOrSeed; + } else { + // peerId is string, therefore seed + peerId = await seedToPeerId(peerIdOrSeed); + } + + const client = new ClientImpl(peerId); + + if (connectTo) { + let theAddress: Multiaddr; + let fromNode = (connectTo as any).multiaddr; + if (fromNode) { + theAddress = new Multiaddr(fromNode); + } else { + theAddress = new Multiaddr(connectTo as string); + } + + await client.connect(theAddress); + if (!(await checkConnection(client))) { + throw new Error('Connection check failed. Check if the node is working or try to connect to another node'); + } + } else { + await client.local(); + } + + return client; +}; + +/** + * Checks the network connection by sending a ping-like request to relat node + * @param { FluenceClient } client - The Fluence Client instance. + */ +export const checkConnection = async (client: FluenceClient): Promise => { + if (!client.isConnected) { + return false; + } + + const msg = Math.random().toString(36).substring(7); + const callbackFn = 'checkConnection'; + const callbackService = '_callback'; + + const [request, promise] = new RequestFlowBuilder() + .withRawScript( + `(seq + (call init_relay ("op" "identity") [msg] result) + (call %init_peer_id% ("${callbackService}" "${callbackFn}") [result]) + )`, + ) + .withVariables({ + msg, + }) + .buildAsFetch<[[string]]>(callbackService, callbackFn); + + await client.initiateFlow(request); + + try { + const [[result]] = await promise; + if (result != msg) { + log.warn("unexpected behavior. 'identity' must return arguments the passed arguments."); + } + return true; + } catch (e) { + log.error('Error on establishing connection: ', e); + return false; + } +}; diff --git a/src/index.ts b/src/index.ts index 2e6e5972..48c2b3e6 100644 --- a/src/index.ts +++ b/src/index.ts @@ -15,10 +15,8 @@ */ export { seedToPeerId, peerIdToSeed, generatePeerId } from './internal/peerIdUtils'; -export { FluenceClient } from './FluenceClient'; export { SecurityTetraplet, PeerIdB58 } from './internal/commonTypes'; export * from './api'; -export { Particle } from './internal/particle'; export * from './internal/builtins'; import log, { LogLevelDesc } from 'loglevel'; diff --git a/src/internal/AquaHandler.ts b/src/internal/AquaHandler.ts new file mode 100644 index 00000000..4cb7bdc3 --- /dev/null +++ b/src/internal/AquaHandler.ts @@ -0,0 +1,227 @@ +import { ResultCodes, SecurityTetraplet } from './commonTypes'; + +/** + * Particle context. Contains additional information about particle which triggered `call` air instruction from Aquamarine interpreter + */ +interface ParticleContext { + /** + * The particle ID + */ + particleId: string; + [x: string]: any; +} + +/** + * Represents the information passed from Aquamarine interpreter when a `call` air instruction is executed on the local peer + */ +interface AquaCall { + /** + * Service ID as specified in `call` air instruction + */ + serviceId: string; + + /** + * Function name as specified in `call` air instruction + */ + fnName: string; + + /** + * Arguments as specified in `call` air instruction + */ + args: any[]; + + /** + * Security Tetraplets recieved from Aquamarine interpreter + */ + tetraplets: SecurityTetraplet[][]; + + /** + * Particle context, @see {@link ParticleContext} + */ + particleContext: ParticleContext; + + [x: string]: any; +} + +/** + * Represents the result of the `call` air instruction to be returned into Aquamarine interpreter + */ +interface AquaCallResult { + /** + * Return code to be returned to Aquamarine interpreter + */ + retCode: ResultCodes; + + /** + * Resul object to be returned to Aquamarine interpreter + */ + result?: any; + [x: string]: any; +} + +/** + * Type for the middleware used in AquaCallHandler middleware chain. + * In a nutshell middelware is a function of request, response and function to trigger the next middleware in chain. + * Each middleware is free to write additional properties to either request or response object. + * When the chain finishes the response is passed back to Aquamarine interpreter + * @param { AquaCall } req - information about the air `call` instruction + * @param { AquaCallResult } resp - response to be passed to Aquamarine interpreter + * @param { Function } next - function which invokes next middleware in chain + */ +export type Middleware = (req: AquaCall, resp: AquaCallResult, next: Function) => void; + +/** + * Convenience middleware factory. Registeres a handler for a pair of 'serviceId/fnName'. + * The return value of the handler is passed back to Aquamarine + * @param { string } serviceId - The identifier of service which would be used to make calls from Aquamarine + * @param { string } fnName - The identifier of function which would be used to make calls from Aquamarine + * @param { (args: any[], tetraplets: SecurityTetraplet[][]) => object } handler - The handler which should handle the call. The result is any object passed back to Aquamarine + */ +export const fnHandler = ( + serviceId: string, + fnName: string, + handler: (args: any[], tetraplets: SecurityTetraplet[][]) => any, +) => { + return (req: AquaCall, resp: AquaCallResult, next: Function): void => { + if (req.fnName === fnName && req.serviceId === serviceId) { + const res = handler(req.args, req.tetraplets); + resp.retCode = ResultCodes.success; + resp.result = res; + } + next(); + }; +}; + +/** + * Convenience middleware factory. Registeres a handler for a pair of 'serviceId/fnName'. + * Similar to @see { @link fnHandler } but instead returns and empty object immediately runs the handler asynchronously + * @param { string } serviceId - The identifier of service which would be used to make calls from Aquamarine + * @param { string } fnName - The identifier of function which would be used to make calls from Aquamarine + * @param { (args: any[], tetraplets: SecurityTetraplet[][]) => void } handler - The handler which should handle the call. + */ +export const fnAsEventHandler = ( + serviceId: string, + fnName: string, + handler: (args: any[], tetraplets: SecurityTetraplet[][]) => void, +) => { + return (req: AquaCall, resp: AquaCallResult, next: Function): void => { + if (req.fnName === fnName && req.serviceId === serviceId) { + setTimeout(() => { + handler(req.args, req.tetraplets); + }, 0); + + resp.retCode = ResultCodes.success; + resp.result = {}; + } + next(); + }; +}; + +/** + * Error catching middleware + */ +export const errorHandler: Middleware = (req: AquaCall, resp: AquaCallResult, next: Function): void => { + try { + next(); + } catch (e) { + resp.retCode = ResultCodes.exceptionInHandler; + resp.result = e.toString(); + } +}; + +type AquaCallFunction = (req: AquaCall, resp: AquaCallResult) => void; + +/** + * Class defines the handling of a `call` air intruction executed by aquamarine on the local peer. + * All the execution process is defined by the chain of middlewares - architecture popular among backend web frameworks. + * Each middleware has the form of `(req: AquaCall, resp: AquaCallResult, next: Function) => void;` + * A handler starts with an empty middleware chain and does nothing. + * To execute the handler use @see { @link execute } function + */ +export class AquaCallHandler { + private middlewares: Middleware[] = []; + + /** + * Appends middleware to the chain of middlewares + * @param { Middleware } middleware + */ + use(middleware: Middleware): AquaCallHandler { + this.middlewares.push(middleware); + return this; + } + + /** + * Removes the middleware from the chain of middlewares + * @param { Middleware } middleware + */ + unUse(middleware: Middleware): AquaCallHandler { + const index = this.middlewares.indexOf(middleware); + if (index !== -1) { + this.middlewares.splice(index, 1); + } + return this; + } + + /** + * Combine handler with another one. Combintaion is done by copying middleware chain from the argument's handler into current one. + * Please note, that current handler's middlewares take precedence over the ones from handler to be combined with + * @param { AquaCallHandler } other - AquaCallHandler to be combined with + */ + combineWith(other: AquaCallHandler): AquaCallHandler { + this.middlewares = [...this.middlewares, ...other.middlewares]; + return this; + } + + /** + * Convinience method for registring @see { @link fnHandler } middleware + */ + on(serviceId: string, fnName: string, handler: (args: any[], tetraplets: SecurityTetraplet[][]) => any): Function { + const mw = fnHandler(serviceId, fnName, handler); + this.use(mw); + return () => { + this.unUse(mw); + }; + } + + /** + * Convinience method for registring @see { @link fnAsEventHandler } middleware + */ + onEvent( + serviceId: string, + fnName: string, + handler: (args: any[], tetraplets: SecurityTetraplet[][]) => void, + ): Function { + const mw = fnAsEventHandler(serviceId, fnName, handler); + this.use(mw); + return () => { + this.unUse(mw); + }; + } + + /** + * Collapses middleware chain into a single function. + */ + buildFunction(): AquaCallFunction { + const result = this.middlewares.reduceRight( + (agg, cur) => { + return (req, resp) => { + cur(req, resp, () => agg(req, resp)); + }; + }, + (req, res) => {}, + ); + + return result; + } + + /** + * Executes the handler with the specified AquaCall request. Return the result response + */ + execute(req: AquaCall): AquaCallResult { + const res: AquaCallResult = { + retCode: ResultCodes.unkownError, + }; + this.buildFunction()(req, res); + return res; + } +} diff --git a/src/internal/FluenceClientBase.ts b/src/internal/ClientImpl.ts similarity index 62% rename from src/internal/FluenceClientBase.ts rename to src/internal/ClientImpl.ts index 07315d61..76773282 100644 --- a/src/internal/FluenceClientBase.ts +++ b/src/internal/ClientImpl.ts @@ -14,16 +14,25 @@ * limitations under the License. */ -import { build } from './particle'; import * as PeerId from 'peer-id'; import Multiaddr from 'multiaddr'; import { FluenceConnection } from './FluenceConnection'; import { ParticleProcessor } from './ParticleProcessor'; -import { ParticleProcessorStrategy } from './ParticleProcessorStrategy'; -import { PeerIdB58 } from './commonTypes'; +import { PeerIdB58, SecurityTetraplet } from './commonTypes'; +import { FluenceClient } from 'src'; +import { RequestFlow } from './RequestFlow'; +import { AquaCallHandler, errorHandler, fnHandler } from './AquaHandler'; +import { loadRelayFn, loadVariablesService } from './RequestFlowBuilder'; -export abstract class FluenceClientBase { +const makeDefaultClientHandler = (): AquaCallHandler => { + const res = new AquaCallHandler(); + res.use(errorHandler); + res.use(fnHandler('op', 'identity', (args, _) => args)); + return res; +}; + +export class ClientImpl implements FluenceClient { readonly selfPeerIdFull: PeerId; get relayPeerId(): PeerIdB58 | undefined { @@ -38,16 +47,21 @@ export abstract class FluenceClientBase { return this.connection?.isConnected(); } - protected connection: FluenceConnection; + private connection: FluenceConnection; protected processor: ParticleProcessor; - protected abstract strategy: ParticleProcessorStrategy; constructor(selfPeerIdFull: PeerId) { this.selfPeerIdFull = selfPeerIdFull; + this.aquaCallHandler = makeDefaultClientHandler(); + this.processor = new ParticleProcessor(selfPeerIdFull, this.aquaCallHandler); } + aquaCallHandler: AquaCallHandler; + async disconnect(): Promise { - await this.connection.disconnect(); + if (this.connection) { + await this.connection.disconnect(); + } await this.processor.destroy(); } @@ -79,17 +93,19 @@ export abstract class FluenceClientBase { multiaddr, node, this.selfPeerIdFull, - this.processor.executeExternalParticle.bind(this.processor), + this.processor.executeIncomingParticle.bind(this.processor), ); await connection.connect(); - await this.processor.init(); - this.connection = connection; + await this.processor.init(connection); } - async sendScript(script: string, data?: Map, ttl?: number): Promise { - const particle = await build(this.selfPeerIdFull, this.relayPeerId, script, data, ttl); - await this.processor.executeLocalParticle(particle); - return particle.id; + async initiateFlow(request: RequestFlow): Promise { + // setting `relayVariableName` here. If the client is not connected (i.e it is created as local) then there is no relay + request.handler.on(loadVariablesService, loadRelayFn, () => { + return this.relayPeerId || ''; + }); + await request.initState(this.selfPeerIdFull); + this.processor.executeLocalParticle(request); } } diff --git a/src/internal/FluenceClientImpl.ts b/src/internal/FluenceClientImpl.ts deleted file mode 100644 index e2c6dfe2..00000000 --- a/src/internal/FluenceClientImpl.ts +++ /dev/null @@ -1,259 +0,0 @@ -/* - * Copyright 2020 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import log from 'loglevel'; -import PeerId from 'peer-id'; -import { SecurityTetraplet, StepperOutcome } from './commonTypes'; -import { FluenceClientBase } from './FluenceClientBase'; -import { FluenceClient } from '../FluenceClient'; -import { build, genUUID, ParticleDto } from './particle'; -import { ParticleProcessor } from './ParticleProcessor'; -import { ParticleProcessorStrategy } from './ParticleProcessorStrategy'; - -const fetchCallbackServiceName = '__callback'; -const selfRelayVarName = '__relay'; - -const wrapRelayBasedCall = (script: string) => { - return ` - (seq - (call ${selfRelayVarName} ("op" "identity") []) - ${script} - ) - `; -}; - -const wrapFetchCall = (script: string, particleId: string, resultArgNames: string[]) => { - // TODO: sanitize - const resultTogether = resultArgNames.join(' '); - let res = ` - (seq - ${script} - (seq - (call ${selfRelayVarName} ("op" "identity") []) - (call %init_peer_id% ("${fetchCallbackServiceName}" "${particleId}") [${resultTogether}]) - ) - )`; - return wrapRelayBasedCall(res); -}; - -export interface FluenceEvent { - type: string; - args: any[]; -} - -export type FluenceEventHandler = (event: FluenceEvent) => void; - -export class FluenceClientImpl extends FluenceClientBase implements FluenceClient { - private eventSubscribers: Map = new Map(); - private eventValidators: Map = new Map(); - private callbacks: Map = new Map(); - private fetchParticles: Map = new Map(); - - constructor(selfPeerId: PeerId) { - super(selfPeerId); - this.processor = new ParticleProcessor(this.strategy, selfPeerId); - } - - async fetch(script: string, resultArgNames: string[], data?: Map, ttl?: number): Promise { - data = this.addRelayToArgs(data); - const callBackId = genUUID(); - script = wrapFetchCall(script, callBackId, resultArgNames); - const particle = await build(this.selfPeerIdFull, this.relayPeerId, script, data, ttl, callBackId); - - const prFetch = new Promise(async (resolve, reject) => { - this.fetchParticles.set(callBackId, { resolve, reject }); - }); - const prExec = this.processor.executeLocalParticle(particle); - return prExec.then(() => prFetch); - } - - // TODO:: better naming probably? - async fireAndForget(script: string, data?: Map, ttl?: number) { - data = this.addRelayToArgs(data); - script = wrapRelayBasedCall(script); - - await this.sendScript(script, data, ttl); - } - - registerEvent( - channel: string, - eventName: string, - validate?: (channel: string, eventName: string, args: any[], tetraplets: any[][]) => boolean, - ) { - if (!validate) { - validate = (c, e, a, t) => true; - } - - this.eventValidators.set(`${channel}/${eventName}`, validate); - } - - unregisterEvent(channel: string, eventName: string) { - this.eventValidators.delete(`${channel}/${eventName}`); - } - - registerCallback( - serviceId: string, - fnName: string, - callback: (args: any[], tetraplets: SecurityTetraplet[][]) => object, - ) { - this.callbacks.set(`${serviceId}/${fnName}`, callback); - } - - unregisterCallback(channel: string, eventName: string) { - this.eventValidators.delete(`${channel}/${eventName}`); - } - - subscribe(channel: string, handler: FluenceEventHandler) { - if (!this.eventSubscribers.get(channel)) { - this.eventSubscribers.set(channel, []); - } - - this.eventSubscribers.get(channel).push(handler); - } - - protected strategy: ParticleProcessorStrategy = { - particleHandler: (serviceId: string, fnName: string, args: any[], tetraplets: SecurityTetraplet[][]) => { - // missing built-in op - if (serviceId === 'op' && fnName === 'identity') { - return { - ret_code: 0, - result: JSON.stringify(args), - }; - } - - // async fetch model handling - if (serviceId === fetchCallbackServiceName) { - const executingParticlePromiseFns = this.fetchParticles.get(fnName); - if (executingParticlePromiseFns) { - // don't block - setTimeout(() => { - this.fetchParticles.delete(fnName); - executingParticlePromiseFns.resolve(args); - }, 0); - } - - return { - ret_code: 0, - result: JSON.stringify({}), - }; - } - - // event model handling - const eventPair = `${serviceId}/${fnName}`; - const eventValidator = this.eventValidators.get(eventPair); - if (eventValidator) { - try { - if (!eventValidator(serviceId, fnName, args, tetraplets)) { - return { - ret_code: 1, // TODO:: error codes - result: 'validation failed', - }; - } - } catch (e) { - log.error('error running validation function: ' + e); - return { - ret_code: 1, // TODO:: error codes - result: 'validation failed', - }; - } - - // don't block - setTimeout(() => { - this.pushEvent(serviceId, { - type: fnName, - args: args, - }); - }, 0); - - return { - ret_code: 0, - result: JSON.stringify({}), - }; - } - - // callback model handling - const callback = this.callbacks.get(eventPair); - if (callback) { - try { - const res = callback(args, tetraplets); - return { - ret_code: 0, - result: JSON.stringify(res), - }; - } catch (e) { - return { - ret_code: 1, // TODO:: error codes - result: JSON.stringify(e), - }; - } - } - - return { - ret_code: 1, - result: `Error. There is no service: ${serviceId}`, - }; - }, - - sendParticleFurther: async (particle: ParticleDto) => { - try { - await this.connection.sendParticle(particle); - } catch (reason) { - log.error(`Error on sending particle with id ${particle.id}: ${reason}`); - } - }, - - onParticleTimeout: (particle: ParticleDto, now: number) => { - log.info(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`); - const executingParticle = this.fetchParticles.get(particle.id); - if (executingParticle) { - executingParticle.reject(new Error(`particle ${particle.id} timed out`)); - } - }, - onLocalParticleRecieved: (particle: ParticleDto) => { - log.debug('local particle received', particle); - }, - onExternalParticleRecieved: (particle: ParticleDto) => { - log.debug('external particle received', particle); - }, - onStepperExecuting: (particle: ParticleDto) => { - log.debug('stepper executing particle', particle); - }, - onStepperExecuted: (stepperOutcome: StepperOutcome) => { - log.debug('inner interpreter outcome:', stepperOutcome); - }, - }; - - private pushEvent(channel: string, event: FluenceEvent) { - const subs = this.eventSubscribers.get(channel); - if (subs) { - for (let sub of subs) { - sub(event); - } - } - } - - private addRelayToArgs(data: Map) { - if (data === undefined) { - data = new Map(); - } - - if (!data.has(selfRelayVarName)) { - data.set(selfRelayVarName, this.relayPeerId); - } - - return data; - } -} diff --git a/src/internal/FluenceConnection.ts b/src/internal/FluenceConnection.ts index fa1cf115..9c73db8f 100644 --- a/src/internal/FluenceConnection.ts +++ b/src/internal/FluenceConnection.ts @@ -23,7 +23,7 @@ import pipe from 'it-pipe'; import Multiaddr from 'multiaddr'; import PeerId from 'peer-id'; import * as log from 'loglevel'; -import { parseParticle, ParticleDto, toPayload } from './particle'; +import { parseParticle, Particle, toPayload } from './particle'; export const PROTOCOL_NAME = '/fluence/faas/1.0.0'; @@ -39,13 +39,13 @@ export class FluenceConnection { private readonly address: Multiaddr; readonly nodePeerId: PeerId; private readonly selfPeerIdStr: string; - private readonly handleParticle: (call: ParticleDto) => void; + private readonly handleParticle: (call: Particle) => void; constructor( multiaddr: Multiaddr, hostPeerId: PeerId, selfPeerId: PeerId, - handleParticle: (call: ParticleDto) => void, + handleParticle: (call: Particle) => void, ) { this.selfPeerId = selfPeerId; this.handleParticle = handleParticle; @@ -118,7 +118,7 @@ export class FluenceConnection { this.status = Status.Disconnected; } - async sendParticle(particle: ParticleDto): Promise { + async sendParticle(particle: Particle): Promise { this.checkConnectedOrThrow(); let action = toPayload(particle); diff --git a/src/internal/ParticleProcessor.ts b/src/internal/ParticleProcessor.ts index abcbe0c8..bdb52cdf 100644 --- a/src/internal/ParticleProcessor.ts +++ b/src/internal/ParticleProcessor.ts @@ -14,249 +14,165 @@ * limitations under the License. */ -import { ParticleDto } from './particle'; +import { logParticle, Particle } from './particle'; import * as PeerId from 'peer-id'; -import { instantiateInterpreter, InterpreterInvoke } from './stepper'; -import { ParticleHandler, SecurityTetraplet, StepperOutcome } from './commonTypes'; +import { ParticleHandler, SecurityTetraplet, CallServiceResult } from './commonTypes'; import log from 'loglevel'; -import { ParticleProcessorStrategy } from './ParticleProcessorStrategy'; - -// HACK:: make an api for aqua stepper to accept variables in an easy way! -let magicParticleStorage: Map> = new Map(); - -// HACK:: make an api for aqua stepper to accept variables in an easy way! -export function injectDataIntoParticle(particleId: string, data: Map, ttl: number) { - log.trace(`setting data for ${particleId}`, data); - magicParticleStorage.set(particleId, data); - setTimeout(() => { - log.trace(`data for ${particleId} is deleted`); - magicParticleStorage.delete(particleId); - }, ttl); -} - -// HACK:: make an api for aqua stepper to accept variables in an easy way! -const wrapWithDataInjectionHandling = ( - handler: ParticleHandler, - getCurrentParticleId: () => string, -): ParticleHandler => { - return (serviceId: string, fnName: string, args: any[], tetraplets: SecurityTetraplet[][]) => { - if (serviceId === '__magic' && fnName === 'load') { - const current = getCurrentParticleId(); - const data = magicParticleStorage.get(current); - - const res = data ? data.get(args[0]) : {}; - return { - ret_code: 0, - result: JSON.stringify(res), - }; - } - - return handler(serviceId, fnName, args, tetraplets); - }; -}; +import { RequestFlow } from './RequestFlow'; +import { AquaCallHandler } from './AquaHandler'; +import { FluenceConnection } from './FluenceConnection'; +import { AquamarineInterpreter } from './aqua/interpreter'; export class ParticleProcessor { - private interpreter: InterpreterInvoke; - private subscriptions: Map = new Map(); - private particlesQueue: ParticleDto[] = []; - private currentParticle?: string; + private readonly peerId: PeerId; + private readonly clientHandler: AquaCallHandler; - strategy: ParticleProcessorStrategy; - peerId: PeerId; + private connection: FluenceConnection; + private interpreter: AquamarineInterpreter; + private requests: Map = new Map(); + private queue: RequestFlow[] = []; + private currentRequestId: string | null = null; + private watchDog; - constructor(strategy: ParticleProcessorStrategy, peerId: PeerId) { - this.strategy = strategy; + constructor(peerId: PeerId, clientHandler: AquaCallHandler) { this.peerId = peerId; + this.clientHandler = clientHandler; } - async init() { - await this.instantiateInterpreter(); + /** + * Instantiate WebAssembly with AIR interpreter to execute AIR scripts + */ + async init(connection?: FluenceConnection) { + this.connection = connection; + this.interpreter = await AquamarineInterpreter.create({ + particleHandler: this.hanlder.bind(this), + peerId: this.peerId, + }); + this.watchDog = setInterval(() => { + for (let key in this.requests.keys) { + if (this.requests.get(key).hasExpired()) { + this.requests.delete(key); + } + } + }, 5000); } async destroy() { - // TODO: destroy interpreter + clearInterval(this.watchDog); } - async executeLocalParticle(particle: ParticleDto): Promise { - this.strategy?.onLocalParticleRecieved(particle); - return new Promise((resolve, reject) => { - // we check by callbacks that the script passed through the interpreter without errors - this.handleParticle(particle, resolve, reject); - }); - } + async executeLocalParticle(request: RequestFlow) { + request.handler.combineWith(this.clientHandler); + this.requests.set(request.id, request); - async executeExternalParticle(particle: ParticleDto): Promise { - this.strategy?.onExternalParticleRecieved(particle); - return await this.handleExternalParticle(particle); - } + logParticle(log.debug, 'external particle received', request.getParticle()); - /* - * private - */ - - private getCurrentParticleId(): string | undefined { - return this.currentParticle; - } - - private setCurrentParticleId(particle: string | undefined) { - this.currentParticle = particle; - } - - private enqueueParticle(particle: ParticleDto): void { - this.particlesQueue.push(particle); - } - - private popParticle(): ParticleDto | undefined { - return this.particlesQueue.pop(); - } - - /** - * Subscriptions will be applied by outside message if id will be the same. - * - * @param particle - * @param ttl time to live, subscription will be deleted after this time - */ - subscribe(particle: ParticleDto, ttl: number) { - let self = this; - setTimeout(() => { - self.subscriptions.delete(particle.id); - self.strategy?.onParticleTimeout(particle, Date.now()); - }, ttl); - this.subscriptions.set(particle.id, particle); - } - - updateSubscription(particle: ParticleDto): boolean { - if (this.subscriptions.has(particle.id)) { - this.subscriptions.set(particle.id, particle); - return true; - } else { - return false; - } - } - - getSubscription(id: string): ParticleDto | undefined { - return this.subscriptions.get(id); - } - - hasSubscription(particle: ParticleDto): boolean { - return this.subscriptions.has(particle.id); - } - - /** - * Pass a particle to a interpreter and send a result to other services. - * `resolve` will be completed if ret_code equals 0 - * `reject` will be completed if ret_code not equals 0 - */ - private async handleParticle(particle: ParticleDto, resolve?: () => void, reject?: (r: any) => any): Promise { - // if a current particle is processing, add new particle to the queue - if (this.getCurrentParticleId() !== undefined && this.getCurrentParticleId() !== particle.id) { - this.enqueueParticle(particle); - } else { - if (this.interpreter === undefined) { - throw new Error('Undefined. Interpreter is not initialized'); - } - // start particle processing if queue is empty - try { - this.setCurrentParticleId(particle.id); - // check if a particle is relevant - let now = Date.now(); - let actualTtl = particle.timestamp + particle.ttl - now; - if (actualTtl <= 0) { - this.strategy?.onParticleTimeout(particle, now); - if (reject) { - reject(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`); - } - } else { - // if there is no subscription yet, previous data is empty - let prevData: Uint8Array = Buffer.from([]); - let prevParticle = this.getSubscription(particle.id); - if (prevParticle) { - prevData = prevParticle.data; - // update a particle in a subscription - this.updateSubscription(particle); - } else { - // set a particle with actual ttl - this.subscribe(particle, actualTtl); - } - this.strategy.onStepperExecuting(particle); - let stepperOutcomeStr = this.interpreter( - particle.init_peer_id, - particle.script, - prevData, - particle.data, - ); - let stepperOutcome: StepperOutcome = JSON.parse(stepperOutcomeStr); - - // update data after aquamarine execution - let newParticle: ParticleDto = { ...particle, data: stepperOutcome.data }; - this.strategy.onStepperExecuted(stepperOutcome); - - this.updateSubscription(newParticle); - - // do nothing if there is no `next_peer_pks` or if client isn't connected to the network - if (stepperOutcome.next_peer_pks.length > 0) { - this.strategy.sendParticleFurther(newParticle); - } - - if (stepperOutcome.ret_code == 0) { - if (resolve) { - resolve(); - } - } else { - const error = stepperOutcome.error_message; - if (reject) { - reject(error); - } else { - log.error('Unhandled error: ', error); - } - } - } - } catch (e) { - if (reject) { - reject(e); - } else { - log.error('Unhandled error: ', e); - throw e; - } - } finally { - // get last particle from the queue - let nextParticle = this.popParticle(); - // start the processing of a new particle if it exists - if (nextParticle) { - // update current particle - this.setCurrentParticleId(nextParticle.id); - return await this.handleParticle(nextParticle); - } else { - // wait for a new call (do nothing) if there is no new particle in a queue - this.setCurrentParticleId(undefined); - } - } + try { + this.processRequest(request); + } catch (err) { + log.error('particle processing failed: ' + err); } } /** * Handle incoming particle from a relay. */ - private async handleExternalParticle(particle: ParticleDto): Promise { - let data: any = particle.data; - let error: any = data['protocol!error']; - if (error !== undefined) { - log.error('error in external particle: ', error); + async executeIncomingParticle(particle: Particle) { + logParticle(log.debug, 'external particle received', particle); + + let request = this.requests.get(particle.id); + if (request) { + request.receiveUpdate(particle); } else { - return await this.handleParticle(particle); + request = RequestFlow.createExternal(particle); + request.handler.combineWith(this.clientHandler); + } + this.requests.set(request.id, request); + + await this.processRequest(request); + } + + private async processRequest(request: RequestFlow): Promise { + // enque the request if it's not the currently processed one + if (this.currentRequestId !== null && this.currentRequestId !== request.id) { + this.queue.push(request); + return; + } + + if (this.interpreter === undefined) { + throw new Error('Undefined. Interpreter is not initialized'); + } + + // start request processing if queue is empty + try { + this.currentRequestId = request.id; + if (request.hasExpired()) { + return; + } + + logParticle(log.debug, 'interpreter executing particle', request.getParticle()); + const interpreterOutcome = request.runInterpreter(this.interpreter); + + log.debug('inner interpreter outcome:', { + ret_code: interpreterOutcome.ret_code, + error_message: interpreterOutcome.error_message, + next_peer_pks: interpreterOutcome.next_peer_pks, + }); + + if (interpreterOutcome.ret_code !== 0) { + request.raiseError( + `Interpreter failed with code=${interpreterOutcome.ret_code} message=${interpreterOutcome.error_message}`, + ); + } + + // do nothing if there is no `next_peer_pks` or if client isn't connected to the network + if (interpreterOutcome.next_peer_pks.length > 0) { + if (!this.connection) { + log.error('Cannot send particle: non connected'); + } + + request.sendIntoConnection(this.connection); + } + } finally { + // get last request from the queue + let nextRequest = this.queue.pop(); + + // start the processing of the new request if it exists + if (nextRequest) { + // update current particle + this.currentRequestId = nextRequest.id; + + await this.processRequest(nextRequest); + } else { + // wait for a new call (do nothing) if there is no new particle in a queue + this.currentRequestId = null; + } } } - /** - * Instantiate WebAssembly with AIR interpreter to execute AIR scripts - */ - async instantiateInterpreter() { - this.interpreter = await instantiateInterpreter( - wrapWithDataInjectionHandling( - this.strategy.particleHandler.bind(this), - this.getCurrentParticleId.bind(this), - ), - this.peerId, - ); - } + private hanlder: ParticleHandler = ( + serviceId: string, + fnName: string, + args: any[], + tetraplets: SecurityTetraplet[][], + ): CallServiceResult => { + if (this.currentRequestId === null) { + throw Error('current request can`t be null here'); + } + + const request = this.requests.get(this.currentRequestId); + const res = request.handler.execute({ + serviceId, + fnName, + args, + tetraplets, + particleContext: { + particleId: request.id, + }, + }); + return { + ret_code: res.retCode, + result: JSON.stringify(res.result || {}), + }; + }; } diff --git a/src/internal/ParticleProcessorStrategy.ts b/src/internal/ParticleProcessorStrategy.ts deleted file mode 100644 index 1f6c6429..00000000 --- a/src/internal/ParticleProcessorStrategy.ts +++ /dev/null @@ -1,29 +0,0 @@ -/* - * Copyright 2020 Fluence Labs Limited - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -import { ParticleHandler, StepperOutcome } from './commonTypes'; -import { ParticleDto } from './particle'; - -export interface ParticleProcessorStrategy { - particleHandler: ParticleHandler; - sendParticleFurther: (particle: ParticleDto) => void; - - onParticleTimeout?: (particle: ParticleDto, now: number) => void; - onLocalParticleRecieved?: (particle: ParticleDto) => void; - onExternalParticleRecieved?: (particle: ParticleDto) => void; - onStepperExecuting?: (particle: ParticleDto) => void; - onStepperExecuted?: (stepperOutcome: StepperOutcome) => void; -} diff --git a/src/internal/RequestFlow.ts b/src/internal/RequestFlow.ts new file mode 100644 index 00000000..a2c892e7 --- /dev/null +++ b/src/internal/RequestFlow.ts @@ -0,0 +1,129 @@ +import log, { trace } from 'loglevel'; +import PeerId from 'peer-id'; +import { AquamarineInterpreter } from './aqua/interpreter'; +import { AquaCallHandler } from './AquaHandler'; +import { InterpreterOutcome } from './commonTypes'; +import { FluenceConnection } from './FluenceConnection'; +import { Particle, genUUID } from './particle'; + +export const DEFAULT_TTL = 7000; + +/** + * The class represents the current view (and state) of distributed the particle execution process from client's point of view. + * It stores the intermediate particles state during the process. RequestFlow is identified by the id of the particle that is executed during the flow. + * Each RequestFlow contains a separate (unique to the current flow) AquaCallHandler where the handling of `call` AIR instruction takes place + * Please note, that RequestFlow's is handler is combined with the handler from client before the execution occures. + * After the combination middlewares from RequestFlow are executed before client handler's middlewares. + */ +export class RequestFlow { + private state: Particle; + private prevData: Uint8Array = Buffer.from([]); + private onTimeoutHandlers = []; + private onErrorHandlers = []; + + readonly id: string; + readonly isExternal: boolean; + readonly script: string; + readonly handler = new AquaCallHandler(); + + ttl: number = DEFAULT_TTL; + + static createExternal(particle: Particle): RequestFlow { + const res = new RequestFlow(true, particle.id, particle.script); + res.ttl = particle.ttl; + res.state = particle; + setTimeout(res.raiseTimeout.bind(res), particle.ttl); + return res; + } + + static createLocal(script: string, ttl?: number): RequestFlow { + const res = new RequestFlow(false, genUUID(), script); + res.ttl = ttl ?? DEFAULT_TTL; + return res; + } + + constructor(isExternal: boolean, id: string, script: string) { + this.isExternal = isExternal; + this.id = id; + this.script = script; + } + + onTimeout(handler: () => void) { + this.onTimeoutHandlers.push(handler); + } + + onError(handler: (error) => void) { + this.onErrorHandlers.push(handler); + } + + async initState(peerId: PeerId): Promise { + const id = this.id; + let currentTime = Date.now(); + + const particle: Particle = { + id: id, + init_peer_id: peerId.toB58String(), + timestamp: currentTime, + ttl: this.ttl, + script: this.script, + signature: '', + data: Buffer.from([]), + }; + + this.state = particle; + setTimeout(this.raiseTimeout.bind(this), particle.ttl); + } + + receiveUpdate(particle: Particle) { + // TODO:: keep the history of particle data mb? + this.prevData = this.state.data; + this.state.data = particle.data; + } + + async sendIntoConnection(connection: FluenceConnection): Promise { + const particle = this.state; + try { + await connection.sendParticle(particle); + } catch (err) { + log.error(`Error on sending particle with id ${particle.id}: ${err}`); + } + } + + runInterpreter(interpreter: AquamarineInterpreter) { + const interpreterOutcomeStr = interpreter.invoke( + this.state.init_peer_id, + this.state.script, + this.prevData, + this.state.data, + ); + const interpreterOutcome: InterpreterOutcome = JSON.parse(interpreterOutcomeStr); + // TODO:: keep the history of particle data mb? + this.state.data = interpreterOutcome.data; + return interpreterOutcome; + } + + getParticle = () => this.state; + + hasExpired(): boolean { + let now = Date.now(); + const particle = this.getParticle(); + let actualTtl = particle.timestamp + particle.ttl - now; + return actualTtl <= 0; + } + + raiseError(error) { + for (const h of this.onErrorHandlers) { + h(error); + } + } + + private raiseTimeout() { + const now = Date.now(); + const particle = this.state; + log.info(`Particle expired. Now: ${now}, ttl: ${particle?.ttl}, ts: ${particle?.timestamp}`); + + for (const h of this.onTimeoutHandlers) { + h(); + } + } +} diff --git a/src/internal/RequestFlowBuilder.ts b/src/internal/RequestFlowBuilder.ts new file mode 100644 index 00000000..11b63c09 --- /dev/null +++ b/src/internal/RequestFlowBuilder.ts @@ -0,0 +1,197 @@ +import { of } from 'ipfs-only-hash'; +import log from 'loglevel'; +import { AquaCallHandler } from './AquaHandler'; +import { DEFAULT_TTL, RequestFlow } from './RequestFlow'; + +export const loadVariablesService = 'load'; +const loadVariablesFn = 'load_variable'; +export const loadRelayFn = 'load_relay'; +const xorHandleService = '__magic'; +const xorHandleFn = 'handle_xor'; +export const relayVariableName = 'init_relay'; + +const wrapWithXor = (script: string): string => { + return ` + (xor + ${script} + (xor + (match ${relayVariableName} "" + (call %init_peer_id% ("${xorHandleService}" "${xorHandleFn}") [%last_error%]) + ) + (seq + (call ${relayVariableName} ("op" "identity") []) + (call %init_peer_id% ("${xorHandleService}" "${xorHandleFn}") [%last_error%]) + ) + ) + )`; +}; + +class ScriptBuilder { + private script: string; + + raw(script: string): ScriptBuilder { + this.script = script; + return this; + } + + build(): string { + return this.script; + } +} + +const wrapWithVariableInjectionScript = (script: string, fields: string[]): string => { + fields.forEach((v) => { + script = ` +(seq + (call %init_peer_id% ("${loadVariablesService}" "${loadVariablesFn}") ["${v}"] ${v}) + ${script} +)`; + }); + + return script; +}; + +const wrapWithInjectRelayScript = (script: string): string => { + return ` +(seq + (seq + (call %init_peer_id% ("${loadVariablesService}" "${loadRelayFn}") [] ${relayVariableName}) + (call %init_peer_id% ("op" "identity") [%init_peer_id%] init_peer_id) + ) + ${script} +)`; +}; + +export class RequestFlowBuilder { + private ttl: number = DEFAULT_TTL; + private variables = new Map(); + private handlerConfigs: Array<(handler: AquaCallHandler) => void> = []; + private buildScript: (sb: ScriptBuilder) => void; + private onTimeout: () => void; + private onError: (error: any) => void; + + build() { + if (!this.buildScript) { + throw new Error(); + } + + const b = new ScriptBuilder(); + this.buildScript(b); + let script = b.build(); + script = wrapWithVariableInjectionScript(script, Array.from(this.variables.keys())); + script = wrapWithXor(script); + script = wrapWithInjectRelayScript(script); + + const res = RequestFlow.createLocal(script, this.ttl); + res.handler.on(loadVariablesService, loadVariablesFn, (args, _) => { + return this.variables.get(args[0]) || {}; + }); + res.handler.onEvent(xorHandleService, xorHandleFn, (args) => { + let msg; + try { + msg = JSON.parse(args[0]); + } catch (e) { + msg = e; + } + + try { + res.raiseError(msg); + } catch (e) { + log.error('Error handling script executed with error', e); + } + }); + + for (let h of this.handlerConfigs) { + h(res.handler); + } + + if (this.onTimeout) { + res.onTimeout(this.onTimeout); + } + if (this.onError) { + res.onError(this.onError); + } + + return res; + } + + withScript(action: (sb: ScriptBuilder) => void): RequestFlowBuilder { + this.buildScript = action; + return this; + } + + withRawScript(script: string): RequestFlowBuilder { + this.buildScript = (sb) => { + sb.raw(script); + }; + return this; + } + + withTTL(ttl: number): RequestFlowBuilder { + this.ttl = ttl; + return this; + } + + configHandler(config: (handler: AquaCallHandler) => void): RequestFlowBuilder { + this.handlerConfigs.push(config); + return this; + } + + handleTimeout(handler: () => void): RequestFlowBuilder { + this.onTimeout = handler; + return this; + } + + handleScriptError(handler: (error) => void): RequestFlowBuilder { + this.onError = handler; + return this; + } + + withVariable(name: string, value: any): RequestFlowBuilder { + this.variables.set(name, value); + return this; + } + + withVariables(data: Map | Record): RequestFlowBuilder { + if (data instanceof Map) { + this.variables = new Map([...Array.from(this.variables.entries()), ...Array.from(data.entries())]); + } else { + for (let k in data) { + this.variables.set(k, data[k]); + } + } + + return this; + } + + buildAsFetch( + callbackServiceId: string = 'callback', + callbackFnName: string = 'callback', + ): [RequestFlow, Promise] { + const fetchPromise = new Promise((resolve, reject) => { + this.handlerConfigs.push((h) => { + h.onEvent(callbackServiceId, callbackFnName, (args, _) => { + resolve(args as any); + }); + }); + + this.handleTimeout(() => { + reject(`Timed out after ${this.ttl}ms`); + }); + + this.handleScriptError((e) => { + reject(e); + }); + }); + + return [this.build(), fetchPromise]; + } + + buildWithErrorHandling(): [RequestFlow, Promise] { + const promise = new Promise((resolve, reject) => { + this.handleScriptError(reject); + }); + + return [this.build(), promise]; + } +} diff --git a/src/internal/aqua/index.ts b/src/internal/aqua/index.ts index b1208c3e..683e2fea 100644 --- a/src/internal/aqua/index.ts +++ b/src/internal/aqua/index.ts @@ -102,16 +102,16 @@ function passArray8ToWasm0(wasm, arg, malloc) { /** * @param {any} wasm - * @param {string} init_user_id + * @param {string} init_peer_id * @param {string} aqua * @param {string} prev_data * @param {string} data * @param {string} log_level * @returns {string} */ -export function invoke(wasm, init_user_id, aqua, prev_data, data, log_level) { +export function invoke(wasm, init_peer_id, aqua, prev_data, data, log_level) { try { - var ptr0 = passStringToWasm0(wasm, init_user_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc); + var ptr0 = passStringToWasm0(wasm, init_peer_id, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc); var len0 = WASM_VECTOR_LEN; var ptr1 = passStringToWasm0(wasm, aqua, wasm.__wbindgen_malloc, wasm.__wbindgen_realloc); var len1 = WASM_VECTOR_LEN; diff --git a/src/internal/stepper.ts b/src/internal/aqua/interpreter.ts similarity index 81% rename from src/internal/stepper.ts rename to src/internal/aqua/interpreter.ts index 1e6a976d..b0112d85 100644 --- a/src/internal/stepper.ts +++ b/src/internal/aqua/interpreter.ts @@ -15,20 +15,22 @@ */ import { toByteArray } from 'base64-js'; -import * as aqua from './aqua'; -import { return_current_peer_id, return_call_service_result, getStringFromWasm0, free } from './aqua'; -import { ParticleHandler, CallServiceResult, SecurityTetraplet } from './commonTypes'; +import * as aqua from '.'; +import { return_current_peer_id, return_call_service_result, getStringFromWasm0, free } from '.'; +import { ParticleHandler, CallServiceResult, SecurityTetraplet } from '../commonTypes'; import PeerId from 'peer-id'; import log from 'loglevel'; import wasmBs64 from '@fluencelabs/aquamarine-interpreter'; -export type InterpreterInvoke = ( - init_user_id: string, +// prettier-ignore +type InterpreterInvoke = ( + init_peer_id: string, script: string, prev_data: Uint8Array, data: Uint8Array, ) => string; + type ImportObject = { './aquamarine_client_bg.js': { // fn call_service_impl(service_id: String, fn_name: String, args: String, security_tetraplets: String) -> String; @@ -140,7 +142,7 @@ const theParticleHandler = ( tetrapletsObject = JSON.parse(tetraplets); } catch (err) { - console.error('Cannot parse arguments: ' + JSON.stringify(err)); + log.error('Cannot parse arguments: ' + JSON.stringify(err)); return { result: JSON.stringify('Cannot parse arguments: ' + JSON.stringify(err)), ret_code: 1, @@ -187,7 +189,7 @@ function newImportObject(particleHandler: ParticleHandler, cfg: HostImportsConfi return_current_peer_id(wasm, peerIdStr, arg0); }, __wbindgen_throw: (arg: any) => { - console.log(`wbindgen throw: ${JSON.stringify(arg)}`); + log.error(`wbindgen throw: ${JSON.stringify(arg)}`); }, }, host: log_import(cfg), @@ -206,17 +208,13 @@ function newLogImport(cfg: HostImportsConfig): ImportObject { } /// Instantiates AIR interpreter, and returns its `invoke` function as closure -/// NOTE: an interpreter is also called a stepper from time to time -export async function instantiateInterpreter( - particleHandler: ParticleHandler, - peerId: PeerId, -): Promise { +async function instantiateInterpreter(particleHandler: ParticleHandler, peerId: PeerId): Promise { let cfg = new HostImportsConfig((cfg) => { return newImportObject(particleHandler, cfg, peerId); }); let instance = await interpreterInstance(cfg); - return (init_user_id: string, script: string, prev_data: Uint8Array, data: Uint8Array) => { + return (init_peer_id: string, script: string, prev_data: Uint8Array, data: Uint8Array) => { let logLevel = log.getLevel(); let logLevelStr = 'info'; if (logLevel === 0) { @@ -233,13 +231,13 @@ export async function instantiateInterpreter( logLevelStr = 'off'; } - return aqua.invoke(instance.exports, init_user_id, script, prev_data, data, logLevelStr); + return aqua.invoke(instance.exports, init_peer_id, script, prev_data, data, logLevelStr); }; } /// Instantiate AIR interpreter with host imports containing only logger, but not call_service /// peerId isn't actually required for AST parsing, but host imports require it, and I don't see any workaround -export async function parseAstClosure(): Promise<(script: string) => string> { +async function parseAstClosure(): Promise<(script: string) => string> { let cfg = new HostImportsConfig((cfg) => newLogImport(cfg)); let instance = await interpreterInstance(cfg); @@ -250,7 +248,49 @@ export async function parseAstClosure(): Promise<(script: string) => string> { /// Parses script and returns AST in JSON format /// NOTE & TODO: interpreter is instantiated every time, make it a lazy constant? -export async function parseAIR(script: string): Promise { +async function parseAIR(script: string): Promise { let closure = await parseAstClosure(); return closure(script); } + +export class AquamarineInterpreter { + private wasmWrapper; + + constructor(wasmWrapper) { + this.wasmWrapper = wasmWrapper; + } + + static async create(config: { particleHandler: ParticleHandler; peerId: PeerId }) { + const cfg = new HostImportsConfig((cfg) => { + return newImportObject(config.particleHandler, cfg, config.peerId); + }); + + const instance = await interpreterInstance(cfg); + const res = new AquamarineInterpreter(instance); + return res; + } + + invoke(init_peer_id: string, script: string, prev_data: Uint8Array, data: Uint8Array): string { + let logLevel = log.getLevel(); + let logLevelStr = 'info'; + if (logLevel === 0) { + logLevelStr = 'trace'; + } else if (logLevel === 1) { + logLevelStr = 'debug'; + } else if (logLevel === 2) { + logLevelStr = 'info'; + } else if (logLevel === 3) { + logLevelStr = 'warn'; + } else if (logLevel === 4) { + logLevelStr = 'error'; + } else if (logLevel === 5) { + logLevelStr = 'off'; + } + + return aqua.invoke(this.wasmWrapper.exports, init_peer_id, script, prev_data, data, logLevelStr); + } + + parseAir(script: string): string { + return aqua.ast(this.wasmWrapper.exports, script); + } +} diff --git a/src/internal/builtins.ts b/src/internal/builtins.ts index 1d5c3afb..758dfe99 100644 --- a/src/internal/builtins.ts +++ b/src/internal/builtins.ts @@ -14,11 +14,10 @@ * limitations under the License. */ -import bs58 from 'bs58'; -import { sendParticleAsFetch } from '../api'; -import { Particle } from './particle'; -import { FluenceClient } from '../FluenceClient'; +import { RequestFlow } from './RequestFlow'; import { ModuleConfig } from './moduleConfig'; +import { RequestFlowBuilder } from './RequestFlowBuilder'; +import { FluenceClient } from 'src/api.unstable'; const nodeIdentityCall = (client: FluenceClient): string => { return `(call "${client.relayPeerId}" ("op" "identity") [])`; @@ -56,7 +55,13 @@ const requestResponse = async ( ) `; - const res = await sendParticleAsFetch(client, new Particle(script, data, ttl), name); + const [request, promise] = new RequestFlowBuilder() + .withRawScript(script) + .withVariables(data) + .withTTL(ttl) + .buildAsFetch('_callback', name); + await client.initiateFlow(request); + const res = await promise; return handleResponse(res); }; @@ -67,21 +72,25 @@ const requestResponse = async ( */ export const getModules = async (client: FluenceClient, ttl?: number): Promise => { let callbackFn = 'getModules'; - const particle = new Particle( - ` + const [req, promise] = new RequestFlowBuilder() + .withRawScript( + ` (seq (call __relay ("dist" "list_modules") [] result) (call myPeerId ("_callback" "${callbackFn}") [result]) ) `, - { + ) + .withVariables({ __relay: client.relayPeerId, myPeerId: client.selfPeerId, - }, - ttl, - ); + }) + .withTTL(ttl) + .buildAsFetch<[string[]]>('_callback', callbackFn); + client.initiateFlow(req); - return sendParticleAsFetch(client, particle, callbackFn); + const [res] = await promise; + return res; }; /** @@ -91,8 +100,9 @@ export const getModules = async (client: FluenceClient, ttl?: number): Promise => { let callbackFn = 'getInterfaces'; - const particle = new Particle( - ` + const [req, promise] = new RequestFlowBuilder() + .withRawScript( + ` (seq (seq (seq @@ -109,14 +119,17 @@ export const getInterfaces = async (client: FluenceClient, ttl?: number): Promis (call myPeerId ("_callback" "${callbackFn}") [interfaces]) ) `, - { + ) + .withVariables({ relay: client.relayPeerId, myPeerId: client.selfPeerId, - }, - ttl, - ); + }) + .withTTL(ttl) + .buildAsFetch<[string[]]>('_callback', callbackFn); - const [res] = await sendParticleAsFetch<[string[]]>(client, particle, callbackFn); + client.initiateFlow(req); + + const [res] = await promise; return res; }; @@ -153,15 +166,22 @@ export const uploadModule = async ( data.set('__relay', client.relayPeerId); data.set('myPeerId', client.selfPeerId); - let script = ` + const [req, promise] = new RequestFlowBuilder() + .withRawScript( + ` (seq (call __relay ("dist" "add_module") [module_bytes module_config] result) (call myPeerId ("_callback" "getModules") [result]) ) - `; + `, + ) + .withVariables(data) + .withTTL(ttl) + .buildAsFetch<[string[]]>('_callback', 'getModules'); - return sendParticleAsFetch(client, new Particle(script, data, ttl), 'getModules', '_callback'); + await client.initiateFlow(req); + await promise; }; /** diff --git a/src/internal/commonTypes.ts b/src/internal/commonTypes.ts index 068a581b..2adbb020 100644 --- a/src/internal/commonTypes.ts +++ b/src/internal/commonTypes.ts @@ -26,7 +26,7 @@ export type ParticleHandler = ( tetraplets: SecurityTetraplet[][], ) => CallServiceResult; -export interface StepperOutcome { +export interface InterpreterOutcome { ret_code: number; data: Uint8Array; next_peer_pks: string[]; @@ -44,3 +44,10 @@ export interface SecurityTetraplet extends ResolvedTriplet { } export type PeerIdB58 = string; + +export enum ResultCodes { + success = 0, + noServiceFound = 1, + exceptionInHandler = 2, + unkownError = 1024, +} diff --git a/src/internal/particle.ts b/src/internal/particle.ts index 0bc949ef..aa2ae698 100644 --- a/src/internal/particle.ts +++ b/src/internal/particle.ts @@ -18,43 +18,9 @@ import { v4 as uuidv4 } from 'uuid'; import { fromByteArray, toByteArray } from 'base64-js'; import PeerId from 'peer-id'; import { encode } from 'bs58'; -import { injectDataIntoParticle } from './ParticleProcessor'; -import { PeerIdB58 } from './commonTypes'; +import log, { LogLevel } from 'loglevel'; -const DEFAULT_TTL = 7000; - -/** - * The class representing Particle - a data structure used to perform operations on Fluence Network. It originates on some peer in the network, travels the network through a predefined path, triggering function execution along its way. - */ -export class Particle { - script: string; - data: Map; - ttl: number; - - /** - * Creates a particle with specified parameters. - * @param { String }script - Air script which defines the execution of a particle – its path, functions it triggers on peers, and so on. - * @param { Map | Record } data - Variables passed to the particle in the form of either JS Map or JS object with keys representing variable names and values representing values correspondingly - * @param { [Number]=7000 } ttl - Time to live, a timout after which the particle execution is stopped by Aquamarine. - */ - constructor(script: string, data?: Map | Record, ttl?: number) { - this.script = script; - if (data === undefined) { - this.data = new Map(); - } else if (data instanceof Map) { - this.data = data; - } else { - this.data = new Map(); - for (let k in data) { - this.data.set(k, data[k]); - } - } - - this.ttl = ttl ?? DEFAULT_TTL; - } -} - -export interface ParticleDto { +export interface Particle { id: string; init_peer_id: string; timestamp: number; @@ -65,6 +31,10 @@ export interface ParticleDto { data: Uint8Array; } +export const logParticle = (fn: Function, message: string, particle: Particle) => { + fn(message, particle); +}; + /** * Represents particle action to send to a node */ @@ -79,86 +49,10 @@ interface ParticlePayload { data: string; } -function wrapWithVariableInjectionScript(script: string, fields: string[]): string { - fields.forEach((v) => { - script = ` -(seq - (call %init_peer_id% ("__magic" "load") ["${v}"] ${v}) - ${script} -) - `; - }); - - return script; -} - -function wrapWithXor(script: string): string { - return ` - (xor - ${script} - (seq - (call __magic_relay ("op" "identity") []) - (call %init_peer_id% ("__magic" "handle_xor") [%last_error%]) - ) - )`; -} - -function wrapWithXorLocal(script: string): string { - return ` - (xor - ${script} - (call %init_peer_id% ("__magic" "handle_xor") [%last_error%]) - )`; -} - -export async function build( - peerId: PeerId, - relay: PeerIdB58 | undefined, - script: string, - data?: Map, - ttl?: number, - customId?: string, -): Promise { - const id = customId ?? genUUID(); - let currentTime = new Date().getTime(); - - if (data === undefined) { - data = new Map(); - } - - if (ttl === undefined) { - ttl = DEFAULT_TTL; - } - - if (relay) { - data.set('__magic_relay', relay); - } - injectDataIntoParticle(id, data, ttl); - script = wrapWithVariableInjectionScript(script, Array.from(data.keys())); - if (relay) { - script = wrapWithXor(script); - } else { - script = wrapWithXorLocal(script); - } - - let particle: ParticleDto = { - id: id, - init_peer_id: peerId.toB58String(), - timestamp: currentTime, - ttl: ttl, - script: script, - // TODO: sign particle - signature: '', - data: Buffer.from([]), - }; - - return particle; -} - /** * Creates an action to send to a node. */ -export function toPayload(particle: ParticleDto): ParticlePayload { +export function toPayload(particle: Particle): ParticlePayload { return { action: 'Particle', id: particle.id, @@ -172,7 +66,7 @@ export function toPayload(particle: ParticleDto): ParticlePayload { }; } -export function parseParticle(str: string): ParticleDto { +export function parseParticle(str: string): Particle { let json = JSON.parse(str); return {