Skip to content
Open
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
126 changes: 114 additions & 12 deletions apisix/discovery/eureka/init.lua
Original file line number Diff line number Diff line change
Expand Up @@ -29,10 +29,12 @@ local ngx_timer_every = ngx.timer.every
local string_sub = string.sub
local str_find = core.string.find
local log = core.log
local semaphore = require("ngx.semaphore")

local default_weight
local applications

local init_sema
local initial_fetched = false
Comment on lines +36 to +37
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could you explain some of the functions of this?


local _M = {
version = 0.1,
Expand Down Expand Up @@ -140,31 +142,73 @@ local function parse_instance(instance)
end


local function build_endpoints()
local host_list = local_conf.discovery and local_conf.discovery.eureka and local_conf.discovery.eureka.host
if not host_list or #host_list == 0 then
log.error("do not set eureka.host")
return nil
end

local endpoints = core.table.new(#host_list, 0)
for _, h in ipairs(host_list) do
local url = h
local basic_auth
local auth_idx = str_find(url, "@")
if auth_idx then
local protocol_idx = str_find(url, "://")
local protocol = string_sub(url, 1, protocol_idx + 2)
local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 1)
local other = string_sub(url, auth_idx + 1)
url = protocol .. other
basic_auth = "Basic " .. ngx.encode_base64(user_and_password)
end
if local_conf.discovery.eureka.prefix then
url = url .. local_conf.discovery.eureka.prefix
end
if string_sub(url, #url) ~= "/" then
url = url .. "/"
end
core.table.insert(endpoints, { url = url, auth = basic_auth })
end
return endpoints
end


local function fetch_full_registry(premature)
if premature then
return
end

local request_uri, basic_auth = service_info()
if not request_uri then
-- 遍历所有 eureka 端点,直到成功
local endpoints = build_endpoints()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it's better to execute build_endpoints directly during the init_worker phase.

if not endpoints or #endpoints == 0 then
return
end

local res, err = request(request_uri, basic_auth, "GET", "apps")
if not res then
log.error("failed to fetch registry", err)
return
local res, err
local used_endpoint
local start = math_random(#endpoints)
for i = 0, #endpoints - 1 do
local ep = endpoints[((start + i) % #endpoints) + 1]
log.info("eureka uri:", ep.url, ".")
local r, e = request(ep.url, ep.auth, "GET", "apps")
if r and r.body and r.status == 200 then
res = r
used_endpoint = ep
break
end
log.warn("failed to fetch registry from ", ep.url, ": ", e or (r and ("status=" .. tostring(r.status)) or "unknown"))
end

if not res.body or res.status ~= 200 then
log.error("failed to fetch registry, status = ", res.status)
if not res then
log.error("failed to fetch registry from all eureka hosts")
return
end

local json_str = res.body
local data, err = core.json.decode(json_str)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this change necessary?

local data, derr = core.json.decode(json_str)
if not data then
log.error("invalid response body: ", json_str, " err: ", err)
log.error("invalid response body: ", json_str, " err: ", derr)
return
end
local apps = data.applications.application
Expand All @@ -185,17 +229,33 @@ local function fetch_full_registry(premature)
metadata = metadata,
})
if metadata then
-- remove useless data
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why remove this?

metadata.weight = nil
end
end
end
end
applications = up_apps
log.info("successfully updated service registry, services count=",
core.table.nkeys(up_apps), "; source=", used_endpoint and used_endpoint.url or "unknown")
if not initial_fetched then
initial_fetched = true
if init_sema then
init_sema:post(1)
end
end
end


function _M.nodes(service_name)
if not applications then
if init_sema then
local ok, err = init_sema:wait(3)
if not ok then
log.warn("wait eureka initial fetch timeout: ", err)
end
end
end

if not applications then
log.error("failed to fetch nodes for : ", service_name)
return
Expand All @@ -205,11 +265,53 @@ function _M.nodes(service_name)
end


-- 注释掉文件缓存相关依赖与变量,避免写盘/读盘
-- local core_io = require("apisix.core.io")
-- local io_open = io.open
-- local cache_file = ngx.config.prefix() .. "logs/eureka_registry.json"
-- local save_registry_cache
-- local load_registry_cache
-- 将文件缓存函数整体注释掉,避免写盘与读盘
--[[
local function save_registry_cache(apps)
local body, err = core.json.encode({ applications = apps, ts = ngx.now() })
if not body then
log.error("encode eureka registry cache failed: ", err)
return
end
local f, ferr = io_open(cache_file, "w")
if not f then
log.error("open eureka registry cache file failed: ", ferr,
", path: ", cache_file)
return
end
f:write(body)
f:close()
end

local function load_registry_cache()
local body = core_io.get_file(cache_file)
if not body then
return nil, "no cache file"
end
local data, err = core.json.decode(body)
if not data or not data.applications then
return nil, "invalid cache format: " .. (err or "")
end
return data.applications
end
]]
-- 移除保存缓存调用
-- save_registry_cache(up_apps)


function _M.init_worker()
default_weight = local_conf.discovery.eureka.weight or 100
log.info("default_weight:", default_weight, ".")
local fetch_interval = local_conf.discovery.eureka.fetch_interval or 30
log.info("fetch_interval:", fetch_interval, ".")
init_sema = semaphore.new()

ngx_timer_at(0, fetch_full_registry)
ngx_timer_every(fetch_interval, fetch_full_registry)
end
Expand Down
Loading