@@ -35,7 +35,9 @@ export interface WatchOnChangeEvent {
35
35
changedTables : string [ ] ;
36
36
}
37
37
38
- export interface PowerSyncDBListener extends StreamingSyncImplementationListener { }
38
+ export interface PowerSyncDBListener extends StreamingSyncImplementationListener {
39
+ initialized : ( ) => void ;
40
+ }
39
41
40
42
const POWERSYNC_TABLE_MATCH = / ( ^ p s _ d a t a _ _ | ^ p s _ d a t a _ l o c a l _ _ ) / ;
41
43
@@ -61,6 +63,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
61
63
protected static transactionMutex : Mutex = new Mutex ( ) ;
62
64
63
65
closed : boolean ;
66
+ ready : boolean ;
64
67
65
68
currentStatus ?: SyncStatus ;
66
69
syncStreamImplementation ?: AbstractStreamingSyncImplementation ;
@@ -69,14 +72,16 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
69
72
private abortController : AbortController | null ;
70
73
protected bucketStorageAdapter : BucketStorageAdapter ;
71
74
private syncStatusListenerDisposer ?: ( ) => void ;
72
- protected initialized : Promise < void > ;
75
+ protected _isReadyPromise : Promise < void > | null ;
73
76
74
77
constructor ( protected options : PowerSyncDatabaseOptions ) {
75
78
super ( ) ;
76
- this . currentStatus = null ;
79
+ this . _isReadyPromise = null ;
80
+ this . bucketStorageAdapter = this . generateBucketStorageAdapter ( ) ;
77
81
this . closed = true ;
82
+ this . currentStatus = null ;
78
83
this . options = { ...DEFAULT_POWERSYNC_DB_OPTIONS , ...options } ;
79
- this . bucketStorageAdapter = this . generateBucketStorageAdapter ( ) ;
84
+ this . ready = false ;
80
85
this . sdkVersion = '' ;
81
86
}
82
87
@@ -98,16 +103,40 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
98
103
99
104
protected abstract generateBucketStorageAdapter ( ) : BucketStorageAdapter ;
100
105
106
+ /**
107
+ * @returns A promise which will resolve once initialization is completed.
108
+ */
109
+ async waitForReady ( ) : Promise < void > {
110
+ if ( this . ready ) {
111
+ return ;
112
+ }
113
+
114
+ return (
115
+ this . _isReadyPromise ||
116
+ ( this . _isReadyPromise = new Promise ( ( resolve ) => {
117
+ const l = this . registerListener ( {
118
+ initialized : ( ) => {
119
+ this . ready = true ;
120
+ resolve ( ) ;
121
+ l ?.( ) ;
122
+ }
123
+ } ) ;
124
+ } ) )
125
+ ) ;
126
+ }
127
+
101
128
abstract _init ( ) : Promise < void > ;
129
+
130
+ /**
131
+ * This performs the total initialization process.
132
+ */
102
133
async init ( ) {
103
- this . initialized = ( async ( ) => {
104
- await this . _init ( ) ;
105
- await this . bucketStorageAdapter . init ( ) ;
106
- await this . database . execute ( 'SELECT powersync_replace_schema(?)' , [ JSON . stringify ( this . schema . toJSON ( ) ) ] ) ;
107
- const version = await this . options . database . execute ( 'SELECT powersync_rs_version()' ) ;
108
- this . sdkVersion = version . rows ?. item ( 0 ) [ 'powersync_rs_version()' ] ?? '' ;
109
- } ) ( ) ;
110
- await this . initialized ;
134
+ await this . _init ( ) ;
135
+ await this . bucketStorageAdapter . init ( ) ;
136
+ await this . database . execute ( 'SELECT powersync_replace_schema(?)' , [ JSON . stringify ( this . schema . toJSON ( ) ) ] ) ;
137
+ const version = await this . options . database . execute ( 'SELECT powersync_rs_version()' ) ;
138
+ this . sdkVersion = version . rows ?. item ( 0 ) [ 'powersync_rs_version()' ] ?? '' ;
139
+ this . iterateListeners ( ( cb ) => cb . initialized ?.( ) ) ;
111
140
}
112
141
113
142
/**
@@ -117,7 +146,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
117
146
// close connection if one is open
118
147
await this . disconnect ( ) ;
119
148
120
- await this . initialized ;
149
+ await this . waitForReady ( ) ;
121
150
this . syncStreamImplementation = this . generateSyncStreamImplementation ( connector ) ;
122
151
this . syncStatusListenerDisposer = this . syncStreamImplementation . registerListener ( {
123
152
statusChanged : ( status ) => {
@@ -175,7 +204,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
175
204
* must be constructed.
176
205
*/
177
206
async close ( ) {
178
- await this . initialized ;
207
+ await this . waitForReady ( ) ;
179
208
180
209
await this . disconnect ( ) ;
181
210
this . database . close ( ) ;
@@ -305,31 +334,31 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
305
334
* Execute a statement and optionally return results
306
335
*/
307
336
async execute ( sql : string , parameters ?: any [ ] ) {
308
- await this . initialized ;
337
+ await this . waitForReady ( ) ;
309
338
return this . database . execute ( sql , parameters ) ;
310
339
}
311
340
312
341
/**
313
342
* Execute a read-only query and return results
314
343
*/
315
344
async getAll < T > ( sql : string , parameters ?: any [ ] ) : Promise < T [ ] > {
316
- await this . initialized ;
345
+ await this . waitForReady ( ) ;
317
346
return this . database . getAll ( sql , parameters ) ;
318
347
}
319
348
320
349
/**
321
350
* Execute a read-only query and return the first result, or null if the ResultSet is empty.
322
351
*/
323
352
async getOptional < T > ( sql : string , parameters ?: any [ ] ) : Promise < T | null > {
324
- await this . initialized ;
353
+ await this . waitForReady ( ) ;
325
354
return this . database . getOptional ( sql , parameters ) ;
326
355
}
327
356
328
357
/**
329
358
* Execute a read-only query and return the first result, error if the ResultSet is empty.
330
359
*/
331
360
async get < T > ( sql : string , parameters ?: any [ ] ) : Promise < T > {
332
- await this . initialized ;
361
+ await this . waitForReady ( ) ;
333
362
return this . database . get ( sql , parameters ) ;
334
363
}
335
364
@@ -339,7 +368,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
339
368
* In most cases, [readTransaction] should be used instead.
340
369
*/
341
370
async readLock < T > ( callback : ( db : DBAdapter ) => Promise < T > ) {
342
- await this . initialized ;
371
+ await this . waitForReady ( ) ;
343
372
return mutexRunExclusive ( AbstractPowerSyncDatabase . transactionMutex , ( ) => callback ( this . database ) ) ;
344
373
}
345
374
@@ -348,7 +377,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
348
377
* In most cases, [writeTransaction] should be used instead.
349
378
*/
350
379
async writeLock < T > ( callback : ( db : DBAdapter ) => Promise < T > ) {
351
- await this . initialized ;
380
+ await this . waitForReady ( ) ;
352
381
return mutexRunExclusive ( AbstractPowerSyncDatabase . transactionMutex , async ( ) => {
353
382
const res = await callback ( this . database ) ;
354
383
_ . defer ( ( ) => this . syncStreamImplementation ?. triggerCrudUpload ( ) ) ;
@@ -360,7 +389,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
360
389
callback : ( tx : Transaction ) => Promise < T > ,
361
390
lockTimeout : number = DEFAULT_LOCK_TIMEOUT_MS
362
391
) : Promise < T > {
363
- await this . initialized ;
392
+ await this . waitForReady ( ) ;
364
393
return this . database . readTransaction (
365
394
async ( tx ) => {
366
395
const res = await callback ( { ...tx } ) ;
@@ -375,7 +404,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
375
404
callback : ( tx : Transaction ) => Promise < T > ,
376
405
lockTimeout : number = DEFAULT_LOCK_TIMEOUT_MS
377
406
) : Promise < T > {
378
- await this . initialized ;
407
+ await this . waitForReady ( ) ;
379
408
return this . database . writeTransaction (
380
409
async ( tx ) => {
381
410
const res = await callback ( tx ) ;
@@ -389,7 +418,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
389
418
390
419
async * watch ( sql : string , parameters ?: any [ ] , options ?: SQLWatchOptions ) : AsyncIterable < QueryResult > {
391
420
//Fetch initial data
392
- yield await this . execute ( sql , parameters ) ;
421
+ yield await this . executeReadOnly ( sql , parameters ) ;
393
422
394
423
const resolvedTables = options ?. tables ?? [ ] ;
395
424
if ( ! options ?. tables ) {
@@ -408,7 +437,7 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
408
437
...( options ?? { } ) ,
409
438
tables : resolvedTables
410
439
} ) ) {
411
- yield await this . execute ( sql , parameters ) ;
440
+ yield await this . executeReadOnly ( sql , parameters ) ;
412
441
}
413
442
}
414
443
@@ -459,4 +488,9 @@ export abstract class AbstractPowerSyncDatabase extends BaseObserver<PowerSyncDB
459
488
return ( ) => dispose ( ) ;
460
489
} ) ;
461
490
}
491
+
492
+ private async executeReadOnly ( sql : string , params : any [ ] ) {
493
+ await this . waitForReady ( ) ;
494
+ return this . database . readLock ( ( tx ) => tx . execute ( sql , params ) ) ;
495
+ }
462
496
}
0 commit comments