Major Improvements: - Added retry deployment buttons in machine list for failed deployments - Added retry button in SSH console modal footer for enhanced UX - Enhanced deployment process with comprehensive cleanup of existing services - Improved binary installation with password-based sudo authentication - Updated configuration generation to include all required sections (agent, ai, network, security) - Fixed deployment verification and error handling Security Enhancements: - Enhanced verifiedStopExistingServices with thorough cleanup process - Improved binary copying with proper sudo authentication - Added comprehensive configuration validation UX Improvements: - Users can retry deployments without re-running machine discovery - Retry buttons available from both machine list and console modal - Real-time deployment progress with detailed console output - Clear error states with actionable retry options Technical Changes: - Modified ServiceDeployment.tsx with retry button components - Enhanced api/setup_manager.go with improved deployment functions - Updated main.go with command line argument support (--config, --setup) - Added comprehensive zero-trust security validation system 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
1663 lines
51 KiB
Plaintext
1663 lines
51 KiB
Plaintext
package main
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"path/filepath"
|
|
"reflect"
|
|
"time"
|
|
|
|
"chorus.services/bzzz/api"
|
|
"chorus.services/bzzz/coordinator"
|
|
"chorus.services/bzzz/discovery"
|
|
"chorus.services/bzzz/logging"
|
|
"chorus.services/bzzz/p2p"
|
|
"chorus.services/bzzz/pkg/config"
|
|
"chorus.services/bzzz/pkg/crypto"
|
|
"chorus.services/bzzz/pkg/dht"
|
|
"chorus.services/bzzz/pkg/election"
|
|
"chorus.services/bzzz/pkg/health"
|
|
"chorus.services/bzzz/pkg/shutdown"
|
|
"chorus.services/bzzz/pkg/ucxi"
|
|
"chorus.services/bzzz/pkg/ucxl"
|
|
"chorus.services/bzzz/pkg/web"
|
|
"chorus.services/bzzz/pubsub"
|
|
"chorus.services/bzzz/reasoning"
|
|
"chorus.services/hmmm/pkg/hmmm"
|
|
"github.com/libp2p/go-libp2p/core/peer"
|
|
"github.com/multiformats/go-multiaddr"
|
|
)
|
|
|
|
// SimpleTaskTracker tracks active tasks for availability reporting.
//
// It is a minimal in-memory tracker: a bounded-capacity set of task IDs plus
// an optional decision publisher that, when set, records task completions in
// the DHT. NOTE(review): the map is not guarded by a mutex — confirm all
// callers mutate it from a single goroutine.
type SimpleTaskTracker struct {
	maxTasks int // maximum number of concurrent tasks this agent will accept

	activeTasks map[string]bool // set of task IDs currently in progress (value is always true)

	decisionPublisher *ucxl.DecisionPublisher // optional; nil disables completion publishing
}
|
|
|
|
// GetActiveTasks returns list of active task IDs
|
|
func (t *SimpleTaskTracker) GetActiveTasks() []string {
|
|
tasks := make([]string, 0, len(t.activeTasks))
|
|
for taskID := range t.activeTasks {
|
|
tasks = append(tasks, taskID)
|
|
}
|
|
return tasks
|
|
}
|
|
|
|
// GetMaxTasks returns maximum number of concurrent tasks
|
|
func (t *SimpleTaskTracker) GetMaxTasks() int {
|
|
return t.maxTasks
|
|
}
|
|
|
|
// AddTask marks a task as active
|
|
func (t *SimpleTaskTracker) AddTask(taskID string) {
|
|
t.activeTasks[taskID] = true
|
|
}
|
|
|
|
// RemoveTask marks a task as completed and publishes decision if publisher available
|
|
func (t *SimpleTaskTracker) RemoveTask(taskID string) {
|
|
delete(t.activeTasks, taskID)
|
|
|
|
// Publish task completion decision if publisher is available
|
|
if t.decisionPublisher != nil {
|
|
t.publishTaskCompletion(taskID, true, "Task completed successfully", nil)
|
|
}
|
|
}
|
|
|
|
// CompleteTaskWithDecision marks a task as completed and publishes detailed decision
|
|
func (t *SimpleTaskTracker) CompleteTaskWithDecision(taskID string, success bool, summary string, filesModified []string) {
|
|
delete(t.activeTasks, taskID)
|
|
|
|
// Publish task completion decision if publisher is available
|
|
if t.decisionPublisher != nil {
|
|
t.publishTaskCompletion(taskID, success, summary, filesModified)
|
|
}
|
|
}
|
|
|
|
// SetDecisionPublisher sets the decision publisher for task completion tracking
|
|
func (t *SimpleTaskTracker) SetDecisionPublisher(publisher *ucxl.DecisionPublisher) {
|
|
t.decisionPublisher = publisher
|
|
}
|
|
|
|
// publishTaskCompletion publishes a task completion decision to DHT
|
|
func (t *SimpleTaskTracker) publishTaskCompletion(taskID string, success bool, summary string, filesModified []string) {
|
|
if t.decisionPublisher == nil {
|
|
return
|
|
}
|
|
|
|
if err := t.decisionPublisher.PublishTaskCompletion(taskID, success, summary, filesModified); err != nil {
|
|
fmt.Printf("⚠️ Failed to publish task completion for %s: %v\n", taskID, err)
|
|
} else {
|
|
fmt.Printf("📤 Published task completion decision for: %s\n", taskID)
|
|
}
|
|
}
|
|
|
|
// main boots the Bzzz node end to end: it loads (or interactively bootstraps)
// configuration, starts the libp2p host plus mDNS discovery and pubsub,
// wires in the HMMM router, admin election, optional DHT/encrypted storage,
// task coordination, the HTTP and optional UCXI servers, and finally blocks
// until the graceful-shutdown manager signals completion.
func main() {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	fmt.Println("🚀 Starting Bzzz + HMMM P2P Task Coordination System...")

	// Determine config file path (BZZZ_CONFIG_PATH overrides the default).
	configPath := os.Getenv("BZZZ_CONFIG_PATH")
	if configPath == "" {
		configPath = ".bzzz/config.yaml"
	}

	// Check if setup is required; setup mode serves a web configuration UI
	// and never returns control to normal startup.
	if config.IsSetupRequired(configPath) {
		fmt.Println("🔧 Setup required - starting web configuration interface...")
		startSetupMode(configPath)
		return
	}

	// Load configuration
	cfg, err := config.LoadConfig(configPath)
	if err != nil {
		log.Fatalf("Failed to load configuration: %v", err)
	}

	// Validate configuration; an invalid file falls back to setup mode
	// rather than crashing.
	if !config.IsValidConfiguration(cfg) {
		fmt.Println("⚠️ Configuration is invalid - starting setup mode...")
		startSetupMode(configPath)
		return
	}

	// Initialize P2P node
	node, err := p2p.NewNode(ctx)
	if err != nil {
		log.Fatalf("Failed to create P2P node: %v", err)
	}
	defer node.Close()

	// Apply node-specific configuration if agent ID is not set.
	// Only fields the user left empty are filled from the per-node defaults.
	if cfg.Agent.ID == "" {
		nodeID := node.ID().ShortString()
		nodeSpecificCfg := config.GetNodeSpecificDefaults(nodeID)

		// Merge node-specific defaults with loaded config
		cfg.Agent.ID = nodeSpecificCfg.Agent.ID
		if len(cfg.Agent.Capabilities) == 0 {
			cfg.Agent.Capabilities = nodeSpecificCfg.Agent.Capabilities
		}
		if len(cfg.Agent.Models) == 0 {
			cfg.Agent.Models = nodeSpecificCfg.Agent.Models
		}
		if cfg.Agent.Specialization == "" {
			cfg.Agent.Specialization = nodeSpecificCfg.Agent.Specialization
		}
	}

	// Apply role-based configuration if no role is set
	if cfg.Agent.Role == "" {
		// Determine default role based on specialization
		defaultRole := getDefaultRoleForSpecialization(cfg.Agent.Specialization)
		if defaultRole != "" {
			fmt.Printf("🎭 Applying default role: %s\n", defaultRole)
			if err := cfg.ApplyRoleDefinition(defaultRole); err != nil {
				fmt.Printf("⚠️ Failed to apply role definition: %v\n", err)
			} else {
				fmt.Printf("✅ Role applied: %s\n", cfg.Agent.Role)
			}
		}
	}

	fmt.Printf("🐝 Bzzz node started successfully\n")
	fmt.Printf("📍 Node ID: %s\n", node.ID().ShortString())
	fmt.Printf("🤖 Agent ID: %s\n", cfg.Agent.ID)
	fmt.Printf("🎯 Specialization: %s\n", cfg.Agent.Specialization)

	// Display authority level if role is configured; lookup errors are
	// silently skipped since this output is informational only.
	if cfg.Agent.Role != "" {
		authority, err := cfg.GetRoleAuthority(cfg.Agent.Role)
		if err == nil {
			fmt.Printf("🎭 Role: %s (Authority: %s)\n", cfg.Agent.Role, authority)
			if authority == config.AuthorityMaster {
				fmt.Printf("👑 This node can become admin/SLURP\n")
			}
		}
	}

	// Hive is deprecated - removed reference
	fmt.Printf("🔗 Listening addresses:\n")
	for _, addr := range node.Addresses() {
		fmt.Printf(" %s/p2p/%s\n", addr, node.ID())
	}

	// Initialize Hypercore-style logger
	hlog := logging.NewHypercoreLog(node.ID())
	hlog.Append(logging.PeerJoined, map[string]interface{}{"status": "started"})
	fmt.Printf("📝 Hypercore logger initialized\n")

	// Initialize mDNS discovery
	mdnsDiscovery, err := discovery.NewMDNSDiscovery(ctx, node.Host(), "bzzz-peer-discovery")
	if err != nil {
		log.Fatalf("Failed to create mDNS discovery: %v", err)
	}
	defer mdnsDiscovery.Close()

	// Initialize PubSub with hypercore logging
	ps, err := pubsub.NewPubSubWithLogger(ctx, node.Host(), "bzzz/coordination/v1", "hmmm/meta-discussion/v1", hlog)
	if err != nil {
		log.Fatalf("Failed to create PubSub: %v", err)
	}
	defer ps.Close()

	// Initialize HMMM Router
	hmmmAdapter := pubsub.NewGossipPublisher(ps)
	hmmmRouter := hmmm.NewRouter(hmmmAdapter, hmmm.DefaultConfig())
	fmt.Printf("🐜 HMMM Router initialized and attached to Bzzz pubsub\n")
	_ = hmmmRouter // No-op: hmmmRouter is consumed by the task coordinator below.

	// Join role-based topics if role is configured
	if cfg.Agent.Role != "" {
		if err := ps.JoinRoleBasedTopics(cfg.Agent.Role, cfg.Agent.Expertise, cfg.Agent.ReportsTo); err != nil {
			fmt.Printf("⚠️ Failed to join role-based topics: %v\n", err)
		} else {
			fmt.Printf("🎯 Joined role-based collaboration topics\n")
		}
	}

	// Optional: HMMM per-issue room smoke test.
	// Gated by BZZZ_HMMM_SMOKE=1; publishes one seed message into a
	// hard-coded issue room. Best-effort: every failure is only logged.
	if os.Getenv("BZZZ_HMMM_SMOKE") == "1" {
		issueID := 42
		topic := fmt.Sprintf("bzzz/meta/issue/%d", issueID)
		if err := ps.JoinDynamicTopic(topic); err != nil {
			fmt.Printf("⚠️ HMMM smoke: failed to join %s: %v\n", topic, err)
		} else {
			seed := map[string]interface{}{
				"version":   1,
				"type":      "meta_msg",
				"issue_id":  issueID,
				"thread_id": fmt.Sprintf("issue-%d", issueID),
				"msg_id":    fmt.Sprintf("seed-%d", time.Now().UnixNano()),
				"node_id":   node.ID().ShortString(),
				"hop_count": 0,
				"timestamp": time.Now().UTC(),
				"message":   "Seed: HMMM per-issue room initialized.",
			}
			// Marshal error deliberately ignored: the seed map contains
			// only JSON-safe values.
			b, _ := json.Marshal(seed)
			if err := ps.PublishRaw(topic, b); err != nil {
				fmt.Printf("⚠️ HMMM smoke: publish failed: %v\n", err)
			} else {
				fmt.Printf("🧪 HMMM smoke: published seed to %s\n", topic)
			}
		}
	}

	// === Admin Election System ===
	// Initialize election manager
	electionManager := election.NewElectionManager(ctx, cfg, node.Host(), ps, node.ID().ShortString())

	// Set election callbacks
	electionManager.SetCallbacks(
		func(oldAdmin, newAdmin string) {
			fmt.Printf("👑 Admin changed: %s -> %s\n", oldAdmin, newAdmin)

			// If this node becomes admin, enable SLURP functionality
			if newAdmin == node.ID().ShortString() {
				fmt.Printf("🎯 This node is now admin - enabling SLURP functionality\n")
				cfg.Slurp.Enabled = true
				// Apply admin role configuration
				if err := cfg.ApplyRoleDefinition("admin"); err != nil {
					fmt.Printf("⚠️ Failed to apply admin role: %v\n", err)
				}
			}
		},
		func(winner string) {
			fmt.Printf("🏆 Election completed, winner: %s\n", winner)
		},
	)

	// Start election manager (now handles heartbeat lifecycle internally).
	// Startup failure is logged but not fatal — the node runs without election.
	if err := electionManager.Start(); err != nil {
		fmt.Printf("❌ Failed to start election manager: %v\n", err)
	} else {
		fmt.Printf("✅ Election manager started with automated heartbeat management\n")
	}
	defer electionManager.Stop()
	// ============================

	// === DHT Storage and Decision Publishing ===
	// Initialize DHT for distributed storage. All three remain nil when the
	// DHT is disabled or fails to start; downstream code checks for nil.
	var dhtNode *dht.LibP2PDHT
	var encryptedStorage *dht.EncryptedDHTStorage
	var decisionPublisher *ucxl.DecisionPublisher

	if cfg.V2.DHT.Enabled {
		// Create DHT
		dhtNode, err = dht.NewLibP2PDHT(ctx, node.Host())
		if err != nil {
			fmt.Printf("⚠️ Failed to create DHT: %v\n", err)
		} else {
			fmt.Printf("🕸️ DHT initialized\n")

			// Bootstrap DHT
			if err := dhtNode.Bootstrap(); err != nil {
				fmt.Printf("⚠️ DHT bootstrap failed: %v\n", err)
			}

			// Connect to bootstrap peers if configured; each bad address is
			// skipped so one typo cannot block the rest.
			for _, addrStr := range cfg.V2.DHT.BootstrapPeers {
				addr, err := multiaddr.NewMultiaddr(addrStr)
				if err != nil {
					fmt.Printf("⚠️ Invalid bootstrap address %s: %v\n", addrStr, err)
					continue
				}

				// Extract peer info from multiaddr
				info, err := peer.AddrInfoFromP2pAddr(addr)
				if err != nil {
					fmt.Printf("⚠️ Failed to parse peer info from %s: %v\n", addrStr, err)
					continue
				}

				if err := node.Host().Connect(ctx, *info); err != nil {
					fmt.Printf("⚠️ Failed to connect to bootstrap peer %s: %v\n", addrStr, err)
				} else {
					fmt.Printf("🔗 Connected to DHT bootstrap peer: %s\n", addrStr)
				}
			}

			// Initialize encrypted storage
			encryptedStorage = dht.NewEncryptedDHTStorage(
				ctx,
				node.Host(),
				dhtNode,
				cfg,
				node.ID().ShortString(),
			)

			// Start cache cleanup
			encryptedStorage.StartCacheCleanup(5 * time.Minute)
			fmt.Printf("🔐 Encrypted DHT storage initialized\n")

			// Initialize decision publisher
			decisionPublisher = ucxl.NewDecisionPublisher(
				ctx,
				cfg,
				encryptedStorage,
				node.ID().ShortString(),
				cfg.Agent.ID,
			)
			fmt.Printf("📤 Decision publisher initialized\n")

			// Test the encryption system on startup (asynchronously, so the
			// main startup path is not delayed by the sleeps below).
			go func() {
				time.Sleep(2 * time.Second) // Wait for initialization
				if err := crypto.TestAgeEncryption(); err != nil {
					fmt.Printf("❌ Age encryption test failed: %v\n", err)
				} else {
					fmt.Printf("✅ Age encryption test passed\n")
				}

				if err := crypto.TestShamirSecretSharing(); err != nil {
					fmt.Printf("❌ Shamir secret sharing test failed: %v\n", err)
				} else {
					fmt.Printf("✅ Shamir secret sharing test passed\n")
				}

				// Test end-to-end encrypted decision flow
				time.Sleep(3 * time.Second) // Wait a bit more
				testEndToEndDecisionFlow(decisionPublisher, encryptedStorage)
			}()
		}
	} else {
		fmt.Printf("⚪ DHT disabled in configuration\n")
	}
	defer func() {
		if dhtNode != nil {
			dhtNode.Close()
		}
	}()
	// ===========================================

	// === Task Coordination Integration ===
	// Initialize Task Coordinator
	taskCoordinator := coordinator.NewTaskCoordinator(
		ctx,
		ps,
		hlog,
		cfg,
		node.ID().ShortString(),
		hmmmRouter,
	)

	// Start task coordination
	taskCoordinator.Start()
	fmt.Printf("✅ Task coordination system active\n")
	// ==========================

	// Start HTTP API server (errors other than the expected
	// http.ErrServerClosed on shutdown are logged).
	httpServer := api.NewHTTPServer(8080, hlog, ps)
	go func() {
		if err := httpServer.Start(); err != nil && err != http.ErrServerClosed {
			fmt.Printf("❌ HTTP server error: %v\n", err)
		}
	}()
	defer httpServer.Stop()
	fmt.Printf("🌐 HTTP API server started on :8080\n")

	// === UCXI Server Integration ===
	// Initialize UCXI server if UCXL protocol is enabled
	var ucxiServer *ucxi.Server
	if cfg.UCXL.Enabled && cfg.UCXL.Server.Enabled {
		// Create storage directory
		storageDir := cfg.UCXL.Storage.Directory
		if storageDir == "" {
			storageDir = filepath.Join(os.TempDir(), "bzzz-ucxi-storage")
		}

		storage, err := ucxi.NewBasicContentStorage(storageDir)
		if err != nil {
			fmt.Printf("⚠️ Failed to create UCXI storage: %v\n", err)
		} else {
			// Create resolver
			resolver := ucxi.NewBasicAddressResolver(node.ID().ShortString())
			resolver.SetDefaultTTL(cfg.UCXL.Resolution.CacheTTL)

			// TODO: Add P2P integration hooks here
			// resolver.SetAnnounceHook(...)
			// resolver.SetDiscoverHook(...)

			// Create UCXI server
			ucxiConfig := ucxi.ServerConfig{
				Port:     cfg.UCXL.Server.Port,
				BasePath: cfg.UCXL.Server.BasePath,
				Resolver: resolver,
				Storage:  storage,
				Logger:   ucxi.SimpleLogger{},
			}

			ucxiServer = ucxi.NewServer(ucxiConfig)
			go func() {
				if err := ucxiServer.Start(); err != nil && err != http.ErrServerClosed {
					fmt.Printf("❌ UCXI server error: %v\n", err)
				}
			}()
			defer func() {
				if ucxiServer != nil {
					ucxiServer.Stop()
				}
			}()
			fmt.Printf("🔗 UCXI server started on :%d%s/ucxi/v1\n",
				cfg.UCXL.Server.Port, cfg.UCXL.Server.BasePath)
		}
	} else {
		fmt.Printf("⚪ UCXI server disabled (UCXL protocol not enabled)\n")
	}
	// ============================

	// Create simple task tracker
	taskTracker := &SimpleTaskTracker{
		maxTasks:    cfg.Agent.MaxTasks,
		activeTasks: make(map[string]bool),
	}

	// Connect decision publisher to task tracker if available
	if decisionPublisher != nil {
		taskTracker.SetDecisionPublisher(decisionPublisher)
		fmt.Printf("📤 Task completion decisions will be published to DHT\n")
	}

	// Announce capabilities and role (background loops / one-shots)
	go announceAvailability(ps, node.ID().ShortString(), taskTracker)
	go announceCapabilitiesOnChange(ps, node.ID().ShortString(), cfg)
	go announceRoleOnStartup(ps, node.ID().ShortString(), cfg)

	// Start status reporting
	go statusReporter(node)

	fmt.Printf("🔍 Listening for peers on local network...\n")
	fmt.Printf("📡 Ready for task coordination and meta-discussion\n")
	fmt.Printf("🎯 HMMM collaborative reasoning enabled\n")

	// === Comprehensive Health Monitoring & Graceful Shutdown ===
	// Initialize shutdown manager (30s grace period for component shutdown)
	shutdownManager := shutdown.NewManager(30*time.Second, &simpleLogger{})

	// Initialize health manager
	healthManager := health.NewManager(node.ID().ShortString(), "v0.2.0", &simpleLogger{})
	healthManager.SetShutdownManager(shutdownManager)

	// Register health checks
	setupHealthChecks(healthManager, ps, node, dhtNode)

	// Register components for graceful shutdown
	setupGracefulShutdown(shutdownManager, healthManager, node, ps, mdnsDiscovery,
		electionManager, httpServer, ucxiServer, taskCoordinator, dhtNode)

	// Start health monitoring
	if err := healthManager.Start(); err != nil {
		log.Printf("❌ Failed to start health manager: %v", err)
	} else {
		fmt.Printf("❤️ Health monitoring started\n")
	}

	// Start health HTTP server on port 8081
	if err := healthManager.StartHTTPServer(8081); err != nil {
		log.Printf("❌ Failed to start health HTTP server: %v", err)
	} else {
		fmt.Printf("🏥 Health endpoints available at http://localhost:8081/health\n")
	}

	// Start shutdown manager (begins listening for signals)
	shutdownManager.Start()
	fmt.Printf("🛡️ Graceful shutdown manager started\n")

	fmt.Printf("✅ Bzzz system fully operational with health monitoring\n")

	// Wait for graceful shutdown
	shutdownManager.Wait()
	fmt.Println("✅ Bzzz system shutdown completed")
}
|
|
|
|
// setupHealthChecks configures comprehensive health monitoring.
//
// It registers, in order: a critical P2P peer-count check, active PubSub/DHT
// probes, legacy always-healthy static checks kept for backward compatibility,
// and memory/disk threshold checks. dhtNode may be nil, in which case the
// DHT-specific checks are skipped.
func setupHealthChecks(healthManager *health.Manager, ps *pubsub.PubSub, node *p2p.Node, dhtNode *dht.LibP2PDHT) {
	// P2P connectivity check (critical): unhealthy when fewer than minPeers
	// peers are connected.
	p2pCheck := &health.HealthCheck{
		Name:        "p2p-connectivity",
		Description: "P2P network connectivity and peer count",
		Enabled:     true,
		Critical:    true,
		Interval:    15 * time.Second,
		Timeout:     10 * time.Second,
		Checker: func(ctx context.Context) health.CheckResult {
			connectedPeers := node.ConnectedPeers()
			minPeers := 1

			if connectedPeers < minPeers {
				return health.CheckResult{
					Healthy: false,
					Message: fmt.Sprintf("Insufficient P2P peers: %d < %d", connectedPeers, minPeers),
					Details: map[string]interface{}{
						"connected_peers": connectedPeers,
						"min_peers":       minPeers,
						"node_id":         node.ID().ShortString(),
					},
					Timestamp: time.Now(),
				}
			}

			return health.CheckResult{
				Healthy: true,
				Message: fmt.Sprintf("P2P connectivity OK: %d peers connected", connectedPeers),
				Details: map[string]interface{}{
					"connected_peers": connectedPeers,
					"min_peers":       minPeers,
					"node_id":         node.ID().ShortString(),
				},
				Timestamp: time.Now(),
			}
		},
	}
	healthManager.RegisterCheck(p2pCheck)

	// Active PubSub health probe
	pubsubAdapter := health.NewPubSubAdapter(ps)
	activePubSubCheck := health.CreateActivePubSubCheck(pubsubAdapter)
	healthManager.RegisterCheck(activePubSubCheck)
	fmt.Printf("✅ Active PubSub health probe registered\n")

	// Active DHT health probe (if DHT is enabled)
	if dhtNode != nil {
		dhtAdapter := health.NewDHTAdapter(dhtNode)
		activeDHTCheck := health.CreateActiveDHTCheck(dhtAdapter)
		healthManager.RegisterCheck(activeDHTCheck)
		fmt.Printf("✅ Active DHT health probe registered\n")
	}

	// Legacy static health checks for backward compatibility.
	// NOTE(review): these always report healthy; they exist only so older
	// dashboards keep seeing the expected check names — confirm before removal.

	// PubSub system check (static)
	pubsubCheck := &health.HealthCheck{
		Name:        "pubsub-system-static",
		Description: "Static PubSub messaging system health",
		Enabled:     true,
		Critical:    false,
		Interval:    30 * time.Second,
		Timeout:     5 * time.Second,
		Checker: func(ctx context.Context) health.CheckResult {
			// Simple health check - basic connectivity
			return health.CheckResult{
				Healthy:   true,
				Message:   "PubSub system operational (static check)",
				Timestamp: time.Now(),
			}
		},
	}
	healthManager.RegisterCheck(pubsubCheck)

	// DHT system check (static, if DHT is enabled)
	if dhtNode != nil {
		dhtCheck := &health.HealthCheck{
			Name:        "dht-system-static",
			Description: "Static Distributed Hash Table system health",
			Enabled:     true,
			Critical:    false,
			Interval:    60 * time.Second,
			Timeout:     15 * time.Second,
			Checker: func(ctx context.Context) health.CheckResult {
				// Basic connectivity check
				return health.CheckResult{
					Healthy: true,
					Message: "DHT system operational (static check)",
					Details: map[string]interface{}{
						"dht_enabled": true,
					},
					Timestamp: time.Now(),
				}
			},
		}
		healthManager.RegisterCheck(dhtCheck)
	}

	// Memory usage check
	memoryCheck := health.CreateMemoryCheck(0.85) // Alert if > 85%
	healthManager.RegisterCheck(memoryCheck)

	// Disk space check
	diskCheck := health.CreateDiskSpaceCheck("/tmp", 0.90) // Alert if > 90%
	healthManager.RegisterCheck(diskCheck)
}
|
|
|
|
// setupGracefulShutdown registers all components for proper shutdown.
//
// Components are registered with ascending priority numbers; lower numbers
// shut down first (health checks stop first, the P2P node closes last since
// every other component depends on it). mdnsDiscovery, electionManager and
// taskCoordinator are accepted as interface{} — NOTE(review): their shutdown
// closures below are stubs, so the real close still happens via the defers
// in main(); confirm whether typed parameters and real closers are intended.
func setupGracefulShutdown(shutdownManager *shutdown.Manager, healthManager *health.Manager,
	node *p2p.Node, ps *pubsub.PubSub, mdnsDiscovery interface{}, electionManager interface{},
	httpServer *api.HTTPServer, ucxiServer *ucxi.Server, taskCoordinator interface{}, dhtNode *dht.LibP2PDHT) {

	// Health manager (stop health checks early)
	healthComponent := shutdown.NewGenericComponent("health-manager", 10, true).
		SetShutdownFunc(func(ctx context.Context) error {
			return healthManager.Stop()
		})
	shutdownManager.Register(healthComponent)

	// HTTP servers
	if httpServer != nil {
		httpComponent := shutdown.NewGenericComponent("main-http-server", 20, true).
			SetShutdownFunc(func(ctx context.Context) error {
				return httpServer.Stop()
			})
		shutdownManager.Register(httpComponent)
	}

	if ucxiServer != nil {
		ucxiComponent := shutdown.NewGenericComponent("ucxi-server", 21, true).
			SetShutdownFunc(func(ctx context.Context) error {
				ucxiServer.Stop()
				return nil
			})
		shutdownManager.Register(ucxiComponent)
	}

	// Task coordination system
	if taskCoordinator != nil {
		taskComponent := shutdown.NewGenericComponent("task-coordinator", 30, true).
			SetCloser(func() error {
				// In real implementation, gracefully stop task coordinator
				return nil
			})
		shutdownManager.Register(taskComponent)
	}

	// DHT system
	if dhtNode != nil {
		dhtComponent := shutdown.NewGenericComponent("dht-node", 35, true).
			SetCloser(func() error {
				return dhtNode.Close()
			})
		shutdownManager.Register(dhtComponent)
	}

	// PubSub system
	if ps != nil {
		pubsubComponent := shutdown.NewGenericComponent("pubsub-system", 40, true).
			SetCloser(func() error {
				return ps.Close()
			})
		shutdownManager.Register(pubsubComponent)
	}

	// mDNS discovery
	if mdnsDiscovery != nil {
		mdnsComponent := shutdown.NewGenericComponent("mdns-discovery", 50, true).
			SetCloser(func() error {
				// In real implementation, close mDNS discovery properly
				return nil
			})
		shutdownManager.Register(mdnsComponent)
	}

	// P2P node (close last as other components depend on it)
	p2pComponent := shutdown.NewP2PNodeComponent("p2p-node", func() error {
		return node.Close()
	}, 60)
	shutdownManager.Register(p2pComponent)

	// Add shutdown hooks
	setupShutdownHooks(shutdownManager)
}
|
|
|
|
// setupShutdownHooks adds hooks for different shutdown phases
|
|
func setupShutdownHooks(shutdownManager *shutdown.Manager) {
|
|
// Pre-shutdown: Save state and notify peers
|
|
shutdownManager.AddHook(shutdown.PhasePreShutdown, func(ctx context.Context) error {
|
|
fmt.Println("🔄 Pre-shutdown: Notifying peers and saving state...")
|
|
// In real implementation: notify peers, save critical state
|
|
return nil
|
|
})
|
|
|
|
// Post-shutdown: Final cleanup
|
|
shutdownManager.AddHook(shutdown.PhasePostShutdown, func(ctx context.Context) error {
|
|
fmt.Println("🔄 Post-shutdown: Performing final cleanup...")
|
|
// In real implementation: flush logs, clean temporary files
|
|
return nil
|
|
})
|
|
|
|
// Cleanup: Final state persistence
|
|
shutdownManager.AddHook(shutdown.PhaseCleanup, func(ctx context.Context) error {
|
|
fmt.Println("🔄 Cleanup: Finalizing shutdown...")
|
|
// In real implementation: persist final state, cleanup resources
|
|
return nil
|
|
})
|
|
}
|
|
|
|
// simpleLogger implements basic logging for shutdown and health systems.
// All levels share one formatting path and write to stdout.
type simpleLogger struct{}

// logf prints a message prefixed with the given level tag.
func (l *simpleLogger) logf(level, msg string, args ...interface{}) {
	fmt.Printf("["+level+"] "+msg+"\n", args...)
}

// Info logs an informational message.
func (l *simpleLogger) Info(msg string, args ...interface{}) {
	l.logf("INFO", msg, args...)
}

// Warn logs a warning message.
func (l *simpleLogger) Warn(msg string, args ...interface{}) {
	l.logf("WARN", msg, args...)
}

// Error logs an error message.
func (l *simpleLogger) Error(msg string, args ...interface{}) {
	l.logf("ERROR", msg, args...)
}
|
|
|
|
// announceAvailability broadcasts current working status for task assignment
|
|
func announceAvailability(ps *pubsub.PubSub, nodeID string, taskTracker *SimpleTaskTracker) {
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for ; ; <-ticker.C {
|
|
currentTasks := taskTracker.GetActiveTasks()
|
|
maxTasks := taskTracker.GetMaxTasks()
|
|
isAvailable := len(currentTasks) < maxTasks
|
|
|
|
status := "ready"
|
|
if len(currentTasks) >= maxTasks {
|
|
status = "busy"
|
|
} else if len(currentTasks) > 0 {
|
|
status = "working"
|
|
}
|
|
|
|
availability := map[string]interface{}{
|
|
"node_id": nodeID,
|
|
"available_for_work": isAvailable,
|
|
"current_tasks": len(currentTasks),
|
|
"max_tasks": maxTasks,
|
|
"last_activity": time.Now().Unix(),
|
|
"status": status,
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
if err := ps.PublishBzzzMessage(pubsub.AvailabilityBcast, availability); err != nil {
|
|
fmt.Printf("❌ Failed to announce availability: %v\n", err)
|
|
}
|
|
}
|
|
}
|
|
|
|
// detectAvailableOllamaModels queries the local Ollama API for available models.
//
// It calls GET http://localhost:11434/api/tags and returns the reported model
// names. An error is returned when the API is unreachable, responds with a
// non-200 status, or returns malformed JSON.
//
// A bounded request timeout is used so that a hung Ollama daemon cannot
// block the caller indefinitely (the previous http.Get had no deadline).
func detectAvailableOllamaModels() ([]string, error) {
	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Get("http://localhost:11434/api/tags")
	if err != nil {
		return nil, fmt.Errorf("failed to connect to Ollama API: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("Ollama API returned status %d", resp.StatusCode)
	}

	// Only the model names are needed from the tags response.
	var tagsResponse struct {
		Models []struct {
			Name string `json:"name"`
		} `json:"models"`
	}

	if err := json.NewDecoder(resp.Body).Decode(&tagsResponse); err != nil {
		return nil, fmt.Errorf("failed to decode Ollama response: %w", err)
	}

	models := make([]string, 0, len(tagsResponse.Models))
	for _, model := range tagsResponse.Models {
		models = append(models, model.Name)
	}

	return models, nil
}
|
|
|
|
// selectBestModel calls the model selection webhook to choose the best model
// for a prompt.
//
// Behavior is deliberately fail-soft: on any webhook problem (empty URL,
// request failure, non-200 status, undecodable reply, or a reply naming a
// model not in availableModels) it falls back to the first available model
// rather than returning an error. An error is returned only when no models
// are available at all.
//
// The webhook call uses a bounded timeout so a hung endpoint cannot stall
// the caller indefinitely (the previous http.Post had no deadline).
func selectBestModel(webhookURL string, availableModels []string, prompt string) (string, error) {
	if webhookURL == "" || len(availableModels) == 0 {
		// Fallback to first available model
		if len(availableModels) > 0 {
			return availableModels[0], nil
		}
		return "", fmt.Errorf("no models available")
	}

	requestPayload := map[string]interface{}{
		"models": availableModels,
		"prompt": prompt,
	}

	payloadBytes, err := json.Marshal(requestPayload)
	if err != nil {
		// Fallback on error
		return availableModels[0], nil
	}

	client := &http.Client{Timeout: 10 * time.Second}
	resp, err := client.Post(webhookURL, "application/json", bytes.NewBuffer(payloadBytes))
	if err != nil {
		// Fallback on error
		return availableModels[0], nil
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		// Fallback on error
		return availableModels[0], nil
	}

	var response struct {
		Model string `json:"model"`
	}

	if err := json.NewDecoder(resp.Body).Decode(&response); err != nil {
		// Fallback on error
		return availableModels[0], nil
	}

	// Validate that the returned model is in our available list
	for _, model := range availableModels {
		if model == response.Model {
			return response.Model, nil
		}
	}

	// Fallback if webhook returned invalid model
	return availableModels[0], nil
}
|
|
|
|
// announceCapabilitiesOnChange broadcasts capabilities only when they change.
//
// It first reconciles the configured model list against what the local
// Ollama daemon actually serves (mutating cfg.Agent.Models in place and
// configuring the reasoning module), then compares the resulting capability
// snapshot with the one persisted on disk and publishes a capability
// broadcast only on a difference (or first run). The new snapshot is
// persisted only after a successful broadcast, so a failed publish will be
// retried on the next run.
func announceCapabilitiesOnChange(ps *pubsub.PubSub, nodeID string, cfg *config.Config) {
	// Detect available Ollama models and update config
	availableModels, err := detectAvailableOllamaModels()
	if err != nil {
		// Ollama unreachable: keep the configured list unverified.
		fmt.Printf("⚠️ Failed to detect Ollama models: %v\n", err)
		fmt.Printf("🔄 Using configured models: %v\n", cfg.Agent.Models)
	} else {
		// Filter configured models to only include available ones
		validModels := make([]string, 0)
		for _, configModel := range cfg.Agent.Models {
			for _, availableModel := range availableModels {
				if configModel == availableModel {
					validModels = append(validModels, configModel)
					break
				}
			}
		}

		// None of the configured models exist locally: fall back to the
		// first model Ollama reports (if any).
		if len(validModels) == 0 {
			fmt.Printf("⚠️ No configured models available in Ollama, using first available: %v\n", availableModels)
			if len(availableModels) > 0 {
				validModels = []string{availableModels[0]}
			}
		} else {
			fmt.Printf("✅ Available models: %v\n", validModels)
		}

		// Update config with available models
		cfg.Agent.Models = validModels

		// Configure reasoning module with available models and webhook
		reasoning.SetModelConfig(validModels, cfg.Agent.ModelSelectionWebhook, cfg.Agent.DefaultReasoningModel)
	}

	// Get current capabilities
	currentCaps := map[string]interface{}{
		"node_id":        nodeID,
		"capabilities":   cfg.Agent.Capabilities,
		"models":         cfg.Agent.Models,
		"version":        "0.2.0",
		"specialization": cfg.Agent.Specialization,
	}

	// Load stored capabilities from file; any read error is treated as
	// "no previous snapshot" (first run).
	storedCaps, err := loadStoredCapabilities(nodeID)
	if err != nil {
		fmt.Printf("📄 No stored capabilities found, treating as first run\n")
		storedCaps = nil
	}

	// Check if capabilities have changed
	if capabilitiesChanged(currentCaps, storedCaps) {
		fmt.Printf("🔄 Capabilities changed, broadcasting update\n")

		currentCaps["timestamp"] = time.Now().Unix()
		currentCaps["reason"] = getChangeReason(currentCaps, storedCaps)

		// Broadcast the change
		if err := ps.PublishBzzzMessage(pubsub.CapabilityBcast, currentCaps); err != nil {
			fmt.Printf("❌ Failed to announce capabilities: %v\n", err)
		} else {
			// Store new capabilities (only after a successful broadcast)
			if err := storeCapabilities(nodeID, currentCaps); err != nil {
				fmt.Printf("❌ Failed to store capabilities: %v\n", err)
			}
		}
	} else {
		fmt.Printf("✅ Capabilities unchanged since last run\n")
	}
}
|
|
|
|
// statusReporter provides periodic status updates
|
|
func statusReporter(node *p2p.Node) {
|
|
ticker := time.NewTicker(30 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for ; ; <-ticker.C {
|
|
peers := node.ConnectedPeers()
|
|
fmt.Printf("📊 Status: %d connected peers\n", peers)
|
|
}
|
|
}
|
|
|
|
// getCapabilitiesFile returns the path to store capabilities for a node
|
|
// getCapabilitiesFile returns the path used to persist capabilities for a
// node: ~/.config/bzzz/capabilities-<nodeID>.json.
//
// NOTE(review): the error from os.UserHomeDir is deliberately ignored; if it
// fails, the returned path is relative to the working directory — confirm
// that is acceptable for this deployment.
func getCapabilitiesFile(nodeID string) string {
	home, _ := os.UserHomeDir()
	fileName := fmt.Sprintf("capabilities-%s.json", nodeID)
	return filepath.Join(home, ".config", "bzzz", fileName)
}
|
|
|
|
// loadStoredCapabilities loads previously stored capabilities from disk
|
|
func loadStoredCapabilities(nodeID string) (map[string]interface{}, error) {
|
|
capFile := getCapabilitiesFile(nodeID)
|
|
|
|
data, err := os.ReadFile(capFile)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
var capabilities map[string]interface{}
|
|
if err := json.Unmarshal(data, &capabilities); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
return capabilities, nil
|
|
}
|
|
|
|
// storeCapabilities saves current capabilities to disk
|
|
func storeCapabilities(nodeID string, capabilities map[string]interface{}) error {
|
|
capFile := getCapabilitiesFile(nodeID)
|
|
|
|
// Ensure directory exists
|
|
if err := os.MkdirAll(filepath.Dir(capFile), 0755); err != nil {
|
|
return err
|
|
}
|
|
|
|
data, err := json.MarshalIndent(capabilities, "", " ")
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
return os.WriteFile(capFile, data, 0644)
|
|
}
|
|
|
|
// capabilitiesChanged compares current and stored capabilities
|
|
// capabilitiesChanged reports whether the node's advertised capabilities
// differ from the last persisted snapshot.
//
// A nil stored map (no snapshot on disk) always counts as changed so the
// first run announces. Only the "capabilities", "models" and
// "specialization" fields are compared; everything else (timestamps etc.)
// is ignored.
func capabilitiesChanged(current, stored map[string]interface{}) bool {
	if stored == nil {
		return true // First run, always announce
	}

	for _, key := range []string{"capabilities", "models", "specialization"} {
		if !reflect.DeepEqual(current[key], stored[key]) {
			return true
		}
	}
	return false
}
|
|
|
|
// getChangeReason determines why capabilities changed
|
|
// getChangeReason classifies why capabilities changed, checked in priority
// order: models first, then capabilities, then specialization. Returns
// "startup" when there was no stored snapshot and "unknown_change" when none
// of the tracked fields differ.
func getChangeReason(current, stored map[string]interface{}) string {
	if stored == nil {
		return "startup"
	}

	switch {
	case !reflect.DeepEqual(current["models"], stored["models"]):
		return "model_change"
	case !reflect.DeepEqual(current["capabilities"], stored["capabilities"]):
		return "capability_change"
	case !reflect.DeepEqual(current["specialization"], stored["specialization"]):
		return "specialization_change"
	default:
		return "unknown_change"
	}
}
|
|
|
|
// getDefaultRoleForSpecialization maps specializations to default roles
|
|
// getDefaultRoleForSpecialization maps a specialization string to its
// default agent role. Unknown specializations fall back to
// "full_stack_engineer". The mapping is identical to the previous
// map-literal implementation, expressed as a switch.
func getDefaultRoleForSpecialization(specialization string) string {
	switch specialization {
	case "code_generation", "backend":
		return "backend_developer"
	case "advanced_reasoning", "architecture":
		return "senior_software_architect"
	case "code_analysis", "security":
		return "security_expert"
	case "general_developer":
		return "full_stack_engineer"
	case "debugging":
		return "qa_engineer"
	case "frontend":
		return "frontend_developer"
	case "devops":
		return "devops_engineer"
	case "design":
		return "ui_ux_designer"
	default:
		// Default fallback for unrecognized specializations.
		return "full_stack_engineer"
	}
}
|
|
|
|
// announceRoleOnStartup announces the agent's role when starting up
|
|
func announceRoleOnStartup(ps *pubsub.PubSub, nodeID string, cfg *config.Config) {
|
|
if cfg.Agent.Role == "" {
|
|
return // No role to announce
|
|
}
|
|
|
|
roleData := map[string]interface{}{
|
|
"node_id": nodeID,
|
|
"role": cfg.Agent.Role,
|
|
"expertise": cfg.Agent.Expertise,
|
|
"reports_to": cfg.Agent.ReportsTo,
|
|
"deliverables": cfg.Agent.Deliverables,
|
|
"capabilities": cfg.Agent.Capabilities,
|
|
"specialization": cfg.Agent.Specialization,
|
|
"timestamp": time.Now().Unix(),
|
|
"status": "online",
|
|
}
|
|
|
|
opts := pubsub.MessageOptions{
|
|
FromRole: cfg.Agent.Role,
|
|
RequiredExpertise: cfg.Agent.Expertise,
|
|
Priority: "medium",
|
|
}
|
|
|
|
if err := ps.PublishRoleBasedMessage(pubsub.RoleAnnouncement, roleData, opts); err != nil {
|
|
fmt.Printf("❌ Failed to announce role: %v\n", err)
|
|
} else {
|
|
fmt.Printf("📢 Role announced: %s\n", cfg.Agent.Role)
|
|
}
|
|
}
|
|
|
|
// testEndToEndDecisionFlow tests the complete encrypted decision publishing and retrieval flow
|
|
// testEndToEndDecisionFlow tests the complete encrypted decision publishing and retrieval flow.
//
// It runs five sequential checks against the live publisher/storage pair:
//  1. publish an architectural decision
//  2. publish a code decision with attached test results
//  3. query recent decisions (after a 1s propagation pause)
//  4. retrieve and decrypt the first returned decision's content
//  5. publish a system status snapshot with metrics and health checks
//
// Failures in steps 1-3 short-circuit the remaining steps; all outcomes are
// reported to stdout only — the function never returns an error.
func testEndToEndDecisionFlow(publisher *ucxl.DecisionPublisher, storage *dht.EncryptedDHTStorage) {
	// Both components are required; nil means the caller started without them.
	if publisher == nil || storage == nil {
		fmt.Printf("⚪ Skipping end-to-end test (components not initialized)\n")
		return
	}

	fmt.Printf("🧪 Testing end-to-end encrypted decision flow...\n")

	// Test 1: Publish an architectural decision
	// NOTE(review): argument roles (task, decision, rationale, alternatives,
	// benefits, next steps) are inferred from the literal values — confirm
	// against PublishArchitecturalDecision's signature.
	err := publisher.PublishArchitecturalDecision(
		"implement_unified_bzzz_slurp",
		"Integrate SLURP as specialized BZZZ agent with admin role for unified P2P architecture",
		"Eliminates separate system complexity and leverages existing P2P infrastructure",
		[]string{"Keep separate systems", "Use different consensus algorithm"},
		[]string{"Single point of coordination", "Improved failover", "Simplified deployment"},
		[]string{"Test consensus elections", "Implement key reconstruction", "Deploy to cluster"},
	)
	if err != nil {
		fmt.Printf("❌ Failed to publish architectural decision: %v\n", err)
		return
	}
	fmt.Printf("✅ Published architectural decision\n")

	// Test 2: Publish a code decision
	// Fixture test results, including two named failing tests, so the
	// published decision carries a realistic payload.
	testResults := &ucxl.TestResults{
		Passed:      15,
		Failed:      2,
		Skipped:     1,
		Coverage:    78.5,
		FailedTests: []string{"TestElection_SplitBrain", "TestCrypto_KeyReconstruction"},
	}

	err = publisher.PublishCodeDecision(
		"implement_age_encryption",
		"Implemented Age encryption for role-based UCXL content security",
		[]string{"pkg/crypto/age_crypto.go", "pkg/dht/encrypted_storage.go"},
		578,
		testResults,
		[]string{"filippo.io/age", "github.com/libp2p/go-libp2p-kad-dht"},
	)
	if err != nil {
		fmt.Printf("❌ Failed to publish code decision: %v\n", err)
		return
	}
	fmt.Printf("✅ Published code decision\n")

	// Test 3: Query recent decisions
	time.Sleep(1 * time.Second) // Allow decisions to propagate

	// Empty filter strings + 1h window: fetch up to 10 of anything recent.
	decisions, err := publisher.QueryRecentDecisions("", "", "", 10, time.Now().Add(-1*time.Hour))
	if err != nil {
		fmt.Printf("❌ Failed to query recent decisions: %v\n", err)
		return
	}

	fmt.Printf("🔍 Found %d recent decisions:\n", len(decisions))
	for i, metadata := range decisions {
		fmt.Printf("  %d. %s (creator: %s, type: %s)\n",
			i+1, metadata.Address, metadata.CreatorRole, metadata.ContentType)
	}

	// Test 4: Retrieve and decrypt a specific decision
	// Unlike the earlier steps, a failure here does not abort the test; the
	// status publish below still runs.
	if len(decisions) > 0 {
		decision, err := publisher.GetDecisionContent(decisions[0].Address)
		if err != nil {
			fmt.Printf("❌ Failed to retrieve decision content: %v\n", err)
		} else {
			fmt.Printf("✅ Retrieved decision: %s (%s)\n", decision.Task, decision.Decision)
			fmt.Printf("   Files modified: %d, Success: %t\n", len(decision.FilesModified), decision.Success)
		}
	}

	// Test 5: Publish system status
	// Fixture metrics; only dht_entries reflects real state (the query above).
	metrics := map[string]interface{}{
		"uptime_seconds":  300,
		"active_peers":    3,
		"dht_entries":     len(decisions),
		"encryption_ops":  25,
		"decryption_ops":  8,
		"memory_usage_mb": 145.7,
	}

	healthChecks := map[string]bool{
		"dht_connected":     true,
		"elections_ready":   true,
		"crypto_functional": true,
		"peers_discovered":  true,
	}

	err = publisher.PublishSystemStatus("All systems operational - Phase 2B implementation complete", metrics, healthChecks)
	if err != nil {
		fmt.Printf("❌ Failed to publish system status: %v\n", err)
	} else {
		fmt.Printf("✅ Published system status\n")
	}

	fmt.Printf("🎉 End-to-end encrypted decision flow test completed successfully!\n")
	fmt.Printf("🔐 All decisions encrypted with role-based Age encryption\n")
	fmt.Printf("🕸️ Content stored in distributed DHT with local caching\n")
	fmt.Printf("🔍 Content discoverable and retrievable by authorized roles\n")
}
|
|
|
|
// startSetupMode starts the web-based configuration interface
|
|
func startSetupMode(configPath string) {
|
|
fmt.Println("🌐 Starting web-based setup interface...")
|
|
fmt.Printf("📂 Configuration will be saved to: %s\n", configPath)
|
|
|
|
// Create setup-only HTTP server
|
|
setupServer := &http.Server{
|
|
Addr: ":8090",
|
|
ReadTimeout: 15 * time.Second,
|
|
WriteTimeout: 15 * time.Second,
|
|
IdleTimeout: 60 * time.Second,
|
|
}
|
|
|
|
// Setup routes
|
|
http.HandleFunc("/", handleSetupUI)
|
|
http.HandleFunc("/setup/", handleSetupUI)
|
|
|
|
// API routes for setup
|
|
setupManager := api.NewSetupManager(configPath)
|
|
http.HandleFunc("/api/setup/required", corsHandler(handleSetupRequired(setupManager)))
|
|
http.HandleFunc("/api/setup/system", corsHandler(handleSystemDetection(setupManager)))
|
|
http.HandleFunc("/api/setup/repository/validate", corsHandler(handleRepositoryValidation(setupManager)))
|
|
http.HandleFunc("/api/setup/repository/providers", corsHandler(handleRepositoryProviders(setupManager)))
|
|
http.HandleFunc("/api/setup/license/validate", corsHandler(handleLicenseValidation(setupManager)))
|
|
http.HandleFunc("/api/setup/validate", corsHandler(handleConfigValidation(setupManager)))
|
|
http.HandleFunc("/api/setup/save", corsHandler(handleConfigSave(setupManager)))
|
|
http.HandleFunc("/api/setup/discover-machines", corsHandler(handleDiscoverMachines(setupManager)))
|
|
http.HandleFunc("/api/setup/test-ssh", corsHandler(handleTestSSH(setupManager)))
|
|
http.HandleFunc("/api/setup/deploy-service", corsHandler(handleDeployService(setupManager)))
|
|
http.HandleFunc("/api/health", corsHandler(handleSetupHealth))
|
|
|
|
fmt.Printf("🎯 Setup interface available at: http://localhost:8090\n")
|
|
fmt.Printf("🔧 Complete the setup to start BZZZ in full mode\n")
|
|
fmt.Printf("📄 Press Ctrl+C to exit setup\n")
|
|
|
|
if err := setupServer.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
log.Fatalf("Setup server failed: %v", err)
|
|
}
|
|
}
|
|
|
|
// handleSetupUI serves the embedded web UI or fallback page
|
|
func handleSetupUI(w http.ResponseWriter, r *http.Request) {
|
|
// Enable CORS
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
|
|
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
|
|
|
|
if r.Method == "OPTIONS" {
|
|
w.WriteHeader(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
// Determine file path
|
|
path := r.URL.Path
|
|
if path == "/" || path == "/setup" || path == "/setup/" {
|
|
path = "/index.html"
|
|
}
|
|
|
|
// Remove /setup prefix if present
|
|
if len(path) > 6 && path[:6] == "/setup" {
|
|
path = path[6:]
|
|
}
|
|
|
|
// Try to serve from embedded file system
|
|
if web.IsEmbeddedFileAvailable(path) {
|
|
web.ServeEmbeddedFile(w, r, path)
|
|
} else if web.IsEmbeddedFileAvailable("index.html") {
|
|
// Fallback to index.html for SPA routing
|
|
web.ServeEmbeddedFile(w, r, "index.html")
|
|
} else {
|
|
// Serve placeholder page if web UI not built
|
|
web.ServeEmbeddedFile(w, r, "index.html")
|
|
}
|
|
}
|
|
|
|
// corsHandler wraps handlers with CORS support
|
|
func corsHandler(handler http.HandlerFunc) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Access-Control-Allow-Origin", "*")
|
|
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
|
|
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
|
|
|
|
if r.Method == "OPTIONS" {
|
|
w.WriteHeader(http.StatusOK)
|
|
return
|
|
}
|
|
|
|
handler(w, r)
|
|
}
|
|
}
|
|
|
|
// Setup API handlers (simplified versions)
|
|
func handleSetupRequired(sm *api.SetupManager) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
response := map[string]interface{}{
|
|
"setup_required": sm.IsSetupRequired(),
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
}
|
|
|
|
func handleSystemDetection(sm *api.SetupManager) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
systemInfo, err := sm.DetectSystemInfo()
|
|
if err != nil {
|
|
http.Error(w, fmt.Sprintf("System detection failed: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
response := map[string]interface{}{
|
|
"system_info": systemInfo,
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
}
|
|
|
|
func handleRepositoryValidation(sm *api.SetupManager) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
|
if r.Method != "POST" {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
var repoConfig api.RepositoryConfig
|
|
if err := json.NewDecoder(r.Body).Decode(&repoConfig); err != nil {
|
|
http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
if err := sm.ValidateRepositoryConfig(&repoConfig); err != nil {
|
|
response := map[string]interface{}{
|
|
"valid": false,
|
|
"error": err.Error(),
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
json.NewEncoder(w).Encode(response)
|
|
return
|
|
}
|
|
|
|
response := map[string]interface{}{
|
|
"valid": true,
|
|
"message": "Repository configuration is valid",
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
}
|
|
|
|
func handleRepositoryProviders(sm *api.SetupManager) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
providers := sm.GetSupportedProviders()
|
|
response := map[string]interface{}{
|
|
"providers": providers,
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
}
|
|
|
|
func handleConfigValidation(sm *api.SetupManager) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
|
if r.Method != "POST" {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
var setupConfig api.SetupConfig
|
|
if err := json.NewDecoder(r.Body).Decode(&setupConfig); err != nil {
|
|
http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
errors := []string{}
|
|
if setupConfig.AgentID == "" {
|
|
errors = append(errors, "agent_id is required")
|
|
}
|
|
if len(setupConfig.Capabilities) == 0 {
|
|
errors = append(errors, "at least one capability is required")
|
|
}
|
|
if len(setupConfig.Models) == 0 {
|
|
errors = append(errors, "at least one model is required")
|
|
}
|
|
|
|
if setupConfig.Repository != nil {
|
|
if err := sm.ValidateRepositoryConfig(setupConfig.Repository); err != nil {
|
|
errors = append(errors, fmt.Sprintf("repository configuration: %v", err))
|
|
}
|
|
}
|
|
|
|
if len(errors) > 0 {
|
|
response := map[string]interface{}{
|
|
"valid": false,
|
|
"errors": errors,
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
json.NewEncoder(w).Encode(response)
|
|
return
|
|
}
|
|
|
|
response := map[string]interface{}{
|
|
"valid": true,
|
|
"message": "Configuration is valid",
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
}
|
|
|
|
func handleConfigSave(sm *api.SetupManager) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
|
if r.Method != "POST" {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
var setupConfig api.SetupConfig
|
|
if err := json.NewDecoder(r.Body).Decode(&setupConfig); err != nil {
|
|
http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
if setupConfig.AgentID == "" {
|
|
http.Error(w, "agent_id is required", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
if setupConfig.Repository != nil {
|
|
if err := sm.ValidateRepositoryConfig(setupConfig.Repository); err != nil {
|
|
http.Error(w, fmt.Sprintf("Invalid repository configuration: %v", err), http.StatusBadRequest)
|
|
return
|
|
}
|
|
}
|
|
|
|
if err := sm.SaveConfiguration(&setupConfig); err != nil {
|
|
http.Error(w, fmt.Sprintf("Failed to save configuration: %v", err), http.StatusInternalServerError)
|
|
return
|
|
}
|
|
|
|
response := map[string]interface{}{
|
|
"success": true,
|
|
"message": "Configuration saved successfully",
|
|
"timestamp": time.Now().Unix(),
|
|
"restart_required": true,
|
|
}
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
}
|
|
|
|
func handleLicenseValidation(sm *api.SetupManager) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
|
if r.Method != "POST" {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
var licenseRequest struct {
|
|
Email string `json:"email"`
|
|
LicenseKey string `json:"licenseKey"`
|
|
OrganizationName string `json:"organizationName,omitempty"`
|
|
}
|
|
|
|
if err := json.NewDecoder(r.Body).Decode(&licenseRequest); err != nil {
|
|
http.Error(w, "Invalid JSON payload", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Validate input
|
|
if licenseRequest.Email == "" {
|
|
response := map[string]interface{}{
|
|
"valid": false,
|
|
"message": "Email is required",
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
json.NewEncoder(w).Encode(response)
|
|
return
|
|
}
|
|
|
|
if licenseRequest.LicenseKey == "" {
|
|
response := map[string]interface{}{
|
|
"valid": false,
|
|
"message": "License key is required",
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
w.WriteHeader(http.StatusBadRequest)
|
|
json.NewEncoder(w).Encode(response)
|
|
return
|
|
}
|
|
|
|
// Mock license validation logic
|
|
// In production, this would call a license server API
|
|
validLicenseKey := "BZZZ-2025-DEMO-EVAL-001"
|
|
|
|
if licenseRequest.LicenseKey != validLicenseKey {
|
|
response := map[string]interface{}{
|
|
"valid": false,
|
|
"message": "Invalid license key. Please check your license key and try again.",
|
|
"timestamp": time.Now().Unix(),
|
|
}
|
|
w.WriteHeader(http.StatusUnauthorized)
|
|
json.NewEncoder(w).Encode(response)
|
|
return
|
|
}
|
|
|
|
// License is valid - return success with details
|
|
response := map[string]interface{}{
|
|
"valid": true,
|
|
"message": "License validated successfully",
|
|
"timestamp": time.Now().Unix(),
|
|
"details": map[string]interface{}{
|
|
"licenseType": "Evaluation",
|
|
"maxNodes": "5",
|
|
"expiresAt": "2025-12-31",
|
|
"features": []string{
|
|
"Distributed Task Coordination",
|
|
"AI Model Integration",
|
|
"Repository Management",
|
|
"Cluster Formation",
|
|
"Security Features",
|
|
},
|
|
},
|
|
}
|
|
|
|
json.NewEncoder(w).Encode(response)
|
|
}
|
|
}
|
|
|
|
func handleSetupHealth(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
health := map[string]interface{}{
|
|
"status": "setup_mode",
|
|
"timestamp": time.Now().Unix(),
|
|
"web_ui": web.IsEmbeddedFileAvailable("index.html"),
|
|
}
|
|
json.NewEncoder(w).Encode(health)
|
|
}
|
|
|
|
// handleDiscoverMachines discovers machines on the network subnet
|
|
func handleDiscoverMachines(sm *api.SetupManager) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != "POST" {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
var request struct {
|
|
Subnet string `json:"subnet"`
|
|
SSHKey string `json:"sshKey"`
|
|
}
|
|
|
|
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
|
|
http.Error(w, "Invalid request body", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
machines, err := sm.DiscoverNetworkMachines(request.Subnet, request.SSHKey)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": err.Error(),
|
|
"machines": []interface{}{},
|
|
})
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": true,
|
|
"machines": machines,
|
|
})
|
|
}
|
|
}
|
|
|
|
// handleTestSSH tests SSH connection to a machine
|
|
func handleTestSSH(sm *api.SetupManager) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != "POST" {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
var request struct {
|
|
IP string `json:"ip"`
|
|
SSHKey string `json:"sshKey"`
|
|
SSHUsername string `json:"sshUsername"`
|
|
SSHPassword string `json:"sshPassword"`
|
|
SSHPort int `json:"sshPort"`
|
|
}
|
|
|
|
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
|
|
http.Error(w, "Invalid request body", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
result, err := sm.TestSSHConnection(request.IP, request.SSHKey, request.SSHUsername, request.SSHPassword, request.SSHPort)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(result)
|
|
}
|
|
}
|
|
|
|
// handleDeployService deploys BZZZ service to a machine
|
|
func handleDeployService(sm *api.SetupManager) http.HandlerFunc {
|
|
return func(w http.ResponseWriter, r *http.Request) {
|
|
if r.Method != "POST" {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
var request struct {
|
|
IP string `json:"ip"`
|
|
SSHKey string `json:"sshKey"`
|
|
SSHUsername string `json:"sshUsername"`
|
|
SSHPassword string `json:"sshPassword"`
|
|
SSHPort int `json:"sshPort"`
|
|
Config struct {
|
|
Ports struct {
|
|
API int `json:"api"`
|
|
MCP int `json:"mcp"`
|
|
WebUI int `json:"webui"`
|
|
P2P int `json:"p2p"`
|
|
} `json:"ports"`
|
|
Security interface{} `json:"security"`
|
|
AutoStart bool `json:"autoStart"`
|
|
} `json:"config"`
|
|
}
|
|
|
|
if err := json.NewDecoder(r.Body).Decode(&request); err != nil {
|
|
http.Error(w, "Invalid request body", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// Convert the struct config to a map[string]interface{} format that the backend expects
|
|
configMap := map[string]interface{}{
|
|
"ports": map[string]interface{}{
|
|
"api": request.Config.Ports.API,
|
|
"mcp": request.Config.Ports.MCP,
|
|
"webui": request.Config.Ports.WebUI,
|
|
"p2p": request.Config.Ports.P2P,
|
|
},
|
|
"security": request.Config.Security,
|
|
"autoStart": request.Config.AutoStart,
|
|
}
|
|
|
|
result, err := sm.DeployServiceToMachine(request.IP, request.SSHKey, request.SSHUsername, request.SSHPassword, request.SSHPort, configMap)
|
|
if err != nil {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
|
"success": false,
|
|
"error": err.Error(),
|
|
})
|
|
return
|
|
}
|
|
|
|
w.Header().Set("Content-Type", "application/json")
|
|
json.NewEncoder(w).Encode(result)
|
|
}
|
|
}
|