From 1635eaed424bbf1153325ee9afc581b6fa119441 Mon Sep 17 00:00:00 2001 From: wlandau Date: Thu, 30 Jan 2025 12:06:24 -0500 Subject: [PATCH] Fix #203 --- NAMESPACE | 1 + NEWS.md | 1 + R/class_step.R | 17 ++++ R/tar_map2_raw.R | 6 ++ R/tar_rep2_raw.R | 6 ++ R/tar_rep_index.R | 54 +++++++++++ R/tar_rep_raw.R | 6 ++ _pkgdown.yml | 3 + man/tar_rep_index.Rd | 52 ++++++++++ tests/testthat/test-class_step.R | 19 ++++ tests/testthat/test-tar_rep_index.R | 144 ++++++++++++++++++++++++++++ 11 files changed, 309 insertions(+) create mode 100644 R/class_step.R create mode 100644 R/tar_rep_index.R create mode 100644 man/tar_rep_index.Rd create mode 100644 tests/testthat/test-class_step.R create mode 100644 tests/testthat/test-tar_rep_index.R diff --git a/NAMESPACE b/NAMESPACE index 1211c06..c941aad 100644 --- a/NAMESPACE +++ b/NAMESPACE @@ -122,6 +122,7 @@ export(tar_rep2) export(tar_rep2_raw) export(tar_rep2_run) export(tar_rep2_run_rep) +export(tar_rep_index) export(tar_rep_map) export(tar_rep_map_raw) export(tar_rep_raw) diff --git a/NEWS.md b/NEWS.md index 0e28b06..816fc58 100644 --- a/NEWS.md +++ b/NEWS.md @@ -5,6 +5,7 @@ * Use name when passing the `quiet` argument to `quarto_inspect()` (#208, @yusuke-sasaki-jprep). * Explicitly pass the `profile` argument to `quarto_inspect()` and `quarto_render()`. Requires R package `quarto >= 1.4`, which is already in the `DESCIRPTION`. * `tar_map_rep()` now aggregates dynamic branches in parallel over static branches (#204). +* Add `tar_rep_index()` (#203). 
# tarchetypes 0.11.0 diff --git a/R/class_step.R b/R/class_step.R new file mode 100644 index 0000000..4485f50 --- /dev/null +++ b/R/class_step.R @@ -0,0 +1,17 @@ +step_new <- function() { + new.env(hash = TRUE, parent = emptyenv()) +} + +step_set <- function(step, batch, rep, reps) { + step$batch <- as.integer(batch) + step$rep <- as.integer(rep) + step$index <- as.integer(rep + (reps * (batch - 1))) +} + +step_reset <- function(step) { + step$batch <- NULL + step$rep <- NULL + step$index <- NULL +} + +step_tar_rep <- step_new() diff --git a/R/tar_map2_raw.R b/R/tar_map2_raw.R index b1c0f29..6302fc9 100644 --- a/R/tar_map2_raw.R +++ b/R/tar_map2_raw.R @@ -321,6 +321,12 @@ tar_map2_run_rep <- function( } ) if_any(anyNA(seed), NULL, targets::tar_seed_set(seed = seed)) + step_set( + step = step_tar_rep, + batch = batch, + rep = rep, + reps = length(seeds) + ) out <- eval(command, envir = envir) out <- tar_append_static_values(out, values[, columns]) out[["tar_batch"]] <- as.integer(batch) diff --git a/R/tar_rep2_raw.R b/R/tar_rep2_raw.R index 622a206..37d3059 100644 --- a/R/tar_rep2_raw.R +++ b/R/tar_rep2_raw.R @@ -156,6 +156,12 @@ tar_rep2_run <- function(command, batches, iteration, rep_workers) { tar_rep2_run_rep <- function(rep, slice, command, batch, seeds, envir) { seed <- as.integer(if_any(anyNA(seeds), NA_integer_, seeds[rep])) if_any(anyNA(seed), NULL, targets::tar_seed_set(seed = seed)) + step_set( + step = step_tar_rep, + batch = batch, + rep = rep, + reps = length(seeds) + ) out <- eval(command, envir = slice, enclos = envir) out$tar_batch <- as.integer(batch) out$tar_rep <- as.integer(rep) diff --git a/R/tar_rep_index.R b/R/tar_rep_index.R new file mode 100644 index 0000000..1b49b6f --- /dev/null +++ b/R/tar_rep_index.R @@ -0,0 +1,54 @@ +#' @title Get overall rep index. +#' @export +#' @family Dynamic batched replication indexing +#' @description Get the integer index of the current replication +#' in certain target factories. 
+#' @details +#' [tar_rep_index()] cannot run in your interactive R session +#' or even the setup portion of `_targets.R`. +#' It must be part of the R command of a target actively +#' running in a pipeline. +#' +#' In addition, [tar_rep_index()] is only compatible with +#' [tar_rep()], [tar_rep2()], [tar_map_rep()], [tar_map2_count()], +#' and [tar_map2_size()]. +#' In the latter 3 cases, [tar_rep_index()] cannot be part of +#' the `values` or `command1` arguments. +#' +#' In [tar_map_rep()], each row of the `values` argument +#' (each "scenario") gets its own independent set of index values from 1 to +#' `batches * reps`. +#' @return Positive integer from 1 to `batches * reps`, +#' index of the current replication in an ongoing pipeline. +#' @examples +#' if (identical(Sys.getenv("TAR_LONG_EXAMPLES"), "true")) { +#' targets::tar_dir({ # tar_dir() runs code from a temporary directory. +#' targets::tar_script({ +#' tar_map_rep( +#' x, +#' data.frame(index = tar_rep_index()), +#' batches = 2L, +#' reps = 3L, +#' values = list(value = c("a", "b")) +#' ) +#' }) +#' targets::tar_make() +#' x <- targets::tar_read(x) +#' all(x$index == x$tar_rep + (3L * (x$tar_batch - 1L))) +#' #> TRUE +#' }) +#' } +tar_rep_index <- function() { + index <- .subset2(step_tar_rep, "index") + if (is.null(index)) { + message <- paste( + "tar_rep_index() can only run inside the R command of a target", + "actively running in a pipeline, and it is only compatible with", + "these specific target factories:", + "tar_rep(), tar_rep2(), tar_map_rep(),", + "tar_map2_count(), and tar_map2_size()." 
+ ) + targets::tar_throw_run(message) + } + index +} diff --git a/R/tar_rep_raw.R b/R/tar_rep_raw.R index ad9fff4..a3f8824 100644 --- a/R/tar_rep_raw.R +++ b/R/tar_rep_raw.R @@ -270,6 +270,12 @@ tar_rep_run_map <- function(expr, batch, reps, rep_workers) { tar_rep_run_map_rep <- function(rep, expr, batch, seeds, envir) { seed <- as.integer(if_any(anyNA(seeds), NA_integer_, seeds[rep])) if_any(anyNA(seed), NULL, targets::tar_seed_set(seed = seed)) + step_set( + step = step_tar_rep, + batch = batch, + rep = rep, + reps = length(seeds) + ) out <- eval(expr, envir = envir) if (is.list(out)) { out[["tar_batch"]] <- as.integer(batch) diff --git a/_pkgdown.yml b/_pkgdown.yml index d660b1f..0ce782f 100644 --- a/_pkgdown.yml +++ b/_pkgdown.yml @@ -59,6 +59,9 @@ reference: contents: - 'tar_map2_count' - 'tar_map2_size' +- title: Dynamic batched replication indexing + contents: + - 'tar_rep_index' - title: Target selection contents: - 'tar_select_names' diff --git a/man/tar_rep_index.Rd b/man/tar_rep_index.Rd new file mode 100644 index 0000000..45c9247 --- /dev/null +++ b/man/tar_rep_index.Rd @@ -0,0 +1,52 @@ +% Generated by roxygen2: do not edit by hand +% Please edit documentation in R/tar_rep_index.R +\name{tar_rep_index} +\alias{tar_rep_index} +\title{Get overall rep index.} +\usage{ +tar_rep_index() +} +\value{ +Positive integer from 1 to \code{batches * reps}, +index of the current replication in an ongoing pipeline. +} +\description{ +Get the integer index of the current replication +in certain target factories. +} +\details{ +\code{\link[=tar_rep_index]{tar_rep_index()}} cannot run in your interactive R session +or even the setup portion of \verb{_targets.R}. +It must be part of the R command of a target actively +running in a pipeline. 
+ +In addition, \code{\link[=tar_rep_index]{tar_rep_index()}} is only compatible with +\code{\link[=tar_rep]{tar_rep()}}, \code{\link[=tar_rep2]{tar_rep2()}}, \code{\link[=tar_map_rep]{tar_map_rep()}}, \code{\link[=tar_map2_count]{tar_map2_count()}}, +and \code{\link[=tar_map2_size]{tar_map2_size()}}. +In the latter 3 cases, \code{\link[=tar_rep_index]{tar_rep_index()}} cannot be part of +the \code{values} or \code{command1} arguments. + +In \code{\link[=tar_map_rep]{tar_map_rep()}}, each row of the \code{values} argument +(each "scenario") gets its own independent set of index values from 1 to +\code{batches * reps}. +} +\examples{ +if (identical(Sys.getenv("TAR_LONG_EXAMPLES"), "true")) { +targets::tar_dir({ # tar_dir() runs code from a temporary directory. +targets::tar_script({ + tar_map_rep( + x, + data.frame(index = tar_rep_index()), + batches = 2L, + reps = 3L, + values = list(value = c("a", "b")) + ) +}) +targets::tar_make() +x <- targets::tar_read(x) +all(x$index == x$tar_rep + (3L * (x$tar_batch - 1L))) +#> TRUE +}) +} +} +\concept{Dynamic batched replication indexing} diff --git a/tests/testthat/test-class_step.R b/tests/testthat/test-class_step.R new file mode 100644 index 0000000..96386c5 --- /dev/null +++ b/tests/testthat/test-class_step.R @@ -0,0 +1,19 @@ +targets::tar_test("step class works", { + step <- step_new() + index <- 0L + batches <- 5L + reps <- 7L + for (batch in seq_len(batches)) { + for (rep in seq_len(reps)) { + index <- index + 1L + step_set(step = step, batch = batch, rep = rep, reps = reps) + expect_equal(step$batch, batch) + expect_equal(step$rep, rep) + expect_equal(step$index, index) + } + } + step_reset(step = step) + for (field in c("batch", "rep", "index")) { + expect_null(step[[field]]) + } +}) diff --git a/tests/testthat/test-tar_rep_index.R b/tests/testthat/test-tar_rep_index.R new file mode 100644 index 0000000..8dbc8e5 --- /dev/null +++ b/tests/testthat/test-tar_rep_index.R @@ -0,0 +1,144 @@ 
+targets::tar_test("tar_rep_index() without a pipeline", { + skip_on_cran() + expect_error(tar_rep_index(), class = "tar_condition_run") +}) + +targets::tar_test("tar_rep_index() in tar_rep()", { + skip_on_cran() + on.exit(step_reset(step_tar_rep)) + targets::tar_script( + tar_rep(x, data.frame(index = tar_rep_index()), batches = 2L, reps = 3L) + ) + targets::tar_make(callr_function = NULL) + x <- targets::tar_read(x) + expect_equal( + x$index, + x$tar_rep + (3L * (x$tar_batch - 1L)) + ) +}) + +targets::tar_test("tar_rep_index() in tar_rep2()", { + skip_on_cran() + on.exit(step_reset(step_tar_rep)) + targets::tar_script({ + list( + tar_rep(x, data.frame(index = tar_rep_index()), batches = 2L, reps = 3L), + tar_rep2( + name = y, + command = data.frame(index2 = tar_rep_index(), index = x$index), + x + ) + ) + }) + targets::tar_make(callr_function = NULL) + x <- targets::tar_read(x) + expect_equal( + x$index, + x$tar_rep + (3L * (x$tar_batch - 1L)) + ) + y <- targets::tar_read(y) + expect_equal( + y$index, + y$tar_rep + (3L * (y$tar_batch - 1L)) + ) + expect_equal(y$index, y$index2) +}) + +targets::tar_test("tar_rep_index() in tar_map_rep()", { + skip_on_cran() + on.exit(step_reset(step_tar_rep)) + targets::tar_script( + tar_map_rep( + x, + data.frame(index = tar_rep_index()), + batches = 2L, + reps = 3L, + values = list(value = c("a", "b")) + ) + ) + targets::tar_make(callr_function = NULL) + x <- targets::tar_read(x) + expect_equal(nrow(x), 12L) + expect_equal( + x$index, + x$tar_rep + (3L * (x$tar_batch - 1L)) + ) +}) + +targets::tar_test("tar_rep_index() in tar_map2_count()", { + skip_on_cran() + on.exit(step_reset(step_tar_rep)) + targets::tar_script({ + f1 <- function(arg1) { + tibble::tibble( + arg1 = arg1, + arg2 = seq_len(6) + ) + } + f2 <- function(arg1, arg2) { + tibble::tibble( + result = paste(arg1, arg2), + length_arg1 = length(arg1), + length_arg2 = length(arg2), + random = sample.int(1e6, size = 1L), + index = tar_rep_index() + ) + } + tar_map2_count( 
+      x,
+      command1 = f1(arg1),
+      command2 = f2(arg1, arg2),
+      values = tibble::tibble(arg1 = letters[seq_len(2)]),
+      names = arg1,
+      suffix1 = "i",
+      suffix2 = "ii",
+      batches = 3
+    )
+  })
+  targets::tar_make(callr_function = NULL)
+  x <- targets::tar_read(x)
+  expect_equal(nrow(x), 12L)
+  expect_equal(
+    x$index,
+    x$tar_rep + (2L * (x$tar_batch - 1L))
+  )
+})
+
+targets::tar_test("tar_rep_index() in tar_map2_size()", {
+  skip_on_cran()
+  on.exit(step_reset(step_tar_rep))
+  targets::tar_script({
+    f1 <- function(arg1) {
+      tibble::tibble(
+        arg1 = arg1,
+        arg2 = seq_len(6)
+      )
+    }
+    f2 <- function(arg1, arg2) {
+      tibble::tibble(
+        result = paste(arg1, arg2),
+        length_arg1 = length(arg1),
+        length_arg2 = length(arg2),
+        random = sample.int(1e6, size = 1L),
+        index = tar_rep_index()
+      )
+    }
+    tar_map2_size(
+      x,
+      command1 = f1(arg1),
+      command2 = f2(arg1, arg2),
+      values = tibble::tibble(arg1 = letters[seq_len(2)]),
+      names = arg1,
+      suffix1 = "i",
+      suffix2 = "ii",
+      size = 2
+    )
+  })
+  targets::tar_make(callr_function = NULL)
+  x <- targets::tar_read(x)
+  expect_equal(nrow(x), 12L)
+  expect_equal(
+    x$index,
+    x$tar_rep + (2L * (x$tar_batch - 1L))
+  )
+})