diff --git a/site/package-lock.json b/site/package-lock.json index 192c934d80..d8bf6acef1 100644 --- a/site/package-lock.json +++ b/site/package-lock.json @@ -15200,4 +15200,4 @@ } } } -} \ No newline at end of file +} diff --git a/site/sfguides/src/_shared_assets/Snowflake_SwitchRole.png b/site/sfguides/src/_shared_assets/Snowflake_SwitchRole.png deleted file mode 100644 index 37f563ecbe..0000000000 Binary files a/site/sfguides/src/_shared_assets/Snowflake_SwitchRole.png and /dev/null differ diff --git a/site/sfguides/src/hpo-with-experiment-tracking/assets/hpo_example.ipynb b/site/sfguides/src/hpo-with-experiment-tracking/assets/hpo_example.ipynb new file mode 100644 index 0000000000..9d5b4ea5a7 --- /dev/null +++ b/site/sfguides/src/hpo-with-experiment-tracking/assets/hpo_example.ipynb @@ -0,0 +1,309 @@ +{ + "metadata": { + "language_info": { + "name": "python" + }, + "lastEditStatus": { + "notebookId": "6wwgc5yvkslbtwzqxiyl", + "authorId": "317811122459", + "authorName": "ADMIN", + "authorEmail": "marie.coolsaet@snowflake.com", + "sessionId": "ccfa6938-7d2b-4e2a-aee9-a515762cbb80", + "lastEditTime": 1762192477049 + } + }, + "nbformat_minor": 2, + "nbformat": 4, + "cells": [ + { + "cell_type": "markdown", + "metadata": { + "name": "cell1" + }, + "source": [ + "# Distributed Hyperparameter Tuning with Experiment Tracking in Snowflake\n", + "\n", + "This notebook demonstrates how to use Snowflake's ML capabilities for:\n", + "1. **Experiment Tracking** - Log parameters, metrics, and models\n", + "2. **Distributed HPO** - Parallel hyperparameter optimization at scale\n", + "3. **Container Runtime** - Leverage Snowpark Container Services for ML workloads\n", + "\n", + "We'll build a classification model using the Wine Quality dataset and optimize it using distributed hyperparameter tuning while tracking all experiments in Snowflake.\n" + ], + "id": "ce110000-1111-2222-3333-ffffff000000" + }, + { + "cell_type": "markdown", + "metadata": { + "name": "cell2", + "collapsed": false + }, + "source": "## Prerequisites\n\n- Snowflake account with a database and schema\n- CREATE EXPERIMENT privilege on your schema\n- snowflake-ml-python >= 1.9.1\n- Notebook configured for Container Runtime on SPCS (Compute Pool with instance type `CPU_X64_S`)\n", + "id": "ce110000-1111-2222-3333-ffffff000001" + }, + { + "cell_type": "markdown", + "metadata": { + "name": "cell3" + }, + "source": [ + "## Step 1: Setup and Data Loading\n" + ], + "id": "ce110000-1111-2222-3333-ffffff000002" + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "name": "cell4", + "language": "python" + }, + "outputs": [], + "source": "import pandas as pd\nimport numpy as np\nfrom datetime import datetime\nfrom sklearn.model_selection import train_test_split\nfrom sklearn.preprocessing import StandardScaler\nfrom sklearn import metrics\nfrom xgboost import XGBClassifier\n\nfrom snowflake.snowpark.context import get_active_session\nfrom snowflake.snowpark import Session\nfrom snowflake.ml.experiment.experiment_tracking import ExperimentTracking\nfrom snowflake.ml.modeling import tune\nfrom snowflake.ml.modeling.tune.search import RandomSearch, BayesOpt\nfrom snowflake.ml.data.data_connector import DataConnector\nfrom snowflake.ml.runtime_cluster import scale_cluster\n\n# Get active Snowflake session\nsession = get_active_session()\nprint(f\"Connected to Snowflake: {session.get_current_database()}.{session.get_current_schema()}\")\n\n# Create dated experiment name for tracking runs over time\nexperiment_date = 
datetime.now().strftime(\"%Y%m%d\")\nexperiment_name = f\"Wine_Quality_Classification_{experiment_date}\"\nprint(f\"\\nExperiment Name: {experiment_name}\")\n", + "id": "ce110000-1111-2222-3333-ffffff000003" + }, + { + "cell_type": "markdown", + "metadata": { + "name": "cell5" + }, + "source": [ + "### Generate Wine Quality Classification Dataset\n", + "\n", + "We'll create a synthetic dataset inspired by wine quality prediction. The goal is to classify wines as high quality (1) or standard quality (0) based on chemical properties.\n" + ], + "id": "ce110000-1111-2222-3333-ffffff000004" + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "name": "cell6", + "language": "python" + }, + "outputs": [], + "source": "# Generate synthetic wine quality dataset\nnp.random.seed(42)\nn_samples = 20000\n\n# Feature generation with realistic correlations\ndata = {\n \"FIXED_ACIDITY\": np.random.normal(7.0, 1.5, n_samples),\n \"VOLATILE_ACIDITY\": np.random.gamma(2, 0.2, n_samples),\n \"CITRIC_ACID\": np.random.beta(2, 5, n_samples),\n \"RESIDUAL_SUGAR\": np.random.lognormal(1, 0.8, n_samples),\n \"CHLORIDES\": np.random.gamma(3, 0.02, n_samples),\n \"FREE_SULFUR_DIOXIDE\": np.random.normal(30, 15, n_samples),\n \"TOTAL_SULFUR_DIOXIDE\": np.random.normal(120, 40, n_samples),\n \"DENSITY\": np.random.normal(0.997, 0.003, n_samples),\n \"PH\": np.random.normal(3.2, 0.3, n_samples),\n \"SULPHATES\": np.random.gamma(4, 0.15, n_samples),\n \"ALCOHOL\": np.random.normal(10.5, 1.5, n_samples)\n}\n\ndf = pd.DataFrame(data)\n\n# Create quality target based on feature combinations\nquality_score = (\n 0.3 * (df[\"ALCOHOL\"] - df[\"ALCOHOL\"].mean()) / df[\"ALCOHOL\"].std() +\n 0.2 * (df[\"CITRIC_ACID\"] - df[\"CITRIC_ACID\"].mean()) / df[\"CITRIC_ACID\"].std() -\n 0.25 * (df[\"VOLATILE_ACIDITY\"] - df[\"VOLATILE_ACIDITY\"].mean()) / df[\"VOLATILE_ACIDITY\"].std() +\n 0.15 * (df[\"SULPHATES\"] - df[\"SULPHATES\"].mean()) / df[\"SULPHATES\"].std() +\n np.random.normal(0, 0.3, n_samples) # Add noise\n)\n\n# Binary classification: 1 = high quality, 0 = standard quality\ndf[\"QUALITY\"] = (quality_score > quality_score.quantile(0.6)).astype(int)\n\nprint(f\"Dataset shape: {df.shape}\")\nprint(f\"\\nClass distribution:\\n{df['QUALITY'].value_counts()}\")\nprint(f\"\\nFeature statistics:\\n{df.describe()}\")\n", + "id": "ce110000-1111-2222-3333-ffffff000005" + }, + { + "cell_type": "markdown", + "metadata": { + "name": "cell7" + }, + "source": [ + "### Prepare Train/Validation/Test Splits\n" + ], + "id": "ce110000-1111-2222-3333-ffffff000006" + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "name": "cell8", + "language": "python" + }, + "outputs": [], + "source": "# Separate features and target\nX = df.drop('QUALITY', axis=1)\ny = df['QUALITY']\n\n# Create train/val/test splits\nX_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=0.15, random_state=42, stratify=y)\nX_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.18, random_state=42, stratify=y_temp)\n\n# Scale features\nscaler = StandardScaler()\nX_train_scaled = pd.DataFrame(scaler.fit_transform(X_train), columns=X_train.columns)\nX_val_scaled = pd.DataFrame(scaler.transform(X_val), columns=X_val.columns)\nX_test_scaled = pd.DataFrame(scaler.transform(X_test), columns=X_test.columns)\n\nprint(f\"Training set: {X_train_scaled.shape[0]} samples\")\nprint(f\"Validation set: {X_val_scaled.shape[0]} samples\")\nprint(f\"Test set: {X_test_scaled.shape[0]} samples\")\n", + "id": 
"ce110000-1111-2222-3333-ffffff000007" + }, + { + "cell_type": "markdown", + "metadata": { + "name": "cell9" + }, + "source": [ + "## Step 2: Baseline Model with Experiment Tracking\n", + "\n", + "Before running distributed HPO, let's train a baseline model and log it to Snowflake Experiment Tracking.\n" + ], + "id": "ce110000-1111-2222-3333-ffffff000008" + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "name": "cell10", + "language": "python" + }, + "outputs": [], + "source": "# Initialize Experiment Tracking\nexp = ExperimentTracking(session=session)\nexp.set_experiment(experiment_name)\n\n# Note: Snowflake supports autologging for certain ML frameworks, but this example uses \n# explicit logging (exp.log_params, exp.log_metrics) to demonstrate a framework-agnostic \n# approach. Explicit logging works with any ML library (scikit-learn, XGBoost, PyTorch, \n# TensorFlow, custom frameworks, etc.) and gives you precise control over what gets logged, \n# without requiring integration with Snowflake's modeling APIs.\n\n# Train baseline model\nwith exp.start_run(run_name=\"baseline_xgboost\") as run:\n # Define baseline parameters\n baseline_params = {\n 'n_estimators': 100,\n 'max_depth': 6,\n 'learning_rate': 0.1,\n 'subsample': 0.8,\n 'colsample_bytree': 0.8,\n 'gamma': 0.1,\n 'min_child_weight': 8,\n 'random_state': 42,\n }\n \n # Log parameters\n exp.log_params(baseline_params)\n \n # Train model\n baseline_model = XGBClassifier(**baseline_params)\n baseline_model.fit(X_train_scaled, y_train)\n \n # Evaluate on validation set\n y_val_pred = baseline_model.predict(X_val_scaled)\n y_val_proba = baseline_model.predict_proba(X_val_scaled)[:, 1]\n \n # Calculate metrics\n val_metrics = {\n 'val_accuracy': metrics.accuracy_score(y_val, y_val_pred),\n 'val_precision': metrics.precision_score(y_val, y_val_pred),\n 'val_recall': metrics.recall_score(y_val, y_val_pred),\n 'val_f1': metrics.f1_score(y_val, y_val_pred),\n 'val_roc_auc': metrics.roc_auc_score(y_val, y_val_proba)\n }\n \n # Log metrics\n exp.log_metrics(val_metrics)\n \n print(\"Baseline Model Performance:\")\n for metric, value in val_metrics.items():\n print(f\" {metric}: {value:.4f}\")\n", + "id": "ce110000-1111-2222-3333-ffffff000009" + }, + { + "cell_type": "markdown", + "metadata": { + "name": "cell11", + "collapsed": false + }, + "source": "## Step 3: Distributed Hyperparameter Optimization\n\nNow we'll use Snowflake's distributed HPO capabilities to find optimal hyperparameters. 
The HPO workload will:\n- Scale across multiple nodes in the SPCS compute pool\n- Run trials in parallel for faster optimization\n- Automatically log all trials to Experiment Tracking\n", + "id": "ce110000-1111-2222-3333-ffffff000010" + }, + { + "cell_type": "markdown", + "metadata": { + "name": "cell12" + }, + "source": [ + "### Prepare Data Connectors\n", + "\n", + "Convert our pandas DataFrames to Snowflake DataConnectors for distributed processing.\n" + ], + "id": "ce110000-1111-2222-3333-ffffff000011" + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "name": "cell13", + "language": "python" + }, + "outputs": [], + "source": "# Combine features and target for each split\ntrain_df = pd.concat([X_train_scaled, y_train.reset_index(drop=True)], axis=1)\nval_df = pd.concat([X_val_scaled, y_val.reset_index(drop=True)], axis=1)\n\n# Create DataConnectors\ndataset_map = {\n \"train\": DataConnector.from_dataframe(session.create_dataframe(train_df)),\n \"val\": DataConnector.from_dataframe(session.create_dataframe(val_df)),\n}\n\nprint(\"Data connectors created successfully\")\n", + "id": "ce110000-1111-2222-3333-ffffff000012" + }, + { + "cell_type": "markdown", + "metadata": { + "name": "cell14" + }, + "source": [ + "### Define Training Function with Experiment Tracking\n", + "\n", + "The training function will be executed for each trial. It integrates both HPO and Experiment Tracking.\n" + ], + "id": "ce110000-1111-2222-3333-ffffff000013" + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "name": "cell15", + "language": "python" + }, + "outputs": [], + "source": "def train_function():\n \"\"\"\n Training function executed for each HPO trial.\n Integrates with both TunerContext and ExperimentTracking.\n \"\"\" \n trial_session = Session.builder.getOrCreate()\n \n # Get tuner context\n tuner_context = tune.get_tuner_context()\n params = tuner_context.get_hyper_params()\n dm = tuner_context.get_dataset_map()\n \n # Initialize experiment tracking for this trial\n exp = ExperimentTracking(session=trial_session)\n exp.set_experiment(experiment_name)\n with exp.start_run():\n # Log hyperparameters\n exp.log_params(params)\n \n # Load data\n train_data = dm[\"train\"].to_pandas()\n val_data = dm[\"val\"].to_pandas()\n \n # Separate features and target\n X_train = train_data.drop('QUALITY', axis=1)\n y_train = train_data['QUALITY']\n X_val = val_data.drop('QUALITY', axis=1)\n y_val = val_data['QUALITY']\n \n # Train model with hyperparameters from HPO\n model = XGBClassifier(**params)\n model.fit(X_train, y_train)\n \n # Evaluate on validation set\n y_val_pred = model.predict(X_val)\n y_val_proba = model.predict_proba(X_val)[:, 1]\n \n # Calculate validation metrics\n val_metrics = {\n 'val_accuracy': metrics.accuracy_score(y_val, y_val_pred),\n 'val_precision': metrics.precision_score(y_val, y_val_pred),\n 'val_recall': metrics.recall_score(y_val, y_val_pred),\n 'val_f1': metrics.f1_score(y_val, y_val_pred),\n 'val_roc_auc': metrics.roc_auc_score(y_val, y_val_proba)\n }\n \n # Log metrics to experiment tracking\n exp.log_metrics(val_metrics)\n \n # Report to HPO framework (optimize on validation F1)\n tuner_context.report(metrics=val_metrics, model=model)\n", + "id": "ce110000-1111-2222-3333-ffffff000014" + }, + { + "cell_type": "markdown", + "metadata": { + "name": "cell16" + }, + "source": [ + "### Define Search Space\n", + "\n", + "We'll define the hyperparameter search space using Snowflake's sampling functions.\n" + ], + "id": 
"ce110000-1111-2222-3333-ffffff000015" + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "name": "cell17", + "language": "python" + }, + "outputs": [], + "source": "# Define search space for XGBoost\nsearch_space = {\n 'n_estimators': tune.randint(50, 300),\n 'max_depth': tune.randint(3, 15),\n 'learning_rate': tune.loguniform(0.01, 0.3),\n 'subsample': tune.uniform(0.5, 1.0),\n 'colsample_bytree': tune.uniform(0.5, 1.0),\n 'gamma': tune.uniform(0.0, 0.5),\n 'min_child_weight': tune.randint(1, 10),\n 'random_state': 42,\n}\n\nprint(\"Search space defined:\")\nfor param, space in search_space.items():\n print(f\" {param}: {space}\")\n", + "id": "ce110000-1111-2222-3333-ffffff000016" + }, + { + "cell_type": "markdown", + "metadata": { + "name": "cell18", + "collapsed": false + }, + "source": "### Configure and Run HPO\n\nConfigure the tuner to:\n- Maximize F1 score\n- Run 50 trials with random search\n- Execute trials in parallel across available nodes\n", + "id": "ce110000-1111-2222-3333-ffffff000017" + }, + { + "cell_type": "markdown", + "id": "dd695b29-e15f-46ba-8388-b4f5932a84ad", + "metadata": { + "name": "cell23", + "collapsed": false + }, + "source": "#### Monitor Node Activity with the Ray Dashboard\nUse the output url to access the dashboard" + }, + { + "cell_type": "code", + "id": "4736f03a-e044-4133-8a6e-7d90066fb9ed", + "metadata": { + "language": "python", + "name": "cell22" + }, + "outputs": [], + "source": "from snowflake.ml.runtime_cluster import get_ray_dashboard_url\nget_ray_dashboard_url()", + "execution_count": null + }, + { + "cell_type": "markdown", + "id": "75ffd2fe-7fbe-4e9f-b595-7e5794c7d828", + "metadata": { + "name": "cell24", + "collapsed": false + }, + "source": "#### Run HPO" + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "name": "cell19", + "language": "python" + }, + "outputs": [], + "source": "# Scale cluster for distributed processing\nprint(\"Scaling cluster for distributed HPO...\")\nscale_cluster(10) # Scale up nodes\n\n# Configure tuner\ntuner_config = tune.TunerConfig(\n metric='val_f1',\n mode='max',\n search_alg=RandomSearch(),\n num_trials=50\n)\n\n# Create tuner\ntuner = tune.Tuner(\n train_func=train_function,\n search_space=search_space,\n tuner_config=tuner_config\n)\n\nprint(\"Starting distributed hyperparameter optimization...\")\n\n# Run HPO\ntry:\n results = tuner.run(dataset_map=dataset_map)\n print(\"\\nHPO completed successfully\")\nexcept Exception as e:\n print(f\"\\nError during HPO: {e}\")\n raise\nfinally:\n # Scale cluster back down\n scale_cluster(1)\n print(\"Cluster scaled back to 1 node\")\n", + "id": "ce110000-1111-2222-3333-ffffff000018" + }, + { + "cell_type": "markdown", + "metadata": { + "name": "cell20", + "collapsed": false + }, + "source": [ + "## Step 4: Analyze Results\n" + ], + "id": "ce110000-1111-2222-3333-ffffff000019" + }, + { + "cell_type": "code", + "execution_count": null, + "metadata": { + "name": "cell21", + "language": "python" + }, + "outputs": [], + "source": "# Display all results\nprint(\"BEST MODEL FOUND\")\nprint(\"=\"*60)\n\n# Extract best hyperparameters\nprint(f\"\\nBest Parameters:\")\nbest_model = results.best_model\nparams = best_model.get_xgb_params()\nprint(params)\n\n# Compare with baseline\nbest_f1 = results.best_result['val_f1'][0]\nbaseline_f1 = val_metrics['val_f1'] # From baseline model\nimprovement = ((best_f1 - baseline_f1) / baseline_f1) * 100\n\nprint(f\"\\nPerformance Comparison:\")\nprint(f\" Baseline F1: 
{baseline_f1:.4f}\")\nprint(f\" Best HPO F1: {best_f1:.4f}\")\nprint(f\" Improvement: {improvement:+.2f}%\")\n\n# Get test set f1 score\ny_test_pred = best_model.predict(X_test_scaled)\ntest_f1 = metrics.f1_score(y_test, y_test_pred)\nprint(f\"\\n\\n Best HPO Test Set F1: {test_f1:.4f}\")\n\nresults.best_result", + "id": "ce110000-1111-2222-3333-ffffff000020" + }, + { + "cell_type": "markdown", + "metadata": { + "name": "cell30", + "collapsed": false + }, + "source": "## Step 5: View Results in Snowflake UI\n\nAll experiment runs are now available in the Snowflake UI:\n\n1. Navigate to **AI & ML > Experiments** in the left sidebar\n2. Find the `Wine_Quality_Classification_YYYYMMDD` experiment (with today's date)\n3. Compare runs, view metrics, and analyze results\n\n**Note**: Each time you run this notebook on a different day, it creates a new dated experiment, allowing you to track model performance over time and across different data versions.\n\nThe UI provides:\n- Side-by-side run comparisons\n- Metric visualizations\n- Parameter distributions\n- Model artifacts and metadata\n", + "id": "ce110000-1111-2222-3333-ffffff000029" + }, + { + "cell_type": "markdown", + "metadata": { + "name": "cell31", + "collapsed": false + }, + "source": "## Summary\n\nIn this notebook, we demonstrated:\n\n1. **Experiment Tracking**: Logged parameters and metrics to Snowflake\n2. **Distributed HPO**: Ran 50 trials in parallel across multiple nodes\n3. **Integration**: Combined both capabilities for comprehensive ML experimentation\n", + "id": "ce110000-1111-2222-3333-ffffff000030" + }, + { + "cell_type": "markdown", + "metadata": { + "name": "cell32", + "collapsed": false + }, + "source": "## Next Steps\n\n### Extend this Example\n\n1. **Adjust the search space** - Modify hyperparameter ranges based on your problem domain and data size\n2. **Increase trial count** - Scale to 100-200 trials for more thorough optimization\n3. **Scale compute clusters** - Adjust `scale_cluster()` to increase or decrease parallelism\n4. **Deploy the winning model** - Register to Snowflake Model Registry\n\n\n\n", + "id": "ce110000-1111-2222-3333-ffffff000031" + } + ] +} \ No newline at end of file diff --git a/site/sfguides/src/hpo-with-experiment-tracking/hpo-with-experiment-tracking.md b/site/sfguides/src/hpo-with-experiment-tracking/hpo-with-experiment-tracking.md new file mode 100644 index 0000000000..af57deb77d --- /dev/null +++ b/site/sfguides/src/hpo-with-experiment-tracking/hpo-with-experiment-tracking.md @@ -0,0 +1,424 @@ +author: Marie Coolsaet +id: hpo-with-experiment-tracking +summary: Learn how to combine distributed hyperparameter optimization with experiment tracking in Snowflake to build scalable, reproducible ML workflows. 
+categories: snowflake-site:taxonomy/snowflake-feature/ai-ml,snowflake-site:taxonomy/snowflake-feature/snowflake-ml-functions,snowflake-site:taxonomy/snowflake-feature/snowpark-container-services +environments: web +status: Published +feedback link: https://github.com/Snowflake-Labs/sfguides/issues +tags: Data Science, Machine Learning, Hyperparameter Optimization, Experiment Tracking, XGBoost, SPCS +language: en + +# Distributed Hyperparameter Optimization with Experiment Tracking + +## Overview +This quickstart demonstrates how to combine two powerful Snowflake ML capabilities: + +- **Distributed Hyperparameter Optimization (HPO)** – Run model tuning in parallel on Snowpark Container Runtime +- **Experiment Tracking** – Automatically log parameters, metrics, and model artifacts for every run + +Together, these tools let you move from one-off experiments to scalable, reproducible ML workflows — all within Snowflake. + +### Challenges Addressed +- Sequential hyperparameter tuning is slow +- Manual experiment tracking is error-prone +- Distributed infrastructure setup is complex +- Reproducing past experiments requires detailed documentation + +### Prerequisites +- A Snowflake account with a database and schema +- CREATE EXPERIMENT privilege on your schema +- Familiarity with Python and ML concepts + +### What You'll Learn +- How to set up experiment tracking for ML runs +- How to run distributed HPO across multiple nodes +- How to log and compare experiment results +- How to view experiment history in Snowsight + +### What You'll Need +- snowflake-ml-python >= 1.9.1 +- Notebook configured for Container Runtime on SPCS (Compute Pool with instance type `CPU_X64_S`) + +### What You'll Build +- A complete ML pipeline with distributed hyperparameter optimization +- An XGBoost classification model optimized for wine quality prediction +- A tracked experiment with logged parameters, metrics, and models + + + + +## Setup and Data + +First, let's import the necessary libraries and set up our Snowflake session. + +```python +import pandas as pd +import numpy as np +from datetime import datetime +from sklearn.model_selection import train_test_split +from sklearn.preprocessing import StandardScaler +from sklearn import metrics +from xgboost import XGBClassifier + +from snowflake.snowpark.context import get_active_session +from snowflake.snowpark import Session +from snowflake.ml.experiment.experiment_tracking import ExperimentTracking +from snowflake.ml.modeling import tune +from snowflake.ml.modeling.tune.search import RandomSearch, BayesOpt +from snowflake.ml.data.data_connector import DataConnector +from snowflake.ml.runtime_cluster import scale_cluster + +# Get active Snowflake session +session = get_active_session() +print(f"Connected to Snowflake: {session.get_current_database()}.{session.get_current_schema()}") + +# Create dated experiment name for tracking runs over time +experiment_date = datetime.now().strftime("%Y%m%d") +experiment_name = f"Wine_Quality_Classification_{experiment_date}" +print(f"\nExperiment Name: {experiment_name}") +``` + +### Generate Wine Quality Classification Dataset + +We'll create a synthetic dataset inspired by wine quality prediction. The goal is to classify wines as high quality (1) or standard quality (0) based on chemical properties. 
+ +```python +# Generate synthetic wine quality dataset +np.random.seed(42) +n_samples = 20000 + +# Feature generation with realistic correlations +data = { + "FIXED_ACIDITY": np.random.normal(7.0, 1.5, n_samples), + "VOLATILE_ACIDITY": np.random.gamma(2, 0.2, n_samples), + "CITRIC_ACID": np.random.beta(2, 5, n_samples), + "RESIDUAL_SUGAR": np.random.lognormal(1, 0.8, n_samples), + "CHLORIDES": np.random.gamma(3, 0.02, n_samples), + "FREE_SULFUR_DIOXIDE": np.random.normal(30, 15, n_samples), + "TOTAL_SULFUR_DIOXIDE": np.random.normal(120, 40, n_samples), + "DENSITY": np.random.normal(0.997, 0.003, n_samples), + "PH": np.random.normal(3.2, 0.3, n_samples), + "SULPHATES": np.random.gamma(4, 0.15, n_samples), + "ALCOHOL": np.random.normal(10.5, 1.5, n_samples) +} + +df = pd.DataFrame(data) + +# Create quality target based on feature combinations +quality_score = ( + 0.3 * (df["ALCOHOL"] - df["ALCOHOL"].mean()) / df["ALCOHOL"].std() + + 0.2 * (df["CITRIC_ACID"] - df["CITRIC_ACID"].mean()) / df["CITRIC_ACID"].std() - + 0.25 * (df["VOLATILE_ACIDITY"] - df["VOLATILE_ACIDITY"].mean()) / df["VOLATILE_ACIDITY"].std() + + 0.15 * (df["SULPHATES"] - df["SULPHATES"].mean()) / df["SULPHATES"].std() + + np.random.normal(0, 0.3, n_samples) # Add noise +) + +# Binary classification: 1 = high quality, 0 = standard quality +df["QUALITY"] = (quality_score > quality_score.quantile(0.6)).astype(int) + +print(f"Dataset shape: {df.shape}") +print(f"\nClass distribution:\n{df['QUALITY'].value_counts()}") +``` + +### Prepare Train/Validation/Test Splits + +```python +# Separate features and target +X = df.drop('QUALITY', axis=1) +y = df['QUALITY'] + +# Create train/val/test splits +X_temp, X_test, y_temp, y_test = train_test_split(X, y, test_size=0.15, random_state=42, stratify=y) +X_train, X_val, y_train, y_val = train_test_split(X_temp, y_temp, test_size=0.18, random_state=42, stratify=y_temp) + +# Scale features +scaler = StandardScaler() +X_train_scaled = pd.DataFrame(scaler.fit_transform(X_train), columns=X_train.columns) +X_val_scaled = pd.DataFrame(scaler.transform(X_val), columns=X_val.columns) +X_test_scaled = pd.DataFrame(scaler.transform(X_test), columns=X_test.columns) + +print(f"Training set: {X_train_scaled.shape[0]} samples") +print(f"Validation set: {X_val_scaled.shape[0]} samples") +print(f"Test set: {X_test_scaled.shape[0]} samples") +``` + + +## Baseline Model + +Before running distributed HPO, let's train a baseline model and log it to Snowflake Experiment Tracking. 
+ +```python +# Initialize Experiment Tracking +exp = ExperimentTracking(session=session) +exp.set_experiment(experiment_name) + +# Train baseline model +with exp.start_run(run_name="baseline_xgboost") as run: + # Define baseline parameters + baseline_params = { + 'n_estimators': 100, + 'max_depth': 6, + 'learning_rate': 0.1, + 'subsample': 0.8, + 'colsample_bytree': 0.8, + 'gamma': 0.1, + 'min_child_weight': 8, + 'random_state': 42, + } + + # Log parameters + exp.log_params(baseline_params) + + # Train model + baseline_model = XGBClassifier(**baseline_params) + baseline_model.fit(X_train_scaled, y_train) + + # Evaluate on validation set + y_val_pred = baseline_model.predict(X_val_scaled) + y_val_proba = baseline_model.predict_proba(X_val_scaled)[:, 1] + + # Calculate metrics + val_metrics = { + 'val_accuracy': metrics.accuracy_score(y_val, y_val_pred), + 'val_precision': metrics.precision_score(y_val, y_val_pred), + 'val_recall': metrics.recall_score(y_val, y_val_pred), + 'val_f1': metrics.f1_score(y_val, y_val_pred), + 'val_roc_auc': metrics.roc_auc_score(y_val, y_val_proba) + } + + # Log metrics + exp.log_metrics(val_metrics) + + print("Baseline Model Performance:") + for metric, value in val_metrics.items(): + print(f" {metric}: {value:.4f}") +``` + + +## Data Connectors + +Convert pandas DataFrames to Snowflake DataConnectors for distributed processing. + +```python +# Combine features and target for each split +train_df = pd.concat([X_train_scaled, y_train.reset_index(drop=True)], axis=1) +val_df = pd.concat([X_val_scaled, y_val.reset_index(drop=True)], axis=1) + +# Create DataConnectors +dataset_map = { + "train": DataConnector.from_dataframe(session.create_dataframe(train_df)), + "val": DataConnector.from_dataframe(session.create_dataframe(val_df)), +} + +print("Data connectors created successfully") +``` + + +## Training Function + +The training function will be executed for each HPO trial. It integrates both HPO and Experiment Tracking. + +```python +def train_function(): + """ + Training function executed for each HPO trial. + Integrates with both TunerContext and ExperimentTracking. 
+ """ + trial_session = Session.builder.getOrCreate() + + # Get tuner context + tuner_context = tune.get_tuner_context() + params = tuner_context.get_hyper_params() + dm = tuner_context.get_dataset_map() + + # Initialize experiment tracking for this trial + exp = ExperimentTracking(session=trial_session) + exp.set_experiment(experiment_name) + with exp.start_run(): + # Log hyperparameters + exp.log_params(params) + + # Load data + train_data = dm["train"].to_pandas() + val_data = dm["val"].to_pandas() + + # Separate features and target + X_train = train_data.drop('QUALITY', axis=1) + y_train = train_data['QUALITY'] + X_val = val_data.drop('QUALITY', axis=1) + y_val = val_data['QUALITY'] + + # Train model with hyperparameters from HPO + model = XGBClassifier(**params) + model.fit(X_train, y_train) + + # Evaluate on validation set + y_val_pred = model.predict(X_val) + y_val_proba = model.predict_proba(X_val)[:, 1] + + # Calculate validation metrics + val_metrics = { + 'val_accuracy': metrics.accuracy_score(y_val, y_val_pred), + 'val_precision': metrics.precision_score(y_val, y_val_pred), + 'val_recall': metrics.recall_score(y_val, y_val_pred), + 'val_f1': metrics.f1_score(y_val, y_val_pred), + 'val_roc_auc': metrics.roc_auc_score(y_val, y_val_proba) + } + + # Log metrics to experiment tracking + exp.log_metrics(val_metrics) + + # Report to HPO framework (optimize on validation F1) + tuner_context.report(metrics=val_metrics, model=model) +``` + + +## Configure the Search Space + +Define the hyperparameter search space using Snowflake's sampling functions. + +```python +# Define search space for XGBoost +search_space = { + 'n_estimators': tune.randint(50, 300), + 'max_depth': tune.randint(3, 15), + 'learning_rate': tune.loguniform(0.01, 0.3), + 'subsample': tune.uniform(0.5, 1.0), + 'colsample_bytree': tune.uniform(0.5, 1.0), + 'gamma': tune.uniform(0.0, 0.5), + 'min_child_weight': tune.randint(1, 10), + 'random_state': 42, +} + +print("Search space defined:") +for param, space in search_space.items(): + print(f" {param}: {space}") +``` + + +## Run Distributed HPO + +Configure the tuner to maximize F1 score, run 50 trials with random search, and execute trials in parallel across available nodes. + +### Monitor Node Activity with the Ray Dashboard + +Use the output URL to access the dashboard and monitor your distributed HPO jobs: + +```python +from snowflake.ml.runtime_cluster import get_ray_dashboard_url +get_ray_dashboard_url() +``` + +### Run HPO + +```python +# Scale cluster for distributed processing +print("Scaling cluster for distributed HPO...") +scale_cluster(10) # Scale up nodes + +# Configure tuner +tuner_config = tune.TunerConfig( + metric='val_f1', + mode='max', + search_alg=RandomSearch(), + num_trials=50 +) + +# Create tuner +tuner = tune.Tuner( + train_func=train_function, + search_space=search_space, + tuner_config=tuner_config +) + +print("Starting distributed hyperparameter optimization...") + +# Run HPO +try: + results = tuner.run(dataset_map=dataset_map) + print("\nHPO completed successfully") +except Exception as e: + print(f"\nError during HPO: {e}") + raise +finally: + # Scale cluster back down + scale_cluster(1) + print("Cluster scaled back to 1 node") +``` + +**Note:** Remember to scale your cluster back down after HPO completes to avoid unnecessary compute costs. + + +## Analyze Results + +Let's examine the best model found during hyperparameter optimization. 
+ +```python +# Display all results +print("BEST MODEL FOUND") +print("="*60) + +# Extract best hyperparameters +print(f"\nBest Parameters:") +best_model = results.best_model +params = best_model.get_xgb_params() +print(params) + +# Compare with baseline +best_f1 = results.best_result['val_f1'][0] +baseline_f1 = val_metrics['val_f1'] # From baseline model +improvement = ((best_f1 - baseline_f1) / baseline_f1) * 100 + +print(f"\nPerformance Comparison:") +print(f" Baseline F1: {baseline_f1:.4f}") +print(f" Best HPO F1: {best_f1:.4f}") +print(f" Improvement: {improvement:+.2f}%") + +# Get test set f1 score +y_test_pred = best_model.predict(X_test_scaled) +test_f1 = metrics.f1_score(y_test, y_test_pred) +print(f"\n\nBest HPO Test Set F1: {test_f1:.4f}") + +results.best_result +``` + + +## View in Snowsight + +All experiment runs are now available in the Snowflake UI: + +1. Navigate to **AI & ML → Experiments** in the left sidebar +2. Find the `Wine_Quality_Classification_YYYYMMDD` experiment (with today's date) +3. Compare runs, view metrics, and analyze results + +The Snowsight UI provides: +- Side-by-side run comparisons +- Metric visualizations +- Parameter distributions +- Model artifacts and metadata + + +## Conclusion & Next Steps + +Congratulations! You've successfully built a distributed hyperparameter optimization pipeline with integrated experiment tracking in Snowflake. + +### What We've Covered +- Setting up Snowflake Experiment Tracking for ML runs +- Creating a baseline model with logged parameters and metrics +- Defining a hyperparameter search space for XGBoost +- Running distributed HPO across multiple SPCS nodes +- Analyzing and comparing experiment results in Snowsight + +### Next Steps + +1. **Adjust the search space** - Modify hyperparameter ranges based on your problem domain and data size +2. **Increase trial count** - Scale to 100-200 trials for more thorough optimization +3. **Scale compute clusters** - Adjust `scale_cluster()` to increase or decrease parallelism +4. **Deploy the winning model** - Register to Snowflake Model Registry + +### Related Resources +- [Blog post: Experiment Tracking with Distributed HPO in Snowflake](https://mariesoehlcoolsaet.medium.com/experiment-tracking-with-distributed-hpo-in-snowflake-cdf5ff41ba16) +- [Snowpark ML Overview](https://docs.snowflake.com/en/developer-guide/snowpark-ml/overview) +- [Experiment Tracking Documentation](https://docs.snowflake.com/en/developer-guide/snowflake-ml/experiments) +- [Parallel HPO Documentation](https://docs.snowflake.com/en/developer-guide/snowflake-ml/container-hpo) +
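+### Appendix: Variations to Try
+
+The sketches below expand on the Next Steps above. They are untested outlines rather than part of the guide's verified code, and names such as `WINE_QUALITY_XGB` and `V1` are placeholders you should adapt to your environment.
+
+**Swap in Bayesian optimization.** The setup cell already imports `BayesOpt` alongside `RandomSearch`, so trying a different search algorithm is a one-line change to the tuner configuration (continuous sampling functions such as `tune.uniform` and `tune.loguniform` are generally the best fit for it; check the parallel HPO docs for constructor options):
+
+```python
+# Same tuner configuration as above, but with Bayesian optimization as the search algorithm
+bayes_config = tune.TunerConfig(
+    metric='val_f1',
+    mode='max',
+    search_alg=BayesOpt(),   # default constructor assumed
+    num_trials=50
+)
+```
+
+**Register the winning model.** A minimal sketch of the "Deploy the winning model" step, logging the best HPO model to the Snowflake Model Registry with `snowflake-ml-python`:
+
+```python
+from snowflake.ml.registry import Registry
+
+# The registry defaults to the session's current database and schema
+registry = Registry(session=session)
+
+# Log the best model found by HPO as a new model version
+model_version = registry.log_model(
+    best_model,                              # XGBClassifier from results.best_model
+    model_name="WINE_QUALITY_XGB",           # placeholder model name
+    version_name="V1",                       # placeholder version name
+    sample_input_data=X_test_scaled.head(),  # used to infer the model signature
+    comment="Best XGBoost model from distributed HPO",
+)
+
+# Sanity check: run predictions through the registered model version
+print(model_version.run(X_test_scaled.head(), function_name="predict"))
+```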