From 054fb67767d03a24732edcc35a57188696a0ffbe Mon Sep 17 00:00:00 2001 From: anthonyrawlins Date: Mon, 14 Jul 2025 20:26:24 +1000 Subject: [PATCH] Fix Go module imports and add dynamic Ollama model selection with N8N integration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit - Fixed module path from github.com/deepblackcloud/bzzz to github.com/anthonyrawlins/bzzz - Added dynamic Ollama model detection via /api/tags endpoint - Implemented intelligent model selection through N8N webhook integration - Added BZZZ_MODEL_SELECTION_WEBHOOK environment variable support - Fixed GitHub assignee issue by using valid username instead of peer ID - Added comprehensive model fallback mechanisms - Updated all import statements across the codebase - Removed duplicate systemd service file - Added sandbox execution environment and type definitions 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude --- DEVELOPMENT_PLAN.md | 2 +- Dockerfile.sandbox | 23 ++ bzzz.service | 2 + cmd/test_coordination.go | 10 +- cmd/test_runner.go | 8 +- executor/executor.go | 106 +++++++++ github/client.go | 43 +++- github/hive_integration.go | 284 +++++++++++++----------- github/integration.go | 4 +- go.mod | 27 ++- go.sum | 42 ++++ logging/hypercore.go | 15 +- main.go | 139 +++++++++++- monitoring/antennae_monitor.go | 2 +- pkg/config/config.go | 27 ++- pkg/coordination/dependency_detector.go | 2 +- pkg/coordination/meta_coordinator.go | 4 +- pkg/types/task.go | 35 +++ pubsub/pubsub.go | 143 ++++++++++-- reasoning/reasoning.go | 70 ++++++ sandbox/sandbox.go | 213 ++++++++++++++++++ systemd/bzzz-agent.service | 38 ---- test/antennae_test.go | 4 +- test/task_simulator.go | 4 +- 24 files changed, 1013 insertions(+), 234 deletions(-) create mode 100644 Dockerfile.sandbox create mode 100644 executor/executor.go create mode 100644 pkg/types/task.go create mode 100644 sandbox/sandbox.go delete mode 100644 systemd/bzzz-agent.service diff --git a/DEVELOPMENT_PLAN.md b/DEVELOPMENT_PLAN.md index 5e976cc4..0621409e 100644 --- a/DEVELOPMENT_PLAN.md +++ b/DEVELOPMENT_PLAN.md @@ -64,7 +64,7 @@ Based on comprehensive analysis of the existing Hive infrastructure and Bzzz's P # Docker Compose configuration for bzzz-agent services: bzzz-agent: - image: bzzz-agent:latest + image: registry.home.deepblack.cloud/tony/bzzz-agent:latest network_mode: "host" # Direct host network access for P2P volumes: - ./data:/app/data diff --git a/Dockerfile.sandbox b/Dockerfile.sandbox new file mode 100644 index 00000000..40ddf666 --- /dev/null +++ b/Dockerfile.sandbox @@ -0,0 +1,23 @@ +# Use a standard Go development image as the base +FROM golang:1.21 + +# Install common development tools and security updates +RUN apt-get update && apt-get install -y \ + build-essential \ + git \ + curl \ + tree \ + && rm -rf /var/lib/apt/lists/* + +# Create a non-root user for the agent to run as +RUN useradd -ms /bin/bash agent + +# Set the working directory for the agent +WORKDIR /home/agent/work + +# Switch to the non-root user +USER agent + +# Keep the container alive +CMD ["sleep", "infinity"] + diff --git a/bzzz.service b/bzzz.service index dfba0b21..43f28876 100644 --- a/bzzz.service +++ b/bzzz.service @@ -19,6 +19,8 @@ TimeoutStopSec=30 # Environment variables Environment=HOME=/home/tony Environment=USER=tony +Environment=BZZZ_HIVE_API_URL=https://hive.home.deepblack.cloud +Environment=BZZZ_GITHUB_TOKEN_FILE=/home/tony/AI/secrets/passwords_and_tokens/gh-token # Logging StandardOutput=journal diff --git a/cmd/test_coordination.go b/cmd/test_coordination.go index a4c88050..116990eb 100644 --- a/cmd/test_coordination.go +++ b/cmd/test_coordination.go @@ -9,11 +9,11 @@ import ( "syscall" "time" - "github.com/deepblackcloud/bzzz/discovery" - "github.com/deepblackcloud/bzzz/monitoring" - "github.com/deepblackcloud/bzzz/p2p" - "github.com/deepblackcloud/bzzz/pubsub" - "github.com/deepblackcloud/bzzz/test" + "github.com/anthonyrawlins/bzzz/discovery" + "github.com/anthonyrawlins/bzzz/monitoring" + "github.com/anthonyrawlins/bzzz/p2p" + "github.com/anthonyrawlins/bzzz/pubsub" + "github.com/anthonyrawlins/bzzz/test" ) func main() { diff --git a/cmd/test_runner.go b/cmd/test_runner.go index ca2c4690..1766deb9 100644 --- a/cmd/test_runner.go +++ b/cmd/test_runner.go @@ -9,10 +9,10 @@ import ( "syscall" "time" - "github.com/deepblackcloud/bzzz/discovery" - "github.com/deepblackcloud/bzzz/p2p" - "github.com/deepblackcloud/bzzz/pubsub" - "github.com/deepblackcloud/bzzz/test" + "github.com/anthonyrawlins/bzzz/discovery" + "github.com/anthonyrawlins/bzzz/p2p" + "github.com/anthonyrawlins/bzzz/pubsub" + "github.com/anthonyrawlins/bzzz/test" ) func main() { diff --git a/executor/executor.go b/executor/executor.go new file mode 100644 index 00000000..3c06ee7a --- /dev/null +++ b/executor/executor.go @@ -0,0 +1,106 @@ +package executor + +import ( + "context" + "fmt" + "strings" + + "github.com/anthonyrawlins/bzzz/logging" + "github.com/anthonyrawlins/bzzz/pkg/types" + "github.com/anthonyrawlins/bzzz/reasoning" + "github.com/anthonyrawlins/bzzz/sandbox" +) + +const maxIterations = 10 // Prevents infinite loops + +// ExecuteTask manages the entire lifecycle of a task using a sandboxed environment. +func ExecuteTask(ctx context.Context, task *types.EnhancedTask, hlog *logging.HypercoreLog) (string, error) { + // 1. Create the sandbox environment + sb, err := sandbox.CreateSandbox(ctx, "") // Use default image for now + if err != nil { + return "", fmt.Errorf("failed to create sandbox: %w", err) + } + defer sb.DestroySandbox() + + // 2. Clone the repository inside the sandbox + cloneCmd := fmt.Sprintf("git clone %s .", task.GitURL) + if _, err := sb.RunCommand(cloneCmd); err != nil { + return "", fmt.Errorf("failed to clone repository in sandbox: %w", err) + } + hlog.Append(logging.TaskProgress, map[string]interface{}{"task_id": task.Number, "status": "cloned repo"}) + + // 3. The main iterative development loop + var lastCommandOutput string + for i := 0; i < maxIterations; i++ { + // a. Generate the next command based on the task and previous output + nextCommand, err := generateNextCommand(ctx, task, lastCommandOutput) + if err != nil { + return "", fmt.Errorf("failed to generate next command: %w", err) + } + + hlog.Append(logging.TaskProgress, map[string]interface{}{ + "task_id": task.Number, + "iteration": i, + "command": nextCommand, + }) + + // b. Check for completion command + if strings.HasPrefix(nextCommand, "TASK_COMPLETE") { + fmt.Println("✅ Agent has determined the task is complete.") + break // Exit loop to proceed with PR creation + } + + // c. Execute the command in the sandbox + result, err := sb.RunCommand(nextCommand) + if err != nil { + // Log the error and feed it back to the agent + lastCommandOutput = fmt.Sprintf("Command failed: %v\nStdout: %s\nStderr: %s", err, result.StdOut, result.StdErr) + continue + } + + // d. Store the output for the next iteration + lastCommandOutput = fmt.Sprintf("Stdout: %s\nStderr: %s", result.StdOut, result.StdErr) + } + + // 4. Create a new branch and commit the changes + branchName := fmt.Sprintf("bzzz-task-%d", task.Number) + if _, err := sb.RunCommand(fmt.Sprintf("git checkout -b %s", branchName)); err != nil { + return "", fmt.Errorf("failed to create branch: %w", err) + } + if _, err := sb.RunCommand("git add ."); err != nil { + return "", fmt.Errorf("failed to add files: %w", err) + } + commitCmd := fmt.Sprintf("git commit -m 'feat: resolve task #%d'", task.Number) + if _, err := sb.RunCommand(commitCmd); err != nil { + return "", fmt.Errorf("failed to commit changes: %w", err) + } + + // 5. Push the new branch + if _, err := sb.RunCommand(fmt.Sprintf("git push origin %s", branchName)); err != nil { + return "", fmt.Errorf("failed to push branch: %w", err) + } + + hlog.Append(logging.TaskProgress, map[string]interface{}{"task_id": task.Number, "status": "pushed changes"}) + return branchName, nil +} + +// generateNextCommand uses the LLM to decide the next command to execute. +func generateNextCommand(ctx context.Context, task *types.EnhancedTask, lastOutput string) (string, error) { + prompt := fmt.Sprintf( + "You are an AI developer in a sandboxed shell environment. Your goal is to solve the following GitHub issue:\n\n"+ + "Title: %s\nDescription: %s\n\n"+ + "You can only interact with the system by issuing shell commands. "+ + "The previous command output was:\n---\n%s\n---\n"+ + "Based on this, what is the single next shell command you should run? "+ + "If you believe the task is complete and ready for a pull request, respond with 'TASK_COMPLETE'.", + task.Title, task.Description, lastOutput, + ) + + // Using the main reasoning engine to generate the command + command, err := reasoning.GenerateResponse(ctx, "phi3", prompt) + if err != nil { + return "", err + } + + return strings.TrimSpace(command), nil +} diff --git a/github/client.go b/github/client.go index e0269d36..fc4a71ea 100644 --- a/github/client.go +++ b/github/client.go @@ -156,8 +156,10 @@ func (c *Client) ClaimTask(issueNumber int, agentID string) (*Task, error) { } // Attempt atomic assignment using GitHub's native assignment + // GitHub only accepts existing usernames, so we'll assign to the repo owner + githubAssignee := "anthonyrawlins" issueRequest := &github.IssueRequest{ - Assignee: &agentID, + Assignee: &githubAssignee, } // Add in-progress label @@ -180,6 +182,23 @@ func (c *Client) ClaimTask(issueNumber int, agentID string) (*Task, error) { return nil, fmt.Errorf("failed to claim task: %w", err) } + // Add a comment to track which Bzzz agent claimed this task + claimComment := fmt.Sprintf("🐝 **Task claimed by Bzzz agent:** `%s`\n\nThis task has been automatically claimed by the Bzzz P2P task coordination system.", agentID) + commentRequest := &github.IssueComment{ + Body: &claimComment, + } + _, _, err = c.client.Issues.CreateComment( + c.ctx, + c.config.Owner, + c.config.Repository, + issueNumber, + commentRequest, + ) + if err != nil { + // Log error but don't fail the claim + fmt.Printf("⚠️ Failed to add claim comment: %v\n", err) + } + // Create a task branch if err := c.createTaskBranch(issueNumber, agentID); err != nil { // Log error but don't fail the claim @@ -315,6 +334,28 @@ func (c *Client) createTaskBranch(issueNumber int, agentID string) error { return nil } +// CreatePullRequest creates a new pull request for a completed task. +func (c *Client) CreatePullRequest(issueNumber int, branchName, agentID string) (*github.PullRequest, error) { + title := fmt.Sprintf("fix: resolve issue #%d via bzzz agent %s", issueNumber, agentID) + body := fmt.Sprintf("This pull request resolves issue #%d, and was automatically generated by the Bzzz agent `%s`.", issueNumber, agentID) + head := branchName + base := c.config.BaseBranch + + pr := &github.NewPullRequest{ + Title: &title, + Body: &body, + Head: &head, + Base: &base, + } + + newPR, _, err := c.client.PullRequests.Create(c.ctx, c.config.Owner, c.config.Repository, pr) + if err != nil { + return nil, fmt.Errorf("failed to create pull request: %w", err) + } + + return newPR, nil +} + // formatTaskBody formats task details into GitHub issue body func (c *Client) formatTaskBody(task *Task) string { body := fmt.Sprintf("**Task Type:** %s\n", task.TaskType) diff --git a/github/hive_integration.go b/github/hive_integration.go index ffb8c51f..c5b8839d 100644 --- a/github/hive_integration.go +++ b/github/hive_integration.go @@ -7,27 +7,31 @@ import ( "sync" "time" - "github.com/deepblackcloud/bzzz/pkg/hive" - "github.com/deepblackcloud/bzzz/pubsub" - "github.com/deepblackcloud/bzzz/reasoning" + "github.com/anthonyrawlins/bzzz/executor" + "github.com/anthonyrawlins/bzzz/logging" + "github.com/anthonyrawlins/bzzz/pkg/hive" + "github.com/anthonyrawlins/bzzz/pkg/types" + "github.com/anthonyrawlins/bzzz/pubsub" + "github.com/anthonyrawlins/bzzz/reasoning" "github.com/libp2p/go-libp2p/core/peer" ) // HiveIntegration handles dynamic repository discovery via Hive API type HiveIntegration struct { - hiveClient *hive.HiveClient - githubToken string - pubsub *pubsub.PubSub - ctx context.Context - config *IntegrationConfig - + hiveClient *hive.HiveClient + githubToken string + pubsub *pubsub.PubSub + hlog *logging.HypercoreLog + ctx context.Context + config *IntegrationConfig + // Repository management - repositories map[int]*RepositoryClient // projectID -> GitHub client - repositoryLock sync.RWMutex - + repositories map[int]*RepositoryClient // projectID -> GitHub client + repositoryLock sync.RWMutex + // Conversation tracking activeDiscussions map[string]*Conversation // "projectID:taskID" -> conversation - discussionLock sync.RWMutex + discussionLock sync.RWMutex } // RepositoryClient wraps a GitHub client for a specific repository @@ -38,7 +42,7 @@ type RepositoryClient struct { } // NewHiveIntegration creates a new Hive-based GitHub integration -func NewHiveIntegration(ctx context.Context, hiveClient *hive.HiveClient, githubToken string, ps *pubsub.PubSub, config *IntegrationConfig) *HiveIntegration { +func NewHiveIntegration(ctx context.Context, hiveClient *hive.HiveClient, githubToken string, ps *pubsub.PubSub, hlog *logging.HypercoreLog, config *IntegrationConfig) *HiveIntegration { if config.PollInterval == 0 { config.PollInterval = 30 * time.Second } @@ -50,6 +54,7 @@ func NewHiveIntegration(ctx context.Context, hiveClient *hive.HiveClient, github hiveClient: hiveClient, githubToken: githubToken, pubsub: ps, + hlog: hlog, ctx: ctx, config: config, repositories: make(map[int]*RepositoryClient), @@ -170,7 +175,7 @@ func (hi *HiveIntegration) pollAllRepositories() { fmt.Printf("🔍 Polling %d repositories for available tasks...\n", len(repositories)) - var allTasks []*EnhancedTask + var allTasks []*types.EnhancedTask // Collect tasks from all repositories for _, repoClient := range repositories { @@ -202,7 +207,7 @@ func (hi *HiveIntegration) pollAllRepositories() { } // getRepositoryTasks fetches available tasks from a specific repository -func (hi *HiveIntegration) getRepositoryTasks(repoClient *RepositoryClient) ([]*EnhancedTask, error) { +func (hi *HiveIntegration) getRepositoryTasks(repoClient *RepositoryClient) ([]*types.EnhancedTask, error) { // Get tasks from GitHub githubTasks, err := repoClient.Client.ListAvailableTasks() if err != nil { @@ -210,10 +215,23 @@ func (hi *HiveIntegration) getRepositoryTasks(repoClient *RepositoryClient) ([]* } // Convert to enhanced tasks with project context - var enhancedTasks []*EnhancedTask + var enhancedTasks []*types.EnhancedTask for _, task := range githubTasks { - enhancedTask := &EnhancedTask{ - Task: *task, + enhancedTask := &types.EnhancedTask{ + ID: task.ID, + Number: task.Number, + Title: task.Title, + Description: task.Description, + State: task.State, + Labels: task.Labels, + Assignee: task.Assignee, + CreatedAt: task.CreatedAt, + UpdatedAt: task.UpdatedAt, + TaskType: task.TaskType, + Priority: task.Priority, + Requirements: task.Requirements, + Deliverables: task.Deliverables, + Context: task.Context, ProjectID: repoClient.Repository.ProjectID, GitURL: repoClient.Repository.GitURL, Repository: repoClient.Repository, @@ -221,20 +239,12 @@ func (hi *HiveIntegration) getRepositoryTasks(repoClient *RepositoryClient) ([]* enhancedTasks = append(enhancedTasks, enhancedTask) } - return enhancedTasks, nil -} - -// EnhancedTask extends Task with project context -type EnhancedTask struct { - Task - ProjectID int - GitURL string - Repository hive.Repository + return enhancedTasks, nil } // filterSuitableTasks filters tasks based on agent capabilities -func (hi *HiveIntegration) filterSuitableTasks(tasks []*EnhancedTask) []*EnhancedTask { - var suitable []*EnhancedTask +func (hi *HiveIntegration) filterSuitableTasks(tasks []*types.EnhancedTask) []*types.EnhancedTask { + var suitable []*types.EnhancedTask for _, task := range tasks { if hi.canHandleTaskType(task.TaskType) { @@ -256,7 +266,7 @@ func (hi *HiveIntegration) canHandleTaskType(taskType string) bool { } // claimAndExecuteTask claims a task and begins execution -func (hi *HiveIntegration) claimAndExecuteTask(task *EnhancedTask) { +func (hi *HiveIntegration) claimAndExecuteTask(task *types.EnhancedTask) { hi.repositoryLock.RLock() repoClient, exists := hi.repositories[task.ProjectID] hi.repositoryLock.RUnlock() @@ -267,7 +277,7 @@ func (hi *HiveIntegration) claimAndExecuteTask(task *EnhancedTask) { } // Claim the task in GitHub - claimedTask, err := repoClient.Client.ClaimTask(task.Number, hi.config.AgentID) + _, err := repoClient.Client.ClaimTask(task.Number, hi.config.AgentID) if err != nil { fmt.Printf("❌ Failed to claim task %d in %s/%s: %v\n", task.Number, task.Repository.Owner, task.Repository.Repository, err) @@ -275,8 +285,15 @@ func (hi *HiveIntegration) claimAndExecuteTask(task *EnhancedTask) { } fmt.Printf("✋ Claimed task #%d from %s/%s: %s\n", - claimedTask.Number, task.Repository.Owner, task.Repository.Repository, claimedTask.Title) + task.Number, task.Repository.Owner, task.Repository.Repository, task.Title) + // Log the claim + hi.hlog.Append(logging.TaskClaimed, map[string]interface{}{ + "task_id": task.Number, + "repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository), + "title": task.Title, + }) + // Report claim to Hive if err := hi.hiveClient.ClaimTask(hi.ctx, task.ProjectID, task.Number, hi.config.AgentID); err != nil { fmt.Printf("⚠️ Failed to report task claim to Hive: %v\n", err) @@ -288,113 +305,113 @@ func (hi *HiveIntegration) claimAndExecuteTask(task *EnhancedTask) { // executeTask executes a claimed task with reasoning and coordination func (hi *HiveIntegration) executeTask(task *EnhancedTask, repoClient *RepositoryClient) { - fmt.Printf("🚀 Starting execution of task #%d from %s/%s: %s\n", - task.Number, task.Repository.Owner, task.Repository.Repository, task.Title) - - // Generate execution plan using reasoning - prompt := fmt.Sprintf("You are an expert AI developer working on a distributed task from repository %s/%s. "+ - "Create a concise, step-by-step plan to resolve this GitHub issue. "+ - "Issue Title: %s. Issue Body: %s. Project Context: %s", - task.Repository.Owner, task.Repository.Repository, task.Title, task.Description, task.GitURL) - - plan, err := reasoning.GenerateResponse(hi.ctx, "phi3", prompt) + // Define the dynamic topic for this task + taskTopic := fmt.Sprintf("bzzz/meta/issue/%d", task.Number) + hi.pubsub.JoinDynamicTopic(taskTopic) + defer hi.pubsub.LeaveDynamicTopic(taskTopic) + + fmt.Printf("🚀 Starting execution of task #%d in sandbox...\n", task.Number) + + // The executor now handles the entire iterative process. + branchName, err := executor.ExecuteTask(hi.ctx, task, hi.hlog) if err != nil { - fmt.Printf("❌ Failed to generate execution plan for task #%d: %v\n", task.Number, err) + fmt.Printf("❌ Failed to execute task #%d: %v\n", task.Number, err) + hi.hlog.Append(logging.TaskFailed, map[string]interface{}{"task_id": task.Number, "reason": "task execution failed in sandbox"}) return } - - fmt.Printf("📝 Generated Plan for task #%d:\n%s\n", task.Number, plan) - - // Start meta-discussion - conversationKey := fmt.Sprintf("%d:%d", task.ProjectID, task.Number) - - hi.discussionLock.Lock() - hi.activeDiscussions[conversationKey] = &Conversation{ - TaskID: task.Number, - TaskTitle: task.Title, - TaskDescription: task.Description, - History: []string{fmt.Sprintf("Plan by %s (%s/%s): %s", hi.config.AgentID, task.Repository.Owner, task.Repository.Repository, plan)}, - LastUpdated: time.Now(), + + // Create a pull request + pr, err := repoClient.Client.CreatePullRequest(task.Number, branchName, hi.config.AgentID) + if err != nil { + fmt.Printf("❌ Failed to create pull request for task #%d: %v\n", task.Number, err) + hi.hlog.Append(logging.TaskFailed, map[string]interface{}{"task_id": task.Number, "reason": "failed to create pull request"}) + return } - hi.discussionLock.Unlock() - - // Announce plan for peer review - metaMsg := map[string]interface{}{ - "project_id": task.ProjectID, - "issue_id": task.Number, - "repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository), - "message": "Here is my proposed plan for this cross-repository task. What are your thoughts?", - "plan": plan, - "git_url": task.GitURL, - } - - if err := hi.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, metaMsg); err != nil { - fmt.Printf("⚠️ Failed to publish plan to meta-discussion channel: %v\n", err) + + fmt.Printf("✅ Successfully created pull request for task #%d: %s\n", task.Number, pr.GetHTMLURL()) + hi.hlog.Append(logging.TaskCompleted, map[string]interface{}{ + "task_id": task.Number, + "pr_url": pr.GetHTMLURL(), + "pr_number": pr.GetNumber(), + }) + + // Report completion to Hive + if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, task.ProjectID, task.Number, "completed", map[string]interface{}{ + "pull_request_url": pr.GetHTMLURL(), + }); err != nil { + fmt.Printf("⚠️ Failed to report task completion to Hive: %v\n", err) } } -// handleMetaDiscussion handles incoming meta-discussion messages +// requestAssistance publishes a help request to the task-specific topic. +func (hi *HiveIntegration) requestAssistance(task *EnhancedTask, reason, topic string) { + fmt.Printf("🆘 Agent %s is requesting assistance for task #%d: %s\n", hi.config.AgentID, task.Number, reason) + hi.hlog.Append(logging.TaskHelpRequested, map[string]interface{}{ + "task_id": task.Number, + "reason": reason, + }) + + helpRequest := map[string]interface{}{ + "issue_id": task.Number, + "repository": fmt.Sprintf("%s/%s", task.Repository.Owner, task.Repository.Repository), + "reason": reason, + } + + hi.pubsub.PublishToDynamicTopic(topic, pubsub.TaskHelpRequest, helpRequest) +} + +// handleMetaDiscussion handles all incoming messages from dynamic and static topics. func (hi *HiveIntegration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) { - projectID, hasProject := msg.Data["project_id"].(float64) - issueID, hasIssue := msg.Data["issue_id"].(float64) - - if !hasProject || !hasIssue { - return + switch msg.Type { + case pubsub.TaskHelpRequest: + hi.handleHelpRequest(msg, from) + case pubsub.TaskHelpResponse: + hi.handleHelpResponse(msg, from) + default: + // Handle other meta-discussion messages (e.g., peer feedback) } - - conversationKey := fmt.Sprintf("%d:%d", int(projectID), int(issueID)) - - hi.discussionLock.Lock() - convo, exists := hi.activeDiscussions[conversationKey] - if !exists || convo.IsEscalated { - hi.discussionLock.Unlock() - return +} + +// handleHelpRequest is called when another agent requests assistance. +func (hi *HiveIntegration) handleHelpRequest(msg pubsub.Message, from peer.ID) { + issueID, _ := msg.Data["issue_id"].(float64) + reason, _ := msg.Data["reason"].(string) + fmt.Printf("🙋 Received help request for task #%d from %s: %s\n", int(issueID), from.ShortString(), reason) + + // Simple logic: if we are not busy, we can help. + // A more advanced agent would check its capabilities against the reason. + canHelp := true // Placeholder for more complex logic + + if canHelp { + fmt.Printf("✅ Agent %s can help with task #%d\n", hi.config.AgentID, int(issueID)) + hi.hlog.Append(logging.TaskHelpOffered, map[string]interface{}{ + "task_id": int(issueID), + "requester_id": from.ShortString(), + }) + + response := map[string]interface{}{ + "issue_id": issueID, + "can_help": true, + "capabilities": hi.config.Capabilities, + } + taskTopic := fmt.Sprintf("bzzz/meta/issue/%d", int(issueID)) + hi.pubsub.PublishToDynamicTopic(taskTopic, pubsub.TaskHelpResponse, response) } - - incomingMessage, _ := msg.Data["message"].(string) - repository, _ := msg.Data["repository"].(string) - - convo.History = append(convo.History, fmt.Sprintf("Response from %s (%s): %s", from.ShortString(), repository, incomingMessage)) - convo.LastUpdated = time.Now() - hi.discussionLock.Unlock() - - fmt.Printf("🎯 Received peer feedback for task #%d in project %d. Generating response...\n", int(issueID), int(projectID)) - - // Generate intelligent response - historyStr := strings.Join(convo.History, "\n") - prompt := fmt.Sprintf( - "You are an AI developer collaborating on a distributed task across multiple repositories. "+ - "Repository: %s. Task: %s. Description: %s. "+ - "Conversation history:\n%s\n\n"+ - "Based on the last message, provide a concise and helpful response for cross-repository coordination.", - repository, convo.TaskTitle, convo.TaskDescription, historyStr, - ) - - response, err := reasoning.GenerateResponse(hi.ctx, "phi3", prompt) - if err != nil { - fmt.Printf("❌ Failed to generate response for task #%d: %v\n", int(issueID), err) - return - } - - // Check for escalation - if hi.shouldEscalate(response, convo.History) { - fmt.Printf("🚨 Escalating task #%d in project %d for human review.\n", int(issueID), int(projectID)) - convo.IsEscalated = true - go hi.triggerHumanEscalation(int(projectID), convo, response) - return - } - - fmt.Printf("💬 Sending response for task #%d in project %d...\n", int(issueID), int(projectID)) - - responseMsg := map[string]interface{}{ - "project_id": int(projectID), - "issue_id": int(issueID), - "repository": repository, - "message": response, - } - - if err := hi.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, responseMsg); err != nil { - fmt.Printf("⚠️ Failed to publish response: %v\n", err) +} + +// handleHelpResponse is called when an agent receives an offer for help. +func (hi *HiveIntegration) handleHelpResponse(msg pubsub.Message, from peer.ID) { + issueID, _ := msg.Data["issue_id"].(float64) + canHelp, _ := msg.Data["can_help"].(bool) + + if canHelp { + fmt.Printf("🤝 Received help offer for task #%d from %s\n", int(issueID), from.ShortString()) + hi.hlog.Append(logging.TaskHelpReceived, map[string]interface{}{ + "task_id": int(issueID), + "helper_id": from.ShortString(), + }) + // In a full implementation, the agent would now delegate a sub-task + // or use the helper's capabilities. For now, we just log it. } } @@ -420,6 +437,11 @@ func (hi *HiveIntegration) shouldEscalate(response string, history []string) boo // triggerHumanEscalation sends escalation to Hive and N8N func (hi *HiveIntegration) triggerHumanEscalation(projectID int, convo *Conversation, reason string) { + hi.hlog.Append(logging.Escalation, map[string]interface{}{ + "task_id": convo.TaskID, + "reason": reason, + }) + // Report to Hive system if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, projectID, convo.TaskID, "escalated", map[string]interface{}{ "escalation_reason": reason, @@ -430,4 +452,4 @@ func (hi *HiveIntegration) triggerHumanEscalation(projectID int, convo *Conversa } fmt.Printf("✅ Task #%d in project %d escalated for human intervention\n", convo.TaskID, projectID) -} \ No newline at end of file +} diff --git a/github/integration.go b/github/integration.go index 3a780de0..b648ff8d 100644 --- a/github/integration.go +++ b/github/integration.go @@ -10,8 +10,8 @@ import ( "sync" "time" - "github.com/deepblackcloud/bzzz/pubsub" - "github.com/deepblackcloud/bzzz/reasoning" + "github.com/anthonyrawlins/bzzz/pubsub" + "github.com/anthonyrawlins/bzzz/reasoning" "github.com/libp2p/go-libp2p/core/peer" ) diff --git a/go.mod b/go.mod index c1b9f2d0..00458ac5 100644 --- a/go.mod +++ b/go.mod @@ -1,6 +1,8 @@ -module github.com/deepblackcloud/bzzz +module github.com/anthonyrawlins/bzzz -go 1.21 +go 1.23.0 + +toolchain go1.24.5 require ( github.com/google/go-github/v57 v57.0.0 @@ -12,17 +14,26 @@ require ( ) require ( + github.com/Microsoft/go-winio v0.4.14 // indirect github.com/benbjohnson/clock v1.3.5 // indirect github.com/beorn7/perks v1.0.1 // indirect github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/containerd/cgroups v1.1.0 // indirect + github.com/containerd/errdefs v1.0.0 // indirect + github.com/containerd/errdefs/pkg v0.3.0 // indirect github.com/coreos/go-systemd/v22 v22.5.0 // indirect github.com/davidlazar/go-crypto v0.0.0-20200604182044-b73af7476f6c // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 // indirect + github.com/distribution/reference v0.6.0 // indirect + github.com/docker/docker v28.3.2+incompatible // indirect + github.com/docker/go-connections v0.5.0 // indirect github.com/docker/go-units v0.5.0 // indirect github.com/elastic/gosigar v0.14.2 // indirect + github.com/felixge/httpsnoop v1.0.4 // indirect github.com/flynn/noise v1.0.0 // indirect github.com/francoispqt/gojay v1.2.13 // indirect + github.com/go-logr/logr v1.4.3 // indirect + github.com/go-logr/stdr v1.2.2 // indirect github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 // indirect github.com/godbus/dbus/v5 v5.1.0 // indirect github.com/gogo/protobuf v1.3.2 // indirect @@ -57,6 +68,7 @@ require ( github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect github.com/mikioh/tcpopt v0.0.0-20190314235656-172688c1accc // indirect github.com/minio/sha256-simd v1.0.1 // indirect + github.com/moby/docker-image-spec v1.3.1 // indirect github.com/mr-tron/base58 v1.2.0 // indirect github.com/multiformats/go-base32 v0.1.0 // indirect github.com/multiformats/go-base36 v0.2.0 // indirect @@ -68,6 +80,8 @@ require ( github.com/multiformats/go-multistream v0.5.0 // indirect github.com/multiformats/go-varint v0.0.7 // indirect github.com/onsi/ginkgo/v2 v2.13.0 // indirect + github.com/opencontainers/go-digest v1.0.0 // indirect + github.com/opencontainers/image-spec v1.1.1 // indirect github.com/opencontainers/runtime-spec v1.1.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect github.com/pkg/errors v0.9.1 // indirect @@ -81,6 +95,11 @@ require ( github.com/quic-go/webtransport-go v0.6.0 // indirect github.com/raulk/go-watchdog v1.3.0 // indirect github.com/spaolacci/murmur3 v1.1.0 // indirect + go.opentelemetry.io/auto/sdk v1.1.0 // indirect + go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 // indirect + go.opentelemetry.io/otel v1.37.0 // indirect + go.opentelemetry.io/otel/metric v1.37.0 // indirect + go.opentelemetry.io/otel/trace v1.37.0 // indirect go.uber.org/dig v1.17.1 // indirect go.uber.org/fx v1.20.1 // indirect go.uber.org/mock v0.3.0 // indirect @@ -91,10 +110,10 @@ require ( golang.org/x/mod v0.13.0 // indirect golang.org/x/net v0.19.0 // indirect golang.org/x/sync v0.4.0 // indirect - golang.org/x/sys v0.15.0 // indirect + golang.org/x/sys v0.33.0 // indirect golang.org/x/text v0.14.0 // indirect golang.org/x/tools v0.14.0 // indirect google.golang.org/appengine v1.6.7 // indirect - google.golang.org/protobuf v1.31.0 // indirect + google.golang.org/protobuf v1.34.2 // indirect lukechampine.com/blake3 v1.2.1 // indirect ) diff --git a/go.sum b/go.sum index 7bf996aa..c1264e13 100644 --- a/go.sum +++ b/go.sum @@ -40,6 +40,8 @@ dmitri.shuralyov.com/state v0.0.0-20180228185332-28bcc343414c/go.mod h1:0PRwlb0D git.apache.org/thrift.git v0.0.0-20180902110319-2566ecd5d999/go.mod h1:fPE2ZNJGynbRyZ4dJvy6G277gSllfV2HJqblrnkyeyg= github.com/BurntSushi/toml v0.3.1/go.mod h1:xHWCNGjB5oqiDr8zfno3MHue2Ht5sIBksp03qcyfWMU= github.com/BurntSushi/xgb v0.0.0-20160522181843-27f122750802/go.mod h1:IVnqGOEym/WlBOVXweHU+Q+/VP0lqqI8lqeDx9IjBqo= +github.com/Microsoft/go-winio v0.4.14 h1:+hMXMk01us9KgxGb7ftKQt2Xpf5hH/yky+TDA+qxleU= +github.com/Microsoft/go-winio v0.4.14/go.mod h1:qXqCSQ3Xa7+6tgxaGTIe4Kpcdsi+P8jBhyzoq1bpyYA= github.com/alecthomas/template v0.0.0-20160405071501-a0175ee3bccc/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/template v0.0.0-20190718012654-fb15b899a751/go.mod h1:LOuyumcjzFXgccqObfd/Ljyb9UuFJ6TxHnclSeseNhc= github.com/alecthomas/units v0.0.0-20151022065526-2efee857e7cf/go.mod h1:ybxpYRFXyAe+OPACYpWeL0wqObRcbAqCMya13uyzqw0= @@ -70,6 +72,10 @@ github.com/cncf/udpa/go v0.0.0-20191209042840-269d4d468f6f/go.mod h1:M8M6+tZqaGX github.com/containerd/cgroups v0.0.0-20201119153540-4cbc285b3327/go.mod h1:ZJeTFisyysqgcCdecO57Dj79RfL0LNeGiFUqLYQRYLE= github.com/containerd/cgroups v1.1.0 h1:v8rEWFl6EoqHB+swVNjVoCJE8o3jX7e8nqBGPLaDFBM= github.com/containerd/cgroups v1.1.0/go.mod h1:6ppBcbh/NOOUU+dMKrykgaBnK9lCIBxHqJDGwsa1mIw= +github.com/containerd/errdefs v1.0.0 h1:tg5yIfIlQIrxYtu9ajqY42W3lpS19XqdxRQeEwYG8PI= +github.com/containerd/errdefs v1.0.0/go.mod h1:+YBYIdtsnF4Iw6nWZhJcqGSg/dwvV7tyJ/kCkyJ2k+M= +github.com/containerd/errdefs/pkg v0.3.0 h1:9IKJ06FvyNlexW690DXuQNx2KA2cUJXx151Xdx3ZPPE= +github.com/containerd/errdefs/pkg v0.3.0/go.mod h1:NJw6s9HwNuRhnjJhM7pylWwMyAkmCQvQ4GpJHEqRLVk= github.com/coreos/go-systemd v0.0.0-20181012123002-c6f51f82210d/go.mod h1:F5haX7vjVVG0kc13fIWeqUViNPyEJxv/OmvnBo0Yme4= github.com/coreos/go-systemd/v22 v22.1.0/go.mod h1:xO0FLkIi5MaZafQlIrOotqXZ90ih+1atmu1JpKERPPk= github.com/coreos/go-systemd/v22 v22.5.0 h1:RrqgGjYQKalulkV8NGVIfkXQf6YYmOyiJKk8iXXhfZs= @@ -85,6 +91,12 @@ github.com/decred/dcrd/crypto/blake256 v1.0.1 h1:7PltbUIQB7u/FfZ39+DGa/ShuMyJ5il github.com/decred/dcrd/crypto/blake256 v1.0.1/go.mod h1:2OfgNZ5wDpcsFmHmCK5gZTPcCXqlm2ArzUIkw9czNJo= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0 h1:8UrgZ3GkP4i/CLijOJx79Yu+etlyjdBU4sfcs2WYQMs= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.2.0/go.mod h1:v57UDF4pDQJcEfFUCRop3lJL149eHGSe9Jvczhzjo/0= +github.com/distribution/reference v0.6.0 h1:0IXCQ5g4/QMHHkarYzh5l+u8T3t73zM5QvfrDyIgxBk= +github.com/distribution/reference v0.6.0/go.mod h1:BbU0aIcezP1/5jX/8MP0YiH4SdvB5Y4f/wlDRiLyi3E= +github.com/docker/docker v28.3.2+incompatible h1:wn66NJ6pWB1vBZIilP8G3qQPqHy5XymfYn5vsqeA5oA= +github.com/docker/docker v28.3.2+incompatible/go.mod h1:eEKB0N0r5NX/I1kEveEz05bcu8tLC/8azJZsviup8Sk= +github.com/docker/go-connections v0.5.0 h1:USnMq7hx7gwdVZq1L49hLXaFtUdTADjXGp+uj1Br63c= +github.com/docker/go-connections v0.5.0/go.mod h1:ov60Kzw0kKElRwhNs9UlUHAE/F9Fe6GLaXnqyDdmEXc= github.com/docker/go-units v0.4.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= github.com/docker/go-units v0.5.0 h1:69rxXcBk27SvSaaxTtLh/8llcHD8vYHT7WSdRZ/jvr4= github.com/docker/go-units v0.5.0/go.mod h1:fgPhTUdO+D/Jk86RDLlptpiXQzgHJF7gydDDbaIK4Dk= @@ -96,6 +108,8 @@ github.com/envoyproxy/go-control-plane v0.9.0/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymF github.com/envoyproxy/go-control-plane v0.9.1-0.20191026205805-5f8ba28d4473/go.mod h1:YTl/9mNaCwkRvm6d1a2C3ymFceY/DCBVvsKhRF0iEA4= github.com/envoyproxy/go-control-plane v0.9.4/go.mod h1:6rpuAdCZL397s3pYoYcLgu1mIlRU8Am5FuJP05cCM98= github.com/envoyproxy/protoc-gen-validate v0.1.0/go.mod h1:iSmxcyjqTsJpI2R4NaDN7+kN2VEUnK/pcBlmesArF7c= +github.com/felixge/httpsnoop v1.0.4 h1:NFTV2Zj1bL4mc9sqWACXbQFVBBg2W3GPvqp8/ESS2Wg= +github.com/felixge/httpsnoop v1.0.4/go.mod h1:m8KPJKqk1gH5J9DgRY2ASl2lWCfGKXixSwevea8zH2U= github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc= github.com/flynn/noise v1.0.0 h1:DlTHqmzmvcEiKj+4RYo/imoswx/4r6iBlCMfVtrMXpQ= github.com/flynn/noise v1.0.0/go.mod h1:xbMo+0i6+IGbYdJhF31t2eR1BIU0CYc12+BNAKwUTag= @@ -116,8 +130,13 @@ github.com/go-logfmt/logfmt v0.3.0/go.mod h1:Qt1PoO58o5twSAckw1HlFXLmHsOX5/0LbT9 github.com/go-logfmt/logfmt v0.4.0/go.mod h1:3RMwSq7FuexP4Kalkev3ejPJsZTpXXBr9+V4qmtdjCk= github.com/go-logfmt/logfmt v0.5.0/go.mod h1:wCYkCAKZfumFQihp8CzCvQ3paCTfi41vtzG1KdI/P7A= github.com/go-logfmt/logfmt v0.5.1/go.mod h1:WYhtIu8zTZfxdn5+rREduYbwxfcBr/Vr6KEVveWlfTs= +github.com/go-logr/logr v1.2.2/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= github.com/go-logr/logr v1.2.4 h1:g01GSCwiDw2xSZfjJ2/T9M+S6pFdcNtFYsp+Y43HYDQ= github.com/go-logr/logr v1.2.4/go.mod h1:jdQByPbusPIv2/zmleS9BjJVeZ6kBagPoEUsqbVz/1A= +github.com/go-logr/logr v1.4.3 h1:CjnDlHq8ikf6E492q6eKboGOC0T8CDaOvkHCIg8idEI= +github.com/go-logr/logr v1.4.3/go.mod h1:9T104GzyrTigFIr8wt5mBrctHMim0Nb2HLGrmQ40KvY= +github.com/go-logr/stdr v1.2.2 h1:hSWxHoqTgW2S2qGc0LTAI563KZ5YKYRhT3MFKZMbjag= +github.com/go-logr/stdr v1.2.2/go.mod h1:mMo/vtBO5dYbehREoey6XUKy/eSumjCCveDpRre4VKE= github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572 h1:tfuBGBXKqDEevZMzYi5KSi8KkcZtzBcTgAUUtapy0OI= github.com/go-task/slim-sprig v0.0.0-20230315185526-52ccab3ef572/go.mod h1:9Pwr4B2jHnOSGXyyzV8ROjYa2ojvAY6HCGYYfMoC3Ls= @@ -173,6 +192,7 @@ github.com/google/go-cmp v0.5.4/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/ github.com/google/go-cmp v0.5.5/go.mod h1:v8dTdLbMG2kIc/vJvl+f65V22dbkXbowE6jgT/gNBxE= github.com/google/go-cmp v0.6.0 h1:ofyhxvXcZhMsU5ulbFiLKl/XBFqE1GSq7atu8tAmTRI= github.com/google/go-cmp v0.6.0/go.mod h1:17dUlkBOakJ0+DkrSSNjCkIjxS6bF9zb3elmeNGIjoY= +github.com/google/go-cmp v0.7.0 h1:wk8382ETsv4JYUZwIsn6YpYiWiBsYLSJiTsyBybVuN8= github.com/google/go-github v17.0.0+incompatible/go.mod h1:zLgOLi98H3fifZn+44m+umXrS52loVEgC2AApnigrVQ= github.com/google/go-github/v57 v57.0.0 h1:L+Y3UPTY8ALM8x+TV0lg+IEBI+upibemtBD8Q9u7zHs= github.com/google/go-github/v57 v57.0.0/go.mod h1:s0omdnye0hvK/ecLvpsGfJMiRt85PimQh4oygmLIxHw= @@ -301,6 +321,8 @@ github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8Rv github.com/minio/sha256-simd v0.1.1-0.20190913151208-6de447530771/go.mod h1:B5e1o+1/KgNmWrSQK08Y6Z1Vb5pwIktudl0J58iy0KM= github.com/minio/sha256-simd v1.0.1 h1:6kaan5IFmwTNynnKKpDHe6FWHohJOHhCPchzK49dzMM= github.com/minio/sha256-simd v1.0.1/go.mod h1:Pz6AKMiUdngCLpeTL/RJY1M9rUuPMYujV5xJjtbRSN8= +github.com/moby/docker-image-spec v1.3.1 h1:jMKff3w6PgbfSa69GfNg+zN/XLhfXJGnEx3Nl2EsFP0= +github.com/moby/docker-image-spec v1.3.1/go.mod h1:eKmb5VW8vQEh/BAr2yvVNvuiJuY6UIocYsFu/DxxRpo= github.com/modern-go/concurrent v0.0.0-20180228061459-e0a39a4cb421/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd/go.mod h1:6dJC0mAP4ikYIbvyc7fijjWJddQyLn8Ig3JB5CqoB9Q= github.com/modern-go/reflect2 v0.0.0-20180701023420-4b7aa43c6742/go.mod h1:bx2lNnkwVCuqBIxFjflWJWanXIb3RllmbCylyMrvgv0= @@ -341,6 +363,10 @@ github.com/onsi/ginkgo/v2 v2.13.0 h1:0jY9lJquiL8fcf3M4LAXN5aMlS/b2BV86HFFPCPMgE4 github.com/onsi/ginkgo/v2 v2.13.0/go.mod h1:TE309ZR8s5FsKKpuB1YAQYBzCaAfUgatB/xlT/ETL/o= github.com/onsi/gomega v1.27.10 h1:naR28SdDFlqrG6kScpT8VWpu1xWY5nJRCF3XaYyBjhI= github.com/onsi/gomega v1.27.10/go.mod h1:RsS8tutOdbdgzbPtzzATp12yT7kM5I5aElG3evPbQ0M= +github.com/opencontainers/go-digest v1.0.0 h1:apOUWs51W5PlhuyGyz9FCeeBIOUDA/6nW8Oi/yOhh5U= +github.com/opencontainers/go-digest v1.0.0/go.mod h1:0JzlMkj0TRzQZfJkVvzbP0HBR3IKzErnv2BNG4W4MAM= +github.com/opencontainers/image-spec v1.1.1 h1:y0fUlFfIZhPF1W537XOLg0/fcx6zcHCJwooC2xJA040= +github.com/opencontainers/image-spec v1.1.1/go.mod h1:qpqAh3Dmcf36wStyyWU+kCeDgrGnAve2nCC8+7h8Q0M= github.com/opencontainers/runtime-spec v1.0.2/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= github.com/opencontainers/runtime-spec v1.1.0 h1:HHUyrt9mwHUjtasSbXSMvs4cyFxh+Bll4AjJ9odEGpg= github.com/opencontainers/runtime-spec v1.1.0/go.mod h1:jwyrGlmzljRJv/Fgzds9SsS/C5hL+LL3ko9hs6T5lQ0= @@ -420,6 +446,7 @@ github.com/shurcooL/sanitized_anchor_name v1.0.0/go.mod h1:1NzhyTcUVG4SuEtjjoZeV github.com/shurcooL/users v0.0.0-20180125191416-49c67e49c537/go.mod h1:QJTqeLYEDaXHZDBsXlPCDqdhQuJkuw4NOtaxYe3xii4= github.com/shurcooL/webdavfs v0.0.0-20170829043945-18c3829fa133/go.mod h1:hKmq5kWdCj2z2KEozexVbfEZIWiTjhE0+UjmZgPqehw= github.com/sirupsen/logrus v1.2.0/go.mod h1:LxeOpSwHxABJmUn/MG1IvRgCAasNZTLOkJPxbbu5VWo= +github.com/sirupsen/logrus v1.4.1/go.mod h1:ni0Sbl8bgC9z8RoU9G6nDWqqs/fq4eDPysMBDgk/93Q= github.com/sirupsen/logrus v1.4.2/go.mod h1:tLMulIdttU9McNUspp0xgXVQah82FyeX6MwdIuYE2rE= github.com/sirupsen/logrus v1.6.0/go.mod h1:7uNnSEd1DgxDLC74fIahvMZmmYsHGZGEOFrfsX/uA88= github.com/sirupsen/logrus v1.7.0/go.mod h1:yWOB1SBYBC5VeMP7gHvWumXLIWorT60ONWic61uBYv0= @@ -436,6 +463,7 @@ github.com/stretchr/testify v1.6.1/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/ github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.8.4 h1:CcVxjf3Q8PM0mHUKJCdn+eZZtm5yQwehR5yeSVQQcUk= github.com/stretchr/testify v1.8.4/go.mod h1:sz/lmYIOXD/1dqDmKjjqLyZ2RngseejIcXlSw2iwfAo= +github.com/stretchr/testify v1.10.0 h1:Xv5erBjTwe/5IxqUQTdXv5kgmIvbHo3QQyRwhJsOfJA= github.com/tarm/serial v0.0.0-20180830185346-98f6abe2eb07/go.mod h1:kDXzergiv9cbyO7IOYJZWg1U88JhDg3PB6klq9Hg2pA= github.com/urfave/cli v1.22.2/go.mod h1:Gos4lmkARVdJ6EkW0WaNv/tZAAMe9V7XWyB60NtXRu0= github.com/viant/assertly v0.4.8/go.mod h1:aGifi++jvCrUaklKEKT0BU95igDNaqkvz+49uaYMPRU= @@ -451,6 +479,16 @@ go.opencensus.io v0.22.0/go.mod h1:+kGneAE2xo2IficOXnaByMWTGM9T73dGwxeWcUqIpI8= go.opencensus.io v0.22.2/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.3/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= go.opencensus.io v0.22.4/go.mod h1:yxeiOL68Rb0Xd1ddK5vPZ/oVn4vY4Ynel7k9FzqtOIw= +go.opentelemetry.io/auto/sdk v1.1.0 h1:cH53jehLUN6UFLY71z+NDOiNJqDdPRaXzTel0sJySYA= +go.opentelemetry.io/auto/sdk v1.1.0/go.mod h1:3wSPjt5PWp2RhlCcmmOial7AvC4DQqZb7a7wCow3W8A= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0 h1:Hf9xI/XLML9ElpiHVDNwvqI0hIFlzV8dgIr35kV1kRU= +go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.62.0/go.mod h1:NfchwuyNoMcZ5MLHwPrODwUF1HWCXWrL31s8gSAdIKY= +go.opentelemetry.io/otel v1.37.0 h1:9zhNfelUvx0KBfu/gb+ZgeAfAgtWrfHJZcAqFC228wQ= +go.opentelemetry.io/otel v1.37.0/go.mod h1:ehE/umFRLnuLa/vSccNq9oS1ErUlkkK71gMcN34UG8I= +go.opentelemetry.io/otel/metric v1.37.0 h1:mvwbQS5m0tbmqML4NqK+e3aDiO02vsf/WgbsdpcPoZE= +go.opentelemetry.io/otel/metric v1.37.0/go.mod h1:04wGrZurHYKOc+RKeye86GwKiTb9FKm1WHtO+4EVr2E= +go.opentelemetry.io/otel/trace v1.37.0 h1:HLdcFNbRQBE2imdSEgm/kwqmQj1Or1l/7bW6mxVK7z4= +go.opentelemetry.io/otel/trace v1.37.0/go.mod h1:TlgrlQ+PtQO5XFerSPUYG0JSgGyryXewPGyayAWSBS0= go.uber.org/atomic v1.7.0/go.mod h1:fEN4uk6kAWBTFdckzkM89CLk9XfWZrxpCo0nPH17wJc= go.uber.org/atomic v1.11.0 h1:ZvwS0R+56ePWxUNi+Atn9dWONBPp/AUETXlHW0DxSjE= go.uber.org/atomic v1.11.0/go.mod h1:LUxbIzbOniOlMKjJjyPfpl4v+PKK2cNJn91OQbhoJI0= @@ -642,6 +680,8 @@ golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= golang.org/x/sys v0.15.0 h1:h48lPFYpsTvQJZF4EKyI4aLHaev3CxivZmv7yZig9pc= golang.org/x/sys v0.15.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= +golang.org/x/sys v0.33.0 h1:q3i8TbbEz+JRD9ywIRlyRAQbM0qF7hu24q3teo2hbuw= +golang.org/x/sys v0.33.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k= golang.org/x/term v0.0.0-20201126162022-7de9c90e9dd1/go.mod h1:bj7SfCRtBDWHUb9snDiAeCFNEtKQo2Wmx5Cou7ajbmo= golang.org/x/term v0.0.0-20210927222741-03fcf44c2211/go.mod h1:jbD1KX2456YbFQfuXm/mYQcufACuNUgVhRMnK/tPxf8= golang.org/x/text v0.0.0-20170915032832-14c0d48ead0c/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ= @@ -800,6 +840,8 @@ google.golang.org/protobuf v1.26.0-rc.1/go.mod h1:jlhhOSvTdKEhbULTjvd4ARK9grFBp0 google.golang.org/protobuf v1.26.0/go.mod h1:9q0QmTI4eRPtz6boOQmLYwt+qCgq0jsYwAQnmE0givc= google.golang.org/protobuf v1.31.0 h1:g0LDEJHgrBl9N9r17Ru3sqWhkIx2NB67okBHPwC7hs8= google.golang.org/protobuf v1.31.0/go.mod h1:HV8QOd/L58Z+nl8r43ehVNZIU/HEI6OcFqwMG9pJV4I= +google.golang.org/protobuf v1.34.2 h1:6xV6lTsCfpGD21XK49h7MhtcApnLqkfYgPcdHftf6hg= +google.golang.org/protobuf v1.34.2/go.mod h1:qYOHts0dSfpeUzUFpOMr/WGzszTmLH+DiWniOlNbLDw= gopkg.in/alecthomas/kingpin.v2 v2.2.6/go.mod h1:FMv+mEhP44yOT+4EoQTLFTRgOQ1FBLkstjWtayDeSgw= gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0= diff --git a/logging/hypercore.go b/logging/hypercore.go index b5d05a49..4d0a1f3f 100644 --- a/logging/hypercore.go +++ b/logging/hypercore.go @@ -48,12 +48,15 @@ const ( TaskFailed LogType = "task_failed" // Antennae meta-discussion logs - PlanProposed LogType = "plan_proposed" - ObjectionRaised LogType = "objection_raised" - Collaboration LogType = "collaboration" - ConsensusReached LogType = "consensus_reached" - Escalation LogType = "escalation" - + PlanProposed LogType = "plan_proposed" + ObjectionRaised LogType = "objection_raised" + Collaboration LogType = "collaboration" + ConsensusReached LogType = "consensus_reached" + Escalation LogType = "escalation" + TaskHelpRequested LogType = "task_help_requested" + TaskHelpOffered LogType = "task_help_offered" + TaskHelpReceived LogType = "task_help_received" + // System logs PeerJoined LogType = "peer_joined" PeerLeft LogType = "peer_left" diff --git a/main.go b/main.go index bc3786ca..4df0a05a 100644 --- a/main.go +++ b/main.go @@ -1,10 +1,12 @@ package main import ( + "bytes" "context" "encoding/json" "fmt" "log" + "net/http" "os" "os/signal" "path/filepath" @@ -12,12 +14,14 @@ import ( "syscall" "time" - "github.com/deepblackcloud/bzzz/discovery" - "github.com/deepblackcloud/bzzz/github" - "github.com/deepblackcloud/bzzz/p2p" - "github.com/deepblackcloud/bzzz/pkg/config" - "github.com/deepblackcloud/bzzz/pkg/hive" - "github.com/deepblackcloud/bzzz/pubsub" + "github.com/anthonyrawlins/bzzz/discovery" + "github.com/anthonyrawlins/bzzz/github" + "github.com/anthonyrawlins/bzzz/logging" + "github.com/anthonyrawlins/bzzz/p2p" + "github.com/anthonyrawlins/bzzz/pkg/config" + "github.com/anthonyrawlins/bzzz/pkg/hive" + "github.com/anthonyrawlins/bzzz/pubsub" + "github.com/anthonyrawlins/bzzz/reasoning" ) // SimpleTaskTracker tracks active tasks for availability reporting @@ -97,6 +101,11 @@ func main() { fmt.Printf(" %s/p2p/%s\n", addr, node.ID()) } + // Initialize Hypercore-style logger + hlog := logging.NewHypercoreLog(node.ID()) + hlog.Append(logging.PeerJoined, map[string]interface{}{"status": "started"}) + fmt.Printf("📝 Hypercore logger initialized\n") + // Initialize mDNS discovery mdnsDiscovery, err := discovery.NewMDNSDiscovery(ctx, node.Host(), "bzzz-peer-discovery") if err != nil { @@ -147,7 +156,7 @@ func main() { MaxTasks: cfg.Agent.MaxTasks, } - ghIntegration = github.NewHiveIntegration(ctx, hiveClient, githubToken, ps, integrationConfig) + ghIntegration = github.NewHiveIntegration(ctx, hiveClient, githubToken, ps, hlog, integrationConfig) // Start the integration service ghIntegration.Start() @@ -215,8 +224,124 @@ func announceAvailability(ps *pubsub.PubSub, nodeID string, taskTracker *SimpleT } } +// detectAvailableOllamaModels queries Ollama API for available models +func detectAvailableOllamaModels() ([]string, error) { + resp, err := http.Get("http://localhost:11434/api/tags") + if err != nil { + return nil, fmt.Errorf("failed to connect to Ollama API: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("Ollama API returned status %d", resp.StatusCode) + } + + var tagsResponse struct { + Models []struct { + Name string `json:"name"` + } `json:"models"` + } + + if err := json.NewDecoder(resp.Body).Decode(&tagsResponse); err != nil { + return nil, fmt.Errorf("failed to decode Ollama response: %w", err) + } + + models := make([]string, 0, len(tagsResponse.Models)) + for _, model := range tagsResponse.Models { + models = append(models, model.Name) + } + + return models, nil +} + +// selectBestModel calls the model selection webhook to choose the best model for a prompt +func selectBestModel(webhookURL string, availableModels []string, prompt string) (string, error) { + if webhookURL == "" || len(availableModels) == 0 { + // Fallback to first available model + if len(availableModels) > 0 { + return availableModels[0], nil + } + return "", fmt.Errorf("no models available") + } + + requestPayload := map[string]interface{}{ + "models": availableModels, + "prompt": prompt, + } + + payloadBytes, err := json.Marshal(requestPayload) + if err != nil { + // Fallback on error + return availableModels[0], nil + } + + resp, err := http.Post(webhookURL, "application/json", bytes.NewBuffer(payloadBytes)) + if err != nil { + // Fallback on error + return availableModels[0], nil + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + // Fallback on error + return availableModels[0], nil + } + + var response struct { + Model string `json:"model"` + } + + if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { + // Fallback on error + return availableModels[0], nil + } + + // Validate that the returned model is in our available list + for _, model := range availableModels { + if model == response.Model { + return response.Model, nil + } + } + + // Fallback if webhook returned invalid model + return availableModels[0], nil +} + // announceCapabilitiesOnChange broadcasts capabilities only when they change func announceCapabilitiesOnChange(ps *pubsub.PubSub, nodeID string, cfg *config.Config) { + // Detect available Ollama models and update config + availableModels, err := detectAvailableOllamaModels() + if err != nil { + fmt.Printf("⚠️ Failed to detect Ollama models: %v\n", err) + fmt.Printf("🔄 Using configured models: %v\n", cfg.Agent.Models) + } else { + // Filter configured models to only include available ones + validModels := make([]string, 0) + for _, configModel := range cfg.Agent.Models { + for _, availableModel := range availableModels { + if configModel == availableModel { + validModels = append(validModels, configModel) + break + } + } + } + + if len(validModels) == 0 { + fmt.Printf("⚠️ No configured models available in Ollama, using first available: %v\n", availableModels) + if len(availableModels) > 0 { + validModels = []string{availableModels[0]} + } + } else { + fmt.Printf("✅ Available models: %v\n", validModels) + } + + // Update config with available models + cfg.Agent.Models = validModels + + // Configure reasoning module with available models and webhook + reasoning.SetModelConfig(validModels, cfg.Agent.ModelSelectionWebhook) + } + // Get current capabilities currentCaps := map[string]interface{}{ "node_id": nodeID, diff --git a/monitoring/antennae_monitor.go b/monitoring/antennae_monitor.go index 7236d5fb..4f34e21a 100644 --- a/monitoring/antennae_monitor.go +++ b/monitoring/antennae_monitor.go @@ -9,7 +9,7 @@ import ( "sync" "time" - "github.com/deepblackcloud/bzzz/pubsub" + "github.com/anthonyrawlins/bzzz/pubsub" ) // AntennaeMonitor tracks and logs antennae coordination activity diff --git a/pkg/config/config.go b/pkg/config/config.go index 50a48bb5..b93a31c6 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -29,12 +29,13 @@ type HiveAPIConfig struct { // AgentConfig holds agent-specific configuration type AgentConfig struct { - ID string `yaml:"id"` - Capabilities []string `yaml:"capabilities"` - PollInterval time.Duration `yaml:"poll_interval"` - MaxTasks int `yaml:"max_tasks"` - Models []string `yaml:"models"` - Specialization string `yaml:"specialization"` + ID string `yaml:"id"` + Capabilities []string `yaml:"capabilities"` + PollInterval time.Duration `yaml:"poll_interval"` + MaxTasks int `yaml:"max_tasks"` + Models []string `yaml:"models"` + Specialization string `yaml:"specialization"` + ModelSelectionWebhook string `yaml:"model_selection_webhook"` } // GitHubConfig holds GitHub integration settings @@ -100,11 +101,12 @@ func getDefaultConfig() *Config { RetryCount: 3, }, Agent: AgentConfig{ - Capabilities: []string{"general", "reasoning", "task-coordination"}, - PollInterval: 30 * time.Second, - MaxTasks: 3, - Models: []string{"phi3", "llama3.1"}, - Specialization: "general_developer", + Capabilities: []string{"general", "reasoning", "task-coordination"}, + PollInterval: 30 * time.Second, + MaxTasks: 3, + Models: []string{"phi3", "llama3.1"}, + Specialization: "general_developer", + ModelSelectionWebhook: "https://n8n.home.deepblack.cloud/webhook/model-selection", }, GitHub: GitHubConfig{ TokenFile: "/home/tony/AI/secrets/passwords_and_tokens/gh-token", @@ -164,6 +166,9 @@ func loadFromEnv(config *Config) error { if specialization := os.Getenv("BZZZ_AGENT_SPECIALIZATION"); specialization != "" { config.Agent.Specialization = specialization } + if modelWebhook := os.Getenv("BZZZ_MODEL_SELECTION_WEBHOOK"); modelWebhook != "" { + config.Agent.ModelSelectionWebhook = modelWebhook + } // GitHub configuration if tokenFile := os.Getenv("BZZZ_GITHUB_TOKEN_FILE"); tokenFile != "" { diff --git a/pkg/coordination/dependency_detector.go b/pkg/coordination/dependency_detector.go index fd2d9f1f..d4b34170 100644 --- a/pkg/coordination/dependency_detector.go +++ b/pkg/coordination/dependency_detector.go @@ -6,7 +6,7 @@ import ( "strings" "time" - "github.com/deepblackcloud/bzzz/pubsub" + "github.com/anthonyrawlins/bzzz/pubsub" "github.com/libp2p/go-libp2p/core/peer" ) diff --git a/pkg/coordination/meta_coordinator.go b/pkg/coordination/meta_coordinator.go index 3bdfc433..dca18f0d 100644 --- a/pkg/coordination/meta_coordinator.go +++ b/pkg/coordination/meta_coordinator.go @@ -8,8 +8,8 @@ import ( "sync" "time" - "github.com/deepblackcloud/bzzz/pubsub" - "github.com/deepblackcloud/bzzz/reasoning" + "github.com/anthonyrawlins/bzzz/pubsub" + "github.com/anthonyrawlins/bzzz/reasoning" "github.com/libp2p/go-libp2p/core/peer" ) diff --git a/pkg/types/task.go b/pkg/types/task.go new file mode 100644 index 00000000..7495f293 --- /dev/null +++ b/pkg/types/task.go @@ -0,0 +1,35 @@ +package types + +import ( + "time" + + "github.com/anthonyrawlins/bzzz/pkg/hive" +) + +// EnhancedTask extends a basic Task with project-specific context. +// It's the primary data structure passed between the github, executor, +// and reasoning components. +type EnhancedTask struct { + // Core task details, originally from the GitHub issue. + ID int64 + Number int + Title string + Description string + State string + Labels []string + Assignee string + CreatedAt time.Time + UpdatedAt time.Time + + // Bzzz-specific fields parsed from the issue body or labels. + TaskType string + Priority int + Requirements []string + Deliverables []string + Context map[string]interface{} + + // Hive-integration fields providing repository context. + ProjectID int + GitURL string + Repository hive.Repository +} diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index e7273c11..c8b285b3 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "sync" "time" "github.com/libp2p/go-libp2p/core/host" @@ -26,6 +27,12 @@ type PubSub struct { bzzzSub *pubsub.Subscription antennaeSub *pubsub.Subscription + // Dynamic topic management + dynamicTopics map[string]*pubsub.Topic + dynamicTopicsMux sync.RWMutex + dynamicSubs map[string]*pubsub.Subscription + dynamicSubsMux sync.RWMutex + // Configuration bzzzTopicName string antennaeTopicName string @@ -48,6 +55,8 @@ const ( // Antennae meta-discussion messages MetaDiscussion MessageType = "meta_discussion" // Generic type for all discussion + TaskHelpRequest MessageType = "task_help_request" // Request for assistance + TaskHelpResponse MessageType = "task_help_response" // Response to a help request CoordinationRequest MessageType = "coordination_request" // Request for coordination CoordinationComplete MessageType = "coordination_complete" // Coordination session completed DependencyAlert MessageType = "dependency_alert" // Dependency detected @@ -93,10 +102,12 @@ func NewPubSub(ctx context.Context, h host.Host, bzzzTopic, antennaeTopic string cancel: cancel, bzzzTopicName: bzzzTopic, antennaeTopicName: antennaeTopic, + dynamicTopics: make(map[string]*pubsub.Topic), + dynamicSubs: make(map[string]*pubsub.Subscription), } - // Join topics - if err := p.joinTopics(); err != nil { + // Join static topics + if err := p.joinStaticTopics(); err != nil { cancel() return nil, err } @@ -110,13 +121,12 @@ func NewPubSub(ctx context.Context, h host.Host, bzzzTopic, antennaeTopic string } // SetAntennaeMessageHandler sets the handler for incoming Antennae messages. -// This allows the business logic (e.g., in the github module) to process messages. func (p *PubSub) SetAntennaeMessageHandler(handler func(msg Message, from peer.ID)) { p.AntennaeMessageHandler = handler } -// joinTopics joins the Bzzz coordination and Antennae meta-discussion topics -func (p *PubSub) joinTopics() error { +// joinStaticTopics joins the main Bzzz and Antennae topics +func (p *PubSub) joinStaticTopics() error { // Join Bzzz coordination topic bzzzTopic, err := p.ps.Join(p.bzzzTopicName) if err != nil { @@ -124,7 +134,6 @@ func (p *PubSub) joinTopics() error { } p.bzzzTopic = bzzzTopic - // Subscribe to Bzzz messages bzzzSub, err := bzzzTopic.Subscribe() if err != nil { return fmt.Errorf("failed to subscribe to Bzzz topic: %w", err) @@ -138,7 +147,6 @@ func (p *PubSub) joinTopics() error { } p.antennaeTopic = antennaeTopic - // Subscribe to Antennae messages antennaeSub, err := antennaeTopic.Subscribe() if err != nil { return fmt.Errorf("failed to subscribe to Antennae topic: %w", err) @@ -148,6 +156,83 @@ func (p *PubSub) joinTopics() error { return nil } +// JoinDynamicTopic joins a new topic for a specific task +func (p *PubSub) JoinDynamicTopic(topicName string) error { + p.dynamicTopicsMux.Lock() + defer p.dynamicTopicsMux.Unlock() + p.dynamicSubsMux.Lock() + defer p.dynamicSubsMux.Unlock() + + if _, exists := p.dynamicTopics[topicName]; exists { + return nil // Already joined + } + + topic, err := p.ps.Join(topicName) + if err != nil { + return fmt.Errorf("failed to join dynamic topic %s: %w", topicName, err) + } + + sub, err := topic.Subscribe() + if err != nil { + topic.Close() + return fmt.Errorf("failed to subscribe to dynamic topic %s: %w", topicName, err) + } + + p.dynamicTopics[topicName] = topic + p.dynamicSubs[topicName] = sub + + // Start a handler for this new subscription + go p.handleDynamicMessages(sub) + + fmt.Printf("✅ Joined dynamic topic: %s\n", topicName) + return nil +} + +// LeaveDynamicTopic leaves a specific task topic +func (p *PubSub) LeaveDynamicTopic(topicName string) { + p.dynamicTopicsMux.Lock() + defer p.dynamicTopicsMux.Unlock() + p.dynamicSubsMux.Lock() + defer p.dynamicSubsMux.Unlock() + + if sub, exists := p.dynamicSubs[topicName]; exists { + sub.Cancel() + delete(p.dynamicSubs, topicName) + } + + if topic, exists := p.dynamicTopics[topicName]; exists { + topic.Close() + delete(p.dynamicTopics, topicName) + } + + fmt.Printf("🗑️ Left dynamic topic: %s\n", topicName) +} + +// PublishToDynamicTopic publishes a message to a specific dynamic topic +func (p *PubSub) PublishToDynamicTopic(topicName string, msgType MessageType, data map[string]interface{}) error { + p.dynamicTopicsMux.RLock() + topic, exists := p.dynamicTopics[topicName] + p.dynamicTopicsMux.RUnlock() + + if !exists { + return fmt.Errorf("not subscribed to dynamic topic: %s", topicName) + } + + msg := Message{ + Type: msgType, + From: p.host.ID().String(), + Timestamp: time.Now(), + Data: data, + } + + msgBytes, err := json.Marshal(msg) + if err != nil { + return fmt.Errorf("failed to marshal message for dynamic topic: %w", err) + } + + return topic.Publish(p.ctx, msgBytes) +} + // PublishBzzzMessage publishes a message to the Bzzz coordination topic func (p *PubSub) PublishBzzzMessage(msgType MessageType, data map[string]interface{}) error { msg := Message{ @@ -194,7 +279,6 @@ func (p *PubSub) handleBzzzMessages() { continue } - // Skip our own messages if msg.ReceivedFrom == p.host.ID() { continue } @@ -221,7 +305,6 @@ func (p *PubSub) handleAntennaeMessages() { continue } - // Skip our own messages if msg.ReceivedFrom == p.host.ID() { continue } @@ -232,16 +315,43 @@ func (p *PubSub) handleAntennaeMessages() { continue } - // If an external handler is registered, use it. if p.AntennaeMessageHandler != nil { p.AntennaeMessageHandler(antennaeMsg, msg.ReceivedFrom) } else { - // Default processing if no handler is set p.processAntennaeMessage(antennaeMsg, msg.ReceivedFrom) } } } +// handleDynamicMessages processes messages from a dynamic topic subscription +func (p *PubSub) handleDynamicMessages(sub *pubsub.Subscription) { + for { + msg, err := sub.Next(p.ctx) + if err != nil { + if p.ctx.Err() != nil || err.Error() == "subscription cancelled" { + return // Subscription was cancelled, exit handler + } + fmt.Printf("❌ Error receiving dynamic message: %v\n", err) + continue + } + + if msg.ReceivedFrom == p.host.ID() { + continue + } + + var dynamicMsg Message + if err := json.Unmarshal(msg.Data, &dynamicMsg); err != nil { + fmt.Printf("❌ Failed to unmarshal dynamic message: %v\n", err) + continue + } + + // Use the main Antennae handler for all dynamic messages + if p.AntennaeMessageHandler != nil { + p.AntennaeMessageHandler(dynamicMsg, msg.ReceivedFrom) + } + } +} + // processBzzzMessage handles different types of Bzzz coordination messages func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) { fmt.Printf("🐝 Bzzz [%s] from %s: %v\n", msg.Type, from.ShortString(), msg.Data) @@ -253,11 +363,6 @@ func (p *PubSub) processAntennaeMessage(msg Message, from peer.ID) { msg.Type, from.ShortString(), msg.Data) } -// GetConnectedPeers returns the number of connected peers -func (p *PubSub) GetConnectedPeers() int { - return len(p.host.Network().Peers()) -} - // Close shuts down the PubSub instance func (p *PubSub) Close() error { p.cancel() @@ -276,5 +381,11 @@ func (p *PubSub) Close() error { p.antennaeTopic.Close() } + p.dynamicTopicsMux.Lock() + for _, topic := range p.dynamicTopics { + topic.Close() + } + p.dynamicTopicsMux.Unlock() + return nil } diff --git a/reasoning/reasoning.go b/reasoning/reasoning.go index 397cb04f..09c660fe 100644 --- a/reasoning/reasoning.go +++ b/reasoning/reasoning.go @@ -15,6 +15,11 @@ const ( defaultTimeout = 60 * time.Second ) +var ( + availableModels []string + modelWebhookURL string +) + // OllamaRequest represents the request payload for the Ollama API. type OllamaRequest struct { Model string `json:"model"` @@ -77,3 +82,68 @@ func GenerateResponse(ctx context.Context, model, prompt string) (string, error) return ollamaResp.Response, nil } + +// SetModelConfig configures the available models and webhook URL for smart model selection +func SetModelConfig(models []string, webhookURL string) { + availableModels = models + modelWebhookURL = webhookURL +} + +// selectBestModel calls the model selection webhook to choose the best model for a prompt +func selectBestModel(availableModels []string, prompt string) string { + if modelWebhookURL == "" || len(availableModels) == 0 { + // Fallback to first available model + if len(availableModels) > 0 { + return availableModels[0] + } + return "phi3" // Last resort fallback + } + + requestPayload := map[string]interface{}{ + "models": availableModels, + "prompt": prompt, + } + + payloadBytes, err := json.Marshal(requestPayload) + if err != nil { + // Fallback on error + return availableModels[0] + } + + resp, err := http.Post(modelWebhookURL, "application/json", bytes.NewBuffer(payloadBytes)) + if err != nil { + // Fallback on error + return availableModels[0] + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + // Fallback on error + return availableModels[0] + } + + var response struct { + Model string `json:"model"` + } + + if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { + // Fallback on error + return availableModels[0] + } + + // Validate that the returned model is in our available list + for _, model := range availableModels { + if model == response.Model { + return response.Model + } + } + + // Fallback if webhook returned invalid model + return availableModels[0] +} + +// GenerateResponseSmart automatically selects the best model for the prompt +func GenerateResponseSmart(ctx context.Context, prompt string) (string, error) { + selectedModel := selectBestModel(availableModels, prompt) + return GenerateResponse(ctx, selectedModel, prompt) +} diff --git a/sandbox/sandbox.go b/sandbox/sandbox.go new file mode 100644 index 00000000..035f51af --- /dev/null +++ b/sandbox/sandbox.go @@ -0,0 +1,213 @@ +package sandbox + +import ( + "context" + "fmt" + "os" + "time" + + "github.com/docker/docker/api/types/container" + "github.com/docker/docker/client" +) + +const ( + // DefaultDockerImage is the image used if a task does not specify one. + DefaultDockerImage = "bzzz-sandbox:latest" +) + +// Sandbox represents a stateful, isolated execution environment for a single task. +type Sandbox struct { + ID string // The ID of the running container. + HostPath string // The path on the host machine mounted as the workspace. + Workspace string // The path inside the container that is the workspace. + dockerCli *client.Client + ctx context.Context +} + +// CommandResult holds the output of a command executed in the sandbox. +type CommandResult struct { + StdOut string + StdErr string + ExitCode int +} + +// CreateSandbox provisions a new Docker container for a task. +func CreateSandbox(ctx context.Context, taskImage string) (*Sandbox, error) { + if taskImage == "" { + taskImage = DefaultDockerImage + } + + // Create a new Docker client + cli, err := client.NewClientWithOpts(client.FromEnv, client.WithAPIVersionNegotiation()) + if err != nil { + return nil, fmt.Errorf("failed to create docker client: %w", err) + } + + // Create a temporary directory on the host + hostPath, err := os.MkdirTemp("", "bzzz-sandbox-") + if err != nil { + return nil, fmt.Errorf("failed to create temp dir for sandbox: %w", err) + } + + // Define container configuration + containerConfig := &container.Config{ + Image: taskImage, + Tty: true, // Keep the container running + OpenStdin: true, + WorkingDir: "/home/agent/work", + User: "agent", + } + + // Define host configuration (e.g., volume mounts, resource limits) + hostConfig := &container.HostConfig{ + Binds: []string{fmt.Sprintf("%s:/home/agent/work", hostPath)}, + Resources: container.Resources{ + CPUs: 2, + Memory: 2 * 1024 * 1024 * 1024, // 2GB + }, + } + + // Create the container + resp, err := cli.ContainerCreate(ctx, containerConfig, hostConfig, nil, nil, "") + if err != nil { + os.RemoveAll(hostPath) // Clean up the directory if container creation fails + return nil, fmt.Errorf("failed to create container: %w", err) + } + + // Start the container + if err := cli.ContainerStart(ctx, resp.ID, container.StartOptions{}); err != nil { + os.RemoveAll(hostPath) // Clean up + return nil, fmt.Errorf("failed to start container: %w", err) + } + + fmt.Printf("✅ Sandbox container %s created successfully.\n", resp.ID[:12]) + + return &Sandbox{ + ID: resp.ID, + HostPath: hostPath, + Workspace: "/home/agent/work", + dockerCli: cli, + ctx: ctx, + }, nil +} + +// DestroySandbox stops and removes the container and its associated host directory. +func (s *Sandbox) DestroySandbox() error { + if s == nil || s.ID == "" { + return nil + } + + // Define a timeout for stopping the container + timeout := 30 * time.Second + + // Stop the container + fmt.Printf("🛑 Stopping sandbox container %s...\n", s.ID[:12]) + err := s.dockerCli.ContainerStop(s.ctx, s.ID, container.StopOptions{Timeout: &timeout}) + if err != nil { + // Log the error but continue to try and clean up + fmt.Printf("⚠️ Error stopping container %s: %v. Proceeding with cleanup.\n", s.ID, err) + } + + // Remove the container + err = s.dockerCli.ContainerRemove(s.ctx, s.ID, container.RemoveOptions{Force: true}) + if err != nil { + fmt.Printf("⚠️ Error removing container %s: %v. Proceeding with cleanup.\n", s.ID, err) + } + + // Remove the host directory + fmt.Printf("🗑️ Removing host directory %s...\n", s.HostPath) + err = os.RemoveAll(s.HostPath) + if err != nil { + return fmt.Errorf("failed to remove host directory %s: %w", s.HostPath, err) + } + + fmt.Printf("✅ Sandbox %s destroyed successfully.\n", s.ID[:12]) + return nil +} + +// RunCommand executes a shell command inside the sandbox. +func (s *Sandbox) RunCommand(command string) (*CommandResult, error) { + // Configuration for the exec process + execConfig := container.ExecOptions{ + Cmd: []string{"/bin/sh", "-c", command}, + AttachStdout: true, + AttachStderr: true, + Tty: false, + } + + // Create the exec instance + execID, err := s.dockerCli.ContainerExecCreate(s.ctx, s.ID, execConfig) + if err != nil { + return nil, fmt.Errorf("failed to create exec in container: %w", err) + } + + // Start the exec process + resp, err := s.dockerCli.ContainerExecAttach(s.ctx, execID.ID, container.ExecStartOptions{}) + if err != nil { + return nil, fmt.Errorf("failed to attach to exec in container: %w", err) + } + defer resp.Close() + + // Read the output + var stdout, stderr bytes.Buffer + _, err = stdcopy.StdCopy(&stdout, &stderr, resp.Reader) + if err != nil { + return nil, fmt.Errorf("failed to read exec output: %w", err) + } + + // Inspect the exec process to get the exit code + inspect, err := s.dockerCli.ContainerExecInspect(s.ctx, execID.ID) + if err != nil { + return nil, fmt.Errorf("failed to inspect exec in container: %w", err) + } + + return &CommandResult{ + StdOut: stdout.String(), + StdErr: stderr.String(), + ExitCode: inspect.ExitCode, + }, nil +} + +// WriteFile writes content to a file inside the sandbox's workspace. +func (s *Sandbox) WriteFile(path string, content []byte) error { + // Create a temporary file on the host + tmpfile, err := os.CreateTemp("", "bzzz-write-") + if err != nil { + return fmt.Errorf("failed to create temp file: %w", err) + } + defer os.Remove(tmpfile.Name()) + + if _, err := tmpfile.Write(content); err != nil { + return fmt.Errorf("failed to write to temp file: %w", err) + } + tmpfile.Close() + + // Copy the file into the container + dstPath := filepath.Join(s.Workspace, path) + return s.dockerCli.CopyToContainer(s.ctx, tmpfile.Name(), s.ID, dstPath) +} + +// ReadFile reads the content of a file from the sandbox's workspace. +func (s *Sandbox) ReadFile(path string) ([]byte, error) { + srcPath := filepath.Join(s.Workspace, path) + + // Copy the file from the container + reader, _, err := s.dockerCli.CopyFromContainer(s.ctx, s.ID, srcPath) + if err != nil { + return nil, fmt.Errorf("failed to copy from container: %w", err) + } + defer reader.Close() + + // The result is a tar archive, so we need to extract it + tr := tar.NewReader(reader) + if _, err := tr.Next(); err != nil { + return nil, fmt.Errorf("failed to get tar header: %w", err) + } + + buf := new(bytes.Buffer) + if _, err := io.Copy(buf, tr); err != nil { + return nil, fmt.Errorf("failed to read file content from tar: %w", err) + } + + return buf.Bytes(), nil +} diff --git a/systemd/bzzz-agent.service b/systemd/bzzz-agent.service deleted file mode 100644 index 3ece402b..00000000 --- a/systemd/bzzz-agent.service +++ /dev/null @@ -1,38 +0,0 @@ -[Unit] -Description=Bzzz P2P Task Coordination Agent -Documentation=https://github.com/deepblackcloud/bzzz -After=network-online.target -Wants=network-online.target - -[Service] -Type=simple -User=tony -Group=tony -WorkingDirectory=/home/tony/AI/projects/Bzzz -ExecStart=/home/tony/AI/projects/Bzzz/bzzz -Restart=always -RestartSec=10 - -# Environment variables -Environment=BZZZ_LOG_LEVEL=info -Environment=BZZZ_HIVE_API_URL=https://hive.home.deepblack.cloud -Environment=BZZZ_GITHUB_TOKEN_FILE=/home/tony/AI/secrets/passwords_and_tokens/gh-token - -# Security settings -NoNewPrivileges=true -PrivateTmp=true -ProtectHome=false -ProtectSystem=strict -ReadWritePaths=/home/tony/AI/projects/Bzzz /tmp /home/tony/.config/bzzz - -# Resource limits -LimitNOFILE=65536 -LimitNPROC=4096 - -# Logging -StandardOutput=journal -StandardError=journal -SyslogIdentifier=bzzz-agent - -[Install] -WantedBy=multi-user.target \ No newline at end of file diff --git a/test/antennae_test.go b/test/antennae_test.go index ba38a9b9..13da62e6 100644 --- a/test/antennae_test.go +++ b/test/antennae_test.go @@ -5,8 +5,8 @@ import ( "fmt" "time" - "github.com/deepblackcloud/bzzz/pubsub" - "github.com/deepblackcloud/bzzz/pkg/coordination" + "github.com/anthonyrawlins/bzzz/pubsub" + "github.com/anthonyrawlins/bzzz/pkg/coordination" ) // AntennaeTestSuite runs comprehensive tests for the antennae coordination system diff --git a/test/task_simulator.go b/test/task_simulator.go index 1dcf4adb..a27f48c3 100644 --- a/test/task_simulator.go +++ b/test/task_simulator.go @@ -7,7 +7,7 @@ import ( "math/rand" "time" - "github.com/deepblackcloud/bzzz/pubsub" + "github.com/anthonyrawlins/bzzz/pubsub" ) // TaskSimulator generates realistic task scenarios for testing antennae coordination @@ -287,7 +287,7 @@ func generateMockRepositories() []MockRepository { { Owner: "deepblackcloud", Name: "bzzz", - URL: "https://github.com/deepblackcloud/bzzz", + URL: "https://github.com/anthonyrawlins/bzzz", Dependencies: []string{"hive"}, Tasks: []MockTask{ {