Skip to content

Commit 3e7e9a1

Browse files
Reduce memory usage and improve perf when reading VCF headers (#631) (#632)
Co-authored-by: George Powley <[email protected]>
1 parent 7c10e3a commit 3e7e9a1

File tree

2 files changed

+79
-146
lines changed

2 files changed

+79
-146
lines changed

Diff for: libtiledbvcf/src/dataset/tiledbvcfdataset.cc

+77-144
Original file line numberDiff line numberDiff line change
@@ -1030,8 +1030,8 @@ std::shared_ptr<tiledb::Array> TileDBVCFDataset::open_data_array(
10301030
std::unordered_map<uint32_t, SafeBCFHdr> TileDBVCFDataset::fetch_vcf_headers_v4(
10311031
const std::vector<SampleAndId>& samples,
10321032
std::unordered_map<std::string, size_t>* lookup_map,
1033-
const bool all_samples,
1034-
const bool first_sample) const {
1033+
bool all_samples,
1034+
bool first_sample) const {
10351035
LOG_DEBUG(
10361036
"[fetch_vcf_headers_v4] start all_samples={} first_sample={} "
10371037
"(VmRSS = {})",
@@ -1062,175 +1062,108 @@ std::unordered_map<uint32_t, SafeBCFHdr> TileDBVCFDataset::fetch_vcf_headers_v4(
10621062
throw std::runtime_error(
10631063
"Cannot fetch TileDB-VCF vcf headers; Array object unexpectedly null");
10641064

1065-
Query query(*ctx_, *vcf_header_array_);
1065+
// Use a managed query to fetch the headers. Note that the previous
1066+
// implementation used the TILEDB_ROW_MAJOR layout to read the results in
1067+
// sorted order. However, reading results in sorted order is not required
1068+
// because we are adding the results to a map. The managed query uses the
1069+
// TILEDB_UNORDERED layout which is more efficient.
1070+
auto mq =
1071+
std::make_unique<ManagedQuery>(vcf_header_array_, "fetch_vcf_headers_v4");
1072+
mq->select_columns({"sample", "header"});
10661073

1074+
// By default all samples are read.
10671075
if (!samples.empty()) {
1068-
// If all samples but we have a sample list we know its sorted and can use
1069-
// the min/max
1070-
if (all_samples) {
1071-
query.add_range(
1072-
0, samples[0].sample_name, samples[samples.size() - 1].sample_name);
1073-
} else {
1074-
for (const auto& sample : samples) {
1075-
query.add_range(0, sample.sample_name, sample.sample_name);
1076-
}
1076+
// If samples are provided, only read those samples
1077+
for (const auto& sample : samples) {
1078+
mq->select_point("sample", sample.sample_name);
10771079
}
1078-
} else if (all_samples) {
1079-
// When no samples are passed grab the first one
1080-
auto non_empty_domain = vcf_header_array_->non_empty_domain_var(0);
1081-
if (!non_empty_domain.first.empty())
1082-
query.add_range(0, non_empty_domain.first, non_empty_domain.second);
10831080
} else if (first_sample) {
1084-
// When no samples are passed grab the first one
1081+
// Only read the first sample, based on the non-empty domain. We handle the
1082+
// case where the first sample was deleted below.
10851083
auto non_empty_domain = vcf_header_array_->non_empty_domain_var(0);
1086-
if (!non_empty_domain.first.empty())
1087-
query.add_range(0, non_empty_domain.first, non_empty_domain.first);
1084+
if (!non_empty_domain.first.empty()) {
1085+
mq->select_point("sample", non_empty_domain.first);
1086+
} else if (non_empty_domain.second.empty()) {
1087+
// If the lower and upper bounds of the non-empty domain are empty, then
1088+
// the array is empty or the array contains an annotation VCF with an
1089+
// empty sample name. In either case, we will read the VCF headers for
1090+
// "all" samples to avoid an infinite loop looking for the first sample.
1091+
first_sample = false;
1092+
}
10881093
}
1089-
query.set_layout(TILEDB_ROW_MAJOR);
1090-
1091-
uint64_t header_offset_element = 0;
1092-
uint64_t header_data_element = 0;
1093-
uint64_t sample_offset_element = 0;
1094-
uint64_t sample_data_element = 0;
1095-
#if TILEDB_VERSION_MAJOR == 2 && TILEDB_VERSION_MINOR < 2
1096-
std::pair<uint64_t, uint64_t> header_est_size =
1097-
query.est_result_size_var("header");
1098-
header_offset_element =
1099-
std::max(header_est_size.first, static_cast<uint64_t>(1));
1100-
header_data_element =
1101-
std::max(header_est_size.second / sizeof(char), static_cast<uint64_t>(1));
1102-
1103-
// Sample estimate
1104-
std::pair<uint64_t, uint64_t> sample_est_size =
1105-
query.est_result_size_var("sample");
1106-
sample_offset_element =
1107-
std::max(sample_est_size.first, static_cast<uint64_t>(1));
1108-
sample_data_element =
1109-
std::max(sample_est_size.second / sizeof(char), static_cast<uint64_t>(1));
1110-
#else
1111-
std::array<uint64_t, 2> header_est_size = query.est_result_size_var("header");
1112-
header_offset_element =
1113-
std::max(header_est_size[0] / sizeof(uint64_t), static_cast<uint64_t>(1));
1114-
header_data_element =
1115-
std::max(header_est_size[1] / sizeof(char), static_cast<uint64_t>(1));
1116-
1117-
// Sample estimate
1118-
std::array<uint64_t, 2> sample_est_size = query.est_result_size_var("sample");
1119-
sample_offset_element =
1120-
std::max(sample_est_size[0] / sizeof(uint64_t), static_cast<uint64_t>(1));
1121-
sample_data_element =
1122-
std::max(sample_est_size[1] / sizeof(char), static_cast<uint64_t>(1));
1123-
#endif
1124-
1125-
std::vector<uint64_t> offsets(header_offset_element);
1126-
std::vector<char> data(header_data_element);
1127-
std::vector<uint64_t> sample_offsets(sample_offset_element);
1128-
std::vector<char> sample_data(sample_data_element);
1129-
LOG_DEBUG(
1130-
"[fetch_vcf_headers_v4] allocate done (VmRSS = {})",
1131-
utils::memory_usage_str());
11321094

1133-
Query::Status status;
11341095
uint32_t sample_idx = 0;
1096+
while (!mq->is_complete()) {
1097+
mq->submit();
1098+
auto num_rows = mq->results()->num_rows();
1099+
1100+
// Handle the case where the first sample was deleted - create a new
1101+
// query that reads all samples and exit the loop after processing the
1102+
// first sample.
1103+
if (first_sample && num_rows == 0) {
1104+
LOG_DEBUG(
1105+
"[fetch_vcf_headers_v4] the first sample was deleted, resubmitting");
1106+
mq = std::make_unique<ManagedQuery>(
1107+
vcf_header_array_, "fetch_vcf_headers_v4");
1108+
mq->select_columns({"sample", "header"});
1109+
}
11351110

1136-
do {
1137-
// Always reset buffer to avoid issue with core library and REST not using
1138-
// original buffer sizes
1139-
query.set_buffer("header", offsets, data);
1140-
query.set_buffer("sample", sample_offsets, sample_data);
1141-
1142-
status = query.submit();
1143-
1144-
LOG_DEBUG(
1145-
"[fetch_vcf_headers_v4] query done (VmRSS = {})",
1146-
utils::memory_usage_str());
1147-
1148-
auto result_el = query.result_buffer_elements();
1149-
uint64_t num_offsets = result_el["header"].first;
1150-
uint64_t num_chars = result_el["header"].second;
1151-
uint64_t num_samples_offsets = result_el["sample"].first;
1152-
uint64_t num_samples_chars = result_el["sample"].second;
1153-
1154-
bool has_results = num_chars != 0;
1155-
1156-
if (status == Query::Status::INCOMPLETE && !has_results) {
1157-
// If there are no results, double the size of the buffer and then
1158-
// resubmit the query.
1159-
1160-
if (num_chars == 0)
1161-
data.resize(data.size() * 2);
1162-
1163-
if (num_offsets == 0)
1164-
offsets.resize(offsets.size() * 2);
1165-
1166-
if (num_samples_chars == 0)
1167-
sample_data.resize(sample_data.size() * 2);
1168-
1169-
if (num_samples_offsets == 0)
1170-
sample_offsets.resize(sample_offsets.size() * 2);
1171-
1172-
} else if (has_results) {
1173-
// Parse the samples.
1174-
1175-
for (size_t offset_idx = 0; offset_idx < num_offsets; ++offset_idx) {
1176-
// Get sample
1177-
char* sample_beg = sample_data.data() + sample_offsets[offset_idx];
1178-
uint64_t sample_end = offset_idx == num_samples_offsets - 1 ?
1179-
num_samples_chars :
1180-
sample_offsets[offset_idx + 1];
1181-
uint64_t sample_size = sample_end - sample_offsets[offset_idx];
1182-
std::string sample(sample_beg, sample_size);
1183-
1184-
char* beg_hdr = data.data() + offsets[offset_idx];
1185-
uint64_t end =
1186-
offset_idx == num_offsets - 1 ? num_chars : offsets[offset_idx + 1];
1187-
uint64_t start = offsets[offset_idx];
1188-
uint64_t hdr_size = end - start;
1111+
for (unsigned int i = 0; i < num_rows; i++) {
1112+
// Create a string from the string_view to guarantee null termination
1113+
// as required by htslib APIs.
1114+
auto hdr_str = std::string(mq->string_view("header", i));
1115+
auto sample = std::string(mq->string_view("sample", i));
11891116

1190-
std::string hdr_str(beg_hdr, hdr_size);
1117+
bcf_hdr_t* hdr = bcf_hdr_init("r");
1118+
if (!hdr) {
1119+
throw std::runtime_error(
1120+
"Error fetching VCF header data; error allocating VCF header.");
1121+
}
11911122

1192-
bcf_hdr_t* hdr = bcf_hdr_init("r");
1193-
if (!hdr)
1194-
throw std::runtime_error(
1195-
"Error fetching VCF header data; error allocating VCF header.");
1123+
if (0 != bcf_hdr_parse(hdr, const_cast<char*>(hdr_str.c_str()))) {
1124+
throw std::runtime_error(
1125+
"TileDBVCFDataset::fetch_vcf_headers_v4: Error parsing the BCF "
1126+
"header for sample " +
1127+
sample + ".");
1128+
}
11961129

1197-
if (0 != bcf_hdr_parse(hdr, const_cast<char*>(hdr_str.c_str()))) {
1130+
if (!sample.empty()) {
1131+
if (0 != bcf_hdr_add_sample(hdr, sample.c_str())) {
11981132
throw std::runtime_error(
1199-
"TileDBVCFDataset::fetch_vcf_headers_v4: Error parsing the BCF "
1200-
"header for sample " +
1133+
"TileDBVCFDataset::fetch_vcf_headers_v4: Error adding sample "
1134+
"to "
1135+
"BCF header for sample " +
12011136
sample + ".");
12021137
}
1138+
}
12031139

1204-
if (!sample.empty()) {
1205-
if (0 != bcf_hdr_add_sample(hdr, sample.c_str())) {
1206-
throw std::runtime_error(
1207-
"TileDBVCFDataset::fetch_vcf_headers_v4: Error adding sample "
1208-
"to "
1209-
"BCF header for sample " +
1210-
sample + ".");
1211-
}
1212-
}
1140+
if (bcf_hdr_sync(hdr) < 0) {
1141+
throw std::runtime_error(
1142+
"Error in bcftools: failed to update VCF header.");
1143+
}
12131144

1214-
if (bcf_hdr_sync(hdr) < 0)
1215-
throw std::runtime_error(
1216-
"Error in bcftools: failed to update VCF header.");
1145+
result.emplace(
1146+
std::make_pair(sample_idx, SafeBCFHdr(hdr, bcf_hdr_destroy)));
1147+
if (lookup_map != nullptr) {
1148+
(*lookup_map)[sample] = sample_idx;
1149+
}
12171150

1218-
result.emplace(
1219-
std::make_pair(sample_idx, SafeBCFHdr(hdr, bcf_hdr_destroy)));
1220-
if (lookup_map != nullptr)
1221-
(*lookup_map)[sample] = sample_idx;
1151+
++sample_idx;
12221152

1223-
++sample_idx;
1153+
// Exit the query loop if we only want the first sample.
1154+
if (first_sample) {
1155+
goto done;
12241156
}
12251157
}
1226-
} while (status == Query::Status::INCOMPLETE);
1158+
}
12271159

1160+
done:
12281161
if (tiledb_stats_enabled_)
12291162
tiledb::Stats::enable();
12301163
LOG_DEBUG(
12311164
"[fetch_vcf_headers_v4] done (VmRSS = {})", utils::memory_usage_str());
12321165
return result;
1233-
} // namespace vcf
1166+
}
12341167

12351168
std::unordered_map<uint32_t, SafeBCFHdr> TileDBVCFDataset::fetch_vcf_headers(
12361169
const std::vector<SampleAndId>& samples) const {

Diff for: libtiledbvcf/src/stats/column_buffer.h

+2-2
Original file line numberDiff line numberDiff line change
@@ -47,9 +47,9 @@ using namespace tiledb;
4747
*
4848
*/
4949
class ColumnBuffer {
50-
inline static const size_t DEFAULT_ALLOC_BYTES = 1 << 26; // 64 MiB
50+
inline static const size_t DEFAULT_ALLOC_BYTES = 1 << 27; // 128 MiB
5151
inline static const std::string CONFIG_KEY_INIT_BYTES =
52-
"soma.init_buffer_bytes";
52+
"vcf.init_buffer_bytes";
5353

5454
public:
5555
//===================================================================

0 commit comments

Comments
 (0)