Changes from 9 commits
30 changes: 24 additions & 6 deletions src/openf1/services/ingestor_livetiming/core/processing/main.py
@@ -1,3 +1,4 @@
+import functools
 import traceback
 from collections import defaultdict
 
@@ -6,6 +7,7 @@
     Message,
     get_topics_to_collections_mapping,
 )
+from openf1.util.multiprocessing import map_parallel
 
 
 def process_message(
@@ -37,16 +39,32 @@ def process_messages(
     meeting_key: int,
     session_key: int,
     messages: list[Message],
+    parallel: bool = False,
+    max_workers: int | None = None,
+    batch_size: int | None = None,

Contributor Author:

I don't exactly like propagating these through all of the functions (very easy to forget), but I'm not sure what other options there are.

 ) -> dict[str, list[Document]]:
     """Processes messages and returns the generated documents by collection"""
     docs_buf = defaultdict(dict)
-    for message in messages:
-        processed = process_message(
-            meeting_key=meeting_key,
-            session_key=session_key,
-            message=message,
+
+    processed = (
+        map_parallel(
+            functools.partial(process_message, meeting_key, session_key),
+            messages,
+            max_workers=max_workers,
+            batch_size=batch_size,
+        )
+        if parallel
+        else (
+            process_message(
+                meeting_key=meeting_key, session_key=session_key, message=message
+            )
+            for message in messages
         )
-        for collection, docs in processed.items():
+    )
+
+    # Avoid synchronization with sequential memory writes
+    for p in processed:
+        for collection, docs in p.items():
             for doc in docs:
                 # Replace previous version of the same doc if it exists
                 docs_buf[collection][doc] = doc
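
To make the merge step in this hunk easier to follow, here is a minimal, self-contained sketch of the same pattern. It is not the project's code: `Doc`, the toy `process_message`, the body of `map_parallel`, and the final return expression are all assumptions made for illustration, since the diff shows neither `openf1.util.multiprocessing.map_parallel` nor the rest of `process_messages`. The sketch assumes the helper behaves like `ProcessPoolExecutor.map`, which yields results in input order.

```python
import functools
from collections import defaultdict
from concurrent.futures import ProcessPoolExecutor
from dataclasses import dataclass, field


@dataclass(frozen=True)
class Doc:
    """Hypothetical stand-in for Document: equality and hash use `key` only."""

    key: str
    value: int = field(compare=False)


def process_message(meeting_key, session_key, message):
    """Toy processor (assumption): one message -> {collection: [docs]}."""
    collection, key, value = message
    return {collection: [Doc(key=f"{session_key}:{key}", value=value)]}


def map_parallel(fn, items, max_workers=None, batch_size=None):
    """Assumed shape of the helper; the real implementation is not in the diff."""
    with ProcessPoolExecutor(max_workers=max_workers) as pool:
        # Executor.map returns results in the order of `items`.
        return list(pool.map(fn, items, chunksize=batch_size or 1))


def process_messages(
    meeting_key, session_key, messages,
    parallel=False, max_workers=None, batch_size=None,
):
    fn = functools.partial(process_message, meeting_key, session_key)
    processed = (
        map_parallel(fn, messages, max_workers=max_workers, batch_size=batch_size)
        if parallel
        else (fn(message) for message in messages)
    )

    # Workers only compute results; the merge runs in the calling process,
    # so the shared buffer needs no locking.
    docs_buf = defaultdict(dict)
    for p in processed:
        for collection, docs in p.items():
            for doc in docs:
                # A later doc with the same key replaces the earlier one.
                docs_buf[collection][doc] = doc
    # Assumed return shape, matching the dict[str, list[Document]] annotation.
    return {collection: list(d.values()) for collection, d in docs_buf.items()}
```

Because results are merged in input order in both branches, the "replace previous version" rule should resolve the same way whether or not `parallel` is set, provided the parallel helper preserves ordering. When running the parallel branch as a script on platforms that spawn worker processes, the call needs to sit under an `if __name__ == "__main__":` guard.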
17 changes: 17 additions & 0 deletions src/openf1/services/ingestor_livetiming/historical/README.md
@@ -81,3 +81,20 @@ To ingest processed documents of all the available collections for season `2024`
 ```bash
 python -m openf1.services.ingestor_livetiming.historical.main ingest-season 2024
 ```
+
+### Enable parallelization
+
+> [!NOTE]
+> Enabling parallelization requires additional system resources (CPU, RAM). Use at your own discretion.
+
+To speed up data ingestion, pass the `--parallel` flag to enable parallel message processing. For example, to ingest processed documents of all the available collections for season `2024` in parallel:
+
+```bash
+python -m openf1.services.ingestor_livetiming.historical.main ingest-season 2024 --parallel
+```
+
+The optional flags `--max-workers` and `--batch-size` can also be used:
+- `--max-workers`: The maximum number of worker processes used to process messages (defaults to the number of CPU cores on the machine)
+- `--batch-size`: The number of messages processed by each worker process at a time (defaults to 1)
+
+Increasing `--max-workers` and/or `--batch-size` may speed up data ingestion further, but requires more memory.
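
As a rough illustration of what `--batch-size` controls, the sketch below splits a message list into batches, each of which would be handed to a worker as one unit. The `batches` helper is hypothetical and only models the idea described in the README text; how the project's own helper actually groups messages is not shown in this diff.

```python
from itertools import islice


def batches(items, batch_size=1):
    """Yield consecutive lists of at most `batch_size` items (hypothetical helper)."""
    it = iter(items)
    while chunk := list(islice(it, batch_size)):
        yield chunk


messages = list(range(10))

# With the default of 1, every message becomes its own unit of work.
per_message = list(batches(messages))

# With a larger batch size, fewer and bigger units are sent to the workers.
per_batch = list(batches(messages, batch_size=4))
```

Under this model, a larger batch size means fewer hand-offs between the parent and the worker processes, while each worker holds more messages (and their resulting documents) in memory at once, which is consistent with the trade-off the README describes.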