diff --git a/app/main.py b/app/main.py index 0635c724..833d5264 100644 --- a/app/main.py +++ b/app/main.py @@ -59,6 +59,8 @@ def health_check(): return jsonify({'status': 'ok'}), 200 # Route for validating calibration + + @app.route("/api/session/calib_validation", methods=["POST"]) def calib_validation(): """ @@ -72,8 +74,16 @@ def calib_validation(): return session_route.calib_results() return Response('Invalid request method for route', status=405, mimetype='application/json') + @app.route('/api/session/batch_predict', methods=['POST']) def batch_predict(): - if request.method == 'POST': - return session_route.batch_predict() - return Response('Invalid request method for route', status=405, mimetype='application/json') + return session_route.batch_predict() + + +@app.route("/health", methods=["GET"]) +def health(): + return jsonify({"status": "ok"}), 200 + + +if __name__ == "__main__": + app.run(debug=True, host="127.0.0.1", port=5001) diff --git a/app/routes/session.py b/app/routes/session.py index 1db18593..8036ead7 100644 --- a/app/routes/session.py +++ b/app/routes/session.py @@ -7,6 +7,7 @@ import math import numpy as np + from pathlib import Path import os import pandas as pd @@ -35,10 +36,10 @@ def convert_nan_to_none(obj): """ Recursively converts NaN and Inf values to None for proper JSON serialization. - + Args: obj: Python object (dict, list, float, etc.) - + Returns: The object with NaN/Inf values converted to None """ @@ -57,8 +58,6 @@ def convert_nan_to_none(obj): return obj - - def calib_results(): from_ruxailab = json.loads(request.form['from_ruxailab']) file_name = json.loads(request.form['file_name']) @@ -110,7 +109,8 @@ def calib_results(): f"{Path().absolute()}/app/services/calib_validation/csv/data/", exist_ok=True ) predict_csv_file = f"{Path().absolute()}/app/services/calib_validation/csv/data/{file_name}_predict_train_data.csv" - csv_columns = ["left_iris_x", "left_iris_y", "right_iris_x", "right_iris_y"] + csv_columns = ["left_iris_x", "left_iris_y", + "right_iris_x", "right_iris_y"] try: with open(predict_csv_file, "w") as csvfile: writer = csv.DictWriter(csvfile, fieldnames=csv_columns) @@ -147,6 +147,7 @@ def calib_results(): data = convert_nan_to_none(data) return Response(json.dumps(data), status=200, mimetype='application/json') + def batch_predict(): try: data = request.get_json() @@ -162,28 +163,13 @@ def batch_predict(): base_path = Path().absolute() / "app/services/calib_validation/csv/data" calib_csv_path = base_path / f"{calib_id}_fixed_train_data.csv" - predict_csv_path = base_path / "temp_batch_predict.csv" - # CSV temporário - with open(predict_csv_path, "w", newline="") as csvfile: - writer = csv.DictWriter(csvfile, fieldnames=[ - "left_iris_x", "left_iris_y", "right_iris_x", "right_iris_y" - ]) - writer.writeheader() - for item in iris_data: - writer.writerow({ - "left_iris_x": item["left_iris_x"], - "left_iris_y": item["left_iris_y"], - "right_iris_x": item["right_iris_x"], - "right_iris_y": item["right_iris_y"], - }) + df_predict = pd.DataFrame(iris_data) result = gaze_tracker.predict_new_data_simple( calib_csv_path=calib_csv_path, - predict_csv_path=predict_csv_path, + predict_df=df_predict, iris_data=iris_data, - # model_X="Random Forest Regressor", - # model_Y="Random Forest Regressor", screen_width=screen_width, screen_height=screen_height, ) @@ -191,6 +177,6 @@ def batch_predict(): return jsonify(convert_nan_to_none(result)) except Exception as e: - print("Erro batch_predict:", e) + traceback.print_exc() - return Response("Erro interno", status=500) \ No newline at end of file + return Response("Erro interno", status=500) diff --git a/app/services/gaze_tracker.py b/app/services/gaze_tracker.py index d62fc8ea..435a0fef 100644 --- a/app/services/gaze_tracker.py +++ b/app/services/gaze_tracker.py @@ -1,47 +1,47 @@ # Necessary imports +from app.services.config import hyperparameters +from app.services.metrics import ( + func_precision_x, + func_presicion_y, + func_accuracy_x, + func_accuracy_y, + func_total_accuracy, +) +from sklearn.metrics import ( + mean_squared_error, + mean_absolute_error, + mean_squared_log_error, + r2_score, +) +from sklearn.metrics import make_scorer +import matplotlib.pyplot as plt +from sklearn.model_selection import GroupShuffleSplit +from sklearn.model_selection import GridSearchCV +from sklearn.cluster import KMeans +from sklearn.svm import SVR +from sklearn import linear_model +from sklearn.linear_model import Ridge +from sklearn.ensemble import RandomForestRegressor +from sklearn.pipeline import make_pipeline +from sklearn.preprocessing import StandardScaler, PolynomialFeatures +from sklearn.model_selection import train_test_split +from pathlib import Path +import pandas as pd +import numpy as np import math import warnings warnings.filterwarnings("ignore") -import numpy as np -import pandas as pd -from pathlib import Path # Scikit-learn imports -from sklearn.model_selection import train_test_split -from sklearn.preprocessing import StandardScaler, PolynomialFeatures -from sklearn.pipeline import make_pipeline -from sklearn.ensemble import RandomForestRegressor -from sklearn.linear_model import Ridge # Model imports -from sklearn import linear_model -from sklearn.svm import SVR -from sklearn.cluster import KMeans -from sklearn.model_selection import GridSearchCV -from sklearn.model_selection import GroupShuffleSplit -import matplotlib.pyplot as plt # Metrics imports -from sklearn.metrics import make_scorer -from sklearn.metrics import ( - mean_squared_error, - mean_absolute_error, - mean_squared_log_error, - r2_score, -) # Local imports -from app.services.metrics import ( - func_precision_x, - func_presicion_y, - func_accuracy_x, - func_accuracy_y, - func_total_accuracy, -) -from app.services.config import hyperparameters # Machine learning models to use @@ -62,13 +62,13 @@ PolynomialFeatures(2), SVR(kernel="linear") ), "Random Forest Regressor": make_pipeline( - RandomForestRegressor( - n_estimators=200, - max_depth=10, - min_samples_split=5, - random_state=42 - ) -)} + RandomForestRegressor( + n_estimators=200, + max_depth=10, + min_samples_split=5, + random_state=42 + ) + )} # Set the scoring metrics for GridSearchCV to r2_score and mean_absolute_error scoring = { @@ -81,6 +81,7 @@ def squash(v, limit=1.0): """Squash não-linear estilo WebGazer""" return np.tanh(v / limit) + def trian_and_predict(model_name, X_train, y_train, X_test, y_test, label): """ Helper to train a model (with or without GridSearchCV) and return predictions. @@ -89,7 +90,6 @@ def trian_and_predict(model_name, X_train, y_train, X_test, y_test, label): model = models[model_name] model.fit(X_train, y_train) y_pred = model.predict(X_test) - print(f"Score {label}: {r2_score(y_test, y_pred)}") return y_pred else: pipeline = models[model_name] @@ -111,39 +111,29 @@ def trian_and_predict(model_name, X_train, y_train, X_test, y_test, label): def predict(data, k, model_X, model_Y): """ Predicts the gaze coordinates using machine learning models. - - Args: - - data (str): The path to the CSV file containing the training data. - - k (int): The number of clusters for KMeans clustering. - - model_X: The machine learning model to use for prediction on the X coordinate. - - model_Y: The machine learning model to use for prediction on the Y coordinate. - - Returns: - dict: A dictionary containing the predicted gaze coordinates, precision, accuracy, and cluster centroids. """ - # Load data from csv file and drop unnecessary columns df = pd.read_csv(data) df = df.drop(["screen_height", "screen_width"], axis=1) - print(df.head()) + # Create groups (point_x, point_y) df["group"] = list(zip(df["point_x"], df["point_y"])) # Data for X axis X_x = df[["left_iris_x", "right_iris_x"]] X_y = df["point_x"] - # groups = df["group"] + # Data for Y axis X_feature_y = df[["left_iris_y", "right_iris_y"]] y_y = df["point_y"] - # Split data into training and testing sets then Normalize data using standard scaler + ( X_train_x, X_test_x, y_train_x, y_test_x, X_train_y, X_test_y, y_train_y, y_test_y - )= train_test_split( + ) = train_test_split( X_x, X_y, X_feature_y, @@ -151,92 +141,81 @@ def predict(data, k, model_X, model_Y): test_size=0.2, random_state=42, ) - - # Scaling (fit on train only) + + # Scaling X scaler_x = StandardScaler() X_train_x = scaler_x.fit_transform(X_train_x) - X_test_x = scaler_x.transform(X_test_x) - - y_pred_x = trian_and_predict(model_X, X_train_x, y_train_x, X_test_x, y_test_x, "X") - - # Scaling (fit on train only) + X_test_x = scaler_x.transform(X_test_x) + + y_pred_x = trian_and_predict( + model_X, X_train_x, y_train_x, X_test_x, y_test_x, "X" + ) + + # Scaling Y scaler_y = StandardScaler() X_train_y = scaler_y.fit_transform(X_train_y) - X_test_y = scaler_y.transform(X_test_y) + X_test_y = scaler_y.transform(X_test_y) + + y_pred_y = trian_and_predict( + model_Y, X_train_y, y_train_y, X_test_y, y_test_y, "Y" + ) - - y_pred_y = trian_and_predict(model_Y, X_train_y, y_train_y, X_test_y, y_test_y, "Y") - - # Convert the predictions to a numpy array and apply KMeans clustering - data = np.array([y_pred_x, y_pred_y]).T + # KMeans clustering + pred_data = np.array([y_pred_x, y_pred_y]).T model = KMeans(n_clusters=k, n_init="auto", init="k-means++") - y_kmeans = model.fit_predict(data) + y_kmeans = model.fit_predict(pred_data) - # Create a dataframe with the truth and predicted values - data = { + # Create dataframe + df_data = pd.DataFrame({ "True X": y_test_x, "Predicted X": y_pred_x, "True Y": y_test_y, "Predicted Y": y_pred_y, - } - df_data = pd.DataFrame(data) + }) + df_data["True XY"] = list(zip(df_data["True X"], df_data["True Y"])) - - # Filter out negative values - df_data = df_data[(df_data["Predicted X"] >= 0) & (df_data["Predicted Y"] >= 0)] - # Calculate the precision and accuracy for each + # Filter negative values + df_data = df_data[ + (df_data["Predicted X"] >= 0) & + (df_data["Predicted Y"] >= 0) + ] + + # Metrics precision_x = df_data.groupby("True XY").apply(func_precision_x) precision_y = df_data.groupby("True XY").apply(func_presicion_y) - - # Calculate the average precision precision_xy = (precision_x + precision_y) / 2 - - # Calculate the average accuracy (eculidian distance) accuracy_xy = df_data.groupby("True XY").apply(func_total_accuracy) - - - # Create a dictionary to store the data - data = {} - - # Iterate over the dataframe and store the data - for index, row in df_data.iterrows(): - - # Get the outer and inner keys - outer_key = str(row["True X"]).split(".")[0] - inner_key = str(row["True Y"]).split(".")[0] - - # If the outer key is not in the dictionary, add it - if outer_key not in data: - data[outer_key] = {} - - # Add the data to the dictionary - data[outer_key][inner_key] = { - "predicted_x": df_data[ - (df_data["True X"] == row["True X"]) - & (df_data["True Y"] == row["True Y"]) - ]["Predicted X"].values.tolist(), - "predicted_y": df_data[ - (df_data["True X"] == row["True X"]) - & (df_data["True Y"] == row["True Y"]) - ]["Predicted Y"].values.tolist(), - "PrecisionSD": precision_xy[(row["True X"], row["True Y"])], - "Accuracy": accuracy_xy[(row["True X"], row["True Y"])], + + result = {} + + grouped = df_data.groupby(["True X", "True Y"]) + + for (true_x, true_y), group in grouped: + outer_key = str(true_x).split(".")[0] + inner_key = str(true_y).split(".")[0] + + if outer_key not in result: + result[outer_key] = {} + + result[outer_key][inner_key] = { + "predicted_x": group["Predicted X"].tolist(), + "predicted_y": group["Predicted Y"].tolist(), + "PrecisionSD": precision_xy[(true_x, true_y)], + "Accuracy": accuracy_xy[(true_x, true_y)], } - # Centroids of the clusters - data["centroids"] = model.cluster_centers_.tolist() + result["centroids"] = model.cluster_centers_.tolist() - # Return the data - return data + return result def predict_new_data_simple( calib_csv_path, - predict_csv_path, + predict_df, iris_data, - screen_width=None, - screen_height=None, + screen_width, + screen_height, ): # ============================ # CONFIG (WebGazer-inspired) @@ -255,8 +234,10 @@ def predict_new_data_simple( y_center = screen_height / 2 # normalize targets to [-1, 1] space - y_train_x = (df_train["point_x"].values.astype(float) - x_center) / (screen_width / 2) - y_train_y = (df_train["point_y"].values.astype(float) - y_center) / (screen_height / 2) + y_train_x = (df_train["point_x"].values.astype( + float) - x_center) / (screen_width / 2) + y_train_y = (df_train["point_y"].values.astype( + float) - y_center) / (screen_height / 2) # ensure laterality if df_train["left_iris_x"].mean() < df_train["right_iris_x"].mean(): @@ -326,7 +307,7 @@ def predict_new_data_simple( # ============================ # LOAD PREDICT # ============================ - df_pred = pd.read_csv(predict_csv_path) + df_pred = predict_df.copy() if df_pred["left_iris_x"].mean() < df_pred["right_iris_x"].mean(): df_pred["left_iris_x"], df_pred["right_iris_x"] = ( @@ -369,7 +350,7 @@ def predict_new_data_simple( # remove bias vertical y_pred_y = y_pred_y - np.mean(y_pred_y) - + y_pred_y = y_pred_y * Y_GAIN # ============================ @@ -379,8 +360,10 @@ def predict_new_data_simple( for i in range(len(y_pred_x)): # baseline dinâmico - ref_mean_x = BASELINE_ALPHA * mean_px[i] + (1 - BASELINE_ALPHA) * ref_mean_x - ref_mean_y = BASELINE_ALPHA * mean_py[i] + (1 - BASELINE_ALPHA) * ref_mean_y + ref_mean_x = BASELINE_ALPHA * \ + mean_px[i] + (1 - BASELINE_ALPHA) * ref_mean_x + ref_mean_y = BASELINE_ALPHA * \ + mean_py[i] + (1 - BASELINE_ALPHA) * ref_mean_y # squash não-linear sx = squash(y_pred_x[i], SQUASH_LIMIT_X) @@ -400,14 +383,6 @@ def predict_new_data_simple( # ============================ # LOGS # ============================ - print("====== MODEL DEBUG ======") - print(f"y_pred_x: {np.min(y_pred_x):.3f} → {np.max(y_pred_x):.3f}") - print(f"y_pred_y: {np.min(y_pred_y):.3f} → {np.max(y_pred_y):.3f}") - print("=========================") - - print("====== PIXEL SAMPLE ======") - for p in predictions[:15]: - print(f"x: {p['predicted_x']:.1f}, y: {p['predicted_y']:.1f}") return predictions