Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions vortex-cxx/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,9 @@ target_link_libraries(vortex
PUBLIC nanoarrow_static
PRIVATE vortex_cxx_bridge
)
target_include_directories(vortex_cxx_bridge PUBLIC
${CPP_INCLUDE_DIRS}
)

if (VORTEX_ENABLE_ASAN)
target_compile_options(vortex PRIVATE -fsanitize=leak,address,undefined -fno-omit-frame-pointer -fno-common -O1)
Expand Down
3 changes: 3 additions & 0 deletions vortex-cxx/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,9 @@ paste = { workspace = true }
take_mut = { workspace = true }
tokio = { workspace = true, features = ["rt", "rt-multi-thread", "macros"] }
vortex = { workspace = true, features = ["tokio"] }
vortex-buffer = { workspace = true }
vortex-error = { workspace = true }
vortex-io = { workspace = true }

[build-dependencies]
cxx-build = "1.0"
6 changes: 6 additions & 0 deletions vortex-cxx/cpp/include/vortex/file.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,16 @@
namespace vortex {
class ScanBuilder;

namespace io {
class VortexReadAt;
}

class VortexFile {
public:
static VortexFile Open(const std::string &path);
static VortexFile Open(const uint8_t *data, size_t length);

static VortexFile OpenSeekable(std::unique_ptr<io::VortexReadAt> reader);

VortexFile(VortexFile &&other) noexcept = default;
VortexFile &operator=(VortexFile &&other) noexcept = default;
Expand Down
37 changes: 37 additions & 0 deletions vortex-cxx/cpp/include/vortex/io.hpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
// SPDX-License-Identifier: Apache-2.0
// SPDX-FileCopyrightText: Copyright the Vortex contributors

#pragma once

#include <vector>
#include <cstdint>
#include "rust/cxx.h"

namespace vortex {

namespace io {

class VortexReadAt {
public:
virtual ~VortexReadAt() = default;

virtual std::vector<uint8_t> ReadAt(uint64_t pos, size_t len) const = 0;

virtual uint64_t GetSize() const = 0;
};

/// TODO: Is there any better way to do this?
inline rust::Vec<uint8_t> read_at(const VortexReadAt &reader, uint64_t pos, size_t len) {
auto data = reader.ReadAt(pos, len);
rust::Vec<uint8_t> result;
result.reserve(data.size());
std::copy(data.begin(), data.end(), std::back_inserter(result));
return result;
}

inline uint64_t get_size(const VortexReadAt &reader) {
return reader.GetSize();
}

} /// namespace io
} /// namespace vortex
9 changes: 9 additions & 0 deletions vortex-cxx/cpp/src/file.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
// SPDX-FileCopyrightText: Copyright the Vortex contributors

#include "vortex/file.hpp"
#include "vortex/io.hpp"
#include "vortex/scan.hpp"
#include "vortex/exception.hpp"
#include "rust/cxx.h"
Expand All @@ -25,6 +26,14 @@ VortexFile VortexFile::Open(const std::string &path) {
}
}

VortexFile VortexFile::OpenSeekable(std::unique_ptr<io::VortexReadAt> reader) {
try {
return VortexFile(ffi::open_with_read_at(std::move(reader)));
} catch (const rust::cxxbridge1::Error &e) {
throw VortexException(e.what());
}
}

uint64_t VortexFile::RowCount() const {
return impl_->row_count();
}
Expand Down
11 changes: 11 additions & 0 deletions vortex-cxx/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,6 +111,17 @@ mod ffi {
input_stream: *mut u8,
path: &str,
) -> Result<()>;

fn open_with_read_at(read: UniquePtr<VortexReadAt>) -> Result<Box<VortexFile>>;
}

#[namespace = "vortex::io"]
unsafe extern "C++" {
include!("vortex/io.hpp");

type VortexReadAt;
fn read_at(reader: &VortexReadAt, pos: u64, len: usize) -> Vec<u8>;
fn get_size(reader: &VortexReadAt) -> u64;
}

#[repr(u8)]
Expand Down
42 changes: 42 additions & 0 deletions vortex-cxx/src/read.rs
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ use arrow_array::ffi::FFI_ArrowSchema;
use arrow_array::ffi_stream::FFI_ArrowArrayStream;
use arrow_array::{RecordBatch, RecordBatchReader};
use arrow_schema::{ArrowError, DataType, Schema, SchemaRef};
use cxx::UniquePtr;
use futures::FutureExt;
use futures::stream::TryStreamExt;
use vortex::ArrayRef;
use vortex::arrow::IntoArrowArray;
Expand All @@ -17,9 +19,14 @@ use vortex::file::VortexOpenOptions;
use vortex::io::runtime::BlockingRuntime;
use vortex::scan::ScanBuilder;
use vortex::scan::arrow::RecordBatchIteratorAdapter;
use vortex_buffer::{Alignment, ByteBuffer};
use vortex_error::VortexResult;
use vortex_io::VortexReadAt;


use crate::RUNTIME;
use crate::expr::Expr;
use crate::ffi::VortexReadAt as CxxReadAt;

pub(crate) struct VortexFile {
inner: vortex::file::VortexFile,
Expand Down Expand Up @@ -187,3 +194,38 @@ impl ThreadsafeCloneableReader {
unsafe { std::ptr::write(out_stream, stream) };
}
}

pub struct CxxVortexReadAt {
ptr: UniquePtr<CxxReadAt>,
}

/// Cxx implementation is expected to be thread-safe
unsafe impl Send for CxxVortexReadAt {}
unsafe impl Sync for CxxVortexReadAt {}

impl VortexReadAt for CxxVortexReadAt {
fn read_at(&self, pos: u64, len: usize, _: Alignment) -> futures::future::BoxFuture<'static, VortexResult<ByteBuffer>> {
let data = crate::ffi::read_at(self.ptr.as_ref().unwrap(), pos, len);
let buffer = ByteBuffer::from(data);
async move { Ok(buffer) }.boxed()
}

fn size(&self) -> futures::future::BoxFuture<'static, VortexResult<u64>> {
let size = crate::ffi::get_size(self.ptr.as_ref().unwrap());
async move { Ok(size) }.boxed()
}
}

/// Postscript is guaranteed to never exceed 'u16::MAX - 8' bytes in length
/// Possible imrpovements: make initial_read_size configurable
/// See: https://docs.vortex.dev/specs/file-format
pub(crate) fn open_with_read_at(read: UniquePtr<CxxReadAt>) -> Result<Box<VortexFile>> {
let read_at = Arc::new(CxxVortexReadAt { ptr: read });
let file = RUNTIME.block_on(|h| {
VortexOpenOptions::new()
.with_handle(h)
.with_initial_read_size((u16::MAX - 8).into())
.open_read_at(read_at)
})?;
Ok(Box::new(VortexFile { inner: file }))
}
Loading