fix: Avoid calling import and export of Arrow arrays for native execution #1055

Closed

Commits (68)
50d686e
fix: Optimize not to call getNullCount as much as possible
kazuyukitanimura Aug 9, 2024
532b9f6
Merge remote-tracking branch 'upstream/main' into optimize-null-count
kazuyukitanimura Aug 13, 2024
388ea7f
fix: Optimize not to call getNullCount as much as possible
kazuyukitanimura Aug 13, 2024
dbd4016
fix: Optimize not to call getNullCount as much as possible
kazuyukitanimura Aug 13, 2024
a773832
fix: Optimize not to call getNullCount as much as possible
kazuyukitanimura Aug 13, 2024
f6176fb
fix ci
kazuyukitanimura Aug 14, 2024
87bcd03
fix ci
kazuyukitanimura Aug 15, 2024
a903b41
fix ci
kazuyukitanimura Aug 15, 2024
bc7136e
Merge remote-tracking branch 'upstream/main' into optimize-null-count
kazuyukitanimura Aug 17, 2024
0d87c6e
fix: Optimize CheckOverflow
kazuyukitanimura Aug 19, 2024
0cd44c0
fix: Optimize CheckOverflow
kazuyukitanimura Aug 19, 2024
9d68eed
fix: Optimize CheckOverflow
kazuyukitanimura Aug 19, 2024
d0569bc
fix: Optimize CheckOverflow
kazuyukitanimura Aug 20, 2024
7385cc8
Merge remote-tracking branch 'upstream/main' into optimize-null-count
kazuyukitanimura Aug 20, 2024
efb4a50
fix: Remove export
kazuyukitanimura Aug 20, 2024
4c64265
Merge remote-tracking branch 'upstream/main' into optimize-null-count
kazuyukitanimura Sep 5, 2024
027dd06
Merge remote-tracking branch 'upstream/main' into optimize-null-count
kazuyukitanimura Sep 7, 2024
8e4ea8d
fix: Remove export
kazuyukitanimura Sep 7, 2024
5df5309
fix: Remove export
kazuyukitanimura Sep 10, 2024
c2e2874
fix: Remove export
kazuyukitanimura Sep 12, 2024
5045374
fix: Remove export
kazuyukitanimura Sep 17, 2024
2da220f
Merge remote-tracking branch 'upstream/main' into optimize-null-count
kazuyukitanimura Sep 27, 2024
d4d03e8
chore: fix compatibility guide
kazuyukitanimura Sep 27, 2024
513acd3
fix: Remove export
kazuyukitanimura Sep 28, 2024
2af8b36
fix: Remove export
kazuyukitanimura Sep 28, 2024
18fff6e
fix: Remove export
kazuyukitanimura Oct 2, 2024
b50348f
Merge remote-tracking branch 'upstream/main' into optimize-null-count
kazuyukitanimura Oct 2, 2024
eaebd6d
fix: Remove export
kazuyukitanimura Oct 2, 2024
93e350d
Merge remote-tracking branch 'upstream/main' into optimize-null-count
kazuyukitanimura Oct 2, 2024
f93ba1d
fix: Remove export
kazuyukitanimura Oct 3, 2024
49f1903
Merge remote-tracking branch 'upstream/main' into optimize-null-count
kazuyukitanimura Oct 3, 2024
8032c26
fix: Remove export
kazuyukitanimura Oct 4, 2024
bf4d3e4
fix: Remove export
kazuyukitanimura Oct 8, 2024
a6d5e77
fix: Remove export
kazuyukitanimura Oct 15, 2024
1967a68
Revert "fix: Remove export"
kazuyukitanimura Oct 15, 2024
7299ae9
Merge remote-tracking branch 'upstream/main' into optimize-null-count
kazuyukitanimura Oct 15, 2024
81339a8
fix: Remove export
kazuyukitanimura Oct 15, 2024
41a6695
Revert "fix: Remove export"
kazuyukitanimura Oct 15, 2024
e356b8a
"fix: Remove export"
kazuyukitanimura Oct 15, 2024
282c3e3
fix: Fallback to Spark if named_struct contains duplicate field names…
viirya Oct 13, 2024
7fae9c5
chore: Reserve memory for native shuffle writer per partition (#988)
viirya Oct 14, 2024
37b1e9c
chore: Bump arrow-rs to 53.1.0 and datafusion (#1001)
kazuyukitanimura Oct 14, 2024
a3602df
fix: Remove export
kazuyukitanimura Oct 16, 2024
765a387
Merge remote-tracking branch 'upstream/main' into optimize-null-count
kazuyukitanimura Oct 23, 2024
cd0ae7e
fix: Remove export
kazuyukitanimura Oct 25, 2024
bee3740
fix: Remove export
kazuyukitanimura Oct 26, 2024
c2f7288
fix: Remove export
kazuyukitanimura Oct 28, 2024
59ed00c
fix: Remove export
kazuyukitanimura Oct 28, 2024
75dd04c
fix: Remove export
kazuyukitanimura Nov 1, 2024
dc2c0bf
Merge remote-tracking branch 'upstream/main' into optimize-null-count
kazuyukitanimura Nov 1, 2024
58cb5a2
fix: Remove export
kazuyukitanimura Nov 1, 2024
df03371
fix: Remove export
kazuyukitanimura Nov 4, 2024
01fd025
Merge remote-tracking branch 'upstream/main' into optimize-null-count
kazuyukitanimura Nov 4, 2024
74043b5
fix: Remove export
kazuyukitanimura Nov 5, 2024
5ac4408
fix: Remove export
kazuyukitanimura Nov 5, 2024
10ffc64
fix: Remove export
kazuyukitanimura Nov 8, 2024
14053ec
fix: Remove export
kazuyukitanimura Nov 8, 2024
ed8f546
fix: Remove export
kazuyukitanimura Nov 8, 2024
7ede86d
fix: Remove export
kazuyukitanimura Nov 9, 2024
e49d092
fix: Remove export
kazuyukitanimura Nov 9, 2024
a8a38f7
fix: Remove export
kazuyukitanimura Nov 9, 2024
adf5a90
fix: Remove export
kazuyukitanimura Nov 9, 2024
0bf2415
fix: Remove export
kazuyukitanimura Nov 9, 2024
203e16e
fix: Remove export
kazuyukitanimura Nov 11, 2024
97960d7
fix: Remove export
kazuyukitanimura Nov 12, 2024
ec72117
fix: Remove export
kazuyukitanimura Nov 12, 2024
8a01335
Merge remote-tracking branch 'upstream/main' into optimize-null-count
kazuyukitanimura Nov 13, 2024
0c7a228
fix: Remove export
kazuyukitanimura Nov 13, 2024
common/src/main/java/org/apache/comet/parquet/BatchReader.java

@@ -152,6 +152,8 @@ public class BatchReader extends RecordReader<Void, ColumnarBatch> implements Closeable
   /** The TaskContext object for executing this task. */
   private final TaskContext taskContext;

+  private boolean hasNativeOperations = false;
+
   // Only for testing
   public BatchReader(String file, int capacity) {
     this(file, capacity, null, null);
@@ -215,7 +217,8 @@ public BatchReader(AbstractColumnReader[] columnReaders) {
       boolean useLegacyDateTimestamp,
       StructType partitionSchema,
       InternalRow partitionValues,
-      Map<String, SQLMetric> metrics) {
+      Map<String, SQLMetric> metrics,
+      boolean hasNativeOperations) {
     this.conf = conf;
     this.capacity = capacity;
     this.sparkSchema = sparkSchema;
@@ -229,6 +232,7 @@ public BatchReader(AbstractColumnReader[] columnReaders) {
     this.footer = footer;
     this.metrics = metrics;
     this.taskContext = TaskContext$.MODULE$.get();
+    this.hasNativeOperations = hasNativeOperations;
   }

   /**
@@ -586,7 +590,8 @@ private boolean loadNextRowGroupIfNecessary() throws Throwable {
               capacity,
               useDecimal128,
               useLazyMaterialization,
-              useLegacyDateTimestamp);
+              useLegacyDateTimestamp,
+              hasNativeOperations);
       reader.setPageReader(rowGroupReader.getPageReader(columns.get(i)));
       columnReaders[i] = reader;
     }
34 changes: 25 additions & 9 deletions common/src/main/java/org/apache/comet/parquet/ColumnReader.java

@@ -44,11 +44,7 @@
 import org.apache.spark.sql.types.DataType;

 import org.apache.comet.CometConf;
-import org.apache.comet.vector.CometDecodedVector;
-import org.apache.comet.vector.CometDictionary;
-import org.apache.comet.vector.CometDictionaryVector;
-import org.apache.comet.vector.CometPlainVector;
-import org.apache.comet.vector.CometVector;
+import org.apache.comet.vector.*;

 public class ColumnReader extends AbstractColumnReader {
   protected static final Logger LOG = LoggerFactory.getLogger(ColumnReader.class);
@@ -58,7 +54,7 @@ public class ColumnReader extends AbstractColumnReader {
    * The current Comet vector holding all the values read by this column reader. Owned by this
    * reader and MUST be closed after use.
    */
-  private CometDecodedVector currentVector;
+  private CometVector currentVector;

   /** Dictionary values for this column. Only set if the column is using dictionary encoding. */
   protected CometDictionary dictionary;
@@ -90,6 +86,8 @@ public class ColumnReader extends AbstractColumnReader {

   private final CometSchemaImporter importer;

+  private final boolean hasNativeOperations;
+
   private ArrowArray array = null;
   private ArrowSchema schema = null;

@@ -99,11 +97,13 @@ public ColumnReader(
       CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
-      boolean useLegacyDateTimestamp) {
+      boolean useLegacyDateTimestamp,
+      boolean hasNativeOperations) {
     super(type, descriptor, useDecimal128, useLegacyDateTimestamp);
     assert batchSize > 0 : "Batch size must be positive, found " + batchSize;
     this.batchSize = batchSize;
     this.importer = importer;
+    this.hasNativeOperations = hasNativeOperations;
     initNative();
   }

@@ -171,7 +171,23 @@ public void close() {
   }

   /** Returns a decoded {@link CometDecodedVector Comet vector}. */
-  public CometDecodedVector loadVector() {
+  public CometVector loadVector() {
+    if (hasNativeOperations) {
+      if (currentVector != null) {
+        currentVector.close();
+      }
+
+      array = ArrowArray.allocateNew(ALLOCATOR);
+      schema = ArrowSchema.allocateNew(ALLOCATOR);
+
+      long arrayAddr = array.memoryAddress();
+      long schemaAddr = schema.memoryAddress();
+
+      Native.currentBatch(nativeHandle, arrayAddr, schemaAddr);
+      currentVector = new CometNativeVector(null, useDecimal128, arrayAddr, schemaAddr);
+      return currentVector;
+    }
+
     // Only re-use Comet vector iff:
     // 1. if we're not using dictionary encoding, since with dictionary encoding, the native
     //    side may fallback to plain encoding and the underlying memory address for the vector
@@ -264,7 +280,7 @@ protected void readPage() {
     if (page == null) {
       throw new RuntimeException("overreading: returned DataPage is null");
     }
-    ;
+
     int pageValueCount = page.getValueCount();
     page.accept(
         new DataPage.Visitor<Void>() {
common/src/main/java/org/apache/comet/parquet/LazyColumnReader.java

@@ -50,7 +50,14 @@ public LazyColumnReader(
       int batchSize,
       boolean useDecimal128,
       boolean useLegacyDateTimestamp) {
-    super(sparkReadType, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
+    super(
+        sparkReadType,
+        descriptor,
+        importer,
+        batchSize,
+        useDecimal128,
+        useLegacyDateTimestamp,
+        false);
     this.batchSize = 0; // the batch size is set later in `readBatch`
     this.vector = new CometLazyVector(sparkReadType, this, useDecimal128);
   }
25 changes: 20 additions & 5 deletions common/src/main/java/org/apache/comet/parquet/Utils.java

@@ -32,10 +32,18 @@ public static ColumnReader getColumnReader(
       CometSchemaImporter importer,
       int batchSize,
       boolean useDecimal128,
-      boolean useLazyMaterialization) {
+      boolean useLazyMaterialization,
+      boolean hasNativeOperations) {
     // TODO: support `useLegacyDateTimestamp` for Iceberg
     return getColumnReader(
-        type, descriptor, importer, batchSize, useDecimal128, useLazyMaterialization, true);
+        type,
+        descriptor,
+        importer,
+        batchSize,
+        useDecimal128,
+        useLazyMaterialization,
+        true,
+        hasNativeOperations);
   }

   public static ColumnReader getColumnReader(
@@ -45,13 +53,20 @@ public static ColumnReader getColumnReader(
       int batchSize,
       boolean useDecimal128,
       boolean useLazyMaterialization,
-      boolean useLegacyDateTimestamp) {
-    if (useLazyMaterialization && supportLazyMaterialization(type)) {
+      boolean useLegacyDateTimestamp,
+      boolean hasNativeOperations) {
+    if (useLazyMaterialization && !hasNativeOperations && supportLazyMaterialization(type)) {
       return new LazyColumnReader(
           type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
     } else {
       return new ColumnReader(
-          type, descriptor, importer, batchSize, useDecimal128, useLegacyDateTimestamp);
+          type,
+          descriptor,
+          importer,
+          batchSize,
+          useDecimal128,
+          useLegacyDateTimestamp,
+          hasNativeOperations);
     }
   }
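The dispatch above encodes a simple rule: lazy materialization only pays off when the JVM itself will decode column values, so it is disabled whenever the batch will be handed to native operators as raw FFI addresses. A compact restatement of the condition (an illustrative sketch mirroring the Java code above, not code from this PR):

// Sketch: mirrors Utils.getColumnReader's dispatch. Lazy materialization
// defers decoding until a value is actually read, which only helps when the
// JVM reads values; with native operations the batch is forwarded to native
// code as (array address, schema address) pairs and is never decoded here.
def useLazyReader(
    lazyEnabled: Boolean,
    hasNativeOperations: Boolean,
    typeSupported: Boolean): Boolean =
  lazyEnabled && !hasNativeOperations && typeSupported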
common/src/main/java/org/apache/comet/vector/CometNativeVector.java (new file)

@@ -0,0 +1,82 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.comet.vector;

import org.apache.arrow.vector.ValueVector;
import org.apache.spark.sql.types.DataType;

public class CometNativeVector extends CometVector {
  private final long arrayAddress;
  private final long schemaAddress;

  public CometNativeVector(
      DataType type, boolean useDecimal128, long arrayAddress, long schemaAddress) {
    super(type, useDecimal128);
    this.arrayAddress = arrayAddress;
    this.schemaAddress = schemaAddress;
  }

  @Override
  public void setNumNulls(int numNulls) {}

  @Override
  public void setNumValues(int numValues) {}

  @Override
  public int numValues() {
    return 0;
  }

  @Override
  public ValueVector getValueVector() {
    return null;
  }

  @Override
  public CometVector slice(int offset, int length) {
    return null;
  }

  @Override
  public boolean hasNull() {
    return false;
  }

  @Override
  public int numNulls() {
    return 0;
  }

  @Override
  public boolean isNullAt(int i) {
    return false;
  }

  @Override
  public void close() {}

  public long getArrayAddress() {
    return arrayAddress;
  }

  public long getSchemaAddress() {
    return schemaAddress;
  }
}
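CometNativeVector is deliberately a stub: it implements none of the value-access methods and exists only to carry a pair of Arrow C Data Interface addresses from ColumnReader.loadVector() to NativeUtil.exportBatch(). A minimal sketch of how a consumer can tell it apart from decoded vectors (the helper name is hypothetical; it mirrors the match in exportBatch below):

import org.apache.comet.vector.{CometNativeVector, CometVector}

// Hypothetical helper: recover the FFI addresses if (and only if) the column
// skipped JVM-side decoding. For any other CometVector the data still lives
// in JVM Arrow buffers and must be exported via the Arrow C Data Interface.
def ffiAddresses(col: CometVector): Option[(Long, Long)] = col match {
  case v: CometNativeVector => Some((v.getArrayAddress, v.getSchemaAddress))
  case _                    => None
}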
45 changes: 24 additions & 21 deletions common/src/main/scala/org/apache/comet/vector/NativeUtil.scala
@@ -88,14 +88,16 @@ class NativeUtil {
    * an exported batch: an array containing the number of rows followed by pairs of memory
    * addresses, in the format of (address of Arrow array, address of Arrow schema)
    */
-  def exportBatch(
-      arrayAddrs: Array[Long],
-      schemaAddrs: Array[Long],
-      batch: ColumnarBatch): Int = {
+  def exportBatch(batch: ColumnarBatch): Array[Long] = {
     val numRows = mutable.ArrayBuffer.empty[Int]
+    val builder = Array.newBuilder[Long]
+    builder += batch.numRows()

     (0 until batch.numCols()).foreach { index =>
       batch.column(index) match {
+        case a: CometNativeVector =>
+          builder += a.getArrayAddress
+          builder += a.getSchemaAddress
         case a: CometVector =>
           val valueVector = a.getValueVector

@@ -107,16 +109,16 @@
             null
           }

-          // The array and schema structures are allocated by native side.
-          // Don't need to deallocate them here.
-          val arrowSchema = ArrowSchema.wrap(schemaAddrs(index))
-          val arrowArray = ArrowArray.wrap(arrayAddrs(index))
+          val arrowSchema = ArrowSchema.allocateNew(allocator)
+          val arrowArray = ArrowArray.allocateNew(allocator)
           Data.exportVector(
             allocator,
             getFieldVector(valueVector, "export"),
             provider,
             arrowArray,
             arrowSchema)
+          builder += arrowArray.memoryAddress()
+          builder += arrowSchema.memoryAddress()
         case c =>
           throw new SparkException(
             "Comet execution only takes Arrow Arrays, but got " +
@@ -133,34 +135,35 @@
     // the Arrow arrays. For example, Iceberg column reader will skip deleted rows internally in
     // its `CometVector` implementation. The `ColumnarBatch` returned by the reader will report
     // logical number of rows which is less than actual number of rows due to row deletion.
-    numRows.headOption.getOrElse(batch.numRows())
+
+    builder.result()
   }

   /**
    * Gets the next batch from native execution.
    *
-   * @param numOutputCols
-   *   The number of output columns
    * @param func
    *   The function to call to get the next batch
    * @return
-   *   The number of rows of the next batch, or None if there are no more batches
+   *   The next batch as a ColumnarBatch, or None if there are no more batches
    */
-  def getNextBatch(
-      numOutputCols: Int,
-      func: (Array[Long], Array[Long]) => Long): Option[ColumnarBatch] = {
-    val (arrays, schemas) = allocateArrowStructs(numOutputCols)
-
-    val arrayAddrs = arrays.map(_.memoryAddress())
-    val schemaAddrs = schemas.map(_.memoryAddress())
-
-    val result = func(arrayAddrs, schemaAddrs)
+  def getNextBatch(func: () => Array[Long]): Option[ColumnarBatch] = {
+    val cometBatchElements = func()
+    val result = cometBatchElements(0)
+    val arrayBuilder = Array.newBuilder[ArrowArray]
+    val schemaBuilder = Array.newBuilder[ArrowSchema]
+    for (i <- 1 until cometBatchElements.length by 2) {
+      arrayBuilder += ArrowArray.wrap(cometBatchElements(i))
+      schemaBuilder += ArrowSchema.wrap(cometBatchElements(i + 1))
+    }
+    val arrays = arrayBuilder.result()
+    val schemas = schemaBuilder.result()

     result match {
       case -1 =>
         // EOF
         None
-      case numRows =>
+      case numRows if numRows >= 0 =>
         val cometVectors = importVector(arrays, schemas)
         Some(new ColumnarBatch(cometVectors.toArray, numRows.toInt))
+      case flag =>
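Taken together, exportBatch and getNextBatch now share a flat Array[Long] contract in place of pre-allocated FFI structs: element 0 carries the row count (or -1 for end of stream), followed by one (Arrow array address, Arrow schema address) pair per column. A usage sketch under assumed names (executePlan and process are hypothetical stand-ins for the real JNI entry point and the downstream consumer):

import org.apache.comet.vector.NativeUtil
import org.apache.spark.sql.vectorized.ColumnarBatch

// Hypothetical stand-ins for illustration only.
def executePlan(planHandle: Long): Array[Long] = ???
def process(batch: ColumnarBatch): Unit = ???

val nativeUtil = new NativeUtil
val planHandle: Long = ???

// getNextBatch wraps each returned address pair with ArrowArray.wrap /
// ArrowSchema.wrap and imports them into Comet vectors; Array(-1) ends the stream.
var done = false
while (!done) {
  nativeUtil.getNextBatch(() => executePlan(planHandle)) match {
    case Some(batch) => process(batch)
    case None        => done = true
  }
}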