diff --git a/CMakeLists.txt b/CMakeLists.txt index 974711d19f6a..66d509ed3a1d 100644 --- a/CMakeLists.txt +++ b/CMakeLists.txt @@ -143,6 +143,7 @@ option(USERVER_FEATURE_MYSQL "Provide asynchronous driver for MariaDB/MySQL" "${ option(USERVER_FEATURE_ROCKS "Provide asynchronous driver for Rocks" "${USERVER_LIB_ENABLED_DEFAULT}") option(USERVER_FEATURE_YDB "Provide asynchronous driver for YDB" "${USERVER_YDB_DEFAULT}") option(USERVER_FEATURE_OTLP "Provide asynchronous OTLP exporters" "${USERVER_LIB_ENABLED_DEFAULT}") +option(USERVER_FEATURE_ETCD "Provide asynchronous driver for etcd" "${USERVER_LIB_ENABLED_DEFAULT}") set(CMAKE_DEBUG_POSTFIX d) @@ -277,6 +278,11 @@ if (USERVER_FEATURE_YDB) list(APPEND USERVER_AVAILABLE_COMPONENTS ydb) endif() +if (USERVER_FEATURE_ETCD) + _require_userver_core("USERVER_FEATURE_ETCD") + add_subdirectory(etcd) +endif() + add_subdirectory(libraries) if (USERVER_BUILD_TESTS) diff --git a/cmake/install/userver-etcd-config.cmake b/cmake/install/userver-etcd-config.cmake new file mode 100644 index 000000000000..0d4826b3ab69 --- /dev/null +++ b/cmake/install/userver-etcd-config.cmake @@ -0,0 +1,11 @@ +include_guard(GLOBAL) + +if(userver_etcd_FOUND) + return() +endif() + +find_package(userver REQUIRED COMPONENTS + core +) + +set(userver_etcd_FOUND TRUE) diff --git a/etcd/CMakeLists.txt b/etcd/CMakeLists.txt new file mode 100644 index 000000000000..aced3e0e2523 --- /dev/null +++ b/etcd/CMakeLists.txt @@ -0,0 +1,6 @@ +project(userver-etcd CXX) + +userver_module(etcd + SOURCE_DIR "${CMAKE_CURRENT_SOURCE_DIR}" + UTEST_SOURCES "${CMAKE_CURRENT_SOURCE_DIR}/src/*_test.cpp" +) diff --git a/etcd/include/userver/etcd/client.hpp b/etcd/include/userver/etcd/client.hpp new file mode 100644 index 000000000000..3466002494b3 --- /dev/null +++ b/etcd/include/userver/etcd/client.hpp @@ -0,0 +1,39 @@ +#pragma once + +#include +#include +#include +#include +#include + +#include +#include +#include +#include +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace etcd { + +class Client { +public: + virtual ~Client() = default; + + virtual void Put(const std::string& key, const std::string& value) = 0; + + [[nodiscard]] virtual std::optional Get(const std::string& key) = 0; + + [[nodiscard]] virtual std::vector Range(const std::string& key) = 0; + + virtual void Delete(const std::string& key) = 0; + + virtual WatchListener StartWatch(const std::string& key) = 0; +}; + +using ClientPtr = std::shared_ptr; + +} // namespace etcd + +USERVER_NAMESPACE_END diff --git a/etcd/include/userver/etcd/component.hpp b/etcd/include/userver/etcd/component.hpp new file mode 100644 index 000000000000..48111eb626a3 --- /dev/null +++ b/etcd/include/userver/etcd/component.hpp @@ -0,0 +1,28 @@ +#pragma once + +#include +#include +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace etcd { + +class Component final : public components::ComponentBase { +public: + static constexpr std::string_view kName = "etcd-сlient"; + + Component(const components::ComponentConfig&, const components::ComponentContext&); + + static yaml_config::Schema GetStaticConfigSchema(); + + ClientPtr GetClient(); + +private: + const ClientPtr etcd_client_ptr_; +}; + +} // namespace etcd + +USERVER_NAMESPACE_END diff --git a/etcd/include/userver/etcd/exceptions.hpp b/etcd/include/userver/etcd/exceptions.hpp new file mode 100644 index 000000000000..a6215cb5fbf7 --- /dev/null +++ b/etcd/include/userver/etcd/exceptions.hpp @@ -0,0 +1,19 @@ +#pragma once + +#include + +#include + +USERVER_NAMESPACE_BEGIN + +namespace etcd { + +/// @brief Base class for all etcd client exceptions +class EtcdError : public std::runtime_error { +public: + using std::runtime_error::runtime_error; +}; + +} // namespace etcd + +USERVER_NAMESPACE_END diff --git a/etcd/include/userver/etcd/settings.hpp b/etcd/include/userver/etcd/settings.hpp new file mode 100644 index 000000000000..4fbab86254fc --- /dev/null +++ b/etcd/include/userver/etcd/settings.hpp @@ -0,0 +1,27 @@ +#pragma once + +#include +#include +#include + +#include + +USERVER_NAMESPACE_BEGIN + +namespace etcd { + +struct ClientSettings final { + const std::vector endpoints; + const std::uint32_t attempts; + const std::chrono::microseconds request_timeout_ms; +}; + +} // namespace etcd + +namespace formats::parse { + +etcd::ClientSettings Parse(const yaml_config::YamlConfig& value, To); + +} + +USERVER_NAMESPACE_END diff --git a/etcd/include/userver/etcd/watch_listener.hpp b/etcd/include/userver/etcd/watch_listener.hpp new file mode 100644 index 000000000000..b46a454ceb5a --- /dev/null +++ b/etcd/include/userver/etcd/watch_listener.hpp @@ -0,0 +1,32 @@ +#pragma once + +#include + +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace etcd { + +struct KeyValueEvent final { + std::string key; + std::string value; + std::int32_t version; +}; + +struct WatchListener final { + concurrent::SpscQueue::Consumer consumer; + + KeyValueEvent GetEvent(); +}; + +} // namespace etcd + +namespace formats::parse { + +etcd::KeyValueEvent Parse(const formats::json::Value& value, To); + +} + +USERVER_NAMESPACE_END diff --git a/etcd/library.yaml b/etcd/library.yaml new file mode 100644 index 000000000000..0988e0388455 --- /dev/null +++ b/etcd/library.yaml @@ -0,0 +1,9 @@ +project-name: userver-etcd +project-alt-names: + - yandex-userver-etcd +maintainers: + - Common components +description: Etcd driver + +libraries: + - userver-core diff --git a/etcd/src/etcd/client_impl.cpp b/etcd/src/etcd/client_impl.cpp new file mode 100644 index 000000000000..09f4af79525b --- /dev/null +++ b/etcd/src/etcd/client_impl.cpp @@ -0,0 +1,226 @@ +#include + +#include +#include +#include + +#include + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace etcd { + +namespace { + +const std::uint32_t kMinRetryStatusCode = 500; +const std::uint32_t kMaxRetryStatusCode = 599; + +const std::uint32_t kMinGoodStatusCode = 200; +const std::uint32_t kMaxGoodStatusCode = 299; + +const std::string kKeyPrefix = "/etcd/"; +const std::string kLastPossibleKeyPrefix = "/etcd0"; + +std::string BuildPutUrl(const std::string& service_url) { return fmt::format("{}/v3/kv/put", service_url); } + +std::string BuildPutData(const std::string& key, const std::string& value) { + const auto etcd_key = kKeyPrefix + key; + return formats::json::ToString(formats::json::MakeObject( + "key", crypto::base64::Base64Encode(etcd_key), "value", crypto::base64::Base64Encode(value) + )); +} + +std::string BuildRangeUrl(const std::string& service_url) { return fmt::format("{}/v3/kv/range", service_url); } + +std::string BuildRangeData(const std::string& key) { + const auto etcd_key = kKeyPrefix + key; + return formats::json::ToString(formats::json::MakeObject( + "key", crypto::base64::Base64Encode(etcd_key), "range_end", crypto::base64::Base64Encode(kLastPossibleKeyPrefix) + )); +} + +std::string BuildDeleteUrl(const std::string& service_url) { return fmt::format("{}/v3/kv/deleterange", service_url); } + +std::string BuildDeleteData(const std::string& key) { + const auto etcd_key = kKeyPrefix + key; + return formats::json::ToString(formats::json::MakeObject("key", crypto::base64::Base64Encode(etcd_key))); +} + +std::string BuildWatchUrl(const std::string& service_url) { return fmt::format("{}/v3/watch", service_url); } + +std::string BuildWatchData(const std::string& key) { + const auto etcd_key = kKeyPrefix + key; + return formats::json::ToString(formats::json::MakeObject( + "create_request", formats::json::MakeObject("key", crypto::base64::Base64Encode(etcd_key)) + )); +} + +bool ShouldRetry(const http::StatusCode status_code) { + return kMinRetryStatusCode <= status_code && status_code <= kMaxRetryStatusCode; +} + +void CheckResponseStatusCode(const http::StatusCode status_code) { + if (status_code < kMinGoodStatusCode || kMaxGoodStatusCode < status_code) { + throw EtcdError(fmt::format("Got bad status code from etcd: {}", status_code)); + } +} + +} // namespace + +namespace impl { + +ClientImpl::ClientImpl(clients::http::Client& http_client, ClientSettings settings) + : http_client_(http_client), settings_(settings) {} + +void ClientImpl::Put(const std::string& key, const std::string& value) { + auto response = PerformEtcdRequest(BuildPutUrl, BuildPutData(key, value)); +} + +std::optional ClientImpl::Get(const std::string& key) { + auto response = PerformEtcdRequest(BuildRangeUrl, BuildRangeData(key)); + + const auto json_body = formats::json::FromString(response->body()); + if (!json_body.HasMember("kvs")) { + return std::nullopt; + } + const auto& key_value_list = json_body["kvs"]; + const auto etcd_key = kKeyPrefix + key; + for (const auto& key_value : key_value_list) { + if (crypto::base64::Base64Decode(key_value["key"].As()) == etcd_key) { + return crypto::base64::Base64Decode(key_value["value"].As()); + } + } + return std::nullopt; +} + +std::vector ClientImpl::Range(const std::string& key) { + auto response = PerformEtcdRequest(BuildRangeUrl, BuildRangeData(key)); + + const auto json_body = formats::json::FromString(response->body()); + if (!json_body.HasMember("kvs")) { + return {}; + } + const auto& key_value_list = json_body["kvs"]; + std::vector values; + values.reserve(key_value_list.GetSize()); + for (const auto& key_value : key_value_list) { + values.push_back(crypto::base64::Base64Decode(key_value["value"].As())); + } + return values; +} + +void ClientImpl::Delete(const std::string& key) { + auto response = PerformEtcdRequest(BuildDeleteUrl, BuildDeleteData(key)); +} + +WatchListener ClientImpl::StartWatch(const std::string& key) { + auto queue = concurrent::SpscQueue::Create(); + + auto watch_queues_ptr = watch_queues_.Lock(); + watch_queues_ptr->push_back(queue); + + utils::Async("watch task", [&key, producer = queue->GetProducer(), this] mutable { + this->WatchKeyChanges(key, std::move(producer)); + }).Detach(); + + return WatchListener{.consumer = queue->GetConsumer()}; +} + +clients::http::StreamedResponse ClientImpl::PerformStreamedEtcdRequest( + const std::function& url_builder, + const std::string& data +) { + auto endpoints = settings_.endpoints; + utils::Shuffle(endpoints); + + std::optional maybe_streamed_response; + for (const auto& endpoint : endpoints) { + const auto queue = concurrent::StringStreamQueue::Create(); + maybe_streamed_response = http_client_.CreateRequest() + .post(url_builder(endpoint), data) + .retry(settings_.attempts) + .timeout(1'000'000'000) + .async_perform_stream_body(queue); + auto& streamed_response = maybe_streamed_response.value(); + if (!ShouldRetry(streamed_response.StatusCode())) { + CheckResponseStatusCode(streamed_response.StatusCode()); + return std::move(streamed_response); + } + } + if (maybe_streamed_response.has_value()) { + throw EtcdError( + "Failed to get Ok response from etcd with status code: " + maybe_streamed_response.value().StatusCode() + ); + } else { + throw EtcdError(fmt::format("Failed to get streamed response, number of etcd endpoints: {}", endpoints.size())); + } +} + +std::shared_ptr ClientImpl::PerformEtcdRequest( + const std::function& url_builder, + const std::string& data +) { + auto endpoints = settings_.endpoints; + utils::Shuffle(endpoints); + + std::shared_ptr response_ptr; + for (const auto& endpoint : endpoints) { + response_ptr = http_client_.CreateRequest() + .post(url_builder(endpoint), data) + .retry(settings_.attempts) + .timeout(settings_.request_timeout_ms.count()) + .perform(); + if (!ShouldRetry(response_ptr->status_code())) { + response_ptr->raise_for_status(); + return response_ptr; + } + } + + throw EtcdError("Failed to get Ok response from etcd with error: " + response_ptr->body()); +} + +void ClientImpl::WatchKeyChanges(const std::string& key, concurrent::SpscQueue::Producer producer) { + auto stream_response = PerformStreamedEtcdRequest(BuildWatchUrl, BuildWatchData(key)); + + std::string body_part; + while (stream_response.ReadChunk( + body_part, engine::Deadline::FromTimePoint(std::chrono::system_clock::time_point::max()) + )) { + const auto watch_response = formats::json::FromString(body_part); + LOG_DEBUG() << watch_response; + if (!watch_response["result"].HasMember("events")) { + LOG_DEBUG() << "No events in watch part response, skipping"; + continue; + } + for (const auto& event : watch_response["result"]["events"]) { + if (!event.HasMember("kv")) { + continue; + } + LOG_DEBUG() << "Got event with kv: " << event["kv"]; + if (!producer.Push(event["kv"].As())) { + LOG_ERROR() << "Could not push to queue, aborting task"; + return; + }; + } + } +} + +} // namespace impl + +} // namespace etcd + +USERVER_NAMESPACE_END diff --git a/etcd/src/etcd/client_impl.hpp b/etcd/src/etcd/client_impl.hpp new file mode 100644 index 000000000000..24cfe9e27d6f --- /dev/null +++ b/etcd/src/etcd/client_impl.hpp @@ -0,0 +1,50 @@ +#pragma once + +#include +#include +#include +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace etcd { + +namespace impl { + +class ClientImpl : public Client { +public: + ClientImpl(clients::http::Client& http_client, ClientSettings settings); + + void Put(const std::string& key, const std::string& value) override; + + [[nodiscard]] std::optional Get(const std::string& key) override; + + [[nodiscard]] std::vector Range(const std::string& key) override; + + void Delete(const std::string& key) override; + + WatchListener StartWatch(const std::string& key) override; + +private: + [[nodiscard]] std::shared_ptr + PerformEtcdRequest(const std::function& url_builder, const std::string& data); + + [[nodiscard]] clients::http::StreamedResponse PerformStreamedEtcdRequest( + const std::function& url_builder, + const std::string& data + ); + + void WatchKeyChanges(const std::string& key, concurrent::SpscQueue::Producer producer); + + using WatchQueuePtr = std::shared_ptr>; + clients::http::Client& http_client_; + concurrent::Variable> watch_queues_; + const ClientSettings settings_; +}; + +} // namespace impl + +} // namespace etcd + +USERVER_NAMESPACE_END diff --git a/etcd/src/etcd/component.cpp b/etcd/src/etcd/component.cpp new file mode 100644 index 000000000000..3f0f4d871601 --- /dev/null +++ b/etcd/src/etcd/component.cpp @@ -0,0 +1,47 @@ +#include + +#include +#include +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace etcd { + +Component::Component(const components::ComponentConfig& config, const components::ComponentContext& context) + : ComponentBase(config, context), + etcd_client_ptr_(std::make_shared( + context.FindComponent().GetHttpClient(), + config.As() + )) {} + +yaml_config::Schema Component::GetStaticConfigSchema() { + return yaml_config::MergeSchemas(R"( +type: object +description: Etcd cluster component +additionalProperties: false +properties: + endpoints: + type: array + description: Etcd endpoints + items: + type: string + description: host + attempts: + type: integer + description: > + Number of attempts per one endpoints, total number of attempts is number of endpoints times attempts + minimum: 1 + request_timeout_ms: + type: integer + description: Number of miliseconds between request attempts + minimum: 1 +)"); +} + +ClientPtr Component::GetClient() { return etcd_client_ptr_; } + +} // namespace etcd + +USERVER_NAMESPACE_END diff --git a/etcd/src/etcd/settings.cpp b/etcd/src/etcd/settings.cpp new file mode 100644 index 000000000000..d00e27446821 --- /dev/null +++ b/etcd/src/etcd/settings.cpp @@ -0,0 +1,31 @@ +#include + +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace etcd { + +namespace { + +constexpr std::uint32_t kDefaultAttempts{3}; +constexpr std::chrono::milliseconds kDefaultRequestTimeout{1'000}; + +} // namespace + +} // namespace etcd + +namespace formats::parse { + +etcd::ClientSettings Parse(const yaml_config::YamlConfig& cofig, To) { + return etcd::ClientSettings{ + .endpoints = cofig["endpoints"].As>(), + .attempts = cofig["attempts"].As(etcd::kDefaultAttempts), + .request_timeout_ms = cofig["request_timeout_ms"].As(etcd::kDefaultRequestTimeout), + }; +} + +} // namespace formats::parse + +USERVER_NAMESPACE_END diff --git a/etcd/src/etcd/watch_listener.cpp b/etcd/src/etcd/watch_listener.cpp new file mode 100644 index 000000000000..db1c5ca762e6 --- /dev/null +++ b/etcd/src/etcd/watch_listener.cpp @@ -0,0 +1,32 @@ +#include + +#include +#include + +USERVER_NAMESPACE_BEGIN + +namespace etcd { + +KeyValueEvent WatchListener::GetEvent() { + KeyValueEvent event; + if (!consumer.Pop(event)) { + throw EtcdError("Consumer pop failed"); + } + return event; +} + +} // namespace etcd + +namespace formats::parse { + +etcd::KeyValueEvent Parse(const formats::json::Value& value, To) { + return etcd::KeyValueEvent{ + .key = crypto::base64::Base64Decode(value["key"].As()), + .value = crypto::base64::Base64Decode(value["value"].As()), + .version = std::stoi(value["version"].As()), + }; +} + +} // namespace formats::parse + +USERVER_NAMESPACE_END diff --git a/etcd/tests/etcd_client_test.cpp b/etcd/tests/etcd_client_test.cpp new file mode 100644 index 000000000000..e69de29bb2d1 diff --git a/testsuite/pytest_plugins/pytest_userver/plugins/config.py b/testsuite/pytest_plugins/pytest_userver/plugins/config.py index a65ce4bfc6f0..8acc5d8df815 100644 --- a/testsuite/pytest_plugins/pytest_userver/plugins/config.py +++ b/testsuite/pytest_plugins/pytest_userver/plugins/config.py @@ -406,7 +406,7 @@ def allowed_url_prefixes_extra() -> typing.List[str]: @ingroup userver_testsuite_fixtures """ - return [] + return ["http://localhost:2379"] @pytest.fixture(scope='session') @@ -430,7 +430,7 @@ def patch_config(config, config_vars): return http_client = components['http-client'] or {} http_client['testsuite-enabled'] = True - http_client['testsuite-timeout'] = '10s' + # http_client['testsuite-timeout'] = '30s' allowed_urls = [mockserver_info.base_url] if mockserver_ssl_info: