diff --git a/executor/executor.go b/executor/executor.go index 638dd36b..56d7cc59 100644 --- a/executor/executor.go +++ b/executor/executor.go @@ -21,9 +21,9 @@ type ExecuteTaskResult struct { // ExecuteTask manages the entire lifecycle of a task using a sandboxed environment. // Returns sandbox reference so it can be destroyed after PR creation -func ExecuteTask(ctx context.Context, task *types.EnhancedTask, hlog *logging.HypercoreLog) (*ExecuteTaskResult, error) { +func ExecuteTask(ctx context.Context, task *types.EnhancedTask, hlog *logging.HypercoreLog, agentConfig *config.AgentConfig) (*ExecuteTaskResult, error) { // 1. Create the sandbox environment - sb, err := sandbox.CreateSandbox(ctx, "") // Use default image for now + sb, err := sandbox.CreateSandbox(ctx, "", agentConfig) // Use default image for now if err != nil { return nil, fmt.Errorf("failed to create sandbox: %w", err) } diff --git a/github/client.go b/github/client.go index 07c4476b..04f93cb3 100644 --- a/github/client.go +++ b/github/client.go @@ -182,7 +182,7 @@ func (c *Client) ClaimTask(issueNumber int, agentID string) (*Task, error) { // Attempt atomic assignment using GitHub's native assignment // GitHub only accepts existing usernames, so we'll assign to the repo owner - githubAssignee := "anthonyrawlins" + githubAssignee := c.config.Assignee issueRequest := &github.IssueRequest{ Assignee: &githubAssignee, } diff --git a/github/hive_integration.go b/github/hive_integration.go deleted file mode 100644 index 3effaecc..00000000 --- a/github/hive_integration.go +++ /dev/null @@ -1,470 +0,0 @@ -package github - -import ( - "context" - "fmt" - "strings" - "sync" - "time" - - "github.com/anthonyrawlins/bzzz/executor" - "github.com/anthonyrawlins/bzzz/logging" - "github.com/anthonyrawlins/bzzz/pkg/hive" - "github.com/anthonyrawlins/bzzz/pkg/types" - "github.com/anthonyrawlins/bzzz/pubsub" - "github.com/libp2p/go-libp2p/core/peer" -) - -// HiveIntegration handles dynamic repository discovery via Hive API -type HiveIntegration struct { - hiveClient *hive.HiveClient - githubToken string - pubsub *pubsub.PubSub - hlog *logging.HypercoreLog - ctx context.Context - config *IntegrationConfig - - // Repository management - repositories map[int]*RepositoryClient // projectID -> GitHub client - repositoryLock sync.RWMutex - - // Conversation tracking - activeDiscussions map[string]*Conversation // "projectID:taskID" -> conversation - discussionLock sync.RWMutex -} - -// RepositoryClient wraps a GitHub client for a specific repository -type RepositoryClient struct { - Client *Client - Repository hive.Repository - LastSync time.Time -} - -// NewHiveIntegration creates a new Hive-based GitHub integration -func NewHiveIntegration(ctx context.Context, hiveClient *hive.HiveClient, githubToken string, ps *pubsub.PubSub, hlog *logging.HypercoreLog, config *IntegrationConfig) *HiveIntegration { - if config.PollInterval == 0 { - config.PollInterval = 30 * time.Second - } - if config.MaxTasks == 0 { - config.MaxTasks = 3 - } - - return &HiveIntegration{ - hiveClient: hiveClient, - githubToken: githubToken, - pubsub: ps, - hlog: hlog, - ctx: ctx, - config: config, - repositories: make(map[int]*RepositoryClient), - activeDiscussions: make(map[string]*Conversation), - } -} - -// Start begins the Hive-GitHub integration -func (hi *HiveIntegration) Start() { - fmt.Printf("🔗 Starting Hive-GitHub integration for agent: %s\n", hi.config.AgentID) - - // Register the handler for incoming meta-discussion messages - hi.pubsub.SetAntennaeMessageHandler(hi.handleMetaDiscussion) - - // Start repository discovery and task polling - go hi.repositoryDiscoveryLoop() - go hi.taskPollingLoop() -} - -// repositoryDiscoveryLoop periodically discovers active repositories from Hive -func (hi *HiveIntegration) repositoryDiscoveryLoop() { - ticker := time.NewTicker(5 * time.Minute) // Check for new repositories every 5 minutes - defer ticker.Stop() - - // Initial discovery - hi.syncRepositories() - - for { - select { - case <-hi.ctx.Done(): - return - case <-ticker.C: - hi.syncRepositories() - } - } -} - -// syncRepositories synchronizes the list of active repositories from Hive -func (hi *HiveIntegration) syncRepositories() { - repositories, err := hi.hiveClient.GetActiveRepositories(hi.ctx) - if err != nil { - fmt.Printf("❌ Failed to get active repositories: %v\n", err) - return - } - - hi.repositoryLock.Lock() - defer hi.repositoryLock.Unlock() - - // Track which repositories we've seen - currentRepos := make(map[int]bool) - - for _, repo := range repositories { - currentRepos[repo.ProjectID] = true - - // Check if we already have a client for this repository - if _, exists := hi.repositories[repo.ProjectID]; !exists { - // Create new GitHub client for this repository - githubConfig := &Config{ - AccessToken: hi.githubToken, - Owner: repo.Owner, - Repository: repo.Repository, - BaseBranch: repo.Branch, - } - - client, err := NewClient(hi.ctx, githubConfig) - if err != nil { - fmt.Printf("❌ Failed to create GitHub client for %s/%s: %v\n", repo.Owner, repo.Repository, err) - continue - } - - hi.repositories[repo.ProjectID] = &RepositoryClient{ - Client: client, - Repository: repo, - LastSync: time.Now(), - } - - fmt.Printf("✅ Added repository: %s/%s (Project ID: %d)\n", repo.Owner, repo.Repository, repo.ProjectID) - } - } - - // Remove repositories that are no longer active - for projectID := range hi.repositories { - if !currentRepos[projectID] { - delete(hi.repositories, projectID) - fmt.Printf("🗑️ Removed inactive repository (Project ID: %d)\n", projectID) - } - } - - fmt.Printf("📊 Repository sync complete: %d active repositories\n", len(hi.repositories)) -} - -// taskPollingLoop periodically polls all repositories for available tasks -func (hi *HiveIntegration) taskPollingLoop() { - ticker := time.NewTicker(hi.config.PollInterval) - defer ticker.Stop() - - for { - select { - case <-hi.ctx.Done(): - return - case <-ticker.C: - hi.pollAllRepositories() - } - } -} - -// pollAllRepositories checks all active repositories for available tasks -func (hi *HiveIntegration) pollAllRepositories() { - hi.repositoryLock.RLock() - repositories := make([]*RepositoryClient, 0, len(hi.repositories)) - for _, repo := range hi.repositories { - repositories = append(repositories, repo) - } - hi.repositoryLock.RUnlock() - - if len(repositories) == 0 { - return - } - - fmt.Printf("🔍 Polling %d repositories for available tasks...\n", len(repositories)) - - var allTasks []*types.EnhancedTask - - // Collect tasks from all repositories - for _, repoClient := range repositories { - tasks, err := hi.getRepositoryTasks(repoClient) - if err != nil { - fmt.Printf("❌ Failed to get tasks from %s/%s: %v\n", - repoClient.Repository.Owner, repoClient.Repository.Repository, err) - continue - } - allTasks = append(allTasks, tasks...) - } - - if len(allTasks) == 0 { - return - } - - fmt.Printf("📋 Found %d total available tasks across all repositories\n", len(allTasks)) - - // Apply filtering and selection - suitableTasks := hi.filterSuitableTasks(allTasks) - if len(suitableTasks) == 0 { - fmt.Printf("⚠️ No suitable tasks for agent capabilities: %v\n", hi.config.Capabilities) - return - } - - // Select and claim the highest priority task - task := suitableTasks[0] - hi.claimAndExecuteTask(task) -} - -// getRepositoryTasks fetches available tasks from a specific repository -func (hi *HiveIntegration) getRepositoryTasks(repoClient *RepositoryClient) ([]*types.EnhancedTask, error) { - // Get tasks from GitHub - githubTasks, err := repoClient.Client.ListAvailableTasks() - if err != nil { - return nil, err - } - - // Convert to enhanced tasks with project context - var enhancedTasks []*types.EnhancedTask - for _, task := range githubTasks { - enhancedTask := &types.EnhancedTask{ - ID: task.ID, - Number: task.Number, - Title: task.Title, - Description: task.Description, - State: task.State, - Labels: task.Labels, - Assignee: task.Assignee, - CreatedAt: task.CreatedAt, - UpdatedAt: task.UpdatedAt, - TaskType: task.TaskType, - Priority: task.Priority, - Requirements: task.Requirements, - Deliverables: task.Deliverables, - Context: task.Context, - ProjectID: repoClient.Repository.ProjectID, - GitURL: repoClient.Repository.GitURL, - Repository: repoClient.Repository, - } - enhancedTasks = append(enhancedTasks, enhancedTask) - } - - return enhancedTasks, nil -} - -// filterSuitableTasks filters tasks based on agent capabilities -func (hi *HiveIntegration) filterSuitableTasks(tasks []*types.EnhancedTask) []*types.EnhancedTask { - var suitable []*types.EnhancedTask - - for _, task := range tasks { - if hi.canHandleTaskType(task.TaskType) { - suitable = append(suitable, task) - } - } - - return suitable -} - -// canHandleTaskType checks if this agent can handle the given task type -func (hi *HiveIntegration) canHandleTaskType(taskType string) bool { - for _, capability := range hi.config.Capabilities { - if capability == taskType || capability == "general" || capability == "task-coordination" { - return true - } - } - return false -} - -// claimAndExecuteTask claims a task and begins execution -func (hi *HiveIntegration) claimAndExecuteTask(task *types.EnhancedTask) { - hi.repositoryLock.RLock() - repoClient, exists := hi.repositories[task.ProjectID] - hi.repositoryLock.RUnlock() - - if !exists { - fmt.Printf("❌ Repository client not found for project %d\n", task.ProjectID) - return - } - - // Claim the task in GitHub - _, err := repoClient.Client.ClaimTask(task.Number, hi.config.AgentID) - if err != nil { - fmt.Printf("❌ Failed to claim task %d in %s/%s: %v\n", - task.Number, task.Repository.Owner, task.Repository.Repository, err) - return - } - - fmt.Printf("✋ Claimed task #%d from %s/%s: %s\n", - task.Number, task.Repository.Owner, task.Repository.Repository, task.Title) - - // Log the claim - hi.hlog.Append(logging.TaskClaimed, map[string]interface{}{ - "task_id": task.Number, - "repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository), - "title": task.Title, - }) - - // Report claim to Hive - if err := hi.hiveClient.ClaimTask(hi.ctx, task.ProjectID, task.Number, hi.config.AgentID); err != nil { - fmt.Printf("⚠️ Failed to report task claim to Hive: %v\n", err) - } - - // Start task execution - go hi.executeTask(task, repoClient) -} - -// executeTask executes a claimed task with reasoning and coordination -func (hi *HiveIntegration) executeTask(task *types.EnhancedTask, repoClient *RepositoryClient) { - // Define the dynamic topic for this task - taskTopic := fmt.Sprintf("bzzz/meta/issue/%d", task.Number) - hi.pubsub.JoinDynamicTopic(taskTopic) - defer hi.pubsub.LeaveDynamicTopic(taskTopic) - - fmt.Printf("🚀 Starting execution of task #%d in sandbox...\n", task.Number) - - // The executor now handles the entire iterative process. - result, err := executor.ExecuteTask(hi.ctx, task, hi.hlog) - if err != nil { - fmt.Printf("❌ Failed to execute task #%d: %v\n", task.Number, err) - hi.hlog.Append(logging.TaskFailed, map[string]interface{}{"task_id": task.Number, "reason": "task execution failed in sandbox"}) - return - } - - // Ensure sandbox cleanup happens regardless of PR creation success/failure - defer result.Sandbox.DestroySandbox() - - // Create a pull request - pr, err := repoClient.Client.CreatePullRequest(task.Number, result.BranchName, hi.config.AgentID) - if err != nil { - fmt.Printf("❌ Failed to create pull request for task #%d: %v\n", task.Number, err) - fmt.Printf("📝 Note: Branch '%s' has been pushed to repository and work is preserved\n", result.BranchName) - - // Escalate PR creation failure to humans via N8N webhook - escalationReason := fmt.Sprintf("Failed to create pull request: %v. Task execution completed successfully and work is preserved in branch '%s', but PR creation failed.", err, result.BranchName) - hi.requestAssistance(task, escalationReason, fmt.Sprintf("bzzz/meta/issue/%d", task.Number)) - - hi.hlog.Append(logging.TaskFailed, map[string]interface{}{ - "task_id": task.Number, - "reason": "failed to create pull request", - "branch_name": result.BranchName, - "work_preserved": true, - "escalated": true, - }) - return - } - - fmt.Printf("✅ Successfully created pull request for task #%d: %s\n", task.Number, pr.GetHTMLURL()) - hi.hlog.Append(logging.TaskCompleted, map[string]interface{}{ - "task_id": task.Number, - "pr_url": pr.GetHTMLURL(), - "pr_number": pr.GetNumber(), - }) - - // Report completion to Hive - if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, task.ProjectID, task.Number, "completed", map[string]interface{}{ - "pull_request_url": pr.GetHTMLURL(), - }); err != nil { - fmt.Printf("⚠️ Failed to report task completion to Hive: %v\n", err) - } -} - -// requestAssistance publishes a help request to the task-specific topic. -func (hi *HiveIntegration) requestAssistance(task *types.EnhancedTask, reason, topic string) { - fmt.Printf("🆘 Agent %s is requesting assistance for task #%d: %s\n", hi.config.AgentID, task.Number, reason) - hi.hlog.Append(logging.TaskHelpRequested, map[string]interface{}{ - "task_id": task.Number, - "reason": reason, - }) - - helpRequest := map[string]interface{}{ - "issue_id": task.Number, - "repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository), - "reason": reason, - } - - hi.pubsub.PublishToDynamicTopic(topic, pubsub.TaskHelpRequest, helpRequest) -} - -// handleMetaDiscussion handles all incoming messages from dynamic and static topics. -func (hi *HiveIntegration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) { - switch msg.Type { - case pubsub.TaskHelpRequest: - hi.handleHelpRequest(msg, from) - case pubsub.TaskHelpResponse: - hi.handleHelpResponse(msg, from) - default: - // Handle other meta-discussion messages (e.g., peer feedback) - } -} - -// handleHelpRequest is called when another agent requests assistance. -func (hi *HiveIntegration) handleHelpRequest(msg pubsub.Message, from peer.ID) { - issueID, _ := msg.Data["issue_id"].(float64) - reason, _ := msg.Data["reason"].(string) - fmt.Printf("🙋 Received help request for task #%d from %s: %s\n", int(issueID), from.ShortString(), reason) - - // Simple logic: if we are not busy, we can help. - // A more advanced agent would check its capabilities against the reason. - canHelp := true // Placeholder for more complex logic - - if canHelp { - fmt.Printf("✅ Agent %s can help with task #%d\n", hi.config.AgentID, int(issueID)) - hi.hlog.Append(logging.TaskHelpOffered, map[string]interface{}{ - "task_id": int(issueID), - "requester_id": from.ShortString(), - }) - - response := map[string]interface{}{ - "issue_id": issueID, - "can_help": true, - "capabilities": hi.config.Capabilities, - } - taskTopic := fmt.Sprintf("bzzz/meta/issue/%d", int(issueID)) - hi.pubsub.PublishToDynamicTopic(taskTopic, pubsub.TaskHelpResponse, response) - } -} - -// handleHelpResponse is called when an agent receives an offer for help. -func (hi *HiveIntegration) handleHelpResponse(msg pubsub.Message, from peer.ID) { - issueID, _ := msg.Data["issue_id"].(float64) - canHelp, _ := msg.Data["can_help"].(bool) - - if canHelp { - fmt.Printf("🤝 Received help offer for task #%d from %s\n", int(issueID), from.ShortString()) - hi.hlog.Append(logging.TaskHelpReceived, map[string]interface{}{ - "task_id": int(issueID), - "helper_id": from.ShortString(), - }) - // In a full implementation, the agent would now delegate a sub-task - // or use the helper's capabilities. For now, we just log it. - } -} - -// shouldEscalate determines if a task needs human intervention -func (hi *HiveIntegration) shouldEscalate(response string, history []string) bool { - // Check for escalation keywords - lowerResponse := strings.ToLower(response) - keywords := []string{"stuck", "help", "human", "escalate", "clarification needed", "manual intervention"} - - for _, keyword := range keywords { - if strings.Contains(lowerResponse, keyword) { - return true - } - } - - // Check conversation length - if len(history) >= 10 { - return true - } - - return false -} - -// triggerHumanEscalation sends escalation to Hive and N8N -func (hi *HiveIntegration) triggerHumanEscalation(projectID int, convo *Conversation, reason string) { - hi.hlog.Append(logging.Escalation, map[string]interface{}{ - "task_id": convo.TaskID, - "reason": reason, - }) - - // Report to Hive system - if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, projectID, convo.TaskID, "escalated", map[string]interface{}{ - "escalation_reason": reason, - "conversation_length": len(convo.History), - "escalated_by": hi.config.AgentID, - }); err != nil { - fmt.Printf("⚠️ Failed to report escalation to Hive: %v\n", err) - } - - fmt.Printf("✅ Task #%d in project %d escalated for human intervention\n", convo.TaskID, projectID) -} diff --git a/github/integration.go b/github/integration.go index b648ff8d..55059eff 100644 --- a/github/integration.go +++ b/github/integration.go @@ -1,60 +1,48 @@ package github import ( - "bytes" "context" - "encoding/json" "fmt" - "net/http" "strings" "sync" "time" + "github.com/anthonyrawlins/bzzz/executor" + "github.com/anthonyrawlins/bzzz/logging" + "github.com/anthonyrawlins/bzzz/pkg/hive" + "github.com/anthonyrawlins/bzzz/pkg/types" "github.com/anthonyrawlins/bzzz/pubsub" - "github.com/anthonyrawlins/bzzz/reasoning" "github.com/libp2p/go-libp2p/core/peer" ) -const ( - // humanEscalationWebhookURL is the N8N webhook for escalating tasks. - humanEscalationWebhookURL = "https://n8n.home.deepblack.cloud/webhook-test/human-escalation" - // conversationHistoryLimit is the number of messages before auto-escalation. - conversationHistoryLimit = 10 -) - -var escalationKeywords = []string{"stuck", "help", "human", "escalate", "clarification needed", "manual intervention"} - -// Conversation represents the history of a discussion for a task. -type Conversation struct { - TaskID int - TaskTitle string - TaskDescription string - History []string - LastUpdated time.Time - IsEscalated bool -} - -// Integration handles the integration between GitHub tasks and Bzzz P2P coordination +// Integration handles dynamic repository discovery via Hive API type Integration struct { - client *Client + hiveClient *hive.HiveClient + githubToken string pubsub *pubsub.PubSub - ctx context.Context + hlog *logging.HypercoreLog + ctx context.Context config *IntegrationConfig + agentConfig *config.AgentConfig - activeDiscussions map[int]*Conversation - discussionLock sync.RWMutex + // Repository management + repositories map[int]*RepositoryClient // projectID -> GitHub client + repositoryLock sync.RWMutex + + // Conversation tracking + activeDiscussions map[string]*Conversation // "projectID:taskID" -> conversation + discussionLock sync.RWMutex } -// IntegrationConfig holds configuration for GitHub-Bzzz integration -type IntegrationConfig struct { - PollInterval time.Duration - MaxTasks int - AgentID string - Capabilities []string +// RepositoryClient wraps a GitHub client for a specific repository +type RepositoryClient struct { + Client *Client + Repository hive.Repository + LastSync time.Time } -// NewIntegration creates a new GitHub-Bzzz integration -func NewIntegration(ctx context.Context, client *Client, ps *pubsub.PubSub, config *IntegrationConfig) *Integration { +// NewIntegration creates a new Hive-based GitHub integration +func NewIntegration(ctx context.Context, hiveClient *hive.HiveClient, githubToken string, ps *pubsub.PubSub, hlog *logging.HypercoreLog, config *IntegrationConfig, agentConfig *config.AgentConfig) *Integration { if config.PollInterval == 0 { config.PollInterval = 30 * time.Second } @@ -63,234 +51,215 @@ func NewIntegration(ctx context.Context, client *Client, ps *pubsub.PubSub, conf } return &Integration{ - client: client, + hiveClient: hiveClient, + githubToken: githubToken, pubsub: ps, + hlog: hlog, ctx: ctx, config: config, - activeDiscussions: make(map[int]*Conversation), + agentConfig: agentConfig, + repositories: make(map[int]*RepositoryClient), + activeDiscussions: make(map[string]*Conversation), } } -// Start begins the GitHub-Bzzz integration -func (i *Integration) Start() { - fmt.Printf("🔗 Starting GitHub-Bzzz integration for agent: %s\n", i.config.AgentID) - i.pubsub.SetAntennaeMessageHandler(i.handleMetaDiscussion) - go i.pollForTasks() +// Start begins the Hive-GitHub integration +func (hi *Integration) Start() { + fmt.Printf("🔗 Starting Hive-GitHub integration for agent: %s\n", hi.config.AgentID) + + // Register the handler for incoming meta-discussion messages + hi.pubsub.SetAntennaeMessageHandler(hi.handleMetaDiscussion) + + // Start repository discovery and task polling + go hi.repositoryDiscoveryLoop() + go hi.taskPollingLoop() } -// pollForTasks periodically checks GitHub for available tasks -func (i *Integration) pollForTasks() { - ticker := time.NewTicker(i.config.PollInterval) +// repositoryDiscoveryLoop periodically discovers active repositories from Hive +func (hi *Integration) repositoryDiscoveryLoop() { + ticker := time.NewTicker(5 * time.Minute) // Check for new repositories every 5 minutes defer ticker.Stop() - + + // Initial discovery + hi.syncRepositories() + for { select { - case <-i.ctx.Done(): + case <-hi.ctx.Done(): return case <-ticker.C: - if err := i.checkAndClaimTasks(); err != nil { - fmt.Printf("❌ Error checking tasks: %v\n", err) + hi.syncRepositories() + } + } +} + +// syncRepositories synchronizes the list of active repositories from Hive +func (hi *Integration) syncRepositories() { + repositories, err := hi.hiveClient.GetActiveRepositories(hi.ctx) + if err != nil { + fmt.Printf("❌ Failed to get active repositories: %v\n", err) + return + } + + hi.repositoryLock.Lock() + defer hi.repositoryLock.Unlock() + + // Track which repositories we've seen + currentRepos := make(map[int]bool) + + for _, repo := range repositories { + currentRepos[repo.ProjectID] = true + + // Check if we already have a client for this repository + if _, exists := hi.repositories[repo.ProjectID]; !exists { + // Create new GitHub client for this repository + githubConfig := &Config{ + AccessToken: hi.githubToken, + Owner: repo.Owner, + Repository: repo.Repository, + BaseBranch: repo.Branch, } + + client, err := NewClient(hi.ctx, githubConfig) + if err != nil { + fmt.Printf("❌ Failed to create GitHub client for %s/%s: %v\n", repo.Owner, repo.Repository, err) + continue + } + + hi.repositories[repo.ProjectID] = &RepositoryClient{ + Client: client, + Repository: repo, + LastSync: time.Now(), + } + + fmt.Printf("✅ Added repository: %s/%s (Project ID: %d)\n", repo.Owner, repo.Repository, repo.ProjectID) + } + } + + // Remove repositories that are no longer active + for projectID := range hi.repositories { + if !currentRepos[projectID] { + delete(hi.repositories, projectID) + fmt.Printf("🗑️ Removed inactive repository (Project ID: %d)\n", projectID) + } + } + + fmt.Printf("📊 Repository sync complete: %d active repositories\n", len(hi.repositories)) +} + +// taskPollingLoop periodically polls all repositories for available tasks +func (hi *Integration) taskPollingLoop() { + ticker := time.NewTicker(hi.config.PollInterval) + defer ticker.Stop() + + for { + select { + case <-hi.ctx.Done(): + return + case <-ticker.C: + hi.pollAllRepositories() } } } -// checkAndClaimTasks looks for available tasks and claims suitable ones -func (i *Integration) checkAndClaimTasks() error { - tasks, err := i.client.ListAvailableTasks() - if err != nil { - return fmt.Errorf("failed to list tasks: %w", err) +// pollAllRepositories checks all active repositories for available tasks +func (hi *Integration) pollAllRepositories() { + hi.repositoryLock.RLock() + repositories := make([]*RepositoryClient, 0, len(hi.repositories)) + for _, repo := range hi.repositories { + repositories = append(repositories, repo) } - if len(tasks) == 0 { - return nil + hi.repositoryLock.RUnlock() + + if len(repositories) == 0 { + return } - - suitableTasks := i.filterSuitableTasks(tasks) + + fmt.Printf("🔍 Polling %d repositories for available tasks...\n", len(repositories)) + + var allTasks []*types.EnhancedTask + + // Collect tasks from all repositories + for _, repoClient := range repositories { + tasks, err := hi.getRepositoryTasks(repoClient) + if err != nil { + fmt.Printf("❌ Failed to get tasks from %s/%s: %v\n", + repoClient.Repository.Owner, repoClient.Repository.Repository, err) + continue + } + allTasks = append(allTasks, tasks...) + } + + if len(allTasks) == 0 { + return + } + + fmt.Printf("📋 Found %d total available tasks across all repositories\n", len(allTasks)) + + // Apply filtering and selection + suitableTasks := hi.filterSuitableTasks(allTasks) if len(suitableTasks) == 0 { - return nil + fmt.Printf("⚠️ No suitable tasks for agent capabilities: %v\n", hi.config.Capabilities) + return } - + + // Select and claim the highest priority task task := suitableTasks[0] - claimedTask, err := i.client.ClaimTask(task.Number, i.config.AgentID) - if err != nil { - return fmt.Errorf("failed to claim task %d: %w", task.Number, err) - } - fmt.Printf("✋ Claimed task #%d: %s\n", claimedTask.Number, claimedTask.Title) - - go i.executeTask(claimedTask) - return nil + hi.claimAndExecuteTask(task) } -// executeTask starts the task by generating and proposing a plan. -func (i *Integration) executeTask(task *Task) { - fmt.Printf("🚀 Starting execution of task #%d: %s\n", task.Number, task.Title) - - prompt := fmt.Sprintf("You are an expert AI developer. Based on the following GitHub issue, create a concise, step-by-step plan to resolve it. Issue Title: %s. Issue Body: %s.", task.Title, task.Description) - plan, err := reasoning.GenerateResponse(i.ctx, "phi3", prompt) +// getRepositoryTasks fetches available tasks from a specific repository +func (hi *Integration) getRepositoryTasks(repoClient *RepositoryClient) ([]*types.EnhancedTask, error) { + // Get tasks from GitHub + githubTasks, err := repoClient.Client.ListAvailableTasks() if err != nil { - fmt.Printf("❌ Failed to generate execution plan for task #%d: %v\n", task.Number, err) - return + return nil, err } - fmt.Printf("📝 Generated Plan for task #%d:\n%s\n", task.Number, plan) - - i.discussionLock.Lock() - i.activeDiscussions[task.Number] = &Conversation{ - TaskID: task.Number, - TaskTitle: task.Title, - TaskDescription: task.Description, - History: []string{fmt.Sprintf("Plan by %s: %s", i.config.AgentID, plan)}, - LastUpdated: time.Now(), - } - i.discussionLock.Unlock() - - metaMsg := map[string]interface{}{ - "issue_id": task.Number, - "message": "Here is my proposed plan of action. What are your thoughts?", - "plan": plan, - } - if err := i.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, metaMsg); err != nil { - fmt.Printf("⚠️ Failed to publish plan to meta-discussion channel: %v\n", err) - } -} - -// handleMetaDiscussion is the core handler for incoming Antennae messages. -func (i *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) { - issueID, ok := msg.Data["issue_id"].(float64) - if !ok { - return - } - taskID := int(issueID) - - i.discussionLock.Lock() - convo, exists := i.activeDiscussions[taskID] - if !exists || convo.IsEscalated { - i.discussionLock.Unlock() - return - } - - incomingMessage, _ := msg.Data["message"].(string) - convo.History = append(convo.History, fmt.Sprintf("Response from %s: %s", from.ShortString(), incomingMessage)) - convo.LastUpdated = time.Now() - i.discussionLock.Unlock() - - fmt.Printf("🎯 Received peer feedback for task #%d. Reasoning about a response...\n", taskID) - - historyStr := strings.Join(convo.History, "\n") - prompt := fmt.Sprintf( - "You are an AI developer collaborating on a task. "+ - "This is the original task: Title: %s, Body: %s. "+ - "This is the conversation so far:\n%s\n\n"+ - "Based on the last message, provide a concise and helpful response.", - convo.TaskTitle, convo.TaskDescription, historyStr, - ) - - response, err := reasoning.GenerateResponse(i.ctx, "phi3", prompt) - if err != nil { - fmt.Printf("❌ Failed to generate response for task #%d: %v\n", taskID, err) - return - } - - // Check if the situation requires human intervention - if i.shouldEscalate(response, convo.History) { - fmt.Printf("🚨 Escalating task #%d for human review.\n", taskID) - convo.IsEscalated = true - go i.triggerHumanEscalation(convo, response) - return - } - - fmt.Printf("💬 Sending response for task #%d...\n", taskID) - responseMsg := map[string]interface{}{ - "issue_id": taskID, - "message": response, - } - if err := i.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, responseMsg); err != nil { - fmt.Printf("⚠️ Failed to publish response for task #%d: %v\n", taskID, err) - } -} - -// shouldEscalate determines if a task needs human intervention. -func (i *Integration) shouldEscalate(response string, history []string) bool { - // Rule 1: Check for keywords in the latest response - lowerResponse := strings.ToLower(response) - for _, keyword := range escalationKeywords { - if strings.Contains(lowerResponse, keyword) { - return true + + // Convert to enhanced tasks with project context + var enhancedTasks []*types.EnhancedTask + for _, task := range githubTasks { + enhancedTask := &types.EnhancedTask{ + ID: task.ID, + Number: task.Number, + Title: task.Title, + Description: task.Description, + State: task.State, + Labels: task.Labels, + Assignee: task.Assignee, + CreatedAt: task.CreatedAt, + UpdatedAt: task.UpdatedAt, + TaskType: task.TaskType, + Priority: task.Priority, + Requirements: task.Requirements, + Deliverables: task.Deliverables, + Context: task.Context, + ProjectID: repoClient.Repository.ProjectID, + GitURL: repoClient.Repository.GitURL, + Repository: repoClient.Repository, } + enhancedTasks = append(enhancedTasks, enhancedTask) } - - // Rule 2: Check if the conversation is too long - if len(history) >= conversationHistoryLimit { - return true - } - - return false + + return enhancedTasks, nil } -// triggerHumanEscalation sends the conversation details to the N8N webhook. -func (i *Integration) triggerHumanEscalation(convo *Conversation, reason string) { - // 1. Announce the escalation to other agents - escalationMsg := map[string]interface{}{ - "issue_id": convo.TaskID, - "message": "This task has been escalated for human review. No further automated action will be taken.", - "reason": reason, - } - if err := i.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, escalationMsg); err != nil { - fmt.Printf("⚠️ Failed to publish escalation message for task #%d: %v\n", convo.TaskID, err) - } - - // 2. Send the payload to the N8N webhook - payload := map[string]interface{}{ - "task_id": convo.TaskID, - "task_title": convo.TaskTitle, - "escalation_agent": i.config.AgentID, - "reason": reason, - "history": strings.Join(convo.History, "\n"), - } - payloadBytes, err := json.Marshal(payload) - if err != nil { - fmt.Printf("❌ Failed to marshal escalation payload for task #%d: %v\n", convo.TaskID, err) - return - } - - req, err := http.NewRequestWithContext(i.ctx, "POST", humanEscalationWebhookURL, bytes.NewBuffer(payloadBytes)) - if err != nil { - fmt.Printf("❌ Failed to create escalation request for task #%d: %v\n", convo.TaskID, err) - return - } - req.Header.Set("Content-Type", "application/json") - - resp, err := http.DefaultClient.Do(req) - if err != nil { - fmt.Printf("❌ Failed to send escalation webhook for task #%d: %v\n", convo.TaskID, err) - return - } - defer resp.Body.Close() - - if resp.StatusCode >= 300 { - fmt.Printf("⚠️ Human escalation webhook for task #%d returned non-2xx status: %d\n", convo.TaskID, resp.StatusCode) - } else { - fmt.Printf("✅ Successfully escalated task #%d to human administrator.\n", convo.TaskID) - } -} - -// filterSuitableTasks filters tasks based on agent capabilities and task labels -func (i *Integration) filterSuitableTasks(tasks []*Task) []*Task { - var suitable []*Task +// filterSuitableTasks filters tasks based on agent capabilities +func (hi *Integration) filterSuitableTasks(tasks []*types.EnhancedTask) []*types.EnhancedTask { + var suitable []*types.EnhancedTask for _, task := range tasks { - // Check if we can handle this task based on its labels or title keywords - if i.canHandleTask(task) { + if hi.canHandleTaskType(task.TaskType) { suitable = append(suitable, task) } } - fmt.Printf("🔍 Filtered %d suitable tasks from %d total tasks\n", len(suitable), len(tasks)) return suitable } // canHandleTaskType checks if this agent can handle the given task type -func (i *Integration) canHandleTaskType(taskType string) bool { - for _, capability := range i.config.Capabilities { +func (hi *Integration) canHandleTaskType(taskType string) bool { + for _, capability := range hi.config.Capabilities { if capability == taskType || capability == "general" || capability == "task-coordination" { return true } @@ -298,67 +267,206 @@ func (i *Integration) canHandleTaskType(taskType string) bool { return false } -// canHandleTask determines if this agent can handle a specific task -func (i *Integration) canHandleTask(task *Task) bool { - // Check task labels for capability matches - for _, label := range task.Labels { - if i.canHandleTaskType(label) { - return true - } +// claimAndExecuteTask claims a task and begins execution +func (hi *Integration) claimAndExecuteTask(task *types.EnhancedTask) { + hi.repositoryLock.RLock() + repoClient, exists := hi.repositories[task.ProjectID] + hi.repositoryLock.RUnlock() + + if !exists { + fmt.Printf("❌ Repository client not found for project %d\n", task.ProjectID) + return } - // Check title/description for keyword matches based on capabilities - taskText := strings.ToLower(task.Title + " " + task.Description) - - for _, capability := range i.config.Capabilities { - switch capability { - case "code-generation", "coding": - if strings.Contains(taskText, "code") || strings.Contains(taskText, "implement") || - strings.Contains(taskText, "develop") || strings.Contains(taskText, "write") { - return true - } - case "code-analysis", "review": - if strings.Contains(taskText, "review") || strings.Contains(taskText, "analyze") || - strings.Contains(taskText, "audit") || strings.Contains(taskText, "refactor") { - return true - } - case "debugging", "bug-fix": - if strings.Contains(taskText, "bug") || strings.Contains(taskText, "fix") || - strings.Contains(taskText, "error") || strings.Contains(taskText, "debug") { - return true - } - case "testing": - if strings.Contains(taskText, "test") || strings.Contains(taskText, "spec") || - strings.Contains(taskText, "validation") { - return true - } - case "documentation": - if strings.Contains(taskText, "doc") || strings.Contains(taskText, "readme") || - strings.Contains(taskText, "guide") || strings.Contains(taskText, "manual") { - return true - } - case "general", "task-coordination", "meta-discussion": - // These capabilities can handle any task - return true - } + // Claim the task in GitHub + _, err := repoClient.Client.ClaimTask(task.Number, hi.config.AgentID) + if err != nil { + fmt.Printf("❌ Failed to claim task %d in %s/%s: %v\n", + task.Number, task.Repository.Owner, task.Repository.Repository, err) + return } - // If no specific match, check if we have general capabilities - return i.canHandleTaskType("general") + fmt.Printf("✋ Claimed task #%d from %s/%s: %s\n", + task.Number, task.Repository.Owner, task.Repository.Repository, task.Title) + + // Log the claim + hi.hlog.Append(logging.TaskClaimed, map[string]interface{}{ + "task_id": task.Number, + "repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository), + "title": task.Title, + }) + + // Report claim to Hive + if err := hi.hiveClient.ClaimTask(hi.ctx, task.ProjectID, task.Number, hi.config.AgentID); err != nil { + fmt.Printf("⚠️ Failed to report task claim to Hive: %v\n", err) + } + + // Start task execution + go hi.executeTask(task, repoClient) } -// announceTaskClaim broadcasts task claim to P2P mesh for coordination -func (i *Integration) announceTaskClaim(task *Task) error { - claimData := map[string]interface{}{ - "task_id": task.ID, - "task_number": task.Number, - "task_title": task.Title, - "agent_id": i.config.AgentID, - "timestamp": time.Now().Unix(), - "repository": fmt.Sprintf("%s/%s", task.Labels[0], task.Labels[1]), // Assuming owner/repo in labels - "action": "claimed", +// executeTask executes a claimed task with reasoning and coordination +func (hi *Integration) executeTask(task *types.EnhancedTask, repoClient *RepositoryClient) { + // Define the dynamic topic for this task + taskTopic := fmt.Sprintf("bzzz/meta/issue/%d", task.Number) + hi.pubsub.JoinDynamicTopic(taskTopic) + defer hi.pubsub.LeaveDynamicTopic(taskTopic) + + fmt.Printf("🚀 Starting execution of task #%d in sandbox...\n", task.Number) + + // The executor now handles the entire iterative process. + result, err := executor.ExecuteTask(hi.ctx, task, hi.hlog, hi.agentConfig) + if err != nil { + fmt.Printf("❌ Failed to execute task #%d: %v\n", task.Number, err) + hi.hlog.Append(logging.TaskFailed, map[string]interface{}{"task_id": task.Number, "reason": "task execution failed in sandbox"}) + return + } + + // Ensure sandbox cleanup happens regardless of PR creation success/failure + defer result.Sandbox.DestroySandbox() + + // Create a pull request + pr, err := repoClient.Client.CreatePullRequest(task.Number, result.BranchName, hi.config.AgentID) + if err != nil { + fmt.Printf("❌ Failed to create pull request for task #%d: %v\n", task.Number, err) + fmt.Printf("📝 Note: Branch '%s' has been pushed to repository and work is preserved\n", result.BranchName) + + // Escalate PR creation failure to humans via N8N webhook + escalationReason := fmt.Sprintf("Failed to create pull request: %v. Task execution completed successfully and work is preserved in branch '%s', but PR creation failed.", err, result.BranchName) + hi.requestAssistance(task, escalationReason, fmt.Sprintf("bzzz/meta/issue/%d", task.Number)) + + hi.hlog.Append(logging.TaskFailed, map[string]interface{}{ + "task_id": task.Number, + "reason": "failed to create pull request", + "branch_name": result.BranchName, + "work_preserved": true, + "escalated": true, + }) + return + } + + fmt.Printf("✅ Successfully created pull request for task #%d: %s\n", task.Number, pr.GetHTMLURL()) + hi.hlog.Append(logging.TaskCompleted, map[string]interface{}{ + "task_id": task.Number, + "pr_url": pr.GetHTMLURL(), + "pr_number": pr.GetNumber(), + }) + + // Report completion to Hive + if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, task.ProjectID, task.Number, "completed", map[string]interface{}{ + "pull_request_url": pr.GetHTMLURL(), + }); err != nil { + fmt.Printf("⚠️ Failed to report task completion to Hive: %v\n", err) + } +} + +// requestAssistance publishes a help request to the task-specific topic. +func (hi *Integration) requestAssistance(task *types.EnhancedTask, reason, topic string) { + fmt.Printf("🆘 Agent %s is requesting assistance for task #%d: %s\n", hi.config.AgentID, task.Number, reason) + hi.hlog.Append(logging.TaskHelpRequested, map[string]interface{}{ + "task_id": task.Number, + "reason": reason, + }) + + helpRequest := map[string]interface{}{ + "issue_id": task.Number, + "repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository), + "reason": reason, + } + + hi.pubsub.PublishToDynamicTopic(topic, pubsub.TaskHelpRequest, helpRequest) +} + +// handleMetaDiscussion handles all incoming messages from dynamic and static topics. +func (hi *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) { + switch msg.Type { + case pubsub.TaskHelpRequest: + hi.handleHelpRequest(msg, from) + case pubsub.TaskHelpResponse: + hi.handleHelpResponse(msg, from) + default: + // Handle other meta-discussion messages (e.g., peer feedback) + } +} + +// handleHelpRequest is called when another agent requests assistance. +func (hi *Integration) handleHelpRequest(msg pubsub.Message, from peer.ID) { + issueID, _ := msg.Data["issue_id"].(float64) + reason, _ := msg.Data["reason"].(string) + fmt.Printf("🙋 Received help request for task #%d from %s: %s\n", int(issueID), from.ShortString(), reason) + + // Simple logic: if we are not busy, we can help. + // TODO: A more advanced agent would check its capabilities against the reason. + canHelp := true // Placeholder for more complex logic + + if canHelp { + fmt.Printf("✅ Agent %s can help with task #%d\n", hi.config.AgentID, int(issueID)) + hi.hlog.Append(logging.TaskHelpOffered, map[string]interface{}{ + "task_id": int(issueID), + "requester_id": from.ShortString(), + }) + + response := map[string]interface{}{ + "issue_id": issueID, + "can_help": true, + "capabilities": hi.config.Capabilities, + } + taskTopic := fmt.Sprintf("bzzz/meta/issue/%d", int(issueID)) + hi.pubsub.PublishToDynamicTopic(taskTopic, pubsub.TaskHelpResponse, response) + } +} + +// handleHelpResponse is called when an agent receives an offer for help. +func (hi *Integration) handleHelpResponse(msg pubsub.Message, from peer.ID) { + issueID, _ := msg.Data["issue_id"].(float64) + canHelp, _ := msg.Data["can_help"].(bool) + + if canHelp { + fmt.Printf("🤝 Received help offer for task #%d from %s\n", int(issueID), from.ShortString()) + hi.hlog.Append(logging.TaskHelpReceived, map[string]interface{}{ + "task_id": int(issueID), + "helper_id": from.ShortString(), + }) + // In a full implementation, the agent would now delegate a sub-task + // or use the helper's capabilities. For now, we just log it. + } +} + +// shouldEscalate determines if a task needs human intervention +func (hi *Integration) shouldEscalate(response string, history []string) bool { + // Check for escalation keywords + lowerResponse := strings.ToLower(response) + keywords := []string{"stuck", "help", "human", "escalate", "clarification needed", "manual intervention"} + + for _, keyword := range keywords { + if strings.Contains(lowerResponse, keyword) { + return true + } } - fmt.Printf("📢 Announcing task claim to P2P mesh: Task #%d\n", task.Number) - return i.pubsub.PublishBzzzMessage(pubsub.TaskClaim, claimData) -} \ No newline at end of file + // Check conversation length + if len(history) >= 10 { + return true + } + + return false +} + +// triggerHumanEscalation sends escalation to Hive and N8N +func (hi *Integration) triggerHumanEscalation(projectID int, convo *Conversation, reason string) { + hi.hlog.Append(logging.Escalation, map[string]interface{}{ + "task_id": convo.TaskID, + "reason": reason, + }) + + // Report to Hive system + if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, projectID, convo.TaskID, "escalated", map[string]interface{}{ + "escalation_reason": reason, + "conversation_length": len(convo.History), + "escalated_by": hi.config.AgentID, + }); err != nil { + fmt.Printf("⚠️ Failed to report escalation to Hive: %v\n", err) + } + + fmt.Printf("✅ Task #%d in project %d escalated for human intervention\n", convo.TaskID, projectID) +} diff --git a/main.go b/main.go index 4df0a05a..16889325 100644 --- a/main.go +++ b/main.go @@ -141,7 +141,7 @@ func main() { } // Initialize dynamic GitHub integration - var ghIntegration *github.HiveIntegration + var ghIntegration *github.Integration if githubToken != "" { // Use agent ID from config (auto-generated from node ID) agentID := cfg.Agent.ID @@ -156,7 +156,7 @@ func main() { MaxTasks: cfg.Agent.MaxTasks, } - ghIntegration = github.NewHiveIntegration(ctx, hiveClient, githubToken, ps, hlog, integrationConfig) + ghIntegration = github.NewIntegration(ctx, hiveClient, githubToken, ps, hlog, integrationConfig, &cfg.Agent) // Start the integration service ghIntegration.Start() @@ -339,7 +339,7 @@ func announceCapabilitiesOnChange(ps *pubsub.PubSub, nodeID string, cfg *config. cfg.Agent.Models = validModels // Configure reasoning module with available models and webhook - reasoning.SetModelConfig(validModels, cfg.Agent.ModelSelectionWebhook) + reasoning.SetModelConfig(validModels, cfg.Agent.ModelSelectionWebhook, cfg.Agent.DefaultReasoningModel) } // Get current capabilities diff --git a/pkg/config/config.go b/pkg/config/config.go index b93a31c6..f369cf68 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -36,6 +36,8 @@ type AgentConfig struct { Models []string `yaml:"models"` Specialization string `yaml:"specialization"` ModelSelectionWebhook string `yaml:"model_selection_webhook"` + DefaultReasoningModel string `yaml:"default_reasoning_model"` + SandboxImage string `yaml:"sandbox_image"` } // GitHubConfig holds GitHub integration settings @@ -44,6 +46,7 @@ type GitHubConfig struct { UserAgent string `yaml:"user_agent"` Timeout time.Duration `yaml:"timeout"` RateLimit bool `yaml:"rate_limit"` + Assignee string `yaml:"assignee"` } // P2PConfig holds P2P networking configuration @@ -107,12 +110,15 @@ func getDefaultConfig() *Config { Models: []string{"phi3", "llama3.1"}, Specialization: "general_developer", ModelSelectionWebhook: "https://n8n.home.deepblack.cloud/webhook/model-selection", + DefaultReasoningModel: "phi3", + SandboxImage: "registry.home.deepblack.cloud/tony/bzzz-sandbox:latest", }, GitHub: GitHubConfig{ TokenFile: "/home/tony/AI/secrets/passwords_and_tokens/gh-token", UserAgent: "Bzzz-P2P-Agent/1.0", Timeout: 30 * time.Second, RateLimit: true, + Assignee: "anthonyrawlins", }, P2P: P2PConfig{ ServiceTag: "bzzz-peer-discovery", diff --git a/reasoning/reasoning.go b/reasoning/reasoning.go index 09c660fe..29abc828 100644 --- a/reasoning/reasoning.go +++ b/reasoning/reasoning.go @@ -18,6 +18,7 @@ const ( var ( availableModels []string modelWebhookURL string + defaultModel string ) // OllamaRequest represents the request payload for the Ollama API. @@ -84,9 +85,10 @@ func GenerateResponse(ctx context.Context, model, prompt string) (string, error) } // SetModelConfig configures the available models and webhook URL for smart model selection -func SetModelConfig(models []string, webhookURL string) { +func SetModelConfig(models []string, webhookURL, defaultReasoningModel string) { availableModels = models modelWebhookURL = webhookURL + defaultModel = defaultReasoningModel } // selectBestModel calls the model selection webhook to choose the best model for a prompt @@ -96,7 +98,7 @@ func selectBestModel(availableModels []string, prompt string) string { if len(availableModels) > 0 { return availableModels[0] } - return "phi3" // Last resort fallback + return defaultModel // Last resort fallback } requestPayload := map[string]interface{}{ diff --git a/sandbox/sandbox.go b/sandbox/sandbox.go index dab4e80e..7a1a0620 100644 --- a/sandbox/sandbox.go +++ b/sandbox/sandbox.go @@ -15,11 +15,6 @@ import ( "github.com/docker/docker/pkg/stdcopy" ) -const ( - // DefaultDockerImage is the image used if a task does not specify one. - DefaultDockerImage = "registry.home.deepblack.cloud/tony/bzzz-sandbox:latest" -) - // Sandbox represents a stateful, isolated execution environment for a single task. type Sandbox struct { ID string // The ID of the running container. @@ -37,9 +32,9 @@ type CommandResult struct { } // CreateSandbox provisions a new Docker container for a task. -func CreateSandbox(ctx context.Context, taskImage string) (*Sandbox, error) { +func CreateSandbox(ctx context.Context, taskImage string, agentConfig *config.AgentConfig) (*Sandbox, error) { if taskImage == "" { - taskImage = DefaultDockerImage + taskImage = agentConfig.SandboxImage } // Create a new Docker client