diff --git a/pyproject.toml b/pyproject.toml
index a0a28ec..b9d0fa8 100644
--- a/pyproject.toml
+++ b/pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"
 
 [project]
 name = "openf1"
-version = "1.8.4"
+version = "1.9.0"
 authors = [
     { name="Bruno Godefroy" }
 ]
diff --git a/src/openf1/services/ingestor_livetiming/core/objects.py b/src/openf1/services/ingestor_livetiming/core/objects.py
index 167c625..823a1b9 100644
--- a/src/openf1/services/ingestor_livetiming/core/objects.py
+++ b/src/openf1/services/ingestor_livetiming/core/objects.py
@@ -187,4 +187,5 @@ def get_topics() -> set[str]:
     for cls in _get_collections_cls_by_name().values():
         topics.update(cls.source_topics)
     topics.add("SessionInfo")
+    topics.add("Heartbeat")
     return topics
diff --git a/src/openf1/services/ingestor_livetiming/real_time/README.md b/src/openf1/services/ingestor_livetiming/real_time/README.md
index cf1b30c..9a5fd81 100644
--- a/src/openf1/services/ingestor_livetiming/real_time/README.md
+++ b/src/openf1/services/ingestor_livetiming/real_time/README.md
@@ -8,3 +8,13 @@ python -m openf1.services.ingestor_livetiming.real_time.app
 The recording must be started at least 1h before the start of the session for
 races, and at least 15 minutes before the start of the session for practice
 and qualifying.
+
+### Live telemetry (optional)
+
+To also ingest live telemetry, provide an F1TV subscription token:
+
+```bash
+export F1_TOKEN=...
+```
+
+See [this guide](https://github.com/SoMuchForSubtlety/f1viewer/wiki/Getting-your-subscription-token) for how to obtain a token. Note that tokens typically expire after 4 days.
diff --git a/src/openf1/services/ingestor_livetiming/real_time/app.py b/src/openf1/services/ingestor_livetiming/real_time/app.py
index b18ca5b..36b95ae 100644
--- a/src/openf1/services/ingestor_livetiming/real_time/app.py
+++ b/src/openf1/services/ingestor_livetiming/real_time/app.py
@@ -6,48 +6,95 @@ from loguru import logger
 
 from openf1.services.ingestor_livetiming.core.objects import get_topics
-from openf1.services.ingestor_livetiming.real_time.processing import ingest_file
+from openf1.services.ingestor_livetiming.real_time.processing import ingest_files
 from openf1.services.ingestor_livetiming.real_time.recording import record_to_file
 from openf1.util.gcs import upload_to_gcs_periodically
 
 TIMEOUT = 10800  # Terminate job if no data received for 3 hours (in seconds)
 GCS_BUCKET = os.getenv("OPENF1_INGESTOR_LIVETIMING_GCS_BUCKET_RAW")
+F1_TOKEN = os.getenv("F1_TOKEN")
 
 
 async def main():
-    with tempfile.NamedTemporaryFile(mode="w", delete=True) as temp:
-        logger.info(f"Recording raw data to '{temp.name}'")
-        tasks = []
-
+    with tempfile.TemporaryDirectory() as temp_dir:
         # Record raw data and save it to file
         topics = get_topics()
         logger.info(f"Starting live recording of the following topics: {topics}")
-        task_recording = asyncio.create_task(
-            record_to_file(filepath=temp.name, topics=topics, timeout=TIMEOUT)
+
+        tasks = []
+
+        temp_file_signalr = os.path.join(temp_dir, "signalr.txt")
+        logger.info(f"Recording raw signalr data to '{temp_file_signalr}'")
+        task_recording_signalr = asyncio.create_task(
+            record_to_file(
+                filepath=temp_file_signalr,
+                topics=topics,
+                timeout=TIMEOUT,
+                is_authenticated=False,
+            )
         )
-        tasks.append(task_recording)
+        tasks.append(task_recording_signalr)
 
+        if F1_TOKEN:
+            temp_file_signalrcore = os.path.join(temp_dir, "signalrcore.txt")
+            logger.info(f"Recording raw signalrcore data to '{temp_file_signalrcore}'")
+            task_recording_signalrcore = asyncio.create_task(
+                record_to_file(
+                    filepath=temp_file_signalrcore,
+                    topics=topics,
+                    timeout=TIMEOUT,
+                    is_authenticated=True,
+                )
+            )
+            tasks.append(task_recording_signalrcore)
+
+        # Save received raw data to GCS, for debugging
         if GCS_BUCKET:
-            # Save received raw data to GCS, for debugging
             logger.info("Starting periodic GCS upload of raw data")
-            gcs_filekey = datetime.now(timezone.utc).strftime("%Y/%m/%d/%H:%M:%S.txt")
-            task_upload_raw = asyncio.create_task(
+            gcs_filekey = datetime.now(timezone.utc).strftime(
+                "%Y/%m/%d/signalr/%H:%M:%S.txt"
+            )
+            task_upload_raw_signalr = asyncio.create_task(
                 upload_to_gcs_periodically(
-                    filepath=temp.name,
+                    filepath=temp_file_signalr,
                     bucket=GCS_BUCKET,
                     destination_key=gcs_filekey,
                     interval=timedelta(seconds=60),
                 )
             )
-            tasks.append(task_upload_raw)
+            tasks.append(task_upload_raw_signalr)
+
+            if F1_TOKEN:
+                gcs_filekey = datetime.now(timezone.utc).strftime(
+                    "%Y/%m/%d/signalrcore/%H:%M:%S.txt"
+                )
+                task_upload_raw_signalrcore = asyncio.create_task(
+                    upload_to_gcs_periodically(
+                        filepath=temp_file_signalrcore,
+                        bucket=GCS_BUCKET,
+                        destination_key=gcs_filekey,
+                        interval=timedelta(seconds=60),
+                        offset=timedelta(
+                            seconds=30
+                        ),  # ensure both file uploads don't occur simultaneously
+                    )
+                )
+                tasks.append(task_upload_raw_signalrcore)
 
         # Ingest received data
         logger.info("Starting data ingestion")
-        task_ingest = asyncio.create_task(ingest_file(temp.name))
+        temp_files = (
+            [temp_file_signalr, temp_file_signalrcore]
+            if F1_TOKEN
+            else [temp_file_signalr]
+        )
+        task_ingest = asyncio.create_task(ingest_files(temp_files))
         tasks.append(task_ingest)
 
         # Wait for the recording task to stop
-        await asyncio.wait([task_recording], return_when=asyncio.FIRST_COMPLETED)
+        await asyncio.wait(
+            [task_recording_signalr], return_when=asyncio.FIRST_COMPLETED
+        )
         logger.info("Recording stopped")
 
         # Cancel all the tasks
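The shutdown logic in `app.py` hinges on `asyncio.wait(..., return_when=asyncio.FIRST_COMPLETED)`: the job runs until the unauthenticated recorder exits, then every other task is cancelled. A minimal, runnable sketch of that pattern (the `recorder` coroutine and its durations are illustrative stand-ins, not part of the codebase):

```python
import asyncio


async def recorder(name: str, seconds: float) -> None:
    # Stand-in for record_to_file(); sleeps instead of recording.
    await asyncio.sleep(seconds)
    print(f"{name} stopped")


async def main() -> None:
    primary = asyncio.create_task(recorder("signalr", 1.0))
    secondary = asyncio.create_task(recorder("signalrcore", 60.0))
    tasks = [primary, secondary]

    # Block until the primary recorder finishes, then cancel everything,
    # mirroring the asyncio.wait(..., FIRST_COMPLETED) call in app.py.
    await asyncio.wait([primary], return_when=asyncio.FIRST_COMPLETED)
    for task in tasks:
        task.cancel()
    # Collect the cancelled tasks so no "exception was never retrieved"
    # warnings fire on shutdown.
    await asyncio.gather(*tasks, return_exceptions=True)


asyncio.run(main())
```

Gathering with `return_exceptions=True` swallows the `CancelledError` raised inside the cancelled tasks, so the teardown stays quiet.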
""" try: await initialize_mqtt() - with open(filepath, "r") as file: - # Read and ingest existing lines + # Wait for all files to be created + logger.info("Waiting for recorder files to be created...") + max_wait = 60 + wait_interval = 0.5 + elapsed = 0 + + while elapsed < max_wait: + all_exist = all(os.path.exists(filepath) for filepath in filepaths) + if all_exist: + logger.info("All recorder files found") + break + await asyncio.sleep(wait_interval) + elapsed += wait_interval + else: + missing_files = [fp for fp in filepaths if not os.path.exists(fp)] + logger.error(f"Timeout waiting for files to be created: {missing_files}") + + # Open all files and keep them open + open_files = [] + for filepath in filepaths: + if not os.path.exists(filepath): + logger.warning(f"Skipping non-existent file: {filepath}") + continue + + try: + file = open(filepath, "r") + open_files.append((filepath, file)) + except Exception: + logger.exception(f"Failed to open file: {filepath}") + + if not open_files: + logger.error("No files could be opened") + return + + # First, ingest existing content from each file + for filepath, file in open_files: lines = file.readlines() for line in lines: try: await ingest_line(line) except Exception: logger.exception( - "Failed to ingest line, skipping to prevent crash. " + f"Failed to ingest line from {filepath}, skipping to prevent crash. " f"Line content: '{line.strip()}'" ) # Move to the end of the file file.seek(0, 2) - # Watch for new lines - while True: + # Watch for new lines in all files + while True: + found_data = False + for filepath, file in open_files: try: line = file.readline() - if not line: - await asyncio.sleep(0.1) # Sleep a bit before trying again - continue - await ingest_line(line) + if line: + found_data = True + await ingest_line(line) except Exception: logger.exception( - "Failed to ingest line, skipping to prevent crash. " - f"Line content: '{line.strip()}'" + f"Failed to ingest line from {filepath}, skipping to prevent crash. 
" + f"Line content: '{line.strip() if line else ''}'" ) + + # If no data was found in any file, sleep briefly before checking again + if not found_data: + await asyncio.sleep(0.1) + except Exception: - logger.exception(f"An unexpected error occurred while ingesting {filepath}") + logger.exception( + f"An unexpected error occurred while ingesting files: {filepaths}" + ) + finally: + for filepath, file in open_files: + try: + file.close() + except Exception: + logger.exception(f"Failed to close file: {filepath}") diff --git a/src/openf1/services/ingestor_livetiming/real_time/recording.py b/src/openf1/services/ingestor_livetiming/real_time/recording.py index 530d83e..43de86e 100644 --- a/src/openf1/services/ingestor_livetiming/real_time/recording.py +++ b/src/openf1/services/ingestor_livetiming/real_time/recording.py @@ -3,7 +3,9 @@ from loguru import logger -async def record_to_file(filepath: str, topics: list[str], timeout: int): +async def record_to_file( + filepath: str, topics: list[str], timeout: int, is_authenticated: bool +): """Records raw F1 data to a file, using a slightly modified version of the FastF1 live timing module (https://github.com/br-g/fastf1-livetiming) """ @@ -12,23 +14,22 @@ async def record_to_file(filepath: str, topics: list[str], timeout: int): command = ( ["python", "-m", "fastf1_livetiming", "save", filepath] + sorted(list(topics)) + + (["--auth"] if is_authenticated else []) + ["--timeout", str(timeout)] ) - proc = await asyncio.create_subprocess_exec( - *command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE - ) + proc = await asyncio.create_subprocess_exec(*command) - stdout, stderr = await proc.communicate() + # Wait for the process to complete + await proc.wait() # Check if the process exited cleanly with an exit code of 0. if proc.returncode == 0: - logger.info(f"Stdout: {stdout.decode().strip()}") + logger.info("Recorder subprocess completed successfully.") break else: logger.error( f"Recorder subprocess failed with exit code {proc.returncode}." ) - logger.error(f"Stderr: {stderr.decode().strip()}") except Exception: logger.exception( "An unexpected exception occurred while trying to run " diff --git a/src/openf1/util/gcs.py b/src/openf1/util/gcs.py index f21a1b1..ed90b29 100644 --- a/src/openf1/util/gcs.py +++ b/src/openf1/util/gcs.py @@ -22,11 +22,17 @@ def upload_to_gcs(filepath: Path, bucket: str, destination_key: str): async def upload_to_gcs_periodically( - filepath: Path, bucket: str, destination_key: Path, interval: timedelta + filepath: Path, + bucket: str, + destination_key: Path, + interval: timedelta, + offset: timedelta = timedelta(seconds=0), ): """Periodically uploads a file to Google Cloud Storage (GCS) at specified intervals""" loop = asyncio.get_running_loop() + await asyncio.sleep(offset.total_seconds()) + while True: try: # Wait for the specified interval