@@ -94,8 +94,6 @@ def __init__(
9494 self ._pending_next_msgs_calls = None
9595 self ._pending_size = 0
9696 self ._wait_for_msgs_task = None
97- # For compatibility with tests that expect _message_iterator
98- self ._message_iterator = None
9997
10098 # For JetStream enabled subscriptions.
10199 self ._jsi : Optional [JetStreamContext ._JSI ] = None
@@ -147,13 +145,6 @@ async def _message_generator(self) -> AsyncIterator[Msg]:
147145 if self ._closed :
148146 break
149147
150- # Check if wrapper was cancelled (for compatibility with tests).
151- if (
152- hasattr (self , "_message_iterator" )
153- and self ._message_iterator
154- and self ._message_iterator ._unsubscribed_future .done ()
155- ):
156- break
157148
158149 # Check max message limit based on how many we've yielded so far.
159150 if self ._max_msgs > 0 and yielded_count >= self ._max_msgs :
@@ -177,9 +168,6 @@ async def _message_generator(self) -> AsyncIterator[Msg]:
177168
178169 # Check if we should auto-unsubscribe after yielding this message.
179170 if self ._max_msgs > 0 and yielded_count >= self ._max_msgs :
180- # Cancel the wrapper too for consistency.
181- if hasattr (self , "_message_iterator" ) and self ._message_iterator :
182- self ._message_iterator ._cancel ()
183171 break
184172 except asyncio .CancelledError :
185173 pass
@@ -271,8 +259,7 @@ def _start(self, error_cb):
271259 pass
272260 else :
273261 # For async iteration, we now use a generator directly via the messages property
274- # But we create a compatibility wrapper for tests
275- self ._message_iterator = _CompatibilityIteratorWrapper (self )
262+ pass
276263
277264 async def drain (self ):
278265 """
@@ -343,8 +330,6 @@ def _stop_processing(self) -> None:
343330 """
344331 if self ._wait_for_msgs_task and not self ._wait_for_msgs_task .done ():
345332 self ._wait_for_msgs_task .cancel ()
346- if hasattr (self , "_message_iterator" ) and self ._message_iterator :
347- self ._message_iterator ._cancel ()
348333
349334 # Only put sentinel if there are active async generators
350335 try :
@@ -396,16 +381,3 @@ async def _wait_for_msgs(self, error_cb) -> None:
396381 break
397382
398383
399- class _CompatibilityIteratorWrapper :
400- """
401- Compatibility wrapper that provides the same interface as the old _SubscriptionMessageIterator
402- but uses the more efficient generator internally.
403- """
404-
405- def __init__ (self , sub : Subscription ) -> None :
406- self ._sub = sub
407- self ._unsubscribed_future : asyncio .Future [bool ] = asyncio .Future ()
408-
409- def _cancel (self ) -> None :
410- if not self ._unsubscribed_future .done ():
411- self ._unsubscribed_future .set_result (True )
0 commit comments