diff --git a/r/R/convert-array-stream.R b/r/R/convert-array-stream.R index b1f3e85e5..c8e73aedb 100644 --- a/r/R/convert-array-stream.R +++ b/r/R/convert-array-stream.R @@ -72,31 +72,21 @@ convert_array_stream <- function(array_stream, to = NULL, size = NULL, n = Inf) } else { # Otherwise, we need to collect all batches and calculate the total length # before calling nanoarrow_c_convert_array_stream(). - batches <- collect_array_stream( - array_stream, - n, - schema = schema, - validate = FALSE - ) + batch_info <- .Call(nanoarrow_c_collect_array_stream, array_stream, n) # If there is exactly one batch, use convert_array(). Converting a single # array currently takes a more efficient code path for types that can be # converted as ALTREP (e.g., strings). - if (length(batches) == 1L) { - return(.Call(nanoarrow_c_convert_array, batches[[1]], to)) + if (batch_info$n == 1L) { + array <- batch_info$stream$get_next(schema) + return(.Call(nanoarrow_c_convert_array, array, to)) } - # Otherwise, compute the final size, create another array stream, - # and call convert_array_stream() with a known size. Using .Call() - # directly because we have already type checked the inputs. 
- size <- .Call(nanoarrow_c_array_list_total_length, batches) - basic_stream <- .Call(nanoarrow_c_basic_array_stream, batches, schema, FALSE) - .Call( nanoarrow_c_convert_array_stream, - basic_stream, + batch_info$stream, to, - as.double(size), + as.double(batch_info$size), Inf ) } diff --git a/r/bootstrap.R b/r/bootstrap.R index c285f487b..041a4b407 100644 --- a/r/bootstrap.R +++ b/r/bootstrap.R @@ -60,7 +60,3 @@ stopifnot(file.exists("../CMakeLists.txt") && run_bundler()) f <- "src/flatcc/portable/pdiagnostic.h" lines <- readLines(f) writeLines(gsub("^#pragma", "/**/#pragma", lines), f) - -# Remove unused files -unused_files <- list.files("src", "\\.hpp$", full.names = TRUE) -unlink(unused_files) diff --git a/r/src/.gitignore b/r/src/.gitignore index c5bf0ca6e..5093a88f8 100644 --- a/r/src/.gitignore +++ b/r/src/.gitignore @@ -22,5 +22,7 @@ nanoarrow.c nanoarrow.h nanoarrow_ipc.h nanoarrow_ipc.c +nanoarrow.hpp +nanoarrow_ipc.hpp flatcc* Makevars diff --git a/r/src/init.c b/r/src/init.c index 4d2109a2f..70eec867f 100644 --- a/r/src/init.c +++ b/r/src/init.c @@ -61,6 +61,7 @@ extern SEXP nanoarrow_c_ipc_array_reader_buffer(SEXP buffer_xptr); extern SEXP nanoarrow_c_ipc_array_reader_connection(SEXP con); extern SEXP nanoarrow_c_ipc_writer_connection(SEXP con); extern SEXP nanoarrow_c_ipc_writer_write_stream(SEXP writer_xptr, SEXP array_stream_xptr); +extern SEXP nanoarrow_c_collect_array_stream(SEXP array_stream_xptr, SEXP n_sexp); extern SEXP nanoarrow_c_allocate_schema(void); extern SEXP nanoarrow_c_allocate_array(void); extern SEXP nanoarrow_c_allocate_array_stream(void); @@ -144,6 +145,7 @@ static const R_CallMethodDef CallEntries[] = { {"nanoarrow_c_ipc_writer_connection", (DL_FUNC)&nanoarrow_c_ipc_writer_connection, 1}, {"nanoarrow_c_ipc_writer_write_stream", (DL_FUNC)&nanoarrow_c_ipc_writer_write_stream, 2}, + {"nanoarrow_c_collect_array_stream", (DL_FUNC)&nanoarrow_c_collect_array_stream, 2}, {"nanoarrow_c_allocate_schema", 
(DL_FUNC)&nanoarrow_c_allocate_schema, 0},
     {"nanoarrow_c_allocate_array", (DL_FUNC)&nanoarrow_c_allocate_array, 0},
     {"nanoarrow_c_allocate_array_stream", (DL_FUNC)&nanoarrow_c_allocate_array_stream, 0},
diff --git a/r/src/nanoarrow_cpp.cc b/r/src/nanoarrow_cpp.cc
index 9c0e38d68..83ebe07ac 100644
--- a/r/src/nanoarrow_cpp.cc
+++ b/r/src/nanoarrow_cpp.cc
@@ -26,6 +26,9 @@
 #include
 #include
 
+#include "nanoarrow.hpp"
+#include "nanoarrow/r.h"
+
 // Without this infrastructure, it's possible to check that all objects
 // are released by running devtools::test(); gc() in a fresh session and
 // making sure that nanoarrow:::preserved_count() is zero afterward.
@@ -201,3 +204,88 @@ extern "C" void nanoarrow_preserve_and_release_on_other_thread(SEXP obj) {
   std::thread worker([obj] { nanoarrow_release_sexp(obj); });
   worker.join();
 }
+
+// Collector utility for iterating over and collecting batches.
+// Keeping this all in a single object reduces the amount of C++ deletion
+// we need to keep track of.
+struct ArrayVector {
+  nanoarrow::UniqueSchema schema;           // schema reported by the input stream
+  nanoarrow::UniqueArray batch;             // scratch slot for the batch being read
+  std::vector<nanoarrow::UniqueArray> vec;  // batches collected so far
+};
+
+// Finalizer for the external pointer that owns an ArrayVector: this lets R free
+// it even if collection is aborted by a longjmp (R error or user interrupt).
+static void release_array_vector_xptr(SEXP array_vector_xptr) {
+  auto ptr = static_cast<ArrayVector*>(R_ExternalPtrAddr(array_vector_xptr));
+  if (ptr != nullptr) {
+    delete ptr;
+  }
+}
+
+// Collects up to n batches from the stream and returns list(stream, size, n):
+// a new stream over the collected batches, their total row count, and the
+// batch count, so that the calling R code can decide how best to proceed.
+extern "C" SEXP nanoarrow_c_collect_array_stream(SEXP array_stream_xptr, SEXP n_sexp) {
+  auto* array_stream = nanoarrow_array_stream_from_xptr(array_stream_xptr);
+
+  // Rf_asReal() accepts integer or double n. Clamp to [0, INT_MAX] before
+  // narrowing (out-of-range double -> int is undefined); non-finite = no limit.
+  const double n_real = Rf_asReal(n_sexp);
+  int n = INT_MAX;
+  if (R_FINITE(n_real) && n_real < INT_MAX) {
+    n = n_real > 0 ? static_cast<int>(n_real) : 0;
+  }
+
+  // Attach the finalizer before allocating so a failed R allocation cannot leak.
+  SEXP array_vector_xptr = PROTECT(R_MakeExternalPtr(nullptr, R_NilValue, R_NilValue));
+  R_RegisterCFinalizer(array_vector_xptr, &release_array_vector_xptr);
+  auto array_vector = new ArrayVector();
+  R_SetExternalPtrAddr(array_vector_xptr, array_vector);
+
+  struct ArrowError error;
+  ArrowErrorInit(&error);
+  int code = ArrowArrayStreamGetSchema(array_stream, array_vector->schema.get(), &error);
+  if (code != NANOARROW_OK) {
+    Rf_error("ArrowArrayStreamGetSchema() failed (%d): %s", code, error.message);
+  }
+
+  int64_t n_actual = 0;
+  int64_t size = 0;
+  while (n > 0) {
+    code = ArrowArrayStreamGetNext(array_stream, array_vector->batch.get(), &error);
+    if (code != NANOARROW_OK) {
+      Rf_error("ArrowArrayStreamGetNext() failed (%d): %s", code, error.message);
+    }
+
+    if (array_vector->batch->release == nullptr) {
+      break;
+    }
+
+    size += array_vector->batch->length;
+    ++n_actual;
+    --n;
+    array_vector->vec.push_back(std::move(array_vector->batch));
+    array_vector->batch.reset();
+
+    R_CheckUserInterrupt();
+  }
+
+  SEXP array_stream_out_xptr = PROTECT(nanoarrow_array_stream_owning_xptr());
+  auto* array_stream_out = nanoarrow_output_array_stream_from_xptr(array_stream_out_xptr);
+
+  nanoarrow::VectorArrayStream(array_vector->schema.get(), std::move(array_vector->vec))
+      .ToArrayStream(array_stream_out);
+
+  SEXP size_sexp = PROTECT(Rf_ScalarReal(static_cast<double>(size)));
+  SEXP n_actual_sexp = PROTECT(Rf_ScalarReal(static_cast<double>(n_actual)));
+  const char* names[] = {"stream", "size", "n", ""};
+  SEXP out = PROTECT(Rf_mkNamed(VECSXP, names));
+  SET_VECTOR_ELT(out, 0, array_stream_out_xptr);
+  SET_VECTOR_ELT(out, 1, size_sexp);
+  SET_VECTOR_ELT(out, 2, n_actual_sexp);
+
+  UNPROTECT(5);
+  return out;
+}
diff --git
a/r/tools/make-callentries.R b/r/tools/make-callentries.R index 403169c3c..f40fe6541 100644 --- a/r/tools/make-callentries.R +++ b/r/tools/make-callentries.R @@ -21,7 +21,7 @@ library(tidyverse) -src_files <- list.files("src", "\\.(c|cpp)$", full.names = TRUE) %>% +src_files <- list.files("src", "\\.(c|cc|cpp)$", full.names = TRUE) %>% setdiff("src/init.c") src_sources <- src_files %>% set_names() %>% map_chr(readr::read_file)