 054fb67767
			
		
	
	054fb67767
	
	
	
		
			
			- Fixed module path from github.com/deepblackcloud/bzzz to github.com/anthonyrawlins/bzzz - Added dynamic Ollama model detection via /api/tags endpoint - Implemented intelligent model selection through N8N webhook integration - Added BZZZ_MODEL_SELECTION_WEBHOOK environment variable support - Fixed GitHub assignee issue by using valid username instead of peer ID - Added comprehensive model fallback mechanisms - Updated all import statements across the codebase - Removed duplicate systemd service file - Added sandbox execution environment and type definitions 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
		
			
				
	
	
		
			447 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			447 lines
		
	
	
		
			13 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package monitoring
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"encoding/json"
 | |
| 	"fmt"
 | |
| 	"os"
 | |
| 	"path/filepath"
 | |
| 	"sync"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/anthonyrawlins/bzzz/pubsub"
 | |
| )
 | |
| 
 | |
| // AntennaeMonitor tracks and logs antennae coordination activity
 | |
| type AntennaeMonitor struct {
 | |
| 	ctx            context.Context
 | |
| 	pubsub         *pubsub.PubSub
 | |
| 	logFile        *os.File
 | |
| 	metricsFile    *os.File
 | |
| 	activeSessions map[string]*CoordinationSession
 | |
| 	metrics        *CoordinationMetrics
 | |
| 	mu             sync.RWMutex
 | |
| 	isRunning      bool
 | |
| }
 | |
| 
 | |
| // CoordinationSession tracks an active coordination session
 | |
| type CoordinationSession struct {
 | |
| 	SessionID       string                 `json:"session_id"`
 | |
| 	StartTime       time.Time              `json:"start_time"`
 | |
| 	LastActivity    time.Time              `json:"last_activity"`
 | |
| 	Repositories    []string               `json:"repositories"`
 | |
| 	Tasks           []string               `json:"tasks"`
 | |
| 	Participants    []string               `json:"participants"`
 | |
| 	Messages        []CoordinationMessage  `json:"messages"`
 | |
| 	Dependencies    []TaskDependency       `json:"dependencies"`
 | |
| 	Status          string                 `json:"status"` // active, completed, escalated, failed
 | |
| 	Outcome         map[string]interface{} `json:"outcome"`
 | |
| }
 | |
| 
 | |
| // CoordinationMessage represents a message in the coordination session
 | |
| type CoordinationMessage struct {
 | |
| 	Timestamp   time.Time              `json:"timestamp"`
 | |
| 	FromAgent   string                 `json:"from_agent"`
 | |
| 	MessageType string                 `json:"message_type"`
 | |
| 	Content     map[string]interface{} `json:"content"`
 | |
| 	Topic       string                 `json:"topic"`
 | |
| }
 | |
| 
 | |
| // TaskDependency represents a detected task dependency
 | |
| type TaskDependency struct {
 | |
| 	Repository     string `json:"repository"`
 | |
| 	TaskNumber     int    `json:"task_number"`
 | |
| 	DependsOn      string `json:"depends_on"`
 | |
| 	DependencyType string `json:"dependency_type"`
 | |
| 	DetectedAt     time.Time `json:"detected_at"`
 | |
| }
 | |
| 
 | |
| // CoordinationMetrics tracks quantitative coordination data
 | |
| type CoordinationMetrics struct {
 | |
| 	StartTime               time.Time `json:"start_time"`
 | |
| 	TotalSessions          int       `json:"total_sessions"`
 | |
| 	ActiveSessions         int       `json:"active_sessions"`
 | |
| 	CompletedSessions      int       `json:"completed_sessions"`
 | |
| 	EscalatedSessions      int       `json:"escalated_sessions"`
 | |
| 	FailedSessions         int       `json:"failed_sessions"`
 | |
| 	TotalMessages          int       `json:"total_messages"`
 | |
| 	TaskAnnouncements      int       `json:"task_announcements"`
 | |
| 	DependenciesDetected   int       `json:"dependencies_detected"`
 | |
| 	AgentParticipations    map[string]int `json:"agent_participations"`
 | |
| 	AverageSessionDuration time.Duration  `json:"average_session_duration"`
 | |
| 	LastUpdated            time.Time `json:"last_updated"`
 | |
| }
 | |
| 
 | |
| // NewAntennaeMonitor creates a new antennae monitoring system
 | |
| func NewAntennaeMonitor(ctx context.Context, ps *pubsub.PubSub, logDir string) (*AntennaeMonitor, error) {
 | |
| 	// Ensure log directory exists
 | |
| 	if err := os.MkdirAll(logDir, 0755); err != nil {
 | |
| 		return nil, fmt.Errorf("failed to create log directory: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	// Create log files
 | |
| 	timestamp := time.Now().Format("20060102_150405")
 | |
| 	logPath := filepath.Join(logDir, fmt.Sprintf("antennae_activity_%s.jsonl", timestamp))
 | |
| 	metricsPath := filepath.Join(logDir, fmt.Sprintf("antennae_metrics_%s.json", timestamp))
 | |
| 
 | |
| 	logFile, err := os.Create(logPath)
 | |
| 	if err != nil {
 | |
| 		return nil, fmt.Errorf("failed to create activity log file: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	metricsFile, err := os.Create(metricsPath)
 | |
| 	if err != nil {
 | |
| 		logFile.Close()
 | |
| 		return nil, fmt.Errorf("failed to create metrics file: %w", err)
 | |
| 	}
 | |
| 
 | |
| 	monitor := &AntennaeMonitor{
 | |
| 		ctx:            ctx,
 | |
| 		pubsub:         ps,
 | |
| 		logFile:        logFile,
 | |
| 		metricsFile:    metricsFile,
 | |
| 		activeSessions: make(map[string]*CoordinationSession),
 | |
| 		metrics: &CoordinationMetrics{
 | |
| 			StartTime:            time.Now(),
 | |
| 			AgentParticipations: make(map[string]int),
 | |
| 		},
 | |
| 	}
 | |
| 
 | |
| 	fmt.Printf("📊 Antennae Monitor initialized\n")
 | |
| 	fmt.Printf("   Activity Log: %s\n", logPath)
 | |
| 	fmt.Printf("   Metrics File: %s\n", metricsPath)
 | |
| 
 | |
| 	return monitor, nil
 | |
| }
 | |
| 
 | |
| // Start begins monitoring antennae coordination activity
 | |
| func (am *AntennaeMonitor) Start() {
 | |
| 	if am.isRunning {
 | |
| 		return
 | |
| 	}
 | |
| 	am.isRunning = true
 | |
| 
 | |
| 	fmt.Println("🔍 Starting Antennae coordination monitoring...")
 | |
| 
 | |
| 	// Start monitoring routines
 | |
| 	go am.monitorCoordinationMessages()
 | |
| 	go am.monitorTaskAnnouncements()
 | |
| 	go am.periodicMetricsUpdate()
 | |
| 	go am.sessionCleanup()
 | |
| }
 | |
| 
 | |
| // Stop stops the monitoring system
 | |
| func (am *AntennaeMonitor) Stop() {
 | |
| 	if !am.isRunning {
 | |
| 		return
 | |
| 	}
 | |
| 	am.isRunning = false
 | |
| 
 | |
| 	// Save final metrics
 | |
| 	am.saveMetrics()
 | |
| 
 | |
| 	// Close files
 | |
| 	if am.logFile != nil {
 | |
| 		am.logFile.Close()
 | |
| 	}
 | |
| 	if am.metricsFile != nil {
 | |
| 		am.metricsFile.Close()
 | |
| 	}
 | |
| 
 | |
| 	fmt.Println("🛑 Antennae monitoring stopped")
 | |
| }
 | |
| 
 | |
| // monitorCoordinationMessages listens for antennae meta-discussion messages
 | |
| func (am *AntennaeMonitor) monitorCoordinationMessages() {
 | |
| 	// Subscribe to antennae topic
 | |
| 	msgChan := make(chan pubsub.Message, 100)
 | |
| 	
 | |
| 	// This would be implemented with actual pubsub subscription
 | |
| 	// For now, we'll simulate receiving messages
 | |
| 	
 | |
| 	for am.isRunning {
 | |
| 		select {
 | |
| 		case <-am.ctx.Done():
 | |
| 			return
 | |
| 		case msg := <-msgChan:
 | |
| 			am.processCoordinationMessage(msg)
 | |
| 		case <-time.After(1 * time.Second):
 | |
| 			// Continue monitoring
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // monitorTaskAnnouncements listens for task announcements
 | |
| func (am *AntennaeMonitor) monitorTaskAnnouncements() {
 | |
| 	// Subscribe to bzzz coordination topic
 | |
| 	msgChan := make(chan pubsub.Message, 100)
 | |
| 	
 | |
| 	for am.isRunning {
 | |
| 		select {
 | |
| 		case <-am.ctx.Done():
 | |
| 			return
 | |
| 		case msg := <-msgChan:
 | |
| 			am.processTaskAnnouncement(msg)
 | |
| 		case <-time.After(1 * time.Second):
 | |
| 			// Continue monitoring
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // processCoordinationMessage processes an antennae coordination message
 | |
| func (am *AntennaeMonitor) processCoordinationMessage(msg pubsub.Message) {
 | |
| 	am.mu.Lock()
 | |
| 	defer am.mu.Unlock()
 | |
| 
 | |
| 	coordMsg := CoordinationMessage{
 | |
| 		Timestamp:   time.Now(),
 | |
| 		FromAgent:   msg.From,
 | |
| 		MessageType: msg.Type,
 | |
| 		Content:     msg.Data,
 | |
| 		Topic:       "antennae/meta-discussion",
 | |
| 	}
 | |
| 
 | |
| 	// Log the message
 | |
| 	am.logActivity("coordination_message", coordMsg)
 | |
| 
 | |
| 	// Update metrics
 | |
| 	am.metrics.TotalMessages++
 | |
| 	am.metrics.AgentParticipations[msg.From]++
 | |
| 
 | |
| 	// Determine session ID (could be extracted from message content)
 | |
| 	sessionID := am.extractSessionID(msg.Data)
 | |
| 	
 | |
| 	// Get or create session
 | |
| 	session := am.getOrCreateSession(sessionID)
 | |
| 	session.LastActivity = time.Now()
 | |
| 	session.Messages = append(session.Messages, coordMsg)
 | |
| 	
 | |
| 	// Add participant if new
 | |
| 	if !contains(session.Participants, msg.From) {
 | |
| 		session.Participants = append(session.Participants, msg.From)
 | |
| 	}
 | |
| 
 | |
| 	// Update session status based on message type
 | |
| 	am.updateSessionStatus(session, msg)
 | |
| 
 | |
| 	fmt.Printf("🧠 Antennae message: %s from %s (Session: %s)\n", 
 | |
| 		msg.Type, msg.From, sessionID)
 | |
| }
 | |
| 
 | |
| // processTaskAnnouncement processes a task announcement
 | |
| func (am *AntennaeMonitor) processTaskAnnouncement(msg pubsub.Message) {
 | |
| 	am.mu.Lock()
 | |
| 	defer am.mu.Unlock()
 | |
| 
 | |
| 	// Log the announcement
 | |
| 	am.logActivity("task_announcement", msg.Data)
 | |
| 
 | |
| 	// Update metrics
 | |
| 	am.metrics.TaskAnnouncements++
 | |
| 
 | |
| 	// Extract task information
 | |
| 	if repo, ok := msg.Data["repository"].(map[string]interface{}); ok {
 | |
| 		if repoName, ok := repo["name"].(string); ok {
 | |
| 			fmt.Printf("📢 Task announced: %s\n", repoName)
 | |
| 			
 | |
| 			// Check for dependencies and create coordination session if needed
 | |
| 			if task, ok := msg.Data["task"].(map[string]interface{}); ok {
 | |
| 				if deps, ok := task["dependencies"].([]interface{}); ok && len(deps) > 0 {
 | |
| 					sessionID := fmt.Sprintf("coord_%d", time.Now().Unix())
 | |
| 					session := am.getOrCreateSession(sessionID)
 | |
| 					session.Repositories = append(session.Repositories, repoName)
 | |
| 					
 | |
| 					fmt.Printf("🔗 Dependencies detected, creating coordination session: %s\n", sessionID)
 | |
| 				}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // getOrCreateSession gets an existing session or creates a new one
 | |
| func (am *AntennaeMonitor) getOrCreateSession(sessionID string) *CoordinationSession {
 | |
| 	if session, exists := am.activeSessions[sessionID]; exists {
 | |
| 		return session
 | |
| 	}
 | |
| 
 | |
| 	session := &CoordinationSession{
 | |
| 		SessionID:    sessionID,
 | |
| 		StartTime:    time.Now(),
 | |
| 		LastActivity: time.Now(),
 | |
| 		Status:       "active",
 | |
| 		Messages:     make([]CoordinationMessage, 0),
 | |
| 		Repositories: make([]string, 0),
 | |
| 		Tasks:        make([]string, 0),
 | |
| 		Participants: make([]string, 0),
 | |
| 		Dependencies: make([]TaskDependency, 0),
 | |
| 	}
 | |
| 
 | |
| 	am.activeSessions[sessionID] = session
 | |
| 	am.metrics.TotalSessions++
 | |
| 	am.metrics.ActiveSessions++
 | |
| 
 | |
| 	fmt.Printf("🆕 New coordination session created: %s\n", sessionID)
 | |
| 	return session
 | |
| }
 | |
| 
 | |
| // updateSessionStatus updates session status based on message content
 | |
| func (am *AntennaeMonitor) updateSessionStatus(session *CoordinationSession, msg pubsub.Message) {
 | |
| 	// Analyze message content to determine status changes
 | |
| 	if content, ok := msg.Data["type"].(string); ok {
 | |
| 		switch content {
 | |
| 		case "consensus_reached":
 | |
| 			session.Status = "completed"
 | |
| 			am.metrics.ActiveSessions--
 | |
| 			am.metrics.CompletedSessions++
 | |
| 		case "escalation_triggered":
 | |
| 			session.Status = "escalated"
 | |
| 			am.metrics.ActiveSessions--
 | |
| 			am.metrics.EscalatedSessions++
 | |
| 		case "coordination_failed":
 | |
| 			session.Status = "failed"
 | |
| 			am.metrics.ActiveSessions--
 | |
| 			am.metrics.FailedSessions++
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // periodicMetricsUpdate saves metrics periodically
 | |
| func (am *AntennaeMonitor) periodicMetricsUpdate() {
 | |
| 	ticker := time.NewTicker(30 * time.Second)
 | |
| 	defer ticker.Stop()
 | |
| 
 | |
| 	for am.isRunning {
 | |
| 		select {
 | |
| 		case <-am.ctx.Done():
 | |
| 			return
 | |
| 		case <-ticker.C:
 | |
| 			am.saveMetrics()
 | |
| 			am.printStatus()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // sessionCleanup removes old inactive sessions
 | |
| func (am *AntennaeMonitor) sessionCleanup() {
 | |
| 	ticker := time.NewTicker(5 * time.Minute)
 | |
| 	defer ticker.Stop()
 | |
| 
 | |
| 	for am.isRunning {
 | |
| 		select {
 | |
| 		case <-am.ctx.Done():
 | |
| 			return
 | |
| 		case <-ticker.C:
 | |
| 			am.cleanupOldSessions()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // cleanupOldSessions removes sessions inactive for more than 10 minutes
 | |
| func (am *AntennaeMonitor) cleanupOldSessions() {
 | |
| 	am.mu.Lock()
 | |
| 	defer am.mu.Unlock()
 | |
| 
 | |
| 	cutoff := time.Now().Add(-10 * time.Minute)
 | |
| 	cleaned := 0
 | |
| 
 | |
| 	for sessionID, session := range am.activeSessions {
 | |
| 		if session.LastActivity.Before(cutoff) && session.Status == "active" {
 | |
| 			session.Status = "timeout"
 | |
| 			delete(am.activeSessions, sessionID)
 | |
| 			am.metrics.ActiveSessions--
 | |
| 			am.metrics.FailedSessions++
 | |
| 			cleaned++
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if cleaned > 0 {
 | |
| 		fmt.Printf("🧹 Cleaned up %d inactive sessions\n", cleaned)
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // logActivity logs an activity to the activity log file
 | |
| func (am *AntennaeMonitor) logActivity(activityType string, data interface{}) {
 | |
| 	logEntry := map[string]interface{}{
 | |
| 		"timestamp":     time.Now().Unix(),
 | |
| 		"activity_type": activityType,
 | |
| 		"data":          data,
 | |
| 	}
 | |
| 
 | |
| 	if jsonBytes, err := json.Marshal(logEntry); err == nil {
 | |
| 		am.logFile.WriteString(string(jsonBytes) + "\n")
 | |
| 		am.logFile.Sync()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // saveMetrics saves current metrics to file
 | |
| func (am *AntennaeMonitor) saveMetrics() {
 | |
| 	am.mu.RLock()
 | |
| 	defer am.mu.RUnlock()
 | |
| 
 | |
| 	am.metrics.LastUpdated = time.Now()
 | |
| 	
 | |
| 	// Calculate average session duration
 | |
| 	if am.metrics.CompletedSessions > 0 {
 | |
| 		totalDuration := time.Duration(0)
 | |
| 		completed := 0
 | |
| 		
 | |
| 		for _, session := range am.activeSessions {
 | |
| 			if session.Status == "completed" {
 | |
| 				totalDuration += session.LastActivity.Sub(session.StartTime)
 | |
| 				completed++
 | |
| 			}
 | |
| 		}
 | |
| 		
 | |
| 		if completed > 0 {
 | |
| 			am.metrics.AverageSessionDuration = totalDuration / time.Duration(completed)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if jsonBytes, err := json.MarshalIndent(am.metrics, "", "  "); err == nil {
 | |
| 		am.metricsFile.Seek(0, 0)
 | |
| 		am.metricsFile.Truncate(0)
 | |
| 		am.metricsFile.Write(jsonBytes)
 | |
| 		am.metricsFile.Sync()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // printStatus prints current monitoring status
 | |
| func (am *AntennaeMonitor) printStatus() {
 | |
| 	am.mu.RLock()
 | |
| 	defer am.mu.RUnlock()
 | |
| 
 | |
| 	fmt.Printf("📊 Antennae Monitor Status:\n")
 | |
| 	fmt.Printf("   Total Sessions: %d (Active: %d, Completed: %d)\n", 
 | |
| 		am.metrics.TotalSessions, am.metrics.ActiveSessions, am.metrics.CompletedSessions)
 | |
| 	fmt.Printf("   Messages: %d, Announcements: %d\n", 
 | |
| 		am.metrics.TotalMessages, am.metrics.TaskAnnouncements)
 | |
| 	fmt.Printf("   Dependencies Detected: %d\n", am.metrics.DependenciesDetected)
 | |
| 	fmt.Printf("   Active Participants: %d\n", len(am.metrics.AgentParticipations))
 | |
| }
 | |
| 
 | |
| // GetMetrics returns current metrics
 | |
| func (am *AntennaeMonitor) GetMetrics() *CoordinationMetrics {
 | |
| 	am.mu.RLock()
 | |
| 	defer am.mu.RUnlock()
 | |
| 	return am.metrics
 | |
| }
 | |
| 
 | |
| // Helper functions
 | |
| func (am *AntennaeMonitor) extractSessionID(data map[string]interface{}) string {
 | |
| 	if sessionID, ok := data["session_id"].(string); ok {
 | |
| 		return sessionID
 | |
| 	}
 | |
| 	if scenarioName, ok := data["scenario_name"].(string); ok {
 | |
| 		return fmt.Sprintf("scenario_%s", scenarioName)
 | |
| 	}
 | |
| 	return fmt.Sprintf("session_%d", time.Now().Unix())
 | |
| }
 | |
| 
 | |
| func contains(slice []string, item string) bool {
 | |
| 	for _, s := range slice {
 | |
| 		if s == item {
 | |
| 			return true
 | |
| 		}
 | |
| 	}
 | |
| 	return false
 | |
| } |