6 Commits

Author SHA1 Message Date
anthonyrawlins
007aeb149a Replace all Printf logging with structured zerolog in runtime files
Migrates CHORUS logging to 100% structured JSON format with ISO 8601 timestamps
for all runtime-critical subsystems.

Files modified:
- internal/runtime/shared.go: SimpleTaskTracker task completion logging
- api/http_server.go: HTTP server, council opportunity, and status logging
- pubsub/pubsub.go: PubSub initialization, topic management, and message handlers
- discovery/mdns.go: mDNS peer discovery and connection logging

All Printf calls replaced with structured zerolog logging using:
- .Info() for informational messages
- .Warn() for warnings and errors
- .Debug() for verbose debug output
- Structured fields: peer_id, topic_name, council_id, etc.
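
For illustration, a minimal, self-contained sketch of the before/after pattern this commit applies (the peer and topic values are placeholders; the real code constructs loggers through CHORUS's own helpers):

```go
package main

import (
	"os"
	"time"

	"github.com/rs/zerolog"
)

func main() {
	// JSON output with ISO 8601 (RFC 3339) timestamps.
	zerolog.TimeFieldFormat = time.RFC3339
	logger := zerolog.New(os.Stdout).With().Timestamp().Logger()

	// Before: fmt.Printf("connected to peer %s on topic %s\n", peerID, topicName)
	// After: machine-parseable structured fields.
	logger.Info().
		Str("peer_id", "12D3KooW...").                 // placeholder value
		Str("topic_name", "CHORUS/coordination/v1").
		Msg("Connected to peer")
}
```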

Version bumped to 0.5.40

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-19 17:04:27 +11:00
anthonyrawlins
2fd9a96950 Add Sequential Thinking compatibility server and JWKS support 2025-10-13 17:04:00 +11:00
anthonyrawlins
c99def17d7 Add deployment infrastructure and documentation
This commit adds complete deployment infrastructure for the Sequential Thinking
Age-Encrypted Wrapper, ready for Docker Swarm production deployment.

## Deliverables

### 1. Docker Swarm Service Definition
**File**: `deploy/seqthink/docker-compose.swarm.yml`

**Features**:
- 3 replicas with automatic spreading across worker nodes
- Resource limits: 1 CPU / 512MB RAM per replica
- Resource reservations: 0.5 CPU / 256MB RAM per replica
- Rolling updates with automatic rollback
- Health checks every 30 seconds
- Traefik integration with automatic HTTPS
- Load balancer with health checking
- Docker Secrets integration for age keys
- Comprehensive logging configuration

**Environment Variables**:
- `LOG_LEVEL`: Logging verbosity
- `MCP_LOCAL`: MCP server URL (loopback only)
- `PORT`: HTTP server port (8443)
- `MAX_BODY_MB`: Request size limit
- `AGE_IDENT_PATH`: Age private key path
- `AGE_RECIPS_PATH`: Age public key path
- `KACHING_JWKS_URL`: KACHING JWKS endpoint
- `REQUIRED_SCOPE`: Required JWT scope

**Secrets**:
- `seqthink_age_identity`: Age private key (mounted at /run/secrets/)
- `seqthink_age_recipients`: Age public key (mounted at /run/secrets/)

**Network**:
- `chorus-overlay`: External overlay network for service mesh

**Labels**:
- Traefik routing: `seqthink.chorus.services`
- HTTPS with Let's Encrypt
- Health check path: `/health`
- Load balancer port: 8443

### 2. Deployment Documentation
**File**: `deploy/seqthink/DEPLOYMENT.md` (500+ lines)

**Sections**:
1. **Prerequisites**: Cluster setup, network requirements
2. **Architecture**: Security layers diagram
3. **Step-by-step deployment**:
   - Generate age keys
   - Create Docker secrets
   - Build Docker image
   - Deploy to swarm
   - Verify deployment
   - Test with JWT tokens
4. **Configuration reference**: All environment variables documented
5. **Scaling**: Horizontal scaling commands
6. **Updates**: Rolling update procedures
7. **Rollback**: Automatic and manual rollback
8. **Monitoring**: Prometheus metrics, health checks, logs
9. **Troubleshooting**: Common issues and solutions
10. **Security considerations**: Key rotation, TLS, rate limiting
11. **Development mode**: Testing without security
12. **Production checklist**: Pre-deployment verification

### 3. End-to-End Test Script
**File**: `deploy/seqthink/test-e2e.sh` (executable)

**Tests**:
1. Health endpoint validation
2. Readiness endpoint validation
3. Metrics endpoint verification
4. Unauthorized request rejection (401)
5. Invalid authorization header rejection (401)
6. JWT token validation (if token provided)
7. Encrypted request/response (if age keys provided)
8. Content-Type validation (415 for wrong type)
9. Metrics collection verification
10. SSE endpoint availability

**Usage**:
```bash
# Basic tests (no auth)
./deploy/seqthink/test-e2e.sh

# With JWT token
export JWT_TOKEN="eyJhbGci..."
./deploy/seqthink/test-e2e.sh

# With JWT + encryption
export JWT_TOKEN="eyJhbGci..."
export AGE_RECIPIENT="$(cat seqthink_age.pub)"
export AGE_IDENTITY="seqthink_age.key"
./deploy/seqthink/test-e2e.sh
```

**Output**:
- Color-coded test results (✓ pass, ✗ fail, ⚠ warn)
- Test summary with counts
- Exit code 0 if all pass, 1 if any fail

### 4. Secrets Management Guide
**File**: `deploy/seqthink/SECRETS.md` (400+ lines)

**Topics**:
1. **Secret types**: Age keys, KACHING config
2. **Key generation**:
   - Method 1: Using age-keygen
   - Method 2: Using Go code
3. **Storing secrets**: Docker Swarm secret commands
4. **Using secrets**: Compose file configuration
5. **Key rotation**:
   - Why rotate
   - 5-step rotation process
   - Zero-downtime rotation
6. **Backup and recovery**:
   - Secure backup procedures
   - Age-encrypted backups
   - Recovery process
7. **Security best practices**:
   - Key generation ✓/✗ guidelines
   - Key storage ✓/✗ guidelines
   - Key distribution ✓/✗ guidelines
   - Key lifecycle ✓/✗ guidelines
8. **Troubleshooting**: Common secret issues
9. **Client-side key management**: Distributing public keys
10. **Compliance and auditing**: SOC 2, ISO 27001
11. **Emergency procedures**: Key compromise response

## Deployment Flow

### Initial Deployment
```bash
# 1. Generate keys
age-keygen -o seqthink_age.key
age-keygen -y seqthink_age.key > seqthink_age.pub

# 2. Create secrets
docker secret create seqthink_age_identity seqthink_age.key
docker secret create seqthink_age_recipients seqthink_age.pub

# 3. Build image
docker build -f deploy/seqthink/Dockerfile -t anthonyrawlins/seqthink-wrapper:latest .
docker push anthonyrawlins/seqthink-wrapper:latest

# 4. Deploy
docker stack deploy -c deploy/seqthink/docker-compose.swarm.yml seqthink

# 5. Verify
docker service ps seqthink_seqthink-wrapper
docker service logs seqthink_seqthink-wrapper

# 6. Test
./deploy/seqthink/test-e2e.sh
```

### Update Deployment
```bash
# Build new version
docker build -f deploy/seqthink/Dockerfile -t anthonyrawlins/seqthink-wrapper:0.2.0 .
docker push anthonyrawlins/seqthink-wrapper:0.2.0

# Rolling update
docker service update \
  --image anthonyrawlins/seqthink-wrapper:0.2.0 \
  seqthink_seqthink-wrapper
```

## Service Architecture

```
Internet
    ↓
Traefik (HTTPS + Load Balancing)
    ↓
seqthink.chorus.services
    ↓
Docker Swarm Overlay Network
    ↓
SeqThink Wrapper (3 replicas)
    ├─ JWT Validation (KACHING)
    ├─ Age Decryption
    ├─ MCP Server (loopback)
    ├─ Age Encryption
    └─ Response
```

## Security Layers

1. **Transport Security**: TLS 1.3 via Traefik
2. **Authentication**: JWT signature verification (RS256)
3. **Authorization**: Scope-based access control
4. **Encryption**: Age end-to-end encryption
5. **Network Isolation**: MCP server on loopback only
6. **Secrets Management**: Docker Swarm secrets (tmpfs)
7. **Resource Limits**: Container resource constraints

## Monitoring Integration

**Prometheus Metrics** (`/metrics`):
- `seqthink_requests_total`: Total requests
- `seqthink_errors_total`: Total errors
- `seqthink_decrypt_failures_total`: Decryption failures
- `seqthink_encrypt_failures_total`: Encryption failures
- `seqthink_policy_denials_total`: Authorization denials
- `seqthink_request_duration_seconds`: Request latency
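
As a sketch, such metrics are typically declared with the standard Prometheus Go client; the names below match the list above, but the actual `pkg/seqthink/observability` implementation may differ:

```go
package observability

import (
	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
)

// Sketch only: counters and a histogram matching the metric names above.
var (
	RequestsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "seqthink_requests_total",
		Help: "Total requests handled by the wrapper",
	})
	PolicyDenialsTotal = promauto.NewCounter(prometheus.CounterOpts{
		Name: "seqthink_policy_denials_total",
		Help: "Requests rejected by JWT policy enforcement",
	})
	RequestDuration = promauto.NewHistogram(prometheus.HistogramOpts{
		Name:    "seqthink_request_duration_seconds",
		Help:    "End-to-end request latency",
		Buckets: prometheus.DefBuckets,
	})
)
```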

**Health Checks**:
- `/health`: Liveness probe (wrapper running)
- `/ready`: Readiness probe (MCP server ready)

**Logging**:
- JSON format via docker logs
- 10MB max size, 3 file rotation
- Centralized log aggregation ready

## Production Readiness

- ✅ **High Availability**: 3 replicas with auto-restart
- ✅ **Zero-Downtime Updates**: Rolling updates with health checks
- ✅ **Automatic Rollback**: On update failure
- ✅ **Resource Management**: CPU/memory limits and reservations
- ✅ **Security**: Multi-layer defense (TLS, JWT, Age, Secrets)
- ✅ **Monitoring**: Metrics, health checks, structured logs
- ✅ **Documentation**: Complete deployment and operations guides
- ✅ **Testing**: Automated E2E test suite
- ✅ **Secrets Management**: Docker Swarm secrets with rotation procedures

## Next Steps

1. Test deployment on staging environment
2. Generate production age keys
3. Configure KACHING JWT integration
4. Deploy to production cluster
5. Monitor metrics and logs
6. Load testing and performance validation

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-13 08:53:36 +11:00
anthonyrawlins
9190c75440 Implement Beat 3: Policy Gate (JWT Authentication)
This commit completes Beat 3 of the SequentialThinkingForCHORUS implementation,
adding KACHING JWT policy enforcement with scope checking.

## Deliverables

### 1. JWT Validation Package (pkg/seqthink/policy/)

**jwt.go** (313 lines): Complete JWT validation system
- `Validator`: JWT token validation with JWKS fetching
- `Claims`: JWT claims structure with scope support
- JWKS fetching and caching (1-hour TTL)
- RSA public key parsing from JWK format
- Space-separated and array scope formats
- Automatic JWKS refresh on cache expiration

**Features**:
- RS256 signature verification
- Expiration and NotBefore validation
- Required scope checking
- JWKS caching to reduce API calls
- Thread-safe key cache with mutex
- Base64 URL encoding/decoding utilities
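
A minimal sketch of the dual-format scope check described above (the claim shape here is an assumption, not the actual `Claims` type):

```go
package policy

import "strings"

// HasScope reports whether required is present in a token's scope claim.
// JWTs commonly carry scopes either as one space-separated string
// ("a.run b.read") or as a JSON array; this sketch accepts both.
func HasScope(claim interface{}, required string) bool {
	switch v := claim.(type) {
	case string:
		// strings.Fields also tolerates extra or leading/trailing spaces.
		for _, s := range strings.Fields(v) {
			if s == required {
				return true
			}
		}
	case []interface{}:
		for _, item := range v {
			if s, ok := item.(string); ok && s == required {
				return true
			}
		}
	}
	return false
}
```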

**jwt_test.go** (296 lines): Comprehensive test suite
- Valid token validation
- Expired token rejection
- Missing scope detection
- Space-separated scopes parsing
- Not-yet-valid token rejection
- JWKS caching behavior verification
- Invalid JWKS server handling
- 5 test scenarios, all passing

### 2. Authorization Middleware

**middleware.go** (75 lines): HTTP authorization middleware
- Bearer token extraction from Authorization header
- Token validation via Validator
- Policy denial metrics tracking
- Optional enforcement (disabled if no JWKS URL)
- Request logging with subject and scopes
- Clean error responses (401 Unauthorized)

**Integration**:
- Wraps `/mcp/tool` endpoint (both encrypted and plaintext)
- Wraps `/mcp/sse` endpoint (both encrypted and plaintext)
- Health and metrics endpoints remain open (no auth)
- Automatic mode detection based on configuration

### 3. Proxy Server Integration

**Updated server.go**:
- Policy middleware initialization in `NewServer()`
- Pre-fetches JWKS on startup
- Auth wrapper for protected endpoints
- Configuration-based enforcement
- Graceful fallback if JWKS unavailable

**Configuration**:
```go
ServerConfig{
    KachingJWKSURL: "https://auth.kaching.services/jwks",
    RequiredScope:  "sequentialthinking.run",
}
```

If both fields are set → policy enforcement enabled
If either is empty → policy enforcement disabled (dev mode)

## Testing Results

### Unit Tests
```
PASS: TestValidateToken (5 scenarios)
  - valid_token with required scope
  - expired_token rejection
  - missing_scope rejection
  - space_separated_scopes parsing
  - not_yet_valid rejection

PASS: TestJWKSCaching
  - Verifies JWKS fetched only once within cache window
  - Verifies JWKS re-fetched after cache expiration

PASS: TestParseScopes (5 scenarios)
  - Single scope parsing
  - Multiple scopes parsing
  - Extra spaces handling
  - Empty string handling
  - Spaces-only handling

PASS: TestInvalidJWKS
  - Handles JWKS server errors gracefully

PASS: TestGetCachedKeyCount
  - Tracks cached key count correctly
```

**All 5 test groups passed (16 total test cases)**

### Integration Verification

**Without Policy** (development):
```bash
export KACHING_JWKS_URL=""
./build/seqthink-wrapper
# → "Policy enforcement disabled"
# → All requests allowed
```

**With Policy** (production):
```bash
export KACHING_JWKS_URL="https://auth.kaching.services/jwks"
export REQUIRED_SCOPE="sequentialthinking.run"
./build/seqthink-wrapper
# → "Policy enforcement enabled"
# → JWKS pre-fetched
# → Authorization: Bearer <token> required
```

## Security Properties

- ✅ **Authentication**: RS256 JWT signature verification
- ✅ **Authorization**: Scope-based access control
- ✅ **Token Validation**: Expiration and not-before checking
- ✅ **JWKS Security**: Automatic key rotation support
- ✅ **Metrics**: Policy denial tracking for monitoring
- ✅ **Graceful Degradation**: Works without JWKS in dev mode
- ✅ **Thread Safety**: Concurrent JWKS cache access safe

## API Flow with Policy

### Successful Request:
```
1. Client → POST /mcp/tool
   Authorization: Bearer eyJhbGci...
   Content-Type: application/age
   Body: <encrypted request>

2. Middleware extracts Bearer token
3. Middleware validates JWT signature (JWKS)
4. Middleware checks required scope
5. Request forwarded to handler
6. Handler decrypts request
7. Handler calls MCP server
8. Handler encrypts response
9. Response sent to client
```

### Unauthorized Request:
```
1. Client → POST /mcp/tool
   (missing Authorization header)

2. Middleware checks for header → NOT FOUND
3. Policy denial metric incremented
4. 401 Unauthorized response
5. Request rejected
```
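
A condensed sketch of both flows, assuming a validator with a `ValidateToken(ctx, token)` method; the names are illustrative rather than the exact `middleware.go` API:

```go
package policy

import (
	"context"
	"net/http"
	"strings"
)

// TokenValidator stands in for the Validator described above;
// the real method signature may differ.
type TokenValidator interface {
	ValidateToken(ctx context.Context, token string) error
}

// AuthMiddleware extracts the Bearer token, validates it, and either
// forwards the request or rejects it with 401 Unauthorized.
func AuthMiddleware(v TokenValidator, next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		const prefix = "Bearer "
		auth := r.Header.Get("Authorization")
		if !strings.HasPrefix(auth, prefix) {
			// Missing or malformed header: deny (a real version would also
			// increment seqthink_policy_denials_total here).
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		if err := v.ValidateToken(r.Context(), strings.TrimPrefix(auth, prefix)); err != nil {
			http.Error(w, "unauthorized", http.StatusUnauthorized)
			return
		}
		next.ServeHTTP(w, r)
	})
}
```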

## Configuration Modes

**Full Security** (Beat 2 + Beat 3):
```bash
export AGE_IDENT_PATH=/etc/seqthink/age.key
export AGE_RECIPS_PATH=/etc/seqthink/age.pub
export KACHING_JWKS_URL=https://auth.kaching.services/jwks
export REQUIRED_SCOPE=sequentialthinking.run
```
→ Encryption + Authentication + Authorization

**Development Mode**:
```bash
# No AGE_* or KACHING_* variables set
```
→ Plaintext, no authentication

## Next Steps (Beat 4)

Beat 4 will add deployment infrastructure:
- Docker Swarm service definition
- Network overlay configuration
- Secret management for age keys
- KACHING integration documentation
- End-to-end testing in swarm

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-13 08:48:42 +11:00
anthonyrawlins
a658a7364d Implement Beat 2: Age Encryption Envelope
This commit completes Beat 2 of the SequentialThinkingForCHORUS implementation,
adding end-to-end age encryption for all MCP communications.

## Deliverables

### 1. Age Encryption/Decryption Package (pkg/seqthink/ageio/)
- `crypto.go`: Core encryption/decryption with age
- `testkeys.go`: Test key generation and convenience functions
- `crypto_test.go`: Comprehensive unit tests (11 tests, all passing)
- `golden_test.go`: Golden tests with real MCP payloads (12 tests, all passing)

**Features:**
- File-based identity and recipient key loading
- Streaming encryption/decryption support
- Proper error handling for all failure modes
- Performance benchmarks showing 400+ MB/s throughput

**Test Coverage:**
- Round-trip encryption/decryption for various payload sizes
- Unicode and emoji support
- Large payload handling (100KB+)
- Invalid ciphertext rejection
- Wrong key detection
- Truncated/modified ciphertext detection

### 2. Encrypted Proxy Handlers (pkg/seqthink/proxy/)
- `server_encrypted.go`: Encrypted tool call handler
- Updated `server.go`: Automatic routing based on encryption config
- Content-Type enforcement: `application/age` required when encryption enabled
- Metrics tracking for encryption/decryption failures

**Flow:**
1. Client sends encrypted request with `Content-Type: application/age`
2. Wrapper decrypts using age identity
3. Wrapper calls MCP server (plaintext on loopback)
4. Wrapper encrypts response
5. Client receives encrypted response with `Content-Type: application/age`
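
The core of steps 2 and 4 is an age round trip. A minimal sketch using `filippo.io/age` (the real `ageio` package loads keys from files rather than generating them inline):

```go
package main

import (
	"bytes"
	"fmt"
	"io"

	"filippo.io/age"
)

func main() {
	identity, err := age.GenerateX25519Identity()
	if err != nil {
		panic(err)
	}

	// Encrypt (client side): write plaintext through an age writer.
	var ciphertext bytes.Buffer
	w, err := age.Encrypt(&ciphertext, identity.Recipient())
	if err != nil {
		panic(err)
	}
	if _, err := w.Write([]byte(`{"tool":"sequentialthinking","payload":{}}`)); err != nil {
		panic(err)
	}
	if err := w.Close(); err != nil { // Close flushes the final chunk and MAC
		panic(err)
	}

	// Decrypt (wrapper side): any tampering fails here with an error.
	r, err := age.Decrypt(&ciphertext, identity)
	if err != nil {
		panic(err)
	}
	plaintext, err := io.ReadAll(r)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(plaintext))
}
```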

### 3. SSE Streaming with Encryption (pkg/seqthink/proxy/sse.go)
- `handleSSEEncrypted()`: Encrypted Server-Sent Events streaming
- `handleSSEPlaintext()`: Plaintext SSE for testing
- Base64-encoded encrypted frames for SSE transport
- `DecryptSSEFrame()`: Client-side frame decryption helper
- `ReadSSEStream()`: SSE stream parsing utility

**SSE Frame Format (Encrypted):**
```
event: thought
data: <base64-encoded age-encrypted JSON>
id: 1
```
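
Client-side, a frame is recovered by base64-decoding the `data` field and age-decrypting the result, as the `DecryptSSEFrame` helper does. A sketch (the exact signature and base64 variant are assumptions):

```go
package sseclient

import (
	"bytes"
	"encoding/base64"
	"fmt"
	"io"

	"filippo.io/age"
)

// decryptSSEFrame base64-decodes an SSE data field and age-decrypts it.
// StdEncoding is an assumption; the wrapper may use URL-safe base64.
func decryptSSEFrame(data string, identity age.Identity) ([]byte, error) {
	raw, err := base64.StdEncoding.DecodeString(data)
	if err != nil {
		return nil, fmt.Errorf("decode frame: %w", err)
	}
	r, err := age.Decrypt(bytes.NewReader(raw), identity)
	if err != nil {
		return nil, fmt.Errorf("decrypt frame: %w", err)
	}
	return io.ReadAll(r)
}
```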

### 4. Configuration-Based Mode Switching
The wrapper now operates in two modes based on environment variables:

**Encrypted Mode** (AGE_IDENT_PATH and AGE_RECIPS_PATH set):
- All requests/responses encrypted with age
- Content-Type: application/age enforced
- SSE frames base64-encoded and encrypted

**Plaintext Mode** (no encryption paths set):
- Direct plaintext proxying for development/testing
- Standard JSON Content-Type
- Plaintext SSE frames

## Testing Results

### Unit Tests
```
PASS: TestEncryptDecryptRoundTrip (all variants)
PASS: TestEncryptEmptyData
PASS: TestDecryptEmptyData
PASS: TestDecryptInvalidCiphertext
PASS: TestDecryptWrongKey
PASS: TestStreamingEncryptDecrypt
PASS: TestConvenienceFunctions
```

### Golden Tests
```
PASS: TestGoldenEncryptionRoundTrip (7 scenarios)
  - sequential_thinking_request (283→483 bytes, 70.7% overhead)
  - sequential_thinking_revision (303→503 bytes, 66.0% overhead)
  - sequential_thinking_branching (315→515 bytes, 63.5% overhead)
  - sequential_thinking_final (320→520 bytes, 62.5% overhead)
  - large_context_payload (3800→4000 bytes, 5.3% overhead)
  - unicode_payload (264→464 bytes, 75.8% overhead)
  - special_characters (140→340 bytes, 142.9% overhead)

PASS: TestGoldenDecryptionFailures (5 scenarios)
```

### Performance Benchmarks
```
Encryption:
  - 1KB:   5.44 MB/s
  - 10KB:  52.57 MB/s
  - 100KB: 398.66 MB/s

Decryption:
  - 1KB:   9.22 MB/s
  - 10KB:  85.41 MB/s
  - 100KB: 504.46 MB/s
```

## Security Properties

- ✅ **Confidentiality**: All payloads encrypted with age (X25519 + ChaCha20-Poly1305)
- ✅ **Authenticity**: age provides AEAD with Poly1305 MAC
- ✅ **Forward Secrecy**: Each encryption uses fresh ephemeral keys
- ✅ **Key Management**: File-based identity/recipient keys
- ✅ **Tampering Detection**: Modified ciphertext rejected
- ✅ **No Plaintext Leakage**: MCP server only on 127.0.0.1 loopback

## Next Steps (Beat 3)

Beat 3 will add KACHING JWT policy enforcement:
- JWT token validation (`pkg/seqthink/policy/`)
- Scope checking for `sequentialthinking.run`
- JWKS fetching and caching
- Policy denial metrics

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-13 08:42:28 +11:00
anthonyrawlins
3ce9811826 Implement Beat 1: Sequential Thinking Age-Encrypted Wrapper (Skeleton)
This commit completes Beat 1 of the SequentialThinkingForCHORUS implementation,
providing a functional plaintext skeleton for the age-encrypted wrapper.

## Deliverables

### 1. Main Wrapper Entry Point
- `cmd/seqthink-wrapper/main.go`: HTTP server on :8443
- Configuration loading from environment variables
- Graceful shutdown handling
- MCP server readiness checking with timeout

### 2. MCP Client Package
- `pkg/seqthink/mcpclient/client.go`: HTTP client for MCP server
- Communicates with MCP server on localhost:8000
- Health check endpoint
- Tool call endpoint with 120s timeout
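
A minimal sketch of such a client (the `/health` path and struct layout are assumptions; only `New` and `Health` are named in this commit):

```go
package mcpclient

import (
	"context"
	"fmt"
	"net/http"
	"time"
)

// Client is a thin HTTP client for the loopback MCP server.
type Client struct {
	baseURL string
	http    *http.Client
}

func New(baseURL string) *Client {
	return &Client{baseURL: baseURL, http: &http.Client{Timeout: 5 * time.Second}}
}

// Health returns nil if the MCP server answers its health endpoint with 200.
func (c *Client) Health(ctx context.Context) error {
	req, err := http.NewRequestWithContext(ctx, http.MethodGet, c.baseURL+"/health", nil)
	if err != nil {
		return err
	}
	resp, err := c.http.Do(req)
	if err != nil {
		return err
	}
	defer resp.Body.Close()
	if resp.StatusCode != http.StatusOK {
		return fmt.Errorf("mcp unhealthy: %s", resp.Status)
	}
	return nil
}
```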

### 3. Proxy Server Package
- `pkg/seqthink/proxy/server.go`: HTTP handlers for wrapper
- Health and readiness endpoints
- Tool call proxy (plaintext for Beat 1)
- SSE endpoint placeholder
- Metrics endpoint integration

### 4. Observability Package
- `pkg/seqthink/observability/logger.go`: Structured logging with zerolog
- `pkg/seqthink/observability/metrics.go`: Prometheus metrics
- Counters for requests, errors, decrypt/encrypt failures, policy denials
- Request duration histogram

### 5. Docker Infrastructure
- `deploy/seqthink/Dockerfile`: Multi-stage build
- `deploy/seqthink/entrypoint.sh`: Startup orchestration
- `deploy/seqthink/mcp_stub.py`: Minimal MCP server for testing

### 6. Build System Integration
- Updated `Makefile` with `build-seqthink` target
- Uses GOWORK=off and -mod=mod for clean builds
- `docker-seqthink` target for container builds

## Testing

Successfully builds with:
```
make build-seqthink
```

Binary successfully starts and waits for MCP server connection.

## Next Steps

Beat 2 will add:
- Age encryption/decryption (pkg/seqthink/ageio)
- Content-Type: application/age enforcement
- SSE streaming with encrypted frames
- Golden tests for crypto round-trips

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-13 08:35:43 +11:00
30 changed files with 6192 additions and 123 deletions

View File

Makefile

```diff
@@ -1,11 +1,12 @@
 # CHORUS Multi-Binary Makefile
-# Builds both chorus-agent and chorus-hap binaries
+# Builds chorus-agent, chorus-hap, and seqthink-wrapper binaries
 
 # Build configuration
 BINARY_NAME_AGENT = chorus-agent
 BINARY_NAME_HAP = chorus-hap
 BINARY_NAME_COMPAT = chorus
-VERSION ?= 0.5.5
+BINARY_NAME_SEQTHINK = seqthink-wrapper
+VERSION ?= 0.5.40
 COMMIT_HASH ?= $(shell git rev-parse --short HEAD 2>/dev/null || echo "unknown")
 BUILD_DATE ?= $(shell date -u '+%Y-%m-%d_%H:%M:%S')
@@ -30,7 +31,7 @@ build: build-agent build-hap build-compat
 build-agent:
 	@echo "🤖 Building CHORUS autonomous agent..."
 	@mkdir -p $(BUILD_DIR)
-	go build $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_AGENT) ./$(CMD_DIR)/agent
+	GOWORK=off go build -mod=mod $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_AGENT) ./$(CMD_DIR)/agent
 	@echo "✅ Agent binary built: $(BUILD_DIR)/$(BINARY_NAME_AGENT)"
 
 # Build human agent portal binary
@@ -38,7 +39,7 @@ build-agent:
 build-hap:
 	@echo "👤 Building CHORUS human agent portal..."
 	@mkdir -p $(BUILD_DIR)
-	go build $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_HAP) ./$(CMD_DIR)/hap
+	GOWORK=off go build -mod=mod $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_HAP) ./$(CMD_DIR)/hap
 	@echo "✅ HAP binary built: $(BUILD_DIR)/$(BINARY_NAME_HAP)"
 
 # Build compatibility wrapper (deprecated)
@@ -46,9 +47,17 @@ build-hap:
 build-compat:
 	@echo "⚠️ Building CHORUS compatibility wrapper (deprecated)..."
 	@mkdir -p $(BUILD_DIR)
-	go build $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_COMPAT) ./$(CMD_DIR)/chorus
+	GOWORK=off go build -mod=mod $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_COMPAT) ./$(CMD_DIR)/chorus
 	@echo "✅ Compatibility wrapper built: $(BUILD_DIR)/$(BINARY_NAME_COMPAT)"
 
+# Build Sequential Thinking age-encrypted wrapper
+.PHONY: build-seqthink
+build-seqthink:
+	@echo "🔐 Building Sequential Thinking wrapper..."
+	@mkdir -p $(BUILD_DIR)
+	GOWORK=off go build -mod=mod $(BUILD_FLAGS) -o $(BUILD_DIR)/$(BINARY_NAME_SEQTHINK) ./$(CMD_DIR)/seqthink-wrapper
+	@echo "✅ SeqThink wrapper built: $(BUILD_DIR)/$(BINARY_NAME_SEQTHINK)"
+
 # Test compilation without building
 .PHONY: test-compile
 test-compile:
@@ -103,8 +112,13 @@ docker-hap:
 	@echo "🐳 Building Docker image for CHORUS HAP..."
 	docker build -f docker/Dockerfile.hap -t chorus-hap:$(VERSION) .
 
+.PHONY: docker-seqthink
+docker-seqthink:
+	@echo "🔐 Building Docker image for Sequential Thinking wrapper..."
+	docker build -f deploy/seqthink/Dockerfile -t seqthink-wrapper:$(VERSION) .
+
 .PHONY: docker
-docker: docker-agent docker-hap
+docker: docker-agent docker-hap docker-seqthink
 
 # Help
 .PHONY: help
@@ -112,22 +126,24 @@ help:
 	@echo "CHORUS Multi-Binary Build System"
 	@echo ""
 	@echo "Targets:"
-	@echo "  all            - Clean and build both binaries (default)"
-	@echo "  build          - Build both binaries"
+	@echo "  all            - Clean and build all binaries (default)"
+	@echo "  build          - Build all binaries"
 	@echo "  build-agent    - Build autonomous agent binary only"
 	@echo "  build-hap      - Build human agent portal binary only"
-	@echo "  test-compile   - Test that both binaries compile"
+	@echo "  build-seqthink - Build Sequential Thinking wrapper only"
+	@echo "  test-compile   - Test that binaries compile"
 	@echo "  test           - Run tests"
 	@echo "  clean          - Remove build artifacts"
 	@echo "  install        - Install binaries to GOPATH/bin"
 	@echo "  run-agent      - Build and run agent"
 	@echo "  run-hap        - Build and run HAP"
-	@echo "  docker         - Build Docker images for both binaries"
+	@echo "  docker         - Build Docker images for all binaries"
 	@echo "  docker-agent   - Build Docker image for agent only"
 	@echo "  docker-hap     - Build Docker image for HAP only"
+	@echo "  docker-seqthink - Build Docker image for SeqThink wrapper only"
 	@echo "  help           - Show this help"
 	@echo ""
 	@echo "Environment Variables:"
-	@echo "  VERSION      - Version string (default: 0.1.0-dev)"
+	@echo "  VERSION      - Version string (default: 0.5.28)"
 	@echo "  COMMIT_HASH  - Git commit hash (auto-detected)"
 	@echo "  BUILD_DATE   - Build timestamp (auto-generated)"
```

View File

api/http_server.go

```diff
@@ -16,6 +16,7 @@ import (
 	"chorus/pubsub"
 
 	"github.com/gorilla/mux"
+	"github.com/rs/zerolog"
 )
 
 // HTTPServer provides HTTP API endpoints for CHORUS
@@ -23,9 +24,11 @@ type HTTPServer struct {
 	port           int
 	hypercoreLog   *logging.HypercoreLog
 	pubsub         *pubsub.PubSub
+	node           *p2p.Node // P2P node for peer ID and network info
 	server         *http.Server
 	CouncilManager *council.Manager // Exported for brief processing
 	whooshEndpoint string
+	logger         zerolog.Logger
 }
 
 // NewHTTPServer creates a new HTTP server for CHORUS API
@@ -47,11 +50,18 @@ func NewHTTPServer(cfg *config.Config, node *p2p.Node, hlog *logging.HypercoreLo
 		port:           cfg.Network.APIPort,
 		hypercoreLog:   hlog,
 		pubsub:         ps,
+		node:           node,
 		CouncilManager: councilMgr,
 		whooshEndpoint: strings.TrimRight(whooshEndpoint, "/"),
+		logger:         logging.ForComponent(logging.ComponentServer),
 	}
 }
 
+// WhooshEndpoint returns the WHOOSH base endpoint configured for this agent.
+func (h *HTTPServer) WhooshEndpoint() string {
+	return h.whooshEndpoint
+}
+
 func deriveAgentName(cfg *config.Config) string {
 	if v := strings.TrimSpace(os.Getenv("CHORUS_AGENT_NAME")); v != "" {
 		return v
@@ -161,7 +171,7 @@ func (h *HTTPServer) Start() error {
 		IdleTimeout:  60 * time.Second,
 	}
 
-	fmt.Printf("🌐 Starting HTTP API server on port %d\n", h.port)
+	h.logger.Info().Int("port", h.port).Msg("Starting HTTP API server")
 
 	return h.server.ListenAndServe()
 }
@@ -304,7 +314,7 @@ func (h *HTTPServer) handleGetLogStats(w http.ResponseWriter, r *http.Request) {
 	json.NewEncoder(w).Encode(stats)
 }
 
-// handleHealth returns health status
+// handleHealth returns health status with P2P network information
 func (h *HTTPServer) handleHealth(w http.ResponseWriter, r *http.Request) {
 	w.Header().Set("Content-Type", "application/json")
@@ -314,6 +324,89 @@ func (h *HTTPServer) handleHealth(w http.ResponseWriter, r *http.Request) {
 		"log_entries": h.hypercoreLog.Length(),
 	}
 
+	// Add P2P network information if node is available
+	if h.node != nil {
+		// Get peer ID
+		health["peer_id"] = h.node.ID().String()
+
+		// Build complete multiaddrs with peer ID using actual container IPs
+		// This is required for Docker Swarm because the service VIP load-balances
+		// and would cause peer ID mismatches when connecting to different replicas
+		var multiaddrs []string
+		rawAddrs := h.node.Addresses()
+
+		// Log what addresses we're getting from the node
+		h.logger.Debug().Int("address_count", len(rawAddrs)).Msg("Processing node addresses")
+		for i, addr := range rawAddrs {
+			h.logger.Debug().Int("index", i).Str("address", addr.String()).Msg("Raw address")
+		}
+
+		for _, addr := range rawAddrs {
+			addrStr := addr.String()
+
+			// Extract IP and port from multiaddr
+			var ip, port string
+			if strings.Contains(addrStr, "/ip4/") && strings.Contains(addrStr, "/tcp/") {
+				parts := strings.Split(addrStr, "/")
+				for i := 0; i < len(parts)-1; i++ {
+					if parts[i] == "ip4" {
+						ip = parts[i+1]
+					}
+					if parts[i] == "tcp" {
+						port = parts[i+1]
+					}
+				}
+			}
+
+			// Skip localhost addresses
+			if ip == "127.0.0.1" || ip == "::1" {
+				continue
+			}
+
+			// Build IP-based multiaddr for direct P2P connections
+			// This bypasses the Docker Swarm VIP and allows direct connection to this specific replica
+			if ip != "" && port != "" {
+				multiaddr := fmt.Sprintf("/ip4/%s/tcp/%s/p2p/%s", ip, port, h.node.ID().String())
+				h.logger.Debug().Str("multiaddr", multiaddr).Msg("Built multiaddr")
+				multiaddrs = append(multiaddrs, multiaddr)
+			}
+		}
+
+		health["multiaddrs"] = multiaddrs
+
+		// Add connected peer count
+		connectedPeers := h.node.ConnectedPeers()
+		health["connected_peers"] = connectedPeers
+
+		// P2P Connectivity Status - critical for detecting mesh issues
+		p2pStatus := "healthy"
+		if connectedPeers == 0 {
+			p2pStatus = "isolated" // No peers - serious issue
+			health["status"] = "degraded"
+		} else if connectedPeers < 3 {
+			p2pStatus = "limited" // Few peers - potential discovery issue
+		}
+		health["p2p_status"] = p2pStatus
+
+		// Add DHT status if available
+		if h.node.DHT() != nil {
+			health["dht_enabled"] = true
+			// DHT routing table size indicates how many nodes we know about
+			health["dht_routing_table_size"] = h.node.DHT().GetDHTSize()
+		} else {
+			health["dht_enabled"] = false
+		}
+
+		// Add GossipSub topics (static topics that agents join)
+		health["gossipsub_topics"] = []string{
+			"CHORUS/coordination/v1",
+			"hmmm/meta-discussion/v1",
+			"CHORUS/context-feedback/v1",
+		}
+
+		// Add bootstrap status
+		health["bootstrap_peers_configured"] = len(h.node.BootstrapPeers())
+	}
+
 	json.NewEncoder(w).Encode(health)
 }
@@ -350,34 +443,43 @@ func (h *HTTPServer) handleCouncilOpportunity(w http.ResponseWriter, r *http.Req
 		"core_roles":     len(opportunity.CoreRoles),
 		"optional_roles": len(opportunity.OptionalRoles),
 		"ucxl_address":   opportunity.UCXLAddress,
-		"message":        fmt.Sprintf("📡 Received council opportunity for project: %s", opportunity.ProjectName),
+		"message":        fmt.Sprintf("Received council opportunity for project: %s", opportunity.ProjectName),
 	}
 
 	if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil {
-		fmt.Printf("Failed to log council opportunity: %v\n", err)
+		h.logger.Warn().Err(err).Msg("Failed to log council opportunity")
 	}
 
-	// Log to console for immediate visibility
-	fmt.Printf("\n📡 COUNCIL OPPORTUNITY RECEIVED\n")
-	fmt.Printf("   Council ID: %s\n", opportunity.CouncilID)
-	fmt.Printf("   Project: %s\n", opportunity.ProjectName)
-	fmt.Printf("   Repository: %s\n", opportunity.Repository)
-	fmt.Printf("   Core Roles: %d\n", len(opportunity.CoreRoles))
-	fmt.Printf("   Optional Roles: %d\n", len(opportunity.OptionalRoles))
-	fmt.Printf("   UCXL: %s\n", opportunity.UCXLAddress)
-	fmt.Printf("\n   Available Roles:\n")
+	// Log council opportunity with structured logging
+	h.logger.Info().
+		Str("council_id", opportunity.CouncilID).
+		Str("project_name", opportunity.ProjectName).
+		Str("repository", opportunity.Repository).
+		Int("core_roles", len(opportunity.CoreRoles)).
+		Int("optional_roles", len(opportunity.OptionalRoles)).
+		Str("ucxl_address", opportunity.UCXLAddress).
+		Msg("Council opportunity received")
 
+	// Log available roles
 	for _, role := range opportunity.CoreRoles {
-		fmt.Printf("   - %s (%s) [CORE]\n", role.AgentName, role.RoleName)
+		h.logger.Info().
+			Str("agent_name", role.AgentName).
+			Str("role_name", role.RoleName).
+			Str("role_type", "CORE").
+			Msg("Available role")
 	}
 	for _, role := range opportunity.OptionalRoles {
-		fmt.Printf("   - %s (%s) [OPTIONAL]\n", role.AgentName, role.RoleName)
+		h.logger.Info().
+			Str("agent_name", role.AgentName).
+			Str("role_name", role.RoleName).
+			Str("role_type", "OPTIONAL").
+			Msg("Available role")
 	}
-	fmt.Printf("\n")
 
 	// Evaluate the opportunity and claim a role if suitable
 	go func() {
 		if err := h.CouncilManager.EvaluateOpportunity(&opportunity, h.whooshEndpoint); err != nil {
-			fmt.Printf("Failed to evaluate/claim council role: %v\n", err)
+			h.logger.Warn().Err(err).Msg("Failed to evaluate/claim council role")
 		}
 	}()
@@ -453,18 +555,19 @@ func (h *HTTPServer) handleCouncilStatusUpdate(w http.ResponseWriter, r *http.Re
 	}
 
 	if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil {
-		fmt.Printf("Failed to log council status update: %v\n", err)
+		h.logger.Warn().Err(err).Msg("Failed to log council status update")
 	}
 
-	fmt.Printf("\n🏁 COUNCIL STATUS UPDATE\n")
-	fmt.Printf("   Council ID: %s\n", payload.CouncilID)
-	if payload.ProjectName != "" {
-		fmt.Printf("   Project: %s\n", payload.ProjectName)
-	}
-	fmt.Printf("   Status: %s\n", payload.Status)
-	fmt.Printf("   Core Roles: %d/%d claimed\n", payload.CoreRoles.Claimed, payload.CoreRoles.Total)
-	fmt.Printf("   Optional Roles: %d/%d claimed\n", payload.Optional.Claimed, payload.Optional.Total)
-	fmt.Printf("   Message: %s\n\n", payload.Message)
+	h.logger.Info().
+		Str("council_id", payload.CouncilID).
+		Str("project_name", payload.ProjectName).
+		Str("status", payload.Status).
+		Int("core_roles_claimed", payload.CoreRoles.Claimed).
+		Int("core_roles_total", payload.CoreRoles.Total).
+		Int("optional_roles_claimed", payload.Optional.Claimed).
+		Int("optional_roles_total", payload.Optional.Total).
+		Str("message", payload.Message).
+		Msg("Council status update")
 
 	response := map[string]interface{}{
 		"status": "received",
@@ -497,13 +600,12 @@ func (h *HTTPServer) handleCouncilBrief(w http.ResponseWriter, r *http.Request)
 	brief.CouncilID = councilID
 	brief.RoleName = roleName
 
-	fmt.Printf("\n📦 Received council brief for %s (%s)\n", councilID, roleName)
-	if brief.BriefURL != "" {
-		fmt.Printf("   Brief URL: %s\n", brief.BriefURL)
-	}
-	if brief.Summary != "" {
-		fmt.Printf("   Summary: %s\n", brief.Summary)
-	}
+	h.logger.Info().
+		Str("council_id", councilID).
+		Str("role_name", roleName).
+		Str("brief_url", brief.BriefURL).
+		Str("summary", brief.Summary).
+		Msg("Received council brief")
 
 	if h.CouncilManager != nil {
 		h.CouncilManager.HandleCouncilBrief(councilID, roleName, &brief)
@@ -523,7 +625,7 @@ func (h *HTTPServer) handleCouncilBrief(w http.ResponseWriter, r *http.Request)
 	}
 
 	if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil {
-		fmt.Printf("Failed to log council brief: %v\n", err)
+		h.logger.Warn().Err(err).Msg("Failed to log council brief")
 	}
 
 	response := map[string]interface{}{
```

View File

cmd/seqthink-wrapper/main.go

@@ -0,0 +1,173 @@
```go
package main

import (
	"context"
	"fmt"
	"net/http"
	"os"
	"os/signal"
	"syscall"
	"time"

	"chorus/pkg/seqthink/mcpclient"
	"chorus/pkg/seqthink/observability"
	"chorus/pkg/seqthink/proxy"

	"github.com/rs/zerolog/log"
)

// Config holds the wrapper configuration
type Config struct {
	Port            string
	MCPLocalURL     string
	LogLevel        string
	MaxBodyMB       int
	HealthTimeout   time.Duration
	ShutdownTimeout time.Duration
	AgeIdentPath    string
	AgeRecipsPath   string
	KachingJWKSURL  string
	RequiredScope   string
}

func loadConfig() *Config {
	return &Config{
		Port:            getEnv("PORT", "8443"),
		MCPLocalURL:     getEnv("MCP_LOCAL", "http://127.0.0.1:8000"),
		LogLevel:        getEnv("LOG_LEVEL", "info"),
		MaxBodyMB:       getEnvInt("MAX_BODY_MB", 4),
		HealthTimeout:   5 * time.Second,
		ShutdownTimeout: 30 * time.Second,
		AgeIdentPath:    getEnv("AGE_IDENT_PATH", ""),
		AgeRecipsPath:   getEnv("AGE_RECIPS_PATH", ""),
		KachingJWKSURL:  getEnv("KACHING_JWKS_URL", ""),
		RequiredScope:   getEnv("REQUIRED_SCOPE", "sequentialthinking.run"),
	}
}

func main() {
	cfg := loadConfig()

	// Initialize observability
	observability.InitLogger(cfg.LogLevel)
	metrics := observability.InitMetrics()

	log.Info().
		Str("port", cfg.Port).
		Str("mcp_url", cfg.MCPLocalURL).
		Str("version", "0.1.0-beta2").
		Msg("🚀 Starting Sequential Thinking Age Wrapper")

	// Create MCP client
	mcpClient := mcpclient.New(cfg.MCPLocalURL)

	// Wait for MCP server to be ready
	ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()

	log.Info().Msg("⏳ Waiting for MCP server...")
	if err := waitForMCP(ctx, mcpClient); err != nil {
		log.Fatal().Err(err).Msg("❌ MCP server not ready")
	}
	log.Info().Msg("✅ MCP server ready")

	// Create proxy server
	proxyServer, err := proxy.NewServer(proxy.ServerConfig{
		MCPClient:      mcpClient,
		Metrics:        metrics,
		MaxBodyMB:      cfg.MaxBodyMB,
		AgeIdentPath:   cfg.AgeIdentPath,
		AgeRecipsPath:  cfg.AgeRecipsPath,
		KachingJWKSURL: cfg.KachingJWKSURL,
		RequiredScope:  cfg.RequiredScope,
	})
	if err != nil {
		log.Fatal().Err(err).Msg("❌ Failed to create proxy server")
	}

	// Setup HTTP server
	srv := &http.Server{
		Addr:         ":" + cfg.Port,
		Handler:      proxyServer.Handler(),
		ReadTimeout:  30 * time.Second,
		WriteTimeout: 90 * time.Second,
		IdleTimeout:  120 * time.Second,
	}

	// Start server in goroutine
	go func() {
		log.Info().
			Str("addr", srv.Addr).
			Bool("encryption_enabled", cfg.AgeIdentPath != "").
			Bool("policy_enabled", cfg.KachingJWKSURL != "").
			Msg("🔐 Wrapper listening")

		if err := srv.ListenAndServe(); err != http.ErrServerClosed {
			log.Fatal().Err(err).Msg("❌ HTTP server failed")
		}
	}()

	// Wait for shutdown signal
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	<-sigChan

	log.Info().Msg("🛑 Shutting down gracefully...")

	// Graceful shutdown
	shutdownCtx, shutdownCancel := context.WithTimeout(context.Background(), cfg.ShutdownTimeout)
	defer shutdownCancel()

	if err := srv.Shutdown(shutdownCtx); err != nil {
		log.Error().Err(err).Msg("⚠️ Shutdown error")
	}

	log.Info().Msg("✅ Shutdown complete")
}

// waitForMCP waits for MCP server to be ready
func waitForMCP(ctx context.Context, client *mcpclient.Client) error {
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			return fmt.Errorf("timeout waiting for MCP server")
		case <-ticker.C:
			if err := client.Health(ctx); err == nil {
				return nil
			}
			log.Debug().Msg("Waiting for MCP server...")
		}
	}
}

// getEnv gets environment variable with default
func getEnv(key, defaultVal string) string {
	if val := os.Getenv(key); val != "" {
		return val
	}
	return defaultVal
}

// getEnvInt gets environment variable as int with default
func getEnvInt(key string, defaultVal int) int {
	val := os.Getenv(key)
	if val == "" {
		return defaultVal
	}
	var result int
	if _, err := fmt.Sscanf(val, "%d", &result); err != nil {
		log.Warn().
			Str("key", key).
			Str("value", val).
			Int("default", defaultVal).
			Msg("Invalid integer env var, using default")
		return defaultVal
	}
	return result
}
```

View File

deploy/seqthink/DEPLOYMENT.md

@@ -0,0 +1,380 @@
# Sequential Thinking Age Wrapper - Deployment Guide
## Overview
This guide covers deploying the Sequential Thinking Age-Encrypted Wrapper to Docker Swarm with full security enabled.
## Prerequisites
- Docker Swarm cluster initialized
- `chorus-overlay` network created
- Traefik reverse proxy configured
- KACHING authentication service available
## Architecture
```
Client → Traefik (HTTPS) → SeqThink Wrapper (JWT + Age Encryption) → MCP Server (loopback)
```
**Security Layers**:
1. **TLS**: Traefik terminates HTTPS
2. **JWT**: KACHING token validation
3. **Age Encryption**: End-to-end encrypted payloads
## Step 1: Generate Age Keys
Generate a key pair for encryption:
```bash
# Generate age identity (private key)
age-keygen -o seqthink_age.key
# Extract recipient (public key)
age-keygen -y seqthink_age.key > seqthink_age.pub
```
**Output**:
```
seqthink_age.key:
# created: 2025-10-13T08:00:00+11:00
# public key: age1abcd...
AGE-SECRET-KEY-1ABCD...
seqthink_age.pub:
age1abcd...
```
## Step 2: Create Docker Secrets
Store the age keys as Docker secrets:
```bash
# Create identity secret
docker secret create seqthink_age_identity seqthink_age.key
# Create recipient secret
docker secret create seqthink_age_recipients seqthink_age.pub
# Verify secrets
docker secret ls | grep seqthink
```
**Expected Output**:
```
seqthink_age_identity <timestamp>
seqthink_age_recipients <timestamp>
```
## Step 3: Build Docker Image
Build the wrapper image:
```bash
cd /home/tony/chorus/project-queues/active/CHORUS
# Build image
docker build -f deploy/seqthink/Dockerfile -t anthonyrawlins/seqthink-wrapper:latest .
# Tag with version
docker tag anthonyrawlins/seqthink-wrapper:latest anthonyrawlins/seqthink-wrapper:0.1.0
# Push to registry
docker push anthonyrawlins/seqthink-wrapper:latest
docker push anthonyrawlins/seqthink-wrapper:0.1.0
```
## Step 4: Deploy to Swarm
Deploy the service:
```bash
cd deploy/seqthink
# Deploy stack
docker stack deploy -c docker-compose.swarm.yml seqthink
# Check service status
docker service ls | grep seqthink
# Check logs
docker service logs -f seqthink_seqthink-wrapper
```
**Expected Log Output**:
```
🚀 Starting Sequential Thinking Age Wrapper
⏳ Waiting for MCP server...
✅ MCP server ready
Policy enforcement enabled
jwks_url: https://auth.kaching.services/jwks
required_scope: sequentialthinking.run
Fetching JWKS
JWKS cached successfully
key_count: 2
Encryption enabled - using encrypted endpoint
🔐 Wrapper listening
addr: :8443
encryption_enabled: true
policy_enabled: true
```
## Step 5: Verify Deployment
Check service health:
```bash
# Check replicas
docker service ps seqthink_seqthink-wrapper
# Test health endpoint
curl -f http://localhost:8443/health
# Expected: OK
# Test readiness
curl -f http://localhost:8443/ready
# Expected: READY
# Check metrics
curl http://localhost:8443/metrics | grep seqthink
```
## Step 6: Test with JWT Token
Get a KACHING JWT token and test the API:
```bash
# Set your JWT token
export JWT_TOKEN="eyJhbGciOiJSUzI1NiIsImtpZCI6ImRlZmF1bHQiLCJ0eXAiOiJKV1QifQ..."
# Test unauthorized (should fail)
curl -X POST https://seqthink.chorus.services/mcp/tool \
-H "Content-Type: application/age" \
-d "test"
# Expected: 401 Unauthorized
# Test authorized (should succeed)
curl -X POST https://seqthink.chorus.services/mcp/tool \
-H "Authorization: Bearer $JWT_TOKEN" \
-H "Content-Type: application/age" \
-d "$(echo '{"tool":"test","payload":{}}' | age -r $(cat seqthink_age.pub))" \
--output encrypted_response.age
# Decrypt response
age -d -i seqthink_age.key encrypted_response.age
```
## Configuration Reference
### Environment Variables
| Variable | Required | Default | Description |
|----------|----------|---------|-------------|
| `PORT` | No | `8443` | HTTP server port |
| `MCP_LOCAL` | No | `http://127.0.0.1:8000` | MCP server URL (loopback) |
| `LOG_LEVEL` | No | `info` | Logging level (debug, info, warn, error) |
| `MAX_BODY_MB` | No | `4` | Maximum request body size in MB |
| `AGE_IDENT_PATH` | **Yes** | - | Path to age identity (private key) |
| `AGE_RECIPS_PATH` | **Yes** | - | Path to age recipients (public key) |
| `KACHING_JWKS_URL` | **Yes** | - | KACHING JWKS endpoint |
| `REQUIRED_SCOPE` | **Yes** | `sequentialthinking.run` | Required JWT scope |
### Docker Secrets
| Secret Name | Purpose | Content |
|-------------|---------|---------|
| `seqthink_age_identity` | Age private key | `AGE-SECRET-KEY-1...` |
| `seqthink_age_recipients` | Age public key | `age1...` |
### Network Ports
| Port | Protocol | Purpose |
|------|----------|---------|
| `8443` | HTTP | Wrapper API |
| `8000` | HTTP | MCP server (internal loopback only) |
## Scaling
Scale the service:
```bash
# Scale up
docker service scale seqthink_seqthink-wrapper=5
# Scale down
docker service scale seqthink_seqthink-wrapper=2
```
## Updates
Rolling update:
```bash
# Build new version
docker build -f deploy/seqthink/Dockerfile -t anthonyrawlins/seqthink-wrapper:0.2.0 .
docker push anthonyrawlins/seqthink-wrapper:0.2.0
# Update service
docker service update \
--image anthonyrawlins/seqthink-wrapper:0.2.0 \
seqthink_seqthink-wrapper
# Monitor rollout
docker service ps seqthink_seqthink-wrapper
```
## Rollback
If update fails:
```bash
# Automatic rollback (configured in stack)
# Or manual rollback:
docker service rollback seqthink_seqthink-wrapper
```
## Monitoring
### Prometheus Metrics
Available at `http://localhost:8443/metrics`:
```
seqthink_requests_total
seqthink_errors_total
seqthink_decrypt_failures_total
seqthink_encrypt_failures_total
seqthink_policy_denials_total
seqthink_request_duration_seconds
```
### Health Checks
- **Liveness**: `GET /health` - Returns 200 if wrapper is running
- **Readiness**: `GET /ready` - Returns 200 if MCP server is ready
### Logs
View logs:
```bash
# All replicas
docker service logs seqthink_seqthink-wrapper
# Follow logs
docker service logs -f seqthink_seqthink-wrapper
# Specific replica
docker service logs seqthink_seqthink-wrapper.<replica-id>
```
## Troubleshooting
### Issue: Policy Enforcement Disabled
**Symptoms**:
```
Policy enforcement disabled - no JWKS URL or required scope configured
```
**Solution**:
- Verify `KACHING_JWKS_URL` and `REQUIRED_SCOPE` are set
- Check environment variables: `docker service inspect seqthink_seqthink-wrapper`
### Issue: JWKS Fetch Failed
**Symptoms**:
```
Failed to pre-fetch JWKS, will retry on first request
```
**Solution**:
- Check KACHING service is accessible
- Verify JWKS URL is correct
- Check network connectivity
### Issue: Decryption Failed
**Symptoms**:
```
Failed to decrypt request
seqthink_decrypt_failures_total increasing
```
**Solution**:
- Verify age keys match between client and server
- Check client is using correct public key
- Ensure secrets are correctly mounted
### Issue: MCP Server Not Ready
**Symptoms**:
```
❌ MCP server not ready
timeout waiting for MCP server
```
**Solution**:
- Check MCP server is starting correctly
- Review entrypoint.sh logs
- Verify Python dependencies installed
## Security Considerations
1. **Key Rotation**: Periodically rotate age keys:
```bash
# Generate new keys
age-keygen -o seqthink_age_new.key
age-keygen -y seqthink_age_new.key > seqthink_age_new.pub
# Update secrets (requires service restart)
docker secret rm seqthink_age_identity
docker secret create seqthink_age_identity seqthink_age_new.key
```
2. **JWT Token Expiration**: Tokens should have short expiration times (1 hour recommended)
3. **Network Isolation**: MCP server only accessible on loopback (127.0.0.1)
4. **TLS**: Always use HTTPS in production (via Traefik)
5. **Rate Limiting**: Consider adding rate limiting at Traefik level
## Development Mode
For testing without security:
```yaml
environment:
# Disable encryption
AGE_IDENT_PATH: ""
AGE_RECIPS_PATH: ""
# Disable policy
KACHING_JWKS_URL: ""
REQUIRED_SCOPE: ""
```
**WARNING**: Only use in development environments!
## Production Checklist
- [ ] Age keys generated and stored as Docker secrets
- [ ] KACHING JWKS URL configured and accessible
- [ ] Docker image built and pushed to registry
- [ ] Service deployed to swarm
- [ ] Health checks passing
- [ ] Metrics endpoint accessible
- [ ] JWT tokens validated successfully
- [ ] End-to-end encryption verified
- [ ] Logs show no errors
- [ ] Monitoring alerts configured
- [ ] Backup of age keys stored securely
- [ ] Documentation updated with deployment details
## Support
For issues or questions:
- Check logs: `docker service logs seqthink_seqthink-wrapper`
- Review metrics: `curl http://localhost:8443/metrics`
- Consult implementation docs in `/home/tony/chorus/project-queues/active/CHORUS/docs/`

View File

deploy/seqthink/Dockerfile

@@ -0,0 +1,65 @@
```dockerfile
# Sequential Thinking Age-Encrypted Wrapper
# Stage 1: Build Python MCP server
FROM python:3.11-slim AS python-builder

WORKDIR /mcp

# Install Sequential Thinking MCP server dependencies
# Note: For Beat 1, we'll use a minimal Python HTTP server
# Full MCP server integration happens in later beats
RUN pip install --no-cache-dir \
    fastapi==0.109.0 \
    uvicorn[standard]==0.27.0 \
    pydantic==2.5.3

# Copy MCP compatibility server
COPY deploy/seqthink/mcp_server.py /mcp/server.py

# Stage 2: Runtime
FROM debian:bookworm-slim

# Install runtime dependencies
RUN apt-get update && \
    apt-get install -y --no-install-recommends \
        ca-certificates \
        curl \
        python3 \
        python3-pip && \
    apt-get clean && \
    rm -rf /var/lib/apt/lists/*

# Install Python packages in runtime
RUN pip3 install --no-cache-dir --break-system-packages \
    fastapi==0.109.0 \
    uvicorn[standard]==0.27.0 \
    pydantic==2.5.3

# Create non-root user
RUN useradd -r -u 1000 -m -s /bin/bash seqthink

# Copy wrapper binary built on host (GOWORK=off GOOS=linux go build ...)
COPY deploy/seqthink/bin/seqthink-wrapper /usr/local/bin/seqthink-wrapper
COPY --from=python-builder /mcp/server.py /opt/mcp/server.py

# Copy entrypoint
COPY deploy/seqthink/entrypoint.sh /entrypoint.sh
RUN chmod +x /entrypoint.sh

# Setup directories
RUN mkdir -p /etc/seqthink /var/log/seqthink && \
    chown -R seqthink:seqthink /etc/seqthink /var/log/seqthink

# Switch to non-root user
USER seqthink
WORKDIR /home/seqthink

# Health check
HEALTHCHECK --interval=30s --timeout=10s --start-period=40s --retries=3 \
    CMD curl -f http://localhost:8443/health || exit 1

# Expose wrapper port (MCP server on 127.0.0.1:8000 is internal only)
EXPOSE 8443

# Run entrypoint
ENTRYPOINT ["/entrypoint.sh"]
```

deploy/seqthink/SECRETS.md
View File

@@ -0,0 +1,491 @@
# Secrets Management Guide
## Overview
The Sequential Thinking Wrapper uses Docker Secrets for secure key management. This guide covers generating, storing, and rotating secrets.
## Secret Types
### 1. Age Encryption Keys
**Purpose**: End-to-end encryption of MCP communications
**Components**:
- **Identity (Private Key)**: `seqthink_age_identity`
- **Recipients (Public Key)**: `seqthink_age_recipients`
### 2. KACHING JWT Configuration
**Purpose**: Authentication and authorization
**Components**:
- JWKS URL (environment variable, not a secret)
- Required scope (environment variable, not a secret)
## Generating Age Keys
### Method 1: Using age-keygen
```bash
# Install age if not already installed
# macOS: brew install age
# Ubuntu: apt install age
# Arch: pacman -S age
# Generate identity (private key)
age-keygen -o seqthink_age.key
# Extract recipient (public key)
age-keygen -y seqthink_age.key > seqthink_age.pub
```
**Output Format**:
`seqthink_age.key`:
```
# created: 2025-10-13T08:00:00+11:00
# public key: age1ql3z7hjy54pw3hyww5ayyfg7zqgvc7w3j2elw8zmrj2kg5sfn9aqmcac8p
AGE-SECRET-KEY-1GFPYYSJQ...
```
`seqthink_age.pub`:
```
age1ql3z7hjy54pw3hyww5ayyfg7zqgvc7w3j2elw8zmrj2kg5sfn9aqmcac8p
```
### Method 2: Using Go Code
Create a helper script:
```go
package main

import (
	"fmt"
	"os"
	"time"

	"filippo.io/age"
)

func main() {
	identity, err := age.GenerateX25519Identity()
	if err != nil {
		panic(err)
	}

	// Write identity (private key)
	identityFile, err := os.Create("seqthink_age.key")
	if err != nil {
		panic(err)
	}
	fmt.Fprintf(identityFile, "# created: %s\n", time.Now().Format(time.RFC3339))
	fmt.Fprintf(identityFile, "# public key: %s\n", identity.Recipient().String())
	fmt.Fprintf(identityFile, "%s\n", identity.String())
	identityFile.Close()

	// Write recipient (public key)
	recipientFile, err := os.Create("seqthink_age.pub")
	if err != nil {
		panic(err)
	}
	fmt.Fprintf(recipientFile, "%s\n", identity.Recipient().String())
	recipientFile.Close()

	fmt.Println("✅ Keys generated:")
	fmt.Println("   Identity:  seqthink_age.key")
	fmt.Println("   Recipient: seqthink_age.pub")
}
```
## Storing Secrets in Docker Swarm
### Create Secrets
```bash
# Create identity secret
docker secret create seqthink_age_identity seqthink_age.key
# Create recipient secret
docker secret create seqthink_age_recipients seqthink_age.pub
```
### Verify Secrets
```bash
# List secrets
docker secret ls | grep seqthink
# Inspect secret metadata (not content)
docker secret inspect seqthink_age_identity
```
**Expected Output**:
```json
[
{
"ID": "abc123...",
"Version": {
"Index": 123
},
"CreatedAt": "2025-10-13T08:00:00.000Z",
"UpdatedAt": "2025-10-13T08:00:00.000Z",
"Spec": {
"Name": "seqthink_age_identity",
"Labels": {}
}
}
]
```
## Using Secrets in Services
### Compose File Configuration
```yaml
services:
seqthink-wrapper:
environment:
AGE_IDENT_PATH: /run/secrets/seqthink_age_identity
AGE_RECIPS_PATH: /run/secrets/seqthink_age_recipients
secrets:
- seqthink_age_identity
- seqthink_age_recipients
secrets:
seqthink_age_identity:
external: true
seqthink_age_recipients:
external: true
```
### Secret Mount Points
Inside the container, secrets are available at:
- `/run/secrets/seqthink_age_identity`
- `/run/secrets/seqthink_age_recipients`
These are read-only files mounted via tmpfs.
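
At startup the wrapper reads these paths (via `AGE_IDENT_PATH`/`AGE_RECIPS_PATH`). A minimal sketch of loading the mounted identity with `filippo.io/age`:

```go
package main

import (
	"bytes"
	"fmt"
	"os"

	"filippo.io/age"
)

func main() {
	// Read the tmpfs-mounted secret (read-only inside the container).
	raw, err := os.ReadFile("/run/secrets/seqthink_age_identity")
	if err != nil {
		panic(err)
	}
	// Identity files contain "#" comment lines; ParseIdentities skips them.
	identities, err := age.ParseIdentities(bytes.NewReader(raw))
	if err != nil {
		panic(err)
	}
	fmt.Printf("loaded %d age identity(ies)\n", len(identities))
}
```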
## Key Rotation
### Why Rotate Keys?
- Compromised key material
- Compliance requirements
- Periodic security hygiene
- Employee offboarding
### Rotation Process
#### Step 1: Generate New Keys
```bash
# Generate new keys with timestamp
TIMESTAMP=$(date +%Y%m%d)
age-keygen -o seqthink_age_${TIMESTAMP}.key
age-keygen -y seqthink_age_${TIMESTAMP}.key > seqthink_age_${TIMESTAMP}.pub
```
#### Step 2: Create New Secrets
```bash
# Create new secrets with version suffix
docker secret create seqthink_age_identity_v2 seqthink_age_${TIMESTAMP}.key
docker secret create seqthink_age_recipients_v2 seqthink_age_${TIMESTAMP}.pub
```
#### Step 3: Update Service
```bash
# Update service to use new secrets
docker service update \
--secret-rm seqthink_age_identity \
--secret-add source=seqthink_age_identity_v2,target=seqthink_age_identity \
--secret-rm seqthink_age_recipients \
--secret-add source=seqthink_age_recipients_v2,target=seqthink_age_recipients \
seqthink_seqthink-wrapper
```
#### Step 4: Verify New Keys Work
```bash
# Check service logs
docker service logs seqthink_seqthink-wrapper | tail -20
# Test encryption with new keys
echo "test" | age -r "$(cat seqthink_age_${TIMESTAMP}.pub)" | \
age -d -i seqthink_age_${TIMESTAMP}.key
```
#### Step 5: Clean Up Old Secrets
```bash
# Wait 24 hours to ensure no rollback needed
# Then remove old secrets
docker secret rm seqthink_age_identity
docker secret rm seqthink_age_recipients
# Promote v2 to primary names (optional)
docker secret create seqthink_age_identity seqthink_age_${TIMESTAMP}.key
docker secret create seqthink_age_recipients seqthink_age_${TIMESTAMP}.pub
```
## Backup and Recovery
### Backup Keys
```bash
# Create secure backup directory
mkdir -p ~/secure-backups/seqthink-keys
chmod 700 ~/secure-backups/seqthink-keys
# Copy keys to backup
cp seqthink_age.key ~/secure-backups/seqthink-keys/
cp seqthink_age.pub ~/secure-backups/seqthink-keys/
# Encrypt backup
tar czf - ~/secure-backups/seqthink-keys | \
age -r age1... > seqthink-keys-backup.tar.gz.age
# Store encrypted backup in:
# 1. Offsite backup (Backblaze, Scaleway)
# 2. Password manager (1Password, Bitwarden)
# 3. Hardware security module (YubiKey)
```
### Recover Keys
```bash
# Decrypt backup
age -d -i master_identity.key seqthink-keys-backup.tar.gz.age | \
tar xzf -
# Recreate Docker secrets
docker secret create seqthink_age_identity \
~/secure-backups/seqthink-keys/seqthink_age.key
docker secret create seqthink_age_recipients \
~/secure-backups/seqthink-keys/seqthink_age.pub
```
## Security Best Practices
### 1. Key Generation
**DO**:
- Generate keys on secure, air-gapped machines
- Use cryptographically secure random number generators
- Generate new keys per environment (dev, staging, prod)
**DON'T**:
- Reuse keys across environments
- Generate keys on shared/untrusted systems
- Store keys in git repositories
### 2. Key Storage
**DO**:
- Use Docker Secrets for production
- Encrypt backups with age or GPG
- Store backups in multiple secure locations
- Use hardware security modules for highly sensitive keys
**DON'T**:
- Store keys in environment variables
- Commit keys to version control
- Share keys via insecure channels (email, Slack)
- Store unencrypted keys on disk
### 3. Key Distribution
**DO**:
- Use secure channels (age-encrypted files, password managers)
- Verify key fingerprints before use
- Use Docker Secrets for service access
- Document key distribution recipients
**DON'T**:
- Send keys via unencrypted email
- Post keys in chat systems
- Share keys verbally
- Use public key servers for private keys
### 4. Key Lifecycle
**DO**:
- Rotate keys periodically (quarterly recommended)
- Rotate keys immediately if compromised
- Keep audit log of key generations and rotations
- Test key recovery procedures
**DON'T**:
- Keep keys indefinitely without rotation
- Delete old keys immediately (keep 30-day overlap)
- Skip testing key recovery
- Forget to document key changes
## Troubleshooting
### Issue: Secret Not Found
**Error**:
```
Error response from daemon: secret 'seqthink_age_identity' not found
```
**Solution**:
```bash
# Check if secret exists
docker secret ls | grep seqthink
# If missing, create it
docker secret create seqthink_age_identity seqthink_age.key
```
### Issue: Permission Denied Reading Secret
**Error**:
```
open /run/secrets/seqthink_age_identity: permission denied
```
**Solution**:
- Secrets are mounted read-only to containers
- Container user must have read permissions
- Check Dockerfile USER directive
### Issue: Wrong Key Used
**Error**:
```
Failed to decrypt request
seqthink_decrypt_failures_total increasing
```
**Solution**:
```bash
# Verify public key matches private key
PUBLIC_FROM_PRIVATE=$(age-keygen -y seqthink_age.key)
PUBLIC_IN_SECRET=$(cat seqthink_age.pub)
if [ "$PUBLIC_FROM_PRIVATE" = "$PUBLIC_IN_SECRET" ]; then
echo "✓ Keys match"
else
echo "✗ Keys don't match - regenerate recipient"
fi
```
### Issue: Secret Update Not Taking Effect
**Symptoms**: Service still using old keys after update
**Solution**:
```bash
# Force service update
docker service update --force seqthink_seqthink-wrapper
# Or restart service
docker service scale seqthink_seqthink-wrapper=0
docker service scale seqthink_seqthink-wrapper=3
```
## Client-Side Key Management
### Distributing Public Keys to Clients
Clients need the public key to encrypt requests:
```bash
# Generate client-friendly recipient file
cat seqthink_age.pub
# Clients can encrypt with:
echo '{"tool":"test","payload":{}}' | age -r age1ql3z7hjy54pw3... > request.age
```
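Go clients can do the same with the `filippo.io/age` library directly. The sketch below assumes the wrapper accepts `Content-Type: application/age` as described in this guide; the helper itself is illustrative, not part of the shipped client:
```go
package client

import (
	"bytes"
	"fmt"
	"net/http"

	"filippo.io/age"
)

// sendEncrypted encrypts a JSON tool request to the service's age recipient
// and POSTs the ciphertext with a bearer token. Illustrative sketch.
func sendEncrypted(url, recipient, token string, payload []byte) (*http.Response, error) {
	r, err := age.ParseX25519Recipient(recipient) // "age1..." public key
	if err != nil {
		return nil, fmt.Errorf("parse recipient: %w", err)
	}

	var buf bytes.Buffer
	w, err := age.Encrypt(&buf, r)
	if err != nil {
		return nil, fmt.Errorf("init encryption: %w", err)
	}
	if _, err := w.Write(payload); err != nil {
		return nil, err
	}
	if err := w.Close(); err != nil { // Close flushes the final age chunk
		return nil, err
	}

	req, err := http.NewRequest(http.MethodPost, url, &buf)
	if err != nil {
		return nil, err
	}
	req.Header.Set("Authorization", "Bearer "+token)
	req.Header.Set("Content-Type", "application/age")
	return http.DefaultClient.Do(req)
}
```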
### Recipient Key Distribution Methods
1. **Configuration Management**:
```yaml
seqthink:
recipient: age1ql3z7hjy54pw3hyww5ayyfg7zqgvc7w3j2elw8zmrj2kg5sfn9aqmcac8p
```
2. **Environment Variable**:
```bash
export SEQTHINK_RECIPIENT="age1ql3z7hjy54pw3..."
```
3. **API Discovery** (future):
```bash
curl https://seqthink.chorus.services/.well-known/age-recipient
```
## Compliance and Auditing
### Audit Log Example
Maintain a log of key operations:
```markdown
# seqthink-keys-audit.md
## 2025-10-13 - Initial Key Generation
- Generated by: Tony
- Purpose: Production deployment
- Public key: age1ql3z7hjy54pw3...
- Stored in: Docker Secrets + Backup
## 2025-11-15 - Quarterly Rotation
- Generated by: Tony
- Reason: Scheduled quarterly rotation
- Old public key: age1ql3z7hjy54pw3...
- New public key: age1abc123xyz...
- Overlap period: 30 days
- Old keys removed: 2025-12-15
```
### Compliance Requirements
For SOC 2, ISO 27001, or similar:
- Document key generation procedures
- Log all key rotations
- Restrict key access to authorized personnel
- Encrypt keys at rest
- Regular key rotation (90 days recommended)
- Incident response plan for key compromise
## Emergency Procedures
### Key Compromise Response
If keys are compromised:
1. **Immediate Actions** (< 1 hour):
```bash
# Generate new keys immediately
age-keygen -o seqthink_age_emergency.key
age-keygen -y seqthink_age_emergency.key > seqthink_age_emergency.pub
# Update Docker secrets
docker secret create seqthink_age_identity_emergency seqthink_age_emergency.key
docker secret create seqthink_age_recipients_emergency seqthink_age_emergency.pub
# Force service update
docker service update --force \
--secret-rm seqthink_age_identity \
--secret-add source=seqthink_age_identity_emergency,target=seqthink_age_identity \
--secret-rm seqthink_age_recipients \
--secret-add source=seqthink_age_recipients_emergency,target=seqthink_age_recipients \
seqthink_seqthink-wrapper
```
2. **Communication** (< 4 hours):
- Notify all clients of new public key
- Update documentation
- Post-mortem analysis
3. **Follow-up** (< 24 hours):
- Review access logs
- Identify compromise source
- Update security procedures
- Complete incident report
## References
- [age encryption tool](https://github.com/FiloSottile/age)
- [Docker Secrets documentation](https://docs.docker.com/engine/swarm/secrets/)
- [NIST Key Management Guidelines](https://csrc.nist.gov/publications/detail/sp/800-57-part-1/rev-5/final)


@@ -0,0 +1,102 @@
version: '3.8'

services:
  seqthink-wrapper:
    image: anthonyrawlins/seqthink-wrapper:latest
    networks:
      - chorus-overlay
    ports:
      - "8443:8443"
    environment:
      # Logging
      LOG_LEVEL: info
      # MCP server (internal loopback)
      MCP_LOCAL: http://127.0.0.1:8000
      # Port configuration
      PORT: "8443"
      # Request limits
      MAX_BODY_MB: "4"
      # Age encryption (use secrets)
      AGE_IDENT_PATH: /run/secrets/seqthink_age_identity
      AGE_RECIPS_PATH: /run/secrets/seqthink_age_recipients
      # KACHING JWT policy
      KACHING_JWKS_URL: https://auth.kaching.services/jwks
      REQUIRED_SCOPE: sequentialthinking.run
    secrets:
      - seqthink_age_identity
      - seqthink_age_recipients
    deploy:
      mode: replicated
      replicas: 3
      placement:
        constraints:
          - node.role == worker
        preferences:
          - spread: node.hostname
      resources:
        limits:
          cpus: '1.0'
          memory: 512M
        reservations:
          cpus: '0.5'
          memory: 256M
      restart_policy:
        condition: on-failure
        delay: 5s
        max_attempts: 3
        window: 120s
      update_config:
        parallelism: 1
        delay: 10s
        failure_action: rollback
        monitor: 30s
        max_failure_ratio: 0.3
      rollback_config:
        parallelism: 1
        delay: 5s
        failure_action: pause
        monitor: 30s
      labels:
        - "traefik.enable=true"
        - "traefik.http.routers.seqthink.rule=Host(`seqthink.chorus.services`)"
        - "traefik.http.routers.seqthink.entrypoints=websecure"
        - "traefik.http.routers.seqthink.tls=true"
        - "traefik.http.routers.seqthink.tls.certresolver=letsencrypt"
        - "traefik.http.services.seqthink.loadbalancer.server.port=8443"
        - "traefik.http.services.seqthink.loadbalancer.healthcheck.path=/health"
        - "traefik.http.services.seqthink.loadbalancer.healthcheck.interval=30s"
    healthcheck:
      test: ["CMD", "curl", "-f", "http://localhost:8443/health"]
      interval: 30s
      timeout: 10s
      retries: 3
      start_period: 40s
    logging:
      driver: "json-file"
      options:
        max-size: "10m"
        max-file: "3"

networks:
  chorus-overlay:
    external: true

secrets:
  seqthink_age_identity:
    external: true
  seqthink_age_recipients:
    external: true


@@ -0,0 +1,27 @@
#!/bin/bash
set -e

echo "🚀 Starting Sequential Thinking Age Wrapper"

# Start MCP server on loopback
echo "📡 Starting Sequential Thinking MCP compatibility server on 127.0.0.1:8000..."
python3 /opt/mcp/server.py &
MCP_PID=$!

# Wait for MCP server to be ready
echo "⏳ Waiting for MCP server to be ready..."
for i in {1..30}; do
    if curl -sf http://127.0.0.1:8000/health > /dev/null 2>&1; then
        echo "✅ MCP server ready"
        break
    fi
    if [ $i -eq 30 ]; then
        echo "❌ MCP server failed to start"
        exit 1
    fi
    sleep 1
done

# Start wrapper
echo "🔐 Starting wrapper on :8443..."
exec seqthink-wrapper


@@ -0,0 +1,160 @@
#!/usr/bin/env python3
"""Sequential Thinking MCP compatibility server (HTTP wrapper)."""

from __future__ import annotations

import json
import logging
import os
from typing import Any, Dict, List, Optional

import uvicorn
from fastapi import FastAPI, HTTPException
from pydantic import BaseModel, Field, validator

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("seqthink")


class ToolRequest(BaseModel):
    tool: str
    payload: Dict[str, Any]

    @validator("tool")
    def validate_tool(cls, value: str) -> str:
        allowed = {
            "sequentialthinking",
            "mcp__sequential-thinking__sequentialthinking",
        }
        if value not in allowed:
            raise ValueError(f"Unknown tool '{value}'")
        return value


class ToolResponse(BaseModel):
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None


class ThoughtData(BaseModel):
    thought: str
    thoughtNumber: int = Field(..., ge=1)
    totalThoughts: int = Field(..., ge=1)
    nextThoughtNeeded: bool
    isRevision: Optional[bool] = False
    revisesThought: Optional[int] = Field(default=None, ge=1)
    branchFromThought: Optional[int] = Field(default=None, ge=1)
    branchId: Optional[str] = None
    needsMoreThoughts: Optional[bool] = None

    @validator("totalThoughts")
    def normalize_total(cls, value: int, values: Dict[str, Any]) -> int:
        thought_number = values.get("thoughtNumber")
        if thought_number is not None and value < thought_number:
            return thought_number
        return value


class SequentialThinkingEngine:
    """Replicates the upstream sequential thinking MCP behaviour."""

    def __init__(self) -> None:
        self._thought_history: List[ThoughtData] = []
        self._branches: Dict[str, List[ThoughtData]] = {}
        env = os.environ.get("DISABLE_THOUGHT_LOGGING", "")
        self._disable_logging = env.lower() == "true"

    def _record_branch(self, data: ThoughtData) -> None:
        if data.branchFromThought and data.branchId:
            self._branches.setdefault(data.branchId, []).append(data)

    def _log_thought(self, data: ThoughtData) -> None:
        if self._disable_logging:
            return
        header = []
        if data.isRevision:
            header.append("🔄 Revision")
            if data.revisesThought:
                header.append(f"(revising thought {data.revisesThought})")
        elif data.branchFromThought:
            header.append("🌿 Branch")
            header.append(f"(from thought {data.branchFromThought})")
            if data.branchId:
                header.append(f"[ID: {data.branchId}]")
        else:
            header.append("💭 Thought")
        header.append(f"{data.thoughtNumber}/{data.totalThoughts}")
        header_line = " ".join(part for part in header if part)
        border_width = max(len(header_line), len(data.thought)) + 4
        border = "─" * border_width
        message = (
            f"\n{border}\n"
            f"{header_line.ljust(border_width - 2)}\n"
            f"{border}\n"
            f"{data.thought.ljust(border_width - 2)}\n"
            f"{border}"
        )
        # Error level routes the thought trace to stderr, keeping stdout clean
        logger.error(message)

    def process(self, payload: Dict[str, Any]) -> Dict[str, Any]:
        try:
            thought = ThoughtData(**payload)
        except Exception as exc:  # pylint: disable=broad-except
            logger.exception("Invalid thought payload")
            return {
                "content": [
                    {
                        "type": "text",
                        "text": json.dumps({"error": str(exc)}, indent=2),
                    }
                ],
                "isError": True,
            }

        self._thought_history.append(thought)
        self._record_branch(thought)
        self._log_thought(thought)

        response_payload = {
            "thoughtNumber": thought.thoughtNumber,
            "totalThoughts": thought.totalThoughts,
            "nextThoughtNeeded": thought.nextThoughtNeeded,
            "branches": list(self._branches.keys()),
            "thoughtHistoryLength": len(self._thought_history),
        }
        return {
            "content": [
                {
                    "type": "text",
                    "text": json.dumps(response_payload, indent=2),
                }
            ]
        }


engine = SequentialThinkingEngine()
app = FastAPI(title="Sequential Thinking MCP Compatibility Server")


@app.get("/health")
def health() -> Dict[str, str]:
    return {"status": "ok"}


@app.post("/mcp/tool")
def call_tool(request: ToolRequest) -> ToolResponse:
    try:
        result = engine.process(request.payload)
        if result.get("isError"):
            return ToolResponse(error=result["content"][0]["text"])
        return ToolResponse(result=result)
    except Exception as exc:  # pylint: disable=broad-except
        raise HTTPException(status_code=400, detail=str(exc)) from exc


if __name__ == "__main__":
    uvicorn.run(app, host="127.0.0.1", port=8000, log_level="info")

deploy/seqthink/test-e2e.sh (new executable file, 216 lines)

@@ -0,0 +1,216 @@
#!/bin/bash
# End-to-end test script for Sequential Thinking Age Wrapper

set -e

echo "🧪 Sequential Thinking Wrapper E2E Tests"
echo "========================================"
echo ""

# Configuration
WRAPPER_URL="${WRAPPER_URL:-http://localhost:8443}"
JWT_TOKEN="${JWT_TOKEN:-}"
AGE_RECIPIENT="${AGE_RECIPIENT:-}"
AGE_IDENTITY="${AGE_IDENTITY:-}"

# Color codes
GREEN='\033[0;32m'
RED='\033[0;31m'
YELLOW='\033[1;33m'
NC='\033[0m' # No Color

# Test counters
TESTS_RUN=0
TESTS_PASSED=0
TESTS_FAILED=0

# Helper functions. Counters use explicit arithmetic because ((x++))
# returns a non-zero status when x is 0, which would trip `set -e`.
pass() {
    echo -e "${GREEN}✓${NC} $1"
    TESTS_PASSED=$((TESTS_PASSED + 1))
}

fail() {
    echo -e "${RED}✗${NC} $1"
    TESTS_FAILED=$((TESTS_FAILED + 1))
}

warn() {
    echo -e "${YELLOW}⚠${NC} $1"
}

test_start() {
    TESTS_RUN=$((TESTS_RUN + 1))
    echo ""
    echo "Test $TESTS_RUN: $1"
    echo "---"
}

# Test 1: Health Check
test_start "Health endpoint"
if curl -sf "$WRAPPER_URL/health" > /dev/null 2>&1; then
    pass "Health check passed"
else
    fail "Health check failed"
fi

# Test 2: Readiness Check
test_start "Readiness endpoint"
if curl -sf "$WRAPPER_URL/ready" > /dev/null 2>&1; then
    pass "Readiness check passed"
else
    fail "Readiness check failed"
fi

# Test 3: Metrics Endpoint
test_start "Metrics endpoint"
if curl -sf "$WRAPPER_URL/metrics" | grep -q "seqthink_requests_total"; then
    pass "Metrics endpoint accessible"
else
    fail "Metrics endpoint failed"
fi

# Test 4: Unauthorized Request (no token)
test_start "Unauthorized request rejection"
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$WRAPPER_URL/mcp/tool" \
    -H "Content-Type: application/json" \
    -d '{"tool":"test"}')
if [ "$HTTP_CODE" = "401" ]; then
    pass "Unauthorized request correctly rejected (401)"
else
    warn "Expected 401, got $HTTP_CODE (may be policy disabled)"
fi

# Test 5: Invalid Authorization Header
test_start "Invalid authorization header"
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$WRAPPER_URL/mcp/tool" \
    -H "Authorization: InvalidFormat" \
    -H "Content-Type: application/json" \
    -d '{"tool":"test"}')
if [ "$HTTP_CODE" = "401" ]; then
    pass "Invalid auth header correctly rejected (401)"
else
    warn "Expected 401, got $HTTP_CODE (may be policy disabled)"
fi

# Test 6: JWT Token Validation (if token provided)
if [ -n "$JWT_TOKEN" ]; then
    test_start "JWT token validation"
    # Check if age keys are available
    if [ -n "$AGE_RECIPIENT" ] && [ -n "$AGE_IDENTITY" ]; then
        # Test with encryption
        test_start "Encrypted request with valid JWT"
        # Create test payload
        TEST_PAYLOAD='{"tool":"mcp__sequential-thinking__sequentialthinking","payload":{"thought":"Test thought","thoughtNumber":1,"totalThoughts":1,"nextThoughtNeeded":false}}'
        # Encrypt payload (guarded assignment so a failure doesn't trip `set -e`)
        if ENCRYPTED_PAYLOAD=$(echo "$TEST_PAYLOAD" | age -r "$AGE_RECIPIENT" 2>/dev/null); then
            # Send encrypted request
            HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$WRAPPER_URL/mcp/tool" \
                -H "Authorization: Bearer $JWT_TOKEN" \
                -H "Content-Type: application/age" \
                -d "$ENCRYPTED_PAYLOAD")
            if [ "$HTTP_CODE" = "200" ]; then
                pass "Encrypted request with JWT succeeded"
            else
                fail "Encrypted request failed with HTTP $HTTP_CODE"
            fi
        else
            fail "Failed to encrypt test payload"
        fi
    else
        # Test without encryption (plaintext mode)
        test_start "Plaintext request with valid JWT"
        HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$WRAPPER_URL/mcp/tool" \
            -H "Authorization: Bearer $JWT_TOKEN" \
            -H "Content-Type: application/json" \
            -d '{"tool":"mcp__sequential-thinking__sequentialthinking","payload":{"thought":"Test","thoughtNumber":1,"totalThoughts":1,"nextThoughtNeeded":false}}')
        if [ "$HTTP_CODE" = "200" ]; then
            pass "Plaintext request with JWT succeeded"
        else
            warn "Request failed with HTTP $HTTP_CODE"
        fi
    fi
else
    warn "JWT_TOKEN not set - skipping authenticated tests"
fi

# Test 7: Content-Type Validation (if encryption enabled)
if [ -n "$AGE_RECIPIENT" ]; then
    test_start "Content-Type validation for encrypted mode"
    HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" -X POST "$WRAPPER_URL/mcp/tool" \
        -H "Authorization: Bearer ${JWT_TOKEN:-dummy}" \
        -H "Content-Type: application/json" \
        -d '{"tool":"test"}')
    if [ "$HTTP_CODE" = "415" ]; then
        pass "Incorrect Content-Type correctly rejected (415)"
    else
        warn "Expected 415, got $HTTP_CODE"
    fi
fi

# Test 8: Metrics Collection
test_start "Metrics collection"
METRICS=$(curl -s "$WRAPPER_URL/metrics")

if echo "$METRICS" | grep -q "seqthink_requests_total"; then
    REQUEST_COUNT=$(echo "$METRICS" | grep "^seqthink_requests_total" | awk '{print $2}')
    pass "Request metrics collected (total: $REQUEST_COUNT)"
else
    fail "Request metrics not found"
fi

if echo "$METRICS" | grep -q "seqthink_errors_total"; then
    ERROR_COUNT=$(echo "$METRICS" | grep "^seqthink_errors_total" | awk '{print $2}')
    pass "Error metrics collected (total: $ERROR_COUNT)"
else
    fail "Error metrics not found"
fi

if echo "$METRICS" | grep -q "seqthink_policy_denials_total"; then
    DENIAL_COUNT=$(echo "$METRICS" | grep "^seqthink_policy_denials_total" | awk '{print $2}')
    pass "Policy denial metrics collected (total: $DENIAL_COUNT)"
else
    warn "Policy denial metrics not found (may be policy disabled)"
fi

# Test 9: SSE Endpoint (basic check)
test_start "SSE endpoint availability"
# Just check if endpoint exists, don't try to consume stream
HTTP_CODE=$(curl -s -o /dev/null -w "%{http_code}" --max-time 2 "$WRAPPER_URL/mcp/sse" 2>/dev/null || echo "timeout")
if [ "$HTTP_CODE" = "401" ] || [ "$HTTP_CODE" = "200" ]; then
    pass "SSE endpoint exists (HTTP $HTTP_CODE)"
else
    warn "SSE endpoint check inconclusive (HTTP $HTTP_CODE)"
fi

# Summary
echo ""
echo "========================================"
echo "Test Summary"
echo "========================================"
echo "Tests Run: $TESTS_RUN"
echo -e "${GREEN}Tests Passed: $TESTS_PASSED${NC}"
if [ $TESTS_FAILED -gt 0 ]; then
    echo -e "${RED}Tests Failed: $TESTS_FAILED${NC}"
fi
echo ""

if [ $TESTS_FAILED -eq 0 ]; then
    echo -e "${GREEN}✓ All tests passed!${NC}"
    exit 0
else
    echo -e "${RED}✗ Some tests failed${NC}"
    exit 1
fi


@@ -5,9 +5,11 @@ import (
 	"fmt"
 	"time"
 
+	"chorus/internal/logging"
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/libp2p/go-libp2p/p2p/discovery/mdns"
+	"github.com/rs/zerolog"
 )
 
 // MDNSDiscovery handles mDNS peer discovery for local network
@@ -18,6 +20,7 @@ type MDNSDiscovery struct {
 	ctx        context.Context
 	cancel     context.CancelFunc
 	serviceTag string
+	logger     zerolog.Logger
 }
 
 // mdnsNotifee handles discovered peers
@@ -25,6 +28,7 @@ type mdnsNotifee struct {
 	h         host.Host
 	ctx       context.Context
 	peersChan chan peer.AddrInfo
+	logger    zerolog.Logger
 }
 
 // NewMDNSDiscovery creates a new mDNS discovery service
@@ -35,11 +39,14 @@ func NewMDNSDiscovery(ctx context.Context, h host.Host, serviceTag string) (*MDN
 	discoveryCtx, cancel := context.WithCancel(ctx)
 
+	logger := logging.ForComponent(logging.ComponentP2P)
+
 	// Create notifee to handle discovered peers
 	notifee := &mdnsNotifee{
 		h:         h,
 		ctx:       discoveryCtx,
 		peersChan: make(chan peer.AddrInfo, 10),
+		logger:    logger,
 	}
 
 	// Create mDNS service
@@ -52,6 +59,7 @@ func NewMDNSDiscovery(ctx context.Context, h host.Host, serviceTag string) (*MDN
 		ctx:        discoveryCtx,
 		cancel:     cancel,
 		serviceTag: serviceTag,
+		logger:     logger,
 	}
 
 	// Start the service
@@ -63,7 +71,7 @@ func NewMDNSDiscovery(ctx context.Context, h host.Host, serviceTag string) (*MDN
 	// Start background peer connection handler
 	go discovery.handleDiscoveredPeers()
 
-	fmt.Printf("🔍 mDNS Discovery started with service tag: %s\n", serviceTag)
+	logger.Info().Str("service_tag", serviceTag).Msg("mDNS Discovery started")
 
 	return discovery, nil
 }
@@ -90,13 +98,13 @@ func (d *MDNSDiscovery) handleDiscoveredPeers() {
 			}
 
 			// Attempt to connect
-			fmt.Printf("🤝 Discovered peer %s, attempting connection...\n", peerInfo.ID.ShortString())
+			d.logger.Info().Str("peer_id", peerInfo.ID.ShortString()).Msg("Discovered peer, attempting connection")
 
 			connectCtx, cancel := context.WithTimeout(d.ctx, 10*time.Second)
 			if err := d.host.Connect(connectCtx, peerInfo); err != nil {
-				fmt.Printf("❌ Failed to connect to peer %s: %v\n", peerInfo.ID.ShortString(), err)
+				d.logger.Warn().Err(err).Str("peer_id", peerInfo.ID.ShortString()).Msg("Failed to connect to peer")
 			} else {
-				fmt.Printf("✅ Successfully connected to peer %s\n", peerInfo.ID.ShortString())
+				d.logger.Info().Str("peer_id", peerInfo.ID.ShortString()).Msg("Successfully connected to peer")
 			}
 			cancel()
 		}
@@ -119,6 +127,6 @@ func (n *mdnsNotifee) HandlePeerFound(pi peer.AddrInfo) {
 		// Peer info sent to channel
 	default:
 		// Channel is full, skip this peer
-		fmt.Printf("⚠️ Discovery channel full, skipping peer %s\n", pi.ID.ShortString())
+		n.logger.Warn().Str("peer_id", pi.ID.ShortString()).Msg("Discovery channel full, skipping peer")
 	}
 }
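All of the discovery logging above flows through a component-scoped zerolog logger obtained from `logging.ForComponent`. That helper's implementation is not part of this diff; the following is a minimal sketch of what it plausibly looks like, assuming JSON output on stdout and RFC 3339 timestamps (both assumptions):
```go
package logging

import (
	"os"
	"time"

	"github.com/rs/zerolog"
)

// ComponentP2P is one of the component names used across the runtime.
const ComponentP2P = "p2p"

// ForComponent returns a logger that stamps every event with the component
// name and an ISO 8601 timestamp, matching the structured JSON format
// described in the commit message.
func ForComponent(component string) zerolog.Logger {
	zerolog.TimeFieldFormat = time.RFC3339
	return zerolog.New(os.Stdout).
		With().
		Timestamp().
		Str("component", component).
		Logger()
}
```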

File diff suppressed because it is too large


@@ -0,0 +1,579 @@
# Sequential Thinking Integration Plan for CHORUS Agents
**Date**: 2025-10-13
**Status**: Design Phase
**Priority**: High - Blocking further intelligence improvements
---
## Executive Summary
This document outlines the integration of the Sequential Thinking MCP server into CHORUS agents to enable **structured, multi-step reasoning** before task execution. This addresses the limitation in the SequentialThinkingForCHORUS repository issue and unlocks advanced agent decision-making capabilities.
**Problem Statement**: CHORUS agents currently use simple prompt-response cycles without structured reasoning, limiting their ability to handle complex tasks requiring multi-step analysis, hypothesis generation, and iterative refinement.
**Solution**: Integrate the `mcp__sequential-thinking__sequentialthinking` MCP tool into the AI provider layer to enable chain-of-thought reasoning for complex tasks.
---
## Current Architecture Analysis
### 1. Existing AI Provider Flow
```
TaskRequest → ModelProvider.ExecuteTask() → TaskResponse
                        ↓
                [Single LLM Call]
                        ↓
                 Response String
```
**Current Providers**:
- **OllamaProvider**: Local model execution
- **ResetDataProvider**: ResetData LaaS API
- **OpenAIProvider**: OpenAI API
**Current Limitations**:
- ✗ No structured reasoning process
- ✗ No ability to revise initial thoughts
- ✗ No hypothesis generation and verification
- ✗ No branching for alternative approaches
- ✗ Simple string reasoning field (not structured)
### 2. TaskResponse Structure
**Location**: `/home/tony/chorus/project-queues/active/CHORUS/pkg/ai/provider.go:53-78`
```go
type TaskResponse struct {
Success bool `json:"success"`
TaskID string `json:"task_id"`
Response string `json:"response"`
Reasoning string `json:"reasoning,omitempty"` // ← Simple string
Actions []TaskAction `json:"actions,omitempty"`
Artifacts []Artifact `json:"artifacts,omitempty"`
TokensUsed TokenUsage `json:"tokens_used,omitempty"`
// ... other fields
}
```
**Opportunity**: The `Reasoning` field is perfect for storing structured thinking output!
---
## Sequential Thinking MCP Tool
### Tool Signature
```go
mcp__sequential-thinking__sequentialthinking(
thought: string,
nextThoughtNeeded: bool,
thoughtNumber: int,
totalThoughts: int,
isRevision: bool = false,
revisesThought: int = null,
branchFromThought: int = null,
branchId: string = null,
needsMoreThoughts: bool = false
)
```
### Capabilities
1. **Adaptive Thinking**: Adjust `totalThoughts` up or down as understanding deepens
2. **Revision Support**: Question and revise previous thoughts (`isRevision`, `revisesThought`)
3. **Branching**: Explore alternative approaches (`branchFromThought`, `branchId`)
4. **Hypothesis Testing**: Generate and verify hypotheses in chain-of-thought
5. **Uncertainty Expression**: Express and work through unclear aspects
6. **Context Maintenance**: Keep track of all previous thoughts
### When to Use
- **Complex problem decomposition**
- **Multi-step solution planning**
- **Problems requiring course correction**
- **Unclear scope requiring exploration**
- **Tasks needing context over multiple steps**
- **Filtering irrelevant information**
---
## Proposed Integration Architecture
### Phase 1: Enhanced TaskResponse Structure
**File**: `pkg/ai/provider.go`
```go
// StructuredReasoning represents chain-of-thought reasoning process
type StructuredReasoning struct {
Thoughts []ThoughtStep `json:"thoughts"`
FinalHypothesis string `json:"final_hypothesis,omitempty"`
VerificationSteps []string `json:"verification_steps,omitempty"`
Confidence float32 `json:"confidence"` // 0.0-1.0
TotalRevisions int `json:"total_revisions"`
BranchesExplored int `json:"branches_explored"`
}
// ThoughtStep represents a single step in the reasoning process
type ThoughtStep struct {
Number int `json:"number"`
Content string `json:"content"`
IsRevision bool `json:"is_revision"`
RevisesThought int `json:"revises_thought,omitempty"`
BranchID string `json:"branch_id,omitempty"`
BranchFrom int `json:"branch_from,omitempty"`
Timestamp time.Time `json:"timestamp"`
}
// TaskResponse update
type TaskResponse struct {
// ... existing fields ...
Reasoning string `json:"reasoning,omitempty"` // Legacy simple string
StructuredReasoning *StructuredReasoning `json:"structured_reasoning,omitempty"` // NEW
// ... rest of fields ...
}
```
### Phase 2: Sequential Thinking Wrapper
**New File**: `pkg/ai/sequential_thinking.go`
```go
package ai
import (
"context"
"encoding/json"
"fmt"
)
// SequentialThinkingEngine wraps MCP sequential thinking tool
type SequentialThinkingEngine struct {
mcpClient MCPClient // Interface to MCP tool
}
// ThinkingRequest represents input for sequential thinking
type ThinkingRequest struct {
Problem string
Context map[string]interface{}
MaxThoughts int
AllowRevisions bool
AllowBranching bool
}
// ThinkingResult represents output from sequential thinking
type ThinkingResult struct {
Thoughts []ThoughtStep
FinalConclusion string
Confidence float32
ReasoningPath string // Markdown summary of thinking process
}
// Think executes sequential thinking process
func (e *SequentialThinkingEngine) Think(ctx context.Context, req *ThinkingRequest) (*ThinkingResult, error) {
// Implementation:
// 1. Initialize thinking with problem statement
// 2. Iteratively call MCP tool until nextThoughtNeeded = false
// 3. Track all thoughts, revisions, branches
// 4. Generate final conclusion and reasoning summary
// 5. Return structured result
}
```
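As a concretization of those five steps, here is a minimal sketch of the loop (it extends the file above and additionally needs `time` in its imports). The `MCPClient.CallTool` signature is a placeholder, since the plan does not define the interface, and prompt construction plus revision/branch tracking are glossed over:
```go
// MCPClient is an assumed interface to the sequential-thinking MCP tool.
type MCPClient interface {
	CallTool(ctx context.Context, name string, args map[string]interface{}) (json.RawMessage, error)
}

// Think drives the iterative thinking loop until the tool reports
// nextThoughtNeeded=false or MaxThoughts is reached.
func (e *SequentialThinkingEngine) Think(ctx context.Context, req *ThinkingRequest) (*ThinkingResult, error) {
	result := &ThinkingResult{}
	args := map[string]interface{}{
		"thought":           "Restating the problem: " + req.Problem,
		"thoughtNumber":     1,
		"totalThoughts":     req.MaxThoughts,
		"nextThoughtNeeded": true,
	}
	for n := 1; n <= req.MaxThoughts; n++ {
		raw, err := e.mcpClient.CallTool(ctx, "sequentialthinking", args)
		if err != nil {
			return nil, fmt.Errorf("thought %d failed: %w", n, err)
		}
		var resp struct {
			TotalThoughts     int  `json:"totalThoughts"`
			NextThoughtNeeded bool `json:"nextThoughtNeeded"`
		}
		if err := json.Unmarshal(raw, &resp); err != nil {
			return nil, fmt.Errorf("thought %d: bad response: %w", n, err)
		}
		result.Thoughts = append(result.Thoughts, ThoughtStep{Number: n, Timestamp: time.Now()})
		if !resp.NextThoughtNeeded {
			break
		}
		// A full implementation would feed the model's latest thought text
		// back in here; this sketch only advances the counters.
		args["thoughtNumber"] = n + 1
		args["totalThoughts"] = resp.TotalThoughts
	}
	result.ReasoningPath = fmt.Sprintf("%d thoughts recorded", len(result.Thoughts))
	return result, nil
}
```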
### Phase 3: Provider Integration
**Modified File**: `pkg/ai/resetdata.go`
```go
// ExecuteTask with sequential thinking
func (p *ResetDataProvider) ExecuteTask(ctx context.Context, request *TaskRequest) (*TaskResponse, error) {
startTime := time.Now()
// Determine if task requires sequential thinking
useSequentialThinking := p.shouldUseSequentialThinking(request)
var structuredReasoning *StructuredReasoning
var enhancedPrompt string
if useSequentialThinking {
// Use sequential thinking engine to analyze task first
thinkingEngine := NewSequentialThinkingEngine(p.mcpClient)
thinkingResult, err := thinkingEngine.Think(ctx, &ThinkingRequest{
Problem: p.formatTaskAsProblem(request),
Context: request.Context,
MaxThoughts: 10,
AllowRevisions: true,
AllowBranching: true,
})
if err != nil {
// Fall back to direct execution if thinking fails
log.Warn().Err(err).Msg("Sequential thinking failed, falling back to direct execution")
} else {
// Use thinking result to enhance prompt
enhancedPrompt = p.buildPromptWithThinking(request, thinkingResult)
structuredReasoning = convertToStructuredReasoning(thinkingResult)
}
}
// Execute with enhanced prompt (if available) or standard prompt
messages, _ := p.buildChatMessages(request, enhancedPrompt)
// ... rest of execution ...
return &TaskResponse{
Success: true,
Response: responseText,
Reasoning: legacyReasoningString,
StructuredReasoning: structuredReasoning, // NEW
// ... rest of response ...
}
}
// shouldUseSequentialThinking determines if task warrants sequential thinking
func (p *ResetDataProvider) shouldUseSequentialThinking(request *TaskRequest) bool {
// Use sequential thinking for:
// - High complexity tasks (complexity >= 7)
// - Architect role (requires system design)
// - Tasks with "design" or "architecture" in title/labels
// - Tasks requiring multi-step planning
if request.Complexity >= 7 {
return true
}
role := strings.ToLower(request.AgentRole)
if role == "architect" || role == "senior-developer" {
return true
}
keywords := []string{"design", "architecture", "refactor", "plan", "strategy"}
taskText := strings.ToLower(request.TaskTitle + " " + request.TaskDescription)
for _, keyword := range keywords {
if strings.Contains(taskText, keyword) {
return true
}
}
return false
}
```
---
## Implementation Phases
### Phase 1: Foundation (Days 1-2)
**Tasks**:
1. ✅ Define `StructuredReasoning` and `ThoughtStep` types
2. ✅ Add `StructuredReasoning` field to `TaskResponse`
3. ✅ Create `SequentialThinkingEngine` skeleton
4. ✅ Add MCP client interface for sequential-thinking tool
**Files to Create/Modify**:
- `pkg/ai/provider.go` - Add new types
- `pkg/ai/sequential_thinking.go` - New file
- `pkg/ai/mcp_client.go` - New file for MCP integration
**Success Criteria**:
- Code compiles without errors
- Types are properly defined
- MCP client interface is clear
### Phase 2: Sequential Thinking Engine (Days 3-5)
**Tasks**:
1. Implement `SequentialThinkingEngine.Think()` method
2. Implement MCP tool call wrapper
3. Add thought tracking and revision detection
4. Implement branch management
5. Generate reasoning summaries
6. Write unit tests
**Files**:
- `pkg/ai/sequential_thinking.go` - Full implementation
- `pkg/ai/sequential_thinking_test.go` - Unit tests
**Success Criteria**:
- Can execute complete thinking cycles
- Properly tracks revisions and branches
- Generates clear reasoning summaries
- All unit tests pass
### Phase 3: Provider Integration (Days 6-8)
**Tasks**:
1. Modify `ResetDataProvider.ExecuteTask()` for sequential thinking
2. Implement `shouldUseSequentialThinking()` heuristics
3. Add prompt enhancement with thinking results
4. Implement fallback for thinking failures
5. Add configuration options
6. Write integration tests
**Files**:
- `pkg/ai/resetdata.go` - Modify ExecuteTask
- `pkg/ai/ollama.go` - Same modifications
- `config/agent.yaml` - Add sequential thinking config
**Success Criteria**:
- Complex tasks trigger sequential thinking
- Thinking results enhance task execution
- Graceful fallback on failures
- Integration tests pass
### Phase 4: Testing & Validation (Days 9-10)
**Tasks**:
1. End-to-end testing with real councils
2. Test with various complexity levels
3. Validate reasoning quality improvements
4. Performance benchmarking
5. Documentation updates
**Test Cases**:
- Simple task (complexity=3) → No sequential thinking
- Complex task (complexity=8) → Sequential thinking enabled
- Architect role → Always uses sequential thinking
- Design task → Sequential thinking with branching
- Fallback scenario → Graceful degradation
**Success Criteria**:
- Demonstrable improvement in task quality
- Acceptable performance overhead (<30% increase in latency)
- Clear reasoning traces in artifacts
- Documentation complete
---
## Configuration
### Agent Configuration
**File**: `config/agent.yaml`
```yaml
ai_providers:
resetdata:
type: "resetdata"
endpoint: "${RESETDATA_API_ENDPOINT}"
api_key: "${RESETDATA_API_KEY}"
default_model: "llama3.1:70b"
# Sequential thinking configuration
enable_sequential_thinking: true
sequential_thinking:
min_complexity: 7 # Minimum complexity to trigger
force_for_roles: # Always use for these roles
- architect
- senior-developer
max_thoughts: 15 # Maximum thinking iterations
enable_revisions: true # Allow thought revisions
enable_branching: true # Allow exploring alternatives
confidence_threshold: 0.7 # Minimum confidence for final answer
```
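For loading, the YAML above could map onto Go structs along these lines (a sketch; the real CHORUS config types are not shown in this plan):
```go
// SequentialThinkingConfig mirrors the sequential_thinking block above.
type SequentialThinkingConfig struct {
	MinComplexity       int      `yaml:"min_complexity"`
	ForceForRoles       []string `yaml:"force_for_roles"`
	MaxThoughts         int      `yaml:"max_thoughts"`
	EnableRevisions     bool     `yaml:"enable_revisions"`
	EnableBranching     bool     `yaml:"enable_branching"`
	ConfidenceThreshold float64  `yaml:"confidence_threshold"`
}

// ProviderConfig mirrors one entry under ai_providers.
type ProviderConfig struct {
	Type                     string                   `yaml:"type"`
	Endpoint                 string                   `yaml:"endpoint"`
	APIKey                   string                   `yaml:"api_key"`
	DefaultModel             string                   `yaml:"default_model"`
	EnableSequentialThinking bool                     `yaml:"enable_sequential_thinking"`
	SequentialThinking       SequentialThinkingConfig `yaml:"sequential_thinking"`
}
```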
### Runtime Toggle
Allow runtime control via council brief:
```json
{
"task_id": "task-123",
"complexity": 8,
"use_sequential_thinking": true, // Explicit override
"thinking_config": {
"max_thoughts": 20,
"allow_branching": true
}
}
```
---
## Benefits & Expected Improvements
### 1. Better Problem Decomposition
**Before**:
```
Agent: Here's my solution [immediately provides implementation]
```
**After**:
```
Thought 1: Breaking down the task into 3 main components...
Thought 2: Component A requires database schema changes...
Thought 3: Wait, revising thought 2 - migration strategy needs consideration...
Thought 4: Exploring alternative: event sourcing vs direct updates...
Thought 5: Event sourcing better for audit trail requirements...
Final: Implementation plan with 5 concrete steps...
```
### 2. Improved Architecture Decisions
Architect agents can:
- Explore multiple design alternatives
- Revise decisions based on discovered constraints
- Build and verify hypotheses about scalability
- Document reasoning trail for future reference
### 3. Higher Quality Code
Developer agents can:
- Think through edge cases before coding
- Consider multiple implementation approaches
- Revise initial assumptions
- Plan testing strategy upfront
### 4. Debugging Enhancement
When tasks fail:
- Reasoning traces show where agent went wrong
- Can identify flawed assumptions
- Easier to improve prompts and heuristics
---
## Performance Considerations
### 1. Latency Impact
**Estimated Overhead**:
- Sequential thinking: 5-15 LLM calls (vs 1 direct call)
- Expected latency increase: 10-30 seconds for complex tasks
- **Mitigation**: Only use for high-complexity tasks (complexity >= 7)
### 2. Token Usage
**Estimated Increase**:
- Each thought: ~200-500 tokens
- 10 thoughts: ~2000-5000 additional tokens
- **Mitigation**: Set reasonable `max_thoughts` limits
### 3. Resource Requirements
**MCP Server**:
- Sequential thinking MCP server must be available
- Requires proper error handling and fallback
---
## Risks & Mitigations
| Risk | Impact | Mitigation |
|------|--------|------------|
| MCP server unavailable | High | Graceful fallback to direct execution |
| Increased latency unacceptable | Medium | Make sequential thinking opt-in per task |
| Token cost explosion | Medium | Set hard limits on max_thoughts |
| Reasoning doesn't improve quality | High | A/B testing with metrics |
| Complex implementation | Medium | Phased rollout with testing |
---
## Success Metrics
### Quantitative
1. **Task Success Rate**: Compare before/after for complexity >= 7 tasks
- Target: +15% improvement
2. **Code Quality**: Static analysis scores for generated code
- Target: +20% improvement in complexity score
3. **PR Acceptance Rate**: How many agent PRs get merged
- Target: +25% improvement
4. **Latency**: Task execution time
- Acceptable: <30% increase for complex tasks
### Qualitative
1. **Reasoning Quality**: Human review of reasoning traces
2. **Decision Clarity**: Can humans understand agent's thought process?
3. **Developer Feedback**: Easier to debug failed tasks?
---
## Rollout Plan
### Stage 1: Internal Testing (Week 1)
- Deploy to development environment
- Test with synthetic tasks
- Gather performance metrics
- Refine heuristics
### Stage 2: Limited Production (Week 2)
- Enable for architect role only
- Enable for complexity >= 9 only
- Monitor closely
- Collect feedback
### Stage 3: Expanded Rollout (Week 3-4)
- Enable for all roles with complexity >= 7
- Add complexity-based opt-in
- Full production deployment
- Continuous monitoring
### Stage 4: Optimization (Week 5+)
- Fine-tune heuristics based on data
- Optimize thought limits
- Improve reasoning summaries
- Add advanced features (e.g., multi-agent reasoning)
---
## Future Enhancements
### 1. Multi-Agent Reasoning
Multiple agents can contribute thoughts to same reasoning chain:
- Architect proposes design
- Security agent reviews security implications
- Performance agent analyzes scalability
### 2. Reasoning Templates
Pre-defined thinking patterns for common scenarios:
- API design checklist
- Security review framework
- Performance optimization workflow
### 3. Learning from Reasoning
Store successful reasoning patterns:
- Build knowledge base of good reasoning traces
- Use as examples in future tasks
- Identify common pitfalls
### 4. Visualization
Dashboard showing reasoning graphs:
- Thought flow diagrams
- Revision history
- Branch exploration trees
- Confidence evolution
---
## References
- **SequentialThinkingForCHORUS Issue**: (Repository in GITEA)
- **MCP Sequential Thinking Tool**: Available in Claude Code MCP servers
- **CHORUS Task Execution**: `/home/tony/chorus/project-queues/active/CHORUS/pkg/execution/engine.go`
- **AI Provider Interface**: `/home/tony/chorus/project-queues/active/CHORUS/pkg/ai/provider.go`
- **ResetData Provider**: `/home/tony/chorus/project-queues/active/CHORUS/pkg/ai/resetdata.go`
---
## Document Info
- **Created**: 2025-10-13
- **Author**: Claude Code
- **Status**: Design Complete - Ready for Implementation
- **Next Steps**: Begin Phase 1 implementation

go.mod (3 additions)

@@ -12,6 +12,7 @@ require (
 	github.com/docker/go-connections v0.6.0
 	github.com/docker/go-units v0.5.0
 	github.com/go-redis/redis/v8 v8.11.5
+	github.com/golang-jwt/jwt/v5 v5.3.0
 	github.com/google/uuid v1.6.0
 	github.com/gorilla/mux v1.8.1
 	github.com/gorilla/websocket v1.5.0
@@ -23,6 +24,7 @@ require (
 	github.com/multiformats/go-multihash v0.2.3
 	github.com/prometheus/client_golang v1.19.1
 	github.com/robfig/cron/v3 v3.0.1
+	github.com/rs/zerolog v1.32.0
 	github.com/sashabaranov/go-openai v1.41.1
 	github.com/sony/gobreaker v0.5.0
 	github.com/stretchr/testify v1.11.1
@@ -108,6 +110,7 @@ require (
 	github.com/libp2p/go-yamux/v4 v4.0.1 // indirect
 	github.com/libp2p/zeroconf/v2 v2.2.0 // indirect
 	github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd // indirect
+	github.com/mattn/go-colorable v0.1.13 // indirect
 	github.com/mattn/go-isatty v0.0.20 // indirect
 	github.com/miekg/dns v1.1.56 // indirect
 	github.com/mikioh/tcpinfo v0.0.0-20190314235526-30a79bb1804b // indirect

go.sum (11 additions)

@@ -147,6 +147,8 @@ github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7a
 github.com/gogo/protobuf v1.3.1/go.mod h1:SlYgWuQ5SjCEi6WLHjHCa1yvBfUnHcTbrrZtXPKa29o=
 github.com/gogo/protobuf v1.3.2 h1:Ov1cvc58UF3b5XjBnZv7+opcTcQFZebYjWzi34vdm4Q=
 github.com/gogo/protobuf v1.3.2/go.mod h1:P1XiOD3dCwIKUDQYPy72D8LYyHL2YPYrpS2s69NZV8Q=
+github.com/golang-jwt/jwt/v5 v5.3.0 h1:pv4AsKCKKZuqlgs5sUmn4x8UlGa0kEVt/puTpKx9vvo=
+github.com/golang-jwt/jwt/v5 v5.3.0/go.mod h1:fxCRLWMO43lRc8nhHWY6LGqRcf+1gQWArsqaEUEa5bE=
 github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b/go.mod h1:SBH7ygxi8pfUlaOkMMuAQtPIUF8ecWP5IEl/CR7VP2Q=
 github.com/golang/groupcache v0.0.0-20200121045136-8c9f03a8e57e/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
 github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da h1:oI5xCqsCo564l8iNU+DwB5epxmsaqB+rhGL0m5jtYqE=
@@ -304,7 +306,11 @@ github.com/lunixbochs/vtclean v1.0.0/go.mod h1:pHhQNgMf3btfWnGBVipUOjRYhoOsdGqdm
 github.com/mailru/easyjson v0.0.0-20190312143242-1de009706dbe/go.mod h1:C1wdFJiN94OJF2b5HbByQZoLdCWB1Yqtg26g4irojpc=
 github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd h1:br0buuQ854V8u83wA0rVZ8ttrq5CpaPZdvrK0LP2lOk=
 github.com/marten-seemann/tcp v0.0.0-20210406111302-dfbc87cc63fd/go.mod h1:QuCEs1Nt24+FYQEqAAncTDPJIuGs+LxK1MCiFL25pMU=
+github.com/mattn/go-colorable v0.1.13 h1:fFA4WZxdEF4tXPZVKMLwD8oUnCTTo08duU7wxecdEvA=
+github.com/mattn/go-colorable v0.1.13/go.mod h1:7S9/ev0klgBDR4GtXTXX8a3vIGJpMovkB8vQcUbaXHg=
 github.com/mattn/go-isatty v0.0.14/go.mod h1:7GGIvUiUoEMVVmxf/4nioHXj79iQHKdU27kJ6hsGG94=
+github.com/mattn/go-isatty v0.0.16/go.mod h1:kYGgaQfpe5nmfYZH+SKPsOc2e4SrIfOl2e/yFXSvRLM=
+github.com/mattn/go-isatty v0.0.19/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
 github.com/mattn/go-isatty v0.0.20 h1:xfD0iDuEKnDkl03q4limB+vH+GxLEtL/jb4xVJSWWEY=
 github.com/mattn/go-isatty v0.0.20/go.mod h1:W+V8PltTTMOvKvAeJH7IuucS94S2C6jfK/D7dTCTo3Y=
 github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5Ld7szi9bcBfOoFv/3dc6xSMkL2PC0=
@@ -426,6 +432,9 @@ github.com/robfig/cron/v3 v3.0.1/go.mod h1:eQICP3HwyT7UooqI/z+Ov+PtYAWygg1TEWWzG
 github.com/rogpeppe/go-internal v1.3.0/go.mod h1:M8bDsm7K2OlrFYOpmOWEs/qY81heoFRclV5y23lUDJ4=
 github.com/rogpeppe/go-internal v1.12.0 h1:exVL4IDcn6na9z1rAb56Vxr+CgyK3nn3O+epU5NdKM8=
 github.com/rogpeppe/go-internal v1.12.0/go.mod h1:E+RYuTGaKKdloAfM02xzb0FW3Paa99yedzYV+kq4uf4=
+github.com/rs/xid v1.5.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg=
+github.com/rs/zerolog v1.32.0 h1:keLypqrlIjaFsbmJOBdB/qvyF8KEtCWHwobLp5l/mQ0=
+github.com/rs/zerolog v1.32.0/go.mod h1:/7mN4D5sKwJLZQ2b/znpjC3/GQWY/xaDXUM0kKWRHss=
 github.com/russross/blackfriday v1.5.2/go.mod h1:JO/DiYxRf+HjHt06OyowR9PTA263kcR/rfWxYHBV53g=
 github.com/russross/blackfriday/v2 v2.0.1/go.mod h1:+Rmxgy9KzJVeS9/2gXHxylqXiyQDYRxCVz55jmeOWTM=
 github.com/sashabaranov/go-openai v1.41.1 h1:zf5tM+GuxpyiyD9XZg8nCqu52eYFQg9OOew0gnIuDy4=
@@ -620,8 +629,10 @@ golang.org/x/sys v0.0.0-20210426080607-c94f62235c83/go.mod h1:h1NjWce9XRLGQEsW7w
 golang.org/x/sys v0.0.0-20210510120138-977fb7262007/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20210630005230-0f9fa26af87c/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.0.0-20220811171246-fbc7d0a398ab/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.5.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.6.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
+golang.org/x/sys v0.12.0/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
 golang.org/x/sys v0.29.0 h1:TPYlXGxvx1MGTn2GiZDhnjPA9wZzZeGKHHmKhHYvgaU=
 golang.org/x/sys v0.29.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
 golang.org/x/sys v0.35.0 h1:vz1N37gP5bs89s7He8XuIYXpyY0+QlsKmzipCbUtyxI=


@@ -2,8 +2,8 @@ package runtime
 import (
 	"context"
+	"encoding/json"
 	"fmt"
-	"log"
 	"net/http"
 	"os"
 	"path/filepath"
@@ -16,6 +16,7 @@ import (
 	"chorus/internal/backbeat"
 	"chorus/internal/licensing"
 	"chorus/internal/logging"
+	councilnats "chorus/internal/nats"
 	"chorus/p2p"
 	"chorus/pkg/config"
 	"chorus/pkg/dht"
@@ -32,29 +33,38 @@ import (
 	"chorus/reasoning"
 	"github.com/libp2p/go-libp2p/core/peer"
 	"github.com/multiformats/go-multiaddr"
+	"github.com/rs/zerolog"
 )
 
 // Build information - set by main package
 var (
 	AppName       = "CHORUS"
-	AppVersion    = "0.1.0-dev"
+	AppVersion    = "0.5.32"
 	AppCommitHash = "unknown"
 	AppBuildDate  = "unknown"
 )
 
-// SimpleLogger provides basic logging implementation
-type SimpleLogger struct{}
+// SimpleLogger provides structured logging implementation via zerolog
+type SimpleLogger struct {
+	logger zerolog.Logger
+}
+
+func NewSimpleLogger(component string) *SimpleLogger {
+	return &SimpleLogger{
+		logger: logging.ForComponent(component),
+	}
+}
 
 func (l *SimpleLogger) Info(msg string, args ...interface{}) {
-	log.Printf("[INFO] "+msg, args...)
+	l.logger.Info().Msgf(msg, args...)
 }
 
 func (l *SimpleLogger) Warn(msg string, args ...interface{}) {
-	log.Printf("[WARN] "+msg, args...)
+	l.logger.Warn().Msgf(msg, args...)
}
 
 func (l *SimpleLogger) Error(msg string, args ...interface{}) {
-	log.Printf("[ERROR] "+msg, args...)
+	l.logger.Error().Msgf(msg, args...)
 }
 
 // SimpleTaskTracker tracks active tasks for availability reporting
@@ -62,6 +72,7 @@ type SimpleTaskTracker struct {
 	maxTasks          int
 	activeTasks       map[string]bool
 	decisionPublisher *ucxl.DecisionPublisher
+	logger            zerolog.Logger
 }
 
 // GetActiveTasks returns list of active task IDs
@@ -100,9 +111,14 @@ func (t *SimpleTaskTracker) publishTaskCompletion(taskID string, success bool, s
 	}
 
 	if err := t.decisionPublisher.PublishTaskCompletion(taskID, success, summary, filesModified); err != nil {
-		fmt.Printf("⚠️ Failed to publish task completion for %s: %v\n", taskID, err)
+		t.logger.Warn().
+			Err(err).
+			Str("task_id", taskID).
+			Msg("Failed to publish task completion")
 	} else {
-		fmt.Printf("📤 Published task completion decision for: %s\n", taskID)
+		t.logger.Debug().
+			Str("task_id", taskID).
+			Msg("Published task completion decision")
 	}
 }
@@ -131,52 +147,53 @@ type SharedRuntime struct {
 	TaskTracker         *SimpleTaskTracker
 	Metrics             *metrics.CHORUSMetrics
 	Shhh                *shhh.Sentinel
+	CouncilSubscriber   *councilnats.CouncilSubscriber
 }
 
 // Initialize sets up all shared P2P infrastructure components
 func Initialize(appMode string) (*SharedRuntime, error) {
 	runtime := &SharedRuntime{}
-	runtime.Logger = &SimpleLogger{}
+	runtime.Logger = NewSimpleLogger(logging.ComponentRuntime)
 
 	ctx, cancel := context.WithCancel(context.Background())
 	runtime.Context = ctx
 	runtime.Cancel = cancel
 
-	runtime.Logger.Info("🎭 Starting CHORUS v%s (build: %s, %s) - Container-First P2P Task Coordination", AppVersion, AppCommitHash, AppBuildDate)
+	runtime.Logger.Info("Starting CHORUS v%s (build: %s, %s) - Container-First P2P Task Coordination", AppVersion, AppCommitHash, AppBuildDate)
 	runtime.Logger.Info("📦 Container deployment - Mode: %s", appMode)
 
 	// Load configuration from environment (no config files in containers)
-	runtime.Logger.Info("📋 Loading configuration from environment variables...")
+	runtime.Logger.Info("Loading configuration from environment variables...")
 	cfg, err := config.LoadFromEnvironment()
 	if err != nil {
 		return nil, fmt.Errorf("configuration error: %v", err)
 	}
 	runtime.Config = cfg
 	runtime.Logger.Info("Configuration loaded successfully")
 
 	// Initialize runtime configuration with assignment support
 	runtime.RuntimeConfig = config.NewRuntimeConfig(cfg)
 
 	// Load assignment if ASSIGN_URL is configured
 	if assignURL := os.Getenv("ASSIGN_URL"); assignURL != "" {
-		runtime.Logger.Info("📡 Loading assignment from WHOOSH: %s", assignURL)
+		runtime.Logger.Info("Loading assignment from WHOOSH: %s", assignURL)
 		ctx, cancel := context.WithTimeout(runtime.Context, 10*time.Second)
 		if err := runtime.RuntimeConfig.LoadAssignment(ctx, assignURL); err != nil {
-			runtime.Logger.Warn("⚠️ Failed to load assignment (continuing with base config): %v", err)
+			runtime.Logger.Warn("Failed to load assignment (continuing with base config): %v", err)
 		} else {
 			runtime.Logger.Info("Assignment loaded successfully")
 		}
 		cancel()
 
 		// Start reload handler for SIGHUP
 		runtime.RuntimeConfig.StartReloadHandler(runtime.Context, assignURL)
-		runtime.Logger.Info("📡 SIGHUP reload handler started for assignment updates")
+		runtime.Logger.Info("SIGHUP reload handler started for assignment updates")
 	} else {
 		runtime.Logger.Info("⚪ No ASSIGN_URL configured, using static configuration")
 	}
 
-	runtime.Logger.Info("🤖 Agent ID: %s", cfg.Agent.ID)
+	runtime.Logger.Info("Agent ID: %s", cfg.Agent.ID)
 	runtime.Logger.Info("🎯 Specialization: %s", cfg.Agent.Specialization)
 
 	// CRITICAL: Validate license before any P2P operations
@@ -185,18 +202,19 @@ func Initialize(appMode string) (*SharedRuntime, error) {
 		LicenseID:  cfg.License.LicenseID,
 		ClusterID:  cfg.License.ClusterID,
 		KachingURL: cfg.License.KachingURL,
+		Version:    AppVersion,
 	})
 	if err := licenseValidator.Validate(); err != nil {
 		return nil, fmt.Errorf("license validation failed: %v", err)
 	}
 	runtime.Logger.Info("License validation successful - CHORUS authorized to run")
 
 	// Initialize AI provider configuration
 	runtime.Logger.Info("🧠 Configuring AI provider: %s", cfg.AI.Provider)
 	if err := initializeAIProvider(cfg, runtime.Logger); err != nil {
 		return nil, fmt.Errorf("AI provider initialization failed: %v", err)
 	}
 	runtime.Logger.Info("AI provider configured successfully")
 
 	// Initialize metrics collector
 	runtime.Metrics = metrics.NewCHORUSMetrics(nil)
@@ -217,11 +235,11 @@ func Initialize(appMode string) (*SharedRuntime, error) {
 	var backbeatIntegration *backbeat.Integration
 	backbeatIntegration, err = backbeat.NewIntegration(cfg, cfg.Agent.ID, runtime.Logger)
 	if err != nil {
-		runtime.Logger.Warn("⚠️ BACKBEAT integration initialization failed: %v", err)
+		runtime.Logger.Warn("BACKBEAT integration initialization failed: %v", err)
 		runtime.Logger.Info("📍 P2P operations will run without beat synchronization")
 	} else {
 		if err := backbeatIntegration.Start(ctx); err != nil {
-			runtime.Logger.Warn("⚠️ Failed to start BACKBEAT integration: %v", err)
+			runtime.Logger.Warn("Failed to start BACKBEAT integration: %v", err)
 			backbeatIntegration = nil
 		} else {
 			runtime.Logger.Info("🎵 BACKBEAT integration started successfully")
@@ -229,6 +247,29 @@ func Initialize(appMode string) (*SharedRuntime, error) {
 	}
 	runtime.BackbeatIntegration = backbeatIntegration
 
+	// Fetch bootstrap peers from WHOOSH for P2P mesh formation
+	runtime.Logger.Info("Fetching bootstrap peers from WHOOSH...")
+	bootstrapPeers, err := fetchBootstrapPeers(cfg.WHOOSHAPI.BaseURL, runtime.Logger)
+	if err != nil {
+		runtime.Logger.Warn("Failed to fetch bootstrap peers from WHOOSH: %v", err)
+		runtime.Logger.Info("Falling back to static bootstrap configuration")
+		bootstrapPeers = getStaticBootstrapPeers(runtime.Logger)
+	} else {
+		runtime.Logger.Info("Fetched %d bootstrap peers from WHOOSH", len(bootstrapPeers))
+	}
+
+	// Set bootstrap peers in config for P2P node initialization
+	if len(bootstrapPeers) > 0 {
+		cfg.V2.DHT.BootstrapPeers = make([]string, len(bootstrapPeers))
+		for i, peer := range bootstrapPeers {
+			for _, addr := range peer.Addrs {
+				// Convert to full multiaddr with peer ID
+				cfg.V2.DHT.BootstrapPeers[i] = fmt.Sprintf("%s/p2p/%s", addr.String(), peer.ID.String())
+				break // Use first address
+			}
+		}
+	}
+
 	// Initialize P2P node
 	node, err := p2p.NewNode(ctx)
 	if err != nil {
@@ -243,6 +284,35 @@ func Initialize(appMode string) (*SharedRuntime, error) {
 		runtime.Logger.Info("  %s/p2p/%s", addr, node.ID())
 	}
 
+	// Wait for bootstrap peers to connect before proceeding
+	// This prevents election race conditions where elections start before peer discovery
+	// Increased from 5s to 15s to allow more time for P2P mesh formation
+	if len(bootstrapPeers) > 0 {
+		runtime.Logger.Info("Waiting 15 seconds for bootstrap peer connections to establish...")
+		runtime.Logger.Info("   Target peers: %d bootstrap peers", len(bootstrapPeers))
+
+		// Poll connectivity every 3 seconds to provide feedback
+		for i := 0; i < 5; i++ {
+			time.Sleep(3 * time.Second)
+			connectedPeers := len(node.Peers())
+			runtime.Logger.Info("   [%ds] Connected to %d peers", (i+1)*3, connectedPeers)
+
+			// If we've connected to at least half the bootstrap peers, we're in good shape
+			if connectedPeers >= len(bootstrapPeers)/2 && connectedPeers > 0 {
+				runtime.Logger.Info("Bootstrap connectivity achieved (%d/%d peers), proceeding early",
+					connectedPeers, len(bootstrapPeers))
+				break
+			}
+		}
+
+		finalConnected := len(node.Peers())
+		if finalConnected == 0 {
+			runtime.Logger.Warn("Bootstrap complete but NO peers connected - mesh may be isolated")
+		} else {
+			runtime.Logger.Info("Bootstrap grace period complete - %d peers connected", finalConnected)
+		}
+	}
+
 	// Initialize Hypercore-style logger for P2P coordination
 	hlog := logging.NewHypercoreLog(node.ID())
 	if runtime.Shhh != nil {
@@ -269,7 +339,7 @@ func Initialize(appMode string) (*SharedRuntime, error) {
 	}
 	runtime.PubSub = ps
 
-	runtime.Logger.Info("📡 PubSub system initialized")
+	runtime.Logger.Info("PubSub system initialized")
 
 	// Join role-based topics if role is configured
 	if cfg.Agent.Role != "" {
@@ -278,7 +348,7 @@ func Initialize(appMode string) (*SharedRuntime, error) {
 			reportsTo = []string{cfg.Agent.ReportsTo}
 		}
 		if err := ps.JoinRoleBasedTopics(cfg.Agent.Role, cfg.Agent.Expertise, reportsTo); err != nil {
-			runtime.Logger.Warn("⚠️ Failed to join role-based topics: %v", err)
+			runtime.Logger.Warn("Failed to join role-based topics: %v", err)
 		} else {
 			runtime.Logger.Info("🎯 Joined role-based collaboration topics")
 		}
@@ -302,7 +372,7 @@ func Initialize(appMode string) (*SharedRuntime, error) {
 // Cleanup properly shuts down all runtime components
 func (r *SharedRuntime) Cleanup() {
-	r.Logger.Info("🔄 Starting graceful shutdown...")
+	r.Logger.Info("Starting graceful shutdown...")
 
 	if r.BackbeatIntegration != nil {
 		r.BackbeatIntegration.Stop()
@@ -310,7 +380,7 @@ func (r *SharedRuntime) Cleanup() {
 	if r.MDNSDiscovery != nil {
 		r.MDNSDiscovery.Close()
-		r.Logger.Info("🔍 mDNS discovery closed")
+		r.Logger.Info("mDNS discovery closed")
 	}
 
 	if r.PubSub != nil {
@@ -329,6 +399,12 @@ func (r *SharedRuntime) Cleanup() {
 		r.HTTPServer.Stop()
 	}
 
+	if r.CouncilSubscriber != nil {
+		if err := r.CouncilSubscriber.Close(); err != nil {
r.Logger.Warn("Failed to close council NATS subscriber: %v", err)
}
}
if r.UCXIServer != nil { if r.UCXIServer != nil {
r.UCXIServer.Stop() r.UCXIServer.Stop()
} }
@@ -341,7 +417,7 @@ func (r *SharedRuntime) Cleanup() {
r.Cancel() r.Cancel()
} }
r.Logger.Info("CHORUS shutdown completed") r.Logger.Info("CHORUS shutdown completed")
} }
// Helper methods for initialization (extracted from main.go) // Helper methods for initialization (extracted from main.go)
@@ -349,6 +425,15 @@ func (r *SharedRuntime) initializeElectionSystem() error {
// === Admin Election System === // === Admin Election System ===
electionManager := election.NewElectionManager(r.Context, r.Config, r.Node.Host(), r.PubSub, r.Node.ID().ShortString()) electionManager := election.NewElectionManager(r.Context, r.Config, r.Node.Host(), r.PubSub, r.Node.ID().ShortString())
if r.BackbeatIntegration != nil {
electionManager.SetTempoResolver(func() int {
return r.BackbeatIntegration.CurrentTempoBPM()
})
electionManager.SetBeatGapResolver(func() time.Duration {
return r.BackbeatIntegration.TimeSinceLastBeat()
})
}
// Set election callbacks with BACKBEAT integration // Set election callbacks with BACKBEAT integration
electionManager.SetCallbacks( electionManager.SetCallbacks(
func(oldAdmin, newAdmin string) { func(oldAdmin, newAdmin string) {
@@ -372,7 +457,7 @@ func (r *SharedRuntime) initializeElectionSystem() error {
r.Config.Slurp.Enabled = true r.Config.Slurp.Enabled = true
// Apply admin role configuration // Apply admin role configuration
if err := r.Config.ApplyRoleDefinition("admin"); err != nil { if err := r.Config.ApplyRoleDefinition("admin"); err != nil {
r.Logger.Warn("⚠️ Failed to apply admin role: %v", err) r.Logger.Warn("Failed to apply admin role: %v", err)
} }
} }
}, },
@@ -396,7 +481,7 @@ func (r *SharedRuntime) initializeElectionSystem() error {
return fmt.Errorf("failed to start election manager: %v", err) return fmt.Errorf("failed to start election manager: %v", err)
} }
r.ElectionManager = electionManager r.ElectionManager = electionManager
r.Logger.Info("Election manager started with automated heartbeat management") r.Logger.Info("Election manager started with automated heartbeat management")
return nil return nil
} }
@@ -412,7 +497,7 @@ func (r *SharedRuntime) initializeDHTStorage() error {
var err error var err error
dhtNode, err = dht.NewLibP2PDHT(r.Context, r.Node.Host()) dhtNode, err = dht.NewLibP2PDHT(r.Context, r.Node.Host())
if err != nil { if err != nil {
r.Logger.Warn("⚠️ Failed to create DHT: %v", err) r.Logger.Warn("Failed to create DHT: %v", err)
} else { } else {
r.Logger.Info("🕸️ DHT initialized") r.Logger.Info("🕸️ DHT initialized")
@@ -424,14 +509,14 @@ func (r *SharedRuntime) initializeDHTStorage() error {
} }
if err := dhtNode.Bootstrap(); err != nil { if err := dhtNode.Bootstrap(); err != nil {
r.Logger.Warn("⚠️ DHT bootstrap failed: %v", err) r.Logger.Warn("DHT bootstrap failed: %v", err)
r.BackbeatIntegration.FailP2POperation(operationID, err.Error()) r.BackbeatIntegration.FailP2POperation(operationID, err.Error())
} else { } else {
r.BackbeatIntegration.CompleteP2POperation(operationID, 1) r.BackbeatIntegration.CompleteP2POperation(operationID, 1)
} }
} else { } else {
if err := dhtNode.Bootstrap(); err != nil { if err := dhtNode.Bootstrap(); err != nil {
r.Logger.Warn("⚠️ DHT bootstrap failed: %v", err) r.Logger.Warn("DHT bootstrap failed: %v", err)
} }
} }
@@ -451,14 +536,14 @@ func (r *SharedRuntime) initializeDHTStorage() error {
for _, addrStr := range bootstrapPeers { for _, addrStr := range bootstrapPeers {
addr, err := multiaddr.NewMultiaddr(addrStr) addr, err := multiaddr.NewMultiaddr(addrStr)
if err != nil { if err != nil {
r.Logger.Warn("⚠️ Invalid bootstrap address %s: %v", addrStr, err) r.Logger.Warn("Invalid bootstrap address %s: %v", addrStr, err)
continue continue
} }
// Extract peer info from multiaddr // Extract peer info from multiaddr
info, err := peer.AddrInfoFromP2pAddr(addr) info, err := peer.AddrInfoFromP2pAddr(addr)
if err != nil { if err != nil {
r.Logger.Warn("⚠️ Failed to parse peer info from %s: %v", addrStr, err) r.Logger.Warn("Failed to parse peer info from %s: %v", addrStr, err)
continue continue
} }
@@ -471,7 +556,7 @@ func (r *SharedRuntime) initializeDHTStorage() error {
r.BackbeatIntegration.UpdateP2POperationPhase(operationID, backbeat.PhaseConnecting, 0) r.BackbeatIntegration.UpdateP2POperationPhase(operationID, backbeat.PhaseConnecting, 0)
if err := r.Node.Host().Connect(r.Context, *info); err != nil { if err := r.Node.Host().Connect(r.Context, *info); err != nil {
r.Logger.Warn("⚠️ Failed to connect to bootstrap peer %s: %v", addrStr, err) r.Logger.Warn("Failed to connect to bootstrap peer %s: %v", addrStr, err)
r.BackbeatIntegration.FailP2POperation(operationID, err.Error()) r.BackbeatIntegration.FailP2POperation(operationID, err.Error())
} else { } else {
r.Logger.Info("🔗 Connected to DHT bootstrap peer: %s", addrStr) r.Logger.Info("🔗 Connected to DHT bootstrap peer: %s", addrStr)
@@ -480,7 +565,7 @@ func (r *SharedRuntime) initializeDHTStorage() error {
} }
} else { } else {
if err := r.Node.Host().Connect(r.Context, *info); err != nil { if err := r.Node.Host().Connect(r.Context, *info); err != nil {
r.Logger.Warn("⚠️ Failed to connect to bootstrap peer %s: %v", addrStr, err) r.Logger.Warn("Failed to connect to bootstrap peer %s: %v", addrStr, err)
} else { } else {
r.Logger.Info("🔗 Connected to DHT bootstrap peer: %s", addrStr) r.Logger.Info("🔗 Connected to DHT bootstrap peer: %s", addrStr)
} }
@@ -508,7 +593,7 @@ func (r *SharedRuntime) initializeDHTStorage() error {
r.Node.ID().ShortString(), r.Node.ID().ShortString(),
r.Config.Agent.ID, r.Config.Agent.ID,
) )
r.Logger.Info("📤 Decision publisher initialized") r.Logger.Info("Decision publisher initialized")
} }
} else { } else {
r.Logger.Info("⚪ DHT disabled in configuration") r.Logger.Info("⚪ DHT disabled in configuration")
@@ -526,12 +611,13 @@ func (r *SharedRuntime) initializeServices() error {
taskTracker := &SimpleTaskTracker{ taskTracker := &SimpleTaskTracker{
maxTasks: r.Config.Agent.MaxTasks, maxTasks: r.Config.Agent.MaxTasks,
activeTasks: make(map[string]bool), activeTasks: make(map[string]bool),
logger: logging.ForComponent(logging.ComponentRuntime),
} }
// Connect decision publisher to task tracker if available // Connect decision publisher to task tracker if available
if r.DecisionPublisher != nil { if r.DecisionPublisher != nil {
taskTracker.decisionPublisher = r.DecisionPublisher taskTracker.decisionPublisher = r.DecisionPublisher
r.Logger.Info("📤 Task completion decisions will be published to DHT") r.Logger.Info("Task completion decisions will be published to DHT")
} }
r.TaskTracker = taskTracker r.TaskTracker = taskTracker
@@ -548,18 +634,34 @@ func (r *SharedRuntime) initializeServices() error {
taskCoordinator.Start() taskCoordinator.Start()
r.TaskCoordinator = taskCoordinator r.TaskCoordinator = taskCoordinator
r.Logger.Info("Task coordination system active") r.Logger.Info("Task coordination system active")
// Start HTTP API server // Start HTTP API server
httpServer := api.NewHTTPServer(r.Config.Network.APIPort, r.HypercoreLog, r.PubSub) httpServer := api.NewHTTPServer(r.Config, r.Node, r.HypercoreLog, r.PubSub)
go func() { go func() {
r.Logger.Info("🌐 HTTP API server starting on :%d", r.Config.Network.APIPort) r.Logger.Info("HTTP API server starting on :%d", r.Config.Network.APIPort)
if err := httpServer.Start(); err != nil && err != http.ErrServerClosed { if err := httpServer.Start(); err != nil && err != http.ErrServerClosed {
r.Logger.Error("HTTP server error: %v", err) r.Logger.Error("HTTP server error: %v", err)
} }
}() }()
r.HTTPServer = httpServer r.HTTPServer = httpServer
// Enable NATS-based council opportunity delivery.
natsURL := strings.TrimSpace(os.Getenv("CHORUS_COUNCIL_NATS_URL"))
if natsURL == "" {
natsURL = strings.TrimSpace(os.Getenv("CHORUS_BACKBEAT_NATS_URL"))
}
if natsURL == "" {
natsURL = "nats://backbeat-nats:4222"
}
if subscriber, err := councilnats.NewCouncilSubscriber(natsURL, httpServer.CouncilManager, httpServer.WhooshEndpoint()); err != nil {
r.Logger.Warn("Council NATS subscriber disabled: %v", err)
} else {
r.CouncilSubscriber = subscriber
r.Logger.Info("Council opportunities via NATS enabled (url=%s)", natsURL)
}
// === UCXI Server Integration === // === UCXI Server Integration ===
var ucxiServer *ucxi.Server var ucxiServer *ucxi.Server
if r.Config.UCXL.Enabled && r.Config.UCXL.Server.Enabled { if r.Config.UCXL.Enabled && r.Config.UCXL.Server.Enabled {
@@ -570,7 +672,7 @@ func (r *SharedRuntime) initializeServices() error {
storage, err := ucxi.NewBasicContentStorage(storageDir) storage, err := ucxi.NewBasicContentStorage(storageDir)
if err != nil { if err != nil {
r.Logger.Warn("⚠️ Failed to create UCXI storage: %v", err) r.Logger.Warn("Failed to create UCXI storage: %v", err)
} else { } else {
resolver := ucxi.NewBasicAddressResolver(r.Node.ID().ShortString()) resolver := ucxi.NewBasicAddressResolver(r.Node.ID().ShortString())
resolver.SetDefaultTTL(r.Config.UCXL.Resolution.CacheTTL) resolver.SetDefaultTTL(r.Config.UCXL.Resolution.CacheTTL)
@@ -580,14 +682,14 @@ func (r *SharedRuntime) initializeServices() error {
BasePath: r.Config.UCXL.Server.BasePath, BasePath: r.Config.UCXL.Server.BasePath,
Resolver: resolver, Resolver: resolver,
Storage: storage, Storage: storage,
Logger: ucxi.SimpleLogger{}, Logger: ucxi.NewSimpleLogger(logging.ComponentUCXI),
} }
ucxiServer = ucxi.NewServer(ucxiConfig) ucxiServer = ucxi.NewServer(ucxiConfig)
go func() { go func() {
r.Logger.Info("🔗 UCXI server starting on :%d", r.Config.UCXL.Server.Port) r.Logger.Info("🔗 UCXI server starting on :%d", r.Config.UCXL.Server.Port)
if err := ucxiServer.Start(); err != nil && err != http.ErrServerClosed { if err := ucxiServer.Start(); err != nil && err != http.ErrServerClosed {
r.Logger.Error("UCXI server error: %v", err) r.Logger.Error("UCXI server error: %v", err)
} }
}() }()
} }
@@ -637,7 +739,7 @@ func initializeAIProvider(cfg *config.Config, logger *SimpleLogger) error {
Timeout: cfg.AI.ResetData.Timeout, Timeout: cfg.AI.ResetData.Timeout,
} }
reasoning.SetResetDataConfig(resetdataConfig) reasoning.SetResetDataConfig(resetdataConfig)
logger.Info("🌐 ResetData AI provider configured - Endpoint: %s, Model: %s", logger.Info("ResetData AI provider configured - Endpoint: %s, Model: %s",
cfg.AI.ResetData.BaseURL, cfg.AI.ResetData.Model) cfg.AI.ResetData.BaseURL, cfg.AI.ResetData.Model)
case "ollama": case "ollama":
@@ -645,7 +747,7 @@ func initializeAIProvider(cfg *config.Config, logger *SimpleLogger) error {
logger.Info("🦙 Ollama AI provider configured - Endpoint: %s", cfg.AI.Ollama.Endpoint) logger.Info("🦙 Ollama AI provider configured - Endpoint: %s", cfg.AI.Ollama.Endpoint)
default: default:
logger.Warn("⚠️ Unknown AI provider '%s', defaulting to resetdata", cfg.AI.Provider) logger.Warn("Unknown AI provider '%s', defaulting to resetdata", cfg.AI.Provider)
if cfg.AI.ResetData.APIKey == "" { if cfg.AI.ResetData.APIKey == "" {
return fmt.Errorf("RESETDATA_API_KEY environment variable is required for default resetdata provider") return fmt.Errorf("RESETDATA_API_KEY environment variable is required for default resetdata provider")
} }
@@ -700,9 +802,95 @@ func initializeAIProvider(cfg *config.Config, logger *SimpleLogger) error {
logger.Info("📚 LightRAG RAG system enabled - Endpoint: %s, Mode: %s", logger.Info("📚 LightRAG RAG system enabled - Endpoint: %s, Mode: %s",
cfg.LightRAG.BaseURL, cfg.LightRAG.DefaultMode) cfg.LightRAG.BaseURL, cfg.LightRAG.DefaultMode)
} else { } else {
logger.Warn("⚠️ LightRAG enabled but server not healthy at %s", cfg.LightRAG.BaseURL) logger.Warn("LightRAG enabled but server not healthy at %s", cfg.LightRAG.BaseURL)
} }
} }
return nil return nil
} }
// fetchBootstrapPeers fetches bootstrap peer list from WHOOSH
func fetchBootstrapPeers(whooshURL string, logger *SimpleLogger) ([]peer.AddrInfo, error) {
client := &http.Client{Timeout: 10 * time.Second}
url := fmt.Sprintf("%s/api/v1/bootstrap-peers", whooshURL)
logger.Info("Fetching bootstrap peers from: %s", url)
resp, err := client.Get(url)
if err != nil {
return nil, fmt.Errorf("failed to fetch bootstrap peers: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("bootstrap endpoint returned status %d", resp.StatusCode)
}
var result struct {
BootstrapPeers []struct {
Multiaddr string `json:"multiaddr"`
PeerID string `json:"peer_id"`
Name string `json:"name"`
Priority int `json:"priority"`
} `json:"bootstrap_peers"`
}
if err := json.NewDecoder(resp.Body).Decode(&result); err != nil {
return nil, fmt.Errorf("failed to decode bootstrap peers: %w", err)
}
// Convert to peer.AddrInfo format
peers := make([]peer.AddrInfo, 0, len(result.BootstrapPeers))
for _, bp := range result.BootstrapPeers {
maddr, err := multiaddr.NewMultiaddr(bp.Multiaddr)
if err != nil {
logger.Warn("Invalid multiaddr %s: %v", bp.Multiaddr, err)
continue
}
peerID, err := peer.Decode(bp.PeerID)
if err != nil {
logger.Warn("Invalid peer ID %s: %v", bp.PeerID, err)
continue
}
peers = append(peers, peer.AddrInfo{
ID: peerID,
Addrs: []multiaddr.Multiaddr{maddr},
})
logger.Info(" Bootstrap peer: %s (%s, priority %d)", bp.Name, bp.PeerID, bp.Priority)
}
return peers, nil
}
// getStaticBootstrapPeers returns a static fallback list of bootstrap peers
func getStaticBootstrapPeers(logger *SimpleLogger) []peer.AddrInfo {
logger.Warn("Using static bootstrap peer configuration (fallback)")
// Static HMMM monitor peer (if WHOOSH is unavailable)
staticPeers := []string{
"/ip4/172.27.0.6/tcp/9001/p2p/12D3KooWBhVfNETuGyjsrGwmhny7vnJzP1y7H59oqmq1VAPTzQMW",
}
peers := make([]peer.AddrInfo, 0, len(staticPeers))
for _, peerStr := range staticPeers {
maddr, err := multiaddr.NewMultiaddr(peerStr)
if err != nil {
logger.Warn("Invalid static multiaddr %s: %v", peerStr, err)
continue
}
addrInfo, err := peer.AddrInfoFromP2pAddr(maddr)
if err != nil {
logger.Warn("Failed to parse static peer address %s: %v", peerStr, err)
continue
}
peers = append(peers, *addrInfo)
logger.Info(" 📌 Static bootstrap peer: %s", addrInfo.ID.ShortString())
}
return peers
}
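For context, fetchBootstrapPeers above expects WHOOSH to answer `GET /api/v1/bootstrap-peers` with a `bootstrap_peers` array of multiaddr/peer-ID records. A minimal sketch of a compatible handler follows; the field names mirror the struct decoded in fetchBootstrapPeers, while the peer values themselves are hypothetical (the peer ID is reused from the static fallback above purely for illustration):

```go
// Sketch of a WHOOSH-compatible bootstrap-peers endpoint.
package main

import (
	"encoding/json"
	"net/http"
)

type bootstrapPeer struct {
	Multiaddr string `json:"multiaddr"`
	PeerID    string `json:"peer_id"`
	Name      string `json:"name"`
	Priority  int    `json:"priority"`
}

func main() {
	http.HandleFunc("/api/v1/bootstrap-peers", func(w http.ResponseWriter, r *http.Request) {
		w.Header().Set("Content-Type", "application/json")
		_ = json.NewEncoder(w).Encode(map[string][]bootstrapPeer{
			"bootstrap_peers": {
				{
					Multiaddr: "/ip4/10.0.0.10/tcp/9001", // hypothetical address
					PeerID:    "12D3KooWBhVfNETuGyjsrGwmhny7vnJzP1y7H59oqmq1VAPTzQMW",
					Name:      "hmmm-monitor", // hypothetical name
					Priority:  1,
				},
			},
		})
	})
	_ = http.ListenAndServe(":8800", nil) // port is an assumption
}
```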

View File

@@ -0,0 +1,126 @@
package ageio
import (
"bytes"
"fmt"
"io"
"os"
"filippo.io/age"
)
// Encryptor handles age encryption operations
type Encryptor struct {
recipients []age.Recipient
}
// Decryptor handles age decryption operations
type Decryptor struct {
identities []age.Identity
}
// NewEncryptor creates an encryptor from a recipients file
func NewEncryptor(recipientsPath string) (*Encryptor, error) {
if recipientsPath == "" {
return nil, fmt.Errorf("recipients path is empty")
}
data, err := os.ReadFile(recipientsPath)
if err != nil {
return nil, fmt.Errorf("read recipients file: %w", err)
}
recipients, err := age.ParseRecipients(bytes.NewReader(data))
if err != nil {
return nil, fmt.Errorf("parse recipients: %w", err)
}
if len(recipients) == 0 {
return nil, fmt.Errorf("no recipients found in file")
}
return &Encryptor{recipients: recipients}, nil
}
// NewDecryptor creates a decryptor from an identity file
func NewDecryptor(identityPath string) (*Decryptor, error) {
if identityPath == "" {
return nil, fmt.Errorf("identity path is empty")
}
data, err := os.ReadFile(identityPath)
if err != nil {
return nil, fmt.Errorf("read identity file: %w", err)
}
identities, err := age.ParseIdentities(bytes.NewReader(data))
if err != nil {
return nil, fmt.Errorf("parse identities: %w", err)
}
if len(identities) == 0 {
return nil, fmt.Errorf("no identities found in file")
}
return &Decryptor{identities: identities}, nil
}
// Encrypt encrypts plaintext data with age
func (e *Encryptor) Encrypt(plaintext []byte) ([]byte, error) {
if len(plaintext) == 0 {
return nil, fmt.Errorf("plaintext is empty")
}
var buf bytes.Buffer
w, err := age.Encrypt(&buf, e.recipients...)
if err != nil {
return nil, fmt.Errorf("create encryptor: %w", err)
}
if _, err := w.Write(plaintext); err != nil {
return nil, fmt.Errorf("write plaintext: %w", err)
}
if err := w.Close(); err != nil {
return nil, fmt.Errorf("close encryptor: %w", err)
}
return buf.Bytes(), nil
}
// Decrypt decrypts age-encrypted data
func (d *Decryptor) Decrypt(ciphertext []byte) ([]byte, error) {
if len(ciphertext) == 0 {
return nil, fmt.Errorf("ciphertext is empty")
}
r, err := age.Decrypt(bytes.NewReader(ciphertext), d.identities...)
if err != nil {
return nil, fmt.Errorf("create decryptor: %w", err)
}
plaintext, err := io.ReadAll(r)
if err != nil {
return nil, fmt.Errorf("read plaintext: %w", err)
}
return plaintext, nil
}
// EncryptStream creates an encrypted writer for streaming
func (e *Encryptor) EncryptStream(w io.Writer) (io.WriteCloser, error) {
ew, err := age.Encrypt(w, e.recipients...)
if err != nil {
return nil, fmt.Errorf("create stream encryptor: %w", err)
}
return ew, nil
}
// DecryptStream creates a decrypted reader for streaming
func (d *Decryptor) DecryptStream(r io.Reader) (io.Reader, error) {
dr, err := age.Decrypt(r, d.identities...)
if err != nil {
return nil, fmt.Errorf("create stream decryptor: %w", err)
}
return dr, nil
}
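A minimal round-trip sketch for this package, assuming the repo's import path and using hypothetical key file paths (in practice the keys come from mounted secrets; GenerateTestKeyPair below produces a throwaway pair for local experiments):

```go
// Sketch: encrypt/decrypt round trip with ageio.
package main

import (
	"fmt"
	"log"

	"chorus/pkg/seqthink/ageio" // assumed import path
)

func main() {
	enc, err := ageio.NewEncryptor("/run/keys/age.pub") // hypothetical path
	if err != nil {
		log.Fatalf("encryptor: %v", err)
	}
	dec, err := ageio.NewDecryptor("/run/keys/age.key") // hypothetical path
	if err != nil {
		log.Fatalf("decryptor: %v", err)
	}

	ciphertext, err := enc.Encrypt([]byte(`{"tool":"sequentialthinking"}`))
	if err != nil {
		log.Fatalf("encrypt: %v", err)
	}
	plaintext, err := dec.Decrypt(ciphertext)
	if err != nil {
		log.Fatalf("decrypt: %v", err)
	}
	fmt.Println(string(plaintext))
}
```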

View File

@@ -0,0 +1,291 @@
package ageio
import (
"bytes"
"os"
"path/filepath"
"testing"
"filippo.io/age"
)
func TestEncryptDecryptRoundTrip(t *testing.T) {
// Generate test key pair
tmpDir := t.TempDir()
identityPath, recipientPath, err := GenerateTestKeyPair(tmpDir)
if err != nil {
t.Fatalf("generate test key pair: %v", err)
}
// Create encryptor and decryptor
enc, err := NewEncryptor(recipientPath)
if err != nil {
t.Fatalf("create encryptor: %v", err)
}
dec, err := NewDecryptor(identityPath)
if err != nil {
t.Fatalf("create decryptor: %v", err)
}
// Test data
testCases := []struct {
name string
plaintext []byte
}{
{
name: "simple text",
plaintext: []byte("hello world"),
},
{
name: "json data",
plaintext: []byte(`{"tool":"sequentialthinking","payload":{"thought":"test"}}`),
},
{
name: "large data",
plaintext: bytes.Repeat([]byte("ABCDEFGHIJ"), 1000), // 10KB
},
{
name: "unicode",
plaintext: []byte("Hello 世界 🌍"),
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
// Encrypt
ciphertext, err := enc.Encrypt(tc.plaintext)
if err != nil {
t.Fatalf("encrypt: %v", err)
}
// Verify ciphertext is not empty and different from plaintext
if len(ciphertext) == 0 {
t.Fatal("ciphertext is empty")
}
if bytes.Equal(ciphertext, tc.plaintext) {
t.Fatal("ciphertext equals plaintext (not encrypted)")
}
// Decrypt
decrypted, err := dec.Decrypt(ciphertext)
if err != nil {
t.Fatalf("decrypt: %v", err)
}
// Verify decrypted matches original
if !bytes.Equal(decrypted, tc.plaintext) {
t.Fatalf("decrypted data doesn't match original\ngot: %q\nwant: %q", decrypted, tc.plaintext)
}
})
}
}
func TestEncryptEmptyData(t *testing.T) {
tmpDir := t.TempDir()
_, recipientPath, err := GenerateTestKeyPair(tmpDir)
if err != nil {
t.Fatalf("generate test key pair: %v", err)
}
enc, err := NewEncryptor(recipientPath)
if err != nil {
t.Fatalf("create encryptor: %v", err)
}
_, err = enc.Encrypt([]byte{})
if err == nil {
t.Fatal("expected error encrypting empty data")
}
}
func TestDecryptEmptyData(t *testing.T) {
tmpDir := t.TempDir()
identityPath, _, err := GenerateTestKeyPair(tmpDir)
if err != nil {
t.Fatalf("generate test key pair: %v", err)
}
dec, err := NewDecryptor(identityPath)
if err != nil {
t.Fatalf("create decryptor: %v", err)
}
_, err = dec.Decrypt([]byte{})
if err == nil {
t.Fatal("expected error decrypting empty data")
}
}
func TestDecryptInvalidCiphertext(t *testing.T) {
tmpDir := t.TempDir()
identityPath, _, err := GenerateTestKeyPair(tmpDir)
if err != nil {
t.Fatalf("generate test key pair: %v", err)
}
dec, err := NewDecryptor(identityPath)
if err != nil {
t.Fatalf("create decryptor: %v", err)
}
// Try to decrypt garbage data
_, err = dec.Decrypt([]byte("not a valid age ciphertext"))
if err == nil {
t.Fatal("expected error decrypting invalid ciphertext")
}
}
func TestDecryptWrongKey(t *testing.T) {
tmpDir := t.TempDir()
// Generate two separate key pairs
identity1Path := filepath.Join(tmpDir, "key1.age")
recipient1Path := filepath.Join(tmpDir, "key1.pub")
identity2Path := filepath.Join(tmpDir, "key2.age")
// Create first key pair
id1, err := age.GenerateX25519Identity()
if err != nil {
t.Fatalf("generate key 1: %v", err)
}
if err := os.WriteFile(identity1Path, []byte(id1.String()+"\n"), 0600); err != nil {
t.Fatalf("write identity 1: %v", err)
}
if err := os.WriteFile(recipient1Path, []byte(id1.Recipient().String()+"\n"), 0644); err != nil {
t.Fatalf("write recipient 1: %v", err)
}
// Create second key pair
id2, err := age.GenerateX25519Identity()
if err != nil {
t.Fatalf("generate key 2: %v", err)
}
if err := os.WriteFile(identity2Path, []byte(id2.String()+"\n"), 0600); err != nil {
t.Fatalf("write identity 2: %v", err)
}
// Encrypt with key 1
enc, err := NewEncryptor(recipient1Path)
if err != nil {
t.Fatalf("create encryptor: %v", err)
}
ciphertext, err := enc.Encrypt([]byte("secret message"))
if err != nil {
t.Fatalf("encrypt: %v", err)
}
// Try to decrypt with key 2 (should fail)
dec, err := NewDecryptor(identity2Path)
if err != nil {
t.Fatalf("create decryptor: %v", err)
}
_, err = dec.Decrypt(ciphertext)
if err == nil {
t.Fatal("expected error decrypting with wrong key")
}
}
func TestNewEncryptorInvalidPath(t *testing.T) {
_, err := NewEncryptor("/nonexistent/path/to/recipients")
if err == nil {
t.Fatal("expected error with nonexistent recipients file")
}
}
func TestNewDecryptorInvalidPath(t *testing.T) {
_, err := NewDecryptor("/nonexistent/path/to/identity")
if err == nil {
t.Fatal("expected error with nonexistent identity file")
}
}
func TestNewEncryptorEmptyPath(t *testing.T) {
_, err := NewEncryptor("")
if err == nil {
t.Fatal("expected error with empty recipients path")
}
}
func TestNewDecryptorEmptyPath(t *testing.T) {
_, err := NewDecryptor("")
if err == nil {
t.Fatal("expected error with empty identity path")
}
}
func TestStreamingEncryptDecrypt(t *testing.T) {
// Generate test key pair
tmpDir := t.TempDir()
identityPath, recipientPath, err := GenerateTestKeyPair(tmpDir)
if err != nil {
t.Fatalf("generate test key pair: %v", err)
}
// Create encryptor and decryptor
enc, err := NewEncryptor(recipientPath)
if err != nil {
t.Fatalf("create encryptor: %v", err)
}
dec, err := NewDecryptor(identityPath)
if err != nil {
t.Fatalf("create decryptor: %v", err)
}
// Test streaming encryption
plaintext := []byte("streaming test data")
var ciphertextBuf bytes.Buffer
encWriter, err := enc.EncryptStream(&ciphertextBuf)
if err != nil {
t.Fatalf("create encrypt stream: %v", err)
}
if _, err := encWriter.Write(plaintext); err != nil {
t.Fatalf("write to encrypt stream: %v", err)
}
if err := encWriter.Close(); err != nil {
t.Fatalf("close encrypt stream: %v", err)
}
// Test streaming decryption
decReader, err := dec.DecryptStream(&ciphertextBuf)
if err != nil {
t.Fatalf("create decrypt stream: %v", err)
}
decrypted, err := io.ReadAll(decReader)
if err != nil {
t.Fatalf("read from decrypt stream: %v", err)
}
if !bytes.Equal(decrypted, plaintext) {
t.Fatalf("decrypted data doesn't match original\ngot: %q\nwant: %q", decrypted, plaintext)
}
}
func TestConvenienceFunctions(t *testing.T) {
// Generate test keys in memory
identity, recipient, err := GenerateTestKeys()
if err != nil {
t.Fatalf("generate test keys: %v", err)
}
plaintext := []byte("test message")
// Encrypt with convenience function
ciphertext, err := EncryptBytes(plaintext, recipient)
if err != nil {
t.Fatalf("encrypt bytes: %v", err)
}
// Decrypt with convenience function
decrypted, err := DecryptBytes(ciphertext, identity)
if err != nil {
t.Fatalf("decrypt bytes: %v", err)
}
if !bytes.Equal(decrypted, plaintext) {
t.Fatalf("decrypted data doesn't match original\ngot: %q\nwant: %q", decrypted, plaintext)
}
}

View File

@@ -0,0 +1,354 @@
package ageio
import (
"bytes"
"os"
"path/filepath"
"testing"
)
// TestGoldenEncryptionRoundTrip validates encryption/decryption with golden test data
func TestGoldenEncryptionRoundTrip(t *testing.T) {
// Generate test key pair once
tmpDir := t.TempDir()
identityPath, recipientPath, err := GenerateTestKeyPair(tmpDir)
if err != nil {
t.Fatalf("generate test key pair: %v", err)
}
// Create encryptor and decryptor
enc, err := NewEncryptor(recipientPath)
if err != nil {
t.Fatalf("create encryptor: %v", err)
}
dec, err := NewDecryptor(identityPath)
if err != nil {
t.Fatalf("create decryptor: %v", err)
}
// Golden test cases representing real MCP payloads
goldenTests := []struct {
name string
payload []byte
description string
}{
{
name: "sequential_thinking_request",
payload: []byte(`{
"tool": "mcp__sequential-thinking__sequentialthinking",
"payload": {
"thought": "First, I need to analyze the problem by breaking it down into smaller components.",
"thoughtNumber": 1,
"totalThoughts": 5,
"nextThoughtNeeded": true,
"isRevision": false
}
}`),
description: "Initial sequential thinking request",
},
{
name: "sequential_thinking_revision",
payload: []byte(`{
"tool": "mcp__sequential-thinking__sequentialthinking",
"payload": {
"thought": "Wait, I need to revise my previous thought - I missed considering edge cases.",
"thoughtNumber": 3,
"totalThoughts": 6,
"nextThoughtNeeded": true,
"isRevision": true,
"revisesThought": 2
}
}`),
description: "Revision of previous thought",
},
{
name: "sequential_thinking_branching",
payload: []byte(`{
"tool": "mcp__sequential-thinking__sequentialthinking",
"payload": {
"thought": "Let me explore an alternative approach using event sourcing instead.",
"thoughtNumber": 4,
"totalThoughts": 8,
"nextThoughtNeeded": true,
"branchFromThought": 2,
"branchId": "alternative-approach-1"
}
}`),
description: "Branching to explore alternative",
},
{
name: "sequential_thinking_final",
payload: []byte(`{
"tool": "mcp__sequential-thinking__sequentialthinking",
"payload": {
"thought": "Based on all previous analysis, I recommend implementing the event sourcing pattern with CQRS for optimal scalability.",
"thoughtNumber": 8,
"totalThoughts": 8,
"nextThoughtNeeded": false,
"confidence": 0.85
}
}`),
description: "Final thought with conclusion",
},
{
name: "large_context_payload",
payload: bytes.Repeat([]byte(`{"key": "value", "data": "ABCDEFGHIJ"}`), 100),
description: "Large payload testing encryption of substantial data",
},
{
name: "unicode_payload",
payload: []byte(`{
"tool": "mcp__sequential-thinking__sequentialthinking",
"payload": {
"thought": "分析日本語でのデータ処理 🌸🎌 and mixed language content: 你好世界",
"thoughtNumber": 1,
"totalThoughts": 1,
"nextThoughtNeeded": false
}
}`),
description: "Unicode and emoji content",
},
{
name: "special_characters",
payload: []byte(`{
"tool": "test",
"payload": {
"special": "Testing: \n\t\r\b\"'\\\/\u0000\u001f",
"symbols": "!@#$%^&*()_+-=[]{}|;:,.<>?~"
}
}`),
description: "Special characters and escape sequences",
},
}
for _, gt := range goldenTests {
t.Run(gt.name, func(t *testing.T) {
t.Logf("Testing: %s", gt.description)
t.Logf("Original size: %d bytes", len(gt.payload))
// Encrypt
ciphertext, err := enc.Encrypt(gt.payload)
if err != nil {
t.Fatalf("encrypt failed: %v", err)
}
t.Logf("Encrypted size: %d bytes (%.1f%% overhead)",
len(ciphertext),
float64(len(ciphertext)-len(gt.payload))/float64(len(gt.payload))*100)
// Verify ciphertext is different from plaintext
if bytes.Equal(ciphertext, gt.payload) {
t.Fatal("ciphertext equals plaintext - encryption failed")
}
// Verify ciphertext doesn't contain plaintext patterns
// (basic sanity check - not cryptographically rigorous)
if bytes.Contains(ciphertext, []byte("mcp__sequential-thinking")) {
t.Error("ciphertext contains plaintext patterns - weak encryption")
}
// Decrypt
decrypted, err := dec.Decrypt(ciphertext)
if err != nil {
t.Fatalf("decrypt failed: %v", err)
}
// Verify perfect round-trip
if !bytes.Equal(decrypted, gt.payload) {
t.Errorf("decrypted data doesn't match original\nOriginal: %s\nDecrypted: %s",
string(gt.payload), string(decrypted))
}
// Optional: Save golden files for inspection
if os.Getenv("SAVE_GOLDEN") == "1" {
goldenDir := filepath.Join(tmpDir, "golden")
os.MkdirAll(goldenDir, 0755)
plainPath := filepath.Join(goldenDir, gt.name+".plain.json")
encPath := filepath.Join(goldenDir, gt.name+".encrypted.age")
os.WriteFile(plainPath, gt.payload, 0644)
os.WriteFile(encPath, ciphertext, 0644)
t.Logf("Golden files saved to: %s", goldenDir)
}
})
}
}
// TestGoldenDecryptionFailures validates proper error handling
func TestGoldenDecryptionFailures(t *testing.T) {
tmpDir := t.TempDir()
identityPath, recipientPath, err := GenerateTestKeyPair(tmpDir)
if err != nil {
t.Fatalf("generate test key pair: %v", err)
}
dec, err := NewDecryptor(identityPath)
if err != nil {
t.Fatalf("create decryptor: %v", err)
}
enc, err := NewEncryptor(recipientPath)
if err != nil {
t.Fatalf("create encryptor: %v", err)
}
failureTests := []struct {
name string
ciphertext []byte
expectError string
}{
{
name: "empty_ciphertext",
ciphertext: []byte{},
expectError: "ciphertext is empty",
},
{
name: "invalid_age_format",
ciphertext: []byte("not a valid age ciphertext"),
expectError: "create decryptor",
},
{
name: "corrupted_header",
ciphertext: []byte("-----BEGIN AGE ENCRYPTED FILE-----\ngarbage\n-----END AGE ENCRYPTED FILE-----"),
expectError: "create decryptor",
},
}
for _, ft := range failureTests {
t.Run(ft.name, func(t *testing.T) {
_, err := dec.Decrypt(ft.ciphertext)
if err == nil {
t.Fatal("expected error but got none")
}
// Just verify we got an error - specific error messages may vary
t.Logf("Got expected error: %v", err)
})
}
// Test truncated ciphertext
t.Run("truncated_ciphertext", func(t *testing.T) {
// Create valid ciphertext
validPlaintext := []byte("test message")
validCiphertext, err := enc.Encrypt(validPlaintext)
if err != nil {
t.Fatalf("encrypt: %v", err)
}
// Truncate it
truncated := validCiphertext[:len(validCiphertext)/2]
// Try to decrypt
_, err = dec.Decrypt(truncated)
if err == nil {
t.Fatal("expected error decrypting truncated ciphertext")
}
t.Logf("Got expected error for truncated ciphertext: %v", err)
})
// Test modified ciphertext
t.Run("modified_ciphertext", func(t *testing.T) {
// Create valid ciphertext
validPlaintext := []byte("test message")
validCiphertext, err := enc.Encrypt(validPlaintext)
if err != nil {
t.Fatalf("encrypt: %v", err)
}
// Flip a bit in the middle
modified := make([]byte, len(validCiphertext))
copy(modified, validCiphertext)
modified[len(modified)/2] ^= 0x01
// Try to decrypt
_, err = dec.Decrypt(modified)
if err == nil {
t.Fatal("expected error decrypting modified ciphertext")
}
t.Logf("Got expected error for modified ciphertext: %v", err)
})
}
// BenchmarkEncryption benchmarks encryption performance
func BenchmarkEncryption(b *testing.B) {
tmpDir := b.TempDir()
_, recipientPath, err := GenerateTestKeyPair(tmpDir)
if err != nil {
b.Fatalf("generate test key pair: %v", err)
}
enc, err := NewEncryptor(recipientPath)
if err != nil {
b.Fatalf("create encryptor: %v", err)
}
payloads := map[string][]byte{
"small_1KB": bytes.Repeat([]byte("A"), 1024),
"medium_10KB": bytes.Repeat([]byte("A"), 10*1024),
"large_100KB": bytes.Repeat([]byte("A"), 100*1024),
}
for name, payload := range payloads {
b.Run(name, func(b *testing.B) {
b.SetBytes(int64(len(payload)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := enc.Encrypt(payload)
if err != nil {
b.Fatalf("encrypt: %v", err)
}
}
})
}
}
// BenchmarkDecryption benchmarks decryption performance
func BenchmarkDecryption(b *testing.B) {
tmpDir := b.TempDir()
identityPath, recipientPath, err := GenerateTestKeyPair(tmpDir)
if err != nil {
b.Fatalf("generate test key pair: %v", err)
}
enc, err := NewEncryptor(recipientPath)
if err != nil {
b.Fatalf("create encryptor: %v", err)
}
dec, err := NewDecryptor(identityPath)
if err != nil {
b.Fatalf("create decryptor: %v", err)
}
payloads := map[string][]byte{
"small_1KB": bytes.Repeat([]byte("A"), 1024),
"medium_10KB": bytes.Repeat([]byte("A"), 10*1024),
"large_100KB": bytes.Repeat([]byte("A"), 100*1024),
}
for name, payload := range payloads {
// Pre-encrypt
ciphertext, err := enc.Encrypt(payload)
if err != nil {
b.Fatalf("encrypt: %v", err)
}
b.Run(name, func(b *testing.B) {
b.SetBytes(int64(len(payload)))
b.ResetTimer()
for i := 0; i < b.N; i++ {
_, err := dec.Decrypt(ciphertext)
if err != nil {
b.Fatalf("decrypt: %v", err)
}
}
})
}
}
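For a quick local check, these benchmarks should be runnable with something like `go test -bench=. -benchmem` from the ageio package directory (exact module path assumed); the SetBytes calls above make the output report MB/s throughput per payload size.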

View File

@@ -0,0 +1,88 @@
package ageio
import (
"bytes"
"fmt"
"io"
"os"
"path/filepath"
"filippo.io/age"
)
// GenerateTestKeyPair generates a test age key pair and returns paths
func GenerateTestKeyPair(dir string) (identityPath, recipientPath string, err error) {
// Generate identity
identity, err := age.GenerateX25519Identity()
if err != nil {
return "", "", fmt.Errorf("generate identity: %w", err)
}
// Create identity file
identityPath = filepath.Join(dir, "age.key")
if err := os.WriteFile(identityPath, []byte(identity.String()+"\n"), 0600); err != nil {
return "", "", fmt.Errorf("write identity file: %w", err)
}
// Create recipient file
recipientPath = filepath.Join(dir, "age.pub")
recipient := identity.Recipient().String()
if err := os.WriteFile(recipientPath, []byte(recipient+"\n"), 0644); err != nil {
return "", "", fmt.Errorf("write recipient file: %w", err)
}
return identityPath, recipientPath, nil
}
// GenerateTestKeys generates test keys in memory
func GenerateTestKeys() (identity age.Identity, recipient age.Recipient, err error) {
id, err := age.GenerateX25519Identity()
if err != nil {
return nil, nil, fmt.Errorf("generate identity: %w", err)
}
return id, id.Recipient(), nil
}
// MustGenerateTestKeyPair generates a test key pair or panics
func MustGenerateTestKeyPair(dir string) (identityPath, recipientPath string) {
identityPath, recipientPath, err := GenerateTestKeyPair(dir)
if err != nil {
panic(fmt.Sprintf("failed to generate test key pair: %v", err))
}
return identityPath, recipientPath
}
// EncryptBytes is a convenience function for one-shot encryption
func EncryptBytes(plaintext []byte, recipients ...age.Recipient) ([]byte, error) {
var buf bytes.Buffer
w, err := age.Encrypt(&buf, recipients...)
if err != nil {
return nil, fmt.Errorf("create encryptor: %w", err)
}
if _, err := w.Write(plaintext); err != nil {
return nil, fmt.Errorf("write plaintext: %w", err)
}
if err := w.Close(); err != nil {
return nil, fmt.Errorf("close encryptor: %w", err)
}
return buf.Bytes(), nil
}
// DecryptBytes is a convenience function for one-shot decryption
func DecryptBytes(ciphertext []byte, identities ...age.Identity) ([]byte, error) {
r, err := age.Decrypt(bytes.NewReader(ciphertext), identities...)
if err != nil {
return nil, fmt.Errorf("create decryptor: %w", err)
}
plaintext, err := io.ReadAll(r)
if err != nil {
return nil, fmt.Errorf("read plaintext: %w", err)
}
return plaintext, nil
}

View File

@@ -0,0 +1,100 @@
package mcpclient
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"time"
)
// Client is a client for the Sequential Thinking MCP server
type Client struct {
baseURL string
httpClient *http.Client
}
// ToolRequest represents a request to call an MCP tool
type ToolRequest struct {
Tool string `json:"tool"`
Payload map[string]interface{} `json:"payload"`
}
// ToolResponse represents the response from an MCP tool call
type ToolResponse struct {
Result interface{} `json:"result,omitempty"`
Error string `json:"error,omitempty"`
}
// New creates a new MCP client
func New(baseURL string) *Client {
return &Client{
baseURL: baseURL,
httpClient: &http.Client{
Timeout: 120 * time.Second, // Longer timeout for thinking operations
},
}
}
// Health checks if the MCP server is healthy
func (c *Client) Health(ctx context.Context) error {
req, err := http.NewRequestWithContext(ctx, "GET", c.baseURL+"/health", nil)
if err != nil {
return fmt.Errorf("create request: %w", err)
}
resp, err := c.httpClient.Do(req)
if err != nil {
return fmt.Errorf("http request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("health check failed: status %d", resp.StatusCode)
}
return nil
}
// CallTool calls an MCP tool
func (c *Client) CallTool(ctx context.Context, req *ToolRequest) (*ToolResponse, error) {
jsonData, err := json.Marshal(req)
if err != nil {
return nil, fmt.Errorf("marshal request: %w", err)
}
httpReq, err := http.NewRequestWithContext(ctx, "POST", c.baseURL+"/mcp/tool", bytes.NewReader(jsonData))
if err != nil {
return nil, fmt.Errorf("create request: %w", err)
}
httpReq.Header.Set("Content-Type", "application/json")
resp, err := c.httpClient.Do(httpReq)
if err != nil {
return nil, fmt.Errorf("http request: %w", err)
}
defer resp.Body.Close()
body, err := io.ReadAll(resp.Body)
if err != nil {
return nil, fmt.Errorf("read response: %w", err)
}
if resp.StatusCode != http.StatusOK {
return nil, fmt.Errorf("tool call failed: status %d, body: %s", resp.StatusCode, string(body))
}
var toolResp ToolResponse
if err := json.Unmarshal(body, &toolResp); err != nil {
return nil, fmt.Errorf("unmarshal response: %w", err)
}
if toolResp.Error != "" {
return nil, fmt.Errorf("tool error: %s", toolResp.Error)
}
return &toolResp, nil
}
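A short usage sketch for the client; the loopback URL and port follow the MCP_LOCAL convention but are assumptions, and the payload shape mirrors the golden test cases earlier in this diff:

```go
// Sketch: exercising the MCP client against a local server.
package main

import (
	"context"
	"fmt"
	"log"

	"chorus/pkg/seqthink/mcpclient" // assumed import path
)

func main() {
	c := mcpclient.New("http://127.0.0.1:8000") // hypothetical MCP_LOCAL address
	ctx := context.Background()

	if err := c.Health(ctx); err != nil {
		log.Fatalf("MCP server not healthy: %v", err)
	}

	resp, err := c.CallTool(ctx, &mcpclient.ToolRequest{
		Tool: "mcp__sequential-thinking__sequentialthinking",
		Payload: map[string]interface{}{
			"thought":           "Outline the problem before solving it.",
			"thoughtNumber":     1,
			"totalThoughts":     3,
			"nextThoughtNeeded": true,
		},
	})
	if err != nil {
		log.Fatal(err)
	}
	fmt.Printf("result: %+v\n", resp.Result)
}
```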

View File

@@ -0,0 +1,39 @@
package observability
import (
"os"
"strings"
"time"
"github.com/rs/zerolog"
"github.com/rs/zerolog/log"
)
// InitLogger initializes the global logger
func InitLogger(level string) {
// Set up zerolog with human-friendly console output
output := zerolog.ConsoleWriter{
Out: os.Stdout,
TimeFormat: time.RFC3339,
}
log.Logger = zerolog.New(output).
With().
Timestamp().
Caller().
Logger()
// Set log level
switch strings.ToLower(level) {
case "debug":
zerolog.SetGlobalLevel(zerolog.DebugLevel)
case "info":
zerolog.SetGlobalLevel(zerolog.InfoLevel)
case "warn":
zerolog.SetGlobalLevel(zerolog.WarnLevel)
case "error":
zerolog.SetGlobalLevel(zerolog.ErrorLevel)
default:
zerolog.SetGlobalLevel(zerolog.InfoLevel)
}
}
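Typical use is a single call at process start, then logging through zerolog's global logger; a minimal sketch (LOG_LEVEL mirrors the wrapper's environment variable, the import path is assumed):

```go
// Sketch: initializing the console logger at startup.
package main

import (
	"os"

	"github.com/rs/zerolog/log"

	"chorus/pkg/seqthink/observability" // assumed import path
)

func main() {
	observability.InitLogger(os.Getenv("LOG_LEVEL"))
	log.Info().Str("component", "seqthink-wrapper").Msg("starting")
	log.Debug().Msg("visible only when LOG_LEVEL=debug")
}
```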

View File

@@ -0,0 +1,85 @@
package observability
import (
"net/http"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/client_golang/prometheus/promhttp"
)
// Metrics holds Prometheus metrics for the wrapper
type Metrics struct {
requestsTotal prometheus.Counter
errorsTotal prometheus.Counter
decryptFails prometheus.Counter
encryptFails prometheus.Counter
policyDenials prometheus.Counter
requestDuration prometheus.Histogram
}
// InitMetrics initializes Prometheus metrics
func InitMetrics() *Metrics {
return &Metrics{
requestsTotal: promauto.NewCounter(prometheus.CounterOpts{
Name: "seqthink_requests_total",
Help: "Total number of requests received",
}),
errorsTotal: promauto.NewCounter(prometheus.CounterOpts{
Name: "seqthink_errors_total",
Help: "Total number of errors",
}),
decryptFails: promauto.NewCounter(prometheus.CounterOpts{
Name: "seqthink_decrypt_failures_total",
Help: "Total number of decryption failures",
}),
encryptFails: promauto.NewCounter(prometheus.CounterOpts{
Name: "seqthink_encrypt_failures_total",
Help: "Total number of encryption failures",
}),
policyDenials: promauto.NewCounter(prometheus.CounterOpts{
Name: "seqthink_policy_denials_total",
Help: "Total number of policy denials",
}),
requestDuration: promauto.NewHistogram(prometheus.HistogramOpts{
Name: "seqthink_request_duration_seconds",
Help: "Request duration in seconds",
Buckets: prometheus.DefBuckets,
}),
}
}
// IncrementRequests increments the request counter
func (m *Metrics) IncrementRequests() {
m.requestsTotal.Inc()
}
// IncrementErrors increments the error counter
func (m *Metrics) IncrementErrors() {
m.errorsTotal.Inc()
}
// IncrementDecryptFails increments the decrypt failure counter
func (m *Metrics) IncrementDecryptFails() {
m.decryptFails.Inc()
}
// IncrementEncryptFails increments the encrypt failure counter
func (m *Metrics) IncrementEncryptFails() {
m.encryptFails.Inc()
}
// IncrementPolicyDenials increments the policy denial counter
func (m *Metrics) IncrementPolicyDenials() {
m.policyDenials.Inc()
}
// ObserveRequestDuration records request duration
func (m *Metrics) ObserveRequestDuration(seconds float64) {
m.requestDuration.Observe(seconds)
}
// Handler returns the Prometheus metrics HTTP handler
func (m *Metrics) Handler() http.Handler {
return promhttp.Handler()
}
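A sketch of wiring these metrics into an HTTP mux; the route is an assumption, and only the Metrics API itself comes from this file. Because promauto registers on Prometheus's default registry, InitMetrics should be called once per process (a second call would panic on duplicate registration):

```go
// Sketch: exposing /metrics and timing one route.
package main

import (
	"log"
	"net/http"
	"time"

	"chorus/pkg/seqthink/observability" // assumed import path
)

func main() {
	m := observability.InitMetrics() // call exactly once

	mux := http.NewServeMux()
	mux.Handle("/metrics", m.Handler())
	mux.HandleFunc("/mcp/tool", func(w http.ResponseWriter, r *http.Request) {
		start := time.Now()
		m.IncrementRequests()
		defer func() { m.ObserveRequestDuration(time.Since(start).Seconds()) }()
		w.WriteHeader(http.StatusOK)
	})

	log.Fatal(http.ListenAndServe(":8443", mux)) // 8443 matches the wrapper's PORT default
}
```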

354

pkg/seqthink/policy/jwt.go Normal file
View File

@@ -0,0 +1,354 @@
package policy
import (
"context"
"crypto/ed25519"
"crypto/rsa"
"encoding/base64"
"encoding/json"
"fmt"
"io"
"math/big"
"net/http"
"strings"
"sync"
"time"
"github.com/golang-jwt/jwt/v5"
"github.com/rs/zerolog/log"
)
// Claims represents the JWT claims structure
type Claims struct {
Subject string `json:"sub"`
Scopes []string `json:"scopes,omitempty"`
Scope string `json:"scope,omitempty"` // Space-separated scopes
jwt.RegisteredClaims
}
// JWKS represents a JSON Web Key Set
type JWKS struct {
Keys []JWK `json:"keys"`
}
// JWK represents a JSON Web Key
type JWK struct {
Kid string `json:"kid"`
Kty string `json:"kty"`
Alg string `json:"alg"`
Use string `json:"use"`
N string `json:"n"`
E string `json:"e"`
X string `json:"x"`
Crv string `json:"crv"`
}
// Validator validates JWT tokens
type Validator struct {
jwksURL string
requiredScope string
httpClient *http.Client
keys map[string]interface{}
keysMutex sync.RWMutex
lastFetch time.Time
cacheDuration time.Duration
}
// NewValidator creates a new JWT validator
func NewValidator(jwksURL, requiredScope string) *Validator {
return &Validator{
jwksURL: jwksURL,
requiredScope: requiredScope,
httpClient: &http.Client{
Timeout: 10 * time.Second,
},
keys: make(map[string]interface{}),
cacheDuration: 1 * time.Hour, // Cache JWKS for 1 hour
}
}
// ValidateToken validates a JWT token and checks required scopes
func (v *Validator) ValidateToken(tokenString string) (*Claims, error) {
// Parse token
token, err := jwt.ParseWithClaims(tokenString, &Claims{}, func(token *jwt.Token) (interface{}, error) {
// Get key ID from header
kid, ok := token.Header["kid"].(string)
if !ok {
return nil, fmt.Errorf("no kid in token header")
}
// Get public key for this kid
publicKey, err := v.getPublicKey(kid)
if err != nil {
return nil, fmt.Errorf("get public key: %w", err)
}
switch token.Method.(type) {
case *jwt.SigningMethodRSA, *jwt.SigningMethodRSAPSS:
rsaKey, ok := publicKey.(*rsa.PublicKey)
if !ok {
return nil, fmt.Errorf("expected RSA public key for kid %s", kid)
}
return rsaKey, nil
case *jwt.SigningMethodEd25519:
edKey, ok := publicKey.(ed25519.PublicKey)
if !ok {
return nil, fmt.Errorf("expected Ed25519 public key for kid %s", kid)
}
return edKey, nil
default:
return nil, fmt.Errorf("unexpected signing method: %v", token.Header["alg"])
}
})
if err != nil {
return nil, fmt.Errorf("parse token: %w", err)
}
// Extract claims
claims, ok := token.Claims.(*Claims)
if !ok || !token.Valid {
return nil, fmt.Errorf("invalid token claims")
}
// Validate expiration
if claims.ExpiresAt != nil && claims.ExpiresAt.Before(time.Now()) {
return nil, fmt.Errorf("token expired")
}
// Validate not before
if claims.NotBefore != nil && claims.NotBefore.After(time.Now()) {
return nil, fmt.Errorf("token not yet valid")
}
// Check required scope
if v.requiredScope != "" {
if !v.hasRequiredScope(claims) {
return nil, fmt.Errorf("missing required scope: %s", v.requiredScope)
}
}
return claims, nil
}
// hasRequiredScope checks if claims contain the required scope
func (v *Validator) hasRequiredScope(claims *Claims) bool {
// Check scopes array
for _, scope := range claims.Scopes {
if scope == v.requiredScope {
return true
}
}
// Check space-separated scope string (OAuth2 style)
if claims.Scope != "" {
for _, scope := range parseScopes(claims.Scope) {
if scope == v.requiredScope {
return true
}
}
}
return false
}
// getPublicKey retrieves a public key by kid, fetching JWKS if needed
func (v *Validator) getPublicKey(kid string) (interface{}, error) {
// Check if cache is expired
v.keysMutex.RLock()
cacheExpired := time.Since(v.lastFetch) > v.cacheDuration
key, keyExists := v.keys[kid]
v.keysMutex.RUnlock()
// If key exists and cache is not expired, return it
if keyExists && !cacheExpired {
return key, nil
}
// Need to fetch JWKS (either key not found or cache expired)
if err := v.fetchJWKS(); err != nil {
return nil, fmt.Errorf("fetch JWKS: %w", err)
}
// Try again after fetch
v.keysMutex.RLock()
defer v.keysMutex.RUnlock()
if key, ok := v.keys[kid]; ok {
return key, nil
}
return nil, fmt.Errorf("key not found: %s", kid)
}
// fetchJWKS fetches and caches the JWKS from the server
func (v *Validator) fetchJWKS() error {
log.Info().Str("url", v.jwksURL).Msg("Fetching JWKS")
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", v.jwksURL, nil)
if err != nil {
return fmt.Errorf("create request: %w", err)
}
resp, err := v.httpClient.Do(req)
if err != nil {
return fmt.Errorf("http request: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("JWKS fetch failed: status %d", resp.StatusCode)
}
body, err := io.ReadAll(resp.Body)
if err != nil {
return fmt.Errorf("read response: %w", err)
}
var jwks JWKS
if err := json.Unmarshal(body, &jwks); err != nil {
return fmt.Errorf("unmarshal JWKS: %w", err)
}
// Parse and cache all keys
newKeys := make(map[string]interface{})
for _, jwk := range jwks.Keys {
switch jwk.Kty {
case "RSA":
publicKey, err := jwk.toRSAPublicKey()
if err != nil {
log.Error().Err(err).Str("kid", jwk.Kid).Msg("Failed to parse RSA JWK")
continue
}
newKeys[jwk.Kid] = publicKey
case "OKP":
if strings.EqualFold(jwk.Crv, "Ed25519") {
publicKey, err := jwk.toEd25519PublicKey()
if err != nil {
log.Error().Err(err).Str("kid", jwk.Kid).Msg("Failed to parse Ed25519 JWK")
continue
}
newKeys[jwk.Kid] = publicKey
} else {
log.Warn().Str("kid", jwk.Kid).Str("crv", jwk.Crv).Msg("Skipping unsupported OKP curve")
}
default:
log.Warn().Str("kid", jwk.Kid).Str("kty", jwk.Kty).Msg("Skipping unsupported key type")
}
}
if len(newKeys) == 0 {
return fmt.Errorf("no valid keys found in JWKS")
}
// Update cache
v.keysMutex.Lock()
v.keys = newKeys
v.lastFetch = time.Now()
v.keysMutex.Unlock()
log.Info().Int("key_count", len(newKeys)).Msg("JWKS cached successfully")
return nil
}
// toRSAPublicKey converts a JWK to an RSA public key
func (jwk *JWK) toRSAPublicKey() (*rsa.PublicKey, error) {
// Decode N (modulus) - use base64 URL encoding without padding
nBytes, err := base64URLDecode(jwk.N)
if err != nil {
return nil, fmt.Errorf("decode N: %w", err)
}
// Decode E (exponent)
eBytes, err := base64URLDecode(jwk.E)
if err != nil {
return nil, fmt.Errorf("decode E: %w", err)
}
// Convert E bytes to int
var e int
for _, b := range eBytes {
e = e<<8 | int(b)
}
// Create RSA public key
publicKey := &rsa.PublicKey{
N: new(big.Int).SetBytes(nBytes),
E: e,
}
return publicKey, nil
}
// toEd25519PublicKey converts a JWK to an Ed25519 public key
func (jwk *JWK) toEd25519PublicKey() (ed25519.PublicKey, error) {
if jwk.X == "" {
return nil, fmt.Errorf("missing x coordinate for Ed25519 key")
}
xBytes, err := base64URLDecode(jwk.X)
if err != nil {
return nil, fmt.Errorf("decode x: %w", err)
}
if len(xBytes) != ed25519.PublicKeySize {
return nil, fmt.Errorf("invalid Ed25519 public key length: expected %d, got %d", ed25519.PublicKeySize, len(xBytes))
}
return ed25519.PublicKey(xBytes), nil
}
// parseScopes splits a space-separated scope string
func parseScopes(scopeString string) []string {
if scopeString == "" {
return nil
}
var scopes []string
current := ""
for _, ch := range scopeString {
if ch == ' ' {
if current != "" {
scopes = append(scopes, current)
current = ""
}
} else {
current += string(ch)
}
}
if current != "" {
scopes = append(scopes, current)
}
return scopes
}
// RefreshJWKS forces a refresh of the JWKS cache
func (v *Validator) RefreshJWKS() error {
return v.fetchJWKS()
}
// GetCachedKeyCount returns the number of cached keys
func (v *Validator) GetCachedKeyCount() int {
v.keysMutex.RLock()
defer v.keysMutex.RUnlock()
return len(v.keys)
}
// base64URLDecode decodes a base64 URL-encoded string (with or without padding)
func base64URLDecode(s string) ([]byte, error) {
// Add padding if needed
if l := len(s) % 4; l > 0 {
s += strings.Repeat("=", 4-l)
}
return base64.URLEncoding.DecodeString(s)
}
// base64URLEncode encodes bytes to base64 URL encoding without padding
func base64URLEncode(data []byte) string {
return strings.TrimRight(base64.URLEncoding.EncodeToString(data), "=")
}
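Standalone, the validator can gate a handler roughly as below; the package's own AuthMiddleware (next file) wraps this same flow, so this is only an illustration. The JWKS URL is hypothetical, the scope matches the tests that follow, and the import path is assumed:

```go
// Sketch: manual bearer-token validation with the policy package.
package main

import (
	"fmt"
	"net/http"
	"strings"

	"chorus/pkg/seqthink/policy" // assumed import path
)

func main() {
	v := policy.NewValidator("https://kaching.example/jwks.json", "sequentialthinking.run")

	http.HandleFunc("/mcp/tool", func(w http.ResponseWriter, r *http.Request) {
		authz := r.Header.Get("Authorization")
		token := strings.TrimPrefix(authz, "Bearer ")
		if token == "" || token == authz { // header absent or missing Bearer prefix
			http.Error(w, "missing bearer token", http.StatusUnauthorized)
			return
		}
		claims, err := v.ValidateToken(token)
		if err != nil {
			http.Error(w, "invalid token", http.StatusUnauthorized)
			return
		}
		fmt.Fprintf(w, "authenticated subject: %s\n", claims.Subject)
	})

	_ = http.ListenAndServe(":8443", nil)
}
```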

View File

@@ -0,0 +1,354 @@
package policy
import (
"crypto/rand"
"crypto/rsa"
"encoding/json"
"net/http"
"net/http/httptest"
"testing"
"time"
"github.com/golang-jwt/jwt/v5"
)
// generateTestKeyPair generates an RSA key pair for testing
func generateTestKeyPair() (*rsa.PrivateKey, error) {
return rsa.GenerateKey(rand.Reader, 2048)
}
// createTestJWKS creates a test JWKS server
func createTestJWKS(t *testing.T, privateKey *rsa.PrivateKey) *httptest.Server {
publicKey := &privateKey.PublicKey
// Create JWK from public key
jwk := JWK{
Kid: "test-key-1",
Kty: "RSA",
Alg: "RS256",
Use: "sig",
N: base64URLEncode(publicKey.N.Bytes()),
E: base64URLEncode([]byte{1, 0, 1}), // 65537
}
jwks := JWKS{
Keys: []JWK{jwk},
}
server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
json.NewEncoder(w).Encode(jwks)
}))
return server
}
// createTestToken creates a test JWT token
func createTestToken(privateKey *rsa.PrivateKey, claims *Claims) (string, error) {
token := jwt.NewWithClaims(jwt.SigningMethodRS256, claims)
token.Header["kid"] = "test-key-1"
return token.SignedString(privateKey)
}
func TestValidateToken(t *testing.T) {
// Generate test key pair
privateKey, err := generateTestKeyPair()
if err != nil {
t.Fatalf("generate key pair: %v", err)
}
// Create test JWKS server
jwksServer := createTestJWKS(t, privateKey)
defer jwksServer.Close()
// Create validator
	validator := NewValidator(jwksServer.URL, "sequentialthinking.run")

	// Test valid token
	t.Run("valid_token", func(t *testing.T) {
		claims := &Claims{
			Subject: "test-user",
			Scopes:  []string{"sequentialthinking.run"},
			RegisteredClaims: jwt.RegisteredClaims{
				ExpiresAt: jwt.NewNumericDate(time.Now().Add(1 * time.Hour)),
				IssuedAt:  jwt.NewNumericDate(time.Now()),
			},
		}

		tokenString, err := createTestToken(privateKey, claims)
		if err != nil {
			t.Fatalf("create token: %v", err)
		}

		validatedClaims, err := validator.ValidateToken(tokenString)
		if err != nil {
			t.Fatalf("validate token: %v", err)
		}

		if validatedClaims.Subject != "test-user" {
			t.Errorf("wrong subject: got %s, want test-user", validatedClaims.Subject)
		}
	})

	// Test expired token
	t.Run("expired_token", func(t *testing.T) {
		claims := &Claims{
			Subject: "test-user",
			Scopes:  []string{"sequentialthinking.run"},
			RegisteredClaims: jwt.RegisteredClaims{
				ExpiresAt: jwt.NewNumericDate(time.Now().Add(-1 * time.Hour)),
				IssuedAt:  jwt.NewNumericDate(time.Now().Add(-2 * time.Hour)),
			},
		}

		tokenString, err := createTestToken(privateKey, claims)
		if err != nil {
			t.Fatalf("create token: %v", err)
		}

		_, err = validator.ValidateToken(tokenString)
		if err == nil {
			t.Fatal("expected error for expired token")
		}
	})

	// Test missing scope
	t.Run("missing_scope", func(t *testing.T) {
		claims := &Claims{
			Subject: "test-user",
			Scopes:  []string{"other.scope"},
			RegisteredClaims: jwt.RegisteredClaims{
				ExpiresAt: jwt.NewNumericDate(time.Now().Add(1 * time.Hour)),
				IssuedAt:  jwt.NewNumericDate(time.Now()),
			},
		}

		tokenString, err := createTestToken(privateKey, claims)
		if err != nil {
			t.Fatalf("create token: %v", err)
		}

		_, err = validator.ValidateToken(tokenString)
		if err == nil {
			t.Fatal("expected error for missing scope")
		}
	})

	// Test space-separated scopes
	t.Run("space_separated_scopes", func(t *testing.T) {
		claims := &Claims{
			Subject: "test-user",
			Scope:   "read write sequentialthinking.run admin",
			RegisteredClaims: jwt.RegisteredClaims{
				ExpiresAt: jwt.NewNumericDate(time.Now().Add(1 * time.Hour)),
				IssuedAt:  jwt.NewNumericDate(time.Now()),
			},
		}

		tokenString, err := createTestToken(privateKey, claims)
		if err != nil {
			t.Fatalf("create token: %v", err)
		}

		validatedClaims, err := validator.ValidateToken(tokenString)
		if err != nil {
			t.Fatalf("validate token: %v", err)
		}

		if validatedClaims.Subject != "test-user" {
			t.Errorf("wrong subject: got %s, want test-user", validatedClaims.Subject)
		}
	})

	// Test not before
	t.Run("not_yet_valid", func(t *testing.T) {
		claims := &Claims{
			Subject: "test-user",
			Scopes:  []string{"sequentialthinking.run"},
			RegisteredClaims: jwt.RegisteredClaims{
				ExpiresAt: jwt.NewNumericDate(time.Now().Add(2 * time.Hour)),
				NotBefore: jwt.NewNumericDate(time.Now().Add(1 * time.Hour)),
				IssuedAt:  jwt.NewNumericDate(time.Now()),
			},
		}

		tokenString, err := createTestToken(privateKey, claims)
		if err != nil {
			t.Fatalf("create token: %v", err)
		}

		_, err = validator.ValidateToken(tokenString)
		if err == nil {
			t.Fatal("expected error for not-yet-valid token")
		}
	})
}

func TestJWKSCaching(t *testing.T) {
	privateKey, err := generateTestKeyPair()
	if err != nil {
		t.Fatalf("generate key pair: %v", err)
	}

	fetchCount := 0
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		fetchCount++
		publicKey := &privateKey.PublicKey
		jwk := JWK{
			Kid: "test-key-1",
			Kty: "RSA",
			Alg: "RS256",
			Use: "sig",
			N:   base64URLEncode(publicKey.N.Bytes()),
			E:   base64URLEncode([]byte{1, 0, 1}), // 65537
		}
		jwks := JWKS{Keys: []JWK{jwk}}
		w.Header().Set("Content-Type", "application/json")
		json.NewEncoder(w).Encode(jwks)
	}))
	defer server.Close()

	validator := NewValidator(server.URL, "sequentialthinking.run")
	validator.cacheDuration = 100 * time.Millisecond // Short cache for testing

	claims := &Claims{
		Subject: "test-user",
		Scopes:  []string{"sequentialthinking.run"},
		RegisteredClaims: jwt.RegisteredClaims{
			ExpiresAt: jwt.NewNumericDate(time.Now().Add(1 * time.Hour)),
			IssuedAt:  jwt.NewNumericDate(time.Now()),
		},
	}

	tokenString, err := createTestToken(privateKey, claims)
	if err != nil {
		t.Fatalf("create token: %v", err)
	}

	// First validation - should fetch JWKS
	_, err = validator.ValidateToken(tokenString)
	if err != nil {
		t.Fatalf("validate token: %v", err)
	}
	if fetchCount != 1 {
		t.Errorf("expected 1 fetch, got %d", fetchCount)
	}

	// Second validation - should use cache
	_, err = validator.ValidateToken(tokenString)
	if err != nil {
		t.Fatalf("validate token: %v", err)
	}
	if fetchCount != 1 {
		t.Errorf("expected 1 fetch (cached), got %d", fetchCount)
	}

	// Wait for cache to expire
	time.Sleep(150 * time.Millisecond)

	// Third validation - should fetch again
	_, err = validator.ValidateToken(tokenString)
	if err != nil {
		t.Fatalf("validate token: %v", err)
	}
	if fetchCount != 2 {
		t.Errorf("expected 2 fetches (cache expired), got %d", fetchCount)
	}
}

func TestParseScopes(t *testing.T) {
	tests := []struct {
		name     string
		input    string
		expected []string
	}{
		{
			name:     "single_scope",
			input:    "read",
			expected: []string{"read"},
		},
		{
			name:     "multiple_scopes",
			input:    "read write admin",
			expected: []string{"read", "write", "admin"},
		},
		{
			name:     "extra_spaces",
			input:    "read  write   admin",
			expected: []string{"read", "write", "admin"},
		},
		{
			name:     "empty_string",
			input:    "",
			expected: nil,
		},
		{
			name:     "spaces_only",
			input:    "   ",
			expected: nil,
		},
	}

	for _, tt := range tests {
		t.Run(tt.name, func(t *testing.T) {
			result := parseScopes(tt.input)
			if len(result) != len(tt.expected) {
				t.Errorf("wrong length: got %d, want %d", len(result), len(tt.expected))
				return
			}
			for i, expected := range tt.expected {
				if result[i] != expected {
					t.Errorf("scope %d: got %s, want %s", i, result[i], expected)
				}
			}
		})
	}
}

func TestInvalidJWKS(t *testing.T) {
	server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		w.WriteHeader(http.StatusInternalServerError)
	}))
	defer server.Close()

	validator := NewValidator(server.URL, "sequentialthinking.run")
	err := validator.RefreshJWKS()
	if err == nil {
		t.Fatal("expected error for invalid JWKS server")
	}
}

func TestGetCachedKeyCount(t *testing.T) {
	privateKey, err := generateTestKeyPair()
	if err != nil {
		t.Fatalf("generate key pair: %v", err)
	}

	jwksServer := createTestJWKS(t, privateKey)
	defer jwksServer.Close()

	validator := NewValidator(jwksServer.URL, "sequentialthinking.run")

	// Initially no keys
	if count := validator.GetCachedKeyCount(); count != 0 {
		t.Errorf("expected 0 cached keys initially, got %d", count)
	}

	// Refresh JWKS
	if err := validator.RefreshJWKS(); err != nil {
		t.Fatalf("refresh JWKS: %v", err)
	}

	// Should have 1 key
	if count := validator.GetCachedKeyCount(); count != 1 {
		t.Errorf("expected 1 cached key after refresh, got %d", count)
	}
}


@@ -0,0 +1,80 @@
package policy

import (
	"net/http"
	"strings"

	"github.com/rs/zerolog/log"
)

// AuthMiddleware creates HTTP middleware for JWT authentication
type AuthMiddleware struct {
	validator          *Validator
	policyDenials      func() // Metrics callback for policy denials
	enforcementEnabled bool
}

// NewAuthMiddleware creates a new authentication middleware
func NewAuthMiddleware(validator *Validator, policyDenials func()) *AuthMiddleware {
	return &AuthMiddleware{
		validator:          validator,
		policyDenials:      policyDenials,
		enforcementEnabled: validator != nil,
	}
}

// Wrap wraps an HTTP handler with JWT authentication
func (m *AuthMiddleware) Wrap(next http.Handler) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		// If enforcement is disabled, pass through
		if !m.enforcementEnabled {
			log.Warn().Msg("Policy enforcement disabled - allowing request")
			next.ServeHTTP(w, r)
			return
		}

		// Extract token from Authorization header
		authHeader := r.Header.Get("Authorization")
		if authHeader == "" {
			log.Error().Msg("Missing Authorization header")
			m.policyDenials()
			http.Error(w, "Unauthorized: missing authorization header", http.StatusUnauthorized)
			return
		}

		// Check Bearer scheme
		parts := strings.SplitN(authHeader, " ", 2)
		if len(parts) != 2 || parts[0] != "Bearer" {
			log.Error().Str("auth_header", authHeader).Msg("Invalid Authorization header format")
			m.policyDenials()
			http.Error(w, "Unauthorized: invalid authorization format", http.StatusUnauthorized)
			return
		}

		tokenString := parts[1]

		// Validate token
		claims, err := m.validator.ValidateToken(tokenString)
		if err != nil {
			log.Error().Err(err).Msg("Token validation failed")
			m.policyDenials()
			http.Error(w, "Unauthorized: "+err.Error(), http.StatusUnauthorized)
			return
		}

		log.Info().
			Str("subject", claims.Subject).
			Strs("scopes", claims.Scopes).
			Msg("Request authorized")

		// Token is valid, pass to next handler
		next.ServeHTTP(w, r)
	})
}

// WrapFunc wraps an HTTP handler function with JWT authentication
func (m *AuthMiddleware) WrapFunc(next http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		m.Wrap(next).ServeHTTP(w, r)
	}
}
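For context, here is a minimal sketch of how this middleware could be wired into a plain net/http server. It uses only the constructors shown above; the mux wiring, JWKS URL, handler body, and port are illustrative assumptions, not part of the package:

	package main

	import (
		"net/http"

		"chorus/pkg/seqthink/policy"
	)

	func main() {
		// Hypothetical JWKS endpoint; in production this comes from KACHING_JWKS_URL.
		validator := policy.NewValidator("https://auth.example.com/.well-known/jwks.json", "sequentialthinking.run")

		// Metrics callback; a real deployment would increment a policy-denial counter here.
		denied := func() {}

		auth := policy.NewAuthMiddleware(validator, denied)

		mux := http.NewServeMux()
		mux.HandleFunc("/mcp/tool", auth.WrapFunc(func(w http.ResponseWriter, r *http.Request) {
			w.Write([]byte("authorized"))
		}))

		http.ListenAndServe(":8443", mux)
	}

Passing a nil validator to NewAuthMiddleware disables enforcement entirely (the pass-through branch in Wrap), which is the same switch the proxy server below relies on when no JWKS URL or scope is configured.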


@@ -0,0 +1,185 @@
package proxy

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"

	"chorus/pkg/seqthink/mcpclient"
	"chorus/pkg/seqthink/observability"
	"chorus/pkg/seqthink/policy"

	"github.com/gorilla/mux"
	"github.com/rs/zerolog/log"
)

// ServerConfig holds the proxy server configuration
type ServerConfig struct {
	MCPClient      *mcpclient.Client
	Metrics        *observability.Metrics
	MaxBodyMB      int
	AgeIdentPath   string
	AgeRecipsPath  string
	KachingJWKSURL string
	RequiredScope  string
}

// Server is the proxy server handling requests
type Server struct {
	config         ServerConfig
	router         *mux.Router
	authMiddleware *policy.AuthMiddleware
}

// NewServer creates a new proxy server
func NewServer(cfg ServerConfig) (*Server, error) {
	s := &Server{
		config: cfg,
		router: mux.NewRouter(),
	}

	// Setup policy enforcement if configured
	if cfg.KachingJWKSURL != "" && cfg.RequiredScope != "" {
		log.Info().
			Str("jwks_url", cfg.KachingJWKSURL).
			Str("required_scope", cfg.RequiredScope).
			Msg("Policy enforcement enabled")

		validator := policy.NewValidator(cfg.KachingJWKSURL, cfg.RequiredScope)

		// Pre-fetch JWKS
		if err := validator.RefreshJWKS(); err != nil {
			log.Warn().Err(err).Msg("Failed to pre-fetch JWKS, will retry on first request")
		}

		s.authMiddleware = policy.NewAuthMiddleware(validator, cfg.Metrics.IncrementPolicyDenials)
	} else {
		log.Warn().Msg("Policy enforcement disabled - no JWKS URL or required scope configured")
		s.authMiddleware = policy.NewAuthMiddleware(nil, cfg.Metrics.IncrementPolicyDenials)
	}

	// Setup routes
	s.setupRoutes()

	return s, nil
}

// Handler returns the HTTP handler
func (s *Server) Handler() http.Handler {
	return s.router
}

// setupRoutes configures the HTTP routes
func (s *Server) setupRoutes() {
	// Health checks (no auth required)
	s.router.HandleFunc("/health", s.handleHealth).Methods("GET")
	s.router.HandleFunc("/ready", s.handleReady).Methods("GET")

	// MCP tool endpoint - route based on encryption config, with auth
	if s.isEncryptionEnabled() {
		log.Info().Msg("Encryption enabled - using encrypted endpoint")
		s.router.Handle("/mcp/tool",
			s.authMiddleware.Wrap(http.HandlerFunc(s.handleToolCallEncrypted))).Methods("POST")
	} else {
		log.Warn().Msg("Encryption disabled - using plaintext endpoint")
		s.router.Handle("/mcp/tool",
			s.authMiddleware.Wrap(http.HandlerFunc(s.handleToolCall))).Methods("POST")
	}

	// SSE endpoint - route based on encryption config, with auth
	if s.isEncryptionEnabled() {
		s.router.Handle("/mcp/sse",
			s.authMiddleware.Wrap(http.HandlerFunc(s.handleSSEEncrypted))).Methods("GET")
	} else {
		s.router.Handle("/mcp/sse",
			s.authMiddleware.Wrap(http.HandlerFunc(s.handleSSEPlaintext))).Methods("GET")
	}

	// Metrics endpoint (no auth required for internal monitoring)
	s.router.Handle("/metrics", s.config.Metrics.Handler())
}

// isEncryptionEnabled checks if encryption is configured
func (s *Server) isEncryptionEnabled() bool {
	return s.config.AgeIdentPath != "" && s.config.AgeRecipsPath != ""
}

// handleHealth returns 200 OK if the wrapper is running
func (s *Server) handleHealth(w http.ResponseWriter, r *http.Request) {
	w.WriteHeader(http.StatusOK)
	w.Write([]byte("OK"))
}

// handleReady checks if the MCP server is ready
func (s *Server) handleReady(w http.ResponseWriter, r *http.Request) {
	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Second)
	defer cancel()

	if err := s.config.MCPClient.Health(ctx); err != nil {
		log.Error().Err(err).Msg("MCP server not ready")
		http.Error(w, "MCP server not ready", http.StatusServiceUnavailable)
		return
	}

	w.WriteHeader(http.StatusOK)
	w.Write([]byte("READY"))
}

// handleToolCall proxies tool calls to the MCP server (plaintext for Beat 1)
func (s *Server) handleToolCall(w http.ResponseWriter, r *http.Request) {
	s.config.Metrics.IncrementRequests()
	startTime := time.Now()

	// Limit request body size
	r.Body = http.MaxBytesReader(w, r.Body, int64(s.config.MaxBodyMB)*1024*1024)

	// Read request body
	body, err := io.ReadAll(r.Body)
	if err != nil {
		log.Error().Err(err).Msg("Failed to read request body")
		s.config.Metrics.IncrementErrors()
		http.Error(w, "Failed to read request", http.StatusBadRequest)
		return
	}

	// Parse tool request
	var toolReq mcpclient.ToolRequest
	if err := json.Unmarshal(body, &toolReq); err != nil {
		log.Error().Err(err).Msg("Failed to parse tool request")
		s.config.Metrics.IncrementErrors()
		http.Error(w, "Invalid request format", http.StatusBadRequest)
		return
	}

	log.Info().
		Str("tool", toolReq.Tool).
		Msg("Proxying tool call to MCP server")

	// Call MCP server
	ctx, cancel := context.WithTimeout(r.Context(), 120*time.Second)
	defer cancel()

	toolResp, err := s.config.MCPClient.CallTool(ctx, &toolReq)
	if err != nil {
		log.Error().Err(err).Msg("MCP tool call failed")
		s.config.Metrics.IncrementErrors()
		http.Error(w, fmt.Sprintf("Tool call failed: %v", err), http.StatusInternalServerError)
		return
	}

	// Return response
	w.Header().Set("Content-Type", "application/json")
	if err := json.NewEncoder(w).Encode(toolResp); err != nil {
		log.Error().Err(err).Msg("Failed to encode response")
		s.config.Metrics.IncrementErrors()
		return
	}

	duration := time.Since(startTime)
	log.Info().
		Str("tool", toolReq.Tool).
		Dur("duration", duration).
		Msg("Tool call completed")
}
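A plaintext client call against this endpoint might look like the sketch below. The exact JSON schema of mcpclient.ToolRequest is not shown in this diff, so the "tool" and "arguments" keys are assumptions; only the Tool field is visible above, and the hostname comes from the deployment's Traefik routing.

	package main

	import (
		"bytes"
		"encoding/json"
		"fmt"
		"io"
		"net/http"
		"os"
	)

	func main() {
		// Field names are assumed; check mcpclient.ToolRequest for the real schema.
		payload, _ := json.Marshal(map[string]any{
			"tool":      "sequentialthinking",
			"arguments": map[string]any{"thought": "First step", "thought_number": 1},
		})

		req, err := http.NewRequest("POST", "https://seqthink.chorus.services/mcp/tool", bytes.NewReader(payload))
		if err != nil {
			panic(err)
		}
		req.Header.Set("Content-Type", "application/json")
		// Bearer token issued by KACHING with the sequentialthinking.run scope.
		req.Header.Set("Authorization", "Bearer "+os.Getenv("SEQTHINK_TOKEN"))

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()

		body, _ := io.ReadAll(resp.Body)
		fmt.Println(resp.Status, string(body))
	}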


@@ -0,0 +1,140 @@
package proxy

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
	"time"

	"chorus/pkg/seqthink/ageio"
	"chorus/pkg/seqthink/mcpclient"

	"github.com/rs/zerolog/log"
)

// handleToolCallEncrypted proxies encrypted tool calls to the MCP server (Beat 2)
func (s *Server) handleToolCallEncrypted(w http.ResponseWriter, r *http.Request) {
	s.config.Metrics.IncrementRequests()
	startTime := time.Now()

	// Check Content-Type header
	contentType := r.Header.Get("Content-Type")
	if contentType != "application/age" {
		log.Error().
			Str("content_type", contentType).
			Msg("Invalid Content-Type, expected application/age")
		s.config.Metrics.IncrementErrors()
		http.Error(w, "Content-Type must be application/age", http.StatusUnsupportedMediaType)
		return
	}

	// Limit request body size
	r.Body = http.MaxBytesReader(w, r.Body, int64(s.config.MaxBodyMB)*1024*1024)

	// Read encrypted request body
	encryptedBody, err := io.ReadAll(r.Body)
	if err != nil {
		log.Error().Err(err).Msg("Failed to read encrypted request body")
		s.config.Metrics.IncrementErrors()
		http.Error(w, "Failed to read request", http.StatusBadRequest)
		return
	}

	// Create decryptor
	decryptor, err := ageio.NewDecryptor(s.config.AgeIdentPath)
	if err != nil {
		log.Error().Err(err).Msg("Failed to create decryptor")
		s.config.Metrics.IncrementErrors()
		http.Error(w, "Decryption initialization failed", http.StatusInternalServerError)
		return
	}

	// Decrypt request
	plaintext, err := decryptor.Decrypt(encryptedBody)
	if err != nil {
		log.Error().Err(err).Msg("Failed to decrypt request")
		s.config.Metrics.IncrementDecryptFails()
		http.Error(w, "Decryption failed", http.StatusBadRequest)
		return
	}

	log.Debug().
		Int("encrypted_size", len(encryptedBody)).
		Int("plaintext_size", len(plaintext)).
		Msg("Request decrypted successfully")

	// Parse tool request
	var toolReq mcpclient.ToolRequest
	if err := json.Unmarshal(plaintext, &toolReq); err != nil {
		log.Error().Err(err).Msg("Failed to parse decrypted tool request")
		s.config.Metrics.IncrementErrors()
		http.Error(w, "Invalid request format", http.StatusBadRequest)
		return
	}

	log.Info().
		Str("tool", toolReq.Tool).
		Msg("Proxying encrypted tool call to MCP server")

	// Call MCP server (plaintext internally)
	ctx, cancel := context.WithTimeout(r.Context(), 120*time.Second)
	defer cancel()

	toolResp, err := s.config.MCPClient.CallTool(ctx, &toolReq)
	if err != nil {
		log.Error().Err(err).Msg("MCP tool call failed")
		s.config.Metrics.IncrementErrors()
		http.Error(w, fmt.Sprintf("Tool call failed: %v", err), http.StatusInternalServerError)
		return
	}

	// Serialize response
	responseJSON, err := json.Marshal(toolResp)
	if err != nil {
		log.Error().Err(err).Msg("Failed to marshal response")
		s.config.Metrics.IncrementErrors()
		http.Error(w, "Response serialization failed", http.StatusInternalServerError)
		return
	}

	// Create encryptor
	encryptor, err := ageio.NewEncryptor(s.config.AgeRecipsPath)
	if err != nil {
		log.Error().Err(err).Msg("Failed to create encryptor")
		s.config.Metrics.IncrementErrors()
		http.Error(w, "Encryption initialization failed", http.StatusInternalServerError)
		return
	}

	// Encrypt response
	encryptedResponse, err := encryptor.Encrypt(responseJSON)
	if err != nil {
		log.Error().Err(err).Msg("Failed to encrypt response")
		s.config.Metrics.IncrementEncryptFails()
		http.Error(w, "Encryption failed", http.StatusInternalServerError)
		return
	}

	log.Debug().
		Int("plaintext_size", len(responseJSON)).
		Int("encrypted_size", len(encryptedResponse)).
		Msg("Response encrypted successfully")

	// Return encrypted response
	w.Header().Set("Content-Type", "application/age")
	w.WriteHeader(http.StatusOK)
	if _, err := w.Write(encryptedResponse); err != nil {
		log.Error().Err(err).Msg("Failed to write encrypted response")
		s.config.Metrics.IncrementErrors()
		return
	}

	duration := time.Since(startTime)
	s.config.Metrics.ObserveRequestDuration(duration.Seconds())

	log.Info().
		Str("tool", toolReq.Tool).
		Dur("duration", duration).
		Bool("encrypted", true).
		Msg("Tool call completed")
}
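A client for this endpoint has to mirror the flow above: encrypt the tool request to the wrapper's age recipient, POST it as application/age, then decrypt the response body. The sketch below uses the filippo.io/age library directly and assumes a single shared X25519 keypair; which identity decrypts the response depends on how the recipients file is provisioned. The Authorization header is omitted for brevity, but with policy enforcement on, the same Bearer token requirement as the plaintext endpoint applies.

	package main

	import (
		"bytes"
		"fmt"
		"io"
		"net/http"

		"filippo.io/age"
	)

	func callEncrypted(url, recipientStr, identityStr string, plaintext []byte) ([]byte, error) {
		// Encrypt the request body to the wrapper's public key.
		recipient, err := age.ParseX25519Recipient(recipientStr)
		if err != nil {
			return nil, err
		}
		var buf bytes.Buffer
		wc, err := age.Encrypt(&buf, recipient)
		if err != nil {
			return nil, err
		}
		if _, err := wc.Write(plaintext); err != nil {
			return nil, err
		}
		if err := wc.Close(); err != nil {
			return nil, err
		}

		req, err := http.NewRequest("POST", url, &buf)
		if err != nil {
			return nil, err
		}
		req.Header.Set("Content-Type", "application/age")

		resp, err := http.DefaultClient.Do(req)
		if err != nil {
			return nil, err
		}
		defer resp.Body.Close()
		if resp.StatusCode != http.StatusOK {
			return nil, fmt.Errorf("wrapper returned %s", resp.Status)
		}

		// Decrypt the response with the client's private identity
		// (assumed here to be listed in the wrapper's recipients file).
		identity, err := age.ParseX25519Identity(identityStr)
		if err != nil {
			return nil, err
		}
		r, err := age.Decrypt(resp.Body, identity)
		if err != nil {
			return nil, err
		}
		return io.ReadAll(r)
	}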

pkg/seqthink/proxy/sse.go (new file, 242 lines)

@@ -0,0 +1,242 @@
package proxy

import (
	"bufio"
	"context"
	"encoding/base64"
	"fmt"
	"io"
	"net/http"
	"strings"
	"time"

	"chorus/pkg/seqthink/ageio"

	"github.com/rs/zerolog/log"
)

// SSEFrame represents a single Server-Sent Event frame
type SSEFrame struct {
	Event string `json:"event,omitempty"`
	Data  string `json:"data"`
	ID    string `json:"id,omitempty"`
}

// handleSSEEncrypted handles encrypted Server-Sent Events streaming
func (s *Server) handleSSEEncrypted(w http.ResponseWriter, r *http.Request) {
	s.config.Metrics.IncrementRequests()
	startTime := time.Now()

	// Set SSE headers
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no") // Disable nginx buffering

	// Create flusher for streaming
	flusher, ok := w.(http.Flusher)
	if !ok {
		log.Error().Msg("Streaming not supported")
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return
	}

	// Create encryptor for streaming
	encryptor, err := ageio.NewEncryptor(s.config.AgeRecipsPath)
	if err != nil {
		log.Error().Err(err).Msg("Failed to create encryptor")
		http.Error(w, "Encryption initialization failed", http.StatusInternalServerError)
		return
	}

	// Create context with timeout
	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Minute)
	defer cancel()

	log.Info().Msg("Starting encrypted SSE stream")

	// Simulate streaming encrypted frames
	// In production, this would stream from the MCP server
	frameCount := 0
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			log.Info().
				Int("frames_sent", frameCount).
				Dur("duration", time.Since(startTime)).
				Msg("SSE stream closed")
			return

		case <-ticker.C:
			frameCount++

			// Create frame data
			frameData := fmt.Sprintf(`{"thought_number":%d,"thought":"Processing...","next_thought_needed":true}`, frameCount)

			// Encrypt frame
			encryptedFrame, err := encryptor.Encrypt([]byte(frameData))
			if err != nil {
				log.Error().Err(err).Msg("Failed to encrypt SSE frame")
				continue
			}

			// Base64 encode for SSE transmission
			encodedFrame := base64.StdEncoding.EncodeToString(encryptedFrame)

			// Send SSE frame
			fmt.Fprintf(w, "event: thought\n")
			fmt.Fprintf(w, "data: %s\n", encodedFrame)
			fmt.Fprintf(w, "id: %d\n\n", frameCount)
			flusher.Flush()

			log.Debug().
				Int("frame", frameCount).
				Int("encrypted_size", len(encryptedFrame)).
				Msg("Sent encrypted SSE frame")

			// Stop after 10 frames for demo
			if frameCount >= 10 {
				fmt.Fprintf(w, "event: done\n")
				fmt.Fprintf(w, "data: complete\n\n")
				flusher.Flush()

				log.Info().
					Int("frames_sent", frameCount).
					Dur("duration", time.Since(startTime)).
					Msg("SSE stream completed")
				return
			}
		}
	}
}

// handleSSEPlaintext handles plaintext Server-Sent Events streaming
func (s *Server) handleSSEPlaintext(w http.ResponseWriter, r *http.Request) {
	s.config.Metrics.IncrementRequests()
	startTime := time.Now()

	// Set SSE headers
	w.Header().Set("Content-Type", "text/event-stream")
	w.Header().Set("Cache-Control", "no-cache")
	w.Header().Set("Connection", "keep-alive")
	w.Header().Set("X-Accel-Buffering", "no")

	// Create flusher for streaming
	flusher, ok := w.(http.Flusher)
	if !ok {
		log.Error().Msg("Streaming not supported")
		http.Error(w, "Streaming not supported", http.StatusInternalServerError)
		return
	}

	// Create context with timeout
	ctx, cancel := context.WithTimeout(r.Context(), 5*time.Minute)
	defer cancel()

	log.Info().Msg("Starting plaintext SSE stream")

	// Simulate streaming frames
	frameCount := 0
	ticker := time.NewTicker(1 * time.Second)
	defer ticker.Stop()

	for {
		select {
		case <-ctx.Done():
			log.Info().
				Int("frames_sent", frameCount).
				Dur("duration", time.Since(startTime)).
				Msg("SSE stream closed")
			return

		case <-ticker.C:
			frameCount++

			// Create frame data
			frameData := fmt.Sprintf(`{"thought_number":%d,"thought":"Processing...","next_thought_needed":true}`, frameCount)

			// Send SSE frame
			fmt.Fprintf(w, "event: thought\n")
			fmt.Fprintf(w, "data: %s\n", frameData)
			fmt.Fprintf(w, "id: %d\n\n", frameCount)
			flusher.Flush()

			log.Debug().
				Int("frame", frameCount).
				Msg("Sent plaintext SSE frame")

			// Stop after 10 frames for demo
			if frameCount >= 10 {
				fmt.Fprintf(w, "event: done\n")
				fmt.Fprintf(w, "data: complete\n\n")
				flusher.Flush()

				log.Info().
					Int("frames_sent", frameCount).
					Dur("duration", time.Since(startTime)).
					Msg("SSE stream completed")
				return
			}
		}
	}
}

// DecryptSSEFrame decrypts a base64-encoded encrypted SSE frame
func DecryptSSEFrame(encodedFrame string, identityPath string) ([]byte, error) {
	// Base64 decode
	encryptedFrame, err := base64.StdEncoding.DecodeString(encodedFrame)
	if err != nil {
		return nil, fmt.Errorf("base64 decode: %w", err)
	}

	// Create decryptor
	decryptor, err := ageio.NewDecryptor(identityPath)
	if err != nil {
		return nil, fmt.Errorf("create decryptor: %w", err)
	}

	// Decrypt
	plaintext, err := decryptor.Decrypt(encryptedFrame)
	if err != nil {
		return nil, fmt.Errorf("decrypt: %w", err)
	}

	return plaintext, nil
}

// ReadSSEStream reads an SSE stream and returns its frames
func ReadSSEStream(r io.Reader) ([]SSEFrame, error) {
	var frames []SSEFrame
	scanner := bufio.NewScanner(r)

	var currentFrame SSEFrame
	for scanner.Scan() {
		line := scanner.Text()

		if line == "" {
			// Empty line signals end of frame
			if currentFrame.Data != "" {
				frames = append(frames, currentFrame)
				currentFrame = SSEFrame{}
			}
			continue
		}

		// Parse SSE field
		if strings.HasPrefix(line, "event: ") {
			currentFrame.Event = line[len("event: "):]
		} else if strings.HasPrefix(line, "data: ") {
			currentFrame.Data = line[len("data: "):]
		} else if strings.HasPrefix(line, "id: ") {
			currentFrame.ID = line[len("id: "):]
		}
	}

	if err := scanner.Err(); err != nil {
		return nil, fmt.Errorf("scan stream: %w", err)
	}

	return frames, nil
}
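Putting the two helpers together, a consumer of the encrypted stream might look like the following sketch. The identity path matches the Docker secret mount used in deployment; the buffered ReadSSEStream call suits the bounded ten-frame demo above but not a long-lived stream, and a Bearer token would also be needed when enforcement is on:

	package main

	import (
		"fmt"
		"net/http"

		"chorus/pkg/seqthink/proxy"
	)

	func main() {
		resp, err := http.Get("https://seqthink.chorus.services/mcp/sse")
		if err != nil {
			panic(err)
		}
		defer resp.Body.Close()

		// ReadSSEStream buffers until the stream ends; a production client
		// would parse frames incrementally instead.
		frames, err := proxy.ReadSSEStream(resp.Body)
		if err != nil {
			panic(err)
		}

		for _, frame := range frames {
			if frame.Event != "thought" {
				continue
			}
			plaintext, err := proxy.DecryptSSEFrame(frame.Data, "/run/secrets/seqthink_age_identity")
			if err != nil {
				panic(err)
			}
			fmt.Printf("frame %s: %s\n", frame.ID, plaintext)
		}
	}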


pubsub/pubsub.go

@@ -8,10 +8,12 @@ import (
 	"sync"
 	"time"

+	"chorus/internal/logging"
 	"chorus/pkg/shhh"

 	pubsub "github.com/libp2p/go-libp2p-pubsub"
 	"github.com/libp2p/go-libp2p/core/host"
 	"github.com/libp2p/go-libp2p/core/peer"
+	"github.com/rs/zerolog"
 )

 // PubSub handles publish/subscribe messaging for Bzzz coordination and HMMM meta-discussion
@@ -56,6 +58,9 @@ type PubSub struct {
 	// SHHH sentinel
 	redactor    *shhh.Sentinel
 	redactorMux sync.RWMutex
+
+	// Structured logger
+	logger zerolog.Logger
 }

 // HypercoreLogger interface for dependency injection
@@ -168,6 +173,7 @@ func NewPubSubWithLogger(ctx context.Context, h host.Host, chorusTopic, hmmmTopi
 		dynamicSubs:     make(map[string]*pubsub.Subscription),
 		dynamicHandlers: make(map[string]func([]byte, peer.ID)),
 		hypercoreLog:    logger,
+		logger:          logging.ForComponent(logging.ComponentP2P),
 	}

 	// Join static topics
@@ -181,7 +187,11 @@ func NewPubSubWithLogger(ctx context.Context, h host.Host, chorusTopic, hmmmTopi
 	go p.handleHmmmMessages()
 	go p.handleContextFeedbackMessages()

-	fmt.Printf("📡 PubSub initialized - Bzzz: %s, HMMM: %s, Context: %s\n", chorusTopic, hmmmTopic, contextTopic)
+	p.logger.Info().
+		Str("bzzz_topic", chorusTopic).
+		Str("hmmm_topic", hmmmTopic).
+		Str("context_topic", contextTopic).
+		Msg("PubSub initialized")

 	return p, nil
 }
@@ -297,7 +307,7 @@ func (p *PubSub) subscribeDynamicTopic(topicName string, handler func([]byte, pe
 	go p.handleDynamicMessages(topicName, sub)

-	fmt.Printf("Joined dynamic topic: %s\n", topicName)
+	p.logger.Info().Str("topic_name", topicName).Msg("Joined dynamic topic")

 	return nil
 }
@@ -339,12 +349,12 @@ func (p *PubSub) JoinRoleBasedTopics(role string, expertise []string, reportsTo
 	// Join all identified topics
 	for _, topicName := range topicsToJoin {
 		if err := p.JoinDynamicTopic(topicName); err != nil {
-			fmt.Printf("⚠️ Failed to join role-based topic %s: %v\n", topicName, err)
+			p.logger.Warn().Err(err).Str("topic_name", topicName).Msg("Failed to join role-based topic")
 			continue
 		}
 	}

-	fmt.Printf("🎯 Joined %d role-based topics for role: %s\n", len(topicsToJoin), role)
+	p.logger.Info().Int("topic_count", len(topicsToJoin)).Str("role", role).Msg("Joined role-based topics")

 	return nil
 }
@@ -379,7 +389,7 @@ func (p *PubSub) LeaveDynamicTopic(topicName string) {
 	delete(p.dynamicHandlers, topicName)
 	p.dynamicHandlersMux.Unlock()

-	fmt.Printf("🗑️ Left dynamic topic: %s\n", topicName)
+	p.logger.Info().Str("topic_name", topicName).Msg("Left dynamic topic")
 }

 // PublishToDynamicTopic publishes a message to a specific dynamic topic
@@ -588,7 +598,7 @@ func (p *PubSub) handleBzzzMessages() {
 			if p.ctx.Err() != nil {
 				return // Context cancelled
 			}
-			fmt.Printf("Error receiving Bzzz message: %v\n", err)
+			p.logger.Warn().Err(err).Msg("Error receiving Bzzz message")
 			continue
 		}
@@ -598,7 +608,7 @@ func (p *PubSub) handleBzzzMessages() {
 		var chorusMsg Message
 		if err := json.Unmarshal(msg.Data, &chorusMsg); err != nil {
-			fmt.Printf("Failed to unmarshal Bzzz message: %v\n", err)
+			p.logger.Warn().Err(err).Msg("Failed to unmarshal Bzzz message")
 			continue
 		}
@@ -614,7 +624,7 @@ func (p *PubSub) handleHmmmMessages() {
 			if p.ctx.Err() != nil {
 				return // Context cancelled
 			}
-			fmt.Printf("Error receiving HMMM message: %v\n", err)
+			p.logger.Warn().Err(err).Msg("Error receiving HMMM message")
 			continue
 		}
@@ -624,7 +634,7 @@ func (p *PubSub) handleHmmmMessages() {
 		var hmmmMsg Message
 		if err := json.Unmarshal(msg.Data, &hmmmMsg); err != nil {
-			fmt.Printf("Failed to unmarshal HMMM message: %v\n", err)
+			p.logger.Warn().Err(err).Msg("Failed to unmarshal HMMM message")
 			continue
 		}
@@ -644,7 +654,7 @@ func (p *PubSub) handleContextFeedbackMessages() {
 			if p.ctx.Err() != nil {
 				return // Context cancelled
 			}
-			fmt.Printf("Error receiving Context Feedback message: %v\n", err)
+			p.logger.Warn().Err(err).Msg("Error receiving Context Feedback message")
 			continue
 		}
@@ -654,7 +664,7 @@ func (p *PubSub) handleContextFeedbackMessages() {
 		var contextMsg Message
 		if err := json.Unmarshal(msg.Data, &contextMsg); err != nil {
-			fmt.Printf("Failed to unmarshal Context Feedback message: %v\n", err)
+			p.logger.Warn().Err(err).Msg("Failed to unmarshal Context Feedback message")
 			continue
 		}
@@ -682,7 +692,7 @@ func (p *PubSub) handleDynamicMessages(topicName string, sub *pubsub.Subscriptio
 			if p.ctx.Err() != nil || err.Error() == "subscription cancelled" {
 				return // Subscription was cancelled, exit handler
 			}
-			fmt.Printf("Error receiving dynamic message on %s: %v\n", topicName, err)
+			p.logger.Warn().Err(err).Str("topic_name", topicName).Msg("Error receiving dynamic message")
 			continue
 		}
@@ -697,7 +707,7 @@ func (p *PubSub) handleDynamicMessages(topicName string, sub *pubsub.Subscriptio
 		var dynamicMsg Message
 		if err := json.Unmarshal(msg.Data, &dynamicMsg); err != nil {
-			fmt.Printf("Failed to unmarshal dynamic message on %s: %v\n", topicName, err)
+			p.logger.Warn().Err(err).Str("topic_name", topicName).Msg("Failed to unmarshal dynamic message")
 			continue
 		}
@@ -710,7 +720,11 @@ func (p *PubSub) handleDynamicMessages(topicName string, sub *pubsub.Subscriptio
 // processBzzzMessage handles different types of Bzzz coordination messages
 func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) {
-	fmt.Printf("🐝 Bzzz [%s] from %s: %v\n", msg.Type, from.ShortString(), msg.Data)
+	p.logger.Debug().
+		Str("message_type", string(msg.Type)).
+		Str("from_peer", from.ShortString()).
+		Interface("data", msg.Data).
+		Msg("Bzzz message received")

 	// Log to hypercore if logger is available
 	if p.hypercoreLog != nil {
@@ -743,15 +757,18 @@ func (p *PubSub) processBzzzMessage(msg Message, from peer.ID) {
 		}

 		if err := p.hypercoreLog.AppendString(logType, logData); err != nil {
-			fmt.Printf("Failed to log Bzzz message to hypercore: %v\n", err)
+			p.logger.Warn().Err(err).Msg("Failed to log Bzzz message to hypercore")
 		}
 	}
 }

 // processHmmmMessage provides default handling for HMMM messages if no external handler is set
 func (p *PubSub) processHmmmMessage(msg Message, from peer.ID) {
-	fmt.Printf("🎯 Default HMMM Handler [%s] from %s: %v\n",
-		msg.Type, from.ShortString(), msg.Data)
+	p.logger.Debug().
+		Str("message_type", string(msg.Type)).
+		Str("from_peer", from.ShortString()).
+		Interface("data", msg.Data).
+		Msg("Default HMMM Handler")

 	// Log to hypercore if logger is available
 	if p.hypercoreLog != nil {
@@ -794,15 +811,18 @@ func (p *PubSub) processHmmmMessage(msg Message, from peer.ID) {
 		}

 		if err := p.hypercoreLog.AppendString(logType, logData); err != nil {
-			fmt.Printf("Failed to log HMMM message to hypercore: %v\n", err)
+			p.logger.Warn().Err(err).Msg("Failed to log HMMM message to hypercore")
 		}
 	}
 }

 // processContextFeedbackMessage provides default handling for context feedback messages if no external handler is set
 func (p *PubSub) processContextFeedbackMessage(msg Message, from peer.ID) {
-	fmt.Printf("🧠 Context Feedback [%s] from %s: %v\n",
-		msg.Type, from.ShortString(), msg.Data)
+	p.logger.Debug().
+		Str("message_type", string(msg.Type)).
+		Str("from_peer", from.ShortString()).
+		Interface("data", msg.Data).
+		Msg("Context Feedback")

 	// Log to hypercore if logger is available
 	if p.hypercoreLog != nil {
@@ -834,7 +854,7 @@ func (p *PubSub) processContextFeedbackMessage(msg Message, from peer.ID) {
 		}

 		if err := p.hypercoreLog.AppendString(logType, logData); err != nil {
-			fmt.Printf("Failed to log Context Feedback message to hypercore: %v\n", err)
+			p.logger.Warn().Err(err).Msg("Failed to log Context Feedback message to hypercore")
 		}
 	}
 }
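The chorus/internal/logging package referenced by this hunk is not part of the diff. A plausible shape for ForComponent, assuming it does nothing more than tag a zerolog JSON logger with a component field and RFC 3339 timestamps to match the structured-logging goal, would be something like:

	package logging

	import (
		"os"
		"time"

		"github.com/rs/zerolog"
	)

	// Component names used across the runtime (illustrative subset;
	// the real package may define more, or use a dedicated type).
	const ComponentP2P = "p2p"

	// ForComponent returns a JSON logger tagged with the component name.
	// This is a guess at the internal helper's shape, not its actual source.
	func ForComponent(component string) zerolog.Logger {
		zerolog.TimeFieldFormat = time.RFC3339
		return zerolog.New(os.Stdout).
			With().
			Timestamp().
			Str("component", component).
			Logger()
	}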