From 9e13df93ecde0ce9f2aa74aa87f1e9fc7e84a495 Mon Sep 17 00:00:00 2001 From: Sitam Meur <103279526+sitamgithub-MSIT@users.noreply.github.com> Date: Sun, 25 Aug 2024 01:21:24 +0000 Subject: [PATCH] gaze_tracker file predict function updated --- app/services/gaze_tracker.py | 326 ++++++++++++++++++++++++----------- 1 file changed, 226 insertions(+), 100 deletions(-) diff --git a/app/services/gaze_tracker.py b/app/services/gaze_tracker.py index 5cbffe2..b2506c9 100644 --- a/app/services/gaze_tracker.py +++ b/app/services/gaze_tracker.py @@ -1,114 +1,238 @@ -from sklearn.metrics import mean_squared_error, mean_absolute_error, mean_squared_log_error, r2_score -from sklearn.model_selection import train_test_split -from sklearn.preprocessing import StandardScaler, PolynomialFeatures -from sklearn.pipeline import make_pipeline -from sklearn.cluster import KMeans -from sklearn import linear_model -from pathlib import Path -import pandas as pd -import numpy as np - +# Necessary imports +import warnings -def predict(data, test_data, k): - - df = pd.read_csv(data) - df = df.drop(['screen_height', 'screen_width'], axis=1) +warnings.filterwarnings("ignore") - df_test = pd.read_csv(test_data) - df_test = df_test.drop(['screen_height', 'screen_width'], axis=1) - - X_train_x = df[['left_iris_x', 'right_iris_x']] - y_train_x = df['point_x'] - - sc = StandardScaler() - X_train_x = sc.fit_transform(X_train_x) - - X_test_x = df_test[['left_iris_x', 'right_iris_x']] - y_test_x = df_test['point_x'] - - sc = StandardScaler() - X_test_x = sc.fit_transform(X_test_x) - - model = make_pipeline(PolynomialFeatures( - 2), linear_model.LinearRegression()) - model.fit(X_train_x, y_train_x) - y_pred_x = model.predict(X_test_x) - - X_train_y = df[['left_iris_y', 'right_iris_y']] - y_train_y = df['point_y'] - - sc = StandardScaler() - X_train_y = sc.fit_transform(X_train_y) +import numpy as np +import pandas as pd +from pathlib import Path - X_test_y = df_test[['left_iris_y', 'right_iris_y']] - y_test_y = df_test['point_y'] +# Scikit-learn imports +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import StandardScaler, PolynomialFeatures +from sklearn.pipeline import make_pipeline +# Model imports +from sklearn import linear_model +from sklearn.svm import SVR +from sklearn.cluster import KMeans +from sklearn.model_selection import GridSearchCV + +# Metrics imports +from sklearn.metrics import make_scorer +from sklearn.metrics import ( + mean_squared_error, + mean_absolute_error, + mean_squared_log_error, + r2_score, +) + +# Local imports +from app.services.metrics import ( + func_precision_x, + func_presicion_y, + func_accuracy_x, + func_accuracy_y, +) +from app.services.config import hyperparameters + + +# Machine learning models to use +models = { + "Linear Regression": make_pipeline( + PolynomialFeatures(2), linear_model.LinearRegression() + ), + "Ridge Regression": make_pipeline(PolynomialFeatures(2), linear_model.Ridge()), + "Lasso Regression": make_pipeline(PolynomialFeatures(2), linear_model.Lasso()), + "Elastic Net": make_pipeline( + PolynomialFeatures(2), linear_model.ElasticNet(alpha=1.0, l1_ratio=0.5) + ), + "Bayesian Ridge": make_pipeline( + PolynomialFeatures(2), linear_model.BayesianRidge() + ), + "SGD Regressor": make_pipeline(PolynomialFeatures(2), linear_model.SGDRegressor()), + "Support Vector Regressor": make_pipeline( + PolynomialFeatures(2), SVR(kernel="linear") + ), +} + +# Set the scoring metrics for GridSearchCV to r2_score and mean_absolute_error +scoring = { + "r2": make_scorer(r2_score), + "mae": make_scorer(mean_absolute_error), +} + + +def predict(data, k, model_X, model_Y): + """ + Predicts the gaze coordinates using machine learning models. + + Args: + - data (str): The path to the CSV file containing the training data. + - k (int): The number of clusters for KMeans clustering. + - model_X: The machine learning model to use for prediction on the X coordinate. + - model_Y: The machine learning model to use for prediction on the Y coordinate. + + Returns: + dict: A dictionary containing the predicted gaze coordinates, precision, accuracy, and cluster centroids. + """ + # Inicialize standard scaler sc = StandardScaler() - X_test_y = sc.fit_transform(X_test_y) - - model = make_pipeline(PolynomialFeatures( - 2), linear_model.LinearRegression()) - model.fit(X_train_y, y_train_y) - y_pred_y = model.predict(X_test_y) + # Load data from csv file and drop unnecessary columns + df = pd.read_csv(data) + df = df.drop(["screen_height", "screen_width"], axis=1) + + # Data for X axis + X_x = df[["left_iris_x", "right_iris_x"]] + X_y = df["point_x"] + + # Normalize data using standard scaler and split data into training and testing sets + X_x = sc.fit_transform(X_x) + X_train_x, X_test_x, y_train_x, y_test_x = train_test_split( + X_x, X_y, test_size=0.2, random_state=42 + ) + + if ( + model_X == "Linear Regression" + or model_X == "Elastic Net" + or model_X == "Support Vector Regressor" + ): + model = models[model_X] + + # Fit the model and make predictions + model.fit(X_train_x, y_train_x) + y_pred_x = model.predict(X_test_x) + + else: + pipeline = models[model_X] + param_grid = hyperparameters[model_X]["param_grid"] + + # Initialize GridSearchCV with the pipeline and parameter grid + grid_search = GridSearchCV( + pipeline, + param_grid, + cv=5, + scoring=scoring, + refit="r2", + return_train_score=True, + ) + + # Fit the GridSearchCV to the training data for X + grid_search.fit(X_train_x, y_train_x) + + # Use the best estimator to predict the values and calculate the R2 score + best_model_x = grid_search.best_estimator_ + y_pred_x = best_model_x.predict(X_test_x) + + # Data for Y axis + X_y = df[["left_iris_y", "right_iris_y"]] + y_y = df["point_y"] + + # Normalize data using standard scaler and split data into training and testing sets + X_y = sc.fit_transform(X_y) + X_train_y, X_test_y, y_train_y, y_test_y = train_test_split( + X_y, y_y, test_size=0.2, random_state=42 + ) + + if ( + model_Y == "Linear Regression" + or model_Y == "Elastic Net" + or model_Y == "Support Vector Regressor" + ): + model = models[model_Y] + + # Fit the model and make predictions + model.fit(X_train_y, y_train_y) + y_pred_y = model.predict(X_test_y) + + else: + pipeline = models[model_Y] + param_grid = hyperparameters[model_Y]["param_grid"] + + # Initialize GridSearchCV with the pipeline and parameter grid + grid_search = GridSearchCV( + pipeline, + param_grid, + cv=5, + scoring=scoring, + refit="r2", + return_train_score=True, + ) + + # Fit the GridSearchCV to the training data for X + grid_search.fit(X_train_y, y_train_y) + + # Use the best estimator to predict the values and calculate the R2 score + best_model_y = grid_search.best_estimator_ + y_pred_y = best_model_y.predict(X_test_y) + + # Convert the predictions to a numpy array and apply KMeans clustering data = np.array([y_pred_x, y_pred_y]).T - model = KMeans(n_clusters=k, n_init='auto', init='k-means++') + model = KMeans(n_clusters=k, n_init="auto", init="k-means++") y_kmeans = model.fit_predict(data) - data = {'True X': y_test_x, 'Predicted X': y_pred_x, - 'True Y': y_test_y, 'Predicted Y': y_pred_y} - + # Create a dataframe with the truth and predicted values + data = { + "True X": y_test_x, + "Predicted X": y_pred_x, + "True Y": y_test_y, + "Predicted Y": y_pred_y, + } df_data = pd.DataFrame(data) - df_data['True XY'] = list(zip(df_data['True X'], df_data['True Y'])) - - # remove unwanted data - df_data = df_data[(df_data['Predicted X'] >= 0) & - (df_data['Predicted Y'] >= 0)] - + df_data["True XY"] = list(zip(df_data["True X"], df_data["True Y"])) - def func_precision_x(group): return np.sqrt( - np.sum(np.square([group['Predicted X'], group['True X']]))) + # Filter out negative values + df_data = df_data[(df_data["Predicted X"] >= 0) & (df_data["Predicted Y"] >= 0)] - def func_presicion_y(group): return np.sqrt( - np.sum(np.square([group['Predicted Y'], group['True Y']]))) - - precision_x = df_data.groupby('True XY').apply(func_precision_x) - precision_y = df_data.groupby('True XY').apply(func_presicion_y) + # Calculate the precision and accuracy for each + precision_x = df_data.groupby("True XY").apply(func_precision_x) + precision_y = df_data.groupby("True XY").apply(func_presicion_y) + # Calculate the average precision and accuracy precision_xy = (precision_x + precision_y) / 2 precision_xy = precision_xy / np.mean(precision_xy) - def func_accuracy_x(group): return np.sqrt( - np.sum(np.square([group['True X'] - group['Predicted X']]))) - - def func_accuracy_y(group): return np.sqrt( - np.sum(np.square([group['True Y'] - group['Predicted Y']]))) - - accuracy_x = df_data.groupby('True XY').apply(func_accuracy_x) - accuracy_y = df_data.groupby('True XY').apply(func_accuracy_y) + # Calculate the accuracy for each axis + accuracy_x = df_data.groupby("True XY").apply(func_accuracy_x) + accuracy_y = df_data.groupby("True XY").apply(func_accuracy_y) + # Calculate the average accuracy accuracy_xy = (accuracy_x + accuracy_y) / 2 accuracy_xy = accuracy_xy / np.mean(accuracy_xy) + # Create a dictionary to store the data data = {} + # Iterate over the dataframe and store the data for index, row in df_data.iterrows(): - outer_key = str(row['True X']).split('.')[0] - inner_key = str(row['True Y']).split('.')[0] + # Get the outer and inner keys + outer_key = str(row["True X"]).split(".")[0] + inner_key = str(row["True Y"]).split(".")[0] + # If the outer key is not in the dictionary, add it if outer_key not in data: data[outer_key] = {} + # Add the data to the dictionary data[outer_key][inner_key] = { - 'predicted_x': df_data[(df_data['True X'] == row['True X']) & (df_data['True Y'] == row['True Y'])]['Predicted X'].values.tolist(), - 'predicted_y': df_data[(df_data['True X'] == row['True X']) & (df_data['True Y'] == row['True Y'])]['Predicted Y'].values.tolist(), - 'PrecisionSD': precision_xy[(row['True X'], row['True Y'])], - 'Accuracy': accuracy_xy[(row['True X'], row['True Y'])] + "predicted_x": df_data[ + (df_data["True X"] == row["True X"]) + & (df_data["True Y"] == row["True Y"]) + ]["Predicted X"].values.tolist(), + "predicted_y": df_data[ + (df_data["True X"] == row["True X"]) + & (df_data["True Y"] == row["True Y"]) + ]["Predicted Y"].values.tolist(), + "PrecisionSD": precision_xy[(row["True X"], row["True Y"])], + "Accuracy": accuracy_xy[(row["True X"], row["True Y"])], } - data['centroids'] = model.cluster_centers_.tolist() + # Centroids of the clusters + data["centroids"] = model.cluster_centers_.tolist() + # Return the data return data @@ -124,8 +248,8 @@ def train_to_validate_calib(calib_csv_file, predict_csv_file): # data['point_y'] = np.log(data['point_y']) # Separe os recursos (X) e os rótulos (y) - X = data[['left_iris_x', 'left_iris_y', 'right_iris_x', 'right_iris_y']] - y = data[['point_x', 'point_y']] + X = data[["left_iris_x", "left_iris_y", "right_iris_x", "right_iris_y"]] + y = data[["point_x", "point_y"]] # Crie e ajuste um modelo de regressão linear model = linear_model.LinearRegression() @@ -148,8 +272,12 @@ def train_to_validate_calib(calib_csv_file, predict_csv_file): def train_model(session_id): # Download dataset - dataset_train_path = f'{Path().absolute()}/public/training/{session_id}/train_data.csv' - dataset_session_path = f'{Path().absolute()}/public/sessions/{session_id}/session_data.csv' + dataset_train_path = ( + f"{Path().absolute()}/public/training/{session_id}/train_data.csv" + ) + dataset_session_path = ( + f"{Path().absolute()}/public/sessions/{session_id}/session_data.csv" + ) # Importing data from csv raw_dataset = pd.read_csv(dataset_train_path) @@ -159,10 +287,10 @@ def train_model(session_id): train_stats = train_stats.transpose() dataset_t = raw_dataset - dataset_s = session_dataset.drop(['timestamp'], axis=1) + dataset_s = session_dataset.drop(["timestamp"], axis=1) # Drop the columns that will be predicted - X = dataset_t.drop(['timestamp', 'mouse_x', 'mouse_y'], axis=1) + X = dataset_t.drop(["timestamp", "mouse_x", "mouse_y"], axis=1) Y1 = dataset_t.mouse_x Y2 = dataset_t.mouse_y @@ -182,7 +310,7 @@ def train_model(session_id): def model_for_mouse_x(X, Y1): - print('-----------------MODEL FOR X------------------') + print("-----------------MODEL FOR X------------------") # split dataset into train and test sets (80/20 where 20 is for test) X_train, X_test, Y1_train, Y1_test = train_test_split(X, Y1, test_size=0.2) @@ -195,13 +323,12 @@ def model_for_mouse_x(X, Y1): Y1_test = normalizeData(Y1_test) Y1_pred_test = normalizeData(Y1_pred_test) + print(f"Mean absolute error MAE = {mean_absolute_error(Y1_test, Y1_pred_test)}") + print(f"Mean squared error MSE = {mean_squared_error(Y1_test, Y1_pred_test)}") print( - f'Mean absolute error MAE = {mean_absolute_error(Y1_test, Y1_pred_test)}') - print( - f'Mean squared error MSE = {mean_squared_error(Y1_test, Y1_pred_test)}') - print( - f'Mean squared log error MSLE = {mean_squared_log_error(Y1_test, Y1_pred_test)}') - print(f'MODEL X SCORE R2 = {model.score(X, Y1)}') + f"Mean squared log error MSLE = {mean_squared_log_error(Y1_test, Y1_pred_test)}" + ) + print(f"MODEL X SCORE R2 = {model.score(X, Y1)}") # print(f'TRAIN{Y1_pred_train}') # print(f'TEST{Y1_pred_test}') @@ -209,7 +336,7 @@ def model_for_mouse_x(X, Y1): def model_for_mouse_y(X, Y2): - print('-----------------MODEL FOR Y------------------') + print("-----------------MODEL FOR Y------------------") # split dataset into train and test sets (80/20 where 20 is for test) X_train, X_test, Y2_train, Y2_test = train_test_split(X, Y2, test_size=0.2) @@ -222,16 +349,15 @@ def model_for_mouse_y(X, Y2): Y2_test = normalizeData(Y2_test) Y2_pred_test = normalizeData(Y2_pred_test) + print(f"Mean absolute error MAE = {mean_absolute_error(Y2_test, Y2_pred_test)}") + print(f"Mean squared error MSE = {mean_squared_error(Y2_test, Y2_pred_test)}") print( - f'Mean absolute error MAE = {mean_absolute_error(Y2_test, Y2_pred_test)}') - print( - f'Mean squared error MSE = {mean_squared_error(Y2_test, Y2_pred_test)}') - print( - f'Mean squared log error MSLE = {mean_squared_log_error(Y2_test, Y2_pred_test)}') - print(f'MODEL X SCORE R2 = {model.score(X, Y2)}') + f"Mean squared log error MSLE = {mean_squared_log_error(Y2_test, Y2_pred_test)}" + ) + print(f"MODEL X SCORE R2 = {model.score(X, Y2)}") # print(f'TRAIN{Y2_pred_train}') - print(f'TEST{Y2_pred_test}') + print(f"TEST{Y2_pred_test}") return model