Replies: 5 comments 3 replies
-
You might want to try creating
-
Hello @adamreeve, thanks for the input. I have tried to change my code to use the Dataset API:

```cpp
arrow::Result<std::shared_ptr<arrow::RecordBatchReader>> ReadFilteredParquetRecordReader(
    const std::string &parquet_path, const std::int64_t start, const std::int64_t end)
{
  // Create a local filesystem
  ARROW_ASSIGN_OR_RAISE(auto filesystem, fs::FileSystemFromUri("file:///"));

  // Set up the file info for a single file
  fs::FileInfo file_info(parquet_path, fs::FileType::File);

  // Create a Parquet file format
  auto format = std::make_shared<ds::ParquetFileFormat>();
  auto parquet_scan_options = std::make_shared<arrow::dataset::ParquetFragmentScanOptions>();

  // Configure general Parquet reader settings
  auto reader_properties = std::make_shared<parquet::ReaderProperties>(arrow::default_memory_pool());
  reader_properties->set_buffer_size(64 * 1024 * 1024);  // 64 MiB buffer size
  reader_properties->enable_buffered_stream();

  // Configure Arrow-specific Parquet reader settings
  auto arrow_reader_props = std::make_shared<parquet::ArrowReaderProperties>();
  arrow_reader_props->set_batch_size(10000);  // default is 64 * 1024
  arrow_reader_props->set_use_threads(true);  // Enable multithreading
  arrow_reader_props->set_pre_buffer(false);  // Disable pre-buffering

  parquet_scan_options->reader_properties = reader_properties;
  parquet_scan_options->arrow_reader_properties = arrow_reader_props;

  arrow::dataset::FileSystemFactoryOptions options;
  ARROW_ASSIGN_OR_RAISE(auto factory, arrow::dataset::FileSystemDatasetFactory::Make(
                                          filesystem, {file_info}, format, options));
  ARROW_ASSIGN_OR_RAISE(auto dataset, factory->Finish());

  // Create a scanner builder
  ARROW_ASSIGN_OR_RAISE(auto scan_builder, dataset->NewScan());
  ARROW_RETURN_NOT_OK(scan_builder->FragmentScanOptions(parquet_scan_options));

  // Set a filter: start <= "INDEX" <= end
  auto filter = cp::and_(cp::greater_equal(cp::field_ref("INDEX"), cp::literal(start)),
                         cp::less_equal(cp::field_ref("INDEX"), cp::literal(end)));
  ARROW_RETURN_NOT_OK(scan_builder->Filter(filter));

  // Specify the columns to read, e.g., "DATA"
  ARROW_RETURN_NOT_OK(scan_builder->Project({"DATA"}));

  // Finish the scanner
  ARROW_ASSIGN_OR_RAISE(auto scanner, scan_builder->Finish());
  scanner->options()->use_threads = true;      // Enable multithreading
  scanner->options()->cache_metadata = false;  // Do not cache fragment metadata
  // scanner->options()->batch_readahead = 10;    // Batch readahead
  // scanner->options()->fragment_readahead = 5;  // Fragment readahead

  // Return a streaming reader over the filtered, projected data
  return scanner->ToRecordBatchReader();
}
```

But the memory usage reduction was less pronounced than I was expecting. I was also able to use the Parquet statistics to determine which row groups I need; reading those row groups sequentially kept memory usage under control, but it was slower than using the Dataset API.
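For reference, a minimal sketch of that row-group approach (the row-group indices are assumed to come from inspecting the column statistics in the file metadata; exact reader signatures vary slightly across Arrow versions):

```cpp
#include <arrow/api.h>
#include <arrow/io/file.h>
#include <parquet/arrow/reader.h>

// Sketch: read only the needed row groups, one at a time, to bound memory.
// `row_groups` is assumed to have been chosen from the file's column statistics.
arrow::Status ProcessSelectedRowGroups(const std::string &parquet_path,
                                       const std::vector<int> &row_groups)
{
  ARROW_ASSIGN_OR_RAISE(auto input, arrow::io::ReadableFile::Open(parquet_path));
  std::unique_ptr<parquet::arrow::FileReader> reader;
  ARROW_RETURN_NOT_OK(
      parquet::arrow::OpenFile(input, arrow::default_memory_pool(), &reader));
  for (int rg : row_groups) {
    std::shared_ptr<arrow::Table> table;
    ARROW_RETURN_NOT_OK(reader->ReadRowGroup(rg, &table));  // one row group in memory at a time
    // ... process `table` ...
  }
  return arrow::Status::OK();
}
```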
-
You can also try using another `MemoryPool` to see if that helps. See https://arrow.apache.org/docs/cpp/env_vars.html#envvar-ARROW_DEFAULT_MEMORY_POOL
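For example, a sketch of switching the scan to the system allocator (`scan_builder` is assumed to be the `arrow::dataset::ScannerBuilder` from the code above; alternatively, set the `ARROW_DEFAULT_MEMORY_POOL` environment variable before starting the process):

```cpp
// Sketch: route the scan's allocations through the system allocator instead
// of the default pool (jemalloc or mimalloc, depending on the build).
ARROW_RETURN_NOT_OK(scan_builder->Pool(arrow::system_memory_pool()));
```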
-
Hello. The suggestion by @adamreeve to reduce `batch_readahead` was effective in reducing the memory consumption, with an increase in the time to read the file. What I found more unexpected is that the memory used (with a readahead of 16) was many times greater than the size of the uncompressed file: about 70 GB, while the Parquet file is 6.8 GB and the saved column is 7.6 GB uncompressed.
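For reference, these knobs can also be set on the `ScannerBuilder` before `Finish()`; a minimal sketch (values are illustrative):

```cpp
// Sketch: lower readahead values reduce peak memory at the cost of throughput.
ARROW_RETURN_NOT_OK(scan_builder->BatchReadahead(16));    // batches read ahead
ARROW_RETURN_NOT_OK(scan_builder->FragmentReadahead(1));  // fragments read ahead
```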
-
In addition to
-
Hello. I am trying to use the C++ Dataset API with predicate pushdown and projections to simplify reading a Parquet file. However, it appears not to matter (from the memory usage point of view) whether I materialize the scanner directly into a table using `scanner->ToTable()` or use `scanner->ToRecordBatchReader()`. I have two functions that differ only in their return type (`arrow::RecordBatchReader` or `arrow::Table`). I was expecting the `RecordBatchReader` approach to reduce the memory usage. Next I iterate over the batches as shown here:
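(A minimal sketch of the loop; `reader` is assumed to be the `arrow::RecordBatchReader` returned by the function.)

```cpp
// Stream batches one at a time instead of materializing the whole table.
std::shared_ptr<arrow::RecordBatch> batch;
while (true) {
  ARROW_RETURN_NOT_OK(reader->ReadNext(&batch));
  if (batch == nullptr) break;  // end of stream
  // ... process `batch` ...
}
```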
But the memory usage is almost the same as in the case of returning a table directly and iterating over its chunks.
Thanks =)