package intelligence

import (
	"bytes"
	"context"
	"crypto/sha256"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"strings"
	"sync"
	"time"

	slurpContext "chorus/pkg/slurp/context"
)

// DefaultRAGIntegration provides comprehensive RAG system integration
type DefaultRAGIntegration struct {
	config         *EngineConfig
	httpClient     *http.Client
	queryOptimizer *QueryOptimizer
	indexManager   *IndexManager
	cacheManager   *RAGCacheManager
	fallbackEngine *FallbackEngine
	statsTracker   *RAGStatsTracker
}

// QueryOptimizer optimizes queries for better RAG retrieval
type QueryOptimizer struct {
	queryTemplates  map[string]*QueryTemplate
	contextEnricher *ContextEnricher
}

// QueryTemplate defines structured queries for different use cases
type QueryTemplate struct {
	Name      string
	Template  string
	Variables []string
	Context   map[string]interface{}
	Priority  int
	Timeout   time.Duration
}

// ContextEnricher adds contextual information to queries
type ContextEnricher struct {
	enrichmentRules []*EnrichmentRule
}

// EnrichmentRule defines how to enrich queries with context
type EnrichmentRule struct {
	Trigger    string
	Action     string
	Parameters map[string]interface{}
	Weight     float64
	Conditions []string
}

// IndexManager manages RAG index operations
type IndexManager struct {
	mu             sync.RWMutex
	indexedContent map[string]*IndexedDocument
	indexingQueue  chan *IndexingRequest
	batchProcessor *BatchProcessor
	stats          *IndexStats
}

// IndexedDocument represents a document in the RAG index
type IndexedDocument struct {
	ID         string                 `json:"id"`
	Content    string                 `json:"content"`
	Metadata   map[string]interface{} `json:"metadata"`
	Embeddings []float64              `json:"embeddings,omitempty"`
	IndexedAt  time.Time              `json:"indexed_at"`
	UpdatedAt  time.Time              `json:"updated_at"`
	Version    int                    `json:"version"`
	Tags       []string               `json:"tags"`
	Language   string                 `json:"language"`
	Size       int64                  `json:"size"`
}

// IndexingRequest represents a request to index content
type IndexingRequest struct {
	DocumentID string
	Content    string
	Metadata   map[string]interface{}
	Priority   int
	Callback   func(error)
}

// BatchProcessor handles batch indexing operations
type BatchProcessor struct {
	batchSize    int
	batchTimeout time.Duration
	pendingBatch []*IndexingRequest
	mu           sync.Mutex
	lastFlush    time.Time
}

// IndexStats tracks indexing statistics
type IndexStats struct {
	TotalDocuments   int64         `json:"total_documents"`
	IndexedToday     int64         `json:"indexed_today"`
	IndexingErrors   int64         `json:"indexing_errors"`
	AverageIndexTime time.Duration `json:"average_index_time"`
	LastIndexTime    time.Time     `json:"last_index_time"`
	IndexSize        int64         `json:"index_size"`
}

// RAGCacheManager manages caching for RAG responses
type RAGCacheManager struct {
	cache         sync.Map
	cacheTTL      time.Duration
	maxCacheSize  int
	currentSize   int
	mu            sync.RWMutex
	cleanupTicker *time.Ticker
}

// RAGCacheEntry represents a cached RAG response
type RAGCacheEntry struct {
	Query       string       `json:"query"`
	Response    *RAGResponse `json:"response"`
	CreatedAt   time.Time    `json:"created_at"`
	ExpiresAt   time.Time    `json:"expires_at"`
	AccessCount int          `json:"access_count"`
	LastAccess  time.Time    `json:"last_access"`
	Size        int          `json:"size"`
}

// FallbackEngine provides fallback when RAG is unavailable
type FallbackEngine struct {
	localKnowledge *LocalKnowledgeBase
	ruleEngine     *RuleBasedEngine
	templateEngine *TemplateEngine
}

// LocalKnowledgeBase contains local knowledge for fallback
type LocalKnowledgeBase struct {
	knowledgeBase map[string]*KnowledgeEntry
	patterns      []*KnowledgePattern
	mu            sync.RWMutex
}

// KnowledgeEntry represents a local knowledge entry
type KnowledgeEntry struct {
	Topic      string                 `json:"topic"`
	Content    string                 `json:"content"`
	Keywords   []string               `json:"keywords"`
	Confidence float64                `json:"confidence"`
	Source     string                 `json:"source"`
	Tags       []string               `json:"tags"`
	Metadata   map[string]interface{} `json:"metadata"`
	CreatedAt  time.Time              `json:"created_at"`
	UpdatedAt  time.Time              `json:"updated_at"`
}

// KnowledgePattern represents a pattern in local knowledge
type KnowledgePattern struct {
	Pattern    string   `json:"pattern"`
	Response   string   `json:"response"`
	Confidence float64  `json:"confidence"`
	Examples   []string `json:"examples"`
	Category   string   `json:"category"`
}

// RuleBasedEngine provides rule-based fallback responses
type RuleBasedEngine struct {
	rules []*ResponseRule
}

// ResponseRule defines a rule for generating responses
type ResponseRule struct {
	Condition  string   `json:"condition"`
	Response   string   `json:"response"`
	Priority   int      `json:"priority"`
	Confidence float64  `json:"confidence"`
	Tags       []string `json:"tags"`
}

// TemplateEngine generates responses from templates
type TemplateEngine struct {
	templates map[string]*ResponseTemplate
}

// ResponseTemplate defines a response template
type ResponseTemplate struct {
	Name       string                 `json:"name"`
	Template   string                 `json:"template"`
	Variables  []string               `json:"variables"`
	Category   string                 `json:"category"`
	Confidence float64                `json:"confidence"`
	Metadata   map[string]interface{} `json:"metadata"`
}

// RAGStatsTracker tracks RAG performance statistics
type RAGStatsTracker struct {
	mu                sync.RWMutex
	totalQueries      int64
	successfulQueries int64
	failedQueries     int64
	cacheHits         int64
	cacheMisses       int64
	averageLatency    time.Duration
	fallbackUsed      int64
	lastReset         time.Time
}

// NewDefaultRAGIntegration creates a new RAG integration
func NewDefaultRAGIntegration(config *EngineConfig) *DefaultRAGIntegration {
	integration := &DefaultRAGIntegration{
		config: config,
		httpClient: &http.Client{
			Timeout: config.RAGTimeout,
			Transport: &http.Transport{
				MaxIdleConns:        10,
				MaxIdleConnsPerHost: 5,
				IdleConnTimeout:     30 * time.Second,
			},
		},
		queryOptimizer: NewQueryOptimizer(),
		indexManager:   NewIndexManager(),
		cacheManager:   NewRAGCacheManager(config.CacheTTL),
		fallbackEngine: NewFallbackEngine(),
		statsTracker:   NewRAGStatsTracker(),
	}

	// Start background processes
	go integration.indexManager.startBatchProcessor()
	go integration.cacheManager.startCleanupRoutine()

	return integration
}
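
// Illustrative usage sketch (not part of the package API). EngineConfig fields
// other than RAGEndpoint, RAGTimeout, CacheTTL, and MinConfidenceThreshold are
// not visible in this file, so only those appear here; the endpoint URL is a
// placeholder assumption.
//
//	cfg := &EngineConfig{
//		RAGEndpoint: "http://localhost:8080/query", // assumed endpoint URL
//		RAGTimeout:  30 * time.Second,
//		CacheTTL:    time.Hour,
//	}
//	ri := NewDefaultRAGIntegration(cfg)
//	resp, err := ri.Query(ctx, "best practices for error handling",
//		map[string]interface{}{"language": "go"})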

// NewQueryOptimizer creates a query optimizer
func NewQueryOptimizer() *QueryOptimizer {
	optimizer := &QueryOptimizer{
		queryTemplates:  make(map[string]*QueryTemplate),
		contextEnricher: NewContextEnricher(),
	}

	// Define standard query templates
	templates := []*QueryTemplate{
		{
			Name:      "code_analysis",
			Template:  "Analyze the {{language}} code in {{file_path}}. Focus on {{focus_areas}}. Consider {{context}}.",
			Variables: []string{"language", "file_path", "focus_areas", "context"},
			Priority:  1,
			Timeout:   30 * time.Second,
		},
		{
			Name:      "architecture_advice",
			Template:  "Provide architectural guidance for {{component_type}} in {{project_context}}. Consider {{constraints}} and {{goals}}.",
			Variables: []string{"component_type", "project_context", "constraints", "goals"},
			Priority:  2,
			Timeout:   45 * time.Second,
		},
		{
			Name:      "best_practices",
			Template:  "What are the best practices for {{technology}} in {{use_case}}? Consider {{requirements}}.",
			Variables: []string{"technology", "use_case", "requirements"},
			Priority:  1,
			Timeout:   20 * time.Second,
		},
		{
			Name:      "pattern_recommendation",
			Template:  "Recommend design patterns for {{problem_description}} using {{technologies}}. Context: {{project_context}}.",
			Variables: []string{"problem_description", "technologies", "project_context"},
			Priority:  2,
			Timeout:   35 * time.Second,
		},
	}

	for _, template := range templates {
		optimizer.queryTemplates[template.Name] = template
	}

	return optimizer
}
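
// Worked example of template expansion (a sketch of the behaviour implemented
// in applyTemplate further below, not additional functionality): with the
// query "analyze this code" and the context
// {"language": "go", "file_path": "main.go"}, the "code_analysis" template
// expands to
//
//	"Analyze the go code in main.go. Focus on analyze this code. Consider analyze this code."
//
// Variables present in the context are substituted directly, while missing
// variables such as {{focus_areas}} and {{context}} fall back to the original
// query text.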

// NewContextEnricher creates a context enricher
func NewContextEnricher() *ContextEnricher {
	enricher := &ContextEnricher{
		enrichmentRules: []*EnrichmentRule{},
	}

	// Define enrichment rules
	rules := []*EnrichmentRule{
		{
			Trigger:    "code_analysis",
			Action:     "add_language_context",
			Parameters: map[string]interface{}{"depth": "detailed"},
			Weight:     0.8,
			Conditions: []string{"has_language", "has_file_path"},
		},
		{
			Trigger:    "architecture",
			Action:     "add_project_context",
			Parameters: map[string]interface{}{"scope": "system_wide"},
			Weight:     0.9,
			Conditions: []string{"has_project_info"},
		},
		{
			Trigger:    "performance",
			Action:     "add_performance_context",
			Parameters: map[string]interface{}{"metrics": "standard"},
			Weight:     0.7,
			Conditions: []string{"has_performance_data"},
		},
	}

	enricher.enrichmentRules = rules
	return enricher
}

// NewIndexManager creates an index manager
func NewIndexManager() *IndexManager {
	return &IndexManager{
		indexedContent: make(map[string]*IndexedDocument),
		indexingQueue:  make(chan *IndexingRequest, 1000),
		batchProcessor: &BatchProcessor{
			batchSize:    10,
			batchTimeout: 30 * time.Second,
			lastFlush:    time.Now(),
		},
		stats: &IndexStats{
			LastIndexTime: time.Now(),
		},
	}
}

// NewRAGCacheManager creates a cache manager
func NewRAGCacheManager(ttl time.Duration) *RAGCacheManager {
	manager := &RAGCacheManager{
		cacheTTL:     ttl,
		maxCacheSize: 1000, // Maximum cached entries
	}

	return manager
}

// NewFallbackEngine creates a fallback engine
func NewFallbackEngine() *FallbackEngine {
	return &FallbackEngine{
		localKnowledge: NewLocalKnowledgeBase(),
		ruleEngine:     NewRuleBasedEngine(),
		templateEngine: NewTemplateEngine(),
	}
}

// NewLocalKnowledgeBase creates a local knowledge base
func NewLocalKnowledgeBase() *LocalKnowledgeBase {
	kb := &LocalKnowledgeBase{
		knowledgeBase: make(map[string]*KnowledgeEntry),
		patterns:      []*KnowledgePattern{},
	}

	// Load default knowledge entries
	kb.loadDefaultKnowledge()
	return kb
}

// NewRuleBasedEngine creates a rule-based engine
func NewRuleBasedEngine() *RuleBasedEngine {
	engine := &RuleBasedEngine{
		rules: []*ResponseRule{},
	}

	// Load default rules
	engine.loadDefaultRules()
	return engine
}

// NewTemplateEngine creates a template engine
func NewTemplateEngine() *TemplateEngine {
	engine := &TemplateEngine{
		templates: make(map[string]*ResponseTemplate),
	}

	// Load default templates
	engine.loadDefaultTemplates()
	return engine
}

// NewRAGStatsTracker creates a stats tracker
func NewRAGStatsTracker() *RAGStatsTracker {
	return &RAGStatsTracker{
		lastReset: time.Now(),
	}
}

// Query queries the RAG system for relevant information
func (ri *DefaultRAGIntegration) Query(ctx context.Context, query string, context map[string]interface{}) (*RAGResponse, error) {
	start := time.Now()
	ri.statsTracker.recordQuery()

	// Check cache first
	if cached := ri.cacheManager.get(query); cached != nil {
		ri.statsTracker.recordCacheHit()
		return cached.Response, nil
	}
	ri.statsTracker.recordCacheMiss()

	// Optimize query
	optimizedQuery := ri.queryOptimizer.optimizeQuery(query, context)

	// Try RAG system
	response, err := ri.queryRAGSystem(ctx, optimizedQuery)
	if err != nil {
		// Fallback to local knowledge
		ri.statsTracker.recordFallback()
		response, err = ri.fallbackEngine.generateResponse(ctx, query, context)
		if err != nil {
			ri.statsTracker.recordFailure()
			return nil, fmt.Errorf("both RAG and fallback failed: %w", err)
		}
	}

	// Cache successful response
	ri.cacheManager.put(query, response)

	// Update stats
	ri.statsTracker.recordSuccess(time.Since(start))

	return response, nil
}

// EnhanceContext enhances context using RAG knowledge
func (ri *DefaultRAGIntegration) EnhanceContext(ctx context.Context, node *slurpContext.ContextNode) (*slurpContext.ContextNode, error) {
	// Create enhancement query
	query := ri.buildEnhancementQuery(node)
	queryContext := ri.buildQueryContext(node)

	// Query RAG system
	response, err := ri.Query(ctx, query, queryContext)
	if err != nil {
		return node, fmt.Errorf("failed to enhance context: %w", err)
	}

	// Apply enhancements
	enhanced := ri.applyEnhancements(node, response)
	return enhanced, nil
}

// IndexContent indexes content for RAG retrieval
func (ri *DefaultRAGIntegration) IndexContent(ctx context.Context, content string, metadata map[string]interface{}) error {
	request := &IndexingRequest{
		DocumentID: ri.generateDocumentID(content, metadata),
		Content:    content,
		Metadata:   metadata,
		Priority:   1,
	}

	select {
	case ri.indexManager.indexingQueue <- request:
		return nil
	default:
		return fmt.Errorf("indexing queue is full")
	}
}
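
// Illustrative indexing sketch (the metadata keys shown are caller-supplied
// values, not a fixed schema). Requests are queued here and written to the
// in-memory index by the batch processor started in NewDefaultRAGIntegration:
//
//	err := ri.IndexContent(ctx, sourceCode, map[string]interface{}{
//		"language":  "go",
//		"file_path": "path/to/file.go",
//		"tags":      []string{"intelligence", "rag"},
//	})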

// SearchSimilar searches for similar content in RAG system
func (ri *DefaultRAGIntegration) SearchSimilar(ctx context.Context, content string, limit int) ([]*RAGResult, error) {
	// Build similarity search query
	query := fmt.Sprintf("Find similar content to: %s", content)

	// Query RAG system for similar content
	response, err := ri.Query(ctx, query, map[string]interface{}{
		"search_type": "similarity",
		"limit":       limit,
		"content":     content,
	})
	if err != nil {
		return nil, fmt.Errorf("similarity search failed: %w", err)
	}

	// Convert response to results
	results := ri.convertToRAGResults(response, limit)
	return results, nil
}

// UpdateIndex updates RAG index with new content
func (ri *DefaultRAGIntegration) UpdateIndex(ctx context.Context, updates []*RAGUpdate) error {
	for _, update := range updates {
		metadata := update.Metadata
		if metadata == nil {
			metadata = make(map[string]interface{})
		}
		metadata["operation"] = update.Operation

		err := ri.IndexContent(ctx, update.Content, metadata)
		if err != nil {
			return fmt.Errorf("failed to update index for document %s: %w", update.ID, err)
		}
	}

	return nil
}

// GetRAGStats returns RAG system statistics
func (ri *DefaultRAGIntegration) GetRAGStats(ctx context.Context) (*RAGStatistics, error) {
	stats := ri.statsTracker.getStats()
	indexStats := ri.indexManager.getStats()

	return &RAGStatistics{
		TotalDocuments:   indexStats.TotalDocuments,
		TotalQueries:     stats.totalQueries,
		AverageQueryTime: stats.averageLatency,
		IndexSize:        indexStats.IndexSize,
		LastIndexUpdate:  indexStats.LastIndexTime,
		ErrorRate:        ri.calculateErrorRate(stats),
	}, nil
}

// Helper methods

func (ri *DefaultRAGIntegration) queryRAGSystem(ctx context.Context, query string) (*RAGResponse, error) {
	if ri.config.RAGEndpoint == "" {
		return nil, fmt.Errorf("RAG endpoint not configured")
	}

	// Prepare request
	requestBody := map[string]interface{}{
		"query":   query,
		"timeout": ri.config.RAGTimeout.Seconds(),
	}

	jsonBody, err := json.Marshal(requestBody)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal request: %w", err)
	}

	// Create HTTP request
	req, err := http.NewRequestWithContext(ctx, "POST", ri.config.RAGEndpoint, bytes.NewBuffer(jsonBody))
	if err != nil {
		return nil, fmt.Errorf("failed to create request: %w", err)
	}

	req.Header.Set("Content-Type", "application/json")

	// Execute request
	resp, err := ri.httpClient.Do(req)
	if err != nil {
		return nil, fmt.Errorf("RAG request failed: %w", err)
	}
	defer resp.Body.Close()

	if resp.StatusCode != http.StatusOK {
		return nil, fmt.Errorf("RAG request failed with status: %d", resp.StatusCode)
	}

	// Parse response
	body, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, fmt.Errorf("failed to read response: %w", err)
	}

	var ragResponse RAGResponse
	if err := json.Unmarshal(body, &ragResponse); err != nil {
		return nil, fmt.Errorf("failed to parse response: %w", err)
	}

	ragResponse.ProcessedAt = time.Now()
	return &ragResponse, nil
}
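
// Wire-format sketch for the call above: the request body sent to the
// configured RAG endpoint is
//
//	{"query": "<optimized query>", "timeout": 30}
//
// The endpoint is assumed to return a JSON document that unmarshals into
// RAGResponse (answer text, sources, confidence); the exact JSON field names
// depend on RAGResponse's struct tags, which are defined elsewhere in this
// package.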

func (qo *QueryOptimizer) optimizeQuery(query string, context map[string]interface{}) string {
	// Determine query type
	queryType := qo.determineQueryType(query, context)

	// Get appropriate template
	template, exists := qo.queryTemplates[queryType]
	if !exists {
		return query // Return original if no template
	}

	// Apply template
	optimizedQuery := qo.applyTemplate(template, query, context)

	// Enrich with context
	enrichedQuery := qo.contextEnricher.enrichQuery(optimizedQuery, context)

	return enrichedQuery
}

func (qo *QueryOptimizer) determineQueryType(query string, context map[string]interface{}) string {
	lowerQuery := strings.ToLower(query)

	// Simple keyword matching for query type determination
	if strings.Contains(lowerQuery, "analyze") || strings.Contains(lowerQuery, "code") {
		return "code_analysis"
	}
	if strings.Contains(lowerQuery, "architecture") || strings.Contains(lowerQuery, "design") {
		return "architecture_advice"
	}
	if strings.Contains(lowerQuery, "best practice") || strings.Contains(lowerQuery, "recommendation") {
		return "best_practices"
	}
	if strings.Contains(lowerQuery, "pattern") {
		return "pattern_recommendation"
	}

	return "code_analysis" // Default
}

func (qo *QueryOptimizer) applyTemplate(template *QueryTemplate, query string, context map[string]interface{}) string {
	result := template.Template

	// Replace template variables with context values
	for _, variable := range template.Variables {
		placeholder := fmt.Sprintf("{{%s}}", variable)
		if value, exists := context[variable]; exists {
			result = strings.ReplaceAll(result, placeholder, fmt.Sprintf("%v", value))
		} else {
			// Variable missing from the context; fall back to a reasonable default
			switch variable {
			case "language":
				result = strings.ReplaceAll(result, placeholder, "unknown")
			case "file_path":
				result = strings.ReplaceAll(result, placeholder, "current file")
			default:
				result = strings.ReplaceAll(result, placeholder, query)
			}
		}
	}

	return result
}

func (ce *ContextEnricher) enrichQuery(query string, context map[string]interface{}) string {
	enriched := query

	// Apply enrichment rules
	for _, rule := range ce.enrichmentRules {
		if ce.shouldApplyRule(rule, context) {
			enriched = ce.applyEnrichmentRule(enriched, rule, context)
		}
	}

	return enriched
}

func (ce *ContextEnricher) shouldApplyRule(rule *EnrichmentRule, context map[string]interface{}) bool {
	for _, condition := range rule.Conditions {
		switch condition {
		case "has_language":
			if _, exists := context["language"]; !exists {
				return false
			}
		case "has_file_path":
			if _, exists := context["file_path"]; !exists {
				return false
			}
		case "has_project_info":
			if _, exists := context["project"]; !exists {
				return false
			}
		}
	}
	return true
}

func (ce *ContextEnricher) applyEnrichmentRule(query string, rule *EnrichmentRule, context map[string]interface{}) string {
	switch rule.Action {
	case "add_language_context":
		if lang, exists := context["language"]; exists {
			return fmt.Sprintf("%s Consider %s language-specific patterns and idioms.", query, lang)
		}
	case "add_project_context":
		if project, exists := context["project"]; exists {
			return fmt.Sprintf("%s In the context of project %v.", query, project)
		}
	case "add_performance_context":
		return fmt.Sprintf("%s Focus on performance implications and optimization opportunities.", query)
	}
	return query
}

func (ri *DefaultRAGIntegration) buildEnhancementQuery(node *slurpContext.ContextNode) string {
	return fmt.Sprintf("Provide additional insights for %s: %s. Technologies: %s",
		node.Purpose, node.Summary, strings.Join(node.Technologies, ", "))
}

func (ri *DefaultRAGIntegration) buildQueryContext(node *slurpContext.ContextNode) map[string]interface{} {
	return map[string]interface{}{
		"file_path":    node.Path,
		"purpose":      node.Purpose,
		"technologies": node.Technologies,
		"tags":         node.Tags,
		"summary":      node.Summary,
	}
}

func (ri *DefaultRAGIntegration) applyEnhancements(node *slurpContext.ContextNode, response *RAGResponse) *slurpContext.ContextNode {
	enhanced := node.Clone()

	// Add RAG insights
	if response.Confidence >= ri.config.MinConfidenceThreshold {
		enhanced.Insights = append(enhanced.Insights, fmt.Sprintf("RAG: %s", response.Answer))
		enhanced.RAGConfidence = response.Confidence

		// Add metadata
		if enhanced.Metadata == nil {
			enhanced.Metadata = make(map[string]interface{})
		}
		enhanced.Metadata["rag_enhanced"] = true
		enhanced.Metadata["rag_sources"] = response.Sources
	}

	return enhanced
}

func (ri *DefaultRAGIntegration) generateDocumentID(content string, metadata map[string]interface{}) string {
	// Hash-based ID generation: use the first 8 bytes of the content's SHA-256
	// digest so IDs stay short while still being derived from the full content.
	sum := sha256.Sum256([]byte(content))
	return fmt.Sprintf("doc_%x_%d", sum[:8], time.Now().Unix())
}

func (ri *DefaultRAGIntegration) convertToRAGResults(response *RAGResponse, limit int) []*RAGResult {
	results := []*RAGResult{}

	// Convert sources to results
	for i, source := range response.Sources {
		if i >= limit {
			break
		}

		result := &RAGResult{
			ID:         source.ID,
			Content:    source.Content,
			Score:      source.Score,
			Metadata:   source.Metadata,
			Highlights: []string{}, // Would be populated by actual RAG system
		}
		results = append(results, result)
	}

	return results
}

func (ri *DefaultRAGIntegration) calculateErrorRate(stats *RAGStatsTracker) float64 {
	if stats.totalQueries == 0 {
		return 0.0
	}
	return float64(stats.failedQueries) / float64(stats.totalQueries)
}

// Cache management methods

func (cm *RAGCacheManager) get(query string) *RAGCacheEntry {
	if value, ok := cm.cache.Load(query); ok {
		if entry, ok := value.(*RAGCacheEntry); ok {
			if time.Now().Before(entry.ExpiresAt) {
				entry.AccessCount++
				entry.LastAccess = time.Now()
				return entry
			}
			// Entry expired; remove it and keep the size counter in sync
			cm.cache.Delete(query)
			cm.mu.Lock()
			cm.currentSize--
			cm.mu.Unlock()
		}
	}
	return nil
}

func (cm *RAGCacheManager) put(query string, response *RAGResponse) {
	entry := &RAGCacheEntry{
		Query:       query,
		Response:    response,
		CreatedAt:   time.Now(),
		ExpiresAt:   time.Now().Add(cm.cacheTTL),
		AccessCount: 1,
		LastAccess:  time.Now(),
		Size:        len(query) + len(response.Answer),
	}

	cm.mu.Lock()
	if cm.currentSize >= cm.maxCacheSize {
		cm.evictOldest()
	}
	// Only grow the counter for genuinely new keys; overwriting an existing
	// entry does not change the number of cached items.
	if _, exists := cm.cache.Load(query); !exists {
		cm.currentSize++
	}
	cm.mu.Unlock()

	cm.cache.Store(query, entry)
}

func (cm *RAGCacheManager) evictOldest() {
	// Simple LRU eviction
	var oldestKey interface{}
	oldestTime := time.Now()

	cm.cache.Range(func(key, value interface{}) bool {
		if entry, ok := value.(*RAGCacheEntry); ok {
			if entry.LastAccess.Before(oldestTime) {
				oldestTime = entry.LastAccess
				oldestKey = key
			}
		}
		return true
	})

	if oldestKey != nil {
		cm.cache.Delete(oldestKey)
		cm.currentSize--
	}
}

func (cm *RAGCacheManager) startCleanupRoutine() {
	cm.cleanupTicker = time.NewTicker(10 * time.Minute)

	for range cm.cleanupTicker.C {
		cm.cleanup()
	}
}

func (cm *RAGCacheManager) cleanup() {
	now := time.Now()
	keysToDelete := []interface{}{}

	cm.cache.Range(func(key, value interface{}) bool {
		if entry, ok := value.(*RAGCacheEntry); ok {
			if now.After(entry.ExpiresAt) {
				keysToDelete = append(keysToDelete, key)
			}
		}
		return true
	})

	cm.mu.Lock()
	for _, key := range keysToDelete {
		cm.cache.Delete(key)
		cm.currentSize--
	}
	cm.mu.Unlock()
}

// Fallback engine methods

func (fe *FallbackEngine) generateResponse(ctx context.Context, query string, context map[string]interface{}) (*RAGResponse, error) {
	// Try local knowledge base first
	if response := fe.localKnowledge.search(query); response != nil {
		return response, nil
	}

	// Try rule-based engine
	if response := fe.ruleEngine.generateResponse(query, context); response != nil {
		return response, nil
	}

	// Try template engine
	if response := fe.templateEngine.generateResponse(query, context); response != nil {
		return response, nil
	}

	// Return generic fallback
	return &RAGResponse{
		Query:       query,
		Answer:      "I don't have specific information about this topic in my knowledge base.",
		Sources:     []*RAGSource{},
		Confidence:  0.1,
		Context:     context,
		ProcessedAt: time.Now(),
	}, nil
}
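
// Extending the fallback knowledge base (sketch): additional entries follow
// the same shape as the defaults loaded in loadDefaultKnowledge below. This
// file exposes no registration helper, so how entries are added at runtime is
// an assumption; the entry itself would look like:
//
//	entry := &KnowledgeEntry{
//		Topic:      "Rust Ownership",
//		Content:    "Prefer borrowing over cloning; let the borrow checker guide API design.",
//		Keywords:   []string{"rust", "ownership", "borrowing"},
//		Confidence: 0.8,
//		Source:     "built-in",
//		CreatedAt:  time.Now(),
//	}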

// Helper implementations for knowledge base loading, rule processing,
// template rendering, stats tracking, and index management follow below.

func (kb *LocalKnowledgeBase) loadDefaultKnowledge() {
	// Load default knowledge entries
	entries := []*KnowledgeEntry{
		{
			Topic:      "Go Best Practices",
			Content:    "Use clear variable names, handle errors properly, follow Go conventions for package organization.",
			Keywords:   []string{"go", "golang", "best practices", "conventions"},
			Confidence: 0.8,
			Source:     "built-in",
			Tags:       []string{"go", "best-practices"},
			CreatedAt:  time.Now(),
		},
		{
			Topic:      "JavaScript Patterns",
			Content:    "Use modern ES6+ features, avoid callback hell with async/await, follow modular design patterns.",
			Keywords:   []string{"javascript", "patterns", "es6", "async"},
			Confidence: 0.8,
			Source:     "built-in",
			Tags:       []string{"javascript", "patterns"},
			CreatedAt:  time.Now(),
		},
	}

	for _, entry := range entries {
		kb.knowledgeBase[entry.Topic] = entry
	}
}

func (kb *LocalKnowledgeBase) search(query string) *RAGResponse {
	lowerQuery := strings.ToLower(query)

	// Simple keyword matching
	for _, entry := range kb.knowledgeBase {
		for _, keyword := range entry.Keywords {
			if strings.Contains(lowerQuery, strings.ToLower(keyword)) {
				return &RAGResponse{
					Query:       query,
					Answer:      entry.Content,
					Sources:     []*RAGSource{{ID: entry.Topic, Title: entry.Topic, Content: entry.Content, Score: entry.Confidence}},
					Confidence:  entry.Confidence,
					Context:     map[string]interface{}{"source": "local_knowledge"},
					ProcessedAt: time.Now(),
				}
			}
		}
	}

	return nil
}

func (re *RuleBasedEngine) loadDefaultRules() {
	rules := []*ResponseRule{
		{
			Condition:  "contains:error handling",
			Response:   "Always check for errors and handle them appropriately. Use proper error wrapping and logging.",
			Priority:   1,
			Confidence: 0.7,
			Tags:       []string{"error-handling", "best-practices"},
		},
		{
			Condition:  "contains:performance",
			Response:   "Consider using profiling tools, optimize algorithms, and avoid premature optimization.",
			Priority:   2,
			Confidence: 0.6,
			Tags:       []string{"performance", "optimization"},
		},
	}

	re.rules = rules
}

func (re *RuleBasedEngine) generateResponse(query string, context map[string]interface{}) *RAGResponse {
	lowerQuery := strings.ToLower(query)

	for _, rule := range re.rules {
		if re.matchesCondition(lowerQuery, rule.Condition) {
			return &RAGResponse{
				Query:       query,
				Answer:      rule.Response,
				Sources:     []*RAGSource{{ID: "rule", Title: "Rule-based response", Content: rule.Response, Score: rule.Confidence}},
				Confidence:  rule.Confidence,
				Context:     map[string]interface{}{"source": "rule_engine", "rule": rule.Condition},
				ProcessedAt: time.Now(),
			}
		}
	}

	return nil
}

func (re *RuleBasedEngine) matchesCondition(query, condition string) bool {
	if strings.HasPrefix(condition, "contains:") {
		keyword := strings.TrimPrefix(condition, "contains:")
		return strings.Contains(query, keyword)
	}
	return false
}
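
// Rule condition syntax (as implemented above): only the "contains:<keyword>"
// form is recognised, and the incoming query is lower-cased before matching.
// For example, the rule condition "contains:error handling" fires for the
// query "How should I approach error handling in Go?". Any other condition
// prefix would need an additional branch in matchesCondition.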

func (te *TemplateEngine) loadDefaultTemplates() {
	templates := []*ResponseTemplate{
		{
			Name:       "generic_advice",
			Template:   "For {{topic}}, consider following established best practices and consulting relevant documentation.",
			Variables:  []string{"topic"},
			Category:   "general",
			Confidence: 0.4,
		},
	}

	for _, template := range templates {
		te.templates[template.Name] = template
	}
}

func (te *TemplateEngine) generateResponse(query string, context map[string]interface{}) *RAGResponse {
	// Simple template matching
	if template, exists := te.templates["generic_advice"]; exists {
		response := strings.ReplaceAll(template.Template, "{{topic}}", query)

		return &RAGResponse{
			Query:       query,
			Answer:      response,
			Sources:     []*RAGSource{{ID: "template", Title: "Template response", Content: response, Score: template.Confidence}},
			Confidence:  template.Confidence,
			Context:     map[string]interface{}{"source": "template_engine", "template": template.Name},
			ProcessedAt: time.Now(),
		}
	}

	return nil
}

// Stats tracking methods

func (st *RAGStatsTracker) recordQuery() {
	st.mu.Lock()
	defer st.mu.Unlock()
	st.totalQueries++
}

func (st *RAGStatsTracker) recordSuccess(latency time.Duration) {
	st.mu.Lock()
	defer st.mu.Unlock()
	st.successfulQueries++

	// Update the running average over successful queries
	if st.successfulQueries == 1 {
		st.averageLatency = latency
	} else {
		st.averageLatency = time.Duration(
			(int64(st.averageLatency)*(st.successfulQueries-1) + int64(latency)) / st.successfulQueries,
		)
	}
}
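
// The running average above is the standard incremental mean:
//
//	avg_n = (avg_{n-1} * (n-1) + latency_n) / n
//
// where n is the number of successful queries recorded so far, so no history
// of individual latencies needs to be kept.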

func (st *RAGStatsTracker) recordFailure() {
	st.mu.Lock()
	defer st.mu.Unlock()
	st.failedQueries++
}

func (st *RAGStatsTracker) recordCacheHit() {
	st.mu.Lock()
	defer st.mu.Unlock()
	st.cacheHits++
}

func (st *RAGStatsTracker) recordCacheMiss() {
	st.mu.Lock()
	defer st.mu.Unlock()
	st.cacheMisses++
}

func (st *RAGStatsTracker) recordFallback() {
	st.mu.Lock()
	defer st.mu.Unlock()
	st.fallbackUsed++
}

func (st *RAGStatsTracker) getStats() *RAGStatsTracker {
	st.mu.RLock()
	defer st.mu.RUnlock()
	return &RAGStatsTracker{
		totalQueries:      st.totalQueries,
		successfulQueries: st.successfulQueries,
		failedQueries:     st.failedQueries,
		cacheHits:         st.cacheHits,
		cacheMisses:       st.cacheMisses,
		averageLatency:    st.averageLatency,
		fallbackUsed:      st.fallbackUsed,
		lastReset:         st.lastReset,
	}
}

// Index management methods

func (im *IndexManager) startBatchProcessor() {
	ticker := time.NewTicker(im.batchProcessor.batchTimeout)
	defer ticker.Stop()

	for {
		select {
		case request := <-im.indexingQueue:
			im.batchProcessor.mu.Lock()
			im.batchProcessor.pendingBatch = append(im.batchProcessor.pendingBatch, request)
			shouldFlush := len(im.batchProcessor.pendingBatch) >= im.batchProcessor.batchSize
			im.batchProcessor.mu.Unlock()

			if shouldFlush {
				im.processBatch()
			}

		case <-ticker.C:
			im.processBatch()
		}
	}
}

func (im *IndexManager) processBatch() {
	im.batchProcessor.mu.Lock()
	batch := im.batchProcessor.pendingBatch
	im.batchProcessor.pendingBatch = []*IndexingRequest{}
	im.batchProcessor.lastFlush = time.Now()
	im.batchProcessor.mu.Unlock()

	if len(batch) == 0 {
		return
	}

	// Process batch
	for _, request := range batch {
		err := im.indexDocument(request)
		if request.Callback != nil {
			request.Callback(err)
		}
	}
}

func (im *IndexManager) indexDocument(request *IndexingRequest) error {
	im.mu.Lock()
	defer im.mu.Unlock()

	doc := &IndexedDocument{
		ID:        request.DocumentID,
		Content:   request.Content,
		Metadata:  request.Metadata,
		IndexedAt: time.Now(),
		UpdatedAt: time.Now(),
		Version:   1,
		Size:      int64(len(request.Content)),
	}

	// Extract language if available
	if lang, exists := request.Metadata["language"]; exists {
		doc.Language = fmt.Sprintf("%v", lang)
	}

	// Extract tags if available
	if tags, exists := request.Metadata["tags"]; exists {
		if tagSlice, ok := tags.([]string); ok {
			doc.Tags = tagSlice
		}
	}

	im.indexedContent[request.DocumentID] = doc
	im.stats.TotalDocuments++
	im.stats.LastIndexTime = time.Now()

	return nil
}

func (im *IndexManager) getStats() *IndexStats {
	im.mu.RLock()
	defer im.mu.RUnlock()

	totalSize := int64(0)
	for _, doc := range im.indexedContent {
		totalSize += doc.Size
	}

	return &IndexStats{
		TotalDocuments: im.stats.TotalDocuments,
		IndexedToday:   im.stats.IndexedToday,
		IndexingErrors: im.stats.IndexingErrors,
		LastIndexTime:  im.stats.LastIndexTime,
		IndexSize:      totalSize,
	}
}

// NoOpRAGIntegration provides a no-op implementation when RAG is disabled
type NoOpRAGIntegration struct{}

func NewNoOpRAGIntegration() *NoOpRAGIntegration {
	return &NoOpRAGIntegration{}
}

func (nri *NoOpRAGIntegration) Query(ctx context.Context, query string, context map[string]interface{}) (*RAGResponse, error) {
	return &RAGResponse{
		Query:       query,
		Answer:      "RAG integration is disabled",
		Sources:     []*RAGSource{},
		Confidence:  0.0,
		Context:     context,
		ProcessedAt: time.Now(),
	}, nil
}

func (nri *NoOpRAGIntegration) EnhanceContext(ctx context.Context, node *slurpContext.ContextNode) (*slurpContext.ContextNode, error) {
	return node, nil
}

func (nri *NoOpRAGIntegration) IndexContent(ctx context.Context, content string, metadata map[string]interface{}) error {
	return nil
}

func (nri *NoOpRAGIntegration) SearchSimilar(ctx context.Context, content string, limit int) ([]*RAGResult, error) {
	return []*RAGResult{}, nil
}

func (nri *NoOpRAGIntegration) UpdateIndex(ctx context.Context, updates []*RAGUpdate) error {
	return nil
}

func (nri *NoOpRAGIntegration) GetRAGStats(ctx context.Context) (*RAGStatistics, error) {
	return &RAGStatistics{}, nil
}
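
// DefaultRAGIntegration and NoOpRAGIntegration expose the same method set
// (Query, EnhanceContext, IndexContent, SearchSimilar, UpdateIndex,
// GetRAGStats), so they are presumably interchangeable behind a RAG
// integration interface defined elsewhere in this package; the interface name
// is not visible in this file. A compile-time assertion of that assumed
// interface would look like:
//
//	var _ RAGIntegration = (*DefaultRAGIntegration)(nil)
//	var _ RAGIntegration = (*NoOpRAGIntegration)(nil)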