Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
72 changes: 70 additions & 2 deletions core/bucket.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@

#include "collection_id_cache_entry.hxx"
#include "core/app_telemetry_meter.hxx"
#include "core/cluster_options.hxx"
#include "core/config_listener.hxx"
#include "core/document_id.hxx"
#include "core/error_context/key_value_error_map_info.hxx"
Expand Down Expand Up @@ -319,11 +320,11 @@ class bucket_impl
{
if (req->key_.empty()) {
if (auto server = server_by_vbucket(req->vbucket_, req->replica_index_); server) {
return find_session_by_index(server.value());
return find_or_connect_session_by_index(server.value());
}
} else if (auto [partition, server] = map_id(req->key_, req->replica_index_); server) {
req->vbucket_ = partition;
return find_session_by_index(server.value());
return find_or_connect_session_by_index(server.value());
}
return std::nullopt;
}
Expand Down Expand Up @@ -367,6 +368,53 @@ class bucket_impl
return { 0, std::nullopt };
}

void connect_session(std::size_t index)
{
const std::scoped_lock lock(config_mutex_, sessions_mutex_);
if (!config_) {
return;
}

const auto& node = config_->nodes[index];

const auto& hostname = node.hostname_for(origin_.options().network);
auto port = node.port_or(
origin_.options().network, service_type::key_value, origin_.options().enable_tls, 0);
if (port == 0) {
return;
}

const couchbase::core::origin origin(origin_.credentials(), hostname, port, origin_.options());
io::mcbp_session session =
origin_.options().enable_tls
? io::mcbp_session(
client_id_, node.node_uuid, ctx_, tls_, origin, state_listener_, name_, known_features_)
: io::mcbp_session(
client_id_, node.node_uuid, ctx_, origin, state_listener_, name_, known_features_);
CB_LOG_DEBUG(R"({} rev={}, connect idx={}, session="{}", address="{}:{}")",
log_prefix_,
config_->rev_str(),
node.index,
session.id(),
hostname,
port);
session.bootstrap(
[self = shared_from_this(), session](std::error_code err,
topology::configuration cfg) mutable -> void {
if (err) {
return self->remove_session(session.id());
}
self->update_config(std::move(cfg));
session.on_configuration_update(self);
session.on_stop([id = session.id(), self]() -> void {
self->remove_session(id);
});
self->drain_deferred_queue({});
},
true);
sessions_.insert_or_assign(index, std::move(session));
}

void restart_sessions()
{
const std::scoped_lock lock(config_mutex_, sessions_mutex_);
Expand Down Expand Up @@ -812,6 +860,10 @@ class bucket_impl
continue;
}

if (origin_.options().enable_lazy_connections) {
++next_index;
continue;
}
const couchbase::core::origin origin(
origin_.credentials(), hostname, port, origin_.options());
io::mcbp_session session =
Expand Down Expand Up @@ -885,6 +937,16 @@ class bucket_impl
return {};
}

[[nodiscard]] auto find_or_connect_session_by_index(std::size_t index)
-> std::optional<io::mcbp_session>
{
if (auto session = find_session_by_index(index); session) {
return session;
}
connect_session(index);
return {};
}

[[nodiscard]] auto next_session_index() -> std::size_t
{
const std::scoped_lock lock(sessions_mutex_);
Expand Down Expand Up @@ -1206,4 +1268,10 @@ bucket::direct_re_queue(const std::shared_ptr<mcbp::queue_request>& req, bool is
{
return impl_->direct_re_queue(req, is_retry);
}

void
bucket::connect_session(std::size_t index)
{
return impl_->connect_session(index);
}
} // namespace couchbase::core
5 changes: 5 additions & 0 deletions core/bucket.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,8 @@ public:
});
}

void connect_session(std::size_t index);

template<typename Request>
void map_and_send(std::shared_ptr<operations::mcbp_command<bucket, Request>> cmd)
{
Expand Down Expand Up @@ -142,6 +144,9 @@ public:
session.has_value() ? session->bootstrap_address() : "",
session.has_value() && session->has_config(),
config_rev());
if (!session) {
Copy link
Contributor

@thejcfactor thejcfactor Nov 10, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If lazy connections are not enabled, we want to defer the command still, right?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@thejcfactor right, we always defer the command. This change just ensures that the connection exist or pending.

connect_session(index);
}
return defer_command([self = shared_from_this(), cmd](std::error_code ec) {
if (ec == errc::common::request_canceled) {
return cmd->cancel(retry_reason::do_not_retry);
Expand Down
1 change: 1 addition & 0 deletions core/cluster_options.hxx
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,7 @@ public:
};
bool preserve_bootstrap_nodes_order{ false };
bool allow_enterprise_analytics{ false };
bool enable_lazy_connections{ false };
};

} // namespace couchbase::core
2 changes: 2 additions & 0 deletions core/utils/connection_string.cxx
Original file line number Diff line number Diff line change
Expand Up @@ -590,6 +590,8 @@ extract_options(connection_string& connstr)
parse_option(connstr.options.preserve_bootstrap_nodes_order, name, value, connstr.warnings);
} else if (name == "allow_enterprise_analytics") {
parse_option(connstr.options.allow_enterprise_analytics, name, value, connstr.warnings);
} else if (name == "enable_lazy_connections") {
parse_option(connstr.options.enable_lazy_connections, name, value, connstr.warnings);
} else {
connstr.warnings.push_back(
fmt::format(R"(unknown parameter "{}" in connection string (value "{}"))", name, value));
Expand Down
Loading