Skip to content

Commit

Permalink
feat(gRPC): retry & healthcheck (#200)
Browse files Browse the repository at this point in the history
  • Loading branch information
spacewander authored Dec 15, 2022
1 parent 372c7c0 commit 89e048e
Show file tree
Hide file tree
Showing 3 changed files with 583 additions and 41 deletions.
173 changes: 132 additions & 41 deletions lib/resty/etcd/v3.lua
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ local now = ngx.now
local sub_str = string.sub
local str_byte = string.byte
local str_char = string.char
local str_find = string.find
local ipairs = ipairs
local pairs = pairs
local pcall = pcall
Expand Down Expand Up @@ -281,6 +282,45 @@ local function serialize_and_encode_base64(serialize_fn, data)
end


local function choose_grpc_endpoint(self)
local connect_opts = {
max_recv_msg_size = 2147483647,
}

local endpoint, err = choose_endpoint(self)
if not endpoint then
return nil, err, nil
end

if endpoint.scheme == "https" then
connect_opts.insecure = false
end

connect_opts.tls_verify = self.ssl_verify
connect_opts.client_cert = self.ssl_cert_path
connect_opts.client_key = self.ssl_key_path
connect_opts.trusted_ca = self.trusted_ca

local target
if self.unix_socket_proxy then
target = self.unix_socket_proxy
else
target = endpoint.address .. ":" .. endpoint.port
end

utils.log_info("etcd grpc connect to ", target)
local conn, err = self.grpc.connect(target, connect_opts)
if not conn then
return nil, err, endpoint.http_host
end

-- we disable health check when proxying via unix socket,
-- so the http_host will always point to a real address when the failure is reported
conn.http_host = endpoint.http_host
return conn
end


function _M.new(opts)
local timeout = opts.timeout
local ttl = opts.ttl
Expand Down Expand Up @@ -396,6 +436,7 @@ function _M.new(opts)

ssl_cert_path = opts.ssl_cert_path,
ssl_key_path = opts.ssl_key_path,
trusted_ca = opts.trusted_ca,
extra_headers = extra_headers,
sni = sni,
unix_socket_proxy = unix_socket_proxy,
Expand All @@ -418,48 +459,13 @@ function _M.new(opts)
cli.grpc = grpc
cli.call_opts = {}

local connect_opts = {
max_recv_msg_size = 2147483647,
}

local endpoint, err = choose_endpoint(cli)
if not endpoint then
return nil, err
end

if endpoint.scheme == "https" then
connect_opts.insecure = false
end

connect_opts.tls_verify = cli.ssl_verify
connect_opts.client_cert = cli.ssl_cert_path
connect_opts.client_key = cli.ssl_key_path
connect_opts.trusted_ca = opts.trusted_ca

local conn, err
if unix_socket_proxy then
conn, err = grpc.connect(unix_socket_proxy, connect_opts)
else
conn, err = grpc.connect(endpoint.address .. ":" .. endpoint.port, connect_opts)
end
local conn, err = choose_grpc_endpoint(cli)
if not conn then
return nil, err
end
cli.conn = conn

cli = setmetatable(cli, grpc_mt)

if cli.user then
local auth_req = {name = cli.user, password = cli.password}
local res, err = cli:grpc_call("etcdserverpb.Auth", "Authenticate", auth_req)
if not res then
return nil, err
end

cli.grpc_token = res.body.token
end

return cli
return setmetatable(cli, grpc_mt)
end

local sema, err = semaphore.new()
Expand Down Expand Up @@ -1017,6 +1023,17 @@ do
{"token", ""}
}
function get_grpc_metadata(self)
if not self.grpc_token and self.user then
local auth_req = {name = self.user, password = self.password}
local res, err = self:grpc_call("etcdserverpb.Auth",
"Authenticate", auth_req)
if not res then
return nil, err
end

self.grpc_token = res.body.token
end

if self.grpc_token then
metadata[1][2] = self.grpc_token
return metadata
Expand All @@ -1043,15 +1060,22 @@ function _grpc_M.create_grpc_watch_stream(self, key, attr, opts)
self.call_opts.timeout = self.timeout * 1000
end

self.call_opts.metadata = get_grpc_metadata(self)
local data, err = get_grpc_metadata(self)
if err then
return nil, err
end
self.call_opts.metadata = data

local st, err = conn:new_server_stream("etcdserverpb.Watch", "Watch", req, self.call_opts)
if not st then
-- report but don't retry by itself - APISIX will retry syncing after failed
health_check.report_failure(conn.http_host)
return nil, err
end

local res, err = st:recv()
if not res then
health_check.report_failure(conn.http_host)
return nil, err
end

Expand All @@ -1062,6 +1086,7 @@ end
function _grpc_M.read_grpc_watch_stream(self, watching_stream)
local res, err = watching_stream:recv()
if not res then
health_check.report_failure(self.conn.http_host)
return nil, err
end

Expand Down Expand Up @@ -1151,8 +1176,16 @@ function _grpc_M.convert_grpc_to_http_res(self, res)
end


local function filter_out_no_retry_err(err)
if str_find(err, "key is not provided", 1, true) then
return err
end

return nil
end


function _grpc_M.grpc_call(self, serv, meth, attr, key, val, opts)
local conn = self.conn
attr.key = key
if val then
attr.value = serialize_grpc_value(self.serializer.serialize, val)
Expand All @@ -1165,9 +1198,67 @@ function _grpc_M.grpc_call(self, serv, meth, attr, key, val, opts)
self.call_opts.timeout = self.timeout * 1000
end
self.call_opts.int64_encoding = self.grpc.INT64_AS_STRING
self.call_opts.metadata = get_grpc_metadata(self)

local res, err = conn:call(serv, meth, attr, self.call_opts)
if meth ~= "Authenticate" then
local data, err = get_grpc_metadata(self)
if err then
return nil, err
end
self.call_opts.metadata = data
end

local conn = self.conn
local http_host
local res, err
if health_check.conf.retry then
local max_retry = #self.endpoints * health_check.conf.max_fails + 1
for i = 1, max_retry do
if conn then
http_host = conn.http_host
res, err = conn:call(serv, meth, attr, self.call_opts)
if res then
self.conn = conn
break
end

if filter_out_no_retry_err(err) then
return nil, err
end
end

health_check.report_failure(http_host)

if i < max_retry then
utils.log_warn("Tried ", http_host, " failed: ",
err, ". Retrying")
end

conn, err, http_host = choose_grpc_endpoint(self)
if not conn and not http_host then
-- no endpoint can be retries
return nil, err
end
end
else
res, err = self.conn:call(serv, meth, attr, self.call_opts)
if not res then
if filter_out_no_retry_err(err) then
return nil, err
end

health_check.report_failure(self.conn.http_host)

local conn, new_err = choose_grpc_endpoint(self)
if not conn then
utils.log_info("failed to use next connection: ", new_err)
return nil, err
end

self.conn = conn
return nil, err
end
end

return self:convert_grpc_to_http_res(res), err
end

Expand Down
Empty file modified t/v3/add-auth.sh
100644 → 100755
Empty file.
Loading

0 comments on commit 89e048e

Please sign in to comment.