96 lines
2.5 KiB
Go
96 lines
2.5 KiB
Go
package repository
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
|
|
"github.com/company/ai-ops/internal/database"
|
|
"github.com/company/ai-ops/internal/domain/model"
|
|
"github.com/jackc/pgx/v5"
|
|
)
|
|
|
|
// PGMetricRepository 是基于 PostgreSQL 的指标存储实现
|
|
type PGMetricRepository struct{}
|
|
|
|
func NewPGMetricRepository() *PGMetricRepository {
|
|
return &PGMetricRepository{}
|
|
}
|
|
|
|
func (r *PGMetricRepository) GetRealtime(ctx context.Context) (*model.RealtimeMetrics, error) {
|
|
// 从 ai_ops_metrics 表查询各指标的最新值
|
|
queries := map[string]*float64{
|
|
"qps": new(float64),
|
|
"avg_latency": new(float64),
|
|
"p99_latency": new(float64),
|
|
"error_rate": new(float64),
|
|
}
|
|
|
|
for name, ptr := range queries {
|
|
var value float64
|
|
err := database.Pool.QueryRow(ctx, `
|
|
SELECT value FROM ai_ops_metrics
|
|
WHERE metric_name = $1
|
|
ORDER BY recorded_at DESC
|
|
LIMIT 1
|
|
`, name).Scan(&value)
|
|
if err != nil && err != pgx.ErrNoRows {
|
|
return nil, fmt.Errorf("query %s: %w", name, err)
|
|
}
|
|
*ptr = value
|
|
}
|
|
|
|
return &model.RealtimeMetrics{
|
|
QPS: *queries["qps"],
|
|
AvgLatency: *queries["avg_latency"],
|
|
P99Latency: *queries["p99_latency"],
|
|
ErrorRate: *queries["error_rate"],
|
|
}, nil
|
|
}
|
|
|
|
func (r *PGMetricRepository) Query(ctx context.Context, req model.MetricQueryRequest) ([]model.MetricPoint, error) {
|
|
rows, err := database.Pool.Query(ctx, `
|
|
SELECT metric_name, labels, value, recorded_at
|
|
FROM ai_ops_metrics
|
|
WHERE metric_name = $1
|
|
AND recorded_at >= $2
|
|
AND recorded_at <= $3
|
|
ORDER BY recorded_at DESC
|
|
`, req.Name, req.StartTime, req.EndTime)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query metrics: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var points []model.MetricPoint
|
|
for rows.Next() {
|
|
var p model.MetricPoint
|
|
var labels map[string]string
|
|
if err := rows.Scan(&p.Name, &labels, &p.Value, &p.Timestamp); err != nil {
|
|
return nil, fmt.Errorf("scan metric: %w", err)
|
|
}
|
|
p.Source = req.Source
|
|
p.Tags = labels
|
|
points = append(points, p)
|
|
}
|
|
|
|
return points, rows.Err()
|
|
}
|
|
|
|
func (r *PGMetricRepository) GetLatest(ctx context.Context, source, name string) (*model.MetricPoint, error) {
|
|
var p model.MetricPoint
|
|
var labels map[string]string
|
|
err := database.Pool.QueryRow(ctx, `
|
|
SELECT metric_name, labels, value, recorded_at
|
|
FROM ai_ops_metrics
|
|
WHERE metric_name = $1
|
|
ORDER BY recorded_at DESC
|
|
LIMIT 1
|
|
`, name).Scan(&p.Name, &labels, &p.Value, &p.Timestamp)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("query latest metric: %w", err)
|
|
}
|
|
p.Source = source
|
|
p.Tags = labels
|
|
return &p, nil
|
|
}
|