diff --git a/modules/assim.sequential/R/Analysis_sda_block.R b/modules/assim.sequential/R/Analysis_sda_block.R
index f70788d2357..46b791ad071 100644
--- a/modules/assim.sequential/R/Analysis_sda_block.R
+++ b/modules/assim.sequential/R/Analysis_sda_block.R
@@ -21,11 +21,12 @@ analysis_sda_block <- function (settings, block.list.all, X, obs.mean, obs.cov,
   # grab cores from settings.
   cores <- as.numeric(settings$state.data.assimilation$batch.settings$general.job$cores)
   # if we didn't assign number of CPUs in the settings.
-  if (is.null(cores)) {
-    cores <- parallel::detectCores() - 1
-    # if we only have one CPU.
-    if (cores < 1) cores <- 1
+  if (length(cores) == 0 || is.na(cores)) {
+    cores <- parallel::detectCores()
   }
+  cores <- cores - 1
+  # if we only have one CPU.
+  if (cores < 1) cores <- 1
   #convert from vector values to block lists.
   if ("try-error" %in% class(try(block.results <- build.block.xy(settings = settings,
                                                                  block.list.all = block.list.all,
@@ -570,6 +571,7 @@ update_q <- function (block.list.all, t, nt, aqq.Init = NULL, bqq.Init = NULL, M
   #if it's an update.
   if (is.null(MCMC_dat)) {
     #loop over blocks
+    #if t=1 or if it's a fresh run.
    if (t == 1) {
      for (i in seq_along(block.list)) {
        nvar <- length(block.list[[i]]$data$muf)
diff --git a/modules/assim.sequential/R/sda.enkf_MultiSite.R b/modules/assim.sequential/R/sda.enkf_MultiSite.R
index d8f2d847c50..c133efd1c07 100644
--- a/modules/assim.sequential/R/sda.enkf_MultiSite.R
+++ b/modules/assim.sequential/R/sda.enkf_MultiSite.R
@@ -17,34 +17,30 @@
 #' @param Q Process covariance matrix given if there is no data to estimate it.
 #' @param restart Used for iterative updating previous forecasts. Default NULL. List object includes file path to previous runs and start date for SDA.
 #' @param pre_enkf_params Used for passing pre-existing time-series of process error into the current SDA runs to ignore the impact by the differences between process errors.
-#' @param ensemble.samples Pass ensemble.samples from outside to avoid GitHub check issues.
+#' @param ensemble.samples List of ensemble parameter samples across PFTs. Default is NULL.
 #' @param control List of flags controlling the behavior of the SDA.
-#' `trace` for reporting back the SDA outcomes;
 #' `TimeseriesPlot` for post analysis examination;
-#' `debug` decide if we want to pause the code and examining the variables inside the function;
-#' `pause` decide if we want to pause the SDA workflow at current time point t;
-#' `Profiling` decide if we want to export the temporal SDA outputs in CSV file;
 #' `OutlierDetection` decide if we want to execute the outlier detection each time after the model forecasting;
-#' `parallel_qsub` decide if we want to execute the `qsub` job submission under parallel mode;
 #' `send_email` contains lists for sending email to report the SDA progress;
 #' `keepNC` decide if we want to keep the NetCDF files inside the out directory;
 #' `forceRun` decide if we want to proceed the Bayesian MCMC sampling without observations;
 #' `run_parallel` decide if we want to run the SDA under parallel mode for the `future_map` function;
 #' `MCMC.args` include lists for controling the MCMC sampling process (iteration, nchains, burnin, and nthin.).
-#' @param cov_dir Directory containing yearly covariate stacks named like "covariates_YYYY.tiff".
-#' @param debias_start_year Integer year (e.g., 2015). If `NULL`, debiasing is OFF. 
-#' @param debias_drop_incomplete_covariates Logical; drop sites with any NA covariates
-#' @param debias_enforce_consistent_obs Logical; drop sites that lost any previously
-#' @param debias_require_obs_at_t_for_predict Logical; only make residual predictions
+#' `merge_nc` determines whether to merge all netCDF files across sites and ensembles.
+#' If it's set to `TRUE`, all netCDF files are combined into the `merged_nc` folder within the `outdir`.
+#' `execution` decides how we want to execute the model,
+#' including `local`, where we execute the model locally;
+#' `qsub`, where we use the traditional `start_model_runs` function for submission;
+#' and `qsub_parallel`, where we first combine jobs and then submit them to the SCC.
+#' @param debias List containing the covariate directory (`cov.dir`) and the start year (`start.year`).
+#' The covariate directory should include GeoTIFF files named by year.
+#' The start year is a numeric input that decides when to start the debiasing feature.
 #' @param ... Additional arguments, currently ignored
 #'
-#'
 #' @return NONE
 #' @import nimble furrr
 #' @export
 #'
-#'
-#'
 sda.enkf.multisite <- function(settings,
                                obs.mean,
                                obs.cov,
@@ -52,24 +48,16 @@ sda.enkf.multisite <- function(settings,
                                restart = NULL,
                                pre_enkf_params = NULL,
                                ensemble.samples = NULL,
-                               control=list(trace = TRUE,
-                                            TimeseriesPlot = FALSE,
-                                            debug = FALSE,
-                                            pause = FALSE,
-                                            Profiling = FALSE,
+                               control=list(TimeseriesPlot = FALSE,
                                             OutlierDetection=FALSE,
-                                            parallel_qsub = TRUE,
                                             send_email = NULL,
                                             keepNC = TRUE,
                                             forceRun = TRUE,
                                             run_parallel = TRUE,
-                                            MCMC.args = NULL),
-                               cov_dir = NULL,
-                               debias_start_year = NULL,
-                               debias_drop_incomplete_covariates = TRUE,
-                               debias_enforce_consistent_obs = TRUE,
-                               debias_require_obs_at_t_for_predict = FALSE,
-                               ...) {
+                                            MCMC.args = NULL,
+                                            merge_nc = TRUE,
+                                            execution = "local"),
+                               debias = list(cov.dir = NULL, start.year = NULL), ...) {
   #add if/else for when restart points to folder instead if T/F set restart as T
   if(is.list(restart)){
     old.dir <- restart$filepath
@@ -78,6 +66,7 @@ sda.enkf.multisite <- function(settings,
   }else{
     restart_flag = FALSE
   }
+  # register parallel nodes.
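+  # note (illustrative, not part of this change): future::supportsMulticore() is
+  # TRUE only on Unix-alikes, so elsewhere the plan below falls back to
+  # multisession workers, roughly equivalent to:
+  #   future::plan(future::multisession, workers = parallel::detectCores() - 1)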
if(control$run_parallel){ if (future::supportsMulticore()) { future::plan(future::multicore) @@ -85,8 +74,6 @@ sda.enkf.multisite <- function(settings, future::plan(future::multisession) } } - if (control$debug) browser() - tictoc::tic("Preparation") ###-------------------------------------------------------------------### ### read settings ### ###-------------------------------------------------------------------### @@ -99,46 +86,22 @@ sda.enkf.multisite <- function(settings, host <- settings$host forecast.time.step <- settings$state.data.assimilation$forecast.time.step #idea for later generalizing nens <- as.numeric(settings$ensemble$size) - processvar <- settings$state.data.assimilation$process.variance - if(processvar=="TRUE"){ - processvar <- TRUE - }else{ - processvar <- FALSE - } - Localization.FUN <- settings$state.data.assimilation$Localization.FUN # localization function - scalef <- settings$state.data.assimilation$scalef %>% as.numeric() # scale factor for localization + processvar <- settings$state.data.assimilation$process.variance %>% as.logical var.names <- sapply(settings$state.data.assimilation$state.variable, '[[', "variable.name") names(var.names) <- NULL multi.site.flag <- PEcAn.settings::is.MultiSettings(settings) - readsFF <- NULL # this keeps the forward forecast - - is.local <- PEcAn.remote::is.localhost(settings$host) - #------------------Reading up the MCMC settings - nitr.GEF <- ifelse(is.null(settings$state.data.assimilation$nitrGEF), - 5e4, - settings$state.data.assimilation$nitrGEF %>% - as.numeric) - nthin <- ifelse(is.null(settings$state.data.assimilation$nthin), - 10, - settings$state.data.assimilation$nthin %>% - as.numeric) - nburnin<- ifelse(is.null(settings$state.data.assimilation$nburnin), - 1e4, - settings$state.data.assimilation$nburnin %>% - as.numeric) - censored.data<-ifelse(is.null(settings$state.data.assimilation$censored.data), - TRUE, - settings$state.data.assimilation$censored.data %>% - as.logical) + is.local <- PEcAn.remote::is.localhost(host) + # if we want to censor data in the GEF. 
+ censored.data<-settings$state.data.assimilation$censored.data + if (is.null(censored.data)) censored.data <- TRUE #--------Initialization FORECAST <- ANALYSIS <- ens_weights <- list() enkf.params <- list() restart.list <- NULL #create SDA folder to store output if(!dir.exists(settings$outdir)) dir.create(settings$outdir, showWarnings = FALSE) - ##### Creating matrices that describe the bounds of the state variables - ##### interval is remade everytime depending on the data at time t + ##### interval is remade every time depending on the data at time t ##### state.interval stays constant and converts new.analysis to be within the correct bounds interval <- NULL state.interval <- cbind(as.numeric(lapply(settings$state.data.assimilation$state.variables,'[[','min_value')), @@ -158,51 +121,10 @@ sda.enkf.multisite <- function(settings, distances <- sp::spDists(site.locs, longlat = TRUE) #turn that into a blocked matrix format blocked.dis <- block_matrix(distances %>% as.numeric(), rep(length(var.names), length(site.ids))) - }else{ conf.settings <- list(settings) site.ids <- as.character(settings$run$site$id) } - - # Build site coords once (used by covariate extraction) - site_coords <- purrr::map_df(settings$run, ~ dplyr::tibble( - site = as.character(.x$site$id), - lon = suppressWarnings(as.numeric(.x$site$lon)), - lat = suppressWarnings(as.numeric(.x$site$lat)) - )) - - # Memoized cache: one data.frame per year - cov_cache <- new.env(parent = emptyenv()) - - .load_cov_year <- function(year) { - key <- as.character(year) - if (exists(key, cov_cache, inherits = FALSE)) return(get(key, cov_cache)) - - if (is.null(cov_dir)) { - stop("Debiasing requires covariates, but `cov_dir` is NULL. Set `cov_dir` or disable debiasing.") - } - - # Reuse the existing extractor; filter to one year - df_all <- generate_covariates_df( - site_coords = site_coords, - cov_dir = cov_dir, - crs = "EPSG:4326", - file_prefix = "covariates_" - ) - df_year <- df_all[df_all$year == as.integer(year), , drop = FALSE] - - assign(key, df_year, cov_cache) - df_year - } - - .cov_df_for_years <- function(years) { - yrs <- unique(as.integer(years)) - dplyr::bind_rows(lapply(yrs, .load_cov_year)) - } - - - - ###-------------------------------------------------------------------### ### check dates before data assimilation ### ###-------------------------------------------------------------------###---- @@ -210,14 +132,10 @@ sda.enkf.multisite <- function(settings, if (restart_flag) { start.cut <- lubridate::ymd_hms(start.cut) #start.cut taken from restart list as date to begin runs Start.year <-lubridate::year(start.cut) - }else{ start.cut <- lubridate::ymd_hms(settings$state.data.assimilation$start.date, truncated = 3) Start.year <- (lubridate::year(settings$state.data.assimilation$start.date)) } - #Enabling debias feature - debias_enabled <- !is.null(debias_start_year) - End.year <- lubridate::year(settings$state.data.assimilation$end.date) # dates that assimilations will be done for - obs will be subsetted based on this assim.sda <- Start.year:End.year obs.mean <- obs.mean[sapply(lubridate::year(names(obs.mean)), function(obs.year) obs.year %in% (assim.sda))] #checks obs.mean dates against assimyear dates @@ -241,13 +159,8 @@ sda.enkf.multisite <- function(settings, read_restart_times <- c(lubridate::ymd_hms(start.cut, truncated = 3), obs.times) nt <- length(obs.times) #sets length of for loop for Forecast/Analysis if (nt==0) PEcAn.logger::logger.severe('There has to be at least one Obs.') - -# Model Specific Setup 
---------------------------------------------------- - + # Model Specific Setup ---------------------------------------------------- #--get model specific functions - #my.write_restart <- paste0("PEcAn.", model, "::write_restart.", model) - #my.read_restart <- paste0("PEcAn.", model, "::read_restart.", model) - #my.split_inputs <- paste0("PEcAn.", model, "::split_inputs.", model) do.call("library", list(paste0("PEcAn.", model))) my.write_restart <- paste0("write_restart.", model) my.read_restart <- paste0("read_restart.", model) @@ -258,19 +171,17 @@ sda.enkf.multisite <- function(settings, register.xml <- system.file(paste0("register.", model, ".xml"), package = paste0("PEcAn.", model)) register <- XML::xmlToList(XML::xmlParse(register.xml)) no_split <- !as.logical(register$exact.dates) - if (!exists(my.split_inputs) & !no_split) { PEcAn.logger::logger.warn(my.split_inputs, "does not exist") PEcAn.logger::logger.severe("please make sure that the PEcAn interface is loaded for", model) PEcAn.logger::logger.warn(my.split_inputs, "If your model does not need the split function you can specify that in register.Model.xml in model's inst folder by adding FALSE tag.") - } #split met if model calls for it #create a folder to store extracted met files if(!file.exists(paste0(settings$outdir, "/Extracted_met/"))){ dir.create(paste0(settings$outdir, "/Extracted_met/")) } - + PEcAn.logger::logger.info("Splitting mets!") conf.settings <-conf.settings %>% `class<-`(c("list")) %>% #until here, it separates all the settings for all sites that listed in the xml file furrr::future_map(function(settings) { @@ -294,13 +205,12 @@ sda.enkf.multisite <- function(settings, # changing the start and end date which will be used for model2netcdf.model settings$run$start.date <- lubridate::ymd_hms(settings$state.data.assimilation$start.date, truncated = 3) settings$run$end.date <- lubridate::ymd_hms(settings$state.data.assimilation$end.date, truncated = 3) - } } else{ inputs.split <- inputs } settings - }) + }, .progress = F) conf.settings<- PEcAn.settings::as.MultiSettings(conf.settings) ###-------------------------------------------------------------------### ### If this is a restart - Picking up were we left last time ### @@ -323,7 +233,7 @@ sda.enkf.multisite <- function(settings, #sim.time <-2:nt # if It's restart I added +1 from the start to nt (which is the last year of old sim) to make the first sim in restart time t=2 #new.params and params.list are already loaded in the environment only need to grab X X <-FORECAST[[length(FORECAST)]] - }else{ + } else { PEcAn.logger::logger.info("The SDA output from the older simulation doesn't exist, assuming first SDA run with unconstrainded forecast output") #loading param info from previous forecast if(!exists("ensemble.samples") || is.null(ensemble.samples)){ @@ -341,15 +251,26 @@ sda.enkf.multisite <- function(settings, #out.configs object required to build X and restart.list object required for build X #TODO: there should be an easier way to do this than to rerun write.ensemble.configs restart.list <- vector("list", length(conf.settings)) + # make sure we have the input_design variable before running the write configuration function. 
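+    # (illustrative assumption, not guaranteed by generate_joint_ensemble_design)
+    # input_design is expected to look like a data.frame with one row per ensemble
+    # member, mapping each sampled input to a sample id, e.g.:
+    #   met  params
+    # 1   3      7
+    # 2   1      2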
+  if (!exists("input_design")) {
+    PEcAn.logger::logger.error("input_design was not found; it is required for the write.ensemble.configs call.")
+    return(0)
+  }
  out.configs <- conf.settings %>%
    `class<-`(c("list")) %>%
    furrr::future_map2(restart.list, function(settings, restart.arg) {
      # Loading the model package - this is required bc of the furrr
      library(paste0("PEcAn.",settings$model$type), character.only = TRUE)
      # wrtting configs for each settings - this does not make a difference with the old code
+      # if we don't specify the input_design.
+      if (!exists("input_design")) {
+        input_design <- NULL
+      }
      PEcAn.uncertainty::write.ensemble.configs(
+        input_design = input_design,
        ensemble.size = nens,
        defaults = settings$pfts,
        ensemble.samples = ensemble.samples,
        settings = settings,
        model = settings$model$type,
@@ -369,7 +290,7 @@ sda.enkf.multisite <- function(settings,
                           var.names = var.names,
                           my.read_restart = my.read_restart,
                           restart_flag = restart_flag)
-    
+
    #let's read the parameters of each site/ens
    params.list <- reads %>% purrr::map(~.x %>% purrr::map("params"))
    # Now let's read the state variables of site/ens
@@ -396,549 +317,402 @@ sda.enkf.multisite <- function(settings,
  # weight matrix
  wt.mat <- matrix(NA, nrow = nens, ncol = nt)
  # Reading param samples-------------------------------
-  #create params object using samples generated from TRAITS functions
-  if(restart_flag){
-    new.params <- new.params
-  }else{
-    if(!file.exists(file.path(settings$outdir, "samples.Rdata"))) PEcAn.logger::logger.severe("samples.Rdata cannot be found. Make sure you generate samples by running the get.parameter.samples function before running SDA.")
-    #Generate parameter needs to be run before this to generate the samples. This is hopefully done in the main workflow. 
-    if(is.null(ensemble.samples)){
-      load(file.path(settings$outdir, "samples.Rdata"))
-    }
-    #reformatting params
-    new.params <- sda_matchparam(settings, ensemble.samples, site.ids, nens)
-    
-  }
-  
-  
-  #TODO: incorporate Phyllis's restart work
-  #sample all inputs specified in the settings$ensemble
-  #now looking into the xml
-  samp <- conf.settings$ensemble$samplingspace
-  #finding who has a parent
-  parents <- lapply(samp,'[[', 'parent')
-  #order parents based on the need of who has to be first
-  order <- names(samp)[lapply(parents, function(tr) which(names(samp) %in% tr)) %>% unlist()]
-  #new ordered sampling space
-  samp.ordered <- samp[c(order, names(samp)[!(names(samp) %in% order)])]
-  #performing the sampling
-  inputs <- vector("list", length(conf.settings))
-  #for the tags specified in the xml, do the sampling for a random site and then replicate the same sample ids for the remaining sites for each ensemble member
-  for (i in seq_along(samp.ordered)) {
-    random_site <- sample(1:length(conf.settings),1)
-    if (is.null(inputs[[random_site]])) {
-      inputs[[random_site]] <- list()
-    }
-    input_tag<-names(samp.ordered)[i]
-    #call the function responsible for generating the ensemble for the random site
-    inputs[[random_site]][[input_tag]] <- PEcAn.uncertainty::input.ens.gen(settings=conf.settings[[random_site]],
-                                                                           input=input_tag,
-                                                                           method=samp.ordered[[i]]$method,
-                                                                           parent_ids=NULL)
-    #replicate the same ids for the remaining sites
-    for (s in seq_along(conf.settings)) {
-      if (s!= random_site) {
-        if (is.null(inputs[[s]])) {
-          inputs[[s]] <- list()
-        }
-        input_path <- conf.settings[[s]]$run$inputs[[tolower(input_tag)]]$path
-        inputs[[s]][[input_tag]]$ids<-inputs[[random_site]][[input_tag]]$ids
-        inputs[[s]][[input_tag]]$samples<- input_path[inputs[[random_site]][[input_tag]]$ids]
-      }
+  #create params object using samples generated from TRAITS functions
+  if(restart_flag){
+    new.params <- new.params
+  } else {
+    if(!file.exists(file.path(settings$outdir, "samples.Rdata"))) PEcAn.logger::logger.severe("samples.Rdata cannot be found. Make sure you generate samples by running the get.parameter.samples function before running SDA.")
+    #get.parameter.samples needs to be run before this point to generate the samples; this is normally done in the main workflow.
+    if(is.null(ensemble.samples)){
+      load(file.path(settings$outdir, "samples.Rdata"))
    }
+    #reformatting params
+    new.params <- sda_matchparam(settings, ensemble.samples, site.ids, nens)
+    # if it's not a restart run, generate the joint input design.
+    input_design <- PEcAn.uncertainty::generate_joint_ensemble_design(settings = settings[[1]],
+                                                                      ensemble_samples = ensemble.samples,
+                                                                      ensemble_size = nens)[[1]]
  }
-  #Using our R script to import python functions from debias.py
-  py <- .get_debias_mod()
-  train_X  <- NULL   # a data.frame or matrix of size (N_past × P_features)
-  train_y  <- numeric()  # a numeric vector of residuals
-  raw_prev <- NULL  # raw forecast mean from previous step
-  train_buf <- new.env(parent = emptyenv())
-  # --- New: containers for storing debias weights over time ---
-  DEBIAS_WEIGHTS <- list()  # nested list: DEBIAS_WEIGHTS[[time]][[var]] = named vector (KNN weight, TREE weight)
-  # Flat, tidy log of learner weights over time (easier to write/export/plot).
-  # Columns:
-  #   time    : character label for the time step (e.g., "2012-12-31")
-  #   var     : state variable name (e.g., "LAI", "AbvGrndWood")
-  #   learner : model id / learner name ("KNN", "TREE", etc.) 
- # weight : numeric weight for that learner at that time/variable - DEBIAS_WEIGHTS_DF <- data.frame( - time = character(), - var = character(), - learner = character(), - weight = numeric(), - stringsAsFactors = FALSE - ) - # Small helper to append one row per learner into DEBIAS_WEIGHTS_DF. - # Args: - # t_label : time label for the current step (character) - # var : variable name (character) - # w_named : *named* numeric vector of weights (e.g., c(KNN=0.6, TREE=0.4)). - # If unnamed, we auto-name as "learner_1", "learner_2", ... - # Returns: - # A data.frame with columns (time, var, learner, weight) ready to rbind(). - - # Should we run debiasing this cycle? - # Debiasing requires: debias_mode == TRUE, t > 1, and (if provided) obs.year >= debias_start_year - .should_debias <- function(t, enabled, obs_year, start_year) { - isTRUE(enabled) && t > 1 && !is.null(start_year) && (as.integer(obs_year) >= as.integer(start_year)) - } - - # Flat RMSE tracker across all times and variables. - # Columns: - # time : time label - # var : variable name - # rmse_pre : RMSE before debias (raw ensemble mean vs obs) - # rmse_post : RMSE after debias (corrected mean vs obs) - DIAG <- list() # per-time: DIAG[[time]] = list(comp=..., rmse=...) - RMSE_DF <- data.frame( - time = character(), - var = character(), - rmse_pre = numeric(), rmse_post = numeric(), - mae_pre = numeric(), mae_post = numeric(), - bias_pre = numeric(), bias_post = numeric(), - r2_pre = numeric(), r2_post = numeric(), - stringsAsFactors = FALSE - ) - FEATURE_IMP_DF <- data.frame( - time = character(), - var = character(), - feature = character(), - importance = numeric(), - stringsAsFactors = FALSE - ) - ###------------------------------------------------------------------------------------------------### ### loop over time ### ###------------------------------------------------------------------------------------------------### + # initialize the lists of covariates for the debias feature. + pre.states <- vector("list", length = length(var.names)) %>% purrr::set_names(var.names) for(t in 1:nt){ - obs.t <- as.character(lubridate::date(obs.times[t])) - obs.year <- lubridate::year(obs.times[t]) - ###-------------------------------------------------------------------------### - ### Taking care of Forecast. Splitting / Writting / running / reading back### - ###-------------------------------------------------------------------------###----- - #- Check to see if this is the first run or not and what inputs needs to be sent to write.ensemble configs - if (t>1){ - #for next time step split the met if model requires - #-Splitting the input for the models that they don't care about the start and end time of simulations and they run as long as their met file. - inputs.split <- metSplit(conf.settings, inputs, settings, model, no_split = FALSE, obs.times, t, nens, restart_flag = FALSE, my.split_inputs) - - #---------------- setting up the restart argument for each site separately and keeping them in a list - restart.list <- - furrr::future_pmap(list(out.configs, conf.settings %>% `class<-`(c("list")), params.list, inputs.split), - function(configs, settings, new.params, inputs) { - #if the new state for each site only has one row/col. - #then we need to convert it to matrix to solve the indexing issue. 
- new_state_site <- new.state[, which(attr(X, "Site") %in% settings$run$site$id)] - if(is.vector(new_state_site)){ - new_state_site <- matrix(new_state_site) - } - list( - runid = configs$runs$id, - start.time = strptime(obs.times[t -1], format = "%Y-%m-%d %H:%M:%S") + lubridate::second(lubridate::hms("00:00:01")), - stop.time = strptime(obs.times[t], format ="%Y-%m-%d %H:%M:%S"), - settings = settings, - new.state = new_state_site, - new.params = new.params, - inputs = inputs, - RENAME = TRUE, - ensemble.id = settings$ensemble$ensemble.id - ) - }) - } else { ## t == 1 - restart.list <- vector("list", length(conf.settings)) + obs.t <- as.character(lubridate::date(obs.times[t])) + obs.year <- lubridate::year(obs.t) + PEcAn.logger::logger.info(paste("Processing date:", obs.t)) + ###-------------------------------------------------------------------------### + ### Taking care of Forecast. Splitting / Writting / running / reading back### + ###-------------------------------------------------------------------------###----- + #- Check to see if this is the first run or not and what inputs needs to be sent to write.ensemble configs + if (t>1){ + #for next time step split the met if model requires + #-Splitting the input for the models that they don't care about the start and end time of simulations and they run as long as their met file. + inputs.split <- metSplit(conf.settings, inputs, settings, model, no_split = FALSE, obs.times, t, nens, restart_flag = FALSE, my.split_inputs) + #---------------- setting up the restart argument for each site separately and keeping them in a list + restart.list <- + furrr::future_pmap(list(out.configs, conf.settings %>% `class<-`(c("list")), params.list, inputs.split), + function(configs, settings, new.params, inputs) { + #if the new state for each site only has one row/col. + #then we need to convert it to matrix to solve the indexing issue. + new_state_site <- new.state[, which(attr(X, "Site") %in% settings$run$site$id)] + if(is.vector(new_state_site)){ + new_state_site <- matrix(new_state_site) + } + list( + runid = configs$runs$id, + start.time = strptime(obs.times[t -1], format = "%Y-%m-%d %H:%M:%S") + lubridate::second(lubridate::hms("00:00:01")), + stop.time = strptime(obs.times[t], format ="%Y-%m-%d %H:%M:%S"), + settings = settings, + new.state = new_state_site, + new.params = new.params, + inputs = inputs, + RENAME = TRUE, + ensemble.id = settings$ensemble$ensemble.id + ) + }) + } else { ## t == 1 + restart.list <- vector("list", length(conf.settings)) + } + #add flag for restart t=1 to skip model runs + if(restart_flag & t == 1){ + #for restart when t=1 do not need to do model runs and X should already exist in environment by this point + X <- X + } else { + # writing configs for each settings + # here we use the foreach instead of furrr + # because for some reason, the furrr has problem returning the sample paths. 
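+        # (descriptive note) the foreach block below builds a PSOCK cluster,
+        # registers it with doSNOW so %dopar% can run one write.ensemble.configs
+        # call per site, and then releases it via stopCluster()/registerDoSEQ().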
+        PEcAn.logger::logger.info("Writing configs!")
+        cl <- parallel::makeCluster(parallel::detectCores() - 1)
+        doSNOW::registerDoSNOW(cl)
+        temp.settings <- NULL
+        restart.arg <- NULL
+        out.configs <- foreach::foreach(temp.settings = as.list(conf.settings),
+                                        restart.arg = restart.list,
+                                        .packages = c("Kendall",
+                                                      "purrr",
+                                                      "PEcAn.uncertainty",
+                                                      paste0("PEcAn.", model),
+                                                      "PEcAnAssimSequential")) %dopar% {
+          temp <- PEcAn.uncertainty::write.ensemble.configs(
+            input_design = input_design,
+            ensemble.size = nens,
+            defaults = temp.settings$pfts,
+            ensemble.samples = ensemble.samples,
+            settings = temp.settings,
+            model = temp.settings$model$type,
+            write.to.db = temp.settings$database$bety$write,
+            restart = restart.arg,
+            # samples=inputs,
+            rename = TRUE
+          )
+          return(temp)
+        } %>% stats::setNames(site.ids)
+        parallel::stopCluster(cl)
+        foreach::registerDoSEQ()
+        # update the file paths of different inputs when t = 1.
+        if (t == 1) {
+          inputs <- out.configs %>% purrr::map(~.x$samples)
        }
        #if it's a rabbitmq job submission, we will first copy and paste the whole run folder within the SDA to the remote host.
        if (!is.null(settings$host$rabbitmq)) {
          settings$host$rabbitmq$prefix <- paste0(obs.year, ".nc")
          cp2cmd <- gsub("@RUNDIR@", rundir, settings$host$rabbitmq$cp2cmd)
          try(system(cp2cmd, intern = TRUE))
        }
+        # get ensemble ids for each site. 
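+        # run folders follow the ENS-XXXXX-<site.id> naming built below,
+        # e.g. ensemble member 1 of a site maps to ENS-00001-<site.id>.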
+ ensemble.ids <- site.ids %>% furrr::future_map(function(i){ + run.list <- c() + for (j in 1:nens) { + run.list <- c(run.list, paste0("ENS-", sprintf("%05d", j), "-", i)) } - #------------- Reading - every iteration and for SDA - #put building of X into a function that gets called - max_t <- 0 - while("try-error" %in% class( - try(reads <- build_X(out.configs = out.configs, - settings = settings, - new.params = new.params, - nens = nens, - read_restart_times = read_restart_times, - outdir = outdir, - t = t, - var.names = var.names, - my.read_restart = my.read_restart, - restart_flag = restart_flag), silent = T)) - ){ - Sys.sleep(10) - max_t <- max_t + 1 - if(max_t > 3){ - PEcAn.logger::logger.info("Can't find outputed NC file! Please rerun the code!") - break - return(0) - } - PEcAn.logger::logger.info("Empty folder, try again!") + return(run.list)}, .progress = F) %>% unlist + # create folder paths to each ensemble of each site. + runs.tmp <- file.path(rundir, ensemble.ids) + # start model runs. + PEcAn.logger::logger.info("Running models!") + # if we want to submit jobs through the combined job file. + if(control$execution == "qsub_parallel"){ + if (is.null(control$jobs.per.file)) { + PEcAn.remote::qsub_parallel(settings, prefix = paste0(obs.year, ".nc")) + } else { + PEcAn.remote::qsub_parallel(settings, files=PEcAn.remote::merge_job_files(settings, control$jobs.per.file), prefix = paste0(obs.year, ".nc")) } - - if (control$debug) browser() - #let's read the parameters of each site/ens - params.list <- reads %>% purrr::map(~.x %>% purrr::map("params")) - # Now let's read the state variables of site/ens - #don't need to build X when t=1 - X <- reads %>% purrr::map(~.x %>% purrr::map_df(~.x[["X"]] %>% t %>% as.data.frame)) - - - #replacing crazy outliers before it's too late - if (control$OutlierDetection){ - X <- outlier.detector.boxplot(X) - PEcAn.logger::logger.info("Outlier Detection.") - } - - # Now we have a matrix that columns are state variables and rows are ensembles. - # this matrix looks like this - # GWBI AbvGrndWood GWBI AbvGrndWood - #[1,] 3.872521 37.2581 3.872521 37.2581 - # But therer is an attribute called `Site` which tells yout what column is for what site id - check out attr (X,"Site") - if (multi.site.flag){ - X <- X %>% - purrr::map_dfc(~.x) %>% - as.matrix() %>% - `colnames<-`(c(rep(var.names, length(X)))) %>% - `attr<-`('Site',c(rep(site.ids, each=length(var.names)))) + } else if (control$execution == "local") { + # if we want to execute jobs locally. + job.files <- file.path(runs.tmp, "job.sh") + temp <- job.files %>% furrr::future_map(function(f){ + cmd <- paste0("cd ", dirname(f), ";./job.sh") + system(cmd, intern = F, ignore.stdout = T, ignore.stderr = T) + }, .progress = F) + } else if (control$execution == "qsub") { + # if we want to submit jobs through the regular job submission function. + PEcAn.workflow::start_model_runs(settings, write=settings$database$bety$write) + } + # Reading model outputs. 
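+        # (descriptive note) build_X returns one element per site, each holding
+        # per-ensemble lists with `params` and the state vector `X`; the
+        # purrr::map calls below split those two pieces apart.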
+ PEcAn.logger::logger.info("Reading forecast outputs!") + reads <- build_X(out.configs = out.configs, + settings = settings, + new.params = new.params, + nens = nens, + read_restart_times = read_restart_times, + outdir = outdir, + t = t, + var.names = var.names, + my.read_restart = my.read_restart, + restart_flag = restart_flag) + #let's read the parameters of each site/ens + params.list <- reads %>% purrr::map(~.x %>% purrr::map("params")) + # Now let's read the state variables of site/ens + #don't need to build X when t=1 + X <- reads %>% purrr::map(~.x %>% purrr::map_df(~.x[["X"]] %>% t %>% as.data.frame)) + #replacing crazy outliers before it's too late + if (control$OutlierDetection){ + X <- outlier.detector.boxplot(X) + PEcAn.logger::logger.info("Outlier Detection.") + } + # convert from forecast list to data frame. + X <- seq_along(X) %>% furrr::future_map(function(i){ + temp <- do.call(cbind, X[i]) + colnames(temp) <- paste0(var.names, ".", i) + return(temp) + }) %>% + dplyr::bind_cols() %>% + `colnames<-`(c(rep(var.names, length(X)))) %>% + `attr<-`('Site',c(rep(site.ids, each=length(var.names)))) + } ## end else from restart & t==1 + # start debiasing. + if (!is.null(debias$start.year)) { + debias.out <- NULL + if (obs.year >= debias$start.year) { + PEcAn.logger::logger.info("Start debiasing!") + debias.out <- sda_bias_correction(site.locs, + t, pre.X, X, + obs.mean, + state.interval, + debias$cov.dir, + pre.states, + .get_debias_mod) + X <- debias.out$X + pre.states <- debias.out$pre.states + } + } + FORECAST[[obs.t]] <- pre.X <- X + ###-------------------------------------------------------------------### + ### preparing OBS ### + ###-------------------------------------------------------------------###---- + #To trigger the analysis function with free run, you need to first specify the control$forceRun as TRUE, + #Then specify the settings$state.data.assimilation$scalef as 0, and settings$state.data.assimilation$free.run as TRUE. + if (!is.null(obs.mean[[t]][[1]]) | (as.logical(settings$state.data.assimilation$free.run) & control$forceRun)) { + #decide if we want to estimate the process variance and choose the according function. + if(processvar == FALSE) { + an.method <- EnKF + } else if (processvar == TRUE && settings$state.data.assimilation$q.type %in% c("SINGLE", "SITE")) { + an.method <- GEF.MultiSite + } + #initialize MCMC arguments. + if (is.null(control$MCMC.args)) { + MCMC.args <- list(niter = 1e5, + nthin = 10, + nchain = 1, + nburnin = 5e4) + } else { + MCMC.args <- control$MCMC.args + } + #decide if we want the block analysis function or multi-site analysis function. + if (processvar == TRUE && settings$state.data.assimilation$q.type %in% c("vector", "wishart")) { + #initialize block.list.all. 
+ if (t == 1 | !exists("block.list.all")) { + block.list.all <- obs.mean %>% purrr::map(function(l){NULL}) } - - } ## end else from restart & t==1 - - - - raw_mean_t <- colMeans(X) - site_index <- attr(X, "Site") - col_vars <- colnames(X) - FORECAST[[obs.t]] <- X - name_map <- debias_name_map - # DEBIAS STEP - if (.should_debias(t, debias_enabled, obs.year, debias_start_year)) { # <- removed extra ')' - # Load only the years needed for this step (t-1 and t) - yrs_needed <- c(lubridate::year(obs.times[t - 1]), lubridate::year(obs.times[t])) - covariates_df_tt <- .cov_df_for_years(yrs_needed) - - out <- sda_apply_debias_step( - t = t, - obs.t = obs.t, - X = X, - raw_prev = raw_prev, - raw_mean_t = raw_mean_t, - site_index = site_index, - col_vars = col_vars, - obs.times = obs.times, - obs.mean = obs.mean, - covariates_df = covariates_df_tt, # << use the per-step covariates - py = py, - train_buf = train_buf, - name_map = name_map, - drop_incomplete_covariates = debias_drop_incomplete_covariates, - enforce_consistent_obs = debias_enforce_consistent_obs, - require_obs_at_t_for_predict = debias_require_obs_at_t_for_predict, - state.interval = state.interval, - clip_lower_bound = 0.01 - ) - X <- out$X - if (!is.null(out$weights_entry)) DEBIAS_WEIGHTS[[obs.t]] <- out$weights_entry - if (nrow(out$weights_df_rows)) DEBIAS_WEIGHTS_DF <- rbind(DEBIAS_WEIGHTS_DF, out$weights_df_rows) - DIAG[[obs.t]] <- out$diag - if (nrow(out$rmse_rows)) RMSE_DF <- rbind(RMSE_DF, out$rmse_rows) - if (!is.null(out$feature_rows) && nrow(out$feature_rows)) { - FEATURE_IMP_DF <- rbind(FEATURE_IMP_DF, out$feature_rows) + #running analysis function. + enkf.params[[obs.t]] <- analysis_sda_block(settings, block.list.all, X, obs.mean, obs.cov, t, nt, MCMC.args, pre_enkf_params) + enkf.params[[obs.t]] <- c(enkf.params[[obs.t]], RestartList = list(restart.list %>% stats::setNames(site.ids))) + block.list.all <- enkf.params[[obs.t]]$block.list.all + #Forecast + mu.f <- enkf.params[[obs.t]]$mu.f + Pf <- enkf.params[[obs.t]]$Pf + #Analysis + Pa <- enkf.params[[obs.t]]$Pa + mu.a <- enkf.params[[obs.t]]$mu.a + } else if (exists("an.method")) { + #Making R and Y + Obs.cons <- Construct.R(site.ids, var.names, obs.mean[[t]], obs.cov[[t]]) + Y <- Obs.cons$Y + R <- Obs.cons$R + if (length(Y) > 1) { + PEcAn.logger::logger.info("The zero variances in R and Pf is being replaced by half and one fifth of the minimum variance in those matrices respectively.") + diag(R)[which(diag(R)==0)] <- min(diag(R)[which(diag(R) != 0)])/2 } - - FORECAST[[obs.t]] <- X - } - raw_prev <- raw_mean_t - - - ###-------------------------------------------------------------------### - ### preparing OBS ### - ###-------------------------------------------------------------------###---- - #To trigger the analysis function with free run, you need to first specify the control$forceRun as TRUE, - #Then specify the settings$state.data.assimilation$scalef as 0, and settings$state.data.assimilation$free.run as TRUE. - if (!is.null(obs.mean[[t]][[1]]) | (as.logical(settings$state.data.assimilation$free.run) & control$forceRun)) { - # TODO: as currently configured, Analysis runs even if all obs are NA, - # which clearly should be triggering the `else` of this if, but the - # `else` has not been invoked in a while an may need updating - - - #decide if we want to estimate the process variance and choose the according function. 
- if(processvar == FALSE) { - an.method<-EnKF - } else if (processvar == TRUE && settings$state.data.assimilation$q.type %in% c("SINGLE", "SITE")) { - an.method<-GEF.MultiSite + # making the mapping operator + H <- Construct.H.multisite(site.ids, var.names, obs.mean[[t]]) + #Pass aqq and bqq. + aqq <- NULL + bqq <- numeric(nt + 1) + Pf <- NULL + #if t>1 + if(is.null(pre_enkf_params) && t>1){ + aqq <- enkf.params[[t-1]]$aqq + bqq <- enkf.params[[t-1]]$bqq + X.new<-enkf.params[[t-1]]$X.new } - - #decide if we want the block analysis function or multi-site analysis function. - if (processvar == TRUE && settings$state.data.assimilation$q.type %in% c("vector", "wishart")) { - #initialize block.list.all. - if (t == 1 | !exists("block.list.all")) { - block.list.all <- obs.mean %>% purrr::map(function(l){NULL}) - } - #initialize MCMC arguments. - if (is.null(control$MCMC.args)) { - MCMC.args <- list(niter = 1e5, - nthin = 10, - nchain = 3, - nburnin = 5e4) - } else { - MCMC.args <- control$MCMC.args - } - #running analysis function. - enkf.params[[obs.t]] <- analysis_sda_block(settings, block.list.all, X, obs.mean, obs.cov, t, nt, MCMC.args, pre_enkf_params) - enkf.params[[obs.t]] <- c(enkf.params[[obs.t]], RestartList = list(restart.list %>% stats::setNames(site.ids))) - block.list.all <- enkf.params[[obs.t]]$block.list.all - #Forecast - mu.f <- enkf.params[[obs.t]]$mu.f - Pf <- enkf.params[[obs.t]]$Pf - #Analysis - Pa <- enkf.params[[obs.t]]$Pa - mu.a <- enkf.params[[obs.t]]$mu.a - } else if (exists("an.method")) { - #Making R and Y - Obs.cons <- Construct.R(site.ids, var.names, obs.mean[[t]], obs.cov[[t]]) - Y <- Obs.cons$Y - R <- Obs.cons$R - if (length(Y) > 1) { - PEcAn.logger::logger.info("The zero variances in R and Pf is being replaced by half and one fifth of the minimum variance in those matrices respectively.") - diag(R)[which(diag(R)==0)] <- min(diag(R)[which(diag(R) != 0)])/2 - } - # making the mapping operator - H <- Construct.H.multisite(site.ids, var.names, obs.mean[[t]]) - #Pass aqq and bqq. - aqq <- NULL - bqq <- numeric(nt + 1) - Pf <- NULL - #if t>1 - if(is.null(pre_enkf_params) && t>1){ - aqq <- enkf.params[[t-1]]$aqq - bqq <- enkf.params[[t-1]]$bqq - X.new<-enkf.params[[t-1]]$X.new - } - if(!is.null(pre_enkf_params) && t>1){ - aqq <- pre_enkf_params[[t-1]]$aqq - bqq <- pre_enkf_params[[t-1]]$bqq - X.new<-pre_enkf_params[[t-1]]$X.new - } - if(!is.null(pre_enkf_params)){ - Pf <- pre_enkf_params[[t]]$Pf - } - recompileTobit = !exists('Cmcmc_tobit2space') - recompileGEF = !exists('Cmcmc') - #weight list - # This reads ensemble weights generated by `get_ensemble_weights` function from assim.sequential package - weight_list <- list() - if(!file.exists(file.path(settings$outdir, "ensemble_weights.Rdata"))){ - PEcAn.logger::logger.warn("ensemble_weights.Rdata cannot be found. 
Make sure you generate samples by running the get.ensemble.weights function before running SDA if you want the ensembles to be weighted.") - #create null list - for(tt in 1:length(obs.times)){ - weight_list[[tt]] <- rep(1,nens) #no weights - } - } else{ - load(file.path(settings$outdir, "ensemble_weights.Rdata")) ## loads ensemble.samples - } - wts <- unlist(weight_list[[t]]) - #-analysis function - enkf.params[[obs.t]] <- GEF.MultiSite( - settings, - FUN = an.method, - Forecast = list(Q = Q, X = X), - Observed = list(R = R, Y = Y), - H = H, - extraArg = list( - aqq = aqq, - bqq = bqq, - Pf = Pf, - t = t, - nitr.GEF = nitr.GEF, - nthin = nthin, - nburnin = nburnin, - censored.data = censored.data, - recompileGEF = recompileGEF, - recompileTobit = recompileTobit, - wts = wts - ), - choose = choose, - nt = nt, - obs.mean = obs.mean, - nitr = 100000, - nburnin = 10000, - obs.cov = obs.cov, - site.ids = site.ids, - blocked.dis = blocked.dis, - distances = distances - ) - tictoc::tic(paste0("Preparing for Adjustment for cycle = ", t)) - #Forecast - mu.f <- enkf.params[[obs.t]]$mu.f - Pf <- enkf.params[[obs.t]]$Pf - #Analysis - Pa <- enkf.params[[obs.t]]$Pa - mu.a <- enkf.params[[obs.t]]$mu.a - #extracting extra outputs - if (control$debug) browser() - if (processvar) { - aqq <- enkf.params[[obs.t]]$aqq - bqq <- enkf.params[[obs.t]]$bqq - } - # Adding obs elements to the enkf.params - #This can later on help with diagnostics - enkf.params[[obs.t]] <- - c( - enkf.params[[obs.t]], - R = list(R), - Y = list(Y), - RestartList = list(restart.list %>% stats::setNames(site.ids)) - ) + if(!is.null(pre_enkf_params) && t>1){ + aqq <- pre_enkf_params[[t-1]]$aqq + bqq <- pre_enkf_params[[t-1]]$bqq + X.new<-pre_enkf_params[[t-1]]$X.new } - - ###-------------------------------------------------------------------### - ### Trace ### - ###-------------------------------------------------------------------###---- - #-- writing Trace-------------------- - if(control$trace) { - PEcAn.logger::logger.warn ("\n --------------------------- ",obs.year," ---------------------------\n") - PEcAn.logger::logger.warn ("\n --------------Obs mean----------- \n") - print(enkf.params[[obs.t]]$Y) - PEcAn.logger::logger.warn ("\n --------------Obs Cov ----------- \n") - print(enkf.params[[obs.t]]$R) - PEcAn.logger::logger.warn ("\n --------------Forecast mean ----------- \n") - print(enkf.params[[obs.t]]$mu.f) - PEcAn.logger::logger.warn ("\n --------------Forecast Cov ----------- \n") - print(enkf.params[[obs.t]]$Pf) - PEcAn.logger::logger.warn ("\n --------------Analysis mean ----------- \n") - print(t(enkf.params[[obs.t]]$mu.a)) - PEcAn.logger::logger.warn ("\n --------------Analysis Cov ----------- \n") - print(enkf.params[[obs.t]]$Pa) - PEcAn.logger::logger.warn ("\n ------------------------------------------------------\n") + if(!is.null(pre_enkf_params)){ + Pf <- pre_enkf_params[[t]]$Pf } - if (control$debug) browser() - if (control$pause) readline(prompt="Press [enter] to continue \n") - } else { - ###-------------------------------------------------------------------### - ### No Observations -- ###---- - ###-----------------------------------------------------------------### - ### no process variance -- forecast is the same as the analysis ### - if (processvar==FALSE) { - mu.a <- mu.f - Pa <- Pf + Q - ### yes process variance -- no data - } else { - mu.f <- colMeans(X) #mean Forecast - This is used as an initial condition - mu.a <- mu.f - if(is.null(Q)){ - q.bar <- diag(ncol(X)) - PEcAn.logger::logger.warn('Process 
variance not estimated. Analysis has been given uninformative process variance') + recompileTobit = !exists('Cmcmc_tobit2space') + recompileGEF = !exists('Cmcmc') + #weight list + # This reads ensemble weights generated by `get_ensemble_weights` function from assim.sequential package + weight_list <- list() + if(!file.exists(file.path(settings$outdir, "ensemble_weights.Rdata"))){ + PEcAn.logger::logger.warn("ensemble_weights.Rdata cannot be found. Make sure you generate samples by running the get.ensemble.weights function before running SDA if you want the ensembles to be weighted.") + #create null list + for(tt in 1:length(obs.times)){ + weight_list[[tt]] <- rep(1,nens) #no weights } - # Pa <- Pf + matrix(solve(q.bar), dim(Pf)[1], dim(Pf)[2]) - #will throw an error when q.bar and Pf are different sizes i.e. when you are running with no obs and do not variance for all state variables - #Pa <- Pf + solve(q.bar) - #hack have Pa = Pf for now - # if(!is.null(pre_enkf_params)){ - # Pf <- pre_enkf_params[[t]]$Pf - # }else{ - # Pf <- stats::cov(X) # Cov Forecast - This is used as an initial condition - # } - Pf <- stats::cov(X) - Pa <- Pf + } else{ + load(file.path(settings$outdir, "ensemble_weights.Rdata")) ## loads ensemble.samples + } + wts <- unlist(weight_list[[t]]) + #-analysis function + enkf.params[[obs.t]] <- GEF.MultiSite( + settings, + FUN = an.method, + Forecast = list(Q = Q, X = X), + Observed = list(R = R, Y = Y), + H = H, + extraArg = list( + aqq = aqq, + bqq = bqq, + Pf = Pf, + t = t, + nitr.GEF = MCMC.args$niter, + nthin = MCMC.args$nthin, + nburnin = MCMC.args$nburnin, + censored.data = censored.data, + recompileGEF = recompileGEF, + recompileTobit = recompileTobit, + wts = wts + ), + choose = choose, + nt = nt, + obs.mean = obs.mean, + nitr = 100000, + nburnin = 10000, + obs.cov = obs.cov, + site.ids = site.ids, + blocked.dis = blocked.dis, + distances = distances + ) + tictoc::tic(paste0("Preparing for Adjustment for cycle = ", t)) + #Forecast + mu.f <- enkf.params[[obs.t]]$mu.f + Pf <- enkf.params[[obs.t]]$Pf + #Analysis + Pa <- enkf.params[[obs.t]]$Pa + mu.a <- enkf.params[[obs.t]]$mu.a + #extracting extra outputs + if (processvar) { + aqq <- enkf.params[[obs.t]]$aqq + bqq <- enkf.params[[obs.t]]$bqq } - enkf.params[[obs.t]] <- list(mu.f = mu.f, Pf = Pf, mu.a = mu.a, Pa = Pa) + # Adding obs elements to the enkf.params + #This can later on help with diagnostics + enkf.params[[obs.t]] <- + c( + enkf.params[[obs.t]], + R = list(R), + Y = list(Y), + RestartList = list(restart.list %>% stats::setNames(site.ids)) + ) } - + } else { ###-------------------------------------------------------------------### - ### adjust/update state matrix ### - ###-------------------------------------------------------------------###---- - tictoc::tic(paste0("Adjustment for cycle = ", t)) - if(adjustment == TRUE){ - analysis <-adj.ens(Pf, X, mu.f, mu.a, Pa) + ### No Observations -- ###---- + ###-----------------------------------------------------------------### + ### no process variance -- forecast is the same as the analysis ### + if (processvar==FALSE) { + mu.a <- mu.f + Pa <- Pf + Q + ### yes process variance -- no data } else { - analysis <- as.data.frame(mvtnorm::rmvnorm(as.numeric(nrow(X)), mu.a, Pa, method = "svd")) - } - colnames(analysis) <- colnames(X) - ##### Mapping analysis vectors to be in bounds of state variables - for(i in 1:ncol(analysis)){ - int.save <- state.interval[which(colnames(analysis)[i]==var.names),] - analysis[analysis[,i] < int.save[1],i] <- int.save[1] - 
analysis[analysis[,i] > int.save[2],i] <- int.save[2] - } - ## in the future will have to be separated from analysis - - new.state <- as.data.frame(analysis) - ANALYSIS[[obs.t]] <- analysis - ens_weights[[obs.t]] <- PEcAnAssimSequential::sda_weights_site(FORECAST, ANALYSIS, t, as.numeric(settings$ensemble$size)) - ###-------------------------------------------------------------------### - ### save outputs ### - ###-------------------------------------------------------------------###---- - Viz.output <- list(settings, obs.mean, obs.cov) #keeping obs data and settings for later visualization in Dashboard - - save(site.locs, - t, - FORECAST, - ANALYSIS, - enkf.params, - new.state, new.params,params.list, ens_weights, - out.configs, ensemble.samples, inputs, Viz.output, - DIAG, DEBIAS_WEIGHTS, DEBIAS_WEIGHTS_DF, RMSE_DF, - FEATURE_IMP_DF, - file = file.path(settings$outdir, "sda.output.Rdata")) - - tictoc::tic(paste0("Visulization for cycle = ", t)) - - #writing down the image - either you asked for it or nor :) - if ((t%%2 == 0 | t == nt) & (control$TimeseriesPlot)){ - if (as.logical(settings$state.data.assimilation$free.run)) { - SDA_timeseries_plot(ANALYSIS, FORECAST, obs.mean, obs.cov, settings$outdir, by = "var", types = c("FORECAST", "ANALYSIS")) - } else { - SDA_timeseries_plot(ANALYSIS, FORECAST, obs.mean, obs.cov, settings$outdir, by = "var", types = c("FORECAST", "ANALYSIS", "OBS")) + mu.f <- colMeans(X) #mean Forecast - This is used as an initial condition + mu.a <- mu.f + if(is.null(Q)){ + q.bar <- diag(ncol(X)) + PEcAn.logger::logger.warn('Process variance not estimated. Analysis has been given uninformative process variance') } + # Pa <- Pf + matrix(solve(q.bar), dim(Pf)[1], dim(Pf)[2]) + #will throw an error when q.bar and Pf are different sizes i.e. when you are running with no obs and do not variance for all state variables + #Pa <- Pf + solve(q.bar) + #hack have Pa = Pf for now + # if(!is.null(pre_enkf_params)){ + # Pf <- pre_enkf_params[[t]]$Pf + # }else{ + # Pf <- stats::cov(X) # Cov Forecast - This is used as an initial condition + # } + Pf <- stats::cov(X) + Pa <- Pf } - #Saving the profiling result - if (control$Profiling) alltocs(file.path(settings$outdir,"SDA", "Profiling.csv")) + enkf.params[[obs.t]] <- list(mu.f = mu.f, Pf = Pf, mu.a = mu.a, Pa = Pa) + } + ###-------------------------------------------------------------------### + ### adjust/update state matrix ### + ###-------------------------------------------------------------------###---- + tictoc::tic(paste0("Adjustment for cycle = ", t)) + if(adjustment == TRUE){ + analysis <-adj.ens(Pf, X, mu.f, mu.a, Pa) + } else { + # if we don't have the analysis from the analysis function. 
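+        # (descriptive note) in that case we fall back to drawing nens samples
+        # from the analysis distribution N(mu.a, Pa); rmvnorm's "svd" method stays
+        # stable when Pa is only positive semi-definite.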
+        if (is.null(enkf.params[[obs.t]]$analysis)) {
+          analysis <- as.data.frame(mvtnorm::rmvnorm(as.numeric(nrow(X)), mu.a, Pa, method = "svd"))
+        } else {
+          analysis <- enkf.params[[obs.t]]$analysis
+        }
+    }
+    colnames(analysis) <- colnames(X)
+    ##### Mapping analysis vectors to be in bounds of state variables
+    for(i in 1:ncol(analysis)){
+      int.save <- state.interval[which(colnames(analysis)[i]==var.names),]
+      analysis[analysis[,i] < int.save[1],i] <- int.save[1]
+      analysis[analysis[,i] > int.save[2],i] <- int.save[2]
+    }
+    ## in the future will have to be separated from analysis
+
+    new.state <- as.data.frame(analysis)
+    ANALYSIS[[obs.t]] <- analysis
+    ens_weights[[obs.t]] <- PEcAnAssimSequential::sda_weights_site(FORECAST, ANALYSIS, t, as.numeric(settings$ensemble$size))
+    ###-------------------------------------------------------------------###
+    ### save outputs                                                      ###
+    ###-------------------------------------------------------------------###----
+    Viz.output <- list(settings, obs.mean, obs.cov) #keeping obs data and settings for later visualization in Dashboard
+    # save SDA outputs.
+    save(site.locs,
+         t,
+         FORECAST,
+         ANALYSIS,
+         enkf.params,
+         new.state, new.params, params.list, ens_weights,
+         out.configs, ensemble.samples, inputs, Viz.output,
+         debias.out,
+         file = file.path(settings$outdir, "sda.output.Rdata"))
+    tictoc::tic(paste0("Visualization for cycle = ", t))
+    # writing down the image - either you asked for it or not :)
+    if ((t%%2 == 0 | t == nt) & (control$TimeseriesPlot)){
+      if (as.logical(settings$state.data.assimilation$free.run)) {
+        SDA_timeseries_plot(ANALYSIS, FORECAST, obs.mean, obs.cov, settings$outdir, by = "var", types = c("FORECAST", "ANALYSIS"))
+      } else {
+        SDA_timeseries_plot(ANALYSIS, FORECAST, obs.mean, obs.cov, settings$outdir, by = "var", types = c("FORECAST", "ANALYSIS", "OBS"))
      }
    }
    # remove files as SDA runs
    if (!(control$keepNC) && t == 1){
      unlink(list.files(outdir, "*.nc", recursive = TRUE, full.names = TRUE))
@@ -950,12 +724,23 @@ sda.enkf.multisite <- function(settings,
      system2(sendmail, c("-f", paste0("\"", control$send_email$from, "\""), paste0("\"", control$send_email$to, "\""), "<", mailfile))
      unlink(mailfile)
    }
-    gc()
-    # useful for debugging to keep .nc files for assimilated years. T = 2, because this loops removes the files that were run when starting the next loop
-#    if (keepNC && t == 1){
-#      unlink(list.files(outdir, "*.nc", recursive = TRUE, full.names = TRUE))
-#    }
-    ## MCD: I commented the above "if" out because if you are restarting from a previous forecast, this might delete the files in that earlier folder
+    gc()
  } ### end loop over time
-} # sda.enkf
-
+  # merge NC files.
+  if (control$merge_nc) {
+    nc.folder <- file.path(settings$outdir, "merged_nc")
+    if (file.exists(nc.folder)) unlink(nc.folder, recursive = TRUE)
+    dir.create(nc.folder)
+    temp <- PEcAn.utils::nc_merge_all_sites_by_year(model.outdir = outdir,
+                                                    nc.outdir = nc.folder,
+                                                    ens.num = nens,
+                                                    site.ids = as.numeric(site.ids),
+                                                    start.date = obs.times[1],
+                                                    end.date = obs.times[length(obs.times)],
+                                                    time.step = paste(1, settings$state.data.assimilation$forecast.time.step),
+                                                    cores = parallel::detectCores() - 1)
+    # remove rundir and outdir. 
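+    # note that this permanently deletes every per-ensemble run and output
+    # folder; after this point the merged files under `merged_nc` are the only
+    # remaining copy of the model output.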
+    unlink(rundir, recursive = TRUE)
+    unlink(outdir, recursive = TRUE)
+  }
+} # sda.enkf
\ No newline at end of file
diff --git a/modules/assim.sequential/R/sda.enkf_parallel.R b/modules/assim.sequential/R/sda.enkf_parallel.R
index ff802cc2ca7..a2e3a105bec 100644
--- a/modules/assim.sequential/R/sda.enkf_parallel.R
+++ b/modules/assim.sequential/R/sda.enkf_parallel.R
@@ -9,7 +9,7 @@
 #' @param obs.cov Lists of date times named by time points, which contains lists of sites named by site ids, which contains observation covariances for all state variables of each site for each time point.
 #' @param Q Process covariance matrix given if there is no data to estimate it.
 #' @param pre_enkf_params Used for passing pre-existing time-series of process error into the current SDA runs to ignore the impact by the differences between process errors.
-#' @param ensemble.samples Pass ensemble.samples from outside to avoid GitHub check issues.
+#' @param ensemble.samples List of ensemble parameter samples across PFTs. Default is NULL.
 #' @param outdir physical path to the folder that stores the SDA outputs. Default is NULL.
 #' @param control List of flags controlling the behavior of the SDA.
 #' `TimeseriesPlot` for post analysis examination;
@@ -18,6 +18,11 @@
 #' `keepNC` decide if we want to keep the NetCDF files inside the out directory;
 #' `forceRun` decide if we want to proceed the Bayesian MCMC sampling without observations;
 #' `MCMC.args` include lists for controling the MCMC sampling process (iteration, nchains, burnin, and nthin.).
+#' `merge_nc` determines whether to merge all netCDF files across sites and ensembles.
+#' If it's set to `TRUE`, all netCDF files are combined into the `merged_nc` folder within the `outdir`.
+#' @param debias List containing the covariate directory (`cov.dir`) and the start year (`start.year`).
+#' The covariate directory should include GeoTIFF files named by year.
+#' The start year is a numeric input that decides when to start the debiasing feature.
 #'
 #' @return NONE
 #' @export
@@ -34,20 +39,23 @@ sda.enkf_local <- function(settings,
                           send_email = NULL,
                           keepNC = TRUE,
                           forceRun = TRUE,
-                           MCMC.args = NULL)) {
-  # initialize parallel.
-  if (future::supportsMulticore()) {
-    future::plan(future::multicore)
-  } else {
-    future::plan(future::multisession)
-  }
+                           MCMC.args = NULL,
+                           merge_nc = TRUE),
+                           debias = list(cov.dir = NULL, start.year = NULL)) {
  # grab cores from settings.
  cores <- as.numeric(settings$state.data.assimilation$batch.settings$general.job$cores)
  # if we didn't assign number of CPUs in the settings.
-  if (is.null(cores)) {
-    cores <- parallel::detectCores() - 1
-    # if we only have one CPU.
-    if (cores < 1) cores <- 1
+  if (length(cores) == 0 || is.na(cores)) {
+    cores <- parallel::detectCores()
+  }
+  cores <- cores - 1
+  # if we only have one CPU.
+  if (cores < 1) cores <- 1
+  # initialize parallel.
+  if (future::supportsMulticore()) {
+    future::plan(future::multicore, workers = cores)
+  } else {
+    future::plan(future::multisession, workers = cores)
  }
  # Tweak outdir if it's specified from outside. 
   # Tweak outdir if it's specified from outside.
   if (!is.null(outdir)) {
@@ -140,7 +148,6 @@ sda.enkf_local <- function(settings,
   register.xml <- system.file(paste0("register.", model, ".xml"), package = paste0("PEcAn.", model))
   register <- XML::xmlToList(XML::xmlParse(register.xml))
   no_split <- !as.logical(register$exact.dates)
-
   if (!exists(my.split_inputs) & !no_split) {
     PEcAn.logger::logger.warn(my.split_inputs, "does not exist")
     PEcAn.logger::logger.severe("please make sure that the PEcAn interface is loaded for", model)
@@ -193,40 +200,33 @@ sda.enkf_local <- function(settings,
   }
   #reformatting params
   new.params <- sda_matchparam(settings, ensemble.samples, site.ids, nens)
-  #sample met ensemble members
-  #sample all inputs specified in the settings$ensemble
-  #now looking into the xml
-  samp <- conf.settings$ensemble$samplingspace
-  #finding who has a parent
-  parents <- lapply(samp,'[[', 'parent')
-  #order parents based on the need of who has to be first
-  order <- names(samp)[lapply(parents, function(tr) which(names(samp) %in% tr)) %>% unlist()]
-  #new ordered sampling space
-  samp.ordered <- samp[c(order, names(samp)[!(names(samp) %in% order)])]
-  #performing the sampling
-  inputs <- vector("list", length(conf.settings))
-  # For the tags specified in the xml I do the sampling
-  for (s in seq_along(conf.settings)){
-    if (is.null(inputs[[s]])) {
-      inputs[[s]] <- list()
-    }
-    for (i in seq_along(samp.ordered)){
-      #call the function responsible for generating the ensemble
-      inputs[[s]][[names(samp.ordered)[i]]] <- PEcAn.uncertainty::input.ens.gen(settings=conf.settings[[s]],
-                                                                                input=names(samp.ordered)[i],
-                                                                                method=samp.ordered[[i]]$method,
-                                                                                parent_ids=NULL)
+  # get the joint input design.
+  for (i in seq_along(settings)) {
+    # get the input names that are registered for sampling.
+    names.sampler <- names(settings$ensemble$samplingspace)
+    # get the input names for the current site.
+    names.site.input <- names(settings[[i]]$run$inputs)
+    # drop the parameters field from the list (setdiff is a safe no-op when it is absent).
+    names.sampler <- setdiff(names.sampler, "parameters")
+    # find a site that has all registered inputs except for the parameter field.
+    if (all(names.sampler %in% names.site.input)) {
+      input_design <- PEcAn.uncertainty::generate_joint_ensemble_design(settings = settings[[i]],
                                                                        ensemble_samples = ensemble.samples,
                                                                        ensemble_size = nens)[[1]]
+      break
+    }
   }
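For orientation, input_design is a single table shared by every site, so ensemble member i always draws the same input paths and the same parameter set everywhere. A purely illustrative shape (the met column is an assumed input tag; see write.ensemble.configs further down in this diff for how the columns are consumed):

# illustrative only: one row per ensemble member,
# one column per sampled input tag plus a "param" index column.
input_design <- data.frame(
  met   = c(3, 1, 7),  # member i reads the 3rd/1st/7th met path
  param = 1:3          # member i uses the i-th parameter draw
)
input_design[["met"]][2]  # met path index used by member 2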
   ###------------------------------------------------------------------------------------------------###
   ### loop over time                                                                                  ###
   ###------------------------------------------------------------------------------------------------###
+  # initialize the lists of covariates for the debias feature.
+  pre.states <- vector("list", length = length(var.names)) %>% purrr::set_names(var.names)
   for(t in 1:nt){
     # initialize dat for saving memory usage.
     sda.outputs <- FORECAST <- enkf.params <- ANALYSIS <- ens_weights <- list()
     obs.t <- as.character(lubridate::date(obs.times[t]))
     obs.year <- lubridate::year(obs.t)
-    PEcAn.logger::logger.info(paste("Processing Year:", obs.year))
+    PEcAn.logger::logger.info(paste("Processing date:", obs.t))
     ###-------------------------------------------------------------------------###
     ### Taking care of Forecast. Splitting / Writing / running / reading back    ###
     ###-------------------------------------------------------------------------###-----
@@ -276,7 +276,7 @@ sda.enkf_local <- function(settings,
           new.state = new_state_site,
           new.params = new.params,
           inputs = inputs,
-          RENAME = TRUE,
+          RENAME = FALSE,
           ensemble.id = settings$ensemble$ensemble.id
         )
       })
@@ -286,22 +286,41 @@ sda.enkf_local <- function(settings,
     # release memory.
     gc()
     # submit jobs for writing configs.
+    # writing configs for each settings object.
     PEcAn.logger::logger.info("Writing configs!")
-    out.configs <- furrr::future_pmap(list(conf.settings %>% `class<-`(c("list")), restart.list, inputs), function(settings, restart.arg, inputs) {
-      # Loading the model package - this is required bc of the furrr
-      library(paste0("PEcAn.",settings$model$type), character.only = TRUE)
-      # wrtting configs for each settings - this does not make a difference with the old code
-      PEcAn.uncertainty::write.ensemble.configs(
-        defaults = settings$pfts,
-        ensemble.samples = ensemble.samples,
-        settings = settings,
-        model = settings$model$type,
-        write.to.db = settings$database$bety$write,
-        restart = restart.arg,
-        samples=inputs,
-        rename = F
-      )
-    }) %>% stats::setNames(site.ids)
+    # here we use foreach instead of furrr,
+    # because furrr has problems returning the sample paths.
+    cl <- parallel::makeCluster(cores)
+    doSNOW::registerDoSNOW(cl)
+    temp.settings <- NULL
+    restart.arg <- NULL
+    out.configs <- foreach::foreach(temp.settings = as.list(conf.settings),
+                                    restart.arg = restart.list,
+                                    .packages = c("Kendall",
+                                                  "purrr",
+                                                  "PEcAn.uncertainty",
+                                                  paste0("PEcAn.", model),
+                                                  "PEcAnAssimSequential")) %dopar% {
+      temp <- PEcAn.uncertainty::write.ensemble.configs(
+        input_design = input_design,
+        ensemble.size = nens,
+        defaults = temp.settings$pfts,
+        ensemble.samples = ensemble.samples,
+        settings = temp.settings,
+        model = temp.settings$model$type,
+        write.to.db = temp.settings$database$bety$write,
+        restart = restart.arg,
+        # samples=inputs,
+        rename = FALSE
+      )
+      return(temp)
+    } %>% stats::setNames(site.ids)
+    parallel::stopCluster(cl)
+    foreach::registerDoSEQ()
+    # update the file paths of different inputs when t = 1.
+    if (t == 1) {
+      inputs <- out.configs %>% purrr::map(~.x$samples)
+    }
     # collect run info.
     # get ensemble ids for each site.
     ensemble.ids <- site.ids %>% furrr::future_map(function(i){
@@ -350,7 +369,23 @@ sda.enkf_local <- function(settings,
       dplyr::bind_cols() %>%
       `colnames<-`(c(rep(var.names, length(X)))) %>%
       `attr<-`('Site',c(rep(site.ids, each=length(var.names))))
-    FORECAST[[obs.t]] <- X
+    # start debiasing.
+    debias.out <- NULL
+    if (!is.null(debias$start.year)) {
+      if (obs.year >= debias$start.year) {
+        PEcAn.logger::logger.info("Start debiasing!")
+        debias.out <- sda_bias_correction(site.locs,
+                                          t, pre.X, X,
+                                          obs.mean,
+                                          state.interval,
+                                          debias$cov.dir,
+                                          pre.states,
+                                          .get_debias_mod)
+        X <- debias.out$X
+        pre.states <- debias.out$pre.states
+      }
+    }
+    FORECAST[[obs.t]] <- pre.X <- X
     gc()
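A standalone usage sketch of the sda_bias_correction() hook above (the function itself is added later in this PR). Every object here is illustrative: the variable names, coordinates, and path are placeholders, and py.init = NULL selects the built-in random forest instead of the python learners:

corrected <- sda_bias_correction(
  site.locs = data.frame(Lon = c(-72.2, -89.6), Lat = c(42.5, 46.2)),
  t = 2,                          # needs t > 1 so that t-1 observations exist
  pre.X = pre.X,                  # forecast ensemble from the previous time point
  X = X,                          # forecast ensemble from the current time point
  obs.mean = obs.mean,
  state.interval = state.interval,
  cov.dir = "/path/to/covariates",
  pre.states = setNames(vector("list", 2), c("AbvGrndWood", "LAI")),
  py.init = NULL
)
X <- corrected$X                     # bias-corrected ensemble, clipped to bounds
pre.states <- corrected$pre.states   # accumulated training covariates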
     ###-------------------------------------------------------------------###
     ### preparing OBS                                                      ###
     ###-------------------------------------------------------------------###----
@@ -384,11 +419,26 @@ sda.enkf_local <- function(settings,
       #Analysis
       Pa <- enkf.params[[obs.t]]$Pa
       mu.a <- enkf.params[[obs.t]]$mu.a
+    } else {
+      mu.f <- colMeans(X) # mean forecast - this is used as an initial condition
+      mu.a <- mu.f
+      if(is.null(Q)){
+        q.bar <- diag(ncol(X))
+        PEcAn.logger::logger.warn('Process variance not estimated. Analysis has been given uninformative process variance')
+      }
+      Pf <- stats::cov(X)
+      Pa <- Pf
+      enkf.params[[obs.t]] <- list(mu.f = mu.f, Pf = Pf, mu.a = mu.a, Pa = Pa)
     }
     ###-------------------------------------------------------------------###
     ### adjust/update state matrix                                         ###
     ###-------------------------------------------------------------------###----
-    analysis <- enkf.params[[obs.t]]$analysis
+    # if we don't have the analysis from the analysis function.
+    if (is.null(enkf.params[[obs.t]]$analysis)) {
+      analysis <- as.data.frame(mvtnorm::rmvnorm(as.numeric(nrow(X)), mu.a, Pa, method = "svd"))
+    } else {
+      analysis <- enkf.params[[obs.t]]$analysis
+    }
     enkf.params[[obs.t]]$analysis <- NULL
     ##### Mapping analysis vectors to be in bounds of state variables
     for(i in 1:ncol(analysis)){
@@ -410,7 +460,9 @@ sda.enkf_local <- function(settings,
                         enkf.params = enkf.params[[obs.t]],
                         ens_weights[[obs.t]],
                         params.list = params.list,
-                        restart.list = restart.list)
+                        restart.list = restart.list,
+                        debias.out = debias.out)
+    # save the file to the out folder.
     save(sda.outputs, file = file.path(settings$outdir, paste0("sda.output", t, ".Rdata")))
     # remove files as SDA runs
@@ -442,6 +494,23 @@ sda.enkf_local <- function(settings,
   names(analysis.all) <- as.character(lubridate::date(obs.times))
   names(forecast.all) <- as.character(lubridate::date(obs.times))
   save(list = c("analysis.all", "forecast.all"), file = file.path(settings$outdir, "sda.all.forecast.analysis.Rdata"))
+  # merge NC files.
+  if (control$merge_nc) {
+    nc.folder <- file.path(settings$outdir, "merged_nc")
+    if (file.exists(nc.folder)) unlink(nc.folder, recursive = TRUE)
+    dir.create(nc.folder)
+    temp <- PEcAn.utils::nc_merge_all_sites_by_year(model.outdir = outdir,
+                                                    nc.outdir = nc.folder,
+                                                    ens.num = nens,
+                                                    site.ids = as.numeric(site.ids),
+                                                    start.date = obs.times[1],
+                                                    end.date = obs.times[length(obs.times)],
+                                                    time.step = paste(1, settings$state.data.assimilation$forecast.time.step),
+                                                    cores = cores)
+    # remove rundir and outdir.
+    unlink(rundir, recursive = T)
+    unlink(outdir, recursive = T)
+  }
   gc()
 }
@@ -466,10 +535,23 @@ sda.enkf_local <- function(settings,
 #' `MCMC.args` include lists for controlling the MCMC sampling process (iteration, nchains, burnin, and nthin).
 #' @param block.index list of site ids for each block, default is NULL. This is used when localization is turned on.
 #' Please keep using the default value because the localization feature is still in development.
+#' @param debias List: R list containing the covariate directory and the start year.
+#' The covariate directory should contain GeoTIFF files named by year.
+#' The start year is a numeric value that decides when to start the debiasing feature.
+#'
 #' @author Dongchen Zhang
 #' @return NONE
 #' @export
-qsub_sda <- function(settings, obs.mean, obs.cov, Q, pre_enkf_params, ensemble.samples, outdir, control, block.index = NULL) {
+qsub_sda <- function(settings,
+                     obs.mean,
+                     obs.cov,
+                     Q,
+                     pre_enkf_params,
+                     ensemble.samples,
+                     outdir,
+                     control,
+                     block.index = NULL,
+                     debias = list(cov.dir = NULL, start.year = NULL)) {
   # read from settings.
   L <- length(settings)
   # grab info from settings.
@@ -549,6 +631,7 @@ qsub_sda <- function(settings, obs.mean, obs.cov, Q, pre_enkf_params, ensemble.s
                      outdir = folder.path, # outdir
                      cores = cores,
                      control = control,
+                     debias = debias,
                      site.ids = block.site.inds)
     saveRDS(configs, file = file.path(folder.path, "configs.rds"))
     # create job file.
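For reference, a hedged sketch of how the batch entry point is now invoked with the new argument, mirroring the anchor runner script later in this diff (the covariate path and start year are placeholders, not values prescribed by the PR):

PEcAnAssimSequential::qsub_sda(settings = settings,
                               obs.mean = obs.mean,
                               obs.cov = obs.cov,
                               Q = NULL,
                               pre_enkf_params = NULL,
                               ensemble.samples = NULL,
                               outdir = NULL,
                               control = list(TimeseriesPlot = FALSE,
                                              OutlierDetection = FALSE,
                                              send_email = NULL,
                                              keepNC = FALSE,
                                              forceRun = TRUE,
                                              MCMC.args = NULL,
                                              merge_nc = TRUE),
                               block.index = NULL,
                               debias = list(cov.dir = "/path/to/covariates",
                                             start.year = 2014))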
@@ -596,7 +679,8 @@ qsub_sda_batch <- function(folder.path) {
                 configs$pre_enkf_params,
                 configs$ensemble.samples,
                 configs$outdir,
-                configs$control)
+                configs$control,
+                configs$debias)
 }
 
 ##' This function can help to assemble sda outputs (analysis and forecasts) from each job execution.
diff --git a/modules/assim.sequential/R/sda_bias_correction.R b/modules/assim.sequential/R/sda_bias_correction.R
new file mode 100644
index 00000000000..b9e17874e85
--- /dev/null
+++ b/modules/assim.sequential/R/sda_bias_correction.R
@@ -0,0 +1,143 @@
+#' @description
+#' This function helps to correct forecast biases using
+#' ML (random forest) models trained on the previous time point.
+#' @title sda_bias_correction
+#'
+#' @param site.locs data.frame: data.frame that contains longitude and latitude in its first and second columns.
+#' @param t numeric: the index of the current time point (e.g., t = 1 for the first time point).
+#' @param pre.X data.frame: data frame of model forecasts at the previous time point
+#' that has n (ensemble size) rows and n.var (number of variables) times n.site (number of locations) columns
+#' (e.g., 100 ensemble members, 4 variables, and 8,000 locations yield a data frame of 100 rows and 32,000 columns).
+#' @param X data.frame: data frame of model forecasts at the current time point.
+#' @param obs.mean List: lists of date times named by time points, which contains lists of sites named by site ids,
+#' which contains observation means for each state variable of each site for each time point.
+#' @param state.interval matrix: containing the upper and lower boundaries for each state variable.
+#' @param cov.dir character: physical path to the directory that contains the time-series covariate maps.
+#' @param py.init R function: R function to initialize the python functions. Default is NULL.
+#' The default random forest will be used if `py.init` is NULL.
+#' @param pre.states list: containing the previous covariates for each location.
+#'
+#' @return list: the current X after the bias correction; the ML model for each variable; and the predicted residuals.
+#'
+#' @author Dongchen Zhang
+#' @importFrom dplyr %>%
+
+sda_bias_correction <- function (site.locs,
+                                 t, pre.X, X,
+                                 obs.mean,
+                                 state.interval,
+                                 cov.dir,
+                                 pre.states,
+                                 py.init = NULL) {
+  # if we have a prescribed python script to use.
+  if (!is.null(py.init)) {
+    # load python functions.
+    py <- py.init()
+  }
+  # grab variable names.
+  var.names <- rownames(state.interval)
+  # create terra spatial points.
+  pts <- terra::vect(cbind(site.locs$Lon, site.locs$Lat), crs = "epsg:4326")
+  # grab the current year.
+  y <- lubridate::year(names(obs.mean))[t]
+  # grab the covariate file path for the previous time point.
+  cov.file.pre <- list.files(cov.dir, full.names = T)[which(grepl(y-1, list.files(cov.dir)))] # previous covariates.
+  # extract covariates for the previous time point.
+  cov.pre <- terra::extract(x = terra::rast(cov.file.pre), y = pts)[,-1] # remove the first ID column.
+  # factorize land cover band.
+  if ("LC" %in% colnames(cov.pre)) {
+    cov.pre[,"LC"] <- factor(cov.pre[,"LC"])
+  }
+  # extract covariates for the current time point.
+  cov.file <- list.files(cov.dir, full.names = T)[which(grepl(y, list.files(cov.dir)))] # current covariates.
+  cov.current <- terra::extract(x = terra::rast(cov.file), y = pts)[,-1] # remove the first ID column.
+  complete.inds <- which(stats::complete.cases(cov.current))
+  cov.current <- cov.current[complete.inds,]
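The year-based lookup above implies a covariate directory with one GeoTIFF stack per year; the exact file names are not fixed by this PR, only that they contain the four-digit year. A hedged standalone sketch of the lookup and extraction, assuming a layout like cov.dir/covariates_2013.tiff, cov.dir/covariates_2014.tiff (paths and coordinates are illustrative):

library(terra)
cov.dir <- "/path/to/covariates"                         # illustrative path
files <- list.files(cov.dir, full.names = TRUE)
cov.file <- files[grepl(2014, list.files(cov.dir))]      # current-year stack
pts <- terra::vect(cbind(-72.17, 42.54), crs = "epsg:4326")
cov <- terra::extract(terra::rast(cov.file), pts)[, -1]  # drop the ID column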
+  # factorize land cover band.
+  if ("LC" %in% colnames(cov.current)) {
+    cov.current[,"LC"] <- factor(cov.current[,"LC"])
+  }
+  cov.names <- colnames(cov.current) # grab band names for the covariate map.
+  # loop over variables.
+  # initialize model list for each variable.
+  models <- res.vars <- vector("list", length = length(var.names)) %>% purrr::set_names(var.names)
+  for (v in var.names) {
+    message(paste("processing", v))
+    # train on residuals from the previous time point.
+    # grab column indices for the current variable.
+    inds <- which(grepl(v, colnames(pre.X)))
+    # grab observations for the current variable.
+    obs.v <- obs.mean[[t-1]] %>% purrr::map(function(obs){
+      if (is.null(obs[[v]])) {
+        return(NA)
+      } else {
+        return(obs[[v]])
+      }
+    }) %>% unlist
+    # calculate residuals for the previous time point.
+    res.pre <- colMeans(pre.X[,inds]) - obs.v
+    # prepare the training data set.
+    ml.df <- cbind(cov.pre, colMeans(pre.X)[inds], res.pre)
+    colnames(ml.df)[length(ml.df)-1] <- "raw_dat" # name the forecast-mean column.
+    ml.df <- rbind(pre.states[[v]], ml.df) # prepend previously accumulated covariates.
+    ml.df <- ml.df[which(stats::complete.cases(ml.df)),]
+    pre.states[[v]] <- ml.df # store the historical covariates for future use.
+    # prepare the prediction covariates.
+    cov.df <- cbind(cov.current, colMeans(X)[inds[complete.inds]])
+    colnames(cov.df)[length(cov.df)] <- "raw_dat"
+    if (nrow(ml.df) == 0) next # skip to the next variable if we have zero records.
+    if (is.null(py.init)) {
+      # random forest training.
+      formula <- stats::as.formula(paste("res.pre", "~", paste(cov.names, collapse = " + ")))
+      model <- randomForest::randomForest(formula,
+                                          data = ml.df,
+                                          ntree = 1000,
+                                          na.action = stats::na.omit,
+                                          keep.forest = TRUE,
+                                          importance = TRUE)
+      var.imp <- randomForest::importance(model)
+      models[[v]] <- var.imp # store the variable importance.
+      # predict residuals for the current time point.
+      res.current <- stats::predict(model, cov.df)
+    } else {
+      # using functions from the python script.
+      # training.
+      fi_ret <- py$train_full_model(
+        name = as.character(v), # current variable name.
+        X = as.matrix(ml.df[,-length(ml.df)]), # covariates + previous forecast means.
+        y = as.numeric(ml.df[["res.pre"]]), # residuals.
+        feature_names = colnames(ml.df[,-length(ml.df)])
+      )
+      # predicting.
+      res.current <- py$predict_residual(as.character(v), as.matrix(cov.df))
+      # store model outputs.
+      # weights.
+      w_now <- try(py$get_model_weights(as.character(v)), silent = TRUE)
+      w_now <- min(max(as.numeric(w_now), 0), 1)
+      w_named <- c(KNN = w_now, TREE = 1 - w_now)
+      # variable importance.
+      fi_ret <- tryCatch(reticulate::py_to_r(fi_ret), error = function(e) fi_ret)
+      fn <- as.character(unlist(fi_ret[["names"]], use.names = FALSE))
+      fv <- as.numeric(unlist(fi_ret[["importances"]], use.names = FALSE)) %>% purrr::set_names(fn)
+      models[[v]] <- list(weights = w_named, var.imp = fv) # store the variable importance.
+    }
+    # assign NAs to places with no observations at the previous time point.
+    res <- rep(NA, length(obs.v)) %>% purrr::set_names(unique(attributes(X)$Site))
+    res[complete.inds] <- res.current
+    res[which(is.na(obs.v))] <- NA
+    res.vars[[v]] <- res
+    # correct the current forecasts.
+    for (i in seq_along(inds)) {
+      if (is.na(res[i])) next
+      X[,inds[i]] <- X[,inds[i]] - res[i]
+    }
+  }
+  # map forecasts towards the prescribed variable boundaries.
+ for(i in 1:ncol(X)){ + int.save <- state.interval[which(startsWith(colnames(X)[i], var.names)),] + X[X[,i] < int.save[1],i] <- int.save[1] + X[X[,i] > int.save[2],i] <- int.save[2] + } + return(list(X = X, models = models, res = res.vars, pre.states = pre.states)) +} \ No newline at end of file diff --git a/modules/assim.sequential/inst/anchor/SDA_NA_runner.R b/modules/assim.sequential/inst/anchor/SDA_NA_runner.R index 0760c8ca0aa..0ffd1648d8d 100644 --- a/modules/assim.sequential/inst/anchor/SDA_NA_runner.R +++ b/modules/assim.sequential/inst/anchor/SDA_NA_runner.R @@ -29,8 +29,11 @@ setwd("/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site/") settings_dir <- "/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site/pecan.xml" settings <- PEcAn.settings::read.settings(settings_dir) +# update settings with the actual PFTs. +settings <- PEcAn.settings::prepare.settings(settings) + # setup the batch job settings. -general.job <- list(cores = 28, folder.num = 80) +general.job <- list(cores = 28, folder.num = 35) batch.settings = structure(list( general.job = general.job, qsub.cmd = "qsub -l h_rt=24:00:00 -l mem_per_core=4G -l buyin -pe omp @CORES@ -V -N @NAME@ -o @STDOUT@ -e @STDERR@ -S /bin/bash" @@ -38,14 +41,11 @@ batch.settings = structure(list( settings$state.data.assimilation$batch.settings <- batch.settings # alter the ensemble size. -settings$ensemble$size <- 10 - -# update settings with the actual PFTs. -settings <- PEcAn.settings::prepare.settings(settings) +settings$ensemble$size <- 100 # load observations. -load("/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site/observation/Rdata/obs_mean.Rdata") -load("/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site/observation/Rdata/obs_cov.Rdata") +load("/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site/observation/Rdata/obs.mean.Rdata") +load("/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site/observation/Rdata/obs.cov.Rdata") # replace zero observations and variances with small numbers. for (i in 1:length(obs.mean)) { @@ -83,4 +83,11 @@ PEcAnAssimSequential::qsub_sda(settings = settings, send_email = NULL, keepNC = FALSE, forceRun = TRUE, - MCMC.args = NULL)) \ No newline at end of file + MCMC.args = NULL, + merge_nc = TRUE), + block.index = NULL, + debias = list(cov.dir = "/projectnb/dietzelab/dongchen/anchorSites/NA_runs/covariates_lc_ts/covariates_nolatlon/", start.year = 2014)) + +# export sda output. 
+PEcAnAssimSequential::sda_assemble("/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site/batch", + "/projectnb/dietzelab/dongchen/anchorSites/NA_runs/SDA_8k_site") \ No newline at end of file diff --git a/modules/assim.sequential/R/debias_step.R b/modules/assim.sequential/inst/debias_step.R similarity index 100% rename from modules/assim.sequential/R/debias_step.R rename to modules/assim.sequential/inst/debias_step.R diff --git a/modules/assim.sequential/man/debias_cov_by_columns.Rd b/modules/assim.sequential/man/debias_cov_by_columns.Rd deleted file mode 100644 index c62a88cd09f..00000000000 --- a/modules/assim.sequential/man/debias_cov_by_columns.Rd +++ /dev/null @@ -1,42 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/debias_step.R -\name{debias_cov_by_columns} -\alias{debias_cov_by_columns} -\title{Expand per-site covariates to row-per-column alignment} -\usage{ -debias_cov_by_columns( - covariates_df, - obs_date, - site_index, - obs.mean, - t_idx, - drop_incomplete_covariates = TRUE, - enforce_consistent_obs = TRUE -) -} -\arguments{ -\item{covariates_df}{See `debias_get_covariates_for_date()`.} - -\item{obs_date}{Date for which to fetch covariates (year is used).} - -\item{site_index}{Site id per column of `X`.} - -\item{obs.mean}{Observation structure used if enforcing consistency.} - -\item{t_idx}{Time index associated with `obs_date`.} - -\item{drop_incomplete_covariates, }{enforce_consistent_obs See above.} -} -\value{ -A numeric matrix with **nrow = length(site_index)** and one column per - covariate feature. Rows align 1:1 with the columns of `X`. -} -\description{ -Expand per-site covariates to **row-per-column** alignment -} -\details{ -Converts the per-site covariate data into a matrix aligned with the **columns of `X`**. -For any column whose site was filtered out, the function inserts a row of `NA` -features to preserve shape and ordering. -} -\keyword{internal} diff --git a/modules/assim.sequential/man/debias_get_covariates_for_date.Rd b/modules/assim.sequential/man/debias_get_covariates_for_date.Rd deleted file mode 100644 index 5cf7078c3be..00000000000 --- a/modules/assim.sequential/man/debias_get_covariates_for_date.Rd +++ /dev/null @@ -1,52 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/debias_step.R -\name{debias_get_covariates_for_date} -\alias{debias_get_covariates_for_date} -\title{Covariates for a date, with optional site filtering} -\usage{ -debias_get_covariates_for_date( - covariates_df, - obs_date, - site_index, - obs.mean, - t_idx, - drop_incomplete_covariates = TRUE, - enforce_consistent_obs = TRUE -) -} -\arguments{ -\item{covariates_df}{Long data frame with columns `site`, `year`, and feature columns.} - -\item{obs_date}{Date–time corresponding to the **previous or current** SDA step. -Only the **year** is used.} - -\item{site_index}{Character or numeric vector giving the **site id per column of X**.} - -\item{obs.mean}{Observation structure (see `debias_sites_inconsistent_obs()`).} - -\item{t_idx}{Integer time index used to evaluate consistency up to t or t–1.} - -\item{drop_incomplete_covariates}{Logical. If `TRUE`, drop sites with any missing -covariate this year; if `FALSE`, keep all sites present in `covariates_df` for that year.} - -\item{enforce_consistent_obs}{Logical. If `TRUE`, drop sites that became inconsistent -in observations up to `t_idx`. 
Requires `obs.mean` and `t_idx`.} -} -\value{ -A data frame with columns `site`, `year`, and covariate features for **eligible sites**. - Attributes: - - `dropped_missing_covariates`: sites removed for missing covariates, - - `dropped_inconsistent_obs`: sites removed for observation inconsistency (if enforced). -} -\description{ -Covariates for a date, with optional site filtering -} -\details{ -Filters sites for the year of `obs_date` according to: -- `drop_incomplete_covariates`: remove sites with any NA in covariate features. -- `enforce_consistent_obs`: remove sites that became inconsistent up to `t_idx`. - -Returns a data frame for that **year × eligible sites**, sorted by `site`, with -attributes listing which sites were dropped (useful for logging). -} -\keyword{internal} diff --git a/modules/assim.sequential/man/debias_helpers.Rd b/modules/assim.sequential/man/debias_helpers.Rd deleted file mode 100644 index e28b5686ae4..00000000000 --- a/modules/assim.sequential/man/debias_helpers.Rd +++ /dev/null @@ -1,103 +0,0 @@ -% Generated by roxygen2: do not edit by hand -% Please edit documentation in R/debias_step.R -\docType{data} -\name{debias_name_map} -\alias{debias_name_map} -\alias{debias_sites_with_complete_covariates_year} -\alias{debias_sites_inconsistent_obs} -\alias{debias_obs_vec_for_time} -\alias{debias_build_comp_df} -\alias{debias_rmse_by_var} -\title{Map observation names -> state names} -\format{ -An object of class \code{character} of length 4. -} -\usage{ -debias_name_map - -debias_sites_with_complete_covariates_year( - covariates_df, - year, - candidate_sites -) - -debias_sites_inconsistent_obs(obs.mean, t_idx, name_map = debias_name_map) - -debias_obs_vec_for_time( - t_idx, - site_index, - col_vars, - obs.mean, - name_map = debias_name_map -) - -debias_build_comp_df(site_index, col_vars, pre_mean, post_mean, obs_vec) - -debias_rmse_by_var(comp_df) -} -\arguments{ -\item{covariates_df}{A long data frame with columns `site`, `year`, and one column -per covariate feature (e.g., climate, soils, topography layers).} - -\item{year}{Integer year to check (extracted via `lubridate::year()` elsewhere).} - -\item{candidate_sites}{Character vector of site ids to intersect with.} - -\item{obs.mean}{Observation structure (list by time → list by site → named numeric).} - -\item{t_idx}{Time index (1-based) to pull observations from `obs.mean`.} - -\item{name_map}{Optional map from observation names → state names.} - -\item{site_index}{Site id per column of `X`.} - -\item{col_vars}{Variable name per column of `X`.} - -\item{pre_mean}{Vector of pre-debias column means at time `t`.} - -\item{post_mean}{Vector of post-debias column means at time `t`.} - -\item{obs_vec}{Observation vector for time `t` (aligned).} - -\item{comp_df}{Output of `debias_build_comp_df()`.} -} -\value{ -Character vector of sites that have no missing covariates in `year`. - -Character vector of **site ids** that are inconsistent at `t_idx`. - -Numeric vector of length `length(col_vars)` with `NA` where not observed. - -Data frame with columns: `site`, `var`, `pre`, `post`, `obs` - sorted by (`var`, `site`). - -Data frame with columns: `var`, `rmse_pre`, `rmse_post`. -} -\description{ -This mapping is applied whenever observations are merged into the state layout. -If your upstream naming changes, **edit here** to keep the rest of the code stable. - -Returns the subset of `candidate_sites` whose covariate row at `year` has **no NA** -in any covariate feature column. 
-
-A site is flagged **inconsistent at `t_idx`** if it is missing *any* variable at time
-`t_idx` that it had reported in **any earlier time** (1..`t_idx-1`). This prevents
-training/evaluation on sites that drop variables mid-series.
-
-Builds a vector with one entry per **column of `X`**, using the `(site, var)` layout
-defined by `site_index` and `col_vars`. Observation names in `obs.mean` are
-remapped through `name_map`.
-
-Computes RMSE for each state variable, separately for the pre- and post-debias
-column means against observations. NAs are ignored per variable.
-}
-\details{
-This function only inspects **presence/absence** of variables, not their values.
-}
-\examples{
-\dontrun{
-ok <- debias_sites_with_complete_covariates_year(cov_df, 2012, sites)
-}
-
-}
-\keyword{internal}
diff --git a/modules/assim.sequential/man/debias_weights_rows.Rd b/modules/assim.sequential/man/debias_weights_rows.Rd
deleted file mode 100644
index 511fa1dd0ae..00000000000
--- a/modules/assim.sequential/man/debias_weights_rows.Rd
+++ /dev/null
@@ -1,13 +0,0 @@
-% Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/debias_step.R
-\name{debias_weights_rows}
-\alias{debias_weights_rows}
-\title{Build tidy rows for learner weights (internal)}
-\usage{
-debias_weights_rows(time_label, var, w_named)
-}
-\description{
-Returns a data.frame with columns: time, var, learner, weight.
-If `w_named` has no names, they are auto-labeled as learner_1..k.
-}
-\keyword{internal}
diff --git a/modules/assim.sequential/man/qsub_sda.Rd b/modules/assim.sequential/man/qsub_sda.Rd
index a0222e68a68..dec1d7679d8 100644
--- a/modules/assim.sequential/man/qsub_sda.Rd
+++ b/modules/assim.sequential/man/qsub_sda.Rd
@@ -13,7 +13,8 @@ qsub_sda(
   ensemble.samples,
   outdir,
   control,
-  block.index = NULL
+  block.index = NULL,
+  debias = list(cov.dir = NULL, start.year = NULL)
 )
}
\arguments{
@@ -42,6 +43,10 @@ The default is NULL, where we will be using outdir from the settings object.}
 
 \item{block.index}{list of site ids for each block, default is NULL. This is used when localization is turned on.
 Please keep using the default value because the localization feature is still in development.}
+
+\item{debias}{List: R list containing the covariate directory and the start year.
+The covariate directory should contain GeoTIFF files named by year.
+The start year is a numeric value that decides when to start the debiasing feature.}
 }
 \value{
 NONE
diff --git a/modules/assim.sequential/man/sda.enkf.multisite.Rd b/modules/assim.sequential/man/sda.enkf.multisite.Rd
index 008f1a81f83..e24a8b816cd 100644
--- a/modules/assim.sequential/man/sda.enkf.multisite.Rd
+++ b/modules/assim.sequential/man/sda.enkf.multisite.Rd
@@ -12,14 +12,10 @@ sda.enkf.multisite(
   restart = NULL,
   pre_enkf_params = NULL,
   ensemble.samples = NULL,
-  control = list(trace = TRUE, TimeseriesPlot = FALSE, debug = FALSE, pause = FALSE,
-    Profiling = FALSE, OutlierDetection = FALSE, parallel_qsub = TRUE, send_email = NULL,
-    keepNC = TRUE, forceRun = TRUE, run_parallel = TRUE, MCMC.args = NULL),
-  cov_dir = NULL,
-  debias_start_year = NULL,
-  debias_drop_incomplete_covariates = TRUE,
-  debias_enforce_consistent_obs = TRUE,
-  debias_require_obs_at_t_for_predict = FALSE,
+  control = list(TimeseriesPlot = FALSE, OutlierDetection = FALSE, send_email = NULL,
+    keepNC = TRUE, forceRun = TRUE, run_parallel = TRUE, MCMC.args = NULL, merge_nc =
+    TRUE, execution = "local"),
+  debias = list(cov.dir = NULL, start.year = NULL),
   ...
)
}
@@ -36,31 +32,26 @@ sda.enkf.multisite(
 \item{pre_enkf_params}{Used for passing pre-existing time-series of process error into the current SDA runs to ignore the impact by the differences between process errors.}
 
-\item{ensemble.samples}{Pass ensemble.samples from outside to avoid GitHub check issues.}
+\item{ensemble.samples}{list of ensemble parameters across PFTs. Default is NULL.}
 
 \item{control}{List of flags controlling the behavior of the SDA.
-`trace` for reporting back the SDA outcomes;
 `TimeseriesPlot` for post analysis examination;
-`debug` decide if we want to pause the code and examining the variables inside the function;
-`pause` decide if we want to pause the SDA workflow at current time point t;
-`Profiling` decide if we want to export the temporal SDA outputs in CSV file;
 `OutlierDetection` decide if we want to execute the outlier detection each time after the model forecasting;
-`parallel_qsub` decide if we want to execute the `qsub` job submission under parallel mode;
 `send_email` contains lists for sending email to report the SDA progress;
 `keepNC` decide if we want to keep the NetCDF files inside the out directory;
 `forceRun` decide if we want to proceed the Bayesian MCMC sampling without observations;
 `run_parallel` decide if we want to run the SDA under parallel mode for the `future_map` function;
-`MCMC.args` include lists for controling the MCMC sampling process (iteration, nchains, burnin, and nthin.).}
+`MCMC.args` include lists for controlling the MCMC sampling process (iteration, nchains, burnin, and nthin).
+`merge_nc` determines whether we want to merge all netCDF files across sites and ensembles.
+If set to `TRUE`, all netCDF files are combined into the `merged_nc` folder within the `outdir`.
+`execution` decides how we want to execute the model,
+including `local`, where we execute the model locally;
+`qsub`, where we use the traditional `start_model_runs` function for submission;
+`qsub_parallel`, where we first combine jobs and submit them to the SCC.}
 
-\item{cov_dir}{Directory containing yearly covariate stacks named like "covariates_YYYY.tiff".}
-
-\item{debias_start_year}{Integer year (e.g., 2015). If `NULL`, debiasing is OFF.}
-
-\item{debias_drop_incomplete_covariates}{Logical; drop sites with any NA covariates}
-
-\item{debias_enforce_consistent_obs}{Logical; drop sites that lost any previously}
-
-\item{debias_require_obs_at_t_for_predict}{Logical; only make residual predictions}
+\item{debias}{List: R list containing the covariate directory and the start year.
+The covariate directory should contain GeoTIFF files named by year.
+The start year is a numeric value that decides when to start the debiasing feature.}
 
 \item{...}{Additional arguments, currently ignored}
 }
diff --git a/modules/assim.sequential/man/sda.enkf_local.Rd b/modules/assim.sequential/man/sda.enkf_local.Rd
index 78b108d1d07..f804c7d1382 100644
--- a/modules/assim.sequential/man/sda.enkf_local.Rd
+++ b/modules/assim.sequential/man/sda.enkf_local.Rd
@@ -13,7 +13,8 @@ sda.enkf_local(
   ensemble.samples = NULL,
   outdir = NULL,
   control = list(TimeseriesPlot = FALSE, OutlierDetection = FALSE, send_email = NULL,
-    keepNC = TRUE, forceRun = TRUE, MCMC.args = NULL)
+    keepNC = TRUE, forceRun = TRUE, MCMC.args = NULL, merge_nc = TRUE),
+  debias = list(cov.dir = NULL, start.year = NULL)
 )
}
\arguments{
@@ -27,7 +28,7 @@ sda.enkf_local(
 \item{pre_enkf_params}{Used for passing pre-existing time-series of process error into the current SDA runs to ignore the impact by the differences between process errors.}
 
-\item{ensemble.samples}{Pass ensemble.samples from outside to avoid GitHub check issues.}
+\item{ensemble.samples}{list of ensemble parameters across PFTs. Default is NULL.}
 
 \item{outdir}{physical path to the folder that stores the SDA outputs. Default is NULL.}
 
@@ -37,7 +38,13 @@ sda.enkf_local(
 `send_email` contains lists for sending email to report the SDA progress;
 `keepNC` decide if we want to keep the NetCDF files inside the out directory;
 `forceRun` decide if we want to proceed the Bayesian MCMC sampling without observations;
-`MCMC.args` include lists for controling the MCMC sampling process (iteration, nchains, burnin, and nthin.).}
+`MCMC.args` include lists for controlling the MCMC sampling process (iteration, nchains, burnin, and nthin).
+`merge_nc` determines whether we want to merge all netCDF files across sites and ensembles.
+If set to `TRUE`, all netCDF files are combined into the `merged_nc` folder within the `outdir`.}
+
+\item{debias}{List: R list containing the covariate directory and the start year.
+The covariate directory should contain GeoTIFF files named by year.
+The start year is a numeric value that decides when to start the debiasing feature.}
 }
 \value{
 NONE
diff --git a/modules/assim.sequential/man/sda_apply_debias_step.Rd b/modules/assim.sequential/man/sda_apply_debias_step.Rd
deleted file mode 100644
index fe105608563..00000000000
--- a/modules/assim.sequential/man/sda_apply_debias_step.Rd
+++ /dev/null
@@ -1,105 +0,0 @@
-% Generated by roxygen2: do not edit by hand
-% Please edit documentation in R/debias_step.R
-\name{sda_apply_debias_step}
-\alias{sda_apply_debias_step}
-\title{Apply residual debiasing for a single SDA time step}
-\usage{
-sda_apply_debias_step(
-  t,
-  obs.t,
-  X,
-  raw_prev,
-  raw_mean_t,
-  site_index,
-  col_vars,
-  obs.times,
-  obs.mean,
-  covariates_df,
-  py,
-  train_buf,
-  name_map = debias_name_map,
-  drop_incomplete_covariates = TRUE,
-  enforce_consistent_obs = TRUE,
-  require_obs_at_t_for_predict = FALSE,
-  state.interval = state.interval,
-  clip_lower_bound = 0.01
-)
-}
-\arguments{
-\item{t}{Integer current time index (t > 1 required to train from t–1).}
-
-\item{obs.t}{Character or time label for logging (e.g., ISO date string for `t`).}
-
-\item{X}{Numeric matrix of the **current** ensemble at time `t` (members × columns).}
-
-\item{raw_prev}{Numeric vector of the **raw** column mean at `t-1`.}
-
-\item{raw_mean_t}{Numeric vector of the **raw** column mean at `t`.}
-
-\item{site_index}{Vector of site ids per column of `X`.}
-
-\item{col_vars}{Vector of variable names per column of `X` (state-space names).}
-
-\item{obs.times}{Datetime vector indexed by `t` (length ≥ `t`) for covariate year lookup.}
-
-\item{obs.mean}{Observation structure (time → site → named numeric vector).}
-
-\item{covariates_df}{Long data frame with columns `site`, `year`, and feature columns.}
-
-\item{py}{Python bridge object exposing:
-- `train_full_model(name, X, y)`,
-- `predict_residual(name, X)`,
-- `get_model_weights(name)`,
-- `has_model(name)` (logical).}
-
-\item{train_buf}{R environment used to accumulate training rows per variable across steps.}
-
-\item{name_map}{Optional map from observation names → state names.}
-
-\item{drop_incomplete_covariates}{Logical; if `TRUE`, drop sites with missing covariates (per year).}
-
-\item{enforce_consistent_obs}{Logical; if `TRUE`, drop sites inconsistent up to relevant `t_idx`.}
-
-\item{require_obs_at_t_for_predict}{Logical; if `TRUE`, only predict residuals for columns with}
-
-\item{state.interval}{Matrix/data.frame of per-variable bounds; either rownames = variable with columns `min`,`max`,
-or a data frame with a `variable` column plus `min`/`max`. Used to clip post-debias values.}
-
-\item{clip_lower_bound}{Numeric; minimum floor applied to lower bounds (default 0.01).
-**observations present at t** (useful for constrained comparisons).}
-}
-\value{
-A list with:
-\describe{
-  \item{X}{The **mean-shifted** ensemble matrix at time `t` (same dims as input `X`).}
-  \item{weights_entry}{Optional named list of learner weights by variable (if provided by `py`).}
-  \item{weights_df_rows}{A tidy data frame of weights emitted this step (time, var, learner, weight).}
-  \item{diag}{A list with:
-    \itemize{
-      \item `comp`: per-column comparison table (`pre`, `post`, `obs`) for diagnostics.
-      \item `rmse`: per-variable metrics (RMSE/MAE/bias/R²) **pre vs post**.
-    }
-  }
-  \item{rmse_rows}{A tidy slice of metrics with the `time` column attached for easy logging.}
-}
-}
-\description{
-Apply residual debiasing for a single SDA time step
-}
-\details{
-At time `t`, this function:
-1. Builds a training set from **t–1**: residuals `y = obs_prev - raw_prev` and features
-   `[covariates_prev, raw_prev]` for each variable.
-2. Trains/updates the Python-side learner (`py$train_full_model`).
-3. Predicts residuals at **t** using `[covariates_t, raw_mean_t]`.
-4. Mean-shifts the ensemble `X` by adding predicted residuals to `raw_mean_t`.
-5. Computes per-variable metrics (RMSE, MAE, bias, R²) pre vs post.
-}
-\note{
-- If covariates are missing (no feature columns) for either `t-1` or `t`, the function
-  returns `X` unchanged and emits NA metrics (shape-preserving behavior).
-- Predicted residuals that are non-finite are coerced to 0 to avoid contaminating `X`.
-- The **mean-shift** keeps the ensemble spread intact: we subtract the raw mean and
-  add the corrected mean (`raw_mean_t + predicted_residual`).
-}
-\keyword{internal}
diff --git a/modules/assim.sequential/man/sda_bias_correction.Rd b/modules/assim.sequential/man/sda_bias_correction.Rd
new file mode 100644
index 00000000000..1d201c2873f
--- /dev/null
+++ b/modules/assim.sequential/man/sda_bias_correction.Rd
@@ -0,0 +1,51 @@
+% Generated by roxygen2: do not edit by hand
+% Please edit documentation in R/sda_bias_correction.R
+\name{sda_bias_correction}
+\alias{sda_bias_correction}
+\title{sda_bias_correction}
+\usage{
+sda_bias_correction(
+  site.locs,
+  t,
+  pre.X,
+  X,
+  obs.mean,
+  state.interval,
+  cov.dir,
+  pre.states,
+  py.init = NULL
+)
+}
+\arguments{
+\item{site.locs}{data.frame: data.frame that contains longitude and latitude in its first and second columns.}
+
+\item{t}{numeric: the index of the current time point (e.g., t = 1 for the first time point).}
+
+\item{pre.X}{data.frame: data frame of model forecasts at the previous time point
+that has n (ensemble size) rows and n.var (number of variables) times n.site (number of locations) columns
+(e.g., 100 ensemble members, 4 variables, and 8,000 locations yield a data frame of 100 rows and 32,000 columns).}
+
+\item{X}{data.frame: data frame of model forecasts at the current time point.}
+
+\item{obs.mean}{List: lists of date times named by time points, which contains lists of sites named by site ids,
+which contains observation means for each state variable of each site for each time point.}
+
+\item{state.interval}{matrix: containing the upper and lower boundaries for each state variable.}
+
+\item{cov.dir}{character: physical path to the directory that contains the time-series covariate maps.}
+
+\item{pre.states}{list: containing the previous covariates for each location.}
+
+\item{py.init}{R function: R function to initialize the python functions. Default is NULL.
+The default random forest will be used if `py.init` is NULL.}
+}
+\value{
+list: the current X after the bias correction; the ML model for each variable; and the predicted residuals.
+}
+\description{
+This function helps to correct forecast biases using
+ML (random forest) models trained on the previous time point.
+}
+\author{
+Dongchen Zhang
+}
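The py.init argument documented above is only exercised through three methods inside sda_bias_correction(). The contract below is inferred from those call sites; the body is a trivial R stand-in (a linear model rather than the real python KNN/tree blend) purely to illustrate the expected shapes:

my_py_init <- function() {
  models <- new.env()
  list(
    # fit a learner for variable `name`; must return feature names/importances.
    train_full_model = function(name, X, y, feature_names) {
      models[[name]] <- stats::lm(y ~ ., data = as.data.frame(X))
      list(names = feature_names, importances = rep(1, length(feature_names)))
    },
    # predict residuals for new covariates.
    predict_residual = function(name, X) {
      stats::predict(models[[name]], as.data.frame(X))
    },
    # blend weight in [0, 1] between the two learners.
    get_model_weights = function(name) 0.5
  )
}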
diff --git a/modules/data.remote/R/SMAP_SMP_prep.R b/modules/data.remote/R/SMAP_SMP_prep.R
index 4ab2b3f0098..0c184a489b0 100644
--- a/modules/data.remote/R/SMAP_SMP_prep.R
+++ b/modules/data.remote/R/SMAP_SMP_prep.R
@@ -72,7 +72,7 @@ SMAP_SMP_prep <- function(site_info, start_date, end_date, time_points,
   time_points <- time_points[which(lubridate::year(time_points)>=2015)] #filter out any time points that are before 2015
   #initialize SMAP_Output
   SMAP_Output <- matrix(NA, length(site_info$site_id), 2*length(time_points)+1) %>%
-    `colnames<-`(c("site_id", paste0(time_points, "_SoilMoist"), paste0(time_points, "_SD"))) %>% as.data.frame()#we need: site_id, LAI, std, target time point.
+    `colnames<-`(c("site_id", paste0(time_points, "_SoilMoistFrac"), paste0(time_points, "_SD"))) %>% as.data.frame()#we need: site_id, soil moisture, std, target time point.
   SMAP_Output$site_id <- site_info$site_id
   #Calculate SMAP for each time step and site.
   #loop over time and site
@@ -88,15 +88,15 @@ SMAP_SMP_prep <- function(site_info, start_date, end_date, time_points,
         out.t <- rbind(out.t, list(mean = NA, sd = NA))
       }
     }
-    out.t %>% purrr::set_names(c(paste0(t, "_SoilMoist"), paste0(t, "_SD")))
+    out.t %>% purrr::set_names(c(paste0(t, "_SoilMoistFrac"), paste0(t, "_SD")))
   }, .progress = T)
   for (i in seq_along(time_points)) {
     t <- time_points[i]#otherwise the t will be a number instead of a date.
-    SMAP_Output[, paste0(t, "_SoilMoist")] <- SMAP.list[[i]][,paste0(t, "_SoilMoist")]
+    SMAP_Output[, paste0(t, "_SoilMoistFrac")] <- SMAP.list[[i]][,paste0(t, "_SoilMoistFrac")]
     SMAP_Output[, paste0(t, "_SD")] <- SMAP.list[[i]][,paste0(t, "_SD")]
   }
   PEcAn.logger::logger.info("SMAP SMP Prep Completed!")
-  list(SMP_Output = SMAP_Output, time_points = time_points, var = "SoilMoist")
+  list(SMP_Output = SMAP_Output, time_points = time_points, var = "SoilMoistFrac")
 }
 
 #' Prepare SMAP soil moisture profile (SMP) data from the NASA DAAC server for the SDA workflow.
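The rename from SoilMoist to SoilMoistFrac propagates into the output column names, which encode one mean and one standard-deviation column per time point. A quick illustration of the resulting layout:

time_points <- as.Date(c("2015-07-15", "2016-07-15"))
c("site_id", paste0(time_points, "_SoilMoistFrac"), paste0(time_points, "_SD"))
# "site_id" "2015-07-15_SoilMoistFrac" "2016-07-15_SoilMoistFrac"
# "2015-07-15_SD" "2016-07-15_SD"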
diff --git a/modules/uncertainty/R/ensemble.R b/modules/uncertainty/R/ensemble.R
index d44add22cf9..ca576aedcb4 100644
--- a/modules/uncertainty/R/ensemble.R
+++ b/modules/uncertainty/R/ensemble.R
@@ -211,7 +211,6 @@ get.ensemble.samples <- function( ensemble.size, pft.samples, env.samples,
 ##' @param clean remove old output first?
 ##' @param write.to.db logical: Record this run in BETY?
 ##' @param restart In case this is a continuation of an old simulation. restart needs to be a list with name tags of runid, inputs, new.params (parameters), new.state (initial condition), ensemble.id (ensemble id), start.time and stop.time. See Details.
-##' @param samples Sampled inputs such as met and parameter files
 ##' @param rename Decide if we want to rename previous output files, for example convert from sipnet.out to sipnet.2020-07-16.out.
 ##'
 ##' @return list, containing $runs = data frame of runids, $ensemble.id = the ensemble ID for these runs and $samples with ids and samples used for each tag. Also writes sensitivity analysis configuration files as a side effect
@@ -226,7 +225,7 @@ get.ensemble.samples <- function( ensemble.size, pft.samples, env.samples,
 ##' @export
 ##' @author David LeBauer, Carl Davidson, Hamze Dokoohaki
 write.ensemble.configs <- function(input_design , ensemble.size, defaults, ensemble.samples, settings, model,
-                                   clean = FALSE, write.to.db = TRUE, restart = NULL, samples = NULL, rename = FALSE) {
+                                   clean = FALSE, write.to.db = TRUE, restart = NULL, rename = FALSE) {
 
   # Check if there are NO inputs
@@ -320,22 +319,18 @@ for (input_tag in names(settings$run$inputs)) {
   }
   #now looking into the xml
   samp <- settings$ensemble$samplingspace
-  if(is.null(samples)){
-    #performing the sampling
-    samples <- list()
-    input_tags <- names(settings$run$inputs)
-    
-    for (input_tag in input_tags) {
-      if (input_tag %in% colnames(input_design)) {
-        input_paths <- settings$run$inputs[[input_tag]]$path
-        input_indices <- input_design[[input_tag]]
-        
-        samples[[input_tag]] <- list(
-          samples = lapply(input_indices, function(idx) input_paths[[idx]])
-        )
-      }
-      
-    }
+  #performing the sampling
+  samples <- list()
+  input_tags <- names(settings$run$inputs)
+  for (input_tag in input_tags) {
+    if (input_tag %in% colnames(input_design)) {
+      input_paths <- settings$run$inputs[[input_tag]]$path
+      input_indices <- input_design[[input_tag]]
+      
+      samples[[input_tag]] <- list(
+        samples = lapply(input_indices, function(idx) input_paths[[idx]])
+      )
+    }
   }
   # if there is a tag required by the model but it is not specified in the xml then I replicate n times the first element
   required_tags %>%
@@ -515,6 +510,8 @@ for (input_tag in names(settings$run$inputs)) {
   for (i in seq_len(ensemble.size)) {
     input_list <- list()
     for (input_tag in names(inputs)) {
+      # if it's the parameter list, skip.
+      if (input_tag == "parameters") next
       if (!is.null(inputs[[input_tag]]$samples[[i]]))
         input_list[[input_tag]] <- list(path = inputs[[input_tag]]$samples[[i]])
     }
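With the samples argument gone, callers now hand write.ensemble.configs() the shared input_design and let it resolve the file paths itself. A hedged sketch of the new call shape, mirroring the foreach call in sda.enkf_parallel.R above (all objects are assumed to exist):

out <- PEcAn.uncertainty::write.ensemble.configs(
  input_design = input_design,   # shared joint design across sites
  ensemble.size = nens,
  defaults = settings$pfts,
  ensemble.samples = ensemble.samples,
  settings = settings,
  model = settings$model$type,
  write.to.db = settings$database$bety$write,
  restart = NULL,
  rename = FALSE
)
# out$samples carries the per-input file paths; the SDA loop caches them at t == 1.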
diff --git a/modules/uncertainty/R/generate_joint_ensemble_design.R b/modules/uncertainty/R/generate_joint_ensemble_design.R
index 0d66ce24794..2c50de1ed44 100644
--- a/modules/uncertainty/R/generate_joint_ensemble_design.R
+++ b/modules/uncertainty/R/generate_joint_ensemble_design.R
@@ -4,19 +4,25 @@
 #' are shared across sites to ensure consistent parameter sampling.
 #'
 #' @param settings A PEcAn settings object containing ensemble configuration
-#' @param sobol for activating sobol
 #' @param ensemble_size Integer specifying the number of ensemble members
+#' @param ensemble_samples list of ensemble parameters across PFTs. The default is NULL.
+#' Since the `input_design` will only be generated once for the entire model run,
+#' the only situation where we might want to recycle the existing `ensemble_samples`
+#' is when we split a larger SDA run (e.g., 8,000 sites) into
+#' smaller SDA jobs (e.g., 100 sites per job), where we want to keep using
+#' the same parameters rather than creating new parameters for each job.
+#' @param sobol logical: set `TRUE` to activate Sobol sampling
 #' @return A list containing ensemble samples and indices
 #'
 #' @export
 generate_joint_ensemble_design <- function(settings,
                                            ensemble_size,
+                                           ensemble_samples = NULL,
                                            sobol = FALSE) {
   if (sobol) {
     ensemble_size <- as.numeric(ensemble_size) * 2
   }
-
   ens.sample.method <- settings$ensemble$samplingspace$parameters$method
   design_list <- list()
   sampled_inputs <- list()
@@ -51,29 +57,34 @@ generate_joint_ensemble_design <- function(settings,
     sampled_inputs[[input_tag]] <- input_result$ids
     design_list[[input_tag]] <- input_result$ids
   }
-
-  # Sample parameters
-  PEcAn.uncertainty::get.parameter.samples(
-    settings,
-    ensemble.size = ensemble_size,
-    posterior.files,
-    ens.sample.method
-  )
-
+
+  # Sample parameters if we don't have them.
+  if (is.null(ensemble_samples)) {
+    PEcAn.uncertainty::get.parameter.samples(
+      settings,
+      ensemble.size = ensemble_size,
+      posterior.files,
+      ens.sample.method
+    )
+    samples.file <- file.path(settings$outdir, "samples.Rdata")
+  }
+
   # Load samples from file
-  samples.file <- file.path(settings$outdir, "samples.Rdata")
   samples <- new.env()
-  if (file.exists(samples.file)) {
-    load(samples.file, envir = samples)
-    if (!is.null(samples$ensemble.samples)) {
-      # Just a placeholder: extract representative trait index per ensemble member
-      # You may want to flatten or select indices per trait
-      design_list[["param"]] <- seq_len(ensemble_size)
+  # if we don't have the parameters from the outside.
+  if (is.null(ensemble_samples)) {
+    if (file.exists(samples.file)) {
+      load(samples.file, envir = samples)
     } else {
-      PEcAn.logger::logger.warn("ensemble.samples not found in samples.Rdata")
+      PEcAn.logger::logger.error(samples.file, "not found, this file is required")
     }
+  }
+  if (!is.null(samples$ensemble.samples) || !is.null(ensemble_samples)) {
+    # Just a placeholder: extract representative trait index per ensemble member
+    # You may want to flatten or select indices per trait
+    design_list[["param"]] <- seq_len(ensemble_size)
   } else {
-    PEcAn.logger::logger.error(samples.file, "not found, this file is required")
+    PEcAn.logger::logger.warn("ensemble.samples not found in samples.Rdata")
  }
 
   design_matrix <- data.frame(design_list)
@@ -85,7 +96,5 @@ generate_joint_ensemble_design <- function(settings,
     return(sobol_obj)
   }
-
-
   return(list(X = design_matrix))
-}
+}
\ No newline at end of file
diff --git a/modules/uncertainty/man/generate_joint_ensemble_design.Rd b/modules/uncertainty/man/generate_joint_ensemble_design.Rd
index 2e53c7db6a5..aee0d59a109 100644
--- a/modules/uncertainty/man/generate_joint_ensemble_design.Rd
+++ b/modules/uncertainty/man/generate_joint_ensemble_design.Rd
@@ -7,13 +7,25 @@ Creates a joint ensemble design that maintains parameter correlations
 across all sites in a multi-site run. This function generates sample
 indices that are shared across sites to ensure consistent parameter sampling.}
\usage{
-generate_joint_ensemble_design(settings, ensemble_size, sobol = FALSE)
+generate_joint_ensemble_design(
+  settings,
+  ensemble_size,
+  ensemble_samples = NULL,
+  sobol = FALSE
+)
}
\arguments{
\item{settings}{A PEcAn settings object containing ensemble configuration}

\item{ensemble_size}{Integer specifying the number of ensemble members}

+\item{ensemble_samples}{list of ensemble parameters across PFTs. The default is NULL.
+Since the `input_design` will only be generated once for the entire model run,
+the only situation where we might want to recycle the existing `ensemble_samples`
+is when we split a larger SDA run (e.g., 8,000 sites) into
+smaller SDA jobs (e.g., 100 sites per job), where we want to keep using
+the same parameters rather than creating new parameters for each job.}
+
 \item{sobol}{logical: set `TRUE` to activate Sobol sampling}
}
\value{
diff --git a/modules/uncertainty/man/write.ensemble.configs.Rd b/modules/uncertainty/man/write.ensemble.configs.Rd
index 86abcc87026..b746d57b01c 100644
--- a/modules/uncertainty/man/write.ensemble.configs.Rd
+++ b/modules/uncertainty/man/write.ensemble.configs.Rd
@@ -14,7 +14,6 @@ write.ensemble.configs(
   clean = FALSE,
   write.to.db = TRUE,
   restart = NULL,
-  samples = NULL,
   rename = FALSE
 )
}
\arguments{
@@ -37,8 +36,6 @@ write.ensemble.configs(
 \item{restart}{In case this is a continuation of an old simulation. restart needs to be a list with name tags of runid, inputs, new.params (parameters), new.state (initial condition), ensemble.id (ensemble id), start.time and stop.time. See Details.}
 
-\item{samples}{Sampled inputs such as met and parameter files}
-
 \item{rename}{Decide if we want to rename previous output files, for example convert from sipnet.out to sipnet.2020-07-16.out.}
}
\value{
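Finally, a hedged sketch of the parameter-recycling path documented above: a parent process samples parameters once, and each smaller batch job passes them back in so every job runs with identical draws. All objects are assumed to exist; nothing here is prescribed by the PR beyond the function signature:

# parent process: sample once.
design <- PEcAn.uncertainty::generate_joint_ensemble_design(
  settings = settings,
  ensemble_size = 100
)
# batch job: reuse the existing samples instead of re-sampling.
design.job <- PEcAn.uncertainty::generate_joint_ensemble_design(
  settings = settings,
  ensemble_size = 100,
  ensemble_samples = ensemble.samples  # skips get.parameter.samples()
)
head(design.job$X)  # one row per ensemble member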