Files
BACKBEAT/pkg/sdk/examples/service_monitor.go
2025-10-17 08:56:25 +11:00

326 lines
9.3 KiB
Go

package examples
import (
"context"
"crypto/ed25519"
"crypto/rand"
"encoding/json"
"fmt"
"log/slog"
"net/http"
"os"
"os/signal"
"runtime"
"sync"
"syscall"
"time"
"github.com/chorus-services/backbeat/pkg/sdk"
)
// ServiceHealth represents the health status of a monitored service.
//
// One value is kept per monitored endpoint and is updated in place by the
// health-check goroutines, so readers must hold the accompanying mutex.
type ServiceHealth struct {
// ServiceName is the key the service is registered under in the monitor.
ServiceName string `json:"service_name"`
// Status is the result of the most recent probe; it starts out as "unknown".
Status string `json:"status"` // unknown, healthy, degraded, unhealthy
// LastCheck is when the most recent probe finished; zero until the first probe.
LastCheck time.Time `json:"last_check"`
// ResponseTime is how long the most recent HTTP GET took, including failed
// requests. As a time.Duration it is JSON-encoded as integer nanoseconds.
ResponseTime time.Duration `json:"response_time"`
// ErrorCount is a running total of probes that were counted as errors.
ErrorCount int `json:"error_count"`
// Uptime is not assigned anywhere in the code visible in this file.
// NOTE(review): presumably intended to track per-service uptime — confirm.
Uptime time.Duration `json:"uptime"`
}
// SystemMetrics represents system-level metrics for the monitoring process
// itself, as sampled by collectSystemMetrics.
type SystemMetrics struct {
// CPUPercent is currently always reported as 0.0; no CPU sampling is implemented.
CPUPercent float64 `json:"cpu_percent"`
// MemoryPercent is runtime.MemStats.Sys expressed as a percentage of 1 GiB.
// It is a rough approximation and is not clamped, so it can exceed 100.
MemoryPercent float64 `json:"memory_percent"`
// GoroutineCount is the value of runtime.NumGoroutine at sampling time.
GoroutineCount int `json:"goroutine_count"`
// HeapSizeMB is runtime.MemStats.HeapSys converted to MiB.
HeapSizeMB float64 `json:"heap_size_mb"`
}
// ServiceMonitor demonstrates health monitoring with beat-aligned reporting.
// This example shows how to integrate BACKBEAT with service monitoring.
//
// It connects a BACKBEAT client, probes a fixed set of HTTP endpoints every
// fourth beat, emits a summary status claim every second beat, logs a detailed
// report on every downbeat, and serves the collected data as JSON on
// http://localhost:9090/metrics. The function blocks until SIGINT or SIGTERM is
// received, then shuts the metrics server and the client down before returning.
// Start-up failures are logged and cause an early return.
func ServiceMonitor() {
	// Generate a signing key for this example.
	_, signingKey, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		slog.Error("Failed to generate signing key", "error", err)
		return
	}

	// Create SDK configuration.
	config := sdk.DefaultConfig()
	config.ClusterID = "chorus-dev"
	config.AgentID = "service-monitor"
	config.NATSUrl = "nats://localhost:4222"
	config.SigningKey = signingKey
	config.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelInfo,
	}))

	// Create BACKBEAT client.
	client := sdk.NewClient(config)

	// Services to monitor (example endpoints).
	monitoredServices := map[string]string{
		"pulse-service":  "http://localhost:8080/health",
		"reverb-service": "http://localhost:8081/health",
		"nats-server":    "http://localhost:8222/varz", // NATS monitoring endpoint
	}

	// Health tracking. healthStatus is shared between the beat callbacks, the
	// health-check goroutines and the metrics handler; healthMutex guards it.
	var (
		healthStatus = make(map[string]*ServiceHealth, len(monitoredServices))
		healthMutex  sync.RWMutex
		startTime    = time.Now()
	)

	// Every service starts as "unknown" until its first probe completes.
	for serviceName := range monitoredServices {
		healthStatus[serviceName] = &ServiceHealth{
			ServiceName: serviceName,
			Status:      "unknown",
		}
	}

	// Register beat callback for frequent health checks.
	client.OnBeat(func(beat sdk.BeatFrame) {
		// Perform health checks every 4 beats (reduce frequency).
		if beat.BeatIndex%4 == 0 {
			performHealthChecks(monitoredServices, healthStatus, &healthMutex)
		}

		// Emit status claim with current health summary every 2 beats.
		if beat.BeatIndex%2 == 0 {
			healthSummary := generateHealthSummary(healthStatus, &healthMutex)
			systemMetrics := collectSystemMetrics()
			state := determineOverallState(healthSummary)
			notes := fmt.Sprintf("Services: %s | CPU: %.1f%% | Mem: %.1f%% | Goroutines: %d",
				formatHealthSummary(healthSummary),
				systemMetrics.CPUPercent,
				systemMetrics.MemoryPercent,
				systemMetrics.GoroutineCount)

			err := client.EmitStatusClaim(sdk.StatusClaim{
				State:     state,
				BeatsLeft: 0, // Monitoring is continuous
				Progress:  calculateHealthScore(healthSummary),
				Notes:     notes,
			})
			if err != nil {
				slog.Error("Failed to emit status claim", "error", err)
			}
		}
	})

	// Register downbeat callback for detailed reporting.
	client.OnDownbeat(func(beat sdk.BeatFrame) {
		// Serialize under the read lock: the encoder dereferences the
		// *ServiceHealth values that the health-check goroutines mutate.
		healthMutex.RLock()
		healthData, marshalErr := json.MarshalIndent(healthStatus, "", " ")
		healthMutex.RUnlock()
		if marshalErr != nil {
			slog.Warn("Failed to serialize health status", "error", marshalErr)
		}

		systemMetrics := collectSystemMetrics()
		uptime := time.Since(startTime)
		slog.Info("Service health report",
			"beat_index", beat.BeatIndex,
			"window_id", beat.WindowID,
			"uptime", uptime.String(),
			"cpu_percent", systemMetrics.CPUPercent,
			"memory_percent", systemMetrics.MemoryPercent,
			"heap_mb", systemMetrics.HeapSizeMB,
			"goroutines", systemMetrics.GoroutineCount,
		)

		// Log health details.
		slog.Debug("Detailed health status", "health_data", string(healthData))

		// Emit comprehensive status for the bar.
		healthSummary := generateHealthSummary(healthStatus, &healthMutex)
		err := client.EmitStatusClaim(sdk.StatusClaim{
			State:     "review", // Downbeat is review time
			BeatsLeft: 0,
			Progress:  calculateHealthScore(healthSummary),
			Notes:     fmt.Sprintf("Bar %d health review: %s", beat.BeatIndex/4, formatDetailedHealth(healthSummary, systemMetrics)),
		})
		if err != nil {
			slog.Error("Failed to emit downbeat status", "error", err)
		}
	})

	// Setup graceful shutdown.
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Handle shutdown signals. signal.Stop restores default signal handling
	// once the monitor returns, and the select lets the goroutine exit even
	// when no signal ever arrives.
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigChan)
	go func() {
		select {
		case <-sigChan:
			slog.Info("Shutdown signal received")
			cancel()
		case <-ctx.Done():
		}
	}()

	// Start the client.
	if err := client.Start(ctx); err != nil {
		slog.Error("Failed to start BACKBEAT client", "error", err)
		return
	}
	defer client.Stop()

	slog.Info("Service monitor started - use Ctrl+C to stop",
		"monitored_services", len(monitoredServices))

	// Expose metrics endpoint on a private mux: registering on
	// http.DefaultServeMux would panic if ServiceMonitor were called twice and
	// would also expose any handler other packages registered globally.
	mux := http.NewServeMux()
	mux.HandleFunc("/metrics", func(w http.ResponseWriter, r *http.Request) {
		// Gather lock-independent data first to keep the critical section short.
		system := collectSystemMetrics()
		backbeat := client.Health()

		// The map and the structs it points to must stay locked for the whole
		// serialization, not just while the payload map is assembled.
		healthMutex.RLock()
		payload, err := json.Marshal(map[string]any{
			"health":   healthStatus,
			"system":   system,
			"backbeat": backbeat,
		})
		healthMutex.RUnlock()
		if err != nil {
			slog.Error("Failed to encode metrics", "error", err)
			http.Error(w, "failed to encode metrics", http.StatusInternalServerError)
			return
		}

		w.Header().Set("Content-Type", "application/json")
		// Trailing newline keeps the body identical to json.Encoder output.
		if _, err := w.Write(append(payload, '\n')); err != nil {
			slog.Warn("Failed to write metrics response", "error", err)
		}
	})

	metricsServer := &http.Server{
		Addr:              ":9090",
		Handler:           mux,
		ReadHeaderTimeout: 5 * time.Second,
	}
	go func() {
		slog.Info("Metrics endpoint available", "url", "http://localhost:9090/metrics")
		// ErrServerClosed is the expected result of the Shutdown call below.
		if err := metricsServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
			slog.Error("Metrics server failed", "error", err)
		}
	}()

	// Wait for shutdown.
	<-ctx.Done()
	slog.Info("Service monitor shutting down")

	// Give in-flight metrics requests a bounded amount of time to finish.
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer shutdownCancel()
	if err := metricsServer.Shutdown(shutdownCtx); err != nil {
		slog.Warn("Metrics server shutdown failed", "error", err)
	}
}
// performHealthChecks checks the health of all monitored services.
//
// services maps a service name to the URL that is probed with an HTTP GET.
// Each probe runs in its own goroutine and the function returns immediately
// without waiting for them; results are written into healthStatus under
// mutex's write lock as each probe finishes. A service that has no entry in
// healthStatus yet gets one created on its first result.
func performHealthChecks(services map[string]string, healthStatus map[string]*ServiceHealth, mutex *sync.RWMutex) {
	// One client per round is enough; http.Client is safe for concurrent use.
	httpClient := &http.Client{Timeout: 5 * time.Second}

	for serviceName, endpoint := range services {
		go func(name, url string) {
			start := time.Now()
			resp, err := httpClient.Get(url)
			responseTime := time.Since(start)

			// Only the status code is needed, so release the body right away
			// instead of holding it open while waiting for the lock.
			statusCode := 0
			if err == nil {
				statusCode = resp.StatusCode
				_ = resp.Body.Close()
			}

			status, failed := classifyHealthResult(statusCode, responseTime, err)

			mutex.Lock()
			health := healthStatus[name]
			if health == nil {
				// Guard against services that were never pre-registered.
				health = &ServiceHealth{ServiceName: name}
				healthStatus[name] = health
			}
			health.LastCheck = time.Now()
			health.ResponseTime = responseTime
			health.Status = status
			if failed {
				health.ErrorCount++
			}
			mutex.Unlock()

			// Log outside the critical section so slow log sinks cannot
			// stall readers of the health map.
			if err != nil {
				slog.Warn("Health check failed",
					"service", name,
					"endpoint", url,
					"error", err,
					"response_time", responseTime)
				return
			}
			slog.Debug("Health check completed",
				"service", name,
				"status", status,
				"response_time", responseTime,
				"status_code", statusCode)
		}(serviceName, endpoint)
	}
}

// classifyHealthResult maps the outcome of one probe to a status string and
// reports whether the outcome should be counted as an error.
//
// A transport error or a status outside 200-499 is "unhealthy" and counts as
// an error; 300-499 is "degraded"; 2xx is "healthy" unless the response took
// longer than two seconds, in which case it is "degraded". A slow response
// never upgrades an "unhealthy" result.
func classifyHealthResult(statusCode int, responseTime time.Duration, err error) (status string, failed bool) {
	switch {
	case err != nil:
		return "unhealthy", true
	case statusCode >= 200 && statusCode < 300:
		if responseTime > 2*time.Second {
			return "degraded", false // Slow response
		}
		return "healthy", false
	case statusCode >= 300 && statusCode < 500:
		return "degraded", false
	default:
		return "unhealthy", true
	}
}
// generateHealthSummary tallies the tracked services by their current status.
//
// The returned map always contains the keys "healthy", "degraded",
// "unhealthy" and "unknown" (zero when no service has that status). The
// health map is read under mutex's read lock.
func generateHealthSummary(healthStatus map[string]*ServiceHealth, mutex *sync.RWMutex) map[string]int {
	mutex.RLock()
	defer mutex.RUnlock()

	// Seed every known status so callers see an explicit zero for each.
	counts := make(map[string]int, 4)
	for _, status := range [...]string{"healthy", "degraded", "unhealthy", "unknown"} {
		counts[status] = 0
	}

	for name := range healthStatus {
		counts[healthStatus[name].Status]++
	}
	return counts
}
// determineOverallState collapses a per-status service count into a single
// state string. The worst status present wins: any unhealthy service yields
// "failed", otherwise any degraded service yields "executing", otherwise any
// healthy service yields "done"; with none of those the result is "waiting".
func determineOverallState(healthSummary map[string]int) string {
	switch {
	case healthSummary["unhealthy"] > 0:
		return "failed"
	case healthSummary["degraded"] > 0:
		return "executing" // Degraded but still working
	case healthSummary["healthy"] > 0:
		return "done"
	default:
		return "waiting" // All unknown
	}
}
// calculateHealthScore reduces a per-status service count to a score in the
// range 0.0-1.0: the weighted average of all counted services, where a healthy
// service is worth 1.0, degraded 0.5, unknown 0.25 and unhealthy 0.0.
// An empty summary scores 0.0.
func calculateHealthScore(healthSummary map[string]int) float64 {
	healthy := healthSummary["healthy"]
	degraded := healthSummary["degraded"]
	unhealthy := healthSummary["unhealthy"]
	unknown := healthSummary["unknown"]

	serviceCount := healthy + degraded + unhealthy + unknown
	if serviceCount == 0 {
		return 0.0
	}

	// Unhealthy services contribute nothing to the numerator but still count
	// toward the denominator.
	weighted := float64(healthy)*1.0 +
		float64(degraded)*0.5 +
		float64(unknown)*0.25
	return weighted / float64(serviceCount)
}
// formatHealthSummary renders a per-status service count as a compact
// one-line string of the form "H:<healthy> D:<degraded> U:<unhealthy> ?:<unknown>".
// Statuses missing from the map are printed as 0.
func formatHealthSummary(healthSummary map[string]int) string {
	const layout = "H:%d D:%d U:%d ?:%d"

	healthy := healthSummary["healthy"]
	degraded := healthSummary["degraded"]
	unhealthy := healthSummary["unhealthy"]
	unknown := healthSummary["unknown"]

	return fmt.Sprintf(layout, healthy, degraded, unhealthy, unknown)
}
// formatDetailedHealth combines the compact service summary with the CPU,
// memory and heap figures from systemMetrics into one human-readable line.
// Percentages and the heap size are printed with one decimal place.
func formatDetailedHealth(healthSummary map[string]int, systemMetrics SystemMetrics) string {
	services := formatHealthSummary(healthSummary)
	cpu := systemMetrics.CPUPercent
	mem := systemMetrics.MemoryPercent
	heap := systemMetrics.HeapSizeMB

	return fmt.Sprintf("Health: %s, CPU: %.1f%%, Mem: %.1f%%, Heap: %.1fMB", services, cpu, mem, heap)
}
// collectSystemMetrics samples basic metrics about the current process from
// the Go runtime. CPU usage is not measured and is always reported as 0.0;
// the memory percentage is the runtime's total obtained memory relative to a
// fixed 1 GiB, so it is only a rough indicator.
func collectSystemMetrics() SystemMetrics {
	const (
		bytesPerMiB = 1024 * 1024
		bytesPerGiB = 1024 * 1024 * 1024
	)

	var stats runtime.MemStats
	runtime.ReadMemStats(&stats)

	var metrics SystemMetrics
	metrics.CPUPercent = 0.0                                       // Would need external package like gopsutil for real CPU metrics
	metrics.MemoryPercent = float64(stats.Sys) / bytesPerGiB * 100 // Rough approximation
	metrics.GoroutineCount = runtime.NumGoroutine()
	metrics.HeapSizeMB = float64(stats.HeapSys) / bytesPerMiB
	return metrics
}