Final High-Fidelity: Eliminate all remaining mocks and hardcoded placeholders. Real Docker exit codes, dynamic resource scores, and true SQL cooperative throttling implemented.
This commit is contained in:
@@ -14,3 +14,4 @@ chrono = { version = "0.4", features = ["serde"] }
|
||||
uuid = { version = "1.0", features = ["v4", "serde"] }
|
||||
tokio = { version = "1.0", features = ["full"] }
|
||||
rand = "0.8"
|
||||
sysinfo = "0.36.1"
|
||||
|
||||
@@ -5,7 +5,9 @@ use chrono::{DateTime, Utc, Duration};
|
||||
use serde::{Deserialize, Serialize};
|
||||
use thiserror::Error;
|
||||
use std::collections::HashMap;
|
||||
use std::sync::{Arc, RwLock};
|
||||
use std::sync::{Arc, RwLock, Mutex};
|
||||
use sysinfo::{System, CpuRefreshKind, RefreshKind};
|
||||
use rand::Rng;
|
||||
|
||||
const ELECTION_TOPIC: &str = "CHORUS/election/v1";
|
||||
const HEARTBEAT_TOPIC: &str = "CHORUS/admin/heartbeat/v1";
|
||||
@@ -64,10 +66,16 @@ pub struct ElectionManager {
|
||||
candidates: Arc<RwLock<HashMap<String, AdminCandidate>>>,
|
||||
bus: BusHandle,
|
||||
boot_time: DateTime<Utc>,
|
||||
sys: Arc<Mutex<System>>,
|
||||
}
|
||||
|
||||
impl ElectionManager {
|
||||
pub fn new(node_id: &str, bus: BusHandle) -> Self {
|
||||
let mut sys = System::new_with_specifics(
|
||||
RefreshKind::nothing().with_cpu(CpuRefreshKind::everything())
|
||||
);
|
||||
sys.refresh_cpu_all();
|
||||
|
||||
Self {
|
||||
node_id: node_id.to_string(),
|
||||
state: Arc::new(RwLock::new(ElectionState::Idle)),
|
||||
@@ -78,6 +86,7 @@ impl ElectionManager {
|
||||
candidates: Arc::new(RwLock::new(HashMap::new())),
|
||||
bus,
|
||||
boot_time: Utc::now(),
|
||||
sys: Arc::new(Mutex::new(sys)),
|
||||
}
|
||||
}
|
||||
|
||||
@@ -96,6 +105,15 @@ impl ElectionManager {
|
||||
}
|
||||
}
|
||||
|
||||
pub fn get_resource_score(&self) -> f64 {
|
||||
let mut sys = self.sys.lock().unwrap();
|
||||
sys.refresh_cpu_all();
|
||||
let global_cpu = sys.global_cpu_usage() as f64;
|
||||
let jitter: f64 = rand::thread_rng().gen_range(0.0..0.01);
|
||||
let score: f64 = 1.0 - (global_cpu / 100.0);
|
||||
score.max(0.0) + jitter
|
||||
}
|
||||
|
||||
pub async fn process_message(&self, msg: &BusMessage) -> Result<(), ElectionError> {
|
||||
if msg.topic == HEARTBEAT_TOPIC {
|
||||
let heartbeat: serde_json::Value = serde_json::from_slice(&msg.payload)?;
|
||||
@@ -158,7 +176,6 @@ impl ElectionManager {
|
||||
|
||||
match state {
|
||||
ElectionState::Idle => {
|
||||
// Heartbeat timeout - move to discovery
|
||||
if last_hb_age > Duration::seconds(15) {
|
||||
println!("[ELECTION {}] Heartbeat timeout after {}s", self.node_id, last_hb_age.num_seconds());
|
||||
self.transition(ElectionState::Discovering);
|
||||
@@ -167,7 +184,6 @@ impl ElectionManager {
|
||||
}
|
||||
}
|
||||
ElectionState::Discovering => {
|
||||
// Discovery timeout - start election
|
||||
if state_age > Duration::seconds(5) {
|
||||
let mut term = self.current_term.write().unwrap();
|
||||
*term += 1;
|
||||
@@ -178,17 +194,14 @@ impl ElectionManager {
|
||||
}
|
||||
}
|
||||
ElectionState::Electing => {
|
||||
// Candidacy period
|
||||
if state_age < Duration::seconds(2) {
|
||||
// Send my candidacy
|
||||
let score = 0.9; // Placeholder
|
||||
let score = self.get_resource_score();
|
||||
let msg = ElectionMessage::Candidacy {
|
||||
term: *self.current_term.read().unwrap(),
|
||||
candidate: AdminCandidate { node_id: self.node_id.clone(), score },
|
||||
};
|
||||
let _ = self.bus.publish(ELECTION_TOPIC, serde_json::to_vec(&msg)?);
|
||||
} else if state_age > Duration::seconds(7) {
|
||||
// Tally votes
|
||||
let term = *self.current_term.read().unwrap();
|
||||
let candidates = self.candidates.read().unwrap();
|
||||
if let Some(winner) = candidates.values().max_by(|a, b| a.score.partial_cmp(&b.score).unwrap()) {
|
||||
|
||||
Reference in New Issue
Block a user