You cannot select more than 25 topics.
Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
221 lines
5.9 KiB
221 lines
5.9 KiB
package data
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"sync"
|
|
"time"
|
|
"vgpu/internal/biz"
|
|
"vgpu/internal/provider/util"
|
|
|
|
"github.com/go-kratos/kratos/v2/log"
|
|
"github.com/jinzhu/copier"
|
|
corev1 "k8s.io/api/core/v1"
|
|
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
|
|
k8stypes "k8s.io/apimachinery/pkg/types"
|
|
"k8s.io/client-go/informers"
|
|
listerscorev1 "k8s.io/client-go/listers/core/v1"
|
|
"k8s.io/client-go/tools/cache"
|
|
)
|
|
|
|
type podRepo struct {
|
|
data *Data
|
|
podLister listerscorev1.PodLister
|
|
pods map[k8stypes.UID]*biz.PodInfo
|
|
allPods []*biz.PodInfo
|
|
mutex sync.RWMutex
|
|
log *log.Helper
|
|
}
|
|
|
|
func NewPodRepo(data *Data, logger log.Logger) biz.PodRepo {
|
|
repo := &podRepo{
|
|
data: data,
|
|
pods: make(map[k8stypes.UID]*biz.PodInfo),
|
|
allPods: []*biz.PodInfo{},
|
|
log: log.NewHelper(logger),
|
|
}
|
|
repo.init()
|
|
return repo
|
|
}
|
|
|
|
func (r *podRepo) init() {
|
|
informerFactory := informers.NewSharedInformerFactoryWithOptions(r.data.k8sCl, time.Hour*1)
|
|
r.podLister = informerFactory.Core().V1().Pods().Lister()
|
|
informer := informerFactory.Core().V1().Pods().Informer()
|
|
informer.AddEventHandler(cache.ResourceEventHandlerFuncs{
|
|
AddFunc: r.onAddPod,
|
|
UpdateFunc: r.onUpdatePod,
|
|
DeleteFunc: r.onDeletedPod,
|
|
})
|
|
stopCh := make(chan struct{})
|
|
informerFactory.Start(stopCh)
|
|
informerFactory.WaitForCacheSync(stopCh)
|
|
}
|
|
|
|
func (r *podRepo) onAddPod(obj interface{}) {
|
|
pod, ok := obj.(*corev1.Pod)
|
|
if !ok {
|
|
r.log.Error("unknown add object type")
|
|
return
|
|
}
|
|
nodeID, ok := pod.Annotations[util.AssignedNodeAnnotations]
|
|
tpiID := pod.Labels["tpi-id"]
|
|
if !ok && tpiID == "" {
|
|
return
|
|
}
|
|
if biz.IsPodInTerminatedState(pod) {
|
|
r.delPod(pod)
|
|
return
|
|
}
|
|
bizPodDev := biz.PodDevices{}
|
|
podDev, _ := util.DecodePodDevices(pod, r.log)
|
|
copier.Copy(&bizPodDev, podDev)
|
|
r.addPod(pod, nodeID, bizPodDev)
|
|
}
|
|
|
|
func (r *podRepo) onUpdatePod(_ interface{}, new interface{}) {
|
|
r.onAddPod(new)
|
|
}
|
|
|
|
func (r *podRepo) onDeletedPod(obj interface{}) {
|
|
pod, ok := obj.(*corev1.Pod)
|
|
if !ok {
|
|
r.log.Error("unknown add object type")
|
|
return
|
|
}
|
|
_, ok = pod.Annotations[util.AssignedNodeAnnotations]
|
|
tpiID := pod.Labels["tpi-id"]
|
|
if !ok && tpiID == "" {
|
|
return
|
|
}
|
|
r.delPod(pod)
|
|
}
|
|
|
|
func (r *podRepo) addPod(pod *corev1.Pod, nodeID string, devices biz.PodDevices) {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
ctrs := r.fetchContainerInfo(pod)
|
|
|
|
pi := &biz.PodInfo{Name: pod.Name, UID: pod.UID, Namespace: pod.Namespace, NodeID: nodeID, Devices: devices, Ctrs: ctrs, Labels: pod.Labels}
|
|
r.pods[pod.UID] = pi
|
|
r.allPods = append(r.allPods, pi)
|
|
r.log.Infof("Pod added: Name: %s, UID: %s, Namespace: %s, NodeID: %s", pod.Name, pod.UID, pod.Namespace, nodeID)
|
|
}
|
|
|
|
func (r *podRepo) delPod(pod *corev1.Pod) {
|
|
r.mutex.Lock()
|
|
defer r.mutex.Unlock()
|
|
pi, ok := r.pods[pod.UID]
|
|
if ok {
|
|
r.log.Infof("Deleted pod %s with node ID %s", pi.Name, pi.NodeID)
|
|
delete(r.pods, pod.UID)
|
|
}
|
|
}
|
|
|
|
func (r *podRepo) fetchContainerInfo(pod *corev1.Pod) []*biz.Container {
|
|
containers := []*biz.Container{}
|
|
pdevices, err := util.DecodePodDevices(pod, r.log)
|
|
if err != nil {
|
|
return containers
|
|
}
|
|
bizContainerDevices := []biz.ContainerDevices{}
|
|
for _, pds := range pdevices {
|
|
copier.Copy(&bizContainerDevices, pds)
|
|
}
|
|
if len(bizContainerDevices) < 1 {
|
|
for range pod.Spec.Containers {
|
|
bizContainerDevices = append(bizContainerDevices, biz.ContainerDevices{})
|
|
}
|
|
//return containers
|
|
}
|
|
|
|
ctrIdMaps := map[string]string{}
|
|
containerStat := map[string]string{}
|
|
for _, ctr := range pod.Status.ContainerStatuses {
|
|
ctrIdMaps[ctr.Name] = ctr.ContainerID
|
|
containerStat[ctr.Name] = biz.ContainerStatusUnknown
|
|
if pod.Status.Phase == corev1.PodRunning && ctr.Ready {
|
|
containerStat[ctr.Name] = biz.ContainerStatusSuccess
|
|
} else if pod.Status.Phase == corev1.PodFailed {
|
|
containerStat[ctr.Name] = biz.ContainerStatusFailed
|
|
}
|
|
}
|
|
|
|
for i, ctr := range pod.Spec.Containers {
|
|
c := &biz.Container{
|
|
Name: ctr.Name,
|
|
UUID: ctrIdMaps[ctr.Name],
|
|
ContainerIdx: i,
|
|
NodeName: pod.Spec.NodeName,
|
|
PodName: pod.Name,
|
|
PodUID: string(pod.UID),
|
|
Status: containerStat[ctr.Name],
|
|
NodeUID: r.GetNodeUUID(pod),
|
|
Namespace: pod.Namespace,
|
|
CreateTime: r.GetCreateTime(pod),
|
|
ContainerDevices: bizContainerDevices[i],
|
|
RequestedCpuCores: float32(ctr.Resources.Requests.Cpu().MilliValue()) / 1000,
|
|
RequestedMemory: ctr.Resources.Requests.Memory().Value(),
|
|
}
|
|
if len(bizContainerDevices[i]) > 0 {
|
|
c.Priority = bizContainerDevices[i][0].Priority
|
|
}
|
|
containers = append(containers, c)
|
|
}
|
|
return containers
|
|
}
|
|
|
|
func (r *podRepo) GetNodeUUID(pod *corev1.Pod) string {
|
|
node, err := r.data.k8sCl.CoreV1().Nodes().Get(context.Background(), pod.Spec.NodeName, metav1.GetOptions{})
|
|
if err != nil {
|
|
//p.log.Warnf("Error getting node: %s", err)
|
|
return ""
|
|
} else {
|
|
return string(node.UID)
|
|
}
|
|
}
|
|
|
|
func (r *podRepo) GetCreateTime(pod *corev1.Pod) time.Time {
|
|
for _, status := range pod.Status.Conditions {
|
|
if status.Type == corev1.PodScheduled {
|
|
return status.LastTransitionTime.Time
|
|
}
|
|
}
|
|
return time.Now()
|
|
}
|
|
|
|
func (r *podRepo) GetStartTime(pod *corev1.Pod) time.Time {
|
|
for _, status := range pod.Status.Conditions {
|
|
if status.Type == corev1.PodInitialized {
|
|
return status.LastTransitionTime.Time
|
|
}
|
|
}
|
|
return time.Now()
|
|
}
|
|
|
|
func (r *podRepo) ListAll(context.Context) ([]*biz.Container, error) {
|
|
var containerList []*biz.Container
|
|
for _, pod := range r.pods {
|
|
TpiID := pod.Labels["tpi-id"]
|
|
for _, container := range pod.Ctrs {
|
|
container.TpiID = TpiID
|
|
containerList = append(containerList, container)
|
|
}
|
|
}
|
|
return containerList, nil
|
|
}
|
|
|
|
func (r *podRepo) FindOne(_ context.Context, podUID string, name string) (*biz.Container, error) {
|
|
if podUID == "" || name == "" {
|
|
return nil, fmt.Errorf("podUID or name is empty")
|
|
}
|
|
|
|
for _, container := range r.pods[k8stypes.UID(podUID)].Ctrs {
|
|
if container.Name == name {
|
|
return container, nil
|
|
}
|
|
}
|
|
|
|
return nil, fmt.Errorf("not found")
|
|
}
|