diff --git a/dcgm-exporter.yaml b/dcgm-exporter.yaml index 40b7e2b..a183c66 100644 --- a/dcgm-exporter.yaml +++ b/dcgm-exporter.yaml @@ -55,6 +55,9 @@ spec: - name: cgroup readOnly: true mountPath: "/sys/fs/cgroup" + - name: proc + readOnly: true + mountPath: "/rootfs/proc" volumes: - name: "pod-gpu-resources" hostPath: @@ -63,6 +66,10 @@ spec: hostPath: type: Directory path: "/sys/fs/cgroup" + - name: proc + hostPath: + type: Directory + path: "/proc" --- diff --git a/pkg/container.go b/pkg/container.go index 1e6012b..4a60db6 100644 --- a/pkg/container.go +++ b/pkg/container.go @@ -31,13 +31,13 @@ var ( kubePattern = regexp.MustCompile(`\d+:memory:/kubepods/[^/]+/pod([^/]+)/([0-9a-f]{64})`) ) -type PodContainer struct { +type ContainerInfo struct { PodUid string ContainerId string } func NewCgroupMapper(devices []dcgm.Device) *CgroupMapper { - logrus.Infof("Kubernetes metrics (by cgroup) collection enabled!") + logrus.Infof("Container metrics collection enabled!") return &CgroupMapper{DeviceList: devices} } @@ -50,47 +50,50 @@ func (c *CgroupMapper) K8sProcess() ([]ContainerMetric, error) { var metrics []ContainerMetric for _, device := range c.DeviceList { - fmt.Println("GPU: ", device.GPU) pids, err := listPidOnDev(device.UUID) if err != nil { return nil, err } - utils, err := getUtilInfo(device.UUID) + containers, err := mapContainerPid(pids) if err != nil { return nil, err } + fmt.Printf("containers: %+v", containers) - for _, pid := range pids { - pod, err := getPodContainer(pid) - if err == nil { - if util, ok := utils[pid]; ok { - utilMap := make(map[string]string) - utilMap["DCGM_FI_K8S_GPU_UTIL"] = string(util.GPU) - utilMap["DCGM_FI_K8S_MEM_COPY_UTIL"] = string(util.Memory) - utilMap["DCGM_FI_K8S_ENC_UTIL"] = string(util.Encoder) - utilMap["DCGM_FI_K8S_DEC_UTIL"] = string(util.Decoder) - - for field, value := range utilMap { - metrics = append(metrics, ContainerMetric{ - Name: field, - Value: value, - - GPU: fmt.Sprintf("%d", device.GPU), - GPUUUID: device.UUID, - GPUDevice: fmt.Sprintf("nvidia%d", device.GPU), - - Namespace: "default", - Pod: pod.PodUid, - Container: pod.ContainerId, - }) - } - fmt.Printf("util(%d): %+v\n", pid, util) - fmt.Printf("pod: %+v\n", pod) - } - } + utils, err := devGetAllProcessesUtilization(device.UUID) + if err != nil { + return nil, err + } + + containerUtils, err := aggreContainersUtil(utils, containers) + if err != nil { + return nil, err } + for container, util := range containerUtils { + utilMap := make(map[string]string) + utilMap["DCGM_FI_K8S_GPU_UTIL"] = fmt.Sprintf("%d", util.GPU) + utilMap["DCGM_FI_K8S_MEM_COPY_UTIL"] = fmt.Sprintf("%d", util.Memory) + utilMap["DCGM_FI_K8S_ENC_UTIL"] = fmt.Sprintf("%d", util.Encoder) + utilMap["DCGM_FI_K8S_DEC_UTIL"] = fmt.Sprintf("%d", util.Decoder) + + for field, value := range utilMap { + metrics = append(metrics, ContainerMetric{ + Name: field, + Value: value, + + GPU: fmt.Sprintf("%d", device.GPU), + GPUUUID: device.UUID, + GPUDevice: fmt.Sprintf("nvidia%d", device.GPU), + + Namespace: "default", + Pod: container.PodUid, + Container: container.ContainerId, + }) + } + fmt.Printf("container: %+v\n", container) + } } return metrics, nil } @@ -99,11 +102,11 @@ func (c *CgroupMapper) Process(metrics [][]Metric) error { return nil } -func getPodContainer(pid uint) (PodContainer, error) { +func getPodContainer(pid uint) (ContainerInfo, error) { f, err := os.Open(fmt.Sprintf("/rootfs/proc/%d/cgroup", pid)) if err != nil { logrus.Errorf("open cgroup failed: %s", err) - return PodContainer{}, err + return ContainerInfo{}, err } defer f.Close() @@ -112,10 +115,10 @@ func getPodContainer(pid uint) (PodContainer, error) { line := scanner.Text() parts := kubePattern.FindStringSubmatch(line) if parts != nil { - return PodContainer{PodUid: parts[1], ContainerId: parts[2]}, nil + return ContainerInfo{PodUid: parts[1], ContainerId: parts[2]}, nil } } - return PodContainer{}, fmt.Errorf("could't find pod by pid(%d)", pid) + return ContainerInfo{}, fmt.Errorf("could't find pod by pid(%d)", pid) } func listPidOnDev(uuid string) ([]uint, error) { @@ -140,17 +143,18 @@ func listPidOnDev(uuid string) ([]uint, error) { return pids, nil } -func getUtilInfo(uuid string) (map[uint]dcgm.UtilizationInfo, error) { +func devGetAllProcessesUtilization(devUuid string) (map[uint]dcgm.UtilizationInfo, error) { util := make(map[uint]dcgm.UtilizationInfo) - device, err := nvml.NewDeviceByUUID(uuid) + device, err := nvml.NewDeviceByUUID(devUuid) if err != nil { - return nil, fmt.Errorf("Error getting device %s: %v", uuid, err) + return nil, fmt.Errorf("Error getting device %s: %v", devUuid, err) } pInfo, err := device.GetAllProcessesUtilization() if err != nil { - return nil, fmt.Errorf("Error getting device %s processes: %v", uuid, err) + return nil, fmt.Errorf("Error getting device %s processes: %v", devUuid, err) } + if len(pInfo) == 0 { return nil, nil } @@ -165,3 +169,53 @@ func getUtilInfo(uuid string) (map[uint]dcgm.UtilizationInfo, error) { return util, nil } + +func mapContainerPid(pids []uint) (map[ContainerInfo][]uint, error) { + containers := make(map[ContainerInfo][]uint) + + for _, pid := range pids { + f, err := os.Open(fmt.Sprintf("/rootfs/proc/%d/cgroup", pid)) + if err != nil { + logrus.Errorf("open cgroup failed: %s", err) + return nil, err + } + defer f.Close() + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + parts := kubePattern.FindStringSubmatch(line) + if parts != nil { + key := ContainerInfo{ + PodUid: parts[1], + ContainerId: parts[2], + } + containers[key] = append(containers[key], pid) + } + } + } + return containers, nil +} + +func aggreContainersUtil(utils map[uint]dcgm.UtilizationInfo, containers map[ContainerInfo][]uint) (map[ContainerInfo]dcgm.UtilizationInfo, error) { + containerUtils := make(map[ContainerInfo]dcgm.UtilizationInfo) + + for container, pids := range containers { + total := dcgm.UtilizationInfo{ + GPU: 0, + Memory: 0, + Encoder: 0, + Decoder: 0, + } + for _, pid := range pids { + util := utils[pid] + total.GPU += util.GPU + total.Memory += util.Memory + total.Encoder += util.Encoder + total.Decoder += util.Decoder + } + containerUtils[container] = total + } + + return containerUtils, nil +}