package examples

import (
	"context"
	"crypto/ed25519"
	"crypto/rand"
	"encoding/json"
	"errors"
	"fmt"
	"io"
	"log/slog"
	"net/http"
	"os"
	"os/signal"
	"runtime"
	"sync"
	"syscall"
	"time"

	"github.com/chorus-services/backbeat/pkg/sdk"
)

// Health status values stored in ServiceHealth.Status and used as the keys of
// the summary map built by generateHealthSummary.
const (
	statusHealthy   = "healthy"
	statusDegraded  = "degraded"
	statusUnhealthy = "unhealthy"
	statusUnknown   = "unknown"
)

const (
	// slowResponseThreshold is the response time above which an otherwise
	// healthy service is reported as degraded.
	slowResponseThreshold = 2 * time.Second

	// maxHealthBodyDrain bounds how much of a health response body is read
	// before closing it, so a misbehaving endpoint cannot stall a check.
	maxHealthBodyDrain = 64 << 10

	// metricsAddr is the listen address of the example metrics endpoint.
	metricsAddr = ":9090"
)

// healthCheckClient is shared by all health checks so connections are reused
// instead of building a new client (and transport state) per request.
var healthCheckClient = &http.Client{Timeout: 5 * time.Second}

// ServiceHealth represents the health status of a monitored service.
type ServiceHealth struct {
	ServiceName  string        `json:"service_name"`
	Status       string        `json:"status"` // healthy, degraded, unhealthy (or unknown before the first check)
	LastCheck    time.Time     `json:"last_check"`
	ResponseTime time.Duration `json:"response_time"`
	ErrorCount   int           `json:"error_count"`
	// Uptime is never assigned by the code in this file.
	Uptime time.Duration `json:"uptime"`
}

// SystemMetrics represents system-level metrics.
type SystemMetrics struct {
	CPUPercent     float64 `json:"cpu_percent"`
	MemoryPercent  float64 `json:"memory_percent"`
	GoroutineCount int     `json:"goroutine_count"`
	HeapSizeMB     float64 `json:"heap_size_mb"`
}

// ServiceMonitor demonstrates health monitoring with beat-aligned reporting.
// This example shows how to integrate BACKBEAT with service monitoring.
//
// It blocks until SIGINT or SIGTERM is received. While running it polls the
// configured endpoints every fourth beat, emits a status claim every second
// beat and on every downbeat, and serves a JSON snapshot on :9090/metrics.
func ServiceMonitor() {
	// Generate a signing key for this example.
	_, signingKey, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		slog.Error("Failed to generate signing key", "error", err)
		return
	}

	// Create SDK configuration.
	config := sdk.DefaultConfig()
	config.ClusterID = "chorus-dev"
	config.AgentID = "service-monitor"
	config.NATSUrl = "nats://localhost:4222"
	config.SigningKey = signingKey
	config.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	}))

	// Create BACKBEAT client.
	client := sdk.NewClient(config)

	// Services to monitor (example endpoints).
	monitoredServices := map[string]string{
		"pulse-service":  "http://localhost:8080/health",
		"reverb-service": "http://localhost:8081/health",
		"nats-server":    "http://localhost:8222/varz", // NATS monitoring endpoint
	}

	// Health tracking. healthMutex guards both the map and the entries it
	// points to.
	var (
		healthStatus = make(map[string]*ServiceHealth, len(monitoredServices))
		healthMutex  sync.RWMutex
		startTime    = time.Now()
	)

	// Initialize health status.
	for serviceName := range monitoredServices {
		healthStatus[serviceName] = &ServiceHealth{
			ServiceName: serviceName,
			Status:      statusUnknown,
		}
	}

	// Register beat callback for frequent health checks.
	client.OnBeat(func(beat sdk.BeatFrame) {
		// Perform health checks every 4 beats (reduce frequency).
		if beat.BeatIndex%4 == 0 {
			performHealthChecks(monitoredServices, healthStatus, &healthMutex)
		}

		// Emit status claim with current health summary on even beats only.
		if beat.BeatIndex%2 != 0 {
			return
		}

		healthSummary := generateHealthSummary(healthStatus, &healthMutex)
		systemMetrics := collectSystemMetrics()

		notes := fmt.Sprintf("Services: %s | CPU: %.1f%% | Mem: %.1f%% | Goroutines: %d",
			formatHealthSummary(healthSummary),
			systemMetrics.CPUPercent,
			systemMetrics.MemoryPercent,
			systemMetrics.GoroutineCount)

		if err := client.EmitStatusClaim(sdk.StatusClaim{
			State:     determineOverallState(healthSummary),
			BeatsLeft: 0, // Monitoring is continuous
			Progress:  calculateHealthScore(healthSummary),
			Notes:     notes,
		}); err != nil {
			slog.Error("Failed to emit status claim", "error", err)
		}
	})

	// Register downbeat callback for detailed reporting.
	client.OnDownbeat(func(beat sdk.BeatFrame) {
		systemMetrics := collectSystemMetrics()

		slog.Info("Service health report",
			"beat_index", beat.BeatIndex,
			"window_id", beat.WindowID,
			"uptime", time.Since(startTime).String(),
			"cpu_percent", systemMetrics.CPUPercent,
			"memory_percent", systemMetrics.MemoryPercent,
			"heap_mb", systemMetrics.HeapSizeMB,
			"goroutines", systemMetrics.GoroutineCount,
		)

		// Log health details. The JSON dump is only built when debug logging
		// is actually enabled, and it is marshalled under the read lock so it
		// cannot race with in-flight health checks.
		if slog.Default().Enabled(context.Background(), slog.LevelDebug) {
			healthMutex.RLock()
			healthData, err := json.MarshalIndent(healthStatus, "", " ")
			healthMutex.RUnlock()
			if err != nil {
				slog.Error("Failed to marshal health status", "error", err)
			} else {
				slog.Debug("Detailed health status", "health_data", string(healthData))
			}
		}

		// Emit comprehensive status for the bar.
		healthSummary := generateHealthSummary(healthStatus, &healthMutex)
		if err := client.EmitStatusClaim(sdk.StatusClaim{
			State:     "review", // Downbeat is review time
			BeatsLeft: 0,
			Progress:  calculateHealthScore(healthSummary),
			Notes: fmt.Sprintf("Bar %d health review: %s",
				beat.BeatIndex/4,
				formatDetailedHealth(healthSummary, systemMetrics)),
		}); err != nil {
			slog.Error("Failed to emit downbeat status", "error", err)
		}
	})

	// Setup graceful shutdown.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Handle shutdown signals. The goroutine also exits when ctx is cancelled
	// by the deferred cancel, so it does not leak on an early return.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigChan)

	go func() {
		select {
		case <-sigChan:
			slog.Info("Shutdown signal received")
			cancel()
		case <-ctx.Done():
		}
	}()

	// Start the client.
	if err := client.Start(ctx); err != nil {
		slog.Error("Failed to start BACKBEAT client", "error", err)
		return
	}
	defer client.Stop()

	slog.Info("Service monitor started - use Ctrl+C to stop",
		"monitored_services", len(monitoredServices))

	// Expose metrics endpoint on a private mux so that calling ServiceMonitor
	// more than once in a process cannot panic on a duplicate registration
	// with http.DefaultServeMux.
	mux := http.NewServeMux()
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		// Copy the entries while holding the lock; encoding the live map
		// after unlocking would race with the health-check goroutines.
		healthMutex.RLock()
		snapshot := make(map[string]ServiceHealth, len(healthStatus))
		for name, health := range healthStatus {
			if health != nil {
				snapshot[name] = *health
			}
		}
		healthMutex.RUnlock()

		data := map[string]any{
			"health":   snapshot,
			"system":   collectSystemMetrics(),
			"backbeat": client.Health(),
		}

		w.Header().Set("Content-Type", "application/json")
		if err := json.NewEncoder(w).Encode(data); err != nil {
			slog.Error("Failed to encode metrics response", "error", err)
		}
	})

	metricsServer := &http.Server{
		Addr:              metricsAddr,
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second,
		ReadTimeout:       10 * time.Second,
		WriteTimeout:      10 * time.Second,
	}

	go func() {
		slog.Info("Metrics endpoint available", "url", "http://localhost:9090/metrics")
		// ErrServerClosed is the normal result of Shutdown, not a failure.
		if err := metricsServer.ListenAndServe(); err != nil && !errors.Is(err, http.ErrServerClosed) {
			slog.Error("Metrics server failed", "error", err)
		}
	}()

	// Wait for shutdown.
	<-ctx.Done()
	slog.Info("Service monitor shutting down")

	// Stop the metrics server so the listener and its goroutine are released.
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer shutdownCancel()
	if err := metricsServer.Shutdown(shutdownCtx); err != nil {
		slog.Error("Metrics server shutdown failed", "error", err)
	}
}

// performHealthChecks checks the health of all monitored services.
//
// One goroutine is started per entry in services and the function returns
// without waiting for them; each goroutine records its result in
// healthStatus while holding mutex.
func performHealthChecks(services map[string]string, healthStatus map[string]*ServiceHealth, mutex *sync.RWMutex) {
	for serviceName, endpoint := range services {
		go checkService(serviceName, endpoint, healthStatus, mutex)
	}
}

// checkService probes a single endpoint and stores the outcome in
// healthStatus[name], creating the entry if it does not exist yet.
func checkService(name, url string, healthStatus map[string]*ServiceHealth, mutex *sync.RWMutex) {
	start := time.Now()
	resp, err := healthCheckClient.Get(url)
	responseTime := time.Since(start)

	statusCode := 0
	if err == nil {
		statusCode = resp.StatusCode
		// Drain (bounded) and close the body before taking the lock so the
		// connection can be reused; the content itself is not needed, so a
		// read error here is deliberately ignored.
		_, _ = io.Copy(io.Discard, io.LimitReader(resp.Body, maxHealthBodyDrain))
		resp.Body.Close()
	}

	status := classifyHealth(err, statusCode, responseTime)

	mutex.Lock()
	health := healthStatus[name]
	if health == nil {
		health = &ServiceHealth{ServiceName: name}
		healthStatus[name] = health
	}
	health.LastCheck = time.Now()
	health.ResponseTime = responseTime
	health.Status = status
	if status == statusUnhealthy {
		health.ErrorCount++
	}
	mutex.Unlock()

	// Log outside the critical section so logging never delays other checks.
	if err != nil {
		slog.Warn("Health check failed",
			"service", name,
			"endpoint", url,
			"error", err,
			"response_time", responseTime)
		return
	}
	slog.Debug("Health check completed",
		"service", name,
		"status", status,
		"response_time", responseTime,
		"status_code", statusCode)
}

// classifyHealth maps the outcome of one probe to a status string.
//
// A transport error, or a status code outside 200-499, is unhealthy. A slow
// response (above slowResponseThreshold) or a 3xx/4xx code is degraded.
// Unhealthy takes precedence, so a slow failing response is never reported
// as merely degraded.
func classifyHealth(err error, statusCode int, responseTime time.Duration) string {
	switch {
	case err != nil:
		return statusUnhealthy
	case statusCode < 200 || statusCode >= 500:
		return statusUnhealthy
	case responseTime > slowResponseThreshold:
		return statusDegraded
	case statusCode >= 300:
		return statusDegraded
	default:
		return statusHealthy
	}
}

// generateHealthSummary creates a summary of service health.
//
// It returns the number of services per status string. The four known
// statuses are always present as keys; nil entries are counted as unknown.
// The map is read while holding mutex's read lock.
func generateHealthSummary(healthStatus map[string]*ServiceHealth, mutex *sync.RWMutex) map[string]int {
	mutex.RLock()
	defer mutex.RUnlock()

	summary := map[string]int{
		statusHealthy:   0,
		statusDegraded:  0,
		statusUnhealthy: 0,
		statusUnknown:   0,
	}

	for _, health := range healthStatus {
		if health == nil {
			summary[statusUnknown]++
			continue
		}
		summary[health.Status]++
	}

	return summary
}

// determineOverallState determines the overall system state.
//
// The worst status present wins: any unhealthy service yields "failed", any
// degraded one "executing", any healthy one "done", otherwise "waiting".
func determineOverallState(healthSummary map[string]int) string {
	switch {
	case healthSummary[statusUnhealthy] > 0:
		return "failed"
	case healthSummary[statusDegraded] > 0:
		return "executing" // Degraded but still working
	case healthSummary[statusHealthy] > 0:
		return "done"
	default:
		return "waiting" // All unknown
	}
}

// calculateHealthScore calculates a health score (0.0-1.0).
//
// Only the four known statuses are counted. It returns 0.0 when the summary
// contains no services.
func calculateHealthScore(healthSummary map[string]int) float64 {
	healthy := healthSummary[statusHealthy]
	degraded := healthSummary[statusDegraded]
	unhealthy := healthSummary[statusUnhealthy]
	unknown := healthSummary[statusUnknown]

	total := healthy + degraded + unhealthy + unknown
	if total == 0 {
		return 0.0
	}

	// Weight the scores: healthy=1.0, degraded=0.5, unhealthy=0.0, unknown=0.25
	score := float64(healthy)*1.0 + float64(degraded)*0.5 + float64(unknown)*0.25
	return score / float64(total)
}

// formatHealthSummary creates a compact string representation.
func formatHealthSummary(healthSummary map[string]int) string {
	return fmt.Sprintf("H:%d D:%d U:%d ?:%d",
		healthSummary[statusHealthy],
		healthSummary[statusDegraded],
		healthSummary[statusUnhealthy],
		healthSummary[statusUnknown])
}

// formatDetailedHealth creates detailed health information.
func formatDetailedHealth(healthSummary map[string]int, systemMetrics SystemMetrics) string {
	return fmt.Sprintf("Health: %s, CPU: %.1f%%, Mem: %.1f%%, Heap: %.1fMB",
		formatHealthSummary(healthSummary),
		systemMetrics.CPUPercent,
		systemMetrics.MemoryPercent,
		systemMetrics.HeapSizeMB)
}

// collectSystemMetrics collects basic system metrics.
func collectSystemMetrics() SystemMetrics {
	var mem runtime.MemStats
	runtime.ReadMemStats(&mem)

	const (
		bytesPerMiB = 1024 * 1024
		bytesPerGiB = 1024 * 1024 * 1024
	)

	return SystemMetrics{
		CPUPercent: 0.0, // Would need external package like gopsutil for real CPU metrics
		// Rough approximation: runtime-reserved memory expressed as a
		// percentage of a fixed 1 GiB baseline, so it can exceed 100.
		MemoryPercent:  float64(mem.Sys) / bytesPerGiB * 100,
		GoroutineCount: runtime.NumGoroutine(),
		HeapSizeMB:     float64(mem.HeapSys) / bytesPerMiB,
	}
}