Skip to content

Commit 915456e

Browse files
committed
fix portpool allocator
Signed-off-by: roc <[email protected]>
1 parent 81ee452 commit 915456e

File tree

2 files changed

+125
-156
lines changed

2 files changed

+125
-156
lines changed

internal/portpool/portpool.go

Lines changed: 80 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package portpool
22

33
import (
44
"context"
5+
"iter"
56
"slices"
67
"sync"
78

@@ -112,68 +113,105 @@ type PortPool struct {
112113
lbList []LBKey
113114
}
114115

116+
func (pp *PortPool) getCache() iter.Seq2[LBKey, map[ProtocolPort]struct{}] {
117+
return func(yield func(LBKey, map[ProtocolPort]struct{}) bool) {
118+
switch pp.LbPolicy {
119+
case constant.LbPolicyInOrder, constant.LbPolicyUniform: // 有序分配 或 均匀分配
120+
if pp.LbPolicy == constant.LbPolicyUniform { // 如果是均匀分配,则需要按已分配数量排序,找分配数量最小的 lb 分配
121+
slices.SortFunc(pp.lbList, func(a, b LBKey) int {
122+
return len(pp.cache[a]) - len(pp.cache[b])
123+
})
124+
}
125+
for _, lbKey := range pp.lbList {
126+
if !yield(lbKey, pp.cache[lbKey]) { // 若 yield 返回 false 则中断
127+
return
128+
}
129+
}
130+
default: // 默认用 Random,按 map 的 key 顺序遍历(golang map 的 key 是无序的,每次遍历顺序随机)
131+
for lbKey, allocated := range pp.cache {
132+
if !yield(lbKey, allocated) { // 若 yield 返回 false 则中断
133+
return
134+
}
135+
}
136+
}
137+
}
138+
}
139+
115140
func (pp *PortPool) IsLbExists(key LBKey) bool {
116141
pp.mu.Lock()
117142
defer pp.mu.Unlock()
118143
_, exists := pp.cache[key]
119144
return exists
120145
}
121146

122-
// 分配指定端口
123-
func (pp *PortPool) AllocatePort(ctx context.Context, quota int64, ports ...ProtocolPort) ([]PortAllocation, bool) {
147+
func (pp *PortPool) AllocatePortFromRange(ctx context.Context, startPort, endPort, quota, segmentLength uint16, protocol string) ([]PortAllocation, bool) {
124148
pp.mu.Lock()
125149
defer pp.mu.Unlock()
126150
if len(pp.cache) == 0 {
127151
return nil, true
128152
}
153+
portNum := 1
154+
if protocol == constant.ProtocolTCPUDP {
155+
portNum = 2
156+
}
129157
quotaExceeded := true
130-
tryAllocate := func(lbKey LBKey, allocated map[ProtocolPort]struct{}) []PortAllocation {
131-
if int64(len(allocated)+len(ports)) > quota { // 监听器数量已满,换下个 lb
132-
return nil
158+
for lb, allocated := range pp.getCache() {
159+
if uint16(len(allocated)+portNum) > quota { // 监听器数量已满,换下个 lb
160+
continue
133161
}
134162
quotaExceeded = false
135-
canAllocate := true
136-
for _, port := range ports { // 确保所有待分配的端口都未被分配
137-
if _, exists := allocated[port.Key()]; exists { // 有端口已被占用,标记无法分配
138-
canAllocate = false
139-
break
163+
for port := startPort; port <= endPort; port += segmentLength { // 遍历该端口池的所有端口号
164+
endPort := uint16(0)
165+
if segmentLength > 1 {
166+
endPort = port + segmentLength - 1
140167
}
141-
}
142-
if canAllocate { // 找到有 lb 可分配端口,分配端口并返回
143-
result := []PortAllocation{}
144-
for _, port := range ports {
145-
allocated[port.Key()] = struct{}{}
146-
pa := PortAllocation{
147-
PortPool: pp,
148-
ProtocolPort: port,
149-
LBKey: lbKey,
150-
}
151-
result = append(result, pa)
168+
if result := pp.tryAllocateFromLb(lb, allocated, port, endPort, protocol); len(result) > 0 {
169+
return result, false
152170
}
153-
return result
154171
}
155-
return nil
156-
}
157-
switch pp.LbPolicy {
158-
case constant.LbPolicyInOrder, constant.LbPolicyUniform: // 有序分配 或 均匀分配
159-
if pp.LbPolicy == constant.LbPolicyUniform { // 如果是均匀分配,则需要按已分配数量排序,找分配数量最小的 lb 分配
160-
slices.SortFunc(pp.lbList, func(a, b LBKey) int {
161-
return len(pp.cache[a]) - len(pp.cache[b])
162-
})
172+
}
173+
return nil, quotaExceeded
174+
}
175+
176+
func (pp *PortPool) tryAllocateFromLb(lbKey LBKey, allocated map[ProtocolPort]struct{}, port, endPort uint16, protocol string) []PortAllocation {
177+
ports := portsToAllocate(port, endPort, protocol)
178+
for _, port := range ports { // 确保所有待分配的端口都未被分配
179+
if _, exists := allocated[port.Key()]; exists { // 有端口已被占用,标记无法分配
180+
return nil
181+
}
182+
}
183+
result := []PortAllocation{}
184+
for _, port := range ports {
185+
allocated[port.Key()] = struct{}{}
186+
pa := PortAllocation{
187+
PortPool: pp,
188+
ProtocolPort: port,
189+
LBKey: lbKey,
163190
}
164-
for _, lbKey := range pp.lbList {
165-
allocated := pp.cache[lbKey]
166-
result := tryAllocate(lbKey, allocated)
167-
if len(result) > 0 {
168-
return result, false
169-
}
191+
result = append(result, pa)
192+
}
193+
return result
194+
}
195+
196+
// 分配指定端口
197+
func (pp *PortPool) AllocatePort(ctx context.Context, quota int64, port, endPort uint16, protocol string) ([]PortAllocation, bool) {
198+
pp.mu.Lock()
199+
defer pp.mu.Unlock()
200+
if len(pp.cache) == 0 {
201+
return nil, true
202+
}
203+
quotaExceeded := true
204+
portNum := 1
205+
if protocol == constant.ProtocolTCPUDP {
206+
portNum = 2
207+
}
208+
for lbKey, allocated := range pp.getCache() {
209+
if int64(len(allocated)+portNum) > quota { // 监听器数量已满,换下个 lb
210+
continue
170211
}
171-
default: // 默认用 Random,按 map 的 key 顺序遍历(golang map 的 key 是无序的,每次遍历顺序随机)
172-
for lbKey, allocated := range pp.cache {
173-
result := tryAllocate(lbKey, allocated)
174-
if len(result) > 0 {
175-
return result, false
176-
}
212+
quotaExceeded = false
213+
if result := pp.tryAllocateFromLb(lbKey, allocated, port, endPort, protocol); len(result) > 0 {
214+
return result, false
177215
}
178216
}
179217
// 所有 lb 都无法分配此端口,返回空结果

internal/portpool/portpools.go

Lines changed: 45 additions & 114 deletions
Original file line numberDiff line numberDiff line change
@@ -38,82 +38,47 @@ func (pp PortPools) Names() string {
3838
// 从所有端口池中都分配出指定端口,不同端口池可分配不同端口
3939
func (pp PortPools) allocatePortAcrossPools(
4040
ctx context.Context,
41-
startPort, endPort, quota, segmentLength uint16,
42-
getPortsToAllocate func(port, endPort uint16) (ports []ProtocolPort),
43-
) (PortAllocations, error) {
41+
startPort, endPort, quota, segmentLength uint16, protocol string,
42+
) PortAllocations {
4443
log.FromContext(ctx).V(10).Info("allocatePortAcrossPools", "pools", pp.Names(), "startPort", startPort, "endPort", endPort, "segmentLength", segmentLength)
4544
var allocatedPorts PortAllocations
46-
LOOP_POOL:
4745
for _, pool := range pp { // 遍历所有端口池(由于不需要保证所有端口池的端口号相同,因此外层循环直接遍历端口池)
48-
for port := startPort; port <= endPort; port += segmentLength { // 遍历该端口池的所有端口号
49-
select {
50-
case <-ctx.Done():
51-
allocatedPorts.Release()
52-
if err := ctx.Err(); err != nil {
53-
return nil, errors.WithStack(err)
54-
}
55-
return nil, nil
56-
default:
57-
}
58-
endPort := uint16(0)
59-
if segmentLength > 1 {
60-
endPort = port + segmentLength - 1
61-
}
62-
portsToAllocate := getPortsToAllocate(port, endPort)
63-
// 尝试分配端口
64-
result, quotaExceeded := pool.AllocatePort(ctx, int64(quota), portsToAllocate...)
65-
if quotaExceeded { // 超配额,不可能分配成功,不再继续尝试
66-
allocatedPorts.Release()
67-
return nil, nil
68-
}
69-
if len(result) > 0 { // 该端口池分配到了端口,追加到结果中
70-
allocatedPorts = append(allocatedPorts, result...)
71-
log.FromContext(ctx).V(10).Info("allocated port", "pool", pool.Name, "port", port)
72-
continue LOOP_POOL
73-
} else {
74-
// 该端口池中无法分配此端口,尝试下一个端口
75-
log.FromContext(ctx).V(10).Info("no available port can be allocated, try next port", "pool", pool.Name, "port", port)
76-
continue
77-
}
46+
// 尝试分配端口
47+
result, quotaExceeded := pool.AllocatePortFromRange(ctx, startPort, endPort, quota, segmentLength, protocol)
48+
if quotaExceeded { // 超配额,不可能分配成功,不再继续尝试
49+
allocatedPorts.Release()
50+
return nil
51+
}
52+
if len(result) > 0 { // 该端口池分配到了端口,追加到结果中
53+
allocatedPorts = append(allocatedPorts, result...)
54+
log.FromContext(ctx).V(10).Info("allocated port", "pool", pool.Name, "ports", allocatedPorts)
55+
} else { // 只要有一个端口池分配失败,则认为无法分配,释放已分配端口,等待 lb 扩容
56+
allocatedPorts.Release()
57+
return nil
7858
}
79-
// 该端口池所有端口都无法分配,不再尝试
80-
allocatedPorts.Release()
81-
return nil, nil
8259
}
8360
// 所有端口池都分配成功,返回结果
84-
return allocatedPorts, nil
61+
return allocatedPorts
8562
}
8663

8764
// 从所有端口池中都分配出指定端口,不同端口池必须分配相同端口
8865
func (pp PortPools) allocateSamePortAcrossPools(
8966
ctx context.Context,
90-
startPort, endPort, quota, segmentLength uint16,
91-
getPortsToAllocate func(port, endPort uint16) (ports []ProtocolPort),
92-
) (PortAllocations, error) {
67+
startPort, endPort, quota, segmentLength uint16, protocol string,
68+
) PortAllocations {
9369
log.FromContext(ctx).Info("allocateSamePortAcrossPools", "pools", pp.Names(), "startPort", startPort, "endPort", endPort, "segmentLength", segmentLength)
9470
LOOP_PORT:
9571
for port := startPort; port <= endPort; port += segmentLength { // 遍历所有端口号,确保所有端口池都能分配到相同端口号
9672
endPort := uint16(0)
9773
if segmentLength > 1 {
9874
endPort = port + segmentLength - 1
9975
}
100-
portsToAllocate := getPortsToAllocate(port, endPort)
10176
var allocatedPorts PortAllocations
10277
for _, pool := range pp { // 在所有端口池中查找可用端口,TCP 和 UDP 端口号相同且都未被分配,则分配此端口号
103-
select {
104-
case <-ctx.Done():
105-
allocatedPorts.Release()
106-
if err := ctx.Err(); err != nil {
107-
return nil, errors.WithStack(err)
108-
}
109-
return nil, nil
110-
default:
111-
}
112-
113-
results, quotaExceeded := pool.AllocatePort(ctx, int64(quota), portsToAllocate...)
78+
results, quotaExceeded := pool.AllocatePort(ctx, int64(quota), port, endPort, protocol)
11479
if quotaExceeded {
11580
allocatedPorts.Release()
116-
return nil, nil
81+
return nil
11782
}
11883
if len(results) > 0 {
11984
// 此端口池分配到了此端口,追加到结果中
@@ -124,10 +89,10 @@ LOOP_PORT:
12489
}
12590
}
12691
// 分配结束,返回结果可能为空)
127-
return allocatedPorts, nil
92+
return allocatedPorts
12893
}
12994
// 所有端口池都无法分配,返回空结果
130-
return nil, nil
95+
return nil
13196
}
13297

13398
var (
@@ -136,6 +101,28 @@ var (
136101
ErrPortPoolNotAllocatable = errors.New("port pool not allocatable")
137102
)
138103

104+
func portsToAllocate(port, endPort uint16, protocol string) (ports []ProtocolPort) {
105+
if protocol == constant.ProtocolTCPUDP {
106+
ports = append(ports, ProtocolPort{
107+
Port: port,
108+
Protocol: constant.ProtocolTCP,
109+
EndPort: endPort,
110+
})
111+
ports = append(ports, ProtocolPort{
112+
Port: port,
113+
Protocol: constant.ProtocolUDP,
114+
EndPort: endPort,
115+
})
116+
} else {
117+
ports = append(ports, ProtocolPort{
118+
Port: port,
119+
Protocol: protocol,
120+
EndPort: endPort,
121+
})
122+
}
123+
return
124+
}
125+
139126
// 从一个或多个端口池中分配一个指定协议的端口,分配成功返回端口号,失败返回错误
140127
func (pp PortPools) AllocatePort(ctx context.Context, protocol string, useSamePortAcrossPools bool) (ports PortAllocations, err error) {
141128
startPort := uint16(0)
@@ -187,66 +174,10 @@ func (pp PortPools) AllocatePort(ctx context.Context, protocol string, useSamePo
187174
segmentLength = 1
188175
}
189176

190-
getPortsToAllocate := func(port, endPort uint16) (ports []ProtocolPort) {
191-
if protocol == constant.ProtocolTCPUDP {
192-
ports = append(ports, ProtocolPort{
193-
Port: port,
194-
Protocol: constant.ProtocolTCP,
195-
EndPort: endPort,
196-
})
197-
ports = append(ports, ProtocolPort{
198-
Port: port,
199-
Protocol: constant.ProtocolUDP,
200-
EndPort: endPort,
201-
})
202-
} else {
203-
ports = append(ports, ProtocolPort{
204-
Port: port,
205-
Protocol: protocol,
206-
EndPort: endPort,
207-
})
208-
}
209-
return
210-
}
211-
212177
if useSamePortAcrossPools {
213-
ports, err = pp.allocateSamePortAcrossPools(ctx, startPort, endPort, quota, segmentLength, getPortsToAllocate)
178+
ports = pp.allocateSamePortAcrossPools(ctx, startPort, endPort, quota, segmentLength, protocol)
214179
} else {
215-
ports, err = pp.allocatePortAcrossPools(ctx, startPort, endPort, quota, segmentLength, getPortsToAllocate)
216-
}
217-
if err != nil {
218-
return nil, errors.WithStack(err)
180+
ports = pp.allocatePortAcrossPools(ctx, startPort, endPort, quota, segmentLength, protocol)
219181
}
220182
return ports, nil
221183
}
222-
223-
// func (pp PortPools) ReleasePort(lbId string, port uint16, protocol string) {
224-
// if protocol == "TCPUDP" {
225-
// tcpPort := ProtocolPort{
226-
// Port: port,
227-
// Protocol: "TCP",
228-
// }
229-
// udpPort := ProtocolPort{
230-
// Port: port,
231-
// Protocol: "UDP",
232-
// }
233-
// for _, portPool := range pp {
234-
// if portCache := portPool.cache[lbId]; portCache != nil {
235-
// delete(portCache, tcpPort)
236-
// delete(portCache, udpPort)
237-
// break
238-
// }
239-
// }
240-
// } else {
241-
// port := ProtocolPort{
242-
// Port: port,
243-
// Protocol: protocol,
244-
// }
245-
// for _, portPool := range pp {
246-
// if portCache := portPool.cache[lbId]; portCache != nil {
247-
// delete(portCache, port)
248-
// break
249-
// }
250-
// }
251-
// }
252-
// }

0 commit comments

Comments
 (0)