diff --git a/apisix/discovery/eureka/init.lua b/apisix/discovery/eureka/init.lua index df72a5269e59..68c0e43106bb 100644 --- a/apisix/discovery/eureka/init.lua +++ b/apisix/discovery/eureka/init.lua @@ -29,10 +29,12 @@ local ngx_timer_every = ngx.timer.every local string_sub = string.sub local str_find = core.string.find local log = core.log +local semaphore = require("ngx.semaphore") local default_weight local applications - +local init_sema +local initial_fetched = false local _M = { version = 0.1, @@ -140,31 +142,72 @@ local function parse_instance(instance) end +local function build_endpoints() + local host_list = local_conf.discovery and local_conf.discovery.eureka and local_conf.discovery.eureka.host + if not host_list or #host_list == 0 then + log.error("do not set eureka.host") + return nil + end + + local endpoints = core.table.new(#host_list, 0) + for _, h in ipairs(host_list) do + local url = h + local basic_auth + local auth_idx = str_find(url, "@") + if auth_idx then + local protocol_idx = str_find(url, "://") + local protocol = string_sub(url, 1, protocol_idx + 2) + local user_and_password = string_sub(url, protocol_idx + 3, auth_idx - 1) + local other = string_sub(url, auth_idx + 1) + url = protocol .. other + basic_auth = "Basic " .. ngx.encode_base64(user_and_password) + end + if local_conf.discovery.eureka.prefix then + url = url .. local_conf.discovery.eureka.prefix + end + if string_sub(url, #url) ~= "/" then + url = url .. "/" + end + core.table.insert(endpoints, { url = url, auth = basic_auth }) + end + return endpoints +end + + local function fetch_full_registry(premature) if premature then return end - local request_uri, basic_auth = service_info() - if not request_uri then + local endpoints = build_endpoints() + if not endpoints or #endpoints == 0 then return end - local res, err = request(request_uri, basic_auth, "GET", "apps") - if not res then - log.error("failed to fetch registry", err) - return + local res, err + local used_endpoint + local start = math_random(#endpoints) + for i = 0, #endpoints - 1 do + local ep = endpoints[((start + i) % #endpoints) + 1] + log.info("eureka uri:", ep.url, ".") + local r, e = request(ep.url, ep.auth, "GET", "apps") + if r and r.body and r.status == 200 then + res = r + used_endpoint = ep + break + end + log.warn("failed to fetch registry from ", ep.url, ": ", e or (r and ("status=" .. tostring(r.status)) or "unknown")) end - if not res.body or res.status ~= 200 then - log.error("failed to fetch registry, status = ", res.status) + if not res then + log.error("failed to fetch registry from all eureka hosts") return end local json_str = res.body - local data, err = core.json.decode(json_str) + local data, derr = core.json.decode(json_str) if not data then - log.error("invalid response body: ", json_str, " err: ", err) + log.error("invalid response body: ", json_str, " err: ", derr) return end local apps = data.applications.application @@ -185,17 +228,33 @@ local function fetch_full_registry(premature) metadata = metadata, }) if metadata then - -- remove useless data metadata.weight = nil end end end end applications = up_apps + log.info("successfully updated service registry, services count=", + core.table.nkeys(up_apps), "; source=", used_endpoint and used_endpoint.url or "unknown") + if not initial_fetched then + initial_fetched = true + if init_sema then + init_sema:post(1) + end + end end function _M.nodes(service_name) + if not applications then + if init_sema then + local ok, err = init_sema:wait(3) + if not ok then + log.warn("wait eureka initial fetch timeout: ", err) + end + end + end + if not applications then log.error("failed to fetch nodes for : ", service_name) return @@ -204,12 +263,13 @@ function _M.nodes(service_name) return applications[service_name] end - function _M.init_worker() default_weight = local_conf.discovery.eureka.weight or 100 log.info("default_weight:", default_weight, ".") local fetch_interval = local_conf.discovery.eureka.fetch_interval or 30 log.info("fetch_interval:", fetch_interval, ".") + init_sema = semaphore.new() + ngx_timer_at(0, fetch_full_registry) ngx_timer_every(fetch_interval, fetch_full_registry) end