Commit 9bdcbe0447
Major integrations and fixes:
- Added BACKBEAT SDK integration for P2P operation timing
- Implemented beat-aware status tracking for distributed operations
- Added Docker secrets support for secure license management
- Resolved KACHING license validation via HTTPS/TLS
- Updated docker-compose configuration for clean stack deployment
- Disabled rollback policies to prevent deployment failures
- Added license credential storage (CHORUS-DEV-MULTI-001)

Technical improvements:
- BACKBEAT P2P operation tracking with phase management
- Enhanced configuration system with file-based secrets
- Improved error handling for license validation
- Clean separation of KACHING and CHORUS deployment stacks

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
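The file-based secrets support mentioned above generally means the license credential is read from a file mounted by Docker rather than passed as a plain environment variable. Below is a minimal sketch of that pattern, assuming a secret mounted at /run/secrets/chorus_license_key; the helper name, secret name, and environment variables are illustrative only, not the actual CHORUS configuration API.

package main

import (
	"fmt"
	"log"
	"os"
	"strings"
)

// loadLicenseKey is a hypothetical helper (not the actual CHORUS config code)
// showing the usual file-based secret pattern: prefer a Docker secret mounted
// under /run/secrets/, then a *_FILE environment variable pointing at a file,
// then a plain environment variable as a last resort.
func loadLicenseKey() (string, error) {
	candidates := []string{
		"/run/secrets/chorus_license_key",    // default mount point for Docker secrets
		os.Getenv("CHORUS_LICENSE_KEY_FILE"), // optional override path
	}
	for _, path := range candidates {
		if path == "" {
			continue
		}
		if data, err := os.ReadFile(path); err == nil {
			return strings.TrimSpace(string(data)), nil
		}
	}
	if key := os.Getenv("CHORUS_LICENSE_KEY"); key != "" {
		return key, nil
	}
	return "", fmt.Errorf("no license credential found in secret files or environment")
}

func main() {
	key, err := loadLicenseKey()
	if err != nil {
		log.Fatalf("license lookup failed: %v", err)
	}
	log.Printf("loaded license credential (%d bytes)", len(key))
}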
		
			
				
	
	
		
Go · 439 lines · 12 KiB
package integration

import (
	"context"
	"fmt"
	"log"
	"sync"
	"time"

	"chorus/pkg/config"
)

// ReliableSlurpClient wraps SlurpClient with reliability features
type ReliableSlurpClient struct {
	baseClient      *SlurpClient
	circuitBreaker  *CircuitBreaker
	idempotencyMgr  *IdempotencyManager
	deadLetterQueue *DeadLetterQueue
	backoffStrategy *BackoffStrategy

	// Configuration
	config config.SlurpConfig

	// Background processing
	ctx         context.Context
	cancel      context.CancelFunc
	retryWorker sync.WaitGroup

	// Metrics
	metrics      *ReliabilityMetrics
	metricsMutex sync.RWMutex
}

// ReliabilityMetrics tracks reliability-related metrics
type ReliabilityMetrics struct {
	TotalEvents         int64     `json:"total_events"`
	SuccessfulEvents    int64     `json:"successful_events"`
	FailedEvents        int64     `json:"failed_events"`
	DeduplicatedEvents  int64     `json:"deduplicated_events"`
	CircuitBreakerTrips int64     `json:"circuit_breaker_trips"`
	DLQEnqueued         int64     `json:"dlq_enqueued"`
	DLQRetrySuccesses   int64     `json:"dlq_retry_successes"`
	DLQRetryFailures    int64     `json:"dlq_retry_failures"`
	LastEventTime       time.Time `json:"last_event_time"`
	LastSuccessTime     time.Time `json:"last_success_time"`
	LastFailureTime     time.Time `json:"last_failure_time"`
}

// NewReliableSlurpClient creates a new reliable SLURP client
func NewReliableSlurpClient(ctx context.Context, slurpConfig config.SlurpConfig) (*ReliableSlurpClient, error) {
	if !slurpConfig.Enabled {
		return nil, fmt.Errorf("SLURP integration is disabled")
	}

	// Create base client
	baseClient := NewSlurpClient(slurpConfig)

	// Test connection
	if err := baseClient.ValidateConnection(ctx); err != nil {
		return nil, fmt.Errorf("failed to validate SLURP connection: %w", err)
	}

	// Initialize reliability components
	circuitBreaker := NewCircuitBreaker(
		slurpConfig.Reliability.MaxFailures,
		slurpConfig.Reliability.CooldownPeriod,
		slurpConfig.Reliability.HalfOpenTimeout,
	)

	idempotencyMgr := NewIdempotencyManager(slurpConfig.Reliability.IdempotencyWindow)

	dlq, err := NewDeadLetterQueue(
		slurpConfig.Reliability.DLQDirectory,
		slurpConfig.Reliability.MaxRetries,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to initialize dead letter queue: %w", err)
	}

	backoffStrategy := NewBackoffStrategy(
		slurpConfig.Reliability.InitialBackoff,
		slurpConfig.Reliability.MaxBackoff,
		slurpConfig.Reliability.BackoffMultiplier,
		slurpConfig.Reliability.JitterFactor,
	)

	clientCtx, cancel := context.WithCancel(ctx)

	client := &ReliableSlurpClient{
		baseClient:      baseClient,
		circuitBreaker:  circuitBreaker,
		idempotencyMgr:  idempotencyMgr,
		deadLetterQueue: dlq,
		backoffStrategy: backoffStrategy,
		config:          slurpConfig,
		ctx:             clientCtx,
		cancel:          cancel,
		metrics:         &ReliabilityMetrics{},
	}

	// Start background retry worker
	client.startRetryWorker()

	log.Printf("🛡️ Reliable SLURP client initialized with circuit breaker and DLQ")
	return client, nil
}

// CreateEventReliably sends an event with full reliability features
func (rc *ReliableSlurpClient) CreateEventReliably(ctx context.Context, event SlurpEvent) (*EventResponse, error) {
	rc.metricsMutex.Lock()
	rc.metrics.TotalEvents++
	rc.metrics.LastEventTime = time.Now()
	rc.metricsMutex.Unlock()

	// Generate idempotency key
	idempotencyKey := rc.idempotencyMgr.GenerateKey(
		rc.extractDiscussionID(event),
		event.EventType,
		event.Timestamp,
	)

	// Check if already processed
	if rc.idempotencyMgr.IsProcessed(idempotencyKey) {
		rc.metricsMutex.Lock()
		rc.metrics.DeduplicatedEvents++
		rc.metricsMutex.Unlock()

		log.Printf("🔄 Event deduplicated with key: %s", idempotencyKey)
		return &EventResponse{
			Success:   true,
			EventID:   idempotencyKey,
			Message:   "Event deduplicated",
			Timestamp: time.Now(),
		}, nil
	}

	// Check circuit breaker
	if !rc.circuitBreaker.CanProceed() {
		// Circuit is open, add to DLQ for later retry
		err := rc.deadLetterQueue.Enqueue(event, "Circuit breaker open")
		if err != nil {
			log.Printf("❌ Failed to enqueue event to DLQ: %v", err)
		}

		rc.metricsMutex.Lock()
		rc.metrics.DLQEnqueued++
		rc.metricsMutex.Unlock()

		return nil, fmt.Errorf("circuit breaker is open, event queued for retry")
	}

	// Add idempotency header to event metadata
	if event.Metadata == nil {
		event.Metadata = make(map[string]interface{})
	}
	event.Metadata["idempotency_key"] = idempotencyKey

	// Attempt to send event
	resp, err := rc.baseClient.CreateEvent(ctx, event)

	if err != nil {
		// Record failure in circuit breaker
		rc.circuitBreaker.RecordFailure()

		// Add to DLQ for retry
		if dlqErr := rc.deadLetterQueue.Enqueue(event, err.Error()); dlqErr != nil {
			log.Printf("❌ Failed to enqueue failed event to DLQ: %v", dlqErr)
		} else {
			rc.metricsMutex.Lock()
			rc.metrics.DLQEnqueued++
			rc.metricsMutex.Unlock()
		}

		rc.metricsMutex.Lock()
		rc.metrics.FailedEvents++
		rc.metrics.LastFailureTime = time.Now()
		rc.metricsMutex.Unlock()

		return nil, fmt.Errorf("failed to send event: %w", err)
	}

	// Success! Record in circuit breaker and idempotency manager
	rc.circuitBreaker.RecordSuccess()
	rc.idempotencyMgr.MarkProcessed(idempotencyKey)

	rc.metricsMutex.Lock()
	rc.metrics.SuccessfulEvents++
	rc.metrics.LastSuccessTime = time.Now()
	rc.metricsMutex.Unlock()

	return resp, nil
}

// CreateEventsBatchReliably sends a batch of events with reliability features
func (rc *ReliableSlurpClient) CreateEventsBatchReliably(ctx context.Context, events []SlurpEvent) (*BatchEventResponse, error) {
	rc.metricsMutex.Lock()
	rc.metrics.TotalEvents += int64(len(events))
	rc.metrics.LastEventTime = time.Now()
	rc.metricsMutex.Unlock()

	// Check circuit breaker
	if !rc.circuitBreaker.CanProceed() {
		// Circuit is open, add all events to DLQ
		for _, event := range events {
			if err := rc.deadLetterQueue.Enqueue(event, "Circuit breaker open"); err != nil {
				log.Printf("❌ Failed to enqueue batch event to DLQ: %v", err)
			}
		}

		rc.metricsMutex.Lock()
		rc.metrics.DLQEnqueued += int64(len(events))
		rc.metricsMutex.Unlock()

		return nil, fmt.Errorf("circuit breaker is open, %d events queued for retry", len(events))
	}

	// Add idempotency keys to all events
	processedEvents := make([]SlurpEvent, 0, len(events))
	deduplicatedCount := 0

	for _, event := range events {
		idempotencyKey := rc.idempotencyMgr.GenerateKey(
			rc.extractDiscussionID(event),
			event.EventType,
			event.Timestamp,
		)

		// Check if already processed
		if rc.idempotencyMgr.IsProcessed(idempotencyKey) {
			deduplicatedCount++
			continue
		}

		// Add idempotency key to metadata
		if event.Metadata == nil {
			event.Metadata = make(map[string]interface{})
		}
		event.Metadata["idempotency_key"] = idempotencyKey

		processedEvents = append(processedEvents, event)
	}

	if deduplicatedCount > 0 {
		rc.metricsMutex.Lock()
		rc.metrics.DeduplicatedEvents += int64(deduplicatedCount)
		rc.metricsMutex.Unlock()

		log.Printf("🔄 Deduplicated %d events from batch", deduplicatedCount)
	}

	if len(processedEvents) == 0 {
		return &BatchEventResponse{
			Success:        true,
			ProcessedCount: 0,
			FailedCount:    0,
			Message:        "All events were deduplicated",
			Timestamp:      time.Now(),
		}, nil
	}

	// Attempt to send batch
	resp, err := rc.baseClient.CreateEventsBatch(ctx, processedEvents)

	if err != nil {
		// Record failure in circuit breaker
		rc.circuitBreaker.RecordFailure()

		// Add all events to DLQ for retry
		for _, event := range processedEvents {
			if dlqErr := rc.deadLetterQueue.Enqueue(event, err.Error()); dlqErr != nil {
				log.Printf("❌ Failed to enqueue batch event to DLQ: %v", dlqErr)
			}
		}

		rc.metricsMutex.Lock()
		rc.metrics.FailedEvents += int64(len(processedEvents))
		rc.metrics.DLQEnqueued += int64(len(processedEvents))
		rc.metrics.LastFailureTime = time.Now()
		rc.metricsMutex.Unlock()

		return nil, fmt.Errorf("failed to send batch: %w", err)
	}

	// Success! Record in circuit breaker and idempotency manager
	rc.circuitBreaker.RecordSuccess()

	// Mark all events as processed
	for _, event := range processedEvents {
		if idempotencyKey, exists := event.Metadata["idempotency_key"].(string); exists {
			rc.idempotencyMgr.MarkProcessed(idempotencyKey)
		}
	}

	rc.metricsMutex.Lock()
	rc.metrics.SuccessfulEvents += int64(resp.ProcessedCount)
	rc.metrics.FailedEvents += int64(resp.FailedCount)
	rc.metrics.LastSuccessTime = time.Now()
	rc.metricsMutex.Unlock()

	return resp, nil
}

// GetHealth checks the health of SLURP service and reliability components
func (rc *ReliableSlurpClient) GetHealth(ctx context.Context) (*HealthResponse, error) {
	// Try base health check first
	health, err := rc.baseClient.GetHealth(ctx)
	if err != nil {
		rc.circuitBreaker.RecordFailure()
		return nil, err
	}

	rc.circuitBreaker.RecordSuccess()
	return health, nil
}

// GetReliabilityStats returns comprehensive reliability statistics
func (rc *ReliableSlurpClient) GetReliabilityStats() map[string]interface{} {
	rc.metricsMutex.RLock()
	metrics := *rc.metrics
	rc.metricsMutex.RUnlock()

	stats := map[string]interface{}{
		"metrics":           metrics,
		"circuit_breaker":   rc.circuitBreaker.GetStats(),
		"dead_letter_queue": rc.deadLetterQueue.GetStats(),
	}

	return stats
}

// startRetryWorker starts background worker to process DLQ items
func (rc *ReliableSlurpClient) startRetryWorker() {
	rc.retryWorker.Add(1)

	go func() {
		defer rc.retryWorker.Done()

		ticker := time.NewTicker(rc.config.Reliability.RetryInterval)
		defer ticker.Stop()

		log.Printf("🔄 DLQ retry worker started (interval: %v)", rc.config.Reliability.RetryInterval)

		for {
			select {
			case <-rc.ctx.Done():
				log.Printf("🛑 DLQ retry worker stopping")
				return

			case <-ticker.C:
				rc.processDLQItems()
			}
		}
	}()
}

// processDLQItems processes items ready for retry from the DLQ
func (rc *ReliableSlurpClient) processDLQItems() {
	readyItems := rc.deadLetterQueue.GetReadyItems()
	if len(readyItems) == 0 {
		return
	}

	log.Printf("🔄 Processing %d DLQ items ready for retry", len(readyItems))

	for _, item := range readyItems {
		if rc.ctx.Err() != nil {
			break
		}

		// Check if circuit breaker allows retry
		if !rc.circuitBreaker.CanProceed() {
			log.Printf("⏸️ Circuit breaker open, skipping DLQ retry")
			break
		}

		// Attempt retry
		eventID := rc.deadLetterQueue.generateEventID(item.Event)

		_, err := rc.baseClient.CreateEvent(rc.ctx, item.Event)
		if err != nil {
			// Retry failed
			rc.circuitBreaker.RecordFailure()

			if markErr := rc.deadLetterQueue.MarkFailure(eventID, err.Error()); markErr != nil {
				log.Printf("❌ Failed to mark DLQ failure: %v", markErr)
			}

			rc.metricsMutex.Lock()
			rc.metrics.DLQRetryFailures++
			rc.metricsMutex.Unlock()

			log.Printf("❌ DLQ retry failed for %s: %v", eventID, err)
		} else {
			// Retry succeeded
			rc.circuitBreaker.RecordSuccess()

			if markErr := rc.deadLetterQueue.MarkSuccess(eventID); markErr != nil {
				log.Printf("❌ Failed to mark DLQ success: %v", markErr)
			}

			rc.metricsMutex.Lock()
			rc.metrics.DLQRetrySuccesses++
			rc.metricsMutex.Unlock()

			log.Printf("✅ DLQ retry succeeded for %s", eventID)
		}
	}
}

// extractDiscussionID extracts discussion ID from event metadata for idempotency key generation
func (rc *ReliableSlurpClient) extractDiscussionID(event SlurpEvent) string {
	if event.Metadata == nil {
		return "unknown"
	}

	if discussionID, exists := event.Metadata["discussion_id"]; exists {
		if id, ok := discussionID.(string); ok {
			return id
		}
	}

	// Fallback to event path if no discussion_id
	return event.Path
}

// Close gracefully shuts down the reliable client
func (rc *ReliableSlurpClient) Close() error {
	log.Printf("🛑 Shutting down reliable SLURP client...")

	// Cancel context to stop retry worker
	rc.cancel()

	// Wait for retry worker to finish
	rc.retryWorker.Wait()

	// Close base client
	return rc.baseClient.Close()
}
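
For reference, a minimal usage sketch of the client defined above. This is an illustrative sketch only: the import path chorus/pkg/integration is assumed from the package name and the chorus/pkg/config import, the SLURP configuration is assumed to be loaded elsewhere, and no SlurpEvent fields beyond those referenced in this file are assumed.

package main

import (
	"context"
	"log"

	"chorus/pkg/config"
	"chorus/pkg/integration" // assumed import path for this package
)

func main() {
	// Assume the SLURP section of the CHORUS config has been loaded elsewhere,
	// including the Reliability settings consumed by the constructor.
	var slurpCfg config.SlurpConfig
	slurpCfg.Enabled = true

	ctx := context.Background()
	client, err := integration.NewReliableSlurpClient(ctx, slurpCfg)
	if err != nil {
		log.Fatalf("failed to initialize reliable SLURP client: %v", err)
	}
	defer client.Close()

	// SlurpEvent construction is application-specific; only Metadata's type
	// (map[string]interface{}) is visible in the wrapper above, so the other
	// fields are left for the caller to populate.
	var event integration.SlurpEvent
	event.Metadata = map[string]interface{}{"discussion_id": "disc-123"}

	if _, err := client.CreateEventReliably(ctx, event); err != nil {
		log.Printf("event delivery failed (it may have been queued in the DLQ): %v", err)
	}
}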