Skip to content

Commit aa2f980

Browse files
committed
Web Locks API initial implementation
Refs: #416
1 parent f752f0d commit aa2f980

File tree

3 files changed

+106
-0
lines changed

3 files changed

+106
-0
lines changed

.eslintignore

+1
Original file line numberDiff line numberDiff line change
@@ -1 +1,2 @@
11
dist
2+
locks.js

lib/locks.js

+101
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,101 @@
1+
'use strict';
2+
3+
const { Worker, isMainThread, parentPort } = require('worker_threads');
4+
5+
const threads = new Set();
6+
7+
const LOCKED = 0;
8+
const UNLOCKED = 1;
9+
10+
const sendMessage = message => {
11+
if (isMainThread) {
12+
for (const thread of threads) {
13+
thread.worker.postMessage(message);
14+
}
15+
} else {
16+
parentPort.postMessage(message);
17+
}
18+
};
19+
20+
class Mutex {
21+
constructor(resourceName, shared, initial = false) {
22+
this.resourceName = resourceName;
23+
this.lock = new Int32Array(shared, 0, 1);
24+
if (initial) Atomics.store(this.lock, 0, UNLOCKED);
25+
this.owner = false;
26+
this.trying = false;
27+
this.callback = null;
28+
}
29+
30+
enter(callback) {
31+
this.callback = callback;
32+
this.trying = true;
33+
this.tryEnter();
34+
}
35+
36+
tryEnter() {
37+
if (!this.callback) return;
38+
const prev = Atomics.exchange(this.lock, 0, LOCKED);
39+
if (prev === UNLOCKED) {
40+
this.owner = true;
41+
this.trying = false;
42+
this.callback(this).then(() => {
43+
this.leave();
44+
});
45+
this.callback = null;
46+
}
47+
}
48+
49+
leave() {
50+
if (!this.owner) return;
51+
Atomics.store(this.lock, 0, UNLOCKED);
52+
this.owner = false;
53+
sendMessage({ kind: 'leave', resourceName: this.resourceName });
54+
}
55+
}
56+
57+
const resources = new Map();
58+
59+
const request = (resourceName, callback) => {
60+
let lock = resources.get(resourceName);
61+
if (!lock) {
62+
const buffer = new SharedArrayBuffer(4);
63+
lock = new Mutex(resourceName, buffer, true);
64+
resources.set(resourceName, lock);
65+
sendMessage({ kind: 'create', resourceName, buffer });
66+
}
67+
lock.enter(callback);
68+
return lock;
69+
};
70+
71+
const receiveMessage = message => {
72+
const { kind, resourceName, buffer } = message;
73+
if (kind === 'create') {
74+
const lock = new Mutex(resourceName, buffer);
75+
resources.set(resourceName, lock);
76+
}
77+
};
78+
79+
if (!isMainThread) {
80+
parentPort.on('message', receiveMessage);
81+
}
82+
83+
class Thread {
84+
constructor() {
85+
const worker = new Worker(__filename);
86+
this.worker = worker;
87+
threads.add(this);
88+
worker.on('message', message => {
89+
for (const thread of threads) {
90+
if (thread.worker !== worker) {
91+
thread.worker.postMessage(message);
92+
}
93+
}
94+
receiveMessage(message);
95+
});
96+
}
97+
}
98+
99+
const locks = { resources, request, sendMessage, receiveMessage, Thread };
100+
101+
module.exports = { locks };

metasync.js

+4
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,9 @@ if (nodeVerion >= 10) {
2222
submodules.push(require('./lib/async-iterator'));
2323
}
2424

25+
if (nodeVerion >= 11) {
26+
submodules.push(require('./lib/locks'));
27+
}
28+
2529
const { compose } = submodules[0];
2630
module.exports = Object.assign(compose, ...submodules);

0 commit comments

Comments
 (0)