package ucxl

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"path/filepath"
	"time"

	"chorus.services/bzzz/pkg/config"
	"chorus.services/bzzz/pkg/storage"
)

// DecisionPublisher handles publishing task completion decisions to encrypted
// DHT storage.
type DecisionPublisher struct {
	ctx        context.Context     // governs the lifetime of the polling goroutine started by SubscribeToDecisions
	config     *config.Config      // source of the default role (config.Agent.Role)
	dhtStorage storage.UCXLStorage // backend used to store, announce, search and retrieve decisions
	nodeID     string              // recorded as "node_id" in the context of decisions built by the helper methods
	agentName  string              // default value for TaskDecision.Agent
}

// NewDecisionPublisher creates a new decision publisher from the given
// context, configuration, storage backend, node ID and agent name.
func NewDecisionPublisher(
	ctx context.Context,
	config *config.Config,
	dhtStorage storage.UCXLStorage,
	nodeID string,
	agentName string,
) *DecisionPublisher {
	return &DecisionPublisher{
		ctx:        ctx,
		config:     config,
		dhtStorage: dhtStorage,
		nodeID:     nodeID,
		agentName:  agentName,
	}
}

// TaskDecision represents a decision made by an agent upon task completion.
type TaskDecision struct {
	Agent         string                 `json:"agent"`
	Role          string                 `json:"role"`
	Project       string                 `json:"project"`
	Task          string                 `json:"task"`
	Decision      string                 `json:"decision"`
	Context       map[string]interface{} `json:"context"`
	Timestamp     time.Time              `json:"timestamp"`
	Success       bool                   `json:"success"`
	ErrorMessage  string                 `json:"error_message,omitempty"`
	FilesModified []string               `json:"files_modified,omitempty"`
	LinesChanged  int                    `json:"lines_changed,omitempty"`
	TestResults   *TestResults           `json:"test_results,omitempty"`
	Dependencies  []string               `json:"dependencies,omitempty"`
	NextSteps     []string               `json:"next_steps,omitempty"`
}

// TestResults captures test execution results.
type TestResults struct {
	Passed      int      `json:"passed"`
	Failed      int      `json:"failed"`
	Skipped     int      `json:"skipped"`
	Coverage    float64  `json:"coverage,omitempty"`
	FailedTests []string `json:"failed_tests,omitempty"`
}

// PublishTaskDecision publishes a task completion decision to the DHT.
//
// Empty Agent, Role, Project and Timestamp fields are filled in on the passed
// decision itself (the caller's struct is modified) before it is serialized
// as indented JSON and stored under a generated UCXL address with content
// type "decision".
//
// It returns an error if decision is nil, or if address generation,
// serialization or storage fails. A failure to announce the stored content is
// only logged and does not fail the publish.
func (dp *DecisionPublisher) PublishTaskDecision(decision *TaskDecision) error {
	if decision == nil {
		return fmt.Errorf("cannot publish nil decision")
	}

	// Ensure required fields
	if decision.Agent == "" {
		decision.Agent = dp.agentName
	}
	if decision.Role == "" {
		decision.Role = dp.config.Agent.Role
	}
	if decision.Project == "" {
		decision.Project = "default-project" // TODO: Add project field to config
	}
	if decision.Timestamp.IsZero() {
		decision.Timestamp = time.Now()
	}

	log.Printf("📤 Publishing task decision: %s/%s/%s", decision.Agent, decision.Project, decision.Task)

	// Generate UCXL address
	ucxlAddress, err := dp.generateUCXLAddress(decision)
	if err != nil {
		return fmt.Errorf("failed to generate UCXL address: %w", err)
	}

	// Serialize decision content
	decisionContent, err := json.MarshalIndent(decision, "", " ")
	if err != nil {
		return fmt.Errorf("failed to serialize decision: %w", err)
	}

	// Store in encrypted DHT
	err = dp.dhtStorage.StoreUCXLContent(
		ucxlAddress,
		decisionContent,
		decision.Role,
		"decision",
	)
	if err != nil {
		return fmt.Errorf("failed to store decision in DHT: %w", err)
	}

	// Announce content availability. The content is already stored at this
	// point, so an announcement failure does not fail the publish operation.
	if err := dp.dhtStorage.AnnounceContent(ucxlAddress); err != nil {
		log.Printf("⚠️ Failed to announce decision content: %v", err)
	}

	log.Printf("✅ Published task decision: %s", ucxlAddress)
	return nil
}

// PublishTaskCompletion publishes a simple task completion without detailed
// context. The decision context records completion type "basic" and the
// publisher's node ID.
func (dp *DecisionPublisher) PublishTaskCompletion(
	taskName string,
	success bool,
	summary string,
	filesModified []string,
) error {
	decision := &TaskDecision{
		Task:          taskName,
		Decision:      summary,
		Success:       success,
		FilesModified: filesModified,
		Context: map[string]interface{}{
			"completion_type": "basic",
			"node_id":         dp.nodeID,
		},
	}

	return dp.PublishTaskDecision(decision)
}

// PublishCodeDecision publishes a coding decision with technical context.
// The decision is marked successful when testResults is nil or reports no
// failed tests. The language recorded in the context is derived from the
// extensions of filesModified.
func (dp *DecisionPublisher) PublishCodeDecision(
	taskName string,
	decision string,
	filesModified []string,
	linesChanged int,
	testResults *TestResults,
	dependencies []string,
) error {
	taskDecision := &TaskDecision{
		Task:          taskName,
		Decision:      decision,
		Success:       testResults == nil || testResults.Failed == 0,
		FilesModified: filesModified,
		LinesChanged:  linesChanged,
		TestResults:   testResults,
		Dependencies:  dependencies,
		Context: map[string]interface{}{
			"decision_type": "code",
			"node_id":       dp.nodeID,
			"language":      dp.detectLanguage(filesModified),
		},
	}

	return dp.PublishTaskDecision(taskDecision)
}

// PublishArchitecturalDecision publishes a high-level architectural decision.
// The rationale, alternatives and implications are stored in the decision
// context; the decision is always marked successful.
func (dp *DecisionPublisher) PublishArchitecturalDecision(
	taskName string,
	decision string,
	rationale string,
	alternatives []string,
	implications []string,
	nextSteps []string,
) error {
	taskDecision := &TaskDecision{
		Task:      taskName,
		Decision:  decision,
		Success:   true,
		NextSteps: nextSteps,
		Context: map[string]interface{}{
			"decision_type": "architecture",
			"rationale":     rationale,
			"alternatives":  alternatives,
			"implications":  implications,
			"node_id":       dp.nodeID,
		},
	}

	return dp.PublishTaskDecision(taskDecision)
}

// generateUCXLAddress creates a UCXL address for the decision from its agent,
// role, project and task, using the "latest" temporal segment. The returned
// error is currently always nil.
func (dp *DecisionPublisher) generateUCXLAddress(decision *TaskDecision) (string, error) {
	address := &Address{
		Agent:   decision.Agent,
		Role:    decision.Role,
		Project: decision.Project,
		Task:    decision.Task,
		TemporalSegment: TemporalSegment{
			Type: TemporalLatest, // Latest decision for this agent/role/project/task
		},
	}

	return address.String(), nil
}

// detectLanguage attempts to detect the programming language from modified
// files by counting known file extensions (case-sensitive).
//
// It returns the language with the most matching files, or "unknown" when no
// file has a recognized extension. Ties are resolved in favor of the language
// whose first matching file appears earliest in files, so the result is
// deterministic for a given input.
func (dp *DecisionPublisher) detectLanguage(files []string) string {
	languageMap := map[string]string{
		".go":   "go",
		".py":   "python",
		".js":   "javascript",
		".ts":   "typescript",
		".rs":   "rust",
		".java": "java",
		".c":    "c",
		".cpp":  "cpp",
		".cs":   "csharp",
		".php":  "php",
		".rb":   "ruby",
		".yaml": "yaml",
		".yml":  "yaml",
		".json": "json",
		".md":   "markdown",
	}

	languageCounts := make(map[string]int)
	var firstSeen []string // languages in order of first appearance, used for tie-breaking

	for _, file := range files {
		ext := filepath.Ext(file)
		lang, known := languageMap[ext]
		// A name consisting solely of the extension (e.g. ".go") is not
		// counted as a source file of that language.
		if !known || len(file) <= len(ext) {
			continue
		}
		if languageCounts[lang] == 0 {
			firstSeen = append(firstSeen, lang)
		}
		languageCounts[lang]++
	}

	// Return the most common language. Iterating firstSeen rather than the
	// map keeps the outcome independent of map iteration order.
	maxCount := 0
	primaryLanguage := "unknown"
	for _, lang := range firstSeen {
		if count := languageCounts[lang]; count > maxCount {
			maxCount = count
			primaryLanguage = lang
		}
	}

	return primaryLanguage
}

// QueryRecentDecisions retrieves metadata for recent decisions from the DHT.
// It searches for content of type "decision" matching the given agent, role
// and project, created after since, limited to limit results.
func (dp *DecisionPublisher) QueryRecentDecisions(
	agent string,
	role string,
	project string,
	limit int,
	since time.Time,
) ([]*storage.UCXLMetadata, error) {
	query := &storage.SearchQuery{
		Agent:        agent,
		Role:         role,
		Project:      project,
		ContentType:  "decision",
		CreatedAfter: since,
		Limit:        limit,
	}

	return dp.dhtStorage.SearchContent(query)
}

// GetDecisionContent retrieves a specific decision from DHT storage and
// decodes it from JSON. It returns an error if retrieval or parsing fails.
func (dp *DecisionPublisher) GetDecisionContent(ucxlAddress string) (*TaskDecision, error) {
	content, metadata, err := dp.dhtStorage.RetrieveUCXLContent(ucxlAddress)
	if err != nil {
		return nil, fmt.Errorf("failed to retrieve decision content: %w", err)
	}

	var decision TaskDecision
	if err := json.Unmarshal(content, &decision); err != nil {
		return nil, fmt.Errorf("failed to parse decision content: %w", err)
	}

	log.Printf("📥 Retrieved decision: %s (creator: %s)", ucxlAddress, metadata.CreatorRole)
	return &decision, nil
}

// SubscribeToDecisions sets up a subscription to new decisions (placeholder
// for future pubsub).
//
// It starts a goroutine that polls every 30 seconds for up to 10 decisions
// matching roleFilter that were created since the previous successful poll,
// and invokes callback for each one it can retrieve. The goroutine stops when
// the publisher's context is done.
//
// Delivery is at-least-once: the poll window starts at the time the previous
// successful query was issued, so a decision created while that query was in
// flight may be delivered twice rather than missed. A failed query leaves the
// window unchanged so it is retried on the next tick.
//
// It returns an error if callback is nil.
func (dp *DecisionPublisher) SubscribeToDecisions(
	roleFilter string,
	callback func(*TaskDecision, *storage.UCXLMetadata),
) error {
	if callback == nil {
		return fmt.Errorf("decision subscription requires a non-nil callback")
	}

	// This is a placeholder for future pubsub implementation
	// For now, we'll implement a simple polling mechanism
	go func() {
		ticker := time.NewTicker(30 * time.Second)
		defer ticker.Stop()

		lastCheck := time.Now()

		for {
			select {
			case <-dp.ctx.Done():
				return
			case <-ticker.C:
				// Capture the window boundary before querying: anything
				// created while the query and callbacks run must fall into
				// the next window instead of being skipped.
				pollStart := time.Now()

				// Query for recent decisions
				decisions, err := dp.QueryRecentDecisions("", roleFilter, "", 10, lastCheck)
				if err != nil {
					log.Printf("⚠️ Failed to query recent decisions: %v", err)
					continue
				}

				// Process new decisions
				for _, metadata := range decisions {
					if metadata == nil {
						continue
					}
					decision, err := dp.GetDecisionContent(metadata.Address)
					if err != nil {
						log.Printf("⚠️ Failed to get decision content: %v", err)
						continue
					}

					callback(decision, metadata)
				}

				lastCheck = pollStart
			}
		}
	}()

	log.Printf("🔔 Subscribed to decisions for role: %s", roleFilter)
	return nil
}

// PublishSystemStatus publishes current system status as a decision with task
// name "system_status". The decision is marked successful only when every
// entry in healthChecks is true; metrics and healthChecks are stored in the
// decision context.
func (dp *DecisionPublisher) PublishSystemStatus(
	status string,
	metrics map[string]interface{},
	healthChecks map[string]bool,
) error {
	decision := &TaskDecision{
		Task:     "system_status",
		Decision: status,
		Success:  dp.allHealthChecksPass(healthChecks),
		Context: map[string]interface{}{
			"decision_type": "system",
			"metrics":       metrics,
			"health_checks": healthChecks,
			"node_id":       dp.nodeID,
		},
	}

	return dp.PublishTaskDecision(decision)
}

// allHealthChecksPass reports whether all health checks are passing. An empty
// or nil map counts as passing.
func (dp *DecisionPublisher) allHealthChecksPass(healthChecks map[string]bool) bool {
	for _, passing := range healthChecks {
		if !passing {
			return false
		}
	}
	return true
}

// GetPublisherMetrics returns metrics about the decision publisher: its node
// ID, agent name, configured role, project, and the metrics reported by the
// DHT storage backend.
//
// NOTE: "last_publish" is the time of this call, not the time of the last
// publish; publishes are not tracked yet.
func (dp *DecisionPublisher) GetPublisherMetrics() map[string]interface{} {
	dhtMetrics := dp.dhtStorage.GetMetrics()

	return map[string]interface{}{
		"node_id":      dp.nodeID,
		"agent_name":   dp.agentName,
		"current_role": dp.config.Agent.Role,
		"project":      "default-project", // TODO: Add project field to config
		"dht_metrics":  dhtMetrics,
		"last_publish": time.Now(), // This would be tracked in a real implementation
	}
}