Skip to content

Commit

Permalink
Unify parallelisation approaches: new cl_*() functions to faciliate t…
Browse files Browse the repository at this point in the history
…he implication of parallelisation; support for socket or fork clusters; update documentation and add tips on parallelisation; and new global switches to turn on/off parallel/slow examples for speed during testing.
  • Loading branch information
edwardlavender committed Dec 2, 2021
1 parent 0e05b1d commit 71852e0
Show file tree
Hide file tree
Showing 46 changed files with 792 additions and 225 deletions.
1 change: 1 addition & 0 deletions DESCRIPTION
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ Suggests:
plotly, animation, scales, viridis,
circular,
httr, jsonlite,
rbenchmark,
rmarkdown, knitr
Remotes:
github::edwardlavender/prettyGraphics,
Expand Down
5 changes: 5 additions & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,11 @@ export(acs_setup_mobility)
export(assemble_sentinel_counts)
export(buffer_and_crop)
export(cells_from_val)
export(cl_check)
export(cl_chunks)
export(cl_export)
export(cl_lapply)
export(cl_stop)
export(coa)
export(coa_setup_delta_t)
export(crop_from_click)
Expand Down
2 changes: 1 addition & 1 deletion R/ac.R
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@
#' @param con If \code{verbose = TRUE}, \code{con} is character string that defines how messages relaying function progress are returned. If \code{con = ""}, messages are printed to the console (unless redirected by \code{\link[base]{sink}}), an approach that is only implemented if the function is not implemented in parallel. Otherwise, \code{con} defines the directory into which to write .txt files, into which messages are written to relay function progress. This approach, rather than printing to the console, is recommended for clarity, speed and debugging. If the algorithm is implemented step-wise, then a single file is written to the specified directory named acdc_log.txt. If the algorithm is implemented chunk-wise, then an additional file is written for each chunk (named dot_acdc_log_1.txt, dot_acdc_log_2.txt and so on), with the details for each chunk.
#' @param progress (optional) If the algorithm is implemented step-wise, \code{progress} is an integer (\code{1}, \code{2} or \code{3}) that defines whether or not to display a progress bar in the console as the algorithm moves over acoustic time steps (\code{1}), the archival time steps between each pair of acoustic detections (\code{2}) or both acoustic and archival time steps (\code{3}), in which case the overall acoustic progress bar is punctuated by an archival progress bar for each pair of acoustic detections. This option is useful if there is a large number of archival observations between acoustic detections. Any other input will suppress the progress bar. If the algorithm is implemented for chunks, inputs to \code{progress} are ignored and a single progress bar is shown of the progress across acoustic chunks.
#' @param split A character string that defines the time unit used to split acoustic time series into chunks (e.g., \code{"12 hours"}). If provided, this must be supported by \code{\link[lubridate]{floor_date}} (otherwise, a pre-defined list of acoustic time series can be passed to \code{acoustics}, e.g., specifying seasonal chunks). If \code{split = NULL} and a cluster has been specified (see \code{cl}) (and \code{acoustics} is a dataframe), then the acoustic time series is automatically split into chunks and the algorithm implemented for each chunk in parallel.
#' @param cl,varlist Parallelisation arguments. \code{cl} is cluster object created by \code{\link[parallel]{makeCluster}} to implement the algorithm in parallel. If supplied, the algorithm is implemented for each chunk in a list of acoustic time series, either (a) as supplied by the user (if \code{acoustics} is a list), (b) as defined by the input to \code{split}, or (c) as defined automatically from the number of nodes in the cluster if \code{split = NULL}. If \code{cl} is supplied, \code{varlist} may also be required. This is a character vector of objects to export. \code{varlist} is passed to the \code{varlist} of \code{\link[parallel]{clusterExport}}. Exported objects must be located in the global environment.
#' @param cl,varlist Parallelisation options. \code{cl} is cluster object created by \code{\link[parallel]{makeCluster}} to implement the algorithm in parallel. If supplied, the algorithm is implemented for each chunk in a list of acoustic time series, either (a) as supplied by the user (if \code{acoustics} is a list), (b) as defined by the input to \code{split}, or (c) as defined automatically from the number of nodes in the cluster if \code{split = NULL}. If \code{cl} is supplied, \code{varlist} may also be required. This is a character vector of objects to export. \code{varlist} is passed to the \code{varlist} of \code{\link[parallel]{clusterExport}}. Exported objects must be located in the global environment.
#'
#' @details The acoustic-centroid (AC) algorithm is an approach which uses acoustic detections to infer the possible locations of tagged animals within an area over some time interval. The locational information provided by acoustic detections is represented by acoustic centroids, which are areas around receivers that define where an individual could have been at each time point given the spatiotemporal pattern of detections at receivers, a model of detection probability and a movement parameter.
#'
Expand Down
2 changes: 1 addition & 1 deletion R/acdc.R
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@
#' @param con If \code{verbose = TRUE}, \code{con} is character string defines how messages relaying function progress are returned (see \code{\link[flapper]{ac}}).
#' @param progress An integer controlling the progress bar (see \code{\link[flapper]{ac}}).
#' @param split A character string that defines the time unit used to split acoustic time series into chunks (see \code{\link[flapper]{ac}}).
#' @param cl,varlist Parallelisation arguments to implement the algorithm in parallel (see \code{\link[flapper]{ac}}).
#' @param cl,varlist (optional) Parallelisation options (see \code{\link[flapper]{ac}}).
#'
#' @details The acoustic-centroid depth-contour (ACDC) algorithm is an approach which integrates acoustic detections and depth observations to infer the possible locations of tagged animals within an area over some time interval. The locational information provided by acoustic detections is represented by acoustic centroids, which are areas around receivers that define where an individual could have been at each time point given the spatiotemporal pattern of detections at receivers, a model of detection probability and a movement parameter (see \code{\link[flapper]{ac}}). The locational information provided by depth observations is represented by depth contours, which are areas that define where an individual could have been at each time point given its depth and the local bathymetry (see \code{\link[flapper]{dc}}).
#'
Expand Down
7 changes: 3 additions & 4 deletions R/acdc_analyse_record.R
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ acdc_access_maps <- function(record, type = c("map_timestep", "map_cumulative"),
#' @param xlim,ylim,fix_zlim,pretty_axis_args Axis control arguments. \code{xlim} and \code{ylim} control the axis limits, following the rules of the \code{lim} argument in \code{\link[prettyGraphics]{pretty_axis}}. \code{fix_zlim} is a logical input that defines whether or not to fix z axis limits across all plots (to facilitate comparisons), or a vector of two numbers that define a custom range for the z axis which is fixed across all plots. \code{fix_zlim = FALSE} produces plots in which the z axis is allowed to vary flexibly between time units. Other axis options supported by \code{\link[prettyGraphics]{pretty_axis}} are implemented by passing a named list of arguments to this function via \code{pretty_axis_args}.
#' @param par_param (optional) A named list of arguments, passed to \code{\link[graphics]{par}}, to control the plotting window. This is executed before plotting is initiated and therefore affects all plots.
#' @param png_param (optional) A named list of arguments, passed to \code{\link[grDevices]{png}}, to save plots to file. If supplied, the plot for each time step is saved separately. The `filename' argument should be the directory in which plots are saved. Plots are then saved as "1.png", "2.png" and so on.
#' @param cl A cluster object created by \code{\link[parallel]{makeCluster}}. If supplied, the function loops over specified time steps in parallel to make plots. This is only implemented if plots are saved to file (i.e., \code{png_param} is supplied). If supplied, the connection to the cluster is closed within the function.
#' @param cl,varlist (optional) Parallelisation options. \code{cl} is (a) a cluster object from \code{\link[parallel]{makeCluster}} or (b) an integer that defines the number of child processes. \code{varlist} is a character vector of variables for export (see \code{\link[flapper]{cl_export}}). Exported variables must be located in the global environment. If a cluster is supplied, the connection to the cluster is closed within the function (see \code{\link[flapper]{cl_stop}}). For further information, see \code{\link[flapper]{cl_lapply}} and \code{\link[flapper]{flapper-tips-parallel}}. If supplied, the function loops over specified time steps in parallel to make plots. This is only implemented if plots are saved to file (i.e., \code{png_param} is supplied).
#' @param verbose A logical variable that defines whether or not relay messages to the console to monitor function progress.
#' @param check A logical variable that defines whether or not to check user inputs to the function before its initiation.
#' @param ... Additional arguments, passed to \code{\link[raster]{plot}}, to customise the blank background plot onto which spatial layers are added, such as \code{xlab}, \code{ylab} and \code{main}.
Expand Down Expand Up @@ -231,7 +231,7 @@ acdc_plot_record <- function(record,
control_sci_notation = list(magnitude = 16L, digits = 0)),
par_param = list(),
png_param = list(),
cl = NULL,
cl = NULL, varlist = NULL,
verbose = TRUE,
check = TRUE,...){

Expand Down Expand Up @@ -369,7 +369,7 @@ acdc_plot_record <- function(record,

#### Loop over every detection
cat_to_console("... Making plots for each time step ...")
pbapply::pblapply(1:length(acdc_plot), cl = cl, function(i){
cl_lapply(1:length(acdc_plot), cl = cl, varlist = varlist, fun = function(i){

#### Set up image to save
if(save_png){
Expand Down Expand Up @@ -434,7 +434,6 @@ acdc_plot_record <- function(record,
#### Save fig
if(save_png) grDevices::dev.off()
})
if(!is.null(cl)) parallel::stopCluster(cl)

return(invisible())

Expand Down
6 changes: 2 additions & 4 deletions R/acs_pl_backend.R
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@
#' @param con If \code{verbose = TRUE}, \code{con} is character string defines how messages relaying function progress are returned. If \code{con = ""}, messages are printed to the console (unless redirected by \code{\link[base]{sink}}), an approach that is only implemented if the function is not implemented in parallel. Otherwise, \code{con} defines the directory into which to write .txt files, into which messages are written to relay function progress. This approach, rather than printing to the console, is recommended for clarity, speed and debugging. If the algorithm is implemented step-wise, then a single file is written to the specified directory named acdc_log.txt. If the algorithm is implemented chunk-wise, then an additional file is written for each chunk (named dot_acdc_log_1.txt, dot_acdc_log_2.txt and so on), with the details for each chunk.
#' @param progress (optional) If the algorithm is implemented step-wise, \code{progress} is an integer (\code{1}, \code{2} or \code{3}) that defines whether or not to display a progress bar in the console as the algorithm moves over acoustic time steps (\code{1}), the `archival' time steps between each pair of acoustic detections (\code{2}) or both acoustic and archival time steps (\code{3}), in which case the overall acoustic progress bar is punctuated by an archival progress bar for each pair of acoustic detections. This option is useful if there is a large number of archival observations between acoustic detections. Any other input will suppress the progress bar. If the algorithm is implemented for chunks, inputs to \code{progress} are ignored and a single progress bar is shown of the progress across acoustic chunks.
#' @param split A character string that defines the time unit used to split acoustic time series into chunks (e.g., \code{"12 hours"}). If provided, this must be supported by \code{\link[lubridate]{floor_date}} (otherwise, a pre-defined list of acoustic time series can be passed to \code{acoustics}, e.g., specifying seasonal chunks). If \code{split = NULL} and a cluster has been specified (see \code{cl}) (and \code{acoustics} is a dataframe), then the acoustic time series is automatically split into chunks and the algorithm implemented for each chunk in parallel.
#' @param cl,varlist Parallelisation arguments. \code{cl} is cluster object created by \code{\link[parallel]{makeCluster}} to implement the algorithm in parallel. If supplied, the algorithm is implemented for each chunk in a list of acoustic time series, either (a) as supplied by the user (if \code{acoustics} is a list), (b) as defined by the input to \code{split}, or (c) as defined automatically from the number of nodes in the cluster if \code{split = NULL}. If \code{cl} is supplied, \code{varlist} may also be required. This is a character vector of objects to export. \code{varlist} is passed to the \code{varlist} of \code{\link[parallel]{clusterExport}}. Exported objects must be located in the global environment.
#' @param cl,varlist (optional) Parallelisation options. \code{cl} is (a) a cluster object from \code{\link[parallel]{makeCluster}} or (b) an integer that defines the number of child processes to implement the algorithm in parallel. If supplied, the algorithm is implemented for each chunk in a list of acoustic time series, either (a) as supplied by the user (if \code{acoustics} is a list), (b) as defined by the input to \code{split}, or (c) as defined automatically from the number of nodes in the cluster if \code{split = NULL}. If \code{cl} is supplied, \code{varlist} may also be required. This is a character vector of objects to export (see \code{\link[flapper]{cl_export}}). Exported variables must be located in the global environment. If a cluster is supplied, the connection to the cluster is closed within the function (see \code{\link[flapper]{cl_stop}}). For further information, see \code{\link[flapper]{cl_lapply}} and \code{\link[flapper]{flapper-tips-parallel}}.
#'
#' @return The function returns an \code{\link[flapper]{acdc_archive-class}} object. If a connection to write files has also been specified, an overall log (acdc_log.txt) as well as chunk-specific logs from calls to \code{\link[flapper]{.acs}}, if applicable, are written to file.
#'
Expand Down Expand Up @@ -444,8 +444,7 @@
#### Implement algorithm in parallel
cat_to_cf(paste("... Calling .acs() to implement ACDC algorithm on", length(acoustics_ls_wth_overlap), "chunks, using", n_cores, "cores..."))
out$time <- rbind(out$time, data.frame(event = "calling_.acs()", time = Sys.time()))
if(!is.null(cl) & !is.null(varlist)) parallel::clusterExport(cl = cl, varlist = varlist)
.out <- pbapply::pblapply(1:length(acoustics_ls_wth_overlap), cl = cl, function(i){
.out <- cl_lapply(1:length(acoustics_ls_wth_overlap), cl = cl, varlist = varlist, function(i){

#### Implement algorithm
.out <- .acs(acoustics = movement_ts[[i]]$acoustics,
Expand All @@ -472,7 +471,6 @@
check = FALSE)
return(.out)
})
if(!is.null(cl)) parallel::stopCluster(cl = cl)
}

#### Return outputs
Expand Down
2 changes: 0 additions & 2 deletions R/acs_setup.R
Original file line number Diff line number Diff line change
Expand Up @@ -549,7 +549,6 @@ acs_setup_detection_kernels <-
# (used to up-weight areas around a receiver with a detection)
cat_to_console("... Getting receiver-specific kernels (for detection)...")
xy_ls <- lapply(1:length(xy), function(i) xy[i, ])
# if(!is.null(cl) & !is.null(varlist)) parallel::clusterExport(cl = cl, varlist = varlist)
detection_kernels_by_xy <-
pbapply::pblapply(xy_ls, cl = NULL, function(xyi){
## Focus on area within centroid, for speed
Expand Down Expand Up @@ -590,7 +589,6 @@ acs_setup_detection_kernels <-
det_pr_around_xyi_positive = det_pr_around_xyi_positive)
return(out)
})
# if(!is.null(cl)) parallel::stopCluster(cl = cl)
names(detection_kernels_by_xy) <- as.character(xy$receiver_id)
receiver_specific_kernels <- lapply(detection_kernels_by_xy, function(elm) elm$det_pr_around_xyi)
names(receiver_specific_kernels) <- names(detection_kernels_by_xy)
Expand Down
Loading

0 comments on commit 71852e0

Please sign in to comment.