From 79dcbef5e24b3bc31b6ce1e8307a3987ecb12db3 Mon Sep 17 00:00:00 2001
From: Mahsa Mehrpoor
Date: Thu, 25 Jan 2024 09:48:30 +0000
Subject: [PATCH 1/5] Pinned pandas version to 1.5.3

---
 models/ml/prep_encoding_splitting/covariate_encoding.py | 2 +-
 models/ml/prep_encoding_splitting/ml_data_prep.py       | 2 +-
 2 files changed, 2 insertions(+), 2 deletions(-)

diff --git a/models/ml/prep_encoding_splitting/covariate_encoding.py b/models/ml/prep_encoding_splitting/covariate_encoding.py
index 3b44892..c42fbde 100644
--- a/models/ml/prep_encoding_splitting/covariate_encoding.py
+++ b/models/ml/prep_encoding_splitting/covariate_encoding.py
@@ -5,7 +5,7 @@ def model(dbt, session):
     # dbt configuration
-    dbt.config(packages=["pandas","numpy","scikit-learn"])
+    dbt.config(packages=["pandas==1.5.3","numpy","scikit-learn"])
 
     # get upstream data
     data = dbt.ref("ml_data_prep").to_pandas()
diff --git a/models/ml/prep_encoding_splitting/ml_data_prep.py b/models/ml/prep_encoding_splitting/ml_data_prep.py
index 29f2eaf..1f9ef5b 100644
--- a/models/ml/prep_encoding_splitting/ml_data_prep.py
+++ b/models/ml/prep_encoding_splitting/ml_data_prep.py
@@ -2,7 +2,7 @@ def model(dbt, session):
     # dbt configuration
-    dbt.config(packages=["pandas"])
+    dbt.config(packages=["pandas==1.5.3"])
 
     # get upstream data
     fct_results = dbt.ref("mrt_results_circuits").to_pandas()

From 56f8ae6d40ee5daae88097fd77e3a28ae5916bda Mon Sep 17 00:00:00 2001
From: Mahsa Mehrpoor
Date: Thu, 25 Jan 2024 10:09:30 +0000
Subject: [PATCH 2/5] Created fact and mart for lap_times

---
 models/core/fct_lap_times.sql        | 13 +++++++++++++
 models/marts/mrt_lap_times_years.sql | 19 +++++++++++++++++++
 2 files changed, 32 insertions(+)
 create mode 100644 models/core/fct_lap_times.sql
 create mode 100644 models/marts/mrt_lap_times_years.sql

diff --git a/models/core/fct_lap_times.sql b/models/core/fct_lap_times.sql
new file mode 100644
index 0000000..37d62b0
--- /dev/null
+++ b/models/core/fct_lap_times.sql
@@ -0,0 +1,13 @@
+with lap_times as (
+    select
+        {{ dbt_utils.generate_surrogate_key(['race_id', 'driver_id', 'lap']) }} as lap_times_id,
+        race_id as race_id,
+        driver_id as driver_id,
+        lap as lap,
+        driver_position as driver_position,
+        lap_time_formatted as lap_time_formatted,
+        official_laptime as official_laptime,
+        lap_time_milliseconds as lap_time_milliseconds
+    from {{ ref('stg_lap_times') }}
+)
+select * from lap_times
\ No newline at end of file
diff --git a/models/marts/mrt_lap_times_years.sql b/models/marts/mrt_lap_times_years.sql
new file mode 100644
index 0000000..b35db7e
--- /dev/null
+++ b/models/marts/mrt_lap_times_years.sql
@@ -0,0 +1,19 @@
+with lap_times as (
+    select * from {{ ref('fct_lap_times') }}
+),
+races as (
+    select * from {{ ref('dim_races') }}
+),
+expanded_lap_times_by_year as (
+    select
+        lap_times.race_id,
+        driver_id,
+        race_year,
+        lap,
+        lap_time_milliseconds
+    from lap_times
+    left join races
+        on lap_times.race_id = races.race_id
+    where lap_time_milliseconds is not null
+)
+select * from expanded_lap_times_by_year
\ No newline at end of file

From ae9cef52cc0ac93387bc887abda8536da8dc0ca5 Mon Sep 17 00:00:00 2001
From: Mahsa Mehrpoor
Date: Thu, 25 Jan 2024 10:22:39 +0000
Subject: [PATCH 3/5] Moving average on lap times over 5 years

---
 .../aggregates/agg_lap_times_moving_avg_2.py | 17 +++++++++++++++++
 1 file changed, 17 insertions(+)
 create mode 100644 models/marts/aggregates/agg_lap_times_moving_avg_2.py

diff --git a/models/marts/aggregates/agg_lap_times_moving_avg_2.py b/models/marts/aggregates/agg_lap_times_moving_avg_2.py
new file mode 100644
index 0000000..b1dc3ac
--- /dev/null
+++ b/models/marts/aggregates/agg_lap_times_moving_avg_2.py
@@ -0,0 +1,17 @@
+import pandas as pd
+
+def model(dbt, session):
+    # dbt configuration
+    dbt.config(packages=["pandas"])
+
+    # get upstream data
+    lap_times = dbt.ref("mrt_lap_times_years").to_pandas()
+
+    # compute the yearly mean lap time and its 5-year moving average
+    lap_times["LAP_TIME_SECONDS"] = lap_times["LAP_TIME_MILLISECONDS"]/1000
+    lap_time_trends = lap_times.groupby(by="RACE_YEAR")["LAP_TIME_SECONDS"].mean().to_frame()
+    lap_time_trends.reset_index(inplace=True)
+    lap_time_trends["LAP_MOVING_AVG_5_YEARS"] = lap_time_trends["LAP_TIME_SECONDS"].rolling(5).mean()
+    lap_time_trends.columns = lap_time_trends.columns.str.upper()
+
+    return lap_time_trends.round(1)
\ No newline at end of file

From 14503e5b97bf0ff95c45870962f4b39124bccab0 Mon Sep 17 00:00:00 2001
From: Mahsa Mehrpoor
Date: Thu, 25 Jan 2024 10:36:37 +0000
Subject: [PATCH 4/5] Logistic regression to predict position

---
 .../train-model_to_predict_position.py | 66 ++++++++++++++++++
 1 file changed, 66 insertions(+)
 create mode 100644 models/ml/training_and_prediction_2/train-model_to_predict_position.py

diff --git a/models/ml/training_and_prediction_2/train-model_to_predict_position.py b/models/ml/training_and_prediction_2/train-model_to_predict_position.py
new file mode 100644
index 0000000..ac5b5a6
--- /dev/null
+++ b/models/ml/training_and_prediction_2/train-model_to_predict_position.py
@@ -0,0 +1,66 @@
+import snowflake.snowpark.functions as F
+from sklearn.model_selection import train_test_split
+import pandas as pd
+from sklearn.metrics import confusion_matrix, balanced_accuracy_score
+import io
+from sklearn.linear_model import LogisticRegression
+import joblib
+import logging
+import sys
+
+logger = logging.getLogger("mylog")
+
+def save_file(session, model, path, dest_filename):
+    input_stream = io.BytesIO()
+    joblib.dump(model, input_stream)
+    session._conn.upload_stream(input_stream, path, dest_filename)
+    return "successfully created file: " + path
+
+def model(dbt, session):
+    dbt.config(
+        packages = ['numpy','scikit-learn','pandas','joblib','cachetools'],
+        materialized = "table",
+        tags = "train"
+    )
+    # Create a stage in Snowflake to save our model file
+    session.sql('create or replace stage MODELSTAGE').collect()
+
+    #session._use_scoped_temp_objects = False
+    version = "1.0"
+    logger.info('Model training version: ' + version)
+
+    # read in our training and testing upstream dataset
+    test_train_df = dbt.ref("training_testing_dataset")
+
+    # cast snowpark df to pandas df
+    test_train_pd_df = test_train_df.to_pandas()
+    target_col = "POSITION_LABEL"
+
+    # split out covariate predictors, x, from our target column position_label, y.
+    split_X = test_train_pd_df.drop([target_col], axis=1)
+    split_y = test_train_pd_df[target_col]
+
+    # Split out our training and test data into proportions
+    X_train, X_test, y_train, y_test = train_test_split(split_X, split_y, train_size=0.7, random_state=42)
+    train = [X_train, y_train]
+    test = [X_test, y_test]
+    # now we are only training our one model to deploy
+    # we are keeping the focus on the workflows and not algorithms for this lab!
+    model = LogisticRegression()
+
+    # fit the model
+    model.fit(X_train, y_train)
+    y_pred = model.predict_proba(X_test)[:,1]
+    predictions = [round(value) for value in y_pred]
+    balanced_accuracy = balanced_accuracy_score(y_test, predictions)
+
+    # Save the model to a stage
+    save_file(session, model, "@MODELSTAGE/driver_position_"+version, "driver_position_"+version+".joblib")
+    logger.info('Model artifact: ' + "@MODELSTAGE/driver_position_"+version+".joblib")
+
+    # Take our pandas training and testing dataframes and put them back into snowpark dataframes
+    snowpark_train_df = session.write_pandas(pd.concat(train, axis=1, join='inner'), "train_table", auto_create_table=True, create_temp_table=True)
+    snowpark_test_df = session.write_pandas(pd.concat(test, axis=1, join='inner'), "test_table", auto_create_table=True, create_temp_table=True)
+
+    # Union our training and testing data together and add a column indicating train vs test rows
+    return snowpark_train_df.with_column("DATASET_TYPE", F.lit("train")).union(snowpark_test_df.with_column("DATASET_TYPE", F.lit("test")))
\ No newline at end of file

From 0fa1deb5ee5a2367b8d3905a6e2170d3ba15c040 Mon Sep 17 00:00:00 2001
From: Mahsa Mehrpoor
Date: Thu, 25 Jan 2024 10:43:55 +0000
Subject: [PATCH 5/5] Removed duplicated file

---
 .../train-model_to_predict_position.py | 66 ------------------
 1 file changed, 66 deletions(-)
 delete mode 100644 models/ml/training_and_prediction_2/train-model_to_predict_position.py

diff --git a/models/ml/training_and_prediction_2/train-model_to_predict_position.py b/models/ml/training_and_prediction_2/train-model_to_predict_position.py
deleted file mode 100644
index ac5b5a6..0000000
--- a/models/ml/training_and_prediction_2/train-model_to_predict_position.py
+++ /dev/null
@@ -1,66 +0,0 @@
-import snowflake.snowpark.functions as F
-from sklearn.model_selection import train_test_split
-import pandas as pd
-from sklearn.metrics import confusion_matrix, balanced_accuracy_score
-import io
-from sklearn.linear_model import LogisticRegression
-import joblib
-import logging
-import sys
-
-logger = logging.getLogger("mylog")
-
-def save_file(session, model, path, dest_filename):
-    input_stream = io.BytesIO()
-    joblib.dump(model, input_stream)
-    session._conn.upload_stream(input_stream, path, dest_filename)
-    return "successfully created file: " + path
-
-def model(dbt, session):
-    dbt.config(
-        packages = ['numpy','scikit-learn','pandas','joblib','cachetools'],
-        materialized = "table",
-        tags = "train"
-    )
-    # Create a stage in Snowflake to save our model file
-    session.sql('create or replace stage MODELSTAGE').collect()
-
-    #session._use_scoped_temp_objects = False
-    version = "1.0"
-    logger.info('Model training version: ' + version)
-
-    # read in our training and testing upstream dataset
-    test_train_df = dbt.ref("training_testing_dataset")
-
-    # cast snowpark df to pandas df
-    test_train_pd_df = test_train_df.to_pandas()
-    target_col = "POSITION_LABEL"
-
-    # split out covariate predictors, x, from our target column position_label, y.
-    split_X = test_train_pd_df.drop([target_col], axis=1)
-    split_y = test_train_pd_df[target_col]
-
-    # Split out our training and test data into proportions
-    X_train, X_test, y_train, y_test = train_test_split(split_X, split_y, train_size=0.7, random_state=42)
-    train = [X_train, y_train]
-    test = [X_test, y_test]
-    # now we are only training our one model to deploy
-    # we are keeping the focus on the workflows and not algorithms for this lab!
-    model = LogisticRegression()
-
-    # fit the model
-    model.fit(X_train, y_train)
-    y_pred = model.predict_proba(X_test)[:,1]
-    predictions = [round(value) for value in y_pred]
-    balanced_accuracy = balanced_accuracy_score(y_test, predictions)
-
-    # Save the model to a stage
-    save_file(session, model, "@MODELSTAGE/driver_position_"+version, "driver_position_"+version+".joblib")
-    logger.info('Model artifact: ' + "@MODELSTAGE/driver_position_"+version+".joblib")
-
-    # Take our pandas training and testing dataframes and put them back into snowpark dataframes
-    snowpark_train_df = session.write_pandas(pd.concat(train, axis=1, join='inner'), "train_table", auto_create_table=True, create_temp_table=True)
-    snowpark_test_df = session.write_pandas(pd.concat(test, axis=1, join='inner'), "test_table", auto_create_table=True, create_temp_table=True)
-
-    # Union our training and testing data together and add a column indicating train vs test rows
-    return snowpark_train_df.with_column("DATASET_TYPE", F.lit("train")).union(snowpark_test_df.with_column("DATASET_TYPE", F.lit("test")))
\ No newline at end of file
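
For reference, the aggregation introduced in agg_lap_times_moving_avg_2.py (patch 3) can be sanity-checked outside dbt with plain pandas. The sketch below is illustrative only: the lap-time values are made up and the script is not part of the patch series; it merely mirrors the model's groupby and rolling-window steps.

    import pandas as pd

    # made-up lap times in milliseconds, keyed by race year
    lap_times = pd.DataFrame({
        "RACE_YEAR": [2015, 2015, 2016, 2017, 2018, 2019, 2019, 2020],
        "LAP_TIME_MILLISECONDS": [92000, 94000, 91000, 90500, 89800, 90100, 88900, 89500],
    })

    # same transformation steps as the dbt Python model
    lap_times["LAP_TIME_SECONDS"] = lap_times["LAP_TIME_MILLISECONDS"] / 1000
    lap_time_trends = lap_times.groupby(by="RACE_YEAR")["LAP_TIME_SECONDS"].mean().to_frame()
    lap_time_trends.reset_index(inplace=True)
    lap_time_trends["LAP_MOVING_AVG_5_YEARS"] = lap_time_trends["LAP_TIME_SECONDS"].rolling(5).mean()
    lap_time_trends.columns = lap_time_trends.columns.str.upper()

    # the first four rows have NaN in the moving-average column,
    # as expected for rolling(5) before a full 5-year window exists
    print(lap_time_trends.round(1))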