diff --git a/app/main.py b/app/main.py index 0635c72..a5cb30e 100644 --- a/app/main.py +++ b/app/main.py @@ -1,5 +1,6 @@ from flask import Flask, request, Response, jsonify from flask_cors import CORS +import numpy as np # Local imports from app from app.routes import session as session_route @@ -77,3 +78,36 @@ def batch_predict(): if request.method == 'POST': return session_route.batch_predict() return Response('Invalid request method for route', status=405, mimetype='application/json') + +from app.services.quality_monitor import quality_monitor_instance +import logging + +log = logging.getLogger('werkzeug') +log.setLevel(logging.ERROR) + +realtime_request_count = 0 + +@app.route('/api/realtime-validation', methods=['POST', 'OPTIONS']) +def realtime_validation(): + global realtime_request_count + + if request.method == 'OPTIONS': + return '', 200 + try: + data = request.get_json() + prediction = data.get('prediction') + + # Increment count to throttle logs from being spammed on terminal + realtime_request_count += 1 + should_print = (realtime_request_count % 30 == 0) + + # Delegate the actual processing logic and state management to the service + result = quality_monitor_instance.process_prediction(prediction, should_print=should_print) + + return jsonify(result) + + except Exception as e: + import traceback + print(f"Server Error: {e}") + traceback.print_exc() + return jsonify({'error': str(e)}), 500 diff --git a/app/services/gaze_tracker.py b/app/services/gaze_tracker.py index d62fc8e..bcdbf50 100644 --- a/app/services/gaze_tracker.py +++ b/app/services/gaze_tracker.py @@ -76,12 +76,11 @@ "mae": make_scorer(mean_absolute_error), } - def squash(v, limit=1.0): """Squash não-linear estilo WebGazer""" return np.tanh(v / limit) -def trian_and_predict(model_name, X_train, y_train, X_test, y_test, label): +def train_and_predict(model_name, X_train, y_train, X_test, y_test, label): """ Helper to train a model (with or without GridSearchCV) and return predictions. """ @@ -157,7 +156,7 @@ def predict(data, k, model_X, model_Y): X_train_x = scaler_x.fit_transform(X_train_x) X_test_x = scaler_x.transform(X_test_x) - y_pred_x = trian_and_predict(model_X, X_train_x, y_train_x, X_test_x, y_test_x, "X") + y_pred_x = train_and_predict(model_X, X_train_x, y_train_x, X_test_x, y_test_x, "X") # Scaling (fit on train only) scaler_y = StandardScaler() @@ -165,7 +164,7 @@ def predict(data, k, model_X, model_Y): X_test_y = scaler_y.transform(X_test_y) - y_pred_y = trian_and_predict(model_Y, X_train_y, y_train_y, X_test_y, y_test_y, "Y") + y_pred_y = train_and_predict(model_Y, X_train_y, y_train_y, X_test_y, y_test_y, "Y") # Convert the predictions to a numpy array and apply KMeans clustering data = np.array([y_pred_x, y_pred_y]).T @@ -195,7 +194,6 @@ def predict(data, k, model_X, model_Y): # Calculate the average accuracy (eculidian distance) accuracy_xy = df_data.groupby("True XY").apply(func_total_accuracy) - # Create a dictionary to store the data data = {} @@ -230,7 +228,6 @@ def predict(data, k, model_X, model_Y): # Return the data return data - def predict_new_data_simple( calib_csv_path, predict_csv_path, @@ -372,10 +369,13 @@ def predict_new_data_simple( y_pred_y = y_pred_y * Y_GAIN + # ============================ # PREDICTION LOOP (WebGazer) # ============================ predictions = [] + window_size = 30 + for i in range(len(y_pred_x)): # baseline dinâmico diff --git a/app/services/quality_monitor.py b/app/services/quality_monitor.py new file mode 100644 index 0000000..cb49981 --- /dev/null +++ b/app/services/quality_monitor.py @@ -0,0 +1,81 @@ +import numpy as np +import math + +class QualityMonitor: + def __init__(self, window_size=50, precision_cutoff_degrees=1.0, screen_dpi=110, user_distance_mm=600): + self.window_size = window_size + self.precision_cutoff_degrees = precision_cutoff_degrees + self.screen_dpi = screen_dpi + self.user_distance_mm = user_distance_mm + self.x_center_coords = [] + self.y_center_coords = [] + + def pixels_to_degrees(self, pixels, ppi=96.0, distance_cm=60.0): + # pixels in one centimeter calculation + pixels_per_cm = ppi / 2.54 + + # Convert the pixel distance to physical size in centimeters + size_cm = pixels / pixels_per_cm + + # Visual angle calculation + angle_rad = 2 * np.arctan(size_cm / (2 * distance_cm)) + angle_deg = np.degrees(angle_rad) + + return angle_deg + + def calculate_rms_s2s_precision(self, x_coords, y_coords): + if len(x_coords) < 2: + return 0.0 + + x = np.array(x_coords) + y = np.array(y_coords) + + # Calculate dfferences between consecutive points + dx = np.diff(x) + dy = np.diff(y) + + # Calculate squared distances between consecutive samples + distances = np.sqrt(dx**2 + dy**2) + + # Root Mean Square calculation + rms_s2s_px = np.sqrt(np.mean(distances**2)) + return rms_s2s_px + + def process_prediction(self, prediction, should_print=True): + + if not prediction or len(prediction) == 0: + return {"status": "no_prediction"} + + face = prediction[0] + annotations = face.get("annotations", {}) + right_iris = annotations.get("rightEyeIris", []) + + if not right_iris: + return {"status": "no_prediction"} + + right_iris_center = right_iris[0] + x_center = right_iris_center[0] + y_center = right_iris_center[1] + + self.x_center_coords.append(x_center) + self.y_center_coords.append(y_center) + + # Keep only the last N points + if len(self.x_center_coords) > self.window_size: + self.x_center_coords.pop(0) + self.y_center_coords.pop(0) + + # RMS-S2S metric calculation + precision_px = self.calculate_rms_s2s_precision(self.x_center_coords, self.y_center_coords) + precision_deg = self.pixels_to_degrees(precision_px) + + status = "good" if precision_deg < self.precision_cutoff_degrees else "poor" + points_collected = len(self.x_center_coords) + + if should_print: + print(f"Coordinates: X={x_center:.2f}, Y={y_center:.2f} | Precision: {precision_deg:.2f}° (RMS-S2S) | Status: {status}") + + return {"status": status, "precision_degrees": float(precision_deg), "precision_pixels": float(precision_px), "points_collected": points_collected} + +# Initialize with standard 0.5 degrees cutoff +quality_monitor_instance = QualityMonitor(window_size=50, precision_cutoff_degrees=0.5)