Skip to content

Commit 80b7dd3

Browse files
committed
Updated WaitForFirstSync to support priorities. Removing old sync from remote.
1 parent e359ca9 commit 80b7dd3

File tree

4 files changed

+58
-195
lines changed

4 files changed

+58
-195
lines changed

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 52 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -158,22 +158,49 @@ public async Task WaitForReady()
158158

159159
await isReadyTask;
160160
}
161+
162+
public class PrioritySyncRequest
163+
{
164+
public CancellationToken? Token { get; set; }
165+
public int? Priority { get; set; }
166+
}
161167

162-
public async Task WaitForFirstSync(CancellationToken? cancellationToken = null)
168+
/// <summary>
169+
/// Wait for the first sync operation to complete.
170+
/// </summary>
171+
/// <param name="request">
172+
/// An object providing a cancellation token and a priority target.
173+
/// When a priority target is set, the task may complete when all buckets with the given (or higher)
174+
/// priorities have been synchronized. This can be earlier than a complete sync.
175+
/// </param>
176+
/// <returns>A task which will complete once the first full sync has completed.</returns>
177+
public async Task WaitForFirstSync(PrioritySyncRequest? request = null)
163178
{
164-
if (CurrentStatus.HasSynced == true)
179+
var priority = request?.Priority ?? null;
180+
var cancellationToken = request?.Token ?? null;
181+
182+
bool StatusMatches(SyncStatus status)
183+
{
184+
if (priority == null)
185+
{
186+
return status.HasSynced == true;
187+
}
188+
return status.StatusForPriority(priority.Value).HasSynced == true;
189+
}
190+
191+
if (StatusMatches(CurrentStatus))
165192
{
166193
return;
167194
}
168195

169196
var tcs = new TaskCompletionSource<bool>();
170197
var cts = new CancellationTokenSource();
171198

172-
var _ = Task.Run(() =>
199+
_ = Task.Run(() =>
173200
{
174201
foreach (var update in Listen(cts.Token))
175202
{
176-
if (update.StatusChanged?.HasSynced == true)
203+
if (update.StatusChanged != null && StatusMatches(update.StatusChanged!))
177204
{
178205
cts.Cancel();
179206
tcs.SetResult(true);
@@ -230,7 +257,7 @@ private async Task LoadVersion()
230257
}
231258
}
232259

233-
private record LastSyncedResult(int? priority, string? last_synced_at);
260+
private record LastSyncedResult(int priority, string? last_synced_at);
234261

235262
protected async Task UpdateHasSynced()
236263
{
@@ -239,6 +266,7 @@ protected async Task UpdateHasSynced()
239266
);
240267

241268
DateTime? lastCompleteSync = null;
269+
List<DB.Crud.SyncPriorityStatus> priorityStatuses = [];
242270

243271
foreach (var result in results)
244272
{
@@ -249,17 +277,28 @@ protected async Task UpdateHasSynced()
249277
// This lowest-possible priority represents a complete sync.
250278
lastCompleteSync = parsedDate;
251279
}
280+
else
281+
{
282+
priorityStatuses.Add(new DB.Crud.SyncPriorityStatus
283+
{
284+
Priority = result.priority,
285+
HasSynced = true,
286+
LastSyncedAt = parsedDate
287+
});
288+
}
252289
}
253290

254291
var hasSynced = lastCompleteSync != null;
255-
if (hasSynced != CurrentStatus.HasSynced)
292+
var updatedStatus = new SyncStatus(new SyncStatusOptions(CurrentStatus.Options)
256293
{
257-
CurrentStatus = new SyncStatus(new SyncStatusOptions(CurrentStatus.Options)
258-
{
259-
HasSynced = hasSynced,
260-
LastSyncedAt = lastCompleteSync,
261-
});
262-
294+
HasSynced = hasSynced,
295+
PriorityStatusEntries = priorityStatuses.ToArray(),
296+
LastSyncedAt = lastCompleteSync,
297+
});
298+
299+
if (!updatedStatus.IsEqual(CurrentStatus))
300+
{
301+
CurrentStatus = updatedStatus;
263302
Emit(new PowerSyncDBEvent { StatusChanged = CurrentStatus });
264303
}
265304
}
@@ -536,7 +575,7 @@ await tx.Execute(
536575
}
537576

538577
/// <summary>
539-
/// Get an unique client id for this database.
578+
/// Get a unique client id for this database.
540579
///
541580
/// The id is not reset when the database is cleared, only when the database is deleted.
542581
/// </summary>

PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs

Lines changed: 2 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,13 @@ namespace PowerSync.Common.Client.Sync.Stream;
1616
public class SyncStreamOptions
1717
{
1818
public string Path { get; set; } = "";
19+
1920
public StreamingSyncRequest Data { get; set; } = new();
2021
public Dictionary<string, string> Headers { get; set; } = new();
2122

2223
public CancellationToken CancellationToken { get; set; } = CancellationToken.None;
2324
}
2425

25-
public class RequestDetails
26-
{
27-
public string Url { get; set; } = "";
28-
public Dictionary<string, string> Headers { get; set; } = new();
29-
}
30-
3126
public class Remote
3227
{
3328
private readonly HttpClient httpClient;
@@ -120,34 +115,6 @@ public async Task<T> Get<T>(string path, Dictionary<string, string>? headers = n
120115
var responseData = await response.Content.ReadAsStringAsync();
121116
return JsonConvert.DeserializeObject<T>(responseData)!;
122117
}
123-
124-
public async IAsyncEnumerable<StreamingSyncLine?> OldPostStream(SyncStreamOptions options)
125-
{
126-
using var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers);
127-
using var response = await httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, options.CancellationToken);
128-
129-
if (response.Content == null)
130-
{
131-
throw new HttpRequestException($"HTTP {response.StatusCode}: No content");
132-
}
133-
else
134-
if (!response.IsSuccessStatusCode)
135-
{
136-
var errorText = await response.Content.ReadAsStringAsync();
137-
throw new HttpRequestException($"HTTP {response.StatusCode}: {errorText}");
138-
}
139-
140-
var stream = await response.Content.ReadAsStreamAsync();
141-
142-
// Read NDJSON stream
143-
using var reader = new StreamReader(stream, Encoding.UTF8);
144-
string? line;
145-
146-
while ((line = await reader.ReadLineAsync()) != null)
147-
{
148-
yield return ParseStreamingSyncLine(JObject.Parse(line));
149-
}
150-
}
151118

152119
/// <summary>
153120
/// Posts to the stream endpoint and returns a raw NDJSON stream that can be read line by line.
@@ -175,35 +142,7 @@ public async Task<Stream> PostStreamRaw(SyncStreamOptions options)
175142
return await response.Content.ReadAsStreamAsync();
176143
}
177144

178-
public static StreamingSyncLine? ParseStreamingSyncLine(JObject json)
179-
{
180-
// Determine the type based on available keys
181-
if (json.ContainsKey("checkpoint"))
182-
{
183-
return json.ToObject<StreamingSyncCheckpoint>();
184-
}
185-
else if (json.ContainsKey("checkpoint_diff"))
186-
{
187-
return json.ToObject<StreamingSyncCheckpointDiff>();
188-
}
189-
else if (json.ContainsKey("checkpoint_complete"))
190-
{
191-
return json.ToObject<StreamingSyncCheckpointComplete>();
192-
}
193-
else if (json.ContainsKey("data"))
194-
{
195-
return json.ToObject<StreamingSyncDataJSON>();
196-
}
197-
else if (json.ContainsKey("token_expires_in"))
198-
{
199-
return json.ToObject<StreamingSyncKeepalive>();
200-
}
201-
else
202-
{
203-
return null;
204-
}
205-
}
206-
145+
207146
private async Task<HttpRequestMessage> BuildRequest(HttpMethod method, string path, object? data = null, Dictionary<string, string>? additionalHeaders = null)
208147
{
209148
var credentials = await GetCredentials();

PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -332,10 +332,10 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio
332332
});
333333
}
334334

335-
protected async Task<bool> StreamingSyncIteration(CancellationToken signal,
335+
protected async Task StreamingSyncIteration(CancellationToken signal,
336336
PowerSyncConnectionOptions? options)
337337
{
338-
return await locks.ObtainLock(new LockOptions<bool>
338+
await locks.ObtainLock(new LockOptions<bool>
339339
{
340340
Type = LockType.SYNC,
341341
Token = signal,
@@ -473,11 +473,9 @@ async Task HandleInstruction(Instruction instruction)
473473
// DownloadProgress = info.Downloading?.Buckets
474474
}
475475
},
476-
// TODO handle errors later?
477476
new UpdateSyncStatusOptions
478477
{
479-
// ClearDownloadError = true,
480-
// ClearUploadError = true
478+
ClearDownloadError = true,
481479
}
482480
);
483481

PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncTypes.cs

Lines changed: 1 addition & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -4,54 +4,6 @@ namespace PowerSync.Common.Client.Sync.Stream;
44
using PowerSync.Common.DB.Crud;
55
using Newtonsoft.Json;
66

7-
public class ContinueCheckpointRequest
8-
{
9-
[JsonProperty("buckets")]
10-
public List<BucketRequest> Buckets { get; set; } = new();
11-
12-
[JsonProperty("checkpoint_token")]
13-
public string CheckpointToken { get; set; } = "";
14-
15-
[JsonProperty("limit")]
16-
public int? Limit { get; set; }
17-
}
18-
19-
public class SyncNewCheckpointRequest
20-
{
21-
[JsonProperty("buckets")]
22-
public List<BucketRequest>? Buckets { get; set; }
23-
24-
[JsonProperty("request_checkpoint")]
25-
public RequestCheckpoint RequestCheckpoint { get; set; } = new();
26-
27-
[JsonProperty("limit")]
28-
public int? Limit { get; set; }
29-
}
30-
31-
public class RequestCheckpoint
32-
{
33-
[JsonProperty("include_data")]
34-
public bool IncludeData { get; set; }
35-
36-
[JsonProperty("include_checksum")]
37-
public bool IncludeChecksum { get; set; }
38-
}
39-
40-
public class SyncResponse
41-
{
42-
[JsonProperty("data")]
43-
public List<SyncDataBucketJSON>? Data { get; set; }
44-
45-
[JsonProperty("has_more")]
46-
public bool HasMore { get; set; }
47-
48-
[JsonProperty("checkpoint_token")]
49-
public string? CheckpointToken { get; set; }
50-
51-
[JsonProperty("checkpoint")]
52-
public Checkpoint? Checkpoint { get; set; }
53-
}
54-
557
public class StreamingSyncRequest
568
{
579
[JsonProperty("buckets")]
@@ -80,69 +32,4 @@ public class BucketRequest
8032

8133
[JsonProperty("after")]
8234
public string After { get; set; } = "";
83-
}
84-
85-
public abstract class StreamingSyncLine { }
86-
87-
public class StreamingSyncCheckpoint : StreamingSyncLine
88-
{
89-
[JsonProperty("checkpoint")]
90-
public Checkpoint Checkpoint { get; set; } = new();
91-
}
92-
93-
public class StreamingSyncCheckpointDiff : StreamingSyncLine
94-
{
95-
[JsonProperty("checkpoint_diff")]
96-
public CheckpointDiff CheckpointDiff { get; set; } = new();
97-
}
98-
99-
public class CheckpointDiff
100-
{
101-
[JsonProperty("last_op_id")]
102-
public string LastOpId { get; set; } = "";
103-
104-
[JsonProperty("updated_buckets")]
105-
public List<BucketChecksum> UpdatedBuckets { get; set; } = new();
106-
107-
[JsonProperty("removed_buckets")]
108-
public List<string> RemovedBuckets { get; set; } = new();
109-
110-
[JsonProperty("write_checkpoint")]
111-
public string WriteCheckpoint { get; set; } = "";
112-
}
113-
114-
public class StreamingSyncDataJSON : StreamingSyncLine
115-
{
116-
[JsonProperty("data")]
117-
public SyncDataBucketJSON Data { get; set; } = new();
118-
}
119-
120-
public class StreamingSyncCheckpointComplete : StreamingSyncLine
121-
{
122-
[JsonProperty("checkpoint_complete")]
123-
public CheckpointComplete CheckpointComplete { get; set; } = new();
124-
}
125-
126-
public class CheckpointComplete
127-
{
128-
[JsonProperty("last_op_id")]
129-
public string LastOpId { get; set; } = "";
130-
}
131-
132-
public class StreamingSyncKeepalive : StreamingSyncLine
133-
{
134-
[JsonProperty("token_expires_in")]
135-
public int? TokenExpiresIn { get; set; }
136-
}
137-
138-
public class CrudRequest
139-
{
140-
[JsonProperty("data")]
141-
public List<CrudEntry> Data { get; set; } = new();
142-
}
143-
144-
public class CrudResponse
145-
{
146-
[JsonProperty("checkpoint")]
147-
public string? Checkpoint { get; set; }
148-
}
35+
}

0 commit comments

Comments
 (0)