Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 9 additions & 3 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,8 @@ def health_check():
return jsonify({'status': 'ok'}), 200

# Route for validating calibration


@app.route("/api/session/calib_validation", methods=["POST"])
def calib_validation():
"""
Expand All @@ -72,8 +74,12 @@ def calib_validation():
return session_route.calib_results()
return Response('Invalid request method for route', status=405, mimetype='application/json')


# Route handler for batch gaze prediction; delegates to the session route module.
# NOTE(review): this span appears to be a unified-diff overlay (old and new
# versions interleaved without +/- markers). The last line duplicates the early
# return and is unreachable as written — presumably the deleted lines are the
# method-check branch and the final line is the replacement; confirm against
# the merged file before relying on this text.
@app.route('/api/session/batch_predict', methods=['POST'])
def batch_predict():
if request.method == 'POST':
return session_route.batch_predict()
return Response('Invalid request method for route', status=405, mimetype='application/json')
return session_route.batch_predict()


@app.route("/health", methods=["GET"])
def health():
    """Liveness probe: return a JSON status payload with HTTP 200."""
    payload = {"status": "ok"}
    return jsonify(payload), 200
34 changes: 10 additions & 24 deletions app/routes/session.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
import math
import numpy as np


from pathlib import Path
import os
import pandas as pd
Expand Down Expand Up @@ -35,10 +36,10 @@
def convert_nan_to_none(obj):
"""
Recursively converts NaN and Inf values to None for proper JSON serialization.

Args:
obj: Python object (dict, list, float, etc.)

Returns:
The object with NaN/Inf values converted to None
"""
Expand All @@ -57,8 +58,6 @@ def convert_nan_to_none(obj):
return obj




def calib_results():
from_ruxailab = json.loads(request.form['from_ruxailab'])
file_name = json.loads(request.form['file_name'])
Expand Down Expand Up @@ -110,7 +109,8 @@ def calib_results():
f"{Path().absolute()}/app/services/calib_validation/csv/data/", exist_ok=True
)
predict_csv_file = f"{Path().absolute()}/app/services/calib_validation/csv/data/{file_name}_predict_train_data.csv"
csv_columns = ["left_iris_x", "left_iris_y", "right_iris_x", "right_iris_y"]
csv_columns = ["left_iris_x", "left_iris_y",
"right_iris_x", "right_iris_y"]
try:
with open(predict_csv_file, "w") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=csv_columns)
Expand Down Expand Up @@ -147,6 +147,7 @@ def calib_results():
data = convert_nan_to_none(data)
return Response(json.dumps(data), status=200, mimetype='application/json')


def batch_predict():
try:
data = request.get_json()
Expand All @@ -162,35 +163,20 @@ def batch_predict():

base_path = Path().absolute() / "app/services/calib_validation/csv/data"
calib_csv_path = base_path / f"{calib_id}_fixed_train_data.csv"
predict_csv_path = base_path / "temp_batch_predict.csv"

# CSV temporário
with open(predict_csv_path, "w", newline="") as csvfile:
writer = csv.DictWriter(csvfile, fieldnames=[
"left_iris_x", "left_iris_y", "right_iris_x", "right_iris_y"
])
writer.writeheader()
for item in iris_data:
writer.writerow({
"left_iris_x": item["left_iris_x"],
"left_iris_y": item["left_iris_y"],
"right_iris_x": item["right_iris_x"],
"right_iris_y": item["right_iris_y"],
})
df_predict = pd.DataFrame(iris_data)

result = gaze_tracker.predict_new_data_simple(
calib_csv_path=calib_csv_path,
predict_csv_path=predict_csv_path,
predict_df=df_predict,
iris_data=iris_data,
# model_X="Random Forest Regressor",
# model_Y="Random Forest Regressor",
screen_width=screen_width,
screen_height=screen_height,
)

return jsonify(convert_nan_to_none(result))

except Exception as e:
print("Erro batch_predict:", e)

traceback.print_exc()
return Response("Erro interno", status=500)
return Response("Erro interno", status=500)
140 changes: 68 additions & 72 deletions app/services/gaze_tracker.py
Original file line number Diff line number Diff line change
@@ -1,47 +1,47 @@
# Necessary imports
from app.services.config import hyperparameters
from app.services.metrics import (
func_precision_x,
func_presicion_y,
func_accuracy_x,
func_accuracy_y,
func_total_accuracy,
)
from sklearn.metrics import (
mean_squared_error,
mean_absolute_error,
mean_squared_log_error,
r2_score,
)
from sklearn.metrics import make_scorer
import matplotlib.pyplot as plt
from sklearn.model_selection import GroupShuffleSplit
from sklearn.model_selection import GridSearchCV
from sklearn.cluster import KMeans
from sklearn.svm import SVR
from sklearn import linear_model
from sklearn.linear_model import Ridge
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import make_pipeline
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.model_selection import train_test_split
from pathlib import Path
import pandas as pd
import numpy as np
import math
import warnings

warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
from pathlib import Path

# Scikit-learn imports
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import StandardScaler, PolynomialFeatures
from sklearn.pipeline import make_pipeline
from sklearn.ensemble import RandomForestRegressor
from sklearn.linear_model import Ridge


# Model imports
from sklearn import linear_model
from sklearn.svm import SVR
from sklearn.cluster import KMeans
from sklearn.model_selection import GridSearchCV
from sklearn.model_selection import GroupShuffleSplit
import matplotlib.pyplot as plt

# Metrics imports
from sklearn.metrics import make_scorer
from sklearn.metrics import (
mean_squared_error,
mean_absolute_error,
mean_squared_log_error,
r2_score,
)

# Local imports
from app.services.metrics import (
func_precision_x,
func_presicion_y,
func_accuracy_x,
func_accuracy_y,
func_total_accuracy,
)
from app.services.config import hyperparameters


# Machine learning models to use
Expand All @@ -62,13 +62,13 @@
PolynomialFeatures(2), SVR(kernel="linear")
),
"Random Forest Regressor": make_pipeline(
RandomForestRegressor(
n_estimators=200,
max_depth=10,
min_samples_split=5,
random_state=42
)
)}
RandomForestRegressor(
n_estimators=200,
max_depth=10,
min_samples_split=5,
random_state=42
)
)}

# Set the scoring metrics for GridSearchCV to r2_score and mean_absolute_error
scoring = {
Expand All @@ -81,6 +81,7 @@ def squash(v, limit=1.0):
"""Squash não-linear estilo WebGazer"""
return np.tanh(v / limit)


def trian_and_predict(model_name, X_train, y_train, X_test, y_test, label):
"""
Helper to train a model (with or without GridSearchCV) and return predictions.
Expand All @@ -89,7 +90,6 @@ def trian_and_predict(model_name, X_train, y_train, X_test, y_test, label):
model = models[model_name]
model.fit(X_train, y_train)
y_pred = model.predict(X_test)
print(f"Score {label}: {r2_score(y_test, y_pred)}")
return y_pred
else:
pipeline = models[model_name]
Expand Down Expand Up @@ -122,11 +122,10 @@ def predict(data, k, model_X, model_Y):
dict: A dictionary containing the predicted gaze coordinates, precision, accuracy, and cluster centroids.
"""


# Load data from csv file and drop unnecessary columns
df = pd.read_csv(data)
df = df.drop(["screen_height", "screen_width"], axis=1)
print(df.head())

# Create groups (point_x, point_y)
df["group"] = list(zip(df["point_x"], df["point_y"]))

Expand All @@ -143,30 +142,31 @@ def predict(data, k, model_X, model_Y):
y_train_x, y_test_x,
X_train_y, X_test_y,
y_train_y, y_test_y
)= train_test_split(
) = train_test_split(
X_x,
X_y,
X_feature_y,
y_y,
test_size=0.2,
random_state=42,
)

# Scaling (fit on train only)
scaler_x = StandardScaler()
X_train_x = scaler_x.fit_transform(X_train_x)
X_test_x = scaler_x.transform(X_test_x)

y_pred_x = trian_and_predict(model_X, X_train_x, y_train_x, X_test_x, y_test_x, "X")

X_test_x = scaler_x.transform(X_test_x)

y_pred_x = trian_and_predict(
model_X, X_train_x, y_train_x, X_test_x, y_test_x, "X")

# Scaling (fit on train only)
scaler_y = StandardScaler()
X_train_y = scaler_y.fit_transform(X_train_y)
X_test_y = scaler_y.transform(X_test_y)
X_test_y = scaler_y.transform(X_test_y)

y_pred_y = trian_and_predict(
model_Y, X_train_y, y_train_y, X_test_y, y_test_y, "Y")


y_pred_y = trian_and_predict(model_Y, X_train_y, y_train_y, X_test_y, y_test_y, "Y")

# Convert the predictions to a numpy array and apply KMeans clustering
data = np.array([y_pred_x, y_pred_y]).T
model = KMeans(n_clusters=k, n_init="auto", init="k-means++")
Expand All @@ -181,20 +181,20 @@ def predict(data, k, model_X, model_Y):
}
df_data = pd.DataFrame(data)
df_data["True XY"] = list(zip(df_data["True X"], df_data["True Y"]))

# Filter out negative values
df_data = df_data[(df_data["Predicted X"] >= 0) & (df_data["Predicted Y"] >= 0)]
df_data = df_data[(df_data["Predicted X"] >= 0) &
(df_data["Predicted Y"] >= 0)]

# Calculate the precision and accuracy for each
# Calculate the precision and accuracy for each
precision_x = df_data.groupby("True XY").apply(func_precision_x)
precision_y = df_data.groupby("True XY").apply(func_presicion_y)

# Calculate the average precision
# Calculate the average precision
precision_xy = (precision_x + precision_y) / 2

# Calculate the average accuracy (eculidian distance)
accuracy_xy = df_data.groupby("True XY").apply(func_total_accuracy)


# Create a dictionary to store the data
data = {}
Expand Down Expand Up @@ -233,10 +233,10 @@ def predict(data, k, model_X, model_Y):

def predict_new_data_simple(
calib_csv_path,
predict_csv_path,
predict_df,
iris_data,
screen_width=None,
screen_height=None,
screen_width,
screen_height,
):
# ============================
# CONFIG (WebGazer-inspired)
Expand All @@ -255,8 +255,10 @@ def predict_new_data_simple(
y_center = screen_height / 2

# normalize targets to [-1, 1] space
y_train_x = (df_train["point_x"].values.astype(float) - x_center) / (screen_width / 2)
y_train_y = (df_train["point_y"].values.astype(float) - y_center) / (screen_height / 2)
y_train_x = (df_train["point_x"].values.astype(
float) - x_center) / (screen_width / 2)
y_train_y = (df_train["point_y"].values.astype(
float) - y_center) / (screen_height / 2)

# ensure laterality
if df_train["left_iris_x"].mean() < df_train["right_iris_x"].mean():
Expand Down Expand Up @@ -326,7 +328,7 @@ def predict_new_data_simple(
# ============================
# LOAD PREDICT
# ============================
df_pred = pd.read_csv(predict_csv_path)
df_pred = predict_df.copy()

if df_pred["left_iris_x"].mean() < df_pred["right_iris_x"].mean():
df_pred["left_iris_x"], df_pred["right_iris_x"] = (
Expand Down Expand Up @@ -369,7 +371,7 @@ def predict_new_data_simple(

# remove bias vertical
y_pred_y = y_pred_y - np.mean(y_pred_y)

y_pred_y = y_pred_y * Y_GAIN

# ============================
Expand All @@ -379,8 +381,10 @@ def predict_new_data_simple(

for i in range(len(y_pred_x)):
# baseline dinâmico
ref_mean_x = BASELINE_ALPHA * mean_px[i] + (1 - BASELINE_ALPHA) * ref_mean_x
ref_mean_y = BASELINE_ALPHA * mean_py[i] + (1 - BASELINE_ALPHA) * ref_mean_y
ref_mean_x = BASELINE_ALPHA * \
mean_px[i] + (1 - BASELINE_ALPHA) * ref_mean_x
ref_mean_y = BASELINE_ALPHA * \
mean_py[i] + (1 - BASELINE_ALPHA) * ref_mean_y

# squash não-linear
sx = squash(y_pred_x[i], SQUASH_LIMIT_X)
Expand All @@ -400,14 +404,6 @@ def predict_new_data_simple(
# ============================
# LOGS
# ============================
print("====== MODEL DEBUG ======")
print(f"y_pred_x: {np.min(y_pred_x):.3f} → {np.max(y_pred_x):.3f}")
print(f"y_pred_y: {np.min(y_pred_y):.3f} → {np.max(y_pred_y):.3f}")
print("=========================")

print("====== PIXEL SAMPLE ======")
for p in predictions[:15]:
print(f"x: {p['predicted_x']:.1f}, y: {p['predicted_y']:.1f}")

return predictions

Expand Down