From f845c59df0c02e3677752564b3506f2b5b5f187c Mon Sep 17 00:00:00 2001 From: Maksym Chekh Date: Fri, 15 Sep 2023 15:32:46 +0300 Subject: [PATCH] enhance worker balancing logic --- README.md | 41 +++++++++- lib/balancer.js | 33 ++++++++ lib/message-types.js | 17 +++++ lib/priority-pool.js | 122 +++++++++++++++++++++++++++++ lib/worker.js | 65 ++++++++++++++-- noroutine.d.ts | 6 ++ noroutine.js | 160 +++++++++++++++++++++++++------------- package-lock.json | 2 +- test/noroutine.js | 9 +++ test/priority-pool.js | 161 +++++++++++++++++++++++++++++++++++++++ types/priority-pool.d.ts | 21 +++++ 11 files changed, 574 insertions(+), 63 deletions(-) create mode 100644 lib/balancer.js create mode 100644 lib/message-types.js create mode 100644 lib/priority-pool.js create mode 100644 test/priority-pool.js create mode 100644 types/priority-pool.d.ts diff --git a/README.md b/README.md index 4dd1a5d..03f694b 100644 --- a/README.md +++ b/README.md @@ -36,7 +36,46 @@ noroutine.init({ pool: 5, // number of workers in thread pool wait: 2000, // maximum delay to wait for a free thread timeout: 5000, // maximum timeout for executing a functions - monitoring: 5000, // event loop utilization monitoring interval + monitoring: 5000, // balancer monitoring interval + balancerFactory: customBalancerFactory, // balancer factory +}); +``` + +### Balancer Factory + +`balancerFactory` field is optional and serves as a factory for the balancer function. By default, the balancing strategy relies on event loop utilization. However, this default behavior can be extended or modified by specifying a custom balancer factory. + +The balancer factory is executed once during the initialization process, and it takes the worker priority pool as a parameter. + +The outcome of the balancer factory's execution is the balancer function itself. This function will be executed automatically at regular monitoring intervals, implementing the chosen balancing strategy as defined by the balancer factory. + +Example (auto scaling balancer): + +```js +noroutine.init({ + modules: [module1, module2], + pool: 8, + monitoring: 5000, + balancerFactory: (pool) => { + const defaultBalancer = noroutine.defaultBalancerFactory(pool); + const minCapacity = 1; + const maxCapacity = pool.getCapacity(); + return () => { + const currentCapacity = pool.getCapacity(); + let minPriority = Infinity; + let maxPriority = -Infinity; + defaultBalancer(); + for (const [, priority] of pool) { + minPriority = Math.min(minPriority, priority); + maxPriority = Math.max(maxPriority, priority); + } + if (1 / minPriority > 0.9) { + pool.setCapacity(Math.min(maxCapacity, currentCapacity + 1)); + } else if (1 / maxPriority < 0.1) { + pool.setCapacity(Math.max(minCapacity, currentCapacity - 1)); + } + }; + }, }); ``` diff --git a/lib/balancer.js b/lib/balancer.js new file mode 100644 index 0000000..68c5628 --- /dev/null +++ b/lib/balancer.js @@ -0,0 +1,33 @@ +'use strict'; +class Balancer { + constructor(pool) { + this.metrics = new WeakMap(); + this.pool = pool; + } + + monitoring() { + for (const [worker] of this.pool) { + const metrics = this.getMetrics(worker); + const current = worker.performance.eventLoopUtilization(); + const delta = worker.performance.eventLoopUtilization( + current, + metrics.elu, + ); + metrics.elu = current; + const priority = 1 / Math.abs(delta.utilization); + this.pool.setPriority(worker, priority); + } + } + + getMetrics(worker) { + return ( + this.metrics.get(worker) || + (this.metrics.set(worker, { + elu: worker.performance.eventLoopUtilization(), + }), + this.getMetrics(worker)) + ); + } +} + +module.exports = { Balancer }; diff --git a/lib/message-types.js b/lib/message-types.js new file mode 100644 index 0000000..c5b382b --- /dev/null +++ b/lib/message-types.js @@ -0,0 +1,17 @@ +'use strict'; + +const WorkerMessageType = { + RESULT: 'result', + AVAILABILITY: 'raincheck', + STOP: 'stopped', +}; + +const ParentMessageType = { + EXECUTE: 'call', + STOP: 'stop', +}; + +module.exports = { + WorkerMessageType, + ParentMessageType, +}; diff --git a/lib/priority-pool.js b/lib/priority-pool.js new file mode 100644 index 0000000..95ad28c --- /dev/null +++ b/lib/priority-pool.js @@ -0,0 +1,122 @@ +'use strict'; +class PriorityPool { + constructor({ factory, destructor = (x) => x, capacity }) { + this.capacity = capacity; + this.factory = factory; + this.destructor = destructor; + this.available = new Set(); + this.pool = new Map(); + this.waitQueue = []; + } + + static from(iterable) { + const pool = new PriorityPool({ + capacity: Infinity, + }); + for (const value of iterable) { + pool.add(value, 0); + } + return pool; + } + + get waitQueueSize() { + return this.waitQueue.length; + } + + get size() { + return this.pool.size; + } + + getCandidate() { + let priority = -Infinity; + let candidate = null; + for (const item of this.available) { + const itemPriority = this.pool.get(item); + if (priority < itemPriority) { + priority = itemPriority; + candidate = item; + } + } + return candidate; + } + + async capture() { + const item = this.getCandidate(); + if (item) return this.available.delete(item) && item; + if (this.pool.size < this.capacity && this.factory) { + return this.createItem(); + } + return new Promise((resolve) => { + this.waitQueue.push(resolve); + }); + } + + release(item) { + if (!this.pool.has(item)) return false; + if (this.capacity < this.pool.size) { + this.delete(item); + return true; + } + const resolve = this.waitQueue.shift(); + if (resolve) resolve(item); + else this.available.add(item); + return true; + } + + setPriority(item, priority) { + if (!this.pool.has(item)) return false; + this.pool.set(item, priority); + return true; + } + + add(item, priority) { + if (this.capacity === this.pool.size) return false; + this.pool.set(item, priority); + this.available.add(item); + return true; + } + + delete(item) { + this.available.delete(item); + if (this.pool.delete(item)) return this?.destructor(item); + return void 0; + } + + getCapacity() { + return this.capacity; + } + + async setCapacity(capacity) { + for (let i = 0; i < this.pool.size - capacity; i++) { + const item = this.getCandidate(); + this.delete(item); + } + this.capacity = capacity; + return this; + } + + flash() { + const result = []; + for (const [item] of this.pool) { + this.available.delete(item); + this.pool.delete(item); + result.push(this?.destructor(item)); + } + return result; + } + + entries() { + return this.pull.entries(); + } + + [Symbol.iterator]() { + return this.pool.entries(); + } + + createItem() { + const item = this.factory(); + this.pool.set(item, 0); + return item; + } +} +module.exports = { PriorityPool }; diff --git a/lib/worker.js b/lib/worker.js index 37dcea4..4cca922 100644 --- a/lib/worker.js +++ b/lib/worker.js @@ -1,17 +1,70 @@ 'use strict'; const { parentPort, workerData } = require('worker_threads'); - +const { WorkerMessageType, ParentMessageType } = require('./message-types'); const names = workerData.modules; const target = names.reduce((o, name) => ({ ...o, ...require(name) }), {}); -parentPort.on('message', (message) => { +const worker = { + tasks: [], + handler: null, +}; + +const execute = (message) => { const method = target[message.method]; - method(...message.args) + const availabilityMessage = setImmediate(() => { + parentPort.postMessage({ + type: WorkerMessageType.AVAILABILITY, + }); + }); + const task = Promise.resolve(method(...message.args)) .then((result) => { - parentPort.postMessage({ id: message.id, result }); + parentPort.postMessage({ + type: WorkerMessageType.RESULT, + id: message.id, + result, + }); }) .catch((error) => { - parentPort.postMessage({ id: message.id, error }); + parentPort.postMessage({ + type: WorkerMessageType.RESULT, + id: message.id, + error, + }); + }) + .finally(() => { + clearImmediate(availabilityMessage); }); -}); + worker.tasks.push(task); +}; + +const stop = (message) => { + parentPort.off('message', worker.handler); + Promise.allSettled(worker.tasks).then((data) => + parentPort.postMessage({ + id: message.id, + type: WorkerMessageType.STOP, + data, + }), + ); +}; + +const messageHandler = (message) => { + switch (message.type) { + case ParentMessageType.EXECUTE: + execute(message); + break; + case ParentMessageType.STOP: + stop(message); + break; + default: + throw new Error(`Unknown parent message type: ${message.type}`); + } +}; + +const start = () => { + worker.handler = messageHandler; + parentPort.on('message', worker.handler); +}; + +start(); diff --git a/noroutine.d.ts b/noroutine.d.ts index f65331a..a7f70af 100644 --- a/noroutine.d.ts +++ b/noroutine.d.ts @@ -1,10 +1,16 @@ +import { PriorityPool } from './types/priority-pool'; + +export type BalancerFactory = (pool: PriorityPool) => Function; + export interface NoroutineOptions { modules: object[]; pool?: number; wait?: number; timeout?: number; monitoring?: number; + balancerFactory?: BalancerFactory; } export function init(options: NoroutineOptions): void; export function finalize(): Promise; +export const defaultBalancerFactory: BalancerFactory; diff --git a/noroutine.js b/noroutine.js index 7648301..645a1ba 100644 --- a/noroutine.js +++ b/noroutine.js @@ -1,6 +1,9 @@ 'use strict'; const { Worker } = require('worker_threads'); +const { PriorityPool } = require('./lib/priority-pool'); +const { Balancer } = require('./lib/balancer'); +const { WorkerMessageType, ParentMessageType } = require('./lib/message-types'); const path = require('path'); const STATUS_NOT_INITIALIZED = 0; @@ -15,63 +18,90 @@ const DEFAULT_POOL_SIZE = 5; const DEFAULT_THREAD_WAIT = 2000; const DEFAULT_TIMEOUT = 5000; const DEFAULT_MON_INTERVAL = 5000; +const DEFAULT_BALANCER_FACTORY = (pool) => { + const balancer = new Balancer(pool); + return balancer.monitoring.bind(balancer); +}; const OPTIONS_INT = ['pool', 'wait', 'timeout', 'monitoring']; -const balancer = { +const planner = { options: null, - pool: [], + pool: null, modules: null, status: STATUS_NOT_INITIALIZED, timer: null, - elu: [], - current: null, id: 1, tasks: new Map(), targets: null, }; -const monitoring = () => { - let utilization = 1; - let index = 0; - for (let i = 0; i < balancer.options.pool; i++) { - const worker = balancer.pool[i]; - const prev = balancer.elu[i]; - const current = worker.performance.eventLoopUtilization(); - const delta = worker.performance.eventLoopUtilization(current, prev); - if (delta.utilization < utilization) { - index = i; - utilization = delta.utilization; - } - balancer.elu[i] = current; - } - balancer.current = balancer.pool[index]; +const captureWorker = async () => { + const workerPromise = planner.pool.capture(); + return new Promise((resolve, reject) => { + const timeout = setTimeout(() => { + workerPromise.then((worker) => planner.pool.release(worker)); + reject(new Error('Tread wait timeout')); + }, planner.options.wait); + workerPromise.then((worker) => { + clearTimeout(timeout); + resolve(worker); + }); + }); +}; + +const stopWorker = (worker) => { + const id = planner.id++; + return new Promise((resolve) => { + planner.tasks.set(id, { resolve }); + worker.postMessage({ + type: ParentMessageType.STOP, + id, + }); + }); +}; + +const stopResult = ({ id, data }) => { + const task = planner.tasks.get(id); + planner.tasks.delete(id); + task.resolve(data); }; const invoke = async (method, args) => { - const id = balancer.id++; + const id = planner.id++; + const current = await captureWorker(); return new Promise((resolve, reject) => { const timer = setTimeout(() => { reject(new Error(`Timeout execution for method '${method}'`)); - }, balancer.options.timeout); - balancer.tasks.set(id, { resolve, reject, timer }); - balancer.current.postMessage({ id, method, args }); + }, planner.options.timeout); + planner.tasks.set(id, { resolve, reject, timer }); + current.postMessage({ type: ParentMessageType.EXECUTE, id, method, args }); }); }; const workerResults = ({ id, error, result }) => { - const task = balancer.tasks.get(id); + const task = planner.tasks.get(id); clearTimeout(task.timer); - balancer.tasks.delete(id); + planner.tasks.delete(id); if (error) task.reject(error); else task.resolve(result); }; -const register = (worker) => { - balancer.pool.push(worker); - const elu = worker.performance.eventLoopUtilization(); - balancer.elu.push(elu); - worker.on('message', workerResults); +const handleMessage = (worker, message) => { + switch (message.type) { + case WorkerMessageType.AVAILABILITY: + planner.pool.release(worker); + break; + case WorkerMessageType.RESULT: + workerResults(message); + planner.pool.release(worker); + break; + case WorkerMessageType.STOP: + stopResult(message); + break; + default: + throw new Error(`Unknown worker message type: ${message.type}`); + } }; const findModule = (module) => { @@ -89,55 +119,75 @@ const wrapModule = (module) => { } }; -const init = (options) => { - if (balancer.status !== STATUS_NOT_INITIALIZED) { - throw new Error('Can not initialize noroutine more than once'); - } - balancer.status = STATUS_INITIALIZATION; +const initPull = (workerData) => + new PriorityPool({ + factory: () => { + const worker = new Worker(WORKER_PATH, { workerData }); + worker.on('message', (message) => { + handleMessage(worker, message); + }); + return worker; + }, + destructor: async (worker) => { + if (planner.status !== STATUS_FINALIZATION) await stopWorker(worker); + return worker.terminate(); + }, + capacity: planner.options.pool, + }); + +const validateOptions = (options) => { for (const module of options.modules) { if (typeof module !== 'object') { throw new Error('Module should export an interface'); } } - balancer.options = { + const resultOptions = { modules: options.modules, pool: options.pool || DEFAULT_POOL_SIZE, wait: options.wait || DEFAULT_THREAD_WAIT, timeout: options.timeout || DEFAULT_TIMEOUT, monitoring: options.monitoring || DEFAULT_MON_INTERVAL, + balancerFactory: options.balancerFactory || DEFAULT_BALANCER_FACTORY, }; for (const key of OPTIONS_INT) { - const value = balancer.options[key]; + const value = resultOptions[key]; if (!Number.isInteger(value)) { throw new Error(`Norutine.init: options.${key} should be integer`); } } - balancer.targets = options.modules.map(findModule); + return resultOptions; +}; + +const init = (options) => { + if (planner.status !== STATUS_NOT_INITIALIZED) { + throw new Error('Can not initialize noroutine more than once'); + } + planner.status = STATUS_INITIALIZATION; + planner.options = validateOptions(options); + planner.targets = options.modules.map(findModule); for (const module of options.modules) { wrapModule(module); } const workerData = { - modules: balancer.targets, - timeout: balancer.options.timeout, + modules: planner.targets, + timeout: planner.options.timeout, }; - for (let i = 0; i < balancer.options.pool; i++) { - register(new Worker(WORKER_PATH, { workerData })); - } - balancer.current = balancer.pool[0]; - balancer.timer = setInterval(monitoring, balancer.options.monitoring); - balancer.status = STATUS_INITIALIZED; + planner.pool = initPull(workerData); + const workerBalancer = planner.options.balancerFactory(planner.pool); + planner.timer = setInterval(workerBalancer, planner.options.monitoring); + planner.status = STATUS_INITIALIZED; }; const finalize = async () => { - balancer.status = STATUS_FINALIZATION; - clearInterval(balancer.timer); - const finals = []; - for (let i = 0; i < balancer.options.pool; i++) { - const worker = balancer.pool[i]; - finals.push(worker.terminate()); - } + planner.status = STATUS_FINALIZATION; + clearInterval(planner.timer); + const finals = planner.pool.flash(); await Promise.allSettled(finals); - balancer.status = STATUS_FINALIZED; + planner.status = STATUS_FINALIZED; }; -module.exports = { init, finalize }; +module.exports = { + init, + finalize, + defaultBalancerFactory: DEFAULT_BALANCER_FACTORY, +}; diff --git a/package-lock.json b/package-lock.json index c0e81cd..e543f70 100644 --- a/package-lock.json +++ b/package-lock.json @@ -21,7 +21,7 @@ "typescript": "^5.0.4" }, "engines": { - "node": "^12.22 || ^14.17 || 16 || 18 || 19 || 20" + "node": "^14.17 || 16 || 18 || 19 || 20" }, "funding": { "type": "patreon", diff --git a/test/noroutine.js b/test/noroutine.js index 8ccff9b..e922210 100644 --- a/test/noroutine.js +++ b/test/noroutine.js @@ -6,12 +6,18 @@ const noroutine = require('..'); const module1 = require('./module1.js'); const module2 = require('./module2'); +let callBalancer; +const balancerCall = new Promise((resolve) => { + callBalancer = resolve; +}); + noroutine.init({ modules: [module1, module2], pool: 5, wait: 2000, timeout: 5000, monitoring: 5000, + balancerFactory: (pool) => () => callBalancer(pool), }); metatests.test('Noroutine execute method', async (test) => { @@ -38,6 +44,9 @@ metatests.test('Noroutine execute method', async (test) => { const res5 = await module2.method4('value5'); test.strictSame(res5, { key: 'value5' }); + const pool = await balancerCall; + test.strictSame(pool.getCapacity(), 5); + test.end(); await noroutine.finalize(); }); diff --git a/test/priority-pool.js b/test/priority-pool.js new file mode 100644 index 0000000..48a106b --- /dev/null +++ b/test/priority-pool.js @@ -0,0 +1,161 @@ +'use strict'; + +const metatests = require('metatests'); +const { PriorityPool } = require('../lib/priority-pool'); + +metatests.test('PriorityPool: create from iterable', (test) => { + const sut = PriorityPool.from(['A', 'B', 'C']); + + test.assert(sut); + test.strictSame(sut.size, 3); + test.strictSame(Array.from(sut), [ + ['A', 0], + ['B', 0], + ['C', 0], + ]); + test.end(); +}); + +metatests.test('PriorityPool: add item', (test) => { + const sut = PriorityPool.from(['A', 'B', 'C']); + + sut.add('D', 1); + + test.strictSame(sut.size, 4); + test.strictSame(Array.from(sut), [ + ['A', 0], + ['B', 0], + ['C', 0], + ['D', 1], + ]); + test.end(); +}); + +metatests.test('PriorityPool: delete item', (test) => { + const sut = PriorityPool.from(['A', 'B', 'C']); + + sut.delete('A'); + + test.strictSame(sut.size, 2); + test.strictSame(Array.from(sut), [ + ['B', 0], + ['C', 0], + ]); + test.end(); +}); + +metatests.test('PriorityPool: set priority', (test) => { + const sut = PriorityPool.from(['A', 'B', 'C']); + + const existedItem = sut.setPriority('B', 1); + const notExistedItem = sut.setPriority('D', 1); + + test.strictSame(existedItem, true); + test.strictSame(notExistedItem, false); + test.strictSame(sut.size, 3); + test.strictSame(Array.from(sut), [ + ['A', 0], + ['B', 1], + ['C', 0], + ]); + test.end(); +}); + +metatests.test('PriorityPool: capture', async (test) => { + const sut = PriorityPool.from(['A', 'B', 'C']); + sut.setPriority('B', 1); + + const obj1 = await sut.capture(); + const obj2 = await sut.capture(); + const obj3 = await sut.capture(); + + test.strictSame(obj1, 'B'); + test.strictSame(obj2, 'A'); + test.strictSame(obj3, 'C'); + test.end(); +}); + +metatests.test('PriorityPool: capture wait', async (test) => { + const sut = PriorityPool.from(['A', 'B']); + + const obj1 = await sut.capture(); + const obj2 = await sut.capture(); + const promise = sut.capture().then((obj3) => { + test.strictSame(obj3, obj2); + }); + sut.release(obj2); + await promise; + test.strictSame(obj1, 'A'); + test.strictSame(obj2, 'B'); + test.end(); +}); + +metatests.test('PriorityPool: set capacity', async (test) => { + const sut = PriorityPool.from(['A', 'B', 'C']); + + const obj1 = await sut.capture(); + const obj2 = await sut.capture(); + sut.setCapacity(1); + const sizeWhenTwoCaptured = sut.size; + sut.release(obj1); + const addResult = sut.add('D'); + + test.strictSame(addResult, false); + test.strictSame(sizeWhenTwoCaptured, 2); + test.strictSame(sut.size, 1); + test.strictSame(obj1, 'A'); + test.strictSame(obj2, 'B'); + test.strictSame(Array.from(sut), [['B', 0]]); + test.end(); +}); + +metatests.test('PriorityPool: factory', async (test) => { + const elements = new Array(10).fill(0).map((_, idx) => idx); + let idx = 0; + const sut = new PriorityPool({ + capacity: 10, + factory: () => elements[idx++], + }); + + const initialPoolSize = sut.size; + for (let i = 0; i < 5; i++) { + await sut.capture(); + } + const poolSizeAfterFiveCaptured = sut.size; + for (let i = 0; i < 5; i++) { + await sut.capture(); + } + const poolElements = Array.from(sut).map(([el]) => el); + + test.strictSame(initialPoolSize, 0); + test.strictSame(poolSizeAfterFiveCaptured, 5); + test.strictSame(sut.size, 10); + test.strictSame(poolElements, elements); + test.end(); +}); + +metatests.test('PriorityPool: destructor', async (test) => { + const elements = new Array(10).fill(0).map((_, idx) => idx); + let idx = 0; + const destructorCalls = []; + const sut = new PriorityPool({ + capacity: 10, + factory: () => elements[idx++], + destructor: (el) => (destructorCalls.push(el), el), + }); + + for (let i = 0; i < 10; i++) { + await sut.capture(); + } + sut.setCapacity(5); + for (let i = 0; i < 5; i++) { + await sut.release(i); + } + const destructorCallsAfterChangingCapacity = [...destructorCalls]; + const flashResult = sut.flash(); + + test.strictSame(destructorCallsAfterChangingCapacity, elements.slice(0, 5)); + test.strictSame(flashResult, elements.slice(5)); + test.strictSame(destructorCalls, elements); + test.strictSame(sut.size, 0); +}); diff --git a/types/priority-pool.d.ts b/types/priority-pool.d.ts new file mode 100644 index 0000000..3912640 --- /dev/null +++ b/types/priority-pool.d.ts @@ -0,0 +1,21 @@ +export interface PriorityPoolConfig { + capacity: number; + factory: () => T; + destructor: (item: T) => D; +} + +export interface PriorityPool { + waitQueueSize: number; + size: number; + + [Symbol.iterator](): IterableIterator<[T, number]>; + capture(): Promise; + release(item: T): boolean; + setPriority(item: T, priority: number): boolean; + add(item: T, priority: number): boolean; + delete(item: T): D; + getCapacity(): number; + setCapacity(capacity: number): this; + flash(): D[]; + entries(): IterableIterator<[T, number]>; +}