@@ -137,7 +137,7 @@ public StreamingSyncImplementation(StreamingSyncImplementationOptions options)
137137
138138 TriggerCrudUpload = ( ) =>
139139 {
140- if ( ! SyncStatus . Connected || SyncStatus . DataFlowStatus . Uploading )
140+ if ( ! SyncStatus . Connected )
141141 {
142142 return ;
143143 }
@@ -280,8 +280,9 @@ protected async Task StreamingSync(CancellationToken? signal, PowerSyncConnectio
280280 {
281281 break ;
282282 }
283-
283+ Console . WriteLine ( "XXXX starting" ) ;
284284 await StreamingSyncIteration ( nestedCts . Token , options ) ;
285+ Console . WriteLine ( "XXXX ending" ) ;
285286 // Continue immediately
286287 }
287288 catch ( Exception ex )
@@ -360,27 +361,10 @@ private async Task RustSyncIteration(CancellationToken? signal, RequiredPowerSyn
360361 var nestedCts = new CancellationTokenSource ( ) ;
361362 signal ? . Register ( ( ) => { nestedCts . Cancel ( ) ; } ) ;
362363
363-
364- try
365- {
366- notifyCompletedUploads = ( ) => { Task . Run ( async ( ) => await Control ( "completed_upload" ) ) ; } ;
367-
368- await Control ( "start" , JsonConvert . SerializeObject ( resolvedOptions . Params ) ) ;
369- if ( receivingLines != null )
370- {
371- await receivingLines ;
372- }
373- }
374- finally
375- {
376- notifyCompletedUploads = null ;
377- await Stop ( ) ;
378- }
379-
380- return ;
381-
382364 async Task Connect ( EstablishSyncStream instruction )
383365 {
366+ Console . WriteLine ( "----- We got het here again" + nestedCts . Token . IsCancellationRequested ) ;
367+ Console . WriteLine ( "-----" + JsonConvert . SerializeObject ( instruction . Request ) ) ;
384368 var syncOptions = new SyncStreamOptions
385369 {
386370 Path = "/sync/stream" ,
@@ -390,14 +374,19 @@ async Task Connect(EstablishSyncStream instruction)
390374
391375 var stream = await Options . Remote . PostStreamRaw ( syncOptions ) ;
392376 using var reader = new StreamReader ( stream , Encoding . UTF8 ) ;
377+
378+ syncOptions . CancellationToken . Register ( ( ) => {
379+ try { stream ? . Close ( ) ; } catch { }
380+ } ) ;
381+
393382 string ? line ;
394383
395384 while ( ( line = await reader . ReadLineAsync ( ) ) != null )
396385 {
397- logger . LogDebug ( "Parsing line for rust sync stream {message}" , line ) ;
386+ logger . LogDebug ( "Parsing line for rust sync stream {message}" , "xx" ) ;
398387 await Control ( "line_text" , line ) ;
399-
400388 }
389+ Console . WriteLine ( "Done" ) ;
401390 }
402391
403392 async Task Stop ( )
@@ -410,31 +399,18 @@ async Task Control(string op, object? payload = null)
410399 logger . LogDebug ( "Control call {message}" , op ) ;
411400
412401 var rawResponse = await Options . Adapter . Control ( op , payload ) ;
413- await HandleInstructions ( Instruction . ParseInstructions ( rawResponse ) ) ;
402+ HandleInstructions ( Instruction . ParseInstructions ( rawResponse ) ) ;
414403 }
415404
416- async Task HandleInstructions ( Instruction [ ] instructions )
405+ void HandleInstructions ( Instruction [ ] instructions )
417406 {
418407 foreach ( var instruction in instructions )
419408 {
420- await HandleInstruction ( instruction ) ;
409+ HandleInstruction ( instruction ) ;
421410 }
422411 }
423412
424- DB . Crud . SyncPriorityStatus CoreStatusToSyncStatus ( SyncPriorityStatus status )
425- {
426- logger . LogWarning ( "Sync status {status}" ,
427- status ? . LastSyncedAt != null ? new DateTime ( status ! . LastSyncedAt ) : null ) ;
428- return new DB . Crud . SyncPriorityStatus
429- {
430- Priority = status . Priority ,
431- HasSynced = status . HasSynced ?? null ,
432- // TODO check this value
433- LastSyncedAt = status ? . LastSyncedAt != null ? new DateTime ( status ! . LastSyncedAt ) : null
434- } ;
435- }
436-
437- async Task HandleInstruction ( Instruction instruction )
413+ void HandleInstruction ( Instruction instruction )
438414 {
439415 switch ( instruction )
440416 {
@@ -451,7 +427,6 @@ async Task HandleInstruction(Instruction instruction)
451427 logger . LogWarning ( "{message}" , logLine . Line ) ;
452428 break ;
453429 }
454-
455430 break ;
456431 case UpdateSyncStatus syncStatus :
457432 var info = syncStatus . Status ;
@@ -477,25 +452,20 @@ async Task HandleInstruction(Instruction instruction)
477452 ClearDownloadError = true ,
478453 }
479454 ) ;
480-
481455 break ;
482456 case EstablishSyncStream establishSyncStream :
483457 if ( receivingLines != null )
484458 {
485- // Already connected, this shouldn't happen during a single iteration.
486459 throw new Exception ( "Unexpected request to establish sync stream, already connected" ) ;
487460 }
488-
489461 receivingLines = Connect ( establishSyncStream ) ;
490462 break ;
491- case FetchCredentials { DidExpire : true , } :
492- Options . Remote . InvalidateCredentials ( ) ;
493- break ;
494- case FetchCredentials :
463+ case FetchCredentials fetchCredentials :
495464 Options . Remote . InvalidateCredentials ( ) ;
496465 break ;
497466 case CloseSyncStream :
498- CancellationTokenSource ? . Cancel ( ) ;
467+ nestedCts . Cancel ( ) ;
468+ logger . LogWarning ( "Closing stream" ) ;
499469 break ;
500470 case FlushFileSystem :
501471 // ignore
@@ -507,6 +477,27 @@ async Task HandleInstruction(Instruction instruction)
507477 break ;
508478 }
509479 }
480+
481+ try
482+ {
483+ notifyCompletedUploads = ( ) => { Task . Run ( async ( ) => await Control ( "completed_upload" ) ) ; } ;
484+ logger . LogError ( "START" ) ;
485+ await Control ( "start" , JsonConvert . SerializeObject ( resolvedOptions . Params ) ) ;
486+ if ( receivingLines != null )
487+ {
488+ await receivingLines ;
489+ logger . LogError ( "Done waiting" ) ;
490+ }
491+ else
492+ {
493+ Console . WriteLine ( "No receiving lines task was started, this should not happen." ) ;
494+ }
495+ }
496+ finally
497+ {
498+ notifyCompletedUploads = null ;
499+ await Stop ( ) ;
500+ }
510501 }
511502
512503 public new void Close ( )
@@ -668,6 +659,16 @@ private async Task DelayRetry()
668659 await Task . Delay ( Options . RetryDelayMs . Value ) ;
669660 }
670661 }
662+
663+ private static DB . Crud . SyncPriorityStatus CoreStatusToSyncStatus ( SyncPriorityStatus status )
664+ {
665+ return new DB . Crud . SyncPriorityStatus
666+ {
667+ Priority = status . Priority ,
668+ HasSynced = status . HasSynced ?? null ,
669+ LastSyncedAt = status ? . LastSyncedAt != null ? new DateTime ( status ! . LastSyncedAt ) : null
670+ } ;
671+ }
671672}
672673
673674enum LockType
0 commit comments