Skip to content

Commit e359ca9

Browse files
committed
Cleaning out BucketStorage interface.
1 parent 24bceba commit e359ca9

File tree

6 files changed

+7
-1249
lines changed

6 files changed

+7
-1249
lines changed

PowerSync/PowerSync.Common/Client/Sync/Bucket/BucketStorageAdapter.cs

Lines changed: 1 addition & 43 deletions
Original file line numberDiff line numberDiff line change
@@ -20,38 +20,6 @@ public class Checkpoint
2020
public string? WriteCheckpoint { get; set; } = null;
2121
}
2222

23-
public class BucketState
24-
{
25-
[JsonProperty("bucket")]
26-
public string Bucket { get; set; } = null!;
27-
28-
[JsonProperty("op_id")]
29-
public string OpId { get; set; } = null!;
30-
}
31-
32-
public class SyncLocalDatabaseResult
33-
{
34-
[JsonProperty("ready")]
35-
public bool Ready { get; set; }
36-
37-
[JsonProperty("checkpointValid")]
38-
public bool CheckpointValid { get; set; }
39-
40-
[JsonProperty("checkpointFailures")]
41-
public string[]? CheckpointFailures { get; set; }
42-
43-
public override bool Equals(object? obj)
44-
{
45-
if (obj is not SyncLocalDatabaseResult other) return false;
46-
return JsonConvert.SerializeObject(this) == JsonConvert.SerializeObject(other);
47-
}
48-
49-
public override int GetHashCode()
50-
{
51-
return JsonConvert.SerializeObject(this).GetHashCode();
52-
}
53-
}
54-
5523
public class BucketChecksum
5624
{
5725
[JsonProperty("bucket")]
@@ -84,21 +52,11 @@ public class BucketStorageEvent
8452
public interface IBucketStorageAdapter : IEventStream<BucketStorageEvent>
8553
{
8654
Task Init();
87-
Task SaveSyncData(SyncDataBatch batch);
88-
Task RemoveBuckets(string[] buckets);
89-
Task SetTargetCheckpoint(Checkpoint checkpoint);
90-
91-
void StartSession();
92-
93-
Task<BucketState[]> GetBucketStates();
94-
95-
Task<SyncLocalDatabaseResult> SyncLocalDatabase(Checkpoint checkpoint);
96-
55+
9756
Task<CrudEntry?> NextCrudItem();
9857
Task<bool> HasCrud();
9958
Task<CrudBatch?> GetCrudBatch(int limit = 100);
10059

101-
Task<bool> HasCompletedSync();
10260
Task<bool> UpdateLocalTarget(Func<Task<string>> callback);
10361

10462
/// <summary>

PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs

Lines changed: 2 additions & 175 deletions
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,6 @@ public class SqliteBucketStorage : EventStream<BucketStorageEvent>, IBucketStora
1616
public static readonly string MAX_OP_ID = "9223372036854775807";
1717

1818
private readonly IDBAdapter db;
19-
private bool hasCompletedSync;
2019
private bool pendingBucketDeletes;
2120
private readonly HashSet<string> tableNames;
2221
private string? clientId;
@@ -34,8 +33,6 @@ public SqliteBucketStorage(IDBAdapter db, ILogger? logger = null)
3433
{
3534
this.db = db;
3635
this.logger = logger ?? NullLogger.Instance;
37-
;
38-
hasCompletedSync = false;
3936
pendingBucketDeletes = true;
4037
tableNames = [];
4138

@@ -59,7 +56,6 @@ public SqliteBucketStorage(IDBAdapter db, ILogger? logger = null)
5956

6057
public async Task Init()
6158
{
62-
hasCompletedSync = false;
6359
var existingTableRows =
6460
await db.GetAll<ExistingTableRowsResult>(
6561
"SELECT name FROM sqlite_master WHERE type='table' AND name GLOB 'ps_data_*'");
@@ -93,171 +89,7 @@ public string GetMaxOpId()
9389
{
9490
return MAX_OP_ID;
9591
}
96-
97-
public void StartSession()
98-
{
99-
}
100-
101-
public async Task<BucketState[]> GetBucketStates()
102-
{
103-
return
104-
await db.GetAll<BucketState>(
105-
"SELECT name as bucket, cast(last_op as TEXT) as op_id FROM ps_buckets WHERE pending_delete = 0 AND name != '$local'");
106-
}
107-
108-
public async Task SaveSyncData(SyncDataBatch batch)
109-
{
110-
await db.WriteTransaction(async tx =>
111-
{
112-
int count = 0;
113-
foreach (var b in batch.Buckets)
114-
{
115-
var result = await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
116-
["save", JsonConvert.SerializeObject(new { buckets = new[] { b.ToJSON() } })]);
117-
logger.LogDebug("saveSyncData {message}", JsonConvert.SerializeObject(result));
118-
count += b.Data.Length;
119-
}
120-
121-
compactCounter += count;
122-
});
123-
}
124-
125-
public async Task RemoveBuckets(string[] buckets)
126-
{
127-
foreach (var bucket in buckets)
128-
{
129-
await DeleteBucket(bucket);
130-
}
131-
}
132-
133-
private async Task DeleteBucket(string bucket)
134-
{
135-
await db.WriteTransaction(async tx =>
136-
{
137-
await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
138-
["delete_bucket", bucket]);
139-
});
140-
141-
logger.LogDebug("Done deleting bucket");
142-
pendingBucketDeletes = true;
143-
}
144-
145-
private record LastSyncedResult(string? synced_at);
146-
147-
public async Task<bool> HasCompletedSync()
148-
{
149-
if (hasCompletedSync) return true;
150-
151-
var result = await db.Get<LastSyncedResult>("SELECT powersync_last_synced_at() as synced_at");
152-
153-
hasCompletedSync = result.synced_at != null;
154-
return hasCompletedSync;
155-
}
156-
157-
public async Task<SyncLocalDatabaseResult> SyncLocalDatabase(Checkpoint checkpoint)
158-
{
159-
var validation = await ValidateChecksums(checkpoint);
160-
if (!validation.CheckpointValid)
161-
{
162-
logger.LogError("Checksums failed for {failures}",
163-
JsonConvert.SerializeObject(validation.CheckpointFailures));
164-
foreach (var failedBucket in validation.CheckpointFailures ?? [])
165-
{
166-
await DeleteBucket(failedBucket);
167-
}
168-
169-
return new SyncLocalDatabaseResult
170-
{
171-
Ready = false,
172-
CheckpointValid = false,
173-
CheckpointFailures = validation.CheckpointFailures
174-
};
175-
}
176-
177-
var bucketNames = checkpoint.Buckets.Select(b => b.Bucket).ToArray();
178-
await db.WriteTransaction(async tx =>
179-
{
180-
await tx.Execute(
181-
"UPDATE ps_buckets SET last_op = ? WHERE name IN (SELECT json_each.value FROM json_each(?))",
182-
[checkpoint.LastOpId, JsonConvert.SerializeObject(bucketNames)]
183-
);
184-
185-
if (checkpoint.WriteCheckpoint != null)
186-
{
187-
await tx.Execute(
188-
"UPDATE ps_buckets SET last_op = ? WHERE name = '$local'",
189-
[checkpoint.WriteCheckpoint]
190-
);
191-
}
192-
});
193-
194-
var valid = await UpdateObjectsFromBuckets(checkpoint);
195-
if (!valid)
196-
{
197-
logger.LogDebug("Not at a consistent checkpoint - cannot update local db");
198-
return new SyncLocalDatabaseResult
199-
{
200-
Ready = false,
201-
CheckpointValid = true
202-
};
203-
}
204-
205-
await ForceCompact();
206-
207-
return new SyncLocalDatabaseResult
208-
{
209-
Ready = true,
210-
CheckpointValid = true
211-
};
212-
}
213-
214-
private async Task<bool> UpdateObjectsFromBuckets(Checkpoint checkpoint)
215-
{
216-
return await db.WriteTransaction(async tx =>
217-
{
218-
var result = await tx.Execute("INSERT INTO powersync_operations(op, data) VALUES(?, ?)",
219-
["sync_local", ""]);
220-
221-
return result.InsertId == 1;
222-
});
223-
}
224-
225-
private record ResultResult(object result);
226-
227-
public class ResultDetail
228-
{
229-
[JsonProperty("valid")] public bool Valid { get; set; }
230-
231-
[JsonProperty("failed_buckets")] public List<string>? FailedBuckets { get; set; }
232-
}
233-
234-
public async Task<SyncLocalDatabaseResult> ValidateChecksums(
235-
Checkpoint checkpoint)
236-
{
237-
var result = await db.Get<ResultResult>("SELECT powersync_validate_checkpoint(?) as result",
238-
[JsonConvert.SerializeObject(checkpoint)]);
239-
240-
logger.LogDebug("validateChecksums result item {message}", JsonConvert.SerializeObject(result));
241-
242-
if (result == null) return new SyncLocalDatabaseResult { CheckpointValid = false, Ready = false };
243-
244-
var resultDetail = JsonConvert.DeserializeObject<ResultDetail>(result.result.ToString() ?? "{}");
245-
246-
if (resultDetail?.Valid == true)
247-
{
248-
return new SyncLocalDatabaseResult { Ready = true, CheckpointValid = true };
249-
}
250-
else
251-
{
252-
return new SyncLocalDatabaseResult
253-
{
254-
CheckpointValid = false,
255-
Ready = false,
256-
CheckpointFailures = resultDetail?.FailedBuckets?.ToArray() ?? []
257-
};
258-
}
259-
}
260-
92+
26193
/// <summary>
26294
/// Force a compact operation, primarily for testing purposes.
26395
/// </summary>
@@ -435,12 +267,7 @@ public async Task<bool> HasCrud()
435267
{
436268
return await db.GetOptional<object>("SELECT 1 as ignore FROM ps_crud LIMIT 1") != null;
437269
}
438-
439-
public async Task SetTargetCheckpoint(Checkpoint checkpoint)
440-
{
441-
// No Op
442-
await Task.CompletedTask;
443-
}
270+
444271

445272
record ControlResult(string? r);
446273

PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,4 +122,4 @@ public class FetchCredentials: Instruction
122122

123123
public class CloseSyncStream : Instruction { }
124124
public class FlushFileSystem : Instruction { }
125-
public class DidCompleteSync : Instruction { }
125+
public class DidCompleteSync : Instruction { }

PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -619,11 +619,6 @@ await locks.ObtainLock(new LockOptions<Task>
619619
});
620620
}
621621

622-
public async Task<bool> HasCompletedSync()
623-
{
624-
return await Options.Adapter.HasCompletedSync();
625-
}
626-
627622
public async Task WaitForReady()
628623
{
629624
// Do nothing

PowerSync/PowerSync.Common/DB/Crud/SyncStatus.cs

Lines changed: 3 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -26,18 +26,15 @@ public class SyncDataFlowStatus
2626
public class SyncPriorityStatus
2727
{
2828
[JsonProperty("uploading")] public int Priority { get; set; }
29-
30-
29+
3130
[JsonProperty("lastSyncedAt")] public DateTime? LastSyncedAt { get; set; }
3231

3332
[JsonProperty("hasSynced")] public bool? HasSynced { get; set; }
3433
}
3534

3635
public class SyncStatusOptions
3736
{
38-
public SyncStatusOptions()
39-
{
40-
}
37+
public SyncStatusOptions() {}
4138

4239
public SyncStatusOptions(SyncStatusOptions options)
4340
{
@@ -146,7 +143,7 @@ public string ToJSON()
146143
{
147144
return JsonConvert.SerializeObject(this);
148145
}
149-
146+
150147
private static int ComparePriorities(SyncPriorityStatus a, SyncPriorityStatus b)
151148
{
152149
// Lower numbers = higher priority

0 commit comments

Comments
 (0)