Skip to content

Commit c332e77

Browse files
authored
Use txn but not WAL entry in WAL flush thread (#2469)
### What problem does this PR solve? Use txn but not WAL entry in WAL flush thread ### Type of change - [x] Refactoring Signed-off-by: Jin Hai <[email protected]>
1 parent 04e93d0 commit c332e77

File tree

4 files changed

+25
-27
lines changed

4 files changed

+25
-27
lines changed

src/storage/txn/txn_manager.cpp

+6-6
Original file line numberDiff line numberDiff line change
@@ -194,27 +194,27 @@ void TxnManager::SendToWAL(Txn *txn) {
194194

195195
std::lock_guard guard(locker_);
196196
if (wait_conflict_ck_.empty()) {
197-
String error_message = fmt::format("WalManager::PutEntry wait_conflict_ck_ is empty, txn->CommitTS() {}", txn->CommitTS());
197+
String error_message = fmt::format("TxnManager::SendToWAL wait_conflict_ck_ is empty, txn->CommitTS() {}", txn->CommitTS());
198198
UnrecoverableError(error_message);
199199
}
200200
if (wait_conflict_ck_.begin()->first > commit_ts) {
201-
String error_message = fmt::format("WalManager::PutEntry wait_conflict_ck_.begin()->first {} > txn->CommitTS() {}",
201+
String error_message = fmt::format("TxnManager::SendToWAL wait_conflict_ck_.begin()->first {} > txn->CommitTS() {}",
202202
wait_conflict_ck_.begin()->first,
203203
txn->CommitTS());
204204
UnrecoverableError(error_message);
205205
}
206206
if (wal_entry) {
207-
wait_conflict_ck_.at(commit_ts) = wal_entry;
207+
wait_conflict_ck_.at(commit_ts) = txn;
208208
} else {
209209
wait_conflict_ck_.erase(commit_ts); // rollback
210210
}
211211
if (!wait_conflict_ck_.empty() && wait_conflict_ck_.begin()->second != nullptr) {
212-
Vector<WalEntry *> wal_entries;
212+
Vector<Txn *> txn_array;
213213
do {
214-
wal_entries.push_back(wait_conflict_ck_.begin()->second);
214+
txn_array.push_back(wait_conflict_ck_.begin()->second);
215215
wait_conflict_ck_.erase(wait_conflict_ck_.begin());
216216
} while (!wait_conflict_ck_.empty() && wait_conflict_ck_.begin()->second != nullptr);
217-
wal_mgr_->PutEntries(wal_entries);
217+
wal_mgr_->SubmitTxn(txn_array);
218218
}
219219
}
220220

src/storage/txn/txn_manager.cppm

+1-1
Original file line numberDiff line numberDiff line change
@@ -123,7 +123,7 @@ private:
123123
Map<TxnTimeStamp, Txn *> committing_txns_; // the txns in committing stage
124124
Set<TxnTimeStamp> checking_ts_{}; // the begin ts of txn that is used to check conflict
125125

126-
Map<TxnTimeStamp, WalEntry *> wait_conflict_ck_{}; // sorted by commit ts
126+
Map<TxnTimeStamp, Txn *> wait_conflict_ck_{}; // sorted by commit ts
127127

128128
Atomic<TxnTimeStamp> current_ts_{}; // The next txn ts
129129
Atomic<TxnTimeStamp> max_committed_ts_{};

src/storage/wal/wal_manager.cpp

+16-16
Original file line numberDiff line numberDiff line change
@@ -150,13 +150,11 @@ void WalManager::Stop() {
150150
LOG_INFO("WAL manager is stopped.");
151151
}
152152

153-
// Session request to persist an entry. Assuming txn_id of the entry has
154-
// been initialized.
155-
void WalManager::PutEntries(Vector<WalEntry *> wal_entries) {
153+
void WalManager::SubmitTxn(Vector<Txn *> &txn_array) {
156154
if (!running_.load()) {
157155
return;
158156
}
159-
wait_flush_.EnqueueBulk(wal_entries);
157+
wait_flush_.EnqueueBulk(txn_array);
160158
}
161159

162160
TxnTimeStamp WalManager::LastCheckpointTS() const {
@@ -292,24 +290,27 @@ Vector<SharedPtr<String>> WalManager::GetDiffWalEntryString(TxnTimeStamp start_t
292290
void WalManager::Flush() {
293291
LOG_TRACE("WalManager::Flush log mainloop begin");
294292

295-
Deque<WalEntry *> log_batch{};
293+
Deque<Txn *> txn_batch{};
296294
TxnManager *txn_mgr = storage_->txn_manager();
297295
ClusterManager *cluster_manager = nullptr;
298296
while (running_.load()) {
299-
wait_flush_.DequeueBulk(log_batch);
300-
if (log_batch.empty()) {
297+
wait_flush_.DequeueBulk(txn_batch);
298+
if (txn_batch.empty()) {
301299
LOG_WARN("WalManager::Dequeue empty batch logs");
302300
continue;
303301
}
304302

305-
for (const auto &entry : log_batch) {
306-
// Empty WalEntry (read-only transactions) shouldn't go into WalManager.
307-
if (entry == nullptr) {
303+
for (const auto &txn : txn_batch) {
304+
if (txn == nullptr) {
308305
// terminate entry
306+
LOG_INFO("Terminate WAL Manager flush thread");
309307
running_ = false;
310308
break;
311309
}
312310

311+
WalEntry *entry = txn->GetWALEntry();
312+
// Empty WalEntry (read-only transactions) shouldn't go into WalManager.
313+
313314
if (entry->cmds_.empty()) {
314315
continue;
315316
// UnrecoverableError(fmt::format("WalEntry of txn_id {} commands is empty", entry->txn_id_));
@@ -371,13 +372,12 @@ void WalManager::Flush() {
371372
cluster_manager->SyncLogs();
372373
}
373374

374-
for (const auto &entry : log_batch) {
375-
Txn *txn = txn_mgr->GetTxn(entry->txn_id_);
376-
if (txn != nullptr) {
377-
txn->CommitBottom();
378-
}
375+
// Commit bottom
376+
for (const auto &txn : txn_batch) {
377+
txn->CommitBottom();
378+
// TODO: if txn is checkpoint, swap WAL file
379379
}
380-
log_batch.clear();
380+
txn_batch.clear();
381381

382382
// Check if the wal file is too large, swap to a new one.
383383
try {

src/storage/wal/wal_manager.cppm

+2-4
Original file line numberDiff line numberDiff line change
@@ -79,9 +79,7 @@ public:
7979

8080
void Stop();
8181

82-
// Session request to persist an entry. Assuming txn_id of the entry has
83-
// been initialized.
84-
void PutEntries(Vector<WalEntry *> wal_entries);
82+
void SubmitTxn(Vector<Txn *> &txn_batch);
8583

8684
// Flush is scheduled regularly. It collects a batch of transactions, sync
8785
// wal and do parallel committing. Each sync cost ~1s. Each checkpoint cost
@@ -173,7 +171,7 @@ private:
173171
Thread flush_thread_{};
174172

175173
// TxnManager and Flush thread access following members
176-
BlockingQueue<WalEntry *> wait_flush_{"WalManager"};
174+
BlockingQueue<Txn *> wait_flush_{"WalManager"};
177175

178176
// Only Flush thread access following members
179177
std::ofstream ofs_{};

0 commit comments

Comments
 (0)