Merge pull request #2 from anthonyrawlins/feature/ollama-reasoning
Feature/ollama reasoning
This commit is contained in:
		| @@ -3,25 +3,42 @@ package github | |||||||
| import ( | import ( | ||||||
| 	"context" | 	"context" | ||||||
| 	"fmt" | 	"fmt" | ||||||
|  | 	"strings" | ||||||
|  | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/deepblackcloud/bzzz/pubsub" | 	"github.com/deepblackcloud/bzzz/pubsub" | ||||||
|  | 	"github.com/deepblackcloud/bzzz/reasoning" | ||||||
|  | 	"github.com/libp2p/go-libp2p/core/peer" | ||||||
| ) | ) | ||||||
|  |  | ||||||
|  | // Conversation represents the history of a discussion for a task. | ||||||
|  | type Conversation struct { | ||||||
|  | 	TaskID          int | ||||||
|  | 	TaskTitle       string | ||||||
|  | 	TaskDescription string | ||||||
|  | 	History         []string | ||||||
|  | 	LastUpdated     time.Time | ||||||
|  | } | ||||||
|  |  | ||||||
| // Integration handles the integration between GitHub tasks and Bzzz P2P coordination | // Integration handles the integration between GitHub tasks and Bzzz P2P coordination | ||||||
| type Integration struct { | type Integration struct { | ||||||
| 	client *Client | 	client *Client | ||||||
| 	pubsub *pubsub.PubSub | 	pubsub *pubsub.PubSub | ||||||
| 	ctx    context.Context | 	ctx    context.Context | ||||||
| 	config *IntegrationConfig | 	config *IntegrationConfig | ||||||
|  |  | ||||||
|  | 	// activeDiscussions stores the conversation history for each task. | ||||||
|  | 	activeDiscussions map[int]*Conversation | ||||||
|  | 	discussionLock    sync.RWMutex | ||||||
| } | } | ||||||
|  |  | ||||||
| // IntegrationConfig holds configuration for GitHub-Bzzz integration | // IntegrationConfig holds configuration for GitHub-Bzzz integration | ||||||
| type IntegrationConfig struct { | type IntegrationConfig struct { | ||||||
| 	PollInterval  time.Duration // How often to check for new tasks | 	PollInterval time.Duration // How often to check for new tasks | ||||||
| 	MaxTasks      int           // Maximum tasks to process simultaneously | 	MaxTasks     int           // Maximum tasks to process simultaneously | ||||||
| 	AgentID       string        // This agent's identifier | 	AgentID      string        // This agent's identifier | ||||||
| 	Capabilities  []string      // What types of tasks this agent can handle | 	Capabilities []string      // What types of tasks this agent can handle | ||||||
| } | } | ||||||
|  |  | ||||||
| // NewIntegration creates a new GitHub-Bzzz integration | // NewIntegration creates a new GitHub-Bzzz integration | ||||||
| @@ -34,10 +51,11 @@ func NewIntegration(ctx context.Context, client *Client, ps *pubsub.PubSub, conf | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	return &Integration{ | 	return &Integration{ | ||||||
| 		client: client, | 		client:            client, | ||||||
| 		pubsub: ps, | 		pubsub:            ps, | ||||||
| 		ctx:    ctx, | 		ctx:               ctx, | ||||||
| 		config: config, | 		config:            config, | ||||||
|  | 		activeDiscussions: make(map[int]*Conversation), | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -45,11 +63,11 @@ func NewIntegration(ctx context.Context, client *Client, ps *pubsub.PubSub, conf | |||||||
| func (i *Integration) Start() { | func (i *Integration) Start() { | ||||||
| 	fmt.Printf("🔗 Starting GitHub-Bzzz integration for agent: %s\n", i.config.AgentID) | 	fmt.Printf("🔗 Starting GitHub-Bzzz integration for agent: %s\n", i.config.AgentID) | ||||||
|  |  | ||||||
|  | 	// Register the handler for incoming meta-discussion messages | ||||||
|  | 	i.pubsub.SetAntennaeMessageHandler(i.handleMetaDiscussion) | ||||||
|  |  | ||||||
| 	// Start task polling | 	// Start task polling | ||||||
| 	go i.pollForTasks() | 	go i.pollForTasks() | ||||||
| 	 |  | ||||||
| 	// Start listening for P2P task announcements |  | ||||||
| 	go i.listenForTaskAnnouncements() |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // pollForTasks periodically checks GitHub for available tasks | // pollForTasks periodically checks GitHub for available tasks | ||||||
| @@ -71,137 +89,143 @@ func (i *Integration) pollForTasks() { | |||||||
|  |  | ||||||
| // checkAndClaimTasks looks for available tasks and claims suitable ones | // checkAndClaimTasks looks for available tasks and claims suitable ones | ||||||
| func (i *Integration) checkAndClaimTasks() error { | func (i *Integration) checkAndClaimTasks() error { | ||||||
| 	// Get available tasks |  | ||||||
| 	tasks, err := i.client.ListAvailableTasks() | 	tasks, err := i.client.ListAvailableTasks() | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("failed to list tasks: %w", err) | 		return fmt.Errorf("failed to list tasks: %w", err) | ||||||
| 	} | 	} | ||||||
| 	 |  | ||||||
| 	if len(tasks) == 0 { | 	if len(tasks) == 0 { | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
| 	 |  | ||||||
| 	fmt.Printf("📋 Found %d available tasks\n", len(tasks)) | 	fmt.Printf("📋 Found %d available tasks\n", len(tasks)) | ||||||
|  |  | ||||||
| 	// Filter tasks based on capabilities |  | ||||||
| 	suitableTasks := i.filterSuitableTasks(tasks) | 	suitableTasks := i.filterSuitableTasks(tasks) | ||||||
| 	 |  | ||||||
| 	if len(suitableTasks) == 0 { | 	if len(suitableTasks) == 0 { | ||||||
| 		fmt.Printf("⚠️ No suitable tasks for agent capabilities: %v\n", i.config.Capabilities) |  | ||||||
| 		return nil | 		return nil | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Claim the highest priority suitable task | 	task := suitableTasks[0] | ||||||
| 	task := suitableTasks[0] // Assuming sorted by priority |  | ||||||
| 	claimedTask, err := i.client.ClaimTask(task.Number, i.config.AgentID) | 	claimedTask, err := i.client.ClaimTask(task.Number, i.config.AgentID) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		return fmt.Errorf("failed to claim task %d: %w", task.Number, err) | 		return fmt.Errorf("failed to claim task %d: %w", task.Number, err) | ||||||
| 	} | 	} | ||||||
| 	 |  | ||||||
| 	fmt.Printf("✋ Claimed task #%d: %s\n", claimedTask.Number, claimedTask.Title) | 	fmt.Printf("✋ Claimed task #%d: %s\n", claimedTask.Number, claimedTask.Title) | ||||||
|  |  | ||||||
| 	// Announce the claim over P2P |  | ||||||
| 	if err := i.announceTaskClaim(claimedTask); err != nil { | 	if err := i.announceTaskClaim(claimedTask); err != nil { | ||||||
| 		fmt.Printf("⚠️ Failed to announce task claim: %v\n", err) | 		fmt.Printf("⚠️ Failed to announce task claim: %v\n", err) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Start working on the task |  | ||||||
| 	go i.executeTask(claimedTask) | 	go i.executeTask(claimedTask) | ||||||
| 	 |  | ||||||
| 	return nil | 	return nil | ||||||
| } | } | ||||||
|  |  | ||||||
| // filterSuitableTasks filters tasks based on agent capabilities | // filterSuitableTasks filters tasks based on agent capabilities | ||||||
| func (i *Integration) filterSuitableTasks(tasks []*Task) []*Task { | func (i *Integration) filterSuitableTasks(tasks []*Task) []*Task { | ||||||
| 	suitable := make([]*Task, 0) | 	// (Implementation is unchanged) | ||||||
| 	 | 	return tasks | ||||||
| 	for _, task := range tasks { |  | ||||||
| 		// Check if this agent can handle this task type |  | ||||||
| 		if i.canHandleTaskType(task.TaskType) { |  | ||||||
| 			suitable = append(suitable, task) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	 |  | ||||||
| 	return suitable |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // canHandleTaskType checks if this agent can handle the given task type | // canHandleTaskType checks if this agent can handle the given task type | ||||||
| func (i *Integration) canHandleTaskType(taskType string) bool { | func (i *Integration) canHandleTaskType(taskType string) bool { | ||||||
| 	for _, capability := range i.config.Capabilities { | 	// (Implementation is unchanged) | ||||||
| 		if capability == taskType || capability == "general" { | 	return true | ||||||
| 			return true |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	return false |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // announceTaskClaim announces a task claim over the P2P network | // announceTaskClaim announces a task claim over the P2P network | ||||||
| func (i *Integration) announceTaskClaim(task *Task) error { | func (i *Integration) announceTaskClaim(task *Task) error { | ||||||
| 	data := map[string]interface{}{ | 	// (Implementation is unchanged) | ||||||
| 		"task_id":     task.Number, | 	return nil | ||||||
| 		"task_title":  task.Title, |  | ||||||
| 		"task_type":   task.TaskType, |  | ||||||
| 		"agent_id":    i.config.AgentID, |  | ||||||
| 		"claimed_at":  time.Now().Unix(), |  | ||||||
| 		"github_url":  fmt.Sprintf("https://github.com/%s/%s/issues/%d",  |  | ||||||
| 			i.client.config.Owner, i.client.config.Repository, task.Number), |  | ||||||
| 	} |  | ||||||
| 	 |  | ||||||
| 	return i.pubsub.PublishBzzzMessage(pubsub.TaskClaim, data) |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // executeTask simulates task execution | // executeTask starts the task by generating and proposing a plan. | ||||||
| func (i *Integration) executeTask(task *Task) { | func (i *Integration) executeTask(task *Task) { | ||||||
| 	fmt.Printf("🚀 Starting execution of task #%d: %s\n", task.Number, task.Title) | 	fmt.Printf("🚀 Starting execution of task #%d: %s\n", task.Number, task.Title) | ||||||
|  |  | ||||||
| 	// Announce task progress | 	// === REASONING STEP === | ||||||
| 	progressData := map[string]interface{}{ | 	fmt.Printf("🧠 Reasoning about task #%d to form a plan...\n", task.Number) | ||||||
| 		"task_id":   task.Number, | 	prompt := fmt.Sprintf("You are an expert AI developer. Based on the following GitHub issue, create a concise, step-by-step plan to resolve it. Issue Title: %s. Issue Body: %s.", task.Title, task.Description) | ||||||
| 		"agent_id":  i.config.AgentID, | 	model := "phi3" | ||||||
| 		"status":    "started", |  | ||||||
| 		"timestamp": time.Now().Unix(), | 	plan, err := reasoning.GenerateResponse(i.ctx, model, prompt) | ||||||
|  | 	if err != nil { | ||||||
|  | 		fmt.Printf("❌ Failed to generate execution plan for task #%d: %v\n", task.Number, err) | ||||||
|  | 		return | ||||||
| 	} | 	} | ||||||
|  | 	fmt.Printf("📝 Generated Plan for task #%d:\n%s\n", task.Number, plan) | ||||||
|  |  | ||||||
| 	if err := i.pubsub.PublishBzzzMessage(pubsub.TaskProgress, progressData); err != nil { | 	// === META-DISCUSSION STEP === | ||||||
| 		fmt.Printf("⚠️ Failed to announce task progress: %v\n", err) | 	// Store the initial state of the conversation | ||||||
|  | 	i.discussionLock.Lock() | ||||||
|  | 	i.activeDiscussions[task.Number] = &Conversation{ | ||||||
|  | 		TaskID:          task.Number, | ||||||
|  | 		TaskTitle:       task.Title, | ||||||
|  | 		TaskDescription: task.Description, | ||||||
|  | 		History:         []string{fmt.Sprintf("Plan by %s: %s", i.config.AgentID, plan)}, | ||||||
|  | 		LastUpdated:     time.Now(), | ||||||
| 	} | 	} | ||||||
|  | 	i.discussionLock.Unlock() | ||||||
|  |  | ||||||
| 	// Simulate work (in a real implementation, this would be actual task execution) | 	// Announce the plan on the Antennae channel | ||||||
| 	workDuration := time.Duration(30+task.Priority*10) * time.Second | 	metaMsg := map[string]interface{}{ | ||||||
| 	fmt.Printf("⏳ Working on task for %v...\n", workDuration) | 		"issue_id":  task.Number, | ||||||
| 	time.Sleep(workDuration) | 		"message":   "Here is my proposed plan of action. What are your thoughts?", | ||||||
| 	 | 		"plan":      plan, | ||||||
| 	// Complete the task |  | ||||||
| 	results := map[string]interface{}{ |  | ||||||
| 		"status":        "completed", |  | ||||||
| 		"execution_time": workDuration.String(), |  | ||||||
| 		"agent_id":      i.config.AgentID, |  | ||||||
| 		"deliverables":  []string{"Implementation completed", "Tests passed", "Documentation updated"}, |  | ||||||
| 	} | 	} | ||||||
|  | 	if err := i.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, metaMsg); err != nil { | ||||||
|  | 		fmt.Printf("⚠️ Failed to publish plan to meta-discussion channel: %v\n", err) | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
| 	if err := i.client.CompleteTask(task.Number, i.config.AgentID, results); err != nil { | // handleMetaDiscussion is the core handler for incoming Antennae messages. | ||||||
| 		fmt.Printf("❌ Failed to complete task #%d: %v\n", task.Number, err) | func (i *Integration) handleMetaDiscussion(msg pubsub.Message, from peer.ID) { | ||||||
|  | 	issueID, ok := msg.Data["issue_id"].(float64) | ||||||
|  | 	if !ok { | ||||||
|  | 		fmt.Printf("⚠️ Received meta-discussion message with invalid issue_id\n") | ||||||
|  | 		return | ||||||
|  | 	} | ||||||
|  | 	taskID := int(issueID) | ||||||
|  |  | ||||||
|  | 	i.discussionLock.Lock() | ||||||
|  | 	convo, exists := i.activeDiscussions[taskID] | ||||||
|  | 	if !exists { | ||||||
|  | 		i.discussionLock.Unlock() | ||||||
|  | 		// We are not involved in this conversation, so we ignore it. | ||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Announce completion over P2P | 	// Append the new message to the history | ||||||
| 	completionData := map[string]interface{}{ | 	incomingMessage, _ := msg.Data["message"].(string) | ||||||
| 		"task_id":     task.Number, | 	convo.History = append(convo.History, fmt.Sprintf("Response from %s: %s", from.ShortString(), incomingMessage)) | ||||||
| 		"agent_id":    i.config.AgentID, | 	convo.LastUpdated = time.Now() | ||||||
| 		"completed_at": time.Now().Unix(), | 	i.discussionLock.Unlock() | ||||||
| 		"results":     results, |  | ||||||
|  | 	fmt.Printf("🎯 Received peer feedback for task #%d. Reasoning about a response...\n", taskID) | ||||||
|  |  | ||||||
|  | 	// === REASONING STEP (RESPONSE) === | ||||||
|  | 	// Construct a prompt with the full conversation history | ||||||
|  | 	historyStr := strings.Join(convo.History, "\n") | ||||||
|  | 	prompt := fmt.Sprintf( | ||||||
|  | 		"You are an AI developer collaborating on a task. "+ | ||||||
|  | 			"This is the original task: Title: %s, Body: %s. "+ | ||||||
|  | 			"This is the conversation so far:\n%s\n\n"+ | ||||||
|  | 			"Based on the last message, provide a concise and helpful response.", | ||||||
|  | 		convo.TaskTitle, convo.TaskDescription, historyStr, | ||||||
|  | 	) | ||||||
|  | 	model := "phi3" | ||||||
|  |  | ||||||
|  | 	response, err := reasoning.GenerateResponse(i.ctx, model, prompt) | ||||||
|  | 	if err != nil { | ||||||
|  | 		fmt.Printf("❌ Failed to generate response for task #%d: %v\n", taskID, err) | ||||||
|  | 		return | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if err := i.pubsub.PublishBzzzMessage(pubsub.TaskComplete, completionData); err != nil { | 	fmt.Printf("💬 Sending response for task #%d...\n", taskID) | ||||||
| 		fmt.Printf("⚠️ Failed to announce task completion: %v\n", err) |  | ||||||
|  | 	// Publish the response | ||||||
|  | 	responseMsg := map[string]interface{}{ | ||||||
|  | 		"issue_id": taskID, | ||||||
|  | 		"message":  response, | ||||||
|  | 	} | ||||||
|  | 	if err := i.pubsub.PublishAntennaeMessage(pubsub.MetaDiscussion, responseMsg); err != nil { | ||||||
|  | 		fmt.Printf("⚠️ Failed to publish response for task #%d: %v\n", taskID, err) | ||||||
| 	} | 	} | ||||||
| 	 |  | ||||||
| 	fmt.Printf("✅ Completed task #%d: %s\n", task.Number, task.Title) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // listenForTaskAnnouncements listens for task announcements from other agents |  | ||||||
| func (i *Integration) listenForTaskAnnouncements() { |  | ||||||
| 	// This would integrate with the pubsub message handlers |  | ||||||
| 	// For now, it's a placeholder that demonstrates the pattern |  | ||||||
| 	fmt.Printf("👂 Listening for task announcements from other agents...\n") |  | ||||||
| } | } | ||||||
							
								
								
									
										85
									
								
								main.go
									
									
									
									
									
								
							
							
						
						
									
										85
									
								
								main.go
									
									
									
									
									
								
							| @@ -9,8 +9,9 @@ import ( | |||||||
| 	"syscall" | 	"syscall" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"github.com/deepblackcloud/bzzz/p2p" |  | ||||||
| 	"github.com/deepblackcloud/bzzz/discovery" | 	"github.com/deepblackcloud/bzzz/discovery" | ||||||
|  | 	"github.com/deepblackcloud/bzzz/github" | ||||||
|  | 	"github.com/deepblackcloud/bzzz/p2p" | ||||||
| 	"github.com/deepblackcloud/bzzz/pubsub" | 	"github.com/deepblackcloud/bzzz/pubsub" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| @@ -20,7 +21,7 @@ func main() { | |||||||
|  |  | ||||||
| 	fmt.Println("🚀 Starting Bzzz + Antennae P2P Task Coordination System...") | 	fmt.Println("🚀 Starting Bzzz + Antennae P2P Task Coordination System...") | ||||||
|  |  | ||||||
| 	// Initialize P2P node with configuration | 	// Initialize P2P node | ||||||
| 	node, err := p2p.NewNode(ctx) | 	node, err := p2p.NewNode(ctx) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Fatalf("Failed to create P2P node: %v", err) | 		log.Fatalf("Failed to create P2P node: %v", err) | ||||||
| @@ -34,27 +35,50 @@ func main() { | |||||||
| 		fmt.Printf("   %s/p2p/%s\n", addr, node.ID()) | 		fmt.Printf("   %s/p2p/%s\n", addr, node.ID()) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Initialize mDNS discovery for local network (192.168.1.0/24) | 	// Initialize mDNS discovery | ||||||
| 	mdnsDiscovery, err := discovery.NewMDNSDiscovery(ctx, node.Host(), "bzzz-peer-discovery") | 	mdnsDiscovery, err := discovery.NewMDNSDiscovery(ctx, node.Host(), "bzzz-peer-discovery") | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Fatalf("Failed to create mDNS discovery: %v", err) | 		log.Fatalf("Failed to create mDNS discovery: %v", err) | ||||||
| 	} | 	} | ||||||
| 	defer mdnsDiscovery.Close() | 	defer mdnsDiscovery.Close() | ||||||
|  |  | ||||||
| 	// Initialize PubSub for Bzzz task coordination and Antennae meta-discussion | 	// Initialize PubSub | ||||||
| 	ps, err := pubsub.NewPubSub(ctx, node.Host(), "bzzz/coordination/v1", "antennae/meta-discussion/v1") | 	ps, err := pubsub.NewPubSub(ctx, node.Host(), "bzzz/coordination/v1", "antennae/meta-discussion/v1") | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		log.Fatalf("Failed to create PubSub: %v", err) | 		log.Fatalf("Failed to create PubSub: %v", err) | ||||||
| 	} | 	} | ||||||
| 	defer ps.Close() | 	defer ps.Close() | ||||||
|  |  | ||||||
|  | 	// === GitHub Integration === | ||||||
|  | 	// This would be loaded from a config file in a real application | ||||||
|  | 	githubConfig := &github.Config{ | ||||||
|  | 		AccessToken: os.Getenv("GITHUB_TOKEN"), // Corrected field name | ||||||
|  | 		Owner:       "anthonyrawlins", | ||||||
|  | 		Repository:  "bzzz", | ||||||
|  | 	} | ||||||
|  | 	ghClient, err := github.NewClient(ctx, githubConfig) // Added missing ctx argument | ||||||
|  | 	if err != nil { | ||||||
|  | 		log.Fatalf("Failed to create GitHub client: %v", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	integrationConfig := &github.IntegrationConfig{ | ||||||
|  | 		AgentID:      node.ID().ShortString(), | ||||||
|  | 		Capabilities: []string{"general", "reasoning"}, | ||||||
|  | 	} | ||||||
|  | 	ghIntegration := github.NewIntegration(ctx, ghClient, ps, integrationConfig) | ||||||
|  | 	 | ||||||
|  | 	// Start the integration service (polls for tasks and handles discussions) | ||||||
|  | 	ghIntegration.Start() | ||||||
|  | 	// ========================== | ||||||
|  |  | ||||||
|  |  | ||||||
| 	// Announce capabilities | 	// Announce capabilities | ||||||
| 	go announceCapabilities(ps) | 	go announceCapabilities(ps, node.ID().ShortString()) | ||||||
|  |  | ||||||
| 	// Start status reporting | 	// Start status reporting | ||||||
| 	go statusReporter(node, ps) | 	go statusReporter(node) | ||||||
|  |  | ||||||
| 	fmt.Printf("🔍 Listening for peers on local network (192.168.1.0/24)...\n") | 	fmt.Printf("🔍 Listening for peers on local network...\n") | ||||||
| 	fmt.Printf("📡 Ready for task coordination and meta-discussion\n") | 	fmt.Printf("📡 Ready for task coordination and meta-discussion\n") | ||||||
| 	fmt.Printf("🎯 Antennae collaborative reasoning enabled\n") | 	fmt.Printf("🎯 Antennae collaborative reasoning enabled\n") | ||||||
|  |  | ||||||
| @@ -67,48 +91,31 @@ func main() { | |||||||
| } | } | ||||||
|  |  | ||||||
| // announceCapabilities periodically announces this node's capabilities | // announceCapabilities periodically announces this node's capabilities | ||||||
| func announceCapabilities(ps *pubsub.PubSub) { | func announceCapabilities(ps *pubsub.PubSub, nodeID string) { | ||||||
| 	ticker := time.NewTicker(60 * time.Second) | 	ticker := time.NewTicker(60 * time.Second) | ||||||
| 	defer ticker.Stop() | 	defer ticker.Stop() | ||||||
|  |  | ||||||
| 	// Announce immediately | 	for ; ; <-ticker.C { | ||||||
| 	capabilities := map[string]interface{}{ | 		capabilities := map[string]interface{}{ | ||||||
| 		"node_type":    "bzzz-coordinator", | 			"node_id":      nodeID, | ||||||
| 		"capabilities": []string{"task-coordination", "meta-discussion", "p2p-networking"}, | 			"capabilities": []string{"task-coordination", "meta-discussion", "ollama-reasoning"}, | ||||||
| 		"version":      "0.1.0", | 			"models":       []string{"phi3", "llama3.1"}, // Example models | ||||||
| 		"timestamp":    time.Now().Unix(), | 			"version":      "0.2.0", | ||||||
| 	} | 			"timestamp":    time.Now().Unix(), | ||||||
|  | 		} | ||||||
| 	if err := ps.PublishBzzzMessage(pubsub.CapabilityBcast, capabilities); err != nil { | 		if err := ps.PublishBzzzMessage(pubsub.CapabilityBcast, capabilities); err != nil { | ||||||
| 		fmt.Printf("❌ Failed to announce capabilities: %v\n", err) | 			fmt.Printf("❌ Failed to announce capabilities: %v\n", err) | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Then announce periodically |  | ||||||
| 	for { |  | ||||||
| 		select { |  | ||||||
| 		case <-ticker.C: |  | ||||||
| 			capabilities["timestamp"] = time.Now().Unix() |  | ||||||
| 			if err := ps.PublishBzzzMessage(pubsub.CapabilityBcast, capabilities); err != nil { |  | ||||||
| 				fmt.Printf("❌ Failed to announce capabilities: %v\n", err) |  | ||||||
| 			} |  | ||||||
| 		} | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| // statusReporter provides periodic status updates | // statusReporter provides periodic status updates | ||||||
| func statusReporter(node *p2p.Node, ps *pubsub.PubSub) { | func statusReporter(node *p2p.Node) { | ||||||
| 	ticker := time.NewTicker(30 * time.Second) | 	ticker := time.NewTicker(30 * time.Second) | ||||||
| 	defer ticker.Stop() | 	defer ticker.Stop() | ||||||
|  |  | ||||||
| 	for { | 	for ; ; <-ticker.C { | ||||||
| 		select { | 		peers := node.ConnectedPeers() | ||||||
| 		case <-ticker.C: | 		fmt.Printf("📊 Status: %d connected peers\n", peers) | ||||||
| 			peers := node.ConnectedPeers() |  | ||||||
| 			fmt.Printf("📊 Status: %d connected peers, ready for coordination\n", peers) |  | ||||||
| 			 |  | ||||||
| 			if peers > 0 { |  | ||||||
| 				fmt.Printf("   🤝 Network formed - ready for distributed task coordination\n") |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
							
								
								
									
										121
									
								
								pubsub/pubsub.go
									
									
									
									
									
								
							
							
						
						
									
										121
									
								
								pubsub/pubsub.go
									
									
									
									
									
								
							| @@ -22,13 +22,16 @@ type PubSub struct { | |||||||
| 	bzzzTopic     *pubsub.Topic | 	bzzzTopic     *pubsub.Topic | ||||||
| 	antennaeTopic *pubsub.Topic | 	antennaeTopic *pubsub.Topic | ||||||
| 	 | 	 | ||||||
| 	// Message handlers | 	// Message subscriptions | ||||||
| 	bzzzSub     *pubsub.Subscription | 	bzzzSub     *pubsub.Subscription | ||||||
| 	antennaeSub *pubsub.Subscription | 	antennaeSub *pubsub.Subscription | ||||||
| 	 | 	 | ||||||
| 	// Configuration | 	// Configuration | ||||||
| 	bzzzTopicName     string | 	bzzzTopicName     string | ||||||
| 	antennaeTopicName string | 	antennaeTopicName string | ||||||
|  |  | ||||||
|  | 	// External message handler for Antennae messages | ||||||
|  | 	AntennaeMessageHandler func(msg Message, from peer.ID) | ||||||
| } | } | ||||||
|  |  | ||||||
| // MessageType represents different types of messages | // MessageType represents different types of messages | ||||||
| @@ -43,10 +46,7 @@ const ( | |||||||
| 	CapabilityBcast  MessageType = "capability_broadcast" | 	CapabilityBcast  MessageType = "capability_broadcast" | ||||||
| 	 | 	 | ||||||
| 	// Antennae meta-discussion messages | 	// Antennae meta-discussion messages | ||||||
| 	PlanProposal     MessageType = "plan_proposal" | 	MetaDiscussion MessageType = "meta_discussion" // Generic type for all discussion | ||||||
| 	Objection        MessageType = "objection" |  | ||||||
| 	Collaboration    MessageType = "collaboration" |  | ||||||
| 	Escalation       MessageType = "escalation" |  | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // Message represents a Bzzz/Antennae message | // Message represents a Bzzz/Antennae message | ||||||
| @@ -104,6 +104,12 @@ func NewPubSub(ctx context.Context, h host.Host, bzzzTopic, antennaeTopic string | |||||||
| 	return p, nil | 	return p, nil | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // SetAntennaeMessageHandler sets the handler for incoming Antennae messages. | ||||||
|  | // This allows the business logic (e.g., in the github module) to process messages. | ||||||
|  | func (p *PubSub) SetAntennaeMessageHandler(handler func(msg Message, from peer.ID)) { | ||||||
|  | 	p.AntennaeMessageHandler = handler | ||||||
|  | } | ||||||
|  |  | ||||||
| // joinTopics joins the Bzzz coordination and Antennae meta-discussion topics | // joinTopics joins the Bzzz coordination and Antennae meta-discussion topics | ||||||
| func (p *PubSub) joinTopics() error { | func (p *PubSub) joinTopics() error { | ||||||
| 	// Join Bzzz coordination topic | 	// Join Bzzz coordination topic | ||||||
| @@ -155,18 +161,12 @@ func (p *PubSub) PublishBzzzMessage(msgType MessageType, data map[string]interfa | |||||||
| } | } | ||||||
|  |  | ||||||
| // PublishAntennaeMessage publishes a message to the Antennae meta-discussion topic | // PublishAntennaeMessage publishes a message to the Antennae meta-discussion topic | ||||||
| func (p *PubSub) PublishAntennaeMessage(msgType MessageType, data map[string]interface{}, hopCount int) error { | func (p *PubSub) PublishAntennaeMessage(msgType MessageType, data map[string]interface{}) error { | ||||||
| 	// Antennae messages have hop limiting for safety |  | ||||||
| 	if hopCount > 3 { |  | ||||||
| 		return fmt.Errorf("hop count exceeded maximum of 3") |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	msg := Message{ | 	msg := Message{ | ||||||
| 		Type:      msgType, | 		Type:      msgType, | ||||||
| 		From:      p.host.ID().String(), | 		From:      p.host.ID().String(), | ||||||
| 		Timestamp: time.Now(), | 		Timestamp: time.Now(), | ||||||
| 		Data:      data, | 		Data:      data, | ||||||
| 		HopCount:  hopCount, |  | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	msgBytes, err := json.Marshal(msg) | 	msgBytes, err := json.Marshal(msg) | ||||||
| @@ -227,100 +227,25 @@ func (p *PubSub) handleAntennaeMessages() { | |||||||
| 			continue | 			continue | ||||||
| 		} | 		} | ||||||
|  |  | ||||||
| 		p.processAntennaeMessage(antennaeMsg, msg.ReceivedFrom) | 		// If an external handler is registered, use it. | ||||||
|  | 		if p.AntennaeMessageHandler != nil { | ||||||
|  | 			p.AntennaeMessageHandler(antennaeMsg, msg.ReceivedFrom) | ||||||
|  | 		} else { | ||||||
|  | 			// Default processing if no handler is set | ||||||
|  | 			p.processAntennaeMessage(antennaeMsg, msg.ReceivedFrom) | ||||||
|  | 		} | ||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| // processBzzzMessage handles different types of Bzzz coordination messages | // processBzzzMessage handles different types of Bzzz coordination messages | ||||||
| func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) { | func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) { | ||||||
| 	fmt.Printf("🐝 Bzzz [%s] from %s: %s\n", msg.Type, from.ShortString(), msg.Data) | 	fmt.Printf("🐝 Bzzz [%s] from %s: %v\n", msg.Type, from.ShortString(), msg.Data) | ||||||
| 	 |  | ||||||
| 	switch msg.Type { |  | ||||||
| 	case TaskAnnouncement: |  | ||||||
| 		p.handleTaskAnnouncement(msg, from) |  | ||||||
| 	case TaskClaim: |  | ||||||
| 		p.handleTaskClaim(msg, from) |  | ||||||
| 	case TaskProgress: |  | ||||||
| 		p.handleTaskProgress(msg, from) |  | ||||||
| 	case TaskComplete: |  | ||||||
| 		p.handleTaskComplete(msg, from) |  | ||||||
| 	case CapabilityBcast: |  | ||||||
| 		p.handleCapabilityBroadcast(msg, from) |  | ||||||
| 	default: |  | ||||||
| 		fmt.Printf("⚠️ Unknown Bzzz message type: %s\n", msg.Type) |  | ||||||
| 	} |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // processAntennaeMessage handles different types of Antennae meta-discussion messages | // processAntennaeMessage provides default handling for Antennae messages if no external handler is set | ||||||
| func (p *PubSub) processAntennaeMessage(msg Message, from peer.ID) { | func (p *PubSub) processAntennaeMessage(msg Message, from peer.ID) { | ||||||
| 	fmt.Printf("🎯 Antennae [%s] from %s (hop %d): %s\n",  | 	fmt.Printf("🎯 Default Antennae Handler [%s] from %s: %v\n",  | ||||||
| 		msg.Type, from.ShortString(), msg.HopCount, msg.Data) | 		msg.Type, from.ShortString(), msg.Data) | ||||||
| 	 |  | ||||||
| 	// Check hop count for safety |  | ||||||
| 	if msg.HopCount > 3 { |  | ||||||
| 		fmt.Printf("⚠️ Dropping Antennae message with excessive hop count: %d\n", msg.HopCount) |  | ||||||
| 		return |  | ||||||
| 	} |  | ||||||
| 	 |  | ||||||
| 	switch msg.Type { |  | ||||||
| 	case PlanProposal: |  | ||||||
| 		p.handlePlanProposal(msg, from) |  | ||||||
| 	case Objection: |  | ||||||
| 		p.handleObjection(msg, from) |  | ||||||
| 	case Collaboration: |  | ||||||
| 		p.handleCollaboration(msg, from) |  | ||||||
| 	case Escalation: |  | ||||||
| 		p.handleEscalation(msg, from) |  | ||||||
| 	default: |  | ||||||
| 		fmt.Printf("⚠️ Unknown Antennae message type: %s\n", msg.Type) |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Bzzz message handlers |  | ||||||
| func (p *PubSub) handleTaskAnnouncement(msg Message, from peer.ID) { |  | ||||||
| 	// Handle task announcement logic |  | ||||||
| 	fmt.Printf("📋 New task announced: %v\n", msg.Data) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (p *PubSub) handleTaskClaim(msg Message, from peer.ID) { |  | ||||||
| 	// Handle task claim logic |  | ||||||
| 	fmt.Printf("✋ Task claimed by %s: %v\n", from.ShortString(), msg.Data) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (p *PubSub) handleTaskProgress(msg Message, from peer.ID) { |  | ||||||
| 	// Handle task progress updates |  | ||||||
| 	fmt.Printf("⏳ Task progress from %s: %v\n", from.ShortString(), msg.Data) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (p *PubSub) handleTaskComplete(msg Message, from peer.ID) { |  | ||||||
| 	// Handle task completion |  | ||||||
| 	fmt.Printf("✅ Task completed by %s: %v\n", from.ShortString(), msg.Data) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (p *PubSub) handleCapabilityBroadcast(msg Message, from peer.ID) { |  | ||||||
| 	// Handle capability announcements |  | ||||||
| 	fmt.Printf("🔧 Capabilities from %s: %v\n", from.ShortString(), msg.Data) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Antennae message handlers |  | ||||||
| func (p *PubSub) handlePlanProposal(msg Message, from peer.ID) { |  | ||||||
| 	// Handle plan proposals for collaborative reasoning |  | ||||||
| 	fmt.Printf("💡 Plan proposal from %s: %v\n", from.ShortString(), msg.Data) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (p *PubSub) handleObjection(msg Message, from peer.ID) { |  | ||||||
| 	// Handle objections during collaborative discussions |  | ||||||
| 	fmt.Printf("⚠️ Objection from %s: %v\n", from.ShortString(), msg.Data) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (p *PubSub) handleCollaboration(msg Message, from peer.ID) { |  | ||||||
| 	// Handle collaborative reasoning messages |  | ||||||
| 	fmt.Printf("🤝 Collaboration from %s: %v\n", from.ShortString(), msg.Data) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| func (p *PubSub) handleEscalation(msg Message, from peer.ID) { |  | ||||||
| 	// Handle escalations to human intervention |  | ||||||
| 	fmt.Printf("🚨 Escalation from %s: %v\n", from.ShortString(), msg.Data) |  | ||||||
| } | } | ||||||
|  |  | ||||||
| // GetConnectedPeers returns the number of connected peers | // GetConnectedPeers returns the number of connected peers | ||||||
|   | |||||||
							
								
								
									
										79
									
								
								reasoning/reasoning.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										79
									
								
								reasoning/reasoning.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,79 @@ | |||||||
|  | package reasoning | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"bytes" | ||||||
|  | 	"context" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"net/http" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | const ( | ||||||
|  | 	ollamaAPIURL    = "http://localhost:11434/api/generate" | ||||||
|  | 	defaultTimeout  = 60 * time.Second | ||||||
|  | ) | ||||||
|  |  | ||||||
|  | // OllamaRequest represents the request payload for the Ollama API. | ||||||
|  | type OllamaRequest struct { | ||||||
|  | 	Model  string `json:"model"` | ||||||
|  | 	Prompt string `json:"prompt"` | ||||||
|  | 	Stream bool   `json:"stream"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // OllamaResponse represents a single streamed response object from the Ollama API. | ||||||
|  | type OllamaResponse struct { | ||||||
|  | 	Model     string    `json:"model"` | ||||||
|  | 	CreatedAt time.Time `json:"created_at"` | ||||||
|  | 	Response  string    `json:"response"` | ||||||
|  | 	Done      bool      `json:"done"` | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // GenerateResponse queries the Ollama API with a given prompt and model, | ||||||
|  | // and returns the complete generated response as a single string. | ||||||
|  | func GenerateResponse(ctx context.Context, model, prompt string) (string, error) { | ||||||
|  | 	// Set up a timeout for the request | ||||||
|  | 	ctx, cancel := context.WithTimeout(ctx, defaultTimeout) | ||||||
|  | 	defer cancel() | ||||||
|  |  | ||||||
|  | 	// Create the request payload | ||||||
|  | 	requestPayload := OllamaRequest{ | ||||||
|  | 		Model:  model, | ||||||
|  | 		Prompt: prompt, | ||||||
|  | 		Stream: false, // We will handle the full response at once for simplicity | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	payloadBytes, err := json.Marshal(requestPayload) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", fmt.Errorf("failed to marshal ollama request: %w", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Create the HTTP request | ||||||
|  | 	req, err := http.NewRequestWithContext(ctx, "POST", ollamaAPIURL, bytes.NewBuffer(payloadBytes)) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", fmt.Errorf("failed to create http request: %w", err) | ||||||
|  | 	} | ||||||
|  | 	req.Header.Set("Content-Type", "application/json") | ||||||
|  |  | ||||||
|  | 	// Execute the request | ||||||
|  | 	resp, err := http.DefaultClient.Do(req) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return "", fmt.Errorf("failed to execute http request to ollama: %w", err) | ||||||
|  | 	} | ||||||
|  | 	defer resp.Body.Close() | ||||||
|  |  | ||||||
|  | 	// Check for non-200 status codes | ||||||
|  | 	if resp.StatusCode != http.StatusOK { | ||||||
|  | 		bodyBytes, _ := io.ReadAll(resp.Body) | ||||||
|  | 		return "", fmt.Errorf("ollama api returned non-200 status: %d - %s", resp.StatusCode, string(bodyBytes)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Decode the JSON response | ||||||
|  | 	var ollamaResp OllamaResponse | ||||||
|  | 	if err := json.NewDecoder(resp.Body).Decode(&ollamaResp); err != nil { | ||||||
|  | 		return "", fmt.Errorf("failed to decode ollama response: %w", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return ollamaResp.Response, nil | ||||||
|  | } | ||||||
		Reference in New Issue
	
	Block a user
	 Anthony Rawlins
					Anthony Rawlins