diff --git a/pipelines/azure_command_center.py b/pipelines/azure_command_center.py index ebc86d7f..f17aafaa 100644 --- a/pipelines/azure_command_center.py +++ b/pipelines/azure_command_center.py @@ -16,6 +16,8 @@ from pipelines.batch.setup_job import main as setup_job from pipelines.postprocess_forecast_batches import main as postprocess +LOCAL_COPY_DIR = Path.home() / "stf_forecast_fig_share" + DEFAULT_RNG_KEY = 12345 DEFAULT_TRAINING_DAYS = 150 DEFAULT_EXCLUDE_LAST_N_DAYS = 1 @@ -473,10 +475,16 @@ def ask_integer_choice(choices): "Skip processing for model batch directories that already have figures?", default=True, ) + save_local_copy = Confirm.ask( + f"Save a local copy of figures to {LOCAL_COPY_DIR}?", + default=True, + ) + local_copy_dir = LOCAL_COPY_DIR if save_local_copy else "" postprocess( base_forecast_dir=pyrenew_hew_prod_output_path / output_subdir, - diseases=ALL_DISEASES, + diseases=list(ALL_DISEASES), skip_existing=skip_existing, + local_copy_dir=local_copy_dir, ) input("Press enter to continue...") diff --git a/pipelines/postprocess_forecast_batches.py b/pipelines/postprocess_forecast_batches.py index fa56a6f2..da5bf563 100644 --- a/pipelines/postprocess_forecast_batches.py +++ b/pipelines/postprocess_forecast_batches.py @@ -9,6 +9,7 @@ import argparse import datetime as dt import logging +import shutil from pathlib import Path import collate_plots as cp @@ -51,26 +52,62 @@ def process_model_batch_dir(model_batch_dir_path: Path, plot_ext: str = "pdf") - combine_hubverse_tables(model_batch_dir_path) +def model_batch_dir_to_target_path( + model_batch_dir: str, + max_last_training_date: dt.date, + pre_path: Path | str, +) -> Path: + parts = parse_model_batch_dir_name(model_batch_dir) + lookback = (parts["last_training_date"] - parts["first_training_date"]).days + 1 + omit = (max_last_training_date - parts["last_training_date"]).days + 1 + target_path = Path( + pre_path, + f"lookback-{lookback}-omit-{omit}", + parts["disease"], + ) + return target_path + + def main( base_forecast_dir: Path | str, diseases: list[str] | set[str] = ["COVID-19", "Influenza", "RSV"], skip_existing: bool = True, + local_copy_dir: Path | str = "", ) -> None: logging.basicConfig(level=logging.INFO) logger = logging.getLogger(__name__) - to_process = get_all_forecast_dirs(base_forecast_dir, list(diseases)) + to_process = get_all_forecast_dirs(base_forecast_dir, diseases) + # compute max last training date across all model batch dirs and assume this corresponds to omitting 1 day. + max_last_training_date = max( + [ + parse_model_batch_dir_name(model_batch_dir)["last_training_date"] + for model_batch_dir in to_process + ] + ) + if skip_existing: + to_process = [ + batch_dir + for batch_dir in to_process + if not any( + Path(base_forecast_dir, batch_dir).glob("*-hubverse-table.parquet") + ) + ] + for batch_dir in to_process: model_batch_dir_path = Path(base_forecast_dir, batch_dir) - hubverse_tbl_exists = bool( - list(model_batch_dir_path.glob("*-hubverse-table.parquet")) - ) - if hubverse_tbl_exists and skip_existing: - logger.info(f"Skipping {batch_dir}, hubverse table already exists.") - else: - logger.info(f"Processing {batch_dir}...") - process_model_batch_dir(model_batch_dir_path) - logger.info(f"Finished processing {batch_dir}") - logger.info(f"Finished processing {base_forecast_dir}.") + logger.info(f"Processing {batch_dir}...") + process_model_batch_dir(model_batch_dir_path) + logger.info(f"Finished processing {batch_dir}") + if local_copy_dir: + source_dir = Path(base_forecast_dir, batch_dir, "figures") + target_dir = model_batch_dir_to_target_path( + batch_dir, max_last_training_date, local_copy_dir + ) + logger.info( + f"Copying from {source_dir.relative_to(base_forecast_dir)} to {target_dir.relative_to(local_copy_dir)}..." + ) + shutil.copytree(source_dir, target_dir, dirs_exist_ok=True) + logger.info(f"Finished processing {base_forecast_dir}.") if __name__ == "__main__": @@ -98,6 +135,12 @@ def main( action="store_true", help="Skip processing for model batch directories that already have been processed.", ) + parser.add_argument( + "--local-copy-dir", + type=str, + default="", + help="Save a local copy of the processed files to this directory, if supplied.", + ) args = parser.parse_args() args.diseases = args.diseases.split()