Clp integration s3 #1
// clear the stage from last run | ||
m_query_runner->populate_string_queries(); | ||
|
||
// probably have another class for query evaluation and filter |
Is this a TODO?
m_current_schema_index = | ||
(m_current_schema_index + 1) % m_matched_schemas.size(); |
Same question: why is this a cycle?
m_current_schema_table_loaded = false; | ||
} | ||
|
||
if (m_expression_value != EvaluatedValue::False) { |
Can we replace all "!= EvaluatedValue::False" with "== EvaluatedValue::True"? I think that would be clearer.
Actually, make it "EvaluatedValue::True == m_expression_value". As I remember, we should write the constant before the variable in a condition.
There's an Unknown case
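To make the point about the Unknown case concrete, here is a minimal sketch. The enum below is a hypothetical stand-in for the project's EvaluatedValue, and both helper functions are illustrative names that do not exist in the PR. It only shows that the two checks disagree for Unknown, so the proposed replacement would not be behavior-preserving.

```cpp
// Hypothetical stand-in for the EvaluatedValue enum discussed above.
enum class EvaluatedValue { True, False, Unknown };

// Original check: keep the schema unless the expression is definitely false.
bool passes_not_false(EvaluatedValue value) {
    return EvaluatedValue::False != value;
}

// Proposed check: keep the schema only if the expression is definitely true.
bool passes_is_true(EvaluatedValue value) {
    return EvaluatedValue::True == value;
}
```

For True and False the two functions agree; for Unknown the first keeps the schema and the second drops it.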
m_error_code = ErrorCode::DictionaryNotFound; | ||
continue; |
Why is it "continue" rather than "break" when we run into the error? Actually, I find this inner while loop a bit confusing; I feel we have mixed two things in the same iteration. If we just want to skip the schemas that are not found, maybe we can filter them out before this inner while loop starts.
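The pre-filtering idea could look roughly like the sketch below. It assumes the per-schema check can be expressed as a predicate; filter_schemas and may_match are hypothetical names, not part of the PR, and the real check (set_schema followed by constant_propagate) has side effects on the query runner that this sketch ignores.

```cpp
#include <algorithm>
#include <cstdint>
#include <functional>
#include <iterator>
#include <vector>

// Hypothetical helper: keep only the schemas whose expression may still
// match, so the fetch loop never has to skip a schema mid-iteration.
std::vector<int32_t> filter_schemas(
        std::vector<int32_t> const& matched_schemas,
        std::function<bool(int32_t)> const& may_match) {
    std::vector<int32_t> kept;
    std::copy_if(
            matched_schemas.begin(),
            matched_schemas.end(),
            std::back_inserter(kept),
            may_match);
    return kept;
}
```

With the list filtered up front, the inner loop would only handle row fetching and would not need the "continue" branch.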
while (false == m_completed_archive_cycles) { | ||
m_error_code = load_archive(); | ||
|
||
if (ErrorCode::Success == m_error_code) { | ||
m_query_runner = std::make_shared<QueryRunner>( | ||
m_expr, | ||
m_schema_match, | ||
m_ignore_case, | ||
m_schema_map, | ||
m_schema_tree, | ||
m_projection, | ||
m_var_dict, | ||
m_log_dict); | ||
m_query_runner->populate_string_queries(); | ||
break; | ||
} | ||
move_to_next_archive(); | ||
} | ||
} | ||
return 0; |
But here we are already inside the while (false == m_completed_archive_cycles) loop, right? It seems we can get rid of the outer one, because when this loop ends, m_completed_archive_cycles is true and the outer loop ends as well. So does the outer while loop actually iterate only once?
No, it could iterate multiple times.
size_t Cursor::fetch_next( | ||
size_t num_rows, | ||
std::vector<ColumnData>& column_vectors) { | ||
if (m_error_code != ErrorCode::Success) { | ||
return 0; | ||
} | ||
|
||
while (false == m_completed_archive_cycles) { | ||
while (false == m_completed_schema_cycles) { | ||
// whether the schema table is loaded | ||
if (false == m_current_schema_table_loaded) { | ||
m_current_schema_id = m_matched_schemas[m_current_schema_index]; | ||
m_query_runner->set_schema(m_current_schema_id); | ||
m_query_runner->populate_searched_wildcard_columns(); | ||
m_expression_value = m_query_runner->constant_propagate(); | ||
|
||
if (m_expression_value != EvaluatedValue::False) { | ||
m_query_runner->add_wildcard_columns_to_searched_columns(); | ||
|
||
if (m_archive_read_stage < ArchiveReadStage::TablesInitialized) { | ||
m_archive_reader.open_packed_streams(); | ||
m_archive_read_stage = ArchiveReadStage::TablesInitialized; | ||
} | ||
auto& reader = m_archive_reader.read_schema_table( | ||
m_current_schema_id, false, false); | ||
reader.initialize_filter_with_column_map(m_query_runner.get()); | ||
m_error_code = ErrorCode::Success; | ||
m_current_schema_table_loaded = true; | ||
} else { | ||
m_current_schema_index = | ||
(m_current_schema_index + 1) % m_matched_schemas.size(); | ||
m_error_code = ErrorCode::DictionaryNotFound; | ||
continue; | ||
} | ||
} | ||
|
||
if (auto num_rows_fetched = | ||
m_query_runner->fetch_next(num_rows, column_vectors); | ||
num_rows_fetched > 0) { | ||
return num_rows_fetched; | ||
} | ||
|
||
m_current_schema_index = | ||
(m_current_schema_index + 1) % m_matched_schemas.size(); | ||
m_completed_schema_cycles = m_current_schema_index == m_end_schema_index; | ||
m_current_schema_table_loaded = false; | ||
} | ||
|
||
move_to_next_archive(); | ||
while (false == m_completed_archive_cycles) { | ||
m_error_code = load_archive(); | ||
|
||
if (ErrorCode::Success == m_error_code) { | ||
m_query_runner = std::make_shared<QueryRunner>( | ||
m_expr, | ||
m_schema_match, | ||
m_ignore_case, | ||
m_schema_map, | ||
m_schema_tree, | ||
m_projection, | ||
m_var_dict, | ||
m_log_dict); | ||
m_query_runner->populate_string_queries(); | ||
break; | ||
} | ||
move_to_next_archive(); | ||
} | ||
} | ||
return 0; | ||
} |
This feels like major code duplication. Any chance we can improve this?
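One way to remove the duplicated "advance until an archive loads" loop is to give it a single home. The sketch below uses a toy class, not the real Cursor: ToyCursor, load_next_valid_archive, and the ArchiveNotFound error code are all hypothetical, and the real helper would also rebuild the QueryRunner on success.

```cpp
#include <cstddef>
#include <utility>
#include <vector>

enum class ErrorCode { Success, ArchiveNotFound };

// Toy stand-in for Cursor: each entry says whether loading that archive
// would succeed.
class ToyCursor {
public:
    explicit ToyCursor(std::vector<bool> archives_ok)
            : m_archives_ok(std::move(archives_ok)),
              m_completed_archive_cycles(m_archives_ok.empty()) {}

    // Single home for the loop that appears twice in the reviewed code.
    // Returns true once an archive loads, false when archives run out.
    bool load_next_valid_archive() {
        while (false == m_completed_archive_cycles) {
            if (ErrorCode::Success == load_archive()) {
                // The real code would construct the QueryRunner here.
                return true;
            }
            move_to_next_archive();
        }
        return false;
    }

    void move_to_next_archive() {
        ++m_index;
        m_completed_archive_cycles = m_index >= m_archives_ok.size();
    }

    size_t current_index() const { return m_index; }

private:
    ErrorCode load_archive() const {
        return m_archives_ok[m_index] ? ErrorCode::Success
                                      : ErrorCode::ArchiveNotFound;
    }

    std::vector<bool> m_archives_ok;
    bool m_completed_archive_cycles;
    size_t m_index{0};
};
```

Both call sites would then reduce to a call to the helper, with fetch_next calling move_to_next_archive() first.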
size_t num_rows, | ||
std::vector<facebook::velox::VectorPtr>& column_vectors); | ||
|
||
size_t fetch_next(size_t num_rows, std::vector<ColumnData>& column_vectors); |
I guess this method could be private? It seems nothing else calls it directly.
* The main reason is that here we don't want to allow projection to travel | ||
* inside unstructured objects -- it may be possible to support such a thing | ||
* in the future, but it poses some extra challenges (e.g. deciding what to do | ||
* when projecting repeated elements in a structure). |
Mark it as TODO?
std::vector<int32_t> local_matching_node_list; | ||
auto cur_node_id = tree->get_object_subtree_node_id(); | ||
auto it = column->descriptor_begin(); | ||
while (it != column->descriptor_end()) { | ||
bool matched_any{false}; | ||
auto cur_it = it++; | ||
bool last_token = it == column->descriptor_end(); | ||
auto const& cur_node = tree->get_node(cur_node_id); | ||
for (int32_t child_node_id : cur_node.get_children_ids()) { | ||
auto const& child_node = tree->get_node(child_node_id); | ||
|
||
// Intermediate nodes must be objects | ||
if (false == last_token && child_node.get_type() != NodeType::Object) { | ||
continue; | ||
} | ||
|
||
if (child_node.get_key_name() != cur_it->get_token()) { | ||
continue; | ||
} | ||
|
||
matched_any = true; | ||
if (last_token && | ||
column->matches_type(node_to_literal_type(child_node.get_type()))) { | ||
m_matching_nodes.insert(child_node_id); | ||
local_matching_node_list.push_back(child_node_id); | ||
} else if (false == last_token) { | ||
cur_node_id = child_node_id; | ||
break; | ||
} | ||
} | ||
|
||
if (false == matched_any) { | ||
break; | ||
} | ||
} |
I don't feel very confident reviewing this function.
* Set the schema to filter | ||
* @param schema | ||
*/ | ||
void set_schema(int32_t schema) { |
Maybe rename it to init_schema? It does much more than just set "m_schema".
* @param vector_index | ||
* @param column_vectors | ||
*/ | ||
void get_message( |
How about renaming it to "populate_message"? It's weird for a getter function to return void.
Could we also briefly introduce the term "message" here? From the clp-s perspective it is essentially a piece of data, and we convert each "message" to a "row" of results, right?
m_datestring_readers.clear(); | ||
m_basic_readers.clear(); | ||
|
||
auto matching_nodes_list = m_projection->get_matching_nodes_list(); |
Shall we rename it to "get_matched_nodes_list"? In Cursor, there is an m_matched_schemas.
|
||
auto matching_nodes_list = m_projection->get_matching_nodes_list(); | ||
for (const auto& node_ids : matching_nodes_list) { | ||
if (node_ids.empty()) { |
I am a bit confused here. Are the schemas and the nodes here essentially the same thing? Do they both represent data types? I figure "node" is a term for clp-s internal use only, but here we are talking about columns, which is a database term. Shall we unify them as "schema" to avoid misunderstanding?
|
||
// Try to find a matching column in m_column_map | ||
bool found_reader = false; | ||
for (const auto node_id : node_ids) { |
So it takes the first matched node_id (i.e., the type) as the column, which means the order in which the node_ids are iterated matters. Do we expect the node_ids to be iterated in a specific order? If not, are all the node_ids matched to the same column (i.e., field) equivalent? (E.g., Varchar and ClpString are the same, so you are free to use either and it doesn't matter.) If they are equivalent, why do we maintain a vector of them rather than explicitly defining one clp-s node type for each type of projected field? Then we wouldn't need a vector to hold all the matched ids, and the code would be more concise.
if (!found_reader) { | ||
m_projected_columns.push_back(nullptr); | ||
m_projected_types.push_back(NodeType::Unknown); |
I guess if node_ids.empty() is true, there would be no entries for it in the column_map? Does this check duplicate the one at the beginning of the for loop?
ClpStringColumnReader* clp_reader = | ||
dynamic_cast<ClpStringColumnReader*>(column_reader); | ||
VariableStringColumnReader* var_reader = | ||
dynamic_cast<VariableStringColumnReader*>(column_reader); | ||
DateStringColumnReader* date_reader = | ||
dynamic_cast<DateStringColumnReader*>(column_reader); | ||
if (nullptr != clp_reader && | ||
clp_reader->get_type() == NodeType::ClpString) { | ||
m_clp_string_readers[column_id].push_back(clp_reader); | ||
} else if ( | ||
nullptr != var_reader && | ||
var_reader->get_type() == NodeType::VarString) { | ||
m_var_string_readers[column_id].push_back(var_reader); | ||
} else if (nullptr != date_reader) { | ||
// Datestring readers with a given column ID are guaranteed not to | ||
// repeat | ||
m_datestring_readers.emplace(column_id, date_reader); | ||
} else { | ||
m_basic_readers[column_id].push_back(column_reader); | ||
} |
I saw you write it like this somewhere else, so why not do it here as well:
if (auto* clp_reader = dynamic_cast<ClpStringColumnReader*>(column_reader); | |
clp_reader && clp_reader->get_type() == NodeType::ClpString) { | |
m_clp_string_readers[column_id].push_back(clp_reader); | |
} else if (auto* var_reader = dynamic_cast<VariableStringColumnReader*>(column_reader); | |
var_reader && var_reader->get_type() == NodeType::VarString) { | |
m_var_string_readers[column_id].push_back(var_reader); | |
} else if (auto* date_reader = dynamic_cast<DateStringColumnReader*>(column_reader)) { | |
m_datestring_readers.emplace(column_id, date_reader); | |
} else { | |
m_basic_readers[column_id].push_back(column_reader); | |
} |
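For readers unfamiliar with the suggested form, this is the C++17 "if with initializer" statement combined with dynamic_cast. A self-contained sketch follows; the reader types here are toy stand-ins rather than the clp-s classes, and classify is a hypothetical function.

```cpp
#include <string>

// Toy stand-ins for the column reader hierarchy.
struct BaseReader {
    virtual ~BaseReader() = default;
};
struct ClpStringReader : BaseReader {};
struct DateStringReader : BaseReader {};

// Each cast result is scoped to its own if/else chain, so no pointer
// outlives the branch that uses it.
std::string classify(BaseReader* reader) {
    if (auto* clp_reader = dynamic_cast<ClpStringReader*>(reader);
        nullptr != clp_reader) {
        return "clp_string";
    } else if (auto* date_reader = dynamic_cast<DateStringReader*>(reader);
               nullptr != date_reader) {
        return "date_string";
    }
    return "basic";
}
```

Compared with hoisting all three casts to the top, this form performs a later cast only when the earlier ones fail.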
// Unstructured arrays with the same column id can not appear multiple times | ||
// in one schema in the current implementation. | ||
auto rit = m_extracted_unstructured_arrays.emplace( | ||
column_id, | ||
std::get<std::string>( | ||
m_basic_readers[column_id][0]->extract_value(m_cur_message))); | ||
return rit.first->second; |
I don't understand this; I need to familiarize myself with clp-s more. Is this a caveat? Did we mention it in the RFC (or somewhere more obvious)? People seldom notice feature-related caveats hidden in code comments.
size_t num_rows, | ||
std::vector<facebook::velox::VectorPtr>& column_vectors) { | ||
size_t num_rows_fetched = 0; | ||
while (m_cur_message < m_num_messages) { |
Maybe rename m_cur_message to m_cur_message_id?
for (size_t i = 0; i < column_vectors.size(); ++i) { | ||
switch (m_projected_types[i]) { | ||
case NodeType::Integer: { | ||
auto vector = column_vectors[i]->asFlatVector<int64_t>(); |
Rename vector to int_vector, to align with the other data types below?
|
||
auto array_begin_offset = m_array_offsets[i]; | ||
auto array_end_offset = array_begin_offset; | ||
auto elements = array_vector->elements() |
elements -> array_vector_elements?
|
||
for (auto arrayElement : obj.get_array()) { | ||
// Get each array element as a string | ||
auto elementStringWithQuotes = |
Shall we use snake case or camel case here? (float_vector and bool_vector use snake case, but this one is camel case.) element_string_with_quotes? Let's make it consistent.
case NodeType::NullValue: | ||
case NodeType::Unknown: | ||
column_vectors[i]->setNull(vector_index, true); | ||
break; |
The default handler is missing; it should behave the same as NullValue and Unknown.
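A minimal sketch of the requested shape, using a hypothetical, reduced NodeType enum and a hypothetical should_set_null helper rather than the real Velox vector calls:

```cpp
// Reduced, hypothetical stand-in for the clp-s NodeType enum.
enum class NodeType { Integer, Float, NullValue, Unknown, Object };

// Returns true when the projected value should be written as null.
// Any type without an explicit handler falls through to the same
// branch as NullValue and Unknown.
bool should_set_null(NodeType type) {
    switch (type) {
        case NodeType::Integer:
        case NodeType::Float:
            return false;
        case NodeType::NullValue:
        case NodeType::Unknown:
        default:
            return true;
    }
}
```

Here Object has no explicit case, so it is treated as null instead of leaving the output slot untouched.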
void QueryRunner::get_message( | ||
uint64_t message_index, | ||
uint64_t vector_index, | ||
std::vector<ColumnData>& column_vectors) { | ||
for (size_t i = 0; i < column_vectors.size(); ++i) { | ||
switch (m_projected_types[i]) { | ||
case NodeType::Integer: { | ||
auto& vector = std::get<std::vector<int64_t>>(column_vectors[i]); | ||
vector[vector_index] = std::get<int64_t>( | ||
m_projected_columns[i]->extract_value(message_index)); | ||
break; | ||
} | ||
case NodeType::Float: { | ||
auto& vector = std::get<std::vector<double>>(column_vectors[i]); | ||
vector[vector_index] = std::get<double>( | ||
m_projected_columns[i]->extract_value(message_index)); | ||
break; | ||
} | ||
case NodeType::Boolean: { | ||
auto& vector = std::get<std::vector<bool>>(column_vectors[i]); | ||
vector[vector_index] = | ||
std::get<uint8_t>( | ||
m_projected_columns[i]->extract_value(message_index)) != 0; | ||
break; | ||
} | ||
case NodeType::ClpString: | ||
case NodeType::VarString: | ||
case NodeType::UnstructuredArray: { | ||
auto& vector = std::get<std::vector<std::string>>(column_vectors[i]); | ||
vector[vector_index] = std::get<std::string>( | ||
m_projected_columns[i]->extract_value(message_index)); | ||
break; | ||
} | ||
default: | ||
break; | ||
} | ||
} | ||
} |
As mentioned before, this feels like major code duplication. If we want to handle both std::vector<facebook::velox::VectorPtr> and std::vector<ColumnData>, can we extract an interface that wraps VectorPtr and ColumnData? Then we would have only one get_message and one set of switch handlers.
It would also reduce the duplication in the other place.
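The interface idea might be sketched as follows. Everything here is hypothetical: Value, ColumnSink, Int64VectorSink, and write_row are illustrative names, and a Velox-backed sink would be a second implementation of the same interface, omitted here because it needs the Velox headers.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <variant>
#include <vector>

// Hypothetical value type: what a column reader extracts for one message.
using Value = std::variant<int64_t, double, std::string>;

// Hypothetical interface hiding the destination container.
class ColumnSink {
public:
    virtual ~ColumnSink() = default;
    virtual void set(size_t row, Value const& value) = 0;
    virtual void set_null(size_t row) = 0;
};

// Destination backed by plain std::vector storage.
class Int64VectorSink : public ColumnSink {
public:
    explicit Int64VectorSink(size_t num_rows) : m_values(num_rows) {}

    void set(size_t row, Value const& value) override {
        m_values.at(row) = std::get<int64_t>(value);
    }
    void set_null(size_t row) override { m_values.at(row) = std::nullopt; }

    std::vector<std::optional<int64_t>> const& values() const {
        return m_values;
    }

private:
    std::vector<std::optional<int64_t>> m_values;
};

// One copy of the per-row logic, independent of the destination type.
void write_row(
        size_t row,
        std::vector<std::optional<Value>> const& extracted,
        std::vector<ColumnSink*> const& sinks) {
    for (size_t i = 0; i < sinks.size(); ++i) {
        if (extracted[i].has_value()) {
            sinks[i]->set(row, *extracted[i]);
        } else {
            sinks[i]->set_null(row);
        }
    }
}
```

The trade-off is a virtual call per cell; whether that cost is acceptable on this path would need measuring.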
} | ||
|
||
bool QueryRunner::evaluate(Expression* expr, int32_t schema) { | ||
if (m_expression_value == EvaluatedValue::True) { |
EvaluatedValue::True == m_expression_value
ret = false; | ||
} | ||
|
||
do { |
Why do-while? I seldom see people use this. Is the expr nullable here?
// must be an AND-expr because AST would have been simplified | ||
// to eliminate nested OR |
This feels like a major functionality gap. As before, we should mention it somewhere more obvious rather than hiding it in comments.
bool QueryRunner::evaluate_filter(FilterExpr* expr, int32_t schema) { | ||
auto* column = expr->get_column().get(); | ||
int32_t column_id = column->get_column_id(); | ||
auto literal = expr->get_operand(); | ||
Query* q = nullptr; | ||
std::unordered_set<int64_t>* matching_vars = nullptr; | ||
switch (column->get_literal_type()) { | ||
case LiteralType::IntegerT: | ||
return evaluate_int_filter(expr->get_operation(), column_id, literal); | ||
case LiteralType::FloatT: | ||
return evaluate_float_filter(expr->get_operation(), column_id, literal); | ||
case LiteralType::ClpStringT: | ||
q = m_expr_clp_query[expr]; | ||
return evaluate_clp_string_filter( | ||
expr->get_operation(), q, m_clp_string_readers[column_id]); | ||
case LiteralType::VarStringT: | ||
matching_vars = m_expr_var_match_map.at(expr); | ||
return evaluate_var_string_filter( | ||
expr->get_operation(), | ||
m_var_string_readers[column_id], | ||
matching_vars); | ||
case LiteralType::BooleanT: | ||
return evaluate_bool_filter(expr->get_operation(), column_id, literal); | ||
case LiteralType::ArrayT: | ||
return evaluate_array_filter( | ||
expr->get_operation(), | ||
column->get_unresolved_tokens(), | ||
get_cached_decompressed_unstructured_array(column_id), | ||
literal); | ||
case LiteralType::EpochDateT: | ||
return evaluate_epoch_date_filter( | ||
expr->get_operation(), m_datestring_readers[column_id], literal); | ||
// case LiteralType::NullT: | ||
// null checks are always turned into existence operators -- | ||
// no need to evaluate here | ||
default: | ||
return false; | ||
} | ||
} |
Similarly, there is a lot of duplicated code in implementing this filter. Can we switch(expr->get_operation()) here and, for each type of operation, have an evaluate_core function that is overloaded for each data type (i.e., int, float, etc.)? Then we have only one switch, and the data-type-specific logic is handled by function overloading. We could then get rid of these evaluate_X_filter and evaluate_X_filter_core functions and significantly reduce the lines of code.
Never mind, I found that the rest was already open sourced. Yeah, maybe we can move these reused parts to clp_src to reduce the coupling with Velox and the duplication.
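For reference, the "one switch on the operation, types handled generically" idea from the previous comment could be sketched like this. FilterOperation, Literal, evaluate_core, and evaluate are all hypothetical here; the real clp-s filters also deal with dictionaries, wildcards, and column readers, which a plain comparison does not cover.

```cpp
#include <cstdint>
#include <string>
#include <type_traits>
#include <variant>

// Hypothetical operation enum for the sketch.
enum class FilterOperation { EQ, NEQ, LT, GT, LTE, GTE };

// Hypothetical literal type covering a few comparable data types.
using Literal = std::variant<int64_t, double, std::string>;

// The only switch on the operation; works for any comparable T.
template <typename T>
bool evaluate_core(FilterOperation op, T const& value, T const& operand) {
    switch (op) {
        case FilterOperation::EQ: return value == operand;
        case FilterOperation::NEQ: return value != operand;
        case FilterOperation::LT: return value < operand;
        case FilterOperation::GT: return value > operand;
        case FilterOperation::LTE: return value <= operand;
        case FilterOperation::GTE: return value >= operand;
    }
    return false;
}

// Dispatch on the runtime type; mismatched types never match.
bool evaluate(FilterOperation op, Literal const& value, Literal const& operand) {
    return std::visit(
            [&](auto const& typed_value) {
                using T = std::decay_t<decltype(typed_value)>;
                auto const* typed_operand = std::get_if<T>(&operand);
                return nullptr != typed_operand
                       && evaluate_core(op, typed_value, *typed_operand);
            },
            value);
}
```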
m_maybe_string = operand->as_var_string(m_array_search_string, op) || | ||
operand->as_clp_string(m_array_search_string, op); | ||
|
||
return evaluate_wildcard_array_filter(array, op, operand); |
Is there a possibility of a stack overflow when using recursion?
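If deeply nested input is a realistic concern, one common mitigation is an explicit depth limit. The sketch below is generic and not taken from the PR: Node, cMaxDepth, visit, and make_chain are hypothetical, and the limit of 64 is an arbitrary illustrative value.

```cpp
#include <cstddef>
#include <vector>

// Toy nested structure standing in for a nested array.
struct Node {
    std::vector<Node> children;
};

constexpr size_t cMaxDepth = 64;  // hypothetical limit

// Returns false if nesting exceeds cMaxDepth instead of recursing further,
// so recursion depth is bounded regardless of the input.
bool visit(Node const& node, size_t depth = 0) {
    if (depth > cMaxDepth) {
        return false;
    }
    for (auto const& child : node.children) {
        if (false == visit(child, depth + 1)) {
            return false;
        }
    }
    return true;
}

// Builds a single chain of nested nodes with the given depth.
Node make_chain(size_t depth) {
    Node root;
    Node* current = &root;
    for (size_t i = 0; i < depth; ++i) {
        current->children.emplace_back();
        current = &current->children.back();
    }
    return root;
}
```

A caller can then turn a false return into an error rather than risking unbounded recursion.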
Actionable comments posted: 3
🧹 Nitpick comments (5)
velox/connectors/clp/ClpConnectorSplit.h (1)
17-19: Consider including more contextual details in toString().
While it currently returns the schema and table, including archivePath_ as part of the formatted string may help debugging when multiple archives are in use.
-return fmt::format("CLP: {}.{}", schemaName_, tableName_);
+return fmt::format("CLP: {}.{} (archive path: {})",
+                   schemaName_,
+                   tableName_,
+                   archivePath_);
velox/connectors/clp/ClpDataSource.cpp (4)
28-28: Adjust comparison to follow coding guidelines.
According to our coding guidelines, we should prefer false == <expression> rather than !<expression>. The code should be updated to match this pattern.
-  if ("local" != inputSource_ && "s3" != inputSource_) {
+  if (false == (inputSource_ == "local" || inputSource_ == "s3")) {
97-109: Use enum for input source types.
String comparisons for input source types are error-prone and less type-safe. Consider creating an enum class for these values to make the code more maintainable and robust.
+enum class InputSourceType {
+  Local,
+  S3
+};
+
+InputSourceType parseInputSourceType(const std::string& source) {
+  std::string lowerSource = source;
+  boost::algorithm::to_lower(lowerSource);
+  if (lowerSource == "local") {
+    return InputSourceType::Local;
+  } else if (lowerSource == "s3") {
+    return InputSourceType::S3;
+  }
+  VELOX_USER_FAIL("Illegal input source: {}", source);
+}

 // In constructor:
-  inputSource_ = clpConfig->inputSource();
-  boost::algorithm::to_lower(inputSource_);
+  inputSourceType_ = parseInputSourceType(clpConfig->inputSource());

 // Then update the addSplit method:
-  if (inputSource_ == "local") {
+  if (inputSourceType_ == InputSourceType::Local) {
     cursor_ = std::make_unique<search_lib::Cursor>(
         archiveDir_,
         clp_s::InputSource::Filesystem,
         std::vector<std::string>{archiveId},
         false);
-  } else if (inputSource_ == "s3") {
+  } else if (inputSourceType_ == InputSourceType::S3) {
     cursor_ = std::make_unique<search_lib::Cursor>(
         archiveDir_ + archiveId,
         clp_s::InputSource::Network,
         std::vector<std::string>{},
         false);
   }
1-4: Optimize includes.
The <iostream> include might not be needed if the std::cout line is removed or replaced with a logger. Similarly, check if <optional> is required after implementing the suggested changes.
43-46: Error message clarity.
Consider using a more specific error message that explains why the column handle must be present.
-  "ColumnHandle not found for output name: {}",
+  "ColumnHandle must not be empty for output name: {}",
📜 Review details
Configuration used: CodeRabbit UI
Review profile: CHILL
Plan: Pro
📒 Files selected for processing (6)
CMake/ExternalAntlr4Cpp.cmake (1 hunks)
CMake/FindANTLR.cmake (1 hunks)
CMakeLists.txt (5 hunks)
velox/connectors/clp/CMakeLists.txt (1 hunks)
velox/connectors/clp/ClpConnectorSplit.h (1 hunks)
velox/connectors/clp/ClpDataSource.cpp (1 hunks)
🚧 Files skipped from review as they are similar to previous changes (2)
- CMake/ExternalAntlr4Cpp.cmake
- velox/connectors/clp/CMakeLists.txt
🧰 Additional context used
📓 Path-based instructions (1)
**/*.{cpp,hpp,java,js,jsx,tpp,ts,tsx}: Prefer false == <expression> rather than !<expression>.
velox/connectors/clp/ClpDataSource.cpp
🧠 Learnings (1)
CMakeLists.txt (1)
Learnt from: anlowee
PR: y-scope/velox#1
File: CMakeLists.txt:458-483
Timestamp: 2025-03-17T15:01:02.465Z
Learning: When contributing to the Velox project, prefer adding new code on top of existing scripts rather than restructuring the original code organization, especially for dependency management.
🔇 Additional comments (13)
velox/connectors/clp/ClpConnectorSplit.h (2)
6-6: Consistent naming convention for new Split struct.
Declaring the connector split as a struct is consistent with the usage found in other Velox connectors (e.g., Hive, Tpch). No concerns here.
7-15: Constructor parameters look appropriate.
All required fields are captured, and the base class constructor is invoked correctly. The usage of const std::string& parameters prevents unnecessary copies.
CMake/FindANTLR.cmake (2)
25-26: Static ANTLR4 usage.
Using -DANTLR4CPP_STATIC implies static linking of the ANTLR4 runtime. If the project needs to switch to shared linking later, adapt this definition accordingly.
Also applies to: 28-29
55-161: Macro ANTLR_TARGET implementation.
This macro is largely borrowed from standard ANTLR conventions, which is acceptable. The configuration is clear and properly handles multiple outputs, listeners, and visitors. No immediate concerns.
CMakeLists.txt (5)
99-99: CLP connector build option default.
Enabling the CLP connector by default is fine if you expect broad usage. If it is more niche, consider defaulting it to OFF to reduce build overhead for most users.
141-141: Correct disabling of CLP in minimal builds.
Turning off the CLP connector when building minimal is consistent with the approach for other optional features.
158-158: Enable CLP while testing.
For thorough test coverage, this is legitimate. Confirm that tests exist for the new CLP functionality if they are not already added.
451-470: Conditional resolution of CLP connector dependencies.
Bundling spdlog and resolving msgpack-cxx, CURL, and ANTLR upon VELOX_ENABLE_CLP_CONNECTOR is a clean approach. The checks and error messages are helpful.
472-475: Loading absl for CLP connector or testing.
This aligns with the user preference of appending new code rather than restructuring existing scripts. Good usage of resolve_dependency(absl).
velox/connectors/clp/ClpDataSource.cpp (4)
27-27: Need null check for clpTableHandle.
The clpTableHandle is cast from tableHandle but there's no null check before using it, which could cause a null pointer dereference in line 32. Consider adding a VELOX_CHECK_NOT_NULL(clpTableHandle) after the cast.
36-37: Add logging for the wildcard query fallback.
When the query is empty or not provided, the code silently defaults to a wildcard query "*". This might not be what users expect. Consider adding some log information about why the wildcard is being used.
   } else {
+    LOG(INFO) << "No specific query provided or query is empty. Using wildcard query '*' instead.";
     kqlQuery_ = "*";
   }
120-125: Consider simpler vector initialization.
The initialization of vectors with NULL values feels unnecessarily complex. This approach creates vectors and immediately marks all values as NULL, which might be inefficient and confusing.
Consider if there's a more direct way to create vectors with NULL values, or if this pattern is really necessary. If the current approach is required, add a comment explaining why.
128-128: Replace std::cout with Velox logger.
Using std::cout for debugging is not appropriate for production code. Consider using Velox's logging framework instead.
-  std::cout << "rowsFetched" << rowsFetched << std::endl;
+  VLOG(1) << "rowsFetched: " << rowsFetched;
Alternatively, if this is just debugging code, it should be removed completely.
if (APPLE) | ||
set(ENV{JAVA_HOME} "/usr/local/opt/openjdk@11/") | ||
endif () |
💡 Verification agent
🧩 Analysis chain
Check for openjdk@11 path existence on macOS.
Hardcoding the path to /usr/local/opt/openjdk@11/ might break on systems where openjdk@11 is installed elsewhere or not installed in that manner. Consider verifying existence or offering a user-overridable variable.
🏁 Script executed:
#!/bin/bash
# Verify that the directory /usr/local/opt/openjdk@11 actually exists before setting JAVA_HOME.
if [ ! -d "/usr/local/opt/openjdk@11" ]; then
echo "Directory /usr/local/opt/openjdk@11 not found. Please install openjdk@11 or set JAVA_HOME accordingly."
exit 1
fi
Length of output: 266
Action Required: Avoid Hardcoding JAVA_HOME on macOS
The verification script confirmed that /usr/local/opt/openjdk@11 does not exist on the tested system, which supports the original concern. Hardcoding this path could lead to build failures on systems where OpenJDK 11 is installed in a different location or not installed via Homebrew.
- In CMake/FindANTLR.cmake (lines 21–23), please either:
  - Check for the existence of the directory before setting JAVA_HOME, or
  - Provide a user-overridable variable that allows specifying an alternate OpenJDK path.
@@ -393,6 +396,7 @@ set(BOOST_INCLUDE_LIBRARIES | |||
program_options | |||
regex | |||
system | |||
url |
💡 Verification agent
🧩 Analysis chain
Added Boost::url to included libraries.
Check that your Boost version (≥1.77.0) properly supports boost::url. Older releases may not ship a stable url component.
🌐 Web query:
Is boost::url officially stable in Boost 1.77.0?
💡 Result:
Boost.URL is not officially stable in Boost 1.77.0 based on the provided documentation and release notes. Here's the key evidence:
- Boost 1.77.0 Release Notes explicitly list updated libraries including Filesystem v4, but make no mention of URL library additions or stabilization[4][13].
- First Official Appearance of Boost.URL in release notes appears in Boost 1.87.0 (Dec 2024), where it's listed as an updated library[3][10].
- Library Documentation for Boost 1.77.0 shows no URL-related components in its comprehensive list of libraries[13], while later releases like 1.87.0 specifically mention URL library updates[10].
For developers needing URL handling in Boost 1.77.0, alternative approaches would be required, as the dedicated URL library wasn't yet available in this release. The URL component appears to have been stabilized in later Boost versions, with 1.87.0 showing significant development activity around its Unordered containers and URL-related features[10].
Citations:
- 1: https://boostorg.github.io/outcome/changelog.html
- 2: https://discourse.cmake.org/t/using-boost-with-cmake/6299
- 3: https://www.boost.org/users/download/
- 4: https://www.boost.org/users/history/version_1_77_0.html
- 5: https://github.com/boostorg/boost
- 6: https://www.baeldung.com/linux/boost-install-on-ubuntu
- 7: https://www.linuxfromscratch.org/blfs/view/11.0/general/boost.html
- 8: https://cppget.org/libboost-logic/1.77.0+1
- 9: https://www.badprog.com/c-boost-setting-up-on-windows-10
- 10: https://www.boost.io/releases/
- 11: https://cppget.org/libboost-static-string/1.77.0+1
- 12: https://nextcloud.com/changelog/
- 13: https://www.boost.org/doc/libs/1_77_0/
- 14: https://www.cppget.org/libboost-tokenizer?f=full&q=boost
- 15: https://cppget.org/libboost-format
- 16: Boost.Test 1.65.1, 1.66.0 link problem (Windows) bincrafters/community#94
- 17: https://github.com/boostorg/website/blob/master/feed/history/boost_1_77_0.qbk
Attention: Update Boost Version Requirement for Boost::url
The current addition of Boost::url in CMakeLists.txt (line 399) assumes support in Boost versions ≥1.77.0. However, Boost documentation and release notes confirm that the URL component is not officially stable in Boost 1.77.0; it only appears as a stable feature from Boost 1.87.0 onward. Please update the minimum Boost version requirement or consider an alternative solution if you need to support earlier versions.
- File: CMakeLists.txt, Line 399.
- Action: Either update the minimum required Boost version to ≥1.87.0 or remove/revise the dependency on Boost::url if maintaining Boost 1.77.0 compatibility.
false); | ||
} | ||
|
||
cursor_->execute_query(kqlQuery_, fields_); |
🛠️ Refactor suggestion
Add error handling for execute_query.
The execute_query method might fail, but there's no error handling for this case. Consider adding error handling to provide better diagnostics.
- cursor_->execute_query(kqlQuery_, fields_);
+ try {
+ cursor_->execute_query(kqlQuery_, fields_);
+ } catch (const std::exception& e) {
+ VELOX_USER_FAIL("Failed to execute query '{}': {}", kqlQuery_, e.what());
+ }
Summary by CodeRabbit
New Features
Build & Dependency Enhancements
Refinements