diff --git a/include/pingcap/coprocessor/Client.h b/include/pingcap/coprocessor/Client.h index 945d681..ed85698 100644 --- a/include/pingcap/coprocessor/Client.h +++ b/include/pingcap/coprocessor/Client.h @@ -5,7 +5,6 @@ #include #include -#include #include #include #include diff --git a/include/pingcap/kv/Backoff.h b/include/pingcap/kv/Backoff.h index 4019767..19839df 100644 --- a/include/pingcap/kv/Backoff.h +++ b/include/pingcap/kv/Backoff.h @@ -98,6 +98,7 @@ constexpr int prewriteMaxBackoff = 20000; constexpr int commitMaxBackoff = 41000; constexpr int splitRegionBackoff = 20000; constexpr int cleanupMaxBackoff = 20000; +constexpr int bgResolveLockMaxBackoff = 20000; constexpr int copBuildTaskMaxBackoff = 5000; constexpr int copNextMaxBackoff = 60000; constexpr int pessimisticLockMaxBackoff = 20000; diff --git a/include/pingcap/kv/Cluster.h b/include/pingcap/kv/Cluster.h index ec2eac0..c23870f 100644 --- a/include/pingcap/kv/Cluster.h +++ b/include/pingcap/kv/Cluster.h @@ -67,6 +67,8 @@ struct Cluster mpp_prober->stop(); if (region_cache) region_cache->stop(); + if (lock_resolver) + lock_resolver->stopBgResolve(); thread_pool->stop(); } diff --git a/include/pingcap/kv/LockResolver.h b/include/pingcap/kv/LockResolver.h index 1376347..08177e9 100644 --- a/include/pingcap/kv/LockResolver.h +++ b/include/pingcap/kv/LockResolver.h @@ -5,9 +5,11 @@ #include #include +#include #include #include #include +#include namespace pingcap { @@ -23,22 +25,14 @@ struct TxnStatus ::kvrpcpb::Action action; std::optional<::kvrpcpb::LockInfo> primary_lock; bool isCommitted() const { return ttl == 0 && commit_ts > 0; } + bool isRollback() const + { + return ttl == 0 && (action == kvrpcpb::Action::NoAction || action == kvrpcpb::Action::TTLExpireRollback || action == kvrpcpb::Action::LockNotExistRollback); + } bool isCacheable() const { - if (isCommitted()) - { - return true; - } - if (ttl == 0) - { - if (action == kvrpcpb::Action::NoAction || action == kvrpcpb::Action::LockNotExistRollback - || action == kvrpcpb::Action::TTLExpireRollback) - { - return true; - } - } - return false; + return isCommitted() || isRollback(); } }; @@ -213,6 +207,10 @@ class LockResolver cluster = cluster_; } + void backgroundResolve(); + void addPendingLocksForBgResolve(uint64_t caller_start_ts, const std::vector & locks); + void stopBgResolve(); + // resolveLocks tries to resolve Locks. The resolving process is in 3 steps: // 1) Use the `lockTTL` to pick up all expired locks. Only locks that are too // old are considered orphan locks and will be handled later. If all locks @@ -225,6 +223,9 @@ class LockResolver int64_t resolveLocks(Backoffer & bo, uint64_t caller_start_ts, std::vector & locks, std::vector & pushed); + // tryGetBypassLock checks the status of the transactions which own the locks in `locks`, and collect the txn ids which can be bypassed + void tryGetBypassLock(Backoffer & bo, uint64_t caller_start_ts, const std::unordered_map> & locks, std::vector & bypass_lock_ts); + int64_t resolveLocks( Backoffer & bo, uint64_t caller_start_ts, @@ -296,6 +297,12 @@ class LockResolver std::unordered_map resolved; std::queue cached; + // fields for background resolve + std::mutex bg_mutex; + std::condition_variable bg_cv; + std::atomic stopped{false}; + std::vector>> pending_locks; + Logger * log; }; diff --git a/src/kv/Cluster.cc b/src/kv/Cluster.cc index 70b6c90..906b0d7 100644 --- a/src/kv/Cluster.cc +++ b/src/kv/Cluster.cc @@ -42,6 +42,12 @@ void Cluster::startBackgroundTasks() region_cache->updateCachePeriodically(); }); } + if (lock_resolver) + { + thread_pool->enqueue([this] { + lock_resolver->backgroundResolve(); + }); + } } } // namespace kv diff --git a/src/kv/LockResolver.cc b/src/kv/LockResolver.cc index 40dc6ef..e947d67 100644 --- a/src/kv/LockResolver.cc +++ b/src/kv/LockResolver.cc @@ -1,7 +1,13 @@ #include +#include #include #include +#include +#include +#include +#include + namespace pingcap { namespace kv @@ -19,6 +25,69 @@ int64_t LockResolver::resolveLocks(Backoffer & bo, uint64_t caller_start_ts, std return resolveLocks(bo, caller_start_ts, locks, pushed, false); } +void LockResolver::tryGetBypassLock( + Backoffer & bo, + uint64_t caller_start_ts, + const std::unordered_map> & locks, + std::vector & bypass_lock_ts) +{ + try + { + bypass_lock_ts.reserve(locks.size()); + for (const auto & lock_entry : locks) + { + // should not happen, just for safety + if (lock_entry.second.empty()) + continue; + TxnStatus status; + try + { + status = getTxnStatusFromLock(bo, lock_entry.second[0], caller_start_ts, false); + } + catch (Exception & e) + { + log->warning("get txn status failed: " + e.displayText()); + // each txn is independent, so just continue to check other txns + continue; + } + + if (status.ttl == 0) + { + if ((status.primary_lock.has_value() && status.primary_lock->use_async_commit())) + { + // todo resolve async locks on the fly since the size of async locks are limited(less than 256), the resolve cost should be small + // once async locks is resolved, even if status.isCommmited() < caller_start_ts, it will not block tiflash's read + addPendingLocksForBgResolve(caller_start_ts, lock_entry.second); + continue; + } + if (status.isRollback() || (status.isCommitted() && status.commit_ts > caller_start_ts)) + { + // the lock can be bypassed if the txn is rolled back or committed after caller_start_ts + bypass_lock_ts.push_back(lock_entry.first); + } + if (status.isRollback() || status.isCommitted()) + { + // resolve lock in background threads if the status is determined + addPendingLocksForBgResolve(caller_start_ts, lock_entry.second); + } + } + else // status.ttl != 0 + { + if (status.action == ::kvrpcpb::MinCommitTSPushed) + { + // min_commit_ts is pushed, so the lock can be bypassed + bypass_lock_ts.push_back(lock_entry.first); + } + } + } + } + catch (...) + { + // tryGetBypassLock is just an optimization, should not throw any exception even fails + log->warning("tryGetBypassLock failed"); + } +} + int64_t LockResolver::resolveLocks( Backoffer & bo, uint64_t caller_start_ts, @@ -30,7 +99,6 @@ int64_t LockResolver::resolveLocks( if (locks.empty()) return before_txn_expired.value(); std::unordered_map> clean_txns; - bool push_fail = false; if (!for_write) { pushed.reserve(locks.size()); @@ -50,7 +118,6 @@ int64_t LockResolver::resolveLocks( { log->warning("get txn status failed: " + e.displayText()); before_txn_expired.update(0); - pushed.clear(); return before_txn_expired.value(); } @@ -97,7 +164,6 @@ int64_t LockResolver::resolveLocks( { log->warning("resolve txn failed: " + e.displayText()); before_txn_expired.update(0); - pushed.clear(); return before_txn_expired.value(); } } @@ -114,7 +180,6 @@ int64_t LockResolver::resolveLocks( if (lock->lock_type != ::kvrpcpb::PessimisticLock && lock->txn_id > caller_start_ts) { log->warning("write conflict detected"); - pushed.clear(); // TODO: throw write conflict exception throw Exception("write conflict", ErrorCodes::UnknownError); } @@ -123,7 +188,6 @@ int64_t LockResolver::resolveLocks( { if (status.action != ::kvrpcpb::MinCommitTSPushed) { - push_fail = true; break; } pushed.push_back(lock->txn_id); @@ -132,10 +196,6 @@ int64_t LockResolver::resolveLocks( break; } } - if (push_fail) - { - pushed.clear(); - } return before_txn_expired.value(); } @@ -536,6 +596,52 @@ TxnStatus LockResolver::getTxnStatusFromLock(Backoffer & bo, LockPtr lock, uint6 } } +void LockResolver::backgroundResolve() +{ + while (!stopped.load()) + { + std::vector>> to_resolve; + { + std::unique_lock lk(bg_mutex); + bg_cv.wait(lk, [this] { + return !pending_locks.empty() || stopped.load(); + }); + if (stopped.load()) + { + return; + } + pending_locks.swap(to_resolve); + } + + for (auto & lock_entry : to_resolve) + { + pingcap::kv::Backoffer bo(pingcap::kv::bgResolveLockMaxBackoff); + try + { + std::vector ignored; + resolveLocks(bo, lock_entry.first, lock_entry.second, ignored); + } + catch (...) + { + // ignore all errors, and do not retry. Let the next reader to trigger resolve again. + } + } + } +} + +void LockResolver::addPendingLocksForBgResolve(uint64_t caller_start_ts, const std::vector & locks) +{ + std::unique_lock lk(bg_mutex); + pending_locks.push_back({caller_start_ts, locks}); + bg_cv.notify_one(); +} + +void LockResolver::stopBgResolve() +{ + std::unique_lock lk(bg_mutex); + stopped.store(true); + bg_cv.notify_all(); +} } // namespace kv } // namespace pingcap