Skip to content

Commit 6cd70d8

Browse files
committed
support DeleteLoadBalancerListeners (batch delete listeners)
Signed-off-by: roc <[email protected]>
1 parent 436912b commit 6cd70d8

File tree

5 files changed

+98
-4
lines changed

5 files changed

+98
-4
lines changed

internal/controller/clbbinding.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -340,13 +340,13 @@ func (r *CLBBindingReconciler[T]) ensurePortBound(ctx context.Context, backend c
340340
TargetIP: backend.GetIP(),
341341
TargetPort: int64(binding.Port),
342342
}
343-
targetToDelete := []clb.Target{}
343+
targetToDelete := []*clb.Target{}
344344
alreadyAdded := false
345345
for _, target := range targets {
346346
if *target == backendTarget {
347347
alreadyAdded = true
348348
} else {
349-
targetToDelete = append(targetToDelete, *target)
349+
targetToDelete = append(targetToDelete, target)
350350
log.FromContext(ctx).V(10).Info("remove unexpected target", "got", target, "expect", backendTarget)
351351
}
352352
}
@@ -521,7 +521,7 @@ func (r *CLBBindingReconciler[T]) cleanup(ctx context.Context, bd T) (result ctr
521521
status := bd.GetStatus()
522522
for _, binding := range status.PortBindings {
523523
// 解绑 lb
524-
if _, err := clb.DeleteListenerByPort(ctx, binding.Region, binding.LoadbalancerId, int64(binding.LoadbalancerPort), binding.Protocol); err != nil {
524+
if err := clb.DeleteListenerByIdOrPort(ctx, binding.Region, binding.LoadbalancerId, binding.ListenerId, int64(binding.LoadbalancerPort), binding.Protocol); err != nil {
525525
e := errors.Cause(err)
526526
if clb.IsLbIdNotFoundError(e) { // lb 不存在,忽略
527527
continue

pkg/clb/batch-listener.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -195,3 +195,55 @@ func startDescribeListenerProccessor(concurrent int) {
195195
}
196196
})
197197
}
198+
199+
type DeleteListenerTask struct {
200+
Ctx context.Context
201+
Region string
202+
LbId string
203+
ListenerId string
204+
Result chan error
205+
}
206+
207+
func (t *DeleteListenerTask) GetLbId() string {
208+
return t.LbId
209+
}
210+
211+
func (t *DeleteListenerTask) GetRegion() string {
212+
return t.Region
213+
}
214+
215+
var DeleteListenerChan = make(chan *DeleteListenerTask, 100)
216+
217+
func startDeleteListenerProccessor(concurrent int) {
218+
apiName := "DeleteLoadBalancerListeners"
219+
StartBatchProccessor(concurrent, apiName, DeleteListenerChan, func(region, lbId string, tasks []*DeleteListenerTask) {
220+
startTime := time.Now()
221+
defer func() {
222+
clbLog.V(10).Info(fmt.Sprintf("batch proccess %s performance", apiName), "cost", time.Since(startTime).String())
223+
}()
224+
req := clb.NewDeleteLoadBalancerListenersRequest()
225+
req.LoadBalancerId = &lbId
226+
for _, task := range tasks {
227+
req.ListenerIds = append(req.ListenerIds, &task.ListenerId)
228+
}
229+
client := GetClient(region)
230+
resp, err := client.DeleteLoadBalancerListeners(req)
231+
LogAPI(nil, apiName, req, resp, err)
232+
if err != nil {
233+
for _, task := range tasks {
234+
task.Result <- err
235+
}
236+
return
237+
}
238+
_, err = Wait(context.Background(), region, *resp.Response.RequestId, apiName)
239+
if err != nil {
240+
for _, task := range tasks {
241+
task.Result <- err
242+
}
243+
return
244+
}
245+
for _, task := range tasks {
246+
task.Result <- nil
247+
}
248+
})
249+
}

pkg/clb/batch.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@ func init() {
1919
go startDescribeListenerProccessor(concurrency)
2020
go startDescribeTargetsProccessor(concurrency)
2121
go startDeregisterTargetsProccessor(concurrency)
22+
go startDeleteListenerProccessor(concurrency)
2223
}
2324

2425
const (

pkg/clb/listener.go

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -154,6 +154,34 @@ func CreateListener(ctx context.Context, region, lbId string, port, endPort int6
154154
return
155155
}
156156

157+
func DeleteListenerByIdOrPort(ctx context.Context, region, lbId, listenerId string, port int64, protocol string) error {
158+
if listenerId == "" { // 没有监听器 ID,走慢路径
159+
if _, err := DeleteListenerByPort(ctx, region, lbId, port, protocol); err != nil {
160+
return errors.WithStack(err)
161+
}
162+
}
163+
if err := DeleteListenerById(ctx, region, lbId, listenerId); err != nil {
164+
return errors.WithStack(err)
165+
}
166+
return nil
167+
}
168+
169+
func DeleteListenerById(ctx context.Context, region, lbId, listenerId string) error {
170+
task := &DeleteListenerTask{
171+
Ctx: ctx,
172+
Region: region,
173+
LbId: lbId,
174+
ListenerId: listenerId,
175+
Result: make(chan error),
176+
}
177+
DeleteListenerChan <- task
178+
err := <-task.Result
179+
if err != nil {
180+
return errors.WithStack(err)
181+
}
182+
return nil
183+
}
184+
157185
func DeleteListenerByPort(ctx context.Context, region, lbId string, port int64, protocol string) (id string, err error) {
158186
lis, err := GetListenerByPort(ctx, region, lbId, port, protocol)
159187
if err != nil {

pkg/clb/target.go

Lines changed: 14 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -131,7 +131,20 @@ func DeregisterAllTargets(ctx context.Context, region, lbId, listenerId string)
131131
return nil
132132
}
133133

134-
func DeregisterTargetsForListenerTryBatch(ctx context.Context, region, lbId, listenerId string, targets ...Target) error {
134+
func DeregisterTargetsForListenerTryBatch(ctx context.Context, region, lbId, listenerId string, targets ...*Target) error {
135+
task := &DeregisterTargetsTask{
136+
Ctx: ctx,
137+
Region: region,
138+
LbId: lbId,
139+
ListenerId: listenerId,
140+
Targets: targets,
141+
Result: make(chan error),
142+
}
143+
DeregisterTargetsChan <- task
144+
err := <-task.Result
145+
if err != nil {
146+
return errors.WithStack(err)
147+
}
135148
return nil
136149
}
137150

0 commit comments

Comments
 (0)