Skip to content

Commit

Permalink
remove parallel support for now as it is causing issues with valgrind
Browse files Browse the repository at this point in the history
  • Loading branch information
Nicholas Clark committed Nov 15, 2024
1 parent 81908f0 commit f7931f4
Show file tree
Hide file tree
Showing 6 changed files with 27 additions and 94 deletions.
1 change: 0 additions & 1 deletion DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@ Imports:
rstantools (>= 2.1.1),
bayesplot (>= 1.5.0),
ggplot2 (>= 2.0.0),
parallel,
mvnfast,
purrr,
dplyr,
Expand Down
1 change: 0 additions & 1 deletion NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -260,7 +260,6 @@ importFrom(mgcv,smoothCon)
importFrom(mgcv,t2)
importFrom(mgcv,te)
importFrom(mgcv,ti)
importFrom(parallel,clusterEvalQ)
importFrom(parallel,clusterExport)
importFrom(parallel,setDefaultCluster)
importFrom(parallel,stopCluster)
Expand Down
50 changes: 16 additions & 34 deletions R/evaluate_mvgams.R
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@
#'
#'@importFrom graphics barplot boxplot axis
#'@importFrom stats quantile ecdf median predict
#'@importFrom parallel clusterExport stopCluster setDefaultCluster clusterEvalQ
#'@importFrom grDevices devAskNewPage
#'@importFrom utils lsf.str
#'@param object \code{list} object returned from \code{mvgam}
Expand All @@ -11,7 +10,7 @@
#'@param eval_timepoint \code{integer} indexing the timepoint that represents our last 'observed'
#'set of outcome data
#'@param fc_horizon \code{integer} specifying the length of the forecast horizon for evaluating forecasts
#'@param n_cores \code{integer} specifying number of cores for generating particle forecasts in parallel
#'@param n_cores Deprecated. Parallel processing is no longer supported
#'@param score \code{character} specifying the type of ranked probability score to use for evaluation. Options are:
#'`variogram`, `drps` or `crps`
#'@param log \code{logical}. Should the forecasts and truths be logged prior to scoring?
Expand Down Expand Up @@ -136,7 +135,7 @@ eval_mvgam = function(object,
n_samples = 5000,
eval_timepoint = 3,
fc_horizon = 3,
n_cores = 2,
n_cores = 1,
score = 'drps',
log = FALSE,
weights){
Expand All @@ -154,6 +153,9 @@ eval_mvgam = function(object,
validate_pos_integer(fc_horizon)
validate_pos_integer(eval_timepoint)
validate_pos_integer(n_cores)
if(n_cores > 1L){
message('argument "n_cores" is deprecated')
}
validate_pos_integer(n_samples)

if(eval_timepoint < 3){
Expand Down Expand Up @@ -312,15 +314,15 @@ eval_mvgam = function(object,
#'@param n_evaluations \code{integer} specifying the total number of evaluations to perform
#'(ignored if \code{evaluation_seq} is supplied)
#'@param fc_horizon \code{integer} specifying the length of the forecast horizon for evaluating forecasts
#'@param n_cores \code{integer} specifying number of cores for generating particle forecasts in parallel
#'@param n_cores Deprecated. Parallel processing is no longer supported
#'@rdname evaluate_mvgams
#'@export
roll_eval_mvgam = function(object,
n_evaluations = 5,
evaluation_seq,
n_samples = 5000,
fc_horizon = 3,
n_cores = 2,
n_cores = 1,
score = 'drps',
log = FALSE,
weights){
Expand All @@ -335,6 +337,9 @@ roll_eval_mvgam = function(object,
call. = FALSE)
}
validate_pos_integer(n_cores)
if(n_cores > 1L){
message('argument "n_cores" is deprecated')
}
validate_pos_integer(n_evaluations)
validate_pos_integer(n_samples)
validate_pos_integer(fc_horizon)
Expand Down Expand Up @@ -378,32 +383,7 @@ roll_eval_mvgam = function(object,
weights <- rep(1, NCOL(object$ytimes))
}

cl <- parallel::makePSOCKcluster(n_cores)
parallel::setDefaultCluster(cl)
clusterExport(NULL, c('all_timepoints',
'evaluation_seq',
'object',
'n_samples',
'fc_horizon',
'eval_mvgam',
'score',
'log',
'weights'),
envir = environment())
clusterEvalQ(cl, library(mgcv))
clusterEvalQ(cl, library(rstan))
clusterEvalQ(cl, library(dplyr))

# Grab internal functions to export to each worker
funs_list <- c('eval_mvgam')
attr(funs_list, 'envir') <- as.environment(asNamespace("mvgam"))
attr(funs_list, 'mode') <- 'function'

parallel::clusterExport(cl = cl,
funs_list,
envir = as.environment(asNamespace("mvgam")))

evals <- parallel::parLapply(cl = cl, evaluation_seq, function(timepoint){
evals <- lapply(evaluation_seq, function(timepoint){
eval_mvgam(object = object,
n_samples = n_samples,
n_cores = 1,
Expand All @@ -413,7 +393,6 @@ roll_eval_mvgam = function(object,
log = log,
weights = weights)
})
stopCluster(cl)

# Take sum of score at each evaluation point for multivariate models
sum_or_na = function(x){
Expand Down Expand Up @@ -491,15 +470,15 @@ roll_eval_mvgam = function(object,
#'posterior distribution
#'@param fc_horizon \code{integer} specifying the length of the forecast horizon for evaluating forecasts
#'@param n_evaluations \code{integer} specifying the total number of evaluations to perform
#'@param n_cores \code{integer} specifying number of cores for generating particle forecasts in parallel
#'@param n_cores Deprecated. Parallel processing is no longer supported
#'@rdname evaluate_mvgams
#'@export
compare_mvgams = function(model1,
model2,
n_samples = 1000,
fc_horizon = 3,
n_evaluations = 10,
n_cores = 2,
n_cores = 1,
score = 'drps',
log = FALSE,
weights){
Expand All @@ -526,6 +505,9 @@ compare_mvgams = function(model1,
validate_pos_integer(n_evaluations)
validate_pos_integer(fc_horizon)
validate_pos_integer(n_cores)
if(n_cores > 1L){
message('argument "n_cores" is deprecated')
}
validate_pos_integer(n_samples)

# Evaluate the two models
Expand Down
59 changes: 6 additions & 53 deletions R/forecast.mvgam.R
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
#'@title Extract or compute hindcasts and forecasts for a fitted \code{mvgam} object
#'@name forecast.mvgam
#'@importFrom parallel clusterExport stopCluster setDefaultCluster
#'@importFrom stats predict
#'@importFrom rlang missing_arg
#'@inheritParams predict.mvgam
Expand All @@ -14,7 +13,7 @@
#'observation of series 1 in the original data and the first observation for series 1 in \code{newdata})
#'@param data_test Deprecated. Still works in place of \code{newdata} but users are recommended to use
#'\code{newdata} instead for more seamless integration into `R` workflows
#'@param n_cores \code{integer} specifying number of cores for generating forecasts in parallel
#'@param n_cores Deprecated. Parallel processing is no longer supported
#'@param ... Ignored
#'@details Posterior predictions are drawn from the fitted \code{mvgam} and used to simulate a forecast distribution
#'@return An object of class \code{mvgam_forecast} containing hindcast and forecast distributions.
Expand Down Expand Up @@ -72,6 +71,9 @@ forecast.mvgam = function(object,
...){
# Check arguments
validate_pos_integer(n_cores)
if(n_cores > 1L){
message('argument "n_cores" is deprecated')
}

if(!missing("newdata")){
data_test <- newdata
Expand Down Expand Up @@ -834,56 +836,8 @@ forecast_draws = function(object,
}
}

# Set up parallel environment for looping across posterior draws
# to compute h-step ahead forecasts
cl <- parallel::makePSOCKcluster(n_cores)
parallel::setDefaultCluster(cl)
parallel::clusterExport(NULL, c('family',
'family_pars',
'trials',
'trend_model',
'trend_pars',
'type',
'use_lv',
'betas',
'betas_trend',
'n_series',
'data_test',
'series',
'series_test',
'Xp',
'Xp_trend',
'fc_horizon',
'b_uncertainty',
'trend_uncertainty',
'obs_uncertainty',
'time_dis'),
envir = environment())

# Grab internal functions to export to each worker
# funs_list <- c('extract_general_trend_pars',
# 'linkfun',
# 'forecast_trend',
# 'extract_series_trend_pars',
# 'mvgam_predict',
# 'prep_varma_params',
# 'sim_varma',
# 'validate_equaldims',
# 'varma_recursC',
# 'log_sum_exp')
# attr(funs_list, 'envir') <- as.environment(asNamespace("mvgam"))
# attr(funs_list, 'mode') <- 'function'
#
# parallel::clusterExport(cl = cl,
# funs_list,
# envir = as.environment(asNamespace("mvgam")))

parallel::clusterExport(cl = cl,
unclass(lsf.str(envir = asNamespace("mvgam"),
all = TRUE)),
envir = as.environment(asNamespace("mvgam")))

fc_preds <- parallel::parLapply(cl = cl, seq_len(dim(betas)[1]), function(i){
# Loop over draws and compute forecasts (in serial at the moment)
fc_preds <- lapply(seq_len(dim(betas)[1]), function(i){
# Sample index
samp_index <- i

Expand Down Expand Up @@ -1042,7 +996,6 @@ forecast_draws = function(object,
}
out
})
stopCluster(cl)
}

return(fc_preds)
Expand Down
8 changes: 4 additions & 4 deletions man/evaluate_mvgams.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 1 addition & 1 deletion man/forecast.mvgam.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit f7931f4

Please sign in to comment.