Skip to content

Commit

Permalink
fix: alloc from cache may over the cap limit
Browse files Browse the repository at this point in the history
Signed-off-by: l1b0k <[email protected]>
  • Loading branch information
l1b0k committed Jan 17, 2025
1 parent 8ac4f62 commit c7a3215
Show file tree
Hide file tree
Showing 2 changed files with 207 additions and 33 deletions.
103 changes: 70 additions & 33 deletions pkg/eni/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -404,14 +404,15 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
expectV4 := 0
expectV6 := 0

var ipv4, ipv6 *IP
if l.enableIPv4 {
if localIPRequest.NoCache {
if len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
}
expectV4 = 1
} else {
ipv4 := l.ipv4.PeekAvailable(cni.PodID)
ipv4 = l.ipv4.PeekAvailable(cni.PodID)
if ipv4 == nil && len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
} else if ipv4 == nil {
Expand All @@ -427,7 +428,7 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
}
expectV6 = 1
} else {
ipv6 := l.ipv6.PeekAvailable(cni.PodID)
ipv6 = l.ipv6.PeekAvailable(cni.PodID)
if ipv6 == nil && len(l.ipv6)+l.allocatingV6.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
} else if ipv6 == nil {
Expand All @@ -441,6 +442,30 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
return nil, []Trace{{Condition: InsufficientVSwitchIP, Reason: fmt.Sprintf("alloc inhibit, expire at %s", l.ipAllocInhibitExpireAt.String())}}
}

ok1 := l.enableIPv4 && ipv4 != nil || !l.enableIPv4
ok2 := l.enableIPv6 && ipv6 != nil || !l.enableIPv6

if ok1 && ok2 {
// direct return
respCh := make(chan *AllocResp)
// assign ip to pod , as we are ready
// this must be protected by local
if ipv4 != nil {
ipv4.Allocate(cni.PodID)
}
if ipv6 != nil {
ipv6.Allocate(cni.PodID)
}

go func() {
l.cond.L.Lock()
defer l.cond.L.Unlock()

l.commit(ctx, respCh, ipv4, ipv6, cni.PodID)
}()
return respCh, nil
}

for i := 0; i < expectV4; i++ {
l.allocatingV4 = append(l.allocatingV4, localIPRequest)
}
Expand Down Expand Up @@ -590,10 +615,7 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
return
default:
}

resp := &AllocResp{}

var ip types.IPSet2

var ipv4, ipv6 *IP
if l.enableIPv4 {
ipv4 = l.ipv4.PeekAvailable(cni.PodID)
Expand All @@ -602,42 +624,16 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
l.cond.Wait()
continue
}
ip.IPv4 = ipv4.ip
}
if l.enableIPv6 {
ipv6 = l.ipv6.PeekAvailable(cni.PodID)
if ipv6 == nil {
l.cond.Wait()
continue
}
ip.IPv6 = ipv6.ip
}

resp.NetworkConfigs = append(resp.NetworkConfigs, &LocalIPResource{
ENI: *l.eni,
IP: ip,
})

log.Info("allocWorker got ip", "eni", l.eni.ID, "ipv4", ip.IPv4.String(), "ipv6", ip.IPv6.String())

select {
case <-ctx.Done():
continue
case respCh <- resp:
// mark the ip as allocated
if ipv4 != nil {
ipv4.Allocate(cni.PodID)
if cni.PodID != "" {
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv4)).Dec()
}
}
if ipv6 != nil {
ipv6.Allocate(cni.PodID)
if cni.PodID != "" {
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv6)).Dec()
}
}
}
l.commit(ctx, respCh, ipv4, ipv6, cni.PodID)

return
}
Expand Down Expand Up @@ -1045,6 +1041,47 @@ func (l *Local) Status() Status {
return s
}

// commit send the allocated ip result to respCh
// if ctx canceled, the respCh will be closed
func (l *Local) commit(ctx context.Context, respCh chan *AllocResp, ipv4, ipv6 *IP, podID string) {
var ip types.IPSet2
if ipv4 != nil {
ip.IPv4 = ipv4.ip
ipv4.Allocate(podID)
if podID != "" {
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv4)).Dec()
}
}
if ipv6 != nil {
ip.IPv6 = ipv6.ip
ipv6.Allocate(podID)
if podID != "" {
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv6)).Dec()
}
}
resp := &AllocResp{}
resp.NetworkConfigs = append(resp.NetworkConfigs, &LocalIPResource{
ENI: *l.eni,
IP: ip,
})
select {
case <-ctx.Done():
if ipv4 != nil {
ipv4.Release(podID)
}
if ipv6 != nil {
ipv6.Release(podID)
}

// parent cancel the context, so close the ch
close(respCh)

return
case respCh <- resp:
logr.FromContextOrDiscard(ctx).Info("allocWorker got ip", "eni", l.eni.ID, "ipv4", ip.IPv4.String(), "ipv6", ip.IPv6.String())
}
}

// syncIPLocked will mark ip as invalid , if not found in remote
func syncIPLocked(lo Set, remote []netip.Addr) {
s := sets.New[netip.Addr](remote...)
Expand Down
137 changes: 137 additions & 0 deletions pkg/eni/local_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
"golang.org/x/time/rate"
"k8s.io/apimachinery/pkg/util/cache"
"k8s.io/apimachinery/pkg/util/sets"
Expand Down Expand Up @@ -571,3 +572,139 @@ func TestAllocFromFactory(t *testing.T) {
assert.Equal(t, req1, local.allocatingV4[0])
assert.Equal(t, req1, local.allocatingV6[0])
}

func Test_factoryDisposeWorker_unAssignIP(t *testing.T) {
f := factorymocks.NewFactory(t)
// even we have two jobs ,we only get one ip
f.On("UnAssignNIPv4", "eni-1", []netip.Addr{netip.MustParseAddr("192.0.2.1")}, mock.Anything).Return(nil).Once()
f.On("UnAssignNIPv6", "eni-1", []netip.Addr{netip.MustParseAddr("fd00::1")}, mock.Anything).Return(nil).Once()

local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{
EnableIPv4: true,
EnableIPv6: true,
BatchSize: 10,
}, "")
local.status = statusInUse

local.ipv4.Add(&IP{
ip: netip.MustParseAddr("192.0.2.1"),
primary: false,
podID: "",
status: ipStatusDeleting,
})

local.ipv4.Add(&IP{
ip: netip.MustParseAddr("192.0.2.2"),
primary: false,
podID: "",
status: ipStatusValid,
})

local.ipv6.Add(&IP{
ip: netip.MustParseAddr("fd00::1"),
primary: false,
podID: "",
status: ipStatusDeleting,
})

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go local.factoryDisposeWorker(ctx)

time.Sleep(1 * time.Second)
assert.Len(t, local.ipv4, 1)
assert.Len(t, local.ipv6, 0)
}

func Test_factoryDisposeWorker_releaseIP(t *testing.T) {
f := factorymocks.NewFactory(t)
// even we have two jobs ,we only get one ip
f.On("DeleteNetworkInterface", "eni-1").Return(nil).Once()

local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{
EnableIPv4: true,
EnableIPv6: true,
BatchSize: 10,
}, "")
local.status = statusDeleting

ctx, cancel := context.WithCancel(context.Background())
defer cancel()
go local.factoryDisposeWorker(ctx)

time.Sleep(1 * time.Second)
assert.Nil(t, local.eni)
}

func Test_commit_responsed(t *testing.T) {
f := factorymocks.NewFactory(t)
local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{
EnableIPv4: true,
EnableIPv6: true,
BatchSize: 10,
}, "")
local.status = statusInUse

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

respCh := make(chan *AllocResp)
ipv4 := &IP{
ip: netip.MustParseAddr("127.0.0.1"),
primary: false,
podID: "",
status: ipStatusValid,
}
ipv6 := &IP{
ip: netip.MustParseAddr("fd00::1"),
primary: false,
podID: "",
status: ipStatusValid,
}
wg := sync.WaitGroup{}
wg.Add(1)
go func() {
defer wg.Done()

<-respCh
}()

local.commit(ctx, respCh, ipv4, ipv6, "foo")

wg.Wait()

assert.Equal(t, "foo", ipv4.podID)
assert.Equal(t, "foo", ipv6.podID)
}

func Test_commit_canceled(t *testing.T) {
f := factorymocks.NewFactory(t)
local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{
EnableIPv4: true,
EnableIPv6: true,
BatchSize: 10,
}, "")
local.status = statusInUse

ctx, cancel := context.WithCancel(context.Background())
cancel()

respCh := make(chan *AllocResp)
ipv4 := &IP{
ip: netip.MustParseAddr("127.0.0.1"),
primary: false,
podID: "foo",
status: ipStatusValid,
}
ipv6 := &IP{
ip: netip.MustParseAddr("fd00::1"),
primary: false,
podID: "foo",
status: ipStatusValid,
}

local.commit(ctx, respCh, ipv4, ipv6, "foo")

assert.Equal(t, "", ipv4.podID)
assert.Equal(t, "", ipv6.podID)
}

0 comments on commit c7a3215

Please sign in to comment.