Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 72 additions & 0 deletions databallpy/optimization/constraints.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
import numpy as np
import pandas as pd

from databallpy.game import Game
from databallpy.optimization.optimization import Constraint


class TTIConstraint(Constraint):
def __init__(
self,
game: Game,
frame: pd.Series,
max_time_to_intercept_seconds: float = 1,
reaction_time: float = 0.1,
max_velocity: float = 5.0,
):
self.max_time_to_intercept_seconds = max_time_to_intercept_seconds
self.reaction_time = reaction_time
self.max_velocity = max_velocity

self.player_to_starting_pos_and_vel_map = frame[
[c + "_x" for c in game.get_column_ids()]
+ [c + "_y" for c in game.get_column_ids()]
+ [c + "_vx" for c in game.get_column_ids()]
+ [c + "_vy" for c in game.get_column_ids()]
]

# TTI Implementation from https://github.com/devinpleuler/analytics-handbook/blob/master/soccer_analytics_handbook.ipynb
def tti(self, origin, destination, velocity):
u = (origin + velocity) - origin
v = destination - origin
u_mag = np.sqrt(np.sum(u**2, axis=-1))
v_mag = np.sqrt(np.sum(v**2, axis=-1))
dot_product = np.sum(u * v, axis=-1)

denom = u_mag * v_mag
# stationary player edge case
if denom == 0:
angle = 0.0
else:
angle = np.arccos(dot_product / denom)
r_reaction = origin + velocity * self.reaction_time
d = destination - r_reaction
t = (
u_mag * angle / np.pi
+ self.reaction_time
+ np.linalg.norm(d, axis=-1) / self.max_velocity
)

return t

def check(self, proposed_new_frame, player_id) -> bool:
x_col, y_col, vx_col, vy_col = [
player_id + suffix for suffix in ["_x", "_y", "_vx", "_vy"]
]
origin = np.array(
[
self.player_to_starting_pos_and_vel_map[x_col],
self.player_to_starting_pos_and_vel_map[y_col],
]
)
velocity = np.array(
[
self.player_to_starting_pos_and_vel_map[vx_col],
self.player_to_starting_pos_and_vel_map[vy_col],
]
)
destination = np.array([proposed_new_frame[x_col], proposed_new_frame[y_col]])

return (
self.tti(origin, destination, velocity) < self.max_time_to_intercept_seconds
)
101 changes: 101 additions & 0 deletions databallpy/optimization/objectives.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
import pickle
from pathlib import Path

import numpy as np
import pandas as pd

from databallpy.features.pitch_control import get_team_influence
from databallpy.game import Game
from databallpy.optimization.optimization import ObjectiveTerm, ObjectiveType
from databallpy.schemas.tracking_data import TrackingData
from databallpy.utils.utils import sigmoid
from scipy.ndimage import zoom

XT_MODEL_PATH = Path(__file__).resolve().parents[1] / "models" / "open_play_xT.npy"


class WeightedPitchControlObjective(ObjectiveTerm):
# Computes the net pitch control for the defending team over the attacking team
def __init__(
self,
game: Game,
frame: pd.Series,
xt_array: np.ndarray | None = None,
):
super().__init__(computation_type=ObjectiveType.GRID)
self.grid = np.meshgrid(
np.linspace(
-game.pitch_dimensions[0] / 2, game.pitch_dimensions[0] / 2, 106
),
np.linspace(-game.pitch_dimensions[1] / 2, game.pitch_dimensions[1] / 2, 68),
)

self.attacking_team = frame["team_possession"]
self.defending_team = "home" if self.attacking_team == "away" else "away"
self.defending_player_ids = game.get_column_ids(team=self.defending_team)

self.attacking_team_influence = get_team_influence(
frame,
col_ids=game.get_column_ids(team=self.attacking_team),
grid=self.grid,
player_ball_distances=None,
)
if xt_array is None:
open_play_xt = np.load(XT_MODEL_PATH)
# we are using (y, x) orientation instead of (x, y) so that it matches
self.xt_array = zoom(open_play_xt, (106 / 264, 68 / 196), order=1).T
else:
self.xt_array = xt_array

if self.attacking_team == "away":
self.xt_array = np.fliplr(self.xt_array)

def compute(
self,
input_frame: pd.Series,
) -> float:
team_influence_defending = get_team_influence(
input_frame,
col_ids=self.defending_player_ids,
grid=self.grid,
player_ball_distances=None,
)
# +ve is defending team, -ve is attacking team
net_sigmoid_diff = sigmoid(
team_influence_defending - self.attacking_team_influence, d=100
) # this makes the sigmoid steeper and more binary

return sum(sum(self.xt_array * net_sigmoid_diff))


class PressureObjective(ObjectiveTerm):
def __init__(
self,
game: Game,
frame: pd.Series,
players_to_press: list[str] | None = None,
):
super().__init__(computation_type=ObjectiveType.PLAYER)

self.game = game
self.players_to_press = (
players_to_press
if players_to_press
else game.get_column_ids(team=frame["team_possession"])
)

def compute(self, input_frame: pd.Series) -> float:
# pressure method only works on tracking data object, need to temporarily reconstruct it with the new data
pressure_score = 0
temp_tracking_df = pd.DataFrame(input_frame).T
temp_tracking_df = TrackingData(
temp_tracking_df.astype(self.game.tracking_data.dtypes)
)

for attacking_player in self.players_to_press:
pressure = temp_tracking_df.get_pressure_on_player(
temp_tracking_df.index[0], attacking_player, [106, 68], d_front=9
)
# compute the mean pressure on a player by dividing by number of players being pressed
pressure_score += pressure / len(self.players_to_press)
return pressure_score
83 changes: 83 additions & 0 deletions databallpy/optimization/optimization.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
from abc import ABC, abstractmethod
from dataclasses import dataclass
from enum import Enum
from typing import Any

import pandas as pd

from databallpy.game import Game
from databallpy.utils.logging import create_logger

LOGGER = create_logger(__name__)


class ObjectiveType(str, Enum):
GRID = "grid" # computed for each grid cell
PLAYER = "player" # computed for each player


class ObjectiveTerm:
def __init__(self, computation_type: ObjectiveType):
self.computation_type = computation_type

def compute(self, input_frame: pd.Series) -> float:
raise NotImplementedError


@dataclass
class OptimizationResult:
best_frame: pd.Series
best_result: float


class Constraint(ABC):
@abstractmethod
def check(self, proposed_new_frame, player_id) -> bool:
raise NotImplementedError


class OptimizationAlgorithm(ABC):
@abstractmethod
def __init__(
self,
game: Game,
selected_frame_idx: int,
objective_terms: list[ObjectiveTerm],
weights: list[float],
constraints: list[Constraint] | None = None,
):
if len(objective_terms) != len(weights):
raise ValueError("objective_terms and weights must have equal length")

self.game = game
self.frame = game.tracking_data.loc[[selected_frame_idx]].iloc[0]
self.constraints = constraints or []

self.objective_terms = objective_terms
self.weights = weights

@abstractmethod
def run(self) -> OptimizationResult:
raise NotImplementedError


def optimize_tracking_frame(
game: Game,
selected_frame_idx: int,
objective_terms: list[ObjectiveTerm],
weights: list[float],
constraints: list[Constraint],
algorithm: type[OptimizationAlgorithm],
**algorithm_kwargs: Any,
) -> OptimizationResult:
LOGGER.info("Running optimization with %s", algorithm.__name__)

optimizer = algorithm(
game=game,
selected_frame_idx=selected_frame_idx,
objective_terms=objective_terms,
weights=weights,
constraints=constraints,
**algorithm_kwargs,
)
return optimizer.run()
Loading
Loading