diff --git a/models/ml/training_and_prediction/apply_prediction_to_position.py b/models/ml/training_and_prediction/apply_prediction_to_position.py
index 2e6bd00..a6e0e03 100644
--- a/models/ml/training_and_prediction/apply_prediction_to_position.py
+++ b/models/ml/training_and_prediction/apply_prediction_to_position.py
@@ -3,18 +3,16 @@
 import pandas as pd
 import os
 from snowflake.snowpark import types as T
+from snowflake.ml.registry import Registry
+from snowflake.snowpark.functions import col
-DB_STAGE = 'MODELSTAGE'
-version = '1.0'
-# The name of the model file
-model_file_path = 'driver_position_'+version
-model_file_packaged = 'driver_position_'+version+'.joblib'
-# This is a local directory, used for storing the various artifacts locally
-LOCAL_TEMP_DIR = f'/tmp/driver_position'
-DOWNLOAD_DIR = os.path.join(LOCAL_TEMP_DIR, 'download')
-TARGET_MODEL_DIR_PATH = os.path.join(LOCAL_TEMP_DIR, 'ml_model')
-TARGET_LIB_PATH = os.path.join(LOCAL_TEMP_DIR, 'lib')
+# DB_STAGE = 'MODELSTAGE'
+# version = '1.0'
+# # The name of the model file
+
+
+model_name = 'driver_position'
 
 
 
 # The feature columns that were used during model training
 # and that will be used during prediction
@@ -29,64 +27,26 @@
 ,"CONSTRUCTOR_RELAIBLITY"
 ,"TOTAL_PIT_STOPS_PER_RACE"]
 
-def register_udf_for_prediction(p_predictor ,p_session ,p_dbt):
-
-    # The prediction udf
-
-    def predict_position(p_df: T.PandasDataFrame[int, int, int, int,
-                                        int, int, int, int, int]) -> T.PandasSeries[int]:
-        # Snowpark currently does not set the column name in the input dataframe
-        # The default col names are like 0,1,2,... Hence we need to reset the column
-        # names to the features that we initially used for training.
-        p_df.columns = [*FEATURE_COLS]
-
-        # Perform prediction. this returns an array object
-        pred_array = p_predictor.predict(p_df)
-        # Convert to series
-        df_predicted = pd.Series(pred_array)
-        return df_predicted
-
-    # The list of packages that will be used by UDF
-    udf_packages = p_dbt.config.get('packages')
-
-    predict_position_udf = p_session.udf.register(
-        predict_position
-        ,name=f'predict_position'
-        ,packages = udf_packages
-    )
-    return predict_position_udf
-
-def download_models_and_libs_from_stage(p_session):
-    p_session.file.get(f'@{DB_STAGE}/{model_file_path}/{model_file_packaged}', DOWNLOAD_DIR)
-
-def load_model(p_session):
-    # Load the model and initialize the predictor
-    model_fl_path = os.path.join(DOWNLOAD_DIR, model_file_packaged)
-    predictor = joblib.load(model_fl_path)
-    return predictor
 
 
 # -------------------------------
 def model(dbt, session):
     dbt.config(
-        packages = ['snowflake-snowpark-python' ,'scipy','scikit-learn' ,'pandas' ,'numpy'],
+        packages = ['snowflake-snowpark-python' ,'scipy','scikit-learn' ,'pandas' ,'numpy','snowflake-ml-python'],
         materialized = "table",
         tags = "predict",
         use_anonymous_sproc=True
     )
-    session._use_scoped_temp_objects = False
-    download_models_and_libs_from_stage(session)
-    predictor = load_model(session)
-    predict_position_udf = register_udf_for_prediction(predictor, session ,dbt)
 
     # Retrieve the data, and perform the prediction
     hold_out_df = (dbt.ref("hold_out_dataset_for_prediction")
                         .select(*FEATURE_COLS)
                     )
-    trained_model_file = dbt.ref("train_model_to_predict_position")
 
-    # Perform prediction.
-    new_predictions_df = hold_out_df.withColumn("position_predicted"
-                                    ,predict_position_udf(*FEATURE_COLS)
-                                    )
-
-    return new_predictions_df
\ No newline at end of file
+    reg = Registry(session=session, database_name=dbt.this.database, schema_name=dbt.this.schema)
+
+    m = reg.get_model(model_name)
+    mv = m.default
+    new_predictions_df = mv.run(hold_out_df, function_name="predict")
+    renamed_df = new_predictions_df.rename(col("output_feature_0"), "position_predicted")
+
+    return renamed_df
\ No newline at end of file
diff --git a/models/ml/training_and_prediction/train_model_to_predict_position.py b/models/ml/training_and_prediction/train_model_to_predict_position.py
index b1a3fec..4674407 100644
--- a/models/ml/training_and_prediction/train_model_to_predict_position.py
+++ b/models/ml/training_and_prediction/train_model_to_predict_position.py
@@ -9,28 +9,48 @@
 import logging
 import sys
 from joblib import dump, load
+import ast
+from snowflake.ml.registry import Registry
 
 logger = logging.getLogger("mylog")
 
-def save_file(session, model, path, dest_filename):
-    input_stream = io.BytesIO()
-    joblib.dump(model, input_stream)
-    session._conn.upload_stream(input_stream, path, dest_filename)
-    return "successfully created file: " + path
+#credit: code borrowed from https://medium.com/snowflake/getting-started-with-snowpark-model-registry-131e5a2783c4
+def get_next_version(reg, model_name) -> str:
+    models = reg.show_models()
+    if models.empty:
+        return "V_1"
+    elif model_name not in models["name"].to_list():
+        return "V_1"
+    max_version = max(
+        ast.literal_eval(models.loc[models["name"] == model_name, "versions"].values[0])
+    )
+    return f"V_{int(max_version.split('_')[-1]) + 1}"
+
+def register_model(session, reg, model_name, model, sample_input_data):
+    reg.log_model(
+        model_name=model_name,
+        version_name=get_next_version(reg, model_name),
+        model=model,
+        sample_input_data = sample_input_data
+    )
+    return "successfully registered model: " + model_name
 
 def model(dbt, session):
     dbt.config(
-        packages = ['numpy','scikit-learn','pandas','numpy','joblib','cachetools'],
+        packages = ['numpy','scikit-learn','pandas','numpy','joblib','cachetools','snowflake-ml-python'],
         materialized = "table",
         tags = "train"
     )
 
     # Create a stage in Snowflake to save our model file
-    session.sql('create or replace stage MODELSTAGE').collect()
+    #session.sql('create or replace stage MODELSTAGE').collect()
 
     #session._use_scoped_temp_objects = False
     version = "1.0"
     logger.info('Model training version: ' + version)
 
+    reg = Registry(session=session, database_name=dbt.this.database, schema_name=dbt.this.schema)
+
+
     # read in our training and testing upstream dataset
     test_train_df = dbt.ref("training_testing_dataset")
@@ -56,13 +76,16 @@ def model(dbt, session):
     predictions = [round(value) for value in y_pred]
     balanced_accuracy =  balanced_accuracy_score(y_test, predictions)
 
-    # Save the model to a stage
-    save_file(session, model, "@MODELSTAGE/driver_position_"+version, "driver_position_"+version+".joblib" )
-    logger.info('Model artifact:' + "@MODELSTAGE/driver_position_"+version+".joblib")
-
+    
     # Take our pandas training and testing dataframes and put them back into snowpark dataframes
     snowpark_train_df = session.write_pandas(pd.concat(train, axis=1, join='inner'), "train_table", auto_create_table=True, create_temp_table=True)
     snowpark_test_df = session.write_pandas(pd.concat(test, axis=1, join='inner'), "test_table", auto_create_table=True, create_temp_table=True)
+
+    # Save the model to a stage
+    #save_file(session, model, "@MODELSTAGE/driver_position_"+version, "driver_position_"+version+".joblib" )
+    model_name = 'driver_position'
+    register_model(session, reg, model_name, model, sample_input_data =split_X )
+    logger.info('Model name: ' + model_name)
 
     # Union our training and testing data together and add a column indicating train vs test rows
     return snowpark_train_df.with_column("DATASET_TYPE", F.lit("train")).union(snowpark_test_df.with_column("DATASET_TYPE", F.lit("test")))
\ No newline at end of file
diff --git a/packages.yml b/packages.yml
index d7c5292..cef9b1f 100644
--- a/packages.yml
+++ b/packages.yml
@@ -1,3 +1,9 @@
 packages:
   - package: dbt-labs/dbt_utils
-    version: 1.0.0
\ No newline at end of file
+    version: 1.0.0
+  - package: calogica/dbt_expectations
+    version: 0.10.2
+  - package: dbt-labs/audit_helper
+    version: 0.12.0
+  - package: dbt-labs/dbt_project_evaluator
+    version: 0.12.0