 9bdcbe0447
			
		
	
	9bdcbe0447
	
	
	
		
			
			Major integrations and fixes: - Added BACKBEAT SDK integration for P2P operation timing - Implemented beat-aware status tracking for distributed operations - Added Docker secrets support for secure license management - Resolved KACHING license validation via HTTPS/TLS - Updated docker-compose configuration for clean stack deployment - Disabled rollback policies to prevent deployment failures - Added license credential storage (CHORUS-DEV-MULTI-001) Technical improvements: - BACKBEAT P2P operation tracking with phase management - Enhanced configuration system with file-based secrets - Improved error handling for license validation - Clean separation of KACHING and CHORUS deployment stacks 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
		
			
				
	
	
		
			277 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			277 lines
		
	
	
		
			7.9 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package sdk
 | |
| 
 | |
import (
	"expvar"
	"fmt"
	"sort"
	"sync"
	"time"
)
 | |
| 
 | |
// Metrics bundles the expvar-backed counters, gauges and rolling sample
// windows through which the SDK reports on itself.
//
// A Metrics value contains mutexes and must not be copied after first use;
// always share it by pointer.
type Metrics struct {
	// Connection state. ConnectionStatus is set to 1 by RecordConnection and
	// to 0 by RecordDisconnection; ReconnectCount grows on every
	// RecordConnection call.
	ConnectionStatus   *expvar.Int
	ReconnectCount     *expvar.Int
	ConnectionDuration *expvar.Int

	// Beat reception. The two maps hold *expvar.Float statistics
	// (average, p95, p99) derived from the rolling windows below.
	BeatsReceived        *expvar.Int
	DownbeatsReceived    *expvar.Int
	BeatJitterMS         *expvar.Map
	BeatCallbackLatency  *expvar.Map
	BeatMisses           *expvar.Int
	LocalDegradationTime *expvar.Int

	// Status claim emission.
	StatusClaimsEmitted *expvar.Int
	StatusClaimErrors   *expvar.Int

	// Budget lifecycle.
	BudgetsCreated   *expvar.Int
	BudgetsCompleted *expvar.Int
	BudgetsTimedOut  *expvar.Int

	// Error reporting.
	TotalErrors *expvar.Int
	LastError   *expvar.String

	// jitterMutex guards beatJitterSamples, the rolling window of beat
	// jitter values in milliseconds.
	jitterMutex       sync.Mutex
	beatJitterSamples []float64

	// latencyMutex guards callbackLatencies, the rolling window of callback
	// latencies in milliseconds.
	latencyMutex      sync.Mutex
	callbackLatencies []float64
}
 | |
| 
 | |
| // NewMetrics creates a new metrics instance with expvar integration
 | |
| func NewMetrics(prefix string) *Metrics {
 | |
| 	m := &Metrics{
 | |
| 		ConnectionStatus:     expvar.NewInt(prefix + ".connection.status"),
 | |
| 		ReconnectCount:       expvar.NewInt(prefix + ".connection.reconnects"),
 | |
| 		ConnectionDuration:   expvar.NewInt(prefix + ".connection.duration_ms"),
 | |
| 		
 | |
| 		BeatsReceived:        expvar.NewInt(prefix + ".beats.received"),
 | |
| 		DownbeatsReceived:    expvar.NewInt(prefix + ".beats.downbeats"),
 | |
| 		BeatJitterMS:         expvar.NewMap(prefix + ".beats.jitter_ms"),
 | |
| 		BeatCallbackLatency:  expvar.NewMap(prefix + ".beats.callback_latency_ms"),
 | |
| 		BeatMisses:           expvar.NewInt(prefix + ".beats.misses"),
 | |
| 		LocalDegradationTime: expvar.NewInt(prefix + ".beats.degradation_ms"),
 | |
| 		
 | |
| 		StatusClaimsEmitted:  expvar.NewInt(prefix + ".status.claims_emitted"),
 | |
| 		StatusClaimErrors:    expvar.NewInt(prefix + ".status.claim_errors"),
 | |
| 		
 | |
| 		BudgetsCreated:       expvar.NewInt(prefix + ".budgets.created"),
 | |
| 		BudgetsCompleted:     expvar.NewInt(prefix + ".budgets.completed"),
 | |
| 		BudgetsTimedOut:      expvar.NewInt(prefix + ".budgets.timed_out"),
 | |
| 		
 | |
| 		TotalErrors:          expvar.NewInt(prefix + ".errors.total"),
 | |
| 		LastError:            expvar.NewString(prefix + ".errors.last"),
 | |
| 		
 | |
| 		beatJitterSamples:    make([]float64, 0, 100),
 | |
| 		callbackLatencies:    make([]float64, 0, 100),
 | |
| 	}
 | |
| 	
 | |
| 	// Initialize connection status to disconnected
 | |
| 	m.ConnectionStatus.Set(0)
 | |
| 	
 | |
| 	return m
 | |
| }
 | |
| 
 | |
| // RecordConnection records connection establishment
 | |
| func (m *Metrics) RecordConnection() {
 | |
| 	m.ConnectionStatus.Set(1)
 | |
| 	m.ReconnectCount.Add(1)
 | |
| }
 | |
| 
 | |
| // RecordDisconnection records connection loss
 | |
| func (m *Metrics) RecordDisconnection() {
 | |
| 	m.ConnectionStatus.Set(0)
 | |
| }
 | |
| 
 | |
| // RecordBeat records a beat reception with jitter measurement
 | |
| func (m *Metrics) RecordBeat(expectedTime, actualTime time.Time, isDownbeat bool) {
 | |
| 	m.BeatsReceived.Add(1)
 | |
| 	if isDownbeat {
 | |
| 		m.DownbeatsReceived.Add(1)
 | |
| 	}
 | |
| 	
 | |
| 	// Calculate and record jitter
 | |
| 	jitter := actualTime.Sub(expectedTime)
 | |
| 	jitterMS := float64(jitter.Nanoseconds()) / 1e6
 | |
| 	
 | |
| 	m.jitterMutex.Lock()
 | |
| 	m.beatJitterSamples = append(m.beatJitterSamples, jitterMS)
 | |
| 	if len(m.beatJitterSamples) > 100 {
 | |
| 		m.beatJitterSamples = m.beatJitterSamples[1:]
 | |
| 	}
 | |
| 	
 | |
| 	// Update jitter statistics
 | |
| 	if len(m.beatJitterSamples) > 0 {
 | |
| 		avg, p95, p99 := m.calculatePercentiles(m.beatJitterSamples)
 | |
| 		m.BeatJitterMS.Set("avg", &expvar.Float{})
 | |
| 		m.BeatJitterMS.Get("avg").(*expvar.Float).Set(avg)
 | |
| 		m.BeatJitterMS.Set("p95", &expvar.Float{})
 | |
| 		m.BeatJitterMS.Get("p95").(*expvar.Float).Set(p95)
 | |
| 		m.BeatJitterMS.Set("p99", &expvar.Float{})
 | |
| 		m.BeatJitterMS.Get("p99").(*expvar.Float).Set(p99)
 | |
| 	}
 | |
| 	m.jitterMutex.Unlock()
 | |
| }
 | |
| 
 | |
| // RecordBeatMiss records a missed beat
 | |
| func (m *Metrics) RecordBeatMiss() {
 | |
| 	m.BeatMisses.Add(1)
 | |
| }
 | |
| 
 | |
| // RecordCallbackLatency records callback execution latency
 | |
| func (m *Metrics) RecordCallbackLatency(duration time.Duration, callbackType string) {
 | |
| 	latencyMS := float64(duration.Nanoseconds()) / 1e6
 | |
| 	
 | |
| 	m.latencyMutex.Lock()
 | |
| 	m.callbackLatencies = append(m.callbackLatencies, latencyMS)
 | |
| 	if len(m.callbackLatencies) > 100 {
 | |
| 		m.callbackLatencies = m.callbackLatencies[1:]
 | |
| 	}
 | |
| 	
 | |
| 	// Update latency statistics
 | |
| 	if len(m.callbackLatencies) > 0 {
 | |
| 		avg, p95, p99 := m.calculatePercentiles(m.callbackLatencies)
 | |
| 		key := callbackType + "_avg"
 | |
| 		m.BeatCallbackLatency.Set(key, &expvar.Float{})
 | |
| 		m.BeatCallbackLatency.Get(key).(*expvar.Float).Set(avg)
 | |
| 		
 | |
| 		key = callbackType + "_p95"
 | |
| 		m.BeatCallbackLatency.Set(key, &expvar.Float{})
 | |
| 		m.BeatCallbackLatency.Get(key).(*expvar.Float).Set(p95)
 | |
| 		
 | |
| 		key = callbackType + "_p99"
 | |
| 		m.BeatCallbackLatency.Set(key, &expvar.Float{})
 | |
| 		m.BeatCallbackLatency.Get(key).(*expvar.Float).Set(p99)
 | |
| 	}
 | |
| 	m.latencyMutex.Unlock()
 | |
| }
 | |
| 
 | |
| // RecordLocalDegradation records time spent in local degradation mode
 | |
| func (m *Metrics) RecordLocalDegradation(duration time.Duration) {
 | |
| 	durationMS := duration.Nanoseconds() / 1e6
 | |
| 	m.LocalDegradationTime.Add(durationMS)
 | |
| }
 | |
| 
 | |
| // RecordStatusClaim records a status claim emission
 | |
| func (m *Metrics) RecordStatusClaim(success bool) {
 | |
| 	if success {
 | |
| 		m.StatusClaimsEmitted.Add(1)
 | |
| 	} else {
 | |
| 		m.StatusClaimErrors.Add(1)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // RecordBudget records budget creation and completion
 | |
| func (m *Metrics) RecordBudgetCreated() {
 | |
| 	m.BudgetsCreated.Add(1)
 | |
| }
 | |
| 
 | |
| func (m *Metrics) RecordBudgetCompleted(timedOut bool) {
 | |
| 	if timedOut {
 | |
| 		m.BudgetsTimedOut.Add(1)
 | |
| 	} else {
 | |
| 		m.BudgetsCompleted.Add(1)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // RecordError records an error
 | |
| func (m *Metrics) RecordError(err string) {
 | |
| 	m.TotalErrors.Add(1)
 | |
| 	m.LastError.Set(err)
 | |
| }
 | |
| 
 | |
| // calculatePercentiles calculates avg, p95, p99 for a slice of samples
 | |
| func (m *Metrics) calculatePercentiles(samples []float64) (avg, p95, p99 float64) {
 | |
| 	if len(samples) == 0 {
 | |
| 		return 0, 0, 0
 | |
| 	}
 | |
| 	
 | |
| 	// Calculate average
 | |
| 	sum := 0.0
 | |
| 	for _, s := range samples {
 | |
| 		sum += s
 | |
| 	}
 | |
| 	avg = sum / float64(len(samples))
 | |
| 	
 | |
| 	// Sort for percentiles (simple bubble sort for small slices)
 | |
| 	sorted := make([]float64, len(samples))
 | |
| 	copy(sorted, samples)
 | |
| 	
 | |
| 	for i := 0; i < len(sorted); i++ {
 | |
| 		for j := 0; j < len(sorted)-i-1; j++ {
 | |
| 			if sorted[j] > sorted[j+1] {
 | |
| 				sorted[j], sorted[j+1] = sorted[j+1], sorted[j]
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 	
 | |
| 	// Calculate percentiles
 | |
| 	p95Index := int(float64(len(sorted)) * 0.95)
 | |
| 	if p95Index >= len(sorted) {
 | |
| 		p95Index = len(sorted) - 1
 | |
| 	}
 | |
| 	p95 = sorted[p95Index]
 | |
| 	
 | |
| 	p99Index := int(float64(len(sorted)) * 0.99)
 | |
| 	if p99Index >= len(sorted) {
 | |
| 		p99Index = len(sorted) - 1
 | |
| 	}
 | |
| 	p99 = sorted[p99Index]
 | |
| 	
 | |
| 	return avg, p95, p99
 | |
| }
 | |
| 
 | |
| // Enhanced client with metrics integration
 | |
| func (c *client) initMetrics() {
 | |
| 	prefix := fmt.Sprintf("backbeat.sdk.%s", c.config.AgentID)
 | |
| 	c.metrics = NewMetrics(prefix)
 | |
| }
 | |
| 
 | |
| // Add metrics field to client struct (this would go in client.go)
 | |
| type clientWithMetrics struct {
 | |
| 	*client
 | |
| 	metrics *Metrics
 | |
| }
 | |
| 
 | |
// PrometheusMetrics is a placeholder for a future Prometheus integration
// (prometheus/client_golang). It has no fields yet; for now metrics are
// exposed through expvar, which can be scraped.
type PrometheusMetrics struct{}
 | |
| 
 | |
| // GetMetricsSnapshot returns a snapshot of all current metrics
 | |
| func (m *Metrics) GetMetricsSnapshot() map[string]interface{} {
 | |
| 	snapshot := make(map[string]interface{})
 | |
| 	
 | |
| 	snapshot["connection_status"] = m.ConnectionStatus.Value()
 | |
| 	snapshot["reconnect_count"] = m.ReconnectCount.Value()
 | |
| 	snapshot["beats_received"] = m.BeatsReceived.Value()
 | |
| 	snapshot["downbeats_received"] = m.DownbeatsReceived.Value()
 | |
| 	snapshot["beat_misses"] = m.BeatMisses.Value()
 | |
| 	snapshot["status_claims_emitted"] = m.StatusClaimsEmitted.Value()
 | |
| 	snapshot["status_claim_errors"] = m.StatusClaimErrors.Value()
 | |
| 	snapshot["budgets_created"] = m.BudgetsCreated.Value()
 | |
| 	snapshot["budgets_completed"] = m.BudgetsCompleted.Value()
 | |
| 	snapshot["budgets_timed_out"] = m.BudgetsTimedOut.Value()
 | |
| 	snapshot["total_errors"] = m.TotalErrors.Value()
 | |
| 	snapshot["last_error"] = m.LastError.Value()
 | |
| 	
 | |
| 	return snapshot
 | |
| }
 | |
| 
 | |
| // Health check with metrics
 | |
| func (c *client) GetHealthWithMetrics() map[string]interface{} {
 | |
| 	health := map[string]interface{}{
 | |
| 		"status": c.Health(),
 | |
| 	}
 | |
| 	
 | |
| 	if c.metrics != nil {
 | |
| 		health["metrics"] = c.metrics.GetMetricsSnapshot()
 | |
| 	}
 | |
| 	
 | |
| 	return health
 | |
| } |