Skip to content

Commit a015865

Browse files
committed
test: add comprehensive unit tests for new features
Add extensive test coverage for: 1. CancellationToken class (test_cancellation.py): - Thread-safe cancellation behavior - Sync and async event handling - Lazy async event creation - raise_if_cancelled() method - Multiple tokens independence - Cross-thread cancellation 2. Polling with cancellation (test_polling.py): - Cancellation before first poll - Cancellation during polling loop - Cancellation during sleep with immediate wake-up - Cancellation with error handlers - Backward compatibility (None token) - Multiple polls with same token 3. Async polling with cancellation (test_polling_async.py): - All sync tests adapted for async - Concurrent polling with shared token - Cancellation from different async tasks - asyncio.TimeoutError handling 4. SSE auto-reconnect for Axons (test_axon_sse_reconnect.py): - ReconnectingStream/AsyncReconnectingStream usage - Sequence-based resumption after disconnect - RAW_RESPONSE_HEADER opt-out mechanism - Missing/None sequence handling - Request options preservation - AxonSubscribeSseParams structure Total: 60+ new test cases ensuring robustness of both features. Part of porting TypeScript PR #765 features to Python SDK.
1 parent 19771bc commit a015865

File tree

4 files changed

+1020
-0
lines changed

4 files changed

+1020
-0
lines changed

tests/test_axon_sse_reconnect.py

Lines changed: 341 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,341 @@
1+
"""Tests for Axon SSE auto-reconnect functionality."""
2+
3+
from unittest.mock import Mock, patch, MagicMock
4+
from typing import Iterator, AsyncIterator
5+
6+
import pytest
7+
8+
from src.runloop_api_client._streaming import Stream, AsyncStream, ReconnectingStream, AsyncReconnectingStream
9+
from src.runloop_api_client._constants import RAW_RESPONSE_HEADER
10+
from src.runloop_api_client.types.axon_event_view import AxonEventView
11+
12+
13+
class MockAxonEvent:
14+
"""Mock AxonEventView for testing."""
15+
16+
def __init__(self, sequence: int, data: str):
17+
self.sequence = sequence
18+
self.data = data
19+
20+
21+
class TestAxonSSEReconnectSync:
22+
"""Test SSE reconnection for sync Axon subscriptions."""
23+
24+
def test_subscribe_sse_returns_reconnecting_stream(self):
25+
"""Test that subscribe_sse returns a ReconnectingStream."""
26+
from src.runloop_api_client import Runloop
27+
28+
with patch.object(Runloop, "_get") as mock_get:
29+
# Mock the initial stream
30+
mock_stream = Mock(spec=Stream)
31+
mock_get.return_value = mock_stream
32+
33+
client = Runloop(api_key="test-key", base_url="http://test")
34+
35+
result = client.axons.subscribe_sse("axon-123")
36+
37+
# Should return a ReconnectingStream
38+
assert isinstance(result, ReconnectingStream)
39+
40+
def test_subscribe_sse_with_raw_header_returns_plain_stream(self):
41+
"""Test that RAW_RESPONSE_HEADER opts out of reconnection."""
42+
from src.runloop_api_client import Runloop
43+
44+
with patch.object(Runloop, "_get") as mock_get:
45+
mock_stream = Mock(spec=Stream)
46+
mock_get.return_value = mock_stream
47+
48+
client = Runloop(api_key="test-key", base_url="http://test")
49+
50+
result = client.axons.subscribe_sse("axon-123", extra_headers={RAW_RESPONSE_HEADER: "true"})
51+
52+
# Should return plain Stream, not ReconnectingStream
53+
assert not isinstance(result, ReconnectingStream)
54+
assert result == mock_stream
55+
56+
def test_reconnection_uses_last_sequence(self):
57+
"""Test that reconnection uses the sequence from the last event."""
58+
from src.runloop_api_client import Runloop
59+
60+
call_count = 0
61+
query_params = []
62+
63+
def mock_get(*args, **kwargs):
64+
nonlocal call_count
65+
# Capture query params
66+
if "query" in kwargs.get("options", {}):
67+
query_params.append(kwargs["options"]["query"])
68+
69+
# First call: return stream with events
70+
if call_count == 0:
71+
call_count += 1
72+
mock_stream = Mock(spec=Stream)
73+
74+
def mock_iter():
75+
yield MockAxonEvent(sequence=1, data="event1")
76+
yield MockAxonEvent(sequence=2, data="event2")
77+
# Simulate timeout/disconnect
78+
raise StopIteration()
79+
80+
mock_stream.__iter__ = mock_iter
81+
return mock_stream
82+
83+
# Second call (reconnection): return stream continuing from sequence 2
84+
mock_stream = Mock(spec=Stream)
85+
86+
def mock_iter():
87+
yield MockAxonEvent(sequence=3, data="event3")
88+
89+
mock_stream.__iter__ = mock_iter
90+
return mock_stream
91+
92+
with patch.object(Runloop, "_get", side_effect=mock_get):
93+
client = Runloop(api_key="test-key", base_url="http://test")
94+
95+
stream = client.axons.subscribe_sse("axon-123")
96+
97+
# Consume events
98+
events = list(stream)
99+
100+
# Should have 3 events total (2 from first stream, 1 from reconnected stream)
101+
assert len(events) == 3
102+
assert events[0].sequence == 1
103+
assert events[1].sequence == 2
104+
assert events[2].sequence == 3
105+
106+
# Check that second call used after_sequence parameter
107+
# Note: first call has None, second call should have after_sequence=2
108+
assert len(query_params) >= 2
109+
110+
def test_sequence_extraction_handles_missing_sequence(self):
111+
"""Test that missing sequence fields are handled gracefully."""
112+
from src.runloop_api_client import Runloop
113+
114+
with patch.object(Runloop, "_get") as mock_get:
115+
mock_stream = Mock(spec=Stream)
116+
117+
# Event without sequence attribute
118+
class EventWithoutSequence:
119+
pass
120+
121+
def mock_iter():
122+
yield EventWithoutSequence()
123+
124+
mock_stream.__iter__ = mock_iter
125+
mock_get.return_value = mock_stream
126+
127+
client = Runloop(api_key="test-key", base_url="http://test")
128+
129+
stream = client.axons.subscribe_sse("axon-123")
130+
131+
# Should not crash, sequence extractor should return None
132+
events = list(stream)
133+
assert len(events) == 1
134+
135+
def test_subscribe_sse_preserves_request_options(self):
136+
"""Test that extra headers, query, etc. are preserved."""
137+
from src.runloop_api_client import Runloop
138+
139+
with patch.object(Runloop, "_get") as mock_get:
140+
mock_stream = Mock(spec=Stream)
141+
mock_stream.__iter__ = lambda self: iter([])
142+
mock_get.return_value = mock_stream
143+
144+
client = Runloop(api_key="test-key", base_url="http://test")
145+
146+
extra_headers = {"X-Custom": "value"}
147+
extra_query = {"param": "value"}
148+
149+
client.axons.subscribe_sse(
150+
"axon-123", extra_headers=extra_headers, extra_query=extra_query, timeout=30.0
151+
)
152+
153+
# Verify _get was called with the options
154+
call_args = mock_get.call_args
155+
options = call_args.kwargs["options"]
156+
157+
# Headers should include Accept: text/event-stream and custom header
158+
assert "Accept" in options["extra_headers"]
159+
assert options["extra_headers"]["Accept"] == "text/event-stream"
160+
assert options["extra_headers"]["X-Custom"] == "value"
161+
assert options["extra_query"] == extra_query
162+
assert options["timeout"] == 30.0
163+
164+
165+
class TestAxonSSEReconnectAsync:
166+
"""Test SSE reconnection for async Axon subscriptions."""
167+
168+
@pytest.mark.asyncio
169+
async def test_subscribe_sse_returns_reconnecting_stream(self):
170+
"""Test that subscribe_sse returns an AsyncReconnectingStream."""
171+
from src.runloop_api_client import AsyncRunloop
172+
173+
async def mock_get(*args, **kwargs):
174+
mock_stream = Mock(spec=AsyncStream)
175+
return mock_stream
176+
177+
with patch.object(AsyncRunloop, "_get", side_effect=mock_get):
178+
client = AsyncRunloop(api_key="test-key", base_url="http://test")
179+
180+
result = await client.axons.subscribe_sse("axon-123")
181+
182+
# Should return an AsyncReconnectingStream
183+
assert isinstance(result, AsyncReconnectingStream)
184+
185+
@pytest.mark.asyncio
186+
async def test_subscribe_sse_with_raw_header_returns_plain_stream(self):
187+
"""Test that RAW_RESPONSE_HEADER opts out of reconnection."""
188+
from src.runloop_api_client import AsyncRunloop
189+
190+
async def mock_get(*args, **kwargs):
191+
mock_stream = Mock(spec=AsyncStream)
192+
return mock_stream
193+
194+
with patch.object(AsyncRunloop, "_get", side_effect=mock_get) as mock_get_method:
195+
client = AsyncRunloop(api_key="test-key", base_url="http://test")
196+
197+
result = await client.axons.subscribe_sse("axon-123", extra_headers={RAW_RESPONSE_HEADER: "true"})
198+
199+
# Should return plain AsyncStream, not AsyncReconnectingStream
200+
assert not isinstance(result, AsyncReconnectingStream)
201+
202+
@pytest.mark.asyncio
203+
async def test_reconnection_uses_last_sequence(self):
204+
"""Test that reconnection uses the sequence from the last event."""
205+
from src.runloop_api_client import AsyncRunloop
206+
207+
call_count = 0
208+
query_params = []
209+
210+
async def mock_get(*args, **kwargs):
211+
nonlocal call_count
212+
# Capture query params
213+
if "query" in kwargs.get("options", {}):
214+
query_params.append(kwargs["options"]["query"])
215+
216+
# First call: return stream with events
217+
if call_count == 0:
218+
call_count += 1
219+
mock_stream = Mock(spec=AsyncStream)
220+
221+
async def mock_iter():
222+
yield MockAxonEvent(sequence=1, data="event1")
223+
yield MockAxonEvent(sequence=2, data="event2")
224+
225+
mock_stream.__aiter__ = mock_iter
226+
return mock_stream
227+
228+
# Second call (reconnection)
229+
mock_stream = Mock(spec=AsyncStream)
230+
231+
async def mock_iter():
232+
yield MockAxonEvent(sequence=3, data="event3")
233+
234+
mock_stream.__aiter__ = mock_iter
235+
return mock_stream
236+
237+
with patch.object(AsyncRunloop, "_get", side_effect=mock_get):
238+
client = AsyncRunloop(api_key="test-key", base_url="http://test")
239+
240+
stream = await client.axons.subscribe_sse("axon-123")
241+
242+
# Consume events
243+
events = []
244+
async for event in stream:
245+
events.append(event)
246+
if len(events) >= 3:
247+
break
248+
249+
# Should have 3 events total
250+
assert len(events) == 3
251+
assert events[0].sequence == 1
252+
assert events[1].sequence == 2
253+
assert events[2].sequence == 3
254+
255+
@pytest.mark.asyncio
256+
async def test_sequence_extraction_handles_none(self):
257+
"""Test that None sequences are handled gracefully."""
258+
from src.runloop_api_client import AsyncRunloop
259+
260+
async def mock_get(*args, **kwargs):
261+
mock_stream = Mock(spec=AsyncStream)
262+
263+
# Event with sequence = None
264+
class EventWithNoneSequence:
265+
sequence = None
266+
267+
async def mock_iter():
268+
yield EventWithNoneSequence()
269+
270+
mock_stream.__aiter__ = mock_iter
271+
return mock_stream
272+
273+
with patch.object(AsyncRunloop, "_get", side_effect=mock_get):
274+
client = AsyncRunloop(api_key="test-key", base_url="http://test")
275+
276+
stream = await client.axons.subscribe_sse("axon-123")
277+
278+
# Should not crash
279+
events = []
280+
async for event in stream:
281+
events.append(event)
282+
break
283+
284+
assert len(events) == 1
285+
286+
@pytest.mark.asyncio
287+
async def test_subscribe_sse_preserves_request_options(self):
288+
"""Test that extra headers, query, etc. are preserved in async."""
289+
from src.runloop_api_client import AsyncRunloop
290+
291+
async def mock_get(*args, **kwargs):
292+
mock_stream = Mock(spec=AsyncStream)
293+
294+
async def mock_iter():
295+
return
296+
yield # Make it async generator
297+
298+
mock_stream.__aiter__ = mock_iter
299+
return mock_stream
300+
301+
with patch.object(AsyncRunloop, "_get", side_effect=mock_get) as mock_get_method:
302+
client = AsyncRunloop(api_key="test-key", base_url="http://test")
303+
304+
extra_headers = {"X-Custom": "value"}
305+
extra_query = {"param": "value"}
306+
307+
await client.axons.subscribe_sse(
308+
"axon-123", extra_headers=extra_headers, extra_query=extra_query, timeout=30.0
309+
)
310+
311+
# Verify _get was called with the options
312+
call_args = mock_get_method.call_args
313+
options = call_args.kwargs["options"]
314+
315+
# Headers should include Accept: text/event-stream and custom header
316+
assert "Accept" in options["extra_headers"]
317+
assert options["extra_headers"]["Accept"] == "text/event-stream"
318+
assert options["extra_headers"]["X-Custom"] == "value"
319+
assert options["extra_query"] == extra_query
320+
assert options["timeout"] == 30.0
321+
322+
323+
class TestAxonSubscribeSseParams:
324+
"""Test AxonSubscribeSseParams TypedDict."""
325+
326+
def test_params_structure(self):
327+
"""Test that AxonSubscribeSseParams has the correct structure."""
328+
from src.runloop_api_client.types.axons import AxonSubscribeSseParams
329+
from src.runloop_api_client._types import NOT_GIVEN
330+
331+
# Should be able to create with after_sequence
332+
params: AxonSubscribeSseParams = {"after_sequence": 123}
333+
assert params["after_sequence"] == 123
334+
335+
# Should be able to create with NOT_GIVEN
336+
params2: AxonSubscribeSseParams = {"after_sequence": NOT_GIVEN}
337+
assert params2["after_sequence"] is NOT_GIVEN
338+
339+
# Should be able to create with None implicitly (total=False)
340+
params3: AxonSubscribeSseParams = {}
341+
assert "after_sequence" not in params3

0 commit comments

Comments
 (0)