Commit 6be6b5f
fix(parquet): Fix inserting decimal type values precision and scale
1 parent: ecb2baa

6 files changed: +134 additions, -6 deletions

velox/connectors/Connector.h

Lines changed: 2 additions & 0 deletions
@@ -160,6 +160,8 @@ class ConnectorInsertTableHandle : public ISerializable {
 
   virtual std::string toString() const = 0;
 
+  virtual const std::vector<TypePtr> getColumnHandleDataTypes() const = 0;
+
   folly::dynamic serialize() const override {
     VELOX_NYI();
   }

velox/connectors/hive/HiveDataSink.h

Lines changed: 8 additions & 0 deletions
@@ -337,6 +337,14 @@ class HiveInsertTableHandle : public ConnectorInsertTableHandle {
 
   std::string toString() const override;
 
+  const std::vector<TypePtr> getColumnHandleDataTypes() const override {
+    std::vector<TypePtr> tableTypes;
+    for (const auto& i : inputColumns_) {
+      tableTypes.emplace_back(i->dataType());
+    }
+    return tableTypes;
+  }
+
  protected:
   const std::vector<std::shared_ptr<const HiveColumnHandle>> inputColumns_;
   const std::shared_ptr<const LocationHandle> locationHandle_;

velox/connectors/hive/tests/HiveDataSinkTest.cpp

Lines changed: 31 additions & 0 deletions
@@ -1204,6 +1204,37 @@ TEST_F(HiveDataSinkTest, insertTableHandleToString) {
       "HiveInsertTableHandle [dwrf zstd], [inputColumns: [ HiveColumnHandle [name: c0, columnType: Regular, dataType: BIGINT, requiredSubfields: [ ]] HiveColumnHandle [name: c1, columnType: Regular, dataType: INTEGER, requiredSubfields: [ ]] HiveColumnHandle [name: c2, columnType: Regular, dataType: SMALLINT, requiredSubfields: [ ]] HiveColumnHandle [name: c3, columnType: Regular, dataType: REAL, requiredSubfields: [ ]] HiveColumnHandle [name: c4, columnType: Regular, dataType: DOUBLE, requiredSubfields: [ ]] HiveColumnHandle [name: c5, columnType: PartitionKey, dataType: VARCHAR, requiredSubfields: [ ]] HiveColumnHandle [name: c6, columnType: PartitionKey, dataType: BOOLEAN, requiredSubfields: [ ]] ], locationHandle: LocationHandle [targetPath: /path/to/test, writePath: /path/to/test, tableType: kNew, tableFileName: ], bucketProperty: \nHiveBucketProperty[<HIVE_COMPATIBLE 4>\n\tBucket Columns:\n\t\tc5\n\tBucket Types:\n\t\tVARCHAR\n\tSortedBy Columns:\n\t\t[COLUMN[c5] ORDER[DESC NULLS LAST]]\n]\n, fileNameGenerator: HiveInsertFileNameGenerator]");
 }
 
+TEST_F(HiveDataSinkTest, insertTableHandleGetColumnHandleDataTypes) {
+  const int32_t kNumBuckets = 4;
+  auto bucketProperty = std::make_shared<HiveBucketProperty>(
+      HiveBucketProperty::Kind::kHiveCompatible,
+      kNumBuckets,
+      std::vector<std::string>{"c5"},
+      std::vector<TypePtr>{VARCHAR()},
+      std::vector<std::shared_ptr<const HiveSortingColumn>>{
+          std::make_shared<HiveSortingColumn>(
+              "c5", core::SortOrder{false, false})});
+  auto rowType =
+      ROW({"c0", "c5", "c6"}, {DECIMAL(10, 2), VARCHAR(), BOOLEAN()});
+  auto insertTableHandle = createHiveInsertTableHandle(
+      rowType,
+      TempDirectoryPath::create()->getPath(),
+      dwio::common::FileFormat::PARQUET,
+      {"c5", "c6"},
+      bucketProperty);
+  auto [precision, scale] = getDecimalPrecisionScale(
+      *insertTableHandle->getColumnHandleDataTypes()[0]);
+  ASSERT_EQ(precision, 10);
+  ASSERT_EQ(scale, 2);
+  ASSERT_EQ(
+      insertTableHandle->getColumnHandleDataTypes()[0]->toString(),
+      "DECIMAL(10, 2)");
+  ASSERT_EQ(
+      insertTableHandle->getColumnHandleDataTypes()[1]->toString(), "VARCHAR");
+  ASSERT_EQ(
+      insertTableHandle->getColumnHandleDataTypes()[2]->toString(), "BOOLEAN");
+}
+
 #ifdef VELOX_ENABLE_PARQUET
 TEST_F(HiveDataSinkTest, flushPolicyWithParquet) {
   const auto outputDirectory = TempDirectoryPath::create();
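
For context on the Writer.cpp change below: the writer now derives the Arrow decimal type from the declared Velox column type instead of inferring it from the field data. A minimal standalone sketch of that Velox-to-Arrow mapping, assuming only the Arrow C++ library and an illustrative precision/scale of (10, 2):

#include <arrow/api.h>

#include <iostream>

int main() {
  // ::arrow::decimal(precision, scale) builds the Arrow decimal type that the
  // Parquet writer attaches to the field; in this commit the two arguments
  // come from getDecimalPrecisionScale() on the Velox column type.
  std::shared_ptr<arrow::DataType> decimalType = arrow::decimal(10, 2);
  std::cout << decimalType->ToString() << std::endl; // e.g. "decimal128(10, 2)"
  return 0;
}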

velox/dwio/parquet/writer/Writer.cpp

Lines changed: 79 additions & 2 deletions
@@ -229,7 +229,18 @@ std::shared_ptr<::arrow::Field> updateFieldNameRecursive(
         arrowMapType->item_field(), *mapType.valueType());
     return newField->WithType(
         ::arrow::map(newKeyField->type(), newValueField->type()));
-  } else if (name != "") {
+  } else if (type.isDecimal()) {
+    // Parquet type is set from the column type rather than inferred from the
+    // field data.
+    auto precisionScale = getDecimalPrecisionScale(type);
+    if (!name.empty()) {
+      auto newField = field->WithName(name);
+      return newField->WithType(
+          ::arrow::decimal(precisionScale.first, precisionScale.second));
+    }
+    return field->WithType(
+        ::arrow::decimal(precisionScale.first, precisionScale.second));
+  } else if (!name.empty()) {
     return field->WithName(name);
   } else {
     return field;
@@ -318,6 +329,72 @@ std::optional<std::string> getParquetCreatedBy(
   return std::nullopt;
 }
 
+bool equivalentWithSimilarDecimal(
+    const Type& dataType,
+    const Type& schemaType) {
+  if (!(typeid(dataType) == typeid(schemaType))) {
+    return false;
+  }
+  if (schemaType.isRow()) {
+    const auto& schemaTyped = schemaType.asRow();
+    if (schemaTyped.size() != dataType.size()) {
+      return false;
+    }
+    for (size_t i = 0; i < dataType.size(); ++i) {
+      if (schemaTyped.childAt(i)->isDecimal()) {
+        auto presicisonScaleSchema =
+            getDecimalPrecisionScale(*schemaTyped.childAt(i));
+        auto precisionScaleData =
+            getDecimalPrecisionScale(*dataType.childAt(i));
+        // Table precision can be larger than or equal to the inserted data, but
+        // the scale must be the same.
+        if (presicisonScaleSchema.first < precisionScaleData.first ||
+            presicisonScaleSchema.second != precisionScaleData.second) {
+          return false;
+        }
+      } else if (!equivalentWithSimilarDecimal(
+                     *dataType.childAt(i), *schemaTyped.childAt(i))) {
+        return false;
+      }
+    }
+  } else if (schemaType.isArray()) {
+    const auto& schemaTyped = schemaType.asArray();
+    const auto& dataTyped = dataType.asArray();
+    if (schemaTyped.childAt(0)->isDecimal()) {
+      auto presicisonScaleSchema =
+          getDecimalPrecisionScale(*schemaTyped.childAt(0));
+      auto precisionScaleData = getDecimalPrecisionScale(*dataType.childAt(0));
+      if (presicisonScaleSchema.first < precisionScaleData.first ||
+          presicisonScaleSchema.second != precisionScaleData.second) {
+        return false;
+      }
+    }
+    return equivalentWithSimilarDecimal(
+        *dataTyped.elementType(), *schemaTyped.elementType());
+  } else if (schemaType.isMap()) {
+    auto& schemaTyped = schemaType.asMap();
+    auto& dataTyped = dataType.asMap();
+    return equivalentWithSimilarDecimal(
+               *dataTyped.keyType(), *schemaTyped.keyType()) &&
+        equivalentWithSimilarDecimal(
+            *dataTyped.valueType(), *schemaTyped.valueType());
+  } else if (schemaType.isFunction()) {
+    auto& schemaTyped = schemaType.asFunction();
+    auto& dataTyped = dataType.asFunction();
+    if (dataTyped.size() != schemaTyped.size()) {
+      return false;
+    }
+    for (auto i = 0; i < schemaTyped.size(); ++i) {
+      if (!equivalentWithSimilarDecimal(
+              *dataTyped.childAt(i), *schemaTyped.childAt(i))) {
+        return false;
+      }
+    }
+    return true;
+  }
+  return true;
+}
+
 } // namespace
 
 Writer::Writer(
@@ -425,7 +502,7 @@ dwio::common::StripeProgress getStripeProgress(
 */
 void Writer::write(const VectorPtr& data) {
   VELOX_USER_CHECK(
-      data->type()->equivalent(*schema_),
+      equivalentWithSimilarDecimal(*data->type(), *schema_),
       "The file schema type should be equal with the input rowvector type.");
 
   VectorPtr exportData = data;
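
The relaxed check in Writer::write() boils down to one rule at every decimal node: the table (schema) precision may be greater than or equal to the data precision, while the scales must match exactly. Below is a hedged, standalone sketch of just that rule; the helper name and the std::pair representation are illustrative, not Velox API.

#include <cassert>
#include <utility>

// Illustrative only: first = precision, second = scale.
using PrecisionScale = std::pair<int, int>;

// Returns true when decimal data can be written into a column declared with
// the given schema precision/scale: equal or wider table precision is fine,
// the scale must be identical.
bool decimalCompatible(PrecisionScale data, PrecisionScale schema) {
  return schema.first >= data.first && schema.second == data.second;
}

int main() {
  assert(decimalCompatible({10, 2}, {10, 2}));  // exact match
  assert(decimalCompatible({10, 2}, {12, 2}));  // wider table precision is ok
  assert(!decimalCompatible({12, 2}, {10, 2})); // data wider than table: reject
  assert(!decimalCompatible({10, 3}, {10, 2})); // scale mismatch: reject
  return 0;
}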

velox/exec/TableWriter.cpp

Lines changed: 6 additions & 4 deletions
@@ -72,15 +72,17 @@ TableWriter::TableWriter(
 void TableWriter::setTypeMappings(
     const core::TableWriteNodePtr& tableWriteNode) {
   auto outputNames = tableWriteNode->columnNames();
-  auto outputTypes = tableWriteNode->columns()->children();
+  auto tableTypes = tableWriteNode->insertTableHandle()
+                        ->connectorInsertTableHandle()
+                        ->getColumnHandleDataTypes();
 
   const auto& inputType = tableWriteNode->sources()[0]->outputType();
 
   // Ids that map input to output columns.
-  inputMapping_.reserve(outputTypes.size());
+  inputMapping_.reserve(tableTypes.size());
   std::vector<TypePtr> inputTypes;
 
-  // Generate mappings between input and output types. Note that column names
+  // Generate mappings between input and table types. Note that column names
   // must match, but in some case the types won't, for example, when writing a
   // struct (ROW) as a flat map (MAP).
   for (const auto& name : tableWriteNode->columns()->names()) {
@@ -89,7 +91,7 @@ void TableWriter::setTypeMappings(
     inputTypes.emplace_back(inputType->childAt(idx));
   }
 
-  mappedOutputType_ = ROW(folly::copy(outputNames), std::move(outputTypes));
+  mappedOutputType_ = ROW(folly::copy(outputNames), std::move(tableTypes));
   mappedInputType_ = ROW(std::move(outputNames), std::move(inputTypes));
 }
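
The effect of this change is that mappedOutputType_ is now assembled from the types declared on the insert table handle rather than from the plan's output columns, so a decimal column reaches the file writer with the table's declared precision and scale. A small sketch using the Velox type factories, assuming the Velox type headers; the column name and the two precisions are made up for illustration:

#include "velox/type/Type.h"

#include <iostream>

using namespace facebook::velox;

int main() {
  // Type carried by the incoming data.
  auto dataType = ROW({"c0"}, {DECIMAL(10, 2)});
  // Type declared on the table's column handles, as returned by
  // getColumnHandleDataTypes(); the writer schema is now built from this.
  auto tableType = ROW({"c0"}, {DECIMAL(12, 2)});
  std::cout << dataType->toString() << std::endl;  // e.g. ROW<c0:DECIMAL(10, 2)>
  std::cout << tableType->toString() << std::endl; // e.g. ROW<c0:DECIMAL(12, 2)>
  return 0;
}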

velox/experimental/cudf/connectors/hive/CudfHiveDataSink.h

Lines changed: 8 additions & 0 deletions
@@ -253,6 +253,14 @@ class CudfHiveInsertTableHandle : public ConnectorInsertTableHandle {
 
   static void registerSerDe();
 
+  const std::vector<TypePtr> getColumnHandleDataTypes() const override {
+    std::vector<TypePtr> tableTypes;
+    for (const auto& i : inputColumns_) {
+      tableTypes.emplace_back(i->type());
+    }
+    return tableTypes;
+  }
+
   std::string toString() const override;
 
  private:
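
With both the Hive and cuDF handles overriding the new virtual, a caller holding only the base ConnectorInsertTableHandle can read back the declared column types. A hypothetical helper sketching that usage; the function name is not part of the commit:

#include "velox/connectors/Connector.h"

#include <string>
#include <vector>

// Hypothetical helper: collects the string form of every declared column type,
// e.g. "DECIMAL(10, 2)", from any concrete insert table handle.
std::vector<std::string> columnTypeNames(
    const facebook::velox::connector::ConnectorInsertTableHandle& handle) {
  std::vector<std::string> names;
  for (const auto& type : handle.getColumnHandleDataTypes()) {
    names.push_back(type->toString());
  }
  return names;
}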
