diff --git a/src/main/cpp/benchmarks/CMakeLists.txt b/src/main/cpp/benchmarks/CMakeLists.txt
index 7641552622..a0d656b941 100644
--- a/src/main/cpp/benchmarks/CMakeLists.txt
+++ b/src/main/cpp/benchmarks/CMakeLists.txt
@@ -1,5 +1,5 @@
 #=============================================================================
-# Copyright (c) 2022-2025, NVIDIA CORPORATION.
+# Copyright (c) 2022-2026, NVIDIA CORPORATION.
 #
 # Licensed under the Apache License, Version 2.0 (the "License");
 # you may not use this file except in compliance with the License.
@@ -84,5 +84,8 @@ ConfigureBench(BLOOM_FILTER_BENCH
 ConfigureBench(GET_JSON_OBJECT_BENCH
   get_json_object.cu)
 
+ConfigureBench(FROM_JSON_TO_STRUCTS_BENCH
+  from_json_to_structs.cu)
+
 ConfigureBench(PARSE_URI_BENCH
   parse_uri.cpp)
diff --git a/src/main/cpp/benchmarks/from_json_to_structs.cu b/src/main/cpp/benchmarks/from_json_to_structs.cu
new file mode 100644
index 0000000000..22af4cd213
--- /dev/null
+++ b/src/main/cpp/benchmarks/from_json_to_structs.cu
@@ -0,0 +1,124 @@
+/*
+ * Copyright (c) 2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+#include
+
+#include
+#include
+
+#include
+#include
+
+#include
+#include
+#include
+
+namespace {
+
+/**
+ * @brief Build a strings column of `num_rows` JSON documents for the benchmark.
+ *
+ * In every run of 100 consecutive rows, the first `mismatch_percent` rows hold a document whose
+ * `data.c2` list contains a bare number instead of a struct; every other row holds a document
+ * that matches the nested schema described by the `nested_schema_*` functions.
+ *
+ * @param num_rows Number of rows to generate
+ * @param mismatch_percent Number of mismatching rows per 100 rows; 0 generates none
+ * @return The generated strings column
+ */
+std::unique_ptr<cudf::column> make_input(cudf::size_type num_rows, cudf::size_type mismatch_percent)
+{
+  std::string const valid      = R"({"data":{"c2":[{"c3":19,"c4":"x"}],"c1":1},"id":10})";
+  std::string const mismatched = R"({"data":{"c2":[19],"c1":2},"id":20})";
+
+  std::vector<std::string> rows;
+  rows.reserve(num_rows);
+  for (cudf::size_type row = 0; row < num_rows; ++row) {
+    rows.push_back(mismatch_percent > 0 && row % 100 < mismatch_percent ? mismatched : valid);
+  }
+  return cudf::test::strings_column_wrapper(rows.begin(), rows.end()).release();
+}
+
+// Flattened (depth-first) column names of the benchmark schema:
+//   data STRUCT{c1 INT32, c2 LIST<element STRUCT{c3 INT32, c4 STRING}>}, id INT32
+std::vector<std::string> nested_schema_names()
+{
+  return {"data", "c1", "c2", "element", "c3", "c4", "id"};
+}
+
+// Number of children of each column, in the order of `nested_schema_names()`.
+std::vector<int> nested_schema_num_children() { return {2, 0, 1, 2, 0, 0, 0}; }
+
+// `cudf::type_id` of each column as an `int`, in the order of `nested_schema_names()`.
+std::vector<int> nested_schema_types()
+{
+  return {static_cast<int>(cudf::type_id::STRUCT),
+          static_cast<int>(cudf::type_id::INT32),
+          static_cast<int>(cudf::type_id::LIST),
+          static_cast<int>(cudf::type_id::STRUCT),
+          static_cast<int>(cudf::type_id::INT32),
+          static_cast<int>(cudf::type_id::STRING),
+          static_cast<int>(cudf::type_id::INT32)};
+}
+
+// Scale passed for each column; all zero for this schema.
+std::vector<int> nested_schema_scales() { return {0, 0, 0, 0, 0, 0, 0}; }
+
+// Precision passed for each column; all -1 for this schema.
+std::vector<int> nested_schema_precisions() { return {-1, -1, -1, -1, -1, -1, -1}; }
+
+}  // namespace
+
+/**
+ * @brief Benchmark `spark_rapids_jni::from_json_to_structs` on the nested schema above.
+ *
+ * Axes: `num_rows` is the input size and `mismatch_percent` is forwarded to `make_input`.
+ */
+void BM_from_json_to_structs(nvbench::state& state)
+{
+  auto const num_rows         = static_cast<cudf::size_type>(state.get_int64("num_rows"));
+  auto const mismatch_percent = static_cast<cudf::size_type>(state.get_int64("mismatch_percent"));
+  auto const input            = make_input(num_rows, mismatch_percent);
+
+  auto const col_names    = nested_schema_names();
+  auto const num_children = nested_schema_num_children();
+  auto const types        = nested_schema_types();
+  auto const scales       = nested_schema_scales();
+  auto const precisions   = nested_schema_precisions();
+
+  // Rows are elements, not bytes, so report them as an element count rather than a buffer
+  // size. It is registered before `exec`, which is where the measurement is taken.
+  state.add_element_count(num_rows, "Rows");
+
+  state.exec(nvbench::exec_tag::sync, [&](nvbench::launch& launch) {
+    auto output = spark_rapids_jni::from_json_to_structs(cudf::strings_column_view{input->view()},
+                                                         col_names,
+                                                         num_children,
+                                                         types,
+                                                         scales,
+                                                         precisions,
+                                                         true,
+                                                         true,
+                                                         true,
+                                                         true,
+                                                         true);
+  });
+}
+
+NVBENCH_BENCH(BM_from_json_to_structs)
+  .set_name("from_json_to_structs")
+  .add_int64_axis("num_rows", {10000, 100000})
+  .add_int64_axis("mismatch_percent", {0, 1});
diff --git a/src/main/cpp/src/from_json_to_structs.cu b/src/main/cpp/src/from_json_to_structs.cu
index fcef701fff..ac770cdd67 100644
--- a/src/main/cpp/src/from_json_to_structs.cu
+++ b/src/main/cpp/src/from_json_to_structs.cu
@@ -21,14 +21,17 @@
 #include
 #include
 
+#include
 #include
 #include
+#include
 #include
 #include
 #include
 #include
 #include
 #include
+#include
 #include
 #include
 
@@ -48,6 +51,9 @@
 #include
 #include
 
+#include
+#include
+
 namespace spark_rapids_jni {
 
 namespace detail {
@@ -142,6 +148,124 @@ std::pair generate_stru
     cudf::data_type{cudf::type_id::STRUCT}, -1, std::move(schema_cols_with_precisions)}};
 }
 
+/**
+ * @brief Mark the given rows of a column as null, keeping all existing nulls.
+ *
+ * Only the null mask and null count of `input` are replaced here.
+ *
+ * @param input Column whose null mask is updated in place
+ * @param row_indices Host-side indices of the rows to nullify; the call is a no-op when empty
+ * @param stream CUDA stream used for device memory operations and kernel launches
+ * @param mr Device memory resource used for the new null mask and for the device copy of
+ *           `row_indices`
+ * @throws The `CUDF_EXPECTS` failure exception if an index is outside `[0, input.size())`
+ */
+void nullify_rows(cudf::column& input,
+                  std::vector<cudf::size_type> const& row_indices,
+                  rmm::cuda_stream_view stream,
+                  rmm::device_async_resource_ref mr)
+{
+  if (row_indices.empty()) { return; }
+
+  auto const input_view = input.view();
+  // Reject bad indices on the host, before any index is used to write device memory.
+  for (auto const row : row_indices) {
+    CUDF_EXPECTS(row >= 0 && row < input_view.size(), "Schema mismatch row index out of bounds.");
+  }
+
+  // Start from the current mask (or an all-valid one) so rows that are already null stay null.
+  auto null_mask =
+    input_view.nullable()
+      ? cudf::copy_bitmask(input_view, stream, mr)
+      : cudf::create_null_mask(input_view.size(), cudf::mask_state::ALL_VALID, stream, mr);
+  // NOTE(review): this buffer is a temporary but is allocated from `mr`; consider the current
+  // device resource instead if the project reserves `mr` for returned memory -- confirm.
+  auto d_row_indices = cudf::detail::make_device_uvector_async(row_indices, stream, mr);
+
+  auto mask_ptr = static_cast<cudf::bitmask_type*>(null_mask.data());
+  thrust::for_each(rmm::exec_policy_nosync(stream),
+                   d_row_indices.begin(),
+                   d_row_indices.end(),
+                   [mask_ptr] __device__(auto const row) { cudf::clear_bit(mask_ptr, row); });
+
+  // Recount rather than add `row_indices.size()`: a listed row may already have been null.
+  auto const null_count = cudf::null_count(
+    static_cast<cudf::bitmask_type const*>(null_mask.data()), 0, input_view.size(), stream);
+  input.set_null_mask(std::move(null_mask), null_count);
+}
+
+/**
+ * @brief Assemble a LIST column and make sure its null rows do not reference child data.
+ *
+ * @param num_rows Number of rows of the resulting column
+ * @param offsets_column Offsets child of the lists column
+ * @param child_column Data child of the lists column
+ * @param null_count Number of null rows described by `null_mask`
+ * @param null_mask Null mask of the resulting column
+ * @param stream CUDA stream used for device memory operations and kernel launches
+ * @param mr Device memory resource used when the column has to be purged
+ * @return The assembled lists column, purged of non-empty null rows if it had any
+ */
+std::unique_ptr<cudf::column> make_lists_column_with_null_sanitization(
+  cudf::size_type num_rows,
+  std::unique_ptr<cudf::column> offsets_column,
+  std::unique_ptr<cudf::column> child_column,
+  cudf::size_type null_count,
+  rmm::device_buffer&& null_mask,
+  rmm::cuda_stream_view stream,
+  rmm::device_async_resource_ref mr)
+{
+  std::vector<std::unique_ptr<cudf::column>> children;
+  children.emplace_back(std::move(offsets_column));
+  children.emplace_back(std::move(child_column));
+  auto output = std::make_unique<cudf::column>(cudf::data_type{cudf::type_id::LIST},
+                                               num_rows,
+                                               rmm::device_buffer{},
+                                               std::move(null_mask),
+                                               null_count,
+                                               std::move(children));
+  // Row-level schema mismatch nulls can leave child data under null parents; sanitize it here.
+  // Purging copies the whole column, so first check that a null row really is non-empty.
+  if (null_count > 0 && cudf::has_nonempty_nulls(output->view(), stream)) {
+    output = cudf::purge_nonempty_nulls(output->view(), stream, mr);
+  }
+  return output;
+}
+
+/**
+ * @brief Assemble a STRUCT column whose parent nulls are also applied to its children.
+ *
+ * @param num_rows Number of rows of the resulting column
+ * @param children Child columns of the struct
+ * @param null_count Number of null rows described by `null_mask`
+ * @param null_mask Null mask of the resulting column
+ * @param stream CUDA stream passed to `cudf::make_structs_column`
+ * @param mr Device memory resource passed to `cudf::make_structs_column`
+ * @return The assembled structs column
+ */
+std::unique_ptr<cudf::column> make_structs_column_with_null_consistency(
+  cudf::size_type num_rows,
+  std::vector<std::unique_ptr<cudf::column>>&& children,
+  cudf::size_type null_count,
+  rmm::device_buffer&& null_mask,
+  rmm::cuda_stream_view stream,
+  rmm::device_async_resource_ref mr)
+{
+  if (null_count > 0) {
+    // make_structs_column superimposes parent nulls onto children for a consistent nested column.
+    return cudf::make_structs_column(
+      num_rows, std::move(children), null_count, std::move(null_mask), stream, mr);
+  }
+
+  // Without parent nulls there is nothing to push down, so build the column directly.
+  return std::make_unique<cudf::column>(cudf::data_type{cudf::type_id::STRUCT},
+                                        num_rows,
+                                        rmm::device_buffer{},
+                                        std::move(null_mask),
+                                        null_count,
+                                        std::move(children));
+}
+
 using string_index_pair = cuda::std::pair;
 
 std::unique_ptr cast_strings_to_booleans(cudf::column_view const& input,
@@ -711,14 +835,14 @@ std::unique_ptr convert_data_type(InputType&& input,
       new_children.emplace_back(convert_data_type(
         std::move(child), child_schema, allow_nonnumeric_numbers, is_us_locale, stream, mr));
 
-      // Do not use `cudf::make_lists_column` since we do not need to call `purge_nonempty_nulls`
-      // on the child column as it does not have non-empty nulls.
-      return std::make_unique(cudf::data_type{cudf::type_id::LIST},
-                              num_rows,
-                              rmm::device_buffer{},
-                              std::move(*input_content.null_mask),
-                              null_count,
-                              std::move(new_children));
+      return make_lists_column_with_null_sanitization(
+        num_rows,
+        std::move(new_children[cudf::lists_column_view::offsets_column_index]),
+        std::move(new_children[cudf::lists_column_view::child_column_index]),
+        null_count,
+        std::move(*input_content.null_mask),
+        stream,
+        mr);
     }
 
     if (schema.type.id() == cudf::type_id::STRUCT) {
@@ -733,14 +857,12 @@ std::unique_ptr convert_data_type(InputType&& input,
                                                  mr));
       }
 
-      // Do not use `cudf::make_structs_column` since we do not need to call `superimpose_nulls`
-      // on the children columns.
-      return std::make_unique(cudf::data_type{cudf::type_id::STRUCT},
-                              num_rows,
-                              rmm::device_buffer{},
-                              std::move(*input_content.null_mask),
-                              null_count,
-                              std::move(new_children));
+      return make_structs_column_with_null_consistency(num_rows,
+                                                       std::move(new_children),
+                                                       null_count,
+                                                       std::move(*input_content.null_mask),
+                                                       stream,
+                                                       mr);
     }
   } else {  // input_is_const_cv
     auto const null_count = input.null_count();
@@ -761,14 +883,14 @@ std::unique_ptr convert_data_type(InputType&& input,
       new_children.emplace_back(
         convert_data_type(child, child_schema, allow_nonnumeric_numbers, is_us_locale, stream, mr));
 
-      // Do not use `cudf::make_lists_column` since we do not need to call `purge_nonempty_nulls`
-      // on the child column as it does not have non-empty nulls.
-      return std::make_unique(cudf::data_type{cudf::type_id::LIST},
-                              num_rows,
-                              rmm::device_buffer{},
-                              cudf::copy_bitmask(input, stream, mr),
-                              null_count,
-                              std::move(new_children));
+      return make_lists_column_with_null_sanitization(
+        num_rows,
+        std::move(new_children[cudf::lists_column_view::offsets_column_index]),
+        std::move(new_children[cudf::lists_column_view::child_column_index]),
+        null_count,
+        cudf::copy_bitmask(input, stream, mr),
+        stream,
+        mr);
     }
 
     if (schema.type.id() == cudf::type_id::STRUCT) {
@@ -783,14 +905,12 @@ std::unique_ptr convert_data_type(InputType&& input,
                                                  mr));
       }
 
-      // Do not use `cudf::make_structs_column` since we do not need to call `superimpose_nulls`
-      // on the children columns.
-      return std::make_unique(cudf::data_type{cudf::type_id::STRUCT},
-                              num_rows,
-                              rmm::device_buffer{},
-                              cudf::copy_bitmask(input, stream, mr),
-                              null_count,
-                              std::move(new_children));
+      return make_structs_column_with_null_consistency(num_rows,
+                                                       std::move(new_children),
+                                                       null_count,
+                                                       cudf::copy_bitmask(input, stream, mr),
+                                                       stream,
+                                                       mr);
     }
   }
 
@@ -838,9 +958,9 @@ std::unique_ptr from_json_to_structs(cudf::strings_column_view con
       .dtypes(schema)
       .prune_columns(schema.child_types.size() != 0);
 
-  auto const parsed_table_with_meta = cudf::io::read_json(opts_builder.build());
-  auto const& parsed_meta           = parsed_table_with_meta.metadata;
-  auto parsed_columns               = parsed_table_with_meta.tbl->release();
+  auto parsed_result = cudf::io::read_json_with_row_diagnostics(opts_builder.build(), stream, mr);
+  auto const& parsed_meta = parsed_result.data.metadata;
+  auto parsed_columns = parsed_result.data.tbl->release();
 
   CUDF_EXPECTS(parsed_columns.size() == schema.child_types.size(),
                "Numbers of output columns is different from schema size.");
@@ -855,6 +975,14 @@ std::unique_ptr from_json_to_structs(cudf::strings_column_view con
 
     auto const& [col_name, col_schema] = schema_with_precision.child_types[i];
     CUDF_EXPECTS(parsed_meta.schema_info[i].name == col_name, "Mismatched column name.");
+    auto const mismatch_rows =
+      std::find_if(parsed_result.diagnostics.top_level_columns_with_schema_mismatch_rows.begin(),
+                   parsed_result.diagnostics.top_level_columns_with_schema_mismatch_rows.end(),
+                   [&col_name](auto const& row_info) { return row_info.column_name == col_name; });
+    if (mismatch_rows !=
+        parsed_result.diagnostics.top_level_columns_with_schema_mismatch_rows.end()) {
+      nullify_rows(*parsed_columns[i], mismatch_rows->row_indices, stream, mr);
+    }
     converted_cols.emplace_back(convert_data_type(std::move(parsed_columns[i]),
                                                   col_schema,
                                                   allow_nonnumeric_numbers,
@@ -867,15 +995,13 @@ std::unique_ptr from_json_to_structs(cudf::strings_column_view con
   auto [null_mask, null_count] = cudf::detail::valid_if(
     valid_it, valid_it + should_be_nullified->size(), thrust::logical_not{}, stream, mr);
 
-  // Do not use `cudf::make_structs_column` since we do not need to call `superimpose_nulls`
-  // on the children columns.
-  return std::make_unique(
-    cudf::data_type{cudf::type_id::STRUCT},
+  return make_structs_column_with_null_consistency(
     input.size(),
-    rmm::device_buffer{},
-    null_count > 0 ? std::move(null_mask) : rmm::device_buffer{0, stream, mr},
+    std::move(converted_cols),
     null_count,
-    std::move(converted_cols));
+    null_count > 0 ? std::move(null_mask) : rmm::device_buffer{0, stream, mr},
+    stream,
+    mr);
 }
 
 }  // namespace
diff --git a/src/test/java/com/nvidia/spark/rapids/jni/FromJsonToStructsTest.java b/src/test/java/com/nvidia/spark/rapids/jni/FromJsonToStructsTest.java
new file mode 100644
index 0000000000..2cfccf580a
--- /dev/null
+++ b/src/test/java/com/nvidia/spark/rapids/jni/FromJsonToStructsTest.java
@@ -0,0 +1,77 @@
+/*
+ * Copyright (c) 2026, NVIDIA CORPORATION.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package com.nvidia.spark.rapids.jni;
+
+import ai.rapids.cudf.ColumnVector;
+import ai.rapids.cudf.DType;
+import ai.rapids.cudf.HostColumnVector;
+import ai.rapids.cudf.JSONOptions;
+import ai.rapids.cudf.Schema;
+import org.junit.jupiter.api.Test;
+
+import java.util.Arrays;
+import java.util.Collections;
+
+import static ai.rapids.cudf.AssertUtils.assertColumnsAreEqual;
+
+/** Tests for {@code JSONUtils.fromJSONToStructs} on rows that do not match a nested schema. */
+public class FromJsonToStructsTest {
+  private static JSONOptions getOptions() {
+    return JSONOptions.builder()
+        .withNormalizeSingleQuotes(true)
+        .withLeadingZeros(true)
+        .withNonNumericNumbers(true)
+        .withUnquotedControlChars(true)
+        .build();
+  }
+
+  /** Schema: data STRUCT{c1 INT32, c2 LIST of STRUCT{c3 INT32, c4 STRING}}, id INT32. */
+  private static Schema mixedNestedTypesSchema() {
+    Schema.Builder topLevel = Schema.builder();
+    Schema.Builder dataStruct = topLevel.addColumn(DType.STRUCT, "data");
+    dataStruct.column(DType.INT32, "c1");
+    Schema.Builder listElement =
+        dataStruct.addColumn(DType.LIST, "c2").addColumn(DType.STRUCT, "element");
+    listElement.column(DType.INT32, "c3");
+    listElement.column(DType.STRING, "c4");
+    topLevel.column(DType.INT32, "id");
+    return topLevel.build();
+  }
+
+  /** Expected output row for a document in which every field matches the schema. */
+  private static HostColumnVector.StructData matchingRow(int c1, int c3, String c4, int id) {
+    HostColumnVector.StructData element = new HostColumnVector.StructData(c3, c4);
+    return new HostColumnVector.StructData(
+        new HostColumnVector.StructData(c1, Collections.singletonList(element)), id);
+  }
+
+  @Test
+  void testFromJsonToStructsNullsOnlyMismatchedRowsForDepthOneParent() {
+    String matching = "{\"data\":{\"c2\":[{\"c3\":19,\"c4\":\"x\"}],\"c1\":1},\"id\":10}";
+    String numberInList = "{\"data\":{\"c2\":[19],\"c1\":2},\"id\":20}";
+    String matchingAgain = "{\"data\":{\"c2\":[{\"c3\":39,\"c4\":\"z\"}],\"c1\":3},\"id\":30}";
+    Schema nested = mixedNestedTypesSchema();
+    try (ColumnVector json = ColumnVector.fromStrings(matching, numberInList, matchingAgain);
+         ColumnVector parsed = JSONUtils.fromJSONToStructs(json, nested, getOptions(), true);
+         ColumnVector wanted = ColumnVector.fromStructs(nested.asHostDataType(),
+             matchingRow(1, 19, "x", 10),
+             new HostColumnVector.StructData(Arrays.asList(null, 20)),
+             matchingRow(3, 39, "z", 30))) {
+      assertColumnsAreEqual(wanted, parsed);
+    }
+  }
+}