Files
CHORUS/hmmm-monitor/main.go
anthonyrawlins 511e52a05c feat: Add HMMM traffic monitoring tool
Create standalone monitoring container that subscribes to all CHORUS
pub/sub topics and logs traffic in real-time for debugging and observability.

Features:
- Subscribes to chorus-bzzz, chorus-hmmm, chorus-context topics
- Logs all messages with timestamps and sender information
- Pretty-printed JSON output with topic-specific emojis
- Minimal resource usage (256MB RAM, 0.5 CPU)
- Read-only monitoring (doesn't publish messages)

Files:
- hmmm-monitor/main.go: Main monitoring application
- hmmm-monitor/Dockerfile: Multi-stage build for minimal image
- hmmm-monitor/docker-compose.yml: Swarm deployment config
- hmmm-monitor/README.md: Usage documentation

This tool helps debug council formation, task execution, and agent
coordination by providing visibility into all HMMM/Bzzz traffic.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-11 12:30:43 +11:00

196 lines
4.4 KiB
Go

package main
import (
"context"
"encoding/json"
"fmt"
"log"
"os"
"os/signal"
"syscall"
"time"
"github.com/libp2p/go-libp2p"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
)
// MessageLog represents a logged HMMM/Bzzz message
type MessageLog struct {
Timestamp time.Time `json:"timestamp"`
Topic string `json:"topic"`
From string `json:"from"`
Type string `json:"type,omitempty"`
Payload map[string]interface{} `json:"payload"`
}
func main() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle graceful shutdown
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
log.Println("🛑 Shutting down HMMM monitor...")
cancel()
}()
log.Println("🔍 Starting HMMM Traffic Monitor...")
// Create libp2p host
h, err := libp2p.New(
libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/0"),
)
if err != nil {
log.Fatal("Failed to create libp2p host:", err)
}
defer h.Close()
log.Printf("📡 Monitor node ID: %s", h.ID().String())
log.Printf("📍 Listening on: %v", h.Addrs())
// Create PubSub instance
ps, err := pubsub.NewGossipSub(ctx, h)
if err != nil {
log.Fatal("Failed to create PubSub:", err)
}
// Topics to monitor
topics := []string{
"chorus-bzzz", // Main CHORUS coordination topic
"chorus-hmmm", // HMMM meta-discussion topic
"chorus-context", // Context feedback topic
"council-formation", // Council formation broadcasts
"council-assignments", // Role assignments
}
// Subscribe to all topics
for _, topicName := range topics {
go monitorTopic(ctx, ps, h, topicName)
}
log.Println("✅ HMMM Monitor ready - listening for traffic...")
log.Println(" Press Ctrl+C to stop")
// Keep running until context is cancelled
<-ctx.Done()
log.Println("✅ HMMM Monitor stopped")
}
func monitorTopic(ctx context.Context, ps *pubsub.PubSub, h host.Host, topicName string) {
// Join topic
topic, err := ps.Join(topicName)
if err != nil {
log.Printf("❌ Failed to join topic %s: %v", topicName, err)
return
}
defer topic.Close()
// Subscribe to topic
sub, err := topic.Subscribe()
if err != nil {
log.Printf("❌ Failed to subscribe to %s: %v", topicName, err)
return
}
defer sub.Cancel()
log.Printf("👂 Monitoring topic: %s", topicName)
// Process messages
for {
select {
case <-ctx.Done():
return
default:
msg, err := sub.Next(ctx)
if err != nil {
if ctx.Err() != nil {
return
}
log.Printf("⚠️ Error reading from %s: %v", topicName, err)
continue
}
// Skip messages from ourselves
if msg.ReceivedFrom == h.ID() {
continue
}
logMessage(topicName, msg)
}
}
}
func logMessage(topicName string, msg *pubsub.Message) {
// Try to parse as JSON
var payload map[string]interface{}
if err := json.Unmarshal(msg.Data, &payload); err != nil {
// Not JSON, log as raw data
log.Printf("🐝 [%s] from %s: %s", topicName, msg.ReceivedFrom.ShortString(), string(msg.Data))
return
}
// Extract message type if available
msgType, _ := payload["type"].(string)
logEntry := MessageLog{
Timestamp: time.Now(),
Topic: topicName,
From: msg.ReceivedFrom.ShortString(),
Type: msgType,
Payload: payload,
}
// Pretty print JSON log
jsonLog, _ := json.MarshalIndent(logEntry, "", " ")
// Use emoji based on topic
emoji := getTopicEmoji(topicName, msgType)
fmt.Printf("\n%s [%s] from %s\n%s\n", emoji, topicName, msg.ReceivedFrom.ShortString(), jsonLog)
}
func getTopicEmoji(topic, msgType string) string {
// Topic-based emojis
switch topic {
case "chorus-bzzz":
switch msgType {
case "availability_broadcast":
return "📊"
case "capability_broadcast":
return "🎯"
case "task_claim":
return "✋"
case "task_progress":
return "⏳"
case "task_complete":
return "✅"
default:
return "🐝"
}
case "chorus-hmmm":
switch msgType {
case "meta_discussion":
return "💬"
case "task_help_request":
return "🆘"
case "task_help_response":
return "💡"
case "escalation_trigger":
return "🚨"
default:
return "🧠"
}
case "chorus-context":
return "📝"
case "council-formation":
return "🎭"
case "council-assignments":
return "👔"
default:
return "📡"
}
}