1717
1818local core = require (" apisix.core" )
1919local binaryHeap = require (" binaryheap" )
20+ local dkjson = require (" dkjson" )
2021local ipairs = ipairs
2122local pairs = pairs
22-
23+ local ngx = ngx
24+ local ngx_shared = ngx .shared
25+ local tostring = tostring
2326
2427local _M = {}
2528
29+ -- Shared dictionary to store connection counts across balancer recreations
30+ local CONN_COUNT_DICT_NAME = " balancer-least-conn"
31+ local conn_count_dict
2632
2733local function least_score (a , b )
2834 return a .score < b .score
2935end
3036
37+ -- Get the connection count key for a specific upstream and server
38+ local function get_conn_count_key (upstream , server )
39+ local upstream_id = upstream .id
40+ if not upstream_id then
41+ -- Fallback to a hash of the upstream configuration using stable encoding
42+ upstream_id = ngx .crc32_short (dkjson .encode (upstream ))
43+ core .log .debug (" generated upstream_id from hash: " , upstream_id )
44+ end
45+ local key = " conn_count:" .. tostring (upstream_id ) .. " :" .. server
46+ core .log .debug (" generated connection count key: " , key )
47+ return key
48+ end
49+
50+
51+ -- Get the current connection count for a server from shared dict
52+ local function get_server_conn_count (upstream , server )
53+ local key = get_conn_count_key (upstream , server )
54+ local count , err = conn_count_dict :get (key )
55+ if err then
56+ core .log .error (" failed to get connection count for " , server , " : " , err )
57+ return 0
58+ end
59+ local result = count or 0
60+ core .log .debug (" retrieved connection count for server " , server , " : " , result )
61+ return result
62+ end
63+
64+
65+ -- Set the connection count for a server in shared dict
66+ local function set_server_conn_count (upstream , server , count )
67+ local key = get_conn_count_key (upstream , server )
68+ local ok , err = conn_count_dict :set (key , count )
69+ if not ok then
70+ core .log .error (" failed to set connection count for " , server , " : " , err )
71+ else
72+ core .log .debug (" set connection count for server " , server , " to " , count )
73+ end
74+ end
75+
76+
77+ -- Increment the connection count for a server
78+ local function incr_server_conn_count (upstream , server , delta )
79+ local key = get_conn_count_key (upstream , server )
80+ local new_count , err = conn_count_dict :incr (key , delta or 1 , 0 )
81+ if not new_count then
82+ core .log .error (" failed to increment connection count for " , server , " : " , err )
83+ return 0
84+ end
85+ core .log .debug (" incremented connection count for server " , server , " by " , delta or 1 ,
86+ " , new count: " , new_count )
87+ return new_count
88+ end
89+
90+
91+ -- Clean up connection counts for servers that are no longer in the upstream
92+ local function cleanup_stale_conn_counts (upstream , current_servers )
93+ local upstream_id = upstream .id
94+ if not upstream_id then
95+ upstream_id = ngx .crc32_short (dkjson .encode (upstream ))
96+ end
97+
98+ local prefix = " conn_count:" .. tostring (upstream_id ) .. " :"
99+ core .log .debug (" cleaning up stale connection counts with prefix: " , prefix )
100+ local keys , err = conn_count_dict :get_keys (0 ) -- Get all keys
101+ if err then
102+ core .log .error (" failed to get keys from shared dict: " , err )
103+ return
104+ end
105+
106+ for _ , key in ipairs (keys or {}) do
107+ if core .string .has_prefix (key , prefix ) then
108+ local server = key :sub (# prefix + 1 )
109+ if not current_servers [server ] then
110+ -- This server is no longer in the upstream, clean it up
111+ local ok , delete_err = conn_count_dict :delete (key )
112+ if not ok and delete_err then
113+ core .log .error (" failed to delete stale connection count for server " , server , " : " , delete_err )
114+ else
115+ core .log .info (" cleaned up stale connection count for server: " , server )
116+ end
117+ end
118+ end
119+ end
120+ end
31121
32122function _M .new (up_nodes , upstream )
123+ if not conn_count_dict then
124+ conn_count_dict = ngx_shared [CONN_COUNT_DICT_NAME ]
125+ end
126+
127+ if not conn_count_dict then
128+ core .log .error (" shared dict '" , CONN_COUNT_DICT_NAME , " ' not found" )
129+ return nil , " shared dict not found"
130+ end
131+
33132 local servers_heap = binaryHeap .minUnique (least_score )
133+
134+ -- Clean up stale connection counts for removed servers
135+ cleanup_stale_conn_counts (upstream , up_nodes )
136+
34137 for server , weight in pairs (up_nodes ) do
35- local score = 1 / weight
138+ -- Get the persisted connection count for this server
139+ local conn_count = get_server_conn_count (upstream , server )
140+ -- Score directly reflects weighted connection count
141+ local score = (conn_count + 1 ) / weight
142+
143+ core .log .debug (" initializing server " , server ,
144+ " | weight: " , weight ,
145+ " | conn_count: " , conn_count ,
146+ " | score: " , score ,
147+ " | upstream_id: " , upstream .id or " no-id" )
148+
36149 -- Note: the argument order of insert is different from others
37150 servers_heap :insert ({
38151 server = server ,
39- effect_weight = 1 / weight ,
152+ weight = weight ,
40153 score = score ,
41154 }, server )
42155 end
43156
44157 return {
45158 upstream = upstream ,
46- get = function (ctx )
159+ get = function (ctx )
47160 local server , info , err
48161 if ctx .balancer_tried_servers then
49162 local tried_server_list = {}
@@ -75,15 +188,29 @@ function _M.new(up_nodes, upstream)
75188 return nil , err
76189 end
77190
78- info .score = info .score + info .effect_weight
191+ -- Get current connection count for detailed logging
192+ local current_conn_count = get_server_conn_count (upstream , server )
193+ info .score = (current_conn_count + 1 ) / info .weight
79194 servers_heap :update (server , info )
195+ incr_server_conn_count (upstream , server , 1 )
80196 return server
81197 end ,
82- after_balance = function (ctx , before_retry )
198+ after_balance = function (ctx , before_retry )
83199 local server = ctx .balancer_server
84200 local info = servers_heap :valueByPayload (server )
85- info .score = info .score - info .effect_weight
201+ if not info then
202+ core .log .error (" server info not found for: " , server )
203+ return
204+ end
205+
206+ local current_conn_count = get_server_conn_count (upstream , server )
207+ info .score = (current_conn_count - 1 ) / info .weight
208+ if info .score < 0 then
209+ info .score = 0 -- Prevent negative scores
210+ end
86211 servers_heap :update (server , info )
212+ -- Decrement connection count in shared dict
213+ incr_server_conn_count (upstream , server , - 1 )
87214
88215 if not before_retry then
89216 if ctx .balancer_tried_servers then
@@ -100,7 +227,7 @@ function _M.new(up_nodes, upstream)
100227
101228 ctx .balancer_tried_servers [server ] = true
102229 end ,
103- before_retry_next_priority = function (ctx )
230+ before_retry_next_priority = function (ctx )
104231 if ctx .balancer_tried_servers then
105232 core .tablepool .release (" balancer_tried_servers" , ctx .balancer_tried_servers )
106233 ctx .balancer_tried_servers = nil
@@ -109,5 +236,5 @@ function _M.new(up_nodes, upstream)
109236 }
110237end
111238
112-
113239return _M
240+
0 commit comments