Skip to content

Commit 19771bc

Browse files
committed
feat: add SSE auto-reconnect for Axon subscriptions
- Add AxonSubscribeSseParams with after_sequence parameter (internal use only) - Wrap subscribe_sse() with ReconnectingStream/AsyncReconnectingStream - Automatically resume from last received event on timeout using sequence-based offset - Add opt-out via RAW_RESPONSE_HEADER for users who want raw streams - after_sequence is handled internally by reconnector, not exposed in public API Per code review feedback from @dines-rl on TypeScript PR #765: 'after_sequence should just be a variable in the reconnector for follow-up' Part of porting TypeScript PR #765 features to Python SDK.
1 parent 6baf371 commit 19771bc

File tree

3 files changed

+117
-16
lines changed

3 files changed

+117
-16
lines changed

src/runloop_api_client/resources/axons/axons.py

Lines changed: 102 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
from __future__ import annotations
44

5-
from typing import Optional
5+
from typing import Optional, cast
66
from typing_extensions import Literal
77

88
import httpx
@@ -16,6 +16,7 @@
1616
AsyncSqlResourceWithStreamingResponse,
1717
)
1818
from ...types import axon_list_params, axon_create_params, axon_publish_params
19+
from ...types.axons import axon_subscribe_sse_params
1920
from ..._types import Body, Omit, Query, Headers, NotGiven, omit, not_given
2021
from ..._utils import path_template, maybe_transform, async_maybe_transform
2122
from ..._compat import cached_property
@@ -26,7 +27,8 @@
2627
async_to_raw_response_wrapper,
2728
async_to_streamed_response_wrapper,
2829
)
29-
from ..._streaming import Stream, AsyncStream
30+
from ..._constants import RAW_RESPONSE_HEADER
31+
from ..._streaming import Stream, AsyncStream, ReconnectingStream, AsyncReconnectingStream
3032
from ...pagination import SyncAxonsCursorIDPage, AsyncAxonsCursorIDPage
3133
from ..._base_client import AsyncPaginator, make_request_options
3234
from ...types.axon_view import AxonView
@@ -269,6 +271,8 @@ def subscribe_sse(
269271
"""
270272
[Beta] Subscribe to an axon event stream via server-sent events.
271273
274+
Automatically reconnects on timeout, resuming from last received event.
275+
272276
Args:
273277
extra_headers: Send extra headers
274278
@@ -282,14 +286,54 @@ def subscribe_sse(
282286
raise ValueError(f"Expected a non-empty value for `id` but received {id!r}")
283287
default_headers: Headers = {"Accept": "text/event-stream"}
284288
merged_headers = default_headers if extra_headers is None else {**default_headers, **extra_headers}
285-
return self._get(
286-
path_template("/v1/axons/{id}/subscribe/sse", id=id),
287-
options=make_request_options(
288-
extra_headers=merged_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
289+
290+
# Check if user wants raw response (opt-out of reconnection)
291+
if extra_headers is not None and RAW_RESPONSE_HEADER in extra_headers:
292+
return self._get(
293+
path_template("/v1/axons/{id}/subscribe/sse", id=id),
294+
options=make_request_options(
295+
extra_headers=merged_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
296+
),
297+
cast_to=AxonEventView,
298+
stream=True,
299+
stream_cls=Stream[AxonEventView],
300+
)
301+
302+
def create_stream(last_sequence: str | None) -> Stream[AxonEventView]:
303+
# after_sequence is used internally for reconnection only
304+
sequence_int = int(last_sequence) if last_sequence is not None else None
305+
return self._get(
306+
path_template("/v1/axons/{id}/subscribe/sse", id=id),
307+
options=make_request_options(
308+
extra_headers=merged_headers,
309+
extra_query=extra_query,
310+
extra_body=extra_body,
311+
timeout=timeout,
312+
query=maybe_transform(
313+
{"after_sequence": sequence_int},
314+
axon_subscribe_sse_params.AxonSubscribeSseParams,
315+
),
316+
),
317+
cast_to=AxonEventView,
318+
stream=True,
319+
stream_cls=Stream[AxonEventView],
320+
)
321+
322+
initial_stream = create_stream(None)
323+
324+
def get_sequence(item: AxonEventView) -> str | None:
325+
value = getattr(item, "sequence", None)
326+
if value is None:
327+
return None
328+
return str(value)
329+
330+
return cast(
331+
Stream[AxonEventView],
332+
ReconnectingStream(
333+
current_stream=initial_stream,
334+
stream_creator=create_stream,
335+
get_offset=get_sequence,
289336
),
290-
cast_to=AxonEventView,
291-
stream=True,
292-
stream_cls=Stream[AxonEventView],
293337
)
294338

295339

@@ -526,6 +570,8 @@ async def subscribe_sse(
526570
"""
527571
[Beta] Subscribe to an axon event stream via server-sent events.
528572
573+
Automatically reconnects on timeout, resuming from last received event.
574+
529575
Args:
530576
extra_headers: Send extra headers
531577
@@ -539,14 +585,54 @@ async def subscribe_sse(
539585
raise ValueError(f"Expected a non-empty value for `id` but received {id!r}")
540586
default_headers: Headers = {"Accept": "text/event-stream"}
541587
merged_headers = default_headers if extra_headers is None else {**default_headers, **extra_headers}
542-
return await self._get(
543-
path_template("/v1/axons/{id}/subscribe/sse", id=id),
544-
options=make_request_options(
545-
extra_headers=merged_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
588+
589+
# Check if user wants raw response (opt-out of reconnection)
590+
if extra_headers is not None and RAW_RESPONSE_HEADER in extra_headers:
591+
return await self._get(
592+
path_template("/v1/axons/{id}/subscribe/sse", id=id),
593+
options=make_request_options(
594+
extra_headers=merged_headers, extra_query=extra_query, extra_body=extra_body, timeout=timeout
595+
),
596+
cast_to=AxonEventView,
597+
stream=True,
598+
stream_cls=AsyncStream[AxonEventView],
599+
)
600+
601+
async def create_stream(last_sequence: str | None) -> AsyncStream[AxonEventView]:
602+
# after_sequence is used internally for reconnection only
603+
sequence_int = int(last_sequence) if last_sequence is not None else None
604+
return await self._get(
605+
path_template("/v1/axons/{id}/subscribe/sse", id=id),
606+
options=make_request_options(
607+
extra_headers=merged_headers,
608+
extra_query=extra_query,
609+
extra_body=extra_body,
610+
timeout=timeout,
611+
query=maybe_transform(
612+
{"after_sequence": sequence_int},
613+
axon_subscribe_sse_params.AxonSubscribeSseParams,
614+
),
615+
),
616+
cast_to=AxonEventView,
617+
stream=True,
618+
stream_cls=AsyncStream[AxonEventView],
619+
)
620+
621+
initial_stream = await create_stream(None)
622+
623+
def get_sequence(item: AxonEventView) -> str | None:
624+
value = getattr(item, "sequence", None)
625+
if value is None:
626+
return None
627+
return str(value)
628+
629+
return cast(
630+
AsyncStream[AxonEventView],
631+
AsyncReconnectingStream(
632+
current_stream=initial_stream,
633+
stream_creator=create_stream,
634+
get_offset=get_sequence,
546635
),
547-
cast_to=AxonEventView,
548-
stream=True,
549-
stream_cls=AsyncStream[AxonEventView],
550636
)
551637

552638

src/runloop_api_client/types/axons/__init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22

33
from __future__ import annotations
44

5+
from .axon_subscribe_sse_params import AxonSubscribeSseParams as AxonSubscribeSseParams
56
from .sql_batch_params import SqlBatchParams as SqlBatchParams
67
from .sql_query_params import SqlQueryParams as SqlQueryParams
78
from .sql_step_error_view import SqlStepErrorView as SqlStepErrorView
Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# File generated from our OpenAPI spec by Stainless. See CONTRIBUTING.md for details.
2+
3+
from __future__ import annotations
4+
5+
from typing_extensions import TypedDict
6+
7+
from ..._types import NotGiven
8+
9+
__all__ = ["AxonSubscribeSseParams"]
10+
11+
12+
class AxonSubscribeSseParams(TypedDict, total=False):
13+
after_sequence: int | NotGiven
14+
"""Resume SSE stream from events after this sequence number (used internally for reconnection)"""

0 commit comments

Comments
 (0)