From 15156d6c329117dc897f553146ae7ab42844314c Mon Sep 17 00:00:00 2001 From: keyangxie Date: Fri, 22 Jan 2021 17:28:08 +0800 Subject: [PATCH] Add container metrics collector Signed-off-by: keyangxie --- pkg/container.go | 167 +++++++++++++++++++++++++++++++++++++++++++ pkg/gpu_collector.go | 13 ++++ pkg/kubernetes.go | 4 ++ pkg/pipeline.go | 55 +++++++++++++- pkg/types.go | 19 +++++ 5 files changed, 257 insertions(+), 1 deletion(-) create mode 100644 pkg/container.go diff --git a/pkg/container.go b/pkg/container.go new file mode 100644 index 0000000..1e6012b --- /dev/null +++ b/pkg/container.go @@ -0,0 +1,167 @@ +/* + * Copyright (c) 2020, NVIDIA CORPORATION. All rights reserved. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package main + +import ( + "bufio" + "fmt" + "os" + "regexp" + + "github.com/NVIDIA/gpu-monitoring-tools/bindings/go/dcgm" + "github.com/NVIDIA/gpu-monitoring-tools/bindings/go/nvml" + "github.com/sirupsen/logrus" +) + +var ( + kubePattern = regexp.MustCompile(`\d+:memory:/kubepods/[^/]+/pod([^/]+)/([0-9a-f]{64})`) +) + +type PodContainer struct { + PodUid string + ContainerId string +} + +func NewCgroupMapper(devices []dcgm.Device) *CgroupMapper { + logrus.Infof("Kubernetes metrics (by cgroup) collection enabled!") + return &CgroupMapper{DeviceList: devices} +} + +func (c *CgroupMapper) Name() string { + return "cgroupMapper" +} + +func (c *CgroupMapper) K8sProcess() ([]ContainerMetric, error) { + + var metrics []ContainerMetric + + for _, device := range c.DeviceList { + fmt.Println("GPU: ", device.GPU) + pids, err := listPidOnDev(device.UUID) + if err != nil { + return nil, err + } + + utils, err := getUtilInfo(device.UUID) + if err != nil { + return nil, err + } + + for _, pid := range pids { + pod, err := getPodContainer(pid) + if err == nil { + if util, ok := utils[pid]; ok { + utilMap := make(map[string]string) + utilMap["DCGM_FI_K8S_GPU_UTIL"] = string(util.GPU) + utilMap["DCGM_FI_K8S_MEM_COPY_UTIL"] = string(util.Memory) + utilMap["DCGM_FI_K8S_ENC_UTIL"] = string(util.Encoder) + utilMap["DCGM_FI_K8S_DEC_UTIL"] = string(util.Decoder) + + for field, value := range utilMap { + metrics = append(metrics, ContainerMetric{ + Name: field, + Value: value, + + GPU: fmt.Sprintf("%d", device.GPU), + GPUUUID: device.UUID, + GPUDevice: fmt.Sprintf("nvidia%d", device.GPU), + + Namespace: "default", + Pod: pod.PodUid, + Container: pod.ContainerId, + }) + } + fmt.Printf("util(%d): %+v\n", pid, util) + fmt.Printf("pod: %+v\n", pod) + } + } + } + + } + return metrics, nil +} + +func (c *CgroupMapper) Process(metrics [][]Metric) error { + return nil +} + +func getPodContainer(pid uint) (PodContainer, error) { + f, err := os.Open(fmt.Sprintf("/rootfs/proc/%d/cgroup", pid)) + if err != nil { + logrus.Errorf("open cgroup failed: %s", err) + return PodContainer{}, err + } + defer f.Close() + + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := scanner.Text() + parts := kubePattern.FindStringSubmatch(line) + if parts != nil { + return PodContainer{PodUid: parts[1], ContainerId: parts[2]}, nil + } + } + return PodContainer{}, fmt.Errorf("could't find pod by pid(%d)", pid) +} + +func listPidOnDev(uuid string) ([]uint, error) { + device, err := nvml.NewDeviceByUUID(uuid) + if err != nil { + return nil, fmt.Errorf("Error getting device %s: %v", uuid, err) + } + + infos, err := device.GetAllRunningProcesses() + if err != nil { + return nil, fmt.Errorf("Error getting device %s processes: %v", uuid, err) + } + + if len(infos) == 0 { + return nil, nil + } + + var pids []uint + for _, info := range infos { + pids = append(pids, info.PID) + } + return pids, nil +} + +func getUtilInfo(uuid string) (map[uint]dcgm.UtilizationInfo, error) { + util := make(map[uint]dcgm.UtilizationInfo) + device, err := nvml.NewDeviceByUUID(uuid) + if err != nil { + return nil, fmt.Errorf("Error getting device %s: %v", uuid, err) + } + + pInfo, err := device.GetAllProcessesUtilization() + if err != nil { + return nil, fmt.Errorf("Error getting device %s processes: %v", uuid, err) + } + if len(pInfo) == 0 { + return nil, nil + } + for _, info := range pInfo { + util[info.PID] = dcgm.UtilizationInfo{ + GPU: int64(info.SmUtil), + Memory: int64(info.MemUtil), + Encoder: int64(info.EncUtil), + Decoder: int64(info.DecUtil), + } + } + + return util, nil +} diff --git a/pkg/gpu_collector.go b/pkg/gpu_collector.go index 42e826a..6dc0001 100644 --- a/pkg/gpu_collector.go +++ b/pkg/gpu_collector.go @@ -27,6 +27,19 @@ func NewDCGMCollector(c []Counter) (*DCGMCollector, func(), error) { DeviceFields: NewDeviceFields(c), } + count, err := dcgm.GetAllDeviceCount() + if err != nil { + return nil, func() {}, err + } + + for i := uint(0); i < count; i++ { + deviceInfo, err := dcgm.GetDeviceInfo(i) + if err != nil { + return nil, func() {}, err + } + collector.DeviceList = append(collector.DeviceList, deviceInfo) + } + cleanups, err := SetupDcgmFieldsWatch(collector.DeviceFields) if err != nil { return nil, func() {}, err diff --git a/pkg/kubernetes.go b/pkg/kubernetes.go index 2ce9052..e442d3d 100644 --- a/pkg/kubernetes.go +++ b/pkg/kubernetes.go @@ -47,6 +47,10 @@ func (p *PodMapper) Name() string { return "podMapper" } +func (p *PodMapper) K8sProcess() ([]ContainerMetric, error) { + return nil, nil +} + func (p *PodMapper) Process(metrics [][]Metric) error { _, err := os.Stat(socketPath) if os.IsNotExist(err) { diff --git a/pkg/pipeline.go b/pkg/pipeline.go index 336680b..7fea0e8 100644 --- a/pkg/pipeline.go +++ b/pkg/pipeline.go @@ -48,6 +48,8 @@ func NewMetricsPipeline(c *Config) (*MetricsPipeline, func(), error) { if c.Kubernetes { transformations = append(transformations, NewPodMapper(c)) } + // TODO: kxie + transformations = append(transformations, NewCgroupMapper(gpuCollector.DeviceList)) return &MetricsPipeline{ config: c, @@ -116,11 +118,18 @@ func (m *MetricsPipeline) run() (string, error) { return "", fmt.Errorf("Failed to collect metrics with error: %v", err) } + var container []ContainerMetric + for _, transform := range m.transformations { err := transform.Process(metrics) if err != nil { return "", fmt.Errorf("Failed to transform metrics for transorm %s: %v", err, transform.Name()) } + + container, err = transform.K8sProcess() + if err != nil { + return "", fmt.Errorf("Failed to transform metrics for transorm %s: %v", err, transform.Name()) + } } formated, err := FormatMetrics(m.countersText, m.metricsFormat, metrics) @@ -128,7 +137,15 @@ func (m *MetricsPipeline) run() (string, error) { return "", fmt.Errorf("Failed to format metrics with error: %v", err) } - return formated, nil + sep := ` +` + + //TODO: kxie + containerFormat, err := FormatContainerMetrics(container) + if err != nil { + return "", fmt.Errorf("Failed to format container metrics with error: %v", err) + } + return formated + sep + containerFormat, nil } /* @@ -185,3 +202,39 @@ func FormatMetrics(countersText string, t *template.Template, m [][]Metric) (str return countersText + res.String(), nil } + +func FormatContainerMetrics(metrics []ContainerMetric) (string, error) { + format := `{{ .Name }}{gpu="{{.GPU}}",UUID="{{.GPUUUID}}",device="{{.GPUDevice}}",container="{{.Container}}",namespace="{{.Namespace}}",pod="{{.Pod}}"} {{.Value}} +` + t := template.Must(template.New("metrics").Parse(format)) + var res bytes.Buffer + // metrics = []ContainerMetric{ + // ContainerMetric{ + // Name: "DCGM_FI_DEV_SM_CLOCK", + // Value: "1515", + // GPU: "kxie", + // GPUUUID: "GPU-de4b1bb0-3ec3-67ed-b3e2-c32d8546e818", + // GPUDevice: "nvidia0", + // Namespace: "default", + // Pod: "nbody-pod", + // Container: "nbody", + // }, + + // ContainerMetric{ + // Name: "DCGM_FI_DEV_GPU_UTIL", + // Value: "100", + // GPU: "kxie", + // GPUUUID: "GPU-de4b1bb0-3ec3-67ed-b3e2-c32d8546e818", + // GPUDevice: "nvidia0", + // Namespace: "default", + // Pod: "nbody-pod", + // Container: "nbody", + // }, + // } + for _, m := range metrics { + if err := t.Execute(&res, m); err != nil { + return "", err + } + } + return res.String(), nil +} diff --git a/pkg/types.go b/pkg/types.go index c849f0b..3bebca0 100644 --- a/pkg/types.go +++ b/pkg/types.go @@ -55,6 +55,7 @@ type Config struct { type Transform interface { Process(metrics [][]Metric) error + K8sProcess() ([]ContainerMetric, error) Name() string } @@ -71,6 +72,7 @@ type MetricsPipeline struct { type DCGMCollector struct { Counters []Counter DeviceFields []dcgm.Short + DeviceList []dcgm.Device Cleanups []func() } @@ -92,6 +94,19 @@ type Metric struct { Attributes map[string]string } +type ContainerMetric struct { + Name string + Value string + + GPU string + GPUUUID string + GPUDevice string + + Namespace string + Pod string + Container string +} + func (m Metric) getIDOfType(idType KubernetesGPUIDType) (string, error) { switch idType { case GPUUID: @@ -126,3 +141,7 @@ type PodInfo struct { Namespace string Container string } + +type CgroupMapper struct { + DeviceList []dcgm.Device +}