package hmmm_adapter import ( "context" "fmt" "sync" "time" ) // Joiner joins a pub/sub topic (ensure availability before publish). type Joiner func(topic string) error // Publisher publishes a raw JSON payload to a topic. type Publisher func(topic string, payload []byte) error // Adapter bridges CHORUS pub/sub to a RawPublisher-compatible interface. // It does not impose any message envelope so HMMM can publish raw JSON frames. // The adapter provides additional features like topic caching, metrics, and validation. type Adapter struct { join Joiner publish Publisher // Topic join cache to avoid redundant joins joinedTopics map[string]bool joinedTopicsMu sync.RWMutex // Metrics tracking publishCount int64 joinCount int64 errorCount int64 metricsLock sync.RWMutex // Configuration maxPayloadSize int joinTimeout time.Duration publishTimeout time.Duration } // AdapterConfig holds configuration options for the Adapter type AdapterConfig struct { MaxPayloadSize int `yaml:"max_payload_size"` JoinTimeout time.Duration `yaml:"join_timeout"` PublishTimeout time.Duration `yaml:"publish_timeout"` } // DefaultAdapterConfig returns sensible defaults for the adapter func DefaultAdapterConfig() AdapterConfig { return AdapterConfig{ MaxPayloadSize: 1024 * 1024, // 1MB max payload JoinTimeout: 30 * time.Second, PublishTimeout: 10 * time.Second, } } // NewAdapter constructs a new adapter with explicit join/publish hooks. // Wire these to CHORUS pubsub methods, e.g., JoinDynamicTopic and a thin PublishRaw helper. func NewAdapter(join Joiner, publish Publisher) *Adapter { return NewAdapterWithConfig(join, publish, DefaultAdapterConfig()) } // NewAdapterWithConfig constructs a new adapter with custom configuration. func NewAdapterWithConfig(join Joiner, publish Publisher, config AdapterConfig) *Adapter { return &Adapter{ join: join, publish: publish, joinedTopics: make(map[string]bool), maxPayloadSize: config.MaxPayloadSize, joinTimeout: config.JoinTimeout, publishTimeout: config.PublishTimeout, } } // Publish ensures the topic is joined before sending a raw payload. // Includes validation, caching, metrics, and timeout handling. func (a *Adapter) Publish(ctx context.Context, topic string, payload []byte) error { // Input validation if topic == "" { a.incrementErrorCount() return fmt.Errorf("topic cannot be empty") } if len(payload) == 0 { a.incrementErrorCount() return fmt.Errorf("payload cannot be empty") } if len(payload) > a.maxPayloadSize { a.incrementErrorCount() return fmt.Errorf("payload size %d exceeds maximum %d bytes", len(payload), a.maxPayloadSize) } // Check if we need to join the topic (with caching) if !a.isTopicJoined(topic) { joinCtx, cancel := context.WithTimeout(ctx, a.joinTimeout) defer cancel() if err := a.joinTopic(joinCtx, topic); err != nil { a.incrementErrorCount() return fmt.Errorf("failed to join topic %s: %w", topic, err) } } // Publish with timeout publishCtx, cancel := context.WithTimeout(ctx, a.publishTimeout) defer cancel() done := make(chan error, 1) go func() { done <- a.publish(topic, payload) }() select { case err := <-done: if err != nil { a.incrementErrorCount() return fmt.Errorf("failed to publish to topic %s: %w", topic, err) } a.incrementPublishCount() return nil case <-publishCtx.Done(): a.incrementErrorCount() return fmt.Errorf("publish to topic %s timed out after %v", topic, a.publishTimeout) } } // isTopicJoined checks if a topic has already been joined (with caching) func (a *Adapter) isTopicJoined(topic string) bool { a.joinedTopicsMu.RLock() defer a.joinedTopicsMu.RUnlock() return a.joinedTopics[topic] } // joinTopic joins a topic and updates the cache func (a *Adapter) joinTopic(ctx context.Context, topic string) error { // Double-check locking pattern to avoid redundant joins if a.isTopicJoined(topic) { return nil } a.joinedTopicsMu.Lock() defer a.joinedTopicsMu.Unlock() // Check again after acquiring write lock if a.joinedTopics[topic] { return nil } // Execute join with context done := make(chan error, 1) go func() { done <- a.join(topic) }() select { case err := <-done: if err == nil { a.joinedTopics[topic] = true a.incrementJoinCount() } return err case <-ctx.Done(): return ctx.Err() } } // GetMetrics returns current adapter metrics func (a *Adapter) GetMetrics() AdapterMetrics { a.metricsLock.RLock() defer a.metricsLock.RUnlock() return AdapterMetrics{ PublishCount: a.publishCount, JoinCount: a.joinCount, ErrorCount: a.errorCount, JoinedTopics: len(a.joinedTopics), } } // AdapterMetrics holds metrics data for the adapter type AdapterMetrics struct { PublishCount int64 `json:"publish_count"` JoinCount int64 `json:"join_count"` ErrorCount int64 `json:"error_count"` JoinedTopics int `json:"joined_topics"` } // ResetMetrics resets all metrics counters (useful for testing) func (a *Adapter) ResetMetrics() { a.metricsLock.Lock() defer a.metricsLock.Unlock() a.publishCount = 0 a.joinCount = 0 a.errorCount = 0 } // ClearTopicCache clears the joined topics cache (useful for testing or reconnections) func (a *Adapter) ClearTopicCache() { a.joinedTopicsMu.Lock() defer a.joinedTopicsMu.Unlock() a.joinedTopics = make(map[string]bool) } // GetJoinedTopics returns a list of currently joined topics func (a *Adapter) GetJoinedTopics() []string { a.joinedTopicsMu.RLock() defer a.joinedTopicsMu.RUnlock() topics := make([]string, 0, len(a.joinedTopics)) for topic := range a.joinedTopics { topics = append(topics, topic) } return topics } // incrementPublishCount safely increments the publish counter func (a *Adapter) incrementPublishCount() { a.metricsLock.Lock() a.publishCount++ a.metricsLock.Unlock() } // incrementJoinCount safely increments the join counter func (a *Adapter) incrementJoinCount() { a.metricsLock.Lock() a.joinCount++ a.metricsLock.Unlock() } // incrementErrorCount safely increments the error counter func (a *Adapter) incrementErrorCount() { a.metricsLock.Lock() a.errorCount++ a.metricsLock.Unlock() }