Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 0 additions & 1 deletion include/pingcap/coprocessor/Client.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@
#include <pingcap/kv/RegionClient.h>

#include <atomic>
#include <condition_variable>
#include <cstdint>
#include <mutex>
#include <thread>
Expand Down
1 change: 1 addition & 0 deletions include/pingcap/kv/Backoff.h
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,7 @@ constexpr int prewriteMaxBackoff = 20000;
constexpr int commitMaxBackoff = 41000;
constexpr int splitRegionBackoff = 20000;
constexpr int cleanupMaxBackoff = 20000;
constexpr int bgResolveLockMaxBackoff = 20000;
constexpr int copBuildTaskMaxBackoff = 5000;
constexpr int copNextMaxBackoff = 60000;
constexpr int pessimisticLockMaxBackoff = 20000;
Expand Down
2 changes: 2 additions & 0 deletions include/pingcap/kv/Cluster.h
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,8 @@ struct Cluster
mpp_prober->stop();
if (region_cache)
region_cache->stop();
if (lock_resolver)
lock_resolver->stopBgResolve();
thread_pool->stop();
}

Expand Down
33 changes: 20 additions & 13 deletions include/pingcap/kv/LockResolver.h
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@
#include <pingcap/Log.h>
#include <pingcap/kv/RegionCache.h>

#include <condition_variable>
#include <optional>
#include <queue>
#include <string>
#include <unordered_map>

namespace pingcap
{
Expand All @@ -23,22 +25,14 @@ struct TxnStatus
::kvrpcpb::Action action;
std::optional<::kvrpcpb::LockInfo> primary_lock;
bool isCommitted() const { return ttl == 0 && commit_ts > 0; }
bool isRollback() const
{
return ttl == 0 && (action == kvrpcpb::Action::NoAction || action == kvrpcpb::Action::TTLExpireRollback || action == kvrpcpb::Action::LockNotExistRollback);
}

bool isCacheable() const
{
if (isCommitted())
{
return true;
}
if (ttl == 0)
{
if (action == kvrpcpb::Action::NoAction || action == kvrpcpb::Action::LockNotExistRollback
|| action == kvrpcpb::Action::TTLExpireRollback)
{
return true;
}
}
return false;
return isCommitted() || isRollback();
}
};

Expand Down Expand Up @@ -213,6 +207,10 @@ class LockResolver
cluster = cluster_;
}

void backgroundResolve();
void addPendingLocksForBgResolve(uint64_t caller_start_ts, const std::vector<LockPtr> & locks);
void stopBgResolve();

// resolveLocks tries to resolve Locks. The resolving process is in 3 steps:
// 1) Use the `lockTTL` to pick up all expired locks. Only locks that are too
// old are considered orphan locks and will be handled later. If all locks
Expand All @@ -225,6 +223,9 @@ class LockResolver

int64_t resolveLocks(Backoffer & bo, uint64_t caller_start_ts, std::vector<LockPtr> & locks, std::vector<uint64_t> & pushed);

// tryGetBypassLock checks the status of the transactions which own the locks in `locks`, and collect the txn ids which can be bypassed
void tryGetBypassLock(Backoffer & bo, uint64_t caller_start_ts, const std::unordered_map<uint64_t, std::vector<LockPtr>> & locks, std::vector<uint64_t> & bypass_lock_ts);

int64_t resolveLocks(
Backoffer & bo,
uint64_t caller_start_ts,
Expand Down Expand Up @@ -296,6 +297,12 @@ class LockResolver
std::unordered_map<int64_t, TxnStatus> resolved;
std::queue<int64_t> cached;

// fields for background resolve
std::mutex bg_mutex;
std::condition_variable bg_cv;
std::atomic<bool> stopped{false};
std::vector<std::pair<uint64_t, std::vector<LockPtr>>> pending_locks;

Logger * log;
};

Expand Down
6 changes: 6 additions & 0 deletions src/kv/Cluster.cc
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,12 @@ void Cluster::startBackgroundTasks()
region_cache->updateCachePeriodically();
});
}
if (lock_resolver)
{
thread_pool->enqueue([this] {
lock_resolver->backgroundResolve();
});
}
}

} // namespace kv
Expand Down
124 changes: 115 additions & 9 deletions src/kv/LockResolver.cc
Original file line number Diff line number Diff line change
@@ -1,7 +1,13 @@
#include <pingcap/RedactHelpers.h>
#include <pingcap/kv/Backoff.h>
#include <pingcap/kv/LockResolver.h>
#include <pingcap/kv/RegionClient.h>

#include <cstdint>
#include <mutex>
#include <unordered_map>
#include <vector>

namespace pingcap
{
namespace kv
Expand All @@ -19,6 +25,69 @@ int64_t LockResolver::resolveLocks(Backoffer & bo, uint64_t caller_start_ts, std
return resolveLocks(bo, caller_start_ts, locks, pushed, false);
}

void LockResolver::tryGetBypassLock(
Backoffer & bo,
uint64_t caller_start_ts,
const std::unordered_map<uint64_t, std::vector<LockPtr>> & locks,
std::vector<uint64_t> & bypass_lock_ts)
{
try
{
bypass_lock_ts.reserve(locks.size());
for (const auto & lock_entry : locks)
{
// should not happen, just for safety
if (lock_entry.second.empty())
continue;
TxnStatus status;
try
{
status = getTxnStatusFromLock(bo, lock_entry.second[0], caller_start_ts, false);
}
catch (Exception & e)
{
log->warning("get txn status failed: " + e.displayText());
// each txn is independent, so just continue to check other txns
continue;
}

if (status.ttl == 0)
{
if ((status.primary_lock.has_value() && status.primary_lock->use_async_commit()))
{
// todo resolve async locks on the fly since the size of async locks are limited(less than 256), the resolve cost should be small
// once async locks is resolved, even if status.isCommmited() < caller_start_ts, it will not block tiflash's read
addPendingLocksForBgResolve(caller_start_ts, lock_entry.second);
continue;
}
if (status.isRollback() || (status.isCommitted() && status.commit_ts > caller_start_ts))
{
// the lock can be bypassed if the txn is rolled back or committed after caller_start_ts
bypass_lock_ts.push_back(lock_entry.first);
}
if (status.isRollback() || status.isCommitted())
{
// resolve lock in background threads if the status is determined
addPendingLocksForBgResolve(caller_start_ts, lock_entry.second);
}
}
else // status.ttl != 0
{
if (status.action == ::kvrpcpb::MinCommitTSPushed)
{
// min_commit_ts is pushed, so the lock can be bypassed
bypass_lock_ts.push_back(lock_entry.first);
}
}
}
}
catch (...)
{
// tryGetBypassLock is just an optimization, should not throw any exception even fails
log->warning("tryGetBypassLock failed");
}
}

int64_t LockResolver::resolveLocks(
Backoffer & bo,
uint64_t caller_start_ts,
Expand All @@ -30,7 +99,6 @@ int64_t LockResolver::resolveLocks(
if (locks.empty())
return before_txn_expired.value();
std::unordered_map<uint64_t, std::unordered_set<RegionVerID>> clean_txns;
bool push_fail = false;
if (!for_write)
{
pushed.reserve(locks.size());
Expand All @@ -50,7 +118,6 @@ int64_t LockResolver::resolveLocks(
{
log->warning("get txn status failed: " + e.displayText());
before_txn_expired.update(0);
pushed.clear();
return before_txn_expired.value();
}

Expand Down Expand Up @@ -97,7 +164,6 @@ int64_t LockResolver::resolveLocks(
{
log->warning("resolve txn failed: " + e.displayText());
before_txn_expired.update(0);
pushed.clear();
return before_txn_expired.value();
}
}
Expand All @@ -114,7 +180,6 @@ int64_t LockResolver::resolveLocks(
if (lock->lock_type != ::kvrpcpb::PessimisticLock && lock->txn_id > caller_start_ts)
{
log->warning("write conflict detected");
pushed.clear();
// TODO: throw write conflict exception
throw Exception("write conflict", ErrorCodes::UnknownError);
}
Expand All @@ -123,7 +188,6 @@ int64_t LockResolver::resolveLocks(
{
if (status.action != ::kvrpcpb::MinCommitTSPushed)
{
push_fail = true;
break;
}
pushed.push_back(lock->txn_id);
Expand All @@ -132,10 +196,6 @@ int64_t LockResolver::resolveLocks(
break;
}
}
if (push_fail)
{
pushed.clear();
}
return before_txn_expired.value();
}

Expand Down Expand Up @@ -536,6 +596,52 @@ TxnStatus LockResolver::getTxnStatusFromLock(Backoffer & bo, LockPtr lock, uint6
}
}

void LockResolver::backgroundResolve()
{
while (!stopped.load())
{
std::vector<std::pair<uint64_t, std::vector<LockPtr>>> to_resolve;
{
std::unique_lock lk(bg_mutex);
bg_cv.wait(lk, [this] {
return !pending_locks.empty() || stopped.load();
});
if (stopped.load())
{
return;
}
pending_locks.swap(to_resolve);
}

for (auto & lock_entry : to_resolve)
{
pingcap::kv::Backoffer bo(pingcap::kv::bgResolveLockMaxBackoff);
try
{
std::vector<uint64_t> ignored;
resolveLocks(bo, lock_entry.first, lock_entry.second, ignored);
}
catch (...)
{
// ignore all errors, and do not retry. Let the next reader to trigger resolve again.
}
}
}
}

void LockResolver::addPendingLocksForBgResolve(uint64_t caller_start_ts, const std::vector<LockPtr> & locks)
{
std::unique_lock lk(bg_mutex);
pending_locks.push_back({caller_start_ts, locks});
bg_cv.notify_one();
}

void LockResolver::stopBgResolve()
{
std::unique_lock lk(bg_mutex);
stopped.store(true);
bg_cv.notify_all();
}

} // namespace kv
} // namespace pingcap