Skip to content

Commit

Permalink
Add container metrics collector
Browse files Browse the repository at this point in the history
Signed-off-by: keyangxie <[email protected]>
  • Loading branch information
xiekeyang committed Feb 2, 2021
1 parent a92610b commit 15156d6
Show file tree
Hide file tree
Showing 5 changed files with 257 additions and 1 deletion.
167 changes: 167 additions & 0 deletions pkg/container.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package main

import (
"bufio"
"fmt"
"os"
"regexp"

"github.com/NVIDIA/gpu-monitoring-tools/bindings/go/dcgm"
"github.com/NVIDIA/gpu-monitoring-tools/bindings/go/nvml"
"github.com/sirupsen/logrus"
)

var (
	// kubePattern matches the memory-controller line of a /proc/<pid>/cgroup
	// file for a process that belongs to a Kubernetes pod, and captures the
	// pod UID (submatch 1) and the 64-hex-digit container ID (submatch 2).
	//
	// The directory between "kubepods/" and "pod<uid>/" is optional: with the
	// cgroupfs layout, Burstable and BestEffort pods live under
	// kubepods/<qos>/pod<uid>/, while Guaranteed pods sit directly under
	// kubepods/pod<uid>/ and would otherwise never match.
	//
	// NOTE(review): systemd-driver paths (kubepods.slice/...) are still not
	// covered by this pattern.
	kubePattern = regexp.MustCompile(`\d+:memory:/kubepods/(?:[^/]+/)?pod([^/]+)/([0-9a-f]{64})`)
)

// PodContainer identifies the Kubernetes container a process belongs to, as
// recovered from the process's cgroup path.
type PodContainer struct {
// PodUid is the pod UID: the path segment following "pod" in the cgroup path.
PodUid string
// ContainerId is the 64-hex-digit container ID that ends the cgroup path.
ContainerId string
}

// NewCgroupMapper announces that cgroup-based Kubernetes metrics collection
// is enabled and returns a CgroupMapper that will inspect the given devices.
// The devices slice is stored as-is, not copied.
func NewCgroupMapper(devices []dcgm.Device) *CgroupMapper {
	logrus.Infof("Kubernetes metrics (by cgroup) collection enabled!")

	mapper := new(CgroupMapper)
	mapper.DeviceList = devices
	return mapper
}

// Name returns the fixed identifier of this transform, "cgroupMapper".
func (c *CgroupMapper) Name() string {
return "cgroupMapper"
}

// K8sProcess builds per-container GPU utilization metrics for every device in
// c.DeviceList.
//
// For each device it lists the PIDs running on the GPU and the per-process
// utilization samples, resolves each PID to a pod UID and container ID through
// the process's cgroup file, and emits four metrics per resolved process:
// GPU, memory-copy, encoder and decoder utilization, always in that order.
//
// A PID that cannot be mapped to a pod, or that has no utilization sample, is
// skipped. An error while listing processes or reading utilization aborts the
// whole collection and is returned with a nil slice.
func (c *CgroupMapper) K8sProcess() ([]ContainerMetric, error) {
	var metrics []ContainerMetric

	for _, device := range c.DeviceList {
		pids, err := listPidOnDev(device.UUID)
		if err != nil {
			return nil, err
		}

		utils, err := getUtilInfo(device.UUID)
		if err != nil {
			return nil, err
		}

		// These labels are the same for every process on this device.
		gpu := fmt.Sprintf("%d", device.GPU)
		gpuDevice := fmt.Sprintf("nvidia%d", device.GPU)

		for _, pid := range pids {
			pod, err := getPodContainer(pid)
			if err != nil {
				// Best effort: a GPU process that is not part of a pod (or has
				// already exited) simply produces no container metrics.
				continue
			}

			// utils may be nil when the device reported no samples; indexing
			// a nil map is safe and just yields ok == false.
			util, ok := utils[pid]
			if !ok {
				continue
			}

			logrus.Debugf("GPU %d: util(%d): %+v pod: %+v", device.GPU, pid, util, pod)

			// Format the integers as decimal text. A plain string(n)
			// conversion would yield the Unicode character with code point n
			// (e.g. 50 -> "2"), not the number. A slice rather than a map
			// keeps the output order stable between scrapes.
			fields := []struct {
				name  string
				value string
			}{
				{"DCGM_FI_K8S_GPU_UTIL", fmt.Sprintf("%d", util.GPU)},
				{"DCGM_FI_K8S_MEM_COPY_UTIL", fmt.Sprintf("%d", util.Memory)},
				{"DCGM_FI_K8S_ENC_UTIL", fmt.Sprintf("%d", util.Encoder)},
				{"DCGM_FI_K8S_DEC_UTIL", fmt.Sprintf("%d", util.Decoder)},
			}

			for _, field := range fields {
				metrics = append(metrics, ContainerMetric{
					Name:  field.name,
					Value: field.value,

					GPU:       gpu,
					GPUUUID:   device.UUID,
					GPUDevice: gpuDevice,

					// The cgroup lookup yields only a pod UID and a container
					// ID, so the namespace is a fixed placeholder and Pod
					// carries the UID rather than the pod name.
					Namespace: "default",
					Pod:       pod.PodUid,
					Container: pod.ContainerId,
				})
			}
		}
	}

	return metrics, nil
}

// Process is a no-op for CgroupMapper: it ignores metrics and always returns
// nil. The mapper's output is produced by K8sProcess instead.
func (c *CgroupMapper) Process(metrics [][]Metric) error {
return nil
}

// getPodContainer resolves pid to the Kubernetes pod UID and container ID by
// scanning /rootfs/proc/<pid>/cgroup for the first line matching kubePattern.
//
// It returns an error when the cgroup file cannot be opened (also logged),
// when reading it fails, or when no line matches, i.e. the process does not
// appear to belong to a pod. On error the returned PodContainer is empty.
//
// NOTE(review): the /rootfs prefix presumably is the host filesystem mounted
// into the exporter's container; confirm against the deployment manifest.
func getPodContainer(pid uint) (PodContainer, error) {
	f, err := os.Open(fmt.Sprintf("/rootfs/proc/%d/cgroup", pid))
	if err != nil {
		logrus.Errorf("open cgroup failed: %s", err)
		return PodContainer{}, err
	}
	defer f.Close()

	scanner := bufio.NewScanner(f)
	for scanner.Scan() {
		if parts := kubePattern.FindStringSubmatch(scanner.Text()); parts != nil {
			return PodContainer{PodUid: parts[1], ContainerId: parts[2]}, nil
		}
	}
	// Scan also stops on a read error; report that instead of claiming the
	// process has no pod.
	if err := scanner.Err(); err != nil {
		return PodContainer{}, fmt.Errorf("read cgroup of pid(%d) failed: %v", pid, err)
	}
	return PodContainer{}, fmt.Errorf("couldn't find pod by pid(%d)", pid)
}

// listPidOnDev returns the PIDs of every process NVML reports as running on
// the GPU identified by uuid. It returns nil, nil when the device has no
// running processes, and a descriptive error when the device lookup or the
// process query fails.
func listPidOnDev(uuid string) ([]uint, error) {
	dev, err := nvml.NewDeviceByUUID(uuid)
	if err != nil {
		return nil, fmt.Errorf("Error getting device %s: %v", uuid, err)
	}

	procs, err := dev.GetAllRunningProcesses()
	switch {
	case err != nil:
		return nil, fmt.Errorf("Error getting device %s processes: %v", uuid, err)
	case len(procs) == 0:
		return nil, nil
	}

	pids := make([]uint, 0, len(procs))
	for i := range procs {
		pids = append(pids, procs[i].PID)
	}
	return pids, nil
}

// getUtilInfo returns the per-process utilization samples of the GPU
// identified by uuid, keyed by PID. Each sample's SM, memory, encoder and
// decoder utilization is widened to int64 and stored in a
// dcgm.UtilizationInfo. It returns nil, nil when NVML reports no samples, and
// a descriptive error when the device lookup or the utilization query fails.
func getUtilInfo(uuid string) (map[uint]dcgm.UtilizationInfo, error) {
	dev, err := nvml.NewDeviceByUUID(uuid)
	if err != nil {
		return nil, fmt.Errorf("Error getting device %s: %v", uuid, err)
	}

	samples, err := dev.GetAllProcessesUtilization()
	if err != nil {
		return nil, fmt.Errorf("Error getting device %s processes: %v", uuid, err)
	}
	if len(samples) == 0 {
		return nil, nil
	}

	byPID := make(map[uint]dcgm.UtilizationInfo, len(samples))
	for _, sample := range samples {
		var entry dcgm.UtilizationInfo
		entry.GPU = int64(sample.SmUtil)
		entry.Memory = int64(sample.MemUtil)
		entry.Encoder = int64(sample.EncUtil)
		entry.Decoder = int64(sample.DecUtil)
		byPID[sample.PID] = entry
	}
	return byPID, nil
}
13 changes: 13 additions & 0 deletions pkg/gpu_collector.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,19 @@ func NewDCGMCollector(c []Counter) (*DCGMCollector, func(), error) {
DeviceFields: NewDeviceFields(c),
}

count, err := dcgm.GetAllDeviceCount()
if err != nil {
return nil, func() {}, err
}

for i := uint(0); i < count; i++ {
deviceInfo, err := dcgm.GetDeviceInfo(i)
if err != nil {
return nil, func() {}, err
}
collector.DeviceList = append(collector.DeviceList, deviceInfo)
}

cleanups, err := SetupDcgmFieldsWatch(collector.DeviceFields)
if err != nil {
return nil, func() {}, err
Expand Down
4 changes: 4 additions & 0 deletions pkg/kubernetes.go
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,10 @@ func (p *PodMapper) Name() string {
return "podMapper"
}

// K8sProcess is a stub for PodMapper: it produces no container metrics and
// always returns nil, nil.
func (p *PodMapper) K8sProcess() ([]ContainerMetric, error) {
return nil, nil
}

func (p *PodMapper) Process(metrics [][]Metric) error {
_, err := os.Stat(socketPath)
if os.IsNotExist(err) {
Expand Down
55 changes: 54 additions & 1 deletion pkg/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,8 @@ func NewMetricsPipeline(c *Config) (*MetricsPipeline, func(), error) {
if c.Kubernetes {
transformations = append(transformations, NewPodMapper(c))
}
// TODO: kxie
transformations = append(transformations, NewCgroupMapper(gpuCollector.DeviceList))

return &MetricsPipeline{
config: c,
Expand Down Expand Up @@ -116,19 +118,34 @@ func (m *MetricsPipeline) run() (string, error) {
return "", fmt.Errorf("Failed to collect metrics with error: %v", err)
}

var container []ContainerMetric

for _, transform := range m.transformations {
err := transform.Process(metrics)
if err != nil {
return "", fmt.Errorf("Failed to transform metrics for transorm %s: %v", err, transform.Name())
}

container, err = transform.K8sProcess()
if err != nil {
return "", fmt.Errorf("Failed to transform metrics for transorm %s: %v", err, transform.Name())
}
}

formated, err := FormatMetrics(m.countersText, m.metricsFormat, metrics)
if err != nil {
return "", fmt.Errorf("Failed to format metrics with error: %v", err)
}

return formated, nil
sep := `
`

//TODO: kxie
containerFormat, err := FormatContainerMetrics(container)
if err != nil {
return "", fmt.Errorf("Failed to format container metrics with error: %v", err)
}
return formated + sep + containerFormat, nil
}

/*
Expand Down Expand Up @@ -185,3 +202,39 @@ func FormatMetrics(countersText string, t *template.Template, m [][]Metric) (str

return countersText + res.String(), nil
}

// containerMetricTemplate renders one ContainerMetric as a single text line:
// the metric name, a brace-enclosed label set (gpu, UUID, device, container,
// namespace, pod), the value, and a trailing newline. It is parsed once at
// package initialization instead of on every formatting call.
var containerMetricTemplate = template.Must(template.New("metrics").Parse(
	`{{ .Name }}{gpu="{{.GPU}}",UUID="{{.GPUUUID}}",device="{{.GPUDevice}}",container="{{.Container}}",namespace="{{.Namespace}}",pod="{{.Pod}}"} {{.Value}}
`))

// FormatContainerMetrics renders metrics, in order, as one line per metric
// and returns the concatenated text. An empty or nil slice yields "". If any
// metric fails to render, the error is returned together with an empty
// string.
func FormatContainerMetrics(metrics []ContainerMetric) (string, error) {
	var res bytes.Buffer
	for _, m := range metrics {
		if err := containerMetricTemplate.Execute(&res, m); err != nil {
			return "", err
		}
	}
	return res.String(), nil
}
19 changes: 19 additions & 0 deletions pkg/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ type Config struct {

// Transform is one stage of the metrics pipeline. The pipeline's run loop
// calls Process and then K8sProcess on every registered transform and aborts
// the run if either returns an error.
type Transform interface {
// Process is handed the collected device metrics.
Process(metrics [][]Metric) error
// K8sProcess returns per-container metrics; implementations with nothing
// to contribute return nil, nil.
K8sProcess() ([]ContainerMetric, error)
// Name identifies the transform in error messages.
Name() string
}

Expand All @@ -71,6 +72,7 @@ type MetricsPipeline struct {
// DCGMCollector holds the state of the DCGM-backed metrics collector.
type DCGMCollector struct {
Counters []Counter
// DeviceFields is built from the configured counters by NewDeviceFields.
DeviceFields []dcgm.Short
// DeviceList holds the dcgm.GetDeviceInfo result for each device index,
// filled in by NewDCGMCollector.
DeviceList []dcgm.Device
// Cleanups presumably holds teardown callbacks for the field watches;
// TODO confirm against the code that runs them.
Cleanups []func()
}

Expand All @@ -92,6 +94,19 @@ type Metric struct {
Attributes map[string]string
}

// ContainerMetric is a single metric sample attributed to one container on
// one GPU. All fields are strings because they are substituted directly into
// the text output template.
type ContainerMetric struct {
// Name is the metric name, e.g. "DCGM_FI_K8S_GPU_UTIL".
Name string
// Value is the sample value as text.
Value string

// GPU is the device index in decimal.
GPU string
// GPUUUID is the device UUID.
GPUUUID string
// GPUDevice is the device name of the form "nvidia<index>".
GPUDevice string

// Namespace is the pod namespace; CgroupMapper currently always sets
// "default".
Namespace string
// Pod identifies the pod; CgroupMapper fills it with the pod UID.
Pod string
// Container identifies the container; CgroupMapper fills it with the
// container ID.
Container string
}

func (m Metric) getIDOfType(idType KubernetesGPUIDType) (string, error) {
switch idType {
case GPUUID:
Expand Down Expand Up @@ -126,3 +141,7 @@ type PodInfo struct {
Namespace string
Container string
}

// CgroupMapper is a Transform that attributes per-process GPU utilization to
// Kubernetes containers by reading each process's cgroup file.
type CgroupMapper struct {
// DeviceList is the set of GPUs whose processes are inspected.
DeviceList []dcgm.Device
}

0 comments on commit 15156d6

Please sign in to comment.