diff --git a/pkg/eni/local.go b/pkg/eni/local.go index 85dcd7e5..dc00c67b 100644 --- a/pkg/eni/local.go +++ b/pkg/eni/local.go @@ -404,6 +404,7 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR expectV4 := 0 expectV6 := 0 + var ipv4, ipv6 *IP if l.enableIPv4 { if localIPRequest.NoCache { if len(l.ipv4)+l.allocatingV4.Len() >= l.cap { @@ -411,7 +412,7 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR } expectV4 = 1 } else { - ipv4 := l.ipv4.PeekAvailable(cni.PodID) + ipv4 = l.ipv4.PeekAvailable(cni.PodID) if ipv4 == nil && len(l.ipv4)+l.allocatingV4.Len() >= l.cap { return nil, []Trace{{Condition: Full}} } else if ipv4 == nil { @@ -427,7 +428,7 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR } expectV6 = 1 } else { - ipv6 := l.ipv6.PeekAvailable(cni.PodID) + ipv6 = l.ipv6.PeekAvailable(cni.PodID) if ipv6 == nil && len(l.ipv6)+l.allocatingV6.Len() >= l.cap { return nil, []Trace{{Condition: Full}} } else if ipv6 == nil { @@ -441,6 +442,30 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR return nil, []Trace{{Condition: InsufficientVSwitchIP, Reason: fmt.Sprintf("alloc inhibit, expire at %s", l.ipAllocInhibitExpireAt.String())}} } + ok1 := l.enableIPv4 && ipv4 != nil || !l.enableIPv4 + ok2 := l.enableIPv6 && ipv6 != nil || !l.enableIPv6 + + if ok1 && ok2 { + // direct return + respCh := make(chan *AllocResp) + // assign ip to pod , as we are ready + // this must be protected by lock + if ipv4 != nil { + ipv4.Allocate(cni.PodID) + } + if ipv6 != nil { + ipv6.Allocate(cni.PodID) + } + + go func() { + l.cond.L.Lock() + defer l.cond.L.Unlock() + + l.commit(ctx, respCh, ipv4, ipv6, cni.PodID) + }() + return respCh, nil + } + for i := 0; i < expectV4; i++ { l.allocatingV4 = append(l.allocatingV4, localIPRequest) } @@ -591,9 +616,6 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local default: } - resp := &AllocResp{} - - var ip types.IPSet2 var ipv4, ipv6 *IP if l.enableIPv4 { ipv4 = l.ipv4.PeekAvailable(cni.PodID) @@ -602,7 +624,6 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local l.cond.Wait() continue } - ip.IPv4 = ipv4.ip } if l.enableIPv6 { ipv6 = l.ipv6.PeekAvailable(cni.PodID) @@ -610,34 +631,9 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local l.cond.Wait() continue } - ip.IPv6 = ipv6.ip } - resp.NetworkConfigs = append(resp.NetworkConfigs, &LocalIPResource{ - ENI: *l.eni, - IP: ip, - }) - - log.Info("allocWorker got ip", "eni", l.eni.ID, "ipv4", ip.IPv4.String(), "ipv6", ip.IPv6.String()) - - select { - case <-ctx.Done(): - continue - case respCh <- resp: - // mark the ip as allocated - if ipv4 != nil { - ipv4.Allocate(cni.PodID) - if cni.PodID != "" { - metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv4)).Dec() - } - } - if ipv6 != nil { - ipv6.Allocate(cni.PodID) - if cni.PodID != "" { - metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv6)).Dec() - } - } - } + l.commit(ctx, respCh, ipv4, ipv6, cni.PodID) return } @@ -1045,6 +1041,47 @@ func (l *Local) Status() Status { return s } +// commit send the allocated ip result to respCh +// if ctx canceled, the respCh will be closed +func (l *Local) commit(ctx context.Context, respCh chan *AllocResp, ipv4, ipv6 *IP, podID string) { + var ip types.IPSet2 + if ipv4 != nil { + ip.IPv4 = ipv4.ip + ipv4.Allocate(podID) + if podID != "" { + metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv4)).Dec() + } + } + if ipv6 != nil { + ip.IPv6 = ipv6.ip + ipv6.Allocate(podID) + if podID != "" { + metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv6)).Dec() + } + } + resp := &AllocResp{} + resp.NetworkConfigs = append(resp.NetworkConfigs, &LocalIPResource{ + ENI: *l.eni, + IP: ip, + }) + select { + case <-ctx.Done(): + if ipv4 != nil { + ipv4.Release(podID) + } + if ipv6 != nil { + ipv6.Release(podID) + } + + // parent cancel the context, so close the ch + close(respCh) + + return + case respCh <- resp: + logr.FromContextOrDiscard(ctx).Info("allocWorker got ip", "eni", l.eni.ID, "ipv4", ip.IPv4.String(), "ipv6", ip.IPv6.String()) + } +} + // syncIPLocked will mark ip as invalid , if not found in remote func syncIPLocked(lo Set, remote []netip.Addr) { s := sets.New[netip.Addr](remote...) diff --git a/pkg/eni/local_test.go b/pkg/eni/local_test.go index e0ae7ee3..b8f751f0 100644 --- a/pkg/eni/local_test.go +++ b/pkg/eni/local_test.go @@ -9,6 +9,7 @@ import ( "time" "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" "golang.org/x/time/rate" "k8s.io/apimachinery/pkg/util/cache" "k8s.io/apimachinery/pkg/util/sets" @@ -571,3 +572,139 @@ func TestAllocFromFactory(t *testing.T) { assert.Equal(t, req1, local.allocatingV4[0]) assert.Equal(t, req1, local.allocatingV6[0]) } + +func Test_factoryDisposeWorker_unAssignIP(t *testing.T) { + f := factorymocks.NewFactory(t) + // even we have two jobs ,we only get one ip + f.On("UnAssignNIPv4", "eni-1", []netip.Addr{netip.MustParseAddr("192.0.2.1")}, mock.Anything).Return(nil).Once() + f.On("UnAssignNIPv6", "eni-1", []netip.Addr{netip.MustParseAddr("fd00::1")}, mock.Anything).Return(nil).Once() + + local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{ + EnableIPv4: true, + EnableIPv6: true, + BatchSize: 10, + }, "") + local.status = statusInUse + + local.ipv4.Add(&IP{ + ip: netip.MustParseAddr("192.0.2.1"), + primary: false, + podID: "", + status: ipStatusDeleting, + }) + + local.ipv4.Add(&IP{ + ip: netip.MustParseAddr("192.0.2.2"), + primary: false, + podID: "", + status: ipStatusValid, + }) + + local.ipv6.Add(&IP{ + ip: netip.MustParseAddr("fd00::1"), + primary: false, + podID: "", + status: ipStatusDeleting, + }) + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go local.factoryDisposeWorker(ctx) + + time.Sleep(1 * time.Second) + assert.Len(t, local.ipv4, 1) + assert.Len(t, local.ipv6, 0) +} + +func Test_factoryDisposeWorker_releaseIP(t *testing.T) { + f := factorymocks.NewFactory(t) + // even we have two jobs ,we only get one ip + f.On("DeleteNetworkInterface", "eni-1").Return(nil).Once() + + local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{ + EnableIPv4: true, + EnableIPv6: true, + BatchSize: 10, + }, "") + local.status = statusDeleting + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + go local.factoryDisposeWorker(ctx) + + time.Sleep(1 * time.Second) + assert.Nil(t, local.eni) +} + +func Test_commit_responsed(t *testing.T) { + f := factorymocks.NewFactory(t) + local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{ + EnableIPv4: true, + EnableIPv6: true, + BatchSize: 10, + }, "") + local.status = statusInUse + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + respCh := make(chan *AllocResp) + ipv4 := &IP{ + ip: netip.MustParseAddr("127.0.0.1"), + primary: false, + podID: "", + status: ipStatusValid, + } + ipv6 := &IP{ + ip: netip.MustParseAddr("fd00::1"), + primary: false, + podID: "", + status: ipStatusValid, + } + wg := sync.WaitGroup{} + wg.Add(1) + go func() { + defer wg.Done() + + <-respCh + }() + + local.commit(ctx, respCh, ipv4, ipv6, "foo") + + wg.Wait() + + assert.Equal(t, "foo", ipv4.podID) + assert.Equal(t, "foo", ipv6.podID) +} + +func Test_commit_canceled(t *testing.T) { + f := factorymocks.NewFactory(t) + local := NewLocalTest(&daemon.ENI{ID: "eni-1"}, f, &types.PoolConfig{ + EnableIPv4: true, + EnableIPv6: true, + BatchSize: 10, + }, "") + local.status = statusInUse + + ctx, cancel := context.WithCancel(context.Background()) + cancel() + + respCh := make(chan *AllocResp) + ipv4 := &IP{ + ip: netip.MustParseAddr("127.0.0.1"), + primary: false, + podID: "foo", + status: ipStatusValid, + } + ipv6 := &IP{ + ip: netip.MustParseAddr("fd00::1"), + primary: false, + podID: "foo", + status: ipStatusValid, + } + + local.commit(ctx, respCh, ipv4, ipv6, "foo") + + assert.Equal(t, "", ipv4.podID) + assert.Equal(t, "", ipv6.podID) +}