
perf(ingestor_livetiming): add opt-in concurrency/parallelism for historical data ingestion#305

Open
JeffreyJPZ wants to merge 22 commits into br-g:main from JeffreyJPZ:ingest-historical-perf-improvements

Conversation

JeffreyJPZ (Contributor) commented Jan 15, 2026

Closes #304

Description
Currently, historical data ingestion is synchronous and can take anywhere from several minutes to hours to complete when ingesting large amounts of data. This PR aims to reduce that time by optimizing some of the "hot paths".

Changes

  • Add opt-in concurrency/parallelism with a --parallel CLI flag and optional --max-workers and --batch-size flags (example invocation after this list)
  • Update historical ingestion README with example usage
  • Parallelize _parse_and_decode_topic_content and parse_messages
  • Leverage concurrency with async database writes
  • Add wrapper class for typer.Typer to allow for decorating async functions
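
Example invocation (hypothetical: the subcommand name assumes Typer's usual underscore-to-hyphen conversion of ingest_session, and the worker/batch values are illustrative):

  • python -m openf1.services.ingestor_livetiming.historical.main ingest-session 2025 1276 9839 --parallel --max-workers 4 --batch-size 64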

Results
Best-case data ingestion time should be reduced by ~90%.

Comment on lines 42 to 44
parallel: bool = False,
max_workers: int | None = None,
batch_size: int | None = None,
JeffreyJPZ (author):

I don't exactly like propagating these through all of the functions (very easy to forget), but I'm not sure what other options there are.
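
One alternative (purely a sketch, not in this PR; the names are hypothetical) is to bundle the flags into a single immutable options object, so only one parameter gets threaded through the call chain:

from dataclasses import dataclass

@dataclass(frozen=True)
class IngestOptions:
    parallel: bool = False
    max_workers: int | None = None
    batch_size: int | None = None

# Hypothetical signature: one options argument instead of three separate flags
async def ingest_session(
    year: int,
    meeting_key: int,
    session_key: int,
    options: IngestOptions = IngestOptions(),
    verbose: bool = False,
):
    ...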

Comment on lines +128 to +129
if len(line) == 0:
    return None, None
JeffreyJPZ (author):

Moved the empty-line check here; it makes the parallel path much easier to write.

Comment on lines 7 to 36
class Typer(typer.Typer):
    """
    Wrapper class for typer.Typer to allow for decorating async functions.
    Adapted from https://github.com/byunjuneseok/async-typer.
    """

    def command(self, *args, **kwargs):
        def decorator(func: Callable):
            @wraps(func)
            def _func(*_args, **_kwargs):
                if iscoroutinefunction(func):
                    # Use current event loop if already running
                    loop = None
                    try:
                        loop = get_running_loop()
                    except RuntimeError:
                        pass

                    if loop is not None:
                        return loop.run_until_complete(func(*_args, **_kwargs))
                    else:
                        return run(func(*_args, **_kwargs))

                return func(*_args, **_kwargs)

            super(Typer, self).command(*args, **kwargs)(_func)

            return func

        return decorator
JeffreyJPZ (author):

This is pretty similar to the package async-typer, but I'd rather not have the extra dependency.
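
For reference, a minimal usage sketch of the wrapper (the command body is illustrative, not from this PR):

import asyncio

app = Typer()  # the wrapper class shown above, not typer.Typer directly

@app.command()
async def hello(name: str):
    # iscoroutinefunction(hello) is True, so the wrapper drives the event loop
    await asyncio.sleep(0)
    print(f"Hello {name}")

if __name__ == "__main__":
    app()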

Comment on lines 449 to 465
if parallel:
    await tqdm_async.gather(
        *[
            insert_data_async(
                collection_name=collection,
                docs=[d.to_mongo_doc_sync() for d in docs],
            )
            for collection, docs in docs_by_collection.items()
        ],
        disable=not verbose,
    )
else:
    for collection, docs in tqdm(
        list(docs_by_collection.items()), disable=not verbose
    ):
        docs_mongo = [d.to_mongo_doc_sync() for d in docs]
        insert_data_sync(collection_name=collection, docs=docs_mongo)
JeffreyJPZ (author):

Not sure if async writes should be done even when --parallel isn't enabled; I left the original code there for now.
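
One possible unification, sketched under the assumption that the enclosing function is already async (it awaits tqdm_async.gather above): sequential mode could reuse the async writer and simply await each collection in turn, instead of keeping a separate insert_data_sync path:

for collection, docs in tqdm(list(docs_by_collection.items()), disable=not verbose):
    # Same writer as the parallel branch, awaited one collection at a time
    await insert_data_async(
        collection_name=collection,
        docs=[d.to_mongo_doc_sync() for d in docs],
    )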

Comment on lines 379 to 523
  for session_key in session_keys:
      if verbose:
          logger.info(f"Ingesting session {session_key}")
-     ingest_session(
-         year=year, meeting_key=meeting_key, session_key=session_key, verbose=False
-     )
+     # If parallel is set, program is not I/O bound so having await in a for-loop isn't an issue
+     await ingest_session(
+         year=year,
+         meeting_key=meeting_key,
+         session_key=session_key,
+         parallel=parallel,
+         max_workers=max_workers,
+         batch_size=batch_size,
+         verbose=verbose,
+     )
JeffreyJPZ (author):

Can't really parallelize across meetings AND sessions AND topics as that would mean a lot of processes, so I just focused on the larger bottlenecks.

Comment on lines +199 to +212
def batched(iterable, n, strict=False):
    """
    This function is equivalent to Python's itertools.batched: https://docs.python.org/3/library/itertools.html#itertools.batched

    Batch data from the iterable into tuples of length n. The last batch may be shorter than n.
    If strict is true, will raise a ValueError if the final batch is shorter than n.
    """
    if n < 1:
        raise ValueError("n must be at least one")
    iterator = iter(iterable)
    while batch := tuple(islice(iterator, n)):
        if strict and len(batch) != n:
            raise ValueError("batched(): incomplete batch")
        yield batch
JeffreyJPZ (author):

itertools.batched was added in Python 3.12 while this project supports >=3.10.
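
For illustration, the backport matches the stdlib behavior (example values taken from the itertools documentation):

from itertools import islice  # required by the backport above

# Tuples of three, with a shorter final batch
assert list(batched("ABCDEFG", 3)) == [("A", "B", "C"), ("D", "E", "F"), ("G",)]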

br-g (Owner) commented Jan 17, 2026

Hello @JeffreyJPZ, thanks a lot for opening this PR.

I have run a few tests though, and the results seem to be different when --parallel is enabled.

For example, on my laptop:

  • python -m openf1.services.ingestor_livetiming.historical.main get-processed-documents 2025 1276 9839 laps gives 1156 laps
  • python -m openf1.services.ingestor_livetiming.historical.main get-processed-documents 2025 1276 9839 laps --parallel gives only 939 laps.

I'm not sure exactly why.

JeffreyJPZ (author) commented Jan 17, 2026

Good catch; it's a bit tedious to test everything manually. When I tried to reproduce it on my end, it looks like the correct number of messages is being fetched in both modes, but I got 945 processed laps when using parallel (sequential seems fine).

I think I narrowed it down to process_message: some collections implicitly assume that messages are processed in chronological order because they need to maintain state (e.g. laps). That would also explain why the number of processed documents differs (some accidental non-determinism). If we simply avoid processing those collections in parallel, it should fix the issue, and there should still be noticeable benefits since large collections like car_data and position don't maintain any state.

Edit: this might be a bit difficult, as each message can map to multiple collections; I'll have to think more about this.
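
A sketch of that idea (all names here are hypothetical, for illustration only): partition messages by whether any of their target collections are order-sensitive, and only fan out the stateless ones:

# Hypothetical: collections whose processors keep state (e.g. laps) must
# see messages in chronological order; the rest can be processed concurrently.
ORDER_SENSITIVE = {"laps"}

def partition_messages(messages):
    """Split messages into an order-preserving stream and a parallelizable one."""
    ordered, parallelizable = [], []
    for msg in messages:
        # assumes msg.collections lists every collection the message maps to
        if ORDER_SENSITIVE & set(msg.collections):
            ordered.append(msg)
        else:
            parallelizable.append(msg)
    return ordered, parallelizable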

JeffreyJPZ (author) commented:

I've decided to remove multiprocessing: after some more testing, ingestion doesn't seem to benefit from it at all, and it limits concurrency across multiple meetings/sessions. The bottleneck is processing the car_data and location collections, which takes a few seconds, but with some strategic awaits to ensure all tasks get to run, along with async requests, I've been able to ingest meetings in ~1min30s.
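
For reference, the "strategic awaits" technique as a generic sketch (messages and decode are stand-ins, not this PR's code): a CPU-heavy loop periodically awaits asyncio.sleep(0) so the event loop can run other pending tasks, such as in-flight requests:

import asyncio

async def process_messages(messages, decode):
    docs = []
    for i, msg in enumerate(messages):
        docs.append(decode(msg))  # CPU-bound decoding work
        if i % 1000 == 0:
            # Yield control so concurrent fetches/writes can make progress
            await asyncio.sleep(0)
    return docs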

This would definitely benefit from a few integration tests (#264), e.g. ensuring that the correct number of documents is processed, but Typer's support for testing is not well documented or easy to use, and there are external systems to consider (the F1 API, Mongo). A custom framework (based on pytest) would be necessary.
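
For what it's worth, Typer does ship typer.testing.CliRunner; a rough sketch of what such a test could look like (the app import and the stdout comparison are assumptions, and a real test would still need a live or mocked F1 API and Mongo):

from typer.testing import CliRunner

# Hypothetical import: assumes the Typer app object is exposed as `app`
from openf1.services.ingestor_livetiming.historical.main import app

runner = CliRunner()

def test_parallel_matches_sequential():
    args = ["get-processed-documents", "2025", "1276", "9839", "laps"]
    seq = runner.invoke(app, args)
    par = runner.invoke(app, args + ["--parallel"])
    assert seq.exit_code == 0 and par.exit_code == 0
    # Comparing raw stdout is a stand-in for counting processed documents
    assert seq.stdout == par.stdout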
