fluence-js/src/fluence_connection.ts

198 lines
6.3 KiB
TypeScript
Raw Normal View History

/*
 * Copyright 2020 Fluence Labs Limited
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
import {Address} from "./address";
import {
callToString,
FunctionCall,
genUUID,
makeCall,
makeFunctionCall,
makePeerCall,
makeRegisterMessage,
makeRelayCall,
parseFunctionCall
} from "./function_call";
import * as PeerId from "peer-id";
import * as PeerInfo from "peer-info";
import Websockets from "libp2p-websockets";
import Mplex from "libp2p-mplex";
import SECIO from "libp2p-secio";
import Peer from "libp2p";
import {decode, encode} from "it-length-prefixed";
import pipe from "it-pipe";
import Multiaddr from "multiaddr";
/**
 * libp2p protocol identifier used both when registering the inbound stream
 * handler and when opening outbound streams to the node.
 */
export const PROTOCOL_NAME = "/fluence/faas/1.0.0";
/**
 * Lifecycle state of a connection. Each member's runtime value is its own
 * name, so the state reads naturally when interpolated into messages.
 */
enum Status {
    Initializing = "Initializing",
    Connected = "Connected",
    Disconnected = "Disconnected",
}
/**
 * A libp2p-based connection from this client to a single node.
 *
 * The connection starts in `Initializing`, becomes `Connected` after a
 * successful `connect()`, and becomes `Disconnected` after `disconnect()`.
 * Once disconnected, the same instance cannot be connected again.
 */
export class FluenceConnection {
    private readonly selfPeerInfo: PeerInfo;
    /** Address used as the sender (and, where requested, reply-to) of outgoing calls. */
    readonly sender: Address;
    private node: LibP2p;
    private readonly address: Multiaddr;
    private readonly nodePeerId: PeerId;
    private readonly selfPeerId: string;
    private readonly handleCall: (call: FunctionCall) => FunctionCall | undefined;
    // connection status. If `Disconnected`, it cannot be reconnected
    private status: Status = Status.Initializing;

    /**
     * @param multiaddr address of the node that will be dialed
     * @param hostPeerId peer id of the node, used when building the register message
     * @param selfPeerInfo peer info of the local peer; the libp2p node is created from it
     * @param replyToAddress address stored as `sender` and put into outgoing calls
     * @param handleCall callback invoked for every incoming call; a returned call is sent back as a response
     */
    constructor(multiaddr: Multiaddr, hostPeerId: PeerId, selfPeerInfo: PeerInfo, replyToAddress: Address, handleCall: (call: FunctionCall) => FunctionCall | undefined) {
        this.selfPeerInfo = selfPeerInfo;
        this.handleCall = handleCall;
        this.selfPeerId = selfPeerInfo.id.toB58String();
        this.address = multiaddr;
        this.nodePeerId = hostPeerId;
        this.sender = replyToAddress;
    }

    /**
     * Creates the local libp2p node, dials the remote node and starts handling incoming calls.
     *
     * @throws Error if the connection is not in the `Initializing` state
     */
    async connect(): Promise<void> {
        // Check the state before creating anything: otherwise a repeated call would
        // overwrite the already running node with a fresh, unstarted one and only then fail.
        if (this.status !== Status.Initializing) {
            throw Error(`can't start receiving. Status: ${this.status}`);
        }

        this.node = await Peer.create({
            peerInfo: this.selfPeerInfo,
            config: {},
            modules: {
                transport: [Websockets],
                streamMuxer: [Mplex],
                connEncryption: [SECIO],
                peerDiscovery: []
            },
        });

        await this.startReceiving();
    }

    /**
     * @returns `true` only while the connection is in the `Connected` state
     */
    isConnected(): boolean {
        return this.status === Status.Connected;
    }

    /**
     * Sends remote service_id call.
     *
     * @throws Error if the connection is not in the `Connected` state
     */
    async sendServiceCall(serviceId: string, args: any, name?: string): Promise<void> {
        this.checkConnectedOrThrow();
        const regMsg = makeCall(serviceId, args, this.sender, this.sender, name);
        await this.sendCall(regMsg);
    }

    /**
     * Sends custom message to the peer.
     *
     * @param peer base58-encoded peer id of the target peer
     * @throws Error if the connection is not in the `Connected` state
     */
    async sendPeerCall(peer: string, msg: any, name?: string): Promise<void> {
        this.checkConnectedOrThrow();
        const regMsg = makePeerCall(PeerId.createFromB58String(peer), msg, this.sender, this.sender, name);
        await this.sendCall(regMsg);
    }

    /**
     * Sends custom message to the peer through relay.
     *
     * @param peer base58-encoded peer id of the target peer
     * @param relay base58-encoded peer id of the relay
     * @throws Error if the connection is not in the `Connected` state
     */
    async sendRelayCall(peer: string, relay: string, msg: any, name?: string): Promise<void> {
        this.checkConnectedOrThrow();
        const regMsg = await makeRelayCall(PeerId.createFromB58String(peer), PeerId.createFromB58String(relay), msg, this.sender, this.sender, name);
        await this.sendCall(regMsg);
    }

    /**
     * Starts the local node, registers the inbound protocol handler, dials the
     * remote node and marks the connection as `Connected`.
     *
     * @throws Error if the connection is not in the `Initializing` state
     */
    private async startReceiving(): Promise<void> {
        if (this.status !== Status.Initializing) {
            throw Error(`can't start receiving. Status: ${this.status}`);
        }

        await this.node.start();

        // Register the handler before dialing so that the protocol is already
        // served when the connection to the node is established.
        this.node.handle([PROTOCOL_NAME], async ({stream}) => {
            try {
                await pipe(
                    stream.source,
                    decode(),
                    (source: AsyncIterable<string>) => this.handleIncoming(source)
                );
            } catch (e) {
                // A broken inbound stream must not surface as an unhandled rejection.
                console.log("error on reading an incoming stream: " + e);
            }
        });

        console.log("dialing to the node with address: " + this.address.toString());
        await this.node.dial(this.address);

        this.status = Status.Connected;
    }

    /**
     * Parses every message of an inbound stream, passes it to `handleCall` and
     * sends back the response when the callback returns one. A failure on one
     * message is logged and does not stop processing of the following messages.
     */
    private async handleIncoming(source: AsyncIterable<string>): Promise<void> {
        for await (const msg of source) {
            try {
                console.log(this.selfPeerId);
                const call = parseFunctionCall(msg);
                const response = this.handleCall(call);

                // send a response if it exists, do nothing otherwise
                if (response) {
                    await this.sendCall(response);
                }
            } catch (e) {
                console.log("error on handling a new incoming message: " + e);
            }
        }
    }

    /**
     * @throws Error if the connection is not in the `Connected` state
     */
    private checkConnectedOrThrow(): void {
        if (this.status !== Status.Connected) {
            throw Error(`connection is in ${this.status} state`);
        }
    }

    /**
     * Stops the local libp2p node (if one was created) and marks the connection as `Disconnected`.
     */
    async disconnect(): Promise<void> {
        // `node` is only assigned in `connect()`; there is nothing to stop before that.
        if (this.node) {
            await this.node.stop();
        }
        this.status = Status.Disconnected;
    }

    /**
     * Serializes the call and writes it, length-prefixed, into a new outgoing
     * stream to the node. Resolves once the stream sink has consumed the message.
     */
    private async sendCall(call: FunctionCall): Promise<void> {
        const callStr = callToString(call);
        console.log("send function call: " + JSON.stringify(JSON.parse(callStr), undefined, 2));
        console.log(call);

        // create outgoing substream
        const conn = await this.node.dialProtocol(this.address, PROTOCOL_NAME) as {stream: Stream; protocol: string};

        // Await the pipeline so that write failures reject this call instead of being lost.
        await pipe(
            [callStr],
            // at first, make a message varint
            encode(),
            conn.stream.sink,
        );
    }

    /**
     * Send FunctionCall to the connected node.
     *
     * @param target address the call is sent to
     * @param args arguments of the call
     * @param reply when truthy, `sender` is also passed as the reply-to address
     * @param name optional name passed through to the created call
     * @throws Error if the connection is not in the `Connected` state
     */
    async sendFunctionCall(target: Address, args: any, reply?: boolean, name?: string): Promise<void> {
        this.checkConnectedOrThrow();

        const replyTo = reply ? this.sender : undefined;
        const call = makeFunctionCall(genUUID(), target, args, this.sender, replyTo, name);

        await this.sendCall(call);
    }

    /**
     * Builds a register message for `serviceId` and sends it to the node.
     *
     * @throws Error if the connection is not in the `Connected` state
     */
    async registerService(serviceId: string): Promise<void> {
        this.checkConnectedOrThrow();
        const regMsg = await makeRegisterMessage(serviceId, this.nodePeerId, this.selfPeerInfo.id);
        await this.sendCall(regMsg);
    }
}