// End-to-End Tests for Issue 016: HMMM → SLURP → UCXL Decision and Load // These tests validate the complete workflow from HMMM discussions through // SLURP event processing to UCXL decision storage and retrieval. package integration import ( "context" "encoding/json" "fmt" "sync" "testing" "time" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "chorus.services/bzzz/pkg/config" "chorus.services/bzzz/pkg/dht" "chorus.services/bzzz/pkg/slurp" "chorus.services/bzzz/pkg/ucxi" "chorus.services/bzzz/pkg/ucxl" "chorus.services/bzzz/test" ) // E2ETestSuite provides comprehensive end-to-end testing for HMMM → SLURP → UCXL workflow type E2ETestSuite struct { ctx context.Context config *config.Config hmmmSimulator *test.HmmmTestSuite slurpProcessor *slurp.EventProcessor slurpCoordinator *slurp.Coordinator decisionPublisher *ucxl.DecisionPublisher ucxiServer *ucxi.Server dhtStorage dht.DHT loadGenerator *LoadTestGenerator metricsCollector *MetricsCollector circuitBreaker *CircuitBreaker dlqHandler *DLQHandler testResults []E2ETestResult mutex sync.RWMutex } // E2ETestResult represents the outcome of an end-to-end test type E2ETestResult struct { TestName string `json:"test_name"` StartTime time.Time `json:"start_time"` EndTime time.Time `json:"end_time"` Duration time.Duration `json:"duration"` Success bool `json:"success"` ExpectedOutcome string `json:"expected_outcome"` ActualOutcome string `json:"actual_outcome"` Metrics E2ETestMetrics `json:"metrics"` Steps []TestStepResult `json:"steps"` Errors []string `json:"errors"` Warnings []string `json:"warnings"` Metadata map[string]interface{} `json:"metadata"` } // E2ETestMetrics tracks quantitative metrics for end-to-end tests type E2ETestMetrics struct { HmmmDiscussions int `json:"hmmm_discussions"` SlurpEventsGenerated int `json:"slurp_events_generated"` SlurpEventsProcessed int `json:"slurp_events_processed"` UCXLDecisionsStored int `json:"ucxl_decisions_stored"` UCXLDecisionsRetrieved int `json:"ucxl_decisions_retrieved"` AvgProcessingTime time.Duration `json:"avg_processing_time"` Throughput float64 `json:"throughput_per_second"` ErrorRate float64 `json:"error_rate"` MemoryUsage int64 `json:"memory_usage_bytes"` CircuitBreakerTrips int `json:"circuit_breaker_trips"` DLQMessages int `json:"dlq_messages"` } // TestStepResult represents the result of an individual test step type TestStepResult struct { StepName string `json:"step_name"` StartTime time.Time `json:"start_time"` EndTime time.Time `json:"end_time"` Duration time.Duration `json:"duration"` Success bool `json:"success"` ErrorMsg string `json:"error_msg,omitempty"` Metadata map[string]string `json:"metadata,omitempty"` } func TestHmmmSlurpUcxlE2EWorkflow(t *testing.T) { suite := NewE2ETestSuite(t) defer suite.Cleanup() t.Run("HappyPath_Complete_Workflow", suite.TestHappyPathCompleteWorkflow) t.Run("Load_Test_Batch_Processing", suite.TestLoadTestBatchProcessing) t.Run("Error_Injection_Resilience", suite.TestErrorInjectionResilience) t.Run("Circuit_Breaker_Protection", suite.TestCircuitBreakerProtection) t.Run("DLQ_Processing_Recovery", suite.TestDLQProcessingRecovery) t.Run("Schema_Validation_Enforcement", suite.TestSchemaValidationEnforcement) t.Run("Temporal_Navigation_Integration", suite.TestTemporalNavigationIntegration) t.Run("Concurrent_Multi_Discussion", suite.TestConcurrentMultiDiscussion) } func NewE2ETestSuite(t *testing.T) *E2ETestSuite { ctx := context.Background() // Initialize configuration cfg := &config.Config{ SLURP: config.SLURPConfig{ BatchSize: 10, ProcessingTimeout: 30 * time.Second, BackpressureEnabled: true, IdempotencyEnabled: true, DLQEnabled: true, CircuitBreakerEnabled: true, }, DHT: config.DHTConfig{ ReplicationFactor: 3, PutTimeout: 10 * time.Second, GetTimeout: 5 * time.Second, }, } // Initialize DHT storage dhtStorage := dht.NewMockDHT() // Initialize SLURP components slurpProcessor, err := slurp.NewEventProcessor(cfg, dhtStorage) require.NoError(t, err, "Failed to create SLURP processor") slurpCoordinator, err := slurp.NewCoordinator(cfg, slurpProcessor) require.NoError(t, err, "Failed to create SLURP coordinator") // Initialize decision publisher decisionPublisher, err := ucxl.NewDecisionPublisher(dhtStorage) require.NoError(t, err, "Failed to create decision publisher") // Initialize UCXI server ucxiServer, err := ucxi.NewServer(dhtStorage) require.NoError(t, err, "Failed to create UCXI server") // Initialize HMMM simulator hmmmSimulator := test.NewHmmmTestSuite(ctx, nil) // Provide actual pubsub if available // Initialize load testing and monitoring components loadGenerator := NewLoadTestGenerator(cfg) metricsCollector := NewMetricsCollector() circuitBreaker := NewCircuitBreaker(cfg) dlqHandler := NewDLQHandler(cfg) return &E2ETestSuite{ ctx: ctx, config: cfg, hmmmSimulator: hmmmSimulator, slurpProcessor: slurpProcessor, slurpCoordinator: slurpCoordinator, decisionPublisher: decisionPublisher, ucxiServer: ucxiServer, dhtStorage: dhtStorage, loadGenerator: loadGenerator, metricsCollector: metricsCollector, circuitBreaker: circuitBreaker, dlqHandler: dlqHandler, testResults: make([]E2ETestResult, 0), } } func (suite *E2ETestSuite) Cleanup() { // Cleanup resources suite.loadGenerator.Stop() suite.metricsCollector.Stop() suite.dlqHandler.Close() } // TestHappyPathCompleteWorkflow tests the complete happy path workflow func (suite *E2ETestSuite) TestHappyPathCompleteWorkflow(t *testing.T) { result := suite.startTestResult("HappyPath_Complete_Workflow", "HMMM discussion generates SLURP event, processes to UCXL decision, retrievable via UCXI") steps := []struct { name string fn func() error }{ {"InitiateHmmmDiscussion", suite.stepInitiateHmmmDiscussion}, {"GenerateSlurpEvent", suite.stepGenerateSlurpEvent}, {"ProcessSlurpEvent", suite.stepProcessSlurpEvent}, {"PublishUcxlDecision", suite.stepPublishUcxlDecision}, {"RetrieveViaUcxi", suite.stepRetrieveViaUcxi}, {"ValidateSchemaCompliance", suite.stepValidateSchemaCompliance}, } allSuccess := true for _, step := range steps { stepResult := suite.executeTestStep(step.name, step.fn) result.Steps = append(result.Steps, stepResult) if !stepResult.Success { allSuccess = false result.Errors = append(result.Errors, fmt.Sprintf("%s failed: %s", step.name, stepResult.ErrorMsg)) } } result.Success = allSuccess if allSuccess { result.ActualOutcome = "Successfully completed full HMMM → SLURP → UCXL workflow" } else { result.ActualOutcome = fmt.Sprintf("Workflow failed at %d step(s)", len(result.Errors)) } suite.finishTestResult(result) assert.True(t, result.Success, "Happy path workflow should succeed") } // TestLoadTestBatchProcessing tests batch processing under load func (suite *E2ETestSuite) TestLoadTestBatchProcessing(t *testing.T) { result := suite.startTestResult("Load_Test_Batch_Processing", "System handles configured throughput without error in healthy scenario") const numEvents = 100 const expectedThroughput = 10.0 // events per second // Generate load events := make([]SlurpEvent, numEvents) for i := 0; i < numEvents; i++ { events[i] = SlurpEvent{ ID: fmt.Sprintf("load-event-%d", i), Type: "discussion_complete", SourceHMMM: fmt.Sprintf("hmmm-discussion-%d", i%10), UCXLReference: fmt.Sprintf("ucxl://load-agent-%d:developer@load-project:task-%d/*^", i%5, i), Payload: map[string]interface{}{ "decision": fmt.Sprintf("Load test decision %d", i), "priority": "medium", "timestamp": time.Now().Format(time.RFC3339), }, Timestamp: time.Now(), } } startTime := time.Now() // Process events in batches batchSize := suite.config.SLURP.BatchSize successCount := 0 errorCount := 0 for i := 0; i < len(events); i += batchSize { end := i + batchSize if end > len(events) { end = len(events) } batch := events[i:end] err := suite.slurpProcessor.ProcessEventBatch(suite.ctx, batch) if err != nil { errorCount += len(batch) result.Errors = append(result.Errors, fmt.Sprintf("Batch %d failed: %v", i/batchSize, err)) } else { successCount += len(batch) } } processingDuration := time.Since(startTime) actualThroughput := float64(successCount) / processingDuration.Seconds() // Verify throughput result.Metrics.SlurpEventsGenerated = numEvents result.Metrics.SlurpEventsProcessed = successCount result.Metrics.Throughput = actualThroughput result.Metrics.ErrorRate = float64(errorCount) / float64(numEvents) result.Metrics.AvgProcessingTime = processingDuration / time.Duration(numEvents) result.Success = actualThroughput >= expectedThroughput && errorCount == 0 if result.Success { result.ActualOutcome = fmt.Sprintf("Achieved %.2f events/sec throughput with 0%% error rate", actualThroughput) } else { result.ActualOutcome = fmt.Sprintf("Achieved %.2f events/sec throughput with %.2f%% error rate", actualThroughput, result.Metrics.ErrorRate*100) } suite.finishTestResult(result) assert.True(t, result.Success, "Load test should meet performance requirements") } // TestErrorInjectionResilience tests resilience under error conditions func (suite *E2ETestSuite) TestErrorInjectionResilience(t *testing.T) { result := suite.startTestResult("Error_Injection_Resilience", "System recovers gracefully from injected failures with backoff and retry") // Inject failures into DHT storage mockDHT := suite.dhtStorage.(*dht.MockDHT) mockDHT.SetFailureRate(0.5) // 50% failure rate mockDHT.SetLatency(100 * time.Millisecond) defer func() { mockDHT.SetFailureRate(0.0) // Reset after test mockDHT.SetLatency(0) }() // Create test events numEvents := 20 events := make([]SlurpEvent, numEvents) for i := 0; i < numEvents; i++ { events[i] = SlurpEvent{ ID: fmt.Sprintf("resilience-event-%d", i), Type: "discussion_complete", UCXLReference: fmt.Sprintf("ucxl://resilience-agent:developer@project:task-%d/*^", i), Payload: map[string]interface{}{ "decision": fmt.Sprintf("Resilience test decision %d", i), "retry_count": 0, }, Timestamp: time.Now(), } } // Process events with retry logic successCount := 0 for _, event := range events { err := suite.processEventWithRetry(event, 3) // 3 retries if err == nil { successCount++ } else { result.Errors = append(result.Errors, fmt.Sprintf("Event %s failed after retries: %v", event.ID, err)) } } // Even with 50% failure rate, retries should achieve high success rate successRate := float64(successCount) / float64(numEvents) result.Metrics.SlurpEventsGenerated = numEvents result.Metrics.SlurpEventsProcessed = successCount result.Metrics.ErrorRate = 1.0 - successRate result.Success = successRate >= 0.8 // At least 80% success with retries if result.Success { result.ActualOutcome = fmt.Sprintf("Achieved %.1f%% success rate with error injection and retries", successRate*100) } else { result.ActualOutcome = fmt.Sprintf("Only achieved %.1f%% success rate despite retries", successRate*100) } suite.finishTestResult(result) assert.True(t, result.Success, "System should be resilient to injected failures") } // TestCircuitBreakerProtection tests circuit breaker functionality func (suite *E2ETestSuite) TestCircuitBreakerProtection(t *testing.T) { result := suite.startTestResult("Circuit_Breaker_Protection", "Circuit breaker trips on repeated failures and recovers automatically") // Force circuit breaker to trip by generating failures mockDHT := suite.dhtStorage.(*dht.MockDHT) mockDHT.SetFailureRate(1.0) // 100% failure rate // Send events until circuit breaker trips eventCount := 0 for eventCount < 20 && !suite.circuitBreaker.IsOpen() { event := SlurpEvent{ ID: fmt.Sprintf("cb-event-%d", eventCount), Type: "discussion_complete", UCXLReference: fmt.Sprintf("ucxl://cb-agent:developer@project:task-%d/*^", eventCount), Payload: map[string]interface{}{"test": true}, Timestamp: time.Now(), } err := suite.slurpProcessor.ProcessEvent(suite.ctx, event) if err != nil { suite.circuitBreaker.RecordFailure() } eventCount++ time.Sleep(10 * time.Millisecond) } // Verify circuit breaker is open if !suite.circuitBreaker.IsOpen() { result.Errors = append(result.Errors, "Circuit breaker did not trip despite repeated failures") } else { result.Metrics.CircuitBreakerTrips = 1 } // Reset failure rate and test recovery mockDHT.SetFailureRate(0.0) time.Sleep(suite.circuitBreaker.GetRecoveryTimeout()) // Test that circuit breaker allows requests after recovery timeout recoveryEvent := SlurpEvent{ ID: "recovery-test-event", Type: "discussion_complete", UCXLReference: "ucxl://recovery-agent:developer@project:recovery-task/*^", Payload: map[string]interface{}{"recovery": true}, Timestamp: time.Now(), } err := suite.slurpProcessor.ProcessEvent(suite.ctx, recoveryEvent) recovered := err == nil result.Success = suite.circuitBreaker.IsOpen() && recovered if result.Success { result.ActualOutcome = "Circuit breaker tripped on failures and recovered successfully" } else { result.ActualOutcome = "Circuit breaker protection did not work as expected" } suite.finishTestResult(result) assert.True(t, result.Success, "Circuit breaker should protect system and allow recovery") } // TestDLQProcessingRecovery tests Dead Letter Queue processing and recovery func (suite *E2ETestSuite) TestDLQProcessingRecovery(t *testing.T) { result := suite.startTestResult("DLQ_Processing_Recovery", "Failed events are moved to DLQ and can be reprocessed successfully") // Create events that will initially fail failedEvents := []SlurpEvent{ { ID: "dlq-event-1", Type: "discussion_complete", UCXLReference: "ucxl://dlq-agent-1:developer@project:dlq-task-1/*^", Payload: map[string]interface{}{"test": "dlq", "attempt": 1}, Timestamp: time.Now(), }, { ID: "dlq-event-2", Type: "discussion_complete", UCXLReference: "ucxl://dlq-agent-2:developer@project:dlq-task-2/*^", Payload: map[string]interface{}{"test": "dlq", "attempt": 1}, Timestamp: time.Now(), }, } // Force failures to populate DLQ mockDHT := suite.dhtStorage.(*dht.MockDHT) mockDHT.SetFailureRate(1.0) dlqCount := 0 for _, event := range failedEvents { err := suite.slurpProcessor.ProcessEvent(suite.ctx, event) if err != nil { suite.dlqHandler.AddToDLQ(event, err) dlqCount++ } } result.Metrics.DLQMessages = dlqCount // Verify DLQ contains failed events dlqEvents := suite.dlqHandler.GetDLQEvents() assert.Equal(t, dlqCount, len(dlqEvents), "DLQ should contain failed events") // Fix the underlying issue and reprocess DLQ mockDHT.SetFailureRate(0.0) reprocessedCount := 0 for _, dlqEvent := range dlqEvents { err := suite.slurpProcessor.ProcessEvent(suite.ctx, dlqEvent) if err == nil { reprocessedCount++ suite.dlqHandler.RemoveFromDLQ(dlqEvent.ID) } } result.Success = reprocessedCount == dlqCount if result.Success { result.ActualOutcome = fmt.Sprintf("Successfully reprocessed %d events from DLQ", reprocessedCount) } else { result.ActualOutcome = fmt.Sprintf("Only reprocessed %d out of %d DLQ events", reprocessedCount, dlqCount) } suite.finishTestResult(result) assert.True(t, result.Success, "DLQ processing should recover all failed events") } // TestSchemaValidationEnforcement tests UCXL address and payload validation func (suite *E2ETestSuite) TestSchemaValidationEnforcement(t *testing.T) { result := suite.startTestResult("Schema_Validation_Enforcement", "Invalid UCXL addresses and payloads are rejected with proper error codes") testCases := []struct { name string event SlurpEvent expectErr bool errType string }{ { name: "ValidEvent", event: SlurpEvent{ ID: "valid-event", Type: "discussion_complete", UCXLReference: "ucxl://valid-agent:developer@project:task/*^", Payload: map[string]interface{}{"decision": "valid"}, Timestamp: time.Now(), }, expectErr: false, }, { name: "InvalidUCXLAddress_MissingScheme", event: SlurpEvent{ ID: "invalid-addr-1", Type: "discussion_complete", UCXLReference: "invalid-agent:developer@project:task/*^", Payload: map[string]interface{}{"decision": "test"}, Timestamp: time.Now(), }, expectErr: true, errType: "invalid_address", }, { name: "InvalidPayload_Empty", event: SlurpEvent{ ID: "invalid-payload-1", Type: "discussion_complete", UCXLReference: "ucxl://agent:developer@project:task/*^", Payload: nil, Timestamp: time.Now(), }, expectErr: true, errType: "invalid_payload", }, { name: "InvalidType_Empty", event: SlurpEvent{ ID: "invalid-type-1", Type: "", UCXLReference: "ucxl://agent:developer@project:task/*^", Payload: map[string]interface{}{"decision": "test"}, Timestamp: time.Now(), }, expectErr: true, errType: "invalid_type", }, } validationErrors := 0 for _, tc := range testCases { err := suite.slurpProcessor.ProcessEvent(suite.ctx, tc.event) if tc.expectErr { if err == nil { result.Errors = append(result.Errors, fmt.Sprintf("%s: Expected error but got none", tc.name)) } else { validationErrors++ // Could check specific error types here } } else { if err != nil { result.Errors = append(result.Errors, fmt.Sprintf("%s: Unexpected error: %v", tc.name, err)) } } } result.Success = len(result.Errors) == 0 if result.Success { result.ActualOutcome = fmt.Sprintf("Schema validation correctly rejected %d invalid events", validationErrors) } else { result.ActualOutcome = fmt.Sprintf("Schema validation failed with %d errors", len(result.Errors)) } suite.finishTestResult(result) assert.True(t, result.Success, "Schema validation should enforce proper formats") } // TestTemporalNavigationIntegration tests temporal addressing in the full workflow func (suite *E2ETestSuite) TestTemporalNavigationIntegration(t *testing.T) { result := suite.startTestResult("Temporal_Navigation_Integration", "Temporal navigation works correctly through the complete workflow") baseAddress := "ucxl://temporal-agent:developer@project:temporal-task/*" // Create multiple versions of decisions versions := []struct { address string version string data map[string]interface{} }{ {baseAddress + "v1", "v1", map[string]interface{}{"decision": "first version", "version": 1}}, {baseAddress + "v2", "v2", map[string]interface{}{"decision": "second version", "version": 2}}, {baseAddress + "v3", "v3", map[string]interface{}{"decision": "third version", "version": 3}}, {baseAddress + "^", "latest", map[string]interface{}{"decision": "latest version", "version": 999}}, } // Process events for each version for _, v := range versions { event := SlurpEvent{ ID: fmt.Sprintf("temporal-event-%s", v.version), Type: "discussion_complete", UCXLReference: v.address, Payload: v.data, Timestamp: time.Now(), } err := suite.slurpProcessor.ProcessEvent(suite.ctx, event) if err != nil { result.Errors = append(result.Errors, fmt.Sprintf("Failed to process %s: %v", v.version, err)) } } // Test temporal navigation navigationTests := []struct { address string expectData string description string }{ {baseAddress + "^", "latest version", "Latest version"}, {baseAddress + "v2", "second version", "Specific version v2"}, {baseAddress + "^-1", "third version", "Latest minus 1"}, // Assuming temporal navigation implemented } navigatedSuccessfully := 0 for _, nt := range navigationTests { // Try to retrieve via UCXI retrievedData, err := suite.retrieveViaUCXI(nt.address) if err != nil { result.Errors = append(result.Errors, fmt.Sprintf("%s: Failed to retrieve: %v", nt.description, err)) } else { if payload, ok := retrievedData["decision"]; ok && payload == nt.expectData { navigatedSuccessfully++ } else { result.Errors = append(result.Errors, fmt.Sprintf("%s: Expected '%s', got '%v'", nt.description, nt.expectData, payload)) } } } result.Success = navigatedSuccessfully == len(navigationTests) && len(result.Errors) == 0 if result.Success { result.ActualOutcome = fmt.Sprintf("Successfully navigated %d temporal addresses", navigatedSuccessfully) } else { result.ActualOutcome = fmt.Sprintf("Temporal navigation failed: %d errors", len(result.Errors)) } suite.finishTestResult(result) assert.True(t, result.Success, "Temporal navigation should work in complete workflow") } // TestConcurrentMultiDiscussion tests concurrent processing of multiple discussions func (suite *E2ETestSuite) TestConcurrentMultiDiscussion(t *testing.T) { result := suite.startTestResult("Concurrent_Multi_Discussion", "System handles concurrent HMMM discussions without conflicts or data loss") const numDiscussions = 5 const eventsPerDiscussion = 10 var wg sync.WaitGroup resultChan := make(chan error, numDiscussions*eventsPerDiscussion) // Start concurrent discussions for discussionID := 0; discussionID < numDiscussions; discussionID++ { wg.Add(1) go func(discID int) { defer wg.Done() for eventID := 0; eventID < eventsPerDiscussion; eventID++ { event := SlurpEvent{ ID: fmt.Sprintf("concurrent-disc-%d-event-%d", discID, eventID), Type: "discussion_complete", SourceHMMM: fmt.Sprintf("concurrent-hmmm-%d", discID), UCXLReference: fmt.Sprintf("ucxl://concurrent-agent-%d:developer@project:task-%d/*^", discID, eventID), Payload: map[string]interface{}{ "discussion_id": discID, "event_id": eventID, "decision": fmt.Sprintf("Concurrent decision from discussion %d event %d", discID, eventID), }, Timestamp: time.Now(), } err := suite.slurpProcessor.ProcessEvent(suite.ctx, event) resultChan <- err // Small delay to create more realistic concurrency time.Sleep(time.Millisecond * time.Duration(10+eventID)) } }(discussionID) } // Wait for all discussions to complete wg.Wait() close(resultChan) // Analyze results successCount := 0 errorCount := 0 for err := range resultChan { if err == nil { successCount++ } else { errorCount++ result.Errors = append(result.Errors, err.Error()) } } totalEvents := numDiscussions * eventsPerDiscussion result.Metrics.SlurpEventsGenerated = totalEvents result.Metrics.SlurpEventsProcessed = successCount result.Metrics.ErrorRate = float64(errorCount) / float64(totalEvents) result.Success = errorCount == 0 if result.Success { result.ActualOutcome = fmt.Sprintf("Successfully processed %d concurrent events from %d discussions", successCount, numDiscussions) } else { result.ActualOutcome = fmt.Sprintf("Concurrent processing had %d errors out of %d total events", errorCount, totalEvents) } suite.finishTestResult(result) assert.True(t, result.Success, "Concurrent multi-discussion processing should succeed") } // Helper methods for test execution func (suite *E2ETestSuite) startTestResult(testName, expectedOutcome string) *E2ETestResult { suite.mutex.Lock() defer suite.mutex.Unlock() result := &E2ETestResult{ TestName: testName, StartTime: time.Now(), ExpectedOutcome: expectedOutcome, Steps: make([]TestStepResult, 0), Errors: make([]string, 0), Warnings: make([]string, 0), Metadata: make(map[string]interface{}), Metrics: E2ETestMetrics{}, } return result } func (suite *E2ETestSuite) finishTestResult(result *E2ETestResult) { suite.mutex.Lock() defer suite.mutex.Unlock() result.EndTime = time.Now() result.Duration = result.EndTime.Sub(result.StartTime) suite.testResults = append(suite.testResults, *result) } func (suite *E2ETestSuite) executeTestStep(stepName string, stepFunc func() error) TestStepResult { startTime := time.Now() err := stepFunc() endTime := time.Now() stepResult := TestStepResult{ StepName: stepName, StartTime: startTime, EndTime: endTime, Duration: endTime.Sub(startTime), Success: err == nil, Metadata: make(map[string]string), } if err != nil { stepResult.ErrorMsg = err.Error() } return stepResult } // Test step implementations func (suite *E2ETestSuite) stepInitiateHmmmDiscussion() error { // Simulate HMMM discussion initiation // This would integrate with actual HMMM discussion simulator time.Sleep(100 * time.Millisecond) // Simulate processing time return nil } func (suite *E2ETestSuite) stepGenerateSlurpEvent() error { // Generate a SLURP event from the discussion event := SlurpEvent{ ID: "test-discussion-event", Type: "discussion_complete", SourceHMMM: "test-hmmm-discussion-1", UCXLReference: "ucxl://test-agent:developer@test-project:test-task/*^", Payload: map[string]interface{}{ "decision": "Test decision from HMMM discussion", "participants": []string{"agent1", "agent2", "agent3"}, "confidence": 0.85, }, Timestamp: time.Now(), } // Store event for later processing suite.mutex.Lock() if suite.testResults[len(suite.testResults)-1].Metadata == nil { suite.testResults[len(suite.testResults)-1].Metadata = make(map[string]interface{}) } suite.testResults[len(suite.testResults)-1].Metadata["test_event"] = event suite.mutex.Unlock() return nil } func (suite *E2ETestSuite) stepProcessSlurpEvent() error { // Get the event from metadata suite.mutex.RLock() testEvent, ok := suite.testResults[len(suite.testResults)-1].Metadata["test_event"].(SlurpEvent) suite.mutex.RUnlock() if !ok { return fmt.Errorf("test event not found in metadata") } return suite.slurpProcessor.ProcessEvent(suite.ctx, testEvent) } func (suite *E2ETestSuite) stepPublishUcxlDecision() error { // This step would be handled by the SLURP processor automatically // Verify that the decision was published time.Sleep(50 * time.Millisecond) // Allow processing time return nil } func (suite *E2ETestSuite) stepRetrieveViaUcxi() error { ucxlAddress := "ucxl://test-agent:developer@test-project:test-task/*^" _, err := suite.retrieveViaUCXI(ucxlAddress) return err } func (suite *E2ETestSuite) stepValidateSchemaCompliance() error { // Validate that stored decisions comply with expected schema return nil } func (suite *E2ETestSuite) retrieveViaUCXI(address string) (map[string]interface{}, error) { // Simulate UCXI retrieval value, err := suite.dhtStorage.GetValue(suite.ctx, address) if err != nil { return nil, err } var result map[string]interface{} err = json.Unmarshal(value, &result) return result, err } func (suite *E2ETestSuite) processEventWithRetry(event SlurpEvent, maxRetries int) error { var lastErr error for attempt := 0; attempt <= maxRetries; attempt++ { err := suite.slurpProcessor.ProcessEvent(suite.ctx, event) if err == nil { return nil } lastErr = err // Exponential backoff backoffDuration := time.Duration(100*attempt*attempt) * time.Millisecond time.Sleep(backoffDuration) } return lastErr } // Supporting types and interfaces (would be implemented in actual codebase) type SlurpEvent struct { ID string `json:"id"` Type string `json:"type"` SourceHMMM string `json:"source_hmmm"` UCXLReference string `json:"ucxl_reference"` Payload map[string]interface{} `json:"payload"` Timestamp time.Time `json:"timestamp"` } type LoadTestGenerator struct { config *config.Config } func NewLoadTestGenerator(config *config.Config) *LoadTestGenerator { return &LoadTestGenerator{config: config} } func (ltg *LoadTestGenerator) Stop() {} type MetricsCollector struct{} func NewMetricsCollector() *MetricsCollector { return &MetricsCollector{} } func (mc *MetricsCollector) Stop() {} type CircuitBreaker struct { open bool recoveryTimeout time.Duration } func NewCircuitBreaker(config *config.Config) *CircuitBreaker { return &CircuitBreaker{ recoveryTimeout: 30 * time.Second, } } func (cb *CircuitBreaker) IsOpen() bool { return cb.open } func (cb *CircuitBreaker) RecordFailure() { cb.open = true } func (cb *CircuitBreaker) GetRecoveryTimeout() time.Duration { return cb.recoveryTimeout } type DLQHandler struct { events map[string]SlurpEvent mutex sync.RWMutex } func NewDLQHandler(config *config.Config) *DLQHandler { return &DLQHandler{ events: make(map[string]SlurpEvent), } } func (dlq *DLQHandler) AddToDLQ(event SlurpEvent, err error) { dlq.mutex.Lock() defer dlq.mutex.Unlock() dlq.events[event.ID] = event } func (dlq *DLQHandler) RemoveFromDLQ(eventID string) { dlq.mutex.Lock() defer dlq.mutex.Unlock() delete(dlq.events, eventID) } func (dlq *DLQHandler) GetDLQEvents() []SlurpEvent { dlq.mutex.RLock() defer dlq.mutex.RUnlock() events := make([]SlurpEvent, 0, len(dlq.events)) for _, event := range dlq.events { events = append(events, event) } return events } func (dlq *DLQHandler) Close() {}