From 89e048e3cd3d4f13be79562bccc603d8d209f154 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E7=BD=97=E6=B3=BD=E8=BD=A9?= Date: Thu, 15 Dec 2022 09:47:58 +0800 Subject: [PATCH] feat(gRPC): retry & healthcheck (#200) --- lib/resty/etcd/v3.lua | 173 +++++++++++---- t/v3/add-auth.sh | 0 t/v3/grpc/health_check.t | 451 +++++++++++++++++++++++++++++++++++++++ 3 files changed, 583 insertions(+), 41 deletions(-) mode change 100644 => 100755 t/v3/add-auth.sh create mode 100644 t/v3/grpc/health_check.t diff --git a/lib/resty/etcd/v3.lua b/lib/resty/etcd/v3.lua index 4587e9d1..19708c11 100644 --- a/lib/resty/etcd/v3.lua +++ b/lib/resty/etcd/v3.lua @@ -13,6 +13,7 @@ local now = ngx.now local sub_str = string.sub local str_byte = string.byte local str_char = string.char +local str_find = string.find local ipairs = ipairs local pairs = pairs local pcall = pcall @@ -281,6 +282,45 @@ local function serialize_and_encode_base64(serialize_fn, data) end +local function choose_grpc_endpoint(self) + local connect_opts = { + max_recv_msg_size = 2147483647, + } + + local endpoint, err = choose_endpoint(self) + if not endpoint then + return nil, err, nil + end + + if endpoint.scheme == "https" then + connect_opts.insecure = false + end + + connect_opts.tls_verify = self.ssl_verify + connect_opts.client_cert = self.ssl_cert_path + connect_opts.client_key = self.ssl_key_path + connect_opts.trusted_ca = self.trusted_ca + + local target + if self.unix_socket_proxy then + target = self.unix_socket_proxy + else + target = endpoint.address .. ":" .. 
endpoint.port + end + + utils.log_info("etcd grpc connect to ", target) + local conn, err = self.grpc.connect(target, connect_opts) + if not conn then + return nil, err, endpoint.http_host + end + + -- we disable health check when proxying via unix socket, + -- so the http_host will always point to a real address when the failure is reported + conn.http_host = endpoint.http_host + return conn +end + + function _M.new(opts) local timeout = opts.timeout local ttl = opts.ttl @@ -396,6 +436,7 @@ function _M.new(opts) ssl_cert_path = opts.ssl_cert_path, ssl_key_path = opts.ssl_key_path, + trusted_ca = opts.trusted_ca, extra_headers = extra_headers, sni = sni, unix_socket_proxy = unix_socket_proxy, @@ -418,48 +459,13 @@ function _M.new(opts) cli.grpc = grpc cli.call_opts = {} - local connect_opts = { - max_recv_msg_size = 2147483647, - } - - local endpoint, err = choose_endpoint(cli) - if not endpoint then - return nil, err - end - - if endpoint.scheme == "https" then - connect_opts.insecure = false - end - - connect_opts.tls_verify = cli.ssl_verify - connect_opts.client_cert = cli.ssl_cert_path - connect_opts.client_key = cli.ssl_key_path - connect_opts.trusted_ca = opts.trusted_ca - - local conn, err - if unix_socket_proxy then - conn, err = grpc.connect(unix_socket_proxy, connect_opts) - else - conn, err = grpc.connect(endpoint.address .. ":" .. 
endpoint.port, connect_opts) - end + local conn, err = choose_grpc_endpoint(cli) if not conn then return nil, err end cli.conn = conn - cli = setmetatable(cli, grpc_mt) - - if cli.user then - local auth_req = {name = cli.user, password = cli.password} - local res, err = cli:grpc_call("etcdserverpb.Auth", "Authenticate", auth_req) - if not res then - return nil, err - end - - cli.grpc_token = res.body.token - end - - return cli + return setmetatable(cli, grpc_mt) end local sema, err = semaphore.new() @@ -1017,6 +1023,17 @@ do {"token", ""} } function get_grpc_metadata(self) + if not self.grpc_token and self.user then + local auth_req = {name = self.user, password = self.password} + local res, err = self:grpc_call("etcdserverpb.Auth", + "Authenticate", auth_req) + if not res then + return nil, err + end + + self.grpc_token = res.body.token + end + if self.grpc_token then metadata[1][2] = self.grpc_token return metadata @@ -1043,15 +1060,22 @@ function _grpc_M.create_grpc_watch_stream(self, key, attr, opts) self.call_opts.timeout = self.timeout * 1000 end - self.call_opts.metadata = get_grpc_metadata(self) + local data, err = get_grpc_metadata(self) + if err then + return nil, err + end + self.call_opts.metadata = data local st, err = conn:new_server_stream("etcdserverpb.Watch", "Watch", req, self.call_opts) if not st then + -- report but don't retry by itself - APISIX will retry syncing after failed + health_check.report_failure(conn.http_host) return nil, err end local res, err = st:recv() if not res then + health_check.report_failure(conn.http_host) return nil, err end @@ -1062,6 +1086,7 @@ end function _grpc_M.read_grpc_watch_stream(self, watching_stream) local res, err = watching_stream:recv() if not res then + health_check.report_failure(self.conn.http_host) return nil, err end @@ -1151,8 +1176,16 @@ function _grpc_M.convert_grpc_to_http_res(self, res) end +local function filter_out_no_retry_err(err) + if str_find(err, "key is not provided", 1, true) then + 
return err + end + + return nil +end + + function _grpc_M.grpc_call(self, serv, meth, attr, key, val, opts) - local conn = self.conn attr.key = key if val then attr.value = serialize_grpc_value(self.serializer.serialize, val) @@ -1165,9 +1198,67 @@ function _grpc_M.grpc_call(self, serv, meth, attr, key, val, opts) self.call_opts.timeout = self.timeout * 1000 end self.call_opts.int64_encoding = self.grpc.INT64_AS_STRING - self.call_opts.metadata = get_grpc_metadata(self) - local res, err = conn:call(serv, meth, attr, self.call_opts) + if meth ~= "Authenticate" then + local data, err = get_grpc_metadata(self) + if err then + return nil, err + end + self.call_opts.metadata = data + end + + local conn = self.conn + local http_host + local res, err + if health_check.conf.retry then + local max_retry = #self.endpoints * health_check.conf.max_fails + 1 + for i = 1, max_retry do + if conn then + http_host = conn.http_host + res, err = conn:call(serv, meth, attr, self.call_opts) + if res then + self.conn = conn + break + end + + if filter_out_no_retry_err(err) then + return nil, err + end + end + + health_check.report_failure(http_host) + + if i < max_retry then + utils.log_warn("Tried ", http_host, " failed: ", + err, ". 
Retrying") + end + + conn, err, http_host = choose_grpc_endpoint(self) + if not conn and not http_host then + -- no endpoint can be retried + return nil, err + end + end + else + res, err = self.conn:call(serv, meth, attr, self.call_opts) + if not res then + if filter_out_no_retry_err(err) then + return nil, err + end + + health_check.report_failure(self.conn.http_host) + + local conn, new_err = choose_grpc_endpoint(self) + if not conn then + utils.log_info("failed to use next connection: ", new_err) + return nil, err + end + + self.conn = conn + return nil, err + end + end + return self:convert_grpc_to_http_res(res), err end diff --git a/t/v3/add-auth.sh b/t/v3/add-auth.sh old mode 100644 new mode 100755 diff --git a/t/v3/grpc/health_check.t b/t/v3/grpc/health_check.t new file mode 100644 index 00000000..0ad58d15 --- /dev/null +++ b/t/v3/grpc/health_check.t @@ -0,0 +1,451 @@ +use Test::Nginx::Socket::Lua; + +log_level('info'); +no_long_string(); +repeat_each(1); +workers(2); + +my $etcd_version = `etcd --version`; +if ($etcd_version =~ /^etcd Version: 2/ || $etcd_version =~ /^etcd Version: 3.1./ || $etcd_version =~ /^etcd Version: 3.2./) { + plan(skip_all => "etcd is too old, skip v3 protocol"); +} else { + my $enable_tls = $ENV{ETCD_ENABLE_TLS}; + if (defined($enable_tls) && $enable_tls eq "TRUE") { + plan(skip_all => "skip test cases with auth when TLS is enabled"); + } else { + plan 'no_plan'; + } +} + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->main_config) { + $block->set_value("main_config", "thread_pool grpc-client-nginx-module threads=1;"); + } + + if (!$block->request) { + $block->set_value("request", "GET /t"); + } + + if ((!defined $block->error_log) && (!defined $block->no_error_log)) { + $block->set_value("no_error_log", "[error]"); + } + +}); + +our $HttpConfig = <<'_EOC_'; + lua_socket_log_errors off; + lua_package_path 'lib/?.lua;/usr/local/share/lua/5.3/?.lua;/usr/share/lua/5.1/?.lua;;'; + lua_shared_dict 
etcd_cluster_health_check 8m; + init_by_lua_block { + local cjson = require("cjson.safe") + + function check_res(data, err, val, status) + if err then + ngx.say("err: ", err) + ngx.exit(200) + end + + if val then + if data.body.kvs==nil then + ngx.exit(404) + end + if data.body.kvs and val ~= data.body.kvs[1].value then + ngx.say("failed to check value") + ngx.log(ngx.ERR, "failed to check value, got: ", data.body.kvs[1].value, + ", expect: ", val) + ngx.exit(200) + else + ngx.say("checked val as expect: ", val) + end + end + + if status and status ~= data.status then + ngx.exit(data.status) + end + end + } +_EOC_ + +run_tests(); + +__DATA__ + +=== TEST 1: sanity +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local health_check, err = require "resty.etcd.health_check" .init({ + shm_name = "etcd_cluster_health_check", + fail_timeout = 5, + max_fails = 3, + }) + assert( err == nil) + assert( health_check.conf ~= nil) + + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:12379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + use_grpc = true, + user = 'root', + password = 'abc123', + }) + check_res(etcd, err) + + ngx.say("done") + } + } +--- response_body +done + + + +=== TEST 2: default configuration +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local health_check, err = require "resty.etcd.health_check" .init({ + shm_name = "etcd_cluster_health_check", + }) + ngx.say(health_check.conf.max_fails) + ngx.say(health_check.conf.fail_timeout) + } + } +--- response_body +1 +10 + + + +=== TEST 3: bad shm_name +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local health_check, err = require "resty.etcd.health_check" .init({ + shm_name = "error_shm_name", + }) + ngx.say(err) + } + } +--- response_body +failed to get ngx.shared dict: error_shm_name + + + +=== TEST 4: trigger 
unhealthy with set +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local health_check, err = require "resty.etcd.health_check" .init({ + shm_name = "etcd_cluster_health_check", + fail_timeout = 10, + max_fails = 1, + }) + + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:42379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + use_grpc = true, + user = 'root', + password = 'abc123', + init_count = -1, + }) + + local res, err = etcd:set("/trigger_unhealthy", { a='abc'}) + ngx.say(err) + } + } +--- error_log eval +qr/update endpoint: http:\/\/127.0.0.1:42379 to unhealthy/ +--- response_body_like eval +qr/.* dial tcp 127.0.0.1:42379: connect: connection refused/ + + + +=== TEST 5: trigger unhealthy with watch +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local health_check, err = require "resty.etcd.health_check" .init({ + shm_name = "etcd_cluster_health_check", + fail_timeout = 10, + max_fails = 1, + }) + + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:42379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + use_grpc = true, + init_count = -1, + }) + + local body_chunk_fun, err = etcd:create_grpc_watch_stream("/trigger_unhealthy", {}) + if not body_chunk_fun then + ngx.say(err) + end + } + } +--- error_log eval +qr/update endpoint: http:\/\/127.0.0.1:42379 to unhealthy/ +--- response_body_like eval +qr/.* dial tcp 127.0.0.1:42379: connect: connection refused/ + + + +=== TEST 6: fault count +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local health_check, err = require "resty.etcd.health_check" .init({ + shm_name = "etcd_cluster_health_check", + fail_timeout = 5, + max_fails = 3, + }) + + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:42379", + 
"http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + use_grpc = true, + user = 'root', + password = 'abc123', + init_count = -1, + }) + + -- http://127.0.0.1:42379 will be selected only once + for i = 1, 4 do + etcd:set("/fault_count", { a='abc'}) + end + + -- here have actually been 5 reads and writes to etcd, including one to /auth/authenticate + + local fails, err = ngx.shared["etcd_cluster_health_check"]:get("http://127.0.0.1:42379") + if err then + ngx.say(err) + end + ngx.say(fails) + } + } +--- response_body +1 + + + +=== TEST 7: check endpoint is healthy +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local health_check, err = require "resty.etcd.health_check" .init({ + shm_name = "etcd_cluster_health_check", + fail_timeout = 3, + max_fails = 1, + }) + + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:42379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + use_grpc = true, + user = 'root', + password = 'abc123', + init_count = -1, + }) + + etcd:set("/get_target_status", { a='abc'}) + + local healthy = health_check.get_target_status("http://127.0.0.1:42379") + ngx.say(healthy) + } + } +--- response_body +false + + + +=== TEST 8: make sure `fail_timeout` works +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local health_check, err = require "resty.etcd.health_check" .init({ + shm_name = "etcd_cluster_health_check", + fail_timeout = 2, + max_fails = 1, + }) + + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:42379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + use_grpc = true, + user = 'root', + password = 'abc123', + init_count = -1, + }) + + local res, err + + -- make sure to select http://127.0.0.1:42379 once and trigger it to unhealthy + for i = 1, 3 do + res, err = etcd:set("/fail_timeout", "value") + end + + -- ensure that 
unhealthy http://127.0.0.1:42379 are no longer selected + for i = 1, 3 do + res, err = etcd:get("/fail_timeout") + assert(res, err) + assert(res.body.kvs[1].value == "value") + end + + ngx.say("done") + } + } +--- timeout: 5 +--- response_body +done +--- error_log +update endpoint: http://127.0.0.1:42379 to unhealthy + + + +=== TEST 9: has no healthy etcd endpoint, directly return an error message +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local health_check, err = require "resty.etcd.health_check" .init({ + shm_name = "etcd_cluster_health_check", + fail_timeout = 3, + max_fails = 1, + }) + + health_check.report_failure("http://127.0.0.1:12379") + health_check.report_failure("http://127.0.0.1:22379") + health_check.report_failure("http://127.0.0.1:32379") + + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:12379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + use_grpc = true, + user = 'root', + password = 'abc123', + }) + ngx.say(err) + } + } +--- response_body +has no healthy etcd endpoint available + + + +=== TEST 10: test if retry works +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local health_check, err = require "resty.etcd.health_check" .init({ + shm_name = "etcd_cluster_health_check", + fail_timeout = 10, + max_fails = 1, + retry = true, + }) + + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:42379", + "http://127.0.0.1:52379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + use_grpc = true, + user = 'root', + password = 'abc123', + init_count = -1, + }) + + local res, err + for i = 1, 4 do + res, err = etcd:set("/trigger_unhealthy", "abc") + end + check_res(res, err) + local res, err = etcd:get("/trigger_unhealthy") + check_res(res, err, "abc") + -- unlike the retry in http version, we will use the same connection if the 
previous + -- call is successful + } + } +--- grep_error_log eval +qr/update endpoint: http:\/\/127.0.0.1:\d+ to unhealthy/ +--- grep_error_log_out +update endpoint: http://127.0.0.1:42379 to unhealthy +update endpoint: http://127.0.0.1:52379 to unhealthy +--- response_body +checked val as expect: abc + + + +=== TEST 11: ring balancer with specific init_count +--- http_config eval: $::HttpConfig +--- config + location /t { + content_by_lua_block { + local health_check = require("resty.etcd.health_check") + health_check.disable() + local etcd, err = require "resty.etcd" .new({ + protocol = "v3", + http_host = { + "http://127.0.0.1:12379", + "http://127.0.0.1:22379", + "http://127.0.0.1:32379", + }, + use_grpc = true, + init_count = 101, + }) + + local res + for i = 1, 3 do + res, err = etcd:set("/ring_balancer", "abc") + end + + ngx.say(etcd.init_count) + } + } +--- response_body +105 +--- error_log +choose endpoint: http://127.0.0.1:12379 +choose endpoint: http://127.0.0.1:22379