diff --git a/src/Interface.ts b/src/Interface.ts
deleted file mode 100644
index a3ed410e..00000000
--- a/src/Interface.ts
+++ /dev/null
@@ -1,85 +0,0 @@
-/*
- * Copyright 2020 Fluence Labs Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-export interface Interface {
- service_id: string,
- modules: Module[]
-}
-
-export function checkFunction(f: any): f is Function {
- if (!f.name) throw Error(`There is no 'name' field in Function struct: ${JSON.stringify(f)}`)
-
- if (f.input_types) {
- if (!(f.input_types instanceof Array)) {
- throw Error(`'input_types' should be an array: ${JSON.stringify(f)}`)
- }
- f.input_types.forEach((i: any) => {
- if ((typeof i) !== 'string') {
- throw Error(`'input_types' should be a string: ${JSON.stringify(f)}`)
- }
- });
- }
-
- if (f.output_types) {
- if (!(f.output_types instanceof Array)) {
- throw Error(`'output_types' should be an array: ${JSON.stringify(f)}`)
- }
- f.output_types.forEach((o: any) => {
- if ((typeof o) !== 'string') {
- throw Error(`'output_types' should be a string: ${JSON.stringify(f)}`)
- }
- });
- }
-
- return true;
-}
-
-function checkModule(module: any): module is Module {
- if (!module.name) throw Error(`There is no 'name' field in Module struct: ${JSON.stringify(module)}`)
- if (!module.functions) {
- module.functions.forEach((f: any) => {
- checkFunction(f)
- });
- }
-
- return true;
-}
-
-/**
- * Throws an error if 'i' is not an Interface type.
- */
-export function checkInterface(i: any): i is Interface {
- if (!i.service_id) throw new Error(`There is no 'service_id' field in Interface struct: ${JSON.stringify(i)}`)
- if (i.modules) {
- i.modules.forEach((module: any) => {
- checkModule(module);
- });
- }
-
- return true;
-
-}
-
-export interface Module {
- name: string,
- functions: Function[]
-}
-
-export interface Function {
- name: string,
- input_types: string[],
- output_types: string[]
-}
diff --git a/src/address.ts b/src/address.ts
deleted file mode 100644
index e4d1a2c9..00000000
--- a/src/address.ts
+++ /dev/null
@@ -1,176 +0,0 @@
-/*
- * Copyright 2020 Fluence Labs Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import * as PeerId from "peer-id";
-import {encode} from "bs58"
-
-export interface Address {
- protocols: Protocol[],
- hash?: string
-}
-
-export interface Protocol {
- protocol: ProtocolType,
- value?: string
-}
-
-export enum ProtocolType {
- Providers = "providers",
- Peer = "peer",
- Signature = "signature",
- Client = "client"
-}
-
-const PROTOCOL = "fluence:";
-
-export function getSignature(address: Address): string | undefined {
- return address.protocols.find(p => p.protocol === ProtocolType.Signature)?.value
-}
-
-export function addressToString(address: Address): string {
- let addressStr = PROTOCOL;
-
- for (let addr of address.protocols) {
- addressStr = addressStr + "/" + addr.protocol;
- if (addr.value) {
- addressStr = addressStr + "/" + addr.value;
- }
- }
-
- if (address.hash) {
- addressStr = addressStr + "#" + address.hash
- }
-
- return addressStr;
-}
-
-function protocolWithValue(protocol: ProtocolType, protocolIterator: IterableIterator<[number, string]>): Protocol {
-
- let protocolValue = protocolIterator.next().value;
-
- if (!protocolValue || !protocolValue[1]) {
- throw Error(`protocol '${protocol}' should be with a value`)
- }
-
- return {protocol: protocol, value: protocolValue[1]};
-}
-
-
-export function parseProtocol(protocol: string, protocolIterator: IterableIterator<[number, string]>): Protocol {
- protocol = protocol.toLocaleLowerCase();
-
- switch (protocol) {
- case ProtocolType.Providers:
- return protocolWithValue(protocol, protocolIterator);
- case ProtocolType.Client:
- return protocolWithValue(protocol, protocolIterator);
- case ProtocolType.Peer:
- return protocolWithValue(protocol, protocolIterator);
- case ProtocolType.Signature:
- return protocolWithValue(protocol, protocolIterator);
- default:
- throw Error("cannot parse protocol. Should be 'service|peer|client|signature'");
- }
-
-}
-
-export async function createRelayAddressWithSig(relay: string, peerId: PeerId, sig: string, hash?: string): Promise
{
- let protocols = [
- {protocol: ProtocolType.Peer, value: relay},
- {protocol: ProtocolType.Client, value: peerId.toB58String()},
- {protocol: ProtocolType.Signature, value: sig}
- ];
-
- return {
- protocols: protocols,
- hash: hash
- }
-}
-
-export async function createRelayAddress(relay: string, peerId: PeerId, withSig: boolean, hash?: string): Promise {
-
- let protocols = [
- {protocol: ProtocolType.Peer, value: relay},
- {protocol: ProtocolType.Client, value: peerId.toB58String()}
- ];
-
- if (withSig) {
- let str = addressToString({protocols: protocols}).replace(PROTOCOL, "");
- let signature = await peerId.privKey.sign(Buffer.from(str));
- let signatureStr = encode(signature);
-
- protocols.push({protocol: ProtocolType.Signature, value: signatureStr});
- }
-
- return {
- protocols: protocols,
- hash: hash
- }
-}
-
-export function createProviderAddress(service: string, hash?: string): Address {
-
- let protocol = {protocol: ProtocolType.Providers, value: service};
-
- return {
- protocols: [protocol],
- hash: hash
- }
-}
-
-export function createPeerAddress(peer: string, hash?: string): Address {
- let protocol = {protocol: ProtocolType.Peer, value: peer};
-
- return {
- protocols: [protocol],
- hash: hash
- }
-}
-
-export function parseAddress(str: string): Address {
- str = str.replace("fluence:", "");
-
- // delete leading slashes
- str = str.replace(/^\/+/, '');
-
- let mainAndHash = str.split("#");
-
- let parts = mainAndHash[0].split("/");
- if (parts.length < 1) {
- throw Error("address parts should not be empty")
- }
-
- let protocols: Protocol[] = [];
- let partsEntries: IterableIterator<[number, string]> = parts.entries();
-
- while (true) {
- let result = partsEntries.next();
- if (result.done) break;
- let protocol = parseProtocol(result.value[1], partsEntries);
- protocols.push(protocol);
- }
-
- let hashPart = mainAndHash.slice(1, mainAndHash.length).join();
- let hash = undefined;
- if (hashPart) {
- hash = hashPart;
- }
-
- return {
- protocols: protocols,
- hash: hash
- }
-}
diff --git a/src/blueprint.ts b/src/blueprint.ts
deleted file mode 100644
index ba5a5c42..00000000
--- a/src/blueprint.ts
+++ /dev/null
@@ -1,35 +0,0 @@
-/*
- * Copyright 2020 Fluence Labs Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-export interface Blueprint {
- dependencies: string[],
- id: string,
- name: string
-}
-
-export function checkBlueprint(b: any): b is Blueprint {
- if (!b.id) throw new Error(`There is no 'id' field in Blueprint struct: ${JSON.stringify(b)}`)
- if (b.dependencies) {
- b.dependencies.forEach((dep: any) => {
- if ((typeof dep) !== 'string') {
- throw Error(`'dependencies' should be an array of strings: ${JSON.stringify(b)}`)
- }
- });
- }
-
- return true;
-
-}
diff --git a/src/fluenceClient.ts b/src/fluenceClient.ts
index 87633a76..91d4fc4f 100644
--- a/src/fluenceClient.ts
+++ b/src/fluenceClient.ts
@@ -14,398 +14,54 @@
* limitations under the License.
*/
-import {
- Address,
- createPeerAddress,
- createRelayAddress,
- createProviderAddress,
- ProtocolType,
- addressToString, createRelayAddressWithSig
-} from "./address";
-import {callToString, FunctionCall, genUUID, makeFunctionCall,} from "./functionCall";
-import * as PeerId from "peer-id";
-import {LocalServices} from "./localServices";
-import Multiaddr from "multiaddr"
-import {Subscriptions} from "./subscriptions";
-import {FluenceConnection} from "./fluenceConnection";
-import {checkInterface, Interface} from "./Interface";
-import {Service} from "./service";
-import {Blueprint, checkBlueprint} from "./blueprint";
-import * as log from 'loglevel';
-/**
- * @param target receiver
- * @param args message in the call
- * @param moduleId module name
- * @param fname function name
- * @param name common field for debug purposes
- * @param msgId hash that will be added to replyTo address
- */
-interface Call {
- target: Address,
- args: any,
- moduleId?: string,
- fname?: string,
- msgId?: string,
- name?: string
-}
+import {Particle} from "./particle";
+import * as PeerId from "peer-id";
+import Multiaddr from "multiaddr"
+import {FluenceConnection} from "./fluenceConnection";
+import {Subscriptions} from "./subscriptions";
export class FluenceClient {
readonly selfPeerId: PeerId;
readonly selfPeerIdStr: string;
+
private nodePeerIdStr: string;
+ private subscriptions = new Subscriptions();
connection: FluenceConnection;
- private services: LocalServices = new LocalServices();
-
- private subscriptions: Subscriptions = new Subscriptions();
-
constructor(selfPeerId: PeerId) {
this.selfPeerId = selfPeerId;
this.selfPeerIdStr = selfPeerId.toB58String();
}
- /**
- * Makes call with response from function. Without reply_to field.
- */
- private responseCall(target: Address, args: any): FunctionCall {
- return makeFunctionCall(genUUID(), target, this.connection.sender, args, undefined, "response");
- }
-
/**
* Waits a response that match the predicate.
*
- * @param predicate will be applied to each incoming call until it matches
- * @param ignoreErrors ignore an errors, wait for success response
+ * @param id
+ * @param ttl
*/
- waitResponse(predicate: (args: any, target: Address, replyTo: Address, moduleId?: string, fname?: string) => (boolean | undefined), ignoreErrors: boolean): Promise {
+ waitResponse(id: string, ttl: number): Promise {
return new Promise((resolve, reject) => {
// subscribe for responses, to handle response
// TODO if there's no conn, reject
- this.subscribe((args: any, target: Address, replyTo: Address, moduleId?: string, fname?: string) => {
- if (predicate(args, target, replyTo, moduleId, fname)) {
- if (args.reason) {
- if (ignoreErrors) {
- return false;
- } else {
- reject(new Error(args.reason));
- }
- } else {
- resolve(args);
- }
- return true;
- }
- return false;
- });
- });
- }
-
- private getPredicate(msgId: string): (args: any, target: Address) => (boolean | undefined) {
- return (args: any, target: Address) => target.hash && target.hash === msgId;
- }
-
- /**
- * Send call and forget.
- *
- */
- async sendCall(call: Call) {
- if (this.connection && this.connection.isConnected()) {
- await this.connection.sendFunctionCall(call.target, call.args, call.moduleId, call.fname, call.msgId, call.name);
- } else {
- throw Error("client is not connected")
- }
- }
-
- /**
- * Send call to the provider and wait a response matches predicate.
- *
- * @param provider published name in dht
- * @param args message to the service
- * @param moduleId module name
- * @param fname function name
- * @param name debug info
- */
- async callProvider(provider: string, args: any, moduleId?: string, fname?: string, name?: string): Promise {
- let msgId = genUUID();
- let predicate = this.getPredicate(msgId);
- let address = createProviderAddress(provider);
- await this.sendCall({target: address, args: args, moduleId: moduleId, fname: fname, msgId: msgId, name: name});
- return await this.waitResponse(predicate, true);
- }
-
- /**
- * Send a message to a client that connected with a relay.
- *
- * @param relayId
- * @param clientId
- * @param sig
- * @param moduleId
- * @param args message to the service
- * @param fname function name
- * @param name debug info
- */
- async callClient(relayId: string, clientId: string, sig: string, moduleId: string, args: any, fname?: string, name?: string): Promise {
- let msgId = genUUID();
- let clientPeerId = await PeerId.createFromB58String(clientId);
- let address = await createRelayAddressWithSig(relayId, clientPeerId, sig);
-
- await this.sendCall({target: address, args: args, moduleId: moduleId, fname: fname, msgId: msgId, name: name})
- }
-
- /**
- * Send a call to the local service and wait a response matches predicate on a peer the client connected with.
- *
- * @param moduleId
- * @param addr node address
- * @param args message to the service
- * @param fname function name
- * @param name debug info
- */
- async callPeer(moduleId: string, args: any, fname?: string, addr?: string, name?: string): Promise {
- let msgId = genUUID();
- let predicate = this.getPredicate(msgId);
-
- let address;
- if (addr) {
- address = createPeerAddress(addr);
- } else {
- address = createPeerAddress(this.nodePeerIdStr);
- }
-
- await this.sendCall({target: address, args: args, moduleId: moduleId, fname: fname, msgId: msgId, name: name})
-
- return await this.waitResponse(predicate, false);
- }
-
- async callService(peerId: string, serviceId: string, moduleId: string, args: any, fname?: string): Promise {
- let target = createPeerAddress(peerId, serviceId);
- let msgId = genUUID();
- let predicate = this.getPredicate(msgId);
-
- await this.sendCall({target: target, args: args, moduleId: moduleId, fname: fname, msgId: msgId});
- return await this.waitResponse(predicate, false);
- }
-
- getService(peerId: string, serviceId: string): Service {
- return new Service(this, peerId, serviceId);
+ this.subscriptions.subscribe(id, (particle: Particle) => {
+ resolve(particle);
+ }, ttl);
+ })
}
/**
* Handle incoming call.
* If FunctionCall returns - we should send it as a response.
*/
- handleCall(): (call: FunctionCall) => FunctionCall | undefined {
+ handleParticle(): (particle: Particle) => void {
let _this = this;
- return (call: FunctionCall) => {
- log.debug("FunctionCall received:");
-
- // if other side return an error - handle it
- // TODO do it in the protocol
- /*if (call.arguments.error) {
- this.handleError(call);
- } else {
-
- }*/
-
- let target = call.target;
-
- // the tail of addresses should be you or your service
- let lastProtocol = target.protocols[target.protocols.length - 1];
-
+ return (particle: Particle) => {
// call all subscriptions for a new call
- _this.subscriptions.applyToSubscriptions(call);
-
- switch (lastProtocol.protocol) {
- case ProtocolType.Providers:
-
-
- return undefined;
- case ProtocolType.Client:
- if (lastProtocol.value === _this.selfPeerIdStr) {
- log.debug(`relay call:`);
- log.debug(JSON.stringify(call, undefined, 2));
- if (call.module) {
- try {
- // call of the service, service should handle response sending, error handling, requests to other services
- let applied = _this.services.applyToService(call);
-
- // if the request hasn't been applied, there is no such service. Return an error.
- if (!applied) {
- log.warn(`there is no service ${lastProtocol.value}`);
- return this.responseCall(call.reply_to, {
- reason: `there is no such service`,
- msg: call
- });
- }
- } catch (e) {
- // if service throw an error, return it to the sender
- return this.responseCall(call.reply_to, {
- reason: `error on execution: ${e}`,
- msg: call
- });
- }
- }
- } else {
- log.warn(`this relay call is not for me: ${callToString(call)}`);
- return this.responseCall(call.reply_to, {
- reason: `this relay call is not for me`,
- msg: call
- });
- }
- return undefined;
- case ProtocolType.Peer:
- if (lastProtocol.value === this.selfPeerIdStr) {
- log.debug(`peer call: ${call}`);
- } else {
- log.warn(`this peer call is not for me: ${callToString(call)}`);
- return this.responseCall(call.reply_to, {
- reason: `this relay call is not for me`,
- msg: call
- });
- }
- return undefined;
- }
- }
- }
-
-
- /**
- * Become a name provider. Other network members could find and call one of the providers of this name by this name.
- */
- async provideName(name: string, fn: (req: FunctionCall) => void) {
- let replyTo = this.connection.sender;
- await this.callPeer("provide", {name: name, address: addressToString(replyTo)})
-
- this.services.addService(name, fn);
- }
-
- /**
- * Sends a call to create a service on remote node.
- */
- async createService(blueprint: string, peerId?: string): Promise {
- let resp = await this.callPeer("create", {blueprint_id: blueprint}, undefined, peerId);
-
- if (resp && resp.service_id) {
- return resp.service_id
- } else {
- log.error("Unknown response type on `createService`: ", resp)
- throw new Error("Unknown response type on `createService`");
- }
- }
-
- async addBlueprint(name: string, dependencies: string[], peerId?: string): Promise {
-
- let id = genUUID();
- let blueprint = {
- name: name,
- id: id,
- dependencies: dependencies
- };
- let msg_id = genUUID();
-
- await this.callPeer("add_blueprint", {msg_id, blueprint}, undefined, peerId);
-
- return id;
- }
-
- async getInterface(serviceId: string, peerId?: string): Promise {
- let resp;
- resp = await this.callPeer("get_interface", {service_id: serviceId}, undefined, peerId)
- let i = resp.interface;
-
- if (checkInterface(i)) {
- return i;
- } else {
- throw new Error("Unexpected");
- }
- }
-
- async getAvailableBlueprints(peerId?: string): Promise {
- let resp = await this.callPeer("get_available_blueprints", {}, undefined, peerId);
-
- let blueprints = resp.available_blueprints;
-
- if (blueprints && blueprints instanceof Array) {
- return blueprints.map((b: any) => {
- if (checkBlueprint(b)) {
- return b;
- } else {
- throw new Error("Unexpected");
- }
- });
- } else {
- throw new Error("Unexpected. 'get_active_interfaces' should return an array of interfaces.");
- }
- }
-
- async getActiveInterfaces(peerId?: string): Promise {
- let resp = await this.callPeer("get_active_interfaces", {}, undefined, peerId);
-
- let interfaces = resp.active_interfaces;
- if (interfaces && interfaces instanceof Array) {
- return interfaces.map((i: any) => {
- if (checkInterface(i)) {
- return i;
- } else {
- throw new Error("Unexpected");
- }
- });
- } else {
- throw new Error("Unexpected. 'get_active_interfaces' should return an array pf interfaces.");
- }
- }
-
- async getAvailableModules(peerId?: string): Promise {
- let resp = await this.callPeer("get_available_modules", {}, undefined, peerId);
- return resp.available_modules;
- }
-
- /**
- * Add new WASM module to the node.
- *
- * @param bytes WASM in base64
- * @param name WASM identificator
- * @param mem_pages_count memory amount for WASM
- * @param envs environment variables
- * @param mapped_dirs links to directories
- * @param preopened_files files and directories that will be used in WASM
- * @param peerId the node to add module
- */
- async addModule(bytes: string, name: string, mem_pages_count: number, envs: string[], mapped_dirs: any, preopened_files: string[], peerId?: string): Promise {
- let config: any = {
- logger_enabled: true,
- mem_pages_count: mem_pages_count,
- name: name,
- wasi: {
- envs: envs,
- mapped_dirs: mapped_dirs,
- preopened_files: preopened_files
- }
- }
- let resp = await this.callPeer("add_module", {bytes: bytes, config: config}, undefined, peerId);
-
- return resp.available_modules;
- }
-
- // subscribe new hook for every incoming call, to handle in-service responses and other different cases
- // the hook will be deleted if it will return `true`
- subscribe(predicate: (args: any, target: Address, replyTo: Address, moduleId?: string, fname?: string) => (boolean | undefined)) {
- this.subscriptions.subscribe(predicate)
- }
-
-
- /**
- * Sends a call to unregister the service.
- */
- async unregisterService(moduleId: string) {
- if (this.services.deleteService(moduleId)) {
- log.warn("unregister is not implemented yet (service: ${serviceId}")
- // TODO unregister in fluence network when it will be supported
- // let regMsg = makeRegisterMessage(serviceId, PeerId.createFromB58String(this.nodePeerId));
- // await this.sendFunctionCall(regMsg);
+ _this.subscriptions.applyToSubscriptions(particle);
}
}
@@ -436,21 +92,15 @@ export class FluenceClient {
}
let peerId = PeerId.createFromB58String(nodePeerId);
- let sender = await createRelayAddress(nodePeerId, this.selfPeerId, false);
- let replyTo = await createRelayAddress(nodePeerId, this.selfPeerId, true);
- let connection = new FluenceConnection(multiaddr, peerId, this.selfPeerId, sender, replyTo, this.handleCall());
+ let connection = new FluenceConnection(multiaddr, peerId, this.selfPeerId, this.handleParticle());
await connection.connect();
this.connection = connection;
+ }
- // if the client already had a connection, it will reregister all services after establishing a new connection.
- if (!firstConnection) {
- for (let service of this.services.getAllServices().keys()) {
- await this.connection.provideName(service);
- }
-
- }
-
+ async sendParticle(particle: Particle): Promise {
+ await this.connection.sendParticle(particle);
+ return this.waitResponse(particle.id, particle.ttl);
}
}
diff --git a/src/fluenceConnection.ts b/src/fluenceConnection.ts
index f0ce9294..19914329 100644
--- a/src/fluenceConnection.ts
+++ b/src/fluenceConnection.ts
@@ -14,16 +14,6 @@
* limitations under the License.
*/
-import {Address, createPeerAddress} from "./address";
-import {
- callToString,
- FunctionCall,
- genUUID,
- makeFunctionCall,
- makeProvideMessage,
- parseFunctionCall
-} from "./functionCall";
-
import Websockets from "libp2p-websockets";
import Mplex from "libp2p-mplex";
import SECIO from "libp2p-secio";
@@ -33,6 +23,7 @@ import pipe from "it-pipe";
import Multiaddr from "multiaddr";
import PeerId from "peer-id";
import * as log from 'loglevel';
+import {parseParticle, Particle, stringifyParticle} from "./particle";
export const PROTOCOL_NAME = '/fluence/faas/1.0.0';
@@ -45,32 +36,19 @@ enum Status {
export class FluenceConnection {
private readonly selfPeerId: PeerId;
- readonly sender: Address;
- readonly replyTo: Address;
+ readonly relay: PeerId;
private node: LibP2p;
private readonly address: Multiaddr;
readonly nodePeerId: PeerId;
private readonly selfPeerIdStr: string;
- private readonly handleCall: (call: FunctionCall) => FunctionCall | undefined;
+ private readonly handleCall: (call: Particle) => void;
- constructor(multiaddr: Multiaddr, hostPeerId: PeerId, selfPeerId: PeerId, sender: Address, replyTo: Address, handleCall: (call: FunctionCall) => FunctionCall | undefined) {
+ constructor(multiaddr: Multiaddr, hostPeerId: PeerId, selfPeerId: PeerId, handleCall: (call: Particle) => void) {
this.selfPeerId = selfPeerId;
this.handleCall = handleCall;
this.selfPeerIdStr = selfPeerId.toB58String();
this.address = multiaddr;
this.nodePeerId = hostPeerId;
- this.sender = sender
- this.replyTo = replyTo
- }
-
- makeReplyTo(reply?: string): Address {
- if (reply) {
- let replyToWithHash = {...this.replyTo}
- if (typeof reply === "string") replyToWithHash.hash = reply;
- return replyToWithHash;
- } else {
- return this.sender;
- }
}
async connect() {
@@ -114,13 +92,8 @@ export class FluenceConnection {
for await (const msg of source) {
try {
log.debug(_this.selfPeerIdStr);
- let call = parseFunctionCall(msg);
- let response = _this.handleCall(call);
-
- // send a response if it exists, do nothing otherwise
- if (response) {
- await _this.sendCall(response);
- }
+ let particle = parseParticle(msg);
+ _this.handleCall(particle);
} catch(e) {
log.error("error on handling a new incoming message: " + e);
}
@@ -146,39 +119,20 @@ export class FluenceConnection {
this.status = Status.Disconnected;
}
- private async sendCall(call: FunctionCall) {
- let callStr = callToString(call);
- log.debug("send function call: " + JSON.stringify(JSON.parse(callStr), undefined, 2));
- log.debug(call);
+ async sendParticle(particle: Particle): Promise {
+ this.checkConnectedOrThrow();
+
+ let particleStr = stringifyParticle(particle);
+ log.debug("send function call: \n" + JSON.stringify(particle, undefined, 2));
// create outgoing substream
const conn = await this.node.dialProtocol(this.address, PROTOCOL_NAME) as {stream: Stream; protocol: string};
pipe(
- [callStr],
+ [particleStr],
// at first, make a message varint
encode(),
conn.stream.sink,
);
}
-
- /**
- * Send FunctionCall to the connected node.
- */
- async sendFunctionCall(target: Address, args: any, moduleId?: string, fname?: string, msgId?: string, name?: string) {
- this.checkConnectedOrThrow();
-
- let replyTo;
- if (msgId) replyTo = this.makeReplyTo(msgId);
-
- let call = makeFunctionCall(genUUID(), target, this.sender, args, moduleId, fname, replyTo, name);
-
- await this.sendCall(call);
- }
-
- async provideName(name: string) {
- let target = createPeerAddress(this.nodePeerId.toB58String())
- let regMsg = await makeProvideMessage(name, target, this.sender, this.replyTo);
- await this.sendCall(regMsg);
- }
}
diff --git a/src/functionCall.ts b/src/functionCall.ts
deleted file mode 100644
index e8f3d5cf..00000000
--- a/src/functionCall.ts
+++ /dev/null
@@ -1,106 +0,0 @@
-/*
- * Copyright 2020 Fluence Labs Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import {
- Address, addressToString, parseAddress
-} from "./address";
-import { v4 as uuidv4 } from 'uuid';
-import * as log from 'loglevel';
-
-export interface FunctionCall {
- uuid: string,
- target: Address,
- reply_to?: Address,
- sender: Address,
- "module"?: string,
- fname?: string,
- arguments: any,
- name?: string,
- action: "FunctionCall"
-}
-
-export function callToString(call: FunctionCall) {
- let obj: any = {...call};
-
- if (obj.reply_to) {
- obj.reply_to = addressToString(obj.reply_to);
- }
-
- obj.target = addressToString(obj.target);
- obj.sender = addressToString(obj.sender);
-
- return JSON.stringify(obj)
-}
-
-export function makeFunctionCall(uuid: string, target: Address, sender: Address, args: object, moduleId?: string, fname?: string, replyTo?: Address, name?: string): FunctionCall {
-
- return {
- uuid: uuid,
- target: target,
- reply_to: replyTo,
- sender: sender,
- "module": moduleId,
- fname: fname,
- arguments: args,
- name: name,
- action: "FunctionCall"
- }
-}
-
-export function parseFunctionCall(str: string): FunctionCall {
- let json = JSON.parse(str);
- log.debug(JSON.stringify(json, undefined, 2));
-
- let replyTo: Address;
- if (json.reply_to) replyTo = parseAddress(json.reply_to);
-
- if (!json.uuid) throw Error(`there is no 'uuid' field in json.\n${str}`);
- if (!json.target) throw Error(`there is no 'uuid' field in json.\n${str}`);
- if (!json.sender) throw Error(`there is no 'sender' field in json.\n${str}`);
-
- let target = parseAddress(json.target);
- let sender = parseAddress(json.sender);
-
- return {
- uuid: json.uuid,
- target: target,
- reply_to: replyTo,
- sender: sender,
- arguments: json.arguments,
- "module": json.module,
- fname: json.fname,
- name: json.name,
- action: "FunctionCall"
- }
-}
-
-export function genUUID() {
- return uuidv4();
-}
-
-/**
- * Message to provide new name.
- */
-export async function makeProvideMessage(name: string, target: Address, sender: Address, sigAddress: Address): Promise {
- return makeFunctionCall(genUUID(), target, sender, {name: name, address: addressToString(sigAddress)}, "provide", undefined, sender, "provide service_id");
-}
-
-// TODO uncomment when this will be implemented in Fluence network
-/*export function makeUnregisterMessage(serviceId: string, peerId: PeerId): FunctionCall {
- let target = createPeerAddress(peerId.toB58String());
-
- return makeFunctionCall(genUUID(), target, target, {key: serviceId}, undefined, "unregister");
-}*/
diff --git a/src/localServices.ts b/src/localServices.ts
deleted file mode 100644
index 65b1e3a0..00000000
--- a/src/localServices.ts
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Copyright 2020 Fluence Labs Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import {FunctionCall} from "./functionCall";
-
-export class LocalServices {
-
- private services: Map void> = new Map();
-
- constructor() {}
-
- addService(serviceId: string, callback: (req: FunctionCall) => void): void {
- this.services.set(serviceId, callback);
- }
-
- getAllServices(): Map void> {
- return this.services;
- }
-
- deleteService(serviceId: string): boolean {
- return this.services.delete(serviceId)
- }
-
- // could throw error from service callback
- // returns true if the call was applied
- applyToService(call: FunctionCall): boolean {
- let service = this.services.get(call.module);
- if (service) {
- service(call);
- return true;
- } else {
- return false;
- }
-
- }
-}
diff --git a/src/particle.ts b/src/particle.ts
new file mode 100644
index 00000000..0c4146d4
--- /dev/null
+++ b/src/particle.ts
@@ -0,0 +1,113 @@
+/*
+ * Copyright 2020 Fluence Labs Limited
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+import { v4 as uuidv4 } from 'uuid';
+import PeerId from "peer-id";
+import {encode} from "bs58";
+
+const DEFAULT_TTL = 7000;
+
+export interface Particle {
+ id: string,
+ init_peer_id: string,
+ timestamp: number,
+ ttl: number,
+ script: string,
+ // sign upper fields
+ signature: string,
+ data: object
+}
+
+export async function build(peerId: PeerId, script: string, data: object, ttl?: number): Promise {
+ let id = genUUID();
+ let currentTime = (new Date()).getTime();
+
+ ttl = ttl ?? DEFAULT_TTL;
+
+ let particle: Particle = {
+ id: id,
+ init_peer_id: peerId.toB58String(),
+ timestamp: currentTime,
+ ttl: ttl,
+ script: script,
+ signature: "",
+ data: data
+ }
+
+ particle.signature = await signParticle(peerId, particle);
+
+ return particle;
+}
+
+/**
+ * Copies a particle and stringify it.
+ */
+export function stringifyParticle(call: Particle): string {
+ let obj: any = {...call};
+ obj.action = "Particle"
+
+ // delete it after signatures will be implemented on nodes
+ obj.signature = []
+
+ return JSON.stringify(obj)
+}
+
+
+export function parseParticle(str: string): Particle {
+ let json = JSON.parse(str);
+
+ return {
+ id: json.id,
+ init_peer_id: json.init_peer_id,
+ timestamp: json.timestamp,
+ ttl: json.ttl,
+ script: json.script,
+ signature: json.signature,
+ data: json.data
+ }
+}
+
+export function canonicalBytes(particle: Particle) {
+ let peerIdBuf = Buffer.from(particle.init_peer_id, 'utf8');
+ let idBuf = Buffer.from(particle.id, 'utf8');
+
+ let tsArr = new ArrayBuffer(8);
+ new DataView(tsArr).setBigUint64(0, BigInt(particle.timestamp));
+ let tsBuf = Buffer.from(tsArr);
+
+ let ttlArr = new ArrayBuffer(4);
+ new DataView(ttlArr).setUint32(0, particle.ttl);
+ let ttlBuf = Buffer.from(ttlArr);
+
+ let scriptBuf = Buffer.from(particle.script, 'utf8');
+
+ return Buffer.concat([peerIdBuf, idBuf, tsBuf, ttlBuf, scriptBuf]);
+}
+
+/**
+ * Sign a particle with a private key from peerId.
+ */
+export async function signParticle(peerId: PeerId,
+ particle: Particle): Promise {
+ let bufToSign = canonicalBytes(particle);
+
+ let signature = await peerId.privKey.sign(bufToSign)
+ return encode(signature)
+}
+
+export function genUUID() {
+ return uuidv4();
+}
diff --git a/src/service.ts b/src/service.ts
deleted file mode 100644
index 7b8e6078..00000000
--- a/src/service.ts
+++ /dev/null
@@ -1,39 +0,0 @@
-/*
- * Copyright 2020 Fluence Labs Limited
- *
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-import {FluenceClient} from "./fluenceClient";
-
-export class Service {
- private readonly client: FluenceClient;
- private readonly serviceId: string;
- private readonly peerId: string;
-
- constructor(client: FluenceClient, peerId: string, serviceId: string) {
- this.client = client;
- this.serviceId = serviceId;
- this.peerId = peerId;
- }
-
- /**
- *
- * @param moduleId wich module in service to call
- * @param args parameters to call service
- * @param fname function name if existed
- */
- async call(moduleId: string, args: any, fname?: string): Promise {
- return this.client.callService(this.peerId, this.serviceId, moduleId, args, fname);
- }
-}
diff --git a/src/subscriptions.ts b/src/subscriptions.ts
index 9386303e..431ddac6 100644
--- a/src/subscriptions.ts
+++ b/src/subscriptions.ts
@@ -14,29 +14,45 @@
* limitations under the License.
*/
-import {FunctionCall} from "./functionCall";
-import {Address} from "./address";
+import {Particle} from "./particle";
export class Subscriptions {
- private subscriptions: ((args: any, target: Address, replyTo: Address, module?: string, fname?: string) => (boolean | undefined))[] = [];
+ private subscriptions: Map void> = new Map();
constructor() {}
/**
- * Subscriptions will be applied to all peer and relay messages.
- * If subscription returns true, delete subscription.
- * @param f
+ * Subscriptions will be applied by outside message if id will be the same.
+ *
+ * @param id message identificator
+ * @param f function to use with outside message
+ * @param ttl time to live, subscription will be deleted after this time
*/
- subscribe(f: (args: any, target: Address, replyTo: Address, moduleId?: string, fname?: string) => (boolean | undefined)) {
- this.subscriptions.push(f);
+ subscribe(id: string, f: (particle: Particle) => void, ttl: number) {
+ let _this = this;
+ setTimeout(() => {
+ _this.subscriptions.delete(id)
+ console.log(`Particle with id ${id} deleted by timeout`)
+ }, ttl)
+ this.subscriptions.set(id, f);
}
/**
- * Apply call to all subscriptions and delete subscriptions that return `true`.
- * @param call
+ * A particle will be applied if id of the particle was subscribed earlier.
+ * @param particle
*/
- applyToSubscriptions(call: FunctionCall) {
+ applyToSubscriptions(particle: Particle) {
// if subscription return true - delete it from subscriptions
- this.subscriptions = this.subscriptions.filter(callback => !callback(call.arguments, call.target, call.reply_to, call.module, call.fname))
+ let callback = this.subscriptions.get(particle.id)
+ if (callback) {
+ callback(particle);
+ } else {
+ if (Number(particle.timestamp) + particle.ttl > Date.now()) {
+ console.log("Old particle received. 'ttl' is ended.");
+ } else {
+ console.log("External particle received. 'Stepper' needed on client. Unimplemented.");
+ }
+ console.log(particle);
+ }
}
}
diff --git a/src/test/address.spec.ts b/src/test/address.spec.ts
deleted file mode 100644
index a68e0c0a..00000000
--- a/src/test/address.spec.ts
+++ /dev/null
@@ -1,298 +0,0 @@
-import {
- createPeerAddress,
- createRelayAddress,
- createProviderAddress,
- addressToString,
- parseAddress
-} from "../address";
-import {expect} from 'chai';
-
-import 'mocha';
-import {encode} from "bs58"
-import * as PeerId from "peer-id";
-import {callToString, genUUID, makeFunctionCall, parseFunctionCall} from "../functionCall";
-import Fluence from "../fluence";
-import {certificateFromString, certificateToString, issue} from "../trust/certificate";
-import {TrustGraph} from "../trust/trust_graph";
-import {nodeRootCert} from "../trust/misc";
-import {peerIdToSeed, seedToPeerId} from "../seed";
-import {greetingWASM} from "./greeting_wasm";
-
-describe("Typescript usage suite", () => {
-
- it("should throw an error, if protocol will be without value", () => {
- expect(() => parseAddress("/peer/")).to.throw(Error);
- });
-
- it("should be able to convert service_id address to and from string", () => {
- let addr = createProviderAddress("service_id-1");
- let str = addressToString(addr);
- let parsed = parseAddress(str);
-
- expect(parsed).to.deep.equal(addr)
- });
-
- it("should be able to convert peer address to and from string", () => {
- let pid = PeerId.createFromB58String("QmXduoWjhgMdx3rMZXR3fmkHKdUCeori9K1XkKpqeF5DrU");
- let addr = createPeerAddress(pid.toB58String());
- let str = addressToString(addr);
- let parsed = parseAddress(str);
-
- expect(parsed).to.deep.equal(addr)
- });
-
- it("should be able to convert relay address to and from string", async () => {
- let pid = await PeerId.create();
- let relayid = await PeerId.create();
- let addr = await createRelayAddress(relayid.toB58String(), pid, true);
- let str = addressToString(addr);
- let parsed = parseAddress(str);
-
- expect(parsed).to.deep.equal(addr)
- });
-
- it("should be able to convert function call to and from string", async () => {
- let pid = await PeerId.create();
- let relayid = await PeerId.create();
- let addr = await createRelayAddress(relayid.toB58String(), pid, true);
-
- let addr2 = createPeerAddress(pid.toB58String());
-
- let functionCall = makeFunctionCall(
- "123",
- addr2,
- addr2,
- {
- arg1: "123",
- arg2: 3,
- arg4: [1, 2, 3]
- },
- "mm",
- "fff",
- addr,
- "2444"
- );
-
- let str = callToString(functionCall);
-
- let parsed = parseFunctionCall(str);
-
- expect(parsed).to.deep.equal(functionCall);
-
- let functionCallWithOptional = makeFunctionCall(
- "123",
- addr,
- addr,
- {
- arg1: "123",
- arg2: 3,
- arg4: [1, 2, 3]
- }
- );
-
- let str2 = callToString(functionCallWithOptional);
-
- let parsed2 = parseFunctionCall(str2);
-
- expect(parsed2).to.deep.equal(functionCallWithOptional)
-
- });
-
- it("should create private key from seed and back", async function () {
- let seed = [46, 188, 245, 171, 145, 73, 40, 24, 52, 233, 215, 163, 54, 26, 31, 221, 159, 179, 126, 106, 27, 199, 189, 194, 80, 133, 235, 42, 42, 247, 80, 201];
- let seedStr = encode(seed)
- console.log("SEED STR: " + seedStr)
- let pid = await seedToPeerId(seedStr)
- expect(peerIdToSeed(pid)).to.be.equal(seedStr)
- })
-
- it("should serialize and deserialize certificate correctly", async function () {
- let cert = `11
-1111
-5566Dn4ZXXbBK5LJdUsE7L3pG9qdAzdPY47adjzkhEx9
-3HNXpW2cLdqXzf4jz5EhsGEBFkWzuVdBCyxzJUZu2WPVU7kpzPjatcqvdJMjTtcycVAdaV5qh2fCGphSmw8UMBkr
-158981172690500
-1589974723504
-2EvoZAZaGjKWFVdr36F1jphQ5cW7eK3yM16mqEHwQyr7
-4UAJQWzB3nTchBtwARHAhsn7wjdYtqUHojps9xV6JkuLENV8KRiWM3BhQByx5KijumkaNjr7MhHjouLawmiN1A4d
-1590061123504
-1589974723504`
-
- let deser = await certificateFromString(cert);
- let ser = certificateToString(deser);
-
- expect(ser).to.be.equal(cert);
- });
-
- // delete `.skip` and run `npm run test` to check service's and certificate's api with Fluence nodes
- it.skip("test provide", async function () {
- this.timeout(15000);
- await testProvide();
- });
-
- // delete `.skip` and run `npm run test` to check service's and certificate's api with Fluence nodes
- it.skip("test certs", async function () {
- this.timeout(15000);
- await testCerts();
- });
-
- // delete `.skip` and run `npm run test` to check service's and certificate's api with Fluence nodes
- it.skip("test upload wasm", async function () {
- this.timeout(15000);
- await testUploadWasm();
- });
-
- // delete `.skip` and run `npm run test` to check service's and certificate's api with Fluence nodes
- it.skip("test list of services and interfaces", async function () {
- this.timeout(15000);
- await testServicesAndInterfaces();
- });
-});
-
-const delay = (ms: number) => new Promise(res => setTimeout(res, ms));
-
-export async function testCerts() {
- let key1 = await Fluence.generatePeerId();
- let key2 = await Fluence.generatePeerId();
-
- // connect to two different nodes
- let cl1 = await Fluence.connect("/dns4/134.209.186.43/tcp/9003/ws/p2p/12D3KooWBUJifCTgaxAUrcM9JysqCcS4CS8tiYH5hExbdWCAoNwb", key1);
- let cl2 = await Fluence.connect("/ip4/134.209.186.43/tcp/9002/ws/p2p/12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er", key2);
-
- let trustGraph1 = new TrustGraph(cl1);
- let trustGraph2 = new TrustGraph(cl2);
-
- let issuedAt = new Date();
- let expiresAt = new Date();
- // certificate expires after one day
- expiresAt.setDate(new Date().getDate() + 1);
-
- // create root certificate for key1 and extend it with key2
- let rootCert = await nodeRootCert(key1);
- let extended = await issue(key1, key2, rootCert, expiresAt.getTime(), issuedAt.getTime());
-
- // publish certificates to Fluence network
- await trustGraph1.publishCertificates(key2.toB58String(), [extended]);
-
- // get certificates from network
- let certs = await trustGraph2.getCertificates(key2.toB58String());
-
- // root certificate could be different because nodes save trusts with bigger `expiresAt` date and less `issuedAt` date
- expect(certs[0].chain[1].issuedFor.toB58String()).to.be.equal(extended.chain[1].issuedFor.toB58String())
- expect(certs[0].chain[1].signature).to.be.equal(extended.chain[1].signature)
- expect(certs[0].chain[1].expiresAt).to.be.equal(extended.chain[1].expiresAt)
- expect(certs[0].chain[1].issuedAt).to.be.equal(extended.chain[1].issuedAt)
-
- await cl1.disconnect();
- await cl2.disconnect();
-}
-
-export async function testUploadWasm() {
- let key1 = await Fluence.generatePeerId();
- let cl1 = await Fluence.connect("/dns4/134.209.186.43/tcp/9100/ws/p2p/12D3KooWPnLxnY71JDxvB3zbjKu9k1BCYNthGZw6iGrLYsR1RnWM", key1);
-
- let moduleName = genUUID()
- await cl1.addModule(greetingWASM, moduleName, 100, [], {}, []);
-
- let availableModules = await cl1.getAvailableModules();
- console.log(availableModules);
-
- let peerId1 = "12D3KooWPnLxnY71JDxvB3zbjKu9k1BCYNthGZw6iGrLYsR1RnWM"
-
- let blueprintId = await cl1.addBlueprint("some test blueprint", [moduleName], peerId1)
- let blueprints = await cl1.getAvailableBlueprints(peerId1)
- console.log(blueprints);
-
- let serviceId = await cl1.createService(peerId1, blueprintId);
-
- let service = cl1.getService(peerId1, serviceId);
-
- let argName = genUUID();
- let resp = await service.call(moduleName, {name: argName}, "greeting")
-
- expect(resp.result).to.be.equal(`Hi, ${argName}`)
-
- await cl1.disconnect();
-}
-
-export async function testServicesAndInterfaces() {
- let key1 = await Fluence.generatePeerId();
- let key2 = await Fluence.generatePeerId();
-
- // connect to two different nodes
- let cl1 = await Fluence.connect("/dns4/134.209.186.43/tcp/9100/ws/p2p/12D3KooWPnLxnY71JDxvB3zbjKu9k1BCYNthGZw6iGrLYsR1RnWM", key1);
- let cl2 = await Fluence.connect("/ip4/134.209.186.43/tcp/9002/ws/p2p/12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er", key2);
-
- let peerId1 = "12D3KooWPnLxnY71JDxvB3zbjKu9k1BCYNthGZw6iGrLYsR1RnWM"
-
- let blueprintId = await cl1.addBlueprint("some test blueprint", ["ipfs_node"], peerId1)
- let serviceId = await cl2.createService(peerId1, blueprintId);
-
- let resp = await cl2.callService(peerId1, serviceId, "ipfs_node", {}, "get_address")
- console.log(resp)
-
- let interfaces = await cl1.getActiveInterfaces();
- let interfaceResp = await cl1.getInterface(serviceId, peerId1);
-
- console.log(interfaces);
- console.log(interfaceResp);
-
- let availableModules = await cl1.getAvailableModules(peerId1);
- console.log(availableModules);
-
- await cl1.disconnect();
- await cl2.disconnect();
-}
-
-// Shows how to register and call new service in Fluence network
-export async function testProvide() {
-
- let key1 = await Fluence.generatePeerId();
- let key2 = await Fluence.generatePeerId();
-
- // connect to two different nodes
- let cl1 = await Fluence.connect("/dns4/134.209.186.43/tcp/9003/ws/p2p/12D3KooWBUJifCTgaxAUrcM9JysqCcS4CS8tiYH5hExbdWCAoNwb", key1);
- let cl2 = await Fluence.connect("/ip4/134.209.186.43/tcp/9002/ws/p2p/12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er", key2);
-
- // service name that we will register with one connection and call with another
- let providerId = "sum-calculator-" + genUUID();
-
- // register service that will add two numbers and send a response with calculation result
- await cl1.provideName(providerId, async (req) => {
- console.log("message received");
- console.log(req);
-
- console.log("send response");
-
- let message = {msgId: req.arguments.msgId, result: req.arguments.one + req.arguments.two};
-
-
- await cl1.sendCall({target: req.reply_to, args: message});
- });
-
- let req = {one: 12, two: 23};
-
- // send call to `sum-calculator` service with two numbers
- let response = await cl2.callProvider(providerId, req, providerId);
-
- let result = response.result;
- expect(result).to.be.equal(35)
-
- await cl1.connect("/dns4/relay02.fluence.dev/tcp/19001/wss/p2p/12D3KooWEXNUbCXooUwHrHBbrmjsrpHXoEphPwbjQXEGyzbqKnE9");
-
- await delay(1000);
-
- // send call to `sum-calculator` service with two numbers
- await cl2.callProvider(providerId, req, providerId, undefined, "calculator request");
-
- let response2 = await cl2.callProvider(providerId, req, providerId);
-
- let result2 = await response2.result;
- console.log("RESULT:");
- console.log(response2);
- expect(result2).to.be.equal(35);
-
- await cl1.disconnect();
- await cl2.disconnect();
-}
-
diff --git a/src/test/client.spec.ts b/src/test/client.spec.ts
new file mode 100644
index 00000000..f3b4379b
--- /dev/null
+++ b/src/test/client.spec.ts
@@ -0,0 +1,97 @@
+
+import {expect} from 'chai';
+
+import 'mocha';
+import {encode} from "bs58"
+import Fluence from "../fluence";
+import {certificateFromString, certificateToString, issue} from "../trust/certificate";
+import {TrustGraph} from "../trust/trust_graph";
+import {nodeRootCert} from "../trust/misc";
+import {peerIdToSeed, seedToPeerId} from "../seed";
+import {build, Particle} from "../particle";
+
+describe("Typescript usage suite", () => {
+
+ it("should create private key from seed and back", async function () {
+ let seed = [46, 188, 245, 171, 145, 73, 40, 24, 52, 233, 215, 163, 54, 26, 31, 221, 159, 179, 126, 106, 27, 199, 189, 194, 80, 133, 235, 42, 42, 247, 80, 201];
+ let seedStr = encode(seed)
+ console.log("SEED STR: " + seedStr)
+ let pid = await seedToPeerId(seedStr)
+ expect(peerIdToSeed(pid)).to.be.equal(seedStr)
+ })
+
+ it("should serialize and deserialize certificate correctly", async function () {
+ let cert = `11
+1111
+5566Dn4ZXXbBK5LJdUsE7L3pG9qdAzdPY47adjzkhEx9
+3HNXpW2cLdqXzf4jz5EhsGEBFkWzuVdBCyxzJUZu2WPVU7kpzPjatcqvdJMjTtcycVAdaV5qh2fCGphSmw8UMBkr
+158981172690500
+1589974723504
+2EvoZAZaGjKWFVdr36F1jphQ5cW7eK3yM16mqEHwQyr7
+4UAJQWzB3nTchBtwARHAhsn7wjdYtqUHojps9xV6JkuLENV8KRiWM3BhQByx5KijumkaNjr7MhHjouLawmiN1A4d
+1590061123504
+1589974723504`
+
+ let deser = await certificateFromString(cert);
+ let ser = certificateToString(deser);
+
+ expect(ser).to.be.equal(cert);
+ });
+
+ it("test new client", async function () {
+ let key1 = await Fluence.generatePeerId();
+ let key2 = await Fluence.generatePeerId();
+
+ // connect to two different nodes
+ let cl1 = await Fluence.connect("/dns4/134.209.186.43/tcp/9003/ws/p2p/12D3KooWBUJifCTgaxAUrcM9JysqCcS4CS8tiYH5hExbdWCAoNwb", key1);
+
+ let particle = await build(key1, "123", {a: 777, b: "567"})
+
+ let result = await cl1.sendParticle(particle)
+ console.log(result)
+ });
+
+ // delete `.skip` and run `npm run test` to check service's and certificate's api with Fluence nodes
+ it.skip("test certs", async function () {
+ this.timeout(15000);
+ await testCerts();
+ });
+});
+
+const delay = (ms: number) => new Promise(res => setTimeout(res, ms));
+
+export async function testCerts() {
+ let key1 = await Fluence.generatePeerId();
+ let key2 = await Fluence.generatePeerId();
+
+ // connect to two different nodes
+ let cl1 = await Fluence.connect("/dns4/134.209.186.43/tcp/9003/ws/p2p/12D3KooWBUJifCTgaxAUrcM9JysqCcS4CS8tiYH5hExbdWCAoNwb", key1);
+ let cl2 = await Fluence.connect("/ip4/134.209.186.43/tcp/9002/ws/p2p/12D3KooWHk9BjDQBUqnavciRPhAYFvqKBe4ZiPPvde7vDaqgn5er", key2);
+
+ let trustGraph1 = new TrustGraph(cl1);
+ let trustGraph2 = new TrustGraph(cl2);
+
+ let issuedAt = new Date();
+ let expiresAt = new Date();
+ // certificate expires after one day
+ expiresAt.setDate(new Date().getDate() + 1);
+
+ // create root certificate for key1 and extend it with key2
+ let rootCert = await nodeRootCert(key1);
+ let extended = await issue(key1, key2, rootCert, expiresAt.getTime(), issuedAt.getTime());
+
+ // publish certificates to Fluence network
+ await trustGraph1.publishCertificates(key2.toB58String(), [extended]);
+
+ // get certificates from network
+ let certs = await trustGraph2.getCertificates(key2.toB58String());
+
+ // root certificate could be different because nodes save trusts with bigger `expiresAt` date and less `issuedAt` date
+ expect(certs[0].chain[1].issuedFor.toB58String()).to.be.equal(extended.chain[1].issuedFor.toB58String())
+ expect(certs[0].chain[1].signature).to.be.equal(extended.chain[1].signature)
+ expect(certs[0].chain[1].expiresAt).to.be.equal(extended.chain[1].expiresAt)
+ expect(certs[0].chain[1].issuedAt).to.be.equal(extended.chain[1].issuedAt)
+
+ await cl1.disconnect();
+ await cl2.disconnect();
+}
diff --git a/src/trust/trust_graph.ts b/src/trust/trust_graph.ts
index 458bb36d..2f7224e1 100644
--- a/src/trust/trust_graph.ts
+++ b/src/trust/trust_graph.ts
@@ -18,6 +18,7 @@ import {FluenceClient} from "../fluenceClient";
import {Certificate, certificateFromString, certificateToString} from "./certificate";
import * as log from 'loglevel';
+// TODO update after 'aquamarine' implemented
// The client to interact with the Fluence trust graph API
export class TrustGraph {
@@ -34,10 +35,11 @@ export class TrustGraph {
certsStr.push(await certificateToString(cert));
}
- let response = await this.client.callPeer("add_certificates", {
+ /*let response = await this.client.callPeer("add_certificates", {
certificates: certsStr,
peer_id: peerId
- });
+ });*/
+ let response: any = {};
if (response.reason) {
throw Error(response.reason)
@@ -50,9 +52,10 @@ export class TrustGraph {
// Get certificates that stores in Kademlia neighbourhood by `peerId` key.
async getCertificates(peerId: string): Promise {
- let resp = await this.client.callPeer("certificates", {
+ let resp: any = {};
+ /*let resp = await this.client.callPeer("certificates", {
peer_id: peerId
- });
+ });*/
let certificatesRaw = resp.certificates