From 98fb09d3baeecea83218e47b6dd9f71e2b57fb7b Mon Sep 17 00:00:00 2001
From: OJ Kwon <kwon.ohjoong@gmail.com>
Date: Mon, 2 Dec 2019 21:32:22 -0800
Subject: [PATCH] feat(threadpool): implement worker_threads threadpool

---
 package-lock.json                 |  19 ++-
 package.json                      |   6 +-
 src/ThreadPool.js                 | 241 ++++++++++++++++++++++++++++++
 src/index.js                      |   3 +
 src/messagePortTransferHandler.js |  26 ++++
 src/workerEntryPoint.js           | 122 +++++++++++++++
 6 files changed, 407 insertions(+), 10 deletions(-)
 create mode 100644 src/ThreadPool.js
 create mode 100644 src/messagePortTransferHandler.js
 create mode 100644 src/workerEntryPoint.js

diff --git a/package-lock.json b/package-lock.json
index c69ab55..0eca2d0 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -2222,6 +2222,11 @@
         "delayed-stream": "~1.0.0"
       }
     },
+    "comlink": {
+      "version": "4.1.0",
+      "resolved": "https://registry.npmjs.org/comlink/-/comlink-4.1.0.tgz",
+      "integrity": "sha512-YU5KVol4zfJsCyl5F4ea2+EfIntsUaHdudxcC58H59gF7j6retT4C/EAY/zjReQKSFAOxjMYW2OeoL39EDsjBw=="
+    },
     "commander": {
       "version": "2.11.0",
       "resolved": "https://registry.npmjs.org/commander/-/commander-2.11.0.tgz",
@@ -8328,7 +8333,7 @@
         },
         "semver": {
           "version": "5.3.0",
-          "resolved": "http://registry.npmjs.org/semver/-/semver-5.3.0.tgz",
+          "resolved": "https://registry.npmjs.org/semver/-/semver-5.3.0.tgz",
           "integrity": "sha1-myzl094C0XxgEq0yaqa00M9U+U8=",
           "dev": true
         },
@@ -10330,10 +10335,9 @@
       }
     },
     "rxjs": {
-      "version": "6.3.3",
-      "resolved": "https://registry.npmjs.org/rxjs/-/rxjs-6.3.3.tgz",
-      "integrity": "sha512-JTWmoY9tWCs7zvIk/CvRjhjGaOd+OVBM987mxFo+OW66cGpdKjZcpmc74ES1sB//7Kl/PAe8+wEakuhG4pcgOw==",
-      "dev": true,
+      "version": "6.5.3",
+      "resolved": "https://registry.npmjs.org/rxjs/-/rxjs-6.5.3.tgz",
+      "integrity": "sha512-wuYsAYYFdWTAnAaPoKGNhfpWwKZbJW+HgAJ+mImp+Epl7BG8oNWBCTyRM8gba9k4lk8BgWdoYm21Mo/RYhhbgA==",
       "requires": {
         "tslib": "^1.9.0"
       }
@@ -10785,7 +10789,7 @@
       "dependencies": {
         "source-map": {
           "version": "0.4.4",
-          "resolved": "http://registry.npmjs.org/source-map/-/source-map-0.4.4.tgz",
+          "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.4.4.tgz",
           "integrity": "sha1-66T12pwNyZneaAMti092FzZSA2s=",
           "dev": true,
           "requires": {
@@ -12100,8 +12104,7 @@
     "tslib": {
       "version": "1.9.3",
       "resolved": "https://registry.npmjs.org/tslib/-/tslib-1.9.3.tgz",
-      "integrity": "sha512-4krF8scpejhaOgqzBEcGM7yDIEfi0/8+8zDRZhNZZ2kjmHJ4hv3zCbQWxoJGz1iw5U0Jl0nma13xzHXcncMavQ==",
-      "dev": true
+      "integrity": "sha512-4krF8scpejhaOgqzBEcGM7yDIEfi0/8+8zDRZhNZZ2kjmHJ4hv3zCbQWxoJGz1iw5U0Jl0nma13xzHXcncMavQ=="
     },
     "tty-browserify": {
       "version": "0.0.0",
diff --git a/package.json b/package.json
index 920fbd3..8509bc9 100644
--- a/package.json
+++ b/package.json
@@ -28,9 +28,11 @@
     "webpack-defaults": "webpack-defaults"
   },
   "dependencies": {
-    "neo-async": "^2.6.0",
+    "comlink": "^4.1.0",
     "loader-runner": "^2.3.1",
-    "loader-utils": "^1.1.0"
+    "loader-utils": "^1.1.0",
+    "neo-async": "^2.6.0",
+    "rxjs": "^6.5.3"
   },
   "devDependencies": {
     "babel-cli": "^6.26.0",
diff --git a/src/ThreadPool.js b/src/ThreadPool.js
new file mode 100644
index 0000000..001e39d
--- /dev/null
+++ b/src/ThreadPool.js
@@ -0,0 +1,241 @@
+import * as path from 'path';
+import { Worker } from 'worker_threads';
+import { proxy, releaseProxy, wrap } from 'comlink';
+import { Subject, asapScheduler, zip } from 'rxjs';
+import { filter, map, mergeMap, observeOn, takeUntil } from 'rxjs/operators';
+
+const nodeEndpoint = require('comlink/dist/umd/node-adapter');
+
+/**
+ * Create a worker object wrapping a native worker thread behind comlink proxy interfaces.
+ */
+const createWorker = () => {
+  const worker = new Worker(path.resolve(__dirname, './workerEntryPoint.js')});
+  const workerProxy = wrap(nodeEndpoint(worker));
+  worker.unref();
+
+  let disposed = false;
+
+  return {
+    disposed,
+    workerProxy,
+    close: () => {
+      disposed = true;
+      return new Promise((resolve) => {
+        worker.once('exit', () => {
+          workerProxy[releaseProxy]();
+          resolve();
+        });
+
+        workerProxy.close();
+      });
+    },
+  };
+};
+
+/**
+ * Create a comlink proxy-wrapped transferable object from the given worker data context.
+ *
+ * Each loader's data context (Webpack::loader::LoaderContext) carries
+ * various functions. This marshalling splits the object into a POJO part
+ * and a function part, wrapping every function in a comlink proxy to
+ * avoid any attempt to clone it.
+ *
+ * The non-proxied (POJO) part records the keys of all proxied functions,
+ * since iterating and reading keys on a proxy object is tricky.
+ *
+ * Note `workerEntryPoint` has additional handling for some edge cases as well.
+ */
+const marshallWorkerDataContext = context =>
+  Object.entries(context).reduce(
+    (acc, [key, value]) => {
+      if (typeof value === 'function') {
+        acc[1][key] = proxy(value);
+        acc[0].proxyFnKeys.push(key);
+      } else {
+        acc[0][key] = value;
+      }
+
+      return acc;
+    },
+    [{ proxyFnKeys: [] }, {}],
+  );
+
+export default class ThreadPool {
+  constructor(maxWorkers) {
+    this.maxWorkers = maxWorkers;
+    this.taskCount = 0;
+    this.timeoutId = null;
+
+    this.taskQueue = new Subject();
+    this.workerQueue = new Subject();
+    this.disposeAwaiter = new Subject();
+
+    this.workerPool = [...new Array(maxWorkers)].map(() => createWorker());
+
+    this.poolSubscription = this.startPoolScheduler();
+  }
+
+  /**
+   * Ask all threads to exit once queued task completes.
+   */
+  async closeWorkers() {
+    let worker = this.workerPool.shift();
+    while (worker) {
+      if (!worker.disposed) {
+        await worker.close();
+      }
+      worker = this.workerPool.shift();
+    }
+  }
+
+  /**
+   * Try to exit existing workers when there's no task scheduled within timeout (2sec)
+   * If there is any running task when timeout reaches extend timeout to next timeout tick.
+   */
+  async scheduleTimeout() {
+    if (this.timeoutId) {
+      clearTimeout(this.timeoutId);
+      this.timeoutId = null;
+    }
+
+    this.timeoutId = setTimeout(async () => {
+      if (this.taskCount === 0) {
+        await this.closeWorkers();
+      } else {
+        this.scheduleTimeout();
+      }
+    }, 2000);
+  }
+
+  /**
+  * Run task via worker, raises timeoutError if worker does not respond in timeout period (10sec).
+  * Most cases this happens when task is scheduled into disposed worker which released complink proxy already.
+  */
+  tryRunTaskWithTimeout(
+    worker,
+    id,
+    context,
+    proxyContext,
+  ) {
+    let runTaskTimeoutId = null;
+
+    return new Promise((resolve, reject) => {
+      runTaskTimeoutId = setTimeout(() => {
+        if (worker.disposed) {
+          this.workerPool.splice(this.workerPool.indexOf(worker), 1);
+        }
+        reject({ timeout: true });
+      }, 10000);
+
+      worker.workerProxy.run({ id }, context, proxyContext).then(
+        (result) => {
+          if (runTaskTimeoutId) {
+            clearTimeout(runTaskTimeoutId);
+            runTaskTimeoutId = null;
+          }
+          resolve(result);
+        },
+        (err) => {
+          if (runTaskTimeoutId) {
+            clearTimeout(runTaskTimeoutId);
+            runTaskTimeoutId = null;
+          }
+          reject(err);
+        },
+      );
+    });
+  }
+
+  /**
+   * Actual task scheduler.
+   */
+  startPoolScheduler() {
+    const sub = zip(
+      // Each time new task is scheduled, reset timeout for close worker.
+      // If this task is scheduled after timeout, it will reinstall worker threads.
+      this.taskQueue.pipe(
+        map((v) => {
+          this.scheduleTimeout();
+
+          if (this.workerPool.length < this.maxWorkers) {
+            for (let idx = this.workerPool.length; idx < this.maxWorkers; idx++) {
+              const worker = createWorker();
+              this.workerPool.push(worker);
+              this.workerQueue.next(worker);
+            }
+          }
+          return v;
+        }),
+      ),
+      this.workerQueue.pipe(
+        filter(x => !x.disposed),
+        map((v, i) => v),
+      ),
+    )
+      .pipe(
+        observeOn(asapScheduler),
+        takeUntil(this.disposeAwaiter),
+        mergeMap(async ([task, worker]) => {
+          if (!worker || worker.disposed) {
+            this.taskQueue.next(task);
+          }
+          const { context, proxyContext, id } = task;
+
+          try {
+            const result = await this.tryRunTaskWithTimeout(worker, id, context, proxyContext);
+            task.onComplete(result);
+            return { id, value: true };
+          } catch (err) {
+            if (!!err && err.timeout) {
+              this.taskQueue.next(task);
+            } else {
+              task.onError(err);
+            }
+
+            return { id, value: false };
+          }
+        }),
+      )
+      .subscribe(
+        (x) => {
+          this.taskCount--;
+
+          // Each time task completes, queue new worker to let zip operator picks up task / worker pair
+          const worker = this.workerPool[(x.id + 1) % this.maxWorkers];
+          this.workerQueue.next(worker);
+        },
+        null,
+        () => {
+          this.closeWorkers();
+        },
+      );
+
+    // Queue all workers when starting scheduler
+    this.workerPool.forEach(function initialQueue(w) { this.workerQueue.next(w); });
+
+    return sub;
+  }
+
+  warmup() {
+    /* noop */
+  }
+
+  run(data, callback) {
+    if (this.timeoutId) {
+      clearTimeout(this.timeoutId);
+      this.timeoutId = null;
+    }
+
+    ++this.taskCount;
+
+    const [normalContext, proxyContext] = marshallWorkerDataContext(context);
+    this.taskQueue.next({
+      context: normalContext,
+      // Wrap whole object into proxy again, otherwise worker will try clone
+      proxyContext: proxy(proxyContext),
+      onComplete: callback,
+      onError: callback,
+    });
+  }
+}
diff --git a/src/index.js b/src/index.js
index a4140ed..7381c07 100644
--- a/src/index.js
+++ b/src/index.js
@@ -1,5 +1,8 @@
 import loaderUtils from 'loader-utils';
 import { getPool } from './workerPools';
+import { setupTransferHandler } from './messagePortTransferHandler';
+
+setupTransferHandler();
 
 function pitch() {
   const options = loaderUtils.getOptions(this) || {};
diff --git a/src/messagePortTransferHandler.js b/src/messagePortTransferHandler.js
new file mode 100644
index 0000000..f1139c6
--- /dev/null
+++ b/src/messagePortTransferHandler.js
@@ -0,0 +1,26 @@
+import { expose, proxyMarker, transferHandlers, wrap } from 'comlink';
+import { MessageChannel } from 'worker_threads';
+
+const nodeEndpoint = require('comlink/dist/umd/node-adapter');
+
+/**
+ * Override comlink's default proxy handler to use Node endpoints
+ * https://github.com/GoogleChromeLabs/comlink/issues/313
+ */
+const setupTransferHandler = () => {
+  transferHandlers.set('proxy', {
+    canHandle: obj => obj && obj[proxyMarker],
+    serialize: (obj) => {
+      const { port1, port2 } = new MessageChannel();
+      expose(obj, nodeEndpoint(port1));
+      return [port2, [port2]];
+    },
+    deserialize: (port) => {
+      port = nodeEndpoint(port);
+      port.start();
+      return wrap(port);
+    },
+  });
+};
+
+export { setupTransferHandler };
diff --git a/src/workerEntryPoint.js b/src/workerEntryPoint.js
new file mode 100644
index 0000000..519978b
--- /dev/null
+++ b/src/workerEntryPoint.js
@@ -0,0 +1,122 @@
+import * as fs from 'fs';
+import { promisify } from 'util';
+import * as loaderRunner from 'loader-runner';
+import { Subject, from, of } from 'rxjs';
+import { catchError, map, mergeMap } from 'rxjs/operators';
+import { expose, proxy } from 'comlink';
+import { parentPort } from 'worker_threads';
+import { setupTransferHandler } from './messagePortTransferHandler';
+const nodeEndpoint = require('comlink/dist/umd/node-adapter');
+
+const asyncLoaderRunner = promisify(loaderRunner.runLoaders.bind(loaderRunner)); // promise-returning wrapper over callback-style runLoaders
+
+/**
+ * Construct option object for loaderRunner.
+ */
+const buildLoaderOption = (
+  context,
+  proxyContext,
+) => {
+  // context is plain object cloned from main process
+  const options = {
+    ...context,
+    // For fs, we won't try to proxy from Webpack::loader::LoaderContext as
+    // it's complex object.
+    readResource: fs.readFile.bind(fs),
+    context: {
+      options: {
+        context: context.rootContext,
+      },
+      fs,
+      webpack: true,
+    },
+  };
+
+  // also context appends all available keys for proxied object,
+  // augument option object using it
+  context.proxyFnKeys.forEach(key => (options.context[key] = proxyContext[key]));
+
+  // Webpack::loader::LoaderContext::resolve expects callback fn as param.
+  // Same as proxied fn from main process to worker, callback fn in worker cannot be
+  // cloned into main process - we'll wrap `resolve` here to forward proxy fn
+  options.context.resolve = (resolveContext, request, callback) =>
+    proxyContext.resolve(resolveContext, request, proxy(callback));
+
+  return options;
+};
+
+
+/**
+ * Task-runner interface for executing loader tasks in this worker thread,
+ * exposed to the main thread via a comlink proxy.
+ */
+const taskRunner = (() => {
+  const workerTaskQueue = new Subject();
+  let isRunning = false; // best-effort flag: true while a task is executing
+  let isClosed = false; // set by close(); queue accepts no further work after that
+
+  const run = async (queuedTask) => {
+    isRunning = true;
+    const { task, context, proxyContext } = queuedTask; // `task` is metadata ({ id }); unused by the loader run itself
+
+    const loaderOptions = buildLoaderOption(context, proxyContext);
+
+    const result = await asyncLoaderRunner(loaderOptions);
+
+    isRunning = false;
+    return result;
+  };
+
+  workerTaskQueue // NOTE(review): mergeMap below is unbounded, so isRunning only reflects the latest task — presumably the pool schedules one task per worker; confirm
+    .pipe(
+      mergeMap(queuedTask =>
+        from(run(queuedTask)).pipe(
+          map((result) => { return { result, onComplete: queuedTask.onComplete }; }),
+          catchError(err => of({ err, onError: queuedTask.onError })), // fold failures into values so the stream survives
+        ),
+      ),
+    )
+    .subscribe(
+      (taskResult) => {
+        const { result, err, onComplete, onError } = taskResult;
+        if (err) {
+          onError(err);
+        } else {
+          onComplete(result);
+        }
+      },
+      (e) => {
+        process.exit(-1); // unexpected stream error: kill this worker thread
+      },
+      () => {
+        process.exit(0); // queue completed via close(): exit cleanly
+      },
+    );
+
+  return {
+    isAvailable: () => !isClosed && !isRunning,
+    close: () => { // stop accepting tasks; completing the queue triggers process.exit(0) above
+      isClosed = true;
+      workerTaskQueue.complete();
+    },
+    run: (
+      task,
+      context,
+      proxyContext,
+    ) =>
+      new Promise((resolve, reject) => {
+        workerTaskQueue.next({
+          task,
+          context,
+          proxyContext,
+          onComplete: resolve,
+          onError: reject,
+        });
+      }),
+  };
+})();
+
+setupTransferHandler(); // install the Node-endpoint-aware proxy transfer handler before exposing
+expose(taskRunner, nodeEndpoint(parentPort)); // serve taskRunner to the main thread over parentPort
+
+export { taskRunner };