Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: Support delta update on filter that is not projected out #12285

Open
wants to merge 1 commit into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion velox/connectors/hive/HiveDataSource.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -323,6 +323,7 @@ void HiveDataSource::addSplit(std::shared_ptr<ConnectorSplit> split) {
// so we initialize it beforehand.
splitReader_->configureReaderOptions(randomSkip_);
splitReader_->prepareSplit(metadataFilter_, runtimeStats_);
readerOutputType_ = splitReader_->readerOutputType();
}

vector_size_t HiveDataSource::applyBucketConversion(
Expand Down Expand Up @@ -378,7 +379,12 @@ std::optional<RowVectorPtr> HiveDataSource::next(
return nullptr;
}

if (!output_) {
// Bucket conversion or delta update could add extra column to reader output.
auto needsExtraColumn = [&] {
return output_->asUnchecked<RowVector>()->childrenSize() <
readerOutputType_->size();
};
if (!output_ || needsExtraColumn()) {
output_ = BaseVector::create(readerOutputType_, 0, pool_);
}

Expand Down
12 changes: 9 additions & 3 deletions velox/connectors/hive/SplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -147,7 +147,11 @@ void SplitReader::configureReaderOptions(
void SplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats) {
auto rowType = createReader();
createReader();
if (emptySplit_) {
return;
}
auto rowType = getAdaptedRowType();

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
Expand Down Expand Up @@ -221,7 +225,7 @@ std::string SplitReader::toString() const {
static_cast<const void*>(baseRowReader_.get()));
}

RowTypePtr SplitReader::createReader() {
void SplitReader::createReader() {
VELOX_CHECK_NE(
baseReaderOpts_.fileFormat(), dwio::common::FileFormat::UNKNOWN);

Expand All @@ -237,7 +241,7 @@ RowTypePtr SplitReader::createReader() {
hiveConfig_->ignoreMissingFiles(
connectorQueryCtx_->sessionProperties())) {
emptySplit_ = true;
return nullptr;
return;
}
throw;
}
Expand All @@ -258,7 +262,9 @@ RowTypePtr SplitReader::createReader() {

baseReader_ = dwio::common::getReaderFactory(baseReaderOpts_.fileFormat())
->createReader(std::move(baseFileInput), baseReaderOpts_);
}

RowTypePtr SplitReader::getAdaptedRowType() const {
auto& fileType = baseReader_->rowType();
auto columnTypes = adaptColumns(fileType, baseReaderOpts_.fileSchema());
auto columnNames = fileType->names();
Expand Down
12 changes: 10 additions & 2 deletions velox/connectors/hive/SplitReader.h
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,10 @@ class SplitReader {

void setConnectorQueryCtx(const ConnectorQueryCtx* connectorQueryCtx);

const RowTypePtr& readerOutputType() const {
return readerOutputType_;
}

std::string toString() const;

protected:
Expand All @@ -113,7 +117,11 @@ class SplitReader {

/// Create the dwio::common::Reader object baseReader_, which will be used to
/// read the data file's metadata and schema
RowTypePtr createReader();
void createReader();

// Adjust the scan spec according to the current split, then return the
// adapted row type.
RowTypePtr getAdaptedRowType() const;

// Check if the filters pass on the column statistics. When delta update is
// present, the corresonding filter should be disabled before calling this
Expand Down Expand Up @@ -155,7 +163,7 @@ class SplitReader {
const ConnectorQueryCtx* connectorQueryCtx_;
const std::shared_ptr<const HiveConfig> hiveConfig_;

const RowTypePtr readerOutputType_;
RowTypePtr readerOutputType_;
const std::shared_ptr<io::IoStatistics> ioStats_;
FileHandleFactory* const fileHandleFactory_;
folly::Executor* const executor_;
Expand Down
6 changes: 5 additions & 1 deletion velox/connectors/hive/iceberg/IcebergSplitReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,11 @@ IcebergSplitReader::IcebergSplitReader(
void IcebergSplitReader::prepareSplit(
std::shared_ptr<common::MetadataFilter> metadataFilter,
dwio::common::RuntimeStatistics& runtimeStats) {
auto rowType = createReader();
createReader();
if (emptySplit_) {
return;
}
auto rowType = getAdaptedRowType();

if (checkIfSplitIsEmpty(runtimeStats)) {
VELOX_CHECK(emptySplit_);
Expand Down
3 changes: 2 additions & 1 deletion velox/dwio/common/Reader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ VectorPtr RowReader::projectColumns(
}
}
for (auto& childSpec : spec.children()) {
VELOX_CHECK_NULL(childSpec->deltaUpdate());
VectorPtr child;
if (childSpec->isConstant()) {
child = BaseVector::wrapInConstant(
Expand Down Expand Up @@ -133,7 +134,7 @@ void RowReader::readWithRowNumber(
const auto& rowNumberColumnName = rowNumberColumnInfo->name;
column_index_t numChildren{0};
for (auto& column : options.scanSpec()->children()) {
if (column->projectOut()) {
if (column->keepValues()) {
++numChildren;
}
}
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/ScanSpec.h
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ class ScanSpec {
}

bool keepValues() const {
return projectOut_;
return projectOut_ || deltaUpdate_;
}

// Position in the RowVector returned by the top level scan. Applies
Expand Down
2 changes: 1 addition & 1 deletion velox/dwio/common/SelectiveStructColumnReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,7 @@ void SelectiveStructColumnReaderBase::getValues(
setComplexNulls(rows, *result);
for (const auto& childSpec : scanSpec_->children()) {
VELOX_TRACE_HISTORY_PUSH("getValues %s", childSpec->fieldName().c_str());
if (!childSpec->projectOut()) {
if (!childSpec->keepValues()) {
continue;
}

Expand Down
Loading