Skip to content

Commit 465e50b

Browse files
committed
Implement Lock and LockManager
Refs: #416
1 parent 161590c commit 465e50b

File tree

1 file changed

+94
-33
lines changed

1 file changed

+94
-33
lines changed

lib/locks.js

+94-33
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
const { Worker, isMainThread, parentPort } = require('worker_threads');
44

55
const threads = new Set();
6+
const resources = new Map();
67

78
const LOCKED = 0;
89
const UNLOCKED = 1;
@@ -17,62 +18,95 @@ const sendMessage = message => {
1718
}
1819
};
1920

21+
class Lock {
22+
constructor(name, options, callback) {
23+
if (typeof options === 'function') {
24+
callback = options;
25+
options = {};
26+
}
27+
const { mode, ifAvailable, steal } = options;
28+
this.name = name;
29+
this.mode = mode || 'exclusive';
30+
this.ifAvailable = ifAvailable || false;
31+
this.steal = steal || false;
32+
this.callback = callback;
33+
}
34+
}
35+
2036
class Mutex {
21-
constructor(resourceName, shared, initial = false) {
22-
this.resourceName = resourceName;
23-
this.lock = new Int32Array(shared, 0, 1);
24-
if (initial) Atomics.store(this.lock, 0, UNLOCKED);
37+
constructor(resourceName, buffer, initial = false) {
38+
this.name = resourceName;
39+
this.flag = new Int32Array(buffer, 0, 1);
40+
if (initial) Atomics.store(this.flag, 0, UNLOCKED);
2541
this.owner = false;
2642
this.trying = false;
27-
this.callback = null;
43+
this.queue = [];
44+
this.current = null;
2845
}
2946

30-
enter(callback) {
31-
this.callback = callback;
47+
enter(lock) {
48+
this.queue.push(lock);
3249
this.trying = true;
33-
this.tryEnter();
50+
return this.tryEnter();
3451
}
3552

3653
tryEnter() {
37-
if (!this.callback) return;
38-
const prev = Atomics.exchange(this.lock, 0, LOCKED);
39-
if (prev === UNLOCKED) {
40-
this.owner = true;
41-
this.trying = false;
42-
this.callback(this).then(() => {
43-
this.leave();
44-
});
45-
this.callback = null;
46-
}
54+
if (this.queue.length === 0) return;
55+
const prev = Atomics.exchange(this.flag, 0, LOCKED);
56+
if (prev === LOCKED) return;
57+
this.owner = true;
58+
this.trying = false;
59+
const lock = this.queue.shift();
60+
this.current = lock;
61+
return lock.callback(lock).then(() => {
62+
this.leave();
63+
});
64+
}
65+
66+
enterIfAvailable(lock) {
67+
if (this.owner) return lock.callback();
68+
const prev = Atomics.exchange(this.flag, 0, LOCKED);
69+
if (prev === LOCKED) return lock.callback();
70+
this.owner = true;
71+
this.trying = false;
72+
this.current = lock;
73+
return lock.callback(lock).then(() => {
74+
this.leave();
75+
});
4776
}
4877

4978
leave() {
5079
if (!this.owner) return;
51-
Atomics.store(this.lock, 0, UNLOCKED);
80+
Atomics.store(this.flag, 0, UNLOCKED);
5281
this.owner = false;
53-
sendMessage({ kind: 'leave', resourceName: this.resourceName });
82+
this.current = null;
83+
sendMessage({ kind: 'leave', resourceName: this.name });
84+
this.tryEnter();
5485
}
5586
}
5687

57-
const resources = new Map();
58-
59-
const request = (resourceName, callback) => {
60-
let lock = resources.get(resourceName);
61-
if (!lock) {
88+
const request = (resourceName, options, callback) => {
89+
const lock = new Lock(resourceName, options, callback);
90+
let mutex = resources.get(resourceName);
91+
if (!mutex) {
6292
const buffer = new SharedArrayBuffer(4);
63-
lock = new Mutex(resourceName, buffer, true);
64-
resources.set(resourceName, lock);
93+
mutex = new Mutex(resourceName, buffer, true);
94+
resources.set(resourceName, mutex);
6595
sendMessage({ kind: 'create', resourceName, buffer });
6696
}
67-
lock.enter(callback);
68-
return lock;
97+
if (lock.ifAvailable) return mutex.enterIfAvailable(lock);
98+
return mutex.enter(lock);
6999
};
70100

71101
const receiveMessage = message => {
72102
const { kind, resourceName, buffer } = message;
73103
if (kind === 'create') {
74-
const lock = new Mutex(resourceName, buffer);
75-
resources.set(resourceName, lock);
104+
const mutex = new Mutex(resourceName, buffer);
105+
resources.set(resourceName, mutex);
106+
} else if (kind === 'leave') {
107+
for (const mutex of resources) {
108+
if (mutex.trying) mutex.tryEnter();
109+
}
76110
}
77111
};
78112

@@ -96,6 +130,33 @@ class Thread {
96130
}
97131
}
98132

99-
const locks = { resources, request, sendMessage, receiveMessage, Thread };
133+
class LockManagerSnapshot {
134+
constructor() {
135+
const held = [];
136+
const pending = [];
137+
this.held = held;
138+
this.pending = pending;
139+
140+
for (const mutex of resources) {
141+
if (mutex.queue.length > 0) {
142+
pending.push(...mutex.queue);
143+
}
144+
if (mutex.current) {
145+
held.push(mutex.current);
146+
}
147+
}
148+
}
149+
}
150+
151+
class LockManager {
152+
constructor() {
153+
this.request = request;
154+
this.Thread = Thread;
155+
}
156+
query() {
157+
const snapshot = new LockManagerSnapshot();
158+
return Promise.resolve(snapshot);
159+
}
160+
}
100161

101-
module.exports = { locks };
162+
module.exports = { locks: new LockManager() };

0 commit comments

Comments
 (0)