diff --git a/Player_Tracking/afl_player_tracking_and_crowd_monitoring/player_tracking_logic/CSV Converter.py b/Player_Tracking/afl_player_tracking_and_crowd_monitoring/player_tracking_logic/CSV Converter.py
new file mode 100644
index 00000000..98a90955
--- /dev/null
+++ b/Player_Tracking/afl_player_tracking_and_crowd_monitoring/player_tracking_logic/CSV Converter.py
@@ -0,0 +1,177 @@
+# -*- coding: utf-8 -*-
+"""Untitled25.ipynb
+
+Automatically generated by Colab.
+
+Original file is located at
+    https://colab.research.google.com/drive/1_lpSJ727oRCSAGy86MTC3AN7Zrg6nSMi
+"""
+
+from __future__ import annotations
+
+import json
+from pathlib import Path
+from typing import Any, Dict, Iterable, Optional, Tuple
+
+import pandas as pd
+from google.colab import files
+from IPython.display import display
+
+
+def _get(d: dict, path: Iterable[str], default=None):
+    """Safely walk a nested dict along `path`, returning `default` when any key is missing."""
+    cur = d
+    for k in path:
+        if not isinstance(cur, dict) or k not in cur:
+            return default
+        cur = cur[k]
+    return cur
+
+
+def is_tracking_schema(obj: Any) -> bool:
+    """Heuristic for the AFL tracking schema: dict -> 'tracking_results' (list) -> frame dicts with a 'players' list."""
+    if not isinstance(obj, dict):
+        return False
+    trs = obj.get("tracking_results")
+    if not isinstance(trs, list) or not trs:
+        return False
+    for fr in trs:
+        if isinstance(fr, dict) and isinstance(fr.get("players"), list):
+            return True
+    return False
+
+
+def parse_tracking_json_to_df(json_obj: dict, conf_min: float = 0.0) -> pd.DataFrame:
+    """Flatten the tracking schema into one row per (frame, player), dropping detections below `conf_min`."""
+    rows = []
+    for fr in json_obj.get("tracking_results", []):
+        if not isinstance(fr, dict):
+            continue
+        fno = fr.get("frame_number")
+        ts = fr.get("timestamp")
+        for p in fr.get("players", []):
+            if not isinstance(p, dict):
+                continue
+
+            # Skip detections with no confidence score, or below the threshold.
+            conf = p.get("confidence")
+            if conf is None:
+                continue
+            try:
+                if float(conf) < conf_min:
+                    continue
+            except (TypeError, ValueError):
+                pass
+
+            x1 = _get(p, ["bbox", "x1"]); y1 = _get(p, ["bbox", "y1"])
+            x2 = _get(p, ["bbox", "x2"]); y2 = _get(p, ["bbox", "y2"])
+            cx = _get(p, ["center", "x"]); cy = _get(p, ["center", "y"])
+
+            # Prefer explicit width/height; otherwise derive them from the bbox corners.
+            w = p.get("width", (x2 - x1) if (x1 is not None and x2 is not None) else None)
+            h = p.get("height", (y2 - y1) if (y1 is not None and y2 is not None) else None)
+
+            rows.append([fno, p.get("player_id"), ts, x1, y1, x2, y2, cx, cy, w, h, conf])
+
+    df = pd.DataFrame(rows, columns=[
+        "frame_id", "player_id", "timestamp_s",
+        "x1", "y1", "x2", "y2", "cx", "cy", "w", "h", "confidence",
+    ])
+    if df.empty:
+        return df
+
+    # Coerce types: nullable integers for the ids, floats for everything else.
+    for c in ["frame_id", "player_id"]:
+        df[c] = pd.to_numeric(df[c], errors="coerce").astype("Int64")
+    for c in ["timestamp_s", "x1", "y1", "x2", "y2", "cx", "cy", "w", "h", "confidence"]:
+        df[c] = pd.to_numeric(df[c], errors="coerce")
+    return df.sort_values(["frame_id", "player_id"]).reset_index(drop=True)
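+
+
+# Minimal sanity check of the tracking parser on a hand-built frame. The sample
+# values below are illustrative assumptions, not real tracker output; they only
+# exercise the field names assumed above (bbox/center/confidence).
+_sample = {"tracking_results": [{
+    "frame_number": 0,
+    "timestamp": 0.04,
+    "players": [{
+        "player_id": 3,
+        "confidence": 0.91,
+        "bbox": {"x1": 10.0, "y1": 20.0, "x2": 55.0, "y2": 95.0},
+        "center": {"x": 32.5, "y": 57.5},
+    }],
+}]}
+assert len(parse_tracking_json_to_df(_sample)) == 1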
+
+
+def _guess_record_path(obj: Any) -> Optional[Tuple[str, list]]:
+    """Guess where the records live: the root list itself, or a single list-valued key plus its sibling meta fields."""
+    if isinstance(obj, list) and all(isinstance(x, dict) for x in obj):
+        return ("", [])
+    if isinstance(obj, dict):
+        list_keys = [k for k, v in obj.items() if isinstance(v, list)]
+        if len(list_keys) == 1 and all(isinstance(x, dict) for x in obj[list_keys[0]]):
+            meta_fields = [k for k in obj.keys() if k != list_keys[0]]
+            return (list_keys[0], meta_fields)
+    return None
+
+
+def normalize_json_to_df(
+    json_obj: Any,
+    record_key: Optional[str] = None,
+    meta: Optional[list[str]] = None,
+    sep: str = ".",
+) -> pd.DataFrame:
+    """Generic fallback: flatten arbitrary JSON with pandas.json_normalize."""
+    guess = _guess_record_path(json_obj) if record_key is None else None
+    if record_key is None and guess:
+        record_key, guessed_meta = guess
+        if meta is None:
+            meta = guessed_meta
+
+    # The root object is already a list of records.
+    if record_key == "":
+        return pd.json_normalize(json_obj, sep=sep, max_level=None)
+
+    if record_key:
+        if not isinstance(json_obj, dict) or record_key not in json_obj:
+            raise ValueError(f"record_key '{record_key}' not found in root JSON object")
+        # Normalize from the root so the guessed meta fields (root-level keys) are
+        # carried onto every record instead of being dropped.
+        return pd.json_normalize(
+            json_obj, record_path=record_key, meta=meta or [],
+            sep=sep, errors="ignore",
+        )
+
+    return pd.json_normalize(json_obj, sep=sep, max_level=None)
+
+
+def colab_json2csv(
+    mode: str = "auto",
+    conf_min: float = 0.0,
+    record_key: Optional[str] = None,
+    meta: Optional[list[str]] = None,
+    sep: str = ".",
+    suffix: str = "_parsed",
+    download: bool = True,
+    preview_rows: int = 5,
+) -> Dict[str, pd.DataFrame]:
+    """Upload JSON files in Colab, convert each one to CSV, and optionally download the results."""
+    uploaded = files.upload()
+    if not uploaded:
+        print("No files uploaded.")
+        return {}
+
+    results: Dict[str, pd.DataFrame] = {}
+
+    for fname, content in uploaded.items():
+        try:
+            obj = json.loads(content.decode("utf-8"))
+        except Exception as e:
+            print(f"[ERROR] Could not parse JSON in {fname}: {e}")
+            continue
+
+        # Pick a converter: the tracking-specific parser when the schema matches,
+        # otherwise the generic json_normalize fallback.
+        chosen = mode
+        if mode == "auto":
+            chosen = "tracking" if is_tracking_schema(obj) else "normalize"
+
+        try:
+            if chosen == "tracking":
+                df = parse_tracking_json_to_df(obj, conf_min=conf_min)
+            else:
+                df = normalize_json_to_df(obj, record_key=record_key, meta=meta, sep=sep)
+        except Exception as e:
+            print(f"[ERROR] Failed to convert {fname} in {chosen} mode: {e}")
+            continue
+
+        stem = Path(fname).stem
+        out_csv = f"{stem}{suffix}.csv"
+        df.to_csv(out_csv, index=False)
+        results[fname] = df
+
+        nrows = len(df)
+        print(f"[OK] {fname} → {out_csv} ({nrows} rows) | mode={chosen}")
+
+        if nrows and preview_rows:
+            display(df.head(preview_rows))
+
+        if download:
+            try:
+                files.download(out_csv)
+            except Exception as e:
+                print(f"(Download skipped for {out_csv}: {e})")
+
+    return results
+
+
+dfs = colab_json2csv()
+
diff --git a/Player_Tracking/afl_player_tracking_and_crowd_monitoring/player_tracking_logic/Sprint Detection.py b/Player_Tracking/afl_player_tracking_and_crowd_monitoring/player_tracking_logic/Sprint Detection.py
new file mode 100644
index 00000000..c366b59a
--- /dev/null
+++ b/Player_Tracking/afl_player_tracking_and_crowd_monitoring/player_tracking_logic/Sprint Detection.py
@@ -0,0 +1,41 @@
+# -*- coding: utf-8 -*-
+"""Untitled27.ipynb
+
+Automatically generated by Colab.
+
+Original file is located at
+    https://colab.research.google.com/drive/1KUJ2_CipSaGbrTYK5_OFHtxEvfXOEDDt
+"""
+
+import pandas as pd
+
+df = pd.read_csv("players_over_25kmh.csv")
+
+# Per-frame speed threshold compared against the speed_kmh column.
+SPRINT_THRESHOLD = 7.0
+
+df["is_sprint"] = df["speed_kmh"] > SPRINT_THRESHOLD
+
+# Collapse per-frame sprint flags into contiguous [start_frame, end_frame] segments per player.
+segments = []
+for player_id, player_data in df.groupby("player_id"):
+    sprinting = False
+    start_frame = None
+
+    for idx, row in player_data.iterrows():
+        if row["is_sprint"] and not sprinting:
+            sprinting = True
+            start_frame = row["frame_id"]
+        elif not row["is_sprint"] and sprinting:
+            sprinting = False
+            end_frame = row["frame_id"] - 1
+            segments.append([player_id, start_frame, end_frame])
+
+    # Close out a sprint that is still running at the player's final frame.
+    if sprinting:
+        segments.append([player_id, start_frame, player_data["frame_id"].iloc[-1]])
+
+sprint_segments = pd.DataFrame(segments, columns=["player_id", "start_frame", "end_frame"])
+
+sprint_segments.to_csv("flagged_sprints.csv", index=False)
+
+print("Flagged sprint segments saved to flagged_sprints.csv")
+print(sprint_segments.head())
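+
+# Optional follow-up: approximate each sprint's duration in seconds. The frame
+# rate below is an assumption (not taken from the tracking pipeline); adjust
+# FPS to match the footage this CSV was derived from.
+FPS = 25.0
+sprint_segments["duration_s"] = (
+    sprint_segments["end_frame"] - sprint_segments["start_frame"] + 1
+) / FPS
+print(sprint_segments[["player_id", "start_frame", "end_frame", "duration_s"]].head())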