|
19 | 19 |
|
20 | 20 | #include "collection_id_cache_entry.hxx" |
21 | 21 | #include "core/app_telemetry_meter.hxx" |
| 22 | +#include "core/cluster_options.hxx" |
22 | 23 | #include "core/config_listener.hxx" |
23 | 24 | #include "core/document_id.hxx" |
24 | 25 | #include "core/error_context/key_value_error_map_info.hxx" |
@@ -319,11 +320,11 @@ class bucket_impl |
319 | 320 | { |
320 | 321 | if (req->key_.empty()) { |
321 | 322 | if (auto server = server_by_vbucket(req->vbucket_, req->replica_index_); server) { |
322 | | - return find_session_by_index(server.value()); |
| 323 | + return find_or_connect_session_by_index(server.value()); |
323 | 324 | } |
324 | 325 | } else if (auto [partition, server] = map_id(req->key_, req->replica_index_); server) { |
325 | 326 | req->vbucket_ = partition; |
326 | | - return find_session_by_index(server.value()); |
| 327 | + return find_or_connect_session_by_index(server.value()); |
327 | 328 | } |
328 | 329 | return std::nullopt; |
329 | 330 | } |
@@ -367,6 +368,53 @@ class bucket_impl |
367 | 368 | return { 0, std::nullopt }; |
368 | 369 | } |
369 | 370 |
|
| 371 | + void connect_session(std::size_t index) |
| 372 | + { |
| 373 | + const std::scoped_lock lock(config_mutex_, sessions_mutex_); |
| 374 | + if (!config_) { |
| 375 | + return; |
| 376 | + } |
| 377 | + |
| 378 | + const auto& node = config_->nodes[index]; |
| 379 | + |
| 380 | + const auto& hostname = node.hostname_for(origin_.options().network); |
| 381 | + auto port = node.port_or( |
| 382 | + origin_.options().network, service_type::key_value, origin_.options().enable_tls, 0); |
| 383 | + if (port == 0) { |
| 384 | + return; |
| 385 | + } |
| 386 | + |
| 387 | + const couchbase::core::origin origin(origin_.credentials(), hostname, port, origin_.options()); |
| 388 | + io::mcbp_session session = |
| 389 | + origin_.options().enable_tls |
| 390 | + ? io::mcbp_session( |
| 391 | + client_id_, node.node_uuid, ctx_, tls_, origin, state_listener_, name_, known_features_) |
| 392 | + : io::mcbp_session( |
| 393 | + client_id_, node.node_uuid, ctx_, origin, state_listener_, name_, known_features_); |
| 394 | + CB_LOG_DEBUG(R"({} rev={}, connect idx={}, session="{}", address="{}:{}")", |
| 395 | + log_prefix_, |
| 396 | + config_->rev_str(), |
| 397 | + node.index, |
| 398 | + session.id(), |
| 399 | + hostname, |
| 400 | + port); |
| 401 | + session.bootstrap( |
| 402 | + [self = shared_from_this(), session](std::error_code err, |
| 403 | + topology::configuration cfg) mutable -> void { |
| 404 | + if (err) { |
| 405 | + return self->remove_session(session.id()); |
| 406 | + } |
| 407 | + self->update_config(std::move(cfg)); |
| 408 | + session.on_configuration_update(self); |
| 409 | + session.on_stop([id = session.id(), self]() -> void { |
| 410 | + self->remove_session(id); |
| 411 | + }); |
| 412 | + self->drain_deferred_queue({}); |
| 413 | + }, |
| 414 | + true); |
| 415 | + sessions_.insert_or_assign(index, std::move(session)); |
| 416 | + } |
| 417 | + |
370 | 418 | void restart_sessions() |
371 | 419 | { |
372 | 420 | const std::scoped_lock lock(config_mutex_, sessions_mutex_); |
@@ -812,6 +860,10 @@ class bucket_impl |
812 | 860 | continue; |
813 | 861 | } |
814 | 862 |
|
| 863 | + if (origin_.options().enable_lazy_connections) { |
| 864 | + ++next_index; |
| 865 | + continue; |
| 866 | + } |
815 | 867 | const couchbase::core::origin origin( |
816 | 868 | origin_.credentials(), hostname, port, origin_.options()); |
817 | 869 | io::mcbp_session session = |
@@ -885,6 +937,16 @@ class bucket_impl |
885 | 937 | return {}; |
886 | 938 | } |
887 | 939 |
|
| 940 | + [[nodiscard]] auto find_or_connect_session_by_index(std::size_t index) |
| 941 | + -> std::optional<io::mcbp_session> |
| 942 | + { |
| 943 | + if (auto session = find_session_by_index(index); session) { |
| 944 | + return session; |
| 945 | + } |
| 946 | + connect_session(index); |
| 947 | + return {}; |
| 948 | + } |
| 949 | + |
888 | 950 | [[nodiscard]] auto next_session_index() -> std::size_t |
889 | 951 | { |
890 | 952 | const std::scoped_lock lock(sessions_mutex_); |
@@ -1206,4 +1268,10 @@ bucket::direct_re_queue(const std::shared_ptr<mcbp::queue_request>& req, bool is |
1206 | 1268 | { |
1207 | 1269 | return impl_->direct_re_queue(req, is_retry); |
1208 | 1270 | } |
| 1271 | + |
| 1272 | +void |
| 1273 | +bucket::connect_session(std::size_t index) |
| 1274 | +{ |
| 1275 | + return impl_->connect_session(index); |
| 1276 | +} |
1209 | 1277 | } // namespace couchbase::core |
0 commit comments