diff --git a/Makefile b/Makefile index 756404f..5529643 100644 --- a/Makefile +++ b/Makefile @@ -1,11 +1,12 @@ # CHORUS Multi-Binary Makefile -# Builds both chorus-agent and chorus-hap binaries +# Builds chorus-agent, chorus-hap, and seqthink-wrapper binaries # Build configuration BINARY_NAME_AGENT = chorus-agent BINARY_NAME_HAP = chorus-hap BINARY_NAME_COMPAT = chorus -VERSION ?= 0.5.5 +BINARY_NAME_SEQTHINK = seqthink-wrapper +VERSION ?= 0.5.28 COMMIT_HASH ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown") BUILD_DATE ?= $(shell date -u '+%Y-%m-%d_%H:%M:%S') @@ -49,6 +50,14 @@ build-compat: go build $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_COMPAT) ./$(CMD_DIR)/chorus @echo "✅ Compatibility wrapper built: $(BUILD_DIR)/$(BINARY_NAME_COMPAT)" +# Build Sequential Thinking age-encrypted wrapper +.PHONY: build-seqthink +build-seqthink: + @echo "🔐 Building Sequential Thinking wrapper..." + @mkdir -p $(BUILD_DIR) + GOWORK=off go build -mod=mod $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_SEQTHINK) ./$(CMD_DIR)/seqthink-wrapper + @echo "✅ SeqThink wrapper built: $(BUILD_DIR)/$(BINARY_NAME_SEQTHINK)" + # Test compilation without building .PHONY: test-compile test-compile: @@ -103,8 +112,13 @@ docker-hap: @echo "🐳 Building Docker image for CHORUS HAP..." docker build -f docker/Dockerfile.hap -t chorus-hap:$(VERSION) . +.PHONY: docker-seqthink +docker-seqthink: + @echo "🔐 Building Docker image for Sequential Thinking wrapper..." + docker build -f deploy/seqthink/Dockerfile -t seqthink-wrapper:$(VERSION) . 
+ .PHONY: docker -docker: docker-agent docker-hap +docker: docker-agent docker-hap docker-seqthink # Help .PHONY: help @@ -112,22 +126,24 @@ help: @echo "CHORUS Multi-Binary Build System" @echo "" @echo "Targets:" - @echo " all - Clean and build both binaries (default)" - @echo " build - Build both binaries" + @echo " all - Clean and build all binaries (default)" + @echo " build - Build all binaries" @echo " build-agent - Build autonomous agent binary only" @echo " build-hap - Build human agent portal binary only" - @echo " test-compile - Test that both binaries compile" + @echo " build-seqthink - Build Sequential Thinking wrapper only" + @echo " test-compile - Test that binaries compile" @echo " test - Run tests" @echo " clean - Remove build artifacts" @echo " install - Install binaries to GOPATH/bin" @echo " run-agent - Build and run agent" @echo " run-hap - Build and run HAP" - @echo " docker - Build Docker images for both binaries" + @echo " docker - Build Docker images for all binaries" @echo " docker-agent - Build Docker image for agent only" @echo " docker-hap - Build Docker image for HAP only" + @echo " docker-seqthink - Build Docker image for SeqThink wrapper only" @echo " help - Show this help" @echo "" @echo "Environment Variables:" - @echo " VERSION - Version string (default: 0.1.0-dev)" + @echo " VERSION - Version string (default: 0.5.28)" @echo " COMMIT_HASH - Git commit hash (auto-detected)" - @echo " BUILD_DATE - Build timestamp (auto-generated)" \ No newline at end of file + @echo " BUILD_DATE - Build timestamp (auto-generated)" diff --git a/cmd/seqthink-wrapper/main.go b/cmd/seqthink-wrapper/main.go new file mode 100644 index 0000000..6d206f3 --- /dev/null +++ b/cmd/seqthink-wrapper/main.go @@ -0,0 +1,173 @@ +package main + +import ( + "context" + "fmt" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "chorus/pkg/seqthink/mcpclient" + "chorus/pkg/seqthink/observability" + "chorus/pkg/seqthink/proxy" + "github.com/rs/zerolog/log" +) + +// 
Config holds the wrapper configuration +type Config struct { + Port string + MCPLocalURL string + LogLevel string + MaxBodyMB int + HealthTimeout time.Duration + ShutdownTimeout time.Duration + AgeIdentPath string + AgeRecipsPath string + KachingJWKSURL string + RequiredScope string +} + +func loadConfig() *Config { + return &Config{ + Port: getEnv("PORT", "8443"), + MCPLocalURL: getEnv("MCP_LOCAL", "http://127.0.0.1:8000"), + LogLevel: getEnv("LOG_LEVEL", "info"), + MaxBodyMB: getEnvInt("MAX_BODY_MB", 4), + HealthTimeout: 5 * time.Second, + ShutdownTimeout: 30 * time.Second, + AgeIdentPath: getEnv("AGE_IDENT_PATH", ""), + AgeRecipsPath: getEnv("AGE_RECIPS_PATH", ""), + KachingJWKSURL: getEnv("KACHING_JWKS_URL", ""), + RequiredScope: getEnv("REQUIRED_SCOPE", "sequentialthinking.run"), + } +} + +func main() { + cfg := loadConfig() + + // Initialize observability + observability.InitLogger(cfg.LogLevel) + metrics := observability.InitMetrics() + + log.Info(). + Str("port", cfg.Port). + Str("mcp_url", cfg.MCPLocalURL). + Str("version", "0.1.0-beta1"). 
+ Msg("🚀 Starting Sequential Thinking Age Wrapper") + + // Create MCP client + mcpClient := mcpclient.New(cfg.MCPLocalURL) + + // Wait for MCP server to be ready + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + log.Info().Msg("⏳ Waiting for MCP server...") + if err := waitForMCP(ctx, mcpClient); err != nil { + log.Fatal().Err(err).Msg("❌ MCP server not ready") + } + + log.Info().Msg("✅ MCP server ready") + + // Create proxy server + proxyServer, err := proxy.NewServer(proxy.ServerConfig{ + MCPClient: mcpClient, + Metrics: metrics, + MaxBodyMB: cfg.MaxBodyMB, + AgeIdentPath: cfg.AgeIdentPath, + AgeRecipsPath: cfg.AgeRecipsPath, + KachingJWKSURL: cfg.KachingJWKSURL, + RequiredScope: cfg.RequiredScope, + }) + + if err != nil { + log.Fatal().Err(err).Msg("❌ Failed to create proxy server") + } + + // Setup HTTP server + srv := &http.Server{ + Addr: ":" + cfg.Port, + Handler: proxyServer.Handler(), + ReadTimeout: 30 * time.Second, + WriteTimeout: 90 * time.Second, + IdleTimeout: 120 * time.Second, + } + + // Start server in goroutine + go func() { + log.Info(). + Str("addr", srv.Addr). + Bool("encryption_enabled", cfg.AgeIdentPath != ""). + Bool("policy_enabled", cfg.KachingJWKSURL != ""). 
+ Msg("🔐 Wrapper listening") + + if err := srv.ListenAndServe(); err != http.ErrServerClosed { + log.Fatal().Err(err).Msg("❌ HTTP server failed") + } + }() + + // Wait for shutdown signal + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + + log.Info().Msg("🛑 Shutting down gracefully...") + + // Graceful shutdown + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), cfg.ShutdownTimeout) + defer shutdownCancel() + + if err := srv.Shutdown(shutdownCtx); err != nil { + log.Error().Err(err).Msg("⚠️ Shutdown error") + } + + log.Info().Msg("✅ Shutdown complete") +} + +// waitForMCP waits for MCP server to be ready +func waitForMCP(ctx context.Context, client *mcpclient.Client) error { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for MCP server") + case <-ticker.C: + if err := client.Health(ctx); err == nil { + return nil + } + log.Debug().Msg("Waiting for MCP server...") + } + } +} + +// getEnv gets environment variable with default +func getEnv(key, defaultVal string) string { + if val := os.Getenv(key); val != "" { + return val + } + return defaultVal +} + +// getEnvInt gets environment variable as int with default +func getEnvInt(key string, defaultVal int) int { + val := os.Getenv(key) + if val == "" { + return defaultVal + } + + var result int + if _, err := fmt.Sscanf(val, "%d", &result); err != nil { + log.Warn(). + Str("key", key). + Str("value", val). + Int("default", defaultVal). 
+ Msg("Invalid integer env var, using default") + return defaultVal + } + + return result +} diff --git a/deploy/seqthink/Dockerfile b/deploy/seqthink/Dockerfile new file mode 100644 index 0000000..f66c01f --- /dev/null +++ b/deploy/seqthink/Dockerfile @@ -0,0 +1,87 @@ +# Sequential Thinking Age-Encrypted Wrapper +# Beat 1: Plaintext skeleton - encryption added in Beat 2 + +# Stage 1: Build Go wrapper +FROM golang:1.23-alpine AS go-builder + +WORKDIR /build + +# Install build dependencies +RUN apk add --no-cache git make + +# Copy go mod files +COPY go.mod go.sum ./ +RUN go mod download + +# Copy source code +COPY . . + +# Build the wrapper binary +RUN CGO_ENABLED=0 GOOS=linux go build -a -installsuffix cgo \ + -ldflags '-w -s -extldflags "-static"' \ + -o seqthink-wrapper \ + ./cmd/seqthink-wrapper + +# Stage 2: Build Python MCP server +FROM python:3.11-slim AS python-builder + +WORKDIR /mcp + +# Install Sequential Thinking MCP server dependencies +# Note: For Beat 1, we'll use a minimal Python HTTP server +# Full MCP server integration happens in later beats +RUN pip install --no-cache-dir \ + fastapi==0.109.0 \ + uvicorn[standard]==0.27.0 \ + pydantic==2.5.3 + +# Copy MCP server stub (to be replaced with real implementation) +COPY deploy/seqthink/mcp_stub.py /mcp/server.py + +# Stage 3: Runtime +FROM debian:bookworm-slim + +# Install runtime dependencies (curl is required by entrypoint.sh and the HEALTHCHECK) +RUN apt-get update && \ + apt-get install -y --no-install-recommends \ + ca-certificates \ + curl \ + python3 \ + python3-pip && \ + apt-get clean && \ + rm -rf /var/lib/apt/lists/* + +# Install Python packages in runtime +RUN pip3 install --no-cache-dir --break-system-packages \ + fastapi==0.109.0 \ + uvicorn[standard]==0.27.0 \ + pydantic==2.5.3 + +# Create non-root user +RUN useradd -r -u 1000 -m -s /bin/bash seqthink + +# Copy binaries +COPY --from=go-builder /build/seqthink-wrapper /usr/local/bin/ +COPY --from=python-builder /mcp/server.py /opt/mcp/server.py + +# Copy entrypoint +COPY deploy/seqthink/entrypoint.sh 
/entrypoint.sh +RUN chmod +x /entrypoint.sh + +# Setup directories +RUN mkdir -p /etc/seqthink /var/log/seqthink && \ + chown -R seqthink:seqthink /etc/seqthink /var/log/seqthink + +# Switch to non-root user +USER seqthink +WORKDIR /home/seqthink + +# Health check +HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \ + CMD curl -f http://localhost:8443/health || exit 1 + +# Expose wrapper port (MCP server on 127.0.0.1:8000 is internal only) +EXPOSE 8443 + +# Run entrypoint +ENTRYPOINT ["/entrypoint.sh"] diff --git a/deploy/seqthink/entrypoint.sh b/deploy/seqthink/entrypoint.sh new file mode 100644 index 0000000..34a1840 --- /dev/null +++ b/deploy/seqthink/entrypoint.sh @@ -0,0 +1,27 @@ +#!/bin/bash +set -e + +echo "🚀 Starting Sequential Thinking Age Wrapper (Beat 1)" + +# Start MCP server on loopback +echo "📡 Starting MCP server on 127.0.0.1:8000..." +python3 /opt/mcp/server.py & +MCP_PID=$! + +# Wait for MCP server to be ready +echo "⏳ Waiting for MCP server to be ready..." +for i in {1..30}; do + if curl -sf http://127.0.0.1:8000/health > /dev/null 2>&1; then + echo "✅ MCP server ready" + break + fi + if [ $i -eq 30 ]; then + echo "❌ MCP server failed to start" + exit 1 + fi + sleep 1 +done + +# Start wrapper +echo "🔐 Starting wrapper on :8443..." +exec seqthink-wrapper diff --git a/deploy/seqthink/mcp_stub.py b/deploy/seqthink/mcp_stub.py new file mode 100644 index 0000000..931e9a8 --- /dev/null +++ b/deploy/seqthink/mcp_stub.py @@ -0,0 +1,70 @@ +#!/usr/bin/env python3 +""" +Sequential Thinking MCP Server Stub (Beat 1) + +This is a minimal implementation for testing the wrapper infrastructure. +In later beats, this will be replaced with the full Sequential Thinking MCP server. 
+""" + +from fastapi import FastAPI, HTTPException +from pydantic import BaseModel +from typing import Dict, Any, Optional +import uvicorn + +app = FastAPI(title="Sequential Thinking MCP Server Stub") + + +class ToolRequest(BaseModel): + tool: str + payload: Dict[str, Any] + + +class ToolResponse(BaseModel): + result: Optional[Any] = None + error: Optional[str] = None + + +@app.get("/health") +async def health(): + """Health check endpoint""" + return {"status": "ok"} + + +@app.post("/mcp/tool") +async def call_tool(request: ToolRequest) -> ToolResponse: + """ + Tool call endpoint - stub implementation + + In Beat 1, this just echoes back the request to verify the wrapper works. + Later beats will implement the actual Sequential Thinking logic. + """ + if request.tool != "mcp__sequential-thinking__sequentialthinking": + return ToolResponse( + error=f"Unknown tool: {request.tool}" + ) + + # Stub response for Sequential Thinking tool + payload = request.payload + thought_number = payload.get("thoughtNumber", 1) + total_thoughts = payload.get("totalThoughts", 5) + thought = payload.get("thought", "") + next_thought_needed = payload.get("nextThoughtNeeded", True) + + return ToolResponse( + result={ + "thoughtNumber": thought_number, + "totalThoughts": total_thoughts, + "thought": thought, + "nextThoughtNeeded": next_thought_needed, + "message": "Beat 1 stub - Sequential Thinking not yet implemented" + } + ) + + +if __name__ == "__main__": + uvicorn.run( + app, + host="127.0.0.1", + port=8000, + log_level="info" + ) diff --git a/docs/SEQTHINK-AGE-WRAPPER-IMPLEMENTATION.md b/docs/SEQTHINK-AGE-WRAPPER-IMPLEMENTATION.md new file mode 100644 index 0000000..b83d445 --- /dev/null +++ b/docs/SEQTHINK-AGE-WRAPPER-IMPLEMENTATION.md @@ -0,0 +1,1090 @@ +# Sequential Thinking Age-Encrypted Wrapper - Implementation Guide + +**Date**: 2025-10-13 +**Status**: Ready for Implementation +**Project**: SequentialThinkingForCHORUS +**Priority**: High - Blocking agent intelligence 
improvements + +--- + +## Executive Summary + +This document provides the **complete implementation plan** for the age-encrypted Sequential Thinking MCP wrapper as specified in the SequentialThinkingForCHORUS issue #1. This wrapper enables secure, production-grade deployment of Sequential Thinking capabilities while maintaining confidentiality through end-to-end encryption. + +**Two-Part Solution**: +1. **Part A**: Build the `seqthink-age` container (age-encrypted wrapper) +2. **Part B**: Integrate CHORUS agents with the encrypted wrapper + +--- + +## Part A: seqthink-age Container Implementation + +### Repository Structure + +``` +seqthink-age/ +├── cmd/ +│ └── agewrap/ +│ └── main.go # Wrapper entry point +├── pkg/ +│ ├── ageio/ +│ │ ├── crypto.go # SealFrame/OpenFrame + gzip +│ │ ├── crypto_test.go +│ │ └── testdata/ # Golden test vectors +│ ├── policy/ +│ │ ├── kaching.go # JWT verification +│ │ ├── scope.go # Scope checking +│ │ └── policy_test.go +│ ├── proxy/ +│ │ ├── handlers.go # HTTP handlers +│ │ ├── sse.go # SSE streaming +│ │ ├── health.go # Health checks +│ │ └── proxy_test.go +│ ├── mcpclient/ +│ │ ├── client.go # Loopback MCP caller +│ │ └── client_test.go +│ └── observability/ +│ ├── metrics.go # Prometheus metrics +│ ├── logging.go # Structured logging +│ └── pprof.go # Optional profiling +├── deploy/ +│ ├── Dockerfile # Multi-stage build +│ ├── entrypoint.sh # Process orchestration +│ └── compose.example.yml # Local dev stack +├── secrets/ # .gitignored +│ ├── age_identity.key.example +│ └── age_recipients.txt.example +├── test/ +│ ├── integration/ +│ │ └── e2e_test.go +│ └── fixtures/ +│ └── jwt_samples.json +├── docs/ +│ ├── API.md # API documentation +│ ├── SECURITY.md # Security model +│ └── WHOOSH_INTEGRATION.md # WHOOSH client guide +├── .github/ +│ └── workflows/ +│ └── ci.yml # Build, test, sign, push +├── Makefile +├── go.mod +├── go.sum +├── README.md +└── LICENSE +``` + +--- + +## Beat 1: Skeleton & Plaintext Correct + +### 
Implementation Tasks + +#### 1.1 Repository Setup + +```bash +# Create repository structure +mkdir -p seqthink-age/{cmd/agewrap,pkg/{ageio,policy,proxy,mcpclient,observability},deploy,test/{integration,fixtures},docs,.github/workflows} + +# Initialize Go module +cd seqthink-age +go mod init github.com/chorus-services/seqthink-age + +# Add dependencies +go get github.com/gorilla/mux +go get github.com/rs/zerolog +go get github.com/prometheus/client_golang/prometheus +go get filippo.io/age +go get github.com/golang-jwt/jwt/v5 +``` + +#### 1.2 Main Entry Point + +**File**: `cmd/agewrap/main.go` + +```go +package main + +import ( + "context" + "fmt" + "net/http" + "os" + "os/signal" + "syscall" + "time" + + "github.com/chorus-services/seqthink-age/pkg/mcpclient" + "github.com/chorus-services/seqthink-age/pkg/proxy" + "github.com/chorus-services/seqthink-age/pkg/observability" + "github.com/rs/zerolog/log" +) + +type Config struct { + Port string + MCPLocalURL string + LogLevel string + MaxBodyMB int + HealthTimeout time.Duration + ShutdownTimeout time.Duration +} + +func loadConfig() *Config { + return &Config{ + Port: getEnv("PORT", "8443"), + MCPLocalURL: getEnv("MCP_LOCAL", "http://127.0.0.1:8000"), + LogLevel: getEnv("LOG_LEVEL", "info"), + MaxBodyMB: getEnvInt("MAX_BODY_MB", 4), + HealthTimeout: 5 * time.Second, + ShutdownTimeout: 30 * time.Second, + } +} + +func main() { + cfg := loadConfig() + + // Initialize observability + observability.InitLogger(cfg.LogLevel) + metrics := observability.InitMetrics() + + log.Info(). + Str("port", cfg.Port). + Str("mcp_url", cfg.MCPLocalURL). 
+ Msg("Starting seqthink-age wrapper") + + // Create MCP client + mcpClient := mcpclient.New(cfg.MCPLocalURL) + + // Wait for MCP server to be ready + ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := waitForMCP(ctx, mcpClient); err != nil { + log.Fatal().Err(err).Msg("MCP server not ready") + } + + log.Info().Msg("✅ MCP server ready") + + // Create proxy server + proxyServer := proxy.NewServer(proxy.ServerConfig{ + MCPClient: mcpClient, + Metrics: metrics, + MaxBodyMB: cfg.MaxBodyMB, + }) + + // Setup HTTP server + srv := &http.Server{ + Addr: ":" + cfg.Port, + Handler: proxyServer.Handler(), + ReadTimeout: 30 * time.Second, + WriteTimeout: 90 * time.Second, + IdleTimeout: 120 * time.Second, + } + + // Start server in goroutine + go func() { + log.Info().Str("addr", srv.Addr).Msg("🚀 Wrapper listening") + if err := srv.ListenAndServe(); err != http.ErrServerClosed { + log.Fatal().Err(err).Msg("HTTP server failed") + } + }() + + // Wait for shutdown signal + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + <-sigChan + + log.Info().Msg("🛑 Shutting down gracefully...") + + // Graceful shutdown + shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), cfg.ShutdownTimeout) + defer shutdownCancel() + + if err := srv.Shutdown(shutdownCtx); err != nil { + log.Error().Err(err).Msg("Shutdown error") + } + + log.Info().Msg("✅ Shutdown complete") +} + +func waitForMCP(ctx context.Context, client *mcpclient.Client) error { + ticker := time.NewTicker(1 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ctx.Done(): + return fmt.Errorf("timeout waiting for MCP server") + case <-ticker.C: + if err := client.Health(ctx); err == nil { + return nil + } + log.Debug().Msg("Waiting for MCP server...") + } + } +} + +func getEnv(key, defaultVal string) string { + if val := os.Getenv(key); val != "" { + return val + } + return defaultVal +} + +func 
getEnvInt(key string, defaultVal int) int { + // Implementation + return defaultVal +} +``` + +#### 1.3 MCP Client + +**File**: `pkg/mcpclient/client.go` + +```go +package mcpclient + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +type Client struct { + baseURL string + httpClient *http.Client +} + +type ToolRequest struct { + Tool string `json:"tool"` + Payload map[string]interface{} `json:"payload"` +} + +type ToolResponse struct { + Result interface{} `json:"result"` + Error string `json:"error,omitempty"` +} + +func New(baseURL string) *Client { + return &Client{ + baseURL: baseURL, + httpClient: &http.Client{ + Timeout: 60 * time.Second, + }, + } +} + +func (c *Client) Health(ctx context.Context) error { + req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL+"/health", nil) + if err != nil { + return err + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return err + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("health check failed: status %d", resp.StatusCode) + } + + return nil +} + +func (c *Client) CallTool(ctx context.Context, req *ToolRequest) (*ToolResponse, error) { + jsonData, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshal request: %w", err) + } + + httpReq, err := http.NewRequestWithContext(ctx, "POST", c.baseURL+"/mcp/tool", bytes.NewReader(jsonData)) + if err != nil { + return nil, err + } + + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("http request: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("tool call failed: status %d, body: %s", resp.StatusCode, string(body)) + } + + var toolResp ToolResponse + if err := json.Unmarshal(body, &toolResp); err != nil 
{ + return nil, fmt.Errorf("unmarshal response: %w", err) + } + + return &toolResp, nil +} +``` + +#### 1.4 Proxy Handlers (Plaintext Phase) + +**File**: `pkg/proxy/server.go` + +```go +package proxy + +import ( + "encoding/json" + "io" + "net/http" + + "github.com/chorus-services/seqthink-age/pkg/mcpclient" + "github.com/chorus-services/seqthink-age/pkg/observability" + "github.com/gorilla/mux" + "github.com/rs/zerolog/log" +) + +type ServerConfig struct { + MCPClient *mcpclient.Client + Metrics *observability.Metrics + MaxBodyMB int +} + +type Server struct { + config ServerConfig + router *mux.Router +} + +func NewServer(cfg ServerConfig) *Server { + s := &Server{ + config: cfg, + router: mux.NewRouter(), + } + + s.setupRoutes() + return s +} + +func (s *Server) Handler() http.Handler { + return s.router +} + +func (s *Server) setupRoutes() { + // Health endpoints + s.router.HandleFunc("/health", s.handleHealth).Methods("GET") + s.router.HandleFunc("/ready", s.handleReady).Methods("GET") + + // MCP proxy endpoints + s.router.HandleFunc("/mcp/tool", s.handleToolCall).Methods("POST") + s.router.HandleFunc("/mcp/sse", s.handleSSE).Methods("GET") + + // Metrics (optional) + if s.config.Metrics != nil { + s.router.Handle("/metrics", s.config.Metrics.Handler()) + } +} + +func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + // Check MCP server health + if err := s.config.MCPClient.Health(ctx); err != nil { + log.Warn().Err(err).Msg("MCP health check failed") + http.Error(w, "MCP server unhealthy", http.StatusServiceUnavailable) + return + } + + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"status": "ok"}) +} + +func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(map[string]string{"status": "ready"}) +} + +func (s 
*Server) handleToolCall(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + // Limit body size + r.Body = http.MaxBytesReader(w, r.Body, int64(s.config.MaxBodyMB*1024*1024)) + + // Read request body + body, err := io.ReadAll(r.Body) + if err != nil { + log.Warn().Err(err).Msg("Failed to read request body") + http.Error(w, "Invalid request body", http.StatusBadRequest) + return + } + + // Parse as MCP tool request + var toolReq mcpclient.ToolRequest + if err := json.Unmarshal(body, &toolReq); err != nil { + log.Warn().Err(err).Msg("Failed to parse tool request") + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + + // Call MCP server + resp, err := s.config.MCPClient.CallTool(ctx, &toolReq) + if err != nil { + log.Error().Err(err).Msg("MCP tool call failed") + http.Error(w, "Tool execution failed", http.StatusInternalServerError) + return + } + + // Return response + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(resp) +} + +func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) { + // SSE implementation (placeholder for Beat 1) + http.Error(w, "SSE not yet implemented", http.StatusNotImplemented) +} +``` + +#### 1.5 Dockerfile & Entrypoint + +**File**: `deploy/Dockerfile` + +```dockerfile +# Build stage +FROM golang:1.22-alpine AS builder + +WORKDIR /build + +# Copy go mod files +COPY go.mod go.sum ./ +RUN go mod download + +# Copy source +COPY . . 
+ +# Build wrapper +RUN CGO_ENABLED=0 GOOS=linux GOARCH=amd64 go build \ + -ldflags="-w -s" \ + -o agewrap \ + ./cmd/agewrap + +# Runtime stage +FROM debian:stable-slim + +# Install MCP server (placeholder - adjust based on actual MCP distribution) +RUN apt-get update && apt-get install -y \ + ca-certificates \ + curl \ + && rm -rf /var/lib/apt/lists/* + +# Copy wrapper binary +COPY --from=builder /build/agewrap /usr/local/bin/agewrap + +# Copy entrypoint +COPY deploy/entrypoint.sh /entrypoint.sh +RUN chmod +x /entrypoint.sh + +# Create non-root user +RUN groupadd -g 1000 seqthink && \ + useradd -u 1000 -g seqthink -s /bin/bash -d /home/seqthink -m seqthink + +# Runtime directories +RUN mkdir -p /run/secrets /tmp/seqthink && \ + chown -R seqthink:seqthink /tmp/seqthink + +USER seqthink + +EXPOSE 8443 + +ENTRYPOINT ["/entrypoint.sh"] +``` + +**File**: `deploy/entrypoint.sh` + +```bash +#!/bin/bash +set -euo pipefail + +echo "🚀 Starting Sequential Thinking Wrapper" + +# Start MCP server on loopback +echo "📡 Starting MCP server on 127.0.0.1:8000..." +sequentialthinking --transport=http --port=8000 --bind=127.0.0.1 & +MCP_PID=$! + +# Wait for MCP to be ready +echo "⏳ Waiting for MCP server..." +for i in {1..30}; do + if curl -sf http://127.0.0.1:8000/health > /dev/null 2>&1; then + echo "✅ MCP server ready" + break + fi + if [ $i -eq 30 ]; then + echo "❌ MCP server failed to start" + exit 1 + fi + sleep 1 +done + +# Start age wrapper +echo "🔐 Starting age wrapper on :${PORT:-8443}..." +exec agewrap +``` + +#### 1.6 Makefile + +**File**: `Makefile` + +```makefile +.PHONY: all build test lint clean image + +# Variables +IMAGE_NAME ?= anthonyrawlins/seqthink-age +IMAGE_TAG ?= latest +GOBIN ?= $(shell go env GOPATH)/bin + +all: lint test build + +build: + @echo "Building agewrap..." + go build -o bin/agewrap ./cmd/agewrap + +test: + @echo "Running tests..." + go test -v -race -cover ./... + +lint: + @echo "Running linters..." + golangci-lint run ./... 
+ +clean: + @echo "Cleaning..." + rm -rf bin/ + +image: + @echo "Building Docker image..." + docker build -f deploy/Dockerfile -t $(IMAGE_NAME):$(IMAGE_TAG) . + +image-push: + @echo "Pushing image..." + docker push $(IMAGE_NAME):$(IMAGE_TAG) + +run-local: + @echo "Running locally..." + MCP_LOCAL=http://127.0.0.1:8000 PORT=8443 go run ./cmd/agewrap + +compose-up: + docker-compose -f deploy/compose.example.yml up --build + +compose-down: + docker-compose -f deploy/compose.example.yml down +``` + +### Beat 1 Acceptance Criteria + +- [ ] `make build` compiles successfully +- [ ] `docker build` creates image +- [ ] Container starts MCP server on loopback +- [ ] Container starts wrapper on :8443 +- [ ] `/health` endpoint returns 200 +- [ ] POST to `/mcp/tool` with plaintext JSON returns tool result +- [ ] Logs are structured JSON +- [ ] Graceful shutdown works + +--- + +## Beat 2: Crypto Envelope (Age + Gzip) + +### Implementation Tasks + +#### 2.1 Age Crypto Implementation + +**File**: `pkg/ageio/crypto.go` + +```go +package ageio + +import ( + "bytes" + "compress/gzip" + "fmt" + "io" + + "filippo.io/age" +) + +// SealFrame encrypts and compresses plaintext +func SealFrame(plaintext []byte, recipients []age.Recipient) ([]byte, error) { + // 1. Gzip compress + var compressed bytes.Buffer + gzipWriter := gzip.NewWriter(&compressed) + if _, err := gzipWriter.Write(plaintext); err != nil { + return nil, fmt.Errorf("gzip write: %w", err) + } + if err := gzipWriter.Close(); err != nil { + return nil, fmt.Errorf("gzip close: %w", err) + } + + // 2. Age encrypt + var encrypted bytes.Buffer + encWriter, err := age.Encrypt(&encrypted, recipients...) 
+ if err != nil { + return nil, fmt.Errorf("age encrypt: %w", err) + } + + if _, err := encWriter.Write(compressed.Bytes()); err != nil { + return nil, fmt.Errorf("encrypt write: %w", err) + } + + if err := encWriter.Close(); err != nil { + return nil, fmt.Errorf("encrypt close: %w", err) + } + + return encrypted.Bytes(), nil +} + +// OpenFrame decrypts and decompresses ciphertext +func OpenFrame(ciphertext []byte, identities []age.Identity) ([]byte, error) { + // 1. Age decrypt + decReader, err := age.Decrypt(bytes.NewReader(ciphertext), identities...) + if err != nil { + return nil, fmt.Errorf("age decrypt: %w", err) + } + + // 2. Gunzip decompress + gzipReader, err := gzip.NewReader(decReader) + if err != nil { + return nil, fmt.Errorf("gzip reader: %w", err) + } + defer gzipReader.Close() + + plaintext, err := io.ReadAll(gzipReader) + if err != nil { + return nil, fmt.Errorf("decompress: %w", err) + } + + return plaintext, nil +} +``` + +**File**: `pkg/ageio/keys.go` + +```go +package ageio + +import ( + "bufio" + "fmt" + "os" + "strings" + + "filippo.io/age" +) + +// LoadIdentities loads age identities from file +func LoadIdentities(path string) ([]age.Identity, error) { + f, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("open identity file: %w", err) + } + defer f.Close() + + return age.ParseIdentities(f) +} + +// LoadRecipients loads age recipients from file +func LoadRecipients(path string) ([]age.Recipient, error) { + f, err := os.Open(path) + if err != nil { + return nil, fmt.Errorf("open recipients file: %w", err) + } + defer f.Close() + + var recipients []age.Recipient + scanner := bufio.NewScanner(f) + for scanner.Scan() { + line := strings.TrimSpace(scanner.Text()) + if line == "" || strings.HasPrefix(line, "#") { + continue + } + + recipient, err := age.ParseX25519Recipient(line) + if err != nil { + return nil, fmt.Errorf("parse recipient: %w", err) + } + + recipients = append(recipients, recipient) + } + + if err := scanner.Err(); 
err != nil { + return nil, fmt.Errorf("scan recipients: %w", err) + } + + return recipients, nil +} +``` + +#### 2.2 Update Proxy for Encryption + +**File**: `pkg/proxy/server.go` (additions) + +```go +type Server struct { + config ServerConfig + router *mux.Router + identities []age.Identity // NEW + recipients []age.Recipient // NEW +} + +func NewServer(cfg ServerConfig) (*Server, error) { + s := &Server{ + config: cfg, + router: mux.NewRouter(), + } + + // Load age keys if configured + if cfg.AgeIdentPath != "" { + identities, err := ageio.LoadIdentities(cfg.AgeIdentPath) + if err != nil { + return nil, fmt.Errorf("load identities: %w", err) + } + s.identities = identities + } + + if cfg.AgeRecipsPath != "" { + recipients, err := ageio.LoadRecipients(cfg.AgeRecipsPath) + if err != nil { + return nil, fmt.Errorf("load recipients: %w", err) + } + s.recipients = recipients + } + + s.setupRoutes() + return s, nil +} + +func (s *Server) handleToolCall(w http.ResponseWriter, r *http.Request) { + ctx := r.Context() + + // Check content type if encryption is enabled + if s.identities != nil { + contentType := r.Header.Get("Content-Type") + if contentType != "application/age" { + http.Error(w, "Content-Type must be application/age", http.StatusUnsupportedMediaType) + return + } + } + + // Read encrypted body + r.Body = http.MaxBytesReader(w, r.Body, int64(s.config.MaxBodyMB*1024*1024)) + encryptedBody, err := io.ReadAll(r.Body) + if err != nil { + http.Error(w, "Failed to read body", http.StatusBadRequest) + return + } + + // Decrypt if encryption enabled + var plaintext []byte + if s.identities != nil { + plaintext, err = ageio.OpenFrame(encryptedBody, s.identities) + if err != nil { + log.Error().Err(err).Msg("Decryption failed") + s.config.Metrics.RecordDecryptFailure() + http.Error(w, "Decryption failed", http.StatusBadRequest) + return + } + } else { + plaintext = encryptedBody + } + + // Parse tool request + var toolReq mcpclient.ToolRequest + if err := 
json.Unmarshal(plaintext, &toolReq); err != nil { + http.Error(w, "Invalid JSON", http.StatusBadRequest) + return + } + + // Call MCP + resp, err := s.config.MCPClient.CallTool(ctx, &toolReq) + if err != nil { + http.Error(w, "Tool execution failed", http.StatusInternalServerError) + return + } + + // Marshal response + responseJSON, err := json.Marshal(resp) + if err != nil { + http.Error(w, "Failed to marshal response", http.StatusInternalServerError) + return + } + + // Encrypt response if encryption enabled + var responseBody []byte + if s.recipients != nil { + responseBody, err = ageio.SealFrame(responseJSON, s.recipients) + if err != nil { + log.Error().Err(err).Msg("Encryption failed") + http.Error(w, "Encryption failed", http.StatusInternalServerError) + return + } + w.Header().Set("Content-Type", "application/age") + } else { + responseBody = responseJSON + w.Header().Set("Content-Type", "application/json") + } + + w.WriteHeader(http.StatusOK) + w.Write(responseBody) +} +``` + +### Beat 2 Acceptance Criteria + +- [ ] Age encrypt/decrypt golden tests pass +- [ ] Wrapper enforces `Content-Type: application/age` +- [ ] End-to-end: encrypted POST → encrypted response +- [ ] Decryption failures return 400 with proper logging +- [ ] Metrics track decrypt successes/failures + +--- + +*[Continuing with Beats 3-6 would follow similar detailed patterns...]* + +--- + +## Part B: CHORUS Agent Integration + +### B.1 CHORUS AI Provider Extension + +**File**: `pkg/ai/seqthink_provider.go` + +```go +package ai + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "filippo.io/age" +) + +// SeqThinkProvider wraps age-encrypted Sequential Thinking +type SeqThinkProvider struct { + config ProviderConfig + httpClient *http.Client + recipients []age.Recipient + identities []age.Identity + jwt string // KACHING JWT for policy +} + +type SeqThinkRequest struct { + Tool string `json:"tool"` + Payload SeqThinkPayload `json:"payload"` + Policy 
SeqThinkPolicy `json:"policy"` + TS time.Time `json:"ts"` + Nonce string `json:"nonce"` +} + +type SeqThinkPayload struct { + Objective string `json:"objective"` + MaxDepth int `json:"max_depth"` + Reflect bool `json:"reflect"` +} + +type SeqThinkPolicy struct { + Caller string `json:"caller"` + License string `json:"license"` // KACHING JWT + Scope []string `json:"scope"` +} + +func (p *SeqThinkProvider) ExecuteTask(ctx context.Context, request *TaskRequest) (*TaskResponse, error) { + startTime := time.Now() + + // Build Sequential Thinking request + seqReq := SeqThinkRequest{ + Tool: "sequentialthinking", + Payload: SeqThinkPayload{ + Objective: p.formatTaskAsObjective(request), + MaxDepth: 10, + Reflect: true, + }, + Policy: SeqThinkPolicy{ + Caller: request.AgentID, + License: p.jwt, + Scope: []string{"sequentialthinking.run"}, + }, + TS: time.Now(), + Nonce: generateNonce(), + } + + // Marshal to JSON + plaintext, err := json.Marshal(seqReq) + if err != nil { + return nil, fmt.Errorf("marshal request: %w", err) + } + + // Encrypt with age + encrypted, err := ageio.SealFrame(plaintext, p.recipients) + if err != nil { + return nil, fmt.Errorf("encrypt request: %w", err) + } + + // POST to wrapper + httpReq, err := http.NewRequestWithContext(ctx, "POST", p.config.Endpoint+"/mcp/tool", bytes.NewReader(encrypted)) + if err != nil { + return nil, err + } + + httpReq.Header.Set("Content-Type", "application/age") + + resp, err := p.httpClient.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("http request: %w", err) + } + defer resp.Body.Close() + + // Read encrypted response + encryptedResp, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read response: %w", err) + } + + // Decrypt response + decrypted, err := ageio.OpenFrame(encryptedResp, p.identities) + if err != nil { + return nil, fmt.Errorf("decrypt response: %w", err) + } + + // Parse thinking result + var thinkingResult struct { + Thoughts []ThoughtStep `json:"thoughts"` + 
Conclusion string `json:"conclusion"` + } + + if err := json.Unmarshal(decrypted, &thinkingResult); err != nil { + return nil, fmt.Errorf("unmarshal response: %w", err) + } + + // Convert to TaskResponse with structured reasoning + return &TaskResponse{ + Success: true, + TaskID: request.TaskID, + AgentID: request.AgentID, + Provider: "seqthink", + Response: thinkingResult.Conclusion, + StructuredReasoning: &StructuredReasoning{ + Thoughts: thinkingResult.Thoughts, + Confidence: 0.9, // Extract from thinking result + }, + StartTime: startTime, + EndTime: time.Now(), + Duration: time.Since(startTime), + }, nil +} +``` + +### B.2 Configuration + +**File**: `config/agent.yaml` (add seqthink provider) + +```yaml +ai_providers: + # Existing providers... + + seqthink: + type: "seqthink" + endpoint: "http://seqthink-age:8443" + + # Age encryption + age_ident_path: "/run/secrets/age_identity.key" + age_recips_path: "/run/secrets/age_recipients.txt" + + # KACHING authentication + kaching_jwt_path: "/run/secrets/kaching_jwt" + + # Sequential thinking config + max_depth: 15 + enable_reflection: true + min_complexity: 7 # Only use for complex tasks + +role_model_mapping: + roles: + architect: + provider: "seqthink" # Always use Sequential Thinking for architects + fallback_provider: "resetdata" +``` + +--- + +## Timeline & Resource Estimates + +### Beat-by-Beat Timeline + +| Beat | Tasks | Duration | Resources | +|------|-------|----------|-----------| +| Beat 1 | Skeleton & plaintext | 2 days | 1 Go dev | +| Beat 2 | Age crypto | 2 days | 1 Go dev + security review | +| Beat 3 | Policy/JWT | 2 days | 1 Go dev | +| Beat 4 | Observability | 2 days | 1 Go dev | +| Beat 5 | CI/CD | 1 day | 1 DevOps | +| Beat 6 | WHOOSH integration | 2 days | 1 Go dev | +| **Total** | | **11 days** | | + +### CHORUS Integration Timeline + +| Task | Duration | Dependencies | +|------|----------|--------------| +| SeqThink provider | 2 days | Beat 2 complete | +| CHORUS config | 1 day | Provider 
complete | +| Integration testing | 2 days | All complete | +| **Total** | **5 days** | | + +**Grand Total**: ~16 days (3 weeks with buffer) + +--- + +## Success Criteria + +### seqthink-age Container + +- [ ] All Beat 1-6 acceptance criteria met +- [ ] Image signed with cosign +- [ ] SBOM generated +- [ ] Security scan passes (Trivy) +- [ ] No plaintext visible in tcpdump +- [ ] Load test: 100 rps, p99 < 200ms overhead +- [ ] WHOOSH demo: encrypted plan generation + +### CHORUS Integration + +- [ ] Architect agents use Sequential Thinking +- [ ] Complex tasks (complexity >= 7) trigger SeqThink +- [ ] Structured reasoning stored in TaskResponse +- [ ] Fallback to ResetData works on failures +- [ ] E2E test: council creates task with reasoning trace + +--- + +## Next Steps + +1. **Create `seqthink-age` repository** in GITEA +2. **Implement Beat 1** (skeleton) +3. **Implement Beat 2** (encryption) +4. **Continue through Beats 3-6** +5. **Integrate with CHORUS** agents +6. **Deploy to production** swarm + +Ready to proceed with implementation? diff --git a/docs/SEQUENTIAL-THINKING-INTEGRATION-PLAN.md b/docs/SEQUENTIAL-THINKING-INTEGRATION-PLAN.md new file mode 100644 index 0000000..8775675 --- /dev/null +++ b/docs/SEQUENTIAL-THINKING-INTEGRATION-PLAN.md @@ -0,0 +1,579 @@ +# Sequential Thinking Integration Plan for CHORUS Agents + +**Date**: 2025-10-13 +**Status**: Design Phase +**Priority**: High - Blocking further intelligence improvements + +--- + +## Executive Summary + +This document outlines the integration of the Sequential Thinking MCP server into CHORUS agents to enable **structured, multi-step reasoning** before task execution. This addresses the limitation in the SequentialThinkingForCHORUS repository issue and unlocks advanced agent decision-making capabilities. 
+ +**Problem Statement**: CHORUS agents currently use simple prompt-response cycles without structured reasoning, limiting their ability to handle complex tasks requiring multi-step analysis, hypothesis generation, and iterative refinement. + +**Solution**: Integrate the `mcp__sequential-thinking__sequentialthinking` MCP tool into the AI provider layer to enable chain-of-thought reasoning for complex tasks. + +--- + +## Current Architecture Analysis + +### 1. Existing AI Provider Flow + +``` +TaskRequest → ModelProvider.ExecuteTask() → TaskResponse + ↓ + [Single LLM Call] + ↓ + Response String +``` + +**Current Providers**: +- **OllamaProvider**: Local model execution +- **ResetDataProvider**: ResetData LaaS API +- **OpenAIProvider**: OpenAI API + +**Current Limitations**: +- ✗ No structured reasoning process +- ✗ No ability to revise initial thoughts +- ✗ No hypothesis generation and verification +- ✗ No branching for alternative approaches +- ✗ Simple string reasoning field (not structured) + +### 2. TaskResponse Structure + +**Location**: `/home/tony/chorus/project-queues/active/CHORUS/pkg/ai/provider.go:53-78` + +```go +type TaskResponse struct { + Success bool `json:"success"` + TaskID string `json:"task_id"` + Response string `json:"response"` + Reasoning string `json:"reasoning,omitempty"` // ← Simple string + Actions []TaskAction `json:"actions,omitempty"` + Artifacts []Artifact `json:"artifacts,omitempty"` + TokensUsed TokenUsage `json:"tokens_used,omitempty"` + // ... other fields +} +``` + +**Opportunity**: The `Reasoning` field is perfect for storing structured thinking output! 
+ +--- + +## Sequential Thinking MCP Tool + +### Tool Signature + +```text +mcp__sequential-thinking__sequentialthinking( + thought: string, + nextThoughtNeeded: bool, + thoughtNumber: int, + totalThoughts: int, + isRevision: bool = false, + revisesThought: int = null, + branchFromThought: int = null, + branchId: string = null, + needsMoreThoughts: bool = false +) +``` + +### Capabilities + +1. **Adaptive Thinking**: Adjust `totalThoughts` up or down as understanding deepens +2. **Revision Support**: Question and revise previous thoughts (`isRevision`, `revisesThought`) +3. **Branching**: Explore alternative approaches (`branchFromThought`, `branchId`) +4. **Hypothesis Testing**: Generate and verify hypotheses in chain-of-thought +5. **Uncertainty Expression**: Express and work through unclear aspects +6. **Context Maintenance**: Keep track of all previous thoughts + +### When to Use + +- **Complex problem decomposition** +- **Multi-step solution planning** +- **Problems requiring course correction** +- **Unclear scope requiring exploration** +- **Tasks needing context over multiple steps** +- **Filtering irrelevant information** + +--- + +## Proposed Integration Architecture + +### Phase 1: Enhanced TaskResponse Structure + +**File**: `pkg/ai/provider.go` + +```go +// StructuredReasoning represents chain-of-thought reasoning process +type StructuredReasoning struct { + Thoughts []ThoughtStep `json:"thoughts"` + FinalHypothesis string `json:"final_hypothesis,omitempty"` + VerificationSteps []string `json:"verification_steps,omitempty"` + Confidence float32 `json:"confidence"` // 0.0-1.0 + TotalRevisions int `json:"total_revisions"` + BranchesExplored int `json:"branches_explored"` +} + +// ThoughtStep represents a single step in the reasoning process +type ThoughtStep struct { + Number int `json:"number"` + Content string `json:"content"` + IsRevision bool `json:"is_revision"` + RevisesThought int `json:"revises_thought,omitempty"` + BranchID string
`json:"branch_id,omitempty"` + BranchFrom int `json:"branch_from,omitempty"` + Timestamp time.Time `json:"timestamp"` +} + +// TaskResponse update +type TaskResponse struct { + // ... existing fields ... + Reasoning string `json:"reasoning,omitempty"` // Legacy simple string + StructuredReasoning *StructuredReasoning `json:"structured_reasoning,omitempty"` // NEW + // ... rest of fields ... +} +``` + +### Phase 2: Sequential Thinking Wrapper + +**New File**: `pkg/ai/sequential_thinking.go` + +```go +package ai + +import ( + "context" + "encoding/json" + "fmt" +) + +// SequentialThinkingEngine wraps MCP sequential thinking tool +type SequentialThinkingEngine struct { + mcpClient MCPClient // Interface to MCP tool +} + +// ThinkingRequest represents input for sequential thinking +type ThinkingRequest struct { + Problem string + Context map[string]interface{} + MaxThoughts int + AllowRevisions bool + AllowBranching bool +} + +// ThinkingResult represents output from sequential thinking +type ThinkingResult struct { + Thoughts []ThoughtStep + FinalConclusion string + Confidence float32 + ReasoningPath string // Markdown summary of thinking process +} + +// Think executes sequential thinking process +func (e *SequentialThinkingEngine) Think(ctx context.Context, req *ThinkingRequest) (*ThinkingResult, error) { + // Implementation: + // 1. Initialize thinking with problem statement + // 2. Iteratively call MCP tool until nextThoughtNeeded = false + // 3. Track all thoughts, revisions, branches + // 4. Generate final conclusion and reasoning summary + // 5. 
Return structured result +} +``` + +### Phase 3: Provider Integration + +**Modified File**: `pkg/ai/resetdata.go` + +```go +// ExecuteTask with sequential thinking +func (p *ResetDataProvider) ExecuteTask(ctx context.Context, request *TaskRequest) (*TaskResponse, error) { + startTime := time.Now() + + // Determine if task requires sequential thinking + useSequentialThinking := p.shouldUseSequentialThinking(request) + + var structuredReasoning *StructuredReasoning + var enhancedPrompt string + + if useSequentialThinking { + // Use sequential thinking engine to analyze task first + thinkingEngine := NewSequentialThinkingEngine(p.mcpClient) + + thinkingResult, err := thinkingEngine.Think(ctx, &ThinkingRequest{ + Problem: p.formatTaskAsProblem(request), + Context: request.Context, + MaxThoughts: 10, + AllowRevisions: true, + AllowBranching: true, + }) + + if err != nil { + // Fall back to direct execution if thinking fails + log.Warn().Err(err).Msg("Sequential thinking failed, falling back to direct execution") + } else { + // Use thinking result to enhance prompt + enhancedPrompt = p.buildPromptWithThinking(request, thinkingResult) + structuredReasoning = convertToStructuredReasoning(thinkingResult) + } + } + + // Execute with enhanced prompt (if available) or standard prompt + messages, _ := p.buildChatMessages(request, enhancedPrompt) + + // ... rest of execution ... + + return &TaskResponse{ + Success: true, + Response: responseText, + Reasoning: legacyReasoningString, + StructuredReasoning: structuredReasoning, // NEW + // ... rest of response ... 
+ } +} + +// shouldUseSequentialThinking determines if task warrants sequential thinking +func (p *ResetDataProvider) shouldUseSequentialThinking(request *TaskRequest) bool { + // Use sequential thinking for: + // - High complexity tasks (complexity >= 7) + // - Architect role (requires system design) + // - Tasks with "design" or "architecture" in title/labels + // - Tasks requiring multi-step planning + + if request.Complexity >= 7 { + return true + } + + role := strings.ToLower(request.AgentRole) + if role == "architect" || role == "senior-developer" { + return true + } + + keywords := []string{"design", "architecture", "refactor", "plan", "strategy"} + taskText := strings.ToLower(request.TaskTitle + " " + request.TaskDescription) + for _, keyword := range keywords { + if strings.Contains(taskText, keyword) { + return true + } + } + + return false +} +``` + +--- + +## Implementation Phases + +### Phase 1: Foundation (Days 1-2) + +**Tasks**: +1. ✅ Define `StructuredReasoning` and `ThoughtStep` types +2. ✅ Add `StructuredReasoning` field to `TaskResponse` +3. ✅ Create `SequentialThinkingEngine` skeleton +4. ✅ Add MCP client interface for sequential-thinking tool + +**Files to Create/Modify**: +- `pkg/ai/provider.go` - Add new types +- `pkg/ai/sequential_thinking.go` - New file +- `pkg/ai/mcp_client.go` - New file for MCP integration + +**Success Criteria**: +- Code compiles without errors +- Types are properly defined +- MCP client interface is clear + +### Phase 2: Sequential Thinking Engine (Days 3-5) + +**Tasks**: +1. Implement `SequentialThinkingEngine.Think()` method +2. Implement MCP tool call wrapper +3. Add thought tracking and revision detection +4. Implement branch management +5. Generate reasoning summaries +6. 
Write unit tests + +**Files**: +- `pkg/ai/sequential_thinking.go` - Full implementation +- `pkg/ai/sequential_thinking_test.go` - Unit tests + +**Success Criteria**: +- Can execute complete thinking cycles +- Properly tracks revisions and branches +- Generates clear reasoning summaries +- All unit tests pass + +### Phase 3: Provider Integration (Days 6-8) + +**Tasks**: +1. Modify `ResetDataProvider.ExecuteTask()` for sequential thinking +2. Implement `shouldUseSequentialThinking()` heuristics +3. Add prompt enhancement with thinking results +4. Implement fallback for thinking failures +5. Add configuration options +6. Write integration tests + +**Files**: +- `pkg/ai/resetdata.go` - Modify ExecuteTask +- `pkg/ai/ollama.go` - Same modifications +- `config/agent.yaml` - Add sequential thinking config + +**Success Criteria**: +- Complex tasks trigger sequential thinking +- Thinking results enhance task execution +- Graceful fallback on failures +- Integration tests pass + +### Phase 4: Testing & Validation (Days 9-10) + +**Tasks**: +1. End-to-end testing with real councils +2. Test with various complexity levels +3. Validate reasoning quality improvements +4. Performance benchmarking +5. 
Documentation updates + +**Test Cases**: +- Simple task (complexity=3) → No sequential thinking +- Complex task (complexity=8) → Sequential thinking enabled +- Architect role → Always uses sequential thinking +- Design task → Sequential thinking with branching +- Fallback scenario → Graceful degradation + +**Success Criteria**: +- Demonstrable improvement in task quality +- Acceptable performance overhead (<30% increase in latency) +- Clear reasoning traces in artifacts +- Documentation complete + +--- + +## Configuration + +### Agent Configuration + +**File**: `config/agent.yaml` + +```yaml +ai_providers: + resetdata: + type: "resetdata" + endpoint: "${RESETDATA_API_ENDPOINT}" + api_key: "${RESETDATA_API_KEY}" + default_model: "llama3.1:70b" + + # Sequential thinking configuration + enable_sequential_thinking: true + sequential_thinking: + min_complexity: 7 # Minimum complexity to trigger + force_for_roles: # Always use for these roles + - architect + - senior-developer + max_thoughts: 15 # Maximum thinking iterations + enable_revisions: true # Allow thought revisions + enable_branching: true # Allow exploring alternatives + confidence_threshold: 0.7 # Minimum confidence for final answer +``` + +### Runtime Toggle + +Allow runtime control via council brief: + +```jsonc +{ + "task_id": "task-123", + "complexity": 8, + "use_sequential_thinking": true, // Explicit override + "thinking_config": { + "max_thoughts": 20, + "allow_branching": true + } +} +``` + +--- + +## Benefits & Expected Improvements + +### 1. Better Problem Decomposition + +**Before**: +``` +Agent: Here's my solution [immediately provides implementation] +``` + +**After**: +``` +Thought 1: Breaking down the task into 3 main components... +Thought 2: Component A requires database schema changes... +Thought 3: Wait, revising thought 2 - migration strategy needs consideration... +Thought 4: Exploring alternative: event sourcing vs direct updates...
+Thought 5: Event sourcing better for audit trail requirements... +Final: Implementation plan with 5 concrete steps... +``` + +### 2. Improved Architecture Decisions + +Architect agents can: +- Explore multiple design alternatives +- Revise decisions based on discovered constraints +- Build and verify hypotheses about scalability +- Document reasoning trail for future reference + +### 3. Higher Quality Code + +Developer agents can: +- Think through edge cases before coding +- Consider multiple implementation approaches +- Revise initial assumptions +- Plan testing strategy upfront + +### 4. Debugging Enhancement + +When tasks fail: +- Reasoning traces show where agent went wrong +- Can identify flawed assumptions +- Easier to improve prompts and heuristics + +--- + +## Performance Considerations + +### 1. Latency Impact + +**Estimated Overhead**: +- Sequential thinking: 5-15 LLM calls (vs 1 direct call) +- Expected latency increase: 10-30 seconds for complex tasks +- **Mitigation**: Only use for high-complexity tasks (complexity >= 7) + +### 2. Token Usage + +**Estimated Increase**: +- Each thought: ~200-500 tokens +- 10 thoughts: ~2000-5000 additional tokens +- **Mitigation**: Set reasonable `max_thoughts` limits + +### 3. Resource Requirements + +**MCP Server**: +- Sequential thinking MCP server must be available +- Requires proper error handling and fallback + +--- + +## Risks & Mitigations + +| Risk | Impact | Mitigation | +|------|--------|------------| +| MCP server unavailable | High | Graceful fallback to direct execution | +| Increased latency unacceptable | Medium | Make sequential thinking opt-in per task | +| Token cost explosion | Medium | Set hard limits on max_thoughts | +| Reasoning doesn't improve quality | High | A/B testing with metrics | +| Complex implementation | Medium | Phased rollout with testing | + +--- + +## Success Metrics + +### Quantitative + +1.
**Task Success Rate**: Compare before/after for complexity >= 7 tasks + - Target: +15% improvement +2. **Code Quality**: Static analysis scores for generated code + - Target: +20% improvement in complexity score +3. **PR Acceptance Rate**: How many agent PRs get merged + - Target: +25% improvement +4. **Latency**: Task execution time + - Acceptable: <30% increase for complex tasks + +### Qualitative + +1. **Reasoning Quality**: Human review of reasoning traces +2. **Decision Clarity**: Can humans understand agent's thought process? +3. **Developer Feedback**: Easier to debug failed tasks? + +--- + +## Rollout Plan + +### Stage 1: Internal Testing (Week 1) + +- Deploy to development environment +- Test with synthetic tasks +- Gather performance metrics +- Refine heuristics + +### Stage 2: Limited Production (Week 2) + +- Enable for architect role only +- Enable for complexity >= 9 only +- Monitor closely +- Collect feedback + +### Stage 3: Expanded Rollout (Week 3-4) + +- Enable for all roles with complexity >= 7 +- Add complexity-based opt-in +- Full production deployment +- Continuous monitoring + +### Stage 4: Optimization (Week 5+) + +- Fine-tune heuristics based on data +- Optimize thought limits +- Improve reasoning summaries +- Add advanced features (e.g., multi-agent reasoning) + +--- + +## Future Enhancements + +### 1. Multi-Agent Reasoning + +Multiple agents can contribute thoughts to same reasoning chain: +- Architect proposes design +- Security agent reviews security implications +- Performance agent analyzes scalability + +### 2. Reasoning Templates + +Pre-defined thinking patterns for common scenarios: +- API design checklist +- Security review framework +- Performance optimization workflow + +### 3. Learning from Reasoning + +Store successful reasoning patterns: +- Build knowledge base of good reasoning traces +- Use as examples in future tasks +- Identify common pitfalls + +### 4. 
Visualization + +Dashboard showing reasoning graphs: +- Thought flow diagrams +- Revision history +- Branch exploration trees +- Confidence evolution + +--- + +## References + +- **SequentialThinkingForCHORUS Issue**: (Repository in GITEA) +- **MCP Sequential Thinking Tool**: Available in Claude Code MCP servers +- **CHORUS Task Execution**: `/home/tony/chorus/project-queues/active/CHORUS/pkg/execution/engine.go` +- **AI Provider Interface**: `/home/tony/chorus/project-queues/active/CHORUS/pkg/ai/provider.go` +- **ResetData Provider**: `/home/tony/chorus/project-queues/active/CHORUS/pkg/ai/resetdata.go` + +--- + +## Document Info + +- **Created**: 2025-10-13 +- **Author**: Claude Code +- **Status**: Design Complete - Ready for Implementation +- **Next Steps**: Begin Phase 1 implementation + diff --git a/pkg/seqthink/mcpclient/client.go b/pkg/seqthink/mcpclient/client.go new file mode 100644 index 0000000..b3b1589 --- /dev/null +++ b/pkg/seqthink/mcpclient/client.go @@ -0,0 +1,100 @@ +package mcpclient + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" +) + +// Client is a client for the Sequential Thinking MCP server +type Client struct { + baseURL string + httpClient *http.Client +} + +// ToolRequest represents a request to call an MCP tool +type ToolRequest struct { + Tool string `json:"tool"` + Payload map[string]interface{} `json:"payload"` +} + +// ToolResponse represents the response from an MCP tool call +type ToolResponse struct { + Result interface{} `json:"result,omitempty"` + Error string `json:"error,omitempty"` +} + +// New creates a new MCP client +func New(baseURL string) *Client { + return &Client{ + baseURL: baseURL, + httpClient: &http.Client{ + Timeout: 120 * time.Second, // Longer timeout for thinking operations + }, + } +} + +// Health checks if the MCP server is healthy +func (c *Client) Health(ctx context.Context) error { + req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL+"/health", nil) + if 
err != nil { + return fmt.Errorf("create request: %w", err) + } + + resp, err := c.httpClient.Do(req) + if err != nil { + return fmt.Errorf("http request: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK { + return fmt.Errorf("health check failed: status %d", resp.StatusCode) + } + + return nil +} + +// CallTool calls an MCP tool +func (c *Client) CallTool(ctx context.Context, req *ToolRequest) (*ToolResponse, error) { + jsonData, err := json.Marshal(req) + if err != nil { + return nil, fmt.Errorf("marshal request: %w", err) + } + + httpReq, err := http.NewRequestWithContext(ctx, "POST", c.baseURL+"/mcp/tool", bytes.NewReader(jsonData)) + if err != nil { + return nil, fmt.Errorf("create request: %w", err) + } + + httpReq.Header.Set("Content-Type", "application/json") + + resp, err := c.httpClient.Do(httpReq) + if err != nil { + return nil, fmt.Errorf("http request: %w", err) + } + defer resp.Body.Close() + + body, err := io.ReadAll(resp.Body) + if err != nil { + return nil, fmt.Errorf("read response: %w", err) + } + + if resp.StatusCode != http.StatusOK { + return nil, fmt.Errorf("tool call failed: status %d, body: %s", resp.StatusCode, string(body)) + } + + var toolResp ToolResponse + if err := json.Unmarshal(body, &toolResp); err != nil { + return nil, fmt.Errorf("unmarshal response: %w", err) + } + + if toolResp.Error != "" { + return nil, fmt.Errorf("tool error: %s", toolResp.Error) + } + + return &toolResp, nil +} diff --git a/pkg/seqthink/observability/logger.go b/pkg/seqthink/observability/logger.go new file mode 100644 index 0000000..6737d88 --- /dev/null +++ b/pkg/seqthink/observability/logger.go @@ -0,0 +1,39 @@ +package observability + +import ( + "os" + "strings" + "time" + + "github.com/rs/zerolog" + "github.com/rs/zerolog/log" +) + +// InitLogger initializes the global logger +func InitLogger(level string) { + // Set up zerolog with human-friendly console output + output := zerolog.ConsoleWriter{ + Out: os.Stdout, + 
TimeFormat: time.RFC3339, + } + + log.Logger = zerolog.New(output). + With(). + Timestamp(). + Caller(). + Logger() + + // Set log level + switch strings.ToLower(level) { + case "debug": + zerolog.SetGlobalLevel(zerolog.DebugLevel) + case "info": + zerolog.SetGlobalLevel(zerolog.InfoLevel) + case "warn": + zerolog.SetGlobalLevel(zerolog.WarnLevel) + case "error": + zerolog.SetGlobalLevel(zerolog.ErrorLevel) + default: + zerolog.SetGlobalLevel(zerolog.InfoLevel) + } +} diff --git a/pkg/seqthink/observability/metrics.go b/pkg/seqthink/observability/metrics.go new file mode 100644 index 0000000..06850e0 --- /dev/null +++ b/pkg/seqthink/observability/metrics.go @@ -0,0 +1,85 @@ +package observability + +import ( + "net/http" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "github.com/prometheus/client_golang/prometheus/promhttp" +) + +// Metrics holds Prometheus metrics for the wrapper +type Metrics struct { + requestsTotal prometheus.Counter + errorsTotal prometheus.Counter + decryptFails prometheus.Counter + encryptFails prometheus.Counter + policyDenials prometheus.Counter + requestDuration prometheus.Histogram +} + +// InitMetrics initializes Prometheus metrics +func InitMetrics() *Metrics { + return &Metrics{ + requestsTotal: promauto.NewCounter(prometheus.CounterOpts{ + Name: "seqthink_requests_total", + Help: "Total number of requests received", + }), + errorsTotal: promauto.NewCounter(prometheus.CounterOpts{ + Name: "seqthink_errors_total", + Help: "Total number of errors", + }), + decryptFails: promauto.NewCounter(prometheus.CounterOpts{ + Name: "seqthink_decrypt_failures_total", + Help: "Total number of decryption failures", + }), + encryptFails: promauto.NewCounter(prometheus.CounterOpts{ + Name: "seqthink_encrypt_failures_total", + Help: "Total number of encryption failures", + }), + policyDenials: promauto.NewCounter(prometheus.CounterOpts{ + Name: "seqthink_policy_denials_total", + Help: 
"Total number of policy denials", + }), + requestDuration: promauto.NewHistogram(prometheus.HistogramOpts{ + Name: "seqthink_request_duration_seconds", + Help: "Request duration in seconds", + Buckets: prometheus.DefBuckets, + }), + } +} + +// IncrementRequests increments the request counter +func (m *Metrics) IncrementRequests() { + m.requestsTotal.Inc() +} + +// IncrementErrors increments the error counter +func (m *Metrics) IncrementErrors() { + m.errorsTotal.Inc() +} + +// IncrementDecryptFails increments the decrypt failure counter +func (m *Metrics) IncrementDecryptFails() { + m.decryptFails.Inc() +} + +// IncrementEncryptFails increments the encrypt failure counter +func (m *Metrics) IncrementEncryptFails() { + m.encryptFails.Inc() +} + +// IncrementPolicyDenials increments the policy denial counter +func (m *Metrics) IncrementPolicyDenials() { + m.policyDenials.Inc() +} + +// ObserveRequestDuration records request duration +func (m *Metrics) ObserveRequestDuration(seconds float64) { + m.requestDuration.Observe(seconds) +} + +// Handler returns the Prometheus metrics HTTP handler +func (m *Metrics) Handler() http.Handler { + return promhttp.Handler() +} diff --git a/pkg/seqthink/proxy/server.go b/pkg/seqthink/proxy/server.go new file mode 100644 index 0000000..5278a07 --- /dev/null +++ b/pkg/seqthink/proxy/server.go @@ -0,0 +1,150 @@ +package proxy + +import ( + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "time" + + "chorus/pkg/seqthink/mcpclient" + "chorus/pkg/seqthink/observability" + "github.com/gorilla/mux" + "github.com/rs/zerolog/log" +) + +// ServerConfig holds the proxy server configuration +type ServerConfig struct { + MCPClient *mcpclient.Client + Metrics *observability.Metrics + MaxBodyMB int + AgeIdentPath string + AgeRecipsPath string + KachingJWKSURL string + RequiredScope string +} + +// Server is the proxy server handling requests +type Server struct { + config ServerConfig + router *mux.Router +} + +// NewServer creates a new 
proxy server +func NewServer(cfg ServerConfig) (*Server, error) { + s := &Server{ + config: cfg, + router: mux.NewRouter(), + } + + // Setup routes + s.setupRoutes() + + return s, nil +} + +// Handler returns the HTTP handler +func (s *Server) Handler() http.Handler { + return s.router +} + +// setupRoutes configures the HTTP routes +func (s *Server) setupRoutes() { + // Health checks + s.router.HandleFunc("/health", s.handleHealth).Methods("GET") + s.router.HandleFunc("/ready", s.handleReady).Methods("GET") + + // MCP tool endpoint (plaintext for Beat 1) + s.router.HandleFunc("/mcp/tool", s.handleToolCall).Methods("POST") + + // SSE endpoint (placeholder for Beat 1) + s.router.HandleFunc("/mcp/sse", s.handleSSE).Methods("GET") + + // Metrics endpoint + s.router.Handle("/metrics", s.config.Metrics.Handler()) +} + +// handleHealth returns 200 OK if wrapper is running +func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) { + w.WriteHeader(http.StatusOK) + w.Write([]byte("OK")) +} + +// handleReady checks if MCP server is ready +func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) { + ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second) + defer cancel() + + if err := s.config.MCPClient.Health(ctx); err != nil { + log.Error().Err(err).Msg("MCP server not ready") + http.Error(w, "MCP server not ready", http.StatusServiceUnavailable) + return + } + + w.WriteHeader(http.StatusOK) + w.Write([]byte("READY")) +} + +// handleToolCall proxies tool calls to MCP server (plaintext for Beat 1) +func (s *Server) handleToolCall(w http.ResponseWriter, r *http.Request) { + s.config.Metrics.IncrementRequests() + startTime := time.Now() + + // Limit request body size + r.Body = http.MaxBytesReader(w, r.Body, int64(s.config.MaxBodyMB)*1024*1024) + + // Read request body + body, err := io.ReadAll(r.Body) + if err != nil { + log.Error().Err(err).Msg("Failed to read request body") + s.config.Metrics.IncrementErrors() + http.Error(w, "Failed to read 
request", http.StatusBadRequest) + return + } + + // Parse tool request + var toolReq mcpclient.ToolRequest + if err := json.Unmarshal(body, &toolReq); err != nil { + log.Error().Err(err).Msg("Failed to parse tool request") + s.config.Metrics.IncrementErrors() + http.Error(w, "Invalid request format", http.StatusBadRequest) + return + } + + log.Info(). + Str("tool", toolReq.Tool). + Msg("Proxying tool call to MCP server") + + // Call MCP server + ctx, cancel := context.WithTimeout(r.Context(), 120*time.Second) + defer cancel() + + toolResp, err := s.config.MCPClient.CallTool(ctx, &toolReq) + if err != nil { + log.Error().Err(err).Msg("MCP tool call failed") + s.config.Metrics.IncrementErrors() + http.Error(w, fmt.Sprintf("Tool call failed: %v", err), http.StatusInternalServerError) + return + } + + // Return response + w.Header().Set("Content-Type", "application/json") + if err := json.NewEncoder(w).Encode(toolResp); err != nil { + log.Error().Err(err).Msg("Failed to encode response") + s.config.Metrics.IncrementErrors() + return + } + + duration := time.Since(startTime) + log.Info(). + Str("tool", toolReq.Tool). + Dur("duration", duration). + Msg("Tool call completed") +} + +// handleSSE is a placeholder for Server-Sent Events streaming (Beat 1) +func (s *Server) handleSSE(w http.ResponseWriter, r *http.Request) { + log.Warn().Msg("SSE endpoint not yet implemented") + http.Error(w, "SSE endpoint not implemented in Beat 1", http.StatusNotImplemented) +}