Compare commits
5 Commits
docs/compr
...
f7130b327c
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f7130b327c | ||
|
|
7381137db5 | ||
|
|
9f480986fa | ||
|
|
4d424764e5 | ||
|
|
63dab5c4d4 |
@@ -1,3 +1,19 @@
|
|||||||
|
# ⚠️ DEPRECATED: DO NOT USE THIS DOCKERFILE ⚠️
|
||||||
|
#
|
||||||
|
# This Alpine-based Dockerfile is INCOMPATIBLE with the chorus-agent binary
|
||||||
|
# built by 'make build-agent'. The binary is compiled with glibc dependencies
|
||||||
|
# and will NOT run on Alpine's musl libc.
|
||||||
|
#
|
||||||
|
# ERROR when used: "exec /app/chorus-agent: no such file or directory"
|
||||||
|
#
|
||||||
|
# ✅ USE Dockerfile.ubuntu INSTEAD
|
||||||
|
#
|
||||||
|
# This file is kept for reference only and should not be used for builds.
|
||||||
|
# Last failed: 2025-10-01
|
||||||
|
# Reason: Alpine musl libc incompatibility with glibc-linked binary
|
||||||
|
#
|
||||||
|
# -------------------------------------------------------------------
|
||||||
|
|
||||||
# CHORUS - Simple Docker image using pre-built binary
|
# CHORUS - Simple Docker image using pre-built binary
|
||||||
FROM alpine:3.18
|
FROM alpine:3.18
|
||||||
|
|
||||||
5
Makefile
5
Makefile
@@ -90,10 +90,13 @@ run-hap: build-hap
|
|||||||
./$(BUILD_DIR)/$(BINARY_NAME_HAP)
|
./$(BUILD_DIR)/$(BINARY_NAME_HAP)
|
||||||
|
|
||||||
# Docker builds
|
# Docker builds
|
||||||
|
# NOTE: Always use Dockerfile.ubuntu for production builds!
|
||||||
|
# Dockerfile.simple.DEPRECATED uses Alpine which is incompatible with glibc-linked binaries
|
||||||
.PHONY: docker-agent
|
.PHONY: docker-agent
|
||||||
docker-agent:
|
docker-agent:
|
||||||
@echo "🐳 Building Docker image for CHORUS agent..."
|
@echo "🐳 Building Docker image for CHORUS agent..."
|
||||||
docker build -f docker/Dockerfile.agent -t chorus-agent:$(VERSION) .
|
docker build -f Dockerfile.ubuntu -t chorus-agent:$(VERSION) .
|
||||||
|
@echo "⚠️ IMPORTANT: Production images MUST use Dockerfile.ubuntu (glibc compatibility)"
|
||||||
|
|
||||||
.PHONY: docker-hap
|
.PHONY: docker-hap
|
||||||
docker-hap:
|
docker-hap:
|
||||||
|
|||||||
@@ -4,10 +4,15 @@ import (
|
|||||||
"encoding/json"
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
"net/http"
|
"net/http"
|
||||||
|
"os"
|
||||||
"strconv"
|
"strconv"
|
||||||
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"chorus/internal/council"
|
||||||
"chorus/internal/logging"
|
"chorus/internal/logging"
|
||||||
|
"chorus/p2p"
|
||||||
|
"chorus/pkg/config"
|
||||||
"chorus/pubsub"
|
"chorus/pubsub"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
@@ -19,17 +24,94 @@ type HTTPServer struct {
|
|||||||
hypercoreLog *logging.HypercoreLog
|
hypercoreLog *logging.HypercoreLog
|
||||||
pubsub *pubsub.PubSub
|
pubsub *pubsub.PubSub
|
||||||
server *http.Server
|
server *http.Server
|
||||||
|
CouncilManager *council.Manager // Exported for brief processing
|
||||||
|
whooshEndpoint string
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHTTPServer creates a new HTTP server for CHORUS API
|
// NewHTTPServer creates a new HTTP server for CHORUS API
|
||||||
func NewHTTPServer(port int, hlog *logging.HypercoreLog, ps *pubsub.PubSub) *HTTPServer {
|
func NewHTTPServer(cfg *config.Config, node *p2p.Node, hlog *logging.HypercoreLog, ps *pubsub.PubSub) *HTTPServer {
|
||||||
|
agentID := cfg.Agent.ID
|
||||||
|
agentName := deriveAgentName(cfg)
|
||||||
|
endpoint := deriveAgentEndpoint(cfg)
|
||||||
|
p2pAddr := deriveAgentP2PAddress(cfg, node)
|
||||||
|
capabilities := cfg.Agent.Capabilities
|
||||||
|
if len(capabilities) == 0 {
|
||||||
|
capabilities = []string{"general_development", "task_coordination"}
|
||||||
|
}
|
||||||
|
|
||||||
|
councilMgr := council.NewManager(agentID, agentName, endpoint, p2pAddr, capabilities)
|
||||||
|
|
||||||
|
whooshEndpoint := overrideWhooshEndpoint(cfg)
|
||||||
|
|
||||||
return &HTTPServer{
|
return &HTTPServer{
|
||||||
port: port,
|
port: cfg.Network.APIPort,
|
||||||
hypercoreLog: hlog,
|
hypercoreLog: hlog,
|
||||||
pubsub: ps,
|
pubsub: ps,
|
||||||
|
CouncilManager: councilMgr,
|
||||||
|
whooshEndpoint: strings.TrimRight(whooshEndpoint, "/"),
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func deriveAgentName(cfg *config.Config) string {
|
||||||
|
if v := strings.TrimSpace(os.Getenv("CHORUS_AGENT_NAME")); v != "" {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
if cfg.Agent.Specialization != "" {
|
||||||
|
return cfg.Agent.Specialization
|
||||||
|
}
|
||||||
|
return cfg.Agent.ID
|
||||||
|
}
|
||||||
|
|
||||||
|
func deriveAgentEndpoint(cfg *config.Config) string {
|
||||||
|
if v := strings.TrimSpace(os.Getenv("CHORUS_AGENT_ENDPOINT")); v != "" {
|
||||||
|
return strings.TrimRight(v, "/")
|
||||||
|
}
|
||||||
|
host := strings.TrimSpace(os.Getenv("CHORUS_AGENT_SERVICE_HOST"))
|
||||||
|
if host == "" {
|
||||||
|
host = "chorus"
|
||||||
|
}
|
||||||
|
scheme := strings.TrimSpace(os.Getenv("CHORUS_AGENT_ENDPOINT_SCHEME"))
|
||||||
|
if scheme == "" {
|
||||||
|
scheme = "http"
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%s://%s:%d", scheme, host, cfg.Network.APIPort)
|
||||||
|
}
|
||||||
|
|
||||||
|
func deriveAgentP2PAddress(cfg *config.Config, node *p2p.Node) string {
|
||||||
|
if v := strings.TrimSpace(os.Getenv("CHORUS_AGENT_P2P_ENDPOINT")); v != "" {
|
||||||
|
return v
|
||||||
|
}
|
||||||
|
if node != nil {
|
||||||
|
addrs := node.Addresses()
|
||||||
|
if len(addrs) > 0 {
|
||||||
|
return fmt.Sprintf("%s/p2p/%s", addrs[0], node.ID())
|
||||||
|
}
|
||||||
|
}
|
||||||
|
host := strings.TrimSpace(os.Getenv("CHORUS_AGENT_SERVICE_HOST"))
|
||||||
|
if host == "" {
|
||||||
|
host = "chorus"
|
||||||
|
}
|
||||||
|
return fmt.Sprintf("%s:%d", host, cfg.Network.P2PPort)
|
||||||
|
}
|
||||||
|
|
||||||
|
func overrideWhooshEndpoint(cfg *config.Config) string {
|
||||||
|
if v := strings.TrimSpace(os.Getenv("CHORUS_WHOOSH_ENDPOINT")); v != "" {
|
||||||
|
return strings.TrimRight(v, "/")
|
||||||
|
}
|
||||||
|
candidate := cfg.WHOOSHAPI.BaseURL
|
||||||
|
if candidate == "" {
|
||||||
|
candidate = cfg.WHOOSHAPI.URL
|
||||||
|
}
|
||||||
|
if candidate == "" {
|
||||||
|
return "http://whoosh:8080"
|
||||||
|
}
|
||||||
|
trimmed := strings.TrimRight(candidate, "/")
|
||||||
|
if strings.Contains(trimmed, "localhost") || strings.Contains(trimmed, "127.0.0.1") {
|
||||||
|
return "http://whoosh:8080"
|
||||||
|
}
|
||||||
|
return trimmed
|
||||||
|
}
|
||||||
|
|
||||||
// Start starts the HTTP server
|
// Start starts the HTTP server
|
||||||
func (h *HTTPServer) Start() error {
|
func (h *HTTPServer) Start() error {
|
||||||
router := mux.NewRouter()
|
router := mux.NewRouter()
|
||||||
@@ -65,6 +147,12 @@ func (h *HTTPServer) Start() error {
|
|||||||
// Status endpoint
|
// Status endpoint
|
||||||
api.HandleFunc("/status", h.handleStatus).Methods("GET")
|
api.HandleFunc("/status", h.handleStatus).Methods("GET")
|
||||||
|
|
||||||
|
// Council opportunity endpoints (v1)
|
||||||
|
v1 := api.PathPrefix("/v1").Subrouter()
|
||||||
|
v1.HandleFunc("/opportunities/council", h.handleCouncilOpportunity).Methods("POST")
|
||||||
|
v1.HandleFunc("/councils/status", h.handleCouncilStatusUpdate).Methods("POST")
|
||||||
|
v1.HandleFunc("/councils/{councilID}/roles/{roleName}/brief", h.handleCouncilBrief).Methods("POST")
|
||||||
|
|
||||||
h.server = &http.Server{
|
h.server = &http.Server{
|
||||||
Addr: fmt.Sprintf(":%d", h.port),
|
Addr: fmt.Sprintf(":%d", h.port),
|
||||||
Handler: router,
|
Handler: router,
|
||||||
@@ -242,3 +330,209 @@ func (h *HTTPServer) handleStatus(w http.ResponseWriter, r *http.Request) {
|
|||||||
|
|
||||||
json.NewEncoder(w).Encode(status)
|
json.NewEncoder(w).Encode(status)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// handleCouncilOpportunity receives council formation opportunities from WHOOSH
|
||||||
|
func (h *HTTPServer) handleCouncilOpportunity(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
var opportunity council.CouncilOpportunity
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&opportunity); err != nil {
|
||||||
|
http.Error(w, fmt.Sprintf("Invalid JSON payload: %v", err), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log the received opportunity to hypercore
|
||||||
|
logData := map[string]interface{}{
|
||||||
|
"event": "council_opportunity_received",
|
||||||
|
"council_id": opportunity.CouncilID,
|
||||||
|
"project_name": opportunity.ProjectName,
|
||||||
|
"repository": opportunity.Repository,
|
||||||
|
"core_roles": len(opportunity.CoreRoles),
|
||||||
|
"optional_roles": len(opportunity.OptionalRoles),
|
||||||
|
"ucxl_address": opportunity.UCXLAddress,
|
||||||
|
"message": fmt.Sprintf("📡 Received council opportunity for project: %s", opportunity.ProjectName),
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil {
|
||||||
|
fmt.Printf("Failed to log council opportunity: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Log to console for immediate visibility
|
||||||
|
fmt.Printf("\n📡 COUNCIL OPPORTUNITY RECEIVED\n")
|
||||||
|
fmt.Printf(" Council ID: %s\n", opportunity.CouncilID)
|
||||||
|
fmt.Printf(" Project: %s\n", opportunity.ProjectName)
|
||||||
|
fmt.Printf(" Repository: %s\n", opportunity.Repository)
|
||||||
|
fmt.Printf(" Core Roles: %d\n", len(opportunity.CoreRoles))
|
||||||
|
fmt.Printf(" Optional Roles: %d\n", len(opportunity.OptionalRoles))
|
||||||
|
fmt.Printf(" UCXL: %s\n", opportunity.UCXLAddress)
|
||||||
|
fmt.Printf("\n Available Roles:\n")
|
||||||
|
for _, role := range opportunity.CoreRoles {
|
||||||
|
fmt.Printf(" - %s (%s) [CORE]\n", role.AgentName, role.RoleName)
|
||||||
|
}
|
||||||
|
for _, role := range opportunity.OptionalRoles {
|
||||||
|
fmt.Printf(" - %s (%s) [OPTIONAL]\n", role.AgentName, role.RoleName)
|
||||||
|
}
|
||||||
|
fmt.Printf("\n")
|
||||||
|
|
||||||
|
// Evaluate the opportunity and claim a role if suitable
|
||||||
|
go func() {
|
||||||
|
if err := h.CouncilManager.EvaluateOpportunity(&opportunity, h.whooshEndpoint); err != nil {
|
||||||
|
fmt.Printf("Failed to evaluate/claim council role: %v\n", err)
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
|
||||||
|
response := map[string]interface{}{
|
||||||
|
"status": "received",
|
||||||
|
"council_id": opportunity.CouncilID,
|
||||||
|
"message": "Council opportunity received and being evaluated",
|
||||||
|
"timestamp": time.Now().Unix(),
|
||||||
|
"agent_id": h.CouncilManager.AgentID(),
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusAccepted)
|
||||||
|
json.NewEncoder(w).Encode(response)
|
||||||
|
}
|
||||||
|
|
||||||
|
// handleCouncilStatusUpdate receives council staffing updates from WHOOSH
|
||||||
|
func (h *HTTPServer) handleCouncilStatusUpdate(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
type roleCountsPayload struct {
|
||||||
|
Total int `json:"total"`
|
||||||
|
Claimed int `json:"claimed"`
|
||||||
|
}
|
||||||
|
|
||||||
|
type councilStatusPayload struct {
|
||||||
|
CouncilID string `json:"council_id"`
|
||||||
|
ProjectName string `json:"project_name"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
Timestamp time.Time `json:"timestamp"`
|
||||||
|
CoreRoles roleCountsPayload `json:"core_roles"`
|
||||||
|
Optional roleCountsPayload `json:"optional_roles"`
|
||||||
|
}
|
||||||
|
|
||||||
|
var payload councilStatusPayload
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
|
||||||
|
http.Error(w, fmt.Sprintf("Invalid JSON payload: %v", err), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if payload.CouncilID == "" {
|
||||||
|
http.Error(w, "council_id is required", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if payload.Status == "" {
|
||||||
|
payload.Status = "unknown"
|
||||||
|
}
|
||||||
|
|
||||||
|
if payload.Timestamp.IsZero() {
|
||||||
|
payload.Timestamp = time.Now()
|
||||||
|
}
|
||||||
|
|
||||||
|
if payload.Message == "" {
|
||||||
|
payload.Message = fmt.Sprintf("Council status update: %s (core %d/%d, optional %d/%d)",
|
||||||
|
payload.Status,
|
||||||
|
payload.CoreRoles.Claimed, payload.CoreRoles.Total,
|
||||||
|
payload.Optional.Claimed, payload.Optional.Total,
|
||||||
|
)
|
||||||
|
}
|
||||||
|
|
||||||
|
logData := map[string]interface{}{
|
||||||
|
"event": "council_status_update",
|
||||||
|
"council_id": payload.CouncilID,
|
||||||
|
"project_name": payload.ProjectName,
|
||||||
|
"status": payload.Status,
|
||||||
|
"message": payload.Message,
|
||||||
|
"timestamp": payload.Timestamp.Format(time.RFC3339),
|
||||||
|
"core_roles_total": payload.CoreRoles.Total,
|
||||||
|
"core_roles_claimed": payload.CoreRoles.Claimed,
|
||||||
|
"optional_roles_total": payload.Optional.Total,
|
||||||
|
"optional_roles_claimed": payload.Optional.Claimed,
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil {
|
||||||
|
fmt.Printf("Failed to log council status update: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("\n🏁 COUNCIL STATUS UPDATE\n")
|
||||||
|
fmt.Printf(" Council ID: %s\n", payload.CouncilID)
|
||||||
|
if payload.ProjectName != "" {
|
||||||
|
fmt.Printf(" Project: %s\n", payload.ProjectName)
|
||||||
|
}
|
||||||
|
fmt.Printf(" Status: %s\n", payload.Status)
|
||||||
|
fmt.Printf(" Core Roles: %d/%d claimed\n", payload.CoreRoles.Claimed, payload.CoreRoles.Total)
|
||||||
|
fmt.Printf(" Optional Roles: %d/%d claimed\n", payload.Optional.Claimed, payload.Optional.Total)
|
||||||
|
fmt.Printf(" Message: %s\n\n", payload.Message)
|
||||||
|
|
||||||
|
response := map[string]interface{}{
|
||||||
|
"status": "received",
|
||||||
|
"council_id": payload.CouncilID,
|
||||||
|
"timestamp": payload.Timestamp.Unix(),
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusAccepted)
|
||||||
|
json.NewEncoder(w).Encode(response)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (h *HTTPServer) handleCouncilBrief(w http.ResponseWriter, r *http.Request) {
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
vars := mux.Vars(r)
|
||||||
|
councilID := vars["councilID"]
|
||||||
|
roleName := vars["roleName"]
|
||||||
|
|
||||||
|
if councilID == "" || roleName == "" {
|
||||||
|
http.Error(w, "councilID and roleName are required", http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
var brief council.CouncilBrief
|
||||||
|
if err := json.NewDecoder(r.Body).Decode(&brief); err != nil {
|
||||||
|
http.Error(w, fmt.Sprintf("Invalid JSON payload: %v", err), http.StatusBadRequest)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
brief.CouncilID = councilID
|
||||||
|
brief.RoleName = roleName
|
||||||
|
|
||||||
|
fmt.Printf("\n📦 Received council brief for %s (%s)\n", councilID, roleName)
|
||||||
|
if brief.BriefURL != "" {
|
||||||
|
fmt.Printf(" Brief URL: %s\n", brief.BriefURL)
|
||||||
|
}
|
||||||
|
if brief.Summary != "" {
|
||||||
|
fmt.Printf(" Summary: %s\n", brief.Summary)
|
||||||
|
}
|
||||||
|
|
||||||
|
if h.CouncilManager != nil {
|
||||||
|
h.CouncilManager.HandleCouncilBrief(councilID, roleName, &brief)
|
||||||
|
}
|
||||||
|
|
||||||
|
logData := map[string]interface{}{
|
||||||
|
"event": "council_brief_received",
|
||||||
|
"council_id": councilID,
|
||||||
|
"role_name": roleName,
|
||||||
|
"project_name": brief.ProjectName,
|
||||||
|
"repository": brief.Repository,
|
||||||
|
"brief_url": brief.BriefURL,
|
||||||
|
"ucxl_address": brief.UCXLAddress,
|
||||||
|
"hmmm_topic": brief.HMMMTopic,
|
||||||
|
"expected_artifacts": brief.ExpectedArtifacts,
|
||||||
|
"timestamp": time.Now().Format(time.RFC3339),
|
||||||
|
}
|
||||||
|
|
||||||
|
if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil {
|
||||||
|
fmt.Printf("Failed to log council brief: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
response := map[string]interface{}{
|
||||||
|
"status": "received",
|
||||||
|
"council_id": councilID,
|
||||||
|
"role_name": roleName,
|
||||||
|
"timestamp": time.Now().Unix(),
|
||||||
|
}
|
||||||
|
|
||||||
|
w.WriteHeader(http.StatusAccepted)
|
||||||
|
json.NewEncoder(w).Encode(response)
|
||||||
|
}
|
||||||
|
|||||||
@@ -11,18 +11,18 @@ WORKDIR /build
|
|||||||
# Copy go mod files first (for better caching)
|
# Copy go mod files first (for better caching)
|
||||||
COPY go.mod go.sum ./
|
COPY go.mod go.sum ./
|
||||||
|
|
||||||
# Download dependencies
|
# Skip go mod download; we rely on vendored deps to avoid local replaces
|
||||||
RUN go mod download
|
RUN echo "Using vendored dependencies (skipping go mod download)"
|
||||||
|
|
||||||
# Copy source code
|
# Copy source code
|
||||||
COPY . .
|
COPY . .
|
||||||
|
|
||||||
# Build the CHORUS binary with mod mode
|
# Build the CHORUS agent binary with vendored deps
|
||||||
RUN CGO_ENABLED=0 GOOS=linux go build \
|
RUN CGO_ENABLED=0 GOOS=linux go build \
|
||||||
-mod=mod \
|
-mod=vendor \
|
||||||
-ldflags='-w -s -extldflags "-static"' \
|
-ldflags='-w -s -extldflags "-static"' \
|
||||||
-o chorus \
|
-o chorus-agent \
|
||||||
./cmd/chorus
|
./cmd/agent
|
||||||
|
|
||||||
# Final minimal runtime image
|
# Final minimal runtime image
|
||||||
FROM alpine:3.18
|
FROM alpine:3.18
|
||||||
@@ -42,8 +42,8 @@ RUN mkdir -p /app/data && \
|
|||||||
chown -R chorus:chorus /app
|
chown -R chorus:chorus /app
|
||||||
|
|
||||||
# Copy binary from builder stage
|
# Copy binary from builder stage
|
||||||
COPY --from=builder /build/chorus /app/chorus
|
COPY --from=builder /build/chorus-agent /app/chorus-agent
|
||||||
RUN chmod +x /app/chorus
|
RUN chmod +x /app/chorus-agent
|
||||||
|
|
||||||
# Switch to non-root user
|
# Switch to non-root user
|
||||||
USER chorus
|
USER chorus
|
||||||
@@ -64,5 +64,5 @@ ENV LOG_LEVEL=info \
|
|||||||
CHORUS_HEALTH_PORT=8081 \
|
CHORUS_HEALTH_PORT=8081 \
|
||||||
CHORUS_P2P_PORT=9000
|
CHORUS_P2P_PORT=9000
|
||||||
|
|
||||||
# Start CHORUS
|
# Start CHORUS Agent
|
||||||
ENTRYPOINT ["/app/chorus"]
|
ENTRYPOINT ["/app/chorus-agent"]
|
||||||
|
|||||||
@@ -29,8 +29,8 @@ services:
|
|||||||
- CHORUS_MAX_CONCURRENT_DHT=16 # Limit concurrent DHT queries
|
- CHORUS_MAX_CONCURRENT_DHT=16 # Limit concurrent DHT queries
|
||||||
|
|
||||||
# Election stability windows (Medium-risk fix 2.1)
|
# Election stability windows (Medium-risk fix 2.1)
|
||||||
- CHORUS_ELECTION_MIN_TERM=30s # Minimum time between elections to prevent churn
|
- CHORUS_ELECTION_MIN_TERM=120s # Minimum time between elections to prevent churn
|
||||||
- CHORUS_LEADER_MIN_TERM=45s # Minimum time before challenging healthy leader
|
- CHORUS_LEADER_MIN_TERM=240s # Minimum time before challenging healthy leader
|
||||||
|
|
||||||
# Assignment system for runtime configuration (Medium-risk fix 2.2)
|
# Assignment system for runtime configuration (Medium-risk fix 2.2)
|
||||||
- ASSIGN_URL=${ASSIGN_URL:-} # Optional: WHOOSH assignment endpoint
|
- ASSIGN_URL=${ASSIGN_URL:-} # Optional: WHOOSH assignment endpoint
|
||||||
@@ -57,6 +57,13 @@ services:
|
|||||||
- CHORUS_MODELS=${CHORUS_MODELS:-meta/llama-3.1-8b-instruct}
|
- CHORUS_MODELS=${CHORUS_MODELS:-meta/llama-3.1-8b-instruct}
|
||||||
- CHORUS_DEFAULT_REASONING_MODEL=${CHORUS_DEFAULT_REASONING_MODEL:-meta/llama-3.1-8b-instruct}
|
- CHORUS_DEFAULT_REASONING_MODEL=${CHORUS_DEFAULT_REASONING_MODEL:-meta/llama-3.1-8b-instruct}
|
||||||
|
|
||||||
|
# LightRAG configuration (optional RAG enhancement)
|
||||||
|
- CHORUS_LIGHTRAG_ENABLED=${CHORUS_LIGHTRAG_ENABLED:-false}
|
||||||
|
- CHORUS_LIGHTRAG_BASE_URL=${CHORUS_LIGHTRAG_BASE_URL:-http://lightrag:9621}
|
||||||
|
- CHORUS_LIGHTRAG_TIMEOUT=${CHORUS_LIGHTRAG_TIMEOUT:-30s}
|
||||||
|
- CHORUS_LIGHTRAG_API_KEY=${CHORUS_LIGHTRAG_API_KEY:-your-secure-api-key-here}
|
||||||
|
- CHORUS_LIGHTRAG_DEFAULT_MODE=${CHORUS_LIGHTRAG_DEFAULT_MODE:-hybrid}
|
||||||
|
|
||||||
# Logging configuration
|
# Logging configuration
|
||||||
- LOG_LEVEL=${LOG_LEVEL:-info}
|
- LOG_LEVEL=${LOG_LEVEL:-info}
|
||||||
- LOG_FORMAT=${LOG_FORMAT:-structured}
|
- LOG_FORMAT=${LOG_FORMAT:-structured}
|
||||||
@@ -95,7 +102,7 @@ services:
|
|||||||
# Container resource limits
|
# Container resource limits
|
||||||
deploy:
|
deploy:
|
||||||
mode: replicated
|
mode: replicated
|
||||||
replicas: ${CHORUS_REPLICAS:-9}
|
replicas: ${CHORUS_REPLICAS:-20}
|
||||||
update_config:
|
update_config:
|
||||||
parallelism: 1
|
parallelism: 1
|
||||||
delay: 10s
|
delay: 10s
|
||||||
@@ -166,6 +173,8 @@ services:
|
|||||||
WHOOSH_SERVER_READ_TIMEOUT: "30s"
|
WHOOSH_SERVER_READ_TIMEOUT: "30s"
|
||||||
WHOOSH_SERVER_WRITE_TIMEOUT: "30s"
|
WHOOSH_SERVER_WRITE_TIMEOUT: "30s"
|
||||||
WHOOSH_SERVER_SHUTDOWN_TIMEOUT: "30s"
|
WHOOSH_SERVER_SHUTDOWN_TIMEOUT: "30s"
|
||||||
|
# UI static directory (served at site root by WHOOSH)
|
||||||
|
WHOOSH_UI_DIR: "/app/ui"
|
||||||
|
|
||||||
# GITEA configuration
|
# GITEA configuration
|
||||||
WHOOSH_GITEA_BASE_URL: https://gitea.chorus.services
|
WHOOSH_GITEA_BASE_URL: https://gitea.chorus.services
|
||||||
@@ -210,7 +219,8 @@ services:
|
|||||||
- jwt_secret
|
- jwt_secret
|
||||||
- service_tokens
|
- service_tokens
|
||||||
- redis_password
|
- redis_password
|
||||||
# volumes:
|
volumes:
|
||||||
|
- whoosh_ui:/app/ui:ro
|
||||||
# - /var/run/docker.sock:/var/run/docker.sock # Disabled for agent assignment architecture
|
# - /var/run/docker.sock:/var/run/docker.sock # Disabled for agent assignment architecture
|
||||||
deploy:
|
deploy:
|
||||||
replicas: 2
|
replicas: 2
|
||||||
@@ -247,11 +257,11 @@ services:
|
|||||||
- traefik.enable=true
|
- traefik.enable=true
|
||||||
- traefik.docker.network=tengig
|
- traefik.docker.network=tengig
|
||||||
- traefik.http.routers.whoosh.rule=Host(`whoosh.chorus.services`)
|
- traefik.http.routers.whoosh.rule=Host(`whoosh.chorus.services`)
|
||||||
|
- traefik.http.routers.whoosh.entrypoints=web,web-secured
|
||||||
- traefik.http.routers.whoosh.tls=true
|
- traefik.http.routers.whoosh.tls=true
|
||||||
- traefik.http.routers.whoosh.tls.certresolver=letsencryptresolver
|
- traefik.http.routers.whoosh.tls.certresolver=letsencryptresolver
|
||||||
- traefik.http.routers.photoprism.entrypoints=web,web-secured
|
|
||||||
- traefik.http.services.whoosh.loadbalancer.server.port=8080
|
- traefik.http.services.whoosh.loadbalancer.server.port=8080
|
||||||
- traefik.http.services.photoprism.loadbalancer.passhostheader=true
|
- traefik.http.services.whoosh.loadbalancer.passhostheader=true
|
||||||
- traefik.http.middlewares.whoosh-auth.basicauth.users=admin:$2y$10$example_hash
|
- traefik.http.middlewares.whoosh-auth.basicauth.users=admin:$2y$10$example_hash
|
||||||
networks:
|
networks:
|
||||||
- tengig
|
- tengig
|
||||||
@@ -407,7 +417,7 @@ services:
|
|||||||
# REQ: BACKBEAT-REQ-001 - Single BeatFrame publisher per cluster
|
# REQ: BACKBEAT-REQ-001 - Single BeatFrame publisher per cluster
|
||||||
# REQ: BACKBEAT-OPS-001 - One replica prefers leadership
|
# REQ: BACKBEAT-OPS-001 - One replica prefers leadership
|
||||||
backbeat-pulse:
|
backbeat-pulse:
|
||||||
image: anthonyrawlins/backbeat-pulse:v1.0.5
|
image: anthonyrawlins/backbeat-pulse:v1.0.6
|
||||||
command: >
|
command: >
|
||||||
./pulse
|
./pulse
|
||||||
-cluster=chorus-production
|
-cluster=chorus-production
|
||||||
@@ -574,6 +584,14 @@ services:
|
|||||||
max-file: "3"
|
max-file: "3"
|
||||||
tag: "nats/{{.Name}}/{{.ID}}"
|
tag: "nats/{{.Name}}/{{.ID}}"
|
||||||
|
|
||||||
|
watchtower:
|
||||||
|
image: containrrr/watchtower
|
||||||
|
volumes:
|
||||||
|
- /var/run/docker.sock:/var/run/docker.sock
|
||||||
|
command: --interval 300 --cleanup --revive-stopped --include-stopped
|
||||||
|
restart: always
|
||||||
|
|
||||||
|
|
||||||
# KACHING services are deployed separately in their own stack
|
# KACHING services are deployed separately in their own stack
|
||||||
# License validation will access https://kaching.chorus.services/api
|
# License validation will access https://kaching.chorus.services/api
|
||||||
|
|
||||||
@@ -611,6 +629,12 @@ volumes:
|
|||||||
type: none
|
type: none
|
||||||
o: bind
|
o: bind
|
||||||
device: /rust/containers/WHOOSH/redis
|
device: /rust/containers/WHOOSH/redis
|
||||||
|
whoosh_ui:
|
||||||
|
driver: local
|
||||||
|
driver_opts:
|
||||||
|
type: none
|
||||||
|
o: bind
|
||||||
|
device: /rust/containers/WHOOSH/ui
|
||||||
|
|
||||||
|
|
||||||
# Networks for CHORUS communication
|
# Networks for CHORUS communication
|
||||||
@@ -645,7 +669,7 @@ secrets:
|
|||||||
name: whoosh_webhook_token
|
name: whoosh_webhook_token
|
||||||
jwt_secret:
|
jwt_secret:
|
||||||
external: true
|
external: true
|
||||||
name: whoosh_jwt_secret
|
name: whoosh_jwt_secret_v4
|
||||||
service_tokens:
|
service_tokens:
|
||||||
external: true
|
external: true
|
||||||
name: whoosh_service_tokens
|
name: whoosh_service_tokens
|
||||||
|
|||||||
388
docs/LIGHTRAG_INTEGRATION.md
Normal file
388
docs/LIGHTRAG_INTEGRATION.md
Normal file
@@ -0,0 +1,388 @@
|
|||||||
|
# LightRAG MCP Integration
|
||||||
|
|
||||||
|
**Status:** ✅ Production Ready
|
||||||
|
**Version:** 1.0.0
|
||||||
|
**Date:** 2025-09-30
|
||||||
|
|
||||||
|
## Overview
|
||||||
|
|
||||||
|
CHORUS now includes optional LightRAG integration for Retrieval-Augmented Generation (RAG) capabilities. LightRAG provides graph-based knowledge retrieval to enrich AI reasoning and context resolution.
|
||||||
|
|
||||||
|
## Architecture
|
||||||
|
|
||||||
|
### Components
|
||||||
|
|
||||||
|
1. **LightRAG Client** (`pkg/mcp/lightrag_client.go`)
|
||||||
|
- HTTP client for LightRAG MCP server
|
||||||
|
- Supports 4 query modes: naive, local, global, hybrid
|
||||||
|
- Health checking and document insertion
|
||||||
|
- Configurable timeouts and API authentication
|
||||||
|
|
||||||
|
2. **Reasoning Engine Integration** (`reasoning/reasoning.go`)
|
||||||
|
- `GenerateResponseWithRAG()` - RAG-enriched response generation
|
||||||
|
- `GenerateResponseSmartWithRAG()` - Combines model selection + RAG
|
||||||
|
- `SetLightRAGClient()` - Configure RAG client
|
||||||
|
- Non-fatal error handling (degrades gracefully)
|
||||||
|
|
||||||
|
3. **SLURP Context Enrichment** (`pkg/slurp/context/lightrag.go`)
|
||||||
|
- `LightRAGEnricher` - Enriches context nodes with RAG data
|
||||||
|
- `EnrichContextNode()` - Add insights to individual nodes
|
||||||
|
- `EnrichResolvedContext()` - Enrich resolved context chains
|
||||||
|
- `InsertContextNode()` - Build knowledge base over time
|
||||||
|
|
||||||
|
4. **Configuration** (`pkg/config/config.go`)
|
||||||
|
- `LightRAGConfig` struct with 5 configuration options
|
||||||
|
- Environment variable support
|
||||||
|
- Automatic initialization in runtime
|
||||||
|
|
||||||
|
## Configuration
|
||||||
|
|
||||||
|
### Environment Variables
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Enable LightRAG integration
|
||||||
|
CHORUS_LIGHTRAG_ENABLED=true
|
||||||
|
|
||||||
|
# LightRAG server endpoint
|
||||||
|
CHORUS_LIGHTRAG_BASE_URL=http://127.0.0.1:9621
|
||||||
|
|
||||||
|
# Query timeout
|
||||||
|
CHORUS_LIGHTRAG_TIMEOUT=30s
|
||||||
|
|
||||||
|
# Optional API key
|
||||||
|
CHORUS_LIGHTRAG_API_KEY=your-api-key
|
||||||
|
|
||||||
|
# Default query mode (naive, local, global, hybrid)
|
||||||
|
CHORUS_LIGHTRAG_DEFAULT_MODE=hybrid
|
||||||
|
```
|
||||||
|
|
||||||
|
### Docker Configuration
|
||||||
|
|
||||||
|
```yaml
|
||||||
|
services:
|
||||||
|
chorus-agent:
|
||||||
|
environment:
|
||||||
|
- CHORUS_LIGHTRAG_ENABLED=true
|
||||||
|
- CHORUS_LIGHTRAG_BASE_URL=http://lightrag:9621
|
||||||
|
- CHORUS_LIGHTRAG_DEFAULT_MODE=hybrid
|
||||||
|
depends_on:
|
||||||
|
- lightrag
|
||||||
|
|
||||||
|
lightrag:
|
||||||
|
image: lightrag/lightrag:latest
|
||||||
|
ports:
|
||||||
|
- "9621:9621"
|
||||||
|
volumes:
|
||||||
|
- lightrag-data:/app/data
|
||||||
|
```
|
||||||
|
|
||||||
|
## Query Modes
|
||||||
|
|
||||||
|
LightRAG supports 4 query modes with different retrieval strategies:
|
||||||
|
|
||||||
|
1. **Naive Mode** (`QueryModeNaive`)
|
||||||
|
- Simple semantic search
|
||||||
|
- Fastest, least context
|
||||||
|
- Use for: Quick lookups
|
||||||
|
|
||||||
|
2. **Local Mode** (`QueryModeLocal`)
|
||||||
|
- Local graph traversal
|
||||||
|
- Context from immediate neighbors
|
||||||
|
- Use for: Related information
|
||||||
|
|
||||||
|
3. **Global Mode** (`QueryModeGlobal`)
|
||||||
|
- Global graph analysis
|
||||||
|
- Broad context from entire knowledge base
|
||||||
|
- Use for: High-level questions
|
||||||
|
|
||||||
|
4. **Hybrid Mode** (`QueryModeHybrid`) ⭐ **Recommended**
|
||||||
|
- Combined approach
|
||||||
|
- Balances breadth and depth
|
||||||
|
- Use for: General purpose RAG
|
||||||
|
|
||||||
|
## Usage Examples
|
||||||
|
|
||||||
|
### Reasoning Engine with RAG
|
||||||
|
|
||||||
|
```go
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"chorus/reasoning"
|
||||||
|
"chorus/pkg/mcp"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Initialize LightRAG client
|
||||||
|
config := mcp.LightRAGConfig{
|
||||||
|
BaseURL: "http://127.0.0.1:9621",
|
||||||
|
Timeout: 30 * time.Second,
|
||||||
|
}
|
||||||
|
client := mcp.NewLightRAGClient(config)
|
||||||
|
|
||||||
|
// Configure reasoning engine
|
||||||
|
reasoning.SetLightRAGClient(client)
|
||||||
|
|
||||||
|
// Generate RAG-enriched response
|
||||||
|
ctx := context.Background()
|
||||||
|
response, err := reasoning.GenerateResponseWithRAG(
|
||||||
|
ctx,
|
||||||
|
"meta/llama-3.1-8b-instruct",
|
||||||
|
"How does CHORUS handle P2P networking?",
|
||||||
|
mcp.QueryModeHybrid,
|
||||||
|
)
|
||||||
|
```
|
||||||
|
|
||||||
|
### SLURP Context Enrichment
|
||||||
|
|
||||||
|
```go
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"chorus/pkg/slurp/context"
|
||||||
|
"chorus/pkg/mcp"
|
||||||
|
)
|
||||||
|
|
||||||
|
// Create enricher
|
||||||
|
enricher := context.NewLightRAGEnricher(client, "hybrid")
|
||||||
|
|
||||||
|
// Enrich a context node
|
||||||
|
node := &context.ContextNode{
|
||||||
|
Path: "/pkg/p2p",
|
||||||
|
Summary: "P2P networking implementation",
|
||||||
|
Purpose: "Provides libp2p networking layer",
|
||||||
|
}
|
||||||
|
|
||||||
|
err := enricher.EnrichContextNode(ctx, node)
|
||||||
|
// node.Insights now contains RAG-retrieved information
|
||||||
|
|
||||||
|
// Insert for future retrieval
|
||||||
|
err = enricher.InsertContextNode(ctx, node)
|
||||||
|
```
|
||||||
|
|
||||||
|
### Direct LightRAG Client
|
||||||
|
|
||||||
|
```go
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"chorus/pkg/mcp"
|
||||||
|
)
|
||||||
|
|
||||||
|
client := mcp.NewLightRAGClient(config)
|
||||||
|
|
||||||
|
// Health check
|
||||||
|
healthy := client.IsHealthy(ctx)
|
||||||
|
|
||||||
|
// Query with response
|
||||||
|
response, err := client.Query(ctx, "query", mcp.QueryModeHybrid)
|
||||||
|
|
||||||
|
// Get context only
|
||||||
|
context, err := client.GetContext(ctx, "query", mcp.QueryModeHybrid)
|
||||||
|
|
||||||
|
// Insert document
|
||||||
|
err := client.Insert(ctx, "text content", "description")
|
||||||
|
```
|
||||||
|
|
||||||
|
## Integration Points
|
||||||
|
|
||||||
|
### Runtime Initialization
|
||||||
|
|
||||||
|
LightRAG is initialized automatically in `internal/runtime/shared.go`:
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Line 685-704
|
||||||
|
if cfg.LightRAG.Enabled {
|
||||||
|
lightragConfig := mcp.LightRAGConfig{
|
||||||
|
BaseURL: cfg.LightRAG.BaseURL,
|
||||||
|
Timeout: cfg.LightRAG.Timeout,
|
||||||
|
APIKey: cfg.LightRAG.APIKey,
|
||||||
|
}
|
||||||
|
lightragClient := mcp.NewLightRAGClient(lightragConfig)
|
||||||
|
|
||||||
|
if lightragClient.IsHealthy(ctx) {
|
||||||
|
reasoning.SetLightRAGClient(lightragClient)
|
||||||
|
logger.Info("📚 LightRAG RAG system enabled")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
### Graceful Degradation
|
||||||
|
|
||||||
|
LightRAG integration is **completely optional** and **non-blocking**:
|
||||||
|
|
||||||
|
- If `CHORUS_LIGHTRAG_ENABLED=false`, no LightRAG calls are made
|
||||||
|
- If LightRAG server is unavailable, health check fails gracefully
|
||||||
|
- If RAG queries fail, reasoning engine falls back to non-RAG generation
|
||||||
|
- SLURP enrichment failures are logged but don't block context resolution
|
||||||
|
|
||||||
|
## Testing
|
||||||
|
|
||||||
|
### Unit Tests
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Run all LightRAG tests (requires running server)
|
||||||
|
go test -v ./pkg/mcp/
|
||||||
|
|
||||||
|
# Run only unit tests (no server required)
|
||||||
|
go test -v -short ./pkg/mcp/
|
||||||
|
```
|
||||||
|
|
||||||
|
### Integration Tests
|
||||||
|
|
||||||
|
```bash
|
||||||
|
# Start LightRAG server
|
||||||
|
cd ~/chorus/mcp-include/LightRAG
|
||||||
|
python main.py
|
||||||
|
|
||||||
|
# Run integration tests
|
||||||
|
cd ~/chorus/project-queues/active/CHORUS
|
||||||
|
go test -v ./pkg/mcp/ -run TestLightRAGClient
|
||||||
|
```
|
||||||
|
|
||||||
|
## Performance Considerations
|
||||||
|
|
||||||
|
### Query Timeouts
|
||||||
|
|
||||||
|
- Default: 30 seconds
|
||||||
|
- Hybrid mode is slowest (analyzes entire graph)
|
||||||
|
- Naive mode is fastest (simple semantic search)
|
||||||
|
|
||||||
|
### Caching
|
||||||
|
|
||||||
|
LightRAG includes internal caching:
|
||||||
|
- Repeated queries return cached results
|
||||||
|
- Cache TTL managed by LightRAG server
|
||||||
|
- No CHORUS-side caching required
|
||||||
|
|
||||||
|
### Resource Usage
|
||||||
|
|
||||||
|
- Memory: Proportional to knowledge base size
|
||||||
|
- CPU: Query modes have different compute requirements
|
||||||
|
- Network: HTTP requests to LightRAG server
|
||||||
|
|
||||||
|
## Troubleshooting
|
||||||
|
|
||||||
|
### Server Not Healthy
|
||||||
|
|
||||||
|
**Symptom:** `LightRAG enabled but server not healthy`
|
||||||
|
|
||||||
|
**Solutions:**
|
||||||
|
1. Check if LightRAG server is running: `curl http://127.0.0.1:9621/health`
|
||||||
|
2. Verify correct port in `CHORUS_LIGHTRAG_BASE_URL`
|
||||||
|
3. Check LightRAG logs for errors
|
||||||
|
4. Ensure network connectivity between CHORUS and LightRAG
|
||||||
|
|
||||||
|
### Empty Responses
|
||||||
|
|
||||||
|
**Symptom:** RAG queries return empty results
|
||||||
|
|
||||||
|
**Solutions:**
|
||||||
|
1. Knowledge base may be empty - insert documents first
|
||||||
|
2. Query may not match indexed content
|
||||||
|
3. Try different query mode (hybrid recommended)
|
||||||
|
4. Check LightRAG indexing logs
|
||||||
|
|
||||||
|
### Timeout Errors
|
||||||
|
|
||||||
|
**Symptom:** `context deadline exceeded`
|
||||||
|
|
||||||
|
**Solutions:**
|
||||||
|
1. Increase `CHORUS_LIGHTRAG_TIMEOUT`
|
||||||
|
2. Use faster query mode (naive or local)
|
||||||
|
3. Optimize LightRAG server performance
|
||||||
|
4. Check network latency
|
||||||
|
|
||||||
|
## Security Considerations
|
||||||
|
|
||||||
|
### API Authentication
|
||||||
|
|
||||||
|
Optional API key support:
|
||||||
|
```bash
|
||||||
|
CHORUS_LIGHTRAG_API_KEY=your-secret-key
|
||||||
|
```
|
||||||
|
|
||||||
|
Keys are sent as Bearer tokens in Authorization header.
|
||||||
|
|
||||||
|
### Network Security
|
||||||
|
|
||||||
|
- Run LightRAG on internal network only
|
||||||
|
- Use HTTPS for production deployments
|
||||||
|
- Consider firewall rules to restrict access
|
||||||
|
- LightRAG doesn't include built-in encryption
|
||||||
|
|
||||||
|
### Data Privacy
|
||||||
|
|
||||||
|
- All queries and documents are stored in LightRAG
|
||||||
|
- Consider what data is being indexed
|
||||||
|
- Implement data retention policies
|
||||||
|
- Use access control on LightRAG server
|
||||||
|
|
||||||
|
## Monitoring
|
||||||
|
|
||||||
|
### Health Checks
|
||||||
|
|
||||||
|
```go
|
||||||
|
// Check LightRAG availability
|
||||||
|
if client.IsHealthy(ctx) {
|
||||||
|
// Server is healthy
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get detailed health info
|
||||||
|
health, err := client.Health(ctx)
|
||||||
|
// Returns: Status, CoreVersion, APIVersion, etc.
|
||||||
|
```
|
||||||
|
|
||||||
|
### Metrics
|
||||||
|
|
||||||
|
Consider adding:
|
||||||
|
- RAG query latency
|
||||||
|
- Cache hit rates
|
||||||
|
- Enrichment success/failure rates
|
||||||
|
- Knowledge base size
|
||||||
|
|
||||||
|
## Future Enhancements
|
||||||
|
|
||||||
|
Potential improvements:
|
||||||
|
|
||||||
|
1. **Batch Query Optimization**
|
||||||
|
- Batch multiple RAG queries together
|
||||||
|
- Reduce HTTP overhead
|
||||||
|
|
||||||
|
2. **Adaptive Query Mode Selection**
|
||||||
|
- Automatically choose query mode based on question type
|
||||||
|
- Learn from past query performance
|
||||||
|
|
||||||
|
3. **Knowledge Base Management**
|
||||||
|
- Automated document insertion from SLURP contexts
|
||||||
|
- Background indexing of code repositories
|
||||||
|
- Scheduled knowledge base updates
|
||||||
|
|
||||||
|
4. **Advanced Caching**
|
||||||
|
- CHORUS-side caching with TTL
|
||||||
|
- Semantic cache (similar queries share cache)
|
||||||
|
- Persistent cache across restarts
|
||||||
|
|
||||||
|
5. **Multi-tenant Support**
|
||||||
|
- Per-agent knowledge bases
|
||||||
|
- Role-based access to documents
|
||||||
|
- Encrypted knowledge storage
|
||||||
|
|
||||||
|
## Files Changed
|
||||||
|
|
||||||
|
1. `pkg/mcp/lightrag_client.go` - NEW (277 lines)
|
||||||
|
2. `pkg/mcp/lightrag_client_test.go` - NEW (239 lines)
|
||||||
|
3. `pkg/config/config.go` - Modified (added LightRAGConfig)
|
||||||
|
4. `reasoning/reasoning.go` - Modified (added RAG functions)
|
||||||
|
5. `internal/runtime/shared.go` - Modified (added initialization)
|
||||||
|
6. `pkg/slurp/context/lightrag.go` - NEW (203 lines)
|
||||||
|
|
||||||
|
**Total:** 3 new files, 3 modified files, ~750 lines of code
|
||||||
|
|
||||||
|
## References
|
||||||
|
|
||||||
|
- LightRAG Documentation: https://github.com/HKUDS/LightRAG
|
||||||
|
- MCP Protocol Spec: https://spec.modelcontextprotocol.io
|
||||||
|
- CHORUS Documentation: `docs/comprehensive/`
|
||||||
|
|
||||||
|
---
|
||||||
|
|
||||||
|
**Maintainer:** CHORUS Project Team
|
||||||
|
**Last Updated:** 2025-09-30
|
||||||
|
**Status:** Production Ready
|
||||||
451
internal/council/manager.go
Normal file
451
internal/council/manager.go
Normal file
@@ -0,0 +1,451 @@
|
|||||||
|
package council
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"crypto/sha256"
|
||||||
|
"encoding/hex"
|
||||||
|
"encoding/json"
|
||||||
|
"errors"
|
||||||
|
"fmt"
|
||||||
|
"hash/fnv"
|
||||||
|
"math/rand"
|
||||||
|
"net/http"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"time"
|
||||||
|
|
||||||
|
"chorus/internal/persona"
|
||||||
|
)
|
||||||
|
|
||||||
|
// CouncilOpportunity represents a council formation opportunity from WHOOSH.
|
||||||
|
type CouncilOpportunity struct {
|
||||||
|
CouncilID string `json:"council_id"`
|
||||||
|
ProjectName string `json:"project_name"`
|
||||||
|
Repository string `json:"repository"`
|
||||||
|
ProjectBrief string `json:"project_brief"`
|
||||||
|
CoreRoles []CouncilRole `json:"core_roles"`
|
||||||
|
OptionalRoles []CouncilRole `json:"optional_roles"`
|
||||||
|
UCXLAddress string `json:"ucxl_address"`
|
||||||
|
FormationDeadline time.Time `json:"formation_deadline"`
|
||||||
|
CreatedAt time.Time `json:"created_at"`
|
||||||
|
Metadata map[string]interface{} `json:"metadata"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// CouncilRole represents a single role available within a council.
|
||||||
|
type CouncilRole struct {
|
||||||
|
RoleName string `json:"role_name"`
|
||||||
|
AgentName string `json:"agent_name"`
|
||||||
|
Required bool `json:"required"`
|
||||||
|
RequiredSkills []string `json:"required_skills"`
|
||||||
|
Description string `json:"description"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// RoleProfile mirrors WHOOSH role profile metadata included in claim responses.
|
||||||
|
type RoleProfile struct {
|
||||||
|
RoleName string `json:"role_name"`
|
||||||
|
DisplayName string `json:"display_name"`
|
||||||
|
PromptKey string `json:"prompt_key"`
|
||||||
|
PromptPack string `json:"prompt_pack"`
|
||||||
|
Capabilities []string `json:"capabilities"`
|
||||||
|
BriefRoutingHint string `json:"brief_routing_hint"`
|
||||||
|
DefaultBriefOwner bool `json:"default_brief_owner"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// CouncilBrief carries the high-level brief metadata for an activated council.
|
||||||
|
type CouncilBrief struct {
|
||||||
|
CouncilID string `json:"council_id"`
|
||||||
|
RoleName string `json:"role_name"`
|
||||||
|
ProjectName string `json:"project_name"`
|
||||||
|
Repository string `json:"repository"`
|
||||||
|
Summary string `json:"summary"`
|
||||||
|
BriefURL string `json:"brief_url"`
|
||||||
|
IssueID *int64 `json:"issue_id"`
|
||||||
|
UCXLAddress string `json:"ucxl_address"`
|
||||||
|
ExpectedArtifacts []string `json:"expected_artifacts"`
|
||||||
|
HMMMTopic string `json:"hmmm_topic"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// RoleAssignment keeps track of the agent's current council engagement.
|
||||||
|
type RoleAssignment struct {
|
||||||
|
CouncilID string
|
||||||
|
RoleName string
|
||||||
|
UCXLAddress string
|
||||||
|
AssignedAt time.Time
|
||||||
|
Profile RoleProfile
|
||||||
|
Brief *CouncilBrief
|
||||||
|
Persona *persona.Persona
|
||||||
|
PersonaHash string
|
||||||
|
}
|
||||||
|
|
||||||
|
var ErrRoleConflict = errors.New("council role already claimed")
|
||||||
|
|
||||||
|
const defaultModelProvider = "ollama"
|
||||||
|
|
||||||
|
// Manager handles council opportunity evaluation, persona preparation, and brief handoff.
|
||||||
|
type Manager struct {
|
||||||
|
agentID string
|
||||||
|
agentName string
|
||||||
|
endpoint string
|
||||||
|
p2pAddr string
|
||||||
|
capabilities []string
|
||||||
|
|
||||||
|
httpClient *http.Client
|
||||||
|
personaLoader *persona.Loader
|
||||||
|
|
||||||
|
mu sync.Mutex
|
||||||
|
currentAssignment *RoleAssignment
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewManager creates a new council manager.
|
||||||
|
func NewManager(agentID, agentName, endpoint, p2pAddr string, capabilities []string) *Manager {
|
||||||
|
loader, err := persona.NewLoader()
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("⚠️ Persona loader initialisation failed: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &Manager{
|
||||||
|
agentID: agentID,
|
||||||
|
agentName: agentName,
|
||||||
|
endpoint: endpoint,
|
||||||
|
p2pAddr: p2pAddr,
|
||||||
|
capabilities: capabilities,
|
||||||
|
httpClient: &http.Client{Timeout: 10 * time.Second},
|
||||||
|
personaLoader: loader,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// AgentID returns the agent's identifier.
|
||||||
|
func (m *Manager) AgentID() string {
|
||||||
|
return m.agentID
|
||||||
|
}
|
||||||
|
|
||||||
|
// EvaluateOpportunity analyzes a council opportunity and decides whether to claim a role.
|
||||||
|
func (m *Manager) EvaluateOpportunity(opportunity *CouncilOpportunity, whooshEndpoint string) error {
|
||||||
|
fmt.Printf("\n🤔 Evaluating council opportunity for: %s\n", opportunity.ProjectName)
|
||||||
|
|
||||||
|
if current := m.currentAssignmentSnapshot(); current != nil {
|
||||||
|
fmt.Printf(" ℹ️ Agent already assigned to council %s as %s; skipping new claims\n", current.CouncilID, current.RoleName)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
const maxAttempts = 10
|
||||||
|
const retryDelay = 3 * time.Second
|
||||||
|
|
||||||
|
var attemptedAtLeastOne bool
|
||||||
|
|
||||||
|
for attempt := 1; attempt <= maxAttempts; attempt++ {
|
||||||
|
assignment, attemptedCore, err := m.tryClaimRoles(opportunity.CoreRoles, opportunity, whooshEndpoint, "CORE")
|
||||||
|
attemptedAtLeastOne = attemptedAtLeastOne || attemptedCore
|
||||||
|
if assignment != nil {
|
||||||
|
m.setCurrentAssignment(assignment)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil && !errors.Is(err, ErrRoleConflict) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
assignment, attemptedOptional, err := m.tryClaimRoles(opportunity.OptionalRoles, opportunity, whooshEndpoint, "OPTIONAL")
|
||||||
|
attemptedAtLeastOne = attemptedAtLeastOne || attemptedOptional
|
||||||
|
if assignment != nil {
|
||||||
|
m.setCurrentAssignment(assignment)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
if err != nil && !errors.Is(err, ErrRoleConflict) {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
if !attemptedAtLeastOne {
|
||||||
|
fmt.Printf(" ✗ No suitable roles found for this agent\n\n")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf(" ↻ Attempt %d did not secure a council role; retrying in %s...\n", attempt, retryDelay)
|
||||||
|
time.Sleep(retryDelay)
|
||||||
|
}
|
||||||
|
|
||||||
|
return fmt.Errorf("exhausted council role claim attempts for council %s", opportunity.CouncilID)
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) tryClaimRoles(roles []CouncilRole, opportunity *CouncilOpportunity, whooshEndpoint string, roleType string) (*RoleAssignment, bool, error) {
|
||||||
|
var attempted bool
|
||||||
|
|
||||||
|
// Shuffle roles deterministically per agent+council to reduce herd on the first role
|
||||||
|
shuffled := append([]CouncilRole(nil), roles...)
|
||||||
|
if len(shuffled) > 1 {
|
||||||
|
h := fnv.New64a()
|
||||||
|
_, _ = h.Write([]byte(m.agentID))
|
||||||
|
_, _ = h.Write([]byte(opportunity.CouncilID))
|
||||||
|
seed := int64(h.Sum64())
|
||||||
|
r := rand.New(rand.NewSource(seed))
|
||||||
|
r.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, role := range shuffled {
|
||||||
|
if !m.shouldClaimRole(role, opportunity) {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
attempted = true
|
||||||
|
fmt.Printf(" ✓ Attempting to claim %s role: %s (%s)\n", roleType, role.AgentName, role.RoleName)
|
||||||
|
|
||||||
|
assignment, err := m.claimRole(opportunity, role, whooshEndpoint)
|
||||||
|
if assignment != nil {
|
||||||
|
return assignment, attempted, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if errors.Is(err, ErrRoleConflict) {
|
||||||
|
fmt.Printf(" ⚠️ Role %s already claimed by another agent, trying next role...\n", role.RoleName)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
if err != nil {
|
||||||
|
return nil, attempted, err
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, attempted, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) shouldClaimRole(role CouncilRole, _ *CouncilOpportunity) bool {
|
||||||
|
if m.hasActiveAssignment() {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
// TODO: implement capability-based selection. For now, opportunistically claim any available role.
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) claimRole(opportunity *CouncilOpportunity, role CouncilRole, whooshEndpoint string) (*RoleAssignment, error) {
|
||||||
|
claimURL := fmt.Sprintf("%s/api/v1/councils/%s/claims", strings.TrimRight(whooshEndpoint, "/"), opportunity.CouncilID)
|
||||||
|
|
||||||
|
claim := map[string]interface{}{
|
||||||
|
"agent_id": m.agentID,
|
||||||
|
"agent_name": m.agentName,
|
||||||
|
"role_name": role.RoleName,
|
||||||
|
"capabilities": m.capabilities,
|
||||||
|
"confidence": 0.75, // TODO: calculate based on capability match quality.
|
||||||
|
"reasoning": fmt.Sprintf("Agent has capabilities matching role: %s", role.RoleName),
|
||||||
|
"endpoint": m.endpoint,
|
||||||
|
"p2p_addr": m.p2pAddr,
|
||||||
|
}
|
||||||
|
|
||||||
|
payload, err := json.Marshal(claim)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to marshal claim: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodPost, claimURL, bytes.NewBuffer(payload))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create claim request: %w", err)
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
resp, err := m.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to send claim: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
|
||||||
|
var errorResp map[string]interface{}
|
||||||
|
_ = json.NewDecoder(resp.Body).Decode(&errorResp)
|
||||||
|
|
||||||
|
if resp.StatusCode == http.StatusConflict {
|
||||||
|
reason := "role already claimed"
|
||||||
|
if msg, ok := errorResp["error"].(string); ok && msg != "" {
|
||||||
|
reason = msg
|
||||||
|
}
|
||||||
|
return nil, fmt.Errorf("%w: %s", ErrRoleConflict, reason)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil, fmt.Errorf("claim rejected (status %d): %v", resp.StatusCode, errorResp)
|
||||||
|
}
|
||||||
|
|
||||||
|
var claimResp roleClaimResponse
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&claimResp); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to decode claim response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
assignment := &RoleAssignment{
|
||||||
|
CouncilID: opportunity.CouncilID,
|
||||||
|
RoleName: role.RoleName,
|
||||||
|
UCXLAddress: claimResp.UCXLAddress,
|
||||||
|
Profile: claimResp.RoleProfile,
|
||||||
|
}
|
||||||
|
|
||||||
|
if t, err := time.Parse(time.RFC3339, claimResp.AssignedAt); err == nil {
|
||||||
|
assignment.AssignedAt = t
|
||||||
|
}
|
||||||
|
|
||||||
|
if claimResp.CouncilBrief != nil {
|
||||||
|
assignment.Brief = claimResp.CouncilBrief
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("\n✅ ROLE CLAIM ACCEPTED!\n")
|
||||||
|
fmt.Printf(" Council ID: %s\n", opportunity.CouncilID)
|
||||||
|
fmt.Printf(" Role: %s (%s)\n", role.AgentName, role.RoleName)
|
||||||
|
fmt.Printf(" UCXL: %s\n", assignment.UCXLAddress)
|
||||||
|
fmt.Printf(" Assigned At: %s\n", claimResp.AssignedAt)
|
||||||
|
|
||||||
|
if err := m.preparePersonaAndAck(opportunity.CouncilID, role.RoleName, &assignment.Profile, claimResp.CouncilBrief, whooshEndpoint, assignment); err != nil {
|
||||||
|
fmt.Printf(" ⚠️ Persona preparation encountered an issue: %v\n", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf("\n")
|
||||||
|
return assignment, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) preparePersonaAndAck(councilID, roleName string, profile *RoleProfile, brief *CouncilBrief, whooshEndpoint string, assignment *RoleAssignment) error {
|
||||||
|
if m.personaLoader == nil {
|
||||||
|
return m.sendPersonaAck(councilID, roleName, whooshEndpoint, nil, "", "failed", []string{"persona loader unavailable"})
|
||||||
|
}
|
||||||
|
|
||||||
|
promptKey := profile.PromptKey
|
||||||
|
if promptKey == "" {
|
||||||
|
promptKey = roleName
|
||||||
|
}
|
||||||
|
|
||||||
|
personaCapabilities := profile.Capabilities
|
||||||
|
personaCapabilities = append([]string{}, personaCapabilities...)
|
||||||
|
|
||||||
|
personaEntry, err := m.personaLoader.Compose(promptKey, profile.DisplayName, "", personaCapabilities)
|
||||||
|
if err != nil {
|
||||||
|
return m.sendPersonaAck(councilID, roleName, whooshEndpoint, nil, "", "failed", []string{err.Error()})
|
||||||
|
}
|
||||||
|
|
||||||
|
hash := sha256.Sum256([]byte(personaEntry.SystemPrompt))
|
||||||
|
personaHash := hex.EncodeToString(hash[:])
|
||||||
|
|
||||||
|
assignment.Persona = personaEntry
|
||||||
|
assignment.PersonaHash = personaHash
|
||||||
|
|
||||||
|
if err := m.sendPersonaAck(councilID, roleName, whooshEndpoint, personaEntry, personaHash, "loaded", nil); err != nil {
|
||||||
|
return err
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) sendPersonaAck(councilID, roleName, whooshEndpoint string, personaEntry *persona.Persona, personaHash string, status string, errs []string) error {
|
||||||
|
ackURL := fmt.Sprintf("%s/api/v1/councils/%s/roles/%s/personas", strings.TrimRight(whooshEndpoint, "/"), councilID, roleName)
|
||||||
|
|
||||||
|
payload := map[string]interface{}{
|
||||||
|
"agent_id": m.agentID,
|
||||||
|
"status": status,
|
||||||
|
"model_provider": defaultModelProvider,
|
||||||
|
"capabilities": m.capabilities,
|
||||||
|
"metadata": map[string]interface{}{
|
||||||
|
"endpoint": m.endpoint,
|
||||||
|
"p2p_addr": m.p2pAddr,
|
||||||
|
"agent_name": m.agentName,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
if personaEntry != nil {
|
||||||
|
payload["system_prompt_hash"] = personaHash
|
||||||
|
payload["model_name"] = personaEntry.Model
|
||||||
|
if len(personaEntry.Capabilities) > 0 {
|
||||||
|
payload["capabilities"] = personaEntry.Capabilities
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(errs) > 0 {
|
||||||
|
payload["errors"] = errs
|
||||||
|
}
|
||||||
|
|
||||||
|
body, err := json.Marshal(payload)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("marshal persona ack: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequest(http.MethodPost, ackURL, bytes.NewBuffer(body))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("create persona ack request: %w", err)
|
||||||
|
}
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
resp, err := m.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("send persona ack: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
|
||||||
|
return fmt.Errorf("persona ack rejected with status %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
fmt.Printf(" 📫 Persona status '%s' acknowledged by WHOOSH\n", status)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// HandleCouncilBrief records the design brief assigned to this agent once WHOOSH dispatches it.
|
||||||
|
func (m *Manager) HandleCouncilBrief(councilID, roleName string, brief *CouncilBrief) {
|
||||||
|
if brief == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
|
||||||
|
if m.currentAssignment == nil {
|
||||||
|
fmt.Printf("⚠️ Received council brief for %s (%s) but agent has no active assignment\n", councilID, roleName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if m.currentAssignment.CouncilID != councilID || !strings.EqualFold(m.currentAssignment.RoleName, roleName) {
|
||||||
|
fmt.Printf("⚠️ Received council brief for %s (%s) but agent is assigned to %s (%s)\n", councilID, roleName, m.currentAssignment.CouncilID, m.currentAssignment.RoleName)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
brief.CouncilID = councilID
|
||||||
|
brief.RoleName = roleName
|
||||||
|
m.currentAssignment.Brief = brief
|
||||||
|
|
||||||
|
fmt.Printf("📦 Design brief received for council %s (%s)\n", councilID, roleName)
|
||||||
|
if brief.BriefURL != "" {
|
||||||
|
fmt.Printf(" Brief URL: %s\n", brief.BriefURL)
|
||||||
|
}
|
||||||
|
if brief.Summary != "" {
|
||||||
|
fmt.Printf(" Summary: %s\n", brief.Summary)
|
||||||
|
}
|
||||||
|
if len(brief.ExpectedArtifacts) > 0 {
|
||||||
|
fmt.Printf(" Expected Artifacts: %v\n", brief.ExpectedArtifacts)
|
||||||
|
}
|
||||||
|
if brief.HMMMTopic != "" {
|
||||||
|
fmt.Printf(" HMMM Topic: %s\n", brief.HMMMTopic)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) hasActiveAssignment() bool {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
return m.currentAssignment != nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) setCurrentAssignment(assignment *RoleAssignment) {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
m.currentAssignment = assignment
|
||||||
|
}
|
||||||
|
|
||||||
|
func (m *Manager) currentAssignmentSnapshot() *RoleAssignment {
|
||||||
|
m.mu.Lock()
|
||||||
|
defer m.mu.Unlock()
|
||||||
|
return m.currentAssignment
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetCurrentAssignment returns the current role assignment (public accessor)
|
||||||
|
func (m *Manager) GetCurrentAssignment() *RoleAssignment {
|
||||||
|
return m.currentAssignmentSnapshot()
|
||||||
|
}
|
||||||
|
|
||||||
|
// roleClaimResponse mirrors WHOOSH role claim response payload.
|
||||||
|
type roleClaimResponse struct {
|
||||||
|
Status string `json:"status"`
|
||||||
|
CouncilID string `json:"council_id"`
|
||||||
|
RoleName string `json:"role_name"`
|
||||||
|
UCXLAddress string `json:"ucxl_address"`
|
||||||
|
AssignedAt string `json:"assigned_at"`
|
||||||
|
RoleProfile RoleProfile `json:"role_profile"`
|
||||||
|
CouncilBrief *CouncilBrief `json:"council_brief"`
|
||||||
|
PersonaStatus string `json:"persona_status"`
|
||||||
|
}
|
||||||
@@ -1,12 +1,18 @@
|
|||||||
package runtime
|
package runtime
|
||||||
|
|
||||||
import (
|
import (
|
||||||
|
"bytes"
|
||||||
"context"
|
"context"
|
||||||
|
"encoding/json"
|
||||||
"fmt"
|
"fmt"
|
||||||
|
"net/http"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"chorus/internal/council"
|
||||||
"chorus/internal/logging"
|
"chorus/internal/logging"
|
||||||
|
"chorus/pkg/ai"
|
||||||
"chorus/pkg/dht"
|
"chorus/pkg/dht"
|
||||||
|
"chorus/pkg/execution"
|
||||||
"chorus/pkg/health"
|
"chorus/pkg/health"
|
||||||
"chorus/pkg/shutdown"
|
"chorus/pkg/shutdown"
|
||||||
"chorus/pubsub"
|
"chorus/pubsub"
|
||||||
@@ -39,6 +45,10 @@ func (r *SharedRuntime) StartAgentMode() error {
|
|||||||
// Start status reporting
|
// Start status reporting
|
||||||
go r.statusReporter()
|
go r.statusReporter()
|
||||||
|
|
||||||
|
// Start council brief processing
|
||||||
|
ctx := context.Background()
|
||||||
|
go r.processBriefs(ctx)
|
||||||
|
|
||||||
r.Logger.Info("🔍 Listening for peers on container network...")
|
r.Logger.Info("🔍 Listening for peers on container network...")
|
||||||
r.Logger.Info("📡 Ready for task coordination and meta-discussion")
|
r.Logger.Info("📡 Ready for task coordination and meta-discussion")
|
||||||
r.Logger.Info("🎯 HMMM collaborative reasoning enabled")
|
r.Logger.Info("🎯 HMMM collaborative reasoning enabled")
|
||||||
@@ -321,3 +331,185 @@ func (r *SharedRuntime) setupGracefulShutdown(shutdownManager *shutdown.Manager,
|
|||||||
|
|
||||||
r.Logger.Info("🛡️ Graceful shutdown components registered")
|
r.Logger.Info("🛡️ Graceful shutdown components registered")
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// processBriefs polls for council briefs and executes them
|
||||||
|
func (r *SharedRuntime) processBriefs(ctx context.Context) {
|
||||||
|
ticker := time.NewTicker(15 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
r.Logger.Info("📦 Brief processing loop started")
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
r.Logger.Info("📦 Brief processing loop stopped")
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
if r.HTTPServer == nil || r.HTTPServer.CouncilManager == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
assignment := r.HTTPServer.CouncilManager.GetCurrentAssignment()
|
||||||
|
if assignment == nil || assignment.Brief == nil {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if we have a brief to execute
|
||||||
|
brief := assignment.Brief
|
||||||
|
if brief.BriefURL == "" && brief.Summary == "" {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Logger.Info("📦 Processing design brief for council %s, role %s", assignment.CouncilID, assignment.RoleName)
|
||||||
|
|
||||||
|
// Execute the brief
|
||||||
|
if err := r.executeBrief(ctx, assignment); err != nil {
|
||||||
|
r.Logger.Error("❌ Failed to execute brief: %v", err)
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Logger.Info("✅ Brief execution completed for council %s", assignment.CouncilID)
|
||||||
|
|
||||||
|
// Clear the brief after execution to prevent re-execution
|
||||||
|
assignment.Brief = nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// executeBrief executes a council brief using the ExecutionEngine
|
||||||
|
func (r *SharedRuntime) executeBrief(ctx context.Context, assignment *council.RoleAssignment) error {
|
||||||
|
brief := assignment.Brief
|
||||||
|
if brief == nil {
|
||||||
|
return fmt.Errorf("no brief to execute")
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create execution engine
|
||||||
|
engine := execution.NewTaskExecutionEngine()
|
||||||
|
|
||||||
|
// Create AI provider factory
|
||||||
|
aiFactory := ai.NewProviderFactory()
|
||||||
|
|
||||||
|
engineConfig := &execution.EngineConfig{
|
||||||
|
AIProviderFactory: aiFactory,
|
||||||
|
MaxConcurrentTasks: 1,
|
||||||
|
DefaultTimeout: time.Hour,
|
||||||
|
EnableMetrics: true,
|
||||||
|
LogLevel: "info",
|
||||||
|
}
|
||||||
|
|
||||||
|
if err := engine.Initialize(ctx, engineConfig); err != nil {
|
||||||
|
return fmt.Errorf("failed to initialize execution engine: %w", err)
|
||||||
|
}
|
||||||
|
defer engine.Shutdown()
|
||||||
|
|
||||||
|
// Build execution request
|
||||||
|
request := r.buildExecutionRequest(assignment)
|
||||||
|
|
||||||
|
r.Logger.Info("🚀 Executing brief for council %s, role %s", assignment.CouncilID, assignment.RoleName)
|
||||||
|
|
||||||
|
// Track task
|
||||||
|
taskID := fmt.Sprintf("council-%s-%s", assignment.CouncilID, assignment.RoleName)
|
||||||
|
r.TaskTracker.AddTask(taskID)
|
||||||
|
defer r.TaskTracker.RemoveTask(taskID)
|
||||||
|
|
||||||
|
// Execute the task
|
||||||
|
result, err := engine.ExecuteTask(ctx, request)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("task execution failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Logger.Info("✅ Task execution successful. Output: %s", result.Output)
|
||||||
|
|
||||||
|
// Upload results to WHOOSH
|
||||||
|
if err := r.uploadResults(assignment, result); err != nil {
|
||||||
|
r.Logger.Error("⚠️ Failed to upload results to WHOOSH: %v", err)
|
||||||
|
// Don't fail the execution if upload fails
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildExecutionRequest converts a council brief to an execution request
|
||||||
|
func (r *SharedRuntime) buildExecutionRequest(assignment *council.RoleAssignment) *execution.TaskExecutionRequest {
|
||||||
|
brief := assignment.Brief
|
||||||
|
|
||||||
|
// Build task description from brief
|
||||||
|
taskDescription := brief.Summary
|
||||||
|
if taskDescription == "" {
|
||||||
|
taskDescription = "Execute council brief"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add additional context
|
||||||
|
additionalContext := map[string]interface{}{
|
||||||
|
"council_id": assignment.CouncilID,
|
||||||
|
"role_name": assignment.RoleName,
|
||||||
|
"brief_url": brief.BriefURL,
|
||||||
|
"expected_artifacts": brief.ExpectedArtifacts,
|
||||||
|
"hmmm_topic": brief.HMMMTopic,
|
||||||
|
"persona": assignment.Persona,
|
||||||
|
}
|
||||||
|
|
||||||
|
return &execution.TaskExecutionRequest{
|
||||||
|
ID: fmt.Sprintf("council-%s-%s", assignment.CouncilID, assignment.RoleName),
|
||||||
|
Type: "council_brief",
|
||||||
|
Description: taskDescription,
|
||||||
|
Context: additionalContext,
|
||||||
|
Requirements: &execution.TaskRequirements{
|
||||||
|
AIModel: r.Config.AI.Provider,
|
||||||
|
SandboxType: "docker",
|
||||||
|
RequiredTools: []string{},
|
||||||
|
},
|
||||||
|
Timeout: time.Hour,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// uploadResults uploads execution results to WHOOSH
|
||||||
|
func (r *SharedRuntime) uploadResults(assignment *council.RoleAssignment, result *execution.TaskExecutionResult) error {
|
||||||
|
// Get WHOOSH endpoint from environment or config
|
||||||
|
whooshEndpoint := r.Config.WHOOSHAPI.BaseURL
|
||||||
|
if whooshEndpoint == "" {
|
||||||
|
whooshEndpoint = "http://whoosh:8080"
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build result payload
|
||||||
|
payload := map[string]interface{}{
|
||||||
|
"council_id": assignment.CouncilID,
|
||||||
|
"role_name": assignment.RoleName,
|
||||||
|
"agent_id": r.Config.Agent.ID,
|
||||||
|
"ucxl_address": assignment.UCXLAddress,
|
||||||
|
"output": result.Output,
|
||||||
|
"artifacts": result.Artifacts,
|
||||||
|
"success": result.Success,
|
||||||
|
"error_message": result.ErrorMessage,
|
||||||
|
"execution_time": result.Metrics.Duration.Seconds(),
|
||||||
|
"timestamp": time.Now().Unix(),
|
||||||
|
}
|
||||||
|
|
||||||
|
jsonData, err := json.Marshal(payload)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to marshal result payload: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Send to WHOOSH
|
||||||
|
url := fmt.Sprintf("%s/api/councils/%s/results", whooshEndpoint, assignment.CouncilID)
|
||||||
|
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to create HTTP request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
|
||||||
|
client := &http.Client{Timeout: 30 * time.Second}
|
||||||
|
resp, err := client.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to send results to WHOOSH: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
|
||||||
|
return fmt.Errorf("WHOOSH returned status %d", resp.StatusCode)
|
||||||
|
}
|
||||||
|
|
||||||
|
r.Logger.Info("📤 Results uploaded to WHOOSH for council %s", assignment.CouncilID)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|||||||
@@ -21,6 +21,7 @@ import (
|
|||||||
"chorus/pkg/dht"
|
"chorus/pkg/dht"
|
||||||
"chorus/pkg/election"
|
"chorus/pkg/election"
|
||||||
"chorus/pkg/health"
|
"chorus/pkg/health"
|
||||||
|
"chorus/pkg/mcp"
|
||||||
"chorus/pkg/metrics"
|
"chorus/pkg/metrics"
|
||||||
"chorus/pkg/prompt"
|
"chorus/pkg/prompt"
|
||||||
"chorus/pkg/shhh"
|
"chorus/pkg/shhh"
|
||||||
@@ -682,5 +683,26 @@ func initializeAIProvider(cfg *config.Config, logger *SimpleLogger) error {
|
|||||||
reasoning.SetDefaultSystemPrompt(d)
|
reasoning.SetDefaultSystemPrompt(d)
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Initialize LightRAG client if enabled
|
||||||
|
if cfg.LightRAG.Enabled {
|
||||||
|
lightragConfig := mcp.LightRAGConfig{
|
||||||
|
BaseURL: cfg.LightRAG.BaseURL,
|
||||||
|
Timeout: cfg.LightRAG.Timeout,
|
||||||
|
APIKey: cfg.LightRAG.APIKey,
|
||||||
|
}
|
||||||
|
lightragClient := mcp.NewLightRAGClient(lightragConfig)
|
||||||
|
|
||||||
|
// Test connectivity
|
||||||
|
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
|
||||||
|
defer cancel()
|
||||||
|
if lightragClient.IsHealthy(ctx) {
|
||||||
|
reasoning.SetLightRAGClient(lightragClient)
|
||||||
|
logger.Info("📚 LightRAG RAG system enabled - Endpoint: %s, Mode: %s",
|
||||||
|
cfg.LightRAG.BaseURL, cfg.LightRAG.DefaultMode)
|
||||||
|
} else {
|
||||||
|
logger.Warn("⚠️ LightRAG enabled but server not healthy at %s", cfg.LightRAG.BaseURL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -24,6 +24,7 @@ type Config struct {
|
|||||||
Slurp SlurpConfig `yaml:"slurp"`
|
Slurp SlurpConfig `yaml:"slurp"`
|
||||||
Security SecurityConfig `yaml:"security"`
|
Security SecurityConfig `yaml:"security"`
|
||||||
WHOOSHAPI WHOOSHAPIConfig `yaml:"whoosh_api"`
|
WHOOSHAPI WHOOSHAPIConfig `yaml:"whoosh_api"`
|
||||||
|
LightRAG LightRAGConfig `yaml:"lightrag"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// AgentConfig defines agent-specific settings
|
// AgentConfig defines agent-specific settings
|
||||||
@@ -161,6 +162,15 @@ type WHOOSHAPIConfig struct {
|
|||||||
Enabled bool `yaml:"enabled"`
|
Enabled bool `yaml:"enabled"`
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// LightRAGConfig defines LightRAG RAG service settings
|
||||||
|
type LightRAGConfig struct {
|
||||||
|
Enabled bool `yaml:"enabled"`
|
||||||
|
BaseURL string `yaml:"base_url"`
|
||||||
|
Timeout time.Duration `yaml:"timeout"`
|
||||||
|
APIKey string `yaml:"api_key"`
|
||||||
|
DefaultMode string `yaml:"default_mode"` // naive, local, global, hybrid
|
||||||
|
}
|
||||||
|
|
||||||
// LoadFromEnvironment loads configuration from environment variables
|
// LoadFromEnvironment loads configuration from environment variables
|
||||||
func LoadFromEnvironment() (*Config, error) {
|
func LoadFromEnvironment() (*Config, error) {
|
||||||
cfg := &Config{
|
cfg := &Config{
|
||||||
@@ -270,6 +280,13 @@ func LoadFromEnvironment() (*Config, error) {
|
|||||||
Token: os.Getenv("WHOOSH_API_TOKEN"),
|
Token: os.Getenv("WHOOSH_API_TOKEN"),
|
||||||
Enabled: getEnvBoolOrDefault("WHOOSH_API_ENABLED", false),
|
Enabled: getEnvBoolOrDefault("WHOOSH_API_ENABLED", false),
|
||||||
},
|
},
|
||||||
|
LightRAG: LightRAGConfig{
|
||||||
|
Enabled: getEnvBoolOrDefault("CHORUS_LIGHTRAG_ENABLED", false),
|
||||||
|
BaseURL: getEnvOrDefault("CHORUS_LIGHTRAG_BASE_URL", "http://127.0.0.1:9621"),
|
||||||
|
Timeout: getEnvDurationOrDefault("CHORUS_LIGHTRAG_TIMEOUT", 30*time.Second),
|
||||||
|
APIKey: os.Getenv("CHORUS_LIGHTRAG_API_KEY"),
|
||||||
|
DefaultMode: getEnvOrDefault("CHORUS_LIGHTRAG_DEFAULT_MODE", "hybrid"),
|
||||||
|
},
|
||||||
}
|
}
|
||||||
|
|
||||||
// Validate required configuration
|
// Validate required configuration
|
||||||
|
|||||||
265
pkg/mcp/lightrag_client.go
Normal file
265
pkg/mcp/lightrag_client.go
Normal file
@@ -0,0 +1,265 @@
|
|||||||
|
package mcp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"bytes"
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// LightRAGClient provides access to LightRAG MCP server
|
||||||
|
type LightRAGClient struct {
|
||||||
|
baseURL string
|
||||||
|
httpClient *http.Client
|
||||||
|
apiKey string // Optional API key for authentication
|
||||||
|
}
|
||||||
|
|
||||||
|
// LightRAGConfig holds configuration for LightRAG client
|
||||||
|
type LightRAGConfig struct {
|
||||||
|
BaseURL string // e.g., "http://127.0.0.1:9621"
|
||||||
|
Timeout time.Duration // HTTP timeout
|
||||||
|
APIKey string // Optional API key
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryMode represents LightRAG query modes
|
||||||
|
type QueryMode string
|
||||||
|
|
||||||
|
const (
|
||||||
|
QueryModeNaive QueryMode = "naive" // Simple semantic search
|
||||||
|
QueryModeLocal QueryMode = "local" // Local graph traversal
|
||||||
|
QueryModeGlobal QueryMode = "global" // Global graph analysis
|
||||||
|
QueryModeHybrid QueryMode = "hybrid" // Combined approach
|
||||||
|
)
|
||||||
|
|
||||||
|
// QueryRequest represents a LightRAG query request
|
||||||
|
type QueryRequest struct {
|
||||||
|
Query string `json:"query"`
|
||||||
|
Mode QueryMode `json:"mode"`
|
||||||
|
OnlyNeedContext bool `json:"only_need_context,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryResponse represents a LightRAG query response
|
||||||
|
type QueryResponse struct {
|
||||||
|
Response string `json:"response"`
|
||||||
|
Context string `json:"context,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// InsertRequest represents a LightRAG document insertion request
|
||||||
|
type InsertRequest struct {
|
||||||
|
Text string `json:"text"`
|
||||||
|
Description string `json:"description,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// InsertResponse represents a LightRAG insertion response
|
||||||
|
type InsertResponse struct {
|
||||||
|
Success bool `json:"success"`
|
||||||
|
Message string `json:"message"`
|
||||||
|
Status string `json:"status"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// HealthResponse represents LightRAG health check response
|
||||||
|
type HealthResponse struct {
|
||||||
|
Status string `json:"status"`
|
||||||
|
WorkingDirectory string `json:"working_directory"`
|
||||||
|
InputDirectory string `json:"input_directory"`
|
||||||
|
Configuration map[string]interface{} `json:"configuration"`
|
||||||
|
AuthMode string `json:"auth_mode"`
|
||||||
|
PipelineBusy bool `json:"pipeline_busy"`
|
||||||
|
KeyedLocks map[string]interface{} `json:"keyed_locks"`
|
||||||
|
CoreVersion string `json:"core_version"`
|
||||||
|
APIVersion string `json:"api_version"`
|
||||||
|
WebUITitle string `json:"webui_title"`
|
||||||
|
WebUIDescription string `json:"webui_description"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLightRAGClient creates a new LightRAG MCP client
|
||||||
|
func NewLightRAGClient(config LightRAGConfig) *LightRAGClient {
|
||||||
|
if config.Timeout == 0 {
|
||||||
|
config.Timeout = 30 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
return &LightRAGClient{
|
||||||
|
baseURL: config.BaseURL,
|
||||||
|
httpClient: &http.Client{
|
||||||
|
Timeout: config.Timeout,
|
||||||
|
},
|
||||||
|
apiKey: config.APIKey,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query performs a RAG query against LightRAG
|
||||||
|
func (c *LightRAGClient) Query(ctx context.Context, query string, mode QueryMode) (*QueryResponse, error) {
|
||||||
|
req := QueryRequest{
|
||||||
|
Query: query,
|
||||||
|
Mode: mode,
|
||||||
|
}
|
||||||
|
|
||||||
|
respData, err := c.post(ctx, "/query", req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("query failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var response QueryResponse
|
||||||
|
if err := json.Unmarshal(respData, &response); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// QueryWithContext performs a RAG query and returns both response and context
|
||||||
|
func (c *LightRAGClient) QueryWithContext(ctx context.Context, query string, mode QueryMode) (*QueryResponse, error) {
|
||||||
|
req := QueryRequest{
|
||||||
|
Query: query,
|
||||||
|
Mode: mode,
|
||||||
|
OnlyNeedContext: false, // Get both response and context
|
||||||
|
}
|
||||||
|
|
||||||
|
respData, err := c.post(ctx, "/query", req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("query with context failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var response QueryResponse
|
||||||
|
if err := json.Unmarshal(respData, &response); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetContext retrieves context without generating a response
|
||||||
|
func (c *LightRAGClient) GetContext(ctx context.Context, query string, mode QueryMode) (string, error) {
|
||||||
|
req := QueryRequest{
|
||||||
|
Query: query,
|
||||||
|
Mode: mode,
|
||||||
|
OnlyNeedContext: true,
|
||||||
|
}
|
||||||
|
|
||||||
|
respData, err := c.post(ctx, "/query", req)
|
||||||
|
if err != nil {
|
||||||
|
return "", fmt.Errorf("get context failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var response QueryResponse
|
||||||
|
if err := json.Unmarshal(respData, &response); err != nil {
|
||||||
|
return "", fmt.Errorf("failed to parse response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return response.Context, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Insert adds a document to the LightRAG knowledge base
|
||||||
|
func (c *LightRAGClient) Insert(ctx context.Context, text, description string) error {
|
||||||
|
req := InsertRequest{
|
||||||
|
Text: text,
|
||||||
|
Description: description,
|
||||||
|
}
|
||||||
|
|
||||||
|
respData, err := c.post(ctx, "/insert", req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("insert failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var response InsertResponse
|
||||||
|
if err := json.Unmarshal(respData, &response); err != nil {
|
||||||
|
return fmt.Errorf("failed to parse insert response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if !response.Success {
|
||||||
|
return fmt.Errorf("insert failed: %s", response.Message)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Health checks the health of the LightRAG server
|
||||||
|
func (c *LightRAGClient) Health(ctx context.Context) (*HealthResponse, error) {
|
||||||
|
respData, err := c.get(ctx, "/health")
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("health check failed: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
var response HealthResponse
|
||||||
|
if err := json.Unmarshal(respData, &response); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to parse health response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &response, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsHealthy checks if LightRAG server is healthy
|
||||||
|
func (c *LightRAGClient) IsHealthy(ctx context.Context) bool {
|
||||||
|
health, err := c.Health(ctx)
|
||||||
|
if err != nil {
|
||||||
|
return false
|
||||||
|
}
|
||||||
|
return health.Status == "healthy"
|
||||||
|
}
|
||||||
|
|
||||||
|
// post performs an HTTP POST request
|
||||||
|
func (c *LightRAGClient) post(ctx context.Context, endpoint string, body interface{}) ([]byte, error) {
|
||||||
|
jsonData, err := json.Marshal(body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to marshal request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "POST", c.baseURL+endpoint, bytes.NewBuffer(jsonData))
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
req.Header.Set("Content-Type", "application/json")
|
||||||
|
if c.apiKey != "" {
|
||||||
|
req.Header.Set("Authorization", "Bearer "+c.apiKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("request failed: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
respData, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to read response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("request failed with status %d: %s", resp.StatusCode, string(respData))
|
||||||
|
}
|
||||||
|
|
||||||
|
return respData, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// get performs an HTTP GET request
|
||||||
|
func (c *LightRAGClient) get(ctx context.Context, endpoint string) ([]byte, error) {
|
||||||
|
req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL+endpoint, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if c.apiKey != "" {
|
||||||
|
req.Header.Set("Authorization", "Bearer "+c.apiKey)
|
||||||
|
}
|
||||||
|
|
||||||
|
resp, err := c.httpClient.Do(req)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("request failed: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
respData, err := io.ReadAll(resp.Body)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to read response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
return nil, fmt.Errorf("request failed with status %d: %s", resp.StatusCode, string(respData))
|
||||||
|
}
|
||||||
|
|
||||||
|
return respData, nil
|
||||||
|
}
|
||||||
243
pkg/mcp/lightrag_client_test.go
Normal file
243
pkg/mcp/lightrag_client_test.go
Normal file
@@ -0,0 +1,243 @@
|
|||||||
|
package mcp
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"testing"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// TestLightRAGClient_NewClient tests client creation
|
||||||
|
func TestLightRAGClient_NewClient(t *testing.T) {
|
||||||
|
config := LightRAGConfig{
|
||||||
|
BaseURL: "http://127.0.0.1:9621",
|
||||||
|
Timeout: 10 * time.Second,
|
||||||
|
APIKey: "",
|
||||||
|
}
|
||||||
|
|
||||||
|
client := NewLightRAGClient(config)
|
||||||
|
if client == nil {
|
||||||
|
t.Fatal("expected non-nil client")
|
||||||
|
}
|
||||||
|
|
||||||
|
if client.baseURL != config.BaseURL {
|
||||||
|
t.Errorf("expected baseURL %s, got %s", config.BaseURL, client.baseURL)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestLightRAGClient_Health tests health check
|
||||||
|
// NOTE: This test requires a running LightRAG server at 127.0.0.1:9621
|
||||||
|
func TestLightRAGClient_Health(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
config := LightRAGConfig{
|
||||||
|
BaseURL: "http://127.0.0.1:9621",
|
||||||
|
Timeout: 5 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
client := NewLightRAGClient(config)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
health, err := client.Health(ctx)
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("Health check failed (server may not be running): %v", err)
|
||||||
|
t.Skip("skipping test - lightrag server not available")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
if health.Status != "healthy" {
|
||||||
|
t.Errorf("expected status 'healthy', got '%s'", health.Status)
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("LightRAG Health: %s", health.Status)
|
||||||
|
t.Logf("Core Version: %s", health.CoreVersion)
|
||||||
|
t.Logf("API Version: %s", health.APIVersion)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestLightRAGClient_IsHealthy tests the convenience health check
|
||||||
|
func TestLightRAGClient_IsHealthy(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
config := LightRAGConfig{
|
||||||
|
BaseURL: "http://127.0.0.1:9621",
|
||||||
|
Timeout: 5 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
client := NewLightRAGClient(config)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
healthy := client.IsHealthy(ctx)
|
||||||
|
if !healthy {
|
||||||
|
t.Log("Server not healthy (may not be running)")
|
||||||
|
t.Skip("skipping test - lightrag server not available")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestLightRAGClient_Query tests querying with different modes
|
||||||
|
func TestLightRAGClient_Query(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
config := LightRAGConfig{
|
||||||
|
BaseURL: "http://127.0.0.1:9621",
|
||||||
|
Timeout: 30 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
client := NewLightRAGClient(config)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
// First check if server is available
|
||||||
|
if !client.IsHealthy(ctx) {
|
||||||
|
t.Skip("skipping test - lightrag server not available")
|
||||||
|
}
|
||||||
|
|
||||||
|
testCases := []struct {
|
||||||
|
name string
|
||||||
|
query string
|
||||||
|
mode QueryMode
|
||||||
|
}{
|
||||||
|
{
|
||||||
|
name: "naive mode",
|
||||||
|
query: "What is CHORUS?",
|
||||||
|
mode: QueryModeNaive,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "local mode",
|
||||||
|
query: "How does P2P networking work?",
|
||||||
|
mode: QueryModeLocal,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "global mode",
|
||||||
|
query: "What are the main components?",
|
||||||
|
mode: QueryModeGlobal,
|
||||||
|
},
|
||||||
|
{
|
||||||
|
name: "hybrid mode",
|
||||||
|
query: "Explain the architecture",
|
||||||
|
mode: QueryModeHybrid,
|
||||||
|
},
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, tc := range testCases {
|
||||||
|
t.Run(tc.name, func(t *testing.T) {
|
||||||
|
response, err := client.Query(ctx, tc.query, tc.mode)
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("Query failed: %v", err)
|
||||||
|
return // Non-fatal - may just have empty knowledge base
|
||||||
|
}
|
||||||
|
|
||||||
|
if response == nil {
|
||||||
|
t.Error("expected non-nil response")
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("Query: %s", tc.query)
|
||||||
|
t.Logf("Mode: %s", tc.mode)
|
||||||
|
t.Logf("Response length: %d chars", len(response.Response))
|
||||||
|
})
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestLightRAGClient_GetContext tests context retrieval
|
||||||
|
func TestLightRAGClient_GetContext(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
config := LightRAGConfig{
|
||||||
|
BaseURL: "http://127.0.0.1:9621",
|
||||||
|
Timeout: 30 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
client := NewLightRAGClient(config)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
if !client.IsHealthy(ctx) {
|
||||||
|
t.Skip("skipping test - lightrag server not available")
|
||||||
|
}
|
||||||
|
|
||||||
|
context, err := client.GetContext(ctx, "distributed systems", QueryModeHybrid)
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("GetContext failed: %v", err)
|
||||||
|
return // Non-fatal
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("Context length: %d chars", len(context))
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestLightRAGClient_Insert tests document insertion
|
||||||
|
func TestLightRAGClient_Insert(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
config := LightRAGConfig{
|
||||||
|
BaseURL: "http://127.0.0.1:9621",
|
||||||
|
Timeout: 30 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
client := NewLightRAGClient(config)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
if !client.IsHealthy(ctx) {
|
||||||
|
t.Skip("skipping test - lightrag server not available")
|
||||||
|
}
|
||||||
|
|
||||||
|
text := `CHORUS is a distributed task coordination system built on P2P networking.
|
||||||
|
It uses libp2p for peer-to-peer communication and implements democratic leader election.
|
||||||
|
Tasks are executed in Docker sandboxes for security and isolation.`
|
||||||
|
|
||||||
|
description := "CHORUS system overview"
|
||||||
|
|
||||||
|
err := client.Insert(ctx, text, description)
|
||||||
|
if err != nil {
|
||||||
|
t.Errorf("Insert failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Log("Document inserted successfully")
|
||||||
|
|
||||||
|
// Give time for indexing
|
||||||
|
time.Sleep(2 * time.Second)
|
||||||
|
|
||||||
|
// Try to query the inserted document
|
||||||
|
response, err := client.Query(ctx, "What is CHORUS?", QueryModeHybrid)
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("Query after insert failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("Query response after insert: %s", response.Response)
|
||||||
|
}
|
||||||
|
|
||||||
|
// TestLightRAGClient_QueryWithContext tests retrieving both response and context
|
||||||
|
func TestLightRAGClient_QueryWithContext(t *testing.T) {
|
||||||
|
if testing.Short() {
|
||||||
|
t.Skip("skipping integration test in short mode")
|
||||||
|
}
|
||||||
|
|
||||||
|
config := LightRAGConfig{
|
||||||
|
BaseURL: "http://127.0.0.1:9621",
|
||||||
|
Timeout: 30 * time.Second,
|
||||||
|
}
|
||||||
|
|
||||||
|
client := NewLightRAGClient(config)
|
||||||
|
ctx := context.Background()
|
||||||
|
|
||||||
|
if !client.IsHealthy(ctx) {
|
||||||
|
t.Skip("skipping test - lightrag server not available")
|
||||||
|
}
|
||||||
|
|
||||||
|
response, err := client.QueryWithContext(ctx, "distributed coordination", QueryModeHybrid)
|
||||||
|
if err != nil {
|
||||||
|
t.Logf("QueryWithContext failed: %v", err)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
t.Logf("Response: %s", response.Response)
|
||||||
|
t.Logf("Context: %s", response.Context)
|
||||||
|
}
|
||||||
@@ -102,6 +102,7 @@ const (
|
|||||||
StatusCollaborating AgentStatus = "collaborating"
|
StatusCollaborating AgentStatus = "collaborating"
|
||||||
StatusEscalating AgentStatus = "escalating"
|
StatusEscalating AgentStatus = "escalating"
|
||||||
StatusTerminating AgentStatus = "terminating"
|
StatusTerminating AgentStatus = "terminating"
|
||||||
|
StatusOffline AgentStatus = "offline"
|
||||||
)
|
)
|
||||||
|
|
||||||
// AgentTask represents a task being worked on by an agent
|
// AgentTask represents a task being worked on by an agent
|
||||||
@@ -427,7 +428,7 @@ func (s *McpServer) processMCPMessage(message map[string]interface{}) (map[strin
|
|||||||
case "tools/call":
|
case "tools/call":
|
||||||
return s.callTool(params)
|
return s.callTool(params)
|
||||||
case "resources/list":
|
case "resources/list":
|
||||||
return s.listResources(), nil
|
return s.listResources()
|
||||||
case "resources/read":
|
case "resources/read":
|
||||||
return s.readResource(params)
|
return s.readResource(params)
|
||||||
default:
|
default:
|
||||||
@@ -626,3 +627,346 @@ type Relation struct {
|
|||||||
Strength float64
|
Strength float64
|
||||||
Evidence []string
|
Evidence []string
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// REST API handlers
|
||||||
|
|
||||||
|
func (s *McpServer) handleAgentsAPI(w http.ResponseWriter, r *http.Request) {
|
||||||
|
s.agentsMutex.RLock()
|
||||||
|
defer s.agentsMutex.RUnlock()
|
||||||
|
|
||||||
|
agents := make([]map[string]interface{}, 0, len(s.agents))
|
||||||
|
for _, agent := range s.agents {
|
||||||
|
agent.mutex.RLock()
|
||||||
|
agents = append(agents, map[string]interface{}{
|
||||||
|
"id": agent.ID,
|
||||||
|
"role": agent.Role,
|
||||||
|
"status": agent.Status,
|
||||||
|
"specialization": agent.Specialization,
|
||||||
|
"capabilities": agent.Capabilities,
|
||||||
|
"current_tasks": len(agent.CurrentTasks),
|
||||||
|
"max_tasks": agent.MaxTasks,
|
||||||
|
})
|
||||||
|
agent.mutex.RUnlock()
|
||||||
|
}
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||||
|
"agents": agents,
|
||||||
|
"total": len(agents),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *McpServer) handleConversationsAPI(w http.ResponseWriter, r *http.Request) {
|
||||||
|
// Collect all active conversation threads from agents
|
||||||
|
conversations := make([]map[string]interface{}, 0)
|
||||||
|
|
||||||
|
s.agentsMutex.RLock()
|
||||||
|
for _, agent := range s.agents {
|
||||||
|
agent.mutex.RLock()
|
||||||
|
for threadID, thread := range agent.ActiveThreads {
|
||||||
|
conversations = append(conversations, map[string]interface{}{
|
||||||
|
"id": threadID,
|
||||||
|
"topic": thread.Topic,
|
||||||
|
"state": thread.State,
|
||||||
|
"participants": len(thread.Participants),
|
||||||
|
"created_at": thread.CreatedAt,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
agent.mutex.RUnlock()
|
||||||
|
}
|
||||||
|
s.agentsMutex.RUnlock()
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||||
|
"conversations": conversations,
|
||||||
|
"total": len(conversations),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *McpServer) handleStatsAPI(w http.ResponseWriter, r *http.Request) {
|
||||||
|
s.stats.mutex.RLock()
|
||||||
|
defer s.stats.mutex.RUnlock()
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||||
|
"start_time": s.stats.StartTime,
|
||||||
|
"uptime_seconds": time.Since(s.stats.StartTime).Seconds(),
|
||||||
|
"total_requests": s.stats.TotalRequests,
|
||||||
|
"active_agents": s.stats.ActiveAgents,
|
||||||
|
"messages_processed": s.stats.MessagesProcessed,
|
||||||
|
"tokens_consumed": s.stats.TokensConsumed,
|
||||||
|
"average_cost_per_task": s.stats.AverageCostPerTask,
|
||||||
|
"error_rate": s.stats.ErrorRate,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *McpServer) handleHealthCheck(w http.ResponseWriter, r *http.Request) {
|
||||||
|
s.agentsMutex.RLock()
|
||||||
|
agentCount := len(s.agents)
|
||||||
|
s.agentsMutex.RUnlock()
|
||||||
|
|
||||||
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
w.WriteHeader(http.StatusOK)
|
||||||
|
json.NewEncoder(w).Encode(map[string]interface{}{
|
||||||
|
"status": "healthy",
|
||||||
|
"active_agents": agentCount,
|
||||||
|
"uptime": time.Since(s.stats.StartTime).String(),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
// Message handlers
|
||||||
|
|
||||||
|
func (s *McpServer) handleBzzzMessages() {
|
||||||
|
// Subscribe to BZZZ messages via pubsub
|
||||||
|
if s.pubsub == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Listen for BZZZ coordination messages
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
// Process BZZZ messages from pubsub
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *McpServer) handleHmmmMessages() {
|
||||||
|
// Subscribe to HMMM messages via pubsub
|
||||||
|
if s.pubsub == nil {
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// Listen for HMMM discussion messages
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
default:
|
||||||
|
// Process HMMM messages from pubsub
|
||||||
|
time.Sleep(1 * time.Second)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *McpServer) periodicTasks() {
|
||||||
|
ticker := time.NewTicker(30 * time.Second)
|
||||||
|
defer ticker.Stop()
|
||||||
|
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-s.ctx.Done():
|
||||||
|
return
|
||||||
|
case <-ticker.C:
|
||||||
|
// Update agent statistics
|
||||||
|
s.agentsMutex.RLock()
|
||||||
|
s.stats.mutex.Lock()
|
||||||
|
s.stats.ActiveAgents = len(s.agents)
|
||||||
|
s.stats.mutex.Unlock()
|
||||||
|
s.agentsMutex.RUnlock()
|
||||||
|
|
||||||
|
// Re-announce agents periodically
|
||||||
|
s.agentsMutex.RLock()
|
||||||
|
for _, agent := range s.agents {
|
||||||
|
if time.Since(agent.LastAnnouncement) > 5*time.Minute {
|
||||||
|
s.announceAgent(agent)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
s.agentsMutex.RUnlock()
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Agent management
|
||||||
|
|
||||||
|
func (s *McpServer) stopAgent(agent *GPTAgent) {
|
||||||
|
agent.mutex.Lock()
|
||||||
|
defer agent.mutex.Unlock()
|
||||||
|
|
||||||
|
// Update status
|
||||||
|
agent.Status = StatusOffline
|
||||||
|
|
||||||
|
// Clean up active tasks
|
||||||
|
for taskID := range agent.CurrentTasks {
|
||||||
|
delete(agent.CurrentTasks, taskID)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Clean up active threads
|
||||||
|
for threadID := range agent.ActiveThreads {
|
||||||
|
delete(agent.ActiveThreads, threadID)
|
||||||
|
}
|
||||||
|
|
||||||
|
s.hlog.Append(logging.PeerLeft, map[string]interface{}{
|
||||||
|
"agent_id": agent.ID,
|
||||||
|
"role": string(agent.Role),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *McpServer) initiateCollaboration(thread *ConversationThread) error {
|
||||||
|
// Send collaboration invitation to all participants
|
||||||
|
for _, participant := range thread.Participants {
|
||||||
|
s.agentsMutex.RLock()
|
||||||
|
agent, exists := s.agents[participant.AgentID]
|
||||||
|
s.agentsMutex.RUnlock()
|
||||||
|
|
||||||
|
if !exists {
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
|
||||||
|
// Update participant status
|
||||||
|
agent.mutex.Lock()
|
||||||
|
participant.Status = ParticipantStatusActive
|
||||||
|
agent.mutex.Unlock()
|
||||||
|
|
||||||
|
// Log collaboration start
|
||||||
|
s.hlog.Append(logging.Collaboration, map[string]interface{}{
|
||||||
|
"event": "collaboration_started",
|
||||||
|
"thread_id": thread.ID,
|
||||||
|
"agent_id": agent.ID,
|
||||||
|
"role": string(agent.Role),
|
||||||
|
})
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// MCP tool listing
|
||||||
|
|
||||||
|
func (s *McpServer) listTools() map[string]interface{} {
|
||||||
|
return map[string]interface{}{
|
||||||
|
"tools": []map[string]interface{}{
|
||||||
|
{
|
||||||
|
"name": "chorus_announce",
|
||||||
|
"description": "Announce agent availability to CHORUS network",
|
||||||
|
"parameters": map[string]interface{}{
|
||||||
|
"agent_id": "string",
|
||||||
|
"capabilities": "array",
|
||||||
|
"specialization": "string",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "chorus_lookup",
|
||||||
|
"description": "Look up available agents by capability or role",
|
||||||
|
"parameters": map[string]interface{}{
|
||||||
|
"capability": "string",
|
||||||
|
"role": "string",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "chorus_get",
|
||||||
|
"description": "Retrieve context or data from CHORUS DHT",
|
||||||
|
"parameters": map[string]interface{}{
|
||||||
|
"key": "string",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "chorus_store",
|
||||||
|
"description": "Store data in CHORUS DHT",
|
||||||
|
"parameters": map[string]interface{}{
|
||||||
|
"key": "string",
|
||||||
|
"value": "string",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"name": "chorus_collaborate",
|
||||||
|
"description": "Request multi-agent collaboration on a task",
|
||||||
|
"parameters": map[string]interface{}{
|
||||||
|
"task": "object",
|
||||||
|
"required_roles": "array",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// MCP resource handling
|
||||||
|
|
||||||
|
func (s *McpServer) listResources() (map[string]interface{}, error) {
|
||||||
|
return map[string]interface{}{
|
||||||
|
"resources": []map[string]interface{}{
|
||||||
|
{
|
||||||
|
"uri": "chorus://agents",
|
||||||
|
"name": "Available Agents",
|
||||||
|
"description": "List of all available CHORUS agents",
|
||||||
|
"mimeType": "application/json",
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"uri": "chorus://dht",
|
||||||
|
"name": "DHT Storage",
|
||||||
|
"description": "Access to distributed hash table storage",
|
||||||
|
"mimeType": "application/json",
|
||||||
|
},
|
||||||
|
},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *McpServer) readResource(params map[string]interface{}) (map[string]interface{}, error) {
|
||||||
|
uri, ok := params["uri"].(string)
|
||||||
|
if !ok {
|
||||||
|
return nil, fmt.Errorf("missing uri parameter")
|
||||||
|
}
|
||||||
|
|
||||||
|
switch uri {
|
||||||
|
case "chorus://agents":
|
||||||
|
s.agentsMutex.RLock()
|
||||||
|
defer s.agentsMutex.RUnlock()
|
||||||
|
|
||||||
|
agents := make([]map[string]interface{}, 0, len(s.agents))
|
||||||
|
for _, agent := range s.agents {
|
||||||
|
agents = append(agents, map[string]interface{}{
|
||||||
|
"id": agent.ID,
|
||||||
|
"role": agent.Role,
|
||||||
|
"status": agent.Status,
|
||||||
|
})
|
||||||
|
}
|
||||||
|
return map[string]interface{}{"agents": agents}, nil
|
||||||
|
|
||||||
|
case "chorus://dht":
|
||||||
|
return map[string]interface{}{"message": "DHT access not implemented"}, nil
|
||||||
|
|
||||||
|
default:
|
||||||
|
return nil, fmt.Errorf("unknown resource: %s", uri)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BZZZ tool handlers
|
||||||
|
|
||||||
|
func (s *McpServer) handleBzzzLookup(params map[string]interface{}) (map[string]interface{}, error) {
|
||||||
|
// Stub: Lookup agents or resources via BZZZ
|
||||||
|
return map[string]interface{}{
|
||||||
|
"results": []interface{}{},
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *McpServer) handleBzzzGet(params map[string]interface{}) (map[string]interface{}, error) {
|
||||||
|
// Stub: Get data from BZZZ system
|
||||||
|
return map[string]interface{}{
|
||||||
|
"data": nil,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *McpServer) handleBzzzPost(params map[string]interface{}) (map[string]interface{}, error) {
|
||||||
|
// Stub: Post data to BZZZ system
|
||||||
|
return map[string]interface{}{
|
||||||
|
"success": false,
|
||||||
|
"message": "not implemented",
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *McpServer) handleBzzzThread(params map[string]interface{}) (map[string]interface{}, error) {
|
||||||
|
// Stub: Handle BZZZ thread operations
|
||||||
|
return map[string]interface{}{
|
||||||
|
"thread": nil,
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (s *McpServer) handleBzzzSubscribe(params map[string]interface{}) (map[string]interface{}, error) {
|
||||||
|
// Stub: Subscribe to BZZZ events
|
||||||
|
return map[string]interface{}{
|
||||||
|
"subscribed": false,
|
||||||
|
"message": "not implemented",
|
||||||
|
}, nil
|
||||||
|
}
|
||||||
218
pkg/slurp/context/lightrag.go
Normal file
218
pkg/slurp/context/lightrag.go
Normal file
@@ -0,0 +1,218 @@
|
|||||||
|
package context
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"fmt"
|
||||||
|
"strings"
|
||||||
|
|
||||||
|
"chorus/pkg/mcp"
|
||||||
|
"chorus/pkg/ucxl"
|
||||||
|
)
|
||||||
|
|
||||||
|
// LightRAGEnricher enriches context nodes with RAG-retrieved information
|
||||||
|
type LightRAGEnricher struct {
|
||||||
|
client *mcp.LightRAGClient
|
||||||
|
defaultMode mcp.QueryMode
|
||||||
|
enabled bool
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewLightRAGEnricher creates a new LightRAG context enricher
|
||||||
|
func NewLightRAGEnricher(client *mcp.LightRAGClient, defaultMode string) *LightRAGEnricher {
|
||||||
|
if client == nil {
|
||||||
|
return &LightRAGEnricher{enabled: false}
|
||||||
|
}
|
||||||
|
|
||||||
|
mode := mcp.QueryModeHybrid // Default to hybrid
|
||||||
|
switch defaultMode {
|
||||||
|
case "naive":
|
||||||
|
mode = mcp.QueryModeNaive
|
||||||
|
case "local":
|
||||||
|
mode = mcp.QueryModeLocal
|
||||||
|
case "global":
|
||||||
|
mode = mcp.QueryModeGlobal
|
||||||
|
case "hybrid":
|
||||||
|
mode = mcp.QueryModeHybrid
|
||||||
|
}
|
||||||
|
|
||||||
|
return &LightRAGEnricher{
|
||||||
|
client: client,
|
||||||
|
defaultMode: mode,
|
||||||
|
enabled: true,
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnrichContextNode enriches a ContextNode with LightRAG data
|
||||||
|
// This queries LightRAG for relevant information and adds it to the node's insights
|
||||||
|
func (e *LightRAGEnricher) EnrichContextNode(ctx context.Context, node *ContextNode) error {
|
||||||
|
if !e.enabled || e.client == nil {
|
||||||
|
return nil // No-op if not enabled
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build query from node information
|
||||||
|
query := e.buildQuery(node)
|
||||||
|
if query == "" {
|
||||||
|
return nil // Nothing to query
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query LightRAG for context
|
||||||
|
ragContext, err := e.client.GetContext(ctx, query, e.defaultMode)
|
||||||
|
if err != nil {
|
||||||
|
// Non-fatal - just log and continue
|
||||||
|
return fmt.Errorf("lightrag query failed (non-fatal): %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add RAG context to insights if we got meaningful data
|
||||||
|
if strings.TrimSpace(ragContext) != "" {
|
||||||
|
insight := fmt.Sprintf("RAG Context: %s", strings.TrimSpace(ragContext))
|
||||||
|
node.Insights = append(node.Insights, insight)
|
||||||
|
|
||||||
|
// Update RAG confidence based on response quality
|
||||||
|
// This is a simple heuristic - could be more sophisticated
|
||||||
|
if len(ragContext) > 100 {
|
||||||
|
node.RAGConfidence = 0.8
|
||||||
|
} else if len(ragContext) > 50 {
|
||||||
|
node.RAGConfidence = 0.6
|
||||||
|
} else {
|
||||||
|
node.RAGConfidence = 0.4
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnrichResolvedContext enriches a ResolvedContext with LightRAG data
|
||||||
|
// This is called after context resolution to add additional RAG-retrieved insights
|
||||||
|
func (e *LightRAGEnricher) EnrichResolvedContext(ctx context.Context, resolved *ResolvedContext) error {
|
||||||
|
if !e.enabled || e.client == nil {
|
||||||
|
return nil // No-op if not enabled
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build query from resolved context
|
||||||
|
query := fmt.Sprintf("Purpose: %s\nSummary: %s\nTechnologies: %s",
|
||||||
|
resolved.Purpose,
|
||||||
|
resolved.Summary,
|
||||||
|
strings.Join(resolved.Technologies, ", "))
|
||||||
|
|
||||||
|
// Query LightRAG
|
||||||
|
ragContext, err := e.client.GetContext(ctx, query, e.defaultMode)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("lightrag query failed (non-fatal): %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Add to insights if meaningful
|
||||||
|
if strings.TrimSpace(ragContext) != "" {
|
||||||
|
insight := fmt.Sprintf("RAG Enhancement: %s", strings.TrimSpace(ragContext))
|
||||||
|
resolved.Insights = append(resolved.Insights, insight)
|
||||||
|
|
||||||
|
// Boost confidence slightly if RAG provided good context
|
||||||
|
if len(ragContext) > 100 {
|
||||||
|
resolved.ResolutionConfidence = min(1.0, resolved.ResolutionConfidence*1.1)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// EnrichBatchResolution enriches a batch resolution with LightRAG data
|
||||||
|
// Efficiently processes multiple addresses by batching queries where possible
|
||||||
|
func (e *LightRAGEnricher) EnrichBatchResolution(ctx context.Context, batch *BatchResolutionResult) error {
|
||||||
|
if !e.enabled || e.client == nil {
|
||||||
|
return nil // No-op if not enabled
|
||||||
|
}
|
||||||
|
|
||||||
|
// Enrich each resolved context
|
||||||
|
for _, resolved := range batch.Results {
|
||||||
|
if err := e.EnrichResolvedContext(ctx, resolved); err != nil {
|
||||||
|
// Log error but continue with other contexts
|
||||||
|
// Errors are non-fatal for enrichment
|
||||||
|
continue
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// InsertContextNode inserts a context node into LightRAG for future retrieval
|
||||||
|
// This builds the knowledge base over time as contexts are created
|
||||||
|
func (e *LightRAGEnricher) InsertContextNode(ctx context.Context, node *ContextNode) error {
|
||||||
|
if !e.enabled || e.client == nil {
|
||||||
|
return nil // No-op if not enabled
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build text representation of the context node
|
||||||
|
text := e.buildTextRepresentation(node)
|
||||||
|
description := fmt.Sprintf("Context for %s: %s", node.Path, node.Summary)
|
||||||
|
|
||||||
|
// Insert into LightRAG
|
||||||
|
if err := e.client.Insert(ctx, text, description); err != nil {
|
||||||
|
return fmt.Errorf("failed to insert context into lightrag: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// IsEnabled returns whether LightRAG enrichment is enabled
|
||||||
|
func (e *LightRAGEnricher) IsEnabled() bool {
|
||||||
|
return e.enabled
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildQuery constructs a search query from a ContextNode
|
||||||
|
func (e *LightRAGEnricher) buildQuery(node *ContextNode) string {
|
||||||
|
var parts []string
|
||||||
|
|
||||||
|
if node.Purpose != "" {
|
||||||
|
parts = append(parts, node.Purpose)
|
||||||
|
}
|
||||||
|
|
||||||
|
if node.Summary != "" {
|
||||||
|
parts = append(parts, node.Summary)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(node.Technologies) > 0 {
|
||||||
|
parts = append(parts, strings.Join(node.Technologies, " "))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(node.Tags) > 0 {
|
||||||
|
parts = append(parts, strings.Join(node.Tags, " "))
|
||||||
|
}
|
||||||
|
|
||||||
|
return strings.Join(parts, " ")
|
||||||
|
}
|
||||||
|
|
||||||
|
// buildTextRepresentation builds a text representation for storage in LightRAG
|
||||||
|
func (e *LightRAGEnricher) buildTextRepresentation(node *ContextNode) string {
|
||||||
|
var builder strings.Builder
|
||||||
|
|
||||||
|
builder.WriteString(fmt.Sprintf("Path: %s\n", node.Path))
|
||||||
|
builder.WriteString(fmt.Sprintf("UCXL Address: %s\n", node.UCXLAddress.String()))
|
||||||
|
builder.WriteString(fmt.Sprintf("Summary: %s\n", node.Summary))
|
||||||
|
builder.WriteString(fmt.Sprintf("Purpose: %s\n", node.Purpose))
|
||||||
|
|
||||||
|
if len(node.Technologies) > 0 {
|
||||||
|
builder.WriteString(fmt.Sprintf("Technologies: %s\n", strings.Join(node.Technologies, ", ")))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(node.Tags) > 0 {
|
||||||
|
builder.WriteString(fmt.Sprintf("Tags: %s\n", strings.Join(node.Tags, ", ")))
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(node.Insights) > 0 {
|
||||||
|
builder.WriteString("Insights:\n")
|
||||||
|
for _, insight := range node.Insights {
|
||||||
|
builder.WriteString(fmt.Sprintf(" - %s\n", insight))
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if node.Language != nil {
|
||||||
|
builder.WriteString(fmt.Sprintf("Language: %s\n", *node.Language))
|
||||||
|
}
|
||||||
|
|
||||||
|
return builder.String()
|
||||||
|
}
|
||||||
|
|
||||||
|
func min(a, b float64) float64 {
|
||||||
|
if a < b {
|
||||||
|
return a
|
||||||
|
}
|
||||||
|
return b
|
||||||
|
}
|
||||||
@@ -9,6 +9,8 @@ import (
|
|||||||
"net/http"
|
"net/http"
|
||||||
"strings"
|
"strings"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
|
"chorus/pkg/mcp"
|
||||||
)
|
)
|
||||||
|
|
||||||
const (
|
const (
|
||||||
@@ -23,6 +25,7 @@ var (
|
|||||||
aiProvider string = "resetdata" // Default provider
|
aiProvider string = "resetdata" // Default provider
|
||||||
resetdataConfig ResetDataConfig
|
resetdataConfig ResetDataConfig
|
||||||
defaultSystemPrompt string
|
defaultSystemPrompt string
|
||||||
|
lightragClient *mcp.LightRAGClient // Optional LightRAG client for context enrichment
|
||||||
)
|
)
|
||||||
|
|
||||||
// AIProvider represents the AI service provider
|
// AIProvider represents the AI service provider
|
||||||
@@ -242,6 +245,43 @@ func SetDefaultSystemPrompt(systemPrompt string) {
|
|||||||
defaultSystemPrompt = systemPrompt
|
defaultSystemPrompt = systemPrompt
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// SetLightRAGClient configures the optional LightRAG client for context enrichment
|
||||||
|
func SetLightRAGClient(client *mcp.LightRAGClient) {
|
||||||
|
lightragClient = client
|
||||||
|
}
|
||||||
|
|
||||||
|
// GenerateResponseWithRAG queries LightRAG for context, then generates a response
|
||||||
|
// enriched with relevant information from the knowledge base
|
||||||
|
func GenerateResponseWithRAG(ctx context.Context, model, prompt string, queryMode mcp.QueryMode) (string, error) {
|
||||||
|
// If LightRAG is not configured, fall back to regular generation
|
||||||
|
if lightragClient == nil {
|
||||||
|
return GenerateResponse(ctx, model, prompt)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Query LightRAG for relevant context
|
||||||
|
ragCtx, err := lightragClient.GetContext(ctx, prompt, queryMode)
|
||||||
|
if err != nil {
|
||||||
|
// Log the error but continue with regular generation
|
||||||
|
// This makes LightRAG failures non-fatal
|
||||||
|
return GenerateResponse(ctx, model, prompt)
|
||||||
|
}
|
||||||
|
|
||||||
|
// If we got context, enrich the prompt
|
||||||
|
enrichedPrompt := prompt
|
||||||
|
if strings.TrimSpace(ragCtx) != "" {
|
||||||
|
enrichedPrompt = fmt.Sprintf("Context from knowledge base:\n%s\n\nUser query:\n%s", ragCtx, prompt)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Generate response with enriched context
|
||||||
|
return GenerateResponse(ctx, model, enrichedPrompt)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GenerateResponseSmartWithRAG combines smart model selection with RAG context enrichment
|
||||||
|
func GenerateResponseSmartWithRAG(ctx context.Context, prompt string, queryMode mcp.QueryMode) (string, error) {
|
||||||
|
selectedModel := selectBestModel(availableModels, prompt)
|
||||||
|
return GenerateResponseWithRAG(ctx, selectedModel, prompt, queryMode)
|
||||||
|
}
|
||||||
|
|
||||||
// selectBestModel calls the model selection webhook to choose the best model for a prompt
|
// selectBestModel calls the model selection webhook to choose the best model for a prompt
|
||||||
func selectBestModel(availableModels []string, prompt string) string {
|
func selectBestModel(availableModels []string, prompt string) string {
|
||||||
if modelWebhookURL == "" || len(availableModels) == 0 {
|
if modelWebhookURL == "" || len(availableModels) == 0 {
|
||||||
|
|||||||
Reference in New Issue
Block a user