From f5425b4746f436f84a41bae6584adb8b200ba33d Mon Sep 17 00:00:00 2001
From: Akim <59872966+akim-bow@users.noreply.github.com>
Date: Fri, 29 Dec 2023 03:33:28 +0700
Subject: [PATCH] fix: Enable async loading of all dependency resources (#408)

* Async loading

* Fix lint
---
 .../core/js-client-isomorphic/src/types.ts    |  5 ++-
 .../src/worker-resolvers/browser.ts           |  9 +++-
 .../src/worker-resolvers/node.ts              | 10 +++--
 .../core/js-client/src/jsPeer/FluencePeer.ts  |  3 +-
 packages/core/js-client/src/marine/loader.ts  | 11 +++--
 .../core/js-client/src/marine/worker/index.ts | 43 ++-----------------
 6 files changed, 27 insertions(+), 54 deletions(-)

diff --git a/packages/core/js-client-isomorphic/src/types.ts b/packages/core/js-client-isomorphic/src/types.ts
index 6e81bf27..01b84814 100644
--- a/packages/core/js-client-isomorphic/src/types.ts
+++ b/packages/core/js-client-isomorphic/src/types.ts
@@ -14,7 +14,8 @@
  * limitations under the License.
  */
 
-import { Worker } from "@fluencelabs/threads/master";
+import type { MarineBackgroundInterface } from "@fluencelabs/marine-worker";
+import { ModuleThread } from "@fluencelabs/threads/master";
 
 import versions from "./versions.js";
 
@@ -23,7 +24,7 @@ type VersionedPackage = { name: string; version: string };
 export type GetWorkerFn = (
   pkg: FetchedPackages,
   CDNUrl: string,
-) => Promise<Worker>;
+) => Promise<ModuleThread<MarineBackgroundInterface>>;
 
 export const getVersionedPackage = (pkg: FetchedPackages): VersionedPackage => {
   return {
diff --git a/packages/core/js-client-isomorphic/src/worker-resolvers/browser.ts b/packages/core/js-client-isomorphic/src/worker-resolvers/browser.ts
index 7e2b1e71..8eddff98 100644
--- a/packages/core/js-client-isomorphic/src/worker-resolvers/browser.ts
+++ b/packages/core/js-client-isomorphic/src/worker-resolvers/browser.ts
@@ -14,7 +14,8 @@
  * limitations under the License.
  */
 
-import { BlobWorker } from "@fluencelabs/threads/master";
+import type { MarineBackgroundInterface } from "@fluencelabs/marine-worker";
+import { BlobWorker, ModuleThread, spawn } from "@fluencelabs/threads/master";
 
 import { fetchResource } from "../fetchers/browser.js";
 import type { FetchedPackages, GetWorkerFn } from "../types.js";
@@ -34,5 +35,9 @@ export const getWorker: GetWorkerFn = async (
   };
 
   const workerCode = await fetchWorkerCode();
-  return BlobWorker.fromText(workerCode);
+
+  const workerThread: ModuleThread<MarineBackgroundInterface> =
+    await spawn<MarineBackgroundInterface>(BlobWorker.fromText(workerCode));
+
+  return workerThread;
 };
diff --git a/packages/core/js-client-isomorphic/src/worker-resolvers/node.ts b/packages/core/js-client-isomorphic/src/worker-resolvers/node.ts
index 44c49d70..09602a26 100644
--- a/packages/core/js-client-isomorphic/src/worker-resolvers/node.ts
+++ b/packages/core/js-client-isomorphic/src/worker-resolvers/node.ts
@@ -18,12 +18,13 @@ import { createRequire } from "module";
 import { dirname, relative } from "path";
 import { fileURLToPath } from "url";
 
-import { Worker } from "@fluencelabs/threads/master";
+import type { MarineBackgroundInterface } from "@fluencelabs/marine-worker";
+import { ModuleThread, spawn, Worker } from "@fluencelabs/threads/master";
 
 import type { FetchedPackages, GetWorkerFn } from "../types.js";
 import { getVersionedPackage } from "../types.js";
 
-export const getWorker: GetWorkerFn = (pkg: FetchedPackages) => {
+export const getWorker: GetWorkerFn = async (pkg: FetchedPackages) => {
   const require = createRequire(import.meta.url);
 
   const pathToThisFile = dirname(fileURLToPath(import.meta.url));
@@ -33,5 +34,8 @@ export const getWorker: GetWorkerFn = (pkg: FetchedPackages) => {
 
   const relativePathToWorker = relative(pathToThisFile, pathToWorker);
 
-  return Promise.resolve(new Worker(relativePathToWorker));
+  const workerThread: ModuleThread<MarineBackgroundInterface> =
+    await spawn<MarineBackgroundInterface>(new Worker(relativePathToWorker));
+
+  return workerThread;
 };
diff --git a/packages/core/js-client/src/jsPeer/FluencePeer.ts b/packages/core/js-client/src/jsPeer/FluencePeer.ts
index 926788b4..a6de998e 100644
--- a/packages/core/js-client/src/jsPeer/FluencePeer.ts
+++ b/packages/core/js-client/src/jsPeer/FluencePeer.ts
@@ -356,9 +356,8 @@ export abstract class FluencePeer {
       await this.connection.sendParticle(item.result.nextPeerPks, newParticle);
       log_particle.trace("id %s. send successful", newParticle.id);
     } catch (e) {
-      log_particle.error("id %s. send failed %j", newParticle.id, e);
-
       const message = getErrorMessage(e);
+      log_particle.error("id %s. send failed %s", newParticle.id, message);
 
       item.onError(
         new SendError(
diff --git a/packages/core/js-client/src/marine/loader.ts b/packages/core/js-client/src/marine/loader.ts
index 1c64042f..678f3400 100644
--- a/packages/core/js-client/src/marine/loader.ts
+++ b/packages/core/js-client/src/marine/loader.ts
@@ -16,18 +16,19 @@
 
 import { fetchResource } from "@fluencelabs/js-client-isomorphic/fetcher";
 import { getWorker } from "@fluencelabs/js-client-isomorphic/worker-resolver";
-import { Worker } from "@fluencelabs/threads/master";
+import type { MarineBackgroundInterface } from "@fluencelabs/marine-worker";
+import type { ModuleThread } from "@fluencelabs/threads/master";
 
 type StrategyReturnType = [
   marineJsWasm: ArrayBuffer,
   avmWasm: ArrayBuffer,
-  worker: Worker,
+  worker: ModuleThread<MarineBackgroundInterface>,
 ];
 
 export const loadMarineDeps = async (
   CDNUrl: string,
 ): Promise<StrategyReturnType> => {
-  const [marineJsWasm, avmWasm] = await Promise.all([
+  const [marineJsWasm, avmWasm, worker] = await Promise.all([
     fetchResource(
       "@fluencelabs/marine-js",
       "/dist/marine-js.wasm",
@@ -38,10 +39,8 @@ export const loadMarineDeps = async (
     fetchResource("@fluencelabs/avm", "/dist/avm.wasm", CDNUrl).then((res) => {
       return res.arrayBuffer();
     }),
+    getWorker("@fluencelabs/marine-worker", CDNUrl),
   ]);
 
-  // TODO: load worker in parallel with avm and marine, test that it works
-  const worker = await getWorker("@fluencelabs/marine-worker", CDNUrl);
-
   return [marineJsWasm, avmWasm, worker];
 };
diff --git a/packages/core/js-client/src/marine/worker/index.ts b/packages/core/js-client/src/marine/worker/index.ts
index 232b3cb9..66c665cc 100644
--- a/packages/core/js-client/src/marine/worker/index.ts
+++ b/packages/core/js-client/src/marine/worker/index.ts
@@ -21,51 +21,29 @@ import type {
   JSONValueNonNullable,
   CallParameters,
 } from "@fluencelabs/marine-worker";
-import {
-  ModuleThread,
-  Thread,
-  spawn,
-  Worker,
-} from "@fluencelabs/threads/master";
+import { ModuleThread, Thread } from "@fluencelabs/threads/master";
 
 import { MarineLogger, marineLogger } from "../../util/logger.js";
 import { IMarineHost } from "../interfaces.js";
 
 export class MarineBackgroundRunner implements IMarineHost {
-  private workerThread?: ModuleThread<MarineBackgroundInterface>;
-
   private loggers = new Map<string, MarineLogger>();
 
   constructor(
     private marineJsWasm: ArrayBuffer,
     private avmWasm: ArrayBuffer,
-    private worker: Worker,
+    private workerThread: ModuleThread<MarineBackgroundInterface>,
   ) {}
 
   async hasService(serviceId: string) {
-    if (this.workerThread === undefined) {
-      throw new Error("Worker is not initialized");
-    }
-
     return this.workerThread.hasService(serviceId);
   }
 
   async removeService(serviceId: string) {
-    if (this.workerThread === undefined) {
-      throw new Error("Worker is not initialized");
-    }
-
     await this.workerThread.removeService(serviceId);
   }
 
   async start(): Promise<void> {
-    if (this.workerThread !== undefined) {
-      throw new Error("Worker thread already initialized");
-    }
-
-    const workerThread: ModuleThread<MarineBackgroundInterface> =
-      await spawn<MarineBackgroundInterface>(this.worker);
-
     const logfn: LogFunction = (message) => {
       const serviceLogger = this.loggers.get(message.service);
 
@@ -76,9 +54,8 @@ export class MarineBackgroundRunner implements IMarineHost {
       serviceLogger[message.level](message.message);
     };
 
-    workerThread.onLogMessage().subscribe(logfn);
-    await workerThread.init(this.marineJsWasm);
-    this.workerThread = workerThread;
+    this.workerThread.onLogMessage().subscribe(logfn);
+    await this.workerThread.init(this.marineJsWasm);
     await this.createService(this.avmWasm, "avm");
   }
 
@@ -86,10 +63,6 @@ export class MarineBackgroundRunner implements IMarineHost {
     serviceModule: ArrayBuffer | SharedArrayBuffer,
     serviceId: string,
   ): Promise<void> {
-    if (this.workerThread === undefined) {
-      throw new Error("Worker is not initialized");
-    }
-
     this.loggers.set(serviceId, marineLogger(serviceId));
     await this.workerThread.createService(serviceModule, serviceId);
   }
@@ -100,10 +73,6 @@ export class MarineBackgroundRunner implements IMarineHost {
     args: Array<JSONValueNonNullable> | Record<string, JSONValueNonNullable>,
     callParams?: CallParameters,
   ): Promise<JSONValue> {
-    if (this.workerThread === undefined) {
-      throw new Error("Worker is not initialized");
-    }
-
     return this.workerThread.callService(
       serviceId,
       functionName,
@@ -113,10 +82,6 @@ export class MarineBackgroundRunner implements IMarineHost {
   }
 
   async stop(): Promise<void> {
-    if (this.workerThread === undefined) {
-      return;
-    }
-
     await this.workerThread.terminate();
     await Thread.terminate(this.workerThread);
   }