/* * Copyright 2020 Fluence Labs Limited * * Licensed under the Apache License, Version 2.0 (the "License"); * you may not use this file except in compliance with the License. * You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ import {build, Particle} from "./particle"; import {StepperOutcome} from "./stepperOutcome"; import * as PeerId from "peer-id"; import Multiaddr from "multiaddr" import {FluenceConnection} from "./fluenceConnection"; import {Subscriptions} from "./subscriptions"; import { enqueueParticle, getCurrentParticleId, popParticle, setCurrentParticleId } from "./globalState"; import {instantiateStepper, Stepper} from "./stepper"; import log from "loglevel"; import {waitService} from "./helpers/waitService"; const bs58 = require('bs58') export class FluenceClient { readonly selfPeerId: PeerId; readonly selfPeerIdStr: string; private nodePeerIdStr: string; private subscriptions = new Subscriptions(); private stepper: Stepper = undefined; connection: FluenceConnection; constructor(selfPeerId: PeerId) { this.selfPeerId = selfPeerId; this.selfPeerIdStr = selfPeerId.toB58String(); } /** * Pass a particle to a stepper and send a result to other services. */ private async handleParticle(particle: Particle): Promise { // if a current particle is processing, add new particle to the queue if (getCurrentParticleId() !== undefined && getCurrentParticleId() !== particle.id) { enqueueParticle(particle); } else { if (this.stepper === undefined) { throw new Error("Undefined. Stepper is not initialized. User 'Fluence.connect' to create a client.") } // start particle processing if queue is empty try { setCurrentParticleId(particle.id) // check if a particle is relevant let now = Date.now(); let actualTtl = particle.timestamp + particle.ttl - now; if (actualTtl <= 0) { log.info(`Particle expired. Now: ${now}, ttl: ${particle.ttl}, ts: ${particle.timestamp}`) } else { // if there is no subscription yet, previous data is empty let prevData = []; let prevParticle = this.subscriptions.get(particle.id); if (prevParticle) { prevData = prevParticle.data; // update a particle in a subscription this.subscriptions.update(particle) } else { // set a particle with actual ttl this.subscriptions.subscribe(particle, actualTtl) } let stepperOutcomeStr = this.stepper(particle.init_peer_id, particle.script, JSON.stringify(prevData), JSON.stringify(particle.data)) let stepperOutcome: StepperOutcome = JSON.parse(stepperOutcomeStr); log.info("inner stepper outcome:"); log.info(stepperOutcome); // do nothing if there is no `next_peer_pks` if (stepperOutcome.next_peer_pks.length > 0) { let newParticle: Particle = {...particle}; newParticle.data = JSON.parse(stepperOutcome.call_path); await this.connection.sendParticle(newParticle).catch((reason) => { console.error(`Error on sending particle with id ${particle.id}: ${reason}`) }); } } } finally { // get last particle from the queue let nextParticle = popParticle(); // start the processing of a new particle if it exists if (nextParticle) { // update current particle setCurrentParticleId(nextParticle.id); await this.handleParticle(nextParticle) } else { // wait for a new call (do nothing) if there is no new particle in a queue setCurrentParticleId(undefined); } } } } /** * Handle incoming particle from a relay. */ private handleExternalParticle(): (particle: Particle) => Promise { let _this = this; return async (particle: Particle) => { let data = particle.data; let error: any = data["protocol!error"] if (error !== undefined) { log.error("error in external particle: ") log.error(error) } else { log.info("handle external particle: ") log.info(particle) await _this.handleParticle(particle); } } } async disconnect(): Promise { return this.connection.disconnect(); } /** * Establish a connection to the node. If the connection is already established, disconnect and reregister all services in a new connection. * * @param multiaddr */ async connect(multiaddr: string | Multiaddr): Promise { multiaddr = Multiaddr(multiaddr); let nodePeerId = multiaddr.getPeerId(); this.nodePeerIdStr = nodePeerId; if (!nodePeerId) { throw Error("'multiaddr' did not contain a valid peer id") } let firstConnection: boolean = true; if (this.connection) { firstConnection = false; await this.connection.disconnect(); } let peerId = PeerId.createFromB58String(nodePeerId); this.stepper = await instantiateStepper(this.selfPeerId); let connection = new FluenceConnection(multiaddr, peerId, this.selfPeerId, this.handleExternalParticle()); await connection.connect(); this.connection = connection; } async sendParticle(particle: Particle): Promise { await this.handleParticle(particle); return particle.id } nodeIdentityCall(): string { return `(call "${this.nodePeerIdStr}" ("identity" "") [] void[])` } async requestResponse(name: string, call: (nodeId: string) => string, returnValue: string, data: Map, handleResponse: (args: any[]) => T, nodeId?: string, ttl?: number): Promise { if (!ttl) { ttl = 10000 } if (!nodeId) { nodeId = this.nodePeerIdStr } let serviceCall = call(nodeId) let namedPromise = waitService(name, handleResponse, ttl) let script = `(seq ${this.nodeIdentityCall()} (seq (seq ${serviceCall} ${this.nodeIdentityCall()} ) (call "${this.selfPeerIdStr}" ("${namedPromise.name}" "") [${returnValue}] void[]) ) ) ` let particle = await build(this.selfPeerId, script, data, ttl) await this.sendParticle(particle); return namedPromise.promise } /** * Send a script to add module to a relay. Waiting for a response from a relay. */ async addModule(name: string, moduleBase64: string, nodeId?: string, ttl?: number): Promise { let config = { name: name, mem_pages_count: 100, logger_enabled: true, wasi: { envs: {}, preopened_files: ["/tmp"], mapped_dirs: {}, } } let data = new Map() data.set("module_bytes", moduleBase64) data.set("module_config", config) let call = (nodeId: string) => `(call "${nodeId}" ("add_module" "") [module_bytes module_config] void[])` return this.requestResponse("addModule", call, "", data, () => {}, nodeId, ttl) } /** * Send a script to add module to a relay. Waiting for a response from a relay. */ async addBlueprint(name: string, dependencies: string[], nodeId?: string, ttl?: number): Promise { let returnValue = "blueprint_id"; let call = (nodeId: string) => `(call "${nodeId}" ("add_blueprint" "") [blueprint] ${returnValue})` let data = new Map() data.set("blueprint", { name: name, dependencies: dependencies }) return this.requestResponse("addBlueprint", call, returnValue, data, (args: any[]) => args[0] as string, nodeId, ttl) } /** * Send a script to create a service to a relay. Waiting for a response from a relay. */ async createService(blueprintId: string, nodeId?: string, ttl?: number): Promise { let returnValue = "service_id"; let call = (nodeId: string) => `(call "${nodeId}" ("create" "") [blueprint_id] ${returnValue})` let data = new Map() data.set("blueprint_id", blueprintId) return this.requestResponse("createService", call, returnValue, data, (args: any[]) => args[0] as string, nodeId, ttl) } /** * Get all available modules hosted on a connected relay. */ async getAvailableModules(nodeId?: string, ttl?: number): Promise { let returnValue = "modules"; let call = (nodeId: string) => `(call "${nodeId}" ("get_available_modules" "") [] ${returnValue})` return this.requestResponse("getAvailableModules", call, returnValue, new Map(), (args: any[]) => args[0] as string[], nodeId, ttl) } /** * Get all available blueprints hosted on a connected relay. */ async getBlueprints(nodeId: string, ttl?: number): Promise { let returnValue = "blueprints"; let call = (nodeId: string) => `(call "${nodeId}" ("get_available_modules" "") [] ${returnValue})` return this.requestResponse("getBlueprints", call, returnValue, new Map(), (args: any[]) => args[0] as string[], nodeId, ttl) } /** * Add a provider to DHT network to neighborhood around a key. */ async addProvider(key: Buffer, providerPeer: string, providerServiceId?: string, nodeId?: string, ttl?: number): Promise { let call = (nodeId: string) => `(call "${nodeId}" ("add_provider" "") [key provider] void[])` key = bs58.encode(key) let provider = { peer: providerPeer, service_id: providerServiceId } let data = new Map() data.set("key", key) data.set("provider", provider) return this.requestResponse("addProvider", call, "", data, () => {}, nodeId, ttl) } /** * Get a provider from DHT network from neighborhood around a key.. */ async getProviders(key: Buffer, nodeId?: string, ttl?: number): Promise { key = bs58.encode(key) let returnValue = "providers" let call = (nodeId: string) => `(call "${nodeId}" ("get_providers" "") [key] providers[])` let data = new Map() data.set("key", key) return this.requestResponse("getProviders", call, returnValue, data, (args) => args[0], nodeId, ttl) } /** * Get relays neighborhood */ async neighborhood(node: string, ttl?: number): Promise { let returnValue = "neighborhood" let call = (nodeId: string) => `(call "${nodeId}" ("neighborhood" "") [node] ${returnValue})` let data = new Map() data.set("node", node) return this.requestResponse("neighborhood", call, returnValue, data, (args) => args[0] as string[], node, ttl) } /** * Call relays 'identity' method. It should return passed 'fields' */ async relayIdentity(fields: string[], data: Map, nodeId?: string, ttl?: number): Promise { let returnValue = "id"; let call = (nodeId: string) => `(call "${nodeId}" ("identity" "") [${fields.join(" ")}] ${returnValue})` return this.requestResponse("getIdentity", call, returnValue, data, (args: any[]) => args[0], nodeId, ttl) } }