Prepare for v2 development: Add MCP integration and future development planning

- Add FUTURE_DEVELOPMENT.md with comprehensive v2 protocol specification
- Add MCP integration design and implementation foundation
- Add infrastructure and deployment configurations
- Update system architecture for v2 evolution

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
anthonyrawlins
2025-08-07 14:38:22 +10:00
parent 5f94288fbb
commit 065dddf8d5
41 changed files with 14970 additions and 161 deletions

View File

@@ -0,0 +1,327 @@
package integration
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"strings"
"time"
"github.com/anthonyrawlins/bzzz/pkg/config"
)
// SlurpClient handles HTTP communication with SLURP endpoints
type SlurpClient struct {
baseURL string
apiKey string
timeout time.Duration
retryCount int
retryDelay time.Duration
httpClient *http.Client
}
// SlurpEvent represents a SLURP event structure
type SlurpEvent struct {
EventType string `json:"event_type"`
Path string `json:"path"`
Content string `json:"content"`
Severity int `json:"severity"`
CreatedBy string `json:"created_by"`
Metadata map[string]interface{} `json:"metadata"`
Tags []string `json:"tags,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
// EventResponse represents the response from SLURP API
type EventResponse struct {
Success bool `json:"success"`
EventID string `json:"event_id,omitempty"`
Message string `json:"message,omitempty"`
Error string `json:"error,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
// BatchEventRequest represents a batch of events to be sent to SLURP
type BatchEventRequest struct {
Events []SlurpEvent `json:"events"`
Source string `json:"source"`
}
// BatchEventResponse represents the response for batch event creation
type BatchEventResponse struct {
Success bool `json:"success"`
ProcessedCount int `json:"processed_count"`
FailedCount int `json:"failed_count"`
EventIDs []string `json:"event_ids,omitempty"`
Errors []string `json:"errors,omitempty"`
Message string `json:"message,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
// HealthResponse represents SLURP service health status
type HealthResponse struct {
Status string `json:"status"`
Version string `json:"version,omitempty"`
Uptime string `json:"uptime,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
// NewSlurpClient creates a new SLURP API client
func NewSlurpClient(config config.SlurpConfig) *SlurpClient {
return &SlurpClient{
baseURL: strings.TrimSuffix(config.BaseURL, "/"),
apiKey: config.APIKey,
timeout: config.Timeout,
retryCount: config.RetryCount,
retryDelay: config.RetryDelay,
httpClient: &http.Client{
Timeout: config.Timeout,
},
}
}
// CreateEvent sends a single event to SLURP
func (c *SlurpClient) CreateEvent(ctx context.Context, event SlurpEvent) (*EventResponse, error) {
url := fmt.Sprintf("%s/api/events", c.baseURL)
eventData, err := json.Marshal(event)
if err != nil {
return nil, fmt.Errorf("failed to marshal event: %w", err)
}
var lastErr error
for attempt := 0; attempt <= c.retryCount; attempt++ {
if attempt > 0 {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(c.retryDelay):
}
}
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(eventData))
if err != nil {
lastErr = fmt.Errorf("failed to create request: %w", err)
continue
}
c.setHeaders(req)
resp, err := c.httpClient.Do(req)
if err != nil {
lastErr = fmt.Errorf("failed to send request: %w", err)
continue
}
defer resp.Body.Close()
if c.isRetryableStatus(resp.StatusCode) && attempt < c.retryCount {
lastErr = fmt.Errorf("retryable error: HTTP %d", resp.StatusCode)
continue
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read response body: %w", err)
}
var eventResp EventResponse
if err := json.Unmarshal(body, &eventResp); err != nil {
return nil, fmt.Errorf("failed to unmarshal response: %w", err)
}
if resp.StatusCode >= 400 {
return &eventResp, fmt.Errorf("SLURP API error (HTTP %d): %s", resp.StatusCode, eventResp.Error)
}
return &eventResp, nil
}
return nil, fmt.Errorf("failed after %d attempts: %w", c.retryCount+1, lastErr)
}
// CreateEventsBatch sends multiple events to SLURP in a single request
func (c *SlurpClient) CreateEventsBatch(ctx context.Context, events []SlurpEvent) (*BatchEventResponse, error) {
url := fmt.Sprintf("%s/api/events/batch", c.baseURL)
batchRequest := BatchEventRequest{
Events: events,
Source: "bzzz-hmmm-integration",
}
batchData, err := json.Marshal(batchRequest)
if err != nil {
return nil, fmt.Errorf("failed to marshal batch request: %w", err)
}
var lastErr error
for attempt := 0; attempt <= c.retryCount; attempt++ {
if attempt > 0 {
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-time.After(c.retryDelay):
}
}
req, err := http.NewRequestWithContext(ctx, "POST", url, bytes.NewBuffer(batchData))
if err != nil {
lastErr = fmt.Errorf("failed to create batch request: %w", err)
continue
}
c.setHeaders(req)
resp, err := c.httpClient.Do(req)
if err != nil {
lastErr = fmt.Errorf("failed to send batch request: %w", err)
continue
}
defer resp.Body.Close()
if c.isRetryableStatus(resp.StatusCode) && attempt < c.retryCount {
lastErr = fmt.Errorf("retryable error: HTTP %d", resp.StatusCode)
continue
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read batch response body: %w", err)
}
var batchResp BatchEventResponse
if err := json.Unmarshal(body, &batchResp); err != nil {
return nil, fmt.Errorf("failed to unmarshal batch response: %w", err)
}
if resp.StatusCode >= 400 {
return &batchResp, fmt.Errorf("SLURP batch API error (HTTP %d): %s", resp.StatusCode, batchResp.Message)
}
return &batchResp, nil
}
return nil, fmt.Errorf("batch failed after %d attempts: %w", c.retryCount+1, lastErr)
}
// GetHealth checks SLURP service health
func (c *SlurpClient) GetHealth(ctx context.Context) (*HealthResponse, error) {
url := fmt.Sprintf("%s/api/health", c.baseURL)
req, err := http.NewRequestWithContext(ctx, "GET", url, nil)
if err != nil {
return nil, fmt.Errorf("failed to create health request: %w", err)
}
c.setHeaders(req)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to send health request: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read health response: %w", err)
}
var healthResp HealthResponse
if err := json.Unmarshal(body, &healthResp); err != nil {
return nil, fmt.Errorf("failed to unmarshal health response: %w", err)
}
if resp.StatusCode >= 400 {
return &healthResp, fmt.Errorf("SLURP health check failed (HTTP %d)", resp.StatusCode)
}
return &healthResp, nil
}
// QueryEvents retrieves events from SLURP based on filters
func (c *SlurpClient) QueryEvents(ctx context.Context, filters map[string]string) ([]SlurpEvent, error) {
baseURL := fmt.Sprintf("%s/api/events", c.baseURL)
// Build query parameters
params := url.Values{}
for key, value := range filters {
params.Add(key, value)
}
queryURL := baseURL
if len(params) > 0 {
queryURL = fmt.Sprintf("%s?%s", baseURL, params.Encode())
}
req, err := http.NewRequestWithContext(ctx, "GET", queryURL, nil)
if err != nil {
return nil, fmt.Errorf("failed to create query request: %w", err)
}
c.setHeaders(req)
resp, err := c.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to send query request: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("failed to read query response: %w", err)
}
var events []SlurpEvent
if err := json.Unmarshal(body, &events); err != nil {
return nil, fmt.Errorf("failed to unmarshal events: %w", err)
}
if resp.StatusCode >= 400 {
return nil, fmt.Errorf("SLURP query failed (HTTP %d)", resp.StatusCode)
}
return events, nil
}
// setHeaders sets common HTTP headers for SLURP API requests
func (c *SlurpClient) setHeaders(req *http.Request) {
req.Header.Set("Content-Type", "application/json")
req.Header.Set("Accept", "application/json")
req.Header.Set("User-Agent", "Bzzz-HMMM-Integration/1.0")
if c.apiKey != "" {
req.Header.Set("Authorization", fmt.Sprintf("Bearer %s", c.apiKey))
}
}
// isRetryableStatus determines if an HTTP status code is retryable
func (c *SlurpClient) isRetryableStatus(statusCode int) bool {
switch statusCode {
case http.StatusTooManyRequests, // 429
http.StatusInternalServerError, // 500
http.StatusBadGateway, // 502
http.StatusServiceUnavailable, // 503
http.StatusGatewayTimeout: // 504
return true
default:
return false
}
}
// Close cleans up the client resources
func (c *SlurpClient) Close() error {
// HTTP client doesn't need explicit cleanup, but we can implement
// connection pooling cleanup if needed in the future
return nil
}
// ValidateConnection tests the connection to SLURP
func (c *SlurpClient) ValidateConnection(ctx context.Context) error {
_, err := c.GetHealth(ctx)
return err
}

View File

@@ -0,0 +1,519 @@
package integration
import (
"context"
"fmt"
"math"
"strings"
"sync"
"time"
"github.com/anthonyrawlins/bzzz/pkg/config"
"github.com/anthonyrawlins/bzzz/pubsub"
"github.com/libp2p/go-libp2p/core/peer"
)
// SlurpEventIntegrator manages the integration between HMMM discussions and SLURP events
type SlurpEventIntegrator struct {
config config.SlurpConfig
client *SlurpClient
pubsub *pubsub.PubSub
eventMapping config.HmmmToSlurpMapping
// Batch processing
eventBatch []SlurpEvent
batchMutex sync.Mutex
batchTimer *time.Timer
// Context and lifecycle
ctx context.Context
cancel context.CancelFunc
// Statistics
stats SlurpIntegrationStats
statsMutex sync.RWMutex
}
// SlurpIntegrationStats tracks integration performance metrics
type SlurpIntegrationStats struct {
EventsGenerated int64 `json:"events_generated"`
EventsSuccessful int64 `json:"events_successful"`
EventsFailed int64 `json:"events_failed"`
BatchesSent int64 `json:"batches_sent"`
LastEventTime time.Time `json:"last_event_time"`
LastSuccessTime time.Time `json:"last_success_time"`
LastFailureTime time.Time `json:"last_failure_time"`
LastFailureError string `json:"last_failure_error"`
AverageResponseTime float64 `json:"average_response_time_ms"`
}
// HmmmDiscussionContext represents a HMMM discussion that can generate SLURP events
type HmmmDiscussionContext struct {
DiscussionID string `json:"discussion_id"`
SessionID string `json:"session_id,omitempty"`
Participants []string `json:"participants"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Messages []HmmmMessage `json:"messages"`
ConsensusReached bool `json:"consensus_reached"`
ConsensusStrength float64 `json:"consensus_strength"`
OutcomeType string `json:"outcome_type"`
ProjectPath string `json:"project_path"`
RelatedTasks []string `json:"related_tasks,omitempty"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// HmmmMessage represents a message in a HMMM discussion
type HmmmMessage struct {
From string `json:"from"`
Content string `json:"content"`
Type string `json:"type"`
Timestamp time.Time `json:"timestamp"`
Metadata map[string]interface{} `json:"metadata,omitempty"`
}
// NewSlurpEventIntegrator creates a new SLURP event integrator
func NewSlurpEventIntegrator(ctx context.Context, slurpConfig config.SlurpConfig, ps *pubsub.PubSub) (*SlurpEventIntegrator, error) {
if !slurpConfig.Enabled {
return nil, fmt.Errorf("SLURP integration is disabled in configuration")
}
client := NewSlurpClient(slurpConfig)
// Test connection to SLURP
if err := client.ValidateConnection(ctx); err != nil {
return nil, fmt.Errorf("failed to connect to SLURP: %w", err)
}
integrationCtx, cancel := context.WithCancel(ctx)
integrator := &SlurpEventIntegrator{
config: slurpConfig,
client: client,
pubsub: ps,
eventMapping: config.GetHmmmToSlurpMapping(),
eventBatch: make([]SlurpEvent, 0, slurpConfig.BatchProcessing.MaxBatchSize),
ctx: integrationCtx,
cancel: cancel,
stats: SlurpIntegrationStats{},
}
// Initialize batch processing if enabled
if slurpConfig.BatchProcessing.Enabled {
integrator.initBatchProcessing()
}
fmt.Printf("🎯 SLURP Event Integrator initialized for %s\n", slurpConfig.BaseURL)
return integrator, nil
}
// ProcessHmmmDiscussion analyzes a HMMM discussion and generates appropriate SLURP events
func (s *SlurpEventIntegrator) ProcessHmmmDiscussion(ctx context.Context, discussion HmmmDiscussionContext) error {
s.statsMutex.Lock()
s.stats.EventsGenerated++
s.stats.LastEventTime = time.Now()
s.statsMutex.Unlock()
// Validate discussion meets generation criteria
if !s.shouldGenerateEvent(discussion) {
fmt.Printf("📊 Discussion %s does not meet event generation criteria\n", discussion.DiscussionID)
return nil
}
// Determine event type from discussion
eventType, confidence := s.determineEventType(discussion)
if eventType == "" {
fmt.Printf("📊 Could not determine event type for discussion %s\n", discussion.DiscussionID)
return nil
}
// Calculate severity
severity := s.calculateSeverity(discussion, eventType)
// Generate event content
content := s.generateEventContent(discussion)
// Create SLURP event
slurpEvent := SlurpEvent{
EventType: eventType,
Path: discussion.ProjectPath,
Content: content,
Severity: severity,
CreatedBy: s.config.DefaultEventSettings.DefaultCreatedBy,
Timestamp: time.Now(),
Tags: append(s.config.DefaultEventSettings.DefaultTags, fmt.Sprintf("confidence-%.2f", confidence)),
Metadata: map[string]interface{}{
"discussion_id": discussion.DiscussionID,
"session_id": discussion.SessionID,
"participants": discussion.Participants,
"consensus_strength": discussion.ConsensusStrength,
"discussion_duration": discussion.EndTime.Sub(discussion.StartTime).String(),
"message_count": len(discussion.Messages),
"outcome_type": discussion.OutcomeType,
"generation_confidence": confidence,
},
}
// Add custom metadata from template
for key, value := range s.config.DefaultEventSettings.MetadataTemplate {
slurpEvent.Metadata[key] = value
}
// Add discussion-specific metadata
for key, value := range discussion.Metadata {
slurpEvent.Metadata[key] = value
}
// Send event (batch or immediate)
if s.config.BatchProcessing.Enabled {
return s.addToBatch(slurpEvent)
} else {
return s.sendImmediateEvent(ctx, slurpEvent, discussion.DiscussionID)
}
}
// shouldGenerateEvent determines if a discussion meets the criteria for event generation
func (s *SlurpEventIntegrator) shouldGenerateEvent(discussion HmmmDiscussionContext) bool {
// Check minimum participants
if len(discussion.Participants) < s.config.EventGeneration.MinParticipants {
return false
}
// Check consensus strength
if discussion.ConsensusStrength < s.config.EventGeneration.MinConsensusStrength {
return false
}
// Check discussion duration
duration := discussion.EndTime.Sub(discussion.StartTime)
if duration < s.config.EventGeneration.MinDiscussionDuration {
return false
}
if duration > s.config.EventGeneration.MaxDiscussionDuration {
return false // Too long, might indicate stalled discussion
}
// Check if unanimity is required and achieved
if s.config.EventGeneration.RequireUnanimity && discussion.ConsensusStrength < 1.0 {
return false
}
return true
}
// determineEventType analyzes discussion content to determine SLURP event type
func (s *SlurpEventIntegrator) determineEventType(discussion HmmmDiscussionContext) (string, float64) {
// Combine all message content for analysis
var allContent strings.Builder
for _, msg := range discussion.Messages {
allContent.WriteString(strings.ToLower(msg.Content))
allContent.WriteString(" ")
}
content := allContent.String()
// Score each event type based on keyword matches
scores := make(map[string]float64)
scores["approval"] = s.scoreKeywordMatch(content, s.eventMapping.ApprovalKeywords)
scores["warning"] = s.scoreKeywordMatch(content, s.eventMapping.WarningKeywords)
scores["blocker"] = s.scoreKeywordMatch(content, s.eventMapping.BlockerKeywords)
scores["priority_change"] = s.scoreKeywordMatch(content, s.eventMapping.PriorityKeywords)
scores["access_update"] = s.scoreKeywordMatch(content, s.eventMapping.AccessKeywords)
scores["structural_change"] = s.scoreKeywordMatch(content, s.eventMapping.StructuralKeywords)
scores["announcement"] = s.scoreKeywordMatch(content, s.eventMapping.AnnouncementKeywords)
// Find highest scoring event type
var bestType string
var bestScore float64
for eventType, score := range scores {
if score > bestScore {
bestType = eventType
bestScore = score
}
}
// Require minimum confidence threshold
minConfidence := 0.3
if bestScore < minConfidence {
return "", 0
}
// Check if event type is enabled
if s.isEventTypeDisabled(bestType) {
return "", 0
}
return bestType, bestScore
}
// scoreKeywordMatch calculates a score based on keyword frequency
func (s *SlurpEventIntegrator) scoreKeywordMatch(content string, keywords []string) float64 {
if len(keywords) == 0 {
return 0
}
matches := 0
for _, keyword := range keywords {
if strings.Contains(content, strings.ToLower(keyword)) {
matches++
}
}
return float64(matches) / float64(len(keywords))
}
// isEventTypeDisabled checks if an event type is disabled in configuration
func (s *SlurpEventIntegrator) isEventTypeDisabled(eventType string) bool {
for _, disabled := range s.config.EventGeneration.DisabledEventTypes {
if disabled == eventType {
return true
}
}
// Check if it's in enabled list (if specified)
if len(s.config.EventGeneration.EnabledEventTypes) > 0 {
for _, enabled := range s.config.EventGeneration.EnabledEventTypes {
if enabled == eventType {
return false
}
}
return true // Not in enabled list
}
return false
}
// calculateSeverity determines event severity based on discussion characteristics
func (s *SlurpEventIntegrator) calculateSeverity(discussion HmmmDiscussionContext, eventType string) int {
// Start with base severity for event type
baseSeverity := s.config.EventGeneration.SeverityRules.BaseSeverity[eventType]
if baseSeverity == 0 {
baseSeverity = s.config.DefaultEventSettings.DefaultSeverity
}
severity := float64(baseSeverity)
// Apply participant multiplier
participantBoost := float64(len(discussion.Participants)-1) * s.config.EventGeneration.SeverityRules.ParticipantMultiplier
severity += participantBoost
// Apply duration multiplier
durationHours := discussion.EndTime.Sub(discussion.StartTime).Hours()
durationBoost := durationHours * s.config.EventGeneration.SeverityRules.DurationMultiplier
severity += durationBoost
// Check for urgency keywords
allContent := strings.ToLower(s.generateEventContent(discussion))
for _, keyword := range s.config.EventGeneration.SeverityRules.UrgencyKeywords {
if strings.Contains(allContent, strings.ToLower(keyword)) {
severity += float64(s.config.EventGeneration.SeverityRules.UrgencyBoost)
break // Only apply once
}
}
// Apply bounds
finalSeverity := int(math.Round(severity))
if finalSeverity < s.config.EventGeneration.SeverityRules.MinSeverity {
finalSeverity = s.config.EventGeneration.SeverityRules.MinSeverity
}
if finalSeverity > s.config.EventGeneration.SeverityRules.MaxSeverity {
finalSeverity = s.config.EventGeneration.SeverityRules.MaxSeverity
}
return finalSeverity
}
// generateEventContent creates human-readable content for the SLURP event
func (s *SlurpEventIntegrator) generateEventContent(discussion HmmmDiscussionContext) string {
if discussion.OutcomeType != "" {
return fmt.Sprintf("HMMM discussion reached consensus: %s (%d participants, %.1f%% agreement)",
discussion.OutcomeType,
len(discussion.Participants),
discussion.ConsensusStrength*100)
}
return fmt.Sprintf("HMMM discussion completed with %d participants over %v",
len(discussion.Participants),
discussion.EndTime.Sub(discussion.StartTime).Round(time.Minute))
}
// addToBatch adds an event to the batch for later processing
func (s *SlurpEventIntegrator) addToBatch(event SlurpEvent) error {
s.batchMutex.Lock()
defer s.batchMutex.Unlock()
s.eventBatch = append(s.eventBatch, event)
// Check if batch is full
if len(s.eventBatch) >= s.config.BatchProcessing.MaxBatchSize {
return s.flushBatch()
}
// Reset batch timer
if s.batchTimer != nil {
s.batchTimer.Stop()
}
s.batchTimer = time.AfterFunc(s.config.BatchProcessing.MaxBatchWait, func() {
s.batchMutex.Lock()
defer s.batchMutex.Unlock()
s.flushBatch()
})
fmt.Printf("📦 Added event to batch (%d/%d)\n", len(s.eventBatch), s.config.BatchProcessing.MaxBatchSize)
return nil
}
// flushBatch sends all batched events to SLURP
func (s *SlurpEventIntegrator) flushBatch() error {
if len(s.eventBatch) == 0 {
return nil
}
events := make([]SlurpEvent, len(s.eventBatch))
copy(events, s.eventBatch)
s.eventBatch = s.eventBatch[:0] // Clear batch
if s.batchTimer != nil {
s.batchTimer.Stop()
s.batchTimer = nil
}
fmt.Printf("🚀 Flushing batch of %d events to SLURP\n", len(events))
start := time.Now()
resp, err := s.client.CreateEventsBatch(s.ctx, events)
duration := time.Since(start)
s.statsMutex.Lock()
s.stats.BatchesSent++
s.stats.AverageResponseTime = (s.stats.AverageResponseTime + duration.Seconds()*1000) / 2
if err != nil {
s.stats.EventsFailed += int64(len(events))
s.stats.LastFailureTime = time.Now()
s.stats.LastFailureError = err.Error()
s.statsMutex.Unlock()
// Publish failure notification
s.publishSlurpEvent("slurp_batch_failed", map[string]interface{}{
"error": err.Error(),
"event_count": len(events),
"batch_id": fmt.Sprintf("batch_%d", time.Now().Unix()),
})
return fmt.Errorf("failed to send batch: %w", err)
}
s.stats.EventsSuccessful += int64(resp.ProcessedCount)
s.stats.EventsFailed += int64(resp.FailedCount)
s.stats.LastSuccessTime = time.Now()
s.statsMutex.Unlock()
// Publish success notification
s.publishSlurpEvent("slurp_batch_success", map[string]interface{}{
"processed_count": resp.ProcessedCount,
"failed_count": resp.FailedCount,
"event_ids": resp.EventIDs,
"batch_id": fmt.Sprintf("batch_%d", time.Now().Unix()),
})
fmt.Printf("✅ Batch processed: %d succeeded, %d failed\n", resp.ProcessedCount, resp.FailedCount)
return nil
}
// sendImmediateEvent sends a single event immediately to SLURP
func (s *SlurpEventIntegrator) sendImmediateEvent(ctx context.Context, event SlurpEvent, discussionID string) error {
start := time.Now()
resp, err := s.client.CreateEvent(ctx, event)
duration := time.Since(start)
s.statsMutex.Lock()
s.stats.AverageResponseTime = (s.stats.AverageResponseTime + duration.Seconds()*1000) / 2
if err != nil {
s.stats.EventsFailed++
s.stats.LastFailureTime = time.Now()
s.stats.LastFailureError = err.Error()
s.statsMutex.Unlock()
// Publish failure notification
s.publishSlurpEvent("slurp_event_failed", map[string]interface{}{
"discussion_id": discussionID,
"event_type": event.EventType,
"error": err.Error(),
})
return fmt.Errorf("failed to send event: %w", err)
}
s.stats.EventsSuccessful++
s.stats.LastSuccessTime = time.Now()
s.statsMutex.Unlock()
// Publish success notification
s.publishSlurpEvent("slurp_event_success", map[string]interface{}{
"discussion_id": discussionID,
"event_type": event.EventType,
"event_id": resp.EventID,
"severity": event.Severity,
})
fmt.Printf("✅ SLURP event created: %s (ID: %s)\n", event.EventType, resp.EventID)
return nil
}
// publishSlurpEvent publishes a SLURP integration event to the pubsub system
func (s *SlurpEventIntegrator) publishSlurpEvent(eventType string, data map[string]interface{}) {
var msgType pubsub.MessageType
switch eventType {
case "slurp_event_success", "slurp_batch_success":
msgType = pubsub.SlurpEventGenerated
case "slurp_event_failed", "slurp_batch_failed":
msgType = pubsub.SlurpEventAck
default:
msgType = pubsub.SlurpContextUpdate
}
data["timestamp"] = time.Now()
data["integration_source"] = "hmmm-slurp-integrator"
if err := s.pubsub.PublishHmmmMessage(msgType, data); err != nil {
fmt.Printf("❌ Failed to publish SLURP integration event: %v\n", err)
}
}
// initBatchProcessing initializes batch processing components
func (s *SlurpEventIntegrator) initBatchProcessing() {
fmt.Printf("📦 Batch processing enabled: max_size=%d, max_wait=%v\n",
s.config.BatchProcessing.MaxBatchSize,
s.config.BatchProcessing.MaxBatchWait)
}
// GetStats returns current integration statistics
func (s *SlurpEventIntegrator) GetStats() SlurpIntegrationStats {
s.statsMutex.RLock()
defer s.statsMutex.RUnlock()
return s.stats
}
// Close shuts down the integrator and flushes any pending events
func (s *SlurpEventIntegrator) Close() error {
s.cancel()
// Flush any remaining batched events
if s.config.BatchProcessing.Enabled && s.config.BatchProcessing.FlushOnShutdown {
s.batchMutex.Lock()
if len(s.eventBatch) > 0 {
fmt.Printf("🧹 Flushing %d remaining events on shutdown\n", len(s.eventBatch))
s.flushBatch()
}
s.batchMutex.Unlock()
}
if s.batchTimer != nil {
s.batchTimer.Stop()
}
return s.client.Close()
}