55the multipart subscription protocol as implemented by Apollo GraphOS Router
66and other compatible servers.
77
8- Reference: https://www.apollographql.com/docs/graphos/routing/operations/subscriptions/multipart-protocol
9- Issue: https://github. com/graphql-python/gql/issues/463
8+ Reference:
9+ https://www.apollographql. com/docs/graphos/routing/operations/subscriptions/multipart-protocol
1010"""
1111
1212import asyncio
1313import json
1414import logging
1515from ssl import SSLContext
16- from typing import Any , AsyncGenerator , Callable , Dict , Optional , Tuple , Union
16+ from typing import Any , AsyncGenerator , Callable , Dict , Optional , Union
1717
1818import aiohttp
1919from aiohttp .client_reqrep import Fingerprint
@@ -92,7 +92,7 @@ async def connect(self) -> None:
9292 if self .session is not None :
9393 raise TransportAlreadyConnected ("Transport is already connected" )
9494
95- client_session_args = {
95+ client_session_args : Dict [ str , Any ] = {
9696 "cookies" : self .cookies ,
9797 "headers" : self .headers ,
9898 "auth" : self .auth ,
@@ -170,7 +170,7 @@ async def subscribe(
170170 error_text = await response .text ()
171171 raise TransportServerError (
172172 f"Server returned { response .status } : { error_text } " ,
173- response .status
173+ response .status ,
174174 )
175175
176176 content_type = response .headers .get ("Content-Type" , "" )
@@ -183,7 +183,9 @@ async def subscribe(
183183 )
184184
185185 # Parse multipart response
186- async for result in self ._parse_multipart_response (response , content_type ):
186+ async for result in self ._parse_multipart_response (
187+ response , content_type
188+ ):
187189 yield result
188190
189191 except (TransportServerError , TransportProtocolError ):
@@ -233,20 +235,24 @@ async def _parse_multipart_response(
233235 break # No complete part yet
234236
235237 # Check if this is the end boundary
236- if buffer [boundary_pos :boundary_pos + len (end_boundary_bytes )] == end_boundary_bytes :
238+ end_pos = boundary_pos + len (end_boundary_bytes )
239+ if buffer [boundary_pos :end_pos ] == end_boundary_bytes :
237240 log .debug ("Reached end boundary" )
238241 return
239242
240243 # Find the start of the next part (after this boundary)
241244 # Look for either another regular boundary or the end boundary
242- next_boundary_pos = buffer .find (boundary_bytes , boundary_pos + len (boundary_bytes ))
245+ next_boundary_pos = buffer .find (
246+ boundary_bytes , boundary_pos + len (boundary_bytes )
247+ )
243248
244249 if next_boundary_pos == - 1 :
245250 # No next boundary yet, wait for more data
246251 break
247252
248253 # Extract the part between boundaries
249- part_data = buffer [boundary_pos + len (boundary_bytes ):next_boundary_pos ]
254+ start_pos = boundary_pos + len (boundary_bytes )
255+ part_data = buffer [start_pos :next_boundary_pos ]
250256
251257 # Parse the part
252258 try :
@@ -270,16 +276,16 @@ def _parse_multipart_part(self, part_data: bytes) -> Optional[ExecutionResult]:
270276 :return: ExecutionResult or None if part is empty/heartbeat
271277 """
272278 # Split headers and body by double CRLF or double LF
273- part_str = part_data .decode (' utf-8' )
279+ part_str = part_data .decode (" utf-8" )
274280
275281 # Try different separators
276- if ' \r \n \r \n ' in part_str :
277- parts = part_str .split (' \r \n \r \n ' , 1 )
278- elif ' \n \n ' in part_str :
279- parts = part_str .split (' \n \n ' , 1 )
282+ if " \r \n \r \n " in part_str :
283+ parts = part_str .split (" \r \n \r \n " , 1 )
284+ elif " \n \n " in part_str :
285+ parts = part_str .split (" \n \n " , 1 )
280286 else :
281287 # No headers separator found, treat entire content as body
282- parts = ['' , part_str ]
288+ parts = ["" , part_str ]
283289
284290 if len (parts ) < 2 :
285291 return None
0 commit comments