package runtime import ( "fmt" "sync" "time" "chorus.services/bzzz/pkg/ucxl" "chorus.services/bzzz/pubsub" ) // TaskTracker implements the SimpleTaskTracker interface type TaskTracker struct { maxTasks int activeTasks map[string]bool decisionPublisher *ucxl.DecisionPublisher pubsub *pubsub.PubSub nodeID string mutex sync.RWMutex } // NewTaskTracker creates a new task tracker func NewTaskTracker(maxTasks int, nodeID string, ps *pubsub.PubSub) SimpleTaskTracker { return &TaskTracker{ maxTasks: maxTasks, activeTasks: make(map[string]bool), pubsub: ps, nodeID: nodeID, } } // GetActiveTasks returns list of active task IDs func (t *TaskTracker) GetActiveTasks() []string { t.mutex.RLock() defer t.mutex.RUnlock() tasks := make([]string, 0, len(t.activeTasks)) for taskID := range t.activeTasks { tasks = append(tasks, taskID) } return tasks } // GetMaxTasks returns maximum number of concurrent tasks func (t *TaskTracker) GetMaxTasks() int { return t.maxTasks } // AddTask marks a task as active func (t *TaskTracker) AddTask(taskID string) { t.mutex.Lock() defer t.mutex.Unlock() t.activeTasks[taskID] = true } // RemoveTask marks a task as completed and publishes decision if publisher available func (t *TaskTracker) RemoveTask(taskID string) { t.mutex.Lock() defer t.mutex.Unlock() delete(t.activeTasks, taskID) // Publish task completion decision if publisher is available if t.decisionPublisher != nil { go t.publishTaskCompletion(taskID, true, "Task completed successfully", nil) } } // CompleteTaskWithDecision marks a task as completed and publishes detailed decision func (t *TaskTracker) CompleteTaskWithDecision(taskID string, success bool, summary string, filesModified []string) { t.mutex.Lock() defer t.mutex.Unlock() delete(t.activeTasks, taskID) // Publish task completion decision if publisher is available if t.decisionPublisher != nil { go t.publishTaskCompletion(taskID, success, summary, filesModified) } } // SetDecisionPublisher sets the decision publisher for task completion tracking func (t *TaskTracker) SetDecisionPublisher(publisher *ucxl.DecisionPublisher) { t.mutex.Lock() defer t.mutex.Unlock() t.decisionPublisher = publisher } // publishTaskCompletion publishes a task completion decision to DHT func (t *TaskTracker) publishTaskCompletion(taskID string, success bool, summary string, filesModified []string) { if t.decisionPublisher == nil { return } if err := t.decisionPublisher.PublishTaskCompletion(taskID, success, summary, filesModified); err != nil { fmt.Printf("⚠️ Failed to publish task completion for %s: %v\n", taskID, err) } else { fmt.Printf("📤 Published task completion decision for: %s\n", taskID) } } // IsAvailable returns whether the tracker can accept new tasks func (t *TaskTracker) IsAvailable() bool { t.mutex.RLock() defer t.mutex.RUnlock() return len(t.activeTasks) < t.maxTasks } // GetStatus returns the current status string func (t *TaskTracker) GetStatus() string { t.mutex.RLock() defer t.mutex.RUnlock() currentTasks := len(t.activeTasks) if currentTasks >= t.maxTasks { return "busy" } else if currentTasks > 0 { return "working" } return "ready" } // AnnounceAvailability starts a goroutine that broadcasts current working status func (t *TaskTracker) AnnounceAvailability() { if t.pubsub == nil { return } go func() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for range ticker.C { t.mutex.RLock() currentTasks := t.GetActiveTasks() maxTasks := t.maxTasks isAvailable := len(currentTasks) < maxTasks status := t.GetStatus() t.mutex.RUnlock() availability := map[string]interface{}{ "node_id": t.nodeID, "available_for_work": isAvailable, "current_tasks": len(currentTasks), "max_tasks": maxTasks, "last_activity": time.Now().Unix(), "status": status, "timestamp": time.Now().Unix(), } if err := t.pubsub.PublishBzzzMessage(pubsub.AvailabilityBcast, availability); err != nil { fmt.Printf("❌ Failed to announce availability: %v\n", err) } } }() } // CapabilityAnnouncer handles capability announcements type CapabilityAnnouncer struct { pubsub *pubsub.PubSub nodeID string logger interface{} // Using interface to avoid import cycles } // NewCapabilityAnnouncer creates a new capability announcer func NewCapabilityAnnouncer(ps *pubsub.PubSub, nodeID string) *CapabilityAnnouncer { return &CapabilityAnnouncer{ pubsub: ps, nodeID: nodeID, } } // AnnounceCapabilitiesOnChange announces capabilities only when they change func (ca *CapabilityAnnouncer) AnnounceCapabilitiesOnChange(services *RuntimeServices) { if ca.pubsub == nil || services == nil || services.Config == nil { return } cfg := services.Config // Get current capabilities currentCaps := map[string]interface{}{ "node_id": ca.nodeID, "capabilities": cfg.Agent.Capabilities, "models": cfg.Agent.Models, "version": "0.2.0", "specialization": cfg.Agent.Specialization, } // Load stored capabilities from file storedCaps, err := ca.loadStoredCapabilities(ca.nodeID) if err != nil { fmt.Printf("📄 No stored capabilities found, treating as first run\n") storedCaps = nil } // Check if capabilities have changed if ca.capabilitiesChanged(currentCaps, storedCaps) { fmt.Printf("🔄 Capabilities changed, broadcasting update\n") currentCaps["timestamp"] = time.Now().Unix() currentCaps["reason"] = ca.getChangeReason(currentCaps, storedCaps) // Broadcast the change if err := ca.pubsub.PublishBzzzMessage(pubsub.CapabilityBcast, currentCaps); err != nil { fmt.Printf("❌ Failed to announce capabilities: %v", err) } else { // Store new capabilities if err := ca.storeCapabilities(ca.nodeID, currentCaps); err != nil { fmt.Printf("❌ Failed to store capabilities: %v", err) } } } else { fmt.Printf("✅ Capabilities unchanged since last run\n") } } // AnnounceRoleOnStartup announces the agent's role when starting up func (ca *CapabilityAnnouncer) AnnounceRoleOnStartup(services *RuntimeServices) { if ca.pubsub == nil || services == nil || services.Config == nil { return } cfg := services.Config if cfg.Agent.Role == "" { return // No role to announce } roleData := map[string]interface{}{ "node_id": ca.nodeID, "role": cfg.Agent.Role, "expertise": cfg.Agent.Expertise, "reports_to": cfg.Agent.ReportsTo, "deliverables": cfg.Agent.Deliverables, "capabilities": cfg.Agent.Capabilities, "specialization": cfg.Agent.Specialization, "timestamp": time.Now().Unix(), "status": "online", } opts := pubsub.MessageOptions{ FromRole: cfg.Agent.Role, RequiredExpertise: cfg.Agent.Expertise, Priority: "medium", } if err := ca.pubsub.PublishRoleBasedMessage(pubsub.RoleAnnouncement, roleData, opts); err != nil { fmt.Printf("❌ Failed to announce role: %v", err) } else { fmt.Printf("📢 Role announced: %s\n", cfg.Agent.Role) } } // Placeholder implementations for capability storage and comparison // These would be implemented similarly to the main.go versions func (ca *CapabilityAnnouncer) loadStoredCapabilities(nodeID string) (map[string]interface{}, error) { // Implementation moved from main.go return nil, fmt.Errorf("not implemented") } func (ca *CapabilityAnnouncer) storeCapabilities(nodeID string, capabilities map[string]interface{}) error { // Implementation moved from main.go return fmt.Errorf("not implemented") } func (ca *CapabilityAnnouncer) capabilitiesChanged(current, stored map[string]interface{}) bool { // Implementation moved from main.go return true // Always announce for now } func (ca *CapabilityAnnouncer) getChangeReason(current, stored map[string]interface{}) string { // Implementation moved from main.go if stored == nil { return "startup" } return "unknown_change" } // StatusReporter provides periodic status updates type StatusReporter struct { node interface{} // P2P node interface logger interface{} // Logger interface } // NewStatusReporter creates a new status reporter func NewStatusReporter(node interface{}) *StatusReporter { return &StatusReporter{ node: node, } } // Start begins periodic status reporting func (sr *StatusReporter) Start() { go func() { ticker := time.NewTicker(30 * time.Second) defer ticker.Stop() for range ticker.C { // This would call the actual node's ConnectedPeers method // peers := sr.node.ConnectedPeers() // fmt.Printf("📊 Status: %d connected peers\n", peers) fmt.Printf("📊 Status: periodic update\n") } }() }