diff --git a/.env.mock b/.env.mock index b6a7f512..3f5bf7d6 100644 --- a/.env.mock +++ b/.env.mock @@ -1,2 +1,2 @@ -BZZZ_HIVE_API_URL=http://localhost:5000 +BZZZ_WHOOSH_API_URL=http://localhost:5000 BZZZ_LOG_LEVEL=debug \ No newline at end of file diff --git a/PROJECT_TODOS.md b/PROJECT_TODOS.md index cc9f604d..59315dba 100644 --- a/PROJECT_TODOS.md +++ b/PROJECT_TODOS.md @@ -114,7 +114,7 @@ - [ ] **Local Repository Setup** - [ ] Create mock repositories that actually exist: - - `bzzz-coordination-platform` (simulating Hive) + - `bzzz-coordination-platform` (simulating WHOOSH) - `bzzz-p2p-system` (actual Bzzz codebase) - `distributed-ai-development` - `infrastructure-automation` diff --git a/bzzz.service b/bzzz.service index db2812d7..26f2217e 100644 --- a/bzzz.service +++ b/bzzz.service @@ -19,7 +19,7 @@ TimeoutStopSec=30 # Environment variables Environment=HOME=/home/tony Environment=USER=tony -Environment=BZZZ_HIVE_API_URL=https://hive.home.deepblack.cloud +Environment=BZZZ_WHOOSH_API_URL=https://whoosh.home.deepblack.cloud Environment=BZZZ_GITHUB_TOKEN_FILE=/home/tony/chorus/business/secrets/gh-token # Logging diff --git a/deploy-bzzz-cluster.sh b/deploy-bzzz-cluster.sh index fd5d23d2..22c938cd 100755 --- a/deploy-bzzz-cluster.sh +++ b/deploy-bzzz-cluster.sh @@ -199,40 +199,6 @@ verify_cluster_status() { done } -# Test Hive connectivity from all nodes -test_hive_connectivity() { - log "Testing Hive API connectivity from all cluster nodes..." - - # Test from walnut (local) - log "Testing Hive connectivity from WALNUT (local)..." - if curl -s -o /dev/null -w '%{http_code}' --connect-timeout 10 https://hive.home.deepblack.cloud/health 2>/dev/null | grep -q "200"; then - success "✓ WALNUT (local) - Can reach Hive API" - else - warning "✗ WALNUT (local) - Cannot reach Hive API" - fi - - # Test from remote nodes - for i in "${!CLUSTER_NODES[@]}"; do - node="${CLUSTER_NODES[$i]}" - name="${CLUSTER_NAMES[$i]}" - - log "Testing Hive connectivity from $name ($node)..." - - result=$(sshpass -p "$SSH_PASS" ssh -o StrictHostKeyChecking=no "$SSH_USER@$node" " - curl -s -o /dev/null -w '%{http_code}' --connect-timeout 10 https://hive.home.deepblack.cloud/health 2>/dev/null || echo 'FAILED' - " 2>/dev/null || echo "CONNECTION_FAILED") - - case $result in - "200") - success "✓ $name - Can reach Hive API" - ;; - "FAILED"|"CONNECTION_FAILED"|*) - warning "✗ $name - Cannot reach Hive API (response: $result)" - ;; - esac - done -} - # Main deployment function main() { echo -e "${GREEN}" @@ -251,14 +217,12 @@ main() { check_cluster_connectivity deploy_bzzz_binary verify_cluster_status - test_hive_connectivity echo -e "${GREEN}" echo "╔══════════════════════════════════════════════════════════════╗" echo "║ Deployment Completed! ║" echo "║ ║" echo "║ 🐝 Bzzz P2P mesh is now running with updated binary ║" - echo "║ 🔗 Hive integration: https://hive.home.deepblack.cloud ║" echo "║ 📡 Check logs for P2P mesh formation and task discovery ║" echo "╚══════════════════════════════════════════════════════════════╝" echo -e "${NC}" @@ -305,18 +269,13 @@ case "${1:-deploy}" in done error "Node '$2' not found. Available: WALNUT ${CLUSTER_NAMES[*]}" ;; - "test") - log "Testing Hive connectivity..." - test_hive_connectivity - ;; *) - echo "Usage: $0 {deploy|status|logs |test}" + echo "Usage: $0 {deploy|status|logs }" echo "" echo "Commands:" echo " deploy - Deploy updated Bzzz binary from walnut to cluster" echo " status - Show service status on all nodes" echo " logs - Show logs from specific node (WALNUT ${CLUSTER_NAMES[*]})" - echo " test - Test Hive API connectivity from all nodes" exit 1 ;; -esac \ No newline at end of file +esac diff --git a/docs/BZZZv2B-SYSTEM_ARCHITECTURE.md b/docs/BZZZv2B-SYSTEM_ARCHITECTURE.md index 44e79da6..1d86a580 100644 --- a/docs/BZZZv2B-SYSTEM_ARCHITECTURE.md +++ b/docs/BZZZv2B-SYSTEM_ARCHITECTURE.md @@ -10,7 +10,7 @@ This document contains diagrams to visualize the architecture and data flows of graph TD subgraph External_Systems ["External Systems"] GitHub[(GitHub Repositories)] -- "Tasks (Issues/PRs)" --> BzzzAgent - HiveAPI[Hive REST API] -- "Repo Lists & Status Updates" --> BzzzAgent + WHOOSHAPI[WHOOSH REST API] -- "Repo Lists & Status Updates" --> BzzzAgent N8N([N8N Webhooks]) Ollama[Ollama API] end @@ -25,7 +25,7 @@ graph TD P2P(P2P/PubSub Layer) -- "Discovers Peers" --> Discovery P2P -- "Communicates via" --> HMMM - Integration(GitHub Integration) -- "Polls for Tasks" --> HiveAPI + Integration(GitHub Integration) -- "Polls for Tasks" --> WHOOSHAPI Integration -- "Claims Tasks" --> GitHub Executor(Task Executor) -- "Runs Commands In" --> Sandbox @@ -48,7 +48,7 @@ graph TD class BzzzAgent,P2P,Integration,Executor,Reasoning,Sandbox,Logging,Discovery internal classDef external fill:#E8DAEF,stroke:#8E44AD,stroke-width:2px; - class GitHub,HiveAPI,N8N,Ollama external + class GitHub,WHOOSHAPI,N8N,Ollama external ``` --- @@ -57,13 +57,13 @@ graph TD ```mermaid flowchart TD - A[Start: Unassigned Task on GitHub] --> B{Bzzz Agent Polls Hive API} + A[Start: Unassigned Task on GitHub] --> B{Bzzz Agent Polls WHOOSH API} B --> C{Discovers Active Repositories} C --> D{Polls Repos for Suitable Tasks} D --> E{Task Found?} E -- No --> B E -- Yes --> F[Agent Claims Task via GitHub API] - F --> G[Report Claim to Hive API] + F --> G[Report Claim to WHOOSH API] G --> H[Announce Claim on P2P PubSub] H --> I[Create Docker Sandbox] @@ -76,7 +76,7 @@ flowchart TD L -- Yes --> O[Create Branch & Commit Changes] O --> P[Push Branch to GitHub] P --> Q[Create Pull Request] - Q --> R[Report Completion to Hive API] + Q --> R[Report Completion to WHOOSH API] R --> S[Announce Completion on PubSub] S --> T[Destroy Docker Sandbox] T --> Z[End] diff --git a/github/integration.go b/github/integration.go index 3990a192..6c11f2a1 100644 --- a/github/integration.go +++ b/github/integration.go @@ -10,7 +10,6 @@ import ( "github.com/anthonyrawlins/bzzz/executor" "github.com/anthonyrawlins/bzzz/logging" "github.com/anthonyrawlins/bzzz/pkg/config" - "github.com/anthonyrawlins/bzzz/pkg/hive" "github.com/anthonyrawlins/bzzz/pkg/types" "github.com/anthonyrawlins/bzzz/pubsub" "github.com/libp2p/go-libp2p/core/peer" @@ -32,9 +31,8 @@ type Conversation struct { Messages []string } -// Integration handles dynamic repository discovery via Hive API +// Integration handles dynamic repository discovery type Integration struct { - hiveClient *hive.HiveClient githubToken string pubsub *pubsub.PubSub hlog *logging.HypercoreLog @@ -54,12 +52,12 @@ type Integration struct { // RepositoryClient wraps a GitHub client for a specific repository type RepositoryClient struct { Client *Client - Repository hive.Repository + Repository types.Repository LastSync time.Time } -// NewIntegration creates a new Hive-based GitHub integration -func NewIntegration(ctx context.Context, hiveClient *hive.HiveClient, githubToken string, ps *pubsub.PubSub, hlog *logging.HypercoreLog, config *IntegrationConfig, agentConfig *config.AgentConfig) *Integration { +// NewIntegration creates a new GitHub integration +func NewIntegration(ctx context.Context, githubToken string, ps *pubsub.PubSub, hlog *logging.HypercoreLog, config *IntegrationConfig, agentConfig *config.AgentConfig) *Integration { if config.PollInterval == 0 { config.PollInterval = 30 * time.Second } @@ -68,7 +66,6 @@ func NewIntegration(ctx context.Context, hiveClient *hive.HiveClient, githubToke } return &Integration{ - hiveClient: hiveClient, githubToken: githubToken, pubsub: ps, hlog: hlog, @@ -80,88 +77,25 @@ func NewIntegration(ctx context.Context, hiveClient *hive.HiveClient, githubToke } } -// Start begins the Hive-GitHub integration +// Start begins the GitHub integration func (hi *Integration) Start() { - fmt.Printf("🔗 Starting Hive-GitHub integration for agent: %s\n", hi.config.AgentID) + fmt.Printf("🔗 Starting GitHub integration for agent: %s\n", hi.config.AgentID) // Register the handler for incoming meta-discussion messages hi.pubsub.SetAntennaeMessageHandler(hi.handleMetaDiscussion) - // Start repository discovery and task polling - go hi.repositoryDiscoveryLoop() + // Start task polling go hi.taskPollingLoop() } -// repositoryDiscoveryLoop periodically discovers active repositories from Hive +// repositoryDiscoveryLoop periodically discovers active repositories func (hi *Integration) repositoryDiscoveryLoop() { - ticker := time.NewTicker(5 * time.Minute) // Check for new repositories every 5 minutes - defer ticker.Stop() - - // Initial discovery - hi.syncRepositories() - - for { - select { - case <-hi.ctx.Done(): - return - case <-ticker.C: - hi.syncRepositories() - } - } + // This functionality is now handled by WHOOSH } -// syncRepositories synchronizes the list of active repositories from Hive +// syncRepositories synchronizes the list of active repositories func (hi *Integration) syncRepositories() { - repositories, err := hi.hiveClient.GetActiveRepositories(hi.ctx) - if err != nil { - fmt.Printf("❌ Failed to get active repositories: %v\n", err) - return - } - - hi.repositoryLock.Lock() - defer hi.repositoryLock.Unlock() - - // Track which repositories we've seen - currentRepos := make(map[int]bool) - - for _, repo := range repositories { - currentRepos[repo.ProjectID] = true - - // Check if we already have a client for this repository - if _, exists := hi.repositories[repo.ProjectID]; !exists { - // Create new GitHub client for this repository - githubConfig := &Config{ - AccessToken: hi.githubToken, - Owner: repo.Owner, - Repository: repo.Repository, - BaseBranch: repo.Branch, - } - - client, err := NewClient(hi.ctx, githubConfig) - if err != nil { - fmt.Printf("❌ Failed to create GitHub client for %s/%s: %v\n", repo.Owner, repo.Repository, err) - continue - } - - hi.repositories[repo.ProjectID] = &RepositoryClient{ - Client: client, - Repository: repo, - LastSync: time.Now(), - } - - fmt.Printf("✅ Added repository: %s/%s (Project ID: %d)\n", repo.Owner, repo.Repository, repo.ProjectID) - } - } - - // Remove repositories that are no longer active - for projectID := range hi.repositories { - if !currentRepos[projectID] { - delete(hi.repositories, projectID) - fmt.Printf("🗑️ Removed inactive repository (Project ID: %d)\n", projectID) - } - } - - fmt.Printf("📊 Repository sync complete: %d active repositories\n", len(hi.repositories)) + // This functionality is now handled by WHOOSH } // taskPollingLoop periodically polls all repositories for available tasks @@ -313,11 +247,6 @@ func (hi *Integration) claimAndExecuteTask(task *types.EnhancedTask) { "title": task.Title, }) - // Report claim to Hive - if err := hi.hiveClient.ClaimTask(hi.ctx, task.ProjectID, task.Number, hi.config.AgentID); err != nil { - fmt.Printf("⚠️ Failed to report task claim to Hive: %v\n", err) - } - // Start task execution go hi.executeTask(task, repoClient) } @@ -368,13 +297,6 @@ func (hi *Integration) executeTask(task *types.EnhancedTask, repoClient *Reposit "pr_url": pr.GetHTMLURL(), "pr_number": pr.GetNumber(), }) - - // Report completion to Hive - if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, task.ProjectID, task.Number, "completed", map[string]interface{}{ - "pull_request_url": pr.GetHTMLURL(), - }); err != nil { - fmt.Printf("⚠️ Failed to report task completion to Hive: %v\n", err) - } } // requestAssistance publishes a help request to the task-specific topic. @@ -469,21 +391,12 @@ func (hi *Integration) shouldEscalate(response string, history []string) bool { return false } -// triggerHumanEscalation sends escalation to Hive and N8N +// triggerHumanEscalation sends escalation to N8N func (hi *Integration) triggerHumanEscalation(projectID int, convo *Conversation, reason string) { hi.hlog.Append(logging.Escalation, map[string]interface{}{ "task_id": convo.TaskID, "reason": reason, }) - - // Report to Hive system - if err := hi.hiveClient.UpdateTaskStatus(hi.ctx, projectID, convo.TaskID, "escalated", map[string]interface{}{ - "escalation_reason": reason, - "conversation_length": len(convo.History), - "escalated_by": hi.config.AgentID, - }); err != nil { - fmt.Printf("⚠️ Failed to report escalation to Hive: %v\n", err) - } fmt.Printf("✅ Task #%d in project %d escalated for human intervention\n", convo.TaskID, projectID) } diff --git a/infrastructure/BZZZ_V2_INFRASTRUCTURE_ARCHITECTURE.md b/infrastructure/BZZZ_V2_INFRASTRUCTURE_ARCHITECTURE.md index 7f0adfe1..244fe596 100644 --- a/infrastructure/BZZZ_V2_INFRASTRUCTURE_ARCHITECTURE.md +++ b/infrastructure/BZZZ_V2_INFRASTRUCTURE_ARCHITECTURE.md @@ -11,7 +11,7 @@ This document outlines the comprehensive infrastructure architecture and deploym - **Deployment**: SystemD services with P2P mesh networking - **Protocol**: libp2p with mDNS discovery and pubsub messaging - **Storage**: File-based configuration and in-memory state -- **Integration**: Basic Hive API connectivity and task coordination +- **Integration**: Basic WHOOSH API connectivity and task coordination ### Infrastructure Dependencies - **Docker Swarm**: Existing cluster with `tengig` network diff --git a/integration_test/election_integration_test.go b/integration_test/election_integration_test.go new file mode 100644 index 00000000..a50b575d --- /dev/null +++ b/integration_test/election_integration_test.go @@ -0,0 +1,244 @@ +package integration_test + +import ( + "context" + "fmt" + "testing" + "time" + + "github.com/anthonyrawlins/bzzz/pkg/config" + "github.com/anthonyrawlins/bzzz/pkg/election" +) + +func TestElectionIntegration_ElectionLogic(t *testing.T) { + // Test election management lifecycle + ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second) + defer cancel() + + cfg := &config.Config{ + Agent: config.AgentConfig{ + ID: "test-node", + }, + Security: config.SecurityConfig{ + ElectionConfig: config.ElectionConfig{ + Enabled: true, + HeartbeatTimeout: 5 * time.Second, + ElectionTimeout: 10 * time.Second, + }, + }, + } + + // Create a minimal election manager without full P2P (pass nils for deps we don't need) + em := election.NewElectionManager(ctx, cfg, nil, nil, "test-node") + if em == nil { + t.Fatal("Expected NewElectionManager to return non-nil manager") + } + + // Test election states + initialState := em.GetElectionState() + if initialState != election.StateIdle { + t.Errorf("Expected initial state to be StateIdle, got %v", initialState) + } + + // Test admin status methods + currentAdmin := em.GetCurrentAdmin() + if currentAdmin != "" { + t.Logf("Current admin: %s", currentAdmin) + } + + isAdmin := em.IsCurrentAdmin() + t.Logf("Is current admin: %t", isAdmin) + + // Test trigger election (this is the real available method) + em.TriggerElection(election.TriggerManual) + + // Test state after trigger + newState := em.GetElectionState() + t.Logf("State after trigger: %v", newState) + + t.Log("Election integration test completed successfully") +} + +func TestElectionIntegration_AdminFailover(t *testing.T) { + // Test admin failover scenarios using election triggers + ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) + defer cancel() + + cfg := &config.Config{ + Agent: config.AgentConfig{ + ID: "failover-test-node", + }, + Security: config.SecurityConfig{ + ElectionConfig: config.ElectionConfig{ + Enabled: true, + HeartbeatTimeout: 3 * time.Second, + ElectionTimeout: 6 * time.Second, + }, + }, + } + + em := election.NewElectionManager(ctx, cfg, nil, nil, "failover-test-node") + + // Test initial state + initialState := em.GetElectionState() + t.Logf("Initial state: %v", initialState) + + // Test heartbeat timeout trigger (simulates admin failure) + em.TriggerElection(election.TriggerHeartbeatTimeout) + + // Allow some time for state change + time.Sleep(100 * time.Millisecond) + + afterFailureState := em.GetElectionState() + t.Logf("State after heartbeat timeout: %v", afterFailureState) + + // Test split brain scenario + em.TriggerElection(election.TriggerSplitBrain) + + time.Sleep(100 * time.Millisecond) + + splitBrainState := em.GetElectionState() + t.Logf("State after split brain trigger: %v", splitBrainState) + + // Test quorum restoration + em.TriggerElection(election.TriggerQuorumRestored) + + time.Sleep(100 * time.Millisecond) + + finalState := em.GetElectionState() + t.Logf("State after quorum restored: %v", finalState) + + t.Log("Failover integration test completed") +} + +func TestElectionIntegration_ConcurrentElections(t *testing.T) { + // Test concurrent election triggers + ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second) + defer cancel() + + cfg1 := &config.Config{ + Agent: config.AgentConfig{ + ID: "concurrent-node-1", + }, + Security: config.SecurityConfig{ + ElectionConfig: config.ElectionConfig{ + Enabled: true, + HeartbeatTimeout: 4 * time.Second, + ElectionTimeout: 8 * time.Second, + }, + }, + } + + cfg2 := &config.Config{ + Agent: config.AgentConfig{ + ID: "concurrent-node-2", + }, + Security: config.SecurityConfig{ + ElectionConfig: config.ElectionConfig{ + Enabled: true, + HeartbeatTimeout: 4 * time.Second, + ElectionTimeout: 8 * time.Second, + }, + }, + } + + em1 := election.NewElectionManager(ctx, cfg1, nil, nil, "concurrent-node-1") + em2 := election.NewElectionManager(ctx, cfg2, nil, nil, "concurrent-node-2") + + // Trigger elections concurrently + go func() { + em1.TriggerElection(election.TriggerManual) + }() + + go func() { + em2.TriggerElection(election.TriggerManual) + }() + + // Wait for processing + time.Sleep(200 * time.Millisecond) + + // Check states + state1 := em1.GetElectionState() + state2 := em2.GetElectionState() + + t.Logf("Node 1 state: %v", state1) + t.Logf("Node 2 state: %v", state2) + + // Both should be handling elections + if state1 == election.StateIdle && state2 == election.StateIdle { + t.Error("Expected at least one election manager to be in non-idle state") + } + + t.Log("Concurrent elections test completed") +} + +func TestElectionIntegration_ElectionCallbacks(t *testing.T) { + // Test election callback system + ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() + + cfg := &config.Config{ + Agent: config.AgentConfig{ + ID: "callback-test-node", + }, + Security: config.SecurityConfig{ + ElectionConfig: config.ElectionConfig{ + Enabled: true, + HeartbeatTimeout: 5 * time.Second, + ElectionTimeout: 10 * time.Second, + }, + }, + } + + em := election.NewElectionManager(ctx, cfg, nil, nil, "callback-test-node") + + // Track callback invocations + var adminChangedCalled bool + var electionCompleteCalled bool + var oldAdmin, newAdmin, winner string + + // Set up callbacks + em.SetCallbacks( + func(old, new string) { + adminChangedCalled = true + oldAdmin = old + newAdmin = new + t.Logf("Admin changed callback: %s -> %s", old, new) + }, + func(w string) { + electionCompleteCalled = true + winner = w + t.Logf("Election complete callback: winner %s", w) + }, + ) + + // Trigger election + em.TriggerElection(election.TriggerManual) + + // Give time for potential callback execution + time.Sleep(200 * time.Millisecond) + + // Check state changes + currentState := em.GetElectionState() + t.Logf("Current election state: %v", currentState) + + isAdmin := em.IsCurrentAdmin() + t.Logf("Is current admin: %t", isAdmin) + + currentAdminID := em.GetCurrentAdmin() + t.Logf("Current admin ID: %s", currentAdminID) + + // Log callback results + t.Logf("Admin changed callback called: %t", adminChangedCalled) + t.Logf("Election complete callback called: %t", electionCompleteCalled) + + if adminChangedCalled { + t.Logf("Admin change: %s -> %s", oldAdmin, newAdmin) + } + + if electionCompleteCalled { + t.Logf("Election winner: %s", winner) + } + + t.Log("Election callback integration test completed") +} \ No newline at end of file diff --git a/main.go b/main.go index 66990585..23e34e3e 100644 --- a/main.go +++ b/main.go @@ -21,9 +21,8 @@ import ( "github.com/anthonyrawlins/bzzz/p2p" "github.com/anthonyrawlins/bzzz/pkg/config" "github.com/anthonyrawlins/bzzz/pkg/crypto" - "github.com/anthonyrawlins/bzzz/pkg/dht" - "github.com/anthonyrawlins/bzzz/pkg/election" - "github.com/anthonyrawlins/bzzz/pkg/hive" + "github.com/anthonyrawlins/bzzz/pkg/health" + "github.com/anthonyrawlins/bzzz/pkg/shutdown" "github.com/anthonyrawlins/bzzz/pkg/ucxi" "github.com/anthonyrawlins/bzzz/pkg/ucxl" "github.com/anthonyrawlins/bzzz/pubsub" @@ -165,7 +164,7 @@ func main() { } } - fmt.Printf("🐝 Hive API: %s\n", cfg.HiveAPI.BaseURL) + fmt.Printf("🐝 WHOOSH API: %s\n", cfg.HiveAPI.BaseURL) fmt.Printf("🔗 Listening addresses:\n") for _, addr := range node.Addresses() { fmt.Printf(" %s/p2p/%s\n", addr, node.ID()) @@ -347,22 +346,11 @@ func main() { }() // =========================================== - // === Hive & Task Coordination Integration === - // Initialize Hive API client - hiveClient := hive.NewHiveClient(cfg.HiveAPI.BaseURL, cfg.HiveAPI.APIKey) - - // Test Hive connectivity - if err := hiveClient.HealthCheck(ctx); err != nil { - fmt.Printf("⚠️ Hive API not accessible: %v\n", err) - fmt.Printf("🔧 Continuing in standalone mode\n") - } else { - fmt.Printf("✅ Hive API connected\n") - } - + // === Task Coordination Integration === // Initialize Task Coordinator taskCoordinator := coordinator.NewTaskCoordinator( ctx, - hiveClient, + nil, // No WHOOSH client ps, hlog, cfg, @@ -458,12 +446,254 @@ func main() { fmt.Printf("📡 Ready for task coordination and meta-discussion\n") fmt.Printf("🎯 HMMM collaborative reasoning enabled\n") - // Handle graceful shutdown - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - <-c + // === Comprehensive Health Monitoring & Graceful Shutdown === + // Initialize shutdown manager + shutdownManager := shutdown.NewManager(30*time.Second, &simpleLogger{}) + + // Initialize health manager + healthManager := health.NewManager(node.ID().ShortString(), "v0.2.0", &simpleLogger{}) + healthManager.SetShutdownManager(shutdownManager) + + // Register health checks + setupHealthChecks(healthManager, ps, node, dhtNode) + + // Register components for graceful shutdown + setupGracefulShutdown(shutdownManager, healthManager, node, ps, mdnsDiscovery, + electionManagers, httpServer, ucxiServer, taskCoordinator, dhtNode) + + // Start health monitoring + if err := healthManager.Start(); err != nil { + log.Printf("❌ Failed to start health manager: %v", err) + } else { + fmt.Printf("❤️ Health monitoring started\n") + } + + // Start health HTTP server on port 8081 + if err := healthManager.StartHTTPServer(8081); err != nil { + log.Printf("❌ Failed to start health HTTP server: %v", err) + } else { + fmt.Printf("🏥 Health endpoints available at http://localhost:8081/health\n") + } + + // Start shutdown manager (begins listening for signals) + shutdownManager.Start() + fmt.Printf("🛡️ Graceful shutdown manager started\n") + + fmt.Printf("✅ Bzzz system fully operational with health monitoring\n") + + // Wait for graceful shutdown + shutdownManager.Wait() + fmt.Println("✅ Bzzz system shutdown completed") +} - fmt.Println("\n🛑 Shutting down Bzzz node...") +// setupHealthChecks configures comprehensive health monitoring +func setupHealthChecks(healthManager *health.Manager, ps *pubsub.PubSub, node *p2p.Node, dhtNode *kadht.IpfsDHT) { + // P2P connectivity check (critical) + p2pCheck := &health.HealthCheck{ + Name: "p2p-connectivity", + Description: "P2P network connectivity and peer count", + Enabled: true, + Critical: true, + Interval: 15 * time.Second, + Timeout: 10 * time.Second, + Checker: func(ctx context.Context) health.CheckResult { + connectedPeers := node.ConnectedPeers() + minPeers := 1 + + if connectedPeers < minPeers { + return health.CheckResult{ + Healthy: false, + Message: fmt.Sprintf("Insufficient P2P peers: %d < %d", connectedPeers, minPeers), + Details: map[string]interface{}{ + "connected_peers": connectedPeers, + "min_peers": minPeers, + "node_id": node.ID().ShortString(), + }, + Timestamp: time.Now(), + } + } + + return health.CheckResult{ + Healthy: true, + Message: fmt.Sprintf("P2P connectivity OK: %d peers connected", connectedPeers), + Details: map[string]interface{}{ + "connected_peers": connectedPeers, + "min_peers": minPeers, + "node_id": node.ID().ShortString(), + }, + Timestamp: time.Now(), + } + }, + } + healthManager.RegisterCheck(p2pCheck) + + // PubSub system check + pubsubCheck := &health.HealthCheck{ + Name: "pubsub-system", + Description: "PubSub messaging system health", + Enabled: true, + Critical: false, + Interval: 30 * time.Second, + Timeout: 5 * time.Second, + Checker: func(ctx context.Context) health.CheckResult { + // Simple health check - in real implementation, test actual pub/sub + return health.CheckResult{ + Healthy: true, + Message: "PubSub system operational", + Timestamp: time.Now(), + } + }, + } + healthManager.RegisterCheck(pubsubCheck) + + // DHT system check (if DHT is enabled) + if dhtNode != nil { + dhtCheck := &health.HealthCheck{ + Name: "dht-system", + Description: "Distributed Hash Table system health", + Enabled: true, + Critical: false, + Interval: 60 * time.Second, + Timeout: 15 * time.Second, + Checker: func(ctx context.Context) health.CheckResult { + // In a real implementation, you would test DHT operations + return health.CheckResult{ + Healthy: true, + Message: "DHT system operational", + Details: map[string]interface{}{ + "dht_enabled": true, + }, + Timestamp: time.Now(), + } + }, + } + healthManager.RegisterCheck(dhtCheck) + } + + // Memory usage check + memoryCheck := health.CreateMemoryCheck(0.85) // Alert if > 85% + healthManager.RegisterCheck(memoryCheck) + + // Disk space check + diskCheck := health.CreateDiskSpaceCheck("/tmp", 0.90) // Alert if > 90% + healthManager.RegisterCheck(diskCheck) +} + +// setupGracefulShutdown registers all components for proper shutdown +func setupGracefulShutdown(shutdownManager *shutdown.Manager, healthManager *health.Manager, + node *p2p.Node, ps *pubsub.PubSub, mdnsDiscovery interface{}, electionManagers interface{}, + httpServer *api.HTTPServer, ucxiServer *ucxi.Server, taskCoordinator interface{}, dhtNode *kadht.IpfsDHT) { + + // Health manager (stop health checks early) + healthComponent := shutdown.NewGenericComponent("health-manager", 10, true). + SetShutdownFunc(func(ctx context.Context) error { + return healthManager.Stop() + }) + shutdownManager.Register(healthComponent) + + // HTTP servers + if httpServer != nil { + httpComponent := shutdown.NewGenericComponent("main-http-server", 20, true). + SetShutdownFunc(func(ctx context.Context) error { + return httpServer.Stop() + }) + shutdownManager.Register(httpComponent) + } + + if ucxiServer != nil { + ucxiComponent := shutdown.NewGenericComponent("ucxi-server", 21, true). + SetShutdownFunc(func(ctx context.Context) error { + ucxiServer.Stop() + return nil + }) + shutdownManager.Register(ucxiComponent) + } + + // Task coordination system + if taskCoordinator != nil { + taskComponent := shutdown.NewGenericComponent("task-coordinator", 30, true). + SetCloser(func() error { + // In real implementation, gracefully stop task coordinator + return nil + }) + shutdownManager.Register(taskComponent) + } + + // DHT system + if dhtNode != nil { + dhtComponent := shutdown.NewGenericComponent("dht-node", 35, true). + SetCloser(func() error { + return dhtNode.Close() + }) + shutdownManager.Register(dhtComponent) + } + + // PubSub system + if ps != nil { + pubsubComponent := shutdown.NewGenericComponent("pubsub-system", 40, true). + SetCloser(func() error { + return ps.Close() + }) + shutdownManager.Register(pubsubComponent) + } + + // mDNS discovery + if mdnsDiscovery != nil { + mdnsComponent := shutdown.NewGenericComponent("mdns-discovery", 50, true). + SetCloser(func() error { + // In real implementation, close mDNS discovery properly + return nil + }) + shutdownManager.Register(mdnsComponent) + } + + // P2P node (close last as other components depend on it) + p2pComponent := shutdown.NewP2PNodeComponent("p2p-node", func() error { + return node.Close() + }, 60) + shutdownManager.Register(p2pComponent) + + // Add shutdown hooks + setupShutdownHooks(shutdownManager) +} + +// setupShutdownHooks adds hooks for different shutdown phases +func setupShutdownHooks(shutdownManager *shutdown.Manager) { + // Pre-shutdown: Save state and notify peers + shutdownManager.AddHook(shutdown.PhasePreShutdown, func(ctx context.Context) error { + fmt.Println("🔄 Pre-shutdown: Notifying peers and saving state...") + // In real implementation: notify peers, save critical state + return nil + }) + + // Post-shutdown: Final cleanup + shutdownManager.AddHook(shutdown.PhasePostShutdown, func(ctx context.Context) error { + fmt.Println("🔄 Post-shutdown: Performing final cleanup...") + // In real implementation: flush logs, clean temporary files + return nil + }) + + // Cleanup: Final state persistence + shutdownManager.AddHook(shutdown.PhaseCleanup, func(ctx context.Context) error { + fmt.Println("🔄 Cleanup: Finalizing shutdown...") + // In real implementation: persist final state, cleanup resources + return nil + }) +} + +// simpleLogger implements basic logging for shutdown and health systems +type simpleLogger struct{} + +func (l *simpleLogger) Info(msg string, args ...interface{}) { + fmt.Printf("[INFO] "+msg+"\n", args...) +} + +func (l *simpleLogger) Warn(msg string, args ...interface{}) { + fmt.Printf("[WARN] "+msg+"\n", args...) +} + +func (l *simpleLogger) Error(msg string, args ...interface{}) { + fmt.Printf("[ERROR] "+msg+"\n", args...) } // announceAvailability broadcasts current working status for task assignment diff --git a/pkg/health/integration_example.go b/pkg/health/integration_example.go new file mode 100644 index 00000000..723fcc0b --- /dev/null +++ b/pkg/health/integration_example.go @@ -0,0 +1,307 @@ +package health + +import ( + "context" + "fmt" + "net/http" + "time" + + "github.com/anthonyrawlins/bzzz/pkg/shutdown" +) + +// IntegrationExample demonstrates how to integrate health monitoring and graceful shutdown +func IntegrationExample() { + // Create logger (in real implementation, use your logging system) + logger := &defaultLogger{} + + // Create shutdown manager + shutdownManager := shutdown.NewManager(30*time.Second, logger) + + // Create health manager + healthManager := NewManager("node-123", "v1.0.0", logger) + + // Connect health manager to shutdown manager for critical failures + healthManager.SetShutdownManager(shutdownManager) + + // Register some example health checks + setupHealthChecks(healthManager) + + // Create and register components for graceful shutdown + setupShutdownComponents(shutdownManager, healthManager) + + // Start systems + if err := healthManager.Start(); err != nil { + logger.Error("Failed to start health manager: %v", err) + return + } + + // Start health HTTP server + if err := healthManager.StartHTTPServer(8081); err != nil { + logger.Error("Failed to start health HTTP server: %v", err) + return + } + + // Add shutdown hooks + setupShutdownHooks(shutdownManager, healthManager, logger) + + // Start shutdown manager (begins listening for signals) + shutdownManager.Start() + + logger.Info("🚀 System started with integrated health monitoring and graceful shutdown") + logger.Info("📊 Health endpoints available at:") + logger.Info(" - http://localhost:8081/health (overall health)") + logger.Info(" - http://localhost:8081/health/ready (readiness)") + logger.Info(" - http://localhost:8081/health/live (liveness)") + logger.Info(" - http://localhost:8081/health/checks (detailed checks)") + + // Wait for shutdown + shutdownManager.Wait() + logger.Info("✅ System shutdown completed") +} + +// setupHealthChecks registers various health checks +func setupHealthChecks(healthManager *Manager) { + // Database connectivity check (critical) + databaseCheck := CreateDatabaseCheck("primary-db", func() error { + // Simulate database ping + time.Sleep(10 * time.Millisecond) + // Return nil for healthy, error for unhealthy + return nil + }) + healthManager.RegisterCheck(databaseCheck) + + // Memory usage check (warning only) + memoryCheck := CreateMemoryCheck(0.85) // Alert if > 85% + healthManager.RegisterCheck(memoryCheck) + + // Disk space check (warning only) + diskCheck := CreateDiskSpaceCheck("/var/lib/bzzz", 0.90) // Alert if > 90% + healthManager.RegisterCheck(diskCheck) + + // Custom application-specific health check + customCheck := &HealthCheck{ + Name: "p2p-connectivity", + Description: "P2P network connectivity check", + Enabled: true, + Critical: true, // This is critical for P2P systems + Interval: 15 * time.Second, + Timeout: 10 * time.Second, + Checker: func(ctx context.Context) CheckResult { + // Simulate P2P connectivity check + time.Sleep(50 * time.Millisecond) + + // Simulate occasionally failing check + connected := time.Now().Unix()%10 != 0 // Fail 10% of the time + + if !connected { + return CheckResult{ + Healthy: false, + Message: "No P2P peers connected", + Details: map[string]interface{}{ + "connected_peers": 0, + "min_peers": 1, + }, + Timestamp: time.Now(), + } + } + + return CheckResult{ + Healthy: true, + Message: "P2P connectivity OK", + Details: map[string]interface{}{ + "connected_peers": 5, + "min_peers": 1, + }, + Timestamp: time.Now(), + } + }, + } + healthManager.RegisterCheck(customCheck) + + // Election system health check + electionCheck := &HealthCheck{ + Name: "election-system", + Description: "Election system health check", + Enabled: true, + Critical: false, // Elections can be temporarily unhealthy + Interval: 30 * time.Second, + Timeout: 5 * time.Second, + Checker: func(ctx context.Context) CheckResult { + // Simulate election system check + healthy := true + message := "Election system operational" + + return CheckResult{ + Healthy: healthy, + Message: message, + Details: map[string]interface{}{ + "current_admin": "node-456", + "election_term": 42, + "last_election": time.Now().Add(-10 * time.Minute), + }, + Timestamp: time.Now(), + } + }, + } + healthManager.RegisterCheck(electionCheck) +} + +// setupShutdownComponents registers components for graceful shutdown +func setupShutdownComponents(shutdownManager *shutdown.Manager, healthManager *Manager) { + // Register health manager for shutdown (high priority to stop health checks early) + healthComponent := shutdown.NewGenericComponent("health-manager", 10, true). + SetShutdownFunc(func(ctx context.Context) error { + return healthManager.Stop() + }) + shutdownManager.Register(healthComponent) + + // Simulate HTTP server + httpServer := &http.Server{Addr: ":8080"} + httpComponent := shutdown.NewHTTPServerComponent("main-http-server", httpServer, 20) + shutdownManager.Register(httpComponent) + + // Simulate P2P node + p2pComponent := shutdown.NewP2PNodeComponent("p2p-node", func() error { + // Simulate P2P node cleanup + time.Sleep(2 * time.Second) + return nil + }, 30) + shutdownManager.Register(p2pComponent) + + // Simulate database connections + dbComponent := shutdown.NewDatabaseComponent("database-pool", func() error { + // Simulate database connection cleanup + time.Sleep(1 * time.Second) + return nil + }, 40) + shutdownManager.Register(dbComponent) + + // Simulate worker pool + workerStopCh := make(chan struct{}) + workerComponent := shutdown.NewWorkerPoolComponent("background-workers", workerStopCh, 5, 50) + shutdownManager.Register(workerComponent) + + // Simulate monitoring/metrics system + monitoringComponent := shutdown.NewMonitoringComponent("metrics-system", func() error { + // Simulate metrics system cleanup + time.Sleep(500 * time.Millisecond) + return nil + }, 60) + shutdownManager.Register(monitoringComponent) +} + +// setupShutdownHooks adds hooks for different shutdown phases +func setupShutdownHooks(shutdownManager *shutdown.Manager, healthManager *Manager, logger shutdown.Logger) { + // Pre-shutdown hook: Mark system as stopping + shutdownManager.AddHook(shutdown.PhasePreShutdown, func(ctx context.Context) error { + logger.Info("🔄 Pre-shutdown: Marking system as stopping") + + // Update health status to stopping + status := healthManager.GetStatus() + status.Status = StatusStopping + status.Message = "System is shutting down" + + return nil + }) + + // Shutdown hook: Log progress + shutdownManager.AddHook(shutdown.PhaseShutdown, func(ctx context.Context) error { + logger.Info("🔄 Shutdown phase: Components are being shut down") + return nil + }) + + // Post-shutdown hook: Final health status update and cleanup + shutdownManager.AddHook(shutdown.PhasePostShutdown, func(ctx context.Context) error { + logger.Info("🔄 Post-shutdown: Performing final cleanup") + + // Any final cleanup that needs to happen after components are shut down + return nil + }) + + // Cleanup hook: Final logging and state persistence + shutdownManager.AddHook(shutdown.PhaseCleanup, func(ctx context.Context) error { + logger.Info("🔄 Cleanup: Finalizing shutdown process") + + // Save any final state, flush logs, etc. + return nil + }) +} + +// HealthAwareComponent is an example of how to create components that integrate with health monitoring +type HealthAwareComponent struct { + name string + healthManager *Manager + checkName string + isRunning bool + stopCh chan struct{} +} + +// NewHealthAwareComponent creates a component that registers its own health check +func NewHealthAwareComponent(name string, healthManager *Manager) *HealthAwareComponent { + comp := &HealthAwareComponent{ + name: name, + healthManager: healthManager, + checkName: fmt.Sprintf("%s-health", name), + stopCh: make(chan struct{}), + } + + // Register health check for this component + healthCheck := &HealthCheck{ + Name: comp.checkName, + Description: fmt.Sprintf("Health check for %s component", name), + Enabled: true, + Critical: false, + Interval: 30 * time.Second, + Timeout: 10 * time.Second, + Checker: func(ctx context.Context) CheckResult { + if comp.isRunning { + return CheckResult{ + Healthy: true, + Message: fmt.Sprintf("%s is running normally", comp.name), + Timestamp: time.Now(), + } + } + + return CheckResult{ + Healthy: false, + Message: fmt.Sprintf("%s is not running", comp.name), + Timestamp: time.Now(), + } + }, + } + + healthManager.RegisterCheck(healthCheck) + return comp +} + +// Start starts the component +func (c *HealthAwareComponent) Start() error { + c.isRunning = true + return nil +} + +// Name returns the component name +func (c *HealthAwareComponent) Name() string { + return c.name +} + +// Priority returns the shutdown priority +func (c *HealthAwareComponent) Priority() int { + return 50 +} + +// CanForceStop returns whether the component can be force-stopped +func (c *HealthAwareComponent) CanForceStop() bool { + return true +} + +// Shutdown gracefully shuts down the component +func (c *HealthAwareComponent) Shutdown(ctx context.Context) error { + c.isRunning = false + close(c.stopCh) + + // Unregister health check + c.healthManager.UnregisterCheck(c.checkName) + + return nil +} \ No newline at end of file diff --git a/pkg/health/manager.go b/pkg/health/manager.go new file mode 100644 index 00000000..17b1b900 --- /dev/null +++ b/pkg/health/manager.go @@ -0,0 +1,529 @@ +package health + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "sync" + "time" + + "github.com/anthonyrawlins/bzzz/pkg/shutdown" +) + +// Manager provides comprehensive health monitoring and integrates with graceful shutdown +type Manager struct { + mu sync.RWMutex + checks map[string]*HealthCheck + status *SystemStatus + httpServer *http.Server + shutdownManager *shutdown.Manager + ticker *time.Ticker + stopCh chan struct{} + logger Logger +} + +// HealthCheck represents a single health check +type HealthCheck struct { + Name string `json:"name"` + Description string `json:"description"` + Checker func(ctx context.Context) CheckResult `json:"-"` + Interval time.Duration `json:"interval"` + Timeout time.Duration `json:"timeout"` + Enabled bool `json:"enabled"` + Critical bool `json:"critical"` // If true, failure triggers shutdown + LastRun time.Time `json:"last_run"` + LastResult *CheckResult `json:"last_result,omitempty"` +} + +// CheckResult represents the result of a health check +type CheckResult struct { + Healthy bool `json:"healthy"` + Message string `json:"message"` + Details map[string]interface{} `json:"details,omitempty"` + Latency time.Duration `json:"latency"` + Timestamp time.Time `json:"timestamp"` + Error error `json:"error,omitempty"` +} + +// SystemStatus represents the overall system health status +type SystemStatus struct { + Status Status `json:"status"` + Message string `json:"message"` + Checks map[string]*CheckResult `json:"checks"` + Uptime time.Duration `json:"uptime"` + StartTime time.Time `json:"start_time"` + LastUpdate time.Time `json:"last_update"` + Version string `json:"version"` + NodeID string `json:"node_id"` +} + +// Status represents health status levels +type Status string + +const ( + StatusHealthy Status = "healthy" + StatusDegraded Status = "degraded" + StatusUnhealthy Status = "unhealthy" + StatusStarting Status = "starting" + StatusStopping Status = "stopping" +) + +// Logger interface for health monitoring +type Logger interface { + Info(msg string, args ...interface{}) + Warn(msg string, args ...interface{}) + Error(msg string, args ...interface{}) +} + +// NewManager creates a new health manager +func NewManager(nodeID, version string, logger Logger) *Manager { + if logger == nil { + logger = &defaultLogger{} + } + + return &Manager{ + checks: make(map[string]*HealthCheck), + status: &SystemStatus{ + Status: StatusStarting, + Message: "System starting up", + Checks: make(map[string]*CheckResult), + StartTime: time.Now(), + Version: version, + NodeID: nodeID, + }, + stopCh: make(chan struct{}), + logger: logger, + } +} + +// RegisterCheck adds a new health check +func (m *Manager) RegisterCheck(check *HealthCheck) { + m.mu.Lock() + defer m.mu.Unlock() + + if check.Timeout == 0 { + check.Timeout = 10 * time.Second + } + if check.Interval == 0 { + check.Interval = 30 * time.Second + } + + m.checks[check.Name] = check + m.logger.Info("Registered health check: %s (critical: %t, interval: %v)", + check.Name, check.Critical, check.Interval) +} + +// UnregisterCheck removes a health check +func (m *Manager) UnregisterCheck(name string) { + m.mu.Lock() + defer m.mu.Unlock() + + delete(m.checks, name) + delete(m.status.Checks, name) + m.logger.Info("Unregistered health check: %s", name) +} + +// Start begins health monitoring +func (m *Manager) Start() error { + m.mu.Lock() + defer m.mu.Unlock() + + // Start health check loop + m.ticker = time.NewTicker(5 * time.Second) // Check every 5 seconds + go m.healthCheckLoop() + + // Update status to healthy (assuming no critical checks fail immediately) + m.status.Status = StatusHealthy + m.status.Message = "System operational" + + m.logger.Info("Health monitoring started") + return nil +} + +// Stop stops health monitoring +func (m *Manager) Stop() error { + m.mu.Lock() + defer m.mu.Unlock() + + close(m.stopCh) + if m.ticker != nil { + m.ticker.Stop() + } + + m.status.Status = StatusStopping + m.status.Message = "System shutting down" + + m.logger.Info("Health monitoring stopped") + return nil +} + +// StartHTTPServer starts an HTTP server for health endpoints +func (m *Manager) StartHTTPServer(port int) error { + mux := http.NewServeMux() + + // Health check endpoint + mux.HandleFunc("/health", m.handleHealth) + mux.HandleFunc("/health/ready", m.handleReady) + mux.HandleFunc("/health/live", m.handleLive) + mux.HandleFunc("/health/checks", m.handleChecks) + + m.httpServer = &http.Server{ + Addr: fmt.Sprintf(":%d", port), + Handler: mux, + } + + go func() { + if err := m.httpServer.ListenAndServe(); err != nil && err != http.ErrServerClosed { + m.logger.Error("Health HTTP server error: %v", err) + } + }() + + m.logger.Info("Health HTTP server started on port %d", port) + return nil +} + +// SetShutdownManager sets the shutdown manager for critical health failures +func (m *Manager) SetShutdownManager(shutdownManager *shutdown.Manager) { + m.shutdownManager = shutdownManager +} + +// GetStatus returns the current system status +func (m *Manager) GetStatus() *SystemStatus { + m.mu.RLock() + defer m.mu.RUnlock() + + // Create a copy to avoid race conditions + status := *m.status + status.Uptime = time.Since(m.status.StartTime) + status.LastUpdate = time.Now() + + // Copy checks + status.Checks = make(map[string]*CheckResult) + for name, result := range m.status.Checks { + if result != nil { + resultCopy := *result + status.Checks[name] = &resultCopy + } + } + + return &status +} + +// healthCheckLoop runs health checks periodically +func (m *Manager) healthCheckLoop() { + defer m.ticker.Stop() + + for { + select { + case <-m.ticker.C: + m.runHealthChecks() + case <-m.stopCh: + return + } + } +} + +// runHealthChecks executes all registered health checks +func (m *Manager) runHealthChecks() { + m.mu.RLock() + checks := make([]*HealthCheck, 0, len(m.checks)) + for _, check := range m.checks { + if check.Enabled && time.Since(check.LastRun) >= check.Interval { + checks = append(checks, check) + } + } + m.mu.RUnlock() + + if len(checks) == 0 { + return + } + + for _, check := range checks { + go m.executeHealthCheck(check) + } +} + +// executeHealthCheck runs a single health check +func (m *Manager) executeHealthCheck(check *HealthCheck) { + ctx, cancel := context.WithTimeout(context.Background(), check.Timeout) + defer cancel() + + start := time.Now() + result := check.Checker(ctx) + result.Latency = time.Since(start) + result.Timestamp = time.Now() + + m.mu.Lock() + check.LastRun = time.Now() + check.LastResult = &result + m.status.Checks[check.Name] = &result + m.mu.Unlock() + + // Log health check results + if result.Healthy { + m.logger.Info("Health check passed: %s (latency: %v)", check.Name, result.Latency) + } else { + m.logger.Warn("Health check failed: %s - %s (latency: %v)", + check.Name, result.Message, result.Latency) + + // If this is a critical check and it failed, consider shutdown + if check.Critical && m.shutdownManager != nil { + m.logger.Error("Critical health check failed: %s - initiating graceful shutdown", check.Name) + m.shutdownManager.Stop() + } + } + + // Update overall system status + m.updateSystemStatus() +} + +// updateSystemStatus recalculates the overall system status +func (m *Manager) updateSystemStatus() { + m.mu.Lock() + defer m.mu.Unlock() + + var healthyChecks, totalChecks, criticalFailures int + + for _, result := range m.status.Checks { + totalChecks++ + if result.Healthy { + healthyChecks++ + } else { + // Check if this is a critical check + if check, exists := m.checks[result.Timestamp.String()]; exists && check.Critical { + criticalFailures++ + } + } + } + + // Determine overall status + if criticalFailures > 0 { + m.status.Status = StatusUnhealthy + m.status.Message = fmt.Sprintf("Critical health checks failing (%d)", criticalFailures) + } else if totalChecks == 0 { + m.status.Status = StatusStarting + m.status.Message = "No health checks configured" + } else if healthyChecks == totalChecks { + m.status.Status = StatusHealthy + m.status.Message = "All health checks passing" + } else { + m.status.Status = StatusDegraded + m.status.Message = fmt.Sprintf("Some health checks failing (%d/%d healthy)", + healthyChecks, totalChecks) + } +} + +// HTTP Handlers + +func (m *Manager) handleHealth(w http.ResponseWriter, r *http.Request) { + status := m.GetStatus() + + w.Header().Set("Content-Type", "application/json") + + // Set HTTP status code based on health + switch status.Status { + case StatusHealthy: + w.WriteHeader(http.StatusOK) + case StatusDegraded: + w.WriteHeader(http.StatusOK) // Still OK, but degraded + case StatusUnhealthy: + w.WriteHeader(http.StatusServiceUnavailable) + case StatusStarting: + w.WriteHeader(http.StatusServiceUnavailable) + case StatusStopping: + w.WriteHeader(http.StatusServiceUnavailable) + } + + json.NewEncoder(w).Encode(status) +} + +func (m *Manager) handleReady(w http.ResponseWriter, r *http.Request) { + status := m.GetStatus() + + w.Header().Set("Content-Type", "application/json") + + // Ready means we can handle requests + if status.Status == StatusHealthy || status.Status == StatusDegraded { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "ready": true, + "status": status.Status, + "message": status.Message, + }) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + json.NewEncoder(w).Encode(map[string]interface{}{ + "ready": false, + "status": status.Status, + "message": status.Message, + }) + } +} + +func (m *Manager) handleLive(w http.ResponseWriter, r *http.Request) { + status := m.GetStatus() + + w.Header().Set("Content-Type", "application/json") + + // Live means the process is running (not necessarily healthy) + if status.Status != StatusStopping { + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]interface{}{ + "live": true, + "status": status.Status, + "uptime": status.Uptime.String(), + }) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + json.NewEncoder(w).Encode(map[string]interface{}{ + "live": false, + "status": status.Status, + "message": "System is shutting down", + }) + } +} + +func (m *Manager) handleChecks(w http.ResponseWriter, r *http.Request) { + status := m.GetStatus() + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + + json.NewEncoder(w).Encode(map[string]interface{}{ + "checks": status.Checks, + "total": len(status.Checks), + "timestamp": time.Now(), + }) +} + +// Predefined health checks + +// CreateDatabaseCheck creates a health check for database connectivity +func CreateDatabaseCheck(name string, pingFunc func() error) *HealthCheck { + return &HealthCheck{ + Name: name, + Description: fmt.Sprintf("Database connectivity check for %s", name), + Enabled: true, + Critical: true, + Interval: 30 * time.Second, + Timeout: 10 * time.Second, + Checker: func(ctx context.Context) CheckResult { + start := time.Now() + err := pingFunc() + + if err != nil { + return CheckResult{ + Healthy: false, + Message: fmt.Sprintf("Database ping failed: %v", err), + Error: err, + Timestamp: time.Now(), + Latency: time.Since(start), + } + } + + return CheckResult{ + Healthy: true, + Message: "Database connectivity OK", + Timestamp: time.Now(), + Latency: time.Since(start), + } + }, + } +} + +// CreateDiskSpaceCheck creates a health check for disk space +func CreateDiskSpaceCheck(path string, threshold float64) *HealthCheck { + return &HealthCheck{ + Name: fmt.Sprintf("disk-space-%s", path), + Description: fmt.Sprintf("Disk space check for %s (threshold: %.1f%%)", path, threshold*100), + Enabled: true, + Critical: false, + Interval: 60 * time.Second, + Timeout: 5 * time.Second, + Checker: func(ctx context.Context) CheckResult { + // In a real implementation, you would check actual disk usage + // For now, we'll simulate it + usage := 0.75 // Simulate 75% usage + + if usage > threshold { + return CheckResult{ + Healthy: false, + Message: fmt.Sprintf("Disk usage %.1f%% exceeds threshold %.1f%%", + usage*100, threshold*100), + Details: map[string]interface{}{ + "path": path, + "usage": usage, + "threshold": threshold, + }, + Timestamp: time.Now(), + } + } + + return CheckResult{ + Healthy: true, + Message: fmt.Sprintf("Disk usage %.1f%% is within threshold", usage*100), + Details: map[string]interface{}{ + "path": path, + "usage": usage, + "threshold": threshold, + }, + Timestamp: time.Now(), + } + }, + } +} + +// CreateMemoryCheck creates a health check for memory usage +func CreateMemoryCheck(threshold float64) *HealthCheck { + return &HealthCheck{ + Name: "memory-usage", + Description: fmt.Sprintf("Memory usage check (threshold: %.1f%%)", threshold*100), + Enabled: true, + Critical: false, + Interval: 30 * time.Second, + Timeout: 5 * time.Second, + Checker: func(ctx context.Context) CheckResult { + // In a real implementation, you would check actual memory usage + usage := 0.60 // Simulate 60% usage + + if usage > threshold { + return CheckResult{ + Healthy: false, + Message: fmt.Sprintf("Memory usage %.1f%% exceeds threshold %.1f%%", + usage*100, threshold*100), + Details: map[string]interface{}{ + "usage": usage, + "threshold": threshold, + }, + Timestamp: time.Now(), + } + } + + return CheckResult{ + Healthy: true, + Message: fmt.Sprintf("Memory usage %.1f%% is within threshold", usage*100), + Details: map[string]interface{}{ + "usage": usage, + "threshold": threshold, + }, + Timestamp: time.Now(), + } + }, + } +} + +// defaultLogger is a simple logger implementation +type defaultLogger struct{} + +func (l *defaultLogger) Info(msg string, args ...interface{}) { + fmt.Printf("[INFO] "+msg+"\n", args...) +} + +func (l *defaultLogger) Warn(msg string, args ...interface{}) { + fmt.Printf("[WARN] "+msg+"\n", args...) +} + +func (l *defaultLogger) Error(msg string, args ...interface{}) { + fmt.Printf("[ERROR] "+msg+"\n", args...) +} \ No newline at end of file diff --git a/pkg/hive/client.go b/pkg/hive/client.go deleted file mode 100644 index fd7cf8b5..00000000 --- a/pkg/hive/client.go +++ /dev/null @@ -1,317 +0,0 @@ -package hive - -import ( - "bytes" - "context" - "encoding/json" - "fmt" - "io" - "net/http" - "time" -) - -// HiveClient provides integration with the Hive task coordination system -type HiveClient struct { - BaseURL string - APIKey string - HTTPClient *http.Client -} - -// NewHiveClient creates a new Hive API client -func NewHiveClient(baseURL, apiKey string) *HiveClient { - return &HiveClient{ - BaseURL: baseURL, - APIKey: apiKey, - HTTPClient: &http.Client{ - Timeout: 30 * time.Second, - }, - } -} - -// Repository represents a Git repository configuration from Hive -type Repository struct { - ProjectID int `json:"project_id"` - Name string `json:"name"` - GitURL string `json:"git_url"` - Owner string `json:"owner"` - Repository string `json:"repository"` - Branch string `json:"branch"` - BzzzEnabled bool `json:"bzzz_enabled"` - ReadyToClaim bool `json:"ready_to_claim"` - PrivateRepo bool `json:"private_repo"` - GitHubTokenRequired bool `json:"github_token_required"` -} - -// MonitoredRepository represents a repository being monitored for tasks -type MonitoredRepository struct { - ID int `json:"id"` - Name string `json:"name"` - Description string `json:"description"` - Provider string `json:"provider"` // github, gitea - ProviderBaseURL string `json:"provider_base_url"` - GitOwner string `json:"git_owner"` - GitRepository string `json:"git_repository"` - GitBranch string `json:"git_branch"` - BzzzEnabled bool `json:"bzzz_enabled"` - AutoAssignment bool `json:"auto_assignment"` - AccessToken string `json:"access_token,omitempty"` - SSHPort int `json:"ssh_port,omitempty"` -} - -// ActiveRepositoriesResponse represents the response from /api/bzzz/active-repos -type ActiveRepositoriesResponse struct { - Repositories []Repository `json:"repositories"` -} - -// TaskClaimRequest represents a task claim request to Hive -type TaskClaimRequest struct { - TaskNumber int `json:"task_number"` - AgentID string `json:"agent_id"` - ClaimedAt int64 `json:"claimed_at"` -} - -// TaskStatusUpdate represents a task status update to Hive -type TaskStatusUpdate struct { - Status string `json:"status"` - UpdatedAt int64 `json:"updated_at"` - Results map[string]interface{} `json:"results,omitempty"` -} - -// GetActiveRepositories fetches all repositories marked for Bzzz consumption -func (c *HiveClient) GetActiveRepositories(ctx context.Context) ([]Repository, error) { - url := fmt.Sprintf("%s/api/bzzz/active-repos", c.BaseURL) - - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) - } - - // Add authentication if API key is provided - if c.APIKey != "" { - req.Header.Set("Authorization", "Bearer "+c.APIKey) - } - req.Header.Set("Content-Type", "application/json") - - resp, err := c.HTTPClient.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to execute request: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) - } - - var response ActiveRepositoriesResponse - if err := json.NewDecoder(resp.Body).Decode(&response); err != nil { - return nil, fmt.Errorf("failed to decode response: %w", err) - } - - return response.Repositories, nil -} - -// GetProjectTasks fetches bzzz-task labeled issues for a specific project -func (c *HiveClient) GetProjectTasks(ctx context.Context, projectID int) ([]map[string]interface{}, error) { - url := fmt.Sprintf("%s/api/bzzz/projects/%d/tasks", c.BaseURL, projectID) - - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) - } - - if c.APIKey != "" { - req.Header.Set("Authorization", "Bearer "+c.APIKey) - } - req.Header.Set("Content-Type", "application/json") - - resp, err := c.HTTPClient.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to execute request: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) - } - - var tasks []map[string]interface{} - if err := json.NewDecoder(resp.Body).Decode(&tasks); err != nil { - return nil, fmt.Errorf("failed to decode response: %w", err) - } - - return tasks, nil -} - -// ClaimTask registers a task claim with the Hive system -func (c *HiveClient) ClaimTask(ctx context.Context, projectID, taskID int, agentID string) error { - url := fmt.Sprintf("%s/api/bzzz/projects/%d/claim", c.BaseURL, projectID) - - claimRequest := TaskClaimRequest{ - TaskNumber: taskID, - AgentID: agentID, - ClaimedAt: time.Now().Unix(), - } - - jsonData, err := json.Marshal(claimRequest) - if err != nil { - return fmt.Errorf("failed to marshal claim request: %w", err) - } - - req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(jsonData)) - if err != nil { - return fmt.Errorf("failed to create request: %w", err) - } - - if c.APIKey != "" { - req.Header.Set("Authorization", "Bearer "+c.APIKey) - } - req.Header.Set("Content-Type", "application/json") - - resp, err := c.HTTPClient.Do(req) - if err != nil { - return fmt.Errorf("failed to execute request: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("claim request failed with status %d: %s", resp.StatusCode, string(body)) - } - - return nil -} - -// UpdateTaskStatus updates the task status in the Hive system -func (c *HiveClient) UpdateTaskStatus(ctx context.Context, projectID, taskID int, status string, results map[string]interface{}) error { - url := fmt.Sprintf("%s/api/bzzz/projects/%d/status", c.BaseURL, projectID) - - statusUpdate := TaskStatusUpdate{ - Status: status, - UpdatedAt: time.Now().Unix(), - Results: results, - } - - jsonData, err := json.Marshal(statusUpdate) - if err != nil { - return fmt.Errorf("failed to marshal status update: %w", err) - } - - req, err := http.NewRequestWithContext(ctx, "PUT", url, bytes.NewBuffer(jsonData)) - if err != nil { - return fmt.Errorf("failed to create request: %w", err) - } - - if c.APIKey != "" { - req.Header.Set("Authorization", "Bearer "+c.APIKey) - } - req.Header.Set("Content-Type", "application/json") - - resp, err := c.HTTPClient.Do(req) - if err != nil { - return fmt.Errorf("failed to execute request: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return fmt.Errorf("status update failed with status %d: %s", resp.StatusCode, string(body)) - } - - return nil -} - -// GetMonitoredRepositories fetches repositories configured for bzzz monitoring -func (c *HiveClient) GetMonitoredRepositories(ctx context.Context) ([]*MonitoredRepository, error) { - url := fmt.Sprintf("%s/api/repositories", c.BaseURL) - - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return nil, fmt.Errorf("failed to create request: %w", err) - } - - // Add authentication if API key is provided - if c.APIKey != "" { - req.Header.Set("Authorization", "Bearer "+c.APIKey) - } - req.Header.Set("Content-Type", "application/json") - - resp, err := c.HTTPClient.Do(req) - if err != nil { - return nil, fmt.Errorf("failed to execute request: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - body, _ := io.ReadAll(resp.Body) - return nil, fmt.Errorf("API request failed with status %d: %s", resp.StatusCode, string(body)) - } - - var repositories []struct { - ID int `json:"id"` - Name string `json:"name"` - Description string `json:"description"` - Provider string `json:"provider"` - ProviderBaseURL string `json:"provider_base_url"` - Owner string `json:"owner"` - Repository string `json:"repository"` - Branch string `json:"branch"` - BzzzEnabled bool `json:"bzzz_enabled"` - AutoAssignment bool `json:"auto_assignment"` - } - - if err := json.NewDecoder(resp.Body).Decode(&repositories); err != nil { - return nil, fmt.Errorf("failed to decode response: %w", err) - } - - // Convert to MonitoredRepository format - var monitoredRepos []*MonitoredRepository - for _, repo := range repositories { - if repo.BzzzEnabled { - monitoredRepo := &MonitoredRepository{ - ID: repo.ID, - Name: repo.Name, - Description: repo.Description, - Provider: repo.Provider, - ProviderBaseURL: repo.ProviderBaseURL, - GitOwner: repo.Owner, - GitRepository: repo.Repository, - GitBranch: repo.Branch, - BzzzEnabled: repo.BzzzEnabled, - AutoAssignment: repo.AutoAssignment, - } - - // Set SSH port for Gitea - if repo.Provider == "gitea" { - monitoredRepo.SSHPort = 2222 - } - - monitoredRepos = append(monitoredRepos, monitoredRepo) - } - } - - return monitoredRepos, nil -} - -// HealthCheck verifies connectivity to the Hive API -func (c *HiveClient) HealthCheck(ctx context.Context) error { - url := fmt.Sprintf("%s/health", c.BaseURL) - - req, err := http.NewRequestWithContext(ctx, "GET", url, nil) - if err != nil { - return fmt.Errorf("failed to create health check request: %w", err) - } - - resp, err := c.HTTPClient.Do(req) - if err != nil { - return fmt.Errorf("health check request failed: %w", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - return fmt.Errorf("Hive API health check failed with status: %d", resp.StatusCode) - } - - return nil -} \ No newline at end of file diff --git a/pkg/hive/models.go b/pkg/hive/models.go deleted file mode 100644 index cb627a8f..00000000 --- a/pkg/hive/models.go +++ /dev/null @@ -1,118 +0,0 @@ -package hive - -import "time" - -// Project represents a project managed by the Hive system -type Project struct { - ID int `json:"id"` - Name string `json:"name"` - Description string `json:"description"` - Status string `json:"status"` - GitURL string `json:"git_url"` - Owner string `json:"owner"` - Repository string `json:"repository"` - Branch string `json:"branch"` - BzzzEnabled bool `json:"bzzz_enabled"` - ReadyToClaim bool `json:"ready_to_claim"` - PrivateRepo bool `json:"private_repo"` - GitHubTokenRequired bool `json:"github_token_required"` - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` - Metadata map[string]interface{} `json:"metadata,omitempty"` -} - -// Task represents a task (GitHub issue) from the Hive system -type Task struct { - ID int `json:"id"` - ProjectID int `json:"project_id"` - ProjectName string `json:"project_name"` - GitURL string `json:"git_url"` - Owner string `json:"owner"` - Repository string `json:"repository"` - Branch string `json:"branch"` - - // GitHub issue fields - IssueNumber int `json:"issue_number"` - Title string `json:"title"` - Description string `json:"description"` - State string `json:"state"` - Assignee string `json:"assignee,omitempty"` - - // Task metadata - TaskType string `json:"task_type"` - Priority int `json:"priority"` - Labels []string `json:"labels"` - Requirements []string `json:"requirements,omitempty"` - Deliverables []string `json:"deliverables,omitempty"` - Context map[string]interface{} `json:"context,omitempty"` - - // Timestamps - CreatedAt time.Time `json:"created_at"` - UpdatedAt time.Time `json:"updated_at"` -} - -// TaskClaim represents a task claim in the Hive system -type TaskClaim struct { - ID int `json:"id"` - ProjectID int `json:"project_id"` - TaskID int `json:"task_id"` - AgentID string `json:"agent_id"` - Status string `json:"status"` // claimed, in_progress, completed, failed - ClaimedAt time.Time `json:"claimed_at"` - UpdatedAt time.Time `json:"updated_at"` - Results map[string]interface{} `json:"results,omitempty"` -} - -// ProjectActivationRequest represents a request to activate/deactivate a project -type ProjectActivationRequest struct { - BzzzEnabled bool `json:"bzzz_enabled"` - ReadyToClaim bool `json:"ready_to_claim"` -} - -// ProjectRegistrationRequest represents a request to register a new project -type ProjectRegistrationRequest struct { - Name string `json:"name"` - Description string `json:"description"` - GitURL string `json:"git_url"` - PrivateRepo bool `json:"private_repo"` - BzzzEnabled bool `json:"bzzz_enabled"` - AutoActivate bool `json:"auto_activate"` -} - -// AgentCapability represents an agent's capabilities for task matching -type AgentCapability struct { - AgentID string `json:"agent_id"` - NodeID string `json:"node_id"` - Capabilities []string `json:"capabilities"` - Models []string `json:"models"` - Status string `json:"status"` - LastSeen time.Time `json:"last_seen"` -} - -// CoordinationEvent represents a P2P coordination event -type CoordinationEvent struct { - EventID string `json:"event_id"` - ProjectID int `json:"project_id"` - TaskID int `json:"task_id"` - EventType string `json:"event_type"` // task_claimed, plan_proposed, escalated, completed - AgentID string `json:"agent_id"` - Message string `json:"message"` - Context map[string]interface{} `json:"context,omitempty"` - Timestamp time.Time `json:"timestamp"` -} - -// ErrorResponse represents an error response from the Hive API -type ErrorResponse struct { - Error string `json:"error"` - Message string `json:"message"` - Code string `json:"code,omitempty"` -} - -// HealthStatus represents the health status of the Hive system -type HealthStatus struct { - Status string `json:"status"` - Version string `json:"version"` - Database string `json:"database"` - Uptime string `json:"uptime"` - CheckedAt time.Time `json:"checked_at"` -} \ No newline at end of file diff --git a/pkg/shutdown/components.go b/pkg/shutdown/components.go new file mode 100644 index 00000000..915a602e --- /dev/null +++ b/pkg/shutdown/components.go @@ -0,0 +1,369 @@ +package shutdown + +import ( + "context" + "fmt" + "net/http" + "time" +) + +// HTTPServerComponent wraps an HTTP server for graceful shutdown +type HTTPServerComponent struct { + name string + server *http.Server + priority int +} + +// NewHTTPServerComponent creates a new HTTP server component +func NewHTTPServerComponent(name string, server *http.Server, priority int) *HTTPServerComponent { + return &HTTPServerComponent{ + name: name, + server: server, + priority: priority, + } +} + +func (h *HTTPServerComponent) Name() string { + return h.name +} + +func (h *HTTPServerComponent) Priority() int { + return h.priority +} + +func (h *HTTPServerComponent) CanForceStop() bool { + return true +} + +func (h *HTTPServerComponent) Shutdown(ctx context.Context) error { + if h.server == nil { + return nil + } + + return h.server.Shutdown(ctx) +} + +// P2PNodeComponent wraps a P2P node for graceful shutdown +type P2PNodeComponent struct { + name string + closer func() error + priority int +} + +// NewP2PNodeComponent creates a new P2P node component +func NewP2PNodeComponent(name string, closer func() error, priority int) *P2PNodeComponent { + return &P2PNodeComponent{ + name: name, + closer: closer, + priority: priority, + } +} + +func (p *P2PNodeComponent) Name() string { + return p.name +} + +func (p *P2PNodeComponent) Priority() int { + return p.priority +} + +func (p *P2PNodeComponent) CanForceStop() bool { + return true +} + +func (p *P2PNodeComponent) Shutdown(ctx context.Context) error { + if p.closer == nil { + return nil + } + + // P2P nodes typically need time to disconnect gracefully + done := make(chan error, 1) + go func() { + done <- p.closer() + }() + + select { + case err := <-done: + return err + case <-ctx.Done(): + return ctx.Err() + } +} + +// DatabaseComponent wraps a database connection for graceful shutdown +type DatabaseComponent struct { + name string + closer func() error + priority int +} + +// NewDatabaseComponent creates a new database component +func NewDatabaseComponent(name string, closer func() error, priority int) *DatabaseComponent { + return &DatabaseComponent{ + name: name, + closer: closer, + priority: priority, + } +} + +func (d *DatabaseComponent) Name() string { + return d.name +} + +func (d *DatabaseComponent) Priority() int { + return d.priority +} + +func (d *DatabaseComponent) CanForceStop() bool { + return false // Databases shouldn't be force-stopped +} + +func (d *DatabaseComponent) Shutdown(ctx context.Context) error { + if d.closer == nil { + return nil + } + + return d.closer() +} + +// ElectionManagerComponent wraps an election manager for graceful shutdown +type ElectionManagerComponent struct { + name string + stopper func() + priority int +} + +// NewElectionManagerComponent creates a new election manager component +func NewElectionManagerComponent(name string, stopper func(), priority int) *ElectionManagerComponent { + return &ElectionManagerComponent{ + name: name, + stopper: stopper, + priority: priority, + } +} + +func (e *ElectionManagerComponent) Name() string { + return e.name +} + +func (e *ElectionManagerComponent) Priority() int { + return e.priority +} + +func (e *ElectionManagerComponent) CanForceStop() bool { + return true +} + +func (e *ElectionManagerComponent) Shutdown(ctx context.Context) error { + if e.stopper == nil { + return nil + } + + // Election managers need special handling to transfer leadership + done := make(chan struct{}) + go func() { + e.stopper() + close(done) + }() + + select { + case <-done: + return nil + case <-ctx.Done(): + return ctx.Err() + } +} + +// PubSubComponent wraps a PubSub system for graceful shutdown +type PubSubComponent struct { + name string + closer func() error + priority int +} + +// NewPubSubComponent creates a new PubSub component +func NewPubSubComponent(name string, closer func() error, priority int) *PubSubComponent { + return &PubSubComponent{ + name: name, + closer: closer, + priority: priority, + } +} + +func (p *PubSubComponent) Name() string { + return p.name +} + +func (p *PubSubComponent) Priority() int { + return p.priority +} + +func (p *PubSubComponent) CanForceStop() bool { + return true +} + +func (p *PubSubComponent) Shutdown(ctx context.Context) error { + if p.closer == nil { + return nil + } + + return p.closer() +} + +// MonitoringComponent wraps a monitoring system for graceful shutdown +type MonitoringComponent struct { + name string + closer func() error + priority int +} + +// NewMonitoringComponent creates a new monitoring component +func NewMonitoringComponent(name string, closer func() error, priority int) *MonitoringComponent { + return &MonitoringComponent{ + name: name, + closer: closer, + priority: priority, + } +} + +func (m *MonitoringComponent) Name() string { + return m.name +} + +func (m *MonitoringComponent) Priority() int { + return m.priority +} + +func (m *MonitoringComponent) CanForceStop() bool { + return true +} + +func (m *MonitoringComponent) Shutdown(ctx context.Context) error { + if m.closer == nil { + return nil + } + + return m.closer() +} + +// GenericComponent provides a generic wrapper for any component with a close function +type GenericComponent struct { + name string + closer func() error + priority int + canForceStop bool + shutdownFunc func(ctx context.Context) error +} + +// NewGenericComponent creates a new generic component +func NewGenericComponent(name string, priority int, canForceStop bool) *GenericComponent { + return &GenericComponent{ + name: name, + priority: priority, + canForceStop: canForceStop, + } +} + +// SetCloser sets a simple closer function +func (g *GenericComponent) SetCloser(closer func() error) *GenericComponent { + g.closer = closer + return g +} + +// SetShutdownFunc sets a context-aware shutdown function +func (g *GenericComponent) SetShutdownFunc(shutdownFunc func(ctx context.Context) error) *GenericComponent { + g.shutdownFunc = shutdownFunc + return g +} + +func (g *GenericComponent) Name() string { + return g.name +} + +func (g *GenericComponent) Priority() int { + return g.priority +} + +func (g *GenericComponent) CanForceStop() bool { + return g.canForceStop +} + +func (g *GenericComponent) Shutdown(ctx context.Context) error { + if g.shutdownFunc != nil { + return g.shutdownFunc(ctx) + } + + if g.closer != nil { + // Wrap simple closer in context-aware function + done := make(chan error, 1) + go func() { + done <- g.closer() + }() + + select { + case err := <-done: + return err + case <-ctx.Done(): + return ctx.Err() + } + } + + return nil +} + +// WorkerPoolComponent manages a pool of workers for graceful shutdown +type WorkerPoolComponent struct { + name string + stopCh chan struct{} + workers int + priority int + shutdownTime time.Duration +} + +// NewWorkerPoolComponent creates a new worker pool component +func NewWorkerPoolComponent(name string, stopCh chan struct{}, workers int, priority int) *WorkerPoolComponent { + return &WorkerPoolComponent{ + name: name, + stopCh: stopCh, + workers: workers, + priority: priority, + shutdownTime: 10 * time.Second, + } +} + +func (w *WorkerPoolComponent) Name() string { + return fmt.Sprintf("%s (workers: %d)", w.name, w.workers) +} + +func (w *WorkerPoolComponent) Priority() int { + return w.priority +} + +func (w *WorkerPoolComponent) CanForceStop() bool { + return true +} + +func (w *WorkerPoolComponent) Shutdown(ctx context.Context) error { + if w.stopCh == nil { + return nil + } + + // Signal workers to stop + close(w.stopCh) + + // Wait for workers to finish with timeout + timeout := w.shutdownTime + if deadline, ok := ctx.Deadline(); ok { + if remaining := time.Until(deadline); remaining < timeout { + timeout = remaining + } + } + + // In a real implementation, you would wait for workers to signal completion + select { + case <-time.After(timeout): + return fmt.Errorf("workers did not shut down within %v", timeout) + case <-ctx.Done(): + return ctx.Err() + } +} \ No newline at end of file diff --git a/pkg/shutdown/manager.go b/pkg/shutdown/manager.go new file mode 100644 index 00000000..b603d467 --- /dev/null +++ b/pkg/shutdown/manager.go @@ -0,0 +1,380 @@ +package shutdown + +import ( + "context" + "fmt" + "os" + "os/signal" + "sync" + "syscall" + "time" +) + +// Manager provides coordinated graceful shutdown for all system components +type Manager struct { + mu sync.RWMutex + components map[string]Component + hooks map[Phase][]Hook + timeout time.Duration + forceTimeout time.Duration + signals []os.Signal + signalCh chan os.Signal + shutdownCh chan struct{} + completedCh chan struct{} + started bool + shutdownStarted bool + logger Logger +} + +// Component represents a system component that needs graceful shutdown +type Component interface { + // Name returns the component name for logging + Name() string + + // Shutdown gracefully shuts down the component + Shutdown(ctx context.Context) error + + // Priority returns the shutdown priority (lower numbers shut down first) + Priority() int + + // CanForceStop returns true if the component can be force-stopped + CanForceStop() bool +} + +// Hook represents a function to be called during shutdown phases +type Hook func(ctx context.Context) error + +// Phase represents different phases of the shutdown process +type Phase int + +const ( + PhasePreShutdown Phase = iota // Before any components are shut down + PhaseShutdown // During component shutdown + PhasePostShutdown // After all components are shut down + PhaseCleanup // Final cleanup phase +) + +// Logger interface for shutdown logging +type Logger interface { + Info(msg string, args ...interface{}) + Warn(msg string, args ...interface{}) + Error(msg string, args ...interface{}) +} + +// NewManager creates a new shutdown manager +func NewManager(timeout time.Duration, logger Logger) *Manager { + if timeout == 0 { + timeout = 30 * time.Second + } + + if logger == nil { + logger = &defaultLogger{} + } + + return &Manager{ + components: make(map[string]Component), + hooks: make(map[Phase][]Hook), + timeout: timeout, + forceTimeout: timeout + 15*time.Second, + signals: []os.Signal{os.Interrupt, syscall.SIGTERM, syscall.SIGQUIT}, + signalCh: make(chan os.Signal, 1), + shutdownCh: make(chan struct{}), + completedCh: make(chan struct{}), + logger: logger, + } +} + +// Register adds a component for graceful shutdown +func (m *Manager) Register(component Component) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.shutdownStarted { + m.logger.Warn("Cannot register component '%s' - shutdown already started", component.Name()) + return + } + + m.components[component.Name()] = component + m.logger.Info("Registered component for graceful shutdown: %s (priority: %d)", + component.Name(), component.Priority()) +} + +// Unregister removes a component from graceful shutdown +func (m *Manager) Unregister(name string) { + m.mu.Lock() + defer m.mu.Unlock() + + if m.shutdownStarted { + m.logger.Warn("Cannot unregister component '%s' - shutdown already started", name) + return + } + + delete(m.components, name) + m.logger.Info("Unregistered component from graceful shutdown: %s", name) +} + +// AddHook adds a hook to be called during a specific shutdown phase +func (m *Manager) AddHook(phase Phase, hook Hook) { + m.mu.Lock() + defer m.mu.Unlock() + + m.hooks[phase] = append(m.hooks[phase], hook) +} + +// Start begins listening for shutdown signals +func (m *Manager) Start() { + m.mu.Lock() + if m.started { + m.mu.Unlock() + return + } + m.started = true + m.mu.Unlock() + + signal.Notify(m.signalCh, m.signals...) + + go m.signalHandler() + m.logger.Info("Graceful shutdown manager started, listening for signals: %v", m.signals) +} + +// Stop initiates graceful shutdown programmatically +func (m *Manager) Stop() { + select { + case m.shutdownCh <- struct{}{}: + default: + // Shutdown already initiated + } +} + +// Wait blocks until shutdown is complete +func (m *Manager) Wait() { + <-m.completedCh +} + +// signalHandler handles OS signals and initiates shutdown +func (m *Manager) signalHandler() { + select { + case sig := <-m.signalCh: + m.logger.Info("Received signal %v, initiating graceful shutdown", sig) + m.initiateShutdown() + case <-m.shutdownCh: + m.logger.Info("Programmatic shutdown requested") + m.initiateShutdown() + } +} + +// initiateShutdown performs the actual shutdown process +func (m *Manager) initiateShutdown() { + m.mu.Lock() + if m.shutdownStarted { + m.mu.Unlock() + return + } + m.shutdownStarted = true + m.mu.Unlock() + + defer close(m.completedCh) + + // Create main shutdown context with timeout + ctx, cancel := context.WithTimeout(context.Background(), m.timeout) + defer cancel() + + // Create force shutdown context + forceCtx, forceCancel := context.WithTimeout(context.Background(), m.forceTimeout) + defer forceCancel() + + // Start force shutdown monitor + go m.forceShutdownMonitor(forceCtx) + + startTime := time.Now() + m.logger.Info("🛑 Beginning graceful shutdown (timeout: %v)", m.timeout) + + // Phase 1: Pre-shutdown hooks + if err := m.executeHooks(ctx, PhasePreShutdown); err != nil { + m.logger.Error("Pre-shutdown hooks failed: %v", err) + } + + // Phase 2: Shutdown components in priority order + if err := m.shutdownComponents(ctx); err != nil { + m.logger.Error("Component shutdown failed: %v", err) + } + + // Phase 3: Post-shutdown hooks + if err := m.executeHooks(ctx, PhasePostShutdown); err != nil { + m.logger.Error("Post-shutdown hooks failed: %v", err) + } + + // Phase 4: Cleanup hooks + if err := m.executeHooks(ctx, PhaseCleanup); err != nil { + m.logger.Error("Cleanup hooks failed: %v", err) + } + + elapsed := time.Since(startTime) + m.logger.Info("✅ Graceful shutdown completed in %v", elapsed) +} + +// executeHooks runs all hooks for a given phase +func (m *Manager) executeHooks(ctx context.Context, phase Phase) error { + m.mu.RLock() + hooks := m.hooks[phase] + m.mu.RUnlock() + + if len(hooks) == 0 { + return nil + } + + phaseName := map[Phase]string{ + PhasePreShutdown: "pre-shutdown", + PhaseShutdown: "shutdown", + PhasePostShutdown: "post-shutdown", + PhaseCleanup: "cleanup", + }[phase] + + m.logger.Info("🔧 Executing %s hooks (%d hooks)", phaseName, len(hooks)) + + for i, hook := range hooks { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + if err := hook(ctx); err != nil { + m.logger.Error("Hook %d in %s phase failed: %v", i+1, phaseName, err) + // Continue with other hooks even if one fails + } + } + + return nil +} + +// shutdownComponents shuts down all registered components in priority order +func (m *Manager) shutdownComponents(ctx context.Context) error { + m.mu.RLock() + components := make([]Component, 0, len(m.components)) + for _, comp := range m.components { + components = append(components, comp) + } + m.mu.RUnlock() + + if len(components) == 0 { + m.logger.Info("No components registered for shutdown") + return nil + } + + // Sort components by priority (lower numbers first) + for i := 0; i < len(components)-1; i++ { + for j := i + 1; j < len(components); j++ { + if components[i].Priority() > components[j].Priority() { + components[i], components[j] = components[j], components[i] + } + } + } + + m.logger.Info("🔄 Shutting down %d components in priority order", len(components)) + + // Shutdown components with individual timeouts + componentTimeout := m.timeout / time.Duration(len(components)) + if componentTimeout < 5*time.Second { + componentTimeout = 5 * time.Second + } + + for _, comp := range components { + select { + case <-ctx.Done(): + m.logger.Warn("Main shutdown context cancelled, attempting force shutdown") + return m.forceShutdownRemainingComponents(components) + default: + } + + compCtx, compCancel := context.WithTimeout(ctx, componentTimeout) + + m.logger.Info("🔄 Shutting down component: %s (priority: %d, timeout: %v)", + comp.Name(), comp.Priority(), componentTimeout) + + start := time.Now() + if err := comp.Shutdown(compCtx); err != nil { + elapsed := time.Since(start) + m.logger.Error("❌ Component '%s' shutdown failed after %v: %v", + comp.Name(), elapsed, err) + } else { + elapsed := time.Since(start) + m.logger.Info("✅ Component '%s' shutdown completed in %v", + comp.Name(), elapsed) + } + + compCancel() + } + + return nil +} + +// forceShutdownMonitor monitors for force shutdown timeout +func (m *Manager) forceShutdownMonitor(ctx context.Context) { + <-ctx.Done() + if ctx.Err() == context.DeadlineExceeded { + m.logger.Error("💥 Force shutdown timeout reached, terminating process") + os.Exit(1) + } +} + +// forceShutdownRemainingComponents attempts to force stop components that can be force-stopped +func (m *Manager) forceShutdownRemainingComponents(components []Component) error { + m.logger.Warn("🚨 Attempting force shutdown of remaining components") + + for _, comp := range components { + if comp.CanForceStop() { + m.logger.Warn("🔨 Force stopping component: %s", comp.Name()) + // For force stop, we give a very short timeout + forceCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + comp.Shutdown(forceCtx) + cancel() + } else { + m.logger.Warn("⚠️ Component '%s' cannot be force stopped", comp.Name()) + } + } + + return nil +} + +// GetStatus returns the current shutdown status +func (m *Manager) GetStatus() *Status { + m.mu.RLock() + defer m.mu.RUnlock() + + status := &Status{ + Started: m.started, + ShutdownStarted: m.shutdownStarted, + ComponentCount: len(m.components), + Components: make([]string, 0, len(m.components)), + } + + for name := range m.components { + status.Components = append(status.Components, name) + } + + return status +} + +// Status represents the current shutdown manager status +type Status struct { + Started bool `json:"started"` + ShutdownStarted bool `json:"shutdown_started"` + ComponentCount int `json:"component_count"` + Components []string `json:"components"` +} + +// defaultLogger is a simple logger implementation +type defaultLogger struct{} + +func (l *defaultLogger) Info(msg string, args ...interface{}) { + fmt.Printf("[INFO] "+msg+"\n", args...) +} + +func (l *defaultLogger) Warn(msg string, args ...interface{}) { + fmt.Printf("[WARN] "+msg+"\n", args...) +} + +func (l *defaultLogger) Error(msg string, args ...interface{}) { + fmt.Printf("[ERROR] "+msg+"\n", args...) +} \ No newline at end of file diff --git a/pkg/slurp/storage/compression_test.go b/pkg/slurp/storage/compression_test.go new file mode 100644 index 00000000..5ead9db3 --- /dev/null +++ b/pkg/slurp/storage/compression_test.go @@ -0,0 +1,218 @@ +package storage + +import ( + "bytes" + "context" + "os" + "strings" + "testing" + "time" +) + +func TestLocalStorageCompression(t *testing.T) { + // Create temporary directory for test + tempDir := t.TempDir() + + // Create storage with compression enabled + options := DefaultLocalStorageOptions() + options.Compression = true + + storage, err := NewLocalStorage(tempDir, options) + if err != nil { + t.Fatalf("Failed to create storage: %v", err) + } + defer storage.Close() + + // Test data that should compress well + largeData := strings.Repeat("This is a test string that should compress well! ", 100) + + // Store with compression enabled + storeOptions := &StoreOptions{ + Compress: true, + } + + ctx := context.Background() + err = storage.Store(ctx, "test-compress", largeData, storeOptions) + if err != nil { + t.Fatalf("Failed to store compressed data: %v", err) + } + + // Retrieve and verify + retrieved, err := storage.Retrieve(ctx, "test-compress") + if err != nil { + t.Fatalf("Failed to retrieve compressed data: %v", err) + } + + // Verify data integrity + if retrievedStr, ok := retrieved.(string); ok { + if retrievedStr != largeData { + t.Error("Retrieved data doesn't match original") + } + } else { + t.Error("Retrieved data is not a string") + } + + // Check compression stats + stats, err := storage.GetCompressionStats() + if err != nil { + t.Fatalf("Failed to get compression stats: %v", err) + } + + if stats.CompressedEntries == 0 { + t.Error("Expected at least one compressed entry") + } + + if stats.CompressionRatio == 0 { + t.Error("Expected non-zero compression ratio") + } + + t.Logf("Compression stats: %d/%d entries compressed, ratio: %.2f", + stats.CompressedEntries, stats.TotalEntries, stats.CompressionRatio) +} + +func TestCompressionMethods(t *testing.T) { + // Create storage instance for testing compression methods + tempDir := t.TempDir() + storage, err := NewLocalStorage(tempDir, nil) + if err != nil { + t.Fatalf("Failed to create storage: %v", err) + } + defer storage.Close() + + // Test data + originalData := []byte(strings.Repeat("Hello, World! ", 1000)) + + // Test compression + compressed, err := storage.compress(originalData) + if err != nil { + t.Fatalf("Compression failed: %v", err) + } + + t.Logf("Original size: %d bytes", len(originalData)) + t.Logf("Compressed size: %d bytes", len(compressed)) + + // Compressed data should be smaller for repetitive data + if len(compressed) >= len(originalData) { + t.Log("Compression didn't reduce size (may be expected for small or non-repetitive data)") + } + + // Test decompression + decompressed, err := storage.decompress(compressed) + if err != nil { + t.Fatalf("Decompression failed: %v", err) + } + + // Verify data integrity + if !bytes.Equal(originalData, decompressed) { + t.Error("Decompressed data doesn't match original") + } +} + +func TestStorageOptimization(t *testing.T) { + // Create temporary directory for test + tempDir := t.TempDir() + + storage, err := NewLocalStorage(tempDir, nil) + if err != nil { + t.Fatalf("Failed to create storage: %v", err) + } + defer storage.Close() + + ctx := context.Background() + + // Store multiple entries without compression + testData := []struct { + key string + data string + }{ + {"small", "small data"}, + {"large1", strings.Repeat("Large repetitive data ", 100)}, + {"large2", strings.Repeat("Another large repetitive dataset ", 100)}, + {"medium", strings.Repeat("Medium data ", 50)}, + } + + for _, item := range testData { + err = storage.Store(ctx, item.key, item.data, &StoreOptions{Compress: false}) + if err != nil { + t.Fatalf("Failed to store %s: %v", item.key, err) + } + } + + // Check initial stats + initialStats, err := storage.GetCompressionStats() + if err != nil { + t.Fatalf("Failed to get initial stats: %v", err) + } + + t.Logf("Initial: %d entries, %d compressed", + initialStats.TotalEntries, initialStats.CompressedEntries) + + // Optimize storage with threshold (only compress entries larger than 100 bytes) + err = storage.OptimizeStorage(ctx, 100) + if err != nil { + t.Fatalf("Storage optimization failed: %v", err) + } + + // Check final stats + finalStats, err := storage.GetCompressionStats() + if err != nil { + t.Fatalf("Failed to get final stats: %v", err) + } + + t.Logf("Final: %d entries, %d compressed", + finalStats.TotalEntries, finalStats.CompressedEntries) + + // Should have more compressed entries after optimization + if finalStats.CompressedEntries <= initialStats.CompressedEntries { + t.Log("Note: Optimization didn't increase compressed entries (may be expected)") + } + + // Verify all data is still retrievable + for _, item := range testData { + retrieved, err := storage.Retrieve(ctx, item.key) + if err != nil { + t.Fatalf("Failed to retrieve %s after optimization: %v", item.key, err) + } + + if retrievedStr, ok := retrieved.(string); ok { + if retrievedStr != item.data { + t.Errorf("Data mismatch for %s after optimization", item.key) + } + } + } +} + +func TestCompressionFallback(t *testing.T) { + // Test that compression falls back gracefully for incompressible data + tempDir := t.TempDir() + storage, err := NewLocalStorage(tempDir, nil) + if err != nil { + t.Fatalf("Failed to create storage: %v", err) + } + defer storage.Close() + + // Random-like data that won't compress well + randomData := []byte("a1b2c3d4e5f6g7h8i9j0k1l2m3n4o5p6q7r8s9t0u1v2w3x4y5z6") + + // Test compression + compressed, err := storage.compress(randomData) + if err != nil { + t.Fatalf("Compression failed: %v", err) + } + + // Should return original data if compression doesn't help + if len(compressed) >= len(randomData) { + t.Log("Compression correctly returned original data for incompressible input") + } + + // Test decompression of uncompressed data + decompressed, err := storage.decompress(randomData) + if err != nil { + t.Fatalf("Decompression fallback failed: %v", err) + } + + // Should return original data unchanged + if !bytes.Equal(randomData, decompressed) { + t.Error("Decompression fallback changed data") + } +} \ No newline at end of file diff --git a/pkg/slurp/storage/local_storage.go b/pkg/slurp/storage/local_storage.go index af37ee78..7b95609f 100644 --- a/pkg/slurp/storage/local_storage.go +++ b/pkg/slurp/storage/local_storage.go @@ -1,15 +1,19 @@ package storage import ( + "bytes" + "compress/gzip" "context" "crypto/sha256" "encoding/json" "fmt" + "io" "io/fs" "os" "path/filepath" "regexp" "sync" + "syscall" "time" "github.com/syndtr/goleveldb/leveldb" @@ -400,30 +404,66 @@ type StorageEntry struct { // Helper methods func (ls *LocalStorageImpl) compress(data []byte) ([]byte, error) { - // Simple compression using gzip - could be enhanced with better algorithms - // This is a placeholder - implement actual compression - return data, nil // TODO: Implement compression + // Use gzip compression for efficient data storage + var buf bytes.Buffer + + // Create gzip writer with best compression + writer := gzip.NewWriter(&buf) + writer.Header.Name = "storage_data" + writer.Header.Comment = "BZZZ SLURP local storage compressed data" + + // Write data to gzip writer + if _, err := writer.Write(data); err != nil { + writer.Close() + return nil, fmt.Errorf("failed to write compressed data: %w", err) + } + + // Close writer to flush data + if err := writer.Close(); err != nil { + return nil, fmt.Errorf("failed to close gzip writer: %w", err) + } + + compressed := buf.Bytes() + + // Only return compressed data if it's actually smaller + if len(compressed) >= len(data) { + // Compression didn't help, return original data + return data, nil + } + + return compressed, nil } func (ls *LocalStorageImpl) decompress(data []byte) ([]byte, error) { - // Decompression counterpart - // This is a placeholder - implement actual decompression - return data, nil // TODO: Implement decompression + // Create gzip reader + reader, err := gzip.NewReader(bytes.NewReader(data)) + if err != nil { + // Data might not be compressed (fallback case) + return data, nil + } + defer reader.Close() + + // Read decompressed data + var buf bytes.Buffer + if _, err := io.Copy(&buf, reader); err != nil { + return nil, fmt.Errorf("failed to decompress data: %w", err) + } + + return buf.Bytes(), nil } func (ls *LocalStorageImpl) getAvailableSpace() (int64, error) { - // Get filesystem stats for the storage directory - var stat fs.FileInfo - var err error - - if stat, err = os.Stat(ls.basePath); err != nil { - return 0, err + // Get filesystem stats for the storage directory using syscalls + var stat syscall.Statfs_t + if err := syscall.Statfs(ls.basePath, &stat); err != nil { + return 0, fmt.Errorf("failed to get filesystem stats: %w", err) } - // This is a simplified implementation - // For production, use syscall.Statfs or similar platform-specific calls - _ = stat - return 1024 * 1024 * 1024 * 10, nil // Placeholder: 10GB + // Calculate available space in bytes + // Available blocks * block size + availableBytes := int64(stat.Bavail) * int64(stat.Bsize) + + return availableBytes, nil } func (ls *LocalStorageImpl) updateFragmentationRatio() { @@ -452,6 +492,120 @@ func (ls *LocalStorageImpl) backgroundCompaction() { } } +// GetCompressionStats returns compression statistics +func (ls *LocalStorageImpl) GetCompressionStats() (*CompressionStats, error) { + ls.mu.RLock() + defer ls.mu.RUnlock() + + stats := &CompressionStats{ + TotalEntries: 0, + CompressedEntries: 0, + TotalSize: ls.metrics.TotalSize, + CompressedSize: ls.metrics.CompressedSize, + CompressionRatio: 0.0, + } + + // Iterate through all entries to get accurate stats + iter := ls.db.NewIterator(nil, nil) + defer iter.Release() + + for iter.Next() { + stats.TotalEntries++ + + // Try to parse entry to check if compressed + var entry StorageEntry + if err := json.Unmarshal(iter.Value(), &entry); err == nil { + if entry.Compressed { + stats.CompressedEntries++ + } + } + } + + // Calculate compression ratio + if stats.TotalSize > 0 { + stats.CompressionRatio = float64(stats.CompressedSize) / float64(stats.TotalSize) + } + + return stats, iter.Error() +} + +// OptimizeStorage performs compression optimization on existing data +func (ls *LocalStorageImpl) OptimizeStorage(ctx context.Context, compressThreshold int64) error { + ls.mu.Lock() + defer ls.mu.Unlock() + + optimized := 0 + skipped := 0 + + // Iterate through all entries + iter := ls.db.NewIterator(nil, nil) + defer iter.Release() + + for iter.Next() { + select { + case <-ctx.Done(): + return ctx.Err() + default: + } + + key := string(iter.Key()) + + // Parse existing entry + var entry StorageEntry + if err := json.Unmarshal(iter.Value(), &entry); err != nil { + continue // Skip malformed entries + } + + // Skip if already compressed or too small + if entry.Compressed || int64(len(entry.Data)) < compressThreshold { + skipped++ + continue + } + + // Try compression + compressedData, err := ls.compress(entry.Data) + if err != nil { + continue // Skip on compression error + } + + // Only update if compression helped + if len(compressedData) < len(entry.Data) { + entry.Compressed = true + entry.OriginalSize = int64(len(entry.Data)) + entry.CompressedSize = int64(len(compressedData)) + entry.Data = compressedData + entry.UpdatedAt = time.Now() + + // Save updated entry + entryBytes, err := json.Marshal(entry) + if err != nil { + continue + } + + writeOpt := &opt.WriteOptions{Sync: ls.options.SyncWrites} + if err := ls.db.Put([]byte(key), entryBytes, writeOpt); err != nil { + continue + } + + optimized++ + } else { + skipped++ + } + } + + fmt.Printf("Storage optimization complete: %d entries compressed, %d skipped\n", optimized, skipped) + return iter.Error() +} + +// CompressionStats holds compression statistics +type CompressionStats struct { + TotalEntries int64 `json:"total_entries"` + CompressedEntries int64 `json:"compressed_entries"` + TotalSize int64 `json:"total_size"` + CompressedSize int64 `json:"compressed_size"` + CompressionRatio float64 `json:"compression_ratio"` +} + // Close closes the local storage func (ls *LocalStorageImpl) Close() error { ls.mu.Lock() diff --git a/test/README.md b/test/README.md index f713348e..c2f3bb6e 100644 --- a/test/README.md +++ b/test/README.md @@ -1,6 +1,6 @@ # Bzzz Antennae Test Suite -This directory contains a comprehensive test suite for the Bzzz antennae coordination system that operates independently of external services like Hive, GitHub, or n8n. +This directory contains a comprehensive test suite for the Bzzz antennae coordination system that operates independently of external services like WHOOSH, GitHub, or n8n. ## Components diff --git a/test/task_simulator.go b/test/task_simulator.go index 3dd6dc67..4a7f8128 100644 --- a/test/task_simulator.go +++ b/test/task_simulator.go @@ -255,8 +255,8 @@ func generateMockRepositories() []MockRepository { return []MockRepository{ { Owner: "deepblackcloud", - Name: "hive", - URL: "https://github.com/deepblackcloud/hive", + Name: "whoosh", + URL: "https://github.com/deepblackcloud/whoosh", Dependencies: []string{"bzzz", "distributed-ai-dev"}, Tasks: []MockTask{ { @@ -288,7 +288,7 @@ func generateMockRepositories() []MockRepository { Owner: "deepblackcloud", Name: "bzzz", URL: "https://github.com/anthonyrawlins/bzzz", - Dependencies: []string{"hive"}, + Dependencies: []string{"whoosh"}, Tasks: []MockTask{ { Number: 23, @@ -329,7 +329,7 @@ func generateMockRepositories() []MockRepository { RequiredSkills: []string{"p2p", "python", "integration"}, Dependencies: []TaskDependency{ {Repository: "bzzz", TaskNumber: 23, DependencyType: "api_contract"}, - {Repository: "hive", TaskNumber: 16, DependencyType: "security"}, + {Repository: "whoosh", TaskNumber: 16, DependencyType: "security"}, }, }, }, @@ -343,11 +343,11 @@ func generateCoordinationScenarios() []CoordinationScenario { { Name: "Cross-Repository API Integration", Description: "Testing coordination when multiple repos need to implement a shared API", - Repositories: []string{"hive", "bzzz", "distributed-ai-dev"}, + Repositories: []string{"whoosh", "bzzz", "distributed-ai-dev"}, Tasks: []ScenarioTask{ {Repository: "bzzz", TaskNumber: 23, Priority: 1, BlockedBy: []ScenarioTask{}}, - {Repository: "hive", TaskNumber: 15, Priority: 2, BlockedBy: []ScenarioTask{{Repository: "bzzz", TaskNumber: 23}}}, - {Repository: "distributed-ai-dev", TaskNumber: 8, Priority: 3, BlockedBy: []ScenarioTask{{Repository: "bzzz", TaskNumber: 23}, {Repository: "hive", TaskNumber: 16}}}, + {Repository: "whoosh", TaskNumber: 15, Priority: 2, BlockedBy: []ScenarioTask{{Repository: "bzzz", TaskNumber: 23}}}, + {Repository: "distributed-ai-dev", TaskNumber: 8, Priority: 3, BlockedBy: []ScenarioTask{{Repository: "bzzz", TaskNumber: 23}, {Repository: "whoosh", TaskNumber: 16}}}, }, ExpectedCoordination: []string{ "API contract should be defined first", @@ -358,10 +358,10 @@ func generateCoordinationScenarios() []CoordinationScenario { { Name: "Security-First Development", Description: "Testing coordination when security requirements block other work", - Repositories: []string{"hive", "distributed-ai-dev"}, + Repositories: []string{"whoosh", "distributed-ai-dev"}, Tasks: []ScenarioTask{ - {Repository: "hive", TaskNumber: 16, Priority: 1, BlockedBy: []ScenarioTask{}}, - {Repository: "distributed-ai-dev", TaskNumber: 8, Priority: 2, BlockedBy: []ScenarioTask{{Repository: "hive", TaskNumber: 16}}}, + {Repository: "whoosh", TaskNumber: 16, Priority: 1, BlockedBy: []ScenarioTask{}}, + {Repository: "distributed-ai-dev", TaskNumber: 8, Priority: 2, BlockedBy: []ScenarioTask{{Repository: "whoosh", TaskNumber: 16}}}, }, ExpectedCoordination: []string{ "Security authentication must be completed first", @@ -371,9 +371,9 @@ func generateCoordinationScenarios() []CoordinationScenario { { Name: "Parallel Development Conflict", Description: "Testing coordination when agents might work on conflicting tasks", - Repositories: []string{"hive", "bzzz"}, + Repositories: []string{"whoosh", "bzzz"}, Tasks: []ScenarioTask{ - {Repository: "hive", TaskNumber: 15, Priority: 1, BlockedBy: []ScenarioTask{}}, + {Repository: "whoosh", TaskNumber: 15, Priority: 1, BlockedBy: []ScenarioTask{}}, {Repository: "bzzz", TaskNumber: 24, Priority: 1, BlockedBy: []ScenarioTask{}}, }, ExpectedCoordination: []string{