 9bdcbe0447
			
		
	
	9bdcbe0447
	
	
	
		
			
			Major integrations and fixes: - Added BACKBEAT SDK integration for P2P operation timing - Implemented beat-aware status tracking for distributed operations - Added Docker secrets support for secure license management - Resolved KACHING license validation via HTTPS/TLS - Updated docker-compose configuration for clean stack deployment - Disabled rollback policies to prevent deployment failures - Added license credential storage (CHORUS-DEV-MULTI-001) Technical improvements: - BACKBEAT P2P operation tracking with phase management - Enhanced configuration system with file-based secrets - Improved error handling for license validation - Clean separation of KACHING and CHORUS deployment stacks 🤖 Generated with [Claude Code](https://claude.ai/code) Co-Authored-By: Claude <noreply@anthropic.com>
		
			
				
	
	
		
			236 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			236 lines
		
	
	
		
			6.7 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package hmmm_adapter
 | |
| 
 | |
| import (
 | |
|     "context"
 | |
|     "fmt"
 | |
|     "sync"
 | |
|     "time"
 | |
| )
 | |
| 
 | |
// Joiner is the hook used to join a pub/sub topic so that the topic is
// available before anything is published to it. A non-nil error reports
// that the join did not succeed.
type Joiner func(topic string) error
 | |
| 
 | |
// Publisher is the hook used to send a raw JSON payload to a topic.
// A non-nil error reports that the payload was not published.
type Publisher func(topic string, payload []byte) error
 | |
| 
 | |
| // Adapter bridges CHORUS pub/sub to a RawPublisher-compatible interface.
 | |
| // It does not impose any message envelope so HMMM can publish raw JSON frames.
 | |
| // The adapter provides additional features like topic caching, metrics, and validation.
 | |
| type Adapter struct {
 | |
|     join    Joiner
 | |
|     publish Publisher
 | |
|     
 | |
|     // Topic join cache to avoid redundant joins
 | |
|     joinedTopics    map[string]bool
 | |
|     joinedTopicsMu  sync.RWMutex
 | |
|     
 | |
|     // Metrics tracking
 | |
|     publishCount    int64
 | |
|     joinCount       int64
 | |
|     errorCount      int64
 | |
|     metricsLock     sync.RWMutex
 | |
|     
 | |
|     // Configuration
 | |
|     maxPayloadSize  int
 | |
|     joinTimeout     time.Duration
 | |
|     publishTimeout  time.Duration
 | |
| }
 | |
| 
 | |
// AdapterConfig carries the tunable limits of an Adapter.
type AdapterConfig struct {
	// MaxPayloadSize is the largest payload, in bytes, that Publish accepts.
	MaxPayloadSize int `yaml:"max_payload_size"`
	// JoinTimeout bounds how long Publish waits for a topic join.
	JoinTimeout time.Duration `yaml:"join_timeout"`
	// PublishTimeout bounds how long Publish waits for the publish hook.
	PublishTimeout time.Duration `yaml:"publish_timeout"`
}
 | |
| 
 | |
| // DefaultAdapterConfig returns sensible defaults for the adapter
 | |
| func DefaultAdapterConfig() AdapterConfig {
 | |
|     return AdapterConfig{
 | |
|         MaxPayloadSize:  1024 * 1024, // 1MB max payload
 | |
|         JoinTimeout:     30 * time.Second,
 | |
|         PublishTimeout:  10 * time.Second,
 | |
|     }
 | |
| }
 | |
| 
 | |
| // NewAdapter constructs a new adapter with explicit join/publish hooks.
 | |
| // Wire these to CHORUS pubsub methods, e.g., JoinDynamicTopic and a thin PublishRaw helper.
 | |
| func NewAdapter(join Joiner, publish Publisher) *Adapter {
 | |
|     return NewAdapterWithConfig(join, publish, DefaultAdapterConfig())
 | |
| }
 | |
| 
 | |
| // NewAdapterWithConfig constructs a new adapter with custom configuration.
 | |
| func NewAdapterWithConfig(join Joiner, publish Publisher, config AdapterConfig) *Adapter {
 | |
|     return &Adapter{
 | |
|         join:            join,
 | |
|         publish:         publish,
 | |
|         joinedTopics:    make(map[string]bool),
 | |
|         maxPayloadSize:  config.MaxPayloadSize,
 | |
|         joinTimeout:     config.JoinTimeout,
 | |
|         publishTimeout:  config.PublishTimeout,
 | |
|     }
 | |
| }
 | |
| 
 | |
| // Publish ensures the topic is joined before sending a raw payload.
 | |
| // Includes validation, caching, metrics, and timeout handling.
 | |
| func (a *Adapter) Publish(ctx context.Context, topic string, payload []byte) error {
 | |
|     // Input validation
 | |
|     if topic == "" {
 | |
|         a.incrementErrorCount()
 | |
|         return fmt.Errorf("topic cannot be empty")
 | |
|     }
 | |
|     if len(payload) == 0 {
 | |
|         a.incrementErrorCount()
 | |
|         return fmt.Errorf("payload cannot be empty")
 | |
|     }
 | |
|     if len(payload) > a.maxPayloadSize {
 | |
|         a.incrementErrorCount()
 | |
|         return fmt.Errorf("payload size %d exceeds maximum %d bytes", len(payload), a.maxPayloadSize)
 | |
|     }
 | |
|     
 | |
|     // Check if we need to join the topic (with caching)
 | |
|     if !a.isTopicJoined(topic) {
 | |
|         joinCtx, cancel := context.WithTimeout(ctx, a.joinTimeout)
 | |
|         defer cancel()
 | |
|         
 | |
|         if err := a.joinTopic(joinCtx, topic); err != nil {
 | |
|             a.incrementErrorCount()
 | |
|             return fmt.Errorf("failed to join topic %s: %w", topic, err)
 | |
|         }
 | |
|     }
 | |
|     
 | |
|     // Publish with timeout
 | |
|     publishCtx, cancel := context.WithTimeout(ctx, a.publishTimeout)
 | |
|     defer cancel()
 | |
|     
 | |
|     done := make(chan error, 1)
 | |
|     go func() {
 | |
|         done <- a.publish(topic, payload)
 | |
|     }()
 | |
|     
 | |
|     select {
 | |
|     case err := <-done:
 | |
|         if err != nil {
 | |
|             a.incrementErrorCount()
 | |
|             return fmt.Errorf("failed to publish to topic %s: %w", topic, err)
 | |
|         }
 | |
|         a.incrementPublishCount()
 | |
|         return nil
 | |
|     case <-publishCtx.Done():
 | |
|         a.incrementErrorCount()
 | |
|         return fmt.Errorf("publish to topic %s timed out after %v", topic, a.publishTimeout)
 | |
|     }
 | |
| }
 | |
| 
 | |
| // isTopicJoined checks if a topic has already been joined (with caching)
 | |
| func (a *Adapter) isTopicJoined(topic string) bool {
 | |
|     a.joinedTopicsMu.RLock()
 | |
|     defer a.joinedTopicsMu.RUnlock()
 | |
|     return a.joinedTopics[topic]
 | |
| }
 | |
| 
 | |
| // joinTopic joins a topic and updates the cache
 | |
| func (a *Adapter) joinTopic(ctx context.Context, topic string) error {
 | |
|     // Double-check locking pattern to avoid redundant joins
 | |
|     if a.isTopicJoined(topic) {
 | |
|         return nil
 | |
|     }
 | |
|     
 | |
|     a.joinedTopicsMu.Lock()
 | |
|     defer a.joinedTopicsMu.Unlock()
 | |
|     
 | |
|     // Check again after acquiring write lock
 | |
|     if a.joinedTopics[topic] {
 | |
|         return nil
 | |
|     }
 | |
|     
 | |
|     // Execute join with context
 | |
|     done := make(chan error, 1)
 | |
|     go func() {
 | |
|         done <- a.join(topic)
 | |
|     }()
 | |
|     
 | |
|     select {
 | |
|     case err := <-done:
 | |
|         if err == nil {
 | |
|             a.joinedTopics[topic] = true
 | |
|             a.incrementJoinCount()
 | |
|         }
 | |
|         return err
 | |
|     case <-ctx.Done():
 | |
|         return ctx.Err()
 | |
|     }
 | |
| }
 | |
| 
 | |
| // GetMetrics returns current adapter metrics
 | |
| func (a *Adapter) GetMetrics() AdapterMetrics {
 | |
|     a.metricsLock.RLock()
 | |
|     defer a.metricsLock.RUnlock()
 | |
|     
 | |
|     return AdapterMetrics{
 | |
|         PublishCount: a.publishCount,
 | |
|         JoinCount:    a.joinCount,
 | |
|         ErrorCount:   a.errorCount,
 | |
|         JoinedTopics: len(a.joinedTopics),
 | |
|     }
 | |
| }
 | |
| 
 | |
// AdapterMetrics is a point-in-time snapshot of an adapter's counters.
type AdapterMetrics struct {
	// PublishCount is the number of successful publishes.
	PublishCount int64 `json:"publish_count"`
	// JoinCount is the number of successful topic joins.
	JoinCount int64 `json:"join_count"`
	// ErrorCount is the number of failed Publish calls.
	ErrorCount int64 `json:"error_count"`
	// JoinedTopics is the number of topics in the join cache.
	JoinedTopics int `json:"joined_topics"`
}
 | |
| 
 | |
| // ResetMetrics resets all metrics counters (useful for testing)
 | |
| func (a *Adapter) ResetMetrics() {
 | |
|     a.metricsLock.Lock()
 | |
|     defer a.metricsLock.Unlock()
 | |
|     
 | |
|     a.publishCount = 0
 | |
|     a.joinCount = 0
 | |
|     a.errorCount = 0
 | |
| }
 | |
| 
 | |
| // ClearTopicCache clears the joined topics cache (useful for testing or reconnections)
 | |
| func (a *Adapter) ClearTopicCache() {
 | |
|     a.joinedTopicsMu.Lock()
 | |
|     defer a.joinedTopicsMu.Unlock()
 | |
|     
 | |
|     a.joinedTopics = make(map[string]bool)
 | |
| }
 | |
| 
 | |
| // GetJoinedTopics returns a list of currently joined topics
 | |
| func (a *Adapter) GetJoinedTopics() []string {
 | |
|     a.joinedTopicsMu.RLock()
 | |
|     defer a.joinedTopicsMu.RUnlock()
 | |
|     
 | |
|     topics := make([]string, 0, len(a.joinedTopics))
 | |
|     for topic := range a.joinedTopics {
 | |
|         topics = append(topics, topic)
 | |
|     }
 | |
|     return topics
 | |
| }
 | |
| 
 | |
| // incrementPublishCount safely increments the publish counter
 | |
| func (a *Adapter) incrementPublishCount() {
 | |
|     a.metricsLock.Lock()
 | |
|     a.publishCount++
 | |
|     a.metricsLock.Unlock()
 | |
| }
 | |
| 
 | |
| // incrementJoinCount safely increments the join counter
 | |
| func (a *Adapter) incrementJoinCount() {
 | |
|     a.metricsLock.Lock()
 | |
|     a.joinCount++
 | |
|     a.metricsLock.Unlock()
 | |
| }
 | |
| 
 | |
| // incrementErrorCount safely increments the error counter
 | |
| func (a *Adapter) incrementErrorCount() {
 | |
|     a.metricsLock.Lock()
 | |
|     a.errorCount++
 | |
|     a.metricsLock.Unlock()
 | |
| }
 | |
| 
 |