44 changes: 43 additions & 1 deletion cpp/src/parquet/properties.h
@@ -322,7 +322,49 @@ class PARQUET_EXPORT WriterProperties {
content_defined_chunking_enabled_(
properties.content_defined_chunking_enabled()),
content_defined_chunking_options_(
properties.content_defined_chunking_options()) {}
properties.content_defined_chunking_options()) {
for (const auto& [col_path, col_props] : properties.column_properties_) {
Member:

I just noticed that WriterProperties and WriterProperties::Builder use different forms to represent column properties. Is it possible to simplify this by using the same form?

Contributor Author:

I think the builder needs to work this way so that individual column-specific properties can be changed, as well as the default column properties, without the order of operations affecting the result. I don't think WriterProperties can be changed to use the Builder's representation without breaking the API, since ColumnProperties is part of the public API and is returned by reference:

const ColumnProperties& column_properties(
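
As a minimal sketch of the ordering concern (hypothetical calls, not taken from this diff):

auto column_a = ColumnPath::FromDotString("a");
WriterProperties::Builder builder;
builder.enable_dictionary(column_a);  // column-level override first...
builder.disable_dictionary();         // ...then change the default
// With per-property override maps, column "a" still has dictionary encoding
// enabled after build(); a single flat map of ColumnProperties would have to
// decide whether the later default-level call clobbers the earlier
// column-level override.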

if (col_props.statistics_enabled() !=
default_column_properties_.statistics_enabled()) {
if (col_props.statistics_enabled()) {
this->enable_statistics(col_path);
} else {
this->disable_statistics(col_path);
}
}

if (col_props.dictionary_enabled() !=
default_column_properties_.dictionary_enabled()) {
if (col_props.dictionary_enabled()) {
this->enable_dictionary(col_path);
} else {
this->disable_dictionary(col_path);
}
}

if (col_props.page_index_enabled() !=
default_column_properties_.page_index_enabled()) {
if (col_props.page_index_enabled()) {
this->enable_write_page_index(col_path);
} else {
this->disable_write_page_index(col_path);
}
}

if (col_props.compression() != default_column_properties_.compression()) {
this->compression(col_path, col_props.compression());
}

if (col_props.compression_level() !=
default_column_properties_.compression_level()) {
this->compression_level(col_path, col_props.compression_level());
}

if (col_props.encoding() != default_column_properties_.encoding()) {
this->encoding(col_path, col_props.encoding());
}
}
}
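
A minimal usage sketch of what this new constructor enables (hypothetical settings, not part of the diff):

auto base = WriterProperties::Builder()
                .compression(Compression::ZSTD)
                ->disable_dictionary()
                ->build();
// Start from the existing properties and change a single setting; everything
// else, including any per-column overrides, carries over.
auto tweaked = WriterProperties::Builder(*base)
                   .max_row_group_length(1024 * 1024)
                   ->build();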

/// \brief EXPERIMENTAL: Use content-defined page chunking for all columns.
///
140 changes: 140 additions & 0 deletions cpp/src/parquet/properties_test.cc
@@ -149,5 +149,145 @@ TEST(TestReaderProperties, GetStreamInsufficientData) {
}
}

struct WriterPropertiesTestCase {
WriterPropertiesTestCase(std::shared_ptr<WriterProperties> props, std::string label)
: properties(std::move(props)), label(std::move(label)) {}

std::shared_ptr<WriterProperties> properties;
std::string label;
};

void PrintTo(const WriterPropertiesTestCase& p, std::ostream* os) { *os << p.label; }

class WriterPropertiesTest : public testing::TestWithParam<WriterPropertiesTestCase> {};

TEST_P(WriterPropertiesTest, RoundTripThroughBuilder) {
const std::shared_ptr<WriterProperties>& properties = GetParam().properties;
const std::vector<std::shared_ptr<ColumnPath>> columns{
ColumnPath::FromDotString("a"),
ColumnPath::FromDotString("b"),
};

const auto round_tripped = WriterProperties::Builder(*properties).build();

ASSERT_EQ(round_tripped->content_defined_chunking_enabled(),
properties->content_defined_chunking_enabled());
ASSERT_EQ(round_tripped->created_by(), properties->created_by());
ASSERT_EQ(round_tripped->data_pagesize(), properties->data_pagesize());
ASSERT_EQ(round_tripped->data_page_version(), properties->data_page_version());
ASSERT_EQ(round_tripped->dictionary_index_encoding(),
properties->dictionary_index_encoding());
ASSERT_EQ(round_tripped->dictionary_pagesize_limit(),
properties->dictionary_pagesize_limit());
ASSERT_EQ(round_tripped->file_encryption_properties(),
properties->file_encryption_properties());
ASSERT_EQ(round_tripped->max_rows_per_page(), properties->max_rows_per_page());
ASSERT_EQ(round_tripped->max_row_group_length(), properties->max_row_group_length());
ASSERT_EQ(round_tripped->memory_pool(), properties->memory_pool());
ASSERT_EQ(round_tripped->page_checksum_enabled(), properties->page_checksum_enabled());
ASSERT_EQ(round_tripped->size_statistics_level(), properties->size_statistics_level());
ASSERT_EQ(round_tripped->sorting_columns(), properties->sorting_columns());
ASSERT_EQ(round_tripped->store_decimal_as_integer(),
properties->store_decimal_as_integer());
ASSERT_EQ(round_tripped->write_batch_size(), properties->write_batch_size());
ASSERT_EQ(round_tripped->version(), properties->version());

const auto cdc_options = properties->content_defined_chunking_options();
const auto round_tripped_cdc_options =
round_tripped->content_defined_chunking_options();
ASSERT_EQ(round_tripped_cdc_options.min_chunk_size, cdc_options.min_chunk_size);
ASSERT_EQ(round_tripped_cdc_options.max_chunk_size, cdc_options.max_chunk_size);
ASSERT_EQ(round_tripped_cdc_options.norm_level, cdc_options.norm_level);

for (const auto& column : columns) {
const auto& column_properties = properties->column_properties(column);
const auto& round_tripped_col = round_tripped->column_properties(column);

ASSERT_EQ(round_tripped_col.compression(), column_properties.compression());
ASSERT_EQ(round_tripped_col.compression_level(),
column_properties.compression_level());
ASSERT_EQ(round_tripped_col.dictionary_enabled(),
column_properties.dictionary_enabled());
ASSERT_EQ(round_tripped_col.encoding(), column_properties.encoding());
ASSERT_EQ(round_tripped_col.max_statistics_size(),
column_properties.max_statistics_size());
ASSERT_EQ(round_tripped_col.page_index_enabled(),
column_properties.page_index_enabled());
ASSERT_EQ(round_tripped_col.statistics_enabled(),
column_properties.statistics_enabled());
}
}

std::vector<WriterPropertiesTestCase> writer_properties_test_cases() {
std::vector<WriterPropertiesTestCase> test_cases;

test_cases.emplace_back(default_writer_properties(), "default_properties");

{
WriterProperties::Builder builder;

builder.created_by("parquet-cpp-properties-test");
builder.encoding(Encoding::BYTE_STREAM_SPLIT);
builder.compression(Compression::ZSTD);
builder.compression_level(2);
builder.disable_dictionary();
builder.disable_statistics();
builder.enable_content_defined_chunking();
builder.dictionary_pagesize_limit(DEFAULT_DICTIONARY_PAGE_SIZE_LIMIT - 1);
builder.write_batch_size(DEFAULT_WRITE_BATCH_SIZE - 1);
builder.max_row_group_length(DEFAULT_MAX_ROW_GROUP_LENGTH - 1);
builder.data_pagesize(kDefaultDataPageSize - 1);
builder.max_rows_per_page(kDefaultMaxRowsPerPage - 1);
builder.data_page_version(ParquetDataPageVersion::V2);
builder.version(ParquetVersion::type::PARQUET_2_4);
builder.enable_store_decimal_as_integer();
builder.disable_write_page_index();
builder.set_size_statistics_level(SizeStatisticsLevel::ColumnChunk);
builder.set_sorting_columns(
std::vector<SortingColumn>{SortingColumn{1, true, false}});

test_cases.emplace_back(builder.build(), "override_defaults");
}

const auto column_a = ColumnPath::FromDotString("a");
{
WriterProperties::Builder builder;
builder.disable_dictionary();
builder.enable_dictionary(column_a);
test_cases.emplace_back(builder.build(), "dictionary_column_override");
}
{
WriterProperties::Builder builder;
builder.disable_statistics();
builder.enable_statistics(column_a);
test_cases.emplace_back(builder.build(), "statistics_column_override");
}
{
WriterProperties::Builder builder;
builder.compression(Compression::SNAPPY);
builder.compression(column_a, Compression::UNCOMPRESSED);
builder.compression_level(column_a, 2);
test_cases.emplace_back(builder.build(), "compression_column_override");
}
{
WriterProperties::Builder builder;
builder.encoding(Encoding::UNDEFINED);
builder.encoding(column_a, Encoding::BYTE_STREAM_SPLIT);
test_cases.emplace_back(builder.build(), "encoding_column_override");
}
{
WriterProperties::Builder builder;
builder.disable_write_page_index();
builder.enable_write_page_index(column_a);
test_cases.emplace_back(builder.build(), "page_index_column_override");
}

return test_cases;
}

INSTANTIATE_TEST_SUITE_P(WriterPropertiesTest, WriterPropertiesTest,
::testing::ValuesIn(writer_properties_test_cases()));

} // namespace test
} // namespace parquet
67 changes: 67 additions & 0 deletions python/pyarrow/tests/test_dataset_encryption.py
@@ -230,3 +230,70 @@ def unwrap_key(self, wrapped_key: bytes, _: str) -> bytes:
dataset = ds.dataset(path, format=file_format, filesystem=mockfs)
new_table = dataset.to_table()
assert table == new_table


@pytest.mark.skipif(
encryption_unavailable, reason="Parquet Encryption is not currently enabled"
)
def test_dataset_encryption_with_selected_column_statistics():
table = create_sample_table()

encryption_config = create_encryption_config()
decryption_config = create_decryption_config()
kms_connection_config = create_kms_connection_config()

crypto_factory = pe.CryptoFactory(kms_factory)
parquet_encryption_cfg = ds.ParquetEncryptionConfig(
crypto_factory, kms_connection_config, encryption_config
)
parquet_decryption_cfg = ds.ParquetDecryptionConfig(
crypto_factory, kms_connection_config, decryption_config
)

# Create write_options with the dataset encryption config and specify
# that statistics should be enabled for a subset of columns.
pformat = pa.dataset.ParquetFileFormat()
write_options = pformat.make_write_options(
encryption_config=parquet_encryption_cfg,
write_statistics=["year", "n_legs"]
)

mockfs = fs._MockFileSystem()
mockfs.create_dir("/")

ds.write_dataset(
data=table,
base_dir="sample_dataset",
format=pformat,
file_options=write_options,
filesystem=mockfs,
)

# Open Parquet files directly and check that statistics are present
# for the expected columns.
pq_scan_opts = ds.ParquetFragmentScanOptions(
decryption_config=parquet_decryption_cfg
)
pformat = pa.dataset.ParquetFileFormat(default_fragment_scan_options=pq_scan_opts)
dataset = ds.dataset("sample_dataset", format=pformat, filesystem=mockfs)

for fragment in dataset.get_fragments():
decryption_properties = crypto_factory.file_decryption_properties(
kms_connection_config, decryption_config, fragment.path, mockfs)
with pq.ParquetFile(
fragment.path,
decryption_properties=decryption_properties,
filesystem=mockfs,
) as parquet_file:
for rg_idx in range(parquet_file.metadata.num_row_groups):
row_group = parquet_file.metadata.row_group(rg_idx)

assert row_group.column(0).statistics is not None
assert row_group.column(0).statistics.min == 2019
assert row_group.column(0).statistics.max == 2022

assert row_group.column(1).statistics is not None
assert row_group.column(1).statistics.min == 2
assert row_group.column(1).statistics.max == 100

assert row_group.column(2).statistics is None