From 3b173a00e10fd6e4bcb9feabdd041211cbddc6c3 Mon Sep 17 00:00:00 2001
From: keyangxie
Date: Thu, 22 Apr 2021 14:59:35 +0800
Subject: [PATCH] map pids to the owner container

Map the pids that DCGM reports for each GPU to the pod and container
that own them: list the pods scheduled on this node, index them by pod
UID and container ID, and look up the pair parsed from each pid's
/proc/<pid>/cgroup. Metrics are then labelled with the real namespace,
pod name and container name instead of the raw pod UID and container ID.

Signed-off-by: keyangxie
---
 pkg/container.go | 149 +++++++++++++++++++++++++++++++++++++++---------
 pkg/types.go     |   6 ++
 2 files changed, 129 insertions(+), 26 deletions(-)

diff --git a/pkg/container.go b/pkg/container.go
index 4a60db6..d5d8ca6 100644
--- a/pkg/container.go
+++ b/pkg/container.go
@@ -18,27 +18,53 @@ package main
 
 import (
 	"bufio"
+	"context"
 	"fmt"
 	"os"
 	"regexp"
+	"strings"
 
 	"github.com/NVIDIA/gpu-monitoring-tools/bindings/go/dcgm"
 	"github.com/NVIDIA/gpu-monitoring-tools/bindings/go/nvml"
 	"github.com/sirupsen/logrus"
+
+	"k8s.io/api/core/v1"
+	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/fields"
+	"k8s.io/apimachinery/pkg/labels"
+	"k8s.io/client-go/kubernetes"
+	"k8s.io/client-go/rest"
+	"k8s.io/client-go/tools/clientcmd"
 )
 
 var (
-	kubePattern = regexp.MustCompile(`\d+:memory:/kubepods/[^/]+/pod([^/]+)/([0-9a-f]{64})`)
+	// 8:memory:/kubepods/besteffort/pod187fd8f5-a423-4a1a-89b8-8928a71746a4/4923b671ebdf52ab54ad88084a2c7e0cf534504dde5dfbe7c84bdc4fe24cd46e
+	// 6:memory:/kubepods/podc418bc7e-5ed8-11eb-801b-5254000a3b55/6ebf770b3e5d195576eca906e112c492507587cdfabc6d491caa2bd5d633cb6f
+	kubePattern = regexp.MustCompile(`\d+:memory:/kubepods/(?:[^/]+/)?pod([^/]+)/([0-9a-f]{64})`)
 )
 
+type ContainerKey struct {
+	PodUid      string
+	ContainerId string
+}
+
 type ContainerInfo struct {
 	PodUid      string
 	ContainerId string
+
+	PodName       string
+	PodNamespace  string
+	ContainerName string
 }
 
 func NewCgroupMapper(devices []dcgm.Device) *CgroupMapper {
 	logrus.Infof("Container metrics collection enabled!")
-	return &CgroupMapper{DeviceList: devices}
+	client, err := initKubeClient()
+	if err != nil {
+		logrus.Errorf("failed to create kubernetes client: %s", err)
+		return nil
+	}
+	return &CgroupMapper{DeviceList: devices, K8sClient: client}
 }
 
 func (c *CgroupMapper) Name() string {
@@ -55,11 +81,10 @@ func (c *CgroupMapper) K8sProcess() ([]ContainerMetric, error) {
 			return nil, err
 		}
 
-		containers, err := mapContainerPid(pids)
+		containers, err := c.mapContainerPid(pids)
 		if err != nil {
 			return nil, err
 		}
-		fmt.Printf("containers: %+v", containers)
 
 		utils, err := devGetAllProcessesUtilization(device.UUID)
 		if err != nil {
@@ -87,14 +112,14 @@ func (c *CgroupMapper) K8sProcess() ([]ContainerMetric, error) {
 					GPUUUID:   device.UUID,
 					GPUDevice: fmt.Sprintf("nvidia%d", device.GPU),
 
-					Namespace: "default",
-					Pod:       container.PodUid,
-					Container: container.ContainerId,
+					Namespace: container.PodNamespace,
+					Pod:       container.PodName,
+					Container: container.ContainerName,
 				})
 			}
-			fmt.Printf("container: %+v\n", container)
 		}
 	}
 
+	logrus.Debugf("metrics: %+v", metrics)
 	return metrics, nil
 }
 
@@ -102,23 +127,50 @@
 func (c *CgroupMapper) Process(metrics [][]Metric) error {
 	return nil
 }
 
-func getPodContainer(pid uint) (ContainerInfo, error) {
-	f, err := os.Open(fmt.Sprintf("/rootfs/proc/%d/cgroup", pid))
+func (c *CgroupMapper) getPodInfo() (map[ContainerKey]ContainerInfo, error) {
+	logrus.Debugf("fetching pod and container information")
+	nodeName := os.Getenv("NODE_NAME")
+	if nodeName == "" {
+		logrus.Errorf("NODE_NAME environment variable is not set")
+		return nil, fmt.Errorf("failed to get node name: NODE_NAME is not set")
+	}
+	selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName})
+
+	ctx := context.Background()
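+	// List only the pods scheduled on this node: kubelet sets spec.nodeName
+	// once a pod is bound, so the field selector keeps the response small.
+	// ResourceVersion "0" lets the apiserver answer from its watch cache.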
+	podList, err := c.K8sClient.CoreV1().Pods(v1.NamespaceAll).List(ctx, metav1.ListOptions{
+		FieldSelector:   selector.String(),
+		LabelSelector:   labels.Everything().String(),
+		ResourceVersion: "0",
+	})
 	if err != nil {
-		logrus.Errorf("open cgroup failed: %s", err)
-		return ContainerInfo{}, err
+		logrus.Errorf("failed to list pods on node %s: %s", nodeName, err)
+		return nil, err
 	}
-	defer f.Close()
 
-	scanner := bufio.NewScanner(f)
-	for scanner.Scan() {
-		line := scanner.Text()
-		parts := kubePattern.FindStringSubmatch(line)
-		if parts != nil {
-			return ContainerInfo{PodUid: parts[1], ContainerId: parts[2]}, nil
+	logrus.Debugf("found %d pods on node %s", len(podList.Items), nodeName)
+	containers := make(map[ContainerKey]ContainerInfo)
+	for _, pod := range podList.Items {
+		logrus.Debugf("pod name(%s), namespace(%s), uid(%s)", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace, pod.ObjectMeta.UID)
+		for _, container := range pod.Status.ContainerStatuses {
+			containerId := strings.TrimPrefix(container.ContainerID, "docker://")
+			containers[ContainerKey{
+				PodUid:      string(pod.ObjectMeta.UID),
+				ContainerId: containerId,
+			}] = ContainerInfo{
+				PodUid:        string(pod.ObjectMeta.UID),
+				ContainerId:   containerId,
+				PodName:       pod.ObjectMeta.Name,
+				PodNamespace:  pod.ObjectMeta.Namespace,
+				ContainerName: container.Name,
+			}
+			logrus.Debugf("container name(%s), id(%s)", container.Name, containerId)
 		}
 	}
-	return ContainerInfo{}, fmt.Errorf("could't find pod by pid(%d)", pid)
+	logrus.Debugf("getPodInfo containers: %+v", containers)
+	return containers, nil
 }
 
@@ -140,6 +192,7 @@ func listPidOnDev(uuid string) ([]uint, error) {
 	for _, info := range infos {
 		pids = append(pids, info.PID)
 	}
+	logrus.Debugf("listPidOnDev pids: %+v", pids)
 	return pids, nil
 }
 
@@ -170,10 +223,16 @@ func devGetAllProcessesUtilization(devUuid string) (map[uint]dcgm.UtilizationInf
 	return util, nil
 }
 
-func mapContainerPid(pids []uint) (map[ContainerInfo][]uint, error) {
-	containers := make(map[ContainerInfo][]uint)
+func (c *CgroupMapper) mapContainerPid(pids []uint) (map[ContainerInfo][]uint, error) {
+	infos, err := c.getPodInfo()
+	if err != nil {
+		logrus.Errorf("failed to get pod info: %s", err)
+		return nil, err
+	}
 
+	containers := make(map[ContainerInfo][]uint)
 	for _, pid := range pids {
+		logrus.Debugf("mapContainerPid: search pid(%d)", pid)
 		f, err := os.Open(fmt.Sprintf("/rootfs/proc/%d/cgroup", pid))
 		if err != nil {
 			logrus.Errorf("open cgroup failed: %s", err)
@@ -185,15 +244,20 @@
 		for scanner.Scan() {
 			line := scanner.Text()
 			parts := kubePattern.FindStringSubmatch(line)
+			logrus.Debugf("mapContainerPid: pid(%d) parts: %+v", pid, parts)
 			if parts != nil {
-				key := ContainerInfo{
-					PodUid:      parts[1],
-					ContainerId: parts[2],
+				key := ContainerKey{PodUid: parts[1], ContainerId: parts[2]}
+				value, ok := infos[key]
+				if !ok {
+					return nil, fmt.Errorf("no pod found for container %+v", key)
 				}
-				containers[key] = append(containers[key], pid)
+				containers[value] = append(containers[value], pid)
+			} else {
+				logrus.Debugf("mapContainerPid: pid(%d) cgroup line does not match kube pattern", pid)
 			}
 		}
 	}
 
+	logrus.Debugf("mapContainerPid containers: %+v", containers)
 	return containers, nil
 }
@@ -219,3 +283,36 @@ func aggreContainersUtil(utils map[uint]dcgm.UtilizationInfo, containers map[Con
 
 	return containerUtils, nil
 }
+
+func initKubeClient() (*kubernetes.Clientset, error) {
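+	// Prefer an explicit kubeconfig when KUBECONFIG points at an existing
+	// file; otherwise fall back to the in-cluster service-account
+	// configuration that Kubernetes mounts into the pod.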
logrus.Infof("Init Kube Client") + kubeconfigFile := os.Getenv("KUBECONFIG") + var err error + var config *rest.Config + + if _, err = os.Stat(kubeconfigFile); err != nil { + config, err = rest.InClusterConfig() + if err != nil { + logrus.Errorf("Failed due to %v", err) + return nil, err + } + } else { + config, err = clientcmd.BuildConfigFromFlags("", kubeconfigFile) + if err != nil { + logrus.Errorf("Failed due to %v", err) + return nil, err + } + } + + client, err := kubernetes.NewForConfig(config) + if err != nil { + logrus.Errorf("Failed due to %v", err) + return nil, err + } + + logrus.Infof("Init KubeClient success.") + return client, nil +} diff --git a/pkg/types.go b/pkg/types.go index 3bebca0..bfec6fa 100644 --- a/pkg/types.go +++ b/pkg/types.go @@ -23,6 +23,9 @@ import ( "text/template" "github.com/NVIDIA/gpu-monitoring-tools/bindings/go/dcgm" + + "k8s.io/client-go/informers" + "k8s.io/client-go/kubernetes" ) var ( @@ -144,4 +147,7 @@ type PodInfo struct { type CgroupMapper struct { DeviceList []dcgm.Device + K8sClient *kubernetes.Clientset + // SharedInformers gives access to informers for the controller. + SharedInformers informers.SharedInformerFactory }