diff --git a/.gitignore b/.gitignore index f3cc5668..55bb9b31 100644 --- a/.gitignore +++ b/.gitignore @@ -395,7 +395,7 @@ private_data/* # Ignore end to end test output pipelines/tests/end_to_end_test_output/* -pipelines/tests/epiautogp_test_output/* +pipelines/tests/epiautogp_end_to_end_test_output/* # subdirs reserved for mounting blob storage containers config diff --git a/EpiAutoGP/Project.toml b/EpiAutoGP/Project.toml index ec549f67..c8e66df8 100644 --- a/EpiAutoGP/Project.toml +++ b/EpiAutoGP/Project.toml @@ -1,7 +1,7 @@ name = "EpiAutoGP" uuid = "c2940010-6b35-4be1-8bbf-9fa0d9979e50" -authors = ["Sam Brand (USI1) "] version = "0.1.0" +authors = ["Sam Brand (USI1) "] [deps] ArgParse = "c7e460c6-2fb9-53a9-8c5b-16f535851c63" @@ -11,6 +11,7 @@ Dates = "ade2ca70-3891-5945-98fb-dc099432e06a" JSON3 = "0f8b85d8-7281-11e9-16c2-39a750bddbf1" Logging = "56ddb016-857b-54e1-b83d-db4d58db5568" NowcastAutoGP = "7e9f7f4b-f590-4c14-8324-de4fcbed18f7" +Parquet = "626c502c-15b0-58ad-a749-f091afb673ae" Random = "9a3f8284-a2c9-5f02-9a11-845980a1fd5c" Statistics = "10745b16-79ce-11e8-11f9-7d13ad32a3b2" StructTypes = "856f2bd8-1eba-4b0a-8007-ebc267875bd4" @@ -27,6 +28,7 @@ Dates = "1.11.0" JSON3 = "1.14.3" Logging = "1.11.0" NowcastAutoGP = "0.3.0" +Parquet = "0.8.6" Random = "1.11.0" Statistics = "1.11.1" StructTypes = "1.11.0" diff --git a/EpiAutoGP/run.jl b/EpiAutoGP/run.jl index c7c33b27..28e10c53 100644 --- a/EpiAutoGP/run.jl +++ b/EpiAutoGP/run.jl @@ -56,7 +56,7 @@ function main() # Create hubverse-compatible output @info "Creating hubverse-compatible forecast output..." - output_type = QuantileOutput() # Use default quantile levels + output_type = PipelineOutput() # Use default quantile levels hubverse_df = create_forecast_output( input_data, diff --git a/EpiAutoGP/src/EpiAutoGP.jl b/EpiAutoGP/src/EpiAutoGP.jl index af97efde..a3a3ffa4 100644 --- a/EpiAutoGP/src/EpiAutoGP.jl +++ b/EpiAutoGP/src/EpiAutoGP.jl @@ -1,6 +1,6 @@ module EpiAutoGP using NowcastAutoGP # Core modeling package -using CSV, DataFramesMeta, Dates, JSON3, StructTypes # Data handling packages +using CSV, DataFramesMeta, Dates, JSON3, StructTypes, Parquet # Data handling packages using ArgParse # Command-line argument parsing using Statistics # For modeling functions @@ -23,6 +23,7 @@ export prepare_for_modelling, export AbstractForecastOutput, AbstractHubverseOutput, QuantileOutput, + PipelineOutput, create_forecast_df, create_forecast_output @@ -36,6 +37,10 @@ const DEFAULT_TARGET_DICT = Dict( "nhsn" => "hosp", "nssp" => "prop ed visits" ) +const DEFAULT_TARGET_LETTER = Dict( + "nhsn" => "h", + "nssp" => "e" +) const DEFAULT_GROUP_NAME = "CFA" const DEFAULT_MODEL_NAME = "EpiAutoGP" diff --git a/EpiAutoGP/src/input.jl b/EpiAutoGP/src/input.jl index 0db25e45..736ee7f6 100644 --- a/EpiAutoGP/src/input.jl +++ b/EpiAutoGP/src/input.jl @@ -12,26 +12,14 @@ with nowcasting requirements and forecast parameters. - `reports::Vector{Real}`: Vector of case counts/measurements corresponding to each date - `pathogen::String`: Disease identifier (e.g., "COVID-19", "Influenza", "RSV") - `location::String`: Geographic location identifier (e.g., "CA", "NY", "US") +- `target::String`: Target data type (e.g., "nssp", "nhsn") +- `frequency::String`: Temporal frequency of data ("daily" or "epiweekly") +- `use_percentage::Bool`: Whether data represents percentage values +- `ed_visit_type::String`: Type of ED visits ("observed" or "other"), only applicable for NSSP target - `forecast_date::Date`: Reference date from which forecasting begins, often this will be a nowcast date - `nowcast_dates::Vector{Date}`: Dates requiring nowcasting (typically recent dates with incomplete data) - `nowcast_reports::Vector{Vector{Real}}`: Uncertainty bounds or samples for nowcast dates -# Examples -```julia -# Create a simple input dataset -data = EpiAutoGPInput( - [Date("2024-01-01"), Date("2024-01-02"), Date("2024-01-03")], - [45.0, 52.0, 38.0], - "COVID-19", - "CA", - Date("2024-01-03"), - [Date("2024-01-02"), Date("2024-01-03")], - [[50.0, 52.0, 54.0], [36.0, 38.0, 40.0]] -) - -# Validate the input -validate_input(data) # returns true if valid -``` """ struct EpiAutoGPInput dates::Vector{Date} @@ -39,6 +27,9 @@ struct EpiAutoGPInput pathogen::String location::String target::String + frequency::String + use_percentage::Bool + ed_visit_type::String forecast_date::Date nowcast_dates::Vector{Date} nowcast_reports::Vector{Vector{Real}} @@ -65,30 +56,6 @@ Performs comprehensive validation including: # Returns - `Bool`: Returns `true` if validation passes - -# Throws -- `ArgumentError`: If any validation check fails, with descriptive error message - -# Examples -```julia -# Valid data passes validation -valid_data = EpiAutoGPInput( - [Date("2024-01-01"), Date("2024-01-02")], - [45.0, 52.0], - "COVID-19", "CA", Date("2024-01-02"), - Date[], Vector{Real}[] -) -validate_input(valid_data) # returns true - -# Invalid data throws ArgumentError -invalid_data = EpiAutoGPInput( - [Date("2024-01-01")], - [-5.0], # negative values not allowed - "COVID-19", "CA", Date("2024-01-01"), - Date[], Vector{Real}[] -) -validate_input(invalid_data) # throws ArgumentError -``` """ function validate_input(data::EpiAutoGPInput; valid_targets = ["nhsn", "nssp"]) @assert data.target in valid_targets "Target must be one of $(valid_targets), got '$(data.target)'" @@ -221,41 +188,6 @@ end read_and_validate_data(path_to_json::String) -> EpiAutoGPInput Read epidemiological data from JSON file with automatic validation. - -This is the recommended function for loading input data in production workflows. -It combines [`read_data`](@ref) and [`validate_input`](@ref) to ensure that -loaded data is both structurally correct and passes all validation checks. - -# Arguments -- `path_to_json::String`: Path to the JSON file containing input data - -# Returns -- `EpiAutoGPInput`: Validated data structure ready for modeling - -# Throws -- `SystemError`: If the file cannot be read -- `JSON3.StructuralError`: If JSON structure is invalid -- `ArgumentError`: If data fails validation checks - -# Examples -```julia -# Load and validate data in one step -data = read_and_validate_data("epidata.json") - -# This is equivalent to: -data = read_data("epidata.json") -validate_input(data) - -# Use in a try-catch block for error handling -try - data = read_and_validate_data("uncertain_data.json") - println("Data loaded successfully") -catch e - @error "Failed to load data" exception=e -end -``` - -See also: [`read_data`](@ref), [`validate_input`](@ref), [`EpiAutoGPInput`](@ref) """ function read_and_validate_data(path_to_json::String) data = read_data(path_to_json) diff --git a/EpiAutoGP/src/modelling.jl b/EpiAutoGP/src/modelling.jl index 78ec941d..024395b5 100644 --- a/EpiAutoGP/src/modelling.jl +++ b/EpiAutoGP/src/modelling.jl @@ -1,5 +1,5 @@ """ - prepare_for_modelling(input::EpiAutoGPInput, transformation_name::String, n_forecast_weeks::Int, n_forecasts::Int) -> NamedTuple + prepare_for_modelling(input::EpiAutoGPInput, transformation_name::String, n_ahead::Int, n_forecasts::Int) -> NamedTuple Prepare all data and configuration needed for NowcastAutoGP modeling. @@ -9,7 +9,7 @@ formats nowcast data for the modeling pipeline, and calculates forecast dates an # Arguments - `input::EpiAutoGPInput`: The input data structure containing dates, reports, and nowcast information - `transformation_name::String`: Name of transformation to apply ("boxcox", "positive", "percentage") -- `n_forecast_weeks::Int`: Number of weeks to forecast into the future +- `n_ahead::Int`: Number of time steps (days or epiweeks) to forecast into the future - `n_forecasts::Int`: Total number of forecast samples desired # Returns @@ -21,15 +21,9 @@ A NamedTuple containing: - `n_forecasts_per_nowcast::Int`: Number of forecast samples per nowcast scenario - `transformation::Function`: Forward transformation function - `inv_transformation::Function`: Inverse transformation function - -# Examples -```julia -input = EpiAutoGPInput(...) -model_setup = prepare_for_modelling(input, "boxcox", 4, 1000) -``` """ function prepare_for_modelling(input::EpiAutoGPInput, transformation_name::String, - n_forecast_weeks::Int, n_forecasts::Int) + n_ahead::Int, n_forecasts::Int) # Extract stable confirmed data, excluding recent uncertain dates with nowcasts stable_data_idxs = findall(d -> !(d in input.nowcast_dates), input.dates) stable_data_dates = input.dates[stable_data_idxs] @@ -47,8 +41,10 @@ function prepare_for_modelling(input::EpiAutoGPInput, transformation_name::Strin # Create nowcast data structure when nowcasts exist create_nowcast_data(input.nowcast_reports, input.nowcast_dates; transformation) - # Calculate forecasting dates (starting from forecast_date and going up to n_forecast_weeks ahead) - forecast_dates = [input.forecast_date + Week(i) for i in 0:n_forecast_weeks] + # Calculate forecasting dates (starting from forecast_date and going n_ahead time steps forward) + # Use Day or Week based on frequency + time_step = input.frequency == "epiweekly" ? Week(1) : Day(1) + forecast_dates = [input.forecast_date + i * time_step for i in 0:n_ahead] # Calculate number of forecasts per nowcast sample n_forecasts_per_nowcast = isnothing(nowcast_data) ? @@ -84,14 +80,6 @@ combination with nowcast scenarios. # Returns - Fitted AutoGP model ready for forecasting - -# Examples -```julia -dates = [Date(2024,1,1), Date(2024,1,8), Date(2024,1,15)] -values = [100.0, 120.0, 95.0] -transform_func, _ = get_transformations("boxcox", values) -model = fit_base_model(dates, values; transformation=transform_func) -``` """ function fit_base_model(dates::Vector{Date}, values::Vector{<:Real}; transformation::Function, @@ -136,7 +124,7 @@ end """ forecast_with_epiautogp(input::EpiAutoGPInput; - n_forecast_weeks::Int=8, + n_ahead::Int=8, n_forecasts::Int=20, transformation_name::String="boxcox", n_particles::Int=24, @@ -153,7 +141,7 @@ This function implements the complete nowcasting and forecasting workflow: # Arguments - `input::EpiAutoGPInput`: The input data structure with dates, reports, and nowcast information -- `n_forecast_weeks::Int=8`: Number of weeks to forecast ahead from forecast_date +- `n_ahead::Int=8`: Number of time steps (days or epiweeks) to forecast ahead from forecast_date - `n_forecasts::Int=20`: Total number of forecast samples to generate - `transformation_name::String="boxcox"`: Data transformation type ("boxcox", "positive", "percentage") - `n_particles::Int=24`: Number of SMC particles for GP model fitting @@ -168,23 +156,9 @@ A NamedTuple containing: - `forecast_date::Date`: The reference date for forecasting (from input.forecast_date) - `location::String`: The location identifier (from input.location) - `disease::String`: The disease name (from input.disease) - -# Examples -```julia -# Basic forecasting -input = EpiAutoGPInput(...) -results = forecast_with_epiautogp(input) -forecast_dates, forecasts = results.forecast_dates, results.forecasts - -# Custom parameters -results = forecast_with_epiautogp(input; - n_forecast_weeks=4, - n_forecasts=1000, - transformation_name="positive") -``` """ function forecast_with_epiautogp(input::EpiAutoGPInput; - n_forecast_weeks::Int = 8, + n_ahead::Int = 8, n_forecasts::Int = 20, transformation_name::String = "boxcox", n_particles::Int = 24, @@ -193,7 +167,7 @@ function forecast_with_epiautogp(input::EpiAutoGPInput; n_hmc::Int = 50) # Prepare training data, nowcasting data and forecasting dates - model_info = prepare_for_modelling(input, transformation_name, n_forecast_weeks, n_forecasts) + model_info = prepare_for_modelling(input, transformation_name, n_ahead, n_forecasts) # Fit base model on confirmed/stable data base_model = fit_base_model( @@ -234,26 +208,17 @@ with parsed command-line arguments to execute the full nowcasting and forecastin - Same as forecast_with_epiautogp(): NamedTuple with forecast results and metadata # Expected command-line arguments -- `"n-forecast-weeks"`: Number of weeks to forecast +- `"n-ahead"`: Number of time steps (days or epiweeks) to forecast - `"n-forecast-draws"`: Total number of forecast samples - `"transformation"`: Data transformation type - `"n-particles"`: Number of SMC particles - `"smc-data-proportion"`: SMC data proportion - `"n-mcmc"`: Number of MCMC samples - `"n-hmc"`: Number of HMC samples - -# Examples -```julia -# Typical usage pattern -args = parse_arguments() -input_data = read_and_validate_data(args["json-input"]) -results = forecast_with_epiautogp(input_data, args) -forecast_dates, forecasts = results.forecast_dates, results.forecasts -``` """ function forecast_with_epiautogp(input::EpiAutoGPInput, args::Dict{String, Any}) return forecast_with_epiautogp(input; - n_forecast_weeks = args["n-forecast-weeks"], + n_ahead = args["n-ahead"], n_forecasts = args["n-forecast-draws"], transformation_name = args["transformation"], n_particles = args["n-particles"], diff --git a/EpiAutoGP/src/output.jl b/EpiAutoGP/src/output.jl index ff1d404c..8dea9087 100644 --- a/EpiAutoGP/src/output.jl +++ b/EpiAutoGP/src/output.jl @@ -2,47 +2,28 @@ AbstractForecastOutput Abstract base type for all forecast output formats in EpiAutoGP. - -This type serves as the root of the forecast output type hierarchy, allowing for -extensible output formatting while maintaining type safety and dispatch. """ abstract type AbstractForecastOutput end """ AbstractHubverseOutput <: AbstractForecastOutput -Abstract type for hubverse-compatible forecast outputs. - -The hubverse is a standardized format for epidemiological forecasting used by -the CDC and other public health organizations. All concrete subtypes must -produce outputs compatible with hubverse table specifications, e.g. quantile-based -forecasts, sample-based forecasts, etc. +Abstract type for hubverse-compatible forecast outputs in CSV format. """ abstract type AbstractHubverseOutput <: AbstractForecastOutput end """ - QuantileOutput <: AbstractHubverseOutput - -Configuration for quantile-based forecast outputs compatible with hubverse specifications. - -This struct defines the quantile levels to be computed and included in the -hubverse-compatible output table. The default quantile levels follow CDC -forecast hub standards. - -# Fields -- `quantile_levels::Vector{Float64}`: Vector of quantile levels between 0 and 1 + PipelineOutput <: AbstractForecastOutput -# Examples -```julia -# Use default quantiles (23 levels from 0.01 to 0.99) -output = QuantileOutput() +Abstract type for directly outputting forecasts as typical pipeline outputs for + `pyrenew-hew`. +""" +struct PipelineOutput <: AbstractForecastOutput end -# Custom quantiles for specific use case -output = QuantileOutput(quantile_levels = [0.25, 0.5, 0.75]) +""" + QuantileOutput <: AbstractHubverseOutput -# Single quantile (median only) -output = QuantileOutput(quantile_levels = [0.5]) -``` +Configuration for quantile-based forecast outputs compatible with hubverse specifications. """ @kwdef struct QuantileOutput <: AbstractHubverseOutput quantile_levels::Vector{Float64} = [ @@ -65,14 +46,6 @@ date (when the forecast was made) and each target date. # Returns - `Vector{Int}`: Vector of horizons in weeks (integer division by 7 days) - -# Examples -```julia -ref_date = Date("2024-01-01") -targets = [Date("2024-01-08"), Date("2024-01-15"), Date("2024-01-22")] -horizons = _make_horizon_col(targets, ref_date) -# Returns: [1, 2, 3] -``` """ function _make_horizon_col(target_end_dates::Vector{Date}, reference_date::Date) return [Dates.value(d - reference_date) ÷ 7 for d in target_end_dates] @@ -99,15 +72,6 @@ core forecast data needed for hubverse tables. - `value`: Computed quantile value - `target_end_date`: Date for which the forecast applies - `output_type`: Always "quantile" for this method - -# Examples -```julia -results = (forecast_dates = [Date("2024-01-08"), Date("2024-01-15")], - forecasts = rand(2, 100)) # 2 dates × 100 samples -output_config = QuantileOutput(quantile_levels = [0.25, 0.5, 0.75]) -df = create_forecast_df(results, output_config) -# Returns DataFrame with 6 rows (2 dates × 3 quantiles) -``` """ function create_forecast_df(results::NamedTuple, output_type::QuantileOutput) # Extract relevant data @@ -131,6 +95,23 @@ function create_forecast_df(results::NamedTuple, output_type::QuantileOutput) return forecast_df end +function create_forecast_df(results::NamedTuple, output_type::PipelineOutput) + # Extract relevant data + forecast_dates = results.forecast_dates + forecasts = results.forecasts + + # Create a DataFrame with columns: date, .value, .draw + forecast_df = mapreduce(vcat, enumerate(eachcol(forecasts))) do (draw, sampled_values) + DataFrame( + :date => forecast_dates, + Symbol(".value") => sampled_values, + Symbol(".draw") => fill(draw, length(sampled_values)) + ) + end + + return forecast_df +end + """ create_forecast_output(input, results, output_dir, output_type; kwargs...) -> DataFrame @@ -163,28 +144,6 @@ hubverse table, optionally saving it to disk. - `horizon`: Forecast horizon in weeks - `target_end_date`: Date for which forecast applies - `location`: Geographic location identifier - -# Examples -```julia -# Create and save hubverse table -output_type = QuantileOutput() -df = create_forecast_output( - input_data, results, "./output", output_type; - save_output = true, - group_name = "CDC", - model_name = "EpiAutoGP-v1" -) - -# Create table without saving -df = create_forecast_output( - input_data, results, "./output", output_type; - save_output = false -) -``` - -# File Output -When `save_output = true`, creates a CSV file with filename format: -`{reference_date}-{group_name}-{model_name}-{location}-{disease_abbr}-{target}.csv` """ function create_forecast_output( input::EpiAutoGPInput, @@ -231,3 +190,65 @@ function create_forecast_output( return forecast_df end + +function create_forecast_output( + input::EpiAutoGPInput, + results::NamedTuple, + output_dir::String, + output_type::PipelineOutput; + save_output::Bool, + disease_abbr::Dict{String, String} = DEFAULT_PATHOGEN_DICT, + target_abbr::Dict{String, String} = DEFAULT_TARGET_DICT, + group_name::String = DEFAULT_GROUP_NAME, + model_name::String = DEFAULT_MODEL_NAME +) + # Create basic forecast DataFrame with date, .draw, .value + forecast_df = create_forecast_df(results, output_type) + + # Determine variable name based on target and whether using percentages + variable_name = if input.target == "nhsn" + "observed_hospital_admissions" + else # nssp + if input.use_percentage + "prop_disease_ed_visits" + else + # Use ed_visit_type to determine which ED visits column + input.ed_visit_type == "other" ? "other_ed_visits" : "observed_ed_visits" + end + end + + # Input is in percentage format (0-100); convert to proportion (0-1) as R expects proportions for prop_ variables + if input.use_percentage && input.target == "nssp" + forecast_df[!, Symbol(".value")] = forecast_df[!, Symbol(".value")] ./ 100.0 + end + + # Add .variable and resolution columns + forecast_df[!, Symbol(".variable")] .= variable_name + forecast_df[!, :resolution] .= input.frequency + + # Add metadata columns for hubverse compatibility + forecast_df[!, :geo_value] .= input.location + forecast_df[!, :disease] .= input.pathogen + + # Add metadata columns for hubverse compatibility + forecast_df[!, :geo_value] .= input.location + forecast_df[!, :disease] .= input.pathogen + + # Convert date column to string for parquet compatibility + forecast_df[!, :date] = string.(forecast_df[!, :date]) + + # Save as parquet if requested + if save_output + # Use model-specific naming with frequency prefix + # This matches the convention: {frequency}_{model}_samples_{target_letter}.parquet + output_letter = DEFAULT_TARGET_LETTER[input.target] + parquet_filename = "$(input.frequency)_epiautogp_samples_$(output_letter).parquet" + parquet_path = joinpath(output_dir, parquet_filename) + mkpath(dirname(parquet_path)) + Parquet.write_parquet(parquet_path, forecast_df) + + @info "Saved pipeline forecast samples to $parquet_path" + end + + return forecast_df +end diff --git a/EpiAutoGP/src/parse_arguments.jl b/EpiAutoGP/src/parse_arguments.jl index 155990ac..7907ea5d 100644 --- a/EpiAutoGP/src/parse_arguments.jl +++ b/EpiAutoGP/src/parse_arguments.jl @@ -7,7 +7,7 @@ Parses command-line arguments for the EpiAutoGP model. - `--json-input::String` (required): Path to JSON file containing model input data. - `--output-dir::String` (required): Directory for saving model outputs. -- `--n-forecast-weeks::Int` (default: 8): Number of weeks to forecast. +- `--n-ahead::Int` (default: 8): Number of time steps (days or epiweeks) to forecast. - `--n-particles::Int` (default: 24): Number of particles for SMC. - `--n-mcmc::Int` (default: 100): Number of MCMC steps for GP kernel structure. - `--n-hmc::Int` (default: 50): Number of HMC steps for GP kernel hyperparameters. @@ -31,8 +31,8 @@ function parse_arguments() help = "Directory for saving model outputs" arg_type = String required = true - "--n-forecast-weeks" - help = "Number of weeks to forecast" + "--n-ahead" + help = "Number of time steps (days or epiweeks) to forecast" arg_type = Int default = 8 "--n-particles" diff --git a/EpiAutoGP/test/test_input.jl b/EpiAutoGP/test/test_input.jl index 7476de00..30a73536 100644 --- a/EpiAutoGP/test/test_input.jl +++ b/EpiAutoGP/test/test_input.jl @@ -11,7 +11,7 @@ function create_sample_input(output_path::String; n_weeks::Int = 30, for _ in 1:10] # 10 realizations, each with 3 values input_data = EpiAutoGPInput( - dates, reports, pathogen, location, "nhsn", + dates, reports, pathogen, location, "nhsn", "epiweekly", false, "observed", forecast_date, nowcast_dates, nowcast_reports ) @@ -31,7 +31,7 @@ end nowcast_reports = [[50.0, 36.0], [52.0, 38.0]] input_data = EpiAutoGPInput( - dates, reports, "COVID-19", "CA", "nhsn", + dates, reports, "COVID-19", "CA", "nhsn", "daily", false, "observed", Date("2024-01-03"), dates[2:3], nowcast_reports ) @@ -50,7 +50,7 @@ end valid_data = EpiAutoGPInput( [Date("2024-01-01"), Date("2024-01-02")], [45.0, 52.0], - "COVID-19", "CA", "nhsn", + "COVID-19", "CA", "nhsn", "daily", false, "observed", Date("2024-01-02"), [Date("2024-01-02")], [[50.0], [52.0]] @@ -59,7 +59,7 @@ end # Test without nowcasts no_nowcast = EpiAutoGPInput( - [Date("2024-01-01")], [10.0], "COVID-19", "TX", "nhsn", + [Date("2024-01-01")], [10.0], "COVID-19", "TX", "nhsn", "daily", false, "observed", Date("2024-01-01"), Date[], Vector{Real}[] ) @test validate_input(no_nowcast) == true @@ -69,29 +69,29 @@ end # Mismatched lengths @test_throws ArgumentError validate_input(EpiAutoGPInput( [Date("2024-01-01"), Date("2024-01-02")], [45.0], - "COVID-19", "CA", "nhsn", Date("2024-01-01"), Date[], Vector{Real}[] + "COVID-19", "CA", "nhsn", "daily", false, "observed", Date("2024-01-01"), Date[], Vector{Real}[] )) # Empty data @test_throws ArgumentError validate_input(EpiAutoGPInput( - Date[], Real[], "COVID-19", "CA", "nhsn", Date("2024-01-01"), Date[], Vector{Real}[] + Date[], Real[], "COVID-19", "CA", "nhsn", "daily", false, "observed", Date("2024-01-01"), Date[], Vector{Real}[] )) # Unsorted dates @test_throws ArgumentError validate_input(EpiAutoGPInput( [Date("2024-01-02"), Date("2024-01-01")], [45.0, 52.0], - "COVID-19", "CA", "nhsn", Date("2024-01-02"), Date[], Vector{Real}[] + "COVID-19", "CA", "nhsn", "daily", false, "observed", Date("2024-01-02"), Date[], Vector{Real}[] )) # Invalid nowcast structure @test_throws ArgumentError validate_input(EpiAutoGPInput( - [Date("2024-01-01")], [45.0], "COVID-19", "CA", "nhsn", Date("2024-01-01"), + [Date("2024-01-01")], [45.0], "COVID-19", "CA", "nhsn", "daily", false, "observed", Date("2024-01-01"), [Date("2024-01-01")], [[50.0, 55.0]] # Wrong length )) # Negative values @test_throws ArgumentError validate_input(EpiAutoGPInput( - [Date("2024-01-01")], [-5.0], "COVID-19", "CA", "nhsn", Date("2024-01-01"), Date[], Vector{Real}[] + [Date("2024-01-01")], [-5.0], "COVID-19", "CA", "nhsn", "daily", false, "observed", Date("2024-01-01"), Date[], Vector{Real}[] )) end @@ -104,6 +104,9 @@ end "pathogen" => "COVID-19", "location" => "CA", "target" => "nhsn", + "frequency" => "daily", + "use_percentage" => false, + "ed_visit_type" => "observed", "forecast_date" => "2024-01-02", "nowcast_dates" => ["2024-01-02"], "nowcast_reports" => [[50.0], [55.0]] diff --git a/EpiAutoGP/test/test_modelling.jl b/EpiAutoGP/test/test_modelling.jl index a00176e8..ba1c0daa 100644 --- a/EpiAutoGP/test/test_modelling.jl +++ b/EpiAutoGP/test/test_modelling.jl @@ -15,7 +15,7 @@ end return EpiAutoGPInput( - dates, reports, "COVID-19", "US", "nhsn", + dates, reports, "COVID-19", "US", "nhsn", "epiweekly", false, "observed", dates[end], nowcast_dates, nowcast_reports ) end @@ -59,7 +59,7 @@ forecast_dates, forecasts = forecast_with_epiautogp(input; - n_forecast_weeks = 2, n_forecasts = 10, + n_ahead = 2, n_forecasts = 10, transformation_name = "positive", n_particles = 1, smc_data_proportion = 0.5, n_mcmc = 3, n_hmc = 3 ) @@ -69,4 +69,32 @@ @test size(forecasts, 2) == 10 @test all(forecasts .> 0) end + + @testset "forecast_with_epiautogp - daily frequency" begin + # Create daily frequency test data + dates = [Date(2024, 1, 1) + Day(i-1) for i in 1:30] + reports = Float64[100 + 10*sin(i/5) + 2*i for i in 1:30] + + input = EpiAutoGPInput( + dates, reports, "COVID-19", "US", "nhsn", "daily", false, "observed", + dates[end], Date[], Vector{Float64}[] + ) + + forecast_dates, + forecasts = forecast_with_epiautogp(input; + n_ahead = 7, n_forecasts = 10, + transformation_name = "positive", + n_particles = 1, smc_data_proportion = 0.5, n_mcmc = 3, n_hmc = 3 + ) + + @test length(forecast_dates) == 8 # 0, 1, 2, ..., 7 days ahead + @test size(forecasts, 1) == 8 + @test size(forecasts, 2) == 10 + @test all(forecasts .> 0) + + # Verify forecast dates increment by Day(1) + @test forecast_dates[1] == dates[end] + @test forecast_dates[2] == dates[end] + Day(1) + @test forecast_dates[end] == dates[end] + Day(7) + end end diff --git a/EpiAutoGP/test/test_output.jl b/EpiAutoGP/test/test_output.jl index fa0f9058..f51f800e 100644 --- a/EpiAutoGP/test/test_output.jl +++ b/EpiAutoGP/test/test_output.jl @@ -45,6 +45,9 @@ end "COVID-19", "CA", "nhsn", + "epiweekly", + false, + "observed", Date("2024-01-01"), Date[], Vector{Real}[] diff --git a/EpiAutoGP/test/test_parse_arguments.jl b/EpiAutoGP/test/test_parse_arguments.jl index 92f836a5..0463f42e 100644 --- a/EpiAutoGP/test/test_parse_arguments.jl +++ b/EpiAutoGP/test/test_parse_arguments.jl @@ -18,7 +18,7 @@ @test parsed["output-dir"] == "/path/to/output" # Test key default values - @test parsed["n-forecast-weeks"] == 8 + @test parsed["n-ahead"] == 8 @test parsed["transformation"] == "boxcox" finally diff --git a/hewr/R/process_loc_forecast.R b/hewr/R/process_loc_forecast.R index f936ff1c..053d20f7 100644 --- a/hewr/R/process_loc_forecast.R +++ b/hewr/R/process_loc_forecast.R @@ -352,6 +352,77 @@ process_model_samples.timeseries <- function( ts_samples } +#' Process EpiAutoGP model samples +#' +#' Reads EpiAutoGP (Julia) model output from parquet files and formats +#' into standardized tidy format for downstream processing. +#' +#' @param model_type Character string indicating model type +#' @param model_run_dir Model run directory +#' @param model_name Name of directory containing model outputs +#' @param ts_samples Timeseries samples (unused for EpiAutoGP) +#' @param required_columns_e Required columns for output +#' @param n_forecast_days Number of forecast days +#' @param ... Additional arguments (unused) +#' @return Tibble of EpiAutoGP model samples with columns: +#' .draw, date, geo_value, disease, .variable, .value, resolution, +#' aggregated_numerator, aggregated_denominator +#' @exportS3Method +process_model_samples.epiautogp <- function( + model_type, + model_run_dir, + model_name, + ts_samples = NULL, + required_columns_e, + n_forecast_days, + ... +) { + model_dir <- fs::path(model_run_dir, model_name) + + # Determine the frequency and target from model name + frequency <- if (grepl("epiweekly", model_name, ignore.case = TRUE)) { + "epiweekly" + } else { + "daily" + } + + # Determine target letter (h for NHSN, e for NSSP) + target_letter <- if (grepl("nhsn", model_name, ignore.case = TRUE)) { + "h" + } else if (grepl("nssp", model_name, ignore.case = TRUE)) { + "e" + } else { + stop("Cannot determine target type from model name: ", model_name) + } + + samples_file <- sprintf( + "%s_epiautogp_samples_%s.parquet", + frequency, + target_letter + ) + + # Read EpiAutoGP samples + samples_path <- fs::path(model_dir, samples_file) + if (!fs::file_exists(samples_path)) { + stop(sprintf("EpiAutoGP samples file not found: %s", samples_path)) + } + + epiautogp_samples <- forecasttools::read_tabular(samples_path) + + # EpiAutoGP output already has all required columns from Julia: + # date, .value, .draw, .variable, resolution, geo_value, disease + # Just need to add aggregated_numerator and aggregated_denominator + samples_tidy <- epiautogp_samples |> + dplyr::mutate( + date = lubridate::as_date(.data$date), + aggregated_numerator = FALSE, + aggregated_denominator = NA + ) |> + dplyr::select(tidyselect::any_of(required_columns_e)) + + return(samples_tidy) +} + #' Detect model type from model name #' #' Internal helper function to infer model type from naming conventions diff --git a/hewr/man/process_model_samples.epiautogp.Rd b/hewr/man/process_model_samples.epiautogp.Rd new file mode 100644 index 00000000..f5d1856f --- /dev/null +++ b/hewr/man/process_model_samples.epiautogp.Rd @@ -0,0 +1,40 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/process_loc_forecast.R +\name{process_model_samples.epiautogp} +\alias{process_model_samples.epiautogp} +\title{Process EpiAutoGP model samples} +\usage{ +\method{process_model_samples}{epiautogp}( + model_type, + model_run_dir, + model_name, + ts_samples = NULL, + required_columns_e, + n_forecast_days, + ... +) +} +\arguments{ +\item{model_type}{Character string indicating model type} + +\item{model_run_dir}{Model run directory} + +\item{model_name}{Name of directory containing model outputs} + +\item{ts_samples}{Timeseries samples (unused for EpiAutoGP)} + +\item{required_columns_e}{Required columns for output} + +\item{n_forecast_days}{Number of forecast days} + +\item{...}{Additional arguments (unused)} +} +\value{ +Tibble of EpiAutoGP model samples with columns: + .draw, date, geo_value, disease, .variable, .value, resolution, + aggregated_numerator, aggregated_denominator +} +\description{ +Reads EpiAutoGP (Julia) model output from parquet files and formats +into standardized tidy format for downstream processing. +} diff --git a/pipelines/epiautogp/README.md b/pipelines/epiautogp/README.md index 9b9ae9e2..753a4e1e 100644 --- a/pipelines/epiautogp/README.md +++ b/pipelines/epiautogp/README.md @@ -1,38 +1,72 @@ # EpiAutoGP Integration Module -This module provides an interface with the `EpiAutoGP` model within the pyrenew-hew pipeline. +This module provides a forecasting pipeline interface for the `EpiAutoGP` model. -## Components +## Overview -### `prep_epiautogp_data.py` +The EpiAutoGP pipeline supports forecasting of: +- **NSSP ED visits**: Emergency department visits from the National Syndromic Surveillance Program +- **NHSN hospital admissions**: Hospital admission counts from the National Healthcare Safety Network -Contains the main data conversion function: +It operates on both **daily** and **epiweekly** temporal frequencies, with optional percentage transformations for ED visit data. -- **`convert_to_epiautogp_json()`**: Converts surveillance data to EpiAutoGP JSON format - - Supports both NSSP (ED visits) and NHSN (hospital admission counts) - - Supports daily and epiweekly data frequencies - - Option to convert ED visits to percentages of total ED visits - - Handles both legacy JSON format and TSV files - - Validates input parameters and data availability +## Pipeline Architecture + +The forecasting pipeline consists of five main steps: + +1. **Setup**: Load data, validate dates, create directory structure +2. **Data Preparation**: Process location data, evaluation data, and generate epiweekly datasets +3. **Data Conversion**: Transform data into EpiAutoGP's JSON input format +4. **Model Execution**: Run the Julia-based EpiAutoGP model +5. **Post-processing**: Process outputs, create hubverse tables, and generate plots + +## Module Components -## Data Sources +### `forecast_epiautogp.py` -The function can read from two types of data sources: +Main entry point for the forecasting pipeline. -### 1. Legacy JSON Format -- `data_for_model_fit.json` file containing `nssp_training_data` and `nhsn_training_data` +**Key Functions:** +- **`main()`**: Orchestrates the complete pipeline from setup to post-processing +- **`run_epiautogp_forecast()`**: Executes the Julia EpiAutoGP model with specified parameters -### 2. TSV Files (Recommended) -- **Daily data**: `combined_training_data.tsv` - - Contains: `observed_ed_visits`, `other_ed_visits`, `observed_hospital_admissions` -- **Epiweekly data**: `epiweekly_combined_training_data.tsv` - - Contains: `observed_ed_visits`, `other_ed_visits`, `observed_hospital_admissions` +**EpiAutoGP-Specific Parameters:** +- `--target`: Data type (`nssp` or `nhsn`) +- `--frequency`: Temporal frequency (`daily` or `epiweekly`) +- `--use-percentage`: Convert ED visits to percentage of total visits +- `--n-particles`: Number of particles for Sequential Monte Carlo (default: 24) +- `--n-mcmc`: MCMC steps for GP kernel structure (default: 100) +- `--n-hmc`: HMC steps for GP kernel hyperparameters (default: 50) +- `--n-forecast-draws`: Number of forecast draws (default: 2000) +- `--smc-data-proportion`: Data proportion per SMC step (default: 0.1) +### `epiautogp_forecast_utils.py` -## Output Format +Shared utilities for the forecast pipeline, containing modular functions for each pipeline stage. + +**Data Classes:** +- **`ForecastPipelineContext`**: Container for shared pipeline state (disease, location, dates, data sources, logger) +- **`ModelPaths`**: Container for output directory structure and file paths + +### `prep_epiautogp_data.py` + +Data conversion utilities for EpiAutoGP JSON format. + +**Key Function:** +- **`convert_to_epiautogp_json()`**: Converts surveillance data to EpiAutoGP JSON format + - Supports both NSSP (ED visits) and NHSN (hospital admission counts) + - Handles daily and epiweekly data frequencies + - Optional percentage transformation for ED visits + - Validates input parameters and data availability -The function generates a JSON file with the following structure: +**Input Data Sources:** +1. **Legacy JSON Format**: `data_for_model_fit.json` with `nssp_training_data` and `nhsn_training_data` +2. **TSV Files (Recommended)**: + - Daily: `combined_training_data.tsv` + - Epiweekly: `epiweekly_combined_training_data.tsv` + - Contains: `observed_ed_visits`, `other_ed_visits`, `observed_hospital_admissions` +**Output Format:** ```json { "dates": ["2024-09-22", "2024-09-23", ...], @@ -45,3 +79,47 @@ The function generates a JSON file with the following structure: "nowcast_reports": [] } ``` + +### `process_epiautogp_forecast.py` + +Post-processing utilities for EpiAutoGP outputs. + +**Key Function:** +- **`calculate_credible_intervals()`**: Computes median and credible intervals from posterior samples + - Default intervals: 50%, 80%, 95% +- **`process_epiautogp_forecast()`**: Converts Julia outputs to R plotting format + - Reads raw EpiAutoGP parquet files + - Calculates credible intervals + - Saves processed `samples.parquet` and `ci.parquet` files + +### `plot_epiautogp_forecast.R` + +R script for generating forecast visualizations specific to EpiAutoGP outputs. + +## Output Structure + +``` +output_dir/ +└── {disease}_r_{report_date}_f_{first_train}_t_{last_train}/ + └── model_runs/ + └── {loc}/ + └── epiautogp_{target}_{frequency}[_pct]/ + ├── data/ + │ ├── combined_training_data.tsv + │ ├── epiweekly_combined_training_data.tsv + │ └── eval_data.tsv + ├── input.json + ├── samples.parquet + ├── ci.parquet + ├── forecast.parquet (raw EpiAutoGP output) + ├── hubverse_table.csv + └── plots/ +``` + +## Integration with pyrenew-hew + +This module follows the same design patterns as other forecasting models in the pyrenew-hew pipeline: +- Shared pipeline utilities (`setup_forecast_pipeline`, `prepare_model_data`) +- Common data formats (TSV training data, hubverse tables) +- Consistent directory structure +- Modular, reusable functions exported through `__init__.py` diff --git a/pipelines/epiautogp/__init__.py b/pipelines/epiautogp/__init__.py index 6f946442..6ee5307c 100644 --- a/pipelines/epiautogp/__init__.py +++ b/pipelines/epiautogp/__init__.py @@ -2,6 +2,10 @@ EpiAutoGP integration module for pyrenew-hew pipelines. """ +from pipelines.epiautogp.epiautogp_forecast_utils import setup_forecast_pipeline from pipelines.epiautogp.prep_epiautogp_data import convert_to_epiautogp_json -__all__ = ["convert_to_epiautogp_json"] +__all__ = [ + "convert_to_epiautogp_json", + "setup_forecast_pipeline", +] diff --git a/pipelines/epiautogp/epiautogp_forecast_utils.py b/pipelines/epiautogp/epiautogp_forecast_utils.py new file mode 100644 index 00000000..e05500e4 --- /dev/null +++ b/pipelines/epiautogp/epiautogp_forecast_utils.py @@ -0,0 +1,347 @@ +""" +Shared utilities for forecast pipeline scripts. + +This module contains common functionality used across different forecast +pipelines (pyrenew, timeseries, epiautogp, etc.). +""" + +import logging +import os +from dataclasses import dataclass +from datetime import date, timedelta +from pathlib import Path +from typing import Any + +import polars as pl + +from pipelines.common_utils import ( + calculate_training_dates, + create_hubverse_table, + get_available_reports, + load_credentials, + load_nssp_data, + parse_and_validate_report_date, + plot_and_save_loc_forecast, +) +from pipelines.forecast_pyrenew import generate_epiweekly_data +from pipelines.prep_data import process_and_save_loc_data +from pipelines.prep_eval_data import save_eval_data + + +@dataclass +class ModelPaths: + """ + Container for model output directory structure and file paths. + + This class holds all the computed output paths for a specific model run, + making it easier to track where data and results are stored. + """ + + model_output_dir: Path + data_dir: Path + daily_training_data: Path + epiweekly_training_data: Path + + +@dataclass +class ForecastPipelineContext: + """ + Container for common forecast pipeline data, input configurations and + the logger. + + This class holds all the shared state that gets passed around during + a forecast pipeline run, reducing the number of parameters that need + to be passed between functions. + """ + + disease: str + loc: str + target: str + frequency: str + use_percentage: bool + ed_visit_type: str + model_name: str + param_data_dir: Path | None + eval_data_path: Path | None + nhsn_data_path: Path | None + report_date: date + first_training_date: date + last_training_date: date + n_forecast_days: int + exclude_last_n_days: int + model_batch_dir: Path + model_run_dir: Path + credentials_dict: dict[str, Any] + facility_level_nssp_data: pl.LazyFrame + loc_level_nssp_data: pl.LazyFrame + logger: logging.Logger + + def prepare_model_data(self) -> ModelPaths: + """ + Prepare training and evaluation data for a model. + + This function performs the data preparation steps that are common across + all forecast pipelines: + 1. Create model output directory + 2. Process and save location data + 3. Save evaluation data + 4. Generate epiweekly datasets + + Returns + ------- + ModelPaths + Object containing all model output directory and file paths + + Raises + ------ + ValueError + If eval_data_path is None + """ + # Create model output directory + model_output_dir = Path(self.model_run_dir, self.model_name) + data_dir = Path(model_output_dir, "data") + os.makedirs(data_dir, exist_ok=True) + + self.logger.info(f"Processing data for {self.loc}") + + # Process and save location data + process_and_save_loc_data( + loc_abb=self.loc, + disease=self.disease, + facility_level_nssp_data=self.facility_level_nssp_data, + loc_level_nssp_data=self.loc_level_nssp_data, + report_date=self.report_date, + first_training_date=self.first_training_date, + last_training_date=self.last_training_date, + save_dir=data_dir, + logger=self.logger, + credentials_dict=self.credentials_dict, + nhsn_data_path=self.nhsn_data_path, + ) + + # Save evaluation data + self.logger.info("Getting eval data...") + if self.eval_data_path is None: + raise ValueError("No path to an evaluation dataset provided.") + + save_eval_data( + loc=self.loc, + disease=self.disease, + first_training_date=self.first_training_date, + last_training_date=self.last_training_date, + latest_comprehensive_path=self.eval_data_path, + output_data_dir=data_dir, + last_eval_date=self.report_date + timedelta(days=self.n_forecast_days), + credentials_dict=self.credentials_dict, + nhsn_data_path=self.nhsn_data_path, + ) + self.logger.info("Done getting eval data.") + + # Generate epiweekly datasets + self.logger.info("Generating epiweekly datasets from daily datasets...") + generate_epiweekly_data(data_dir) + + self.logger.info("Data preparation complete.") + + # Return structured paths object + return ModelPaths( + model_output_dir=model_output_dir, + data_dir=data_dir, + daily_training_data=Path(data_dir, "combined_training_data.tsv"), + epiweekly_training_data=Path( + data_dir, "epiweekly_combined_training_data.tsv" + ), + ) + + def post_process_forecast(self) -> None: + """ + Post-process forecast outputs: create hubverse table and generate plots. + + This function performs the final post-processing steps: + 1. Generate forecast plots using hewr via plot_and_save_loc_forecast + (which also processes samples via hewr::process_loc_forecast) + 2. Create hubverse table from processed outputs + + The plot_and_save_loc_forecast function with model_name auto-detects + the model type and dispatches to process_model_samples.epiautogp(), + which reads Julia output samples, adds metadata, calculates credible + intervals, and saves formatted outputs. + + Returns + ------- + None + """ + # Generate forecast plots and process samples using hewr + # The model_name parameter triggers auto-detection and S3 dispatch to + # process_model_samples.epiautogp() which handles Julia output format + self.logger.info("Processing forecast and generating plots...") + plot_and_save_loc_forecast( + model_run_dir=self.model_run_dir, + n_forecast_days=self.n_forecast_days, + model_name=self.model_name, + ) + self.logger.info("Processing and plotting complete.") + + # Create hubverse table from processed outputs + self.logger.info("Creating hubverse table...") + create_hubverse_table(Path(self.model_run_dir, self.model_name)) + self.logger.info("Postprocessing complete.") + + +def setup_forecast_pipeline( + disease: str, + report_date: str, + loc: str, + target: str, + frequency: str, + use_percentage: bool, + ed_visit_type: str, + model_name: str, + param_data_dir: Path | None, + eval_data_path: Path | None, + nhsn_data_path: Path | None, + facility_level_nssp_data_dir: Path | str, + state_level_nssp_data_dir: Path | str, + output_dir: Path | str, + n_training_days: int, + n_forecast_days: int, + exclude_last_n_days: int = 0, + credentials_path: Path = None, + logger: logging.Logger = None, +) -> ForecastPipelineContext: + """ + Set up common forecast pipeline infrastructure. + + This function performs the initial setup steps that are common across + all forecast pipelines: + 1. Load credentials + 2. Get available report dates + 3. Parse and validate the report date + 4. Calculate training dates + 5. Load NSSP data + 6. Create batch directory structure + + Parameters + ---------- + disease : str + Disease to model (e.g., "COVID-19", "Influenza", "RSV") + report_date : str + Report date in YYYY-MM-DD format or "latest" + loc : str + Two-letter USPS location abbreviation (e.g., "CA", "NY") + target : str + Target data type: "nssp" or "nhsn" + frequency : str + Data frequency: "daily" or "epiweekly" + use_percentage : bool + If True, use percentage values for ED visits (NSSP only) + ed_visit_type : str + Type of ED visits: "observed" or "other" (NSSP only) + model_name : str + Name of the model configuration + param_data_dir : Path | None + Directory containing parameter data + eval_data_path : Path | None + Path to evaluation data file + nhsn_data_path : Path | None + Path to NHSN hospital admission data + facility_level_nssp_data_dir : Path | str + Directory containing facility-level NSSP ED visit data + state_level_nssp_data_dir : Path | str + Directory containing state-level NSSP ED visit data + output_dir : Path | str + Root directory for output + n_training_days : int + Number of days of training data + n_forecast_days : int + Number of days ahead to forecast + exclude_last_n_days : int, default=0 + Number of recent days to exclude from training + credentials_path : Path | None, default=None + Path to credentials file + logger : logging.Logger | None, default=None + Logger instance. If None, creates a new logger + + Returns + ------- + ForecastPipelineContext + Context object containing all setup information + """ + if logger is None: + logger = logging.getLogger(__name__) + + logger.info( + f"Setting up forecast pipeline for {disease}, " + f"location {loc}, report date {report_date}" + ) + + # Load credentials + credentials_dict = load_credentials(credentials_path, logger) + + # Get available reports + available_facility_level_reports = get_available_reports( + facility_level_nssp_data_dir + ) + available_loc_level_reports = get_available_reports(state_level_nssp_data_dir) + + # Parse and validate report date + report_date_parsed, loc_report_date = parse_and_validate_report_date( + report_date, + available_facility_level_reports, + available_loc_level_reports, + logger, + ) + + # Calculate training dates + first_training_date, last_training_date = calculate_training_dates( + report_date_parsed, + n_training_days, + exclude_last_n_days, + logger, + ) + + # Load NSSP data + facility_level_nssp_data, loc_level_nssp_data = load_nssp_data( + report_date_parsed, + loc_report_date, + available_facility_level_reports, + available_loc_level_reports, + facility_level_nssp_data_dir, + state_level_nssp_data_dir, + logger, + ) + + # Create model batch directory structure + model_batch_dir_name = ( + f"{disease.lower()}_r_{report_date_parsed}_f_" + f"{first_training_date}_t_{last_training_date}" + ) + model_batch_dir = Path(output_dir, model_batch_dir_name) + model_run_dir = Path(model_batch_dir, "model_runs", loc) + + logger.info(f"Model batch directory: {model_batch_dir}") + logger.info(f"Model run directory: {model_run_dir}") + + return ForecastPipelineContext( + disease=disease, + loc=loc, + target=target, + frequency=frequency, + use_percentage=use_percentage, + ed_visit_type=ed_visit_type, + model_name=model_name, + param_data_dir=param_data_dir, + eval_data_path=eval_data_path, + nhsn_data_path=nhsn_data_path, + report_date=report_date_parsed, + first_training_date=first_training_date, + last_training_date=last_training_date, + n_forecast_days=n_forecast_days, + exclude_last_n_days=exclude_last_n_days, + model_batch_dir=model_batch_dir, + model_run_dir=model_run_dir, + credentials_dict=credentials_dict, + facility_level_nssp_data=facility_level_nssp_data, + loc_level_nssp_data=loc_level_nssp_data, + logger=logger, + ) diff --git a/pipelines/epiautogp/forecast_epiautogp.py b/pipelines/epiautogp/forecast_epiautogp.py new file mode 100644 index 00000000..1b441e09 --- /dev/null +++ b/pipelines/epiautogp/forecast_epiautogp.py @@ -0,0 +1,394 @@ +import argparse +import logging +from pathlib import Path + +from pipelines.cli_utils import add_common_forecast_arguments +from pipelines.common_utils import ( + run_julia_code, + run_julia_script, +) +from pipelines.epiautogp import ( + convert_to_epiautogp_json, + setup_forecast_pipeline, +) + + +def run_epiautogp_forecast( + json_input_path: Path, + model_dir: Path, + params: dict, + execution_settings: dict, +) -> None: + """ + Run EpiAutoGP forecasting model using Julia. + + Parameters + ---------- + json_input_path : Path + Path to the JSON input file for EpiAutoGP. + model_dir : Path + Directory to save model outputs. + params : dict + Parameters to pass to EpiAutoGP. Expected keys: + - n_ahead: Number of time steps to forecast + - n_particles: Number of particles for SMC + - n_mcmc: Number of MCMC steps for GP kernel structure + - n_hmc: Number of HMC steps for GP kernel hyperparameters + - n_forecast_draws: Number of forecast draws to generate + - transformation: Type of transformation ("percentage" or "boxcox") + - smc_data_proportion: Proportion of data used in each SMC step + execution_settings : dict + Execution settings for the Julia environment. Expected keys: + - project: Julia project name + - threads: Number of threads to use + + Returns + ------- + None + + Raises + ------ + RuntimeError + If Julia environment setup or script execution fails + + Notes + ----- + This function sets up the EpiAutoGP Julia environment and runs the + forecasting script. The output is saved to model_dir. + """ + # Ensure output directory exists + model_dir.mkdir(parents=True, exist_ok=True) + + # Instantiate julia environment for EpiAutoGP + run_julia_code( + """ + using Pkg + Pkg.activate("EpiAutoGP") + Pkg.instantiate() + """, + function_name="setup_epiautogp_environment", + ) + + # Add path arguments to pass to EpiAutoGP + params["json-input"] = str(json_input_path) + params["output-dir"] = str(model_dir) + + # Convert Python dict keys (with underscores) to Julia CLI args (with hyphens) + args_to_epiautogp = [ + f"--{key.replace('_', '-')}={value}" for key, value in params.items() + ] + executor_flags = [f"--{key}={value}" for key, value in execution_settings.items()] + + # Run Julia script + run_julia_script( + "EpiAutoGP/run.jl", + args_to_epiautogp, + executor_flags=executor_flags, + function_name="run_epiautogp_forecast", + ) + return None + + +def main( + disease: str, + report_date: str, + loc: str, + facility_level_nssp_data_dir: Path | str, + state_level_nssp_data_dir: Path | str, + param_data_dir: Path | str, + output_dir: Path | str, + n_training_days: int, + n_forecast_days: int, + target: str, + frequency: str, + use_percentage: bool = False, + ed_visit_type: str = "observed", + exclude_last_n_days: int = 0, + eval_data_path: Path = None, + credentials_path: Path = None, + nhsn_data_path: Path = None, + n_particles: int = 24, + n_mcmc: int = 100, + n_hmc: int = 50, + n_forecast_draws: int = 2000, + smc_data_proportion: float = 0.1, + n_threads: int = 1, +) -> None: + """ + Run the complete EpiAutoGP forecasting pipeline for a single location. + + This function orchestrates the full EpiAutoGP forecasting pipeline: + 1. Sets up logging and generates model name + 2. Loads and validates data + 3. Prepares training and evaluation datasets + 4. Converts data to EpiAutoGP JSON format + 5. Runs EpiAutoGP forecasting model + 6. Post-processes forecast outputs and generates plots + + Parameters + ---------- + disease : str + Disease to model (e.g., "COVID-19", "Influenza", "RSV") + report_date : str + Report date in YYYY-MM-DD format or "latest" + loc : str + Two-letter USPS location abbreviation (e.g., "CA", "NY") + facility_level_nssp_data_dir : Path | str + Directory containing facility-level NSSP ED visit data + state_level_nssp_data_dir : Path | str + Directory containing state-level NSSP ED visit data + param_data_dir : Path | str + Directory containing parameter data for the model + output_dir : Path | str + Root directory for output + n_training_days : int + Number of days of training data + n_forecast_days : int + Number of days ahead to forecast + target : str + Target data type: "nssp" for ED visit data or + "nhsn" for hospital admission counts + frequency : str + Data frequency: "daily" or "epiweekly" + use_percentage : bool, default=False + If True, convert ED visits to percentage of total ED visits + (only applicable for NSSP target) + ed_visit_type : str, default="observed" + Type of ED visits to model: "observed" (disease-related) or + "other" (non-disease background). Only applicable for NSSP target + exclude_last_n_days : int, default=0 + Number of recent days to exclude from training + eval_data_path : Path | None, default=None + Path to evaluation data file + credentials_path : Path | None, default=None + Path to credentials file for data access + nhsn_data_path : Path | None, default=None + Path to NHSN hospital admission data + n_particles : int, default=24 + Number of particles for Sequential Monte Carlo (SMC) + n_mcmc : int, default=100 + Number of MCMC steps for GP kernel structure learning + n_hmc : int, default=50 + Number of Hamiltonian Monte Carlo steps for GP hyperparameters + n_forecast_draws : int, default=2000 + Number of forecast draws to generate + smc_data_proportion : float, default=0.1 + Proportion of data used in each SMC step + n_threads : int, default=1 + Number of threads for Julia execution + + Returns + ------- + None + + Raises + ------ + ValueError + If invalid parameter combinations are provided (e.g., use_percentage=True + with target="nhsn", or frequency="daily" with target="nhsn") + FileNotFoundError + If required data files or directories don't exist + RuntimeError + If Julia execution or R plotting fails + + Notes + ----- + For epiweekly forecasts, n_forecast_days is converted to weeks by dividing + by 7 and rounding up. The transformation type is set to "percentage" if + use_percentage=True, otherwise "boxcox" is used. + + The model name is automatically generated based on target, frequency, + use_percentage, and ed_visit_type parameters. + """ + # Step 0: Set up logging, model name and params to pass to epiautogp + logging.basicConfig(level=logging.INFO) + logger = logging.getLogger(__name__) + + # Generate model name + model_name = f"epiautogp_{target}_{frequency}" + if use_percentage: + model_name += "_pct" + if ed_visit_type == "other": + model_name += "_other" + + # Declare transformation type + if use_percentage: + transformation = "percentage" + else: + transformation = "boxcox" + + # Calculate n_ahead based on frequency + # For epiweekly data, convert days to weeks (rounded up) + if frequency == "epiweekly": + n_ahead = (n_forecast_days + 6) // 7 # Round up to nearest week + else: + n_ahead = n_forecast_days + + # Epiautogp params and execution settings + params = { + "n_ahead": n_ahead, + "n_particles": n_particles, + "n_mcmc": n_mcmc, + "n_hmc": n_hmc, + "n_forecast_draws": n_forecast_draws, + "transformation": transformation, + "smc_data_proportion": smc_data_proportion, + } + execution_settings = { + "project": "EpiAutoGP", + "threads": n_threads, + } + + logger.info( + "Starting single-location EpiAutoGP forecasting pipeline for " + f"location {loc}, and report date {report_date}" + ) + + # Step 1: Setup pipeline (loads data, validates dates, creates directories) + # This is the context of the forecast pipeline + context = setup_forecast_pipeline( + disease=disease, + report_date=report_date, + loc=loc, + target=target, + frequency=frequency, + use_percentage=use_percentage, + ed_visit_type=ed_visit_type, + model_name=model_name, + param_data_dir=param_data_dir, + eval_data_path=eval_data_path, + nhsn_data_path=nhsn_data_path, + facility_level_nssp_data_dir=facility_level_nssp_data_dir, + state_level_nssp_data_dir=state_level_nssp_data_dir, + output_dir=output_dir, + n_training_days=n_training_days, + n_forecast_days=n_forecast_days, + exclude_last_n_days=exclude_last_n_days, + credentials_path=credentials_path, + logger=logger, + ) + + # Step 2: Prepare data for modelling (process location data, eval data, epiweekly data) + # returns paths to prepared data files and directories + paths = context.prepare_model_data() + + # Step 3: Convert data to EpiAutoGP JSON format + logger.info("Converting data to EpiAutoGP JSON format...") + epiautogp_input_json_path = convert_to_epiautogp_json( + context=context, + paths=paths, + ) + + # Step 4: Run EpiAutoGP forecast + logger.info("Performing EpiAutoGP forecasting...") + run_epiautogp_forecast( + json_input_path=epiautogp_input_json_path, + model_dir=paths.model_output_dir, + params=params, + execution_settings=execution_settings, + ) + + # Step 5: Post-process forecast outputs + context.post_process_forecast() + + logger.info( + "Single-location EpiAutoGP pipeline complete " + f"for location {loc}, and " + f"report date {report_date}." + ) + return None + + +if __name__ == "__main__": + parser = argparse.ArgumentParser( + description="Run EpiAutoGP forecasting pipeline for disease modeling." + ) + + # Add common arguments + add_common_forecast_arguments(parser) + + # Add EpiAutoGP-specific arguments + parser.add_argument( + "--target", + type=str, + required=True, + choices=["nssp", "nhsn"], + help=( + "Target data type: 'nssp' for ED visit data or " + "'nhsn' for hospital admission counts." + ), + ) + + parser.add_argument( + "--frequency", + type=str, + default="epiweekly", + choices=["daily", "epiweekly"], + help="Data frequency: 'daily' or 'epiweekly' (default: epiweekly).", + ) + + parser.add_argument( + "--use-percentage", + action="store_true", + help=( + "Convert ED visits to percentage of total ED visits " + "(only applicable for NSSP target)." + ), + ) + + parser.add_argument( + "--ed-visit-type", + type=str, + default="observed", + choices=["observed", "other"], + help=( + "Type of ED visits to model: 'observed' (disease-related) or " + "'other' (non-disease background). Only applicable for NSSP target " + "(default: observed)." + ), + ) + + parser.add_argument( + "--n-particles", + type=int, + default=24, + help="Number of particles for SMC (default: 24).", + ) + + parser.add_argument( + "--n-mcmc", + type=int, + default=100, + help="Number of MCMC steps for GP kernel structure (default: 100).", + ) + + parser.add_argument( + "--n-hmc", + type=int, + default=50, + help="Number of HMC steps for GP kernel hyperparameters (default: 50).", + ) + + parser.add_argument( + "--n-forecast-draws", + type=int, + default=2000, + help="Number of forecast draws (default: 2000).", + ) + + parser.add_argument( + "--smc-data-proportion", + type=float, + default=0.1, + help="Proportion of data used in each SMC step (default: 0.1).", + ) + + parser.add_argument( + "--n-threads", + type=int, + default=1, + help="Number of threads to use for EpiAutoGP computations (default: 1).", + ) + + args = parser.parse_args() + main(**vars(args)) diff --git a/pipelines/epiautogp/prep_epiautogp_data.py b/pipelines/epiautogp/prep_epiautogp_data.py index 7db3a5f1..7bb6bf4b 100644 --- a/pipelines/epiautogp/prep_epiautogp_data.py +++ b/pipelines/epiautogp/prep_epiautogp_data.py @@ -12,11 +12,17 @@ import polars as pl +from pipelines.epiautogp.epiautogp_forecast_utils import ( + ForecastPipelineContext, + ModelPaths, +) + def _validate_epiautogp_parameters( target: str, frequency: str, use_percentage: bool, + ed_visit_type: str, ) -> None: """ Validate EpiAutoGP conversion parameters. @@ -24,8 +30,10 @@ def _validate_epiautogp_parameters( The inadmissible parameter combinations are: - `target` not in ['nssp', 'nhsn'] - `frequency` not in ['daily', 'epiweekly'] + - `ed_visit_type` not in ['observed', 'other'] - `use_percentage` is `True` when target is 'nhsn' (NHSN data are always counts) - `frequency` is 'daily' when target is 'nhsn' (NHSN data are only epiweekly) + - `ed_visit_type` is not 'observed' when target is 'nhsn' """ # Validate individual parameters if target not in ["nssp", "nhsn"]: @@ -34,6 +42,11 @@ def _validate_epiautogp_parameters( if frequency not in ["daily", "epiweekly"]: raise ValueError(f"frequency must be 'daily' or 'epiweekly', got '{frequency}'") + if ed_visit_type not in ["observed", "other"]: + raise ValueError( + f"ed_visit_type must be 'observed' or 'other', got '{ed_visit_type}'" + ) + # Validate parameter combinations if target == "nhsn" and use_percentage: raise ValueError( @@ -44,20 +57,18 @@ def _validate_epiautogp_parameters( if target == "nhsn" and frequency == "daily": raise ValueError("NHSN data is only available in epiweekly frequency.") + if target == "nhsn" and ed_visit_type != "observed": + raise ValueError( + "ed_visit_type is only applicable when target='nssp'. " + "For NHSN, ed_visit_type must be 'observed'." + ) + def convert_to_epiautogp_json( - daily_training_data_path: Path, - epiweekly_training_data_path: Path, - output_json_path: Path, - disease: str, - location: str, - forecast_date: dt.date, - target: str = "nssp", - frequency: str = "epiweekly", - use_percentage: bool = False, + context: ForecastPipelineContext, + paths: ModelPaths, nowcast_dates: list[dt.date] | None = None, nowcast_reports: list[list[float]] | None = None, - logger: logging.Logger | None = None, ) -> Path: """ Convert surveillance data to EpiAutoGP JSON format. @@ -68,38 +79,18 @@ def convert_to_epiautogp_json( Parameters ---------- - daily_training_data_path : Path - Path to the TSV file containing daily training data - (e.g., combined_training_data.tsv) - epiweekly_training_data_path : Path - Path to the TSV file containing epiweekly training data - (e.g., epiweekly_combined_training_data.tsv) - output_json_path : Path - Path where the EpiAutoGP JSON file will be saved - disease : str - Disease name (e.g., "COVID-19", "Influenza", "RSV") - location : str - Location abbreviation (e.g., "CA", "US", "DC") - forecast_date : dt.date - The reference date from which forecasting begins - target : str, default="nssp" - Target data type: "nssp" for ED visit data or - "nhsn" for hospital admission counts - frequency : str, default="epiweekly" - Data frequency: "daily" or "epiweekly" - use_percentage : bool, default=False - If True, convert ED visits to percentage: - observed_ed_visits / (observed_ed_visits + other_ed_visits) * 100 - Only applicable for NSSP target. - nowcast_dates : Optional[list[dt.date]], default=None + context : ForecastPipelineContext + Forecast pipeline context containing disease, location, report_date, + target, frequency, use_percentage, ed_visit_type, and logger + paths : ModelPaths + Model paths containing daily and epiweekly training data paths, + and model_output_dir where the JSON file will be saved + nowcast_dates : list[dt.date] | `None`, default=`None` Dates requiring nowcasting (typically recent dates with - incomplete data). If None, defaults to empty list. Not currently used. - nowcast_reports : Optional[list[list[float]]], default=None - Uncertainty bounds or samples for nowcast dates. If None, + incomplete data). If `None`, defaults to empty list. Not currently used. + nowcast_reports : list[list[float]] | `None`, default=`None` + Samples for nowcast dates to represent nowcast uncertainty. If `None`, defaults to empty list. Not currently used. - logger : Optional[logging.Logger], default=None - Logger instance for logging messages. If None, a module-level - logger will be created. Returns ------- @@ -111,29 +102,38 @@ def convert_to_epiautogp_json( ValueError If target is not "nssp" or "nhsn", if frequency is not "daily" or "epiweekly", if use_percentage is True when target is "nhsn", - or if the required data is not present + if frequency is "daily" when target is "nhsn", + if ed_visit_type is not "observed" when target is "nhsn", + or if the required data is not present in the TSV files FileNotFoundError If data files don't exist - Required Output Structure + Notes ----- - The output JSON for `EpiAutoGP` must have the following structure: + The output JSON file is saved to: + `paths.model_output_dir / f"{context.model_name}_input.json"` + + The output JSON for EpiAutoGP has the following structure: { "dates": ["2024-01-01", "2024-01-02", ...], "reports": [45.0, 52.0, ...], "pathogen": "COVID-19", "location": "CA", "target": "nssp", + "frequency": "epiweekly", + "use_percentage": false, + "ed_visit_type": "observed", "forecast_date": "2024-01-02", - "nowcast_dates": [], # eventually vector of dates for nowcasting - "nowcast_reports": [] # eventually vector of vectors for nowcast uncertainty + "nowcast_dates": [], + "nowcast_reports": [] } """ - if logger is None: - logger = logging.getLogger(__name__) + logger = context.logger # Validate parameters - _validate_epiautogp_parameters(target, frequency, use_percentage) + _validate_epiautogp_parameters( + context.target, context.frequency, context.use_percentage, context.ed_visit_type + ) # Set defaults for nowcasting if nowcast_dates is None: @@ -141,21 +141,24 @@ def convert_to_epiautogp_json( if nowcast_reports is None: nowcast_reports = [] + # Define input data JSON path + input_json_path = paths.model_output_dir / f"{context.model_name}_input.json" # Determine which data path to use based on frequency - if frequency == "daily": - data_path = daily_training_data_path + if context.frequency == "daily": + data_path = paths.daily_training_data else: # epiweekly - data_path = epiweekly_training_data_path + data_path = paths.epiweekly_training_data # Read data from TSV - logger.info(f"Reading {frequency} data from {data_path}") + logger.info(f"Reading {context.frequency} data from {data_path}") dates, reports = _read_tsv_data( data_path, - disease, - location, - target, - frequency, - use_percentage, + context.disease, + context.loc, + context.target, + context.frequency, + context.use_percentage, + context.ed_visit_type, logger, ) @@ -163,25 +166,28 @@ def convert_to_epiautogp_json( epiautogp_input = { "dates": [d.isoformat() for d in dates], "reports": reports, - "pathogen": disease, - "location": location, - "target": target, - "forecast_date": forecast_date.isoformat(), + "pathogen": context.disease, + "location": context.loc, + "target": context.target, + "frequency": context.frequency, + "use_percentage": context.use_percentage, + "ed_visit_type": context.ed_visit_type, + "forecast_date": context.report_date.isoformat(), "nowcast_dates": [d.isoformat() for d in nowcast_dates], "nowcast_reports": nowcast_reports, } # Write JSON file - output_json_path.parent.mkdir(parents=True, exist_ok=True) - with open(output_json_path, "w") as f: + input_json_path.parent.mkdir(parents=True, exist_ok=True) + with open(input_json_path, "w") as f: json.dump(epiautogp_input, f, indent=2) logger.info( - f"Saved EpiAutoGP input JSON for {disease} {location} " - f"(target={target}) to {output_json_path}" + f"Saved EpiAutoGP input JSON for {context.disease} {context.loc} " + f"(target={context.target}) to {input_json_path}" ) - return output_json_path + return input_json_path def _read_tsv_data( @@ -191,10 +197,48 @@ def _read_tsv_data( target: str, frequency: str, use_percentage: bool, + ed_visit_type: str, logger: logging.Logger, ) -> tuple[list[dt.date], list[float]]: """ - Read surveillance data from TSV files. + Read surveillance data from TSV files and extract target variable. + + Reads a TSV file containing surveillance data, filters for the specified + disease and location, pivots the data, and extracts the appropriate + target variable (NSSP ED visits or NHSN hospital admissions). + + Parameters + ---------- + tsv_path : Path + Path to the TSV file containing surveillance data + disease : str + Disease name (case-insensitive) + location : str + Geographic location code (e.g., "CA", "US") + target : str + Target data type: "nssp" or "nhsn" + frequency : str + Data frequency: "daily" or "epiweekly" + use_percentage : bool + If True, convert ED visits to percentage (only for NSSP) + ed_visit_type : str + Type of ED visits: "observed" or "other" (only for NSSP) + logger : logging.Logger + Logger for progress messages + + Returns + ------- + tuple[list[dt.date], list[float]] + Tuple of (dates, reports) where dates is a list of dates and + reports is a list of corresponding values + + Raises + ------ + FileNotFoundError + If the TSV file doesn't exist + ValueError + If no data is found for the specified disease and location, + or if required columns are missing """ if not tsv_path.exists(): raise FileNotFoundError(f"TSV file not found: {tsv_path}") @@ -227,7 +271,7 @@ def _read_tsv_data( # Extract data based on target if target == "nssp": dates, reports = _extract_nssp_from_pivot( - df_pivot, use_percentage, tsv_path, logger + df_pivot, use_percentage, ed_visit_type, tsv_path, logger ) else: # target == "nhsn" dates, reports = _extract_nhsn_from_pivot(df_pivot, tsv_path, logger) @@ -243,38 +287,77 @@ def _read_tsv_data( def _extract_nssp_from_pivot( df_pivot: pl.DataFrame, use_percentage: bool, + ed_visit_type: str, tsv_path: Path, logger: logging.Logger, ) -> tuple[list[dt.date], list[float]]: """ Extract NSSP ED visit data from pivoted DataFrame. + + Extracts emergency department visit data from NSSP (National Syndromic + Surveillance Program) and optionally converts to percentage format. + + Parameters + ---------- + df_pivot : pl.DataFrame + Pivoted DataFrame with columns for dates and ED visit types + use_percentage : bool + If True, calculate percentage: ed_visits / total_ed_visits * 100 + ed_visit_type : str + Type of ED visits to extract: "observed" or "other" + tsv_path : Path + Path to source TSV file (for error messages) + logger : logging.Logger + Logger for progress messages + + Returns + ------- + tuple[list[dt.date], list[float]] + Tuple of (dates, reports) with ED visit counts or percentages + + Raises + ------ + ValueError + If required columns are missing from the DataFrame """ + # Determine which ED visit column to use + if ed_visit_type == "observed": + ed_column = "observed_ed_visits" + else: # ed_visit_type == "other" + ed_column = "other_ed_visits" + # Check required columns - if "observed_ed_visits" not in df_pivot.columns: - raise ValueError(f"Column 'observed_ed_visits' not found in {tsv_path}") + if ed_column not in df_pivot.columns: + raise ValueError(f"Column '{ed_column}' not found in {tsv_path}") if use_percentage: + # For percentage, we need both columns regardless of ed_visit_type + # because the denominator is always total ED visits (observed + other) + if "observed_ed_visits" not in df_pivot.columns: + raise ValueError( + f"Column 'observed_ed_visits' required for percentage calculation but not found in {tsv_path}" + ) if "other_ed_visits" not in df_pivot.columns: raise ValueError( f"Column 'other_ed_visits' required for percentage calculation but not found in {tsv_path}" ) - # Calculate percentage + # Calculate percentage: ed_column / total_ed_visits * 100 df_pivot = df_pivot.with_columns( ( - pl.col("observed_ed_visits") + pl.col(ed_column) / (pl.col("observed_ed_visits") + pl.col("other_ed_visits")) * 100.0 ).alias("value") ) logger.info( - "Using ED visit percentage (observed_ed_visits / total_ed_visits * 100)" + f"Using {ed_visit_type} ED visit percentage ({ed_column} / total_ed_visits * 100)" ) else: # Use raw counts df_pivot = df_pivot.with_columns( - pl.col("observed_ed_visits").cast(pl.Float64).alias("value") + pl.col(ed_column).cast(pl.Float64).alias("value") ) - logger.info("Using raw ED visit counts") + logger.info(f"Using {ed_visit_type} ED visit counts ({ed_column})") # Filter out any rows with null values df_pivot = df_pivot.filter(pl.col("value").is_not_null()) @@ -292,6 +375,28 @@ def _extract_nhsn_from_pivot( ) -> tuple[list[dt.date], list[float]]: """ Extract NHSN hospital admission data from pivoted DataFrame. + + Extracts hospital admission counts from NHSN (National Healthcare Safety + Network) data. NHSN data is always reported as counts, not percentages. + + Parameters + ---------- + df_pivot : pl.DataFrame + Pivoted DataFrame with columns for dates and hospital admissions + tsv_path : Path + Path to source TSV file (for error messages) + logger : logging.Logger + Logger for progress messages + + Returns + ------- + tuple[list[dt.date], list[float]] + Tuple of (dates, reports) with hospital admission counts + + Raises + ------ + ValueError + If the 'observed_hospital_admissions' column is missing """ if "observed_hospital_admissions" not in df_pivot.columns: raise ValueError( diff --git a/pipelines/forecast_pyrenew.py b/pipelines/forecast_pyrenew.py index 922be4b1..fd7abcbc 100644 --- a/pipelines/forecast_pyrenew.py +++ b/pipelines/forecast_pyrenew.py @@ -22,9 +22,7 @@ run_r_script, ) from pipelines.fit_pyrenew_model import fit_and_save_model -from pipelines.generate_predictive import ( - generate_and_save_predictions, -) +from pipelines.generate_predictive import generate_and_save_predictions from pipelines.prep_data import process_and_save_loc_data, process_and_save_loc_param from pipelines.prep_eval_data import save_eval_data from pipelines.prep_ww_data import clean_nwss_data, preprocess_ww_data diff --git a/pipelines/tests/test_epiautogp_end_to_end.sh b/pipelines/tests/test_epiautogp_end_to_end.sh new file mode 100644 index 00000000..682530c2 --- /dev/null +++ b/pipelines/tests/test_epiautogp_end_to_end.sh @@ -0,0 +1,165 @@ +#!/bin/bash + +# End-to-end test for EpiAutoGP forecasting pipeline +# Tests multiple locations and targets (weekly NHSN and weekly percentage NSSP) +# +# Usage: +# bash pipelines/tests/test_epiautogp_end_to_end.sh [--force] +# +# Options: +# --force Delete existing test output directory + +set -e # Exit on any error + +BASE_DIR=pipelines/tests/epiautogp_end_to_end_test_output +LOCATIONS=(US CA MT DC) +DISEASES=(COVID-19) # Start with COVID-19 for testing +FORECAST_DATE=2024-12-21 + +echo "========================================" +echo "EpiAutoGP End-to-End Test" +echo "========================================" +echo "Testing configurations:" +echo " Locations: ${LOCATIONS[*]}" +echo " Diseases: ${DISEASES[*]}" +echo " Targets: NHSN (weekly), NSSP (weekly percentage), NSSP (daily counts), NSSP (daily other ED visits)" +echo " Forecast date: $FORECAST_DATE" +echo "" + +# Step 0: Clean up previous run +if [ -d "$BASE_DIR" ]; then + if [[ "$*" == *"--force"* ]]; then + echo "Cleaning previous test output..." + rm -rf "$BASE_DIR" + else + echo "ERROR: Test output directory exists: $BASE_DIR" + echo "Delete it or run with --force flag" + exit 1 + fi +fi + +# Step 1: Generate test data +echo "=========================================" +echo "Step 1: Generating test data" +echo "=========================================" +uv run python pipelines/generate_test_data.py "$BASE_DIR" + +if [ "$?" -ne 0 ]; then + echo "TEST-MODE FAIL: Generating test data failed" + exit 1 +else + echo "✓ Test data generated" + echo "" +fi + +# Step 2: Run EpiAutoGP forecasting pipeline for all locations and targets +echo "=========================================" +echo "Step 2: Running EpiAutoGP forecasts" +echo "=========================================" + +for location in "${LOCATIONS[@]}"; do + for disease in "${DISEASES[@]}"; do + echo "" + echo "Testing $disease, $location" + echo "-----------------------------------------" + + # Test 1: Weekly NHSN (hospital admissions) + echo " [1/4] Running weekly NHSN forecast..." + bash pipelines/tests/test_epiautogp_fit.sh \ + "$BASE_DIR" \ + "$disease" \ + "$location" \ + "nhsn" \ + "epiweekly" \ + "false" + + if [ "$?" -ne 0 ]; then + echo "TEST-MODE FAIL: NHSN forecast failed for $disease, $location" + exit 1 + else + echo " ✓ Weekly NHSN forecast complete" + fi + + # Test 2: Weekly NSSP percentage (ED visits as percentage) + echo " [2/4] Running weekly NSSP percentage forecast..." + bash pipelines/tests/test_epiautogp_fit.sh \ + "$BASE_DIR" \ + "$disease" \ + "$location" \ + "nssp" \ + "epiweekly" \ + "true" + + if [ "$?" -ne 0 ]; then + echo "TEST-MODE FAIL: NSSP percentage forecast failed for $disease, $location" + exit 1 + else + echo " ✓ Weekly NSSP percentage forecast complete" + fi + + # Test 3: Daily NSSP counts (ED visit counts, not percentages) + echo " [3/4] Running daily NSSP count forecast..." + bash pipelines/tests/test_epiautogp_fit.sh \ + "$BASE_DIR" \ + "$disease" \ + "$location" \ + "nssp" \ + "daily" \ + "false" + + if [ "$?" -ne 0 ]; then + echo "TEST-MODE FAIL: NSSP daily count forecast failed for $disease, $location" + exit 1 + else + echo " ✓ Daily NSSP count forecast complete" + fi + + # Test 4: Daily NSSP other ED visits (non-target background) + echo " [4/4] Running daily NSSP other ED visits forecast..." + bash pipelines/tests/test_epiautogp_fit.sh \ + "$BASE_DIR" \ + "$disease" \ + "$location" \ + "nssp" \ + "daily" \ + "false" \ + "other" + + if [ "$?" -ne 0 ]; then + echo "TEST-MODE FAIL: NSSP daily other ED visits forecast failed for $disease, $location" + exit 1 + else + echo " ✓ Daily NSSP other ED visits forecast complete" + fi + + echo "✓ All forecasts complete for $disease, $location" + done +done + +echo "" +echo "=========================================" +echo "Step 3: Verifying outputs" +echo "=========================================" + +# Count expected outputs (4 targets × number of locations × number of diseases) +EXPECTED_MODELS=$((4 * ${#LOCATIONS[@]} * ${#DISEASES[@]})) +ACTUAL_MODELS=$(find "$BASE_DIR/${FORECAST_DATE}_forecasts" -type d -name "epiautogp_*" | wc -l) + +echo "Expected models: $EXPECTED_MODELS" +echo "Actual models: $ACTUAL_MODELS" + +if [ "$ACTUAL_MODELS" -eq "$EXPECTED_MODELS" ]; then + echo "✓ Output verification passed" +else + echo "ERROR: Output count mismatch" + echo "Directory structure:" + find "$BASE_DIR/${FORECAST_DATE}_forecasts" -type d -name "epiautogp_*" + exit 1 +fi + +echo "" +echo "=========================================" +echo "All Tests Passed!" +echo "=========================================" +echo "Test output saved to: $BASE_DIR" +echo "" diff --git a/pipelines/tests/test_epiautogp_fit.sh b/pipelines/tests/test_epiautogp_fit.sh new file mode 100644 index 00000000..092c2982 --- /dev/null +++ b/pipelines/tests/test_epiautogp_fit.sh @@ -0,0 +1,62 @@ +#!/bin/bash + +# Run a single EpiAutoGP forecast +# Called by test_epiautogp_end_to_end.sh +# +# Usage: +# bash test_epiautogp_fit.sh [ed_visit_type] + +if [[ $# -lt 6 || $# -gt 7 ]]; then + echo "Usage: $0 [ed_visit_type]" + echo " target: nhsn or nssp" + echo " frequency: daily or epiweekly" + echo " use_percentage: true or false" + echo " ed_visit_type: observed or other (optional, default: observed)" + exit 1 +fi + +BASE_DIR="$1" +disease="$2" +location="$3" +target="$4" +frequency="$5" +use_percentage="$6" +ed_visit_type="${7:-observed}" # Default to "observed" if not provided + +# Build command arguments +cmd_args=( + --disease "$disease" + --loc "$location" + --facility-level-nssp-data-dir "$BASE_DIR/private_data/nssp_etl_gold" + --state-level-nssp-data-dir "$BASE_DIR/private_data/nssp_state_level_gold" + --param-data-dir "$BASE_DIR/private_data/prod_param_estimates" + --output-dir "$BASE_DIR/2024-12-21_forecasts" + --n-training-days 90 + --target "$target" + --frequency "$frequency" + --eval-data-path "$BASE_DIR/private_data/nssp-etl" + --nhsn-data-path "$BASE_DIR/private_data/nhsn_test_data/${disease}_${location}.parquet" + --n-forecast-days 28 + --n-particles 2 + --n-mcmc 2 + --n-hmc 2 + --n-forecast-draws 100 + --smc-data-proportion 0.1 +) + +# Add percentage flag if needed +if [ "$use_percentage" = "true" ]; then + cmd_args+=(--use-percentage) +fi + +# Add ed-visit-type if not default +if [ "$ed_visit_type" != "observed" ]; then + cmd_args+=(--ed-visit-type "$ed_visit_type") +fi + +uv run python pipelines/epiautogp/forecast_epiautogp.py "${cmd_args[@]}" + +if [ "$?" -ne 0 ]; then + echo "TEST-MODE FAIL: EpiAutoGP forecast pipeline failed" + exit 1 +fi diff --git a/pipelines/tests/test_epiautogp_prep.sh b/pipelines/tests/test_epiautogp_prep.sh deleted file mode 100755 index 9e9c9b52..00000000 --- a/pipelines/tests/test_epiautogp_prep.sh +++ /dev/null @@ -1,121 +0,0 @@ -#!/bin/bash - -# Test script for EpiAutoGP data preparation -# This script tests the first step of the EpiAutoGP workflow: -# 1. Generate test data -# 2. Convert to EpiAutoGP JSON format - -BASE_DIR=pipelines/tests/epiautogp_test_output -LOCATIONS=(US CA MT DC) -DISEASES=(Influenza COVID-19 RSV) -TARGETS=(nssp nhsn) - -echo "TEST-MODE: Running EpiAutoGP data preparation test with base directory $BASE_DIR" - -if [ -d "$BASE_DIR" ]; then - if [ "$1" = "--force" ]; then - rm -r "$BASE_DIR" - else - # make the user delete the directory, to avoid accidental deletes of - # test output - echo "TEST-MODE FAIL: test output directory $BASE_DIR already exists. Delete the directory and re-run the test, or run with the --force flag". - echo "DETAILS: The test output directory persists after each run to allow the user to examine output. It must be deleted and recreated at the start of each new end-to-end test run to ensure that old output does not compromise test validity." - exit 1 - fi -fi - -echo "TEST-MODE: Generating test data..." -uv run python pipelines/generate_test_data.py "$BASE_DIR" - -if [ "$?" -ne 0 ]; then - echo "TEST-MODE FAIL: Generating test data failed" - exit 1 -else - echo "TEST-MODE: Finished generating test data" -fi - -# Create output directory for JSON files -FORECAST_DATE="2024-12-21" -OUTPUT_DIR="$BASE_DIR/${FORECAST_DATE}_epiautogp_inputs" -mkdir -p "$OUTPUT_DIR" - -echo "TEST-MODE: Converting surveillance data to EpiAutoGP JSON format" - -# Test conversion for each location, disease, and target combination -for location in "${LOCATIONS[@]}"; do - for disease in "${DISEASES[@]}"; do - for target in "${TARGETS[@]}"; do - # Construct paths - if [ "$target" = "nhsn" ]; then - NHSN_PATH="$BASE_DIR/private_data/nhsn_test_data/${disease}_${location}.parquet" - - # Check if NHSN data exists for this combination - if [ ! -f "$NHSN_PATH" ]; then - echo "TEST-MODE: Skipping NHSN conversion for $disease, $location (no data file)" - continue - fi - fi - - # Create location-specific output directory - LOC_OUTPUT_DIR="$OUTPUT_DIR/${location}" - mkdir -p "$LOC_OUTPUT_DIR" - - # For NSSP, test both counts and percentages with epiweekly data - if [ "$target" = "nssp" ]; then - # Test 1: NSSP epiweekly counts - echo "TEST-MODE: Converting $target epiweekly counts for $disease, $location" - OUTPUT_JSON="$LOC_OUTPUT_DIR/epiautogp_input_${target}_epiweekly_counts_${disease// /-}.json" - uv run python pipelines/tests/test_epiautogp_prep_script.py \ - "$target" "$disease" "$location" "$BASE_DIR" "$OUTPUT_JSON" "epiweekly" "false" - - if [ "$?" -ne 0 ]; then - echo "TEST-MODE FAIL: Conversion failed for $target epiweekly counts, $disease, $location" - exit 1 - else - echo "TEST-MODE: Successfully converted $target epiweekly counts for $disease, $location" - fi - - # Test 2: NSSP epiweekly percentage - echo "TEST-MODE: Converting $target epiweekly percentage for $disease, $location" - OUTPUT_JSON="$LOC_OUTPUT_DIR/epiautogp_input_${target}_epiweekly_percentage_${disease// /-}.json" - uv run python pipelines/tests/test_epiautogp_prep_script.py \ - "$target" "$disease" "$location" "$BASE_DIR" "$OUTPUT_JSON" "epiweekly" "true" - - if [ "$?" -ne 0 ]; then - echo "TEST-MODE FAIL: Conversion failed for $target epiweekly percentage, $disease, $location" - exit 1 - else - echo "TEST-MODE: Successfully converted $target epiweekly percentage for $disease, $location" - fi - else - # For NHSN, just test epiweekly counts (no percentage option) - echo "TEST-MODE: Converting $target epiweekly data for $disease, $location" - OUTPUT_JSON="$LOC_OUTPUT_DIR/epiautogp_input_${target}_epiweekly_${disease// /-}.json" - uv run python pipelines/tests/test_epiautogp_prep_script.py \ - "$target" "$disease" "$location" "$BASE_DIR" "$OUTPUT_JSON" "epiweekly" "false" - - if [ "$?" -ne 0 ]; then - echo "TEST-MODE FAIL: Conversion failed for $target data, $disease, $location" - exit 1 - else - echo "TEST-MODE: Successfully converted $target data for $disease, $location" - fi - fi - done - done -done - -echo "TEST-MODE: All conversions complete." -echo "TEST-MODE: JSON files are in $OUTPUT_DIR" - -# Verify some JSON files were created -JSON_COUNT=$(find "$OUTPUT_DIR" -name "*.json" | wc -l) -echo "TEST-MODE: Created $JSON_COUNT JSON files" - -if [ "$JSON_COUNT" -eq 0 ]; then - echo "TEST-MODE FAIL: No JSON files were created" - exit 1 -fi - -echo "TEST-MODE: All finished successfully." -echo "TEST-MODE: You can examine the output files in $OUTPUT_DIR" diff --git a/pipelines/tests/test_epiautogp_prep_script.py b/pipelines/tests/test_epiautogp_prep_script.py deleted file mode 100644 index 4392026c..00000000 --- a/pipelines/tests/test_epiautogp_prep_script.py +++ /dev/null @@ -1,292 +0,0 @@ -""" -Script to test EpiAutoGP data preparation workflow. - -This script replicates the data setup steps from `pipelines/forecast_pyrenew.py` -up to the point where `data_for_model_fit.json` is created, then uses -that file to create the EpiAutoGP input JSON in the same directory. -""" - -import datetime as dt -import logging -import os -import sys -from pathlib import Path - -# Ensure the project root is in the path -sys.path.insert(0, ".") - - -from pipelines.common_utils import ( - calculate_training_dates, - get_available_reports, - load_nssp_data, - parse_and_validate_report_date, -) -from pipelines.epiautogp import convert_to_epiautogp_json -from pipelines.prep_data import process_and_save_loc_data - - -def main(): - if len(sys.argv) not in [6, 7, 8]: - print( - "Usage: python test_epiautogp_prep_script.py " - " [ []]", - file=sys.stderr, - ) - sys.exit(1) - - target = sys.argv[1] - disease = sys.argv[2] - location = sys.argv[3] - base_dir = Path(sys.argv[4]) - output_json = Path(sys.argv[5]) - frequency = sys.argv[6] if len(sys.argv) >= 7 else "epiweekly" - use_percentage = sys.argv[7].lower() == "true" if len(sys.argv) >= 8 else False - - # Set up logging - logging.basicConfig(level=logging.INFO) - logger = logging.getLogger(__name__) - - # Test parameters matching forecast_pyrenew.py test setup - report_date_str = "2024-12-21" - report_date = dt.date(2024, 12, 21) - n_training_days = 90 - exclude_last_n_days = 0 - - logger.info( - f"Testing EpiAutoGP data prep for {disease}, {location}, target={target}" - ) - - # Set up data directories (matching forecast_pyrenew.py) - facility_level_nssp_data_dir = base_dir / "private_data" / "nssp_etl_gold" - state_level_nssp_data_dir = base_dir / "private_data" / "nssp_state_level_gold" - nhsn_data_path = ( - base_dir / "private_data" / "nhsn_test_data" / f"{disease}_{location}.parquet" - ) - - # Get available reports and validate report date - available_facility_level_reports = get_available_reports( - facility_level_nssp_data_dir - ) - available_loc_level_reports = get_available_reports(state_level_nssp_data_dir) - - report_date, loc_report_date = parse_and_validate_report_date( - report_date_str, - available_facility_level_reports, - available_loc_level_reports, - logger, - ) - - # Calculate training dates - first_training_date, last_training_date = calculate_training_dates( - report_date, - n_training_days, - exclude_last_n_days, - logger, - ) - - # Load NSSP data - facility_level_nssp_data, loc_level_nssp_data = load_nssp_data( - report_date, - loc_report_date, - available_facility_level_reports, - available_loc_level_reports, - facility_level_nssp_data_dir, - state_level_nssp_data_dir, - logger, - ) - - # Set up output directory structure (matching forecast_pyrenew.py) - model_batch_dir_name = ( - f"{disease.lower()}_r_{report_date}_f_" - f"{first_training_date}_t_{last_training_date}" - ) - - output_dir = base_dir / f"{report_date}_epiautogp_test" - model_batch_dir = output_dir / model_batch_dir_name - model_run_dir = model_batch_dir / "model_runs" / location - - # For EpiAutoGP test, we'll create a simple model directory - model_name = f"epiautogp_{target}" - model_dir = model_run_dir / model_name - data_dir = model_dir / "data" - os.makedirs(data_dir, exist_ok=True) - - logger.info(f"Processing {location} data...") - logger.info(f"Data will be saved to {data_dir}") - - # Process and save location data - this creates data_for_model_fit.json - process_and_save_loc_data( - loc_abb=location, - disease=disease, - facility_level_nssp_data=facility_level_nssp_data, - loc_level_nssp_data=loc_level_nssp_data, - loc_level_nwss_data=None, # No wastewater for this test - report_date=report_date, - first_training_date=first_training_date, - last_training_date=last_training_date, - save_dir=data_dir, - logger=logger, - credentials_dict={}, - nhsn_data_path=nhsn_data_path, - ) - - # Verify data_for_model_fit.json was created - data_for_model_fit_path = data_dir / "data_for_model_fit.json" - if not data_for_model_fit_path.exists(): - print( - f"ERROR: data_for_model_fit.json not created at {data_for_model_fit_path}", - file=sys.stderr, - ) - sys.exit(1) - - logger.info(f"Successfully created {data_for_model_fit_path}") - - # Generate epiweekly data from the combined training data - logger.info("Generating epiweekly data using R script...") - import subprocess - - # Create a temporary R script that only processes training data - temp_r_script = data_dir / "temp_generate_epiweekly.R" - r_script_content = """ -library(argparser) -library(dplyr) -library(forecasttools) -library(fs) -library(readr) -library(lubridate) -library(stringr) - -convert_daily_to_epiweekly <- function( - data_dir, - data_name, - strict = TRUE, - day_of_week = 7 -) { - data_path <- path(data_dir, data_name) - - if (!file.exists(data_path)) { - cat(paste("Skipping", data_name, "- file does not exist\\n")) - return(invisible(NULL)) - } - - daily_data <- read_tsv( - data_path, - col_types = cols( - date = col_date(), - geo_value = col_character(), - disease = col_character(), - data_type = col_character(), - .variable = col_character(), - .value = col_double() - ) - ) - - daily_ed_data <- daily_data |> - filter(str_ends(.variable, "_ed_visits")) - - epiweekly_hosp_data <- daily_data |> - filter(.variable == "observed_hospital_admissions") - - epiweekly_ed_data <- daily_ed_data |> - forecasttools::daily_to_epiweekly( - value_col = ".value", - weekly_value_name = ".value", - id_cols = c("geo_value", "disease", "data_type", ".variable"), - strict = strict - ) |> - mutate( - date = epiweek_to_date(epiweek, epiyear, day_of_week = day_of_week) - ) |> - select(date, geo_value, disease, data_type, .variable, .value) - - epiweekly_data <- bind_rows(epiweekly_ed_data, epiweekly_hosp_data) |> - arrange(date, .variable) - - output_file <- path( - data_dir, - glue::glue("epiweekly_{data_name}") - ) - - write_tsv(epiweekly_data, output_file) -} - -args <- commandArgs(trailingOnly = TRUE) -data_dir <- args[1] -convert_daily_to_epiweekly(data_dir, "combined_training_data.tsv") -""" - - with open(temp_r_script, "w") as f: - f.write(r_script_content) - - result = subprocess.run( - ["Rscript", str(temp_r_script), str(data_dir)], capture_output=True, text=True - ) - - # Clean up temp script - temp_r_script.unlink() - - if result.returncode != 0: - print( - f"ERROR: Failed to generate epiweekly data: {result.stderr}", - file=sys.stderr, - ) - sys.exit(1) - - epiweekly_data_path = data_dir / "epiweekly_combined_training_data.tsv" - if not epiweekly_data_path.exists(): - print( - f"ERROR: epiweekly_combined_training_data.tsv not created at {epiweekly_data_path}", - file=sys.stderr, - ) - sys.exit(1) - - logger.info(f"Successfully created {epiweekly_data_path}") - - # Now create EpiAutoGP input using the TSV files - # Place it in the same data directory - daily_data_path = data_dir / "combined_training_data.tsv" - epiautogp_json_path = data_dir / f"epiautogp_input_{target}.json" - - try: - result_path = convert_to_epiautogp_json( - daily_training_data_path=daily_data_path, - epiweekly_training_data_path=epiweekly_data_path, - output_json_path=epiautogp_json_path, - disease=disease, - location=location, - forecast_date=report_date, - target=target, - frequency=frequency, - use_percentage=use_percentage, - logger=logger, - ) - logger.info(f"Successfully created EpiAutoGP input at {result_path}") - - # Verify the file was created in the data directory - assert result_path.parent == data_dir, ( - f"EpiAutoGP JSON not in data directory: {result_path.parent} != {data_dir}" - ) - - logger.info(f"Verified: epiautogp_input_{target}.json is in {data_dir}") - - # Also copy to the requested output location for the test script - import shutil - - output_json.parent.mkdir(parents=True, exist_ok=True) - shutil.copy(result_path, output_json) - logger.info(f"Also copied to {output_json} for test verification") - - print(f"SUCCESS: Created {epiautogp_json_path}") - print(f"SUCCESS: Verified files are in same directory: {data_dir}") - - except Exception as e: - print(f"ERROR: {str(e)}", file=sys.stderr) - import traceback - - traceback.print_exc() - sys.exit(1) - - -if __name__ == "__main__": - main() diff --git a/pipelines/tests/test_forecast_utils.py b/pipelines/tests/test_forecast_utils.py new file mode 100644 index 00000000..73b189d0 --- /dev/null +++ b/pipelines/tests/test_forecast_utils.py @@ -0,0 +1,408 @@ +"""Unit tests for forecast pipeline utility functions. + +These tests use mocking via `@patch` decorators to isolate the units under test. +We mock these dependencies to: + +1. Test the logic and control flow without side effects +2. Avoid requiring actual data files or external services +3. Make tests fast and deterministic +4. Focus on verifying correct function calls and parameter passing + +Each test mocks the minimum dependencies needed for that specific test case. + +The end-to-end functionality of the forecast pipeline is verified in a separate +shell-based integration test. +""" + +import datetime as dt +import logging +from dataclasses import replace +from pathlib import Path +from unittest.mock import patch + +import polars as pl +import pytest + +from pipelines.epiautogp.epiautogp_forecast_utils import ( + ForecastPipelineContext, + ModelPaths, + setup_forecast_pipeline, +) + + +@pytest.fixture +def base_context(tmp_path): + """ + Fixture providing a ForecastPipelineContext with default test values. + + Tests can use this directly or override specific fields as needed. + """ + return ForecastPipelineContext( + disease="COVID-19", + loc="CA", + target="nssp", + frequency="epiweekly", + use_percentage=False, + ed_visit_type="observed", + model_name="test_model", + param_data_dir=None, + eval_data_path=tmp_path / "eval.parquet", + nhsn_data_path=None, + report_date=dt.date(2024, 12, 20), + first_training_date=dt.date(2024, 9, 22), + last_training_date=dt.date(2024, 12, 20), + n_forecast_days=28, + exclude_last_n_days=0, + model_batch_dir=tmp_path / "batch", + model_run_dir=tmp_path / "batch" / "model_runs" / "CA", + credentials_dict={}, + facility_level_nssp_data=pl.LazyFrame(), + loc_level_nssp_data=pl.LazyFrame(), + logger=logging.getLogger(), + ) + + +class TestForecastPipelineContext: + """Tests for the ForecastPipelineContext dataclass.""" + + def test_context_initialization(self): + """Test that ForecastPipelineContext can be initialized with all fields.""" + context = ForecastPipelineContext( + disease="COVID-19", + loc="CA", + target="nssp", + frequency="epiweekly", + use_percentage=False, + ed_visit_type="observed", + model_name="test_model", + param_data_dir=None, + eval_data_path=Path("/path/to/eval.parquet"), + nhsn_data_path=None, + report_date=dt.date(2024, 12, 20), + first_training_date=dt.date(2024, 9, 22), + last_training_date=dt.date(2024, 12, 20), + n_forecast_days=28, + exclude_last_n_days=0, + model_batch_dir=Path("/output/batch"), + model_run_dir=Path("/output/batch/model_runs/CA"), + credentials_dict={"key": "value"}, + facility_level_nssp_data=pl.LazyFrame(), + loc_level_nssp_data=pl.LazyFrame(), + logger=logging.getLogger(), + ) + + assert context.disease == "COVID-19" + assert context.loc == "CA" + assert context.n_forecast_days == 28 + assert context.exclude_last_n_days == 0 + + +class TestModelPaths: + """Tests for the ModelPaths dataclass.""" + + def test_paths_initialization(self): + """Test that ModelPaths can be initialized with all fields.""" + paths = ModelPaths( + model_output_dir=Path("/output/model"), + data_dir=Path("/output/model/data"), + daily_training_data=Path("/output/model/data/combined_training_data.tsv"), + epiweekly_training_data=Path( + "/output/model/data/epiweekly_combined_training_data.tsv" + ), + ) + + assert paths.model_output_dir == Path("/output/model") + assert paths.data_dir == Path("/output/model/data") + assert paths.daily_training_data.name == "combined_training_data.tsv" + assert ( + paths.epiweekly_training_data.name == "epiweekly_combined_training_data.tsv" + ) + + +class TestSetupForecastPipeline: + """Tests for the setup_forecast_pipeline function.""" + + @patch("pipelines.epiautogp.epiautogp_forecast_utils.load_credentials") + @patch("pipelines.epiautogp.epiautogp_forecast_utils.get_available_reports") + @patch( + "pipelines.epiautogp.epiautogp_forecast_utils.parse_and_validate_report_date" + ) + @patch("pipelines.epiautogp.epiautogp_forecast_utils.calculate_training_dates") + @patch("pipelines.epiautogp.epiautogp_forecast_utils.load_nssp_data") + def test_setup_pipeline_returns_context( + self, + mock_load_nssp, + mock_calc_dates, + mock_parse_date, + mock_get_reports, + mock_load_creds, + tmp_path, + ): + """Test that setup_forecast_pipeline returns a properly configured context.""" + # Setup mocks + mock_get_reports.return_value = [dt.date(2024, 12, 20)] + mock_parse_date.return_value = (dt.date(2024, 12, 20), dt.date(2024, 12, 20)) + mock_calc_dates.return_value = (dt.date(2024, 9, 22), dt.date(2024, 12, 20)) + mock_load_nssp.return_value = (pl.LazyFrame(), pl.LazyFrame()) + + context = setup_forecast_pipeline( + disease="COVID-19", + report_date="latest", + loc="CA", + target="nssp", + frequency="epiweekly", + use_percentage=False, + ed_visit_type="observed", + model_name="test_model", + param_data_dir=None, + eval_data_path=tmp_path / "eval.parquet", + nhsn_data_path=None, + facility_level_nssp_data_dir=tmp_path, + state_level_nssp_data_dir=tmp_path, + output_dir=tmp_path, + n_training_days=90, + n_forecast_days=28, + exclude_last_n_days=0, + credentials_path=None, + logger=None, + ) + + assert isinstance(context, ForecastPipelineContext) + + @patch("pipelines.epiautogp.epiautogp_forecast_utils.load_credentials") + @patch("pipelines.epiautogp.epiautogp_forecast_utils.get_available_reports") + @patch( + "pipelines.epiautogp.epiautogp_forecast_utils.parse_and_validate_report_date" + ) + @patch("pipelines.epiautogp.epiautogp_forecast_utils.calculate_training_dates") + @patch("pipelines.epiautogp.epiautogp_forecast_utils.load_nssp_data") + def test_setup_pipeline_creates_directory_structure( + self, + mock_load_nssp, + mock_calc_dates, + mock_parse_date, + mock_get_reports, + mock_load_creds, + tmp_path, + ): + """Test that setup creates the expected directory structure.""" + mock_load_creds.return_value = {} + mock_get_reports.return_value = [dt.date(2024, 12, 20)] + mock_parse_date.return_value = (dt.date(2024, 12, 20), dt.date(2024, 12, 20)) + mock_calc_dates.return_value = (dt.date(2024, 9, 22), dt.date(2024, 12, 20)) + mock_load_nssp.return_value = (pl.LazyFrame(), pl.LazyFrame()) + + context = setup_forecast_pipeline( + disease="COVID-19", + report_date="latest", + loc="CA", + target="nssp", + frequency="epiweekly", + use_percentage=False, + ed_visit_type="observed", + model_name="test_model", + param_data_dir=None, + eval_data_path=tmp_path / "eval.parquet", + nhsn_data_path=None, + facility_level_nssp_data_dir=tmp_path, + state_level_nssp_data_dir=tmp_path, + output_dir=tmp_path, + n_training_days=90, + n_forecast_days=28, + ) + + expected_batch_dir = ( + tmp_path / "covid-19_r_2024-12-20_f_2024-09-22_t_2024-12-20" + ) + expected_run_dir = expected_batch_dir / "model_runs" / "CA" + + assert context.model_batch_dir == expected_batch_dir + assert context.model_run_dir == expected_run_dir + + +class TestPrepareModelData: + """Tests for the prepare_model_data function.""" + + @patch("pipelines.epiautogp.epiautogp_forecast_utils.generate_epiweekly_data") + @patch("pipelines.epiautogp.epiautogp_forecast_utils.save_eval_data") + @patch("pipelines.epiautogp.epiautogp_forecast_utils.process_and_save_loc_data") + def test_prepare_model_data_returns_paths( + self, + mock_process_loc, + mock_save_eval, + mock_gen_epiweekly, + base_context, # Fixture is injected here + ): + """Test that prepare_model_data returns ModelPaths.""" + # Use the fixture directly + paths = base_context.prepare_model_data() + + assert isinstance(paths, ModelPaths) + assert paths.model_output_dir.name == "test_model" + assert paths.data_dir.name == "data" + assert paths.daily_training_data.name == "combined_training_data.tsv" + assert ( + paths.epiweekly_training_data.name == "epiweekly_combined_training_data.tsv" + ) + + @patch("pipelines.epiautogp.epiautogp_forecast_utils.generate_epiweekly_data") + @patch("pipelines.epiautogp.epiautogp_forecast_utils.save_eval_data") + @patch("pipelines.epiautogp.epiautogp_forecast_utils.process_and_save_loc_data") + def test_prepare_model_data_creates_directories( + self, + mock_process_loc, + mock_save_eval, + mock_gen_epiweekly, + base_context, # Use fixture + ): + """Test that prepare_model_data creates the required directories.""" + # Use the fixture directly + paths = base_context.prepare_model_data() + + assert paths.model_output_dir.exists() + assert paths.data_dir.exists() + + @patch("pipelines.epiautogp.epiautogp_forecast_utils.process_and_save_loc_data") + def test_prepare_model_data_raises_without_eval_path(self, mock_process, tmp_path): + """Test that prepare_model_data raises ValueError without eval_data_path.""" + context = ForecastPipelineContext( + disease="COVID-19", + loc="CA", + target="nssp", + frequency="epiweekly", + use_percentage=False, + ed_visit_type="observed", + model_name="test_model", + param_data_dir=None, + eval_data_path=None, + nhsn_data_path=None, + report_date=dt.date(2024, 12, 20), + first_training_date=dt.date(2024, 9, 22), + last_training_date=dt.date(2024, 12, 20), + n_forecast_days=28, + exclude_last_n_days=0, + model_batch_dir=tmp_path / "batch", + model_run_dir=tmp_path / "batch" / "model_runs" / "CA", + credentials_dict={}, + facility_level_nssp_data=pl.LazyFrame(), + loc_level_nssp_data=pl.LazyFrame(), + logger=logging.getLogger(), + ) + + with pytest.raises(ValueError, match="No path to an evaluation dataset"): + context.prepare_model_data() + + @patch("pipelines.epiautogp.epiautogp_forecast_utils.generate_epiweekly_data") + @patch("pipelines.epiautogp.epiautogp_forecast_utils.save_eval_data") + @patch("pipelines.epiautogp.epiautogp_forecast_utils.process_and_save_loc_data") + def test_prepare_model_data_passes_nhsn_data_path( + self, + mock_process_loc, + mock_save_eval, + mock_gen_epiweekly, + base_context, # Use fixture + tmp_path, + ): + """Test that prepare_model_data passes nhsn_data_path to data functions.""" + nhsn_path = tmp_path / "nhsn_data.parquet" + + # Override just the fields we need for this test + context = replace( + base_context, + target="nhsn", + nhsn_data_path=nhsn_path, + ) + + _ = context.prepare_model_data() + + # Verify nhsn_data_path was passed to process_and_save_loc_data + mock_process_loc.assert_called_once() + assert mock_process_loc.call_args[1]["nhsn_data_path"] == nhsn_path + + # Verify nhsn_data_path was passed to save_eval_data + mock_save_eval.assert_called_once() + assert mock_save_eval.call_args[1]["nhsn_data_path"] == nhsn_path + + @patch("pipelines.epiautogp.epiautogp_forecast_utils.generate_epiweekly_data") + @patch("pipelines.epiautogp.epiautogp_forecast_utils.save_eval_data") + @patch("pipelines.epiautogp.epiautogp_forecast_utils.process_and_save_loc_data") + def test_prepare_model_data_with_nhsn_target( + self, + mock_process_loc, + mock_save_eval, + mock_gen_epiweekly, + base_context, # Use fixture + tmp_path, + ): + """Test prepare_model_data with NHSN target and data.""" + nhsn_path = tmp_path / "nhsn_hospital_admissions.parquet" + eval_path = tmp_path / "eval.parquet" + + # Override multiple fields for NHSN test + context = replace( + base_context, + target="nhsn", + model_name="epiautogp_nhsn_epiweekly", + eval_data_path=eval_path, + nhsn_data_path=nhsn_path, + credentials_dict={"key": "value"}, + ) + + paths = context.prepare_model_data() + + # Verify the method returns valid paths + assert isinstance(paths, ModelPaths) + assert paths.model_output_dir.name == "epiautogp_nhsn_epiweekly" + assert paths.data_dir.name == "data" + + +class TestPostprocessForecast: + """Tests for the postprocess_forecast function.""" + + @patch("pipelines.epiautogp.epiautogp_forecast_utils.plot_and_save_loc_forecast") + @patch("pipelines.epiautogp.epiautogp_forecast_utils.create_hubverse_table") + def test_postprocess_calls_required_functions( + self, + mock_hubverse, + mock_plot, + base_context, # Use fixture + ): + """Test that postprocess_forecast calls plotting and hubverse creation.""" + # Override eval_data_path and exclude_last_n_days for this test + context = replace( + base_context, + eval_data_path=None, + exclude_last_n_days=5, + ) + + context.post_process_forecast() + + # Verify functions were called + mock_plot.assert_called_once() + mock_hubverse.assert_called_once() + + # Verify correct arguments to plot_and_save_loc_forecast + assert mock_plot.call_args[1]["model_run_dir"] == context.model_run_dir + assert mock_plot.call_args[1]["n_forecast_days"] == context.n_forecast_days + assert mock_plot.call_args[1]["model_name"] == "test_model" + + @patch("pipelines.epiautogp.epiautogp_forecast_utils.plot_and_save_loc_forecast") + @patch("pipelines.epiautogp.epiautogp_forecast_utils.create_hubverse_table") + def test_postprocess_calculates_correct_forecast_period( + self, + mock_hubverse, + mock_plot, + base_context, # Use fixture + ): + """Test that n_forecast_days is passed correctly.""" + # Override eval_data_path for this test + context = replace( + base_context, + eval_data_path=None, + ) + + context.post_process_forecast() + + # Verify that plot_and_save_loc_forecast was called with correct n_forecast_days + mock_plot.assert_called_once() + assert mock_plot.call_args[1]["n_forecast_days"] == context.n_forecast_days diff --git a/pipelines/tests/test_prep_epiautogp_data.py b/pipelines/tests/test_prep_epiautogp_data.py deleted file mode 100644 index cd753294..00000000 --- a/pipelines/tests/test_prep_epiautogp_data.py +++ /dev/null @@ -1,561 +0,0 @@ -""" -Tests for EpiAutoGP data conversion functions. -""" - -import datetime as dt -import json -import logging - -import polars as pl -import pytest - -from pipelines.epiautogp.prep_epiautogp_data import ( - _validate_epiautogp_parameters, - convert_to_epiautogp_json, -) - - -class TestValidateEpiAutoGPParameters: - """Test suite for parameter validation.""" - - def test_valid_nssp_daily(self): - """Test valid NSSP daily parameters.""" - # Should not raise - _validate_epiautogp_parameters("nssp", "daily", False) - - def test_valid_nssp_epiweekly(self): - """Test valid NSSP epiweekly parameters.""" - # Should not raise - _validate_epiautogp_parameters("nssp", "epiweekly", False) - - def test_valid_nssp_percentage(self): - """Test valid NSSP percentage parameters.""" - # Should not raise - _validate_epiautogp_parameters("nssp", "epiweekly", True) - - def test_valid_nhsn_epiweekly(self): - """Test valid NHSN epiweekly parameters.""" - # Should not raise - _validate_epiautogp_parameters("nhsn", "epiweekly", False) - - def test_invalid_target(self): - """Test invalid target raises ValueError.""" - with pytest.raises(ValueError, match="target must be 'nssp' or 'nhsn'"): - _validate_epiautogp_parameters("invalid", "daily", False) - - def test_invalid_frequency(self): - """Test invalid frequency raises ValueError.""" - with pytest.raises( - ValueError, match="frequency must be 'daily' or 'epiweekly'" - ): - _validate_epiautogp_parameters("nssp", "hourly", False) - - def test_nhsn_with_percentage(self): - """Test NHSN with percentage raises ValueError.""" - with pytest.raises( - ValueError, match="use_percentage is only applicable when target='nssp'" - ): - _validate_epiautogp_parameters("nhsn", "epiweekly", True) - - def test_nhsn_with_daily(self): - """Test NHSN with daily frequency raises ValueError.""" - with pytest.raises( - ValueError, match="NHSN data is only available in epiweekly frequency" - ): - _validate_epiautogp_parameters("nhsn", "daily", False) - - -class TestEpiAutoGPDataConversion: - """Test suite for EpiAutoGP data conversion functions.""" - - @pytest.fixture - def sample_epiweekly_data(self, tmp_path): - """Create a sample epiweekly_combined_training_data.tsv file with NSSP and NHSN data.""" - # Create weekly data in long format - data = { - "date": [ - "2024-09-01", - "2024-09-01", - "2024-09-01", - "2024-09-08", - "2024-09-08", - "2024-09-08", - "2024-09-15", - "2024-09-15", - "2024-09-15", - ], - "geo_value": ["CA", "CA", "CA", "CA", "CA", "CA", "CA", "CA", "CA"], - "disease": [ - "COVID-19", - "COVID-19", - "COVID-19", - "COVID-19", - "COVID-19", - "COVID-19", - "COVID-19", - "COVID-19", - "COVID-19", - ], - "data_type": [ - "train", - "train", - "train", - "train", - "train", - "train", - "train", - "train", - "train", - ], - ".variable": [ - "observed_ed_visits", - "other_ed_visits", - "observed_hospital_admissions", - "observed_ed_visits", - "other_ed_visits", - "observed_hospital_admissions", - "observed_ed_visits", - "other_ed_visits", - "observed_hospital_admissions", - ], - ".value": [50, 450, 45, 60, 540, 52, 70, 630, 38], - "lab_site_index": ["NA", "NA", "NA", "NA", "NA", "NA", "NA", "NA", "NA"], - } - df = pl.DataFrame(data) - file_path = tmp_path / "epiweekly_combined_training_data.tsv" - df.write_csv(file_path, separator="\t") - return file_path - - @pytest.fixture - def sample_daily_data(self, tmp_path): - """Create a sample combined_training_data.tsv file with daily NSSP data.""" - # Create daily data in long format - data = { - "date": [ - "2024-09-01", - "2024-09-01", - "2024-09-02", - "2024-09-02", - "2024-09-03", - "2024-09-03", - ], - "geo_value": ["CA", "CA", "CA", "CA", "CA", "CA"], - "disease": [ - "COVID-19", - "COVID-19", - "COVID-19", - "COVID-19", - "COVID-19", - "COVID-19", - ], - "data_type": ["train", "train", "train", "train", "train", "train"], - ".variable": [ - "observed_ed_visits", - "other_ed_visits", - "observed_ed_visits", - "other_ed_visits", - "observed_ed_visits", - "other_ed_visits", - ], - ".value": [50, 450, 60, 540, 70, 630], - "lab_site_index": ["", "", "", "", "", ""], - } - df = pl.DataFrame(data) - file_path = tmp_path / "combined_training_data.tsv" - df.write_csv(file_path, separator="\t") - return file_path - - @pytest.fixture - def sample_data_for_model_fit_nssp(self, tmp_path): - """Create a sample data_for_model_fit.json file with NHSN data.""" - data = { - "loc_pop": 39512223, - "right_truncation_offset": 0, - "nwss_training_data": None, - "nssp_training_data": { - "date": ["2024-09-01", "2024-09-02", "2024-09-03"], - "geo_value": ["CA", "CA", "CA"], - "data_type": ["train", "train", "train"], - "observed_ed_visits": [50, 60, 70], - "other_ed_visits": [450, 540, 630], - }, - "nhsn_training_data": { - "jurisdiction": ["CA", "CA", "CA"], - "weekendingdate": ["2024-09-07", "2024-09-14", "2024-09-21"], - "hospital_admissions": [45, 52, 38], - "data_type": ["train", "train", "train"], - }, - "nhsn_step_size": 7, - "nssp_step_size": 1, - "nwss_step_size": 1, - } - file_path = tmp_path / "data_for_model_fit.json" - with open(file_path, "w") as f: - json.dump(data, f) - return file_path - - @pytest.fixture - def sample_data_for_model_fit_nhsn(self, tmp_path): - """Create a sample data_for_model_fit.json file with NHSN data.""" - data = { - "loc_pop": 39512223, - "right_truncation_offset": 0, - "nwss_training_data": None, - "nssp_training_data": { - "date": ["2024-09-01", "2024-09-08", "2024-09-15"], - "geo_value": ["CA", "CA", "CA"], - "data_type": ["train", "train", "train"], - "observed_ed_visits": [50, 60, 70], - "other_ed_visits": [450, 540, 630], - }, - "nhsn_training_data": { - "jurisdiction": ["CA", "CA", "CA"], - "weekendingdate": ["2024-09-07", "2024-09-14", "2024-09-21"], - "hospital_admissions": [45, 52, 38], - "data_type": ["train", "train", "train"], - }, - "nhsn_step_size": 7, - "nssp_step_size": 1, - "nwss_step_size": 1, - } - file_path = tmp_path / "data_for_model_fit.json" - with open(file_path, "w") as f: - json.dump(data, f) - return file_path - - def test_convert_nssp_epiweekly_percentage_to_epiautogp_json( - self, sample_daily_data, sample_epiweekly_data, tmp_path - ): - """Test conversion of NSSP epiweekly data with percentage to EpiAutoGP JSON format.""" - output_path = tmp_path / "epiautogp_input.json" - - convert_to_epiautogp_json( - daily_training_data_path=sample_daily_data, - epiweekly_training_data_path=sample_epiweekly_data, - output_json_path=output_path, - disease="COVID-19", - location="CA", - forecast_date=dt.date(2024, 9, 15), - target="nssp", - frequency="epiweekly", - use_percentage=True, - ) - - # Verify file was created - assert output_path.exists() - - # Read and verify JSON content - with open(output_path) as f: - result = json.load(f) - - # Check structure - assert "dates" in result - assert "reports" in result - assert "pathogen" in result - assert "location" in result - assert "target" in result - assert "forecast_date" in result - assert "nowcast_dates" in result - assert "nowcast_reports" in result - - # Check values - assert result["pathogen"] == "COVID-19" - assert result["location"] == "CA" - assert result["target"] == "nssp" - assert result["forecast_date"] == "2024-09-15" - assert len(result["dates"]) == 3 - assert result["dates"] == ["2024-09-01", "2024-09-08", "2024-09-15"] - - # Check ED visit percentages are calculated correctly - # Week 1: 50 / (50 + 450) * 100 = 10.0% - # Week 2: 60 / (60 + 540) * 100 = 10.0% - # Week 3: 70 / (70 + 630) * 100 = 10.0% - assert len(result["reports"]) == 3 - assert result["reports"][0] == pytest.approx(10.0) - assert result["reports"][1] == pytest.approx(10.0) - assert result["reports"][2] == pytest.approx(10.0) - - def test_convert_nssp_epiweekly_counts_to_epiautogp_json( - self, sample_daily_data, sample_epiweekly_data, tmp_path - ): - """Test conversion of NSSP epiweekly data with counts to EpiAutoGP JSON format.""" - output_path = tmp_path / "epiautogp_input.json" - - convert_to_epiautogp_json( - daily_training_data_path=sample_daily_data, - epiweekly_training_data_path=sample_epiweekly_data, - output_json_path=output_path, - disease="COVID-19", - location="CA", - forecast_date=dt.date(2024, 9, 15), - target="nssp", - frequency="epiweekly", - use_percentage=False, - ) - - # Verify file was created - assert output_path.exists() - - # Read and verify JSON content - with open(output_path) as f: - result = json.load(f) - - # Check structure - assert result["pathogen"] == "COVID-19" - assert result["location"] == "CA" - assert result["target"] == "nssp" - assert result["forecast_date"] == "2024-09-15" - assert len(result["dates"]) == 3 - assert result["dates"] == ["2024-09-01", "2024-09-08", "2024-09-15"] - - # Check ED visit counts (raw, not percentages) - assert len(result["reports"]) == 3 - assert result["reports"] == [50.0, 60.0, 70.0] - - def test_convert_nssp_daily_counts_to_epiautogp_json( - self, sample_daily_data, sample_epiweekly_data, tmp_path - ): - """Test conversion of NSSP daily data with counts to EpiAutoGP JSON format.""" - output_path = tmp_path / "epiautogp_input.json" - - convert_to_epiautogp_json( - daily_training_data_path=sample_daily_data, - epiweekly_training_data_path=sample_epiweekly_data, - output_json_path=output_path, - disease="COVID-19", - location="CA", - forecast_date=dt.date(2024, 9, 3), - target="nssp", - frequency="daily", - use_percentage=False, - ) - - # Verify file was created - assert output_path.exists() - - # Read and verify JSON content - with open(output_path) as f: - result = json.load(f) - - # Check structure - assert result["pathogen"] == "COVID-19" - assert result["location"] == "CA" - assert result["target"] == "nssp" - assert result["forecast_date"] == "2024-09-03" - assert len(result["dates"]) == 3 - assert result["dates"] == ["2024-09-01", "2024-09-02", "2024-09-03"] - - # Check ED visit counts (raw, not percentages) - assert len(result["reports"]) == 3 - assert result["reports"] == [50.0, 60.0, 70.0] - - def test_convert_nhsn_to_epiautogp_json( - self, sample_daily_data, sample_epiweekly_data, tmp_path - ): - """Test conversion of NHSN data to EpiAutoGP JSON format.""" - output_path = tmp_path / "epiautogp_input.json" - - convert_to_epiautogp_json( - daily_training_data_path=sample_daily_data, - epiweekly_training_data_path=sample_epiweekly_data, - output_json_path=output_path, - disease="COVID-19", - location="CA", - forecast_date=dt.date(2024, 9, 15), - target="nhsn", - frequency="epiweekly", - use_percentage=False, - ) - - # Verify file was created - assert output_path.exists() - - # Read and verify JSON content - with open(output_path) as f: - result = json.load(f) - - # Check structure - assert result["pathogen"] == "COVID-19" - assert result["location"] == "CA" - assert result["target"] == "nhsn" - assert result["forecast_date"] == "2024-09-15" - assert len(result["dates"]) == 3 - assert result["dates"] == ["2024-09-01", "2024-09-08", "2024-09-15"] - - # Check hospital admission counts - assert len(result["reports"]) == 3 - assert result["reports"] == [45.0, 52.0, 38.0] - - def test_convert_with_nowcast_data( - self, sample_daily_data, sample_epiweekly_data, tmp_path - ): - """Test conversion with nowcast dates and reports.""" - output_path = tmp_path / "epiautogp_input.json" - - nowcast_dates = [dt.date(2024, 9, 8), dt.date(2024, 9, 15)] - nowcast_reports = [[58.0, 60.0, 62.0], [68.0, 70.0, 72.0]] - - convert_to_epiautogp_json( - daily_training_data_path=sample_daily_data, - epiweekly_training_data_path=sample_epiweekly_data, - output_json_path=output_path, - disease="COVID-19", - location="CA", - forecast_date=dt.date(2024, 9, 15), - target="nssp", - frequency="epiweekly", - use_percentage=True, - nowcast_dates=nowcast_dates, - nowcast_reports=nowcast_reports, - ) - - with open(output_path) as f: - result = json.load(f) - - assert result["nowcast_dates"] == ["2024-09-08", "2024-09-15"] - assert result["nowcast_reports"] == [[58.0, 60.0, 62.0], [68.0, 70.0, 72.0]] - - def test_invalid_target_raises_error( - self, sample_daily_data, sample_epiweekly_data, tmp_path - ): - """Test that invalid target raises ValueError.""" - output_path = tmp_path / "epiautogp_input.json" - - with pytest.raises(ValueError, match="target must be 'nssp' or 'nhsn'"): - convert_to_epiautogp_json( - daily_training_data_path=sample_daily_data, - epiweekly_training_data_path=sample_epiweekly_data, - output_json_path=output_path, - disease="COVID-19", - location="CA", - forecast_date=dt.date(2024, 9, 15), - target="invalid_target", - frequency="epiweekly", - ) - - def test_invalid_frequency_raises_error( - self, sample_daily_data, sample_epiweekly_data, tmp_path - ): - """Test that invalid frequency raises ValueError.""" - output_path = tmp_path / "epiautogp_input.json" - - with pytest.raises( - ValueError, match="frequency must be 'daily' or 'epiweekly'" - ): - convert_to_epiautogp_json( - daily_training_data_path=sample_daily_data, - epiweekly_training_data_path=sample_epiweekly_data, - output_json_path=output_path, - disease="COVID-19", - location="CA", - forecast_date=dt.date(2024, 9, 15), - target="nssp", - frequency="monthly", - ) - - def test_nhsn_with_use_percentage_raises_error( - self, sample_daily_data, sample_epiweekly_data, tmp_path - ): - """Test that NHSN target with use_percentage=True raises ValueError.""" - output_path = tmp_path / "epiautogp_input.json" - - with pytest.raises( - ValueError, match="use_percentage is only applicable when target='nssp'" - ): - convert_to_epiautogp_json( - daily_training_data_path=sample_daily_data, - epiweekly_training_data_path=sample_epiweekly_data, - output_json_path=output_path, - disease="COVID-19", - location="CA", - forecast_date=dt.date(2024, 9, 15), - target="nhsn", - frequency="epiweekly", - use_percentage=True, - ) - - def test_nhsn_without_data_raises_error(self, sample_daily_data, tmp_path): - """Test that NHSN target without NHSN data in TSV raises ValueError.""" - output_path = tmp_path / "epiautogp_input.json" - - # Create epiweekly TSV without NHSN data - data = { - "date": ["2024-09-01", "2024-09-01"], - "geo_value": ["CA", "CA"], - "disease": ["COVID-19", "COVID-19"], - "data_type": ["train", "train"], - ".variable": ["observed_ed_visits", "other_ed_visits"], - ".value": [50, 450], - "lab_site_index": ["NA", "NA"], - } - df = pl.DataFrame(data) - epiweekly_file = tmp_path / "epiweekly_no_nhsn.tsv" - df.write_csv(epiweekly_file, separator="\t") - - with pytest.raises( - ValueError, match="Column 'observed_hospital_admissions' not found" - ): - convert_to_epiautogp_json( - daily_training_data_path=sample_daily_data, - epiweekly_training_data_path=epiweekly_file, - output_json_path=output_path, - disease="COVID-19", - location="CA", - forecast_date=dt.date(2024, 9, 15), - target="nhsn", - frequency="epiweekly", - ) - - def test_missing_nssp_data_raises_error(self, sample_daily_data, tmp_path): - """Test that missing NSSP data raises ValueError.""" - output_path = tmp_path / "epiautogp_input.json" - - # Create empty epiweekly TSV (no NSSP data for the location/disease) - empty_data = { - "date": ["2024-09-01"], - "geo_value": ["NY"], # Different location - "disease": ["COVID-19"], - "data_type": ["train"], - ".variable": ["observed_ed_visits"], - ".value": [50], - "lab_site_index": ["NA"], - } - df = pl.DataFrame(empty_data) - epiweekly_file = tmp_path / "epiweekly_empty.tsv" - df.write_csv(epiweekly_file, separator="\t") - - with pytest.raises(ValueError, match="No data found for COVID-19 CA"): - convert_to_epiautogp_json( - daily_training_data_path=sample_daily_data, - epiweekly_training_data_path=epiweekly_file, - output_json_path=output_path, - disease="COVID-19", - location="CA", - forecast_date=dt.date(2024, 9, 15), - target="nssp", - frequency="epiweekly", - ) - - def test_custom_logger( - self, sample_daily_data, sample_epiweekly_data, tmp_path, caplog - ): - """Test that custom logger is used when provided.""" - output_path = tmp_path / "epiautogp_input.json" - custom_logger = logging.getLogger("test_custom_logger") - - with caplog.at_level(logging.INFO, logger="test_custom_logger"): - convert_to_epiautogp_json( - daily_training_data_path=sample_daily_data, - epiweekly_training_data_path=sample_epiweekly_data, - output_json_path=output_path, - disease="COVID-19", - location="CA", - forecast_date=dt.date(2024, 9, 15), - target="nhsn", - frequency="epiweekly", - logger=custom_logger, - ) - - # Verify custom logger was used - assert "Using hospital admission counts" in caplog.text - assert "Saved EpiAutoGP input JSON" in caplog.text