Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
27 changes: 27 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,28 @@
# macOS system files
.DS_Store

# Python venv
.venv/

# Jupyter
*.ipynb
.ipynb_checkpoints/

# Python cache
__pycache__/
*.pyc
*.pyo

# Data / outputs
data/
out/
runs/

# IDEs
.vscode/
.idea/

# Model weights & large files
*.pt
*.pth
*.zip
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
# AFL Player Tracking Logic



This folder contains scripts developed for the Redback Project AFL Player Tracking system.

These scripts support syncing, merging, evaluating, and analyzing AFL player tracking and event data.



## 📂 Scripts



- **annotation_converter.py**

  Converts annotation formats into a consistent structure for further processing.

- **annotation_sync.py**

  Syncs manual event annotations with YOLOv8 + DeepSORT tracking outputs.

- **event_timing_analysis.py**

  Analyzes and visualizes the timing of events across match footage.

- **merge_synced.py**

  Merges multiple synced annotation files into a single dataset.

- **prediction_vs_truth.py**

  Evaluates predicted events against ground truth annotations, producing precision/recall metrics.



---



## Usage



Each script can be run independently from the command line:



```bash
python annotation_converter.py <input_file> <output_file>
python annotation_sync.py <tracking_file> <annotation_file> <output_file>
python event_timing_analysis.py <synced_file> <output_directory>
python merge_synced.py <synced_directory> <merged_output_file>
python prediction_vs_truth.py <predicted_file> <truth_file> <output_report>
```





## Notes



- Replace `<...>` placeholders with the actual file paths and directories you are using.

- Ensure that output directories exist before running scripts (create them with `mkdir` if necessary).

- Scripts can be run independently, but together they form a full pipeline for syncing, merging, evaluating, and analyzing AFL tracking + event data.





## Author



Developed by **Jarrod Gillingham** for *Redback Project 4 – Player Tracking Logic / Integration & Evaluation Team*.







Original file line number Diff line number Diff line change
@@ -0,0 +1,63 @@
# === annotation_converter.py ===
# Convert Basil's event annotations into a flat CSV
# Keeps "None" rows so we preserve full frame coverage

import argparse
import pandas as pd
import ast
from pathlib import Path

def parse_args():
    """Build and parse the command-line interface for the converter."""
    parser = argparse.ArgumentParser(
        description="Convert Basil annotation export into flat CSV"
    )
    parser.add_argument("--in", dest="inp", required=True,
                        help="Raw annotations CSV (export from Basil)")
    parser.add_argument("--out", dest="out", required=True,
                        help="Output parsed CSV")
    return parser.parse_args()

def extract_event(cell):
    """Return the first event label found in one 'annotations' cell, or None.

    A cell holds a JSON-like string such as
        "[{'result': [{'value': {'choices': ['Kick']}}]}]"
    The label is lower-cased and stripped; malformed or empty cells yield None.
    """
    try:
        records = ast.literal_eval(cell)
        if not (isinstance(records, list) and records):
            return None
        results = records[0].get("result", [])
        if not results:
            return None
        labels = results[0].get("value", {}).get("choices", [])
        if not labels:
            return None
        return labels[0].lower().strip()
    except Exception:
        # Anything unparsable (NaN cells, truncated exports) means "no event".
        return None

def main():
    """Run the converter: read the raw export, extract events, write a flat CSV.

    Output columns: frame_id (int, sorted ascending) and event_name
    (lower-case string, "none" where no event was annotated).
    Raises ValueError when required columns are absent or a 'data.image'
    value does not contain a parseable frame number.
    """
    args = parse_args()
    df = pd.read_csv(args.inp)

    if "annotations" not in df.columns or "data.image" not in df.columns:
        raise ValueError("Expected columns 'annotations' and 'data.image' in input file")

    # Extract the event label from each annotation cell ("none" when absent),
    # keeping "none" rows so full frame coverage is preserved.
    df["event_name"] = df["annotations"].apply(extract_event)
    df["event_name"] = df["event_name"].fillna("none").astype(str)

    # Derive frame_id from the image filename, e.g. ".../frame_0042.png" -> 42.
    # BUGFIX: validate the extraction first — .astype(int) on NaN previously
    # failed with a cryptic "cannot convert non-finite values" pandas error.
    frames = df["data.image"].str.extract(r"frame_(\d+)\.png")[0]
    if frames.isna().any():
        bad = df.loc[frames.isna(), "data.image"].head(5).tolist()
        raise ValueError(
            f"Could not parse a frame number from {int(frames.isna().sum())} "
            f"'data.image' values (e.g. {bad}); expected names like 'frame_123.png'"
        )
    df["frame_id"] = frames.astype(int)

    # Keep only the essential columns, ordered by frame.
    out_df = df[["frame_id", "event_name"]].sort_values("frame_id").reset_index(drop=True)

    # Save (creating the output directory if needed).
    Path(args.out).parent.mkdir(parents=True, exist_ok=True)
    out_df.to_csv(args.out, index=False)
    print(f"✅ Parsed annotations with {len(out_df)} rows → {args.out}")

if __name__ == "__main__":
    main()
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
# === annotation_sync.py ===
# =============================================================================
# HOW TO RUN ANNOTATION SYNC
#
# Example: syncing Basil's events with Xuan's parsed tracking
#
# FULL dataset (all frames):
# python annotation_sync.py ^
# --events data/events_annotations/parsed_event_annotation.csv ^
# --track data/parsed_tracking.csv ^
# --out data/synced_annotations/synced_annotations.csv ^
# --report-dir data/synced_annotations --mode full
#
# EVENT-only dataset (only annotated events, skip "none"):
# python annotation_sync.py ^
# --events data/events_annotations/parsed_event_annotation.csv ^
# --track data/parsed_tracking.csv ^
# --out data/synced_annotations/synced_annotations.csv ^
# --report-dir data/synced_annotations --mode event
#
# Outputs:
# - synced_annotations_full.csv (all frames with tracking merged)
# - synced_annotations_event.csv (only annotated events)
# - sync_summary.json (match stats)
# - unmatched_events.csv (frames without matches)
# =============================================================================

import os, argparse, json
import pandas as pd
import numpy as np
from pathlib import Path
from collections import defaultdict

# -------------------------
# CLI
# -------------------------
def parse_args():
    """Define and parse the command-line interface for the sync tool."""
    parser = argparse.ArgumentParser(description="Sync Basil events with Xuan tracking.")
    parser.add_argument("--events", required=True,
                        help="Events CSV (from annotation_converter).")
    parser.add_argument("--track", required=True,
                        help="Parsed tracking CSV (from convert_xuan_json).")
    parser.add_argument("--out", required=True,
                        help="Output CSV base name.")
    parser.add_argument("--report-dir", required=True,
                        help="Directory for reports.")
    parser.add_argument("--frame-window", type=int, default=2,
                        help="± frames for nearest-frame recovery.")
    parser.add_argument("--mode", choices=["full", "event"], default="full",
                        help="Sync mode: 'full' keeps all frames, 'event' keeps only annotated events.")
    return parser.parse_args()

# -------------------------
# Load data
# -------------------------
def load_data(events_path, track_path):
    """Read the events and tracking CSVs and normalise their key columns.

    Raises ValueError when the events file lacks 'event_name' or 'frame_id'.
    Returns (events_df, track_df) with nullable-Int64 id columns and
    lower-cased, stripped event names.
    """
    events = pd.read_csv(events_path)
    tracks = pd.read_csv(track_path)

    # Enforce the columns the rest of the pipeline relies on.
    for required in ("event_name", "frame_id"):
        if required not in events.columns:
            raise ValueError(f"Events CSV must have '{required}' column.")

    # Normalise types: nullable ints for ids, canonical lower-case labels.
    events["frame_id"] = pd.to_numeric(events["frame_id"], errors="coerce").astype("Int64")
    events["event_name"] = events["event_name"].astype(str).str.lower().str.strip()

    for col in ("frame_id", "player_id"):
        if col in tracks.columns:
            tracks[col] = pd.to_numeric(tracks[col], errors="coerce").astype("Int64")

    return events, tracks

# -------------------------
# Sync annotations
# -------------------------
def sync(events_df, track_df, frame_window=2):
    """Attach tracking boxes to events by frame_id.

    First does an exact left-merge on frame_id (an event frame may match
    several tracked boxes). For events whose frame has no tracking rows,
    searches ±frame_window neighbouring frames and takes the
    highest-confidence box from the nearest one (earlier frames win ties);
    recovered rows record their source frame in 'frame_id_matched'.

    Returns the merged DataFrame; rows for unrecoverable events keep NaN
    box columns.
    """
    # Exact join on frame_id.
    merged = events_df.merge(track_df, how="left", on="frame_id", suffixes=("_ev",""))
    exact_hits = merged["x1"].notna().sum()
    print(f"Exact matches: {exact_hits}/{len(events_df)}")

    # Nearest-frame recovery for events with no box at their exact frame.
    unmatched = merged[merged["x1"].isna()][["frame_id","event_name"]].copy()
    if unmatched.empty:
        return merged

    recovered = []
    grouped = dict(tuple(track_df.groupby("frame_id")))
    for _, row in unmatched.iterrows():
        f = int(row["frame_id"])
        best = None
        best_delta = None
        for d in range(-frame_window, frame_window+1):
            cand = grouped.get(f+d)
            if cand is None:
                continue
            # Highest-confidence detection in the candidate frame.
            pick = cand.sort_values("confidence", ascending=False).iloc[0].copy()
            delta = abs(d)
            # Strict '<' keeps the first frame found at the minimum distance,
            # i.e. the earlier frame wins a tie.
            if best is None or delta < best_delta:
                best = pick
                best_delta = delta
        if best is not None:
            r = {"frame_id": f, "event_name": row["event_name"]}
            for c in ["player_id","timestamp_s","x1","y1","x2","y2","cx","cy","w","h","confidence"]:
                r[c] = best.get(c, np.nan)
            r["frame_id_matched"] = best.get("frame_id", f)
            recovered.append(r)

    if recovered:
        rec_df = pd.DataFrame(recovered)
        # BUGFIX: drop the empty (box-less) rows for events we just recovered.
        # Previously both the NaN row and the recovered row were kept, so each
        # recovered event appeared twice and was still reported as unmatched
        # by downstream reports.
        recovered_keys = set(zip(rec_df["frame_id"], rec_df["event_name"]))
        keep = ~merged.apply(
            lambda m: pd.isna(m["x1"])
            and (m["frame_id"], m["event_name"]) in recovered_keys,
            axis=1,
        )
        merged = pd.concat([merged[keep], rec_df], ignore_index=True)

    return merged

# -------------------------
# Summarize sync
# -------------------------
def summarize(events_df, synced_df, report_dir):
    """Print sync stats and write unmatched_events.csv + sync_summary.json."""
    report_path = Path(report_dir)
    report_path.mkdir(parents=True, exist_ok=True)

    total = len(events_df)
    # An event counts as matched when any of its merged rows carries a box.
    has_box = synced_df.groupby(["frame_id", "event_name"])["x1"].apply(
        lambda col: col.notna().any()
    )
    matched = has_box.sum()
    unmatched = total - matched

    print("\n====== Annotation Sync Summary ======")
    print(f"Events total : {total}")
    print(f"Matched (with boxes) : {matched}")
    print(f"Unmatched : {unmatched}")

    # Rows that never received a tracking box go into unmatched_events.csv.
    no_box = synced_df[synced_df["x1"].isna()][["frame_id", "event_name"]]
    no_box.to_csv(report_path / "unmatched_events.csv", index=False)

    summary = {
        "events_total": total,
        "matched": int(matched),
        "unmatched": int(unmatched),
        "by_event": events_df["event_name"].value_counts().to_dict(),
    }
    with open(report_path / "sync_summary.json", "w") as fh:
        json.dump(summary, fh, indent=2)

    print(f"\n📄 Reports saved in {report_dir}")

# -------------------------
# Save outputs
# -------------------------
def save_outputs(synced_df, args):
    """Write the synced dataframe, choosing the filename from the sync mode.

    mode == "event": drop "none" rows, save as synced_annotations_event.csv.
    mode == "full" : keep every row,  save as synced_annotations_full.csv.
    Either way the file lands in the directory of args.out.
    """
    base = Path(args.out)
    if args.mode == "event":
        to_save = synced_df[synced_df["event_name"] != "none"].copy()
        target = base.with_name("synced_annotations_event.csv")
    else:
        to_save = synced_df
        target = base.with_name("synced_annotations_full.csv")

    to_save.to_csv(target, index=False)
    print(f"\n✅ Synced annotations saved → {target}")

# -------------------------
# Main
# -------------------------
def main():
    """Entry point: parse CLI args, sync events with tracking, report, save."""
    cli = parse_args()
    events, tracks = load_data(cli.events, cli.track)
    combined = sync(events, tracks, frame_window=cli.frame_window)
    summarize(events, combined, cli.report_dir)
    save_outputs(combined, cli)

if __name__ == "__main__":
    main()
Loading