Skip to content

Commit

Permalink
add the deadline for the allocating requests
Browse files Browse the repository at this point in the history
Signed-off-by: l1b0k <[email protected]>
  • Loading branch information
l1b0k committed Jan 3, 2025
1 parent 8f7c2af commit 898d8da
Show file tree
Hide file tree
Showing 6 changed files with 457 additions and 76 deletions.
4 changes: 2 additions & 2 deletions daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ func (n *networkService) AllocIP(ctx context.Context, r *rpc.AllocIPRequest) (*r
if pod.PodENI {
resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{})
} else {
req := &eni.LocalIPRequest{}
req := eni.NewLocalIPRequest()
if pod.ERdma {
req.LocalIPType = eni.LocalIPTypeERDMA
}
Expand All @@ -213,7 +213,7 @@ func (n *networkService) AllocIP(ctx context.Context, r *rpc.AllocIPRequest) (*r
if pod.PodENI || n.ipamType == types.IPAMTypeCRD {
resourceRequests = append(resourceRequests, &eni.RemoteIPRequest{})
} else {
req := &eni.LocalIPRequest{}
req := eni.NewLocalIPRequest()

if len(oldRes.GetResourceItemByType(daemon.ResourceTypeENI)) == 1 {
old := oldRes.GetResourceItemByType(daemon.ResourceTypeENI)[0]
Expand Down
165 changes: 123 additions & 42 deletions pkg/eni/local.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@ import (
"sync"
"time"

"github.com/go-logr/logr"
"github.com/samber/lo"
"golang.org/x/time/rate"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/cache"
Expand Down Expand Up @@ -51,13 +53,24 @@ var rateLimit = rate.Every(1 * time.Minute / 10)

var _ ResourceRequest = &LocalIPRequest{}

func NewLocalIPRequest() *LocalIPRequest {
ctx, cancel := context.WithCancel(context.Background())
return &LocalIPRequest{
workerCtx: ctx,
cancel: cancel,
}
}

type LocalIPRequest struct {
NetworkInterfaceID string
LocalIPType string
IPv4 netip.Addr
IPv6 netip.Addr

NoCache bool // do not use cached ip

workerCtx context.Context
cancel context.CancelFunc
}

func (l *LocalIPRequest) ResourceType() ResourceType {
Expand Down Expand Up @@ -121,7 +134,9 @@ type Local struct {
batchSize int

cap int
allocatingV4, allocatingV6 int
allocatingV4, allocatingV6 AllocatingRequests
// danging, used for release
dangingV4, dangingV6 AllocatingRequests

eni *daemon.ENI
ipAllocInhibitExpireAt time.Time
Expand Down Expand Up @@ -395,30 +410,30 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
return nil, nil
}

lo, ok := request.(*LocalIPRequest)
localIPRequest, ok := request.(*LocalIPRequest)
if !ok {
return nil, []Trace{{Condition: ResourceTypeMismatch}}
}

if lo.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != lo.NetworkInterfaceID {
if localIPRequest.NetworkInterfaceID != "" && l.eni != nil && l.eni.ID != localIPRequest.NetworkInterfaceID {
return nil, []Trace{{Condition: NetworkInterfaceMismatch}}
}

log := logf.FromContext(ctx)
log.Info(fmt.Sprintf("local request %v", lo))
log := logr.FromContextOrDiscard(ctx)
log.Info(fmt.Sprintf("local request %v", localIPRequest))

expectV4 := 0
expectV6 := 0

if l.enableIPv4 {
if lo.NoCache {
if len(l.ipv4)+l.allocatingV4 >= l.cap {
if localIPRequest.NoCache {
if len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
}
expectV4 = 1
} else {
ipv4 := l.ipv4.PeekAvailable(cni.PodID)
if ipv4 == nil && len(l.ipv4)+l.allocatingV4 >= l.cap {
if ipv4 == nil && len(l.ipv4)+l.allocatingV4.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
} else if ipv4 == nil {
expectV4 = 1
Expand All @@ -427,14 +442,14 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
}

if l.enableIPv6 {
if lo.NoCache {
if len(l.ipv6)+l.allocatingV6 >= l.cap {
if localIPRequest.NoCache {
if len(l.ipv6)+l.allocatingV6.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
}
expectV6 = 1
} else {
ipv6 := l.ipv6.PeekAvailable(cni.PodID)
if ipv6 == nil && len(l.ipv6)+l.allocatingV6 >= l.cap {
if ipv6 == nil && len(l.ipv6)+l.allocatingV6.Len() >= l.cap {
return nil, []Trace{{Condition: Full}}
} else if ipv6 == nil {
expectV6 = 1
Expand All @@ -447,21 +462,18 @@ func (l *Local) Allocate(ctx context.Context, cni *daemon.CNI, request ResourceR
return nil, []Trace{{Condition: InsufficientVSwitchIP, Reason: fmt.Sprintf("alloc inhibit, expire at %s", l.ipAllocInhibitExpireAt.String())}}
}

l.allocatingV4 += expectV4
l.allocatingV6 += expectV6
for i := 0; i < expectV4; i++ {
l.allocatingV4 = append(l.allocatingV4, localIPRequest)
}
for i := 0; i < expectV6; i++ {
l.allocatingV6 = append(l.allocatingV6, localIPRequest)
}

l.cond.Broadcast()

respCh := make(chan *AllocResp)

go l.allocWorker(ctx, cni, lo, respCh, func() {
// current roll back ip at same time
l.allocatingV4 -= expectV4
l.allocatingV4 = max(l.allocatingV4, 0)
l.allocatingV6 -= expectV6
l.allocatingV6 = max(l.allocatingV6, 0)
log.Info("rollback ipv4", "ipv4", expectV4)
})
go l.allocWorker(ctx, cni, localIPRequest, respCh)

return respCh, nil
}
Expand All @@ -481,7 +493,7 @@ func (l *Local) Release(ctx context.Context, cni *daemon.CNI, request NetworkRes
return false
}

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)

if res.IP.IPv4.IsValid() {
l.ipv4.Release(cni.PodID, res.IP.IPv4)
Expand Down Expand Up @@ -529,13 +541,24 @@ func (l *Local) Priority() int {
}

// allocWorker started with each Allocate call
func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *LocalIPRequest, respCh chan *AllocResp, onErrLocked func()) {
func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *LocalIPRequest, respCh chan *AllocResp) {
done := make(chan struct{})
defer close(done)

l.cond.L.Lock()
defer l.cond.L.Unlock()

defer func() {
if request == nil {
return
}

l.switchIPv4(request)
l.switchIPv6(request)

request.cancel()
}()

go func() {
select {
case <-ctx.Done():
Expand All @@ -546,13 +569,11 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
}
}()

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)
for {
select {
case <-ctx.Done():
// parent cancel the context, so close the ch
onErrLocked()

close(respCh)
return
default:
Expand Down Expand Up @@ -612,7 +633,7 @@ func (l *Local) allocWorker(ctx context.Context, cni *daemon.CNI, request *Local
func (l *Local) factoryAllocWorker(ctx context.Context) {
l.cond.L.Lock()

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)
for {
if log.V(4).Enabled() {
log.V(4).Info("call allocWorker")
Expand All @@ -625,7 +646,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
default:
}

if l.allocatingV4 <= 0 && l.allocatingV6 <= 0 {
if l.allocatingV4.Len() <= 0 && l.allocatingV6.Len() <= 0 {
l.cond.Wait()
continue
}
Expand All @@ -649,8 +670,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {

if l.eni == nil {
// create eni
v4Count := min(l.batchSize, max(l.allocatingV4, 1))
v6Count := min(l.batchSize, l.allocatingV6)
v4Count := min(l.batchSize, max(l.allocatingV4.Len(), 1))
v6Count := min(l.batchSize, l.allocatingV6.Len())

l.status = statusCreating
l.cond.L.Unlock()
Expand Down Expand Up @@ -689,11 +710,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {

l.eni = eni

l.allocatingV4 -= v4Count
l.allocatingV6 -= v6Count

l.allocatingV4 = max(l.allocatingV4, 0)
l.allocatingV6 = max(l.allocatingV6, 0)
l.popNIPv4Jobs(v4Count)
l.popNIPv6Jobs(v6Count)

primary, err := netip.ParseAddr(eni.PrimaryIP.IPv4.String())
if err == nil {
Expand All @@ -713,8 +731,8 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
l.status = statusInUse
} else {
eniID := l.eni.ID
v4Count := min(l.batchSize, l.allocatingV4)
v6Count := min(l.batchSize, l.allocatingV6)
v4Count := min(l.batchSize, l.allocatingV4.Len())
v6Count := min(l.batchSize, l.allocatingV6.Len())

if v4Count > 0 {
l.cond.L.Unlock()
Expand All @@ -738,8 +756,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
continue
}

l.allocatingV4 -= len(ipv4Set)
l.allocatingV4 = max(l.allocatingV4, 0)
l.popNIPv4Jobs(len(ipv4Set))

l.ipv4.PutValid(ipv4Set...)

Expand Down Expand Up @@ -771,8 +788,7 @@ func (l *Local) factoryAllocWorker(ctx context.Context) {
continue
}

l.allocatingV6 -= len(ipv6Set)
l.allocatingV6 = max(l.allocatingV6, 0)
l.popNIPv6Jobs(len(ipv6Set))

l.ipv6.PutValid(ipv6Set...)

Expand Down Expand Up @@ -861,7 +877,7 @@ func (l *Local) Dispose(n int) int {
func (l *Local) factoryDisposeWorker(ctx context.Context) {
l.cond.L.Lock()

log := logf.FromContext(ctx)
log := logr.FromContextOrDiscard(ctx)
for {
select {
case <-ctx.Done():
Expand Down Expand Up @@ -1082,3 +1098,68 @@ func parseResourceID(id string) (string, string, error) {
}
return parts[0], parts[1], nil
}

func (l *Local) switchIPv4(req *LocalIPRequest) {
found := false
l.allocatingV4 = lo.Filter(l.allocatingV4, func(item *LocalIPRequest, index int) bool {
if item != req {
// true to keep
return true
}
found = true
return false
})
if !found {
return
}

if l.dangingV4.Len() == 0 {
// this may not happen
// call the Len() to make sure canceled job will be removed
return
}
l.allocatingV4 = append(l.allocatingV4, l.dangingV4[0])
l.dangingV4 = l.dangingV4[1:]
}

func (l *Local) switchIPv6(req *LocalIPRequest) {
found := false
l.allocatingV6 = lo.Filter(l.allocatingV6, func(item *LocalIPRequest, index int) bool {
if item != req {
// true to keep
return true
}
found = true
return false
})
if !found {
return
}

if l.dangingV6.Len() == 0 {
// this may not happen
return
}
l.allocatingV6 = append(l.allocatingV6, l.dangingV6[0])
l.dangingV6 = l.dangingV6[1:]
}

func (l *Local) popNIPv4Jobs(count int) {
firstPart, secondPart := Split(l.allocatingV4, count)
l.dangingV4 = append(l.dangingV4, firstPart...)
l.allocatingV4 = secondPart
}

func (l *Local) popNIPv6Jobs(count int) {
firstPart, secondPart := Split(l.allocatingV6, count)
l.dangingV6 = append(l.dangingV6, firstPart...)
l.allocatingV6 = secondPart
}

func Split[T any](arr []T, index int) ([]T, []T) {
if index < 0 || index > len(arr) {
return arr, nil
}

return arr[:index], arr[index:]
}
Loading

0 comments on commit 898d8da

Please sign in to comment.