diff --git a/build.sbt b/build.sbt index 2f521686d..f8557cbec 100644 --- a/build.sbt +++ b/build.sbt @@ -21,6 +21,7 @@ lazy val root = Project(id = "spark-cyclone-sql-plugin", base = file(".")) .configs(VectorEngine) .configs(TPC) .configs(CMake) + .enablePlugins(SbtJavaCPP4S) .settings(version := "0.9.2") lazy val tracing = project @@ -43,6 +44,22 @@ lazy val tracing = project reStart / envVars += "LOG_DIR" -> file("tracing-dir").getAbsolutePath ) +gppCompilerPath := "g++" +nativeJavaClassPath := "com.nec.arrow.TransferDefinitions" + +includePath := (baseDirectory in Compile).value / "src/main/resources/com/nec/cyclone/cpp" + +libraryName := "libTransferDefinitions" + +makeLibraryCommands := Seq( + "g++ -std=c++17 -fPIC", + "-I", includePath.value.toString, + currentLibraryMeta.value.option, + "-o", + (libraryDestinationPath.value / s"${libraryName.value}.${currentLibraryMeta.value.extension}").toString, + ((baseDirectory in Compile).value / "src/main/resources/com/nec/cyclone/cpp/cyclone/" / "transfer-definitions.hpp").toString +) + /** * Run with: * @@ -149,6 +166,8 @@ libraryDependencies ++= { } } + +Test / fork := true Test / unmanagedJars ++= sys.env .get("CUDF_PATH") .map(path => new File(path)) @@ -168,7 +187,9 @@ CMake / parallelExecution := true */ CMake / testGrouping := (CMake / definedTests).value.map { suite => import sbt.Tests._ - Group(suite.name, Seq(suite), SubProcess(ForkOptions())) + Group(suite.name, Seq(suite), SubProcess(ForkOptions(runJVMOptions = Vector("-Djava.library.path=target/libjni"), + bootJars=Vector[java.io.File](), javaHome=None, outputStrategy=None, + workingDirectory=None, connectInput=false, envVars = Map[String, String]()))) } /** Vector Engine specific configuration */ @@ -184,7 +205,9 @@ VectorEngine / run / fork := true */ VectorEngine / testGrouping := (VectorEngine / definedTests).value.map { suite => import sbt.Tests._ - Group(suite.name, Seq(suite), SubProcess(ForkOptions())) + Group(suite.name, Seq(suite), SubProcess(ForkOptions(runJVMOptions = Vector("-Djava.library.path=target/libjni"), + bootJars=Vector[java.io.File](), javaHome=None, outputStrategy=None, + workingDirectory=None, connectInput=false, envVars = Map[String, String]()))) } /** This generates a file 'java.hprof.txt' in the project root for very simple profiling. * */ @@ -194,6 +217,7 @@ VectorEngine / run / javaOptions ++= { List("-agentlib:hprof=cpu=samples") else Nil } + VectorEngine / sourceDirectory := baseDirectory.value / "src" / "test" VectorEngine / testOptions := Seq(Tests.Filter(veFilter)) diff --git a/project/plugins.sbt b/project/plugins.sbt index 8a1de3d25..696fe46ba 100644 --- a/project/plugins.sbt +++ b/project/plugins.sbt @@ -5,3 +5,4 @@ addSbtPlugin("io.spray" % "sbt-revolver" % "0.9.1") addSbtPlugin("com.github.sbt" % "sbt-native-packager" % "1.9.6") addSbtPlugin("com.timushev.sbt" % "sbt-updates" % "0.6.0") addSbtPlugin("org.bytedeco" % "sbt-javacpp" % "1.17") +addSbtPlugin("com.github.y-yu" % "sbt-javacpp4s" % "0.1.4") diff --git a/src/main/java/com/nec/arrow/ArrowTransferStructures.java b/src/main/java/com/nec/arrow/ArrowTransferStructures.java deleted file mode 100644 index eb2376b50..000000000 --- a/src/main/java/com/nec/arrow/ArrowTransferStructures.java +++ /dev/null @@ -1,283 +0,0 @@ -/* - * Copyright (c) 2021 Xpress AI. - * - * This file is part of Spark Cyclone. - * See https://github.com/XpressAI/SparkCyclone for further info. - * - * Licensed under the Apache License, Version 2.0 (the "License"); - * you may not use this file except in compliance with the License. - * You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - * - */ -package com.nec.arrow; - -import com.sun.jna.Library; -import com.sun.jna.Pointer; -import com.sun.jna.Structure; - -public interface ArrowTransferStructures extends Library { - - @Structure.FieldOrder({"data", "count"}) - class non_null_int_vector extends Structure { - public long data; - public Integer count; - - public non_null_int_vector() { - super(); - } - - public non_null_int_vector(Pointer p) { - super(p); - read(); - } - - public static class ByReference extends non_null_int_vector implements Structure.ByReference { - public ByReference() { - } - - public ByReference(Pointer p) { - super(p); - } - } - } - - @Structure.FieldOrder({"data","validityBuffer", "count"}) - class nullable_int_vector extends Structure { - public long data; - public long validityBuffer; - public Integer count; - - public int dataSize() { - return count * 4; - } - - public nullable_int_vector() { - super(); - } - - public nullable_int_vector(Pointer p) { - super(p); - read(); - } - - public static class ByReference extends nullable_int_vector implements Structure.ByReference { - public ByReference() { - } - - public ByReference(Pointer p) { - super(p); - } - } - } - - @Structure.FieldOrder({"data", "count"}) - class non_null_int2_vector extends Structure { - public long data; - public Integer count; - - public non_null_int2_vector() { - super(); - } - - public non_null_int2_vector(Pointer p) { - super(p); - read(); - } - - public static class ByReference extends non_null_int2_vector implements Structure.ByReference { - public ByReference() { - } - - public ByReference(Pointer p) { - super(p); - } - } - } - - @Structure.FieldOrder({"data", "count"}) - class non_null_double_vector extends Structure { - public long data; - public Integer count; - - public int dataSize() { - return count * 8; - } - - public non_null_double_vector() { - super(); - } - - public non_null_double_vector(int count) { - super(); - this.count = count; - } - - public non_null_double_vector(Pointer p) { - super(p); - read(); - } - - public static class ByReference extends non_null_double_vector implements Structure.ByReference { - public ByReference() { - } - - public ByReference(Pointer p) { - super(p); - } - } - } - - @Structure.FieldOrder({"data", "validityBuffer", "count", }) - class nullable_double_vector extends Structure { - public long data; - public long validityBuffer; - public Integer count; - - public int dataSize() { - return count * 8; - } - - public nullable_double_vector() { - super(); - } - - public nullable_double_vector(Pointer p) { - super(p); - read(); - } - - public static class ByReference extends nullable_double_vector implements Structure.ByReference { - public ByReference() { - } - - public ByReference(Pointer p) { - super(p); - } - } - } - - @Structure.FieldOrder({"data", "validityBuffer", "count", }) - class nullable_bigint_vector extends Structure { - public long data; - public long validityBuffer; - public Integer count; - - public int dataSize() { - return count * 8; - } - - public nullable_bigint_vector() { - super(); - } - - public nullable_bigint_vector(Pointer p) { - super(p); - read(); - } - - public static class ByReference extends nullable_bigint_vector implements Structure.ByReference { - public ByReference() { - } - - public ByReference(Pointer p) { - super(p); - } - } - } - - @Structure.FieldOrder({"data", "offsets", "lengths", "validityBuffer", "dataSize", "count"}) - class nullable_varchar_vector extends Structure { - // The row count - public long data; - public long offsets; - public long lengths; - public long validityBuffer; - public Integer dataSize; - public Integer count; - /* 24 + 8 = 32 bytes in size */ - /* (sizeof(long) * 4) + (sizeof(int) * 2) = 40 bytes */ - - public nullable_varchar_vector() { - super(); - } - - public nullable_varchar_vector(Pointer p) { - super(p); - read(); - } - - public static class ByReference extends nullable_varchar_vector implements Structure.ByReference { - public ByReference() { - } - - public ByReference(Pointer p) { - super(p); - } - } - } - - @Structure.FieldOrder({"data", "count"}) - class non_null_bigint_vector extends Structure { - public long data; - public Integer count; - - public int dataSize() { - return count * 8; - } - - public non_null_bigint_vector() { - super(); - } - - public non_null_bigint_vector(int count) { - super(); - this.count = count; - } - - public non_null_bigint_vector(Pointer p) { - super(p); - read(); - } - - public static class ByReference extends non_null_bigint_vector implements Structure.ByReference { - public ByReference() { - } - - public ByReference(Pointer p) { - super(p); - } - } - } - - @Structure.FieldOrder({"data", "length"}) - class non_null_c_bounded_string extends Structure { - public long data; - public Integer length; - - public non_null_c_bounded_string() { - super(); - } - - public non_null_c_bounded_string(Pointer p) { - super(p); - read(); - } - - public static class ByReference extends non_null_c_bounded_string implements Structure.ByReference { - public ByReference() { - } - - public ByReference(Pointer p) { - super(p); - } - } - } -} diff --git a/src/main/java/com/nec/arrow/TransferDefinitions.java b/src/main/java/com/nec/arrow/TransferDefinitions.java new file mode 100644 index 000000000..261e383dc --- /dev/null +++ b/src/main/java/com/nec/arrow/TransferDefinitions.java @@ -0,0 +1,489 @@ +// Targeted by JavaCPP version 1.5.6: DO NOT EDIT THIS FILE + +package com.nec.arrow; + +import java.nio.*; +import org.bytedeco.javacpp.*; +import org.bytedeco.javacpp.annotation.*; + +public class TransferDefinitions extends com.nec.arrow.TransferDefinitionsConfig { + static { Loader.load(); } + +@Name("std::vector") public static class StringVector extends Pointer { + static { Loader.load(); } + /** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */ + public StringVector(Pointer p) { super(p); } + public StringVector(BytePointer value) { this(1); put(0, value); } + public StringVector(BytePointer ... array) { this(array.length); put(array); } + public StringVector(String value) { this(1); put(0, value); } + public StringVector(String ... array) { this(array.length); put(array); } + public StringVector() { allocate(); } + public StringVector(long n) { allocate(n); } + private native void allocate(); + private native void allocate(@Cast("size_t") long n); + public native @Name("operator =") @ByRef StringVector put(@ByRef StringVector x); + + public boolean empty() { return size() == 0; } + public native long size(); + public void clear() { resize(0); } + public native void resize(@Cast("size_t") long n); + + @Index(function = "at") public native @StdString BytePointer get(@Cast("size_t") long i); + public native StringVector put(@Cast("size_t") long i, BytePointer value); + @ValueSetter @Index(function = "at") public native StringVector put(@Cast("size_t") long i, @StdString String value); + + public native @ByVal Iterator insert(@ByVal Iterator pos, @StdString BytePointer value); + public native @ByVal Iterator erase(@ByVal Iterator pos); + public native @ByVal Iterator begin(); + public native @ByVal Iterator end(); + @NoOffset @Name("iterator") public static class Iterator extends Pointer { + public Iterator(Pointer p) { super(p); } + public Iterator() { } + + public native @Name("operator ++") @ByRef Iterator increment(); + public native @Name("operator ==") boolean equals(@ByRef Iterator it); + public native @Name("operator *") @StdString BytePointer get(); + } + + public BytePointer[] get() { + BytePointer[] array = new BytePointer[size() < Integer.MAX_VALUE ? (int)size() : Integer.MAX_VALUE]; + for (int i = 0; i < array.length; i++) { + array[i] = get(i); + } + return array; + } + @Override public String toString() { + return java.util.Arrays.toString(get()); + } + + public BytePointer pop_back() { + long size = size(); + BytePointer value = get(size - 1); + resize(size - 1); + return value; + } + public StringVector push_back(BytePointer value) { + long size = size(); + resize(size + 1); + return put(size, value); + } + public StringVector put(BytePointer value) { + if (size() != 1) { resize(1); } + return put(0, value); + } + public StringVector put(BytePointer ... array) { + if (size() != array.length) { resize(array.length); } + for (int i = 0; i < array.length; i++) { + put(i, array[i]); + } + return this; + } + + public StringVector push_back(String value) { + long size = size(); + resize(size + 1); + return put(size, value); + } + public StringVector put(String value) { + if (size() != 1) { resize(1); } + return put(0, value); + } + public StringVector put(String ... array) { + if (size() != array.length) { resize(array.length); } + for (int i = 0; i < array.length; i++) { + put(i, array[i]); + } + return this; + } +} + +@Name("std::vector") public static class SizeTVector extends Pointer { + static { Loader.load(); } + /** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */ + public SizeTVector(Pointer p) { super(p); } + public SizeTVector(long ... array) { this(array.length); put(array); } + public SizeTVector() { allocate(); } + public SizeTVector(long n) { allocate(n); } + private native void allocate(); + private native void allocate(@Cast("size_t") long n); + public native @Name("operator =") @ByRef SizeTVector put(@ByRef SizeTVector x); + + public boolean empty() { return size() == 0; } + public native long size(); + public void clear() { resize(0); } + public native void resize(@Cast("size_t") long n); + + @Index(function = "at") public native @Cast("size_t") long get(@Cast("size_t") long i); + public native SizeTVector put(@Cast("size_t") long i, long value); + + public native @ByVal Iterator insert(@ByVal Iterator pos, @Cast("size_t") long value); + public native @ByVal Iterator erase(@ByVal Iterator pos); + public native @ByVal Iterator begin(); + public native @ByVal Iterator end(); + @NoOffset @Name("iterator") public static class Iterator extends Pointer { + public Iterator(Pointer p) { super(p); } + public Iterator() { } + + public native @Name("operator ++") @ByRef Iterator increment(); + public native @Name("operator ==") boolean equals(@ByRef Iterator it); + public native @Name("operator *") @Cast("size_t") long get(); + } + + public long[] get() { + long[] array = new long[size() < Integer.MAX_VALUE ? (int)size() : Integer.MAX_VALUE]; + for (int i = 0; i < array.length; i++) { + array[i] = get(i); + } + return array; + } + @Override public String toString() { + return java.util.Arrays.toString(get()); + } + + public long pop_back() { + long size = size(); + long value = get(size - 1); + resize(size - 1); + return value; + } + public SizeTVector push_back(long value) { + long size = size(); + resize(size + 1); + return put(size, value); + } + public SizeTVector put(long value) { + if (size() != 1) { resize(1); } + return put(0, value); + } + public SizeTVector put(long ... array) { + if (size() != array.length) { resize(array.length); } + for (int i = 0; i < array.length; i++) { + put(i, array[i]); + } + return this; + } +} + +// Parsed from transfer-definitions.hpp + +/* + * Copyright (c) 2021 Xpress AI. + * + * This file is part of Spark Cyclone. + * See https://github.com/XpressAI/SparkCyclone for further info. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ +// #pragma once + +// #ifndef VE_TD_DEFS + +// #include +// #include + +// Explicitly instantiate struct template for int32_t +//typedef NullableScalarVec nullable_int_vector; + +public static class nullable_int_vector extends Pointer { + static { Loader.load(); } + /** Default native constructor. */ + public nullable_int_vector() { super((Pointer)null); allocate(); } + /** Native array allocator. Access with {@link Pointer#position(long)}. */ + public nullable_int_vector(long size) { super((Pointer)null); allocateArray(size); } + /** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */ + public nullable_int_vector(Pointer p) { super(p); } + private native void allocate(); + private native void allocateArray(long size); + @Override public nullable_int_vector position(long position) { + return (nullable_int_vector)super.position(position); + } + @Override public nullable_int_vector getPointer(long i) { + return new nullable_int_vector((Pointer)this).offsetAddress(i); + } + + // NOTE: Field declaration order must be maintained to match existing JNA bindings + + public native IntPointer data(); public native nullable_int_vector data(IntPointer setter); // The raw data + public native @Cast("uint64_t*") LongPointer validityBuffer(); public native nullable_int_vector validityBuffer(LongPointer setter); // Bit vector to denote null values + public native int count(); public native nullable_int_vector count(int setter); // Row count (synonymous with size of data array) + public int dataSize() { + return count() * 4; + } + // Returns a deep copy of this NullableScalarVec + public native nullable_int_vector clone(); + + // Value equality check against another NullableScalarVec + public native @Cast("bool") boolean equals(@Const nullable_int_vector other); +} + +// Explicitly instantiate struct template for int64_t +//typedef NullableScalarVec nullable_bigint_vector; + +public static class nullable_bigint_vector extends Pointer { + static { Loader.load(); } + /** Default native constructor. */ + public nullable_bigint_vector() { super((Pointer)null); allocate(); } + /** Native array allocator. Access with {@link Pointer#position(long)}. */ + public nullable_bigint_vector(long size) { super((Pointer)null); allocateArray(size); } + /** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */ + public nullable_bigint_vector(Pointer p) { super(p); } + private native void allocate(); + private native void allocateArray(long size); + @Override public nullable_bigint_vector position(long position) { + return (nullable_bigint_vector)super.position(position); + } + @Override public nullable_bigint_vector getPointer(long i) { + return new nullable_bigint_vector((Pointer)this).offsetAddress(i); + } + + // NOTE: Field declaration order must be maintained to match existing JNA bindings + + public native @Cast("int64_t*") LongPointer data(); public native nullable_bigint_vector data(LongPointer setter); // The raw data + public native @Cast("uint64_t*") LongPointer validityBuffer(); public native nullable_bigint_vector validityBuffer(LongPointer setter); // Bit vector to denote null values + public native int count(); public native nullable_bigint_vector count(int setter); // Row count (synonymous with size of data array) + public int dataSize() { + return count() * 8; + } + // Returns a deep copy of this NullableScalarVec + public native nullable_bigint_vector clone(); + + // Value equality check against another NullableScalarVec + public native @Cast("bool") boolean equals(@Const nullable_bigint_vector other); +} + +// Explicitly instantiate struct template for float +//typedef NullableScalarVec nullable_float_vector; + +public static class nullable_float_vector extends Pointer { + static { Loader.load(); } + /** Default native constructor. */ + public nullable_float_vector() { super((Pointer)null); allocate(); } + /** Native array allocator. Access with {@link Pointer#position(long)}. */ + public nullable_float_vector(long size) { super((Pointer)null); allocateArray(size); } + /** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */ + public nullable_float_vector(Pointer p) { super(p); } + private native void allocate(); + private native void allocateArray(long size); + @Override public nullable_float_vector position(long position) { + return (nullable_float_vector)super.position(position); + } + @Override public nullable_float_vector getPointer(long i) { + return new nullable_float_vector((Pointer)this).offsetAddress(i); + } + + // NOTE: Field declaration order must be maintained to match existing JNA bindings + + public native FloatPointer data(); public native nullable_float_vector data(FloatPointer setter); // The raw data + public native @Cast("uint64_t*") LongPointer validityBuffer(); public native nullable_float_vector validityBuffer(LongPointer setter); // Bit vector to denote null values + public native int count(); public native nullable_float_vector count(int setter); // Row count (synonymous with size of data array) + + public int dataSize() { + return count() * 4; + } + + // Returns a deep copy of this NullableScalarVec + public native nullable_float_vector clone(); + + // Value equality check against another NullableScalarVec + public native @Cast("bool") boolean equals(@Const nullable_float_vector other); +} + +// Explicitly instantiate struct template for double +//typedef NullableScalarVec nullable_double_vector; + +public static class nullable_double_vector extends Pointer { + static { Loader.load(); } + /** Default native constructor. */ + public nullable_double_vector() { super((Pointer)null); allocate(); } + /** Native array allocator. Access with {@link Pointer#position(long)}. */ + public nullable_double_vector(long size) { super((Pointer)null); allocateArray(size); } + /** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */ + public nullable_double_vector(Pointer p) { super(p); } + private native void allocate(); + private native void allocateArray(long size); + @Override public nullable_double_vector position(long position) { + return (nullable_double_vector)super.position(position); + } + @Override public nullable_double_vector getPointer(long i) { + return new nullable_double_vector((Pointer)this).offsetAddress(i); + } + + // NOTE: Field declaration order must be maintained to match existing JNA bindings + + public native DoublePointer data(); public native nullable_double_vector data(DoublePointer setter); // The raw data + public native @Cast("uint64_t*") LongPointer validityBuffer(); public native nullable_double_vector validityBuffer(LongPointer setter); // Bit vector to denote null values + public native int count(); public native nullable_double_vector count(int setter); // Row count (synonymous with size of data array) + public int dataSize() { + return count() * 8; + } + // Returns a deep copy of this NullableScalarVec + public native nullable_double_vector clone(); + + // Value equality check against another NullableScalarVec + public native @Cast("bool") boolean equals(@Const nullable_double_vector other); +} + +public static class nullable_varchar_vector extends Pointer { + static { Loader.load(); } + /** Default native constructor. */ + public nullable_varchar_vector() { super((Pointer)null); allocate(); } + /** Native array allocator. Access with {@link Pointer#position(long)}. */ + public nullable_varchar_vector(long size) { super((Pointer)null); allocateArray(size); } + /** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */ + public nullable_varchar_vector(Pointer p) { super(p); } + private native void allocate(); + private native void allocateArray(long size); + @Override public nullable_varchar_vector position(long position) { + return (nullable_varchar_vector)super.position(position); + } + @Override public nullable_varchar_vector getPointer(long i) { + return new nullable_varchar_vector((Pointer)this).offsetAddress(i); + } + + // NOTE: Field declaration order must be maintained to match existing JNA bindings + + public native IntPointer data(); public native nullable_varchar_vector data(IntPointer setter); // The raw data containing all the varchars concatenated together + public native IntPointer offsets(); public native nullable_varchar_vector offsets(IntPointer setter); // Offsets to denote varchar start and end positions + public native IntPointer lengths(); public native nullable_varchar_vector lengths(IntPointer setter); + + public native @Cast("uint64_t*") LongPointer validityBuffer(); public native nullable_varchar_vector validityBuffer(LongPointer setter); // Bit vector to denote null values + public native int dataSize(); public native nullable_varchar_vector dataSize(int setter); // Size of data array + public native int count(); public native nullable_varchar_vector count(int setter); // The row count + + // Returns a deep copy of this nullable_varchar_vector + public native nullable_varchar_vector clone(); + + // Value equality check against another nullable_varchar_vector + public native @Cast("bool") boolean equals(@Const nullable_varchar_vector other); +} + +public static class non_null_double_vector extends Pointer { + static { Loader.load(); } + /** Default native constructor. */ + public non_null_double_vector() { super((Pointer)null); allocate(); } + /** Native array allocator. Access with {@link Pointer#position(long)}. */ + public non_null_double_vector(long size) { super((Pointer)null); allocateArray(size); } + /** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */ + public non_null_double_vector(Pointer p) { super(p); } + private native void allocate(); + private native void allocateArray(long size); + @Override public non_null_double_vector position(long position) { + return (non_null_double_vector)super.position(position); + } + @Override public non_null_double_vector getPointer(long i) { + return new non_null_double_vector((Pointer)this).offsetAddress(i); + } + + public native DoublePointer data(); public native non_null_double_vector data(DoublePointer setter); + public native int count(); public native non_null_double_vector count(int setter); + public int dataSize() { + return count() * 8; + } +} + +public static class non_null_bigint_vector extends Pointer { + static { Loader.load(); } + /** Default native constructor. */ + public non_null_bigint_vector() { super((Pointer)null); allocate(); } + /** Native array allocator. Access with {@link Pointer#position(long)}. */ + public non_null_bigint_vector(long size) { super((Pointer)null); allocateArray(size); } + /** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */ + public non_null_bigint_vector(Pointer p) { super(p); } + private native void allocate(); + private native void allocateArray(long size); + @Override public non_null_bigint_vector position(long position) { + return (non_null_bigint_vector)super.position(position); + } + @Override public non_null_bigint_vector getPointer(long i) { + return new non_null_bigint_vector((Pointer)this).offsetAddress(i); + } + + public native @Cast("int64_t*") LongPointer data(); public native non_null_bigint_vector data(LongPointer setter); + public native int count(); public native non_null_bigint_vector count(int setter); + public int dataSize() { + return count() * 8; + } +} + +public static class non_null_int2_vector extends Pointer { + static { Loader.load(); } + /** Default native constructor. */ + public non_null_int2_vector() { super((Pointer)null); allocate(); } + /** Native array allocator. Access with {@link Pointer#position(long)}. */ + public non_null_int2_vector(long size) { super((Pointer)null); allocateArray(size); } + /** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */ + public non_null_int2_vector(Pointer p) { super(p); } + private native void allocate(); + private native void allocateArray(long size); + @Override public non_null_int2_vector position(long position) { + return (non_null_int2_vector)super.position(position); + } + @Override public non_null_int2_vector getPointer(long i) { + return new non_null_int2_vector((Pointer)this).offsetAddress(i); + } + + public native ShortPointer data(); public native non_null_int2_vector data(ShortPointer setter); + public native int count(); public native non_null_int2_vector count(int setter); + public int size() { + return count() * 2; + } +} + +public static class non_null_int_vector extends Pointer { + static { Loader.load(); } + /** Default native constructor. */ + public non_null_int_vector() { super((Pointer)null); allocate(); } + /** Native array allocator. Access with {@link Pointer#position(long)}. */ + public non_null_int_vector(long size) { super((Pointer)null); allocateArray(size); } + /** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */ + public non_null_int_vector(Pointer p) { super(p); } + private native void allocate(); + private native void allocateArray(long size); + @Override public non_null_int_vector position(long position) { + return (non_null_int_vector)super.position(position); + } + @Override public non_null_int_vector getPointer(long i) { + return new non_null_int_vector((Pointer)this).offsetAddress(i); + } + + public native IntPointer data(); public native non_null_int_vector data(IntPointer setter); + public native int count(); public native non_null_int_vector count(int setter); +} + +public static class non_null_c_bounded_string extends Pointer { + static { Loader.load(); } + /** Default native constructor. */ + public non_null_c_bounded_string() { super((Pointer)null); allocate(); } + /** Native array allocator. Access with {@link Pointer#position(long)}. */ + public non_null_c_bounded_string(long size) { super((Pointer)null); allocateArray(size); } + /** Pointer cast constructor. Invokes {@link Pointer#Pointer(Pointer)}. */ + public non_null_c_bounded_string(Pointer p) { super(p); } + private native void allocate(); + private native void allocateArray(long size); + @Override public non_null_c_bounded_string position(long position) { + return (non_null_c_bounded_string)super.position(position); + } + @Override public non_null_c_bounded_string getPointer(long i) { + return new non_null_c_bounded_string((Pointer)this).offsetAddress(i); + } + + public native @Cast("char*") BytePointer data(); public native non_null_c_bounded_string data(BytePointer setter); + public native int length(); public native non_null_c_bounded_string length(int setter); +} + +public static final int VE_TD_DEFS = 1; +// #endif + + +} diff --git a/src/main/java/com/nec/arrow/TransferDefinitionsConfig.java b/src/main/java/com/nec/arrow/TransferDefinitionsConfig.java new file mode 100644 index 000000000..1cb7dc056 --- /dev/null +++ b/src/main/java/com/nec/arrow/TransferDefinitionsConfig.java @@ -0,0 +1,29 @@ +package com.nec.arrow; + +import org.bytedeco.javacpp.*; +import org.bytedeco.javacpp.annotation.*; +import org.bytedeco.javacpp.tools.*; + +@Properties( + value = @Platform( + compiler = "cpp17", + include = { +// "words.hpp", +// "char_int_conv.hpp", +// "parsefloat.hpp", + "cyclone/transfer-definitions.hpp" + } +// link = "TransferDefinitions" + ), + target = "com.nec.arrow.TransferDefinitions" +) +public class TransferDefinitionsConfig implements InfoMapper { + public void map(InfoMap infoMap) { + infoMap.put(new Info("std::vector").pointerTypes("StringVector").define()) + .put(new Info("std::vector").pointerTypes("SizeTVector").define()) + .put(new Info("NullableScalarVec").pointerTypes("NullableScalarVecInt32t")) + .put(new Info("NullableScalarVec").pointerTypes("NullableScalarVecInt64t")) + .put(new Info("NullableScalarVec").pointerTypes("NullableScalarVecFloat")) + .put(new Info("NullableScalarVec").pointerTypes("NullableScalarVecDouble")); + } +} diff --git a/src/main/resources/com/nec/cyclone/cpp/cyclone/transfer-definitions.hpp b/src/main/resources/com/nec/cyclone/cpp/cyclone/transfer-definitions.hpp index 289c45452..b7e805867 100644 --- a/src/main/resources/com/nec/cyclone/cpp/cyclone/transfer-definitions.hpp +++ b/src/main/resources/com/nec/cyclone/cpp/cyclone/transfer-definitions.hpp @@ -214,6 +214,30 @@ struct nullable_varchar_vector { const std::vector &bucket_assignments) const; }; +struct non_null_double_vector +{ + double *data; + int32_t count; +}; + +struct non_null_bigint_vector +{ + int64_t *data; + int32_t count; +}; + +struct non_null_int2_vector +{ + int16_t *data; + int32_t count; +}; + +struct non_null_int_vector +{ + int32_t *data; + int32_t count; +}; + struct non_null_c_bounded_string { char *data = nullptr; int32_t length = 0; diff --git a/src/main/scala/com/nec/arrow/ArrowInterfaces.scala b/src/main/scala/com/nec/arrow/ArrowInterfaces.scala index f856dfe97..cef68b631 100644 --- a/src/main/scala/com/nec/arrow/ArrowInterfaces.scala +++ b/src/main/scala/com/nec/arrow/ArrowInterfaces.scala @@ -19,10 +19,14 @@ */ package com.nec.arrow -import com.nec.arrow.ArrowTransferStructures._ +import com.nec.arrow.TransferDefinitions._ import org.apache.arrow.vector._ import org.apache.spark.sql.util.ArrowUtilsExposed import org.bytedeco.javacpp.BytePointer +import org.bytedeco.javacpp.DoublePointer +import org.bytedeco.javacpp.LongPointer +import org.bytedeco.javacpp.IntPointer +import org.bytedeco.javacpp.ShortPointer import sun.misc.Unsafe import sun.nio.ch.DirectBuffer @@ -43,17 +47,16 @@ object ArrowInterfaces { def c_double_vector(float8Vector: Float8Vector): non_null_double_vector = { val vc = new non_null_double_vector() - vc.data = float8Vector.getDataBufferAddress() - - vc.count = float8Vector.getValueCount + .data(new DoublePointer(float8Vector.getDataBuffer().nioBuffer.asDoubleBuffer)) + .count(float8Vector.getValueCount) vc } def c_nullable_double_vector(float8Vector: Float8Vector): nullable_double_vector = { val vc = new nullable_double_vector() - vc.data = float8Vector.getDataBufferAddress() - vc.validityBuffer = float8Vector.getValidityBufferAddress - vc.count = float8Vector.getValueCount + .data(new DoublePointer(float8Vector.getDataBuffer().nioBuffer.asDoubleBuffer)) + .validityBuffer(new LongPointer(float8Vector.getValidityBuffer().nioBuffer.asLongBuffer)) + .count(float8Vector.getValueCount) vc } @@ -94,45 +97,42 @@ object ArrowInterfaces { def c_nullable_varchar_vector(varCharVector: VarCharVector): nullable_varchar_vector = { val vc = new nullable_varchar_vector() - val dataBuffer = intCharsFromVarcharVector(varCharVector) - vc.data = dataBuffer.asInstanceOf[DirectBuffer].address() - vc.offsets = startsFromVarcharVector(varCharVector).asInstanceOf[DirectBuffer].address() - vc.lengths = lengthsFromVarcharVector(varCharVector).asInstanceOf[DirectBuffer].address() - vc.validityBuffer = - varCharVector.getValidityBuffer.nioBuffer().asInstanceOf[DirectBuffer].address() - vc.count = varCharVector.getValueCount - vc.dataSize = dataBuffer.capacity() / 4 + .data(new IntPointer(intCharsFromVarcharVector(varCharVector).asIntBuffer)) + .offsets(new IntPointer(startsFromVarcharVector(varCharVector).asIntBuffer)) + .lengths(new IntPointer(lengthsFromVarcharVector(varCharVector).asIntBuffer)) + .validityBuffer(new LongPointer(varCharVector.getValidityBuffer().nioBuffer.asLongBuffer)) + .count(varCharVector.getValueCount) + .dataSize(varCharVector.sizeOfValueBuffer()) vc } def c_bounded_string(string: String): non_null_c_bounded_string = { val vc = new non_null_c_bounded_string() - vc.data = (new BytePointer(string.length)) - .put(string.getBytes("UTF-32LE"): _*) - .address() - vc.length = string.length + .data((new BytePointer(string.length)) + .put(string.getBytes("UTF-32LE"): _*)) + .length(string.length) vc } def c_bounded_data(bytePointer: BytePointer, bufSize: Int): non_null_c_bounded_string = { val vc = new non_null_c_bounded_string() - vc.data = bytePointer.address() - vc.length = bufSize + .data(bytePointer) + .length(bufSize) vc } def c_int2_vector(intVector: IntVector): non_null_int2_vector = { val vc = new non_null_int2_vector() - vc.data = intVector.getDataBufferAddress() - vc.count = intVector.getValueCount + .data(new ShortPointer(intVector.getDataBuffer().nioBuffer.asShortBuffer)) + .count(intVector.getValueCount) vc } def c_nullable_int_vector(intVector: IntVector): nullable_int_vector = { val vc = new nullable_int_vector() - vc.data = intVector.getDataBufferAddress() - vc.validityBuffer = intVector.getValidityBufferAddress() - vc.count = intVector.getValueCount + .data(new IntPointer(intVector.getDataBuffer().nioBuffer.asIntBuffer)) + .validityBuffer(new LongPointer(intVector.getValidityBuffer().nioBuffer.asLongBuffer)) + .count(intVector.getValueCount) vc } @@ -145,10 +145,10 @@ object ArrowInterfaces { case idx if (!bitVector.isNull(idx)) => intVector.set(idx, bitVector.get(idx)) case idx => intVector.setNull(idx) } - vc.data = intVector.getDataBufferAddress() - vc.validityBuffer = bitVector.getValidityBufferAddress() - vc.count = bitVector.getValueCount - vc + val newVc = vc.data(new IntPointer(intVector.getDataBuffer().nioBuffer.asIntBuffer)) + .validityBuffer(new LongPointer(bitVector.getValidityBuffer().nioBuffer.asLongBuffer)) + .count(bitVector.getValueCount) + newVc } def c_nullable_int_vector(smallIntVector: SmallIntVector): nullable_int_vector = { @@ -162,33 +162,34 @@ object ArrowInterfaces { intVector.set(idx, smallIntVector.get(idx).toInt) case idx => intVector.setNull(idx) } - vc.data = intVector.getDataBufferAddress() - vc.validityBuffer = smallIntVector.getValidityBufferAddress() - vc.count = smallIntVector.getValueCount - vc + val newVc = vc.data(new IntPointer(intVector.getDataBuffer().nioBuffer.asIntBuffer)) + .validityBuffer( + new LongPointer(smallIntVector.getValidityBuffer().nioBuffer.asLongBuffer)) + .count(smallIntVector.getValueCount) + newVc } def c_nullable_bigint_vector(tzVector: TimeStampMicroTZVector): nullable_bigint_vector = { val vc = new nullable_bigint_vector() - vc.data = tzVector.getDataBufferAddress() - vc.validityBuffer = tzVector.getValidityBufferAddress() - vc.count = tzVector.getValueCount + .data(new LongPointer(tzVector.getDataBuffer().nioBuffer.asLongBuffer)) + .validityBuffer(new LongPointer(tzVector.getValidityBuffer().nioBuffer.asLongBuffer)) + .count(tzVector.getValueCount) vc } def c_nullable_bigint_vector(bigIntVector: BigIntVector): nullable_bigint_vector = { val vc = new nullable_bigint_vector() - vc.data = bigIntVector.getDataBufferAddress() - vc.validityBuffer = bigIntVector.getValidityBufferAddress() - vc.count = bigIntVector.getValueCount + .data(new LongPointer(bigIntVector.getDataBuffer().nioBuffer.asLongBuffer)) + .validityBuffer(new LongPointer(bigIntVector.getValidityBuffer().nioBuffer.asLongBuffer)) + .count(bigIntVector.getValueCount) vc } def c_nullable_date_vector(dateDayVector: DateDayVector): nullable_int_vector = { val vc = new nullable_int_vector() - vc.data = dateDayVector.getDataBufferAddress() - vc.validityBuffer = dateDayVector.getValidityBufferAddress() - vc.count = dateDayVector.getValueCount + .data(new IntPointer(dateDayVector.getDataBuffer().nioBuffer.asIntBuffer)) + .validityBuffer(new LongPointer(dateDayVector.getValidityBuffer().nioBuffer.asLongBuffer)) + .count(dateDayVector.getValueCount) vc } @@ -204,7 +205,7 @@ object ArrowInterfaces { } intVector.setValueCount(input.count) (0 until input.count).foreach(i => BitVectorHelper.setBit(intVector.getValidityBuffer, i)) - getUnsafe.copyMemory(input.data, intVector.getDataBufferAddress, input.count * 4) + getUnsafe.copyMemory(input.data.address, intVector.getDataBufferAddress, input.count * 4) } def non_null_bigint_vector_to_bigintVector( @@ -216,7 +217,7 @@ object ArrowInterfaces { } bigintVector.setValueCount(input.count) (0 until input.count).foreach(i => BitVectorHelper.setBit(bigintVector.getValidityBuffer, i)) - getUnsafe.copyMemory(input.data, bigintVector.getDataBufferAddress, input.dataSize()) + getUnsafe.copyMemory(input.data.address, bigintVector.getDataBufferAddress, input.dataSize()) } def nullable_bigint_vector_to_BigIntVector( @@ -228,11 +229,11 @@ object ArrowInterfaces { } bigintVector.setValueCount(input.count) getUnsafe.copyMemory( - input.validityBuffer, + input.validityBuffer.address, bigintVector.getValidityBufferAddress, Math.ceil(input.count / 64.0).toInt * 8 ) - getUnsafe.copyMemory(input.data, bigintVector.getDataBufferAddress, input.dataSize()) + getUnsafe.copyMemory(input.data.address, bigintVector.getDataBufferAddress, input.dataSize()) } def non_null_double_vector_to_float8Vector( @@ -247,7 +248,7 @@ object ArrowInterfaces { } float8Vector.setValueCount(input.count) (0 until input.count).foreach(i => BitVectorHelper.setBit(float8Vector.getValidityBuffer, i)) - getUnsafe.copyMemory(input.data, float8Vector.getDataBufferAddress, input.dataSize()) + getUnsafe.copyMemory(input.data.address, float8Vector.getDataBufferAddress, input.dataSize()) } def nullable_double_vector_to_float8Vector( @@ -262,11 +263,11 @@ object ArrowInterfaces { } float8Vector.setValueCount(input.count) getUnsafe.copyMemory( - input.validityBuffer, + input.validityBuffer.address, float8Vector.getValidityBufferAddress, Math.ceil(input.count / 64.0).toInt * 8 ) - getUnsafe.copyMemory(input.data, float8Vector.getDataBufferAddress, input.dataSize()) + getUnsafe.copyMemory(input.data.address, float8Vector.getDataBufferAddress, input.dataSize()) } def non_null_int2_vector_to_IntVector(input: non_null_int2_vector, intVector: IntVector): Unit = { @@ -275,7 +276,7 @@ object ArrowInterfaces { } intVector.setValueCount(input.count) (0 until input.count).foreach(i => BitVectorHelper.setBit(intVector.getValidityBuffer, i)) - getUnsafe.copyMemory(input.data, intVector.getDataBufferAddress, input.size()) + getUnsafe.copyMemory(input.data.address, intVector.getDataBufferAddress, input.size()) } def nullable_int_vector_to_IntVector(input: nullable_int_vector, intVector: IntVector): Unit = { @@ -287,11 +288,11 @@ object ArrowInterfaces { } intVector.setValueCount(input.count) getUnsafe.copyMemory( - input.validityBuffer, + input.validityBuffer.address, intVector.getValidityBufferAddress, Math.ceil(input.count / 64.0).toInt * 8 ) - getUnsafe.copyMemory(input.data, intVector.getDataBufferAddress, input.dataSize()) + getUnsafe.copyMemory(input.data.address, intVector.getDataBufferAddress, input.dataSize()) } def nullable_int_vector_to_SmallIntVector( @@ -320,7 +321,7 @@ object ArrowInterfaces { return } val buf = ByteBuffer.allocateDirect(input.count * 4).order(ByteOrder.LITTLE_ENDIAN) - getUnsafe.copyMemory(input.lengths, buf.asInstanceOf[DirectBuffer].address(), input.count * 4) + getUnsafe.copyMemory(input.lengths.address, buf.asInstanceOf[DirectBuffer].address(), input.count * 4) val lengths = buf.asIntBuffer() var sum = 0; for (i <- 0 until input.count) { @@ -331,13 +332,13 @@ object ArrowInterfaces { varCharVector.allocateNew(sum, input.count) varCharVector.setValueCount(input.count) getUnsafe.copyMemory( - input.validityBuffer, + input.validityBuffer.address, varCharVector.getValidityBufferAddress, Math.ceil(input.count / 64.0).toInt * 8 ) val dataBuf = ByteBuffer.allocateDirect(input.dataSize * 4).order(ByteOrder.LITTLE_ENDIAN) getUnsafe.copyMemory( - input.data, + input.data.address, dataBuf.asInstanceOf[DirectBuffer].address(), input.dataSize * 4 ) @@ -347,8 +348,8 @@ object ArrowInterfaces { dataBufArray.put(dataBuf) for (i <- 0 until input.count) { - val start = getUnsafe.getInt(input.offsets + (i * 4)) * 4 - val length = getUnsafe.getInt(input.lengths + (i * 4)) * 4 + val start = getUnsafe.getInt(input.offsets.get + (i * 4)) * 4 + val length = getUnsafe.getInt(input.lengths.get + (i * 4)) * 4 val str = new String(dataBufArray.array(), start, length, "UTF-32LE") val utf8bytes = str.getBytes @@ -382,10 +383,10 @@ object ArrowInterfaces { } timeStampVector.setValueCount(input.count) getUnsafe.copyMemory( - input.validityBuffer, + input.validityBuffer.address, timeStampVector.getValidityBufferAddress, Math.ceil(input.count / 64.0).toInt * 8 ) - getUnsafe.copyMemory(input.data, timeStampVector.getDataBufferAddress, input.dataSize()) + getUnsafe.copyMemory(input.data.address, timeStampVector.getDataBufferAddress, input.dataSize()) } } diff --git a/src/main/scala/com/nec/arrow/CArrowNativeInterface.scala b/src/main/scala/com/nec/arrow/CArrowNativeInterface.scala index 50da3436d..4336d3308 100644 --- a/src/main/scala/com/nec/arrow/CArrowNativeInterface.scala +++ b/src/main/scala/com/nec/arrow/CArrowNativeInterface.scala @@ -20,7 +20,7 @@ package com.nec.arrow import com.nec.arrow.ArrowInterfaces.c_bounded_data -import com.nec.arrow.ArrowTransferStructures._ +import com.nec.arrow.TransferDefinitions._ import com.nec.arrow.ArrowInterfaces._ import com.nec.arrow.ArrowNativeInterface.NativeArgument.VectorInputNativeArgument.InputVectorWrapper.{ BigIntVectorInputWrapper, diff --git a/src/main/scala/com/nec/arrow/VeArrowTransfers.scala b/src/main/scala/com/nec/arrow/VeArrowTransfers.scala index a49d7cf13..b56cc704c 100644 --- a/src/main/scala/com/nec/arrow/VeArrowTransfers.scala +++ b/src/main/scala/com/nec/arrow/VeArrowTransfers.scala @@ -45,7 +45,7 @@ import com.nec.arrow.ArrowNativeInterface.NativeArgument.VectorOutputNativeArgum TimeStampVectorOutputWrapper, VarCharVectorOutputWrapper } -import com.nec.arrow.ArrowTransferStructures._ +import com.nec.arrow.TransferDefinitions._ import com.nec.arrow.VeArrowNativeInterface.{copyPointerToVe, requireOk, requirePositive, Cleanup} import com.typesafe.scalalogging.LazyLogging import org.apache.arrow.vector._ @@ -53,6 +53,10 @@ import org.apache.arrow.vector._ import scala.collection.mutable import org.apache.spark.sql.util.ArrowUtilsExposed import org.bytedeco.javacpp.BytePointer +import org.bytedeco.javacpp.DoublePointer +import org.bytedeco.javacpp.LongPointer +import org.bytedeco.javacpp.IntPointer +import org.bytedeco.javacpp.ShortPointer import org.bytedeco.veoffload.global.veo import org.bytedeco.veoffload.veo_args import org.bytedeco.veoffload.veo_proc_handle @@ -251,11 +255,9 @@ object VeArrowTransfers extends LazyLogging { val keyName = "double_" + float8Vector.getName + "_" + float8Vector.getDataBuffer.capacity() logger.debug(s"Copying Buffer to VE for $keyName") val vcvr = new nullable_double_vector() - vcvr.count = float8Vector.getValueCount - vcvr.data = - copyPointerToVe(proc, new BytePointer(float8Vector.getDataBuffer.nioBuffer()))(cleanup) - vcvr.validityBuffer = - copyPointerToVe(proc, new BytePointer(float8Vector.getValidityBuffer.nioBuffer()))(cleanup) + .count(float8Vector.getValueCount) + .data(new DoublePointer(copyPointerToVe(proc, new BytePointer(float8Vector.getDataBuffer.nioBuffer()))(cleanup))) + .validityBuffer(new LongPointer(copyPointerToVe(proc, new BytePointer(float8Vector.getValidityBuffer.nioBuffer()))(cleanup))) vcvr } @@ -264,11 +266,12 @@ object VeArrowTransfers extends LazyLogging { cleanup: Cleanup ): non_null_c_bounded_string = { val vc = new non_null_c_bounded_string() - val thePtr = new BytePointer(string.getBytes("UTF-32LE"): _*) - thePtr.position(0) - vc.length = string.length - vc.data = copyPointerToVe(proc, thePtr) + + val thePtr = new BytePointer(string.getBytes("UTF-32LE"):_*) + vc + .length(string.length) + .data(new BytePointer(copyPointerToVe(proc, thePtr))) } private def make_veo_string_of_bytePointer( @@ -276,10 +279,10 @@ object VeArrowTransfers extends LazyLogging { bytePointer: BytePointer, size: Int )(implicit cleanup: Cleanup): non_null_c_bounded_string = { - val vc = new non_null_c_bounded_string() - vc.length = size bytePointer.position(0) - vc.data = copyPointerToVe(proc, bytePointer, Some(size)) + val vc = new non_null_c_bounded_string() + .length(size) + .data(new BytePointer(copyPointerToVe(proc, bytePointer, Some(size)))) vc } @@ -291,10 +294,9 @@ object VeArrowTransfers extends LazyLogging { logger.debug(s"Copying Buffer to VE for $keyName") val vcvr = new nullable_int_vector() - vcvr.count = intVector.getValueCount - vcvr.data = copyPointerToVe(proc, new BytePointer(intVector.getDataBuffer.nioBuffer()))(cleanup) - vcvr.validityBuffer = - copyPointerToVe(proc, new BytePointer(intVector.getValidityBuffer.nioBuffer()))(cleanup) + .count(intVector.getValueCount) + .data(new IntPointer(copyPointerToVe(proc, new BytePointer(intVector.getDataBuffer.nioBuffer()))(cleanup))) + .validityBuffer(new LongPointer(copyPointerToVe(proc, new BytePointer(intVector.getValidityBuffer.nioBuffer()))(cleanup))) vcvr } @@ -315,10 +317,9 @@ object VeArrowTransfers extends LazyLogging { logger.debug(s"Copying Buffer to VE for $keyName") val vcvr = new nullable_int_vector() - vcvr.count = intVector.getValueCount - vcvr.data = copyPointerToVe(proc, new BytePointer(intVector.getDataBuffer.nioBuffer()))(cleanup) - vcvr.validityBuffer = - copyPointerToVe(proc, new BytePointer(intVector.getValidityBuffer.nioBuffer()))(cleanup) + .count(intVector.getValueCount) + .data(new IntPointer(copyPointerToVe(proc, new BytePointer(intVector.getDataBuffer.nioBuffer()))(cleanup))) + .validityBuffer(new LongPointer(copyPointerToVe(proc, new BytePointer(intVector.getValidityBuffer.nioBuffer()))(cleanup))) vcvr } @@ -338,10 +339,9 @@ object VeArrowTransfers extends LazyLogging { logger.debug(s"Copying Buffer to VE for $keyName") val vcvr = new nullable_int_vector() - vcvr.count = intVector.getValueCount - vcvr.data = copyPointerToVe(proc, new BytePointer(intVector.getDataBuffer.nioBuffer()))(cleanup) - vcvr.validityBuffer = - copyPointerToVe(proc, new BytePointer(intVector.getValidityBuffer.nioBuffer()))(cleanup) + .count(intVector.getValueCount) + .data(new IntPointer(copyPointerToVe(proc, new BytePointer(intVector.getDataBuffer.nioBuffer()))(cleanup))) + .validityBuffer(new LongPointer(copyPointerToVe(proc, new BytePointer(intVector.getValidityBuffer.nioBuffer()))(cleanup))) vcvr } @@ -354,12 +354,10 @@ object VeArrowTransfers extends LazyLogging { logger.debug(s"Copying Buffer to VE for $keyName") val vcvr = new nullable_int_vector() - vcvr.count = dateDayVector.getValueCount - vcvr.data = - copyPointerToVe(proc, new BytePointer(dateDayVector.getDataBuffer.nioBuffer()))(cleanup) - vcvr.validityBuffer = - copyPointerToVe(proc, new BytePointer(dateDayVector.getValidityBuffer.nioBuffer()))(cleanup) - + .count(dateDayVector.getValueCount) + .data(new IntPointer(copyPointerToVe(proc, new BytePointer(dateDayVector.getDataBuffer.nioBuffer()))(cleanup))) + .validityBuffer(new LongPointer(copyPointerToVe(proc, new BytePointer(dateDayVector.getValidityBuffer.nioBuffer()))(cleanup))) + vcvr } @@ -371,24 +369,28 @@ object VeArrowTransfers extends LazyLogging { logger.debug(s"Copying Buffer to VE for $keyName") val dataBuff = intCharsFromVarcharVector(varcharVector) - val vcvr = new nullable_varchar_vector() - vcvr.count = varcharVector.getValueCount - vcvr.dataSize = dataBuff.capacity() / 4 val startsBuff = startsFromVarcharVector(varcharVector) val lengthsBuff = lengthsFromVarcharVector(varcharVector) - vcvr.data = copyPointerToVe( + val vcvr = new nullable_varchar_vector() + .count(varcharVector.getValueCount) + .dataSize(dataBuff.capacity().toInt / 4) + .data(new IntPointer(copyPointerToVe( proc = proc, bytePointer = new BytePointer(dataBuff), len = Some(dataBuff.capacity()) - )(cleanup) - vcvr.offsets = - copyPointerToVe(proc, new BytePointer(startsBuff), len = Some(startsBuff.capacity()))(cleanup) - vcvr.lengths = copyPointerToVe( + )(cleanup))) + .offsets(new IntPointer(copyPointerToVe( + proc, + new BytePointer(startsBuff), + len = Some(startsBuff.capacity()) + )(cleanup))) + .lengths(new IntPointer(copyPointerToVe( proc, new BytePointer(lengthsBuff), len = Some(lengthsBuff.capacity()) - )(cleanup) + )(cleanup))) + vcvr } @@ -400,11 +402,9 @@ object VeArrowTransfers extends LazyLogging { logger.debug(s"Copying Buffer to VE for $keyName") val vcvr = new nullable_bigint_vector() - vcvr.count = bigintVector.getValueCount - vcvr.data = - copyPointerToVe(proc, new BytePointer(bigintVector.getDataBuffer.nioBuffer()))(cleanup) - vcvr.validityBuffer = - copyPointerToVe(proc, new BytePointer(bigintVector.getValidityBuffer.nioBuffer()))(cleanup) + .count(bigintVector.getValueCount) + .data(new LongPointer(copyPointerToVe(proc, new BytePointer(bigintVector.getDataBuffer.nioBuffer()))(cleanup))) + .validityBuffer(new LongPointer(copyPointerToVe(proc, new BytePointer(bigintVector.getValidityBuffer.nioBuffer()))(cleanup))) vcvr } @@ -417,10 +417,9 @@ object VeArrowTransfers extends LazyLogging { logger.debug(s"Copying Buffer to VE for $keyName") val vcvr = new nullable_bigint_vector() - vcvr.count = tsVector.getValueCount - vcvr.data = copyPointerToVe(proc, new BytePointer(tsVector.getDataBuffer.nioBuffer()))(cleanup) - vcvr.validityBuffer = - copyPointerToVe(proc, new BytePointer(tsVector.getValidityBuffer.nioBuffer()))(cleanup) + .count(tsVector.getValueCount) + .data(new LongPointer(copyPointerToVe(proc, new BytePointer(tsVector.getDataBuffer.nioBuffer()))(cleanup))) + .validityBuffer(new LongPointer(copyPointerToVe(proc, new BytePointer(tsVector.getValidityBuffer.nioBuffer()))(cleanup))) vcvr } @@ -441,8 +440,8 @@ object VeArrowTransfers extends LazyLogging { val dataSize = dataCount * 8 val vhTargetPointer = (new BytePointer(dataSize)) requireOk(veo.veo_read_mem(proc, vhTargetPointer, veoPtr, dataSize)) - vec.count = dataCount - vec.data = vhTargetPointer.address() + val newVec = vec.count(dataCount) + .data(new DoublePointer(vhTargetPointer)) cleanup.add(veoPtr, dataSize) } @@ -466,9 +465,9 @@ object VeArrowTransfers extends LazyLogging { veo.veo_read_mem(proc, vhTargetPointer, veoPtr, dataSize) veo.veo_read_mem(proc, validityTargetPointer, validityPtr, dataCount) } - vec.count = dataCount - vec.data = vhTargetPointer.address() - vec.validityBuffer = validityTargetPointer.address() + val newVec = vec.count(dataCount) + .data(new DoublePointer(vhTargetPointer)) + .validityBuffer(new LongPointer(validityTargetPointer)) cleanup.add(veoPtr, dataSize) cleanup.add(validityPtr, dataCount) @@ -494,9 +493,9 @@ object VeArrowTransfers extends LazyLogging { veo.veo_read_mem(proc, vhValidityTargetPointer, validityPtr, dataCount) } - vec.count = dataCount - vec.data = vhTargetPointer.address() - vec.validityBuffer = vhValidityTargetPointer.address() + val newVec = vec.count(dataCount) + .data(new IntPointer(vhTargetPointer)) + .validityBuffer(new LongPointer(vhValidityTargetPointer)) cleanup.add(veoPtr, dataSize) cleanup.add(validityPtr, dataSize) @@ -516,8 +515,8 @@ object VeArrowTransfers extends LazyLogging { val dataSize = dataCount * 8 val vhTargetPointer = (new BytePointer(dataSize)) requireOk(veo.veo_read_mem(proc, vhTargetPointer, veoPtr, dataSize)) - vec.count = dataCount - vec.data = vhTargetPointer.address() + val newVec = vec.count(dataCount) + .data(new LongPointer(vhTargetPointer)) cleanup.add(veoPtr, dataSize) } @@ -546,9 +545,9 @@ object VeArrowTransfers extends LazyLogging { requireOk { veo.veo_read_mem(proc, validityTargetPointer, validityPtr, dataCount) } - vec.count = dataCount - vec.data = vhTargetPointer.address() - vec.validityBuffer = validityTargetPointer.address() + val newVec = vec.count(dataCount) + .data(new LongPointer(vhTargetPointer)) + .validityBuffer(new LongPointer(validityTargetPointer)) cleanup.add(veoPtr, dataSize) cleanup.add(validityPtr, dataSize) @@ -562,11 +561,10 @@ object VeArrowTransfers extends LazyLogging { /** Get data size */ val dataSize = bytePointer.getInt(32) - vec.dataSize = dataSize - - /** Get data count */ val dataCount = bytePointer.getInt(36) - vec.count = dataCount + + val newVec = vec.dataSize(dataSize) + .count(dataCount) if (dataCount < 1) { // no data, do nothing @@ -580,7 +578,7 @@ object VeArrowTransfers extends LazyLogging { requireOk { veo.veo_read_mem(proc, vhTargetDataPointer, dataPtr, dataSize) } - vec.data = vhTargetDataPointer.address() + val newVec2 = newVec.data(new IntPointer(vhTargetDataPointer)) cleanup.add(dataPtr, dataSize) /** Transfer the offsets */ @@ -590,7 +588,7 @@ object VeArrowTransfers extends LazyLogging { requireOk { veo.veo_read_mem(proc, vhTargetOffsetsPointer, offsetsPtr, (dataCount + 1) * 4) } - vec.offsets = vhTargetOffsetsPointer.address() + val newVec3 = newVec2.offsets(new IntPointer(vhTargetOffsetsPointer)) cleanup.add(offsetsPtr, (dataCount + 1) * 4) /** Transfer the validity buffer */ @@ -600,58 +598,58 @@ object VeArrowTransfers extends LazyLogging { requireOk { veo.veo_read_mem(proc, vhValidityPointer, validityPtr, Math.ceil(vec.count / 64.0).toInt * 8) } - vec.validityBuffer = vhValidityPointer.address() + val newVec4 = newVec3.validityBuffer(new LongPointer(vhValidityPointer)) cleanup.add(validityPtr, Math.ceil(vec.count / 64.0).toInt * 8) } def stringToBytePointer(str_buf: non_null_c_bounded_string): BytePointer = { - val v_bb = str_buf.getPointer.getByteBuffer(0, 12) - v_bb.putLong(0, str_buf.data) + val v_bb = str_buf.asByteBuffer + v_bb.putLong(0, str_buf.data.address) v_bb.putInt(8, str_buf.length) new BytePointer(v_bb) } def nullableDoubleVectorToBytePointer(double_vector: nullable_double_vector): BytePointer = { - val v_bb = double_vector.getPointer.getByteBuffer(0, 20) - v_bb.putLong(0, double_vector.data) - v_bb.putLong(8, double_vector.validityBuffer) + val v_bb = double_vector.asByteBuffer + v_bb.putLong(0, double_vector.data.address) + v_bb.putLong(8, double_vector.validityBuffer.address) v_bb.putInt(16, double_vector.count) new BytePointer(v_bb) } def nullableBigintVectorToBytePointer(bigint_vector: nullable_bigint_vector): BytePointer = { - val v_bb = bigint_vector.getPointer.getByteBuffer(0, 20) - v_bb.putLong(0, bigint_vector.data) - v_bb.putLong(8, bigint_vector.validityBuffer) + val v_bb = bigint_vector.asByteBuffer + v_bb.putLong(0, bigint_vector.data.address) + v_bb.putLong(8, bigint_vector.validityBuffer.address) v_bb.putInt(16, bigint_vector.count) new BytePointer(v_bb) } def nullableIntVectorToBytePointer(int_vector: nullable_int_vector): BytePointer = { - val v_bb = int_vector.getPointer.getByteBuffer(0, 20) - v_bb.putLong(0, int_vector.data) - v_bb.putLong(8, int_vector.validityBuffer) + val v_bb = int_vector.asByteBuffer + v_bb.putLong(0, int_vector.data.address) + v_bb.putLong(8, int_vector.validityBuffer.address) v_bb.putInt(16, int_vector.count) new BytePointer(v_bb) } def nonNullDoubleVectorToBytePointer(double_vector: non_null_double_vector): BytePointer = { - val v_bb = double_vector.getPointer.getByteBuffer(0, 12) - v_bb.putLong(0, double_vector.data) + val v_bb = double_vector.asByteBuffer + v_bb.putLong(0, double_vector.data.address) v_bb.putInt(8, double_vector.count) new BytePointer(v_bb) } def nonNullInt2VectorToBytePointer(int_vector: non_null_int2_vector): BytePointer = { - val v_bb = int_vector.getPointer.getByteBuffer(0, 12) - v_bb.putLong(0, int_vector.data) + val v_bb = int_vector.asByteBuffer + v_bb.putLong(0, int_vector.data.address) v_bb.putInt(8, int_vector.count) new BytePointer(v_bb) } def nonNullBigIntVectorToBytePointer(bigint_vector: non_null_bigint_vector): BytePointer = { - val v_bb = bigint_vector.getPointer.getByteBuffer(0, 12) - v_bb.putLong(0, bigint_vector.data) + val v_bb = bigint_vector.asByteBuffer + v_bb.putLong(0, bigint_vector.data.address) v_bb.putInt(8, bigint_vector.count) new BytePointer(v_bb) } @@ -659,13 +657,13 @@ object VeArrowTransfers extends LazyLogging { def nullableVarCharVectorVectorToBytePointer( varchar_vector: nullable_varchar_vector ): BytePointer = { - val v_bb = varchar_vector.getPointer.getByteBuffer(0, (8 * 4) + (4 * 2)) - v_bb.putLong(0, varchar_vector.data) - v_bb.putLong(8, varchar_vector.offsets) - v_bb.putLong(16, varchar_vector.lengths) - v_bb.putLong(24, varchar_vector.validityBuffer) + val v_bb = varchar_vector.asByteBuffer + v_bb.putLong(0, varchar_vector.data.address) + v_bb.putLong(8, varchar_vector.offsets.address) + v_bb.putLong(16, varchar_vector.lengths.address) + v_bb.putLong(24, varchar_vector.validityBuffer.address) v_bb.putInt(32, varchar_vector.dataSize) v_bb.putInt(36, varchar_vector.count) new BytePointer(v_bb) - } + } } diff --git a/src/main/scala/com/nec/ve/colvector/VeColVector.scala b/src/main/scala/com/nec/ve/colvector/VeColVector.scala index 839267598..ab387c574 100644 --- a/src/main/scala/com/nec/ve/colvector/VeColVector.scala +++ b/src/main/scala/com/nec/ve/colvector/VeColVector.scala @@ -1,12 +1,12 @@ package com.nec.ve.colvector -import com.nec.arrow.ArrowInterfaces.getUnsafe -import com.nec.arrow.ArrowTransferStructures.{ +import com.nec.arrow.TransferDefinitions.{ nullable_bigint_vector, nullable_double_vector, nullable_int_vector, nullable_varchar_vector } +import com.nec.arrow.ArrowInterfaces.getUnsafe import com.nec.arrow.VeArrowTransfers.{ nullableBigintVectorToBytePointer, nullableDoubleVectorToBytePointer, @@ -29,7 +29,11 @@ import com.nec.ve.colvector.VeColVector.getUnsafe import org.apache.arrow.memory.BufferAllocator import org.apache.arrow.vector._ import org.apache.spark.sql.vectorized.ColumnVector +import org.bytedeco.javacpp.Pointer import org.bytedeco.javacpp.BytePointer +import org.bytedeco.javacpp.DoublePointer +import org.bytedeco.javacpp.LongPointer +import org.bytedeco.javacpp.IntPointer import sun.misc.Unsafe import java.io.{DataOutputStream, OutputStream} @@ -97,6 +101,34 @@ final case class VeColVector(underlying: GenericColVector[Long]) { ) ) + class UnsafeBytePointer extends BytePointer + { + def setAddress(to: Long): Unit = { + address = to + } + } + + class UnsafeDoublePointer extends DoublePointer + { + def setAddress(to: Long): Unit = { + address = to + } + } + + class UnsafeIntPointer extends IntPointer + { + def setAddress(to: Long): Unit = { + address = to + } + } + + class UnsafeLongPointer extends LongPointer + { + def setAddress(to: Long): Unit = { + address = to + } + } + def newContainer()(implicit veProcess: VeProcess, source: VeColVectorSource, @@ -105,39 +137,59 @@ final case class VeColVector(underlying: GenericColVector[Long]) { copy(underlying = { veType match { case VeScalarType.VeNullableDouble => + val ptr = (new UnsafeDoublePointer()) + ptr.setAddress(buffers(0)) + val validityPtr = (new UnsafeLongPointer()) + validityPtr.setAddress(buffers(1)) val vcvr = new nullable_double_vector() - vcvr.count = numItems - vcvr.data = buffers(0) - vcvr.validityBuffer = buffers(1) + .count(numItems) + .data(ptr) + .validityBuffer(validityPtr) val bytePointer = nullableDoubleVectorToBytePointer(vcvr) underlying.copy(container = veProcess.putPointer(bytePointer)) case VeScalarType.VeNullableInt => + val ptr = (new UnsafeIntPointer()) + ptr.setAddress(buffers(0)) + val validityPtr = (new UnsafeLongPointer()) + validityPtr.setAddress(buffers(1)) val vcvr = new nullable_int_vector() - vcvr.count = numItems - vcvr.data = buffers(0) - vcvr.validityBuffer = buffers(1) + .count(numItems) + .data(ptr) + .validityBuffer(validityPtr) val bytePointer = nullableIntVectorToBytePointer(vcvr) underlying.copy(container = veProcess.putPointer(bytePointer)) case VeScalarType.VeNullableLong => + val ptr = (new UnsafeLongPointer()) + ptr.setAddress(buffers(0)) + val validityPtr = (new UnsafeLongPointer()) + validityPtr.setAddress(buffers(1)) val vcvr = new nullable_bigint_vector() - vcvr.count = numItems - vcvr.data = buffers(0) - vcvr.validityBuffer = buffers(1) + .count(numItems) + .data(ptr) + .validityBuffer(validityPtr) val bytePointer = nullableBigintVectorToBytePointer(vcvr) underlying.copy(container = veProcess.putPointer(bytePointer)) case VeString => - val vcvr = new nullable_varchar_vector() - vcvr.count = numItems - vcvr.data = buffers(0) - vcvr.offsets = buffers(1) - vcvr.lengths = buffers(2) - vcvr.validityBuffer = buffers(3) - vcvr.dataSize = - variableSize.getOrElse(sys.error("Invalid state - VeString has no variableSize")) + val ptr = (new UnsafeIntPointer()) + ptr.setAddress(buffers(0)) + val offsetsPtr = (new UnsafeIntPointer()) + offsetsPtr.setAddress(buffers(1)) + val lengthsPtr = (new UnsafeIntPointer()) + lengthsPtr.setAddress(buffers(2)) + val validityPtr = (new UnsafeLongPointer()) + validityPtr.setAddress(buffers(3)) + val vcvr = new nullable_varchar_vector() + .count(numItems) + .data(ptr) + .offsets(offsetsPtr) + .lengths(lengthsPtr) + .validityBuffer(validityPtr) + .dataSize( + variableSize.getOrElse(sys.error("Invalid state - VeString has no variableSize"))) val bytePointer = nullableVarCharVectorVectorToBytePointer(vcvr) underlying.copy(container = veProcess.putPointer(bytePointer)) diff --git a/src/test/scala/com/nec/arrow/ArrowInterfacesTest.scala b/src/test/scala/com/nec/cmake/ArrowInterfacesTest.scala similarity index 89% rename from src/test/scala/com/nec/arrow/ArrowInterfacesTest.scala rename to src/test/scala/com/nec/cmake/ArrowInterfacesTest.scala index 027c70616..91c7e6e5a 100644 --- a/src/test/scala/com/nec/arrow/ArrowInterfacesTest.scala +++ b/src/test/scala/com/nec/cmake/ArrowInterfacesTest.scala @@ -17,9 +17,9 @@ * limitations under the License. * */ -package com.nec.arrow +package com.nec.cmake import com.nec.arrow.ArrowInterfaces.non_null_double_vector_to_float8Vector -import com.nec.arrow.ArrowTransferStructures.non_null_double_vector +import com.nec.arrow.TransferDefinitions.non_null_double_vector import com.nec.util.RichVectors._ import org.apache.arrow.memory.RootAllocator import org.apache.arrow.vector.Float8Vector @@ -41,11 +41,10 @@ final class ArrowInterfacesTest extends AnyFreeSpec { try { val vector = new Float8Vector("value", alloc) try { - val ndv = new non_null_double_vector - ndv.count = list.size val bp = new DoublePointer(list.toArray:_*) - - ndv.data = bp.address() + val ndv = new non_null_double_vector() + .count(list.size) + .data(bp) non_null_double_vector_to_float8Vector(ndv, vector) assert(vector.toList == list) } finally vector.close()