Skip to content
Open
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 11 additions & 0 deletions dranspose/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -201,6 +201,8 @@ def replay(args: argparse.Namespace) -> None:
args.port,
keepalive,
args.nworkers,
latency=args.latency,
loop=args.loop,
)


Expand Down Expand Up @@ -306,6 +308,15 @@ def create_parser() -> argparse.ArgumentParser:
default=2,
type=int,
)
parser_replay.add_argument(
"--latency",
help="time in seconds between replay frames",
default=0,
type=float,
)
parser_replay.add_argument(
"--loop", action="store_true", help="continuously loop over replay data"
)

return parser

Expand Down
11 changes: 7 additions & 4 deletions dranspose/replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import time
import traceback
from typing import ContextManager, Iterator, Any, Optional, IO, Tuple
import itertools

import cbor2
import uvicorn
Expand Down Expand Up @@ -257,7 +258,8 @@ def replay(
broadcast_first: bool = True,
done_event: threading.Event | None = None,
start_event: threading.Event | None = None,
latency: float | None = None,
latency: float = 0,
loop: bool = False,
) -> None:
if source is not None:
sourcecls = utils.import_class(source)
Expand All @@ -267,6 +269,8 @@ def replay(
gens = [get_internals(f) for f in zmq_files]
else:
gens = []
if loop:
gens = list(map(itertools.cycle, gens))

workercls = utils.import_class(wclass)
logger.info("custom worker class %s", workercls)
Expand Down Expand Up @@ -347,9 +351,9 @@ def replay(
reducer_app.state.parameters,
tick,
)
if latency is not None:
time.sleep(latency)
time.sleep(latency)
except StopIteration:
# gens = [get_internals(f) for f in zmq_files]
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why is that comment added?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this the end-result of some uncommitted iterations in the code; Bob first just re-initialised gens on StopIteration, but the final version switched to using itertools.cycle. The comment can be removed.

Comment thread
felix-engelmann marked this conversation as resolved.
Outdated
logger.debug("end of replay, calling finish")
_finish(workers, reducer, reducer_app.state.parameters)
break
Expand All @@ -366,4 +370,3 @@ def replay(
stop_event.wait()
except KeyboardInterrupt:
pass
Comment thread
felix-engelmann marked this conversation as resolved.
logger.info("replay finished")
Comment thread
felix-engelmann marked this conversation as resolved.
5 changes: 5 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,8 @@ extend-exclude = '''
docs/*
)
'''

[dependency-groups]
dev = [
"python-lsp-server>=1.14.0",
Comment thread
felix-engelmann marked this conversation as resolved.
Outdated
]
58 changes: 58 additions & 0 deletions tests/test_replay.py
Original file line number Diff line number Diff line change
Expand Up @@ -180,6 +180,64 @@ async def test_replay(
)


@pytest.mark.skipif("config.getoption('rust')", reason="rust does not support dumping")
@pytest.mark.asyncio
async def test_replay_looping(
controller: None,
reducer: Callable[[Optional[str]], Awaitable[None]],
create_worker: Callable[[WorkerName], Awaitable[Worker]],
create_ingester: Callable[[Ingester], Awaitable[Ingester]],
stream_eiger: Callable[[zmq.Context[Any], int, int], Coroutine[Any, Any, None]],
stream_orca: Callable[[zmq.Context[Any], int, int], Coroutine[Any, Any, None]],
stream_small: Callable[[zmq.Context[Any], int, int], Coroutine[Any, Any, None]],
tmp_path: Any,
) -> None:
p_eiger, p_prefix, uuid = await dump_data(
reducer,
create_worker,
create_ingester,
stream_eiger,
stream_orca,
stream_small,
tmp_path,
)
# read dump

par_file = generate_params(tmp_path)
stop_event = threading.Event()

thread = threading.Thread(
target=replay,
args=(
"tests.aux_payloads:TestWorker",
"tests.aux_payloads:TestReducer",
[p_eiger, f"{p_prefix}orca-ingester-{uuid}.cbors"],
None,
par_file,
),
kwargs={"port": 5010, "stop_event": stop_event},
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if looping should be tested, an additional "loop": True is needed. But then the test does not terminate.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I resolved this by listening to the stop signal inside the replay loop - seems sensible to me, what do you think?

)
thread.start()
await asyncio.sleep(2)
single_run_len_results = 10
test_pass = False
for _ in range(10):
f = h5pyd.File("http://localhost:5010/", "r", timeout=5)
logging.info("file %s", list(f.keys()))
len_results = len(f.get("results", []))
logging.info("Length of results: %s", len_results)
if len_results > single_run_len_results:
test_pass = True
Copy link
Copy Markdown
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

f["results"] is a group with 11 subgroups: ['0', '1', '2', '3', '4', '5', '6', '7', '8', '9', '10'], one for each event_number from the replay. The second loop will overwrite the elements once over. This is a general issue with a loop, the event numbers will restart from 0, with all series_start headers. Maybe I would like to better understand the use-case of the loop feature.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! Is this because of the start frame? I thought the test streams defined in conftest are length 10?

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

and in addition, I see in replay that event_number will just be passed though, but in the loop regime, perhaps we could artificially increment past the last recorded event number?

I've drafted this in my last commit.
In addition I added now listening to the stop_event inside the replay loop - so we can stop an infinite replay programmatically.

Copy link
Copy Markdown
Owner

@felix-engelmann felix-engelmann Feb 27, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh! Is this because of the start frame? I thought the test streams defined in conftest are length 10?

The replay first creates its own recordings through dump_data as a test helper in test_replay.py It defines

ntrig = 10
resp = await session.post(
"http://localhost:5000/api/v1/sequence",
json=monopart_sequence(
{
"eiger": [[vworker(2 * i)] for i in range(1, ntrig)],
"orca": [[vworker(2 * i + 1)] for i in range(1, ntrig)],
"alba": [
[vworker(2 * i), vworker(2 * i + 1)] for i in range(1, ntrig)
],
"slow": [
[vworker(2 * i), vworker(2 * i + 1)] if i % 4 == 0 else None
for i in range(1, ntrig)
],
}
),
)
which generates 9 frames plus start and end for a total of 11 events.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This works now - I switched to checking that length is > 11, and I added the stop_event inside the loop for long-running (or infinite) loops so they can still be stopped sensibly (if stop_event gets set while still in the loop, a StopIteration is raised, so that we exit the loop according to the standard process.

break
await asyncio.sleep(1)
assert test_pass, "Results never had more than 10 entries"
logging.info("shut down server")
stop_event.set()
thread.join()
await asyncio.sleep(0.1)
logging.info("thread joined")


@pytest.mark.skipif("config.getoption('rust')", reason="rust does not support dumping")
@pytest.mark.asyncio
async def test_replay_gzip(
Expand Down