Skip to content

Commit

Permalink
[#19185] xClusterDDLRepl: Drop extension on replication deletion
Browse files Browse the repository at this point in the history
Summary:
Add in hooks to drop the xcluster ddl replication extension on replication deletion / alter
replication remove namespace.

On the target side we always delete the extension since we do not support daisy chaining or N:1
replication.
On the source side, we first verify that no other automatic-mode outbound replication groups include
this namespace, then do the deletion.
Jira: DB-7987

Test Plan:
```
ybd --cxx-test xcluster_ddl_replication-test --gtest_filter "XClusterDDLReplicationTest.BasicSetupAlterTeardown"
```

Reviewers: hsunder, xCluster

Reviewed By: hsunder

Subscribers: ybase

Differential Revision: https://phorge.dev.yugabyte.com/D40628
  • Loading branch information
hulien22 committed Dec 15, 2024
1 parent 473c70d commit c909082
Show file tree
Hide file tree
Showing 11 changed files with 246 additions and 20 deletions.
77 changes: 64 additions & 13 deletions src/yb/integration-tests/xcluster/xcluster_ddl_replication-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@

#include "yb/cdc/xcluster_types.h"
#include "yb/client/table.h"
#include "yb/client/xcluster_client.h"
#include "yb/client/yb_table_name.h"
#include "yb/common/colocated_util.h"
#include "yb/common/common_types.pb.h"
Expand All @@ -34,7 +35,20 @@ namespace yb {

const MonoDelta kTimeout = 60s * kTimeMultiplier;

class XClusterDDLReplicationTest : public XClusterDDLReplicationTestBase {};
class XClusterDDLReplicationTest : public XClusterDDLReplicationTestBase {
public:
Status AddDatabaseToReplication(
const NamespaceId& source_db_id, const NamespaceId& target_db_id) {
auto source_xcluster_client = client::XClusterClient(*producer_client());
RETURN_NOT_OK(source_xcluster_client.AddNamespaceToOutboundReplicationGroup(
kReplicationGroupId, source_db_id));
auto bootstrap_required =
VERIFY_RESULT(IsXClusterBootstrapRequired(kReplicationGroupId, source_db_id));
SCHECK(!bootstrap_required, IllegalState, "Bootstrap should not be required");

return AddNamespaceToXClusterReplication(source_db_id, target_db_id);
}
};

// In automatic mode, sequences_data should have been created on both universe.
TEST_F(XClusterDDLReplicationTest, CheckSequenceDataTable) {
Expand All @@ -51,23 +65,60 @@ TEST_F(XClusterDDLReplicationTest, CheckSequenceDataTable) {
}));
}

TEST_F(XClusterDDLReplicationTest, CheckExtensionTableTabletCount) {
TEST_F(XClusterDDLReplicationTest, BasicSetupAlterTeardown) {
ASSERT_OK(SetUpClusters());
ASSERT_OK(CheckpointReplicationGroup());
ASSERT_OK(CreateReplicationFromCheckpoint());

auto source_xcluster_client = client::XClusterClient(*producer_client());
const auto target_master_address = consumer_cluster()->GetMasterAddresses();

// Ensure that tables are properly created with only one tablet each.
ASSERT_OK(RunOnBothClusters([&](Cluster* cluster) -> Status {
for (const auto& table_name :
{xcluster::kDDLQueueTableName, xcluster::kDDLReplicatedTableName}) {
auto yb_table_name = VERIFY_RESULT(
GetYsqlTable(cluster, namespace_name, xcluster::kDDLQueuePgSchemaName, table_name));
std::shared_ptr<client::YBTable> table;
RETURN_NOT_OK(cluster->client_->OpenTable(yb_table_name, &table));
SCHECK_EQ(table->GetPartitionCount(), 1, IllegalState, "Expected 1 tablet");
}
return Status::OK();
}));
ASSERT_OK(VerifyDDLExtensionTablesCreation(namespace_name));

// Alter replication to add a new database.
const auto namespace_name2 = namespace_name + "2";
auto [source_db2_id, target_db2_id] =
ASSERT_RESULT(CreateDatabaseOnBothClusters(namespace_name2));
ASSERT_OK(AddDatabaseToReplication(source_db2_id, target_db2_id));
ASSERT_OK(VerifyDDLExtensionTablesCreation(namespace_name2));

// Alter replication to remove the new database.
ASSERT_OK(source_xcluster_client.RemoveNamespaceFromOutboundReplicationGroup(
kReplicationGroupId, source_db2_id, target_master_address));
ASSERT_OK(VerifyDDLExtensionTablesDeletion(namespace_name2));
// First namespace should not be affected.
ASSERT_OK(VerifyDDLExtensionTablesCreation(namespace_name));

// Add the second database to replication again to test dropping everything.
ASSERT_OK(AddDatabaseToReplication(source_db2_id, target_db2_id));
ASSERT_OK(VerifyDDLExtensionTablesCreation(namespace_name2));

// Drop replication.
ASSERT_OK(source_xcluster_client.DeleteOutboundReplicationGroup(
kReplicationGroupId, target_master_address));

// Extension should no longer exist on either side.
ASSERT_OK(VerifyDDLExtensionTablesDeletion(namespace_name));
ASSERT_OK(VerifyDDLExtensionTablesDeletion(namespace_name2));
}

TEST_F(XClusterDDLReplicationTest, TestExtensionDeletionWithMultipleReplicationGroups) {
const xcluster::ReplicationGroupId kReplicationGroupId2("ReplicationGroup2");
ASSERT_OK(SetUpClusters());
ASSERT_OK(CheckpointReplicationGroup());
ASSERT_OK(VerifyDDLExtensionTablesCreation(namespace_name, /*only_source=*/true));
ASSERT_OK(CheckpointReplicationGroup(kReplicationGroupId2));
ASSERT_OK(VerifyDDLExtensionTablesCreation(namespace_name, /*only_source=*/true));

auto source_xcluster_client = client::XClusterClient(*producer_client());
ASSERT_OK(source_xcluster_client.DeleteOutboundReplicationGroup(
kReplicationGroupId, /*target_master_address*/ {}));
ASSERT_OK(VerifyDDLExtensionTablesCreation(namespace_name, /*only_source=*/true));

ASSERT_OK(source_xcluster_client.DeleteOutboundReplicationGroup(
kReplicationGroupId2, /*target_master_address*/ {}));
ASSERT_OK(VerifyDDLExtensionTablesDeletion(namespace_name, /*only_source=*/true));
}

TEST_F(XClusterDDLReplicationTest, DisableSplitting) {
Expand Down
56 changes: 55 additions & 1 deletion src/yb/integration-tests/xcluster/xcluster_ysql_test_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -357,6 +357,23 @@ Result<master::SysClusterConfigEntryPB> XClusterYsqlTestBase::GetClusterConfig(C
return resp.cluster_config();
}

Result<std::pair<NamespaceId, NamespaceId>> XClusterYsqlTestBase::CreateDatabaseOnBothClusters(
const NamespaceName& db_name) {
RETURN_NOT_OK(RunOnBothClusters([this, &db_name](Cluster* cluster) -> Status {
RETURN_NOT_OK(CreateDatabase(cluster, db_name));
auto table_name = VERIFY_RESULT(CreateYsqlTable(
cluster, db_name, "" /* schema_name */, "initial_table",
/*tablegroup_name=*/boost::none, /*num_tablets=*/1));
std::shared_ptr<client::YBTable> table;
RETURN_NOT_OK(cluster->client_->OpenTable(table_name, &table));
cluster->tables_.emplace_back(std::move(table));
return Status::OK();
}));
return std::make_pair(
VERIFY_RESULT(GetNamespaceId(producer_client(), db_name)),
VERIFY_RESULT(GetNamespaceId(consumer_client(), db_name)));
}

Result<YBTableName> XClusterYsqlTestBase::GetYsqlTable(
Cluster* cluster,
const std::string& namespace_name,
Expand Down Expand Up @@ -408,7 +425,7 @@ Result<YBTableName> XClusterYsqlTestBase::GetYsqlTable(
}
}
return STATUS(
IllegalState,
NotFound,
strings::Substitute("Unable to find table $0 in namespace $1", table_name, namespace_name));
}

Expand Down Expand Up @@ -1030,4 +1047,41 @@ Status XClusterYsqlTestBase::CreateReplicationFromCheckpoint(
return WaitForCreateReplicationToFinish(master_addr, namespace_names);
}

Status XClusterYsqlTestBase::VerifyDDLExtensionTablesCreation(
const NamespaceName& db_name, bool only_source) {
return RunOnBothClusters([&](Cluster* cluster) -> Status {
if (cluster == &consumer_cluster_ && only_source) {
return Status::OK();
}
for (const auto& table_name :
{xcluster::kDDLQueueTableName, xcluster::kDDLReplicatedTableName}) {
auto yb_table_name = VERIFY_RESULT(
GetYsqlTable(cluster, db_name, xcluster::kDDLQueuePgSchemaName, table_name));
std::shared_ptr<client::YBTable> table;
RETURN_NOT_OK(cluster->client_->OpenTable(yb_table_name, &table));
SCHECK_EQ(table->GetPartitionCount(), 1, IllegalState, "Expected 1 tablet");
}
return Status::OK();
});
}

Status XClusterYsqlTestBase::VerifyDDLExtensionTablesDeletion(
const NamespaceName& db_name, bool only_source) {
return RunOnBothClusters([&](Cluster* cluster) -> Status {
if (cluster == &consumer_cluster_ && only_source) {
return Status::OK();
}
for (const auto& table_name :
{xcluster::kDDLQueueTableName, xcluster::kDDLReplicatedTableName}) {
auto yb_table_name_result =
GetYsqlTable(cluster, db_name, xcluster::kDDLQueuePgSchemaName, table_name);
SCHECK(!yb_table_name_result.ok(), IllegalState, "Table $0 should not exist", table_name);
SCHECK(
yb_table_name_result.status().IsNotFound(), IllegalState, "Table $0 should not exist: $1",
table_name, yb_table_name_result.status());
}
return Status::OK();
});
}

} // namespace yb
6 changes: 6 additions & 0 deletions src/yb/integration-tests/xcluster/xcluster_ysql_test_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,9 @@ class XClusterYsqlTestBase : public XClusterTestBase {
Result<std::string> GetUniverseId(Cluster* cluster);
Result<master::SysClusterConfigEntryPB> GetClusterConfig(Cluster& cluster);

Result<std::pair<NamespaceId, NamespaceId>> CreateDatabaseOnBothClusters(
const NamespaceName& db_name);

Result<client::YBTableName> CreateYsqlTable(
Cluster* cluster,
const std::string& namespace_name,
Expand Down Expand Up @@ -171,6 +174,9 @@ class XClusterYsqlTestBase : public XClusterTestBase {
Status WaitForCreateReplicationToFinish(
const std::string& target_master_addresses, std::vector<NamespaceName> namespace_names = {});

Status VerifyDDLExtensionTablesCreation(const NamespaceName& db_name, bool only_source = false);
Status VerifyDDLExtensionTablesDeletion(const NamespaceName& db_name, bool only_source = false);

protected:
void TestReplicationWithSchemaChanges(TableId producer_table_id, bool bootstrap);

Expand Down
24 changes: 23 additions & 1 deletion src/yb/master/xcluster/master_xcluster_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@
#include "yb/master/catalog_manager.h"
#include "yb/master/catalog_manager_util.h"

DECLARE_uint32(xcluster_ysql_statement_timeout_sec);

namespace yb::master {

static const auto kXClusterDDLExtensionName = xcluster::kDDLQueuePgSchemaName;
Expand Down Expand Up @@ -140,7 +142,6 @@ Status SetupDDLReplicationExtension(
} else {
// We could have older data in the table due to a backup restore from the source universe.
// So, we drop the extension and recreate it so that we start with empty tables.
// TODO(#19185) Revisit this GUC disabling when we implement proper extension shutdown.
statements.push_back(Format("SET $0.replication_role = DISABLED", kXClusterDDLExtensionName));
statements.push_back(Format("DROP EXTENSION IF EXISTS $0", kXClusterDDLExtensionName));
statements.push_back(Format("CREATE EXTENSION $0", kXClusterDDLExtensionName));
Expand All @@ -155,4 +156,25 @@ Status SetupDDLReplicationExtension(
database_name, statements, catalog_manager, deadline, std::move(callback));
}

Status DropDDLReplicationExtension(
CatalogManagerIf& catalog_manager, const NamespaceId& namespace_id,
StdStatusCallback callback) {
auto namespace_name = VERIFY_RESULT(catalog_manager.FindNamespaceById(namespace_id))->name();
LOG(INFO) << "Dropping " << kXClusterDDLExtensionName << " extension for namespace "
<< namespace_id << " (" << namespace_name << ")";
std::vector<std::string> statements;
// Disable the extension first to prevent any conflicts with later setups.
statements.push_back(Format(
"ALTER DATABASE \"$0\" SET $1.replication_role = DISABLED", namespace_name,
kXClusterDDLExtensionName));
// Also disable for the current session.
statements.push_back(Format("SET $0.replication_role = DISABLED", kXClusterDDLExtensionName));
statements.push_back(Format("DROP EXTENSION IF EXISTS $0", kXClusterDDLExtensionName));

return ExecutePgsqlStatements(
namespace_name, statements, catalog_manager,
CoarseMonoClock::now() + MonoDelta::FromSeconds(FLAGS_xcluster_ysql_statement_timeout_sec),
std::move(callback));
}

} // namespace yb::master
3 changes: 3 additions & 0 deletions src/yb/master/xcluster/master_xcluster_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,4 +64,7 @@ Status SetupDDLReplicationExtension(
CatalogManagerIf& catalog_manager, const std::string& database_name,
XClusterDDLReplicationRole role, CoarseTimePoint deadline, StdStatusCallback callback);

Status DropDDLReplicationExtension(
CatalogManagerIf& catalog_manager, const NamespaceId& namespace_id, StdStatusCallback callback);

} // namespace yb::master
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,7 @@ DECLARE_bool(TEST_block_xcluster_checkpoint_namespace_task);
DECLARE_bool(TEST_xcluster_enable_ddl_replication);
DECLARE_bool(TEST_xcluster_enable_sequence_replication);

using namespace std::chrono_literals;
using namespace std::placeholders;
using testing::_;
using testing::AtLeast;
Expand Down Expand Up @@ -130,7 +131,7 @@ const PgSchemaName kPgSchemaName = "public", kPgSchemaName2 = "public2";
const xcluster::ReplicationGroupId kReplicationGroupId = xcluster::ReplicationGroupId("rg1");
const TableName kTableName1 = "table1", kTableName2 = "table2";
const TableId kTableId1 = "table_id_1", kTableId2 = "table_id_2";
const MonoDelta kTimeout = MonoDelta::FromSeconds(5 * kTimeMultiplier);
const MonoDelta kTimeout = 5s * kTimeMultiplier;

class XClusterOutboundReplicationGroupMockedTest : public YBTest {
public:
Expand Down Expand Up @@ -379,6 +380,17 @@ class XClusterOutboundReplicationGroupMockedTest : public YBTest {
callback(Status::OK());
return Status::OK();
},
.drop_ddl_replication_extension_func =
[this](
const NamespaceId& namespace_id,
const xcluster::ReplicationGroupId& drop_replication_group_id) -> Status {
SCHECK(UseAutomaticMode(), InternalError, "Should only be called in automatic mode");
for (const auto& table_name :
{xcluster::kDDLQueueTableName, xcluster::kDDLReplicatedTableName}) {
DropTable(namespace_id, table_name);
}
return Status::OK();
},
};

Result<scoped_refptr<NamespaceInfo>> GetNamespace(const NamespaceIdentifierPB& ns_identifier) {
Expand Down Expand Up @@ -432,6 +444,20 @@ class XClusterOutboundReplicationGroupMockedTest : public YBTest {
EXPECT_TRUE(table_ids.contains(table_id1));
EXPECT_TRUE(table_ids.contains(table_id2));
}

Status VerifyExtensionTablesDeleted(const NamespaceId namespace_id) {
if (!UseAutomaticMode()) {
return Status::OK();
}
for (const auto& table_name :
{xcluster::kDDLQueueTableName, xcluster::kDDLReplicatedTableName}) {
if (TableExists(namespace_id, table_name)) {
return STATUS_FORMAT(
InternalError, "Table $0 still exists in namespace $1", table_name, namespace_id);
}
}
return Status::OK();
}
};

class XClusterOutboundReplicationGroupMockedParameterized
Expand Down Expand Up @@ -483,6 +509,7 @@ TEST_P(XClusterOutboundReplicationGroupMockedParameterized, TestMultipleTable) {
ASSERT_NOK(result);
ASSERT_TRUE(result.status().IsNotFound());
ASSERT_TRUE(outbound_rg.IsDeleted());
ASSERT_OK(VerifyExtensionTablesDeleted(kNamespaceId));

// We should have 0 streams now.
ASSERT_TRUE(xcluster_streams.empty());
Expand Down Expand Up @@ -574,6 +601,7 @@ TEST_P(XClusterOutboundReplicationGroupMockedParameterized, AddDeleteNamespaces)
ASSERT_NOK(outbound_rg.GetNamespaceCheckpointInfo(kNamespaceId));
ASSERT_NOK(outbound_rg.GetNamespaceCheckpointInfo(namespace_id_2));
ASSERT_TRUE(xcluster_streams.empty());
ASSERT_OK(VerifyExtensionTablesDeleted(kNamespaceId));
}

TEST_P(XClusterOutboundReplicationGroupMockedParameterized, CreateTargetReplicationGroup) {
Expand Down
11 changes: 11 additions & 0 deletions src/yb/master/xcluster/xcluster_outbound_replication_group.cc
Original file line number Diff line number Diff line change
Expand Up @@ -564,6 +564,12 @@ Status XClusterOutboundReplicationGroup::RemoveNamespace(

RETURN_NOT_OK(DeleteNamespaceStreams(epoch, namespace_id, outbound_group_pb));

if (outbound_group_pb.automatic_ddl_mode()) {
// Need to drop the DDL Replication extension for automatic mode.
RETURN_NOT_OK(helper_functions_.drop_ddl_replication_extension_func(
namespace_id, outbound_rg_info_->ReplicationGroupId()));
}

outbound_group_pb.mutable_namespace_infos()->erase(namespace_id);

return Upsert(l, epoch);
Expand Down Expand Up @@ -595,6 +601,11 @@ Status XClusterOutboundReplicationGroup::Delete(

for (const auto& [namespace_id, _] : *outbound_group_pb.mutable_namespace_infos()) {
RETURN_NOT_OK(DeleteNamespaceStreams(epoch, namespace_id, outbound_group_pb));
if (outbound_group_pb.automatic_ddl_mode()) {
// Need to drop the DDL Replication extension for automatic mode.
RETURN_NOT_OK(helper_functions_.drop_ddl_replication_extension_func(
namespace_id, outbound_rg_info_->ReplicationGroupId()));
}
}
outbound_group_pb.mutable_namespace_infos()->clear();
outbound_group_pb.set_state(SysXClusterOutboundReplicationGroupEntryPB::DELETED);
Expand Down
2 changes: 2 additions & 0 deletions src/yb/master/xcluster/xcluster_outbound_replication_group.h
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,8 @@ class XClusterOutboundReplicationGroup
delete_from_sys_catalog_func;
const std::function<Status(const NamespaceId&, StdStatusCallback)>
setup_ddl_replication_extension_func;
const std::function<Status(const NamespaceId&, const xcluster::ReplicationGroupId&)>
drop_ddl_replication_extension_func;
};

explicit XClusterOutboundReplicationGroup(
Expand Down
Loading

0 comments on commit c909082

Please sign in to comment.