Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enhance worker balancing logic #34

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
41 changes: 40 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,46 @@ noroutine.init({
pool: 5, // number of workers in thread pool
wait: 2000, // maximum delay to wait for a free thread
timeout: 5000, // maximum timeout for executing a functions
monitoring: 5000, // event loop utilization monitoring interval
monitoring: 5000, // balancer monitoring interval
balancerFactory: customBalancerFactory, // balancer factory
});
```

### Balancer Factory

`balancerFactory` field is optional and serves as a factory for the balancer function. By default, the balancing strategy relies on event loop utilization. However, this default behavior can be extended or modified by specifying a custom balancer factory.

The balancer factory is executed once during the initialization process, and it takes the worker priority pool as a parameter.

The outcome of the balancer factory's execution is the balancer function itself. This function will be executed automatically at regular monitoring intervals, implementing the chosen balancing strategy as defined by the balancer factory.

Example (auto scaling balancer):

```js
noroutine.init({
modules: [module1, module2],
pool: 8,
monitoring: 5000,
balancerFactory: (pool) => {
const defaultBalancer = noroutine.defaultBalancerFactory(pool);
const minCapacity = 1;
const maxCapacity = pool.getCapacity();
return () => {
const currentCapacity = pool.getCapacity();
let minPriority = Infinity;
let maxPriority = -Infinity;
defaultBalancer();
for (const [, priority] of pool) {
minPriority = Math.min(minPriority, priority);
maxPriority = Math.max(maxPriority, priority);
}
if (1 / minPriority > 0.9) {
pool.setCapacity(Math.min(maxCapacity, currentCapacity + 1));
} else if (1 / maxPriority < 0.1) {
pool.setCapacity(Math.max(minCapacity, currentCapacity - 1));
}
};
},
});
```

Expand Down
33 changes: 33 additions & 0 deletions lib/balancer.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,33 @@
'use strict';
class Balancer {
constructor(pool) {
this.metrics = new WeakMap();
this.pool = pool;
}

monitoring() {
for (const [worker] of this.pool) {
const metrics = this.getMetrics(worker);
const current = worker.performance.eventLoopUtilization();
const delta = worker.performance.eventLoopUtilization(
current,
metrics.elu,
);
metrics.elu = current;
const priority = 1 / Math.abs(delta.utilization);
this.pool.setPriority(worker, priority);
}
}

getMetrics(worker) {
return (
this.metrics.get(worker) ||
(this.metrics.set(worker, {
elu: worker.performance.eventLoopUtilization(),
}),
this.getMetrics(worker))
);
}
}

module.exports = { Balancer };
17 changes: 17 additions & 0 deletions lib/message-types.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
'use strict';

const WorkerMessageType = {
RESULT: 'result',
AVAILABILITY: 'raincheck',
STOP: 'stopped',
};

const ParentMessageType = {
EXECUTE: 'call',
STOP: 'stop',
};

module.exports = {
WorkerMessageType,
ParentMessageType,
};
122 changes: 122 additions & 0 deletions lib/priority-pool.js
Original file line number Diff line number Diff line change
@@ -0,0 +1,122 @@
'use strict';
class PriorityPool {
constructor({ factory, destructor = (x) => x, capacity }) {
this.capacity = capacity;
this.factory = factory;
this.destructor = destructor;
this.available = new Set();
this.pool = new Map();
this.waitQueue = [];
}

static from(iterable) {
const pool = new PriorityPool({
capacity: Infinity,
});
for (const value of iterable) {
pool.add(value, 0);
}
return pool;
}

get waitQueueSize() {
return this.waitQueue.length;
}

get size() {
return this.pool.size;
}

getCandidate() {
let priority = -Infinity;
let candidate = null;
for (const item of this.available) {
const itemPriority = this.pool.get(item);
if (priority < itemPriority) {
priority = itemPriority;
candidate = item;
}
}
return candidate;
}

async capture() {
const item = this.getCandidate();
if (item) return this.available.delete(item) && item;
if (this.pool.size < this.capacity && this.factory) {
return this.createItem();
}
return new Promise((resolve) => {
this.waitQueue.push(resolve);
});
}

release(item) {
if (!this.pool.has(item)) return false;
if (this.capacity < this.pool.size) {
this.delete(item);
return true;
}
const resolve = this.waitQueue.shift();
if (resolve) resolve(item);
else this.available.add(item);
return true;
}

setPriority(item, priority) {
if (!this.pool.has(item)) return false;
this.pool.set(item, priority);
return true;
}

add(item, priority) {
if (this.capacity === this.pool.size) return false;
this.pool.set(item, priority);
this.available.add(item);
return true;
}

delete(item) {
this.available.delete(item);
if (this.pool.delete(item)) return this?.destructor(item);
return void 0;
}

getCapacity() {
return this.capacity;
}

async setCapacity(capacity) {
for (let i = 0; i < this.pool.size - capacity; i++) {
const item = this.getCandidate();
this.delete(item);
}
this.capacity = capacity;
return this;
}

flash() {
const result = [];
for (const [item] of this.pool) {
this.available.delete(item);
this.pool.delete(item);
result.push(this?.destructor(item));
}
return result;
}

entries() {
return this.pull.entries();
}

[Symbol.iterator]() {
return this.pool.entries();
}

createItem() {
const item = this.factory();
this.pool.set(item, 0);
return item;
}
}
module.exports = { PriorityPool };
65 changes: 59 additions & 6 deletions lib/worker.js
Original file line number Diff line number Diff line change
@@ -1,17 +1,70 @@
'use strict';

const { parentPort, workerData } = require('worker_threads');

const { WorkerMessageType, ParentMessageType } = require('./message-types');
const names = workerData.modules;
const target = names.reduce((o, name) => ({ ...o, ...require(name) }), {});

parentPort.on('message', (message) => {
const worker = {
tasks: [],
handler: null,
};

const execute = (message) => {
const method = target[message.method];
method(...message.args)
const availabilityMessage = setImmediate(() => {
parentPort.postMessage({
type: WorkerMessageType.AVAILABILITY,
});
});
const task = Promise.resolve(method(...message.args))
.then((result) => {
parentPort.postMessage({ id: message.id, result });
parentPort.postMessage({
type: WorkerMessageType.RESULT,
id: message.id,
result,
});
})
.catch((error) => {
parentPort.postMessage({ id: message.id, error });
parentPort.postMessage({
type: WorkerMessageType.RESULT,
id: message.id,
error,
});
})
.finally(() => {
clearImmediate(availabilityMessage);
});
});
worker.tasks.push(task);
};

const stop = (message) => {
parentPort.off('message', worker.handler);
Promise.allSettled(worker.tasks).then((data) =>
parentPort.postMessage({
id: message.id,
type: WorkerMessageType.STOP,
data,
}),
);
};

const messageHandler = (message) => {
switch (message.type) {
case ParentMessageType.EXECUTE:
execute(message);
break;
case ParentMessageType.STOP:
stop(message);
break;
default:
throw new Error(`Unknown parent message type: ${message.type}`);
}
};

const start = () => {
worker.handler = messageHandler;
parentPort.on('message', worker.handler);
};

start();
6 changes: 6 additions & 0 deletions noroutine.d.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,16 @@
import { PriorityPool } from './types/priority-pool';

export type BalancerFactory = (pool: PriorityPool<Worker, number>) => Function;

export interface NoroutineOptions {
modules: object[];
pool?: number;
wait?: number;
timeout?: number;
monitoring?: number;
balancerFactory?: BalancerFactory;
}

export function init(options: NoroutineOptions): void;
export function finalize(): Promise<void>;
export const defaultBalancerFactory: BalancerFactory;
Loading