Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ void BroadcastFileWriter::writeFooter() {
VELOX_CHECK(!pageSizes_.empty());

facebook::presto::thrift::BroadcastFileFooter thriftFooter;
thriftFooter.pageSizes_ref() = pageSizes_;
thriftFooter.pageSizes() = pageSizes_;
auto serializedFooterBuf = thriftWriteIOBuf(thriftFooter);

int64_t footerSize =
Expand Down Expand Up @@ -238,7 +238,7 @@ void BroadcastFileReader::readFooter(velox::ReadFile* readFile) {
thriftRead(serializedFooter, thriftFooter);

// Extract page sizes from thrift footer
pageSizes_ = thriftFooter->pageSizes_ref().value();
pageSizes_ = thriftFooter->pageSizes().value();

// Validate the footer contents
VELOX_CHECK_GT(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,16 +39,16 @@ const std::string BASE_DATA_PATH =
TEST_F(TaskUpdateRequestTest, connectorId) {
protocol::ConnectorId connectorId;
thrift::ConnectorId thriftConnectorId;
thriftConnectorId.catalogName_ref() = "test";
thriftConnectorId.catalogName() = "test";
thrift::fromThrift(thriftConnectorId, connectorId);
ASSERT_EQ(connectorId, "test");
}

TEST_F(TaskUpdateRequestTest, optionalField) {
protocol::ResourceEstimates resourceEstimates;
thrift::ResourceEstimates thriftResourceEstimates;
thriftResourceEstimates.executionTime_ref() = 100;
thriftResourceEstimates.peakMemory_ref() = 1024 * 1024 * 1024;
thriftResourceEstimates.executionTime() = 100;
thriftResourceEstimates.peakMemory() = 1024 * 1024 * 1024;
thrift::fromThrift(thriftResourceEstimates, resourceEstimates);
ASSERT_EQ(
*resourceEstimates.executionTime,
Expand All @@ -63,9 +63,9 @@ TEST_F(TaskUpdateRequestTest, optionalField) {
TEST_F(TaskUpdateRequestTest, qualifiedObjectName) {
protocol::QualifiedObjectName qualifiedObjectName;
thrift::QualifiedObjectName thriftQualifiedObjectName;
thriftQualifiedObjectName.catalogName_ref() = "test_catalog";
thriftQualifiedObjectName.schemaName_ref() = "test_schema";
thriftQualifiedObjectName.objectName_ref() = "test_object";
thriftQualifiedObjectName.catalogName() = "test_catalog";
thriftQualifiedObjectName.schemaName() = "test_schema";
thriftQualifiedObjectName.objectName() = "test_object";
thrift::fromThrift(thriftQualifiedObjectName, qualifiedObjectName);
ASSERT_EQ(qualifiedObjectName, "test_catalog.test_schema.test_object");
}
Expand All @@ -74,11 +74,11 @@ TEST_F(TaskUpdateRequestTest, routineCharacteristics) {
protocol::RoutineCharacteristics routineCharacteristics;
thrift::RoutineCharacteristics thriftRroutineCharacteristics;
thrift::Language thriftLanguage;
thriftLanguage.language_ref() = "English";
thriftRroutineCharacteristics.language_ref() = std::move(thriftLanguage);
thriftRroutineCharacteristics.determinism_ref() =
thriftLanguage.language() = "English";
thriftRroutineCharacteristics.language() = std::move(thriftLanguage);
thriftRroutineCharacteristics.determinism() =
thrift::Determinism::NOT_DETERMINISTIC;
thriftRroutineCharacteristics.nullCallClause_ref() =
thriftRroutineCharacteristics.nullCallClause() =
thrift::NullCallClause::RETURNS_NULL_ON_NULL_INPUT;
thrift::fromThrift(thriftRroutineCharacteristics, routineCharacteristics);
ASSERT_EQ((*routineCharacteristics.language).language, "English");
Expand All @@ -93,14 +93,14 @@ TEST_F(TaskUpdateRequestTest, routineCharacteristics) {
TEST_F(TaskUpdateRequestTest, mapOutputBuffers) {
protocol::OutputBuffers outputBuffers;
thrift::OutputBuffers thriftOutputBuffers;
thriftOutputBuffers.type_ref() = thrift::BufferType::ARBITRARY;
thriftOutputBuffers.version_ref() = 1;
thriftOutputBuffers.noMoreBufferIds_ref() = true;
thriftOutputBuffers.type() = thrift::BufferType::ARBITRARY;
thriftOutputBuffers.version() = 1;
thriftOutputBuffers.noMoreBufferIds() = true;
thrift::OutputBufferId outputBufferId1;
thrift::OutputBufferId outputBufferId2;
outputBufferId1.id_ref() = 1;
outputBufferId2.id_ref() = 2;
thriftOutputBuffers.buffers_ref() = {
outputBufferId1.id() = 1;
outputBufferId2.id() = 2;
thriftOutputBuffers.buffers() = {
{outputBufferId1, 10}, {outputBufferId2, 20}};

thrift::fromThrift(thriftOutputBuffers, outputBuffers);
Expand All @@ -113,12 +113,12 @@ TEST_F(TaskUpdateRequestTest, mapOutputBuffers) {

TEST_F(TaskUpdateRequestTest, binaryHiveSplitFromThrift) {
thrift::Split thriftSplit;
thriftSplit.connectorId()->catalogName_ref() = "hive";
thriftSplit.transactionHandle()->jsonValue_ref() = R"({
thriftSplit.connectorId()->catalogName() = "hive";
thriftSplit.transactionHandle()->jsonValue() = R"({
"@type": "hive",
"uuid": "8a4d6c83-60ee-46de-9715-bc91755619fa"
})";
thriftSplit.connectorSplit()->jsonValue_ref() =
thriftSplit.connectorSplit()->jsonValue() =
slurp(getDataPath(BASE_DATA_PATH, "HiveSplit.json"));

protocol::Split split;
Expand All @@ -143,25 +143,25 @@ TEST_F(TaskUpdateRequestTest, binaryRemoteSplitFromThrift) {
thrift::RemoteTransactionHandle thriftTransactionHandle;
thrift::RemoteSplit thriftRemoteSplit;

thriftSplit.connectorId()->catalogName_ref() = "$remote";
thriftSplit.transactionHandle()->customSerializedValue_ref() =
thriftSplit.connectorId()->catalogName() = "$remote";
thriftSplit.transactionHandle()->customSerializedValue() =
thriftWrite(thriftTransactionHandle);

thriftRemoteSplit.location()->location_ref() = "/test_location";
thriftRemoteSplit.remoteSourceTaskId()->id_ref() = 100;
thriftRemoteSplit.remoteSourceTaskId()->attemptNumber_ref() = 200;
thriftRemoteSplit.remoteSourceTaskId()->stageExecutionId()->id_ref() = 300;
thriftRemoteSplit.location()->location() = "/test_location";
thriftRemoteSplit.remoteSourceTaskId()->id() = 100;
thriftRemoteSplit.remoteSourceTaskId()->attemptNumber() = 200;
thriftRemoteSplit.remoteSourceTaskId()->stageExecutionId()->id() = 300;
thriftRemoteSplit.remoteSourceTaskId()
->stageExecutionId()
->stageId()
->id_ref() = 400;
->id() = 400;
thriftRemoteSplit.remoteSourceTaskId()
->stageExecutionId()
->stageId()
->queryId_ref() = "test_query_id";
->queryId() = "test_query_id";

thriftSplit.connectorSplit()->connectorId_ref() = "$remote";
thriftSplit.connectorSplit()->customSerializedValue_ref() =
thriftSplit.connectorSplit()->connectorId() = "$remote";
thriftSplit.connectorSplit()->customSerializedValue() =
thriftWrite(thriftRemoteSplit);

protocol::Split split;
Expand All @@ -178,14 +178,14 @@ TEST_F(TaskUpdateRequestTest, unionExecutionWriterTargetFromThrift) {
// Construct ExecutionWriterTarget with CreateHandle
thrift::CreateHandle thriftCreateHandle;
thrift::ExecutionWriterTargetUnion thriftWriterTarget;
thriftCreateHandle.schemaTableName()->schema_ref() = "test_schema";
thriftCreateHandle.schemaTableName()->table_ref() = "test_table";
thriftCreateHandle.handle()->connectorId()->catalogName_ref() = "hive";
thriftCreateHandle.handle()->transactionHandle()->jsonValue_ref() = R"({
thriftCreateHandle.schemaTableName()->schema() = "test_schema";
thriftCreateHandle.schemaTableName()->table() = "test_table";
thriftCreateHandle.handle()->connectorId()->catalogName() = "hive";
thriftCreateHandle.handle()->transactionHandle()->jsonValue() = R"({
"@type": "hive",
"uuid": "8a4d6c83-60ee-46de-9715-bc91755619fa"
})";
thriftCreateHandle.handle()->connectorHandle()->jsonValue_ref() =
thriftCreateHandle.handle()->connectorHandle()->jsonValue() =
slurp(getDataPath(BASE_DATA_PATH, "HiveOutputTableHandle.json"));
;
thriftWriterTarget.set_createHandle(std::move(thriftCreateHandle));
Expand Down Expand Up @@ -233,14 +233,14 @@ TEST_F(TaskUpdateRequestTest, unionExecutionWriterTargetToThrift) {
// connector fields are not implemented.
thrift::ExecutionWriterTargetUnion thriftWriterTarget;
thrift::toThrift(writerTarget, thriftWriterTarget);
ASSERT_TRUE(thriftWriterTarget.createHandle_ref().has_value());
ASSERT_TRUE(thriftWriterTarget.createHandle().has_value());
const auto& thriftCreateHandle =
thriftWriterTarget.createHandle_ref().value();
thriftWriterTarget.createHandle().value();
ASSERT_EQ(
thriftCreateHandle.schemaTableName()->schema_ref().value(),
thriftCreateHandle.schemaTableName()->schema().value(),
"test_schema");
ASSERT_EQ(
thriftCreateHandle.schemaTableName()->table_ref().value(), "test_table");
thriftCreateHandle.schemaTableName()->table().value(), "test_table");
}

TEST_F(TaskUpdateRequestTest, fragment) {
Expand Down Expand Up @@ -283,7 +283,7 @@ TEST_F(TaskUpdateRequestTest, sessionRepresentation) {
{"Name", "Jane Doe"}, {"Age", "25"}, {"City", "Los Angeles"}};
thriftMap["Person3"] = {
{"Name", "Bob Smith"}, {"Age", "40"}, {"City", "Chicago"}};
thriftSessionRepresentation.unprocessedCatalogProperties_ref() =
thriftSessionRepresentation.unprocessedCatalogProperties() =
std::move(thriftMap);

thrift::fromThrift(thriftSessionRepresentation, sessionRepresentation);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,59 +54,59 @@ class ThriftIOTest : public ::testing::Test {

TEST_F(ThriftIOTest, thriftHostAddressRoundTrip) {
thrift::HostAddress original;
original.hostPortString_ref() = "localhost:8080";
original.hostPortString() = "localhost:8080";

testThriftRoundTrips(original);
}

TEST_F(ThriftIOTest, thriftStageIdRoundTrip) {
thrift::StageId original;
original.queryId_ref() = "test_query_123";
original.id_ref() = 42;
original.queryId() = "test_query_123";
original.id() = 42;

testThriftRoundTrips(original);
}

TEST_F(ThriftIOTest, thriftPlanNodeIdRoundTrip) {
thrift::PlanNodeId original;
original.id_ref() = "test_plan_node_123";
original.id() = "test_plan_node_123";

testThriftRoundTrips(original);
}

TEST_F(ThriftIOTest, thriftEmptyStringRoundTrip) {
thrift::HostAddress original;
original.hostPortString_ref() = ""; // Empty string
original.hostPortString() = ""; // Empty string

testThriftRoundTrips(original);
}

TEST_F(ThriftIOTest, thriftLargeDataRoundTrip) {
thrift::StageId original;
original.queryId_ref() = std::string(1000, 'x'); // Large string
original.id_ref() = INT32_MAX;
original.queryId() = std::string(1000, 'x'); // Large string
original.id() = INT32_MAX;

testThriftRoundTrips(original);
}

TEST_F(ThriftIOTest, thriftNegativeValuesRoundTrip) {
thrift::StageId original;
original.queryId_ref() = "negative_test";
original.id_ref() = -12345;
original.queryId() = "negative_test";
original.id() = -12345;

testThriftRoundTrips(original);
}

TEST_F(ThriftIOTest, thriftOutputBufferIdRoundTrip) {
thrift::OutputBufferId original;
original.id_ref() = 98765;
original.id() = 98765;

testThriftRoundTrips(original);
}

TEST_F(ThriftIOTest, thriftZeroValuesRoundTrip) {
thrift::OutputBufferId original;
original.id_ref() = 0;
original.id() = 0;

testThriftRoundTrips(original);
}
Expand All @@ -117,14 +117,14 @@ TEST_F(ThriftIOTest, thriftTaskIdRoundTrip) {
// Create nested StageExecutionId
thrift::StageExecutionId stageExecutionId;
thrift::StageId stageId;
stageId.queryId_ref() = "complex_query_789";
stageId.id_ref() = 555;
stageExecutionId.stageId_ref() = stageId;
stageExecutionId.id_ref() = 777;
original.stageExecutionId_ref() = stageExecutionId;
stageId.queryId() = "complex_query_789";
stageId.id() = 555;
stageExecutionId.stageId() = stageId;
stageExecutionId.id() = 777;
original.stageExecutionId() = stageExecutionId;

original.id_ref() = 999;
original.attemptNumber_ref() = 3;
original.id() = 999;
original.attemptNumber() = 3;

testThriftRoundTrips(original);
}
Expand All @@ -134,11 +134,11 @@ TEST_F(ThriftIOTest, thriftStageExecutionIdRoundTrip) {

// Create nested StageId
thrift::StageId stageId;
stageId.queryId_ref() = "nested_stage_query";
stageId.id_ref() = 111;
original.stageId_ref() = stageId;
stageId.queryId() = "nested_stage_query";
stageId.id() = 111;
original.stageId() = stageId;

original.id_ref() = 222;
original.id() = 222;

testThriftRoundTrips(original);
}
Expand All @@ -147,19 +147,19 @@ TEST_F(ThriftIOTest, thriftBroadcastFileFooterRoundTrip) {
thrift::BroadcastFileFooter original;

// Test with empty page sizes
original.pageSizes_ref() = std::vector<int64_t>{};
original.pageSizes() = std::vector<int64_t>{};
testThriftRoundTrips(original);

// Test with single page size
original.pageSizes_ref() = std::vector<int64_t>{1024};
original.pageSizes() = std::vector<int64_t>{1024};
testThriftRoundTrips(original);

// Test with multiple page sizes
original.pageSizes_ref() = std::vector<int64_t>{1024, 2048, 4096, 8192};
original.pageSizes() = std::vector<int64_t>{1024, 2048, 4096, 8192};
testThriftRoundTrips(original);

// Test with large page sizes
original.pageSizes_ref() = std::vector<int64_t>{
original.pageSizes() = std::vector<int64_t>{
1073741824, // 1GB
2147483648, // 2GB
268435456, // 256MB
Expand All @@ -168,6 +168,6 @@ TEST_F(ThriftIOTest, thriftBroadcastFileFooterRoundTrip) {
testThriftRoundTrips(original);

// Test with zero and negative values (edge cases)
original.pageSizes_ref() = std::vector<int64_t>{0, -1, 100, 0, 50};
original.pageSizes() = std::vector<int64_t>{0, -1, 100, 0, 50};
testThriftRoundTrips(original);
}
Loading