#!/usr/bin/env python3 """ Cascading Context Resolution System This system efficiently resolves context for UCXL addresses by: 1. Loading the cascading metadata hierarchy 2. Resolving context through CSS-like inheritance 3. Providing fast lookups with caching 4. Supporting temporal context evolution (decisions change over time) The temporal component you mentioned is crucial - we need to track how context and decisions evolve over time. A temporal graph network could indeed be the way to handle this, where each context node has temporal versions. Usage: from context_resolver import CascadingContextResolver resolver = CascadingContextResolver("~/chorus/project-metadata/BZZZ") context = resolver.resolve("ucxl://any:any@BZZZ:RUSTLE-testing/src/main.rs") """ import json from pathlib import Path from typing import Dict, List, Optional, Any, Tuple from datetime import datetime, timezone from dataclasses import dataclass import logging logger = logging.getLogger(__name__) @dataclass class ResolvedContext: """Complete resolved context for a UCXL address""" ucxl_address: str summary: str purpose: str technologies: List[str] tags: List[str] insights: List[str] # Metadata file_type: str size_bytes: Optional[int] language: Optional[str] last_modified: Optional[str] content_hash: Optional[str] # Resolution metadata context_source_path: str inheritance_chain: List[str] resolution_confidence: float resolved_at: str # Temporal metadata (for future enhancement) temporal_version: int = 1 context_evolution: List[str] = None class CascadingContextResolver: """Efficiently resolves cascading context for UCXL addresses""" def __init__(self, metadata_dir: str, max_hierarchy_depth: int = 10): self.metadata_dir = Path(metadata_dir) self.context_hierarchy = {} self.path_metadata = {} self.lookup_index = {} self.resolution_cache = {} self.global_contexts = [] # Contexts that apply to all paths self.max_hierarchy_depth = max_hierarchy_depth self.load_metadata_system() def load_metadata_system(self) -> None: """Load the cascading metadata system from files""" try: # Load context hierarchy hierarchy_file = self.metadata_dir / "context_hierarchy.json" if hierarchy_file.exists(): with open(hierarchy_file, 'r') as f: self.context_hierarchy = json.load(f) logger.info(f"Loaded {len(self.context_hierarchy)} context nodes") # Load path metadata paths_file = self.metadata_dir / "path_metadata.json" if paths_file.exists(): with open(paths_file, 'r') as f: self.path_metadata = json.load(f) logger.info(f"Loaded {len(self.path_metadata)} path entries") # Load lookup index index_file = self.metadata_dir / "context_lookup_index.json" if index_file.exists(): with open(index_file, 'r') as f: self.lookup_index = json.load(f) logger.info(f"Loaded {len(self.lookup_index)} lookup entries") # Load global contexts global_file = self.metadata_dir / "global_contexts.json" if global_file.exists(): with open(global_file, 'r') as f: self.global_contexts = json.load(f) logger.info(f"Loaded {len(self.global_contexts)} global contexts") except Exception as e: logger.error(f"Error loading metadata system: {e}") raise def resolve(self, ucxl_address: str) -> Optional[ResolvedContext]: """Resolve context for a UCXL address using cascading inheritance""" # Check cache first if ucxl_address in self.resolution_cache: return self.resolution_cache[ucxl_address] # Find the filesystem path from UCXL address filesystem_path = self.ucxl_to_filesystem_path(ucxl_address) if not filesystem_path: return None # Get path metadata path_meta = self.path_metadata.get(filesystem_path) if not path_meta: return None # Resolve context through hierarchy with bounded traversal resolved_context = self.resolve_cascading_context(filesystem_path, self.max_hierarchy_depth) if not resolved_context: return None # Combine path metadata with resolved context result = ResolvedContext( ucxl_address=ucxl_address, summary=resolved_context.get('summary', ''), purpose=resolved_context.get('purpose', ''), technologies=resolved_context.get('technologies', []), tags=resolved_context.get('tags', []), insights=resolved_context.get('insights', []), file_type=path_meta.get('file_type', 'unknown'), size_bytes=path_meta.get('size_bytes'), language=path_meta.get('language'), last_modified=path_meta.get('last_modified'), content_hash=path_meta.get('content_hash'), context_source_path=resolved_context.get('source_path', ''), inheritance_chain=resolved_context.get('inheritance_chain', []), resolution_confidence=resolved_context.get('confidence', 0.5), resolved_at=datetime.now(timezone.utc).isoformat(), context_evolution=[] # Placeholder for temporal versioning ) # Cache the result self.resolution_cache[ucxl_address] = result return result def ucxl_to_filesystem_path(self, ucxl_address: str) -> Optional[str]: """Convert UCXL address to filesystem path""" # Parse UCXL address: ucxl://agent:role@project:task/path if not ucxl_address.startswith("ucxl://"): return None try: # Remove scheme remainder = ucxl_address[7:] # Remove "ucxl://" # Split by @ to separate authority and path if '@' not in remainder: return None authority_part, path_part = remainder.split('@', 1) # Split by first / to separate project:task from path if '/' in path_part: project_task, file_path = path_part.split('/', 1) # Look for matching path in our metadata for fs_path, meta in self.path_metadata.items(): if meta['ucxl_address'] == ucxl_address: return fs_path # If not found, try to construct from UCXL path # This is project-specific logic for BZZZ if project_task == "BZZZ:RUSTLE-testing": base_path = "/home/tony/chorus/project-queues/active/BZZZ" # Could be configurable constructed_path = str(Path(base_path) / file_path) if constructed_path in self.path_metadata: return constructed_path return None except Exception as e: logger.warning(f"Error parsing UCXL address {ucxl_address}: {e}") return None def resolve_cascading_context(self, filesystem_path: str, max_depth: int = 10) -> Optional[Dict[str, Any]]: """Resolve context by walking up the directory hierarchy with bounded traversal""" current_path = Path(filesystem_path) if not current_path.is_dir(): current_path = current_path.parent contexts = [] inheritance_chain = [] depth = 0 # First, check for global contexts global_contexts = self._get_global_contexts() # Walk up the directory tree with bounded depth while depth < max_depth: path_str = str(current_path) if path_str in self.context_hierarchy: context_node = self.context_hierarchy[path_str] contexts.append(context_node) inheritance_chain.append(path_str) # Stop if this context doesn't cascade to children if not context_node.get('applies_to_children', True): break # Stop if this context overrides parent if context_node.get('overrides_parent', False): break # Move up one level parent = current_path.parent if parent == current_path: # Reached filesystem root break current_path = parent depth += 1 # Add global contexts (they apply everywhere) contexts.extend(global_contexts) if not contexts: return None # Merge contexts (most specific first, global contexts last) merged_context = self.merge_contexts(contexts) merged_context['inheritance_chain'] = inheritance_chain merged_context['source_path'] = inheritance_chain[0] if inheritance_chain else filesystem_path merged_context['confidence'] = min(1.0, len(contexts) * 0.2 + 0.6) # Higher confidence with more context layers merged_context['bounded_depth'] = depth merged_context['global_contexts_applied'] = len(global_contexts) > 0 return merged_context def merge_contexts(self, contexts: List[Dict[str, Any]]) -> Dict[str, Any]: """Merge multiple contexts using CSS-like specificity""" if len(contexts) == 1: return contexts[0].copy() # Sort by specificity (higher = more specific) contexts.sort(key=lambda c: c.get('context_specificity', 0), reverse=True) # Start with most specific context merged = contexts[0].copy() # Merge in less specific contexts for context in contexts[1:]: # Tags are additive (union) merged_tags = set(merged.get('tags', [])) merged_tags.update(context.get('tags', [])) merged['tags'] = list(merged_tags) # Technologies are additive (union) merged_tech = set(merged.get('technologies', [])) merged_tech.update(context.get('technologies', [])) merged['technologies'] = list(merged_tech) # Insights are additive (append unique) merged_insights = merged.get('insights', []).copy() for insight in context.get('insights', []): if insight not in merged_insights: merged_insights.append(insight) merged['insights'] = merged_insights # Use most specific for summary/purpose unless empty if not merged.get('summary'): merged['summary'] = context.get('summary', '') if not merged.get('purpose'): merged['purpose'] = context.get('purpose', '') return merged def _get_global_contexts(self) -> List[Dict[str, Any]]: """Get contexts marked as global (apply everywhere)""" return [context for context in self.global_contexts if context.get('is_global', False)] def add_global_context(self, context: Dict[str, Any]) -> None: """Add a context that applies globally to all paths""" context['is_global'] = True context['context_specificity'] = -1 # Lowest specificity self.global_contexts.append(context) self.clear_cache() # Clear cache since global context affects everything def remove_global_context(self, context_id: str) -> bool: """Remove a global context by its ID""" initial_count = len(self.global_contexts) self.global_contexts = [ctx for ctx in self.global_contexts if ctx.get('id') != context_id] if len(self.global_contexts) < initial_count: self.clear_cache() return True return False def set_hierarchy_depth_limit(self, max_depth: int) -> None: """Set the maximum depth for hierarchy traversal""" self.max_hierarchy_depth = max_depth self.clear_cache() # Clear cache since depth limit affects resolution def get_context_statistics(self) -> Dict[str, Any]: """Get statistics about the loaded context system""" return { 'context_nodes': len(self.context_hierarchy), 'path_entries': len(self.path_metadata), 'lookup_entries': len(self.lookup_index), 'cached_resolutions': len(self.resolution_cache), 'global_contexts': len(self.global_contexts), 'max_hierarchy_depth': self.max_hierarchy_depth, 'metadata_directory': str(self.metadata_dir) } def clear_cache(self) -> None: """Clear the resolution cache""" self.resolution_cache.clear() def batch_resolve(self, ucxl_addresses: List[str]) -> Dict[str, ResolvedContext]: """Efficiently resolve multiple UCXL addresses""" results = {} for address in ucxl_addresses: context = self.resolve(address) if context: results[address] = context return results def search_by_tag(self, tag: str) -> List[ResolvedContext]: """Find all contexts containing a specific tag""" results = [] # Search through all path metadata for fs_path, meta in self.path_metadata.items(): ucxl_address = meta.get('ucxl_address') if not ucxl_address: continue context = self.resolve(ucxl_address) if context and tag.lower() in [t.lower() for t in context.tags]: results.append(context) return results def search_by_technology(self, technology: str) -> List[ResolvedContext]: """Find all contexts using a specific technology""" results = [] for fs_path, meta in self.path_metadata.items(): ucxl_address = meta.get('ucxl_address') if not ucxl_address: continue context = self.resolve(ucxl_address) if context and technology.lower() in [t.lower() for t in context.technologies]: results.append(context) return results # Example usage and demo def demo_context_resolution(): """Demonstrate the cascading context resolution system""" metadata_dir = Path.home() / "chorus" / "project-metadata" / "BZZZ" if not metadata_dir.exists(): logger.error(f"Metadata directory not found: {metadata_dir}") logger.info("Run cascading_metadata_generator.py first to generate the hierarchy") return resolver = CascadingContextResolver(str(metadata_dir)) # Demo UCXL addresses demo_addresses = [ "ucxl://any:any@BZZZ:RUSTLE-testing/src/main.rs", "ucxl://any:any@BZZZ:RUSTLE-testing/src/api/handlers.rs", "ucxl://any:any@BZZZ:RUSTLE-testing/tests/unit/core_tests.rs", "ucxl://any:any@BZZZ:RUSTLE-testing/assets/fonts/README.md", "ucxl://any:any@BZZZ:RUSTLE-testing/config/settings.toml" ] logger.info("šŸ” Context Resolution Demo") logger.info("=" * 50) for address in demo_addresses: logger.info(f"\nšŸ”— Resolving: {address}") context = resolver.resolve(address) if context: logger.info(f" šŸ“ Summary: {context.summary}") logger.info(f" šŸŽÆ Purpose: {context.purpose}") logger.info(f" šŸ”§ Technologies: {', '.join(context.technologies[:3])}") logger.info(f" šŸ·ļø Tags: {', '.join(context.tags[:5])}") logger.info(f" šŸ“Š Source: {Path(context.context_source_path).name}") logger.info(f" šŸ”— Chain: {' → '.join([Path(p).name for p in context.inheritance_chain])}") logger.info(f" ✨ Confidence: {context.resolution_confidence:.2f}") else: logger.warning(f" āŒ Could not resolve context") # Show statistics stats = resolver.get_context_statistics() logger.info(f"\nšŸ“Š System Statistics:") logger.info(f" šŸ—ļø Context nodes: {stats['context_nodes']}") logger.info(f" šŸ“„ Path entries: {stats['path_entries']}") logger.info(f" šŸ” Lookup entries: {stats['lookup_entries']}") logger.info(f" šŸ’¾ Cached resolutions: {stats['cached_resolutions']}") # Demo search functionality logger.info(f"\nšŸ”Ž Search Demo:") rust_contexts = resolver.search_by_technology("rust") logger.info(f" šŸ¦€ Found {len(rust_contexts)} Rust contexts") source_contexts = resolver.search_by_tag("source-code") logger.info(f" šŸ’» Found {len(source_contexts)} source code contexts") if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s') demo_context_resolution()