another way to structure aqua

This commit is contained in:
Pavel Murygin 2021-06-18 18:23:19 +03:00
parent 297b92f1f4
commit 9f1afb2ea5
2 changed files with 312 additions and 379 deletions

View File

@ -4,14 +4,11 @@ service DashboardEvent("event"):
collectPeerInfo(peer: PeerId, ident: Info, services: []Service, blueprints: []Blueprint, modules: []Module) collectPeerInfo(peer: PeerId, ident: Info, services: []Service, blueprints: []Blueprint, modules: []Module)
collectServiceInterface(peer: PeerId, serviceId: string, iface: Interface) collectServiceInterface(peer: PeerId, serviceId: string, iface: Interface)
func dashboardEvent_collectServiceInterface(peer: PeerId, serviceId: string, iface: Interface):
DashboardEvent.collectServiceInterface(peer, serviceId, iface)
func collectServiceInterfaces(peer: PeerId, services: []Service): func collectServiceInterfaces(peer: PeerId, services: []Service):
on peer:
for srv <- services par: for srv <- services par:
on peer:
info <- Srv.get_interface(srv.id) info <- Srv.get_interface(srv.id)
dashboardEvent_collectServiceInterface(peer, srv.id, info.interface) DashboardEvent.collectServiceInterface(peer, srv.id, info.interface)
func askAllAndSend(peer: PeerId): func askAllAndSend(peer: PeerId):
on peer: on peer:
@ -19,8 +16,8 @@ func askAllAndSend(peer: PeerId):
blueprints <- Dist.list_blueprints() blueprints <- Dist.list_blueprints()
modules <- Dist.list_modules() modules <- Dist.list_modules()
services <- Srv.list() services <- Srv.list()
DashboardEvent.collectPeerInfo(peer, ident, services, blueprints, modules)
collectServiceInterfaces(peer, services) collectServiceInterfaces(peer, services)
DashboardEvent.collectPeerInfo(peer, ident, services, blueprints, modules)
func findAndAskNeighboursSchema(relayPeerId: PeerId, clientId: PeerId): func findAndAskNeighboursSchema(relayPeerId: PeerId, clientId: PeerId):

View File

@ -10,7 +10,7 @@ import { RequestFlowBuilder } from '@fluencelabs/fluence/dist/api.unstable';
export async function askAllAndSend(client, peer) { export async function collectServiceInterfaces(client, peer, services) {
let request; let request;
const promise = new Promise((resolve, reject) => { const promise = new Promise((resolve, reject) => {
request = new RequestFlowBuilder() request = new RequestFlowBuilder()
@ -19,6 +19,73 @@ export async function askAllAndSend(client, peer) {
` `
(xor (xor
(seq (seq
(seq
(seq
(call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-)
(call %init_peer_id% ("getDataSrv" "peer") [] peer)
)
(call %init_peer_id% ("getDataSrv" "services") [] services)
)
(fold services srv
(par
(seq
(seq
(seq
(call -relay- ("op" "identity") [])
(xor
(call peer ("srv" "get_interface") [srv.$.id!] info)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1])
)
)
)
(call -relay- ("op" "identity") [])
)
(call %init_peer_id% ("event" "collectServiceInterface") [peer srv.$.id! info.$.interface!])
)
(next srv)
)
)
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
)
`,
)
.configHandler((h) => {
h.on('getDataSrv', '-relay-', () => {
return client.relayPeerId;
});
h.on('getDataSrv', 'peer', () => {return peer;});
h.on('getDataSrv', 'services', () => {return services;});
h.onEvent('errorHandlingSrv', 'error', (args) => {
// assuming error is the single argument
const [err] = args;
reject(err);
});
})
.handleScriptError(reject)
.handleTimeout(() => {
reject('Request timed out for collectServiceInterfaces');
})
.build();
});
await client.initiateFlow(request);
return Promise.race([promise, Promise.resolve()]);
}
export async function askAllAndSend(client, peer) {
let request;
const promise = new Promise((resolve, reject) => {
request = new RequestFlowBuilder()
.disableInjections()
.withRawScript(
`
(xor
(seq (seq
(seq (seq
(seq (seq
@ -49,33 +116,31 @@ export async function askAllAndSend(client, peer) {
) )
(call -relay- ("op" "identity") []) (call -relay- ("op" "identity") [])
) )
(call %init_peer_id% ("event" "collectPeerInfo") [peer ident services blueprints modules])
)
(call -relay- ("op" "identity") [])
)
(xor
(fold services srv (fold services srv
(par (par
(seq (seq
(call peer ("srv" "get_interface") [srv.$.id!] info)
(call peer ("event" "collectServiceInterface") [peer srv.$.id! info.$.interface!])
)
(next srv)
)
)
(seq (seq
(seq
(call -relay- ("op" "identity") [])
(xor
(call peer ("srv" "get_interface") [srv.$.id!] info)
(seq (seq
(call -relay- ("op" "identity") []) (call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2]) (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
) )
)
)
(call -relay- ("op" "identity") []) (call -relay- ("op" "identity") [])
) )
(call %init_peer_id% ("event" "collectServiceInterface") [peer srv.$.id! info.$.interface!])
)
(next srv)
) )
) )
(seq )
(call -relay- ("op" "identity") []) (call %init_peer_id% ("event" "collectPeerInfo") [peer ident services blueprints modules])
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3]) (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3])
)
) )
`, `,
@ -104,304 +169,6 @@ export async function askAllAndSend(client, peer) {
export async function getAll(client, relayPeerId, knownPeers) {
let request;
const promise = new Promise((resolve, reject) => {
request = new RequestFlowBuilder()
.disableInjections()
.withRawScript(
`
(xor
(seq
(seq
(seq
(seq
(seq
(call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-)
(call %init_peer_id% ("getDataSrv" "relayPeerId") [] relayPeerId)
)
(call %init_peer_id% ("getDataSrv" "knownPeers") [] knownPeers)
)
(par
(seq
(seq
(seq
(seq
(seq
(call -relay- ("op" "identity") [])
(xor
(seq
(seq
(seq
(call relayPeerId ("peer" "identify") [] ident)
(call relayPeerId ("dist" "list_blueprints") [] blueprints)
)
(call relayPeerId ("dist" "list_modules") [] modules)
)
(call relayPeerId ("srv" "list") [] services)
)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1])
)
)
)
(call -relay- ("op" "identity") [])
)
(call %init_peer_id% ("event" "collectPeerInfo") [relayPeerId ident services blueprints modules])
)
(call -relay- ("op" "identity") [])
)
(xor
(fold services srv
(par
(seq
(call relayPeerId ("srv" "get_interface") [srv.$.id!] info)
(call relayPeerId ("event" "collectServiceInterface") [relayPeerId srv.$.id! info.$.interface!])
)
(next srv)
)
)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
)
)
)
(seq
(seq
(seq
(call -relay- ("op" "identity") [])
(fold knownPeers peer
(par
(seq
(seq
(seq
(seq
(seq
(call -relay- ("op" "identity") [])
(xor
(seq
(seq
(seq
(call peer ("peer" "identify") [] ident0)
(call peer ("dist" "list_blueprints") [] blueprints0)
)
(call peer ("dist" "list_modules") [] modules0)
)
(call peer ("srv" "list") [] services0)
)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3])
)
)
)
(call -relay- ("op" "identity") [])
)
(call %init_peer_id% ("event" "collectPeerInfo") [peer ident0 services0 blueprints0 modules0])
)
(call -relay- ("op" "identity") [])
)
(xor
(fold services0 srv0
(par
(seq
(call peer ("srv" "get_interface") [srv0.$.id!] info0)
(call peer ("event" "collectServiceInterface") [peer srv0.$.id! info0.$.interface!])
)
(next srv0)
)
)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 4])
)
)
)
(seq
(call -relay- ("op" "identity") [])
(next peer)
)
)
)
)
(call -relay- ("op" "identity") [])
)
(call %init_peer_id% ("op" "identity") [])
)
)
)
(call -relay- ("op" "identity") [])
)
(xor
(seq
(call relayPeerId ("kad" "neighborhood") [%init_peer_id% false] neighbors)
(fold neighbors n
(par
(xor
(seq
(call n ("kad" "neighborhood") [%init_peer_id% false] neighbors2)
(fold neighbors2 n2
(seq
(seq
(seq
(seq
(seq
(xor
(seq
(seq
(seq
(call n2 ("peer" "identify") [] ident1)
(call n2 ("dist" "list_blueprints") [] blueprints1)
)
(call n2 ("dist" "list_modules") [] modules1)
)
(call n2 ("srv" "list") [] services1)
)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 5])
)
)
(call -relay- ("op" "identity") [])
)
(call n ("event" "collectPeerInfo") [n2 ident1 services1 blueprints1 modules1])
)
(xor
(fold services1 srv1
(par
(seq
(call n2 ("srv" "get_interface") [srv1.$.id!] info1)
(call n2 ("event" "collectServiceInterface") [n2 srv1.$.id! info1.$.interface!])
)
(next srv1)
)
)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 6])
)
)
)
(call -relay- ("op" "identity") [])
)
(next n2)
)
)
)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 7])
)
)
(seq
(seq
(seq
(call -relay- ("op" "identity") [])
(next n)
)
(call -relay- ("op" "identity") [])
)
(call %init_peer_id% ("op" "identity") [])
)
)
)
)
(seq
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 8])
)
(call -relay- ("op" "identity") [])
)
)
)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 9])
)
)
`,
)
.configHandler((h) => {
h.on('getDataSrv', '-relay-', () => {
return client.relayPeerId;
});
h.on('getDataSrv', 'relayPeerId', () => {return relayPeerId;});
h.on('getDataSrv', 'knownPeers', () => {return knownPeers;});
h.onEvent('errorHandlingSrv', 'error', (args) => {
// assuming error is the single argument
const [err] = args;
reject(err);
});
})
.handleScriptError(reject)
.handleTimeout(() => {
reject('Request timed out for getAll');
})
.build();
});
await client.initiateFlow(request);
return Promise.race([promise, Promise.resolve()]);
}
export async function dashboardEvent_collectServiceInterface(client, peer, serviceId, iface) {
let request;
const promise = new Promise((resolve, reject) => {
request = new RequestFlowBuilder()
.disableInjections()
.withRawScript(
`
(xor
(seq
(seq
(seq
(seq
(call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-)
(call %init_peer_id% ("getDataSrv" "peer") [] peer)
)
(call %init_peer_id% ("getDataSrv" "serviceId") [] serviceId)
)
(call %init_peer_id% ("getDataSrv" "iface") [] iface)
)
(call %init_peer_id% ("event" "collectServiceInterface") [peer serviceId iface])
)
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1])
)
`,
)
.configHandler((h) => {
h.on('getDataSrv', '-relay-', () => {
return client.relayPeerId;
});
h.on('getDataSrv', 'peer', () => {return peer;});
h.on('getDataSrv', 'serviceId', () => {return serviceId;});
h.on('getDataSrv', 'iface', () => {return iface;});
h.onEvent('errorHandlingSrv', 'error', (args) => {
// assuming error is the single argument
const [err] = args;
reject(err);
});
})
.handleScriptError(reject)
.handleTimeout(() => {
reject('Request timed out for dashboardEvent_collectServiceInterface');
})
.build();
});
await client.initiateFlow(request);
return Promise.race([promise, Promise.resolve()]);
}
export async function findAndAskNeighboursSchema(client, relayPeerId, clientId) { export async function findAndAskNeighboursSchema(client, relayPeerId, clientId) {
let request; let request;
const promise = new Promise((resolve, reject) => { const promise = new Promise((resolve, reject) => {
@ -432,7 +199,6 @@ export async function findAndAskNeighboursSchema(client, relayPeerId, clientId)
(fold neighbors2 n2 (fold neighbors2 n2
(seq (seq
(seq (seq
(seq
(seq (seq
(seq (seq
(xor (xor
@ -453,26 +219,27 @@ export async function findAndAskNeighboursSchema(client, relayPeerId, clientId)
) )
(call -relay- ("op" "identity") []) (call -relay- ("op" "identity") [])
) )
(call n ("event" "collectPeerInfo") [n2 ident services blueprints modules])
)
(xor
(fold services srv (fold services srv
(par (par
(seq (seq
(seq
(xor
(call n2 ("srv" "get_interface") [srv.$.id!] info) (call n2 ("srv" "get_interface") [srv.$.id!] info)
(call n2 ("event" "collectServiceInterface") [n2 srv.$.id! info.$.interface!])
)
(next srv)
)
)
(seq (seq
(call -relay- ("op" "identity") []) (call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2]) (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
) )
) )
)
(call -relay- ("op" "identity") []) (call -relay- ("op" "identity") [])
) )
(call n ("event" "collectServiceInterface") [n2 srv.$.id! info.$.interface!])
)
(next srv)
)
)
)
(call n ("event" "collectPeerInfo") [n2 ident services blueprints modules])
)
(next n2) (next n2)
) )
) )
@ -537,7 +304,7 @@ h.on('getDataSrv', 'clientId', () => {return clientId;});
export async function collectServiceInterfaces(client, peer, services) { export async function getAll(client, relayPeerId, knownPeers) {
let request; let request;
const promise = new Promise((resolve, reject) => { const promise = new Promise((resolve, reject) => {
request = new RequestFlowBuilder() request = new RequestFlowBuilder()
@ -547,37 +314,206 @@ export async function collectServiceInterfaces(client, peer, services) {
(xor (xor
(seq (seq
(seq (seq
(seq
(seq (seq
(seq (seq
(call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-) (call %init_peer_id% ("getDataSrv" "-relay-") [] -relay-)
(call %init_peer_id% ("getDataSrv" "peer") [] peer) (call %init_peer_id% ("getDataSrv" "relayPeerId") [] relayPeerId)
) )
(call %init_peer_id% ("getDataSrv" "services") [] services) (call %init_peer_id% ("getDataSrv" "knownPeers") [] knownPeers)
) )
(call -relay- ("op" "identity") [])
)
(xor
(fold services srv
(par (par
(seq (seq
(call peer ("srv" "get_interface") [srv.$.id!] info)
(call peer ("event" "collectServiceInterface") [peer srv.$.id! info.$.interface!])
)
(next srv)
)
)
(seq (seq
(seq
(seq
(call -relay- ("op" "identity") [])
(xor
(seq
(seq
(seq
(call relayPeerId ("peer" "identify") [] ident)
(call relayPeerId ("dist" "list_blueprints") [] blueprints)
)
(call relayPeerId ("dist" "list_modules") [] modules)
)
(call relayPeerId ("srv" "list") [] services)
)
(seq (seq
(call -relay- ("op" "identity") []) (call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1]) (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 1])
) )
)
)
(call -relay- ("op" "identity") [])
)
(fold services srv
(par
(seq
(seq
(seq
(call -relay- ("op" "identity") [])
(xor
(call relayPeerId ("srv" "get_interface") [srv.$.id!] info)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2])
)
)
)
(call -relay- ("op" "identity") [])
)
(call %init_peer_id% ("event" "collectServiceInterface") [relayPeerId srv.$.id! info.$.interface!])
)
(next srv)
)
)
)
(call %init_peer_id% ("event" "collectPeerInfo") [relayPeerId ident services blueprints modules])
)
(fold knownPeers peer
(par
(seq
(seq
(seq
(seq
(call -relay- ("op" "identity") [])
(xor
(seq
(seq
(seq
(call peer ("peer" "identify") [] ident0)
(call peer ("dist" "list_blueprints") [] blueprints0)
)
(call peer ("dist" "list_modules") [] modules0)
)
(call peer ("srv" "list") [] services0)
)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 3])
)
)
)
(call -relay- ("op" "identity") [])
)
(fold services0 srv0
(par
(seq
(seq
(seq
(call -relay- ("op" "identity") [])
(xor
(call peer ("srv" "get_interface") [srv0.$.id!] info0)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 4])
)
)
)
(call -relay- ("op" "identity") [])
)
(call %init_peer_id% ("event" "collectServiceInterface") [peer srv0.$.id! info0.$.interface!])
)
(next srv0)
)
)
)
(call %init_peer_id% ("event" "collectPeerInfo") [peer ident0 services0 blueprints0 modules0])
)
(next peer)
)
)
)
)
(call -relay- ("op" "identity") [])
)
(xor
(seq
(call relayPeerId ("kad" "neighborhood") [%init_peer_id% false] neighbors)
(fold neighbors n
(par
(xor
(seq
(call n ("kad" "neighborhood") [%init_peer_id% false] neighbors2)
(fold neighbors2 n2
(seq
(seq
(seq
(seq
(xor
(seq
(seq
(seq
(call n2 ("peer" "identify") [] ident1)
(call n2 ("dist" "list_blueprints") [] blueprints1)
)
(call n2 ("dist" "list_modules") [] modules1)
)
(call n2 ("srv" "list") [] services1)
)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 5])
)
)
(call -relay- ("op" "identity") [])
)
(fold services1 srv1
(par
(seq
(seq
(xor
(call n2 ("srv" "get_interface") [srv1.$.id!] info1)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 6])
)
)
(call -relay- ("op" "identity") [])
)
(call n ("event" "collectServiceInterface") [n2 srv1.$.id! info1.$.interface!])
)
(next srv1)
)
)
)
(call n ("event" "collectPeerInfo") [n2 ident1 services1 blueprints1 modules1])
)
(next n2)
)
)
)
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 7])
)
)
(seq
(seq
(seq
(call -relay- ("op" "identity") [])
(next n)
)
(call -relay- ("op" "identity") [])
)
(call %init_peer_id% ("op" "identity") [])
)
)
)
)
(seq
(seq
(call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 8])
)
(call -relay- ("op" "identity") []) (call -relay- ("op" "identity") [])
) )
) )
) )
(seq (seq
(call -relay- ("op" "identity") []) (call -relay- ("op" "identity") [])
(call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 2]) (call %init_peer_id% ("errorHandlingSrv" "error") [%last_error% 9])
) )
) )
@ -587,8 +523,8 @@ export async function collectServiceInterfaces(client, peer, services) {
h.on('getDataSrv', '-relay-', () => { h.on('getDataSrv', '-relay-', () => {
return client.relayPeerId; return client.relayPeerId;
}); });
h.on('getDataSrv', 'peer', () => {return peer;}); h.on('getDataSrv', 'relayPeerId', () => {return relayPeerId;});
h.on('getDataSrv', 'services', () => {return services;}); h.on('getDataSrv', 'knownPeers', () => {return knownPeers;});
h.onEvent('errorHandlingSrv', 'error', (args) => { h.onEvent('errorHandlingSrv', 'error', (args) => {
// assuming error is the single argument // assuming error is the single argument
@ -598,7 +534,7 @@ h.on('getDataSrv', 'services', () => {return services;});
}) })
.handleScriptError(reject) .handleScriptError(reject)
.handleTimeout(() => { .handleTimeout(() => {
reject('Request timed out for collectServiceInterfaces'); reject('Request timed out for getAll');
}) })
.build(); .build();
}); });