Implement chrs-observer: Real-time TUI dashboard for cluster monitoring
This commit is contained in:
@@ -10,5 +10,8 @@ tokio = { version = "1.0", features = ["full"] }
|
||||
chrs-mail = { path = "../chrs-mail" }
|
||||
chrs-bubble = { path = "../chrs-bubble" }
|
||||
chrs-backbeat = { path = "../chrs-backbeat" }
|
||||
chrs-discovery = { path = "../chrs-discovery" }
|
||||
chrs-council = { path = "../chrs-council" }
|
||||
chrono = "0.4"
|
||||
serde = { version = "1.0", features = ["derive"] }
|
||||
serde_json = "1.0"
|
||||
|
||||
@@ -1,9 +1,10 @@
|
||||
//! chrs-observer: Real-time TUI dashboard for CHORUS.
|
||||
//! chrs-observer: Real-time TUI dashboard for CHORUS cluster monitoring.
|
||||
|
||||
use ratatui::{
|
||||
backend::CrosstermBackend,
|
||||
widgets::{Block, Borders, Paragraph, List, ListItem},
|
||||
widgets::{Block, Borders, Paragraph, List, ListItem, Gauge},
|
||||
layout::{Layout, Constraint, Direction},
|
||||
style::{Color, Modifier, Style},
|
||||
Terminal,
|
||||
};
|
||||
use crossterm::{
|
||||
@@ -12,10 +13,17 @@ use crossterm::{
|
||||
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
|
||||
};
|
||||
use std::{error::Error, io, time::{Duration, Instant}};
|
||||
use tokio::sync::mpsc;
|
||||
use chrs_discovery::{SwarmManager, BusMessage};
|
||||
use chrs_backbeat::BeatFrame;
|
||||
use chrs_council::Peer;
|
||||
use std::collections::HashMap;
|
||||
|
||||
/// State of the TUI application.
|
||||
struct App {
|
||||
pulse_bpm: u32,
|
||||
beat_index: u32,
|
||||
peers: HashMap<String, Peer>,
|
||||
logs: Vec<String>,
|
||||
}
|
||||
|
||||
@@ -24,22 +32,75 @@ impl App {
|
||||
App {
|
||||
pulse_bpm: 30,
|
||||
beat_index: 0,
|
||||
logs: vec!["[OBSERVER] Initialized.".into()],
|
||||
peers: HashMap::new(),
|
||||
logs: vec!["[OBSERVER] Initialized. Waiting for P2P events...".into()],
|
||||
}
|
||||
}
|
||||
|
||||
fn add_log(&mut self, log: String) {
|
||||
self.logs.push(log);
|
||||
if self.logs.len() > 50 {
|
||||
self.logs.remove(0);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn main() -> Result<(), Box<dyn Error>> {
|
||||
// Setup terminal
|
||||
#[tokio::main]
|
||||
async fn main() -> Result<(), Box<dyn Error>> {
|
||||
// 1. Setup LibP2P Bus
|
||||
let (bus_tx, mut bus_rx) = mpsc::unbounded_channel::<BusMessage>();
|
||||
let _bus_handle = SwarmManager::start_bus(bus_tx).await?;
|
||||
|
||||
// 2. Setup Terminal
|
||||
enable_raw_mode()?;
|
||||
let mut stdout = io::stdout();
|
||||
execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
|
||||
let backend = CrosstermBackend::new(stdout);
|
||||
let mut terminal = Terminal::new(backend)?;
|
||||
|
||||
// Create app and run loop
|
||||
let app = App::new();
|
||||
let res = run_app(&mut terminal, app);
|
||||
// 3. App State
|
||||
let mut app = App::new();
|
||||
|
||||
// 4. Main Loop
|
||||
let tick_rate = Duration::from_millis(100);
|
||||
let mut last_tick = Instant::now();
|
||||
|
||||
loop {
|
||||
terminal.draw(|f| ui(f, &app))?;
|
||||
|
||||
// Handle P2P Messages
|
||||
while let Ok(msg) = bus_rx.try_recv() {
|
||||
if msg.topic == "chorus-heartbeat" {
|
||||
if let Ok(peer) = serde_json::from_slice::<Peer>(&msg.payload) {
|
||||
if !app.peers.contains_key(&peer.id) {
|
||||
app.add_log(format!("[P2P] Discovered Agent: {} ({:?})", peer.id, peer.role));
|
||||
}
|
||||
app.peers.insert(peer.id.clone(), peer);
|
||||
}
|
||||
} else if msg.topic == "chorus-global" || msg.topic == "beat_frame" {
|
||||
if let Ok(frame) = serde_json::from_slice::<BeatFrame>(&msg.payload) {
|
||||
app.beat_index = frame.beat_index;
|
||||
app.pulse_bpm = frame.tempo_bpm;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Handle Input
|
||||
let timeout = tick_rate
|
||||
.checked_sub(last_tick.elapsed())
|
||||
.unwrap_or_else(|| Duration::from_secs(0));
|
||||
if crossterm::event::poll(timeout)? {
|
||||
if let Event::Key(key) = event::read()? {
|
||||
if let KeyCode::Char('q') = key.code {
|
||||
break;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if last_tick.elapsed() >= tick_rate {
|
||||
last_tick = Instant::now();
|
||||
}
|
||||
}
|
||||
|
||||
// Restore terminal
|
||||
disable_raw_mode()?;
|
||||
@@ -50,54 +111,58 @@ fn main() -> Result<(), Box<dyn Error>> {
|
||||
)?;
|
||||
terminal.show_cursor()?;
|
||||
|
||||
if let Err(err) = res {
|
||||
println!("{:?}", err)
|
||||
}
|
||||
|
||||
Ok(())
|
||||
}
|
||||
|
||||
fn run_app<B: ratatui::backend::Backend>(terminal: &mut Terminal<B>, mut app: App) -> io::Result<()> {
|
||||
let tick_rate = Duration::from_millis(250);
|
||||
let mut last_tick = Instant::now();
|
||||
loop {
|
||||
terminal.draw(|f| ui(f, &app))?;
|
||||
|
||||
let timeout = tick_rate
|
||||
.checked_sub(last_tick.elapsed())
|
||||
.unwrap_or_else(|| Duration::from_secs(0));
|
||||
if crossterm::event::poll(timeout)? {
|
||||
if let Event::Key(key) = event::read()? {
|
||||
if let KeyCode::Char('q') = key.code {
|
||||
return Ok(());
|
||||
}
|
||||
}
|
||||
}
|
||||
if last_tick.elapsed() >= tick_rate {
|
||||
// In a real app, we would update beat_index from chrs-backbeat here
|
||||
last_tick = Instant::now();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
fn ui(f: &mut ratatui::Frame, app: &App) {
|
||||
let chunks = Layout::default()
|
||||
.direction(Direction::Vertical)
|
||||
.margin(1)
|
||||
.constraints(
|
||||
[
|
||||
Constraint::Length(3),
|
||||
Constraint::Min(0),
|
||||
Constraint::Length(3), // Pulse Gauge
|
||||
Constraint::Min(0), // Body
|
||||
]
|
||||
.as_ref(),
|
||||
)
|
||||
.split(f.size());
|
||||
|
||||
let header = Paragraph::new(format!("CHORUS CLUSTER DASHBOARD | BPM: {} | BEAT: {}", app.pulse_bpm, app.beat_index))
|
||||
.block(Block::default().borders(Borders::ALL).title("Pulse"));
|
||||
f.render_widget(header, chunks[0]);
|
||||
// --- Header / Pulse ---
|
||||
let beat_progress = (app.beat_index as f32 / 8.0) * 100.0;
|
||||
let pulse_title = format!(" CHORUS CLUSTER PULSE | {} BPM | Beat: {}/8 ", app.pulse_bpm, app.beat_index);
|
||||
let pulse_gauge = Gauge::default()
|
||||
.block(Block::default().borders(Borders::ALL).title(pulse_title))
|
||||
.gauge_style(Style::default().fg(Color::Cyan).bg(Color::Black).add_modifier(Modifier::BOLD))
|
||||
.percent(beat_progress as u16);
|
||||
f.render_widget(pulse_gauge, chunks[0]);
|
||||
|
||||
let logs: Vec<ListItem> = app.logs.iter().rev().map(|s| ListItem::new(s.as_str())).collect();
|
||||
let log_list = List::new(logs).block(Block::default().borders(Borders::ALL).title("Live Events"));
|
||||
f.render_widget(log_list, chunks[1]);
|
||||
// --- Body (Peers and Logs) ---
|
||||
let body_chunks = Layout::default()
|
||||
.direction(Direction::Horizontal)
|
||||
.constraints(
|
||||
[
|
||||
Constraint::Percentage(30), // Peer List
|
||||
Constraint::Percentage(70), // Logs
|
||||
]
|
||||
.as_ref(),
|
||||
)
|
||||
.split(chunks[1]);
|
||||
|
||||
// Peer List
|
||||
let peers: Vec<ListItem> = app.peers.values()
|
||||
.map(|p| {
|
||||
let content = format!(" {} [{:?}] Score: {:.2}", p.id, p.role, p.resource_score);
|
||||
ListItem::new(content).style(Style::default().fg(Color::Yellow))
|
||||
})
|
||||
.collect();
|
||||
let peer_list = List::new(peers)
|
||||
.block(Block::default().borders(Borders::ALL).title(" Active Agents "));
|
||||
f.render_widget(peer_list, body_chunks[0]);
|
||||
|
||||
// Logs
|
||||
let logs: Vec<ListItem> = app.logs.iter().rev()
|
||||
.map(|s| ListItem::new(s.as_str()))
|
||||
.collect();
|
||||
let log_list = List::new(logs)
|
||||
.block(Block::default().borders(Borders::ALL).title(" Live Event Bus "));
|
||||
f.render_widget(log_list, body_chunks[1]);
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user