diff --git a/models/core/fct_lap_times.sql b/models/core/fct_lap_times.sql
new file mode 100644
index 0000000..37d62b0
--- /dev/null
+++ b/models/core/fct_lap_times.sql
@@ -0,0 +1,13 @@
+with lap_times as (
+    select
+        {{ dbt_utils.generate_surrogate_key(['race_id', 'driver_id', 'lap']) }} as lap_times_id,
+        race_id,
+        driver_id,
+        lap,
+        driver_position,
+        lap_time_formatted,
+        official_laptime,
+        lap_time_milliseconds
+    from {{ ref('stg_lap_times') }}
+)
+select * from lap_times
\ No newline at end of file
diff --git a/models/marts/aggregates/agg_lap_times_moving_avg.py b/models/marts/aggregates/agg_lap_times_moving_avg.py
new file mode 100644
index 0000000..b1dc3ac
--- /dev/null
+++ b/models/marts/aggregates/agg_lap_times_moving_avg.py
@@ -0,0 +1,17 @@
+import pandas as pd
+
+def model(dbt, session):
+    # dbt configuration
+    dbt.config(packages=["pandas"])
+
+    # get upstream data
+    lap_times = dbt.ref("mrt_lap_times_years").to_pandas()
+
+    # convert lap times to seconds, average them per race year, then add a 5-year moving average
+    lap_times["LAP_TIME_SECONDS"] = lap_times["LAP_TIME_MILLISECONDS"] / 1000
+    lap_time_trends = lap_times.groupby(by="RACE_YEAR")["LAP_TIME_SECONDS"].mean().to_frame()
+    lap_time_trends.reset_index(inplace=True)
+    lap_time_trends["LAP_MOVING_AVG_5_YEARS"] = lap_time_trends["LAP_TIME_SECONDS"].rolling(5).mean()
+    lap_time_trends.columns = lap_time_trends.columns.str.upper()
+
+    return lap_time_trends.round(1)
\ No newline at end of file
diff --git a/models/marts/mrt_lap_times_years.sql b/models/marts/mrt_lap_times_years.sql
new file mode 100644
index 0000000..b35db7e
--- /dev/null
+++ b/models/marts/mrt_lap_times_years.sql
@@ -0,0 +1,19 @@
+with lap_times as (
+    select * from {{ ref('fct_lap_times') }}
+),
+races as (
+    select * from {{ ref('dim_races') }}
+),
+expanded_lap_times_by_year as (
+    select
+        lap_times.race_id,
+        driver_id,
+        race_year,
+        lap,
+        lap_time_milliseconds
+    from lap_times
+    left join races
+        on lap_times.race_id = races.race_id
+    where lap_time_milliseconds is not null
+)
+select * from expanded_lap_times_by_year
\ No newline at end of file
diff --git a/models/ml/prep_encoding_splitting/covariate_encoding.py b/models/ml/prep_encoding_splitting/covariate_encoding.py
index 3b44892..c42fbde 100644
--- a/models/ml/prep_encoding_splitting/covariate_encoding.py
+++ b/models/ml/prep_encoding_splitting/covariate_encoding.py
@@ -5,7 +5,7 @@
 
 def model(dbt, session):
     # dbt configuration
-    dbt.config(packages=["pandas","numpy","scikit-learn"])
+    dbt.config(packages=["pandas==1.5.3","numpy","scikit-learn"])
 
     # get upstream data
     data = dbt.ref("ml_data_prep").to_pandas()
diff --git a/models/ml/prep_encoding_splitting/ml_data_prep.py b/models/ml/prep_encoding_splitting/ml_data_prep.py
index 29f2eaf..1f9ef5b 100644
--- a/models/ml/prep_encoding_splitting/ml_data_prep.py
+++ b/models/ml/prep_encoding_splitting/ml_data_prep.py
@@ -2,7 +2,7 @@
 
 def model(dbt, session):
     # dbt configuration
-    dbt.config(packages=["pandas"])
+    dbt.config(packages=["pandas==1.5.3"])
 
     # get upstream data
     fct_results = dbt.ref("mrt_results_circuits").to_pandas()
diff --git a/models/ml/training_and_predictions/apply_prediction_to_position.py b/models/ml/training_and_predictions/apply_prediction_to_position.py
new file mode 100644
index 0000000..dbbbb1f
--- /dev/null
+++ b/models/ml/training_and_predictions/apply_prediction_to_position.py
@@ -0,0 +1,98 @@
+import joblib
+import os
+import pandas as pd
+from snowflake.snowpark import types as T
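+
+# This model downloads the classifier trained in train_model_to_predict_position
+# from the MODELSTAGE stage, wraps it in a UDF, and applies it to the
+# hold-out dataset to predict each driver's finishing position.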
+
+DB_STAGE = 'MODELSTAGE'
+version = '1.0'
+# The stage subdirectory and the packaged file name for the model artifact
+model_file_path = 'driver_position_' + version
+model_file_packaged = 'driver_position_' + version + '.joblib'
+
+# A local scratch directory for storing the downloaded artifacts
+LOCAL_TEMP_DIR = '/tmp/driver_position'
+DOWNLOAD_DIR = os.path.join(LOCAL_TEMP_DIR, 'download')
+TARGET_MODEL_DIR_PATH = os.path.join(LOCAL_TEMP_DIR, 'ml_model')
+TARGET_LIB_PATH = os.path.join(LOCAL_TEMP_DIR, 'lib')
+
+# The feature columns that were used during model training
+# and that will be used during prediction
+FEATURE_COLS = [
+    "RACE_YEAR",
+    "RACE_NAME",
+    "GRID",
+    "CONSTRUCTOR_NAME",
+    "DRIVER",
+    "DRIVERS_AGE_YEARS",
+    "DRIVER_CONFIDENCE",
+    "CONSTRUCTOR_RELAIBLITY",  # spelling mirrors the upstream column name
+    "TOTAL_PIT_STOPS_PER_RACE",
+]
+
+def register_udf_for_prediction(p_predictor, p_session, p_dbt):
+
+    # The prediction UDF; the pandas type hints make Snowpark register it
+    # as a vectorized UDF that receives rows in batches
+    def predict_position(p_df: T.PandasDataFrame[int, int, int, int,
+                                                 int, int, int, int, int]) -> T.PandasSeries[int]:
+        # Snowpark does not set column names on the input dataframe; the
+        # default names are 0, 1, 2, ... so reset them to the feature names
+        # that were used during training.
+        p_df.columns = [*FEATURE_COLS]
+
+        # Perform the prediction; this returns a numpy array
+        pred_array = p_predictor.predict(p_df)
+        # Convert it to a series
+        df_predicted = pd.Series(pred_array)
+        return df_predicted
+
+    # Reuse the package list from dbt.config for the UDF
+    udf_packages = p_dbt.config.get('packages')
+
+    predict_position_udf = p_session.udf.register(
+        predict_position,
+        name='predict_position',
+        packages=udf_packages,
+    )
+    return predict_position_udf
+
+def download_models_and_libs_from_stage(p_session):
+    p_session.file.get(f'@{DB_STAGE}/{model_file_path}/{model_file_packaged}', DOWNLOAD_DIR)
+
+def load_model(p_session):
+    # Load the model and initialize the predictor
+    model_fl_path = os.path.join(DOWNLOAD_DIR, model_file_packaged)
+    predictor = joblib.load(model_fl_path)
+    return predictor
+
+# -------------------------------
+def model(dbt, session):
+    dbt.config(
+        packages=['snowflake-snowpark-python', 'scipy', 'scikit-learn', 'pandas', 'numpy'],
+        materialized="table",
+        tags="predict",
+        use_anonymous_sproc=True
+    )
+    session._use_scoped_temp_objects = False
+    download_models_and_libs_from_stage(session)
+    predictor = load_model(session)
+    predict_position_udf = register_udf_for_prediction(predictor, session, dbt)
+
+    # Retrieve the hold-out data for prediction
+    hold_out_df = (dbt.ref("hold_out_dataset_for_prediction")
+                   .select(*FEATURE_COLS)
+                   )
+    # Referenced only so dbt orders the training model before this one
+    trained_model_file = dbt.ref("train_model_to_predict_position")
+
+    # Perform the prediction by applying the UDF to the feature columns.
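+    # Snowflake executes the vectorized UDF over batches of rows inside the
+    # warehouse and returns the prediction as a new column.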
+    new_predictions_df = hold_out_df.withColumn("position_predicted",
+                                                predict_position_udf(*FEATURE_COLS))
+
+    return new_predictions_df
\ No newline at end of file
diff --git a/models/ml/training_and_predictions/train_model_to_predict_position.py b/models/ml/training_and_predictions/train_model_to_predict_position.py
new file mode 100644
index 0000000..ac5b5a6
--- /dev/null
+++ b/models/ml/training_and_predictions/train_model_to_predict_position.py
@@ -0,0 +1,69 @@
+import io
+import logging
+
+import joblib
+import pandas as pd
+import snowflake.snowpark.functions as F
+from sklearn.linear_model import LogisticRegression
+from sklearn.metrics import balanced_accuracy_score
+from sklearn.model_selection import train_test_split
+
+logger = logging.getLogger("mylog")
+
+def save_file(session, model, path, dest_filename):
+    # Serialize the model into an in-memory buffer and upload it to the stage
+    input_stream = io.BytesIO()
+    joblib.dump(model, input_stream)
+    session._conn.upload_stream(input_stream, path, dest_filename)
+    return "successfully created file: " + path
+
+def model(dbt, session):
+    dbt.config(
+        packages=['numpy', 'scikit-learn', 'pandas', 'joblib', 'cachetools'],
+        materialized="table",
+        tags="train"
+    )
+    # Create a stage in Snowflake to save our model file
+    session.sql('create or replace stage MODELSTAGE').collect()
+
+    version = "1.0"
+    logger.info('Model training version: ' + version)
+
+    # read in our training and testing upstream dataset
+    test_train_df = dbt.ref("training_testing_dataset")
+
+    # cast the Snowpark dataframe to a pandas dataframe
+    test_train_pd_df = test_train_df.to_pandas()
+    target_col = "POSITION_LABEL"
+
+    # split the covariate predictors, X, from the target column, y
+    split_X = test_train_pd_df.drop([target_col], axis=1)
+    split_y = test_train_pd_df[target_col]
+
+    # split into 70% training and 30% test data
+    X_train, X_test, y_train, y_test = train_test_split(split_X, split_y, train_size=0.7, random_state=42)
+    train = [X_train, y_train]
+    test = [X_test, y_test]
+    # we train only one model to deploy, keeping the focus of this lab
+    # on the workflow rather than on algorithms!
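+    # POSITION_LABEL is a binary target, so a plain logistic regression is
+    # sufficient; predict_proba()[:, 1] below is the positive-class probability.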
+    model = LogisticRegression()
+
+    # fit the model
+    model.fit(X_train, y_train)
+    y_pred = model.predict_proba(X_test)[:, 1]
+    predictions = [round(value) for value in y_pred]
+    balanced_accuracy = balanced_accuracy_score(y_test, predictions)
+    logger.info('Balanced accuracy on the test split: ' + str(balanced_accuracy))
+
+    # Save the model to the stage
+    save_file(session, model, "@MODELSTAGE/driver_position_" + version, "driver_position_" + version + ".joblib")
+    logger.info('Model artifact: ' + "@MODELSTAGE/driver_position_" + version + "/driver_position_" + version + ".joblib")
+
+    # Put the pandas training and testing dataframes back into Snowpark dataframes
+    snowpark_train_df = session.write_pandas(pd.concat(train, axis=1, join='inner'), "train_table", auto_create_table=True, create_temp_table=True)
+    snowpark_test_df = session.write_pandas(pd.concat(test, axis=1, join='inner'), "test_table", auto_create_table=True, create_temp_table=True)
+
+    # Union the training and testing data, flagging train vs test rows
+    return snowpark_train_df.with_column("DATASET_TYPE", F.lit("train")).union(snowpark_test_df.with_column("DATASET_TYPE", F.lit("test")))
\ No newline at end of file
diff --git a/models/staging/display_moving_avg_laptimes.sql b/models/staging/display_moving_avg_laptimes.sql
new file mode 100644
index 0000000..53cbae9
--- /dev/null
+++ b/models/staging/display_moving_avg_laptimes.sql
@@ -0,0 +1,3 @@
+-- display the predicted finishing positions, best first
+
+select * from {{ ref('apply_prediction_to_position') }} order by position_predicted
diff --git a/package-lock.yml b/package-lock.yml
new file mode 100644
index 0000000..c73bffd
--- /dev/null
+++ b/package-lock.yml
@@ -0,0 +1,4 @@
+packages:
+- package: dbt-labs/dbt_utils
+  version: 1.0.0
+sha1_hash: efa9169fb1f1a1b2c967378c02b60e3d85ae464b
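
Note: with dbt_utils pinned in package-lock.yml, the whole pipeline can be built end to end from the dbt CLI (a minimal sketch, assuming a configured Snowflake profile; the model name comes from the files in this diff):

    dbt deps
    dbt build --select +display_moving_avg_laptimes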