diff --git a/README.md b/README.md index 3a8c1f6..eedd4a6 100644 --- a/README.md +++ b/README.md @@ -49,6 +49,7 @@ These are the MLflow objects and their attributes that can be exported. | Logged Model | [link](https://github.com/mlflow/mlflow/blob/v3.0.0/mlflow/protos/service.proto#L612) | [link](https://mlflow.org/docs/latest/api_reference/python_api/mlflow.entities.html#mlflow.entities.LoggedModel) | | | Trace | [link](https://github.com/mlflow/mlflow/blob/v2.14.0/mlflow/protos/service.proto#L459) | [link](https://mlflow.org/docs/latest/api_reference/python_api/mlflow.entities.html#mlflow.entities.Trace) | | | Prompt | [link](https://mlflow.org/docs/latest/llms/prompt-engineering/index.html) | [link](https://mlflow.org/docs/latest/python_api/mlflow.genai.html) | - | +| Evaluation Dataset | [link](https://mlflow.org/docs/latest/genai/index.html) | [link](https://mlflow.org/docs/latest/python_api/mlflow.genai.html) | - | MLflow Export Import provides rudimentary capabilities for tracking lineage of the imported Mlflow objects by having the option save the original MLflow object attributes in the imported target environment. diff --git a/README_bulk.md b/README_bulk.md index ba8ebcc..6b98910 100644 --- a/README_bulk.md +++ b/README_bulk.md @@ -30,6 +30,8 @@ Notes: | | [import-traces](#Import-traces) | [code](mlflow_export_import/bulk/import_traces.py) | Imports traces from a directory | | Prompt | [export-prompts](#Export-prompts) | [code](mlflow_export_import/bulk/export_prompts.py) | Export prompts from the MLflow Prompt Registry (MLflow 2.21.0+). | | | [import-prompts](#Import-prompts) | [code](mlflow_export_import/bulk/import_prompts.py) | Imports prompts to the MLflow Prompt Registry. | +| Evaluation Dataset | [export-evaluation-datasets](#Export-evaluation-datasets) | [code](mlflow_export_import/bulk/export_evaluation_datasets.py) | Export GenAI evaluation datasets (MLflow 3.4.0+). | +| | [import-evaluation-datasets](#Import-evaluation-datasets) | [code](mlflow_export_import/bulk/import_evaluation_datasets.py) | Imports GenAI evaluation datasets. | ## All MLflow Objects Tools @@ -621,3 +623,92 @@ import-prompts --input-dir out/prompts **Notes:** * Prompts are imported with their original names and version numbers are preserved. * All versions of each prompt are exported and imported to maintain complete version history. +* If a prompt with the same name already exists, it will be skipped with a warning to preserve version numbers. Use `--delete-prompt True` to replace existing prompts. + + +## Evaluation Datasets + +Export/import GenAI evaluation datasets from the MLflow tracking server (MLflow 3.4.0+). + +**Note:** Evaluation dataset support requires MLflow 3.4.0 or higher and a SQL-based tracking backend (SQLite, PostgreSQL, MySQL). FileStore is not supported. The export/import will be skipped with a warning message if the MLflow version doesn't support evaluation datasets or if using FileStore. + +### Export evaluation datasets + +Export evaluation datasets to a directory. + +Source: [export_evaluation_datasets.py](mlflow_export_import/bulk/export_evaluation_datasets.py). + +#### Usage + +``` +export-evaluation-datasets --help + +Options: + --output-dir TEXT Output directory. [required] + --evaluation-datasets TEXT Evaluation dataset names: 'all' for all datasets, + comma-delimited list (e.g., 'dataset1,dataset2'), + or file path ending with '.txt' containing dataset + names (one per line). 
[required] + --experiment-ids TEXT Comma-separated list of experiment IDs to filter + datasets. Only used when --evaluation-datasets is 'all'. + --use-threads BOOLEAN Use multithreading for export. [default: False] +``` + +#### Examples + +##### Export all evaluation datasets +``` +export-evaluation-datasets \ + --output-dir out/evaluation_datasets \ + --evaluation-datasets all +``` + +##### Export specific evaluation datasets +``` +export-evaluation-datasets \ + --output-dir out/evaluation_datasets \ + --evaluation-datasets wine-quality-eval,iris-classification-eval +``` + +##### Export all evaluation datasets for specific experiments +``` +export-evaluation-datasets \ + --output-dir out/evaluation_datasets \ + --evaluation-datasets all \ + --experiment-ids 1,2,3 +``` + +**Note:** `--experiment-ids` only filters when `--evaluation-datasets` is set to 'all'. If you specify specific dataset names, `--experiment-ids` is ignored. + +### Import evaluation datasets + +Import evaluation datasets from a directory. + +Source: [import_evaluation_datasets.py](mlflow_export_import/bulk/import_evaluation_datasets.py). + +#### Usage + +``` +import-evaluation-datasets --help + +Options: + --input-dir TEXT Input directory containing exported evaluation datasets. [required] + --delete-evaluation-dataset BOOLEAN Delete existing evaluation dataset before importing. [default: False] + --use-threads BOOLEAN Use multithreading for import. [default: False] +``` + +#### Examples + +##### Import evaluation datasets +``` +import-evaluation-datasets --input-dir out/evaluation_datasets +``` + +##### Import with evaluation dataset deletion (if dataset exists, delete it first) +``` +import-evaluation-datasets \ + --input-dir out/evaluation_datasets \ + --delete-evaluation-dataset True +``` + +**Note:** If an evaluation dataset with the same name already exists, it will be skipped with a warning. Use `--delete-evaluation-dataset True` to replace existing datasets. diff --git a/README_single.md b/README_single.md index 0c90762..192eac6 100644 --- a/README_single.md +++ b/README_single.md @@ -29,6 +29,8 @@ See sample JSON export files [here](README_export_format.md#sample-export-json-f || [import-trace](#import-trace) | [code](mlflow_export_import/trace/import_trace.py) | | Prompt | [export-prompt](#export-prompt) | [code](mlflow_export_import/prompt/export_prompt.py) | | | [import-prompt](#import-prompt) | [code](mlflow_export_import/prompt/import_prompt.py) | +| Evaluation Dataset | [export-evaluation-dataset](#export-evaluation-dataset) | [code](mlflow_export_import/evaluation_dataset/export_evaluation_dataset.py) | +| | [import-evaluation-dataset](#import-evaluation-dataset) | [code](mlflow_export_import/evaluation_dataset/import_evaluation_dataset.py) | ## Experiment Tools @@ -648,4 +650,77 @@ import-prompt --input-dir out import-prompt \ --input-dir out \ --prompt-name my-new-prompt-name -``` \ No newline at end of file +``` + + +## Evaluation Dataset Tools + +Export and import GenAI evaluation datasets (MLflow 3.4.0+). + +**Note:** Evaluation dataset support requires MLflow 3.4.0 or higher and a SQL-based tracking backend (SQLite, PostgreSQL, MySQL). FileStore is not supported. + +### Export Evaluation Dataset + +Export a single evaluation dataset to a directory. + +Source: [export_evaluation_dataset.py](mlflow_export_import/evaluation_dataset/export_evaluation_dataset.py). 
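+
+You can also call the exporter from Python. A minimal sketch (illustrative; assumes the package is installed and an MLflow 3.4.0+ tracking server is configured):
+
+```python
+from mlflow_export_import.evaluation_dataset.export_evaluation_dataset import export_evaluation_dataset
+
+# Export by name; pass dataset_id instead if you already know the ID.
+export_evaluation_dataset(
+    dataset_name="wine-quality-eval",
+    output_dir="out"
+)
+```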
+ +#### Usage + +``` +export-evaluation-dataset --help + +Options: + --dataset-name TEXT Name of the evaluation dataset to export (mutually exclusive with --dataset-id). + --dataset-id TEXT ID of the evaluation dataset to export (mutually exclusive with --dataset-name). + --output-dir TEXT Output directory. [required] + +Note: Either --dataset-name or --dataset-id must be provided (mutually exclusive). +``` + +#### Examples + +##### Export by dataset name +``` +export-evaluation-dataset \ + --dataset-name wine-quality-eval \ + --output-dir out +``` + +##### Export by dataset ID +``` +export-evaluation-dataset \ + --dataset-id abc123 \ + --output-dir out +``` + +### Import Evaluation Dataset + +Import an evaluation dataset from an exported directory. + +Source: [import_evaluation_dataset.py](mlflow_export_import/evaluation_dataset/import_evaluation_dataset.py). + +#### Usage + +``` +import-evaluation-dataset --help + +Options: + --input-dir TEXT Input directory containing exported evaluation dataset. [required] + --dataset-name TEXT Optional new name for the imported dataset. If not + specified, uses original name. +``` + +#### Examples + +##### Import with original name +``` +import-evaluation-dataset --input-dir out +``` + +##### Import with new name +``` +import-evaluation-dataset \ + --input-dir out \ + --dataset-name my-new-dataset-name +``` diff --git a/mlflow_export_import/bulk/export_all.py b/mlflow_export_import/bulk/export_all.py index 1a6183b..cbd13c1 100644 --- a/mlflow_export_import/bulk/export_all.py +++ b/mlflow_export_import/bulk/export_all.py @@ -24,6 +24,7 @@ from mlflow_export_import.bulk.export_models import export_models from mlflow_export_import.bulk.export_experiments import export_experiments from mlflow_export_import.bulk.export_prompts import export_prompts +from mlflow_export_import.bulk.export_evaluation_datasets import export_evaluation_datasets ALL_STAGES = "Production,Staging,Archived,None" @@ -58,7 +59,7 @@ def export_all( use_threads = use_threads ) - # Only import those experiments not exported by above export_models() + # Only export those experiments not exported by above export_models() exported_exp_names = res_models["experiments"]["experiment_names"] all_exps = SearchExperimentsIterator(mlflow_client) all_exp_names = [ exp.name for exp in all_exps ] @@ -94,6 +95,26 @@ def export_all( _logger.warning(f"Failed to export prompts: {e}") res_prompts = {"error": str(e)} + # Export evaluation datasets (returns dict with status) + res_datasets = None + try: + _logger.info("Exporting evaluation datasets...") + res_datasets = export_evaluation_datasets( + output_dir = os.path.join(output_dir, "evaluation_datasets"), + dataset_names = None, # Export all datasets + experiment_ids = None, + use_threads = use_threads, + mlflow_client = mlflow_client + ) + # Log if unsupported but don't fail + if res_datasets and "unsupported" in res_datasets: + _logger.warning(f"Evaluation datasets not supported in MLflow {res_datasets.get('mlflow_version')}") + elif res_datasets and "error" in res_datasets: + _logger.warning(f"Failed to export evaluation datasets: {res_datasets['error']}") + except Exception as e: + _logger.warning(f"Failed to export evaluation datasets: {e}") + res_datasets = {"error": str(e)} + duration = round(time.time() - start_time, 1) info_attr = { "options": { @@ -108,7 +129,8 @@ def export_all( "duration": duration, "models": res_models, "experiments": res_exps, - "prompts": res_prompts + "prompts": res_prompts, + "evaluation_datasets": res_datasets } } 
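+    # The manifest written below now records evaluation dataset results alongside models, experiments, and prompts.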
io_utils.write_export_file(output_dir, "manifest.json", __file__, {}, info_attr) diff --git a/mlflow_export_import/bulk/export_evaluation_datasets.py b/mlflow_export_import/bulk/export_evaluation_datasets.py new file mode 100644 index 0000000..75657c2 --- /dev/null +++ b/mlflow_export_import/bulk/export_evaluation_datasets.py @@ -0,0 +1,206 @@ +""" +Exports multiple MLflow GenAI evaluation datasets to a directory. + +Note: Evaluation datasets are first-class entities introduced in MLflow 3.4.0+. +They require a SQL-based tracking backend (SQLite, PostgreSQL, MySQL). +FileStore backend is not supported. +""" + +import os +import click +import mlflow +from concurrent.futures import ThreadPoolExecutor + +from mlflow_export_import.common import utils, io_utils +from mlflow_export_import.common.click_options import ( + opt_output_dir, + opt_evaluation_datasets, + opt_evaluation_datasets_experiment_ids, + opt_use_threads +) +from mlflow_export_import.common.version_utils import has_evaluation_dataset_support, log_version_info +from mlflow_export_import.client.client_utils import create_mlflow_client +from mlflow_export_import.evaluation_dataset.export_evaluation_dataset import export_evaluation_dataset + +_logger = utils.getLogger(__name__) + + +def export_evaluation_datasets( + output_dir, + dataset_names=None, + experiment_ids=None, + use_threads=False, + mlflow_client=None + ): + """ + Export multiple evaluation datasets to a directory. + + :param output_dir: Output directory. + :param dataset_names: List of dataset names to export. If None, exports all datasets. + :param experiment_ids: List of experiment IDs to filter datasets. Only used when dataset_names is None. + :param use_threads: Use multithreading for export. + :param mlflow_client: MLflow client. + :return: Summary of export results. 
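+
+    Example (illustrative sketch; assumes an MLflow 3.4.0+ server with a SQL backend):
+        export_evaluation_datasets(
+            output_dir="out/evaluation_datasets",
+            dataset_names=["wine-quality-eval", "iris-classification-eval"],
+            use_threads=False
+        )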
+ """ + + if not has_evaluation_dataset_support(): + _logger.warning(f"Evaluation datasets not supported in MLflow {mlflow.__version__} (requires 3.4.0+)") + return {"unsupported": True, "mlflow_version": mlflow.__version__} + + mlflow_client = mlflow_client or create_mlflow_client() + log_version_info() + + try: + # Get list of datasets to export + if dataset_names: + datasets_to_export = _get_specified_datasets(dataset_names) + else: + datasets_to_export = _get_all_datasets(experiment_ids) + + _logger.info(f"Found {len(datasets_to_export)} datasets to export") + + # Create output directory + os.makedirs(output_dir, exist_ok=True) + + # Export datasets + results = _export_datasets(datasets_to_export, output_dir, use_threads) + + # Summary + successful = [r for r in results if r is not None] + failed = len(results) - len(successful) + + # Create dataset info list for summary + dataset_info = [] + for dataset in datasets_to_export: + dataset_info.append({ + "name": dataset.name, + "dataset_id": dataset.dataset_id + }) + + summary = { + "total_datasets": len(datasets_to_export), + "successful_exports": len(successful), + "failed_exports": failed, + "datasets": dataset_info + } + + # Write summary + io_utils.write_export_file(output_dir, "evaluation_datasets_summary.json", __file__, summary) + + _logger.info(f"Evaluation dataset export completed: {summary}") + return summary + + except Exception as e: + _logger.error(f"Bulk evaluation dataset export failed: {str(e)}") + return {"error": str(e)} + + +def _get_all_datasets(experiment_ids=None): + """Get all available datasets from the registry using standard MLflow APIs.""" + + # Try MLflow 3.4+ genai namespace + try: + import mlflow.genai + if hasattr(mlflow.genai, 'search_datasets'): + _logger.debug("Using mlflow.genai.search_datasets API") + # Convert experiment_ids to list if provided + exp_ids = experiment_ids if experiment_ids else None + datasets = mlflow.genai.search_datasets(experiment_ids=exp_ids) + return list(datasets) + except (ImportError, AttributeError) as e: + _logger.debug(f"mlflow.genai.search_datasets not available: {e}") + except Exception as e: + _logger.error(f"mlflow.genai.search_datasets failed: {e}") + raise + + raise Exception( + f"No compatible evaluation dataset search API found in MLflow {mlflow.__version__}. " + f"Ensure MLflow 3.4.0+ is installed." 
+ ) + + +def _get_specified_datasets(dataset_names): + """Get specified datasets by name.""" + from mlflow.tracking import get_tracking_uri + from mlflow.utils.uri import is_databricks_uri + + datasets = [] + + # In Databricks, we can get by name directly + # In non-Databricks, we need to search first to get dataset_id + if is_databricks_uri(get_tracking_uri()): + # Databricks: can use name directly + from mlflow_export_import.evaluation_dataset.export_evaluation_dataset import _get_dataset_safe + for dataset_name in dataset_names: + try: + dataset = _get_dataset_safe(name=dataset_name, dataset_id=None) + if dataset: + datasets.append(dataset) + else: + _logger.warning(f"Dataset '{dataset_name}' not found") + except Exception as e: + _logger.error(f"Error getting dataset '{dataset_name}': {e}") + else: + # Non-Databricks: search all datasets and filter by name + all_datasets = _get_all_datasets() + dataset_map = {d.name: d for d in all_datasets} + + for dataset_name in dataset_names: + if dataset_name in dataset_map: + datasets.append(dataset_map[dataset_name]) + else: + _logger.warning(f"Dataset '{dataset_name}' not found") + + return datasets + + +def _export_datasets(datasets, output_dir, use_threads): + """Export datasets with optional multithreading.""" + def export_single(dataset): + dataset_dir = os.path.join(output_dir, f"{dataset.name}_{dataset.dataset_id}") + return export_evaluation_dataset( + dataset_name=dataset.name, + dataset_id=dataset.dataset_id, + output_dir=dataset_dir + ) + + max_workers = utils.get_threads(use_threads) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + results = list(executor.map(export_single, datasets)) + + return results + + +@click.command() +@opt_output_dir +@opt_evaluation_datasets +@opt_evaluation_datasets_experiment_ids +@opt_use_threads +def main(output_dir, evaluation_datasets, experiment_ids, use_threads): + _logger.info("Options:") + for k, v in locals().items(): + _logger.info(f" {k}: {v}") + + # Handle 'all', file, or comma-separated list + if evaluation_datasets.endswith(".txt"): + with open(evaluation_datasets, 'r') as f: + dataset_names_list = [line.strip() for line in f if line.strip()] + elif evaluation_datasets.lower() == 'all': + dataset_names_list = None # Export all + else: + dataset_names_list = [name.strip() for name in evaluation_datasets.split(",")] + + experiment_ids_list = None + if experiment_ids: + experiment_ids_list = [exp_id.strip() for exp_id in experiment_ids.split(",")] + + export_evaluation_datasets( + output_dir=output_dir, + dataset_names=dataset_names_list, + experiment_ids=experiment_ids_list, + use_threads=use_threads + ) + + +if __name__ == "__main__": + main() diff --git a/mlflow_export_import/bulk/import_all.py b/mlflow_export_import/bulk/import_all.py index 3daaf72..cb443a1 100644 --- a/mlflow_export_import/bulk/import_all.py +++ b/mlflow_export_import/bulk/import_all.py @@ -31,6 +31,7 @@ def import_all( input_dir, delete_model, delete_prompt = False, + delete_dataset = False, import_permissions = False, import_source_tags = False, use_src_user_id = False, @@ -41,8 +42,8 @@ def import_all( mlflow_client = None ): """ - Import all MLflow objects: models, experiments, runs, and prompts. - This delegates to import_models() and then adds prompt import on top. + Import all MLflow objects: models, experiments, runs, prompts, and evaluation datasets. + This delegates to import_models(), import_prompts(), and import_evaluation_datasets(). 
""" mlflow_client = mlflow_client or create_mlflow_client() @@ -81,8 +82,31 @@ def import_all( _logger.warning(f"Failed to import prompts: {e}") prompt_res = {"error": str(e)} - # Add prompts to the report + # Import evaluation datasets if they exist (returns dict with status) + evaluation_datasets_res = None + evaluation_datasets_dir = os.path.join(input_dir, "evaluation_datasets") + if os.path.exists(evaluation_datasets_dir): + try: + from mlflow_export_import.bulk.import_evaluation_datasets import import_evaluation_datasets + _logger.info("Importing evaluation datasets...") + evaluation_datasets_res = import_evaluation_datasets( + input_dir = evaluation_datasets_dir, + delete_dataset = delete_dataset, + use_threads = use_threads, + mlflow_client = mlflow_client + ) + # Log if unsupported but don't fail + if evaluation_datasets_res and "unsupported" in evaluation_datasets_res: + _logger.warning(f"Evaluation datasets not supported in MLflow {evaluation_datasets_res.get('mlflow_version')}") + elif evaluation_datasets_res and "error" in evaluation_datasets_res: + _logger.warning(f"Failed to import evaluation datasets: {evaluation_datasets_res['error']}") + except Exception as e: + _logger.warning(f"Failed to import evaluation datasets: {e}") + evaluation_datasets_res = {"error": str(e)} + + # Add prompts and evaluation datasets to the report models_result["prompts_import"] = prompt_res + models_result["evaluation_datasets_import"] = evaluation_datasets_res _logger.info("\nImport-all report:") _logger.info(f"{json.dumps(models_result, indent=2)}\n") @@ -95,6 +119,11 @@ def import_all( type=bool, default=False ) +@click.option("--delete-evaluation-dataset", + help="Delete existing evaluation datasets before importing.", + type=bool, + default=False +) @opt_import_permissions @opt_experiment_rename_file @opt_model_rename_file @@ -103,7 +132,7 @@ def import_all( @opt_use_threads @opt_verbose -def main(input_dir, delete_model, delete_prompt, +def main(input_dir, delete_model, delete_prompt, delete_evaluation_dataset, import_permissions, experiment_rename_file, model_rename_file, @@ -120,6 +149,7 @@ def main(input_dir, delete_model, delete_prompt, input_dir = input_dir, delete_model = delete_model, delete_prompt = delete_prompt, + delete_dataset = delete_evaluation_dataset, import_permissions = import_permissions, experiment_renames = rename_utils.get_renames(experiment_rename_file), model_renames = rename_utils.get_renames(model_rename_file), diff --git a/mlflow_export_import/bulk/import_evaluation_datasets.py b/mlflow_export_import/bulk/import_evaluation_datasets.py new file mode 100644 index 0000000..dad5df7 --- /dev/null +++ b/mlflow_export_import/bulk/import_evaluation_datasets.py @@ -0,0 +1,153 @@ +""" +Imports multiple MLflow GenAI evaluation datasets from a directory. + +Note: Evaluation datasets are first-class entities introduced in MLflow 3.4.0+. +They require a SQL-based tracking backend (SQLite, PostgreSQL, MySQL). +FileStore backend is not supported. 
+""" + +import os +import click +import mlflow +from concurrent.futures import ThreadPoolExecutor + +from mlflow_export_import.common import utils, io_utils +from mlflow_export_import.common.click_options import ( + opt_input_dir, + opt_delete_evaluation_dataset, + opt_use_threads +) +from mlflow_export_import.common.version_utils import has_evaluation_dataset_support, log_version_info +from mlflow_export_import.client.client_utils import create_mlflow_client +from mlflow_export_import.evaluation_dataset.import_evaluation_dataset import import_evaluation_dataset + +_logger = utils.getLogger(__name__) + + +def import_evaluation_datasets( + input_dir, + delete_dataset=False, + use_threads=False, + mlflow_client=None + ): + """ + Import multiple evaluation datasets from a directory. + + :param input_dir: Input directory containing exported datasets. + :param delete_dataset: Delete existing dataset before importing (if it exists). + :param use_threads: Use multithreading for import. + :param mlflow_client: MLflow client. + :return: Summary of import results. + """ + + if not has_evaluation_dataset_support(): + _logger.warning(f"Evaluation datasets not supported in MLflow {mlflow.__version__} (requires 3.4.0+)") + return {"unsupported": True, "mlflow_version": mlflow.__version__} + + mlflow_client = mlflow_client or create_mlflow_client() + log_version_info() + + try: + # Find all dataset directories to import + dataset_dirs = _find_dataset_directories(input_dir) + + _logger.info(f"Found {len(dataset_dirs)} datasets to import") + + # Import datasets + results = _import_datasets(dataset_dirs, delete_dataset, use_threads) + + # Summary - categorize results + successful = [] + skipped = [] + failed = [] + + for result in results: + if result is None: + failed.append(result) + elif isinstance(result, tuple) and len(result) == 2: + name, dataset_id = result + if dataset_id is None: + skipped.append(name) + else: + successful.append(name) + else: + successful.append(result) + + summary = { + "total_datasets": len(dataset_dirs), + "successful_imports": len(successful), + "skipped_imports": len(skipped), + "failed_imports": len(failed) + } + + if skipped: + _logger.info(f"Skipped {len(skipped)} existing datasets: {', '.join(skipped)}") + + # Write summary + io_utils.write_export_file(input_dir, "import_summary.json", __file__, summary) + + _logger.info(f"Evaluation dataset import completed: {summary}") + return summary + + except Exception as e: + _logger.error(f"Bulk evaluation dataset import failed: {str(e)}") + return {"error": str(e)} + + +def _find_dataset_directories(input_dir): + """Find all dataset directories containing evaluation_dataset.json.""" + dataset_dirs = [] + + if not os.path.exists(input_dir): + raise Exception(f"Input directory does not exist: {input_dir}") + + # Look for directories that contain evaluation_dataset.json files + for item in os.listdir(input_dir): + item_path = os.path.join(input_dir, item) + if os.path.isdir(item_path): + dataset_file = os.path.join(item_path, "evaluation_dataset.json") + if os.path.exists(dataset_file): + dataset_dirs.append({ + "name": item, + "path": item_path + }) + _logger.info(f"Found dataset directory: {item}") + + return dataset_dirs + + +def _import_datasets(dataset_dirs, delete_dataset, use_threads): + """Import datasets with optional multithreading.""" + def import_single(dataset_dir): + _logger.info(f"Importing dataset from: {dataset_dir['name']}") + return import_evaluation_dataset( + input_dir=dataset_dir["path"], + dataset_name=None, 
# Use original name from export + delete_dataset=delete_dataset + ) + + max_workers = utils.get_threads(use_threads) + with ThreadPoolExecutor(max_workers=max_workers) as executor: + results = list(executor.map(import_single, dataset_dirs)) + + return results + + +@click.command() +@opt_input_dir +@opt_delete_evaluation_dataset +@opt_use_threads +def main(input_dir, delete_evaluation_dataset, use_threads): + _logger.info("Options:") + for k, v in locals().items(): + _logger.info(f" {k}: {v}") + + import_evaluation_datasets( + input_dir=input_dir, + delete_dataset=delete_evaluation_dataset, + use_threads=use_threads + ) + + +if __name__ == "__main__": + main() diff --git a/mlflow_export_import/bulk/import_prompts.py b/mlflow_export_import/bulk/import_prompts.py index b7ed476..8006fca 100644 --- a/mlflow_export_import/bulk/import_prompts.py +++ b/mlflow_export_import/bulk/import_prompts.py @@ -55,16 +55,33 @@ def import_prompts( else: results = _import_prompts_sequential(prompt_dirs, mlflow_client, delete_prompt) - # Summary - successful = [r for r in results if r is not None] - failed = len(results) - len(successful) + # Summary - categorize results + successful = [] + skipped = [] + failed = [] + + for result in results: + if result is None: + failed.append(result) + elif isinstance(result, tuple) and len(result) == 2: + name, version = result + if version is None: + skipped.append(name) + else: + successful.append(name) + else: + successful.append(result) summary = { "total_prompts": len(prompt_dirs), "successful_imports": len(successful), - "failed_imports": failed + "skipped_imports": len(skipped), + "failed_imports": len(failed) } + if skipped: + _logger.info(f"Skipped {len(skipped)} existing prompts: {', '.join(skipped)}") + # Write summary summary_path = os.path.join(input_dir, "import_summary.json") io_utils.write_export_file(input_dir, "import_summary.json", __file__, summary) diff --git a/mlflow_export_import/common/click_options.py b/mlflow_export_import/common/click_options.py index 1e0b0d8..fb6f77e 100644 --- a/mlflow_export_import/common/click_options.py +++ b/mlflow_export_import/common/click_options.py @@ -282,4 +282,32 @@ def opt_request_id(function): type=str, required=True )(function) - return function \ No newline at end of file + return function + +# == evaluation datasets + +def opt_evaluation_datasets(function): + function = click.option("--evaluation-datasets", + help="Evaluation dataset names: 'all' for all datasets, comma-delimited list (e.g., 'dataset1,dataset2'), \ +or file path ending with '.txt' containing dataset names (one per line).", + type=str, + required=True + )(function) + return function + +def opt_evaluation_datasets_experiment_ids(function): + function = click.option("--experiment-ids", + help="Comma-separated list of experiment IDs to filter datasets. Only used when --evaluation-datasets is 'all'.", + type=str, + required=False + )(function) + return function + +def opt_delete_evaluation_dataset(function): + function = click.option("--delete-evaluation-dataset", + help="Delete existing evaluation dataset before importing.", + type=bool, + default=False, + show_default=True + )(function) + return function diff --git a/mlflow_export_import/evaluation_dataset/__init__.py b/mlflow_export_import/evaluation_dataset/__init__.py new file mode 100644 index 0000000..a0f2cb4 --- /dev/null +++ b/mlflow_export_import/evaluation_dataset/__init__.py @@ -0,0 +1,3 @@ +""" +Evaluation dataset export and import. 
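+
+Contains export_evaluation_dataset and import_evaluation_dataset, which back the
+export-evaluation-dataset and import-evaluation-dataset CLI entry points.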
+""" diff --git a/mlflow_export_import/evaluation_dataset/export_evaluation_dataset.py b/mlflow_export_import/evaluation_dataset/export_evaluation_dataset.py new file mode 100644 index 0000000..555cc9f --- /dev/null +++ b/mlflow_export_import/evaluation_dataset/export_evaluation_dataset.py @@ -0,0 +1,156 @@ +""" +Exports MLflow GenAI evaluation datasets to a directory. + +Note: Evaluation datasets are first-class entities introduced in MLflow 3.4.0+. +They require a SQL-based tracking backend (SQLite, PostgreSQL, MySQL). +FileStore backend is not supported. +""" + +import os +import click +import mlflow +from mlflow.exceptions import RestException +from mlflow.tracking import get_tracking_uri +from mlflow.utils.uri import is_databricks_uri + +from mlflow_export_import.common import utils, io_utils +from mlflow_export_import.common.click_options import opt_output_dir +from mlflow_export_import.common.timestamp_utils import adjust_timestamps +from mlflow_export_import.common.version_utils import has_evaluation_dataset_support, log_version_info +from mlflow_export_import.client.client_utils import create_mlflow_client + +_logger = utils.getLogger(__name__) + + +def export_evaluation_dataset( + output_dir, + dataset_name=None, + dataset_id=None, + mlflow_client=None + ): + """ + Export a single evaluation dataset to a directory. + + :param output_dir: Output directory. + :param dataset_name: Name of the dataset to export. Either dataset_name or dataset_id must be provided. + :param dataset_id: ID of the dataset to export. Either dataset_name or dataset_id must be provided. + :param mlflow_client: MLflow client (not used for genai API but kept for consistency). + :return: Dataset object or None if export failed. + :raises ValueError: If neither dataset_name nor dataset_id is provided. + """ + + if not has_evaluation_dataset_support(): + raise Exception(f"Evaluation datasets not supported in MLflow {mlflow.__version__} (requires 3.4.0+)") + + if not dataset_name and not dataset_id: + raise ValueError("Either dataset_name or dataset_id must be provided") + + mlflow_client = mlflow_client or create_mlflow_client() + log_version_info() + + try: + # Get the dataset using mlflow.genai API + dataset = _get_dataset_safe(name=dataset_name, dataset_id=dataset_id) + + identifier = dataset_name or dataset_id + _logger.info(f"Exporting evaluation dataset: {identifier}") + + # Serialize dataset to dictionary + dataset_dict = dataset.to_dict() + + # Adjust timestamps for readability + if "create_time" in dataset_dict: + adjust_timestamps(dataset_dict, ["create_time"]) + + mlflow_attr = { + "evaluation_dataset": dataset_dict + } + + # Write dataset export file + io_utils.write_export_file(output_dir, "evaluation_dataset.json", __file__, mlflow_attr) + + _logger.info(f"Successfully exported evaluation dataset: {identifier}") + return dataset + + except RestException as e: + _logger.error(f"Evaluation dataset export failed: {{'dataset_name': '{dataset_name}', 'dataset_id': '{dataset_id}', 'RestException': {e.json}}}") + return None + except Exception as e: + error_msg = str(e) + # Check for FileStore backend error + if "FileStore" in error_msg or "SQL-based tracking backend" in error_msg: + _logger.error( + f"Evaluation datasets require SQL backend (SQLite, PostgreSQL, MySQL). " + f"FileStore is not supported. 
Error: {error_msg}" + ) + else: + _logger.error(f"Evaluation dataset export failed: {{'dataset_name': '{dataset_name}', 'dataset_id': '{dataset_id}', 'Exception': {error_msg}}}") + return None + + +def _get_dataset_safe(name=None, dataset_id=None): + """Get dataset with version-aware API calls - ensures compatibility across MLflow deployments.""" + + # Try MLflow 3.4+ genai namespace + try: + import mlflow.genai + if hasattr(mlflow.genai, 'get_dataset'): + # In Databricks: use name parameter only + # Outside Databricks: use dataset_id parameter only + if is_databricks_uri(get_tracking_uri()): + return mlflow.genai.get_dataset(name=name) + else: + # If name provided but not dataset_id, search for it + if name and not dataset_id: + _logger.info(f"Looking up dataset_id for name: {name}") + datasets = list(mlflow.genai.search_datasets()) + matching = [d for d in datasets if d.name == name] + if not matching: + raise ValueError(f"Dataset with name '{name}' not found") + if len(matching) > 1: + _logger.warning(f"Multiple datasets found with name '{name}', using first match") + dataset_id = matching[0].dataset_id + _logger.info(f"Found dataset_id: {dataset_id}") + + if not dataset_id: + raise ValueError("Parameter 'dataset_id' is required. Use search_datasets() to find the dataset ID by name if needed.") + + return mlflow.genai.get_dataset(dataset_id=dataset_id) + except (ImportError, AttributeError) as e: + _logger.debug(f"mlflow.genai.get_dataset not available: {e}") + except Exception as e: + _logger.debug(f"mlflow.genai.get_dataset failed: {e}") + raise + + raise Exception( + f"No compatible evaluation dataset API found in MLflow {mlflow.__version__}. " + f"Ensure MLflow 3.4.0+ is installed." + ) + + +@click.command() +@click.option("--dataset-name", + help="Name of the evaluation dataset to export.", + type=str, + required=False +) +@click.option("--dataset-id", + help="ID of the evaluation dataset to export.", + type=str, + required=False +) +@opt_output_dir +def main(dataset_name, dataset_id, output_dir): + _logger.info("Options:") + for k, v in locals().items(): + _logger.info(f" {k}: {v}") + + export_evaluation_dataset( + output_dir=output_dir, + dataset_name=dataset_name, + dataset_id=dataset_id + ) + + +if __name__ == "__main__": + main() diff --git a/mlflow_export_import/evaluation_dataset/import_evaluation_dataset.py b/mlflow_export_import/evaluation_dataset/import_evaluation_dataset.py new file mode 100644 index 0000000..a7561cc --- /dev/null +++ b/mlflow_export_import/evaluation_dataset/import_evaluation_dataset.py @@ -0,0 +1,323 @@ +""" +Imports MLflow GenAI evaluation datasets from a directory. + +Note: Evaluation datasets are first-class entities introduced in MLflow 3.4.0+. +They require a SQL-based tracking backend (SQLite, PostgreSQL, MySQL). +FileStore backend is not supported. 
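+
+Typical CLI usage (illustrative): import-evaluation-dataset --input-dir out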
+""" + +import os +import json +import click +import mlflow +from packaging import version +from mlflow.exceptions import RestException +from mlflow.tracking import get_tracking_uri +from mlflow.utils.uri import is_databricks_uri + +from mlflow_export_import.common import utils, io_utils +from mlflow_export_import.common.click_options import opt_input_dir +from mlflow_export_import.common.version_utils import has_evaluation_dataset_support, log_version_info, get_mlflow_version, get_version_info +from mlflow_export_import.client.client_utils import create_mlflow_client + +_logger = utils.getLogger(__name__) + + +def _check_import_compatibility(dataset_data): + """Check compatibility between source (exported) and target (current) MLflow versions.""" + # Get source version info from exported data + system_info = dataset_data.get("system", {}) + source_version = system_info.get("mlflow_version", "unknown") + source_tracking_uri = system_info.get("mlflow_tracking_uri", "unknown") + + # Get target version info + target_version = str(get_mlflow_version()) + target_info = get_version_info() + + _logger.info(f"Version compatibility check:") + _logger.info(f" Source MLflow version: {source_version}") + _logger.info(f" Target MLflow version: {target_version}") + _logger.info(f" Source tracking URI: {source_tracking_uri}") + + # Check for potential compatibility issues + try: + source_ver = version.parse(source_version) + target_ver = get_mlflow_version() + + # Check if target supports evaluation datasets + if not target_info["supports_evaluation_datasets"]: + raise Exception( + f"Target MLflow {target_version} does not support evaluation datasets (requires 3.4.0+)" + ) + + # Warn about major version differences + if source_ver.major != target_ver.major: + _logger.warning( + f"Major version difference detected: source v{source_ver.major} → target v{target_ver.major}" + ) + _logger.warning("Some features may not be fully compatible") + + _logger.info("Version compatibility check passed") + + except Exception as e: + _logger.warning(f"Could not parse version information: {e}") + _logger.info("Proceeding with import (version check inconclusive)") + + +def import_evaluation_dataset( + input_dir, + dataset_name=None, + delete_dataset=False, + mlflow_client=None + ): + """ + Import an evaluation dataset from exported directory. + + :param input_dir: Input directory containing exported dataset. + :param dataset_name: Optional new name for the imported dataset. If None, uses original name. + :param delete_dataset: Delete existing dataset before importing (if it exists). + :param mlflow_client: MLflow client (not used for genai API but kept for consistency). + :return: Tuple of (name, dataset_id) or None if import failed. 
+ """ + + if not has_evaluation_dataset_support(): + raise Exception( + f"Evaluation datasets not supported in MLflow {mlflow.__version__} (requires 3.4.0+)" + ) + + mlflow_client = mlflow_client or create_mlflow_client() + log_version_info() + + try: + # Read exported dataset data + dataset_path = os.path.join(input_dir, "evaluation_dataset.json") + if not os.path.exists(dataset_path): + raise Exception(f"Evaluation dataset export file not found: {dataset_path}") + + # Read full export file for version compatibility check + with open(dataset_path, 'r') as f: + full_export_data = json.load(f) + + # Check version compatibility between source and target + _check_import_compatibility(full_export_data) + + # Read MLflow-specific data for import + dataset_data = io_utils.read_file_mlflow(dataset_path) + + _logger.info(f"Importing evaluation dataset from: {input_dir}") + + # Extract dataset information + dataset_info = dataset_data.get("evaluation_dataset", {}) + + # Use provided name or original name + final_dataset_name = dataset_name or dataset_info.get("name") + if not final_dataset_name: + raise Exception("No dataset name specified and none found in export data") + + # Check if dataset already exists + existing_dataset = None + try: + import mlflow.genai + datasets = list(mlflow.genai.search_datasets()) + matching = [d for d in datasets if d.name == final_dataset_name] + if matching: + existing_dataset = matching[0] + _logger.info(f"Found existing dataset: {final_dataset_name} (ID: {existing_dataset.dataset_id})") + except Exception as e: + _logger.debug(f"Could not search for existing dataset: {e}") + + # Delete existing dataset if requested + if delete_dataset and existing_dataset: + try: + _delete_dataset_safe(dataset_id=existing_dataset.dataset_id) + _logger.info(f"Deleted existing dataset: {final_dataset_name}") + existing_dataset = None + except Exception as e: + _logger.warning(f"Could not delete dataset '{final_dataset_name}': {e}") + + # Skip import if dataset already exists and delete not requested + if existing_dataset and not delete_dataset: + _logger.warning( + f"Evaluation dataset '{final_dataset_name}' already exists - skipping import. " + f"Use --delete-evaluation-dataset to replace." + ) + return (final_dataset_name, existing_dataset.dataset_id) + + # Extract dataset components + source = dataset_info.get("source", {}) + schema = dataset_info.get("schema") + records = dataset_info.get("records", []) + tags = dataset_info.get("tags", {}) + experiment_ids = dataset_info.get("experiment_ids", []) + + # Create dataset in destination + try: + imported_dataset = _create_dataset_safe( + name=final_dataset_name, + source=source, # Not used in MLflow 3.4.0 but kept for future compatibility + schema=schema, # Not used in MLflow 3.4.0 but kept for future compatibility + experiment_ids=experiment_ids, + tags=tags + ) + except Exception as e: + error_msg = str(e) + # Check if it's a duplicate name error - this is a fallback in case search didn't catch it + if "already exists" in error_msg.lower() or "duplicate" in error_msg.lower() or "RESOURCE_ALREADY_EXISTS" in error_msg: + _logger.warning( + f"Evaluation dataset '{final_dataset_name}' already exists - skipping import. " + f"Use --delete-evaluation-dataset to replace." 
+ ) + return (final_dataset_name, None) + else: + # Re-raise if it's not a duplicate error + raise + + # Merge records if dataset has records + if records: + _logger.info(f"Merging {len(records)} records into dataset") + imported_dataset = _merge_records_safe(imported_dataset, records) + + dataset_id = imported_dataset.dataset_id if hasattr(imported_dataset, 'dataset_id') else None + _logger.info(f"Successfully imported evaluation dataset: {final_dataset_name} (ID: {dataset_id})") + + return (final_dataset_name, dataset_id) + + except Exception as e: + error_msg = str(e) + # Check for FileStore backend error + if "FileStore" in error_msg or "SQL-based tracking backend" in error_msg: + _logger.error( + f"Evaluation datasets require SQL backend (SQLite, PostgreSQL, MySQL). " + f"FileStore is not supported. Error: {error_msg}" + ) + else: + _logger.error(f"Evaluation dataset import failed: {error_msg}") + return None + + +def _create_dataset_safe(name, source, schema=None, experiment_ids=None, tags=None): + """Create dataset with version-aware API calls - ensures compatibility across MLflow deployments.""" + + # Try MLflow 3.4+ genai namespace + try: + import mlflow.genai + if hasattr(mlflow.genai, 'create_dataset'): + # MLflow 3.4.0 API only accepts: name, experiment_id, tags + # source and schema are not parameters - they're set automatically or via records + # IMPORTANT: Use [] to create truly independent datasets (no experiment association) + # Using None would default to experiment "0" (Default experiment) + return mlflow.genai.create_dataset( + name=name, + experiment_id=experiment_ids if experiment_ids is not None else [], + tags=tags + ) + except (ImportError, AttributeError) as e: + _logger.debug(f"mlflow.genai.create_dataset not available: {e}") + except Exception as e: + _logger.debug(f"mlflow.genai.create_dataset failed for '{name}': {e}") + raise + + raise Exception( + f"No compatible evaluation dataset API found in MLflow {mlflow.__version__}. " + f"Ensure MLflow 3.4.0+ is installed." 
+ ) + + +def _merge_records_safe(dataset, records): + """Merge records into dataset with version-aware API calls.""" + + try: + if hasattr(dataset, 'merge_records'): + # merge_records returns a new dataset object + return dataset.merge_records(records) + except Exception as e: + _logger.debug(f"dataset.merge_records failed: {e}") + raise + + raise Exception( + f"No compatible method to merge records found in MLflow {mlflow.__version__}" + ) + + +def _set_dataset_tags_safe(dataset, tags): + """Set dataset tags with version-aware API calls.""" + + # Try mlflow.genai.set_dataset_tags + try: + import mlflow.genai + if hasattr(mlflow.genai, 'set_dataset_tags'): + mlflow.genai.set_dataset_tags( + dataset_id=dataset.dataset_id, + tags=tags + ) + return dataset + except (ImportError, AttributeError) as e: + _logger.debug(f"mlflow.genai.set_dataset_tags not available: {e}") + except Exception as e: + _logger.debug(f"mlflow.genai.set_dataset_tags failed: {e}") + raise + + raise Exception( + f"No compatible method to set dataset tags found in MLflow {mlflow.__version__}" + ) + + +def _delete_dataset_safe(name=None, dataset_id=None): + """Delete dataset with version-aware API calls.""" + # Try mlflow.genai.delete_dataset + try: + import mlflow.genai + if hasattr(mlflow.genai, 'delete_dataset'): + # In Databricks: use name parameter + # Outside Databricks: use dataset_id parameter + if is_databricks_uri(get_tracking_uri()): + mlflow.genai.delete_dataset(name=name) + else: + # For non-Databricks, we need to get the dataset_id first + if not dataset_id: + # Search for the dataset by name to get its ID + datasets = mlflow.genai.search_datasets() + for ds in datasets: + if ds.name == name: + dataset_id = ds.dataset_id + break + if dataset_id: + mlflow.genai.delete_dataset(dataset_id=dataset_id) + return + except (ImportError, AttributeError) as e: + _logger.debug(f"mlflow.genai.delete_dataset not available: {e}") + except Exception as e: + _logger.debug(f"mlflow.genai.delete_dataset failed: {e}") + raise + + raise Exception( + f"No compatible method to delete dataset found in MLflow {mlflow.__version__}" + ) + + +@click.command() +@opt_input_dir +@click.option("--evaluation-dataset-name", + help="Optional new name for the imported evaluation dataset. 
If not specified, uses original name.", + type=str, + required=False +) +@click.option("--delete-evaluation-dataset", + help="Delete existing evaluation dataset before importing.", + type=bool, + default=False +) +def main(input_dir, evaluation_dataset_name, delete_evaluation_dataset): + _logger.info("Options:") + for k, v in locals().items(): + _logger.info(f" {k}: {v}") + + import_evaluation_dataset( + input_dir=input_dir, + dataset_name=evaluation_dataset_name, + delete_dataset=delete_evaluation_dataset + ) + + +if __name__ == "__main__": + main() diff --git a/mlflow_export_import/prompt/export_prompt.py b/mlflow_export_import/prompt/export_prompt.py index 755a4d0..73fd583 100644 --- a/mlflow_export_import/prompt/export_prompt.py +++ b/mlflow_export_import/prompt/export_prompt.py @@ -96,7 +96,8 @@ def _get_prompt_safe(prompt_name, prompt_version): import mlflow.genai if hasattr(mlflow.genai, 'load_prompt'): return mlflow.genai.load_prompt(prompt_name, prompt_version) - except (ImportError, AttributeError, Exception): + except (ImportError, AttributeError): + # Only catch import/attribute errors, not runtime errors like "prompt not found" pass # Try MLflow client approach (works with 2.21+) @@ -104,14 +105,14 @@ def _get_prompt_safe(prompt_name, prompt_version): client = mlflow.MlflowClient() if hasattr(client, 'get_prompt_version'): return client.get_prompt_version(prompt_name, prompt_version) - except (ImportError, AttributeError, Exception): + except (ImportError, AttributeError): pass # Try top-level functions (deprecated but may work) try: if hasattr(mlflow, 'load_prompt'): return mlflow.load_prompt(prompt_name, prompt_version) - except (ImportError, AttributeError, Exception): + except (ImportError, AttributeError): pass raise Exception(f"No compatible prompt loading API found in MLflow {mlflow.__version__}. 
Ensure prompt registry is supported.") diff --git a/mlflow_export_import/prompt/import_prompt.py b/mlflow_export_import/prompt/import_prompt.py index 6a6088b..a6a2a95 100644 --- a/mlflow_export_import/prompt/import_prompt.py +++ b/mlflow_export_import/prompt/import_prompt.py @@ -117,18 +117,33 @@ def import_prompt( delete_prompt_util(mlflow_client, final_prompt_name) # Create prompt in destination + # Note: If prompt exists, register_prompt will fail - we catch this to preserve version numbers _logger.debug(f"Creating prompt '{final_prompt_name}' with template: {prompt_info.get('template', '')[:50]}...") - imported_prompt = _create_prompt_safe( - name=final_prompt_name, - template=prompt_info.get("template", ""), - tags=prompt_info.get("tags", {}), - commit_message=prompt_info.get("commit_message"), - mlflow_client=mlflow_client - ) - - if imported_prompt is None: - _logger.error(f"Failed to create prompt '{final_prompt_name}' - _create_prompt_safe returned None") - return None + try: + imported_prompt = _create_prompt_safe( + name=final_prompt_name, + template=prompt_info.get("template", ""), + tags=prompt_info.get("tags", {}), + commit_message=prompt_info.get("commit_message"), + mlflow_client=mlflow_client + ) + + if imported_prompt is None: + _logger.error(f"Failed to create prompt '{final_prompt_name}' - _create_prompt_safe returned None") + return None + + except Exception as e: + error_msg = str(e) + # Check if it's a duplicate error - skip to preserve version numbers + if "already exists" in error_msg.lower() or "duplicate" in error_msg.lower() or "RESOURCE_ALREADY_EXISTS" in error_msg: + _logger.warning( + f"Prompt '{final_prompt_name}' already exists - skipping import " + f"to preserve version numbers. Use --delete-prompt to replace." + ) + return (final_prompt_name, None) + else: + # Re-raise if it's not a duplicate error + raise _logger.info(f"Successfully imported prompt: {final_prompt_name}") return final_prompt_name, imported_prompt.version if hasattr(imported_prompt, 'version') else "1" diff --git a/samples/oss_mlflow/bulk/evaluation_datasets/evaluation_datasets_summary.json b/samples/oss_mlflow/bulk/evaluation_datasets/evaluation_datasets_summary.json new file mode 100644 index 0000000..a70ab70 --- /dev/null +++ b/samples/oss_mlflow/bulk/evaluation_datasets/evaluation_datasets_summary.json @@ -0,0 +1,32 @@ +{ + "system": { + "package_version": "1.0.0", + "script": "export_evaluation_datasets.py", + "export_file_version": "2", + "export_time": 1730000000, + "_export_time": "2024-10-27 00:00:00", + "mlflow_version": "3.5.1", + "mlflow_tracking_uri": "http://localhost:5000", + "platform": { + "python_version": "3.11.0", + "system": "Darwin", + "processor": "arm" + }, + "user": "user" + }, + "mlflow": { + "total_datasets": 2, + "successful_exports": 2, + "failed_exports": 0, + "datasets": [ + { + "name": "wine-quality-eval", + "dataset_id": "abc123" + }, + { + "name": "iris-classification-eval", + "dataset_id": "def456" + } + ] + } +} diff --git a/samples/oss_mlflow/bulk/evaluation_datasets/iris-classification-eval_def456/evaluation_dataset.json b/samples/oss_mlflow/bulk/evaluation_datasets/iris-classification-eval_def456/evaluation_dataset.json new file mode 100644 index 0000000..707ae81 --- /dev/null +++ b/samples/oss_mlflow/bulk/evaluation_datasets/iris-classification-eval_def456/evaluation_dataset.json @@ -0,0 +1,86 @@ +{ + "system": { + "package_version": "1.0.0", + "script": "export_evaluation_dataset.py", + "export_file_version": "2", + "export_time": 1730000000, + 
"_export_time": "2024-10-27 00:00:00", + "mlflow_version": "3.5.1", + "mlflow_tracking_uri": "http://localhost:5000", + "platform": { + "python_version": "3.11.0", + "system": "Darwin", + "processor": "arm" + }, + "user": "user" + }, + "mlflow": { + "evaluation_dataset": { + "dataset_id": "def456", + "name": "iris-classification-eval", + "experiment_ids": ["3"], + "tags": { + "purpose": "classification-evaluation", + "version": "2.0", + "domain": "iris-classification" + }, + "source": { + "type": "dataset", + "uri": "file:///data/iris_test.csv" + }, + "schema": { + "inputs": [ + {"name": "sepal_length", "type": "double"}, + {"name": "sepal_width", "type": "double"}, + {"name": "petal_length", "type": "double"}, + {"name": "petal_width", "type": "double"} + ], + "targets": [ + {"name": "species", "type": "string"} + ] + }, + "records": [ + { + "inputs": { + "sepal_length": 5.1, + "sepal_width": 3.5, + "petal_length": 1.4, + "petal_width": 0.2 + }, + "targets": { + "species": "setosa" + } + }, + { + "inputs": { + "sepal_length": 7.0, + "sepal_width": 3.2, + "petal_length": 4.7, + "petal_width": 1.4 + }, + "targets": { + "species": "versicolor" + } + }, + { + "inputs": { + "sepal_length": 6.3, + "sepal_width": 3.3, + "petal_length": 6.0, + "petal_width": 2.5 + }, + "targets": { + "species": "virginica" + } + } + ], + "profile": { + "num_records": 3, + "num_inputs": 4, + "num_targets": 1 + }, + "create_time": 1730000100000, + "_create_time": "2024-10-27 00:01:40" + } + } +} diff --git a/samples/oss_mlflow/bulk/evaluation_datasets/wine-quality-eval_abc123/evaluation_dataset.json b/samples/oss_mlflow/bulk/evaluation_datasets/wine-quality-eval_abc123/evaluation_dataset.json new file mode 100644 index 0000000..14aa310 --- /dev/null +++ b/samples/oss_mlflow/bulk/evaluation_datasets/wine-quality-eval_abc123/evaluation_dataset.json @@ -0,0 +1,96 @@ +{ + "system": { + "package_version": "1.0.0", + "script": "export_evaluation_dataset.py", + "export_file_version": "2", + "export_time": 1730000000, + "_export_time": "2024-10-27 00:00:00", + "mlflow_version": "3.5.1", + "mlflow_tracking_uri": "http://localhost:5000", + "platform": { + "python_version": "3.11.0", + "system": "Darwin", + "processor": "arm" + }, + "user": "user" + }, + "mlflow": { + "evaluation_dataset": { + "dataset_id": "abc123", + "name": "wine-quality-eval", + "experiment_ids": ["1", "2"], + "tags": { + "purpose": "model-evaluation", + "version": "1.0", + "domain": "wine-quality" + }, + "source": { + "type": "code", + "uri": "notebook://cell_123" + }, + "schema": { + "inputs": [ + {"name": "fixed_acidity", "type": "double"}, + {"name": "volatile_acidity", "type": "double"}, + {"name": "citric_acid", "type": "double"}, + {"name": "residual_sugar", "type": "double"}, + {"name": "chlorides", "type": "double"}, + {"name": "free_sulfur_dioxide", "type": "double"}, + {"name": "total_sulfur_dioxide", "type": "double"}, + {"name": "density", "type": "double"}, + {"name": "pH", "type": "double"}, + {"name": "sulphates", "type": "double"}, + {"name": "alcohol", "type": "double"} + ], + "targets": [ + {"name": "quality", "type": "long"} + ] + }, + "records": [ + { + "inputs": { + "fixed_acidity": 7.4, + "volatile_acidity": 0.7, + "citric_acid": 0.0, + "residual_sugar": 1.9, + "chlorides": 0.076, + "free_sulfur_dioxide": 11.0, + "total_sulfur_dioxide": 34.0, + "density": 0.9978, + "pH": 3.51, + "sulphates": 0.56, + "alcohol": 9.4 + }, + "targets": { + "quality": 5 + } + }, + { + "inputs": { + "fixed_acidity": 7.8, + "volatile_acidity": 0.88, + 
"citric_acid": 0.0, + "residual_sugar": 2.6, + "chlorides": 0.098, + "free_sulfur_dioxide": 25.0, + "total_sulfur_dioxide": 67.0, + "density": 0.9968, + "pH": 3.2, + "sulphates": 0.68, + "alcohol": 9.8 + }, + "targets": { + "quality": 5 + } + } + ], + "profile": { + "num_records": 2, + "num_inputs": 11, + "num_targets": 1 + }, + "create_time": 1730000000000, + "_create_time": "2024-10-27 00:00:00" + } + } +} diff --git a/setup.py b/setup.py index 11d2538..806cd93 100644 --- a/setup.py +++ b/setup.py @@ -75,7 +75,11 @@ "export-prompt = mlflow_export_import.prompt.export_prompt:main", "import-prompt = mlflow_export_import.prompt.import_prompt:main", "export-prompts = mlflow_export_import.bulk.export_prompts:main", - "import-prompts = mlflow_export_import.bulk.import_prompts:main" + "import-prompts = mlflow_export_import.bulk.import_prompts:main", + "export-evaluation-dataset = mlflow_export_import.evaluation_dataset.export_evaluation_dataset:main", + "import-evaluation-dataset = mlflow_export_import.evaluation_dataset.import_evaluation_dataset:main", + "export-evaluation-datasets = mlflow_export_import.bulk.export_evaluation_datasets:main", + "import-evaluation-datasets = mlflow_export_import.bulk.import_evaluation_datasets:main" ] } ) diff --git a/tests/open_source/test_evaluation_datasets.py b/tests/open_source/test_evaluation_datasets.py new file mode 100644 index 0000000..fa09217 --- /dev/null +++ b/tests/open_source/test_evaluation_datasets.py @@ -0,0 +1,357 @@ +""" +Tests for evaluation dataset export and import functionality. +""" +import pytest +import mlflow +from mlflow_export_import.evaluation_dataset.export_evaluation_dataset import export_evaluation_dataset +from mlflow_export_import.evaluation_dataset.import_evaluation_dataset import import_evaluation_dataset +from mlflow_export_import.bulk.export_evaluation_datasets import export_evaluation_datasets +from mlflow_export_import.bulk.import_evaluation_datasets import import_evaluation_datasets +from mlflow_export_import.common.version_utils import has_evaluation_dataset_support +from tests.utils_test import create_output_dir +from tests.open_source.init_tests import mlflow_context + + +pytestmark = pytest.mark.skipif( + not has_evaluation_dataset_support(), + reason="Evaluation datasets not supported in this MLflow version" +) + + +def _create_test_evaluation_dataset(client, name, records, tags=None, experiment_ids=None): + """Create a test evaluation dataset using version-aware API.""" + try: + import mlflow.genai + if hasattr(mlflow.genai, 'create_dataset'): + # Create dataset (MLflow 3.4.0 API) + dataset = mlflow.genai.create_dataset( + name=name, + experiment_id=experiment_ids, + tags=tags + ) + + # Merge records if provided + if records: + dataset.merge_records(records) + + return dataset + except Exception as e: + pytest.skip(f"No compatible evaluation dataset creation API available: {e}") + + +def test_export_import_evaluation_dataset(mlflow_context): + """Test single evaluation dataset export and import.""" + # Create test dataset in source + mlflow.set_tracking_uri(mlflow_context.client_src.tracking_uri) + dataset_name = "test_eval_dataset_single" + + records = [ + {"inputs": {"question": "What is 2+2?"}, "targets": {"answer": "4"}}, + {"inputs": {"question": "What is the capital of France?"}, "targets": {"answer": "Paris"}} + ] + + tags = {"test": "true", "purpose": "unit-test"} + + dataset = _create_test_evaluation_dataset( + mlflow_context.client_src, + name=dataset_name, + records=records, + tags=tags + ) + + assert 
dataset is not None + assert dataset.name == dataset_name + + # Export dataset + output_dir = f"{mlflow_context.output_dir}/eval_dataset_single" + create_output_dir(output_dir) + + exported = export_evaluation_dataset( + dataset_name=dataset_name, + output_dir=output_dir, + mlflow_client=mlflow_context.client_src + ) + + assert exported is not None + assert exported.name == dataset_name + + # Import dataset to destination + mlflow.set_tracking_uri(mlflow_context.client_dst.tracking_uri) + imported_name = f"{dataset_name}_imported" + + result = import_evaluation_dataset( + input_dir=output_dir, + dataset_name=imported_name, + mlflow_client=mlflow_context.client_dst + ) + + assert result is not None + assert result[0] == imported_name + + # Validate that records were actually imported (not just metadata) + imported_dataset = mlflow.genai.get_dataset(dataset_id=result[1]) + assert imported_dataset is not None + # Access records to trigger lazy loading and verify data was imported + imported_records = list(imported_dataset.records) + assert len(imported_records) == len(records) + + +def test_dataset_name_override_on_import(mlflow_context): + """Test dataset name override on import.""" + # Create test dataset in source + mlflow.set_tracking_uri(mlflow_context.client_src.tracking_uri) + original_name = "test_eval_dataset_rename" + + records = [ + {"inputs": {"text": "test"}, "targets": {"label": "positive"}} + ] + + dataset = _create_test_evaluation_dataset( + mlflow_context.client_src, + name=original_name, + records=records + ) + + # Export dataset + output_dir = f"{mlflow_context.output_dir}/eval_dataset_rename" + create_output_dir(output_dir) + + export_evaluation_dataset( + dataset_name=original_name, + output_dir=output_dir, + mlflow_client=mlflow_context.client_src + ) + + # Import with new name + mlflow.set_tracking_uri(mlflow_context.client_dst.tracking_uri) + new_name = "renamed_eval_dataset" + + result = import_evaluation_dataset( + input_dir=output_dir, + dataset_name=new_name, + mlflow_client=mlflow_context.client_dst + ) + + assert result is not None + assert result[0] == new_name + assert result[0] != original_name + + # Validate records were imported + imported_dataset = mlflow.genai.get_dataset(dataset_id=result[1]) + imported_records = list(imported_dataset.records) + assert len(imported_records) == len(records) + + +def test_bulk_export_import_evaluation_datasets(mlflow_context): + """Test bulk evaluation dataset export and import.""" + # Create multiple test datasets in source + mlflow.set_tracking_uri(mlflow_context.client_src.tracking_uri) + + datasets_data = [ + ("test_eval_bulk_1", [{"inputs": {"x": 1}, "targets": {"y": 2}}]), + ("test_eval_bulk_2", [{"inputs": {"x": 3}, "targets": {"y": 4}}]), + ("test_eval_bulk_3", [{"inputs": {"x": 5}, "targets": {"y": 6}}]) + ] + + for name, records in datasets_data: + _create_test_evaluation_dataset( + mlflow_context.client_src, + name=name, + records=records + ) + + # Export all datasets + output_dir = f"{mlflow_context.output_dir}/eval_datasets_bulk" + create_output_dir(output_dir) + + export_result = export_evaluation_datasets( + output_dir=output_dir, + dataset_names=None, # Export all + use_threads=False, + mlflow_client=mlflow_context.client_src + ) + + assert export_result is not None + assert export_result["successful_exports"] >= len(datasets_data) + assert export_result["failed_exports"] == 0 + + # Import all datasets to destination + mlflow.set_tracking_uri(mlflow_context.client_dst.tracking_uri) + + import_result = 
import_evaluation_datasets( + input_dir=output_dir, + use_threads=False, + mlflow_client=mlflow_context.client_dst + ) + + assert import_result is not None + assert import_result["successful_imports"] >= len(datasets_data) + + # Validate records were actually imported (not just metadata) + from mlflow.tracking import MlflowClient + client = MlflowClient() + all_datasets = client.search_datasets() + + for dataset_name, expected_records in datasets_data: + matching_datasets = [d for d in all_datasets if d.name == dataset_name] + assert len(matching_datasets) > 0, f"Dataset {dataset_name} not found" + + imported_dataset = mlflow.genai.get_dataset(dataset_id=matching_datasets[0].dataset_id) + # Access records to trigger lazy loading and verify data was imported + imported_records = list(imported_dataset.records) + assert len(imported_records) == len(expected_records) + + +def test_export_specific_evaluation_datasets(mlflow_context): + """Test exporting specific evaluation datasets by name.""" + # Create test datasets + mlflow.set_tracking_uri(mlflow_context.client_src.tracking_uri) + + _create_test_evaluation_dataset( + mlflow_context.client_src, + "eval_dataset_a", + [{"inputs": {"a": 1}, "targets": {"b": 2}}] + ) + _create_test_evaluation_dataset( + mlflow_context.client_src, + "eval_dataset_b", + [{"inputs": {"c": 3}, "targets": {"d": 4}}] + ) + _create_test_evaluation_dataset( + mlflow_context.client_src, + "eval_dataset_c", + [{"inputs": {"e": 5}, "targets": {"f": 6}}] + ) + + # Export only specific datasets + output_dir = f"{mlflow_context.output_dir}/eval_datasets_specific" + create_output_dir(output_dir) + + export_result = export_evaluation_datasets( + output_dir=output_dir, + dataset_names=["eval_dataset_a", "eval_dataset_c"], + use_threads=False, + mlflow_client=mlflow_context.client_src + ) + + assert export_result is not None + assert export_result["successful_exports"] == 2 + assert export_result["failed_exports"] == 0 + + +def test_export_missing_dataset_error(mlflow_context): + """Test error handling when exporting a non-existent dataset.""" + mlflow.set_tracking_uri(mlflow_context.client_src.tracking_uri) + + output_dir = f"{mlflow_context.output_dir}/eval_dataset_missing" + create_output_dir(output_dir) + + # Try to export a dataset that doesn't exist + result = export_evaluation_dataset( + dataset_name="nonexistent_dataset_12345", + output_dir=output_dir, + mlflow_client=mlflow_context.client_src + ) + + # Should return None on failure + assert result is None + + +def test_import_missing_file_error(mlflow_context): + """Test error handling when importing from a directory without dataset file.""" + mlflow.set_tracking_uri(mlflow_context.client_dst.tracking_uri) + + # Create empty directory + output_dir = f"{mlflow_context.output_dir}/eval_dataset_no_file" + create_output_dir(output_dir) + + # Try to import from directory without evaluation_dataset.json + result = import_evaluation_dataset( + input_dir=output_dir, + mlflow_client=mlflow_context.client_dst + ) + + # Should return None on failure + assert result is None + + +def test_version_incompatibility_warning(mlflow_context): + """Test version compatibility check logs warnings for version differences.""" + import json + import os + import logging + + # This test verifies that the version compatibility check works and logs warnings + # We'll create a mock export file with a different version + output_dir = f"{mlflow_context.output_dir}/eval_dataset_version" + create_output_dir(output_dir) + + # Create a mock exported 
dataset file with a different major version
+    dataset_file = os.path.join(output_dir, "evaluation_dataset.json")
+    mock_data = {
+        "system": {
+            "package_version": "1.0.0",
+            "script": "export_evaluation_dataset.py",
+            "export_file_version": "2",
+            "export_time": 1730000000,
+            "_export_time": "2024-10-27 00:00:00",
+            "mlflow_version": "99.0.0",  # Fake future version to trigger warning
+            "mlflow_tracking_uri": "http://localhost:5000",
+            "platform": {
+                "python_version": "3.11.0",
+                "system": "Darwin",
+                "processor": "arm"
+            },
+            "user": "test_user"
+        },
+        "mlflow": {
+            "evaluation_dataset": {
+                "dataset_id": "test_id_123",
+                "name": "test_version_dataset",
+                "experiment_ids": [],
+                "tags": {},
+                "source": {
+                    "type": "code",
+                    "uri": "test://dataset/test"
+                },
+                "schema": None,
+                "records": [],  # Empty records to avoid API issues
+                "profile": None,
+                "create_time": 1730000000000,
+                "_create_time": "2024-10-27 00:00:00"
+            }
+        }
+    }
+
+    with open(dataset_file, 'w') as f:
+        json.dump(mock_data, f)
+
+    # Capture log output to verify warnings are logged
+    # (logging is already imported at the top of this function)
+    from io import StringIO
+    log_capture = StringIO()
+    handler = logging.StreamHandler(log_capture)
+    handler.setLevel(logging.WARNING)
+
+    logger = logging.getLogger('mlflow_export_import.evaluation_dataset.import_evaluation_dataset')
+    logger.addHandler(handler)
+
+    # Import - the version check should run and log warnings
+    mlflow.set_tracking_uri(mlflow_context.client_dst.tracking_uri)
+
+    # The import may fail due to API differences, but the version check should run
+    try:
+        import_evaluation_dataset(
+            input_dir=output_dir,
+            dataset_name="test_version_import",
+            mlflow_client=mlflow_context.client_dst
+        )
+    except Exception:
+        pass  # Expected to potentially fail due to API differences
+
+    # Verify that version compatibility warnings were logged
+    log_output = log_capture.getvalue()
+    logger.removeHandler(handler)
+
+    # Check that the major version difference warning was logged
+    assert "Major version difference detected" in log_output or "version" in log_output.lower()
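+
+
+# ---------------------------------------------------------------------------
+# Illustrative sketch, not part of the original test suite: the README for
+# these tools states that importing a dataset whose name already exists is
+# skipped with a warning rather than overwritten. The sketch below exercises
+# that behaviour using only the fixtures and helpers defined above
+# (mlflow_context, _create_test_evaluation_dataset, create_output_dir). The
+# return value of the skipped second import is deliberately not asserted,
+# since it is not pinned down elsewhere in this file; only the fact that the
+# previously imported records survive is checked.
+# ---------------------------------------------------------------------------
+def test_import_existing_dataset_is_skipped_sketch(mlflow_context):
+    """Sketch: a second import under an existing name should leave the first copy intact."""
+    # Create and export a small dataset from the source tracking server
+    mlflow.set_tracking_uri(mlflow_context.client_src.tracking_uri)
+    records = [{"inputs": {"prompt": "hello"}, "targets": {"completion": "world"}}]
+    _create_test_evaluation_dataset(
+        mlflow_context.client_src,
+        name="eval_skip_sketch",
+        records=records
+    )
+
+    output_dir = f"{mlflow_context.output_dir}/eval_skip_sketch"
+    create_output_dir(output_dir)
+    export_evaluation_dataset(
+        dataset_name="eval_skip_sketch",
+        output_dir=output_dir,
+        mlflow_client=mlflow_context.client_src
+    )
+
+    # First import creates the dataset in the destination tracking server
+    mlflow.set_tracking_uri(mlflow_context.client_dst.tracking_uri)
+    first = import_evaluation_dataset(
+        input_dir=output_dir,
+        dataset_name="eval_skip_sketch_imported",
+        mlflow_client=mlflow_context.client_dst
+    )
+    assert first is not None
+
+    # Second import under the same name: documented behaviour is to skip with
+    # a warning, so the existing dataset and its records must remain untouched.
+    import_evaluation_dataset(
+        input_dir=output_dir,
+        dataset_name="eval_skip_sketch_imported",
+        mlflow_client=mlflow_context.client_dst
+    )
+
+    existing = mlflow.genai.get_dataset(dataset_id=first[1])
+    assert len(list(existing.records)) == len(records)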