Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 17 additions & 57 deletions models/ml/training_and_prediction/apply_prediction_to_position.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,16 @@
import pandas as pd
import os
from snowflake.snowpark import types as T
from snowflake.ml.registry import Registry
from snowflake.snowpark.functions import col

DB_STAGE = 'MODELSTAGE'
version = '1.0'
# The name of the model file
model_file_path = 'driver_position_'+version
model_file_packaged = 'driver_position_'+version+'.joblib'

# This is a local directory, used for storing the various artifacts locally
LOCAL_TEMP_DIR = f'/tmp/driver_position'
DOWNLOAD_DIR = os.path.join(LOCAL_TEMP_DIR, 'download')
TARGET_MODEL_DIR_PATH = os.path.join(LOCAL_TEMP_DIR, 'ml_model')
TARGET_LIB_PATH = os.path.join(LOCAL_TEMP_DIR, 'lib')
# DB_STAGE = 'MODELSTAGE'
# version = '1.0'
# # The name of the model file


model_name = 'driver_position'

# The feature columns that were used during model training
# and that will be used during prediction
Expand All @@ -29,64 +27,26 @@
,"CONSTRUCTOR_RELAIBLITY"
,"TOTAL_PIT_STOPS_PER_RACE"]

def register_udf_for_prediction(p_predictor ,p_session ,p_dbt):
    """Build and register the Snowpark UDF that scores feature rows with *p_predictor*.

    Returns the registered UDF handle so callers can apply it column-wise.
    """

    def predict_position(p_df: T.PandasDataFrame[int, int, int, int,
        int, int, int, int, int]) -> T.PandasSeries[int]:
        # Snowpark delivers the batch with positional column names (0, 1, 2, ...),
        # so restore the feature names the model was trained against first.
        p_df.columns = [*FEATURE_COLS]
        # predict() yields an array; hand it back as a pandas Series.
        return pd.Series(p_predictor.predict(p_df))

    # Ship the same package list dbt was configured with alongside the UDF.
    return p_session.udf.register(
        predict_position,
        name=f'predict_position',
        packages=p_dbt.config.get('packages'),
    )

def download_models_and_libs_from_stage(p_session):
    """Fetch the packaged model artifact from the Snowflake stage into DOWNLOAD_DIR."""
    stage_artifact = f'@{DB_STAGE}/{model_file_path}/{model_file_packaged}'
    p_session.file.get(stage_artifact, DOWNLOAD_DIR)

def load_model(p_session):
    """Deserialize the downloaded joblib artifact and return the fitted predictor."""
    return joblib.load(os.path.join(DOWNLOAD_DIR, model_file_packaged))

# -------------------------------
# -------------------------------
def model(dbt, session):
    # NOTE(review): this block reads like a unified-diff overlay — it contains BOTH
    # the old path (stage download + joblib load + custom UDF) and the new path
    # (snowflake-ml model registry). `packages` is passed twice to dbt.config
    # (a SyntaxError if run as-is) and the first `return` makes the registry code
    # unreachable. Keep exactly one of the two paths before running — confirm
    # against the actual merged file.
    dbt.config(
        packages = ['snowflake-snowpark-python' ,'scipy','scikit-learn' ,'pandas' ,'numpy'],
        packages = ['snowflake-snowpark-python' ,'scipy','scikit-learn' ,'pandas' ,'numpy','snowflake-ml-python'],
        materialized = "table",
        tags = "predict",
        use_anonymous_sproc=True
    )
    # Old path: pull the joblib artifact from the stage and wrap it in a Snowpark UDF.
    session._use_scoped_temp_objects = False
    download_models_and_libs_from_stage(session)
    predictor = load_model(session)
    predict_position_udf = register_udf_for_prediction(predictor, session ,dbt)

    # Retrieve the data, and perform the prediction
    hold_out_df = (dbt.ref("hold_out_dataset_for_prediction")
        .select(*FEATURE_COLS)
    )
    trained_model_file = dbt.ref("train_model_to_predict_position")

    # Perform prediction.
    new_predictions_df = hold_out_df.withColumn("position_predicted"
        ,predict_position_udf(*FEATURE_COLS)
    )

    return new_predictions_df
    # New path (unreachable as written): score through the registry's default version.
    reg = Registry(session=session, database_name=dbt.this.database, schema_name=dbt.this.schema)

    m = reg.get_model(model_name)
    mv = m.default
    new_predictions_df = mv.run(hold_out_df, function_name="predict")
    # assumes the registered model emits its prediction as "output_feature_0" — TODO confirm
    renamed_df = new_predictions_df.rename(col("output_feature_0"), "position_predicted")

    return renamed_df
Original file line number Diff line number Diff line change
Expand Up @@ -9,28 +9,47 @@
import logging
import sys
from joblib import dump, load
from snowflake.ml.registry import Registry

logger = logging.getLogger("mylog")

def save_file(session, model, path, dest_filename):
    """Serialize *model* with joblib and upload it to the stage *path* as *dest_filename*."""
    buffer = io.BytesIO()
    joblib.dump(model, buffer)
    # upload_stream pushes the in-memory bytes straight to the stage — no temp file.
    session._conn.upload_stream(buffer, path, dest_filename)
    return "successfully created file: " + path
#credit: code borrowed from https://medium.com/snowflake/getting-started-with-snowpark-model-registry-131e5a2783c4
def get_next_version(reg, model_name) -> str:
    """Return the next free version label ("V_<n>") for *model_name* in the registry.

    Falls back to "V_1" when the registry is empty or the model has never been
    logged. The registry's "versions" cell is a stringified Python list, hence
    the ast.literal_eval.
    """
    models = reg.show_models()
    if models.empty or model_name not in models["name"].to_list():
        return "V_1"
    versions = ast.literal_eval(
        models.loc[models["name"] == model_name, "versions"].values[0]
    )
    # Compare numerically: a plain max() over the strings is lexicographic and
    # would rank "V_9" above "V_10", re-issuing an existing version number.
    max_version = max(int(v.split("_")[-1]) for v in versions)
    return f"V_{max_version + 1}"

def register_model(session, reg, model_name, model, sample_input_data):
    """Log *model* into the Snowflake model registry under the next free version name."""
    next_version = get_next_version(reg, model_name)
    reg.log_model(
        model=model,
        model_name=model_name,
        version_name=next_version,
        sample_input_data=sample_input_data,
    )
    return "successfully registered model: " + model_name

def model(dbt, session):
dbt.config(
packages = ['numpy','scikit-learn','pandas','numpy','joblib','cachetools'],
packages = ['numpy','scikit-learn','pandas','numpy','joblib','cachetools','snowflake-ml-python'],
materialized = "table",
tags = "train"
)
# Create a stage in Snowflake to save our model file
session.sql('create or replace stage MODELSTAGE').collect()
#session.sql('create or replace stage MODELSTAGE').collect()

#session._use_scoped_temp_objects = False
version = "1.0"
logger.info('Model training version: ' + version)

reg = Registry(session=session, database_name=dbt.this.database, schema_name=dbt.this.schema)


# read in our training and testing upstream dataset
test_train_df = dbt.ref("training_testing_dataset")

Expand All @@ -56,13 +75,16 @@ def model(dbt, session):
predictions = [round(value) for value in y_pred]
balanced_accuracy = balanced_accuracy_score(y_test, predictions)

# Save the model to a stage
save_file(session, model, "@MODELSTAGE/driver_position_"+version, "driver_position_"+version+".joblib" )
logger.info('Model artifact:' + "@MODELSTAGE/driver_position_"+version+".joblib")


# Take our pandas training and testing dataframes and put them back into snowpark dataframes
snowpark_train_df = session.write_pandas(pd.concat(train, axis=1, join='inner'), "train_table", auto_create_table=True, create_temp_table=True)
snowpark_test_df = session.write_pandas(pd.concat(test, axis=1, join='inner'), "test_table", auto_create_table=True, create_temp_table=True)

# Save the model to a stage
#save_file(session, model, "@MODELSTAGE/driver_position_"+version, "driver_position_"+version+".joblib" )
model_name = 'driver_position'
register_model(session, reg, model_name, model, sample_input_data =split_X )
logger.info('Model name: ' + model_name)

# Union our training and testing data together and add a column indicating train vs test rows
return snowpark_train_df.with_column("DATASET_TYPE", F.lit("train")).union(snowpark_test_df.with_column("DATASET_TYPE", F.lit("test")))
8 changes: 7 additions & 1 deletion packages.yml
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
packages:
  - package: dbt-labs/dbt_utils
    version: 1.0.0
  - package: calogica/dbt_expectations
    version: 0.10.2
  - package: dbt-labs/audit_helper
    version: 0.12.0
  - package: dbt-labs/dbt_project_evaluator
    version: 0.12.0