Skip to content

Commit 48e9bc0

Browse files
authored
feat: add table lock (#60)
* add: paritions filter in get table * update: test * add: table lock * update: test * add: test * update: test * add: islocked method
1 parent 6465650 commit 48e9bc0

File tree

7 files changed

+185
-21
lines changed

7 files changed

+185
-21
lines changed

meerkat-dbm/src/dbm/dbm.spec.ts

+65-2
Original file line numberDiff line numberDiff line change
@@ -109,8 +109,8 @@ export class MockFileManager implements FileManagerType {
109109
return data;
110110
}
111111

112-
async getTableData(tableName: string): Promise<Table | undefined> {
113-
return this.tables[tableName];
112+
async getTableData(table: TableConfig): Promise<Table | undefined> {
113+
return this.tables[table.name];
114114
}
115115

116116
async setTableMetadata(table: string, metadata: object): Promise<void> {
@@ -490,4 +490,67 @@ describe('DBM', () => {
490490
expect(promises[1].status).toBe('rejected');
491491
});
492492
});
493+
494+
describe('table locks', () => {
495+
it('should lock the table and release it', async () => {
496+
const tableName = 'exampleTable';
497+
498+
// Request the lock for the table and then release it
499+
await dbm.lockTables([tableName]);
500+
501+
expect(dbm.isTableLocked(tableName)).toBe(true);
502+
503+
await dbm.unlockTables([tableName]);
504+
505+
expect(dbm.isTableLocked(tableName)).toBe(false);
506+
507+
// Again request the lock for the table
508+
await dbm.lockTables([tableName]);
509+
510+
await dbm.unlockTables([tableName]);
511+
});
512+
513+
it('two consumers requesting lock for the same table', async () => {
514+
const tableName = 'exampleTable';
515+
516+
// Set up promises for the two consumers
517+
const consumer1Promise = dbm.lockTables([tableName]);
518+
const consumer2Promise = dbm.lockTables([tableName]);
519+
520+
// Wait for the first consumer to get the lock
521+
await expect(consumer1Promise).resolves.toBeUndefined();
522+
523+
expect(dbm.isTableLocked(tableName)).toBe(true);
524+
525+
const timeout1 = new Promise((resolve) => {
526+
setTimeout(resolve, 1000, 'TIMEOUT');
527+
});
528+
529+
// Promise.race will wait for either the promises be resolved
530+
// consumer2 will not be able to get the lock as it is already locked by consumer1
531+
await expect(Promise.race([consumer2Promise, timeout1])).resolves.toBe(
532+
'TIMEOUT'
533+
);
534+
535+
// Release the lock for the first consumer
536+
await dbm.unlockTables([tableName]);
537+
538+
// Check if the table is still locked as the consumer2 will get the lock
539+
expect(dbm.isTableLocked(tableName)).toBe(true);
540+
541+
const timeout2 = new Promise((resolve) => {
542+
setTimeout(resolve, 1000, 'TIMEOUT');
543+
});
544+
545+
// This time the consumer2 will get the lock
546+
await expect(
547+
Promise.race([consumer2Promise, timeout2])
548+
).resolves.toBeUndefined();
549+
550+
// Release the lock
551+
await dbm.unlockTables([tableName]);
552+
553+
expect(dbm.isTableLocked(tableName)).toBe(false);
554+
});
555+
});
493556
});

meerkat-dbm/src/dbm/dbm.ts

+74
Original file line numberDiff line numberDiff line change
@@ -9,13 +9,15 @@ import {
99
QueryOptions,
1010
QueryQueueItem,
1111
TableConfig,
12+
TableLock,
1213
} from './types';
1314

1415
export class DBM {
1516
private fileManager: FileManagerType;
1617
private instanceManager: InstanceManagerType;
1718
private connection: AsyncDuckDBConnection | null = null;
1819
private queriesQueue: QueryQueueItem[] = [];
20+
private tableLockRegistry: Record<string, TableLock> = {};
1921

2022
private logger: DBMLogger;
2123
private onEvent?: (event: DBMEvent) => void;
@@ -99,6 +101,66 @@ export class DBM {
99101
return this.connection;
100102
}
101103

104+
async lockTables(tableNames: string[]): Promise<void> {
105+
const promises = [];
106+
107+
for (const tableName of tableNames) {
108+
const tableLock = this.tableLockRegistry[tableName];
109+
110+
// If the table lock doesn't exist, create a new lock
111+
if (!tableLock) {
112+
this.tableLockRegistry[tableName] = {
113+
isLocked: true,
114+
promiseQueue: [],
115+
};
116+
continue;
117+
}
118+
119+
// If the table is already locked, add the promise to the queue
120+
if (tableLock.isLocked) {
121+
const promise = new Promise<void>((resolve, reject) => {
122+
tableLock.promiseQueue.push({ reject, resolve });
123+
});
124+
promises.push(promise);
125+
}
126+
127+
// Set the table as locked
128+
tableLock.isLocked = true;
129+
}
130+
131+
// Wait for all promises to resolve (locks to be acquired)
132+
await Promise.all(promises);
133+
}
134+
135+
async unlockTables(tableNames: string[]): Promise<void> {
136+
for (const tableName of tableNames) {
137+
const tableLock = this.tableLockRegistry[tableName];
138+
139+
// If the table lock doesn't exist, create a new lock
140+
if (!tableLock) {
141+
this.tableLockRegistry[tableName] = {
142+
isLocked: false,
143+
promiseQueue: [],
144+
};
145+
}
146+
147+
const nextPromiseInQueue = tableLock?.promiseQueue?.shift();
148+
149+
// If there is a promise in the queue, resolve it and keep the table as locked
150+
if (nextPromiseInQueue) {
151+
tableLock.isLocked = true;
152+
nextPromiseInQueue.resolve();
153+
} else {
154+
// If there are no promises in the queue, set the table as unlocked
155+
tableLock.isLocked = false;
156+
}
157+
}
158+
}
159+
160+
isTableLocked(tableName: string): boolean {
161+
return this.tableLockRegistry[tableName]?.isLocked ?? false;
162+
}
163+
102164
private async _queryWithTables(
103165
query: string,
104166
tables: TableConfig[],
@@ -199,6 +261,11 @@ export class DBM {
199261
}
200262

201263
try {
264+
/**
265+
* Lock the tables
266+
*/
267+
this.lockTables(this.currentQueryItem.tables.map((table) => table.name));
268+
202269
const startTime = Date.now();
203270
this.logger.debug(
204271
'Time since query was added to the queue:',
@@ -242,6 +309,13 @@ export class DBM {
242309
* Reject the promise, so the caller can catch the error
243310
*/
244311
this.currentQueryItem?.promise.reject(error);
312+
} finally {
313+
/**
314+
* Unlock the tables
315+
*/
316+
this.unlockTables(
317+
this.currentQueryItem.tables.map((table) => table.name)
318+
);
245319
}
246320

247321
/**

meerkat-dbm/src/dbm/types.ts

+8
Original file line numberDiff line numberDiff line change
@@ -101,3 +101,11 @@ export interface QueryQueueItem {
101101
connectionId: string;
102102
options?: QueryOptions;
103103
}
104+
105+
export interface TableLock {
106+
promiseQueue: {
107+
resolve: () => void;
108+
reject: () => void;
109+
}[];
110+
isLocked: boolean;
111+
}

meerkat-dbm/src/file-manager/__tests__/indexed-db-file-manager.spec.ts

+28-14
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ const mockDB = {
1919
};
2020
}
2121
};
22+
2223
describe('IndexedDBFileManager', () => {
2324
let fileManager: IndexedDBFileManager;
2425
let db: AsyncDuckDB;
@@ -29,22 +30,23 @@ describe('IndexedDBFileManager', () => {
2930
tableName: 'taxi1',
3031
fileName: 'taxi1.parquet',
3132
buffer: new Uint8Array([1, 2, 3]),
32-
fileType: FILE_TYPES.PARQUET
33+
fileType: FILE_TYPES.PARQUET,
34+
partitions: ['customer_oid=1'],
3335
};
3436

3537
const fileBuffers = [
3638
{
3739
tableName: 'taxi1',
3840
fileName: 'taxi2.parquet',
3941
buffer: new Uint8Array([1, 2, 3, 4]),
40-
fileType: FILE_TYPES.PARQUET
42+
fileType: FILE_TYPES.PARQUET,
4143
},
4244
{
4345
tableName: 'taxi2',
4446
fileName: 'taxi3.parquet',
4547
buffer: new Uint8Array([1, 2, 3, 4, 5]),
46-
fileType: FILE_TYPES.PARQUET
47-
}
48+
fileType: FILE_TYPES.PARQUET,
49+
},
4850
];
4951

5052
beforeAll(() => {
@@ -57,7 +59,7 @@ describe('IndexedDBFileManager', () => {
5759
},
5860
terminateDB: async () => {
5961
return;
60-
}
62+
},
6163
};
6264
});
6365

@@ -71,7 +73,7 @@ describe('IndexedDBFileManager', () => {
7173
logger: log,
7274
onEvent: (event) => {
7375
console.log(event);
74-
}
76+
},
7577
});
7678

7779
await fileManager.initializeDB();
@@ -91,7 +93,13 @@ describe('IndexedDBFileManager', () => {
9193
expect(tableData1.length).toBe(1);
9294
expect(tableData1[0]).toEqual({
9395
tableName: 'taxi1',
94-
files: [{ fileName: fileBuffer.fileName, fileType: FILE_TYPES.PARQUET }]
96+
files: [
97+
{
98+
fileName: fileBuffer.fileName,
99+
fileType: FILE_TYPES.PARQUET,
100+
partitions: fileBuffer.partitions,
101+
},
102+
],
95103
});
96104

97105
/**
@@ -112,7 +120,7 @@ describe('IndexedDBFileManager', () => {
112120
expect(tableData2.length).toBe(2);
113121
expect(tableData2[0].files.map((file) => file.fileName)).toEqual([
114122
'taxi1.parquet',
115-
'taxi2.parquet'
123+
'taxi2.parquet',
116124
]);
117125

118126
/**
@@ -121,7 +129,7 @@ describe('IndexedDBFileManager', () => {
121129
expect(fileBufferData2.map((file) => file.fileName)).toEqual([
122130
'taxi1.parquet',
123131
'taxi2.parquet',
124-
'taxi3.parquet'
132+
'taxi3.parquet',
125133
]);
126134
});
127135

@@ -144,7 +152,7 @@ describe('IndexedDBFileManager', () => {
144152
// Register the same file with a different buffer
145153
await fileManager.registerFileBuffer({
146154
...fileBuffer,
147-
buffer: new Uint8Array([1])
155+
buffer: new Uint8Array([1]),
148156
});
149157

150158
const fileBufferData2 = await indexedDB.files.toArray();
@@ -156,14 +164,20 @@ describe('IndexedDBFileManager', () => {
156164
});
157165

158166
it('should return the table data', async () => {
159-
const fileData = await fileManager.getTableData('taxi1');
167+
const fileData = await fileManager.getTableData({
168+
name: 'taxi1',
169+
partitions: fileBuffer.partitions,
170+
});
160171

161172
expect(fileData).toEqual({
162173
files: [
163-
{ fileName: 'taxi1.parquet', fileType: 'parquet' },
164-
{ fileName: 'taxi2.parquet', fileType: 'parquet' }
174+
{
175+
fileName: 'taxi1.parquet',
176+
fileType: 'parquet',
177+
partitions: fileBuffer.partitions,
178+
},
165179
],
166-
tableName: 'taxi1'
180+
tableName: 'taxi1',
167181
});
168182
});
169183

meerkat-dbm/src/file-manager/file-manager-type.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -91,7 +91,7 @@ export interface FileManagerType {
9191
* @param tableName - The name of the table.
9292
* @returns Table object if found.
9393
*/
94-
getTableData: (tableName: string) => Promise<Table | undefined>;
94+
getTableData: (table: TableConfig) => Promise<Table | undefined>;
9595

9696
/**
9797
* @description

meerkat-dbm/src/file-manager/indexed-db/indexed-db-file-manager.ts

+8-3
Original file line numberDiff line numberDiff line change
@@ -264,10 +264,15 @@ export class IndexedDBFileManager implements FileManagerType {
264264
await Promise.all(promises);
265265
}
266266

267-
async getTableData(tableName: string): Promise<Table | undefined> {
268-
const tableData = await this.indexedDB.tablesKey.get(tableName);
267+
async getTableData(table: TableConfig): Promise<Table | undefined> {
268+
const tableData = await this.indexedDB.tablesKey.get(table.name);
269+
270+
if (!tableData) return undefined;
269271

270-
return tableData;
272+
return {
273+
...tableData,
274+
files: getFilesByPartition(tableData?.files ?? [], table.partitions),
275+
};
271276
}
272277

273278
async setTableMetadata(tableName: string, metadata: object): Promise<void> {

meerkat-dbm/src/file-manager/memory/memory-file-manager.ts

+1-1
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ export class MemoryDBFileManager implements FileManagerType {
9393
return [];
9494
}
9595

96-
async getTableData(tableName: string): Promise<Table | undefined> {
96+
async getTableData(table: TableConfig): Promise<Table | undefined> {
9797
// not needed for memory file manager
9898
return;
9999
}

0 commit comments

Comments
 (0)