From 036340e94f12844171d42dcfd2f020758e8e624b Mon Sep 17 00:00:00 2001 From: Felix Engelmann Date: Fri, 12 Sep 2025 12:57:55 -0400 Subject: [PATCH 1/8] new worker event assembly --- dranspose/debug_worker.py | 15 ++--- dranspose/worker.py | 117 +++++++++++++++++++------------------- 2 files changed, 68 insertions(+), 64 deletions(-) diff --git a/dranspose/debug_worker.py b/dranspose/debug_worker.py index 02d96b2..a5f4962 100644 --- a/dranspose/debug_worker.py +++ b/dranspose/debug_worker.py @@ -5,7 +5,6 @@ from contextlib import asynccontextmanager from typing import AsyncGenerator, Any -import zmq from fastapi import FastAPI from starlette.responses import Response @@ -29,13 +28,15 @@ async def work(self) -> None: while True: self.dequeue_task = None self.dequeue_task = asyncio.create_task(self.assignment_queue.get()) - ingesterset = await self.dequeue_task - done = await self.poll_internals(ingesterset) - if set(done) != {zmq.POLLIN}: - self._logger.warning("not all sockets are pollIN %s", done) - continue + evn, streamset = await self.dequeue_task - event = await self.build_event(ingesterset) + self.new_data = asyncio.Future() + while set(self.stream_queues.get(evn, {}).keys()) != streamset: + await self.new_data + self.new_data = asyncio.Future() + self.new_data = None + + event = EventData(event_number=evn, streams=self.stream_queues[evn]) self._logger.debug("adding event %s to buffer", event) self.buffer.append(event) diff --git a/dranspose/worker.py b/dranspose/worker.py index 3ceb3b2..834808d 100644 --- a/dranspose/worker.py +++ b/dranspose/worker.py @@ -18,7 +18,7 @@ import redis.exceptions as rexceptions from dranspose.distributed import DistributedService, DistributedSettings -from dranspose.event import InternalWorkerMessage, EventData, ResultData +from dranspose.event import InternalWorkerMessage, EventData, ResultData, StreamData from dranspose.helpers.utils import done_callback, cancel_and_wait from dranspose.protocol import ( WorkerState, @@ -34,6 +34,7 @@ GENERIC_WORKER, WorkerTag, WorkAssignmentList, + EventNumber, ) @@ -78,16 +79,18 @@ def __init__(self, settings: Optional[WorkerSettings] = None): self.ctx = zmq.asyncio.Context() self._ingesters: dict[IngesterName, ConnectedIngester] = {} - self._stream_map: dict[StreamName, zmq._future._AsyncSocket] = {} + self._ingester_tasks: list[Task[None]] = [] self.poll_task: Optional[Future[list[int]]] = None self._reducer_service_uuid: Optional[UUID4] = None self.out_socket: Optional[zmq._future._AsyncSocket] = None self.assignment_queue: asyncio.Queue[ - set[zmq._future._AsyncSocket] + tuple[EventNumber, set[StreamName]] ] = asyncio.Queue() - self.dequeue_task: Optional[Task[set[zmq._future._AsyncSocket]]] = None + self.dequeue_task: Optional[Task[tuple[EventNumber, set[StreamName]]]] = None + self.new_data: Future[None] | None = None + self.stream_queues: dict[EventNumber, dict[StreamName, StreamData]] = {} self.param_descriptions = [] self.custom = None @@ -123,6 +126,7 @@ async def run(self) -> None: self.manage_receiver_task = asyncio.create_task(self.manage_receiver()) self.manage_receiver_task.add_done_callback(done_callback) self.assignment_queue = asyncio.Queue() + self.start_receive_ingesters() self.work_task = asyncio.create_task(self.work()) self.work_task.add_done_callback(done_callback) self.assign_task = asyncio.create_task(self.manage_assignments()) @@ -163,58 +167,50 @@ async def manage_assignments(self) -> None: continue assignments = assignments[sub][0][0] self._logger.debug("got assignments %s", assignments) - self._logger.debug("stream map %s", self._stream_map) work_assignment_list = WorkAssignmentList.validate_json( assignments[1]["data"] ) for work_assignment in work_assignment_list: - ingesterset = set() + streamset: set[StreamName] = set() for stream, workers in work_assignment.assignments.items(): if self.state.name in workers: - try: - ingesterset.add(self._stream_map[stream]) - except KeyError: - self._logger.error( - "ingester for stream %s not connected, available: %s", - stream, - self._ingesters, - ) - self._logger.debug("receive from ingesters %s", ingesterset) - if len(ingesterset) > 0: - await self.assignment_queue.put(ingesterset) + streamset.add(stream) + self._logger.debug("receive from streams %s", streamset) + if len(streamset) > 0: + await self.assignment_queue.put( + (work_assignment.event_number, streamset) + ) lastev = assignments[0] - async def poll_internals( - self, ingesterset: set[zmq._future._AsyncSocket] - ) -> list[int]: - """ - A task to simultaneously check if there is a message available from all required ingesters - We do not yet receive from the zmq sockets as it is harder to cancel a receive call. - """ - self._logger.debug("poll internal sockets %s", ingesterset) - poll_tasks = [sock.poll() for sock in ingesterset] - self._logger.debug("await poll tasks %s", poll_tasks) - self.poll_task = asyncio.gather(*poll_tasks) - done = await self.poll_task - self._logger.debug("data is available done: %s", done) - return done - - async def build_event( - self, ingesterset: set[zmq._future._AsyncSocket] - ) -> EventData: - "All relevant ingester sockets have a message, receive it and assemble an EventData object" - msgs = [] - for sock in ingesterset: - res = await sock.recv_multipart(copy=False) + async def receive_ingester(self, conn_ing: ConnectedIngester) -> None: + while True: + res = await conn_ing.socket.recv_multipart(copy=False) + self._logger.debug( + "got zmq message from ingester: %s", conn_ing.config.name + ) prelim = json.loads(res[0].bytes) pos = 1 for stream, data in prelim["streams"].items(): data["frames"] = res[pos : pos + data["length"]] pos += data["length"] msg = InternalWorkerMessage.model_validate(prelim) - msgs.append(msg) - - return EventData.from_internals(msgs) + if msg.event_number not in self.stream_queues: + self.stream_queues[msg.event_number] = {} + self.stream_queues[msg.event_number].update(msg.streams) + self._logger.debug( + "added streams %s at event %d", msg.streams.keys(), msg.event_number + ) + if self.new_data is not None: + if not self.new_data.done(): + self.new_data.set_result(None) + + def start_receive_ingesters(self) -> None: + self._logger.info("starting ingester receiving tasks") + self.stream_queues = {} + for conn_ing in self._ingesters.values(): + t = asyncio.create_task(self.receive_ingester(conn_ing)) + t.add_done_callback(done_callback) + self._ingester_tasks.append(t) async def work(self) -> None: self._logger.info("started work task") @@ -245,16 +241,25 @@ async def work(self) -> None: self.dequeue_task = None self.dequeue_task = asyncio.create_task(self.assignment_queue.get()) - ingesterset = await self.dequeue_task + evn, streamset = await self.dequeue_task perf_got_assignments = time.perf_counter() - done = await self.poll_internals(ingesterset) - if set(done) != {zmq.POLLIN}: - self._logger.warning("not all sockets are pollIN %s", done) - continue + + self._logger.debug("looking for streams %s at event %d", streamset, evn) + self.new_data = asyncio.Future() + while set(self.stream_queues.get(evn, {}).keys()) != streamset: + self._logger.debug( + "keys did not match %s, %s, wait again", + self.stream_queues.get(evn, {}).keys(), + streamset, + ) + self._logger.debug("current queue is %s", self.stream_queues) + await self.new_data + self.new_data = asyncio.Future() + self.new_data = None perf_got_work = time.perf_counter() - event = await self.build_event(ingesterset) + event = EventData(event_number=evn, streams=self.stream_queues[evn]) perf_assembled_event = time.perf_counter() self._logger.debug("received work %s", event) @@ -326,6 +331,9 @@ async def work(self) -> None: if proced % 500 == 0: self._logger.info("processed %d events", proced) completed.append(event.event_number) + + del self.stream_queues[evn] + has_result.append(result is not None) times = WorkerTimes.from_timestamps( perf_start, @@ -388,13 +396,12 @@ async def restart_work( self, new_uuid: UUID4, active_streams: list[StreamName] ) -> None: self._logger.info("resetting config %s", new_uuid) - if self.poll_task: - await cancel_and_wait(self.poll_task) - self.poll_task = None - self._logger.debug("cancelled poll task") await cancel_and_wait(self.work_task) self._logger.info("clean up in sockets") await cancel_and_wait(self.assign_task) + self._logger.info("stop receiving") + for t in self._ingester_tasks: + await cancel_and_wait(t) for iname, ing in self._ingesters.items(): while True: res = await ing.socket.poll(timeout=0.001) @@ -406,6 +413,7 @@ async def restart_work( self.assignment_queue = asyncio.Queue() self.state.mapping_uuid = new_uuid + self.start_receive_ingesters() self.work_task = asyncio.create_task(self.work()) self.work_task.add_done_callback(done_callback) self.assign_task = asyncio.create_task(self.manage_assignments()) @@ -494,11 +502,6 @@ async def manage_ingesters(self) -> None: self._logger.info("removing stale ingester %s", iname) self._ingesters[iname].socket.close() del self._ingesters[iname] - self._stream_map = { - s: conn_ing.socket - for ing, conn_ing in self._ingesters.items() - for s in conn_ing.config.streams - } new_ingesters = [a.config for a in self._ingesters.values()] if self.state.ingesters != new_ingesters: self.state.ingesters = new_ingesters From b2c1d5b59d4dda01b57c0e6b69a141b959dccdb2 Mon Sep 17 00:00:00 2001 From: Felix Engelmann Date: Fri, 12 Sep 2025 18:00:41 -0400 Subject: [PATCH 2/8] cleanup on stop, still some tests hang --- dranspose/worker.py | 94 ++++++++++++++++++++++++--------------------- 1 file changed, 50 insertions(+), 44 deletions(-) diff --git a/dranspose/worker.py b/dranspose/worker.py index 834808d..ef0acb7 100644 --- a/dranspose/worker.py +++ b/dranspose/worker.py @@ -212,6 +212,42 @@ def start_receive_ingesters(self) -> None: t.add_done_callback(done_callback) self._ingester_tasks.append(t) + async def _run_payload(self, tick_wait_until, event): + tick = False + # we internally cache when the redis will expire to reduce hitting redis on every event + if tick_wait_until - time.time() < 0: + if hasattr(self.worker, "get_tick_interval"): + wait_ms = int(self.worker.get_tick_interval(self.parameters) * 1000) + dist_clock = await self.redis.set( + RedisKeys.clock(self.state.mapping_uuid), + "🕙", + px=wait_ms, + nx=True, + ) + if dist_clock is True: + tick = True + else: + expire_ms = await self.redis.pttl( + RedisKeys.clock(self.state.mapping_uuid) + ) + tick_wait_until = time.time() + (expire_ms / 1000) + try: + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + None, + self.worker.process_event, + event, + self.parameters, + tick, + ) + except Exception as e: + self._logger.error( + "custom worker failed: %s\n%s", + e.__repr__(), + traceback.format_exc(), + ) + return tick_wait_until, result + async def work(self) -> None: self._logger.info("started work task") @@ -246,6 +282,7 @@ async def work(self) -> None: self._logger.debug("looking for streams %s at event %d", streamset, evn) self.new_data = asyncio.Future() + terminate = False while set(self.stream_queues.get(evn, {}).keys()) != streamset: self._logger.debug( "keys did not match %s, %s, wait again", @@ -253,8 +290,12 @@ async def work(self) -> None: streamset, ) self._logger.debug("current queue is %s", self.stream_queues) - await self.new_data + terminate = await self.new_data + if terminate is True: + break self.new_data = asyncio.Future() + if terminate: + break self.new_data = None perf_got_work = time.perf_counter() @@ -265,41 +306,10 @@ async def work(self) -> None: self._logger.debug("received work %s", event) result = None if self.worker: - tick = False - # we internally cache when the redis will expire to reduce hitting redis on every event - if tick_wait_until - time.time() < 0: - if hasattr(self.worker, "get_tick_interval"): - wait_ms = int( - self.worker.get_tick_interval(self.parameters) * 1000 - ) - dist_clock = await self.redis.set( - RedisKeys.clock(self.state.mapping_uuid), - "🕙", - px=wait_ms, - nx=True, - ) - if dist_clock is True: - tick = True - else: - expire_ms = await self.redis.pttl( - RedisKeys.clock(self.state.mapping_uuid) - ) - tick_wait_until = time.time() + (expire_ms / 1000) - try: - loop = asyncio.get_event_loop() - result = await loop.run_in_executor( - None, - self.worker.process_event, - event, - self.parameters, - tick, - ) - except Exception as e: - self._logger.error( - "custom worker failed: %s\n%s", - e.__repr__(), - traceback.format_exc(), - ) + tick_wait_until, result = await self._run_payload( + tick_wait_until, event + ) + perf_custom_code = time.perf_counter() self._logger.debug("got result %s", result) if result is not None: @@ -395,6 +405,8 @@ async def finish_work(self) -> None: async def restart_work( self, new_uuid: UUID4, active_streams: list[StreamName] ) -> None: + if self.new_data is not None: + self.new_data.set_result(True) self._logger.info("resetting config %s", new_uuid) await cancel_and_wait(self.work_task) self._logger.info("clean up in sockets") @@ -402,14 +414,6 @@ async def restart_work( self._logger.info("stop receiving") for t in self._ingester_tasks: await cancel_and_wait(t) - for iname, ing in self._ingesters.items(): - while True: - res = await ing.socket.poll(timeout=0.001) - if res == zmq.POLLIN: - await ing.socket.recv_multipart(copy=False) - self._logger.debug("discarded internal message from %s", iname) - else: - break self.assignment_queue = asyncio.Queue() self.state.mapping_uuid = new_uuid @@ -526,6 +530,8 @@ async def close(self) -> None: e.__repr__(), traceback.format_exc(), ) + if self.new_data is not None: + self.new_data.set_result(True) await cancel_and_wait(self.manage_ingester_task) await cancel_and_wait(self.manage_receiver_task) await cancel_and_wait(self.metrics_task) From b84a7c0346b651a7a2c0dbf8695eed020823aa16 Mon Sep 17 00:00:00 2001 From: Felix Engelmann Date: Fri, 12 Sep 2025 18:01:51 -0400 Subject: [PATCH 3/8] new parallel stins ingester --- dranspose/ingesters/__init__.py | 4 + dranspose/ingesters/stins_parallel.py | 96 +++++++++++++++++++++ tests/aux_payloads.py | 37 ++++++++ tests/test_stins_parallel.py | 120 ++++++++++++++++++++++++++ 4 files changed, 257 insertions(+) create mode 100644 dranspose/ingesters/stins_parallel.py create mode 100644 tests/aux_payloads.py create mode 100644 tests/test_stins_parallel.py diff --git a/dranspose/ingesters/__init__.py b/dranspose/ingesters/__init__.py index f49b7a0..03af265 100644 --- a/dranspose/ingesters/__init__.py +++ b/dranspose/ingesters/__init__.py @@ -10,6 +10,10 @@ ZmqPullSingleIngester, ZmqPullSingleSettings, ) +from dranspose.ingesters.stins_parallel import ( # noqa: F401 + StinsParallelIngester, + StinsParallelSettings, +) from dranspose.ingesters.zmqpull_eiger_legacy import ( # noqa: F401 ZmqPullEigerLegacyIngester, ZmqPullEigerLegacySettings, diff --git a/dranspose/ingesters/stins_parallel.py b/dranspose/ingesters/stins_parallel.py new file mode 100644 index 0000000..933f0d8 --- /dev/null +++ b/dranspose/ingesters/stins_parallel.py @@ -0,0 +1,96 @@ +import asyncio +from typing import AsyncGenerator, Optional + +import zmq + + +from dranspose.data.stream1 import Stream1Packet, Stream1End +from dranspose.event import StreamData, InternalWorkerMessage +from dranspose.ingester import Ingester, IngesterSettings +from dranspose.protocol import ( + StreamName, + ZmqUrl, + WorkAssignment, + WorkerName, +) + + +class StinsParallelSettings(IngesterSettings): + upstream_url: ZmqUrl + + +class StinsParallelIngester(Ingester): + """ + A simple ingester class to comsume a stream from the streaming-receiver repub port + """ + + def __init__(self, settings: Optional[StinsParallelSettings] = None) -> None: + if settings is not None: + self._streaming_single_settings = settings + else: + self._streaming_single_settings = StinsParallelSettings() + + super().__init__(settings=self._streaming_single_settings) + self.in_socket: Optional[zmq._future._AsyncSocket] = None + + async def work(self) -> None: + self._logger.info("started stins ingester work task") + + if len(self.active_streams) == 0: + self._logger.warning("this ingester has no active streams, stopping worker") + return + sourcegen = self.run_source(self.active_streams[0]) + try: + while True: + nextiwm: InternalWorkerMessage = await anext(sourcegen) + + work_assignment: WorkAssignment = await self.assignment_queue.get() + while work_assignment.event_number < nextiwm.event_number: + work_assignment: WorkAssignment = await self.assignment_queue.get() + + workermessages: dict[WorkerName, InternalWorkerMessage] = {} + for stream, workers in work_assignment.assignments.items(): + for worker in workers: + if worker not in workermessages: + workermessages[worker] = nextiwm + self._logger.debug("workermessages %s", workermessages) + await self._send_workermessages(workermessages) + self.state.processed_events += 1 + except asyncio.exceptions.CancelledError: + self._logger.info("stopping worker") + for stream in self.active_streams: + await self.stop_source(stream) + + async def run_source( + self, stream: StreamName + ) -> AsyncGenerator[InternalWorkerMessage, None]: + self.in_socket = self.ctx.socket(zmq.PULL) + self.in_socket.connect(str(self._streaming_single_settings.upstream_url)) + self._logger.info( + "pulling from %s", self._streaming_single_settings.upstream_url + ) + + while True: + parts = await self.in_socket.recv_multipart(copy=False) + try: + packet = Stream1Packet.validate_json(parts[0].bytes) + except Exception as e: + self._logger.error("packet not valid %s", e.__repr__()) + continue + self._logger.debug("msg number %d", packet.msg_number) + yield InternalWorkerMessage( + event_number=packet.msg_number, + streams={stream: StreamData(typ="STINS", frames=parts)}, + ) + + if isinstance(packet, Stream1End): + break + while True: + self._logger.debug("discarding messages until next run") + await self.in_socket.recv_multipart(copy=False) + + async def stop_source(self, stream: StreamName) -> None: + if self.in_socket: + self._logger.info("closing socket without linger") + self.in_socket.close(linger=0) + self.in_socket = None diff --git a/tests/aux_payloads.py b/tests/aux_payloads.py new file mode 100644 index 0000000..57f13ae --- /dev/null +++ b/tests/aux_payloads.py @@ -0,0 +1,37 @@ +import json +import logging +from typing import Any + +from dranspose.event import EventData, ResultData +from dranspose.protocol import ReducerState + + +class TestWorker: + def __init__(self, **kwargs: Any) -> None: + pass + + def process_event(self, event: EventData, *args, **kwargs): + for stream in event.streams: + for i, _ in enumerate(event.streams[stream].frames): + event.streams[stream].frames[i] = ( + event.streams[stream].frames[i].bytes[:500] + ) + return event, args, kwargs + + +class TestReducer: + def __init__(self, state: ReducerState | None = None, **kwargs: dict) -> None: + self.publish: dict[str, dict] = {"results": {}, "parameters": {}} + + def process_result( + self, result: ResultData, parameters: dict | None = None + ) -> None: + logging.info("parameters are %s", parameters) + self.publish["results"][str(result.event_number)] = { + k: json.loads(v.frames[0]) if v.typ == "STINS" else "blob" + for k, v in result.payload[0].streams.items() + } + self.publish["parameters"][result.event_number] = parameters + + def finish(self, parameters: dict | None = None) -> None: + print("finished dummy reducer work") diff --git a/tests/test_stins_parallel.py b/tests/test_stins_parallel.py new file mode 100644 index 0000000..6563348 --- /dev/null +++ b/tests/test_stins_parallel.py @@ -0,0 +1,120 @@ +import asyncio +import logging +from typing import Awaitable, Callable, Any, Coroutine, Optional +import h5pyd + +import aiohttp +import numpy as np + +import pytest +from _pytest.fixtures import FixtureRequest +import zmq.asyncio +import zmq +from pydantic_core import Url + +from dranspose.ingester import Ingester +from dranspose.ingesters.stins_parallel import ( + StinsParallelIngester, + StinsParallelSettings, +) + +from dranspose.protocol import ( + StreamName, + WorkerName, + VirtualWorker, + VirtualConstraint, +) + +from dranspose.worker import Worker, WorkerSettings + +from tests.utils import wait_for_controller, wait_for_finish + + +@pytest.mark.asyncio +async def test_parallel( + request: FixtureRequest, + controller: None, + reducer: Callable[[Optional[str]], Awaitable[None]], + create_worker: Callable[[Worker], Awaitable[Worker]], + create_ingester: Callable[[Ingester], Awaitable[Ingester]], + stream_eiger: Callable[[zmq.Context[Any], int, int], Coroutine[Any, Any, None]], +) -> None: + await reducer("tests.aux_payloads:TestReducer") + await create_worker( + Worker( + settings=WorkerSettings( + worker_name=WorkerName("w1"), + worker_class="tests.aux_payloads:TestWorker", + ), + ) + ) + ing1 = await create_ingester( + StinsParallelIngester( + settings=StinsParallelSettings( + ingester_streams=[StreamName("eiger")], + upstream_url=Url("tcp://localhost:9999"), + ), + ) + ) + ing2 = await create_ingester( + StinsParallelIngester( + settings=StinsParallelSettings( + ingester_name="eiger-2", + ingester_streams=[StreamName("eiger")], + upstream_url=Url("tcp://localhost:9999"), + ingester_url=Url("tcp://localhost:10011"), + ), + ) + ) + + async with aiohttp.ClientSession() as session: + await wait_for_controller( + streams={StreamName("eiger")}, workers={WorkerName("w1")} + ) + + ntrig = 10 + resp = await session.post( + "http://localhost:5000/api/v1/mapping", + json={ + "eiger": [ + [ + VirtualWorker(constraint=VirtualConstraint(2 * i)).model_dump( + mode="json" + ) + ] + for i in range(1, ntrig) + ], + }, + ) + assert resp.status == 200 + + with zmq.asyncio.Context() as context: + asyncio.create_task(stream_eiger(context, 9999, ntrig - 1)) + + content = await wait_for_finish() + + assert content == { + "last_assigned": ntrig + 1, + "completed_events": ntrig + 1, + "total_events": ntrig + 1, + "finished": True, + } + + logging.info("proc1 ev %d", ing1.state.processed_events) + logging.info("proc2 ev %d", ing2.state.processed_events) + assert ing1.state.processed_events < ntrig + assert ing2.state.processed_events < ntrig + assert ing1.state.processed_events + ing2.state.processed_events == ntrig + 1 + + def work() -> None: + f = h5pyd.File("http://localhost:5001/", "r") + assert f[f"results/{0}/eiger/htype"][()] == b"header" + assert f[f"results/{ntrig}/eiger/htype"][()] == b"series_end" + for i in range(1, ntrig): + assert f[f"results/{i}/eiger/msg_number"][()] == i + assert f[f"results/{i}/eiger/htype"][()] == b"image" + assert f[f"results/{i}/eiger/frame"][()] == i - 1 + assert np.array_equal(f[f"results/{i}/eiger/shape"][:], [1475, 831]) + + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, work) From 0df066a41ce040bc7b8e833abc17ea7db9f13d70 Mon Sep 17 00:00:00 2001 From: Felix Engelmann Date: Fri, 12 Sep 2025 18:23:06 -0400 Subject: [PATCH 4/8] typing --- dranspose/ingesters/stins_parallel.py | 6 +- dranspose/worker.py | 83 ++++++++++++++------------- tests/aux_payloads.py | 32 +++++++---- tests/test_stins_parallel.py | 3 +- 4 files changed, 70 insertions(+), 54 deletions(-) diff --git a/dranspose/ingesters/stins_parallel.py b/dranspose/ingesters/stins_parallel.py index 933f0d8..dca24b8 100644 --- a/dranspose/ingesters/stins_parallel.py +++ b/dranspose/ingesters/stins_parallel.py @@ -39,14 +39,14 @@ async def work(self) -> None: if len(self.active_streams) == 0: self._logger.warning("this ingester has no active streams, stopping worker") return - sourcegen = self.run_source(self.active_streams[0]) + sourcegen = self.run_source_part(self.active_streams[0]) try: while True: nextiwm: InternalWorkerMessage = await anext(sourcegen) work_assignment: WorkAssignment = await self.assignment_queue.get() while work_assignment.event_number < nextiwm.event_number: - work_assignment: WorkAssignment = await self.assignment_queue.get() + work_assignment = await self.assignment_queue.get() workermessages: dict[WorkerName, InternalWorkerMessage] = {} for stream, workers in work_assignment.assignments.items(): @@ -61,7 +61,7 @@ async def work(self) -> None: for stream in self.active_streams: await self.stop_source(stream) - async def run_source( + async def run_source_part( self, stream: StreamName ) -> AsyncGenerator[InternalWorkerMessage, None]: self.in_socket = self.ctx.socket(zmq.PULL) diff --git a/dranspose/worker.py b/dranspose/worker.py index ef0acb7..c41deca 100644 --- a/dranspose/worker.py +++ b/dranspose/worker.py @@ -89,7 +89,7 @@ def __init__(self, settings: Optional[WorkerSettings] = None): tuple[EventNumber, set[StreamName]] ] = asyncio.Queue() self.dequeue_task: Optional[Task[tuple[EventNumber, set[StreamName]]]] = None - self.new_data: Future[None] | None = None + self.new_data: Future[bool] | None = None self.stream_queues: dict[EventNumber, dict[StreamName, StreamData]] = {} self.param_descriptions = [] @@ -202,7 +202,7 @@ async def receive_ingester(self, conn_ing: ConnectedIngester) -> None: ) if self.new_data is not None: if not self.new_data.done(): - self.new_data.set_result(None) + self.new_data.set_result(False) def start_receive_ingesters(self) -> None: self._logger.info("starting ingester receiving tasks") @@ -212,40 +212,44 @@ def start_receive_ingesters(self) -> None: t.add_done_callback(done_callback) self._ingester_tasks.append(t) - async def _run_payload(self, tick_wait_until, event): - tick = False - # we internally cache when the redis will expire to reduce hitting redis on every event - if tick_wait_until - time.time() < 0: - if hasattr(self.worker, "get_tick_interval"): - wait_ms = int(self.worker.get_tick_interval(self.parameters) * 1000) - dist_clock = await self.redis.set( - RedisKeys.clock(self.state.mapping_uuid), - "🕙", - px=wait_ms, - nx=True, - ) - if dist_clock is True: - tick = True - else: - expire_ms = await self.redis.pttl( - RedisKeys.clock(self.state.mapping_uuid) + async def _run_payload( + self, tick_wait_until: float, event: EventData + ) -> tuple[float, ResultData | None]: + result = None + if self.worker: + tick = False + # we internally cache when the redis will expire to reduce hitting redis on every event + if tick_wait_until - time.time() < 0: + if hasattr(self.worker, "get_tick_interval"): + wait_ms = int(self.worker.get_tick_interval(self.parameters) * 1000) + dist_clock = await self.redis.set( + RedisKeys.clock(self.state.mapping_uuid), + "🕙", + px=wait_ms, + nx=True, ) - tick_wait_until = time.time() + (expire_ms / 1000) - try: - loop = asyncio.get_event_loop() - result = await loop.run_in_executor( - None, - self.worker.process_event, - event, - self.parameters, - tick, - ) - except Exception as e: - self._logger.error( - "custom worker failed: %s\n%s", - e.__repr__(), - traceback.format_exc(), - ) + if dist_clock is True: + tick = True + else: + expire_ms = await self.redis.pttl( + RedisKeys.clock(self.state.mapping_uuid) + ) + tick_wait_until = time.time() + (expire_ms / 1000) + try: + loop = asyncio.get_event_loop() + result = await loop.run_in_executor( + None, + self.worker.process_event, + event, + self.parameters, + tick, + ) + except Exception as e: + self._logger.error( + "custom worker failed: %s\n%s", + e.__repr__(), + traceback.format_exc(), + ) return tick_wait_until, result async def work(self) -> None: @@ -304,11 +308,10 @@ async def work(self) -> None: perf_assembled_event = time.perf_counter() self._logger.debug("received work %s", event) - result = None - if self.worker: - tick_wait_until, result = await self._run_payload( - tick_wait_until, event - ) + + tick_wait_until, result = await self._run_payload( + tick_wait_until, event + ) perf_custom_code = time.perf_counter() self._logger.debug("got result %s", result) diff --git a/tests/aux_payloads.py b/tests/aux_payloads.py index 57f13ae..e097158 100644 --- a/tests/aux_payloads.py +++ b/tests/aux_payloads.py @@ -2,29 +2,39 @@ import logging from typing import Any +import zmq + from dranspose.event import EventData, ResultData -from dranspose.protocol import ReducerState +from dranspose.protocol import ReducerState, WorkParameter, ParameterName class TestWorker: def __init__(self, **kwargs: Any) -> None: pass - def process_event(self, event: EventData, *args, **kwargs): + def process_event( + self, event: EventData, *args: tuple[Any, ...], **kwargs: dict[str, Any] + ) -> tuple[EventData, tuple[Any, ...], dict[str, Any]]: for stream in event.streams: - for i, _ in enumerate(event.streams[stream].frames): - event.streams[stream].frames[i] = ( - event.streams[stream].frames[i].bytes[:500] - ) + frame: zmq.Frame | bytes + for i, frame in enumerate(event.streams[stream].frames): + if isinstance(frame, zmq.Frame): + event.streams[stream].frames[i] = frame.bytes[:500] + elif isinstance(frame, bytes): + event.streams[stream].frames[i] = frame[:500] return event, args, kwargs class TestReducer: - def __init__(self, state: ReducerState | None = None, **kwargs: dict) -> None: - self.publish: dict[str, dict] = {"results": {}, "parameters": {}} + def __init__( + self, state: ReducerState | None = None, **kwargs: dict[str, Any] + ) -> None: + self.publish: dict[str, dict[Any, Any]] = {"results": {}, "parameters": {}} def process_result( - self, result: ResultData, parameters: dict | None = None + self, + result: ResultData, + parameters: dict[ParameterName, WorkParameter] | None = None, ) -> None: logging.info("parameters are %s", parameters) self.publish["results"][str(result.event_number)] = { @@ -33,5 +43,7 @@ def process_result( } self.publish["parameters"][result.event_number] = parameters - def finish(self, parameters: dict | None = None) -> None: + def finish( + self, parameters: dict[ParameterName, WorkParameter] | None = None + ) -> None: print("finished dummy reducer work") diff --git a/tests/test_stins_parallel.py b/tests/test_stins_parallel.py index 6563348..c07527b 100644 --- a/tests/test_stins_parallel.py +++ b/tests/test_stins_parallel.py @@ -23,6 +23,7 @@ WorkerName, VirtualWorker, VirtualConstraint, + IngesterName, ) from dranspose.worker import Worker, WorkerSettings @@ -59,7 +60,7 @@ async def test_parallel( ing2 = await create_ingester( StinsParallelIngester( settings=StinsParallelSettings( - ingester_name="eiger-2", + ingester_name=IngesterName("eiger-2"), ingester_streams=[StreamName("eiger")], upstream_url=Url("tcp://localhost:9999"), ingester_url=Url("tcp://localhost:10011"), From b02582e755d0df0672a09807e1a98ad274904bc7 Mon Sep 17 00:00:00 2001 From: Felix Engelmann Date: Fri, 3 Oct 2025 12:44:36 -0400 Subject: [PATCH 5/8] use event instead of future --- dranspose/worker.py | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/dranspose/worker.py b/dranspose/worker.py index c41deca..a26e5ad 100644 --- a/dranspose/worker.py +++ b/dranspose/worker.py @@ -89,7 +89,8 @@ def __init__(self, settings: Optional[WorkerSettings] = None): tuple[EventNumber, set[StreamName]] ] = asyncio.Queue() self.dequeue_task: Optional[Task[tuple[EventNumber, set[StreamName]]]] = None - self.new_data: Future[bool] | None = None + self.new_data = asyncio.Event() + self.should_terminate = False self.stream_queues: dict[EventNumber, dict[StreamName, StreamData]] = {} self.param_descriptions = [] @@ -200,9 +201,7 @@ async def receive_ingester(self, conn_ing: ConnectedIngester) -> None: self._logger.debug( "added streams %s at event %d", msg.streams.keys(), msg.event_number ) - if self.new_data is not None: - if not self.new_data.done(): - self.new_data.set_result(False) + self.new_data.set() def start_receive_ingesters(self) -> None: self._logger.info("starting ingester receiving tasks") @@ -269,6 +268,7 @@ async def work(self) -> None: ) await self.notify_worker_ready() + self.should_terminate = False try: proced = 0 completed = [] @@ -285,8 +285,7 @@ async def work(self) -> None: perf_got_assignments = time.perf_counter() self._logger.debug("looking for streams %s at event %d", streamset, evn) - self.new_data = asyncio.Future() - terminate = False + self.new_data.clear() while set(self.stream_queues.get(evn, {}).keys()) != streamset: self._logger.debug( "keys did not match %s, %s, wait again", @@ -294,13 +293,12 @@ async def work(self) -> None: streamset, ) self._logger.debug("current queue is %s", self.stream_queues) - terminate = await self.new_data - if terminate is True: + await self.new_data.wait() + if self.should_terminate: break - self.new_data = asyncio.Future() - if terminate: + self.new_data.clear() + if self.should_terminate: break - self.new_data = None perf_got_work = time.perf_counter() @@ -408,8 +406,8 @@ async def finish_work(self) -> None: async def restart_work( self, new_uuid: UUID4, active_streams: list[StreamName] ) -> None: - if self.new_data is not None: - self.new_data.set_result(True) + self.should_terminate = True + self.new_data.set() self._logger.info("resetting config %s", new_uuid) await cancel_and_wait(self.work_task) self._logger.info("clean up in sockets") @@ -533,8 +531,8 @@ async def close(self) -> None: e.__repr__(), traceback.format_exc(), ) - if self.new_data is not None: - self.new_data.set_result(True) + self.should_terminate = True + self.new_data.set() await cancel_and_wait(self.manage_ingester_task) await cancel_and_wait(self.manage_receiver_task) await cancel_and_wait(self.metrics_task) From 3d99bf71f4abdcc880ae7645f77ba3230deee556 Mon Sep 17 00:00:00 2001 From: Felix Engelmann Date: Fri, 3 Oct 2025 12:56:46 -0400 Subject: [PATCH 6/8] use event in debugworker --- dranspose/debug_worker.py | 7 +++---- 1 file changed, 3 insertions(+), 4 deletions(-) diff --git a/dranspose/debug_worker.py b/dranspose/debug_worker.py index a5f4962..e8fd8a9 100644 --- a/dranspose/debug_worker.py +++ b/dranspose/debug_worker.py @@ -30,11 +30,10 @@ async def work(self) -> None: self.dequeue_task = asyncio.create_task(self.assignment_queue.get()) evn, streamset = await self.dequeue_task - self.new_data = asyncio.Future() + self.new_data.clear() while set(self.stream_queues.get(evn, {}).keys()) != streamset: - await self.new_data - self.new_data = asyncio.Future() - self.new_data = None + await self.new_data.wait() + self.new_data.clear() event = EventData(event_number=evn, streams=self.stream_queues[evn]) self._logger.debug("adding event %s to buffer", event) From b71d0724df58d06f06c5000018c198360010a772 Mon Sep 17 00:00:00 2001 From: Felix Engelmann Date: Fri, 10 Oct 2025 16:03:09 -0400 Subject: [PATCH 7/8] start testing vds --- tests/test_stins_parallel.py | 105 ++++++++++++++++++++++++++--------- 1 file changed, 79 insertions(+), 26 deletions(-) diff --git a/tests/test_stins_parallel.py b/tests/test_stins_parallel.py index c07527b..3712304 100644 --- a/tests/test_stins_parallel.py +++ b/tests/test_stins_parallel.py @@ -3,11 +3,9 @@ from typing import Awaitable, Callable, Any, Coroutine, Optional import h5pyd -import aiohttp import numpy as np import pytest -from _pytest.fixtures import FixtureRequest import zmq.asyncio import zmq from pydantic_core import Url @@ -21,19 +19,24 @@ from dranspose.protocol import ( StreamName, WorkerName, - VirtualWorker, - VirtualConstraint, IngesterName, + WorkerTag, ) from dranspose.worker import Worker, WorkerSettings -from tests.utils import wait_for_controller, wait_for_finish +from tests.utils import ( + wait_for_controller, + wait_for_finish, + set_sequence, + monopart_sequence, + vworker, + set_uniform_sequence, +) @pytest.mark.asyncio -async def test_parallel( - request: FixtureRequest, +async def est_parallel( controller: None, reducer: Callable[[Optional[str]], Awaitable[None]], create_worker: Callable[[Worker], Awaitable[Worker]], @@ -68,26 +71,10 @@ async def test_parallel( ) ) - async with aiohttp.ClientSession() as session: - await wait_for_controller( - streams={StreamName("eiger")}, workers={WorkerName("w1")} - ) + await wait_for_controller(streams={StreamName("eiger")}, workers={WorkerName("w1")}) - ntrig = 10 - resp = await session.post( - "http://localhost:5000/api/v1/mapping", - json={ - "eiger": [ - [ - VirtualWorker(constraint=VirtualConstraint(2 * i)).model_dump( - mode="json" - ) - ] - for i in range(1, ntrig) - ], - }, - ) - assert resp.status == 200 + ntrig = 10 + await set_uniform_sequence(streams={StreamName("eiger")}, ntrig=ntrig) with zmq.asyncio.Context() as context: asyncio.create_task(stream_eiger(context, 9999, ntrig - 1)) @@ -119,3 +106,69 @@ def work() -> None: loop = asyncio.get_event_loop() await loop.run_in_executor(None, work) + + +@pytest.mark.asyncio +async def test_virtualds( + controller: None, + reducer: Callable[[Optional[str]], Awaitable[None]], + create_worker: Callable[[Worker], Awaitable[Worker]], + create_ingester: Callable[[Ingester], Awaitable[Ingester]], + stream_eiger: Callable[[zmq.Context[Any], int, int], Coroutine[Any, Any, None]], +) -> None: + await reducer("tests.aux_payloads:TestReducer") + await create_worker( + Worker( + settings=WorkerSettings( + worker_name=WorkerName("Weven"), + worker_tags={WorkerTag("even")}, + worker_class="tests.test_stins_parallel:VirtualWorker", + ), + ) + ) + await create_worker( + Worker( + settings=WorkerSettings( + worker_name=WorkerName("Wodd"), + worker_tags={WorkerTag("even")}, + worker_class="tests.test_stins_parallel:VirtualWorker", + ), + ) + ) + await create_ingester( + StinsParallelIngester( + settings=StinsParallelSettings( + ingester_streams=[StreamName("eiger")], + upstream_url=Url("tcp://localhost:9999"), + ), + ) + ) + await create_ingester( + StinsParallelIngester( + settings=StinsParallelSettings( + ingester_name=IngesterName("eiger-2"), + ingester_streams=[StreamName("eiger")], + upstream_url=Url("tcp://localhost:9999"), + ingester_url=Url("tcp://localhost:10011"), + ), + ) + ) + + await wait_for_controller(streams={StreamName("eiger")}, workers={WorkerName("w1")}) + + ntrig = 10 + await set_sequence( + monopart_sequence( + { + "eiger": [ + [vworker(tags={"even" if i % 2 else "odd"})] + for i in range(1, ntrig) + ], + } + ) + ) + + with zmq.asyncio.Context() as context: + asyncio.create_task(stream_eiger(context, 9999, ntrig - 1)) + + await wait_for_finish() From f33f68f3c2b0c11310795ac09c55af35f56c99e8 Mon Sep 17 00:00:00 2001 From: Felix Engelmann Date: Fri, 30 Jan 2026 19:25:38 -0500 Subject: [PATCH 8/8] added stream1 parallel ingester --- dranspose/ingesters/stream1_parallel.py | 55 +++++++++++++ tests/aux_payloads.py | 2 +- tests/test_data_streams.py | 5 ++ tests/test_stins_parallel.py | 100 +++++++++++++++++++++++- tests/utils.py | 4 +- 5 files changed, 161 insertions(+), 5 deletions(-) create mode 100644 dranspose/ingesters/stream1_parallel.py diff --git a/dranspose/ingesters/stream1_parallel.py b/dranspose/ingesters/stream1_parallel.py new file mode 100644 index 0000000..c0c87f7 --- /dev/null +++ b/dranspose/ingesters/stream1_parallel.py @@ -0,0 +1,55 @@ +from typing import Optional, AsyncGenerator +import zmq + +from dranspose.data.eiger_legacy import ( + EigerLegacyPacket, + EigerLegacyEnd, + EigerLegacyImage, + EigerLegacyHeader, +) +from dranspose.event import InternalWorkerMessage, StreamData +from dranspose.ingesters.stins_parallel import StinsParallelIngester +from dranspose.ingesters.zmqpull_eiger_legacy import ZmqPullEigerLegacySettings +from dranspose.protocol import StreamName, EventNumber + + +class Stream1ParallelIngester(StinsParallelIngester): + def __init__(self, settings: Optional[ZmqPullEigerLegacySettings] = None) -> None: + if settings is not None: + self._streaming_settings = settings + else: + self._streaming_settings = ZmqPullEigerLegacySettings() + + super().__init__(settings=self._streaming_settings) + self.in_socket: Optional[zmq._future._AsyncSocket] = None + + async def run_source_part( + self, stream: StreamName + ) -> AsyncGenerator[InternalWorkerMessage, None]: + self.in_socket = self.ctx.socket(zmq.PULL) + self.in_socket.connect(str(self._streaming_settings.upstream_url)) + self._logger.info("pulling from %s", self._streaming_settings.upstream_url) + + while True: + parts = await self.in_socket.recv_multipart(copy=False) + try: + packet = EigerLegacyPacket.validate_json(parts[0].bytes) + except Exception as e: + self._logger.error("packet not valid %s", e.__repr__()) + continue + msg_number = None + if isinstance(packet, EigerLegacyImage): + msg_number = EventNumber(packet.frame + 1) + elif isinstance(packet, EigerLegacyHeader): + msg_number = EventNumber(0) + elif isinstance(packet, EigerLegacyEnd): + break + self._logger.debug("msg number %d", msg_number) + yield InternalWorkerMessage( + event_number=msg_number, + streams={stream: StreamData(typ="EIGER_LEGACY", frames=parts)}, + ) + + while True: + self._logger.debug("discarding messages until next run") + await self.in_socket.recv_multipart(copy=False) diff --git a/tests/aux_payloads.py b/tests/aux_payloads.py index fc1daca..2d59069 100644 --- a/tests/aux_payloads.py +++ b/tests/aux_payloads.py @@ -38,7 +38,7 @@ def process_result( ) -> None: logging.info("parameters are %s", parameters) self.publish["results"][str(result.event_number)] = { - k: json.loads(v.frames[0]) if v.typ == "STINS" else "blob" + k: json.loads(v.frames[0]) if v.typ in ["STINS", "EIGER_LEGACY"] else "blob" for k, v in result.payload[0].streams.items() } self.publish["parameters"][str(result.event_number)] = parameters diff --git a/tests/test_data_streams.py b/tests/test_data_streams.py index 0299ee0..b36a48c 100644 --- a/tests/test_data_streams.py +++ b/tests/test_data_streams.py @@ -1,3 +1,4 @@ +import logging import pickle import cbor2 @@ -291,15 +292,19 @@ def test_albaem_stream() -> None: def test_eiger_legacy_stream() -> None: with open("tests/data/eiger-small.cbors", "rb") as f: + count = 0 while True: try: frames = cbor2.load(f) pkg = EigerLegacyPacket.validate_json(frames[0]) + logging.debug("pkg %s", pkg) if isinstance(pkg, EigerLegacyHeader): assert pkg.header_detail == "all" elif isinstance(pkg, EigerLegacyImage): assert pkg.frame >= 0 else: assert pkg.series == 6 + count += 1 except EOFError: break + assert count == 5 diff --git a/tests/test_stins_parallel.py b/tests/test_stins_parallel.py index 3712304..890ea92 100644 --- a/tests/test_stins_parallel.py +++ b/tests/test_stins_parallel.py @@ -1,5 +1,7 @@ import asyncio import logging +import os +from pathlib import PosixPath from typing import Awaitable, Callable, Any, Coroutine, Optional import h5pyd @@ -15,12 +17,15 @@ StinsParallelIngester, StinsParallelSettings, ) +from dranspose.ingesters.stream1_parallel import Stream1ParallelIngester +from dranspose.ingesters.zmqpull_eiger_legacy import ZmqPullEigerLegacySettings from dranspose.protocol import ( StreamName, WorkerName, IngesterName, WorkerTag, + MappingName, ) from dranspose.worker import Worker, WorkerSettings @@ -32,11 +37,12 @@ monopart_sequence, vworker, set_uniform_sequence, + uniform_sequence, ) @pytest.mark.asyncio -async def est_parallel( +async def test_parallel( controller: None, reducer: Callable[[Optional[str]], Awaitable[None]], create_worker: Callable[[Worker], Awaitable[Worker]], @@ -109,7 +115,97 @@ def work() -> None: @pytest.mark.asyncio -async def test_virtualds( +async def test_stream1( + controller: None, + reducer: Callable[[Optional[str]], Awaitable[None]], + create_worker: Callable[[Worker], Awaitable[Worker]], + create_ingester: Callable[[Ingester], Awaitable[Ingester]], + stream_cbors: Callable[ + [zmq.Context[Any], int, os.PathLike[Any] | str, float, int], + Coroutine[Any, Any, None], + ], +) -> None: + await reducer("tests.aux_payloads:TestReducer") + await create_worker( + Worker( + settings=WorkerSettings( + worker_name=WorkerName("w1"), + worker_class="tests.aux_payloads:TestWorker", + ), + ) + ) + ing1 = await create_ingester( + Stream1ParallelIngester( + settings=ZmqPullEigerLegacySettings( + ingester_streams=[StreamName("eiger")], + upstream_url=Url("tcp://localhost:9999"), + ), + ) + ) + ing2 = await create_ingester( + Stream1ParallelIngester( + settings=ZmqPullEigerLegacySettings( + ingester_name=IngesterName("eiger-2"), + ingester_streams=[StreamName("eiger")], + upstream_url=Url("tcp://localhost:9999"), + ingester_url=Url("tcp://localhost:10011"), + ), + ) + ) + + await wait_for_controller(streams={StreamName("eiger")}, workers={WorkerName("w1")}) + + ntrig = 4 + seq = uniform_sequence(streams={StreamName("eiger")}, ntrig=ntrig) + start_part = {"eiger": [[vworker()]]} + seq["parts"]["start"] = start_part + seq["sequence"].insert(0, MappingName("start")) + logging.info("sequence %s", seq) + await set_sequence(seq, all_wrap=False) + + with zmq.asyncio.Context() as context: + asyncio.create_task( + stream_cbors( + context, + 9999, + PosixPath("tests/data/eiger-small.cbors"), + 0.1, + zmq.PUSH, + begin=0, + ) + ) + + content = await wait_for_finish() + + assert content == { + "last_assigned": ntrig, + "completed_events": ntrig, + "total_events": ntrig, + "finished": True, + } + + logging.info("proc1 ev %d", ing1.state.processed_events) + logging.info("proc2 ev %d", ing2.state.processed_events) + assert ing1.state.processed_events < ntrig + assert ing2.state.processed_events < ntrig + assert ing1.state.processed_events + ing2.state.processed_events == ntrig + + def work() -> None: + f = h5pyd.File("http://localhost:5001/", "r") + assert f[f"results/{0}/eiger/htype"][()] == b"dheader-1.0" + # The last message is lost with the parallel ingester for Stream1 + # assert f[f"results/{ntrig}/eiger/htype"][()] == b"series_end" + for i in range(1, ntrig): + # assert f[f"results/{i}/eiger/msg_number"][()] == i + assert f[f"results/{i}/eiger/htype"][()] == b"dimage-1.0" + assert f[f"results/{i}/eiger/frame"][()] == i - 1 + + loop = asyncio.get_event_loop() + await loop.run_in_executor(None, work) + + +@pytest.mark.asyncio +async def est_virtualds( controller: None, reducer: Callable[[Optional[str]], Awaitable[None]], create_worker: Callable[[Worker], Awaitable[Worker]], diff --git a/tests/utils.py b/tests/utils.py index b13fd77..cdd19ac 100644 --- a/tests/utils.py +++ b/tests/utils.py @@ -121,10 +121,10 @@ def uniform_sequence(streams: set[StreamName], ntrig: int) -> dict[str, Any]: ) -async def set_sequence(sequence: dict[Any, Any]) -> str: +async def set_sequence(sequence: dict[Any, Any], all_wrap: bool = True) -> str: async with aiohttp.ClientSession() as session: resp = await session.post( - "http://localhost:5000/api/v1/sequence/", json=sequence + f"http://localhost:5000/api/v1/sequence/?all_wrap={all_wrap}", json=sequence ) if resp.status != 200: print("sent", sequence)