Skip to content

Commit 23d1d44

Browse files
committed
Web Locks API initial implementation
Refs: #416
1 parent 0274e8f commit 23d1d44

File tree

3 files changed

+202
-2
lines changed

3 files changed

+202
-2
lines changed

.eslintignore

+1-2
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,2 @@
11
dist
2-
coverage
3-
.nyc_output
2+
locks.js

lib/locks.js

+197
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,197 @@
1+
'use strict';
2+
3+
const { EventEmitter } = require('events');
4+
const { Worker, isMainThread, parentPort } = require('worker_threads');
5+
6+
const threads = new Set();
7+
8+
const LOCKED = 0;
9+
const UNLOCKED = 1;
10+
11+
const sendMessage = message => {
12+
if (isMainThread) {
13+
for (const thread of threads) {
14+
thread.worker.postMessage(message);
15+
}
16+
} else {
17+
parentPort.postMessage(message);
18+
}
19+
};
20+
21+
class Lock {
22+
constructor(name, mode = 'exclusive', buffer = null) {
23+
this.name = name;
24+
this.mode = mode; // 'exclusive' or 'shared'
25+
this.queue = [];
26+
this.owner = false;
27+
this.trying = false;
28+
const initial = !buffer;
29+
this.buffer = initial ? new SharedArrayBuffer(4) : buffer;
30+
this.flag = new Int32Array(this.buffer, 0, 1);
31+
if (initial) Atomics.store(this.flag, 0, UNLOCKED);
32+
}
33+
34+
enter(lock) {
35+
this.queue.push(lock);
36+
this.trying = true;
37+
return this.tryEnter();
38+
}
39+
40+
tryEnter() {
41+
if (this.queue.length === 0) return undefined;
42+
const prev = Atomics.exchange(this.flag, 0, LOCKED);
43+
if (prev === LOCKED) return undefined;
44+
this.owner = true;
45+
this.trying = false;
46+
const lock = this.queue.shift();
47+
return lock.callback(lock).then(() => {
48+
this.leave();
49+
});
50+
}
51+
52+
leave() {
53+
if (!this.owner) return;
54+
Atomics.store(this.flag, 0, UNLOCKED);
55+
this.owner = false;
56+
sendMessage({ kind: 'leave', name: this.name });
57+
this.tryEnter();
58+
}
59+
}
60+
61+
class LockManagerSnapshot {
62+
constructor(resources) {
63+
const held = [];
64+
const pending = [];
65+
this.held = held;
66+
this.pending = pending;
67+
68+
for (const lock of resources) {
69+
if (lock.queue.length > 0) {
70+
pending.push(...lock.queue);
71+
}
72+
if (lock.owner) {
73+
held.push(lock);
74+
}
75+
}
76+
}
77+
}
78+
79+
class LockManager {
80+
constructor() {
81+
this.collection = new Map();
82+
}
83+
84+
async request(name, options, callback) {
85+
if (typeof options === 'function') {
86+
callback = options;
87+
options = {};
88+
}
89+
const { mode = 'exclusive', signal = null } = options;
90+
91+
let lock = this.collection.get(name);
92+
if (lock) {
93+
if (mode === 'exclusive') {
94+
return new Promise(resolve => {
95+
lock.queue.push([callback, resolve]);
96+
});
97+
}
98+
} else {
99+
lock = new Lock(name, mode);
100+
this.collection.set(name, lock);
101+
const { buffer } = lock;
102+
sendMessage({ kind: 'create', name, mode, buffer });
103+
}
104+
105+
const finished = callback(lock);
106+
let aborted = null;
107+
if (signal) {
108+
aborted = new Promise((resolve, reject) => {
109+
signal.on('abort', reject);
110+
});
111+
await Promise.race([finished, aborted]);
112+
} else {
113+
await finished;
114+
}
115+
116+
let next = lock.queue.pop();
117+
while (next) {
118+
const [handler, resolve] = next;
119+
await handler(lock);
120+
resolve();
121+
next = lock.queue.pop();
122+
}
123+
this.collection.delete(name);
124+
return undefined;
125+
}
126+
127+
query() {
128+
const snapshot = new LockManagerSnapshot();
129+
return Promise.resolve(snapshot);
130+
}
131+
}
132+
133+
class AbortError extends Error {
134+
constructor(message) {
135+
super(message);
136+
this.name = 'AbortError';
137+
}
138+
}
139+
140+
class AbortSignal extends EventEmitter {
141+
constructor() {
142+
super();
143+
this.aborted = false;
144+
this.on('abort', () => {
145+
this.aborted = true;
146+
});
147+
}
148+
}
149+
150+
class AbortController {
151+
constructor() {
152+
this.signal = new AbortSignal();
153+
}
154+
155+
abort() {
156+
const error = new AbortError('The request was aborted');
157+
this.signal.emit('abort', error);
158+
}
159+
}
160+
161+
const locks = new LockManager();
162+
163+
const receiveMessage = message => {
164+
const { kind, name, mode, buffer } = message;
165+
if (kind === 'create') {
166+
const lock = new Lock(name, mode, buffer);
167+
locks.collection.set(name, lock);
168+
} else if (kind === 'leave') {
169+
for (const lock of locks.collection) {
170+
if (lock.name === name && lock.trying) {
171+
lock.tryEnter();
172+
}
173+
}
174+
}
175+
};
176+
177+
if (!isMainThread) {
178+
parentPort.on('message', receiveMessage);
179+
}
180+
181+
class Thread {
182+
constructor(filename, options) {
183+
const worker = new Worker(filename, options);
184+
this.worker = worker;
185+
threads.add(this);
186+
worker.on('message', message => {
187+
for (const thread of threads) {
188+
if (thread.worker !== worker) {
189+
thread.worker.postMessage(message);
190+
}
191+
}
192+
receiveMessage(message);
193+
});
194+
}
195+
}
196+
197+
module.exports = { locks, Thread, AbortController };

metasync.js

+4
Original file line numberDiff line numberDiff line change
@@ -21,5 +21,9 @@ if (nodeVerion >= 10) {
2121
submodules.push(require('./lib/async-iterator'));
2222
}
2323

24+
if (nodeVerion >= 11) {
25+
submodules.push(require('./lib/locks'));
26+
}
27+
2428
const { compose } = submodules[0];
2529
module.exports = Object.assign(compose, ...submodules);

0 commit comments

Comments
 (0)