package coordinator import ( "context" "fmt" "strings" "sync" "time" "github.com/anthonyrawlins/bzzz/logging" "github.com/anthonyrawlins/bzzz/pkg/config" "github.com/anthonyrawlins/bzzz/pkg/hive" "github.com/anthonyrawlins/bzzz/pubsub" "github.com/anthonyrawlins/bzzz/repository" "github.com/libp2p/go-libp2p/core/peer" ) // TaskCoordinator manages task discovery, assignment, and execution across multiple repositories type TaskCoordinator struct { hiveClient *hive.HiveClient pubsub *pubsub.PubSub hlog *logging.HypercoreLog ctx context.Context config *config.Config // Repository management providers map[int]repository.TaskProvider // projectID -> provider providerLock sync.RWMutex factory repository.ProviderFactory // Task management activeTasks map[string]*ActiveTask // taskKey -> active task taskLock sync.RWMutex taskMatcher repository.TaskMatcher // Agent tracking nodeID string agentInfo *repository.AgentInfo // Sync settings syncInterval time.Duration lastSync map[int]time.Time syncLock sync.RWMutex } // ActiveTask represents a task currently being worked on type ActiveTask struct { Task *repository.Task Provider repository.TaskProvider ProjectID int ClaimedAt time.Time Status string // claimed, working, completed, failed AgentID string Results map[string]interface{} } // NewTaskCoordinator creates a new task coordinator func NewTaskCoordinator( ctx context.Context, hiveClient *hive.HiveClient, ps *pubsub.PubSub, hlog *logging.HypercoreLog, cfg *config.Config, nodeID string, ) *TaskCoordinator { coordinator := &TaskCoordinator{ hiveClient: hiveClient, pubsub: ps, hlog: hlog, ctx: ctx, config: cfg, providers: make(map[int]repository.TaskProvider), activeTasks: make(map[string]*ActiveTask), lastSync: make(map[int]time.Time), factory: &repository.DefaultProviderFactory{}, taskMatcher: &repository.DefaultTaskMatcher{}, nodeID: nodeID, syncInterval: 30 * time.Second, } // Create agent info from config coordinator.agentInfo = &repository.AgentInfo{ ID: cfg.Agent.ID, Role: cfg.Agent.Role, Expertise: cfg.Agent.Expertise, CurrentTasks: 0, MaxTasks: cfg.Agent.MaxTasks, Status: "ready", LastSeen: time.Now(), Performance: 0.8, // Default performance score Availability: 1.0, } return coordinator } // Start begins the task coordination process func (tc *TaskCoordinator) Start() { fmt.Printf("🎯 Starting task coordinator for agent %s (%s)\n", tc.agentInfo.ID, tc.agentInfo.Role) // Announce role and capabilities tc.announceAgentRole() // Start periodic task discovery and sync go tc.taskDiscoveryLoop() // Start role-based message handling tc.pubsub.SetAntennaeMessageHandler(tc.handleRoleMessage) fmt.Printf("✅ Task coordinator started\n") } // taskDiscoveryLoop periodically discovers and processes tasks func (tc *TaskCoordinator) taskDiscoveryLoop() { ticker := time.NewTicker(tc.syncInterval) defer ticker.Stop() for { select { case <-tc.ctx.Done(): return case <-ticker.C: tc.discoverAndProcessTasks() } } } // discoverAndProcessTasks discovers tasks from all repositories and processes them func (tc *TaskCoordinator) discoverAndProcessTasks() { // Get monitored repositories from Hive repositories, err := tc.hiveClient.GetMonitoredRepositories(tc.ctx) if err != nil { fmt.Printf("⚠️ Failed to get monitored repositories: %v\n", err) return } var totalTasks, processedTasks int for _, repo := range repositories { // Skip if repository is not enabled for bzzz if !repo.BzzzEnabled { continue } // Create or get repository provider provider, err := tc.getOrCreateProvider(repo) if err != nil { fmt.Printf("⚠️ Failed to create provider for %s: %v\n", repo.Name, err) continue } // Get available tasks tasks, err := provider.ListAvailableTasks() if err != nil { fmt.Printf("⚠️ Failed to list tasks for %s: %v\n", repo.Name, err) continue } totalTasks += len(tasks) // Filter tasks suitable for this agent suitableTasks, err := tc.taskMatcher.MatchTasksToRole(tasks, tc.agentInfo.Role, tc.agentInfo.Expertise) if err != nil { fmt.Printf("⚠️ Failed to match tasks for role %s: %v\n", tc.agentInfo.Role, err) continue } // Process suitable tasks for _, task := range suitableTasks { if tc.shouldProcessTask(task) { if tc.processTask(task, provider, repo.ID) { processedTasks++ } } } // Update last sync time tc.syncLock.Lock() tc.lastSync[repo.ID] = time.Now() tc.syncLock.Unlock() } if totalTasks > 0 { fmt.Printf("🔍 Discovered %d tasks, processed %d suitable tasks\n", totalTasks, processedTasks) } } // shouldProcessTask determines if we should process a task func (tc *TaskCoordinator) shouldProcessTask(task *repository.Task) bool { // Check if we're already at capacity tc.taskLock.RLock() currentTasks := len(tc.activeTasks) tc.taskLock.RUnlock() if currentTasks >= tc.agentInfo.MaxTasks { return false } // Check if task is already assigned to us taskKey := fmt.Sprintf("%s:%d", task.Repository, task.Number) tc.taskLock.RLock() _, alreadyActive := tc.activeTasks[taskKey] tc.taskLock.RUnlock() if alreadyActive { return false } // Check minimum score threshold score := tc.taskMatcher.ScoreTaskForAgent(task, tc.agentInfo.Role, tc.agentInfo.Expertise) return score > 0.5 // Only process tasks with good fit } // processTask attempts to claim and process a task func (tc *TaskCoordinator) processTask(task *repository.Task, provider repository.TaskProvider, projectID int) bool { taskKey := fmt.Sprintf("%s:%d", task.Repository, task.Number) // Request collaboration if needed if tc.shouldRequestCollaboration(task) { tc.requestTaskCollaboration(task) } // Attempt to claim the task claimedTask, err := provider.ClaimTask(task.Number, tc.agentInfo.ID) if err != nil { fmt.Printf("⚠️ Failed to claim task %s #%d: %v\n", task.Repository, task.Number, err) return false } // Create active task activeTask := &ActiveTask{ Task: claimedTask, Provider: provider, ProjectID: projectID, ClaimedAt: time.Now(), Status: "claimed", AgentID: tc.agentInfo.ID, Results: make(map[string]interface{}), } // Store active task tc.taskLock.Lock() tc.activeTasks[taskKey] = activeTask tc.agentInfo.CurrentTasks = len(tc.activeTasks) tc.taskLock.Unlock() // Log task claim tc.hlog.Append(logging.TaskClaimed, map[string]interface{}{ "task_number": task.Number, "repository": task.Repository, "title": task.Title, "required_role": task.RequiredRole, "priority": task.Priority, }) // Announce task claim tc.announceTaskClaim(task) // Start processing the task go tc.executeTask(activeTask) fmt.Printf("✅ Claimed task %s #%d: %s\n", task.Repository, task.Number, task.Title) return true } // shouldRequestCollaboration determines if we should request collaboration for a task func (tc *TaskCoordinator) shouldRequestCollaboration(task *repository.Task) bool { // Request collaboration for high-priority or complex tasks if task.Priority >= 8 { return true } // Request collaboration if task requires expertise we don't have if len(task.RequiredExpertise) > 0 { for _, required := range task.RequiredExpertise { hasExpertise := false for _, expertise := range tc.agentInfo.Expertise { if strings.EqualFold(required, expertise) { hasExpertise = true break } } if !hasExpertise { return true } } } return false } // requestTaskCollaboration requests collaboration for a task func (tc *TaskCoordinator) requestTaskCollaboration(task *repository.Task) { data := map[string]interface{}{ "task_number": task.Number, "repository": task.Repository, "title": task.Title, "required_role": task.RequiredRole, "required_expertise": task.RequiredExpertise, "priority": task.Priority, "requester_role": tc.agentInfo.Role, "reason": "expertise_gap", } opts := pubsub.MessageOptions{ FromRole: tc.agentInfo.Role, ToRoles: []string{task.RequiredRole}, RequiredExpertise: task.RequiredExpertise, Priority: "high", ThreadID: fmt.Sprintf("task-%s-%d", task.Repository, task.Number), } err := tc.pubsub.PublishRoleBasedMessage(pubsub.TaskHelpRequest, data, opts) if err != nil { fmt.Printf("⚠️ Failed to request collaboration: %v\n", err) } else { fmt.Printf("🤝 Requested collaboration for task %s #%d\n", task.Repository, task.Number) } } // executeTask executes a claimed task func (tc *TaskCoordinator) executeTask(activeTask *ActiveTask) { taskKey := fmt.Sprintf("%s:%d", activeTask.Task.Repository, activeTask.Task.Number) // Update status tc.taskLock.Lock() activeTask.Status = "working" tc.taskLock.Unlock() // Announce work start tc.announceTaskProgress(activeTask.Task, "started") // Simulate task execution (in real implementation, this would call actual execution logic) time.Sleep(10 * time.Second) // Simulate work // Complete the task results := map[string]interface{}{ "status": "completed", "completion_time": time.Now().Format(time.RFC3339), "agent_id": tc.agentInfo.ID, "agent_role": tc.agentInfo.Role, } err := activeTask.Provider.CompleteTask(activeTask.Task.Number, tc.agentInfo.ID, results) if err != nil { fmt.Printf("❌ Failed to complete task %s #%d: %v\n", activeTask.Task.Repository, activeTask.Task.Number, err) // Update status to failed tc.taskLock.Lock() activeTask.Status = "failed" activeTask.Results = map[string]interface{}{"error": err.Error()} tc.taskLock.Unlock() return } // Update status and remove from active tasks tc.taskLock.Lock() activeTask.Status = "completed" activeTask.Results = results delete(tc.activeTasks, taskKey) tc.agentInfo.CurrentTasks = len(tc.activeTasks) tc.taskLock.Unlock() // Log completion tc.hlog.Append(logging.TaskCompleted, map[string]interface{}{ "task_number": activeTask.Task.Number, "repository": activeTask.Task.Repository, "duration": time.Since(activeTask.ClaimedAt).Seconds(), "results": results, }) // Announce completion tc.announceTaskProgress(activeTask.Task, "completed") fmt.Printf("✅ Completed task %s #%d\n", activeTask.Task.Repository, activeTask.Task.Number) } // getOrCreateProvider gets or creates a repository provider func (tc *TaskCoordinator) getOrCreateProvider(repo *hive.MonitoredRepository) (repository.TaskProvider, error) { tc.providerLock.RLock() if provider, exists := tc.providers[repo.ID]; exists { tc.providerLock.RUnlock() return provider, nil } tc.providerLock.RUnlock() // Create new provider config := &repository.Config{ Provider: repo.Provider, BaseURL: repo.ProviderBaseURL, AccessToken: repo.AccessToken, Owner: repo.GitOwner, Repository: repo.GitRepository, TaskLabel: "bzzz-task", InProgressLabel: "in-progress", CompletedLabel: "completed", BaseBranch: repo.GitBranch, BranchPrefix: "bzzz/task-", } provider, err := tc.factory.CreateProvider(tc.ctx, config) if err != nil { return nil, fmt.Errorf("failed to create provider: %w", err) } tc.providerLock.Lock() tc.providers[repo.ID] = provider tc.providerLock.Unlock() return provider, nil } // announceAgentRole announces this agent's role and capabilities func (tc *TaskCoordinator) announceAgentRole() { data := map[string]interface{}{ "agent_id": tc.agentInfo.ID, "node_id": tc.nodeID, "role": tc.agentInfo.Role, "expertise": tc.agentInfo.Expertise, "capabilities": tc.config.Agent.Capabilities, "max_tasks": tc.agentInfo.MaxTasks, "current_tasks": tc.agentInfo.CurrentTasks, "status": tc.agentInfo.Status, "specialization": tc.config.Agent.Specialization, } opts := pubsub.MessageOptions{ FromRole: tc.agentInfo.Role, Priority: "medium", } err := tc.pubsub.PublishRoleBasedMessage(pubsub.RoleAnnouncement, data, opts) if err != nil { fmt.Printf("⚠️ Failed to announce role: %v\n", err) } else { fmt.Printf("📢 Announced role: %s with expertise in %v\n", tc.agentInfo.Role, tc.agentInfo.Expertise) } } // announceTaskClaim announces that this agent has claimed a task func (tc *TaskCoordinator) announceTaskClaim(task *repository.Task) { data := map[string]interface{}{ "task_number": task.Number, "repository": task.Repository, "title": task.Title, "agent_id": tc.agentInfo.ID, "agent_role": tc.agentInfo.Role, "claim_time": time.Now().Format(time.RFC3339), "estimated_completion": time.Now().Add(time.Hour).Format(time.RFC3339), } opts := pubsub.MessageOptions{ FromRole: tc.agentInfo.Role, Priority: "medium", ThreadID: fmt.Sprintf("task-%s-%d", task.Repository, task.Number), } err := tc.pubsub.PublishRoleBasedMessage(pubsub.TaskProgress, data, opts) if err != nil { fmt.Printf("⚠️ Failed to announce task claim: %v\n", err) } } // announceTaskProgress announces task progress updates func (tc *TaskCoordinator) announceTaskProgress(task *repository.Task, status string) { data := map[string]interface{}{ "task_number": task.Number, "repository": task.Repository, "agent_id": tc.agentInfo.ID, "agent_role": tc.agentInfo.Role, "status": status, "timestamp": time.Now().Format(time.RFC3339), } opts := pubsub.MessageOptions{ FromRole: tc.agentInfo.Role, Priority: "low", ThreadID: fmt.Sprintf("task-%s-%d", task.Repository, task.Number), } err := tc.pubsub.PublishRoleBasedMessage(pubsub.TaskProgress, data, opts) if err != nil { fmt.Printf("⚠️ Failed to announce task progress: %v\n", err) } } // handleRoleMessage handles incoming role-based messages func (tc *TaskCoordinator) handleRoleMessage(msg pubsub.Message, from peer.ID) { switch msg.Type { case pubsub.TaskHelpRequest: tc.handleTaskHelpRequest(msg, from) case pubsub.ExpertiseRequest: tc.handleExpertiseRequest(msg, from) case pubsub.CoordinationRequest: tc.handleCoordinationRequest(msg, from) case pubsub.RoleAnnouncement: tc.handleRoleAnnouncement(msg, from) default: fmt.Printf("🎯 Received %s from %s: %v\n", msg.Type, from.ShortString(), msg.Data) } } // handleTaskHelpRequest handles requests for task assistance func (tc *TaskCoordinator) handleTaskHelpRequest(msg pubsub.Message, from peer.ID) { // Check if we can help with this task requiredExpertise, ok := msg.Data["required_expertise"].([]interface{}) if !ok { return } canHelp := false for _, required := range requiredExpertise { reqStr, ok := required.(string) if !ok { continue } for _, expertise := range tc.agentInfo.Expertise { if strings.EqualFold(reqStr, expertise) { canHelp = true break } } if canHelp { break } } if canHelp && tc.agentInfo.CurrentTasks < tc.agentInfo.MaxTasks { // Offer help responseData := map[string]interface{}{ "agent_id": tc.agentInfo.ID, "agent_role": tc.agentInfo.Role, "expertise": tc.agentInfo.Expertise, "availability": tc.agentInfo.MaxTasks - tc.agentInfo.CurrentTasks, "offer_type": "collaboration", "response_to": msg.Data, } opts := pubsub.MessageOptions{ FromRole: tc.agentInfo.Role, Priority: "medium", ThreadID: msg.ThreadID, } err := tc.pubsub.PublishRoleBasedMessage(pubsub.TaskHelpResponse, responseData, opts) if err != nil { fmt.Printf("⚠️ Failed to offer help: %v\n", err) } else { fmt.Printf("🤝 Offered help for task collaboration\n") } } } // handleExpertiseRequest handles requests for specific expertise func (tc *TaskCoordinator) handleExpertiseRequest(msg pubsub.Message, from peer.ID) { // Similar to task help request but more focused on expertise fmt.Printf("🎯 Expertise request from %s: %v\n", from.ShortString(), msg.Data) } // handleCoordinationRequest handles coordination requests func (tc *TaskCoordinator) handleCoordinationRequest(msg pubsub.Message, from peer.ID) { fmt.Printf("🎯 Coordination request from %s: %v\n", from.ShortString(), msg.Data) } // handleRoleAnnouncement handles role announcements from other agents func (tc *TaskCoordinator) handleRoleAnnouncement(msg pubsub.Message, from peer.ID) { role, _ := msg.Data["role"].(string) expertise, _ := msg.Data["expertise"].([]interface{}) fmt.Printf("📢 Agent %s announced role: %s with expertise: %v\n", from.ShortString(), role, expertise) } // GetStatus returns current coordinator status func (tc *TaskCoordinator) GetStatus() map[string]interface{} { tc.taskLock.RLock() activeTasks := len(tc.activeTasks) taskList := make([]map[string]interface{}, 0, len(tc.activeTasks)) for _, task := range tc.activeTasks { taskList = append(taskList, map[string]interface{}{ "repository": task.Task.Repository, "number": task.Task.Number, "title": task.Task.Title, "status": task.Status, "claimed_at": task.ClaimedAt.Format(time.RFC3339), }) } tc.taskLock.RUnlock() tc.providerLock.RLock() providers := len(tc.providers) tc.providerLock.RUnlock() return map[string]interface{}{ "agent_id": tc.agentInfo.ID, "role": tc.agentInfo.Role, "expertise": tc.agentInfo.Expertise, "current_tasks": activeTasks, "max_tasks": tc.agentInfo.MaxTasks, "active_providers": providers, "status": tc.agentInfo.Status, "active_tasks": taskList, } }