diff --git a/src/storage/invertedindex/column_inverter.cpp b/src/storage/invertedindex/column_inverter.cpp index 50d1e1c2c2..9550e8f82c 100644 --- a/src/storage/invertedindex/column_inverter.cpp +++ b/src/storage/invertedindex/column_inverter.cpp @@ -60,8 +60,9 @@ SizeT ColumnInverter::InvertColumn(SharedPtr column_vector, u32 ro SizeT term_count_sum = 0; for (SizeT i = 0; i < row_count; ++i) { String data = column_vector->ToString(row_offset + i); - if (data.empty()) + if (data.empty()) { continue; + } SizeT term_count = InvertColumn(begin_doc_id + i, data); column_lengths[i] = term_count; term_count_sum += term_count; @@ -246,8 +247,9 @@ void ColumnInverter::SortForOfflineDump() { // Data within each group void ColumnInverter::SpillSortResults(FILE *spill_file, u64 &tuple_count) { // spill sort results for external merge sort - if (positions_.empty()) + if (positions_.empty()) { return; + } // size of this Run in bytes u32 data_size = 0; u64 data_size_pos = ftell(spill_file); diff --git a/src/storage/invertedindex/common/external_sort_merger.cppm b/src/storage/invertedindex/common/external_sort_merger.cppm index 0142cc3b14..1217f04e90 100644 --- a/src/storage/invertedindex/common/external_sort_merger.cppm +++ b/src/storage/invertedindex/common/external_sort_merger.cppm @@ -169,6 +169,44 @@ struct KeyAddress 0; } }; +template +struct KeyAddress { + char *data{nullptr}; + u64 addr; + u32 idx; + + KeyAddress(char *p, u64 ad, u32 i) { + data = p; + addr = ad; + idx = i; + } + + KeyAddress() { + data = nullptr; + addr = -1; + idx = -1; + } + + TermTuple KEY() { return TermTuple(data + sizeof(LenType), LEN()); } + TermTuple KEY() const { return TermTuple(data + sizeof(LenType), LEN()); } + LenType LEN() const { return *(LenType *)data; } + u64 &ADDR() { return addr; } + u64 ADDR() const { return addr; } + u32 IDX() const { return idx; } + u32 &IDX() { return idx; } + + int Compare(const KeyAddress &p) const { + return KEY().Compare(p.KEY()); + } + + bool operator==(const KeyAddress &other) const { return Compare(other) == 0; } + + bool operator>(const KeyAddress &other) const { return Compare(other) < 0; } + + bool operator<(const KeyAddress &other) const { return Compare(other) > 0; } +}; + + export template class SortMerger { typedef SortMerger self_t; diff --git a/src/storage/invertedindex/memory_indexer.cpp b/src/storage/invertedindex/memory_indexer.cpp index d80376f3e5..2a43a996f2 100644 --- a/src/storage/invertedindex/memory_indexer.cpp +++ b/src/storage/invertedindex/memory_indexer.cpp @@ -91,8 +91,9 @@ MemoryIndexer::~MemoryIndexer() { } void MemoryIndexer::Insert(SharedPtr column_vector, u32 row_offset, u32 row_count, bool offline) { - if (is_spilled_) + if (is_spilled_) { Load(); + } u64 seq_inserted(0); u32 doc_count(0); @@ -121,8 +122,9 @@ void MemoryIndexer::Insert(SharedPtr column_vector, u32 row_offset auto func = [this, task, inverter](int id) { SizeT column_length_sum = inverter->InvertColumn(task->column_vector_, task->row_offset_, task->row_count_, task->start_doc_id_); column_length_sum_ += column_length_sum; - if (column_length_sum > 0) + if (column_length_sum > 0) { inverter->SortForOfflineDump(); + } this->ring_sorted_.Put(task->task_seq_, inverter); }; inverting_thread_pool_.push(std::move(func)); @@ -145,8 +147,9 @@ void MemoryIndexer::Insert(SharedPtr column_vector, u32 row_offset } void MemoryIndexer::InsertGap(u32 row_count) { - if (is_spilled_) + if (is_spilled_) { Load(); + } std::unique_lock lock(mutex_); doc_count_ += row_count; @@ -155,14 +158,16 @@ void MemoryIndexer::InsertGap(u32 row_count) { void MemoryIndexer::Commit(bool offline) { if (offline) { commiting_thread_pool_.push([this](int id) { this->CommitOffline(); }); - } else + } else { commiting_thread_pool_.push([this](int id) { this->CommitSync(); }); + } } SizeT MemoryIndexer::CommitOffline(SizeT wait_if_empty_ms) { std::unique_lock lock(mutex_commit_, std::defer_lock); - if (!lock.try_lock()) + if (!lock.try_lock()) { return 0; + } if (nullptr == spill_file_handle_) { PrepareSpillFile(); @@ -200,14 +205,16 @@ SizeT MemoryIndexer::CommitSync(SizeT wait_if_empty_ms) { }; std::unique_lock lock(mutex_commit_, std::defer_lock); - if (!lock.try_lock()) + if (!lock.try_lock()) { return 0; + } while (1) { this->ring_sorted_.GetBatch(inverters, wait_if_empty_ms); // num_merged = inverters.size(); - if (inverters.empty()) + if (inverters.empty()) { break; + } for (auto &inverter : inverters) { inverter->GeneratePosting(); num_generated += inverter->GetMerged(); @@ -353,8 +360,9 @@ void MemoryIndexer::OfflineDump() { // 2. Generate posting // 3. Dump disk segment data // LOG_INFO(fmt::format("MemoryIndexer::OfflineDump begin, num_runs_ {}", num_runs_)); - if (tuple_count_ == 0) + if (tuple_count_ == 0) { return; + } FinalSpillFile(); constexpr u32 buffer_size_of_each_run = 2 * 1024 * 1024; SortMerger *merger = new SortMerger(spill_full_path_.c_str(), num_runs_, buffer_size_of_each_run * num_runs_, 2);