Skip to content

Commit 7b88fb0

Browse files
authored
add: multi read table lock manager (#153)
1 parent 640af75 commit 7b88fb0

File tree

7 files changed

+266
-84
lines changed

7 files changed

+266
-84
lines changed

meerkat-browser-runner/src/app/app.tsx

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,7 @@ import {
66
FileManagerType,
77
getMainAppName,
88
getRunnerAppName,
9-
RunnerMemoryDBFileManager,
9+
RunnerIndexedDBFileManager,
1010
WindowCommunication,
1111
} from '@devrev/meerkat-dbm';
1212

@@ -72,7 +72,7 @@ export function App() {
7272
}
7373

7474
if (!fileManagerRef.current) {
75-
fileManagerRef.current = new RunnerMemoryDBFileManager({
75+
fileManagerRef.current = new RunnerIndexedDBFileManager({
7676
instanceManager: instanceManagerRef.current,
7777
fetchTableFileBuffers: async () => [],
7878
logger: log,
@@ -82,7 +82,6 @@ export function App() {
8282
payload: event,
8383
});
8484
},
85-
communication: communicationRef.current,
8685
});
8786
}
8887

meerkat-dbm/package.json

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,6 @@
11
{
22
"name": "@devrev/meerkat-dbm",
3-
"version": "0.1.32",
3+
"version": "0.1.33",
44
"dependencies": {
55
"tslib": "^2.3.0",
66
"@duckdb/duckdb-wasm": "1.28.1-dev258.0",

meerkat-dbm/src/dbm/__test__/dbm-parallel.spec.ts

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -156,4 +156,36 @@ describe('DBMParallel', () => {
156156

157157
jest.useRealTimers();
158158
});
159+
160+
it('should wait for a write lock to be released before executing a query', async () => {
161+
const tableName = 'lockedTable';
162+
let queryExecuted = false;
163+
164+
// Mock sendRequest to track execution
165+
runnerMock.communication.sendRequest.mockImplementation(() => {
166+
queryExecuted = true;
167+
return Promise.resolve({
168+
message: { isError: false, data: [{ data: 1 }] },
169+
});
170+
});
171+
172+
// Acquire a write lock
173+
await dbmParallel.lockTables([tableName], 'write');
174+
175+
// Execute a query that requires a read lock on the same table
176+
const queryPromise = dbmParallel.queryWithTables({
177+
query: `SELECT * FROM ${tableName}`,
178+
tables: [{ name: tableName }],
179+
});
180+
181+
// The query should not have been executed yet because of the write lock
182+
expect(queryExecuted).toBe(false);
183+
184+
// Release the write lock
185+
dbmParallel.unlockTables([tableName], 'write');
186+
187+
// Now the query should be able to execute
188+
await queryPromise;
189+
expect(queryExecuted).toBe(true);
190+
});
159191
});
Lines changed: 134 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -1,70 +1,165 @@
11
import { TableLockManager } from '../table-lock-manager';
22

3-
describe('Table Lock Manager', () => {
3+
const delay = (ms: number) => new Promise((resolve) => setTimeout(resolve, ms));
4+
5+
describe('TableLockManager', () => {
46
let tableLockManager: TableLockManager;
7+
const tableName = 'testTable';
58

69
beforeEach(() => {
710
tableLockManager = new TableLockManager();
811
});
912

10-
it('should lock the table and release it', async () => {
11-
const tableName = 'exampleTable';
13+
describe('write locks', () => {
14+
it('should acquire and release a write lock', async () => {
15+
await tableLockManager.lockTables([tableName], 'write');
16+
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
17+
18+
tableLockManager.unlockTables([tableName], 'write');
19+
expect(tableLockManager.isTableLocked(tableName)).toBe(false);
20+
});
21+
22+
it('a second writer should wait for the first writer to release the lock', async () => {
23+
let writer1Finished = false;
24+
const writer1 = async () => {
25+
await tableLockManager.lockTables([tableName], 'write');
26+
27+
await delay(50); // Simulate work
28+
29+
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
30+
tableLockManager.unlockTables([tableName], 'write');
31+
writer1Finished = true;
32+
};
1233

13-
// Request the lock for the table and then release it
14-
await tableLockManager.lockTables([tableName]);
34+
let writer2Finished = false;
35+
const writer2 = async () => {
36+
await tableLockManager.lockTables([tableName], 'write');
1537

16-
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
38+
// When writer2 gets the lock, writer1 should be finished.
39+
expect(writer1Finished).toBe(true);
1740

18-
await tableLockManager.unlockTables([tableName]);
41+
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
42+
tableLockManager.unlockTables([tableName], 'write');
43+
writer2Finished = true;
44+
};
1945

20-
expect(tableLockManager.isTableLocked(tableName)).toBe(false);
46+
await Promise.all([writer1(), writer2()]);
2147

22-
// Again request the lock for the table
23-
await tableLockManager.lockTables([tableName]);
48+
expect(writer1Finished).toBe(true);
49+
expect(writer2Finished).toBe(true);
2450

25-
await tableLockManager.unlockTables([tableName]);
51+
expect(tableLockManager.isTableLocked(tableName)).toBe(false);
52+
});
2653
});
2754

28-
it('two consumers requesting lock for the same table', async () => {
29-
const tableName = 'exampleTable';
55+
describe('read locks', () => {
56+
it('should allow multiple readers to acquire a lock simultaneously', async () => {
57+
const readerPromises = [
58+
tableLockManager.lockTables([tableName], 'read'),
59+
tableLockManager.lockTables([tableName], 'read'),
60+
tableLockManager.lockTables([tableName], 'read'),
61+
];
62+
63+
await Promise.all(readerPromises);
64+
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
65+
66+
tableLockManager.unlockTables([tableName], 'read');
67+
tableLockManager.unlockTables([tableName], 'read');
68+
tableLockManager.unlockTables([tableName], 'read');
69+
expect(tableLockManager.isTableLocked(tableName)).toBe(false);
70+
});
71+
});
72+
73+
describe('mixed read/write locks', () => {
74+
it('should not allow a writer if readers have the lock', async () => {
75+
await tableLockManager.lockTables([tableName], 'read');
76+
77+
let writerAcquiredLock = false;
78+
const writerPromise = tableLockManager
79+
.lockTables([tableName], 'write')
80+
.then(() => {
81+
writerAcquiredLock = true;
82+
});
3083

31-
// Set up promises for the two consumers
32-
const consumer1Promise = tableLockManager.lockTables([tableName]);
33-
const consumer2Promise = tableLockManager.lockTables([tableName]);
84+
await delay(10); // Give writer time to wait
85+
expect(writerAcquiredLock).toBe(false);
86+
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
3487

35-
// Wait for the first consumer to get the lock
36-
await expect(consumer1Promise).resolves.toBeUndefined();
88+
// Reader releases the lock
89+
tableLockManager.unlockTables([tableName], 'read');
3790

38-
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
91+
await writerPromise;
92+
expect(writerAcquiredLock).toBe(true);
93+
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
3994

40-
const timeout1 = new Promise((resolve) => {
41-
setTimeout(resolve, 1000, 'TIMEOUT');
95+
tableLockManager.unlockTables([tableName], 'write');
96+
expect(tableLockManager.isTableLocked(tableName)).toBe(false);
4297
});
4398

44-
// Promise.race will wait for either the promises be resolved
45-
// consumer2 will not be able to get the lock as it is already locked by consumer1
46-
await expect(Promise.race([consumer2Promise, timeout1])).resolves.toBe(
47-
'TIMEOUT'
48-
);
99+
it('should not allow a reader if a writer has the lock', async () => {
100+
await tableLockManager.lockTables([tableName], 'write');
49101

50-
// Release the lock for the first consumer
51-
await tableLockManager.unlockTables([tableName]);
102+
let readerAcquiredLock = false;
103+
const readerPromise = tableLockManager
104+
.lockTables([tableName], 'read')
105+
.then(() => {
106+
readerAcquiredLock = true;
107+
});
52108

53-
// Check if the table is still locked as the consumer2 will get the lock
54-
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
109+
await delay(10);
110+
expect(readerAcquiredLock).toBe(false);
111+
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
55112

56-
const timeout2 = new Promise((resolve) => {
57-
setTimeout(resolve, 1000, 'TIMEOUT');
113+
// Writer releases lock
114+
tableLockManager.unlockTables([tableName], 'write');
115+
116+
await readerPromise;
117+
expect(readerAcquiredLock).toBe(true);
118+
119+
tableLockManager.unlockTables([tableName], 'read');
120+
expect(tableLockManager.isTableLocked(tableName)).toBe(false);
58121
});
59122

60-
// This time the consumer2 will get the lock
61-
await expect(
62-
Promise.race([consumer2Promise, timeout2])
63-
).resolves.toBeUndefined();
123+
it('should prioritize waiting readers over a new writer', async () => {
124+
// Writer1 acquires the lock
125+
await tableLockManager.lockTables([tableName], 'write');
126+
127+
// Readers start waiting
128+
const readerPromises = [
129+
tableLockManager.lockTables([tableName], 'read'),
130+
tableLockManager.lockTables([tableName], 'read'),
131+
];
132+
133+
// Writer2 starts waiting
134+
const writer2Promise = tableLockManager.lockTables([tableName], 'write');
135+
136+
// Release writer1's lock
137+
tableLockManager.unlockTables([tableName], 'write');
64138

65-
// Release the lock
66-
await tableLockManager.unlockTables([tableName]);
139+
// The waiting readers should get the lock next
140+
await Promise.all(readerPromises);
141+
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
67142

68-
expect(tableLockManager.isTableLocked(tableName)).toBe(false);
143+
let writer2AcquiredLock = false;
144+
writer2Promise.then(() => {
145+
writer2AcquiredLock = true;
146+
});
147+
148+
await delay(10);
149+
// Writer2 should still be waiting
150+
expect(writer2AcquiredLock).toBe(false);
151+
152+
// Readers release locks
153+
tableLockManager.unlockTables([tableName], 'read');
154+
tableLockManager.unlockTables([tableName], 'read');
155+
156+
// Now writer2 should get the lock
157+
await writer2Promise;
158+
expect(writer2AcquiredLock).toBe(true);
159+
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
160+
161+
tableLockManager.unlockTables([tableName], 'write');
162+
expect(tableLockManager.isTableLocked(tableName)).toBe(false);
163+
});
69164
});
70165
});

meerkat-dbm/src/dbm/dbm-parallel/dbm-parallel.ts

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -128,6 +128,14 @@ export class DBMParallel extends TableLockManager {
128128
throw new Error('No runner found');
129129
}
130130

131+
/**
132+
* Lock the tables
133+
*/
134+
await this.lockTables(
135+
tables.map((table) => table.name),
136+
'read'
137+
);
138+
131139
const response =
132140
await runner.communication.sendRequest<BrowserRunnerExecQueryMessageResponse>(
133141
{
@@ -143,6 +151,14 @@ export class DBMParallel extends TableLockManager {
143151
const end = performance.now();
144152
this.logger.info(`Time to execute by DBM Parallel`, end - start);
145153

154+
/**
155+
* Unlock the tables
156+
*/
157+
this.unlockTables(
158+
tables.map((table) => table.name),
159+
'read'
160+
);
161+
146162
/**
147163
* The implementation is based on postMessage API, so we don't have the ability to throw an error from the runner
148164
* We have to check the response and throw an error if isError is true

0 commit comments

Comments
 (0)