You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

494 lines
19 KiB

package exporter
import (
"context"
"errors"
"fmt"
"math"
"strings"
"time"
pb "vgpu/api/v1"
"vgpu/internal/biz"
"vgpu/internal/data/prom"
"vgpu/internal/provider/mlu"
"vgpu/internal/service"
"github.com/google/wire"
)
// ProviderSet is service providers.
var ProviderSet = wire.NewSet(
NewMetricsGenerator,
)
type MetricsGenerator struct {
promClient *prom.Client
nodeUsecase *biz.NodeUsecase
podUsecase *biz.PodUseCase
monitorService *service.MonitorService
cacheTime time.Time
}
// roundToTwoDecimal 将浮点数保留两位小数
func roundToTwoDecimal(value float64) float64 {
return float64(math.Round(value*100) / 100)
}
// roundToOneDecimal 将浮点数保留一位小数并进行四舍五入
func roundToOneDecimal(value float64) float64 {
return math.Round(value*10) / 10
}
func NewMetricsGenerator(
promClient *prom.Client,
nodeUsecase *biz.NodeUsecase,
podUsecase *biz.PodUseCase,
monitorService *service.MonitorService,
) *MetricsGenerator {
return &MetricsGenerator{
promClient: promClient,
nodeUsecase: nodeUsecase,
podUsecase: podUsecase,
monitorService: monitorService,
}
}
func (s *MetricsGenerator) generatorCache() time.Time {
now := time.Now()
return time.Date(now.Year(), now.Month(), now.Day(), now.Hour(), now.Minute(), 0, 0, now.Location())
}
func (s *MetricsGenerator) cacheIsValidate() bool {
if s.cacheTime == s.generatorCache() {
return true
}
return false
}
func (s *MetricsGenerator) GenerateMetrics(ctx context.Context) error {
//if s.cacheIsValidate() {
// return nil
//}
reset() // 重置所有指标缓存值
s.GenerateDeviceMetrics(ctx) // 卡维度指标
s.GenerateContainerMetrics(ctx) // 任务维度指标
s.cacheTime = s.generatorCache()
return nil
}
// 卡维度指标
func (s *MetricsGenerator) GenerateDeviceMetrics(ctx context.Context) error {
deviceInfos, err := s.nodeUsecase.ListAllDevices(ctx)
if err != nil {
return err
}
for _, device := range deviceInfos {
provider := device.Provider
// 查询device的驱动版本以及设备号
deviceAdditional, err := s.queryDeviceAdditional(ctx, provider, device.Id)
var driver, deviceNo = "", ""
if err == nil && deviceAdditional != nil {
driver = deviceAdditional.DriverVersion
deviceNo = deviceAdditional.DeviceNo
}
// 分配率指标
HamiVgpuCount.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Count))
HamiVmemorySize.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Devmem))
HamiVcoreSize.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Devcore))
// 超配比指标
HamiVCoreScaling.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Devcore) / 100)
HamiCoreSize.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(100)
deviceMemUsed, err := s.deviceMemUsed(ctx, provider, device.Id)
if err == nil {
HamiMemoryUsed.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(deviceMemUsed))
}
HamiMemorySize.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(device.Devmem))
HamiMemoryUtil.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(roundToOneDecimal(100 * float64(deviceMemUsed/float32(device.Devmem))))
deviceMemSize, err := s.deviceMemTotal(ctx, provider, device.Id)
if err == nil && deviceMemSize > 0 {
HamiVMemoryScaling.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(roundToOneDecimal(float64(float32(device.Devmem) / deviceMemSize)))
}
actualCoreUtil, err := s.deviceCoreUtil(ctx, provider, device.Id)
if err == nil {
HamiCoreUsed.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(actualCoreUtil))
HamiCoreUtil.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(actualCoreUtil))
}
actualCoreUtilAvg, err := s.deviceCoreUtil(ctx, provider, device.Id)
if err == nil {
HamiCoreUsedAvg.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(actualCoreUtilAvg))
HamiCoreUtilAvg.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(actualCoreUtilAvg))
}
gpuTemperature, err := s.gpuTemperature(ctx, provider, device.Id)
if err == nil {
HamiDeviceTemperature.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(gpuTemperature))
}
memoryTemperature, err := s.memoryTemperature(ctx, provider, device.Id)
if err == nil {
HamiDeviceMemoryTemperature.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(memoryTemperature))
}
gpuPower, err := s.gpuPower(ctx, provider, device.Id)
if err == nil {
HamiDevicePower.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(gpuPower))
}
fanSpeed, err := s.fanSpeed(ctx, provider, device.Id)
if err == nil {
switch provider {
case biz.NvidiaGPUDevice:
HamiDeviceFanSpeedP.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(fanSpeed))
case biz.CambriconGPUDevice:
HamiDeviceFanSpeedR.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(fanSpeed))
}
}
gpuHardwareHealth, err := s.gpuHardwareHealth(ctx, provider, device.Id)
if err == nil {
HamiDeviceHardwareHealth.WithLabelValues(device.NodeName, provider, device.Type, device.Id, driver, deviceNo).Set(float64(gpuHardwareHealth))
}
}
return nil
}
// 任务维度指标
func (s *MetricsGenerator) GenerateContainerMetrics(ctx context.Context) error {
deviceInfos, err := s.nodeUsecase.ListAllDevices(ctx)
if err != nil {
return err
}
containers, err := s.podUsecase.ListAll(ctx)
if err != nil {
return err
}
for _, device := range deviceInfos {
for _, c := range containers {
var vGPU int32 = 0
var core int32 = 0
var memory int32 = 0
var provider string = ""
for _, cd := range c.ContainerDevices {
if device.AliasId != "" && !strings.HasPrefix(cd.UUID, device.AliasId) {
continue
}
vGPU = vGPU + 1
core = core + cd.Usedcores
memory = memory + cd.Usedmem
provider = cd.Type
}
if provider == "" {
continue
}
HamiContainerVgpuAllocated.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(vGPU))
HamiContainerVmemoryAllocated.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(memory))
HamiContainerVcoreAllocated.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace, fmt.Sprintf("%s:%s", c.Name, c.PodUID)).Set(float64(core))
// 查询任务在当前设备下的算力利用率
taskCoreUsed, err := s.taskCoreUsed(ctx, provider, c.Namespace, c.PodName, c.Name, c.PodUID, device.Id)
if err == nil {
used := float64(0)
util := float64(0)
switch provider {
case biz.NvidiaGPUDevice:
used = float64(taskCoreUsed)
util = roundToOneDecimal(100 * float64(taskCoreUsed) / float64(core))
case biz.CambriconGPUDevice:
used = float64(taskCoreUsed) / 100 * float64(core)
util = float64(taskCoreUsed)
case biz.HygonGPUDevice:
used = float64(taskCoreUsed)
util = roundToOneDecimal(100 * float64(taskCoreUsed) / float64(core))
default:
}
cardCoreUtil, err := s.deviceCoreUtil(ctx, provider, device.Id)
if err == nil && used != 0 && cardCoreUtil > 95 {
used = float64(cardCoreUtil) / 100 * float64(core)
util = float64(cardCoreUtil)
}
HamiContainerCoreUsed.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace).Set(used)
HamiContainerCoreUtil.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace).Set(util)
}
taskMemoryUsed, err := s.taskMemoryUsed(ctx, provider, c.Namespace, c.PodName, c.Name, c.PodUID, device.Id)
if err == nil {
switch provider {
case biz.CambriconGPUDevice:
taskMemoryUsed = float32((taskMemoryUsed/100)*float32(memory)) * 1024 * 1024
default:
}
HamiContainerMemoryUsed.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace).Set(float64(taskMemoryUsed / 1024 / 1024))
HamiContainerMemoryUtil.WithLabelValues(device.NodeName, provider, device.Type, device.Id, c.PodName, c.Name, c.Namespace).Set(roundToOneDecimal(100 * float64(taskMemoryUsed/1024/1024) / float64(memory)))
}
}
}
return nil
}
func (s *MetricsGenerator) queryInstantVal(ctx context.Context, query string) (float32, error) {
res, err := s.monitorService.QueryInstant(context.TODO(), &pb.QueryInstantRequest{
Query: query,
})
if err != nil {
return 0, err
}
if len(res.Data) > 0 {
return res.Data[0].Value, nil
}
return 0, nil
}
// 卡显存已使用量
func (s *MetricsGenerator) deviceMemUsed(ctx context.Context, provider, deviceUUID string) (float32, error) {
query := ""
switch provider {
case biz.NvidiaGPUDevice:
query = fmt.Sprintf("avg(DCGM_FI_DEV_FB_USED{UUID=\"%s\"})", deviceUUID)
case biz.CambriconGPUDevice:
query = fmt.Sprintf("avg(mlu_memory_used{uuid=\"%s\"})", deviceUUID)
case biz.AscendGPUDevice:
query = fmt.Sprintf("avg(npu_chip_info_hbm_used_memory{vdie_id=\"%s\"})", deviceUUID)
case biz.HygonGPUDevice:
query = fmt.Sprintf("avg(dcu_usedmemory_bytes{device_id=\"%s\"})", deviceUUID)
default:
return 0, errors.New("provider not exists")
}
val, err := s.queryInstantVal(ctx, query)
if err != nil {
return val, err
}
if provider == biz.CambriconGPUDevice {
val = val / mlu.CambriconMemUnit
} else if provider == biz.HygonGPUDevice {
val = val / 1024 / 1024
}
return val, err
}
// 卡显存总量
func (s *MetricsGenerator) deviceMemTotal(ctx context.Context, provider, deviceUUID string) (float32, error) {
query := ""
switch provider {
case biz.NvidiaGPUDevice:
query = fmt.Sprintf("avg(DCGM_FI_DEV_FB_FREE{UUID=\"%s\"})+avg(DCGM_FI_DEV_FB_USED{UUID=\"%s\"})", deviceUUID, deviceUUID)
case biz.CambriconGPUDevice:
query = fmt.Sprintf("avg(mlu_memory_total{uuid=\"%s\"})", deviceUUID)
case biz.AscendGPUDevice:
query = fmt.Sprintf("avg(npu_chip_info_hbm_total_memory{vdie_id=\"%s\"})", deviceUUID)
case biz.HygonGPUDevice:
query = fmt.Sprintf("avg(dcu_memorycap_bytes{device_id=\"%s\"})", deviceUUID)
default:
return 0, errors.New("provider not exists")
}
val, err := s.queryInstantVal(ctx, query)
if err != nil {
return val, err
}
if provider == biz.CambriconGPUDevice {
val = val / 1024
} else if provider == biz.HygonGPUDevice {
val = val / 1024 / 1024
}
return val, err
}
// 卡算力利用率
func (s *MetricsGenerator) deviceCoreUtil(ctx context.Context, provider, deviceUUID string) (float32, error) {
query := ""
switch provider {
case biz.NvidiaGPUDevice:
//query = fmt.Sprintf("avg(avg_over_time(DCGM_FI_DEV_GPU_UTIL{UUID=\"%s\"}[1m]))", deviceUUID)
query = fmt.Sprintf("DCGM_FI_DEV_GPU_UTIL{UUID=\"%s\"}", deviceUUID)
//query = fmt.Sprintf("(%s * (sum_over_time(%s[5m:]) / count_over_time(( %s !=0)[5m:])) / %s) > 0 or %s", queryTemplate, queryTemplate, queryTemplate, queryTemplate, queryTemplate)
case biz.CambriconGPUDevice:
query = fmt.Sprintf("avg(mlu_utilization{uuid=\"%s\"})", deviceUUID)
case biz.AscendGPUDevice:
query = fmt.Sprintf("avg(npu_chip_info_utilization{vdie_id=\"%s\"})", deviceUUID)
case biz.HygonGPUDevice:
query = fmt.Sprintf("avg(dcu_utilizationrate{device_id=\"%s\"})", deviceUUID)
default:
return 0, errors.New("provider not exists")
}
return s.queryInstantVal(ctx, query)
}
// 任务算力利用率
func (s *MetricsGenerator) taskCoreUsed(ctx context.Context, provider, namespace, pod, container, podUUID, deviceUUID string) (float32, error) {
query := ""
switch provider {
case biz.NvidiaGPUDevice:
//query = fmt.Sprintf("avg(Device_utilization_desc_of_container{deviceuuid=\"%s\", podnamespace=\"%s\", podname=\"%s\", ctrname=\"%s\"})", deviceUUID, namespace, pod, container)
// queryTemplate := `last_over_time((Device_utilization_desc_of_container{deviceuuid="%s", podnamespace="%s", podname="%s", ctrname="%s"} != 0)[1m:])
//or
//last_over_time(Device_utilization_desc_of_container{deviceuuid="%s", podnamespace="%s", podname="%s", ctrname="%s"}[1m:])`
// query = fmt.Sprintf(queryTemplate, deviceUUID, namespace, pod, container, deviceUUID, namespace, pod, container)
queryTemplate := fmt.Sprintf("Device_utilization_desc_of_container{deviceuuid=\"%s\", podnamespace=\"%s\", podname=\"%s\", ctrname=\"%s\"}", deviceUUID, namespace, pod, container)
query = fmt.Sprintf("sum_over_time(%s[1m]) == 0 or (sum_over_time(%s[10m:]) / count_over_time(( %s !=0)[10m:])) ", queryTemplate, queryTemplate, queryTemplate)
//query = queryTemplate
case biz.CambriconGPUDevice:
query = fmt.Sprintf("avg(mlu_utilization * on(uuid) group_right mlu_container{namespace=\"%s\",pod=\"%s\",container=\"%s\",type=\"mlu370.smlu.vcore\"})", namespace, pod, container)
case biz.AscendGPUDevice:
return 0, nil
case biz.HygonGPUDevice:
query = fmt.Sprintf("avg(vdcu_percent{pod_uuid=\"%s\", container_name=\"%s\"})", podUUID, container)
default:
return 0, errors.New("provider not exists")
}
return s.queryInstantVal(ctx, query)
}
// 任务显存使用量
func (s *MetricsGenerator) taskMemoryUsed(ctx context.Context, provider, namespace, pod, container, podUUID, deviceUUID string) (float32, error) {
query := ""
switch provider {
case biz.NvidiaGPUDevice:
query = fmt.Sprintf("avg(vGPU_device_memory_usage_in_bytes{deviceuuid=\"%s\", podnamespace=\"%s\", podname=\"%s\", ctrname=\"%s\"})", deviceUUID, namespace, pod, container)
case biz.CambriconGPUDevice:
query = fmt.Sprintf("avg(mlu_memory_utilization * on(uuid) group_right mlu_container{namespace=\"%s\",pod=\"%s\",container=\"%s\",type=\"mlu370.smlu.vmemory\"})", namespace, pod, container)
case biz.AscendGPUDevice:
return 0, nil
case biz.HygonGPUDevice:
query = fmt.Sprintf("avg(vdcu_usage_memory_size{pod_uuid=\"%s\", container_name=\"%s\"})", podUUID, container)
default:
return 0, errors.New("provider not exists")
}
return s.queryInstantVal(ctx, query)
}
// GPU温度
func (s *MetricsGenerator) gpuTemperature(ctx context.Context, provider, deviceUUID string) (float32, error) {
query := ""
switch provider {
case biz.NvidiaGPUDevice:
query = fmt.Sprintf("avg(DCGM_FI_DEV_GPU_TEMP{UUID=\"%s\"})", deviceUUID)
case biz.CambriconGPUDevice:
query = fmt.Sprintf("avg(mlu_temperature{uuid=\"%s\"})", deviceUUID)
case biz.AscendGPUDevice:
query = fmt.Sprintf("avg(npu_chip_info_temperature{vdie_id=\"%s\"})", deviceUUID)
case biz.HygonGPUDevice:
query = fmt.Sprintf("avg(dcu_temp{device_id=\"%s\"})", deviceUUID)
default:
return 0, errors.New("provider not exists")
}
return s.queryInstantVal(ctx, query)
}
// 显存温度
func (s *MetricsGenerator) memoryTemperature(ctx context.Context, provider, deviceUUID string) (float32, error) {
query := ""
switch provider {
case biz.NvidiaGPUDevice:
query = fmt.Sprintf("avg(DCGM_FI_DEV_MEMORY_TEMP{UUID=\"%s\"})", deviceUUID)
case biz.CambriconGPUDevice:
query = fmt.Sprintf("avg(mlu_memory_temperature{uuid=\"%s\"})", deviceUUID)
case biz.AscendGPUDevice:
query = fmt.Sprintf("avg(npu_chip_info_temperature{vdie_id=\"%s\"})", deviceUUID)
default:
return 0, errors.New("provider not exists")
}
return s.queryInstantVal(ctx, query)
}
// 功耗
func (s *MetricsGenerator) gpuPower(ctx context.Context, provider, deviceUUID string) (float32, error) {
query := ""
switch provider {
case biz.NvidiaGPUDevice:
query = fmt.Sprintf("avg(DCGM_FI_DEV_POWER_USAGE{UUID=\"%s\"})", deviceUUID)
case biz.CambriconGPUDevice:
query = fmt.Sprintf("avg(mlu_power_usage{uuid=\"%s\"})", deviceUUID)
case biz.AscendGPUDevice:
query = fmt.Sprintf("avg(npu_chip_info_power{vdie_id=\"%s\"})", deviceUUID)
case biz.HygonGPUDevice:
query = fmt.Sprintf("avg(dcu_power_usage{device_id=\"%s\"})", deviceUUID)
default:
return 0, errors.New("provider not exists")
}
return s.queryInstantVal(ctx, query)
}
// 硬件健康
func (s *MetricsGenerator) gpuHardwareHealth(ctx context.Context, provider, deviceUUID string) (float32, error) {
query := ""
switch provider {
case biz.NvidiaGPUDevice:
query = fmt.Sprintf("avg(DCGM_FI_DEV_XID_ERRORS{UUID=\"%s\"})", deviceUUID)
default:
return 0, nil
}
return s.queryInstantVal(ctx, query)
}
// 风扇转速
func (s *MetricsGenerator) fanSpeed(ctx context.Context, provider, deviceUUID string) (float32, error) {
query := ""
switch provider {
case biz.NvidiaGPUDevice:
query = fmt.Sprintf("avg(DCGM_FI_DEV_FAN_SPEED{UUID=\"%s\"})", deviceUUID)
case biz.CambriconGPUDevice:
query = fmt.Sprintf("avg(mlu_fan_speed{uuid=\"%s\"})", deviceUUID)
case biz.AscendGPUDevice:
query = fmt.Sprintf("avg(npu_chip_link_speed{vdie_id=\"%s\"})", deviceUUID)
default:
return 0, errors.New("provider not exists")
}
return s.queryInstantVal(ctx, query)
}
type DeviceAdditionalInfo struct {
DriverVersion string // 驱动版本
DeviceNo string // 设备号
}
// 设备的驱动版本、设备号等信息
func (s *MetricsGenerator) queryDeviceAdditional(ctx context.Context, provider, deviceUUID string) (*DeviceAdditionalInfo, error) {
query := ""
switch provider {
case biz.NvidiaGPUDevice:
query = fmt.Sprintf("DCGM_FI_DEV_POWER_USAGE{UUID=\"%s\"}", deviceUUID)
case biz.AscendGPUDevice:
query = fmt.Sprintf("npu_chip_info_power{vdie_id=\"%s\"}", deviceUUID)
case biz.CambriconGPUDevice:
query = fmt.Sprintf("mlu_power_usage{uuid=\"%s\"}", deviceUUID)
case biz.HygonGPUDevice:
query = fmt.Sprintf("dcu_power_usage{device_id=\"%s\"}", deviceUUID)
default:
return nil, errors.New("provider not exists")
}
res, err := s.monitorService.QueryInstant(context.TODO(), &pb.QueryInstantRequest{
Query: query,
})
if err != nil {
return nil, err
}
if len(res.Data) > 0 {
sample := res.Data[0]
metric := sample.Metric
info := &DeviceAdditionalInfo{}
switch provider {
case biz.NvidiaGPUDevice:
info.DriverVersion = metric["DCGM_FI_DRIVER_VERSION"]
info.DeviceNo = metric["device"]
case biz.CambriconGPUDevice:
info.DriverVersion = metric["driver"]
info.DeviceNo = metric["sn"]
case biz.AscendGPUDevice:
info.DriverVersion = "暂无"
info.DeviceNo = "ascend-" + metric["id"]
case biz.HygonGPUDevice:
info.DriverVersion = "暂无"
info.DeviceNo = "dcu-" + metric["minor_number"]
}
return info, nil
}
return nil, fmt.Errorf("unknown error")
}
func (s *MetricsGenerator) systemComponentHealth(ctx context.Context, componentType, componentName string) (float32, error) {
query := ""
switch componentType {
case biz.ComponentTypeDeployment:
query = fmt.Sprintf("ALERTS{alertname=\"KubeDeploymentReplicasMismatch\",deployment=\"%s\"} ", componentName)
case biz.ComponentTypeStatefulSet:
query = fmt.Sprintf("ALERTS{alertname=\"KubeStatefulSetReplicasMismatch\", statefulset=\"%s\"}", componentName)
case biz.ComponentTypeDaemonSet:
query = fmt.Sprintf("(kube_daemonset_status_desired_number_scheduled{daemonset=\"%s\"} > kube_daemonset_status_number_available{daemonset=\"%s\"})", componentName, componentName)
default:
return 0, errors.New("componentType not exists")
}
return s.queryInstantVal(ctx, query)
}