diff --git a/.gitignore b/.gitignore index 7efc577..7fde3fa 100644 --- a/.gitignore +++ b/.gitignore @@ -28,19 +28,16 @@ lightning_logs/ # Autogluon AutogluonModels/ -# temo file -temp_* -autopytorch1h_result.json -metrics.tar.gz -thirdparty/* -tsbench-weekend-exp-others.json -tsbench-weekend-exp.json -sourcedir.tar.gz -model.tar.gz -train_leaderboard.csv -configs/benchmark/auto/autogluon_test.yaml -m5 +# temp file +temp* +thirdparty +tsf_data +APT_run +data_loader.py +constant.py +cache + # Submodules -vendor/ \ No newline at end of file +vendor/ diff --git a/Dockerfile_local b/Dockerfile_local new file mode 100644 index 0000000..083c5d8 --- /dev/null +++ b/Dockerfile_local @@ -0,0 +1,19 @@ +FROM python:3.8.12-buster + +# Install R +RUN apt-get update \ + && apt-get install -y r-base \ + && R -e 'install.packages(c("forecast", "nnfor"), repos="https://cloud.r-project.org")' + +# Install project dependencies +RUN pip install poetry==1.1.6 \ + && poetry config virtualenvs.create false +COPY poetry.lock pyproject.toml /dependencies/ +RUN cd /dependencies \ + && poetry install --no-dev --no-root --no-interaction --no-ansi + +# install autogluon with locally code +RUN pip uninstall autogluon -y +COPY thirdparty/autogluon /dependencies/autogluon/ +WORKDIR /dependencies/autogluon/ +RUN ./full_install.sh diff --git a/bin/build-container.sh b/bin/build-container.sh index 5c65d8a..cab79d0 100644 --- a/bin/build-container.sh +++ b/bin/build-container.sh @@ -12,9 +12,23 @@ AWS_REGION=$(aws configure get region) ECR_PASSWORD=$(aws ecr get-login-password) REGISTRY=$AWS_ACCOUNT.dkr.ecr.$AWS_REGION.amazonaws.com +DOCKERFILE_PATH="Dockerfile" +if [ "$1" = "local" ]; +then + echo "Build docker image with local autogluon" + DOCKERFILE_PATH="Dockerfile_local" +else + echo "Build docker image with remote autogluon" +fi + +echo "Building image..." +docker build \ + -t $REGISTRY/tsbench:autogluon \ + -f $DOCKERFILE_PATH . + echo "Logging in to ECR..." echo $ECR_PASSWORD | \ docker login --username AWS --password-stdin $REGISTRY echo "Pushing image..." -docker push $REGISTRY/tsbench:latest +docker push $REGISTRY/tsbench:autogluon diff --git a/bin/setup-ec2.sh b/bin/setup-ec2.sh index 71ff7a8..1a16b69 100644 --- a/bin/setup-ec2.sh +++ b/bin/setup-ec2.sh @@ -4,9 +4,9 @@ set -e #-------------------------------------------------------------------------------------------------- echo "Installing packages..." sudo apt-get update -sudo apt-get install -y make build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev \ +sudo apt-get install -y make cmake build-essential libssl-dev zlib1g-dev libbz2-dev libreadline-dev \ libsqlite3-dev wget curl llvm libncurses5-dev xz-utils tk-dev libxml2-dev libxmlsec1-dev \ - libffi-dev liblzma-dev libcurl4-openssl-dev r-base p7zip-full awscli + libffi-dev liblzma-dev libcurl4-openssl-dev r-base p7zip-full awscli python3-dev #-------------------------------------------------------------------------------------------------- echo "Installing Node.js..." diff --git a/configs/benchmark/autogluon_benchmark/autogluon.yaml b/configs/benchmark/autogluon_benchmark/autogluon.yaml new file mode 100644 index 0000000..405488e --- /dev/null +++ b/configs/benchmark/autogluon_benchmark/autogluon.yaml @@ -0,0 +1,24 @@ +# seeds: [42, 137, 69, 720, 24] +seeds: [42] + +datasets: + - covid_deaths + - m3_quarterly + - hospital + - tourism_quarterly + - m4_hourly + - m3_other + - tourism_monthly + - m4_weekly + - m3_monthly + - nn5 + - electricity + +models: + autogluon: + - key: presets + values: [best_quality] + - key: run_time + values: [86400, 144000] + - key: eval_metric + values: [MASE] # AVAILABLE_METRICS = ["MASE", "MAPE", "sMAPE", "mean_wQuantileLoss", "MSE", "RMSE"] diff --git a/configs/benchmark/autogluon_benchmark/autogluon_runbook.yaml b/configs/benchmark/autogluon_benchmark/autogluon_runbook.yaml new file mode 100644 index 0000000..3ae0945 --- /dev/null +++ b/configs/benchmark/autogluon_benchmark/autogluon_runbook.yaml @@ -0,0 +1,23 @@ +seeds: [42] + +datasets: + - covid_deaths + - m3_quarterly + - hospital + - tourism_quarterly + - m4_hourly + - m3_other + - tourism_monthly + - m4_weekly + - m3_monthly + - nn5 + - electricity + +models: + autogluon: + - key: presets + values: [medium_quality] + - key: run_time + values: [30] + - key: eval_metric + values: [mean_wQuantileLoss] # AVAILABLE_METRICS = ["MASE", "MAPE", "sMAPE", "mean_wQuantileLoss", "MSE", "RMSE"] diff --git a/configs/benchmark/autogluon_benchmark/baseline.yaml b/configs/benchmark/autogluon_benchmark/baseline.yaml new file mode 100644 index 0000000..299a78b --- /dev/null +++ b/configs/benchmark/autogluon_benchmark/baseline.yaml @@ -0,0 +1,30 @@ +seeds: [42, 137, 69, 720, 24] + +datasets: + - covid_deaths + - m3_quarterly + - hospital + - tourism_quarterly + - m4_hourly + - m3_other + - tourism_monthly + - m4_weekly + - m3_monthly + - nn5 + - electricity + +models: + arima: [] + ets: [] + npts: [] + prophet: [] + deepar: + - key: context_length_multiple + values: [1, 2, 4, 8] + - key: [num_layers, num_cells] + values: [[1, 20], [2, 40], [4, 80]] + mqcnn: + - key: context_length_multiple + values: [1, 2, 4, 8] + - key: [num_filters, kernel_size_first, kernel_size_hidden, kernel_size_last] + values: [[20, 3, 3, 2], [30, 7, 3, 3], [40, 14, 7, 3]] diff --git a/pyproject.toml b/pyproject.toml index 03853a0..3426a50 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -10,15 +10,14 @@ version = "1.0.0" [tool.poetry.dependencies] PyYAML = "^5.4.1" -autogluon = "^0.3.1" -catch22 = "^0.2.0" +mxnet = "1.9" +autogluon = "0.5.0" click = "^7.1.2" fastparquet = "^0.6.1" fbprophet = "^0.7.1" -gluonts = {git = "https://github.com/awslabs/gluon-ts.git", rev = "7c94c1149875f6ad2e0d7b0a6bcee952f14d3fb1"} +gluonts = "0.9.5" holidays = "^0.11.1" lightkit = "^0.3.6" -mxnet = "1.8.0.post0" numpy = "^1.21.4" pandas = "^1.2.4" plotly = "^5.3.1" @@ -32,7 +31,7 @@ pytorch-lightning = "^1.5.0" rpy2 = ">=2.9.*,<3.*" sagemaker = "^2.40.0" sagemaker-training = "^3.9.2" -scikit-learn = "^0.24.2" +scikit-learn = "1.0.2" scipy = "^1.6.3" seaborn = "^0.11.2" statsmodels = "^0.13.0" diff --git a/runbook.md b/runbook.md new file mode 100644 index 0000000..c55a9b9 --- /dev/null +++ b/runbook.md @@ -0,0 +1,272 @@ +## Setting up an EC2 instance + +Launch an AWS EC2 instance with Ubuntu 20.04 (this avoids potential troubles) with enough disk storage. +We need at least 500GB to download and upload datasets. + +Create an IAM role, called `SagemakerAdmin`, which has the following policies. +- `AmazonEC2ContainerRegistryFullAccess` +- `AmazonSageMakerFullAccess` +- `AmazonS3FullAccess` + +For the `SagemakerAdmin` role, make sure the Trust relationships is like following +to grant SageMaker principal permissions to assume the role: + +```angular2html +{ + "Version": "2012-10-17", + "Statement": [ + { + "Effect": "Allow", + "Principal": { + "Service": "ec2.amazonaws.com" + }, + "Action": "sts:AssumeRole" + }, + { + "Sid": "", + "Effect": "Allow", + "Principal": { + "Service": "sagemaker.amazonaws.com" + }, + "Action": "sts:AssumeRole" + } + ] +} +``` +Attach the IAM role `SagemakerAdmin` to the instance. One could do so by doing: + +Actions -> Security -> Modify IAM role -> Select `SagemakerAdmin` -> Update IAM role. + + +### Config AWS CLI +Before run schedule, we must config the aws cli (only region is needed, they others can be left empty). + +```bash +aws configure +AWS Access Key ID [None]: +AWS Secret Access Key [None]: +Default region name [None]: us-west-2 +Default output format [None]: json +``` + +## Install +### Clone the package +```bash +git clone https://github.com/Yuang-Deng/tsbench.git +git checkout autogluon_dev +``` + +### Setting up environment + +### Install libraries that are needed for running tsbench +```bash +bash bin/setup-ec2.sh +source $HOME/.poetry/env +``` + +### Install python virtual environment through `poetry` +```bash +poetry install +``` + +After all packages are successfully installed, you can see this line in terminal +```bash +Installing the current project: tsbench (1.0.0) +``` + +To activate the virtual environment in your terminal: + +```bash +poetry shell +``` + +## Prepare the Data + +Before evaluating forecasting methods, you need to prepare the benchmark datasets. +You can run the following commands (assuming that you have executed `poetry shell`): + +```bash +# Download and preprocess all datasets +tsbench datasets download + +# Upload locally available datasets to your S3 bucket +tsbench datasets upload --bucket +``` + +Remember the name of the bucket that you used here. You will need it later! +We don't include Kaggle datasets in this runbook for automation reason. +If needed, please refer to `README.md` for using Kaggle datasets. + +## Prepare AWS Sagemaker + +As training jobs on AWS Sagemaker run in Docker containers, you will need to build your own and +upload it to the ECR registry. For this, you must first create an ECR repository named `tsbench`. +Then, you can build and upload it by using the following utility script (it may take up to 1 hour): + +```bash +bash bin/build-container.sh +``` + +The default docker image tag is `autogluon`. + +### Build docker image with local autogluon +It may happen that you want to test a version of autogluon that has not been merged. +For this, you need to create a folder named as `thirdparty` under project root +directory, then go inside `thridparty` folder and put your version of autogluon there. + +Then build the docker image with `local` option. +```bash +sh bin/build-container.sh local +``` + +## Launch Sagemaker job +```bash +tsbench evaluations schedule \ + --config_path configs/benchmark/autogluon_benchmark/autogluon_runbook.yaml \ + --sagemaker_role \ + --experiment \ + --data_bucket \ + --data_bucket_prefix \ + --output_bucket \ + --output_bucket_prefix \ + --docker_image=tsbench:autogluon \ + --max_runtime=120 +``` + +## Collect the results of sagemaker job and summarize (work in progress) +```bash +tsbench evaluations download \ + --experiment \ + --include_forecasts=False \ + --include_leaderboard=False # only relevant if you want to download leaderboard.csv from autogluon + +tsbench evaluations summarize \ + --experiment \ +``` + +# Other things might be helpful +This section includes things that are not generally needed for running the benchmarmks. +It mostly contains convenient note when development the package. +## For launching without command line options + +```python +{ + // Use IntelliSense to learn about possible attributes. + // Hover to view descriptions of existing attributes. + // For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387 + "version": "0.2.0", + "configurations": [ + + { + "name": "Python: schedule", + "type": "python", + "request": "launch", + "program": "./src/cli/evaluations/schedule.py", + "console": "integratedTerminal", + "justMyCode": false, + "args": [ + "--config_path=./configs/benchmark/auto/tsbench_seed.yaml", + "--sagemaker_role=AmazonSageMaker-ExecutionRole-20210222T141759", + "--experiment=tsbench-random-seed-exp3", + "--data_bucket=yuangbucket/tsbench", + "--data_bucket_prefix=data", + "--output_bucket=yuangbucket/tsbench", + "--output_bucket_prefix=evaluations", + "--docker_image=tsbench-autogluon:jun23_1", + "--max_runtime=120", + "--nskip=1" + ] + }, + { + "name": "Python: schedule test", + "type": "python", + "request": "launch", + "program": "./src/cli/evaluations/schedule.py", + "console": "integratedTerminal", + "justMyCode": false, + "args": [ + "--config_path=./configs/benchmark/auto/autogluon_test.yaml", + "--sagemaker_role=AmazonSageMaker-ExecutionRole-20210222T141759", + "--experiment=tsbench-codelocation-test", + "--data_bucket=yuangbucket/tsbench", + "--data_bucket_prefix=data", + "--output_bucket=yuangbucket/tsbench", + "--output_bucket_prefix=evaluations", + "--docker_image=tsbench-autogluon:jun22", + "--max_runtime=120" + ] + }, + { + "name": "Python: evaluate", + "type": "python", + "request": "launch", + "program": "./src/evaluate.py", + "console": "integratedTerminal", + "justMyCode": false, + "args": [ + "--dataset=solar", + // "--dataset=hospital", + "--model=autogluon", + "--autogluon_presets=good_quality", + "--autogluon_run_time=60" + ] + }, + { + "name": "Python: downlooad metrics", + "type": "python", + "request": "launch", + "program": "./src/cli/evaluations/download.py", + "console": "integratedTerminal", + "justMyCode": false, + "args": [ + "--experiment=tsbench-random-seed-exp3", + // "--experiment=tsbench-leaderboard-test", + "--include_forecasts=False", + "--include_leaderboard=False", + "--format=True", + ] + }, + { + "name": "Python: visualization", + "type": "python", + "request": "launch", + "program": "./src/cli/evaluations/result_visualization.py", + "console": "integratedTerminal", + "justMyCode": false, + "args": [ + "--experiment=tsbench-weekend-exp", + ] + }, + { + "name": "Python: dataset download", + "type": "python", + "request": "launch", + "program": "./src/cli/datasets/download_s3.py", + "console": "integratedTerminal", + "justMyCode": false, + "args": [ + // "--bucket=yuangbucket/tsbench", + ] + }, + ] +} +``` + + +# Integrate Auto-PyTorch +```bash +cd thirdparty +git clone git clone git@github.com:dengdifan/Auto-PyTorch.git +cd Auto-PyTorch +pip install -e . +cd .. +git clone https://github.com/dengdifan/ConfigSpace.git +cd ConfigSpace +pip install . +cd thirdparty/Auto-PyTorch/autoPyTorch +rm -rf automl_common +git clone git@github.com:automl/automl_common.git +cd automl_common +pip install -e . +pip install pytorch_forecasting +``` \ No newline at end of file diff --git a/src/cli/datasets/__init__.py b/src/cli/datasets/__init__.py index 055195b..1341242 100644 --- a/src/cli/datasets/__init__.py +++ b/src/cli/datasets/__init__.py @@ -12,7 +12,7 @@ # permissions and limitations under the License. from ._main import datasets -from .compute_catch22 import compute_catch22 # type: ignore +# from .compute_catch22 import compute_catch22 # type: ignore from .compute_stats import compute_stats # type: ignore from .download import download # type: ignore from .upload import upload # type: ignore diff --git a/src/cli/datasets/compute_catch22.py b/src/cli/datasets/compute_catch22.py index 51cadef..0c6a6fe 100644 --- a/src/cli/datasets/compute_catch22.py +++ b/src/cli/datasets/compute_catch22.py @@ -1,94 +1,94 @@ -# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. -# -# Licensed under the Apache License, Version 2.0 (the "License"). -# You may not use this file except in compliance with the License. -# A copy of the License is located at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# or in the "license" file accompanying this file. This file is distributed -# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either -# express or implied. See the License for the specific language governing -# permissions and limitations under the License. +# # Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# # +# # Licensed under the Apache License, Version 2.0 (the "License"). +# # You may not use this file except in compliance with the License. +# # A copy of the License is located at +# # +# # http://www.apache.org/licenses/LICENSE-2.0 +# # +# # or in the "license" file accompanying this file. This file is distributed +# # on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# # express or implied. See the License for the specific language governing +# # permissions and limitations under the License. -import os -from pathlib import Path -from typing import Any, Dict, Optional -import catch22 -import click -import pandas as pd -from tqdm.auto import tqdm -from tqdm.contrib.concurrent import process_map -from tsbench.config import DATASET_REGISTRY -from tsbench.constants import DEFAULT_DATA_CATCH22_PATH, DEFAULT_DATA_PATH -from ._main import datasets +# import os +# from pathlib import Path +# from typing import Any, Dict, Optional +# import catch22 +# import click +# import pandas as pd +# from tqdm.auto import tqdm +# from tqdm.contrib.concurrent import process_map +# from tsbench.config import DATASET_REGISTRY +# from tsbench.constants import DEFAULT_DATA_CATCH22_PATH, DEFAULT_DATA_PATH +# from ._main import datasets -@datasets.command(short_help="Compute catch22 features.") -@click.option( - "--dataset", - type=str, - default=None, - help=( - "The dataset to compute the catch22 features for. " - "If not provided, computes features for all datasets." - ), -) -@click.option( - "--data_path", - type=click.Path(exists=True), - default=DEFAULT_DATA_PATH, - show_default=True, - help="The local path where datasets are stored.", -) -@click.option( - "--output_path", - type=click.Path(), - default=DEFAULT_DATA_CATCH22_PATH, - show_default=True, - help=( - "The path where catch22 features are written to. " - "Features are written as `.parquet` files." - ), -) -def compute_catch22(dataset: Optional[str], data_path: str, output_path: str): - """ - Computes the catch22 features for each time series in a dataset. +# @datasets.command(short_help="Compute catch22 features.") +# @click.option( +# "--dataset", +# type=str, +# default=None, +# help=( +# "The dataset to compute the catch22 features for. " +# "If not provided, computes features for all datasets." +# ), +# ) +# @click.option( +# "--data_path", +# type=click.Path(exists=True), +# default=DEFAULT_DATA_PATH, +# show_default=True, +# help="The local path where datasets are stored.", +# ) +# @click.option( +# "--output_path", +# type=click.Path(), +# default=DEFAULT_DATA_CATCH22_PATH, +# show_default=True, +# help=( +# "The path where catch22 features are written to. " +# "Features are written as `.parquet` files." +# ), +# ) +# def compute_catch22(dataset: Optional[str], data_path: str, output_path: str): +# """ +# Computes the catch22 features for each time series in a dataset. - Computations are either run for a single dataset or all datasets in the - registry. - """ - target = Path(data_path) - target.mkdir(parents=True, exist_ok=True) +# Computations are either run for a single dataset or all datasets in the +# registry. +# """ +# target = Path(data_path) +# target.mkdir(parents=True, exist_ok=True) - if dataset is None: - dataset_names = [(k, v(target)) for k, v in DATASET_REGISTRY.items()] - else: - dataset_names = [(dataset, DATASET_REGISTRY[dataset](target))] +# if dataset is None: +# dataset_names = [(k, v(target)) for k, v in DATASET_REGISTRY.items()] +# else: +# dataset_names = [(dataset, DATASET_REGISTRY[dataset](target))] - directory = Path(output_path) - directory.mkdir(parents=True, exist_ok=True) +# directory = Path(output_path) +# directory.mkdir(parents=True, exist_ok=True) - for dataset_name, config in tqdm( - dataset_names, disable=len(dataset_names) == 1 - ): - file = directory / f"{dataset_name}.parquet" - if file.exists(): - continue +# for dataset_name, config in tqdm( +# dataset_names, disable=len(dataset_names) == 1 +# ): +# file = directory / f"{dataset_name}.parquet" +# if file.exists(): +# continue - ts_features = process_map( - _get_features, - config.data.train( - val=False - ).gluonts(), # Get features on train set - max_workers=os.cpu_count(), - desc=dataset_name, - chunksize=1, - ) - df = pd.DataFrame(ts_features) - df.to_parquet(file) # type: ignore +# ts_features = process_map( +# _get_features, +# config.data.train( +# val=False +# ).gluonts(), # Get features on train set +# max_workers=os.cpu_count(), +# desc=dataset_name, +# chunksize=1, +# ) +# df = pd.DataFrame(ts_features) +# df.to_parquet(file) # type: ignore -def _get_features(ts: Dict[str, Any]) -> Dict[str, Any]: - features = catch22.catch22_all(ts["target"]) - return dict(zip(features["names"], features["values"])) +# def _get_features(ts: Dict[str, Any]) -> Dict[str, Any]: +# features = catch22.catch22_all(ts["target"]) +# return dict(zip(features["names"], features["values"])) diff --git a/src/cli/datasets/upload.py b/src/cli/datasets/upload.py index 4d2f164..d446def 100644 --- a/src/cli/datasets/upload.py +++ b/src/cli/datasets/upload.py @@ -47,5 +47,5 @@ def upload(path: str, bucket: str, prefix: str): s3 = default_session().client("s3") for config in tqdm(DATASET_REGISTRY.values()): upload_directory( - s3, path / config.name(), bucket, f"{prefix}/{config.name()}" + s3, path + '/' + config.name(), bucket, f"{prefix}/{config.name()}" ) diff --git a/src/cli/evaluations/__init__.py b/src/cli/evaluations/__init__.py index c2da09e..74471cf 100644 --- a/src/cli/evaluations/__init__.py +++ b/src/cli/evaluations/__init__.py @@ -15,5 +15,6 @@ from .archive import archive # type: ignore from .download import download # type: ignore from .schedule import schedule # type: ignore +from .summarize import summarize # type: ignore __all__ = ["evaluations"] diff --git a/src/cli/evaluations/download.py b/src/cli/evaluations/download.py index 030230b..74b1852 100644 --- a/src/cli/evaluations/download.py +++ b/src/cli/evaluations/download.py @@ -26,8 +26,7 @@ from tsbench.evaluations import aws from tsbench.evaluations.aws import default_session from tsbench.evaluations.tracking.job import Job, load_jobs_from_analysis -from ._main import evaluations - +from cli.evaluations._main import evaluations @evaluations.command(short_help="Download evaluations to your file system.") @click.option( @@ -49,6 +48,15 @@ "only the training, validation and testing metrics." ), ) +@click.option( + "--include_leaderboard", + type=bool, + default=False, + help=( + "Whether to download leaderboard, just usefull for" + "models that store the leaderboard." + ), +) @click.option( "--evaluations_path", type=click.Path(), @@ -57,7 +65,7 @@ help="The path to which to download the evaluations to.", ) def download( - experiment: Optional[str], include_forecasts: bool, evaluations_path: str + experiment: Optional[str], include_forecasts: bool, include_leaderboard: bool, evaluations_path: str ): """ Downloads either the evaluations of a single AWS Sagemaker experiment or @@ -66,6 +74,7 @@ def download( The evaluations are downloaded to the provided directory. """ target = Path(evaluations_path) + target = Path.joinpath(target, experiment) target.mkdir(parents=True, exist_ok=True) if experiment is None: @@ -73,17 +82,27 @@ def download( _download_public_evaluations( include_forecasts=include_forecasts, evaluations_path=target ) + other_jobs = [] else: print(f"Downloading data from experiment '{experiment}'...") analysis = aws.Analysis(experiment) + other_jobs = analysis.other_jobs process_map( partial( - _move_job, target=target, include_forecasts=include_forecasts + _move_job, target=target, include_forecasts=include_forecasts, include_leaderboard=include_leaderboard ), load_jobs_from_analysis(analysis), chunksize=1, ) - + # abnormal results + abnormal_results = [] + if len(other_jobs) > 0: + for job in other_jobs: + res = {} + res.update(job.hyperparameters) + res['status'] = job.status + abnormal_results.append(res) + print(res['model'], ' \t', res['dataset'], ' \t', res['status']) def _download_public_evaluations( include_forecasts: bool, evaluations_path: Path @@ -154,5 +173,5 @@ def _extract_object_names(response: Dict[str, Any]) -> List[str]: ] -def _move_job(job: Job, target: Path, include_forecasts: bool): - job.save(target, include_forecasts=include_forecasts) +def _move_job(job: Job, target: Path, include_forecasts: bool, include_leaderboard: bool): + job.save(target, include_forecasts=include_forecasts, include_leaderboard=include_leaderboard) diff --git a/src/cli/evaluations/schedule.py b/src/cli/evaluations/schedule.py index a49864c..100e64d 100644 --- a/src/cli/evaluations/schedule.py +++ b/src/cli/evaluations/schedule.py @@ -169,11 +169,15 @@ def job_factory() -> str: all_configurations = generate_configurations(Path(config_path)) # Then, we can run the training, passing parameters as required + source_bucket_prefix = "source" for configuration in iterate_configurations(all_configurations, nskip): # Create the estimator estimator = CustomFramework( sagemaker_session=sm_session, role=sagemaker_role, + code_location=( + f"s3://{output_bucket}/{source_bucket_prefix}/{experiment}" + ), tags=[ {"Key": "Experiment", "Value": experiment}, ], @@ -204,6 +208,9 @@ def job_factory() -> str: }, ) + estimator.framework_version = '0.0.1', + estimator.py_version = '3.8' + while True: # Try fitting the estimator try: @@ -224,4 +231,4 @@ def job_factory() -> str: print(f">>> Launched job: {estimator.latest_training_job.name}") # type: ignore - print(">>> Successfully scheduled all training jobs.") + print(">>> Successfully scheduled all training jobs.") \ No newline at end of file diff --git a/src/cli/evaluations/summarize.py b/src/cli/evaluations/summarize.py new file mode 100644 index 0000000..4d62d18 --- /dev/null +++ b/src/cli/evaluations/summarize.py @@ -0,0 +1,99 @@ +# Copyright 2018 Amazon.com, Inc. or its affiliates. All Rights Reserved. +# +# Licensed under the Apache License, Version 2.0 (the "License"). +# You may not use this file except in compliance with the License. +# A copy of the License is located at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# or in the "license" file accompanying this file. This file is distributed +# on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either +# express or implied. See the License for the specific language governing +# permissions and limitations under the License. + +import json +from pathlib import Path +import click +import os +import pandas as pd +from tsbench.constants import DEFAULT_EVALUATIONS_PATH +from cli.evaluations._main import evaluations + +BASELINES = ["arima", "ets", "prophet", "mqcnn"] + +METRICS = ["mase", "smape", "nrmse", "nd", "ncrps"] + +DATASETS = ["m3_yearly", "m3_quarterly", "m3_monthly", "m3_other", "m4_quarterly", "m4_monthly", + "m4_weekly", "m4_daily", "m4_hourly", "m4_yearly", "tourism_quarterly", "tourism_monthly", + "dominick", "weather", "hospital", "covid_deaths", "electricity", "kdd_2018", "nn5", "rossmann", "solar", "taxi", "wiki"] + +@evaluations.command( + short_help="Visulize results as a table." +) +@click.option( + "--evaluations_path", + type=click.Path(exists=True), + default=DEFAULT_EVALUATIONS_PATH, + help="The directory where TSBench evaluations are stored.", +) +@click.option( + "--experiment", + type=click.Path(), + required=True, + help="The experiment name which you want to visualization.", +) +@click.option( + "--metric", + type=click.Path(), + default='ncrps', + help="The metric score shown in table.", +) +def summarize(evaluations_path: Path, experiment: str, metric:str): + results = [] + source = Path(evaluations_path) + source = Path.joinpath(source, experiment) + if Path.joinpath(source, experiment + '.csv').exists(): + print("load from csv file:", Path.joinpath(source, experiment + '.csv')) + res_df = pd.read_csv(Path.joinpath(source, experiment + '.csv')) + index_models = list(set(res_df.model.values.tolist())) + else: + models = os.listdir(source) + autogluon_models = set() + for model in models: + model_dir = Path.joinpath(source, model) + if Path.is_file(model_dir): + continue + datasets = os.listdir(model_dir) + for ds in datasets: + # TODO collect dataset we need, try collect all dataset but just print we need + if ds not in DATASETS: + continue + ds_dir = Path.joinpath(model_dir, ds) + hyperparameters = os.listdir(ds_dir) + for hp in hyperparameters: + hp_dir = Path.joinpath(ds_dir, hp) + config = json.load(open(Path.joinpath(hp_dir, 'config.json'), 'r')) + performance = json.load(open(Path.joinpath(hp_dir, 'performance.json'), 'r')) + n = len(performance['performances']) + res = {} + if model == 'autogluon': + autogluom_model = model + '-' + config['hyperparameters']['presets'] + '-' + str(config['hyperparameters']['run_time']) + autogluon_models.add(autogluom_model) + res['model'] = autogluom_model + else: + res['model'] = model + res['dataset'] = ds + res.update(performance['performances'][-1]['testing']) + val_loss = performance['performances'][n-1]['evaluation']['val_loss'] if 'evaluation' in performance['performances'][n-1] else -1 + res['val_loss'] = val_loss + res['seed'] = config['seed'] + res['hps'] = hp + results.append(res) + + index_models = BASELINES + list(autogluon_models) + res_df = pd.DataFrame(results) + res_df.to_csv(Path.joinpath(source, experiment + '.csv')) + print('results has been saved at:', Path.joinpath(source, experiment + '.csv')) + + res_df = res_df.loc[res_df.groupby(['dataset', 'model', 'seed']).val_loss.idxmin()] + print(res_df.pivot_table(index='dataset', columns='model', values=metric).reindex(index_models, axis=1)) \ No newline at end of file diff --git a/src/evaluate.py b/src/evaluate.py index b2e7089..0072c04 100644 --- a/src/evaluate.py +++ b/src/evaluate.py @@ -103,6 +103,9 @@ @click.option("--tft_num_heads", default=4, show_default=True) @click.option("--nbeats_num_stacks", default=30, show_default=True) @click.option("--nbeats_num_blocks", default=1, show_default=True) +@click.option("--autogluon_presets", default=None, show_default=True) +@click.option("--autogluon_run_time", default=1*60*60*10, show_default=True) +@click.option("--autogluon_eval_metric", default='mean_wQuantileLoss', show_default=True) def main( dataset: str, model: str, @@ -126,7 +129,7 @@ def main( """ # Basic configuration env.use_tqdm = use_tqdm - logging.basicConfig(level=logging.INFO) + logging.getLogger().setLevel(logging.INFO) # Setup model_dir = Path(model_path) diff --git a/src/tsbench/config/dataset/__init__.py b/src/tsbench/config/dataset/__init__.py index 9be961b..b8f317f 100644 --- a/src/tsbench/config/dataset/__init__.py +++ b/src/tsbench/config/dataset/__init__.py @@ -12,7 +12,7 @@ # permissions and limitations under the License. from ._base import DatasetConfig, DatasetSplit, EvaluationDataset -from ._factory import DATASET_REGISTRY, get_dataset_config +from ._factory import DATASET_REGISTRY, get_dataset_config, construct_pandas_frame_from_iterable_dataset from .datasets import * __all__ = [ @@ -21,4 +21,5 @@ "EvaluationDataset", "DATASET_REGISTRY", "get_dataset_config", + "construct_pandas_frame_from_iterable_dataset", ] diff --git a/src/tsbench/config/dataset/_base.py b/src/tsbench/config/dataset/_base.py index ec8d2d1..cfd3554 100644 --- a/src/tsbench/config/dataset/_base.py +++ b/src/tsbench/config/dataset/_base.py @@ -187,6 +187,9 @@ def gluonts(self) -> Dataset: self._directory / "gluonts" / self._split, freq=self._metadata.freq ) + def pandas(self) -> Dataset: + pass + def evaluation(self) -> EvaluationDataset: """ Returns the NumPy arrays that are used to perform evaluation. diff --git a/src/tsbench/config/dataset/_factory.py b/src/tsbench/config/dataset/_factory.py index 2f4fb1c..4f33d13 100644 --- a/src/tsbench/config/dataset/_factory.py +++ b/src/tsbench/config/dataset/_factory.py @@ -11,6 +11,10 @@ # express or implied. See the License for the specific language governing # permissions and limitations under the License. +from collections.abc import Iterable +import itertools + +import pandas as pd from pathlib import Path from typing import Dict, Type, TypeVar, Union from tsbench.constants import DEFAULT_DATA_PATH @@ -18,6 +22,9 @@ DATASET_REGISTRY: Dict[str, Type[DatasetConfig]] = {} +ITEMID = "item_id" +TIMESTAMP = "timestamp" + D = TypeVar("D", bound=Type[DatasetConfig]) @@ -47,3 +54,50 @@ def get_dataset_config( # Get the dataset assert name in DATASET_REGISTRY, f"Dataset name '{name}' is unknown." return DATASET_REGISTRY[name](Path(path)) + +def construct_pandas_frame_from_iterable_dataset( + iterable_dataset: Iterable +) -> pd.DataFrame: + _validate_iterable(iterable_dataset) + + all_ts = [] + id_set = set() + for i, ts in enumerate(iterable_dataset): + # print(ts['item_id']) + id_set.add(ts['item_id']) + start_timestamp = ts["start"] + target = ts["target"] + datetime_index = tuple( + pd.date_range( + start_timestamp, periods=len(target), freq=start_timestamp.freq + ) + ) + idx = pd.MultiIndex.from_product( + [(i,), datetime_index], names=[ITEMID, TIMESTAMP] + ) + ts_df = pd.Series(target, name="target", index=idx).to_frame() + all_ts.append(ts_df) + print(len(id_set)) + return pd.concat(all_ts) + +def _validate_iterable(data: Iterable): + if not isinstance(data, Iterable): + raise ValueError("data must be of type Iterable.") + + first = next(iter(data), None) + if first is None: + raise ValueError("data has no time-series.") + + for i, ts in enumerate(itertools.chain([first], data)): + if not isinstance(ts, dict): + raise ValueError( + f"{i}'th time-series in data must be a dict, got{type(ts)}" + ) + if not ("target" in ts and "start" in ts): + raise ValueError( + f"{i}'th time-series in data must have 'target' and 'start', got{ts.keys()}" + ) + if not isinstance(ts["start"], pd.Timestamp) or ts["start"].freq is None: + raise ValueError( + f"{i}'th time-series must have timestamp as 'start' with freq specified, got {ts['start']}" + ) diff --git a/src/tsbench/config/dataset/datasets.py b/src/tsbench/config/dataset/datasets.py index 9283e7c..395527c 100644 --- a/src/tsbench/config/dataset/datasets.py +++ b/src/tsbench/config/dataset/datasets.py @@ -312,27 +312,27 @@ def has_time_features(self) -> bool: return False -@register_dataset -@dataclass(frozen=True) -class M5DatasetConfig(GluonTsDatasetConfig): - """ - The dataset configuration for the `m5` dataset. - """ +# @register_dataset +# @dataclass(frozen=True) +# class M5DatasetConfig(GluonTsDatasetConfig): +# """ +# The dataset configuration for the `m5` dataset. +# """ - @classmethod - def name(cls) -> str: - return "m5" +# @classmethod +# def name(cls) -> str: +# return "m5" - @property - def max_training_time(self) -> int: - return 28800 +# @property +# def max_training_time(self) -> int: +# return 28800 - def _materialize(self, directory: Path, regenerate: bool = False) -> None: - shutil.copytree( - Path.home() / ".mxnet" / "gluon-ts" / "datasets" / "m5", - directory / "m5", - ) - super()._materialize(directory, regenerate=True) +# def _materialize(self, directory: Path, regenerate: bool = False) -> None: +# shutil.copytree( +# Path.home() / ".mxnet" / "gluon-ts" / "datasets" / "m5", +# directory / "m5", +# ) +# super()._materialize(directory, regenerate=True) @register_dataset @@ -984,261 +984,261 @@ def _prediction_length(self) -> int: return 18 -@register_dataset -@dataclass(frozen=True) -class RossmannDatasetConfig(KaggleDatasetConfig): - """ - The dataset configuration for the "Rossmann Store Sales" Kaggle - competition. - """ - - @classmethod - def name(cls) -> str: - return "rossmann" - - @property - def max_training_time(self) -> int: - return 7200 - - @property - def _link(self) -> str: - return "https://www.kaggle.com/c/rossmann-store-sales" - - def _extract_data( - self, path: Path - ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: - # Read the raw data - data = cast(pd.DataFrame, pd.read_csv(path / "train.csv")) - stores = cast(pd.DataFrame, pd.read_csv(path / "store.csv")) - - # Generate GluonTS dataset - metadata = { - "freq": "D", - "prediction_length": 48, - "feat_static_cat": [ - { - "name": "store", - "cardinality": len(stores), - }, - ], - } - - series = [] - for i, store_data in data.groupby("Store"): - sorted_data = store_data.sort_values("Date") - series.append( - { - "item_id": int(i) - 1, - "start": sorted_data.Date.min(), - "target": sorted_data.Sales.to_list(), - "feat_static_cat": [ - int(i) - 1, - ], - } - ) - - return metadata, series - - -@register_dataset -@dataclass(frozen=True) -class CorporacionFavoritaDatasetConfig(KaggleDatasetConfig): - """ - The dataset configuration for the "Corporación Favorita" Kaggle - competition. - """ - - @classmethod - def name(cls) -> str: - return "corporacion_favorita" - - @property - def max_training_time(self) -> int: - return 28800 - - @property - def _link(self) -> str: - return "https://www.kaggle.com/c/favorita-grocery-sales-forecasting" - - def _extract_data( - self, path: Path - ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: - # Read the raw data - data = cast(pd.DataFrame, pd.read_csv(path / "train.csv")) - stores = cast(pd.DataFrame, pd.read_csv(path / "stores.csv")) - item_ids = np.sort(data.item_nbr.unique()) - - # Generate GluonTS dataset - metadata = { - "freq": "D", - "prediction_length": 16, - "feat_static_cat": [ - { - "name": "store", - "cardinality": len(stores), - }, - { - "name": "item", - "cardinality": len(item_ids), - }, - ], - } - - series = [] - for i, ((item, store_id), group_data) in enumerate( - data.groupby(["item_nbr", "store_nbr"]) - ): - item_id = np.where(item_ids == item)[0][0] - sorted_data = group_data.sort_values("date") - sales = pd.Series( - sorted_data.unit_sales.to_numpy(), - index=pd.DatetimeIndex(sorted_data.date), - ) - series.append( - { - "item_id": i, - "start": sorted_data.date.min(), - "target": sales.resample("D") - .first() - .fillna(value=0) - .to_list(), - "feat_static_cat": [ - int(store_id) - 1, - int(item_id), - ], - } - ) - - return metadata, series - - -@register_dataset -@dataclass(frozen=True) -class WalmartDatasetConfig(KaggleDatasetConfig): - """ - The dataset configuration for the "Walmart Recruiting" Kaggle competition. - """ - - @classmethod - def name(cls) -> str: - return "walmart" - - @property - def max_training_time(self) -> int: - return 7200 - - @property - def _link(self) -> str: - return "https://www.kaggle.com/c/walmart-recruiting-store-sales-forecasting" - - def _extract_data( - self, path: Path - ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: - # Read the raw data - data = cast(pd.DataFrame, pd.read_csv(path / "train.csv")) - department_ids = np.sort(data.Dept.unique()) - - # Generate GluonTS dataset - metadata = { - "freq": "W", - "prediction_length": 39, - "feat_static_cat": [ - { - "name": "store", - "cardinality": len( - data.Store.unique() - ), # pylint: disable=no-member - }, - { - "name": "department", - "cardinality": len(department_ids), - }, - ], - } - - series = [] - # pylint: disable=no-member - for i, ((store_id, department), group_data) in enumerate( - data.groupby(["Store", "Dept"]) - ): - department_id = np.where(department_ids == department)[0][0] - sorted_data = group_data.sort_values("Date") - series.append( - { - "item_id": i, - "start": sorted_data.Date.min(), - "target": sorted_data.Weekly_Sales.to_list(), - "feat_static_cat": [ - int(store_id) - 1, - int(department_id), - ], - } - ) - - return metadata, series - - -@register_dataset -@dataclass(frozen=True) -class RestaurantDatasetConfig(KaggleDatasetConfig): - """ - The dataset configuration for the "Restaurant" Kaggle competition. - """ - - @classmethod - def name(cls) -> str: - return "restaurant" - - @property - def max_training_time(self) -> int: - return 7200 - - @property - def _link(self) -> str: - return ( - "https://www.kaggle.com/c/recruit-restaurant-visitor-forecasting" - ) - - def _extract_data( - self, path: Path - ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: - # Read the raw data - data = cast(pd.DataFrame, pd.read_csv(path / "air_visit_data.csv")) - store_ids = np.sort(data.air_store_id.unique()) - - # Generate GluonTS dataset - metadata = { - "freq": "D", - "prediction_length": 39, - "feat_static_cat": [ - { - "name": "restaurant", - "cardinality": len(store_ids), - }, - ], - } - - series = [] - # pylint: disable=no-member - for i, (store, group_data) in enumerate(data.groupby("air_store_id")): - store_id = np.where(store_ids == store)[0][0] - sorted_data = group_data.sort_values("visit_date") - visitors = pd.Series( - sorted_data.visitors.to_numpy(), - index=pd.DatetimeIndex(sorted_data.visit_date), - ) - series.append( - { - "item_id": i, - "start": sorted_data.visit_date.min(), - "target": visitors.resample("D") - .first() - .fillna(value=0) - .to_list(), - "feat_static_cat": [ - int(store_id), - ], - } - ) - - return metadata, series +# @register_dataset +# @dataclass(frozen=True) +# class RossmannDatasetConfig(KaggleDatasetConfig): +# """ +# The dataset configuration for the "Rossmann Store Sales" Kaggle +# competition. +# """ +# +# @classmethod +# def name(cls) -> str: +# return "rossmann" +# +# @property +# def max_training_time(self) -> int: +# return 7200 +# +# @property +# def _link(self) -> str: +# return "https://www.kaggle.com/c/rossmann-store-sales" +# +# def _extract_data( +# self, path: Path +# ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: +# # Read the raw data +# data = cast(pd.DataFrame, pd.read_csv(path / "train.csv")) +# stores = cast(pd.DataFrame, pd.read_csv(path / "store.csv")) +# +# # Generate GluonTS dataset +# metadata = { +# "freq": "D", +# "prediction_length": 48, +# "feat_static_cat": [ +# { +# "name": "store", +# "cardinality": len(stores), +# }, +# ], +# } +# +# series = [] +# for i, store_data in data.groupby("Store"): +# sorted_data = store_data.sort_values("Date") +# series.append( +# { +# "item_id": int(i) - 1, +# "start": sorted_data.Date.min(), +# "target": sorted_data.Sales.to_list(), +# "feat_static_cat": [ +# int(i) - 1, +# ], +# } +# ) +# +# return metadata, series +# +# +# @register_dataset +# @dataclass(frozen=True) +# class CorporacionFavoritaDatasetConfig(KaggleDatasetConfig): +# """ +# The dataset configuration for the "Corporación Favorita" Kaggle +# competition. +# """ +# +# @classmethod +# def name(cls) -> str: +# return "corporacion_favorita" +# +# @property +# def max_training_time(self) -> int: +# return 28800 +# +# @property +# def _link(self) -> str: +# return "https://www.kaggle.com/c/favorita-grocery-sales-forecasting" +# +# def _extract_data( +# self, path: Path +# ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: +# # Read the raw data +# data = cast(pd.DataFrame, pd.read_csv(path / "train.csv")) +# stores = cast(pd.DataFrame, pd.read_csv(path / "stores.csv")) +# item_ids = np.sort(data.item_nbr.unique()) +# +# # Generate GluonTS dataset +# metadata = { +# "freq": "D", +# "prediction_length": 16, +# "feat_static_cat": [ +# { +# "name": "store", +# "cardinality": len(stores), +# }, +# { +# "name": "item", +# "cardinality": len(item_ids), +# }, +# ], +# } +# +# series = [] +# for i, ((item, store_id), group_data) in enumerate( +# data.groupby(["item_nbr", "store_nbr"]) +# ): +# item_id = np.where(item_ids == item)[0][0] +# sorted_data = group_data.sort_values("date") +# sales = pd.Series( +# sorted_data.unit_sales.to_numpy(), +# index=pd.DatetimeIndex(sorted_data.date), +# ) +# series.append( +# { +# "item_id": i, +# "start": sorted_data.date.min(), +# "target": sales.resample("D") +# .first() +# .fillna(value=0) +# .to_list(), +# "feat_static_cat": [ +# int(store_id) - 1, +# int(item_id), +# ], +# } +# ) +# +# return metadata, series +# +# +# @register_dataset +# @dataclass(frozen=True) +# class WalmartDatasetConfig(KaggleDatasetConfig): +# """ +# The dataset configuration for the "Walmart Recruiting" Kaggle competition. +# """ +# +# @classmethod +# def name(cls) -> str: +# return "walmart" +# +# @property +# def max_training_time(self) -> int: +# return 7200 +# +# @property +# def _link(self) -> str: +# return "https://www.kaggle.com/c/walmart-recruiting-store-sales-forecasting" +# +# def _extract_data( +# self, path: Path +# ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: +# # Read the raw data +# data = cast(pd.DataFrame, pd.read_csv(path / "train.csv")) +# department_ids = np.sort(data.Dept.unique()) +# +# # Generate GluonTS dataset +# metadata = { +# "freq": "W", +# "prediction_length": 39, +# "feat_static_cat": [ +# { +# "name": "store", +# "cardinality": len( +# data.Store.unique() +# ), # pylint: disable=no-member +# }, +# { +# "name": "department", +# "cardinality": len(department_ids), +# }, +# ], +# } +# +# series = [] +# # pylint: disable=no-member +# for i, ((store_id, department), group_data) in enumerate( +# data.groupby(["Store", "Dept"]) +# ): +# department_id = np.where(department_ids == department)[0][0] +# sorted_data = group_data.sort_values("Date") +# series.append( +# { +# "item_id": i, +# "start": sorted_data.Date.min(), +# "target": sorted_data.Weekly_Sales.to_list(), +# "feat_static_cat": [ +# int(store_id) - 1, +# int(department_id), +# ], +# } +# ) +# +# return metadata, series +# +# +# @register_dataset +# @dataclass(frozen=True) +# class RestaurantDatasetConfig(KaggleDatasetConfig): +# """ +# The dataset configuration for the "Restaurant" Kaggle competition. +# """ +# +# @classmethod +# def name(cls) -> str: +# return "restaurant" +# +# @property +# def max_training_time(self) -> int: +# return 7200 +# +# @property +# def _link(self) -> str: +# return ( +# "https://www.kaggle.com/c/recruit-restaurant-visitor-forecasting" +# ) +# +# def _extract_data( +# self, path: Path +# ) -> Tuple[Dict[str, Any], List[Dict[str, Any]]]: +# # Read the raw data +# data = cast(pd.DataFrame, pd.read_csv(path / "air_visit_data.csv")) +# store_ids = np.sort(data.air_store_id.unique()) +# +# # Generate GluonTS dataset +# metadata = { +# "freq": "D", +# "prediction_length": 39, +# "feat_static_cat": [ +# { +# "name": "restaurant", +# "cardinality": len(store_ids), +# }, +# ], +# } +# +# series = [] +# # pylint: disable=no-member +# for i, (store, group_data) in enumerate(data.groupby("air_store_id")): +# store_id = np.where(store_ids == store)[0][0] +# sorted_data = group_data.sort_values("visit_date") +# visitors = pd.Series( +# sorted_data.visitors.to_numpy(), +# index=pd.DatetimeIndex(sorted_data.visit_date), +# ) +# series.append( +# { +# "item_id": i, +# "start": sorted_data.visit_date.min(), +# "target": visitors.resample("D") +# .first() +# .fillna(value=0) +# .to_list(), +# "feat_static_cat": [ +# int(store_id), +# ], +# } +# ) +# +# return metadata, series diff --git a/src/tsbench/config/model/__init__.py b/src/tsbench/config/model/__init__.py index 5f5da9b..738a7d0 100644 --- a/src/tsbench/config/model/__init__.py +++ b/src/tsbench/config/model/__init__.py @@ -13,6 +13,7 @@ from ._base import ModelConfig, TrainConfig from ._factory import get_model_config, MODEL_REGISTRY +from .wrapper import * from .models import * __all__ = [ diff --git a/src/tsbench/config/model/models.py b/src/tsbench/config/model/models.py index 3c12310..e4a4b89 100644 --- a/src/tsbench/config/model/models.py +++ b/src/tsbench/config/model/models.py @@ -29,6 +29,7 @@ from gluonts.model.tft import TemporalFusionTransformerEstimator from gluonts.mx.trainer.callback import Callback from gluonts.time_feature import Constant +from tsbench.config.model.wrapper.auto_gluon_estimater import AutoGluonEstimator from mxnet.gluon import nn from tsbench.config.dataset import DatasetConfig from tsbench.config.dataset.datasets import WindFarmsDatasetConfig @@ -205,7 +206,7 @@ def create_predictor( ) -> Predictor: mqcnn_estimator = cast(MQCNNEstimator, estimator) transform = mqcnn_estimator.create_transformation() - return mqcnn_estimator.create_predictor(transform, cast) # type: ignore + return mqcnn_estimator.create_predictor(transform, network) # type: ignore def create_estimator( self, @@ -548,3 +549,38 @@ def create_estimator( prediction_length=prediction_length, method_name="ets", ) + +@register_model +@dataclass(frozen=True) +class AutoGluonModelConfig(ModelConfig, TrainConfig): + """ + The ETS estimator config. + """ + + presets: str = "default" + run_time: int = 1 * 60 * 60 + eval_metric: str = "mean_wQuantileLoss" + + @classmethod + def name(cls) -> str: + return "autogluon" + + def create_predictor(self, estimator: Estimator, network: nn.HybridBlock) -> Predictor: + return self.create_predictor(estimator, network) + + def create_estimator( + self, + freq: str, + prediction_length: int, + time_features: bool, + training_time: float, + validation_milestones: List[float], + callbacks: List[Callback], + ) -> Estimator: + return AutoGluonEstimator( + freq=freq, + prediction_length=prediction_length, + presets=self.presets, + run_time=self.run_time, + eval_metric=self.eval_metric, + ) diff --git a/src/tsbench/config/model/wrapper/__init__.py b/src/tsbench/config/model/wrapper/__init__.py new file mode 100644 index 0000000..3b5ac50 --- /dev/null +++ b/src/tsbench/config/model/wrapper/__init__.py @@ -0,0 +1,2 @@ +from .auto_gluon_predictor import AutoGluonPredictor +from .auto_gluon_estimater import AutoGluonEstimator \ No newline at end of file diff --git a/src/tsbench/config/model/wrapper/auto_gluon_estimater.py b/src/tsbench/config/model/wrapper/auto_gluon_estimater.py new file mode 100644 index 0000000..45401e4 --- /dev/null +++ b/src/tsbench/config/model/wrapper/auto_gluon_estimater.py @@ -0,0 +1,107 @@ +from typing import Dict, Optional + +from mxnet.gluon import HybridBlock + +from gluonts.core.component import validated +from gluonts.dataset.common import Dataset + +from gluonts.dataset.common import Dataset +from gluonts.model.estimator import Estimator +from gluonts.model.predictor import Predictor +from gluonts.transform import Transformation + +from .auto_gluon_predictor import AutoGluonPredictor +try: + from autogluon.timeseries import TimeSeriesPredictor, TimeSeriesDataFrame +except ImportError: + TimeSeriesPredictor = None + +AUTOGLUON_IS_INSTALLED = TimeSeriesPredictor is not None + +USAGE_MESSAGE = """ +Cannot import `autogluon`. + +The `AutoGluonEstimator` is a thin wrapper for calling the `AutoGluon` package. +""" + +class AutoGluonEstimator(Estimator): + """ + Wrapper around `Autogluon `_. + + The `AutoGluonPredictor` is a thin wrapper for calling the `Autogluon` + + Parameters + ---------- + freq + Time frequency of the data, e.g. '1H' + prediction_length + Number of time points to predict + run_time + The time limit parameter for autogluon + eval_metric + The metric score in leaderboard results + presets + The preset parameter used in autogluon + """ + + @validated() + def __init__( + self, + freq: str, + prediction_length: int, + run_time: int, + eval_metric: str, + presets: Optional[str], + ) -> None: + super().__init__(freq=freq, prediction_length=prediction_length) + + if not AUTOGLUON_IS_INSTALLED: + raise ImportError(USAGE_MESSAGE) + + self.freq = freq + self.prediction_length = prediction_length + self.autogluonts = TimeSeriesPredictor(prediction_length=prediction_length, eval_metric=eval_metric) + self.presets = presets + self.run_time = run_time + + def train_model( + self, + training_data: Dataset, + validation_data: Optional[Dataset] = None, + num_workers: Optional[int] = None, + num_prefetch: Optional[int] = None, + shuffle_buffer_length: Optional[int] = None, + cache_data: bool = False, + ) -> AutoGluonPredictor: + + train_dataframe = TimeSeriesDataFrame(training_data) + valid_dataframe = TimeSeriesDataFrame(validation_data) + + tspredictor = self.autogluonts.fit(train_dataframe, tuning_data=valid_dataframe, presets=self.presets, time_limit=self.run_time) + + return AutoGluonPredictor(tspredictor, prediction_length=self.prediction_length, freq=self.freq) + + + def train( + self, + training_data: Dataset, + validation_data: Optional[Dataset] = None, + num_workers: Optional[int] = None, + num_prefetch: Optional[int] = None, + shuffle_buffer_length: Optional[int] = None, + cache_data: bool = False, + **kwargs, + ) -> AutoGluonPredictor: + return self.train_model( + training_data=training_data, + validation_data=validation_data, + num_workers=num_workers, + num_prefetch=num_prefetch, + shuffle_buffer_length=shuffle_buffer_length, + cache_data=cache_data, + ) + + def create_predictor( + self, transformation: Transformation, trained_network: HybridBlock + ) -> Predictor: + return AutoGluonPredictor(self.autogluonts, prediction_length=self.prediction_length, freq=self.freq) diff --git a/src/tsbench/config/model/wrapper/auto_gluon_predictor.py b/src/tsbench/config/model/wrapper/auto_gluon_predictor.py new file mode 100644 index 0000000..ae24da9 --- /dev/null +++ b/src/tsbench/config/model/wrapper/auto_gluon_predictor.py @@ -0,0 +1,99 @@ +from typing import Iterator, Optional + +import json +from pathlib import Path +import os + +import warnings + +from gluonts.dataset.common import Dataset +from gluonts.model.predictor import Predictor +from gluonts.model.forecast import QuantileForecast + +try: + from autogluon.timeseries import TimeSeriesPredictor, TimeSeriesDataFrame +except ImportError: + TimeSeriesPredictor = None + +AUTOGLUON_IS_INSTALLED = TimeSeriesPredictor is not None + +USAGE_MESSAGE = """ +Cannot import `autogluon`. + +The `AutoGluonEstimator` is a thin wrapper for calling the `AutoGluon` package. +""" + +class AutoGluonPredictor(Predictor): + + def __init__(self, model: TimeSeriesPredictor, prediction_length: int, freq: str, lead_time: int = 0) -> None: + super().__init__(prediction_length, freq, lead_time) + + if not AUTOGLUON_IS_INSTALLED: + raise ImportError(USAGE_MESSAGE) + + self.prediction_length = prediction_length + self.freq = freq + self.predictor = model + + def predict( + self, + dataset: Dataset, + num_samples: Optional[int] = None, + num_workers: Optional[int] = None, + num_prefetch: Optional[int] = None, + **kwargs, + ) -> Iterator[QuantileForecast]: + data_frame = TimeSeriesDataFrame(dataset) + outputs = self.predictor.predict(data_frame) + + metas = outputs.index.values + cancat_len = outputs.shape[0] + assert cancat_len % self.prediction_length == 0 + ts_num = cancat_len // self.prediction_length + + # resault wraper + colums = outputs.columns[1:] + for i in range(ts_num): + cur_val = outputs.values[i * self.prediction_length : (i + 1) * self.prediction_length, 1:].T + meta = metas[i * self.prediction_length : (i + 1) * self.prediction_length] + yield QuantileForecast( + forecast_arrays=cur_val, + start_date=meta[0][1], + freq=self.freq, + forecast_keys=colums, + item_id=meta[0][0]) + + def leaderboard( + self, + dataset: Dataset, + **kwargs, + ): + warnings.filterwarnings("ignore") + data_frame = TimeSeriesDataFrame(dataset) + # 116154 1990-01-31 + # FIXME There are some problems with the use of this leaderboard data + model_path = os.getenv("SM_MODEL_DIR") or Path.home() / "models" + leaderboard = self.predictor.leaderboard(data_frame) + leaderboard.to_csv(Path.joinpath(Path(model_path), 'leaderboard.csv')) + print('leaderboard has been saved at:', Path.joinpath(Path(model_path), 'leaderboard.csv')) + + def deserialize(cls, path: Path, **kwargs) -> "Predictor": + predictor = TimeSeriesPredictor.load(cls, path) # type: ignore + file = path / "metadata.pickle" + with file.open("r") as f: + meta = json.load(f) + return AutoGluonPredictor(model=predictor, + freq=meta["freq"], prediction_length=meta["prediction_length"] + ) + + def serialize(self, path: Path) -> None: + self.predictor.save() + file = path / "metadata.pickle" + with file.open("w") as f: + json.dump( + { + "freq": self.freq, + "prediction_length": self.prediction_length, + }, + f, + ) diff --git a/src/tsbench/evaluations/aws/analytics.py b/src/tsbench/evaluations/aws/analytics.py index f65674a..9da1be5 100644 --- a/src/tsbench/evaluations/aws/analytics.py +++ b/src/tsbench/evaluations/aws/analytics.py @@ -119,31 +119,47 @@ def pull_logs(self) -> list[str]: log_file = self._cache_dir() / "logs.txt" if log_file.exists(): with log_file.open("r") as f: - return f.read().split("\n") + logs = f.read().split("\n") + # if logs are completed, return them + if "Reporting training SUCCESS" in logs[-1]: + return logs + # If not, fetch them - client = default_session().client("logs") - streams = client.describe_log_streams( - logGroupName="/aws/sagemaker/TrainingJobs", - logStreamNamePrefix=self.info["TrainingJobName"], - ) - res = [] - for stream in streams["logStreams"]: - params = { - "logGroupName": "/aws/sagemaker/TrainingJobs", - "logStreamName": stream["logStreamName"], - "startFromHead": True, - } - result = client.get_log_events(**params) - res.extend([event["message"] for event in result["events"]]) - while "nextForwardToken" in result: - next_token = result["nextForwardToken"] - result = client.get_log_events(nextToken=next_token, **params) - if result["nextForwardToken"] == next_token: - # The same token as before indicates end of stream, see - # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.get_log_events - break + complete = False + retry = 1 + while not complete: + client = default_session().client("logs") + streams = client.describe_log_streams( + logGroupName="/aws/sagemaker/TrainingJobs", + logStreamNamePrefix=self.info["TrainingJobName"], + ) + res = [] + for stream in streams["logStreams"]: + params = { + "logGroupName": "/aws/sagemaker/TrainingJobs", + "logStreamName": stream["logStreamName"], + "startFromHead": True, + } + result = client.get_log_events(**params) res.extend([event["message"] for event in result["events"]]) + while "nextForwardToken" in result: + next_token = result["nextForwardToken"] + result = client.get_log_events(nextToken=next_token, **params) + if result["nextForwardToken"] == next_token: + # The same token as before indicates end of stream, see + # https://boto3.amazonaws.com/v1/documentation/api/latest/reference/services/logs.html#CloudWatchLogs.Client.get_log_events + break + res.extend([event["message"] for event in result["events"]]) + if "Reporting training SUCCESS" not in res[-1]: + logging.warning( + "%dth retry, log download failed, sleep %d seconds and retry", + retry, 60, + ) + retry += 1 + time.sleep(60) + else: + complete = True # Store them log_file.parent.mkdir(parents=True, exist_ok=True) @@ -182,13 +198,24 @@ def metrics(self) -> dict[str, np.ndarray]: # If not, get them from the logs, write them to the file system and return metrics = { metric["Name"]: [ - float(x) + float(x[0]) if isinstance(x, tuple) else float(x) for x in re.findall(metric["Regex"], "\n".join(self.logs)) ] for metric in self.info["AlgorithmSpecification"][ "MetricDefinitions" ] } + # use custmize traing time replace traing time when traing time = 0 + if len(metrics['training_time']) == 0: + for (k, v) in metrics.items(): + if '_traing_time' in k: + metrics['training_time'] = v + # embad empty metric + job_num = len(metrics['training_time']) + for (k, v) in metrics.items(): + if len(v) == 0: + metrics[k] = [0] * job_num + with metrics_file.open("w+") as f: json.dump(metrics, f) @@ -317,13 +344,14 @@ def __init__( same hyperparameters are found. """ self.experiment_name = experiment - training_jobs, duplicates = _fetch_training_jobs( + training_jobs, duplicates, other_jobs = _fetch_training_jobs( default_session(), self.experiment_name, only_completed, resolve_duplicates, ) self.duplicates = duplicates + self.other_jobs = other_jobs self.map = {t.name: t for t in training_jobs if include(t)} if len(self.map) < len(training_jobs): logging.warning( @@ -364,7 +392,7 @@ def _fetch_training_jobs( experiment: str, only_completed: bool, resolve_duplicates: bool, -) -> tuple[list[TrainingJob], list[TrainingJob]]: +) -> tuple[list[TrainingJob], list[TrainingJob], list[TrainingJob]]: """ Fetches all training jobs which are associated with this experiment. """ @@ -403,13 +431,19 @@ def _fetch_training_jobs( time.sleep(1) jobs = [TrainingJob(r["TrainingJob"]) for r in results] + other_jobs = [] if only_completed: completed_jobs = [j for j in jobs if j.status == "Completed"] + other_jobs = [j for j in jobs if j.status != "Completed"] if len(completed_jobs) < len(jobs): c = Counter([j.status for j in jobs]) d = dict(c) del d["Completed"] + logging.warning( + " completed %d jobs", + len(completed_jobs), + ) logging.warning( " Analysis is ignoring %d jobs %s", len(jobs) - len(completed_jobs), @@ -439,7 +473,7 @@ def _fetch_training_jobs( ) jobs = list(unique.values()) - return jobs, duplicates + return jobs, duplicates, other_jobs # ------------------------------------------------------------------------------------------------- diff --git a/src/tsbench/evaluations/metrics/sagemaker.py b/src/tsbench/evaluations/metrics/sagemaker.py index 99b483c..4db372c 100644 --- a/src/tsbench/evaluations/metrics/sagemaker.py +++ b/src/tsbench/evaluations/metrics/sagemaker.py @@ -38,8 +38,35 @@ def metric_definitions() -> List[Dict[str, str]]: "Name": "val_ncrps", "Regex": _metric_regex("val_ncrps"), }, + { + "Name": "val_nd", + "Regex": _metric_regex("val_nd"), + }, + { + "Name": "val_nrmse", + "Regex": _metric_regex("val_nrmse"), + }, + { + "Name": "val_mase", + "Regex": _metric_regex("val_mase"), + }, + { + "Name": "val_smape", + "Regex": _metric_regex("val_smape"), + }, + { + "Name": "val_latency", + "Regex": _metric_regex("val_latency"), + }, + ] + custimze_metrics = [ + { + "Name": "autogluon_traing_time", + "Regex": f"Total runtime: {_FLOATING_POINT_REGEX}" + }, + ] - return scalar_metrics + list_metrics + return scalar_metrics + list_metrics + custimze_metrics # ------------------------------------------------------------------------------------------------- diff --git a/src/tsbench/evaluations/tracking/job.py b/src/tsbench/evaluations/tracking/job.py index 5ec6f50..3961b2d 100644 --- a/src/tsbench/evaluations/tracking/job.py +++ b/src/tsbench/evaluations/tracking/job.py @@ -176,7 +176,7 @@ def get_forecast(self, index: int) -> QuantileForecasts: cast(Path, self.source_path) / "forecasts" / f"model_{index:02}" ) - def save(self, path: Path, include_forecasts: bool = True) -> None: + def save(self, path: Path, include_forecasts: bool = False, include_leaderboard: bool = False) -> None: """ Stores all data associated with the training job in an auto-generated, unique folder within the provided directory. @@ -201,7 +201,7 @@ def save(self, path: Path, include_forecasts: bool = True) -> None: # Make sure folder exists and is empty if target.exists(): - if _check_all_data_available(target): + if _check_all_data_available(target, include_forecasts=include_forecasts, include_leaderboard=include_leaderboard): return shutil.rmtree(target) @@ -214,32 +214,42 @@ def save(self, path: Path, include_forecasts: bool = True) -> None: json.dump(self.performance, f, indent=4) # As well as the forecasts (we ignore val forecasts as they are never used) - if include_forecasts: + if include_forecasts or include_leaderboard: num_models = len(self.performance["performances"]) with self.source_job.artifact(cache=False) as artifact: - (target / "forecasts").mkdir() - for i in range(num_models): - (target / "forecasts" / f"model_{i:02}").mkdir() - shutil.copyfile( - artifact.path - / "predictions" - / f"model_{i}" - / "values.npy", - target / "forecasts" / f"model_{i:02}" / "values.npy", - ) - shutil.copyfile( - artifact.path - / "predictions" - / f"model_{i}" - / "metadata.npz", - target - / "forecasts" - / f"model_{i:02}" - / "metadata.npz", - ) + if include_forecasts: + (target / "forecasts").mkdir() + for i in range(num_models): + (target / "forecasts" / f"model_{i:02}").mkdir() + shutil.copyfile( + artifact.path + / "predictions" + / f"model_{i}" + / "values.npy", + target / "forecasts" / f"model_{i:02}" / "values.npy", + ) + shutil.copyfile( + artifact.path + / "predictions" + / f"model_{i}" + / "metadata.npz", + target + / "forecasts" + / f"model_{i:02}" + / "metadata.npz", + ) + if include_leaderboard: + if not (artifact.path / "leaderboard.csv").exists(): + print('this job has no leaderboard') + else: + shutil.copyfile( + artifact.path + / "leaderboard.csv", + target / "leaderboard.csv", + ) # Finally check that saving all data worked as expected - assert _check_all_data_available(target) + assert _check_all_data_available(target, include_forecasts=include_forecasts, include_leaderboard=include_leaderboard) # ------------------------------------------------------------------------------------------------- @@ -256,7 +266,11 @@ def load_jobs_from_analysis(analysis: Analysis) -> list[Job]: This function might take a long time if job logs are not cached. They are downloaded sequentially. """ - return [Job.from_training_job(job) for job in analysis] + jobs = [] + for job in analysis: + jobs.append(Job.from_training_job(job)) + return jobs + # return [Job.from_training_job(job) for job in analysis] def load_jobs_from_directory(directory: Path) -> list[Job]: @@ -307,6 +321,10 @@ def _extract_performance(job: TrainingJob) -> dict[str, Any]: "val_loss", "val_mean_weighted_quantile_loss", "val_ncrps", + "val_mase", + "val_smape", + "val_nrmse", + "val_nd", ], "testing": [ "mase", @@ -345,7 +363,7 @@ def _extract_performance(job: TrainingJob) -> dict[str, Any]: "meta": { "num_model_parameters": int( job.metrics["num_model_parameters"][0].item() - ), + ) if job.metrics["num_model_parameters"].shape[0] > 0 else 0, "latency": np.mean(job.metrics["latency"]).item(), }, "performances": performances, @@ -361,10 +379,22 @@ def _extract_performance(job: TrainingJob) -> dict[str, Any]: return result -def _check_all_data_available(target: Path) -> bool: - return ( - (target / "config.json").exists() - and (target / "performance.json").exists() - and (target / "forecasts").exists() - and len(os.listdir(target / "forecasts")) in (1, 11) - ) +def _check_all_data_available(target: Path, include_forecasts: bool, include_leaderboard: bool) -> bool: + res = ( + (target / "config.json").exists() + and (target / "performance.json").exists() + ) + if include_leaderboard: + res = ( + res + # and (target / "train_leaderboard.csv").exists() + and (target / "leaderboard.csv").exists() + ) + if include_forecasts: + res = ( + res + and (target / "forecasts").exists() + and len(os.listdir(target / "forecasts")) in (1, 11) + ) + + return res diff --git a/src/tsbench/evaluations/training/evaluate.py b/src/tsbench/evaluations/training/evaluate.py index 04f3baf..9eed784 100644 --- a/src/tsbench/evaluations/training/evaluate.py +++ b/src/tsbench/evaluations/training/evaluate.py @@ -91,11 +91,11 @@ def evaluate_predictors( for metric, value in evaluation.summary.items(): log_metric(metric, value) log_metric("latency", latency) + predictor.leaderboard(dataset) else: - log_metric( - "val_ncprs", - evaluation.summary["ncrps"], - ) + for metric, value in evaluation.summary.items(): + log_metric("val_" + metric, value) + log_metric("val_latency", latency) def serialize_predictors(self, directory: Path) -> None: """ diff --git a/src/tsbench/evaluations/training/fit.py b/src/tsbench/evaluations/training/fit.py index 285247f..6a6a55e 100644 --- a/src/tsbench/evaluations/training/fit.py +++ b/src/tsbench/evaluations/training/fit.py @@ -24,6 +24,7 @@ ModelSaverCallback, ParameterCountCallback, ) +from tsbench.config.model import AutoGluonModelConfig from .evaluate import FitResult from .logging import log_metric @@ -122,6 +123,11 @@ def fit_estimator( # pylint: disable=too-many-statements log_metric("training_time", 0) return FitResult(config, [predictor], [0.0], 0) + # TODO the autogluon can not add callback functions, it just return in there, + # this problem will be solved later + if isinstance(config, AutoGluonModelConfig): + return FitResult(config, [predictor], [0.0], 0) + # Otherwise, we need to load all models that were stored by the callback predictors = [] model_paths = [] diff --git a/src/tsbench/forecasts/prediction.py b/src/tsbench/forecasts/prediction.py index 386b199..404589d 100644 --- a/src/tsbench/forecasts/prediction.py +++ b/src/tsbench/forecasts/prediction.py @@ -20,7 +20,11 @@ from gluonts.evaluation.backtest import make_evaluation_predictions from gluonts.model.forecast import QuantileForecast, SampleForecast from gluonts.model.predictor import ParallelizedPredictor, Predictor -from gluonts.support.util import maybe_len +try: + # TODO this problem may be caused by the version of gluonts? + from gluonts.support.util import maybe_len +except: + from gluonts.itertools import maybe_len from pandas.tseries.frequencies import to_offset from tqdm.auto import tqdm from .quantile import QuantileForecasts