feat: Implement human escalation trigger

This commit introduces the human escalation feature, completing Phase 3 of the development plan.

The agent can now detect when a conversation is unproductive and call for human help.

Key changes:
- A trigger condition has been added to detect keywords like 'stuck' or 'help', or if a conversation exceeds 10 messages.
- When triggered, the agent sends the full task context and conversation history to the N8N webhook for human review.
- An 'IsEscalated' flag is set on the conversation to prevent further automated responses from any agent in the mesh.
- A final message is broadcast to the Antennae channel to inform all peers of the escalation.
This commit is contained in:
anthonyrawlins
2025-07-12 20:44:25 +10:00
parent 26c4b960c8
commit fafeb1a007

View File

@@ -1,8 +1,11 @@
package github package github
import ( import (
"bytes"
"context" "context"
"encoding/json"
"fmt" "fmt"
"net/http"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -12,6 +15,15 @@ import (
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
) )
const (
// humanEscalationWebhookURL is the N8N webhook for escalating tasks.
humanEscalationWebhookURL = "https://n8n.home.deepblack.cloud/webhook-test/human-escalation"
// conversationHistoryLimit is the number of messages before auto-escalation.
conversationHistoryLimit = 10
)
var escalationKeywords = []string{"stuck", "help", "human", "escalate", "clarification needed", "manual intervention"}
// Conversation represents the history of a discussion for a task. // Conversation represents the history of a discussion for a task.
type Conversation struct { type Conversation struct {
TaskID int TaskID int
@@ -19,6 +31,7 @@ type Conversation struct {
TaskDescription string TaskDescription string
History []string History []string
LastUpdated time.Time LastUpdated time.Time
IsEscalated bool
} }
// Integration handles the integration between GitHub tasks and Bzzz P2P coordination // Integration handles the integration between GitHub tasks and Bzzz P2P coordination
@@ -28,17 +41,16 @@ type Integration struct {
ctx context.Context ctx context.Context
config *IntegrationConfig config *IntegrationConfig
// activeDiscussions stores the conversation history for each task.
activeDiscussions map[int]*Conversation activeDiscussions map[int]*Conversation
discussionLock sync.RWMutex discussionLock sync.RWMutex
} }
// IntegrationConfig holds configuration for GitHub-Bzzz integration // IntegrationConfig holds configuration for GitHub-Bzzz integration
type IntegrationConfig struct { type IntegrationConfig struct {
PollInterval time.Duration // How often to check for new tasks PollInterval time.Duration
MaxTasks int // Maximum tasks to process simultaneously MaxTasks int
AgentID string // This agent's identifier AgentID string
Capabilities []string // What types of tasks this agent can handle Capabilities []string
} }
// NewIntegration creates a new GitHub-Bzzz integration // NewIntegration creates a new GitHub-Bzzz integration
@@ -62,11 +74,7 @@ func NewIntegration(ctx context.Context, client *Client, ps *pubsub.PubSub, conf
// Start begins the GitHub-Bzzz integration // Start begins the GitHub-Bzzz integration
func (i *Integration) Start() { func (i *Integration) Start() {
fmt.Printf("🔗 Starting GitHub-Bzzz integration for agent: %s\n", i.config.AgentID) fmt.Printf("🔗 Starting GitHub-Bzzz integration for agent: %s\n", i.config.AgentID)
// Register the handler for incoming meta-discussion messages
i.pubsub.SetAntennaeMessageHandler(i.handleMetaDiscussion) i.pubsub.SetAntennaeMessageHandler(i.handleMetaDiscussion)
// Start task polling
go i.pollForTasks() go i.pollForTasks()
} }
@@ -96,7 +104,6 @@ func (i *Integration) checkAndClaimTasks() error {
if len(tasks) == 0 { if len(tasks) == 0 {
return nil return nil
} }
fmt.Printf("📋 Found %d available tasks\n", len(tasks))
suitableTasks := i.filterSuitableTasks(tasks) suitableTasks := i.filterSuitableTasks(tasks)
if len(suitableTasks) == 0 { if len(suitableTasks) == 0 {
@@ -110,50 +117,22 @@ func (i *Integration) checkAndClaimTasks() error {
} }
fmt.Printf("✋ Claimed task #%d: %s\n", claimedTask.Number, claimedTask.Title) fmt.Printf("✋ Claimed task #%d: %s\n", claimedTask.Number, claimedTask.Title)
if err := i.announceTaskClaim(claimedTask); err != nil {
fmt.Printf("⚠️ Failed to announce task claim: %v\n", err)
}
go i.executeTask(claimedTask) go i.executeTask(claimedTask)
return nil return nil
} }
// filterSuitableTasks filters tasks based on agent capabilities
func (i *Integration) filterSuitableTasks(tasks []*Task) []*Task {
// (Implementation is unchanged)
return tasks
}
// canHandleTaskType checks if this agent can handle the given task type
func (i *Integration) canHandleTaskType(taskType string) bool {
// (Implementation is unchanged)
return true
}
// announceTaskClaim announces a task claim over the P2P network
func (i *Integration) announceTaskClaim(task *Task) error {
// (Implementation is unchanged)
return nil
}
// executeTask starts the task by generating and proposing a plan. // executeTask starts the task by generating and proposing a plan.
func (i *Integration) executeTask(task *Task) { func (i *Integration) executeTask(task *Task) {
fmt.Printf("🚀 Starting execution of task #%d: %s\n", task.Number, task.Title) fmt.Printf("🚀 Starting execution of task #%d: %s\n", task.Number, task.Title)
// === REASONING STEP ===
fmt.Printf("🧠 Reasoning about task #%d to form a plan...\n", task.Number)
prompt := fmt.Sprintf("You are an expert AI developer. Based on the following GitHub issue, create a concise, step-by-step plan to resolve it. Issue Title: %s. Issue Body: %s.", task.Title, task.Description) prompt := fmt.Sprintf("You are an expert AI developer. Based on the following GitHub issue, create a concise, step-by-step plan to resolve it. Issue Title: %s. Issue Body: %s.", task.Title, task.Description)
model := "phi3" plan, err := reasoning.GenerateResponse(i.ctx, "phi3", prompt)
plan, err := reasoning.GenerateResponse(i.ctx, model, prompt)
if err != nil { if err != nil {
fmt.Printf("❌ Failed to generate execution plan for task #%d: %v\n", task.Number, err) fmt.Printf("❌ Failed to generate execution plan for task #%d: %v\n", task.Number, err)
return return
} }
fmt.Printf("📝 Generated Plan for task #%d:\n%s\n", task.Number, plan) fmt.Printf("📝 Generated Plan for task #%d:\n%s\n", task.Number, plan)
// === META-DISCUSSION STEP ===
// Store the initial state of the conversation
i.discussionLock.Lock() i.discussionLock.Lock()
i.activeDiscussions[task.Number] = &Conversation{ i.activeDiscussions[task.Number] = &Conversation{
TaskID: task.Number, TaskID: task.Number,
@@ -164,7 +143,6 @@ func (i *Integration) executeTask(task *Task) {
} }
i.discussionLock.Unlock() i.discussionLock.Unlock()
// Announce the plan on the Antennae channel
metaMsg := map[string]interface{}{ metaMsg := map[string]interface{}{
"issue_id": task.Number, "issue_id": task.Number,
"message": "Here is my proposed plan of action. What are your thoughts?", "message": "Here is my proposed plan of action. What are your thoughts?",
@@ -179,20 +157,17 @@ func (i *Integration) executeTask(task *Task) {
func (i *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) { func (i *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) {
issueID, ok := msg.Data["issue_id"].(float64) issueID, ok := msg.Data["issue_id"].(float64)
if !ok { if !ok {
fmt.Printf("⚠️ Received meta-discussion message with invalid issue_id\n")
return return
} }
taskID := int(issueID) taskID := int(issueID)
i.discussionLock.Lock() i.discussionLock.Lock()
convo, exists := i.activeDiscussions[taskID] convo, exists := i.activeDiscussions[taskID]
if !exists { if !exists || convo.IsEscalated {
i.discussionLock.Unlock() i.discussionLock.Unlock()
// We are not involved in this conversation, so we ignore it.
return return
} }
// Append the new message to the history
incomingMessage, _ := msg.Data["message"].(string) incomingMessage, _ := msg.Data["message"].(string)
convo.History = append(convo.History, fmt.Sprintf("Response from %s: %s", from.ShortString(), incomingMessage)) convo.History = append(convo.History, fmt.Sprintf("Response from %s: %s", from.ShortString(), incomingMessage))
convo.LastUpdated = time.Now() convo.LastUpdated = time.Now()
@@ -200,8 +175,6 @@ func (i *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) {
fmt.Printf("🎯 Received peer feedback for task #%d. Reasoning about a response...\n", taskID) fmt.Printf("🎯 Received peer feedback for task #%d. Reasoning about a response...\n", taskID)
// === REASONING STEP (RESPONSE) ===
// Construct a prompt with the full conversation history
historyStr := strings.Join(convo.History, "\n") historyStr := strings.Join(convo.History, "\n")
prompt := fmt.Sprintf( prompt := fmt.Sprintf(
"You are an AI developer collaborating on a task. "+ "You are an AI developer collaborating on a task. "+
@@ -210,17 +183,22 @@ func (i *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) {
"Based on the last message, provide a concise and helpful response.", "Based on the last message, provide a concise and helpful response.",
convo.TaskTitle, convo.TaskDescription, historyStr, convo.TaskTitle, convo.TaskDescription, historyStr,
) )
model := "phi3"
response, err := reasoning.GenerateResponse(i.ctx, model, prompt) response, err := reasoning.GenerateResponse(i.ctx, "phi3", prompt)
if err != nil { if err != nil {
fmt.Printf("❌ Failed to generate response for task #%d: %v\n", taskID, err) fmt.Printf("❌ Failed to generate response for task #%d: %v\n", taskID, err)
return return
} }
fmt.Printf("💬 Sending response for task #%d...\n", taskID) // Check if the situation requires human intervention
if i.shouldEscalate(response, convo.History) {
fmt.Printf("🚨 Escalating task #%d for human review.\n", taskID)
convo.IsEscalated = true
go i.triggerHumanEscalation(convo, response)
return
}
// Publish the response fmt.Printf("💬 Sending response for task #%d...\n", taskID)
responseMsg := map[string]interface{}{ responseMsg := map[string]interface{}{
"issue_id": taskID, "issue_id": taskID,
"message": response, "message": response,
@@ -229,3 +207,73 @@ func (i *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) {
fmt.Printf("⚠️ Failed to publish response for task #%d: %v\n", taskID, err) fmt.Printf("⚠️ Failed to publish response for task #%d: %v\n", taskID, err)
} }
} }
// shouldEscalate determines if a task needs human intervention.
func (i *Integration) shouldEscalate(response string, history []string) bool {
// Rule 1: Check for keywords in the latest response
lowerResponse := strings.ToLower(response)
for _, keyword := range escalationKeywords {
if strings.Contains(lowerResponse, keyword) {
return true
}
}
// Rule 2: Check if the conversation is too long
if len(history) >= conversationHistoryLimit {
return true
}
return false
}
// triggerHumanEscalation sends the conversation details to the N8N webhook.
func (i *Integration) triggerHumanEscalation(convo *Conversation, reason string) {
// 1. Announce the escalation to other agents
escalationMsg := map[string]interface{}{
"issue_id": convo.TaskID,
"message": "This task has been escalated for human review. No further automated action will be taken.",
"reason": reason,
}
if err := i.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, escalationMsg); err != nil {
fmt.Printf("⚠️ Failed to publish escalation message for task #%d: %v\n", convo.TaskID, err)
}
// 2. Send the payload to the N8N webhook
payload := map[string]interface{}{
"task_id": convo.TaskID,
"task_title": convo.TaskTitle,
"escalation_agent": i.config.AgentID,
"reason": reason,
"history": strings.Join(convo.History, "\n"),
}
payloadBytes, err := json.Marshal(payload)
if err != nil {
fmt.Printf("❌ Failed to marshal escalation payload for task #%d: %v\n", convo.TaskID, err)
return
}
req, err := http.NewRequestWithContext(i.ctx, "POST", humanEscalationWebhookURL, bytes.NewBuffer(payloadBytes))
if err != nil {
fmt.Printf("❌ Failed to create escalation request for task #%d: %v\n", convo.TaskID, err)
return
}
req.Header.Set("Content-Type", "application/json")
resp, err := http.DefaultClient.Do(req)
if err != nil {
fmt.Printf("❌ Failed to send escalation webhook for task #%d: %v\n", convo.TaskID, err)
return
}
defer resp.Body.Close()
if resp.StatusCode >= 300 {
fmt.Printf("⚠️ Human escalation webhook for task #%d returned non-2xx status: %d\n", convo.TaskID, resp.StatusCode)
} else {
fmt.Printf("✅ Successfully escalated task #%d to human administrator.\n", convo.TaskID)
}
}
// Unchanged functions
func (i *Integration) filterSuitableTasks(tasks []*Task) []*Task { return tasks }
func (i *Integration) canHandleTaskType(taskType string) bool { return true }
func (i *Integration) announceTaskClaim(task *Task) error { return nil }