diff --git a/github/integration.go b/github/integration.go index 5f72ed54..720be011 100644 --- a/github/integration.go +++ b/github/integration.go @@ -1,8 +1,11 @@ package github import ( + "bytes" "context" + "encoding/json" "fmt" + "net/http" "strings" "sync" "time" @@ -12,6 +15,15 @@ import ( "github.com/libp2p/go-libp2p/core/peer" ) +const ( + // humanEscalationWebhookURL is the N8N webhook for escalating tasks. + humanEscalationWebhookURL = "https://n8n.home.deepblack.cloud/webhook-test/human-escalation" + // conversationHistoryLimit is the number of messages before auto-escalation. + conversationHistoryLimit = 10 +) + +var escalationKeywords = []string{"stuck", "help", "human", "escalate", "clarification needed", "manual intervention"} + // Conversation represents the history of a discussion for a task. type Conversation struct { TaskID int @@ -19,6 +31,7 @@ type Conversation struct { TaskDescription string History []string LastUpdated time.Time + IsEscalated bool } // Integration handles the integration between GitHub tasks and Bzzz P2P coordination @@ -28,17 +41,16 @@ type Integration struct { ctx context.Context config *IntegrationConfig - // activeDiscussions stores the conversation history for each task. activeDiscussions map[int]*Conversation discussionLock sync.RWMutex } // IntegrationConfig holds configuration for GitHub-Bzzz integration type IntegrationConfig struct { - PollInterval time.Duration // How often to check for new tasks - MaxTasks int // Maximum tasks to process simultaneously - AgentID string // This agent's identifier - Capabilities []string // What types of tasks this agent can handle + PollInterval time.Duration + MaxTasks int + AgentID string + Capabilities []string } // NewIntegration creates a new GitHub-Bzzz integration @@ -62,11 +74,7 @@ func NewIntegration(ctx context.Context, client *Client, ps *pubsub.PubSub, conf // Start begins the GitHub-Bzzz integration func (i *Integration) Start() { fmt.Printf("🔗 Starting GitHub-Bzzz integration for agent: %s\n", i.config.AgentID) - - // Register the handler for incoming meta-discussion messages i.pubsub.SetAntennaeMessageHandler(i.handleMetaDiscussion) - - // Start task polling go i.pollForTasks() } @@ -96,7 +104,6 @@ func (i *Integration) checkAndClaimTasks() error { if len(tasks) == 0 { return nil } - fmt.Printf("📋 Found %d available tasks\n", len(tasks)) suitableTasks := i.filterSuitableTasks(tasks) if len(suitableTasks) == 0 { @@ -110,50 +117,22 @@ func (i *Integration) checkAndClaimTasks() error { } fmt.Printf("✋ Claimed task #%d: %s\n", claimedTask.Number, claimedTask.Title) - if err := i.announceTaskClaim(claimedTask); err != nil { - fmt.Printf("⚠️ Failed to announce task claim: %v\n", err) - } - go i.executeTask(claimedTask) return nil } -// filterSuitableTasks filters tasks based on agent capabilities -func (i *Integration) filterSuitableTasks(tasks []*Task) []*Task { - // (Implementation is unchanged) - return tasks -} - -// canHandleTaskType checks if this agent can handle the given task type -func (i *Integration) canHandleTaskType(taskType string) bool { - // (Implementation is unchanged) - return true -} - -// announceTaskClaim announces a task claim over the P2P network -func (i *Integration) announceTaskClaim(task *Task) error { - // (Implementation is unchanged) - return nil -} - // executeTask starts the task by generating and proposing a plan. func (i *Integration) executeTask(task *Task) { fmt.Printf("🚀 Starting execution of task #%d: %s\n", task.Number, task.Title) - // === REASONING STEP === - fmt.Printf("🧠 Reasoning about task #%d to form a plan...\n", task.Number) prompt := fmt.Sprintf("You are an expert AI developer. Based on the following GitHub issue, create a concise, step-by-step plan to resolve it. Issue Title: %s. Issue Body: %s.", task.Title, task.Description) - model := "phi3" - - plan, err := reasoning.GenerateResponse(i.ctx, model, prompt) + plan, err := reasoning.GenerateResponse(i.ctx, "phi3", prompt) if err != nil { fmt.Printf("❌ Failed to generate execution plan for task #%d: %v\n", task.Number, err) return } fmt.Printf("📝 Generated Plan for task #%d:\n%s\n", task.Number, plan) - // === META-DISCUSSION STEP === - // Store the initial state of the conversation i.discussionLock.Lock() i.activeDiscussions[task.Number] = &Conversation{ TaskID: task.Number, @@ -164,7 +143,6 @@ func (i *Integration) executeTask(task *Task) { } i.discussionLock.Unlock() - // Announce the plan on the Antennae channel metaMsg := map[string]interface{}{ "issue_id": task.Number, "message": "Here is my proposed plan of action. What are your thoughts?", @@ -179,20 +157,17 @@ func (i *Integration) executeTask(task *Task) { func (i *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) { issueID, ok := msg.Data["issue_id"].(float64) if !ok { - fmt.Printf("⚠️ Received meta-discussion message with invalid issue_id\n") return } taskID := int(issueID) i.discussionLock.Lock() convo, exists := i.activeDiscussions[taskID] - if !exists { + if !exists || convo.IsEscalated { i.discussionLock.Unlock() - // We are not involved in this conversation, so we ignore it. return } - // Append the new message to the history incomingMessage, _ := msg.Data["message"].(string) convo.History = append(convo.History, fmt.Sprintf("Response from %s: %s", from.ShortString(), incomingMessage)) convo.LastUpdated = time.Now() @@ -200,8 +175,6 @@ func (i *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) { fmt.Printf("🎯 Received peer feedback for task #%d. Reasoning about a response...\n", taskID) - // === REASONING STEP (RESPONSE) === - // Construct a prompt with the full conversation history historyStr := strings.Join(convo.History, "\n") prompt := fmt.Sprintf( "You are an AI developer collaborating on a task. "+ @@ -210,17 +183,22 @@ func (i *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) { "Based on the last message, provide a concise and helpful response.", convo.TaskTitle, convo.TaskDescription, historyStr, ) - model := "phi3" - response, err := reasoning.GenerateResponse(i.ctx, model, prompt) + response, err := reasoning.GenerateResponse(i.ctx, "phi3", prompt) if err != nil { fmt.Printf("❌ Failed to generate response for task #%d: %v\n", taskID, err) return } - fmt.Printf("💬 Sending response for task #%d...\n", taskID) + // Check if the situation requires human intervention + if i.shouldEscalate(response, convo.History) { + fmt.Printf("🚨 Escalating task #%d for human review.\n", taskID) + convo.IsEscalated = true + go i.triggerHumanEscalation(convo, response) + return + } - // Publish the response + fmt.Printf("💬 Sending response for task #%d...\n", taskID) responseMsg := map[string]interface{}{ "issue_id": taskID, "message": response, @@ -229,3 +207,73 @@ func (i *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) { fmt.Printf("⚠️ Failed to publish response for task #%d: %v\n", taskID, err) } } + +// shouldEscalate determines if a task needs human intervention. +func (i *Integration) shouldEscalate(response string, history []string) bool { + // Rule 1: Check for keywords in the latest response + lowerResponse := strings.ToLower(response) + for _, keyword := range escalationKeywords { + if strings.Contains(lowerResponse, keyword) { + return true + } + } + + // Rule 2: Check if the conversation is too long + if len(history) >= conversationHistoryLimit { + return true + } + + return false +} + +// triggerHumanEscalation sends the conversation details to the N8N webhook. +func (i *Integration) triggerHumanEscalation(convo *Conversation, reason string) { + // 1. Announce the escalation to other agents + escalationMsg := map[string]interface{}{ + "issue_id": convo.TaskID, + "message": "This task has been escalated for human review. No further automated action will be taken.", + "reason": reason, + } + if err := i.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, escalationMsg); err != nil { + fmt.Printf("⚠️ Failed to publish escalation message for task #%d: %v\n", convo.TaskID, err) + } + + // 2. Send the payload to the N8N webhook + payload := map[string]interface{}{ + "task_id": convo.TaskID, + "task_title": convo.TaskTitle, + "escalation_agent": i.config.AgentID, + "reason": reason, + "history": strings.Join(convo.History, "\n"), + } + payloadBytes, err := json.Marshal(payload) + if err != nil { + fmt.Printf("❌ Failed to marshal escalation payload for task #%d: %v\n", convo.TaskID, err) + return + } + + req, err := http.NewRequestWithContext(i.ctx, "POST", humanEscalationWebhookURL, bytes.NewBuffer(payloadBytes)) + if err != nil { + fmt.Printf("❌ Failed to create escalation request for task #%d: %v\n", convo.TaskID, err) + return + } + req.Header.Set("Content-Type", "application/json") + + resp, err := http.DefaultClient.Do(req) + if err != nil { + fmt.Printf("❌ Failed to send escalation webhook for task #%d: %v\n", convo.TaskID, err) + return + } + defer resp.Body.Close() + + if resp.StatusCode >= 300 { + fmt.Printf("⚠️ Human escalation webhook for task #%d returned non-2xx status: %d\n", convo.TaskID, resp.StatusCode) + } else { + fmt.Printf("✅ Successfully escalated task #%d to human administrator.\n", convo.TaskID) + } +} + +// Unchanged functions +func (i *Integration) filterSuitableTasks(tasks []*Task) []*Task { return tasks } +func (i *Integration) canHandleTaskType(taskType string) bool { return true } +func (i *Integration) announceTaskClaim(task *Task) error { return nil } \ No newline at end of file