mirror of
https://github.com/fluencelabs/fluence-js.git
synced 2025-06-29 07:41:35 +00:00
Tetraplets (#1)
This commit is contained in:
@ -14,22 +14,21 @@
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
import { build, Particle } from './particle';
|
||||
import { StepperOutcome } from './stepperOutcome';
|
||||
import * as PeerId from 'peer-id';
|
||||
import Multiaddr from 'multiaddr';
|
||||
import { FluenceConnection } from './fluenceConnection';
|
||||
import { Subscriptions } from './subscriptions';
|
||||
import { enqueueParticle, getCurrentParticleId, popParticle, setCurrentParticleId } from './globalState';
|
||||
import { instantiateInterpreter, InterpreterInvoke } from './stepper';
|
||||
import log from 'loglevel';
|
||||
import { waitService } from './helpers/waitService';
|
||||
import { ModuleConfig } from './moduleConfig';
|
||||
|
||||
import {build, Particle} from "./particle";
|
||||
import {StepperOutcome} from "./stepperOutcome";
|
||||
import * as PeerId from "peer-id";
|
||||
import Multiaddr from "multiaddr"
|
||||
import {FluenceConnection} from "./fluenceConnection";
|
||||
import {Subscriptions} from "./subscriptions";
|
||||
import {enqueueParticle, getCurrentParticleId, popParticle, setCurrentParticleId} from "./globalState";
|
||||
import {instantiateInterpreter, InterpreterInvoke} from "./stepper";
|
||||
import log from "loglevel";
|
||||
import {waitService} from "./helpers/waitService";
|
||||
import {ModuleConfig} from "./moduleConfig";
|
||||
const bs58 = require('bs58');
|
||||
|
||||
const bs58 = require('bs58')
|
||||
|
||||
const INFO_LOG_LEVEL = 2
|
||||
const INFO_LOG_LEVEL = 2;
|
||||
|
||||
export class FluenceClient {
|
||||
readonly selfPeerId: PeerId;
|
||||
@ -50,22 +49,21 @@ export class FluenceClient {
|
||||
* Pass a particle to a interpreter and send a result to other services.
|
||||
*/
|
||||
private async handleParticle(particle: Particle): Promise<void> {
|
||||
|
||||
// if a current particle is processing, add new particle to the queue
|
||||
if (getCurrentParticleId() !== undefined && getCurrentParticleId() !== particle.id) {
|
||||
enqueueParticle(particle);
|
||||
} else {
|
||||
if (this.interpreter === undefined) {
|
||||
throw new Error("Undefined. Interpreter is not initialized. Use 'Fluence.connect' to create a client.")
|
||||
throw new Error("Undefined. Interpreter is not initialized. Use 'Fluence.connect' to create a client.");
|
||||
}
|
||||
// start particle processing if queue is empty
|
||||
try {
|
||||
setCurrentParticleId(particle.id)
|
||||
setCurrentParticleId(particle.id);
|
||||
// check if a particle is relevant
|
||||
let now = Date.now();
|
||||
let actualTtl = particle.timestamp + particle.ttl - now;
|
||||
if (actualTtl <= 0) {
|
||||
log.info(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`)
|
||||
log.info(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`);
|
||||
} else {
|
||||
// if there is no subscription yet, previous data is empty
|
||||
let prevData = [];
|
||||
@ -73,35 +71,40 @@ export class FluenceClient {
|
||||
if (prevParticle) {
|
||||
prevData = prevParticle.data;
|
||||
// update a particle in a subscription
|
||||
this.subscriptions.update(particle)
|
||||
this.subscriptions.update(particle);
|
||||
} else {
|
||||
// set a particle with actual ttl
|
||||
this.subscriptions.subscribe(particle, actualTtl)
|
||||
this.subscriptions.subscribe(particle, actualTtl);
|
||||
}
|
||||
let stepperOutcomeStr = this.interpreter(particle.init_peer_id, particle.script, JSON.stringify(prevData), JSON.stringify(particle.data))
|
||||
let stepperOutcomeStr = this.interpreter(
|
||||
particle.init_peer_id,
|
||||
particle.script,
|
||||
JSON.stringify(prevData),
|
||||
JSON.stringify(particle.data),
|
||||
);
|
||||
let stepperOutcome: StepperOutcome = JSON.parse(stepperOutcomeStr);
|
||||
|
||||
if (log.getLevel() <= INFO_LOG_LEVEL) {
|
||||
log.info("inner interpreter outcome:");
|
||||
let so = {...stepperOutcome}
|
||||
log.info('inner interpreter outcome:');
|
||||
let so = { ...stepperOutcome };
|
||||
try {
|
||||
so.data = JSON.parse(Buffer.from(so.data).toString("utf8"));
|
||||
so.data = JSON.parse(Buffer.from(so.data).toString('utf8'));
|
||||
log.info(so);
|
||||
} catch (e) {
|
||||
log.info("cannot parse StepperOutcome data as JSON: ", e);
|
||||
log.info('cannot parse StepperOutcome data as JSON: ', e);
|
||||
}
|
||||
}
|
||||
|
||||
// update data after aquamarine execution
|
||||
let newParticle: Particle = {...particle};
|
||||
newParticle.data = stepperOutcome.data
|
||||
let newParticle: Particle = { ...particle };
|
||||
newParticle.data = stepperOutcome.data;
|
||||
|
||||
this.subscriptions.update(newParticle)
|
||||
this.subscriptions.update(newParticle);
|
||||
|
||||
// do nothing if there is no `next_peer_pks` or if client isn't connected to the network
|
||||
if (stepperOutcome.next_peer_pks.length > 0 && this.connection) {
|
||||
await this.connection.sendParticle(newParticle).catch((reason) => {
|
||||
console.error(`Error on sending particle with id ${particle.id}: ${reason}`)
|
||||
console.error(`Error on sending particle with id ${particle.id}: ${reason}`);
|
||||
});
|
||||
}
|
||||
}
|
||||
@ -112,7 +115,7 @@ export class FluenceClient {
|
||||
if (nextParticle) {
|
||||
// update current particle
|
||||
setCurrentParticleId(nextParticle.id);
|
||||
await this.handleParticle(nextParticle)
|
||||
await this.handleParticle(nextParticle);
|
||||
} else {
|
||||
// wait for a new call (do nothing) if there is no new particle in a queue
|
||||
setCurrentParticleId(undefined);
|
||||
@ -125,21 +128,20 @@ export class FluenceClient {
|
||||
* Handle incoming particle from a relay.
|
||||
*/
|
||||
private handleExternalParticle(): (particle: Particle) => Promise<void> {
|
||||
|
||||
let _this = this;
|
||||
|
||||
return async (particle: Particle) => {
|
||||
let data = particle.data;
|
||||
let error: any = data["protocol!error"]
|
||||
let error: any = data['protocol!error'];
|
||||
if (error !== undefined) {
|
||||
log.error("error in external particle: ")
|
||||
log.error(error)
|
||||
log.error('error in external particle: ');
|
||||
log.error(error);
|
||||
} else {
|
||||
log.info("handle external particle: ")
|
||||
log.info(particle)
|
||||
log.info('handle external particle: ');
|
||||
log.info(particle);
|
||||
await _this.handleParticle(particle);
|
||||
}
|
||||
}
|
||||
};
|
||||
}
|
||||
|
||||
async disconnect(): Promise<void> {
|
||||
@ -162,13 +164,13 @@ export class FluenceClient {
|
||||
multiaddr = Multiaddr(multiaddr);
|
||||
|
||||
if (!this.interpreter) {
|
||||
throw Error("you must call 'instantiateInterpreter' before 'connect'")
|
||||
throw Error("you must call 'instantiateInterpreter' before 'connect'");
|
||||
}
|
||||
|
||||
let nodePeerId = multiaddr.getPeerId();
|
||||
this.nodePeerIdStr = nodePeerId;
|
||||
if (!nodePeerId) {
|
||||
throw Error("'multiaddr' did not contain a valid peer id")
|
||||
throw Error("'multiaddr' did not contain a valid peer id");
|
||||
}
|
||||
|
||||
let firstConnection: boolean = true;
|
||||
@ -186,7 +188,7 @@ export class FluenceClient {
|
||||
|
||||
async sendParticle(particle: Particle): Promise<string> {
|
||||
await this.handleParticle(particle);
|
||||
return particle.id
|
||||
return particle.id;
|
||||
}
|
||||
|
||||
async executeParticle(particle: Particle) {
|
||||
@ -194,21 +196,29 @@ export class FluenceClient {
|
||||
}
|
||||
|
||||
nodeIdentityCall(): string {
|
||||
return `(call "${this.nodePeerIdStr}" ("op" "identity") [] void[])`
|
||||
return `(call "${this.nodePeerIdStr}" ("op" "identity") [] void[])`;
|
||||
}
|
||||
|
||||
async requestResponse<T>(name: string, call: (nodeId: string) => string, returnValue: string, data: Map<string, any>, handleResponse: (args: any[]) => T, nodeId?: string, ttl?: number): Promise<T> {
|
||||
async requestResponse<T>(
|
||||
name: string,
|
||||
call: (nodeId: string) => string,
|
||||
returnValue: string,
|
||||
data: Map<string, any>,
|
||||
handleResponse: (args: any[]) => T,
|
||||
nodeId?: string,
|
||||
ttl?: number,
|
||||
): Promise<T> {
|
||||
if (!ttl) {
|
||||
ttl = 10000
|
||||
ttl = 10000;
|
||||
}
|
||||
|
||||
if (!nodeId) {
|
||||
nodeId = this.nodePeerIdStr
|
||||
nodeId = this.nodePeerIdStr;
|
||||
}
|
||||
|
||||
let serviceCall = call(nodeId)
|
||||
let serviceCall = call(nodeId);
|
||||
|
||||
let namedPromise = waitService(name, handleResponse, ttl)
|
||||
let namedPromise = waitService(name, handleResponse, ttl);
|
||||
|
||||
let script = `(seq
|
||||
${this.nodeIdentityCall()}
|
||||
@ -220,18 +230,24 @@ export class FluenceClient {
|
||||
(call "${this.selfPeerIdStr}" ("${namedPromise.name}" "") [${returnValue}] void[])
|
||||
)
|
||||
)
|
||||
`
|
||||
`;
|
||||
|
||||
let particle = await build(this.selfPeerId, script, data, ttl)
|
||||
let particle = await build(this.selfPeerId, script, data, ttl);
|
||||
await this.sendParticle(particle);
|
||||
|
||||
return namedPromise.promise
|
||||
return namedPromise.promise;
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a script to add module to a relay. Waiting for a response from a relay.
|
||||
*/
|
||||
async addModule(name: string, moduleBase64: string, config?: ModuleConfig, nodeId?: string, ttl?: number): Promise<void> {
|
||||
async addModule(
|
||||
name: string,
|
||||
moduleBase64: string,
|
||||
config?: ModuleConfig,
|
||||
nodeId?: string,
|
||||
ttl?: number,
|
||||
): Promise<void> {
|
||||
if (!config) {
|
||||
config = {
|
||||
name: name,
|
||||
@ -239,122 +255,166 @@ export class FluenceClient {
|
||||
logger_enabled: true,
|
||||
wasi: {
|
||||
envs: {},
|
||||
preopened_files: ["/tmp"],
|
||||
preopened_files: ['/tmp'],
|
||||
mapped_dirs: {},
|
||||
}
|
||||
}
|
||||
},
|
||||
};
|
||||
}
|
||||
|
||||
let data = new Map()
|
||||
data.set("module_bytes", moduleBase64)
|
||||
data.set("module_config", config)
|
||||
let data = new Map();
|
||||
data.set('module_bytes', moduleBase64);
|
||||
data.set('module_config', config);
|
||||
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("dist" "add_module") [module_bytes module_config] void[])`
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("dist" "add_module") [module_bytes module_config] void[])`;
|
||||
|
||||
return this.requestResponse("addModule", call, "", data, () => {}, nodeId, ttl)
|
||||
return this.requestResponse('addModule', call, '', data, () => {}, nodeId, ttl);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a script to add module to a relay. Waiting for a response from a relay.
|
||||
*/
|
||||
async addBlueprint(name: string, dependencies: string[], blueprintId?: string, nodeId?: string, ttl?: number): Promise<string> {
|
||||
let returnValue = "blueprint_id";
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("dist" "add_blueprint") [blueprint] ${returnValue})`
|
||||
async addBlueprint(
|
||||
name: string,
|
||||
dependencies: string[],
|
||||
blueprintId?: string,
|
||||
nodeId?: string,
|
||||
ttl?: number,
|
||||
): Promise<string> {
|
||||
let returnValue = 'blueprint_id';
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("dist" "add_blueprint") [blueprint] ${returnValue})`;
|
||||
|
||||
let data = new Map()
|
||||
data.set("blueprint", { name: name, dependencies: dependencies, id: blueprintId })
|
||||
let data = new Map();
|
||||
data.set('blueprint', { name: name, dependencies: dependencies, id: blueprintId });
|
||||
|
||||
return this.requestResponse("addBlueprint", call, returnValue, data, (args: any[]) => args[0] as string, nodeId, ttl)
|
||||
return this.requestResponse(
|
||||
'addBlueprint',
|
||||
call,
|
||||
returnValue,
|
||||
data,
|
||||
(args: any[]) => args[0] as string,
|
||||
nodeId,
|
||||
ttl,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Send a script to create a service to a relay. Waiting for a response from a relay.
|
||||
*/
|
||||
async createService(blueprintId: string, nodeId?: string, ttl?: number): Promise<string> {
|
||||
let returnValue = "service_id";
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("srv" "create") [blueprint_id] ${returnValue})`
|
||||
let returnValue = 'service_id';
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("srv" "create") [blueprint_id] ${returnValue})`;
|
||||
|
||||
let data = new Map()
|
||||
data.set("blueprint_id", blueprintId)
|
||||
let data = new Map();
|
||||
data.set('blueprint_id', blueprintId);
|
||||
|
||||
return this.requestResponse("createService", call, returnValue, data, (args: any[]) => args[0] as string, nodeId, ttl)
|
||||
return this.requestResponse(
|
||||
'createService',
|
||||
call,
|
||||
returnValue,
|
||||
data,
|
||||
(args: any[]) => args[0] as string,
|
||||
nodeId,
|
||||
ttl,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all available modules hosted on a connected relay.
|
||||
*/
|
||||
async getAvailableModules(nodeId?: string, ttl?: number): Promise<string[]> {
|
||||
let returnValue = "modules";
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("dist" "get_modules") [] ${returnValue})`
|
||||
let returnValue = 'modules';
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("dist" "get_modules") [] ${returnValue})`;
|
||||
|
||||
return this.requestResponse("getAvailableModules", call, returnValue, new Map(), (args: any[]) => args[0] as string[], nodeId, ttl)
|
||||
return this.requestResponse(
|
||||
'getAvailableModules',
|
||||
call,
|
||||
returnValue,
|
||||
new Map(),
|
||||
(args: any[]) => args[0] as string[],
|
||||
nodeId,
|
||||
ttl,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get all available blueprints hosted on a connected relay.
|
||||
*/
|
||||
async getBlueprints(nodeId: string, ttl?: number): Promise<string[]> {
|
||||
let returnValue = "blueprints";
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("dist" "get_blueprints") [] ${returnValue})`
|
||||
let returnValue = 'blueprints';
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("dist" "get_blueprints") [] ${returnValue})`;
|
||||
|
||||
return this.requestResponse("getBlueprints", call, returnValue, new Map(), (args: any[]) => args[0] as string[], nodeId, ttl)
|
||||
return this.requestResponse(
|
||||
'getBlueprints',
|
||||
call,
|
||||
returnValue,
|
||||
new Map(),
|
||||
(args: any[]) => args[0] as string[],
|
||||
nodeId,
|
||||
ttl,
|
||||
);
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a provider to DHT network to neighborhood around a key.
|
||||
*/
|
||||
async addProvider(key: Buffer, providerPeer: string, providerServiceId?: string, nodeId?: string, ttl?: number): Promise<void> {
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("dht" "add_provider") [key provider] void[])`
|
||||
async addProvider(
|
||||
key: Buffer,
|
||||
providerPeer: string,
|
||||
providerServiceId?: string,
|
||||
nodeId?: string,
|
||||
ttl?: number,
|
||||
): Promise<void> {
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("dht" "add_provider") [key provider] void[])`;
|
||||
|
||||
key = bs58.encode(key)
|
||||
key = bs58.encode(key);
|
||||
|
||||
let provider = {
|
||||
peer: providerPeer,
|
||||
service_id: providerServiceId
|
||||
}
|
||||
service_id: providerServiceId,
|
||||
};
|
||||
|
||||
let data = new Map()
|
||||
data.set("key", key)
|
||||
data.set("provider", provider)
|
||||
let data = new Map();
|
||||
data.set('key', key);
|
||||
data.set('provider', provider);
|
||||
|
||||
return this.requestResponse("addProvider", call, "", data, () => {}, nodeId, ttl)
|
||||
return this.requestResponse('addProvider', call, '', data, () => {}, nodeId, ttl);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get a provider from DHT network from neighborhood around a key..
|
||||
*/
|
||||
async getProviders(key: Buffer, nodeId?: string, ttl?: number): Promise<any> {
|
||||
key = bs58.encode(key)
|
||||
key = bs58.encode(key);
|
||||
|
||||
let returnValue = "providers"
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("dht" "get_providers") [key] providers[])`
|
||||
let returnValue = 'providers';
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("dht" "get_providers") [key] providers[])`;
|
||||
|
||||
let data = new Map()
|
||||
data.set("key", key)
|
||||
let data = new Map();
|
||||
data.set('key', key);
|
||||
|
||||
return this.requestResponse("getProviders", call, returnValue, data, (args) => args[0], nodeId, ttl)
|
||||
return this.requestResponse('getProviders', call, returnValue, data, (args) => args[0], nodeId, ttl);
|
||||
}
|
||||
|
||||
/**
|
||||
* Get relays neighborhood
|
||||
*/
|
||||
async neighborhood(node: string, ttl?: number): Promise<string[]> {
|
||||
let returnValue = "neighborhood"
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("dht" "neighborhood") [node] ${returnValue})`
|
||||
let returnValue = 'neighborhood';
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("dht" "neighborhood") [node] ${returnValue})`;
|
||||
|
||||
let data = new Map()
|
||||
data.set("node", node)
|
||||
let data = new Map();
|
||||
data.set('node', node);
|
||||
|
||||
return this.requestResponse("neighborhood", call, returnValue, data, (args) => args[0] as string[], node, ttl)
|
||||
return this.requestResponse('neighborhood', call, returnValue, data, (args) => args[0] as string[], node, ttl);
|
||||
}
|
||||
|
||||
/**
|
||||
* Call relays 'identity' method. It should return passed 'fields'
|
||||
*/
|
||||
async relayIdentity(fields: string[], data: Map<string, any>, nodeId?: string, ttl?: number): Promise<any> {
|
||||
let returnValue = "id";
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("op" "identity") [${fields.join(" ")}] ${returnValue})`
|
||||
let returnValue = 'id';
|
||||
let call = (nodeId: string) => `(call "${nodeId}" ("op" "identity") [${fields.join(' ')}] ${returnValue})`;
|
||||
|
||||
return this.requestResponse("getIdentity", call, returnValue, data, (args: any[]) => args[0], nodeId, ttl)
|
||||
return this.requestResponse('getIdentity', call, returnValue, data, (args: any[]) => args[0], nodeId, ttl);
|
||||
}
|
||||
}
|
||||
|
Reference in New Issue
Block a user