Skip to content

Commit

Permalink
api: optimize the backoff for wait eni
Browse files Browse the repository at this point in the history
Signed-off-by: l1b0k <[email protected]>
  • Loading branch information
l1b0k committed Jan 22, 2024
1 parent 8d886de commit 786f792
Show file tree
Hide file tree
Showing 9 changed files with 174 additions and 60 deletions.
74 changes: 53 additions & 21 deletions pkg/aliyun/client/ecs.go
Original file line number Diff line number Diff line change
Expand Up @@ -239,35 +239,67 @@ func (a *OpenAPI) DeleteNetworkInterface(ctx context.Context, eniID string) erro
}

// WaitForNetworkInterface wait status of eni
func (a *OpenAPI) WaitForNetworkInterface(ctx context.Context, eniID string, status string, backoff wait.Backoff, ignoreNotExist bool) (*NetworkInterface, error) {
func (a *OpenAPI) WaitForNetworkInterface(ctx context.Context, eniID string, status string, ignoreNotExist bool) (*NetworkInterface, error) {
var eniInfo *NetworkInterface
if eniID == "" {
return nil, fmt.Errorf("eniID not set")
}
err := wait.ExponentialBackoff(backoff,
func() (done bool, err error) {
eni, err := a.DescribeNetworkInterface(ctx, "", []string{eniID}, "", "", "", nil)
if err != nil {
return false, nil

l := logf.FromContext(ctx).WithValues(
LogFieldAPI, "WaitForNetworkInterface",
LogFieldENIID, eniID,
"status", status,
)

i := 0
conditionFunc := func(ctx context.Context) (done bool, throttle bool, err error) {
eni, err := a.DescribeNetworkInterface(ctx, "", []string{eniID}, "", "", "", nil)
if err != nil {
if apiErr.ErrAssert(apiErr.ErrThrottling, err) {
return false, true, nil
}
if len(eni) == 0 && ignoreNotExist {
return true, apiErr.ErrNotFound
return false, false, nil
}
if len(eni) == 0 && ignoreNotExist {
return true, false, apiErr.ErrNotFound
}
if len(eni) == 1 {
if string(status) != "" && status != eni[0].Status {
return false, false, nil
}
if len(eni) == 1 {
if string(status) != "" && status != eni[0].Status {
return false, nil
}

eniInfo = eni[0]
return true, nil
}
return false, nil
},
)
if err != nil {
return nil, fmt.Errorf("error wait for eni %v to status %s, %w", eniID, status, err)
eniInfo = eni[0]
return true, false, nil
}

i++

return false, false, nil
}

waitSequence := []time.Duration{3 * time.Second, 1 * time.Second, 1 * time.Second, 5 * time.Second, 10 * time.Second, 20 * time.Second, 30 * time.Second}
defer func() {
if i >= 1 {
l.Info("slow api, wait for eni status", "count", i)
}
}()

throttleFunc := func(now time.Duration) time.Duration {
if now < 10*time.Second {
return 10 * time.Second
}
return now
}
bo := backoff.NewBackoffManager(waitSequence, conditionFunc, throttleFunc)
for {
done, err := bo.Next(ctx)
if err != nil {
return nil, fmt.Errorf("wait for eni %v to status %s failed, count %d", eniID, status, i)
}
if done {
return eniInfo, nil
}
}
return eniInfo, nil
}

// AssignPrivateIPAddress assign secondary ip
Expand Down
3 changes: 1 addition & 2 deletions pkg/aliyun/client/fake/fake_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ import (
"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
"github.com/aliyun/alibaba-cloud-sdk-go/services/vpc"
"k8s.io/apimachinery/pkg/util/uuid"
"k8s.io/apimachinery/pkg/util/wait"

"github.com/AliyunContainerService/terway/pkg/aliyun/client"
apiErr "github.com/AliyunContainerService/terway/pkg/aliyun/client/errors"
Expand Down Expand Up @@ -174,7 +173,7 @@ func (o *OpenAPI) DeleteNetworkInterface(ctx context.Context, eniID string) erro
return nil
}

func (o *OpenAPI) WaitForNetworkInterface(ctx context.Context, eniID string, status string, backoff wait.Backoff, ignoreNotExist bool) (*client.NetworkInterface, error) {
func (o *OpenAPI) WaitForNetworkInterface(ctx context.Context, eniID string, status string, ignoreNotExist bool) (*client.NetworkInterface, error) {
eni, err := o.DescribeNetworkInterface(ctx, "", []string{eniID}, "", "", "", nil)
if errors.Is(err, apiErr.ErrNotFound) && ignoreNotExist {
return nil, nil
Expand Down
3 changes: 1 addition & 2 deletions pkg/aliyun/client/interface_default.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"net/netip"

"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
"k8s.io/apimachinery/pkg/util/wait"
)

type ENI interface {
Expand All @@ -16,7 +15,7 @@ type ENI interface {
AttachNetworkInterface(ctx context.Context, eniID, instanceID, trunkENIID string) error
DetachNetworkInterface(ctx context.Context, eniID, instanceID, trunkENIID string) error
DeleteNetworkInterface(ctx context.Context, eniID string) error
WaitForNetworkInterface(ctx context.Context, eniID string, status string, backoff wait.Backoff, ignoreNotExist bool) (*NetworkInterface, error)
WaitForNetworkInterface(ctx context.Context, eniID string, status string, ignoreNotExist bool) (*NetworkInterface, error)
AssignPrivateIPAddress(ctx context.Context, eniID string, count int, idempotent string) ([]netip.Addr, error)
UnAssignPrivateIPAddresses(ctx context.Context, eniID string, ips []netip.Addr) error
AssignIpv6Addresses(ctx context.Context, eniID string, count int, idempotentKey string) ([]netip.Addr, error)
Expand Down
54 changes: 48 additions & 6 deletions pkg/backoff/backoff.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package backoff

import (
"context"
"errors"
"time"

"k8s.io/apimachinery/pkg/util/wait"
Expand Down Expand Up @@ -51,12 +53,6 @@ var backoffMap = map[string]wait.Backoff{
Jitter: 1.3,
Steps: 6,
},
WaitENIStatus: {
Duration: time.Second * 5,
Factor: 1.5,
Jitter: 0.5,
Steps: 8,
},
WaitPodENIStatus: {
Duration: time.Second * 5,
Factor: 2,
Expand Down Expand Up @@ -96,3 +92,49 @@ func Backoff(key string) wait.Backoff {
}
return b
}

type Manager struct {
sequence []time.Duration // backoff sequence
throttleFunc func(now time.Duration) time.Duration // throttle function, when happened, adjust the backoff time

conditionFunc func(ctx context.Context) (done bool, throttle bool, err error)
}

var ErrTimeout = errors.New("timeout")

func NewBackoffManager(sequence []time.Duration, conditionFunc func(ctx context.Context) (done bool, throttle bool, err error), throttleFunc func(now time.Duration) time.Duration) *Manager {
return &Manager{
sequence: sequence,
throttleFunc: throttleFunc,
conditionFunc: conditionFunc,
}
}

func (b *Manager) Next(ctx context.Context) (bool, error) {
if len(b.sequence) == 0 {
return false, ErrTimeout
}

inner, cancel := context.WithTimeout(ctx, b.sequence[0])
<-inner.Done()
cancel()
if ctx.Err() != nil {
return false, ctx.Err()
}

done, throttle, err := b.conditionFunc(ctx)
if err != nil {
return false, err
}
if done {
return true, nil
}

b.sequence = b.sequence[1:]

if throttle && len(b.sequence) > 0 && b.throttleFunc != nil {
b.sequence[0] = b.throttleFunc(b.sequence[0])
}

return false, nil
}
59 changes: 59 additions & 0 deletions pkg/backoff/backoff_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package backoff

import (
"context"
"errors"
"fmt"
"testing"
"time"
)

func TestBackoffManagerNextWithEmptySequence(t *testing.T) {
b := NewBackoffManager([]time.Duration{}, nil, nil)
_, err := b.Next(context.Background())
if err == nil || err.Error() != "timeout" {
t.Errorf("expected timeout error, got %v", err)
}
}

func TestBackoffManagerNextWithCancelledContext(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
cancel()
b := NewBackoffManager([]time.Duration{time.Second}, nil, nil)
_, err := b.Next(ctx)
if err == nil || !errors.Is(err, context.Canceled) {
t.Errorf("expected context canceled error, got %v", err)
}
}

func TestBackoffManagerNextWithConditionFuncError(t *testing.T) {
b := NewBackoffManager([]time.Duration{time.Second}, func(ctx context.Context) (bool, bool, error) {
return false, false, fmt.Errorf("condition error")
}, nil)
_, err := b.Next(context.Background())
if err == nil || err.Error() != "condition error" {
t.Errorf("expected condition error, got %v", err)
}
}

func TestBackoffManagerNextWithConditionFuncDone(t *testing.T) {
b := NewBackoffManager([]time.Duration{time.Second}, func(ctx context.Context) (bool, bool, error) {
return true, false, nil
}, nil)
_, err := b.Next(context.Background())
if err != nil {
t.Errorf("expected no error, got %v", err)
}
}

func TestBackoffManagerNextWithConditionFuncThrottle(t *testing.T) {
b := NewBackoffManager([]time.Duration{time.Second, 2 * time.Second}, func(ctx context.Context) (bool, bool, error) {
return false, true, nil
}, func(now time.Duration) time.Duration {
return 3 * time.Second
})
b.Next(context.Background())
if b.sequence[0] != 3*time.Second {
t.Errorf("expected first sequence to be 3s, got %v", b.sequence[0])
}
}
7 changes: 3 additions & 4 deletions pkg/controller/delegate/deleg.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (

"github.com/aliyun/alibaba-cloud-sdk-go/services/ecs"
"github.com/aliyun/alibaba-cloud-sdk-go/services/vpc"
"k8s.io/apimachinery/pkg/util/wait"
"sigs.k8s.io/controller-runtime/pkg/log"
)

Expand Down Expand Up @@ -92,7 +91,7 @@ func (d *Delegate) DetachNetworkInterface(ctx context.Context, eniID, instanceID
return c.DetachNetworkInterface(ctx, eniID, instanceID, trunkENIID)
}

func (d *Delegate) WaitForNetworkInterface(ctx context.Context, eniID string, status string, backoff wait.Backoff, ignoreNotExist bool) (*aliyunClient.NetworkInterface, error) {
func (d *Delegate) WaitForNetworkInterface(ctx context.Context, eniID string, status string, ignoreNotExist bool) (*aliyunClient.NetworkInterface, error) {
nodeName := common.NodeNameFromCtx(ctx)
l := log.FromContext(ctx)
l.Info("WaitForNetworkInterface", "nodeName", nodeName)
Expand All @@ -103,7 +102,7 @@ func (d *Delegate) WaitForNetworkInterface(ctx context.Context, eniID string, st
if err != nil {
return nil, err
}
return realClient.WaitForNetworkInterface(ctx, eniID, status, backoff, ignoreNotExist)
return realClient.WaitForNetworkInterface(ctx, eniID, status, ignoreNotExist)
}
if err != nil {
return nil, err
Expand All @@ -112,7 +111,7 @@ func (d *Delegate) WaitForNetworkInterface(ctx context.Context, eniID string, st
if err != nil {
return nil, err
}
return c.WaitForNetworkInterface(ctx, eniID, status, backoff, ignoreNotExist)
return c.WaitForNetworkInterface(ctx, eniID, status, ignoreNotExist)
}

func (d *Delegate) DescribeVSwitchByID(ctx context.Context, vSwitchID string) (*vpc.VSwitch, error) {
Expand Down
5 changes: 2 additions & 3 deletions pkg/controller/pod-eni/eni_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
aliyunClient "github.com/AliyunContainerService/terway/pkg/aliyun/client"
apiErr "github.com/AliyunContainerService/terway/pkg/aliyun/client/errors"
"github.com/AliyunContainerService/terway/pkg/apis/network.alibabacloud.com/v1beta1"
"github.com/AliyunContainerService/terway/pkg/backoff"
register "github.com/AliyunContainerService/terway/pkg/controller"
"github.com/AliyunContainerService/terway/pkg/controller/common"
"github.com/AliyunContainerService/terway/pkg/utils"
Expand Down Expand Up @@ -721,7 +720,7 @@ func (m *ReconcilePodENI) attachENI(ctx context.Context, podENI *v1beta1.PodENI)
return err
}

eni, err := m.aliyun.WaitForNetworkInterface(ctx, alloc.ENI.ID, aliyunClient.ENIStatusInUse, backoff.Backoff(backoff.WaitENIStatus), false)
eni, err := m.aliyun.WaitForNetworkInterface(ctx, alloc.ENI.ID, aliyunClient.ENIStatusInUse, false)
if err != nil {
return err
}
Expand Down Expand Up @@ -778,7 +777,7 @@ func (m *ReconcilePodENI) detachMemberENI(ctx context.Context, podENI *v1beta1.P
return err
}

_, err = m.aliyun.WaitForNetworkInterface(ctx, alloc.ENI.ID, aliyunClient.ENIStatusAvailable, backoff.Backoff(backoff.WaitENIStatus), true)
_, err = m.aliyun.WaitForNetworkInterface(ctx, alloc.ENI.ID, aliyunClient.ENIStatusAvailable, true)
if err == nil {
continue
}
Expand Down
14 changes: 4 additions & 10 deletions pkg/controller/pool/eni_mgr.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

aliyunClient "github.com/AliyunContainerService/terway/pkg/aliyun/client"
apiErr "github.com/AliyunContainerService/terway/pkg/aliyun/client/errors"
"github.com/AliyunContainerService/terway/pkg/backoff"
register "github.com/AliyunContainerService/terway/pkg/controller"
"github.com/AliyunContainerService/terway/pkg/controller/common"
"github.com/AliyunContainerService/terway/pkg/controller/vswitch"
Expand Down Expand Up @@ -138,8 +137,6 @@ func (m *Manager) CreateNetworkInterface(ctx context.Context, trunk, erdma bool,
return nil, err
}

time.Sleep(backoff.Backoff(backoff.WaitENIStatus).Duration)

err = m.aliyun.DeleteNetworkInterface(ctx, cached.NetworkInterfaceID)
if err != nil {
l.Error(err, "rob eni, delete failed")
Expand Down Expand Up @@ -240,19 +237,18 @@ func (m *Manager) DescribeNetworkInterface(ctx context.Context, vpcID string, en
return m.aliyun.DescribeNetworkInterface(ctx, vpcID, eniID, instanceID, instanceType, status, tags)
}

func (m *Manager) WaitForNetworkInterface(ctx context.Context, eniID string, status string, bo wait.Backoff, ignoreNotExist bool) (*aliyunClient.NetworkInterface, error) {
func (m *Manager) WaitForNetworkInterface(ctx context.Context, eniID string, status string, ignoreNotExist bool) (*aliyunClient.NetworkInterface, error) {
alloc := m.allocations.Load(eniID)
if alloc != nil && alloc.AllocType == AllocPolicyPreferPool {
return alloc.GetNetworkInterface(), nil
}

time.Sleep(bo.Duration)
realClient, _, err := common.Became(ctx, m.aliyun)
if err != nil {
return nil, err
}

return realClient.WaitForNetworkInterface(ctx, eniID, status, bo, ignoreNotExist)
return realClient.WaitForNetworkInterface(ctx, eniID, status, ignoreNotExist)
}

func (m *Manager) ModifyNetworkInterfaceAttribute(ctx context.Context, eniID string, securityGroupIDs []string) error {
Expand Down Expand Up @@ -386,19 +382,17 @@ func attachNetworkInterface(ctx context.Context, api register.Interface, eniID,
if err != nil {
return nil, err
}
time.Sleep(backoff.Backoff(backoff.WaitENIStatus).Duration)

return api.WaitForNetworkInterface(ctx, eniID, aliyunClient.ENIStatusInUse, backoff.Backoff(backoff.WaitENIStatus), false)
return api.WaitForNetworkInterface(ctx, eniID, aliyunClient.ENIStatusInUse, false)
}

func detachNetworkInterface(ctx context.Context, api register.Interface, eniID, instanceID, trunkENIID string) error {
err := api.DetachNetworkInterface(ctx, eniID, instanceID, trunkENIID)
if err != nil {
return err
}
time.Sleep(backoff.Backoff(backoff.WaitENIStatus).Duration)

_, err = api.WaitForNetworkInterface(ctx, eniID, aliyunClient.ENIStatusAvailable, backoff.Backoff(backoff.WaitENIStatus), true)
_, err = api.WaitForNetworkInterface(ctx, eniID, aliyunClient.ENIStatusAvailable, true)
if err == nil || errors.Is(err, apiErr.ErrNotFound) {
return nil
}
Expand Down
Loading

0 comments on commit 786f792

Please sign in to comment.