diff --git a/src/storage/knn_index/knn_hnsw/data_store/lvq_vec_store.cppm b/src/storage/knn_index/knn_hnsw/data_store/lvq_vec_store.cppm index 7cde4c6a1c..638c6616ea 100644 --- a/src/storage/knn_index/knn_hnsw/data_store/lvq_vec_store.cppm +++ b/src/storage/knn_index/knn_hnsw/data_store/lvq_vec_store.cppm @@ -115,7 +115,8 @@ public: normalized = MakeUniqueForOverwrite(this->dim_); DataType norm = 0; for (SizeT j = 0; j < this->dim_; ++j) { - norm += src[j] * src[j]; + DataType x = src[j]; + norm += x * x; } norm = std::sqrt(norm); if (norm == 0) { diff --git a/src/storage/wal/catalog_delta_entry.cpp b/src/storage/wal/catalog_delta_entry.cpp index 126626f1dd..a0ed431d81 100644 --- a/src/storage/wal/catalog_delta_entry.cpp +++ b/src/storage/wal/catalog_delta_entry.cpp @@ -1275,7 +1275,7 @@ void GlobalCatalogDeltaEntry::AddDeltaEntryInner(CatalogDeltaEntry *delta_entry) bool found = iter != delta_ops_.end(); if (found) { CatalogDeltaOperation *op = iter->second.get(); - if (op->commit_ts_ < last_full_ckp_ts_) { + if (op->commit_ts_ <= last_full_ckp_ts_) { delta_ops_.erase(iter); found = false; } diff --git a/src/storage/wal/wal_entry.cpp b/src/storage/wal/wal_entry.cpp index e6a90d7201..48b20707e5 100644 --- a/src/storage/wal/wal_entry.cpp +++ b/src/storage/wal/wal_entry.cpp @@ -145,7 +145,7 @@ String WalBlockInfo::ToString() const { } } ss << "]"; - return ss.str(); + return std::move(ss).str(); } WalSegmentInfo::WalSegmentInfo(SegmentEntry *segment_entry) @@ -201,7 +201,7 @@ String WalSegmentInfo::ToString() const { ss << "segment_id: " << segment_id_ << ", column_count: " << column_count_ << ", row_count: " << row_count_ << ", actual_row_count: " << actual_row_count_ << ", row_capacity: " << row_capacity_; ss << ", block_info count: " << block_infos_.size() << std::endl; - return ss.str(); + return std::move(ss).str(); } WalChunkIndexInfo::WalChunkIndexInfo(ChunkIndexEntry *chunk_index_entry) @@ -297,7 +297,7 @@ String WalChunkIndexInfo::ToString() const { std::stringstream ss; ss << "chunk_id: " << chunk_id_ << ", base_name: " << base_name_ << ", base_rowid: " << base_rowid_.ToString() << ", row_count: " << row_count_ << ", deprecate_ts: " << deprecate_ts_; - return ss.str(); + return std::move(ss).str(); } SharedPtr WalCmd::ReadAdv(const char *&ptr, i32 max_bytes) { @@ -934,14 +934,14 @@ String WalCmdCreateDatabase::ToString() const { ss << "db name: " << db_name_ << std::endl; ss << "db dir: " << db_dir_tail_ << std::endl; ss << "db comment: " << db_comment_ << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalCmdDropDatabase::ToString() const { std::stringstream ss; ss << "Drop Database: " << std::endl; ss << "db name: " << db_name_ << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalCmdCreateTable::ToString() const { @@ -950,7 +950,7 @@ String WalCmdCreateTable::ToString() const { ss << "db name: " << db_name_ << std::endl; ss << "table name: " << table_def_->ToString() << std::endl; ss << "table dir: " << table_dir_tail_ << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalCmdDropTable::ToString() const { @@ -958,7 +958,7 @@ String WalCmdDropTable::ToString() const { ss << "Drop Table: " << std::endl; ss << "db name: " << db_name_ << std::endl; ss << "table name: " << table_name_ << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalCmdCreateIndex::ToString() const { @@ -967,7 +967,7 @@ String WalCmdCreateIndex::ToString() const { ss << "db name: " << db_name_ << std::endl; ss << "table name: " << table_name_ << std::endl; ss << "index def: " << index_base_->ToString() << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalCmdDropIndex::ToString() const { @@ -976,7 +976,7 @@ String WalCmdDropIndex::ToString() const { ss << "db name: " << db_name_ << std::endl; ss << "table name: " << table_name_ << std::endl; ss << "index name: " << index_name_ << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalCmdImport::ToString() const { @@ -986,7 +986,7 @@ String WalCmdImport::ToString() const { ss << "table name: " << table_name_ << std::endl; auto &segment_info = segment_info_; ss << segment_info.ToString() << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalCmdAppend::ToString() const { @@ -995,7 +995,7 @@ String WalCmdAppend::ToString() const { ss << "db name: " << db_name_ << std::endl; ss << "table name: " << table_name_ << std::endl; ss << block_->ToBriefString(); - return ss.str(); + return std::move(ss).str(); } String WalCmdDelete::ToString() const { @@ -1004,7 +1004,7 @@ String WalCmdDelete::ToString() const { ss << "db name: " << db_name_ << std::endl; ss << "table name: " << table_name_ << std::endl; ss << "delete row cout: " << row_ids_.size() << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalCmdSetSegmentStatusSealed::ToString() const { @@ -1013,7 +1013,7 @@ String WalCmdSetSegmentStatusSealed::ToString() const { ss << "db name: " << db_name_ << std::endl; ss << "table name: " << table_name_ << std::endl; ss << "segment id: " << segment_id_ << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalCmdUpdateSegmentBloomFilterData::ToString() const { @@ -1022,7 +1022,7 @@ String WalCmdUpdateSegmentBloomFilterData::ToString() const { ss << "db name: " << db_name_ << std::endl; ss << "table name: " << table_name_ << std::endl; ss << "segment id: " << segment_id_ << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalCmdCheckpoint::ToString() const { @@ -1031,7 +1031,7 @@ String WalCmdCheckpoint::ToString() const { ss << "catalog path: " << fmt::format("{}/{}", catalog_path_, catalog_name_) << std::endl; ss << "max commit ts: " << max_commit_ts_ << std::endl; ss << "is full checkpoint: " << is_full_checkpoint_ << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalCmdCompact::ToString() const { @@ -1049,7 +1049,7 @@ String WalCmdCompact::ToString() const { ss << new_seg_info.ToString() << " | "; } ss << std::endl; - return String(); + return std::move(ss).str(); } String WalCmdOptimize::ToString() const { @@ -1063,7 +1063,7 @@ String WalCmdOptimize::ToString() const { ss << param_ptr->ToString() << " | "; } ss << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalCmdDumpIndex::ToString() const { @@ -1082,7 +1082,7 @@ String WalCmdDumpIndex::ToString() const { ss << chunk_id << " | "; } ss << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalCmdRenameTable::ToString() const { @@ -1091,7 +1091,7 @@ String WalCmdRenameTable::ToString() const { ss << "db name: " << db_name_ << std::endl; ss << "table name: " << table_name_ << std::endl; ss << "new table name: " << new_table_name_ << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalCmdAddColumns::ToString() const { @@ -1103,7 +1103,7 @@ String WalCmdAddColumns::ToString() const { for (auto &column_def : column_defs_) { ss << column_def->ToString() << " | "; } - return ss.str(); + return std::move(ss).str(); } String WalCmdDropColumns::ToString() const { @@ -1115,7 +1115,7 @@ String WalCmdDropColumns::ToString() const { for (auto &column_name : column_names_) { ss << column_name << " | "; } - return ss.str(); + return std::move(ss).str(); } String WalCmdCreateDatabase::CompactInfo() const { @@ -1230,7 +1230,7 @@ String WalCmdDumpIndex::CompactInfo() const { segment_id_, ss.str()); - return ss.str(); + return std::move(ss).str(); } String WalCmdRenameTable::CompactInfo() const { @@ -1388,7 +1388,7 @@ String WalEntry::ToString() const { ss << cmd->ToString(); } ss << "========================" << std::endl; - return ss.str(); + return std::move(ss).str(); } String WalEntry::CompactInfo() const { @@ -1401,7 +1401,7 @@ String WalEntry::CompactInfo() const { if (cmds_.size() > 0) { ss << cmds_.back()->CompactInfo(); } - return ss.str(); + return std::move(ss).str(); } String WalCmd::WalCommandTypeToString(WalCommandType type) { diff --git a/src/unit_test/storage/wal/catalog_delta_entry.cpp b/src/unit_test/storage/wal/catalog_delta_entry.cpp index 50b1689415..b7baaac2d3 100644 --- a/src/unit_test/storage/wal/catalog_delta_entry.cpp +++ b/src/unit_test/storage/wal/catalog_delta_entry.cpp @@ -375,6 +375,10 @@ TEST_P(CatalogDeltaEntryTest, MergeEntries) { local_catalog_delta_entry->operations().push_back(std::move(op)); } + for (auto &op : local_catalog_delta_entry->operations()) { + op->commit_ts_ = 1; + } + // merge global_catalog_delta_entry->ReplayDeltaEntry(std::move(local_catalog_delta_entry)); // check ops diff --git a/tools/parse_compact_log.py b/tools/parse_compact_log.py new file mode 100644 index 0000000000..0f124d1868 --- /dev/null +++ b/tools/parse_compact_log.py @@ -0,0 +1,127 @@ +import re + + +def parse_log1(log_file: str) -> tuple[list[int], list[int]]: + # line example: [12:56:58.986] [815716] [info] Compact commit: test_compact2, new segment: 1492, old segment: 1473 1474 1475 1476 + lines = [] + with open(log_file, "r") as f: + for line in f: + if "Compact commit: " in line: + lines.append(line) + remove_segs = set() + add_segs = set() + re_pattern = re.compile( + r".*Compact commit: (\w+), new segment: (\d+), old segment: (.*)" + ) + for line in lines: + m = re_pattern.match(line) + if not m: + print("No match") + + grp = m.groups() + table_name = grp[0] + new_segment = int(grp[1]) + old_segments = list(map(int, grp[2].split(" ")[:-1])) + # print(f"Compact {old_segments} -> {new_segment}") + + add_segs.add(new_segment) + for old_segment in old_segments: + if old_segment in remove_segs: + raise ValueError( + f"Old segment {old_segment} has already been compacted" + ) + if old_segment in add_segs: + add_segs.remove(old_segment) + remove_segs.add(old_segment) + + remove_segs = list(remove_segs) + add_segs = list(add_segs) + remove_segs.sort() + add_segs.sort() + return remove_segs, add_segs + + +def parse_log2(log_file) -> tuple[list[int], list[int]]: + # line example: [11:08:19.727] [55245] [info] Read delta op: AddSegmentEntryOp begin_ts: 8142, txn_id: 4281, commit_ts: 8143, merge_flag: 2, encode: #default_db#test_compact2#3978 min_row_ts: 8143 max_row_ts: 8143 first_delete_ts: 18446744073709551615 row_capacity: 8388608 row_count: 3 actual_row_count: 3 column_count: 2 + # line2 example: + # [COMPACT] + # Compact: + # db name: default_db + # table name: test_compact2 + # deprecated segment: 49328 | 49329 | 49330 | 49331 | + # new segment: segment_id: 49333, column_count: 2, row_count: 12, actual_row_count: 12, row_capacity: 8388608, block_info count: 1 + + lines = [] + lines2 = [] + with open(log_file, "r") as f: + for line in f: + if "AddSegmentEntryOp" in line: + lines.append(line) + elif "[COMPACT]" in line: + concat_line = line + for i in range(5): + try: + concat_line += next(f) + except StopIteration: + break + concat_line = concat_line.replace("\n", " ") + lines2.append(concat_line) + remove_segs2 = set() + add_segs2 = set() + re_pattern = re.compile(r".*AddSegmentEntryOp.*merge_flag: (\d+), encode: (\S+)") + for line in lines: + m = re_pattern.match(line) + if not m: + print("No match") + continue + + grp = m.groups() + merge_flag = int(grp[0]) + encode = grp[1] + segment_id = int(encode.split("#")[-1]) + if merge_flag == 2: + add_segs2.add(segment_id) + elif merge_flag == 1: + if segment_id in add_segs2: + add_segs2.remove(segment_id) + remove_segs2.add(segment_id) + + re_pattern2 = re.compile(r".*deprecated segment: ([\d\| ]*).*segment_id: (\d+)") + for line2 in lines2: + # print(line2) + m = re_pattern2.match(line2) + if not m: + print("No match") + continue + + grp = m.groups() + old_segments = list(map(int, grp[0].split("|")[:-1])) + new_segment = int(grp[1]) + add_segs2.add(new_segment) + for old_segment in old_segments: + if old_segment in remove_segs2: + raise ValueError( + f"Old segment {old_segment} has already been compacted" + ) + else: + if old_segment in add_segs2: + add_segs2.remove(old_segment) + remove_segs2.add(old_segment) + + remove_segs2 = list(remove_segs2) + add_segs2 = list(add_segs2) + remove_segs2.sort() + add_segs2.sort() + return remove_segs2, add_segs2 + + +if __name__ == "__main__": + log_file1 = "restart_test.log.37" + remove_segs, add_segs = parse_log1(log_file1) + print("remove_segs", remove_segs) + print("add_segs", add_segs) + + log_file2 = "restart_test.log.38" + remove_segs2, add_segs2 = parse_log2(log_file2) + print("remove_segs2", remove_segs2) + print("add_segs2", add_segs2)