apisix/balancer.lua (15 additions, 5 deletions)
@@ -205,11 +205,21 @@ local function pick_server(route, ctx)

local nodes_count = #up_conf.nodes
if nodes_count == 1 then
local node = up_conf.nodes[1]
ctx.balancer_ip = node.host
ctx.balancer_port = node.port
node.upstream_host = parse_server_for_upstream_host(node, ctx.upstream_scheme)
return node
-- For the least_conn balancer, we still need to use the balancer even with a single node
-- so that connection counts keep being tracked for future load balancing decisions
if up_conf.type == "least_conn" then
core.log.debug(
"single node with least_conn balancer",
"still using balancer for connection tracking"
)
else
core.log.info("single node with ", up_conf.type, " balancer - skipping balancer")
local node = up_conf.nodes[1]
ctx.balancer_ip = node.host
ctx.balancer_port = node.port
node.upstream_host = parse_server_for_upstream_host(node, ctx.upstream_scheme)
return node
end
end

local version = ctx.upstream_version
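For context, a minimal sketch of the dispatch decision introduced above, using a hypothetical single-node upstream table (the node values and the empty branches are illustrative, not part of the patch):

-- Illustrative sketch: mirrors the new branch in pick_server().
local up_conf = {
    type = "least_conn",
    nodes = { { host = "127.0.0.1", port = 8080, weight = 1 } },  -- hypothetical node
}

if #up_conf.nodes == 1 and up_conf.type ~= "least_conn" then
    -- roundrobin, chash, ewma, ...: short-circuit and return the only node
else
    -- least_conn: fall through to the balancer so the shared-dict connection
    -- counters stay up to date even while only one node is configured
end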
apisix/balancer/least_conn.lua (180 additions, 9 deletions)
@@ -17,33 +17,140 @@

local core = require("apisix.core")
local binaryHeap = require("binaryheap")
local dkjson = require("dkjson")
local ipairs = ipairs
local pairs = pairs

local ngx = ngx
local ngx_shared = ngx.shared
local tostring = tostring

local _M = {}

-- Shared dictionary to store connection counts across balancer recreations
local CONN_COUNT_DICT_NAME = "balancer-least-conn"
local conn_count_dict

local function least_score(a, b)
return a.score < b.score
end

-- Get the connection count key for a specific upstream and server
local function get_conn_count_key(upstream, server)
local upstream_id = upstream.id
if not upstream_id then
-- Fallback to a hash of the upstream configuration using stable encoding
upstream_id = ngx.crc32_short(dkjson.encode(upstream))
core.log.debug("generated upstream_id from hash: ", upstream_id)
end
local key = "conn_count:" .. tostring(upstream_id) .. ":" .. server
core.log.debug("generated connection count key: ", key)
return key
end


-- Get the current connection count for a server from shared dict
local function get_server_conn_count(upstream, server)
local key = get_conn_count_key(upstream, server)
local count, err = conn_count_dict:get(key)
if err then
core.log.error("failed to get connection count for ", server, ": ", err)
return 0
end
local result = count or 0
core.log.debug("retrieved connection count for server ", server, ": ", result)
return result
end

-- Increment the connection count for a server
local function incr_server_conn_count(upstream, server, delta)
local key = get_conn_count_key(upstream, server)
local new_count, err = conn_count_dict:incr(key, delta or 1, 0)
if not new_count then
core.log.error("failed to increment connection count for ", server, ": ", err)
return 0
end
core.log.debug("incremented connection count for server ", server, " by ", delta or 1,
", new count: ", new_count)
return new_count
end


-- Clean up connection counts for servers that are no longer in the upstream
local function cleanup_stale_conn_counts(upstream, current_servers)
local upstream_id = upstream.id
if not upstream_id then
upstream_id = ngx.crc32_short(dkjson.encode(upstream))
end

local prefix = "conn_count:" .. tostring(upstream_id) .. ":"
core.log.debug("cleaning up stale connection counts with prefix: ", prefix)
local keys, err = conn_count_dict:get_keys(0) -- Get all keys
if err then
core.log.error("failed to get keys from shared dict: ", err)
return
end

for _, key in ipairs(keys or {}) do
if core.string.has_prefix(key, prefix) then
local server = key:sub(#prefix + 1)
if not current_servers[server] then
-- This server is no longer in the upstream, clean it up
local ok, delete_err = conn_count_dict:delete(key)
if not ok and delete_err then
core.log.error("failed to delete stale connection count for server ",
server, ": ", delete_err)
else
core.log.info("cleaned up stale connection count for server: ", server)
end
end
end
end
end

function _M.new(up_nodes, upstream)
if not conn_count_dict then
conn_count_dict = ngx_shared[CONN_COUNT_DICT_NAME]
end

local use_persistent_counting = conn_count_dict ~= nil
if not use_persistent_counting then
core.log.warn("shared dict '",
CONN_COUNT_DICT_NAME, "' not found, using traditional least_conn mode")
end

local servers_heap = binaryHeap.minUnique(least_score)

if use_persistent_counting then
-- Clean up stale connection counts for removed servers
cleanup_stale_conn_counts(upstream, up_nodes)
end

for server, weight in pairs(up_nodes) do
local score = 1 / weight
local score
if use_persistent_counting then
-- True least connection mode: use persistent connection counts
local conn_count = get_server_conn_count(upstream, server)
score = (conn_count + 1) / weight
core.log.debug("initializing server ", server, " with persistent counting",
" | weight: ", weight, " | conn_count: ", conn_count, " | score: ", score)
else
-- Fallback mode: use original weighted round-robin behavior
score = 1 / weight
end

-- Note: the argument order of insert is different from others
servers_heap:insert({
server = server,
effect_weight = 1 / weight,
weight = weight,
effect_weight = 1 / weight, -- For backward compatibility
score = score,
use_persistent_counting = use_persistent_counting,
}, server)
end

return {
upstream = upstream,
get = function (ctx)
get = function(ctx)
local server, info, err
if ctx.balancer_tried_servers then
local tried_server_list = {}
@@ -75,14 +182,40 @@ function _M.new(up_nodes, upstream)
return nil, err
end

info.score = info.score + info.effect_weight
servers_heap:update(server, info)
if info.use_persistent_counting then
-- True least connection mode: update based on persistent connection counts
local current_conn_count = get_server_conn_count(upstream, server)
info.score = (current_conn_count + 1) / info.weight
servers_heap:update(server, info)
incr_server_conn_count(upstream, server, 1)
else
-- Fallback mode: use original weighted round-robin logic
info.score = info.score + info.effect_weight
servers_heap:update(server, info)
end
return server
end,
after_balance = function (ctx, before_retry)
after_balance = function(ctx, before_retry)
local server = ctx.balancer_server
local info = servers_heap:valueByPayload(server)
info.score = info.score - info.effect_weight
if not info then
core.log.error("server info not found for: ", server)
return
end

if info.use_persistent_counting then
-- True least connection mode: update based on persistent connection counts
incr_server_conn_count(upstream, server, -1)
local current_conn_count = get_server_conn_count(upstream, server)
info.score = (current_conn_count + 1) / info.weight
if info.score < 0 then
-- Prevent negative scores
info.score = 0
end
else
-- Fallback mode: use original weighted round-robin logic
info.score = info.score - info.effect_weight
end
servers_heap:update(server, info)

if not before_retry then
@@ -100,7 +233,7 @@

ctx.balancer_tried_servers[server] = true
end,
before_retry_next_priority = function (ctx)
before_retry_next_priority = function(ctx)
if ctx.balancer_tried_servers then
core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers)
ctx.balancer_tried_servers = nil
Expand All @@ -109,5 +242,43 @@ function _M.new(up_nodes, upstream)
}
end

local function cleanup_all_conn_counts()
if not conn_count_dict then
conn_count_dict = ngx_shared[CONN_COUNT_DICT_NAME]
end

if not conn_count_dict then
-- No shared dict available, nothing to cleanup
return
end

local keys, err = conn_count_dict:get_keys(0) -- Get all keys
if err then
core.log.error("failed to get keys from shared dict during cleanup: ", err)
return
end

local cleaned_count = 0
for _, key in ipairs(keys or {}) do
if core.string.has_prefix(key, "conn_count:") then
local ok, delete_err = conn_count_dict:delete(key)
if not ok and delete_err then
core.log.warn("failed to delete connection count key during cleanup: ",
key, ", error: ", delete_err)
else
cleaned_count = cleaned_count + 1
end
end
end

if cleaned_count > 0 then
core.log.info("cleaned up ", cleaned_count, " connection count entries from shared dict")
end
end

function _M.cleanup_all()
cleanup_all_conn_counts()
end

return _M
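
A worked sketch of the scoring rule used throughout this module. Connection counts live in the "balancer-least-conn" shared dict under keys of the form conn_count:<upstream_id>:<server> (e.g. conn_count:1:127.0.0.1:1980, a hypothetical key), and a node is picked by the lowest score, computed as (connection count + 1) / weight:

-- Standalone sketch; in the module, conn_count is read from the shared dict
-- rather than passed in directly.
local function score(conn_count, weight)
    return (conn_count + 1) / weight
end

print(score(4, 1))   -- (4+1)/1  = 5     server A: 4 in-flight requests, weight 1
print(score(4, 10))  -- (4+1)/10 = 0.5   server B: same load, weight 10, chosen first
print(score(0, 1))   -- (0+1)/1  = 1     idle server C beats A despite the lower weight

get() increments a server's counter when it hands the server out and after_balance() decrements it when the request finishes, so the counts persist across balancer re-creations (for example after an upstream change) as long as the shared dict is available; without the dict, the module falls back to the original weighted behaviour.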

apisix/cli/config.lua (3 additions, 0 deletions)
@@ -99,6 +99,7 @@ local _M = {
["prometheus-cache"] = "10m",
["standalone-config"] = "10m",
["status-report"] = "1m",
["balancer-least-conn"] = "10m",
}
},
stream = {
@@ -115,6 +116,7 @@ local _M = {
["worker-events-stream"] = "10m",
["tars-stream"] = "1m",
["upstream-healthcheck-stream"] = "10m",
["balancer-least-conn"] = "10m",
}
},
main_configuration_snippet = "",
@@ -162,6 +164,7 @@ local _M = {
["balancer-ewma"] = "10m",
["balancer-ewma-locks"] = "10m",
["balancer-ewma-last-touched-at"] = "10m",
["balancer-least-conn"] = "10m",
["plugin-limit-req-redis-cluster-slot-lock"] = "1m",
["plugin-limit-count-redis-cluster-slot-lock"] = "1m",
["plugin-limit-conn-redis-cluster-slot-lock"] = "1m",
apisix/cli/ngx_tpl.lua (6 additions, 0 deletions)
@@ -76,6 +76,9 @@ lua {
{% if status then %}
lua_shared_dict status-report {* meta.lua_shared_dict["status-report"] *};
{% end %}
{% if enable_stream then %}
lua_shared_dict balancer-least-conn {* meta.lua_shared_dict["balancer-least-conn"] *};
{% end %}
lua_shared_dict nacos 10m;
}

@@ -290,6 +293,9 @@ http {
lua_shared_dict balancer-ewma {* http.lua_shared_dict["balancer-ewma"] *};
lua_shared_dict balancer-ewma-locks {* http.lua_shared_dict["balancer-ewma-locks"] *};
lua_shared_dict balancer-ewma-last-touched-at {* http.lua_shared_dict["balancer-ewma-last-touched-at"] *};
{% if not enable_stream then %}
lua_shared_dict balancer-least-conn {* http.lua_shared_dict["balancer-least-conn"] *};
{% end %}
lua_shared_dict etcd-cluster-health-check {* http.lua_shared_dict["etcd-cluster-health-check"] *}; # etcd health check

# for discovery shared dict
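With the 10m default added in apisix/cli/config.lua, the rendered nginx.conf would be expected to contain a directive along these lines (a sketch of the template output above, not copied from an actual build); presumably the size can be overridden the same way as the other balancer dictionaries, through the lua_shared_dict entries under nginx_config in config.yaml:

# emitted in the http {} block when the stream proxy is disabled,
# otherwise in the stream-level lua {} block
lua_shared_dict balancer-least-conn 10m;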