From cbb7366933d31baec43768074f37d21f4f610e0a Mon Sep 17 00:00:00 2001 From: anthonyrawlins Date: Wed, 4 Mar 2026 03:54:19 +1100 Subject: [PATCH] Implement chrs-observer: Real-time TUI dashboard for cluster monitoring --- chrs-observer/Cargo.toml | 3 + chrs-observer/src/main.rs | 153 +++++++++++++++++++++++++++----------- 2 files changed, 112 insertions(+), 44 deletions(-) diff --git a/chrs-observer/Cargo.toml b/chrs-observer/Cargo.toml index af871f68..ef2502e7 100644 --- a/chrs-observer/Cargo.toml +++ b/chrs-observer/Cargo.toml @@ -10,5 +10,8 @@ tokio = { version = "1.0", features = ["full"] } chrs-mail = { path = "../chrs-mail" } chrs-bubble = { path = "../chrs-bubble" } chrs-backbeat = { path = "../chrs-backbeat" } +chrs-discovery = { path = "../chrs-discovery" } +chrs-council = { path = "../chrs-council" } chrono = "0.4" +serde = { version = "1.0", features = ["derive"] } serde_json = "1.0" diff --git a/chrs-observer/src/main.rs b/chrs-observer/src/main.rs index 5144465c..f88bb36b 100644 --- a/chrs-observer/src/main.rs +++ b/chrs-observer/src/main.rs @@ -1,9 +1,10 @@ -//! chrs-observer: Real-time TUI dashboard for CHORUS. +//! chrs-observer: Real-time TUI dashboard for CHORUS cluster monitoring. use ratatui::{ backend::CrosstermBackend, - widgets::{Block, Borders, Paragraph, List, ListItem}, + widgets::{Block, Borders, Paragraph, List, ListItem, Gauge}, layout::{Layout, Constraint, Direction}, + style::{Color, Modifier, Style}, Terminal, }; use crossterm::{ @@ -12,10 +13,17 @@ use crossterm::{ terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen}, }; use std::{error::Error, io, time::{Duration, Instant}}; +use tokio::sync::mpsc; +use chrs_discovery::{SwarmManager, BusMessage}; +use chrs_backbeat::BeatFrame; +use chrs_council::Peer; +use std::collections::HashMap; +/// State of the TUI application. struct App { pulse_bpm: u32, beat_index: u32, + peers: HashMap, logs: Vec, } @@ -24,22 +32,75 @@ impl App { App { pulse_bpm: 30, beat_index: 0, - logs: vec!["[OBSERVER] Initialized.".into()], + peers: HashMap::new(), + logs: vec!["[OBSERVER] Initialized. Waiting for P2P events...".into()], + } + } + + fn add_log(&mut self, log: String) { + self.logs.push(log); + if self.logs.len() > 50 { + self.logs.remove(0); } } } -fn main() -> Result<(), Box> { - // Setup terminal +#[tokio::main] +async fn main() -> Result<(), Box> { + // 1. Setup LibP2P Bus + let (bus_tx, mut bus_rx) = mpsc::unbounded_channel::(); + let _bus_handle = SwarmManager::start_bus(bus_tx).await?; + + // 2. Setup Terminal enable_raw_mode()?; let mut stdout = io::stdout(); execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?; let backend = CrosstermBackend::new(stdout); let mut terminal = Terminal::new(backend)?; - // Create app and run loop - let app = App::new(); - let res = run_app(&mut terminal, app); + // 3. App State + let mut app = App::new(); + + // 4. Main Loop + let tick_rate = Duration::from_millis(100); + let mut last_tick = Instant::now(); + + loop { + terminal.draw(|f| ui(f, &app))?; + + // Handle P2P Messages + while let Ok(msg) = bus_rx.try_recv() { + if msg.topic == "chorus-heartbeat" { + if let Ok(peer) = serde_json::from_slice::(&msg.payload) { + if !app.peers.contains_key(&peer.id) { + app.add_log(format!("[P2P] Discovered Agent: {} ({:?})", peer.id, peer.role)); + } + app.peers.insert(peer.id.clone(), peer); + } + } else if msg.topic == "chorus-global" || msg.topic == "beat_frame" { + if let Ok(frame) = serde_json::from_slice::(&msg.payload) { + app.beat_index = frame.beat_index; + app.pulse_bpm = frame.tempo_bpm; + } + } + } + + // Handle Input + let timeout = tick_rate + .checked_sub(last_tick.elapsed()) + .unwrap_or_else(|| Duration::from_secs(0)); + if crossterm::event::poll(timeout)? { + if let Event::Key(key) = event::read()? { + if let KeyCode::Char('q') = key.code { + break; + } + } + } + + if last_tick.elapsed() >= tick_rate { + last_tick = Instant::now(); + } + } // Restore terminal disable_raw_mode()?; @@ -50,54 +111,58 @@ fn main() -> Result<(), Box> { )?; terminal.show_cursor()?; - if let Err(err) = res { - println!("{:?}", err) - } - Ok(()) } -fn run_app(terminal: &mut Terminal, mut app: App) -> io::Result<()> { - let tick_rate = Duration::from_millis(250); - let mut last_tick = Instant::now(); - loop { - terminal.draw(|f| ui(f, &app))?; - - let timeout = tick_rate - .checked_sub(last_tick.elapsed()) - .unwrap_or_else(|| Duration::from_secs(0)); - if crossterm::event::poll(timeout)? { - if let Event::Key(key) = event::read()? { - if let KeyCode::Char('q') = key.code { - return Ok(()); - } - } - } - if last_tick.elapsed() >= tick_rate { - // In a real app, we would update beat_index from chrs-backbeat here - last_tick = Instant::now(); - } - } -} - fn ui(f: &mut ratatui::Frame, app: &App) { let chunks = Layout::default() .direction(Direction::Vertical) - .margin(1) .constraints( [ - Constraint::Length(3), - Constraint::Min(0), + Constraint::Length(3), // Pulse Gauge + Constraint::Min(0), // Body ] .as_ref(), ) .split(f.size()); - let header = Paragraph::new(format!("CHORUS CLUSTER DASHBOARD | BPM: {} | BEAT: {}", app.pulse_bpm, app.beat_index)) - .block(Block::default().borders(Borders::ALL).title("Pulse")); - f.render_widget(header, chunks[0]); + // --- Header / Pulse --- + let beat_progress = (app.beat_index as f32 / 8.0) * 100.0; + let pulse_title = format!(" CHORUS CLUSTER PULSE | {} BPM | Beat: {}/8 ", app.pulse_bpm, app.beat_index); + let pulse_gauge = Gauge::default() + .block(Block::default().borders(Borders::ALL).title(pulse_title)) + .gauge_style(Style::default().fg(Color::Cyan).bg(Color::Black).add_modifier(Modifier::BOLD)) + .percent(beat_progress as u16); + f.render_widget(pulse_gauge, chunks[0]); - let logs: Vec = app.logs.iter().rev().map(|s| ListItem::new(s.as_str())).collect(); - let log_list = List::new(logs).block(Block::default().borders(Borders::ALL).title("Live Events")); - f.render_widget(log_list, chunks[1]); + // --- Body (Peers and Logs) --- + let body_chunks = Layout::default() + .direction(Direction::Horizontal) + .constraints( + [ + Constraint::Percentage(30), // Peer List + Constraint::Percentage(70), // Logs + ] + .as_ref(), + ) + .split(chunks[1]); + + // Peer List + let peers: Vec = app.peers.values() + .map(|p| { + let content = format!(" {} [{:?}] Score: {:.2}", p.id, p.role, p.resource_score); + ListItem::new(content).style(Style::default().fg(Color::Yellow)) + }) + .collect(); + let peer_list = List::new(peers) + .block(Block::default().borders(Borders::ALL).title(" Active Agents ")); + f.render_widget(peer_list, body_chunks[0]); + + // Logs + let logs: Vec = app.logs.iter().rev() + .map(|s| ListItem::new(s.as_str())) + .collect(); + let log_list = List::new(logs) + .block(Block::default().borders(Borders::ALL).title(" Live Event Bus ")); + f.render_widget(log_list, body_chunks[1]); }