Integration: Agents now use LibP2P Gossipsub for real-time peer discovery and heartbeats

This commit is contained in:
anthonyrawlins
2026-03-04 03:50:59 +11:00
parent 2202dfbaf3
commit 8db06616e0
2 changed files with 93 additions and 91 deletions

View File

@@ -1,12 +1,14 @@
//! chrs-discovery: LibP2P-based peer discovery for CHORUS.
//! chrs-discovery: LibP2P-based peer discovery and message bus for CHORUS.
use libp2p::{
mdns,
swarm::{NetworkBehaviour, SwarmEvent},
gossipsub,
PeerId,
};
use futures::StreamExt;
use std::error::Error;
use tokio::sync::mpsc;
#[derive(NetworkBehaviour)]
pub struct MyBehaviour {
@@ -14,14 +16,34 @@ pub struct MyBehaviour {
pub gossipsub: gossipsub::Behaviour,
}
/// A message received from the P2P bus.
#[derive(Debug, Clone)]
pub struct BusMessage {
pub topic: String,
pub source_peer_id: String,
pub payload: Vec<u8>,
}
/// A handle to interact with the P2P bus from the agent.
pub struct BusHandle {
pub outgoing_tx: mpsc::UnboundedSender<(String, Vec<u8>)>, // (topic, data)
pub local_peer_id: PeerId,
}
impl BusHandle {
pub fn publish(&self, topic: &str, data: Vec<u8>) -> Result<(), Box<dyn Error>> {
self.outgoing_tx.send((topic.to_string(), data))?;
Ok(())
}
}
pub struct SwarmManager;
impl SwarmManager {
/// Initialize a new LibP2P swarm with mDNS discovery.
///
/// **Why**: Moving away from polling a database to real-time P2P discovery
/// reduces latency and allows CHORUS to scale to dynamic, broker-less environments.
pub async fn start_discovery_loop() -> Result<(), Box<dyn Error>> {
/// Initialize a new LibP2P swarm and start the P2P bus loop.
pub async fn start_bus(
received_tx: mpsc::UnboundedSender<BusMessage>
) -> Result<BusHandle, Box<dyn Error>> {
let mut swarm = libp2p::SwarmBuilder::with_new_identity()
.with_tokio()
.with_tcp(
@@ -51,19 +73,54 @@ impl SwarmManager {
})?
.build();
let local_peer_id = *swarm.local_peer_id();
println!("[DISCOVERY] Swarm started. Local PeerId: {}", local_peer_id);
// Subscribe to default topics
let global_topic = gossipsub::IdentTopic::new("chorus-global");
let heartbeat_topic = gossipsub::IdentTopic::new("chorus-heartbeat");
swarm.behaviour_mut().gossipsub.subscribe(&global_topic)?;
swarm.behaviour_mut().gossipsub.subscribe(&heartbeat_topic)?;
swarm.listen_on("/ip4/0.0.0.0/tcp/0".parse()?)?;
println!("[DISCOVERY] Swarm started. Listening for peers via mDNS...");
let (outgoing_tx, mut outgoing_rx) = mpsc::unbounded_channel::<(String, Vec<u8>)>();
loop {
match swarm.select_next_some().await {
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
for (peer_id, _multiaddr) in list {
println!("[DISCOVERY] mDNS discovered a new peer: {}", peer_id);
// Start the swarm event loop
tokio::spawn(async move {
loop {
tokio::select! {
event = swarm.select_next_some() => match event {
SwarmEvent::Behaviour(MyBehaviourEvent::Mdns(mdns::Event::Discovered(list))) => {
for (peer_id, _multiaddr) in list {
println!("[DISCOVERY] mDNS discovered peer: {}", peer_id);
swarm.behaviour_mut().gossipsub.add_explicit_peer(&peer_id);
}
}
SwarmEvent::Behaviour(MyBehaviourEvent::Gossipsub(gossipsub::Event::Message {
propagation_source: peer_id,
message_id: _id,
message,
})) => {
let _ = received_tx.send(BusMessage {
topic: message.topic.to_string(),
source_peer_id: peer_id.to_string(),
payload: message.data,
});
}
_ => {}
},
Some((topic_name, data)) = outgoing_rx.recv() => {
let topic = gossipsub::IdentTopic::new(topic_name);
let _ = swarm.behaviour_mut().gossipsub.publish(topic, data);
}
}
_ => {}
}
}
});
Ok(BusHandle {
outgoing_tx,
local_peer_id,
})
}
}