Merged
2 changes: 1 addition & 1 deletion pyproject.toml
@@ -4,7 +4,7 @@ build-backend = "setuptools.build_meta"

[project]
name = "openf1"
version = "1.8.4"
version = "1.9.0"
authors = [
{ name="Bruno Godefroy" }
]
1 change: 1 addition & 0 deletions src/openf1/services/ingestor_livetiming/core/objects.py
@@ -187,4 +187,5 @@ def get_topics() -> set[str]:
for cls in _get_collections_cls_by_name().values():
topics.update(cls.source_topics)
topics.add("SessionInfo")
topics.add("Heartbeat")
return topics
10 changes: 10 additions & 0 deletions src/openf1/services/ingestor_livetiming/real_time/README.md
@@ -8,3 +8,13 @@ python -m openf1.services.ingestor_livetiming.real_time.app

The recording must be started at least 1 hour before the session starts for races,
and at least 15 minutes before it starts for practice and qualifying.

### Live telemetry (optional)

To also ingest live telemetry, provide an F1TV subscription token:

```bash
export F1_TOKEN=...
```

See [this guide](https://github.com/SoMuchForSubtlety/f1viewer/wiki/Getting-your-subscription-token) for how to obtain a token. Note that tokens typically expire after 4 days.
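
For a quick sanity check, here is a minimal sketch (not part of the service code) of how the ingestor gates telemetry on this variable — the authenticated stream is only recorded when `F1_TOKEN` is set:

```python
import os

# Hypothetical check mirroring the app's behaviour: the authenticated
# (telemetry) recorder is started only when F1_TOKEN is present.
if os.getenv("F1_TOKEN"):
    print("F1_TOKEN found: live telemetry will be ingested")
else:
    print("F1_TOKEN not set: only the unauthenticated stream will be recorded")
```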
77 changes: 62 additions & 15 deletions src/openf1/services/ingestor_livetiming/real_time/app.py
@@ -6,48 +6,95 @@
from loguru import logger

from openf1.services.ingestor_livetiming.core.objects import get_topics
from openf1.services.ingestor_livetiming.real_time.processing import ingest_file
from openf1.services.ingestor_livetiming.real_time.processing import ingest_files
from openf1.services.ingestor_livetiming.real_time.recording import record_to_file
from openf1.util.gcs import upload_to_gcs_periodically

TIMEOUT = 10800 # Terminate job if no data received for 3 hours (in seconds)
GCS_BUCKET = os.getenv("OPENF1_INGESTOR_LIVETIMING_GCS_BUCKET_RAW")
F1_TOKEN = os.getenv("F1_TOKEN")


async def main():
with tempfile.NamedTemporaryFile(mode="w", delete=True) as temp:
logger.info(f"Recording raw data to '{temp.name}'")
tasks = []

with tempfile.TemporaryDirectory() as temp_dir:
# Record raw data and save it to file
topics = get_topics()
logger.info(f"Starting live recording of the following topics: {topics}")
task_recording = asyncio.create_task(
record_to_file(filepath=temp.name, topics=topics, timeout=TIMEOUT)

tasks = []

temp_file_signalr = os.path.join(temp_dir, "signalr.txt")
logger.info(f"Recording raw signalr data to '{temp_file_signalr}'")
task_recording_signalr = asyncio.create_task(
record_to_file(
filepath=temp_file_signalr,
topics=topics,
timeout=TIMEOUT,
is_authenticated=False,
)
)
tasks.append(task_recording)
tasks.append(task_recording_signalr)

if F1_TOKEN:
temp_file_signalrcore = os.path.join(temp_dir, "signalrcore.txt")
logger.info(f"Recording raw signalrcore data to '{temp_file_signalrcore}'")
task_recording_signalrcore = asyncio.create_task(
record_to_file(
filepath=temp_file_signalrcore,
topics=topics,
timeout=TIMEOUT,
is_authenticated=True,
)
)
tasks.append(task_recording_signalrcore)

# Save received raw data to GCS, for debugging
if GCS_BUCKET:
# Save received raw data to GCS, for debugging
logger.info("Starting periodic GCS upload of raw data")
gcs_filekey = datetime.now(timezone.utc).strftime("%Y/%m/%d/%H:%M:%S.txt")
task_upload_raw = asyncio.create_task(
gcs_filekey = datetime.now(timezone.utc).strftime(
"%Y/%m/%d/signalr/%H:%M:%S.txt"
)
task_upload_raw_signalr = asyncio.create_task(
upload_to_gcs_periodically(
filepath=temp.name,
filepath=temp_file_signalr,
bucket=GCS_BUCKET,
destination_key=gcs_filekey,
interval=timedelta(seconds=60),
)
)
tasks.append(task_upload_raw)
tasks.append(task_upload_raw_signalr)

if F1_TOKEN:
gcs_filekey = datetime.now(timezone.utc).strftime(
"%Y/%m/%d/signalrcore/%H:%M:%S.txt"
)
task_upload_raw_signalrcore = asyncio.create_task(
upload_to_gcs_periodically(
filepath=temp_file_signalrcore,
bucket=GCS_BUCKET,
destination_key=gcs_filekey,
interval=timedelta(seconds=60),
offset=timedelta(
seconds=30
), # ensure both file uploads don't occur simultaneously
)
)
tasks.append(task_upload_raw_signalrcore)

# Ingest received data
logger.info("Starting data ingestion")
task_ingest = asyncio.create_task(ingest_file(temp.name))
temp_files = (
[temp_file_signalr, temp_file_signalrcore]
if F1_TOKEN
else [temp_file_signalr]
)
task_ingest = asyncio.create_task(ingest_files(temp_files))
tasks.append(task_ingest)

# Wait for the recording task to stop
await asyncio.wait([task_recording], return_when=asyncio.FIRST_COMPLETED)
await asyncio.wait(
[task_recording_signalr], return_when=asyncio.FIRST_COMPLETED
)
logger.info("Recording stopped")

# Cancel all the tasks
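
For context, here is a condensed sketch of the orchestration the new `main()` sets up, assuming the imports shown above; file names, GCS keys, and error handling are simplified, and the temporary directory is omitted:

```python
import asyncio
import os
from datetime import timedelta

from openf1.services.ingestor_livetiming.core.objects import get_topics
from openf1.services.ingestor_livetiming.real_time.processing import ingest_files
from openf1.services.ingestor_livetiming.real_time.recording import record_to_file
from openf1.util.gcs import upload_to_gcs_periodically


async def run(bucket: str | None = None):
    # Condensed sketch of main(): paths and GCS keys are simplified,
    # error handling and the temporary directory are omitted.
    topics = get_topics()
    files = ["signalr.txt"]
    if os.getenv("F1_TOKEN"):
        files.append("signalrcore.txt")

    tasks = []
    for i, path in enumerate(files):
        tasks.append(asyncio.create_task(
            record_to_file(
                filepath=path,
                topics=topics,
                timeout=10800,
                is_authenticated=(path == "signalrcore.txt"),
            )
        ))
        if bucket:
            tasks.append(asyncio.create_task(
                upload_to_gcs_periodically(
                    filepath=path,
                    bucket=bucket,
                    destination_key=f"raw/{path}",    # simplified key
                    interval=timedelta(seconds=60),
                    offset=timedelta(seconds=30 * i),  # stagger uploads
                )
            ))

    tasks.append(asyncio.create_task(ingest_files(files)))

    # Stop once the unauthenticated recorder finishes (or hits its timeout)
    await asyncio.wait([tasks[0]], return_when=asyncio.FIRST_COMPLETED)
    for task in tasks:
        task.cancel()
```

The 30-second offset between the two upload tasks matches the staggering added to `upload_to_gcs_periodically` further down.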
80 changes: 64 additions & 16 deletions src/openf1/services/ingestor_livetiming/real_time/processing.py
@@ -108,43 +108,91 @@ async def ingest_line(line: str):
logger.warning("Inserting to MongoDB timed out. Skipping messages.")


async def ingest_file(filepath: str):
"""Ingests data from the specified file.
async def ingest_files(filepaths: list[str]):
"""Ingests data from the specified files.
This function first reads and processes all existing lines in the file.
This function first reads and processes all existing lines in each file sequentially.
After processing existing content, it continuously watches for new lines
appended to the file and processes them in real-time.
appended to any of the files and processes them in real-time.
"""
try:
await initialize_mqtt()

with open(filepath, "r") as file:
# Read and ingest existing lines
# Wait for all files to be created
logger.info("Waiting for recorder files to be created...")
max_wait = 60
wait_interval = 0.5
elapsed = 0

while elapsed < max_wait:
all_exist = all(os.path.exists(filepath) for filepath in filepaths)
if all_exist:
logger.info("All recorder files found")
break
await asyncio.sleep(wait_interval)
elapsed += wait_interval
else:
missing_files = [fp for fp in filepaths if not os.path.exists(fp)]
logger.error(f"Timeout waiting for files to be created: {missing_files}")

# Open all files and keep them open
open_files = []
for filepath in filepaths:
if not os.path.exists(filepath):
logger.warning(f"Skipping non-existent file: {filepath}")
continue

try:
file = open(filepath, "r")
open_files.append((filepath, file))
except Exception:
logger.exception(f"Failed to open file: {filepath}")

if not open_files:
logger.error("No files could be opened")
return

# First, ingest existing content from each file
for filepath, file in open_files:
lines = file.readlines()
for line in lines:
try:
await ingest_line(line)
except Exception:
logger.exception(
"Failed to ingest line, skipping to prevent crash. "
f"Failed to ingest line from {filepath}, skipping to prevent crash. "
f"Line content: '{line.strip()}'"
)

# Move to the end of the file
file.seek(0, 2)

# Watch for new lines
while True:
# Watch for new lines in all files
while True:
found_data = False
for filepath, file in open_files:
try:
line = file.readline()
if not line:
await asyncio.sleep(0.1) # Sleep a bit before trying again
continue
await ingest_line(line)
if line:
found_data = True
await ingest_line(line)
except Exception:
logger.exception(
"Failed to ingest line, skipping to prevent crash. "
f"Line content: '{line.strip()}'"
f"Failed to ingest line from {filepath}, skipping to prevent crash. "
f"Line content: '{line.strip() if line else ''}'"
)

# If no data was found in any file, sleep briefly before checking again
if not found_data:
await asyncio.sleep(0.1)

except Exception:
logger.exception(f"An unexpected error occurred while ingesting {filepath}")
logger.exception(
f"An unexpected error occurred while ingesting files: {filepaths}"
)
finally:
for filepath, file in open_files:
try:
file.close()
except Exception:
logger.exception(f"Failed to close file: {filepath}")
15 changes: 8 additions & 7 deletions src/openf1/services/ingestor_livetiming/real_time/recording.py
@@ -3,7 +3,9 @@
from loguru import logger


async def record_to_file(filepath: str, topics: list[str], timeout: int):
async def record_to_file(
filepath: str, topics: list[str], timeout: int, is_authenticated: bool
):
"""Records raw F1 data to a file, using a slightly modified version of the FastF1
live timing module (https://github.com/br-g/fastf1-livetiming)
"""
@@ -12,23 +14,22 @@ async def record_to_file(filepath: str, topics: list[str], timeout: int):
command = (
["python", "-m", "fastf1_livetiming", "save", filepath]
+ sorted(list(topics))
+ (["--auth"] if is_authenticated else [])
+ ["--timeout", str(timeout)]
)
proc = await asyncio.create_subprocess_exec(
*command, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE
)
proc = await asyncio.create_subprocess_exec(*command)

stdout, stderr = await proc.communicate()
# Wait for the process to complete
await proc.wait()

# Check if the process exited cleanly with an exit code of 0.
if proc.returncode == 0:
logger.info(f"Stdout: {stdout.decode().strip()}")
logger.info("Recorder subprocess completed successfully.")
break
else:
logger.error(
f"Recorder subprocess failed with exit code {proc.returncode}."
)
logger.error(f"Stderr: {stderr.decode().strip()}")
except Exception:
logger.exception(
"An unexpected exception occurred while trying to run "
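
For illustration, the command the authenticated recorder ends up running looks roughly like this (topic list abbreviated; the subscription token is assumed to be read from `F1_TOKEN` by the forked fastf1_livetiming module):

```python
# Illustrative reconstruction of the subprocess command built by
# record_to_file for the authenticated stream (topics abbreviated).
topics = ["Heartbeat", "SessionInfo"]  # the real set comes from get_topics()
command = (
    ["python", "-m", "fastf1_livetiming", "save", "signalrcore.txt"]
    + sorted(topics)
    + ["--auth"]
    + ["--timeout", "10800"]
)
print(" ".join(command))
```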
8 changes: 7 additions & 1 deletion src/openf1/util/gcs.py
@@ -22,11 +22,17 @@ def upload_to_gcs(filepath: Path, bucket: str, destination_key: str):


async def upload_to_gcs_periodically(
filepath: Path, bucket: str, destination_key: Path, interval: timedelta
filepath: Path,
bucket: str,
destination_key: Path,
interval: timedelta,
offset: timedelta = timedelta(seconds=0),
):
"""Periodically uploads a file to Google Cloud Storage (GCS) at specified intervals"""
loop = asyncio.get_running_loop()

await asyncio.sleep(offset.total_seconds())

while True:
try:
# Wait for the specified interval
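
The new `offset` argument simply delays the first cycle, so two uploaders sharing a 60-second interval alternate rather than firing together. A toy stand-in (not the real uploader) showing the staggering:

```python
import asyncio
from datetime import timedelta


async def upload_every(name: str, interval: timedelta, offset: timedelta = timedelta(0)):
    # Stub of upload_to_gcs_periodically: sleep the offset once, then
    # "upload" (here: print) every interval.
    await asyncio.sleep(offset.total_seconds())
    while True:
        await asyncio.sleep(interval.total_seconds())
        print(f"uploading {name}")


async def main():
    await asyncio.gather(
        upload_every("signalr.txt", timedelta(seconds=60)),
        upload_every("signalrcore.txt", timedelta(seconds=60), offset=timedelta(seconds=30)),
    )


if __name__ == "__main__":
    asyncio.run(main())
```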