Skip to content

Commit

Permalink
Merge pull request AliyunContainerService#768 from l1b0k/fix/alloc
Browse files Browse the repository at this point in the history
fix: alloc problem
  • Loading branch information
BSWANG authored Jan 17, 2025
2 parents 2030ff5 + b58f4ff commit 1947216
Show file tree
Hide file tree
Showing 3 changed files with 277 additions and 33 deletions.
148 changes: 115 additions & 33 deletions pkg/eni/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -400,19 +400,19 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
}

log := logr.FromContextOrDiscard(ctx)
log.Info(fmt.Sprintf("local request %v", localIPRequest))

expectV4 := 0
expectV6 := 0

var ipv4, ipv6 *IP
if l.enableIPv4 {
if localIPRequest.NoCache {
if len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
}
expectV4 = 1
} else {
ipv4 := l.ipv4.PeekAvailable(cni.PodID)
ipv4 = l.ipv4.PeekAvailable(cni.PodID)
if ipv4 == nil && len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
} else if ipv4 == nil {
Expand All @@ -428,7 +428,7 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
}
expectV6 = 1
} else {
ipv6 := l.ipv6.PeekAvailable(cni.PodID)
ipv6 = l.ipv6.PeekAvailable(cni.PodID)
if ipv6 == nil && len(l.ipv6)+l.allocatingV6.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
} else if ipv6 == nil {
Expand All @@ -442,6 +442,30 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
return nil, []Trace{{Condition: InsufficientVSwitchIP, Reason: fmt.Sprintf("alloc inhibit, expire at %s", l.ipAllocInhibitExpireAt.String())}}
}

ok1 := l.enableIPv4 && ipv4 != nil || !l.enableIPv4
ok2 := l.enableIPv6 && ipv6 != nil || !l.enableIPv6

if ok1 && ok2 {
// direct return
respCh := make(chan *AllocResp)
// assign ip to pod , as we are ready
// this must be protected by lock
if ipv4 != nil {
ipv4.Allocate(cni.PodID)
}
if ipv6 != nil {
ipv6.Allocate(cni.PodID)
}

go func() {
l.cond.L.Lock()
defer l.cond.L.Unlock()

l.commit(ctx, respCh, ipv4, ipv6, cni.PodID)
}()
return respCh, nil
}

for i := 0; i < expectV4; i++ {
l.allocatingV4 = append(l.allocatingV4, localIPRequest)
}
Expand All @@ -455,6 +479,11 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR

go l.allocWorker(ctx, cni, localIPRequest, respCh)

if l.eni == nil {
log.Info("local request", "eni", "", "req", localIPRequest)
} else {
log.Info("local request", "eni", l.eni.ID, "req", localIPRequest)
}
return respCh, nil
}

Expand Down Expand Up @@ -528,6 +557,8 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
l.cond.L.Lock()
defer l.cond.L.Unlock()

defer l.cond.Broadcast()

defer func() {
if request == nil {
return
Expand All @@ -550,6 +581,32 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
}()

log := logr.FromContextOrDiscard(ctx)
if request != nil && request.NoCache {
// as we want to do preheat, so this ip will not be consumed
// so just hang there , let this ctx done

for {
select {
case <-request.workerCtx.Done():
// work ctx finished (factory cancel it)

select {
case <-ctx.Done():
close(respCh)
case respCh <- &AllocResp{}:
}

return
case <-ctx.Done():
// parent cancel the context, so close the ch
close(respCh)
return
default:
}
l.cond.Wait()
}
}

for {
select {
case <-ctx.Done():
Expand All @@ -559,52 +616,24 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
default:
}

resp := &AllocResp{}

var ip types.IPSet2
var ipv4, ipv6 *IP
if l.enableIPv4 {
ipv4 = l.ipv4.PeekAvailable(cni.PodID)
if ipv4 == nil {
log.Info("waiting ipv4")
l.cond.Wait()
continue
}
ip.IPv4 = ipv4.ip
}
if l.enableIPv6 {
ipv6 = l.ipv6.PeekAvailable(cni.PodID)
if ipv6 == nil {
l.cond.Wait()
continue
}
ip.IPv6 = ipv6.ip
}

resp.NetworkConfigs = append(resp.NetworkConfigs, &LocalIPResource{
ENI: *l.eni,
IP: ip,
})

log.Info("allocWorker got ip", "eni", l.eni.ID, "ipv4", ip.IPv4.String(), "ipv6", ip.IPv6.String())

select {
case <-ctx.Done():
continue
case respCh <- resp:
// mark the ip as allocated
if ipv4 != nil {
ipv4.Allocate(cni.PodID)
if cni.PodID != "" {
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv4)).Dec()
}
}
if ipv6 != nil {
ipv6.Allocate(cni.PodID)
if cni.PodID != "" {
metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv6)).Dec()
}
}
}
l.commit(ctx, respCh, ipv4, ipv6, cni.PodID)

return
}
Expand Down Expand Up @@ -1012,6 +1041,47 @@ func (l *Local) Status() Status {
return s
}

// commit send the allocated ip result to respCh
// if ctx canceled, the respCh will be closed
//
// commit marks ipv4/ipv6 (either may be nil) as allocated to podID, builds the
// AllocResp carrying the ENI and IP set, and delivers it on respCh. On ctx
// cancellation the allocation is rolled back via Release and respCh is closed
// instead of receiving a result.
//
// NOTE(review): callers appear to invoke this while holding l.cond.L (see the
// goroutine in Allocate and the call in allocWorker) — confirm that locking
// contract; this function itself takes no lock. It also dereferences l.eni
// unconditionally, so l.eni must be non-nil by the time commit runs.
func (l *Local) commit(ctx context.Context, respCh chan *AllocResp, ipv4, ipv6 *IP, podID string) {
	var ip types.IPSet2
	if ipv4 != nil {
		ip.IPv4 = ipv4.ip
		// Bind the IPv4 address to the pod before attempting delivery.
		ipv4.Allocate(podID)
		if podID != "" {
			// Only real pod allocations shrink the idle-pool gauge;
			// empty podID is presumably a preheat/warm-up path — confirm.
			metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv4)).Dec()
		}
	}
	if ipv6 != nil {
		ip.IPv6 = ipv6.ip
		ipv6.Allocate(podID)
		if podID != "" {
			metric.ResourcePoolIdle.WithLabelValues(metric.ResourcePoolTypeLocal, string(types.IPStackIPv6)).Dec()
		}
	}
	resp := &AllocResp{}
	resp.NetworkConfigs = append(resp.NetworkConfigs, &LocalIPResource{
		ENI: *l.eni,
		IP:  ip,
	})
	select {
	case <-ctx.Done():
		// Caller went away: undo the Allocate calls above so the
		// addresses return to the pool rather than leaking.
		if ipv4 != nil {
			ipv4.Release(podID)
		}
		if ipv6 != nil {
			ipv6.Release(podID)
		}

		// parent cancel the context, so close the ch
		close(respCh)

		return
	case respCh <- resp:
		logr.FromContextOrDiscard(ctx).Info("allocWorker got ip", "eni", l.eni.ID, "ipv4", ip.IPv4.String(), "ipv6", ip.IPv6.String())
	}
}

// syncIPLocked will mark ip as invalid , if not found in remote
func syncIPLocked(lo Set, remote []netip.Addr) {
s := sets.New[netip.Addr](remote...)
Expand Down Expand Up @@ -1128,12 +1198,24 @@ func (l *Local) popNIPv4Jobs(count int) {
firstPart, secondPart := Split(l.allocatingV4, count)
l.dangingV4 = append(l.dangingV4, firstPart...)
l.allocatingV4 = secondPart

lo.ForEach(l.dangingV4, func(item *LocalIPRequest, index int) {
if item.NoCache {
item.cancel()
}
})
}

// popNIPv6Jobs moves up to count pending IPv6 allocation requests from the
// allocating queue onto the danging queue, then cancels every queued NoCache
// request so its waiting worker can observe the cancellation and exit.
func (l *Local) popNIPv6Jobs(count int) {
	popped, remaining := Split(l.allocatingV6, count)
	l.allocatingV6 = remaining
	l.dangingV6 = append(l.dangingV6, popped...)

	for _, req := range l.dangingV6 {
		if req.NoCache {
			req.cancel()
		}
	}
}

func Split[T any](arr []T, index int) ([]T, []T) {
Expand Down
Loading

0 comments on commit 1947216

Please sign in to comment.