Skip to content

Commit cf55a5f

Browse files
committed
enhance worker balancing logic
1 parent 4aad6e2 commit cf55a5f

11 files changed

+575
-63
lines changed

README.md

+40-1
Original file line numberDiff line numberDiff line change
@@ -36,7 +36,46 @@ noroutine.init({
3636
pool: 5, // number of workers in thread pool
3737
wait: 2000, // maximum delay to wait for a free thread
3838
timeout: 5000, // maximum timeout for executing a functions
39-
monitoring: 5000, // event loop utilization monitoring interval
39+
monitoring: 5000, // balancer monitoring interval
40+
balancerFactory: customBalancerFactory, // balancer factory
41+
});
42+
```
43+
44+
### Balancer Factory
45+
46+
`balancerFactory` field is optional and serves as a factory for the balancer function. By default, the balancing strategy relies on event loop utilization. However, this default behavior can be extended or modified by specifying a custom balancer factory.
47+
48+
The balancer factory is executed once during the initialization process, and it takes the worker priority pool as a parameter.
49+
50+
The outcome of the balancer factory's execution is the balancer function itself. This function will be executed automatically at regular monitoring intervals, implementing the chosen balancing strategy as defined by the balancer factory.
51+
52+
Example (auto scaling balancer):
53+
54+
```js
55+
noroutine.init({
56+
modules: [module1, module2],
57+
pool: 8,
58+
monitoring: 5000,
59+
balancerFactory: (pool) => {
60+
const defaultBalancer = noroutine.defaultBalancerFactory(pool);
61+
const minCapacity = 1;
62+
const maxCapacity = pool.getCapacity();
63+
return () => {
64+
const currentCapacity = pool.getCapacity();
65+
let minPriority = Infinity;
66+
let maxPriority = -Infinity;
67+
defaultBalancer();
68+
for (const [, priority] of pool) {
69+
minPriority = Math.min(minPriority, priority);
70+
maxPriority = Math.max(maxPriority, priority);
71+
}
72+
if (1 / minPriority > 0.9) {
73+
pool.setCapacity(Math.min(maxCapacity, currentCapacity + 1));
74+
} else if (1 / maxPriority < 0.1) {
75+
pool.setCapacity(Math.max(minCapacity, currentCapacity - 1));
76+
}
77+
};
78+
},
4079
});
4180
```
4281

lib/balancer.js

+33
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,33 @@
1+
'use strict';
2+
class Balancer {
3+
constructor(pool) {
4+
this.metrics = new WeakMap();
5+
this.pool = pool;
6+
}
7+
8+
monitoring() {
9+
for (const [worker] of this.pool) {
10+
const metrics = this.getMetrics(worker);
11+
const current = worker.performance.eventLoopUtilization();
12+
const delta = worker.performance.eventLoopUtilization(
13+
current,
14+
metrics.elu,
15+
);
16+
metrics.elu = current;
17+
const priority = 1 / Math.abs(delta.utilization);
18+
this.pool.setPriority(worker, priority);
19+
}
20+
}
21+
22+
getMetrics(worker) {
23+
return (
24+
this.metrics.get(worker) ||
25+
(this.metrics.set(worker, {
26+
elu: worker.performance.eventLoopUtilization(),
27+
}),
28+
this.getMetrics(worker))
29+
);
30+
}
31+
}
32+
33+
module.exports = { Balancer };

lib/message-types.js

+17
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
'use strict';
2+
3+
const WorkerMessageType = {
4+
RESULT: 'result',
5+
AVAILABILITY: 'raincheck',
6+
STOP: 'stopped',
7+
};
8+
9+
const ParentMessageType = {
10+
EXECUTE: 'call',
11+
STOP: 'stop',
12+
};
13+
14+
module.exports = {
15+
WorkerMessageType,
16+
ParentMessageType,
17+
};

lib/priority-pool.js

+122
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,122 @@
1+
'use strict';
2+
class PriorityPool {
3+
constructor({ factory, destructor = (x) => x, capacity }) {
4+
this.capacity = capacity;
5+
this.factory = factory;
6+
this.destructor = destructor;
7+
this.available = new Set();
8+
this.pool = new Map();
9+
this.waitQueue = [];
10+
}
11+
12+
static from(iterable) {
13+
const pool = new PriorityPool({
14+
capacity: Infinity,
15+
});
16+
for (const value of iterable) {
17+
pool.add(value, 0);
18+
}
19+
return pool;
20+
}
21+
22+
get waitQueueSize() {
23+
return this.waitQueue.length;
24+
}
25+
26+
get size() {
27+
return this.pool.size;
28+
}
29+
30+
getCandidate() {
31+
let priority = -Infinity;
32+
let candidate = null;
33+
for (const item of this.available) {
34+
const itemPriority = this.pool.get(item);
35+
if (priority < itemPriority) {
36+
priority = itemPriority;
37+
candidate = item;
38+
}
39+
}
40+
return candidate;
41+
}
42+
43+
async capture() {
44+
const item = this.getCandidate();
45+
if (item) return this.available.delete(item) && item;
46+
if (this.pool.size < this.capacity && this.factory) {
47+
return this.createItem();
48+
}
49+
return new Promise((resolve) => {
50+
this.waitQueue.push(resolve);
51+
});
52+
}
53+
54+
release(item) {
55+
if (!this.pool.has(item)) return false;
56+
if (this.capacity < this.pool.size) {
57+
this.delete(item);
58+
return true;
59+
}
60+
const resolve = this.waitQueue.shift();
61+
if (resolve) resolve(item);
62+
else this.available.add(item);
63+
return true;
64+
}
65+
66+
setPriority(item, priority) {
67+
if (!this.pool.has(item)) return false;
68+
this.pool.set(item, priority);
69+
return true;
70+
}
71+
72+
add(item, priority) {
73+
if (this.capacity === this.pool.size) return false;
74+
this.pool.set(item, priority);
75+
this.available.add(item);
76+
return true;
77+
}
78+
79+
delete(item) {
80+
this.available.delete(item);
81+
if (this.pool.delete(item)) return this?.destructor(item);
82+
return void 0;
83+
}
84+
85+
getCapacity() {
86+
return this.capacity;
87+
}
88+
89+
async setCapacity(capacity) {
90+
for (let i = 0; i < this.pool.size - capacity; i++) {
91+
const item = this.getCandidate();
92+
this.delete(item);
93+
}
94+
this.capacity = capacity;
95+
return this;
96+
}
97+
98+
flash() {
99+
const result = [];
100+
for (const [item] of this.pool) {
101+
this.available.delete(item);
102+
this.pool.delete(item);
103+
result.push(this?.destructor(item));
104+
}
105+
return result;
106+
}
107+
108+
entries() {
109+
return this.pull.entries();
110+
}
111+
112+
[Symbol.iterator]() {
113+
return this.pool.entries();
114+
}
115+
116+
createItem() {
117+
const item = this.factory();
118+
this.pool.set(item, 0);
119+
return item;
120+
}
121+
}
122+
module.exports = { PriorityPool };

lib/worker.js

+59-6
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,70 @@
11
'use strict';
22

33
const { parentPort, workerData } = require('worker_threads');
4-
4+
const { WorkerMessageType, ParentMessageType } = require('./message-types');
55
const names = workerData.modules;
66
const target = names.reduce((o, name) => ({ ...o, ...require(name) }), {});
77

8-
parentPort.on('message', (message) => {
8+
const worker = {
9+
tasks: [],
10+
handler: null,
11+
};
12+
13+
const execute = (message) => {
914
const method = target[message.method];
10-
method(...message.args)
15+
const availabilityMessage = setImmediate(() => {
16+
parentPort.postMessage({
17+
type: WorkerMessageType.AVAILABILITY,
18+
});
19+
});
20+
const task = Promise.resolve(method(...message.args))
1121
.then((result) => {
12-
parentPort.postMessage({ id: message.id, result });
22+
parentPort.postMessage({
23+
type: WorkerMessageType.RESULT,
24+
id: message.id,
25+
result,
26+
});
1327
})
1428
.catch((error) => {
15-
parentPort.postMessage({ id: message.id, error });
29+
parentPort.postMessage({
30+
type: WorkerMessageType.RESULT,
31+
id: message.id,
32+
error,
33+
});
34+
})
35+
.finally(() => {
36+
clearImmediate(availabilityMessage);
1637
});
17-
});
38+
worker.tasks.push(task);
39+
};
40+
41+
const stop = (message) => {
42+
parentPort.off('message', worker.handler);
43+
Promise.allSettled(worker.tasks).then((data) =>
44+
parentPort.postMessage({
45+
id: message.id,
46+
type: WorkerMessageType.STOP,
47+
data,
48+
}),
49+
);
50+
};
51+
52+
const messageHandler = (message) => {
53+
switch (message.type) {
54+
case ParentMessageType.EXECUTE:
55+
execute(message);
56+
break;
57+
case ParentMessageType.STOP:
58+
stop(message);
59+
break;
60+
default:
61+
throw new Error(`Unknown parent message type: ${message.type}`);
62+
}
63+
};
64+
65+
const start = () => {
66+
worker.handler = messageHandler;
67+
parentPort.on('message', worker.handler);
68+
};
69+
70+
start();

noroutine.d.ts

+6
Original file line numberDiff line numberDiff line change
@@ -1,10 +1,16 @@
1+
import { PriorityPool } from './types/priority-pool';
2+
3+
export type BalancerFactory = (pool: PriorityPool<Worker, number>) => Function;
4+
15
export interface NoroutineOptions {
26
modules: object[];
37
pool?: number;
48
wait?: number;
59
timeout?: number;
610
monitoring?: number;
11+
balancerFactory?: BalancerFactory;
712
}
813

914
export function init(options: NoroutineOptions): void;
1015
export function finalize(): Promise<void>;
16+
export const defaultBalancerFactory: BalancerFactory;

0 commit comments

Comments
 (0)