package integration

import (
	"context"
	"fmt"
	"math"
	"regexp"
	"strings"
	"sync"
	"time"

	"chorus.services/bzzz/pkg/config"
	"chorus.services/bzzz/pkg/ucxl"
	"chorus.services/bzzz/pubsub"
	// NOTE(review): peer is not referenced anywhere in the visible part of
	// this file; if that holds for the whole file the import must be dropped
	// for the package to compile.
	"github.com/libp2p/go-libp2p/core/peer"
)

const (
	// minEventTypeConfidence is the lowest keyword score for which
	// determineEventType still reports an event type.
	minEventTypeConfidence = 0.3

	// maxIdentifierLength caps the length of a normalized UCXL component.
	maxIdentifierLength = 50

	// dhtHashLogPrefixLen is how many leading characters of a DHT hash are
	// shown in log output.
	dhtHashLogPrefixLen = 16

	// temporaryDecisionTTL is the lifetime given to warning and announcement
	// decisions.
	temporaryDecisionTTL = 30 * 24 * time.Hour
)

// Compiled once at package scope; normalizeIdentifier runs several times for
// every generated UCXL address.
var (
	invalidIdentifierChars = regexp.MustCompile(`[^a-zA-Z0-9_\-]`)
	validIdentifierStart   = regexp.MustCompile(`^[a-zA-Z_]`)
)

// SlurpEventIntegrator manages the integration between HMMM discussions and SLURP events
type SlurpEventIntegrator struct {
	config            config.SlurpConfig
	client            *SlurpClient
	pubsub            *pubsub.PubSub
	eventMapping      config.HmmmToSlurpMapping
	decisionPublisher *DecisionPublisher

	// Batch processing. eventBatch and batchTimer are guarded by batchMutex.
	eventBatch []SlurpEvent
	batchMutex sync.Mutex
	batchTimer *time.Timer

	// Context and lifecycle
	ctx    context.Context
	cancel context.CancelFunc

	// Statistics. stats is guarded by statsMutex.
	stats      SlurpIntegrationStats
	statsMutex sync.RWMutex
}

// SlurpIntegrationStats tracks integration performance metrics
type SlurpIntegrationStats struct {
	EventsGenerated     int64     `json:"events_generated"`
	EventsSuccessful    int64     `json:"events_successful"`
	EventsFailed        int64     `json:"events_failed"`
	BatchesSent         int64     `json:"batches_sent"`
	LastEventTime       time.Time `json:"last_event_time"`
	LastSuccessTime     time.Time `json:"last_success_time"`
	LastFailureTime     time.Time `json:"last_failure_time"`
	LastFailureError    string    `json:"last_failure_error"`
	AverageResponseTime float64   `json:"average_response_time_ms"`
}

// HmmmDiscussionContext represents a HMMM discussion that can generate SLURP events
type HmmmDiscussionContext struct {
	DiscussionID      string                 `json:"discussion_id"`
	SessionID         string                 `json:"session_id,omitempty"`
	Participants      []string               `json:"participants"`
	StartTime         time.Time              `json:"start_time"`
	EndTime           time.Time              `json:"end_time"`
	Messages          []HmmmMessage          `json:"messages"`
	ConsensusReached  bool                   `json:"consensus_reached"`
	ConsensusStrength float64                `json:"consensus_strength"`
	OutcomeType       string                 `json:"outcome_type"`
	ProjectPath       string                 `json:"project_path"`
	RelatedTasks      []string               `json:"related_tasks,omitempty"`
	Metadata          map[string]interface{} `json:"metadata,omitempty"`
}

// HmmmMessage represents a message in a HMMM discussion
type HmmmMessage struct {
	From      string                 `json:"from"`
	Content   string                 `json:"content"`
	Type      string                 `json:"type"`
	Timestamp time.Time              `json:"timestamp"`
	Metadata  map[string]interface{} `json:"metadata,omitempty"`
}

// NewSlurpEventIntegrator creates a new SLURP event integrator.
//
// It returns an error when slurpConfig.Enabled is false or when the freshly
// created client fails ValidateConnection. The integrator derives its own
// cancellable context from ctx; Close cancels it.
func NewSlurpEventIntegrator(ctx context.Context, slurpConfig config.SlurpConfig, ps *pubsub.PubSub, decisionPublisher *DecisionPublisher) (*SlurpEventIntegrator, error) {
	if !slurpConfig.Enabled {
		return nil, fmt.Errorf("SLURP integration is disabled in configuration")
	}

	client := NewSlurpClient(slurpConfig)

	// Test connection to SLURP
	if err := client.ValidateConnection(ctx); err != nil {
		// Best effort: the client is being discarded, so its own close error
		// would only mask the connection failure reported below.
		_ = client.Close()
		return nil, fmt.Errorf("failed to connect to SLURP: %w", err)
	}

	// A negative capacity would make the make() call below panic.
	batchCap := slurpConfig.BatchProcessing.MaxBatchSize
	if batchCap < 0 {
		batchCap = 0
	}

	integrationCtx, cancel := context.WithCancel(ctx)

	integrator := &SlurpEventIntegrator{
		config:            slurpConfig,
		client:            client,
		pubsub:            ps,
		eventMapping:      config.GetHmmmToSlurpMapping(),
		decisionPublisher: decisionPublisher,
		eventBatch:        make([]SlurpEvent, 0, batchCap),
		ctx:               integrationCtx,
		cancel:            cancel,
		stats:             SlurpIntegrationStats{},
	}

	// Initialize batch processing if enabled
	if slurpConfig.BatchProcessing.Enabled {
		integrator.initBatchProcessing()
	}

	fmt.Printf("🎯 SLURP Event Integrator initialized for %s\n", slurpConfig.BaseURL)
	return integrator, nil
}

// ProcessHmmmDiscussion analyzes a HMMM discussion and generates appropriate SLURP events.
//
// A discussion that fails shouldGenerateEvent, or for which no event type can
// be determined, is skipped and nil is returned. Otherwise the event is either
// queued (batch mode) or sent right away, and the error of that step is
// returned. Failure to build a UCXL address or to publish the decision is
// logged and does not abort event creation.
func (s *SlurpEventIntegrator) ProcessHmmmDiscussion(ctx context.Context, discussion HmmmDiscussionContext) error {
	// NOTE(review): EventsGenerated counts every processed discussion,
	// including the ones rejected just below.
	s.statsMutex.Lock()
	s.stats.EventsGenerated++
	s.stats.LastEventTime = time.Now()
	s.statsMutex.Unlock()

	// Validate discussion meets generation criteria
	if !s.shouldGenerateEvent(discussion) {
		fmt.Printf("📊 Discussion %s does not meet event generation criteria\n", discussion.DiscussionID)
		return nil
	}

	// Determine event type from discussion
	eventType, confidence := s.determineEventType(discussion)
	if eventType == "" {
		fmt.Printf("📊 Could not determine event type for discussion %s\n", discussion.DiscussionID)
		return nil
	}

	severity := s.calculateSeverity(discussion, eventType)
	content := s.generateEventContent(discussion)

	// Generate UCXL address for this discussion; the event is still created
	// without UCXL enrichment when this fails.
	ucxlAddr, err := s.generateUCXLAddress(discussion)
	if err != nil {
		fmt.Printf("⚠️ Failed to generate UCXL address: %v\n", err)
		ucxlAddr = nil
	}

	// Copy the default tags: appending directly to the configured slice could
	// write into its spare capacity, which every other call shares.
	defaultTags := s.config.DefaultEventSettings.DefaultTags
	tags := make([]string, 0, len(defaultTags)+1)
	tags = append(tags, defaultTags...)
	tags = append(tags, fmt.Sprintf("confidence-%.2f", confidence))

	slurpEvent := SlurpEvent{
		EventType: eventType,
		Path:      discussion.ProjectPath,
		Content:   content,
		Severity:  severity,
		CreatedBy: s.config.DefaultEventSettings.DefaultCreatedBy,
		Timestamp: time.Now(),
		Tags:      tags,
		Metadata: map[string]interface{}{
			"discussion_id":         discussion.DiscussionID,
			"session_id":            discussion.SessionID,
			"participants":          discussion.Participants,
			"consensus_strength":    discussion.ConsensusStrength,
			"discussion_duration":   discussion.EndTime.Sub(discussion.StartTime).String(),
			"message_count":         len(discussion.Messages),
			"outcome_type":          discussion.OutcomeType,
			"generation_confidence": confidence,
		},
	}

	// Add UCXL address components if successfully generated
	if ucxlAddr != nil {
		slurpEvent.Metadata["ucxl_reference"] = ucxlAddr.String()
		slurpEvent.Metadata["ucxl_agent"] = ucxlAddr.Agent
		slurpEvent.Metadata["ucxl_role"] = ucxlAddr.Role
		slurpEvent.Metadata["ucxl_project"] = ucxlAddr.Project
		slurpEvent.Metadata["ucxl_task"] = ucxlAddr.Task
		slurpEvent.Metadata["ucxl_temporal"] = ucxlAddr.TemporalSegment.String()
		if ucxlAddr.Path != "" {
			slurpEvent.Metadata["ucxl_path"] = ucxlAddr.Path
		}
	}

	// Later sources win on key collisions: template values override the
	// built-in keys, discussion metadata overrides both.
	for key, value := range s.config.DefaultEventSettings.MetadataTemplate {
		slurpEvent.Metadata[key] = value
	}
	for key, value := range discussion.Metadata {
		slurpEvent.Metadata[key] = value
	}

	s.publishDecision(ctx, ucxlAddr, discussion, eventType, confidence, slurpEvent.Metadata)

	// Send event (batch or immediate)
	if s.config.BatchProcessing.Enabled {
		return s.addToBatch(slurpEvent)
	}
	return s.sendImmediateEvent(ctx, slurpEvent, discussion.DiscussionID)
}

// publishDecision publishes the discussion outcome through the decision
// publisher and, on success, records the DHT reference in metadata. It does
// nothing when addr is nil, when no enabled publisher is configured, or when
// eventType is not one shouldPublishDecision accepts. Publication errors are
// logged only.
func (s *SlurpEventIntegrator) publishDecision(ctx context.Context, addr *ucxl.Address, discussion HmmmDiscussionContext, eventType string, confidence float64, metadata map[string]interface{}) {
	if addr == nil || s.decisionPublisher == nil || !s.decisionPublisher.IsEnabled() {
		return
	}
	if !s.shouldPublishDecision(eventType) {
		return
	}

	decision := s.createDecisionFromDiscussion(discussion, eventType, confidence)
	result, err := s.decisionPublisher.PublishDecision(ctx, addr, decision)
	if err != nil {
		fmt.Printf("⚠️ Failed to publish decision to DHT: %v\n", err)
		return
	}
	if !result.Success {
		return
	}

	// Add DHT reference to event metadata
	metadata["decision_dht_hash"] = result.DHTHash
	metadata["decision_published"] = true
	metadata["decision_published_at"] = result.PublishedAt

	// Shorten the hash for the log line without slicing past its end.
	hashPreview := result.DHTHash
	if len(hashPreview) > dhtHashLogPrefixLen {
		hashPreview = hashPreview[:dhtHashLogPrefixLen]
	}
	fmt.Printf("📤 Decision published to DHT: %s\n", hashPreview+"...")
}

// shouldGenerateEvent determines if a discussion meets the criteria for event generation.
//
// All of the following must hold: enough participants, consensus strength at
// or above the configured minimum, a duration within the configured
// [min, max] window, and full consensus (1.0) when unanimity is required.
func (s *SlurpEventIntegrator) shouldGenerateEvent(discussion HmmmDiscussionContext) bool {
	rules := s.config.EventGeneration
	duration := discussion.EndTime.Sub(discussion.StartTime)

	switch {
	case len(discussion.Participants) < rules.MinParticipants:
		return false
	case discussion.ConsensusStrength < rules.MinConsensusStrength:
		return false
	case duration < rules.MinDiscussionDuration:
		return false
	case duration > rules.MaxDiscussionDuration:
		// Too long, might indicate stalled discussion
		return false
	case rules.RequireUnanimity && discussion.ConsensusStrength < 1.0:
		return false
	}
	return true
}

// determineEventType analyzes discussion content to determine SLURP event type.
//
// Every candidate type is scored by the share of its keywords that occur in
// the lower-cased message text. The best score is returned together with its
// type; ("", 0) is returned when the best score is below
// minEventTypeConfidence or the winning type is disabled in configuration.
// When several types tie, the one listed first below wins, so the result is
// reproducible.
func (s *SlurpEventIntegrator) determineEventType(discussion HmmmDiscussionContext) (string, float64) {
	// Combine all message content for analysis
	var allContent strings.Builder
	for _, msg := range discussion.Messages {
		allContent.WriteString(strings.ToLower(msg.Content))
		allContent.WriteString(" ")
	}
	content := allContent.String()

	// An ordered slice rather than a map: map iteration order is randomized,
	// which made the winner among equal scores unpredictable.
	candidates := []struct {
		eventType string
		keywords  []string
	}{
		{"approval", s.eventMapping.ApprovalKeywords},
		{"warning", s.eventMapping.WarningKeywords},
		{"blocker", s.eventMapping.BlockerKeywords},
		{"priority_change", s.eventMapping.PriorityKeywords},
		{"access_update", s.eventMapping.AccessKeywords},
		{"structural_change", s.eventMapping.StructuralKeywords},
		{"announcement", s.eventMapping.AnnouncementKeywords},
	}

	// Find highest scoring event type
	var (
		bestType  string
		bestScore float64
	)
	for _, candidate := range candidates {
		if score := s.scoreKeywordMatch(content, candidate.keywords); score > bestScore {
			bestType, bestScore = candidate.eventType, score
		}
	}

	// Require minimum confidence threshold
	if bestScore < minEventTypeConfidence {
		return "", 0
	}

	// Check if event type is enabled
	if s.isEventTypeDisabled(bestType) {
		return "", 0
	}

	return bestType, bestScore
}

// scoreKeywordMatch returns the fraction of keywords (0 to 1) that occur in
// content. Keywords are lower-cased before matching; content is expected to
// be lower-cased by the caller. An empty keyword list scores 0.
func (s *SlurpEventIntegrator) scoreKeywordMatch(content string, keywords []string) float64 {
	if len(keywords) == 0 {
		return 0
	}

	matches := 0
	for _, keyword := range keywords {
		if strings.Contains(content, strings.ToLower(keyword)) {
			matches++
		}
	}
	return float64(matches) / float64(len(keywords))
}

// isEventTypeDisabled checks if an event type is disabled in configuration.
//
// The deny list is consulted first. If an allow list is configured, any type
// missing from it is treated as disabled as well.
func (s *SlurpEventIntegrator) isEventTypeDisabled(eventType string) bool {
	for _, disabled := range s.config.EventGeneration.DisabledEventTypes {
		if disabled == eventType {
			return true
		}
	}

	allowed := s.config.EventGeneration.EnabledEventTypes
	if len(allowed) == 0 {
		return false // No allow list configured
	}
	for _, enabled := range allowed {
		if enabled == eventType {
			return false
		}
	}
	return true // Not in enabled list
}

// calculateSeverity determines event severity based on discussion characteristics.
//
// It starts from the configured base severity for eventType (falling back to
// the default severity when that is zero or unset), adds a boost per
// participant beyond the first and per hour of discussion, adds the urgency
// boost once if any urgency keyword appears, then rounds and clamps the
// result to the configured [MinSeverity, MaxSeverity] range.
func (s *SlurpEventIntegrator) calculateSeverity(discussion HmmmDiscussionContext, eventType string) int {
	rules := s.config.EventGeneration.SeverityRules

	// Start with base severity for event type
	baseSeverity := rules.BaseSeverity[eventType]
	if baseSeverity == 0 {
		baseSeverity = s.config.DefaultEventSettings.DefaultSeverity
	}
	severity := float64(baseSeverity)

	// Apply participant multiplier
	severity += float64(len(discussion.Participants)-1) * rules.ParticipantMultiplier

	// Apply duration multiplier
	severity += discussion.EndTime.Sub(discussion.StartTime).Hours() * rules.DurationMultiplier

	// Check for urgency keywords in the generated summary and in what the
	// participants actually wrote; the summary alone only carries the outcome
	// type and a few numbers.
	var searchable strings.Builder
	searchable.WriteString(strings.ToLower(s.generateEventContent(discussion)))
	for _, msg := range discussion.Messages {
		searchable.WriteByte(' ')
		searchable.WriteString(strings.ToLower(msg.Content))
	}
	allContent := searchable.String()

	for _, keyword := range rules.UrgencyKeywords {
		if strings.Contains(allContent, strings.ToLower(keyword)) {
			severity += float64(rules.UrgencyBoost)
			break // Only apply once
		}
	}

	// Apply bounds
	finalSeverity := int(math.Round(severity))
	if finalSeverity < rules.MinSeverity {
		finalSeverity = rules.MinSeverity
	}
	if finalSeverity > rules.MaxSeverity {
		finalSeverity = rules.MaxSeverity
	}
	return finalSeverity
}

// generateEventContent creates human-readable content for the SLURP event.
//
// With an outcome type set, the text reports the outcome, participant count
// and agreement percentage; otherwise it reports the participant count and
// the discussion duration rounded to the minute.
func (s *SlurpEventIntegrator) generateEventContent(discussion HmmmDiscussionContext) string {
	if discussion.OutcomeType != "" {
		return fmt.Sprintf("HMMM discussion reached consensus: %s (%d participants, %.1f%% agreement)",
			discussion.OutcomeType,
			len(discussion.Participants),
			discussion.ConsensusStrength*100)
	}

	return fmt.Sprintf("HMMM discussion completed with %d participants over %v",
		len(discussion.Participants),
		discussion.EndTime.Sub(discussion.StartTime).Round(time.Minute))
}

// addToBatch adds an event to the batch for later processing.
//
// The batch is flushed immediately once it reaches MaxBatchSize, and the
// flush error is returned. Otherwise a timer is armed so the batch is flushed
// at most MaxBatchWait after its first event.
func (s *SlurpEventIntegrator) addToBatch(event SlurpEvent) error {
	s.batchMutex.Lock()
	defer s.batchMutex.Unlock()

	s.eventBatch = append(s.eventBatch, event)

	// Check if batch is full
	if len(s.eventBatch) >= s.config.BatchProcessing.MaxBatchSize {
		return s.flushBatch()
	}

	// Arm the timer only for the first event of a batch. Restarting it on
	// every event would let a steady trickle of events postpone the flush
	// indefinitely. flushBatch clears batchTimer again.
	if s.batchTimer == nil {
		s.batchTimer = time.AfterFunc(s.config.BatchProcessing.MaxBatchWait, func() {
			s.batchMutex.Lock()
			defer s.batchMutex.Unlock()
			if err := s.flushBatch(); err != nil {
				fmt.Printf("❌ Timed batch flush failed: %v\n", err)
			}
		})
	}

	fmt.Printf("📦 Added event to batch (%d/%d)\n", len(s.eventBatch), s.config.BatchProcessing.MaxBatchSize)
	return nil
}

// flushBatch sends all batched events to SLURP.
//
// Callers must hold batchMutex; the lock therefore stays held for the whole
// request. The batch is emptied before sending and is not re-queued when the
// request fails. Statistics are updated and a success or failure notification
// is published either way.
func (s *SlurpEventIntegrator) flushBatch() error {
	if len(s.eventBatch) == 0 {
		return nil
	}

	events := make([]SlurpEvent, len(s.eventBatch))
	copy(events, s.eventBatch)
	s.eventBatch = s.eventBatch[:0] // Clear batch

	if s.batchTimer != nil {
		s.batchTimer.Stop()
		s.batchTimer = nil
	}

	fmt.Printf("🚀 Flushing batch of %d events to SLURP\n", len(events))

	start := time.Now()
	resp, err := s.client.CreateEventsBatch(s.ctx, events)
	elapsed := time.Since(start)

	if err != nil {
		s.statsMutex.Lock()
		s.stats.BatchesSent++
		s.observeResponseTimeLocked(elapsed)
		s.stats.EventsFailed += int64(len(events))
		s.stats.LastFailureTime = time.Now()
		s.stats.LastFailureError = err.Error()
		s.statsMutex.Unlock()

		// Publish failure notification
		s.publishSlurpEvent("slurp_batch_failed", map[string]interface{}{
			"error":       err.Error(),
			"event_count": len(events),
			"batch_id":    fmt.Sprintf("batch_%d", time.Now().Unix()),
		})

		return fmt.Errorf("failed to send batch: %w", err)
	}

	s.statsMutex.Lock()
	s.stats.BatchesSent++
	s.observeResponseTimeLocked(elapsed)
	s.stats.EventsSuccessful += int64(resp.ProcessedCount)
	s.stats.EventsFailed += int64(resp.FailedCount)
	s.stats.LastSuccessTime = time.Now()
	s.statsMutex.Unlock()

	// Publish success notification
	s.publishSlurpEvent("slurp_batch_success", map[string]interface{}{
		"processed_count": resp.ProcessedCount,
		"failed_count":    resp.FailedCount,
		"event_ids":       resp.EventIDs,
		"batch_id":        fmt.Sprintf("batch_%d", time.Now().Unix()),
	})

	fmt.Printf("✅ Batch processed: %d succeeded, %d failed\n", resp.ProcessedCount, resp.FailedCount)
	return nil
}

// sendImmediateEvent sends a single event immediately to SLURP, updates the
// statistics and publishes a success or failure notification tagged with
// discussionID.
func (s *SlurpEventIntegrator) sendImmediateEvent(ctx context.Context, event SlurpEvent, discussionID string) error {
	start := time.Now()
	resp, err := s.client.CreateEvent(ctx, event)
	elapsed := time.Since(start)

	if err != nil {
		s.statsMutex.Lock()
		s.observeResponseTimeLocked(elapsed)
		s.stats.EventsFailed++
		s.stats.LastFailureTime = time.Now()
		s.stats.LastFailureError = err.Error()
		s.statsMutex.Unlock()

		// Publish failure notification
		s.publishSlurpEvent("slurp_event_failed", map[string]interface{}{
			"discussion_id": discussionID,
			"event_type":    event.EventType,
			"error":         err.Error(),
		})

		return fmt.Errorf("failed to send event: %w", err)
	}

	s.statsMutex.Lock()
	s.observeResponseTimeLocked(elapsed)
	s.stats.EventsSuccessful++
	s.stats.LastSuccessTime = time.Now()
	s.statsMutex.Unlock()

	// Publish success notification
	s.publishSlurpEvent("slurp_event_success", map[string]interface{}{
		"discussion_id": discussionID,
		"event_type":    event.EventType,
		"event_id":      resp.EventID,
		"severity":      event.Severity,
	})

	fmt.Printf("✅ SLURP event created: %s (ID: %s)\n", event.EventType, resp.EventID)
	return nil
}

// observeResponseTimeLocked folds one request latency (in milliseconds) into
// stats.AverageResponseTime. The value is a running blend that weights the
// newest sample at one half; the first sample is taken as-is instead of being
// averaged against the zero value. Callers must hold statsMutex.
func (s *SlurpEventIntegrator) observeResponseTimeLocked(elapsed time.Duration) {
	sampleMs := elapsed.Seconds() * 1000
	if s.stats.AverageResponseTime == 0 {
		s.stats.AverageResponseTime = sampleMs
		return
	}
	s.stats.AverageResponseTime = (s.stats.AverageResponseTime + sampleMs) / 2
}

// publishSlurpEvent publishes a SLURP integration event to the pubsub system.
//
// Success event types are sent as pubsub.SlurpEventGenerated, failure types
// as pubsub.SlurpEventAck and anything else as pubsub.SlurpContextUpdate.
// "timestamp" and "integration_source" keys are added to data before
// publishing. Publish errors are logged, not returned. Without a pubsub
// instance the call is a no-op.
func (s *SlurpEventIntegrator) publishSlurpEvent(eventType string, data map[string]interface{}) {
	if s.pubsub == nil {
		return
	}
	if data == nil {
		// Writing to a nil map panics.
		data = make(map[string]interface{}, 2)
	}

	var msgType pubsub.MessageType
	switch eventType {
	case "slurp_event_success", "slurp_batch_success":
		msgType = pubsub.SlurpEventGenerated
	case "slurp_event_failed", "slurp_batch_failed":
		msgType = pubsub.SlurpEventAck
	default:
		msgType = pubsub.SlurpContextUpdate
	}

	data["timestamp"] = time.Now()
	data["integration_source"] = "hmmm-slurp-integrator"

	if err := s.pubsub.PublishHmmmMessage(msgType, data); err != nil {
		fmt.Printf("❌ Failed to publish SLURP integration event: %v\n", err)
	}
}

// initBatchProcessing initializes batch processing components. At present it
// only logs the configured batch limits.
func (s *SlurpEventIntegrator) initBatchProcessing() {
	fmt.Printf("📦 Batch processing enabled: max_size=%d, max_wait=%v\n",
		s.config.BatchProcessing.MaxBatchSize,
		s.config.BatchProcessing.MaxBatchWait)
}

// GetStats returns a snapshot copy of the current integration statistics.
func (s *SlurpEventIntegrator) GetStats() SlurpIntegrationStats {
	s.statsMutex.RLock()
	defer s.statsMutex.RUnlock()
	return s.stats
}

// Close shuts down the integrator and flushes any pending events.
//
// Pending events are flushed only when batch processing and FlushOnShutdown
// are both enabled. The client's close error takes precedence in the return
// value; otherwise the error of the shutdown flush, if any, is returned.
func (s *SlurpEventIntegrator) Close() error {
	var flushErr error

	// Flush before cancelling: flushBatch sends with s.ctx, so cancelling
	// first would make the shutdown flush fail every time.
	s.batchMutex.Lock()
	if s.config.BatchProcessing.Enabled && s.config.BatchProcessing.FlushOnShutdown && len(s.eventBatch) > 0 {
		fmt.Printf("🧹 Flushing %d remaining events on shutdown\n", len(s.eventBatch))
		flushErr = s.flushBatch()
	}
	// batchTimer is guarded by batchMutex, so stop it while still locked.
	if s.batchTimer != nil {
		s.batchTimer.Stop()
		s.batchTimer = nil
	}
	s.batchMutex.Unlock()

	s.cancel()

	if err := s.client.Close(); err != nil {
		return err
	}
	return flushErr
}

// generateUCXLAddress creates a UCXL address from HMMM discussion context.
//
// The address has the form ucxl://agent:role@project:task/*^ followed by the
// project-relative path when there is one, and is validated with ucxl.Parse.
func (s *SlurpEventIntegrator) generateUCXLAddress(discussion HmmmDiscussionContext) (*ucxl.Address, error) {
	// Extract components from discussion
	agent := s.extractAgentFromParticipants(discussion.Participants)
	role := s.extractRoleFromDiscussion(discussion)
	project := s.extractProjectFromPath(discussion.ProjectPath)
	task := s.extractTaskFromDiscussion(discussion)

	// Use latest temporal segment by default
	const temporalSegment = "*^"

	// Build UCXL address string
	addressStr := fmt.Sprintf("ucxl://%s:%s@%s:%s/%s", agent, role, project, task, temporalSegment)

	// Add path if available; extractRelativePath returns "" for an empty or
	// single-segment project path.
	if relativePath := s.extractRelativePath(discussion.ProjectPath); relativePath != "" {
		addressStr += "/" + relativePath
	}

	// Parse and validate the address
	return ucxl.Parse(addressStr)
}

// extractAgentFromParticipants determines the primary agent from participants:
// "any" for none, the normalized name for a single participant, and
// "consensus" for several.
func (s *SlurpEventIntegrator) extractAgentFromParticipants(participants []string) string {
	switch len(participants) {
	case 0:
		return "any"
	case 1:
		return s.normalizeIdentifier(participants[0])
	default:
		return "consensus"
	}
}

// extractRoleFromDiscussion determines the role from discussion context.
//
// A string "primary_role" metadata entry wins; otherwise the role is derived
// from the outcome type, with "contributor" as the fallback.
func (s *SlurpEventIntegrator) extractRoleFromDiscussion(discussion HmmmDiscussionContext) string {
	// Look for role hints in metadata (indexing a nil map is safe)
	if roleStr, ok := discussion.Metadata["primary_role"].(string); ok {
		return s.normalizeIdentifier(roleStr)
	}

	// The outcome type does not depend on metadata, so it is checked even
	// when the discussion carries none.
	switch discussion.OutcomeType {
	case "architecture_decision":
		return "architect"
	case "security_review":
		return "security"
	case "code_review":
		return "developer"
	case "deployment_decision":
		return "ops"
	default:
		return "contributor"
	}
}

// extractProjectFromPath extracts project name from project path: the
// normalized first path segment, or "unknown" when there is none.
func (s *SlurpEventIntegrator) extractProjectFromPath(projectPath string) string {
	// Split path and take the first segment as project
	first := strings.Split(strings.Trim(projectPath, "/"), "/")[0]
	if first == "" {
		return "unknown"
	}
	return s.normalizeIdentifier(first)
}

// extractTaskFromDiscussion determines task from discussion context.
//
// Sources are tried in order: the first related task, a string "task_id"
// metadata entry, a string "feature" metadata entry, then the discussion ID
// prefixed with "discussion-". "general" is returned when none is available.
func (s *SlurpEventIntegrator) extractTaskFromDiscussion(discussion HmmmDiscussionContext) string {
	// First check for explicit task in related tasks
	if len(discussion.RelatedTasks) > 0 {
		return s.normalizeIdentifier(discussion.RelatedTasks[0])
	}

	// Check metadata for task information (indexing a nil map is safe)
	for _, key := range []string{"task_id", "feature"} {
		if value, ok := discussion.Metadata[key].(string); ok {
			return s.normalizeIdentifier(value)
		}
	}

	// Fall back to discussion ID as task identifier
	if discussion.DiscussionID != "" {
		return s.normalizeIdentifier("discussion-" + discussion.DiscussionID)
	}

	return "general"
}

// extractRelativePath extracts relative path from project path for UCXL:
// everything after the first path segment, or "" when nothing follows it.
func (s *SlurpEventIntegrator) extractRelativePath(projectPath string) string {
	// Remove surrounding slashes and split
	parts := strings.Split(strings.Trim(projectPath, "/"), "/")

	// If we have more than just the project name, join the rest as relative path
	if len(parts) > 1 {
		return strings.Join(parts[1:], "/")
	}
	return ""
}

// normalizeIdentifier normalizes identifiers for UCXL compliance.
//
// The input is lower-cased, every character outside [a-zA-Z0-9_-] becomes
// "_", an "id_" prefix is added unless the result starts with a letter or
// underscore, and the result is cut to maxIdentifierLength. An empty input
// yields "unknown".
func (s *SlurpEventIntegrator) normalizeIdentifier(identifier string) string {
	if identifier == "" {
		return "unknown"
	}

	// Convert to lowercase and replace invalid characters with underscores
	normalized := invalidIdentifierChars.ReplaceAllString(strings.ToLower(identifier), "_")

	// Ensure it doesn't start with a number or special character
	if !validIdentifierStart.MatchString(normalized) {
		normalized = "id_" + normalized
	}

	// Truncate if too long (UCXL components should be reasonable length)
	if len(normalized) > maxIdentifierLength {
		normalized = normalized[:maxIdentifierLength]
	}

	return normalized
}

// shouldPublishDecision determines if an event type warrants decision publication.
// Only conclusive outcomes qualify.
func (s *SlurpEventIntegrator) shouldPublishDecision(eventType string) bool {
	switch eventType {
	case "approval", "blocker", "structural_change", "priority_change", "access_update":
		return true
	default:
		return false
	}
}

// createDecisionFromDiscussion creates a Decision object from HMMM discussion context.
//
// Discussion metadata is copied into the decision metadata under keys
// prefixed with "discussion_". Warning and announcement decisions get an
// expiry of temporaryDecisionTTL from now.
func (s *SlurpEventIntegrator) createDecisionFromDiscussion(discussion HmmmDiscussionContext, eventType string, confidence float64) *Decision {
	// One clock reading so every timestamp on the decision agrees.
	now := time.Now()

	decision := &Decision{
		Type:           eventType,
		Content:        s.generateEventContent(discussion),
		Participants:   discussion.Participants,
		ConsensusLevel: discussion.ConsensusStrength,
		Timestamp:      now,
		DiscussionID:   discussion.DiscussionID,
		Confidence:     confidence,
		Tags:           []string{"hmmm-generated", "consensus-based", eventType},
		Metadata: map[string]interface{}{
			"session_id":           discussion.SessionID,
			"discussion_duration":  discussion.EndTime.Sub(discussion.StartTime).String(),
			"message_count":        len(discussion.Messages),
			"outcome_type":         discussion.OutcomeType,
			"project_path":         discussion.ProjectPath,
			"related_tasks":        discussion.RelatedTasks,
			"generation_source":    "slurp-event-integrator",
			"generation_timestamp": now,
		},
	}

	// Add discussion metadata to decision metadata
	for key, value := range discussion.Metadata {
		decision.Metadata["discussion_"+key] = value
	}

	// Set expiration for temporary decisions (warnings, announcements)
	if eventType == "warning" || eventType == "announcement" {
		expiration := now.Add(temporaryDecisionTTL)
		decision.ExpiresAt = &expiration
	}

	return decision
}