refactor: Parameterize hardcoded values and resolve Integration duality
This commit is contained in:
@@ -21,9 +21,9 @@ type ExecuteTaskResult struct {
|
|||||||
|
|
||||||
// ExecuteTask manages the entire lifecycle of a task using a sandboxed environment.
|
// ExecuteTask manages the entire lifecycle of a task using a sandboxed environment.
|
||||||
// Returns sandbox reference so it can be destroyed after PR creation
|
// Returns sandbox reference so it can be destroyed after PR creation
|
||||||
func ExecuteTask(ctx context.Context, task *types.EnhancedTask, hlog *logging.HypercoreLog) (*ExecuteTaskResult, error) {
|
func ExecuteTask(ctx context.Context, task *types.EnhancedTask, hlog *logging.HypercoreLog, agentConfig *config.AgentConfig) (*ExecuteTaskResult, error) {
|
||||||
// 1. Create the sandbox environment
|
// 1. Create the sandbox environment
|
||||||
sb, err := sandbox.CreateSandbox(ctx, "") // Use default image for now
|
sb, err := sandbox.CreateSandbox(ctx, "", agentConfig) // Use default image for now
|
||||||
if err != nil {
|
if err != nil {
|
||||||
return nil, fmt.Errorf("failed to create sandbox: %w", err)
|
return nil, fmt.Errorf("failed to create sandbox: %w", err)
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -182,7 +182,7 @@ func (c *Client) ClaimTask(issueNumber int, agentID string) (*Task, error) {
|
|||||||
|
|
||||||
// Attempt atomic assignment using GitHub's native assignment
|
// Attempt atomic assignment using GitHub's native assignment
|
||||||
// GitHub only accepts existing usernames, so we'll assign to the repo owner
|
// GitHub only accepts existing usernames, so we'll assign to the repo owner
|
||||||
githubAssignee := "anthonyrawlins"
|
githubAssignee := c.config.Assignee
|
||||||
issueRequest := &github.IssueRequest{
|
issueRequest := &github.IssueRequest{
|
||||||
Assignee: &githubAssignee,
|
Assignee: &githubAssignee,
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -1,470 +0,0 @@
|
|||||||
package github
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"fmt"
|
|
||||||
"strings"
|
|
||||||
"sync"
|
|
||||||
"time"
|
|
||||||
|
|
||||||
"github.com/anthonyrawlins/bzzz/executor"
|
|
||||||
"github.com/anthonyrawlins/bzzz/logging"
|
|
||||||
"github.com/anthonyrawlins/bzzz/pkg/hive"
|
|
||||||
"github.com/anthonyrawlins/bzzz/pkg/types"
|
|
||||||
"github.com/anthonyrawlins/bzzz/pubsub"
|
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
|
||||||
)
|
|
||||||
|
|
||||||
// HiveIntegration handles dynamic repository discovery via Hive API
|
|
||||||
type HiveIntegration struct {
|
|
||||||
hiveClient *hive.HiveClient
|
|
||||||
githubToken string
|
|
||||||
pubsub *pubsub.PubSub
|
|
||||||
hlog *logging.HypercoreLog
|
|
||||||
ctx context.Context
|
|
||||||
config *IntegrationConfig
|
|
||||||
|
|
||||||
// Repository management
|
|
||||||
repositories map[int]*RepositoryClient // projectID -> GitHub client
|
|
||||||
repositoryLock sync.RWMutex
|
|
||||||
|
|
||||||
// Conversation tracking
|
|
||||||
activeDiscussions map[string]*Conversation // "projectID:taskID" -> conversation
|
|
||||||
discussionLock sync.RWMutex
|
|
||||||
}
|
|
||||||
|
|
||||||
// RepositoryClient wraps a GitHub client for a specific repository
|
|
||||||
type RepositoryClient struct {
|
|
||||||
Client *Client
|
|
||||||
Repository hive.Repository
|
|
||||||
LastSync time.Time
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewHiveIntegration creates a new Hive-based GitHub integration
|
|
||||||
func NewHiveIntegration(ctx context.Context, hiveClient *hive.HiveClient, githubToken string, ps *pubsub.PubSub, hlog *logging.HypercoreLog, config *IntegrationConfig) *HiveIntegration {
|
|
||||||
if config.PollInterval == 0 {
|
|
||||||
config.PollInterval = 30 * time.Second
|
|
||||||
}
|
|
||||||
if config.MaxTasks == 0 {
|
|
||||||
config.MaxTasks = 3
|
|
||||||
}
|
|
||||||
|
|
||||||
return &HiveIntegration{
|
|
||||||
hiveClient: hiveClient,
|
|
||||||
githubToken: githubToken,
|
|
||||||
pubsub: ps,
|
|
||||||
hlog: hlog,
|
|
||||||
ctx: ctx,
|
|
||||||
config: config,
|
|
||||||
repositories: make(map[int]*RepositoryClient),
|
|
||||||
activeDiscussions: make(map[string]*Conversation),
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start begins the Hive-GitHub integration
|
|
||||||
func (hi *HiveIntegration) Start() {
|
|
||||||
fmt.Printf("🔗 Starting Hive-GitHub integration for agent: %s\n", hi.config.AgentID)
|
|
||||||
|
|
||||||
// Register the handler for incoming meta-discussion messages
|
|
||||||
hi.pubsub.SetAntennaeMessageHandler(hi.handleMetaDiscussion)
|
|
||||||
|
|
||||||
// Start repository discovery and task polling
|
|
||||||
go hi.repositoryDiscoveryLoop()
|
|
||||||
go hi.taskPollingLoop()
|
|
||||||
}
|
|
||||||
|
|
||||||
// repositoryDiscoveryLoop periodically discovers active repositories from Hive
|
|
||||||
func (hi *HiveIntegration) repositoryDiscoveryLoop() {
|
|
||||||
ticker := time.NewTicker(5 * time.Minute) // Check for new repositories every 5 minutes
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
// Initial discovery
|
|
||||||
hi.syncRepositories()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-hi.ctx.Done():
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
hi.syncRepositories()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// syncRepositories synchronizes the list of active repositories from Hive
|
|
||||||
func (hi *HiveIntegration) syncRepositories() {
|
|
||||||
repositories, err := hi.hiveClient.GetActiveRepositories(hi.ctx)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("❌ Failed to get active repositories: %v\n", err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
hi.repositoryLock.Lock()
|
|
||||||
defer hi.repositoryLock.Unlock()
|
|
||||||
|
|
||||||
// Track which repositories we've seen
|
|
||||||
currentRepos := make(map[int]bool)
|
|
||||||
|
|
||||||
for _, repo := range repositories {
|
|
||||||
currentRepos[repo.ProjectID] = true
|
|
||||||
|
|
||||||
// Check if we already have a client for this repository
|
|
||||||
if _, exists := hi.repositories[repo.ProjectID]; !exists {
|
|
||||||
// Create new GitHub client for this repository
|
|
||||||
githubConfig := &Config{
|
|
||||||
AccessToken: hi.githubToken,
|
|
||||||
Owner: repo.Owner,
|
|
||||||
Repository: repo.Repository,
|
|
||||||
BaseBranch: repo.Branch,
|
|
||||||
}
|
|
||||||
|
|
||||||
client, err := NewClient(hi.ctx, githubConfig)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("❌ Failed to create GitHub client for %s/%s: %v\n", repo.Owner, repo.Repository, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
|
|
||||||
hi.repositories[repo.ProjectID] = &RepositoryClient{
|
|
||||||
Client: client,
|
|
||||||
Repository: repo,
|
|
||||||
LastSync: time.Now(),
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("✅ Added repository: %s/%s (Project ID: %d)\n", repo.Owner, repo.Repository, repo.ProjectID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Remove repositories that are no longer active
|
|
||||||
for projectID := range hi.repositories {
|
|
||||||
if !currentRepos[projectID] {
|
|
||||||
delete(hi.repositories, projectID)
|
|
||||||
fmt.Printf("🗑️ Removed inactive repository (Project ID: %d)\n", projectID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("📊 Repository sync complete: %d active repositories\n", len(hi.repositories))
|
|
||||||
}
|
|
||||||
|
|
||||||
// taskPollingLoop periodically polls all repositories for available tasks
|
|
||||||
func (hi *HiveIntegration) taskPollingLoop() {
|
|
||||||
ticker := time.NewTicker(hi.config.PollInterval)
|
|
||||||
defer ticker.Stop()
|
|
||||||
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-hi.ctx.Done():
|
|
||||||
return
|
|
||||||
case <-ticker.C:
|
|
||||||
hi.pollAllRepositories()
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// pollAllRepositories checks all active repositories for available tasks
|
|
||||||
func (hi *HiveIntegration) pollAllRepositories() {
|
|
||||||
hi.repositoryLock.RLock()
|
|
||||||
repositories := make([]*RepositoryClient, 0, len(hi.repositories))
|
|
||||||
for _, repo := range hi.repositories {
|
|
||||||
repositories = append(repositories, repo)
|
|
||||||
}
|
|
||||||
hi.repositoryLock.RUnlock()
|
|
||||||
|
|
||||||
if len(repositories) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("🔍 Polling %d repositories for available tasks...\n", len(repositories))
|
|
||||||
|
|
||||||
var allTasks []*types.EnhancedTask
|
|
||||||
|
|
||||||
// Collect tasks from all repositories
|
|
||||||
for _, repoClient := range repositories {
|
|
||||||
tasks, err := hi.getRepositoryTasks(repoClient)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("❌ Failed to get tasks from %s/%s: %v\n",
|
|
||||||
repoClient.Repository.Owner, repoClient.Repository.Repository, err)
|
|
||||||
continue
|
|
||||||
}
|
|
||||||
allTasks = append(allTasks, tasks...)
|
|
||||||
}
|
|
||||||
|
|
||||||
if len(allTasks) == 0 {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("📋 Found %d total available tasks across all repositories\n", len(allTasks))
|
|
||||||
|
|
||||||
// Apply filtering and selection
|
|
||||||
suitableTasks := hi.filterSuitableTasks(allTasks)
|
|
||||||
if len(suitableTasks) == 0 {
|
|
||||||
fmt.Printf("⚠️ No suitable tasks for agent capabilities: %v\n", hi.config.Capabilities)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Select and claim the highest priority task
|
|
||||||
task := suitableTasks[0]
|
|
||||||
hi.claimAndExecuteTask(task)
|
|
||||||
}
|
|
||||||
|
|
||||||
// getRepositoryTasks fetches available tasks from a specific repository
|
|
||||||
func (hi *HiveIntegration) getRepositoryTasks(repoClient *RepositoryClient) ([]*types.EnhancedTask, error) {
|
|
||||||
// Get tasks from GitHub
|
|
||||||
githubTasks, err := repoClient.Client.ListAvailableTasks()
|
|
||||||
if err != nil {
|
|
||||||
return nil, err
|
|
||||||
}
|
|
||||||
|
|
||||||
// Convert to enhanced tasks with project context
|
|
||||||
var enhancedTasks []*types.EnhancedTask
|
|
||||||
for _, task := range githubTasks {
|
|
||||||
enhancedTask := &types.EnhancedTask{
|
|
||||||
ID: task.ID,
|
|
||||||
Number: task.Number,
|
|
||||||
Title: task.Title,
|
|
||||||
Description: task.Description,
|
|
||||||
State: task.State,
|
|
||||||
Labels: task.Labels,
|
|
||||||
Assignee: task.Assignee,
|
|
||||||
CreatedAt: task.CreatedAt,
|
|
||||||
UpdatedAt: task.UpdatedAt,
|
|
||||||
TaskType: task.TaskType,
|
|
||||||
Priority: task.Priority,
|
|
||||||
Requirements: task.Requirements,
|
|
||||||
Deliverables: task.Deliverables,
|
|
||||||
Context: task.Context,
|
|
||||||
ProjectID: repoClient.Repository.ProjectID,
|
|
||||||
GitURL: repoClient.Repository.GitURL,
|
|
||||||
Repository: repoClient.Repository,
|
|
||||||
}
|
|
||||||
enhancedTasks = append(enhancedTasks, enhancedTask)
|
|
||||||
}
|
|
||||||
|
|
||||||
return enhancedTasks, nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// filterSuitableTasks filters tasks based on agent capabilities
|
|
||||||
func (hi *HiveIntegration) filterSuitableTasks(tasks []*types.EnhancedTask) []*types.EnhancedTask {
|
|
||||||
var suitable []*types.EnhancedTask
|
|
||||||
|
|
||||||
for _, task := range tasks {
|
|
||||||
if hi.canHandleTaskType(task.TaskType) {
|
|
||||||
suitable = append(suitable, task)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return suitable
|
|
||||||
}
|
|
||||||
|
|
||||||
// canHandleTaskType checks if this agent can handle the given task type
|
|
||||||
func (hi *HiveIntegration) canHandleTaskType(taskType string) bool {
|
|
||||||
for _, capability := range hi.config.Capabilities {
|
|
||||||
if capability == taskType || capability == "general" || capability == "task-coordination" {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// claimAndExecuteTask claims a task and begins execution
|
|
||||||
func (hi *HiveIntegration) claimAndExecuteTask(task *types.EnhancedTask) {
|
|
||||||
hi.repositoryLock.RLock()
|
|
||||||
repoClient, exists := hi.repositories[task.ProjectID]
|
|
||||||
hi.repositoryLock.RUnlock()
|
|
||||||
|
|
||||||
if !exists {
|
|
||||||
fmt.Printf("❌ Repository client not found for project %d\n", task.ProjectID)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Claim the task in GitHub
|
|
||||||
_, err := repoClient.Client.ClaimTask(task.Number, hi.config.AgentID)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("❌ Failed to claim task %d in %s/%s: %v\n",
|
|
||||||
task.Number, task.Repository.Owner, task.Repository.Repository, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("✋ Claimed task #%d from %s/%s: %s\n",
|
|
||||||
task.Number, task.Repository.Owner, task.Repository.Repository, task.Title)
|
|
||||||
|
|
||||||
// Log the claim
|
|
||||||
hi.hlog.Append(logging.TaskClaimed, map[string]interface{}{
|
|
||||||
"task_id": task.Number,
|
|
||||||
"repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository),
|
|
||||||
"title": task.Title,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Report claim to Hive
|
|
||||||
if err := hi.hiveClient.ClaimTask(hi.ctx, task.ProjectID, task.Number, hi.config.AgentID); err != nil {
|
|
||||||
fmt.Printf("⚠️ Failed to report task claim to Hive: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start task execution
|
|
||||||
go hi.executeTask(task, repoClient)
|
|
||||||
}
|
|
||||||
|
|
||||||
// executeTask executes a claimed task with reasoning and coordination
|
|
||||||
func (hi *HiveIntegration) executeTask(task *types.EnhancedTask, repoClient *RepositoryClient) {
|
|
||||||
// Define the dynamic topic for this task
|
|
||||||
taskTopic := fmt.Sprintf("bzzz/meta/issue/%d", task.Number)
|
|
||||||
hi.pubsub.JoinDynamicTopic(taskTopic)
|
|
||||||
defer hi.pubsub.LeaveDynamicTopic(taskTopic)
|
|
||||||
|
|
||||||
fmt.Printf("🚀 Starting execution of task #%d in sandbox...\n", task.Number)
|
|
||||||
|
|
||||||
// The executor now handles the entire iterative process.
|
|
||||||
result, err := executor.ExecuteTask(hi.ctx, task, hi.hlog)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("❌ Failed to execute task #%d: %v\n", task.Number, err)
|
|
||||||
hi.hlog.Append(logging.TaskFailed, map[string]interface{}{"task_id": task.Number, "reason": "task execution failed in sandbox"})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Ensure sandbox cleanup happens regardless of PR creation success/failure
|
|
||||||
defer result.Sandbox.DestroySandbox()
|
|
||||||
|
|
||||||
// Create a pull request
|
|
||||||
pr, err := repoClient.Client.CreatePullRequest(task.Number, result.BranchName, hi.config.AgentID)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("❌ Failed to create pull request for task #%d: %v\n", task.Number, err)
|
|
||||||
fmt.Printf("📝 Note: Branch '%s' has been pushed to repository and work is preserved\n", result.BranchName)
|
|
||||||
|
|
||||||
// Escalate PR creation failure to humans via N8N webhook
|
|
||||||
escalationReason := fmt.Sprintf("Failed to create pull request: %v. Task execution completed successfully and work is preserved in branch '%s', but PR creation failed.", err, result.BranchName)
|
|
||||||
hi.requestAssistance(task, escalationReason, fmt.Sprintf("bzzz/meta/issue/%d", task.Number))
|
|
||||||
|
|
||||||
hi.hlog.Append(logging.TaskFailed, map[string]interface{}{
|
|
||||||
"task_id": task.Number,
|
|
||||||
"reason": "failed to create pull request",
|
|
||||||
"branch_name": result.BranchName,
|
|
||||||
"work_preserved": true,
|
|
||||||
"escalated": true,
|
|
||||||
})
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("✅ Successfully created pull request for task #%d: %s\n", task.Number, pr.GetHTMLURL())
|
|
||||||
hi.hlog.Append(logging.TaskCompleted, map[string]interface{}{
|
|
||||||
"task_id": task.Number,
|
|
||||||
"pr_url": pr.GetHTMLURL(),
|
|
||||||
"pr_number": pr.GetNumber(),
|
|
||||||
})
|
|
||||||
|
|
||||||
// Report completion to Hive
|
|
||||||
if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, task.ProjectID, task.Number, "completed", map[string]interface{}{
|
|
||||||
"pull_request_url": pr.GetHTMLURL(),
|
|
||||||
}); err != nil {
|
|
||||||
fmt.Printf("⚠️ Failed to report task completion to Hive: %v\n", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// requestAssistance publishes a help request to the task-specific topic.
|
|
||||||
func (hi *HiveIntegration) requestAssistance(task *types.EnhancedTask, reason, topic string) {
|
|
||||||
fmt.Printf("🆘 Agent %s is requesting assistance for task #%d: %s\n", hi.config.AgentID, task.Number, reason)
|
|
||||||
hi.hlog.Append(logging.TaskHelpRequested, map[string]interface{}{
|
|
||||||
"task_id": task.Number,
|
|
||||||
"reason": reason,
|
|
||||||
})
|
|
||||||
|
|
||||||
helpRequest := map[string]interface{}{
|
|
||||||
"issue_id": task.Number,
|
|
||||||
"repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository),
|
|
||||||
"reason": reason,
|
|
||||||
}
|
|
||||||
|
|
||||||
hi.pubsub.PublishToDynamicTopic(topic, pubsub.TaskHelpRequest, helpRequest)
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleMetaDiscussion handles all incoming messages from dynamic and static topics.
|
|
||||||
func (hi *HiveIntegration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) {
|
|
||||||
switch msg.Type {
|
|
||||||
case pubsub.TaskHelpRequest:
|
|
||||||
hi.handleHelpRequest(msg, from)
|
|
||||||
case pubsub.TaskHelpResponse:
|
|
||||||
hi.handleHelpResponse(msg, from)
|
|
||||||
default:
|
|
||||||
// Handle other meta-discussion messages (e.g., peer feedback)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleHelpRequest is called when another agent requests assistance.
|
|
||||||
func (hi *HiveIntegration) handleHelpRequest(msg pubsub.Message, from peer.ID) {
|
|
||||||
issueID, _ := msg.Data["issue_id"].(float64)
|
|
||||||
reason, _ := msg.Data["reason"].(string)
|
|
||||||
fmt.Printf("🙋 Received help request for task #%d from %s: %s\n", int(issueID), from.ShortString(), reason)
|
|
||||||
|
|
||||||
// Simple logic: if we are not busy, we can help.
|
|
||||||
// A more advanced agent would check its capabilities against the reason.
|
|
||||||
canHelp := true // Placeholder for more complex logic
|
|
||||||
|
|
||||||
if canHelp {
|
|
||||||
fmt.Printf("✅ Agent %s can help with task #%d\n", hi.config.AgentID, int(issueID))
|
|
||||||
hi.hlog.Append(logging.TaskHelpOffered, map[string]interface{}{
|
|
||||||
"task_id": int(issueID),
|
|
||||||
"requester_id": from.ShortString(),
|
|
||||||
})
|
|
||||||
|
|
||||||
response := map[string]interface{}{
|
|
||||||
"issue_id": issueID,
|
|
||||||
"can_help": true,
|
|
||||||
"capabilities": hi.config.Capabilities,
|
|
||||||
}
|
|
||||||
taskTopic := fmt.Sprintf("bzzz/meta/issue/%d", int(issueID))
|
|
||||||
hi.pubsub.PublishToDynamicTopic(taskTopic, pubsub.TaskHelpResponse, response)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// handleHelpResponse is called when an agent receives an offer for help.
|
|
||||||
func (hi *HiveIntegration) handleHelpResponse(msg pubsub.Message, from peer.ID) {
|
|
||||||
issueID, _ := msg.Data["issue_id"].(float64)
|
|
||||||
canHelp, _ := msg.Data["can_help"].(bool)
|
|
||||||
|
|
||||||
if canHelp {
|
|
||||||
fmt.Printf("🤝 Received help offer for task #%d from %s\n", int(issueID), from.ShortString())
|
|
||||||
hi.hlog.Append(logging.TaskHelpReceived, map[string]interface{}{
|
|
||||||
"task_id": int(issueID),
|
|
||||||
"helper_id": from.ShortString(),
|
|
||||||
})
|
|
||||||
// In a full implementation, the agent would now delegate a sub-task
|
|
||||||
// or use the helper's capabilities. For now, we just log it.
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// shouldEscalate determines if a task needs human intervention
|
|
||||||
func (hi *HiveIntegration) shouldEscalate(response string, history []string) bool {
|
|
||||||
// Check for escalation keywords
|
|
||||||
lowerResponse := strings.ToLower(response)
|
|
||||||
keywords := []string{"stuck", "help", "human", "escalate", "clarification needed", "manual intervention"}
|
|
||||||
|
|
||||||
for _, keyword := range keywords {
|
|
||||||
if strings.Contains(lowerResponse, keyword) {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check conversation length
|
|
||||||
if len(history) >= 10 {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
|
||||||
|
|
||||||
// triggerHumanEscalation sends escalation to Hive and N8N
|
|
||||||
func (hi *HiveIntegration) triggerHumanEscalation(projectID int, convo *Conversation, reason string) {
|
|
||||||
hi.hlog.Append(logging.Escalation, map[string]interface{}{
|
|
||||||
"task_id": convo.TaskID,
|
|
||||||
"reason": reason,
|
|
||||||
})
|
|
||||||
|
|
||||||
// Report to Hive system
|
|
||||||
if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, projectID, convo.TaskID, "escalated", map[string]interface{}{
|
|
||||||
"escalation_reason": reason,
|
|
||||||
"conversation_length": len(convo.History),
|
|
||||||
"escalated_by": hi.config.AgentID,
|
|
||||||
}); err != nil {
|
|
||||||
fmt.Printf("⚠️ Failed to report escalation to Hive: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("✅ Task #%d in project %d escalated for human intervention\n", convo.TaskID, projectID)
|
|
||||||
}
|
|
||||||
@@ -1,60 +1,48 @@
|
|||||||
package github
|
package github
|
||||||
|
|
||||||
import (
|
import (
|
||||||
"bytes"
|
|
||||||
"context"
|
"context"
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
|
||||||
"strings"
|
"strings"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"github.com/anthonyrawlins/bzzz/executor"
|
||||||
|
"github.com/anthonyrawlins/bzzz/logging"
|
||||||
|
"github.com/anthonyrawlins/bzzz/pkg/hive"
|
||||||
|
"github.com/anthonyrawlins/bzzz/pkg/types"
|
||||||
"github.com/anthonyrawlins/bzzz/pubsub"
|
"github.com/anthonyrawlins/bzzz/pubsub"
|
||||||
"github.com/anthonyrawlins/bzzz/reasoning"
|
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
// Integration handles dynamic repository discovery via Hive API
|
||||||
// humanEscalationWebhookURL is the N8N webhook for escalating tasks.
|
|
||||||
humanEscalationWebhookURL = "https://n8n.home.deepblack.cloud/webhook-test/human-escalation"
|
|
||||||
// conversationHistoryLimit is the number of messages before auto-escalation.
|
|
||||||
conversationHistoryLimit = 10
|
|
||||||
)
|
|
||||||
|
|
||||||
var escalationKeywords = []string{"stuck", "help", "human", "escalate", "clarification needed", "manual intervention"}
|
|
||||||
|
|
||||||
// Conversation represents the history of a discussion for a task.
|
|
||||||
type Conversation struct {
|
|
||||||
TaskID int
|
|
||||||
TaskTitle string
|
|
||||||
TaskDescription string
|
|
||||||
History []string
|
|
||||||
LastUpdated time.Time
|
|
||||||
IsEscalated bool
|
|
||||||
}
|
|
||||||
|
|
||||||
// Integration handles the integration between GitHub tasks and Bzzz P2P coordination
|
|
||||||
type Integration struct {
|
type Integration struct {
|
||||||
client *Client
|
hiveClient *hive.HiveClient
|
||||||
|
githubToken string
|
||||||
pubsub *pubsub.PubSub
|
pubsub *pubsub.PubSub
|
||||||
ctx context.Context
|
hlog *logging.HypercoreLog
|
||||||
|
ctx context.Context
|
||||||
config *IntegrationConfig
|
config *IntegrationConfig
|
||||||
|
agentConfig *config.AgentConfig
|
||||||
|
|
||||||
activeDiscussions map[int]*Conversation
|
// Repository management
|
||||||
discussionLock sync.RWMutex
|
repositories map[int]*RepositoryClient // projectID -> GitHub client
|
||||||
|
repositoryLock sync.RWMutex
|
||||||
|
|
||||||
|
// Conversation tracking
|
||||||
|
activeDiscussions map[string]*Conversation // "projectID:taskID" -> conversation
|
||||||
|
discussionLock sync.RWMutex
|
||||||
}
|
}
|
||||||
|
|
||||||
// IntegrationConfig holds configuration for GitHub-Bzzz integration
|
// RepositoryClient wraps a GitHub client for a specific repository
|
||||||
type IntegrationConfig struct {
|
type RepositoryClient struct {
|
||||||
PollInterval time.Duration
|
Client *Client
|
||||||
MaxTasks int
|
Repository hive.Repository
|
||||||
AgentID string
|
LastSync time.Time
|
||||||
Capabilities []string
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewIntegration creates a new GitHub-Bzzz integration
|
// NewIntegration creates a new Hive-based GitHub integration
|
||||||
func NewIntegration(ctx context.Context, client *Client, ps *pubsub.PubSub, config *IntegrationConfig) *Integration {
|
func NewIntegration(ctx context.Context, hiveClient *hive.HiveClient, githubToken string, ps *pubsub.PubSub, hlog *logging.HypercoreLog, config *IntegrationConfig, agentConfig *config.AgentConfig) *Integration {
|
||||||
if config.PollInterval == 0 {
|
if config.PollInterval == 0 {
|
||||||
config.PollInterval = 30 * time.Second
|
config.PollInterval = 30 * time.Second
|
||||||
}
|
}
|
||||||
@@ -63,234 +51,215 @@ func NewIntegration(ctx context.Context, client *Client, ps *pubsub.PubSub, conf
|
|||||||
}
|
}
|
||||||
|
|
||||||
return &Integration{
|
return &Integration{
|
||||||
client: client,
|
hiveClient: hiveClient,
|
||||||
|
githubToken: githubToken,
|
||||||
pubsub: ps,
|
pubsub: ps,
|
||||||
|
hlog: hlog,
|
||||||
ctx: ctx,
|
ctx: ctx,
|
||||||
config: config,
|
config: config,
|
||||||
activeDiscussions: make(map[int]*Conversation),
|
agentConfig: agentConfig,
|
||||||
|
repositories: make(map[int]*RepositoryClient),
|
||||||
|
activeDiscussions: make(map[string]*Conversation),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Start begins the GitHub-Bzzz integration
|
// Start begins the Hive-GitHub integration
|
||||||
func (i *Integration) Start() {
|
func (hi *Integration) Start() {
|
||||||
fmt.Printf("🔗 Starting GitHub-Bzzz integration for agent: %s\n", i.config.AgentID)
|
fmt.Printf("🔗 Starting Hive-GitHub integration for agent: %s\n", hi.config.AgentID)
|
||||||
i.pubsub.SetAntennaeMessageHandler(i.handleMetaDiscussion)
|
|
||||||
go i.pollForTasks()
|
// Register the handler for incoming meta-discussion messages
|
||||||
|
hi.pubsub.SetAntennaeMessageHandler(hi.handleMetaDiscussion)
|
||||||
|
|
||||||
|
// Start repository discovery and task polling
|
||||||
|
go hi.repositoryDiscoveryLoop()
|
||||||
|
go hi.taskPollingLoop()
|
||||||
}
|
}
|
||||||
|
|
||||||
// pollForTasks periodically checks GitHub for available tasks
|
// repositoryDiscoveryLoop periodically discovers active repositories from Hive
|
||||||
func (i *Integration) pollForTasks() {
|
func (hi *Integration) repositoryDiscoveryLoop() {
|
||||||
ticker := time.NewTicker(i.config.PollInterval)
|
ticker := time.NewTicker(5 * time.Minute) // Check for new repositories every 5 minutes
|
||||||
defer ticker.Stop()
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
// Initial discovery
|
||||||
|
hi.syncRepositories()
|
||||||
|
|
||||||
for {
|
for {
|
||||||
select {
|
select {
|
||||||
case <-i.ctx.Done():
|
case <-hi.ctx.Done():
|
||||||
return
|
return
|
||||||
case <-ticker.C:
|
case <-ticker.C:
|
||||||
if err := i.checkAndClaimTasks(); err != nil {
|
hi.syncRepositories()
|
||||||
fmt.Printf("❌ Error checking tasks: %v\n", err)
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// syncRepositories synchronizes the list of active repositories from Hive
|
||||||
|
func (hi *Integration) syncRepositories() {
|
||||||
|
repositories, err := hi.hiveClient.GetActiveRepositories(hi.ctx)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("❌ Failed to get active repositories: %v\n", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
hi.repositoryLock.Lock()
|
||||||
|
defer hi.repositoryLock.Unlock()
|
||||||
|
|
||||||
|
// Track which repositories we've seen
|
||||||
|
currentRepos := make(map[int]bool)
|
||||||
|
|
||||||
|
for _, repo := range repositories {
|
||||||
|
currentRepos[repo.ProjectID] = true
|
||||||
|
|
||||||
|
// Check if we already have a client for this repository
|
||||||
|
if _, exists := hi.repositories[repo.ProjectID]; !exists {
|
||||||
|
// Create new GitHub client for this repository
|
||||||
|
githubConfig := &Config{
|
||||||
|
AccessToken: hi.githubToken,
|
||||||
|
Owner: repo.Owner,
|
||||||
|
Repository: repo.Repository,
|
||||||
|
BaseBranch: repo.Branch,
|
||||||
}
|
}
|
||||||
|
|
||||||
|
client, err := NewClient(hi.ctx, githubConfig)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("❌ Failed to create GitHub client for %s/%s: %v\n", repo.Owner, repo.Repository, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
hi.repositories[repo.ProjectID] = &RepositoryClient{
|
||||||
|
Client: client,
|
||||||
|
Repository: repo,
|
||||||
|
LastSync: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("✅ Added repository: %s/%s (Project ID: %d)\n", repo.Owner, repo.Repository, repo.ProjectID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Remove repositories that are no longer active
|
||||||
|
for projectID := range hi.repositories {
|
||||||
|
if !currentRepos[projectID] {
|
||||||
|
delete(hi.repositories, projectID)
|
||||||
|
fmt.Printf("🗑️ Removed inactive repository (Project ID: %d)\n", projectID)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("📊 Repository sync complete: %d active repositories\n", len(hi.repositories))
|
||||||
|
}
|
||||||
|
|
||||||
|
// taskPollingLoop periodically polls all repositories for available tasks
|
||||||
|
func (hi *Integration) taskPollingLoop() {
|
||||||
|
ticker := time.NewTicker(hi.config.PollInterval)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-hi.ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
hi.pollAllRepositories()
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// checkAndClaimTasks looks for available tasks and claims suitable ones
|
// pollAllRepositories checks all active repositories for available tasks
|
||||||
func (i *Integration) checkAndClaimTasks() error {
|
func (hi *Integration) pollAllRepositories() {
|
||||||
tasks, err := i.client.ListAvailableTasks()
|
hi.repositoryLock.RLock()
|
||||||
if err != nil {
|
repositories := make([]*RepositoryClient, 0, len(hi.repositories))
|
||||||
return fmt.Errorf("failed to list tasks: %w", err)
|
for _, repo := range hi.repositories {
|
||||||
|
repositories = append(repositories, repo)
|
||||||
}
|
}
|
||||||
if len(tasks) == 0 {
|
hi.repositoryLock.RUnlock()
|
||||||
return nil
|
|
||||||
|
if len(repositories) == 0 {
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
suitableTasks := i.filterSuitableTasks(tasks)
|
fmt.Printf("🔍 Polling %d repositories for available tasks...\n", len(repositories))
|
||||||
|
|
||||||
|
var allTasks []*types.EnhancedTask
|
||||||
|
|
||||||
|
// Collect tasks from all repositories
|
||||||
|
for _, repoClient := range repositories {
|
||||||
|
tasks, err := hi.getRepositoryTasks(repoClient)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("❌ Failed to get tasks from %s/%s: %v\n",
|
||||||
|
repoClient.Repository.Owner, repoClient.Repository.Repository, err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
allTasks = append(allTasks, tasks...)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(allTasks) == 0 {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("📋 Found %d total available tasks across all repositories\n", len(allTasks))
|
||||||
|
|
||||||
|
// Apply filtering and selection
|
||||||
|
suitableTasks := hi.filterSuitableTasks(allTasks)
|
||||||
if len(suitableTasks) == 0 {
|
if len(suitableTasks) == 0 {
|
||||||
return nil
|
fmt.Printf("⚠️ No suitable tasks for agent capabilities: %v\n", hi.config.Capabilities)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Select and claim the highest priority task
|
||||||
task := suitableTasks[0]
|
task := suitableTasks[0]
|
||||||
claimedTask, err := i.client.ClaimTask(task.Number, i.config.AgentID)
|
hi.claimAndExecuteTask(task)
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to claim task %d: %w", task.Number, err)
|
|
||||||
}
|
|
||||||
fmt.Printf("✋ Claimed task #%d: %s\n", claimedTask.Number, claimedTask.Title)
|
|
||||||
|
|
||||||
go i.executeTask(claimedTask)
|
|
||||||
return nil
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// executeTask starts the task by generating and proposing a plan.
|
// getRepositoryTasks fetches available tasks from a specific repository
|
||||||
func (i *Integration) executeTask(task *Task) {
|
func (hi *Integration) getRepositoryTasks(repoClient *RepositoryClient) ([]*types.EnhancedTask, error) {
|
||||||
fmt.Printf("🚀 Starting execution of task #%d: %s\n", task.Number, task.Title)
|
// Get tasks from GitHub
|
||||||
|
githubTasks, err := repoClient.Client.ListAvailableTasks()
|
||||||
prompt := fmt.Sprintf("You are an expert AI developer. Based on the following GitHub issue, create a concise, step-by-step plan to resolve it. Issue Title: %s. Issue Body: %s.", task.Title, task.Description)
|
|
||||||
plan, err := reasoning.GenerateResponse(i.ctx, "phi3", prompt)
|
|
||||||
if err != nil {
|
if err != nil {
|
||||||
fmt.Printf("❌ Failed to generate execution plan for task #%d: %v\n", task.Number, err)
|
return nil, err
|
||||||
return
|
|
||||||
}
|
}
|
||||||
fmt.Printf("📝 Generated Plan for task #%d:\n%s\n", task.Number, plan)
|
|
||||||
|
// Convert to enhanced tasks with project context
|
||||||
i.discussionLock.Lock()
|
var enhancedTasks []*types.EnhancedTask
|
||||||
i.activeDiscussions[task.Number] = &Conversation{
|
for _, task := range githubTasks {
|
||||||
TaskID: task.Number,
|
enhancedTask := &types.EnhancedTask{
|
||||||
TaskTitle: task.Title,
|
ID: task.ID,
|
||||||
TaskDescription: task.Description,
|
Number: task.Number,
|
||||||
History: []string{fmt.Sprintf("Plan by %s: %s", i.config.AgentID, plan)},
|
Title: task.Title,
|
||||||
LastUpdated: time.Now(),
|
Description: task.Description,
|
||||||
}
|
State: task.State,
|
||||||
i.discussionLock.Unlock()
|
Labels: task.Labels,
|
||||||
|
Assignee: task.Assignee,
|
||||||
metaMsg := map[string]interface{}{
|
CreatedAt: task.CreatedAt,
|
||||||
"issue_id": task.Number,
|
UpdatedAt: task.UpdatedAt,
|
||||||
"message": "Here is my proposed plan of action. What are your thoughts?",
|
TaskType: task.TaskType,
|
||||||
"plan": plan,
|
Priority: task.Priority,
|
||||||
}
|
Requirements: task.Requirements,
|
||||||
if err := i.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, metaMsg); err != nil {
|
Deliverables: task.Deliverables,
|
||||||
fmt.Printf("⚠️ Failed to publish plan to meta-discussion channel: %v\n", err)
|
Context: task.Context,
|
||||||
}
|
ProjectID: repoClient.Repository.ProjectID,
|
||||||
}
|
GitURL: repoClient.Repository.GitURL,
|
||||||
|
Repository: repoClient.Repository,
|
||||||
// handleMetaDiscussion is the core handler for incoming Antennae messages.
|
|
||||||
func (i *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) {
|
|
||||||
issueID, ok := msg.Data["issue_id"].(float64)
|
|
||||||
if !ok {
|
|
||||||
return
|
|
||||||
}
|
|
||||||
taskID := int(issueID)
|
|
||||||
|
|
||||||
i.discussionLock.Lock()
|
|
||||||
convo, exists := i.activeDiscussions[taskID]
|
|
||||||
if !exists || convo.IsEscalated {
|
|
||||||
i.discussionLock.Unlock()
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
incomingMessage, _ := msg.Data["message"].(string)
|
|
||||||
convo.History = append(convo.History, fmt.Sprintf("Response from %s: %s", from.ShortString(), incomingMessage))
|
|
||||||
convo.LastUpdated = time.Now()
|
|
||||||
i.discussionLock.Unlock()
|
|
||||||
|
|
||||||
fmt.Printf("🎯 Received peer feedback for task #%d. Reasoning about a response...\n", taskID)
|
|
||||||
|
|
||||||
historyStr := strings.Join(convo.History, "\n")
|
|
||||||
prompt := fmt.Sprintf(
|
|
||||||
"You are an AI developer collaborating on a task. "+
|
|
||||||
"This is the original task: Title: %s, Body: %s. "+
|
|
||||||
"This is the conversation so far:\n%s\n\n"+
|
|
||||||
"Based on the last message, provide a concise and helpful response.",
|
|
||||||
convo.TaskTitle, convo.TaskDescription, historyStr,
|
|
||||||
)
|
|
||||||
|
|
||||||
response, err := reasoning.GenerateResponse(i.ctx, "phi3", prompt)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("❌ Failed to generate response for task #%d: %v\n", taskID, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
// Check if the situation requires human intervention
|
|
||||||
if i.shouldEscalate(response, convo.History) {
|
|
||||||
fmt.Printf("🚨 Escalating task #%d for human review.\n", taskID)
|
|
||||||
convo.IsEscalated = true
|
|
||||||
go i.triggerHumanEscalation(convo, response)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("💬 Sending response for task #%d...\n", taskID)
|
|
||||||
responseMsg := map[string]interface{}{
|
|
||||||
"issue_id": taskID,
|
|
||||||
"message": response,
|
|
||||||
}
|
|
||||||
if err := i.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, responseMsg); err != nil {
|
|
||||||
fmt.Printf("⚠️ Failed to publish response for task #%d: %v\n", taskID, err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// shouldEscalate determines if a task needs human intervention.
|
|
||||||
func (i *Integration) shouldEscalate(response string, history []string) bool {
|
|
||||||
// Rule 1: Check for keywords in the latest response
|
|
||||||
lowerResponse := strings.ToLower(response)
|
|
||||||
for _, keyword := range escalationKeywords {
|
|
||||||
if strings.Contains(lowerResponse, keyword) {
|
|
||||||
return true
|
|
||||||
}
|
}
|
||||||
|
enhancedTasks = append(enhancedTasks, enhancedTask)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Rule 2: Check if the conversation is too long
|
return enhancedTasks, nil
|
||||||
if len(history) >= conversationHistoryLimit {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
|
|
||||||
return false
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// triggerHumanEscalation sends the conversation details to the N8N webhook.
|
// filterSuitableTasks filters tasks based on agent capabilities
|
||||||
func (i *Integration) triggerHumanEscalation(convo *Conversation, reason string) {
|
func (hi *Integration) filterSuitableTasks(tasks []*types.EnhancedTask) []*types.EnhancedTask {
|
||||||
// 1. Announce the escalation to other agents
|
var suitable []*types.EnhancedTask
|
||||||
escalationMsg := map[string]interface{}{
|
|
||||||
"issue_id": convo.TaskID,
|
|
||||||
"message": "This task has been escalated for human review. No further automated action will be taken.",
|
|
||||||
"reason": reason,
|
|
||||||
}
|
|
||||||
if err := i.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, escalationMsg); err != nil {
|
|
||||||
fmt.Printf("⚠️ Failed to publish escalation message for task #%d: %v\n", convo.TaskID, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// 2. Send the payload to the N8N webhook
|
|
||||||
payload := map[string]interface{}{
|
|
||||||
"task_id": convo.TaskID,
|
|
||||||
"task_title": convo.TaskTitle,
|
|
||||||
"escalation_agent": i.config.AgentID,
|
|
||||||
"reason": reason,
|
|
||||||
"history": strings.Join(convo.History, "\n"),
|
|
||||||
}
|
|
||||||
payloadBytes, err := json.Marshal(payload)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("❌ Failed to marshal escalation payload for task #%d: %v\n", convo.TaskID, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
|
|
||||||
req, err := http.NewRequestWithContext(i.ctx, "POST", humanEscalationWebhookURL, bytes.NewBuffer(payloadBytes))
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("❌ Failed to create escalation request for task #%d: %v\n", convo.TaskID, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
req.Header.Set("Content-Type", "application/json")
|
|
||||||
|
|
||||||
resp, err := http.DefaultClient.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
fmt.Printf("❌ Failed to send escalation webhook for task #%d: %v\n", convo.TaskID, err)
|
|
||||||
return
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode >= 300 {
|
|
||||||
fmt.Printf("⚠️ Human escalation webhook for task #%d returned non-2xx status: %d\n", convo.TaskID, resp.StatusCode)
|
|
||||||
} else {
|
|
||||||
fmt.Printf("✅ Successfully escalated task #%d to human administrator.\n", convo.TaskID)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// filterSuitableTasks filters tasks based on agent capabilities and task labels
|
|
||||||
func (i *Integration) filterSuitableTasks(tasks []*Task) []*Task {
|
|
||||||
var suitable []*Task
|
|
||||||
|
|
||||||
for _, task := range tasks {
|
for _, task := range tasks {
|
||||||
// Check if we can handle this task based on its labels or title keywords
|
if hi.canHandleTaskType(task.TaskType) {
|
||||||
if i.canHandleTask(task) {
|
|
||||||
suitable = append(suitable, task)
|
suitable = append(suitable, task)
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("🔍 Filtered %d suitable tasks from %d total tasks\n", len(suitable), len(tasks))
|
|
||||||
return suitable
|
return suitable
|
||||||
}
|
}
|
||||||
|
|
||||||
// canHandleTaskType checks if this agent can handle the given task type
|
// canHandleTaskType checks if this agent can handle the given task type
|
||||||
func (i *Integration) canHandleTaskType(taskType string) bool {
|
func (hi *Integration) canHandleTaskType(taskType string) bool {
|
||||||
for _, capability := range i.config.Capabilities {
|
for _, capability := range hi.config.Capabilities {
|
||||||
if capability == taskType || capability == "general" || capability == "task-coordination" {
|
if capability == taskType || capability == "general" || capability == "task-coordination" {
|
||||||
return true
|
return true
|
||||||
}
|
}
|
||||||
@@ -298,67 +267,206 @@ func (i *Integration) canHandleTaskType(taskType string) bool {
|
|||||||
return false
|
return false
|
||||||
}
|
}
|
||||||
|
|
||||||
// canHandleTask determines if this agent can handle a specific task
|
// claimAndExecuteTask claims a task and begins execution
|
||||||
func (i *Integration) canHandleTask(task *Task) bool {
|
func (hi *Integration) claimAndExecuteTask(task *types.EnhancedTask) {
|
||||||
// Check task labels for capability matches
|
hi.repositoryLock.RLock()
|
||||||
for _, label := range task.Labels {
|
repoClient, exists := hi.repositories[task.ProjectID]
|
||||||
if i.canHandleTaskType(label) {
|
hi.repositoryLock.RUnlock()
|
||||||
return true
|
|
||||||
}
|
if !exists {
|
||||||
|
fmt.Printf("❌ Repository client not found for project %d\n", task.ProjectID)
|
||||||
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
// Check title/description for keyword matches based on capabilities
|
// Claim the task in GitHub
|
||||||
taskText := strings.ToLower(task.Title + " " + task.Description)
|
_, err := repoClient.Client.ClaimTask(task.Number, hi.config.AgentID)
|
||||||
|
if err != nil {
|
||||||
for _, capability := range i.config.Capabilities {
|
fmt.Printf("❌ Failed to claim task %d in %s/%s: %v\n",
|
||||||
switch capability {
|
task.Number, task.Repository.Owner, task.Repository.Repository, err)
|
||||||
case "code-generation", "coding":
|
return
|
||||||
if strings.Contains(taskText, "code") || strings.Contains(taskText, "implement") ||
|
|
||||||
strings.Contains(taskText, "develop") || strings.Contains(taskText, "write") {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
case "code-analysis", "review":
|
|
||||||
if strings.Contains(taskText, "review") || strings.Contains(taskText, "analyze") ||
|
|
||||||
strings.Contains(taskText, "audit") || strings.Contains(taskText, "refactor") {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
case "debugging", "bug-fix":
|
|
||||||
if strings.Contains(taskText, "bug") || strings.Contains(taskText, "fix") ||
|
|
||||||
strings.Contains(taskText, "error") || strings.Contains(taskText, "debug") {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
case "testing":
|
|
||||||
if strings.Contains(taskText, "test") || strings.Contains(taskText, "spec") ||
|
|
||||||
strings.Contains(taskText, "validation") {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
case "documentation":
|
|
||||||
if strings.Contains(taskText, "doc") || strings.Contains(taskText, "readme") ||
|
|
||||||
strings.Contains(taskText, "guide") || strings.Contains(taskText, "manual") {
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
case "general", "task-coordination", "meta-discussion":
|
|
||||||
// These capabilities can handle any task
|
|
||||||
return true
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
// If no specific match, check if we have general capabilities
|
fmt.Printf("✋ Claimed task #%d from %s/%s: %s\n",
|
||||||
return i.canHandleTaskType("general")
|
task.Number, task.Repository.Owner, task.Repository.Repository, task.Title)
|
||||||
|
|
||||||
|
// Log the claim
|
||||||
|
hi.hlog.Append(logging.TaskClaimed, map[string]interface{}{
|
||||||
|
"task_id": task.Number,
|
||||||
|
"repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository),
|
||||||
|
"title": task.Title,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Report claim to Hive
|
||||||
|
if err := hi.hiveClient.ClaimTask(hi.ctx, task.ProjectID, task.Number, hi.config.AgentID); err != nil {
|
||||||
|
fmt.Printf("⚠️ Failed to report task claim to Hive: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Start task execution
|
||||||
|
go hi.executeTask(task, repoClient)
|
||||||
}
|
}
|
||||||
|
|
||||||
// announceTaskClaim broadcasts task claim to P2P mesh for coordination
|
// executeTask executes a claimed task with reasoning and coordination
|
||||||
func (i *Integration) announceTaskClaim(task *Task) error {
|
func (hi *Integration) executeTask(task *types.EnhancedTask, repoClient *RepositoryClient) {
|
||||||
claimData := map[string]interface{}{
|
// Define the dynamic topic for this task
|
||||||
"task_id": task.ID,
|
taskTopic := fmt.Sprintf("bzzz/meta/issue/%d", task.Number)
|
||||||
"task_number": task.Number,
|
hi.pubsub.JoinDynamicTopic(taskTopic)
|
||||||
"task_title": task.Title,
|
defer hi.pubsub.LeaveDynamicTopic(taskTopic)
|
||||||
"agent_id": i.config.AgentID,
|
|
||||||
"timestamp": time.Now().Unix(),
|
fmt.Printf("🚀 Starting execution of task #%d in sandbox...\n", task.Number)
|
||||||
"repository": fmt.Sprintf("%s/%s", task.Labels[0], task.Labels[1]), // Assuming owner/repo in labels
|
|
||||||
"action": "claimed",
|
// The executor now handles the entire iterative process.
|
||||||
|
result, err := executor.ExecuteTask(hi.ctx, task, hi.hlog, hi.agentConfig)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("❌ Failed to execute task #%d: %v\n", task.Number, err)
|
||||||
|
hi.hlog.Append(logging.TaskFailed, map[string]interface{}{"task_id": task.Number, "reason": "task execution failed in sandbox"})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Ensure sandbox cleanup happens regardless of PR creation success/failure
|
||||||
|
defer result.Sandbox.DestroySandbox()
|
||||||
|
|
||||||
|
// Create a pull request
|
||||||
|
pr, err := repoClient.Client.CreatePullRequest(task.Number, result.BranchName, hi.config.AgentID)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("❌ Failed to create pull request for task #%d: %v\n", task.Number, err)
|
||||||
|
fmt.Printf("📝 Note: Branch '%s' has been pushed to repository and work is preserved\n", result.BranchName)
|
||||||
|
|
||||||
|
// Escalate PR creation failure to humans via N8N webhook
|
||||||
|
escalationReason := fmt.Sprintf("Failed to create pull request: %v. Task execution completed successfully and work is preserved in branch '%s', but PR creation failed.", err, result.BranchName)
|
||||||
|
hi.requestAssistance(task, escalationReason, fmt.Sprintf("bzzz/meta/issue/%d", task.Number))
|
||||||
|
|
||||||
|
hi.hlog.Append(logging.TaskFailed, map[string]interface{}{
|
||||||
|
"task_id": task.Number,
|
||||||
|
"reason": "failed to create pull request",
|
||||||
|
"branch_name": result.BranchName,
|
||||||
|
"work_preserved": true,
|
||||||
|
"escalated": true,
|
||||||
|
})
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("✅ Successfully created pull request for task #%d: %s\n", task.Number, pr.GetHTMLURL())
|
||||||
|
hi.hlog.Append(logging.TaskCompleted, map[string]interface{}{
|
||||||
|
"task_id": task.Number,
|
||||||
|
"pr_url": pr.GetHTMLURL(),
|
||||||
|
"pr_number": pr.GetNumber(),
|
||||||
|
})
|
||||||
|
|
||||||
|
// Report completion to Hive
|
||||||
|
if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, task.ProjectID, task.Number, "completed", map[string]interface{}{
|
||||||
|
"pull_request_url": pr.GetHTMLURL(),
|
||||||
|
}); err != nil {
|
||||||
|
fmt.Printf("⚠️ Failed to report task completion to Hive: %v\n", err)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// requestAssistance publishes a help request to the task-specific topic.
|
||||||
|
func (hi *Integration) requestAssistance(task *types.EnhancedTask, reason, topic string) {
|
||||||
|
fmt.Printf("🆘 Agent %s is requesting assistance for task #%d: %s\n", hi.config.AgentID, task.Number, reason)
|
||||||
|
hi.hlog.Append(logging.TaskHelpRequested, map[string]interface{}{
|
||||||
|
"task_id": task.Number,
|
||||||
|
"reason": reason,
|
||||||
|
})
|
||||||
|
|
||||||
|
helpRequest := map[string]interface{}{
|
||||||
|
"issue_id": task.Number,
|
||||||
|
"repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository),
|
||||||
|
"reason": reason,
|
||||||
|
}
|
||||||
|
|
||||||
|
hi.pubsub.PublishToDynamicTopic(topic, pubsub.TaskHelpRequest, helpRequest)
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleMetaDiscussion handles all incoming messages from dynamic and static topics.
|
||||||
|
func (hi *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) {
|
||||||
|
switch msg.Type {
|
||||||
|
case pubsub.TaskHelpRequest:
|
||||||
|
hi.handleHelpRequest(msg, from)
|
||||||
|
case pubsub.TaskHelpResponse:
|
||||||
|
hi.handleHelpResponse(msg, from)
|
||||||
|
default:
|
||||||
|
// Handle other meta-discussion messages (e.g., peer feedback)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleHelpRequest is called when another agent requests assistance.
|
||||||
|
func (hi *Integration) handleHelpRequest(msg pubsub.Message, from peer.ID) {
|
||||||
|
issueID, _ := msg.Data["issue_id"].(float64)
|
||||||
|
reason, _ := msg.Data["reason"].(string)
|
||||||
|
fmt.Printf("🙋 Received help request for task #%d from %s: %s\n", int(issueID), from.ShortString(), reason)
|
||||||
|
|
||||||
|
// Simple logic: if we are not busy, we can help.
|
||||||
|
// TODO: A more advanced agent would check its capabilities against the reason.
|
||||||
|
canHelp := true // Placeholder for more complex logic
|
||||||
|
|
||||||
|
if canHelp {
|
||||||
|
fmt.Printf("✅ Agent %s can help with task #%d\n", hi.config.AgentID, int(issueID))
|
||||||
|
hi.hlog.Append(logging.TaskHelpOffered, map[string]interface{}{
|
||||||
|
"task_id": int(issueID),
|
||||||
|
"requester_id": from.ShortString(),
|
||||||
|
})
|
||||||
|
|
||||||
|
response := map[string]interface{}{
|
||||||
|
"issue_id": issueID,
|
||||||
|
"can_help": true,
|
||||||
|
"capabilities": hi.config.Capabilities,
|
||||||
|
}
|
||||||
|
taskTopic := fmt.Sprintf("bzzz/meta/issue/%d", int(issueID))
|
||||||
|
hi.pubsub.PublishToDynamicTopic(taskTopic, pubsub.TaskHelpResponse, response)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleHelpResponse is called when an agent receives an offer for help.
|
||||||
|
func (hi *Integration) handleHelpResponse(msg pubsub.Message, from peer.ID) {
|
||||||
|
issueID, _ := msg.Data["issue_id"].(float64)
|
||||||
|
canHelp, _ := msg.Data["can_help"].(bool)
|
||||||
|
|
||||||
|
if canHelp {
|
||||||
|
fmt.Printf("🤝 Received help offer for task #%d from %s\n", int(issueID), from.ShortString())
|
||||||
|
hi.hlog.Append(logging.TaskHelpReceived, map[string]interface{}{
|
||||||
|
"task_id": int(issueID),
|
||||||
|
"helper_id": from.ShortString(),
|
||||||
|
})
|
||||||
|
// In a full implementation, the agent would now delegate a sub-task
|
||||||
|
// or use the helper's capabilities. For now, we just log it.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// shouldEscalate determines if a task needs human intervention
|
||||||
|
func (hi *Integration) shouldEscalate(response string, history []string) bool {
|
||||||
|
// Check for escalation keywords
|
||||||
|
lowerResponse := strings.ToLower(response)
|
||||||
|
keywords := []string{"stuck", "help", "human", "escalate", "clarification needed", "manual intervention"}
|
||||||
|
|
||||||
|
for _, keyword := range keywords {
|
||||||
|
if strings.Contains(lowerResponse, keyword) {
|
||||||
|
return true
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
fmt.Printf("📢 Announcing task claim to P2P mesh: Task #%d\n", task.Number)
|
// Check conversation length
|
||||||
return i.pubsub.PublishBzzzMessage(pubsub.TaskClaim, claimData)
|
if len(history) >= 10 {
|
||||||
}
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
|
||||||
|
// triggerHumanEscalation sends escalation to Hive and N8N
|
||||||
|
func (hi *Integration) triggerHumanEscalation(projectID int, convo *Conversation, reason string) {
|
||||||
|
hi.hlog.Append(logging.Escalation, map[string]interface{}{
|
||||||
|
"task_id": convo.TaskID,
|
||||||
|
"reason": reason,
|
||||||
|
})
|
||||||
|
|
||||||
|
// Report to Hive system
|
||||||
|
if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, projectID, convo.TaskID, "escalated", map[string]interface{}{
|
||||||
|
"escalation_reason": reason,
|
||||||
|
"conversation_length": len(convo.History),
|
||||||
|
"escalated_by": hi.config.AgentID,
|
||||||
|
}); err != nil {
|
||||||
|
fmt.Printf("⚠️ Failed to report escalation to Hive: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("✅ Task #%d in project %d escalated for human intervention\n", convo.TaskID, projectID)
|
||||||
|
}
|
||||||
|
|||||||
6
main.go
6
main.go
@@ -141,7 +141,7 @@ func main() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// Initialize dynamic GitHub integration
|
// Initialize dynamic GitHub integration
|
||||||
var ghIntegration *github.HiveIntegration
|
var ghIntegration *github.Integration
|
||||||
if githubToken != "" {
|
if githubToken != "" {
|
||||||
// Use agent ID from config (auto-generated from node ID)
|
// Use agent ID from config (auto-generated from node ID)
|
||||||
agentID := cfg.Agent.ID
|
agentID := cfg.Agent.ID
|
||||||
@@ -156,7 +156,7 @@ func main() {
|
|||||||
MaxTasks: cfg.Agent.MaxTasks,
|
MaxTasks: cfg.Agent.MaxTasks,
|
||||||
}
|
}
|
||||||
|
|
||||||
ghIntegration = github.NewHiveIntegration(ctx, hiveClient, githubToken, ps, hlog, integrationConfig)
|
ghIntegration = github.NewIntegration(ctx, hiveClient, githubToken, ps, hlog, integrationConfig, &cfg.Agent)
|
||||||
|
|
||||||
// Start the integration service
|
// Start the integration service
|
||||||
ghIntegration.Start()
|
ghIntegration.Start()
|
||||||
@@ -339,7 +339,7 @@ func announceCapabilitiesOnChange(ps *pubsub.PubSub, nodeID string, cfg *config.
|
|||||||
cfg.Agent.Models = validModels
|
cfg.Agent.Models = validModels
|
||||||
|
|
||||||
// Configure reasoning module with available models and webhook
|
// Configure reasoning module with available models and webhook
|
||||||
reasoning.SetModelConfig(validModels, cfg.Agent.ModelSelectionWebhook)
|
reasoning.SetModelConfig(validModels, cfg.Agent.ModelSelectionWebhook, cfg.Agent.DefaultReasoningModel)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Get current capabilities
|
// Get current capabilities
|
||||||
|
|||||||
@@ -36,6 +36,8 @@ type AgentConfig struct {
|
|||||||
Models []string `yaml:"models"`
|
Models []string `yaml:"models"`
|
||||||
Specialization string `yaml:"specialization"`
|
Specialization string `yaml:"specialization"`
|
||||||
ModelSelectionWebhook string `yaml:"model_selection_webhook"`
|
ModelSelectionWebhook string `yaml:"model_selection_webhook"`
|
||||||
|
DefaultReasoningModel string `yaml:"default_reasoning_model"`
|
||||||
|
SandboxImage string `yaml:"sandbox_image"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// GitHubConfig holds GitHub integration settings
|
// GitHubConfig holds GitHub integration settings
|
||||||
@@ -44,6 +46,7 @@ type GitHubConfig struct {
|
|||||||
UserAgent string `yaml:"user_agent"`
|
UserAgent string `yaml:"user_agent"`
|
||||||
Timeout time.Duration `yaml:"timeout"`
|
Timeout time.Duration `yaml:"timeout"`
|
||||||
RateLimit bool `yaml:"rate_limit"`
|
RateLimit bool `yaml:"rate_limit"`
|
||||||
|
Assignee string `yaml:"assignee"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// P2PConfig holds P2P networking configuration
|
// P2PConfig holds P2P networking configuration
|
||||||
@@ -107,12 +110,15 @@ func getDefaultConfig() *Config {
|
|||||||
Models: []string{"phi3", "llama3.1"},
|
Models: []string{"phi3", "llama3.1"},
|
||||||
Specialization: "general_developer",
|
Specialization: "general_developer",
|
||||||
ModelSelectionWebhook: "https://n8n.home.deepblack.cloud/webhook/model-selection",
|
ModelSelectionWebhook: "https://n8n.home.deepblack.cloud/webhook/model-selection",
|
||||||
|
DefaultReasoningModel: "phi3",
|
||||||
|
SandboxImage: "registry.home.deepblack.cloud/tony/bzzz-sandbox:latest",
|
||||||
},
|
},
|
||||||
GitHub: GitHubConfig{
|
GitHub: GitHubConfig{
|
||||||
TokenFile: "/home/tony/AI/secrets/passwords_and_tokens/gh-token",
|
TokenFile: "/home/tony/AI/secrets/passwords_and_tokens/gh-token",
|
||||||
UserAgent: "Bzzz-P2P-Agent/1.0",
|
UserAgent: "Bzzz-P2P-Agent/1.0",
|
||||||
Timeout: 30 * time.Second,
|
Timeout: 30 * time.Second,
|
||||||
RateLimit: true,
|
RateLimit: true,
|
||||||
|
Assignee: "anthonyrawlins",
|
||||||
},
|
},
|
||||||
P2P: P2PConfig{
|
P2P: P2PConfig{
|
||||||
ServiceTag: "bzzz-peer-discovery",
|
ServiceTag: "bzzz-peer-discovery",
|
||||||
|
|||||||
@@ -18,6 +18,7 @@ const (
|
|||||||
var (
|
var (
|
||||||
availableModels []string
|
availableModels []string
|
||||||
modelWebhookURL string
|
modelWebhookURL string
|
||||||
|
defaultModel string
|
||||||
)
|
)
|
||||||
|
|
||||||
// OllamaRequest represents the request payload for the Ollama API.
|
// OllamaRequest represents the request payload for the Ollama API.
|
||||||
@@ -84,9 +85,10 @@ func GenerateResponse(ctx context.Context, model, prompt string) (string, error)
|
|||||||
}
|
}
|
||||||
|
|
||||||
// SetModelConfig configures the available models and webhook URL for smart model selection
|
// SetModelConfig configures the available models and webhook URL for smart model selection
|
||||||
func SetModelConfig(models []string, webhookURL string) {
|
func SetModelConfig(models []string, webhookURL, defaultReasoningModel string) {
|
||||||
availableModels = models
|
availableModels = models
|
||||||
modelWebhookURL = webhookURL
|
modelWebhookURL = webhookURL
|
||||||
|
defaultModel = defaultReasoningModel
|
||||||
}
|
}
|
||||||
|
|
||||||
// selectBestModel calls the model selection webhook to choose the best model for a prompt
|
// selectBestModel calls the model selection webhook to choose the best model for a prompt
|
||||||
@@ -96,7 +98,7 @@ func selectBestModel(availableModels []string, prompt string) string {
|
|||||||
if len(availableModels) > 0 {
|
if len(availableModels) > 0 {
|
||||||
return availableModels[0]
|
return availableModels[0]
|
||||||
}
|
}
|
||||||
return "phi3" // Last resort fallback
|
return defaultModel // Last resort fallback
|
||||||
}
|
}
|
||||||
|
|
||||||
requestPayload := map[string]interface{}{
|
requestPayload := map[string]interface{}{
|
||||||
|
|||||||
@@ -15,11 +15,6 @@ import (
|
|||||||
"github.com/docker/docker/pkg/stdcopy"
|
"github.com/docker/docker/pkg/stdcopy"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
|
||||||
// DefaultDockerImage is the image used if a task does not specify one.
|
|
||||||
DefaultDockerImage = "registry.home.deepblack.cloud/tony/bzzz-sandbox:latest"
|
|
||||||
)
|
|
||||||
|
|
||||||
// Sandbox represents a stateful, isolated execution environment for a single task.
|
// Sandbox represents a stateful, isolated execution environment for a single task.
|
||||||
type Sandbox struct {
|
type Sandbox struct {
|
||||||
ID string // The ID of the running container.
|
ID string // The ID of the running container.
|
||||||
@@ -37,9 +32,9 @@ type CommandResult struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
// CreateSandbox provisions a new Docker container for a task.
|
// CreateSandbox provisions a new Docker container for a task.
|
||||||
func CreateSandbox(ctx context.Context, taskImage string) (*Sandbox, error) {
|
func CreateSandbox(ctx context.Context, taskImage string, agentConfig *config.AgentConfig) (*Sandbox, error) {
|
||||||
if taskImage == "" {
|
if taskImage == "" {
|
||||||
taskImage = DefaultDockerImage
|
taskImage = agentConfig.SandboxImage
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create a new Docker client
|
// Create a new Docker client
|
||||||
|
|||||||
Reference in New Issue
Block a user