Create standalone monitoring container that subscribes to all CHORUS pub/sub topics and logs traffic in real time for debugging and observability.

Features:
- Subscribes to chorus-bzzz, chorus-hmmm, chorus-context topics
- Logs all messages with timestamps and sender information
- Pretty-printed JSON output with topic-specific emojis
- Minimal resource usage (256MB RAM, 0.5 CPU)
- Read-only monitoring (doesn't publish messages)

Files:
- hmmm-monitor/main.go: Main monitoring application
- hmmm-monitor/Dockerfile: Multi-stage build for minimal image
- hmmm-monitor/docker-compose.yml: Swarm deployment config
- hmmm-monitor/README.md: Usage documentation

This tool helps debug council formation, task execution, and agent coordination by providing visibility into all HMMM/Bzzz traffic.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
196 lines
4.4 KiB
Go
196 lines
4.4 KiB
Go
package main
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"os/signal"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/libp2p/go-libp2p"
|
|
pubsub "github.com/libp2p/go-libp2p-pubsub"
|
|
"github.com/libp2p/go-libp2p/core/host"
|
|
)
|
|
|
|
// MessageLog is the JSON envelope printed for every pub/sub message whose
// body decodes as a JSON object.
type MessageLog struct {
	// Timestamp is the local time at which the monitor logged the message.
	Timestamp time.Time `json:"timestamp"`
	// Topic is the name of the pub/sub topic the message arrived on.
	Topic string `json:"topic"`
	// From is the short form of the peer ID the message was received from.
	From string `json:"from"`
	// Type mirrors the payload's top-level "type" string; omitted when empty.
	Type string `json:"type,omitempty"`
	// Payload is the decoded JSON object carried by the message.
	Payload map[string]interface{} `json:"payload"`
}
|
|
|
|
func main() {
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Handle graceful shutdown
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
|
go func() {
|
|
<-sigChan
|
|
log.Println("🛑 Shutting down HMMM monitor...")
|
|
cancel()
|
|
}()
|
|
|
|
log.Println("🔍 Starting HMMM Traffic Monitor...")
|
|
|
|
// Create libp2p host
|
|
h, err := libp2p.New(
|
|
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
|
|
)
|
|
if err != nil {
|
|
log.Fatal("Failed to create libp2p host:", err)
|
|
}
|
|
defer h.Close()
|
|
|
|
log.Printf("📡 Monitor node ID: %s", h.ID().String())
|
|
log.Printf("📍 Listening on: %v", h.Addrs())
|
|
|
|
// Create PubSub instance
|
|
ps, err := pubsub.NewGossipSub(ctx, h)
|
|
if err != nil {
|
|
log.Fatal("Failed to create PubSub:", err)
|
|
}
|
|
|
|
// Topics to monitor
|
|
topics := []string{
|
|
"chorus-bzzz", // Main CHORUS coordination topic
|
|
"chorus-hmmm", // HMMM meta-discussion topic
|
|
"chorus-context", // Context feedback topic
|
|
"council-formation", // Council formation broadcasts
|
|
"council-assignments", // Role assignments
|
|
}
|
|
|
|
// Subscribe to all topics
|
|
for _, topicName := range topics {
|
|
go monitorTopic(ctx, ps, h, topicName)
|
|
}
|
|
|
|
log.Println("✅ HMMM Monitor ready - listening for traffic...")
|
|
log.Println(" Press Ctrl+C to stop")
|
|
|
|
// Keep running until context is cancelled
|
|
<-ctx.Done()
|
|
log.Println("✅ HMMM Monitor stopped")
|
|
}
|
|
|
|
func monitorTopic(ctx context.Context, ps *pubsub.PubSub, h host.Host, topicName string) {
|
|
// Join topic
|
|
topic, err := ps.Join(topicName)
|
|
if err != nil {
|
|
log.Printf("❌ Failed to join topic %s: %v", topicName, err)
|
|
return
|
|
}
|
|
defer topic.Close()
|
|
|
|
// Subscribe to topic
|
|
sub, err := topic.Subscribe()
|
|
if err != nil {
|
|
log.Printf("❌ Failed to subscribe to %s: %v", topicName, err)
|
|
return
|
|
}
|
|
defer sub.Cancel()
|
|
|
|
log.Printf("👂 Monitoring topic: %s", topicName)
|
|
|
|
// Process messages
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
msg, err := sub.Next(ctx)
|
|
if err != nil {
|
|
if ctx.Err() != nil {
|
|
return
|
|
}
|
|
log.Printf("⚠️ Error reading from %s: %v", topicName, err)
|
|
continue
|
|
}
|
|
|
|
// Skip messages from ourselves
|
|
if msg.ReceivedFrom == h.ID() {
|
|
continue
|
|
}
|
|
|
|
logMessage(topicName, msg)
|
|
}
|
|
}
|
|
}
|
|
|
|
func logMessage(topicName string, msg *pubsub.Message) {
|
|
// Try to parse as JSON
|
|
var payload map[string]interface{}
|
|
if err := json.Unmarshal(msg.Data, &payload); err != nil {
|
|
// Not JSON, log as raw data
|
|
log.Printf("🐝 [%s] from %s: %s", topicName, msg.ReceivedFrom.ShortString(), string(msg.Data))
|
|
return
|
|
}
|
|
|
|
// Extract message type if available
|
|
msgType, _ := payload["type"].(string)
|
|
|
|
logEntry := MessageLog{
|
|
Timestamp: time.Now(),
|
|
Topic: topicName,
|
|
From: msg.ReceivedFrom.ShortString(),
|
|
Type: msgType,
|
|
Payload: payload,
|
|
}
|
|
|
|
// Pretty print JSON log
|
|
jsonLog, _ := json.MarshalIndent(logEntry, "", " ")
|
|
|
|
// Use emoji based on topic
|
|
emoji := getTopicEmoji(topicName, msgType)
|
|
|
|
fmt.Printf("\n%s [%s] from %s\n%s\n", emoji, topicName, msg.ReceivedFrom.ShortString(), jsonLog)
|
|
}
|
|
|
|
// getTopicEmoji returns the emoji used as the log-line prefix for a message.
// For "chorus-bzzz" and "chorus-hmmm" the choice depends on msgType, with a
// per-topic fallback for unrecognized types. The other known topics map to a
// fixed emoji regardless of msgType, and unknown topics yield "📡".
func getTopicEmoji(topic, msgType string) string {
	switch topic {
	case "chorus-bzzz":
		switch msgType {
		case "availability_broadcast":
			return "📊"
		case "capability_broadcast":
			return "🎯"
		case "task_claim":
			return "✋"
		case "task_progress":
			return "⏳"
		case "task_complete":
			return "✅"
		default:
			return "🐝"
		}
	case "chorus-hmmm":
		switch msgType {
		case "meta_discussion":
			return "💬"
		case "task_help_request":
			return "🆘"
		case "task_help_response":
			return "💡"
		case "escalation_trigger":
			return "🚨"
		default:
			return "🧠"
		}
	case "chorus-context":
		return "📝"
	case "council-formation":
		return "🎭"
	case "council-assignments":
		return "👔"
	default:
		return "📡"
	}
}
|