You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

192 lines
5.5 KiB

package service
import (
"context"
"fmt"
"sort"
"strconv"
"vgpu/internal/biz"
"vgpu/internal/database"
"github.com/jinzhu/copier"
pb "vgpu/api/v1"
"github.com/gookit/goutil/arrutil"
)
// NodeService implements the node-related RPCs of the pb API on top of the
// biz-layer use cases. It embeds pb.UnimplementedNodeServer (the generated
// server stub) so that RPCs without a method here still satisfy the server
// interface.
type NodeService struct {
pb.UnimplementedNodeServer
// uc lists, fetches, enables/disables, discovers and joins nodes.
uc *biz.NodeUsecase
// pod supplies per-device vGPU/core/memory usage (StatisticsByDeviceId).
pod *biz.PodUseCase
// summary supplies the aggregated GPU summary used by GetSummary.
summary *biz.SummaryUseCase
// ms runs instant PromQL-style metric queries (see queryNodeMetrics).
ms *MonitorService
}
// NewNodeService builds a NodeService from its use-case and monitor
// dependencies. The arguments are stored as-is; nothing is validated.
func NewNodeService(uc *biz.NodeUsecase, pod *biz.PodUseCase, summary *biz.SummaryUseCase, ms *MonitorService) *NodeService {
	svc := new(NodeService)
	svc.uc = uc
	svc.pod = pod
	svc.summary = summary
	svc.ms = ms
	return svc
}
// GetSummary returns the GPU summary selected by the device ID, node UID and
// device type filters in req.
//
// Filters are read through the generated protobuf getters. A request that
// carries no Filters message therefore queries with empty values instead of
// panicking on a nil pointer. The use-case result is copied into the reply
// with copier. Errors from the use case or from the copy are returned with a
// nil reply.
func (s *NodeService) GetSummary(ctx context.Context, req *pb.GetSummaryReq) (*pb.DeviceSummaryReply, error) {
	filters := req.GetFilters()
	gpuSummary, err := s.summary.GetGPUSummary(ctx, filters.GetDeviceId(), filters.GetNodeUid(), filters.GetType())
	if err != nil {
		return nil, err
	}
	res := &pb.DeviceSummaryReply{}
	// copier dereferences pointers on both sides, so res (already a pointer)
	// is passed directly.
	if err := copier.Copy(res, &gpuSummary); err != nil {
		return nil, fmt.Errorf("copying GPU summary into reply: %w", err)
	}
	return res, nil
}
// GetAllNodes lists every node known to the node use case as NodeReply values,
// filtered by the optional request filters:
//   - Ip: when non-empty, keep only nodes whose IP equals it;
//   - Type: when non-empty, keep only nodes with at least one device of it;
//   - IsSchedulable: when it parses as a bool, keep only nodes whose
//     schedulable flag matches; an empty or unparsable value disables it.
//
// Every kept node is annotated with the resource pools registered for its IP.
// The list is sorted by node name and is an empty (non-nil) slice when nothing
// matches. A missing Filters message means "no filters".
func (s *NodeService) GetAllNodes(ctx context.Context, req *pb.GetAllNodesReq) (*pb.NodesReply, error) {
	filters := req.GetFilters()
	ipFilter := filters.GetIp()
	typeFilter := filters.GetType()
	// Loop-invariant: parse the schedulable filter once. A parse failure
	// (including the empty string) means the filter is not applied.
	wantSchedulable, parseErr := strconv.ParseBool(filters.GetIsSchedulable())
	filterSchedulable := parseErr == nil

	nodes, err := s.uc.ListAllNodes(ctx)
	if err != nil {
		return nil, err
	}
	res := &pb.NodesReply{List: []*pb.NodeReply{}}
	for _, node := range nodes {
		// Filters that only need the raw node run first, so filtered-out
		// nodes cost neither usage lookups nor a DB round trip.
		if ipFilter != "" && ipFilter != node.IP {
			continue
		}
		if filterSchedulable && wantSchedulable != node.IsSchedulable {
			continue
		}
		nodeReply, err := s.buildNodeReply(ctx, node)
		if err != nil {
			return nil, err
		}
		// The device types are only known after aggregation in buildNodeReply.
		if typeFilter != "" && !arrutil.InStrings(typeFilter, nodeReply.Type) {
			continue
		}
		resourcePoolNames, err := database.QueryResourceNamesByIp(node.IP)
		if err != nil {
			return nil, fmt.Errorf("querying resource pools for node %q: %w", node.Name, err)
		}
		nodeReply.ResourcePools = resourcePoolNames
		res.List = append(res.List, nodeReply)
	}
	sort.SliceStable(res.List, func(i, j int) bool {
		return res.List[i].Name < res.List[j].Name
	})
	return res, nil
}
// GetNode looks up a single node by req.Uid and converts it with
// buildNodeReply. An error from the node use case is returned unchanged with a
// nil reply.
func (s *NodeService) GetNode(ctx context.Context, req *pb.GetNodeReq) (*pb.NodeReply, error) {
	found, lookupErr := s.uc.GetNode(ctx, req.Uid)
	if lookupErr == nil {
		return s.buildNodeReply(ctx, found)
	}
	return nil, lookupErr
}
// UpdateNodeStatus disables the node named by req.NodeName when req.Status is
// exactly "DISABLED" and enables it for any other status value.
//
// On failure it returns a Code 500 response, with a disable-failed or
// enable-failed message, together with the underlying error. On success it
// returns Code 200 with the success message and a nil error.
func (s *NodeService) UpdateNodeStatus(ctx context.Context, req *pb.UpdateNodeStatusRequest) (*pb.UpdateNodeStatusResponse, error) {
	nodeName := req.NodeName
	if req.Status == "DISABLED" {
		if err := s.uc.DisableNode(ctx, nodeName); err != nil {
			return &pb.UpdateNodeStatusResponse{Code: 500, Message: "禁用失败"}, err
		}
	} else if err := s.uc.EnableNode(ctx, nodeName); err != nil {
		// Propagate the error exactly like the disable path does. Returning
		// nil here made a failed enable look like a successful RPC and
		// discarded the cause.
		return &pb.UpdateNodeStatusResponse{Code: 500, Message: "启用失败"}, err
	}
	return &pb.UpdateNodeStatusResponse{Code: 200, Message: "成功"}, nil
}
// DiscoveredNode reports the nodes returned by the node use case's discovery
// call as (name, IP) pairs. ctx and req are currently unused. The List field
// stays nil when nothing is discovered.
func (s *NodeService) DiscoveredNode(ctx context.Context, req *pb.DiscoveredNodeRequest) (*pb.DiscoveredNodeResponse, error) {
	found, discoverErr := s.uc.DiscoveredNode()
	if discoverErr != nil {
		return nil, discoverErr
	}
	resp := &pb.DiscoveredNodeResponse{}
	for _, candidate := range found {
		info := &pb.DiscoveredNodeInfo{
			NodeName: candidate.NodeName,
			NodeIp:   candidate.NodeIp,
		}
		resp.List = append(resp.List, info)
	}
	return resp, nil
}
// JoinNode asks the node use case to join the nodes listed in req.NodeNames.
//
// On failure the response carries Code 500 with the error text as its Message,
// and the error is returned as well. On success it returns Code 200 with the
// success message. ctx is not forwarded: uc.JoinNode is called with the node
// names only.
func (s *NodeService) JoinNode(ctx context.Context, req *pb.JoinNodeRequest) (*pb.JoinNodeResponse, error) {
	if joinErr := s.uc.JoinNode(req.NodeNames); joinErr != nil {
		return &pb.JoinNodeResponse{Code: 500, Message: joinErr.Error()}, joinErr
	}
	return &pb.JoinNodeResponse{Code: 200, Message: "成功"}, nil
}
// buildNodeReply converts a biz.Node into its API representation.
//
// Static node attributes are copied field by field, and CardCnt is the number
// of devices. VgpuTotal, CoreTotal and MemoryTotal are summed over
// node.Devices. The *Used counters add each device's usage from the pod use
// case. That lookup is best-effort: a device whose lookup fails contributes
// nothing to the *Used counters. Device types are de-duplicated with
// arrutil.Unique. The returned error is currently always nil.
func (s *NodeService) buildNodeReply(ctx context.Context, node *biz.Node) (*pb.NodeReply, error) {
	reply := &pb.NodeReply{
		Uid:                     node.Uid,
		Name:                    node.Name,
		Ip:                      node.IP,
		IsReady:                 node.IsReady,
		IsSchedulable:           node.IsSchedulable,
		OperatingSystem:         node.OperatingSystem,
		OsImage:                 node.OSImage,
		KernelVersion:           node.KernelVersion,
		Architecture:            node.Architecture,
		ContainerRuntimeVersion: node.ContainerRuntimeVersion,
		KubeletVersion:          node.KubeletVersion,
		KubeProxyVersion:        node.KubeProxyVersion,
		CreationTimestamp:       node.CreationTimestamp,
		CpuCores:                node.CPUCores,
		TotalMemory:             node.TotalMemory,
		DiskSize:                node.DiskTotal,
	}
	reply.CardCnt = int32(len(node.Devices))

	for _, dev := range node.Devices {
		reply.Type = append(reply.Type, dev.Type)
		reply.VgpuTotal += dev.Count
		reply.CoreTotal += dev.Devcore
		reply.MemoryTotal += dev.Devmem

		// Best-effort usage: skip this device's usage on lookup failure.
		if vgpuUsed, coreUsed, memUsed, statErr := s.pod.StatisticsByDeviceId(ctx, dev.AliasId); statErr == nil {
			reply.VgpuUsed += vgpuUsed
			reply.CoreUsed += coreUsed
			reply.MemoryUsed += memUsed
		}
	}

	reply.Type = arrutil.Unique(reply.Type)
	return reply, nil
}
// queryNodeMetrics returns, for nodeName, the values of the hami_core_size and
// hami_memory_size metrics. Each metric is summed per instance and then
// averaged across instances, using instant queries on the monitor service.
//
// The node label is matched exactly (node="...") and the name is quoted with
// %q. PromQL string literals follow Go escaping rules, so quotes or
// backslashes in the name cannot break the query. A regex matcher (=~) would
// let metacharacters such as '.' (common in node names) match other nodes.
//
// A query that returns no samples yields 0 for that metric. A failed query is
// returned as a wrapped error with zero values.
func (s *NodeService) queryNodeMetrics(ctx context.Context, nodeName string) (int32, int32, error) {
	coreTotal, err := s.queryInstantInt32(ctx, fmt.Sprintf("avg(sum(hami_core_size{node=%q}) by (instance))", nodeName))
	if err != nil {
		return 0, 0, fmt.Errorf("querying core size for node %q: %w", nodeName, err)
	}
	memoryTotal, err := s.queryInstantInt32(ctx, fmt.Sprintf("avg(sum(hami_memory_size{node=%q}) by (instance))", nodeName))
	if err != nil {
		return 0, 0, fmt.Errorf("querying memory size for node %q: %w", nodeName, err)
	}
	return coreTotal, memoryTotal, nil
}

// queryInstantInt32 runs one instant query through the monitor service. It
// returns the first sample's value converted to int32, or 0 when the result
// holds no samples.
func (s *NodeService) queryInstantInt32(ctx context.Context, query string) (int32, error) {
	resp, err := s.ms.QueryInstant(ctx, &pb.QueryInstantRequest{Query: query})
	if err != nil {
		return 0, err
	}
	if len(resp.Data) == 0 {
		return 0, nil
	}
	return int32(resp.Data[0].Value), nil
}