diff --git a/requirements.txt b/requirements.txt index e4155f9..e539e7e 100644 --- a/requirements.txt +++ b/requirements.txt @@ -1,5 +1,6 @@ aiohttp==3.12.13 aiomqtt==2.4.0 +async-lru==2.1.0 beautifulsoup4==4.13.4 cachetools==5.5.2 fastapi==0.115.13 diff --git a/src/openf1/services/ingestor_livetiming/core/objects.py b/src/openf1/services/ingestor_livetiming/core/objects.py index 167c625..667e4ee 100644 --- a/src/openf1/services/ingestor_livetiming/core/objects.py +++ b/src/openf1/services/ingestor_livetiming/core/objects.py @@ -152,7 +152,12 @@ def _get_collections_cls_by_name() -> dict[str, type[Collection]]: def get_collections(meeting_key: int, session_key: int) -> list[Collection]: """Returns the instances of all available collections for a given session""" collections = [ - cls(meeting_key=meeting_key, session_key=session_key) + cls( + meeting_key=meeting_key, + session_key=session_key, + name=cls.name, + source_topics=cls.source_topics, + ) for cls in _get_collections_cls_by_name().values() ] collections = sorted(collections, key=lambda c: c.__class__.name) diff --git a/src/openf1/services/ingestor_livetiming/core/processing/main.py b/src/openf1/services/ingestor_livetiming/core/processing/main.py index 67841b5..979551d 100644 --- a/src/openf1/services/ingestor_livetiming/core/processing/main.py +++ b/src/openf1/services/ingestor_livetiming/core/processing/main.py @@ -9,10 +9,13 @@ def process_message( - meeting_key: int, session_key: int, message: Message + meeting_key: int, + session_key: int, + message: Message, ) -> dict[str, list[Document]]: """Processes a message from a given topic and returns processed documents grouped - by collection""" + by collection. If at least one collection name is given, only those collections are used. + """ # Select collections which could use this message selected_collections = get_topics_to_collections_mapping( meeting_key=meeting_key, session_key=session_key @@ -34,9 +37,7 @@ def process_message( def process_messages( - meeting_key: int, - session_key: int, - messages: list[Message], + meeting_key: int, session_key: int, messages: list[Message] ) -> dict[str, list[Document]]: """Processes messages and returns the generated documents by collection""" docs_buf = defaultdict(dict) diff --git a/src/openf1/services/ingestor_livetiming/historical/README.md b/src/openf1/services/ingestor_livetiming/historical/README.md index 84d2c2d..7e11b07 100644 --- a/src/openf1/services/ingestor_livetiming/historical/README.md +++ b/src/openf1/services/ingestor_livetiming/historical/README.md @@ -81,3 +81,16 @@ To ingest processed documents of all the available collections for season `2024` ```bash python -m openf1.services.ingestor_livetiming.historical.main ingest-season 2024 ``` + +### Enable parallelization + +> [!NOTE] +> Enabling parallelization will require significantly more system resources (CPU, RAM), especially when ingesting seasons. Use at your own discretion. + +To speed up data ingestion, the `--parallel` flag can be used to enable parallel message processing for `get-messages`, `get-processed-documents`, and `ingest-` commands. To ingest processed documents of all the available collections for meeting `1242` (year `2024`): + +```bash +python -m openf1.services.ingestor_livetiming.historical.main ingest-meeting 2024 1242 --parallel +``` + +For `ingest-meeting` and `ingest-season`, the `--by-session` and `--by-meeting` flags (respectively) can be combined with `--parallel` for a hybrid style of ingestion: sessions (or meetings) are ingested one at a time while the work within each is still parallelized, reducing resource consumption at the cost of increased ingestion time.
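For example, a hybrid ingestion of season `2024` that processes one meeting at a time while still parallelizing the work inside each meeting might look like this (a sketch; the flag spelling follows the `by_meeting` option added in `main.py`):

```bash
python -m openf1.services.ingestor_livetiming.historical.main ingest-season 2024 --parallel --by-meeting
```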
diff --git a/src/openf1/services/ingestor_livetiming/historical/main.py b/src/openf1/services/ingestor_livetiming/historical/main.py index 451df85..f719096 100644 --- a/src/openf1/services/ingestor_livetiming/historical/main.py +++ b/src/openf1/services/ingestor_livetiming/historical/main.py @@ -1,11 +1,13 @@ +import asyncio import json import re from datetime import datetime, timedelta from functools import lru_cache +import aiohttp import pytz import requests -import typer +from async_lru import alru_cache from loguru import logger from tqdm import tqdm @@ -16,19 +18,44 @@ get_collections, get_source_topics, ) +from openf1.services.ingestor_livetiming.historical import typer from openf1.services.ingestor_livetiming.core.processing.main import process_messages -from openf1.util.db import insert_data_sync + +from openf1.util.db import insert_data_sync, insert_data_async from openf1.util.misc import join_url, json_serializer, to_datetime, to_timedelta from openf1.util.schedule import get_meeting_keys from openf1.util.schedule import get_schedule as _get_schedule from openf1.util.schedule import get_session_keys cli = typer.Typer() +http_client_async = None # Flag to determine if the script is being run from the command line _is_called_from_cli = False +def get_http_client_async(): + """Lazily creates the shared async HTTP client (no total timeout) on first use""" + global http_client_async + if http_client_async is None: + http_client_async = aiohttp.ClientSession( + timeout=aiohttp.ClientTimeout(total=None) + ) + return http_client_async + + +async def http_client_cleanup(): + """Closes the async HTTP client, if any, and resets the global reference.""" + global http_client_async + try: + if http_client_async is not None: + await http_client_async.close() + except Exception: + pass + finally: + http_client_async = None + + @cli.command() def get_schedule(year: int) -> dict: schedule = _get_schedule(year) @@ -100,9 +127,21 @@ def _get_topic_content(session_url: str, topic: str) -> list[str]: topic_filename = f"{topic}.jsonStream" url_topic = join_url(session_url, topic_filename) topic_content = requests.get(url_topic).text.split("\r\n") + return topic_content +@alru_cache() +async def _get_topic_content_async(session_url: str, topic: str): + topic_filename = f"{topic}.jsonStream" + url_topic = join_url(session_url, topic_filename) + + response = await get_http_client_async().get(url_topic) + topic_content = await response.text() + + return topic_content.split("\r\n") + + @cli.command() def get_topic_content( year: int, meeting_key: int, session_key: int, topic: str @@ -123,6 +162,8 @@ def _parse_line(line: str) -> tuple[timedelta | None, str | None]: The line is expected to be formatted as follows: (duration since session start, raw data) """ + if len(line) == 0: + return None, None pattern = r"(\d+:\d+:\d+\.\d+)(.*)" match = re.match(pattern, line) if match is None: @@ -132,15 +173,14 @@ def _parse_line(line: str) -> tuple[timedelta | None, str | None]: return session_time, raw_data -def _parse_and_decode_topic_content( +async def _parse_and_decode_topic_content( topic: str, topic_raw_content: list[str], t0: datetime, ) -> list[Message]: messages = [] + for line in topic_raw_content: - if len(line) == 0: - continue session_time, content = _parse_line(line) if session_time is None: @@ -157,11 +197,14 @@ def _parse_and_decode_topic_content( ) ) + # messages are not guaranteed to be sorted return messages -@lru_cache() -def _get_t0(session_url: str) -> datetime: +@alru_cache() +async def 
_get_t0( + session_url: str, +) -> datetime: """Calculates the most likely start time of a session (t0) based on Position and CarData messages. The calculation method comes from the FastF1 package (https://github.com/theOehrly/Fast-F1/blob/317bacf8c61038d7e8d0f48165330167702b349f/fastf1/core.py#L2208). @@ -169,24 +212,31 @@ def _get_t0(session_url: str) -> datetime: t_ref = datetime(1970, 1, 1) t0_candidates = [] - position_content = _get_topic_content(session_url=session_url, topic="Position.z") - position_messages = _parse_and_decode_topic_content( + position_content = await _get_topic_content_async( + session_url=session_url, topic="Position.z" + ) + cardata_content = await _get_topic_content_async( + session_url=session_url, topic="CarData.z" + ) + + position_messages = await _parse_and_decode_topic_content( topic="Position.z", topic_raw_content=position_content, t0=t_ref, ) + + cardata_messages = await _parse_and_decode_topic_content( + topic="CarData.z", + topic_raw_content=cardata_content, + t0=t_ref, + ) + for message in position_messages: for record in message.content["Position"]: timepoint = to_datetime(record["Timestamp"]) session_time = message.timepoint - t_ref t0_candidates.append(timepoint - session_time) - cardata_content = _get_topic_content(session_url=session_url, topic="CarData.z") - cardata_messages = _parse_and_decode_topic_content( - topic="CarData.z", - topic_raw_content=cardata_content, - t0=t_ref, - ) for message in cardata_messages: for record in message.content["Entries"]: timepoint = to_datetime(record["Utc"]) @@ -200,54 +250,85 @@ def _get_t0(session_url: str) -> datetime: @cli.command() -def get_t0(year: int, meeting_key: int, session_key: int) -> datetime: +async def get_t0(year: int, meeting_key: int, session_key: int) -> datetime: session_url = get_session_url( year=year, meeting_key=meeting_key, session_key=session_key ) - t0 = _get_t0(session_url) + t0 = await _get_t0(session_url=session_url) if _is_called_from_cli: print(t0) return t0 -def _get_messages(session_url: str, topics: list[str], t0: datetime) -> list[Message]: +async def _get_messages( + session_url: str, + topics: list[str], + t0: datetime, + parallel: bool = False, +) -> list[Message]: messages = [] - for topic in topics: - raw_content = _get_topic_content( - session_url=session_url, - topic=topic, + if parallel: + raw_content_topics = await asyncio.gather( + *[ + _get_topic_content_async(session_url=session_url, topic=topic) + for topic in topics + ] ) - messages += _parse_and_decode_topic_content( - topic=topic, - topic_raw_content=raw_content, - t0=t0, + messages_topics = await asyncio.gather( + *[ + _parse_and_decode_topic_content( + topic=topic, + topic_raw_content=raw_content, + t0=t0, + ) + for topic, raw_content in zip(topics, raw_content_topics) + ] ) + messages = [ + message for messages_topic in messages_topics for message in messages_topic + ] + else: + for topic in topics: + raw_content = _get_topic_content( + session_url=session_url, + topic=topic, + ) + messages += await _parse_and_decode_topic_content( + topic=topic, + topic_raw_content=raw_content, + t0=t0, + ) + messages = sorted(messages, key=lambda m: (m.timepoint, m.topic)) + return messages @cli.command() -def get_messages( +async def get_messages( year: int, meeting_key: int, session_key: int, topics: list[str], + parallel: bool = False, verbose: bool = True, ) -> list[Message]: session_url = get_session_url( year=year, meeting_key=meeting_key, session_key=session_key ) if verbose: - logger.info(f"Session URL: 
{session_url}") + logger.info(f"Session URL: {session_url} for session {session_key}") - t0 = _get_t0(session_url) + t0 = await _get_t0(session_url=session_url) if verbose: - logger.info(f"t0: {t0}") + logger.info(f"t0: {t0} for session {session_key}") - messages = _get_messages(session_url=session_url, topics=topics, t0=t0) + messages = await _get_messages( + session_url=session_url, topics=topics, t0=t0, parallel=parallel + ) if verbose: - logger.info(f"Fetched {len(messages)} messages") + logger.info(f"Fetched {len(messages)} messages for session {session_key}") if _is_called_from_cli: messages_json = json.dumps(messages, indent=2, default=json_serializer) @@ -256,34 +337,40 @@ def get_messages( return messages -def _get_processed_documents( +async def _get_processed_documents( year: int, meeting_key: int, session_key: int, collection_names: list[str], + parallel: bool = False, verbose: bool = True, ) -> dict[str, list[Document]]: session_url = get_session_url( year=year, meeting_key=meeting_key, session_key=session_key ) if verbose: - logger.info(f"Session URL: {session_url}") + logger.info(f"Session URL: {session_url} for session {session_key}") - t0 = _get_t0(session_url) + t0 = await _get_t0(session_url=session_url) if verbose: - logger.info(f"t0: {t0}") + logger.info(f"t0: {t0} for session {session_key}") topics = set().union(*[get_source_topics(n) for n in collection_names]) topics = sorted(list(topics)) if verbose: - logger.info(f"Topics used: {topics}") + logger.info(f"Topics used: {topics} for session {session_key}") - messages = _get_messages(session_url=session_url, topics=topics, t0=t0) + messages = await _get_messages( + session_url=session_url, + topics=topics, + t0=t0, + parallel=parallel, + ) if verbose: - logger.info(f"Fetched {len(messages)} messages") + logger.info(f"Fetched {len(messages)} messages for session {session_key}") if verbose: - logger.info("Starting processing") + logger.info(f"Starting processing for session {session_key}") docs_by_collection = process_messages( messages=messages, meeting_key=meeting_key, session_key=session_key @@ -295,24 +382,26 @@ def _get_processed_documents( if verbose: n_docs = sum(len(d) for d in docs_by_collection.values()) - logger.info(f"Processed {n_docs} documents") + logger.info(f"Processed {n_docs} documents for session {session_key}") return docs_by_collection @cli.command() -def get_processed_documents( +async def get_processed_documents( year: int, meeting_key: int, session_key: int, collection_names: list[str], + parallel: bool = False, verbose: bool = True, ) -> dict[str, list[Document]]: - docs_by_collection = _get_processed_documents( + docs_by_collection = await _get_processed_documents( year=year, meeting_key=meeting_key, session_key=session_key, collection_names=collection_names, + parallel=parallel, verbose=verbose, ) @@ -329,30 +418,55 @@ def get_processed_documents( @cli.command() -def ingest_collections( +async def ingest_collections( year: int, meeting_key: int, session_key: int, collection_names: list[str], + parallel: bool = False, verbose: bool = True, ): - docs_by_collection = _get_processed_documents( + docs_by_collection = await _get_processed_documents( year=year, meeting_key=meeting_key, session_key=session_key, collection_names=collection_names, + parallel=parallel, verbose=verbose, ) if verbose: - logger.info("Inserting documents to DB") - for collection, docs in tqdm(list(docs_by_collection.items()), disable=not verbose): - docs_mongo = [d.to_mongo_doc_sync() for d in docs] - 
insert_data_sync(collection_name=collection, docs=docs_mongo) + logger.info(f"Inserting documents to DB for session {session_key}") + + if parallel: + await asyncio.gather( + *[ + insert_data_async( + collection_name=collection, + docs=[d.to_mongo_doc_sync() for d in docs], + ) + for collection, docs in docs_by_collection.items() + ] + ) + else: + for collection, docs in tqdm( + list(docs_by_collection.items()), disable=not verbose + ): + docs_mongo = [d.to_mongo_doc_sync() for d in docs] + insert_data_sync(collection_name=collection, docs=docs_mongo) @cli.command() -def ingest_session(year: int, meeting_key: int, session_key: int, verbose: bool = True): +async def ingest_session( + year: int, + meeting_key: int, + session_key: int, + parallel: bool = False, + verbose: bool = True, +): + if verbose: + logger.info(f"Ingesting session {session_key}") + collections = get_collections(meeting_key=meeting_key, session_key=session_key) collection_names = sorted([c.__class__.name for c in collections]) @@ -361,41 +475,91 @@ def ingest_session(year: int, meeting_key: int, session_key: int, verbose: bool f"Ingesting {len(collection_names)} collections: {collection_names}" ) - ingest_collections( + await ingest_collections( year=year, meeting_key=meeting_key, session_key=session_key, collection_names=collection_names, + parallel=parallel, verbose=verbose, ) @cli.command() -def ingest_meeting(year: int, meeting_key: int, verbose: bool = True): +async def ingest_meeting( + year: int, + meeting_key: int, + parallel: bool = False, + by_session: bool = False, + verbose: bool = True, +): + if verbose: + logger.info(f"Ingesting meeting {meeting_key}") + session_keys = get_session_keys(year=year, meeting_key=meeting_key) + if verbose: logger.info(f"{len(session_keys)} sessions found: {session_keys}") - for session_key in session_keys: - if verbose: - logger.info(f"Ingesting session {session_key}") - ingest_session( - year=year, meeting_key=meeting_key, session_key=session_key, verbose=False + if parallel and not by_session: + await asyncio.gather( + *[ + ingest_session( + year=year, + meeting_key=meeting_key, + session_key=session_key, + parallel=parallel, + verbose=verbose, + ) + for session_key in session_keys + ] ) + else: + for session_key in session_keys: + await ingest_session( + year=year, + meeting_key=meeting_key, + session_key=session_key, + parallel=parallel, + verbose=verbose, + ) @cli.command() -def ingest_season(year: int, verbose: bool = True): +async def ingest_season( + year: int, + parallel: bool = False, + by_meeting: bool = False, + verbose: bool = True, +): meeting_keys = get_meeting_keys(year) if verbose: logger.info(f"{len(meeting_keys)} meetings found: {meeting_keys}") - for meeting_key in meeting_keys: - if verbose: - logger.info(f"Ingesting meeting {meeting_key}") - ingest_meeting(year=year, meeting_key=meeting_key, verbose=False) + if parallel and not by_meeting: + await asyncio.gather( + *[ + ingest_meeting( + year=year, + meeting_key=meeting_key, + parallel=parallel, + verbose=verbose, + ) + for meeting_key in meeting_keys + ] + ) + else: + for meeting_key in meeting_keys: + await ingest_meeting( + year=year, + meeting_key=meeting_key, + parallel=parallel, + verbose=verbose, + ) if __name__ == "__main__": _is_called_from_cli = True + # might encounter aiohttp/asyncio complaints, but cleanup is done in a separate event loop + cli.add_event_handler(typer.ON_EXIT, http_client_cleanup) cli() diff --git a/src/openf1/services/ingestor_livetiming/historical/typer.py 
b/src/openf1/services/ingestor_livetiming/historical/typer.py new file mode 100644 index 0000000..95907af --- /dev/null +++ b/src/openf1/services/ingestor_livetiming/historical/typer.py @@ -0,0 +1,56 @@ +from asyncio import get_running_loop, iscoroutinefunction, new_event_loop, run +from collections import defaultdict +from functools import wraps +from typing import Callable + +import typer + +ON_START = "on_start" +ON_EXIT = "on_exit" + + +class Typer(typer.Typer): + """ + Wrapper class for typer.Typer to allow for decorating async functions. + Adapted from https://github.com/byunjuneseok/async-typer. + """ + + event_handlers: defaultdict[str, list[Callable]] = defaultdict(list) + + def command(self, *args, **kwargs): + def decorator(func: Callable): + @wraps(func) + def _func(*_args, **_kwargs): + self.run_event_handlers(ON_START) + try: + if iscoroutinefunction(func): + try: + get_running_loop() + except RuntimeError: + return run(func(*_args, **_kwargs)) + else: + loop = new_event_loop() + try: + return loop.run_until_complete(func(*_args, **_kwargs)) + finally: + loop.close() + + return func(*_args, **_kwargs) + finally: + self.run_event_handlers(ON_EXIT) + + super(Typer, self).command(*args, **kwargs)(_func) + + return func + + return decorator + + def add_event_handler(self, event_type: str, func: Callable) -> None: + self.event_handlers[event_type].append(func) + + def run_event_handlers(self, event_type: str): + for event in self.event_handlers[event_type]: + if iscoroutinefunction(event): + run(event()) + else: + event() diff --git a/src/openf1/util/db.py b/src/openf1/util/db.py index 53b3daa..b6d10c7 100644 --- a/src/openf1/util/db.py +++ b/src/openf1/util/db.py @@ -1,3 +1,4 @@ +import asyncio import os from collections import defaultdict from datetime import datetime, timedelta, timezone @@ -10,7 +11,7 @@ from pymongo import InsertOne, MongoClient, ReplaceOne from pymongo.errors import BulkWriteError -from openf1.util.misc import hash_obj, timed_cache +from openf1.util.misc import batched, hash_obj, timed_cache _MONGO_CONNECTION_STRING = os.getenv("MONGO_CONNECTION_STRING") _MONGO_DATABASE = os.getenv("OPENF1_DB_NAME", "openf1-livetiming") @@ -304,12 +305,18 @@ def upsert_data_sync(collection_name: str, docs: list[dict], batch_size: int = 5 collection.bulk_write(operations, ordered=False) -async def insert_data_async(collection_name: str, docs: list[dict]): +async def insert_data_async( + collection_name: str, docs: list[dict], batch_size: int = 50_000 +): collection = _get_mongo_db_async()[collection_name] try: - operations = [InsertOne(doc) for doc in docs] - await collection.bulk_write(operations, ordered=False) + await asyncio.gather( + *[ + collection.bulk_write([InsertOne(doc) for doc in batch], ordered=False) + for batch in batched(docs, batch_size) + ] + ) except BulkWriteError as bwe: for error in bwe.details.get("writeErrors", []): logger.error(f"Error during bulk write operation: {error}") diff --git a/src/openf1/util/misc.py b/src/openf1/util/misc.py index f2d5b04..f363532 100644 --- a/src/openf1/util/misc.py +++ b/src/openf1/util/misc.py @@ -2,6 +2,7 @@ from datetime import datetime, timedelta from enum import Enum from functools import wraps +from itertools import islice from typing import Any, Callable import pytz @@ -193,3 +194,19 @@ def hash_obj(obj): else: # Assume obj is hashable return obj + + +def batched(iterable, n, strict=False): + """ + This function is equivalent to Python's itertools.batched: 
https://docs.python.org/3/library/itertools.html#itertools.batched + + Batch data from the iterable into tuples of length n. The last batch may be shorter than n. + If strict is true, will raise a ValueError if the final batch is shorter than n. + """ + if n < 1: + raise ValueError("n must be at least one") + iterator = iter(iterable) + while batch := tuple(islice(iterator, n)): + if strict and len(batch) != n: + raise ValueError("batched(): incomplete batch") + yield batch
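For reference, a minimal usage sketch of the new `batched` helper (standalone; the import path follows this diff):

```python
from openf1.util.misc import batched

# Seven items in batches of three: the final batch is allowed to be short.
print(list(batched(range(7), 3)))  # [(0, 1, 2), (3, 4, 5), (6,)]

# With strict=True the same call raises ValueError because the final batch
# has fewer than n elements.
```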