From 6c2a52fa4f82a7bd0ccfb7575d8539bc61a67156 Mon Sep 17 00:00:00 2001 From: Dergousov Maxim Date: Wed, 15 Oct 2025 21:19:01 +0000 Subject: [PATCH] feat(cxx): add open_with_read_at bridge Signed-off-by: Dergousov Maxim --- vortex-cxx/CMakeLists.txt | 3 ++ vortex-cxx/Cargo.toml | 3 ++ vortex-cxx/cpp/include/vortex/file.hpp | 6 ++++ vortex-cxx/cpp/include/vortex/io.hpp | 37 +++++++++++++++++++++++ vortex-cxx/cpp/src/file.cpp | 9 ++++++ vortex-cxx/src/lib.rs | 11 +++++++ vortex-cxx/src/read.rs | 42 ++++++++++++++++++++++++++ 7 files changed, 111 insertions(+) create mode 100644 vortex-cxx/cpp/include/vortex/io.hpp diff --git a/vortex-cxx/CMakeLists.txt b/vortex-cxx/CMakeLists.txt index cc1405a44a0..013e26db348 100644 --- a/vortex-cxx/CMakeLists.txt +++ b/vortex-cxx/CMakeLists.txt @@ -69,6 +69,9 @@ target_link_libraries(vortex PUBLIC nanoarrow_static PRIVATE vortex_cxx_bridge ) +target_include_directories(vortex_cxx_bridge PUBLIC + ${CPP_INCLUDE_DIRS} +) if (VORTEX_ENABLE_ASAN) target_compile_options(vortex PRIVATE -fsanitize=leak,address,undefined -fno-omit-frame-pointer -fno-common -O1) diff --git a/vortex-cxx/Cargo.toml b/vortex-cxx/Cargo.toml index 8084087d09d..b9c6d4ef264 100644 --- a/vortex-cxx/Cargo.toml +++ b/vortex-cxx/Cargo.toml @@ -30,6 +30,9 @@ paste = { workspace = true } take_mut = { workspace = true } tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] } vortex = { workspace = true, features = ["tokio"] } +vortex-buffer = { workspace = true } +vortex-error = { workspace = true } +vortex-io = { workspace = true } [build-dependencies] cxx-build = "1.0" diff --git a/vortex-cxx/cpp/include/vortex/file.hpp b/vortex-cxx/cpp/include/vortex/file.hpp index c7b93e326d8..7996857ba81 100644 --- a/vortex-cxx/cpp/include/vortex/file.hpp +++ b/vortex-cxx/cpp/include/vortex/file.hpp @@ -9,10 +9,16 @@ namespace vortex { class ScanBuilder; +namespace io { +class VortexReadAt; +} + class VortexFile { public: static VortexFile Open(const std::string &path); static VortexFile Open(const uint8_t *data, size_t length); + + static VortexFile OpenSeekable(std::unique_ptr reader); VortexFile(VortexFile &&other) noexcept = default; VortexFile &operator=(VortexFile &&other) noexcept = default; diff --git a/vortex-cxx/cpp/include/vortex/io.hpp b/vortex-cxx/cpp/include/vortex/io.hpp new file mode 100644 index 00000000000..bf5417ea822 --- /dev/null +++ b/vortex-cxx/cpp/include/vortex/io.hpp @@ -0,0 +1,37 @@ +// SPDX-License-Identifier: Apache-2.0 +// SPDX-FileCopyrightText: Copyright the Vortex contributors + +#pragma once + +#include +#include +#include "rust/cxx.h" + +namespace vortex { + +namespace io { + +class VortexReadAt { +public: + virtual ~VortexReadAt() = default; + + virtual std::vector ReadAt(uint64_t pos, size_t len) const = 0; + + virtual uint64_t GetSize() const = 0; +}; + +/// TODO: Is there any better way to do this? +inline rust::Vec read_at(const VortexReadAt &reader, uint64_t pos, size_t len) { + auto data = reader.ReadAt(pos, len); + rust::Vec result; + result.reserve(data.size()); + std::copy(data.begin(), data.end(), std::back_inserter(result)); + return result; +} + +inline uint64_t get_size(const VortexReadAt &reader) { + return reader.GetSize(); +} + +} /// namespace io +} /// namespace vortex diff --git a/vortex-cxx/cpp/src/file.cpp b/vortex-cxx/cpp/src/file.cpp index 078c305639d..c70b1edf3c0 100644 --- a/vortex-cxx/cpp/src/file.cpp +++ b/vortex-cxx/cpp/src/file.cpp @@ -2,6 +2,7 @@ // SPDX-FileCopyrightText: Copyright the Vortex contributors #include "vortex/file.hpp" +#include "vortex/io.hpp" #include "vortex/scan.hpp" #include "vortex/exception.hpp" #include "rust/cxx.h" @@ -25,6 +26,14 @@ VortexFile VortexFile::Open(const std::string &path) { } } +VortexFile VortexFile::OpenSeekable(std::unique_ptr reader) { + try { + return VortexFile(ffi::open_with_read_at(std::move(reader))); + } catch (const rust::cxxbridge1::Error &e) { + throw VortexException(e.what()); + } +} + uint64_t VortexFile::RowCount() const { return impl_->row_count(); } diff --git a/vortex-cxx/src/lib.rs b/vortex-cxx/src/lib.rs index 6867fbb7b43..1e79fbb2f30 100644 --- a/vortex-cxx/src/lib.rs +++ b/vortex-cxx/src/lib.rs @@ -111,6 +111,17 @@ mod ffi { input_stream: *mut u8, path: &str, ) -> Result<()>; + + fn open_with_read_at(read: UniquePtr) -> Result>; + } + + #[namespace = "vortex::io"] + unsafe extern "C++" { + include!("vortex/io.hpp"); + + type VortexReadAt; + fn read_at(reader: &VortexReadAt, pos: u64, len: usize) -> Vec; + fn get_size(reader: &VortexReadAt) -> u64; } #[repr(u8)] diff --git a/vortex-cxx/src/read.rs b/vortex-cxx/src/read.rs index 466d1eb97d3..0a00a525a7c 100644 --- a/vortex-cxx/src/read.rs +++ b/vortex-cxx/src/read.rs @@ -9,6 +9,8 @@ use arrow_array::ffi::FFI_ArrowSchema; use arrow_array::ffi_stream::FFI_ArrowArrayStream; use arrow_array::{RecordBatch, RecordBatchReader}; use arrow_schema::{ArrowError, DataType, Schema, SchemaRef}; +use cxx::UniquePtr; +use futures::FutureExt; use futures::stream::TryStreamExt; use vortex::ArrayRef; use vortex::arrow::IntoArrowArray; @@ -17,9 +19,14 @@ use vortex::file::VortexOpenOptions; use vortex::io::runtime::BlockingRuntime; use vortex::scan::ScanBuilder; use vortex::scan::arrow::RecordBatchIteratorAdapter; +use vortex_buffer::{Alignment, ByteBuffer}; +use vortex_error::VortexResult; +use vortex_io::VortexReadAt; + use crate::RUNTIME; use crate::expr::Expr; +use crate::ffi::VortexReadAt as CxxReadAt; pub(crate) struct VortexFile { inner: vortex::file::VortexFile, @@ -187,3 +194,38 @@ impl ThreadsafeCloneableReader { unsafe { std::ptr::write(out_stream, stream) }; } } + +pub struct CxxVortexReadAt { + ptr: UniquePtr, +} + +/// Cxx implementation is expected to be thread-safe +unsafe impl Send for CxxVortexReadAt {} +unsafe impl Sync for CxxVortexReadAt {} + +impl VortexReadAt for CxxVortexReadAt { + fn read_at(&self, pos: u64, len: usize, _: Alignment) -> futures::future::BoxFuture<'static, VortexResult> { + let data = crate::ffi::read_at(self.ptr.as_ref().unwrap(), pos, len); + let buffer = ByteBuffer::from(data); + async move { Ok(buffer) }.boxed() + } + + fn size(&self) -> futures::future::BoxFuture<'static, VortexResult> { + let size = crate::ffi::get_size(self.ptr.as_ref().unwrap()); + async move { Ok(size) }.boxed() + } +} + +/// Postscript is guaranteed to never exceed 'u16::MAX - 8' bytes in length +/// Possible imrpovements: make initial_read_size configurable +/// See: https://docs.vortex.dev/specs/file-format +pub(crate) fn open_with_read_at(read: UniquePtr) -> Result> { + let read_at = Arc::new(CxxVortexReadAt { ptr: read }); + let file = RUNTIME.block_on(|h| { + VortexOpenOptions::new() + .with_handle(h) + .with_initial_read_size((u16::MAX - 8).into()) + .open_read_at(read_at) + })?; + Ok(Box::new(VortexFile { inner: file })) +}