Skip to content

Commit

Permalink
[#21337] DocDB: Move async_client_initializer to server
Browse files Browse the repository at this point in the history
Summary:
async_client_initializer can only be used within a server, so moving this to the correct location.
This also allows us to remove `server_process` from the `client` libraries dependency.
Reduced dependence on `AsyncClientInitializer` and replaced it with `std::shared_future<client::YBClient*>` wherever possible.

Removed ClientTest.BadMasterAddress that was added in D9326 to catch CDS initialization issues. We now use YbThread that handles these errors.

Fixes #21337
Jira: DB-10238

Test Plan: Jenkins Ready

Reviewers: mbautin, xCluster, sergei

Reviewed By: mbautin

Subscribers: yql, ycdcxcluster, bogdan, ybase

Differential Revision: https://phorge.dev.yugabyte.com/D32849
  • Loading branch information
hari90 committed Mar 7, 2024
1 parent 8e490d9 commit ac87b24
Show file tree
Hide file tree
Showing 30 changed files with 71 additions and 95 deletions.
5 changes: 4 additions & 1 deletion src/yb/cdc/cdc_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,8 @@
#include "yb/rpc/rpc_context.h"
#include "yb/rpc/rpc_controller.h"

#include "yb/server/async_client_initializer.h"

#include "yb/tablet/tablet_metadata.h"
#include "yb/tablet/tablet_peer.h"
#include "yb/tablet/transaction_participant.h"
Expand Down Expand Up @@ -747,7 +749,8 @@ CDCServiceImpl::CDCServiceImpl(
rate_limiter_(std::unique_ptr<rocksdb::RateLimiter>(rocksdb::NewGenericRateLimiter(
GetAtomicFlag(&FLAGS_xcluster_get_changes_max_send_rate_mbps) * 1_MB))),
impl_(new Impl(context_.get(), &mutex_)) {
cdc_state_table_ = std::make_unique<cdc::CDCStateTable>(impl_->async_client_init_.get());
cdc_state_table_ =
std::make_unique<cdc::CDCStateTable>(impl_->async_client_init_->get_client_future());

CHECK_OK(Thread::Create(
"cdc_service", "update_peers_and_metrics", &CDCServiceImpl::UpdatePeersAndMetrics, this,
Expand Down
1 change: 0 additions & 1 deletion src/yb/cdc/cdc_service.h
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
#include "yb/cdc/cdc_service.service.h"
#include "yb/cdc/cdc_types.h"
#include "yb/cdc/cdc_util.h"
#include "yb/client/async_initializer.h"

#include "yb/master/master_client.fwd.h"

Expand Down
11 changes: 5 additions & 6 deletions src/yb/cdc/cdc_state_table.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

#include "yb/cdc/cdc_state_table.h"

#include "yb/client/async_initializer.h"
#include "yb/client/client.h"
#include "yb/client/error.h"
#include "yb/client/schema.h"
Expand Down Expand Up @@ -271,9 +270,9 @@ Result<CDCStateTableEntry> DeserializeRow(
}
} // namespace

CDCStateTable::CDCStateTable(client::AsyncClientInitializer* async_client_init)
: async_client_init_(async_client_init) {
CHECK_NOTNULL(async_client_init);
CDCStateTable::CDCStateTable(std::shared_future<client::YBClient*> client_future)
: client_future_(std::move(client_future)) {
CHECK(client_future_.valid());
}

CDCStateTable::CDCStateTable(client::YBClient* client) : client_(client) { CHECK_NOTNULL(client); }
Expand Down Expand Up @@ -425,8 +424,8 @@ Result<std::shared_ptr<client::TableHandle>> CDCStateTable::GetTable() {

Result<client::YBClient*> CDCStateTable::GetClient() {
if (!client_) {
SCHECK_NOTNULL(async_client_init_);
client_ = async_client_init_->client();
CHECK(client_future_.valid());
client_ = client_future_.get();
}

SCHECK(client_, IllegalState, "CDC Client not initialized or shutting down");
Expand Down
4 changes: 2 additions & 2 deletions src/yb/cdc/cdc_state_table.h
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ class CDCStateTableRange;
// uses the YBClient and YBSession to access the table.
class CDCStateTable {
public:
explicit CDCStateTable(client::AsyncClientInitializer* async_client_init);
explicit CDCStateTable(std::shared_future<client::YBClient*> client_future);
explicit CDCStateTable(client::YBClient* client);

static const std::string& GetNamespaceName();
Expand Down Expand Up @@ -162,7 +162,7 @@ class CDCStateTable {
const std::vector<std::string>& keys_to_delete = {}) EXCLUDES(mutex_);

std::shared_mutex mutex_;
client::AsyncClientInitializer* async_client_init_ = nullptr;
std::shared_future<client::YBClient*> client_future_;
client::YBClient* client_ = nullptr;

std::shared_ptr<client::TableHandle> cdc_table_ GUARDED_BY(mutex_);
Expand Down
2 changes: 0 additions & 2 deletions src/yb/client/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@
set(YB_PCH_PREFIX client)

set(CLIENT_SRCS
async_initializer.cc
async_rpc.cc
batcher.cc
client.cc
Expand Down Expand Up @@ -76,7 +75,6 @@ set(CLIENT_LIBS
master_proto
master_rpc
master_util
server_process
tserver_proto
tserver_service_proto
pg_auto_analyze_service_proto
Expand Down
30 changes: 0 additions & 30 deletions src/yb/client/client-test.cc
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,6 @@

#include <gtest/gtest.h>

#include "yb/client/async_initializer.h"
#include "yb/client/client-internal.h"
#include "yb/client/client-test-util.h"
#include "yb/client/client.h"
Expand Down Expand Up @@ -100,7 +99,6 @@
#include "yb/util/backoff_waiter.h"
#include "yb/util/capabilities.h"
#include "yb/util/metrics.h"
#include "yb/util/net/dns_resolver.h"
#include "yb/util/net/sockaddr.h"
#include "yb/util/random_util.h"
#include "yb/util/status.h"
Expand Down Expand Up @@ -2673,34 +2671,6 @@ TEST_F(ClientTest, GetNamespaceInfo) {
ASSERT_TRUE(resp.colocated());
}

TEST_F(ClientTest, BadMasterAddress) {
auto messenger = ASSERT_RESULT(CreateMessenger("test-messenger"));
auto host = "should.not.resolve";

// Put host entry in cache.
ASSERT_NOK(messenger->resolver().Resolve(host));

{
struct TestServerOptions : public server::ServerBaseOptions {
TestServerOptions() : server::ServerBaseOptions(1) {}
};
TestServerOptions opts;
auto master_addr = std::make_shared<server::MasterAddresses>();
// Put several hosts, so resolve would take place.
master_addr->push_back({HostPort(host, 1)});
master_addr->push_back({HostPort(host, 2)});
opts.SetMasterAddresses(master_addr);

AsyncClientInitializer async_init(
"test-client", /* timeout= */ 1s, "UUID", &opts,
/* metric_entity= */ nullptr, /* parent_mem_tracker= */ nullptr, messenger.get());
async_init.Start();
async_init.get_client_future().wait_for(1s);
}

messenger->Shutdown();
}

TEST_F(ClientTest, RefreshPartitions) {
const auto kLookupTimeout = 10s;
const auto kNumLookupThreads = 2;
Expand Down
13 changes: 7 additions & 6 deletions src/yb/client/stateful_services/stateful_service_client_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -41,22 +41,23 @@ StatefulServiceClientBase::StatefulServiceClientBase(StatefulServiceKind service

StatefulServiceClientBase::~StatefulServiceClientBase() { Shutdown(); }

Status StatefulServiceClientBase::Init(tserver::TabletServer* server) {
Status StatefulServiceClientBase::Init(
const std::string& local_hosts, const std::vector<std::vector<HostPort>>& masters,
const std::string& root_dir) {
std::vector<std::string> addresses;
for (const auto& address : *server->options().GetMasterAddresses()) {
for (const auto& address : masters) {
for (const auto& host_port : address) {
addresses.push_back(host_port.ToString());
}
}
SCHECK(!addresses.empty(), InvalidArgument, "No master address found to StatefulServiceClient.");

const auto master_addresses = JoinStrings(addresses, ",");
auto local_hosts = server->options().HostsString();

std::lock_guard lock(mutex_);
rpc::MessengerBuilder messenger_builder(service_name_ + "_Client");
secure_context_ = VERIFY_RESULT(rpc::SetupInternalSecureContext(
local_hosts, server->fs_manager()->GetDefaultRootDir(), &messenger_builder));
secure_context_ =
VERIFY_RESULT(rpc::SetupInternalSecureContext(local_hosts, root_dir, &messenger_builder));

messenger_ = VERIFY_RESULT(messenger_builder.Build());

Expand All @@ -77,7 +78,7 @@ Status StatefulServiceClientBase::Init(tserver::TabletServer* server) {
return Status::OK();
}

Status StatefulServiceClientBase::TESTInit(
Status StatefulServiceClientBase::TEST_Init(
const std::string& local_host, const std::string& master_addresses) {
std::lock_guard lock(mutex_);
rpc::MessengerBuilder messenger_builder(service_name_ + "Client");
Expand Down
11 changes: 4 additions & 7 deletions src/yb/client/stateful_services/stateful_service_client_base.h
Original file line number Diff line number Diff line change
Expand Up @@ -27,10 +27,6 @@ namespace yb {

class HostPort;

namespace tserver {
class TabletServer;
}

namespace client {
class YBClient;

Expand Down Expand Up @@ -65,10 +61,11 @@ class StatefulServiceClientBase {

virtual ~StatefulServiceClientBase();

Status Init(tserver::TabletServer* server);
Status Init(
const std::string& local_hosts, const std::vector<std::vector<HostPort>>& master_addresses,
const std::string& root_dir);

Status TESTInit(
const std::string& local_host, const std::string& master_addresses);
Status TEST_Init(const std::string& local_host, const std::string& master_addresses);

void Shutdown();

Expand Down
5 changes: 4 additions & 1 deletion src/yb/common/common_types_util.cc
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,8 @@
#include <array>
#include <utility>

#include "yb/util/logging.h"
#include "yb/util/result.h"
#include "yb/util/strongly_typed_uuid.h"

using std::array;
using std::make_pair;
Expand Down Expand Up @@ -49,4 +50,6 @@ YQLDatabase DatabaseTypeByName(const string& db_type_name) {
return YQLDatabase::YQL_DATABASE_UNKNOWN;
}

YB_STRONGLY_TYPED_UUID_IMPL(UniverseUuid);

} // namespace yb
3 changes: 3 additions & 0 deletions src/yb/common/common_types_util.h
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
#pragma once

#include "yb/common/common_types.pb.h"
#include "yb/util/strongly_typed_uuid.h"

namespace yb {

Expand All @@ -28,4 +29,6 @@ const char* DatabaseTypeName(YQLDatabase db);
// Returns the db type from its string name.
YQLDatabase DatabaseTypeByName(const std::string& db_type_name);

YB_STRONGLY_TYPED_UUID_DECL(UniverseUuid);

} // namespace yb
1 change: 0 additions & 1 deletion src/yb/fs/fs_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -118,7 +118,6 @@ const char *FsManager::kWalsRecoveryDirSuffix = ".recovery";
const char *FsManager::kRocksDBDirName = "rocksdb";
const char *FsManager::kDataDirName = "data";

YB_STRONGLY_TYPED_UUID_IMPL(UniverseUuid);
namespace {

const char kRaftGroupMetadataDirName[] = "tablet-meta";
Expand Down
2 changes: 1 addition & 1 deletion src/yb/integration-tests/mini_cluster_base.cc
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,6 @@ Result<HostPort> MiniClusterBase::GetLeaderMasterBoundRpcAddr() {

Status MiniClusterBase::InitStatefulServiceClient(client::StatefulServiceClientBase* client) {
auto host_port = VERIFY_RESULT(GetLeaderMasterBoundRpcAddr());
return client->TESTInit("127.0.0.52", host_port.ToString());
return client->TEST_Init("127.0.0.52", host_port.ToString());
}
} // namespace yb
5 changes: 2 additions & 3 deletions src/yb/master/catalog_manager.cc
Original file line number Diff line number Diff line change
Expand Up @@ -1041,8 +1041,7 @@ Status CatalogManager::Init() {
}

ysql_transaction_ = std::make_unique<YsqlTransactionDdl>(
sys_catalog_.get(), master_->async_client_initializer().get_client_future(),
background_tasks_thread_pool_.get());
sys_catalog_.get(), master_->client_future(), background_tasks_thread_pool_.get());

// Initialize the metrics emitted by the catalog manager.
load_balance_policy_->InitMetrics();
Expand All @@ -1056,7 +1055,7 @@ Status CatalogManager::Init() {
metric_create_table_too_many_tablets_ =
METRIC_create_table_too_many_tablets.Instantiate(master_->metric_entity_cluster());

cdc_state_table_ = std::make_unique<cdc::CDCStateTable>(&master_->cdc_state_client_initializer());
cdc_state_table_ = std::make_unique<cdc::CDCStateTable>(master_->cdc_state_client_future());

RETURN_NOT_OK(xcluster_manager_->Init());

Expand Down
10 changes: 9 additions & 1 deletion src/yb/master/master.cc
Original file line number Diff line number Diff line change
Expand Up @@ -40,7 +40,7 @@
#include "yb/master/master_auto_flags_manager.h"
#include "yb/util/logging.h"

#include "yb/client/async_initializer.h"
#include "yb/server/async_client_initializer.h"
#include "yb/client/client.h"

#include "yb/common/pg_catversions.h"
Expand Down Expand Up @@ -632,6 +632,14 @@ CloneStateManager* Master::clone_state_manager() const {

AutoFlagsConfigPB Master::GetAutoFlagsConfig() const { return auto_flags_manager_->GetConfig(); }

const std::shared_future<client::YBClient*>& Master::client_future() const {
return async_client_init_->get_client_future();
}

const std::shared_future<client::YBClient*>& Master::cdc_state_client_future() const {
return cdc_state_client_init_->get_client_future();
}

Status Master::get_ysql_db_oid_to_cat_version_info_map(
const tserver::GetTserverCatalogVersionInfoRequestPB& req,
tserver::GetTserverCatalogVersionInfoResponsePB *resp) const {
Expand Down
8 changes: 2 additions & 6 deletions src/yb/master/master.h
Original file line number Diff line number Diff line change
Expand Up @@ -184,13 +184,9 @@ class Master : public tserver::DbServerBase {
uint32_t GetAutoFlagConfigVersion() const override;
AutoFlagsConfigPB GetAutoFlagsConfig() const;

yb::client::AsyncClientInitializer& async_client_initializer() {
return *async_client_init_;
}
const std::shared_future<client::YBClient*>& client_future() const;

yb::client::AsyncClientInitializer& cdc_state_client_initializer() {
return *cdc_state_client_init_;
}
const std::shared_future<client::YBClient*>& cdc_state_client_future() const;

enum MasterMetricType {
TaskMetric,
Expand Down
3 changes: 1 addition & 2 deletions src/yb/master/master_tserver.cc
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@
#include <boost/preprocessor/cat.hpp>
#include <boost/preprocessor/stringize.hpp>

#include "yb/client/async_initializer.h"
#include "yb/common/pg_types.h"

#include "yb/master/catalog_manager_if.h"
Expand Down Expand Up @@ -162,7 +161,7 @@ Status MasterTabletServer::get_ysql_db_oid_to_cat_version_info_map(
}

const std::shared_future<client::YBClient*>& MasterTabletServer::client_future() const {
return master_->async_client_initializer().get_client_future();
return master_->client_future();
}

Status MasterTabletServer::GetLiveTServers(
Expand Down
12 changes: 4 additions & 8 deletions src/yb/master/sys_catalog.cc
Original file line number Diff line number Diff line change
Expand Up @@ -444,7 +444,7 @@ void SysCatalogTable::SysCatalogStateChanged(
}

if (context->reason == StateChangeReason::NEW_LEADER_ELECTED) {
auto client_future = master_->async_client_initializer().get_client_future();
auto client_future = master_->client_future();

// Check if client was already initialized, otherwise we don't have to refresh master leader,
// since it will be fetched as part of initialization.
Expand Down Expand Up @@ -551,14 +551,10 @@ void SysCatalogTable::SetupTabletPeer(const scoped_refptr<tablet::RaftGroupMetad
// TODO: handle crash mid-creation of tablet? do we ever end up with a
// partially created tablet here?
auto tablet_peer = std::make_shared<tablet::TabletPeer>(
metadata,
local_peer_pb_,
scoped_refptr<server::Clock>(master_->clock()),
metadata, local_peer_pb_, scoped_refptr<server::Clock>(master_->clock()),
metadata->fs_manager()->uuid(),
Bind(&SysCatalogTable::SysCatalogStateChanged, Unretained(this), metadata->raft_group_id()),
metric_registry_,
nullptr /* tablet_splitter */,
master_->async_client_initializer().get_client_future());
metric_registry_, nullptr /* tablet_splitter */, master_->client_future());

std::atomic_store(&tablet_peer_, tablet_peer);
}
Expand Down Expand Up @@ -599,7 +595,7 @@ Status SysCatalogTable::OpenTablet(const scoped_refptr<tablet::RaftGroupMetadata

tablet::TabletInitData tablet_init_data = {
.metadata = metadata,
.client_future = master_->async_client_initializer().get_client_future(),
.client_future = master_->client_future(),
.clock = scoped_refptr<server::Clock>(master_->clock()),
.parent_mem_tracker = mem_manager_->tablets_overhead_mem_tracker(),
.block_based_table_mem_tracker = mem_manager_->block_based_table_mem_tracker(),
Expand Down
5 changes: 2 additions & 3 deletions src/yb/master/xcluster/xcluster_safe_time_service.cc
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,6 @@

#include <chrono>

#include "yb/client/async_initializer.h"
#include "yb/client/client.h"
#include "yb/client/error.h"
#include "yb/client/schema.h"
Expand Down Expand Up @@ -504,7 +503,7 @@ Result<XClusterSafeTimeService::ProducerTabletToSafeTimeMap>
XClusterSafeTimeService::GetSafeTimeFromTable() {
ProducerTabletToSafeTimeMap tablet_safe_time;

auto* yb_client = master_->cdc_state_client_initializer().client();
auto* yb_client = master_->client_future().get();
if (!yb_client) {
return STATUS(IllegalState, "Client not initialized or shutting down");
}
Expand Down Expand Up @@ -649,7 +648,7 @@ Status XClusterSafeTimeService::CleanupEntriesFromTable(
return OK();
}

auto* ybclient = master_->cdc_state_client_initializer().client();
auto* ybclient = master_->client_future().get();
if (!ybclient) {
return STATUS(IllegalState, "Client not initialized or shutting down");
}
Expand Down
Loading

0 comments on commit ac87b24

Please sign in to comment.