diff --git a/server/api/v1/resource_pool.proto b/server/api/v1/resource_pool.proto new file mode 100644 index 0000000..aed7871 --- /dev/null +++ b/server/api/v1/resource_pool.proto @@ -0,0 +1,169 @@ +syntax = "proto3"; + +package api.v1; + +import "google/api/annotations.proto"; +import "protoc-gen-openapiv2/options/annotations.proto"; +import "google/protobuf/struct.proto"; + + +option go_package = "vgpu/api/v1;v1"; + +service ResourcePool { + + rpc Create (ResourcePoolCreateRequest) returns (BaseResponse) { + option (google.api.http) = { + post: "/v1/resource/pool/create", + body: "*" + }; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + summary: "资源池创建"; + }; + } + + rpc Update (ResourcePoolUpdateRequest) returns (BaseResponse) { + option (google.api.http) = { + post: "/v1/resource/pool/update", + body: "*" + }; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + summary: "资源池更新"; + }; + } + + rpc Delete (ResourcePoolDeleteRequest) returns (BaseResponse) { + option (google.api.http) = { + post: "/v1/resource/pool/delete", + body: "*" + }; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + summary: "资源池删除"; + }; + } + + rpc List (ResourcePoolListRequest) returns (ResourcePoolListResponse) { + option (google.api.http) = { + get: "/v1/resource/pool/list" + }; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + summary: "资源池列表"; + }; + } + + rpc GetDetail (ResourcePoolDetailRequest) returns (ResourcePoolDetailResponse) { + option (google.api.http) = { + get: "/v1/resource/pool/detail" + }; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + summary: "资源池详情"; + }; + } + + rpc GetAvailableNodes (AvailableNodesRequest) returns (AvailableNodesResponse) { + option (google.api.http) = { + get: "/v1/available/nodes" + }; + option (grpc.gateway.protoc_gen_openapiv2.options.openapiv2_operation) = { + summary: "可用节点列表"; + }; + } +} + +message BaseResponse { + int32 code = 1; + string message = 2; + google.protobuf.Struct data = 3; +} + +message Nodes { + string node_ip = 1; + string node_name = 2; +} + +message ResourcePoolCreateRequest { + string pool_name = 1; + repeated Nodes nodes = 2; +} + +message ResourcePoolUpdateRequest { + int64 pool_id = 1; + string pool_name = 2; + repeated Nodes nodes = 3; +} + +message ResourcePoolDeleteRequest { + int64 pool_id = 1; +} + +message PoolNodeReply { + string ip = 1; + bool is_schedulable = 2; + bool is_ready = 3; + repeated string type = 4; + int32 vgpu_used = 5; + int32 vgpu_total = 6; + int32 core_used = 7; + int64 core_total = 8; + int64 memory_used = 9; + int64 memory_total = 10; + string uid = 11; + string name = 12; + int32 card_cnt = 13; + string os_image = 14; + string operating_system = 15; + string kernel_version = 16; + string container_runtime_version = 17; + string kubelet_version = 18; + string kube_proxy_version = 19; + string architecture = 20; + string creation_timestamp = 21; + int64 disk_size = 22; +} + +message ResourcePoolDetailRequest { + int64 pool_id = 1; +} + +message ResourcePoolDetailResponse { + repeated PoolNodeReply list = 1; +} + +message ResourcePoolListData{ + int64 pool_id = 1; + string pool_name = 2; + int64 cpu_cores = 3; + int64 node_num = 4; + int64 gpu_num = 5; + int64 available_memory = 6;//kb + int64 total_memory = 7; // kb + int64 disk_size = 8; +} + +message ResourcePoolListRequest { + +} + +message ResourcePoolListResponse { + repeated ResourcePoolListData data = 1; +} + +message AvailableNodesRequest{ + +} + +message AvailableNodesResponse{ + repeated AvailableNodesInfo data = 1; +} + +message AvailableNodesInfo{ + string node_name = 1; + int64 cpu_cores = 2; + int64 gpu_num = 3; + int64 gpu_memory = 4; + int64 total_memory = 5; // byte + int64 disk_size = 6; + string node_ip = 7; +} + + + diff --git a/server/config/config.yaml b/server/config/config.yaml index f3905c8..e2065f7 100644 --- a/server/config/config.yaml +++ b/server/config/config.yaml @@ -3,13 +3,16 @@ server: addr: 0.0.0.0:8000 timeout: 1s grpc: - addr: 0.0.0.0:9000 + addr: 0.0.0.0:9001 timeout: 1s prometheus: - address: http://localhost:9090 + address: http://172.16.100.14:29090 timeout: 1m node_selectors: NVIDIA: gpu=on Ascend: ascend=on DCU: dcu=on - MLU: mlu=on \ No newline at end of file + MLU: mlu=on +database: + driver: mysql + dataSourceName: testeducoder:TEST@123@tcp(testeducoder-public.mysql.polardb.rds.aliyuncs.com:3306)/hami?parseTime=true&loc=Local \ No newline at end of file diff --git a/server/config/db.sql b/server/config/db.sql new file mode 100644 index 0000000..86d07c4 --- /dev/null +++ b/server/config/db.sql @@ -0,0 +1,19 @@ +create database hami character set utf8mb4; + + +create table resource_pool( + id bigint primary key not null auto_increment, + pool_name varchar(128) not null, + create_time timestamp default current_timestamp, + update_time timestamp default current_timestamp on update current_timestamp +); + +create table nodes( + id bigint primary key not null auto_increment, + pool_id bigint not null, + node_name varchar(128) not null, + node_ip varchar(32) not null, + create_time timestamp default current_timestamp, + update_time timestamp default current_timestamp on update current_timestamp +); + diff --git a/server/go.mod b/server/go.mod index f7bc57e..fea3c87 100644 --- a/server/go.mod +++ b/server/go.mod @@ -27,6 +27,7 @@ require ( ) require ( + filippo.io/edwards25519 v1.1.0 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.3.0 // indirect github.com/davecgh/go-spew v1.1.2-0.20180830191138-d8f796af33cc // indirect @@ -40,6 +41,7 @@ require ( github.com/go-openapi/jsonreference v0.21.0 // indirect github.com/go-openapi/swag v0.23.0 // indirect github.com/go-playground/form/v4 v4.2.0 // indirect + github.com/go-sql-driver/mysql v1.9.3 // indirect github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/glog v1.2.0 // indirect github.com/golang/protobuf v1.5.4 // indirect diff --git a/server/go.sum b/server/go.sum index 62dafb6..d1afeb5 100644 --- a/server/go.sum +++ b/server/go.sum @@ -31,6 +31,8 @@ cloud.google.com/go/storage v1.6.0/go.mod h1:N7U0C8pVQ/+NIKOBQyamJIeKQKkZ+mxpohl cloud.google.com/go/storage v1.8.0/go.mod h1:Wv1Oy7z6Yz3DshWRJFhqM/UCfaWIRTdp0RXyy7KQOVs= cloud.google.com/go/storage v1.10.0/go.mod h1:FLPqc6j+Ki4BU591ie1oL6qBQGu2Bl/tZ9ullr3+Kg0= dmitri.shuralyov.com/gpu/mtl v0.0.0-20190408044501-666a987793e9/go.mod h1:H6x//7gZCb22OMCxBHrMx7a5I7Hp++hsVxbQ4BYO7hU= +filippo.io/edwards25519 v1.1.0 h1:FNf4tywRC1HmFuKW5xopWpigGjJKiJSV0Cqo0cJWDaA= +filippo.io/edwards25519 v1.1.0/go.mod h1:BxyFTGdWcka3PhytdK4V28tE5sGfRvvvRV7EaN4VDT4= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= github.com/antihax/optional v1.0.0/go.mod h1:uupD/76wgC+ih3iEmQUL+0Ugr19nfwCT1kdvxnR2qWY= @@ -98,6 +100,8 @@ github.com/go-playground/assert/v2 v2.0.1 h1:MsBgLAaY856+nPRTKrp3/OZK38U/wa0CcBY github.com/go-playground/assert/v2 v2.0.1/go.mod h1:VDjEfimB/XKnb+ZQfWdccd7VUvScMdVu0Titje2rxJ4= github.com/go-playground/form/v4 v4.2.0 h1:N1wh+Goz61e6w66vo8vJkQt+uwZSoLz50kZPJWR8eic= github.com/go-playground/form/v4 v4.2.0/go.mod h1:q1a2BY+AQUUzhl6xA/6hBetay6dEIhMHjgvJiGo6K7U= +github.com/go-sql-driver/mysql v1.9.3 h1:U/N249h2WzJ3Ukj8SowVFjdtZKfu9vlLZxjPXV1aweo= +github.com/go-sql-driver/mysql v1.9.3/go.mod h1:qn46aNg1333BRMNU69Lq93t8du/dwxI64Gl8i5p1WMU= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q= diff --git a/server/internal/biz/node.go b/server/internal/biz/node.go index 3202772..aaae57b 100644 --- a/server/internal/biz/node.go +++ b/server/internal/biz/node.go @@ -21,6 +21,12 @@ type Node struct { Architecture string CreationTimestamp string Devices []*DeviceInfo + CPUCores int64 // CPU 核数 + GPUCount int64 // 显卡数量 + TotalMemory int64 // 总内存(字节) + AvailableMemory int64 // 可用内存(字节) + DiskTotal int64 // 磁盘总大小(字节) + StorageNum int64 } type DeviceInfo struct { @@ -48,6 +54,7 @@ type DeviceTotal struct { type NodeRepo interface { ListAll(context.Context) ([]*Node, error) + ListAllV2(context.Context) ([]*Node, error) GetNode(context.Context, string) (*Node, error) ListAllDevices(context.Context) ([]*DeviceInfo, error) FindDeviceByAliasId(string) (*DeviceInfo, error) @@ -66,6 +73,10 @@ func (uc *NodeUsecase) ListAllNodes(ctx context.Context) ([]*Node, error) { return uc.repo.ListAll(ctx) } +func (uc *NodeUsecase) ListAllNodesV2(ctx context.Context) ([]*Node, error) { + return uc.repo.ListAllV2(ctx) +} + func (uc *NodeUsecase) GetNode(ctx context.Context, nodeName string) (*Node, error) { return uc.repo.GetNode(ctx, nodeName) } diff --git a/server/internal/data/node.go b/server/internal/data/node.go index d5a54a1..950cf7c 100644 --- a/server/internal/data/node.go +++ b/server/internal/data/node.go @@ -6,6 +6,7 @@ import ( "fmt" "github.com/go-kratos/kratos/v2/log" corev1 "k8s.io/api/core/v1" + "k8s.io/apimachinery/pkg/labels" k8stypes "k8s.io/apimachinery/pkg/types" "k8s.io/client-go/informers" listerscorev1 "k8s.io/client-go/listers/core/v1" @@ -26,6 +27,7 @@ type nodeRepo struct { nodeNotify chan struct{} nodeLister listerscorev1.NodeLister nodes map[k8stypes.UID]*biz.Node + allNodes []*biz.Node log *log.Helper mutex sync.RWMutex providers []provider.Provider @@ -71,13 +73,14 @@ func (r *nodeRepo) updateLocalNodes() { if _, ok := n[node.UID]; !ok { n[node.UID] = bizNode } + devices, err := p.FetchDevices(node) if err != nil { r.log.Warnf("list devices info error: %s", err) continue } for _, device := range devices { - n[node.UID].Devices = append(bizNode.Devices, &biz.DeviceInfo{ + bizNode.Devices = append(bizNode.Devices, &biz.DeviceInfo{ Index: int(device.Index), Id: device.ID, AliasId: device.AliasId, @@ -97,6 +100,18 @@ func (r *nodeRepo) updateLocalNodes() { } } r.nodes = n + + var all []*biz.Node + allNodes, _ := r.nodeLister.List(labels.Everything()) + for _, node := range allNodes { + bizNode := r.fetchNodeInfo(node) + gpuNode := n[k8stypes.UID(bizNode.Uid)] + if gpuNode != nil { + bizNode.Devices = gpuNode.Devices + } + all = append(all, bizNode) + } + r.allNodes = all } } @@ -128,6 +143,8 @@ func (r *nodeRepo) onDeletedNode(obj interface{}) { } func (r *nodeRepo) fetchNodeInfo(node *corev1.Node) *biz.Node { + //b, _ := json.MarshalIndent(node, "", " ") + //fmt.Println(string(b)) n := &biz.Node{IsSchedulable: !node.Spec.Unschedulable} for _, addr := range node.Status.Addresses { if addr.Type == corev1.NodeInternalIP { @@ -150,6 +167,34 @@ func (r *nodeRepo) fetchNodeInfo(node *corev1.Node) *biz.Node { n.KubeProxyVersion = node.Status.NodeInfo.KubeProxyVersion n.Architecture = strings.ToUpper(node.Status.NodeInfo.Architecture) n.CreationTimestamp = node.CreationTimestamp.Format("2006-01-02 15:04:05") + + capacity := node.Status.Capacity + allocatable := node.Status.Allocatable + // CPU 核数 + if cpu, ok := capacity[corev1.ResourceCPU]; ok { + n.CPUCores = cpu.Value() + } + + // GPU 数量(nvidia.com/gpu) + //if gpu, ok := capacity["nvidia.com/gpu"]; ok { + // n.GPUCount = gpu.Value() + //} + + // 总内存 + if mem, ok := capacity[corev1.ResourceMemory]; ok { + n.TotalMemory = mem.Value() + } + + // 可用内存 + if mem, ok := allocatable[corev1.ResourceMemory]; ok { + n.AvailableMemory = mem.Value() + } + + // 磁盘总大小(临时存储) + if disk, ok := capacity[corev1.ResourceEphemeralStorage]; ok { + n.DiskTotal = disk.Value() + } + return n } @@ -161,6 +206,14 @@ func (r *nodeRepo) ListAll(context.Context) ([]*biz.Node, error) { return nodeList, nil } +func (r *nodeRepo) ListAllV2(context.Context) ([]*biz.Node, error) { + var nodeList []*biz.Node + for _, node := range r.allNodes { + nodeList = append(nodeList, node) + } + return nodeList, nil +} + func (r *nodeRepo) GetNode(_ context.Context, uid string) (*biz.Node, error) { if _, ok := r.nodes[k8stypes.UID(uid)]; !ok { return nil, errors.New("node not found") diff --git a/server/internal/database/config.go b/server/internal/database/config.go new file mode 100644 index 0000000..d224f25 --- /dev/null +++ b/server/internal/database/config.go @@ -0,0 +1,31 @@ +package database + +import ( + "fmt" + "gopkg.in/yaml.v3" + "os" +) + +type DatabaseConfig struct { + Driver string `yaml:"driver"` + DataSourceName string `yaml:"dataSourceName"` +} + +type Config struct { + Database DatabaseConfig `yaml:"database"` +} + +func LoadConfig(filePath string) (*Config, error) { + yamlFile, err := os.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("failed to read config file: %v", err) + } + + var config Config + err = yaml.Unmarshal(yamlFile, &config) + if err != nil { + return nil, fmt.Errorf("failed to unmarshal config file: %v", err) + } + + return &config, nil +} diff --git a/server/internal/database/init_db.go b/server/internal/database/init_db.go new file mode 100644 index 0000000..fb40bc8 --- /dev/null +++ b/server/internal/database/init_db.go @@ -0,0 +1,33 @@ +package database + +import ( + "database/sql" + _ "github.com/go-sql-driver/mysql" + "log" +) + +var db *sql.DB + +func InitDB(config *DatabaseConfig) { + var err error + switch config.Driver { + case "mysql": + db, err = sql.Open("mysql", config.DataSourceName) + default: + log.Fatalf("Unsupported database driver: %s", config.Driver) + } + + if err != nil { + log.Fatalf("Failed to connect to database: %v", err) + } + + err = db.Ping() + if err != nil { + log.Fatalf("Failed to ping database: %v", err) + } + log.Println("Connected to database") +} + +func GetDB() *sql.DB { + return db +} diff --git a/server/internal/database/resource_pool_db.go b/server/internal/database/resource_pool_db.go new file mode 100644 index 0000000..420001c --- /dev/null +++ b/server/internal/database/resource_pool_db.go @@ -0,0 +1,212 @@ +package database + +import ( + "database/sql" + "errors" + "fmt" + "log" + "strings" + "time" +) + +type ResourcePool struct { + Id int64 `db:"id"` + PoolName string `db:"pool_name"` + CreateTime time.Time `db:"create_time"` + UpdateTime time.Time `db:"update_time"` +} + +type Nodes struct { + Id int64 `db:"id"` + NodeName string `db:"node_name"` + NodeIp string `db:"node_ip"` + CreateTime time.Time `db:"create_time"` + UpdateTime time.Time `db:"update_time"` +} + +type NodeInfo struct { + Name string + IP string +} + +func ExistsResourcePoolByPoolName(poolName string) bool { + var count int + err := db.QueryRow("SELECT count(1) FROM resource_pool WHERE pool_name = ?", poolName).Scan(&count) + if err != nil { + log.Printf("Query failed: %v", err) + return false + } + + return count > 0 +} + +func QueryResourcePoolById(poolId int64) (*ResourcePool, error) { + var pool ResourcePool + err := db.QueryRow("SELECT id, pool_name, create_time, update_time FROM resource_pool WHERE id = ?", poolId). + Scan(&pool.Id, &pool.PoolName, &pool.CreateTime, &pool.UpdateTime) + if err != nil { + if errors.Is(err, sql.ErrNoRows) { + log.Printf("No record found with id %d", poolId) + return nil, nil + } + log.Printf("Query failed: %v", err) + return nil, err + } + + return &pool, nil +} + +func QueryResourcePoolListAll() ([]*ResourcePool, error) { + // 执行查询 + rows, err := db.Query("SELECT id, pool_name, create_time, update_time FROM resource_pool") + if err != nil { + log.Printf("Query failed: %v", err) + return nil, err + } + defer rows.Close() + + // 存放结果的切片 + pools := make([]*ResourcePool, 0) + + // 遍历每一行 + for rows.Next() { + var pool ResourcePool + err := rows.Scan(&pool.Id, &pool.PoolName, &pool.CreateTime, &pool.UpdateTime) + if err != nil { + log.Printf("Scan failed: %v", err) + return nil, err + } + pools = append(pools, &pool) + } + + // 检查 rows 是否遍历中出错 + if err := rows.Err(); err != nil { + return nil, err + } + + return pools, nil +} + +func QueryNodesByPoolId(poolId int64) ([]*Nodes, error) { + // 执行查询 + rows, err := db.Query("SELECT id, node_name, node_ip, create_time, update_time FROM nodes where pool_id = ?", poolId) + if err != nil { + log.Printf("Query failed: %v", err) + return nil, err + } + defer rows.Close() + + // 存放结果的切片 + nodes := make([]*Nodes, 0) + + // 遍历每一行 + for rows.Next() { + var node Nodes + err := rows.Scan(&node.Id, &node.NodeName, &node.NodeIp, &node.CreateTime, &node.UpdateTime) + if err != nil { + log.Printf("Scan failed: %v", err) + return nil, err + } + nodes = append(nodes, &node) + } + + // 检查 rows 是否遍历中出错 + if err := rows.Err(); err != nil { + return nil, err + } + + return nodes, nil +} + +func InsertResourcePool(poolName string) (int64, error) { + querySql := "INSERT INTO resource_pool(pool_name) VALUES (?)" + + result, err := db.Exec(querySql, poolName) + if err != nil { + log.Printf("Failed to insert record: %v", err) + return 0, err + } + + id, err := result.LastInsertId() + if err != nil { + log.Printf("Failed to get last insert ID: %v", err) + return 0, err + } + + return id, nil +} + +func UpdateResourcePool(poolId int64, poolName string) (int64, error) { + updateSql := "UPDATE resource_pool SET pool_name=? where id=?" + result, err := db.Exec(updateSql, poolName, poolId) + if err != nil { + log.Printf("Failed to update record: %v", err) + return 0, err + } + + rows, err := result.RowsAffected() + if err != nil { + log.Printf("Failed to get rows affected: %v", err) + return 0, err + } + + return rows, nil +} + +func InsertNodes(poolId int64, nodes []*NodeInfo) (int64, error) { + valueStrings := make([]string, 0, len(nodes)) + valueArgs := make([]interface{}, 0, len(nodes)*3) + + for _, node := range nodes { + valueStrings = append(valueStrings, "(?, ?, ?)") + valueArgs = append(valueArgs, poolId, node.Name, node.IP) + } + + insertSql := fmt.Sprintf("INSERT INTO nodes(pool_id, node_name, node_ip) VALUES %s", + strings.Join(valueStrings, ","), + ) + + result, err := db.Exec(insertSql, valueArgs...) + if err != nil { + log.Printf("Batch insert failed: %v", err) + return 0, err + } + + rows, err := result.RowsAffected() + if err != nil { + log.Printf("Get rows affected failed: %v", err) + return 0, err + } + + return rows, nil +} + +func DeleteResourcePoolById(poolId int64) (int64, error) { + result, err := db.Exec("DELETE FROM resource_pool WHERE id = ?", poolId) + if err != nil { + return 0, fmt.Errorf("delete failed: %w", err) + } + + // 返回影响的行数(0 表示未删除任何数据) + rowsAffected, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("get rows affected failed: %w", err) + } + + return rowsAffected, nil +} + +func DeleteNodesByPoolId(poolId int64) (int64, error) { + result, err := db.Exec("DELETE FROM nodes WHERE pool_id = ?", poolId) + if err != nil { + return 0, fmt.Errorf("delete failed: %w", err) + } + + // 返回影响的行数(0 表示未删除任何数据) + rowsAffected, err := result.RowsAffected() + if err != nil { + return 0, fmt.Errorf("get rows affected failed: %w", err) + } + + return rowsAffected, nil +} diff --git a/server/internal/prometheus/constants.go b/server/internal/prometheus/constants.go new file mode 100644 index 0000000..a6412af --- /dev/null +++ b/server/internal/prometheus/constants.go @@ -0,0 +1,8 @@ +package prometheus + +const ( + // GpuMemoryQuery 查询gpu显存大小 + GpuMemoryQuery = "hami_memory_size{node=\"%s\"}" + // NumberOfDiskQuery 查询磁盘数量 + NumberOfDiskQuery = "count(\n node_disk_info{\n instance=\"%s\", \n device=~\"sd[a-z]+|nvme[0-9]+n[0-9]+|vd[a-z]+\"\n }\n)" +) diff --git a/server/internal/server/http.go b/server/internal/server/http.go index 2625c14..5eb777c 100644 --- a/server/internal/server/http.go +++ b/server/internal/server/http.go @@ -20,6 +20,7 @@ func NewHTTPServer(c *conf.Bootstrap, card *service.CardService, ctr *service.ContainerService, monitor *service.MonitorService, + resourcePool *service.ResourcePoolService, exporter *exporter.MetricsGenerator, logger log.Logger) *http.Server { var opts = []http.ServerOption{ @@ -42,6 +43,7 @@ func NewHTTPServer(c *conf.Bootstrap, v1.RegisterCardHTTPServer(srv, card) v1.RegisterContainerHTTPServer(srv, ctr) v1.RegisterMonitorHTTPServer(srv, monitor) + v1.RegisterResourcePoolHTTPServer(srv, resourcePool) srv.HandlePrefix("/q/", openapiv2.NewHandler()) srv.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) { exporter.GenerateMetrics(r.Context()) diff --git a/server/internal/service/README.md b/server/internal/service/README.md index 42321b7..7aff82f 100644 --- a/server/internal/service/README.md +++ b/server/internal/service/README.md @@ -1 +1,6 @@ # Service + +```shell +# 生成命令 +kratos proto client api/v1/resource_pool.proto +``` diff --git a/server/internal/service/resource_pool.go b/server/internal/service/resource_pool.go new file mode 100644 index 0000000..fd1d9a0 --- /dev/null +++ b/server/internal/service/resource_pool.go @@ -0,0 +1,279 @@ +package service + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "github.com/gookit/goutil/arrutil" + "github.com/prometheus/common/model" + "log" + "sort" + pb "vgpu/api/v1" + "vgpu/internal/biz" + "vgpu/internal/database" + "vgpu/internal/prometheus" +) + +type ResourcePoolService struct { + pb.UnimplementedCardServer + + uc *biz.NodeUsecase + pod *biz.PodUseCase + summary *biz.SummaryUseCase + ms *MonitorService +} + +func NewResourcePoolService(uc *biz.NodeUsecase, pod *biz.PodUseCase, summary *biz.SummaryUseCase, ms *MonitorService) *ResourcePoolService { + return &ResourcePoolService{uc: uc, pod: pod, summary: summary, ms: ms} +} + +func (s *ResourcePoolService) Create(ctx context.Context, req *pb.ResourcePoolCreateRequest) (*pb.BaseResponse, error) { + log.Println("CreateResourcePool called", req) + poolName := req.PoolName + + if database.ExistsResourcePoolByPoolName(poolName) { + return &pb.BaseResponse{Code: -1, Message: "资源池:'" + poolName + "'已经存在"}, nil + } + + poolId, err := database.InsertResourcePool(poolName) + if err != nil { + return &pb.BaseResponse{Code: -1, Message: poolName + "创建资源池失败"}, nil + } + + nodes := make([]*database.NodeInfo, 0, len(req.Nodes)) + for _, node := range req.Nodes { + nodes = append(nodes, &database.NodeInfo{ + Name: node.NodeName, + IP: node.NodeIp, + }) + } + + rows, err := database.InsertNodes(poolId, nodes) + if err != nil { + return &pb.BaseResponse{Code: -1, Message: poolName + "创建资源池失败"}, nil + } + + log.Println("CreateResourcePool success", poolName, rows) + return &pb.BaseResponse{Code: 1, Message: "成功"}, nil +} + +func (s *ResourcePoolService) Update(ctx context.Context, req *pb.ResourcePoolUpdateRequest) (*pb.BaseResponse, error) { + log.Println("UpdateResourcePool called", req) + poolId := req.PoolId + resourcePool, err := database.QueryResourcePoolById(poolId) + if err != nil { + return &pb.BaseResponse{Code: -1, Message: "更新资源池失败"}, nil + } + + if resourcePool == nil { + return &pb.BaseResponse{Code: -1, Message: "资源池不存在"}, nil + } + + _, err = database.DeleteNodesByPoolId(poolId) + if err != nil { + return &pb.BaseResponse{Code: -1, Message: "更新资源池失败"}, nil + } + + nodes := make([]*database.NodeInfo, 0, len(req.Nodes)) + for _, node := range req.Nodes { + nodes = append(nodes, &database.NodeInfo{ + Name: node.NodeName, + IP: node.NodeIp, + }) + } + _, err = database.InsertNodes(poolId, nodes) + _, err = database.UpdateResourcePool(poolId, req.PoolName) + if err != nil { + return &pb.BaseResponse{Code: -1, Message: "更新资源池失败"}, nil + } + + return &pb.BaseResponse{Code: 1, Message: "成功"}, nil +} + +func (s *ResourcePoolService) Delete(ctx context.Context, req *pb.ResourcePoolDeleteRequest) (*pb.BaseResponse, error) { + log.Println("DeleteResourcePool called", req) + poolId := req.PoolId + num, err := database.DeleteNodesByPoolId(poolId) + if err != nil { + return &pb.BaseResponse{Code: -1, Message: "删除资源池失败"}, nil + } + + log.Println("DeleteNodes success", poolId, num) + num, err = database.DeleteResourcePoolById(poolId) + if err != nil { + return &pb.BaseResponse{Code: -1, Message: "删除资源池失败"}, nil + } + + log.Println("DeleteResourcePool success", poolId, num) + return &pb.BaseResponse{Code: 1, Message: "成功"}, nil +} + +func (s *ResourcePoolService) List(ctx context.Context, req *pb.ResourcePoolListRequest) (*pb.ResourcePoolListResponse, error) { + log.Println("GetResourcePoolList", req) + + resourcePoolList, err := database.QueryResourcePoolListAll() + if err != nil { + return nil, errors.New("获取资源池列表失败") + } + + var data []*pb.ResourcePoolListData + k8sNodes := s.getK8sNodes(ctx) + for _, resourcePool := range resourcePoolList { + var poolData pb.ResourcePoolListData + poolData.PoolId = resourcePool.Id + poolData.PoolName = resourcePool.PoolName + dbNodes, _ := database.QueryNodesByPoolId(resourcePool.Id) + poolData.NodeNum = int64(len(dbNodes)) + for _, n := range dbNodes { + node := k8sNodes[n.NodeName] + if node == nil { + return nil, errors.New("node not found: " + n.NodeName) + } + poolData.CpuCores = poolData.CpuCores + node.CPUCores + poolData.GpuNum = poolData.GpuNum + node.GPUCount + poolData.TotalMemory = poolData.TotalMemory + node.TotalMemory + poolData.AvailableMemory = poolData.AvailableMemory + node.AvailableMemory + poolData.DiskSize = poolData.DiskSize + node.DiskTotal + + } + data = append(data, &poolData) + } + return &pb.ResourcePoolListResponse{Data: data}, nil +} + +func (s *ResourcePoolService) GetDetail(ctx context.Context, req *pb.ResourcePoolDetailRequest) (*pb.ResourcePoolDetailResponse, error) { + log.Println("GetResourcePoolDetail", req) + poolNodes, err := database.QueryNodesByPoolId(req.PoolId) + if err != nil { + return nil, err + } + if len(poolNodes) == 0 { + return &pb.ResourcePoolDetailResponse{}, nil + } + log.Println("GetResourcePoolDetail success", poolNodes) + var res = &pb.ResourcePoolDetailResponse{List: []*pb.PoolNodeReply{}} + nodes, err := s.uc.ListAllNodesV2(ctx) + + for _, poolNode := range poolNodes { + b1, _ := json.MarshalIndent(poolNode, "", " ") + fmt.Println(string(b1)) + node := s.filterNode(poolNode.NodeIp, nodes) + if node == nil { + continue + } + nodeReply, err := s.buildNodeReply(ctx, node) + if err != nil { + return nil, err + } + res.List = append(res.List, nodeReply) + } + + sort.SliceStable(res.List, func(i, j int) bool { + return res.List[i].Name < res.List[j].Name + }) + + return res, nil +} + +func (s *ResourcePoolService) GetAvailableNodes(ctx context.Context, req *pb.AvailableNodesRequest) (*pb.AvailableNodesResponse, error) { + log.Println("GetAvailableNodes", req) + + var data []*pb.AvailableNodesInfo + k8sNodes := s.getK8sNodes(ctx) + for _, node := range k8sNodes { + var info pb.AvailableNodesInfo + if node.Devices != nil { + gpuMemoryQuery := fmt.Sprintf(prometheus.GpuMemoryQuery, node.Name) + info.GpuMemory = s.simpleQuery(ctx, gpuMemoryQuery) * 1024 * 1024 + info.GpuNum = int64(len(node.Devices)) + } else { + info.GpuMemory = 0 + info.GpuNum = 0 + } + info.NodeName = node.Name + info.NodeIp = node.IP + info.TotalMemory = node.TotalMemory + info.CpuCores = node.CPUCores + info.DiskSize = node.DiskTotal + data = append(data, &info) + } + return &pb.AvailableNodesResponse{Data: data}, nil +} + +func (s *ResourcePoolService) getK8sNodes(ctx context.Context) map[string]*biz.Node { + nodes, _ := s.uc.ListAllNodesV2(ctx) + m := make(map[string]*biz.Node) + for _, node := range nodes { + node.GPUCount = int64(len(node.Devices)) + m[node.Name] = node + } + return m +} + +func (s *ResourcePoolService) simpleQuery(ctx context.Context, query string) int64 { + value, err := s.ms.promClient.Query(ctx, query) + if err != nil { + return 0 + } + + vector, ok := value.(model.Vector) + if !ok || len(vector) == 0 { + return 0 + } + + // 提取第一个样本的值(24576) + sampleValue := int64(vector[0].Value) + return sampleValue +} + +func (s *ResourcePoolService) filterNode(nodeIp string, nodes []*biz.Node) *biz.Node { + for _, node := range nodes { + b, _ := json.MarshalIndent(node, "", " ") + fmt.Println(string(b)) + if node.IP == nodeIp { + return node + } + } + + return nil +} + +func (s *ResourcePoolService) buildNodeReply(ctx context.Context, node *biz.Node) (*pb.PoolNodeReply, error) { + nodeReply := &pb.PoolNodeReply{ + Name: node.Name, + Uid: node.Uid, + Ip: node.IP, + IsSchedulable: node.IsSchedulable, + IsReady: node.IsReady, + OsImage: node.OSImage, + OperatingSystem: node.OperatingSystem, + KernelVersion: node.KernelVersion, + ContainerRuntimeVersion: node.ContainerRuntimeVersion, + KubeletVersion: node.KubeletVersion, + KubeProxyVersion: node.KubeProxyVersion, + Architecture: node.Architecture, + CreationTimestamp: node.CreationTimestamp, + CoreTotal: node.CPUCores, + MemoryTotal: node.TotalMemory, + DiskSize: node.DiskTotal, + } + + for _, device := range node.Devices { + nodeReply.Type = append(nodeReply.Type, device.Type) + nodeReply.VgpuTotal += device.Count + nodeReply.CoreTotal += int64(device.Devcore) + nodeReply.MemoryTotal += int64(device.Devmem) + vGPU, core, memory, err := s.pod.StatisticsByDeviceId(ctx, device.AliasId) + if err == nil { + nodeReply.VgpuUsed += vGPU + nodeReply.CoreUsed += core + nodeReply.MemoryUsed += int64(memory) + } + } + + nodeReply.Type = arrutil.Unique(nodeReply.Type) + nodeReply.CardCnt = int32(len(node.Devices)) + + return nodeReply, nil +} diff --git a/server/internal/service/service.go b/server/internal/service/service.go index b16453a..507f428 100644 --- a/server/internal/service/service.go +++ b/server/internal/service/service.go @@ -8,4 +8,5 @@ var ProviderSet = wire.NewSet( NewCardService, NewContainerService, NewMonitorService, + NewResourcePoolService, ) diff --git a/server/openapi.yaml b/server/openapi.yaml index f8f3735..f2e3f23 100644 --- a/server/openapi.yaml +++ b/server/openapi.yaml @@ -3,8 +3,317 @@ openapi: 3.0.3 info: - title: "" + title: ResourcePool API version: 0.0.1 -paths: {} +paths: + /v1/available/nodes: + get: + tags: + - ResourcePool + operationId: ResourcePool_GetAvailableNodes + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/AvailableNodesResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' + /v1/resource/pool/create: + post: + tags: + - ResourcePool + operationId: ResourcePool_Create + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ResourcePoolCreateRequest' + required: true + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/BaseResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' + /v1/resource/pool/delete: + post: + tags: + - ResourcePool + operationId: ResourcePool_Delete + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ResourcePoolDeleteRequest' + required: true + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/BaseResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' + /v1/resource/pool/detail: + get: + tags: + - ResourcePool + operationId: ResourcePool_GetDetail + parameters: + - name: poolId + in: query + schema: + type: string + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ResourcePoolDetailResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' + /v1/resource/pool/list: + get: + tags: + - ResourcePool + operationId: ResourcePool_List + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/ResourcePoolListResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' + /v1/resource/pool/update: + post: + tags: + - ResourcePool + operationId: ResourcePool_Update + requestBody: + content: + application/json: + schema: + $ref: '#/components/schemas/ResourcePoolUpdateRequest' + required: true + responses: + "200": + description: OK + content: + application/json: + schema: + $ref: '#/components/schemas/BaseResponse' + default: + description: Default error response + content: + application/json: + schema: + $ref: '#/components/schemas/Status' components: - schemas: {} + schemas: + AvailableNodesInfo: + type: object + properties: + nodeName: + type: string + cpuCores: + type: string + gpuNum: + type: string + gpuMemory: + type: string + totalMemory: + type: string + diskSize: + type: string + nodeIp: + type: string + AvailableNodesResponse: + type: object + properties: + data: + type: array + items: + $ref: '#/components/schemas/AvailableNodesInfo' + BaseResponse: + type: object + properties: + code: + type: integer + format: int32 + message: + type: string + data: + type: object + GoogleProtobufAny: + type: object + properties: + '@type': + type: string + description: The type of the serialized message. + additionalProperties: true + description: Contains an arbitrary serialized message along with a @type that describes the type of the serialized message. + Nodes: + type: object + properties: + nodeIp: + type: string + nodeName: + type: string + PoolNodeReply: + type: object + properties: + ip: + type: string + isSchedulable: + type: boolean + isReady: + type: boolean + type: + type: array + items: + type: string + vgpuUsed: + type: integer + format: int32 + vgpuTotal: + type: integer + format: int32 + coreUsed: + type: integer + format: int32 + coreTotal: + type: string + memoryUsed: + type: string + memoryTotal: + type: string + uid: + type: string + name: + type: string + cardCnt: + type: integer + format: int32 + osImage: + type: string + operatingSystem: + type: string + kernelVersion: + type: string + containerRuntimeVersion: + type: string + kubeletVersion: + type: string + kubeProxyVersion: + type: string + architecture: + type: string + creationTimestamp: + type: string + diskSize: + type: string + ResourcePoolCreateRequest: + type: object + properties: + poolName: + type: string + nodes: + type: array + items: + $ref: '#/components/schemas/Nodes' + ResourcePoolDeleteRequest: + type: object + properties: + poolId: + type: string + ResourcePoolDetailResponse: + type: object + properties: + list: + type: array + items: + $ref: '#/components/schemas/PoolNodeReply' + ResourcePoolListData: + type: object + properties: + poolId: + type: string + poolName: + type: string + cpuCores: + type: string + nodeNum: + type: string + gpuNum: + type: string + availableMemory: + type: string + totalMemory: + type: string + diskSize: + type: string + ResourcePoolListResponse: + type: object + properties: + data: + type: array + items: + $ref: '#/components/schemas/ResourcePoolListData' + ResourcePoolUpdateRequest: + type: object + properties: + poolId: + type: string + poolName: + type: string + nodes: + type: array + items: + $ref: '#/components/schemas/Nodes' + Status: + type: object + properties: + code: + type: integer + description: The status code, which should be an enum value of [google.rpc.Code][google.rpc.Code]. + format: int32 + message: + type: string + description: A developer-facing error message, which should be in English. Any user-facing error message should be localized and sent in the [google.rpc.Status.details][google.rpc.Status.details] field, or localized by the client. + details: + type: array + items: + $ref: '#/components/schemas/GoogleProtobufAny' + description: A list of messages that carry the error details. There is a common set of message types for APIs to use. + description: 'The `Status` type defines a logical error model that is suitable for different programming environments, including REST APIs and RPC APIs. It is used by [gRPC](https://github.com/grpc). Each `Status` message contains three pieces of data: error code, error message, and error details. You can find out more about this error model and how to work with it in the [API Design Guide](https://cloud.google.com/apis/design/errors).' +tags: + - name: ResourcePool