From c90908269901a80f4e6c3cf1957ff0d5bb52b42a Mon Sep 17 00:00:00 2001 From: jhe Date: Fri, 13 Dec 2024 15:16:30 -0800 Subject: [PATCH] [#19185] xClusterDDLRepl: Drop extension on replication deletion Summary: Add in hooks to drop the xcluster ddl replication extension on replication deletion / alter replication remove namespace. On the target side we always delete the extension since we do not support daisy chaining or N:1 replication. On the source side, we first verify that no other automatic-mode outbound replication groups include this namespace, then do the deletion. Jira: DB-7987 Test Plan: ``` ybd --cxx-test xcluster_ddl_replication-test --gtest_filter "XClusterDDLReplicationTest.BasicSetupAlterTeardown" ``` Reviewers: hsunder, xCluster Reviewed By: hsunder Subscribers: ybase Differential Revision: https://phorge.dev.yugabyte.com/D40628 --- .../xcluster/xcluster_ddl_replication-test.cc | 77 +++++++++++++++---- .../xcluster/xcluster_ysql_test_base.cc | 56 +++++++++++++- .../xcluster/xcluster_ysql_test_base.h | 6 ++ .../master/xcluster/master_xcluster_util.cc | 24 +++++- src/yb/master/xcluster/master_xcluster_util.h | 3 + ...cluster_outbound_replication_group-test.cc | 30 +++++++- .../xcluster_outbound_replication_group.cc | 11 +++ .../xcluster_outbound_replication_group.h | 2 + .../xcluster/xcluster_replication_group.cc | 28 ++++++- .../xcluster/xcluster_source_manager.cc | 23 +++++- .../master/xcluster/xcluster_source_manager.h | 6 +- 11 files changed, 246 insertions(+), 20 deletions(-) diff --git a/src/yb/integration-tests/xcluster/xcluster_ddl_replication-test.cc b/src/yb/integration-tests/xcluster/xcluster_ddl_replication-test.cc index a0069b8a302b..337da72214ad 100644 --- a/src/yb/integration-tests/xcluster/xcluster_ddl_replication-test.cc +++ b/src/yb/integration-tests/xcluster/xcluster_ddl_replication-test.cc @@ -13,6 +13,7 @@ #include "yb/cdc/xcluster_types.h" #include "yb/client/table.h" +#include "yb/client/xcluster_client.h" #include "yb/client/yb_table_name.h" #include "yb/common/colocated_util.h" #include "yb/common/common_types.pb.h" @@ -34,7 +35,20 @@ namespace yb { const MonoDelta kTimeout = 60s * kTimeMultiplier; -class XClusterDDLReplicationTest : public XClusterDDLReplicationTestBase {}; +class XClusterDDLReplicationTest : public XClusterDDLReplicationTestBase { + public: + Status AddDatabaseToReplication( + const NamespaceId& source_db_id, const NamespaceId& target_db_id) { + auto source_xcluster_client = client::XClusterClient(*producer_client()); + RETURN_NOT_OK(source_xcluster_client.AddNamespaceToOutboundReplicationGroup( + kReplicationGroupId, source_db_id)); + auto bootstrap_required = + VERIFY_RESULT(IsXClusterBootstrapRequired(kReplicationGroupId, source_db_id)); + SCHECK(!bootstrap_required, IllegalState, "Bootstrap should not be required"); + + return AddNamespaceToXClusterReplication(source_db_id, target_db_id); + } +}; // In automatic mode, sequences_data should have been created on both universe. TEST_F(XClusterDDLReplicationTest, CheckSequenceDataTable) { @@ -51,23 +65,60 @@ TEST_F(XClusterDDLReplicationTest, CheckSequenceDataTable) { })); } -TEST_F(XClusterDDLReplicationTest, CheckExtensionTableTabletCount) { +TEST_F(XClusterDDLReplicationTest, BasicSetupAlterTeardown) { ASSERT_OK(SetUpClusters()); ASSERT_OK(CheckpointReplicationGroup()); ASSERT_OK(CreateReplicationFromCheckpoint()); + auto source_xcluster_client = client::XClusterClient(*producer_client()); + const auto target_master_address = consumer_cluster()->GetMasterAddresses(); + // Ensure that tables are properly created with only one tablet each. - ASSERT_OK(RunOnBothClusters([&](Cluster* cluster) -> Status { - for (const auto& table_name : - {xcluster::kDDLQueueTableName, xcluster::kDDLReplicatedTableName}) { - auto yb_table_name = VERIFY_RESULT( - GetYsqlTable(cluster, namespace_name, xcluster::kDDLQueuePgSchemaName, table_name)); - std::shared_ptr table; - RETURN_NOT_OK(cluster->client_->OpenTable(yb_table_name, &table)); - SCHECK_EQ(table->GetPartitionCount(), 1, IllegalState, "Expected 1 tablet"); - } - return Status::OK(); - })); + ASSERT_OK(VerifyDDLExtensionTablesCreation(namespace_name)); + + // Alter replication to add a new database. + const auto namespace_name2 = namespace_name + "2"; + auto [source_db2_id, target_db2_id] = + ASSERT_RESULT(CreateDatabaseOnBothClusters(namespace_name2)); + ASSERT_OK(AddDatabaseToReplication(source_db2_id, target_db2_id)); + ASSERT_OK(VerifyDDLExtensionTablesCreation(namespace_name2)); + + // Alter replication to remove the new database. + ASSERT_OK(source_xcluster_client.RemoveNamespaceFromOutboundReplicationGroup( + kReplicationGroupId, source_db2_id, target_master_address)); + ASSERT_OK(VerifyDDLExtensionTablesDeletion(namespace_name2)); + // First namespace should not be affected. + ASSERT_OK(VerifyDDLExtensionTablesCreation(namespace_name)); + + // Add the second database to replication again to test dropping everything. + ASSERT_OK(AddDatabaseToReplication(source_db2_id, target_db2_id)); + ASSERT_OK(VerifyDDLExtensionTablesCreation(namespace_name2)); + + // Drop replication. + ASSERT_OK(source_xcluster_client.DeleteOutboundReplicationGroup( + kReplicationGroupId, target_master_address)); + + // Extension should no longer exist on either side. + ASSERT_OK(VerifyDDLExtensionTablesDeletion(namespace_name)); + ASSERT_OK(VerifyDDLExtensionTablesDeletion(namespace_name2)); +} + +TEST_F(XClusterDDLReplicationTest, TestExtensionDeletionWithMultipleReplicationGroups) { + const xcluster::ReplicationGroupId kReplicationGroupId2("ReplicationGroup2"); + ASSERT_OK(SetUpClusters()); + ASSERT_OK(CheckpointReplicationGroup()); + ASSERT_OK(VerifyDDLExtensionTablesCreation(namespace_name, /*only_source=*/true)); + ASSERT_OK(CheckpointReplicationGroup(kReplicationGroupId2)); + ASSERT_OK(VerifyDDLExtensionTablesCreation(namespace_name, /*only_source=*/true)); + + auto source_xcluster_client = client::XClusterClient(*producer_client()); + ASSERT_OK(source_xcluster_client.DeleteOutboundReplicationGroup( + kReplicationGroupId, /*target_master_address*/ {})); + ASSERT_OK(VerifyDDLExtensionTablesCreation(namespace_name, /*only_source=*/true)); + + ASSERT_OK(source_xcluster_client.DeleteOutboundReplicationGroup( + kReplicationGroupId2, /*target_master_address*/ {})); + ASSERT_OK(VerifyDDLExtensionTablesDeletion(namespace_name, /*only_source=*/true)); } TEST_F(XClusterDDLReplicationTest, DisableSplitting) { diff --git a/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.cc b/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.cc index 0d8409d35b62..8f23a2fe27a9 100644 --- a/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.cc +++ b/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.cc @@ -357,6 +357,23 @@ Result XClusterYsqlTestBase::GetClusterConfig(C return resp.cluster_config(); } +Result> XClusterYsqlTestBase::CreateDatabaseOnBothClusters( + const NamespaceName& db_name) { + RETURN_NOT_OK(RunOnBothClusters([this, &db_name](Cluster* cluster) -> Status { + RETURN_NOT_OK(CreateDatabase(cluster, db_name)); + auto table_name = VERIFY_RESULT(CreateYsqlTable( + cluster, db_name, "" /* schema_name */, "initial_table", + /*tablegroup_name=*/boost::none, /*num_tablets=*/1)); + std::shared_ptr table; + RETURN_NOT_OK(cluster->client_->OpenTable(table_name, &table)); + cluster->tables_.emplace_back(std::move(table)); + return Status::OK(); + })); + return std::make_pair( + VERIFY_RESULT(GetNamespaceId(producer_client(), db_name)), + VERIFY_RESULT(GetNamespaceId(consumer_client(), db_name))); +} + Result XClusterYsqlTestBase::GetYsqlTable( Cluster* cluster, const std::string& namespace_name, @@ -408,7 +425,7 @@ Result XClusterYsqlTestBase::GetYsqlTable( } } return STATUS( - IllegalState, + NotFound, strings::Substitute("Unable to find table $0 in namespace $1", table_name, namespace_name)); } @@ -1030,4 +1047,41 @@ Status XClusterYsqlTestBase::CreateReplicationFromCheckpoint( return WaitForCreateReplicationToFinish(master_addr, namespace_names); } +Status XClusterYsqlTestBase::VerifyDDLExtensionTablesCreation( + const NamespaceName& db_name, bool only_source) { + return RunOnBothClusters([&](Cluster* cluster) -> Status { + if (cluster == &consumer_cluster_ && only_source) { + return Status::OK(); + } + for (const auto& table_name : + {xcluster::kDDLQueueTableName, xcluster::kDDLReplicatedTableName}) { + auto yb_table_name = VERIFY_RESULT( + GetYsqlTable(cluster, db_name, xcluster::kDDLQueuePgSchemaName, table_name)); + std::shared_ptr table; + RETURN_NOT_OK(cluster->client_->OpenTable(yb_table_name, &table)); + SCHECK_EQ(table->GetPartitionCount(), 1, IllegalState, "Expected 1 tablet"); + } + return Status::OK(); + }); +} + +Status XClusterYsqlTestBase::VerifyDDLExtensionTablesDeletion( + const NamespaceName& db_name, bool only_source) { + return RunOnBothClusters([&](Cluster* cluster) -> Status { + if (cluster == &consumer_cluster_ && only_source) { + return Status::OK(); + } + for (const auto& table_name : + {xcluster::kDDLQueueTableName, xcluster::kDDLReplicatedTableName}) { + auto yb_table_name_result = + GetYsqlTable(cluster, db_name, xcluster::kDDLQueuePgSchemaName, table_name); + SCHECK(!yb_table_name_result.ok(), IllegalState, "Table $0 should not exist", table_name); + SCHECK( + yb_table_name_result.status().IsNotFound(), IllegalState, "Table $0 should not exist: $1", + table_name, yb_table_name_result.status()); + } + return Status::OK(); + }); +} + } // namespace yb diff --git a/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.h b/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.h index 0cc328b525a0..014919c37a81 100644 --- a/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.h +++ b/src/yb/integration-tests/xcluster/xcluster_ysql_test_base.h @@ -71,6 +71,9 @@ class XClusterYsqlTestBase : public XClusterTestBase { Result GetUniverseId(Cluster* cluster); Result GetClusterConfig(Cluster& cluster); + Result> CreateDatabaseOnBothClusters( + const NamespaceName& db_name); + Result CreateYsqlTable( Cluster* cluster, const std::string& namespace_name, @@ -171,6 +174,9 @@ class XClusterYsqlTestBase : public XClusterTestBase { Status WaitForCreateReplicationToFinish( const std::string& target_master_addresses, std::vector namespace_names = {}); + Status VerifyDDLExtensionTablesCreation(const NamespaceName& db_name, bool only_source = false); + Status VerifyDDLExtensionTablesDeletion(const NamespaceName& db_name, bool only_source = false); + protected: void TestReplicationWithSchemaChanges(TableId producer_table_id, bool bootstrap); diff --git a/src/yb/master/xcluster/master_xcluster_util.cc b/src/yb/master/xcluster/master_xcluster_util.cc index ec9e5266f022..9babc2ae7b79 100644 --- a/src/yb/master/xcluster/master_xcluster_util.cc +++ b/src/yb/master/xcluster/master_xcluster_util.cc @@ -19,6 +19,8 @@ #include "yb/master/catalog_manager.h" #include "yb/master/catalog_manager_util.h" +DECLARE_uint32(xcluster_ysql_statement_timeout_sec); + namespace yb::master { static const auto kXClusterDDLExtensionName = xcluster::kDDLQueuePgSchemaName; @@ -140,7 +142,6 @@ Status SetupDDLReplicationExtension( } else { // We could have older data in the table due to a backup restore from the source universe. // So, we drop the extension and recreate it so that we start with empty tables. - // TODO(#19185) Revisit this GUC disabling when we implement proper extension shutdown. statements.push_back(Format("SET $0.replication_role = DISABLED", kXClusterDDLExtensionName)); statements.push_back(Format("DROP EXTENSION IF EXISTS $0", kXClusterDDLExtensionName)); statements.push_back(Format("CREATE EXTENSION $0", kXClusterDDLExtensionName)); @@ -155,4 +156,25 @@ Status SetupDDLReplicationExtension( database_name, statements, catalog_manager, deadline, std::move(callback)); } +Status DropDDLReplicationExtension( + CatalogManagerIf& catalog_manager, const NamespaceId& namespace_id, + StdStatusCallback callback) { + auto namespace_name = VERIFY_RESULT(catalog_manager.FindNamespaceById(namespace_id))->name(); + LOG(INFO) << "Dropping " << kXClusterDDLExtensionName << " extension for namespace " + << namespace_id << " (" << namespace_name << ")"; + std::vector statements; + // Disable the extension first to prevent any conflicts with later setups. + statements.push_back(Format( + "ALTER DATABASE \"$0\" SET $1.replication_role = DISABLED", namespace_name, + kXClusterDDLExtensionName)); + // Also disable for the current session. + statements.push_back(Format("SET $0.replication_role = DISABLED", kXClusterDDLExtensionName)); + statements.push_back(Format("DROP EXTENSION IF EXISTS $0", kXClusterDDLExtensionName)); + + return ExecutePgsqlStatements( + namespace_name, statements, catalog_manager, + CoarseMonoClock::now() + MonoDelta::FromSeconds(FLAGS_xcluster_ysql_statement_timeout_sec), + std::move(callback)); +} + } // namespace yb::master diff --git a/src/yb/master/xcluster/master_xcluster_util.h b/src/yb/master/xcluster/master_xcluster_util.h index e91c37f539c7..31242505673d 100644 --- a/src/yb/master/xcluster/master_xcluster_util.h +++ b/src/yb/master/xcluster/master_xcluster_util.h @@ -64,4 +64,7 @@ Status SetupDDLReplicationExtension( CatalogManagerIf& catalog_manager, const std::string& database_name, XClusterDDLReplicationRole role, CoarseTimePoint deadline, StdStatusCallback callback); +Status DropDDLReplicationExtension( + CatalogManagerIf& catalog_manager, const NamespaceId& namespace_id, StdStatusCallback callback); + } // namespace yb::master diff --git a/src/yb/master/xcluster/xcluster_outbound_replication_group-test.cc b/src/yb/master/xcluster/xcluster_outbound_replication_group-test.cc index 2b3dab374dc2..785da41eeca7 100644 --- a/src/yb/master/xcluster/xcluster_outbound_replication_group-test.cc +++ b/src/yb/master/xcluster/xcluster_outbound_replication_group-test.cc @@ -34,6 +34,7 @@ DECLARE_bool(TEST_block_xcluster_checkpoint_namespace_task); DECLARE_bool(TEST_xcluster_enable_ddl_replication); DECLARE_bool(TEST_xcluster_enable_sequence_replication); +using namespace std::chrono_literals; using namespace std::placeholders; using testing::_; using testing::AtLeast; @@ -130,7 +131,7 @@ const PgSchemaName kPgSchemaName = "public", kPgSchemaName2 = "public2"; const xcluster::ReplicationGroupId kReplicationGroupId = xcluster::ReplicationGroupId("rg1"); const TableName kTableName1 = "table1", kTableName2 = "table2"; const TableId kTableId1 = "table_id_1", kTableId2 = "table_id_2"; -const MonoDelta kTimeout = MonoDelta::FromSeconds(5 * kTimeMultiplier); +const MonoDelta kTimeout = 5s * kTimeMultiplier; class XClusterOutboundReplicationGroupMockedTest : public YBTest { public: @@ -379,6 +380,17 @@ class XClusterOutboundReplicationGroupMockedTest : public YBTest { callback(Status::OK()); return Status::OK(); }, + .drop_ddl_replication_extension_func = + [this]( + const NamespaceId& namespace_id, + const xcluster::ReplicationGroupId& drop_replication_group_id) -> Status { + SCHECK(UseAutomaticMode(), InternalError, "Should only be called in automatic mode"); + for (const auto& table_name : + {xcluster::kDDLQueueTableName, xcluster::kDDLReplicatedTableName}) { + DropTable(namespace_id, table_name); + } + return Status::OK(); + }, }; Result> GetNamespace(const NamespaceIdentifierPB& ns_identifier) { @@ -432,6 +444,20 @@ class XClusterOutboundReplicationGroupMockedTest : public YBTest { EXPECT_TRUE(table_ids.contains(table_id1)); EXPECT_TRUE(table_ids.contains(table_id2)); } + + Status VerifyExtensionTablesDeleted(const NamespaceId namespace_id) { + if (!UseAutomaticMode()) { + return Status::OK(); + } + for (const auto& table_name : + {xcluster::kDDLQueueTableName, xcluster::kDDLReplicatedTableName}) { + if (TableExists(namespace_id, table_name)) { + return STATUS_FORMAT( + InternalError, "Table $0 still exists in namespace $1", table_name, namespace_id); + } + } + return Status::OK(); + } }; class XClusterOutboundReplicationGroupMockedParameterized @@ -483,6 +509,7 @@ TEST_P(XClusterOutboundReplicationGroupMockedParameterized, TestMultipleTable) { ASSERT_NOK(result); ASSERT_TRUE(result.status().IsNotFound()); ASSERT_TRUE(outbound_rg.IsDeleted()); + ASSERT_OK(VerifyExtensionTablesDeleted(kNamespaceId)); // We should have 0 streams now. ASSERT_TRUE(xcluster_streams.empty()); @@ -574,6 +601,7 @@ TEST_P(XClusterOutboundReplicationGroupMockedParameterized, AddDeleteNamespaces) ASSERT_NOK(outbound_rg.GetNamespaceCheckpointInfo(kNamespaceId)); ASSERT_NOK(outbound_rg.GetNamespaceCheckpointInfo(namespace_id_2)); ASSERT_TRUE(xcluster_streams.empty()); + ASSERT_OK(VerifyExtensionTablesDeleted(kNamespaceId)); } TEST_P(XClusterOutboundReplicationGroupMockedParameterized, CreateTargetReplicationGroup) { diff --git a/src/yb/master/xcluster/xcluster_outbound_replication_group.cc b/src/yb/master/xcluster/xcluster_outbound_replication_group.cc index 8ec53e2caac6..93b40bf7c375 100644 --- a/src/yb/master/xcluster/xcluster_outbound_replication_group.cc +++ b/src/yb/master/xcluster/xcluster_outbound_replication_group.cc @@ -564,6 +564,12 @@ Status XClusterOutboundReplicationGroup::RemoveNamespace( RETURN_NOT_OK(DeleteNamespaceStreams(epoch, namespace_id, outbound_group_pb)); + if (outbound_group_pb.automatic_ddl_mode()) { + // Need to drop the DDL Replication extension for automatic mode. + RETURN_NOT_OK(helper_functions_.drop_ddl_replication_extension_func( + namespace_id, outbound_rg_info_->ReplicationGroupId())); + } + outbound_group_pb.mutable_namespace_infos()->erase(namespace_id); return Upsert(l, epoch); @@ -595,6 +601,11 @@ Status XClusterOutboundReplicationGroup::Delete( for (const auto& [namespace_id, _] : *outbound_group_pb.mutable_namespace_infos()) { RETURN_NOT_OK(DeleteNamespaceStreams(epoch, namespace_id, outbound_group_pb)); + if (outbound_group_pb.automatic_ddl_mode()) { + // Need to drop the DDL Replication extension for automatic mode. + RETURN_NOT_OK(helper_functions_.drop_ddl_replication_extension_func( + namespace_id, outbound_rg_info_->ReplicationGroupId())); + } } outbound_group_pb.mutable_namespace_infos()->clear(); outbound_group_pb.set_state(SysXClusterOutboundReplicationGroupEntryPB::DELETED); diff --git a/src/yb/master/xcluster/xcluster_outbound_replication_group.h b/src/yb/master/xcluster/xcluster_outbound_replication_group.h index f28691da93e9..1a4faecc8d24 100644 --- a/src/yb/master/xcluster/xcluster_outbound_replication_group.h +++ b/src/yb/master/xcluster/xcluster_outbound_replication_group.h @@ -64,6 +64,8 @@ class XClusterOutboundReplicationGroup delete_from_sys_catalog_func; const std::function setup_ddl_replication_extension_func; + const std::function + drop_ddl_replication_extension_func; }; explicit XClusterOutboundReplicationGroup( diff --git a/src/yb/master/xcluster/xcluster_replication_group.cc b/src/yb/master/xcluster/xcluster_replication_group.cc index 71e4b8df51b1..9ba6d0375f2f 100644 --- a/src/yb/master/xcluster/xcluster_replication_group.cc +++ b/src/yb/master/xcluster/xcluster_replication_group.cc @@ -28,6 +28,7 @@ #include "yb/master/xcluster/xcluster_manager_if.h" #include "yb/master/xcluster_rpc_tasks.h" +#include "yb/util/async_util.h" #include "yb/util/flags/auto_flags_util.h" #include "yb/util/is_operation_done_result.h" #include "yb/util/result.h" @@ -477,6 +478,7 @@ Status RemoveNamespaceFromReplicationGroup( auto l = universe->LockForWrite(); auto& universe_pb = l.mutable_data()->pb; + auto is_automatic_ddl_mode = IsAutomaticDdlMode(universe_pb); NamespaceId consumer_namespace_id; auto* namespace_infos = universe_pb.mutable_db_scoped_info()->mutable_namespace_infos(); @@ -509,8 +511,19 @@ Status RemoveNamespaceFromReplicationGroup( // For DB Scoped replication the streams will be cleaned up when the namespace is removed from // the outbound replication group on the source. - return RemoveTablesFromReplicationGroupInternal( - *universe, l, producer_table_ids, catalog_manager, epoch, /*cleanup_source_streams=*/false); + RETURN_NOT_OK(RemoveTablesFromReplicationGroupInternal( + *universe, l, producer_table_ids, catalog_manager, epoch, /*cleanup_source_streams=*/false)); + + if (is_automatic_ddl_mode) { + // Need to drop the DDL Replication extension for automatic mode. + // We don't support N:1 replication or daisy-chaining, so we can safely drop on the target. + Synchronizer sync; + RETURN_NOT_OK(master::DropDDLReplicationExtension( + catalog_manager, consumer_namespace_id, + [&sync](const Status& status) { sync.AsStdStatusCallback()(status); })); + RETURN_NOT_OK(sync.Wait()); + } + return Status::OK(); } Status RemoveTablesFromReplicationGroup( @@ -734,6 +747,17 @@ Status DeleteUniverseReplication( } } + // For each namespace, also cleanup the DDL Replication extension if needed. + if (l->IsAutomaticDdlMode()) { + for (const auto& namespace_info : l->pb.db_scoped_info().namespace_infos()) { + Synchronizer sync; + RETURN_NOT_OK(master::DropDDLReplicationExtension( + catalog_manager, namespace_info.consumer_namespace_id(), + [&sync](const Status& status) { sync.AsStdStatusCallback()(status); })); + RETURN_NOT_OK(sync.Wait()); + } + } + if (PREDICT_FALSE(FLAGS_TEST_exit_unfinished_deleting)) { // Exit for testing services return Status::OK(); diff --git a/src/yb/master/xcluster/xcluster_source_manager.cc b/src/yb/master/xcluster/xcluster_source_manager.cc index e2140093ddf2..425301ad659a 100644 --- a/src/yb/master/xcluster/xcluster_source_manager.cc +++ b/src/yb/master/xcluster/xcluster_source_manager.cc @@ -241,6 +241,8 @@ XClusterSourceManager::InitOutboundReplicationGroup( }, .setup_ddl_replication_extension_func = std::bind(&XClusterSourceManager::SetupDDLReplicationExtension, this, _1, _2), + .drop_ddl_replication_extension_func = + std::bind(&XClusterSourceManager::DropDDLReplicationExtension, this, _1, _2), }; return std::make_shared( @@ -1225,7 +1227,7 @@ Status XClusterSourceManager::RepairOutboundReplicationGroupRemoveTable( } std::vector -XClusterSourceManager::GetXClusterOutboundReplicationGroups(NamespaceId namespace_filter) { +XClusterSourceManager::GetXClusterOutboundReplicationGroups(NamespaceId namespace_filter) const { std::vector replication_groups; for (const auto& outbound_group : GetAllOutboundGroups()) { if (namespace_filter.empty() || outbound_group->HasNamespace(namespace_filter)) { @@ -1284,4 +1286,23 @@ Status XClusterSourceManager::SetupDDLReplicationExtension( std::move(callback)); } +Status XClusterSourceManager::DropDDLReplicationExtension( + const NamespaceId& namespace_id, + const xcluster::ReplicationGroupId& drop_replication_group_id) const { + // Check that there are no other automatic mode replication groups for this namespace. + for (const auto& outbound_group : GetAllOutboundGroups()) { + if (outbound_group->Id() != drop_replication_group_id && outbound_group->AutomaticDDLMode() && + outbound_group->HasNamespace(namespace_id)) { + LOG(INFO) << "Skipping drop of DDL replication extension for namespace " << namespace_id + << " as it also belongs to replication group " << outbound_group->Id(); + return Status::OK(); + } + } + Synchronizer sync; + RETURN_NOT_OK(master::DropDDLReplicationExtension( + catalog_manager_, namespace_id, + [&sync](const Status& status) { sync.AsStdStatusCallback()(status); })); + return sync.Wait(); +} + } // namespace yb::master diff --git a/src/yb/master/xcluster/xcluster_source_manager.h b/src/yb/master/xcluster/xcluster_source_manager.h index 0a6a4f30485f..e86220fcf74c 100644 --- a/src/yb/master/xcluster/xcluster_source_manager.h +++ b/src/yb/master/xcluster/xcluster_source_manager.h @@ -164,7 +164,7 @@ class XClusterSourceManager { const LeaderEpoch& epoch); std::vector GetXClusterOutboundReplicationGroups( - NamespaceId namespace_filter); + NamespaceId namespace_filter) const; struct XClusterOutboundReplicationGroupUserInfo { std::unordered_map> @@ -252,6 +252,10 @@ class XClusterSourceManager { Status SetupDDLReplicationExtension( const NamespaceId& namespace_id, StdStatusCallback callback) const; + Status DropDDLReplicationExtension( + const NamespaceId& namespace_id, + const xcluster::ReplicationGroupId& drop_replication_group_id) const; + Master& master_; CatalogManager& catalog_manager_; SysCatalogTable& sys_catalog_;