From 9f81bc4b7a004e5319614f3fb7bfa32804d575b3 Mon Sep 17 00:00:00 2001 From: Bob the mob Date: Tue, 9 Dec 2025 15:06:59 +0100 Subject: [PATCH 1/6] added loop and latency args --- dranspose/cli.py | 11 +++++++++++ dranspose/replay.py | 11 +++++++---- 2 files changed, 18 insertions(+), 4 deletions(-) diff --git a/dranspose/cli.py b/dranspose/cli.py index 2405273..b882778 100644 --- a/dranspose/cli.py +++ b/dranspose/cli.py @@ -201,6 +201,8 @@ def replay(args: argparse.Namespace) -> None: args.port, keepalive, args.nworkers, + latency=args.latency, + loop=args.loop, ) @@ -306,6 +308,15 @@ def create_parser() -> argparse.ArgumentParser: default=2, type=int, ) + parser_replay.add_argument( + "--latency", + help="time in seconds between replay frames", + default=0, + type=float, + ) + parser_replay.add_argument( + "--loop", action="store_true", help="continuously loop over replay data" + ) return parser diff --git a/dranspose/replay.py b/dranspose/replay.py index 4505519..418135a 100644 --- a/dranspose/replay.py +++ b/dranspose/replay.py @@ -9,6 +9,7 @@ import time import traceback from typing import ContextManager, Iterator, Any, Optional, IO, Tuple +import itertools import cbor2 import uvicorn @@ -257,7 +258,8 @@ def replay( broadcast_first: bool = True, done_event: threading.Event | None = None, start_event: threading.Event | None = None, - latency: float | None = None, + latency: float = 0, + loop: bool = False, ) -> None: if source is not None: sourcecls = utils.import_class(source) @@ -267,6 +269,8 @@ def replay( gens = [get_internals(f) for f in zmq_files] else: gens = [] + if loop: + gens = list(map(itertools.cycle, gens)) workercls = utils.import_class(wclass) logger.info("custom worker class %s", workercls) @@ -347,9 +351,9 @@ def replay( reducer_app.state.parameters, tick, ) - if latency is not None: - time.sleep(latency) + time.sleep(latency) except StopIteration: + # gens = [get_internals(f) for f in zmq_files] logger.debug("end of replay, calling finish") _finish(workers, reducer, reducer_app.state.parameters) break @@ -366,4 +370,3 @@ def replay( stop_event.wait() except KeyboardInterrupt: pass - logger.info("replay finished") From 0bbd1737c5f45413680be5dc074cde47f29db26e Mon Sep 17 00:00:00 2001 From: Bob the mob Date: Thu, 26 Feb 2026 17:05:21 +0100 Subject: [PATCH 2/6] wip: sketching out a test for looping replay. --- pyproject.toml | 5 ++++ tests/test_replay.py | 71 ++++++++++++++++++++++++++++++++++++++++++++ 2 files changed, 76 insertions(+) diff --git a/pyproject.toml b/pyproject.toml index 953bb0d..d02c105 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,3 +110,8 @@ extend-exclude = ''' docs/* ) ''' + +[dependency-groups] +dev = [ + "python-lsp-server>=1.14.0", +] diff --git a/tests/test_replay.py b/tests/test_replay.py index 92845d5..305400a 100644 --- a/tests/test_replay.py +++ b/tests/test_replay.py @@ -180,6 +180,77 @@ async def test_replay( ) + +@pytest.mark.skipif("config.getoption('rust')", reason="rust does not support dumping") +@pytest.mark.asyncio +async def test_replay_looping( + controller: None, + reducer: Callable[[Optional[str]], Awaitable[None]], + create_worker: Callable[[WorkerName], Awaitable[Worker]], + create_ingester: Callable[[Ingester], Awaitable[Ingester]], + stream_eiger: Callable[[zmq.Context[Any], int, int], Coroutine[Any, Any, None]], + stream_orca: Callable[[zmq.Context[Any], int, int], Coroutine[Any, Any, None]], + stream_small: Callable[[zmq.Context[Any], int, int], Coroutine[Any, Any, None]], + tmp_path: Any, +) -> None: + p_eiger, p_prefix, uuid = await dump_data( + reducer, + create_worker, + create_ingester, + stream_eiger, + stream_orca, + stream_small, + tmp_path, + ) + # read dump + + par_file = generate_params(tmp_path) + stop_event = threading.Event() + + thread = threading.Thread( + target=replay, + args=( + "tests.aux_payloads:TestWorker", + "tests.aux_payloads:TestReducer", + [p_eiger, f"{p_prefix}orca-ingester-{uuid}.cbors"], + None, + par_file, + ), + kwargs={"port": 5010, "stop_event": stop_event}, + ) + print("Starting thread for replay") + thread.start() + def work_pre() -> None: + print("worjpree! weehooo!") + ntrig = 10 + f = h5pyd.File("http://localhost:5010/", "r", timeout=5) + logging.info("file %s", list(f.keys())) + print(list(f.keys())) + + nrun = 0 + print("Starting polling loop soon") + await asyncio.sleep(10) + while True: + work_pre() + nrun += 1 + if nrun > 10: + assert False, "Waited too long" + asyncio.sleep(1) + print("Done") + + # loop = asyncio.get_event_loop() + # await loop.run_in_executor(None, work_pre) + # # await work_pre() + # + logging.info("shut down server") + stop_event.set() + + thread.join() + await asyncio.sleep(0.1) + logging.info("thread joined") + + + @pytest.mark.skipif("config.getoption('rust')", reason="rust does not support dumping") @pytest.mark.asyncio async def test_replay_gzip( From da6804c8c6c0d40d1ab5425431d62d9e2b24b5a3 Mon Sep 17 00:00:00 2001 From: Jeremy Metz Date: Thu, 26 Feb 2026 21:17:59 +0100 Subject: [PATCH 3/6] test: finished adding basic test of looping replay --- tests/test_replay.py | 35 +++++++++++------------------------ 1 file changed, 11 insertions(+), 24 deletions(-) diff --git a/tests/test_replay.py b/tests/test_replay.py index 305400a..2c62a74 100644 --- a/tests/test_replay.py +++ b/tests/test_replay.py @@ -180,7 +180,6 @@ async def test_replay( ) - @pytest.mark.skipif("config.getoption('rust')", reason="rust does not support dumping") @pytest.mark.asyncio async def test_replay_looping( @@ -218,39 +217,27 @@ async def test_replay_looping( ), kwargs={"port": 5010, "stop_event": stop_event}, ) - print("Starting thread for replay") thread.start() - def work_pre() -> None: - print("worjpree! weehooo!") - ntrig = 10 + await asyncio.sleep(2) + single_run_len_results = 10 + test_pass = False + for _ in range(10): f = h5pyd.File("http://localhost:5010/", "r", timeout=5) logging.info("file %s", list(f.keys())) - print(list(f.keys())) - - nrun = 0 - print("Starting polling loop soon") - await asyncio.sleep(10) - while True: - work_pre() - nrun += 1 - if nrun > 10: - assert False, "Waited too long" - asyncio.sleep(1) - print("Done") - - # loop = asyncio.get_event_loop() - # await loop.run_in_executor(None, work_pre) - # # await work_pre() - # + len_results = len(f.get("results", [])) + logging.info("Length of results: %s", len_results) + if len_results > single_run_len_results: + test_pass = True + break + await asyncio.sleep(1) + assert test_pass, "Results never had more than 10 entries" logging.info("shut down server") stop_event.set() - thread.join() await asyncio.sleep(0.1) logging.info("thread joined") - @pytest.mark.skipif("config.getoption('rust')", reason="rust does not support dumping") @pytest.mark.asyncio async def test_replay_gzip( From f0063c0a35b7bf0797478a97f14ce1d1e1603fa3 Mon Sep 17 00:00:00 2001 From: Jeremy Metz Date: Fri, 27 Feb 2026 10:30:25 +0100 Subject: [PATCH 4/6] test: fixed event-number by incrementing in replay --- dranspose/replay.py | 14 ++++++++++--- pyproject.toml | 5 ----- tests/test_replay.py | 50 +++++++++++++++++++++++++++----------------- 3 files changed, 42 insertions(+), 27 deletions(-) diff --git a/dranspose/replay.py b/dranspose/replay.py index 418135a..eb54571 100644 --- a/dranspose/replay.py +++ b/dranspose/replay.py @@ -23,6 +23,7 @@ from dranspose.event import ( InternalWorkerMessage, EventData, + EventNumber, ResultData, message_tag_hook, ) @@ -303,15 +304,15 @@ def replay( reducer_app, port=port or 5000, host="localhost", log_level="info" ) server = Server(config) - # server.run() - first = True + last_output_event_number = -1 with server.run_in_thread(port): cache = [None for _ in gens] last_tick = 0.0 if start_event is not None: start_event.wait() + while True: try: internals = [ @@ -320,6 +321,7 @@ def replay( if len(internals) == 0: break lowestevn = min([ev.event_number for ev in internals]) + lowinternals = [] cache = internals for idx, ie in enumerate(internals): @@ -327,6 +329,10 @@ def replay( lowinternals.append(ie) cache[idx] = None event = EventData.from_internals(lowinternals) + if loop: + if last_output_event_number > event.event_number: + event.event_number = EventNumber(last_output_event_number + 1) + last_output_event_number = event.event_number dst_worker_ids = [random.randint(0, len(workers) - 1)] if first and broadcast_first: @@ -352,8 +358,10 @@ def replay( tick, ) time.sleep(latency) + if stop_event is not None: + if stop_event.is_set(): + raise StopIteration() except StopIteration: - # gens = [get_internals(f) for f in zmq_files] logger.debug("end of replay, calling finish") _finish(workers, reducer, reducer_app.state.parameters) break diff --git a/pyproject.toml b/pyproject.toml index d02c105..953bb0d 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -110,8 +110,3 @@ extend-exclude = ''' docs/* ) ''' - -[dependency-groups] -dev = [ - "python-lsp-server>=1.14.0", -] diff --git a/tests/test_replay.py b/tests/test_replay.py index 2c62a74..d7463f9 100644 --- a/tests/test_replay.py +++ b/tests/test_replay.py @@ -215,27 +215,39 @@ async def test_replay_looping( None, par_file, ), - kwargs={"port": 5010, "stop_event": stop_event}, + kwargs={"port": 5010, "stop_event": stop_event, "loop": True, "latency": 0.1}, ) thread.start() - await asyncio.sleep(2) - single_run_len_results = 10 - test_pass = False - for _ in range(10): - f = h5pyd.File("http://localhost:5010/", "r", timeout=5) - logging.info("file %s", list(f.keys())) - len_results = len(f.get("results", [])) - logging.info("Length of results: %s", len_results) - if len_results > single_run_len_results: - test_pass = True - break - await asyncio.sleep(1) - assert test_pass, "Results never had more than 10 entries" - logging.info("shut down server") - stop_event.set() - thread.join() - await asyncio.sleep(0.1) - logging.info("thread joined") + + async def check_poll_results() -> None: + print("Starting to poll hdf5-rest output with h5pyd") + await asyncio.sleep(2) + # NOTE: See https://github.com/felix-engelmann/dranspose/pull/58#discussion_r2861877247 + # the results have length 11, though the stream seems to be length 10 + single_run_len_results = 11 + max_wait_time = 5 + wait_step_duration = 0.5 + wait_total_steps = int((max_wait_time // wait_step_duration) + 1) + for _ in range(wait_total_steps): + f = h5pyd.File("http://localhost:5010/", "r", timeout=5) + len_results = len(f.get("results", [])) + print("Length of results:", len_results) + + if len_results > single_run_len_results: + return + await asyncio.sleep(wait_step_duration) + assert False, "Results never had more than 10 entries" + + try: + await check_poll_results() + except Exception as err: + raise Exception from err + finally: + logging.info("shut down server") + stop_event.set() + thread.join() + await asyncio.sleep(0.1) + logging.info("thread joined") @pytest.mark.skipif("config.getoption('rust')", reason="rust does not support dumping") From 31a40d0b5daa26add3736983506be7976f39896b Mon Sep 17 00:00:00 2001 From: Jeremy Metz Date: Sun, 22 Mar 2026 16:14:34 +0100 Subject: [PATCH 5/6] chore: switch from print to logging --- tests/test_replay.py | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/tests/test_replay.py b/tests/test_replay.py index d7463f9..82756f3 100644 --- a/tests/test_replay.py +++ b/tests/test_replay.py @@ -47,7 +47,7 @@ async def dump_data( await create_worker(WorkerName("w3")) p_eiger = tmp_path / "eiger_dump.cbors" - print(p_eiger, type(p_eiger)) + logging.info("%s %s", p_eiger, type(p_eiger)) await create_ingester( ZmqPullSingleIngester( @@ -220,7 +220,7 @@ async def test_replay_looping( thread.start() async def check_poll_results() -> None: - print("Starting to poll hdf5-rest output with h5pyd") + logging.info("Starting to poll hdf5-rest output with h5pyd") await asyncio.sleep(2) # NOTE: See https://github.com/felix-engelmann/dranspose/pull/58#discussion_r2861877247 # the results have length 11, though the stream seems to be length 10 @@ -231,7 +231,7 @@ async def check_poll_results() -> None: for _ in range(wait_total_steps): f = h5pyd.File("http://localhost:5010/", "r", timeout=5) len_results = len(f.get("results", [])) - print("Length of results:", len_results) + logging.info("Length of results: %s", len_results) if len_results > single_run_len_results: return From 4eea0af8a96e7beefa0af970cdf00561e31f0d09 Mon Sep 17 00:00:00 2001 From: Jeremy Metz Date: Sun, 22 Mar 2026 16:16:57 +0100 Subject: [PATCH 6/6] chore: check 2x len results received and add sleep after event.set --- tests/test_replay.py | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/tests/test_replay.py b/tests/test_replay.py index 82756f3..3b9fabb 100644 --- a/tests/test_replay.py +++ b/tests/test_replay.py @@ -233,7 +233,7 @@ async def check_poll_results() -> None: len_results = len(f.get("results", [])) logging.info("Length of results: %s", len_results) - if len_results > single_run_len_results: + if len_results > 2 * single_run_len_results: return await asyncio.sleep(wait_step_duration) assert False, "Results never had more than 10 entries" @@ -245,6 +245,7 @@ async def check_poll_results() -> None: finally: logging.info("shut down server") stop_event.set() + await asyncio.sleep(0.1) thread.join() await asyncio.sleep(0.1) logging.info("thread joined")