/*!
 * BZZZ SDK Rust Performance Monitor Example
 * =========================================
 *
 * Demonstrates high-performance monitoring and metrics collection using the BZZZ SDK for Rust.
 * Shows async operations, custom metrics, and efficient data processing.
 */

use std::collections::HashMap;
use std::sync::Arc;
use std::time::{Duration, Instant, SystemTime, UNIX_EPOCH};

use tokio::sync::{mpsc, Mutex};
use tokio::time::interval;

use serde::{Deserialize, Serialize};
use tracing::{info, warn, error, debug};
use tracing_subscriber;

// BZZZ SDK imports (would be from crates.io: bzzz-sdk = "2.0")
use bzzz_sdk::{BzzzClient, Config as BzzzConfig};
use bzzz_sdk::decisions::{CodeDecision, TestResults, DecisionClient};
use bzzz_sdk::dht::{DhtClient, DhtMetrics};
use bzzz_sdk::crypto::CryptoClient;
use bzzz_sdk::elections::ElectionClient;

/// A single snapshot of system and BZZZ-specific performance metrics.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct PerformanceMetrics {
    timestamp: u64,
    cpu_usage: f64,
    memory_usage: f64,
    network_latency: f64,
    dht_operations: u32,
    crypto_operations: u32,
    decision_throughput: u32,
    error_count: u32,
}

/// Aggregated health assessment derived from recent metrics.
#[derive(Debug, Clone, Serialize)]
struct SystemHealth {
    overall_status: String,
    component_health: HashMap<String, String>,
    performance_score: f64,
    alerts: Vec<String>,
}

/// Collects, analyzes, and publishes performance metrics for a BZZZ node.
struct PerformanceMonitor {
    client: Arc<BzzzClient>,
    decisions: Arc<DecisionClient>,
    dht: Arc<DhtClient>,
    crypto: Arc<CryptoClient>,
    elections: Arc<ElectionClient>,
    metrics: Arc<Mutex<Vec<PerformanceMetrics>>>,
    alert_sender: mpsc::Sender<String>,
    is_running: Arc<Mutex<bool>>,
    config: MonitorConfig,
}

/// Tunable intervals and alert thresholds for the monitor.
#[derive(Debug, Clone)]
struct MonitorConfig {
    collection_interval: Duration,
    alert_threshold_cpu: f64,
    alert_threshold_memory: f64,
    alert_threshold_latency: f64,
    metrics_retention: usize,
    publish_interval: Duration,
}

impl Default for MonitorConfig {
    fn default() -> Self {
        Self {
            collection_interval: Duration::from_secs(10),
            alert_threshold_cpu: 80.0,
            alert_threshold_memory: 85.0,
            alert_threshold_latency: 1000.0,
            metrics_retention: 1000,
            publish_interval: Duration::from_secs(60),
        }
    }
}

impl PerformanceMonitor {
    async fn new(endpoint: &str, role: &str) -> Result<Self, Box<dyn std::error::Error>> {
        // Initialize tracing
        tracing_subscriber::fmt::init();

        info!("🚀 Initializing BZZZ Performance Monitor");

        // Create BZZZ client
        let client = Arc::new(BzzzClient::new(BzzzConfig {
            endpoint: endpoint.to_string(),
            role: role.to_string(),
            timeout: Duration::from_secs(30),
            retry_count: 3,
            rate_limit: 100,
            ..Default::default()
        }).await?);

        // Create specialized clients
        let decisions = Arc::new(DecisionClient::new(client.clone()));
        let dht = Arc::new(DhtClient::new(client.clone()));
        let crypto = Arc::new(CryptoClient::new(client.clone()));
        let elections = Arc::new(ElectionClient::new(client.clone()));

        // Test connection
        let status = client.get_status().await?;
        info!("✅ Connected to BZZZ node");
        info!("   Node ID: {}", status.node_id);
        info!("   Agent ID: {}", status.agent_id);
        info!("   Role: {}", status.role);

        // Keep the alert channel open by draining it in the background; a real
        // deployment would forward these alerts to an external alerting system.
        let (alert_sender, mut alert_receiver) = mpsc::channel::<String>(100);
        tokio::spawn(async move {
            while let Some(alert) = alert_receiver.recv().await {
                debug!("Alert queued for delivery: {}", alert);
            }
        });

        Ok(Self {
            client,
            decisions,
            dht,
            crypto,
            elections,
            metrics: Arc::new(Mutex::new(Vec::new())),
            alert_sender,
            is_running: Arc::new(Mutex::new(false)),
            config: MonitorConfig::default(),
        })
    }

    async fn start_monitoring(&self) -> Result<(), Box<dyn std::error::Error>> {
        info!("📊 Starting performance monitoring...");

        {
            let mut is_running = self.is_running.lock().await;
            *is_running = true;
        }

        // Spawn monitoring tasks
        let monitor_clone = self.clone_for_task();
        let metrics_task = tokio::spawn(async move {
            monitor_clone.metrics_collection_loop().await;
        });

        let monitor_clone = self.clone_for_task();
        let analysis_task = tokio::spawn(async move {
            monitor_clone.performance_analysis_loop().await;
        });

        let monitor_clone = self.clone_for_task();
        let publish_task = tokio::spawn(async move {
            monitor_clone.metrics_publishing_loop().await;
        });

        let monitor_clone = self.clone_for_task();
        let health_task = tokio::spawn(async move {
            monitor_clone.health_monitoring_loop().await;
        });

        info!("✅ Monitoring tasks started");
        info!("   Metrics collection: every {:?}", self.config.collection_interval);
        info!("   Publishing interval: every {:?}", self.config.publish_interval);

        // Wait for tasks (in a real app, you'd handle shutdown signals)
        tokio::try_join!(metrics_task, analysis_task, publish_task, health_task)?;

        Ok(())
    }

    /// Create an owned handle (sharing the same Arcs) that can be moved into a spawned task.
    fn clone_for_task(&self) -> Self {
        Self {
            client: self.client.clone(),
            decisions: self.decisions.clone(),
            dht: self.dht.clone(),
            crypto: self.crypto.clone(),
            elections: self.elections.clone(),
            metrics: self.metrics.clone(),
            alert_sender: self.alert_sender.clone(),
            is_running: self.is_running.clone(),
            config: self.config.clone(),
        }
    }

    async fn metrics_collection_loop(&self) {
        let mut interval = interval(self.config.collection_interval);

        info!("📈 Starting metrics collection loop");

        while self.is_running().await {
            interval.tick().await;

            match self.collect_performance_metrics().await {
                Ok(metrics) => {
                    self.store_metrics(metrics).await;
                }
                Err(e) => {
                    error!("Failed to collect metrics: {}", e);
                }
            }
        }

        info!("📊 Metrics collection stopped");
    }

    async fn collect_performance_metrics(&self) -> Result<PerformanceMetrics, Box<dyn std::error::Error>> {
        let start_time = Instant::now();

        // Collect system metrics (simulated for this example)
        let cpu_usage = self.get_cpu_usage().await?;
        let memory_usage = self.get_memory_usage().await?;

        // Test network latency to BZZZ node
        let latency_start = Instant::now();
        let _status = self.client.get_status().await?;
        let network_latency = latency_start.elapsed().as_millis() as f64;

        // Get BZZZ-specific metrics
        let dht_metrics = self.dht.get_metrics().await?;
        let _election_status = self.elections.get_status().await?;

        // Count recent operations (simplified)
        let dht_operations = dht_metrics.stored_items + dht_metrics.retrieved_items;
        let crypto_operations = dht_metrics.encryption_ops + dht_metrics.decryption_ops;

        let metrics = PerformanceMetrics {
            timestamp: SystemTime::now()
                .duration_since(UNIX_EPOCH)?
                .as_secs(),
            cpu_usage,
            memory_usage,
            network_latency,
            dht_operations,
            crypto_operations,
            decision_throughput: self.calculate_decision_throughput().await?,
            error_count: 0, // Would track actual errors
        };

        debug!("Collected metrics in {:?}", start_time.elapsed());
        Ok(metrics)
    }

    async fn get_cpu_usage(&self) -> Result<f64, Box<dyn std::error::Error>> {
        // In a real implementation, this would use system APIs
        // For demo, simulate CPU usage
        Ok(rand::random::<f64>() * 30.0 + 20.0) // 20-50% usage
    }

    async fn get_memory_usage(&self) -> Result<f64, Box<dyn std::error::Error>> {
        // In a real implementation, this would use system APIs
        // For demo, simulate memory usage
        Ok(rand::random::<f64>() * 25.0 + 45.0) // 45-70% usage
    }

    async fn calculate_decision_throughput(&self) -> Result<u32, Box<dyn std::error::Error>> {
        // In a real implementation, this would track actual decision publishing rates
        // For demo, return a simulated value
        Ok((rand::random::<u32>() % 20) + 5) // 5-25 decisions per interval
    }

    async fn store_metrics(&self, metrics: PerformanceMetrics) {
        let mut metrics_vec = self.metrics.lock().await;

        // Add new metrics
        metrics_vec.push(metrics.clone());

        // Maintain retention limit
        if metrics_vec.len() > self.config.metrics_retention {
            metrics_vec.remove(0);
        }

        // Check for alerts
        if metrics.cpu_usage > self.config.alert_threshold_cpu {
            self.send_alert(format!("High CPU usage: {:.1}%", metrics.cpu_usage)).await;
        }
        if metrics.memory_usage > self.config.alert_threshold_memory {
            self.send_alert(format!("High memory usage: {:.1}%", metrics.memory_usage)).await;
        }
        if metrics.network_latency > self.config.alert_threshold_latency {
            self.send_alert(format!("High network latency: {:.0}ms", metrics.network_latency)).await;
        }
    }

    async fn performance_analysis_loop(&self) {
        let mut interval = interval(Duration::from_secs(30));

        info!("🔍 Starting performance analysis loop");

        while self.is_running().await {
            interval.tick().await;

            match self.analyze_performance_trends().await {
                Ok(_) => debug!("Performance analysis completed"),
                Err(e) => error!("Performance analysis failed: {}", e),
            }
        }

        info!("🔍 Performance analysis stopped");
    }

    async fn analyze_performance_trends(&self) -> Result<(), Box<dyn std::error::Error>> {
        let metrics = self.metrics.lock().await;

        if metrics.len() < 10 {
            return Ok(()); // Need more data points
        }

        let recent = &metrics[metrics.len() - 10..];

        // Calculate trends
        let avg_cpu = recent.iter().map(|m| m.cpu_usage).sum::<f64>() / recent.len() as f64;
        let avg_memory = recent.iter().map(|m| m.memory_usage).sum::<f64>() / recent.len() as f64;
        let avg_latency = recent.iter().map(|m| m.network_latency).sum::<f64>() / recent.len() as f64;

        // Check for trends
        let cpu_trend = self.calculate_trend(recent.iter().map(|m| m.cpu_usage).collect());
        let memory_trend = self.calculate_trend(recent.iter().map(|m| m.memory_usage).collect());

        debug!(
            "Performance trends: CPU {:.1}% ({}), Memory {:.1}% ({}), Latency {:.0}ms",
            avg_cpu, cpu_trend, avg_memory, memory_trend, avg_latency
        );

        // Alert on concerning trends
        if cpu_trend == "increasing" && avg_cpu > 60.0 {
            self.send_alert("CPU usage trending upward".to_string()).await;
        }
        if memory_trend == "increasing" && avg_memory > 70.0 {
            self.send_alert("Memory usage trending upward".to_string()).await;
        }

        Ok(())
    }

    /// Classify a series as increasing, decreasing, or stable by comparing
    /// the average of its first half against the average of its second half.
    fn calculate_trend(&self, values: Vec<f64>) -> &'static str {
        if values.len() < 5 {
            return "insufficient_data";
        }

        let mid = values.len() / 2;
        let first_half: f64 = values[..mid].iter().sum::<f64>() / mid as f64;
        let second_half: f64 = values[mid..].iter().sum::<f64>() / (values.len() - mid) as f64;

        let diff = second_half - first_half;

        if diff > 5.0 {
            "increasing"
        } else if diff < -5.0 {
            "decreasing"
        } else {
            "stable"
        }
    }

    async fn metrics_publishing_loop(&self) {
        let mut interval = interval(self.config.publish_interval);

        info!("📤 Starting metrics publishing loop");

        while self.is_running().await {
            interval.tick().await;

            match self.publish_performance_report().await {
                Ok(_) => debug!("Performance report published"),
                Err(e) => error!("Failed to publish performance report: {}", e),
            }
        }

        info!("📤 Metrics publishing stopped");
    }

    async fn publish_performance_report(&self) -> Result<(), Box<dyn std::error::Error>> {
        let metrics = self.metrics.lock().await;

        if metrics.is_empty() {
            return Ok(());
        }

        // Calculate summary statistics over (at most) the last 60 samples
        let recent_metrics = if metrics.len() > 60 {
            &metrics[metrics.len() - 60..]
        } else {
            &metrics[..]
        };

        let avg_cpu = recent_metrics.iter().map(|m| m.cpu_usage).sum::<f64>() / recent_metrics.len() as f64;
        let avg_memory = recent_metrics.iter().map(|m| m.memory_usage).sum::<f64>() / recent_metrics.len() as f64;
        let avg_latency = recent_metrics.iter().map(|m| m.network_latency).sum::<f64>() / recent_metrics.len() as f64;

        let total_dht_ops: u32 = recent_metrics.iter().map(|m| m.dht_operations).sum();
        let total_crypto_ops: u32 = recent_metrics.iter().map(|m| m.crypto_operations).sum();

        // Publish system status decision
        self.decisions.publish_system_status(bzzz_sdk::decisions::SystemStatus {
            status: "Performance monitoring active".to_string(),
            metrics: {
                let mut map = std::collections::HashMap::new();
                map.insert("avg_cpu_usage".to_string(), avg_cpu.into());
                map.insert("avg_memory_usage".to_string(), avg_memory.into());
                map.insert("avg_network_latency_ms".to_string(), avg_latency.into());
                map.insert("dht_operations_total".to_string(), total_dht_ops.into());
                map.insert("crypto_operations_total".to_string(), total_crypto_ops.into());
                map.insert("metrics_collected".to_string(), metrics.len().into());
                map
            },
            health_checks: {
                let mut checks = std::collections::HashMap::new();
                checks.insert("metrics_collection".to_string(), true);
                checks.insert("performance_analysis".to_string(), true);
                checks.insert("alert_system".to_string(), true);
                checks.insert("bzzz_connectivity".to_string(), avg_latency < 500.0);
                checks
            },
        }).await?;

        info!(
            "📊 Published performance report: CPU {:.1}%, Memory {:.1}%, Latency {:.0}ms",
            avg_cpu, avg_memory, avg_latency
        );

        Ok(())
    }

    async fn health_monitoring_loop(&self) {
        let mut interval = interval(Duration::from_secs(120)); // Check health every 2 minutes

        info!("❤️ Starting health monitoring loop");

        while self.is_running().await {
            interval.tick().await;

            match self.assess_system_health().await {
                Ok(health) => {
                    if health.overall_status != "healthy" {
                        warn!("System health: {}", health.overall_status);
                        for alert in &health.alerts {
                            self.send_alert(alert.clone()).await;
                        }
                    } else {
                        debug!(
                            "System health: {} (score: {:.1})",
                            health.overall_status, health.performance_score
                        );
                    }
                }
                Err(e) => error!("Health assessment failed: {}", e),
            }
        }

        info!("❤️ Health monitoring stopped");
    }

    async fn assess_system_health(&self) -> Result<SystemHealth, Box<dyn std::error::Error>> {
        let metrics = self.metrics.lock().await;
        let mut component_health = HashMap::new();
        let mut alerts = Vec::new();
        let mut health_score = 100.0;

        if let Some(latest) = metrics.last() {
            // CPU health
            if latest.cpu_usage > 90.0 {
                component_health.insert("cpu".to_string(), "critical".to_string());
                alerts.push("CPU usage critical".to_string());
                health_score -= 30.0;
            } else if latest.cpu_usage > 75.0 {
                component_health.insert("cpu".to_string(), "warning".to_string());
                health_score -= 15.0;
            } else {
                component_health.insert("cpu".to_string(), "healthy".to_string());
            }

            // Memory health
            if latest.memory_usage > 95.0 {
                component_health.insert("memory".to_string(), "critical".to_string());
                alerts.push("Memory usage critical".to_string());
                health_score -= 25.0;
            } else if latest.memory_usage > 80.0 {
                component_health.insert("memory".to_string(), "warning".to_string());
                health_score -= 10.0;
            } else {
                component_health.insert("memory".to_string(), "healthy".to_string());
            }

            // Network health
            if latest.network_latency > 2000.0 {
                component_health.insert("network".to_string(), "critical".to_string());
                alerts.push("Network latency critical".to_string());
                health_score -= 20.0;
            } else if latest.network_latency > 1000.0 {
                component_health.insert("network".to_string(), "warning".to_string());
                health_score -= 10.0;
            } else {
                component_health.insert("network".to_string(), "healthy".to_string());
            }
        } else {
            component_health.insert("metrics".to_string(), "no_data".to_string());
            health_score -= 50.0;
        }

        let overall_status = if health_score >= 90.0 {
            "healthy".to_string()
        } else if health_score >= 70.0 {
            "warning".to_string()
        } else {
            "critical".to_string()
        };

        Ok(SystemHealth {
            overall_status,
            component_health,
            performance_score: health_score,
            alerts,
        })
    }

    async fn send_alert(&self, message: String) {
        warn!("🚨 ALERT: {}", message);

        // In a real implementation, you would:
        // - Send to alert channels (Slack, email, etc.)
        // - Store in alert database
        // - Trigger automated responses

        if let Err(e) = self.alert_sender.send(message).await {
            error!("Failed to send alert: {}", e);
        }
    }

    async fn is_running(&self) -> bool {
        *self.is_running.lock().await
    }

    async fn stop(&self) -> Result<(), Box<dyn std::error::Error>> {
        info!("🛑 Stopping performance monitor...");

        {
            let mut is_running = self.is_running.lock().await;
            *is_running = false;
        }

        // Publish final report
        self.publish_performance_report().await?;

        // Publish shutdown status
        self.decisions.publish_system_status(bzzz_sdk::decisions::SystemStatus {
            status: "Performance monitor shutting down".to_string(),
            metrics: std::collections::HashMap::new(),
            health_checks: {
                let mut checks = std::collections::HashMap::new();
                checks.insert("monitoring_active".to_string(), false);
                checks
            },
        }).await?;

        info!("✅ Performance monitor stopped");
        Ok(())
    }
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let monitor = PerformanceMonitor::new("http://localhost:8080", "performance_monitor").await?;

    // Handle shutdown signals
    let monitor_clone = Arc::new(monitor);
    let monitor_for_signal = monitor_clone.clone();

    tokio::spawn(async move {
        tokio::signal::ctrl_c().await.unwrap();
        info!("🔄 Received shutdown signal...");

        if let Err(e) = monitor_for_signal.stop().await {
            error!("Error during shutdown: {}", e);
        }

        std::process::exit(0);
    });

    // Start monitoring
    monitor_clone.start_monitoring().await?;

    Ok(())
}

// Additional helper modules would be here in a real implementation.
// Minimal stand-in for the `rand` crate so the example stays self-contained;
// a real project would depend on `rand` instead. This uses simplified,
// clock-seeded pseudo-randomness for demo purposes only.
mod rand {
    use std::time::{SystemTime, UNIX_EPOCH};

    pub trait DemoRandom {
        fn from_seed(seed: u32) -> Self;
    }

    impl DemoRandom for f64 {
        // Approximates `rand::random::<f64>()`, which yields a value in [0, 1).
        fn from_seed(seed: u32) -> Self {
            f64::from(seed % 1000) / 1000.0
        }
    }

    impl DemoRandom for u32 {
        fn from_seed(seed: u32) -> Self {
            seed
        }
    }

    pub fn random<T: DemoRandom>() -> T {
        // Derive a crude seed from the current sub-second nanoseconds.
        let seed = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .subsec_nanos();
        T::from_seed(seed)
    }
}
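
// ---------------------------------------------------------------------------
// Dependency sketch (illustrative, not taken from the SDK docs): based on the
// imports used above, a Cargo.toml for this example would need roughly the
// crates below. The `bzzz-sdk` version follows the header comment; the other
// versions are assumptions and should be pinned to whatever your project uses.
//
// [dependencies]
// bzzz-sdk = "2.0"
// tokio = { version = "1", features = ["rt-multi-thread", "macros", "sync", "time", "signal"] }
// serde = { version = "1", features = ["derive"] }
// tracing = "0.1"
// tracing-subscriber = "0.3"
// ---------------------------------------------------------------------------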