package integration

import (
	"context"
	"encoding/json"
	"fmt"
	"log"
	"sync"
	"time"

	"chorus.services/bzzz/pkg/config"
)

// ReliableSlurpClient wraps SlurpClient with reliability features: a circuit
// breaker, idempotency-key deduplication, a dead letter queue (DLQ) for
// deliveries that could not be completed, and a background worker that
// retries DLQ items.
type ReliableSlurpClient struct {
	baseClient      *SlurpClient
	circuitBreaker  *CircuitBreaker
	idempotencyMgr  *IdempotencyManager
	deadLetterQueue *DeadLetterQueue
	backoffStrategy *BackoffStrategy

	// Configuration
	config config.SlurpConfig

	// Background processing: ctx is cancelled by Close (or by the parent
	// context given to the constructor) to stop the retry worker, and
	// retryWorker lets Close wait for that worker to exit.
	ctx         context.Context
	cancel      context.CancelFunc
	retryWorker sync.WaitGroup

	// closeOnce makes Close safe to call more than once; closeErr holds the
	// result of the first call.
	closeOnce sync.Once
	closeErr  error

	// Metrics, guarded by metricsMutex.
	metrics      *ReliabilityMetrics
	metricsMutex sync.RWMutex
}

// ReliabilityMetrics tracks reliability-related metrics.
// NOTE(review): CircuitBreakerTrips is not updated anywhere in this file.
type ReliabilityMetrics struct {
	TotalEvents         int64     `json:"total_events"`
	SuccessfulEvents    int64     `json:"successful_events"`
	FailedEvents        int64     `json:"failed_events"`
	DeduplicatedEvents  int64     `json:"deduplicated_events"`
	CircuitBreakerTrips int64     `json:"circuit_breaker_trips"`
	DLQEnqueued         int64     `json:"dlq_enqueued"`
	DLQRetrySuccesses   int64     `json:"dlq_retry_successes"`
	DLQRetryFailures    int64     `json:"dlq_retry_failures"`
	LastEventTime       time.Time `json:"last_event_time"`
	LastSuccessTime     time.Time `json:"last_success_time"`
	LastFailureTime     time.Time `json:"last_failure_time"`
}

// NewReliableSlurpClient creates a reliable SLURP client and starts its DLQ
// retry worker.
//
// It returns an error if SLURP integration is disabled, the configured retry
// interval is not positive, the initial connection check fails, or the dead
// letter queue cannot be initialized. The retry worker runs until Close is
// called or ctx is cancelled.
func NewReliableSlurpClient(ctx context.Context, slurpConfig config.SlurpConfig) (*ReliableSlurpClient, error) {
	if !slurpConfig.Enabled {
		return nil, fmt.Errorf("SLURP integration is disabled")
	}
	// time.NewTicker panics on a non-positive interval. That panic would occur
	// inside the retry goroutine and take down the whole process, so reject
	// the configuration up front instead.
	if slurpConfig.Reliability.RetryInterval <= 0 {
		return nil, fmt.Errorf("invalid SLURP retry interval %v: must be positive", slurpConfig.Reliability.RetryInterval)
	}

	// Create base client
	baseClient := NewSlurpClient(slurpConfig)

	// fail closes the base client before reporting a construction error, so
	// whatever it holds is released when the constructor does not succeed.
	fail := func(err error) (*ReliableSlurpClient, error) {
		if closeErr := baseClient.Close(); closeErr != nil {
			log.Printf("❌ Failed to close SLURP client after init error: %v", closeErr)
		}
		return nil, err
	}

	// Test connection
	if err := baseClient.ValidateConnection(ctx); err != nil {
		return fail(fmt.Errorf("failed to validate SLURP connection: %w", err))
	}

	// Initialize reliability components
	circuitBreaker := NewCircuitBreaker(
		slurpConfig.Reliability.MaxFailures,
		slurpConfig.Reliability.CooldownPeriod,
		slurpConfig.Reliability.HalfOpenTimeout,
	)

	idempotencyMgr := NewIdempotencyManager(slurpConfig.Reliability.IdempotencyWindow)

	dlq, err := NewDeadLetterQueue(
		slurpConfig.Reliability.DLQDirectory,
		slurpConfig.Reliability.MaxRetries,
	)
	if err != nil {
		return fail(fmt.Errorf("failed to initialize dead letter queue: %w", err))
	}

	backoffStrategy := NewBackoffStrategy(
		slurpConfig.Reliability.InitialBackoff,
		slurpConfig.Reliability.MaxBackoff,
		slurpConfig.Reliability.BackoffMultiplier,
		slurpConfig.Reliability.JitterFactor,
	)

	clientCtx, cancel := context.WithCancel(ctx)

	client := &ReliableSlurpClient{
		baseClient:      baseClient,
		circuitBreaker:  circuitBreaker,
		idempotencyMgr:  idempotencyMgr,
		deadLetterQueue: dlq,
		backoffStrategy: backoffStrategy,
		config:          slurpConfig,
		ctx:             clientCtx,
		cancel:          cancel,
		metrics:         &ReliabilityMetrics{},
	}

	// Start background retry worker
	client.startRetryWorker()

	log.Printf("πŸ›‘οΈ Reliable SLURP client initialized with circuit breaker and DLQ")
	return client, nil
}

// CreateEventReliably sends a single event with deduplication, circuit
// breaking and DLQ fallback.
//
// An event whose idempotency key was already processed is not sent; a
// successful "Event deduplicated" response is returned instead. If the circuit
// is open, or the send fails, the event is parked in the DLQ and an error is
// returned. The caller's event (including its Metadata map) is never modified.
func (rc *ReliableSlurpClient) CreateEventReliably(ctx context.Context, event SlurpEvent) (*EventResponse, error) {
	rc.recordIncoming(1)

	idempotencyKey := rc.idempotencyKeyFor(event)

	// Check if already processed
	if rc.idempotencyMgr.IsProcessed(idempotencyKey) {
		rc.metricsMutex.Lock()
		rc.metrics.DeduplicatedEvents++
		rc.metricsMutex.Unlock()

		log.Printf("πŸ”„ Event deduplicated with key: %s", idempotencyKey)
		return &EventResponse{
			Success:   true,
			EventID:   idempotencyKey,
			Message:   "Event deduplicated",
			Timestamp: time.Now(),
		}, nil
	}

	// Tag a copy of the event so the caller's Metadata map is left untouched.
	// This also ensures an event parked in the DLQ carries the same key as a
	// direct send.
	event = rc.withIdempotencyKey(event, idempotencyKey)

	// Check circuit breaker
	if !rc.circuitBreaker.CanProceed() {
		if err := rc.enqueueToDLQ(event, "Circuit breaker open"); err != nil {
			return nil, fmt.Errorf("circuit breaker is open and event could not be queued for retry: %w", err)
		}
		return nil, fmt.Errorf("circuit breaker is open, event queued for retry")
	}

	// Attempt to send event
	resp, err := rc.baseClient.CreateEvent(ctx, event)
	if err != nil {
		rc.circuitBreaker.RecordFailure()

		// Add to DLQ for retry
		if dlqErr := rc.enqueueToDLQ(event, err.Error()); dlqErr != nil {
			log.Printf("❌ Failed to enqueue failed event to DLQ: %v", dlqErr)
		}

		rc.recordFailures(1)
		return nil, fmt.Errorf("failed to send event: %w", err)
	}

	// Success! Record in circuit breaker and idempotency manager
	rc.circuitBreaker.RecordSuccess()
	rc.idempotencyMgr.MarkProcessed(idempotencyKey)
	rc.recordSuccesses(1)

	return resp, nil
}

// CreateEventsBatchReliably sends a batch of events with deduplication,
// circuit breaking and DLQ fallback.
//
// Events whose idempotency key was already processed are dropped, and so are
// repeats of the same key within this batch. Deduplication runs before the
// circuit-breaker check, so delivered events are never parked in the DLQ. If
// nothing is left to send, a successful "All events were deduplicated"
// response is returned. If the circuit is open, or the batch send fails, the
// remaining events are parked in the DLQ and an error is returned. The
// caller's events are never modified.
func (rc *ReliableSlurpClient) CreateEventsBatchReliably(ctx context.Context, events []SlurpEvent) (*BatchEventResponse, error) {
	rc.recordIncoming(int64(len(events)))

	pending := make([]SlurpEvent, 0, len(events))
	pendingKeys := make([]string, 0, len(events))
	seen := make(map[string]struct{}, len(events))
	deduplicatedCount := 0
	for _, event := range events {
		key := rc.idempotencyKeyFor(event)
		if _, dup := seen[key]; dup || rc.idempotencyMgr.IsProcessed(key) {
			deduplicatedCount++
			continue
		}
		seen[key] = struct{}{}
		pending = append(pending, rc.withIdempotencyKey(event, key))
		pendingKeys = append(pendingKeys, key)
	}

	if deduplicatedCount > 0 {
		rc.metricsMutex.Lock()
		rc.metrics.DeduplicatedEvents += int64(deduplicatedCount)
		rc.metricsMutex.Unlock()

		log.Printf("πŸ”„ Deduplicated %d events from batch", deduplicatedCount)
	}

	if len(pending) == 0 {
		return &BatchEventResponse{
			Success:        true,
			ProcessedCount: 0,
			FailedCount:    0,
			Message:        "All events were deduplicated",
			Timestamp:      time.Now(),
		}, nil
	}

	// Check circuit breaker
	if !rc.circuitBreaker.CanProceed() {
		queued := rc.enqueueBatchToDLQ(pending, "Circuit breaker open")
		if queued < len(pending) {
			return nil, fmt.Errorf("circuit breaker is open, %d events queued for retry, %d could not be queued", queued, len(pending)-queued)
		}
		return nil, fmt.Errorf("circuit breaker is open, %d events queued for retry", queued)
	}

	// Attempt to send batch
	resp, err := rc.baseClient.CreateEventsBatch(ctx, pending)
	if err != nil {
		rc.circuitBreaker.RecordFailure()
		rc.enqueueBatchToDLQ(pending, err.Error())
		rc.recordFailures(int64(len(pending)))
		return nil, fmt.Errorf("failed to send batch: %w", err)
	}

	rc.circuitBreaker.RecordSuccess()

	// The batch response reports only counts, not which events failed.
	// Marking every key processed after a partial failure would make a
	// caller's resend of the failed events be silently dropped as duplicates.
	// Keys are therefore recorded only when the whole batch was accepted.
	// NOTE(review): failed events of a partial batch are not parked in the DLQ
	// because they cannot be identified from the response.
	if resp.FailedCount == 0 {
		for _, key := range pendingKeys {
			rc.idempotencyMgr.MarkProcessed(key)
		}
	} else {
		log.Printf("⚠️ Batch partially failed (%d of %d events); idempotency keys not recorded", resp.FailedCount, len(pending))
	}

	rc.recordSuccesses(int64(resp.ProcessedCount))
	if resp.FailedCount > 0 {
		rc.recordFailures(int64(resp.FailedCount))
	}

	return resp, nil
}

// GetHealth checks the health of the SLURP service and feeds the outcome into
// the circuit breaker.
func (rc *ReliableSlurpClient) GetHealth(ctx context.Context) (*HealthResponse, error) {
	// Try base health check first
	health, err := rc.baseClient.GetHealth(ctx)
	if err != nil {
		rc.circuitBreaker.RecordFailure()
		return nil, err
	}

	rc.circuitBreaker.RecordSuccess()
	return health, nil
}

// GetReliabilityStats returns a snapshot of the client metrics together with
// circuit breaker and dead letter queue statistics.
func (rc *ReliableSlurpClient) GetReliabilityStats() map[string]interface{} {
	rc.metricsMutex.RLock()
	metrics := *rc.metrics
	rc.metricsMutex.RUnlock()

	stats := map[string]interface{}{
		"metrics":           metrics,
		"circuit_breaker":   rc.circuitBreaker.GetStats(),
		"dead_letter_queue": rc.deadLetterQueue.GetStats(),
	}

	return stats
}

// startRetryWorker starts the background goroutine that processes DLQ items
// every RetryInterval until rc.ctx is cancelled. The constructor guarantees
// the interval is positive.
func (rc *ReliableSlurpClient) startRetryWorker() {
	rc.retryWorker.Add(1)

	go func() {
		defer rc.retryWorker.Done()

		ticker := time.NewTicker(rc.config.Reliability.RetryInterval)
		defer ticker.Stop()

		log.Printf("πŸ”„ DLQ retry worker started (interval: %v)", rc.config.Reliability.RetryInterval)

		for {
			select {
			case <-rc.ctx.Done():
				log.Printf("πŸ›‘ DLQ retry worker stopping")
				return
			case <-ticker.C:
				rc.processDLQItems()
			}
		}
	}()
}

// processDLQItems retries the DLQ items that are ready for retry.
//
// It stops early if the client is shutting down or the circuit breaker opens.
// Items whose idempotency key was already processed (for example because the
// caller resent the event successfully) are marked done without being re-sent.
// A successful retry records the key so that later resends are deduplicated.
func (rc *ReliableSlurpClient) processDLQItems() {
	readyItems := rc.deadLetterQueue.GetReadyItems()
	if len(readyItems) == 0 {
		return
	}

	log.Printf("πŸ”„ Processing %d DLQ items ready for retry", len(readyItems))

	for _, item := range readyItems {
		if rc.ctx.Err() != nil {
			break
		}

		// Check if circuit breaker allows retry
		if !rc.circuitBreaker.CanProceed() {
			log.Printf("⏸️ Circuit breaker open, skipping DLQ retry")
			break
		}

		eventID := rc.deadLetterQueue.generateEventID(item.Event)
		idempotencyKey := rc.idempotencyKeyFor(item.Event)

		if rc.idempotencyMgr.IsProcessed(idempotencyKey) {
			if markErr := rc.deadLetterQueue.MarkSuccess(eventID); markErr != nil {
				log.Printf("❌ Failed to mark DLQ success: %v", markErr)
			}
			rc.metricsMutex.Lock()
			rc.metrics.DeduplicatedEvents++
			rc.metricsMutex.Unlock()
			log.Printf("πŸ”„ DLQ item %s already delivered, dropping retry", eventID)
			continue
		}

		// Attempt retry
		if _, err := rc.baseClient.CreateEvent(rc.ctx, item.Event); err != nil {
			rc.circuitBreaker.RecordFailure()
			if markErr := rc.deadLetterQueue.MarkFailure(eventID, err.Error()); markErr != nil {
				log.Printf("❌ Failed to mark DLQ failure: %v", markErr)
			}

			rc.metricsMutex.Lock()
			rc.metrics.DLQRetryFailures++
			rc.metricsMutex.Unlock()

			log.Printf("❌ DLQ retry failed for %s: %v", eventID, err)
			continue
		}

		rc.circuitBreaker.RecordSuccess()
		rc.idempotencyMgr.MarkProcessed(idempotencyKey)
		if markErr := rc.deadLetterQueue.MarkSuccess(eventID); markErr != nil {
			log.Printf("❌ Failed to mark DLQ success: %v", markErr)
		}

		rc.metricsMutex.Lock()
		rc.metrics.DLQRetrySuccesses++
		rc.metricsMutex.Unlock()

		log.Printf("βœ… DLQ retry succeeded for %s", eventID)
	}
}

// extractDiscussionID returns the identifier used in idempotency keys:
// "unknown" when the event has no metadata, the string "discussion_id"
// metadata value when present, and otherwise the event's Path.
func (rc *ReliableSlurpClient) extractDiscussionID(event SlurpEvent) string {
	if event.Metadata == nil {
		return "unknown"
	}

	if discussionID, exists := event.Metadata["discussion_id"]; exists {
		if id, ok := discussionID.(string); ok {
			return id
		}
	}

	// Fallback to event path if no discussion_id
	return event.Path
}

// idempotencyKeyFor derives the deduplication key for event from its
// discussion ID (or path), event type and timestamp.
func (rc *ReliableSlurpClient) idempotencyKeyFor(event SlurpEvent) string {
	return rc.idempotencyMgr.GenerateKey(
		rc.extractDiscussionID(event),
		event.EventType,
		event.Timestamp,
	)
}

// withIdempotencyKey returns a copy of event whose Metadata is a fresh map
// holding the original entries plus "idempotency_key". SlurpEvent is passed by
// value, but its Metadata map is shared with the caller, so writing into it in
// place would mutate the caller's event.
func (rc *ReliableSlurpClient) withIdempotencyKey(event SlurpEvent, key string) SlurpEvent {
	metadata := make(map[string]interface{}, len(event.Metadata)+1)
	for k, v := range event.Metadata {
		metadata[k] = v
	}
	metadata["idempotency_key"] = key
	event.Metadata = metadata
	return event
}

// enqueueToDLQ parks event in the dead letter queue. It counts the event in
// DLQEnqueued only when the enqueue succeeded.
func (rc *ReliableSlurpClient) enqueueToDLQ(event SlurpEvent, reason string) error {
	if err := rc.deadLetterQueue.Enqueue(event, reason); err != nil {
		return err
	}
	rc.metricsMutex.Lock()
	rc.metrics.DLQEnqueued++
	rc.metricsMutex.Unlock()
	return nil
}

// enqueueBatchToDLQ parks every event in the dead letter queue, logs each
// failure, and returns how many events were actually queued.
func (rc *ReliableSlurpClient) enqueueBatchToDLQ(events []SlurpEvent, reason string) int {
	queued := 0
	for _, event := range events {
		if err := rc.enqueueToDLQ(event, reason); err != nil {
			log.Printf("❌ Failed to enqueue batch event to DLQ: %v", err)
			continue
		}
		queued++
	}
	return queued
}

// recordIncoming counts n events received by the client.
func (rc *ReliableSlurpClient) recordIncoming(n int64) {
	rc.metricsMutex.Lock()
	defer rc.metricsMutex.Unlock()
	rc.metrics.TotalEvents += n
	rc.metrics.LastEventTime = time.Now()
}

// recordSuccesses counts n delivered events and stamps LastSuccessTime.
func (rc *ReliableSlurpClient) recordSuccesses(n int64) {
	rc.metricsMutex.Lock()
	defer rc.metricsMutex.Unlock()
	rc.metrics.SuccessfulEvents += n
	rc.metrics.LastSuccessTime = time.Now()
}

// recordFailures counts n failed events and stamps LastFailureTime.
func (rc *ReliableSlurpClient) recordFailures(n int64) {
	rc.metricsMutex.Lock()
	defer rc.metricsMutex.Unlock()
	rc.metrics.FailedEvents += n
	rc.metrics.LastFailureTime = time.Now()
}

// Close stops the retry worker, waits for it to exit, and closes the base
// client. It is safe to call more than once; later calls return the result of
// the first one.
func (rc *ReliableSlurpClient) Close() error {
	rc.closeOnce.Do(func() {
		log.Printf("πŸ›‘ Shutting down reliable SLURP client...")

		// Cancel context to stop retry worker
		rc.cancel()

		// Wait for retry worker to finish
		rc.retryWorker.Wait()

		// Close base client
		rc.closeErr = rc.baseClient.Close()
	})
	return rc.closeErr
}