/*
 * Copyright 2020 Fluence Labs Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
|
|
|
|
|
2020-07-27 16:39:54 +03:00
|
|
|
import {Address, createPeerAddress} from "./address";
|
2020-05-14 15:20:39 +03:00
|
|
|
import {
|
|
|
|
callToString,
|
|
|
|
FunctionCall,
|
|
|
|
genUUID,
|
|
|
|
makeFunctionCall,
|
2020-07-27 16:39:54 +03:00
|
|
|
makeProvideMessage,
|
2020-05-14 15:20:39 +03:00
|
|
|
parseFunctionCall
|
2020-08-20 20:28:32 +03:00
|
|
|
} from "./functionCall";
|
2020-08-26 18:48:17 +03:00
|
|
|
|
2020-05-14 15:20:39 +03:00
|
|
|
import Websockets from "libp2p-websockets";
|
|
|
|
import Mplex from "libp2p-mplex";
|
|
|
|
import SECIO from "libp2p-secio";
|
|
|
|
import Peer from "libp2p";
|
|
|
|
import {decode, encode} from "it-length-prefixed";
|
|
|
|
import pipe from "it-pipe";
|
|
|
|
import Multiaddr from "multiaddr";
|
2020-08-26 18:48:17 +03:00
|
|
|
import PeerId from "peer-id";
|
2020-05-14 15:20:39 +03:00
|
|
|
|
|
|
|
// Protocol identifier used both when registering the incoming stream handler
// and when opening outgoing substreams to the node.
export const PROTOCOL_NAME = '/fluence/faas/1.0.0';
|
|
|
|
|
|
|
|
/**
 * Lifecycle states of a connection.
 */
enum Status {
    /** Initial state of a freshly constructed connection. */
    Initializing = "Initializing",
    /** The node has been dialed and the incoming handler is registered. */
    Connected = "Connected",
    /** The connection has been stopped. */
    Disconnected = "Disconnected",
}
|
|
|
|
|
|
|
|
/**
 * A libp2p connection from the local peer to a single node.
 *
 * The connection is single-use: it starts in `Initializing`, becomes
 * `Connected` after a successful `connect()`, and ends in `Disconnected`
 * after `disconnect()`. It cannot be reconnected afterwards.
 *
 * Incoming messages on `PROTOCOL_NAME` are parsed into function calls and
 * passed to the `handleCall` callback; if the callback returns a call, it is
 * sent back to the node.
 */
export class FluenceConnection {

    private readonly selfPeerId: PeerId;
    readonly sender: Address;
    // Created in `connect()`; undefined until then.
    private node: LibP2p;
    private readonly address: Multiaddr;
    readonly nodePeerId: PeerId;
    private readonly selfPeerIdStr: string;
    private readonly handleCall: (call: FunctionCall) => FunctionCall | undefined;

    // connection status. If `Disconnected`, it cannot be reconnected
    private status: Status = Status.Initializing;

    /**
     * @param multiaddr  address of the node to dial
     * @param hostPeerId peer id of the node this connection talks to
     * @param selfPeerId peer id of the local peer
     * @param sender     address used as the sender of outgoing calls
     * @param handleCall callback invoked for every incoming call; a returned
     *                   call is sent back to the node, `undefined` sends nothing
     */
    constructor(multiaddr: Multiaddr, hostPeerId: PeerId, selfPeerId: PeerId, sender: Address, handleCall: (call: FunctionCall) => FunctionCall | undefined) {
        this.selfPeerId = selfPeerId;
        this.handleCall = handleCall;
        this.selfPeerIdStr = selfPeerId.toB58String();
        this.address = multiaddr;
        this.nodePeerId = hostPeerId;
        this.sender = sender;
    }

    /**
     * Builds the reply-to address for an outgoing call.
     *
     * @param reply optional hash to attach to the sender address
     * @returns a copy of `sender` with `hash` set to `reply`, or `sender`
     *          itself when `reply` is empty or omitted
     */
    makeReplyTo(reply?: string): Address {
        if (!reply) {
            return this.sender;
        }
        // Copy instead of mutating: `sender` is shared by every outgoing call.
        return {...this.sender, hash: reply};
    }

    /**
     * Creates the local libp2p node, dials the remote node and starts
     * handling incoming messages.
     *
     * @throws Error if the connection is not in the `Initializing` state
     */
    async connect(): Promise<void> {
        // Check before creating a node: otherwise a repeated `connect()` would
        // overwrite (and leak) the live node before the status check fails.
        if (this.status !== Status.Initializing) {
            throw Error(`can't start receiving. Status: ${this.status}`);
        }

        this.node = await Peer.create({
            peerId: this.selfPeerId,
            config: {},
            modules: {
                transport: [Websockets],
                streamMuxer: [Mplex],
                connEncryption: [SECIO],
                peerDiscovery: []
            },
        });

        await this.startReceiving();
    }

    /**
     * @returns `true` only while the connection is in the `Connected` state
     */
    isConnected(): boolean {
        return this.status === Status.Connected;
    }

    /**
     * Starts the local node, registers the incoming message handler and dials
     * the remote node. Marks the connection as `Connected` on success.
     *
     * @throws Error if the connection is not in the `Initializing` state
     */
    private async startReceiving(): Promise<void> {
        if (this.status !== Status.Initializing) {
            throw Error(`can't start receiving. Status: ${this.status}`);
        }

        await this.node.start();

        // Register the handler before dialing so that a stream opened by the
        // remote node right after the connection is established is not rejected.
        this.node.handle([PROTOCOL_NAME], async ({stream}) => {
            try {
                await pipe(
                    stream.source,
                    decode(),
                    async (source: AsyncIterable<string>) => {
                        for await (const msg of source) {
                            await this.processIncoming(msg);
                        }
                    }
                );
            } catch (e) {
                // Stream/decoding failures would otherwise be unhandled rejections.
                console.log(`error on reading an incoming stream: ${e}`);
            }
        });

        console.log("dialing to the node with address: " + this.address.toString());

        await this.node.dial(this.address);

        this.status = Status.Connected;
    }

    /**
     * Parses one incoming message, passes it to `handleCall` and sends the
     * response back if there is one. Errors are logged, never rethrown, so a
     * single bad message does not stop the processing of the stream.
     */
    private async processIncoming(msg: string): Promise<void> {
        try {
            console.log(this.selfPeerIdStr);
            const call = parseFunctionCall(msg);
            const response = this.handleCall(call);

            // send a response if it exists, do nothing otherwise
            if (response) {
                await this.sendCall(response);
            }
        } catch (e) {
            console.log(`error on handling a new incoming message: ${e}`);
        }
    }

    /**
     * @throws Error if the connection is not in the `Connected` state
     */
    private checkConnectedOrThrow(): void {
        if (this.status !== Status.Connected) {
            throw Error(`connection is in ${this.status} state`);
        }
    }

    /**
     * Stops the local node (if it was ever created) and marks the connection
     * as `Disconnected`.
     */
    async disconnect(): Promise<void> {
        // `node` exists only after `connect()`; disconnecting earlier is a no-op stop.
        if (this.node) {
            await this.node.stop();
        }
        this.status = Status.Disconnected;
    }

    /**
     * Serializes the call and writes it, length-prefixed, to a new outgoing
     * substream. Resolves once the message has been written; rejects if
     * opening the substream or writing to it fails.
     */
    private async sendCall(call: FunctionCall): Promise<void> {
        const callStr = callToString(call);
        console.log("send function call: " + JSON.stringify(JSON.parse(callStr), undefined, 2));
        console.log(call);

        // create outgoing substream
        const conn = await this.node.dialProtocol(this.address, PROTOCOL_NAME) as {stream: Stream; protocol: string};

        // Awaited so that write failures reach the caller instead of becoming
        // unhandled promise rejections.
        await pipe(
            [callStr],
            // at first, make a message varint
            encode(),
            conn.stream.sink,
        );
    }

    /**
     * Send FunctionCall to the connected node.
     *
     * @param target   address the call is sent to
     * @param args     arguments of the call
     * @param moduleId optional module id passed to `makeFunctionCall`
     * @param fname    optional function name passed to `makeFunctionCall`
     * @param msgId    optional hash; when set, a reply-to address with this
     *                 hash is attached to the call
     * @param name     optional name passed to `makeFunctionCall`
     * @throws Error if the connection is not in the `Connected` state
     */
    async sendFunctionCall(target: Address, args: any, moduleId?: string, fname?: string, msgId?: string, name?: string): Promise<void> {
        this.checkConnectedOrThrow();

        const replyTo = msgId ? this.makeReplyTo(msgId) : undefined;
        const call = makeFunctionCall(genUUID(), target, this.sender, args, moduleId, fname, replyTo, name);

        await this.sendCall(call);
    }

    /**
     * Sends a provide message for `name` to the connected node.
     *
     * @param name name to provide
     * @throws Error if the connection is not in the `Connected` state
     */
    async provideName(name: string): Promise<void> {
        // Same precondition as `sendFunctionCall`.
        this.checkConnectedOrThrow();

        const target = createPeerAddress(this.nodePeerId.toB58String());
        const regMsg = await makeProvideMessage(name, target, this.sender);

        await this.sendCall(regMsg);
    }
}
|