543ab216f9

🎭 CHORUS now contains full BZZZ functionality adapted for containers

Core systems ported:
- P2P networking (libp2p with DHT and PubSub)
- Task coordination (COOEE protocol)
- HMMM collaborative reasoning
- SHHH encryption and security
- SLURP admin election system
- UCXL content addressing
- UCXI server integration
- Hypercore logging system
- Health monitoring and graceful shutdown
- License validation with KACHING

Container adaptations:
- Environment variable configuration (no YAML files)
- Container-optimized logging to stdout/stderr
- Auto-generated agent IDs for container deployments
- Docker-first architecture

All proven BZZZ P2P protocols, AI integration, and collaboration features are now available in containerized form.

Next: Build and test container deployment.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
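The container adaptations above replace YAML files with environment variables. As a minimal sketch of that approach only (the variable names, defaults, and helper below are assumptions, not the actual CHORUS configuration keys):

package main

import (
	"fmt"
	"os"
)

// getenvOr returns the value of key, falling back to def when the variable
// is unset or empty. Hypothetical helper, for illustration only.
func getenvOr(key, def string) string {
	if v := os.Getenv(key); v != "" {
		return v
	}
	return def
}

func main() {
	// Assumed variable names; the real CHORUS keys may differ.
	agentID := getenvOr("CHORUS_AGENT_ID", fmt.Sprintf("chorus-agent-%d", os.Getpid()))
	listenAddr := getenvOr("CHORUS_LISTEN_ADDR", "0.0.0.0:9000")

	// Container-optimized logging: write straight to stdout/stderr.
	fmt.Printf("agent_id=%s listen_addr=%s\n", agentID, listenAddr)
}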
package leader

import (
	"sync"
	"time"
)

// MetricsCollector collects and tracks metrics for context generation operations
type MetricsCollector struct {
	mu        sync.RWMutex
	startTime time.Time

	// Request metrics
	totalRequests      int64
	successfulRequests int64
	failedRequests     int64
	cancelledRequests  int64
	droppedRequests    int64

	// Queue metrics
	queueLengthSamples []int
	maxQueueLength     int
	queueOverflows     int64

	// Processing metrics
	totalProcessingTime time.Duration
	minProcessingTime   time.Duration
	maxProcessingTime   time.Duration

	// Leadership metrics
	leadershipChanges  int64
	timeAsLeader       time.Duration
	lastBecameLeader   time.Time
	lastLostLeadership time.Time

	// Error metrics
	errorsByType map[string]int64
	errorsByCode map[string]int64

	// Performance metrics
	throughput     float64 // requests per second
	averageLatency time.Duration
	p95Latency     time.Duration
	p99Latency     time.Duration

	// Custom metrics
	customCounters map[string]int64
	customGauges   map[string]float64
	customTimers   map[string]time.Duration
}

// NewMetricsCollector creates a new metrics collector
func NewMetricsCollector() *MetricsCollector {
	return &MetricsCollector{
		startTime:          time.Now(),
		queueLengthSamples: make([]int, 0, 1000),
		minProcessingTime:  time.Hour, // Large initial value
		errorsByType:       make(map[string]int64),
		errorsByCode:       make(map[string]int64),
		customCounters:     make(map[string]int64),
		customGauges:       make(map[string]float64),
		customTimers:       make(map[string]time.Duration),
	}
}

// RecordRequest records a context generation request
func (mc *MetricsCollector) RecordRequest(success bool, processingTime time.Duration, errorType, errorCode string) {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	mc.totalRequests++

	if success {
		mc.successfulRequests++
	} else {
		mc.failedRequests++
		if errorType != "" {
			mc.errorsByType[errorType]++
		}
		if errorCode != "" {
			mc.errorsByCode[errorCode]++
		}
	}

	// Update processing time metrics
	mc.totalProcessingTime += processingTime
	if processingTime < mc.minProcessingTime {
		mc.minProcessingTime = processingTime
	}
	if processingTime > mc.maxProcessingTime {
		mc.maxProcessingTime = processingTime
	}

	// Calculate running averages
	mc.updatePerformanceMetrics()
}

// RecordQueueLength records current queue length
func (mc *MetricsCollector) RecordQueueLength(length int) {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	if length > mc.maxQueueLength {
		mc.maxQueueLength = length
	}

	// Keep a sliding window of queue length samples
	mc.queueLengthSamples = append(mc.queueLengthSamples, length)
	if len(mc.queueLengthSamples) > 1000 {
		mc.queueLengthSamples = mc.queueLengthSamples[1:]
	}
}

// RecordQueueOverflow records a queue overflow event
func (mc *MetricsCollector) RecordQueueOverflow() {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	mc.queueOverflows++
	mc.droppedRequests++
}

// RecordLeadershipChange records a leadership change
func (mc *MetricsCollector) RecordLeadershipChange(becameLeader bool) {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	mc.leadershipChanges++

	if becameLeader {
		mc.lastBecameLeader = time.Now()
	} else {
		mc.lastLostLeadership = time.Now()
		if !mc.lastBecameLeader.IsZero() {
			mc.timeAsLeader += time.Since(mc.lastBecameLeader)
		}
	}
}

// RecordCancellation records a request cancellation
func (mc *MetricsCollector) RecordCancellation() {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	mc.cancelledRequests++
}

// IncrementCounter increments a custom counter
func (mc *MetricsCollector) IncrementCounter(name string, delta int64) {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	mc.customCounters[name] += delta
}

// SetGauge sets a custom gauge value
func (mc *MetricsCollector) SetGauge(name string, value float64) {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	mc.customGauges[name] = value
}

// RecordTimer records a custom timer value
func (mc *MetricsCollector) RecordTimer(name string, duration time.Duration) {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	mc.customTimers[name] = duration
}

// GetMetrics returns current metrics snapshot
func (mc *MetricsCollector) GetMetrics() *ContextMetrics {
	mc.mu.RLock()
	defer mc.mu.RUnlock()

	uptime := time.Since(mc.startTime)

	metrics := &ContextMetrics{
		// Basic metrics
		Uptime:             uptime,
		TotalRequests:      mc.totalRequests,
		SuccessfulRequests: mc.successfulRequests,
		FailedRequests:     mc.failedRequests,
		CancelledRequests:  mc.cancelledRequests,
		DroppedRequests:    mc.droppedRequests,

		// Success rate
		SuccessRate: mc.calculateSuccessRate(),

		// Queue metrics
		MaxQueueLength:     mc.maxQueueLength,
		QueueOverflows:     mc.queueOverflows,
		AverageQueueLength: mc.calculateAverageQueueLength(),

		// Processing metrics
		AverageProcessingTime: mc.calculateAverageProcessingTime(),
		MinProcessingTime:     mc.minProcessingTime,
		MaxProcessingTime:     mc.maxProcessingTime,

		// Performance metrics
		Throughput:     mc.throughput,
		AverageLatency: mc.averageLatency,
		P95Latency:     mc.p95Latency,
		P99Latency:     mc.p99Latency,

		// Leadership metrics
		LeadershipChanges:  mc.leadershipChanges,
		TimeAsLeader:       mc.timeAsLeader,
		LastBecameLeader:   mc.lastBecameLeader,
		LastLostLeadership: mc.lastLostLeadership,

		// Error metrics
		ErrorsByType: make(map[string]int64),
		ErrorsByCode: make(map[string]int64),

		// Custom metrics
		CustomCounters: make(map[string]int64),
		CustomGauges:   make(map[string]float64),
		CustomTimers:   make(map[string]time.Duration),

		// Metadata
		CollectedAt: time.Now(),
	}

	// Copy error maps
	for k, v := range mc.errorsByType {
		metrics.ErrorsByType[k] = v
	}
	for k, v := range mc.errorsByCode {
		metrics.ErrorsByCode[k] = v
	}

	// Copy custom metrics
	for k, v := range mc.customCounters {
		metrics.CustomCounters[k] = v
	}
	for k, v := range mc.customGauges {
		metrics.CustomGauges[k] = v
	}
	for k, v := range mc.customTimers {
		metrics.CustomTimers[k] = v
	}

	return metrics
}

// Reset resets all metrics
func (mc *MetricsCollector) Reset() {
	mc.mu.Lock()
	defer mc.mu.Unlock()

	mc.startTime = time.Now()
	mc.totalRequests = 0
	mc.successfulRequests = 0
	mc.failedRequests = 0
	mc.cancelledRequests = 0
	mc.droppedRequests = 0
	mc.queueLengthSamples = mc.queueLengthSamples[:0]
	mc.maxQueueLength = 0
	mc.queueOverflows = 0
	mc.totalProcessingTime = 0
	mc.minProcessingTime = time.Hour
	mc.maxProcessingTime = 0
	mc.leadershipChanges = 0
	mc.timeAsLeader = 0
	mc.lastBecameLeader = time.Time{}
	mc.lastLostLeadership = time.Time{}

	// Clear error maps
	for k := range mc.errorsByType {
		delete(mc.errorsByType, k)
	}
	for k := range mc.errorsByCode {
		delete(mc.errorsByCode, k)
	}

	// Clear custom metrics
	for k := range mc.customCounters {
		delete(mc.customCounters, k)
	}
	for k := range mc.customGauges {
		delete(mc.customGauges, k)
	}
	for k := range mc.customTimers {
		delete(mc.customTimers, k)
	}
}

// Helper methods

func (mc *MetricsCollector) calculateSuccessRate() float64 {
	if mc.totalRequests == 0 {
		return 0
	}
	return float64(mc.successfulRequests) / float64(mc.totalRequests)
}

func (mc *MetricsCollector) calculateAverageQueueLength() float64 {
	if len(mc.queueLengthSamples) == 0 {
		return 0
	}

	var sum int
	for _, length := range mc.queueLengthSamples {
		sum += length
	}
	return float64(sum) / float64(len(mc.queueLengthSamples))
}

func (mc *MetricsCollector) calculateAverageProcessingTime() time.Duration {
	if mc.totalRequests == 0 {
		return 0
	}
	return mc.totalProcessingTime / time.Duration(mc.totalRequests)
}

func (mc *MetricsCollector) updatePerformanceMetrics() {
	// Calculate throughput (requests per second)
	uptime := time.Since(mc.startTime)
	if uptime.Seconds() > 0 {
		mc.throughput = float64(mc.totalRequests) / uptime.Seconds()
	}

	// Update average latency
	mc.averageLatency = mc.calculateAverageProcessingTime()

	// TODO: Calculate percentile latencies (requires storing all processing times)
	mc.p95Latency = mc.averageLatency * 2 // Rough estimate
	mc.p99Latency = mc.averageLatency * 3 // Rough estimate
}
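
// Illustrative sketch for the TODO above: exact p95/p99 values require the
// individual processing times, which this collector does not retain. Assuming
// the samples were kept somewhere as a []time.Duration (a hypothetical slice,
// not a field of MetricsCollector), a nearest-rank percentile could be
// computed as follows, e.g. percentileDuration(samples, 0.95).
func percentileDuration(samples []time.Duration, p float64) time.Duration {
	if len(samples) == 0 || p <= 0 {
		return 0
	}
	// Copy and insertion-sort so the caller's slice is left untouched; this
	// also avoids pulling "sort" into the import block.
	sorted := append([]time.Duration(nil), samples...)
	for i := 1; i < len(sorted); i++ {
		for j := i; j > 0 && sorted[j] < sorted[j-1]; j-- {
			sorted[j], sorted[j-1] = sorted[j-1], sorted[j]
		}
	}
	// Nearest-rank method: rank = ceil(p * n), clamped to [1, n].
	n := len(sorted)
	rank := int(p * float64(n))
	if float64(rank) < p*float64(n) {
		rank++
	}
	if rank < 1 {
		rank = 1
	}
	if rank > n {
		rank = n
	}
	return sorted[rank-1]
}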

// ContextMetrics represents metrics for context generation operations
type ContextMetrics struct {
	// Basic metrics
	Uptime             time.Duration `json:"uptime"`
	TotalRequests      int64         `json:"total_requests"`
	SuccessfulRequests int64         `json:"successful_requests"`
	FailedRequests     int64         `json:"failed_requests"`
	CancelledRequests  int64         `json:"cancelled_requests"`
	DroppedRequests    int64         `json:"dropped_requests"`
	SuccessRate        float64       `json:"success_rate"`

	// Queue metrics
	MaxQueueLength     int     `json:"max_queue_length"`
	QueueOverflows     int64   `json:"queue_overflows"`
	AverageQueueLength float64 `json:"average_queue_length"`

	// Processing metrics
	AverageProcessingTime time.Duration `json:"average_processing_time"`
	MinProcessingTime     time.Duration `json:"min_processing_time"`
	MaxProcessingTime     time.Duration `json:"max_processing_time"`

	// Performance metrics
	Throughput     float64       `json:"throughput"` // requests per second
	AverageLatency time.Duration `json:"average_latency"`
	P95Latency     time.Duration `json:"p95_latency"`
	P99Latency     time.Duration `json:"p99_latency"`

	// Leadership metrics
	LeadershipChanges  int64         `json:"leadership_changes"`
	TimeAsLeader       time.Duration `json:"time_as_leader"`
	LastBecameLeader   time.Time     `json:"last_became_leader"`
	LastLostLeadership time.Time     `json:"last_lost_leadership"`

	// Error metrics
	ErrorsByType map[string]int64 `json:"errors_by_type"`
	ErrorsByCode map[string]int64 `json:"errors_by_code"`

	// Custom metrics
	CustomCounters map[string]int64         `json:"custom_counters"`
	CustomGauges   map[string]float64       `json:"custom_gauges"`
	CustomTimers   map[string]time.Duration `json:"custom_timers"`

	// Metadata
	CollectedAt time.Time `json:"collected_at"`
}

// HealthStatus represents various health status levels
type HealthStatus string

const (
	HealthStatusHealthy   HealthStatus = "healthy"
	HealthStatusDegraded  HealthStatus = "degraded"
	HealthStatusUnhealthy HealthStatus = "unhealthy"
	HealthStatusCritical  HealthStatus = "critical"
)

// QueueHealth represents queue health information
type QueueHealth struct {
	Status           HealthStatus  `json:"status"`
	QueueLength      int           `json:"queue_length"`
	MaxQueueSize     int           `json:"max_queue_size"`
	QueueUtilization float64       `json:"queue_utilization"`
	ProcessingRate   float64       `json:"processing_rate"`
	AverageWaitTime  time.Duration `json:"average_wait_time"`
	OldestRequest    *time.Time    `json:"oldest_request,omitempty"`
	HealthScore      float64       `json:"health_score"`
	Issues           []string      `json:"issues,omitempty"`
	Recommendations  []string      `json:"recommendations,omitempty"`
	LastHealthCheck  time.Time     `json:"last_health_check"`
}

// LeaderHealth represents leader health information
type LeaderHealth struct {
	Status             HealthStatus  `json:"status"`
	NodeID             string        `json:"node_id"`
	LeaderSince        time.Time     `json:"leader_since"`
	LastHeartbeat      time.Time     `json:"last_heartbeat"`
	ActiveTasks        int           `json:"active_tasks"`
	QueuedTasks        int           `json:"queued_tasks"`
	ProcessingCapacity int           `json:"processing_capacity"`
	LoadPercentage     float64       `json:"load_percentage"`
	ResponseTime       time.Duration `json:"response_time"`
	HealthScore        float64       `json:"health_score"`
	Issues             []string      `json:"issues,omitempty"`
	Recommendations    []string      `json:"recommendations,omitempty"`
	LastHealthCheck    time.Time     `json:"last_health_check"`
}

// HealthMetrics represents overall health metrics
type HealthMetrics struct {
	OverallStatus      HealthStatus           `json:"overall_status"`
	OverallHealthScore float64                `json:"overall_health_score"`
	QueueHealth        *QueueHealth           `json:"queue_health"`
	LeaderHealth       *LeaderHealth          `json:"leader_health"`
	ClusterHealth      map[string]*NodeHealth `json:"cluster_health"`
	SystemMetrics      *SystemMetrics         `json:"system_metrics"`
	Issues             []HealthIssue          `json:"issues,omitempty"`
	Recommendations    []string               `json:"recommendations,omitempty"`
	LastHealthCheck    time.Time              `json:"last_health_check"`
}

// SystemMetrics represents system-level metrics
type SystemMetrics struct {
	CPUUsage            float64       `json:"cpu_usage"`
	MemoryUsage         float64       `json:"memory_usage"`
	DiskUsage           float64       `json:"disk_usage"`
	NetworkLatency      time.Duration `json:"network_latency"`
	OpenFileDescriptors int           `json:"open_file_descriptors"`
	ActiveConnections   int           `json:"active_connections"`
	Uptime              time.Duration `json:"uptime"`
	LoadAverage         []float64     `json:"load_average"` // 1, 5, 15 minute averages
}

// HealthPolicy represents health monitoring policy
type HealthPolicy struct {
	HealthCheckInterval   time.Duration `json:"health_check_interval"`
	UnhealthyThreshold    float64       `json:"unhealthy_threshold"`
	CriticalThreshold     float64       `json:"critical_threshold"`
	MaxQueueUtilization   float64       `json:"max_queue_utilization"`
	MaxProcessingLatency  time.Duration `json:"max_processing_latency"`
	MaxLeaderResponseTime time.Duration `json:"max_leader_response_time"`
	AlertOnIssues         bool          `json:"alert_on_issues"`
	AutoRecovery          bool          `json:"auto_recovery"`
	FailoverOnCritical    bool          `json:"failover_on_critical"`
}

// DefaultHealthPolicy returns default health monitoring policy
func DefaultHealthPolicy() *HealthPolicy {
	return &HealthPolicy{
		HealthCheckInterval:   30 * time.Second,
		UnhealthyThreshold:    0.7, // 70%
		CriticalThreshold:     0.3, // 30%
		MaxQueueUtilization:   0.9, // 90%
		MaxProcessingLatency:  5 * time.Minute,
		MaxLeaderResponseTime: 10 * time.Second,
		AlertOnIssues:         true,
		AutoRecovery:          true,
		FailoverOnCritical:    true,
	}
}
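
A minimal usage sketch of the collector defined above, written as if it lived in a separate file of the same leader package. The loop, the timings, and the "timeout"/"CTX_TIMEOUT" labels are invented for illustration; only the MetricsCollector API calls come from the code above.

package leader

import (
	"fmt"
	"time"
)

// reportExample feeds the collector a few synthetic requests and prints a
// snapshot. Illustrative only; real callers would record genuine request
// outcomes and expose the snapshot via their monitoring endpoint.
func reportExample() {
	mc := NewMetricsCollector()
	mc.RecordLeadershipChange(true) // this node just became leader

	for i := 0; i < 3; i++ {
		start := time.Now()
		mc.RecordQueueLength(i) // sample the queue before processing
		time.Sleep(10 * time.Millisecond)
		mc.RecordRequest(true, time.Since(start), "", "")
	}
	// Hypothetical error type and code, for illustration only.
	mc.RecordRequest(false, 5*time.Millisecond, "timeout", "CTX_TIMEOUT")

	m := mc.GetMetrics()
	fmt.Printf("requests=%d success=%.2f avg=%s p95~%s\n",
		m.TotalRequests, m.SuccessRate, m.AverageProcessingTime, m.P95Latency)
}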