Skip to content

Commit a72cdef

Browse files
fix ib-k8s bugs
UFM Cleanup on Pod Lifecycle Events: Fixed issue where GUIDs weren't properly removed from UFM (Unified Fabric Manager) when pods completed (success/error) or were deleted. GUID Reallocation Conflicts: Added logic to remove existing GUID allocations from UFM before assigning a new partition (PKey) to prevent conflicts when the same GUID is reused. Pod State Handling: Improved pod lifecycle management by treating finished pods (succeeded/failed) the same as deleted pods for cleanup purposes. Signed-off-by: Mladjan Gadzic <[email protected]>
1 parent cd4ac7d commit a72cdef

File tree

13 files changed

+486
-66
lines changed

13 files changed

+486
-66
lines changed

pkg/daemon/daemon.go

Lines changed: 109 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -140,21 +140,25 @@ func NewDaemon() (Daemon, error) {
140140
return nil, err
141141
}
142142

143+
// Create daemon instance first to access syncGuidPool method
144+
daemonInstance := &daemon{
145+
config: daemonConfig,
146+
watcher: nil, // Will be set later
147+
kubeClient: client,
148+
guidPool: guidPool,
149+
smClient: smClient,
150+
guidPodNetworkMap: make(map[string]string),
151+
}
152+
143153
// Reset guid pool with already allocated guids to avoid collisions
144-
err = syncGUIDPool(smClient, guidPool)
154+
err = daemonInstance.syncGUIDPool()
145155
if err != nil {
146156
return nil, err
147157
}
148158

149159
podWatcher := watcher.NewWatcher(podEventHandler, client)
150-
return &daemon{
151-
config: daemonConfig,
152-
watcher: podWatcher,
153-
kubeClient: client,
154-
guidPool: guidPool,
155-
smClient: smClient,
156-
guidPodNetworkMap: make(map[string]string),
157-
}, nil
160+
daemonInstance.watcher = podWatcher
161+
return daemonInstance, nil
158162
}
159163

160164
func (d *daemon) Run() {
@@ -291,14 +295,22 @@ func (d *daemon) processPodsForNetwork(
291295
}
292296

293297
// Verify if GUID already exist for given network ID and allocates new one if not
294-
func (d *daemon) allocatePodNetworkGUID(allocatedGUID, podNetworkID string, podUID types.UID) error {
298+
func (d *daemon) allocatePodNetworkGUID(allocatedGUID, podNetworkID string, podUID types.UID, targetPkey string) error {
299+
existingPkey, _ := d.guidPool.Get(allocatedGUID)
300+
if existingPkey != "" {
301+
// This happens when a GUID is being reallocated to a different PKey
302+
// (e.g., pod was rescheduled or network configuration changed)
303+
if err := d.removeStaleGUID(allocatedGUID, existingPkey); err != nil {
304+
log.Warn().Msgf("failed to remove stale GUID %s from pkey %s: %v", allocatedGUID, existingPkey, err)
305+
}
306+
}
295307
if mappedID, exist := d.guidPodNetworkMap[allocatedGUID]; exist {
296308
if podNetworkID != mappedID {
297309
return fmt.Errorf("failed to allocate requested guid %s, already allocated for %s",
298310
allocatedGUID, mappedID)
299311
}
300-
} else if err := d.guidPool.AllocateGUID(allocatedGUID); err != nil {
301-
return fmt.Errorf("failed to allocate GUID for pod ID %s, wit error: %v", podUID, err)
312+
} else if err := d.guidPool.AllocateGUID(allocatedGUID, targetPkey); err != nil {
313+
return fmt.Errorf("failed to allocate GUID for pod ID %s, with error: %v", podUID, err)
302314
} else {
303315
d.guidPodNetworkMap[allocatedGUID] = podNetworkID
304316
}
@@ -329,7 +341,7 @@ func (d *daemon) processNetworkGUID(
329341
return fmt.Errorf("failed to parse user allocated guid %s with error: %v", allocatedGUID, err)
330342
}
331343

332-
err = d.allocatePodNetworkGUID(allocatedGUID, podNetworkID, pi.pod.UID)
344+
err = d.allocatePodNetworkGUID(allocatedGUID, podNetworkID, pi.pod.UID, spec.PKey)
333345
if err != nil {
334346
return err
335347
}
@@ -339,7 +351,7 @@ func (d *daemon) processNetworkGUID(
339351
switch err {
340352
// If the guid pool is exhausted, need to sync with SM in case there are unsynced changes
341353
case guid.ErrGUIDPoolExhausted:
342-
err = syncGUIDPool(d.smClient, d.guidPool)
354+
err = d.syncGUIDPool()
343355
if err != nil {
344356
return err
345357
}
@@ -349,7 +361,7 @@ func (d *daemon) processNetworkGUID(
349361
}
350362

351363
allocatedGUID = guidAddr.String()
352-
err = d.allocatePodNetworkGUID(allocatedGUID, podNetworkID, pi.pod.UID)
364+
err = d.allocatePodNetworkGUID(allocatedGUID, podNetworkID, pi.pod.UID, spec.PKey)
353365
if err != nil {
354366
return err
355367
}
@@ -373,28 +385,84 @@ func (d *daemon) processNetworkGUID(
373385
return nil
374386
}
375387

376-
func syncGUIDPool(smClient plugins.SubnetManagerClient, guidPool guid.Pool) error {
377-
usedGuids, err := smClient.ListGuidsInUse()
388+
func (d *daemon) removeStaleGUID(allocatedGUID, existingPkey string) error {
389+
parsedPkey, err := utils.ParsePKey(existingPkey)
390+
if err != nil {
391+
log.Error().Msgf("failed to parse PKey %s with error: %v", existingPkey, err)
392+
return err
393+
}
394+
guidAddr, err := guid.ParseGUID(allocatedGUID)
395+
if err != nil {
396+
return fmt.Errorf("failed to parse user allocated guid %s with error: %v", allocatedGUID, err)
397+
}
398+
allocatedGUIDList := []net.HardwareAddr{guidAddr.HardWareAddress()}
399+
// Try to remove pKeys via subnet manager in backoff loop
400+
if err = wait.ExponentialBackoff(backoffValues, func() (bool, error) {
401+
log.Info().Msgf("removing guids of previous pods from pKey %s"+
402+
" with subnet manager %s", existingPkey,
403+
d.smClient.Name())
404+
if err = d.smClient.RemoveGuidsFromPKey(parsedPkey, allocatedGUIDList); err != nil {
405+
log.Warn().Msgf("failed to remove guids of removed pods from pKey %s"+
406+
" with subnet manager %s with error: %v", existingPkey,
407+
d.smClient.Name(), err)
408+
return false, nil //nolint:nilerr // retry pattern for exponential backoff
409+
}
410+
return true, nil
411+
}); err != nil {
412+
log.Warn().Msgf("failed to remove guids of removed pods from pKey %s"+
413+
" with subnet manager %s", existingPkey, d.smClient.Name())
414+
return err
415+
}
416+
417+
if err = d.guidPool.ReleaseGUID(allocatedGUID); err != nil {
418+
log.Warn().Msgf("failed to release guid \"%s\" with error: %v", allocatedGUID, err)
419+
return err
420+
}
421+
delete(d.guidPodNetworkMap, allocatedGUID)
422+
log.Info().Msgf("successfully released %s from pkey %s", allocatedGUID, existingPkey)
423+
return nil
424+
}
425+
426+
func (d *daemon) syncGUIDPool() error {
427+
usedGuids, err := d.smClient.ListGuidsInUse()
378428
if err != nil {
379429
return err
380430
}
381431

382432
// Reset guid pool with already allocated guids to avoid collisions
383-
err = guidPool.Reset(usedGuids)
433+
err = d.guidPool.Reset(usedGuids)
384434
if err != nil {
385435
return err
386436
}
437+
438+
// Remove stale GUIDs that are no longer in use by the subnet manager
439+
// This handles cleanup of GUIDs from deleted/finished pods
440+
for allocatedGUID, podNetworkID := range d.guidPodNetworkMap {
441+
if _, found := usedGuids[allocatedGUID]; !found {
442+
// If GUID is not found in the subnet manager's list of used GUIDs,
443+
// it means the pod was deleted/finished and we should clean it up
444+
log.Info().Msgf("removing stale GUID %s for pod network %s", allocatedGUID, podNetworkID)
445+
if err = d.guidPool.ReleaseGUID(allocatedGUID); err != nil {
446+
log.Warn().Msgf("failed to release stale guid \"%s\" with error: %v", allocatedGUID, err)
447+
} else {
448+
delete(d.guidPodNetworkMap, allocatedGUID)
449+
log.Info().Msgf("successfully cleaned up stale GUID %s", allocatedGUID)
450+
}
451+
}
452+
}
453+
387454
return nil
388455
}
389456

390457
// Update and set Pod's network annotation.
391458
// If failed to update annotation, pod's GUID added into the list to be removed from Pkey.
392-
func (d *daemon) updatePodNetworkAnnotation(pi *podNetworkInfo, removedList *[]net.HardwareAddr) error {
459+
func (d *daemon) updatePodNetworkAnnotation(pi *podNetworkInfo, removedList *[]net.HardwareAddr, pkey string) error {
393460
if pi.ibNetwork.CNIArgs == nil {
394461
pi.ibNetwork.CNIArgs = &map[string]interface{}{}
395462
}
396463

397464
(*pi.ibNetwork.CNIArgs)[utils.InfiniBandAnnotation] = utils.ConfiguredInfiniBandPod
465+
(*pi.ibNetwork.CNIArgs)[utils.PkeyAnnotation] = pkey
398466

399467
netAnnotations, err := json.Marshal(pi.networks)
400468
if err != nil {
@@ -488,7 +556,7 @@ func (d *daemon) AddPeriodicUpdate() {
488556
// Update annotations for PODs that finished the previous steps successfully
489557
var removedGUIDList []net.HardwareAddr
490558
for _, pi := range passedPods {
491-
err = d.updatePodNetworkAnnotation(pi, &removedGUIDList)
559+
err = d.updatePodNetworkAnnotation(pi, &removedGUIDList, ibCniSpec.PKey)
492560
if err != nil {
493561
log.Error().Msgf("%v", err)
494562
}
@@ -595,7 +663,21 @@ func (d *daemon) DeletePeriodicUpdate() {
595663
continue
596664
}
597665

598-
guidList = append(guidList, podGUIDs...)
666+
// Process each GUID from the pod
667+
for _, guidAddr := range podGUIDs {
668+
podNetworkID := utils.GeneratePodNetworkID(pod, networkName)
669+
if guidPodEntry, exist := d.guidPodNetworkMap[guidAddr.String()]; exist {
670+
if podNetworkID == guidPodEntry {
671+
log.Info().Msgf("matched guid %s to pod %s, removing", guidAddr, guidPodEntry)
672+
guidList = append(guidList, guidAddr)
673+
} else {
674+
log.Warn().Msgf("guid %s is allocated to another pod %s not %s, not removing",
675+
guidAddr, guidPodEntry, podNetworkID)
676+
}
677+
} else {
678+
log.Warn().Msgf("guid %s is not allocated to any pod on delete", guidAddr)
679+
}
680+
}
599681
}
600682

601683
if ibCniSpec.PKey != "" && len(guidList) != 0 {
@@ -657,6 +739,9 @@ func (d *daemon) initPool() error {
657739
for index := range pods.Items {
658740
log.Debug().Msgf("checking pod for network annotations %v", pods.Items[index])
659741
pod := pods.Items[index]
742+
if utils.PodIsFinished(&pod) {
743+
continue
744+
}
660745
networks, err := netAttUtils.ParsePodNetworkAnnotation(&pod)
661746
if err != nil {
662747
continue
@@ -671,6 +756,7 @@ func (d *daemon) initPool() error {
671756
if err != nil {
672757
continue
673758
}
759+
674760
podNetworkID := string(pod.UID) + network.Name
675761
if _, exist := d.guidPodNetworkMap[podGUID]; exist {
676762
if podNetworkID != d.guidPodNetworkMap[podGUID] {
@@ -679,8 +765,8 @@ func (d *daemon) initPool() error {
679765
}
680766
continue
681767
}
682-
683-
if err = d.guidPool.AllocateGUID(podGUID); err != nil {
768+
podPkey, _ := utils.GetPodNetworkPkey(network)
769+
if err = d.guidPool.AllocateGUID(podGUID, podPkey); err != nil {
684770
err = fmt.Errorf("failed to allocate guid for running pod: %v", err)
685771
log.Error().Msgf("%v", err)
686772
continue

pkg/daemon/daemon_suite_test.go

Lines changed: 29 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,29 @@
1+
// Copyright 2025 NVIDIA CORPORATION & AFFILIATES
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
//
15+
// SPDX-License-Identifier: Apache-2.0
16+
17+
package daemon_test
18+
19+
import (
20+
"testing"
21+
22+
. "github.com/onsi/ginkgo/v2"
23+
. "github.com/onsi/gomega"
24+
)
25+
26+
func TestDaemon(t *testing.T) {
27+
RegisterFailHandler(Fail)
28+
RunSpecs(t, "Daemon Suite")
29+
}

0 commit comments

Comments
 (0)