Skip to content

Commit

Permalink
Merge pull request #1 from chengzj12/AddPodLister
Browse files Browse the repository at this point in the history
use podlister instead of listing all pods every time
  • Loading branch information
xiekeyang authored Jul 9, 2021
2 parents 0d4eaf8 + 491629c commit 22f0621
Show file tree
Hide file tree
Showing 2 changed files with 37 additions and 13 deletions.
46 changes: 35 additions & 11 deletions pkg/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,23 @@ package main

import (
"bufio"
"context"
"fmt"
"os"
"regexp"
"strings"
"time"

"github.com/NVIDIA/gpu-monitoring-tools/bindings/go/dcgm"
"github.com/NVIDIA/gpu-monitoring-tools/bindings/go/nvml"
"github.com/sirupsen/logrus"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
v1lister "k8s.io/client-go/listers/core/v1"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/clientcmd"
)

Expand Down Expand Up @@ -63,7 +65,8 @@ func NewCgroupMapper(devices []dcgm.Device) *CgroupMapper {
logrus.Errorf("can't get kubernetes client: %s", err)
return nil
}
return &CgroupMapper{DeviceList: devices, K8sClient: client}
podLister := newPodClients(client, v1.NamespaceAll)
return &CgroupMapper{DeviceList: devices, PodLister: podLister}
}

func (c *CgroupMapper) Name() string {
Expand Down Expand Up @@ -129,19 +132,15 @@ func (c *CgroupMapper) Process(metrics [][]Metric) error {
func (c *CgroupMapper) getPodInfo() (map[ContainerKey]ContainerInfo, error) {
logrus.Infof("Get Pod and Container Information")

ctx, _ := context.WithCancel(context.Background())
podList, err := c.K8sClient.CoreV1().Pods(v1.NamespaceAll).List(ctx, metav1.ListOptions{
LabelSelector: labels.Everything().String(),
ResourceVersion: "0",
})
pods, err := c.PodLister.List(labels.Everything())
if err != nil {
logrus.Infof("Get Pod and Container Information failed: %s", err)
return nil, err
}

logrus.Infof("Get some Pod and Container(%d).", len(podList.Items))
logrus.Infof("Get some Pod and Container(%d).", len(pods))
containers := make(map[ContainerKey]ContainerInfo)
for _, pod := range podList.Items {
for _, pod := range pods {
fmt.Printf("pod name(%s), namespace(%s), uid(%s)\n", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace, pod.ObjectMeta.UID)
for _, container := range pod.Status.ContainerStatuses {
containerId := strings.Replace(container.ContainerID, "docker://", "", 1)
Expand Down Expand Up @@ -302,3 +301,28 @@ func initKubeClient() (*kubernetes.Clientset, error) {
logrus.Infof("Init KubeClient success.")
return client, nil
}

// Creates clients watching pods: PodLister (listing only not terminated pods).
func newPodClients(kubeClient kubernetes.Interface, namespace string) v1lister.PodLister {
// We are interested in pods which are Running or Unknown or Pending
// We don't want succeeded and failed pods because they don't generate any usage anymore.
selector := fields.ParseSelectorOrDie("status.phase!=" + string(v1.PodSucceeded) +
",status.phase!=" + string(v1.PodFailed))
podListWatch := cache.NewListWatchFromClient(kubeClient.CoreV1().RESTClient(), "pods", namespace, selector)
indexer, controller := cache.NewIndexerInformer(
podListWatch,
&v1.Pod{},
time.Hour,
&cache.ResourceEventHandlerFuncs{},
cache.Indexers{cache.NamespaceIndex: cache.MetaNamespaceIndexFunc},
)
podLister := v1lister.NewPodLister(indexer)
stopCh := make(chan struct{})
go controller.Run(stopCh)
if !cache.WaitForCacheSync(make(<-chan struct{}), controller.HasSynced) {
logrus.Fatalf("Failed to sync pod cache during initialization")
} else {
logrus.Info("Initial pod synced successfully")
}
return podLister
}
4 changes: 2 additions & 2 deletions pkg/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import (
"github.com/NVIDIA/gpu-monitoring-tools/bindings/go/dcgm"

"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
v1lister "k8s.io/client-go/listers/core/v1"
)

var (
Expand Down Expand Up @@ -147,7 +147,7 @@ type PodInfo struct {

type CgroupMapper struct {
DeviceList []dcgm.Device
K8sClient *kubernetes.Clientset
PodLister v1lister.PodLister
// SharedInformers gives access to informers for the controller.
SharedInformers informers.SharedInformerFactory
}

0 comments on commit 22f0621

Please sign in to comment.