Skip to content

Commit 67ce0d4

Browse files
committed
extract clb ApiCall and improve cleanup performance
Signed-off-by: roc <[email protected]>
1 parent af63bcb commit 67ce0d4

File tree

4 files changed

+118
-70
lines changed

4 files changed

+118
-70
lines changed

internal/controller/clbbinding.go

Lines changed: 55 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -678,50 +678,25 @@ func (r *CLBBindingReconciler[T]) cleanup(ctx context.Context, bd T) (result ctr
678678
if err = r.ensureState(ctx, bd, networkingv1alpha1.CLBBindingStateDeleting); err != nil {
679679
return result, errors.WithStack(err)
680680
}
681+
ch := make(chan error)
681682
for _, binding := range status.PortBindings {
682-
releasePort := func() {
683-
log.V(3).Info("release allocated port", "port", binding.LoadbalancerPort, "protocol", binding.Protocol, "pool", binding.Pool, "lb", binding.LoadbalancerId)
684-
portpool.Allocator.Release(binding.Pool, binding.LoadbalancerId, portFromPortBindingStatus(&binding))
685-
}
686-
if lis, err := clb.GetListenerByIdOrPort(ctx, binding.Region, binding.LoadbalancerId, binding.ListenerId, int64(binding.LoadbalancerPort), binding.Protocol); err != nil {
687-
if errors.Is(err, clb.ErrListenerNotFound) { // 监听器已删除,忽略
688-
releasePort()
689-
continue
690-
}
691-
} else {
692-
if lis == nil { // 监听器已删除,忽略
693-
releasePort()
694-
continue
695-
}
696-
}
697-
// 解绑 lb
698-
if err := clb.DeleteListenerByIdOrPort(ctx, binding.Region, binding.LoadbalancerId, binding.ListenerId, int64(binding.LoadbalancerPort), binding.Protocol); err != nil {
699-
e := errors.Cause(err)
700-
switch e {
701-
case clb.ErrListenerNotFound: // 监听器不存在,认为成功,从端口池释放
702-
log.Info("delete listener while listener not found, ignore")
703-
releasePort()
704-
continue
705-
case clb.ErrOtherListenerNotFound: // 因同一批次删除的其它监听器不存在导致删除失败,需重试
706-
log.Error(err, "requeue due to delete listener failed cuz other listener not found")
707-
result.RequeueAfter = 20 * time.Millisecond
708-
return result, nil
709-
}
710-
if clb.IsLoadBalancerNotExistsError(e) { // lb 不存在,忽略
711-
log.Info("lb not found, ignore when cleanup listener")
712-
releasePort()
713-
continue
714-
}
715-
if clb.IsRequestLimitExceededError(e) {
716-
log.Info("requeue due to clb api request limit exceeded when cleanup listener")
717-
result.RequeueAfter = time.Second
718-
return result, nil
683+
go func(binding *networkingv1alpha1.PortBindingStatus) {
684+
if err := r.cleanupPortBinding(ctx, binding); err != nil {
685+
ch <- err
686+
} else {
687+
ch <- nil
719688
}
720-
return result, errors.Wrapf(err, "failed to delete listener (%s/%d/%s/%s)", binding.LoadbalancerId, binding.LoadbalancerPort, binding.Protocol, binding.ListenerId)
721-
} else { // 删除成功,释放端口
722-
releasePort()
689+
}(&binding)
690+
}
691+
for range status.PortBindings {
692+
e := <-ch
693+
if e != nil {
694+
err = multierr.Append(err, e)
723695
}
724696
}
697+
if err != nil {
698+
return result, errors.WithStack(err)
699+
}
725700
// 清理完成,检查 obj 是否是正常状态,如果是,通常是手动删除 CLBBinding 场景,此时触发一次 obj 对账,让被删除的 CLBBinding 重新创建出来
726701
backend, err := bd.GetAssociatedObject(ctx, r.Client)
727702
if err != nil {
@@ -738,6 +713,44 @@ func (r *CLBBindingReconciler[T]) cleanup(ctx context.Context, bd T) (result ctr
738713
return result, nil
739714
}
740715

716+
func (r *CLBBindingReconciler[T]) cleanupPortBinding(ctx context.Context, binding *networkingv1alpha1.PortBindingStatus) error {
717+
lis, err := clb.GetListenerByIdOrPort(ctx, binding.Region, binding.LoadbalancerId, binding.ListenerId, int64(binding.LoadbalancerPort), binding.Protocol)
718+
if err != nil {
719+
return errors.WithStack(err)
720+
}
721+
releasePort := func() {
722+
if portpool.Allocator.Release(binding.Pool, binding.LoadbalancerId, portFromPortBindingStatus(binding)) {
723+
log.FromContext(ctx).V(3).Info("release allocated port", "port", binding.LoadbalancerPort, "protocol", binding.Protocol, "pool", binding.Pool, "lb", binding.LoadbalancerId)
724+
}
725+
}
726+
if lis == nil { // 监听器已删除,忽略
727+
releasePort()
728+
return nil
729+
}
730+
// 解绑 lb
731+
err = clb.DeleteListenerByIdOrPort(ctx, binding.Region, binding.LoadbalancerId, binding.ListenerId, int64(binding.LoadbalancerPort), binding.Protocol)
732+
if err != nil {
733+
errCause := errors.Cause(err)
734+
switch errCause {
735+
case clb.ErrListenerNotFound: // 监听器不存在,忽略
736+
log.FromContext(ctx).Info("delete listener while listener not found, ignore")
737+
releasePort()
738+
return nil
739+
default:
740+
if clb.IsLoadBalancerNotExistsError(errCause) { // lb 不存在,忽略
741+
log.FromContext(ctx).Info("lb not found, ignore when cleanup listener")
742+
releasePort()
743+
return nil
744+
}
745+
}
746+
// 其它错误,不释放端口,返回错误
747+
return errors.WithStack(err)
748+
} else { // 没有错误,删除成功
749+
releasePort()
750+
return nil
751+
}
752+
}
753+
741754
func generatePortsFromAnnotation(anno string) (ports []networkingv1alpha1.PortEntry, err error) {
742755
rd := bufio.NewReader(strings.NewReader(anno))
743756
for {
@@ -886,8 +899,8 @@ func (r *CLBBindingReconciler[T]) syncCLBBinding(ctx context.Context, obj client
886899
}
887900
}
888901
default:
889-
// 没有配置注解,如发现有 CLBBinding,则删除掉
890-
if err == nil {
902+
// 没有配置注解
903+
if err == nil { // 没有错误,说明获取 CLBBinding 成功,删除掉这个多余的 CLBBinding
891904
r.Recorder.Eventf(obj, corev1.EventTypeNormal, "DeleteCLBBinding", "delete %s %s", binding.GetType(), obj.GetName())
892905
if err := r.Delete(ctx, bd); err != nil {
893906
return result, errors.WithStack(err)

pkg/clb/api.go

Lines changed: 36 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,36 @@
1+
package clb
2+
3+
import (
4+
"context"
5+
"time"
6+
7+
"github.com/pkg/errors"
8+
clb "github.com/tencentcloud/tencentcloud-sdk-go/tencentcloud/clb/v20180317"
9+
"sigs.k8s.io/controller-runtime/pkg/log"
10+
)
11+
12+
func ApiCall[Req, Res any](ctx context.Context, apiName, region string, doReq func(ctx context.Context, client *clb.Client) (req Req, res Res, err error)) (res Res, reqCount int, err error) {
13+
client := GetClient(region)
14+
for {
15+
before := time.Now()
16+
req, res, err := doReq(ctx, client)
17+
LogAPI(ctx, apiName, req, res, time.Since(before), err)
18+
reqCount++
19+
if err != nil {
20+
if IsRequestLimitExceededError(err) { // 云 API 限频,重试
21+
log.FromContext(ctx).V(3).Info("clb api request limit exceeded")
22+
select { // context 撤销,不继续重试
23+
case <-ctx.Done():
24+
return res, reqCount, err
25+
default:
26+
}
27+
time.Sleep(time.Second)
28+
continue
29+
} else { // 其它错误,抛给调用者
30+
return res, reqCount, errors.WithStack(err)
31+
}
32+
} else { // 请求成功,返回 response
33+
return res, reqCount, nil
34+
}
35+
}
36+
}

pkg/clb/batch-listener.go

Lines changed: 17 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -159,18 +159,17 @@ func startDescribeListenerProccessor(concurrent int) {
159159
apiName := "DescribeListeners"
160160
StartBatchProccessor(concurrent, apiName, false, DescribeListenerChan, func(region, lbId string, tasks []*DescribeListenerTask) {
161161
startTime := time.Now()
162-
defer func() {
163-
clbLog.V(10).Info(fmt.Sprintf("batch proccess %s performance", apiName), "cost", time.Since(startTime).String())
164-
}()
165-
req := clb.NewDescribeListenersRequest()
166-
req.LoadBalancerId = &lbId
167-
for _, task := range tasks {
168-
req.ListenerIds = append(req.ListenerIds, &task.ListenerId)
169-
}
170-
client := GetClient(region)
171-
before := time.Now()
172-
resp, err := client.DescribeListeners(req)
173-
LogAPI(nil, apiName, req, resp, time.Since(before), err)
162+
res, reqCount, err := ApiCall(context.Background(), "DescribeListeners", region, func(ctx context.Context, client *clb.Client) (req *clb.DescribeListenersRequest, res *clb.DescribeListenersResponse, err error) {
163+
req = clb.NewDescribeListenersRequest()
164+
req.LoadBalancerId = &lbId
165+
for _, task := range tasks {
166+
req.ListenerIds = append(req.ListenerIds, &task.ListenerId)
167+
}
168+
res, err = client.DescribeListenersWithContext(ctx, req)
169+
return
170+
})
171+
clbLog.V(10).Info("batch proccess performance", "api", apiName, "cost", time.Since(startTime).String(), "reqCount", reqCount)
172+
// 查询失败,所有 task 都失败,全部返回错误
174173
if err != nil {
175174
for _, task := range tasks {
176175
task.Result <- &DescribeListenerResult{
@@ -179,11 +178,13 @@ func startDescribeListenerProccessor(concurrent int) {
179178
}
180179
return
181180
}
181+
// 查询成功
182182
taskMap := make(map[string]*DescribeListenerTask)
183183
for _, task := range tasks {
184184
taskMap[task.ListenerId] = task
185185
}
186-
for _, lis := range resp.Response.Listeners {
186+
// 给查到结果的 task 返回 listener 信息
187+
for _, lis := range res.Response.Listeners {
187188
task := taskMap[*lis.ListenerId]
188189
result := &DescribeListenerResult{
189190
Listener: &Listener{
@@ -199,11 +200,9 @@ func startDescribeListenerProccessor(concurrent int) {
199200
task.Result <- result
200201
delete(taskMap, *lis.ListenerId)
201202
}
202-
for listenerId, task := range taskMap {
203-
err = errors.Wrapf(ErrListenerNotFound, "listener %s not found", listenerId)
204-
task.Result <- &DescribeListenerResult{
205-
Err: err,
206-
}
203+
// 不存在的 listener 返回空结果
204+
for _, task := range taskMap { // 没找到监听器,返回空结果
205+
task.Result <- &DescribeListenerResult{}
207206
}
208207
})
209208
}

pkg/clb/listener.go

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -62,19 +62,19 @@ func GetListenerByIdOrPort(ctx context.Context, region, lbId string, listenerId
6262
}
6363

6464
func GetListenerByPort(ctx context.Context, region, lbId string, port int64, protocol string) (lis *Listener, err error) {
65-
req := clb.NewDescribeListenersRequest()
66-
req.Port = &port
67-
req.LoadBalancerId = &lbId
68-
req.Protocol = &protocol
69-
client := GetClient(region)
70-
before := time.Now()
71-
resp, err := client.DescribeListenersWithContext(ctx, req)
72-
LogAPI(ctx, "DescribeListeners", req, resp, time.Since(before), err)
65+
res, _, err := ApiCall(ctx, "DescribeListeners", region, func(ctx context.Context, client *clb.Client) (req *clb.DescribeListenersRequest, res *clb.DescribeListenersResponse, err error) {
66+
req = clb.NewDescribeListenersRequest()
67+
req.Port = &port
68+
req.LoadBalancerId = &lbId
69+
req.Protocol = &protocol
70+
res, err = client.DescribeListenersWithContext(ctx, req)
71+
return
72+
})
7373
if err != nil {
7474
return nil, errors.WithStack(err)
7575
}
76-
if len(resp.Response.Listeners) > 0 { // TODO: 精细化判断数量(超过1个的不可能发生的情况)
77-
lis = convertListener(resp.Response.Listeners[0])
76+
if len(res.Response.Listeners) > 0 {
77+
lis = convertListener(res.Response.Listeners[0])
7878
return
7979
}
8080
return

0 commit comments

Comments
 (0)