Skip to content

Commit ca0152d

Browse files
committed
Fix instruction parsing.
1 parent ca8f217 commit ca0152d

File tree

4 files changed

+97
-73
lines changed

4 files changed

+97
-73
lines changed

PowerSync/PowerSync.Common/Client/Sync/Bucket/SqliteBucketStorage.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -441,7 +441,7 @@ public async Task<string> Control(string op, object? payload)
441441
return await db.WriteTransaction(async tx =>
442442
{
443443
var result = await tx.Get<ControlResult>("SELECT powersync_control(?, ?) AS r", [op, payload]);
444-
Console.WriteLine(result.r);
444+
Console.WriteLine("Control Response: " + result.r);
445445
return result.r;
446446
});
447447
}

PowerSync/PowerSync.Common/Client/Sync/Stream/CoreInstructions.cs

Lines changed: 43 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -3,11 +3,50 @@
33
namespace PowerSync.Common.Client.Sync.Stream;
44

55
using Newtonsoft.Json;
6-
using PowerSync.Common.Client.Sync.Stream;
76

8-
[JsonConverter(typeof(InstructionConverter))]
7+
/// <summary>
8+
/// An internal instruction emitted by the sync client in the core extension in response to the
9+
/// SDK passing sync data into the extension.
10+
/// </summary>
911
public abstract class Instruction
1012
{
13+
14+
public static Instruction[] ParseInstructions(string rawResponse)
15+
{
16+
var jsonArray = JArray.Parse(rawResponse);
17+
List<Instruction> instructions = [];
18+
19+
Console.WriteLine("Scanning instructions: "+ jsonArray.Count);
20+
foreach (JObject item in jsonArray)
21+
{
22+
instructions.Add(ParseInstruction(item));
23+
Console.WriteLine("Parsed instruction: " + JsonConvert.SerializeObject(ParseInstruction(item)));
24+
}
25+
26+
27+
28+
return instructions.ToArray();
29+
}
30+
31+
public static Instruction? ParseInstruction(JObject json)
32+
{
33+
if (json.ContainsKey("LogLine"))
34+
return json["LogLine"]!.ToObject<LogLine>();
35+
if (json.ContainsKey("UpdateSyncStatus"))
36+
return json["UpdateSyncStatus"]!.ToObject<UpdateSyncStatus>();
37+
if (json.ContainsKey("EstablishSyncStream"))
38+
return json["EstablishSyncStream"]!.ToObject<EstablishSyncStream>();
39+
if (json.ContainsKey("FetchCredentials"))
40+
return json["FetchCredentials"]!.ToObject<FetchCredentials>();
41+
if (json.ContainsKey("CloseSyncStream"))
42+
return new CloseSyncStream();
43+
if (json.ContainsKey("FlushFileSystem"))
44+
return new FlushFileSystem();
45+
if (json.ContainsKey("DidCompleteSync"))
46+
return new DidCompleteSync();
47+
48+
throw new JsonSerializationException("Unknown Instruction type.");
49+
}
1150
}
1251

1352
public class LogLine: Instruction
@@ -40,7 +79,7 @@ public class CoreSyncStatus
4079
public bool Connecting { get; set; }
4180

4281
[JsonProperty("priority_status")]
43-
public List<SyncPriorityStatus> PriorityStatus { get; set; } = null!;
82+
public List<SyncPriorityStatus> PriorityStatus { get; set; } = [];
4483

4584
[JsonProperty("downloading")]
4685
public DownloadProgress? Downloading { get; set; }
@@ -87,34 +126,4 @@ public class FetchCredentials: Instruction
87126

88127
public class CloseSyncStream : Instruction { }
89128
public class FlushFileSystem : Instruction { }
90-
public class DidCompleteSync : Instruction { }
91-
92-
public class InstructionConverter : JsonConverter<Instruction>
93-
{
94-
public override Instruction ReadJson(JsonReader reader, Type objectType, Instruction? existingValue, bool hasExistingValue, JsonSerializer serializer)
95-
{
96-
var jsonObject = JObject.Load(reader);
97-
Console.WriteLine("Meep" + jsonObject.ToString());
98-
if (jsonObject.ContainsKey("LogLine"))
99-
return jsonObject["LogLine"]!.ToObject<LogLine>(serializer)!;
100-
if (jsonObject.ContainsKey("UpdateSyncStatus"))
101-
return jsonObject["UpdateSyncStatus"]!.ToObject<UpdateSyncStatus>(serializer)!;
102-
if (jsonObject.ContainsKey("EstablishSyncStream"))
103-
return jsonObject["EstablishSyncStream"]!.ToObject<EstablishSyncStream>(serializer)!;
104-
if (jsonObject.ContainsKey("FetchCredentials"))
105-
return jsonObject["FetchCredentials"]!.ToObject<FetchCredentials>(serializer)!;
106-
if (jsonObject.ContainsKey("CloseSyncStream"))
107-
return new CloseSyncStream();
108-
if (jsonObject.ContainsKey("FlushFileSystem"))
109-
return new FlushFileSystem();
110-
if (jsonObject.ContainsKey("DidCompleteSync"))
111-
return new DidCompleteSync();
112-
Console.WriteLine("Throwing on" + jsonObject.ToString());
113-
throw new JsonSerializationException("Unknown Instruction type.");
114-
}
115-
116-
public override void WriteJson(JsonWriter writer, Instruction? value, JsonSerializer serializer)
117-
{
118-
throw new NotImplementedException("Writing not implemented.");
119-
}
120-
}
129+
public class DidCompleteSync : Instruction { }

PowerSync/PowerSync.Common/Client/Sync/Stream/Remote.cs

Lines changed: 22 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -121,12 +121,25 @@ public async Task<T> Get<T>(string path, Dictionary<string, string>? headers = n
121121
return JsonConvert.DeserializeObject<T>(responseData)!;
122122
}
123123

124-
/// <summary>
125-
/// Posts to the stream endpoint and returns an async enumerable of parsed NDJSON lines.
126-
/// </summary>
127-
public async IAsyncEnumerable<StreamingSyncLine?> PostStream(SyncStreamOptions options)
124+
public async IAsyncEnumerable<StreamingSyncLine?> OldPostStream(SyncStreamOptions options)
128125
{
129-
using var stream = await PostStreamRaw(options);
126+
using var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers);
127+
using var response = await httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, options.CancellationToken);
128+
129+
if (response.Content == null)
130+
{
131+
throw new HttpRequestException($"HTTP {response.StatusCode}: No content");
132+
}
133+
else
134+
if (!response.IsSuccessStatusCode)
135+
{
136+
var errorText = await response.Content.ReadAsStringAsync();
137+
throw new HttpRequestException($"HTTP {response.StatusCode}: {errorText}");
138+
}
139+
140+
var stream = await response.Content.ReadAsStreamAsync();
141+
142+
// Read NDJSON stream
130143
using var reader = new StreamReader(stream, Encoding.UTF8);
131144
string? line;
132145

@@ -135,14 +148,14 @@ public async Task<T> Get<T>(string path, Dictionary<string, string>? headers = n
135148
yield return ParseStreamingSyncLine(JObject.Parse(line));
136149
}
137150
}
138-
151+
139152
/// <summary>
140-
/// Posts to the stream endpoint and returns a raw stream that can be read line by line.
153+
/// Posts to the stream endpoint and returns a raw NDJSON stream that can be read line by line.
141154
/// </summary>
142155
public async Task<Stream> PostStreamRaw(SyncStreamOptions options)
143156
{
144-
using var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers);
145-
using var response = await httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, options.CancellationToken);
157+
var requestMessage = await BuildRequest(HttpMethod.Post, options.Path, options.Data, options.Headers);
158+
var response = await httpClient.SendAsync(requestMessage, HttpCompletionOption.ResponseHeadersRead, options.CancellationToken);
146159

147160
if (response.Content == null)
148161
{

PowerSync/PowerSync.Common/Client/Sync/Stream/StreamingSyncImplementation.cs

Lines changed: 31 additions & 29 deletions
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
using System.Text;
2+
using Newtonsoft.Json.Linq;
23

34
namespace PowerSync.Common.Client.Sync.Stream;
45

@@ -107,12 +108,12 @@ public class StreamingSyncImplementation : EventStream<StreamingSyncImplementati
107108
private Task? streamingSyncTask;
108109
public Action TriggerCrudUpload { get; }
109110
private Action? notifyCompletedUploads;
110-
111+
111112
private CancellationTokenSource? crudUpdateCts;
112113
private readonly ILogger logger;
113114

114115
private readonly StreamingSyncLocks locks;
115-
116+
116117

117118
public StreamingSyncImplementation(StreamingSyncImplementationOptions options)
118119
{
@@ -286,7 +287,7 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio
286287
catch (Exception ex)
287288
{
288289
logger.LogError("Caught exception in streaming sync: {message}", ex.Message);
289-
290+
Console.WriteLine(ex.StackTrace);
290291
// Either:
291292
// - A network request failed with a failed connection or not OKAY response code.
292293
// - There was a sync processing error.
@@ -344,9 +345,9 @@ protected async Task<bool> StreamingSyncIteration(CancellationToken signal,
344345
{
345346
Params = options?.Params ?? DEFAULT_STREAM_CONNECTION_OPTIONS.Params
346347
};
347-
348+
348349
await RustSyncIteration(signal, resolvedOptions);
349-
350+
350351
return true;
351352
}
352353
});
@@ -638,15 +639,14 @@ private async Task RustSyncIteration(CancellationToken? signal, RequiredPowerSyn
638639
var nestedCts = new CancellationTokenSource();
639640
signal?.Register(() => { nestedCts.Cancel(); });
640641

641-
642+
642643
try
643644
{
644-
notifyCompletedUploads = () => {
645-
Task.Run(async () => await Control("completed_upload"));
646-
};
647-
645+
notifyCompletedUploads = () => { Task.Run(async () => await Control("completed_upload")); };
646+
648647
await Control("start", JsonConvert.SerializeObject(resolvedOptions.Params));
649-
if (receivingLines != null) {
648+
if (receivingLines != null)
649+
{
650650
await receivingLines;
651651
}
652652
}
@@ -657,7 +657,7 @@ private async Task RustSyncIteration(CancellationToken? signal, RequiredPowerSyn
657657
}
658658

659659
return;
660-
660+
661661
async Task Connect(EstablishSyncStream instruction)
662662
{
663663
var syncOptions = new SyncStreamOptions
@@ -666,17 +666,16 @@ async Task Connect(EstablishSyncStream instruction)
666666
CancellationToken = nestedCts.Token,
667667
Data = instruction.Request
668668
};
669-
670-
using var stream = await Options.Remote.PostStreamRaw(syncOptions);
669+
670+
var stream = await Options.Remote.PostStreamRaw(syncOptions);
671671
using var reader = new StreamReader(stream, Encoding.UTF8);
672672
string? line;
673673

674674
while ((line = await reader.ReadLineAsync()) != null)
675675
{
676676
logger.LogDebug("Parsing line for rust sync stream {message}", line);
677-
await Control("line_binary", line);
677+
await Control("line_text", line);
678678
}
679-
680679
}
681680

682681
async Task Stop()
@@ -689,7 +688,7 @@ async Task Control(string op, object? payload = null)
689688
logger.LogDebug("Control call {message}", op);
690689

691690
var rawResponse = await Options.Adapter.Control(op, payload);
692-
await HandleInstructions(JsonConvert.DeserializeObject<Instruction[]>(rawResponse));
691+
await HandleInstructions(Instruction.ParseInstructions(rawResponse));
693692
}
694693

695694
async Task HandleInstructions(Instruction[] instructions)
@@ -699,10 +698,11 @@ async Task HandleInstructions(Instruction[] instructions)
699698
await HandleInstruction(instruction);
700699
}
701700
}
702-
701+
703702
DB.Crud.SyncPriorityStatus CoreStatusToSyncStatus(SyncPriorityStatus status)
704703
{
705-
logger.LogWarning("Sync status {status}", status?.LastSyncedAt != null ? new DateTime(status!.LastSyncedAt) : null);
704+
logger.LogWarning("Sync status {status}",
705+
status?.LastSyncedAt != null ? new DateTime(status!.LastSyncedAt) : null);
706706
return new DB.Crud.SyncPriorityStatus
707707
{
708708
Priority = status.Priority,
@@ -733,9 +733,10 @@ async Task HandleInstruction(Instruction instruction)
733733
break;
734734
case UpdateSyncStatus syncStatus:
735735
var info = syncStatus.Status;
736-
var coreCompleteSync = info.PriorityStatus.FirstOrDefault(s => s.Priority == SyncProgress.FULL_SYNC_PRIORITY);
736+
var coreCompleteSync =
737+
info.PriorityStatus.FirstOrDefault(s => s.Priority == SyncProgress.FULL_SYNC_PRIORITY);
737738
var completeSync = coreCompleteSync != null ? CoreStatusToSyncStatus(coreCompleteSync) : null;
738-
739+
739740
UpdateSyncStatus(new SyncStatusOptions
740741
{
741742
Connected = info.Connected,
@@ -753,21 +754,22 @@ async Task HandleInstruction(Instruction instruction)
753754
// TODO handle errors later?
754755
new UpdateSyncStatusOptions
755756
{
756-
ClearDownloadError = true,
757-
ClearUploadError = true
757+
// ClearDownloadError = true,
758+
// ClearUploadError = true
758759
}
759760
);
760-
761+
761762
break;
762763
case EstablishSyncStream establishSyncStream:
763-
if (receivingLines != null) {
764+
if (receivingLines != null)
765+
{
764766
// Already connected, this shouldn't happen during a single iteration.
765767
throw new Exception("Unexpected request to establish sync stream, already connected");
766768
}
767-
769+
768770
receivingLines = Connect(establishSyncStream);
769771
break;
770-
case FetchCredentials { DidExpire: true, }:
772+
case FetchCredentials { DidExpire: true, }:
771773
Options.Remote.InvalidateCredentials();
772774
break;
773775
case FetchCredentials:
@@ -781,7 +783,7 @@ async Task HandleInstruction(Instruction instruction)
781783
break;
782784
case DidCompleteSync:
783785
UpdateSyncStatus(
784-
new SyncStatusOptions{},
786+
new SyncStatusOptions { },
785787
new UpdateSyncStatusOptions { ClearDownloadError = true });
786788
break;
787789
}
@@ -929,7 +931,7 @@ protected void UpdateSyncStatus(SyncStatusOptions options, UpdateSyncStatusOptio
929931
? null
930932
: options.DataFlow?.UploadError ?? SyncStatus.DataFlowStatus.UploadError,
931933
},
932-
PriorityStatusEntries = options.PriorityStatusEntries ?? SyncStatus.PriorityStatusEntries
934+
PriorityStatusEntries = options.PriorityStatusEntries ?? SyncStatus.PriorityStatusEntries
933935
});
934936

935937
if (!SyncStatus.Equals(updatedStatus))

0 commit comments

Comments
 (0)