Skip to content

New worker event assembly#41

Open
felix-engelmann wants to merge 10 commits into
mainfrom
scale-ingester
Open

New worker event assembly#41
felix-engelmann wants to merge 10 commits into
mainfrom
scale-ingester

Conversation

@felix-engelmann
Copy link
Copy Markdown
Owner

This change will allow to horizontally scale ingesters and connect to a shared PULL socket.

@mecascella mecascella marked this pull request as ready for review September 17, 2025 07:27
Copy link
Copy Markdown
Collaborator

@mecascella mecascella left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wow, thanks! I'm only halfway through it, but I'll start asking some questions.

super().__init__(settings=self._streaming_single_settings)
self.in_socket: Optional[zmq._future._AsyncSocket] = None

async def work(self) -> None:
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Summary (to see if I get it):

  • reimplementing work() in the Parallel ingester means (reasonably) no dumping. I guess one could still use @voidcase's Dumper, but the dump would be a pain to replay.
  • run_source_part just dumps everything it gets to work() until it gets a Stream1End
  • work() now has to deal with the fact that it may not get all messages, so it requests work_assignments until it gets the right one

Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Quoting @felix-engelmann "The magic is that the parallel [ingester] can't use simple counting and has to rely on data in the stream ie msg_numbe[r]"

Copy link
Copy Markdown
Collaborator

@mecascella mecascella Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was thinking about this, and I started wondering if it would not make more sense to move the responsibility of providing the message number from Ingester.work() to Ingester.run_source().

This would cause some code duplication, as all existing ingester would need a few lines to count the current message number, but would open the possibility for future ingesters to decide how to compute the event number.

Maybe we can just keep this in mind if this becomes an issue for other ingesters in the future.

Copy link
Copy Markdown
Collaborator

@mecascella mecascella Sep 18, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • the dump would be a pain to replay.

I've changed my mind! I just went back to the replay code, and it looks to me like this would just work fine!

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I like having a shadow of work() which does mostly but not entirely the same thing. Partly because it's duplicate code, and it also makes it very unclear what is different about this ingester unless you read both implementations of work() side by side like I am doing now. I am still not sure what is going on here.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Dumping from multiple parallel ingesters would be a difficult problem, especially if two scans overlap and end up in the same files.
Moving all "numbering" responsibility to the run_source might not work well with the following edge case where streams have different rates. This magic mapping of frame/msg_number to event number puts an implicit contract on the trigger map. All streams that use such an parallel ingester must have exactly one event per STINS packet. For a slow stream (e.g. sardana at the end of a fly line) it will be impossible for the ingester to "count" how many events passed. The ingester there "zips" packets with the assignment queue which provides the correct event number.

streams={stream: StreamData(typ="STINS", frames=parts)},
)

if isinstance(packet, Stream1End):
Copy link
Copy Markdown
Collaborator

@mecascella mecascella Sep 17, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the parallel ingester that does not get the Stream1End will keep running and consuming messages.
That should not be a problem because there will be no assignments, right?

In general, we are walking away from the notion of a stream with a well-defined start and end; is this ingester more fragile? Or were we just more paranoid before?

Would it make sense to have only one partial ingester have an open socket until it receives a Stream1Start, and then notify the others? If some delay is ok, one could even just use Redis for that.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We are assuming more implicit usage contracts, yes. When only consuming with one stream, it is easy to enforce a strict sequence from start until end. The parallel ingester in this PR does not have any of these guarantees.

I like the idea of having a single ingester connected, waiting for the start and once received notifying others to start "helping out" with consuming messages from their PULL sockets. Then once some ingester got an end message, it notifies all others to disconnect except the "main" ingester which is again waiting for the next start.

For communication, I would suggest redis, as it already is the bottleneck on round trip times and we don't need more dependencies. A possible design could be that the main ingester has a special flag and all ingesters for a stream share a redis-stream to which they post the start message and the end message and then act accordingly.

We need to loop in the logic of restarting a stuck scan where there is no end message etc. but that should be possible to model.

Comment thread dranspose/worker.py
self._reducer_service_uuid: Optional[UUID4] = None
self.out_socket: Optional[zmq._future._AsyncSocket] = None

self.assignment_queue: asyncio.Queue[
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Again, for my own sanity (and hoping it helps anyone looking at this):

  • manage_assignments() used to fill the assignment_queue with a set of ingesters, now it puts() a tuple of evt_num and set of streams (one ingester can have multiple streams).
  • each worker has a zmq socket per ingester; before, since ingesters send events in order, if you (the worker) know that the next event has ingesters a,c,d... you just waited for all of them to send something and when they all have, you assemble the event out of whatever you received relying on the ordered delivery guarantee.
  • Now, you (worker) have a receive_ingester task for each ingester, that adds the received data in stream_queues indexed by the evt_num. work() awaits assignment_queue, then compares the stream set with the streams in stream_queues for that event until all streams are there. The calls _run_payload, which is just the old code refactored out.
  • having a per ingester receive_ingester task means no more gather() -> no need to poll sockets before awaiting them

Comment thread dranspose/worker.py Outdated
Copy link
Copy Markdown

@voidcase voidcase left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Tbh, I am feeling a bit of the heebie-jeebies about this PR. It feels like we are heading in a high complexity direction, and I am now having trouble reasoning about the system. I had a talk with @mecascella about it, and I think it might be good if I try to draw a complete(ish) map of the flow of information of dranspose. I can't point to any particular improvements right now, but I feel it in my bones, and I need to figure out what they are trying to tell me.

Comment on lines +28 to +31
if settings is not None:
self._streaming_single_settings = settings
else:
self._streaming_single_settings = StinsParallelSettings()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
if settings is not None:
self._streaming_single_settings = settings
else:
self._streaming_single_settings = StinsParallelSettings()
self._streaming_single_settings = settings or StinsParallelSettings()

Pythons boolean operators are more useful than they first might seem.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The danger with implicit bool casting vs None test is visible in this contrived example:

class Test():
    def __bool__(self):
        return False

t = Test()

t is None
#out: False

t or 1
#out: 1

I only want to create an empty Settings object if None was provided as parameter. If the provided value happens to be an implicit false, we get the problem:

a = 0.0

a or 1 
#out: 1

super().__init__(settings=self._streaming_single_settings)
self.in_socket: Optional[zmq._future._AsyncSocket] = None

async def work(self) -> None:
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think I like having a shadow of work() which does mostly but not entirely the same thing. Partly because it's duplicate code, and it also makes it very unclear what is different about this ingester unless you read both implementations of work() side by side like I am doing now. I am still not sure what is going on here.

logging.info("proc2 ev %d", ing2.state.processed_events)
assert ing1.state.processed_events < ntrig
assert ing2.state.processed_events < ntrig
assert ing1.state.processed_events + ing2.state.processed_events == ntrig + 1
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe something to check that the work is roughly equally distributed?

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The problem with zmq pull is that there is no guarantee at all about how messages are distributed. Even the < ntrig could fail if all messages go to a single ingester.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sure, there is no guarantee from zmq. But I would argue that if one ingester does get all the work, the program is not doing fulfilling the main purpose of the parallel ingester, to speed up throughput via parallelization, and therefore the test should fail.

Copy link
Copy Markdown
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

To actually test for roughly even distribution, I would perform a test with way more packets, e.g. 10000 and then make assertions on the distribution. Zmq may easily deliver 10 packets to the same PULL socket by accident.

@felix-engelmann
Copy link
Copy Markdown
Owner Author

One additional idea was to provide a test payload which acts as a file writer to dump all frames to a file per worker and use the reducer to write a hdf5 virtual dataset.
Depending on the Trigger map, either the worker allocation happens randomly, or if every worker has a specific label, the file writing can enforce strict round robin.
An issue with any part is, that the event rate of the reducer can not keep up with the incoming stream, so the workers need to batch whatever they send to the reducer.
We may use the StreamEnd message to flush any pending state of workers to the receiver, but then need to be careful with aborted scans.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants