package hmmm_adapter

import (
	"context"
	"encoding/json"
	"strconv"
	"sync"
	"testing"
	"time"

	"chorus.services/bzzz/p2p"
	"chorus.services/bzzz/pubsub"
	"chorus.services/hmmm/pkg/hmmm"
)

// TestAdapterPubSubIntegration publishes twice to a single per-issue topic
// through an adapter wired to the real BZZZ pubsub methods and checks the
// adapter's publish/join/error counters and its list of joined topics.
func TestAdapterPubSubIntegration(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Create P2P node
	node, err := p2p.NewNode(ctx)
	if err != nil {
		t.Fatalf("Failed to create P2P node: %v", err)
	}
	defer node.Close()

	// Create PubSub system
	ps, err := pubsub.NewPubSub(ctx, node.Host(), "bzzz/test/coordination", "hmmm/test/meta-discussion")
	if err != nil {
		t.Fatalf("Failed to create PubSub: %v", err)
	}
	defer ps.Close()

	// Create adapter using actual BZZZ pubsub methods
	adapter := NewAdapter(
		ps.JoinDynamicTopic,
		ps.PublishRaw,
	)

	// Test publishing to a per-issue topic
	topic := "bzzz/meta/issue/integration-test-42"
	testPayload := []byte(`{"version": 1, "type": "meta_msg", "issue_id": 42, "message": "Integration test message"}`)

	if err := adapter.Publish(ctx, topic, testPayload); err != nil {
		t.Fatalf("Failed to publish message: %v", err)
	}

	// Verify metrics after the first publish
	metrics := adapter.GetMetrics()
	if metrics.PublishCount != 1 {
		t.Errorf("Expected publish count 1, got %d", metrics.PublishCount)
	}
	if metrics.JoinCount != 1 {
		t.Errorf("Expected join count 1, got %d", metrics.JoinCount)
	}
	if metrics.ErrorCount != 0 {
		t.Errorf("Expected error count 0, got %d", metrics.ErrorCount)
	}

	// Verify the topic is reported as joined
	joinedTopics := adapter.GetJoinedTopics()
	if len(joinedTopics) != 1 || joinedTopics[0] != topic {
		t.Errorf("Expected topic to be cached: got %v", joinedTopics)
	}

	// Publish again to the same topic; the join count must not grow
	secondPayload := []byte(`{"version": 1, "type": "meta_msg", "issue_id": 42, "message": "Second message"}`)
	if err := adapter.Publish(ctx, topic, secondPayload); err != nil {
		t.Fatalf("Failed to publish second message: %v", err)
	}

	// Verify join count didn't increase (cached)
	metrics = adapter.GetMetrics()
	if metrics.JoinCount != 1 {
		t.Errorf("Expected join count to remain 1 (cached), got %d", metrics.JoinCount)
	}
	if metrics.PublishCount != 2 {
		t.Errorf("Expected publish count 2, got %d", metrics.PublishCount)
	}
}

// TestHMMMRouterIntegration publishes one message through an HMMM Router
// built on top of the adapter and checks that the adapter recorded exactly
// one publish and one join, on the topic hmmm.TopicForIssue returns for the
// message's issue ID.
func TestHMMMRouterIntegration(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Create P2P node
	node, err := p2p.NewNode(ctx)
	if err != nil {
		t.Fatalf("Failed to create P2P node: %v", err)
	}
	defer node.Close()

	// Create PubSub system
	ps, err := pubsub.NewPubSub(ctx, node.Host(), "bzzz/test/coordination", "hmmm/test/meta-discussion")
	if err != nil {
		t.Fatalf("Failed to create PubSub: %v", err)
	}
	defer ps.Close()

	// Create adapter
	adapter := NewAdapter(
		ps.JoinDynamicTopic,
		ps.PublishRaw,
	)

	// Create HMMM Router using our adapter
	hmmmRouter := hmmm.NewRouter(adapter, hmmm.DefaultConfig())

	// Create a valid HMMM message
	msg := hmmm.Message{
		Version:   1,
		Type:      "meta_msg",
		IssueID:   42,
		ThreadID:  "test-thread-1",
		MsgID:     "test-msg-1",
		NodeID:    node.ID().String(),
		Author:    "test-author",
		HopCount:  0,
		Timestamp: time.Now(),
		Message:   "Test message from HMMM Router integration test",
	}

	// Publish through HMMM Router
	if err := hmmmRouter.Publish(ctx, msg); err != nil {
		t.Fatalf("Failed to publish via HMMM Router: %v", err)
	}

	// Verify adapter metrics were updated
	metrics := adapter.GetMetrics()
	if metrics.PublishCount != 1 {
		t.Errorf("Expected publish count 1, got %d", metrics.PublishCount)
	}
	if metrics.JoinCount != 1 {
		t.Errorf("Expected join count 1, got %d", metrics.JoinCount)
	}

	// Verify the expected topic was joined
	expectedTopic := hmmm.TopicForIssue(42)
	joinedTopics := adapter.GetJoinedTopics()
	if len(joinedTopics) != 1 || joinedTopics[0] != expectedTopic {
		t.Errorf("Expected topic %s to be joined, got %v", expectedTopic, joinedTopics)
	}
}

// TestPerIssueTopicPublishing publishes one message to each of several
// per-issue topics and checks that the adapter joined and published once per
// issue, and that every joined topic is one of the expected topics.
func TestPerIssueTopicPublishing(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Create P2P node
	node, err := p2p.NewNode(ctx)
	if err != nil {
		t.Fatalf("Failed to create P2P node: %v", err)
	}
	defer node.Close()

	// Create PubSub system
	ps, err := pubsub.NewPubSub(ctx, node.Host(), "bzzz/test/coordination", "hmmm/test/meta-discussion")
	if err != nil {
		t.Fatalf("Failed to create PubSub: %v", err)
	}
	defer ps.Close()

	// Create adapter
	adapter := NewAdapter(
		ps.JoinDynamicTopic,
		ps.PublishRaw,
	)

	// Test publishing to multiple per-issue topics
	issueIDs := []int64{100, 101, 102, 103, 104}

	for _, issueID := range issueIDs {
		topic := hmmm.TopicForIssue(issueID)

		// Decimal text of the issue ID. A string(rune(issueID)) conversion
		// would instead produce the character at that code point
		// (e.g. 100 -> "d"), which is not a usable identifier.
		idText := strconv.FormatInt(issueID, 10)

		testMessage := map[string]interface{}{
			"version":   1,
			"type":      "meta_msg",
			"issue_id":  issueID,
			"thread_id": "test-thread",
			"msg_id":    "test-msg-" + idText,
			"node_id":   node.ID().String(),
			"hop_count": 0,
			"timestamp": time.Now().UTC(),
			"message":   "Test message for issue " + idText,
		}

		payload, err := json.Marshal(testMessage)
		if err != nil {
			t.Fatalf("Failed to marshal test message: %v", err)
		}

		if err := adapter.Publish(ctx, topic, payload); err != nil {
			t.Fatalf("Failed to publish to topic %s: %v", topic, err)
		}
	}

	// Verify all topics were joined
	metrics := adapter.GetMetrics()
	if metrics.JoinCount != int64(len(issueIDs)) {
		t.Errorf("Expected join count %d, got %d", len(issueIDs), metrics.JoinCount)
	}
	if metrics.PublishCount != int64(len(issueIDs)) {
		t.Errorf("Expected publish count %d, got %d", len(issueIDs), metrics.PublishCount)
	}

	joinedTopics := adapter.GetJoinedTopics()
	if len(joinedTopics) != len(issueIDs) {
		t.Errorf("Expected %d joined topics, got %d", len(issueIDs), len(joinedTopics))
	}

	// Verify all expected topics are present
	expectedTopics := make(map[string]bool, len(issueIDs))
	for _, issueID := range issueIDs {
		expectedTopics[hmmm.TopicForIssue(issueID)] = true
	}

	for _, topic := range joinedTopics {
		if !expectedTopics[topic] {
			t.Errorf("Unexpected topic joined: %s", topic)
		}
	}
}

// TestConcurrentPerIssuePublishing publishes from 20 goroutines spread over
// 5 per-issue topics and checks that every publish was counted, that only
// one join was recorded per distinct topic, and that no errors were counted.
func TestConcurrentPerIssuePublishing(t *testing.T) {
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Create P2P node
	node, err := p2p.NewNode(ctx)
	if err != nil {
		t.Fatalf("Failed to create P2P node: %v", err)
	}
	defer node.Close()

	// Create PubSub system
	ps, err := pubsub.NewPubSub(ctx, node.Host(), "bzzz/test/coordination", "hmmm/test/meta-discussion")
	if err != nil {
		t.Fatalf("Failed to create PubSub: %v", err)
	}
	defer ps.Close()

	// Create adapter
	adapter := NewAdapter(
		ps.JoinDynamicTopic,
		ps.PublishRaw,
	)

	// Test concurrent publishing
	const numGoroutines = 20
	const numIssues = 5

	var wg sync.WaitGroup
	wg.Add(numGoroutines)

	for i := 0; i < numGoroutines; i++ {
		go func(id int) {
			defer wg.Done()

			issueID := int64(200 + (id % numIssues)) // Distribute across 5 issues
			topic := hmmm.TopicForIssue(issueID)

			testMessage := map[string]interface{}{
				"version":   1,
				"type":      "meta_msg",
				"issue_id":  issueID,
				"thread_id": "concurrent-test",
				// Decimal goroutine index; string(rune(id)) would yield a
				// control character for these small values.
				"msg_id":    strconv.Itoa(id),
				"node_id":   node.ID().String(),
				"hop_count": 0,
				"timestamp": time.Now().UTC(),
				"message":   "Concurrent test message",
			}

			payload, err := json.Marshal(testMessage)
			if err != nil {
				// Errorf (not Fatalf): FailNow must only be called from the
				// goroutine running the test function.
				t.Errorf("Failed to marshal message in goroutine %d: %v", id, err)
				return
			}

			if err := adapter.Publish(ctx, topic, payload); err != nil {
				t.Errorf("Failed to publish in goroutine %d: %v", id, err)
			}
		}(i)
	}

	wg.Wait()

	// Verify results
	metrics := adapter.GetMetrics()
	if metrics.PublishCount != numGoroutines {
		t.Errorf("Expected publish count %d, got %d", numGoroutines, metrics.PublishCount)
	}
	if metrics.JoinCount != numIssues {
		t.Errorf("Expected join count %d, got %d", numIssues, metrics.JoinCount)
	}
	if metrics.ErrorCount != 0 {
		t.Errorf("Expected error count 0, got %d", metrics.ErrorCount)
	}

	joinedTopics := adapter.GetJoinedTopics()
	if len(joinedTopics) != numIssues {
		t.Errorf("Expected %d unique topics joined, got %d", numIssues, len(joinedTopics))
	}
}

// TestAdapterValidation checks that Publish rejects an empty topic, an empty
// payload, and a payload larger than the configured MaxPayloadSize, and that
// each rejection is counted as an error while the publish count stays at 0.
func TestAdapterValidation(t *testing.T) {
	// Bounded context, matching the other integration tests in this file.
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	// Create P2P node
	node, err := p2p.NewNode(ctx)
	if err != nil {
		t.Fatalf("Failed to create P2P node: %v", err)
	}
	defer node.Close()

	// Create PubSub system
	ps, err := pubsub.NewPubSub(ctx, node.Host(), "bzzz/test/coordination", "hmmm/test/meta-discussion")
	if err != nil {
		t.Fatalf("Failed to create PubSub: %v", err)
	}
	defer ps.Close()

	// Create adapter with small payload limit for testing
	config := DefaultAdapterConfig()
	config.MaxPayloadSize = 100 // Small limit

	adapter := NewAdapterWithConfig(
		ps.JoinDynamicTopic,
		ps.PublishRaw,
		config,
	)

	// Test empty topic
	if err := adapter.Publish(ctx, "", []byte(`{"test": true}`)); err == nil {
		t.Error("Expected error for empty topic")
	}

	// Test empty payload
	if err := adapter.Publish(ctx, "test-topic", []byte{}); err == nil {
		t.Error("Expected error for empty payload")
	}

	// Test payload too large
	largePayload := make([]byte, 200) // Larger than limit
	if err := adapter.Publish(ctx, "test-topic", largePayload); err == nil {
		t.Error("Expected error for payload too large")
	}

	// Verify all errors were tracked
	metrics := adapter.GetMetrics()
	if metrics.ErrorCount != 3 {
		t.Errorf("Expected error count 3, got %d", metrics.ErrorCount)
	}
	if metrics.PublishCount != 0 {
		t.Errorf("Expected publish count 0, got %d", metrics.PublishCount)
	}
}