diff --git a/chrs-bubble/Cargo.toml b/chrs-bubble/Cargo.toml
new file mode 100644
index 00000000..6a2eec9c
--- /dev/null
+++ b/chrs-bubble/Cargo.toml
@@ -0,0 +1,18 @@
+[package]
+name = "chrs-bubble"
+version = "0.1.0"
+edition = "2021"
+
+[dependencies]
+chrs-graph = { path = "../chrs-graph" }
+ucxl = { path = "../UCXL" }
+serde = { version = "1.0", features = ["derive"] }
+serde_json = "1.0"
+thiserror = "1.0"
+chrono = { version = "0.4", features = ["serde"] }
+uuid = { version = "1.0", features = ["v4", "serde"] }
+petgraph = "0.6"
+
+[dev-dependencies]
+tempfile = "3"
+
diff --git a/chrs-bubble/src/lib.rs b/chrs-bubble/src/lib.rs
new file mode 100644
index 00000000..1fa9fba4
--- /dev/null
+++ b/chrs-bubble/src/lib.rs
@@ -0,0 +1,187 @@
+//! Provenance tracking: an in-memory directed graph of artifact ids whose nodes
+//! and links are also written to a [`DoltGraph`] store.
+
+use chrs_graph::{DoltGraph, GraphError};
+use ucxl::UCXLAddress;
+use serde::{Deserialize, Serialize};
+use thiserror::Error;
+use uuid::Uuid;
+use petgraph::graph::{DiGraph, NodeIndex};
+use std::collections::HashMap;
+
+/// Kind of relationship recorded between two provenance nodes.
+///
+/// The `Debug` rendering of the variant is what `record_link` stores in the
+/// `edge_type` column, so renaming a variant changes the persisted value.
+#[derive(Debug, Serialize, Deserialize, Clone, Copy, PartialEq, Eq)]
+pub enum ProvenanceEdge {
+    DerivedFrom,
+    Cites,
+    InfluencedBy,
+}
+
+/// Errors returned by [`ProvenanceGraph`] operations.
+#[derive(Debug, Error)]
+pub enum BubbleError {
+    /// Wraps a [`GraphError`] returned by the [`DoltGraph`] store.
+    #[error("Graph error: {0}")]
+    Graph(#[from] GraphError),
+    /// Wraps a `serde_json::Error`.
+    #[error("Serde error: {0}")]
+    Serde(#[from] serde_json::Error),
+    /// A link referenced a node id that was never recorded in this graph.
+    #[error("Node not found: {0}")]
+    NodeNotFound(Uuid),
+}
+
+/// In-memory provenance graph that mirrors every recorded node and link into a
+/// [`DoltGraph`] store.
+///
+/// Nothing here rejects cycles: despite the `dag` field name, `record_link` adds
+/// whatever edge it is given.
+pub struct ProvenanceGraph {
+    /// Store that receives every recorded node and link.
+    persistence: DoltGraph,
+    /// Node ids connected by typed provenance edges.
+    dag: DiGraph<Uuid, ProvenanceEdge>,
+    /// Lookup from node id to its index inside `dag`.
+    node_map: HashMap<Uuid, NodeIndex>,
+}
+
+impl ProvenanceGraph {
+    /// Creates an empty graph that persists through `persistence`.
+    ///
+    /// Existing rows in the store are not loaded; the in-memory graph starts empty.
+    pub fn new(persistence: DoltGraph) -> Self {
+        Self {
+            persistence,
+            dag: DiGraph::new(),
+            node_map: HashMap::new(),
+        }
+    }
+
+    /// Records the node `id` with its `address`, persisting and committing it.
+    ///
+    /// Calling this again with an `id` already known to this graph is a no-op.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`BubbleError::Graph`] when the insert or the commit fails. In that
+    /// case the in-memory graph is left untouched, so a later retry goes through
+    /// persistence again instead of being skipped as "already recorded".
+    pub fn record_node(&mut self, id: Uuid, address: &str) -> Result<(), BubbleError> {
+        if self.node_map.contains_key(&id) {
+            return Ok(());
+        }
+
+        // Best effort: the result is ignored on purpose, presumably because the
+        // call fails once the table already exists (TODO confirm against
+        // `DoltGraph::create_table`).
+        self.persistence
+            .create_table("provenance_nodes", "id VARCHAR(255) PRIMARY KEY, address TEXT")
+            .ok();
+
+        let data = serde_json::json!({
+            "id": id.to_string(),
+            "address": address
+        });
+        self.persistence.insert_node("provenance_nodes", data)?;
+        self.persistence
+            .commit(&format!("Record provenance node: {}", id))?;
+
+        // Register in memory only after the store accepted the node.
+        let idx = self.dag.add_node(id);
+        self.node_map.insert(id, idx);
+        Ok(())
+    }
+
+    /// Records a directed `edge` from `source` to `target`, persisting and
+    /// committing it under a freshly generated link id.
+    ///
+    /// # Errors
+    ///
+    /// Returns [`BubbleError::NodeNotFound`] when either endpoint has not been
+    /// recorded in this graph, and [`BubbleError::Graph`] when the insert or the
+    /// commit fails. In both cases no edge is added to the in-memory graph.
+    pub fn record_link(
+        &mut self,
+        source: Uuid,
+        target: Uuid,
+        edge: ProvenanceEdge,
+    ) -> Result<(), BubbleError> {
+        let source_idx = *self
+            .node_map
+            .get(&source)
+            .ok_or(BubbleError::NodeNotFound(source))?;
+        let target_idx = *self
+            .node_map
+            .get(&target)
+            .ok_or(BubbleError::NodeNotFound(target))?;
+
+        // Best effort, same as in `record_node`: the result is ignored on purpose.
+        self.persistence
+            .create_table(
+                "provenance_links",
+                "id VARCHAR(255) PRIMARY KEY, source_id TEXT, target_id TEXT, edge_type TEXT",
+            )
+            .ok();
+
+        let link_id = Uuid::new_v4();
+        let data = serde_json::json!({
+            "id": link_id.to_string(),
+            "source_id": source.to_string(),
+            "target_id": target.to_string(),
+            "edge_type": format!("{:?}", edge)
+        });
+
+        self.persistence.insert_node("provenance_links", data)?;
+        self.persistence
+            .commit(&format!("Record provenance link: {} -> {}", source, target))?;
+
+        // Add the edge in memory only after the store accepted the link.
+        self.dag.add_edge(source_idx, target_idx, edge);
+        Ok(())
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::*;
+    use tempfile::TempDir;
+
+    #[test]
+    fn test_provenance_dag() {
+        let dir = TempDir::new().unwrap();
+        let persistence = DoltGraph::init(dir.path()).expect("dolt init failed");
+        let mut graph = ProvenanceGraph::new(persistence);
+
+        let id1 = Uuid::new_v4();
+        let id2 = Uuid::new_v4();
+
+        graph.record_node(id1, "ucxl://agent:1@proj:task/#/file1.txt").unwrap();
+        graph.record_node(id2, "ucxl://agent:1@proj:task/#/file2.txt").unwrap();
+        // Re-recording a known id must not add a second node.
+        graph.record_node(id1, "ucxl://agent:1@proj:task/#/file1.txt").unwrap();
+        assert_eq!(graph.dag.node_count(), 2);
+
+        graph.record_link(id1, id2, ProvenanceEdge::DerivedFrom).unwrap();
+        assert_eq!(graph.dag.edge_count(), 1);
+    }
+
+    #[test]
+    fn test_link_requires_recorded_nodes() {
+        let dir = TempDir::new().unwrap();
+        let persistence = DoltGraph::init(dir.path()).expect("dolt init failed");
+        let mut graph = ProvenanceGraph::new(persistence);
+
+        let known = Uuid::new_v4();
+        let unknown = Uuid::new_v4();
+        graph.record_node(known, "ucxl://agent:1@proj:task/#/file1.txt").unwrap();
+
+        let err = graph
+            .record_link(known, unknown, ProvenanceEdge::Cites)
+            .unwrap_err();
+        assert!(matches!(err, BubbleError::NodeNotFound(id) if id == unknown));
+        assert_eq!(graph.dag.edge_count(), 0);
+    }
+}