From ca77981ef412b0e4f650681bf6bc26e62b9da5b6 Mon Sep 17 00:00:00 2001 From: youys <1272586223@qq.com> Date: Mon, 4 Aug 2025 10:56:07 +0800 Subject: [PATCH] =?UTF-8?q?fix(backend):=20=E6=88=90=E5=8A=9Fcode=E8=BF=94?= =?UTF-8?q?=E5=9B=9E200=EF=BC=8C=E6=96=B0=E5=A2=9E=E7=A6=81=E7=94=A8?= =?UTF-8?q?=E5=90=AF=E7=94=A8=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- server/api/v1/node.proto | 20 ++ server/internal/biz/node.go | 10 + server/internal/data/node.go | 96 +++++++++ server/internal/service/node.go | 17 ++ server/internal/service/resource_pool.go | 24 +-- server/openapi.yaml | 239 +++++++++-------------- 6 files changed, 246 insertions(+), 160 deletions(-) diff --git a/server/api/v1/node.proto b/server/api/v1/node.proto index b57fbdb..f39f70d 100644 --- a/server/api/v1/node.proto +++ b/server/api/v1/node.proto @@ -36,6 +36,16 @@ service Node { summary: "节点详情"; }; } + + rpc UpdateNodeStatus (UpdateNodeStatusRequest) returns (UpdateNodeStatusResponse) { + option (google.api.http) = { + post: "/v1/node/status/update", + body: "*" + }; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + summary: "禁用启用节点"; + }; + } } message GetSummaryReq { @@ -100,3 +110,13 @@ message NodeReply { message NodesReply { repeated NodeReply list = 1; } + +message UpdateNodeStatusRequest { + string node_name = 1; + string status = 2; +} + +message UpdateNodeStatusResponse { + int32 code = 1; + string message = 2; +} \ No newline at end of file diff --git a/server/internal/biz/node.go b/server/internal/biz/node.go index aaae57b..7ab9e5b 100644 --- a/server/internal/biz/node.go +++ b/server/internal/biz/node.go @@ -58,6 +58,8 @@ type NodeRepo interface { GetNode(context.Context, string) (*Node, error) ListAllDevices(context.Context) ([]*DeviceInfo, error) FindDeviceByAliasId(string) (*DeviceInfo, error) + EnableNode(context.Context, string) error + DisableNode(context.Context, string) error } type NodeUsecase struct { @@ -88,3 +90,11 @@ func (uc *NodeUsecase) ListAllDevices(ctx context.Context) ([]*DeviceInfo, error func (uc *NodeUsecase) FindDeviceByAliasId(aliasId string) (*DeviceInfo, error) { return uc.repo.FindDeviceByAliasId(aliasId) } + +func (uc *NodeUsecase) EnableNode(ctx context.Context, nodeName string) error { + return uc.repo.EnableNode(ctx, nodeName) +} + +func (uc *NodeUsecase) DisableNode(ctx context.Context, nodeName string) error { + return uc.repo.DisableNode(ctx, nodeName) +} diff --git a/server/internal/data/node.go b/server/internal/data/node.go index 950cf7c..f716e17 100644 --- a/server/internal/data/node.go +++ b/server/internal/data/node.go @@ -6,6 +6,8 @@ import ( "fmt" "github.com/go-kratos/kratos/v2/log" corev1 "k8s.io/api/core/v1" + "k8s.io/api/policy/v1beta1" + metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/apimachinery/pkg/labels" k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" @@ -239,3 +241,97 @@ func (r *nodeRepo) FindDeviceByAliasId(aliasId string) (*biz.DeviceInfo, error) } return nil, errors.New(fmt.Sprintf("aliasID:%s device not found", aliasId)) } + +// DisableNode 禁用节点(标记为不可调度并排空Pod) +func (r *nodeRepo) EnableNode(ctx context.Context, nodeName string) error { + // 1. 标记为可调度 + patch := []byte(`{"spec":{"unschedulable":false}}`) + _, err := r.data.k8sCl.CoreV1().Nodes().Patch( + ctx, + nodeName, + k8stypes.StrategicMergePatchType, + patch, + metav1.PatchOptions{}, + ) + if err != nil { + return fmt.Errorf("标记节点不可调度失败: %v", err) + } + + log.Infof("节点 %s 已恢复可调度状态", nodeName) + + //// 2. 驱逐 Pod + //if err := r.evictPodsOnNode(ctx, nodeName); err != nil { + // return fmt.Errorf("驱逐 Pod 失败: %v", err) + //} + return nil +} + +// DisableNode 禁用节点(标记为不可调度并排空Pod) +func (r *nodeRepo) DisableNode(ctx context.Context, nodeName string) error { + // 1. 标记为不可调度 + patch := []byte(`{"spec":{"unschedulable":true}}`) + _, err := r.data.k8sCl.CoreV1().Nodes().Patch( + ctx, + nodeName, + k8stypes.StrategicMergePatchType, + patch, + metav1.PatchOptions{}, + ) + if err != nil { + return fmt.Errorf("标记节点不可调度失败: %v", err) + } + log.Infof("节点 %s 已设置为不可调度状态", nodeName) + + //// 2. 驱逐 Pod + //if err := r.evictPodsOnNode(ctx, nodeName); err != nil { + // return fmt.Errorf("驱逐 Pod 失败: %v", err) + //} + return nil +} + +func (r *nodeRepo) evictPodsOnNode(ctx context.Context, nodeName string) error { + // 获取该节点上的 Pod 列表 + pods, err := r.data.k8sCl.CoreV1().Pods("").List(ctx, metav1.ListOptions{ + FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName), + }) + if err != nil { + return fmt.Errorf("列出节点上的 Pod 失败: %v", err) + } + + // 遍历驱逐 Pod + for _, pod := range pods.Items { + // 跳过 DaemonSet 和 Mirror Pods(它们不能被驱逐) + if isMirrorPod(&pod) || isDaemonSetPod(&pod) { + continue + } + + eviction := &v1beta1.Eviction{ + ObjectMeta: metav1.ObjectMeta{ + Name: pod.Name, + Namespace: pod.Namespace, + }, + } + + err := r.data.k8sCl.CoreV1().Pods(pod.Namespace).Evict(ctx, eviction) + if err != nil { + r.log.Warnf("驱逐 Pod %s/%s 失败: %v", pod.Namespace, pod.Name, err) + } else { + r.log.Infof("已驱逐 Pod %s/%s", pod.Namespace, pod.Name) + } + } + return nil +} + +func isDaemonSetPod(pod *corev1.Pod) bool { + for _, owner := range pod.OwnerReferences { + if owner.Kind == "DaemonSet" { + return true + } + } + return false +} + +func isMirrorPod(pod *corev1.Pod) bool { + _, found := pod.Annotations["kubernetes.io/config.mirror"] + return found +} diff --git a/server/internal/service/node.go b/server/internal/service/node.go index 5c4f729..01895ac 100644 --- a/server/internal/service/node.go +++ b/server/internal/service/node.go @@ -88,6 +88,23 @@ func (s *NodeService) GetNode(ctx context.Context, req *pb.GetNodeReq) (*pb.Node return s.buildNodeReply(ctx, node) } +func (s *NodeService) UpdateNodeStatus(ctx context.Context, req *pb.UpdateNodeStatusRequest) (*pb.UpdateNodeStatusResponse, error) { + nodeName := req.NodeName + if req.Status == "DISABLED" { + err := s.uc.DisableNode(ctx, nodeName) + if err != nil { + return &pb.UpdateNodeStatusResponse{Code: 500, Message: "禁用失败"}, err + } + } else { + err := s.uc.EnableNode(ctx, nodeName) + if err != nil { + return &pb.UpdateNodeStatusResponse{Code: 500, Message: "启用失败"}, nil + } + } + + return &pb.UpdateNodeStatusResponse{Code: 200, Message: "成功"}, nil +} + func (s *NodeService) buildNodeReply(ctx context.Context, node *biz.Node) (*pb.NodeReply, error) { nodeReply := &pb.NodeReply{ Name: node.Name, diff --git a/server/internal/service/resource_pool.go b/server/internal/service/resource_pool.go index 5450e37..b552516 100644 --- a/server/internal/service/resource_pool.go +++ b/server/internal/service/resource_pool.go @@ -33,12 +33,12 @@ func (s *ResourcePoolService) Create(ctx context.Context, req *pb.ResourcePoolCr poolName := req.PoolName if database.ExistsResourcePoolByPoolName(poolName) { - return &pb.BaseResponse{Code: -1, Message: "资源池:'" + poolName + "'已经存在"}, nil + return &pb.BaseResponse{Code: 500, Message: "资源池:'" + poolName + "'已经存在"}, nil } poolId, err := database.InsertResourcePool(poolName) if err != nil { - return &pb.BaseResponse{Code: -1, Message: poolName + "创建资源池失败"}, nil + return &pb.BaseResponse{Code: 500, Message: poolName + "创建资源池失败"}, nil } nodes := make([]*database.NodeInfo, 0, len(req.Nodes)) @@ -51,11 +51,11 @@ func (s *ResourcePoolService) Create(ctx context.Context, req *pb.ResourcePoolCr rows, err := database.InsertNodes(poolId, nodes) if err != nil { - return &pb.BaseResponse{Code: -1, Message: poolName + "创建资源池失败"}, nil + return &pb.BaseResponse{Code: 500, Message: poolName + "创建资源池失败"}, nil } log.Info("CreateResourcePool success", poolName, rows) - return &pb.BaseResponse{Code: 1, Message: "成功"}, nil + return &pb.BaseResponse{Code: 200, Message: "成功"}, nil } func (s *ResourcePoolService) Update(ctx context.Context, req *pb.ResourcePoolUpdateRequest) (*pb.BaseResponse, error) { @@ -63,16 +63,16 @@ func (s *ResourcePoolService) Update(ctx context.Context, req *pb.ResourcePoolUp poolId := req.PoolId resourcePool, err := database.QueryResourcePoolById(poolId) if err != nil { - return &pb.BaseResponse{Code: -1, Message: "更新资源池失败"}, nil + return &pb.BaseResponse{Code: 500, Message: "更新资源池失败"}, nil } if resourcePool == nil { - return &pb.BaseResponse{Code: -1, Message: "资源池不存在"}, nil + return &pb.BaseResponse{Code: 500, Message: "资源池不存在"}, nil } _, err = database.DeleteNodesByPoolId(poolId) if err != nil { - return &pb.BaseResponse{Code: -1, Message: "更新资源池失败"}, nil + return &pb.BaseResponse{Code: 500, Message: "更新资源池失败"}, nil } nodes := make([]*database.NodeInfo, 0, len(req.Nodes)) @@ -85,10 +85,10 @@ func (s *ResourcePoolService) Update(ctx context.Context, req *pb.ResourcePoolUp _, err = database.InsertNodes(poolId, nodes) _, err = database.UpdateResourcePool(poolId, req.PoolName) if err != nil { - return &pb.BaseResponse{Code: -1, Message: "更新资源池失败"}, nil + return &pb.BaseResponse{Code: 500, Message: "更新资源池失败"}, nil } - return &pb.BaseResponse{Code: 1, Message: "成功"}, nil + return &pb.BaseResponse{Code: 200, Message: "成功"}, nil } func (s *ResourcePoolService) Delete(ctx context.Context, req *pb.ResourcePoolDeleteRequest) (*pb.BaseResponse, error) { @@ -96,17 +96,17 @@ func (s *ResourcePoolService) Delete(ctx context.Context, req *pb.ResourcePoolDe poolId := req.PoolId num, err := database.DeleteNodesByPoolId(poolId) if err != nil { - return &pb.BaseResponse{Code: -1, Message: "删除资源池失败"}, nil + return &pb.BaseResponse{Code: 500, Message: "删除资源池失败"}, nil } log.Infof("DeleteNodes success, poolId: %d, 影响行数: %d", poolId, num) num, err = database.DeleteResourcePoolById(poolId) if err != nil { - return &pb.BaseResponse{Code: -1, Message: "删除资源池失败"}, nil + return &pb.BaseResponse{Code: 500, Message: "删除资源池失败"}, nil } log.Infof("DeleteResourcePool success poolId: %d, 影响行数: %d", poolId, num) - return &pb.BaseResponse{Code: 1, Message: "成功"}, nil + return &pb.BaseResponse{Code: 200, Message: "成功"}, nil } func (s *ResourcePoolService) List(ctx context.Context, req *pb.ResourcePoolListRequest) (*pb.ResourcePoolListResponse, error) { diff --git a/server/openapi.yaml b/server/openapi.yaml index f2e3f23..96fe1e9 100644 --- a/server/openapi.yaml +++ b/server/openapi.yaml @@ -3,37 +3,42 @@ openapi: 3.0.3 info: - title: ResourcePool API + title: Node API version: 0.0.1 paths: - /v1/available/nodes: + /v1/node: get: tags: - - ResourcePool - operationId: ResourcePool_GetAvailableNodes + - Node + operationId: Node_GetNode + parameters: + - name: uid + in: query + schema: + type: string responses: "200": description: OK content: application/json: schema: - $ref: '#/components/schemas/AvailableNodesResponse' + $ref: '#/components/schemas/NodeReply' default: description: Default error response content: application/json: schema: $ref: '#/components/schemas/Status' - /v1/resource/pool/create: + /v1/node/status/update: post: tags: - - ResourcePool - operationId: ResourcePool_Create + - Node + operationId: Node_UpdateNodeStatus requestBody: content: application/json: schema: - $ref: '#/components/schemas/ResourcePoolCreateRequest' + $ref: '#/components/schemas/UpdateNodeStatusRequest' required: true responses: "200": @@ -41,23 +46,23 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/BaseResponse' + $ref: '#/components/schemas/UpdateNodeStatusResponse' default: description: Default error response content: application/json: schema: $ref: '#/components/schemas/Status' - /v1/resource/pool/delete: + /v1/nodes: post: tags: - - ResourcePool - operationId: ResourcePool_Delete + - Node + operationId: Node_GetAllNodes requestBody: content: application/json: schema: - $ref: '#/components/schemas/ResourcePoolDeleteRequest' + $ref: '#/components/schemas/GetAllNodesReq' required: true responses: "200": @@ -65,64 +70,23 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/BaseResponse' + $ref: '#/components/schemas/NodesReply' default: description: Default error response content: application/json: schema: $ref: '#/components/schemas/Status' - /v1/resource/pool/detail: - get: - tags: - - ResourcePool - operationId: ResourcePool_GetDetail - parameters: - - name: poolId - in: query - schema: - type: string - responses: - "200": - description: OK - content: - application/json: - schema: - $ref: '#/components/schemas/ResourcePoolDetailResponse' - default: - description: Default error response - content: - application/json: - schema: - $ref: '#/components/schemas/Status' - /v1/resource/pool/list: - get: - tags: - - ResourcePool - operationId: ResourcePool_List - responses: - "200": - description: OK - content: - application/json: - schema: - $ref: '#/components/schemas/ResourcePoolListResponse' - default: - description: Default error response - content: - application/json: - schema: - $ref: '#/components/schemas/Status' - /v1/resource/pool/update: + /v1/summary: post: tags: - - ResourcePool - operationId: ResourcePool_Update + - Node + operationId: Node_GetSummary requestBody: content: application/json: schema: - $ref: '#/components/schemas/ResourcePoolUpdateRequest' + $ref: '#/components/schemas/GetSummaryReq' required: true responses: "200": @@ -130,7 +94,7 @@ paths: content: application/json: schema: - $ref: '#/components/schemas/BaseResponse' + $ref: '#/components/schemas/DeviceSummaryReply' default: description: Default error response content: @@ -139,40 +103,61 @@ paths: $ref: '#/components/schemas/Status' components: schemas: - AvailableNodesInfo: + DeviceSummaryReply: type: object properties: - nodeName: - type: string - cpuCores: - type: string - gpuNum: - type: string - gpuMemory: - type: string - totalMemory: + vgpuUsed: + type: integer + format: int32 + vgpuTotal: + type: integer + format: int32 + coreUsed: + type: integer + format: int32 + coreTotal: + type: integer + format: int32 + memoryUsed: + type: integer + format: int32 + memoryTotal: + type: integer + format: int32 + gpuCount: + type: integer + format: int32 + nodeCount: + type: integer + format: int32 + GetAllNodesReq: + type: object + properties: + filters: + $ref: '#/components/schemas/GetAllNodesReq_Filters' + GetAllNodesReq_Filters: + type: object + properties: + ip: type: string - diskSize: + type: type: string - nodeIp: + isSchedulable: type: string - AvailableNodesResponse: + GetSummaryReq: type: object properties: - data: - type: array - items: - $ref: '#/components/schemas/AvailableNodesInfo' - BaseResponse: + filters: + $ref: '#/components/schemas/GetSummaryReq_Filters' + GetSummaryReq_Filters: type: object properties: - code: - type: integer - format: int32 - message: + type: + type: string + nodeUid: + type: string + deviceId: type: string - data: - type: object GoogleProtobufAny: type: object properties: @@ -181,14 +166,7 @@ components: description: The type of the serialized message. additionalProperties: true description: Contains an arbitrary serialized message along with a @type that describes the type of the serialized message. - Nodes: - type: object - properties: - nodeIp: - type: string - nodeName: - type: string - PoolNodeReply: + NodeReply: type: object properties: ip: @@ -213,7 +191,8 @@ components: coreTotal: type: string memoryUsed: - type: string + type: integer + format: int32 memoryTotal: type: string uid: @@ -241,64 +220,13 @@ components: type: string diskSize: type: string - ResourcePoolCreateRequest: - type: object - properties: - poolName: - type: string - nodes: - type: array - items: - $ref: '#/components/schemas/Nodes' - ResourcePoolDeleteRequest: - type: object - properties: - poolId: - type: string - ResourcePoolDetailResponse: + NodesReply: type: object properties: list: type: array items: - $ref: '#/components/schemas/PoolNodeReply' - ResourcePoolListData: - type: object - properties: - poolId: - type: string - poolName: - type: string - cpuCores: - type: string - nodeNum: - type: string - gpuNum: - type: string - availableMemory: - type: string - totalMemory: - type: string - diskSize: - type: string - ResourcePoolListResponse: - type: object - properties: - data: - type: array - items: - $ref: '#/components/schemas/ResourcePoolListData' - ResourcePoolUpdateRequest: - type: object - properties: - poolId: - type: string - poolName: - type: string - nodes: - type: array - items: - $ref: '#/components/schemas/Nodes' + $ref: '#/components/schemas/NodeReply' Status: type: object properties: @@ -315,5 +243,20 @@ components: $ref: '#/components/schemas/GoogleProtobufAny' description: A list of messages that carry the error details. There is a common set of message types for APIs to use. description: 'The `Status` type defines a logical error model that is suitable for different programming environments, including REST APIs and RPC APIs. It is used by [gRPC](https://github.com/grpc). Each `Status` message contains three pieces of data: error code, error message, and error details. You can find out more about this error model and how to work with it in the [API Design Guide](https://cloud.google.com/apis/design/errors).' + UpdateNodeStatusRequest: + type: object + properties: + nodeName: + type: string + status: + type: string + UpdateNodeStatusResponse: + type: object + properties: + code: + type: integer + format: int32 + message: + type: string tags: - - name: ResourcePool + - name: Node