From a4a8f7f86e9c4a191d8a3b2f4c89731e6fbabc52 Mon Sep 17 00:00:00 2001 From: Basava Date: Tue, 12 Nov 2024 16:09:55 +0800 Subject: [PATCH] [#25312] DocDB: Table locks : Refactor lock Acquire/Release at master to MasterDdl Service Summary: As per our discussion, trying to move the master side acquire locks rpc from TabletService to a master specific one. Jira: DB-14516 Upgrade/Downgrade safety: New proto definitions introduced. Upgrade safe. Should be safe to downgrade if table/object locks is not used. Test Plan: yb_build.sh --cxx-test object_lock-test Reviewers: bkolagani, zdrudi Reviewed By: zdrudi Subscribers: yql, ybase Differential Revision: https://phorge.dev.yugabyte.com/D37266 --- src/yb/client/client-internal.cc | 8 +- src/yb/client/client.cc | 37 +++++ src/yb/client/client.h | 5 + src/yb/common/common_flags.cc | 5 + src/yb/common/common_flags.h | 1 + src/yb/integration-tests/object_lock-test.cc | 152 ++++++++++++------- src/yb/master/catalog_manager.cc | 8 +- src/yb/master/catalog_manager.h | 8 +- src/yb/master/master.h | 4 +- src/yb/master/master_ddl.proto | 36 +++++ src/yb/master/master_ddl_service.cc | 12 ++ src/yb/master/master_tablet_service.cc | 30 +--- src/yb/master/master_tserver.cc | 4 + src/yb/master/master_tserver.h | 4 +- src/yb/master/object_lock_info_manager.cc | 127 ++++++++++++---- src/yb/master/object_lock_info_manager.h | 9 +- src/yb/tserver/tablet_server.cc | 4 +- src/yb/tserver/tablet_server.h | 2 +- src/yb/tserver/tablet_server_interface.h | 4 +- 19 files changed, 329 insertions(+), 131 deletions(-) diff --git a/src/yb/client/client-internal.cc b/src/yb/client/client-internal.cc index de702396f357..ea277a5fcc72 100644 --- a/src/yb/client/client-internal.cc +++ b/src/yb/client/client-internal.cc @@ -232,8 +232,10 @@ Status YBClient::Data::SyncLeaderMasterRpc( // These are not actually exposed outside, but it's nice to auto-add using directive. YB_CLIENT_SPECIALIZE_SIMPLE(AlterNamespace); YB_CLIENT_SPECIALIZE_SIMPLE(AlterTable); +YB_CLIENT_SPECIALIZE_SIMPLE(AcquireObjectLocksGlobal); YB_CLIENT_SPECIALIZE_SIMPLE(BackfillIndex); YB_CLIENT_SPECIALIZE_SIMPLE(ChangeMasterClusterConfig); +YB_CLIENT_SPECIALIZE_SIMPLE(CheckIfPitrActive); YB_CLIENT_SPECIALIZE_SIMPLE(CreateNamespace); YB_CLIENT_SPECIALIZE_SIMPLE(CreateTable); YB_CLIENT_SPECIALIZE_SIMPLE(CreateTablegroup); @@ -246,6 +248,7 @@ YB_CLIENT_SPECIALIZE_SIMPLE(FlushTables); YB_CLIENT_SPECIALIZE_SIMPLE(GetBackfillStatus); YB_CLIENT_SPECIALIZE_SIMPLE(GetTablegroupSchema); YB_CLIENT_SPECIALIZE_SIMPLE(GetColocatedTabletSchema); +YB_CLIENT_SPECIALIZE_SIMPLE(GetCompactionStatus); YB_CLIENT_SPECIALIZE_SIMPLE(GetMasterClusterConfig); YB_CLIENT_SPECIALIZE_SIMPLE(GetNamespaceInfo); YB_CLIENT_SPECIALIZE_SIMPLE(GetTableSchema); @@ -256,16 +259,15 @@ YB_CLIENT_SPECIALIZE_SIMPLE(IsCreateTableDone); YB_CLIENT_SPECIALIZE_SIMPLE(IsDeleteNamespaceDone); YB_CLIENT_SPECIALIZE_SIMPLE(IsDeleteTableDone); YB_CLIENT_SPECIALIZE_SIMPLE(IsFlushTablesDone); -YB_CLIENT_SPECIALIZE_SIMPLE(GetCompactionStatus); YB_CLIENT_SPECIALIZE_SIMPLE(IsTruncateTableDone); YB_CLIENT_SPECIALIZE_SIMPLE(ListClones); YB_CLIENT_SPECIALIZE_SIMPLE(ListNamespaces); YB_CLIENT_SPECIALIZE_SIMPLE(ListTablegroups); YB_CLIENT_SPECIALIZE_SIMPLE(ListTables); YB_CLIENT_SPECIALIZE_SIMPLE(ListUDTypes); +YB_CLIENT_SPECIALIZE_SIMPLE(ReleaseObjectLocksGlobal); YB_CLIENT_SPECIALIZE_SIMPLE(TruncateTable); YB_CLIENT_SPECIALIZE_SIMPLE(ValidateReplicationInfo); -YB_CLIENT_SPECIALIZE_SIMPLE(CheckIfPitrActive); YB_CLIENT_SPECIALIZE_SIMPLE_EX(Encryption, GetFullUniverseKeyRegistry); YB_CLIENT_SPECIALIZE_SIMPLE_EX(Admin, AddTransactionStatusTablet); YB_CLIENT_SPECIALIZE_SIMPLE_EX(Admin, AreNodesSafeToTakeDown); @@ -284,6 +286,8 @@ YB_CLIENT_SPECIALIZE_SIMPLE_EX(Client, RedisConfigGet); YB_CLIENT_SPECIALIZE_SIMPLE_EX(Client, RedisConfigSet); YB_CLIENT_SPECIALIZE_SIMPLE_EX(Client, ReservePgsqlOids); YB_CLIENT_SPECIALIZE_SIMPLE_EX(Client, GetStatefulServiceLocation); +YB_CLIENT_SPECIALIZE_SIMPLE_EX(Client, AcquireObjectLocksGlobal); +YB_CLIENT_SPECIALIZE_SIMPLE_EX(Client, ReleaseObjectLocksGlobal); YB_CLIENT_SPECIALIZE_SIMPLE_EX(Cluster, GetAutoFlagsConfig); YB_CLIENT_SPECIALIZE_SIMPLE_EX(Cluster, ValidateAutoFlagsConfig); YB_CLIENT_SPECIALIZE_SIMPLE_EX(Cluster, IsLoadBalanced); diff --git a/src/yb/client/client.cc b/src/yb/client/client.cc index 0e2f4f8ea9c0..5d6a972d49d7 100644 --- a/src/yb/client/client.cc +++ b/src/yb/client/client.cc @@ -131,6 +131,8 @@ using google::protobuf::RepeatedPtrField; using std::make_pair; using std::string; using std::vector; +using yb::master::AcquireObjectLocksGlobalRequestPB; +using yb::master::AcquireObjectLocksGlobalResponsePB; using yb::master::AddTransactionStatusTabletRequestPB; using yb::master::AddTransactionStatusTabletResponsePB; using yb::master::AlterRoleRequestPB; @@ -219,6 +221,8 @@ using yb::master::RedisConfigGetRequestPB; using yb::master::RedisConfigGetResponsePB; using yb::master::RedisConfigSetRequestPB; using yb::master::RedisConfigSetResponsePB; +using yb::master::ReleaseObjectLocksGlobalRequestPB; +using yb::master::ReleaseObjectLocksGlobalResponsePB; using yb::master::ReplicationInfoPB; using yb::master::ReservePgsqlOidsRequestPB; using yb::master::ReservePgsqlOidsResponsePB; @@ -3035,5 +3039,38 @@ void YBClient::RequestAbortAllRpcs() { data_->rpcs_.RequestAbortAll(); } +Status YBClient::AcquireObjectLocksGlobal(const tserver::AcquireObjectLockRequestPB& lock_req) { + LOG_WITH_FUNC(INFO) << lock_req.ShortDebugString(); + AcquireObjectLocksGlobalRequestPB req; + AcquireObjectLocksGlobalResponsePB resp; + req.set_txn_id(lock_req.txn_id()); + req.set_txn_reuse_version(lock_req.txn_reuse_version()); + req.set_subtxn_id(lock_req.subtxn_id()); + req.set_session_host_uuid(lock_req.session_host_uuid()); + req.mutable_object_locks()->CopyFrom(lock_req.object_locks()); + CALL_SYNC_LEADER_MASTER_RPC(req, resp, AcquireObjectLocksGlobal); + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + return Status::OK(); +} + +Status YBClient::ReleaseObjectLocksGlobal(const tserver::ReleaseObjectLockRequestPB& release_req) { + LOG_WITH_FUNC(INFO) << release_req.ShortDebugString(); + ReleaseObjectLocksGlobalRequestPB req; + ReleaseObjectLocksGlobalResponsePB resp; + req.set_txn_id(release_req.txn_id()); + req.set_txn_reuse_version(release_req.txn_reuse_version()); + req.set_subtxn_id(release_req.subtxn_id()); + req.set_session_host_uuid(release_req.session_host_uuid()); + req.mutable_object_locks()->CopyFrom(release_req.object_locks()); + req.set_release_all_locks(release_req.release_all_locks()); + CALL_SYNC_LEADER_MASTER_RPC(req, resp, ReleaseObjectLocksGlobal); + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + return Status::OK(); +} + } // namespace client } // namespace yb diff --git a/src/yb/client/client.h b/src/yb/client/client.h index 18f4017636c2..b6f62f36faa7 100644 --- a/src/yb/client/client.h +++ b/src/yb/client/client.h @@ -76,6 +76,8 @@ #include "yb/server/clock.h" #include "yb/tserver/pg_client.pb.h" +#include "yb/tserver/tserver.pb.h" + #include "yb/util/enums.h" #include "yb/util/mem_tracker.h" #include "yb/util/monotime.h" @@ -1053,6 +1055,9 @@ class YBClient { void RequestAbortAllRpcs(); + Status AcquireObjectLocksGlobal(const tserver::AcquireObjectLockRequestPB& lock_req); + Status ReleaseObjectLocksGlobal(const tserver::ReleaseObjectLockRequestPB& release_req); + private: class Data; diff --git a/src/yb/common/common_flags.cc b/src/yb/common/common_flags.cc index a020161e4716..8b47538aaeb4 100644 --- a/src/yb/common/common_flags.cc +++ b/src/yb/common/common_flags.cc @@ -132,6 +132,11 @@ DEFINE_NON_RUNTIME_bool(TEST_ysql_hide_catalog_version_increment_log, false, "Hide catalog version increment log messages."); TAG_FLAG(TEST_ysql_hide_catalog_version_increment_log, hidden); +DEFINE_test_flag(bool, enable_object_locking_for_table_locks, false, + "This test flag enables the object lock APIs provided by tservers and masters - " + "AcquireObject(Global)Lock, ReleaseObject(Global)Lock. These APIs are used to " + "implement pg table locks."); + // The following flags related to the cloud, region and availability zone that an instance is // started in. These are passed in from whatever provisioning mechanics start the servers. They // are used for generic placement policies on table creation and tablet load balancing, to diff --git a/src/yb/common/common_flags.h b/src/yb/common/common_flags.h index ddfd25436529..0c44f65c6090 100644 --- a/src/yb/common/common_flags.h +++ b/src/yb/common/common_flags.h @@ -31,6 +31,7 @@ DECLARE_bool(TEST_ysql_hide_catalog_version_increment_log); DECLARE_bool(TEST_check_catalog_version_overflow); DECLARE_int32(ysql_clone_pg_schema_rpc_timeout_ms); DECLARE_bool(ysql_enable_auto_analyze_service); +DECLARE_bool(TEST_enable_object_locking_for_table_locks); namespace yb { diff --git a/src/yb/integration-tests/object_lock-test.cc b/src/yb/integration-tests/object_lock-test.cc index 19982e2de68f..3a788b6f2113 100644 --- a/src/yb/integration-tests/object_lock-test.cc +++ b/src/yb/integration-tests/object_lock-test.cc @@ -20,7 +20,7 @@ #include "yb/master/catalog_manager.h" #include "yb/master/master.h" -#include "yb/master/master_client.proxy.h" +#include "yb/master/master_ddl.proxy.h" #include "yb/master/mini_master.h" #include "yb/master/test_async_rpc_manager.h" @@ -96,11 +96,11 @@ class ObjectLockTest : public YBMiniClusterTestBase { return TServerProxyFor(cluster_->mini_tablet_server(i)); } - tserver::TabletServerServiceProxy MasterProxy(const master::MiniMaster* master) { - return tserver::TabletServerServiceProxy{proxy_cache_.get(), master->bound_rpc_addr()}; + master::MasterDdlProxy MasterProxy(const master::MiniMaster* master) { + return master::MasterDdlProxy{proxy_cache_.get(), master->bound_rpc_addr()}; } - Result MasterLeaderProxy() { + Result MasterLeaderProxy() { return MasterProxy(VERIFY_RESULT(cluster_->GetLeaderMiniMaster())); } @@ -124,10 +124,11 @@ constexpr uint64_t kObjectId = 1; constexpr uint64_t kObjectId2 = 2; constexpr size_t kTimeoutMs = 5000; -tserver::AcquireObjectLockRequestPB AcquireRequestFor( +template +Request AcquireRequestFor( const std::string& session_host_uuid, const docdb::ObjectLockOwner& owner, uint64_t database_id, uint64_t object_id, TableLockType lock_type) { - tserver::AcquireObjectLockRequestPB req; + Request req; owner.PopulateLockRequest(&req); req.set_session_host_uuid(session_host_uuid); auto* lock = req.add_object_locks(); @@ -143,30 +144,68 @@ rpc::RpcController RpcController() { return controller; } +void AcquireLockAsyncAt( + tserver::TabletServerServiceProxy* proxy, rpc::RpcController* controller, + const std::string& session_host_uuid, const docdb::ObjectLockOwner& owner, uint64_t database_id, + uint64_t object_id, TableLockType type, std::function callback, + tserver::AcquireObjectLockResponsePB* resp) { + auto req = AcquireRequestFor( + session_host_uuid, owner, database_id, object_id, type); + proxy->AcquireObjectLocksAsync(req, resp, controller, callback); +} + Status AcquireLockAt( tserver::TabletServerServiceProxy* proxy, const std::string& session_host_uuid, const docdb::ObjectLockOwner& owner, uint64_t database_id, uint64_t object_id, TableLockType type) { + CountDownLatch latch{1}; tserver::AcquireObjectLockResponsePB resp; - auto req = AcquireRequestFor(session_host_uuid, owner, database_id, object_id, type); auto rpc_controller = RpcController(); - return proxy->AcquireObjectLocks(req, &resp, &rpc_controller); + AcquireLockAsyncAt( + proxy, &rpc_controller, session_host_uuid, owner, database_id, object_id, type, + latch.CountDownCallback(), &resp); + latch.Wait(); + RETURN_NOT_OK(rpc_controller.status()); + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + return Status::OK(); } -void AcquireLockAsyncAt( - tserver::TabletServerServiceProxy* proxy, rpc::RpcController* controller, +void AcquireLockGloballyAsyncAt( + master::MasterDdlProxy* proxy, rpc::RpcController* controller, const std::string& session_host_uuid, const docdb::ObjectLockOwner& owner, uint64_t database_id, - uint64_t object_id, TableLockType type, std::function callback, - tserver::AcquireObjectLockResponsePB* resp) { - auto req = AcquireRequestFor(session_host_uuid, owner, database_id, object_id, type); - proxy->AcquireObjectLocksAsync(req, resp, controller, callback); + uint64_t object_id, std::function callback, + master::AcquireObjectLocksGlobalResponsePB* resp) { + auto req = AcquireRequestFor( + session_host_uuid, owner, database_id, object_id, TableLockType::ACCESS_EXCLUSIVE); + proxy->AcquireObjectLocksGlobalAsync(req, resp, controller, callback); +} + +Status AcquireLockGloballyAt( + master::MasterDdlProxy* proxy, const std::string& session_host_uuid, + const docdb::ObjectLockOwner& owner, uint64_t database_id, uint64_t object_id) { + CountDownLatch latch{1}; + master::AcquireObjectLocksGlobalResponsePB resp; + auto rpc_controller = RpcController(); + AcquireLockGloballyAsyncAt( + proxy, &rpc_controller, session_host_uuid, owner, database_id, object_id, + latch.CountDownCallback(), &resp); + latch.Wait(); + RETURN_NOT_OK(rpc_controller.status()); + if (resp.has_error()) { + return StatusFromPB(resp.error().status()); + } + return Status::OK(); } -tserver::ReleaseObjectLockRequestPB ReleaseRequestFor( +template +Request ReleaseRequestFor( const std::string& session_host_uuid, const docdb::ObjectLockOwner& owner, std::optional database_id, std::optional object_id) { - tserver::ReleaseObjectLockRequestPB req; + Request req; owner.PopulateLockRequest(&req); + req.set_session_host_uuid(session_host_uuid); // TODO(Amit): Do we support specifiying db id but not object id? if (!database_id || !object_id) { req.set_release_all_locks(true); @@ -184,25 +223,33 @@ Status ReleaseLockAt( std::optional object_id) { tserver::ReleaseObjectLockResponsePB resp; rpc::RpcController controller = RpcController(); - auto req = ReleaseRequestFor(session_host_uuid, owner, database_id, object_id); + auto req = ReleaseRequestFor( + session_host_uuid, owner, database_id, object_id); return proxy->ReleaseObjectLocks(req, &resp, &controller); } +Status ReleaseLockGloballyAt( + master::MasterDdlProxy* proxy, const std::string& session_host_uuid, + const docdb::ObjectLockOwner& owner, std::optional database_id, + std::optional object_id) { + master::ReleaseObjectLocksGlobalResponsePB resp; + rpc::RpcController controller = RpcController(); + auto req = ReleaseRequestFor( + session_host_uuid, owner, database_id, object_id); + return proxy->ReleaseObjectLocksGlobal(req, &resp, &controller); +} + TEST_F(ObjectLockTest, AcquireObjectLocks) { const auto& kSessionHostUuid = TSUuid(0); auto master_proxy = ASSERT_RESULT(MasterLeaderProxy()); - ASSERT_OK(AcquireLockAt( - &master_proxy, kSessionHostUuid, kTxn1, kDatabaseID, kObjectId, - TableLockType::ACCESS_EXCLUSIVE)); + ASSERT_OK(AcquireLockGloballyAt(&master_proxy, kSessionHostUuid, kTxn1, kDatabaseID, kObjectId)); } TEST_F(ObjectLockTest, ReleaseObjectLocks) { const auto& kSessionHostUuid = TSUuid(0); auto master_proxy = ASSERT_RESULT(MasterLeaderProxy()); - ASSERT_OK(AcquireLockAt( - &master_proxy, kSessionHostUuid, kTxn1, kDatabaseID, kObjectId, - TableLockType::ACCESS_EXCLUSIVE)); - ASSERT_OK(ReleaseLockAt(&master_proxy, kSessionHostUuid, kTxn1, kDatabaseID, kObjectId)); + ASSERT_OK(AcquireLockGloballyAt(&master_proxy, kSessionHostUuid, kTxn1, kDatabaseID, kObjectId)); + ASSERT_OK(ReleaseLockGloballyAt(&master_proxy, kSessionHostUuid, kTxn1, kDatabaseID, kObjectId)); } void ObjectLockTest::testAcquireObjectLockWaitsOnTServer(bool do_master_failover) { @@ -235,13 +282,13 @@ void ObjectLockTest::testAcquireObjectLockWaitsOnTServer(bool do_master_failover } CountDownLatch ddl_latch(1); auto master_proxy = ASSERT_RESULT(MasterLeaderProxy()); - tserver::AcquireObjectLockResponsePB resp; + master::AcquireObjectLocksGlobalResponsePB resp; auto controller = RpcController(); LOG(INFO) << "Requesting DDL lock at master : " << ASSERT_RESULT(cluster_->GetLeaderMiniMaster())->ToString(); - AcquireLockAsyncAt( + AcquireLockGloballyAsyncAt( &master_proxy, &controller, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId, - TableLockType::ACCESS_EXCLUSIVE, ddl_latch.CountDownCallback(), &resp); + ddl_latch.CountDownCallback(), &resp); // Wait. But the lock acquisition should not be successful. ASSERT_OK(WaitFor( @@ -273,13 +320,11 @@ TEST_F(ObjectLockTest, AcquireObjectLocksWaitsOnTServer) { TEST_F(ObjectLockTest, AcquireAndReleaseDDLLock) { const auto& kSessionHostUuid = TSUuid(0); auto master_proxy = ASSERT_RESULT(MasterLeaderProxy()); - ASSERT_OK(AcquireLockAt( - &master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId, - TableLockType::ACCESS_EXCLUSIVE)); - ASSERT_OK(ReleaseLockAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId)); + ASSERT_OK(AcquireLockGloballyAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId)); + ASSERT_OK(ReleaseLockGloballyAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId)); // Release non-existent lock. - ASSERT_OK(ReleaseLockAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId2)); + ASSERT_OK(ReleaseLockGloballyAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId2)); } void DumpMasterAndTServerLocks( @@ -312,9 +357,7 @@ void DumpMasterAndTServerLocks( TEST_F(ObjectLockTest, DDLLockWaitsAtMaster) { const auto& kSessionHostUuid = TSUuid(0); auto master_proxy = ASSERT_RESULT(MasterLeaderProxy()); - ASSERT_OK(AcquireLockAt( - &master_proxy, kSessionHostUuid, kTxn1, kDatabaseID, kObjectId, - TableLockType::ACCESS_EXCLUSIVE)); + ASSERT_OK(AcquireLockGloballyAt(&master_proxy, kSessionHostUuid, kTxn1, kDatabaseID, kObjectId)); auto master_local_lock_manager = cluster_->mini_master() ->master() ->catalog_manager_impl() @@ -331,11 +374,11 @@ TEST_F(ObjectLockTest, DDLLockWaitsAtMaster) { } CountDownLatch ddl_latch(1); - tserver::AcquireObjectLockResponsePB resp; + master::AcquireObjectLocksGlobalResponsePB resp; auto controller = RpcController(); - AcquireLockAsyncAt( + AcquireLockGloballyAsyncAt( &master_proxy, &controller, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId, - TableLockType::ACCESS_EXCLUSIVE, ddl_latch.CountDownCallback(), &resp); + ddl_latch.CountDownCallback(), &resp); // Wait for the lock acquisition to wait at master. ASSERT_OK(WaitFor( @@ -357,7 +400,7 @@ TEST_F(ObjectLockTest, DDLLockWaitsAtMaster) { } // Release lock from Session-1 - ASSERT_OK(ReleaseLockAt(&master_proxy, kSessionHostUuid, kTxn1, kDatabaseID, kObjectId)); + ASSERT_OK(ReleaseLockGloballyAt(&master_proxy, kSessionHostUuid, kTxn1, kDatabaseID, kObjectId)); // Verify that lock acquistion for session-2 is successful. ASSERT_TRUE(ddl_latch.WaitFor(MonoDelta::FromMilliseconds(kTimeoutMs))); @@ -374,7 +417,7 @@ TEST_F(ObjectLockTest, DDLLockWaitsAtMaster) { } // Release lock from Session-2 - ASSERT_OK(ReleaseLockAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId)); + ASSERT_OK(ReleaseLockGloballyAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId)); DumpMasterAndTServerLocks(cluster_.get(), "After releasing all locks"); ASSERT_EQ(master_local_lock_manager->TEST_GrantedLocksSize(), 0); ASSERT_EQ(master_local_lock_manager->TEST_WaitingLocksSize(), 0); @@ -396,9 +439,9 @@ TEST_F(ObjectLockTest, DDLLocksCleanupAtMaster) { for (uint64_t object_id = 0; object_id < kNumLocksTotal; object_id++) { auto host_idx = object_id / kLocksPerHost; auto ddl_idx = (object_id / kNumObjectsPerDDL) % kNumDDLsPerHost; - ASSERT_OK(AcquireLockAt( + ASSERT_OK(AcquireLockGloballyAt( &master_proxy, TSUuid(host_idx), ddl_txns[ddl_idx * kNumHosts + host_idx], kDatabaseID, - object_id, TableLockType::ACCESS_EXCLUSIVE)); + object_id)); } // Waiting locks should not be cleaned up yet. @@ -418,7 +461,8 @@ TEST_F(ObjectLockTest, DDLLocksCleanupAtMaster) { } // Release all locks taken from host-0, session-0 - ASSERT_OK(ReleaseLockAt(&master_proxy, TSUuid(0), ddl_txns[0], std::nullopt, std::nullopt)); + ASSERT_OK( + ReleaseLockGloballyAt(&master_proxy, TSUuid(0), ddl_txns[0], std::nullopt, std::nullopt)); DumpMasterAndTServerLocks(cluster_.get(), "After Releasing locks from host-0, session-0"); num_locks = kEntriesPerRequest * (kNumHosts * kNumDDLsPerHost - 1) * kNumObjectsPerDDL; @@ -457,12 +501,12 @@ TEST_F(ObjectLockTest, AcquireObjectLocksRetriesUponMultipleTServerAddition) { TableLockType::ACCESS_SHARE)); CountDownLatch ddl_latch(1); - tserver::AcquireObjectLockResponsePB resp; + master::AcquireObjectLocksGlobalResponsePB resp; auto controller = RpcController(); auto master_proxy = ASSERT_RESULT(MasterLeaderProxy()); - AcquireLockAsyncAt( + AcquireLockGloballyAsyncAt( &master_proxy, &controller, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId, - TableLockType::ACCESS_EXCLUSIVE, ddl_latch.CountDownCallback(), &resp); + ddl_latch.CountDownCallback(), &resp); // Wait. But the lock acquisition should not be successful. ASSERT_OK(WaitFor( @@ -510,7 +554,7 @@ TEST_F(ObjectLockTest, AcquireObjectLocksRetriesUponMultipleTServerAddition) { ASSERT_TRUE(ddl_latch.WaitFor(MonoDelta::FromMilliseconds(kTimeoutMs))); // Release DDL lock - ASSERT_OK(ReleaseLockAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId)); + ASSERT_OK(ReleaseLockGloballyAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId)); // Verify that DML lock acquistion is successful. ASSERT_TRUE(ts_latch.WaitFor(MonoDelta::FromMilliseconds(kTimeoutMs))); @@ -526,9 +570,7 @@ TEST_F(ObjectLockTest, AcquireObjectLocksRetriesUponMultipleTServerAddition) { TEST_F(ObjectLockTest, BootstrapTServersUponAddition) { const auto& kSessionHostUuid = TSUuid(0); auto master_proxy = ASSERT_RESULT(MasterLeaderProxy()); - ASSERT_OK(AcquireLockAt( - &master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId, - TableLockType::ACCESS_EXCLUSIVE)); + ASSERT_OK(AcquireLockGloballyAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId)); auto num_ts = cluster_->num_tablet_servers(); ASSERT_OK(cluster_->AddTabletServer()); @@ -554,7 +596,7 @@ TEST_F(ObjectLockTest, BootstrapTServersUponAddition) { ASSERT_EQ(ts->server()->ts_local_lock_manager()->TEST_GrantedLocksSize(), expected_locks); } - ASSERT_OK(ReleaseLockAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId)); + ASSERT_OK(ReleaseLockGloballyAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId)); LOG(INFO) << "Counts after releasing the DDL lock"; expected_locks = 0; @@ -593,9 +635,8 @@ TEST_F_EX(ObjectLockTest, AcquireAndReleaseDDLLockAcrossMasterFailover, MultiMas LOG(INFO) << "Acquiring lock on object " << kObjectId << " from master " << leader_master1->ToString(); auto master_proxy = MasterProxy(leader_master1); - ASSERT_OK(AcquireLockAt( - &master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId, - TableLockType::ACCESS_EXCLUSIVE)); + ASSERT_OK( + AcquireLockGloballyAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId)); } auto master_local_lock_manager1 = leader_master1->master() @@ -639,7 +680,8 @@ TEST_F_EX(ObjectLockTest, AcquireAndReleaseDDLLockAcrossMasterFailover, MultiMas LOG(INFO) << "Releasing lock on object " << kObjectId << " at master " << leader_master2->ToString(); auto master_proxy = MasterProxy(leader_master2); - ASSERT_OK(ReleaseLockAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId)); + ASSERT_OK( + ReleaseLockGloballyAt(&master_proxy, kSessionHostUuid, kTxn2, kDatabaseID, kObjectId)); } } diff --git a/src/yb/master/catalog_manager.cc b/src/yb/master/catalog_manager.cc index 3dae18e8b13a..7d7a1e6739a0 100644 --- a/src/yb/master/catalog_manager.cc +++ b/src/yb/master/catalog_manager.cc @@ -6231,8 +6231,8 @@ Status CatalogManager::DeleteIndexInfoFromTable( return Status::OK(); } -void CatalogManager::AcquireObjectLocks( - const tserver::AcquireObjectLockRequestPB* req, tserver::AcquireObjectLockResponsePB* resp, +void CatalogManager::AcquireObjectLocksGlobal( + const AcquireObjectLocksGlobalRequestPB* req, AcquireObjectLocksGlobalResponsePB* resp, rpc::RpcContext rpc) { VLOG(0) << __PRETTY_FUNCTION__; if (!FLAGS_TEST_enable_object_locking_for_table_locks) { @@ -6244,8 +6244,8 @@ void CatalogManager::AcquireObjectLocks( object_lock_info_manager_->LockObject(*req, resp, std::move(rpc)); } -void CatalogManager::ReleaseObjectLocks( - const tserver::ReleaseObjectLockRequestPB* req, tserver::ReleaseObjectLockResponsePB* resp, +void CatalogManager::ReleaseObjectLocksGlobal( + const ReleaseObjectLocksGlobalRequestPB* req, ReleaseObjectLocksGlobalResponsePB* resp, rpc::RpcContext rpc) { VLOG(0) << __PRETTY_FUNCTION__; if (!FLAGS_TEST_enable_object_locking_for_table_locks) { diff --git a/src/yb/master/catalog_manager.h b/src/yb/master/catalog_manager.h index 5650084a4e9d..d98ccad8ccc2 100644 --- a/src/yb/master/catalog_manager.h +++ b/src/yb/master/catalog_manager.h @@ -421,11 +421,11 @@ class CatalogManager : public CatalogManagerIf, public SnapshotCoordinatorContex rpc::RpcContext* rpc, const LeaderEpoch& epoch); - void AcquireObjectLocks( - const tserver::AcquireObjectLockRequestPB* req, tserver::AcquireObjectLockResponsePB* resp, + void AcquireObjectLocksGlobal( + const AcquireObjectLocksGlobalRequestPB* req, AcquireObjectLocksGlobalResponsePB* resp, rpc::RpcContext rpc); - void ReleaseObjectLocks( - const tserver::ReleaseObjectLockRequestPB* req, tserver::ReleaseObjectLockResponsePB* resp, + void ReleaseObjectLocksGlobal( + const ReleaseObjectLocksGlobalRequestPB* req, ReleaseObjectLocksGlobalResponsePB* resp, rpc::RpcContext rpc); void ExportObjectLockInfo(const std::string& tserver_uuid, tserver::DdlLockEntriesPB* resp); ObjectLockInfoManager* object_lock_info_manager() { return object_lock_info_manager_.get(); } diff --git a/src/yb/master/master.h b/src/yb/master/master.h index 908a01710627..3182fa128285 100644 --- a/src/yb/master/master.h +++ b/src/yb/master/master.h @@ -224,6 +224,8 @@ class Master : public tserver::DbServerBase { void WriteServerMetaCacheAsJson(JsonWriter* writer) override; + const std::string& permanent_uuid() const override; + protected: Status RegisterServices(); @@ -255,8 +257,6 @@ class Master : public tserver::DbServerBase { MonoDelta default_client_timeout() override; - const std::string& permanent_uuid() const override; - void SetupAsyncClientInit(client::AsyncClientInitializer* async_client_init) override; std::atomic state_; diff --git a/src/yb/master/master_ddl.proto b/src/yb/master/master_ddl.proto index 9ab5ffc79fc9..62a26606c655 100644 --- a/src/yb/master/master_ddl.proto +++ b/src/yb/master/master_ddl.proto @@ -19,6 +19,7 @@ option java_package = "org.yb.master"; import "yb/common/common.proto"; import "yb/common/common_types.proto"; import "yb/common/wire_protocol.proto"; +import "yb/docdb/docdb.proto"; import "yb/master/catalog_entity_info.proto"; import "yb/master/master_types.proto"; import "yb/rpc/service.proto"; @@ -750,6 +751,36 @@ message IsYsqlDdlVerificationDoneResponsePB { optional bool done = 2; } +// ============================================================================ +// Table/Object Locks +// ============================================================================ + +message AcquireObjectLocksGlobalRequestPB { + optional bytes txn_id = 1; + optional uint64 txn_reuse_version = 2; + optional uint32 subtxn_id = 3; + optional bytes session_host_uuid = 4; + repeated docdb.ObjectLockPB object_locks = 5; +} + +message AcquireObjectLocksGlobalResponsePB { + optional MasterErrorPB error = 1; +} + +message ReleaseObjectLocksGlobalRequestPB { + optional bytes txn_id = 1; + optional uint64 txn_reuse_version = 2; + optional uint32 subtxn_id = 3; + optional bytes session_host_uuid = 4; + repeated docdb.ObjectLockPB object_locks = 5; + // When set, releases locks on all objects held against the given session id. + optional bool release_all_locks = 6; +} + +message ReleaseObjectLocksGlobalResponsePB { + optional MasterErrorPB error = 1; +} + service MasterDdl { option (yb.rpc.custom_service_name) = "yb.master.MasterService"; @@ -804,4 +835,9 @@ service MasterDdl { (ReportYsqlDdlTxnStatusResponsePB); rpc IsYsqlDdlVerificationDone(IsYsqlDdlVerificationDoneRequestPB) returns (IsYsqlDdlVerificationDoneResponsePB); + + rpc AcquireObjectLocksGlobal(AcquireObjectLocksGlobalRequestPB) + returns (AcquireObjectLocksGlobalResponsePB); + rpc ReleaseObjectLocksGlobal(ReleaseObjectLocksGlobalRequestPB) + returns (ReleaseObjectLocksGlobalResponsePB); } diff --git a/src/yb/master/master_ddl_service.cc b/src/yb/master/master_ddl_service.cc index e5f9f67f714d..f35163ae61c1 100644 --- a/src/yb/master/master_ddl_service.cc +++ b/src/yb/master/master_ddl_service.cc @@ -62,6 +62,18 @@ class MasterDdlServiceImpl : public MasterServiceBase, public MasterDdlIf { (ReportYsqlDdlTxnStatus) (TruncateTable) ) + + void AcquireObjectLocksGlobal( + const AcquireObjectLocksGlobalRequestPB* req, AcquireObjectLocksGlobalResponsePB* resp, + rpc::RpcContext rpc) override { + server_->catalog_manager_impl()->AcquireObjectLocksGlobal(req, resp, std::move(rpc)); + } + + void ReleaseObjectLocksGlobal( + const ReleaseObjectLocksGlobalRequestPB* req, ReleaseObjectLocksGlobalResponsePB* resp, + rpc::RpcContext rpc) override { + server_->catalog_manager_impl()->ReleaseObjectLocksGlobal(req, resp, std::move(rpc)); + } }; } // namespace diff --git a/src/yb/master/master_tablet_service.cc b/src/yb/master/master_tablet_service.cc index f833cb839069..d41d4bb14f34 100644 --- a/src/yb/master/master_tablet_service.cc +++ b/src/yb/master/master_tablet_service.cc @@ -21,7 +21,6 @@ #include "yb/dockv/doc_key.h" -#include "yb/master/catalog_manager.h" #include "yb/master/catalog_manager_if.h" #include "yb/master/master.h" #include "yb/master/scoped_leader_shared_lock.h" @@ -33,7 +32,6 @@ #include "yb/util/logging.h" #include "yb/util/result.h" #include "yb/util/status_format.h" -#include "yb/util/trace.h" DEFINE_test_flag(int32, ysql_catalog_write_rejection_percentage, 0, "Reject specified percentage of writes to the YSQL catalog tables."); @@ -73,33 +71,17 @@ Result> MasterTabletServiceImpl::GetTabl void MasterTabletServiceImpl::AcquireObjectLocks( const tserver::AcquireObjectLockRequestPB* req, tserver::AcquireObjectLockResponsePB* resp, rpc::RpcContext context) { - if (!FLAGS_TEST_enable_object_locking_for_table_locks) { - context.RespondRpcFailure( - rpc::ErrorStatusPB::ERROR_APPLICATION, - STATUS(NotSupported, "Flag enable_object_locking_for_table_locks disabled")); - return; - } - - TRACE("Start AcquireObjectLocks"); - VLOG(2) << "Received AcquireObjectLocks RPC: " << req->DebugString(); - - master_->catalog_manager_impl()->AcquireObjectLocks(req, resp, std::move(context)); + context.RespondRpcFailure( + rpc::ErrorStatusPB::ERROR_APPLICATION, + STATUS(NotSupported, "AcquireObjectLocks is not implemented at masters")); } void MasterTabletServiceImpl::ReleaseObjectLocks( const tserver::ReleaseObjectLockRequestPB* req, tserver::ReleaseObjectLockResponsePB* resp, rpc::RpcContext context) { - if (!FLAGS_TEST_enable_object_locking_for_table_locks) { - context.RespondRpcFailure( - rpc::ErrorStatusPB::ERROR_APPLICATION, - STATUS(NotSupported, "Flag enable_object_locking_for_table_locks disabled")); - return; - } - - TRACE("Start ReleaseObjectLocks"); - VLOG(2) << "Received ReleaseObjectLocks RPC: " << req->DebugString(); - - master_->catalog_manager_impl()->ReleaseObjectLocks(req, resp, std::move(context)); + context.RespondRpcFailure( + rpc::ErrorStatusPB::ERROR_APPLICATION, + STATUS(NotSupported, "ReleaseObjectLocks is not implemented at masters")); } void MasterTabletServiceImpl::Write(const tserver::WriteRequestPB* req, diff --git a/src/yb/master/master_tserver.cc b/src/yb/master/master_tserver.cc index a18061f2c5dc..68957bb67e4e 100644 --- a/src/yb/master/master_tserver.cc +++ b/src/yb/master/master_tserver.cc @@ -230,5 +230,9 @@ bool MasterTabletServer::SkipCatalogVersionChecks() { return master_->catalog_manager()->SkipCatalogVersionChecks(); } +const std::string& MasterTabletServer::permanent_uuid() const { + return master_->permanent_uuid(); +} + } // namespace master } // namespace yb diff --git a/src/yb/master/master_tserver.h b/src/yb/master/master_tserver.h index 5a1f9d9ac678..63c74971ffd0 100644 --- a/src/yb/master/master_tserver.h +++ b/src/yb/master/master_tserver.h @@ -33,7 +33,7 @@ class MasterTabletServer : public tserver::TabletServerIf, MasterTabletServer(Master* master, scoped_refptr metric_entity); tserver::TSTabletManager* tablet_manager() override; tserver::TabletPeerLookupIf* tablet_peer_lookup() override; - tablet::TSLocalLockManager* ts_local_lock_manager() override { return nullptr; } + tablet::TSLocalLockManager* ts_local_lock_manager() const override { return nullptr; } server::Clock* Clock() override; const scoped_refptr& MetricEnt() const override; @@ -111,6 +111,8 @@ class MasterTabletServer : public tserver::TabletServerIf, bool SkipCatalogVersionChecks() override; + const std::string& permanent_uuid() const override; + private: Result CreateInternalPGConn( const std::string& database_name, const std::optional& deadline) override; diff --git a/src/yb/master/object_lock_info_manager.cc b/src/yb/master/object_lock_info_manager.cc index 6303cfa6ca1f..4ebbfd28f4c2 100644 --- a/src/yb/master/object_lock_info_manager.cc +++ b/src/yb/master/object_lock_info_manager.cc @@ -82,14 +82,14 @@ class ObjectLockInfoManager::Impl { local_lock_manager_(std::make_shared()) {} void LockObject( - const tserver::AcquireObjectLockRequestPB& req, tserver::AcquireObjectLockResponsePB* resp, + AcquireObjectLockRequestPB req, AcquireObjectLocksGlobalResponsePB* resp, rpc::RpcContext rpc); void LockObject( const tserver::AcquireObjectLockRequestPB& req, rpc::RpcContext context, StdStatusCallback callback); void UnlockObject( - const tserver::ReleaseObjectLockRequestPB& req, tserver::ReleaseObjectLockResponsePB* resp, + ReleaseObjectLockRequestPB req, ReleaseObjectLocksGlobalResponsePB* resp, rpc::RpcContext rpc); void UnlockObject( const tserver::ReleaseObjectLockRequestPB& req, std::optional context, @@ -98,10 +98,8 @@ class ObjectLockInfoManager::Impl { void ReleaseOldObjectLocks( const std::string& tserver_uuid, uint64 current_incarnation_num, bool wait); - Status PersistRequest(LeaderEpoch epoch, const tserver::AcquireObjectLockRequestPB& req) - EXCLUDES(mutex_); - Status PersistRequest(LeaderEpoch epoch, const tserver::ReleaseObjectLockRequestPB& req) - EXCLUDES(mutex_); + Status PersistRequest(LeaderEpoch epoch, const AcquireObjectLockRequestPB& req) EXCLUDES(mutex_); + Status PersistRequest(LeaderEpoch epoch, const ReleaseObjectLockRequestPB& req) EXCLUDES(mutex_); void ExportObjectLockInfo(const std::string& tserver_uuid, tserver::DdlLockEntriesPB* resp) EXCLUDES(mutex_); @@ -120,7 +118,6 @@ class ObjectLockInfoManager::Impl { return local_lock_manager_; } - private: /* The local lock manager is used to acquire and release locks on the master itself. @@ -134,6 +131,7 @@ class ObjectLockInfoManager::Impl { return local_lock_manager_; } + private: std::shared_ptr ts_local_lock_manager_during_catalog_loading() EXCLUDES(mutex_) { catalog_manager_->AssertLeaderLockAcquiredForWriting(); @@ -154,8 +152,17 @@ class ObjectLockInfoManager::Impl { std::shared_ptr local_lock_manager_ GUARDED_BY(mutex_); }; +template +class UpdateAll { + public: + UpdateAll() = default; + virtual ~UpdateAll() = default; + virtual const Req& request() const = 0; +}; + template -class UpdateAllTServers : public std::enable_shared_from_this> { +class UpdateAllTServers : public std::enable_shared_from_this>, + public UpdateAll { public: UpdateAllTServers( LeaderEpoch epoch, Master* master, CatalogManager* catalog_manager, @@ -163,7 +170,7 @@ class UpdateAllTServers : public std::enable_shared_from_this context); void Launch(); - const Req& request() const { + const Req& request() const override { return req_; } @@ -173,6 +180,9 @@ class UpdateAllTServers : public std::enable_shared_from_this TServerTaskFor( + const TabletServerId& ts_uuid, StdStatusCallback callback); + LeaderEpoch epoch_; Master* master_; CatalogManager* catalog_manager_; @@ -190,8 +200,7 @@ class UpdateTServer : public RetrySpecificTSRpcTask { public: UpdateTServer( Master* master, ThreadPool* callback_pool, const TabletServerId& ts_uuid, - std::shared_ptr> shared_all_tservers, - StdStatusCallback callback); + std::shared_ptr> shared_all_tservers, StdStatusCallback callback); server::MonitoredTaskType type() const override { return server::MonitoredTaskType::kObjectLock; } @@ -217,7 +226,7 @@ class UpdateTServer : public RetrySpecificTSRpcTask { StdStatusCallback callback_; Resp resp_; - std::shared_ptr> shared_all_tservers_; + std::shared_ptr> shared_all_tservers_; }; namespace { @@ -230,6 +239,38 @@ constexpr int kIncarnationId = 0; const std::string kNotTheMasterLeader = "Master is not the leader"; const std::string kEpochChanged = "Epoch changed"; +AcquireObjectLockRequestPB TserverRequestFor( + const AcquireObjectLocksGlobalRequestPB& master_request) { + AcquireObjectLockRequestPB req; + req.set_txn_id(master_request.txn_id()); + req.set_txn_reuse_version(master_request.txn_reuse_version()); + req.set_subtxn_id(master_request.subtxn_id()); + req.set_session_host_uuid(master_request.session_host_uuid()); + for (auto& entry : master_request.object_locks()) { + auto* lock = req.add_object_locks(); + lock->set_database_oid(entry.database_oid()); + lock->set_object_oid(entry.object_oid()); + lock->set_lock_type(entry.lock_type()); + } + return req; +} + +ReleaseObjectLockRequestPB TserverRequestFor( + const ReleaseObjectLocksGlobalRequestPB& master_request) { + ReleaseObjectLockRequestPB req; + req.set_txn_id(master_request.txn_id()); + req.set_txn_reuse_version(master_request.txn_reuse_version()); + req.set_subtxn_id(master_request.subtxn_id()); + req.set_session_host_uuid(master_request.session_host_uuid()); + for (auto& entry : master_request.object_locks()) { + auto* lock = req.add_object_locks(); + lock->set_database_oid(entry.database_oid()); + lock->set_object_oid(entry.object_oid()); + } + req.set_release_all_locks(master_request.release_all_locks()); + return req; +} + } // namespace ObjectLockInfoManager::ObjectLockInfoManager(Master* master, CatalogManager* catalog_manager) @@ -238,15 +279,15 @@ ObjectLockInfoManager::ObjectLockInfoManager(Master* master, CatalogManager* cat ObjectLockInfoManager::~ObjectLockInfoManager() = default; void ObjectLockInfoManager::LockObject( - const tserver::AcquireObjectLockRequestPB& req, tserver::AcquireObjectLockResponsePB* resp, + const AcquireObjectLocksGlobalRequestPB& req, AcquireObjectLocksGlobalResponsePB* resp, rpc::RpcContext rpc) { - impl_->LockObject(req, resp, std::move(rpc)); + impl_->LockObject(TserverRequestFor(req), resp, std::move(rpc)); } void ObjectLockInfoManager::UnlockObject( - const tserver::ReleaseObjectLockRequestPB& req, tserver::ReleaseObjectLockResponsePB* resp, + const ReleaseObjectLocksGlobalRequestPB& req, ReleaseObjectLocksGlobalResponsePB* resp, rpc::RpcContext rpc) { - impl_->UnlockObject(req, resp, std::move(rpc)); + impl_->UnlockObject(TserverRequestFor(req), resp, std::move(rpc)); } void ObjectLockInfoManager::ExportObjectLockInfo( @@ -266,6 +307,10 @@ void ObjectLockInfoManager::UpdateObjectLocks( void ObjectLockInfoManager::Clear() { impl_->Clear(); } +std::shared_ptr ObjectLockInfoManager::ts_local_lock_manager() { + return impl_->ts_local_lock_manager(); +} + std::shared_ptr ObjectLockInfoManager::TEST_ts_local_lock_manager() { return impl_->TEST_ts_local_lock_manager(); } @@ -284,7 +329,7 @@ std::shared_ptr ObjectLockInfoManager::Impl::GetOrCreateObjectLo } Status ObjectLockInfoManager::Impl::PersistRequest( - LeaderEpoch epoch, const tserver::AcquireObjectLockRequestPB& req) { + LeaderEpoch epoch, const AcquireObjectLockRequestPB& req) { VLOG(3) << __PRETTY_FUNCTION__; auto key = req.session_host_uuid(); std::shared_ptr object_lock_info = GetOrCreateObjectLockInfo(key); @@ -306,7 +351,7 @@ Status ObjectLockInfoManager::Impl::PersistRequest( } Status ObjectLockInfoManager::Impl::PersistRequest( - LeaderEpoch epoch, const tserver::ReleaseObjectLockRequestPB& req) { + LeaderEpoch epoch, const ReleaseObjectLockRequestPB& req) { VLOG(3) << __PRETTY_FUNCTION__; auto key = req.session_host_uuid(); std::shared_ptr object_lock_info = GetOrCreateObjectLockInfo(key); @@ -351,9 +396,9 @@ template void FillErrorIfRequired(const Status& status, Resp* resp) { if (!status.ok()) { if (status.IsTryAgain() && status.message().ToBuffer() == kNotTheMasterLeader) { - resp->mutable_error()->set_code(TabletServerErrorPB::NOT_THE_LEADER); + resp->mutable_error()->set_code(MasterErrorPB::NOT_THE_LEADER); } else { - resp->mutable_error()->set_code(TabletServerErrorPB::UNKNOWN_ERROR); + resp->mutable_error()->set_code(MasterErrorPB::UNKNOWN_ERROR); } StatusToPB(status, resp->mutable_error()->mutable_status()); } @@ -439,7 +484,7 @@ void ObjectLockInfoManager::Impl::ExportObjectLockInfo( - The RPCs can be retried by the YBClient/PgClient, or left for the release request to clean up. */ void ObjectLockInfoManager::Impl::LockObject( - const AcquireObjectLockRequestPB& req, AcquireObjectLockResponsePB* resp, + AcquireObjectLockRequestPB req, AcquireObjectLocksGlobalResponsePB* resp, rpc::RpcContext context) { LockObject(req, std::move(context), [resp](const Status& s) { FillErrorIfRequired(s, resp); }); } @@ -497,14 +542,14 @@ void ObjectLockInfoManager::Impl::LockObject( } // TODO: Fix this. GetAllDescriptors may need to change to handle tserver membership reliably. - auto lock_objects = - std::make_shared>( - epoch, master_, catalog_manager_, this, req, std::move(callback), std::move(context)); + auto lock_objects = std::make_shared< + UpdateAllTServers>( + epoch, master_, catalog_manager_, this, req, std::move(callback), std::move(context)); lock_objects->Launch(); } void ObjectLockInfoManager::Impl::UnlockObject( - const ReleaseObjectLockRequestPB& req, ReleaseObjectLockResponsePB* resp, + ReleaseObjectLockRequestPB req, ReleaseObjectLocksGlobalResponsePB* resp, rpc::RpcContext context) { UnlockObject(req, std::move(context), [resp](const Status& s) { FillErrorIfRequired(s, resp); }); } @@ -549,9 +594,9 @@ void ObjectLockInfoManager::Impl::UnlockObject( } } - auto unlock_objects = - std::make_shared>( - epoch, master_, catalog_manager_, this, req, std::move(callback), std::move(context)); + auto unlock_objects = std::make_shared< + UpdateAllTServers>( + epoch, master_, catalog_manager_, this, req, std::move(callback), std::move(context)); unlock_objects->Launch(); } @@ -645,6 +690,24 @@ void UpdateAllTServers::Done(size_t i, const Status& s) { } } +template <> +std::shared_ptr +UpdateAllTServers::TServerTaskFor( + const TabletServerId& ts_uuid, StdStatusCallback callback) { + return std::make_shared< + master::UpdateTServer>( + master_, catalog_manager_->AsyncTaskPool(), ts_uuid, this->shared_from_this(), callback); +} + +template <> +std::shared_ptr +UpdateAllTServers::TServerTaskFor( + const TabletServerId& ts_uuid, StdStatusCallback callback) { + return std::make_shared< + master::UpdateTServer>( + master_, catalog_manager_->AsyncTaskPool(), ts_uuid, this->shared_from_this(), callback); +} + template void UpdateAllTServers::Launch() { ts_descriptors_ = master_->ts_manager()->GetAllDescriptors(); @@ -655,9 +718,9 @@ void UpdateAllTServers::Launch() { for (size_t i = 0; i < ts_descriptors_.size(); ++i) { auto ts_uuid = ts_descriptors_[i]->permanent_uuid(); LOG(INFO) << "Launching for " << ts_uuid; - auto callback = std::bind(&UpdateAllTServers::Done, this, i, std::placeholders::_1); - auto task = std::make_shared>( - master_, catalog_manager_->AsyncTaskPool(), ts_uuid, this->shared_from_this(), callback); + auto callback = std::bind( + &UpdateAllTServers::Done, this->shared_from_this(), i, std::placeholders::_1); + auto task = TServerTaskFor(ts_uuid, callback); WARN_NOT_OK( catalog_manager_->ScheduleTask(task), yb::Format( @@ -689,7 +752,7 @@ void UpdateAllTServers::DoneAll() { template UpdateTServer::UpdateTServer( Master* master, ThreadPool* callback_pool, const TabletServerId& ts_uuid, - std::shared_ptr> shared_all_tservers, StdStatusCallback callback) + std::shared_ptr> shared_all_tservers, StdStatusCallback callback) : RetrySpecificTSRpcTask(master, callback_pool, ts_uuid, /* async_task_throttler */ nullptr), callback_(std::move(callback)), shared_all_tservers_(shared_all_tservers) {} diff --git a/src/yb/master/object_lock_info_manager.h b/src/yb/master/object_lock_info_manager.h index 23490968a1b7..20cbacdc5044 100644 --- a/src/yb/master/object_lock_info_manager.h +++ b/src/yb/master/object_lock_info_manager.h @@ -35,6 +35,10 @@ class DdlLockEntriesPB; } // namespace yb::tserver namespace yb::master { +class AcquireObjectLocksGlobalRequestPB; +class AcquireObjectLocksGlobalResponsePB; +class ReleaseObjectLocksGlobalRequestPB; +class ReleaseObjectLocksGlobalResponsePB; struct LeaderEpoch; class ObjectLockInfo; @@ -45,17 +49,18 @@ class ObjectLockInfoManager { virtual ~ObjectLockInfoManager(); void LockObject( - const tserver::AcquireObjectLockRequestPB& req, tserver::AcquireObjectLockResponsePB* resp, + const AcquireObjectLocksGlobalRequestPB& req, AcquireObjectLocksGlobalResponsePB* resp, rpc::RpcContext rpc); void UnlockObject( - const tserver::ReleaseObjectLockRequestPB& req, tserver::ReleaseObjectLockResponsePB* resp, + const ReleaseObjectLocksGlobalRequestPB& req, ReleaseObjectLocksGlobalResponsePB* resp, rpc::RpcContext rpc); void ExportObjectLockInfo(const std::string& tserver_uuid, tserver::DdlLockEntriesPB* resp); void UpdateObjectLocks(const std::string& tserver_uuid, std::shared_ptr info); void Clear(); std::shared_ptr TEST_ts_local_lock_manager(); + std::shared_ptr ts_local_lock_manager(); // Releases any object locks that may have been taken by the specified tservers's previous // incarnations. diff --git a/src/yb/tserver/tablet_server.cc b/src/yb/tserver/tablet_server.cc index 0be742288cd7..de2f88a2d27c 100644 --- a/src/yb/tserver/tablet_server.cc +++ b/src/yb/tserver/tablet_server.cc @@ -231,9 +231,7 @@ DEFINE_RUNTIME_uint32(ysql_min_new_version_ignored_count, 10, "Minimum consecutive number of times that a tserver is allowed to ignore an older catalog " "version that is retrieved from a tserver-master heartbeat response."); -DEFINE_test_flag(bool, enable_object_locking_for_table_locks, false, - "The test flag enables a mechanism using which a tserver could serve an object " - "lock request by acquiring corresponding locks at the local TSLocalLockManager."); +DECLARE_bool(TEST_enable_object_locking_for_table_locks); DECLARE_bool(enable_pg_cron); diff --git a/src/yb/tserver/tablet_server.h b/src/yb/tserver/tablet_server.h index c6c41b851a11..5d27ed0f7030 100644 --- a/src/yb/tserver/tablet_server.h +++ b/src/yb/tserver/tablet_server.h @@ -150,7 +150,7 @@ class TabletServer : public DbServerBase, public TabletServerIf { TSTabletManager* tablet_manager() override { return tablet_manager_.get(); } TabletPeerLookupIf* tablet_peer_lookup() override; - tablet::TSLocalLockManager* ts_local_lock_manager() override { + tablet::TSLocalLockManager* ts_local_lock_manager() const override { return ts_local_lock_manager_.get(); } diff --git a/src/yb/tserver/tablet_server_interface.h b/src/yb/tserver/tablet_server_interface.h index 0077540d8bb8..314789c5726f 100644 --- a/src/yb/tserver/tablet_server_interface.h +++ b/src/yb/tserver/tablet_server_interface.h @@ -58,7 +58,7 @@ class TabletServerIf : public LocalTabletServer { virtual TSTabletManager* tablet_manager() = 0; virtual TabletPeerLookupIf* tablet_peer_lookup() = 0; - virtual tablet::TSLocalLockManager* ts_local_lock_manager() = 0; + virtual tablet::TSLocalLockManager* ts_local_lock_manager() const = 0; virtual server::Clock* Clock() = 0; virtual rpc::Publisher* GetPublisher() = 0; @@ -124,6 +124,8 @@ class TabletServerIf : public LocalTabletServer { const std::string& database_name, const std::optional& deadline) = 0; virtual bool SkipCatalogVersionChecks() { return false; } + + virtual const std::string& permanent_uuid() const = 0; }; } // namespace tserver