You can not select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

121 lines
3.6 KiB

package service
import (
"context"
"math"
"time"
pb "vgpu/api/v1"
"vgpu/internal/biz"
"vgpu/internal/data/prom"
"github.com/jinzhu/copier"
v1 "github.com/prometheus/client_golang/api/prometheus/v1"
"github.com/prometheus/common/model"
)
type MonitorService struct {
promClient *prom.Client
nodeUsecase *biz.NodeUsecase
podUsecase *biz.PodUseCase
pb.UnimplementedMonitorServer
}
func NewMonitorService(
promClient *prom.Client,
nodeUsecase *biz.NodeUsecase,
podUsecase *biz.PodUseCase,
) *MonitorService {
return &MonitorService{
promClient: promClient,
nodeUsecase: nodeUsecase,
podUsecase: podUsecase,
}
}
func (s *MonitorService) QueryRange(ctx context.Context, req *pb.QueryRangeRequest) (*pb.RangeResponse, error) {
startTime, err := time.ParseInLocation(time.DateTime, req.Range.GetStart(), time.Local)
if err != nil {
return nil, pb.ErrorTransformError(err.Error())
}
endTime, err := time.ParseInLocation(time.DateTime, req.Range.GetEnd(), time.Local)
if err != nil {
return nil, pb.ErrorTransformError(err.Error())
}
step, err := time.ParseDuration(req.Range.GetStep())
if err != nil {
return nil, pb.ErrorTransformError(err.Error())
}
value, err := s.promClient.QueryRange(ctx, req.GetQuery(), v1.Range{Start: startTime, End: endTime, Step: step})
if err != nil {
return nil, pb.ErrorVgpuDomainError(err.Error())
}
matrixValue, ok := value.(model.Matrix)
if !ok {
return nil, pb.ErrorTransformError("Error casting result to model.Matrix")
}
var res = &pb.RangeResponse{}
copier.Copy(&res.Data, &matrixValue)
for _, sample := range res.Data {
sample.Values = fillLessSamplePoint(startTime, endTime, step, sample.Values)
}
return res, nil
}
func fillLessSamplePoint(startTime, endTime time.Time, step time.Duration, values []*pb.SamplePair) []*pb.SamplePair {
existingPoints := make(map[int64]float32)
for _, pair := range values {
existingPoints[pair.Timestamp] = pair.Value
}
var filledValues []*pb.SamplePair
currentTime := getSamplePointStartTime(startTime, step, values)
for !currentTime.After(endTime) {
currentTimestamp := currentTime.UnixMilli()
if value, exists := existingPoints[currentTimestamp]; exists {
filledValues = append(filledValues, &pb.SamplePair{Value: value, Timestamp: currentTimestamp})
} else {
filledValues = append(filledValues, &pb.SamplePair{Value: 0, Timestamp: currentTimestamp})
}
currentTime = currentTime.Add(step)
}
return filledValues
}
// Compatible with both Prometheus and VictoriaMetrics
func getSamplePointStartTime(startTime time.Time, step time.Duration, values []*pb.SamplePair) time.Time {
if len(values) == 0 {
return startTime
}
startTimeMilli := startTime.UnixMilli()
firstValue := values[0]
// Case: Prometheus
if firstValue.Timestamp == startTimeMilli {
return startTime
}
// Case: VictoriaMetrics
stepMilli := step.Milliseconds()
// exists startTime data point
if math.Abs(float64(firstValue.Timestamp-startTimeMilli)) <= float64(stepMilli) {
return time.UnixMilli(firstValue.Timestamp)
}
// data points within the query time range
stepCount := (firstValue.Timestamp - startTimeMilli) / stepMilli
return time.UnixMilli(firstValue.Timestamp - stepCount*stepMilli)
}
func (s *MonitorService) QueryInstant(ctx context.Context, req *pb.QueryInstantRequest) (*pb.InstantResponse, error) {
value, err := s.promClient.Query(ctx, req.GetQuery())
if err != nil {
return nil, pb.ErrorVgpuDomainError(err.Error())
}
vectorValue, ok := value.(model.Vector)
if !ok {
return nil, pb.ErrorTransformError("Error casting result to model.Vector")
}
var res = &pb.InstantResponse{}
copier.Copy(&res.Data, &vectorValue)
return res, nil
}