From f1c4711c96df04ca8af6cca3227d59ff2c0a6696 Mon Sep 17 00:00:00 2001 From: "github-actions[bot]" <41898282+github-actions[bot]@users.noreply.github.com> Date: Thu, 21 Dec 2023 08:08:10 -0500 Subject: [PATCH] Reduce memory usage and improve perf when reading VCF headers (#631) (#633) Co-authored-by: George Powley --- libtiledbvcf/src/dataset/tiledbvcfdataset.cc | 221 +++++++------------ libtiledbvcf/src/stats/column_buffer.h | 4 +- 2 files changed, 79 insertions(+), 146 deletions(-) diff --git a/libtiledbvcf/src/dataset/tiledbvcfdataset.cc b/libtiledbvcf/src/dataset/tiledbvcfdataset.cc index e011e00b6..c4b350499 100644 --- a/libtiledbvcf/src/dataset/tiledbvcfdataset.cc +++ b/libtiledbvcf/src/dataset/tiledbvcfdataset.cc @@ -1030,8 +1030,8 @@ std::shared_ptr TileDBVCFDataset::open_data_array( std::unordered_map TileDBVCFDataset::fetch_vcf_headers_v4( const std::vector& samples, std::unordered_map* lookup_map, - const bool all_samples, - const bool first_sample) const { + bool all_samples, + bool first_sample) const { LOG_DEBUG( "[fetch_vcf_headers_v4] start all_samples={} first_sample={} " "(VmRSS = {})", @@ -1062,175 +1062,108 @@ std::unordered_map TileDBVCFDataset::fetch_vcf_headers_v4( throw std::runtime_error( "Cannot fetch TileDB-VCF vcf headers; Array object unexpectedly null"); - Query query(*ctx_, *vcf_header_array_); + // Use a managed query to fetch the headers. Note that the previous + // implementation used the TILEDB_ROW_MAJOR layout to read the results in + // sorted order. However, reading results in sorted order is not required + // because we are adding the results to a map. The managed query uses the + // TILEDB_UNORDERED layout which is more efficient. + auto mq = + std::make_unique(vcf_header_array_, "fetch_vcf_headers_v4"); + mq->select_columns({"sample", "header"}); + // By default all samples are read. if (!samples.empty()) { - // If all samples but we have a sample list we know its sorted and can use - // the min/max - if (all_samples) { - query.add_range( - 0, samples[0].sample_name, samples[samples.size() - 1].sample_name); - } else { - for (const auto& sample : samples) { - query.add_range(0, sample.sample_name, sample.sample_name); - } + // If samples are provided, only read those samples + for (const auto& sample : samples) { + mq->select_point("sample", sample.sample_name); } - } else if (all_samples) { - // When no samples are passed grab the first one - auto non_empty_domain = vcf_header_array_->non_empty_domain_var(0); - if (!non_empty_domain.first.empty()) - query.add_range(0, non_empty_domain.first, non_empty_domain.second); } else if (first_sample) { - // When no samples are passed grab the first one + // Only read the first sample, based on the non-empty domain. We handle the + // case where the first sample was deleted below. auto non_empty_domain = vcf_header_array_->non_empty_domain_var(0); - if (!non_empty_domain.first.empty()) - query.add_range(0, non_empty_domain.first, non_empty_domain.first); + if (!non_empty_domain.first.empty()) { + mq->select_point("sample", non_empty_domain.first); + } else if (non_empty_domain.second.empty()) { + // If the lower and upper bounds of the non-empty domain are empty, then + // the array is empty or the array contains an annotation VCF with an + // empty sample name. In either case, we will read the VCF headers for + // "all" samples to avoid an infinite loop looking for the first sample. + first_sample = false; + } } - query.set_layout(TILEDB_ROW_MAJOR); - - uint64_t header_offset_element = 0; - uint64_t header_data_element = 0; - uint64_t sample_offset_element = 0; - uint64_t sample_data_element = 0; -#if TILEDB_VERSION_MAJOR == 2 && TILEDB_VERSION_MINOR < 2 - std::pair header_est_size = - query.est_result_size_var("header"); - header_offset_element = - std::max(header_est_size.first, static_cast(1)); - header_data_element = - std::max(header_est_size.second / sizeof(char), static_cast(1)); - - // Sample estimate - std::pair sample_est_size = - query.est_result_size_var("sample"); - sample_offset_element = - std::max(sample_est_size.first, static_cast(1)); - sample_data_element = - std::max(sample_est_size.second / sizeof(char), static_cast(1)); -#else - std::array header_est_size = query.est_result_size_var("header"); - header_offset_element = - std::max(header_est_size[0] / sizeof(uint64_t), static_cast(1)); - header_data_element = - std::max(header_est_size[1] / sizeof(char), static_cast(1)); - - // Sample estimate - std::array sample_est_size = query.est_result_size_var("sample"); - sample_offset_element = - std::max(sample_est_size[0] / sizeof(uint64_t), static_cast(1)); - sample_data_element = - std::max(sample_est_size[1] / sizeof(char), static_cast(1)); -#endif - - std::vector offsets(header_offset_element); - std::vector data(header_data_element); - std::vector sample_offsets(sample_offset_element); - std::vector sample_data(sample_data_element); - LOG_DEBUG( - "[fetch_vcf_headers_v4] allocate done (VmRSS = {})", - utils::memory_usage_str()); - Query::Status status; uint32_t sample_idx = 0; + while (!mq->is_complete()) { + mq->submit(); + auto num_rows = mq->results()->num_rows(); + + // Handle the case where the first sample was deleted - create a new + // query that reads all samples and exit the loop after processing the + // first sample. + if (first_sample && num_rows == 0) { + LOG_DEBUG( + "[fetch_vcf_headers_v4] the first sample was deleted, resubmitting"); + mq = std::make_unique( + vcf_header_array_, "fetch_vcf_headers_v4"); + mq->select_columns({"sample", "header"}); + } - do { - // Always reset buffer to avoid issue with core library and REST not using - // original buffer sizes - query.set_buffer("header", offsets, data); - query.set_buffer("sample", sample_offsets, sample_data); - - status = query.submit(); - - LOG_DEBUG( - "[fetch_vcf_headers_v4] query done (VmRSS = {})", - utils::memory_usage_str()); - - auto result_el = query.result_buffer_elements(); - uint64_t num_offsets = result_el["header"].first; - uint64_t num_chars = result_el["header"].second; - uint64_t num_samples_offsets = result_el["sample"].first; - uint64_t num_samples_chars = result_el["sample"].second; - - bool has_results = num_chars != 0; - - if (status == Query::Status::INCOMPLETE && !has_results) { - // If there are no results, double the size of the buffer and then - // resubmit the query. - - if (num_chars == 0) - data.resize(data.size() * 2); - - if (num_offsets == 0) - offsets.resize(offsets.size() * 2); - - if (num_samples_chars == 0) - sample_data.resize(sample_data.size() * 2); - - if (num_samples_offsets == 0) - sample_offsets.resize(sample_offsets.size() * 2); - - } else if (has_results) { - // Parse the samples. - - for (size_t offset_idx = 0; offset_idx < num_offsets; ++offset_idx) { - // Get sample - char* sample_beg = sample_data.data() + sample_offsets[offset_idx]; - uint64_t sample_end = offset_idx == num_samples_offsets - 1 ? - num_samples_chars : - sample_offsets[offset_idx + 1]; - uint64_t sample_size = sample_end - sample_offsets[offset_idx]; - std::string sample(sample_beg, sample_size); - - char* beg_hdr = data.data() + offsets[offset_idx]; - uint64_t end = - offset_idx == num_offsets - 1 ? num_chars : offsets[offset_idx + 1]; - uint64_t start = offsets[offset_idx]; - uint64_t hdr_size = end - start; + for (unsigned int i = 0; i < num_rows; i++) { + // Create a string from the string_view to guarantee null termination + // as required by htslib APIs. + auto hdr_str = std::string(mq->string_view("header", i)); + auto sample = std::string(mq->string_view("sample", i)); - std::string hdr_str(beg_hdr, hdr_size); + bcf_hdr_t* hdr = bcf_hdr_init("r"); + if (!hdr) { + throw std::runtime_error( + "Error fetching VCF header data; error allocating VCF header."); + } - bcf_hdr_t* hdr = bcf_hdr_init("r"); - if (!hdr) - throw std::runtime_error( - "Error fetching VCF header data; error allocating VCF header."); + if (0 != bcf_hdr_parse(hdr, const_cast(hdr_str.c_str()))) { + throw std::runtime_error( + "TileDBVCFDataset::fetch_vcf_headers_v4: Error parsing the BCF " + "header for sample " + + sample + "."); + } - if (0 != bcf_hdr_parse(hdr, const_cast(hdr_str.c_str()))) { + if (!sample.empty()) { + if (0 != bcf_hdr_add_sample(hdr, sample.c_str())) { throw std::runtime_error( - "TileDBVCFDataset::fetch_vcf_headers_v4: Error parsing the BCF " - "header for sample " + + "TileDBVCFDataset::fetch_vcf_headers_v4: Error adding sample " + "to " + "BCF header for sample " + sample + "."); } + } - if (!sample.empty()) { - if (0 != bcf_hdr_add_sample(hdr, sample.c_str())) { - throw std::runtime_error( - "TileDBVCFDataset::fetch_vcf_headers_v4: Error adding sample " - "to " - "BCF header for sample " + - sample + "."); - } - } + if (bcf_hdr_sync(hdr) < 0) { + throw std::runtime_error( + "Error in bcftools: failed to update VCF header."); + } - if (bcf_hdr_sync(hdr) < 0) - throw std::runtime_error( - "Error in bcftools: failed to update VCF header."); + result.emplace( + std::make_pair(sample_idx, SafeBCFHdr(hdr, bcf_hdr_destroy))); + if (lookup_map != nullptr) { + (*lookup_map)[sample] = sample_idx; + } - result.emplace( - std::make_pair(sample_idx, SafeBCFHdr(hdr, bcf_hdr_destroy))); - if (lookup_map != nullptr) - (*lookup_map)[sample] = sample_idx; + ++sample_idx; - ++sample_idx; + // Exit the query loop if we only want the first sample. + if (first_sample) { + goto done; } } - } while (status == Query::Status::INCOMPLETE); + } +done: if (tiledb_stats_enabled_) tiledb::Stats::enable(); LOG_DEBUG( "[fetch_vcf_headers_v4] done (VmRSS = {})", utils::memory_usage_str()); return result; -} // namespace vcf +} std::unordered_map TileDBVCFDataset::fetch_vcf_headers( const std::vector& samples) const { diff --git a/libtiledbvcf/src/stats/column_buffer.h b/libtiledbvcf/src/stats/column_buffer.h index 4147fe5d2..575b0e6c0 100644 --- a/libtiledbvcf/src/stats/column_buffer.h +++ b/libtiledbvcf/src/stats/column_buffer.h @@ -47,9 +47,9 @@ using namespace tiledb; * */ class ColumnBuffer { - inline static const size_t DEFAULT_ALLOC_BYTES = 1 << 26; // 64 MiB + inline static const size_t DEFAULT_ALLOC_BYTES = 1 << 27; // 128 MiB inline static const std::string CONFIG_KEY_INIT_BYTES = - "soma.init_buffer_bytes"; + "vcf.init_buffer_bytes"; public: //===================================================================