Skip to content

Commit f549189

Browse files
committed
bug fixes
1. Avoid releasing a port multiple times, which causes port allocation conflicts. 2. Avoid workers getting stuck on the batch describe-listeners API call. Signed-off-by: roc <[email protected]>
1 parent 79da4b5 commit f549189

File tree

4 files changed

+79
-33
lines changed

4 files changed

+79
-33
lines changed

internal/constant/constant.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ const (
1111
Finalizer = "networking.cloud.tencent.com/finalizer"
1212
Ratain = "networking.cloud.tencent.com/retain"
1313
LastUpdateTime = "networking.cloud.tencent.com/last-update-time"
14+
FinalizedKey = "networking.cloud.tencent.com/finalized"
1415
ProtocolTCP = "TCP"
1516
ProtocolUDP = "UDP"
1617
ProtocolTCPUDP = "TCPUDP"

internal/controller/clbbinding.go

Lines changed: 43 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -239,8 +239,10 @@ func (r *CLBBindingReconciler[T]) createListener(ctx context.Context, bd clbbind
239239
// 对账单个监听器
240240
func (r *CLBBindingReconciler[T]) ensureListener(ctx context.Context, bd clbbinding.CLBBinding, binding *networkingv1alpha1.PortBindingStatus) (*networkingv1alpha1.PortBindingStatus, error) {
241241
log.FromContext(ctx).V(10).Info("ensureListener", "binding", binding)
242+
log.FromContext(ctx).V(10).Info("end ensureListener", "binding", binding)
242243
// 如果 lb 已被移除,且当前还未绑定成功,则移除该端口绑定,等待重新分配端口(配错 lb导致一直无法绑定成功,更正后,可以触发重新分配以便能够成功绑定)
243244
if bd.GetStatus().State != networkingv1alpha1.CLBBindingStateBound && !portpool.Allocator.IsLbExists(binding.Pool, portpool.NewLBKeyFromBinding(binding)) {
245+
log.FromContext(ctx).V(10).Info("remove clbbinding", "reason", "lb not exists", "binding", binding)
244246
r.Recorder.Eventf(bd.GetObject(), corev1.EventTypeNormal, "PortBindingRemoved", "lb %q not exists, remove port binding (lbPort:%s protocol:%s)", binding.LoadbalancerId, binding.LoadbalancerPort, binding.Protocol)
245247
return nil, nil
246248
}
@@ -264,11 +266,15 @@ func (r *CLBBindingReconciler[T]) ensureListener(ctx context.Context, bd clbbind
264266
}
265267
}
266268
if removeMsg != "" {
269+
log.FromContext(ctx).V(10).Info("remove clbbinding", "message", removeMsg, "binding", binding)
267270
r.Recorder.Eventf(bd.GetObject(), corev1.EventTypeWarning, removeReason, "%s (%s/%s/%d/%s)", removeMsg, binding.Pool, binding.LoadbalancerId, binding.LoadbalancerPort, binding.Protocol)
268271
// 确保监听器被清理
269272
if err := r.cleanupPortBinding(ctx, binding); err != nil {
270273
return binding, errors.WithStack(err)
271274
}
275+
if portpool.Allocator.ReleaseBinding(binding) {
276+
notifyPortPoolReconcile(binding.Pool)
277+
}
272278
return nil, nil
273279
}
274280

@@ -284,7 +290,9 @@ func (r *CLBBindingReconciler[T]) ensureListener(ctx context.Context, bd clbbind
284290
}
285291

286292
// 已有监听器 ID,对账看是否符合预期
293+
log.FromContext(ctx).V(10).Info("try GetListenerById", "binding", binding)
287294
lis, err := clb.GetListenerById(ctx, binding.Region, binding.LoadbalancerId, binding.ListenerId)
295+
log.FromContext(ctx).V(10).Info("GetListenerById", "binding", binding, "lis", lis)
288296
if err != nil {
289297
if clb.IsLoadBalancerNotExistsError(err) { // lb 已删除,通知关联的端口池重新对账
290298
r.Recorder.Eventf(
@@ -319,6 +327,7 @@ func (r *CLBBindingReconciler[T]) ensureListener(ctx context.Context, bd clbbind
319327
return binding, nil
320328
}
321329
} else { // 通过 ID 查到了监听器,对比是否符合预期
330+
log.FromContext(ctx).V(10).Info("found listener id, ensureListenerExpected", "binding", binding, "lis", lis)
322331
binding, err = r.ensureListenerExpected(ctx, binding, lis)
323332
if err != nil {
324333
return binding, errors.WithStack(err)
@@ -622,6 +631,8 @@ func patchResult(ctx context.Context, c client.Client, obj client.Object, result
622631
}
623632

624633
func (r *CLBBindingReconciler[T]) ensurePortBound(ctx context.Context, bd clbbinding.CLBBinding, backend clbbinding.Backend, binding *networkingv1alpha1.PortBindingStatus) error {
634+
log.FromContext(ctx).V(10).Info("ensurePortBound", "binding", *binding)
635+
defer log.FromContext(ctx).V(10).Info("end ensurePortBound", "binding", *binding)
625636
targets, err := clb.DescribeTargetsTryBatch(ctx, binding.Region, binding.LoadbalancerId, binding.ListenerId)
626637
if err != nil {
627638
return errors.WithStack(err)
@@ -736,7 +747,7 @@ LOOP_PORT:
736747
before := time.Now()
737748
allocated, err := portpool.Allocator.Allocate(ctx, port.Pools, port.Protocol, util.GetValue(port.UseSamePortAcrossPools))
738749
cost := time.Since(before)
739-
log.FromContext(ctx).V(3).Info("allocate port", "cost", cost.String(), "allocated", allocated, "protocol", port.Protocol, "pools", port.Pools, "useSamePortAcrossPools", util.GetValue(port.UseSamePortAcrossPools), "err", err)
750+
log.FromContext(ctx).V(3).Info("allocate port", "cost", cost.String(), "allocated", allocated.String(), "protocol", port.Protocol, "pools", port.Pools, "useSamePortAcrossPools", util.GetValue(port.UseSamePortAcrossPools), "err", err)
740751
if err != nil {
741752
return errors.WithStack(err)
742753
}
@@ -781,6 +792,9 @@ LOOP_PORT:
781792
releasePorts()
782793
return errors.WithStack(err)
783794
}
795+
for _, pool := range allocatedPorts.Pools() {
796+
notifyPortPoolReconcile(pool)
797+
}
784798
}
785799
return nil
786800
}
@@ -801,13 +815,21 @@ func (r *CLBBindingReconciler[T]) ensureState(ctx context.Context, bd clbbinding
801815

802816
// 清理 CLBBinding
803817
func (r *CLBBindingReconciler[T]) cleanup(ctx context.Context, bd T) (result ctrl.Result, err error) {
818+
anno := bd.GetAnnotations()
819+
if anno == nil {
820+
anno = make(map[string]string)
821+
}
822+
if anno[constant.FinalizedKey] == "true" {
823+
return result, nil
824+
}
804825
log := log.FromContext(ctx)
805826
status := bd.GetStatus()
806827
log.Info("cleanup "+bd.GetType(), "bindings", len(status.PortBindings))
807828
if err = r.ensureState(ctx, bd, networkingv1alpha1.CLBBindingStateDeleting); err != nil {
808829
return result, errors.WithStack(err)
809830
}
810831
ch := make(chan error)
832+
controllerutil.ContainsFinalizer(bd.GetObject(), constant.Finalizer)
811833
for _, binding := range status.PortBindings {
812834
go func(binding *networkingv1alpha1.PortBindingStatus) {
813835
if err := r.cleanupPortBinding(ctx, binding); err != nil {
@@ -826,6 +848,23 @@ func (r *CLBBindingReconciler[T]) cleanup(ctx context.Context, bd T) (result ctr
826848
if err != nil {
827849
return result, errors.WithStack(err)
828850
}
851+
// 全部解绑完成,打上标记,避免重复释放端口导致冲突(比如刚释放完端口又被其它 pod 分配,然后再次进入cleanup时又被清理,有可能再次分配给其它 pod,导致相同ip:port被重复分配)
852+
anno[constant.FinalizedKey] = "true"
853+
bd.SetAnnotations(anno)
854+
if err := r.Update(ctx, bd.GetObject()); err != nil {
855+
return result, errors.WithStack(err)
856+
}
857+
// 释放已分配端口
858+
log.V(10).Info("release allocated ports", "bindings", status.PortBindings)
859+
pools := make(map[string]struct{})
860+
for _, binding := range status.PortBindings {
861+
if portpool.Allocator.ReleaseBinding(&binding) {
862+
pools[binding.Pool] = struct{}{}
863+
}
864+
}
865+
for pool := range pools {
866+
notifyPortPoolReconcile(pool)
867+
}
829868
// 清理完成,检查 obj 是否是正常状态,如果是,通常是手动删除 CLBBinding 场景,此时触发一次 obj 对账,让被删除的 CLBBinding 重新创建出来
830869
backend, err := bd.GetAssociatedObject(ctx, r.Client)
831870
if err != nil {
@@ -843,22 +882,17 @@ func (r *CLBBindingReconciler[T]) cleanup(ctx context.Context, bd T) (result ctr
843882
}
844883

845884
func (r *CLBBindingReconciler[T]) cleanupPortBinding(ctx context.Context, binding *networkingv1alpha1.PortBindingStatus) error {
846-
releasePort := func() {
847-
if portpool.Allocator.ReleaseBinding(binding) {
848-
log.FromContext(ctx).V(3).Info("release allocated port", "port", binding.LoadbalancerPort, "protocol", binding.Protocol, "pool", binding.Pool, "lb", binding.LoadbalancerId)
849-
notifyPortPoolReconcile(binding.Pool)
850-
}
851-
}
885+
log.FromContext(ctx).V(10).Info("cleanupPortBinding", "binding", binding)
852886
lis, err := clb.GetListenerByIdOrPort(ctx, binding.Region, binding.LoadbalancerId, binding.ListenerId, int64(binding.LoadbalancerPort), binding.Protocol)
853887
if err != nil {
854888
if clb.IsLoadBalancerNotExistsError(err) { // 忽略 lbid 不存在的错误,就当清理成功
855-
releasePort()
889+
log.FromContext(ctx).V(10).Info("ignore cleanup due to lb not exists", "binding", binding)
856890
return nil
857891
}
858892
return errors.WithStack(err)
859893
}
860894
if lis == nil { // 监听器已删除,忽略
861-
releasePort()
895+
log.FromContext(ctx).V(10).Info("ignore cleanup due to listener already deleted", "binding", binding)
862896
return nil
863897
}
864898
// 清理监听器
@@ -868,19 +902,16 @@ func (r *CLBBindingReconciler[T]) cleanupPortBinding(ctx context.Context, bindin
868902
switch errCause {
869903
case clb.ErrListenerNotFound: // 监听器不存在,忽略
870904
log.FromContext(ctx).Info("delete listener while listener not found, ignore")
871-
releasePort()
872905
return nil
873906
default:
874907
if clb.IsLoadBalancerNotExistsError(errCause) { // lb 不存在,忽略
875908
log.FromContext(ctx).Info("lb not found, ignore when cleanup listener")
876-
releasePort()
877909
return nil
878910
}
879911
}
880912
// 其它错误,不释放端口,返回错误
881913
return errors.WithStack(err)
882914
} else { // 没有错误,删除成功
883-
releasePort()
884915
return nil
885916
}
886917
}

internal/portpool/portpool.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@ package portpool
22

33
import (
44
"context"
5+
"fmt"
56
"iter"
67
"reflect"
78
"slices"
@@ -65,6 +66,29 @@ func (pa PortAllocation) Release() {
6566

6667
type PortAllocations []PortAllocation
6768

69+
func (pas PortAllocations) Pools() []string {
70+
if len(pas) == 0 {
71+
return []string{}
72+
}
73+
pools := make(map[string]struct{})
74+
for _, pa := range pas {
75+
pools[pa.PortPool.Name] = struct{}{}
76+
}
77+
ret := []string{}
78+
for pool := range pools {
79+
ret = append(ret, pool)
80+
}
81+
return ret
82+
}
83+
84+
func (pas PortAllocations) String() string {
85+
ret := []string{}
86+
for _, pa := range pas {
87+
ret = append(ret, fmt.Sprintf("%s:%d/%s", pa.LBKey.LbId, pa.Port, pa.Protocol))
88+
}
89+
return fmt.Sprint(ret)
90+
}
91+
6892
func (pas PortAllocations) Release() {
6993
for _, pa := range pas {
7094
pa.Release()

pkg/clb/batch-listener.go

Lines changed: 11 additions & 21 deletions
Original file line numberDiff line numberDiff line change
@@ -173,30 +173,20 @@ func startDescribeListenerProccessor(concurrent int) {
173173
return
174174
}
175175
// 查询成功
176-
taskMap := make(map[string]*DescribeListenerTask)
177-
for _, task := range tasks {
178-
taskMap[task.ListenerId] = task
179-
}
176+
listeners := make(map[string]*clb.Listener)
180177
// 给查到结果的 task 返回 listener 信息
181178
for _, lis := range res.Response.Listeners {
182-
task := taskMap[*lis.ListenerId]
183-
result := &DescribeListenerResult{
184-
Listener: &Listener{
185-
ListenerId: *lis.ListenerId,
186-
Protocol: *lis.Protocol,
187-
Port: *lis.Port,
188-
ListenerName: *lis.ListenerName,
189-
},
190-
}
191-
if lis.EndPort != nil {
192-
result.Listener.EndPort = *lis.EndPort
193-
}
194-
task.Result <- result
195-
delete(taskMap, *lis.ListenerId)
179+
listeners[*lis.ListenerId] = lis
196180
}
197-
// 不存在的 listener 返回空结果
198-
for _, task := range taskMap { // 没找到监听器,返回空结果
199-
task.Result <- &DescribeListenerResult{}
181+
// 确保每个 task 都有返回结果(即便是空)
182+
for _, task := range tasks {
183+
if lis, ok := listeners[task.ListenerId]; ok {
184+
task.Result <- &DescribeListenerResult{
185+
Listener: convertListener(lis),
186+
}
187+
} else {
188+
task.Result <- &DescribeListenerResult{}
189+
}
200190
}
201191
})
202192
}

0 commit comments

Comments
 (0)