Skip to content

Commit

Permalink
map pids to the owner container
Browse files Browse the repository at this point in the history
Signed-off-by: keyangxie <[email protected]>
  • Loading branch information
xiekeyang committed Apr 22, 2021
1 parent 1f6ff6d commit 3b173a0
Show file tree
Hide file tree
Showing 2 changed files with 123 additions and 26 deletions.
143 changes: 117 additions & 26 deletions pkg/container.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,27 +18,53 @@ package main

import (
"bufio"
"context"
"fmt"
"os"
"regexp"
"strings"

"github.com/NVIDIA/gpu-monitoring-tools/bindings/go/dcgm"
"github.com/NVIDIA/gpu-monitoring-tools/bindings/go/nvml"
"github.com/sirupsen/logrus"

"k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/fields"
"k8s.io/apimachinery/pkg/labels"
"k8s.io/client-go/kubernetes"
"k8s.io/client-go/rest"
"k8s.io/client-go/tools/clientcmd"
)

var (
	// kubePattern extracts the pod UID (capture group 1) and the 64-hex-digit
	// container ID (capture group 2) from a "memory" controller line of
	// /proc/<pid>/cgroup. The QoS-class path segment is optional, so both of
	// the following layouts match:
	//
	// 8:memory:/kubepods/besteffort/pod187fd8f5-a423-4a1a-89b8-8928a71746a4/4923b671ebdf52ab54ad88084a2c7e0cf534504dde5dfbe7c84bdc4fe24cd46e
	// 6:memory:/kubepods/podc418bc7e-5ed8-11eb-801b-5254000a3b55/6ebf770b3e5d195576eca906e112c492507587cdfabc6d491caa2bd5d633cb6f
	kubePattern = regexp.MustCompile(`\d+:memory:/kubepods/(?:[^/]+/)?pod([^/]+)/([0-9a-f]{64})`)
)

// ContainerKey identifies a container by the UID of its pod together with
// the container's own ID. It is comparable and is used as a map key when
// looking up container details.
type ContainerKey struct {
	PodUid, ContainerId string
}

// ContainerInfo describes one container: the identifiers found in its
// cgroup path plus the human-readable names used to label metrics.
type ContainerInfo struct {
	// Identifiers: the pod UID and the container ID.
	PodUid, ContainerId string

	// Names taken from the pod object and its container status.
	PodName, PodNamespace, ContainerName string
}

// NewCgroupMapper builds a CgroupMapper for the given GPU devices and
// attaches a Kubernetes client to it.
//
// It returns nil when the Kubernetes client cannot be created; the failure
// is logged here, so callers only need to check for a nil result.
func NewCgroupMapper(devices []dcgm.Device) *CgroupMapper {
	client, err := initKubeClient()
	if err != nil {
		logrus.Errorf("can't get kubernetes client: %s", err)
		return nil
	}

	// Announce the feature only once the mapper is actually usable.
	logrus.Infof("Container metrics collection enabled!")
	return &CgroupMapper{DeviceList: devices, K8sClient: client}
}

func (c *CgroupMapper) Name() string {
Expand All @@ -55,11 +81,10 @@ func (c *CgroupMapper) K8sProcess() ([]ContainerMetric, error) {
return nil, err
}

containers, err := mapContainerPid(pids)
containers, err := c.mapContainerPid(pids)
if err != nil {
return nil, err
}
fmt.Printf("containers: %+v", containers)

utils, err := devGetAllProcessesUtilization(device.UUID)
if err != nil {
Expand Down Expand Up @@ -87,38 +112,62 @@ func (c *CgroupMapper) K8sProcess() ([]ContainerMetric, error) {
GPUUUID: device.UUID,
GPUDevice: fmt.Sprintf("nvidia%d", device.GPU),

Namespace: "default",
Pod: container.PodUid,
Container: container.ContainerId,
Namespace: container.PodNamespace,
Pod: container.PodName,
Container: container.ContainerName,
})
}
fmt.Printf("container: %+v\n", container)
}
}
logrus.Infof("metrics: %+v\n", metrics)
return metrics, nil
}

// Process is currently a no-op: it ignores the supplied metrics and always
// returns nil.
func (c *CgroupMapper) Process(metrics [][]Metric) error {
return nil
}

func getPodContainer(pid uint) (ContainerInfo, error) {
f, err := os.Open(fmt.Sprintf("/rootfs/proc/%d/cgroup", pid))
func (c *CgroupMapper) getPodInfo() (map[ContainerKey]ContainerInfo, error) {
logrus.Infof("Get Pod and Container Information")
nodeName := os.Getenv("NODE_NAME")
if nodeName == "" {
logrus.Infof("Failed to get node name")
return nil, fmt.Errorf("Failed to get node name")
}
selector := fields.SelectorFromSet(fields.Set{"spec.nodeName": nodeName})

ctx, _ := context.WithCancel(context.Background())
podList, err := c.K8sClient.CoreV1().Pods(v1.NamespaceAll).List(ctx, metav1.ListOptions{
FieldSelector: selector.String(),
LabelSelector: labels.Everything().String(),
ResourceVersion: "0",
})
if err != nil {
logrus.Errorf("open cgroup failed: %s", err)
return ContainerInfo{}, err
logrus.Infof("Get Pod and Container Information failed: %s", err)
return nil, err
}
defer f.Close()

scanner := bufio.NewScanner(f)
for scanner.Scan() {
line := scanner.Text()
parts := kubePattern.FindStringSubmatch(line)
if parts != nil {
return ContainerInfo{PodUid: parts[1], ContainerId: parts[2]}, nil
logrus.Infof("Get some Pod and Container(%d).", len(podList.Items))
containers := make(map[ContainerKey]ContainerInfo)
for _, pod := range podList.Items {
fmt.Printf("pod name(%s), namespace(%s), uid(%s)\n", pod.ObjectMeta.Name, pod.ObjectMeta.Namespace, pod.ObjectMeta.UID)
for _, container := range pod.Status.ContainerStatuses {
containerId := strings.Replace(container.ContainerID, "docker://", "", 1)
containers[ContainerKey{
PodUid: fmt.Sprintf("%s", pod.ObjectMeta.UID),
ContainerId: containerId,
}] = ContainerInfo{
PodUid: fmt.Sprintf("%s", pod.ObjectMeta.UID),
ContainerId: containerId,
PodName: pod.ObjectMeta.Name,
PodNamespace: pod.ObjectMeta.Namespace,
ContainerName: container.Name,
}
fmt.Printf("container name(%s), id(%s)\n", container.Name, containerId)
}
}
return ContainerInfo{}, fmt.Errorf("could't find pod by pid(%d)", pid)
fmt.Printf("(getPodInfo) containers: %+v\n", containers)
return containers, nil
}

func listPidOnDev(uuid string) ([]uint, error) {
Expand All @@ -140,6 +189,7 @@ func listPidOnDev(uuid string) ([]uint, error) {
for _, info := range infos {
pids = append(pids, info.PID)
}
fmt.Printf("[listPidOnDev] pids: %+v\n", pids)
return pids, nil
}

Expand Down Expand Up @@ -170,10 +220,16 @@ func devGetAllProcessesUtilization(devUuid string) (map[uint]dcgm.UtilizationInf
return util, nil
}

func mapContainerPid(pids []uint) (map[ContainerInfo][]uint, error) {
containers := make(map[ContainerInfo][]uint)
func (c *CgroupMapper) mapContainerPid(pids []uint) (map[ContainerInfo][]uint, error) {
infos, err := c.getPodInfo()
if err != nil {
logrus.Errorf("failed to get pod Info: %s", err)
return nil, err
}

containers := make(map[ContainerInfo][]uint)
for _, pid := range pids {
fmt.Printf("[mapContainerPid] search pid(%d)\n", pid)
f, err := os.Open(fmt.Sprintf("/rootfs/proc/%d/cgroup", pid))
if err != nil {
logrus.Errorf("open cgroup failed: %s", err)
Expand All @@ -185,15 +241,20 @@ func mapContainerPid(pids []uint) (map[ContainerInfo][]uint, error) {
for scanner.Scan() {
line := scanner.Text()
parts := kubePattern.FindStringSubmatch(line)
fmt.Printf("[mapContainerPid] find pid(%d) parts: %+v\n", pid, parts)
if parts != nil {
key := ContainerInfo{
PodUid: parts[1],
ContainerId: parts[2],
value, ok := infos[ContainerKey{PodUid: parts[1], ContainerId: parts[2]}]
if !ok {
logrus.Errorf("container doesn't exist: %v", ContainerKey{PodUid: parts[1], ContainerId: parts[2]})
return nil, fmt.Errorf("container doesn't exist: %v", ContainerKey{PodUid: parts[1], ContainerId: parts[2]})
}
containers[key] = append(containers[key], pid)
containers[value] = append(containers[value], pid)
} else {
logrus.Errorf("[mapContainerPid] can't find container for pid(%d)", pid)
}
}
}
fmt.Printf("(mapContainerPid)containers: %+v\n", containers)
return containers, nil
}

Expand All @@ -219,3 +280,33 @@ func aggreContainersUtil(utils map[uint]dcgm.UtilizationInfo, containers map[Con

return containerUtils, nil
}

// initKubeClient creates a Kubernetes clientset.
//
// When the KUBECONFIG environment variable names an existing file, the
// client configuration is built from that file. Otherwise (variable unset,
// or the file cannot be stat'ed) the in-cluster configuration is used; in
// the second case a warning is logged so the fallback is not silent.
//
// Errors are returned with context and are not logged here, leaving the
// caller to report them once.
func initKubeClient() (*kubernetes.Clientset, error) {
	logrus.Infof("Init Kube Client")

	kubeconfigFile := os.Getenv("KUBECONFIG")
	if kubeconfigFile != "" {
		if _, statErr := os.Stat(kubeconfigFile); statErr != nil {
			logrus.Warnf("kubeconfig %q is not usable (%v), falling back to in-cluster config", kubeconfigFile, statErr)
			kubeconfigFile = ""
		}
	}

	var (
		config *rest.Config
		err    error
	)
	if kubeconfigFile == "" {
		config, err = rest.InClusterConfig()
		if err != nil {
			return nil, fmt.Errorf("loading in-cluster config: %w", err)
		}
	} else {
		config, err = clientcmd.BuildConfigFromFlags("", kubeconfigFile)
		if err != nil {
			return nil, fmt.Errorf("loading kubeconfig %q: %w", kubeconfigFile, err)
		}
	}

	client, err := kubernetes.NewForConfig(config)
	if err != nil {
		return nil, fmt.Errorf("creating kubernetes clientset: %w", err)
	}

	logrus.Infof("Init KubeClient success.")
	return client, nil
}
6 changes: 6 additions & 0 deletions pkg/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,9 @@ import (
"text/template"

"github.com/NVIDIA/gpu-monitoring-tools/bindings/go/dcgm"

"k8s.io/client-go/informers"
"k8s.io/client-go/kubernetes"
)

var (
Expand Down Expand Up @@ -144,4 +147,7 @@ type PodInfo struct {

// CgroupMapper carries the state needed to attribute GPU processes to
// Kubernetes containers: the GPU devices to inspect and a Kubernetes client.
type CgroupMapper struct {
// DeviceList is the set of GPU devices handed to NewCgroupMapper.
DeviceList []dcgm.Device
// K8sClient is the clientset used to list the pods running on this node.
K8sClient *kubernetes.Clientset
// SharedInformers gives access to informers for the controller.
// NOTE(review): nothing in the visible code assigns this field — confirm
// whether it is populated elsewhere or is a placeholder.
SharedInformers informers.SharedInformerFactory
}

0 comments on commit 3b173a0

Please sign in to comment.