Skip to content

Commit 8de793d

Browse files
committed
Working through sync implementation issue.
1 parent a2dae0a commit 8de793d

File tree

8 files changed

+77
-73
lines changed

8 files changed

+77
-73
lines changed

PowerSync/PowerSync.Common/Client/PowerSyncDatabase.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -65,11 +65,11 @@ public interface IPowerSyncDatabase : IEventStream<PowerSyncDBEvent>
6565

6666
Task<NonQueryResult> Execute(string query, object[]? parameters = null);
6767

68-
Task<T[]> GetAll<T>(string sql, params object[]? parameters);
68+
Task<T[]> GetAll<T>(string sql, object[]? parameters = null);
6969

70-
Task<T?> GetOptional<T>(string sql, params object[]? parameters);
70+
Task<T?> GetOptional<T>(string sql, object[]? parameters = null);
7171

72-
Task<T> Get<T>(string sql, params object[]? parameters);
72+
Task<T> Get<T>(string sql, object[]? parameters = null);
7373

7474
Task<T> ReadLock<T>(Func<ILockContext, Task<T>> fn, DBLockOptions? options = null);
7575

PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -271,14 +271,14 @@ public async Task<bool> HasCrud()
271271

272272
record ControlResult(string? r);
273273

274-
public async Task<string> Control(string op, object? payload)
274+
public async Task<string> Control(string op, object? payload = null)
275275
{
276+
Console.WriteLine("Calling control on extension "+ op + " - ");
276277
return await db.WriteTransaction(async tx =>
277278
{
278-
var result = await tx.Get<ControlResult>("SELECT powersync_control(?, ?) AS r", [op, payload]);
279-
280-
281-
return result.r;
279+
var result = await tx.Get<ControlResult>("SELECT powersync_control(?, ?) AS r", [op, payload ?? ""]);
280+
Console.WriteLine("completed op: " + op + " - " + JsonConvert.SerializeObject(result));
281+
return result.r!;
282282
});
283283
}
284284
}

PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,12 @@ public static Instruction[] ParseInstructions(string rawResponse)
1818

1919
foreach (JObject item in jsonArray)
2020
{
21-
instructions.Add(ParseInstruction(item));
21+
var instruction = ParseInstruction(item);
22+
if (instruction == null)
23+
{
24+
throw new JsonSerializationException("Failed to parse instruction from JSON.");
25+
}
26+
instructions.Add(instruction);
2227
}
2328

2429
return instructions.ToArray();

PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs

Lines changed: 1 addition & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -1,17 +1,15 @@
11
namespace PowerSync.Common.Client.Sync.Stream;
22

3+
using Connection;
34
using System.IO;
45
using System.Net.Http;
56
using System.Reflection;
67
using System.Text;
78
using System.Threading;
89
using System.Threading.Tasks;
9-
using System.Text.RegularExpressions;
1010

1111
using Newtonsoft.Json;
12-
using Newtonsoft.Json.Linq;
1312

14-
using PowerSync.Common.Client.Connection;
1513

1614
public class SyncStreamOptions
1715
{

PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs

Lines changed: 52 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -137,7 +137,7 @@ public StreamingSyncImplementation(StreamingSyncImplementationOptions options)
137137

138138
TriggerCrudUpload = () =>
139139
{
140-
if (!SyncStatus.Connected || SyncStatus.DataFlowStatus.Uploading)
140+
if (!SyncStatus.Connected)
141141
{
142142
return;
143143
}
@@ -280,8 +280,9 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio
280280
{
281281
break;
282282
}
283-
283+
Console.WriteLine("XXXX starting");
284284
await StreamingSyncIteration(nestedCts.Token, options);
285+
Console.WriteLine("XXXX ending");
285286
// Continue immediately
286287
}
287288
catch (Exception ex)
@@ -346,41 +347,24 @@ await locks.ObtainLock(new LockOptions<bool>
346347
Params = options?.Params ?? DEFAULT_STREAM_CONNECTION_OPTIONS.Params
347348
};
348349

349-
await RustSyncIteration(signal, resolvedOptions);
350+
await SyncIteration(signal, resolvedOptions);
350351

351352
return true;
352353
}
353354
});
354355
}
355356

356-
private async Task RustSyncIteration(CancellationToken? signal, RequiredPowerSyncConnectionOptions resolvedOptions)
357+
private async Task SyncIteration(CancellationToken? signal, RequiredPowerSyncConnectionOptions resolvedOptions)
357358
{
358359
Task? receivingLines = null;
359360

360361
var nestedCts = new CancellationTokenSource();
361362
signal?.Register(() => { nestedCts.Cancel(); });
362363

363-
364-
try
365-
{
366-
notifyCompletedUploads = () => { Task.Run(async () => await Control("completed_upload")); };
367-
368-
await Control("start", JsonConvert.SerializeObject(resolvedOptions.Params));
369-
if (receivingLines != null)
370-
{
371-
await receivingLines;
372-
}
373-
}
374-
finally
375-
{
376-
notifyCompletedUploads = null;
377-
await Stop();
378-
}
379-
380-
return;
381-
382364
async Task Connect(EstablishSyncStream instruction)
383365
{
366+
Console.WriteLine("----- We got het here again" + nestedCts.Token.IsCancellationRequested);
367+
Console.WriteLine("-----" + JsonConvert.SerializeObject(instruction.Request));
384368
var syncOptions = new SyncStreamOptions
385369
{
386370
Path = "/sync/stream",
@@ -390,14 +374,19 @@ async Task Connect(EstablishSyncStream instruction)
390374

391375
var stream = await Options.Remote.PostStreamRaw(syncOptions);
392376
using var reader = new StreamReader(stream, Encoding.UTF8);
377+
378+
syncOptions.CancellationToken.Register(() => {
379+
try { stream?.Close(); } catch { }
380+
});
381+
393382
string? line;
394383

395384
while ((line = await reader.ReadLineAsync()) != null)
396385
{
397-
logger.LogDebug("Parsing line for rust sync stream {message}", line);
386+
logger.LogDebug("Parsing line for rust sync stream {message}", "xx");
398387
await Control("line_text", line);
399-
400388
}
389+
Console.WriteLine("Done");
401390
}
402391

403392
async Task Stop()
@@ -410,31 +399,18 @@ async Task Control(string op, object? payload = null)
410399
logger.LogDebug("Control call {message}", op);
411400

412401
var rawResponse = await Options.Adapter.Control(op, payload);
413-
await HandleInstructions(Instruction.ParseInstructions(rawResponse));
402+
HandleInstructions(Instruction.ParseInstructions(rawResponse));
414403
}
415404

416-
async Task HandleInstructions(Instruction[] instructions)
405+
void HandleInstructions(Instruction[] instructions)
417406
{
418407
foreach (var instruction in instructions)
419408
{
420-
await HandleInstruction(instruction);
409+
HandleInstruction(instruction);
421410
}
422411
}
423412

424-
DB.Crud.SyncPriorityStatus CoreStatusToSyncStatus(SyncPriorityStatus status)
425-
{
426-
logger.LogWarning("Sync status {status}",
427-
status?.LastSyncedAt != null ? new DateTime(status!.LastSyncedAt) : null);
428-
return new DB.Crud.SyncPriorityStatus
429-
{
430-
Priority = status.Priority,
431-
HasSynced = status.HasSynced ?? null,
432-
// TODO check this value
433-
LastSyncedAt = status?.LastSyncedAt != null ? new DateTime(status!.LastSyncedAt) : null
434-
};
435-
}
436-
437-
async Task HandleInstruction(Instruction instruction)
413+
void HandleInstruction(Instruction instruction)
438414
{
439415
switch (instruction)
440416
{
@@ -451,7 +427,6 @@ async Task HandleInstruction(Instruction instruction)
451427
logger.LogWarning("{message}", logLine.Line);
452428
break;
453429
}
454-
455430
break;
456431
case UpdateSyncStatus syncStatus:
457432
var info = syncStatus.Status;
@@ -477,25 +452,20 @@ async Task HandleInstruction(Instruction instruction)
477452
ClearDownloadError = true,
478453
}
479454
);
480-
481455
break;
482456
case EstablishSyncStream establishSyncStream:
483457
if (receivingLines != null)
484458
{
485-
// Already connected, this shouldn't happen during a single iteration.
486459
throw new Exception("Unexpected request to establish sync stream, already connected");
487460
}
488-
489461
receivingLines = Connect(establishSyncStream);
490462
break;
491-
case FetchCredentials { DidExpire: true, }:
492-
Options.Remote.InvalidateCredentials();
493-
break;
494-
case FetchCredentials:
463+
case FetchCredentials fetchCredentials:
495464
Options.Remote.InvalidateCredentials();
496465
break;
497466
case CloseSyncStream:
498-
CancellationTokenSource?.Cancel();
467+
nestedCts.Cancel();
468+
logger.LogWarning("Closing stream");
499469
break;
500470
case FlushFileSystem:
501471
// ignore
@@ -507,6 +477,27 @@ async Task HandleInstruction(Instruction instruction)
507477
break;
508478
}
509479
}
480+
481+
try
482+
{
483+
notifyCompletedUploads = () => { Task.Run(async () => await Control("completed_upload")); };
484+
logger.LogError("START");
485+
await Control("start", JsonConvert.SerializeObject(resolvedOptions.Params));
486+
if (receivingLines != null)
487+
{
488+
await receivingLines;
489+
logger.LogError("Done waiting");
490+
}
491+
else
492+
{
493+
Console.WriteLine("No receiving lines task was started, this should not happen.");
494+
}
495+
}
496+
finally
497+
{
498+
notifyCompletedUploads = null;
499+
await Stop();
500+
}
510501
}
511502

512503
public new void Close()
@@ -668,6 +659,16 @@ private async Task DelayRetry()
668659
await Task.Delay(Options.RetryDelayMs.Value);
669660
}
670661
}
662+
663+
private static DB.Crud.SyncPriorityStatus CoreStatusToSyncStatus(SyncPriorityStatus status)
664+
{
665+
return new DB.Crud.SyncPriorityStatus
666+
{
667+
Priority = status.Priority,
668+
HasSynced = status.HasSynced ?? null,
669+
LastSyncedAt = status?.LastSyncedAt != null ? new DateTime(status!.LastSyncedAt) : null
670+
};
671+
}
671672
}
672673

673674
enum LockType

PowerSync/PowerSync.Common/DB/IDBAdapter.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -30,13 +30,13 @@ public class QueryRows
3030
public interface IDBGetUtils
3131
{
3232
// Execute a read-only query and return results.
33-
Task<T[]> GetAll<T>(string sql, params object[]? parameters);
33+
Task<T[]> GetAll<T>(string sql, object[]? parameters = null);
3434

3535
// Execute a read-only query and return the first result, or null if the ResultSet is empty.
36-
Task<T?> GetOptional<T>(string sql, params object[]? parameters);
36+
Task<T?> GetOptional<T>(string sql, object[]? parameters = null);
3737

3838
// Execute a read-only query and return the first result, error if the ResultSet is empty.
39-
Task<T> Get<T>(string sql, params object[]? parameters);
39+
Task<T> Get<T>(string sql, object[]? parameters = null);
4040
}
4141

4242
public interface ILockContext : IDBGetUtils

PowerSync/PowerSync.Common/MDSQLite/MDSQLiteAdapter.cs

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -155,18 +155,18 @@ public Task<QueryResult> ExecuteBatch(string query, object[][]? parameters = nul
155155
throw new NotImplementedException();
156156
}
157157

158-
public async Task<T> Get<T>(string sql, params object[]? parameters)
158+
public async Task<T> Get<T>(string sql, object[]? parameters = null)
159159
{
160160
return await ReadLock((ctx) => ctx.Get<T>(sql, parameters));
161161
;
162162
}
163163

164-
public async Task<T[]> GetAll<T>(string sql, params object[]? parameters)
164+
public async Task<T[]> GetAll<T>(string sql, object[]? parameters = null)
165165
{
166166
return await ReadLock((ctx) => ctx.GetAll<T>(sql, parameters));
167167
}
168168

169-
public async Task<T?> GetOptional<T>(string sql, params object[]? parameters)
169+
public async Task<T?> GetOptional<T>(string sql, object[]? parameters = null)
170170
{
171171
return await ReadLock((ctx) => ctx.GetOptional<T>(sql, parameters));
172172
}
@@ -307,17 +307,17 @@ public Task<NonQueryResult> Execute(string query, object[]? parameters = null)
307307
return connection.Execute(query, parameters);
308308
}
309309

310-
public Task<T> Get<T>(string sql, params object[]? parameters)
310+
public Task<T> Get<T>(string sql, object[]? parameters = null)
311311
{
312312
return connection.Get<T>(sql, parameters);
313313
}
314314

315-
public Task<T[]> GetAll<T>(string sql, params object[]? parameters)
315+
public Task<T[]> GetAll<T>(string sql, object[]? parameters = null)
316316
{
317317
return connection.GetAll<T>(sql, parameters);
318318
}
319319

320-
public Task<T?> GetOptional<T>(string sql, params object[]? parameters)
320+
public Task<T?> GetOptional<T>(string sql, object[]? parameters = null)
321321
{
322322
return connection.GetOptional<T>(sql, parameters);
323323
}

PowerSync/PowerSync.Common/MDSQLite/MDSQLiteConnection.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -142,7 +142,7 @@ public async Task<QueryResult> ExecuteQuery(string query, object[]? parameters =
142142
var row = new Dictionary<string, object>();
143143
for (int i = 0; i < reader.FieldCount; i++)
144144
{
145-
row[reader.GetName(i)] = reader.IsDBNull(i) ? null : reader.GetValue(i);
145+
row[reader.GetName(i)] = reader.IsDBNull(i) ? null! : reader.GetValue(i);
146146
}
147147
rows.Add(row);
148148
}

0 commit comments

Comments
 (0)