Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 13 additions & 12 deletions geoengine/workflow.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,8 @@
from owslib.util import Authentication, ResponseWrapper
from owslib.wcs import WebCoverageService
from vega import VegaLite
import websockets
from websockets.asyncio.client import connect
from websockets import protocol, exceptions as ws_exceptions
import xarray as xr
import pyarrow as pa

Expand Down Expand Up @@ -554,7 +555,7 @@ async def raster_stream(
query_rectangle: QueryRectangle,
open_timeout: int = 60,
bands: Optional[List[int]] = None # TODO: move into query rectangle?
) -> AsyncIterator[RasterTile2D]:
):
'''Stream the workflow result as series of RasterTile2D (transformable to numpy and xarray)'''

if bands is None:
Expand Down Expand Up @@ -584,21 +585,21 @@ async def raster_stream(
if url is None:
raise InputException('Invalid websocket url')

async with websockets.asyncio.client.connect(
async with connect(
uri=self.__replace_http_with_ws(url),
extra_headers=session.auth_header,
additional_headers=session.auth_header,
open_timeout=open_timeout,
max_size=None,
) as websocket:

tile_bytes: Optional[bytes] = None

while websocket.state == websockets.protocol.State.OPEN:
while websocket.state == protocol.State.OPEN:
async def read_new_bytes() -> Optional[bytes]:
# already send the next request to speed up the process
try:
await websocket.send("NEXT")
except websockets.exceptions.ConnectionClosed:
except ws_exceptions.ConnectionClosed:
# the websocket connection is already closed, we cannot read anymore
return None

Expand All @@ -610,7 +611,7 @@ async def read_new_bytes() -> Optional[bytes]:
raise GeoEngineException({'error': data})

return data
except websockets.exceptions.ConnectionClosedOK:
except ws_exceptions.ConnectionClosedOK:
# the websocket connection closed gracefully, so we stop reading
return None

Expand Down Expand Up @@ -791,21 +792,21 @@ def process_bytes(batch_bytes: Optional[bytes]) -> Optional[gpd.GeoDataFrame]:
if url is None:
raise InputException('Invalid websocket url')

async with websockets.asyncio.client.connect(
async with connect(
uri=self.__replace_http_with_ws(url),
extra_headers=session.auth_header,
additional_headers=session.auth_header,
open_timeout=open_timeout,
max_size=None, # allow arbitrary large messages, since it is capped by the server's chunk size
) as websocket:

batch_bytes: Optional[bytes] = None

while websocket.state == websockets.protocol.State.OPEN:
while websocket.state == protocol.State.OPEN:
async def read_new_bytes() -> Optional[bytes]:
# already send the next request to speed up the process
try:
await websocket.send("NEXT")
except websockets.exceptions.ConnectionClosed:
except ws_exceptions.ConnectionClosed:
# the websocket connection is already closed, we cannot read anymore
return None

Expand All @@ -817,7 +818,7 @@ async def read_new_bytes() -> Optional[bytes]:
raise GeoEngineException({'error': data})

return data
except websockets.exceptions.ConnectionClosedOK:
except ws_exceptions.ConnectionClosedOK:
# the websocket connection closed gracefully, so we stop reading
return None

Expand Down
Loading