@@ -4,19 +4,15 @@ namespace PowerSync.Common.Client.Sync.Bucket;
44using System . Collections . Generic ;
55using System . Linq ;
66using System . Threading . Tasks ;
7-
87using Microsoft . Extensions . Logging ;
98using Microsoft . Extensions . Logging . Abstractions ;
10-
119using Newtonsoft . Json ;
12-
1310using PowerSync . Common . DB ;
1411using PowerSync . Common . DB . Crud ;
1512using PowerSync . Common . Utils ;
1613
1714public class SqliteBucketStorage : EventStream < BucketStorageEvent > , IBucketStorageAdapter
1815{
19-
2016 public static readonly string MAX_OP_ID = "9223372036854775807" ;
2117
2218 private readonly IDBAdapter db ;
@@ -37,7 +33,8 @@ private record ExistingTableRowsResult(string name);
3733 public SqliteBucketStorage ( IDBAdapter db , ILogger ? logger = null )
3834 {
3935 this . db = db ;
40- this . logger = logger ?? NullLogger . Instance ; ;
36+ this . logger = logger ?? NullLogger . Instance ;
37+ ;
4138 hasCompletedSync = false ;
4239 pendingBucketDeletes = true ;
4340 tableNames = [ ] ;
@@ -62,9 +59,10 @@ public SqliteBucketStorage(IDBAdapter db, ILogger? logger = null)
6259
6360 public async Task Init ( )
6461 {
65-
6662 hasCompletedSync = false ;
67- var existingTableRows = await db . GetAll < ExistingTableRowsResult > ( "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'" ) ;
63+ var existingTableRows =
64+ await db . GetAll < ExistingTableRowsResult > (
65+ "SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'" ) ;
6866
6967 foreach ( var row in existingTableRows )
7068 {
@@ -79,6 +77,7 @@ public async Task Init()
7977 }
8078
8179 private record ClientIdResult ( string ? client_id ) ;
80+
8281 public async Task < string > GetClientId ( )
8382 {
8483 if ( clientId == null )
@@ -95,12 +94,15 @@ public string GetMaxOpId()
9594 return MAX_OP_ID ;
9695 }
9796
98- public void StartSession ( ) { }
97+ public void StartSession ( )
98+ {
99+ }
99100
100101 public async Task < BucketState [ ] > GetBucketStates ( )
101102 {
102103 return
103- await db . GetAll < BucketState > ( "SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != '$local'" ) ;
104+ await db . GetAll < BucketState > (
105+ "SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != '$local'" ) ;
104106 }
105107
106108 public async Task SaveSyncData ( SyncDataBatch batch )
@@ -115,6 +117,7 @@ await db.WriteTransaction(async tx =>
115117 logger . LogDebug ( "saveSyncData {message}" , JsonConvert . SerializeObject ( result ) ) ;
116118 count += b . Data . Length ;
117119 }
120+
118121 compactCounter += count ;
119122 } ) ;
120123 }
@@ -140,6 +143,7 @@ await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
140143 }
141144
142145 private record LastSyncedResult ( string ? synced_at ) ;
146+
143147 public async Task < bool > HasCompletedSync ( )
144148 {
145149 if ( hasCompletedSync ) return true ;
@@ -155,11 +159,13 @@ public async Task<SyncLocalDatabaseResult> SyncLocalDatabase(Checkpoint checkpoi
155159 var validation = await ValidateChecksums ( checkpoint ) ;
156160 if ( ! validation . CheckpointValid )
157161 {
158- logger . LogError ( "Checksums failed for {failures}" , JsonConvert . SerializeObject ( validation . CheckpointFailures ) ) ;
162+ logger . LogError ( "Checksums failed for {failures}" ,
163+ JsonConvert . SerializeObject ( validation . CheckpointFailures ) ) ;
159164 foreach ( var failedBucket in validation . CheckpointFailures ?? [ ] )
160165 {
161166 await DeleteBucket ( failedBucket ) ;
162167 }
168+
163169 return new SyncLocalDatabaseResult
164170 {
165171 Ready = false ,
@@ -210,7 +216,7 @@ private async Task<bool> UpdateObjectsFromBuckets(Checkpoint checkpoint)
210216 return await db . WriteTransaction ( async tx =>
211217 {
212218 var result = await tx . Execute ( "INSERT INTO powersync_operations(op, data) VALUES(?, ?)" ,
213- [ "sync_local" , "" ] ) ;
219+ [ "sync_local" , "" ] ) ;
214220
215221 return result . InsertId == 1 ;
216222 } ) ;
@@ -220,18 +226,16 @@ private record ResultResult(object result);
220226
221227 public class ResultDetail
222228 {
223- [ JsonProperty ( "valid" ) ]
224- public bool Valid { get ; set ; }
229+ [ JsonProperty ( "valid" ) ] public bool Valid { get ; set ; }
225230
226- [ JsonProperty ( "failed_buckets" ) ]
227- public List < string > ? FailedBuckets { get ; set ; }
231+ [ JsonProperty ( "failed_buckets" ) ] public List < string > ? FailedBuckets { get ; set ; }
228232 }
229233
230234 public async Task < SyncLocalDatabaseResult > ValidateChecksums (
231235 Checkpoint checkpoint )
232236 {
233237 var result = await db . Get < ResultResult > ( "SELECT powersync_validate_checkpoint(?) as result" ,
234- [ JsonConvert . SerializeObject ( checkpoint ) ] ) ;
238+ [ JsonConvert . SerializeObject ( checkpoint ) ] ) ;
235239
236240 logger . LogDebug ( "validateChecksums result item {message}" , JsonConvert . SerializeObject ( result ) ) ;
237241
@@ -298,6 +302,7 @@ await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES (?, ?)",
298302 }
299303
300304 private record TargetOpResult ( string target_op ) ;
305+
301306 private record SequenceResult ( int seq ) ;
302307
303308 public async Task < bool > UpdateLocalTarget ( Func < Task < string > > callback )
@@ -351,16 +356,18 @@ public async Task<bool> UpdateLocalTarget(Func<Task<string>> callback)
351356
352357 if ( seqAfter != seqBefore )
353358 {
354- logger . LogDebug ( "[updateLocalTarget] seqAfter ({seqAfter}) != seqBefore ({seqBefore})" , seqAfter , seqBefore ) ;
359+ logger . LogDebug ( "[updateLocalTarget] seqAfter ({seqAfter}) != seqBefore ({seqBefore})" , seqAfter ,
360+ seqBefore ) ;
355361 return false ;
356362 }
357363
358364 var response = await tx . Execute (
359- "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'" ,
360- [ opId ]
361- ) ;
365+ "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'" ,
366+ [ opId ]
367+ ) ;
362368
363- logger . LogDebug ( "[updateLocalTarget] Response from updating target_op: {response}" , JsonConvert . SerializeObject ( response ) ) ;
369+ logger . LogDebug ( "[updateLocalTarget] Response from updating target_op: {response}" ,
370+ JsonConvert . SerializeObject ( response ) ) ;
364371 return true ;
365372 } ) ;
366373 }
@@ -388,33 +395,33 @@ public async Task<bool> UpdateLocalTarget(Func<Task<string>> callback)
388395 var last = all [ all . Length - 1 ] ;
389396
390397 return new CrudBatch (
391- Crud : all ,
392- HaveMore : true ,
393- CompleteCallback : async ( string ? writeCheckpoint ) =>
394- {
395- await db . WriteTransaction ( async tx =>
398+ Crud : all ,
399+ HaveMore : true ,
400+ CompleteCallback : async ( string ? writeCheckpoint ) =>
396401 {
397- await tx . Execute ( "DELETE FROM ps_crud WHERE id <= ?" , [ last . ClientId ] ) ;
398-
399- if ( ! string . IsNullOrEmpty ( writeCheckpoint ) )
402+ await db . WriteTransaction ( async tx =>
400403 {
401- var crudResult = await tx . GetAll < object > ( "SELECT 1 FROM ps_crud LIMIT 1" ) ;
402- if ( crudResult ? . Length > 0 )
404+ await tx . Execute ( "DELETE FROM ps_crud WHERE id <= ?" , [ last . ClientId ] ) ;
405+
406+ if ( ! string . IsNullOrEmpty ( writeCheckpoint ) )
407+ {
408+ var crudResult = await tx . GetAll < object > ( "SELECT 1 FROM ps_crud LIMIT 1" ) ;
409+ if ( crudResult ? . Length > 0 )
410+ {
411+ await tx . Execute (
412+ "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'" ,
413+ [ writeCheckpoint ] ) ;
414+ }
415+ }
416+ else
403417 {
404418 await tx . Execute (
405419 "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'" ,
406- [ writeCheckpoint ] ) ;
420+ [ GetMaxOpId ( ) ] ) ;
407421 }
408- }
409- else
410- {
411- await tx . Execute (
412- "UPDATE ps_buckets SET target_op = CAST(? as INTEGER) WHERE name='$local'" ,
413- [ GetMaxOpId ( ) ] ) ;
414- }
415- } ) ;
416- }
417- ) ;
422+ } ) ;
423+ }
424+ ) ;
418425 }
419426
420427 public async Task < CrudEntry ? > NextCrudItem ( )
@@ -436,13 +443,15 @@ public async Task SetTargetCheckpoint(Checkpoint checkpoint)
436443 }
437444
438445 record ControlResult ( string ? r ) ;
446+
439447 public async Task < string > Control ( string op , object ? payload )
440448 {
441449 return await db . WriteTransaction ( async tx =>
442450 {
443- var result = await tx . Get < ControlResult > ( "SELECT powersync_control(?, ?) AS r" , [ op , payload ] ) ;
444- Console . WriteLine ( "Control Response: " + result . r ) ;
445- return result . r ;
446- } ) ;
451+ var result = await tx . Get < ControlResult > ( "SELECT powersync_control(?, ?) AS r" , [ op , payload ] ) ;
452+
453+
454+ return result . r ;
455+ } ) ;
447456 }
448- }
457+ }
0 commit comments