Fix Go module imports and add dynamic Ollama model selection with N8N integration
- Fixed module path from github.com/deepblackcloud/bzzz to github.com/anthonyrawlins/bzzz - Added dynamic Ollama model detection via /api/tags endpoint - Implemented intelligent model selection through N8N webhook integration - Added BZZZ_MODEL_SELECTION_WEBHOOK environment variable support - Fixed GitHub assignee issue by using valid username instead of peer ID - Added comprehensive model fallback mechanisms - Updated all import statements across the codebase - Removed duplicate systemd service file - Added sandbox execution environment and type definitions 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
143
pubsub/pubsub.go
143
pubsub/pubsub.go
@@ -4,6 +4,7 @@ import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/libp2p/go-libp2p/core/host"
|
||||
@@ -26,6 +27,12 @@ type PubSub struct {
|
||||
bzzzSub *pubsub.Subscription
|
||||
antennaeSub *pubsub.Subscription
|
||||
|
||||
// Dynamic topic management
|
||||
dynamicTopics map[string]*pubsub.Topic
|
||||
dynamicTopicsMux sync.RWMutex
|
||||
dynamicSubs map[string]*pubsub.Subscription
|
||||
dynamicSubsMux sync.RWMutex
|
||||
|
||||
// Configuration
|
||||
bzzzTopicName string
|
||||
antennaeTopicName string
|
||||
@@ -48,6 +55,8 @@ const (
|
||||
|
||||
// Antennae meta-discussion messages
|
||||
MetaDiscussion MessageType = "meta_discussion" // Generic type for all discussion
|
||||
TaskHelpRequest MessageType = "task_help_request" // Request for assistance
|
||||
TaskHelpResponse MessageType = "task_help_response" // Response to a help request
|
||||
CoordinationRequest MessageType = "coordination_request" // Request for coordination
|
||||
CoordinationComplete MessageType = "coordination_complete" // Coordination session completed
|
||||
DependencyAlert MessageType = "dependency_alert" // Dependency detected
|
||||
@@ -93,10 +102,12 @@ func NewPubSub(ctx context.Context, h host.Host, bzzzTopic, antennaeTopic string
|
||||
cancel: cancel,
|
||||
bzzzTopicName: bzzzTopic,
|
||||
antennaeTopicName: antennaeTopic,
|
||||
dynamicTopics: make(map[string]*pubsub.Topic),
|
||||
dynamicSubs: make(map[string]*pubsub.Subscription),
|
||||
}
|
||||
|
||||
// Join topics
|
||||
if err := p.joinTopics(); err != nil {
|
||||
// Join static topics
|
||||
if err := p.joinStaticTopics(); err != nil {
|
||||
cancel()
|
||||
return nil, err
|
||||
}
|
||||
@@ -110,13 +121,12 @@ func NewPubSub(ctx context.Context, h host.Host, bzzzTopic, antennaeTopic string
|
||||
}
|
||||
|
||||
// SetAntennaeMessageHandler sets the handler for incoming Antennae messages.
|
||||
// This allows the business logic (e.g., in the github module) to process messages.
|
||||
func (p *PubSub) SetAntennaeMessageHandler(handler func(msg Message, from peer.ID)) {
|
||||
p.AntennaeMessageHandler = handler
|
||||
}
|
||||
|
||||
// joinTopics joins the Bzzz coordination and Antennae meta-discussion topics
|
||||
func (p *PubSub) joinTopics() error {
|
||||
// joinStaticTopics joins the main Bzzz and Antennae topics
|
||||
func (p *PubSub) joinStaticTopics() error {
|
||||
// Join Bzzz coordination topic
|
||||
bzzzTopic, err := p.ps.Join(p.bzzzTopicName)
|
||||
if err != nil {
|
||||
@@ -124,7 +134,6 @@ func (p *PubSub) joinTopics() error {
|
||||
}
|
||||
p.bzzzTopic = bzzzTopic
|
||||
|
||||
// Subscribe to Bzzz messages
|
||||
bzzzSub, err := bzzzTopic.Subscribe()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to subscribe to Bzzz topic: %w", err)
|
||||
@@ -138,7 +147,6 @@ func (p *PubSub) joinTopics() error {
|
||||
}
|
||||
p.antennaeTopic = antennaeTopic
|
||||
|
||||
// Subscribe to Antennae messages
|
||||
antennaeSub, err := antennaeTopic.Subscribe()
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to subscribe to Antennae topic: %w", err)
|
||||
@@ -148,6 +156,83 @@ func (p *PubSub) joinTopics() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// JoinDynamicTopic joins a new topic for a specific task
|
||||
func (p *PubSub) JoinDynamicTopic(topicName string) error {
|
||||
p.dynamicTopicsMux.Lock()
|
||||
defer p.dynamicTopicsMux.Unlock()
|
||||
p.dynamicSubsMux.Lock()
|
||||
defer p.dynamicSubsMux.Unlock()
|
||||
|
||||
if _, exists := p.dynamicTopics[topicName]; exists {
|
||||
return nil // Already joined
|
||||
}
|
||||
|
||||
topic, err := p.ps.Join(topicName)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to join dynamic topic %s: %w", topicName, err)
|
||||
}
|
||||
|
||||
sub, err := topic.Subscribe()
|
||||
if err != nil {
|
||||
topic.Close()
|
||||
return fmt.Errorf("failed to subscribe to dynamic topic %s: %w", topicName, err)
|
||||
}
|
||||
|
||||
p.dynamicTopics[topicName] = topic
|
||||
p.dynamicSubs[topicName] = sub
|
||||
|
||||
// Start a handler for this new subscription
|
||||
go p.handleDynamicMessages(sub)
|
||||
|
||||
fmt.Printf("✅ Joined dynamic topic: %s\n", topicName)
|
||||
return nil
|
||||
}
|
||||
|
||||
// LeaveDynamicTopic leaves a specific task topic
|
||||
func (p *PubSub) LeaveDynamicTopic(topicName string) {
|
||||
p.dynamicTopicsMux.Lock()
|
||||
defer p.dynamicTopicsMux.Unlock()
|
||||
p.dynamicSubsMux.Lock()
|
||||
defer p.dynamicSubsMux.Unlock()
|
||||
|
||||
if sub, exists := p.dynamicSubs[topicName]; exists {
|
||||
sub.Cancel()
|
||||
delete(p.dynamicSubs, topicName)
|
||||
}
|
||||
|
||||
if topic, exists := p.dynamicTopics[topicName]; exists {
|
||||
topic.Close()
|
||||
delete(p.dynamicTopics, topicName)
|
||||
}
|
||||
|
||||
fmt.Printf("🗑️ Left dynamic topic: %s\n", topicName)
|
||||
}
|
||||
|
||||
// PublishToDynamicTopic publishes a message to a specific dynamic topic
|
||||
func (p *PubSub) PublishToDynamicTopic(topicName string, msgType MessageType, data map[string]interface{}) error {
|
||||
p.dynamicTopicsMux.RLock()
|
||||
topic, exists := p.dynamicTopics[topicName]
|
||||
p.dynamicTopicsMux.RUnlock()
|
||||
|
||||
if !exists {
|
||||
return fmt.Errorf("not subscribed to dynamic topic: %s", topicName)
|
||||
}
|
||||
|
||||
msg := Message{
|
||||
Type: msgType,
|
||||
From: p.host.ID().String(),
|
||||
Timestamp: time.Now(),
|
||||
Data: data,
|
||||
}
|
||||
|
||||
msgBytes, err := json.Marshal(msg)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to marshal message for dynamic topic: %w", err)
|
||||
}
|
||||
|
||||
return topic.Publish(p.ctx, msgBytes)
|
||||
}
|
||||
|
||||
// PublishBzzzMessage publishes a message to the Bzzz coordination topic
|
||||
func (p *PubSub) PublishBzzzMessage(msgType MessageType, data map[string]interface{}) error {
|
||||
msg := Message{
|
||||
@@ -194,7 +279,6 @@ func (p *PubSub) handleBzzzMessages() {
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip our own messages
|
||||
if msg.ReceivedFrom == p.host.ID() {
|
||||
continue
|
||||
}
|
||||
@@ -221,7 +305,6 @@ func (p *PubSub) handleAntennaeMessages() {
|
||||
continue
|
||||
}
|
||||
|
||||
// Skip our own messages
|
||||
if msg.ReceivedFrom == p.host.ID() {
|
||||
continue
|
||||
}
|
||||
@@ -232,16 +315,43 @@ func (p *PubSub) handleAntennaeMessages() {
|
||||
continue
|
||||
}
|
||||
|
||||
// If an external handler is registered, use it.
|
||||
if p.AntennaeMessageHandler != nil {
|
||||
p.AntennaeMessageHandler(antennaeMsg, msg.ReceivedFrom)
|
||||
} else {
|
||||
// Default processing if no handler is set
|
||||
p.processAntennaeMessage(antennaeMsg, msg.ReceivedFrom)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// handleDynamicMessages processes messages from a dynamic topic subscription
|
||||
func (p *PubSub) handleDynamicMessages(sub *pubsub.Subscription) {
|
||||
for {
|
||||
msg, err := sub.Next(p.ctx)
|
||||
if err != nil {
|
||||
if p.ctx.Err() != nil || err.Error() == "subscription cancelled" {
|
||||
return // Subscription was cancelled, exit handler
|
||||
}
|
||||
fmt.Printf("❌ Error receiving dynamic message: %v\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
if msg.ReceivedFrom == p.host.ID() {
|
||||
continue
|
||||
}
|
||||
|
||||
var dynamicMsg Message
|
||||
if err := json.Unmarshal(msg.Data, &dynamicMsg); err != nil {
|
||||
fmt.Printf("❌ Failed to unmarshal dynamic message: %v\n", err)
|
||||
continue
|
||||
}
|
||||
|
||||
// Use the main Antennae handler for all dynamic messages
|
||||
if p.AntennaeMessageHandler != nil {
|
||||
p.AntennaeMessageHandler(dynamicMsg, msg.ReceivedFrom)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// processBzzzMessage handles different types of Bzzz coordination messages
|
||||
func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) {
|
||||
fmt.Printf("🐝 Bzzz [%s] from %s: %v\n", msg.Type, from.ShortString(), msg.Data)
|
||||
@@ -253,11 +363,6 @@ func (p *PubSub) processAntennaeMessage(msg Message, from peer.ID) {
|
||||
msg.Type, from.ShortString(), msg.Data)
|
||||
}
|
||||
|
||||
// GetConnectedPeers returns the number of connected peers
|
||||
func (p *PubSub) GetConnectedPeers() int {
|
||||
return len(p.host.Network().Peers())
|
||||
}
|
||||
|
||||
// Close shuts down the PubSub instance
|
||||
func (p *PubSub) Close() error {
|
||||
p.cancel()
|
||||
@@ -276,5 +381,11 @@ func (p *PubSub) Close() error {
|
||||
p.antennaeTopic.Close()
|
||||
}
|
||||
|
||||
p.dynamicTopicsMux.Lock()
|
||||
for _, topic := range p.dynamicTopics {
|
||||
topic.Close()
|
||||
}
|
||||
p.dynamicTopicsMux.Unlock()
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user