10 changes: 9 additions & 1 deletion pipelines/azure_command_center.py
@@ -16,6 +16,8 @@
from pipelines.batch.setup_job import main as setup_job
from pipelines.postprocess_forecast_batches import main as postprocess

LOCAL_COPY_DIR = Path.home() / "stf_forecast_fig_share"

DEFAULT_RNG_KEY = 12345
DEFAULT_TRAINING_DAYS = 150
DEFAULT_EXCLUDE_LAST_N_DAYS = 1
@@ -473,10 +475,16 @@ def ask_integer_choice(choices):
"Skip processing for model batch directories that already have figures?",
default=True,
)
save_local_copy = Confirm.ask(
f"Save a local copy of figures to {LOCAL_COPY_DIR}?",
default=True,
)
local_copy_dir = LOCAL_COPY_DIR if save_local_copy else ""
postprocess(
base_forecast_dir=pyrenew_hew_prod_output_path / output_subdir,
diseases=ALL_DISEASES,
diseases=list(ALL_DISEASES),
skip_existing=skip_existing,
local_copy_dir=local_copy_dir,
)

input("Press enter to continue...")
65 changes: 54 additions & 11 deletions pipelines/postprocess_forecast_batches.py
@@ -9,6 +9,7 @@
import argparse
import datetime as dt
import logging
import shutil
from pathlib import Path

import collate_plots as cp
@@ -51,26 +52,62 @@ def process_model_batch_dir(model_batch_dir_path: Path, plot_ext: str = "pdf") -
combine_hubverse_tables(model_batch_dir_path)


def model_batch_dir_to_target_path(
model_batch_dir: str,
max_last_training_date: dt.date,
pre_path: Path | str,
) -> Path:
parts = parse_model_batch_dir_name(model_batch_dir)
lookback = (parts["last_training_date"] - parts["first_training_date"]).days + 1
omit = (max_last_training_date - parts["last_training_date"]).days + 1
target_path = Path(
pre_path,
f"lookback-{lookback}-omit-{omit}",
parts["disease"],
)
return target_path


def main(
base_forecast_dir: Path | str,
diseases: list[str] | set[str] = ["COVID-19", "Influenza", "RSV"],
skip_existing: bool = True,
local_copy_dir: Path | str = "",
) -> None:
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)
to_process = get_all_forecast_dirs(base_forecast_dir, list(diseases))
to_process = get_all_forecast_dirs(base_forecast_dir, diseases)
# compute max last training date across all model batch dirs and assume this corresponds to omitting 1 day.
max_last_training_date = max(
[
parse_model_batch_dir_name(model_batch_dir)["last_training_date"]
for model_batch_dir in to_process
]
)
if skip_existing:
to_process = [
batch_dir
for batch_dir in to_process
if not any(
Path(base_forecast_dir, batch_dir).glob("*-hubverse-table.parquet")
)
]

for batch_dir in to_process:
model_batch_dir_path = Path(base_forecast_dir, batch_dir)
hubverse_tbl_exists = bool(
list(model_batch_dir_path.glob("*-hubverse-table.parquet"))
)
if hubverse_tbl_exists and skip_existing:
logger.info(f"Skipping {batch_dir}, hubverse table already exists.")
else:
logger.info(f"Processing {batch_dir}...")
process_model_batch_dir(model_batch_dir_path)
logger.info(f"Finished processing {batch_dir}")
logger.info(f"Finished processing {base_forecast_dir}.")
logger.info(f"Processing {batch_dir}...")
process_model_batch_dir(model_batch_dir_path)
logger.info(f"Finished processing {batch_dir}")
if local_copy_dir:
source_dir = Path(base_forecast_dir, batch_dir, "figures")
target_dir = model_batch_dir_to_target_path(
batch_dir, max_last_training_date, local_copy_dir
)
logger.info(
f"Copying from {source_dir.relative_to(base_forecast_dir)} to {target_dir.relative_to(local_copy_dir)}..."
)
shutil.copytree(source_dir, target_dir, dirs_exist_ok=True)
logger.info(f"Finished processing {base_forecast_dir}.")


if __name__ == "__main__":
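For orientation, the lookback/omit arithmetic behind model_batch_dir_to_target_path works out as in the sketch below. The dates are hypothetical, and parse_model_batch_dir_name is assumed to return first_training_date, last_training_date, and disease as used in the function body.

# Hedged worked example of the target-path arithmetic; all dates are hypothetical.
import datetime as dt
from pathlib import Path

first_training_date = dt.date(2024, 8, 1)
last_training_date = dt.date(2024, 12, 28)
max_last_training_date = dt.date(2024, 12, 29)  # newest last_training_date across batches

lookback = (last_training_date - first_training_date).days + 1  # 150 training days
omit = (max_last_training_date - last_training_date).days + 1   # 2 omitted days

target_path = Path(
    Path.home() / "stf_forecast_fig_share",  # pre_path, e.g. LOCAL_COPY_DIR
    f"lookback-{lookback}-omit-{omit}",      # -> "lookback-150-omit-2"
    "COVID-19",                              # parts["disease"]
)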
@@ -98,6 +135,12 @@ def main(
action="store_true",
help="Skip processing for model batch directories that already have been processed.",
)
parser.add_argument(
"--local-copy-dir",
type=str,
default="",
help="Save a local copy of the processed files to this directory, if supplied.",
)

args = parser.parse_args()
args.diseases = args.diseases.split()
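The updated entry point can also be driven directly rather than through azure_command_center; a hedged call sketch with a hypothetical base directory:

# Hedged sketch of a direct call to the updated main(); the base directory is hypothetical.
from pathlib import Path
from pipelines.postprocess_forecast_batches import main as postprocess

postprocess(
    base_forecast_dir="/path/to/forecast/output",  # hypothetical location
    diseases=["COVID-19", "Influenza", "RSV"],
    skip_existing=True,  # skip batch dirs that already have a hubverse table
    local_copy_dir=Path.home() / "stf_forecast_fig_share",
)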