Files
bzzz/slurp/context-intelligence/context_resolver.py
anthonyrawlins b3c00d7cd9 Major BZZZ Code Hygiene & Goal Alignment Improvements
This comprehensive cleanup significantly improves codebase maintainability,
test coverage, and production readiness for the BZZZ distributed coordination system.

## 🧹 Code Cleanup & Optimization
- **Dependency optimization**: Reduced MCP server from 131MB → 127MB by removing unused packages (express, crypto, uuid, zod)
- **Project size reduction**: 236MB → 232MB total (4MB saved)
- **Removed dead code**: Deleted empty directories (pkg/cooee/, systemd/), broken SDK examples, temporary files
- **Consolidated duplicates**: Merged test_coordination.go + test_runner.go → unified test_bzzz.go (465 lines of duplicate code eliminated)

## 🔧 Critical System Implementations
- **Election vote counting**: Complete democratic voting logic with proper tallying, tie-breaking, and vote validation (pkg/election/election.go:508)
- **Crypto security metrics**: Comprehensive monitoring with active/expired key tracking, audit log querying, dynamic security scoring (pkg/crypto/role_crypto.go:1121-1129)
- **SLURP failover system**: Robust state transfer with orphaned job recovery, version checking, proper cryptographic hashing (pkg/slurp/leader/failover.go)
- **Configuration flexibility**: 25+ environment variable overrides for operational deployment (pkg/slurp/leader/config.go)

## 🧪 Test Coverage Expansion
- **Election system**: 100% coverage with 15 comprehensive test cases including concurrency testing, edge cases, invalid inputs
- **Configuration system**: 90% coverage with 12 test scenarios covering validation, environment overrides, timeout handling
- **Overall coverage**: Increased from 11.5% → 25% for core Go systems
- **Test files**: 14 → 16 test files with focus on critical systems

## 🏗️ Architecture Improvements
- **Better error handling**: Consistent error propagation and validation across core systems
- **Concurrency safety**: Proper mutex usage and race condition prevention in election and failover systems
- **Production readiness**: Health monitoring foundations, graceful shutdown patterns, comprehensive logging

## 📊 Quality Metrics
- **TODOs resolved**: 156 critical items → 0 for core systems
- **Code organization**: Eliminated mega-files, improved package structure
- **Security hardening**: Audit logging, metrics collection, access violation tracking
- **Operational excellence**: Environment-based configuration, deployment flexibility

This release establishes BZZZ as a production-ready distributed P2P coordination
system with robust testing, monitoring, and operational capabilities.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-08-16 12:14:57 +10:00

438 lines
17 KiB
Python

#!/usr/bin/env python3
"""
Cascading Context Resolution System
This system efficiently resolves context for UCXL addresses by:
1. Loading the cascading metadata hierarchy
2. Resolving context through CSS-like inheritance
3. Providing fast lookups with caching
4. Supporting temporal context evolution (decisions change over time)
The temporal component you mentioned is crucial - we need to track how context
and decisions evolve over time. A temporal graph network could indeed be the
way to handle this, where each context node has temporal versions.
Usage:
from context_resolver import CascadingContextResolver
resolver = CascadingContextResolver("~/chorus/project-metadata/BZZZ")
context = resolver.resolve("ucxl://any:any@BZZZ:RUSTLE-testing/src/main.rs")
"""
import json
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple
from datetime import datetime, timezone
from dataclasses import dataclass
import logging
logger = logging.getLogger(__name__)
@dataclass
class ResolvedContext:
"""Complete resolved context for a UCXL address"""
ucxl_address: str
summary: str
purpose: str
technologies: List[str]
tags: List[str]
insights: List[str]
# Metadata
file_type: str
size_bytes: Optional[int]
language: Optional[str]
last_modified: Optional[str]
content_hash: Optional[str]
# Resolution metadata
context_source_path: str
inheritance_chain: List[str]
resolution_confidence: float
resolved_at: str
# Temporal metadata (for future enhancement)
temporal_version: int = 1
context_evolution: List[str] = None
class CascadingContextResolver:
"""Efficiently resolves cascading context for UCXL addresses"""
def __init__(self, metadata_dir: str, max_hierarchy_depth: int = 10):
self.metadata_dir = Path(metadata_dir)
self.context_hierarchy = {}
self.path_metadata = {}
self.lookup_index = {}
self.resolution_cache = {}
self.global_contexts = [] # Contexts that apply to all paths
self.max_hierarchy_depth = max_hierarchy_depth
self.load_metadata_system()
def load_metadata_system(self) -> None:
"""Load the cascading metadata system from files"""
try:
# Load context hierarchy
hierarchy_file = self.metadata_dir / "context_hierarchy.json"
if hierarchy_file.exists():
with open(hierarchy_file, 'r') as f:
self.context_hierarchy = json.load(f)
logger.info(f"Loaded {len(self.context_hierarchy)} context nodes")
# Load path metadata
paths_file = self.metadata_dir / "path_metadata.json"
if paths_file.exists():
with open(paths_file, 'r') as f:
self.path_metadata = json.load(f)
logger.info(f"Loaded {len(self.path_metadata)} path entries")
# Load lookup index
index_file = self.metadata_dir / "context_lookup_index.json"
if index_file.exists():
with open(index_file, 'r') as f:
self.lookup_index = json.load(f)
logger.info(f"Loaded {len(self.lookup_index)} lookup entries")
# Load global contexts
global_file = self.metadata_dir / "global_contexts.json"
if global_file.exists():
with open(global_file, 'r') as f:
self.global_contexts = json.load(f)
logger.info(f"Loaded {len(self.global_contexts)} global contexts")
except Exception as e:
logger.error(f"Error loading metadata system: {e}")
raise
def resolve(self, ucxl_address: str) -> Optional[ResolvedContext]:
"""Resolve context for a UCXL address using cascading inheritance"""
# Check cache first
if ucxl_address in self.resolution_cache:
return self.resolution_cache[ucxl_address]
# Find the filesystem path from UCXL address
filesystem_path = self.ucxl_to_filesystem_path(ucxl_address)
if not filesystem_path:
return None
# Get path metadata
path_meta = self.path_metadata.get(filesystem_path)
if not path_meta:
return None
# Resolve context through hierarchy with bounded traversal
resolved_context = self.resolve_cascading_context(filesystem_path, self.max_hierarchy_depth)
if not resolved_context:
return None
# Combine path metadata with resolved context
result = ResolvedContext(
ucxl_address=ucxl_address,
summary=resolved_context.get('summary', ''),
purpose=resolved_context.get('purpose', ''),
technologies=resolved_context.get('technologies', []),
tags=resolved_context.get('tags', []),
insights=resolved_context.get('insights', []),
file_type=path_meta.get('file_type', 'unknown'),
size_bytes=path_meta.get('size_bytes'),
language=path_meta.get('language'),
last_modified=path_meta.get('last_modified'),
content_hash=path_meta.get('content_hash'),
context_source_path=resolved_context.get('source_path', ''),
inheritance_chain=resolved_context.get('inheritance_chain', []),
resolution_confidence=resolved_context.get('confidence', 0.5),
resolved_at=datetime.now(timezone.utc).isoformat(),
context_evolution=[] # Placeholder for temporal versioning
)
# Cache the result
self.resolution_cache[ucxl_address] = result
return result
def ucxl_to_filesystem_path(self, ucxl_address: str) -> Optional[str]:
"""Convert UCXL address to filesystem path"""
# Parse UCXL address: ucxl://agent:role@project:task/path
if not ucxl_address.startswith("ucxl://"):
return None
try:
# Remove scheme
remainder = ucxl_address[7:] # Remove "ucxl://"
# Split by @ to separate authority and path
if '@' not in remainder:
return None
authority_part, path_part = remainder.split('@', 1)
# Split by first / to separate project:task from path
if '/' in path_part:
project_task, file_path = path_part.split('/', 1)
# Look for matching path in our metadata
for fs_path, meta in self.path_metadata.items():
if meta['ucxl_address'] == ucxl_address:
return fs_path
# If not found, try to construct from UCXL path
# This is project-specific logic for BZZZ
if project_task == "BZZZ:RUSTLE-testing":
base_path = "/home/tony/chorus/project-queues/active/BZZZ" # Could be configurable
constructed_path = str(Path(base_path) / file_path)
if constructed_path in self.path_metadata:
return constructed_path
return None
except Exception as e:
logger.warning(f"Error parsing UCXL address {ucxl_address}: {e}")
return None
def resolve_cascading_context(self, filesystem_path: str, max_depth: int = 10) -> Optional[Dict[str, Any]]:
"""Resolve context by walking up the directory hierarchy with bounded traversal"""
current_path = Path(filesystem_path)
if not current_path.is_dir():
current_path = current_path.parent
contexts = []
inheritance_chain = []
depth = 0
# First, check for global contexts
global_contexts = self._get_global_contexts()
# Walk up the directory tree with bounded depth
while depth < max_depth:
path_str = str(current_path)
if path_str in self.context_hierarchy:
context_node = self.context_hierarchy[path_str]
contexts.append(context_node)
inheritance_chain.append(path_str)
# Stop if this context doesn't cascade to children
if not context_node.get('applies_to_children', True):
break
# Stop if this context overrides parent
if context_node.get('overrides_parent', False):
break
# Move up one level
parent = current_path.parent
if parent == current_path: # Reached filesystem root
break
current_path = parent
depth += 1
# Add global contexts (they apply everywhere)
contexts.extend(global_contexts)
if not contexts:
return None
# Merge contexts (most specific first, global contexts last)
merged_context = self.merge_contexts(contexts)
merged_context['inheritance_chain'] = inheritance_chain
merged_context['source_path'] = inheritance_chain[0] if inheritance_chain else filesystem_path
merged_context['confidence'] = min(1.0, len(contexts) * 0.2 + 0.6) # Higher confidence with more context layers
merged_context['bounded_depth'] = depth
merged_context['global_contexts_applied'] = len(global_contexts) > 0
return merged_context
def merge_contexts(self, contexts: List[Dict[str, Any]]) -> Dict[str, Any]:
"""Merge multiple contexts using CSS-like specificity"""
if len(contexts) == 1:
return contexts[0].copy()
# Sort by specificity (higher = more specific)
contexts.sort(key=lambda c: c.get('context_specificity', 0), reverse=True)
# Start with most specific context
merged = contexts[0].copy()
# Merge in less specific contexts
for context in contexts[1:]:
# Tags are additive (union)
merged_tags = set(merged.get('tags', []))
merged_tags.update(context.get('tags', []))
merged['tags'] = list(merged_tags)
# Technologies are additive (union)
merged_tech = set(merged.get('technologies', []))
merged_tech.update(context.get('technologies', []))
merged['technologies'] = list(merged_tech)
# Insights are additive (append unique)
merged_insights = merged.get('insights', []).copy()
for insight in context.get('insights', []):
if insight not in merged_insights:
merged_insights.append(insight)
merged['insights'] = merged_insights
# Use most specific for summary/purpose unless empty
if not merged.get('summary'):
merged['summary'] = context.get('summary', '')
if not merged.get('purpose'):
merged['purpose'] = context.get('purpose', '')
return merged
def _get_global_contexts(self) -> List[Dict[str, Any]]:
"""Get contexts marked as global (apply everywhere)"""
return [context for context in self.global_contexts
if context.get('is_global', False)]
def add_global_context(self, context: Dict[str, Any]) -> None:
"""Add a context that applies globally to all paths"""
context['is_global'] = True
context['context_specificity'] = -1 # Lowest specificity
self.global_contexts.append(context)
self.clear_cache() # Clear cache since global context affects everything
def remove_global_context(self, context_id: str) -> bool:
"""Remove a global context by its ID"""
initial_count = len(self.global_contexts)
self.global_contexts = [ctx for ctx in self.global_contexts
if ctx.get('id') != context_id]
if len(self.global_contexts) < initial_count:
self.clear_cache()
return True
return False
def set_hierarchy_depth_limit(self, max_depth: int) -> None:
"""Set the maximum depth for hierarchy traversal"""
self.max_hierarchy_depth = max_depth
self.clear_cache() # Clear cache since depth limit affects resolution
def get_context_statistics(self) -> Dict[str, Any]:
"""Get statistics about the loaded context system"""
return {
'context_nodes': len(self.context_hierarchy),
'path_entries': len(self.path_metadata),
'lookup_entries': len(self.lookup_index),
'cached_resolutions': len(self.resolution_cache),
'global_contexts': len(self.global_contexts),
'max_hierarchy_depth': self.max_hierarchy_depth,
'metadata_directory': str(self.metadata_dir)
}
def clear_cache(self) -> None:
"""Clear the resolution cache"""
self.resolution_cache.clear()
def batch_resolve(self, ucxl_addresses: List[str]) -> Dict[str, ResolvedContext]:
"""Efficiently resolve multiple UCXL addresses"""
results = {}
for address in ucxl_addresses:
context = self.resolve(address)
if context:
results[address] = context
return results
def search_by_tag(self, tag: str) -> List[ResolvedContext]:
"""Find all contexts containing a specific tag"""
results = []
# Search through all path metadata
for fs_path, meta in self.path_metadata.items():
ucxl_address = meta.get('ucxl_address')
if not ucxl_address:
continue
context = self.resolve(ucxl_address)
if context and tag.lower() in [t.lower() for t in context.tags]:
results.append(context)
return results
def search_by_technology(self, technology: str) -> List[ResolvedContext]:
"""Find all contexts using a specific technology"""
results = []
for fs_path, meta in self.path_metadata.items():
ucxl_address = meta.get('ucxl_address')
if not ucxl_address:
continue
context = self.resolve(ucxl_address)
if context and technology.lower() in [t.lower() for t in context.technologies]:
results.append(context)
return results
# Example usage and demo
def demo_context_resolution():
"""Demonstrate the cascading context resolution system"""
metadata_dir = Path.home() / "chorus" / "project-metadata" / "BZZZ"
if not metadata_dir.exists():
logger.error(f"Metadata directory not found: {metadata_dir}")
logger.info("Run cascading_metadata_generator.py first to generate the hierarchy")
return
resolver = CascadingContextResolver(str(metadata_dir))
# Demo UCXL addresses
demo_addresses = [
"ucxl://any:any@BZZZ:RUSTLE-testing/src/main.rs",
"ucxl://any:any@BZZZ:RUSTLE-testing/src/api/handlers.rs",
"ucxl://any:any@BZZZ:RUSTLE-testing/tests/unit/core_tests.rs",
"ucxl://any:any@BZZZ:RUSTLE-testing/assets/fonts/README.md",
"ucxl://any:any@BZZZ:RUSTLE-testing/config/settings.toml"
]
logger.info("🔍 Context Resolution Demo")
logger.info("=" * 50)
for address in demo_addresses:
logger.info(f"\n🔗 Resolving: {address}")
context = resolver.resolve(address)
if context:
logger.info(f" 📝 Summary: {context.summary}")
logger.info(f" 🎯 Purpose: {context.purpose}")
logger.info(f" 🔧 Technologies: {', '.join(context.technologies[:3])}")
logger.info(f" 🏷️ Tags: {', '.join(context.tags[:5])}")
logger.info(f" 📊 Source: {Path(context.context_source_path).name}")
logger.info(f" 🔗 Chain: {''.join([Path(p).name for p in context.inheritance_chain])}")
logger.info(f" ✨ Confidence: {context.resolution_confidence:.2f}")
else:
logger.warning(f" ❌ Could not resolve context")
# Show statistics
stats = resolver.get_context_statistics()
logger.info(f"\n📊 System Statistics:")
logger.info(f" 🏗️ Context nodes: {stats['context_nodes']}")
logger.info(f" 📄 Path entries: {stats['path_entries']}")
logger.info(f" 🔍 Lookup entries: {stats['lookup_entries']}")
logger.info(f" 💾 Cached resolutions: {stats['cached_resolutions']}")
# Demo search functionality
logger.info(f"\n🔎 Search Demo:")
rust_contexts = resolver.search_by_technology("rust")
logger.info(f" 🦀 Found {len(rust_contexts)} Rust contexts")
source_contexts = resolver.search_by_tag("source-code")
logger.info(f" 💻 Found {len(source_contexts)} source code contexts")
if __name__ == "__main__":
logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s')
demo_context_resolution()