Skip to content

Commit

Permalink
[#25195] docdb: support release advisory lock
Browse files Browse the repository at this point in the history
Summary:
This diff implements explicitly unlock advisory locks with WriteRpc:
1. Unlock all locks held by a transaction: traverse transaction reverse index and delete each of them from intentsdb.
2. Unlock a specific lock (lock key and lock mode has to be specified) held by a transaction: traverse transaction reverse index and delete the first one that matches the specified lock key and lock mode.
Jira: DB-14380

Test Plan: advisory_lock-test

Reviewers: hsunder, bkolagani

Reviewed By: hsunder, bkolagani

Subscribers: ybase, yql

Differential Revision: https://phorge.dev.yugabyte.com/D40565
  • Loading branch information
Huqicheng committed Dec 12, 2024
1 parent 82d16a8 commit 5449d39
Show file tree
Hide file tree
Showing 22 changed files with 384 additions and 65 deletions.
98 changes: 98 additions & 0 deletions src/yb/client/advisory_lock-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -320,6 +320,104 @@ TEST_F(AdvisoryLockTest, LeaderChange) {
CheckNumIntents(cluster_.get(), 3, table_->id());
}

TEST_F(AdvisoryLockTest, UnlockAllAdvisoryLocks) {
auto session = NewSession();

// TODO(advisory-lock #24079): This transaction should be a virtual transaction.
auto txn = ASSERT_RESULT(StartTransaction());
session->SetTransaction(txn);
ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
/* wait= */ true, sidecars_.get()))));
ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
/* wait= */ true, sidecars_.get()))));
ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
/* wait= */ true, sidecars_.get()))));
// There would be 1 for txn metadata entry.
// Each lock will have 1 txn reverse index + 1 primary intent.
CheckNumIntents(cluster_.get(), 7, table_->id());

// Rlease all locks.
ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateUnlockAllOp(
kDBOid, sidecars_.get()))));
// Should be just txn metadata left unremoved.
CheckNumIntents(cluster_.get(), 1, table_->id());

// Ensure all locks are actually released so that concurrent lock request won't be blocked.
auto session2 = NewSession();
auto txn2 = ASSERT_RESULT(StartTransaction());
session2->SetTransaction(txn2);
ASSERT_OK(session2->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
/* wait= */ true, sidecars_.get()))));
CheckNumIntents(cluster_.get(), 4, table_->id());

ASSERT_OK(Commit(txn));
ASSERT_OK(Commit(txn2));
}

TEST_F(AdvisoryLockTest, Unlock) {
auto session = NewSession();

// TODO(advisory-lock #24079): This transaction should be a virtual transaction.
auto txn = ASSERT_RESULT(StartTransaction());
session->SetTransaction(txn);

ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
/* wait= */ true, sidecars_.get()))));
ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
/* wait= */ true, sidecars_.get()))));
ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
/* wait= */ true, sidecars_.get()))));
// There would be 1 for txn metadata entry.
// Each lock will have 1 txn reverse index + 1 primary intent.
CheckNumIntents(cluster_.get(), 7, table_->id());

// Releasing a non-existing share lock should fail.
ASSERT_TRUE(IsStatusSkipLocking(
session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateUnlockOp(
kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_SHARE, sidecars_.get())))));

std::atomic_bool session2_locked{false};
auto session2 = NewSession();
TestThreadHolder thread_holder;
thread_holder.AddThreadFunctor([session2, this, &session2_locked] {
auto txn2 = ASSERT_RESULT(StartTransaction());
session2->SetTransaction(txn2);
CHECK_OK(session2->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateLockOp(
kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE,
/* wait= */ true, sidecars_.get()))));
session2_locked.store(true);
});

ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateUnlockOp(
kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, sidecars_.get()))));
CheckNumIntents(cluster_.get(), 5, table_->id());
SleepFor(1s);
ASSERT_FALSE(session2_locked.load());
ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateUnlockOp(
kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, sidecars_.get()))));
CheckNumIntents(cluster_.get(), 3, table_->id());
SleepFor(1s);
ASSERT_FALSE(session2_locked.load());
ASSERT_OK(session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateUnlockOp(
kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, sidecars_.get()))));
CheckNumIntents(cluster_.get(), 1, table_->id());
thread_holder.JoinAll();
ASSERT_TRUE(session2_locked.load());

// All locks have been released. Any unlock requests should fail.
ASSERT_TRUE(IsStatusSkipLocking(
session->TEST_ApplyAndFlush(ASSERT_RESULT(advisory_locks_table_->CreateUnlockOp(
kDBOid, 0, 0, 1, PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE, sidecars_.get())))));
ASSERT_OK(Commit(txn));
}

class AdvisoryLocksDisabledTest : public AdvisoryLockTest {
protected:
void SetFlags() override {
Expand Down
6 changes: 4 additions & 2 deletions src/yb/client/async_rpc.cc
Original file line number Diff line number Diff line change
Expand Up @@ -585,12 +585,14 @@ WriteRpc::WriteRpc(const AsyncRpcData& data)
: AsyncRpcBase(data, YBConsistencyLevel::STRONG) {
TRACE_TO(trace_, "WriteRpc initiated");
VTRACE_TO(1, trace_, "Tablet $0 table $1", data.tablet->tablet_id(), table()->name().ToString());
const auto op_group = data.ops.front().yb_op->group();

if (data.ops.front().yb_op->group() == OpGroup::kLock) {
if (op_group == OpGroup::kLock || op_group == OpGroup::kUnlock) {
FillOps<YBPgsqlLockOp>(
ops_, YBOperation::Type::PGSQL_LOCK, &req_, req_.mutable_pgsql_lock_batch());
// Set wait policy for non-blocking lock requests.
if (!down_cast<YBPgsqlLockOp*>(data.ops.front().yb_op.get())->mutable_request()->wait()) {
if (op_group == OpGroup::kLock &&
!down_cast<YBPgsqlLockOp*>(data.ops.front().yb_op.get())->mutable_request()->wait()) {
req_.mutable_write_batch()->set_wait_policy(WAIT_SKIP);
}
} else {
Expand Down
3 changes: 2 additions & 1 deletion src/yb/client/batcher.cc
Original file line number Diff line number Diff line change
Expand Up @@ -657,7 +657,8 @@ std::shared_ptr<AsyncRpc> Batcher::CreateRpc(

switch (op_group) {
case OpGroup::kWrite: FALLTHROUGH_INTENDED;
case OpGroup::kLock:
case OpGroup::kLock: FALLTHROUGH_INTENDED;
case OpGroup::kUnlock:
return std::make_shared<WriteRpc>(data);
case OpGroup::kLeaderRead:
return std::make_shared<ReadRpc>(data, YBConsistencyLevel::STRONG);
Expand Down
12 changes: 10 additions & 2 deletions src/yb/client/yb_op.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1055,6 +1055,15 @@ YBPgsqlLockOpPtr YBPgsqlLockOp::NewLock(
return op;
}

YBPgsqlLockOpPtr YBPgsqlLockOp::NewUnlock(
const std::shared_ptr<YBTable>& table, rpc::Sidecars* sidecars) {
auto op = std::make_shared<YBPgsqlLockOp>(table, sidecars);
auto* req = op->mutable_request();
req->set_is_lock(false);
req->set_client(YQL_CLIENT_PGSQL);
return op;
}

bool YBPgsqlLockOp::succeeded() const {
return response().status() == PgsqlResponsePB::PGSQL_STATUS_OK;
}
Expand All @@ -1064,8 +1073,7 @@ bool YBPgsqlLockOp::applied() {
}

OpGroup YBPgsqlLockOp::group() {
// TODO(advisory-lock #25195): We should use a different group for locks and unlocks.
return OpGroup::kLock;
return request_->is_lock() ? OpGroup::kLock : OpGroup::kUnlock;
}

std::string YBPgsqlLockOp::ToString() const {
Expand Down
4 changes: 3 additions & 1 deletion src/yb/client/yb_op.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,7 @@ class YBSession;
class YBStatusCallback;
class YBTable;

YB_DEFINE_ENUM(OpGroup, (kWrite)(kLock)(kLeaderRead)(kConsistentPrefixRead));
YB_DEFINE_ENUM(OpGroup, (kWrite)(kLock)(kUnlock)(kLeaderRead)(kConsistentPrefixRead));

// A write or read operation operates on a single table and partial row.
// The YBOperation class itself allows the batcher to get to the
Expand Down Expand Up @@ -593,6 +593,8 @@ class YBPgsqlLockOp : public YBPgsqlOp {
PgsqlLockRequestPB* mutable_request() { return request_; }

static YBPgsqlLockOpPtr NewLock(const std::shared_ptr<YBTable>& table, rpc::Sidecars* sidecars);
static YBPgsqlLockOpPtr NewUnlock(
const std::shared_ptr<YBTable>& table, rpc::Sidecars* sidecars);

private:
virtual Type type() const override { return PGSQL_LOCK; }
Expand Down
20 changes: 12 additions & 8 deletions src/yb/docdb/doc_write_batch.cc
Original file line number Diff line number Diff line change
Expand Up @@ -871,13 +871,7 @@ void DocWriteBatch::Clear() {
cache_.Clear();
}

// TODO(lw_uc) allocate entries on the same arena, then just reference them.
void DocWriteBatch::MoveToWriteBatchPB(LWKeyValueWriteBatchPB *kv_pb) const {
for (auto& entry : put_batch_) {
auto* kv_pair = kv_pb->add_write_pairs();
kv_pair->dup_key(entry.key);
kv_pair->dup_value(entry.value);
}
void DocWriteBatch::MoveLocksToWriteBatchPB(LWKeyValueWriteBatchPB *kv_pb, bool is_lock) const {
for (const auto& entry : lock_batch_) {
auto* lock_pair = kv_pb->add_lock_pairs();
lock_pair->mutable_lock()->dup_key(entry.lock.key);
Expand All @@ -886,8 +880,18 @@ void DocWriteBatch::MoveToWriteBatchPB(LWKeyValueWriteBatchPB *kv_pb) const {
entry.mode == PgsqlLockRequestPB::PG_LOCK_EXCLUSIVE
? dockv::DocdbLockMode::DOCDB_LOCK_EXCLUSIVE
: dockv::DocdbLockMode::DOCDB_LOCK_SHARE);
lock_pair->set_is_lock(true);
lock_pair->set_is_lock(is_lock);
}
}

// TODO(lw_uc) allocate entries on the same arena, then just reference them.
void DocWriteBatch::MoveToWriteBatchPB(LWKeyValueWriteBatchPB *kv_pb) const {
for (auto& entry : put_batch_) {
auto* kv_pair = kv_pb->add_write_pairs();
kv_pair->dup_key(entry.key);
kv_pair->dup_value(entry.value);
}
MoveLocksToWriteBatchPB(kv_pb, /* is_lock= */ true);
if (has_ttl()) {
kv_pb->set_ttl(ttl_ns());
}
Expand Down
1 change: 1 addition & 0 deletions src/yb/docdb/doc_write_batch.h
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,7 @@ class DocWriteBatch {
return put_batch_;
}

void MoveLocksToWriteBatchPB(LWKeyValueWriteBatchPB *kv_pb, bool is_lock) const;
void MoveToWriteBatchPB(LWKeyValueWriteBatchPB *kv_pb) const;

// This is used in tests when measuring the number of seeks that a given update to this batch
Expand Down
3 changes: 2 additions & 1 deletion src/yb/docdb/docdb_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,8 @@ Status DocDBRocksDBUtil::PopulateRocksDBWriteBatch(
dwb.MoveToWriteBatchPB(&kv_write_batch);
TransactionalWriter writer(
kv_write_batch, hybrid_time, *current_txn_id_, txn_isolation_level_,
partial_range_key_intents, /* replicated_batches_state= */ Slice(), intra_txn_write_id_);
partial_range_key_intents, /* replicated_batches_state= */ Slice(), intra_txn_write_id_,
/* applier= */ nullptr);
DirectWriteToWriteBatchHandler handler(rocksdb_write_batch);
RETURN_NOT_OK(writer.Apply(&handler));
intra_txn_write_id_ = writer.intra_txn_write_id();
Expand Down
37 changes: 37 additions & 0 deletions src/yb/docdb/pgsql_operation.cc
Original file line number Diff line number Diff line change
Expand Up @@ -30,11 +30,13 @@
#include "yb/common/ql_value.h"
#include "yb/common/row_mark.h"

#include "yb/common/transaction_error.h"
#include "yb/docdb/doc_pg_expr.h"
#include "yb/docdb/doc_pgsql_scanspec.h"
#include "yb/docdb/doc_read_context.h"
#include "yb/docdb/doc_rowwise_iterator.h"
#include "yb/docdb/doc_write_batch.h"
#include "yb/docdb/docdb.h"
#include "yb/docdb/docdb.messages.h"
#include "yb/docdb/docdb_debug.h"
#include "yb/docdb/docdb_pgapi.h"
Expand Down Expand Up @@ -2596,7 +2598,42 @@ void PgsqlLockOperation::ClearResponse() {
}
}

Result<bool> PgsqlLockOperation::LockExists(const DocOperationApplyData& data) {
dockv::KeyBytes advisory_lock_key(encoded_doc_key_.as_slice());
advisory_lock_key.AppendKeyEntryType(dockv::KeyEntryType::kIntentTypeSet);
advisory_lock_key.AppendIntentTypeSet(GetIntentTypes(IsolationLevel::NON_TRANSACTIONAL));
dockv::KeyBytes txn_reverse_index_prefix;
AppendTransactionKeyPrefix(txn_op_context_.transaction_id, &txn_reverse_index_prefix);
txn_reverse_index_prefix.AppendKeyEntryType(dockv::KeyEntryType::kMaxByte);
auto reverse_index_upperbound = txn_reverse_index_prefix.AsSlice();
auto iter = CreateRocksDBIterator(
data.doc_write_batch->doc_db().intents, &KeyBounds::kNoBounds,
BloomFilterMode::DONT_USE_BLOOM_FILTER, boost::none,
rocksdb::kDefaultQueryId, nullptr, &reverse_index_upperbound,
rocksdb::CacheRestartBlockKeys::kFalse);
Slice key_prefix = txn_reverse_index_prefix.AsSlice();
key_prefix.remove_suffix(1);
iter.Seek(key_prefix);
bool found = false;
while (iter.Valid()) {
if (!iter.key().starts_with(key_prefix)) {
break;
}
if (iter.value().starts_with(advisory_lock_key.AsSlice())) {
found = true;
break;
}
iter.Next();
}
RETURN_NOT_OK(iter.status());
return found;
}

Status PgsqlLockOperation::Apply(const DocOperationApplyData& data) {
if (!request_.is_lock() && !VERIFY_RESULT(LockExists(data))) {
return STATUS_EC_FORMAT(InternalError, TransactionError(TransactionErrorCode::kSkipLocking),
"Try to release non-existing lock $0", doc_key_.ToString());
}
Slice value(&(dockv::ValueEntryTypeAsChar::kRowLock), 1);
auto& entry = data.doc_write_batch->AddLock();
entry.lock.key = encoded_doc_key_.as_slice();
Expand Down
2 changes: 2 additions & 0 deletions src/yb/docdb/pgsql_operation.h
Original file line number Diff line number Diff line change
Expand Up @@ -268,6 +268,8 @@ class PgsqlLockOperation :
private:
void ClearResponse() override;

Result<bool> LockExists(const DocOperationApplyData& data);

const TransactionOperationContext txn_op_context_;

// Input arguments.
Expand Down
Loading

0 comments on commit 5449d39

Please sign in to comment.