-
Notifications
You must be signed in to change notification settings - Fork 85
from_json: null only schema-mismatched rows #4728
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,100 @@ | ||
| /* | ||
| * Copyright (c) 2026, NVIDIA CORPORATION. | ||
| * | ||
| * Licensed under the Apache License, Version 2.0 (the "License"); | ||
| * you may not use this file except in compliance with the License. | ||
| * You may obtain a copy of the License at | ||
| * | ||
| * http://www.apache.org/licenses/LICENSE-2.0 | ||
| * | ||
| * Unless required by applicable law or agreed to in writing, software | ||
| * distributed under the License is distributed on an "AS IS" BASIS, | ||
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. | ||
| * See the License for the specific language governing permissions and | ||
| * limitations under the License. | ||
| */ | ||
|
|
||
| #include <cudf_test/column_wrapper.hpp> | ||
|
|
||
| #include <cudf/strings/strings_column_view.hpp> | ||
| #include <cudf/types.hpp> | ||
|
|
||
| #include <json_utils.hpp> | ||
| #include <nvbench/nvbench.cuh> | ||
|
|
||
| #include <memory> | ||
| #include <string> | ||
| #include <vector> | ||
|
|
||
| namespace { | ||
|
|
||
| std::unique_ptr<cudf::column> make_input(cudf::size_type num_rows, cudf::size_type mismatch_percent) | ||
| { | ||
| std::string const valid = R"({"data":{"c2":[{"c3":19,"c4":"x"}],"c1":1},"id":10})"; | ||
| std::string const mismatched = R"({"data":{"c2":[19],"c1":2},"id":20})"; | ||
|
|
||
| std::vector<std::string> rows; | ||
| rows.reserve(num_rows); | ||
| for (cudf::size_type row = 0; row < num_rows; ++row) { | ||
| rows.push_back(mismatch_percent > 0 && row % 100 < mismatch_percent ? mismatched : valid); | ||
| } | ||
| return cudf::test::strings_column_wrapper(rows.begin(), rows.end()).release(); | ||
| } | ||
|
|
||
| std::vector<std::string> nested_schema_names() | ||
| { | ||
| return {"data", "c1", "c2", "element", "c3", "c4", "id"}; | ||
| } | ||
|
|
||
| std::vector<int> nested_schema_num_children() { return {2, 0, 1, 2, 0, 0, 0}; } | ||
|
|
||
| std::vector<int> nested_schema_types() | ||
| { | ||
| return {static_cast<int>(cudf::type_id::STRUCT), | ||
| static_cast<int>(cudf::type_id::INT32), | ||
| static_cast<int>(cudf::type_id::LIST), | ||
| static_cast<int>(cudf::type_id::STRUCT), | ||
| static_cast<int>(cudf::type_id::INT32), | ||
| static_cast<int>(cudf::type_id::STRING), | ||
| static_cast<int>(cudf::type_id::INT32)}; | ||
| } | ||
|
|
||
| std::vector<int> nested_schema_scales() { return {0, 0, 0, 0, 0, 0, 0}; } | ||
|
|
||
| std::vector<int> nested_schema_precisions() { return {-1, -1, -1, -1, -1, -1, -1}; } | ||
|
|
||
| } // namespace | ||
|
|
||
| void BM_from_json_to_structs(nvbench::state& state) | ||
| { | ||
| auto const num_rows = static_cast<cudf::size_type>(state.get_int64("num_rows")); | ||
| auto const mismatch_percent = static_cast<cudf::size_type>(state.get_int64("mismatch_percent")); | ||
| auto const input = make_input(num_rows, mismatch_percent); | ||
|
|
||
| auto const col_names = nested_schema_names(); | ||
| auto const num_children = nested_schema_num_children(); | ||
| auto const types = nested_schema_types(); | ||
| auto const scales = nested_schema_scales(); | ||
| auto const precisions = nested_schema_precisions(); | ||
|
|
||
| state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) { | ||
| auto output = spark_rapids_jni::from_json_to_structs(cudf::strings_column_view{input->view()}, | ||
| col_names, | ||
| num_children, | ||
| types, | ||
| scales, | ||
| precisions, | ||
| true, | ||
| true, | ||
| true, | ||
| true, | ||
| true); | ||
| }); | ||
|
|
||
| state.add_buffer_size(num_rows, "rows", "Rows"); | ||
| } | ||
|
|
||
| NVBENCH_BENCH(BM_from_json_to_structs) | ||
| .set_name("from_json_to_structs") | ||
| .add_int64_axis("num_rows", {10000, 100000}) | ||
| .add_int64_axis("mismatch_percent", {0, 1}); |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -21,14 +21,17 @@ | |
|
|
||
| #include <cudf/column/column_device_view.cuh> | ||
| #include <cudf/column/column_factories.hpp> | ||
| #include <cudf/copying.hpp> | ||
| #include <cudf/detail/iterator.cuh> | ||
| #include <cudf/detail/utilities/cuda.cuh> | ||
| #include <cudf/detail/utilities/vector_factories.hpp> | ||
| #include <cudf/detail/valid_if.cuh> | ||
| #include <cudf/io/json.hpp> | ||
| #include <cudf/lists/lists_column_view.hpp> | ||
| #include <cudf/null_mask.hpp> | ||
| #include <cudf/strings/detail/strings_children.cuh> | ||
| #include <cudf/strings/strings_column_view.hpp> | ||
| #include <cudf/utilities/bit.hpp> | ||
| #include <cudf/utilities/traits.hpp> | ||
|
|
||
| #include <rmm/cuda_stream_view.hpp> | ||
|
|
@@ -48,6 +51,9 @@ | |
| #include <thrust/transform.h> | ||
| #include <thrust/uninitialized_fill.h> | ||
|
|
||
| #include <algorithm> | ||
| #include <map> | ||
|
|
||
| namespace spark_rapids_jni { | ||
|
|
||
| namespace detail { | ||
|
|
@@ -142,6 +148,80 @@ std::pair<cudf::io::schema_element, schema_element_with_precision> generate_stru | |
| cudf::data_type{cudf::type_id::STRUCT}, -1, std::move(schema_cols_with_precisions)}}; | ||
| } | ||
|
|
||
| void nullify_rows(cudf::column& input, | ||
| std::vector<cudf::size_type> const& row_indices, | ||
| rmm::cuda_stream_view stream, | ||
| rmm::device_async_resource_ref mr) | ||
| { | ||
| if (row_indices.empty()) { return; } | ||
|
|
||
| auto const input_view = input.view(); | ||
| for (auto const row : row_indices) { | ||
| CUDF_EXPECTS(row >= 0 && row < input_view.size(), "Schema mismatch row index out of bounds."); | ||
| } | ||
|
|
||
| auto null_mask = | ||
| input_view.nullable() | ||
| ? cudf::copy_bitmask(input_view, stream, mr) | ||
| : cudf::create_null_mask(input_view.size(), cudf::mask_state::ALL_VALID, stream, mr); | ||
| auto d_row_indices = cudf::detail::make_device_uvector_async(row_indices, stream, mr); | ||
|
|
||
| auto mask_ptr = static_cast<cudf::bitmask_type*>(null_mask.data()); | ||
| thrust::for_each(rmm::exec_policy_nosync(stream), | ||
| d_row_indices.begin(), | ||
| d_row_indices.end(), | ||
| [mask_ptr] __device__(auto const row) { cudf::clear_bit(mask_ptr, row); }); | ||
|
|
||
| auto const null_count = cudf::null_count( | ||
| static_cast<cudf::bitmask_type const*>(null_mask.data()), 0, input_view.size(), stream); | ||
| input.set_null_mask(std::move(null_mask), null_count); | ||
| } | ||
|
|
||
| std::unique_ptr<cudf::column> make_lists_column_with_null_sanitization( | ||
| cudf::size_type num_rows, | ||
| std::unique_ptr<cudf::column> offsets_column, | ||
| std::unique_ptr<cudf::column> child_column, | ||
| cudf::size_type null_count, | ||
| rmm::device_buffer&& null_mask, | ||
| rmm::cuda_stream_view stream, | ||
| rmm::device_async_resource_ref mr) | ||
| { | ||
| std::vector<std::unique_ptr<cudf::column>> children; | ||
| children.emplace_back(std::move(offsets_column)); | ||
| children.emplace_back(std::move(child_column)); | ||
| auto output = std::make_unique<cudf::column>(cudf::data_type{cudf::type_id::LIST}, | ||
| num_rows, | ||
| rmm::device_buffer{}, | ||
| std::move(null_mask), | ||
| null_count, | ||
| std::move(children)); | ||
| // Row-level schema mismatch nulls can leave child data under null parents; sanitize it here. | ||
| if (null_count > 0) { output = cudf::purge_nonempty_nulls(output->view(), stream, mr); } | ||
|
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Use |
||
| return output; | ||
|
Comment on lines
+193
to
+200
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
If The existing test only checks null-mask equality at the top level — it does not verify that |
||
| } | ||
|
|
||
| std::unique_ptr<cudf::column> make_structs_column_with_null_consistency( | ||
| cudf::size_type num_rows, | ||
| std::vector<std::unique_ptr<cudf::column>>&& children, | ||
| cudf::size_type null_count, | ||
| rmm::device_buffer&& null_mask, | ||
| rmm::cuda_stream_view stream, | ||
| rmm::device_async_resource_ref mr) | ||
| { | ||
| if (null_count > 0) { | ||
| // make_structs_column superimposes parent nulls onto children for a consistent nested column. | ||
| return cudf::make_structs_column( | ||
| num_rows, std::move(children), null_count, std::move(null_mask), stream, mr); | ||
| } | ||
|
|
||
| return std::make_unique<cudf::column>(cudf::data_type{cudf::type_id::STRUCT}, | ||
| num_rows, | ||
| rmm::device_buffer{}, | ||
| std::move(null_mask), | ||
| null_count, | ||
| std::move(children)); | ||
| } | ||
|
|
||
| using string_index_pair = cuda::std::pair<char const*, cudf::size_type>; | ||
|
|
||
| std::unique_ptr<cudf::column> cast_strings_to_booleans(cudf::column_view const& input, | ||
|
|
@@ -711,14 +791,14 @@ std::unique_ptr<cudf::column> convert_data_type(InputType&& input, | |
| new_children.emplace_back(convert_data_type( | ||
| std::move(child), child_schema, allow_nonnumeric_numbers, is_us_locale, stream, mr)); | ||
|
|
||
| // Do not use `cudf::make_lists_column` since we do not need to call `purge_nonempty_nulls` | ||
| // on the child column as it does not have non-empty nulls. | ||
| return std::make_unique<cudf::column>(cudf::data_type{cudf::type_id::LIST}, | ||
| num_rows, | ||
| rmm::device_buffer{}, | ||
| std::move(*input_content.null_mask), | ||
| null_count, | ||
| std::move(new_children)); | ||
| return make_lists_column_with_null_sanitization( | ||
| num_rows, | ||
| std::move(new_children[cudf::lists_column_view::offsets_column_index]), | ||
| std::move(new_children[cudf::lists_column_view::child_column_index]), | ||
| null_count, | ||
| std::move(*input_content.null_mask), | ||
| stream, | ||
| mr); | ||
| } | ||
|
|
||
| if (schema.type.id() == cudf::type_id::STRUCT) { | ||
|
|
@@ -733,14 +813,12 @@ std::unique_ptr<cudf::column> convert_data_type(InputType&& input, | |
| mr)); | ||
| } | ||
|
|
||
| // Do not use `cudf::make_structs_column` since we do not need to call `superimpose_nulls` | ||
| // on the children columns. | ||
| return std::make_unique<cudf::column>(cudf::data_type{cudf::type_id::STRUCT}, | ||
| num_rows, | ||
| rmm::device_buffer{}, | ||
| std::move(*input_content.null_mask), | ||
| null_count, | ||
| std::move(new_children)); | ||
| return make_structs_column_with_null_consistency(num_rows, | ||
| std::move(new_children), | ||
| null_count, | ||
| std::move(*input_content.null_mask), | ||
| stream, | ||
| mr); | ||
| } | ||
| } else { // input_is_const_cv | ||
| auto const null_count = input.null_count(); | ||
|
|
@@ -761,14 +839,14 @@ std::unique_ptr<cudf::column> convert_data_type(InputType&& input, | |
| new_children.emplace_back( | ||
| convert_data_type(child, child_schema, allow_nonnumeric_numbers, is_us_locale, stream, mr)); | ||
|
|
||
| // Do not use `cudf::make_lists_column` since we do not need to call `purge_nonempty_nulls` | ||
| // on the child column as it does not have non-empty nulls. | ||
| return std::make_unique<cudf::column>(cudf::data_type{cudf::type_id::LIST}, | ||
| num_rows, | ||
| rmm::device_buffer{}, | ||
| cudf::copy_bitmask(input, stream, mr), | ||
| null_count, | ||
| std::move(new_children)); | ||
| return make_lists_column_with_null_sanitization( | ||
| num_rows, | ||
| std::move(new_children[cudf::lists_column_view::offsets_column_index]), | ||
| std::move(new_children[cudf::lists_column_view::child_column_index]), | ||
| null_count, | ||
| cudf::copy_bitmask(input, stream, mr), | ||
| stream, | ||
| mr); | ||
| } | ||
|
|
||
| if (schema.type.id() == cudf::type_id::STRUCT) { | ||
|
|
@@ -783,14 +861,12 @@ std::unique_ptr<cudf::column> convert_data_type(InputType&& input, | |
| mr)); | ||
| } | ||
|
|
||
| // Do not use `cudf::make_structs_column` since we do not need to call `superimpose_nulls` | ||
| // on the children columns. | ||
| return std::make_unique<cudf::column>(cudf::data_type{cudf::type_id::STRUCT}, | ||
| num_rows, | ||
| rmm::device_buffer{}, | ||
| cudf::copy_bitmask(input, stream, mr), | ||
| null_count, | ||
| std::move(new_children)); | ||
| return make_structs_column_with_null_consistency(num_rows, | ||
| std::move(new_children), | ||
| null_count, | ||
| cudf::copy_bitmask(input, stream, mr), | ||
| stream, | ||
| mr); | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -838,9 +914,9 @@ std::unique_ptr<cudf::column> from_json_to_structs(cudf::strings_column_view con | |
| .dtypes(schema) | ||
| .prune_columns(schema.child_types.size() != 0); | ||
|
|
||
| auto const parsed_table_with_meta = cudf::io::read_json(opts_builder.build()); | ||
| auto const& parsed_meta = parsed_table_with_meta.metadata; | ||
| auto parsed_columns = parsed_table_with_meta.tbl->release(); | ||
| auto parsed_result = cudf::io::read_json_with_row_diagnostics(opts_builder.build(), stream, mr); | ||
| auto const& parsed_meta = parsed_result.data.metadata; | ||
| auto parsed_columns = parsed_result.data.tbl->release(); | ||
|
|
||
| CUDF_EXPECTS(parsed_columns.size() == schema.child_types.size(), | ||
| "Numbers of output columns is different from schema size."); | ||
|
|
@@ -855,6 +931,14 @@ std::unique_ptr<cudf::column> from_json_to_structs(cudf::strings_column_view con | |
|
|
||
| auto const& [col_name, col_schema] = schema_with_precision.child_types[i]; | ||
| CUDF_EXPECTS(parsed_meta.schema_info[i].name == col_name, "Mismatched column name."); | ||
| auto const mismatch_rows = | ||
| std::find_if(parsed_result.diagnostics.top_level_columns_with_schema_mismatch_rows.begin(), | ||
| parsed_result.diagnostics.top_level_columns_with_schema_mismatch_rows.end(), | ||
| [&col_name](auto const& row_info) { return row_info.column_name == col_name; }); | ||
| if (mismatch_rows != | ||
| parsed_result.diagnostics.top_level_columns_with_schema_mismatch_rows.end()) { | ||
| nullify_rows(*parsed_columns[i], mismatch_rows->row_indices, stream, mr); | ||
|
Comment on lines
+934
to
+940
Collaborator
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Why is this executed on host? |
||
| } | ||
|
Comment on lines
+934
to
+941
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time! |
||
| converted_cols.emplace_back(convert_data_type(std::move(parsed_columns[i]), | ||
| col_schema, | ||
| allow_nonnumeric_numbers, | ||
|
|
@@ -867,15 +951,13 @@ std::unique_ptr<cudf::column> from_json_to_structs(cudf::strings_column_view con | |
| auto [null_mask, null_count] = cudf::detail::valid_if( | ||
| valid_it, valid_it + should_be_nullified->size(), thrust::logical_not<bool>{}, stream, mr); | ||
|
|
||
| // Do not use `cudf::make_structs_column` since we do not need to call `superimpose_nulls` | ||
| // on the children columns. | ||
| return std::make_unique<cudf::column>( | ||
| cudf::data_type{cudf::type_id::STRUCT}, | ||
| return make_structs_column_with_null_consistency( | ||
| input.size(), | ||
| rmm::device_buffer{}, | ||
| null_count > 0 ? std::move(null_mask) : rmm::device_buffer{0, stream, mr}, | ||
| std::move(converted_cols), | ||
| null_count, | ||
| std::move(converted_cols)); | ||
| null_count > 0 ? std::move(null_mask) : rmm::device_buffer{0, stream, mr}, | ||
| stream, | ||
| mr); | ||
| } | ||
|
|
||
| } // namespace | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This loop iterates every mismatch row index on the CPU before any GPU work starts. For pathological inputs (e.g., a table where 50% of rows mismatch), this serializes a potentially large host-side validation pass. The GPU kernel below (
thrust::for_each) is inherently bounded by the device memory it touches, so out-of-range indices would produce undefined GPU behavior regardless — this check should at minimum only run in debug builds, or be replaced with an assertion on the max element rather than a per-element loop.Note: If this suggestion doesn't match your team's coding style, reply to this and let me know. I'll remember it for next time!