Files
CHORUS/chrs-observer/src/main.rs

241 lines
7.8 KiB
Rust

//! chrs-observer: Real-time TUI dashboard for CHORUS cluster monitoring.
use ratatui::{
backend::CrosstermBackend,
widgets::{Block, Borders, Paragraph, List, ListItem, Gauge, BorderType},
layout::{Layout, Constraint, Direction, Alignment},
style::{Color, Modifier, Style},
Terminal,
};
use crossterm::{
event::{self, DisableMouseCapture, EnableMouseCapture, Event, KeyCode},
execute,
terminal::{disable_raw_mode, enable_raw_mode, EnterAlternateScreen, LeaveAlternateScreen},
};
use std::{error::Error, io, time::{Duration, Instant}};
use tokio::sync::mpsc;
use chrs_discovery::{SwarmManager, BusMessage};
use chrs_backbeat::BeatFrame;
use chrs_council::Peer;
use std::collections::HashMap;
/// A project entry as reported by the status endpoint polled in
/// `App::update_projects`.
///
/// Both fields are kept as plain display strings; missing or non-string JSON
/// values are stored as `"Unknown"` by the code that builds this struct.
#[derive(Debug, Clone, PartialEq, Eq)]
struct Project {
    /// Project name, shown in the "Local Projects" panel.
    name: String,
    /// Status label, shown in brackets next to the name.
    status: String,
}
/// State of the TUI application.
///
/// Written by the event loop in `main` and read by `ui` on every redraw.
struct App {
/// Tempo shown in the header; starts at 30 and is overwritten with
/// `tempo_bpm` from each decoded beat frame.
pulse_bpm: u32,
/// Most recent `beat_index` taken from a decoded beat frame; drives the beat
/// counter label and the pulse gauge.
beat_index: u32,
/// Known agents keyed by peer id, inserted/refreshed from heartbeat messages.
peers: HashMap<String, Peer>,
/// Timestamped log lines, oldest first; kept bounded by `add_log`.
logs: Vec<String>,
/// Projects from the last successful status fetch in `update_projects`.
projects: Vec<Project>,
}
impl App {
fn new() -> App {
App {
pulse_bpm: 30,
beat_index: 0,
peers: HashMap::new(),
logs: vec!["[OBSERVER] Initialized. Waiting for P2P events...".into()],
projects: Vec::new(),
}
}
async fn update_projects(&mut self) {
if let Ok(res) = reqwest::get("http://127.0.0.1:3001/api/status").await {
if let Ok(projs) = res.json::<serde_json::Value>().await {
if let Some(arr) = projs.as_array() {
self.projects = arr.iter().map(|v| Project {
name: v["name"].as_str().unwrap_or("Unknown").to_string(),
status: v["status"].as_str().unwrap_or("Unknown").to_string(),
}).collect();
}
}
}
}
fn add_log(&mut self, log: String) {
let now = chrono::Local::now().format("%H:%M:%S").to_string();
self.logs.push(format!("[{}] {}", now, log));
if self.logs.len() > 100 {
self.logs.remove(0);
}
}
}
#[tokio::main]
async fn main() -> Result<(), Box<dyn Error>> {
// 1. Setup LibP2P Bus
let (bus_tx, mut bus_rx) = mpsc::unbounded_channel::<BusMessage>();
let _bus_handle = SwarmManager::start_bus(bus_tx).await?;
// 2. Setup Terminal
enable_raw_mode()?;
let mut stdout = io::stdout();
execute!(stdout, EnterAlternateScreen, EnableMouseCapture)?;
let backend = CrosstermBackend::new(stdout);
let mut terminal = Terminal::new(backend)?;
// 3. App State
let mut app = App::new();
// 4. Main Loop
let tick_rate = Duration::from_millis(100);
let mut last_tick = Instant::now();
let mut last_status_update = Instant::now();
loop {
terminal.draw(|f| ui(f, &app))?;
// Periodically fetch project status from GUI API
if last_status_update.elapsed() > Duration::from_secs(5) {
app.update_projects().await;
last_status_update = Instant::now();
}
// Handle P2P Messages
while let Ok(msg) = bus_rx.try_recv() {
if msg.topic == "chorus-heartbeat" {
if let Ok(peer) = serde_json::from_slice::<Peer>(&msg.payload) {
if !app.peers.contains_key(&peer.id) {
app.add_log(format!("Discovered Agent: {} ({:?})", peer.id, peer.role));
}
app.peers.insert(peer.id.clone(), peer);
}
} else if msg.topic == "chorus-global" || msg.topic == "beat_frame" {
if let Ok(frame) = serde_json::from_slice::<BeatFrame>(&msg.payload) {
app.beat_index = frame.beat_index;
app.pulse_bpm = frame.tempo_bpm;
}
}
}
// Handle Input
let timeout = tick_rate
.checked_sub(last_tick.elapsed())
.unwrap_or_else(|| Duration::from_secs(0));
if crossterm::event::poll(timeout)? {
if let Event::Key(key) = event::read()? {
if let KeyCode::Char('q') = key.code {
break;
}
}
}
if last_tick.elapsed() >= tick_rate {
last_tick = Instant::now();
}
}
// Restore terminal
disable_raw_mode()?;
execute!(
terminal.backend_mut(),
LeaveAlternateScreen,
DisableMouseCapture
)?;
terminal.show_cursor()?;
Ok(())
}
fn ui(f: &mut ratatui::Frame, app: &App) {
let size = f.size();
let chunks = Layout::default()
.direction(Direction::Vertical)
.constraints(
[
Constraint::Length(3), // Pulse and Status
Constraint::Min(0), // Main Body
]
.as_ref(),
)
.split(size);
// --- Header / Pulse ---
let header_chunks = Layout::default()
.direction(Direction::Horizontal)
.constraints(
[
Constraint::Percentage(40), // BPM & Status
Constraint::Percentage(60), // Rhythm Progress
]
.as_ref(),
)
.split(chunks[0]);
let status_text = format!(" CHORUS Cluster | {} BPM | Agents: {}", app.pulse_bpm, app.peers.len());
let status_para = Paragraph::new(status_text)
.block(Block::default().borders(Borders::ALL).border_type(BorderType::Rounded))
.style(Style::default().fg(Color::Cyan).add_modifier(Modifier::BOLD))
.alignment(Alignment::Left);
f.render_widget(status_para, header_chunks[0]);
// Visual Beat Counter: [ 1 2 3 4 5 6 7 8 ]
let mut beat_viz = String::from(" [");
for i in 1..=8 {
if i == app.beat_index {
beat_viz.push_str(&format!(" {} ", i));
} else {
beat_viz.push_str(" . ");
}
}
beat_viz.push(']');
let beat_progress = (app.beat_index as f32 / 8.0) * 100.0;
let pulse_gauge = Gauge::default()
.block(Block::default().borders(Borders::ALL).border_type(BorderType::Rounded).title(" Cluster Pulse "))
.gauge_style(Style::default().fg(Color::Cyan).bg(Color::DarkGray))
.percent(beat_progress as u16)
.label(beat_viz);
f.render_widget(pulse_gauge, header_chunks[1]);
// --- Body (Peers, Projects, and Logs) ---
let body_chunks = Layout::default()
.direction(Direction::Horizontal)
.constraints(
[
Constraint::Percentage(25), // Peer List
Constraint::Percentage(25), // Project List
Constraint::Percentage(50), // Logs
]
.as_ref(),
)
.split(chunks[1]);
// Peer List
let peers: Vec<ListItem> = app.peers.values()
.map(|p| {
let content = format!(" {} [{:?}] W: {:.2}", p.id, p.role, p.resource_score);
ListItem::new(content).style(Style::default().fg(Color::Yellow))
})
.collect();
let peer_list = List::new(peers)
.block(Block::default().borders(Borders::ALL).title(" Active Agents "))
.highlight_style(Style::default().add_modifier(Modifier::BOLD))
.highlight_symbol(">> ");
f.render_widget(peer_list, body_chunks[0]);
// Project List
let projects: Vec<ListItem> = app.projects.iter()
.map(|p| {
let content = format!(" {} [{}]", p.name, p.status);
ListItem::new(content).style(Style::default().fg(Color::Green))
})
.collect();
let project_list = List::new(projects)
.block(Block::default().borders(Borders::ALL).title(" Local Projects "));
f.render_widget(project_list, body_chunks[1]);
// Logs
let logs: Vec<ListItem> = app.logs.iter().rev()
.map(|s| ListItem::new(s.as_str()))
.collect();
let log_list = List::new(logs)
.block(Block::default().borders(Borders::ALL).title(" Live Event Bus "));
f.render_widget(log_list, body_chunks[2]);
}