117 lines
4.2 KiB
Rust
117 lines
4.2 KiB
Rust
use chrono::Utc;
|
||
/// chrs-sync crate provides synchronization utilities for the CHORUS system.
|
||
///
|
||
/// It uses a `Mailbox` for message passing between peers and a Dolt repository
|
||
/// to track state hashes. The primary abstraction is `SyncManager`, which can
|
||
/// broadcast the current repository hash to peers and handle incoming sync
|
||
/// signals.
|
||
use chrs_mail::{Mailbox, Message};
|
||
use std::path::PathBuf;
|
||
use std::process::Command;
|
||
use uuid::Uuid;
|
||
|
||
/// Manages synchronization of a Dolt repository across peers.
|
||
///
|
||
/// # Fields
|
||
/// * `mailbox` – The `Mailbox` instance used to send and receive messages.
|
||
/// * `repo_path` – Filesystem path to the local Dolt repository.
|
||
///
|
||
/// # Rationale
|
||
/// The CHORUS architecture relies on deterministic state replication. By
|
||
/// broadcasting the latest commit hash (`sync_signal`) each peer can decide
|
||
/// whether to pull updates. This struct encapsulates that behaviour, keeping the
|
||
/// rest of the system agnostic of the underlying VCS commands.
|
||
pub struct SyncManager {
|
||
mailbox: Mailbox,
|
||
repo_path: PathBuf,
|
||
}
|
||
|
||
impl SyncManager {
|
||
/// Creates a new `SyncManager`.
|
||
///
|
||
/// # Parameters
|
||
/// * `mailbox` – An already‑opened `Mailbox` for peer communication.
|
||
/// * `repo_path` – Path to the Dolt repository that should be kept in sync.
|
||
///
|
||
/// Returns a fully‑initialised manager ready to broadcast or handle sync
|
||
/// signals.
|
||
pub fn new(mailbox: Mailbox, repo_path: PathBuf) -> Self {
|
||
Self { mailbox, repo_path }
|
||
}
|
||
|
||
/// Broadcasts the current repository state to a remote peer.
|
||
///
|
||
/// The method executes `dolt log -n 1 --format %H` to obtain the most recent
|
||
/// commit hash, constructs a `Message` with topic `"sync_signal"` and sends it
|
||
/// via the mailbox.
|
||
///
|
||
/// * `from_peer` – Identifier of the sender.
|
||
/// * `to_peer` – Identifier of the intended recipient.
|
||
///
|
||
/// # Errors
|
||
/// Returns any I/O or command‑execution error wrapped in a boxed `dyn
|
||
/// Error`.
|
||
pub fn broadcast_state(
|
||
&self,
|
||
from_peer: &str,
|
||
to_peer: &str,
|
||
) -> Result<(), Box<dyn std::error::Error>> {
|
||
// Get current dolt hash
|
||
let output = Command::new("dolt")
|
||
.args(&["log", "-n", "1", "--format", "%H"])
|
||
.current_dir(&self.repo_path)
|
||
.output()?;
|
||
|
||
let current_hash = String::from_utf8_lossy(&output.stdout).trim().to_string();
|
||
|
||
let msg = Message {
|
||
id: Uuid::new_v4(),
|
||
from_peer: from_peer.into(),
|
||
to_peer: to_peer.into(),
|
||
topic: "sync_signal".into(),
|
||
payload: serde_json::json!({ "commit_hash": current_hash }),
|
||
sent_at: Utc::now(),
|
||
read_at: None,
|
||
};
|
||
|
||
self.mailbox.send(&msg)?;
|
||
println!(
|
||
"Broadcasted sync signal: {} from {}",
|
||
current_hash, from_peer
|
||
);
|
||
Ok(())
|
||
}
|
||
|
||
/// Handles an incoming `sync_signal` message.
|
||
///
|
||
/// If the message topic is not `"sync_signal"` the function returns `Ok(())`
|
||
/// immediately. Otherwise it extracts the remote commit hash and attempts a
|
||
/// `dolt pull origin` to bring the local repository up‑to‑date. In a real
|
||
/// P2P deployment the remote URL would be derived from the sender, but the
|
||
/// current implementation uses the default remote configuration.
|
||
///
|
||
/// # Errors
|
||
/// Propagates any command execution failures.
|
||
pub fn handle_sync_signal(&self, msg: &Message) -> Result<(), Box<dyn std::error::Error>> {
|
||
if msg.topic != "sync_signal" {
|
||
return Ok(());
|
||
}
|
||
|
||
let remote_hash = msg.payload["commit_hash"].as_str().unwrap_or_default();
|
||
println!("Received sync signal for hash: {}", remote_hash);
|
||
|
||
// In a real P2P scenario, we would pull from the remote peer's URL.
|
||
// For now, we simulate by attempting a 'dolt pull' if a remote is configured.
|
||
let status = Command::new("dolt")
|
||
.args(&["pull", "origin"])
|
||
.current_dir(&self.repo_path)
|
||
.status()?;
|
||
|
||
if status.success() {
|
||
println!("Successfully pulled updates for hash: {}", remote_hash);
|
||
}
|
||
|
||
Ok(())
|
||
}
|
||
}
|