Skip to content

feat(SDK-2185): FDv2 streaming base, initializer, and synchronizer#267

Open
kinyoklion wants to merge 4 commits intomainfrom
rlamb/sdk-2185/fdv2-streaming
Open

feat(SDK-2185): FDv2 streaming base, initializer, and synchronizer#267
kinyoklion wants to merge 4 commits intomainfrom
rlamb/sdk-2185/fdv2-streaming

Conversation

@kinyoklion
Copy link
Copy Markdown
Member

@kinyoklion kinyoklion commented May 6, 2026

Stacked on #266 (SSE capability surface). Mark this PR ready for review only after #266 merges and the base is rebased onto main.

Jira: SDK-2185

Scope

Phase B1 from the parallel plan — the streaming half of FDv2 sources, mirroring the polling vertical that landed in SDK-2183 / SDK-2184.

Three new files in packages/common_client/lib/src/data_sources/fdv2/:

streaming_base.dartFDv2StreamingBase

Wraps an SSEClient with FDv2 protocol semantics. Single-subscription StreamController<FDv2SourceResult>:

  • onListen → builds a fresh FDv2ProtocolHandler, subscribes to the SSE client's stream.
  • onCancel → tears down without emitting shutdown (subscriber-initiated).
  • close() → emits shutdown then closes; idempotent.

Per event:

  • Named SSE events get parsed as JSON, wrapped in an FDv2Event, fed to the handler.
  • ActionPayloadChangeSetResult with persist: true.
  • ActionGoodbye → goodbye StatusResult; closes the connection (server told us to disconnect).
  • ActionServerError / ActionError → interrupted StatusResult; SSE client's built-in backoff handles reconnect.
  • ActionNone → no emission.
  • Legacy ping events → injected PingHandler (one-shot poll); result is forwarded.
  • x-ld-fd-fallback: true on the OpenEvent → terminalError with fdv1Fallback: true; closes.
  • SSE transport error → interrupted; reconnect via SSE client.

Closed-ness tracked via a single Completer<void> _stoppedSignal (matches the polling synchronizer pattern from SDK-2184).

streaming_initializer.dartFDv2StreamingInitializer

Implements Initializer. Subscribes to the base, completes run() with the first emission, then closes the connection. close() before the first emission yields a shutdown result.

streaming_synchronizer.dartFDv2StreamingSynchronizer

Implements Synchronizer. Thin adapter forwarding the base's stream and close(), so the orchestrator can treat polling and streaming uniformly.

Auth wiring (deferred to orchestrator)

FDv2StreamingBase does not build URLs or set headers. The orchestrator (SDK-2186) constructs the SSEClient, deciding via sseClient.hasCapability(SSECapability.requestHeaders) whether to set the credential in the Authorization header or as the auth URL query parameter. The streaming source consumes whatever client it's handed.

Verification

  • 25 new tests across the three files; full FDv2 directory now at 185 tests.
  • dart analyze lib test clean in common_client/.
  • melos run analyze and melos run test both pass workspace-wide.

Follow-up (out of scope)

  • The orchestrator (C1 / SDK-2186) is what actually wires FDv2StreamingBase into the SDK. This PR delivers the building blocks.

Note

Medium Risk
Introduces new long-lived SSE streaming source logic and lifecycle/error handling, which can affect SDK update delivery and reconnect behavior. Risk is mitigated by extensive new unit tests, but the change touches core data-source plumbing.

Overview
Adds an FDv2 SSE-based streaming implementation (FDv2StreamingBase) that translates SSE events into FDv2SourceResults, including handling goodbye, server errors, legacy ping (poll bridge), reconnects (per-OpenEvent handler reset), and x-ld-fd-fallback termination; transport errors are surfaced as interrupted with sanitized logging.

Adds streaming adapters for orchestrator integration: a one-shot FDv2StreamingInitializer that returns the first streaming result then tears down, and a FDv2StreamingSynchronizer that exposes the base as a Synchronizer.

Tightens FDv2 protocol error logging to include payload id (or '<unknown>') and reason, and extends test infrastructure (TestSseClient.isClosed) plus comprehensive new tests covering lifecycle, parsing failures, reconnect safety, ping de-duping, and log sanitization.

Reviewed by Cursor Bugbot for commit 892fc8b. Bugbot is set up for automated code reviews on this repo. Configure here.

@kinyoklion kinyoklion force-pushed the rlamb/sdk-2185/fdv2-streaming branch 2 times, most recently from 97eca5b to 6a9d5ec Compare May 7, 2026 17:16
Base automatically changed from rlamb/sse-client-capabilities to main May 7, 2026 20:25
@kinyoklion kinyoklion force-pushed the rlamb/sdk-2185/fdv2-streaming branch from 6a9d5ec to 42d879a Compare May 7, 2026 21:06
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I am undecided if we should support this. If we don't then we can combine the base into the synchronizer and discard it.

Flutter does run on web, but it is inherently a single page app. So I don't think one-shot mode is nearly as important as it is for js.

Lets test code assert that the SUT correctly tears the SSE
connection down. Test-only addition; the production SSEClient
implementations do not expose this state, and the TestSseClient
class is already documented as test-only with no semver guarantee.
kinyoklion added 2 commits May 7, 2026 15:15
- streaming_base.dart: wraps an SSEClient. Single-subscription
  StreamController<FDv2SourceResult>. On subscribe, opens the SSE
  stream and creates a fresh FDv2ProtocolHandler. Each named SSE
  event is parsed as JSON, wrapped in an FDv2Event, and fed to the
  handler. ActionPayload becomes a ChangeSetResult (persist: true);
  ActionGoodbye becomes a goodbye StatusResult and closes the
  connection; ActionServerError / ActionError become interrupted
  StatusResults; ActionNone does not emit. Legacy `ping` events
  invoke the injected PingHandler and forward its result. The
  `x-ld-fd-fallback` header on the OpenEvent emits terminalError
  with fdv1Fallback=true and closes. SSE transport errors surface as
  interrupted; the SSE client's built-in backoff handles reconnect.
  Closed-ness is tracked via a single Completer<void> _stoppedSignal
  matching the polling synchronizer.
- streaming_initializer.dart: implements Initializer. Subscribes to
  the base, completes run() with the first emission, then closes the
  connection. close() before the first emission yields a shutdown
  StatusResult.
- streaming_synchronizer.dart: implements Synchronizer. Thin adapter
  forwarding the base's stream so the orchestrator can treat polling
  and streaming uniformly.

Tests cover: lifecycle (open on subscribe, cancel teardown,
close+shutdown, idempotency), event handling (xfer-full payload,
environmentId from header, goodbye, malformed data, non-object
data, transport error), FDv1 fallback header (true / case-
insensitive / false ignored), legacy ping bridge (forwards result,
handles thrower), and synchronizer forwarding.

The orchestrator (SDK-2186) wires the SSEClient with the right URL
and auth strategy based on SSEClient.hasCapability(requestHeaders);
the streaming source consumes whatever client it's given.
Drops the per-file FakeSseClient stub and replaces it with the
shared TestSseClient from launchdarkly_event_source_client, driven
via the existing emitEvent / emitError API plus the new isClosed
observer. Net: ~50 lines of duplicated stub code removed across
three test files.
@kinyoklion kinyoklion force-pushed the rlamb/sdk-2185/fdv2-streaming branch from 96f16d0 to 24264af Compare May 7, 2026 22:17
…ping serialization, sanitized error log

Six fixes in the streaming vertical, each with a regression test
that fails against the prior commit:

- close-after-self-close. Goodbye and FDv1-fallback branches closed
  the controller without completing the stop signal, so a close()
  call from inside the listener's onData (the natural orchestrator
  reaction to a goodbye) would pass the signal-based guard and crash
  inside _controller.add(...) on a closed controller. Consolidated
  the three terminal paths (close, goodbye, fdv1-fallback) through
  a single _terminate() helper that orders operations safely.
  Synchronizer-level close-after-goodbye is fixed automatically.
- malformed-shape event data. The previous try/catch wrapped only
  jsonDecode; the structural casts inside per-event fromJson
  factories could throw TypeError synchronously from processEvent
  and become an unhandled async exception. Wrapped processEvent in
  the same try/catch and added handler reset on the failure path.
- protocol handler not reset on SSE reconnect. _onListen rebuilt the
  handler once per stream subscription, not per connection. SSE
  auto-reconnect emits a fresh OpenEvent on the same subscription;
  if the server resumes via Last-Event-ID without re-sending
  server-intent, stale _tempUpdates from the prior connection bleed
  into the new payload. Reset the handler on every OpenEvent so the
  SDK defends against the bleed regardless of server behavior.
- unbounded concurrent ping handlers. Two consecutive ping events
  spawned two concurrent polls (out-of-order races + DoS
  amplification). Serialized via a _pingInFlight flag; excess pings
  drop while one is in flight (one in-flight poll already satisfies
  the FDv2 ping semantic).
- SSE error log leaked the full request URL. _handleSseError logged
  the raw exception at debug, whose toString embeds the URL (and the
  base64-encoded context in GET mode). Mirrored the polling base's
  _describeError to categorize without echoing the underlying
  exception.
- ActionServerError log format. _processError in the protocol
  handler now emits the spec-mandated "An issue was encountered
  receiving updates for payload '{id}' with reason: '{reason}'.
  Automatic retry will occur." (was free-form text without payload
  id). Pre-existing from SDK-2182, surfaces externally for the
  first time in this PR.

Plus a one-line clarification in the streaming-base class doc that
restart() is intentionally not exposed -- the orchestrator handles
connection lifecycle by tearing down a streaming source and
constructing a fresh one, not by reconnecting an existing one.
/// and on every subsequent [OpenEvent] (SSE auto-reconnect), so a
/// partial transfer from the previous connection cannot bleed into
/// the new one regardless of whether the server re-sends
/// `server-intent` after a Last-Event-ID resumption. Also called
Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In practice this last-event-id part isn't relevant to us.

@kinyoklion kinyoklion marked this pull request as ready for review May 8, 2026 17:45
@kinyoklion kinyoklion requested a review from a team as a code owner May 8, 2026 17:45
Copy link
Copy Markdown

@cursor cursor Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Cursor Bugbot has reviewed your changes using default mode and found 2 potential issues.

Fix All in Cursor

❌ Bugbot Autofix is OFF. To automatically fix reported issues with cloud agents, have a team admin enable autofix in the Cursor dashboard.

Reviewed by Cursor Bugbot for commit 892fc8b. Configure here.

case OpenEvent open:
_handleOpen(open);
case MessageEvent message:
_handleMessage(message);
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Missing unawaited() for fire-and-forget async call

Low Severity

_handleMessage(message) is a Future<void> async function called from the sync _handleEvent callback, but the returned Future is silently discarded without wrapping it in unawaited(). The team convention (demonstrated in polling_synchronizer.dart line 82 where unawaited(_doPoll()) is used in its _onListen callback) requires using unawaited() from dart:async to explicitly signal fire-and-forget intent when a sync callback kicks off async work.

Fix in Cursor Fix in Web

Triggered by learned rule: Use unawaited() not Future.microtask() for fire-and-forget async calls

Reviewed by Cursor Bugbot for commit 892fc8b. Configure here.

final FDv2StreamingBase _base;
final Completer<FDv2SourceResult> _completer = Completer<FDv2SourceResult>();
StreamSubscription<FDv2SourceResult>? _subscription;
bool _closed = false;
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Initializer uses bool instead of Completer for state

Low Severity

FDv2StreamingInitializer implements the Initializer contract but tracks closed state with bool _closed instead of a Completer<void> _stoppedSignal. The team's established pattern for Initializer/Synchronizer implementations (as seen in polling_initializer.dart with _closedSignal and polling_synchronizer.dart with _stoppedSignal) uses a Completer as the single authority for closed/stopped state.

Fix in Cursor Fix in Web

Triggered by learned rule: Use Completer as single authority for closed/stopped state in data sources

Reviewed by Cursor Bugbot for commit 892fc8b. Configure here.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant