|
36 | 36 | #include "presto_cpp/main/operators/ShuffleWrite.h" |
37 | 37 | #include "presto_cpp/main/types/TypeParser.h" |
38 | 38 | #include "velox/exec/TraceUtil.h" |
| 39 | +#include "velox/connectors/hive/HiveDataSink.h" |
39 | 40 |
|
40 | 41 | using namespace facebook::velox; |
41 | 42 | using namespace facebook::velox::exec; |
@@ -75,6 +76,19 @@ RowTypePtr toRowType( |
75 | 76 | return ROW(std::move(names), std::move(types)); |
76 | 77 | } |
77 | 78 |
|
| 79 | +RowTypePtr toRowType( |
| 80 | + const std::vector<protocol::VariableReferenceExpression>& variables, |
| 81 | + const std::vector<TypePtr>& types) { |
| 82 | + VELOX_CHECK_EQ(types.size(), variables.size()); |
| 83 | + std::vector<std::string> names; |
| 84 | + names.reserve(variables.size()); |
| 85 | + for (const auto& variable : variables) { |
| 86 | + names.emplace_back(variable.name); |
| 87 | + } |
| 88 | + |
| 89 | + return ROW(std::move(names), std::move(types)); |
| 90 | +} |
| 91 | + |
78 | 92 | std::shared_ptr<connector::ColumnHandle> toColumnHandle( |
79 | 93 | const protocol::ColumnHandle* column, |
80 | 94 | const TypeParser& typeParser) { |
@@ -1526,9 +1540,15 @@ VeloxQueryPlanConverterBase::toVeloxQueryPlan( |
1526 | 1540 | sourceVeloxPlan, |
1527 | 1541 | tableWriteInfo, |
1528 | 1542 | taskId); |
| 1543 | + |
| 1544 | + std::vector<TypePtr> tableTypes; |
| 1545 | + auto inputColumns = std::dynamic_pointer_cast<const connector::hive::HiveInsertTableHandle>(insertTableHandle->connectorInsertTableHandle())->inputColumns(); |
| 1546 | + for (const auto& inputColumn : inputColumns) { |
| 1547 | + tableTypes.emplace_back(inputColumn->dataType()); |
| 1548 | + } |
1529 | 1549 | return std::make_shared<core::TableWriteNode>( |
1530 | 1550 | node->id, |
1531 | | - toRowType(node->columns, typeParser_), |
| 1551 | + toRowType(node->columns, tableTypes), |
1532 | 1552 | node->columnNames, |
1533 | 1553 | columnStatsSpec, |
1534 | 1554 | std::move(insertTableHandle), |
|
0 commit comments