From afacef513807773a5ea63bb0ec609459aa5409b5 Mon Sep 17 00:00:00 2001 From: Dima Date: Fri, 26 Jun 2020 16:12:37 +0300 Subject: [PATCH] Protocol updates (#914) --- package.json | 2 +- src/address.ts | 6 ++-- src/fluence_client.ts | 73 ++++++++++++++++++++++++++++----------- src/fluence_connection.ts | 18 +++++++--- src/function_call.ts | 24 +++++++------ src/services.ts | 4 +-- src/test/address.spec.ts | 2 ++ src/trust/trust_graph.ts | 4 +-- 8 files changed, 89 insertions(+), 44 deletions(-) diff --git a/package.json b/package.json index 8b58955a..ce6c6aa1 100644 --- a/package.json +++ b/package.json @@ -1,6 +1,6 @@ { "name": "fluence", - "version": "0.5.6", + "version": "0.6.1", "description": "the browser js-libp2p client for the Fluence network", "main": "./dist/fluence.js", "typings": "./dist/fluence.d.ts", diff --git a/src/address.ts b/src/address.ts index 1461baad..3ca85340 100644 --- a/src/address.ts +++ b/src/address.ts @@ -27,7 +27,7 @@ export interface Protocol { } export enum ProtocolType { - Service = "service", + Providers = "providers", Peer = "peer", Signature = "signature", Client = "client" @@ -64,7 +64,7 @@ export function parseProtocol(protocol: string, protocolIterator: IterableIterat protocol = protocol.toLocaleLowerCase(); switch (protocol) { - case ProtocolType.Service: + case ProtocolType.Providers: return protocolWithValue(protocol, protocolIterator); case ProtocolType.Client: return protocolWithValue(protocol, protocolIterator); @@ -100,7 +100,7 @@ export async function createRelayAddress(relay: string, peerId: PeerId, withSig: export function createServiceAddress(service: string): Address { - let protocol = {protocol: ProtocolType.Service, value: service}; + let protocol = {protocol: ProtocolType.Providers, value: service}; return { protocols: [protocol] diff --git a/src/fluence_client.ts b/src/fluence_client.ts index f504c839..07f33cd6 100644 --- a/src/fluence_client.ts +++ b/src/fluence_client.ts @@ -101,7 +101,22 @@ export class FluenceClient { */ async sendServiceCall(serviceId: string, args: any, name?: string) { if (this.connection && this.connection.isConnected()) { - await this.connection.sendServiceCall(serviceId, args, name); + await this.connection.sendServiceCall(serviceId, false, args, name); + } else { + throw Error("client is not connected") + } + } + + /** + * Send a call to the local service on a peer the client connected with. + * + * @param serviceId + * @param args message to the service + * @param name common field for debug purposes + */ + async sendServiceLocalCall(serviceId: string, args: any, name?: string) { + if (this.connection && this.connection.isConnected()) { + await this.connection.sendServiceCall(serviceId, true, args, name); } else { throw Error("client is not connected") } @@ -119,6 +134,18 @@ export class FluenceClient { return await this.waitResponse(predicate); } + /** + * Send a call to the local service and wait a response matches predicate on a peer the client connected with. + * + * @param serviceId + * @param args message to the service + * @param predicate will be applied to each incoming call until it matches + */ + async sendServiceLocalCallWaitResponse(serviceId: string, args: any, predicate: (args: any, target: Address, replyTo: Address) => (boolean | undefined)): Promise { + await this.sendServiceLocalCall(serviceId, args); + return await this.waitResponse(predicate); + } + /** * Handle incoming call. * If FunctionCall returns - we should send it as a response. @@ -147,31 +174,35 @@ export class FluenceClient { _this.subscriptions.applyToSubscriptions(call); switch (lastProtocol.protocol) { - case ProtocolType.Service: - try { - // call of the service, service should handle response sending, error handling, requests to other services - let applied = _this.services.applyToService(lastProtocol.value, call); + case ProtocolType.Providers: - // if the request hasn't been applied, there is no such service. Return an error. - if (!applied) { - console.log(`there is no service ${lastProtocol.value}`); - return this.responseCall(call.reply_to, { - reason: `there is no such service`, - msg: call - }); - } - } catch (e) { - // if service throw an error, return it to the sender - return this.responseCall(call.reply_to, { - reason: `error on execution: ${e}`, - msg: call - }); - } return undefined; case ProtocolType.Client: if (lastProtocol.value === _this.selfPeerIdStr) { - console.log(`relay call: ${call}`); + console.log(`relay call:`); + console.log(JSON.stringify(call, undefined, 2)); + if (call.module) { + try { + // call of the service, service should handle response sending, error handling, requests to other services + let applied = _this.services.applyToService(call); + + // if the request hasn't been applied, there is no such service. Return an error. + if (!applied) { + console.log(`there is no service ${lastProtocol.value}`); + return this.responseCall(call.reply_to, { + reason: `there is no such service`, + msg: call + }); + } + } catch (e) { + // if service throw an error, return it to the sender + return this.responseCall(call.reply_to, { + reason: `error on execution: ${e}`, + msg: call + }); + } + } } else { console.warn(`this relay call is not for me: ${callToString(call)}`); return this.responseCall(call.reply_to, { diff --git a/src/fluence_connection.ts b/src/fluence_connection.ts index 81047308..f511cfed 100644 --- a/src/fluence_connection.ts +++ b/src/fluence_connection.ts @@ -14,7 +14,7 @@ * limitations under the License. */ -import {Address} from "./address"; +import {Address, createPeerAddress, createServiceAddress} from "./address"; import { callToString, FunctionCall, @@ -89,8 +89,15 @@ export class FluenceConnection { /** * Sends remote service_id call. */ - async sendServiceCall(serviceId: string, args: any, name?: string) { - let regMsg = makeCall(serviceId, args, this.sender, this.sender, name); + async sendServiceCall(serviceId: string, isLocal: boolean, args: any, name?: string) { + let target; + if (isLocal) { + target = createPeerAddress(this.nodePeerId.toB58String()); + } else { + target = createServiceAddress(serviceId); + } + + let regMsg = makeCall(serviceId, target, args, this.sender, this.sender, name); await this.sendCall(regMsg); } @@ -186,13 +193,14 @@ export class FluenceConnection { let replyTo; if (reply) replyTo = this.sender; - let call = makeFunctionCall(genUUID(), target, this.sender, args, replyTo, name); + let call = makeFunctionCall(genUUID(), target, this.sender, args, undefined, undefined, replyTo, name); await this.sendCall(call); } async registerService(serviceId: string) { - let regMsg = await makeRegisterMessage(serviceId, this.sender); + let target = createPeerAddress(this.nodePeerId.toB58String()) + let regMsg = await makeRegisterMessage(serviceId, target, this.sender); await this.sendCall(regMsg); } } diff --git a/src/function_call.ts b/src/function_call.ts index 81b49b8e..4839df00 100644 --- a/src/function_call.ts +++ b/src/function_call.ts @@ -27,6 +27,8 @@ export interface FunctionCall { target: Address, reply_to?: Address, sender: Address, + "module"?: string, + fname?: string, arguments: any, name?: string, action: "FunctionCall" @@ -45,13 +47,15 @@ export function callToString(call: FunctionCall) { return JSON.stringify(obj) } -export function makeFunctionCall(uuid: string, target: Address, sender: Address, args: object, replyTo?: Address, name?: string): FunctionCall { +export function makeFunctionCall(uuid: string, target: Address, sender: Address, args: object, moduleF?: string, fname?: string, replyTo?: Address, name?: string): FunctionCall { return { uuid: uuid, target: target, reply_to: replyTo, sender: sender, + "module": moduleF, + fname: fname, arguments: args, name: name, action: "FunctionCall" @@ -78,6 +82,8 @@ export function parseFunctionCall(str: string): FunctionCall { reply_to: replyTo, sender: sender, arguments: json.arguments, + "module": json.module, + fname: json.fname, name: json.name, action: "FunctionCall" } @@ -94,7 +100,7 @@ export function genUUID() { export async function makeRelayCall(client: PeerId, relay: PeerId, msg: any, sender: Address, replyTo?: Address, name?: string): Promise { let relayAddress = await createRelayAddress(relay.toB58String(), client, false); - return makeFunctionCall(genUUID(), relayAddress, sender, msg, replyTo, name); + return makeFunctionCall(genUUID(), relayAddress, sender, msg, undefined, undefined, replyTo, name); } /** @@ -103,25 +109,23 @@ export async function makeRelayCall(client: PeerId, relay: PeerId, msg: any, sen export function makePeerCall(client: PeerId, msg: any, sender: Address, replyTo?: Address, name?: string): FunctionCall { let peerAddress = createPeerAddress(client.toB58String()); - return makeFunctionCall(genUUID(), peerAddress, sender, msg, replyTo, name); + return makeFunctionCall(genUUID(), peerAddress, sender, msg, undefined, undefined, replyTo, name); } /** * Message to call remote service_id */ -export function makeCall(functionId: string, args: any, sender: Address, replyTo?: Address, name?: string): FunctionCall { - let target = createServiceAddress(functionId); +export function makeCall(functionId: string, target: Address, args: any, sender: Address, replyTo?: Address, name?: string): FunctionCall { - return makeFunctionCall(genUUID(), target, sender, args, replyTo, name); + + return makeFunctionCall(genUUID(), target, sender, args, functionId, undefined, replyTo, name); } /** * Message to register new service_id. */ -export async function makeRegisterMessage(serviceId: string, sender: Address): Promise { - let target = createServiceAddress("provide"); - - return makeFunctionCall(genUUID(), target, sender, {service_id: serviceId}, sender, "provide service_id"); +export async function makeRegisterMessage(serviceId: string, target: Address, sender: Address): Promise { + return makeFunctionCall(genUUID(), target, sender, {service_id: serviceId}, "provide", undefined, sender, "provide service_id"); } // TODO uncomment when this will be implemented in Fluence network diff --git a/src/services.ts b/src/services.ts index c4e54e0d..828721ab 100644 --- a/src/services.ts +++ b/src/services.ts @@ -36,8 +36,8 @@ export class Services { // could throw error from service callback // returns true if the call was applied - applyToService(serviceId: string, call: FunctionCall): boolean { - let service = this.services.get(serviceId); + applyToService(call: FunctionCall): boolean { + let service = this.services.get(call.module); if (service) { service(call); return true; diff --git a/src/test/address.spec.ts b/src/test/address.spec.ts index 3f48ad73..dc9bf95e 100644 --- a/src/test/address.spec.ts +++ b/src/test/address.spec.ts @@ -66,6 +66,8 @@ describe("Typescript usage suite", () => { arg2: 3, arg4: [1, 2, 3] }, + "mm", + "fff", addr, "2444" ); diff --git a/src/trust/trust_graph.ts b/src/trust/trust_graph.ts index 12a0355c..1cbd8840 100644 --- a/src/trust/trust_graph.ts +++ b/src/trust/trust_graph.ts @@ -36,7 +36,7 @@ export class TrustGraph { let msgId = genUUID() - let response = await this.client.sendServiceCallWaitResponse("add_certificates", { + let response = await this.client.sendServiceLocalCallWaitResponse("add_certificates", { certificates: certsStr, msg_id: msgId, peer_id: peerId @@ -64,7 +64,7 @@ export class TrustGraph { // Get certificates that stores in Kademlia neighbourhood by `peerId` key. async getCertificates(peerId: string): Promise { let msgId = genUUID(); - let resp = await this.client.sendServiceCallWaitResponse("certificates", { + let resp = await this.client.sendServiceLocalCallWaitResponse("certificates", { msg_id: msgId, peer_id: peerId }, (args) => args.msg_id && args.msg_id === msgId)