#!/usr/bin/env python3
"""
Temporal Context Evolution System

Implements a temporal graph network for tracking how context and decisions
evolve over time.

Key concepts:
- Temporal Nodes: Context at specific points in time
- Decision Points: When context changes due to decisions/events
- Context Evolution: How understanding deepens over time
- Temporal Queries: Query context "as it was" at any point in time
- Decision Tracking: Why context changed (git commits, design decisions, etc.)

This addresses your insight that decisions change over time and we need to
track the evolution of understanding and context within the project.

Architecture:
```
TemporalContextNode
├── timestamp: when this context was valid
├── context_data: the actual context at this time
├── change_reason: why this context changed
├── parent_version: previous version of this context
├── decision_metadata: what decision caused this change
└── confidence_evolution: how confidence changed over time
```
"""

import json
from pathlib import Path
from typing import Dict, List, Optional, Any, Tuple, Set
from datetime import datetime, timezone, timedelta
from dataclasses import dataclass, asdict
from enum import Enum
import logging
import hashlib

logger = logging.getLogger(__name__)


class ContextChangeReason(Enum):
    """Reasons why context might change over time."""

    INITIAL_CREATION = "initial_creation"
    CODE_CHANGE = "code_change"
    DESIGN_DECISION = "design_decision"
    REFACTORING = "refactoring"
    ARCHITECTURE_CHANGE = "architecture_change"
    REQUIREMENTS_CHANGE = "requirements_change"
    LEARNING_EVOLUTION = "learning_evolution"        # Understanding improved
    RAG_ENHANCEMENT = "rag_enhancement"              # RAG provided better insights
    TEAM_INPUT = "team_input"                        # Team provided corrections/additions
    BUG_DISCOVERY = "bug_discovery"                  # Found issues that change understanding
    PERFORMANCE_INSIGHT = "performance_insight"      # Performance analysis changed understanding
    SECURITY_REVIEW = "security_review"              # Security analysis added context
@dataclass
class DecisionMetadata:
    """Metadata about a decision that changed context."""

    decision_maker: str               # Who made the decision (person, system, process)
    decision_id: Optional[str]        # Git commit hash, ticket ID, etc.
    decision_rationale: str           # Why the decision was made
    impact_scope: str                 # local, module, project, or system
    confidence_level: float           # How confident we are in this decision
    external_references: List[str]    # Links to PRs, issues, docs, etc.


@dataclass
class TemporalContextNode:
    """A context node at a specific point in time."""

    ucxl_address: str
    timestamp: str                    # ISO-format creation time of this version
    version: int                      # Monotonic version number (starts at 1)

    # Core context data
    summary: str
    purpose: str
    technologies: List[str]
    tags: List[str]
    insights: List[str]

    # Temporal metadata
    change_reason: ContextChangeReason
    parent_version: Optional[int]     # Previous version number, None for v1
    decision_metadata: Optional[DecisionMetadata]

    # Evolution tracking
    context_hash: str                 # Hash of the context content (change detection)
    confidence_score: float           # How confident we are in this context
    staleness_indicator: float        # How likely this context is outdated

    # Graph relationships
    influences: List[str]             # UCXL addresses this context influences
    influenced_by: List[str]          # UCXL addresses that influence this one

    # Validation metadata
    validated_by: List[str]           # Who/what validated this context
    last_validation: Optional[str]    # ISO timestamp of last validation


class TemporalContextGraph:
    """Manages the temporal evolution of context across the project."""

    def __init__(self, metadata_base: str, project_name: str):
        """Open (or create) the temporal store under metadata_base/project_name.

        Side effects: creates the temporal directory if missing and loads any
        previously persisted graph data into memory.
        """
        self.metadata_base = Path(metadata_base)
        self.project_name = project_name
        self.temporal_dir = self.metadata_base / project_name / "temporal"
        self.temporal_dir.mkdir(parents=True, exist_ok=True)

        # In-memory graph state
        self.temporal_nodes: Dict[str, List[TemporalContextNode]] = {}  # ucxl_address -> versions
        self.decision_log: List[DecisionMetadata] = []
        self.influence_graph: Dict[str, Set[str]] = {}  # ucxl_address -> influenced addresses

        self.load_temporal_data()
create_initial_context(self, ucxl_address: str, context_data: Dict[str, Any], creator: str = "system") -> TemporalContextNode: """Create the first version of context for a UCXL address""" decision = DecisionMetadata( decision_maker=creator, decision_id=None, decision_rationale="Initial context creation", impact_scope="local", confidence_level=0.7, external_references=[] ) node = TemporalContextNode( ucxl_address=ucxl_address, timestamp=datetime.now(timezone.utc).isoformat(), version=1, summary=context_data.get('summary', ''), purpose=context_data.get('purpose', ''), technologies=context_data.get('technologies', []), tags=context_data.get('tags', []), insights=context_data.get('insights', []), change_reason=ContextChangeReason.INITIAL_CREATION, parent_version=None, decision_metadata=decision, context_hash=self.calculate_context_hash(context_data), confidence_score=0.7, staleness_indicator=0.0, influences=[], influenced_by=[], validated_by=[creator], last_validation=datetime.now(timezone.utc).isoformat() ) # Add to graph if ucxl_address not in self.temporal_nodes: self.temporal_nodes[ucxl_address] = [] self.temporal_nodes[ucxl_address].append(node) self.decision_log.append(decision) return node def evolve_context(self, ucxl_address: str, new_context_data: Dict[str, Any], change_reason: ContextChangeReason, decision_metadata: DecisionMetadata) -> TemporalContextNode: """Evolve context for a UCXL address - create new temporal version""" # Get current version current_version = self.get_latest_version(ucxl_address) if not current_version: return self.create_initial_context(ucxl_address, new_context_data, decision_metadata.decision_maker) # Calculate changes and impact context_hash = self.calculate_context_hash(new_context_data) confidence_evolution = self.calculate_confidence_evolution(current_version, new_context_data, change_reason) # Create new version new_node = TemporalContextNode( ucxl_address=ucxl_address, timestamp=datetime.now(timezone.utc).isoformat(), 
version=current_version.version + 1, summary=new_context_data.get('summary', current_version.summary), purpose=new_context_data.get('purpose', current_version.purpose), technologies=new_context_data.get('technologies', current_version.technologies), tags=new_context_data.get('tags', current_version.tags), insights=new_context_data.get('insights', current_version.insights), change_reason=change_reason, parent_version=current_version.version, decision_metadata=decision_metadata, context_hash=context_hash, confidence_score=confidence_evolution, staleness_indicator=0.0, # Fresh context influences=current_version.influences.copy(), influenced_by=current_version.influenced_by.copy(), validated_by=[decision_metadata.decision_maker], last_validation=datetime.now(timezone.utc).isoformat() ) self.temporal_nodes[ucxl_address].append(new_node) self.decision_log.append(decision_metadata) # Propagate influence self.propagate_context_change(ucxl_address, change_reason, decision_metadata) return new_node def calculate_context_hash(self, context_data: Dict[str, Any]) -> str: """Calculate hash of context content for change detection""" content = json.dumps(context_data, sort_keys=True) return hashlib.sha256(content.encode()).hexdigest()[:16] def calculate_confidence_evolution(self, current: TemporalContextNode, new_data: Dict[str, Any], reason: ContextChangeReason) -> float: """Calculate how confidence should evolve based on the type of change""" base_confidence = current.confidence_score # Confidence adjustments based on change reason confidence_adjustments = { ContextChangeReason.RAG_ENHANCEMENT: 0.1, # RAG usually improves confidence ContextChangeReason.TEAM_INPUT: 0.15, # Human input is valuable ContextChangeReason.LEARNING_EVOLUTION: 0.1, # Learning improves understanding ContextChangeReason.DESIGN_DECISION: 0.05, # Decisions clarify purpose ContextChangeReason.ARCHITECTURE_CHANGE: -0.05, # Major changes create uncertainty ContextChangeReason.REQUIREMENTS_CHANGE: -0.1, # 
Requirements changes create uncertainty ContextChangeReason.BUG_DISCOVERY: -0.15, # Bugs indicate misunderstanding ContextChangeReason.CODE_CHANGE: 0.02, # Code changes slightly improve confidence ContextChangeReason.REFACTORING: 0.05, # Refactoring clarifies intent } adjustment = confidence_adjustments.get(reason, 0.0) # Additional confidence boost if multiple sources agree if len(new_data.get('insights', [])) > len(current.insights): adjustment += 0.05 return min(1.0, max(0.1, base_confidence + adjustment)) def propagate_context_change(self, changed_address: str, reason: ContextChangeReason, decision: DecisionMetadata) -> None: """Propagate context changes to influenced addresses""" # Get addresses influenced by this change influenced = self.influence_graph.get(changed_address, set()) for influenced_address in influenced: # Increase staleness indicator for influenced contexts latest = self.get_latest_version(influenced_address) if latest: # Create a "staleness update" - not a full context change staleness_increase = self.calculate_staleness_impact(reason, decision.impact_scope) # This could trigger a re-analysis of the influenced context logger.info(f"Context change in {changed_address} affects {influenced_address} (staleness +{staleness_increase:.2f})") def calculate_staleness_impact(self, reason: ContextChangeReason, impact_scope: str) -> float: """Calculate how much a change affects staleness of related contexts""" base_staleness = { "local": 0.1, "module": 0.3, "project": 0.5, "system": 0.8 }.get(impact_scope, 0.2) reason_multipliers = { ContextChangeReason.ARCHITECTURE_CHANGE: 2.0, ContextChangeReason.REQUIREMENTS_CHANGE: 1.5, ContextChangeReason.REFACTORING: 1.2, ContextChangeReason.CODE_CHANGE: 1.0, ContextChangeReason.DESIGN_DECISION: 1.3, } multiplier = reason_multipliers.get(reason, 1.0) return base_staleness * multiplier def get_latest_version(self, ucxl_address: str) -> Optional[TemporalContextNode]: """Get the most recent version of context for an 
address""" versions = self.temporal_nodes.get(ucxl_address, []) if not versions: return None return max(versions, key=lambda n: n.version) def get_version_at_time(self, ucxl_address: str, target_time: datetime) -> Optional[TemporalContextNode]: """Get the context as it was at a specific point in time""" versions = self.temporal_nodes.get(ucxl_address, []) if not versions: return None # Find the latest version before or at the target time target_iso = target_time.isoformat() valid_versions = [v for v in versions if v.timestamp <= target_iso] if not valid_versions: return None return max(valid_versions, key=lambda n: n.timestamp) def get_context_evolution(self, ucxl_address: str) -> List[TemporalContextNode]: """Get the complete evolution history of a context""" versions = self.temporal_nodes.get(ucxl_address, []) return sorted(versions, key=lambda n: n.version) def add_influence_relationship(self, influencer: str, influenced: str, relationship_type: str = "affects") -> None: """Add an influence relationship between contexts""" if influencer not in self.influence_graph: self.influence_graph[influencer] = set() self.influence_graph[influencer].add(influenced) # Update the influenced_by lists in the temporal nodes influencer_latest = self.get_latest_version(influencer) influenced_latest = self.get_latest_version(influenced) if influencer_latest and influenced not in influencer_latest.influences: influencer_latest.influences.append(influenced) if influenced_latest and influencer not in influenced_latest.influenced_by: influenced_latest.influenced_by.append(influencer) def analyze_decision_patterns(self) -> Dict[str, Any]: """Analyze patterns in decision-making over time""" analysis = { 'total_decisions': len(self.decision_log), 'decision_makers': {}, 'change_reasons': {}, 'impact_scopes': {}, 'confidence_trends': [], 'decision_frequency': {} # decisions per time period } # Analyze decision makers for decision in self.decision_log: maker = decision.decision_maker 
analysis['decision_makers'][maker] = analysis['decision_makers'].get(maker, 0) + 1 # Analyze change reasons for address, versions in self.temporal_nodes.items(): for version in versions: reason = version.change_reason.value analysis['change_reasons'][reason] = analysis['change_reasons'].get(reason, 0) + 1 # Analyze impact scopes for decision in self.decision_log: scope = decision.impact_scope analysis['impact_scopes'][scope] = analysis['impact_scopes'].get(scope, 0) + 1 return analysis def find_related_decisions(self, ucxl_address: str, max_hops: int = 3) -> List[Tuple[str, TemporalContextNode, int]]: """Find decisions that are x number of decision-hops away from a given address""" visited = set() decision_queue = [(ucxl_address, 0)] # (address, hop_distance) related_decisions = [] while decision_queue and len(decision_queue) > 0: current_address, hop_distance = decision_queue.pop(0) if current_address in visited or hop_distance > max_hops: continue visited.add(current_address) # Get latest version for this address latest_version = self.get_latest_version(current_address) if latest_version: related_decisions.append((current_address, latest_version, hop_distance)) # Add influenced addresses to queue (1 hop further) for influenced_addr in latest_version.influences: if influenced_addr not in visited: decision_queue.append((influenced_addr, hop_distance + 1)) # Add addresses that influence this one (1 hop further) for influencer_addr in latest_version.influenced_by: if influencer_addr not in visited: decision_queue.append((influencer_addr, hop_distance + 1)) # Sort by hop distance, then by decision recency return sorted(related_decisions, key=lambda x: (x[2], -x[1].version)) def find_decision_path(self, from_address: str, to_address: str) -> Optional[List[Tuple[str, TemporalContextNode]]]: """Find the shortest decision path between two UCXL addresses""" if from_address == to_address: latest = self.get_latest_version(from_address) return [(from_address, latest)] if 
latest else None visited = set() queue = [(from_address, [])] # (current_address, path) while queue: current_address, path = queue.pop(0) if current_address in visited: continue visited.add(current_address) latest_version = self.get_latest_version(current_address) if not latest_version: continue new_path = path + [(current_address, latest_version)] if current_address == to_address: return new_path # Explore influences (decisions made here affect these addresses) for influenced_addr in latest_version.influences: if influenced_addr not in visited: queue.append((influenced_addr, new_path)) # Explore influenced_by (decisions made there affect this address) for influencer_addr in latest_version.influenced_by: if influencer_addr not in visited: queue.append((influencer_addr, new_path)) return None # No path found def get_decision_timeline(self, ucxl_address: str, include_related: bool = False, max_hops: int = 2) -> Dict[str, Any]: """Get decision timeline showing how context evolved through decisions (not time)""" timeline = { 'primary_address': ucxl_address, 'decision_sequence': [], 'related_decisions': [] if include_related else None } # Get primary decision sequence versions = self.get_context_evolution(ucxl_address) for i, version in enumerate(versions): decision_info = { 'version': version.version, 'decision_hop': i, # Decision distance from initial context 'change_reason': version.change_reason.value, 'decision_maker': version.decision_metadata.decision_maker if version.decision_metadata else 'unknown', 'decision_rationale': version.decision_metadata.decision_rationale if version.decision_metadata else '', 'confidence_evolution': version.confidence_score, 'timestamp': version.timestamp, 'influences_count': len(version.influences), 'influenced_by_count': len(version.influenced_by) } timeline['decision_sequence'].append(decision_info) # Get related decisions if requested if include_related and max_hops > 0: related = self.find_related_decisions(ucxl_address, 
max_hops) for addr, version, hops in related: if addr != ucxl_address: # Don't include the primary address related_info = { 'address': addr, 'decision_hops': hops, 'latest_version': version.version, 'change_reason': version.change_reason.value, 'decision_maker': version.decision_metadata.decision_maker if version.decision_metadata else 'unknown', 'confidence': version.confidence_score, 'last_decision_timestamp': version.timestamp } timeline['related_decisions'].append(related_info) return timeline def find_stale_contexts(self, staleness_threshold: float = 0.5) -> List[Tuple[str, TemporalContextNode]]: """Find contexts that might be outdated""" stale_contexts = [] current_time = datetime.now(timezone.utc) for address, versions in self.temporal_nodes.items(): latest = self.get_latest_version(address) if not latest: continue # Calculate time-based staleness last_update = datetime.fromisoformat(latest.timestamp.replace('Z', '+00:00')) days_old = (current_time - last_update).days time_staleness = min(1.0, days_old / 30.0) # Max staleness after 30 days total_staleness = latest.staleness_indicator + time_staleness if total_staleness >= staleness_threshold: stale_contexts.append((address, latest)) return sorted(stale_contexts, key=lambda x: x[1].staleness_indicator, reverse=True) def save_temporal_data(self) -> None: """Save temporal graph data to files""" # Save temporal nodes nodes_file = self.temporal_dir / "temporal_nodes.json" nodes_data = {} for address, versions in self.temporal_nodes.items(): nodes_data[address] = [asdict(version) for version in versions] with open(nodes_file, 'w') as f: json.dump(nodes_data, f, indent=2, default=str) # Save decision log decisions_file = self.temporal_dir / "decision_log.json" decisions_data = [asdict(decision) for decision in self.decision_log] with open(decisions_file, 'w') as f: json.dump(decisions_data, f, indent=2, default=str) # Save influence graph influence_file = self.temporal_dir / "influence_graph.json" influence_data = 
{k: list(v) for k, v in self.influence_graph.items()} with open(influence_file, 'w') as f: json.dump(influence_data, f, indent=2) logger.info(f"Temporal data saved to {self.temporal_dir}") def load_temporal_data(self) -> None: """Load temporal graph data from files""" try: # Load temporal nodes nodes_file = self.temporal_dir / "temporal_nodes.json" if nodes_file.exists(): with open(nodes_file, 'r') as f: nodes_data = json.load(f) for address, versions_data in nodes_data.items(): versions = [] for version_data in versions_data: # Convert change_reason back to enum version_data['change_reason'] = ContextChangeReason(version_data['change_reason']) # Convert decision_metadata back to dataclass if version_data['decision_metadata']: version_data['decision_metadata'] = DecisionMetadata(**version_data['decision_metadata']) versions.append(TemporalContextNode(**version_data)) self.temporal_nodes[address] = versions # Load decision log decisions_file = self.temporal_dir / "decision_log.json" if decisions_file.exists(): with open(decisions_file, 'r') as f: decisions_data = json.load(f) self.decision_log = [DecisionMetadata(**d) for d in decisions_data] # Load influence graph influence_file = self.temporal_dir / "influence_graph.json" if influence_file.exists(): with open(influence_file, 'r') as f: influence_data = json.load(f) self.influence_graph = {k: set(v) for k, v in influence_data.items()} logger.info(f"Loaded temporal data: {len(self.temporal_nodes)} addresses, {len(self.decision_log)} decisions") except Exception as e: logger.warning(f"Error loading temporal data: {e}") # Example usage and integration def demo_temporal_context(): """Demonstrate the temporal context evolution system""" temporal_graph = TemporalContextGraph( metadata_base=str(Path.home() / "chorus" / "project-metadata"), project_name="BZZZ" ) # Example: Context evolution for a main.rs file ucxl_address = "ucxl://any:any@BZZZ:RUSTLE-testing/src/main.rs" # Initial context initial_context = { 'summary': 
'Main entry point for BZZZ application', 'purpose': 'Application startup and initialization', 'technologies': ['Rust'], 'tags': ['main', 'entry-point', 'rust'], 'insights': ['Basic application entry point'] } # Create initial version initial_node = temporal_graph.create_initial_context( ucxl_address, initial_context, "system_analysis" ) logger.info(f"Created initial context v{initial_node.version}") # Evolution 1: RAG enhancement rag_decision = DecisionMetadata( decision_maker="rag_system", decision_id="rag_001", decision_rationale="RAG analysis provided enhanced understanding of application structure", impact_scope="module", confidence_level=0.8, external_references=["rag_analysis_log"] ) enhanced_context = { 'summary': 'Main entry point implementing BZZZ distributed system bootstrap', 'purpose': 'Application startup, configuration loading, and P2P network initialization', 'technologies': ['Rust', 'Tokio', 'P2P Networking'], 'tags': ['main', 'entry-point', 'rust', 'distributed-system', 'p2p'], 'insights': [ 'Initializes distributed hash table for metadata storage', 'Sets up P2P networking for node discovery', 'Loads configuration for hybrid mock/real backend switching' ] } enhanced_node = temporal_graph.evolve_context( ucxl_address, enhanced_context, ContextChangeReason.RAG_ENHANCEMENT, rag_decision ) logger.info(f"Enhanced context to v{enhanced_node.version} (confidence: {enhanced_node.confidence_score:.2f})") # Evolution 2: Architecture change arch_decision = DecisionMetadata( decision_maker="tony", decision_id="commit_abc123", decision_rationale="Refactored to use cascading context hierarchy instead of flat metadata", impact_scope="system", confidence_level=0.9, external_references=["github.com/project/commit/abc123"] ) arch_context = enhanced_context.copy() arch_context['insights'].append('Now supports hierarchical context inheritance for efficient metadata storage') arch_context['tags'].append('hierarchical-context') arch_node = temporal_graph.evolve_context( 
ucxl_address, arch_context, ContextChangeReason.ARCHITECTURE_CHANGE, arch_decision ) logger.info(f"Architecture update to v{arch_node.version} (confidence: {arch_node.confidence_score:.2f})") # Demonstrate decision-hop based temporal analysis logger.info(f"\n📈 Decision Evolution for {ucxl_address}:") # Show decision timeline (decisions, not time) decision_timeline = temporal_graph.get_decision_timeline(ucxl_address, include_related=True, max_hops=2) logger.info(f" 🎯 Primary Decision Sequence:") for decision in decision_timeline['decision_sequence']: logger.info(f" Decision #{decision['decision_hop']}: {decision['change_reason']}") logger.info(f" 👤 By: {decision['decision_maker']}") logger.info(f" 💪 Confidence: {decision['confidence_evolution']:.2f}") logger.info(f" 📊 Influences {decision['influences_count']} addresses") if decision_timeline['related_decisions']: logger.info(f"\n 🔗 Related Decisions (within 2 hops):") for related in decision_timeline['related_decisions'][:3]: # Show first 3 logger.info(f" {related['address']} ({related['decision_hops']} hops away)") logger.info(f" 🔄 Latest: {related['change_reason']} by {related['decision_maker']}") logger.info(f" 📊 Confidence: {related['confidence']:.2f}") # Demonstrate decision path finding sample_addresses = [ "ucxl://any:any@BZZZ:RUSTLE-testing/src/api.rs", "ucxl://any:any@BZZZ:RUSTLE-testing/src/lib.rs" ] # Create additional contexts for demo for addr in sample_addresses: if addr != ucxl_address: context = { 'summary': f'Related component: {addr.split("/")[-1]}', 'purpose': 'Supporting component in BZZZ architecture', 'technologies': ['Rust', 'BZZZ Protocol'], 'tags': ['component', 'rust'], 'insights': [f'Related to main entry point through architectural decisions'] } decision = DecisionMetadata( decision_maker="system_analysis", decision_id="related_001", decision_rationale="Architectural relationship analysis", impact_scope="module", confidence_level=0.7, external_references=[] ) related_node = 
temporal_graph.evolve_context( addr, context, ContextChangeReason.ARCHITECTURE_CHANGE, decision ) # Create influence relationship temporal_graph.add_influence_relationship(ucxl_address, addr, "architectural_dependency") # Show decision path analysis logger.info(f"\n🛤️ Decision Path Analysis:") for addr in sample_addresses: path = temporal_graph.find_decision_path(ucxl_address, addr) if path: logger.info(f" Path to {addr}:") for i, (path_addr, node) in enumerate(path): logger.info(f" {i+1}. {path_addr.split('/')[-1]} (v{node.version})") # Show related decisions within 2 hops related = temporal_graph.find_related_decisions(addr, max_hops=2) logger.info(f" 📊 {len(related)} decisions within 2 hops of {addr.split('/')[-1]}") # Save the temporal data temporal_graph.save_temporal_data() # Analyze decision patterns (not time patterns) patterns = temporal_graph.analyze_decision_patterns() logger.info(f"\n📊 Decision Network Analysis:") logger.info(f" Total decision nodes: {patterns['total_decisions']}") logger.info(f" Decision makers: {patterns['decision_makers']}") logger.info(f" Change reasons: {patterns['change_reasons']}") logger.info(f" Impact scopes: {patterns['impact_scopes']}") # Show network connectivity total_addresses = len(temporal_graph.temporal_nodes) total_influences = sum(len(node.influences) for versions in temporal_graph.temporal_nodes.values() for node in versions) logger.info(f" 🕸️ Network connectivity: {total_influences} decision influences across {total_addresses} addresses") if __name__ == "__main__": logging.basicConfig(level=logging.INFO, format='%(levelname)s - %(message)s') demo_temporal_context()