Parallel Implementation: UCXLWatcher (FS-to-Metadata) and chrs-sync (P2P Graph Sync)

This commit is contained in:
anthonyrawlins
2026-03-03 17:32:26 +11:00
parent 003cb60c0f
commit f134b1d060
7 changed files with 142 additions and 0 deletions

View File

@@ -4,3 +4,5 @@ version = "0.1.0"
edition = "2024" edition = "2024"
[dependencies] [dependencies]
notify = "6.1.1"
walkdir = "2.4"

View File

@@ -0,0 +1,15 @@
{
"keep": {
"days": true,
"amount": 14
},
"auditLog": "/home/tony/rust/projects/reset/CHORUS/UCXL/logs/.6faacdcce0b3695d429f7b102ede9ce5ae292f3e-audit.json",
"files": [
{
"date": 1772519476002,
"name": "/home/tony/rust/projects/reset/CHORUS/UCXL/logs/mcp-puppeteer-2026-03-03.log",
"hash": "da658db1c8508e25efec34d759a51688a90bc438a120e72b8f365dd866fd972e"
}
],
"hashType": "sha256"
}

View File

@@ -0,0 +1,3 @@
{"level":"info","message":"Starting MCP server","service":"mcp-puppeteer","timestamp":"2026-03-03 17:31:16.039"}
{"level":"info","message":"MCP server started successfully","service":"mcp-puppeteer","timestamp":"2026-03-03 17:31:16.040"}
{"level":"info","message":"Puppeteer MCP Server closing","service":"mcp-puppeteer","timestamp":"2026-03-03 17:31:22.517"}

View File

@@ -1,5 +1,7 @@
// UCXL Core Data Structures // UCXL Core Data Structures
pub mod watcher;
use std::collections::HashMap; use std::collections::HashMap;
use std::fmt; use std::fmt;
use std::str::FromStr; use std::str::FromStr;

45
UCXL/src/watcher.rs Normal file
View File

@@ -0,0 +1,45 @@
use notify::{Config, RecommendedWatcher, RecursiveMode, Watcher};
use std::path::Path;
use std::sync::mpsc::channel;
use crate::{UCXLAddress, TemporalAxis};
use std::str::FromStr;
pub struct UCXLWatcher {
base_path: std::path::PathBuf,
}
impl UCXLWatcher {
pub fn new<P: AsRef<Path>>(path: P) -> Self {
Self {
base_path: path.as_ref().to_path_buf(),
}
}
pub fn watch_loop(&self) -> Result<(), Box<dyn std::error::Error>> {
let (tx, rx) = channel();
let mut watcher = RecommendedWatcher::new(tx, Config::default())?;
watcher.watch(&self.base_path, RecursiveMode::Recursive)?;
println!("UCXL Watcher started on {:?}", self.base_path);
for res in rx {
match res {
Ok(event) => {
for path in event.paths {
if let Some(rel_path) = path.strip_prefix(&self.base_path).ok() {
let rel_str = rel_path.to_string_lossy();
// Attempt a heuristic address mapping: ucxl://system:watcher@local:filesystem/#/path
let addr_str = format!("ucxl://system:watcher@local:filesystem/#/{}", rel_str);
if let Ok(addr) = UCXLAddress::from_str(&addr_str) {
println!("[UCXL EVENT] {:?} -> {}", event.kind, addr);
}
}
}
}
Err(e) => println!("watch error: {:?}", e),
}
}
Ok(())
}
}

13
chrs-sync/Cargo.toml Normal file
View File

@@ -0,0 +1,13 @@
[package]
name = "chrs-sync"
version = "0.1.0"
edition = "2021"
[dependencies]
chrs-mail = { path = "../chrs-mail" }
chrs-graph = { path = "../chrs-graph" }
serde = { version = "1.0", features = ["derive"] }
serde_json = "1.0"
tokio = { version = "1.0", features = ["full"] }
uuid = { version = "1.0", features = ["v4"] }
chrono = "0.4"

62
chrs-sync/src/lib.rs Normal file
View File

@@ -0,0 +1,62 @@
use chrs_mail::{Mailbox, Message};
use chrono::Utc;
use std::process::Command;
use uuid::Uuid;
use std::path::PathBuf;
pub struct SyncManager {
mailbox: Mailbox,
repo_path: PathBuf,
}
impl SyncManager {
pub fn new(mailbox: Mailbox, repo_path: PathBuf) -> Self {
Self { mailbox, repo_path }
}
pub fn broadcast_state(&self, from_peer: &str, to_peer: &str) -> Result<(), Box<dyn std::error::Error>> {
// Get current dolt hash
let output = Command::new("dolt")
.args(&["log", "-n", "1", "--format", "%H"])
.current_dir(&self.repo_path)
.output()?;
let current_hash = String::from_utf8_lossy(&output.stdout).trim().to_string();
let msg = Message {
id: Uuid::new_v4(),
from_peer: from_peer.into(),
to_peer: to_peer.into(),
topic: "sync_signal".into(),
payload: serde_json::json!({ "commit_hash": current_hash }),
sent_at: Utc::now(),
read_at: None,
};
self.mailbox.send(&msg)?;
println!("Broadcasted sync signal: {} from {}", current_hash, from_peer);
Ok(())
}
pub fn handle_sync_signal(&self, msg: &Message) -> Result<(), Box<dyn std::error::Error>> {
if msg.topic != "sync_signal" {
return Ok(());
}
let remote_hash = msg.payload["commit_hash"].as_str().unwrap_or_default();
println!("Received sync signal for hash: {}", remote_hash);
// In a real P2P scenario, we would pull from the remote peer's URL.
// For now, we simulate by attempting a 'dolt pull' if a remote is configured.
let status = Command::new("dolt")
.args(&["pull", "origin"])
.current_dir(&self.repo_path)
.status()?;
if status.success() {
println!("Successfully pulled updates for hash: {}", remote_hash);
}
Ok(())
}
}