Skip to content

Commit

Permalink
Fix #203
Browse files Browse the repository at this point in the history
  • Loading branch information
wlandau committed Jan 30, 2025
1 parent c61f734 commit 1635eae
Show file tree
Hide file tree
Showing 11 changed files with 309 additions and 0 deletions.
1 change: 1 addition & 0 deletions NAMESPACE
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,7 @@ export(tar_rep2)
export(tar_rep2_raw)
export(tar_rep2_run)
export(tar_rep2_run_rep)
export(tar_rep_index)
export(tar_rep_map)
export(tar_rep_map_raw)
export(tar_rep_raw)
Expand Down
1 change: 1 addition & 0 deletions NEWS.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
* Use name when passing the `quiet` argument to `quarto_inspect()` (#208, @yusuke-sasaki-jprep).
* Explicitly pass the `profile` argument to `quarto_inspect()` and `quarto_render()`. Requires R package `quarto >= 1.4`, which is already in the `DESCIRPTION`.
* `tar_map_rep()` now aggregates dynamic branches in parallel over static branches (#204).
* Add `tar_rep_index()` (#203).

# tarchetypes 0.11.0

Expand Down
17 changes: 17 additions & 0 deletions R/class_step.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,17 @@
step_new <- function() {
new.env(hash = TRUE, parent = emptyenv())
}

step_set <- function(step, batch, rep, reps) {
step$batch <- as.integer(batch)
step$rep <- as.integer(rep)
step$index <- as.integer(rep + (reps * (batch - 1)))
}

step_reset <- function(step) {
step$batch <- NULL
step$rep <- NULL
step$index <- NULL
}

step_tar_rep <- step_new()
6 changes: 6 additions & 0 deletions R/tar_map2_raw.R
Original file line number Diff line number Diff line change
Expand Up @@ -321,6 +321,12 @@ tar_map2_run_rep <- function(
}
)
if_any(anyNA(seed), NULL, targets::tar_seed_set(seed = seed))
step_set(
step = step_tar_rep,
batch = batch,
rep = rep,
reps = length(seeds)
)
out <- eval(command, envir = envir)
out <- tar_append_static_values(out, values[, columns])
out[["tar_batch"]] <- as.integer(batch)
Expand Down
6 changes: 6 additions & 0 deletions R/tar_rep2_raw.R
Original file line number Diff line number Diff line change
Expand Up @@ -156,6 +156,12 @@ tar_rep2_run <- function(command, batches, iteration, rep_workers) {
tar_rep2_run_rep <- function(rep, slice, command, batch, seeds, envir) {
seed <- as.integer(if_any(anyNA(seeds), NA_integer_, seeds[rep]))
if_any(anyNA(seed), NULL, targets::tar_seed_set(seed = seed))
step_set(
step = step_tar_rep,
batch = batch,
rep = rep,
reps = length(seeds)
)
out <- eval(command, envir = slice, enclos = envir)
out$tar_batch <- as.integer(batch)
out$tar_rep <- as.integer(rep)
Expand Down
54 changes: 54 additions & 0 deletions R/tar_rep_index.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
#' @title Get overall rep index.
#' @export
#' @family Dynamic batched replication indexing
#' @description Get the integer index of the current replication
#' in certain target factories.
#' @details
#' [tar_rep_index()] cannot run in your interactive R session
#' or even the setup portion of `_targets.R`.
#' It must be part of the R command of a target actively
#' running in a pipeline.
#'
#' In addition, [tar_rep_index()] is only compatible with
#' [tar_rep()], [tar_rep2()], [tar_map_rep()], [tar_map2_count()],
#' and [tar_map2_size()].
#' In the latter 3 cases, [tar_rep_index()] cannot be part of
#' the `values` or `command1` arguments.
#'
#' In [tar_map_rep()], each row of the `values` argument
#' (each "scenario") gets its own independent set of index values from 1 to
#' `batches * reps`.
#' @return Positive integer from 1 to `batches * reps`,
#' index of the current replication in an ongoing pipeline.
#' @examples
#' if (identical(Sys.getenv("TAR_LONG_EXAMPLES"), "true")) {
#' targets::tar_dir({ # tar_dir() runs code from a temporary directory.
#' targets::tar_script({
#' tar_map_rep(
#' x,
#' data.frame(index = tar_rep_index()),
#' batches = 2L,
#' reps = 3L,
#' values = list(value = c("a", "b"))
#' )
#' })
#' targets::tar_make()
#' x <- targets::tar_read(x)
#' all(x$index == x$tar_rep + (3L * (x$tar_batch - 1L)))
#' #> TRUE
#' })
#' }
tar_rep_index <- function() {
index <- .subset2(step_tar_rep, "index")
if (is.null(index)) {
message <- paste(
"tar_rep_index() can only run inside the R command of a target",
"actively running in a pipeline, and it is only compatible with",
"these specific target factories:",
"tar_rep(), tar_rep2(), tar_map_rep(),",
"tar_map2_count(), and tar_map2_size()."
)
targets::tar_throw_run(message)
}
index
}
6 changes: 6 additions & 0 deletions R/tar_rep_raw.R
Original file line number Diff line number Diff line change
Expand Up @@ -270,6 +270,12 @@ tar_rep_run_map <- function(expr, batch, reps, rep_workers) {
tar_rep_run_map_rep <- function(rep, expr, batch, seeds, envir) {
seed <- as.integer(if_any(anyNA(seeds), NA_integer_, seeds[rep]))
if_any(anyNA(seed), NULL, targets::tar_seed_set(seed = seed))
step_set(
step = step_tar_rep,
batch = batch,
rep = rep,
reps = length(seeds)
)
out <- eval(expr, envir = envir)
if (is.list(out)) {
out[["tar_batch"]] <- as.integer(batch)
Expand Down
3 changes: 3 additions & 0 deletions _pkgdown.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,9 @@ reference:
contents:
- 'tar_map2_count'
- 'tar_map2_size'
- title: Dynamic batched replication indexing
contents:
- 'tar_rep_index'
- title: Target selection
contents:
- 'tar_select_names'
Expand Down
52 changes: 52 additions & 0 deletions man/tar_rep_index.Rd

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

19 changes: 19 additions & 0 deletions tests/testthat/test-class_step.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
targets::tar_test("step class works", {
step <- step_new()
index <- 0L
batches <- 5L
reps <- 7L
for (batch in seq_len(batches)) {
for (rep in seq_len(reps)) {
index <- index + 1L
step_set(step = step, batch = batch, rep = rep, reps = reps)
expect_equal(step$batch, batch)
expect_equal(step$rep, rep)
expect_equal(step$index, index)
}
}
step_reset(step = step)
for (field in c("batch", "rep", "index")) {
expect_null(step[[field]])
}
})
144 changes: 144 additions & 0 deletions tests/testthat/test-tar_rep_index.R
Original file line number Diff line number Diff line change
@@ -0,0 +1,144 @@
targets::tar_test("tar_rep_index() without a pipeline", {
skip_on_cran()
expect_error(tar_rep_index(), class = "tar_condition_run")
})

targets::tar_test("tar_rep_index() in tar_rep()", {
skip_on_cran()
on.exit(step_reset(step_tar_rep))
targets::tar_script(
tar_rep(x, data.frame(index = tar_rep_index()), batches = 2L, reps = 3L)
)
targets::tar_make(callr_function = NULL)
x <- targets::tar_read(x)
expect_equal(
x$index,
x$tar_rep + (3L * (x$tar_batch - 1L))
)
})

targets::tar_test("tar_rep_index() in tar_rep2()", {
skip_on_cran()
on.exit(step_reset(step_tar_rep))
targets::tar_script({
list(
tar_rep(x, data.frame(index = tar_rep_index()), batches = 2L, reps = 3L),
tar_rep2(
name = y,
command = data.frame(index2 = tar_rep_index(), index = x$index),
x
)
)
})
targets::tar_make(callr_function = NULL)
x <- targets::tar_read(x)
expect_equal(
x$index,
x$tar_rep + (3L * (x$tar_batch - 1L))
)
y <- targets::tar_read(y)
expect_equal(
y$index,
y$tar_rep + (3L * (y$tar_batch - 1L))
)
expect_equal(y$index, y$index2)
})

targets::tar_test("tar_rep_index() in tar_map_rep()", {
skip_on_cran()
on.exit(step_reset(step_tar_rep))
targets::tar_script(
tar_map_rep(
x,
data.frame(index = tar_rep_index()),
batches = 2L,
reps = 3L,
values = list(value = c("a", "b"))
)
)
targets::tar_make(callr_function = NULL)
x <- targets::tar_read(x)
expect_equal(nrow(x), 12L)
expect_equal(
x$index,
x$tar_rep + (3L * (x$tar_batch - 1L))
)
})

targets::tar_test("tar_rep_index() in tar_map2_count()", {
skip_on_cran()
on.exit(step_reset(step_tar_rep))
targets::tar_script({
f1 <- function(arg1) {
tibble::tibble(
arg1 = arg1,
arg2 = seq_len(6)
)
}
f2 <- function(arg1, arg2) {
tibble::tibble(
result = paste(arg1, arg2),
length_arg1 = length(arg1),
length_arg2 = length(arg2),
random = sample.int(1e6, size = 1L),
index = tar_rep_index()
)
}
tar_map2_count(
x,
command1 = f1(arg1),
command2 = f2(arg1, arg2),
values = tibble::tibble(arg1 = letters[seq_len(2)]),
names = arg1,
suffix1 = "i",
suffix2 = "ii",
batches = 3
)
})
targets::tar_make(callr_function = NULL)
x <- targets::tar_read(x)
expect_equal(nrow(x), 12L)
expect_equal(
x$index,
x$tar_rep + (2L * (x$tar_batch - 1L))
)
})

targets::tar_test("tar_rep_index() in tar_map2_count()", {
skip_on_cran()
on.exit(step_reset(step_tar_rep))
targets::tar_script({
f1 <- function(arg1) {
tibble::tibble(
arg1 = arg1,
arg2 = seq_len(6)
)
}
f2 <- function(arg1, arg2) {
tibble::tibble(
result = paste(arg1, arg2),
length_arg1 = length(arg1),
length_arg2 = length(arg2),
random = sample.int(1e6, size = 1L),
index = tar_rep_index()
)
}
tar_map2_size(
x,
command1 = f1(arg1),
command2 = f2(arg1, arg2),
values = tibble::tibble(arg1 = letters[seq_len(2)]),
names = arg1,
suffix1 = "i",
suffix2 = "ii",
size = 2
)
})
targets::tar_make(callr_function = NULL)
x <- targets::tar_read(x)
expect_equal(nrow(x), 12L)
expect_equal(
x$index,
x$tar_rep + (2L * (x$tar_batch - 1L))
)
})

0 comments on commit 1635eae

Please sign in to comment.