Initial commit - BUBBLE decision tracking system

- Added core BUBBLE architecture with decision envelopes and policy store
- Implemented bundle API with FastAPI skeleton and OpenAPI specification
- Added Go-based storage implementation with SQLite and RocksDB support
- Created integrations for peer sync, vector search, and N8N workflows
- Added comprehensive testing framework and documentation
- Implemented provenance walking and decision checking algorithms

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
Commit 013e0dc3f0 by anthonyrawlins, 2025-08-27 09:35:03 +10:00
50 changed files with 3053 additions and 0 deletions


@@ -0,0 +1,68 @@
# BUBBLE Microservice: Implementation & Integration Report
## 1. Executive Summary
This document details the initial implementation of the **BUBBLE** microservice, a core component of the CHORUS ecosystem. BUBBLE functions as a specialized **Decision Agent**, providing on-demand analysis of decision provenance.
The work accomplished includes:
- **Project Scaffolding:** A complete Go module (`gitea.deepblack.cloud/chorus/bubble`) has been initialized and structured.
- **Blueprint Extraction:** All Python, JSON, SQL, and YAML snippets from the initial design conversation have been extracted and organized into a structured `/src` directory, serving as a clear blueprint for development.
- **Core Data Structures:** Go structs corresponding to the API and data models have been defined in `models/models.go`.
- **Storage Layer:** A flexible storage interface has been defined. A fully functional SQLite implementation has been created for persistence and testing, and a RocksDB implementation has been scaffolded for future high-performance use.
- **API Layer:** A runnable HTTP server using Go's standard library exposes the primary `/decision/bundle` endpoint.
- **Core Logic:** The central `WalkBack` algorithm has been implemented, featuring a breadth-first search (BFS) for graph traversal and a priority queue for scoring and ranking decision records.
- **Testing:** A minimum viable product has been built and tested, confirming the API can serve requests by querying a seeded database.
BUBBLE is now a functional, testable microservice that fulfills its foundational role within the CHORUS architecture.
## 2. Integration with the CHORUS Ecosystem
BUBBLE does not operate in isolation; it is a critical, specialized tool designed to be called by other components, primarily **SLURP**.
### 2.1. Relationship with SLURP
The relationship is that of an orchestrator and a specialized tool:
1. **SLURP is the Orchestrator:** As the "Context Curator," SLURP is the entry point for all new information. When a BZZZ agent submits a Decision Record (DR), SLURP is responsible for the entire enrichment process.
2. **BUBBLE is the Provenance Engine:** SLURP's first step in enrichment is to understand the history behind the new DR. To do this, it makes a synchronous API call to BUBBLE's `/decision/bundle` endpoint.
3. **Data Flow:**
* SLURP sends a `start_id` (the ID of the new DR) and a `role` to BUBBLE.
* BUBBLE performs its high-speed, read-only "walk back" through the historical decision graph stored in its local RocksDB/SQLite database.
* BUBBLE returns a "Decision Dossier" (the `DecisionBundleResponse`), which is a compact, scored, and ranked summary of the most relevant historical DRs.
* SLURP takes this historical context from BUBBLE, combines it with conceptual context from other tools (like a RAG via n8n), and synthesizes the final, enriched context package.
* SLURP then writes this complete package to the DHT using the UCXL protocol.
This architecture correctly places the responsibility of complex, stateful graph traversal on a dedicated service (BUBBLE), allowing SLURP to remain a more stateless orchestrator.
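For illustration, a minimal sketch of this call from SLURP's side (the BUBBLE base URL and timeout are assumptions, not part of this implementation; the request fields mirror the `/decision/bundle` schema described later in this commit):

```python
import requests


def fetch_decision_bundle(start_id: str, role: str,
                          bubble_url: str = "http://localhost:8080") -> dict:
    """Ask BUBBLE for a Decision Dossier for a newly submitted DR.

    `bubble_url` is a placeholder; the real address would come from CHORUS
    service configuration/discovery.
    """
    resp = requests.post(
        f"{bubble_url}/decision/bundle",
        json={"start_id": start_id, "role": role, "max_hops": 3, "top_k": 10},
        timeout=5,
    )
    resp.raise_for_status()
    # DecisionBundleResponse: summary, timeline, constraints_summary, ...
    return resp.json()
```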
### 2.2. Interaction with BZZZ
BUBBLE has no direct interaction with BZZZ agents. BZZZ agents communicate with SLURP, which in turn uses BUBBLE as a backing service. This maintains a clear separation of concerns.
## 3. Role in Decision Record (DR) Management
BUBBLE is the primary tool for **unlocking the value** of the Decision Records that SLURP curates. While SLURP is the librarian that collects and catalogs the books, BUBBLE is the research assistant that reads them, understands their connections, and provides a concise summary on demand.
### 3.1. Answering "Why?"
By traversing the `influences` and `supersedes` edges in the provenance graph, BUBBLE provides the auditable trail required to answer fundamental questions about a project's history:
- "Why was this technical decision made?"
- "What alternatives were considered?"
- "What prior work influenced this new feature?"
### 3.2. Preventing Rework
The `WalkBack` algorithm, combined with future vector similarity scoring, is the mechanism that addresses the "we tried this already" problem. When SLURP receives a new proposal, it can ask BUBBLE to find semantically similar DRs in the past. If BUBBLE returns a previously `rejected` or `superseded` DR with a high score, SLURP can flag the new proposal as a potential repeat of past failures, saving significant development time.
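As a rough illustration, a minimal sketch of how SLURP might flag rework from BUBBLE's output (the threshold is arbitrary, the helper is hypothetical, and the lifecycle-state names beyond `rejected`/`deprecated` are illustrative):

```python
# Lifecycle states suggesting a prior attempt failed or was abandoned
# (illustrative set; only `rejected`/`deprecated` appear in the OpenAPI enum).
REWORK_STATES = {"rejected", "deprecated", "superseded", "experiment:failed"}


def flag_potential_rework(bundle: dict, score_threshold: float = 0.8) -> list[dict]:
    """Return timeline entries that look like repeats of past failed attempts."""
    flagged = []
    for entry in bundle.get("timeline", []):
        state = entry.get("lifecycle_state")
        score = entry.get("score") or 0.0
        if state in REWORK_STATES and score >= score_threshold:
            flagged.append(entry)
    return flagged
```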
### 3.3. Enforcing Constraints and Policies
While BUBBLE does not query the Policy Store directly, its output is a critical input for SLURP's policy enforcement. The Decision Dossier provides the historical context that SLURP needs to determine if a new decision violates established budgetary, technical, or security constraints.
## 4. Next Steps & Future Development
The current implementation provides a solid foundation. The following steps will build upon it to realize the full vision of the blueprint:
1. **Full RocksDB Implementation:** Complete the `RocksDBStore` methods to provide a high-performance alternative to SQLite.
2. **Data Seeding/Syncing:** Create a mechanism to sync or replicate the decision graph from the central DHT into BUBBLE's local stores.
3. **Vector Search Integration:** Integrate a vector search client (e.g., FAISS, Milvus) to implement the `sim_score` part of the `WalkBack` algorithm.
4. **Caching Layer:** Implement a caching mechanism (e.g., in-memory LRU or Redis) for frequently requested bundles to further improve performance.
5. **LLM Integration for Summarization:** While SLURP handles most LLM interaction, BUBBLE could potentially use a local model to generate the `summary` field within the `DecisionBundleResponse` for greater accuracy.


@@ -0,0 +1,980 @@
I'd like to talk about (in terms of my CHORUS / BZZZ software) the ancestry and provenance of decisions. I'm plagued by the idea that searching for relevant context in a tree structure of files, coupled with a virtual overlay of temporal metadata, and a virtual overlay of context, is a three dimensional problem. But the axes are not all equal. Let me explain: I think in one dimension (the actual folders/files), the logical structure is obviously a tree (even if underneath at block level it's linear), but on the context layer the structure is a lot less obvious. With RAG, concepts are a number of conceptual 'hops' away in a graph structure of related entities, but on the decision axis, decisions are an ancestry of decisions from sometimes seemingly unrelated domains. I guess I'm looking for the logical structure that we use in the academic world of cross-referencing and citation, but having that navigable, and most importantly, a really fast lookup. Guide me here. I think you know what I'm looking for, I just lack the language or framework to express it.
Yeah, I think I see exactly where your mental model is going — you're trying to unify three different “organizing axes” that don't naturally share the same geometry:
* **File/Folder Structure (Logical Hierarchy)** → rigid, tree-shaped, deterministic, and finite in branching.
* **Context/Conceptual Graph** → fuzzy, graph-shaped, possibly cyclic, with weighted edges representing relevance or conceptual proximity.
* **Decision/Provenance Lineage** → genealogical, DAG-shaped (directed acyclic graph), often spanning across unrelated domains and contexts.
In CHORUS/BZZZ terms, you're basically describing a **multiaxial, multi-topology knowledge space**, where each axis has its own rules for navigation, storage, and indexing — but you still want a *single, fast, unified lookup* that lets you pivot seamlessly between them.
---
## 1. A Framework for Thinking About This
Here's the language and model I'd use to describe it, which comes partly from **knowledge graph theory**, **provenance modelling** (like W3C PROV), and **information retrieval** research:
| Axis | Shape / Topology | Typical Navigation | Analogy |
| -------------- | ----------------------------- | ------------------ | ------------------------------------- |
| **Structure** | Tree (hierarchical) | Path traversal | File explorer / namespace |
| **Context** | Graph (weighted, fuzzy edges) | Semantic hops | RAG vector search / ontology browsing |
| **Provenance** | DAG (directed acyclic graph) | Lineage tracing | Git history / academic citations |
The *trick* is that a query might need to move **orthogonally** between these axes — e.g.,
`Find the file that contains concept X (context axis) which was influenced by decision Y (provenance axis), and is stored under this folder (structure axis).`
---
## 2. Why “3D” Is the Wrong but Helpful Metaphor
It *feels* like a 3D space because you have three distinct navigational modes, but the axes aren't geometrically equivalent:
* **Tree axis** → deterministic, bounded depth, “hard” pointers.
* **Graph axis** → probabilistic, possibly infinite expansion, “soft” associations.
* **Lineage axis** → directional, causal, may connect distant or conceptually unrelated nodes.
If you imagine each node in the tree having:
1. A set of **semantic vector embeddings** (for context hops),
2. A set of **provenance edges** (for decision ancestry),
…then what you really have is **a primary index (tree)** overlaid with two **orthogonal link layers** that allow arbitrary lateral jumps.
---
## 3. The Academic Parallel
In the academic world, you're basically talking about merging:
* **Citation Graphs** (papers citing other papers → decision ancestry)
* **Topical Ontologies** (subject headings, keyword indexing → context)
* **Library Classification** (Dewey, LOC → folder structure)
The “fast lookup” problem has already been partly solved in that domain with:
* **Inverted indexes** for topic/term search (think Elasticsearch or Lucene)
* **Adjacency lists / compressed sparse graphs** for citation lookups
* **Triple stores / RDF + SPARQL** for complex cross-axis queries
But academic search is slow compared to what you want — they prioritize completeness, not interactive navigation speed.
---
## 4. The Logical Structure You're After
What you need in CHORUS/BZZZ is something like this:
**Base Layer (Tree Index):**
* Acts as a spatial/structural namespace.
* Stored in a **prefix tree (trie)** or **B+ tree** for ultra-fast path lookups.
**Overlay Layer 1 (Context Graph):**
* Each node in the tree has a list of **vector IDs** pointing into a vector store (FAISS, Milvus, Weaviate, Vespa, etc.).
* Context hops = nearest-neighbor search in embedding space.
**Overlay Layer 2 (Decision Lineage DAG):**
* Each node has a **provenance edge list**.
* Stored in **adjacency list form**, optimized with a **bidirectional search index** so you can go “upstream” (ancestors) or “downstream” (descendants) fast.
**Unified Query Engine:**
* Queries run as **multi-stage pipelines**: filter by tree → hop via graph → traverse via DAG (or in reverse).
* Optionally cache common traversals in **materialized path indexes** so “decision ancestry + context” queries become O(1) after first resolution.
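A minimal sketch of what a single node in that layout could look like (field names are illustrative, not a committed schema):

```python
from dataclasses import dataclass, field


@dataclass
class KnowledgeNode:
    # Base layer: position in the tree/namespace (fast path lookups)
    path: str                                               # e.g. "projects/chorus/docs/streaming.md"
    # Overlay 1: pointers into the vector store for context hops
    vector_ids: list[str] = field(default_factory=list)
    # Overlay 2: provenance adjacency lists for lineage walks
    influenced_by: list[str] = field(default_factory=list)  # upstream decision ids
    influences: list[str] = field(default_factory=list)     # downstream decision ids
```

A query pipeline then filters by `path` prefix, expands via `vector_ids`, and walks `influenced_by`/`influences`, in whichever order the query demands.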
---
## 5. Fast Navigation Tricks
* **Multi-key indexing** — e.g., `(path, embedding_cluster_id, decision_id)` as a compound index for ultra-quick cross-axis lookups.
* **Precomputed transitive closure** for provenance edges (so “all ancestors” queries are constant-time).
* **Contextual Bloom filters** per node — to cheaply say “this file definitely doesn't contain this concept” (see the sketch after this list).
* **Hybrid search** — combine vector similarity with metadata filters in one step (e.g., Vespa or Elasticsearch + dense vectors).
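A minimal sketch of the per-node Bloom filter idea, as a toy filter built on hashlib (a real deployment would use a tuned library):

```python
import hashlib


class ConceptBloomFilter:
    """Tiny Bloom filter: answers "definitely absent" or "possibly present"."""

    def __init__(self, size_bits: int = 4096, num_hashes: int = 4):
        self.size = size_bits
        self.num_hashes = num_hashes
        self.bits = bytearray(size_bits // 8)

    def _positions(self, concept: str):
        for i in range(self.num_hashes):
            digest = hashlib.sha256(f"{i}:{concept}".encode()).digest()
            yield int.from_bytes(digest[:8], "big") % self.size

    def add(self, concept: str) -> None:
        for pos in self._positions(concept):
            self.bits[pos // 8] |= 1 << (pos % 8)

    def might_contain(self, concept: str) -> bool:
        return all(self.bits[pos // 8] & (1 << (pos % 8)) for pos in self._positions(concept))
```

`might_contain()` returning False is a guaranteed miss, so a traversal can skip that node without ever touching the vector store.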
---
## 6. Why This Matters for CHORUS/BZZZ
This would give you:
* Academic-style **citation tracking** for decisions (fast).
* RAG-style **semantic proximity** navigation (fast enough for interactive use).
* Traditional **namespace navigation** (instant).
* A **pivoting query language** — you could start in any axis and jump orthogonally.
The language for this in computer science would be:
> “A hierarchical index augmented with semantic and provenance overlays, supporting multi-topology traversal and compound indexing.”
---
If you want, I can **sketch the actual data structure layout** for CHORUS/BZZZ that makes this navigable in <10ms per hop, even at scale: essentially a hybrid of Git's object store, a vector DB, and a citation graph DB. That's where this becomes truly practical.
Do you want me to go ahead and design that?
So to clarify, I currently have a combination of git (and all the functionality that brings), a 1:1 mapping between the filesystem paths and ucxl address space. The context is stored under each ucxl address as encrypted markdown wrapped in a JSON envelope, in our distributed p2p network as a DHT, and we can look that up reasonably fast. So when we get a new decision made or new content created, all that metadata about the who, what, where, when, & why is submitted to the 'Leader' BZZZ node (effectively the Project Manager). It's at this point too that our system events are most informative (blockers, etc). That agent uses a well-crafted prompt to assess all the submitted data, and judiciously either embellishes or prunes the context, tailoring it for the many roles in the organisation. (As part of its toolset it has a RAG lookup, which it can use to find context around terms it might not understand.) It then stores this tailored context under the ucxl address space (DHT) for each of those roles. (If it's unsure, it calls upon human intervention.) So we don't so much need a fast lookup time as a fast processing time for that LLM. This whole ingestion process I'm calling SLURPing. What I'm looking for here is its counterpart (I guess?): the decision justification tool, the provenance of authority, the trace of rationality, that strongly correlates with roles, responsibilities, and project goals and strategies. This is clearly an LLM's strength, not just another DB / data structure, unless this is another MCP tool the SLURP LLM can use. Which leads me to the how: a decision tree walk-back-in-time (more precisely decision space n-hops), thereby displaying an ancestry of decision points, would be informative (but is it sufficient?) How do we pass on to an agent things like "we tried this already", and "we can't pursue this line of reasoning due to budgetary constraints", or "this library is incompatible with our cloud infrastructure". My guess is that some context is global and ... [truncated]
Nice, this is a crisp brief and you're already halfway to the right architecture. I'll be blunt: you shouldn't try to hand this whole job to a single LLM prompt forever. Instead make the LLM *one* powerful orchestration tool in a small suite of well-structured, audited primitives (structured decision records, a provenance graph, global policy/constraint store, and a fast vector/context store). Below is a practical, concrete plan you can implement incrementally.
# What you need (conceptually)
1. **Immutable Decision Records (DRs).** Every decision (or proposal, experiment, rejection) becomes an immutable object stored in your DHT/git-backed UCXL address space.
2. **Provenance Graph.** DRs are nodes in a directed graph (edges = “influenced by”, “derived from”, “rejected in favour of”, “depends on constraint X”, “uses lib Y”, etc.). This is the thing you walk back n-hops.
3. **Context Vectors + Metadata.** Each DR and content node gets:
* embeddings for semantic similarity (RAG)
* structured metadata (who/when/why/alternatives/constraints/cost/confidence)
* role-exposure flags (which roles should see this as global vs. private)
4. **Policy/Constraint Store (global).** Single source of truth for global limits: budgets, infra incompatibilities, regulatory constraints, capability profiles.
5. **Small toolset for SLURP + Decision Agent.** SLURP ingests/normalizes into DRs; Decision Agent uses graph queries + RAG + constraint checks + human-in-loop to produce tailored context bundles.
# Decision Record (DR) — concrete schema (example)
Store this as JSON wrapped and signed/encrypted; keep it immutable and append-only.
[BUBBLE-SNIPPET-01]
# Where SLURP fits
SLURP keeps doing what it does:
* normalise incoming submit (who/what/where/when/why)
* create a DR with structured fields above
* generate an embedding and store `embeddings_id`
* append to UCXL/DHT and link into provenance graph
* produce role-specific bundles by filtering `role_exposure` + applying global policy filters
But add two new SLURP capabilities:
1. **Decision Summariser**: produce a short machine-readable summary plus a human-readable justification that becomes part of the DR. This is the LLM's strength, but done as a step, not the only truth.
2. **Constraint Annotator**: match DR constraints against the Policy Store and tag the DR (e.g., `violates:budget-l1`).
# The Decision Agent (counterpart to SLURP)
A small orchestration service (can be LLM-driven) that:
* receives queries like “Can Agent-X attempt approach Q?”
* fetches: relevant DRs (n-hop ancestry), relevant context via RAG, global constraints, and capability profiles
* runs a **pre-check pipeline**:
1. Provenance walk (bounded) to collect ancestors/descendants
2. RAG similarity search to find “we tried this before” evidence
3. Constraint check (policy store)
4. Capability check (role/team/infra profiles)
* produces a **decision packet**: yes/no/conditional + provenance trace + suggested next steps + who-to-ask (human)
* if unsure, escalates to human review, and records that escalation as a DR
# Provenance + RAG query strategy (practical)
You'll often want to combine graph distance and semantic similarity. Use a scoring function:
score(node) = α * provenance_relevance(node) + β * semantic_similarity(query_embedding, node_embedding) + γ * recency_factor
* provenance_relevance: inverse of the number of hops (or use weighted PageRank)
* semantic_similarity: cosine similarity from the vector DB
* recency_factor: decays older nodes unless they are highly cited in the graph
Algorithm for `walk_back(n, query, role)`:
1. Find anchor nodes (explicit doc or concept vector hits).
2. BFS from anchors over provenance edges up to depth n, collecting nodes.
3. For each node compute `score`.
4. Return top-K nodes filtered by `role_exposure` and policy checks.
Cache results for repeated queries.
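A minimal sketch of that scorer (the weights and recency half-life are assumptions to be tuned; `hops`, `similarity`, and `age_days` would be supplied by the walk itself):

```python
import math


def score_node(hops: int, similarity: float, age_days: float,
               alpha: float = 0.5, beta: float = 0.4, gamma: float = 0.1,
               half_life_days: float = 180.0) -> float:
    """Combine provenance distance, semantic similarity, and recency into one score."""
    provenance_relevance = 1.0 / (1 + hops)  # closer ancestors weigh more
    recency_factor = math.exp(-age_days * math.log(2) / half_life_days)
    return alpha * provenance_relevance + beta * similarity + gamma * recency_factor
```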
# Passing "we tried this already" and constraints
Make attempts explicit DRs with lifecycle states:
* `experiment:started`, `experiment:failed`, `experiment:succeeded`, `experiment:abandoned`
Tag experiments with outcome and reason. An agent checks for any `experiment` DRs matching the same `tags` or high semantic similarity before starting new work. That's how “we tried this already” becomes a deterministic check, not a fuzzy “maybe”.
For constraints:
* Put them in a **Policy Store** (UCXL path) with machine-friendly predicates:
* `policy:budget-l1 -> {"max_spend":10000,"applies_to":["project:foo"]}`
* `policy:cloud-compat -> {"disallowed":["gcp"], "allowed":["aws","azure"]}`
Decision Agent always runs constraint checks automatically and annotates the DR with `violations` field.
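A minimal sketch of that check against the two example policies above (the DR field names `estimated_cost`, `project`, and `target_platforms` are assumptions for illustration, not part of the schema):

```python
def check_constraints(dr: dict, policies: dict) -> list[str]:
    """Return the policy ids a DR violates, to be written into its `violations` field."""
    violations = []

    budget = policies.get("policy:budget-l1")
    if budget and dr.get("project") in budget.get("applies_to", []) \
            and dr.get("estimated_cost", 0) > budget["max_spend"]:
        violations.append("policy:budget-l1")

    cloud = policies.get("policy:cloud-compat")
    if cloud and any(p in cloud["disallowed"] for p in dr.get("target_platforms", [])):
        violations.append("policy:cloud-compat")

    return violations
```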
# Role-specific context hygiene
You're right: some context is global and some is local. Two patterns:
1. **Global flags + redaction rules.**
* DR fields can be marked `global`, `project-only`, or `private-to-role`. SLURP masks or omits private fields when producing bundles.
2. **Concise role bundles.**
* The Leader produces condensed bundles: `role_bundle = {summary, relevant_DR_refs, top_evidence, policies_applied}`. That's what gets stored for that role's UCXL path.
# UI / UX for humans & agents
* **Decision Walk UI**: timeline slider + graph view + “dead-branch collapse”. Allow toggles: show all ancestors, only experiments, only constraints, only external influences.
* **Signal lights**: green = no policy violations; amber = conditional; red = violates global policy.
* **“Tried before?” quick-check** button that runs semantic + provenance test and shows the most relevant prior experiment DR(s).
# Fine-tune vs prompt engineering (be practical)
* **Start with robust prompt engineering + toolchain**: SLURP + Decision Agent + structured DRs + RAG will already drastically reduce hallucination and keep traceability.
* **Collect good labeled data** (DRs + human decisions + outcomes). After you have thousands of DRs and outcomes, fine-tune or train a model specifically to:
* Summarize decisions into DR fields
* Predict likely outcomes given prior DRs
* Fine-tuning is expensive and useful later; first get the structured pipeline and provenance graph right.
# Performance & storage notes (practical)
* Keep DRs small and reference external evidence via UCXL addresses (don't inline huge blobs).
* Index:
* adjacency lists for provenance graph (fast traversal)
* vector index (FAISS / Milvus) for embeddings
* materialized ancestor sets for frequently-read nodes (to instant-query “all ancestors”).
* Cache role bundles and common walks, invalidate on new DR append.
# Attack surface & signing
* DRs should be signed by actor keys; leader validates signatures before incorporating decisions.
* Maintain role-based encryption so private or sensitive DRs are only visible to authorized roles.
# Minimal roadmap (first 3 increments)
1. **DR schema + append-only storage**: implement immutable DR format; wire SLURP to create them.
2. **Provenance graph**: start small (Neo4j / DHT adjacency lists) and implement a bounded walk API.
3. **Decision Agent orchestration**: implement the pre-check pipeline (provenance walk + RAG + policy check) and add the “tried before” check into your CI for agents. Add human escalation logging.
After those three, build the Decision Walk UI and start collecting data for future fine-tuning.
# Final take
You're right to treat this as something the LLM helps with, not replaces. The LLM's job should be:
* normalize incoming natural language into structured DR fields,
* summarize and justify decisions in human language (for the record),
* propose candidate ancestors and alternatives (and surface evidence).
But the **source of truth** must be structured DRs + provenance graph + policy store. That combination lets you (1) prove *why* something was done, (2) prevent rework by answering “did we try this?”, and (3) safely tailor context to roles.
If you want, I'll:
* draft a production-ready DR JSON schema (with signed envelope and encryption examples),
* sketch the API endpoints for SLURP + Decision Agent (walk, check, bundle),
* or write the pseudocode for the `walk_back(n, query)` scorer and caching strategy.
Which of those do you want next?
Got it. Here's a **production-ready Decision Record (DR) schema** that fits your CHORUS/BZZZ/UCXL ecosystem, assuming:
* Immutable, append-only storage
* JSON wrapped in an **encrypted + signed envelope**
* Links cleanly into your UCXL path space
* Supports role-specific exposure
* Explicit provenance edges + policy constraints
---
## **1. Encrypted + Signed Envelope**
Every DR sits inside an envelope with three top-level sections:
[BUBBLE-SNIPPET-01]
* **`payload`** is the raw DR (below), encrypted at rest in your DHT.
* **`signatures`** verify integrity & identity. Multiple allowed (co-signed decisions).
* **`encryption.recipients`** = keys of roles or individuals allowed to read.
---
## **2. Decision Record (DR) Payload**
This is the actual schema SLURP will generate and the Decision Agent will consume.
[BUBBLE-SNIPPET-02]
---
### **Key Design Choices**
1. **`id`** is a content hash (e.g., SHA-256 of the canonicalised payload), so immutability is guaranteed (see the sketch after this list).
2. **`ucxl_path`** is the anchor for linking into your filesystem/git space.
3. **`lifecycle_state`** lets you quickly filter out superseded or rejected DRs without deleting them.
4. **`alternatives`** explicitly track rejected or less-preferred options, critical for “we tried this” lookups.
5. **`constraints`** tie directly into your **Policy Store**.
6. **`provenance`** is a lightweight adjacency list, enabling O(1) fetch of neighbours in the graph.
7. **`role_exposure`** = filtering hint for SLURP when bundling context for specific roles.
8. **`embeddings_id`** links to the vector DB for semantic search.
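A minimal sketch of how the content-hash `id` could be derived (canonicalisation here is simply sorted-key JSON; the real envelope/signing flow is defined in the snippets above):

```python
import hashlib
import json


def dr_content_id(payload: dict) -> str:
    """Derive an immutable DR id from the canonicalised payload."""
    canonical = json.dumps(payload, sort_keys=True, separators=(",", ":"))
    return "dr:sha256:" + hashlib.sha256(canonical.encode("utf-8")).hexdigest()
```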
---
## **3. Constraints & Policy Store Example**
You'd keep these in their own UCXL namespace:
[BUBBLE-SNIPPET-03]
---
## **4. Encryption Flow**
* **On creation**:
* SLURP creates payload (DR)
* Serialises → hashes → signs → encrypts for allowed recipients → stores in DHT
* **On retrieval**:
* Decrypt with recipient key → verify signatures → hand DR to agent/human
---
## **5. Why This Works**
* **Auditable**: Immutable + signed = no silent changes to history.
* **Navigable**: Provenance graph is adjacency lists in each DR → fast n-hop walks.
* **LLM-Friendly**: Structured fields mean you can feed only relevant parts into prompt windows.
* **Role-Aware**: Exposure flags + encryption let you control information flow at storage time, not just query time.
---
If you like, I can now **extend this with the pseudocode & data layout** for the provenance graph so your Decision Agent can do the n-hop ancestry walk efficiently; that's what will make the decision-space walk-back instant.
Do you want me to do that next?
Alright, let's wire this DR schema into a **provenance graph** you can walk in milliseconds, even at thousands or millions of DRs.
---
## **1. Core Idea**
* Each **Decision Record (DR)** in the DHT has **provenance edges** (`influenced_by`, `derived_from`, `supersedes`).
* We **materialize** an adjacency index (key-value store) so you don't need to decrypt and parse every DR just to walk the graph.
* This index is **signed & cached locally** on each node so graph walks are fast without hitting the network.
---
## **2. Data Layout**
Two main indexes:
### **A. Forward Index (Downstream)**
[BUBBLE-SNIPPET-04]
### **B. Reverse Index (Upstream)**
[BUBBLE-SNIPPET-05]
* **Forward index** answers: “What came after this DR?”
* **Reverse index** answers: “What led to this DR?”
Both can be **stored as compact adjacency lists in LevelDB / RocksDB / SQLite**.
---
## **3. Provenance Walk Algorithm**
Here's pseudocode for **bounded n-hop ancestry**:
[BUBBLE-SNIPPET-06]
---
## **4. Metadata Cache**
`get_dr_metadata(dr_id)` is **not** a full decrypt; it is just a lightweight cache entry like:
[BUBBLE-SNIPPET-07]
This cache is generated **when SLURP first ingests** or updates a DR.
That way walks avoid decrypting everything and hitting the full DHT.
---
## **5. Fast “We Tried This Already” Check**
You can combine provenance walk + semantic similarity:
[BUBBLE-SNIPPET-08]
---
## **6. Storage & Sync Strategy**
* Keep adjacency lists **locally indexed** per BZZZ node.
* Sync them as small delta updates over p2p (no need to transfer whole DRs).
* Store `forward_index` and `reverse_index` **as separate keyspaces** in a KV store.
* When a new DR is ingested, SLURP updates:
* forward edges from each provenance target
* reverse edges pointing back to each provenance source
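A minimal sketch of that update step, using the `fwd:`/`rev:` key-prefix convention from the snippets (the `kv` object stands in for a RocksDB/LevelDB handle with `get`/`put`):

```python
import json


def update_provenance_indexes(kv, new_dr_id: str, provenance_sources: list[str]) -> None:
    """On ingest, link a new DR into both forward and reverse adjacency lists."""
    for source_id in provenance_sources:
        # forward: source -> new DR ("what came after this DR?")
        fwd_key = f"fwd:{source_id}".encode()
        fwd = json.loads(kv.get(fwd_key) or b"[]")
        if new_dr_id not in fwd:
            fwd.append(new_dr_id)
            kv.put(fwd_key, json.dumps(fwd).encode())

    # reverse: new DR -> its ancestors ("what led to this DR?")
    kv.put(f"rev:{new_dr_id}".encode(), json.dumps(provenance_sources).encode())
```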
---
## **7. Graph Walk + RAG for Decision Agent**
The Decision Agent query pipeline becomes:
1. **Find entry point**
* If starting from a concept → vector search → anchor DR(s)
* If starting from a DR → use its ID directly
2. **Upstream Walk**
* Reverse index BFS up to `n` hops
* Role-based filter, state filter
3. **Semantic Expansion**
* Take ancestor set, vector search for semantically similar DRs (within constraints)
4. **Constraint Check**
* For each DR, check `constraints` field vs. Policy Store
5. **Assemble Provenance Bundle**
* For LLM: timeline order + statements + constraints + key evidence refs
---
## **8. Why This Will Be Fast**
* Graph walk = **O(n)** in number of edges visited, no DHT round trips
* Metadata cache avoids decryption until absolutely needed
* Role/state filters applied at walk time keep set small
* DHT is only hit at **bundle assembly** stage, not during raw walk
---
If you want, I can now **mock a Decision Agent's “bundle” output** so you can see exactly what the LLM would get after a walk-back; that'll make it clear how the agent uses the provenance for “don't repeat past mistakes” reasoning.
Do you want me to do that next?
Let's sketch the API endpoints for SLURP + Decision Agent (walk, check, bundle), and write the pseudocode for the walk_back(n, query) scorer and caching strategy.
Got it. Let's wire the API shape first, then drop into pseudocode for `walk_back()` and its cache layer.
---
## **1. API Endpoints Overview**
I'm assuming this is an **internal agent-to-agent** API over HTTP or a message bus, not user-facing.
### **SLURP (Ingestion)**
| Method | Endpoint | Description |
| ------ | ------------------------ | ------------------------------------------------------------------------------------- |
| `POST` | `/slurp/ingest` | Accepts raw DR payload, runs LLM contextualization, updates DHT & provenance indexes. |
| `GET` | `/slurp/metadata/:dr_id` | Returns metadata cache for DR. |
| `GET` | `/slurp/context/:dr_id` | Returns full decrypted DR (if authorized). |
| `POST` | `/slurp/index/update` | Batch index updates (forward + reverse) from peer nodes. |
---
### **Decision Agent**
| Method | Endpoint | Description |
| ------ | ------------------ | ------------------------------------------------------------------------------------------ |
| `POST` | `/decision/walk` | Upstream or downstream walk in decision space (bounded n-hops). |
| `POST` | `/decision/check` | Checks if concept/decision was tried before or blocked (vector search + provenance check). |
| `POST` | `/decision/bundle` | Returns full provenance bundle (timeline + DR details + constraints). |
---
## **2. Endpoint Shapes**
### `/decision/walk`
[BUBBLE-SNIPPET-09]
---
### `/decision/check`
[BUBBLE-SNIPPET-10]
---
### `/decision/bundle`
[BUBBLE-SNIPPET-11]
---
## **3. `walk_back()` Pseudocode with Scoring**
[BUBBLE-SNIPPET-12]
---
## **4. Caching Strategy**
**Metadata Cache Layout (local KV store)**
Key: `dr:sha256:...`
Value:
[BUBBLE-SNIPPET-13]
**Rules:**
* Cache populated at ingestion time (`/slurp/ingest`) and on first remote fetch.
* TTL: long-lived (e.g., 30 days) unless superseded DR appears.
* Local indexes (`forward_index`, `reverse_index`) reference cache keys.
* LRU eviction if memory pressure occurs, but persistent KV keeps full set.
**Sync:**
* When new DR appears, peer sends:
[BUBBLE-SNIPPET-14]
Peers update both index & cache atomically.
---
If you want, I can now **sketch the minimal persistent store schema** for `metadata_cache`, `forward_index`, and `reverse_index` so you can implement this in RocksDB or SQLite without guesswork. That's the next logical step if you want this ready for code.
If you want, I can sketch the hybrid layout where RocksDB is your live store and SQLite is a periodically-updated reporting copy for humans and slower agents. That fits SLURP + decision walk patterns perfectly.
Alright, hybrid it is.
We'll make **RocksDB** the *fast, always-hot* operational store for agents, and **SQLite** the *cold-ish, human-friendly* analytics layer.
---
## **Hybrid Architecture**
### **1. RocksDB: Live KV Store**
**Purpose:**
* Instant lookups during SLURP ingestion & decision walks
* Fast writes when new decisions/edges come in from DHT
**Buckets / Column Families** *(logical separation in RocksDB)*:
1. **`meta`**: DR metadata by ID
   * Key: `dr:<hash>`
   * Value: JSON or protobuf blob of metadata (statement, role exposure, state, tags, timestamp).
2. **`fwd`**: Forward index (decision → influenced decisions)
   * Key: `fwd:<hash>`
   * Value: List of DR IDs.
3. **`rev`**: Reverse index (decision → ancestor decisions)
   * Key: `rev:<hash>`
   * Value: List of DR IDs.
4. **`constraints`**: Global or scoped constraints for quick rule injection.
   * Key: `constraint:<role>:<hash>`
* Value: Constraint description blob.
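A minimal sketch of those buckets using the python-rocksdb bindings, with key prefixes standing in for column families (the FastAPI skeleton later in this commit uses the same `meta:`/`fwd:`/`rev:` convention; the database path is an assumption):

```python
import json

import rocksdb  # python-rocksdb bindings, as in the FastAPI skeleton below

db = rocksdb.DB("bubble_live.db", rocksdb.Options(create_if_missing=True))


def put_metadata(dr_id: str, metadata: dict) -> None:
    """Write a DR's lightweight metadata blob into the `meta` bucket."""
    db.put(f"meta:{dr_id}".encode(), json.dumps(metadata).encode())


def get_forward_edges(dr_id: str) -> list:
    """Read the `fwd` bucket: which decisions this DR influenced."""
    raw = db.get(f"fwd:{dr_id}".encode())
    return json.loads(raw) if raw else []
```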
---
### **2. SQLite: Reporting & Analytics Store**
**Purpose:**
* Complex joins, filtering, and historical views for humans & slow LLM analytics.
* Ingests *batches* from RocksDB every few minutes or on demand.
**Schema:**
[BUBBLE-SNIPPET-15]
---
### **3. Data Flow**
**Ingestion (SLURP):**
1. LLM processes incoming DR payload.
2. Writes **metadata**, **forward index**, **reverse index** to RocksDB.
3. Appends to a **write-ahead log** (also in RocksDB or flat file).
**Sync to SQLite:**
* Background job reads WAL or change markers.
* Batch INSERT/UPDATE into SQLite tables.
* Allows filtering, joins, analytics queries.
**Querying:**
* **Agent** calls RocksDB directly for hot lookups (walk_back, check, bundle).
* **Human/LLM analytics** uses SQLite for deep queries.
---
### **4. Pseudocode for Write + Sync**
[BUBBLE-SNIPPET-16]
---
### **5. Benefits**
RocksDB → **fast ingestion** + **fast agent lookups**
SQLite → **rich historical queries** + **human-readable reporting**
WAL → guarantees that no DR is dropped between the RocksDB hot store and the SQLite cold store
Batch sync → avoids hammering SQLite with every single update
---
If you want, I can next **integrate the `walk_back()` scorer with this hybrid store** so that:
* It pulls metadata & indexes from RocksDB (fast)
* Falls back to SQLite if key is missing in RocksDB (cold fetch)
That gives you both speed *and* completeness.
Do you want me to do that next?
Alright, here's how we bolt `walk_back()` onto the hybrid RocksDB + SQLite design so it can:
1. **Pull from RocksDB first** (fast, low-latency).
2. **Fallback to SQLite** (for older decisions that may be cold or pruned from RocksDB cache).
3. **Score** results based on relevance to the `query` (semantic + structural weight).
4. **Cache** results so repeated walks don't burn cycles.
---
## **walk_back(n, query)** — Hybrid Version
### **Algorithm**
1. **Start at a DR ID** (passed in or resolved from query).
2. Traverse the **reverse index** in RocksDB → collect ancestors.
3. For each ancestor, fetch **metadata** from RocksDB.
4. If missing, fetch from SQLite.
5. Score each ancestor based on:
* **Graph distance** (n-hops)
* **Semantic similarity** to query (vector store / embedding lookup)
* **Constraint impact** (penalize or boost depending on role scope / blocking status)
6. Cache results in RocksDB for future queries.
---
### **Pseudocode**
[BUBBLE-SNIPPET-17]
---
### **Caching Strategy**
* **Cache Key**: `walkcache:{start_id}:{n}:{hash(query)}`
* **Eviction Policy**:
* Keep LRU of last `X` queries in RocksDB.
* Periodic cleanup job to drop caches older than `Y` days.
* **Cache Scope**:
* RocksDB only (agents hit hot cache).
* SQLite is never queried for cache; it's the cold path only.
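A minimal sketch of that cache path (the key format follows the `walkcache:` convention above; a stable content hash replaces Python's per-process `hash()`):

```python
import hashlib
import json


def walk_cache_key(start_id: str, n: int, query: str) -> str:
    query_hash = hashlib.sha256(query.encode("utf-8")).hexdigest()[:16]
    return f"walkcache:{start_id}:{n}:{query_hash}"


def cached_walk_back(kv, start_id: str, n: int, query: str, walk_fn) -> list:
    """Serve a walk from the hot KV store if cached, otherwise compute and store it."""
    key = walk_cache_key(start_id, n, query).encode()
    hit = kv.get(key)
    if hit:
        return json.loads(hit)
    result = walk_fn(start_id, n, query)  # the hybrid walk_back described above
    kv.put(key, json.dumps(result).encode())
    return result
```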
---
### **Benefits of this setup**
* **Hot path**: All RocksDB (fast).
* **Cold path**: Falls back to SQLite (complete).
* **Semantic + structural scoring**: Mixes RAG-like similarity with graph depth.
* **Constraint-aware**: The scoring penalizes or boosts based on role/goals.
---
If you like, I can **extend this with a "bundle" endpoint** so that after `walk_back()`, the agent can request a **compact decision dossier** with only the most relevant ancestors + constraints, ready for injection into another LLM context window.
That's basically the “decision ancestry briefing pack” you were hinting at earlier.
Hell yeah, let's build the **Decision Dossier**: a compact, role-filtered, provenance-backed bundle agents (or humans) can drop straight into an LLM prompt or a UI. Below I'll show:
1. the `/decision/bundle` API shape
2. the bundle JSON schema (what the dossier contains)
3. production-ready pseudocode to assemble the bundle (including walk_back reuse, policy checks, redaction, caching)
4. operational notes (size limits, caching, security, escalation)
I'll be blunt: this is where most systems either bloat the LLM context with garbage or leak private details. Keep the bundle tight, signed, and role-aware.
# /decision/bundle API
**POST** `/decision/bundle`
Request body:
[BUBBLE-SNIPPET-18]
Response: 200
[BUBBLE-SNIPPET-19]
# Bundle JSON schema (concise)
`timeline` element (one item):
[BUBBLE-SNIPPET-20]
# Pseudocode — assemble_bundle()
[BUBBLE-SNIPPET-21]
Notes on helper functions:
* `walk_back(...)` is the hybrid rocks/sql walk we already defined (returns scored metas).
* `redact_field(value, role)` strips or masks private fields based on `role_exposure` and encryption flags. Do not rely solely on redaction for sensitive info; prefer per-DR encryption.
* `llm_call()` is an LLM microservice; keep prompts minimal and structured. The LLM proposes actions; treat these as suggestions and create a DR for any action taken.
* `is_authorized(role, dr_id)` must check role ACLs and decryption rights. Always audit full_payload accesses.
# Operational & Safety Details
* **Size caps:** limit bundle to e.g., 40KB of textual content or 25 DR summaries; for larger needs provide `full_payload_refs` only.
* **Cache TTL:** 12-72 hours depending on project velocity. Invalidate when a new DR is appended that links into the bundle.
* **Signing & Audit:** every assembled bundle is content-hashed and signed by the assembler (Leader) and stored in the DHT for later audit.
* **Rate-limits & quotas:** limit how often an agent may request `include_full_dr=true` to prevent exfiltration.
* **Human escalation:** any bundle that triggers policy violations or has low-confidence LLM suggestions should set `escalation.required = true` and create an `escalation` DR automatically.
* **Redaction-first policy:** prefer not to include secrets or attachments in bundles; reference UCXL paths. Only allow full DRs when strictly authorized.
# Example minimal bundle (realistic)
[BUBBLE-SNIPPET-22]
# Final notes — practical priorities
* Start small: implement the bundle assembler as a deterministic pipeline that mainly does walk_back + policy checks + evidence aggregation.
* Add the LLM “suggested actions” afterwards; treat the LLM as non-authoritative until you have human-labeled outcomes and can fine-tune.
* Audit everything. If you want to fine-tune a model later, use these bundles + outcomes as labeled training data.
Want me to (pick one):
* produce the exact request/response JSON schema as an OpenAPI snippet?
* write the Node/Python server skeleton for `/decision/bundle` that ties into your RocksDB + SQLite stores?
* produce a compact LLM prompt template for the `action_suggester` step that minimizes hallucination?
Which one next?
write the skeleton first
Got it. Here's a clean, modular Python FastAPI skeleton for your `/decision/bundle` endpoint, wired into the hybrid RocksDB + SQLite stack and using the earlier pseudocode.
This skeleton covers:
* Request validation
* Hybrid metadata fetch with fallback
* Provenance walk reuse
* Bundle assembly stub (no LLM call included yet)
* Cache read/write in RocksDB
* Basic error handling
[BUBBLE-SNIPPET-23]
---
### What to do next
* Replace `embed()` and `cosine_similarity()` with your actual embedding/vector similarity code.
* Add **authorization** and **full payload fetch + decryption** where marked.
* Plug in your real vector search for query-based anchor resolution.
* Integrate your LLM call for `suggested_actions` and **goal alignment** scoring.
* Harden security (rate limits, audit logs, encryption at rest).
* Add logging & metrics.
---
If you want, I can help with:
* Vector search integration code sample
* Full payload decryption stub
* LLM prompt templates & invocation wrapper
* Unit tests for the bundle endpoint
What next?
I want you to do all 4 in sequence and then get back to producing a compact LLM prompt template for the action_suggester step that minimizes hallucination.
Got it. I'll deliver these in sequence, focused, practical, and production-ready:
---
# 1. Vector Search Integration Sample
Let's assume you use a vector DB (like FAISS or Pinecone) or an embedding library (e.g., OpenAI embeddings or SentenceTransformers). This example uses a local FAISS index with embeddings from SentenceTransformers.
[BUBBLE-SNIPPET-24]
---
# 2. Full Payload Decryption Stub
Add a placeholder for full DR fetch + decrypt from DHT or encrypted store:
[BUBBLE-SNIPPET-25]
---
# 3. LLM Prompt Templates & Invocation Wrapper
Example prompt template for **action_suggester** that minimizes hallucination:
[BUBBLE-SNIPPET-26]
---
# 4. Unit Tests for `/decision/bundle`
Using pytest and httpx for FastAPI testing:
[BUBBLE-SNIPPET-27]
---
I'll follow up with the **compact hallucination-minimized LLM prompt template** for action_suggester next. Let me know if you want me to iterate on any of these steps or add deeper integration examples.
Here's a tightly focused, hallucination-resistant LLM prompt template for your **action_suggester** step. It's designed to:
* Be explicit about the format and expectations
* Provide just enough context, no fluff
* Force JSON-only output with strict structure
* Use role and constraints to narrow scope
* Include examples for clarity
---
[BUBBLE-SNIPPET-28]
---
### How to fill in the variables:
* `{role}`: the role of the requesting agent/user (e.g., "engineer", "pm", "legal")
* `{start_statement}`: the main decision or issue statement initiating this bundle
* `{timeline}`: a numbered, bullet, or newline-separated list of **short** decision summaries, max 5 items (e.g.:
```
1. Rejected Library Z due to license conflicts
2. Budget capped at $10K for new library experiments
3. Previous tests show incompatibility with cloud infra
```
)
* `{constraints}` — list of active constraints/policies relevant to the decision
---
### Why this works:
* The strict JSON output and role+constraints focus *strongly reduce hallucination*
* Explicit instructions on confidence prevent the model from guessing wildly
* The example JSON sets a clear output pattern the LLM can mimic
* Restricting action types ensures semantic consistency and downstream parsing ease
---
If you want, I can help you build a **lightweight parser/validator** for the output JSON to catch common format errors and make sure your system safely ingests the recommendations.
Want me to do that next?
Absolutely — I'll give you a pair of clean, reusable Python functions that you can call from anywhere (like a microservice, CLI tool, or n8n custom node) to:
1. **Send** the action suggestion request to your n8n webhook, with exactly the parameters as per the prompt template.
2. **Receive** and validate the JSON response from the webhook, including basic sanity checks to ensure it matches the expected schema.
This will let you easily iterate and tune your prompt in n8n without breaking your app integration.
---
### 1. Send to n8n webhook function
[BUBBLE-SNIPPET-32]
---
### 2. Example receive handler for n8n webhook (Python Flask style)
If you want a sample minimal HTTP handler that n8n can call internally or externally (or simulate) — here's how you might do it on your server side:
[BUBBLE-SNIPPET-33]
---
### How to wire this into n8n
* Create an **HTTP Request** node in your workflow to **call your Python microservice's `/n8n-action-suggester` endpoint** or any LLM API endpoint.
* The input JSON should be the payload structure defined above.
* The output from that node will be a parsed JSON list of actions, which you can pass downstream for display, notification, or recording.
---
If you want, I can also draft a **ready-to-import n8n workflow JSON snippet** for this, including an HTTP Request node plus example trigger.
Want that next?
Here's a minimal n8n workflow JSON snippet you can import directly, illustrating:
* A **Webhook Trigger** node to start the workflow
* An **HTTP Request** node calling your external Python microservice (or any LLM endpoint) for action suggestions
* A **Set** node that outputs the response for easy testing
This snippet assumes your external action suggester service is reachable at `http://localhost:5000/n8n-action-suggester`. Adjust the URL as needed.
[BUBBLE-SNIPPET-34]
---
### How to use:
1. Import this JSON in n8n (top-right menu → Import from file / paste JSON).
2. Replace the webhook node's `"webhookId"` with your generated webhook ID or leave it to auto-generate.
3. Update the URL in the **Call Action Suggester** node to your real Python microservice or LLM API URL.
4. Trigger the webhook (e.g., `POST /webhook/action-suggester`) with JSON payload like:
```json
{
"role": "engineer",
"start_statement": "We need to decide on the streaming library.",
"timeline": [
"Rejected Library Z due to license conflicts",
"Budget capped at $10K for new library experiments",
"Previous tests show incompatibility with cloud infra"
],
"constraints": [
"Budget under $10K",
"AWS-only infrastructure"
]
}
```
5. The response will be captured and output as `suggested_actions` in the **Set Output** node.
---
If you want, I can help extend this with:
* Validation nodes to verify webhook input
* Retry or error handling for the HTTP Request node
* Formatting outputs for Slack/Email notifications or database storage
Just say the word.
Let's go back to producing the exact request/response JSON schema as an OpenAPI snippet for the decision bundle.
Here's a concise, practical OpenAPI 3.0 snippet defining the `/decision/bundle` endpoint's request and response JSON schemas, based on everything we discussed. It's ready to plug into your OpenAPI spec or Swagger UI.
[BUBBLE-SNIPPET-35]
---
### Notes:
* The `oneOf` on request requires either `start_id` or `query`.
* Response bundles keep sensitive info redacted by default; full payload references allow lazy fetch if authorized.
* `timeline` is an ordered list of decision records closest to the anchor, with scores.
* `goal_alignment` and `suggested_actions` support future LLM integration outputs.
* `escalation` flags when human attention is required.
---
If you want, I can help generate a **full OpenAPI YAML file** around this snippet or create a **Postman collection** for testing. Just say.

src/MANIFEST.md

@@ -0,0 +1,41 @@
# BUBBLE Project Snippet Manifest
This file maps the reference keys in `BUBBLE-PROJECT-CONVERSATION.md` to the extracted source files.
| Reference Key | Source File |
|---|---|
| `BUBBLE-SNIPPET-01` | `src/01-dr-envelope.jsonc` |
| `BUBBLE-SNIPPET-02` | `src/02-dr-payload.jsonc` |
| `BUBBLE-SNIPPET-03` | `src/03-policy-store-example.jsonc` |
| `BUBBLE-SNIPPET-04` | `src/04-forward-index.jsonc` |
| `BUBBLE-SNIPPET-05` | `src/05-reverse-index.jsonc` |
| `BUBBLE-SNIPPET-06` | `src/06-provenance-walk.py` |
| `BUBBLE-SNIPPET-07` | `src/07-metadata-cache.jsonc` |
| `BUBBLE-SNIPPET-08` | `src/08-fast-check.py` |
| `BUBBLE-SNIPPET-09` | `src/09-decision-walk-request.jsonc` |
| `BUBBLE-SNIPPET-10` | `src/10-decision-walk-response.jsonc` |
| `BUBBLE-SNIPPET-11` | `src/11-decision-check-request.jsonc` |
| `BUBBLE-SNIPPET-12` | `src/12-decision-check-response.jsonc` |
| `BUBBLE-SNIPPET-13` | `src/13-decision-bundle-request.jsonc` |
| `BUBBLE-SNIPPET-14` | `src/14-decision-bundle-response.jsonc` |
| `BUBBLE-SNIPPET-15` | `src/15-walk-back-pseudocode.py` |
| `BUBBLE-SNIPPET-16` | `src/16-metadata-cache-layout.jsonc` |
| `BUBBLE-SNIPPET-17` | `src/17-peer-sync.json` |
| `BUBBLE-SNIPPET-18` | `src/18-rocksdb-sqlite-schema.sql` |
| `BUBBLE-SNIPPET-19` | `src/19-write-sync-pseudocode.py` |
| `BUBBLE-SNIPPET-20` | `src/20-walk-back-hybrid.py` |
| `BUBBLE-SNIPPET-21` | `src/21-bundle-api-request.json` |
| `BUBBLE-SNIPPET-22` | `src/22-bundle-api-response.json` |
| `BUBBLE-SNIPPET-23` | `src/23-bundle-timeline-element.json` |
| `BUBBLE-SNIPPET-24` | `src/24-assemble-bundle-pseudocode.py` |
| `BUBBLE-SNIPPET-25` | `src/25-minimal-bundle-example.json` |
| `BUBBLE-SNIPPET-26` | `src/26-fastapi-skeleton.py` |
| `BUBBLE-SNIPPET-27` | `src/27-vector-search-integration.py` |
| `BUBBLE-SNIPPET-28` | `src/28-decryption-stub.py` |
| `BUBBLE-SNIPPET-29` | `src/29-llm-prompt-template.py` |
| `BUBBLE-SNIPPET-30` | `src/30-unit-tests.py` |
| `BUBBLE-SNIPPET-31` | `src/31-llm-prompt-template.txt` |
| `BUBBLE-SNIPPET-32` | `src/32-n8n-webhook-send.py` |
| `BUBBLE-SNIPPET-33` | `src/33-flask-handler.py` |
| `BUBBLE-SNIPPET-34` | `src/34-n8n-workflow.json` |
| `BUBBLE-SNIPPET-35` | `src/35-openapi-spec.yaml` |


@@ -0,0 +1,8 @@
// POST body
{
"start_id": "dr:sha256:abc123",
"direction": "upstream", // or "downstream"
"max_hops": 3,
"role": "engineer",
"filter_state": ["rejected", "approved"]
}


@@ -0,0 +1,13 @@
// response
{
"start_id": "dr:sha256:abc123",
"visited": [
{
"id": "dr:sha256:prev1",
"relation": "influenced_by",
"statement": "Rejected libZ due to licensing",
"timestamp": "2025-06-10T09:15:00Z"
},
...
]
}


@@ -0,0 +1,5 @@
// POST body
{
"query": "use library Z for cloud deployment",
"role": "engineer"
}


@@ -0,0 +1,6 @@
// response
{
"tried_before": true,
"matched_id": "dr:sha256:prev1",
"reason": "Rejected libZ due to incompatible licensing with SaaS model"
}


@@ -0,0 +1,6 @@
// POST body
{
"start_id": "dr:sha256:abc123",
"max_hops": 3,
"role": "pm"
}


@@ -0,0 +1,20 @@
// response
{
"timeline": [
{
"id": "dr:sha256:prev3",
"timestamp": "...",
"statement": "...",
"constraints": ["budget < $10k"]
},
...
],
"constraints_summary": [
"Budgetary constraint prevents line of reasoning",
"Cloud infra incompatible with library Z"
],
"goal_alignment": {
"project_goal": "...",
"alignment_score": 0.87
}
}


@@ -0,0 +1,9 @@
{
"start_id": "dr:sha256:abc123", // or null if query-based anchor
"query": "evaluate using lib Z for streaming",
"role": "engineer",
"max_hops": 3,
"top_k": 10,
"include_full_dr": false, // whether to include decrypted full payloads (auth required)
"redaction": true // apply role-based redaction rules
}


@@ -0,0 +1,14 @@
{
"bundle_id": "bundle:sha256:...",
"start_id": "dr:sha256:abc123",
"generated_at": "2025-08-11T...",
"summary": "Short human-readable summary of provenance & decision state",
"timeline": [ ... ], // ordered list of ancestor DR summaries
"constraints_summary": [...],
"key_evidence_refs": [...], // UCXL addresses
"goal_alignment": { "score": 0.82, "reasons": [...] },
"suggested_actions": [...],
"escalation": { "required": false, "who": [] },
"signatures": [...], // leader + assembler sign
"cache_hit": true|false
}


@@ -0,0 +1,241 @@
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field
from typing import Optional, List
import json
import hashlib
import time
import rocksdb
import sqlite3
app = FastAPI()
# === RocksDB setup ===
rocks = rocksdb.DB("rocksdb_data", rocksdb.Options(create_if_missing=True))
# === SQLite setup ===
sqlite_conn = sqlite3.connect("decisions.db")
sqlite_conn.row_factory = sqlite3.Row
# --- Models ---
class BundleRequest(BaseModel):
start_id: Optional[str] = None
query: Optional[str] = None
role: str = Field(..., example="engineer")
max_hops: int = Field(3, ge=1, le=10)
top_k: int = Field(10, ge=1, le=50)
include_full_dr: bool = False
redaction: bool = True
class DRMetadata(BaseModel):
id: str
statement: str
lifecycle_state: Optional[str]
role_exposure: dict
tags: List[str] = []
timestamp: str
relation: Optional[str] = None
score: Optional[float] = None
# --- Helpers ---
def serialize(obj) -> bytes:
return json.dumps(obj).encode("utf-8")
def deserialize(data: bytes):
return json.loads(data.decode("utf-8"))
def sha256_of_content(content: str) -> str:
return hashlib.sha256(content.encode("utf-8")).hexdigest()
def now_iso() -> str:
return time.strftime("%Y-%m-%dT%H:%M:%SZ", time.gmtime())
def get_metadata(dr_id: str) -> Optional[dict]:
# Try RocksDB
val = rocks.get(f"meta:{dr_id}".encode())
if val:
return deserialize(val)
# Fallback SQLite
row = sqlite_conn.execute(
"SELECT statement, lifecycle_state, role_exposure, tags, timestamp FROM decisions WHERE id=?",
(dr_id,),
).fetchone()
if row:
return {
"id": dr_id,
"statement": row["statement"],
"lifecycle_state": row["lifecycle_state"],
"role_exposure": json.loads(row["role_exposure"]),
"tags": json.loads(row["tags"]),
"timestamp": row["timestamp"],
}
return None
def get_ancestors(dr_id: str) -> List[str]:
# RocksDB reverse index fallback to SQLite edges
val = rocks.get(f"rev:{dr_id}".encode())
if val:
return deserialize(val)
rows = sqlite_conn.execute(
"SELECT source_id FROM edges WHERE target_id=? AND relation='influences'",
(dr_id,),
).fetchall()
return [r[0] for r in rows]
def walk_back(
start_id: str, n: int, role: str, top_k: int
) -> List[DRMetadata]:
from collections import deque
import heapq
visited = set()
results = []
queue = deque([(start_id, 0)])
# Dummy embed/sim functions — replace with your actual implementations
def embed(text): return text
def cosine_similarity(a, b): return 1.0 if a == b else 0.5
query_vec = embed("") # empty here — extend as needed
while queue:
dr_id, depth = queue.popleft()
if dr_id in visited or depth > n:
continue
visited.add(dr_id)
meta = get_metadata(dr_id)
if not meta:
continue
# Role exposure filter
if role and not meta["role_exposure"].get(role, False):
continue
# Score heuristic: favor close ancestors and "approved" states
dist_score = max(0, (n - depth) / n)
state_bonus = 1.1 if meta.get("lifecycle_state") == "approved" else 1.0
score = dist_score * state_bonus
        # Include the DR id as a tie-breaker so equal scores never compare DRMetadata objects
        heapq.heappush(results, (-score, meta["id"], DRMetadata(**meta, score=score)))
for anc in get_ancestors(dr_id):
queue.append((anc, depth + 1))
# Return top_k sorted descending by score
    sorted_results = [md for _, _, md in heapq.nsmallest(top_k, results)]
return sorted_results
def assemble_bundle(
start_id: str,
role: str,
max_hops: int,
top_k: int,
include_full_dr: bool,
redact: bool,
query: Optional[str] = None,
) -> dict:
# TODO: integrate query embedding and refined scoring later
timeline = []
evidence_refs = set()
ancestors = walk_back(start_id, max_hops, role, top_k)
for meta in ancestors:
item = {
"id": meta.id,
"timestamp": meta.timestamp,
"statement": meta.statement if not redact else redact_field(meta.statement, role),
"lifecycle_state": meta.lifecycle_state,
"score": meta.score,
"role_exposure": meta.role_exposure,
"tags": meta.tags,
"relation_to_start": meta.relation,
# "full_payload": ... fetch + decrypt if include_full_dr and authorized
# For now omitted for brevity
}
timeline.append(item)
# Collect evidence refs from meta if available (stub)
# evidence_refs.update(meta.get("evidence_refs", []))
summary = f"Decision bundle starting at {start_id} with {len(timeline)} items."
bundle_content = {
"bundle_id": f"bundle:sha256:{sha256_of_content(summary)}",
"start_id": start_id,
"generated_at": now_iso(),
"summary": summary,
"timeline": timeline,
"constraints_summary": [],
"key_evidence_refs": list(evidence_refs),
"goal_alignment": {},
"suggested_actions": [],
"escalation": {"required": False, "who": []},
"signatures": [],
"cache_hit": False,
}
return bundle_content
def redact_field(text: str, role: str) -> str:
# Stub: redact sensitive info based on role
# Replace with your own policy
if role == "pm":
return text
else:
return text.replace("SECRET", "[REDACTED]")
def cache_get(key: str):
val = rocks.get(key.encode())
if val:
return deserialize(val)
return None
def cache_set(key: str, value: dict):
rocks.put(key.encode(), serialize(value))
# --- Endpoint ---
@app.post("/decision/bundle")
async def decision_bundle(req: BundleRequest):
# Validate inputs
if not req.start_id and not req.query:
raise HTTPException(status_code=400, detail="start_id or query required")
# Resolve anchor if query only
start_id = req.start_id
if not start_id:
# Placeholder: your vector search to find anchor DR by query
# For now raise error
raise HTTPException(status_code=400, detail="Query-based anchor resolution not implemented")
    # Use a stable content hash for the query so cache keys survive process restarts
    cache_key = f"bundle:{start_id}:{req.role}:{req.max_hops}:{sha256_of_content(req.query or '')}:{req.top_k}"
cached_bundle = cache_get(cache_key)
if cached_bundle:
cached_bundle["cache_hit"] = True
return cached_bundle
bundle = assemble_bundle(
start_id=start_id,
role=req.role,
max_hops=req.max_hops,
top_k=req.top_k,
include_full_dr=req.include_full_dr,
redact=req.redaction,
query=req.query,
)
cache_set(cache_key, bundle)
return bundle


@@ -0,0 +1,207 @@
paths:
/decision/bundle:
post:
summary: Generate a decision provenance bundle tailored to a role
requestBody:
required: true
content:
application/json:
schema:
$ref: '#/components/schemas/DecisionBundleRequest'
responses:
'200':
description: Decision bundle assembled successfully
content:
application/json:
schema:
$ref: '#/components/schemas/DecisionBundleResponse'
'400':
description: Invalid request parameters
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
'500':
description: Internal server error
content:
application/json:
schema:
$ref: '#/components/schemas/ErrorResponse'
components:
schemas:
DecisionBundleRequest:
type: object
properties:
start_id:
type: string
description: >
The starting decision record ID to anchor the bundle.
Optional if 'query' is provided.
query:
type: string
description: >
Search query to find a decision anchor if start_id not provided.
role:
type: string
description: Role requesting the bundle (e.g. engineer, pm)
max_hops:
type: integer
description: Max ancestry depth to walk back
minimum: 1
maximum: 10
default: 3
top_k:
type: integer
description: Max number of decision records to include
minimum: 1
maximum: 50
default: 10
include_full_dr:
type: boolean
description: Whether to include decrypted full decision record payloads (auth required)
default: false
redaction:
type: boolean
description: Whether to apply role-based redaction to sensitive fields
default: true
oneOf:
- required: ["start_id"]
- required: ["query"]
additionalProperties: false
DecisionRecordSummary:
type: object
properties:
id:
type: string
description: Unique decision record ID (e.g. dr:sha256:...)
timestamp:
type: string
format: date-time
type:
type: string
enum: [experiment, decision, rejection, escalation]
description: Type of decision record
statement:
type: string
description: Brief statement of the decision or rationale
rationale:
type: string
description: Explanation or reasoning (may be redacted)
lifecycle_state:
type: string
enum: [proposed, approved, rejected, deprecated]
relation_to_start:
type: string
enum: [influenced_by, derived_from, unrelated]
score:
type: number
format: float
description: Relevance or confidence score (0.0 - 1.0)
tags:
type: array
items:
type: string
role_exposure:
type: object
additionalProperties:
type: boolean
description: Role access flags for redaction control
evidence_refs:
type: array
items:
type: string
description: UCXL address or DHT reference
full_payload_ref:
type: string
description: Reference to full payload if included (optional)
GoalAlignment:
type: object
properties:
score:
type: number
format: float
description: Alignment score with project goals (0.0 - 1.0)
reasons:
type: array
items:
type: string
description: Brief reasons explaining the alignment score
SuggestedAction:
type: object
properties:
type:
type: string
enum: [experiment, research, escalate, review, defer]
description:
type: string
assignee:
type: string
confidence:
type: number
format: float
minimum: 0
maximum: 1
Escalation:
type: object
properties:
required:
type: boolean
who:
type: array
items:
type: string
description: Roles or teams to escalate to
DecisionBundleResponse:
type: object
properties:
bundle_id:
type: string
description: Unique ID for the bundle
start_id:
type: string
description: The starting decision record ID
generated_at:
type: string
format: date-time
summary:
type: string
description: Human-readable summary of the bundle content
timeline:
type: array
items:
$ref: '#/components/schemas/DecisionRecordSummary'
constraints_summary:
type: array
items:
type: string
key_evidence_refs:
type: array
items:
type: string
goal_alignment:
$ref: '#/components/schemas/GoalAlignment'
suggested_actions:
type: array
items:
$ref: '#/components/schemas/SuggestedAction'
escalation:
$ref: '#/components/schemas/Escalation'
signatures:
type: array
items:
type: string
cache_hit:
type: boolean
description: Whether the bundle was served from cache
ErrorResponse:
type: object
properties:
error:
type: string
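For reference, a minimal client sketch against this spec (assuming the server from main.go is listening on localhost:8080; the request fields follow the DecisionBundleRequest schema above, and dr:1 matches the seeded sample data):
package main

import (
    "bytes"
    "encoding/json"
    "fmt"
    "net/http"
)

func main() {
    // Build a request body that matches DecisionBundleRequest.
    body, _ := json.Marshal(map[string]interface{}{
        "start_id": "dr:1",
        "role":     "engineer",
        "max_hops": 3,
        "top_k":    10,
    })
    // POST to the /decision/bundle endpoint.
    resp, err := http.Post("http://localhost:8080/decision/bundle", "application/json", bytes.NewReader(body))
    if err != nil {
        panic(err)
    }
    defer resp.Body.Close()
    // Decode the DecisionBundleResponse and print its summary.
    var bundle map[string]interface{}
    if err := json.NewDecoder(resp.Body).Decode(&bundle); err != nil {
        panic(err)
    }
    fmt.Println("summary:", bundle["summary"])
}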

87
src/api/api.go Normal file
View File

@@ -0,0 +1,87 @@
package api
import (
"encoding/json"
"fmt"
"net/http"
"time"
"gitea.deepblack.cloud/chorus/bubble/core"
"gitea.deepblack.cloud/chorus/bubble/models"
"gitea.deepblack.cloud/chorus/bubble/storage"
)
// Server holds the dependencies for the API server.
type Server struct {
Store storage.Storage
}
// NewServer creates a new API server.
func NewServer(store storage.Storage) *Server {
return &Server{Store: store}
}
// Start begins listening for HTTP requests.
func (s *Server) Start(addr string) error {
http.HandleFunc("/decision/bundle", s.handleDecisionBundle)
return http.ListenAndServe(addr, nil)
}
// handleDecisionBundle is the handler for the /decision/bundle endpoint.
func (s *Server) handleDecisionBundle(w http.ResponseWriter, r *http.Request) {
if r.Method != http.MethodPost {
http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed)
return
}
var req models.DecisionBundleRequest
if err := json.NewDecoder(r.Body).Decode(&req); err != nil {
http.Error(w, err.Error(), http.StatusBadRequest)
return
}
if req.StartID == "" && req.Query == "" {
http.Error(w, "start_id or query is required", http.StatusBadRequest)
return
}
// --- Core Logic ---
// Use StartID for now. Query-based anchor resolution will be added later.
if req.StartID == "" {
http.Error(w, "query-based anchor resolution not implemented; provide start_id", http.StatusBadRequest)
return
}
startNode, err := s.Store.GetDecisionMetadata(req.StartID)
if err != nil {
http.Error(w, fmt.Sprintf("Failed to get start node: %v", err), http.StatusInternalServerError)
return
}
if startNode == nil {
http.Error(w, "Start node not found", http.StatusNotFound)
return
}
// Apply the OpenAPI defaults when max_hops/top_k are omitted from the request.
if req.MaxHops <= 0 {
req.MaxHops = 3
}
if req.TopK <= 0 {
req.TopK = 10
}
// Perform the provenance walk.
timeline, err := core.WalkBack(s.Store, req.StartID, req.Query, req.Role, req.MaxHops, req.TopK)
if err != nil {
http.Error(w, fmt.Sprintf("Failed to walk provenance graph: %v", err), http.StatusInternalServerError)
return
}
// Assemble the response bundle.
// This is a simplified version of the logic in the blueprint.
response := models.DecisionBundleResponse{
BundleID: fmt.Sprintf("bundle:%s", req.StartID), // Simplified ID
StartID: req.StartID,
GeneratedAt: time.Now().UTC().Format(time.RFC3339),
Summary: fmt.Sprintf("Decision bundle for %s, found %d ancestors.", req.StartID, len(timeline)),
Timeline: timeline,
ConstraintsSummary: []string{}, // Placeholder
KeyEvidenceRefs: []string{}, // Placeholder
GoalAlignment: models.GoalAlignment{}, // Placeholder
SuggestedActions: []models.SuggestedAction{}, // Placeholder
Escalation: models.Escalation{}, // Placeholder
CacheHit: false, // Caching not yet implemented
}
w.Header().Set("Content-Type", "application/json")
if err := json.NewEncoder(w).Encode(response); err != nil {
http.Error(w, err.Error(), http.StatusInternalServerError)
}
}

BIN
src/bubble.db Normal file

Binary file not shown.

BIN
src/bubble_server Executable file

Binary file not shown.

View File

@@ -0,0 +1,18 @@
{
"envelope_version": "1.0",
"payload": { /* decision_record */ },
"signatures": [
{
"actor_id": "user:alice",
"role": "sys-arch",
"signature": "BASE64-SIG-OF-PAYLOAD"
}
],
"encryption": {
"alg": "age",
"recipients": [
"age1...",
"age1..."
]
}
}

View File

@@ -0,0 +1,59 @@
{
"id": "dr:sha256:...", // immutable unique ID
"ucxl_path": "ucxl://project:foo@repo:main/path/to/doc.md",
"type": "decision", // decision|proposal|experiment|rejection
"lifecycle_state": "active", // active|superseded|rejected|archived
"timestamp": "2025-08-11T07:30:00Z",
"actor": {
"id": "user:alice",
"role": "sys-arch",
"org_unit": "infrastructure"
},
"statement": "We will adopt crate X for parsing JSON streams.",
"rationale": "Performance improvement and lower memory footprint.",
"alternatives": [
{
"id": "dr:sha256:alt1...",
"statement": "Use crate Y",
"reason": "Legacy support",
"rejected": true
}
],
"evidence": [
"ucxl://project:foo@repo:main/docs/benchmarks.md",
"ucxl://project:foo@repo:main/reports/compatibility.json"
],
"metrics": {
"cost_estimate": { "hours": 40, "budget": 0 },
"confidence": 0.67
},
"constraints": [
"policy:budget-l1",
"policy:cloud-aws-only"
],
"provenance": {
"influenced_by": [
"dr:sha256:prev1...",
"dr:sha256:prev2..."
],
"derived_from": [],
"supersedes": []
},
"embeddings_id": "vec:abc123",
"tags": ["infra", "parser", "performance"],
"role_exposure": {
"engineer": true,
"pm": true,
"research": false
}
}

View File

@@ -0,0 +1,9 @@
{
"id": "policy:budget-l1",
"description": "Budget limit for experimental features in project Foo",
"applies_to": ["project:foo"],
"rules": {
"max_budget": 10000,
"currency": "USD"
}
}

View File

@@ -0,0 +1,43 @@
def walk_back(dr_id, max_hops=3, role=None, filter_state=None):
"""
Walks upstream in the decision space, returning relevant ancestors.
"""
visited = set()
results = []
# BFS queue
queue = [(dr_id, 0)]
while queue:
current_id, depth = queue.pop(0)
if current_id in visited or depth > max_hops:
continue
visited.add(current_id)
# fetch upstream edges from reverse index
edges = reverse_index.get(current_id, [])
for edge in edges:
ancestor_id = edge["source"]
# fetch DR metadata (not full payload)
meta = get_dr_metadata(ancestor_id)
# role-based filter
if role and not meta["role_exposure"].get(role, False):
continue
# state filter
if filter_state and meta["lifecycle_state"] != filter_state:
continue
results.append({
"id": ancestor_id,
"relation": edge["relation"],
"statement": meta["statement"],
"timestamp": meta["timestamp"]
})
queue.append((ancestor_id, depth + 1))
return results

10
src/core/08-fast-check.py Normal file
View File

@@ -0,0 +1,10 @@
def tried_before(query_vec, tags=None, role=None):
candidates = vector_search(query_vec, top_k=10)
for dr in candidates:
meta = get_dr_metadata(dr["id"])
if role and not meta["role_exposure"].get(role, False):
continue
if meta["lifecycle_state"] in ("rejected", "superseded"):
return True, dr["id"], meta["statement"]
return False, None, None

View File

@@ -0,0 +1,53 @@
def walk_back(start_id, n_hops=3, role=None, filter_state=None):
visited = set()
results = []
queue = [(start_id, 0)]
while queue:
current_id, depth = queue.pop(0)
if current_id in visited or depth > n_hops:
continue
visited.add(current_id)
edges = reverse_index.get(current_id, [])
for edge in edges:
ancestor_id = edge["source"]
meta = metadata_cache.get(ancestor_id)
if not meta:
meta = fetch_metadata_from_dht(ancestor_id)
metadata_cache[ancestor_id] = meta # Cache store
# Role filter
if role and not meta["role_exposure"].get(role, False):
continue
# State filter
if filter_state and meta["lifecycle_state"] not in filter_state:
continue
score = provenance_score(meta, depth)
results.append({
"id": ancestor_id,
"relation": edge["relation"],
"statement": meta["statement"],
"timestamp": meta["timestamp"],
"score": score
})
queue.append((ancestor_id, depth + 1))
# Sort results by score descending
results.sort(key=lambda r: r["score"], reverse=True)
return results
def provenance_score(meta, depth):
"""Heuristic scoring for relevance."""
score = 1.0 / (depth + 1) # closer ancestors score higher
if meta["lifecycle_state"] == "rejected":
score *= 0.9
if "critical" in meta.get("tags", []):
score *= 1.2
return score

View File

@@ -0,0 +1,77 @@
from collections import deque
import heapq
def walk_back(start_id, n, query):
visited = set()
results = []
queue = deque([(start_id, 0)]) # (dr_id, hops)
query_vec = embed(query) # Precompute embedding for semantic scoring
while queue:
dr_id, depth = queue.popleft()
if dr_id in visited or depth > n:
continue
visited.add(dr_id)
# Get metadata from RocksDB first, fallback to SQLite
metadata = get_metadata(dr_id)
if not metadata:
continue
# Score
sim_score = cosine_similarity(query_vec, embed(metadata['statement']))
dist_score = max(0, (n - depth) / n) # Closer ancestors score higher
constraint_penalty = -0.2 if metadata.get("blocked") else 0
total_score = (0.6 * sim_score) + (0.3 * dist_score) + constraint_penalty
heapq.heappush(results, (-total_score, dr_id, metadata))  # Max-heap; dr_id breaks score ties without comparing dicts
# Traverse ancestors
for anc_id in get_ancestors(dr_id):
queue.append((anc_id, depth + 1))
# Return top results sorted
sorted_results = [md for _, _, md in sorted(results)]
cache_walk_results(start_id, n, query, sorted_results)
return sorted_results
def get_metadata(dr_id):
val = rocks.get(f"meta:{dr_id}")
if val:
return deserialize(val)
# Fallback to SQLite
row = sqlite_conn.execute("""
SELECT statement, lifecycle_state, role_exposure, tags, timestamp
FROM decisions WHERE id=?
""", (dr_id,)).fetchone()
if row:
return {
"id": dr_id,
"statement": row[0],
"lifecycle_state": row[1],
"role_exposure": json.loads(row[2]),
"tags": json.loads(row[3]),
"timestamp": row[4]
}
return None
def get_ancestors(dr_id):
val = rocks.get(f"rev:{dr_id}")
if val:
return deserialize(val)
# Fallback to SQLite edges
rows = sqlite_conn.execute("""
SELECT source_id FROM edges
WHERE target_id=? AND relation='influences'
""", (dr_id,)).fetchall()
return [r[0] for r in rows]
def cache_walk_results(start_id, n, query, results):
cache_key = f"walkcache:{start_id}:{n}:{hash(query)}"
rocks.put(cache_key, serialize(results))

View File

@@ -0,0 +1,14 @@
{
"id": "dr:sha256:prev1",
"timestamp": "...",
"type": "experiment|decision|rejection",
"statement": "Rejected lib Z due to license",
"rationale": "Incompatible with SaaS licensing",
"lifecycle_state": "rejected",
"relation_to_start": "influenced_by|derived_from",
"score": 0.93,
"tags": ["licensing","cloud"],
"role_exposure": {"engineer": true, "pm": true},
"evidence_refs": ["ucxl://..."],
"full_payload_ref": "ucxl://.../dr:sha256:prev1" // only if include_full_dr=true and authorized
}

View File

@@ -0,0 +1,97 @@
def assemble_bundle(start_id=None, query=None, role="engineer",
max_hops=3, top_k=10, include_full_dr=False, redact=True):
# 0. anchor resolution
if not start_id:
anchors = vector_search(query, top_k=3) # maybe return DR ids
if not anchors:
return {"error":"no anchors found"}
start_id = anchors[0]["id"]
# 1. try cache
cache_key = f"bundle:{start_id}:{role}:{max_hops}:{hash(query)}:{top_k}"
cached = rocks.get(cache_key)
if cached:
return cached # signed object
# 2. provenance walk
ancestors = walk_back(start_id, n=max_hops, query=query, role=role, top_k=top_k)
# walk_back returns sorted list of metadata objects with scores
# 3. fetch required metadata and optionally full payloads
timeline = []
evidence_refs = set()
constraints_violations = set()
for meta in ancestors[:top_k]:
# meta is fetched from RocksDB or SQLite (fast)
item = {
"id": meta["id"],
"timestamp": meta["timestamp"],
"type": meta.get("type"),
"statement": redact_field(meta["statement"], role) if redact else meta["statement"],
"rationale": redact_field(meta.get("rationale",""), role) if redact else meta.get("rationale",""),
"lifecycle_state": meta.get("lifecycle_state"),
"relation_to_start": meta.get("relation"),
"score": meta.get("score"),
"tags": meta.get("tags", []),
"role_exposure": meta.get("role_exposure", {})
}
if include_full_dr and is_authorized(role, meta["id"]):
item["full_payload"] = fetch_and_decrypt_dr(meta["id"]) # heavy, audited
else:
item["full_payload_ref"] = meta.get("ucxl_path")
timeline.append(item)
for c in meta.get("constraints", []):
if violates_policy(c):
constraints_violations.add(c)
evidence_refs.update(meta.get("evidence", []))
# 4. constraints summary (human readable)
constraints_summary = [render_policy_summary(c) for c in constraints_violations]
# 5. goal alignment (heuristic)
project_goal_vec = get_project_goal_embedding() # from project metadata
alignment_scores = []
for t in timeline:
alignment_scores.append( cosine_similarity(project_goal_vec, embed(t["statement"])) )
goal_alignment_score = mean(alignment_scores)
goal_reasons = top_reasons_from_timeline(timeline, project_goal_vec, n=3)
# 6. suggested actions (LLM-assisted)
# Provide LLM with:
# - start statement
# - concise timeline (n items)
# - constraints summary
# Ask: "Given constraints, what next steps? Who to ask? What to avoid?"
llm_prompt = build_prompt_for_actions(start_statement=timeline[0]["statement"],
timeline=timeline[:5],
constraints=constraints_summary,
role=role)
suggested_actions = llm_call("action_suggester", llm_prompt, max_tokens=400)
# LLM output should be parsed into structured actions {type, desc, assignee, confidence}
# 7. escalation check
escalation = {"required": False, "who": []}
if any("budget" in c for c in constraints_violations) and goal_alignment_score > 0.8:
escalation = {"required": True, "who": ["pm","finance"]}
# 8. assemble, sign, cache
bundle = {
"bundle_id": "bundle:sha256:"+sha256_of_content(...),
"start_id": start_id,
"generated_at": now_iso(),
"summary": auto_summary_from_timeline(timeline[:5]),
"timeline": timeline,
"constraints_summary": constraints_summary,
"key_evidence_refs": list(evidence_refs)[:10],
"goal_alignment": {"score": goal_alignment_score, "reasons": goal_reasons},
"suggested_actions": suggested_actions,
"escalation": escalation,
"signatures": [signer_sig("leader"), signer_sig("assembler")],
"cache_hit": False
}
rocks.put(cache_key, serialize(bundle))
return bundle

View File

@@ -0,0 +1,26 @@
{
"bundle_id": "bundle:sha256:deadbeef",
"start_id": "dr:sha256:abc123",
"generated_at": "2025-08-11T09:00:00Z",
"summary": "Library Z was previously rejected (licensing). Budget constraints apply. Engineering suggests an alternative X.",
"timeline": [
{
"id": "dr:sha256:prev1",
"timestamp": "2025-06-10T09:15:00Z",
"statement": "Rejected lib Z due to restrictive SaaS licensing",
"lifecycle_state": "rejected",
"relation_to_start": "influenced_by",
"score": 0.93,
"evidence_refs": ["ucxl://.../license_report.md"]
}
],
"constraints_summary": ["policy:budget-l1 (max $10k)","policy:cloud-aws-only"],
"key_evidence_refs": ["ucxl://.../license_report.md"],
"goal_alignment": {"score": 0.72, "reasons":["performance vs cost tradeoff"]},
"suggested_actions": [
{"type":"experiment","desc":"Run compatibility shim test with lib X","assignee":"eng-team","confidence":0.6}
],
"escalation": {"required": true, "who": ["pm","legal"]},
"signatures": ["leader:SIG...","assembler:SIG..."],
"cache_hit": false
}

129
src/core/bundle.go Normal file
View File

@@ -0,0 +1,129 @@
package core
import (
"container/heap"
"gitea.deepblack.cloud/chorus/bubble/models"
"gitea.deepblack.cloud/chorus/bubble/storage"
)
// --- Priority Queue for Scoring ---
// An Item is something we manage in a priority queue.
type scoredItem struct {
summary models.DecisionRecordSummary
priority float64 // The priority of the item in the queue.
index int // The index of the item in the heap.
}
// A PriorityQueue implements heap.Interface and holds Items.
type PriorityQueue []*scoredItem
func (pq PriorityQueue) Len() int { return len(pq) }
func (pq PriorityQueue) Less(i, j int) bool {
// We want Pop to give us the highest, not lowest, priority so we use greater than here.
return pq[i].priority > pq[j].priority
}
func (pq PriorityQueue) Swap(i, j int) {
pq[i], pq[j] = pq[j], pq[i]
pq[i].index = i
pq[j].index = j
}
func (pq *PriorityQueue) Push(x interface{}) {
n := len(*pq)
item := x.(*scoredItem)
item.index = n
*pq = append(*pq, item)
}
func (pq *PriorityQueue) Pop() interface{} {
old := *pq
n := len(old)
item := old[n-1]
old[n-1] = nil // avoid memory leak
item.index = -1 // for safety
*pq = old[0 : n-1]
return item
}
// --- Core Logic ---
// WalkBack performs a scored, role-aware walk of the provenance graph.
func WalkBack(store storage.Storage, startID, query, role string, maxHops, topK int) ([]models.DecisionRecordSummary, error) {
visited := make(map[string]bool)
pq := make(PriorityQueue, 0)
heap.Init(&pq)
queue := []struct {
id string
depth int
}{{startID, 0}}
visited[startID] = true
// Placeholder for query embedding
// queryVec := embed(query)
for len(queue) > 0 {
current := queue[0]
queue = queue[1:]
if current.depth > maxHops {
continue
}
// Fetch metadata for the current node
meta, err := store.GetDecisionMetadata(current.id)
if err != nil {
// Log or handle error, maybe continue
continue
}
if meta == nil {
continue
}
// Role-based filtering: skip records not exposed to the requesting role.
// (Global admin roles or other exceptions can be added here.)
if exposure, ok := meta.RoleExposure[role]; !ok || !exposure {
continue
}
// --- Scoring ---
// simScore := cosineSimilarity(queryVec, embed(meta.Statement))
simScore := 0.5 // Placeholder value
distScore := float64(maxHops-current.depth) / float64(maxHops)
constraintPenalty := 0.0 // Placeholder
totalScore := (0.6*simScore) + (0.3*distScore) + constraintPenalty
meta.Score = totalScore
heap.Push(&pq, &scoredItem{summary: *meta, priority: totalScore})
// --- Traverse Ancestors ---
ancestors, err := store.GetAncestors(current.id)
if err != nil {
// Log or handle error
continue
}
for _, ancID := range ancestors {
if !visited[ancID] {
visited[ancID] = true
queue = append(queue, struct {
id string
depth int
}{ancID, current.depth + 1})
}
}
}
// Extract top K results from the priority queue
var results []models.DecisionRecordSummary
for i := 0; i < topK && pq.Len() > 0; i++ {
item := heap.Pop(&pq).(*scoredItem)
results = append(results, item.summary)
}
return results, nil
}
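A quick way to exercise WalkBack in isolation is a unit test with an in-memory fake of the Storage interface. The sketch below is illustrative only: fakeStore and the record IDs are invented for the example, though the field names match models.DecisionRecordSummary.
package core_test

import (
    "testing"

    "gitea.deepblack.cloud/chorus/bubble/core"
    "gitea.deepblack.cloud/chorus/bubble/models"
)

// fakeStore is an in-memory stand-in that satisfies storage.Storage.
type fakeStore struct {
    meta map[string]*models.DecisionRecordSummary
    rev  map[string][]string
}

func (f *fakeStore) GetDecisionMetadata(id string) (*models.DecisionRecordSummary, error) {
    return f.meta[id], nil
}

func (f *fakeStore) GetAncestors(id string) ([]string, error) {
    return f.rev[id], nil
}

func TestWalkBackReturnsScoredAncestors(t *testing.T) {
    store := &fakeStore{
        meta: map[string]*models.DecisionRecordSummary{
            "dr:1": {ID: "dr:1", Statement: "Adopt Go for new microservices", RoleExposure: map[string]bool{"engineer": true}},
            "dr:4": {ID: "dr:4", Statement: "Performance needs drove the language choice", RoleExposure: map[string]bool{"engineer": true}},
        },
        rev: map[string][]string{"dr:1": {"dr:4"}},
    }
    // Walk back from dr:1 as the "engineer" role, up to 3 hops, top 10 results.
    timeline, err := core.WalkBack(store, "dr:1", "", "engineer", 3, 10)
    if err != nil {
        t.Fatalf("WalkBack returned error: %v", err)
    }
    if len(timeline) != 2 {
        t.Fatalf("expected 2 records in timeline, got %d", len(timeline))
    }
}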

8
src/go.mod Normal file
View File

@@ -0,0 +1,8 @@
module gitea.deepblack.cloud/chorus/bubble
go 1.24.5
require (
github.com/mattn/go-sqlite3 v1.14.31 // indirect
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c // indirect
)

4
src/go.sum Normal file
View File

@@ -0,0 +1,4 @@
github.com/mattn/go-sqlite3 v1.14.31 h1:ldt6ghyPJsokUIlksH63gWZkG6qVGeEAu4zLeS4aVZM=
github.com/mattn/go-sqlite3 v1.14.31/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y=
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c h1:g+WoO5jjkqGAzHWCjJB1zZfXPIAaDpzXIEJ0eS6B5Ok=
github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c/go.mod h1:ahpPrc7HpcfEWDQRZEmnXMzHY03mLDYMCxeDzy46i+8=

View File

@@ -0,0 +1,7 @@
{
"id": "dr:sha256:new1",
"edges": [
{"relation": "influenced_by", "source": "dr:sha256:old1"}
],
"metadata": {...}
}

View File

@@ -0,0 +1,22 @@
from sentence_transformers import SentenceTransformer
import faiss
import numpy as np
# Load your embedding model once
embedding_model = SentenceTransformer("all-MiniLM-L6-v2")
# Load FAISS index from disk (or create it on startup)
faiss_index = faiss.read_index("dr_faiss.index")
# Map of vector index to DR IDs
dr_id_map = {} # Load from persistent storage
def vector_search(query: str, top_k=3) -> list:
vec = embedding_model.encode([query], convert_to_numpy=True)
D, I = faiss_index.search(vec, top_k)
results = []
for dist, idx in zip(D[0], I[0]):
dr_id = dr_id_map.get(idx)
if dr_id:
results.append({"id": dr_id, "distance": dist})
return results

View File

@@ -0,0 +1,16 @@
def fetch_and_decrypt_dr(dr_id: str) -> dict:
# This should pull the encrypted markdown+metadata envelope from the DHT (UCXL address)
# Then decrypt with your key management system based on role/ACL
# Return the decrypted DR content as dict
# Placeholder:
encrypted_blob = dht_get(dr_id) # Implement your DHT fetch
if not encrypted_blob:
return {}
try:
decrypted = decrypt_blob(encrypted_blob, role=...) # Your crypto module
return json.loads(decrypted)
except Exception as e:
# Log error
return {}

View File

@@ -0,0 +1,47 @@
def build_prompt_for_actions(start_statement, timeline, constraints, role):
prompt = f"""
You are an expert {role} advisor in a software project.
Given the starting decision statement:
"""{start_statement}"""
And the following timeline of relevant prior decisions (summarized):
{chr(10).join(['- ' + t['statement'] for t in timeline])}
Current known constraints and policies:
{chr(10).join(['- ' + c for c in constraints])}
Provide a **structured** list of next recommended actions, including:
- Type (e.g. experiment, research, escalate)
- Description
- Assignee (role or team)
- Confidence (0.0 - 1.0)
Respond ONLY in JSON format as a list of objects with those fields. Do not include any additional commentary.
Example:
[
{{"type": "experiment", "desc": "Run compatibility test with lib X", "assignee": "engineers", "confidence": 0.75}},
{{"type": "escalate", "desc": "Review licensing risk", "assignee": "legal", "confidence": 0.9}}
]
Begin now.
"""
return prompt
def llm_call(model_name: str, prompt: str, max_tokens=400) -> list:
# Wrap your favorite LLM call here, e.g. OpenAI, Ollama, Claude
# Return parsed JSON list or empty list on failure
raw_response = call_llm_api(model_name, prompt, max_tokens=max_tokens)
try:
parsed = json.loads(raw_response)
return parsed
except Exception:
# Log parse failure, return empty
return []

View File

@@ -0,0 +1,44 @@
import requests
import json
from typing import List, Dict, Any
def send_to_n8n_webhook(
webhook_url: str,
role: str,
start_statement: str,
timeline: List[str],
constraints: List[str],
timeout: int = 10,
) -> Dict[str, Any]:
"""
Sends action suggestion request to n8n webhook.
Args:
webhook_url: Full URL to your n8n webhook endpoint.
role: Role requesting advice (e.g. 'engineer', 'pm').
start_statement: Main decision statement string.
timeline: List of up to 5 concise prior decision summaries.
constraints: List of active constraints/policies strings.
timeout: HTTP timeout in seconds.
Returns:
Parsed JSON response from n8n webhook.
Raises:
requests.RequestException for HTTP/network errors.
json.JSONDecodeError if response is not valid JSON.
ValueError if response fails schema validation.
"""
payload = {
"role": role,
"start_statement": start_statement,
"timeline": timeline[:5], # cap to 5
"constraints": constraints,
}
headers = {"Content-Type": "application/json"}
response = requests.post(webhook_url, json=payload, headers=headers, timeout=timeout)
response.raise_for_status()
data = response.json()
return data

View File

@@ -0,0 +1,32 @@
from flask import Flask, request, jsonify
app = Flask(__name__)
@app.route("/n8n-action-suggester", methods=["POST"])
def n8n_action_suggester():
data = request.get_json(force=True)
role = data.get("role")
start_statement = data.get("start_statement")
timeline = data.get("timeline", [])
constraints = data.get("constraints", [])
# Here you'd run your prompt builder + LLM call logic
# For demo, return a static sample
response = [
{
"type": "experiment",
"description": "Run compatibility tests for Library X",
"assignee": "engineering",
"confidence": 0.85,
},
{
"type": "escalate",
"description": "Review licensing risk with legal team",
"assignee": "legal",
"confidence": 0.9,
},
]
return jsonify(response)

View File

@@ -0,0 +1,76 @@
{
"name": "Action Suggester Workflow",
"nodes": [
{
"parameters": {},
"id": "1",
"name": "Webhook Trigger",
"type": "n8n-nodes-base.webhook",
"typeVersion": 1,
"position": [250, 300],
"webhookId": "your-webhook-id",
"path": "action-suggester"
},
{
"parameters": {
"requestMethod": "POST",
"url": "http://localhost:5000/n8n-action-suggester",
"jsonParameters": true,
"options": {},
"bodyParametersJson": "={
\"role\": $json[\"role\"],
\"start_statement\": $json[\"start_statement\"],
\"timeline\": $json[\"timeline\"].slice(0,5),
\"constraints\": $json[\"constraints\"]
}"
},
"id": "2",
"name": "Call Action Suggester",
"type": "n8n-nodes-base.httpRequest",
"typeVersion": 1,
"position": [550, 300]
},
{
"parameters": {
"values": {
"string": [
{
"name": "suggested_actions",
"value": "={{JSON.stringify($node[\"Call Action Suggester\"].json)}}"
}
]
},
"options": {}
},
"id": "3",
"name": "Set Output",
"type": "n8n-nodes-base.set",
"typeVersion": 1,
"position": [750, 300]
}
],
"connections": {
"Webhook Trigger": {
"main": [
[
{
"node": "Call Action Suggester",
"type": "main",
"index": 0
}
]
]
},
"Call Action Suggester": {
"main": [
[
{
"node": "Set Output",
"type": "main",
"index": 0
}
]
]
}
}
}

32
src/main.go Normal file
View File

@@ -0,0 +1,32 @@
package main
import (
"fmt"
"log"
"gitea.deepblack.cloud/chorus/bubble/api"
"gitea.deepblack.cloud/chorus/bubble/storage"
)
func main() {
// --- Storage Initialization ---
dbPath := "./bubble_rocksdb"
// Initialize the RocksDB store.
store, err := storage.NewRocksDBStore(dbPath)
if err != nil {
log.Fatalf("Failed to initialize rocksdb store: %v", err)
}
defer store.Close() // Close the DB when the application exits
fmt.Println("RocksDB store initialized successfully.")
// --- API Server Initialization ---
server := api.NewServer(store)
// Start the server.
port := "8080"
fmt.Printf("Starting BUBBLE Decision Agent on port %s...\n", port)
if err := server.Start(":" + port); err != nil {
log.Fatalf("Failed to start server: %v", err)
}
}
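Since the MVP was verified against the seeded SQLite database rather than RocksDB, an equivalent entry point wired to the SQLite store looks like this (a sketch; it assumes ./bubble.db already holds the schema and seed data):
package main

import (
    "log"

    "gitea.deepblack.cloud/chorus/bubble/api"
    "gitea.deepblack.cloud/chorus/bubble/storage"
)

func main() {
    // Open the same SQLite database used by the seeder and tests.
    store, err := storage.NewSQLiteStore("./bubble.db")
    if err != nil {
        log.Fatalf("Failed to initialize sqlite store: %v", err)
    }
    defer store.DB.Close()

    // Wire the store into the API server and start listening.
    server := api.NewServer(store)
    log.Println("Starting BUBBLE Decision Agent (SQLite) on :8080...")
    if err := server.Start(":8080"); err != nil {
        log.Fatalf("Failed to start server: %v", err)
    }
}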

64
src/models/models.go Normal file
View File

@@ -0,0 +1,64 @@
package models
// DecisionBundleRequest defines the structure for a request to the /decision/bundle endpoint.
type DecisionBundleRequest struct {
StartID string `json:"start_id,omitempty"`
Query string `json:"query,omitempty"`
Role string `json:"role"`
MaxHops int `json:"max_hops,omitempty"`
TopK int `json:"top_k,omitempty"`
IncludeFullDR bool `json:"include_full_dr,omitempty"`
Redaction bool `json:"redaction,omitempty"`
}
// DecisionRecordSummary represents a concise summary of a Decision Record.
type DecisionRecordSummary struct {
ID string `json:"id"`
Timestamp string `json:"timestamp"`
Type string `json:"type"`
Statement string `json:"statement"`
Rationale string `json:"rationale,omitempty"`
LifecycleState string `json:"lifecycle_state"`
RelationToStart string `json:"relation_to_start,omitempty"`
Score float64 `json:"score,omitempty"`
Tags []string `json:"tags,omitempty"`
RoleExposure map[string]bool `json:"role_exposure,omitempty"`
EvidenceRefs []string `json:"evidence_refs,omitempty"`
FullPayloadRef string `json:"full_payload_ref,omitempty"`
}
// GoalAlignment indicates how a decision aligns with project goals.
type GoalAlignment struct {
Score float64 `json:"score"`
Reasons []string `json:"reasons,omitempty"`
}
// SuggestedAction defines a recommended next step.
type SuggestedAction struct {
Type string `json:"type"`
Description string `json:"description"`
Assignee string `json:"assignee"`
Confidence float64 `json:"confidence"`
}
// Escalation indicates if a decision requires human attention.
type Escalation struct {
Required bool `json:"required"`
Who []string `json:"who,omitempty"`
}
// DecisionBundleResponse is the structure of the dossier returned by the /decision/bundle endpoint.
type DecisionBundleResponse struct {
BundleID string `json:"bundle_id"`
StartID string `json:"start_id"`
GeneratedAt string `json:"generated_at"`
Summary string `json:"summary"`
Timeline []DecisionRecordSummary `json:"timeline"`
ConstraintsSummary []string `json:"constraints_summary"`
KeyEvidenceRefs []string `json:"key_evidence_refs"`
GoalAlignment GoalAlignment `json:"goal_alignment"`
SuggestedActions []SuggestedAction `json:"suggested_actions"`
Escalation Escalation `json:"escalation"`
Signatures []string `json:"signatures,omitempty"`
CacheHit bool `json:"cache_hit"`
}

View File

@@ -0,0 +1,42 @@
You are an expert {role} advisor in a software project.
Given the starting decision statement:
"""
{start_statement}
"""
And the following recent timeline of relevant prior decisions (each a concise summary):
{timeline}
Current known constraints and policies affecting this decision:
{constraints}
Your task:
Provide a structured list of **recommended next actions** to move the project forward, considering constraints and project goals.
Output **only JSON** as a list of objects, each with:
- "type": one of ["experiment", "research", "escalate", "review", "defer"]
- "description": brief action description
- "assignee": role or team responsible
- "confidence": decimal from 0.0 to 1.0 indicating your confidence
Example output:
[
{{
"type": "experiment",
"description": "Run compatibility tests for Library X",
"assignee": "engineering",
"confidence": 0.85
}},
{{
"type": "escalate",
"description": "Review licensing risk with legal team",
"assignee": "legal",
"confidence": 0.9
}}
]
Do not include any explanation, commentary, or additional text.
If unsure, provide lower confidence scores rather than fabricate details.
Begin output now:

72
src/seed.go Normal file
View File

@@ -0,0 +1,72 @@
// NOTE: seed.go declares its own func main, so run or build it on its own
// (e.g. `go run seed.go`) or move it to a cmd/seed package to avoid clashing
// with main.go in this directory.
package main
import (
"database/sql"
"log"
_ "github.com/mattn/go-sqlite3"
)
func main() {
db, err := sql.Open("sqlite3", "./bubble.db")
if err != nil {
log.Fatalf("Failed to open database: %v", err)
}
defer db.Close()
// Sample Decisions
decisions := []struct {
ID string
Statement string
LifecycleState string
RoleExposure string
Tags string
Timestamp string
}{
{"dr:1", "Adopt Go for new microservices", "active", `{"engineer": true, "pm": true}`, `["language", "backend"]`, "2025-08-12T10:00:00Z"},
{"dr:2", "Use FastAPI for Python services", "superseded", `{"engineer": true}`, `["python", "api"]`, "2025-08-10T11:00:00Z"},
{"dr:3", "Evaluate RocksDB for storage", "active", `{"engineer": true, "research": true}`, `["database", "storage"]`, "2025-08-11T15:00:00Z"},
{"dr:4", "Decision to use Go was influenced by performance needs", "active", `{"pm": true}`, `["performance"]`, "2025-08-12T09:00:00Z"},
}
// Sample Edges (Provenance)
// dr:4 -> dr:1 (dr:4 influenced dr:1)
// dr:2 -> dr:1 (dr:2 was superseded by dr:1)
edges := []struct {
SourceID string
TargetID string
Relation string
}{
{"dr:4", "dr:1", "influences"},
{"dr:2", "dr:1", "supersedes"},
{"dr:3", "dr:4", "influences"},
}
log.Println("Seeding database...")
// Insert Decisions
for _, d := range decisions {
_, err := db.Exec(`
INSERT INTO decisions (id, statement, lifecycle_state, role_exposure, tags, timestamp)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO NOTHING;
`, d.ID, d.Statement, d.LifecycleState, d.RoleExposure, d.Tags, d.Timestamp)
if err != nil {
log.Fatalf("Failed to insert decision %s: %v", d.ID, err)
}
}
// Insert Edges
for _, e := range edges {
_, err := db.Exec(`
INSERT INTO edges (source_id, target_id, relation)
VALUES (?, ?, ?)
ON CONFLICT(source_id, target_id) DO NOTHING;
`, e.SourceID, e.TargetID, e.Relation)
if err != nil {
log.Fatalf("Failed to insert edge %s -> %s: %v", e.SourceID, e.TargetID, err)
}
}
log.Println("Database seeded successfully.")
}
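The seeder assumes the decisions and edges tables already exist. A small helper sketch using SQLiteStore.Setup can create them first; "schema.sql" below is a hypothetical filename for the CREATE TABLE script included in this commit.
package main

import (
    "log"

    "gitea.deepblack.cloud/chorus/bubble/storage"
)

func main() {
    store, err := storage.NewSQLiteStore("./bubble.db")
    if err != nil {
        log.Fatalf("Failed to open database: %v", err)
    }
    defer store.DB.Close()

    // "schema.sql" is a hypothetical path to the CREATE TABLE script from this commit.
    if err := store.Setup("schema.sql"); err != nil {
        log.Fatalf("Failed to apply schema: %v", err)
    }
    log.Println("Schema applied; the seeder can now be run.")
}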

View File

@@ -0,0 +1,7 @@
// forward_index.json
{
"dr:sha256:prev1": [
{ "relation": "influenced", "target": "dr:sha256:curr1" },
{ "relation": "superseded", "target": "dr:sha256:curr2" }
]
}

View File

@@ -0,0 +1,7 @@
// reverse_index.json
{
"dr:sha256:curr1": [
{ "relation": "influenced_by", "source": "dr:sha256:prev1" },
{ "relation": "derived_from", "source": "dr:sha256:prev2" }
]
}

View File

@@ -0,0 +1,7 @@
{
"id": "dr:sha256:prev1",
"statement": "We rejected library Z due to licensing issues",
"timestamp": "2025-06-10T09:15:00Z",
"lifecycle_state": "rejected",
"role_exposure": {"engineer": true, "pm": true, "research": false}
}

View File

@@ -0,0 +1,7 @@
{
"statement": "...",
"timestamp": "...",
"lifecycle_state": "rejected",
"role_exposure": {"engineer": true, "pm": true},
"tags": ["cloud", "licensing"]
}

View File

@@ -0,0 +1,21 @@
CREATE TABLE decisions (
id TEXT PRIMARY KEY,
statement TEXT,
lifecycle_state TEXT,
role_exposure TEXT, -- JSON
tags TEXT, -- JSON array
timestamp DATETIME
);
CREATE TABLE edges (
source_id TEXT,
target_id TEXT,
relation TEXT,
PRIMARY KEY (source_id, target_id)
);
CREATE TABLE constraints (
id TEXT PRIMARY KEY,
scope TEXT, -- "global" or "role:<role>"
description TEXT
);

View File

@@ -0,0 +1,58 @@
def store_decision(dr_id, metadata, ancestors, descendants):
# RocksDB writes
rocks.put(f"meta:{dr_id}", serialize(metadata))
for anc in ancestors:
rocks.append_list(f"rev:{dr_id}", anc)
rocks.append_list(f"fwd:{anc}", dr_id)
# WAL append for sync
wal.write({
"type": "decision",
"id": dr_id,
"metadata": metadata,
"ancestors": ancestors,
"descendants": descendants
})
def sync_to_sqlite():
while True:
batch = wal.read_batch(limit=100)
if not batch:
break
with sqlite_conn:
for entry in batch:
if entry["type"] == "decision":
# Upsert into decisions table
sqlite_conn.execute("""
INSERT INTO decisions (id, statement, lifecycle_state, role_exposure, tags, timestamp)
VALUES (?, ?, ?, ?, ?, ?)
ON CONFLICT(id) DO UPDATE SET
statement=excluded.statement,
lifecycle_state=excluded.lifecycle_state,
role_exposure=excluded.role_exposure,
tags=excluded.tags,
timestamp=excluded.timestamp
""", (
entry["id"],
entry["metadata"]["statement"],
entry["metadata"]["lifecycle_state"],
json.dumps(entry["metadata"]["role_exposure"]),
json.dumps(entry["metadata"]["tags"]),
entry["metadata"]["timestamp"]
))
# Edges
for anc in entry["ancestors"]:
sqlite_conn.execute("""
INSERT OR IGNORE INTO edges (source_id, target_id, relation)
VALUES (?, ?, ?)
""", (anc, entry["id"], "influences"))
for desc in entry["descendants"]:
sqlite_conn.execute("""
INSERT OR IGNORE INTO edges (source_id, target_id, relation)
VALUES (?, ?, ?)
""", (entry["id"], desc, "influences"))
wal.mark_batch_complete(batch)

78
src/storage/rocksdb.go Normal file
View File

@@ -0,0 +1,78 @@
package storage
import (
"encoding/json"
"gitea.deepblack.cloud/chorus/bubble/models"
"github.com/tecbot/gorocksdb"
)
// RocksDBStore is an implementation of the Storage interface using RocksDB.
type RocksDBStore struct {
DB *gorocksdb.DB
}
// NewRocksDBStore creates and initializes a new RocksDB database.
func NewRocksDBStore(dbPath string) (*RocksDBStore, error) {
opts := gorocksdb.NewDefaultOptions()
opts.SetCreateIfMissing(true)
db, err := gorocksdb.OpenDb(opts, dbPath)
if err != nil {
return nil, err
}
return &RocksDBStore{DB: db}, nil
}
// GetDecisionMetadata retrieves a decision's metadata from RocksDB.
func (r *RocksDBStore) GetDecisionMetadata(drID string) (*models.DecisionRecordSummary, error) {
ro := gorocksdb.NewDefaultReadOptions()
// Keys are stored as "meta:<id>"
key := []byte("meta:" + drID)
slice, err := r.DB.Get(ro, key)
if err != nil {
return nil, err
}
defer slice.Free()
if !slice.Exists() {
return nil, nil // Not found
}
var summary models.DecisionRecordSummary
if err := json.Unmarshal(slice.Data(), &summary); err != nil {
return nil, err
}
return &summary, nil
}
// GetAncestors retrieves a decision's ancestor IDs from RocksDB.
func (r *RocksDBStore) GetAncestors(drID string) ([]string, error) {
ro := gorocksdb.NewDefaultReadOptions()
// Keys are stored as "rev:<id>"
key := []byte("rev:" + drID)
slice, err := r.DB.Get(ro, key)
if err != nil {
return nil, err
}
defer slice.Free()
if !slice.Exists() {
return nil, nil // Not found, no ancestors
}
var ancestorIDs []string
if err := json.Unmarshal(slice.Data(), &ancestorIDs); err != nil {
return nil, err
}
return ancestorIDs, nil
}
// Close closes the RocksDB database connection.
func (r *RocksDBStore) Close() {
if r.DB != nil {
r.DB.Close()
}
}

90
src/storage/sqlite.go Normal file
View File

@@ -0,0 +1,90 @@
package storage
import (
"database/sql"
"encoding/json"
"gitea.deepblack.cloud/chorus/bubble/models"
_ "github.com/mattn/go-sqlite3"
"io/ioutil"
"path/filepath"
)
// SQLiteStore is an implementation of the Storage interface using SQLite.
type SQLiteStore struct {
DB *sql.DB
}
// NewSQLiteStore connects to the SQLite database and returns a new SQLiteStore.
func NewSQLiteStore(dbPath string) (*SQLiteStore, error) {
db, err := sql.Open("sqlite3", dbPath)
if err != nil {
return nil, err
}
if err = db.Ping(); err != nil {
return nil, err
}
return &SQLiteStore{DB: db}, nil
}
// Setup reads the schema file and executes it to create the database tables.
func (s *SQLiteStore) Setup(schemaPath string) error {
schema, err := os.ReadFile(filepath.Clean(schemaPath))
if err != nil {
return err
}
_, err = s.DB.Exec(string(schema))
return err
}
// GetDecisionMetadata retrieves a single decision record's metadata from the database.
func (s *SQLiteStore) GetDecisionMetadata(drID string) (*models.DecisionRecordSummary, error) {
row := s.DB.QueryRow("SELECT id, statement, lifecycle_state, role_exposure, tags, timestamp FROM decisions WHERE id = ?", drID)
var summary models.DecisionRecordSummary
var roleExposureJSON, tagsJSON string
err := row.Scan(
&summary.ID,
&summary.Statement,
&summary.LifecycleState,
&roleExposureJSON,
&tagsJSON,
&summary.Timestamp,
)
if err != nil {
if err == sql.ErrNoRows {
return nil, nil // Return nil, nil if not found
}
return nil, err
}
// Unmarshal JSON fields
if err := json.Unmarshal([]byte(roleExposureJSON), &summary.RoleExposure); err != nil {
return nil, err
}
if err := json.Unmarshal([]byte(tagsJSON), &summary.Tags); err != nil {
return nil, err
}
return &summary, nil
}
// GetAncestors retrieves the IDs of all direct ancestors for a given decision record.
func (s *SQLiteStore) GetAncestors(drID string) ([]string, error) {
rows, err := s.DB.Query("SELECT source_id FROM edges WHERE target_id = ?", drID)
if err != nil {
return nil, err
}
defer rows.Close()
var ancestorIDs []string
for rows.Next() {
var id string
if err := rows.Scan(&id); err != nil {
return nil, err
}
ancestorIDs = append(ancestorIDs, id)
}
return ancestorIDs, nil
}

33
src/storage/storage.go Normal file
View File

@@ -0,0 +1,33 @@
package storage
import "gitea.deepblack.cloud/chorus/bubble/models"
// Storage defines the interface for accessing the decision provenance data.
// This allows for swapping the underlying database implementation.
type Storage interface {
GetDecisionMetadata(drID string) (*models.DecisionRecordSummary, error)
GetAncestors(drID string) ([]string, error)
// Add more methods as needed, e.g., for writing, caching, etc.
}
// Concrete implementations of the Storage interface are provided by
// SQLiteStore (sqlite.go) and RocksDBStore (rocksdb.go).

View File

@@ -0,0 +1,39 @@
from fastapi.testclient import TestClient
import pytest
# The blueprint's FastAPI instance must be importable; `service` below is a
# placeholder module name for wherever the endpoint code lives.
from service import app
client = TestClient(app)
def test_bundle_requires_start_id_or_query():
response = client.post("/decision/bundle", json={"role": "engineer"})
assert response.status_code == 400
def test_bundle_with_fake_start_id():
req = {
"start_id": "dr:nonexistent",
"role": "engineer",
"max_hops": 1,
"top_k": 1,
"include_full_dr": False,
"redaction": True,
}
response = client.post("/decision/bundle", json=req)
assert response.status_code == 200
data = response.json()
assert data["start_id"] == "dr:nonexistent"
assert "timeline" in data
def test_bundle_cache_hit():
req = {
"start_id": "dr:nonexistent",
"role": "engineer",
"max_hops": 1,
"top_k": 1,
"include_full_dr": False,
"redaction": True,
}
# First call to populate cache
client.post("/decision/bundle", json=req)
# Second call to hit cache
response = client.post("/decision/bundle", json=req)
data = response.json()
assert data.get("cache_hit") is True