Skip to content

Commit 39c865e

Browse files
authored
GH-48168: [C++][Parquet] Fix setting column-specific options when writing an encrypted Dataset (#48170)
### Rationale for this change

Fixes creation of writer properties when writing a Parquet Dataset with encryption, so that column-specific settings aren't lost.

### What changes are included in this PR?

Changes the `WriterProperties::Builder` constructor that uses existing properties so that it also sets any column-specific settings.

### Are these changes tested?

Yes.

### Are there any user-facing changes?

Yes, this fixes a user-facing bug.

* GitHub Issue: #48168

Authored-by: Adam Reeve <[email protected]>
Signed-off-by: Adam Reeve <[email protected]>
1 parent 4d03aa5 commit 39c865e

File tree

4 files changed

+257
-2
lines changed

4 files changed

+257
-2
lines changed

cpp/src/parquet/properties.cc

Lines changed: 44 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -67,4 +67,48 @@ std::shared_ptr<ArrowWriterProperties> default_arrow_writer_properties() {
6767
return default_writer_properties;
6868
}
6969

70+
void WriterProperties::Builder::CopyColumnSpecificProperties(
71+
const WriterProperties& properties) {
72+
for (const auto& [col_path, col_props] : properties.column_properties_) {
73+
if (col_props.statistics_enabled() !=
74+
default_column_properties_.statistics_enabled()) {
75+
if (col_props.statistics_enabled()) {
76+
this->enable_statistics(col_path);
77+
} else {
78+
this->disable_statistics(col_path);
79+
}
80+
}
81+
82+
if (col_props.dictionary_enabled() !=
83+
default_column_properties_.dictionary_enabled()) {
84+
if (col_props.dictionary_enabled()) {
85+
this->enable_dictionary(col_path);
86+
} else {
87+
this->disable_dictionary(col_path);
88+
}
89+
}
90+
91+
if (col_props.page_index_enabled() !=
92+
default_column_properties_.page_index_enabled()) {
93+
if (col_props.page_index_enabled()) {
94+
this->enable_write_page_index(col_path);
95+
} else {
96+
this->disable_write_page_index(col_path);
97+
}
98+
}
99+
100+
if (col_props.compression() != default_column_properties_.compression()) {
101+
this->compression(col_path, col_props.compression());
102+
}
103+
104+
if (col_props.compression_level() != default_column_properties_.compression_level()) {
105+
this->compression_level(col_path, col_props.compression_level());
106+
}
107+
108+
if (col_props.encoding() != default_column_properties_.encoding()) {
109+
this->encoding(col_path, col_props.encoding());
110+
}
111+
}
112+
}
113+
70114
} // namespace parquet

cpp/src/parquet/properties.h

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -286,7 +286,7 @@ struct PARQUET_EXPORT CdcOptions {
286286

287287
class PARQUET_EXPORT WriterProperties {
288288
public:
289-
class Builder {
289+
class PARQUET_EXPORT Builder {
290290
public:
291291
Builder()
292292
: pool_(::arrow::default_memory_pool()),
@@ -322,7 +322,9 @@ class PARQUET_EXPORT WriterProperties {
322322
content_defined_chunking_enabled_(
323323
properties.content_defined_chunking_enabled()),
324324
content_defined_chunking_options_(
325-
properties.content_defined_chunking_options()) {}
325+
properties.content_defined_chunking_options()) {
326+
CopyColumnSpecificProperties(properties);
327+
}
326328

327329
/// \brief EXPERIMENTAL: Use content-defined page chunking for all columns.
328330
///
@@ -784,6 +786,8 @@ class PARQUET_EXPORT WriterProperties {
784786
}
785787

786788
private:
789+
void CopyColumnSpecificProperties(const WriterProperties& properties);
790+
787791
MemoryPool* pool_;
788792
int64_t dictionary_pagesize_limit_;
789793
int64_t write_batch_size_;

cpp/src/parquet/properties_test.cc

Lines changed: 140 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -149,5 +149,145 @@ TEST(TestReaderProperties, GetStreamInsufficientData) {
149149
}
150150
}
151151

152+
struct WriterPropertiesTestCase {
153+
WriterPropertiesTestCase(std::shared_ptr<WriterProperties> props, std::string label)
154+
: properties(std::move(props)), label(std::move(label)) {}
155+
156+
std::shared_ptr<WriterProperties> properties;
157+
std::string label;
158+
};
159+
160+
void PrintTo(const WriterPropertiesTestCase& p, std::ostream* os) { *os << p.label; }
161+
162+
// Value-parameterized fixture; each parameter is one pre-built WriterProperties.
class WriterPropertiesTest : public testing::TestWithParam<WriterPropertiesTestCase> {};
163+
164+
// Builds a fresh WriterProperties from an existing one via the copying
// Builder constructor and checks that every file-level and column-level
// setting survives the round trip.
TEST_P(WriterPropertiesTest, RoundTripThroughBuilder) {
  const std::shared_ptr<WriterProperties>& original = GetParam().properties;
  const std::vector<std::shared_ptr<ColumnPath>> columns{
      ColumnPath::FromDotString("a"),
      ColumnPath::FromDotString("b"),
  };

  const auto rebuilt = WriterProperties::Builder(*original).build();

  // File-level settings.
  ASSERT_EQ(rebuilt->content_defined_chunking_enabled(),
            original->content_defined_chunking_enabled());
  ASSERT_EQ(rebuilt->created_by(), original->created_by());
  ASSERT_EQ(rebuilt->data_pagesize(), original->data_pagesize());
  ASSERT_EQ(rebuilt->data_page_version(), original->data_page_version());
  ASSERT_EQ(rebuilt->dictionary_index_encoding(), original->dictionary_index_encoding());
  ASSERT_EQ(rebuilt->dictionary_pagesize_limit(), original->dictionary_pagesize_limit());
  ASSERT_EQ(rebuilt->file_encryption_properties(),
            original->file_encryption_properties());
  ASSERT_EQ(rebuilt->max_rows_per_page(), original->max_rows_per_page());
  ASSERT_EQ(rebuilt->max_row_group_length(), original->max_row_group_length());
  ASSERT_EQ(rebuilt->memory_pool(), original->memory_pool());
  ASSERT_EQ(rebuilt->page_checksum_enabled(), original->page_checksum_enabled());
  ASSERT_EQ(rebuilt->size_statistics_level(), original->size_statistics_level());
  ASSERT_EQ(rebuilt->sorting_columns(), original->sorting_columns());
  ASSERT_EQ(rebuilt->store_decimal_as_integer(), original->store_decimal_as_integer());
  ASSERT_EQ(rebuilt->write_batch_size(), original->write_batch_size());
  ASSERT_EQ(rebuilt->version(), original->version());

  // CDC options are a struct without operator==, so compare field by field.
  const auto expected_cdc = original->content_defined_chunking_options();
  const auto actual_cdc = rebuilt->content_defined_chunking_options();
  ASSERT_EQ(actual_cdc.min_chunk_size, expected_cdc.min_chunk_size);
  ASSERT_EQ(actual_cdc.max_chunk_size, expected_cdc.max_chunk_size);
  ASSERT_EQ(actual_cdc.norm_level, expected_cdc.norm_level);

  // Column-level settings (the regression surface of GH-48168).
  for (const auto& column : columns) {
    const auto& expected_col = original->column_properties(column);
    const auto& actual_col = rebuilt->column_properties(column);

    ASSERT_EQ(actual_col.compression(), expected_col.compression());
    ASSERT_EQ(actual_col.compression_level(), expected_col.compression_level());
    ASSERT_EQ(actual_col.dictionary_enabled(), expected_col.dictionary_enabled());
    ASSERT_EQ(actual_col.encoding(), expected_col.encoding());
    ASSERT_EQ(actual_col.max_statistics_size(), expected_col.max_statistics_size());
    ASSERT_EQ(actual_col.page_index_enabled(), expected_col.page_index_enabled());
    ASSERT_EQ(actual_col.statistics_enabled(), expected_col.statistics_enabled());
  }
}
220+
221+
// Produces the parameter list for WriterPropertiesTest: the defaults, a case
// that overrides every file-level default, and one case per kind of
// column-specific override.
//
// Fix: the "override_defaults" scope previously declared an unused local
// `column_a` (only the later scopes need it); the dead declaration is removed.
std::vector<WriterPropertiesTestCase> writer_properties_test_cases() {
  std::vector<WriterPropertiesTestCase> test_cases;

  test_cases.emplace_back(default_writer_properties(), "default_properties");

  {
    // Override every file-level default (each value deliberately differs from
    // its default constant) so the round-trip test exercises all of them.
    WriterProperties::Builder builder;
    builder.created_by("parquet-cpp-properties-test");
    builder.encoding(Encoding::BYTE_STREAM_SPLIT);
    builder.compression(Compression::ZSTD);
    builder.compression_level(2);
    builder.disable_dictionary();
    builder.disable_statistics();
    builder.enable_content_defined_chunking();
    builder.dictionary_pagesize_limit(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT - 1);
    builder.write_batch_size(DEFAULT_WRITE_BATCH_SIZE - 1);
    builder.max_row_group_length(DEFAULT_MAX_ROW_GROUP_LENGTH - 1);
    builder.data_pagesize(kDefaultDataPageSize - 1);
    builder.max_rows_per_page(kDefaultMaxRowsPerPage - 1);
    builder.data_page_version(ParquetDataPageVersion::V2);
    builder.version(ParquetVersion::type::PARQUET_2_4);
    builder.enable_store_decimal_as_integer();
    builder.disable_write_page_index();
    builder.set_size_statistics_level(SizeStatisticsLevel::ColumnChunk);
    builder.set_sorting_columns(
        std::vector<SortingColumn>{SortingColumn{1, true, false}});

    test_cases.emplace_back(builder.build(), "override_defaults");
  }

  // Each remaining case sets a column-specific value for "a" that conflicts
  // with the file-level default, which is exactly what GH-48168 lost.
  const auto column_a = ColumnPath::FromDotString("a");
  {
    WriterProperties::Builder builder;
    builder.disable_dictionary();
    builder.enable_dictionary(column_a);
    test_cases.emplace_back(builder.build(), "dictionary_column_override");
  }
  {
    WriterProperties::Builder builder;
    builder.disable_statistics();
    builder.enable_statistics(column_a);
    test_cases.emplace_back(builder.build(), "statistics_column_override");
  }
  {
    WriterProperties::Builder builder;
    builder.compression(Compression::SNAPPY);
    builder.compression(column_a, Compression::UNCOMPRESSED);
    builder.compression_level(column_a, 2);
    test_cases.emplace_back(builder.build(), "compression_column_override");
  }
  {
    WriterProperties::Builder builder;
    builder.encoding(Encoding::UNDEFINED);
    builder.encoding(column_a, Encoding::BYTE_STREAM_SPLIT);
    test_cases.emplace_back(builder.build(), "encoding_column_override");
  }
  {
    WriterProperties::Builder builder;
    builder.disable_write_page_index();
    builder.enable_write_page_index(column_a);
    test_cases.emplace_back(builder.build(), "page_index_column_override");
  }

  return test_cases;
}
288+
289+
// Register every generated case with the parameterized suite.
INSTANTIATE_TEST_SUITE_P(WriterPropertiesTest, WriterPropertiesTest,
                         ::testing::ValuesIn(writer_properties_test_cases()));
291+
152292
} // namespace test
153293
} // namespace parquet

python/pyarrow/tests/test_dataset_encryption.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -230,3 +230,70 @@ def unwrap_key(self, wrapped_key: bytes, _: str) -> bytes:
230230
dataset = ds.dataset(path, format=file_format, filesystem=mockfs)
231231
new_table = dataset.to_table()
232232
assert table == new_table
233+
234+
235+
@pytest.mark.skipif(
    encryption_unavailable, reason="Parquet Encryption is not currently enabled"
)
def test_dataset_encryption_with_selected_column_statistics():
    """Write an encrypted dataset with statistics enabled for only a subset of
    columns, then verify the column-specific setting survived (GH-48168).

    Regression test: before the fix, column-specific writer options were
    dropped when an encryption config was supplied.
    """
    table = create_sample_table()

    encryption_config = create_encryption_config()
    decryption_config = create_decryption_config()
    kms_connection_config = create_kms_connection_config()

    crypto_factory = pe.CryptoFactory(kms_factory)
    encryption_cfg = ds.ParquetEncryptionConfig(
        crypto_factory, kms_connection_config, encryption_config
    )
    decryption_cfg = ds.ParquetDecryptionConfig(
        crypto_factory, kms_connection_config, decryption_config
    )

    # Combine dataset encryption with a column-specific option: statistics
    # only for "year" and "n_legs".
    pformat = pa.dataset.ParquetFileFormat()
    write_options = pformat.make_write_options(
        encryption_config=encryption_cfg,
        write_statistics=["year", "n_legs"]
    )

    mockfs = fs._MockFileSystem()
    mockfs.create_dir("/")

    ds.write_dataset(
        data=table,
        base_dir="sample_dataset",
        format=pformat,
        file_options=write_options,
        filesystem=mockfs,
    )

    # Re-open the written files directly with ParquetFile so we can inspect
    # per-column statistics in the row-group metadata.
    pq_scan_opts = ds.ParquetFragmentScanOptions(
        decryption_config=decryption_cfg
    )
    pformat = pa.dataset.ParquetFileFormat(default_fragment_scan_options=pq_scan_opts)
    dataset = ds.dataset("sample_dataset", format=pformat, filesystem=mockfs)

    for fragment in dataset.get_fragments():
        decryption_properties = crypto_factory.file_decryption_properties(
            kms_connection_config, decryption_config, fragment.path, mockfs)
        with pq.ParquetFile(
            fragment.path,
            decryption_properties=decryption_properties,
            filesystem=mockfs,
        ) as parquet_file:
            for rg_idx in range(parquet_file.metadata.num_row_groups):
                row_group = parquet_file.metadata.row_group(rg_idx)

                # Columns 0 ("year") and 1 ("n_legs") requested statistics;
                # expected min/max come from create_sample_table().
                assert row_group.column(0).statistics is not None
                assert row_group.column(0).statistics.min == 2019
                assert row_group.column(0).statistics.max == 2022

                assert row_group.column(1).statistics is not None
                assert row_group.column(1).statistics.min == 2
                assert row_group.column(1).statistics.max == 100

                # Column 2 was excluded from write_statistics.
                assert row_group.column(2).statistics is None

0 commit comments

Comments
 (0)