🎭 CHORUS now contains full BZZZ functionality adapted for containers

Core systems ported:
- P2P networking (libp2p with DHT and PubSub)
- Task coordination (COOEE protocol)
- HMMM collaborative reasoning
- SHHH encryption and security
- SLURP admin election system
- UCXL content addressing
- UCXI server integration
- Hypercore logging system
- Health monitoring and graceful shutdown
- License validation with KACHING

Container adaptations:
- Environment variable configuration (no YAML files)
- Container-optimized logging to stdout/stderr
- Auto-generated agent IDs for container deployments
- Docker-first architecture

All proven BZZZ P2P protocols, AI integration, and collaboration features are now available in containerized form.

Next: Build and test container deployment.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
474 lines · 12 KiB · Go
package integration

import (
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"log"
	"math"
	"math/rand"
	"os"
	"path/filepath"
	"sync"
	"time"
)

// CircuitState represents the state of a circuit breaker
type CircuitState int

const (
	CircuitClosed CircuitState = iota
	CircuitOpen
	CircuitHalfOpen
)

// String returns string representation of circuit state
func (s CircuitState) String() string {
	switch s {
	case CircuitClosed:
		return "CLOSED"
	case CircuitOpen:
		return "OPEN"
	case CircuitHalfOpen:
		return "HALF_OPEN"
	default:
		return "UNKNOWN"
	}
}

// CircuitBreaker implements circuit breaker pattern for SLURP client
type CircuitBreaker struct {
	mu                  sync.RWMutex
	state               CircuitState
	failureCount        int
	consecutiveFailures int
	lastFailureTime     time.Time
	nextRetryTime       time.Time

	// Configuration
	maxFailures     int           // Max failures before opening circuit
	cooldownPeriod  time.Duration // How long to stay open
	halfOpenTimeout time.Duration // How long to wait in half-open before closing

	// Metrics
	totalRequests      int64
	successfulRequests int64
	failedRequests     int64
}

// NewCircuitBreaker creates a new circuit breaker
func NewCircuitBreaker(maxFailures int, cooldownPeriod, halfOpenTimeout time.Duration) *CircuitBreaker {
	return &CircuitBreaker{
		state:           CircuitClosed,
		maxFailures:     maxFailures,
		cooldownPeriod:  cooldownPeriod,
		halfOpenTimeout: halfOpenTimeout,
	}
}

// CanProceed checks if request can proceed through circuit breaker
func (cb *CircuitBreaker) CanProceed() bool {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	cb.totalRequests++

	switch cb.state {
	case CircuitClosed:
		return true

	case CircuitOpen:
		if time.Now().After(cb.nextRetryTime) {
			cb.state = CircuitHalfOpen
			log.Printf("🔄 Circuit breaker moving to HALF_OPEN state")
			return true
		}
		return false

	case CircuitHalfOpen:
		return true

	default:
		return false
	}
}

// RecordSuccess records a successful operation
func (cb *CircuitBreaker) RecordSuccess() {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	cb.successfulRequests++
	cb.failureCount = 0
	cb.consecutiveFailures = 0

	if cb.state == CircuitHalfOpen {
		cb.state = CircuitClosed
		log.Printf("✅ Circuit breaker closed after successful operation")
	}
}

// RecordFailure records a failed operation
func (cb *CircuitBreaker) RecordFailure() {
	cb.mu.Lock()
	defer cb.mu.Unlock()

	cb.failedRequests++
	cb.failureCount++
	cb.consecutiveFailures++
	cb.lastFailureTime = time.Now()

	switch cb.state {
	case CircuitHalfOpen:
		// A failed trial request while half-open sends the breaker straight back to open.
		cb.state = CircuitOpen
		cb.nextRetryTime = time.Now().Add(cb.cooldownPeriod)
		log.Printf("🚫 Circuit breaker re-opened after failed HALF_OPEN trial")
	case CircuitClosed:
		if cb.failureCount >= cb.maxFailures {
			cb.state = CircuitOpen
			cb.nextRetryTime = time.Now().Add(cb.cooldownPeriod)
			log.Printf("🚫 Circuit breaker opened due to %d consecutive failures", cb.consecutiveFailures)
		}
	}
}

// GetStats returns circuit breaker statistics
func (cb *CircuitBreaker) GetStats() map[string]interface{} {
	cb.mu.RLock()
	defer cb.mu.RUnlock()

	return map[string]interface{}{
		"state":                cb.state.String(),
		"total_requests":       cb.totalRequests,
		"successful_requests":  cb.successfulRequests,
		"failed_requests":      cb.failedRequests,
		"current_failures":     cb.failureCount,
		"consecutive_failures": cb.consecutiveFailures,
		"last_failure_time":    cb.lastFailureTime,
		"next_retry_time":      cb.nextRetryTime,
	}
}
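
// The sketch below is illustrative and not part of the original file: it shows
// the intended call pattern of gating each outbound request with CanProceed and
// reporting the outcome so the breaker can trip and recover. doSlurpRequest is a
// hypothetical stand-in for the real SLURP client call.
func exampleCircuitBreakerUsage(cb *CircuitBreaker, doSlurpRequest func() error) error {
	if !cb.CanProceed() {
		// Fail fast while the circuit is open instead of hammering SLURP.
		return fmt.Errorf("circuit breaker is %s, request rejected", cb.GetStats()["state"])
	}
	if err := doSlurpRequest(); err != nil {
		cb.RecordFailure()
		return err
	}
	cb.RecordSuccess()
	return nil
}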

// IdempotencyManager handles idempotency key generation and tracking
type IdempotencyManager struct {
	keys   map[string]time.Time
	mu     sync.RWMutex
	maxAge time.Duration
}

// NewIdempotencyManager creates a new idempotency manager
func NewIdempotencyManager(maxAge time.Duration) *IdempotencyManager {
	im := &IdempotencyManager{
		keys:   make(map[string]time.Time),
		maxAge: maxAge,
	}

	// Start cleanup goroutine
	go im.cleanupExpiredKeys()

	return im
}

// GenerateKey generates a stable idempotency key for an event
func (im *IdempotencyManager) GenerateKey(discussionID, eventType string, timestamp time.Time) string {
	// Create 5-minute time buckets to handle slight timing differences
	bucket := timestamp.Truncate(5 * time.Minute)

	// Generate stable hash
	data := fmt.Sprintf("%s_%s_%d", discussionID, eventType, bucket.Unix())
	hash := sha256.Sum256([]byte(data))
	return fmt.Sprintf("hmmm_%x", hash[:8]) // Use first 8 bytes for shorter key
}

// IsProcessed checks if an idempotency key has been processed recently
func (im *IdempotencyManager) IsProcessed(key string) bool {
	im.mu.RLock()
	defer im.mu.RUnlock()

	processTime, exists := im.keys[key]
	if !exists {
		return false
	}

	// Check if key is still valid (not expired)
	return time.Since(processTime) <= im.maxAge
}

// MarkProcessed marks an idempotency key as processed
func (im *IdempotencyManager) MarkProcessed(key string) {
	im.mu.Lock()
	defer im.mu.Unlock()

	im.keys[key] = time.Now()
}

// cleanupExpiredKeys periodically removes expired idempotency keys
func (im *IdempotencyManager) cleanupExpiredKeys() {
	ticker := time.NewTicker(im.maxAge / 2) // Cleanup twice as often as expiry
	defer ticker.Stop()

	for range ticker.C {
		im.mu.Lock()
		now := time.Now()
		expired := make([]string, 0)

		for key, processTime := range im.keys {
			if now.Sub(processTime) > im.maxAge {
				expired = append(expired, key)
			}
		}

		for _, key := range expired {
			delete(im.keys, key)
		}

		if len(expired) > 0 {
			log.Printf("🧹 Cleaned up %d expired idempotency keys", len(expired))
		}

		im.mu.Unlock()
	}
}
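
// Illustrative usage sketch, not part of the original file: an incoming HMMM
// event is keyed, skipped if it was already handled inside the dedup window,
// and marked once processing succeeds. Because GenerateKey truncates to
// 5-minute buckets, events at 10:02 and 10:04 share a key while 10:06 starts
// a new bucket. The discussion ID, event type, and process callback are
// placeholders.
func exampleIdempotentProcessing(im *IdempotencyManager, process func() error) error {
	key := im.GenerateKey("discussion-123", "consensus_reached", time.Now())
	if im.IsProcessed(key) {
		return nil // duplicate within the dedup window, nothing to do
	}
	if err := process(); err != nil {
		return err
	}
	im.MarkProcessed(key)
	return nil
}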

// DeadLetterQueue handles failed events that need to be retried later
type DeadLetterQueue struct {
	queueDir   string
	mu         sync.RWMutex
	items      map[string]*DLQItem
	maxRetries int
}

// DLQItem represents an item in the dead letter queue
type DLQItem struct {
	Event         SlurpEvent `json:"event"`
	FailureReason string     `json:"failure_reason"`
	RetryCount    int        `json:"retry_count"`
	NextRetryTime time.Time  `json:"next_retry_time"`
	FirstFailed   time.Time  `json:"first_failed"`
	LastFailed    time.Time  `json:"last_failed"`
}

// NewDeadLetterQueue creates a new dead letter queue
func NewDeadLetterQueue(queueDir string, maxRetries int) (*DeadLetterQueue, error) {
	if err := os.MkdirAll(queueDir, 0755); err != nil {
		return nil, fmt.Errorf("failed to create queue directory: %w", err)
	}

	dlq := &DeadLetterQueue{
		queueDir:   queueDir,
		items:      make(map[string]*DLQItem),
		maxRetries: maxRetries,
	}

	// Load existing items from disk
	if err := dlq.loadFromDisk(); err != nil {
		log.Printf("⚠️ Failed to load DLQ from disk: %v", err)
	}

	return dlq, nil
}

// Enqueue adds a failed event to the dead letter queue
func (dlq *DeadLetterQueue) Enqueue(event SlurpEvent, reason string) error {
	dlq.mu.Lock()
	defer dlq.mu.Unlock()

	eventID := dlq.generateEventID(event)
	now := time.Now()

	// Check if event already exists in DLQ
	if existing, exists := dlq.items[eventID]; exists {
		existing.RetryCount++
		existing.FailureReason = reason
		existing.LastFailed = now
		existing.NextRetryTime = dlq.calculateNextRetry(existing.RetryCount)

		log.Printf("💀 Updated DLQ item %s (retry %d/%d)", eventID, existing.RetryCount, dlq.maxRetries)
	} else {
		// Create new DLQ item
		item := &DLQItem{
			Event:         event,
			FailureReason: reason,
			RetryCount:    1,
			NextRetryTime: dlq.calculateNextRetry(1),
			FirstFailed:   now,
			LastFailed:    now,
		}

		dlq.items[eventID] = item
		log.Printf("💀 Added new item to DLQ: %s", eventID)
	}

	// Persist to disk
	return dlq.saveToDisk()
}
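
// Illustrative sketch, not part of the original file: when a delivery attempt
// fails, the event is parked in the DLQ with the failure reason so a later
// retry pass can pick it up. deliver is a hypothetical stand-in for the real
// delivery path.
func exampleEnqueueOnFailure(dlq *DeadLetterQueue, deliver func(SlurpEvent) error, event SlurpEvent) error {
	if err := deliver(event); err != nil {
		if dlqErr := dlq.Enqueue(event, err.Error()); dlqErr != nil {
			return fmt.Errorf("delivery failed (%v) and DLQ enqueue failed: %w", err, dlqErr)
		}
		return err
	}
	return nil
}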

// GetReadyItems returns items that are ready for retry
func (dlq *DeadLetterQueue) GetReadyItems() []*DLQItem {
	dlq.mu.RLock()
	defer dlq.mu.RUnlock()

	now := time.Now()
	ready := make([]*DLQItem, 0)

	for _, item := range dlq.items {
		if item.RetryCount <= dlq.maxRetries && now.After(item.NextRetryTime) {
			ready = append(ready, item)
		}
	}

	return ready
}

// MarkSuccess removes an item from the DLQ after successful retry
func (dlq *DeadLetterQueue) MarkSuccess(eventID string) error {
	dlq.mu.Lock()
	defer dlq.mu.Unlock()

	delete(dlq.items, eventID)
	log.Printf("✅ Removed successfully retried item from DLQ: %s", eventID)

	return dlq.saveToDisk()
}

// MarkFailure updates retry count for failed retry attempt
func (dlq *DeadLetterQueue) MarkFailure(eventID string, reason string) error {
	dlq.mu.Lock()
	defer dlq.mu.Unlock()

	if item, exists := dlq.items[eventID]; exists {
		item.RetryCount++
		item.FailureReason = reason
		item.LastFailed = time.Now()
		item.NextRetryTime = dlq.calculateNextRetry(item.RetryCount)

		if item.RetryCount > dlq.maxRetries {
			log.Printf("💀 Item exceeded max retries, keeping in DLQ for manual review: %s", eventID)
		}
	}

	return dlq.saveToDisk()
}
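
// Illustrative retry-loop sketch, not part of the original file: a periodic
// pass drains the items that are due, replays each event, and reports the
// outcome back to the queue. replay is a hypothetical stand-in for the real
// delivery path.
func exampleDLQRetryPass(dlq *DeadLetterQueue, replay func(SlurpEvent) error) {
	for _, item := range dlq.GetReadyItems() {
		id := dlq.generateEventID(item.Event)
		if err := replay(item.Event); err != nil {
			if markErr := dlq.MarkFailure(id, err.Error()); markErr != nil {
				log.Printf("⚠️ Failed to update DLQ item %s: %v", id, markErr)
			}
			continue
		}
		if err := dlq.MarkSuccess(id); err != nil {
			log.Printf("⚠️ Failed to remove DLQ item %s: %v", id, err)
		}
	}
}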

// GetStats returns DLQ statistics
func (dlq *DeadLetterQueue) GetStats() map[string]interface{} {
	dlq.mu.RLock()
	defer dlq.mu.RUnlock()

	ready := 0
	exhausted := 0
	waiting := 0

	now := time.Now()
	for _, item := range dlq.items {
		if item.RetryCount > dlq.maxRetries {
			exhausted++
		} else if now.After(item.NextRetryTime) {
			ready++
		} else {
			waiting++
		}
	}

	return map[string]interface{}{
		"total_items":     len(dlq.items),
		"ready_for_retry": ready,
		"waiting":         waiting,
		"exhausted":       exhausted,
		"max_retries":     dlq.maxRetries,
	}
}

// calculateNextRetry calculates the next retry time using exponential backoff with jitter
func (dlq *DeadLetterQueue) calculateNextRetry(retryCount int) time.Time {
	// Exponential backoff: 2^retryCount minutes with jitter
	baseDelay := time.Duration(math.Pow(2, float64(retryCount))) * time.Minute

	// Add jitter (±25% random variation). The fraction is applied to the base
	// delay before converting to a Duration; converting the fraction to a
	// Duration first would truncate it to zero.
	jitter := time.Duration((rand.Float64()*0.5 - 0.25) * float64(baseDelay))
	delay := baseDelay + jitter

	// Cap at 1 hour maximum
	if delay > time.Hour {
		delay = time.Hour
	}

	return time.Now().Add(delay)
}
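
// For orientation (not part of the original file): before jitter, the schedule
// above works out to 2m, 4m, 8m, 16m, 32m for retries 1-5, and is capped at
// 1h from the sixth retry onward.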

// generateEventID creates a unique ID for an event
func (dlq *DeadLetterQueue) generateEventID(event SlurpEvent) string {
	data := fmt.Sprintf("%s_%s_%s_%d",
		event.EventType,
		event.Path,
		event.CreatedBy,
		event.Timestamp.Unix())

	hash := sha256.Sum256([]byte(data))
	return fmt.Sprintf("dlq_%x", hash[:8])
}

// saveToDisk persists the DLQ to disk
func (dlq *DeadLetterQueue) saveToDisk() error {
	filePath := filepath.Join(dlq.queueDir, "dlq_items.json")

	data, err := json.MarshalIndent(dlq.items, "", " ")
	if err != nil {
		return fmt.Errorf("failed to marshal DLQ items: %w", err)
	}

	return os.WriteFile(filePath, data, 0644)
}

// loadFromDisk loads the DLQ from disk
func (dlq *DeadLetterQueue) loadFromDisk() error {
	filePath := filepath.Join(dlq.queueDir, "dlq_items.json")

	data, err := os.ReadFile(filePath)
	if err != nil {
		if os.IsNotExist(err) {
			return nil // No existing queue file, start fresh
		}
		return fmt.Errorf("failed to read DLQ file: %w", err)
	}

	return json.Unmarshal(data, &dlq.items)
}

// BackoffStrategy calculates retry delays with exponential backoff and jitter
type BackoffStrategy struct {
	initialDelay time.Duration
	maxDelay     time.Duration
	multiplier   float64
	jitterFactor float64
}

// NewBackoffStrategy creates a new backoff strategy
func NewBackoffStrategy(initialDelay, maxDelay time.Duration, multiplier, jitterFactor float64) *BackoffStrategy {
	return &BackoffStrategy{
		initialDelay: initialDelay,
		maxDelay:     maxDelay,
		multiplier:   multiplier,
		jitterFactor: jitterFactor,
	}
}

// GetDelay calculates the delay for a given attempt number
func (bs *BackoffStrategy) GetDelay(attempt int) time.Duration {
	if attempt <= 0 {
		return bs.initialDelay
	}

	// Exponential backoff
	delay := time.Duration(float64(bs.initialDelay) * math.Pow(bs.multiplier, float64(attempt-1)))

	// Apply maximum delay cap
	if delay > bs.maxDelay {
		delay = bs.maxDelay
	}

	// Add jitter to avoid thundering herd: a random factor in
	// [-jitterFactor, +jitterFactor] applied to the capped delay. The fraction
	// is multiplied before converting to a Duration so it is not truncated to zero.
	jitter := time.Duration((rand.Float64()*bs.jitterFactor*2 - bs.jitterFactor) * float64(delay))
	delay += jitter

	// Ensure delay is never negative
	if delay < 0 {
		delay = bs.initialDelay
	}

	return delay
}
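
// Illustrative usage sketch, not part of the original file: a bounded,
// context-aware retry loop that sleeps for the strategy's delay between
// attempts. The attempt limit and operation are placeholders.
func exampleBackoffRetry(ctx context.Context, bs *BackoffStrategy, maxAttempts int, op func() error) error {
	var lastErr error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		if lastErr = op(); lastErr == nil {
			return nil
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-time.After(bs.GetDelay(attempt)):
			// Back off before the next attempt.
		}
	}
	return fmt.Errorf("operation failed after %d attempts: %w", maxAttempts, lastErr)
}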