You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

354 lines
9.9 KiB

package util
import (
"encoding/json"
"errors"
"fmt"
"strconv"
"strings"
"github.com/go-kratos/kratos/v2/log"
corev1 "k8s.io/api/core/v1"
)
const (
// OneContainerMultiDeviceSplitSymbol this is when one container use multi device, use : symbol to join device info.
OneContainerMultiDeviceSplitSymbol = ":"
// OnePodMultiContainerSplitSymbol this is when one pod having multi container and more than one container use device, use ; symbol to join device info.
OnePodMultiContainerSplitSymbol = ";"
NvidiaGPUDevice = "NVIDIA"
AscendGPUDevice = "Ascend"
Ascend310PGPUDevice = "Ascend310P"
HygonGPUDevice = "DCU"
CambriconGPUDevice = "MLU"
DsmluProfileAndInstance = "CAMBRICON_DSMLU_PROFILE_INSTANCE"
NVIDIAPriority = "nvidia.com/priority"
)
type ascendDeviceConfig struct {
Usedmem int32
Usedcores int32
}
var (
InRequestDevices map[string]string
SupportDevices map[string]string
ascendDeviceConfigs map[string]map[int32]ascendDeviceConfig
)
func init() {
InRequestDevices = make(map[string]string)
SupportDevices = make(map[string]string)
ascendDeviceConfigs = map[string]map[int32]ascendDeviceConfig{
"Ascend910B": {
16384: {Usedmem: 16384, Usedcores: 25},
32768: {Usedmem: 32768, Usedcores: 50},
},
"Ascend310P": {
3072: {Usedmem: 3072, Usedcores: 13},
6144: {Usedmem: 6144, Usedcores: 25},
12288: {Usedmem: 12288, Usedcores: 50},
},
}
initMLUDevice()
}
func initMLUDevice() {
}
func EncodeNodeDevices(dlist []*DeviceInfo, log *log.Helper) string {
tmp := ""
for _, val := range dlist {
tmp += val.ID + "," + strconv.FormatInt(int64(val.Count), 10) + "," + strconv.Itoa(int(val.Devmem)) + "," + strconv.Itoa(int(val.Devcore)) + "," + val.Type + "," + strconv.Itoa(val.Numa) + "," + strconv.FormatBool(val.Health) + OneContainerMultiDeviceSplitSymbol
}
log.Infof("Encoded node Devices: %s", tmp)
return tmp
}
// DecodeNodeDevices decodes the node devices from a string.
func DecodeNodeDevices(str string, log *log.Helper) ([]*DeviceInfo, error) {
if !strings.Contains(str, OneContainerMultiDeviceSplitSymbol) {
log.Warn("Node annotations not decode successfully")
return []*DeviceInfo{}, errors.New("node annotations not decode successfully")
}
tmp := strings.Split(str, OneContainerMultiDeviceSplitSymbol)
var retval []*DeviceInfo
for _, val := range tmp {
if strings.Contains(val, ",") {
items := strings.Split(val, ",")
if len(items) >= 7 || len(items) == 9 {
count, _ := strconv.ParseInt(items[1], 10, 32)
devmem, _ := strconv.ParseInt(items[2], 10, 32)
devcore, _ := strconv.ParseInt(items[3], 10, 32)
health, _ := strconv.ParseBool(items[6])
numa, _ := strconv.Atoi(items[5])
mode := "hami-core"
index := 0
if len(items) == 9 {
index, _ = strconv.Atoi(items[7])
mode = items[8]
}
i := DeviceInfo{
ID: items[0],
AliasId: items[0],
Count: int32(count),
Devmem: int32(devmem),
Devcore: int32(devcore),
Type: items[4],
Numa: numa,
Health: health,
Mode: mode,
Index: uint(index),
}
retval = append(retval, &i)
} else {
return []*DeviceInfo{}, errors.New("node annotations not decode successfully")
}
}
}
return retval, nil
}
// DecodeContainerDevices decodes the container devices from a string.
func DecodeContainerDevices(str, priority string) (ContainerDevices, error) {
if len(str) == 0 {
return ContainerDevices{}, nil
}
cd := strings.Split(str, OneContainerMultiDeviceSplitSymbol)
contdev := ContainerDevices{}
if len(str) == 0 {
return ContainerDevices{}, nil
}
for i, val := range cd {
if strings.Contains(val, ",") {
tmpstr := strings.Split(val, ",")
if len(tmpstr) < 4 {
return ContainerDevices{}, fmt.Errorf("pod annotation format error; information missing, please do not use nodeName field in task")
}
tmpdev := ContainerDevice{}
tmpdev.Idx = i
tmpdev.UUID = tmpstr[0]
tmpdev.Type = tmpstr[1]
devmem, _ := strconv.ParseInt(tmpstr[2], 10, 32)
tmpdev.Usedmem = int32(devmem)
devcores, _ := strconv.ParseInt(tmpstr[3], 10, 32)
if devcores == 0 {
tmpdev.Usedcores = 100
} else {
tmpdev.Usedcores = int32(devcores)
}
tmpdev.Priority = priority
contdev = append(contdev, tmpdev)
}
}
return contdev, nil
}
func DecodeDCUContainerDevices(str, priority, nodeName string) (ContainerDevices, error) {
if len(str) == 0 {
return ContainerDevices{}, nil
}
cd := strings.Split(str, OneContainerMultiDeviceSplitSymbol)
contdev := ContainerDevices{}
tmpdev := ContainerDevice{}
if len(str) == 0 {
return ContainerDevices{}, nil
}
for i, val := range cd {
if strings.Contains(val, ",") {
tmpstr := strings.Split(val, ",")
if len(tmpstr) < 4 {
return ContainerDevices{}, fmt.Errorf("pod annotation format error; information missing, please do not use nodeName field in task")
}
cardIdx := strings.Split(tmpstr[0], "-")
tmpdev.Idx = i
tmpdev.UUID = fmt.Sprintf("%s-dcu-%s", nodeName, cardIdx[1])
tmpdev.Type = tmpstr[1]
devmem, _ := strconv.ParseInt(tmpstr[2], 10, 32)
tmpdev.Usedmem = int32(devmem)
devcores, _ := strconv.ParseInt(tmpstr[3], 10, 32)
if devcores == 0 {
tmpdev.Usedcores = 100
} else {
tmpdev.Usedcores = int32(devcores)
}
tmpdev.Priority = priority
contdev = append(contdev, tmpdev)
}
}
return contdev, nil
}
func DecodeNpuContainerDevices(str string) (ContainerDevices, error) {
if len(str) == 0 {
return ContainerDevices{}, nil
}
cd := strings.Split(str, OneContainerMultiDeviceSplitSymbol)
contdev := ContainerDevices{}
tmpdev := ContainerDevice{}
if len(str) == 0 {
return ContainerDevices{}, nil
}
for i, val := range cd {
if strings.Contains(val, ",") {
tmpstr := strings.Split(val, ",")
if len(tmpstr) < 4 {
return ContainerDevices{}, fmt.Errorf("pod annotation format error; information missing, please do not use nodeName field in task")
}
tmpdev.Idx = i
tmpdev.UUID = tmpstr[0]
tmpdev.Type = tmpstr[1]
devmem, _ := strconv.ParseInt(tmpstr[2], 10, 32)
tmpdev.Usedmem = int32(devmem)
tmpdev.Usedcores = 100
if configs, exists := ascendDeviceConfigs[tmpdev.Type]; exists {
if config, ok := configs[tmpdev.Usedmem]; ok {
tmpdev.Usedcores = config.Usedcores
}
}
contdev = append(contdev, tmpdev)
}
}
return contdev, nil
}
// DecodeMLUContainerDevices decodes the mlu container devices from a string.
func DecodeMLUContainerDevices(str, nodeName string) (ContainerDevices, error) {
if len(str) == 0 {
return ContainerDevices{}, nil
}
contdev := ContainerDevices{}
tmpdev := ContainerDevice{}
if strings.Contains(str, "_") {
//fmt.Println("cd is ", val)
tmpstr := strings.Split(str, "_")
if len(tmpstr) < 3 {
return ContainerDevices{}, fmt.Errorf("pod annotation format error; information missing, please do not use nodeName field in task")
}
tmpdev.Type = "MLU"
devcores, _ := strconv.ParseInt(tmpstr[1], 10, 32)
devmem, _ := strconv.ParseInt(tmpstr[2], 10, 32)
tmpdev.Usedmem = int32(devmem) * 1024
index, _ := strconv.ParseInt(tmpstr[5], 10, 32)
tmpdev.Idx = int(index)
tmpdev.UUID = fmt.Sprintf("%s-cambricon-mlu-%d", nodeName, index)
if devcores == 0 {
tmpdev.Usedcores = 100
} else {
tmpdev.Usedcores = int32(devcores)
}
contdev = append(contdev, tmpdev)
}
return contdev, nil
}
func GetContainerPriorities(pod *corev1.Pod) []string {
var priorities []string
nvidiaPriority := corev1.ResourceName(NVIDIAPriority)
for _, ctr := range pod.Spec.Containers {
priority := ""
if limitPriority, ok := ctr.Resources.Limits[nvidiaPriority]; ok {
priority = limitPriority.String()
} else if requestPriority, ok := ctr.Resources.Requests[nvidiaPriority]; ok {
priority = requestPriority.String()
}
priorities = append(priorities, priority)
}
return priorities
}
func DecodePodDevices(pod *corev1.Pod, log *log.Helper) (PodDevices, error) {
checklist := SupportDevices
priorities := GetContainerPriorities(pod)
annos := pod.Annotations
if len(annos) == 0 {
return PodDevices{}, nil
}
nodeName := annos[AssignedNodeAnnotations]
pd := make(PodDevices)
for devType, devs := range checklist {
str, ok := annos[devs]
if !ok {
continue
}
pd[devType] = make(PodSingleDevice, 0)
switch devType {
case AscendGPUDevice, Ascend310PGPUDevice:
for _, s := range strings.Split(str, OnePodMultiContainerSplitSymbol) {
cd, err := DecodeNpuContainerDevices(s)
if err != nil {
return PodDevices{}, nil
}
if len(cd) == 0 {
continue
}
pd[devType] = append(pd[devType], cd)
}
case CambriconGPUDevice:
instance := annos[DsmluProfileAndInstance]
cd, err := DecodeMLUContainerDevices(fmt.Sprintf("%s_%s", str, instance, nodeName), nodeName)
if err != nil {
return PodDevices{}, nil
}
if len(cd) == 0 {
continue
}
pd[devType] = append(pd[devType], cd)
case NvidiaGPUDevice:
for i, s := range strings.Split(str, OnePodMultiContainerSplitSymbol) {
if i >= len(pod.Spec.Containers) {
break
}
if s == "" {
pd[devType] = append(pd[devType], ContainerDevices{})
continue
}
cd, err := DecodeContainerDevices(s, priorities[i])
if err != nil {
return PodDevices{}, nil
}
if len(cd) == 0 {
continue
}
pd[devType] = append(pd[devType], cd)
}
case HygonGPUDevice:
for i, s := range strings.Split(str, OnePodMultiContainerSplitSymbol) {
if i >= len(pod.Spec.Containers) {
break
}
if s == "" {
pd[devType] = append(pd[devType], ContainerDevices{})
continue
}
cd, err := DecodeDCUContainerDevices(s, priorities[i], nodeName)
if err != nil {
return PodDevices{}, nil
}
if len(cd) == 0 {
continue
}
pd[devType] = append(pd[devType], cd)
}
}
}
log.Infof("Decoded pod annos: poddevices %v", pd)
return pd, nil
}
func UnMarshalNodeDevices(str string) ([]*DeviceInfo, error) {
var dlist []*DeviceInfo
err := json.Unmarshal([]byte(str), &dlist)
return dlist, err
}