From 666986d3446c116ba902e5bf6500a4e8829c05dd Mon Sep 17 00:00:00 2001 From: coder2z Date: Thu, 29 May 2025 17:44:24 +0800 Subject: [PATCH 01/15] fix: improve least_conn balancer implementation --- apisix/balancer.lua | 17 +- apisix/balancer/least_conn.lua | 145 +++++++++++++- apisix/cli/config.lua | 3 + apisix/cli/ngx_tpl.lua | 6 + docs/en/latest/balancer-least-conn.md | 275 ++++++++++++++++++++++++++ docs/zh/latest/balancer-least-conn.md | 275 ++++++++++++++++++++++++++ t/node/least_conn_websocket.t | 195 ++++++++++++++++++ 7 files changed, 902 insertions(+), 14 deletions(-) create mode 100644 docs/en/latest/balancer-least-conn.md create mode 100644 docs/zh/latest/balancer-least-conn.md create mode 100644 t/node/least_conn_websocket.t diff --git a/apisix/balancer.lua b/apisix/balancer.lua index 0fe2e6539922..aa8c88b1dd2f 100644 --- a/apisix/balancer.lua +++ b/apisix/balancer.lua @@ -204,11 +204,18 @@ local function pick_server(route, ctx) local nodes_count = #up_conf.nodes if nodes_count == 1 then - local node = up_conf.nodes[1] - ctx.balancer_ip = node.host - ctx.balancer_port = node.port - node.upstream_host = parse_server_for_upstream_host(node, ctx.upstream_scheme) - return node + -- For least_conn balancer, we still need to use the balancer even with single node + -- to track connection counts for future load balancing decisions + if up_conf.type == "least_conn" then + core.log.debug("single node with least_conn balancer - still using balancer for connection tracking") + else + core.log.info("single node with ", up_conf.type, " balancer - skipping balancer") + local node = up_conf.nodes[1] + ctx.balancer_ip = node.host + ctx.balancer_port = node.port + node.upstream_host = parse_server_for_upstream_host(node, ctx.upstream_scheme) + return node + end end local version = ctx.upstream_version diff --git a/apisix/balancer/least_conn.lua b/apisix/balancer/least_conn.lua index 8923d1781d00..93d6eb09b71f 100644 --- a/apisix/balancer/least_conn.lua +++ b/apisix/balancer/least_conn.lua @@ -17,33 +17,146 @@ local core = require("apisix.core") local binaryHeap = require("binaryheap") +local dkjson = require("dkjson") local ipairs = ipairs local pairs = pairs - +local ngx = ngx +local ngx_shared = ngx.shared +local tostring = tostring local _M = {} +-- Shared dictionary to store connection counts across balancer recreations +local CONN_COUNT_DICT_NAME = "balancer-least-conn" +local conn_count_dict local function least_score(a, b) return a.score < b.score end +-- Get the connection count key for a specific upstream and server +local function get_conn_count_key(upstream, server) + local upstream_id = upstream.id + if not upstream_id then + -- Fallback to a hash of the upstream configuration using stable encoding + upstream_id = ngx.crc32_short(dkjson.encode(upstream)) + core.log.debug("generated upstream_id from hash: ", upstream_id) + end + local key = "conn_count:" .. tostring(upstream_id) .. ":" .. 
server + core.log.debug("generated connection count key: ", key) + return key +end + + +-- Get the current connection count for a server from shared dict +local function get_server_conn_count(upstream, server) + local key = get_conn_count_key(upstream, server) + local count, err = conn_count_dict:get(key) + if err then + core.log.error("failed to get connection count for ", server, ": ", err) + return 0 + end + local result = count or 0 + core.log.debug("retrieved connection count for server ", server, ": ", result) + return result +end + + +-- Set the connection count for a server in shared dict +local function set_server_conn_count(upstream, server, count) + local key = get_conn_count_key(upstream, server) + local ok, err = conn_count_dict:set(key, count) + if not ok then + core.log.error("failed to set connection count for ", server, ": ", err) + else + core.log.debug("set connection count for server ", server, " to ", count) + end +end + + +-- Increment the connection count for a server +local function incr_server_conn_count(upstream, server, delta) + local key = get_conn_count_key(upstream, server) + local new_count, err = conn_count_dict:incr(key, delta or 1, 0) + if not new_count then + core.log.error("failed to increment connection count for ", server, ": ", err) + return 0 + end + core.log.debug("incremented connection count for server ", server, " by ", delta or 1, + ", new count: ", new_count) + return new_count +end + + +-- Clean up connection counts for servers that are no longer in the upstream +local function cleanup_stale_conn_counts(upstream, current_servers) + local upstream_id = upstream.id + if not upstream_id then + upstream_id = ngx.crc32_short(dkjson.encode(upstream)) + end + + local prefix = "conn_count:" .. tostring(upstream_id) .. 
":" + core.log.debug("cleaning up stale connection counts with prefix: ", prefix) + local keys, err = conn_count_dict:get_keys(0) -- Get all keys + if err then + core.log.error("failed to get keys from shared dict: ", err) + return + end + + for _, key in ipairs(keys or {}) do + if core.string.has_prefix(key, prefix) then + local server = key:sub(#prefix + 1) + if not current_servers[server] then + -- This server is no longer in the upstream, clean it up + local ok, delete_err = conn_count_dict:delete(key) + if not ok and delete_err then + core.log.error("failed to delete stale connection count for server ", server, ": ", delete_err) + else + core.log.info("cleaned up stale connection count for server: ", server) + end + end + end + end +end function _M.new(up_nodes, upstream) + if not conn_count_dict then + conn_count_dict = ngx_shared[CONN_COUNT_DICT_NAME] + end + + if not conn_count_dict then + core.log.error("shared dict '", CONN_COUNT_DICT_NAME, "' not found") + return nil, "shared dict not found" + end + local servers_heap = binaryHeap.minUnique(least_score) + + -- Clean up stale connection counts for removed servers + cleanup_stale_conn_counts(upstream, up_nodes) + for server, weight in pairs(up_nodes) do - local score = 1 / weight + -- Get the persisted connection count for this server + local conn_count = get_server_conn_count(upstream, server) + -- Score directly reflects weighted connection count + local score = (conn_count + 1) / weight + + core.log.debug("initializing server ", server, + " | weight: ", weight, + " | conn_count: ", conn_count, + " | score: ", score, + " | upstream_id: ", upstream.id or "no-id") + -- Note: the argument order of insert is different from others servers_heap:insert({ server = server, - effect_weight = 1 / weight, + weight = weight, score = score, }, server) end return { upstream = upstream, - get = function (ctx) + get = function(ctx) local server, info, err if ctx.balancer_tried_servers then local tried_server_list = {} @@ -75,15 +188,29 @@ function _M.new(up_nodes, upstream) return nil, err end - info.score = info.score + info.effect_weight + -- Get current connection count for detailed logging + local current_conn_count = get_server_conn_count(upstream, server) + info.score = (current_conn_count + 1) / info.weight servers_heap:update(server, info) + incr_server_conn_count(upstream, server, 1) return server end, - after_balance = function (ctx, before_retry) + after_balance = function(ctx, before_retry) local server = ctx.balancer_server local info = servers_heap:valueByPayload(server) - info.score = info.score - info.effect_weight + if not info then + core.log.error("server info not found for: ", server) + return + end + + local current_conn_count = get_server_conn_count(upstream, server) + info.score = (current_conn_count - 1) / info.weight + if info.score < 0 then + info.score = 0 -- Prevent negative scores + end servers_heap:update(server, info) + -- Decrement connection count in shared dict + incr_server_conn_count(upstream, server, -1) if not before_retry then if ctx.balancer_tried_servers then @@ -100,7 +227,7 @@ function _M.new(up_nodes, upstream) ctx.balancer_tried_servers[server] = true end, - before_retry_next_priority = function (ctx) + before_retry_next_priority = function(ctx) if ctx.balancer_tried_servers then core.tablepool.release("balancer_tried_servers", ctx.balancer_tried_servers) ctx.balancer_tried_servers = nil @@ -109,5 +236,5 @@ function _M.new(up_nodes, upstream) } end - return _M + diff --git a/apisix/cli/config.lua 
b/apisix/cli/config.lua index 10f5969e6505..cf0db649a4e7 100644 --- a/apisix/cli/config.lua +++ b/apisix/cli/config.lua @@ -97,6 +97,7 @@ local _M = { ["prometheus-metrics"] = "15m", ["standalone-config"] = "10m", ["status-report"] = "1m", + ["balancer-least-conn"] = "10m", } }, stream = { @@ -113,6 +114,7 @@ local _M = { ["worker-events-stream"] = "10m", ["tars-stream"] = "1m", ["upstream-healthcheck-stream"] = "10m", + ["balancer-least-conn"] = "10m", } }, main_configuration_snippet = "", @@ -160,6 +162,7 @@ local _M = { ["balancer-ewma"] = "10m", ["balancer-ewma-locks"] = "10m", ["balancer-ewma-last-touched-at"] = "10m", + ["balancer-least-conn"] = "10m", ["plugin-limit-req-redis-cluster-slot-lock"] = "1m", ["plugin-limit-count-redis-cluster-slot-lock"] = "1m", ["plugin-limit-conn-redis-cluster-slot-lock"] = "1m", diff --git a/apisix/cli/ngx_tpl.lua b/apisix/cli/ngx_tpl.lua index d5d03bad0ae9..79b3fe1f1930 100644 --- a/apisix/cli/ngx_tpl.lua +++ b/apisix/cli/ngx_tpl.lua @@ -73,6 +73,9 @@ lua { {% if status then %} lua_shared_dict status-report {* meta.lua_shared_dict["status-report"] *}; {% end %} + {% if enable_stream then %} + lua_shared_dict balancer-least-conn {* meta.lua_shared_dict["balancer-least-conn"] *}; + {% end %} } {% if enabled_stream_plugins["prometheus"] and not enable_http then %} @@ -284,6 +287,9 @@ http { lua_shared_dict balancer-ewma {* http.lua_shared_dict["balancer-ewma"] *}; lua_shared_dict balancer-ewma-locks {* http.lua_shared_dict["balancer-ewma-locks"] *}; lua_shared_dict balancer-ewma-last-touched-at {* http.lua_shared_dict["balancer-ewma-last-touched-at"] *}; + {% if not enable_stream then %} + lua_shared_dict balancer-least-conn {* http.lua_shared_dict["balancer-least-conn"] *}; + {% end %} lua_shared_dict etcd-cluster-health-check {* http.lua_shared_dict["etcd-cluster-health-check"] *}; # etcd health check # for discovery shared dict diff --git a/docs/en/latest/balancer-least-conn.md b/docs/en/latest/balancer-least-conn.md new file mode 100644 index 000000000000..6f8dec80ac6d --- /dev/null +++ b/docs/en/latest/balancer-least-conn.md @@ -0,0 +1,275 @@ +# Least Connection Load Balancer + +## Overview + +The `least_conn` load balancer in Apache APISIX implements a dynamic load balancing algorithm that routes requests to the upstream server with the fewest active connections. This algorithm is particularly effective for scenarios where request processing times vary significantly or when dealing with long-lived connections such as WebSocket connections. + +## Algorithm Details + +### Core Principle + +The least connection algorithm maintains a count of active connections for each upstream server and selects the server with the lowest connection count for new requests. This approach helps ensure more even distribution of load, especially when connection durations vary. + +The algorithm uses a binary min-heap data structure to efficiently track and select servers with the lowest scores. Connection counts are persisted in nginx shared memory to maintain state across configuration reloads and worker process restarts. + +### Score Calculation + +Each upstream server is assigned a dynamic score based on its current connection load and weight: + +```lua +score = (connection_count + 1) / weight +``` + +Where: +- `connection_count` - Current number of active connections to the server +- `weight` - Server weight configuration value + +Servers with lower scores are preferred for new connections. 
The `+1` in the formula represents the potential new connection being considered. The score is updated in real-time as connections are established and completed. + +### Connection State Management + +#### Real-time Updates +- **Connection Start**: Connection count incremented, score updated to `(new_count + 1) / weight` +- **Connection End**: Connection count decremented, score updated to `(new_count - 1) / weight` +- **Heap Maintenance**: Binary heap automatically reorders servers by score +- **Score Protection**: Prevents negative scores by setting minimum score to 0 + +#### Persistence Strategy +Connection counts are stored in nginx shared dictionary with structured keys: +``` +conn_count:{upstream_id}:{server_address} +``` + +This ensures connection state survives: +- Upstream configuration changes +- Balancer instance recreation +- Worker process restarts +- Node additions/removals + +### Connection Tracking + +#### Persistent State Management + +The balancer uses nginx shared dictionary (`balancer-least-conn`) to maintain connection counts across: +- Balancer instance recreations +- Upstream configuration changes +- Worker process restarts +- Node additions/removals + +#### Connection Count Keys + +Connection counts are stored using structured keys: +``` +conn_count:{upstream_id}:{server_address} +``` + +Where: +- `upstream_id` - Unique identifier for the upstream configuration +- `server_address` - Server address (e.g., "127.0.0.1:8080") + +#### Upstream ID Generation + +1. **Primary**: Uses `upstream.id` if available +2. **Fallback**: Generates CRC32 hash of stable JSON encoding of upstream configuration + +```lua +local upstream_id = upstream.id +if not upstream_id then + upstream_id = ngx.crc32_short(dkjson.encode(upstream)) +end +``` + +The implementation uses `dkjson.encode` instead of `core.json.encode` to ensure deterministic JSON serialization, which is crucial for generating consistent upstream IDs across different worker processes and configuration reloads. + +### Connection Lifecycle + +#### 1. Connection Establishment +When a new request is routed: +1. Select server with lowest score from the heap +2. Update server score to `(current_count + 1) / weight` +3. Increment connection count in shared dictionary +4. Update server position in the heap + +#### 2. Connection Completion +When a request completes: +1. Calculate new score as `(current_count - 1) / weight` +2. Ensure score is not negative (minimum 0) +3. Decrement connection count in shared dictionary +4. Update server position in the heap + +#### 3. Cleanup Process +During balancer recreation: +1. Identify current active servers +2. Remove connection counts for servers no longer in upstream +3. Preserve counts for existing servers + +### Data Structures + +#### Binary Heap +- **Type**: Min-heap based on server scores +- **Purpose**: Efficient selection of server with lowest score +- **Operations**: O(log n) insertion, deletion, and updates + +#### Shared Dictionary +- **Name**: `balancer-least-conn` +- **Size**: 10MB (configurable) +- **Scope**: Shared across all worker processes +- **Persistence**: Survives configuration reloads + +## Configuration + +### Automatic Setup + +The `balancer-least-conn` shared dictionary is automatically configured by APISIX with a default size of 10MB. No manual configuration is required. 
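To confirm the dictionary is actually available at runtime and to peek at the counters it stores, a minimal sketch along the following lines can be run from any OpenResty `*_by_lua` context. The upstream id `1` and the server address are placeholder values for illustration only, not part of the APISIX API:

```lua
-- Minimal sketch: confirm the shared dict exists and read one tracked counter.
-- The key layout mirrors the conn_count:{upstream_id}:{server_address} scheme
-- documented above; the id and address below are hypothetical examples.
local dict = ngx.shared["balancer-least-conn"]
if not dict then
    ngx.log(ngx.ERR, "shared dict 'balancer-least-conn' is not configured")
    return
end

local key = "conn_count:1:127.0.0.1:8080"
ngx.log(ngx.INFO, "tracked connections for 127.0.0.1:8080: ", dict:get(key) or 0,
        ", free space: ", dict:free_space(), " bytes")
```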
+ +### Custom Configuration + +To customize the shared dictionary size, modify the `nginx_config.http.lua_shared_dict` section in your `conf/config.yaml`: + +```yaml +nginx_config: + http: + lua_shared_dict: + balancer-least-conn: 20m # Custom size (default: 10m) +``` + +### Upstream Configuration + +```yaml +upstreams: + - id: 1 + type: least_conn + nodes: + "127.0.0.1:8080": 1 + "127.0.0.1:8081": 2 + "127.0.0.1:8082": 1 +``` + +## Performance Characteristics + +### Time Complexity +- **Server Selection**: O(1) - heap peek operation +- **Connection Update**: O(log n) - heap update operation +- **Cleanup**: O(k) where k is the number of stored keys + +### Memory Usage +- **Per Server**: ~100 bytes (key + value + overhead) +- **Total**: Scales linearly with number of servers across all upstreams + +### Scalability +- **Servers**: Efficiently handles hundreds of servers per upstream +- **Upstreams**: Supports multiple upstreams with isolated connection tracking +- **Requests**: Minimal per-request overhead + +## Use Cases + +### Optimal Scenarios +1. **WebSocket Applications**: Long-lived connections benefit from accurate load distribution +2. **Variable Processing Times**: Requests with unpredictable duration +3. **Resource-Intensive Operations**: CPU or memory-intensive backend processing +4. **Database Connections**: Connection pooling scenarios + +### Considerations +1. **Short-lived Connections**: May have higher overhead than round-robin for very short requests +2. **Uniform Processing**: Round-robin might be simpler for uniform request processing +3. **Memory Usage**: Requires shared memory for connection state + +## Monitoring and Debugging + +### Log Messages + +#### Debug Logs +Enable debug logging to monitor balancer behavior: + +**Balancer Creation** +``` +creating new least_conn balancer for upstream: upstream_123 +``` + +**Connection Count Operations** +``` +generated connection count key: conn_count:upstream_123:127.0.0.1:8080 +retrieved connection count for 127.0.0.1:8080: 5 +setting connection count for 127.0.0.1:8080 to 6 +incrementing connection count for 127.0.0.1:8080 by 1, new count: 6 +``` + +**Server Selection** +``` +selected server: 127.0.0.1:8080 with current score: 1.2 +after_balance for server: 127.0.0.1:8080, before_retry: false +``` + +**Cleanup Operations** +``` +cleaning up stale connection counts for upstream: upstream_123 +cleaned up stale connection count for server: 127.0.0.1:8082 +``` + +#### Initialization +``` +initializing server 127.0.0.1:8080 with weight 1, base_score 1, conn_count 0, final_score 1 +``` + +#### Errors +``` +failed to set connection count for 127.0.0.1:8080: no memory +failed to increment connection count for 127.0.0.1:8080: no memory +``` + +### Shared Dictionary Monitoring + +Check shared dictionary usage: +```lua +local dict = ngx.shared["balancer-least-conn"] +local free_space = dict:free_space() +local capacity = dict:capacity() +``` + +## Error Handling + +### Missing Shared Dictionary +If the shared dictionary is not available (which should not happen with default configuration), the balancer will fail to initialize with: +``` +shared dict 'balancer-least-conn' not found +``` + +### Memory Exhaustion +When shared dictionary runs out of memory: +- Connection count updates will fail +- Warning messages will be logged +- Balancer continues to function with potentially stale counts + +### Recovery Strategies +1. **Increase Dictionary Size**: Allocate more memory +2. 
**Cleanup Frequency**: Implement periodic cleanup of stale entries +3. **Monitoring**: Set up alerts for dictionary usage + +## Best Practices + +### Configuration +1. **Dictionary Size**: Default 10MB is sufficient for most cases (supports ~100k connections) +2. **Server Weights**: Use appropriate weights to reflect server capacity +3. **Health Checks**: Combine with health checks for robust load balancing + +### Monitoring +1. **Connection Counts**: Monitor for unexpected accumulation +2. **Memory Usage**: Track shared dictionary utilization +3. **Performance**: Measure request distribution effectiveness + +### Troubleshooting +1. **Uneven Distribution**: Check for connection count accumulation +2. **Memory Issues**: Monitor shared dictionary free space +3. **Configuration**: Verify shared dictionary is properly configured + +## Migration and Compatibility + +### Backward Compatibility +- Graceful degradation when shared dictionary is unavailable +- No breaking changes to existing API +- Maintains existing behavior patterns + +### Upgrade Considerations +1. **Configuration**: Shared dictionary is automatically configured +2. **Memory**: Default allocation should be sufficient for most use cases +3. **Testing**: Validate load distribution in staging environment diff --git a/docs/zh/latest/balancer-least-conn.md b/docs/zh/latest/balancer-least-conn.md new file mode 100644 index 000000000000..d11ba08322b5 --- /dev/null +++ b/docs/zh/latest/balancer-least-conn.md @@ -0,0 +1,275 @@ +# 最少连接负载均衡器 + +## 概述 + +Apache APISIX 中的 `least_conn` 负载均衡器实现了一种动态负载均衡算法,将请求路由到活跃连接数最少的上游服务器。该算法特别适用于请求处理时间差异较大的场景,或处理长连接(如 WebSocket 连接)的情况。 + +## 算法详情 + +### 核心原理 + +最少连接算法为每个上游服务器维护活跃连接数的计数,并为新请求选择连接数最少的服务器。这种方法有助于确保更均匀的负载分布,特别是在连接持续时间变化较大的情况下。 + +该算法使用二进制最小堆数据结构来高效跟踪和选择得分最低的服务器。连接计数持久化存储在 nginx 共享内存中,以在配置重载和工作进程重启时保持状态。 + +### 得分计算 + +每个上游服务器被分配一个动态得分,基于其当前连接负载和权重: + +```lua +score = (connection_count + 1) / weight +``` + +其中: +- `connection_count` - 当前到服务器的活跃连接数 +- `weight` - 服务器权重配置值 + +得分较低的服务器优先获得新连接。公式中的 `+1` 表示正在考虑分配的新连接。得分会随着连接的建立和完成实时更新。 + +### 连接状态管理 + +#### 实时更新 +- **连接开始**:连接计数递增,得分更新为 `(new_count + 1) / weight` +- **连接结束**:连接计数递减,得分更新为 `(new_count - 1) / weight` +- **堆维护**:二进制堆自动按得分重新排序服务器 +- **得分保护**:通过设置最小得分为 0 防止出现负分 + +#### 持久化策略 +连接计数存储在 nginx 共享字典中,使用结构化键: +``` +conn_count:{upstream_id}:{server_address} +``` + +这确保连接状态在以下情况下保持: +- 上游配置变更 +- 负载均衡器实例重建 +- 工作进程重启 +- 节点添加/移除 + +### 连接跟踪 + +#### 持久状态管理 + +负载均衡器使用 nginx 共享字典(`balancer-least-conn`)在以下情况下维护连接计数: +- 负载均衡器实例重建 +- 上游配置变更 +- 工作进程重启 +- 节点添加/移除 + +#### 连接计数键 + +连接计数使用结构化键存储: +``` +conn_count:{upstream_id}:{server_address} +``` + +其中: +- `upstream_id` - 上游配置的唯一标识符 +- `server_address` - 服务器地址(例如:"127.0.0.1:8080") + +#### 上游 ID 生成 + +1. **主要方式**:如果可用,使用 `upstream.id` +2. **备用方式**:生成上游配置稳定 JSON 编码的 CRC32 哈希 + +```lua +local upstream_id = upstream.id +if not upstream_id then + upstream_id = ngx.crc32_short(dkjson.encode(upstream)) +end +``` + +实现使用 `dkjson.encode` 而不是 `core.json.encode` 来确保确定性的 JSON 序列化,这对于在不同工作进程和配置重载之间生成一致的上游 ID 至关重要。 + +### 连接生命周期 + +#### 1. 连接建立 +当路由新请求时: +1. 从堆中选择得分最低的服务器 +2. 将服务器得分更新为 `(current_count + 1) / weight` +3. 在共享字典中递增连接计数 +4. 更新堆中服务器的位置 + +#### 2. 连接完成 +当请求完成时: +1. 计算新得分为 `(current_count - 1) / weight` +2. 保证得分不为负(最小为 0) +3. 在共享字典中递减连接计数 +4. 更新堆中服务器的位置 + +#### 3. 清理过程 +在负载均衡器重建期间: +1. 识别当前活跃的服务器 +2. 移除不再在上游中的服务器的连接计数 +3. 
保留现有服务器的计数 + +### 数据结构 + +#### 二进制堆 +- **类型**:基于服务器得分的最小堆 +- **目的**:高效选择得分最低的服务器 +- **操作**:O(log n) 插入、删除和更新 + +#### 共享字典 +- **名称**:`balancer-least-conn` +- **大小**:10MB(可配置) +- **范围**:在所有工作进程间共享 +- **持久性**:在配置重载后保持 + +## 配置 + +### 自动设置 + +`balancer-least-conn` 共享字典由 APISIX 自动配置,默认大小为 10MB。无需手动配置。 + +### 自定义配置 + +要自定义共享字典大小,请修改 `conf/config.yaml` 中的 `nginx_config.http.lua_shared_dict` 部分: + +```yaml +nginx_config: + http: + lua_shared_dict: + balancer-least-conn: 20m # 自定义大小(默认:10m) +``` + +### 上游配置 + +```yaml +upstreams: + - id: 1 + type: least_conn + nodes: + "127.0.0.1:8080": 1 + "127.0.0.1:8081": 2 + "127.0.0.1:8082": 1 +``` + +## 性能特征 + +### 时间复杂度 +- **服务器选择**:O(1) - 堆查看操作 +- **连接更新**:O(log n) - 堆更新操作 +- **清理**:O(k),其中 k 是存储键的数量 + +### 内存使用 +- **每个服务器**:约 100 字节(键 + 值 + 开销) +- **总计**:与所有上游的服务器数量线性扩展 + +### 可扩展性 +- **服务器**:高效处理每个上游数百个服务器 +- **上游**:支持多个上游,具有隔离的连接跟踪 +- **请求**:最小的每请求开销 + +## 使用场景 + +### 最佳场景 +1. **WebSocket 应用**:长连接受益于准确的负载分布 +2. **可变处理时间**:持续时间不可预测的请求 +3. **资源密集型操作**:CPU 或内存密集型后端处理 +4. **数据库连接**:连接池场景 + +### 注意事项 +1. **短连接**:对于非常短的请求,可能比轮询有更高的开销 +2. **统一处理**:对于统一的请求处理,轮询可能更简单 +3. **内存使用**:需要共享内存来存储连接状态 + +## 监控和调试 + +### 日志消息 + +#### 调试日志 +启用调试日志来监控负载均衡器行为: + +**负载均衡器创建** +``` +creating new least_conn balancer for upstream: upstream_123 +``` + +**连接数操作** +``` +generated connection count key: conn_count:upstream_123:127.0.0.1:8080 +retrieved connection count for 127.0.0.1:8080: 5 +setting connection count for 127.0.0.1:8080 to 6 +incrementing connection count for 127.0.0.1:8080 by 1, new count: 6 +``` + +**服务器选择** +``` +selected server: 127.0.0.1:8080 with current score: 1.2 +after_balance for server: 127.0.0.1:8080, before_retry: false +``` + +**清理操作** +``` +cleaning up stale connection counts for upstream: upstream_123 +cleaned up stale connection count for server: 127.0.0.1:8082 +``` + +#### 初始化 +``` +initializing server 127.0.0.1:8080 with weight 1, base_score 1, conn_count 0, final_score 1 +``` + +#### 错误 +``` +failed to set connection count for 127.0.0.1:8080: no memory +failed to increment connection count for 127.0.0.1:8080: no memory +``` + +### 共享字典监控 + +检查共享字典使用情况: +```lua +local dict = ngx.shared["balancer-least-conn"] +local free_space = dict:free_space() +local capacity = dict:capacity() +``` + +## 错误处理 + +### 缺少共享字典 +如果共享字典不可用(在默认配置下不应该发生),负载均衡器将初始化失败并显示: +``` +shared dict 'balancer-least-conn' not found +``` + +### 内存耗尽 +当共享字典内存不足时: +- 连接计数更新将失败 +- 将记录警告消息 +- 负载均衡器继续运行,但可能使用过时的计数 + +### 恢复策略 +1. **增加字典大小**:分配更多内存 +2. **清理频率**:实现过时条目的定期清理 +3. **监控**:为字典使用情况设置警报 + +## 最佳实践 + +### 配置 +1. **字典大小**:默认 10MB 对大多数情况足够(支持约 10 万连接) +2. **服务器权重**:使用适当的权重来反映服务器容量 +3. **健康检查**:与健康检查结合使用以实现稳健的负载均衡 + +### 监控 +1. **连接计数**:监控意外的累积 +2. **内存使用**:跟踪共享字典利用率 +3. **性能**:测量请求分布的有效性 + +### 故障排除 +1. **不均匀分布**:检查连接计数累积 +2. **内存问题**:监控共享字典可用空间 +3. **配置**:验证共享字典是否正确配置 + +## 迁移和兼容性 + +### 向后兼容性 +- 当共享字典不可用时优雅降级 +- 对现有 API 无破坏性更改 +- 保持现有行为模式 + +### 升级注意事项 +1. **配置**:共享字典自动配置 +2. **内存**:默认分配对大多数用例应该足够 +3. **测试**:在测试环境中验证负载分布 diff --git a/t/node/least_conn_websocket.t b/t/node/least_conn_websocket.t new file mode 100644 index 000000000000..fbac650aada0 --- /dev/null +++ b/t/node/least_conn_websocket.t @@ -0,0 +1,195 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. 
+# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +log_level('info'); +no_root_location(); +worker_connections(1024); +no_shuffle(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->yaml_config) { + my $yaml_config = <<_EOC_; +apisix: + node_listen: 1984 +nginx_config: + http: + lua_shared_dict: + balancer-least-conn: 10m +deployment: + role: data_plane + role_data_plane: + config_provider: yaml +_EOC_ + + $block->set_value("yaml_config", $yaml_config); + } + + my $route = <<_EOC_; +routes: + - upstream_id: 1 + uris: + - /test +#END +_EOC_ + + $block->set_value("apisix_yaml", $block->apisix_yaml . $route); + + if (!$block->request) { + $block->set_value("request", "GET /test"); + } +}); + +run_tests; + +__DATA__ + +=== TEST 1: test least_conn balancer with connection state persistence +--- apisix_yaml +upstreams: + - id: 1 + type: least_conn + nodes: + "127.0.0.1:1980": 1 + "127.0.0.1:1981": 1 +--- config + location /test { + content_by_lua_block { + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/test" + + -- Simulate multiple requests to build up connection counts + local results = {} + for i = 1, 10 do + local httpc = http.new() + local res, err = httpc:request_uri(uri) + if res then + table.insert(results, res.status) + end + httpc:close() + end + + ngx.say("requests completed: ", #results) + } + } +--- response_body +requests completed: 10 + + + + +=== TEST 2: test connection count persistence across upstream changes +--- apisix_yaml +upstreams: + - id: 1 + type: least_conn + nodes: + "127.0.0.1:1980": 1 + "127.0.0.1:1981": 1 +--- config + location /test { + content_by_lua_block { + local core = require("apisix.core") + local balancer = require("apisix.balancer.least_conn") + + -- Create a mock upstream configuration + local upstream = { + parent = { + value = { + id = "test_upstream_1" + } + } + } + + local up_nodes = { + ["127.0.0.1:1980"] = 1, + ["127.0.0.1:1981"] = 1 + } + + -- Create first balancer instance + local balancer1 = balancer.new(up_nodes, upstream) + + -- Simulate some connections + local ctx = {} + local server1 = balancer1.get(ctx) + ctx.balancer_server = server1 + + -- Simulate connection completion (this should decrement count) + balancer1.after_balance(ctx, false) + + -- Add a new node to simulate scaling + up_nodes["127.0.0.1:1982"] = 1 + + -- Create new balancer instance (simulating upstream change) + local balancer2 = balancer.new(up_nodes, upstream) + + ngx.say("balancer created successfully with persistent state") + } + } +--- response_body +balancer created successfully with persistent state + + + + +=== TEST 3: test cleanup of stale connection counts +--- apisix_yaml +upstreams: + - id: 1 + type: least_conn + nodes: + "127.0.0.1:1980": 1 + "127.0.0.1:1981": 1 + "127.0.0.1:1982": 1 +--- config + location /test { + content_by_lua_block { + local core = require("apisix.core") + local balancer = 
require("apisix.balancer.least_conn") + + -- Create a mock upstream configuration + local upstream = { + parent = { + value = { + id = "test_upstream_2" + } + } + } + + local up_nodes = { + ["127.0.0.1:1980"] = 1, + ["127.0.0.1:1981"] = 1, + ["127.0.0.1:1982"] = 1 + } + + -- Create first balancer instance with 3 nodes + local balancer1 = balancer.new(up_nodes, upstream) + + -- Remove one node (simulate scaling down) + up_nodes["127.0.0.1:1982"] = nil + + -- Create new balancer instance (should clean up stale counts) + local balancer2 = balancer.new(up_nodes, upstream) + + ngx.say("stale connection counts cleaned up successfully") + } + } +--- response_body +stale connection counts cleaned up successfully From 197330b4db75e5bc2517a7ad9b155f14557a5a12 Mon Sep 17 00:00:00 2001 From: coder2z Date: Tue, 10 Jun 2025 19:10:30 +0800 Subject: [PATCH 02/15] fix: md lint --- docs/en/latest/balancer-least-conn.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/en/latest/balancer-least-conn.md b/docs/en/latest/balancer-least-conn.md index 6f8dec80ac6d..aa25691ef200 100644 --- a/docs/en/latest/balancer-least-conn.md +++ b/docs/en/latest/balancer-least-conn.md @@ -90,7 +90,7 @@ When a new request is routed: 3. Increment connection count in shared dictionary 4. Update server position in the heap -#### 2. Connection Completion +#### 2. Connection Completion When a request completes: 1. Calculate new score as `(current_count - 1) / weight` 2. Ensure score is not negative (minimum 0) From 78eeadcf4741cd2c95756b087402f92f2cc42eca Mon Sep 17 00:00:00 2001 From: coder2z Date: Fri, 13 Jun 2025 09:54:40 +0800 Subject: [PATCH 03/15] fix: lint+md lint --- apisix/balancer.lua | 5 +++- apisix/balancer/least_conn.lua | 16 ++--------- docs/en/latest/balancer-least-conn.md | 39 +++++++++++++++++++++++++++ docs/zh/latest/balancer-least-conn.md | 39 +++++++++++++++++++++++++++ 4 files changed, 84 insertions(+), 15 deletions(-) diff --git a/apisix/balancer.lua b/apisix/balancer.lua index aa8c88b1dd2f..4e21da0955d2 100644 --- a/apisix/balancer.lua +++ b/apisix/balancer.lua @@ -207,7 +207,10 @@ local function pick_server(route, ctx) -- For least_conn balancer, we still need to use the balancer even with single node -- to track connection counts for future load balancing decisions if up_conf.type == "least_conn" then - core.log.debug("single node with least_conn balancer - still using balancer for connection tracking") + core.log.debug( + "single node with least_conn balancer", + "still using balancer for connection tracking" + ) else core.log.info("single node with ", up_conf.type, " balancer - skipping balancer") local node = up_conf.nodes[1] diff --git a/apisix/balancer/least_conn.lua b/apisix/balancer/least_conn.lua index 93d6eb09b71f..ec39e870cae3 100644 --- a/apisix/balancer/least_conn.lua +++ b/apisix/balancer/least_conn.lua @@ -61,19 +61,6 @@ local function get_server_conn_count(upstream, server) return result end - --- Set the connection count for a server in shared dict -local function set_server_conn_count(upstream, server, count) - local key = get_conn_count_key(upstream, server) - local ok, err = conn_count_dict:set(key, count) - if not ok then - core.log.error("failed to set connection count for ", server, ": ", err) - else - core.log.debug("set connection count for server ", server, " to ", count) - end -end - - -- Increment the connection count for a server local function incr_server_conn_count(upstream, server, delta) local key = get_conn_count_key(upstream, server) @@ -110,7 +97,8 @@ 
local function cleanup_stale_conn_counts(upstream, current_servers) -- This server is no longer in the upstream, clean it up local ok, delete_err = conn_count_dict:delete(key) if not ok and delete_err then - core.log.error("failed to delete stale connection count for server ", server, ": ", delete_err) + core.log.error("failed to delete stale connection count for server ", + server, ": ", delete_err) else core.log.info("cleaned up stale connection count for server: ", server) end diff --git a/docs/en/latest/balancer-least-conn.md b/docs/en/latest/balancer-least-conn.md index aa25691ef200..88140fe1fff6 100644 --- a/docs/en/latest/balancer-least-conn.md +++ b/docs/en/latest/balancer-least-conn.md @@ -21,6 +21,7 @@ score = (connection_count + 1) / weight ``` Where: + - `connection_count` - Current number of active connections to the server - `weight` - Server weight configuration value @@ -29,18 +30,22 @@ Servers with lower scores are preferred for new connections. The `+1` in the for ### Connection State Management #### Real-time Updates + - **Connection Start**: Connection count incremented, score updated to `(new_count + 1) / weight` - **Connection End**: Connection count decremented, score updated to `(new_count - 1) / weight` - **Heap Maintenance**: Binary heap automatically reorders servers by score - **Score Protection**: Prevents negative scores by setting minimum score to 0 #### Persistence Strategy + Connection counts are stored in nginx shared dictionary with structured keys: + ``` conn_count:{upstream_id}:{server_address} ``` This ensures connection state survives: + - Upstream configuration changes - Balancer instance recreation - Worker process restarts @@ -51,6 +56,7 @@ This ensures connection state survives: #### Persistent State Management The balancer uses nginx shared dictionary (`balancer-least-conn`) to maintain connection counts across: + - Balancer instance recreations - Upstream configuration changes - Worker process restarts @@ -59,11 +65,13 @@ The balancer uses nginx shared dictionary (`balancer-least-conn`) to maintain co #### Connection Count Keys Connection counts are stored using structured keys: + ``` conn_count:{upstream_id}:{server_address} ``` Where: + - `upstream_id` - Unique identifier for the upstream configuration - `server_address` - Server address (e.g., "127.0.0.1:8080") @@ -84,21 +92,27 @@ The implementation uses `dkjson.encode` instead of `core.json.encode` to ensure ### Connection Lifecycle #### 1. Connection Establishment + When a new request is routed: + 1. Select server with lowest score from the heap 2. Update server score to `(current_count + 1) / weight` 3. Increment connection count in shared dictionary 4. Update server position in the heap #### 2. Connection Completion + When a request completes: + 1. Calculate new score as `(current_count - 1) / weight` 2. Ensure score is not negative (minimum 0) 3. Decrement connection count in shared dictionary 4. Update server position in the heap #### 3. Cleanup Process + During balancer recreation: + 1. Identify current active servers 2. Remove connection counts for servers no longer in upstream 3. 
Preserve counts for existing servers @@ -106,11 +120,13 @@ During balancer recreation: ### Data Structures #### Binary Heap + - **Type**: Min-heap based on server scores - **Purpose**: Efficient selection of server with lowest score - **Operations**: O(log n) insertion, deletion, and updates #### Shared Dictionary + - **Name**: `balancer-least-conn` - **Size**: 10MB (configurable) - **Scope**: Shared across all worker processes @@ -148,15 +164,18 @@ upstreams: ## Performance Characteristics ### Time Complexity + - **Server Selection**: O(1) - heap peek operation - **Connection Update**: O(log n) - heap update operation - **Cleanup**: O(k) where k is the number of stored keys ### Memory Usage + - **Per Server**: ~100 bytes (key + value + overhead) - **Total**: Scales linearly with number of servers across all upstreams ### Scalability + - **Servers**: Efficiently handles hundreds of servers per upstream - **Upstreams**: Supports multiple upstreams with isolated connection tracking - **Requests**: Minimal per-request overhead @@ -164,12 +183,14 @@ upstreams: ## Use Cases ### Optimal Scenarios + 1. **WebSocket Applications**: Long-lived connections benefit from accurate load distribution 2. **Variable Processing Times**: Requests with unpredictable duration 3. **Resource-Intensive Operations**: CPU or memory-intensive backend processing 4. **Database Connections**: Connection pooling scenarios ### Considerations + 1. **Short-lived Connections**: May have higher overhead than round-robin for very short requests 2. **Uniform Processing**: Round-robin might be simpler for uniform request processing 3. **Memory Usage**: Requires shared memory for connection state @@ -179,14 +200,17 @@ upstreams: ### Log Messages #### Debug Logs + Enable debug logging to monitor balancer behavior: **Balancer Creation** + ``` creating new least_conn balancer for upstream: upstream_123 ``` **Connection Count Operations** + ``` generated connection count key: conn_count:upstream_123:127.0.0.1:8080 retrieved connection count for 127.0.0.1:8080: 5 @@ -195,23 +219,27 @@ incrementing connection count for 127.0.0.1:8080 by 1, new count: 6 ``` **Server Selection** + ``` selected server: 127.0.0.1:8080 with current score: 1.2 after_balance for server: 127.0.0.1:8080, before_retry: false ``` **Cleanup Operations** + ``` cleaning up stale connection counts for upstream: upstream_123 cleaned up stale connection count for server: 127.0.0.1:8082 ``` #### Initialization + ``` initializing server 127.0.0.1:8080 with weight 1, base_score 1, conn_count 0, final_score 1 ``` #### Errors + ``` failed to set connection count for 127.0.0.1:8080: no memory failed to increment connection count for 127.0.0.1:8080: no memory @@ -220,6 +248,7 @@ failed to increment connection count for 127.0.0.1:8080: no memory ### Shared Dictionary Monitoring Check shared dictionary usage: + ```lua local dict = ngx.shared["balancer-least-conn"] local free_space = dict:free_space() @@ -229,18 +258,23 @@ local capacity = dict:capacity() ## Error Handling ### Missing Shared Dictionary + If the shared dictionary is not available (which should not happen with default configuration), the balancer will fail to initialize with: + ``` shared dict 'balancer-least-conn' not found ``` ### Memory Exhaustion + When shared dictionary runs out of memory: + - Connection count updates will fail - Warning messages will be logged - Balancer continues to function with potentially stale counts ### Recovery Strategies + 1. **Increase Dictionary Size**: Allocate more memory 2. 
**Cleanup Frequency**: Implement periodic cleanup of stale entries 3. **Monitoring**: Set up alerts for dictionary usage @@ -248,16 +282,19 @@ When shared dictionary runs out of memory: ## Best Practices ### Configuration + 1. **Dictionary Size**: Default 10MB is sufficient for most cases (supports ~100k connections) 2. **Server Weights**: Use appropriate weights to reflect server capacity 3. **Health Checks**: Combine with health checks for robust load balancing ### Monitoring + 1. **Connection Counts**: Monitor for unexpected accumulation 2. **Memory Usage**: Track shared dictionary utilization 3. **Performance**: Measure request distribution effectiveness ### Troubleshooting + 1. **Uneven Distribution**: Check for connection count accumulation 2. **Memory Issues**: Monitor shared dictionary free space 3. **Configuration**: Verify shared dictionary is properly configured @@ -265,11 +302,13 @@ When shared dictionary runs out of memory: ## Migration and Compatibility ### Backward Compatibility + - Graceful degradation when shared dictionary is unavailable - No breaking changes to existing API - Maintains existing behavior patterns ### Upgrade Considerations + 1. **Configuration**: Shared dictionary is automatically configured 2. **Memory**: Default allocation should be sufficient for most use cases 3. **Testing**: Validate load distribution in staging environment diff --git a/docs/zh/latest/balancer-least-conn.md b/docs/zh/latest/balancer-least-conn.md index d11ba08322b5..c766a8523d3c 100644 --- a/docs/zh/latest/balancer-least-conn.md +++ b/docs/zh/latest/balancer-least-conn.md @@ -21,6 +21,7 @@ score = (connection_count + 1) / weight ``` 其中: + - `connection_count` - 当前到服务器的活跃连接数 - `weight` - 服务器权重配置值 @@ -29,18 +30,22 @@ score = (connection_count + 1) / weight ### 连接状态管理 #### 实时更新 + - **连接开始**:连接计数递增,得分更新为 `(new_count + 1) / weight` - **连接结束**:连接计数递减,得分更新为 `(new_count - 1) / weight` - **堆维护**:二进制堆自动按得分重新排序服务器 - **得分保护**:通过设置最小得分为 0 防止出现负分 #### 持久化策略 + 连接计数存储在 nginx 共享字典中,使用结构化键: + ``` conn_count:{upstream_id}:{server_address} ``` 这确保连接状态在以下情况下保持: + - 上游配置变更 - 负载均衡器实例重建 - 工作进程重启 @@ -51,6 +56,7 @@ conn_count:{upstream_id}:{server_address} #### 持久状态管理 负载均衡器使用 nginx 共享字典(`balancer-least-conn`)在以下情况下维护连接计数: + - 负载均衡器实例重建 - 上游配置变更 - 工作进程重启 @@ -59,11 +65,13 @@ conn_count:{upstream_id}:{server_address} #### 连接计数键 连接计数使用结构化键存储: + ``` conn_count:{upstream_id}:{server_address} ``` 其中: + - `upstream_id` - 上游配置的唯一标识符 - `server_address` - 服务器地址(例如:"127.0.0.1:8080") @@ -84,21 +92,27 @@ end ### 连接生命周期 #### 1. 连接建立 + 当路由新请求时: + 1. 从堆中选择得分最低的服务器 2. 将服务器得分更新为 `(current_count + 1) / weight` 3. 在共享字典中递增连接计数 4. 更新堆中服务器的位置 #### 2. 连接完成 + 当请求完成时: + 1. 计算新得分为 `(current_count - 1) / weight` 2. 保证得分不为负(最小为 0) 3. 在共享字典中递减连接计数 4. 更新堆中服务器的位置 #### 3. 清理过程 + 在负载均衡器重建期间: + 1. 识别当前活跃的服务器 2. 移除不再在上游中的服务器的连接计数 3. 保留现有服务器的计数 @@ -106,11 +120,13 @@ end ### 数据结构 #### 二进制堆 + - **类型**:基于服务器得分的最小堆 - **目的**:高效选择得分最低的服务器 - **操作**:O(log n) 插入、删除和更新 #### 共享字典 + - **名称**:`balancer-least-conn` - **大小**:10MB(可配置) - **范围**:在所有工作进程间共享 @@ -148,15 +164,18 @@ upstreams: ## 性能特征 ### 时间复杂度 + - **服务器选择**:O(1) - 堆查看操作 - **连接更新**:O(log n) - 堆更新操作 - **清理**:O(k),其中 k 是存储键的数量 ### 内存使用 + - **每个服务器**:约 100 字节(键 + 值 + 开销) - **总计**:与所有上游的服务器数量线性扩展 ### 可扩展性 + - **服务器**:高效处理每个上游数百个服务器 - **上游**:支持多个上游,具有隔离的连接跟踪 - **请求**:最小的每请求开销 @@ -164,12 +183,14 @@ upstreams: ## 使用场景 ### 最佳场景 + 1. **WebSocket 应用**:长连接受益于准确的负载分布 2. **可变处理时间**:持续时间不可预测的请求 3. **资源密集型操作**:CPU 或内存密集型后端处理 4. **数据库连接**:连接池场景 ### 注意事项 + 1. **短连接**:对于非常短的请求,可能比轮询有更高的开销 2. 
**统一处理**:对于统一的请求处理,轮询可能更简单 3. **内存使用**:需要共享内存来存储连接状态 @@ -179,14 +200,17 @@ upstreams: ### 日志消息 #### 调试日志 + 启用调试日志来监控负载均衡器行为: **负载均衡器创建** + ``` creating new least_conn balancer for upstream: upstream_123 ``` **连接数操作** + ``` generated connection count key: conn_count:upstream_123:127.0.0.1:8080 retrieved connection count for 127.0.0.1:8080: 5 @@ -195,23 +219,27 @@ incrementing connection count for 127.0.0.1:8080 by 1, new count: 6 ``` **服务器选择** + ``` selected server: 127.0.0.1:8080 with current score: 1.2 after_balance for server: 127.0.0.1:8080, before_retry: false ``` **清理操作** + ``` cleaning up stale connection counts for upstream: upstream_123 cleaned up stale connection count for server: 127.0.0.1:8082 ``` #### 初始化 + ``` initializing server 127.0.0.1:8080 with weight 1, base_score 1, conn_count 0, final_score 1 ``` #### 错误 + ``` failed to set connection count for 127.0.0.1:8080: no memory failed to increment connection count for 127.0.0.1:8080: no memory @@ -220,6 +248,7 @@ failed to increment connection count for 127.0.0.1:8080: no memory ### 共享字典监控 检查共享字典使用情况: + ```lua local dict = ngx.shared["balancer-least-conn"] local free_space = dict:free_space() @@ -229,18 +258,23 @@ local capacity = dict:capacity() ## 错误处理 ### 缺少共享字典 + 如果共享字典不可用(在默认配置下不应该发生),负载均衡器将初始化失败并显示: + ``` shared dict 'balancer-least-conn' not found ``` ### 内存耗尽 + 当共享字典内存不足时: + - 连接计数更新将失败 - 将记录警告消息 - 负载均衡器继续运行,但可能使用过时的计数 ### 恢复策略 + 1. **增加字典大小**:分配更多内存 2. **清理频率**:实现过时条目的定期清理 3. **监控**:为字典使用情况设置警报 @@ -248,16 +282,19 @@ shared dict 'balancer-least-conn' not found ## 最佳实践 ### 配置 + 1. **字典大小**:默认 10MB 对大多数情况足够(支持约 10 万连接) 2. **服务器权重**:使用适当的权重来反映服务器容量 3. **健康检查**:与健康检查结合使用以实现稳健的负载均衡 ### 监控 + 1. **连接计数**:监控意外的累积 2. **内存使用**:跟踪共享字典利用率 3. **性能**:测量请求分布的有效性 ### 故障排除 + 1. **不均匀分布**:检查连接计数累积 2. **内存问题**:监控共享字典可用空间 3. **配置**:验证共享字典是否正确配置 @@ -265,11 +302,13 @@ shared dict 'balancer-least-conn' not found ## 迁移和兼容性 ### 向后兼容性 + - 当共享字典不可用时优雅降级 - 对现有 API 无破坏性更改 - 保持现有行为模式 ### 升级注意事项 + 1. **配置**:共享字典自动配置 2. **内存**:默认分配对大多数用例应该足够 3. **测试**:在测试环境中验证负载分布 From 30ae7042ecc3e3eef66a39196341abfcc9cd3a1e Mon Sep 17 00:00:00 2001 From: coder2z Date: Fri, 13 Jun 2025 14:03:50 +0800 Subject: [PATCH 04/15] fix: lint+md lint --- t/node/least_conn_websocket.t | 195 ---------------------------------- 1 file changed, 195 deletions(-) delete mode 100644 t/node/least_conn_websocket.t diff --git a/t/node/least_conn_websocket.t b/t/node/least_conn_websocket.t deleted file mode 100644 index fbac650aada0..000000000000 --- a/t/node/least_conn_websocket.t +++ /dev/null @@ -1,195 +0,0 @@ -# -# Licensed to the Apache Software Foundation (ASF) under one or more -# contributor license agreements. See the NOTICE file distributed with -# this work for additional information regarding copyright ownership. -# The ASF licenses this file to You under the Apache License, Version 2.0 -# (the "License"); you may not use this file except in compliance with -# the License. You may obtain a copy of the License at -# -# http://www.apache.org/licenses/LICENSE-2.0 -# -# Unless required by applicable law or agreed to in writing, software -# distributed under the License is distributed on an "AS IS" BASIS, -# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -# See the License for the specific language governing permissions and -# limitations under the License. 
-# -use t::APISIX 'no_plan'; - -repeat_each(1); -log_level('info'); -no_root_location(); -worker_connections(1024); -no_shuffle(); - -add_block_preprocessor(sub { - my ($block) = @_; - - if (!$block->yaml_config) { - my $yaml_config = <<_EOC_; -apisix: - node_listen: 1984 -nginx_config: - http: - lua_shared_dict: - balancer-least-conn: 10m -deployment: - role: data_plane - role_data_plane: - config_provider: yaml -_EOC_ - - $block->set_value("yaml_config", $yaml_config); - } - - my $route = <<_EOC_; -routes: - - upstream_id: 1 - uris: - - /test -#END -_EOC_ - - $block->set_value("apisix_yaml", $block->apisix_yaml . $route); - - if (!$block->request) { - $block->set_value("request", "GET /test"); - } -}); - -run_tests; - -__DATA__ - -=== TEST 1: test least_conn balancer with connection state persistence ---- apisix_yaml -upstreams: - - id: 1 - type: least_conn - nodes: - "127.0.0.1:1980": 1 - "127.0.0.1:1981": 1 ---- config - location /test { - content_by_lua_block { - local http = require "resty.http" - local uri = "http://127.0.0.1:" .. ngx.var.server_port .. "/test" - - -- Simulate multiple requests to build up connection counts - local results = {} - for i = 1, 10 do - local httpc = http.new() - local res, err = httpc:request_uri(uri) - if res then - table.insert(results, res.status) - end - httpc:close() - end - - ngx.say("requests completed: ", #results) - } - } ---- response_body -requests completed: 10 - - - - -=== TEST 2: test connection count persistence across upstream changes ---- apisix_yaml -upstreams: - - id: 1 - type: least_conn - nodes: - "127.0.0.1:1980": 1 - "127.0.0.1:1981": 1 ---- config - location /test { - content_by_lua_block { - local core = require("apisix.core") - local balancer = require("apisix.balancer.least_conn") - - -- Create a mock upstream configuration - local upstream = { - parent = { - value = { - id = "test_upstream_1" - } - } - } - - local up_nodes = { - ["127.0.0.1:1980"] = 1, - ["127.0.0.1:1981"] = 1 - } - - -- Create first balancer instance - local balancer1 = balancer.new(up_nodes, upstream) - - -- Simulate some connections - local ctx = {} - local server1 = balancer1.get(ctx) - ctx.balancer_server = server1 - - -- Simulate connection completion (this should decrement count) - balancer1.after_balance(ctx, false) - - -- Add a new node to simulate scaling - up_nodes["127.0.0.1:1982"] = 1 - - -- Create new balancer instance (simulating upstream change) - local balancer2 = balancer.new(up_nodes, upstream) - - ngx.say("balancer created successfully with persistent state") - } - } ---- response_body -balancer created successfully with persistent state - - - - -=== TEST 3: test cleanup of stale connection counts ---- apisix_yaml -upstreams: - - id: 1 - type: least_conn - nodes: - "127.0.0.1:1980": 1 - "127.0.0.1:1981": 1 - "127.0.0.1:1982": 1 ---- config - location /test { - content_by_lua_block { - local core = require("apisix.core") - local balancer = require("apisix.balancer.least_conn") - - -- Create a mock upstream configuration - local upstream = { - parent = { - value = { - id = "test_upstream_2" - } - } - } - - local up_nodes = { - ["127.0.0.1:1980"] = 1, - ["127.0.0.1:1981"] = 1, - ["127.0.0.1:1982"] = 1 - } - - -- Create first balancer instance with 3 nodes - local balancer1 = balancer.new(up_nodes, upstream) - - -- Remove one node (simulate scaling down) - up_nodes["127.0.0.1:1982"] = nil - - -- Create new balancer instance (should clean up stale counts) - local balancer2 = balancer.new(up_nodes, upstream) - - ngx.say("stale connection counts 
cleaned up successfully") - } - } ---- response_body -stale connection counts cleaned up successfully From cec5172287fb69eeb4f198bc276bff372a026212 Mon Sep 17 00:00:00 2001 From: coder2z Date: Fri, 13 Jun 2025 14:12:59 +0800 Subject: [PATCH 05/15] fix: lint+md lint --- docs/en/latest/balancer-least-conn.md | 30 +++++++++++++++++++++++++++ docs/en/latest/config.json | 4 ++++ docs/zh/latest/balancer-least-conn.md | 30 +++++++++++++++++++++++++++ docs/zh/latest/config.json | 4 ++++ 4 files changed, 68 insertions(+) diff --git a/docs/en/latest/balancer-least-conn.md b/docs/en/latest/balancer-least-conn.md index 88140fe1fff6..d7490bb5d3cc 100644 --- a/docs/en/latest/balancer-least-conn.md +++ b/docs/en/latest/balancer-least-conn.md @@ -1,3 +1,33 @@ +--- +title: Least Connection Load Balancer +keywords: + - APISIX + - API Gateway + - Routing + - Least Connection + - Upstream +description: This document introduces the Least Connection Load Balancer (`least_conn`) in Apache APISIX, including its working principle, configuration methods, and use cases. +--- + + + # Least Connection Load Balancer ## Overview diff --git a/docs/en/latest/config.json b/docs/en/latest/config.json index cae7cee33988..a8c13f8d0abe 100644 --- a/docs/en/latest/config.json +++ b/docs/en/latest/config.json @@ -307,6 +307,10 @@ { "type": "doc", "id": "debug-mode" + }, + { + "type": "doc", + "id": "balancer-least-conn" } ] }, diff --git a/docs/zh/latest/balancer-least-conn.md b/docs/zh/latest/balancer-least-conn.md index c766a8523d3c..0e9df07a4c8d 100644 --- a/docs/zh/latest/balancer-least-conn.md +++ b/docs/zh/latest/balancer-least-conn.md @@ -1,3 +1,33 @@ +--- +title: 最少连接负载均衡器 +keywords: + - APISIX + - API 网关 + - 路由 + - 最小连接 + - 上游 +description: 本文介绍了 Apache APISIX 中的最少连接负载均衡器(`least_conn`),包括其工作原理、配置方法和使用场景。 +--- + + + # 最少连接负载均衡器 ## 概述 diff --git a/docs/zh/latest/config.json b/docs/zh/latest/config.json index 437e53be802f..62876b82e8bf 100644 --- a/docs/zh/latest/config.json +++ b/docs/zh/latest/config.json @@ -263,6 +263,10 @@ { "type": "doc", "id": "debug-mode" + }, + { + "type": "doc", + "id": "balancer-least-conn" } ] }, From 007ed0b30d7916ddd015e6bad7cd3bd0c40cd64a Mon Sep 17 00:00:00 2001 From: coder2z Date: Fri, 13 Jun 2025 14:46:02 +0800 Subject: [PATCH 06/15] fix: lint+md lint --- docs/en/latest/balancer-least-conn.md | 2 -- docs/zh/latest/balancer-least-conn.md | 2 -- 2 files changed, 4 deletions(-) diff --git a/docs/en/latest/balancer-least-conn.md b/docs/en/latest/balancer-least-conn.md index d7490bb5d3cc..efe97251b185 100644 --- a/docs/en/latest/balancer-least-conn.md +++ b/docs/en/latest/balancer-least-conn.md @@ -28,8 +28,6 @@ description: This document introduces the Least Connection Load Balancer (`least # --> -# Least Connection Load Balancer - ## Overview The `least_conn` load balancer in Apache APISIX implements a dynamic load balancing algorithm that routes requests to the upstream server with the fewest active connections. This algorithm is particularly effective for scenarios where request processing times vary significantly or when dealing with long-lived connections such as WebSocket connections. 
diff --git a/docs/zh/latest/balancer-least-conn.md b/docs/zh/latest/balancer-least-conn.md index 0e9df07a4c8d..29b35bc8498b 100644 --- a/docs/zh/latest/balancer-least-conn.md +++ b/docs/zh/latest/balancer-least-conn.md @@ -28,8 +28,6 @@ description: 本文介绍了 Apache APISIX 中的最少连接负载均衡器(` # --> -# 最少连接负载均衡器 - ## 概述 Apache APISIX 中的 `least_conn` 负载均衡器实现了一种动态负载均衡算法,将请求路由到活跃连接数最少的上游服务器。该算法特别适用于请求处理时间差异较大的场景,或处理长连接(如 WebSocket 连接)的情况。 From 164b5bcd81035e151fc4a62345200a04d4d6fc8f Mon Sep 17 00:00:00 2001 From: ashuiyang Date: Fri, 20 Jun 2025 17:41:27 +0800 Subject: [PATCH 07/15] fix: fix test .t lua_shared_dict config --- t/node/least_conn.t | 8 ++++++++ t/node/least_conn2.t | 4 ++++ t/node/priority-balancer/health-checker.t | 6 ++++++ t/node/priority-balancer/sanity.t | 14 ++++++++++++++ t/stream-node/priority-balancer.t | 6 ++++++ 5 files changed, 38 insertions(+) diff --git a/t/node/least_conn.t b/t/node/least_conn.t index 174252fd713d..a63e6c35fb06 100644 --- a/t/node/least_conn.t +++ b/t/node/least_conn.t @@ -58,6 +58,8 @@ run_tests(); __DATA__ === TEST 1: select highest weight +--- http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -73,6 +75,8 @@ proxy request to 127.0.0.1:1980 while connecting to upstream === TEST 2: select least conn +--- http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -116,6 +120,8 @@ proxy request to 127.0.0.1:1980 while connecting to upstream === TEST 3: retry +--- http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -134,6 +140,8 @@ proxy request to 127.0.0.1:1980 while connecting to upstream === TEST 4: retry all nodes, failed +--- http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 diff --git a/t/node/least_conn2.t b/t/node/least_conn2.t index 2a6f07c182e4..c5877a1e7530 100644 --- a/t/node/least_conn2.t +++ b/t/node/least_conn2.t @@ -35,6 +35,8 @@ run_tests(); __DATA__ === TEST 1: upstream across multiple routes should not share the same version +--- http_config +lua_shared_dict balancer-least-conn 10m; --- config location /t { content_by_lua_block { @@ -74,6 +76,8 @@ __DATA__ === TEST 2: hit +--- http_config +lua_shared_dict balancer-least-conn 10m; --- config location /t { content_by_lua_block { diff --git a/t/node/priority-balancer/health-checker.t b/t/node/priority-balancer/health-checker.t index cd970c667d60..61ffe66220ec 100644 --- a/t/node/priority-balancer/health-checker.t +++ b/t/node/priority-balancer/health-checker.t @@ -61,6 +61,8 @@ run_tests(); __DATA__ === TEST 1: all are down detected by health checker +--- http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -113,6 +115,8 @@ proxy request to 127.0.0.2:1979 === TEST 2: use priority as backup (setup rule) +--- http_config +lua_shared_dict balancer-least-conn 10m; --- config location /t { content_by_lua_block { @@ -159,6 +163,8 @@ passed === TEST 3: use priority as backup +--- http_config +lua_shared_dict balancer-least-conn 10m; --- config location /t { content_by_lua_block { diff --git a/t/node/priority-balancer/sanity.t b/t/node/priority-balancer/sanity.t index 11acc7f32554..5186b45afdbe 100644 --- a/t/node/priority-balancer/sanity.t +++ b/t/node/priority-balancer/sanity.t @@ -62,6 +62,8 @@ run_tests(); __DATA__ === TEST 1: sanity +--- http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -105,6 +107,8 @@ proxy request to 127.0.0.1:1980 === TEST 2: all failed +--- 
http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -135,6 +139,8 @@ proxy request to 127.0.0.1:1979 === TEST 3: default priority is zero +--- http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -165,6 +171,8 @@ proxy request to 127.0.0.1:1980 === TEST 4: least_conn +--- http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -222,6 +230,8 @@ proxy request to 127.0.0.1:1980 while connecting to upstream === TEST 5: roundrobin +--- http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -257,6 +267,8 @@ proxy request to 127.0.0.4:1979 === TEST 6: ewma +--- http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -288,6 +300,8 @@ proxy request to 127.0.0.3:1979 === TEST 7: chash +--- http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 diff --git a/t/stream-node/priority-balancer.t b/t/stream-node/priority-balancer.t index 3d0b8a80d61c..1c6436aa051d 100644 --- a/t/stream-node/priority-balancer.t +++ b/t/stream-node/priority-balancer.t @@ -52,6 +52,8 @@ run_tests(); __DATA__ === TEST 1: sanity +--- http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml stream_routes: - id: 1 @@ -97,6 +99,8 @@ proxy request to 127.0.0.1:1995 === TEST 2: default priority is 0 +--- http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml stream_routes: - id: 1 @@ -140,6 +144,8 @@ proxy request to 127.0.0.1:1995 === TEST 3: fix priority for nonarray nodes +--- http_config +lua_shared_dict balancer-least-conn 10m; --- apisix_yaml stream_routes: - id: 1 From a84e799b1ff01a73d01d81c4a5637e94c9e1e79c Mon Sep 17 00:00:00 2001 From: coder2z Date: Tue, 26 Aug 2025 18:39:41 +0800 Subject: [PATCH 08/15] fix: test file --- apisix/balancer/least_conn.lua | 7 ++++--- t/node/least_conn.t | 2 +- t/node/priority-balancer/sanity.t | 2 +- 3 files changed, 6 insertions(+), 5 deletions(-) diff --git a/apisix/balancer/least_conn.lua b/apisix/balancer/least_conn.lua index ec39e870cae3..827a5ebdfe89 100644 --- a/apisix/balancer/least_conn.lua +++ b/apisix/balancer/least_conn.lua @@ -191,14 +191,15 @@ function _M.new(up_nodes, upstream) return end + -- Decrement connection count in shared dict first + incr_server_conn_count(upstream, server, -1) + -- Then update score based on new connection count local current_conn_count = get_server_conn_count(upstream, server) - info.score = (current_conn_count - 1) / info.weight + info.score = (current_conn_count + 1) / info.weight if info.score < 0 then info.score = 0 -- Prevent negative scores end servers_heap:update(server, info) - -- Decrement connection count in shared dict - incr_server_conn_count(upstream, server, -1) if not before_retry then if ctx.balancer_tried_servers then diff --git a/t/node/least_conn.t b/t/node/least_conn.t index a63e6c35fb06..72c120b6bacd 100644 --- a/t/node/least_conn.t +++ b/t/node/least_conn.t @@ -114,8 +114,8 @@ GET /t qr/proxy request to \S+ while connecting to upstream/ --- grep_error_log_out proxy request to 127.0.0.1:1980 while connecting to upstream -proxy request to 0.0.0.0:1980 while connecting to upstream proxy request to 127.0.0.1:1980 while connecting to upstream +proxy request to 0.0.0.0:1980 while connecting to upstream diff --git a/t/node/priority-balancer/sanity.t b/t/node/priority-balancer/sanity.t index 5186b45afdbe..6adead7a9d31 100644 --- a/t/node/priority-balancer/sanity.t +++ 
b/t/node/priority-balancer/sanity.t @@ -224,8 +224,8 @@ connect() failed qr/proxy request to \S+:1980 while connecting to upstream/ --- grep_error_log_out proxy request to 127.0.0.1:1980 while connecting to upstream -proxy request to 0.0.0.0:1980 while connecting to upstream proxy request to 127.0.0.1:1980 while connecting to upstream +proxy request to 0.0.0.0:1980 while connecting to upstream From 91f33be5dcdd6b9d9b0d55ed0cf852b32b086238 Mon Sep 17 00:00:00 2001 From: coder2z Date: Wed, 27 Aug 2025 14:25:51 +0800 Subject: [PATCH 09/15] fix: test file --- t/node/least_conn.t | 10 +--------- t/node/least_conn2.t | 4 ---- t/node/priority-balancer/health-checker.t | 6 ------ t/node/priority-balancer/sanity.t | 16 +--------------- t/stream-node/priority-balancer.t | 6 ------ 5 files changed, 2 insertions(+), 40 deletions(-) diff --git a/t/node/least_conn.t b/t/node/least_conn.t index 72c120b6bacd..174252fd713d 100644 --- a/t/node/least_conn.t +++ b/t/node/least_conn.t @@ -58,8 +58,6 @@ run_tests(); __DATA__ === TEST 1: select highest weight ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -75,8 +73,6 @@ proxy request to 127.0.0.1:1980 while connecting to upstream === TEST 2: select least conn ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -114,14 +110,12 @@ GET /t qr/proxy request to \S+ while connecting to upstream/ --- grep_error_log_out proxy request to 127.0.0.1:1980 while connecting to upstream -proxy request to 127.0.0.1:1980 while connecting to upstream proxy request to 0.0.0.0:1980 while connecting to upstream +proxy request to 127.0.0.1:1980 while connecting to upstream === TEST 3: retry ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -140,8 +134,6 @@ proxy request to 127.0.0.1:1980 while connecting to upstream === TEST 4: retry all nodes, failed ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 diff --git a/t/node/least_conn2.t b/t/node/least_conn2.t index c5877a1e7530..2a6f07c182e4 100644 --- a/t/node/least_conn2.t +++ b/t/node/least_conn2.t @@ -35,8 +35,6 @@ run_tests(); __DATA__ === TEST 1: upstream across multiple routes should not share the same version ---- http_config -lua_shared_dict balancer-least-conn 10m; --- config location /t { content_by_lua_block { @@ -76,8 +74,6 @@ lua_shared_dict balancer-least-conn 10m; === TEST 2: hit ---- http_config -lua_shared_dict balancer-least-conn 10m; --- config location /t { content_by_lua_block { diff --git a/t/node/priority-balancer/health-checker.t b/t/node/priority-balancer/health-checker.t index 88dd7ead97a1..1348c9cf6932 100644 --- a/t/node/priority-balancer/health-checker.t +++ b/t/node/priority-balancer/health-checker.t @@ -61,8 +61,6 @@ run_tests(); __DATA__ === TEST 1: all are down detected by health checker ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -117,8 +115,6 @@ proxy request to 127.0.0.2:1979 === TEST 2: use priority as backup (setup rule) ---- http_config -lua_shared_dict balancer-least-conn 10m; --- config location /t { content_by_lua_block { @@ -165,8 +161,6 @@ passed === TEST 3: use priority as backup ---- http_config -lua_shared_dict balancer-least-conn 10m; --- config location /t { content_by_lua_block { diff --git a/t/node/priority-balancer/sanity.t b/t/node/priority-balancer/sanity.t index 6adead7a9d31..11acc7f32554 100644 --- a/t/node/priority-balancer/sanity.t +++ 
b/t/node/priority-balancer/sanity.t @@ -62,8 +62,6 @@ run_tests(); __DATA__ === TEST 1: sanity ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -107,8 +105,6 @@ proxy request to 127.0.0.1:1980 === TEST 2: all failed ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -139,8 +135,6 @@ proxy request to 127.0.0.1:1979 === TEST 3: default priority is zero ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -171,8 +165,6 @@ proxy request to 127.0.0.1:1980 === TEST 4: least_conn ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -224,14 +216,12 @@ connect() failed qr/proxy request to \S+:1980 while connecting to upstream/ --- grep_error_log_out proxy request to 127.0.0.1:1980 while connecting to upstream -proxy request to 127.0.0.1:1980 while connecting to upstream proxy request to 0.0.0.0:1980 while connecting to upstream +proxy request to 127.0.0.1:1980 while connecting to upstream === TEST 5: roundrobin ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -267,8 +257,6 @@ proxy request to 127.0.0.4:1979 === TEST 6: ewma ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 @@ -300,8 +288,6 @@ proxy request to 127.0.0.3:1979 === TEST 7: chash ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml upstreams: - id: 1 diff --git a/t/stream-node/priority-balancer.t b/t/stream-node/priority-balancer.t index 1c6436aa051d..3d0b8a80d61c 100644 --- a/t/stream-node/priority-balancer.t +++ b/t/stream-node/priority-balancer.t @@ -52,8 +52,6 @@ run_tests(); __DATA__ === TEST 1: sanity ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml stream_routes: - id: 1 @@ -99,8 +97,6 @@ proxy request to 127.0.0.1:1995 === TEST 2: default priority is 0 ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml stream_routes: - id: 1 @@ -144,8 +140,6 @@ proxy request to 127.0.0.1:1995 === TEST 3: fix priority for nonarray nodes ---- http_config -lua_shared_dict balancer-least-conn 10m; --- apisix_yaml stream_routes: - id: 1 From 13ac9c1d5f9c15afb64dca71f0dadd29c7306ab0 Mon Sep 17 00:00:00 2001 From: coder2z Date: Wed, 27 Aug 2025 14:54:08 +0800 Subject: [PATCH 10/15] fix: test file --- apisix/balancer/least_conn.lua | 108 ++++++++++++++++++++++++--------- 1 file changed, 81 insertions(+), 27 deletions(-) diff --git a/apisix/balancer/least_conn.lua b/apisix/balancer/least_conn.lua index 827a5ebdfe89..ea22d879f8d9 100644 --- a/apisix/balancer/least_conn.lua +++ b/apisix/balancer/least_conn.lua @@ -112,33 +112,38 @@ function _M.new(up_nodes, upstream) conn_count_dict = ngx_shared[CONN_COUNT_DICT_NAME] end - if not conn_count_dict then - core.log.error("shared dict '", CONN_COUNT_DICT_NAME, "' not found") - return nil, "shared dict not found" + local use_persistent_counting = conn_count_dict ~= nil + if not use_persistent_counting then + core.log.warn("shared dict '", CONN_COUNT_DICT_NAME, "' not found, using traditional least_conn mode") end local servers_heap = binaryHeap.minUnique(least_score) - -- Clean up stale connection counts for removed servers - cleanup_stale_conn_counts(upstream, up_nodes) + if use_persistent_counting then + -- Clean up stale connection counts for removed servers + cleanup_stale_conn_counts(upstream, up_nodes) + end for server, weight in pairs(up_nodes) do - -- Get the 
persisted connection count for this server - local conn_count = get_server_conn_count(upstream, server) - -- Score directly reflects weighted connection count - local score = (conn_count + 1) / weight - - core.log.debug("initializing server ", server, - " | weight: ", weight, - " | conn_count: ", conn_count, - " | score: ", score, - " | upstream_id: ", upstream.id or "no-id") + local score + if use_persistent_counting then + -- True least connection mode: use persistent connection counts + local conn_count = get_server_conn_count(upstream, server) + score = (conn_count + 1) / weight + core.log.debug("initializing server ", server, " with persistent counting", + " | weight: ", weight, " | conn_count: ", conn_count, " | score: ", score) + else + -- Fallback mode: use original weighted round-robin behavior + score = 1 / weight + end -- Note: the argument order of insert is different from others servers_heap:insert({ server = server, weight = weight, + effect_weight = 1 / weight, -- For backward compatibility score = score, + use_persistent_counting = use_persistent_counting, }, server) end @@ -176,11 +181,17 @@ function _M.new(up_nodes, upstream) return nil, err end - -- Get current connection count for detailed logging - local current_conn_count = get_server_conn_count(upstream, server) - info.score = (current_conn_count + 1) / info.weight - servers_heap:update(server, info) - incr_server_conn_count(upstream, server, 1) + if info.use_persistent_counting then + -- True least connection mode: update based on persistent connection counts + local current_conn_count = get_server_conn_count(upstream, server) + info.score = (current_conn_count + 1) / info.weight + servers_heap:update(server, info) + incr_server_conn_count(upstream, server, 1) + else + -- Fallback mode: use original weighted round-robin logic + info.score = info.score + info.effect_weight + servers_heap:update(server, info) + end return server end, after_balance = function(ctx, before_retry) @@ -191,13 +202,17 @@ function _M.new(up_nodes, upstream) return end - -- Decrement connection count in shared dict first - incr_server_conn_count(upstream, server, -1) - -- Then update score based on new connection count - local current_conn_count = get_server_conn_count(upstream, server) - info.score = (current_conn_count + 1) / info.weight - if info.score < 0 then - info.score = 0 -- Prevent negative scores + if info.use_persistent_counting then + -- True least connection mode: update based on persistent connection counts + incr_server_conn_count(upstream, server, -1) + local current_conn_count = get_server_conn_count(upstream, server) + info.score = (current_conn_count + 1) / info.weight + if info.score < 0 then + info.score = 0 -- Prevent negative scores + end + else + -- Fallback mode: use original weighted round-robin logic + info.score = info.score - info.effect_weight end servers_heap:update(server, info) @@ -225,5 +240,44 @@ function _M.new(up_nodes, upstream) } end +-- Clean up all connection counts in shared dict (for testing purposes) +local function cleanup_all_conn_counts() + if not conn_count_dict then + conn_count_dict = ngx_shared[CONN_COUNT_DICT_NAME] + end + + if not conn_count_dict then + -- No shared dict available, nothing to cleanup + return + end + + local keys, err = conn_count_dict:get_keys(0) -- Get all keys + if err then + core.log.error("failed to get keys from shared dict during cleanup: ", err) + return + end + + local cleaned_count = 0 + for _, key in ipairs(keys or {}) do + if core.string.has_prefix(key, 
"conn_count:") then + local ok, delete_err = conn_count_dict:delete(key) + if not ok and delete_err then + core.log.warn("failed to delete connection count key during cleanup: ", key, ", error: ", delete_err) + else + cleaned_count = cleaned_count + 1 + end + end + end + + if cleaned_count > 0 then + core.log.info("cleaned up ", cleaned_count, " connection count entries from shared dict") + end +end + +-- Public function to clean up all connection counts (for testing purposes only) +function _M.cleanup_for_testing() + cleanup_all_conn_counts() +end + return _M From a000604d7f24649295d19c3aa620e36d136dffb2 Mon Sep 17 00:00:00 2001 From: coder2z Date: Wed, 27 Aug 2025 16:07:00 +0800 Subject: [PATCH 11/15] fix: test file --- t/node/least_conn_persistent.t | 269 +++++++++++++++++++++++++++++++++ 1 file changed, 269 insertions(+) create mode 100644 t/node/least_conn_persistent.t diff --git a/t/node/least_conn_persistent.t b/t/node/least_conn_persistent.t new file mode 100644 index 000000000000..39032886ed54 --- /dev/null +++ b/t/node/least_conn_persistent.t @@ -0,0 +1,269 @@ +# +# Licensed to the Apache Software Foundation (ASF) under one or more +# contributor license agreements. See the NOTICE file distributed with +# this work for additional information regarding copyright ownership. +# The ASF licenses this file to You under the Apache License, Version 2.0 +# (the "License"); you may not use this file except in compliance with +# the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. +# +use t::APISIX 'no_plan'; + +repeat_each(1); +log_level('debug'); +no_root_location(); +worker_connections(1024); +no_shuffle(); + +add_block_preprocessor(sub { + my ($block) = @_; + + if (!$block->yaml_config) { + my $yaml_config = <<_EOC_; +apisix: + node_listen: 1984 +deployment: + role: data_plane + role_data_plane: + config_provider: yaml +_EOC_ + + $block->set_value("yaml_config", $yaml_config); + } + + # Add http_config to include shared dict for connection counting + my $http_config = <<_EOC_; + lua_shared_dict balancer-least-conn 10m; +_EOC_ + + $block->set_value("http_config", $http_config); + + my $route = <<_EOC_; +routes: + - upstream_id: 1 + uris: + - /mysleep +#END +_EOC_ + + $block->set_value("apisix_yaml", ($block->apisix_yaml || "") . $route); + + if (!$block->request) { + $block->set_value("request", "GET /mysleep?seconds=0.1"); + } +}); + +run_tests(); + +__DATA__ + +=== TEST 1: test connection counting with persistent shared dict +--- apisix_yaml +upstreams: + - id: 1 + type: least_conn + nodes: + "127.0.0.1:1980": 3 + "0.0.0.0:1980": 2 +--- config + location /t { + content_by_lua_block { + local http = require "resty.http" + local uri = "http://127.0.0.1:" .. ngx.var.server_port + .. 
"/mysleep?seconds=0.1" + + local t = {} + for i = 1, 3 do + local th = assert(ngx.thread.spawn(function(i) + local httpc = http.new() + local res, err = httpc:request_uri(uri..i, {method = "GET"}) + if not res then + ngx.log(ngx.ERR, err) + return + end + end, i)) + table.insert(t, th) + end + for i, th in ipairs(t) do + ngx.thread.wait(th) + end + } + } +--- request +GET /t +--- grep_error_log eval +qr/proxy request to \S+ while connecting to upstream/ +--- grep_error_log_out +proxy request to 127.0.0.1:1980 while connecting to upstream +proxy request to 127.0.0.1:1980 while connecting to upstream +proxy request to 0.0.0.0:1980 while connecting to upstream + + + +=== TEST 2: verify shared dict availability and connection counting +--- config + location /t { + content_by_lua_block { + -- Check if shared dict is available + local dict = ngx.shared["balancer-least-conn"] + if dict then + ngx.say("shared dict available: true") + ngx.say("shared dict capacity: ", dict:capacity()) + else + ngx.say("shared dict available: false") + return + end + + -- Test balancer creation with connection counting + local least_conn = require("apisix.balancer.least_conn") + local upstream = { + id = "test_conn_counting", + type = "least_conn" + } + local nodes = { + ["10.1.1.1:8080"] = 1, + ["10.1.1.2:8080"] = 1 + } + + -- Clean any existing data + least_conn.cleanup_all() + + -- Create balancer + local balancer = least_conn.new(nodes, upstream) + if balancer then + ngx.say("balancer with connection counting created: true") + + -- Simulate connections + for i = 1, 4 do + local ctx = {} + local server = balancer.get(ctx) + ngx.say("connection ", i, " assigned to a server") + end + + -- Check connection counts in shared dict + local count1 = dict:get("conn_count:test_conn_counting:10.1.1.1:8080") or 0 + local count2 = dict:get("conn_count:test_conn_counting:10.1.1.2:8080") or 0 + ngx.say("final connection counts - server1: ", count1, ", server2: ", count2) + + -- Total connections should be 4 + local total_connections = count1 + count2 + ngx.say("total connections tracked: ", total_connections) + ngx.say("connection counting working: ", total_connections == 4) + ngx.say("connection distribution balanced: ", count1 == 2 and count2 == 2) + + -- Cleanup + least_conn.cleanup_all() + else + ngx.say("balancer with connection counting created: false") + end + } + } +--- request +GET /t +--- response_body +shared dict available: true +shared dict capacity: 10485760 +balancer with connection counting created: true +connection 1 assigned to a server +connection 2 assigned to a server +connection 3 assigned to a server +connection 4 assigned to a server +final connection counts - server1: 2, server2: 2 +total connections tracked: 4 +connection counting working: true +connection distribution balanced: true + + + +=== TEST 3: verify cleanup function exists and works +--- config + location /t { + content_by_lua_block { + local least_conn = require("apisix.balancer.least_conn") + + if type(least_conn.cleanup_all) == "function" then + ngx.say("cleanup function exists: true") + -- Call cleanup function to test it works + least_conn.cleanup_all() + ngx.say("cleanup function executed: true") + else + ngx.say("cleanup function exists: false") + end + } + } +--- request +GET /t +--- response_body +cleanup function exists: true +cleanup function executed: true + + + +=== TEST 4: demonstrate connection counting with weighted nodes +--- config + location /t { + content_by_lua_block { + local least_conn = 
require("apisix.balancer.least_conn") + local dict = ngx.shared["balancer-least-conn"] + + local upstream = { + id = "test_weighted_counting", + type = "least_conn" + } + + -- Test with different weights: server1 weight=3, server2 weight=1 + local nodes = { + ["172.16.1.1:9000"] = 3, -- higher weight + ["172.16.1.2:9000"] = 1 -- lower weight + } + + -- Clean previous data + least_conn.cleanup_all() + + -- Create balancer + local balancer = least_conn.new(nodes, upstream) + + -- Make several connections + ngx.say("making connections to test weighted least connection:") + for i = 1, 6 do + local ctx = {} + local server = balancer.get(ctx) + ngx.say("connection ", i, " -> ", server) + end + + -- Check final connection counts + local count1 = dict:get("conn_count:test_weighted_counting:172.16.1.1:9000") or 0 + local count2 = dict:get("conn_count:test_weighted_counting:172.16.1.2:9000") or 0 + + ngx.say("final connection counts:") + ngx.say("server1 (weight=3): ", count1, " connections") + ngx.say("server2 (weight=1): ", count2, " connections") + + -- Higher weight server should get more connections + ngx.say("higher weight server got more connections: ", count1 > count2) + + -- Cleanup + least_conn.cleanup_all() + } + } +--- request +GET /t +--- response_body +making connections to test weighted least connection: +connection 1 -> 172.16.1.1:9000 +connection 2 -> 172.16.1.1:9000 +connection 3 -> 172.16.1.1:9000 +connection 4 -> 172.16.1.1:9000 +connection 5 -> 172.16.1.2:9000 +connection 6 -> 172.16.1.2:9000 +final connection counts: +server1 (weight=3): 4 connections +server2 (weight=1): 2 connections +higher weight server got more connections: true From cfa31ad0d3697c139756e73299f8cb2c3f7ea032 Mon Sep 17 00:00:00 2001 From: coder2z Date: Wed, 27 Aug 2025 16:14:31 +0800 Subject: [PATCH 12/15] fix: test file --- apisix/balancer/least_conn.lua | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apisix/balancer/least_conn.lua b/apisix/balancer/least_conn.lua index ea22d879f8d9..ae13d3b235db 100644 --- a/apisix/balancer/least_conn.lua +++ b/apisix/balancer/least_conn.lua @@ -274,8 +274,8 @@ local function cleanup_all_conn_counts() end end --- Public function to clean up all connection counts (for testing purposes only) -function _M.cleanup_for_testing() +-- Public function to clean up all connection counts +function _M.cleanup_all() cleanup_all_conn_counts() end From 900d3ddcfe6cb17176400e3883feeb81f4d883a0 Mon Sep 17 00:00:00 2001 From: coder2z Date: Thu, 28 Aug 2025 16:01:11 +0800 Subject: [PATCH 13/15] fix: lint --- apisix/balancer/least_conn.lua | 6 +++--- t/node/least_conn_persistent.t | 34 +++++++++++++++++----------------- 2 files changed, 20 insertions(+), 20 deletions(-) diff --git a/apisix/balancer/least_conn.lua b/apisix/balancer/least_conn.lua index ae13d3b235db..adb587c90834 100644 --- a/apisix/balancer/least_conn.lua +++ b/apisix/balancer/least_conn.lua @@ -211,7 +211,7 @@ function _M.new(up_nodes, upstream) info.score = 0 -- Prevent negative scores end else - -- Fallback mode: use original weighted round-robin logic + -- Fallback mode: use original weighted round-robin logic info.score = info.score - info.effect_weight end servers_heap:update(server, info) @@ -245,7 +245,7 @@ local function cleanup_all_conn_counts() if not conn_count_dict then conn_count_dict = ngx_shared[CONN_COUNT_DICT_NAME] end - + if not conn_count_dict then -- No shared dict available, nothing to cleanup return @@ -268,7 +268,7 @@ local function cleanup_all_conn_counts() end end end - + if 
cleaned_count > 0 then core.log.info("cleaned up ", cleaned_count, " connection count entries from shared dict") end diff --git a/t/node/least_conn_persistent.t b/t/node/least_conn_persistent.t index 39032886ed54..d8cc82176b9d 100644 --- a/t/node/least_conn_persistent.t +++ b/t/node/least_conn_persistent.t @@ -120,7 +120,7 @@ proxy request to 0.0.0.0:1980 while connecting to upstream ngx.say("shared dict available: false") return end - + -- Test balancer creation with connection counting local least_conn = require("apisix.balancer.least_conn") local upstream = { @@ -131,33 +131,33 @@ proxy request to 0.0.0.0:1980 while connecting to upstream ["10.1.1.1:8080"] = 1, ["10.1.1.2:8080"] = 1 } - + -- Clean any existing data least_conn.cleanup_all() - + -- Create balancer local balancer = least_conn.new(nodes, upstream) if balancer then ngx.say("balancer with connection counting created: true") - + -- Simulate connections for i = 1, 4 do local ctx = {} local server = balancer.get(ctx) ngx.say("connection ", i, " assigned to a server") end - + -- Check connection counts in shared dict local count1 = dict:get("conn_count:test_conn_counting:10.1.1.1:8080") or 0 local count2 = dict:get("conn_count:test_conn_counting:10.1.1.2:8080") or 0 ngx.say("final connection counts - server1: ", count1, ", server2: ", count2) - + -- Total connections should be 4 local total_connections = count1 + count2 ngx.say("total connections tracked: ", total_connections) ngx.say("connection counting working: ", total_connections == 4) ngx.say("connection distribution balanced: ", count1 == 2 and count2 == 2) - + -- Cleanup least_conn.cleanup_all() else @@ -187,7 +187,7 @@ connection distribution balanced: true location /t { content_by_lua_block { local least_conn = require("apisix.balancer.least_conn") - + if type(least_conn.cleanup_all) == "function" then ngx.say("cleanup function exists: true") -- Call cleanup function to test it works @@ -212,24 +212,24 @@ cleanup function executed: true content_by_lua_block { local least_conn = require("apisix.balancer.least_conn") local dict = ngx.shared["balancer-least-conn"] - + local upstream = { id = "test_weighted_counting", type = "least_conn" } - + -- Test with different weights: server1 weight=3, server2 weight=1 local nodes = { ["172.16.1.1:9000"] = 3, -- higher weight ["172.16.1.2:9000"] = 1 -- lower weight } - + -- Clean previous data least_conn.cleanup_all() - + -- Create balancer local balancer = least_conn.new(nodes, upstream) - + -- Make several connections ngx.say("making connections to test weighted least connection:") for i = 1, 6 do @@ -237,18 +237,18 @@ cleanup function executed: true local server = balancer.get(ctx) ngx.say("connection ", i, " -> ", server) end - + -- Check final connection counts local count1 = dict:get("conn_count:test_weighted_counting:172.16.1.1:9000") or 0 local count2 = dict:get("conn_count:test_weighted_counting:172.16.1.2:9000") or 0 - + ngx.say("final connection counts:") ngx.say("server1 (weight=3): ", count1, " connections") ngx.say("server2 (weight=1): ", count2, " connections") - + -- Higher weight server should get more connections ngx.say("higher weight server got more connections: ", count1 > count2) - + -- Cleanup least_conn.cleanup_all() } From 6161f3c72407d6995e66c1779b5e8748b12a8b47 Mon Sep 17 00:00:00 2001 From: coder2z Date: Thu, 28 Aug 2025 16:14:56 +0800 Subject: [PATCH 14/15] fix: lint --- apisix/balancer/least_conn.lua | 11 ++++++----- 1 file changed, 6 insertions(+), 5 deletions(-) diff --git 
a/apisix/balancer/least_conn.lua b/apisix/balancer/least_conn.lua index adb587c90834..cf2e912940be 100644 --- a/apisix/balancer/least_conn.lua +++ b/apisix/balancer/least_conn.lua @@ -114,7 +114,8 @@ function _M.new(up_nodes, upstream) local use_persistent_counting = conn_count_dict ~= nil if not use_persistent_counting then - core.log.warn("shared dict '", CONN_COUNT_DICT_NAME, "' not found, using traditional least_conn mode") + core.log.warn("shared dict '", + CONN_COUNT_DICT_NAME, "' not found, using traditional least_conn mode") end local servers_heap = binaryHeap.minUnique(least_score) @@ -208,7 +209,8 @@ function _M.new(up_nodes, upstream) local current_conn_count = get_server_conn_count(upstream, server) info.score = (current_conn_count + 1) / info.weight if info.score < 0 then - info.score = 0 -- Prevent negative scores + -- Prevent negative scores + info.score = 0 end else -- Fallback mode: use original weighted round-robin logic @@ -240,7 +242,6 @@ function _M.new(up_nodes, upstream) } end --- Clean up all connection counts in shared dict (for testing purposes) local function cleanup_all_conn_counts() if not conn_count_dict then conn_count_dict = ngx_shared[CONN_COUNT_DICT_NAME] @@ -262,7 +263,8 @@ local function cleanup_all_conn_counts() if core.string.has_prefix(key, "conn_count:") then local ok, delete_err = conn_count_dict:delete(key) if not ok and delete_err then - core.log.warn("failed to delete connection count key during cleanup: ", key, ", error: ", delete_err) + core.log.warn("failed to delete connection count key during cleanup: ", + key, ", error: ", delete_err) else cleaned_count = cleaned_count + 1 end @@ -274,7 +276,6 @@ local function cleanup_all_conn_counts() end end --- Public function to clean up all connection counts function _M.cleanup_all() cleanup_all_conn_counts() end From cf62d64ea111ab7b04e7ac40a0846c60645f0274 Mon Sep 17 00:00:00 2001 From: coder2z Date: Fri, 29 Aug 2025 10:08:23 +0800 Subject: [PATCH 15/15] fix: lint --- apisix/balancer/least_conn.lua | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apisix/balancer/least_conn.lua b/apisix/balancer/least_conn.lua index cf2e912940be..0baa29addf00 100644 --- a/apisix/balancer/least_conn.lua +++ b/apisix/balancer/least_conn.lua @@ -114,7 +114,7 @@ function _M.new(up_nodes, upstream) local use_persistent_counting = conn_count_dict ~= nil if not use_persistent_counting then - core.log.warn("shared dict '", + core.log.warn("shared dict '", CONN_COUNT_DICT_NAME, "' not found, using traditional least_conn mode") end
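
The scoring and bookkeeping that patches 10 through 15 converge on can be read more easily outside the diff context. The sketch below is a simplified, hedged illustration of the shared-dict connection counting added to apisix/balancer/least_conn.lua, not the module itself: the key format ("conn_count:<upstream_id>:<server>"), the (count + 1) / weight score, the incr-with-init on pick, and the decrement after the request all mirror the hunks above, but the helper names (conn_key, pick_least_loaded, release) are illustrative, a linear scan stands in for the binary heap the real module uses, and it assumes the "lua_shared_dict balancer-least-conn 10m;" directive is configured, as the test http_config blocks and the nginx template change in this series provide.

-- Simplified sketch of shared-dict based least-connection counting.
-- Assumes an OpenResty worker with: lua_shared_dict balancer-least-conn 10m;
local conn_dict = ngx.shared["balancer-least-conn"]

-- Key layout follows the patched module: conn_count:<upstream_id>:<server>
local function conn_key(upstream_id, server)
    return "conn_count:" .. tostring(upstream_id) .. ":" .. server
end

-- Pick the server whose (connections + 1) / weight is smallest, then
-- atomically bump its counter so other workers see the new load.
local function pick_least_loaded(upstream_id, nodes)
    local best_server, best_score
    for server, weight in pairs(nodes) do
        local count = conn_dict:get(conn_key(upstream_id, server)) or 0
        local score = (count + 1) / weight
        if not best_score or score < best_score then
            best_server, best_score = server, score
        end
    end
    -- incr with an init value of 0 creates the key on first use
    conn_dict:incr(conn_key(upstream_id, best_server), 1, 0)
    return best_server
end

-- Called once the request (or long-lived WebSocket session) finishes,
-- analogous to after_balance decrementing the count in the shared dict.
local function release(upstream_id, server)
    conn_dict:incr(conn_key(upstream_id, server), -1, 0)
end

Because the counters live in a lua_shared_dict rather than in the per-balancer heap, they persist across balancer re-creation and are visible to all workers; when the dict is absent, patch 10 has the module log a warning and fall back to the original weighted round-robin style scoring, and the cleanup helpers (cleanup_stale_conn_counts, _M.cleanup_all) remove stale "conn_count:" keys so counts do not accumulate for removed servers.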