Skip to content

Commit e4c51e3

Browse files
authored
refactor: dbm and file manager (#124)
* refactor: dbm * fix: test * remove: log * fix: test
1 parent fd2e373 commit e4c51e3

24 files changed

+310
-345
lines changed

.gitignore

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -43,3 +43,7 @@ Thumbs.db
4343

4444
# NX
4545
.nx
46+
47+
48+
# Parquet data
49+
/electron-app/src/data

benchmarking/src/app/benchmarking-tests/dbm-benchmarking.spec.ts

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,9 @@ describe('Benchmarking DBMs', () => {
2020

2121
browser = await puppeteer.launch({
2222
headless: 'new',
23+
args: ['--no-sandbox', '--disable-setuid-sandbox'],
2324
});
25+
2426
page = await browser.newPage();
2527

2628
//Wait for the server to start by visiting the page

benchmarking/src/app/dbm-context/parallel-memory-dbm-context.tsx

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,7 @@ export const ParallelMemoryDBMProvider = ({
1515
}: {
1616
children: JSX.Element;
1717
}) => {
18-
const [dbm, setdbm] = useState<DBMParallel<SharedArrayBuffer> | null>(null);
18+
const [dbm, setdbm] = useState<DBMParallel | null>(null);
1919
const instanceManagerRef = useRef<InstanceManager>(new InstanceManager());
2020
const fileManagerRef = useRef<ParallelMemoryFileManager>(
2121
new ParallelMemoryFileManager({

benchmarking/src/app/hooks/dbm-context.tsx

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,17 @@
11
import { DBM, DBMParallel, FileManagerType } from '@devrev/meerkat-dbm';
22
import React from 'react';
33

4-
export type DBMContextType<T> = {
5-
dbm: DBM | DBMParallel<T>;
4+
export type DBMContextType = {
5+
dbm: DBM | DBMParallel;
66
fileManager: FileManagerType;
77
};
88

9-
export const DBMContext = React.createContext<DBMContextType<any> | undefined>(
9+
export const DBMContext = React.createContext<DBMContextType | undefined>(
1010
undefined
1111
);
1212

13-
export const useDBM = <T,>() => {
14-
const context = React.useContext(DBMContext) as DBMContextType<T> | undefined;
13+
export const useDBM = () => {
14+
const context = React.useContext(DBMContext) as DBMContextType | undefined;
1515
if (context === undefined) {
1616
throw new Error('useDBM must be used within a DBMProvider');
1717
}

meerkat-dbm/src/dbm/__test__/dbm-parallel.spec.ts

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -37,8 +37,8 @@ iFrameRunnerManager.iFrameManagers.set('1', runnerMock as any);
3737
iFrameRunnerManager.iFrameManagers.set('2', runnerMock as any);
3838

3939
describe('DBMParallel', () => {
40-
let dbmParallel: DBMParallel<SharedArrayBuffer>;
41-
let fileManager: FileManagerType<SharedArrayBuffer>;
40+
let dbmParallel: DBMParallel;
41+
let fileManager: FileManagerType;
4242
let instanceManager: InstanceManager;
4343

4444
beforeAll(async () => {

meerkat-dbm/src/dbm/__test__/dbm.spec.ts

Lines changed: 5 additions & 76 deletions
Original file line numberDiff line numberDiff line change
@@ -9,11 +9,11 @@ import { DBM } from '../dbm';
99
import { DBMConstructorOptions, TableConfig } from '../types';
1010
import { InstanceManager } from './mock';
1111

12-
export class MockFileManager<T> implements FileManagerType<T> {
13-
private fileBufferStore: Record<string, FileBufferStore<T>> = {};
12+
export class MockFileManager implements FileManagerType {
13+
private fileBufferStore: Record<string, FileBufferStore> = {};
1414
private tables: Record<string, Table> = {};
1515

16-
async bulkRegisterFileBuffer(props: FileBufferStore<T>[]): Promise<void> {
16+
async bulkRegisterFileBuffer(props: FileBufferStore[]): Promise<void> {
1717
for (const prop of props) {
1818
this.fileBufferStore[prop.fileName] = prop;
1919
this.tables[prop.tableName] = this.tables[prop.tableName] || {
@@ -23,7 +23,7 @@ export class MockFileManager<T> implements FileManagerType<T> {
2323
}
2424
}
2525

26-
async registerFileBuffer(prop: FileBufferStore<T>): Promise<void> {
26+
async registerFileBuffer(prop: FileBufferStore): Promise<void> {
2727
this.fileBufferStore[prop.fileName] = prop;
2828
this.tables[prop.tableName] = this.tables[prop.tableName] || { files: [] };
2929
this.tables[prop.tableName].files.push(prop);
@@ -40,18 +40,10 @@ export class MockFileManager<T> implements FileManagerType<T> {
4040

4141
this.registerFileBuffer({
4242
...fileData,
43-
buffer: [] as T,
43+
buffer: new Uint8Array(),
4444
});
4545
}
4646

47-
async getFileBuffer(name: string): Promise<T> {
48-
const fileBuffer = this.fileBufferStore[name];
49-
if (!fileBuffer) {
50-
throw new Error(`File buffer for ${name} not found`);
51-
}
52-
return fileBuffer.buffer;
53-
}
54-
5547
async mountFileBufferByTables(tables: TableConfig[]): Promise<void> {
5648
const tableNames = tables.map((table) => table.name);
5749
for (const tableName of tableNames) {
@@ -462,69 +454,6 @@ describe('DBM', () => {
462454
});
463455
});
464456

465-
describe('table locks', () => {
466-
it('should lock the table and release it', async () => {
467-
const tableName = 'exampleTable';
468-
469-
// Request the lock for the table and then release it
470-
await dbm.lockTables([tableName]);
471-
472-
expect(dbm.isTableLocked(tableName)).toBe(true);
473-
474-
await dbm.unlockTables([tableName]);
475-
476-
expect(dbm.isTableLocked(tableName)).toBe(false);
477-
478-
// Again request the lock for the table
479-
await dbm.lockTables([tableName]);
480-
481-
await dbm.unlockTables([tableName]);
482-
});
483-
484-
it('two consumers requesting lock for the same table', async () => {
485-
const tableName = 'exampleTable';
486-
487-
// Set up promises for the two consumers
488-
const consumer1Promise = dbm.lockTables([tableName]);
489-
const consumer2Promise = dbm.lockTables([tableName]);
490-
491-
// Wait for the first consumer to get the lock
492-
await expect(consumer1Promise).resolves.toBeUndefined();
493-
494-
expect(dbm.isTableLocked(tableName)).toBe(true);
495-
496-
const timeout1 = new Promise((resolve) => {
497-
setTimeout(resolve, 1000, 'TIMEOUT');
498-
});
499-
500-
// Promise.race will wait for either the promises be resolved
501-
// consumer2 will not be able to get the lock as it is already locked by consumer1
502-
await expect(Promise.race([consumer2Promise, timeout1])).resolves.toBe(
503-
'TIMEOUT'
504-
);
505-
506-
// Release the lock for the first consumer
507-
await dbm.unlockTables([tableName]);
508-
509-
// Check if the table is still locked as the consumer2 will get the lock
510-
expect(dbm.isTableLocked(tableName)).toBe(true);
511-
512-
const timeout2 = new Promise((resolve) => {
513-
setTimeout(resolve, 1000, 'TIMEOUT');
514-
});
515-
516-
// This time the consumer2 will get the lock
517-
await expect(
518-
Promise.race([consumer2Promise, timeout2])
519-
).resolves.toBeUndefined();
520-
521-
// Release the lock
522-
await dbm.unlockTables([tableName]);
523-
524-
expect(dbm.isTableLocked(tableName)).toBe(false);
525-
});
526-
});
527-
528457
describe('create connection callback', () => {
529458
it('should call the create connection callback', async () => {
530459
await dbm.query('SELECT 1');
Lines changed: 70 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,70 @@
1+
import { TableLockManager } from '../table-lock-manager';
2+
3+
describe('Table Lock Manager', () => {
4+
let tableLockManager: TableLockManager;
5+
6+
beforeEach(() => {
7+
tableLockManager = new TableLockManager();
8+
});
9+
10+
it('should lock the table and release it', async () => {
11+
const tableName = 'exampleTable';
12+
13+
// Request the lock for the table and then release it
14+
await tableLockManager.lockTables([tableName]);
15+
16+
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
17+
18+
await tableLockManager.unlockTables([tableName]);
19+
20+
expect(tableLockManager.isTableLocked(tableName)).toBe(false);
21+
22+
// Again request the lock for the table
23+
await tableLockManager.lockTables([tableName]);
24+
25+
await tableLockManager.unlockTables([tableName]);
26+
});
27+
28+
it('two consumers requesting lock for the same table', async () => {
29+
const tableName = 'exampleTable';
30+
31+
// Set up promises for the two consumers
32+
const consumer1Promise = tableLockManager.lockTables([tableName]);
33+
const consumer2Promise = tableLockManager.lockTables([tableName]);
34+
35+
// Wait for the first consumer to get the lock
36+
await expect(consumer1Promise).resolves.toBeUndefined();
37+
38+
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
39+
40+
const timeout1 = new Promise((resolve) => {
41+
setTimeout(resolve, 1000, 'TIMEOUT');
42+
});
43+
44+
// Promise.race will wait for either the promises be resolved
45+
// consumer2 will not be able to get the lock as it is already locked by consumer1
46+
await expect(Promise.race([consumer2Promise, timeout1])).resolves.toBe(
47+
'TIMEOUT'
48+
);
49+
50+
// Release the lock for the first consumer
51+
await tableLockManager.unlockTables([tableName]);
52+
53+
// Check if the table is still locked as the consumer2 will get the lock
54+
expect(tableLockManager.isTableLocked(tableName)).toBe(true);
55+
56+
const timeout2 = new Promise((resolve) => {
57+
setTimeout(resolve, 1000, 'TIMEOUT');
58+
});
59+
60+
// This time the consumer2 will get the lock
61+
await expect(
62+
Promise.race([consumer2Promise, timeout2])
63+
).resolves.toBeUndefined();
64+
65+
// Release the lock
66+
await tableLockManager.unlockTables([tableName]);
67+
68+
expect(tableLockManager.isTableLocked(tableName)).toBe(false);
69+
});
70+
});

meerkat-dbm/src/dbm/dbm-parallel/dbm-parallel.ts

Lines changed: 7 additions & 70 deletions
Original file line numberDiff line numberDiff line change
@@ -4,12 +4,8 @@ import {
44
BROWSER_RUNNER_TYPE,
55
BrowserRunnerExecQueryMessageResponse,
66
} from '../../window-communication/runner-types';
7-
import {
8-
DBMConstructorOptions,
9-
QueryOptions,
10-
TableConfig,
11-
TableLock,
12-
} from '../types';
7+
import { TableLockManager } from '../table-lock-manager';
8+
import { DBMConstructorOptions, QueryOptions, TableConfig } from '../types';
139
import { IFrameRunnerManager } from './runner-manager';
1410

1511
//Round Robin for multiple runners like 10
@@ -21,10 +17,9 @@ const roundRobin = (counter: number, maxValue: number): number => {
2117
return counter + 1;
2218
};
2319

24-
export class DBMParallel<BufferType = Uint8Array> {
25-
private fileManager: FileManagerType<BufferType>;
20+
export class DBMParallel extends TableLockManager {
21+
private fileManager: FileManagerType;
2622
private logger: DBMLogger;
27-
private tableLockRegistry: Record<string, TableLock> = {};
2823

2924
private onEvent?: (event: DBMEvent) => void;
3025
private options: DBMConstructorOptions['options'];
@@ -42,9 +37,11 @@ export class DBMParallel<BufferType = Uint8Array> {
4237
instanceManager,
4338
onDuckDBShutdown,
4439
iFrameRunnerManager,
45-
}: DBMConstructorOptions<BufferType> & {
40+
}: DBMConstructorOptions & {
4641
iFrameRunnerManager: IFrameRunnerManager;
4742
}) {
43+
super();
44+
4845
this.fileManager = fileManager;
4946
this.logger = logger;
5047
this.onEvent = onEvent;
@@ -91,66 +88,6 @@ export class DBMParallel<BufferType = Uint8Array> {
9188
}, this.options.shutdownInactiveTime);
9289
}
9390

94-
async lockTables(tableNames: string[]): Promise<void> {
95-
const promises = [];
96-
97-
for (const tableName of tableNames) {
98-
const tableLock = this.tableLockRegistry[tableName];
99-
100-
// If the table lock doesn't exist, create a new lock
101-
if (!tableLock) {
102-
this.tableLockRegistry[tableName] = {
103-
isLocked: true,
104-
promiseQueue: [],
105-
};
106-
continue;
107-
}
108-
109-
// If the table is already locked, add the promise to the queue
110-
if (tableLock.isLocked) {
111-
const promise = new Promise<void>((resolve, reject) => {
112-
tableLock.promiseQueue.push({ reject, resolve });
113-
});
114-
promises.push(promise);
115-
}
116-
117-
// Set the table as locked
118-
tableLock.isLocked = true;
119-
}
120-
121-
// Wait for all promises to resolve (locks to be acquired)
122-
await Promise.all(promises);
123-
}
124-
125-
async unlockTables(tableNames: string[]): Promise<void> {
126-
for (const tableName of tableNames) {
127-
const tableLock = this.tableLockRegistry[tableName];
128-
129-
// If the table lock doesn't exist, create a new lock
130-
if (!tableLock) {
131-
this.tableLockRegistry[tableName] = {
132-
isLocked: false,
133-
promiseQueue: [],
134-
};
135-
}
136-
137-
const nextPromiseInQueue = tableLock?.promiseQueue?.shift();
138-
139-
// If there is a promise in the queue, resolve it and keep the table as locked
140-
if (nextPromiseInQueue) {
141-
tableLock.isLocked = true;
142-
nextPromiseInQueue.resolve();
143-
} else {
144-
// If there are no promises in the queue, set the table as unlocked
145-
tableLock.isLocked = false;
146-
}
147-
}
148-
}
149-
150-
isTableLocked(tableName: string): boolean {
151-
return this.tableLockRegistry[tableName]?.isLocked ?? false;
152-
}
153-
15491
public async queryWithTables({
15592
query,
15693
tables,

meerkat-dbm/src/dbm/dbm-parallel/runner-manager.ts

Lines changed: 2 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,7 @@ import { IFrameManager } from './iframe-manager';
1212
export interface IFrameRunnerManagerConstructor {
1313
runnerURL: string;
1414
origin: string;
15-
fetchTableFileBuffers: (
16-
tables: TableConfig[]
17-
) => Promise<FileBufferStore<SharedArrayBuffer>[]>;
15+
fetchTableFileBuffers: (tables: TableConfig[]) => Promise<FileBufferStore[]>;
1816
fetchPreQuery: (
1917
runnerId: string,
2018
tableWiseFiles: TableWiseFiles[]
@@ -47,7 +45,7 @@ export class IFrameRunnerManager {
4745

4846
private fetchTableFileBuffers: (
4947
tables: TableConfig[]
50-
) => Promise<FileBufferStore<SharedArrayBuffer>[]>;
48+
) => Promise<FileBufferStore[]>;
5149
private fetchPreQuery: (
5250
runnerId: string,
5351
tableWiseFiles: TableWiseFiles[]

0 commit comments

Comments
 (0)