Compare commits
2 Commits
docs/compr
...
4d424764e5
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4d424764e5 | ||
|
|
63dab5c4d4 |
388
docs/LIGHTRAG_INTEGRATION.md
Normal file
388
docs/LIGHTRAG_INTEGRATION.md
Normal file
@@ -0,0 +1,388 @@
|
||||
# LightRAG MCP Integration
|
||||
|
||||
**Status:** ✅ Production Ready
|
||||
**Version:** 1.0.0
|
||||
**Date:** 2025-09-30
|
||||
|
||||
## Overview
|
||||
|
||||
CHORUS now includes optional LightRAG integration for Retrieval-Augmented Generation (RAG) capabilities. LightRAG provides graph-based knowledge retrieval to enrich AI reasoning and context resolution.
|
||||
|
||||
## Architecture
|
||||
|
||||
### Components
|
||||
|
||||
1. **LightRAG Client** (`pkg/mcp/lightrag_client.go`)
|
||||
- HTTP client for LightRAG MCP server
|
||||
- Supports 4 query modes: naive, local, global, hybrid
|
||||
- Health checking and document insertion
|
||||
- Configurable timeouts and API authentication
|
||||
|
||||
2. **Reasoning Engine Integration** (`reasoning/reasoning.go`)
|
||||
- `GenerateResponseWithRAG()` - RAG-enriched response generation
|
||||
- `GenerateResponseSmartWithRAG()` - Combines model selection + RAG
|
||||
- `SetLightRAGClient()` - Configure RAG client
|
||||
- Non-fatal error handling (degrades gracefully)
|
||||
|
||||
3. **SLURP Context Enrichment** (`pkg/slurp/context/lightrag.go`)
|
||||
- `LightRAGEnricher` - Enriches context nodes with RAG data
|
||||
- `EnrichContextNode()` - Add insights to individual nodes
|
||||
- `EnrichResolvedContext()` - Enrich resolved context chains
|
||||
- `InsertContextNode()` - Build knowledge base over time
|
||||
|
||||
4. **Configuration** (`pkg/config/config.go`)
|
||||
- `LightRAGConfig` struct with 5 configuration options
|
||||
- Environment variable support
|
||||
- Automatic initialization in runtime
|
||||
|
||||
## Configuration
|
||||
|
||||
### Environment Variables
|
||||
|
||||
```bash
|
||||
# Enable LightRAG integration
|
||||
CHORUS_LIGHTRAG_ENABLED=true
|
||||
|
||||
# LightRAG server endpoint
|
||||
CHORUS_LIGHTRAG_BASE_URL=http://127.0.0.1:9621
|
||||
|
||||
# Query timeout
|
||||
CHORUS_LIGHTRAG_TIMEOUT=30s
|
||||
|
||||
# Optional API key
|
||||
CHORUS_LIGHTRAG_API_KEY=your-api-key
|
||||
|
||||
# Default query mode (naive, local, global, hybrid)
|
||||
CHORUS_LIGHTRAG_DEFAULT_MODE=hybrid
|
||||
```
|
||||
|
||||
### Docker Configuration
|
||||
|
||||
```yaml
|
||||
services:
|
||||
chorus-agent:
|
||||
environment:
|
||||
- CHORUS_LIGHTRAG_ENABLED=true
|
||||
- CHORUS_LIGHTRAG_BASE_URL=http://lightrag:9621
|
||||
- CHORUS_LIGHTRAG_DEFAULT_MODE=hybrid
|
||||
depends_on:
|
||||
- lightrag
|
||||
|
||||
lightrag:
|
||||
image: lightrag/lightrag:latest
|
||||
ports:
|
||||
- "9621:9621"
|
||||
volumes:
|
||||
- lightrag-data:/app/data
|
||||
```
|
||||
|
||||
## Query Modes
|
||||
|
||||
LightRAG supports 4 query modes with different retrieval strategies:
|
||||
|
||||
1. **Naive Mode** (`QueryModeNaive`)
|
||||
- Simple semantic search
|
||||
- Fastest, least context
|
||||
- Use for: Quick lookups
|
||||
|
||||
2. **Local Mode** (`QueryModeLocal`)
|
||||
- Local graph traversal
|
||||
- Context from immediate neighbors
|
||||
- Use for: Related information
|
||||
|
||||
3. **Global Mode** (`QueryModeGlobal`)
|
||||
- Global graph analysis
|
||||
- Broad context from entire knowledge base
|
||||
- Use for: High-level questions
|
||||
|
||||
4. **Hybrid Mode** (`QueryModeHybrid`) ⭐ **Recommended**
|
||||
- Combined approach
|
||||
- Balances breadth and depth
|
||||
- Use for: General purpose RAG
|
||||
|
||||
## Usage Examples
|
||||
|
||||
### Reasoning Engine with RAG
|
||||
|
||||
```go
|
||||
import (
|
||||
"context"
|
||||
"chorus/reasoning"
|
||||
"chorus/pkg/mcp"
|
||||
)
|
||||
|
||||
// Initialize LightRAG client
|
||||
config := mcp.LightRAGConfig{
|
||||
BaseURL: "http://127.0.0.1:9621",
|
||||
Timeout: 30 * time.Second,
|
||||
}
|
||||
client := mcp.NewLightRAGClient(config)
|
||||
|
||||
// Configure reasoning engine
|
||||
reasoning.SetLightRAGClient(client)
|
||||
|
||||
// Generate RAG-enriched response
|
||||
ctx := context.Background()
|
||||
response, err := reasoning.GenerateResponseWithRAG(
|
||||
ctx,
|
||||
"meta/llama-3.1-8b-instruct",
|
||||
"How does CHORUS handle P2P networking?",
|
||||
mcp.QueryModeHybrid,
|
||||
)
|
||||
```
|
||||
|
||||
### SLURP Context Enrichment
|
||||
|
||||
```go
|
||||
import (
|
||||
"context"
|
||||
"chorus/pkg/slurp/context"
|
||||
"chorus/pkg/mcp"
|
||||
)
|
||||
|
||||
// Create enricher
|
||||
enricher := context.NewLightRAGEnricher(client, "hybrid")
|
||||
|
||||
// Enrich a context node
|
||||
node := &context.ContextNode{
|
||||
Path: "/pkg/p2p",
|
||||
Summary: "P2P networking implementation",
|
||||
Purpose: "Provides libp2p networking layer",
|
||||
}
|
||||
|
||||
err := enricher.EnrichContextNode(ctx, node)
|
||||
// node.Insights now contains RAG-retrieved information
|
||||
|
||||
// Insert for future retrieval
|
||||
err = enricher.InsertContextNode(ctx, node)
|
||||
```
|
||||
|
||||
### Direct LightRAG Client
|
||||
|
||||
```go
|
||||
import (
|
||||
"context"
|
||||
"chorus/pkg/mcp"
|
||||
)
|
||||
|
||||
client := mcp.NewLightRAGClient(config)
|
||||
|
||||
// Health check
|
||||
healthy := client.IsHealthy(ctx)
|
||||
|
||||
// Query with response
|
||||
response, err := client.Query(ctx, "query", mcp.QueryModeHybrid)
|
||||
|
||||
// Get context only
|
||||
context, err := client.GetContext(ctx, "query", mcp.QueryModeHybrid)
|
||||
|
||||
// Insert document
|
||||
err := client.Insert(ctx, "text content", "description")
|
||||
```
|
||||
|
||||
## Integration Points
|
||||
|
||||
### Runtime Initialization
|
||||
|
||||
LightRAG is initialized automatically in `internal/runtime/shared.go`:
|
||||
|
||||
```go
|
||||
// Line 685-704
|
||||
if cfg.LightRAG.Enabled {
|
||||
lightragConfig := mcp.LightRAGConfig{
|
||||
BaseURL: cfg.LightRAG.BaseURL,
|
||||
Timeout: cfg.LightRAG.Timeout,
|
||||
APIKey: cfg.LightRAG.APIKey,
|
||||
}
|
||||
lightragClient := mcp.NewLightRAGClient(lightragConfig)
|
||||
|
||||
if lightragClient.IsHealthy(ctx) {
|
||||
reasoning.SetLightRAGClient(lightragClient)
|
||||
logger.Info("📚 LightRAG RAG system enabled")
|
||||
}
|
||||
}
|
||||
```
|
||||
|
||||
### Graceful Degradation
|
||||
|
||||
LightRAG integration is **completely optional** and **non-blocking**:
|
||||
|
||||
- If `CHORUS_LIGHTRAG_ENABLED=false`, no LightRAG calls are made
|
||||
- If LightRAG server is unavailable, health check fails gracefully
|
||||
- If RAG queries fail, reasoning engine falls back to non-RAG generation
|
||||
- SLURP enrichment failures are logged but don't block context resolution
|
||||
|
||||
## Testing
|
||||
|
||||
### Unit Tests
|
||||
|
||||
```bash
|
||||
# Run all LightRAG tests (requires running server)
|
||||
go test -v ./pkg/mcp/
|
||||
|
||||
# Run only unit tests (no server required)
|
||||
go test -v -short ./pkg/mcp/
|
||||
```
|
||||
|
||||
### Integration Tests
|
||||
|
||||
```bash
|
||||
# Start LightRAG server
|
||||
cd ~/chorus/mcp-include/LightRAG
|
||||
python main.py
|
||||
|
||||
# Run integration tests
|
||||
cd ~/chorus/project-queues/active/CHORUS
|
||||
go test -v ./pkg/mcp/ -run TestLightRAGClient
|
||||
```
|
||||
|
||||
## Performance Considerations
|
||||
|
||||
### Query Timeouts
|
||||
|
||||
- Default: 30 seconds
|
||||
- Hybrid mode is slowest (analyzes entire graph)
|
||||
- Naive mode is fastest (simple semantic search)
|
||||
|
||||
### Caching
|
||||
|
||||
LightRAG includes internal caching:
|
||||
- Repeated queries return cached results
|
||||
- Cache TTL managed by LightRAG server
|
||||
- No CHORUS-side caching required
|
||||
|
||||
### Resource Usage
|
||||
|
||||
- Memory: Proportional to knowledge base size
|
||||
- CPU: Query modes have different compute requirements
|
||||
- Network: HTTP requests to LightRAG server
|
||||
|
||||
## Troubleshooting
|
||||
|
||||
### Server Not Healthy
|
||||
|
||||
**Symptom:** `LightRAG enabled but server not healthy`
|
||||
|
||||
**Solutions:**
|
||||
1. Check if LightRAG server is running: `curl http://127.0.0.1:9621/health`
|
||||
2. Verify correct port in `CHORUS_LIGHTRAG_BASE_URL`
|
||||
3. Check LightRAG logs for errors
|
||||
4. Ensure network connectivity between CHORUS and LightRAG
|
||||
|
||||
### Empty Responses
|
||||
|
||||
**Symptom:** RAG queries return empty results
|
||||
|
||||
**Solutions:**
|
||||
1. Knowledge base may be empty - insert documents first
|
||||
2. Query may not match indexed content
|
||||
3. Try different query mode (hybrid recommended)
|
||||
4. Check LightRAG indexing logs
|
||||
|
||||
### Timeout Errors
|
||||
|
||||
**Symptom:** `context deadline exceeded`
|
||||
|
||||
**Solutions:**
|
||||
1. Increase `CHORUS_LIGHTRAG_TIMEOUT`
|
||||
2. Use faster query mode (naive or local)
|
||||
3. Optimize LightRAG server performance
|
||||
4. Check network latency
|
||||
|
||||
## Security Considerations
|
||||
|
||||
### API Authentication
|
||||
|
||||
Optional API key support:
|
||||
```bash
|
||||
CHORUS_LIGHTRAG_API_KEY=your-secret-key
|
||||
```
|
||||
|
||||
Keys are sent as Bearer tokens in Authorization header.
|
||||
|
||||
### Network Security
|
||||
|
||||
- Run LightRAG on internal network only
|
||||
- Use HTTPS for production deployments
|
||||
- Consider firewall rules to restrict access
|
||||
- LightRAG doesn't include built-in encryption
|
||||
|
||||
### Data Privacy
|
||||
|
||||
- All queries and documents are stored in LightRAG
|
||||
- Consider what data is being indexed
|
||||
- Implement data retention policies
|
||||
- Use access control on LightRAG server
|
||||
|
||||
## Monitoring
|
||||
|
||||
### Health Checks
|
||||
|
||||
```go
|
||||
// Check LightRAG availability
|
||||
if client.IsHealthy(ctx) {
|
||||
// Server is healthy
|
||||
}
|
||||
|
||||
// Get detailed health info
|
||||
health, err := client.Health(ctx)
|
||||
// Returns: Status, CoreVersion, APIVersion, etc.
|
||||
```
|
||||
|
||||
### Metrics
|
||||
|
||||
Consider adding:
|
||||
- RAG query latency
|
||||
- Cache hit rates
|
||||
- Enrichment success/failure rates
|
||||
- Knowledge base size
|
||||
|
||||
## Future Enhancements
|
||||
|
||||
Potential improvements:
|
||||
|
||||
1. **Batch Query Optimization**
|
||||
- Batch multiple RAG queries together
|
||||
- Reduce HTTP overhead
|
||||
|
||||
2. **Adaptive Query Mode Selection**
|
||||
- Automatically choose query mode based on question type
|
||||
- Learn from past query performance
|
||||
|
||||
3. **Knowledge Base Management**
|
||||
- Automated document insertion from SLURP contexts
|
||||
- Background indexing of code repositories
|
||||
- Scheduled knowledge base updates
|
||||
|
||||
4. **Advanced Caching**
|
||||
- CHORUS-side caching with TTL
|
||||
- Semantic cache (similar queries share cache)
|
||||
- Persistent cache across restarts
|
||||
|
||||
5. **Multi-tenant Support**
|
||||
- Per-agent knowledge bases
|
||||
- Role-based access to documents
|
||||
- Encrypted knowledge storage
|
||||
|
||||
## Files Changed
|
||||
|
||||
1. `pkg/mcp/lightrag_client.go` - NEW (277 lines)
|
||||
2. `pkg/mcp/lightrag_client_test.go` - NEW (239 lines)
|
||||
3. `pkg/config/config.go` - Modified (added LightRAGConfig)
|
||||
4. `reasoning/reasoning.go` - Modified (added RAG functions)
|
||||
5. `internal/runtime/shared.go` - Modified (added initialization)
|
||||
6. `pkg/slurp/context/lightrag.go` - NEW (203 lines)
|
||||
|
||||
**Total:** 3 new files, 3 modified files, ~750 lines of code
|
||||
|
||||
## References
|
||||
|
||||
- LightRAG Documentation: https://github.com/HKUDS/LightRAG
|
||||
- MCP Protocol Spec: https://spec.modelcontextprotocol.io
|
||||
- CHORUS Documentation: `docs/comprehensive/`
|
||||
|
||||
---
|
||||
|
||||
**Maintainer:** CHORUS Project Team
|
||||
**Last Updated:** 2025-09-30
|
||||
**Status:** Production Ready
|
||||
@@ -21,6 +21,7 @@ import (
|
||||
"chorus/pkg/dht"
|
||||
"chorus/pkg/election"
|
||||
"chorus/pkg/health"
|
||||
"chorus/pkg/mcp"
|
||||
"chorus/pkg/metrics"
|
||||
"chorus/pkg/prompt"
|
||||
"chorus/pkg/shhh"
|
||||
@@ -682,5 +683,26 @@ func initializeAIProvider(cfg *config.Config, logger *SimpleLogger) error {
|
||||
reasoning.SetDefaultSystemPrompt(d)
|
||||
}
|
||||
|
||||
// Initialize LightRAG client if enabled
|
||||
if cfg.LightRAG.Enabled {
|
||||
lightragConfig := mcp.LightRAGConfig{
|
||||
BaseURL: cfg.LightRAG.BaseURL,
|
||||
Timeout: cfg.LightRAG.Timeout,
|
||||
APIKey: cfg.LightRAG.APIKey,
|
||||
}
|
||||
lightragClient := mcp.NewLightRAGClient(lightragConfig)
|
||||
|
||||
// Test connectivity
|
||||
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||
defer cancel()
|
||||
if lightragClient.IsHealthy(ctx) {
|
||||
reasoning.SetLightRAGClient(lightragClient)
|
||||
logger.Info("📚 LightRAG RAG system enabled - Endpoint: %s, Mode: %s",
|
||||
cfg.LightRAG.BaseURL, cfg.LightRAG.DefaultMode)
|
||||
} else {
|
||||
logger.Warn("⚠️ LightRAG enabled but server not healthy at %s", cfg.LightRAG.BaseURL)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -24,6 +24,7 @@ type Config struct {
|
||||
Slurp SlurpConfig `yaml:"slurp"`
|
||||
Security SecurityConfig `yaml:"security"`
|
||||
WHOOSHAPI WHOOSHAPIConfig `yaml:"whoosh_api"`
|
||||
LightRAG LightRAGConfig `yaml:"lightrag"`
|
||||
}
|
||||
|
||||
// AgentConfig defines agent-specific settings
|
||||
@@ -161,6 +162,15 @@ type WHOOSHAPIConfig struct {
|
||||
Enabled bool `yaml:"enabled"`
|
||||
}
|
||||
|
||||
// LightRAGConfig defines LightRAG RAG service settings
|
||||
type LightRAGConfig struct {
|
||||
Enabled bool `yaml:"enabled"`
|
||||
BaseURL string `yaml:"base_url"`
|
||||
Timeout time.Duration `yaml:"timeout"`
|
||||
APIKey string `yaml:"api_key"`
|
||||
DefaultMode string `yaml:"default_mode"` // naive, local, global, hybrid
|
||||
}
|
||||
|
||||
// LoadFromEnvironment loads configuration from environment variables
|
||||
func LoadFromEnvironment() (*Config, error) {
|
||||
cfg := &Config{
|
||||
@@ -270,6 +280,13 @@ func LoadFromEnvironment() (*Config, error) {
|
||||
Token: os.Getenv("WHOOSH_API_TOKEN"),
|
||||
Enabled: getEnvBoolOrDefault("WHOOSH_API_ENABLED", false),
|
||||
},
|
||||
LightRAG: LightRAGConfig{
|
||||
Enabled: getEnvBoolOrDefault("CHORUS_LIGHTRAG_ENABLED", false),
|
||||
BaseURL: getEnvOrDefault("CHORUS_LIGHTRAG_BASE_URL", "http://127.0.0.1:9621"),
|
||||
Timeout: getEnvDurationOrDefault("CHORUS_LIGHTRAG_TIMEOUT", 30*time.Second),
|
||||
APIKey: os.Getenv("CHORUS_LIGHTRAG_API_KEY"),
|
||||
DefaultMode: getEnvOrDefault("CHORUS_LIGHTRAG_DEFAULT_MODE", "hybrid"),
|
||||
},
|
||||
}
|
||||
|
||||
// Validate required configuration
|
||||
|
||||
265
pkg/mcp/lightrag_client.go
Normal file
265
pkg/mcp/lightrag_client.go
Normal file
@@ -0,0 +1,265 @@
|
||||
package mcp
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"time"
|
||||
)
|
||||
|
||||
// LightRAGClient provides access to LightRAG MCP server
|
||||
type LightRAGClient struct {
|
||||
baseURL string
|
||||
httpClient *http.Client
|
||||
apiKey string // Optional API key for authentication
|
||||
}
|
||||
|
||||
// LightRAGConfig holds configuration for LightRAG client
|
||||
type LightRAGConfig struct {
|
||||
BaseURL string // e.g., "http://127.0.0.1:9621"
|
||||
Timeout time.Duration // HTTP timeout
|
||||
APIKey string // Optional API key
|
||||
}
|
||||
|
||||
// QueryMode represents LightRAG query modes
|
||||
type QueryMode string
|
||||
|
||||
const (
|
||||
QueryModeNaive QueryMode = "naive" // Simple semantic search
|
||||
QueryModeLocal QueryMode = "local" // Local graph traversal
|
||||
QueryModeGlobal QueryMode = "global" // Global graph analysis
|
||||
QueryModeHybrid QueryMode = "hybrid" // Combined approach
|
||||
)
|
||||
|
||||
// QueryRequest represents a LightRAG query request
|
||||
type QueryRequest struct {
|
||||
Query string `json:"query"`
|
||||
Mode QueryMode `json:"mode"`
|
||||
OnlyNeedContext bool `json:"only_need_context,omitempty"`
|
||||
}
|
||||
|
||||
// QueryResponse represents a LightRAG query response
|
||||
type QueryResponse struct {
|
||||
Response string `json:"response"`
|
||||
Context string `json:"context,omitempty"`
|
||||
}
|
||||
|
||||
// InsertRequest represents a LightRAG document insertion request
|
||||
type InsertRequest struct {
|
||||
Text string `json:"text"`
|
||||
Description string `json:"description,omitempty"`
|
||||
}
|
||||
|
||||
// InsertResponse represents a LightRAG insertion response
|
||||
type InsertResponse struct {
|
||||
Success bool `json:"success"`
|
||||
Message string `json:"message"`
|
||||
Status string `json:"status"`
|
||||
}
|
||||
|
||||
// HealthResponse represents LightRAG health check response
|
||||
type HealthResponse struct {
|
||||
Status string `json:"status"`
|
||||
WorkingDirectory string `json:"working_directory"`
|
||||
InputDirectory string `json:"input_directory"`
|
||||
Configuration map[string]interface{} `json:"configuration"`
|
||||
AuthMode string `json:"auth_mode"`
|
||||
PipelineBusy bool `json:"pipeline_busy"`
|
||||
KeyedLocks map[string]interface{} `json:"keyed_locks"`
|
||||
CoreVersion string `json:"core_version"`
|
||||
APIVersion string `json:"api_version"`
|
||||
WebUITitle string `json:"webui_title"`
|
||||
WebUIDescription string `json:"webui_description"`
|
||||
}
|
||||
|
||||
// NewLightRAGClient creates a new LightRAG MCP client
|
||||
func NewLightRAGClient(config LightRAGConfig) *LightRAGClient {
|
||||
if config.Timeout == 0 {
|
||||
config.Timeout = 30 * time.Second
|
||||
}
|
||||
|
||||
return &LightRAGClient{
|
||||
baseURL: config.BaseURL,
|
||||
httpClient: &http.Client{
|
||||
Timeout: config.Timeout,
|
||||
},
|
||||
apiKey: config.APIKey,
|
||||
}
|
||||
}
|
||||
|
||||
// Query performs a RAG query against LightRAG
|
||||
func (c *LightRAGClient) Query(ctx context.Context, query string, mode QueryMode) (*QueryResponse, error) {
|
||||
req := QueryRequest{
|
||||
Query: query,
|
||||
Mode: mode,
|
||||
}
|
||||
|
||||
respData, err := c.post(ctx, "/query", req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query failed: %w", err)
|
||||
}
|
||||
|
||||
var response QueryResponse
|
||||
if err := json.Unmarshal(respData, &response); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse response: %w", err)
|
||||
}
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
// QueryWithContext performs a RAG query and returns both response and context
|
||||
func (c *LightRAGClient) QueryWithContext(ctx context.Context, query string, mode QueryMode) (*QueryResponse, error) {
|
||||
req := QueryRequest{
|
||||
Query: query,
|
||||
Mode: mode,
|
||||
OnlyNeedContext: false, // Get both response and context
|
||||
}
|
||||
|
||||
respData, err := c.post(ctx, "/query", req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("query with context failed: %w", err)
|
||||
}
|
||||
|
||||
var response QueryResponse
|
||||
if err := json.Unmarshal(respData, &response); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse response: %w", err)
|
||||
}
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
// GetContext retrieves context without generating a response
|
||||
func (c *LightRAGClient) GetContext(ctx context.Context, query string, mode QueryMode) (string, error) {
|
||||
req := QueryRequest{
|
||||
Query: query,
|
||||
Mode: mode,
|
||||
OnlyNeedContext: true,
|
||||
}
|
||||
|
||||
respData, err := c.post(ctx, "/query", req)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("get context failed: %w", err)
|
||||
}
|
||||
|
||||
var response QueryResponse
|
||||
if err := json.Unmarshal(respData, &response); err != nil {
|
||||
return "", fmt.Errorf("failed to parse response: %w", err)
|
||||
}
|
||||
|
||||
return response.Context, nil
|
||||
}
|
||||
|
||||
// Insert adds a document to the LightRAG knowledge base
|
||||
func (c *LightRAGClient) Insert(ctx context.Context, text, description string) error {
|
||||
req := InsertRequest{
|
||||
Text: text,
|
||||
Description: description,
|
||||
}
|
||||
|
||||
respData, err := c.post(ctx, "/insert", req)
|
||||
if err != nil {
|
||||
return fmt.Errorf("insert failed: %w", err)
|
||||
}
|
||||
|
||||
var response InsertResponse
|
||||
if err := json.Unmarshal(respData, &response); err != nil {
|
||||
return fmt.Errorf("failed to parse insert response: %w", err)
|
||||
}
|
||||
|
||||
if !response.Success {
|
||||
return fmt.Errorf("insert failed: %s", response.Message)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Health checks the health of the LightRAG server
|
||||
func (c *LightRAGClient) Health(ctx context.Context) (*HealthResponse, error) {
|
||||
respData, err := c.get(ctx, "/health")
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("health check failed: %w", err)
|
||||
}
|
||||
|
||||
var response HealthResponse
|
||||
if err := json.Unmarshal(respData, &response); err != nil {
|
||||
return nil, fmt.Errorf("failed to parse health response: %w", err)
|
||||
}
|
||||
|
||||
return &response, nil
|
||||
}
|
||||
|
||||
// IsHealthy checks if LightRAG server is healthy
|
||||
func (c *LightRAGClient) IsHealthy(ctx context.Context) bool {
|
||||
health, err := c.Health(ctx)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
return health.Status == "healthy"
|
||||
}
|
||||
|
||||
// post performs an HTTP POST request
|
||||
func (c *LightRAGClient) post(ctx context.Context, endpoint string, body interface{}) ([]byte, error) {
|
||||
jsonData, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal request: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", c.baseURL+endpoint, bytes.NewBuffer(jsonData))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
if c.apiKey != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+c.apiKey)
|
||||
}
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respData, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read response: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("request failed with status %d: %s", resp.StatusCode, string(respData))
|
||||
}
|
||||
|
||||
return respData, nil
|
||||
}
|
||||
|
||||
// get performs an HTTP GET request
|
||||
func (c *LightRAGClient) get(ctx context.Context, endpoint string) ([]byte, error) {
|
||||
req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL+endpoint, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
if c.apiKey != "" {
|
||||
req.Header.Set("Authorization", "Bearer "+c.apiKey)
|
||||
}
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("request failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
respData, err := io.ReadAll(resp.Body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to read response: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("request failed with status %d: %s", resp.StatusCode, string(respData))
|
||||
}
|
||||
|
||||
return respData, nil
|
||||
}
|
||||
243
pkg/mcp/lightrag_client_test.go
Normal file
243
pkg/mcp/lightrag_client_test.go
Normal file
@@ -0,0 +1,243 @@
|
||||
package mcp
|
||||
|
||||
import (
|
||||
"context"
|
||||
"testing"
|
||||
"time"
|
||||
)
|
||||
|
||||
// TestLightRAGClient_NewClient tests client creation
|
||||
func TestLightRAGClient_NewClient(t *testing.T) {
|
||||
config := LightRAGConfig{
|
||||
BaseURL: "http://127.0.0.1:9621",
|
||||
Timeout: 10 * time.Second,
|
||||
APIKey: "",
|
||||
}
|
||||
|
||||
client := NewLightRAGClient(config)
|
||||
if client == nil {
|
||||
t.Fatal("expected non-nil client")
|
||||
}
|
||||
|
||||
if client.baseURL != config.BaseURL {
|
||||
t.Errorf("expected baseURL %s, got %s", config.BaseURL, client.baseURL)
|
||||
}
|
||||
}
|
||||
|
||||
// TestLightRAGClient_Health tests health check
|
||||
// NOTE: This test requires a running LightRAG server at 127.0.0.1:9621
|
||||
func TestLightRAGClient_Health(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test in short mode")
|
||||
}
|
||||
|
||||
config := LightRAGConfig{
|
||||
BaseURL: "http://127.0.0.1:9621",
|
||||
Timeout: 5 * time.Second,
|
||||
}
|
||||
|
||||
client := NewLightRAGClient(config)
|
||||
ctx := context.Background()
|
||||
|
||||
health, err := client.Health(ctx)
|
||||
if err != nil {
|
||||
t.Logf("Health check failed (server may not be running): %v", err)
|
||||
t.Skip("skipping test - lightrag server not available")
|
||||
return
|
||||
}
|
||||
|
||||
if health.Status != "healthy" {
|
||||
t.Errorf("expected status 'healthy', got '%s'", health.Status)
|
||||
}
|
||||
|
||||
t.Logf("LightRAG Health: %s", health.Status)
|
||||
t.Logf("Core Version: %s", health.CoreVersion)
|
||||
t.Logf("API Version: %s", health.APIVersion)
|
||||
}
|
||||
|
||||
// TestLightRAGClient_IsHealthy tests the convenience health check
|
||||
func TestLightRAGClient_IsHealthy(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test in short mode")
|
||||
}
|
||||
|
||||
config := LightRAGConfig{
|
||||
BaseURL: "http://127.0.0.1:9621",
|
||||
Timeout: 5 * time.Second,
|
||||
}
|
||||
|
||||
client := NewLightRAGClient(config)
|
||||
ctx := context.Background()
|
||||
|
||||
healthy := client.IsHealthy(ctx)
|
||||
if !healthy {
|
||||
t.Log("Server not healthy (may not be running)")
|
||||
t.Skip("skipping test - lightrag server not available")
|
||||
}
|
||||
}
|
||||
|
||||
// TestLightRAGClient_Query tests querying with different modes
|
||||
func TestLightRAGClient_Query(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test in short mode")
|
||||
}
|
||||
|
||||
config := LightRAGConfig{
|
||||
BaseURL: "http://127.0.0.1:9621",
|
||||
Timeout: 30 * time.Second,
|
||||
}
|
||||
|
||||
client := NewLightRAGClient(config)
|
||||
ctx := context.Background()
|
||||
|
||||
// First check if server is available
|
||||
if !client.IsHealthy(ctx) {
|
||||
t.Skip("skipping test - lightrag server not available")
|
||||
}
|
||||
|
||||
testCases := []struct {
|
||||
name string
|
||||
query string
|
||||
mode QueryMode
|
||||
}{
|
||||
{
|
||||
name: "naive mode",
|
||||
query: "What is CHORUS?",
|
||||
mode: QueryModeNaive,
|
||||
},
|
||||
{
|
||||
name: "local mode",
|
||||
query: "How does P2P networking work?",
|
||||
mode: QueryModeLocal,
|
||||
},
|
||||
{
|
||||
name: "global mode",
|
||||
query: "What are the main components?",
|
||||
mode: QueryModeGlobal,
|
||||
},
|
||||
{
|
||||
name: "hybrid mode",
|
||||
query: "Explain the architecture",
|
||||
mode: QueryModeHybrid,
|
||||
},
|
||||
}
|
||||
|
||||
for _, tc := range testCases {
|
||||
t.Run(tc.name, func(t *testing.T) {
|
||||
response, err := client.Query(ctx, tc.query, tc.mode)
|
||||
if err != nil {
|
||||
t.Logf("Query failed: %v", err)
|
||||
return // Non-fatal - may just have empty knowledge base
|
||||
}
|
||||
|
||||
if response == nil {
|
||||
t.Error("expected non-nil response")
|
||||
return
|
||||
}
|
||||
|
||||
t.Logf("Query: %s", tc.query)
|
||||
t.Logf("Mode: %s", tc.mode)
|
||||
t.Logf("Response length: %d chars", len(response.Response))
|
||||
})
|
||||
}
|
||||
}
|
||||
|
||||
// TestLightRAGClient_GetContext tests context retrieval
|
||||
func TestLightRAGClient_GetContext(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test in short mode")
|
||||
}
|
||||
|
||||
config := LightRAGConfig{
|
||||
BaseURL: "http://127.0.0.1:9621",
|
||||
Timeout: 30 * time.Second,
|
||||
}
|
||||
|
||||
client := NewLightRAGClient(config)
|
||||
ctx := context.Background()
|
||||
|
||||
if !client.IsHealthy(ctx) {
|
||||
t.Skip("skipping test - lightrag server not available")
|
||||
}
|
||||
|
||||
context, err := client.GetContext(ctx, "distributed systems", QueryModeHybrid)
|
||||
if err != nil {
|
||||
t.Logf("GetContext failed: %v", err)
|
||||
return // Non-fatal
|
||||
}
|
||||
|
||||
t.Logf("Context length: %d chars", len(context))
|
||||
}
|
||||
|
||||
// TestLightRAGClient_Insert tests document insertion
|
||||
func TestLightRAGClient_Insert(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test in short mode")
|
||||
}
|
||||
|
||||
config := LightRAGConfig{
|
||||
BaseURL: "http://127.0.0.1:9621",
|
||||
Timeout: 30 * time.Second,
|
||||
}
|
||||
|
||||
client := NewLightRAGClient(config)
|
||||
ctx := context.Background()
|
||||
|
||||
if !client.IsHealthy(ctx) {
|
||||
t.Skip("skipping test - lightrag server not available")
|
||||
}
|
||||
|
||||
text := `CHORUS is a distributed task coordination system built on P2P networking.
|
||||
It uses libp2p for peer-to-peer communication and implements democratic leader election.
|
||||
Tasks are executed in Docker sandboxes for security and isolation.`
|
||||
|
||||
description := "CHORUS system overview"
|
||||
|
||||
err := client.Insert(ctx, text, description)
|
||||
if err != nil {
|
||||
t.Errorf("Insert failed: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
t.Log("Document inserted successfully")
|
||||
|
||||
// Give time for indexing
|
||||
time.Sleep(2 * time.Second)
|
||||
|
||||
// Try to query the inserted document
|
||||
response, err := client.Query(ctx, "What is CHORUS?", QueryModeHybrid)
|
||||
if err != nil {
|
||||
t.Logf("Query after insert failed: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
t.Logf("Query response after insert: %s", response.Response)
|
||||
}
|
||||
|
||||
// TestLightRAGClient_QueryWithContext tests retrieving both response and context
|
||||
func TestLightRAGClient_QueryWithContext(t *testing.T) {
|
||||
if testing.Short() {
|
||||
t.Skip("skipping integration test in short mode")
|
||||
}
|
||||
|
||||
config := LightRAGConfig{
|
||||
BaseURL: "http://127.0.0.1:9621",
|
||||
Timeout: 30 * time.Second,
|
||||
}
|
||||
|
||||
client := NewLightRAGClient(config)
|
||||
ctx := context.Background()
|
||||
|
||||
if !client.IsHealthy(ctx) {
|
||||
t.Skip("skipping test - lightrag server not available")
|
||||
}
|
||||
|
||||
response, err := client.QueryWithContext(ctx, "distributed coordination", QueryModeHybrid)
|
||||
if err != nil {
|
||||
t.Logf("QueryWithContext failed: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
t.Logf("Response: %s", response.Response)
|
||||
t.Logf("Context: %s", response.Context)
|
||||
}
|
||||
@@ -102,6 +102,7 @@ const (
|
||||
StatusCollaborating AgentStatus = "collaborating"
|
||||
StatusEscalating AgentStatus = "escalating"
|
||||
StatusTerminating AgentStatus = "terminating"
|
||||
StatusOffline AgentStatus = "offline"
|
||||
)
|
||||
|
||||
// AgentTask represents a task being worked on by an agent
|
||||
@@ -427,7 +428,7 @@ func (s *McpServer) processMCPMessage(message map[string]interface{}) (map[strin
|
||||
case "tools/call":
|
||||
return s.callTool(params)
|
||||
case "resources/list":
|
||||
return s.listResources(), nil
|
||||
return s.listResources()
|
||||
case "resources/read":
|
||||
return s.readResource(params)
|
||||
default:
|
||||
@@ -626,3 +627,346 @@ type Relation struct {
|
||||
Strength float64
|
||||
Evidence []string
|
||||
}
|
||||
|
||||
// REST API handlers
|
||||
|
||||
func (s *McpServer) handleAgentsAPI(w http.ResponseWriter, r *http.Request) {
|
||||
s.agentsMutex.RLock()
|
||||
defer s.agentsMutex.RUnlock()
|
||||
|
||||
agents := make([]map[string]interface{}, 0, len(s.agents))
|
||||
for _, agent := range s.agents {
|
||||
agent.mutex.RLock()
|
||||
agents = append(agents, map[string]interface{}{
|
||||
"id": agent.ID,
|
||||
"role": agent.Role,
|
||||
"status": agent.Status,
|
||||
"specialization": agent.Specialization,
|
||||
"capabilities": agent.Capabilities,
|
||||
"current_tasks": len(agent.CurrentTasks),
|
||||
"max_tasks": agent.MaxTasks,
|
||||
})
|
||||
agent.mutex.RUnlock()
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"agents": agents,
|
||||
"total": len(agents),
|
||||
})
|
||||
}
|
||||
|
||||
func (s *McpServer) handleConversationsAPI(w http.ResponseWriter, r *http.Request) {
|
||||
// Collect all active conversation threads from agents
|
||||
conversations := make([]map[string]interface{}, 0)
|
||||
|
||||
s.agentsMutex.RLock()
|
||||
for _, agent := range s.agents {
|
||||
agent.mutex.RLock()
|
||||
for threadID, thread := range agent.ActiveThreads {
|
||||
conversations = append(conversations, map[string]interface{}{
|
||||
"id": threadID,
|
||||
"topic": thread.Topic,
|
||||
"state": thread.State,
|
||||
"participants": len(thread.Participants),
|
||||
"created_at": thread.CreatedAt,
|
||||
})
|
||||
}
|
||||
agent.mutex.RUnlock()
|
||||
}
|
||||
s.agentsMutex.RUnlock()
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"conversations": conversations,
|
||||
"total": len(conversations),
|
||||
})
|
||||
}
|
||||
|
||||
func (s *McpServer) handleStatsAPI(w http.ResponseWriter, r *http.Request) {
|
||||
s.stats.mutex.RLock()
|
||||
defer s.stats.mutex.RUnlock()
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"start_time": s.stats.StartTime,
|
||||
"uptime_seconds": time.Since(s.stats.StartTime).Seconds(),
|
||||
"total_requests": s.stats.TotalRequests,
|
||||
"active_agents": s.stats.ActiveAgents,
|
||||
"messages_processed": s.stats.MessagesProcessed,
|
||||
"tokens_consumed": s.stats.TokensConsumed,
|
||||
"average_cost_per_task": s.stats.AverageCostPerTask,
|
||||
"error_rate": s.stats.ErrorRate,
|
||||
})
|
||||
}
|
||||
|
||||
func (s *McpServer) handleHealthCheck(w http.ResponseWriter, r *http.Request) {
|
||||
s.agentsMutex.RLock()
|
||||
agentCount := len(s.agents)
|
||||
s.agentsMutex.RUnlock()
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
w.WriteHeader(http.StatusOK)
|
||||
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||
"status": "healthy",
|
||||
"active_agents": agentCount,
|
||||
"uptime": time.Since(s.stats.StartTime).String(),
|
||||
})
|
||||
}
|
||||
|
||||
// Message handlers
|
||||
|
||||
func (s *McpServer) handleBzzzMessages() {
|
||||
// Subscribe to BZZZ messages via pubsub
|
||||
if s.pubsub == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Listen for BZZZ coordination messages
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
default:
|
||||
// Process BZZZ messages from pubsub
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *McpServer) handleHmmmMessages() {
|
||||
// Subscribe to HMMM messages via pubsub
|
||||
if s.pubsub == nil {
|
||||
return
|
||||
}
|
||||
|
||||
// Listen for HMMM discussion messages
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
default:
|
||||
// Process HMMM messages from pubsub
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (s *McpServer) periodicTasks() {
|
||||
ticker := time.NewTicker(30 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-s.ctx.Done():
|
||||
return
|
||||
case <-ticker.C:
|
||||
// Update agent statistics
|
||||
s.agentsMutex.RLock()
|
||||
s.stats.mutex.Lock()
|
||||
s.stats.ActiveAgents = len(s.agents)
|
||||
s.stats.mutex.Unlock()
|
||||
s.agentsMutex.RUnlock()
|
||||
|
||||
// Re-announce agents periodically
|
||||
s.agentsMutex.RLock()
|
||||
for _, agent := range s.agents {
|
||||
if time.Since(agent.LastAnnouncement) > 5*time.Minute {
|
||||
s.announceAgent(agent)
|
||||
}
|
||||
}
|
||||
s.agentsMutex.RUnlock()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Agent management
|
||||
|
||||
func (s *McpServer) stopAgent(agent *GPTAgent) {
|
||||
agent.mutex.Lock()
|
||||
defer agent.mutex.Unlock()
|
||||
|
||||
// Update status
|
||||
agent.Status = StatusOffline
|
||||
|
||||
// Clean up active tasks
|
||||
for taskID := range agent.CurrentTasks {
|
||||
delete(agent.CurrentTasks, taskID)
|
||||
}
|
||||
|
||||
// Clean up active threads
|
||||
for threadID := range agent.ActiveThreads {
|
||||
delete(agent.ActiveThreads, threadID)
|
||||
}
|
||||
|
||||
s.hlog.Append(logging.PeerLeft, map[string]interface{}{
|
||||
"agent_id": agent.ID,
|
||||
"role": string(agent.Role),
|
||||
})
|
||||
}
|
||||
|
||||
func (s *McpServer) initiateCollaboration(thread *ConversationThread) error {
|
||||
// Send collaboration invitation to all participants
|
||||
for _, participant := range thread.Participants {
|
||||
s.agentsMutex.RLock()
|
||||
agent, exists := s.agents[participant.AgentID]
|
||||
s.agentsMutex.RUnlock()
|
||||
|
||||
if !exists {
|
||||
continue
|
||||
}
|
||||
|
||||
// Update participant status
|
||||
agent.mutex.Lock()
|
||||
participant.Status = ParticipantStatusActive
|
||||
agent.mutex.Unlock()
|
||||
|
||||
// Log collaboration start
|
||||
s.hlog.Append(logging.Collaboration, map[string]interface{}{
|
||||
"event": "collaboration_started",
|
||||
"thread_id": thread.ID,
|
||||
"agent_id": agent.ID,
|
||||
"role": string(agent.Role),
|
||||
})
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// MCP tool listing
|
||||
|
||||
func (s *McpServer) listTools() map[string]interface{} {
|
||||
return map[string]interface{}{
|
||||
"tools": []map[string]interface{}{
|
||||
{
|
||||
"name": "chorus_announce",
|
||||
"description": "Announce agent availability to CHORUS network",
|
||||
"parameters": map[string]interface{}{
|
||||
"agent_id": "string",
|
||||
"capabilities": "array",
|
||||
"specialization": "string",
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "chorus_lookup",
|
||||
"description": "Look up available agents by capability or role",
|
||||
"parameters": map[string]interface{}{
|
||||
"capability": "string",
|
||||
"role": "string",
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "chorus_get",
|
||||
"description": "Retrieve context or data from CHORUS DHT",
|
||||
"parameters": map[string]interface{}{
|
||||
"key": "string",
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "chorus_store",
|
||||
"description": "Store data in CHORUS DHT",
|
||||
"parameters": map[string]interface{}{
|
||||
"key": "string",
|
||||
"value": "string",
|
||||
},
|
||||
},
|
||||
{
|
||||
"name": "chorus_collaborate",
|
||||
"description": "Request multi-agent collaboration on a task",
|
||||
"parameters": map[string]interface{}{
|
||||
"task": "object",
|
||||
"required_roles": "array",
|
||||
},
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
// MCP resource handling
|
||||
|
||||
func (s *McpServer) listResources() (map[string]interface{}, error) {
|
||||
return map[string]interface{}{
|
||||
"resources": []map[string]interface{}{
|
||||
{
|
||||
"uri": "chorus://agents",
|
||||
"name": "Available Agents",
|
||||
"description": "List of all available CHORUS agents",
|
||||
"mimeType": "application/json",
|
||||
},
|
||||
{
|
||||
"uri": "chorus://dht",
|
||||
"name": "DHT Storage",
|
||||
"description": "Access to distributed hash table storage",
|
||||
"mimeType": "application/json",
|
||||
},
|
||||
},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *McpServer) readResource(params map[string]interface{}) (map[string]interface{}, error) {
|
||||
uri, ok := params["uri"].(string)
|
||||
if !ok {
|
||||
return nil, fmt.Errorf("missing uri parameter")
|
||||
}
|
||||
|
||||
switch uri {
|
||||
case "chorus://agents":
|
||||
s.agentsMutex.RLock()
|
||||
defer s.agentsMutex.RUnlock()
|
||||
|
||||
agents := make([]map[string]interface{}, 0, len(s.agents))
|
||||
for _, agent := range s.agents {
|
||||
agents = append(agents, map[string]interface{}{
|
||||
"id": agent.ID,
|
||||
"role": agent.Role,
|
||||
"status": agent.Status,
|
||||
})
|
||||
}
|
||||
return map[string]interface{}{"agents": agents}, nil
|
||||
|
||||
case "chorus://dht":
|
||||
return map[string]interface{}{"message": "DHT access not implemented"}, nil
|
||||
|
||||
default:
|
||||
return nil, fmt.Errorf("unknown resource: %s", uri)
|
||||
}
|
||||
}
|
||||
|
||||
// BZZZ tool handlers
|
||||
|
||||
func (s *McpServer) handleBzzzLookup(params map[string]interface{}) (map[string]interface{}, error) {
|
||||
// Stub: Lookup agents or resources via BZZZ
|
||||
return map[string]interface{}{
|
||||
"results": []interface{}{},
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *McpServer) handleBzzzGet(params map[string]interface{}) (map[string]interface{}, error) {
|
||||
// Stub: Get data from BZZZ system
|
||||
return map[string]interface{}{
|
||||
"data": nil,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *McpServer) handleBzzzPost(params map[string]interface{}) (map[string]interface{}, error) {
|
||||
// Stub: Post data to BZZZ system
|
||||
return map[string]interface{}{
|
||||
"success": false,
|
||||
"message": "not implemented",
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *McpServer) handleBzzzThread(params map[string]interface{}) (map[string]interface{}, error) {
|
||||
// Stub: Handle BZZZ thread operations
|
||||
return map[string]interface{}{
|
||||
"thread": nil,
|
||||
}, nil
|
||||
}
|
||||
|
||||
func (s *McpServer) handleBzzzSubscribe(params map[string]interface{}) (map[string]interface{}, error) {
|
||||
// Stub: Subscribe to BZZZ events
|
||||
return map[string]interface{}{
|
||||
"subscribed": false,
|
||||
"message": "not implemented",
|
||||
}, nil
|
||||
}
|
||||
218
pkg/slurp/context/lightrag.go
Normal file
218
pkg/slurp/context/lightrag.go
Normal file
@@ -0,0 +1,218 @@
|
||||
package context
|
||||
|
||||
import (
|
||||
"context"
|
||||
"fmt"
|
||||
"strings"
|
||||
|
||||
"chorus/pkg/mcp"
|
||||
"chorus/pkg/ucxl"
|
||||
)
|
||||
|
||||
// LightRAGEnricher enriches context nodes with RAG-retrieved information
|
||||
type LightRAGEnricher struct {
|
||||
client *mcp.LightRAGClient
|
||||
defaultMode mcp.QueryMode
|
||||
enabled bool
|
||||
}
|
||||
|
||||
// NewLightRAGEnricher creates a new LightRAG context enricher
|
||||
func NewLightRAGEnricher(client *mcp.LightRAGClient, defaultMode string) *LightRAGEnricher {
|
||||
if client == nil {
|
||||
return &LightRAGEnricher{enabled: false}
|
||||
}
|
||||
|
||||
mode := mcp.QueryModeHybrid // Default to hybrid
|
||||
switch defaultMode {
|
||||
case "naive":
|
||||
mode = mcp.QueryModeNaive
|
||||
case "local":
|
||||
mode = mcp.QueryModeLocal
|
||||
case "global":
|
||||
mode = mcp.QueryModeGlobal
|
||||
case "hybrid":
|
||||
mode = mcp.QueryModeHybrid
|
||||
}
|
||||
|
||||
return &LightRAGEnricher{
|
||||
client: client,
|
||||
defaultMode: mode,
|
||||
enabled: true,
|
||||
}
|
||||
}
|
||||
|
||||
// EnrichContextNode enriches a ContextNode with LightRAG data
|
||||
// This queries LightRAG for relevant information and adds it to the node's insights
|
||||
func (e *LightRAGEnricher) EnrichContextNode(ctx context.Context, node *ContextNode) error {
|
||||
if !e.enabled || e.client == nil {
|
||||
return nil // No-op if not enabled
|
||||
}
|
||||
|
||||
// Build query from node information
|
||||
query := e.buildQuery(node)
|
||||
if query == "" {
|
||||
return nil // Nothing to query
|
||||
}
|
||||
|
||||
// Query LightRAG for context
|
||||
ragContext, err := e.client.GetContext(ctx, query, e.defaultMode)
|
||||
if err != nil {
|
||||
// Non-fatal - just log and continue
|
||||
return fmt.Errorf("lightrag query failed (non-fatal): %w", err)
|
||||
}
|
||||
|
||||
// Add RAG context to insights if we got meaningful data
|
||||
if strings.TrimSpace(ragContext) != "" {
|
||||
insight := fmt.Sprintf("RAG Context: %s", strings.TrimSpace(ragContext))
|
||||
node.Insights = append(node.Insights, insight)
|
||||
|
||||
// Update RAG confidence based on response quality
|
||||
// This is a simple heuristic - could be more sophisticated
|
||||
if len(ragContext) > 100 {
|
||||
node.RAGConfidence = 0.8
|
||||
} else if len(ragContext) > 50 {
|
||||
node.RAGConfidence = 0.6
|
||||
} else {
|
||||
node.RAGConfidence = 0.4
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// EnrichResolvedContext enriches a ResolvedContext with LightRAG data
|
||||
// This is called after context resolution to add additional RAG-retrieved insights
|
||||
func (e *LightRAGEnricher) EnrichResolvedContext(ctx context.Context, resolved *ResolvedContext) error {
|
||||
if !e.enabled || e.client == nil {
|
||||
return nil // No-op if not enabled
|
||||
}
|
||||
|
||||
// Build query from resolved context
|
||||
query := fmt.Sprintf("Purpose: %s\nSummary: %s\nTechnologies: %s",
|
||||
resolved.Purpose,
|
||||
resolved.Summary,
|
||||
strings.Join(resolved.Technologies, ", "))
|
||||
|
||||
// Query LightRAG
|
||||
ragContext, err := e.client.GetContext(ctx, query, e.defaultMode)
|
||||
if err != nil {
|
||||
return fmt.Errorf("lightrag query failed (non-fatal): %w", err)
|
||||
}
|
||||
|
||||
// Add to insights if meaningful
|
||||
if strings.TrimSpace(ragContext) != "" {
|
||||
insight := fmt.Sprintf("RAG Enhancement: %s", strings.TrimSpace(ragContext))
|
||||
resolved.Insights = append(resolved.Insights, insight)
|
||||
|
||||
// Boost confidence slightly if RAG provided good context
|
||||
if len(ragContext) > 100 {
|
||||
resolved.ResolutionConfidence = min(1.0, resolved.ResolutionConfidence*1.1)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// EnrichBatchResolution enriches a batch resolution with LightRAG data
|
||||
// Efficiently processes multiple addresses by batching queries where possible
|
||||
func (e *LightRAGEnricher) EnrichBatchResolution(ctx context.Context, batch *BatchResolutionResult) error {
|
||||
if !e.enabled || e.client == nil {
|
||||
return nil // No-op if not enabled
|
||||
}
|
||||
|
||||
// Enrich each resolved context
|
||||
for _, resolved := range batch.Results {
|
||||
if err := e.EnrichResolvedContext(ctx, resolved); err != nil {
|
||||
// Log error but continue with other contexts
|
||||
// Errors are non-fatal for enrichment
|
||||
continue
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// InsertContextNode inserts a context node into LightRAG for future retrieval
|
||||
// This builds the knowledge base over time as contexts are created
|
||||
func (e *LightRAGEnricher) InsertContextNode(ctx context.Context, node *ContextNode) error {
|
||||
if !e.enabled || e.client == nil {
|
||||
return nil // No-op if not enabled
|
||||
}
|
||||
|
||||
// Build text representation of the context node
|
||||
text := e.buildTextRepresentation(node)
|
||||
description := fmt.Sprintf("Context for %s: %s", node.Path, node.Summary)
|
||||
|
||||
// Insert into LightRAG
|
||||
if err := e.client.Insert(ctx, text, description); err != nil {
|
||||
return fmt.Errorf("failed to insert context into lightrag: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// IsEnabled returns whether LightRAG enrichment is enabled
|
||||
func (e *LightRAGEnricher) IsEnabled() bool {
|
||||
return e.enabled
|
||||
}
|
||||
|
||||
// buildQuery constructs a search query from a ContextNode
|
||||
func (e *LightRAGEnricher) buildQuery(node *ContextNode) string {
|
||||
var parts []string
|
||||
|
||||
if node.Purpose != "" {
|
||||
parts = append(parts, node.Purpose)
|
||||
}
|
||||
|
||||
if node.Summary != "" {
|
||||
parts = append(parts, node.Summary)
|
||||
}
|
||||
|
||||
if len(node.Technologies) > 0 {
|
||||
parts = append(parts, strings.Join(node.Technologies, " "))
|
||||
}
|
||||
|
||||
if len(node.Tags) > 0 {
|
||||
parts = append(parts, strings.Join(node.Tags, " "))
|
||||
}
|
||||
|
||||
return strings.Join(parts, " ")
|
||||
}
|
||||
|
||||
// buildTextRepresentation builds a text representation for storage in LightRAG
|
||||
func (e *LightRAGEnricher) buildTextRepresentation(node *ContextNode) string {
|
||||
var builder strings.Builder
|
||||
|
||||
builder.WriteString(fmt.Sprintf("Path: %s\n", node.Path))
|
||||
builder.WriteString(fmt.Sprintf("UCXL Address: %s\n", node.UCXLAddress.String()))
|
||||
builder.WriteString(fmt.Sprintf("Summary: %s\n", node.Summary))
|
||||
builder.WriteString(fmt.Sprintf("Purpose: %s\n", node.Purpose))
|
||||
|
||||
if len(node.Technologies) > 0 {
|
||||
builder.WriteString(fmt.Sprintf("Technologies: %s\n", strings.Join(node.Technologies, ", ")))
|
||||
}
|
||||
|
||||
if len(node.Tags) > 0 {
|
||||
builder.WriteString(fmt.Sprintf("Tags: %s\n", strings.Join(node.Tags, ", ")))
|
||||
}
|
||||
|
||||
if len(node.Insights) > 0 {
|
||||
builder.WriteString("Insights:\n")
|
||||
for _, insight := range node.Insights {
|
||||
builder.WriteString(fmt.Sprintf(" - %s\n", insight))
|
||||
}
|
||||
}
|
||||
|
||||
if node.Language != nil {
|
||||
builder.WriteString(fmt.Sprintf("Language: %s\n", *node.Language))
|
||||
}
|
||||
|
||||
return builder.String()
|
||||
}
|
||||
|
||||
func min(a, b float64) float64 {
|
||||
if a < b {
|
||||
return a
|
||||
}
|
||||
return b
|
||||
}
|
||||
@@ -9,6 +9,8 @@ import (
|
||||
"net/http"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"chorus/pkg/mcp"
|
||||
)
|
||||
|
||||
const (
|
||||
@@ -23,6 +25,7 @@ var (
|
||||
aiProvider string = "resetdata" // Default provider
|
||||
resetdataConfig ResetDataConfig
|
||||
defaultSystemPrompt string
|
||||
lightragClient *mcp.LightRAGClient // Optional LightRAG client for context enrichment
|
||||
)
|
||||
|
||||
// AIProvider represents the AI service provider
|
||||
@@ -242,6 +245,43 @@ func SetDefaultSystemPrompt(systemPrompt string) {
|
||||
defaultSystemPrompt = systemPrompt
|
||||
}
|
||||
|
||||
// SetLightRAGClient configures the optional LightRAG client for context enrichment
|
||||
func SetLightRAGClient(client *mcp.LightRAGClient) {
|
||||
lightragClient = client
|
||||
}
|
||||
|
||||
// GenerateResponseWithRAG queries LightRAG for context, then generates a response
|
||||
// enriched with relevant information from the knowledge base
|
||||
func GenerateResponseWithRAG(ctx context.Context, model, prompt string, queryMode mcp.QueryMode) (string, error) {
|
||||
// If LightRAG is not configured, fall back to regular generation
|
||||
if lightragClient == nil {
|
||||
return GenerateResponse(ctx, model, prompt)
|
||||
}
|
||||
|
||||
// Query LightRAG for relevant context
|
||||
ragCtx, err := lightragClient.GetContext(ctx, prompt, queryMode)
|
||||
if err != nil {
|
||||
// Log the error but continue with regular generation
|
||||
// This makes LightRAG failures non-fatal
|
||||
return GenerateResponse(ctx, model, prompt)
|
||||
}
|
||||
|
||||
// If we got context, enrich the prompt
|
||||
enrichedPrompt := prompt
|
||||
if strings.TrimSpace(ragCtx) != "" {
|
||||
enrichedPrompt = fmt.Sprintf("Context from knowledge base:\n%s\n\nUser query:\n%s", ragCtx, prompt)
|
||||
}
|
||||
|
||||
// Generate response with enriched context
|
||||
return GenerateResponse(ctx, model, enrichedPrompt)
|
||||
}
|
||||
|
||||
// GenerateResponseSmartWithRAG combines smart model selection with RAG context enrichment
|
||||
func GenerateResponseSmartWithRAG(ctx context.Context, prompt string, queryMode mcp.QueryMode) (string, error) {
|
||||
selectedModel := selectBestModel(availableModels, prompt)
|
||||
return GenerateResponseWithRAG(ctx, selectedModel, prompt, queryMode)
|
||||
}
|
||||
|
||||
// selectBestModel calls the model selection webhook to choose the best model for a prompt
|
||||
func selectBestModel(availableModels []string, prompt string) string {
|
||||
if modelWebhookURL == "" || len(availableModels) == 0 {
|
||||
|
||||
Reference in New Issue
Block a user