|
19 | 19 |
|
20 | 20 | #include "collection_id_cache_entry.hxx" |
21 | 21 | #include "core/app_telemetry_meter.hxx" |
| 22 | +#include "core/cluster_options.hxx" |
22 | 23 | #include "core/config_listener.hxx" |
23 | 24 | #include "core/document_id.hxx" |
24 | 25 | #include "core/error_context/key_value_error_map_info.hxx" |
@@ -367,6 +368,53 @@ class bucket_impl |
367 | 368 | return { 0, std::nullopt }; |
368 | 369 | } |
369 | 370 |
|
| 371 | + void connect_session(std::size_t index) |
| 372 | + { |
| 373 | + const std::scoped_lock lock(config_mutex_, sessions_mutex_); |
| 374 | + if (!config_) { |
| 375 | + return; |
| 376 | + } |
| 377 | + |
| 378 | + const auto& node = config_->nodes[index]; |
| 379 | + |
| 380 | + const auto& hostname = node.hostname_for(origin_.options().network); |
| 381 | + auto port = node.port_or( |
| 382 | + origin_.options().network, service_type::key_value, origin_.options().enable_tls, 0); |
| 383 | + if (port == 0) { |
| 384 | + return; |
| 385 | + } |
| 386 | + |
| 387 | + const couchbase::core::origin origin(origin_.credentials(), hostname, port, origin_.options()); |
| 388 | + io::mcbp_session session = |
| 389 | + origin_.options().enable_tls |
| 390 | + ? io::mcbp_session( |
| 391 | + client_id_, node.node_uuid, ctx_, tls_, origin, state_listener_, name_, known_features_) |
| 392 | + : io::mcbp_session( |
| 393 | + client_id_, node.node_uuid, ctx_, origin, state_listener_, name_, known_features_); |
| 394 | + CB_LOG_DEBUG(R"({} rev={}, connect idx={}, session="{}", address="{}:{}")", |
| 395 | + log_prefix_, |
| 396 | + config_->rev_str(), |
| 397 | + node.index, |
| 398 | + session.id(), |
| 399 | + hostname, |
| 400 | + port); |
| 401 | + session.bootstrap( |
| 402 | + [self = shared_from_this(), session](std::error_code err, |
| 403 | + topology::configuration cfg) mutable -> void { |
| 404 | + if (err) { |
| 405 | + return self->remove_session(session.id()); |
| 406 | + } |
| 407 | + self->update_config(std::move(cfg)); |
| 408 | + session.on_configuration_update(self); |
| 409 | + session.on_stop([id = session.id(), self]() -> void { |
| 410 | + self->remove_session(id); |
| 411 | + }); |
| 412 | + self->drain_deferred_queue({}); |
| 413 | + }, |
| 414 | + true); |
| 415 | + sessions_.insert_or_assign(index, std::move(session)); |
| 416 | + } |
| 417 | + |
370 | 418 | void restart_sessions() |
371 | 419 | { |
372 | 420 | const std::scoped_lock lock(config_mutex_, sessions_mutex_); |
@@ -812,50 +860,56 @@ class bucket_impl |
812 | 860 | continue; |
813 | 861 | } |
814 | 862 |
|
815 | | - const couchbase::core::origin origin( |
816 | | - origin_.credentials(), hostname, port, origin_.options()); |
817 | | - io::mcbp_session session = |
818 | | - origin_.options().enable_tls |
819 | | - ? io::mcbp_session(client_id_, |
820 | | - node.node_uuid, |
821 | | - ctx_, |
822 | | - tls_, |
823 | | - origin, |
824 | | - state_listener_, |
825 | | - name_, |
826 | | - known_features_) |
827 | | - : io::mcbp_session( |
828 | | - client_id_, node.node_uuid, ctx_, origin, state_listener_, name_, known_features_); |
829 | | - CB_LOG_DEBUG(R"({} rev={}, add session="{}", address="{}:{}", index={})", |
830 | | - log_prefix_, |
831 | | - config.rev_str(), |
832 | | - session.id(), |
833 | | - hostname, |
834 | | - port, |
835 | | - node.index); |
836 | | - session.bootstrap( |
837 | | - [self = shared_from_this(), session, idx = next_index]( |
838 | | - std::error_code err, topology::configuration cfg) mutable { |
839 | | - if (err) { |
840 | | - CB_LOG_WARNING( |
841 | | - R"({} failed to bootstrap session="{}", address="{}:{}", index={}, ec={})", |
842 | | - session.log_prefix(), |
843 | | - session.id(), |
844 | | - session.bootstrap_hostname(), |
845 | | - session.bootstrap_port(), |
846 | | - idx, |
847 | | - err.message()); |
848 | | - return self->remove_session(session.id()); |
849 | | - } |
850 | | - self->update_config(std::move(cfg)); |
851 | | - session.on_configuration_update(self); |
852 | | - session.on_stop([id = session.id(), self]() { |
853 | | - self->remove_session(id); |
854 | | - }); |
855 | | - self->drain_deferred_queue({}); |
856 | | - }, |
857 | | - true); |
858 | | - new_sessions.insert_or_assign(next_index, std::move(session)); |
| 863 | + if (!origin_.options().enable_lazy_connections) { |
| 864 | + const couchbase::core::origin origin( |
| 865 | + origin_.credentials(), hostname, port, origin_.options()); |
| 866 | + io::mcbp_session session = origin_.options().enable_tls |
| 867 | + ? io::mcbp_session(client_id_, |
| 868 | + node.node_uuid, |
| 869 | + ctx_, |
| 870 | + tls_, |
| 871 | + origin, |
| 872 | + state_listener_, |
| 873 | + name_, |
| 874 | + known_features_) |
| 875 | + : io::mcbp_session(client_id_, |
| 876 | + node.node_uuid, |
| 877 | + ctx_, |
| 878 | + origin, |
| 879 | + state_listener_, |
| 880 | + name_, |
| 881 | + known_features_); |
| 882 | + CB_LOG_DEBUG(R"({} rev={}, add session="{}", address="{}:{}", index={})", |
| 883 | + log_prefix_, |
| 884 | + config.rev_str(), |
| 885 | + session.id(), |
| 886 | + hostname, |
| 887 | + port, |
| 888 | + node.index); |
| 889 | + session.bootstrap( |
| 890 | + [self = shared_from_this(), session, idx = next_index]( |
| 891 | + std::error_code err, topology::configuration cfg) mutable { |
| 892 | + if (err) { |
| 893 | + CB_LOG_WARNING( |
| 894 | + R"({} failed to bootstrap session="{}", address="{}:{}", index={}, ec={})", |
| 895 | + session.log_prefix(), |
| 896 | + session.id(), |
| 897 | + session.bootstrap_hostname(), |
| 898 | + session.bootstrap_port(), |
| 899 | + idx, |
| 900 | + err.message()); |
| 901 | + return self->remove_session(session.id()); |
| 902 | + } |
| 903 | + self->update_config(std::move(cfg)); |
| 904 | + session.on_configuration_update(self); |
| 905 | + session.on_stop([id = session.id(), self]() { |
| 906 | + self->remove_session(id); |
| 907 | + }); |
| 908 | + self->drain_deferred_queue({}); |
| 909 | + }, |
| 910 | + true); |
| 911 | + new_sessions.insert_or_assign(next_index, std::move(session)); |
| 912 | + } |
859 | 913 | ++next_index; |
860 | 914 | } |
861 | 915 | std::swap(sessions_, new_sessions); |
@@ -1206,4 +1260,10 @@ bucket::direct_re_queue(const std::shared_ptr<mcbp::queue_request>& req, bool is |
1206 | 1260 | { |
1207 | 1261 | return impl_->direct_re_queue(req, is_retry); |
1208 | 1262 | } |
| 1263 | + |
| 1264 | +void |
| 1265 | +bucket::connect_session(std::size_t index) |
| 1266 | +{ |
| 1267 | + return impl_->connect_session(index); |
| 1268 | +} |
1209 | 1269 | } // namespace couchbase::core |
0 commit comments