Skip to content

Commit

Permalink
Refactor transaction commit (#2468)
Browse files Browse the repository at this point in the history
### What problem does this PR solve?

- Add txn text in query context
- Update log

### Type of change

- [x] Refactoring

---------

Signed-off-by: Jin Hai <[email protected]>
  • Loading branch information
JinHai-CN authored Jan 17, 2025
1 parent c332e77 commit 197a243
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 11 deletions.
13 changes: 8 additions & 5 deletions src/main/query_context.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -422,13 +422,16 @@ QueryResult QueryContext::HandleAdminStatement(const AdminStatement *admin_state

void QueryContext::BeginTxn(const BaseStatement *base_statement) {
if (session_ptr_->GetTxn() == nullptr) {
bool is_checkpoint = base_statement != nullptr && base_statement->type_ == StatementType::kFlush;
Txn *new_txn = nullptr;
// TODO: more type check and setting
if (is_checkpoint) {
new_txn = storage_->txn_manager()->BeginTxn(nullptr, TransactionType::kCheckpoint);
if(base_statement == nullptr) {
new_txn = storage_->txn_manager()->BeginTxn(MakeUnique<String>(""), TransactionType::kNormal);
} else {
new_txn = storage_->txn_manager()->BeginTxn(nullptr, TransactionType::kNormal);
// TODO: more type check and setting
if (base_statement->type_ == StatementType::kFlush) {
new_txn = storage_->txn_manager()->BeginTxn(MakeUnique<String>(base_statement->ToString()), TransactionType::kCheckpoint);
} else {
new_txn = storage_->txn_manager()->BeginTxn(MakeUnique<String>(base_statement->ToString()), TransactionType::kNormal);
}
}
session_ptr_->SetTxn(new_txn);
}
Expand Down
2 changes: 1 addition & 1 deletion src/main/query_context.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,7 @@ public:

inline u64 GetNextNodeID() { return ++current_max_node_id_; }

void BeginTxn(const BaseStatement *statement = nullptr);
void BeginTxn(const BaseStatement *statement);

TxnTimeStamp CommitTxn();

Expand Down
5 changes: 5 additions & 0 deletions src/storage/txn/txn.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -540,6 +540,11 @@ TxnState Txn::GetTxnState() const {
return txn_context_ptr_->state_;
}

TransactionType Txn::GetTxnType() const {
std::shared_lock<std::shared_mutex> r_locker(rw_locker_);
return txn_context_ptr_->txn_type_;
}

bool Txn::IsWriteTransaction() const { return txn_context_ptr_->is_write_transaction_; }

void Txn::SetTxnRollbacking(TxnTimeStamp rollback_ts) {
Expand Down
2 changes: 2 additions & 0 deletions src/storage/txn/txn.cppm
Original file line number Diff line number Diff line change
Expand Up @@ -206,6 +206,8 @@ public:

TxnState GetTxnState() const;

TransactionType GetTxnType() const;

bool IsWriteTransaction() const;

void SetTxnCommitted();
Expand Down
5 changes: 2 additions & 3 deletions src/storage/txn/txn_manager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,7 @@ Txn *TxnManager::BeginTxn(UniquePtr<String> txn_text, TransactionType txn_type)
// Record the start ts of the txn
TxnTimeStamp begin_ts = current_ts_ + 1;

bool ckp_txn = txn_type == TransactionType::kCheckpoint;
if (ckp_txn) {
if (txn_type == TransactionType::kCheckpoint) {
if (ckp_begin_ts_ != UNCOMMIT_TS) {
// not set ckp_begin_ts_ may not truncate the wal file.
LOG_WARN(fmt::format("Another checkpoint txn is started in {}, new checkpoint {} will do nothing", ckp_begin_ts_, begin_ts));
Expand Down Expand Up @@ -395,7 +394,7 @@ void TxnManager::CleanupTxn(Txn *txn, bool commit) {
bool TxnManager::InCheckpointProcess(TxnTimeStamp commit_ts) {
std::lock_guard guard(locker_);
if (commit_ts > ckp_begin_ts_) {
LOG_TRACE(fmt::format("Full checkpoint begin in {}, cur txn commit_ts: {}, swap to new wal file", ckp_begin_ts_, commit_ts));
LOG_TRACE(fmt::format("Full/Delta checkpoint begin at {}, cur txn commit_ts: {}, swap to new wal file", ckp_begin_ts_, commit_ts));
ckp_begin_ts_ = UNCOMMIT_TS;
return true;
}
Expand Down
4 changes: 2 additions & 2 deletions src/unit_test/test_helper/sql_runner.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -80,14 +80,14 @@ SharedPtr<DataTable> SQLRunner::Run(const String &sql_text, bool print) {
UnrecoverableError(parsed_result->error_message_);
}

query_context_ptr->BeginTxn();

// LogicalPlanner logical_planner(query_context_ptr.get());
// Optimizer optimizer(query_context_ptr.get());
// PhysicalPlanner physical_planner(query_context_ptr.get());
// FragmentBuilder fragment_builder(query_context_ptr.get());
BaseStatement *statement = (*parsed_result->statements_ptr_)[0];

query_context_ptr->BeginTxn(statement);

SharedPtr<BindContext> bind_context;
query_context_ptr->logical_planner()->Build(statement, bind_context);
query_context_ptr->set_max_node_id(bind_context->GetNewLogicalNodeId());
Expand Down

0 comments on commit 197a243

Please sign in to comment.