陈博文 1 week ago
commit b0a538c3fd

@ -36,6 +36,16 @@ service Node {
summary: "节点详情";
};
}
rpc UpdateNodeStatus (UpdateNodeStatusRequest) returns (UpdateNodeStatusResponse) {
option (google.api.http) = {
post: "/v1/node/status/update",
body: "*"
};
option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = {
summary: "禁用启用节点";
};
}
}
message GetSummaryReq {
@ -100,3 +110,13 @@ message NodeReply {
message NodesReply {
repeated NodeReply list = 1;
}
message UpdateNodeStatusRequest {
string node_name = 1;
string status = 2;
}
message UpdateNodeStatusResponse {
int32 code = 1;
string message = 2;
}

@ -58,6 +58,8 @@ type NodeRepo interface {
GetNode(context.Context, string) (*Node, error)
ListAllDevices(context.Context) ([]*DeviceInfo, error)
FindDeviceByAliasId(string) (*DeviceInfo, error)
EnableNode(context.Context, string) error
DisableNode(context.Context, string) error
}
type NodeUsecase struct {
@ -88,3 +90,11 @@ func (uc *NodeUsecase) ListAllDevices(ctx context.Context) ([]*DeviceInfo, error
func (uc *NodeUsecase) FindDeviceByAliasId(aliasId string) (*DeviceInfo, error) {
return uc.repo.FindDeviceByAliasId(aliasId)
}
func (uc *NodeUsecase) EnableNode(ctx context.Context, nodeName string) error {
return uc.repo.EnableNode(ctx, nodeName)
}
func (uc *NodeUsecase) DisableNode(ctx context.Context, nodeName string) error {
return uc.repo.DisableNode(ctx, nodeName)
}

@ -6,6 +6,8 @@ import (
"fmt"
"github.com/go-kratos/kratos/v2/log"
corev1 "k8s.io/api/core/v1"
"k8s.io/api/policy/v1beta1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/labels"
k8stypes "k8s.io/apimachinery/pkg/types"
"k8s.io/client-go/informers"
@ -239,3 +241,97 @@ func (r *nodeRepo) FindDeviceByAliasId(aliasId string) (*biz.DeviceInfo, error)
}
return nil, errors.New(fmt.Sprintf("aliasID:%s device not found", aliasId))
}
// DisableNode 禁用节点标记为不可调度并排空Pod
func (r *nodeRepo) EnableNode(ctx context.Context, nodeName string) error {
// 1. 标记为可调度
patch := []byte(`{"spec":{"unschedulable":false}}`)
_, err := r.data.k8sCl.CoreV1().Nodes().Patch(
ctx,
nodeName,
k8stypes.StrategicMergePatchType,
patch,
metav1.PatchOptions{},
)
if err != nil {
return fmt.Errorf("标记节点不可调度失败: %v", err)
}
log.Infof("节点 %s 已恢复可调度状态", nodeName)
//// 2. 驱逐 Pod
//if err := r.evictPodsOnNode(ctx, nodeName); err != nil {
// return fmt.Errorf("驱逐 Pod 失败: %v", err)
//}
return nil
}
// DisableNode 禁用节点标记为不可调度并排空Pod
func (r *nodeRepo) DisableNode(ctx context.Context, nodeName string) error {
// 1. 标记为不可调度
patch := []byte(`{"spec":{"unschedulable":true}}`)
_, err := r.data.k8sCl.CoreV1().Nodes().Patch(
ctx,
nodeName,
k8stypes.StrategicMergePatchType,
patch,
metav1.PatchOptions{},
)
if err != nil {
return fmt.Errorf("标记节点不可调度失败: %v", err)
}
log.Infof("节点 %s 已设置为不可调度状态", nodeName)
//// 2. 驱逐 Pod
//if err := r.evictPodsOnNode(ctx, nodeName); err != nil {
// return fmt.Errorf("驱逐 Pod 失败: %v", err)
//}
return nil
}
func (r *nodeRepo) evictPodsOnNode(ctx context.Context, nodeName string) error {
// 获取该节点上的 Pod 列表
pods, err := r.data.k8sCl.CoreV1().Pods("").List(ctx, metav1.ListOptions{
FieldSelector: fmt.Sprintf("spec.nodeName=%s", nodeName),
})
if err != nil {
return fmt.Errorf("列出节点上的 Pod 失败: %v", err)
}
// 遍历驱逐 Pod
for _, pod := range pods.Items {
// 跳过 DaemonSet 和 Mirror Pods它们不能被驱逐
if isMirrorPod(&pod) || isDaemonSetPod(&pod) {
continue
}
eviction := &v1beta1.Eviction{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Namespace: pod.Namespace,
},
}
err := r.data.k8sCl.CoreV1().Pods(pod.Namespace).Evict(ctx, eviction)
if err != nil {
r.log.Warnf("驱逐 Pod %s/%s 失败: %v", pod.Namespace, pod.Name, err)
} else {
r.log.Infof("已驱逐 Pod %s/%s", pod.Namespace, pod.Name)
}
}
return nil
}
func isDaemonSetPod(pod *corev1.Pod) bool {
for _, owner := range pod.OwnerReferences {
if owner.Kind == "DaemonSet" {
return true
}
}
return false
}
func isMirrorPod(pod *corev1.Pod) bool {
_, found := pod.Annotations["kubernetes.io/config.mirror"]
return found
}

@ -88,6 +88,23 @@ func (s *NodeService) GetNode(ctx context.Context, req *pb.GetNodeReq) (*pb.Node
return s.buildNodeReply(ctx, node)
}
func (s *NodeService) UpdateNodeStatus(ctx context.Context, req *pb.UpdateNodeStatusRequest) (*pb.UpdateNodeStatusResponse, error) {
nodeName := req.NodeName
if req.Status == "DISABLED" {
err := s.uc.DisableNode(ctx, nodeName)
if err != nil {
return &pb.UpdateNodeStatusResponse{Code: 500, Message: "禁用失败"}, err
}
} else {
err := s.uc.EnableNode(ctx, nodeName)
if err != nil {
return &pb.UpdateNodeStatusResponse{Code: 500, Message: "启用失败"}, nil
}
}
return &pb.UpdateNodeStatusResponse{Code: 200, Message: "成功"}, nil
}
func (s *NodeService) buildNodeReply(ctx context.Context, node *biz.Node) (*pb.NodeReply, error) {
nodeReply := &pb.NodeReply{
Name: node.Name,

@ -33,12 +33,12 @@ func (s *ResourcePoolService) Create(ctx context.Context, req *pb.ResourcePoolCr
poolName := req.PoolName
if database.ExistsResourcePoolByPoolName(poolName) {
return &pb.BaseResponse{Code: -1, Message: "资源池:'" + poolName + "'已经存在"}, nil
return &pb.BaseResponse{Code: 500, Message: "资源池:'" + poolName + "'已经存在"}, nil
}
poolId, err := database.InsertResourcePool(poolName)
if err != nil {
return &pb.BaseResponse{Code: -1, Message: poolName + "创建资源池失败"}, nil
return &pb.BaseResponse{Code: 500, Message: poolName + "创建资源池失败"}, nil
}
nodes := make([]*database.NodeInfo, 0, len(req.Nodes))
@ -51,11 +51,11 @@ func (s *ResourcePoolService) Create(ctx context.Context, req *pb.ResourcePoolCr
rows, err := database.InsertNodes(poolId, nodes)
if err != nil {
return &pb.BaseResponse{Code: -1, Message: poolName + "创建资源池失败"}, nil
return &pb.BaseResponse{Code: 500, Message: poolName + "创建资源池失败"}, nil
}
log.Info("CreateResourcePool success", poolName, rows)
return &pb.BaseResponse{Code: 1, Message: "成功"}, nil
return &pb.BaseResponse{Code: 200, Message: "成功"}, nil
}
func (s *ResourcePoolService) Update(ctx context.Context, req *pb.ResourcePoolUpdateRequest) (*pb.BaseResponse, error) {
@ -63,16 +63,16 @@ func (s *ResourcePoolService) Update(ctx context.Context, req *pb.ResourcePoolUp
poolId := req.PoolId
resourcePool, err := database.QueryResourcePoolById(poolId)
if err != nil {
return &pb.BaseResponse{Code: -1, Message: "更新资源池失败"}, nil
return &pb.BaseResponse{Code: 500, Message: "更新资源池失败"}, nil
}
if resourcePool == nil {
return &pb.BaseResponse{Code: -1, Message: "资源池不存在"}, nil
return &pb.BaseResponse{Code: 500, Message: "资源池不存在"}, nil
}
_, err = database.DeleteNodesByPoolId(poolId)
if err != nil {
return &pb.BaseResponse{Code: -1, Message: "更新资源池失败"}, nil
return &pb.BaseResponse{Code: 500, Message: "更新资源池失败"}, nil
}
nodes := make([]*database.NodeInfo, 0, len(req.Nodes))
@ -85,10 +85,10 @@ func (s *ResourcePoolService) Update(ctx context.Context, req *pb.ResourcePoolUp
_, err = database.InsertNodes(poolId, nodes)
_, err = database.UpdateResourcePool(poolId, req.PoolName)
if err != nil {
return &pb.BaseResponse{Code: -1, Message: "更新资源池失败"}, nil
return &pb.BaseResponse{Code: 500, Message: "更新资源池失败"}, nil
}
return &pb.BaseResponse{Code: 1, Message: "成功"}, nil
return &pb.BaseResponse{Code: 200, Message: "成功"}, nil
}
func (s *ResourcePoolService) Delete(ctx context.Context, req *pb.ResourcePoolDeleteRequest) (*pb.BaseResponse, error) {
@ -96,17 +96,17 @@ func (s *ResourcePoolService) Delete(ctx context.Context, req *pb.ResourcePoolDe
poolId := req.PoolId
num, err := database.DeleteNodesByPoolId(poolId)
if err != nil {
return &pb.BaseResponse{Code: -1, Message: "删除资源池失败"}, nil
return &pb.BaseResponse{Code: 500, Message: "删除资源池失败"}, nil
}
log.Infof("DeleteNodes success, poolId: %d, 影响行数: %d", poolId, num)
num, err = database.DeleteResourcePoolById(poolId)
if err != nil {
return &pb.BaseResponse{Code: -1, Message: "删除资源池失败"}, nil
return &pb.BaseResponse{Code: 500, Message: "删除资源池失败"}, nil
}
log.Infof("DeleteResourcePool success poolId: %d, 影响行数: %d", poolId, num)
return &pb.BaseResponse{Code: 1, Message: "成功"}, nil
return &pb.BaseResponse{Code: 200, Message: "成功"}, nil
}
func (s *ResourcePoolService) List(ctx context.Context, req *pb.ResourcePoolListRequest) (*pb.ResourcePoolListResponse, error) {

@ -3,37 +3,42 @@
openapi: 3.0.3
info:
title: ResourcePool API
title: Node API
version: 0.0.1
paths:
/v1/available/nodes:
/v1/node:
get:
tags:
- ResourcePool
operationId: ResourcePool_GetAvailableNodes
- Node
operationId: Node_GetNode
parameters:
- name: uid
in: query
schema:
type: string
responses:
"200":
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/AvailableNodesResponse'
$ref: '#/components/schemas/NodeReply'
default:
description: Default error response
content:
application/json:
schema:
$ref: '#/components/schemas/Status'
/v1/resource/pool/create:
/v1/node/status/update:
post:
tags:
- ResourcePool
operationId: ResourcePool_Create
- Node
operationId: Node_UpdateNodeStatus
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/ResourcePoolCreateRequest'
$ref: '#/components/schemas/UpdateNodeStatusRequest'
required: true
responses:
"200":
@ -41,23 +46,23 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/BaseResponse'
$ref: '#/components/schemas/UpdateNodeStatusResponse'
default:
description: Default error response
content:
application/json:
schema:
$ref: '#/components/schemas/Status'
/v1/resource/pool/delete:
/v1/nodes:
post:
tags:
- ResourcePool
operationId: ResourcePool_Delete
- Node
operationId: Node_GetAllNodes
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/ResourcePoolDeleteRequest'
$ref: '#/components/schemas/GetAllNodesReq'
required: true
responses:
"200":
@ -65,64 +70,23 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/BaseResponse'
$ref: '#/components/schemas/NodesReply'
default:
description: Default error response
content:
application/json:
schema:
$ref: '#/components/schemas/Status'
/v1/resource/pool/detail:
get:
tags:
- ResourcePool
operationId: ResourcePool_GetDetail
parameters:
- name: poolId
in: query
schema:
type: string
responses:
"200":
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/ResourcePoolDetailResponse'
default:
description: Default error response
content:
application/json:
schema:
$ref: '#/components/schemas/Status'
/v1/resource/pool/list:
get:
tags:
- ResourcePool
operationId: ResourcePool_List
responses:
"200":
description: OK
content:
application/json:
schema:
$ref: '#/components/schemas/ResourcePoolListResponse'
default:
description: Default error response
content:
application/json:
schema:
$ref: '#/components/schemas/Status'
/v1/resource/pool/update:
/v1/summary:
post:
tags:
- ResourcePool
operationId: ResourcePool_Update
- Node
operationId: Node_GetSummary
requestBody:
content:
application/json:
schema:
$ref: '#/components/schemas/ResourcePoolUpdateRequest'
$ref: '#/components/schemas/GetSummaryReq'
required: true
responses:
"200":
@ -130,7 +94,7 @@ paths:
content:
application/json:
schema:
$ref: '#/components/schemas/BaseResponse'
$ref: '#/components/schemas/DeviceSummaryReply'
default:
description: Default error response
content:
@ -139,40 +103,61 @@ paths:
$ref: '#/components/schemas/Status'
components:
schemas:
AvailableNodesInfo:
DeviceSummaryReply:
type: object
properties:
nodeName:
type: string
cpuCores:
type: string
gpuNum:
type: string
gpuMemory:
type: string
totalMemory:
vgpuUsed:
type: integer
format: int32
vgpuTotal:
type: integer
format: int32
coreUsed:
type: integer
format: int32
coreTotal:
type: integer
format: int32
memoryUsed:
type: integer
format: int32
memoryTotal:
type: integer
format: int32
gpuCount:
type: integer
format: int32
nodeCount:
type: integer
format: int32
GetAllNodesReq:
type: object
properties:
filters:
$ref: '#/components/schemas/GetAllNodesReq_Filters'
GetAllNodesReq_Filters:
type: object
properties:
ip:
type: string
diskSize:
type:
type: string
nodeIp:
isSchedulable:
type: string
AvailableNodesResponse:
GetSummaryReq:
type: object
properties:
data:
type: array
items:
$ref: '#/components/schemas/AvailableNodesInfo'
BaseResponse:
filters:
$ref: '#/components/schemas/GetSummaryReq_Filters'
GetSummaryReq_Filters:
type: object
properties:
code:
type: integer
format: int32
message:
type:
type: string
nodeUid:
type: string
deviceId:
type: string
data:
type: object
GoogleProtobufAny:
type: object
properties:
@ -181,14 +166,7 @@ components:
description: The type of the serialized message.
additionalProperties: true
description: Contains an arbitrary serialized message along with a @type that describes the type of the serialized message.
Nodes:
type: object
properties:
nodeIp:
type: string
nodeName:
type: string
PoolNodeReply:
NodeReply:
type: object
properties:
ip:
@ -213,7 +191,8 @@ components:
coreTotal:
type: string
memoryUsed:
type: string
type: integer
format: int32
memoryTotal:
type: string
uid:
@ -241,64 +220,13 @@ components:
type: string
diskSize:
type: string
ResourcePoolCreateRequest:
type: object
properties:
poolName:
type: string
nodes:
type: array
items:
$ref: '#/components/schemas/Nodes'
ResourcePoolDeleteRequest:
type: object
properties:
poolId:
type: string
ResourcePoolDetailResponse:
NodesReply:
type: object
properties:
list:
type: array
items:
$ref: '#/components/schemas/PoolNodeReply'
ResourcePoolListData:
type: object
properties:
poolId:
type: string
poolName:
type: string
cpuCores:
type: string
nodeNum:
type: string
gpuNum:
type: string
availableMemory:
type: string
totalMemory:
type: string
diskSize:
type: string
ResourcePoolListResponse:
type: object
properties:
data:
type: array
items:
$ref: '#/components/schemas/ResourcePoolListData'
ResourcePoolUpdateRequest:
type: object
properties:
poolId:
type: string
poolName:
type: string
nodes:
type: array
items:
$ref: '#/components/schemas/Nodes'
$ref: '#/components/schemas/NodeReply'
Status:
type: object
properties:
@ -315,5 +243,20 @@ components:
$ref: '#/components/schemas/GoogleProtobufAny'
description: A list of messages that carry the error details. There is a common set of message types for APIs to use.
description: 'The `Status` type defines a logical error model that is suitable for different programming environments, including REST APIs and RPC APIs. It is used by [gRPC](https://github.com/grpc). Each `Status` message contains three pieces of data: error code, error message, and error details. You can find out more about this error model and how to work with it in the [API Design Guide](https://cloud.google.com/apis/design/errors).'
UpdateNodeStatusRequest:
type: object
properties:
nodeName:
type: string
status:
type: string
UpdateNodeStatusResponse:
type: object
properties:
code:
type: integer
format: int32
message:
type: string
tags:
- name: ResourcePool
- name: Node

Loading…
Cancel
Save