63 changes: 63 additions & 0 deletions annotation_converter.py
@@ -0,0 +1,63 @@
# === annotation_converter.py ===
# Convert Basil's event annotations into a flat CSV
# Keeps "None" rows so we preserve full frame coverage

import argparse
import pandas as pd
import ast
from pathlib import Path

def parse_args():
ap = argparse.ArgumentParser(description="Convert Basil annotation export into flat CSV")
ap.add_argument("--in", dest="inp", required=True, help="Raw annotations CSV (export from Basil)")
ap.add_argument("--out", dest="out", required=True, help="Output parsed CSV")
return ap.parse_args()

def extract_event(cell):
"""
Each 'annotations' cell contains a JSON-like string with event info.
Example:
"[{'result': [{'value': {'choices': ['Kick']}}]}]"
"""
try:
parsed = ast.literal_eval(cell)
if isinstance(parsed, list) and parsed:
res = parsed[0].get("result", [])
if res:
choices = res[0].get("value", {}).get("choices", [])
if choices:
return choices[0].lower().strip()
except Exception:
return None
return None
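# Illustrative behaviour on hypothetical cells:
#   extract_event("[{'result': [{'value': {'choices': ['Kick']}}]}]")  -> "kick"
#   extract_event("[]")                                                -> None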

def main():
args = parse_args()
df = pd.read_csv(args.inp)

if "annotations" not in df.columns or "data.image" not in df.columns:
raise ValueError("Expected columns 'annotations' and 'data.image' in input file")

# Extract event_name
df["event_name"] = df["annotations"].apply(extract_event)

    # Fill missing events with "none" (lowercasing already happened in extract_event)
df["event_name"] = df["event_name"].fillna("none").astype(str)

    # Derive frame_id from the filename (expects names like ".../frame_123.png")
    frame_ids = df["data.image"].str.extract(r"frame_(\d+)\.png")[0]
    if frame_ids.isna().any():
        raise ValueError("Some 'data.image' values do not match the 'frame_<n>.png' pattern")
    df["frame_id"] = frame_ids.astype(int)

# Keep essential cols
out_df = df[["frame_id", "event_name"]].sort_values("frame_id").reset_index(drop=True)

# Save
Path(args.out).parent.mkdir(parents=True, exist_ok=True)
out_df.to_csv(args.out, index=False)
print(f"✅ Parsed annotations with {len(out_df)} rows → {args.out}")

if __name__ == "__main__":
main()
173 changes: 173 additions & 0 deletions annotation_sync.py
@@ -0,0 +1,173 @@
# === annotation_sync.py ===
# =============================================================================
# HOW TO RUN ANNOTATION SYNC
#
# Example: syncing Basil's events with Xuan's parsed tracking
#
# FULL dataset (all frames):
# python annotation_sync.py ^
# --events data/events_annotations/parsed_event_annotation.csv ^
# --track data/parsed_tracking.csv ^
# --out data/synced_annotations/synced_annotations.csv ^
# --report-dir data/synced_annotations --mode full
#
# EVENT-only dataset (only annotated events, skip "none"):
# python annotation_sync.py ^
# --events data/events_annotations/parsed_event_annotation.csv ^
# --track data/parsed_tracking.csv ^
# --out data/synced_annotations/synced_annotations.csv ^
# --report-dir data/synced_annotations --mode event
#
# Outputs:
# - synced_annotations_full.csv (all frames with tracking merged)
# - synced_annotations_event.csv (only annotated events)
# - sync_summary.json (match stats)
# - unmatched_events.csv (frames without matches)
# =============================================================================

import argparse, json
import pandas as pd
import numpy as np
from pathlib import Path

# -------------------------
# CLI
# -------------------------
def parse_args():
ap = argparse.ArgumentParser(description="Sync Basil events with Xuan tracking.")
ap.add_argument("--events", required=True, help="Events CSV (from annotation_converter).")
ap.add_argument("--track", required=True, help="Parsed tracking CSV (from convert_xuan_json).")
ap.add_argument("--out", required=True, help="Output CSV base name.")
ap.add_argument("--report-dir", required=True, help="Directory for reports.")
ap.add_argument("--frame-window", type=int, default=2, help="± frames for nearest-frame recovery.")
ap.add_argument("--mode", choices=["full", "event"], default="full",
help="Sync mode: 'full' keeps all frames, 'event' keeps only annotated events.")
return ap.parse_args()

# -------------------------
# Load data
# -------------------------
def load_data(events_path, track_path):
events_df = pd.read_csv(events_path)
track_df = pd.read_csv(track_path)

# enforce expected cols
if "event_name" not in events_df.columns:
raise ValueError("Events CSV must have 'event_name' column.")
if "frame_id" not in events_df.columns:
raise ValueError("Events CSV must have 'frame_id' column.")

# normalise types
events_df["frame_id"] = pd.to_numeric(events_df["frame_id"], errors="coerce").astype("Int64")
events_df["event_name"] = events_df["event_name"].astype(str).str.lower().str.strip()

for c in ["frame_id","player_id"]:
if c in track_df.columns:
track_df[c] = pd.to_numeric(track_df[c], errors="coerce").astype("Int64")
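    # Downstream steps assume the tracking CSV also carries per-detection box
    # columns (x1, y1, x2, y2, cx, cy, w, h) plus timestamp_s and confidence,
    # as convert_xuan_json is expected to emit.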

return events_df, track_df

# -------------------------
# Sync annotations
# -------------------------
def sync(events_df, track_df, frame_window=2):
# exact join on frame_id
merged = events_df.merge(track_df, how="left", on="frame_id", suffixes=("_ev",""))
exact_hits = merged["x1"].notna().sum()
print(f"Exact matches: {exact_hits}/{len(events_df)}")

# nearest frame recovery
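    # Example: with frame_window=2, an event at frame 107 that has no tracking
    # row scans frames 105-109 and borrows the highest-confidence detection
    # from the nearest populated frame (ties go to the earlier frame).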
unmatched = merged[merged["x1"].isna()][["frame_id","event_name"]].copy()
if unmatched.empty:
return merged

recovered = []
grouped = dict(tuple(track_df.groupby("frame_id")))
    for _, row in unmatched.iterrows():
        if pd.isna(row["frame_id"]):
            continue  # frame_id failed numeric coercion; nothing to recover against
        f = int(row["frame_id"])
best = None
best_delta = None
for d in range(-frame_window, frame_window+1):
cand = grouped.get(f+d)
if cand is None:
continue
pick = cand.sort_values("confidence", ascending=False).iloc[0].copy()
delta = abs(d)
if best is None or delta < best_delta:
best = pick
best_delta = delta
if best is not None:
r = {"frame_id": f, "event_name": row["event_name"]}
for c in ["player_id","timestamp_s","x1","y1","x2","y2","cx","cy","w","h","confidence"]:
r[c] = best.get(c, np.nan)
r["frame_id_matched"] = best.get("frame_id", f)
recovered.append(r)

    if recovered:
        rec_df = pd.DataFrame(recovered)
        # drop the placeholder NaN rows for events we just recovered, otherwise
        # they would be double-counted and re-reported as unmatched
        fixed = merged["x1"].isna() & merged["frame_id"].isin(rec_df["frame_id"])
        merged = pd.concat([merged[~fixed], rec_df], ignore_index=True)

return merged

# -------------------------
# Summarize sync
# -------------------------
def summarize(events_df, synced_df, report_dir):
Path(report_dir).mkdir(parents=True, exist_ok=True)

total = len(events_df)
# unique events matched at least once
matched_events = synced_df.groupby(["frame_id", "event_name"])["x1"].apply(lambda x: x.notna().any())
matched = matched_events.sum()
unmatched = total - matched

by_event = events_df["event_name"].value_counts().to_dict()

print("\n====== Annotation Sync Summary ======")
print(f"Events total : {total}")
print(f"Matched (with boxes) : {matched}")
print(f"Unmatched : {unmatched}")

# save unmatched list
unmatched_df = synced_df[synced_df["x1"].isna()][["frame_id","event_name"]]
unmatched_df.to_csv(Path(report_dir)/"unmatched_events.csv", index=False)

# save summary json
report_json = {
"events_total": total,
"matched": int(matched),
"unmatched": int(unmatched),
"by_event": by_event
}
with open(Path(report_dir)/"sync_summary.json", "w") as f:
json.dump(report_json, f, indent=2)

print(f"\n📄 Reports saved in {report_dir}")

# -------------------------
# Save outputs
# -------------------------
def save_outputs(synced_df, args):
out_path = Path(args.out)
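    # Only the directory part of --out is kept; the filename is swapped for a
    # mode-specific name so both modes can share a single --out argument.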
if args.mode == "event":
synced_df = synced_df[synced_df["event_name"] != "none"].copy()
out_path = out_path.with_name("synced_annotations_event.csv")
else:
out_path = out_path.with_name("synced_annotations_full.csv")

synced_df.to_csv(out_path, index=False)
print(f"\n✅ Synced annotations saved → {out_path}")

# -------------------------
# Main
# -------------------------
def main():
args = parse_args()
events_df, track_df = load_data(args.events, args.track)
synced_df = sync(events_df, track_df, frame_window=args.frame_window)
summarize(events_df, synced_df, args.report_dir)
save_outputs(synced_df, args)

if __name__ == "__main__":
main()
83 changes: 83 additions & 0 deletions event_timing_analysis.py
@@ -0,0 +1,83 @@
"""
Event Timing Analysis (Cumulative Only)
---------------------------------------
This script generates a single timeline graph showing the **cumulative count of AFL events**
across the duration of a clip. It helps visualize when events (kick, mark, tackle) occur
relative to the clip length.

Inputs:
--events : Path to synced annotations CSV
--out-dir: Directory to save results
--fps : Frames per second of the video (used to convert frames → seconds)

Outputs (saved in --out-dir):
- cumulative_timeline.csv : Table of cumulative event counts over time
- cumulative_timeline.png : Line graph showing cumulative counts
"""

import argparse
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
from pathlib import Path

def load_data(path: Path, fps: float) -> pd.DataFrame:
df = pd.read_csv(path)

# drop invalid/blank events
df = df[df["event_name"].notna()]
df = df[df["event_name"].str.lower() != "none"]

# create time_s if missing
if "time_s" not in df.columns:
df["time_s"] = df["frame_id"] / fps

return df

def cumulative_counts(df: pd.DataFrame, fps: float, clip_len: float):
times = np.linspace(0, clip_len, int(clip_len * fps) + 1)
cum = pd.DataFrame(index=times)
for ev, g in df.groupby("event_name"):
y, _ = np.histogram(g["time_s"], bins=times)
y_cum = np.concatenate([[0], y.cumsum()])[: len(times)]
cum[ev] = y_cum
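    # Worked example (illustrative): a 2 s clip at fps=2 gives
    # times = [0.0, 0.5, 1.0, 1.5, 2.0]; a single "kick" at t=0.6 s yields
    # cum["kick"] = [0, 0, 1, 1, 1].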
return cum

def plot_cumulative(cum: pd.DataFrame, out_dir: Path):
plt.figure(figsize=(10,6))
for col in cum.columns:
plt.plot(cum.index, cum[col], label=col)
plt.xlabel("Time (s)")
plt.ylabel("Cumulative Events")
plt.title("Cumulative Event Timeline")
plt.legend()
plt.tight_layout()
plt.savefig(out_dir / "cumulative_timeline.png")
plt.close()

def main():
ap = argparse.ArgumentParser()
ap.add_argument("--events", required=True, type=Path, help="Synced annotations CSV")
ap.add_argument("--out-dir", required=True, type=Path, help="Output directory")
ap.add_argument("--fps", required=True, type=float, help="Frames per second")
args = ap.parse_args()

df = load_data(args.events, args.fps)

    # approximate clip length from the last annotated frame
max_frame = df["frame_id"].max()
clip_len = max_frame / args.fps
print(f"Detected clip length: {clip_len:.2f} sec")

# compute cumulative timeline
cum = cumulative_counts(df, args.fps, clip_len)

# save results
args.out_dir.mkdir(parents=True, exist_ok=True)
cum.to_csv(args.out_dir / "cumulative_timeline.csv")
plot_cumulative(cum, args.out_dir)

print(f"Saved cumulative timeline → {args.out_dir}")

if __name__ == "__main__":
main()