Skip to content
Open
14 changes: 7 additions & 7 deletions dranspose/debug_worker.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
from contextlib import asynccontextmanager
from typing import AsyncGenerator, Any

import zmq
from fastapi import FastAPI
from starlette.responses import Response

Expand All @@ -29,13 +28,14 @@ async def work(self) -> None:
while True:
self.dequeue_task = None
self.dequeue_task = asyncio.create_task(self.assignment_queue.get())
ingesterset = await self.dequeue_task
done = await self.poll_internals(ingesterset)
if set(done) != {zmq.POLLIN}:
self._logger.warning("not all sockets are pollIN %s", done)
continue
evn, streamset = await self.dequeue_task

event = await self.build_event(ingesterset)
self.new_data.clear()
while set(self.stream_queues.get(evn, {}).keys()) != streamset:
await self.new_data.wait()
self.new_data.clear()

event = EventData(event_number=evn, streams=self.stream_queues[evn])
self._logger.debug("adding event %s to buffer", event)
self.buffer.append(event)

Expand Down
4 changes: 4 additions & 0 deletions dranspose/ingesters/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,10 @@
ZmqPullSingleIngester,
ZmqPullSingleSettings,
)
from dranspose.ingesters.stins_parallel import ( # noqa: F401
StinsParallelIngester,
StinsParallelSettings,
)
from dranspose.ingesters.zmqpull_eiger_legacy import ( # noqa: F401
ZmqPullEigerLegacyIngester,
ZmqPullEigerLegacySettings,
Expand Down
96 changes: 96 additions & 0 deletions dranspose/ingesters/stins_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
import asyncio
from typing import AsyncGenerator, Optional

import zmq


from dranspose.data.stream1 import Stream1Packet, Stream1End
from dranspose.event import StreamData, InternalWorkerMessage
from dranspose.ingester import Ingester, IngesterSettings
from dranspose.protocol import (
StreamName,
ZmqUrl,
WorkAssignment,
WorkerName,
)


class StinsParallelSettings(IngesterSettings):
upstream_url: ZmqUrl


class StinsParallelIngester(Ingester):
"""
A simple ingester class to comsume a stream from the streaming-receiver repub port
"""

def __init__(self, settings: Optional[StinsParallelSettings] = None) -> None:
if settings is not None:
self._streaming_single_settings = settings
else:
self._streaming_single_settings = StinsParallelSettings()
Comment on lines +28 to +31
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if settings is not None:
self._streaming_single_settings = settings
else:
self._streaming_single_settings = StinsParallelSettings()
self._streaming_single_settings = settings or StinsParallelSettings()

Pythons boolean operators are more useful than they first might seem.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The danger with implicit bool casting vs None test is visible in this contrived example:

class Test():
    def __bool__(self):
        return False

t = Test()

t is None
#out: False

t or 1
#out: 1

I only want to create an empty Settings object if None was provided as parameter. If the provided value happens to be an implicit false, we get the problem:

a = 0.0

a or 1 
#out: 1


super().__init__(settings=self._streaming_single_settings)
self.in_socket: Optional[zmq._future._AsyncSocket] = None

async def work(self) -> None:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary (to see if I get it):

  • reimplementing work() in the Parallel ingester means (reasonably) no dumping. I guess one could still use @voidcase's Dumper, but the dump would be a pain to replay.
  • run_source_part just dumps everything it gets to work() until it gets a Stream1End
  • work() now has to deal with the fact that it may not get all messages, so it requests work_assignments until it gets the right one

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quoting @felix-engelmann "The magic is that the parallel [ingester] can't use simple counting and has to rely on data in the stream ie msg_numbe[r]"

Copy link
Copy Markdown
Collaborator

@mecascella mecascella Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about this, and I started wondering if it would not make more sense to move the responsibility of providing the message number from Ingester.work() to Ingester.run_source().

This would cause some code duplication, as all existing ingester would need a few lines to count the current message number, but would open the possibility for future ingesters to decide how to compute the event number.

Maybe we can just keep this in mind if this becomes an issue for other ingesters in the future.

Copy link
Copy Markdown
Collaborator

@mecascella mecascella Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • the dump would be a pain to replay.

I've changed my mind! I just went back to the replay code, and it looks to me like this would just work fine!

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I like having a shadow of work() which does mostly but not entirely the same thing. Partly because it's duplicate code, and it also makes it very unclear what is different about this ingester unless you read both implementations of work() side by side like I am doing now. I am still not sure what is going on here.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dumping from multiple parallel ingesters would be a difficult problem, especially if two scans overlap and end up in the same files.
Moving all "numbering" responsibility to the run_source might not work well with the following edge case where streams have different rates. This magic mapping of frame/msg_number to event number puts an implicit contract on the trigger map. All streams that use such an parallel ingester must have exactly one event per STINS packet. For a slow stream (e.g. sardana at the end of a fly line) it will be impossible for the ingester to "count" how many events passed. The ingester there "zips" packets with the assignment queue which provides the correct event number.

self._logger.info("started stins ingester work task")

if len(self.active_streams) == 0:
self._logger.warning("this ingester has no active streams, stopping worker")
return
sourcegen = self.run_source_part(self.active_streams[0])
try:
while True:
nextiwm: InternalWorkerMessage = await anext(sourcegen)

work_assignment: WorkAssignment = await self.assignment_queue.get()
while work_assignment.event_number < nextiwm.event_number:
work_assignment = await self.assignment_queue.get()

workermessages: dict[WorkerName, InternalWorkerMessage] = {}
for stream, workers in work_assignment.assignments.items():
for worker in workers:
if worker not in workermessages:
workermessages[worker] = nextiwm
self._logger.debug("workermessages %s", workermessages)
await self._send_workermessages(workermessages)
self.state.processed_events += 1
except asyncio.exceptions.CancelledError:
self._logger.info("stopping worker")
for stream in self.active_streams:
await self.stop_source(stream)

async def run_source_part(
self, stream: StreamName
) -> AsyncGenerator[InternalWorkerMessage, None]:
self.in_socket = self.ctx.socket(zmq.PULL)
self.in_socket.connect(str(self._streaming_single_settings.upstream_url))
self._logger.info(
"pulling from %s", self._streaming_single_settings.upstream_url
)

while True:
parts = await self.in_socket.recv_multipart(copy=False)
try:
packet = Stream1Packet.validate_json(parts[0].bytes)
except Exception as e:
self._logger.error("packet not valid %s", e.__repr__())
continue
self._logger.debug("msg number %d", packet.msg_number)
yield InternalWorkerMessage(
event_number=packet.msg_number,
streams={stream: StreamData(typ="STINS", frames=parts)},
)

if isinstance(packet, Stream1End):
Copy link
Copy Markdown
Collaborator

@mecascella mecascella Sep 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the parallel ingester that does not get the Stream1End will keep running and consuming messages.
That should not be a problem because there will be no assignments, right?

In general, we are walking away from the notion of a stream with a well-defined start and end; is this ingester more fragile? Or were we just more paranoid before?

Would it make sense to have only one partial ingester have an open socket until it receives a Stream1Start, and then notify the others? If some delay is ok, one could even just use Redis for that.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are assuming more implicit usage contracts, yes. When only consuming with one stream, it is easy to enforce a strict sequence from start until end. The parallel ingester in this PR does not have any of these guarantees.

I like the idea of having a single ingester connected, waiting for the start and once received notifying others to start "helping out" with consuming messages from their PULL sockets. Then once some ingester got an end message, it notifies all others to disconnect except the "main" ingester which is again waiting for the next start.

For communication, I would suggest redis, as it already is the bottleneck on round trip times and we don't need more dependencies. A possible design could be that the main ingester has a special flag and all ingesters for a stream share a redis-stream to which they post the start message and the end message and then act accordingly.

We need to loop in the logic of restarting a stuck scan where there is no end message etc. but that should be possible to model.

break
while True:
self._logger.debug("discarding messages until next run")
await self.in_socket.recv_multipart(copy=False)

async def stop_source(self, stream: StreamName) -> None:
if self.in_socket:
self._logger.info("closing socket without linger")
self.in_socket.close(linger=0)
self.in_socket = None
55 changes: 55 additions & 0 deletions dranspose/ingesters/stream1_parallel.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
from typing import Optional, AsyncGenerator
import zmq

from dranspose.data.eiger_legacy import (
EigerLegacyPacket,
EigerLegacyEnd,
EigerLegacyImage,
EigerLegacyHeader,
)
from dranspose.event import InternalWorkerMessage, StreamData
from dranspose.ingesters.stins_parallel import StinsParallelIngester
from dranspose.ingesters.zmqpull_eiger_legacy import ZmqPullEigerLegacySettings
from dranspose.protocol import StreamName, EventNumber


class Stream1ParallelIngester(StinsParallelIngester):
def __init__(self, settings: Optional[ZmqPullEigerLegacySettings] = None) -> None:
if settings is not None:
self._streaming_settings = settings
else:
self._streaming_settings = ZmqPullEigerLegacySettings()

super().__init__(settings=self._streaming_settings)
self.in_socket: Optional[zmq._future._AsyncSocket] = None

async def run_source_part(
self, stream: StreamName
) -> AsyncGenerator[InternalWorkerMessage, None]:
self.in_socket = self.ctx.socket(zmq.PULL)
self.in_socket.connect(str(self._streaming_settings.upstream_url))
self._logger.info("pulling from %s", self._streaming_settings.upstream_url)

while True:
parts = await self.in_socket.recv_multipart(copy=False)
try:
packet = EigerLegacyPacket.validate_json(parts[0].bytes)
except Exception as e:
self._logger.error("packet not valid %s", e.__repr__())
continue
msg_number = None
if isinstance(packet, EigerLegacyImage):
msg_number = EventNumber(packet.frame + 1)
elif isinstance(packet, EigerLegacyHeader):
msg_number = EventNumber(0)
elif isinstance(packet, EigerLegacyEnd):
break
self._logger.debug("msg number %d", msg_number)
yield InternalWorkerMessage(
event_number=msg_number,
streams={stream: StreamData(typ="EIGER_LEGACY", frames=parts)},
)

while True:
self._logger.debug("discarding messages until next run")
await self.in_socket.recv_multipart(copy=False)
Loading
Loading