diff --git a/src/api/api.go b/src/api/api.go index 970f367..5a9338a 100644 --- a/src/api/api.go +++ b/src/api/api.go @@ -3,9 +3,11 @@ package api import ( "encoding/json" "fmt" + "log" "net/http" "time" + "gitea.deepblack.cloud/chorus/bubble/backbeat" "gitea.deepblack.cloud/chorus/bubble/core" "gitea.deepblack.cloud/chorus/bubble/models" "gitea.deepblack.cloud/chorus/bubble/storage" @@ -13,21 +15,31 @@ import ( // Server holds the dependencies for the API server. type Server struct { - Store storage.Storage + Store storage.Storage + Backbeat *backbeat.BackbeatIntegration } // NewServer creates a new API server. func NewServer(store storage.Storage) *Server { - return &Server{Store: store} + return &Server{ + Store: store, + } +} + +// SetBackbeatIntegration adds BACKBEAT timing integration to the server +func (s *Server) SetBackbeatIntegration(bb *backbeat.BackbeatIntegration) { + s.Backbeat = bb } // Start begins listening for HTTP requests. func (s *Server) Start(addr string) error { http.HandleFunc("/decision/bundle", s.handleDecisionBundle) + http.HandleFunc("/backbeat/metrics", s.handleBackbeatMetrics) + http.HandleFunc("/backbeat/operations", s.handleBackbeatOperations) return http.ListenAndServe(addr, nil) } -// handleDecisionBundle is the handler for the /decision/bundle endpoint. +// handleDecisionBundle is the handler for the /decision/bundle endpoint with BACKBEAT timing. func (s *Server) handleDecisionBundle(w http.ResponseWriter, r *http.Request) { if r.Method != http.MethodPost { http.Error(w, "Only POST method is allowed", http.StatusMethodNotAllowed) @@ -45,43 +57,328 @@ func (s *Server) handleDecisionBundle(w http.ResponseWriter, r *http.Request) { return } + // Generate operation ID for BACKBEAT tracking + operationID := fmt.Sprintf("bundle-%d", time.Now().UnixNano()) + + // Estimate beats based on complexity + estimatedBeats := s.estimateBundleComplexity(req) + + // Start BACKBEAT tracking if available + if s.Backbeat != nil { + operationData := map[string]interface{}{ + "start_id": req.StartID, + "role": req.Role, + "max_hops": req.MaxHops, + "top_k": req.TopK, + "query": req.Query, + } + + if err := s.Backbeat.StartDecisionOperation(operationID, backbeat.DecisionBundleGeneration, estimatedBeats, operationData); err != nil { + log.Printf("āš ļø Failed to start BACKBEAT tracking: %v", err) + } + } + // --- Core Logic --- - // Use StartID for now. Query-based anchor resolution will be added later. + // Phase 1: Node resolution and validation + if s.Backbeat != nil { + s.Backbeat.UpdateDecisionPhase(operationID, backbeat.PhaseTraversing) + } + startNode, err := s.Store.GetDecisionMetadata(req.StartID) if err != nil { + s.completeOperation(operationID, false, map[string]interface{}{"error": "failed to get start node", "details": err.Error()}) http.Error(w, fmt.Sprintf("Failed to get start node: %v", err), http.StatusInternalServerError) return } if startNode == nil { + s.completeOperation(operationID, false, map[string]interface{}{"error": "start node not found"}) http.Error(w, "Start node not found", http.StatusNotFound) return } - // Perform the provenance walk. - timeline, err := core.WalkBack(s.Store, req.StartID, req.MaxHops, req.Role, req.TopK) + // Phase 2: Perform the provenance walk with scoring + if s.Backbeat != nil { + s.Backbeat.UpdateDecisionPhase(operationID, backbeat.PhaseScoring) + } + + timeline, err := core.WalkBack(s.Store, req.StartID, req.Query, req.Role, req.MaxHops, req.TopK) if err != nil { + s.completeOperation(operationID, false, map[string]interface{}{"error": "walkback failed", "details": err.Error()}) http.Error(w, fmt.Sprintf("Failed to walk provenance graph: %v", err), http.StatusInternalServerError) return } - // Assemble the response bundle. - // This is a simplified version of the logic in the blueprint. + // Phase 3: Generate summary and assemble bundle + if s.Backbeat != nil { + s.Backbeat.UpdateDecisionPhase(operationID, backbeat.PhaseSummarizing) + } + + // Assemble the response bundle with enhanced metadata response := models.DecisionBundleResponse{ - BundleID: fmt.Sprintf("bundle:%s", req.StartID), // Simplified ID + BundleID: fmt.Sprintf("bundle:%s", req.StartID), StartID: req.StartID, GeneratedAt: time.Now().UTC().Format(time.RFC3339), - Summary: fmt.Sprintf("Decision bundle for %s, found %d ancestors.", req.StartID, len(timeline)), + Summary: fmt.Sprintf("Decision bundle for %s, found %d related decisions through %d-hop traversal.", req.StartID, len(timeline), req.MaxHops), Timeline: timeline, - ConstraintsSummary: []string{}, // Placeholder - KeyEvidenceRefs: []string{}, // Placeholder - GoalAlignment: models.GoalAlignment{}, // Placeholder - SuggestedActions: []models.SuggestedAction{}, // Placeholder - Escalation: models.Escalation{}, // Placeholder + ConstraintsSummary: s.generateConstraintsSummary(timeline), + KeyEvidenceRefs: s.extractEvidenceRefs(timeline), + GoalAlignment: s.calculateGoalAlignment(timeline, req.Query), + SuggestedActions: s.generateSuggestedActions(timeline, req.Role), + Escalation: s.checkEscalationNeeded(timeline), CacheHit: false, // Caching not yet implemented } + // Complete BACKBEAT tracking + resultData := map[string]interface{}{ + "bundle_id": response.BundleID, + "decisions_found": len(timeline), + "start_node_id": req.StartID, + "role": req.Role, + "max_hops": req.MaxHops, + } + + s.completeOperation(operationID, true, resultData) + w.Header().Set("Content-Type", "application/json") if err := json.NewEncoder(w).Encode(response); err != nil { http.Error(w, err.Error(), http.StatusInternalServerError) } +} + +// completeOperation completes BACKBEAT tracking if available +func (s *Server) completeOperation(operationID string, success bool, resultData map[string]interface{}) { + if s.Backbeat != nil { + if _, err := s.Backbeat.CompleteDecisionOperation(operationID, success, resultData); err != nil { + log.Printf("āš ļø Failed to complete BACKBEAT operation: %v", err) + } + } +} + +// estimateBundleComplexity estimates the number of beats required based on request complexity +func (s *Server) estimateBundleComplexity(req models.DecisionBundleRequest) int { + baseBeats := 2 // Base complexity for simple operations + + // Add beats based on max hops (traversal complexity) + if req.MaxHops > 5 { + baseBeats += (req.MaxHops - 5) / 2 + } + + // Add beats based on top-K (scoring complexity) + if req.TopK > 10 { + baseBeats += (req.TopK - 10) / 5 + } + + // Add beats for query processing if present + if req.Query != "" { + baseBeats += 1 + } + + // Add beats for full decision record inclusion + if req.IncludeFullDR { + baseBeats += 2 + } + + return baseBeats +} + +// generateConstraintsSummary creates a summary of constraints from the decision timeline +func (s *Server) generateConstraintsSummary(timeline []models.DecisionRecordSummary) []string { + constraints := make([]string, 0) + + for _, decision := range timeline { + if len(decision.Tags) > 0 { + for _, tag := range decision.Tags { + if tag == "constraint" || tag == "requirement" || tag == "policy" { + constraint := fmt.Sprintf("From %s: %s", decision.ID, decision.Statement) + constraints = append(constraints, constraint) + } + } + } + } + + if len(constraints) == 0 { + constraints = append(constraints, "No explicit constraints identified in decision timeline") + } + + return constraints +} + +// extractEvidenceRefs extracts key evidence references from the timeline +func (s *Server) extractEvidenceRefs(timeline []models.DecisionRecordSummary) []string { + evidenceRefs := make([]string, 0) + + for _, decision := range timeline { + if len(decision.EvidenceRefs) > 0 { + evidenceRefs = append(evidenceRefs, decision.EvidenceRefs...) + } + } + + if len(evidenceRefs) == 0 { + evidenceRefs = append(evidenceRefs, "No evidence references found in timeline") + } + + return evidenceRefs +} + +// calculateGoalAlignment analyzes how well decisions align with stated goals +func (s *Server) calculateGoalAlignment(timeline []models.DecisionRecordSummary, query string) models.GoalAlignment { + if len(timeline) == 0 { + return models.GoalAlignment{ + Score: 0.0, + Reasons: []string{"No decisions found for alignment analysis"}, + } + } + + // Simple scoring based on decision lifecycle states + activeDecisions := 0 + completedDecisions := 0 + + for _, decision := range timeline { + switch decision.LifecycleState { + case "active", "implementing": + activeDecisions++ + case "completed", "validated": + completedDecisions++ + } + } + + // Calculate alignment score based on decision maturity + score := float64(completedDecisions) / float64(len(timeline)) * 0.7 + score += float64(activeDecisions) / float64(len(timeline)) * 0.3 + + reasons := []string{ + fmt.Sprintf("Found %d completed decisions out of %d total", completedDecisions, len(timeline)), + fmt.Sprintf("Found %d active decisions indicating ongoing progress", activeDecisions), + } + + if query != "" { + reasons = append(reasons, fmt.Sprintf("Analysis filtered for query: '%s'", query)) + } + + return models.GoalAlignment{ + Score: score, + Reasons: reasons, + } +} + +// generateSuggestedActions creates recommended next steps based on the decision timeline +func (s *Server) generateSuggestedActions(timeline []models.DecisionRecordSummary, role string) []models.SuggestedAction { + actions := make([]models.SuggestedAction, 0) + + if len(timeline) == 0 { + actions = append(actions, models.SuggestedAction{ + Type: "investigate", + Description: "No related decisions found - investigate if this is a new decision domain", + Assignee: role, + Confidence: 0.8, + }) + return actions + } + + // Analyze decision states to suggest actions + pendingDecisions := 0 + activeDecisions := 0 + + for _, decision := range timeline { + switch decision.LifecycleState { + case "pending", "proposed": + pendingDecisions++ + case "active", "implementing": + activeDecisions++ + } + } + + if pendingDecisions > 0 { + actions = append(actions, models.SuggestedAction{ + Type: "review", + Description: fmt.Sprintf("Review %d pending decisions for approval or closure", pendingDecisions), + Assignee: role, + Confidence: 0.9, + }) + } + + if activeDecisions > 2 { + actions = append(actions, models.SuggestedAction{ + Type: "monitor", + Description: fmt.Sprintf("Monitor progress on %d active decisions to prevent bottlenecks", activeDecisions), + Assignee: role, + Confidence: 0.7, + }) + } + + if len(actions) == 0 { + actions = append(actions, models.SuggestedAction{ + Type: "document", + Description: "Document lessons learned from completed decision timeline", + Assignee: role, + Confidence: 0.6, + }) + } + + return actions +} + +// checkEscalationNeeded determines if human escalation is required +func (s *Server) checkEscalationNeeded(timeline []models.DecisionRecordSummary) models.Escalation { + if len(timeline) == 0 { + return models.Escalation{ + Required: false, + } + } + + // Check for escalation indicators + conflictingDecisions := 0 + highRiskDecisions := 0 + + for _, decision := range timeline { + for _, tag := range decision.Tags { + if tag == "conflict" || tag == "contradiction" { + conflictingDecisions++ + } + if tag == "high-risk" || tag == "critical" { + highRiskDecisions++ + } + } + } + + if conflictingDecisions > 0 || highRiskDecisions > 1 { + return models.Escalation{ + Required: true, + Who: []string{"decision_lead", "technical_lead", "product_owner"}, + } + } + + return models.Escalation{ + Required: false, + } +} + +// handleBackbeatMetrics returns BACKBEAT metrics for monitoring +func (s *Server) handleBackbeatMetrics(w http.ResponseWriter, r *http.Request) { + if s.Backbeat == nil { + http.Error(w, "BACKBEAT integration not available", http.StatusServiceUnavailable) + return + } + + metrics := s.Backbeat.GetDecisionMetrics() + w.Header().Set("Content-Type", "application/json") + + if err := json.NewEncoder(w).Encode(metrics); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } +} + +// handleBackbeatOperations returns currently active BACKBEAT operations +func (s *Server) handleBackbeatOperations(w http.ResponseWriter, r *http.Request) { + if s.Backbeat == nil { + http.Error(w, "BACKBEAT integration not available", http.StatusServiceUnavailable) + return + } + + operations := s.Backbeat.GetActiveOperations() + w.Header().Set("Content-Type", "application/json") + + if err := json.NewEncoder(w).Encode(operations); err != nil { + http.Error(w, err.Error(), http.StatusInternalServerError) + } } \ No newline at end of file diff --git a/src/backbeat/backbeat.go b/src/backbeat/backbeat.go new file mode 100644 index 0000000..e642e6f --- /dev/null +++ b/src/backbeat/backbeat.go @@ -0,0 +1,301 @@ +package backbeat + +import ( + "encoding/json" + "fmt" + "log" + "sync" + "time" + + "github.com/nats-io/nats.go" +) + +// BackbeatIntegration provides beat-aware timing coordination for BUBBLE decision operations +type BackbeatIntegration struct { + nc *nats.Conn + activeOperations sync.Map + clusterID string + agentID string + tempo int // beats per minute +} + +// DecisionOperation represents a tracked decision bundle operation with beat timing +type DecisionOperation struct { + ID string `json:"id"` + OperationType DecisionOperationType `json:"operation_type"` + StartBeat uint64 `json:"start_beat"` + EstimatedBeats int `json:"estimated_beats"` + Phase DecisionPhase `json:"phase"` + StartTime time.Time `json:"start_time"` + Data map[string]interface{} `json:"data"` +} + +// DecisionOperationType represents the type of decision operation being performed +type DecisionOperationType string + +const ( + DecisionBundleGeneration DecisionOperationType = "bundle_generation" + DecisionWalkback DecisionOperationType = "walkback_traversal" + DecisionScoring DecisionOperationType = "decision_scoring" + DecisionSummaryCreation DecisionOperationType = "summary_creation" + DecisionEvidenceGather DecisionOperationType = "evidence_gathering" +) + +// DecisionPhase represents the current phase of a decision operation +type DecisionPhase string + +const ( + PhaseStarted DecisionPhase = "started" + PhaseTraversing DecisionPhase = "traversing" + PhaseScoring DecisionPhase = "scoring" + PhaseSummarizing DecisionPhase = "summarizing" + PhaseCompleted DecisionPhase = "completed" + PhaseFailed DecisionPhase = "failed" +) + +// DecisionResult contains the results of a completed decision operation +type DecisionResult struct { + OperationID string `json:"operation_id"` + OperationType DecisionOperationType `json:"operation_type"` + Success bool `json:"success"` + BeatsTaken uint64 `json:"beats_taken"` + DurationMS int64 `json:"duration_ms"` + StartBeat uint64 `json:"start_beat"` + EndBeat uint64 `json:"end_beat"` + ResultData map[string]interface{} `json:"result_data"` +} + +// DecisionStatus represents the current status of a decision operation for BACKBEAT network +type DecisionStatus struct { + ClusterID string `json:"cluster_id"` + AgentID string `json:"agent_id"` + OperationID string `json:"operation_id"` + OperationType DecisionOperationType `json:"operation_type"` + Phase DecisionPhase `json:"phase"` + Beat uint64 `json:"beat"` + EstimatedBeats int `json:"estimated_beats"` + Timestamp string `json:"timestamp"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} + +// NewBackbeatIntegration creates a new BACKBEAT integration for decision tracking +func NewBackbeatIntegration(clusterID, agentID string, tempo int) *BackbeatIntegration { + return &BackbeatIntegration{ + clusterID: clusterID, + agentID: agentID, + tempo: tempo, + } +} + +// Connect establishes connection to BACKBEAT infrastructure via NATS +func (b *BackbeatIntegration) Connect(natsURL string) error { + log.Printf("🄁 Connecting to BACKBEAT infrastructure at %s", natsURL) + + nc, err := nats.Connect(natsURL) + if err != nil { + return fmt.Errorf("failed to connect to NATS: %w", err) + } + + b.nc = nc + log.Printf("āœ… Connected to BACKBEAT tempo grid (%d bpm)", b.tempo) + return nil +} + +// Close closes the BACKBEAT connection +func (b *BackbeatIntegration) Close() { + if b.nc != nil { + b.nc.Close() + log.Println("šŸ”Œ Disconnected from BACKBEAT infrastructure") + } +} + +// StartDecisionOperation begins tracking a decision operation with beat timing +func (b *BackbeatIntegration) StartDecisionOperation(operationID string, operationType DecisionOperationType, estimatedBeats int, data map[string]interface{}) error { + currentBeat := b.getCurrentBeat() + + operation := &DecisionOperation{ + ID: operationID, + OperationType: operationType, + StartBeat: currentBeat, + EstimatedBeats: estimatedBeats, + Phase: PhaseStarted, + StartTime: time.Now(), + Data: data, + } + + b.activeOperations.Store(operationID, operation) + + if err := b.emitDecisionStatus(operation); err != nil { + log.Printf("āš ļø Failed to emit decision status: %v", err) + } + + log.Printf("šŸ“Š Started %s operation %s at beat %d (estimated: %d beats)", + operationType, operationID, currentBeat, estimatedBeats) + + return nil +} + +// UpdateDecisionPhase updates the phase of an active decision operation +func (b *BackbeatIntegration) UpdateDecisionPhase(operationID string, phase DecisionPhase) error { + value, exists := b.activeOperations.Load(operationID) + if !exists { + log.Printf("āš ļø Decision operation %s not found for phase update", operationID) + return fmt.Errorf("operation not found: %s", operationID) + } + + operation := value.(*DecisionOperation) + operation.Phase = phase + + if err := b.emitDecisionStatus(operation); err != nil { + log.Printf("āš ļø Failed to emit decision status: %v", err) + } + + log.Printf("šŸ”„ Updated decision %s to phase %s", operationID, phase) + return nil +} + +// CompleteDecisionOperation finishes a decision operation and returns results +func (b *BackbeatIntegration) CompleteDecisionOperation(operationID string, success bool, resultData map[string]interface{}) (*DecisionResult, error) { + value, exists := b.activeOperations.Load(operationID) + if !exists { + return nil, fmt.Errorf("operation not found: %s", operationID) + } + + operation := value.(*DecisionOperation) + b.activeOperations.Delete(operationID) + + currentBeat := b.getCurrentBeat() + beatsTaken := currentBeat - operation.StartBeat + duration := time.Since(operation.StartTime) + + result := &DecisionResult{ + OperationID: operationID, + OperationType: operation.OperationType, + Success: success, + BeatsTaken: beatsTaken, + DurationMS: duration.Milliseconds(), + StartBeat: operation.StartBeat, + EndBeat: currentBeat, + ResultData: resultData, + } + + // Emit final status + operation.Phase = PhaseCompleted + if !success { + operation.Phase = PhaseFailed + } + if err := b.emitDecisionStatus(operation); err != nil { + log.Printf("āš ļø Failed to emit final decision status: %v", err) + } + + log.Printf("āœ… Completed decision %s - Success: %t (took %d beats, %dms)", + operationID, success, beatsTaken, duration.Milliseconds()) + + return result, nil +} + +// GetActiveOperations returns a list of currently active decision operations +func (b *BackbeatIntegration) GetActiveOperations() []*DecisionOperation { + var operations []*DecisionOperation + + b.activeOperations.Range(func(key, value interface{}) bool { + if operation, ok := value.(*DecisionOperation); ok { + operations = append(operations, operation) + } + return true + }) + + return operations +} + +// IsWithinBeatBudget checks if an operation is still within its estimated beat budget +func (b *BackbeatIntegration) IsWithinBeatBudget(operationID string) bool { + value, exists := b.activeOperations.Load(operationID) + if !exists { + return false + } + + operation := value.(*DecisionOperation) + currentBeat := b.getCurrentBeat() + beatsElapsed := currentBeat - operation.StartBeat + + return int(beatsElapsed) <= operation.EstimatedBeats +} + +// TimeoutOperation forcefully times out an operation that exceeds its beat budget +func (b *BackbeatIntegration) TimeoutOperation(operationID string) error { + log.Printf("ā° Timing out decision operation %s due to beat budget exceeded", operationID) + + timeoutData := map[string]interface{}{ + "timeout_reason": "beat_budget_exceeded", + "timeout_at_beat": b.getCurrentBeat(), + } + + _, err := b.CompleteDecisionOperation(operationID, false, timeoutData) + return err +} + +// getCurrentBeat calculates the current beat based on system time and tempo +func (b *BackbeatIntegration) getCurrentBeat() uint64 { + // Calculate beat based on 2 BPM tempo (30 seconds per beat) + // In production, this would sync with BACKBEAT infrastructure + epochTime := time.Now().Unix() + beatsPerSecond := float64(b.tempo) / 60.0 + return uint64(float64(epochTime) * beatsPerSecond) +} + +// emitDecisionStatus publishes decision operation status to BACKBEAT network +func (b *BackbeatIntegration) emitDecisionStatus(operation *DecisionOperation) error { + if b.nc == nil { + return nil // Not connected, skip emission + } + + status := DecisionStatus{ + ClusterID: b.clusterID, + AgentID: b.agentID, + OperationID: operation.ID, + OperationType: operation.OperationType, + Phase: operation.Phase, + Beat: b.getCurrentBeat(), + EstimatedBeats: operation.EstimatedBeats, + Timestamp: time.Now().UTC().Format(time.RFC3339), + Metadata: operation.Data, + } + + statusJSON, err := json.Marshal(status) + if err != nil { + return fmt.Errorf("failed to marshal status: %w", err) + } + + // Publish to BACKBEAT decision tracking topic + subject := fmt.Sprintf("backbeat.decision.%s.%s", b.clusterID, b.agentID) + if err := b.nc.Publish(subject, statusJSON); err != nil { + return fmt.Errorf("failed to publish status: %w", err) + } + + log.Printf("šŸ“” Emitted decision status: %s - %s", operation.ID, operation.Phase) + return nil +} + +// GetDecisionMetrics returns aggregated metrics for decision operations +func (b *BackbeatIntegration) GetDecisionMetrics() map[string]interface{} { + activeOps := b.GetActiveOperations() + + operationTypes := make(map[DecisionOperationType]int) + operationPhases := make(map[DecisionPhase]int) + + for _, op := range activeOps { + operationTypes[op.OperationType]++ + operationPhases[op.Phase]++ + } + + return map[string]interface{}{ + "active_operations": len(activeOps), + "operation_types": operationTypes, + "operation_phases": operationPhases, + "cluster_id": b.clusterID, + "agent_id": b.agentID, + "tempo_bpm": b.tempo, + "connected": b.nc != nil && b.nc.IsConnected(), + } +} \ No newline at end of file diff --git a/src/bubble b/src/bubble new file mode 100755 index 0000000..3ed4655 Binary files /dev/null and b/src/bubble differ diff --git a/src/go.mod b/src/go.mod index 9992d5d..3c6a6c5 100644 --- a/src/go.mod +++ b/src/go.mod @@ -3,6 +3,14 @@ module gitea.deepblack.cloud/chorus/bubble go 1.24.5 require ( - github.com/mattn/go-sqlite3 v1.14.31 // indirect - github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c // indirect + github.com/mattn/go-sqlite3 v1.14.31 + github.com/nats-io/nats.go v1.31.0 +) + +require ( + github.com/klauspost/compress v1.17.0 // indirect + github.com/nats-io/nkeys v0.4.5 // indirect + github.com/nats-io/nuid v1.0.1 // indirect + golang.org/x/crypto v0.6.0 // indirect + golang.org/x/sys v0.5.0 // indirect ) diff --git a/src/go.sum b/src/go.sum index 1fe5c20..3baaa33 100644 --- a/src/go.sum +++ b/src/go.sum @@ -1,4 +1,14 @@ +github.com/klauspost/compress v1.17.0 h1:Rnbp4K9EjcDuVuHtd0dgA4qNuv9yKDYKK1ulpJwgrqM= +github.com/klauspost/compress v1.17.0/go.mod h1:ntbaceVETuRiXiv4DpjP66DpAtAGkEQskQzEyD//IeE= github.com/mattn/go-sqlite3 v1.14.31 h1:ldt6ghyPJsokUIlksH63gWZkG6qVGeEAu4zLeS4aVZM= github.com/mattn/go-sqlite3 v1.14.31/go.mod h1:Uh1q+B4BYcTPb+yiD3kU8Ct7aC0hY9fxUwlHK0RXw+Y= -github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c h1:g+WoO5jjkqGAzHWCjJB1zZfXPIAaDpzXIEJ0eS6B5Ok= -github.com/tecbot/gorocksdb v0.0.0-20191217155057-f0fad39f321c/go.mod h1:ahpPrc7HpcfEWDQRZEmnXMzHY03mLDYMCxeDzy46i+8= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= +github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= +github.com/nats-io/nkeys v0.4.5/go.mod h1:XUkxdLPTufzlihbamfzQ7mw/VGx6ObUs+0bN5sNvt64= +github.com/nats-io/nuid v1.0.1 h1:5iA8DT8V7q8WK2EScv2padNa/rTESc1KdnPw4TC2paw= +github.com/nats-io/nuid v1.0.1/go.mod h1:19wcPz3Ph3q0Jbyiqsd0kePYG7A95tJPxeL+1OSON2c= +golang.org/x/crypto v0.6.0 h1:qfktjS5LUO+fFKeJXZ+ikTRijMmljikvG68fpMMruSc= +golang.org/x/crypto v0.6.0/go.mod h1:OFC/31mSvZgRz0V1QTNCzfAI1aIRzbiufJtkMIlEp58= +golang.org/x/sys v0.5.0 h1:MUK/U/4lj1t1oPg0HfuXDN/Z1wv31ZJ/YcPiGccS4DU= +golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= diff --git a/src/main.go b/src/main.go index 05c213e..0b61cb2 100644 --- a/src/main.go +++ b/src/main.go @@ -3,30 +3,82 @@ package main import ( "fmt" "log" + "os" + "os/signal" + "syscall" + "gitea.deepblack.cloud/chorus/bubble/api" + "gitea.deepblack.cloud/chorus/bubble/backbeat" "gitea.deepblack.cloud/chorus/bubble/storage" ) func main() { // --- Storage Initialization --- - dbPath := "./bubble_rocksdb" + dbPath := getEnvOrDefault("BUBBLE_DB_PATH", "./bubble.db") - // Initialize the RocksDB store. - store, err := storage.NewRocksDBStore(dbPath) + // Initialize the SQLite store. + store, err := storage.NewSQLiteStore(dbPath) if err != nil { - log.Fatalf("Failed to initialize rocksdb store: %v", err) + log.Fatalf("Failed to initialize sqlite store: %v", err) } - // defer store.Close() // Close the DB when the application exits + // defer store.DB.Close() // Close the DB when the application exits - fmt.Println("RocksDB store initialized successfully.") + fmt.Println("SQLite store initialized successfully.") + + // --- BACKBEAT Integration --- + fmt.Println("🄁 Initializing BACKBEAT integration...") + backbeatIntegration := backbeat.NewBackbeatIntegration("bubble-cluster", "bubble-decision-agent", 2) // 2 BPM + + // Connect to BACKBEAT infrastructure + natsURL := getEnvOrDefault("BACKBEAT_NATS_URL", "nats://backbeat-nats:4222") + if err := backbeatIntegration.Connect(natsURL); err != nil { + log.Printf("āš ļø Failed to connect to BACKBEAT: %v", err) + log.Println("šŸ”„ Continuing without BACKBEAT integration...") + } else { + fmt.Println("āœ… BACKBEAT integration initialized") + } // --- API Server Initialization --- server := api.NewServer(store) + server.SetBackbeatIntegration(backbeatIntegration) + + // Setup graceful shutdown + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + go func() { + <-c + fmt.Println("\nšŸ›‘ Shutting down BUBBLE Decision Agent...") + backbeatIntegration.Close() + os.Exit(0) + }() // Start the server. - port := "8080" - fmt.Printf("Starting BUBBLE Decision Agent on port %s...\n", port) + port := getEnvOrDefault("BUBBLE_PORT", "8080") + fmt.Printf("šŸš€ Starting BUBBLE Decision Agent on port %s...\n", port) + fmt.Printf("šŸ“Š BACKBEAT timing integration: %s\n", getBoolString(backbeatIntegration != nil)) + fmt.Printf("🌐 API endpoints:\n") + fmt.Printf(" POST /decision/bundle - Generate decision bundles with timing\n") + fmt.Printf(" GET /backbeat/metrics - View BACKBEAT metrics\n") + fmt.Printf(" GET /backbeat/operations - View active operations\n") + if err := server.Start(":" + port); err != nil { log.Fatalf("Failed to start server: %v", err) } +} + +// getEnvOrDefault returns environment variable value or default if not set +func getEnvOrDefault(key, defaultValue string) string { + if value := os.Getenv(key); value != "" { + return value + } + return defaultValue +} + +// getBoolString returns a human-readable string for boolean values +func getBoolString(b bool) string { + if b { + return "enabled" + } + return "disabled" } \ No newline at end of file diff --git a/src/seed.go b/src/seed.go deleted file mode 100644 index 2d33f87..0000000 --- a/src/seed.go +++ /dev/null @@ -1,72 +0,0 @@ -package main - -import ( - "database/sql" - "log" - - _ "github.com/mattn/go-sqlite3" -) - -func main() { - db, err := sql.Open("sqlite3", "./bubble.db") - if err != nil { - log.Fatalf("Failed to open database: %v", err) - } - defer db.Close() - - // Sample Decisions - decisions := []struct { - ID string - Statement string - LifecycleState string - RoleExposure string - Tags string - Timestamp string - }{ - {"dr:1", "Adopt Go for new microservices", "active", `{"engineer": true, "pm": true}`, `["language", "backend"]`, "2025-08-12T10:00:00Z"}, - {"dr:2", "Use FastAPI for Python services", "superseded", `{"engineer": true}`, `["python", "api"]`, "2025-08-10T11:00:00Z"}, - {"dr:3", "Evaluate RocksDB for storage", "active", `{"engineer": true, "research": true}`, `["database", "storage"]`, "2025-08-11T15:00:00Z"}, - {"dr:4", "Decision to use Go was influenced by performance needs", "active", `{"pm": true}`, `["performance"]`, "2025-08-12T09:00:00Z"}, - } - - // Sample Edges (Provenance) - // dr:4 -> dr:1 (dr:4 influenced dr:1) - // dr:2 -> dr:1 (dr:2 was superseded by dr:1) - edges := []struct { - SourceID string - TargetID string - Relation string - }{ - {"dr:4", "dr:1", "influences"}, - {"dr:2", "dr:1", "supersedes"}, - {"dr:3", "dr:4", "influences"}, - } - - log.Println("Seeding database...") - - // Insert Decisions - for _, d := range decisions { - _, err := db.Exec(` - INSERT INTO decisions (id, statement, lifecycle_state, role_exposure, tags, timestamp) - VALUES (?, ?, ?, ?, ?, ?) - ON CONFLICT(id) DO NOTHING; - `, d.ID, d.Statement, d.LifecycleState, d.RoleExposure, d.Tags, d.Timestamp) - if err != nil { - log.Fatalf("Failed to insert decision %s: %v", d.ID, err) - } - } - - // Insert Edges - for _, e := range edges { - _, err := db.Exec(` - INSERT INTO edges (source_id, target_id, relation) - VALUES (?, ?, ?) - ON CONFLICT(source_id, target_id) DO NOTHING; - `, e.SourceID, e.TargetID, e.Relation) - if err != nil { - log.Fatalf("Failed to insert edge %s -> %s: %v", e.SourceID, e.TargetID, err) - } - } - - log.Println("Database seeded successfully.") -} diff --git a/src/storage/rocksdb.go b/src/storage/rocksdb.go.disabled similarity index 100% rename from src/storage/rocksdb.go rename to src/storage/rocksdb.go.disabled