Skip to content

Commit 2912280

Browse files
committed
improve port pool allocator
Signed-off-by: roc <[email protected]>
1 parent 853fa95 commit 2912280

File tree

9 files changed

+107
-97
lines changed


cmd/app/setup_manager.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"k8s.io/apimachinery/pkg/runtime"
1111
utilruntime "k8s.io/apimachinery/pkg/util/runtime"
1212
clientgoscheme "k8s.io/client-go/kubernetes/scheme"
13+
"k8s.io/client-go/tools/record"
1314
ctrl "sigs.k8s.io/controller-runtime"
1415
"sigs.k8s.io/controller-runtime/pkg/client"
1516
"sigs.k8s.io/controller-runtime/pkg/healthz"
@@ -24,7 +25,7 @@ func init() {
2425
}
2526

2627
func SetupManager(mgr ctrl.Manager) {
27-
if err := mgr.Add(&initCache{mgr.GetClient()}); err != nil {
28+
if err := mgr.Add(&initCache{mgr.GetClient(), mgr.GetEventRecorderFor("clbportpool-controller")}); err != nil {
2829
setupLog.Error(err, "problem add init cache")
2930
os.Exit(1)
3031
}
@@ -44,6 +45,7 @@ func SetupManager(mgr ctrl.Manager) {
4445

4546
type initCache struct {
4647
client.Client
48+
record.EventRecorder
4749
}
4850

4951
func (i *initCache) NeedLeaderElection() bool {
@@ -60,7 +62,7 @@ func (i *initCache) Start(ctx context.Context) error {
6062
}
6163
for index := range ppl.Items {
6264
pp := &ppl.Items[index]
63-
if err := portpool.Allocator.AddPool(portpoolutil.NewPortPool(pp, i.Client)); err != nil {
65+
if err := portpool.Allocator.AddPool(portpoolutil.NewPortPool(pp, i.Client, i.EventRecorder)); err != nil {
6466
return err
6567
}
6668
lbIds := []string{}

internal/controller/clbbinding.go

Lines changed: 6 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -59,18 +59,18 @@ func (r *CLBBindingReconciler[T]) sync(ctx context.Context, bd T) (result ctrl.R
5959
}
6060
// 确保所有端口都已分配且绑定 obj
6161
if newResult, err := r.ensureCLBBinding(ctx, bd); err != nil {
62-
// 如果是等待端口池扩容 CLB,确保状态为 WaitForLB,并重新入队,以便在 CLB 扩容完成后能自动分配端口并绑定 obj
63-
if errors.Is(err, portpool.ErrWaitLBScale) {
64-
result.RequeueAfter = 3 * time.Second
62+
errCause := errors.Cause(err)
63+
switch errCause {
64+
case portpool.ErrNewLBCreated, portpool.ErrNewLBCreating: // 扩容了 lb,或者正在扩容,忽略,因为会自动触发对账
6565
return result, nil
6666
}
6767
// 如果是被云 API 限流(默认每秒 20 qps 限制),1s 后重新入队
68-
if clb.IsRequestLimitExceededError(errors.Cause(err)) {
68+
if clb.IsRequestLimitExceededError(errCause) {
6969
result.RequeueAfter = time.Second
7070
return result, nil
7171
}
7272
// 其它非资源冲突的错误,将错误记录到状态中方便排障
73-
if !apierrors.IsConflict(err) {
73+
if !apierrors.IsConflict(errCause) {
7474
if status.State != networkingv1alpha1.CLBBindingStateFailed {
7575
status.State = networkingv1alpha1.CLBBindingStateFailed
7676
status.Message = err.Error()
@@ -79,7 +79,7 @@ func (r *CLBBindingReconciler[T]) sync(ctx context.Context, bd T) (result ctrl.R
7979
}
8080
}
8181
// lb 已不存在,没必要重新入队对账,保持 Failed 状态即可。
82-
if clb.IsLbIdNotFoundError(errors.Cause(err)) {
82+
if clb.IsLbIdNotFoundError(errCause) {
8383
return result, nil
8484
}
8585
return result, errors.WithStack(err)
@@ -487,8 +487,6 @@ LOOP_PORT:
487487
if err := r.Status().Update(ctx, bd.GetObject()); err != nil {
488488
return result, errors.WithStack(err)
489489
}
490-
result = &ctrl.Result{}
491-
result.RequeueAfter = 2 * time.Second
492490
return result, nil
493491
}
494492
return result, errors.WithStack(err)

internal/controller/clbportpool_controller.go

Lines changed: 1 addition & 52 deletions
Original file line numberDiff line numberDiff line change
@@ -266,50 +266,9 @@ func (r *CLBPortPoolReconciler) ensureLb(ctx context.Context, pool *networkingv1
266266
return nil
267267
}
268268

269-
// 扩容 CLB
270-
func (r *CLBPortPoolReconciler) createLB(ctx context.Context, pool *networkingv1alpha1.CLBPortPool) error {
271-
if !portpoolutil.CanCreateLB(ctx, pool) { // 触发自动创建 CLB,但当前却无法扩容,一般不可能发生,除非代码 BUG
272-
r.Recorder.Event(pool, corev1.EventTypeWarning, "CreateLoadBalancer", "not able to scale clb while scale is been tiggered")
273-
if err := r.ensureState(ctx, pool, networkingv1alpha1.CLBPortPoolStateActive); err != nil {
274-
return errors.WithStack(err)
275-
}
276-
return nil
277-
}
278-
// 创建 CLB
279-
r.Recorder.Event(pool, corev1.EventTypeNormal, "CreateLoadBalancer", "try to create clb")
280-
lbId, err := clb.CreateCLB(ctx, pool.GetRegion(), clb.ConvertCreateLoadBalancerRequest(pool.Spec.AutoCreate.Parameters))
281-
if err != nil {
282-
r.Recorder.Eventf(pool, corev1.EventTypeWarning, "CreateLoadBalancer", "create clb failed: %s", err.Error())
283-
return errors.WithStack(err)
284-
}
285-
r.Recorder.Eventf(pool, corev1.EventTypeNormal, "CreateLoadBalancer", "create clb success: %s", lbId)
286-
if err := portpool.Allocator.AddLbId(pool.Name, lbId); err != nil {
287-
return errors.WithStack(err)
288-
}
289-
addLbIdToStatus := func() error {
290-
p := &networkingv1alpha1.CLBPortPool{}
291-
if err := r.Get(ctx, client.ObjectKeyFromObject(pool), p); err != nil {
292-
return errors.WithStack(err)
293-
}
294-
p.Status.State = networkingv1alpha1.CLBPortPoolStateActive // 创建成功,状态改为 Active,以便再次可分配端口
295-
p.Status.LoadbalancerStatuses = append(p.Status.LoadbalancerStatuses, networkingv1alpha1.LoadBalancerStatus{
296-
LoadbalancerID: lbId,
297-
AutoCreated: util.GetPtr(true),
298-
})
299-
if err := r.Status().Update(ctx, p); err != nil {
300-
return errors.WithStack(err)
301-
}
302-
return nil
303-
}
304-
if err := util.RetryIfPossible(addLbIdToStatus); err != nil {
305-
return errors.WithStack(err)
306-
}
307-
return nil
308-
}
309-
310269
func (r *CLBPortPoolReconciler) ensureAllocatorCache(_ context.Context, pool *networkingv1alpha1.CLBPortPool) error {
311270
if !portpool.Allocator.IsPoolExists(pool.Name) { // 分配器缓存中不存在,则添加
312-
if err := portpool.Allocator.AddPool(portpoolutil.NewPortPool(pool, r.Client)); err != nil {
271+
if err := portpool.Allocator.AddPool(portpoolutil.NewPortPool(pool, r.Client, r.Recorder)); err != nil {
313272
return errors.WithStack(err)
314273
}
315274
}
@@ -326,16 +285,6 @@ func (r *CLBPortPoolReconciler) sync(ctx context.Context, pool *networkingv1alph
326285
}
327286
}
328287

329-
// 被通知扩容CLB时,执行扩容操作
330-
if pool.Status.State == networkingv1alpha1.CLBPortPoolStateScaling {
331-
if err := r.createLB(ctx, pool); err != nil { // 执行扩容
332-
return result, errors.WithStack(err)
333-
}
334-
// 扩容成功,重新对账
335-
result.Requeue = true
336-
return result, nil
337-
}
338-
339288
// 确保分配器缓存中存在该 port pool
340289
if err := r.ensureAllocatorCache(ctx, pool); err != nil {
341290
return result, errors.WithStack(err)

internal/portpool/allocator.go

Lines changed: 0 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ import (
66
"sync"
77

88
"github.com/pkg/errors"
9-
"sigs.k8s.io/controller-runtime/pkg/client"
109
)
1110

1211
// PortAllocator 管理多个端口池
@@ -137,16 +136,6 @@ func (pa *PortAllocator) Release(pool, lbId string, port ProtocolPort) {
137136

138137
var Allocator = NewPortAllocator()
139138

140-
var (
141-
allocator *PortAllocator
142-
allocatorMux sync.Mutex
143-
apiClient client.Client
144-
)
145-
146-
func Init(client client.Client) {
147-
apiClient = client
148-
}
149-
150139
func (pa *PortAllocator) MarkAllocated(poolName string, lbId string, port uint16, endPort *uint16, protocol string) {
151140
pa.mu.Lock()
152141
defer pa.mu.Unlock()

internal/portpool/error.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,8 @@ var (
66
ErrPoolNotFound = errors.New("port pool not found")
77
ErrNoPortAvailable = errors.New("no available port in pool")
88
ErrSegmentLengthNotEqual = errors.New("segment length is not equal across all port pools")
9-
ErrWaitLBScale = errors.New("waiting for clb scale")
9+
ErrNewLBCreated = errors.New("new lb created")
10+
ErrNewLBCreating = errors.New("new lb is creating")
1011
ErrUnknown = errors.New("unknown error")
1112
ErrNoFreeLb = errors.New("no free lb available")
1213
)

internal/portpool/portpool.go

Lines changed: 10 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,13 +41,22 @@ func (pas PortAllocations) Release() {
4141
}
4242
}
4343

44+
type CreateLbResult int
45+
46+
const (
47+
CreateLbResultError CreateLbResult = iota
48+
CreateLbResultSuccess
49+
CreateLbResultForbidden
50+
CreateLbResultCreating
51+
)
52+
4453
type CLBPortPool interface {
4554
GetName() string
4655
GetRegion() string
4756
GetStartPort() uint16
4857
GetEndPort() uint16
4958
GetSegmentLength() uint16
50-
TryNotifyCreateLB(ctx context.Context) (int, error)
59+
TryCreateLB(ctx context.Context) (CreateLbResult, error)
5160
}
5261

5362
// PortPool 管理单个端口池的状态

internal/portpool/portpools.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -78,17 +78,17 @@ LOOP_POOL:
7878
// 该端口池所有端口都无法分配,或者监听器数量超配额,为保证事务性,释放已分配的端口,并尝试通知端口池扩容 CLB 来补充端口池
7979
allocatedPorts.Release()
8080
// 检查端口池是否可以创建 CLB
81-
result, err := pool.TryNotifyCreateLB(ctx)
81+
result, err := pool.TryCreateLB(ctx)
8282
if err != nil {
8383
return nil, errors.WithStack(err)
8484
}
8585
switch result {
86-
case -1: // 不能自动创建,返回端口不足的错误
87-
log.FromContext(ctx).V(10).Info("port is not enough", "pool", pool.GetName())
86+
case CreateLbResultForbidden: // 不能自动创建,返回端口不足的错误
8887
return nil, ErrNoPortAvailable
89-
case 2, 1: // 已经通知过或通知成功,重新入队
90-
log.FromContext(ctx).V(10).Info("wait lb to scale", "pool", pool.GetName())
91-
return nil, ErrWaitLBScale
88+
case CreateLbResultCreating: // 正在创建 CLB,创建完后会自动触发对账
89+
return nil, ErrNewLBCreating
90+
case CreateLbResultSuccess: // 已经通知过或通知成功,重新入队
91+
return nil, ErrNewLBCreated
9292
default: // 不可能的状态
9393
return nil, ErrUnknown
9494
}

internal/portpool/util/portpool.go

Lines changed: 78 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -2,10 +2,15 @@ package util
22

33
import (
44
"context"
5+
"sync"
56

67
"github.com/pkg/errors"
8+
corev1 "k8s.io/api/core/v1"
9+
"k8s.io/client-go/tools/record"
710

811
networkingv1alpha1 "github.com/imroc/tke-extend-network-controller/api/v1alpha1"
12+
"github.com/imroc/tke-extend-network-controller/internal/portpool"
13+
"github.com/imroc/tke-extend-network-controller/pkg/clb"
914
"github.com/imroc/tke-extend-network-controller/pkg/util"
1015
"sigs.k8s.io/controller-runtime/pkg/client"
1116
"sigs.k8s.io/controller-runtime/pkg/log"
@@ -14,24 +19,83 @@ import (
1419
type PortPool struct {
1520
*networkingv1alpha1.CLBPortPool
1621
client.Client
22+
record.EventRecorder
23+
mu sync.Mutex
24+
creating bool
1725
}
1826

19-
func (p *PortPool) TryNotifyCreateLB(ctx context.Context) (int, error) {
27+
func (p *PortPool) TryCreateLB(ctx context.Context) (portpool.CreateLbResult, error) {
28+
if p.creating {
29+
return portpool.CreateLbResultCreating, nil
30+
}
31+
log.FromContext(ctx).V(10).Info("TryCreateLB")
2032
pp := &networkingv1alpha1.CLBPortPool{}
2133
if err := p.Get(ctx, client.ObjectKeyFromObject(p.CLBPortPool), pp); err != nil {
22-
return 0, errors.WithStack(err)
34+
return portpool.CreateLbResultError, errors.WithStack(err)
35+
}
36+
// 还未初始化的端口池,不能创建负载均衡器
37+
if pp.Status.State == "" || pp.Status.State == networkingv1alpha1.CLBPortPoolStatePending {
38+
return portpool.CreateLbResultForbidden, nil
39+
}
40+
// 没有显式启用自动创建的端口池,不能创建负载均衡器
41+
if pp.Spec.AutoCreate == nil || !pp.Spec.AutoCreate.Enabled {
42+
log.FromContext(ctx).V(10).Info("not able to create lb cuz auto create is not enabled")
43+
return portpool.CreateLbResultForbidden, nil
2344
}
24-
if !CanCreateLB(ctx, pp) {
25-
return -1, nil
45+
p.mu.Lock()
46+
defer p.mu.Unlock()
47+
if len(pp.Status.LoadbalancerStatuses) > len(p.CLBPortPool.Status.LoadbalancerStatuses) {
48+
p.CLBPortPool = pp
2649
}
27-
if pp.Status.State == networkingv1alpha1.CLBPortPoolStateScaling { // 已经在扩容了
28-
return 2, nil
50+
// 自动创建的 CLB 数量达到配置上限的端口池,不能创建负载均衡器
51+
if !util.IsZero(pp.Spec.AutoCreate.MaxLoadBalancers) { // 要读结构体中的,cache 获取到不一定是实时最新的
52+
// 检查是否已创建了足够的 CLB
53+
num := uint16(0)
54+
for _, lbStatus := range p.CLBPortPool.Status.LoadbalancerStatuses {
55+
if lbStatus.AutoCreated != nil && *lbStatus.AutoCreated && lbStatus.State != networkingv1alpha1.LoadBalancerStateNotFound {
56+
num++
57+
}
58+
}
59+
// 如果已创建数量已满,则直接返回
60+
if num >= *pp.Spec.AutoCreate.MaxLoadBalancers {
61+
log.FromContext(ctx).V(10).Info("max auto-created loadbalancers is reached", "num", num, "max", *pp.Spec.AutoCreate.MaxLoadBalancers)
62+
return portpool.CreateLbResultForbidden, nil
63+
}
64+
}
65+
p.creating = true
66+
defer func() {
67+
p.creating = false
68+
}()
69+
p.EventRecorder.Event(pp, corev1.EventTypeNormal, "CreateLoadBalancer", "try to create clb")
70+
lbId, err := clb.CreateCLB(ctx, pp.GetRegion(), clb.ConvertCreateLoadBalancerRequest(pp.Spec.AutoCreate.Parameters))
71+
if err != nil {
72+
p.EventRecorder.Eventf(pp, corev1.EventTypeWarning, "CreateLoadBalancer", "create clb failed: %s", err.Error())
73+
return portpool.CreateLbResultError, errors.WithStack(err)
74+
}
75+
p.EventRecorder.Eventf(pp, corev1.EventTypeNormal, "CreateLoadBalancer", "create clb success: %s", lbId)
76+
if err := portpool.Allocator.AddLbId(pp.Name, lbId); err != nil {
77+
return portpool.CreateLbResultError, errors.WithStack(err)
78+
}
79+
addLbIdToStatus := func() error {
80+
pp := &networkingv1alpha1.CLBPortPool{}
81+
if err := p.Client.Get(ctx, client.ObjectKeyFromObject(p.CLBPortPool), pp); err != nil {
82+
return errors.WithStack(err)
83+
}
84+
pp.Status.State = networkingv1alpha1.CLBPortPoolStateActive // 创建成功,状态改为 Active,以便再次可分配端口
85+
pp.Status.LoadbalancerStatuses = append(pp.Status.LoadbalancerStatuses, networkingv1alpha1.LoadBalancerStatus{
86+
LoadbalancerID: lbId,
87+
AutoCreated: util.GetPtr(true),
88+
})
89+
if err := p.Client.Status().Update(ctx, pp); err != nil {
90+
return errors.WithStack(err)
91+
}
92+
p.CLBPortPool = pp
93+
return nil
2994
}
30-
pp.Status.State = networkingv1alpha1.CLBPortPoolStateScaling
31-
if err := p.Client.Status().Update(ctx, pp); err != nil {
32-
return 0, errors.WithStack(err)
95+
if err := util.RetryIfPossible(addLbIdToStatus); err != nil {
96+
return portpool.CreateLbResultError, errors.WithStack(err)
3397
}
34-
return 1, nil // 成功通知扩容
98+
return portpool.CreateLbResultSuccess, nil
3599
}
36100

37101
func (p *PortPool) GetStartPort() uint16 {
@@ -52,10 +116,11 @@ func (p *PortPool) GetSegmentLength() uint16 {
52116
return *p.Spec.SegmentLength
53117
}
54118

55-
func NewPortPool(pp *networkingv1alpha1.CLBPortPool, c client.Client) *PortPool {
119+
func NewPortPool(pp *networkingv1alpha1.CLBPortPool, c client.Client, recorder record.EventRecorder) *PortPool {
56120
return &PortPool{
57-
CLBPortPool: pp,
58-
Client: c,
121+
CLBPortPool: pp,
122+
Client: c,
123+
EventRecorder: recorder,
59124
}
60125
}
61126

internal/webhook/v1alpha1/webhook_suite_test.go

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -112,9 +112,6 @@ var _ = BeforeSuite(func() {
112112
err = SetupCLBPortPoolWebhookWithManager(mgr)
113113
Expect(err).NotTo(HaveOccurred())
114114

115-
err = SetupCLBPodBindingWebhookWithManager(mgr)
116-
Expect(err).NotTo(HaveOccurred())
117-
118115
// +kubebuilder:scaffold:webhook
119116

120117
go func() {

0 commit comments

Comments (0)