From 69d026f72f6c6c3b50798066143ecc4f839deffb Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Dr=C3=B6nner?= Date: Sat, 14 Jun 2025 09:29:02 +0200 Subject: [PATCH 1/3] use websocket connect --- geoengine/workflow.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/geoengine/workflow.py b/geoengine/workflow.py index fd08f377..6f41a3aa 100644 --- a/geoengine/workflow.py +++ b/geoengine/workflow.py @@ -25,7 +25,7 @@ from owslib.util import Authentication, ResponseWrapper from owslib.wcs import WebCoverageService from vega import VegaLite -import websockets +from websockets.asyncio.client import connect import xarray as xr import pyarrow as pa @@ -554,7 +554,7 @@ async def raster_stream( query_rectangle: QueryRectangle, open_timeout: int = 60, bands: Optional[List[int]] = None # TODO: move into query rectangle? - ) -> AsyncIterator[RasterTile2D]: + ): '''Stream the workflow result as series of RasterTile2D (transformable to numpy and xarray)''' if bands is None: @@ -584,7 +584,7 @@ async def raster_stream( if url is None: raise InputException('Invalid websocket url') - async with websockets.asyncio.client.connect( + async with connect( uri=self.__replace_http_with_ws(url), extra_headers=session.auth_header, open_timeout=open_timeout, From 6e5525c3acad7d5397ae89424973b55716e41a06 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Dr=C3=B6nner?= Date: Sat, 14 Jun 2025 09:51:12 +0200 Subject: [PATCH 2/3] update ws params --- geoengine/workflow.py | 17 +++++++++-------- 1 file changed, 9 insertions(+), 8 deletions(-) diff --git a/geoengine/workflow.py b/geoengine/workflow.py index 6f41a3aa..bad2769e 100644 --- a/geoengine/workflow.py +++ b/geoengine/workflow.py @@ -26,6 +26,7 @@ from owslib.wcs import WebCoverageService from vega import VegaLite from websockets.asyncio.client import connect +from websockets import protocol, exceptions as ws_exceptions import xarray as xr import pyarrow as pa @@ -593,12 +594,12 @@ async def raster_stream( tile_bytes: Optional[bytes] = None - while websocket.state == websockets.protocol.State.OPEN: + while websocket.state == protocol.State.OPEN: async def read_new_bytes() -> Optional[bytes]: # already send the next request to speed up the process try: await websocket.send("NEXT") - except websockets.exceptions.ConnectionClosed: + except ws_exceptions.ConnectionClosed: # the websocket connection is already closed, we cannot read anymore return None @@ -610,7 +611,7 @@ async def read_new_bytes() -> Optional[bytes]: raise GeoEngineException({'error': data}) return data - except websockets.exceptions.ConnectionClosedOK: + except ws_exceptions.ConnectionClosedOK: # the websocket connection closed gracefully, so we stop reading return None @@ -791,21 +792,21 @@ def process_bytes(batch_bytes: Optional[bytes]) -> Optional[gpd.GeoDataFrame]: if url is None: raise InputException('Invalid websocket url') - async with websockets.asyncio.client.connect( + async with connect( uri=self.__replace_http_with_ws(url), - extra_headers=session.auth_header, + additional_headers=session.auth_header, open_timeout=open_timeout, max_size=None, # allow arbitrary large messages, since it is capped by the server's chunk size ) as websocket: batch_bytes: Optional[bytes] = None - while websocket.state == websockets.protocol.State.OPEN: + while websocket.state == protocol.State.OPEN: async def read_new_bytes() -> Optional[bytes]: # already send the next request to speed up the process try: await websocket.send("NEXT") - except websockets.exceptions.ConnectionClosed: + except ws_exceptions.ConnectionClosed: # the websocket connection is already closed, we cannot read anymore return None @@ -817,7 +818,7 @@ async def read_new_bytes() -> Optional[bytes]: raise GeoEngineException({'error': data}) return data - except websockets.exceptions.ConnectionClosedOK: + except ws_exceptions.ConnectionClosedOK: # the websocket connection closed gracefully, so we stop reading return None From 7f16f79dee759cf7c696502913985d3185201af6 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Johannes=20Dr=C3=B6nner?= Date: Sat, 14 Jun 2025 10:00:46 +0200 Subject: [PATCH 3/3] extra_headers -> additional_headers --- geoengine/workflow.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/geoengine/workflow.py b/geoengine/workflow.py index bad2769e..65bd8da8 100644 --- a/geoengine/workflow.py +++ b/geoengine/workflow.py @@ -587,7 +587,7 @@ async def raster_stream( async with connect( uri=self.__replace_http_with_ws(url), - extra_headers=session.auth_header, + additional_headers=session.auth_header, open_timeout=open_timeout, max_size=None, ) as websocket: