From 6538f8e367eeeb872a64dd7633f081a2a7010d80 Mon Sep 17 00:00:00 2001 From: hippoley Date: Tue, 28 Apr 2026 19:04:20 +0800 Subject: [PATCH] feat: instance usage monitoring with metrics-server integration Add real-time resource usage collection for running instances by polling the Kubernetes metrics-server API. The collector runs as a background goroutine and gracefully degrades to uptime-only recording when the metrics-server is unavailable. New files: - repository/instance_usage_repository.go: CRUD + aggregation queries for the existing instance_usage table (ensureTable as safety net) - services/instance_usage_service.go: business logic with input validation, 24h default / 30-day cap on history queries - services/instance_usage_collector.go: scheduled collector using existing rest.Config (no new dependencies), with sync.Once Stop(), panic recovery, TryLock overlap guard, and configurable interval via USAGE_COLLECT_INTERVAL env var - handlers/instance_usage_handler.go: REST endpoints for current usage, history, and admin summary - services/instance_usage_service_test.go: 11 tests covering CPU/memory quantity parsing, service logic, and edge cases Modified: - cmd/server/main.go: wire repository, service, collector, handler, routes, and graceful shutdown API endpoints: - GET /instances/:id/usage/current (user-scoped) - GET /instances/:id/usage/history?hours=24 (user-scoped) - GET /admin/instances/usage/summary (admin-only) Design decisions: - No new go.mod dependencies: uses clientset.RESTClient().AbsPath() to call metrics-server API directly - GPU and disk metrics left as nil (requires DCGM / kubelet stats API) - CPU reported as percentage of instance allocated cores - Memory reported in GB - Uptime calculated from pod startTime --- backend/cmd/server/main.go | 11 + .../handlers/instance_usage_handler.go | 82 +++++ .../repository/instance_usage_repository.go | 146 ++++++++ .../services/instance_usage_collector.go | 329 ++++++++++++++++++ .../services/instance_usage_service.go | 68 ++++ .../services/instance_usage_service_test.go | 243 +++++++++++++ 6 files changed, 879 insertions(+) create mode 100644 backend/internal/handlers/instance_usage_handler.go create mode 100644 backend/internal/repository/instance_usage_repository.go create mode 100644 backend/internal/services/instance_usage_collector.go create mode 100644 backend/internal/services/instance_usage_service.go create mode 100644 backend/internal/services/instance_usage_service_test.go diff --git a/backend/cmd/server/main.go b/backend/cmd/server/main.go index 0187f7f..95b5b3b 100644 --- a/backend/cmd/server/main.go +++ b/backend/cmd/server/main.go @@ -67,6 +67,7 @@ func main() { instanceConfigRevisionRepo := repository.NewInstanceConfigRevisionRepository(database) skillRepo := repository.NewSkillRepository(database) securityScanRepo := repository.NewSecurityScanRepository(database) + instanceUsageRepo := repository.NewInstanceUsageRepository(database) if repaired, repairErr := services.RepairSeededAdminPassword(userRepo); repairErr != nil { log.Printf("Warning: failed to repair seeded admin password: %v", repairErr) @@ -105,6 +106,7 @@ func main() { skillService := services.NewSkillService(skillRepo, instanceRepo, instanceCommandService, objectStorageService, skillScannerClient) securityScanService := services.NewSecurityScanService(securityScanRepo, skillRepo, objectStorageService, skillScannerClient) aiGatewayService := aigateway.NewService(llmModelRepo, modelInvocationService, auditEventService, costRecordService, 
riskDetectionService, riskHitService, chatSessionService, chatMessageService) + instanceUsageService := services.NewInstanceUsageService(instanceUsageRepo) // Initialize handlers authHandler := handlers.NewAuthHandler(authService) @@ -121,6 +123,7 @@ func main() { skillHandler := handlers.NewSkillHandler(skillService, instanceService) securityHandler := handlers.NewSecurityHandler(securityScanService) agentHandler := handlers.NewAgentHandler(instanceAgentService, instanceCommandService, instanceRuntimeStatusService, instanceConfigRevisionService, skillService) + instanceUsageHandler := handlers.NewInstanceUsageHandler(instanceUsageService) // Initialize WebSocket hub and handler wsHub := services.GetHub() @@ -130,6 +133,10 @@ func main() { syncService := services.NewSyncService(instanceRepo, instanceRuntimeStatusService) syncService.Start() + // Start instance usage collector + instanceUsageCollector := services.NewInstanceUsageCollector(instanceRepo, instanceUsageRepo) + instanceUsageCollector.Start() + // Setup router r := gin.Default() @@ -202,6 +209,8 @@ func main() { instances.GET("/:id/skills", skillHandler.ListInstanceSkills) instances.POST("/:id/skills", skillHandler.AttachSkillToInstance) instances.DELETE("/:id/skills/:skillId", skillHandler.RemoveSkillFromInstance) + instances.GET("/:id/usage/current", instanceUsageHandler.GetCurrentUsage) + instances.GET("/:id/usage/history", instanceUsageHandler.GetUsageHistory) } // Admin console: cross-user instance listing. Gated by admin @@ -214,6 +223,7 @@ func main() { adminInstances.Use(middleware.NewAdminAuth(userRepo)) { adminInstances.GET("", instanceHandler.ListAllInstances) + adminInstances.GET("/usage/summary", instanceUsageHandler.GetUsageSummary) } openClawConfigs := api.Group("/openclaw-configs") @@ -397,6 +407,7 @@ func main() { // Stop background services syncService.Stop() + instanceUsageCollector.Stop() wsHub.Stop() instanceHandler.Shutdown() diff --git a/backend/internal/handlers/instance_usage_handler.go b/backend/internal/handlers/instance_usage_handler.go new file mode 100644 index 0000000..c85c9b2 --- /dev/null +++ b/backend/internal/handlers/instance_usage_handler.go @@ -0,0 +1,82 @@ +package handlers + +import ( + "net/http" + "strconv" + + "clawreef/internal/services" + "clawreef/internal/utils" + + "github.com/gin-gonic/gin" +) + +// InstanceUsageHandler exposes instance resource usage data via REST. +type InstanceUsageHandler struct { + usageService services.InstanceUsageService +} + +// NewInstanceUsageHandler creates a new handler. +func NewInstanceUsageHandler(usageService services.InstanceUsageService) *InstanceUsageHandler { + return &InstanceUsageHandler{usageService: usageService} +} + +// GetCurrentUsage returns the latest usage snapshot for a single instance. +// GET /instances/:id/usage/current +func (h *InstanceUsageHandler) GetCurrentUsage(c *gin.Context) { + id, err := strconv.Atoi(c.Param("id")) + if err != nil { + utils.Error(c, http.StatusBadRequest, "Invalid instance ID") + return + } + + usage, err := h.usageService.GetCurrentUsage(id) + if err != nil { + utils.HandleError(c, err) + return + } + if usage == nil { + utils.Success(c, http.StatusOK, "No usage data available yet", nil) + return + } + + utils.Success(c, http.StatusOK, "Current usage retrieved", usage) +} + +// GetUsageHistory returns historical usage records for a single instance. 
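+// The window defaults to the last 24 hours; the service caps it at 30 days.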
+// GET /instances/:id/usage/history?hours=24
+func (h *InstanceUsageHandler) GetUsageHistory(c *gin.Context) {
+	id, err := strconv.Atoi(c.Param("id"))
+	if err != nil {
+		utils.Error(c, http.StatusBadRequest, "Invalid instance ID")
+		return
+	}
+
+	hours := 24
+	if v := c.Query("hours"); v != "" {
+		if parsed, err := strconv.Atoi(v); err == nil && parsed > 0 {
+			hours = parsed
+		}
+	}
+
+	records, err := h.usageService.GetHistory(id, hours)
+	if err != nil {
+		utils.HandleError(c, err)
+		return
+	}
+
+	utils.Success(c, http.StatusOK, "Usage history retrieved", records)
+}
+
+// GetUsageSummary returns the latest usage snapshot for every instance that
+// has at least one recorded snapshot. Admin-only endpoint.
+// GET /admin/instances/usage/summary
+func (h *InstanceUsageHandler) GetUsageSummary(c *gin.Context) {
+	records, err := h.usageService.GetAllCurrentUsage()
+	if err != nil {
+		utils.HandleError(c, err)
+		return
+	}
+
+	utils.Success(c, http.StatusOK, "Usage summary retrieved", records)
+}
+
diff --git a/backend/internal/repository/instance_usage_repository.go b/backend/internal/repository/instance_usage_repository.go
new file mode 100644
index 0000000..42eaf92
--- /dev/null
+++ b/backend/internal/repository/instance_usage_repository.go
@@ -0,0 +1,146 @@
+package repository
+
+import (
+	"fmt"
+	"time"
+
+	"clawreef/internal/models"
+
+	"github.com/upper/db/v4"
+)
+
+// InstanceUsageRepository defines repository operations for instance resource
+// usage records.
+type InstanceUsageRepository interface {
+	Create(record *models.InstanceUsage) error
+	GetLatestByInstanceID(instanceID int) (*models.InstanceUsage, error)
+	ListByInstanceID(instanceID int, since time.Time, limit int) ([]models.InstanceUsage, error)
+	ListLatestPerInstance() ([]models.InstanceUsage, error)
+	DeleteOlderThan(cutoff time.Time) (int64, error)
+}
+
+type instanceUsageRepository struct {
+	sess db.Session
+}
+
+// NewInstanceUsageRepository creates a new instance usage repository and
+// ensures its table exists. The table is created by the init migration, so
+// ensureTable is a no-op safety net.
+func NewInstanceUsageRepository(sess db.Session) InstanceUsageRepository {
+	repo := &instanceUsageRepository{sess: sess}
+	repo.ensureTable()
+	return repo
+}
+
+func (r *instanceUsageRepository) ensureTable() {
+	const query = `
+CREATE TABLE IF NOT EXISTS instance_usage (
+    id INT AUTO_INCREMENT PRIMARY KEY,
+    instance_id INT NOT NULL,
+    cpu_usage_percent DECIMAL(5,2),
+    memory_usage_gb DECIMAL(10,2),
+    disk_usage_gb DECIMAL(10,2),
+    gpu_usage_percent DECIMAL(5,2),
+    uptime_seconds INT,
+    recorded_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
+    FOREIGN KEY (instance_id) REFERENCES instances(id) ON DELETE CASCADE,
+    INDEX idx_instance_recorded (instance_id, recorded_at)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
+`
+	if _, err := r.sess.SQL().Exec(query); err != nil {
+		panic(fmt.Errorf("failed to ensure instance_usage table: %w", err))
+	}
+}
+
+func (r *instanceUsageRepository) Create(record *models.InstanceUsage) error {
+	if record.RecordedAt.IsZero() {
+		record.RecordedAt = time.Now()
+	}
+	res, err := r.sess.Collection("instance_usage").Insert(record)
+	if err != nil {
+		return fmt.Errorf("failed to create instance usage record: %w", err)
+	}
+	record.ID = int(res.ID().(int64))
+	return nil
+}
+
+func (r *instanceUsageRepository) GetLatestByInstanceID(instanceID int) (*models.InstanceUsage, error) {
+	var item models.InstanceUsage
+	err := r.sess.Collection("instance_usage").
+		Find(db.Cond{"instance_id": instanceID}).
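+		// upper/db interprets the "-" prefix as descending order, i.e. newest first.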
+ OrderBy("-recorded_at"). + Limit(1). + One(&item) + if err != nil { + if err == db.ErrNoMoreRows { + return nil, nil + } + return nil, fmt.Errorf("failed to get latest usage for instance %d: %w", instanceID, err) + } + return &item, nil +} + +func (r *instanceUsageRepository) ListByInstanceID(instanceID int, since time.Time, limit int) ([]models.InstanceUsage, error) { + if limit <= 0 { + limit = 500 + } + var items []models.InstanceUsage + err := r.sess.Collection("instance_usage"). + Find(db.And( + db.Cond{"instance_id": instanceID}, + db.Cond{"recorded_at >=": since}, + )). + OrderBy("-recorded_at"). + Limit(limit). + All(&items) + if err != nil { + return nil, fmt.Errorf("failed to list usage for instance %d: %w", instanceID, err) + } + return items, nil +} + +// ListLatestPerInstance returns the most recent usage record for every +// instance that has at least one record. Uses a correlated subquery to +// pick the row with the maximum recorded_at per instance_id. +func (r *instanceUsageRepository) ListLatestPerInstance() ([]models.InstanceUsage, error) { + var items []models.InstanceUsage + rows, err := r.sess.SQL().Query(` +SELECT u.id, u.instance_id, u.cpu_usage_percent, u.memory_usage_gb, + u.disk_usage_gb, u.gpu_usage_percent, u.uptime_seconds, u.recorded_at +FROM instance_usage u +INNER JOIN ( + SELECT instance_id, MAX(recorded_at) AS max_recorded + FROM instance_usage + GROUP BY instance_id +) latest ON u.instance_id = latest.instance_id AND u.recorded_at = latest.max_recorded +ORDER BY u.instance_id +`) + if err != nil { + return nil, fmt.Errorf("failed to list latest usage per instance: %w", err) + } + defer rows.Close() + + for rows.Next() { + var item models.InstanceUsage + if err := rows.Scan( + &item.ID, &item.InstanceID, &item.CPUUsagePercent, &item.MemoryUsageGB, + &item.DiskUsageGB, &item.GPUUsagePercent, &item.UptimeSeconds, &item.RecordedAt, + ); err != nil { + return nil, fmt.Errorf("failed to scan usage row: %w", err) + } + items = append(items, item) + } + return items, nil +} + +func (r *instanceUsageRepository) DeleteOlderThan(cutoff time.Time) (int64, error) { + result, err := r.sess.SQL().Exec( + "DELETE FROM instance_usage WHERE recorded_at < ?", cutoff, + ) + if err != nil { + return 0, fmt.Errorf("failed to delete expired usage records: %w", err) + } + affected, _ := result.RowsAffected() + return affected, nil +} + diff --git a/backend/internal/services/instance_usage_collector.go b/backend/internal/services/instance_usage_collector.go new file mode 100644 index 0000000..835c7ab --- /dev/null +++ b/backend/internal/services/instance_usage_collector.go @@ -0,0 +1,329 @@ +package services + +import ( + "context" + "encoding/json" + "fmt" + "io" + "log" + "math" + "os" + "strconv" + "strings" + "sync" + "time" + + "clawreef/internal/models" + "clawreef/internal/repository" + "clawreef/internal/services/k8s" +) + +// InstanceUsageCollector periodically polls K8s metrics-server for running +// instances and records resource usage snapshots into the database. +type InstanceUsageCollector struct { + instanceRepo repository.InstanceRepository + usageRepo repository.InstanceUsageRepository + client *k8s.Client + podService *k8s.PodService + interval time.Duration + stopChan chan struct{} + stopOnce sync.Once + collecting sync.Mutex +} + +// NewInstanceUsageCollector creates a new collector. The collection interval +// defaults to 60 seconds and can be overridden with the USAGE_COLLECT_INTERVAL +// environment variable (in seconds). 
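+// Non-positive or unparsable override values are ignored and the default is kept.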
+func NewInstanceUsageCollector( + instanceRepo repository.InstanceRepository, + usageRepo repository.InstanceUsageRepository, +) *InstanceUsageCollector { + interval := 60 * time.Second + if v := os.Getenv("USAGE_COLLECT_INTERVAL"); v != "" { + if secs, err := strconv.Atoi(v); err == nil && secs > 0 { + interval = time.Duration(secs) * time.Second + } + } + return &InstanceUsageCollector{ + instanceRepo: instanceRepo, + usageRepo: usageRepo, + client: k8s.GetClient(), + podService: k8s.NewPodService(), + interval: interval, + stopChan: make(chan struct{}), + } +} + +// Start launches the collection loop in a background goroutine. +func (c *InstanceUsageCollector) Start() { + log.Printf("[UsageCollector] Starting with interval %v", c.interval) + go c.loop() +} + +// Stop signals the collector to shut down. Safe to call multiple times. +func (c *InstanceUsageCollector) Stop() { + c.stopOnce.Do(func() { + log.Println("[UsageCollector] Stopping...") + close(c.stopChan) + }) +} + +func (c *InstanceUsageCollector) loop() { + ticker := time.NewTicker(c.interval) + defer ticker.Stop() + + // Run once immediately on start. + c.safeTick() + + for { + select { + case <-ticker.C: + c.safeTick() + case <-c.stopChan: + log.Println("[UsageCollector] Stopped") + return + } + } +} + +// safeTick wraps tick with panic recovery and overlap guard. +func (c *InstanceUsageCollector) safeTick() { + if !c.collecting.TryLock() { + log.Println("[UsageCollector] Previous collection still running, skipping tick") + return + } + defer c.collecting.Unlock() + + defer func() { + if r := recover(); r != nil { + log.Printf("[UsageCollector] Recovered from panic: %v", r) + } + }() + + c.collectAll() +} + +func (c *InstanceUsageCollector) collectAll() { + if c.client == nil || c.client.Clientset == nil { + log.Println("[UsageCollector] K8s client not initialized, skipping collection") + return + } + + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + instances, err := c.instanceRepo.GetAllRunning() + if err != nil { + log.Printf("[UsageCollector] Failed to list running instances: %v", err) + return + } + if len(instances) == 0 { + return + } + + metricsAvailable := c.isMetricsServerAvailable(ctx) + if !metricsAvailable { + log.Println("[UsageCollector] metrics-server not available, recording uptime only") + } + + now := time.Now() + recorded := 0 + for _, inst := range instances { + record := c.collectInstance(ctx, &inst, now, metricsAvailable) + if record != nil { + if err := c.usageRepo.Create(record); err != nil { + log.Printf("[UsageCollector] Failed to record usage for instance %d: %v", inst.ID, err) + } else { + recorded++ + } + } + } + + if recorded > 0 { + log.Printf("[UsageCollector] Recorded usage for %d/%d instances", recorded, len(instances)) + } +} + +// collectInstance gathers resource usage for a single instance. Returns nil if +// the pod cannot be found (instance may have just stopped). +func (c *InstanceUsageCollector) collectInstance( + ctx context.Context, + instance *models.Instance, + now time.Time, + metricsAvailable bool, +) *models.InstanceUsage { + pod, err := c.podService.GetPod(ctx, instance.UserID, instance.ID) + if err != nil { + // Pod not found — instance may have just stopped, skip silently. + return nil + } + + record := &models.InstanceUsage{ + InstanceID: instance.ID, + RecordedAt: now, + } + + // Calculate uptime from pod start time. 
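+	// StartTime is nil until the pod has actually started, so guard before dereferencing.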
+	if pod.Status.StartTime != nil {
+		uptime := int(now.Sub(pod.Status.StartTime.Time).Seconds())
+		if uptime < 0 {
+			uptime = 0
+		}
+		record.UptimeSeconds = &uptime
+	}
+
+	// Fetch CPU and memory from metrics-server if available.
+	if metricsAvailable {
+		namespace := c.client.GetNamespace(instance.UserID)
+		metrics, err := c.fetchPodMetrics(ctx, namespace, pod.Name)
+		if err != nil {
+			// Non-fatal: we still have uptime.
+			log.Printf("[UsageCollector] Failed to fetch metrics for pod %s: %v", pod.Name, err)
+			return record
+		}
+
+		// Sum CPU and memory across all containers in the pod.
+		var totalCPUMillicores int64
+		var totalMemoryBytes int64
+		for _, container := range metrics.Containers {
+			totalCPUMillicores += parseCPUToMillicores(container.Usage.CPU)
+			totalMemoryBytes += parseMemoryToBytes(container.Usage.Memory)
+		}
+
+		// CPU usage as percentage of instance's allocated cores.
+		if instance.CPUCores > 0 {
+			cpuPercent := (float64(totalCPUMillicores) / 1000.0 / instance.CPUCores) * 100.0
+			cpuPercent = math.Round(cpuPercent*100) / 100 // round to 2 decimals
+			record.CPUUsagePercent = &cpuPercent
+		}
+
+		// Memory usage in GB.
+		memGB := float64(totalMemoryBytes) / (1024 * 1024 * 1024)
+		memGB = math.Round(memGB*100) / 100
+		record.MemoryUsageGB = &memGB
+	}
+
+	return record
+}
+
+// isMetricsServerAvailable checks whether the metrics-server API is reachable.
+func (c *InstanceUsageCollector) isMetricsServerAvailable(ctx context.Context) bool {
+	body, err := c.client.Clientset.RESTClient().Get().
+		AbsPath("/apis/metrics.k8s.io/v1beta1").
+		DoRaw(ctx)
+	if err != nil {
+		return false
+	}
+	// A successful response with a non-empty body means the API group is served.
+	return len(body) > 0
+}
+
+// podMetricsResponse mirrors the metrics-server PodMetrics JSON structure.
+// We define it locally to avoid adding k8s.io/metrics as a dependency.
+type podMetricsResponse struct {
+	Containers []containerMetrics `json:"containers"`
+}
+
+type containerMetrics struct {
+	Name  string        `json:"name"`
+	Usage resourceUsage `json:"usage"`
+}
+
+type resourceUsage struct {
+	CPU    string `json:"cpu"`
+	Memory string `json:"memory"`
+}
+
+// fetchPodMetrics calls the metrics-server API for a specific pod.
+func (c *InstanceUsageCollector) fetchPodMetrics(ctx context.Context, namespace, podName string) (*podMetricsResponse, error) {
+	path := fmt.Sprintf("/apis/metrics.k8s.io/v1beta1/namespaces/%s/pods/%s", namespace, podName)
+	resp, err := c.client.Clientset.RESTClient().Get().
+		AbsPath(path).
+		Stream(ctx)
+	if err != nil {
+		return nil, fmt.Errorf("metrics request failed: %w", err)
+	}
+	defer resp.Close()
+
+	data, err := io.ReadAll(resp)
+	if err != nil {
+		return nil, fmt.Errorf("failed to read metrics response: %w", err)
+	}
+
+	var metrics podMetricsResponse
+	if err := json.Unmarshal(data, &metrics); err != nil {
+		return nil, fmt.Errorf("failed to parse metrics response: %w", err)
+	}
+	return &metrics, nil
+}
+
+// parseCPUToMillicores converts a K8s CPU quantity string to millicores.
+// Examples: "100m" → 100, "2" → 2000, "1500000n" → 1.
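+// Nanocore values below one millicore truncate to 0 via integer division.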
+func parseCPUToMillicores(cpu string) int64 {
+	cpu = strings.TrimSpace(cpu)
+	if cpu == "" {
+		return 0
+	}
+	if strings.HasSuffix(cpu, "n") {
+		// Nanocores → millicores
+		val, err := strconv.ParseInt(strings.TrimSuffix(cpu, "n"), 10, 64)
+		if err != nil {
+			return 0
+		}
+		return val / 1_000_000
+	}
+	if strings.HasSuffix(cpu, "m") {
+		val, err := strconv.ParseInt(strings.TrimSuffix(cpu, "m"), 10, 64)
+		if err != nil {
+			return 0
+		}
+		return val
+	}
+	// Plain number = whole cores
+	val, err := strconv.ParseFloat(cpu, 64)
+	if err != nil {
+		return 0
+	}
+	return int64(val * 1000)
+}
+
+// parseMemoryToBytes converts a K8s memory quantity string to bytes.
+// Examples: "128Mi" → 134217728, "1Gi" → 1073741824, "1000Ki" → 1024000.
+func parseMemoryToBytes(mem string) int64 {
+	mem = strings.TrimSpace(mem)
+	if mem == "" {
+		return 0
+	}
+
+	suffixes := []struct {
+		suffix     string
+		multiplier int64
+	}{
+		{"Ti", 1024 * 1024 * 1024 * 1024},
+		{"Gi", 1024 * 1024 * 1024},
+		{"Mi", 1024 * 1024},
+		{"Ki", 1024},
+		{"T", 1_000_000_000_000},
+		{"G", 1_000_000_000},
+		{"M", 1_000_000},
+		{"K", 1000},
+		{"k", 1000}, // Kubernetes SI quantities use lowercase "k" for kilo; accept both
+	}
+
+	for _, s := range suffixes {
+		if strings.HasSuffix(mem, s.suffix) {
+			val, err := strconv.ParseInt(strings.TrimSuffix(mem, s.suffix), 10, 64)
+			if err != nil {
+				return 0
+			}
+			return val * s.multiplier
+		}
+	}
+
+	// Plain bytes
+	val, err := strconv.ParseInt(mem, 10, 64)
+	if err != nil {
+		return 0
+	}
+	return val
+}
+
diff --git a/backend/internal/services/instance_usage_service.go b/backend/internal/services/instance_usage_service.go
new file mode 100644
index 0000000..e70b01a
--- /dev/null
+++ b/backend/internal/services/instance_usage_service.go
@@ -0,0 +1,68 @@
+package services
+
+import (
+	"fmt"
+	"time"
+
+	"clawreef/internal/models"
+	"clawreef/internal/repository"
+)
+
+// InstanceUsageService defines application-level operations for instance
+// resource usage monitoring.
+type InstanceUsageService interface {
+	// GetCurrentUsage returns the most recent usage snapshot for an instance.
+	GetCurrentUsage(instanceID int) (*models.InstanceUsage, error)
+	// GetHistory returns usage records for an instance within a time window.
+	GetHistory(instanceID int, hours int) ([]models.InstanceUsage, error)
+	// GetAllCurrentUsage returns the latest usage snapshot for every instance
+	// that has at least one record (admin view).
+	GetAllCurrentUsage() ([]models.InstanceUsage, error)
+	// RecordUsage persists a single usage snapshot.
+	RecordUsage(record *models.InstanceUsage) error
+}
+
+type instanceUsageService struct {
+	repo repository.InstanceUsageRepository
+}
+
+// NewInstanceUsageService creates a new instance usage service.
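+// It applies input validation and the 24h default / 30-day cap on history queries.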
+func NewInstanceUsageService(repo repository.InstanceUsageRepository) InstanceUsageService { + return &instanceUsageService{repo: repo} +} + +func (s *instanceUsageService) GetCurrentUsage(instanceID int) (*models.InstanceUsage, error) { + if instanceID <= 0 { + return nil, fmt.Errorf("invalid instance ID") + } + return s.repo.GetLatestByInstanceID(instanceID) +} + +func (s *instanceUsageService) GetHistory(instanceID int, hours int) ([]models.InstanceUsage, error) { + if instanceID <= 0 { + return nil, fmt.Errorf("invalid instance ID") + } + if hours <= 0 { + hours = 24 + } + if hours > 720 { // cap at 30 days + hours = 720 + } + since := time.Now().Add(-time.Duration(hours) * time.Hour) + return s.repo.ListByInstanceID(instanceID, since, 0) +} + +func (s *instanceUsageService) GetAllCurrentUsage() ([]models.InstanceUsage, error) { + return s.repo.ListLatestPerInstance() +} + +func (s *instanceUsageService) RecordUsage(record *models.InstanceUsage) error { + if record == nil { + return fmt.Errorf("usage record is required") + } + if record.InstanceID <= 0 { + return fmt.Errorf("instance ID is required") + } + return s.repo.Create(record) +} + diff --git a/backend/internal/services/instance_usage_service_test.go b/backend/internal/services/instance_usage_service_test.go new file mode 100644 index 0000000..22e4e15 --- /dev/null +++ b/backend/internal/services/instance_usage_service_test.go @@ -0,0 +1,243 @@ +package services + +import ( + "fmt" + "testing" + "time" + + "clawreef/internal/models" +) + +// --------------------------------------------------------------------------- +// Stub repository +// --------------------------------------------------------------------------- + +type stubInstanceUsageRepository struct { + records []models.InstanceUsage + created []models.InstanceUsage +} + +func (r *stubInstanceUsageRepository) Create(record *models.InstanceUsage) error { + record.ID = len(r.created) + 1 + r.created = append(r.created, *record) + return nil +} + +func (r *stubInstanceUsageRepository) GetLatestByInstanceID(instanceID int) (*models.InstanceUsage, error) { + for i := len(r.records) - 1; i >= 0; i-- { + if r.records[i].InstanceID == instanceID { + return &r.records[i], nil + } + } + return nil, nil +} + +func (r *stubInstanceUsageRepository) ListByInstanceID(instanceID int, since time.Time, limit int) ([]models.InstanceUsage, error) { + var out []models.InstanceUsage + for _, rec := range r.records { + if rec.InstanceID == instanceID && !rec.RecordedAt.Before(since) { + out = append(out, rec) + } + } + return out, nil +} + +func (r *stubInstanceUsageRepository) ListLatestPerInstance() ([]models.InstanceUsage, error) { + latest := map[int]models.InstanceUsage{} + for _, rec := range r.records { + if existing, ok := latest[rec.InstanceID]; !ok || rec.RecordedAt.After(existing.RecordedAt) { + latest[rec.InstanceID] = rec + } + } + var out []models.InstanceUsage + for _, v := range latest { + out = append(out, v) + } + return out, nil +} + +func (r *stubInstanceUsageRepository) DeleteOlderThan(cutoff time.Time) (int64, error) { + return 0, nil +} + +// --------------------------------------------------------------------------- +// parseCPUToMillicores tests +// --------------------------------------------------------------------------- + +func TestParseCPUToMillicores(t *testing.T) { + cases := []struct { + input string + want int64 + }{ + {"100m", 100}, + {"250m", 250}, + {"1", 1000}, + {"2", 2000}, + {"0.5", 500}, + {"1500000000n", 1500}, + {"500000000n", 500}, + {"", 0}, + 
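+		{"999999n", 0}, // sub-millicore nanocore values truncate to 0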
{"garbage", 0}, + } + for _, tc := range cases { + t.Run(fmt.Sprintf("cpu=%q", tc.input), func(t *testing.T) { + got := parseCPUToMillicores(tc.input) + if got != tc.want { + t.Errorf("parseCPUToMillicores(%q) = %d, want %d", tc.input, got, tc.want) + } + }) + } +} + +// --------------------------------------------------------------------------- +// parseMemoryToBytes tests +// --------------------------------------------------------------------------- + +func TestParseMemoryToBytes(t *testing.T) { + cases := []struct { + input string + want int64 + }{ + {"128Mi", 128 * 1024 * 1024}, + {"1Gi", 1024 * 1024 * 1024}, + {"256Ki", 256 * 1024}, + {"2Ti", 2 * 1024 * 1024 * 1024 * 1024}, + {"1000M", 1000_000_000}, + {"500K", 500_000}, + {"1048576", 1048576}, + {"", 0}, + {"garbage", 0}, + } + for _, tc := range cases { + t.Run(fmt.Sprintf("mem=%q", tc.input), func(t *testing.T) { + got := parseMemoryToBytes(tc.input) + if got != tc.want { + t.Errorf("parseMemoryToBytes(%q) = %d, want %d", tc.input, got, tc.want) + } + }) + } +} + +// --------------------------------------------------------------------------- +// InstanceUsageService tests +// --------------------------------------------------------------------------- + +func TestGetCurrentUsage_ReturnsLatest(t *testing.T) { + cpu := 45.0 + repo := &stubInstanceUsageRepository{ + records: []models.InstanceUsage{ + {ID: 1, InstanceID: 10, CPUUsagePercent: &cpu, RecordedAt: time.Now().Add(-time.Hour)}, + }, + } + svc := NewInstanceUsageService(repo) + + usage, err := svc.GetCurrentUsage(10) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if usage == nil || usage.ID != 1 { + t.Fatalf("expected record ID 1, got %v", usage) + } +} + +func TestGetCurrentUsage_InvalidID(t *testing.T) { + svc := NewInstanceUsageService(&stubInstanceUsageRepository{}) + _, err := svc.GetCurrentUsage(0) + if err == nil { + t.Fatal("expected error for invalid ID") + } +} + +func TestGetCurrentUsage_NoData(t *testing.T) { + svc := NewInstanceUsageService(&stubInstanceUsageRepository{}) + usage, err := svc.GetCurrentUsage(99) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if usage != nil { + t.Fatalf("expected nil for missing data, got %v", usage) + } +} + +func TestGetHistory_DefaultsTo24Hours(t *testing.T) { + now := time.Now() + cpu := 10.0 + repo := &stubInstanceUsageRepository{ + records: []models.InstanceUsage{ + {ID: 1, InstanceID: 5, CPUUsagePercent: &cpu, RecordedAt: now.Add(-12 * time.Hour)}, + {ID: 2, InstanceID: 5, CPUUsagePercent: &cpu, RecordedAt: now.Add(-48 * time.Hour)}, + }, + } + svc := NewInstanceUsageService(repo) + + records, err := svc.GetHistory(5, 0) // 0 → defaults to 24h + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(records) != 1 { + t.Fatalf("expected 1 record within 24h, got %d", len(records)) + } +} + +func TestGetHistory_CapsAt720Hours(t *testing.T) { + svc := NewInstanceUsageService(&stubInstanceUsageRepository{}) + // Should not error even with huge hours value + _, err := svc.GetHistory(1, 9999) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } +} + +func TestGetAllCurrentUsage_MultipleInstances(t *testing.T) { + now := time.Now() + cpu1, cpu2 := 30.0, 60.0 + repo := &stubInstanceUsageRepository{ + records: []models.InstanceUsage{ + {ID: 1, InstanceID: 1, CPUUsagePercent: &cpu1, RecordedAt: now.Add(-2 * time.Hour)}, + {ID: 2, InstanceID: 1, CPUUsagePercent: &cpu2, RecordedAt: now}, + {ID: 3, InstanceID: 2, CPUUsagePercent: &cpu1, RecordedAt: now}, + }, + } + svc := 
NewInstanceUsageService(repo) + + records, err := svc.GetAllCurrentUsage() + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(records) != 2 { + t.Fatalf("expected 2 latest records (one per instance), got %d", len(records)) + } +} + +func TestRecordUsage_ValidRecord(t *testing.T) { + repo := &stubInstanceUsageRepository{} + svc := NewInstanceUsageService(repo) + + cpu := 50.0 + err := svc.RecordUsage(&models.InstanceUsage{ + InstanceID: 1, + CPUUsagePercent: &cpu, + }) + if err != nil { + t.Fatalf("unexpected error: %v", err) + } + if len(repo.created) != 1 { + t.Fatalf("expected 1 created record, got %d", len(repo.created)) + } +} + +func TestRecordUsage_NilRecord(t *testing.T) { + svc := NewInstanceUsageService(&stubInstanceUsageRepository{}) + if err := svc.RecordUsage(nil); err == nil { + t.Fatal("expected error for nil record") + } +} + +func TestRecordUsage_InvalidInstanceID(t *testing.T) { + svc := NewInstanceUsageService(&stubInstanceUsageRepository{}) + if err := svc.RecordUsage(&models.InstanceUsage{InstanceID: 0}); err == nil { + t.Fatal("expected error for zero instance ID") + } +} + +