CHORUS/pkg/slurp/storage/monitoring.go
anthonyrawlins 543ab216f9 Complete BZZZ functionality port to CHORUS
🎭 CHORUS now contains full BZZZ functionality adapted for containers

Core systems ported:
- P2P networking (libp2p with DHT and PubSub)
- Task coordination (COOEE protocol)
- HMMM collaborative reasoning
- SHHH encryption and security
- SLURP admin election system
- UCXL content addressing
- UCXI server integration
- Hypercore logging system
- Health monitoring and graceful shutdown
- License validation with KACHING

Container adaptations:
- Environment variable configuration (no YAML files)
- Container-optimized logging to stdout/stderr
- Auto-generated agent IDs for container deployments
- Docker-first architecture

All proven BZZZ P2P protocols, AI integration, and collaboration
features are now available in containerized form.

Next: Build and test container deployment.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-02 20:02:37 +10:00


package storage

import (
	"context"
	"encoding/json"
	"fmt"
	"sort"
	"sync"
	"time"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)
// MonitoringSystem provides comprehensive monitoring for the storage system.
type MonitoringSystem struct {
	mu                  sync.RWMutex
	nodeID              string
	metrics             *StorageMetrics
	alerts              *AlertManager
	healthChecker       *HealthChecker
	performanceProfiler *PerformanceProfiler
	logger              *StructuredLogger
	notifications       chan *MonitoringEvent
	stopCh              chan struct{}
}
// StorageMetrics contains all Prometheus metrics for storage operations.
type StorageMetrics struct {
	// Operation counters
	StoreOperations    prometheus.Counter
	RetrieveOperations prometheus.Counter
	DeleteOperations   prometheus.Counter
	UpdateOperations   prometheus.Counter
	SearchOperations   prometheus.Counter
	BatchOperations    prometheus.Counter

	// Error counters
	StoreErrors       prometheus.Counter
	RetrieveErrors    prometheus.Counter
	EncryptionErrors  prometheus.Counter
	DecryptionErrors  prometheus.Counter
	ReplicationErrors prometheus.Counter
	CacheErrors       prometheus.Counter
	IndexErrors       prometheus.Counter

	// Latency histograms
	StoreLatency       prometheus.Histogram
	RetrieveLatency    prometheus.Histogram
	EncryptionLatency  prometheus.Histogram
	DecryptionLatency  prometheus.Histogram
	ReplicationLatency prometheus.Histogram
	SearchLatency      prometheus.Histogram

	// Cache metrics
	CacheHits      prometheus.Counter
	CacheMisses    prometheus.Counter
	CacheEvictions prometheus.Counter
	CacheSize      prometheus.Gauge

	// Storage size metrics
	LocalStorageSize       prometheus.Gauge
	DistributedStorageSize prometheus.Gauge
	CompressedStorageSize  prometheus.Gauge
	IndexStorageSize       prometheus.Gauge

	// Replication metrics
	ReplicationFactor prometheus.Gauge
	HealthyReplicas   prometheus.Gauge
	UnderReplicated   prometheus.Gauge
	ReplicationLag    prometheus.Histogram

	// Encryption metrics
	EncryptedContexts prometheus.Gauge
	KeyRotations      prometheus.Counter
	AccessDenials     prometheus.Counter
	ActiveKeys        prometheus.Gauge

	// Performance metrics
	Throughput           prometheus.Gauge
	ConcurrentOperations prometheus.Gauge
	QueueDepth           prometheus.Gauge

	// Health metrics
	StorageHealth    prometheus.Gauge
	NodeConnectivity prometheus.Gauge
	SyncLatency      prometheus.Histogram
}
// AlertManager handles storage-related alerts and notifications.
type AlertManager struct {
	mu           sync.RWMutex
	rules        []*AlertRule
	activeAlerts map[string]*Alert
	notifiers    []AlertNotifier
	history      []*Alert
	maxHistory   int
}

// AlertRule defines conditions for triggering alerts.
type AlertRule struct {
	ID          string            `json:"id"`
	Name        string            `json:"name"`
	Description string            `json:"description"`
	Metric      string            `json:"metric"`
	Condition   string            `json:"condition"` // >, <, ==, !=, etc.
	Threshold   float64           `json:"threshold"`
	Duration    time.Duration     `json:"duration"`
	Severity    AlertSeverity     `json:"severity"`
	Labels      map[string]string `json:"labels"`
	Enabled     bool              `json:"enabled"`
}
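
// As an illustrative sketch (the specific values below are assumptions, not
// part of the original port), a rule that fires when a store-error metric
// stays above 5% for a minute could be declared as:
//
//	rule := &AlertRule{
//		ID:        "store-error-rate",
//		Name:      "High store error rate",
//		Metric:    "slurp_storage_store_errors_total",
//		Condition: ">",
//		Threshold: 0.05,
//		Duration:  time.Minute,
//		Severity:  SeverityWarning,
//		Enabled:   true,
//	}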
// Alert represents an active or resolved alert.
type Alert struct {
	ID          string            `json:"id"`
	RuleID      string            `json:"rule_id"`
	Name        string            `json:"name"`
	Description string            `json:"description"`
	Severity    AlertSeverity     `json:"severity"`
	Status      AlertStatus       `json:"status"`
	Value       float64           `json:"value"`
	Threshold   float64           `json:"threshold"`
	Labels      map[string]string `json:"labels"`
	StartTime   time.Time         `json:"start_time"`
	EndTime     *time.Time        `json:"end_time,omitempty"`
	LastUpdate  time.Time         `json:"last_update"`
}

// AlertSeverity defines alert severity levels.
type AlertSeverity string

const (
	SeverityInfo     AlertSeverity = "info"
	SeverityWarning  AlertSeverity = "warning"
	SeverityError    AlertSeverity = "error"
	SeverityCritical AlertSeverity = "critical"
)

// AlertStatus defines alert status.
type AlertStatus string

const (
	StatusPending  AlertStatus = "pending"
	StatusFiring   AlertStatus = "firing"
	StatusResolved AlertStatus = "resolved"
)

// AlertNotifier is the interface for sending alert notifications.
type AlertNotifier interface {
	Notify(alert *Alert) error
	GetType() string
}
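
// consoleNotifier is a minimal AlertNotifier sketch, added here for
// illustration only: it prints alerts to stdout. Real deployments would hang
// e-mail, webhook, or chat integrations behind the same interface.
type consoleNotifier struct{}

// Notify writes a one-line summary of the alert to stdout.
func (n *consoleNotifier) Notify(alert *Alert) error {
	fmt.Printf("[%s] %s: %s (value=%.2f, threshold=%.2f)\n",
		alert.Severity, alert.Name, alert.Description, alert.Value, alert.Threshold)
	return nil
}

// GetType identifies this notifier implementation.
func (n *consoleNotifier) GetType() string { return "console" }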
// HealthChecker monitors the overall health of the storage system.
type HealthChecker struct {
	mu            sync.RWMutex
	checks        map[string]HealthCheck
	status        *SystemHealth
	checkInterval time.Duration
	timeout       time.Duration
}

// HealthCheck defines a single health check.
type HealthCheck struct {
	Name        string                                 `json:"name"`
	Description string                                 `json:"description"`
	Checker     func(ctx context.Context) HealthResult `json:"-"`
	Interval    time.Duration                          `json:"interval"`
	Timeout     time.Duration                          `json:"timeout"`
	Enabled     bool                                   `json:"enabled"`
}

// HealthResult represents the result of a health check.
type HealthResult struct {
	Healthy   bool                   `json:"healthy"`
	Message   string                 `json:"message"`
	Latency   time.Duration          `json:"latency"`
	Metadata  map[string]interface{} `json:"metadata"`
	Timestamp time.Time              `json:"timestamp"`
}
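
// localStoreCheck is an illustrative HealthCheck (the name, interval, and
// message are assumptions, not part of the original port): a probe that
// reports the local store healthy and records how long the probe took.
var localStoreCheck = HealthCheck{
	Name:        "local-store",
	Description: "Verifies the local storage backend responds",
	Interval:    time.Minute,
	Timeout:     10 * time.Second,
	Enabled:     true,
	Checker: func(ctx context.Context) HealthResult {
		start := time.Now()
		// A real implementation would probe the backend here, honoring ctx.
		return HealthResult{
			Healthy:   true,
			Message:   "local store reachable",
			Latency:   time.Since(start),
			Timestamp: time.Now(),
		}
	},
}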
// SystemHealth represents the overall health of the storage system.
type SystemHealth struct {
	OverallStatus HealthStatus            `json:"overall_status"`
	Components    map[string]HealthResult `json:"components"`
	LastUpdate    time.Time               `json:"last_update"`
	Uptime        time.Duration           `json:"uptime"`
	StartTime     time.Time               `json:"start_time"`
}

// HealthStatus represents system health status.
type HealthStatus string

const (
	HealthHealthy   HealthStatus = "healthy"
	HealthDegraded  HealthStatus = "degraded"
	HealthUnhealthy HealthStatus = "unhealthy"
)
// PerformanceProfiler analyzes storage performance patterns.
type PerformanceProfiler struct {
	mu                sync.RWMutex
	operationProfiles map[string]*OperationProfile
	resourceUsage     *ResourceUsage
	bottlenecks       []*Bottleneck
	recommendations   []*PerformanceRecommendation
}

// OperationProfile contains performance analysis for a specific operation type.
type OperationProfile struct {
	Operation       string          `json:"operation"`
	TotalOperations int64           `json:"total_operations"`
	AverageLatency  time.Duration   `json:"average_latency"`
	P50Latency      time.Duration   `json:"p50_latency"`
	P95Latency      time.Duration   `json:"p95_latency"`
	P99Latency      time.Duration   `json:"p99_latency"`
	Throughput      float64         `json:"throughput"`
	ErrorRate       float64         `json:"error_rate"`
	LatencyHistory  []time.Duration `json:"-"`
	LastUpdated     time.Time       `json:"last_updated"`
}
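
// latencyPercentile is an illustrative helper (not part of the original port)
// showing how the P50/P95/P99 fields above could be derived from
// LatencyHistory: sort a copy of the samples and index proportionally.
func latencyPercentile(history []time.Duration, p float64) time.Duration {
	if len(history) == 0 {
		return 0
	}
	sorted := make([]time.Duration, len(history))
	copy(sorted, history)
	sort.Slice(sorted, func(i, j int) bool { return sorted[i] < sorted[j] })
	// p is a fraction in [0, 1]; e.g. 0.95 selects the P95 sample.
	idx := int(p * float64(len(sorted)-1))
	return sorted[idx]
}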
// ResourceUsage tracks resource consumption.
type ResourceUsage struct {
	CPUUsage    float64   `json:"cpu_usage"`
	MemoryUsage int64     `json:"memory_usage"`
	DiskUsage   int64     `json:"disk_usage"`
	NetworkIn   int64     `json:"network_in"`
	NetworkOut  int64     `json:"network_out"`
	OpenFiles   int       `json:"open_files"`
	Goroutines  int       `json:"goroutines"`
	LastUpdated time.Time `json:"last_updated"`
}

// Bottleneck represents a performance bottleneck.
type Bottleneck struct {
	ID          string                 `json:"id"`
	Type        string                 `json:"type"` // cpu, memory, disk, network, etc.
	Component   string                 `json:"component"`
	Description string                 `json:"description"`
	Severity    AlertSeverity          `json:"severity"`
	Impact      float64                `json:"impact"`
	DetectedAt  time.Time              `json:"detected_at"`
	Metadata    map[string]interface{} `json:"metadata"`
}

// PerformanceRecommendation suggests optimizations.
type PerformanceRecommendation struct {
	ID          string                 `json:"id"`
	Type        string                 `json:"type"`
	Title       string                 `json:"title"`
	Description string                 `json:"description"`
	Priority    int                    `json:"priority"`
	Impact      string                 `json:"impact"`
	Effort      string                 `json:"effort"`
	GeneratedAt time.Time              `json:"generated_at"`
	Metadata    map[string]interface{} `json:"metadata"`
}
// MonitoringEvent represents a monitoring system event.
type MonitoringEvent struct {
	Type      string                 `json:"type"`
	Level     string                 `json:"level"`
	Message   string                 `json:"message"`
	Component string                 `json:"component"`
	NodeID    string                 `json:"node_id"`
	Timestamp time.Time              `json:"timestamp"`
	Metadata  map[string]interface{} `json:"metadata"`
}

// StructuredLogger provides structured logging for storage operations.
type StructuredLogger struct {
	mu        sync.RWMutex
	level     LogLevel
	output    LogOutput
	formatter LogFormatter
	buffer    []*LogEntry
	maxBuffer int
}

// LogLevel defines logging levels.
type LogLevel int

const (
	LogDebug LogLevel = iota
	LogInfo
	LogWarning
	LogError
	LogCritical
)

// LogOutput is the interface for different output destinations.
type LogOutput interface {
	Write(entry *LogEntry) error
	Flush() error
}

// LogFormatter is the interface for different log formats.
type LogFormatter interface {
	Format(entry *LogEntry) ([]byte, error)
}

// LogEntry represents a single log entry.
type LogEntry struct {
	Level     LogLevel               `json:"level"`
	Message   string                 `json:"message"`
	Component string                 `json:"component"`
	Operation string                 `json:"operation"`
	NodeID    string                 `json:"node_id"`
	Timestamp time.Time              `json:"timestamp"`
	Fields    map[string]interface{} `json:"fields"`
	Error     error                  `json:"error,omitempty"`
}
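
// jsonFormatter is a minimal LogFormatter sketch, assumed here for
// illustration (this file does not ship a concrete formatter). It renders
// each entry as a single JSON object, which suits the container-oriented
// stdout/stderr logging this port targets.
type jsonFormatter struct{}

// Format marshals the entry to JSON. Note that the Error field marshals via
// its concrete type and may serialize as an empty object.
func (f *jsonFormatter) Format(entry *LogEntry) ([]byte, error) {
	return json.Marshal(entry)
}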
// NewMonitoringSystem creates a new monitoring system and starts its
// background monitoring, health-check, alert-evaluation, and
// performance-analysis loops.
func NewMonitoringSystem(nodeID string) *MonitoringSystem {
	ms := &MonitoringSystem{
		nodeID:              nodeID,
		metrics:             initializeMetrics(nodeID),
		alerts:              newAlertManager(),
		healthChecker:       newHealthChecker(),
		performanceProfiler: newPerformanceProfiler(),
		logger:              newStructuredLogger(),
		notifications:       make(chan *MonitoringEvent, 1000),
		stopCh:              make(chan struct{}),
	}

	// Start monitoring goroutines.
	go ms.monitoringLoop()
	go ms.healthCheckLoop()
	go ms.alertEvaluationLoop()
	go ms.performanceAnalysisLoop()

	return ms
}
// initializeMetrics creates and registers Prometheus metrics. promauto
// registers with the default registry and panics on duplicate registration,
// so only one MonitoringSystem should be created per process. Metrics that
// are declared on StorageMetrics but not initialized here (e.g.
// DecryptionLatency, ReplicationLag) remain nil and must be initialized
// before use.
func initializeMetrics(nodeID string) *StorageMetrics {
	labels := prometheus.Labels{"node_id": nodeID}

	return &StorageMetrics{
		// Operation counters
		StoreOperations: promauto.NewCounter(prometheus.CounterOpts{
			Name:        "slurp_storage_store_operations_total",
			Help:        "Total number of store operations",
			ConstLabels: labels,
		}),
		RetrieveOperations: promauto.NewCounter(prometheus.CounterOpts{
			Name:        "slurp_storage_retrieve_operations_total",
			Help:        "Total number of retrieve operations",
			ConstLabels: labels,
		}),
		DeleteOperations: promauto.NewCounter(prometheus.CounterOpts{
			Name:        "slurp_storage_delete_operations_total",
			Help:        "Total number of delete operations",
			ConstLabels: labels,
		}),
		UpdateOperations: promauto.NewCounter(prometheus.CounterOpts{
			Name:        "slurp_storage_update_operations_total",
			Help:        "Total number of update operations",
			ConstLabels: labels,
		}),
		SearchOperations: promauto.NewCounter(prometheus.CounterOpts{
			Name:        "slurp_storage_search_operations_total",
			Help:        "Total number of search operations",
			ConstLabels: labels,
		}),
		BatchOperations: promauto.NewCounter(prometheus.CounterOpts{
			Name:        "slurp_storage_batch_operations_total",
			Help:        "Total number of batch operations",
			ConstLabels: labels,
		}),

		// Error counters
		StoreErrors: promauto.NewCounter(prometheus.CounterOpts{
			Name:        "slurp_storage_store_errors_total",
			Help:        "Total number of store errors",
			ConstLabels: labels,
		}),
		RetrieveErrors: promauto.NewCounter(prometheus.CounterOpts{
			Name:        "slurp_storage_retrieve_errors_total",
			Help:        "Total number of retrieve errors",
			ConstLabels: labels,
		}),
		EncryptionErrors: promauto.NewCounter(prometheus.CounterOpts{
			Name:        "slurp_storage_encryption_errors_total",
			Help:        "Total number of encryption errors",
			ConstLabels: labels,
		}),

		// Latency histograms
		StoreLatency: promauto.NewHistogram(prometheus.HistogramOpts{
			Name:        "slurp_storage_store_latency_seconds",
			Help:        "Store operation latency in seconds",
			ConstLabels: labels,
			Buckets:     prometheus.DefBuckets,
		}),
		RetrieveLatency: promauto.NewHistogram(prometheus.HistogramOpts{
			Name:        "slurp_storage_retrieve_latency_seconds",
			Help:        "Retrieve operation latency in seconds",
			ConstLabels: labels,
			Buckets:     prometheus.DefBuckets,
		}),
		// EncryptionLatency is used by RecordEncryptionOperation below, so it
		// must be initialized to avoid a nil-pointer panic. The metric name
		// follows the naming convention used above.
		EncryptionLatency: promauto.NewHistogram(prometheus.HistogramOpts{
			Name:        "slurp_storage_encryption_latency_seconds",
			Help:        "Encryption operation latency in seconds",
			ConstLabels: labels,
			Buckets:     prometheus.DefBuckets,
		}),

		// Cache metrics
		CacheHits: promauto.NewCounter(prometheus.CounterOpts{
			Name:        "slurp_storage_cache_hits_total",
			Help:        "Total number of cache hits",
			ConstLabels: labels,
		}),
		CacheMisses: promauto.NewCounter(prometheus.CounterOpts{
			Name:        "slurp_storage_cache_misses_total",
			Help:        "Total number of cache misses",
			ConstLabels: labels,
		}),

		// Storage size gauges
		LocalStorageSize: promauto.NewGauge(prometheus.GaugeOpts{
			Name:        "slurp_storage_local_size_bytes",
			Help:        "Local storage size in bytes",
			ConstLabels: labels,
		}),
		DistributedStorageSize: promauto.NewGauge(prometheus.GaugeOpts{
			Name:        "slurp_storage_distributed_size_bytes",
			Help:        "Distributed storage size in bytes",
			ConstLabels: labels,
		}),
		// CompressedStorageSize and IndexStorageSize are used by
		// UpdateStorageSize below, so they must be initialized as well.
		CompressedStorageSize: promauto.NewGauge(prometheus.GaugeOpts{
			Name:        "slurp_storage_compressed_size_bytes",
			Help:        "Compressed storage size in bytes",
			ConstLabels: labels,
		}),
		IndexStorageSize: promauto.NewGauge(prometheus.GaugeOpts{
			Name:        "slurp_storage_index_size_bytes",
			Help:        "Index storage size in bytes",
			ConstLabels: labels,
		}),

		// Health metrics
		StorageHealth: promauto.NewGauge(prometheus.GaugeOpts{
			Name:        "slurp_storage_health_status",
			Help:        "Storage health status (1=healthy, 0=unhealthy)",
			ConstLabels: labels,
		}),
	}
}
// Recording methods for metrics

// RecordStoreOperation records the latency and outcome of a store operation.
func (ms *MonitoringSystem) RecordStoreOperation(duration time.Duration, success bool) {
	ms.metrics.StoreOperations.Inc()
	ms.metrics.StoreLatency.Observe(duration.Seconds())
	if !success {
		ms.metrics.StoreErrors.Inc()
	}
}

// RecordRetrieveOperation records the latency, outcome, and cache result of a
// retrieve operation.
func (ms *MonitoringSystem) RecordRetrieveOperation(duration time.Duration, success bool, cacheHit bool) {
	ms.metrics.RetrieveOperations.Inc()
	ms.metrics.RetrieveLatency.Observe(duration.Seconds())
	if !success {
		ms.metrics.RetrieveErrors.Inc()
	}
	if cacheHit {
		ms.metrics.CacheHits.Inc()
	} else {
		ms.metrics.CacheMisses.Inc()
	}
}

// RecordEncryptionOperation records the latency and outcome of an encryption
// operation.
func (ms *MonitoringSystem) RecordEncryptionOperation(duration time.Duration, success bool) {
	ms.metrics.EncryptionLatency.Observe(duration.Seconds())
	if !success {
		ms.metrics.EncryptionErrors.Inc()
	}
}

// UpdateStorageSize updates the storage size gauges, all in bytes.
func (ms *MonitoringSystem) UpdateStorageSize(local, distributed, compressed, index int64) {
	ms.metrics.LocalStorageSize.Set(float64(local))
	ms.metrics.DistributedStorageSize.Set(float64(distributed))
	ms.metrics.CompressedStorageSize.Set(float64(compressed))
	ms.metrics.IndexStorageSize.Set(float64(index))
}

// UpdateHealthStatus sets the storage health gauge (1=healthy, 0=unhealthy).
func (ms *MonitoringSystem) UpdateHealthStatus(healthy bool) {
	if healthy {
		ms.metrics.StorageHealth.Set(1)
	} else {
		ms.metrics.StorageHealth.Set(0)
	}
}
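
// A typical call site (illustrative; store.Put is a hypothetical storage
// call, not an API defined in this package) wraps the operation with timing
// and reports the outcome:
//
//	start := time.Now()
//	err := store.Put(ctx, key, value)
//	ms.RecordStoreOperation(time.Since(start), err == nil)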
// Main monitoring loops

func (ms *MonitoringSystem) monitoringLoop() {
	ticker := time.NewTicker(30 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			ms.collectSystemMetrics()
		case event := <-ms.notifications:
			ms.processMonitoringEvent(event)
		case <-ms.stopCh:
			return
		}
	}
}

func (ms *MonitoringSystem) healthCheckLoop() {
	ticker := time.NewTicker(1 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			ms.performHealthChecks()
		case <-ms.stopCh:
			return
		}
	}
}

func (ms *MonitoringSystem) alertEvaluationLoop() {
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			ms.evaluateAlertRules()
		case <-ms.stopCh:
			return
		}
	}
}

func (ms *MonitoringSystem) performanceAnalysisLoop() {
	ticker := time.NewTicker(5 * time.Minute)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			ms.analyzePerformance()
		case <-ms.stopCh:
			return
		}
	}
}
// Implementation of monitoring functions (simplified)

func (ms *MonitoringSystem) collectSystemMetrics() {
	// Collect system-level metrics.
	// This would integrate with system monitoring tools.
}

func (ms *MonitoringSystem) processMonitoringEvent(event *MonitoringEvent) {
	// Process monitoring events.
	ms.logger.LogEvent(event)
}

func (ms *MonitoringSystem) performHealthChecks() {
	// Snapshot the enabled checks under the read lock so the map is not read
	// concurrently with registration or modification by other goroutines.
	ms.healthChecker.mu.RLock()
	checks := make([]HealthCheck, 0, len(ms.healthChecker.checks))
	for _, check := range ms.healthChecker.checks {
		if check.Enabled {
			checks = append(checks, check)
		}
	}
	ms.healthChecker.mu.RUnlock()

	for _, check := range checks {
		go ms.executeHealthCheck(check)
	}
}

func (ms *MonitoringSystem) executeHealthCheck(check HealthCheck) {
	ctx, cancel := context.WithTimeout(context.Background(), check.Timeout)
	defer cancel()

	result := check.Checker(ctx)
	if result.Timestamp.IsZero() {
		result.Timestamp = time.Now()
	}

	ms.healthChecker.mu.Lock()
	ms.healthChecker.status.Components[check.Name] = result
	ms.healthChecker.status.LastUpdate = time.Now()
	ms.healthChecker.mu.Unlock()
}
func (ms *MonitoringSystem) evaluateAlertRules() {
	// Evaluate alert rules against current metrics.
	// This would query Prometheus metrics and trigger alerts.
}
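
// evaluateCondition is an illustrative sketch of the comparison step that
// rule evaluation would need once metric values are available; the
// metric-query side is omitted here just as it is in evaluateAlertRules.
func evaluateCondition(value float64, condition string, threshold float64) bool {
	switch condition {
	case ">":
		return value > threshold
	case "<":
		return value < threshold
	case ">=":
		return value >= threshold
	case "<=":
		return value <= threshold
	case "==":
		return value == threshold
	case "!=":
		return value != threshold
	default:
		return false
	}
}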
func (ms *MonitoringSystem) analyzePerformance() {
	// Analyze performance patterns and generate recommendations.
	ms.performanceProfiler.analyzeBottlenecks()
	ms.performanceProfiler.generateRecommendations()
}
// Helper functions and implementations

func newAlertManager() *AlertManager {
	return &AlertManager{
		rules:        make([]*AlertRule, 0),
		activeAlerts: make(map[string]*Alert),
		notifiers:    make([]AlertNotifier, 0),
		history:      make([]*Alert, 0),
		maxHistory:   1000,
	}
}

func newHealthChecker() *HealthChecker {
	return &HealthChecker{
		checks: make(map[string]HealthCheck),
		status: &SystemHealth{
			OverallStatus: HealthHealthy,
			Components:    make(map[string]HealthResult),
			StartTime:     time.Now(),
		},
		checkInterval: 1 * time.Minute,
		timeout:       30 * time.Second,
	}
}

func newPerformanceProfiler() *PerformanceProfiler {
	return &PerformanceProfiler{
		operationProfiles: make(map[string]*OperationProfile),
		resourceUsage:     &ResourceUsage{},
		bottlenecks:       make([]*Bottleneck, 0),
		recommendations:   make([]*PerformanceRecommendation, 0),
	}
}

func newStructuredLogger() *StructuredLogger {
	return &StructuredLogger{
		level:     LogInfo,
		buffer:    make([]*LogEntry, 0),
		maxBuffer: 10000,
	}
}
// LogEvent appends a monitoring event to the in-memory log buffer, dropping
// the oldest entry once the buffer is full.
func (sl *StructuredLogger) LogEvent(event *MonitoringEvent) {
	entry := &LogEntry{
		Level:     LogInfo,
		Message:   event.Message,
		Component: event.Component,
		NodeID:    event.NodeID,
		Timestamp: event.Timestamp,
		Fields:    event.Metadata,
	}

	sl.mu.Lock()
	sl.buffer = append(sl.buffer, entry)
	if len(sl.buffer) > sl.maxBuffer {
		sl.buffer = sl.buffer[1:] // Remove oldest entry
	}
	sl.mu.Unlock()
}

func (pp *PerformanceProfiler) analyzeBottlenecks() {
	// Analyze performance data to identify bottlenecks.
	// This would examine latency patterns, error rates, etc.
}

func (pp *PerformanceProfiler) generateRecommendations() {
	// Generate performance improvement recommendations.
	// This would analyze patterns and suggest optimizations.
}
// GetMonitoringStats returns comprehensive monitoring statistics.
func (ms *MonitoringSystem) GetMonitoringStats() (*MonitoringStats, error) {
	ms.mu.RLock()
	defer ms.mu.RUnlock()

	// Take the component locks so the snapshot is consistent with the
	// background health-check and alert loops.
	ms.healthChecker.mu.RLock()
	healthStatus := ms.healthChecker.status.OverallStatus
	ms.healthChecker.mu.RUnlock()
	ms.alerts.mu.RLock()
	activeAlerts := len(ms.alerts.activeAlerts)
	ms.alerts.mu.RUnlock()
	ms.performanceProfiler.mu.RLock()
	bottlenecks := len(ms.performanceProfiler.bottlenecks)
	ms.performanceProfiler.mu.RUnlock()

	return &MonitoringStats{
		NodeID:       ms.nodeID,
		Timestamp:    time.Now(),
		HealthStatus: healthStatus,
		ActiveAlerts: activeAlerts,
		Bottlenecks:  bottlenecks,
	}, nil
}
// MonitoringStats contains monitoring system statistics.
type MonitoringStats struct {
	NodeID       string       `json:"node_id"`
	Timestamp    time.Time    `json:"timestamp"`
	HealthStatus HealthStatus `json:"health_status"`
	ActiveAlerts int          `json:"active_alerts"`
	Bottlenecks  int          `json:"bottlenecks"`
}

// Close shuts down the monitoring system by stopping the background loops.
// It must be called at most once; closing stopCh twice would panic.
func (ms *MonitoringSystem) Close() error {
	close(ms.stopCh)
	return nil
}
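
// Putting it together, a minimal lifecycle sketch (the node ID and values are
// assumptions for illustration):
//
//	ms := NewMonitoringSystem("node-1")
//	defer ms.Close()
//
//	ms.RecordStoreOperation(12*time.Millisecond, true)
//	if stats, err := ms.GetMonitoringStats(); err == nil {
//		fmt.Printf("health=%s alerts=%d\n", stats.HealthStatus, stats.ActiveAlerts)
//	}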