Compare commits
4 Commits
1de8695736
...
feature/ch
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
14b5125c12 | ||
|
|
ea04378962 | ||
| 237e8699eb | |||
|
|
e523c4b543 |
35
README.md
35
README.md
@@ -8,7 +8,7 @@ CHORUS is the runtime that ties the CHORUS ecosystem together: libp2p mesh, DHT-
|
|||||||
| --- | --- | --- |
|
| --- | --- | --- |
|
||||||
| libp2p node + PubSub | ✅ Running | `internal/runtime/shared.go` spins up the mesh, hypercore logging, availability broadcasts. |
|
| libp2p node + PubSub | ✅ Running | `internal/runtime/shared.go` spins up the mesh, hypercore logging, availability broadcasts. |
|
||||||
| DHT + DecisionPublisher | ✅ Running | Encrypted storage wired through `pkg/dht`; decisions written via `ucxl.DecisionPublisher`. |
|
| DHT + DecisionPublisher | ✅ Running | Encrypted storage wired through `pkg/dht`; decisions written via `ucxl.DecisionPublisher`. |
|
||||||
| Election manager | ✅ Running | Admin election integrated with Backbeat; metrics exposed under `pkg/metrics`. |
|
| **Leader Election System** | ✅ **FULLY FUNCTIONAL** | **🎉 MILESTONE: Complete admin election with consensus, discovery protocol, heartbeats, and SLURP activation!** |
|
||||||
| SLURP (context intelligence) | 🚧 Stubbed | `pkg/slurp/slurp.go` contains TODOs for resolver, temporal graphs, intelligence. Leader integration scaffolding exists but uses placeholder IDs/request forwarding. |
|
| SLURP (context intelligence) | 🚧 Stubbed | `pkg/slurp/slurp.go` contains TODOs for resolver, temporal graphs, intelligence. Leader integration scaffolding exists but uses placeholder IDs/request forwarding. |
|
||||||
| SHHH (secrets sentinel) | 🚧 Sentinel live | `pkg/shhh` redacts hypercore + PubSub payloads with audit + metrics hooks (policy replay TBD). |
|
| SHHH (secrets sentinel) | 🚧 Sentinel live | `pkg/shhh` redacts hypercore + PubSub payloads with audit + metrics hooks (policy replay TBD). |
|
||||||
| HMMM routing | 🚧 Partial | PubSub topics join, but capability/role announcements and HMMM router wiring are placeholders (`internal/runtime/agent_support.go`). |
|
| HMMM routing | 🚧 Partial | PubSub topics join, but capability/role announcements and HMMM router wiring are placeholders (`internal/runtime/agent_support.go`). |
|
||||||
@@ -35,6 +35,39 @@ You’ll get a single agent container with:
|
|||||||
|
|
||||||
**Missing today:** SLURP context resolution, advanced SHHH policy replay, HMMM per-issue routing. Expect log warnings/TODOs for those paths.
|
**Missing today:** SLURP context resolution, advanced SHHH policy replay, HMMM per-issue routing. Expect log warnings/TODOs for those paths.
|
||||||
|
|
||||||
|
## 🎉 Leader Election System (NEW!)
|
||||||
|
|
||||||
|
CHORUS now features a complete, production-ready leader election system:
|
||||||
|
|
||||||
|
### Core Features
|
||||||
|
- **Consensus-based election** with weighted scoring (uptime, capabilities, resources)
|
||||||
|
- **Admin discovery protocol** for network-wide leader identification
|
||||||
|
- **Heartbeat system** with automatic failover (15-second intervals)
|
||||||
|
- **Concurrent election prevention** with randomized delays
|
||||||
|
- **SLURP activation** on elected admin nodes
|
||||||
|
|
||||||
|
### How It Works
|
||||||
|
1. **Bootstrap**: Nodes start in idle state, no admin known
|
||||||
|
2. **Discovery**: Nodes send discovery requests to find existing admin
|
||||||
|
3. **Election trigger**: If no admin found after grace period, trigger election
|
||||||
|
4. **Candidacy**: Eligible nodes announce themselves with capability scores
|
||||||
|
5. **Consensus**: Network selects winner based on highest score
|
||||||
|
6. **Leadership**: Winner starts heartbeats, activates SLURP functionality
|
||||||
|
7. **Monitoring**: Nodes continuously verify admin health via heartbeats
|
||||||
|
|
||||||
|
### Debugging
|
||||||
|
Use these log patterns to monitor election health:
|
||||||
|
```bash
|
||||||
|
# Monitor WHOAMI messages and leader identification
|
||||||
|
docker service logs CHORUS_chorus | grep "🤖 WHOAMI\|👑\|📡.*Discovered"
|
||||||
|
|
||||||
|
# Track election cycles
|
||||||
|
docker service logs CHORUS_chorus | grep "🗳️\|📢.*candidacy\|🏆.*winner"
|
||||||
|
|
||||||
|
# Watch discovery protocol
|
||||||
|
docker service logs CHORUS_chorus | grep "📩\|📤\|📥"
|
||||||
|
```
|
||||||
|
|
||||||
## Roadmap Highlights
|
## Roadmap Highlights
|
||||||
|
|
||||||
1. **Security substrate** – land SHHH sentinel, finish SLURP leader-only operations, validate COOEE enrolment (see roadmap Phase 1).
|
1. **Security substrate** – land SHHH sentinel, finish SLURP leader-only operations, validate COOEE enrolment (see roadmap Phase 1).
|
||||||
|
|||||||
@@ -9,10 +9,11 @@ import (
|
|||||||
|
|
||||||
"chorus/internal/logging"
|
"chorus/internal/logging"
|
||||||
"chorus/pubsub"
|
"chorus/pubsub"
|
||||||
|
|
||||||
"github.com/gorilla/mux"
|
"github.com/gorilla/mux"
|
||||||
)
|
)
|
||||||
|
|
||||||
// HTTPServer provides HTTP API endpoints for Bzzz
|
// HTTPServer provides HTTP API endpoints for CHORUS
|
||||||
type HTTPServer struct {
|
type HTTPServer struct {
|
||||||
port int
|
port int
|
||||||
hypercoreLog *logging.HypercoreLog
|
hypercoreLog *logging.HypercoreLog
|
||||||
@@ -20,7 +21,7 @@ type HTTPServer struct {
|
|||||||
server *http.Server
|
server *http.Server
|
||||||
}
|
}
|
||||||
|
|
||||||
// NewHTTPServer creates a new HTTP server for Bzzz API
|
// NewHTTPServer creates a new HTTP server for CHORUS API
|
||||||
func NewHTTPServer(port int, hlog *logging.HypercoreLog, ps *pubsub.PubSub) *HTTPServer {
|
func NewHTTPServer(port int, hlog *logging.HypercoreLog, ps *pubsub.PubSub) *HTTPServer {
|
||||||
return &HTTPServer{
|
return &HTTPServer{
|
||||||
port: port,
|
port: port,
|
||||||
@@ -197,11 +198,11 @@ func (h *HTTPServer) handleGetLogsSince(w http.ResponseWriter, r *http.Request)
|
|||||||
}
|
}
|
||||||
|
|
||||||
response := map[string]interface{}{
|
response := map[string]interface{}{
|
||||||
"entries": entries,
|
"entries": entries,
|
||||||
"count": len(entries),
|
"count": len(entries),
|
||||||
"since_index": index,
|
"since_index": index,
|
||||||
"timestamp": time.Now().Unix(),
|
"timestamp": time.Now().Unix(),
|
||||||
"total": h.hypercoreLog.Length(),
|
"total": h.hypercoreLog.Length(),
|
||||||
}
|
}
|
||||||
|
|
||||||
json.NewEncoder(w).Encode(response)
|
json.NewEncoder(w).Encode(response)
|
||||||
@@ -220,8 +221,8 @@ func (h *HTTPServer) handleHealth(w http.ResponseWriter, r *http.Request) {
|
|||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
health := map[string]interface{}{
|
health := map[string]interface{}{
|
||||||
"status": "healthy",
|
"status": "healthy",
|
||||||
"timestamp": time.Now().Unix(),
|
"timestamp": time.Now().Unix(),
|
||||||
"log_entries": h.hypercoreLog.Length(),
|
"log_entries": h.hypercoreLog.Length(),
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -233,10 +234,10 @@ func (h *HTTPServer) handleStatus(w http.ResponseWriter, r *http.Request) {
|
|||||||
w.Header().Set("Content-Type", "application/json")
|
w.Header().Set("Content-Type", "application/json")
|
||||||
|
|
||||||
status := map[string]interface{}{
|
status := map[string]interface{}{
|
||||||
"status": "running",
|
"status": "running",
|
||||||
"timestamp": time.Now().Unix(),
|
"timestamp": time.Now().Unix(),
|
||||||
"hypercore": h.hypercoreLog.GetStats(),
|
"hypercore": h.hypercoreLog.GetStats(),
|
||||||
"api_version": "1.0.0",
|
"api_version": "1.0.0",
|
||||||
}
|
}
|
||||||
|
|
||||||
json.NewEncoder(w).Encode(status)
|
json.NewEncoder(w).Encode(status)
|
||||||
|
|||||||
38
docker/bootstrap.json
Normal file
38
docker/bootstrap.json
Normal file
@@ -0,0 +1,38 @@
|
|||||||
|
{
|
||||||
|
"metadata": {
|
||||||
|
"generated_at": "2024-12-19T10:00:00Z",
|
||||||
|
"cluster_id": "production-cluster",
|
||||||
|
"version": "1.0.0",
|
||||||
|
"notes": "Bootstrap configuration for CHORUS scaling - managed by WHOOSH"
|
||||||
|
},
|
||||||
|
"peers": [
|
||||||
|
{
|
||||||
|
"address": "/ip4/10.0.1.10/tcp/9000/p2p/12D3KooWExample1234567890abcdef",
|
||||||
|
"priority": 100,
|
||||||
|
"region": "us-east-1",
|
||||||
|
"roles": ["admin", "stable"],
|
||||||
|
"enabled": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"address": "/ip4/10.0.1.11/tcp/9000/p2p/12D3KooWExample1234567890abcde2",
|
||||||
|
"priority": 90,
|
||||||
|
"region": "us-east-1",
|
||||||
|
"roles": ["worker", "stable"],
|
||||||
|
"enabled": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"address": "/ip4/10.0.2.10/tcp/9000/p2p/12D3KooWExample1234567890abcde3",
|
||||||
|
"priority": 80,
|
||||||
|
"region": "us-west-2",
|
||||||
|
"roles": ["worker", "stable"],
|
||||||
|
"enabled": true
|
||||||
|
},
|
||||||
|
{
|
||||||
|
"address": "/ip4/10.0.3.10/tcp/9000/p2p/12D3KooWExample1234567890abcde4",
|
||||||
|
"priority": 70,
|
||||||
|
"region": "eu-central-1",
|
||||||
|
"roles": ["worker"],
|
||||||
|
"enabled": false
|
||||||
|
}
|
||||||
|
]
|
||||||
|
}
|
||||||
@@ -15,7 +15,7 @@ services:
|
|||||||
- CHORUS_AGENT_ID=${CHORUS_AGENT_ID:-} # Auto-generated if not provided
|
- CHORUS_AGENT_ID=${CHORUS_AGENT_ID:-} # Auto-generated if not provided
|
||||||
- CHORUS_SPECIALIZATION=${CHORUS_SPECIALIZATION:-general_developer}
|
- CHORUS_SPECIALIZATION=${CHORUS_SPECIALIZATION:-general_developer}
|
||||||
- CHORUS_MAX_TASKS=${CHORUS_MAX_TASKS:-3}
|
- CHORUS_MAX_TASKS=${CHORUS_MAX_TASKS:-3}
|
||||||
- CHORUS_CAPABILITIES=${CHORUS_CAPABILITIES:-general_development,task_coordination,admin_election}
|
- CHORUS_CAPABILITIES=general_development,task_coordination,admin_election
|
||||||
|
|
||||||
# Network configuration
|
# Network configuration
|
||||||
- CHORUS_API_PORT=8080
|
- CHORUS_API_PORT=8080
|
||||||
@@ -23,6 +23,25 @@ services:
|
|||||||
- CHORUS_P2P_PORT=9000
|
- CHORUS_P2P_PORT=9000
|
||||||
- CHORUS_BIND_ADDRESS=0.0.0.0
|
- CHORUS_BIND_ADDRESS=0.0.0.0
|
||||||
|
|
||||||
|
# Scaling optimizations (as per WHOOSH issue #7)
|
||||||
|
- CHORUS_MDNS_ENABLED=false # Disabled for container/swarm environments
|
||||||
|
- CHORUS_DIALS_PER_SEC=5 # Rate limit outbound connections to prevent storms
|
||||||
|
- CHORUS_MAX_CONCURRENT_DHT=16 # Limit concurrent DHT queries
|
||||||
|
|
||||||
|
# Election stability windows (Medium-risk fix 2.1)
|
||||||
|
- CHORUS_ELECTION_MIN_TERM=30s # Minimum time between elections to prevent churn
|
||||||
|
- CHORUS_LEADER_MIN_TERM=45s # Minimum time before challenging healthy leader
|
||||||
|
|
||||||
|
# Assignment system for runtime configuration (Medium-risk fix 2.2)
|
||||||
|
- ASSIGN_URL=${ASSIGN_URL:-} # Optional: WHOOSH assignment endpoint
|
||||||
|
- TASK_SLOT=${TASK_SLOT:-} # Optional: Task slot identifier
|
||||||
|
- TASK_ID=${TASK_ID:-} # Optional: Task identifier
|
||||||
|
- NODE_ID=${NODE_ID:-} # Optional: Node identifier
|
||||||
|
|
||||||
|
# Bootstrap pool configuration (supports JSON and CSV)
|
||||||
|
- BOOTSTRAP_JSON=/config/bootstrap.json # Optional: JSON bootstrap config
|
||||||
|
- CHORUS_BOOTSTRAP_PEERS=${CHORUS_BOOTSTRAP_PEERS:-} # CSV fallback
|
||||||
|
|
||||||
# AI configuration - Provider selection
|
# AI configuration - Provider selection
|
||||||
- CHORUS_AI_PROVIDER=${CHORUS_AI_PROVIDER:-resetdata}
|
- CHORUS_AI_PROVIDER=${CHORUS_AI_PROVIDER:-resetdata}
|
||||||
|
|
||||||
@@ -58,6 +77,11 @@ services:
|
|||||||
- chorus_license_id
|
- chorus_license_id
|
||||||
- resetdata_api_key
|
- resetdata_api_key
|
||||||
|
|
||||||
|
# Configuration files
|
||||||
|
configs:
|
||||||
|
- source: chorus_bootstrap
|
||||||
|
target: /config/bootstrap.json
|
||||||
|
|
||||||
# Persistent data storage
|
# Persistent data storage
|
||||||
volumes:
|
volumes:
|
||||||
- chorus_data:/app/data
|
- chorus_data:/app/data
|
||||||
@@ -91,7 +115,6 @@ services:
|
|||||||
memory: 128M
|
memory: 128M
|
||||||
placement:
|
placement:
|
||||||
constraints:
|
constraints:
|
||||||
- node.hostname != rosewood
|
|
||||||
- node.hostname != acacia
|
- node.hostname != acacia
|
||||||
preferences:
|
preferences:
|
||||||
- spread: node.hostname
|
- spread: node.hostname
|
||||||
@@ -169,7 +192,14 @@ services:
|
|||||||
# Scaling system configuration
|
# Scaling system configuration
|
||||||
WHOOSH_SCALING_KACHING_URL: "https://kaching.chorus.services"
|
WHOOSH_SCALING_KACHING_URL: "https://kaching.chorus.services"
|
||||||
WHOOSH_SCALING_BACKBEAT_URL: "http://backbeat-pulse:8080"
|
WHOOSH_SCALING_BACKBEAT_URL: "http://backbeat-pulse:8080"
|
||||||
WHOOSH_SCALING_CHORUS_URL: "http://chorus:8080"
|
WHOOSH_SCALING_CHORUS_URL: "http://chorus:9000"
|
||||||
|
|
||||||
|
# BACKBEAT integration configuration (temporarily disabled)
|
||||||
|
WHOOSH_BACKBEAT_ENABLED: "false"
|
||||||
|
WHOOSH_BACKBEAT_CLUSTER_ID: "chorus-production"
|
||||||
|
WHOOSH_BACKBEAT_AGENT_ID: "whoosh"
|
||||||
|
WHOOSH_BACKBEAT_NATS_URL: "nats://backbeat-nats:4222"
|
||||||
|
|
||||||
secrets:
|
secrets:
|
||||||
- whoosh_db_password
|
- whoosh_db_password
|
||||||
- gitea_token
|
- gitea_token
|
||||||
@@ -222,7 +252,6 @@ services:
|
|||||||
- traefik.http.middlewares.whoosh-auth.basicauth.users=admin:$2y$10$example_hash
|
- traefik.http.middlewares.whoosh-auth.basicauth.users=admin:$2y$10$example_hash
|
||||||
networks:
|
networks:
|
||||||
- tengig
|
- tengig
|
||||||
- whoosh-backend
|
|
||||||
- chorus_net
|
- chorus_net
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test: ["CMD", "/app/whoosh", "--health-check"]
|
test: ["CMD", "/app/whoosh", "--health-check"]
|
||||||
@@ -260,14 +289,13 @@ services:
|
|||||||
memory: 256M
|
memory: 256M
|
||||||
cpus: '0.5'
|
cpus: '0.5'
|
||||||
networks:
|
networks:
|
||||||
- whoosh-backend
|
|
||||||
- chorus_net
|
- chorus_net
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test: ["CMD-SHELL", "pg_isready -U whoosh"]
|
test: ["CMD-SHELL", "pg_isready -h localhost -p 5432 -U whoosh -d whoosh"]
|
||||||
interval: 30s
|
interval: 30s
|
||||||
timeout: 10s
|
timeout: 10s
|
||||||
retries: 5
|
retries: 5
|
||||||
start_period: 30s
|
start_period: 40s
|
||||||
|
|
||||||
|
|
||||||
redis:
|
redis:
|
||||||
@@ -295,7 +323,6 @@ services:
|
|||||||
memory: 64M
|
memory: 64M
|
||||||
cpus: '0.1'
|
cpus: '0.1'
|
||||||
networks:
|
networks:
|
||||||
- whoosh-backend
|
|
||||||
- chorus_net
|
- chorus_net
|
||||||
healthcheck:
|
healthcheck:
|
||||||
test: ["CMD", "sh", "-c", "redis-cli --no-auth-warning -a $$(cat /run/secrets/redis_password) ping"]
|
test: ["CMD", "sh", "-c", "redis-cli --no-auth-warning -a $$(cat /run/secrets/redis_password) ping"]
|
||||||
@@ -327,9 +354,6 @@ services:
|
|||||||
- "9099:9090" # Expose Prometheus UI
|
- "9099:9090" # Expose Prometheus UI
|
||||||
deploy:
|
deploy:
|
||||||
replicas: 1
|
replicas: 1
|
||||||
placement:
|
|
||||||
constraints:
|
|
||||||
- node.hostname != rosewood
|
|
||||||
labels:
|
labels:
|
||||||
- traefik.enable=true
|
- traefik.enable=true
|
||||||
- traefik.http.routers.prometheus.rule=Host(`prometheus.chorus.services`)
|
- traefik.http.routers.prometheus.rule=Host(`prometheus.chorus.services`)
|
||||||
@@ -359,9 +383,6 @@ services:
|
|||||||
- "3300:3000" # Expose Grafana UI
|
- "3300:3000" # Expose Grafana UI
|
||||||
deploy:
|
deploy:
|
||||||
replicas: 1
|
replicas: 1
|
||||||
placement:
|
|
||||||
constraints:
|
|
||||||
- node.hostname != rosewood
|
|
||||||
labels:
|
labels:
|
||||||
- traefik.enable=true
|
- traefik.enable=true
|
||||||
- traefik.http.routers.grafana.rule=Host(`grafana.chorus.services`)
|
- traefik.http.routers.grafana.rule=Host(`grafana.chorus.services`)
|
||||||
@@ -424,8 +445,6 @@ services:
|
|||||||
placement:
|
placement:
|
||||||
preferences:
|
preferences:
|
||||||
- spread: node.hostname
|
- spread: node.hostname
|
||||||
constraints:
|
|
||||||
- node.hostname != rosewood # Avoid intermittent gaming PC
|
|
||||||
resources:
|
resources:
|
||||||
limits:
|
limits:
|
||||||
memory: 256M
|
memory: 256M
|
||||||
@@ -493,8 +512,6 @@ services:
|
|||||||
placement:
|
placement:
|
||||||
preferences:
|
preferences:
|
||||||
- spread: node.hostname
|
- spread: node.hostname
|
||||||
constraints:
|
|
||||||
- node.hostname != rosewood
|
|
||||||
resources:
|
resources:
|
||||||
limits:
|
limits:
|
||||||
memory: 512M # Larger for window aggregation
|
memory: 512M # Larger for window aggregation
|
||||||
@@ -527,7 +544,6 @@ services:
|
|||||||
backbeat-nats:
|
backbeat-nats:
|
||||||
image: nats:2.9-alpine
|
image: nats:2.9-alpine
|
||||||
command: ["--jetstream"]
|
command: ["--jetstream"]
|
||||||
|
|
||||||
deploy:
|
deploy:
|
||||||
replicas: 1
|
replicas: 1
|
||||||
restart_policy:
|
restart_policy:
|
||||||
@@ -538,8 +554,6 @@ services:
|
|||||||
placement:
|
placement:
|
||||||
preferences:
|
preferences:
|
||||||
- spread: node.hostname
|
- spread: node.hostname
|
||||||
constraints:
|
|
||||||
- node.hostname != rosewood
|
|
||||||
resources:
|
resources:
|
||||||
limits:
|
limits:
|
||||||
memory: 256M
|
memory: 256M
|
||||||
@@ -547,10 +561,8 @@ services:
|
|||||||
reservations:
|
reservations:
|
||||||
memory: 128M
|
memory: 128M
|
||||||
cpus: '0.25'
|
cpus: '0.25'
|
||||||
|
|
||||||
networks:
|
networks:
|
||||||
- chorus_net
|
- chorus_net
|
||||||
|
|
||||||
# Container logging
|
# Container logging
|
||||||
logging:
|
logging:
|
||||||
driver: "json-file"
|
driver: "json-file"
|
||||||
@@ -603,18 +615,14 @@ networks:
|
|||||||
tengig:
|
tengig:
|
||||||
external: true
|
external: true
|
||||||
|
|
||||||
whoosh-backend:
|
|
||||||
driver: overlay
|
|
||||||
attachable: false
|
|
||||||
|
|
||||||
chorus_net:
|
chorus_net:
|
||||||
driver: overlay
|
driver: overlay
|
||||||
attachable: true
|
attachable: true
|
||||||
ipam:
|
|
||||||
config:
|
|
||||||
- subnet: 10.201.0.0/24
|
|
||||||
|
|
||||||
|
|
||||||
|
configs:
|
||||||
|
chorus_bootstrap:
|
||||||
|
file: ./bootstrap.json
|
||||||
|
|
||||||
secrets:
|
secrets:
|
||||||
chorus_license_id:
|
chorus_license_id:
|
||||||
|
|||||||
@@ -105,6 +105,7 @@ func (t *SimpleTaskTracker) publishTaskCompletion(taskID string, success bool, s
|
|||||||
// SharedRuntime contains all the shared P2P infrastructure components
|
// SharedRuntime contains all the shared P2P infrastructure components
|
||||||
type SharedRuntime struct {
|
type SharedRuntime struct {
|
||||||
Config *config.Config
|
Config *config.Config
|
||||||
|
RuntimeConfig *config.RuntimeConfig
|
||||||
Logger *SimpleLogger
|
Logger *SimpleLogger
|
||||||
Context context.Context
|
Context context.Context
|
||||||
Cancel context.CancelFunc
|
Cancel context.CancelFunc
|
||||||
@@ -149,6 +150,28 @@ func Initialize(appMode string) (*SharedRuntime, error) {
|
|||||||
runtime.Config = cfg
|
runtime.Config = cfg
|
||||||
|
|
||||||
runtime.Logger.Info("✅ Configuration loaded successfully")
|
runtime.Logger.Info("✅ Configuration loaded successfully")
|
||||||
|
|
||||||
|
// Initialize runtime configuration with assignment support
|
||||||
|
runtime.RuntimeConfig = config.NewRuntimeConfig(cfg)
|
||||||
|
|
||||||
|
// Load assignment if ASSIGN_URL is configured
|
||||||
|
if assignURL := os.Getenv("ASSIGN_URL"); assignURL != "" {
|
||||||
|
runtime.Logger.Info("📡 Loading assignment from WHOOSH: %s", assignURL)
|
||||||
|
|
||||||
|
ctx, cancel := context.WithTimeout(runtime.Context, 10*time.Second)
|
||||||
|
if err := runtime.RuntimeConfig.LoadAssignment(ctx, assignURL); err != nil {
|
||||||
|
runtime.Logger.Warn("⚠️ Failed to load assignment (continuing with base config): %v", err)
|
||||||
|
} else {
|
||||||
|
runtime.Logger.Info("✅ Assignment loaded successfully")
|
||||||
|
}
|
||||||
|
cancel()
|
||||||
|
|
||||||
|
// Start reload handler for SIGHUP
|
||||||
|
runtime.RuntimeConfig.StartReloadHandler(runtime.Context, assignURL)
|
||||||
|
runtime.Logger.Info("📡 SIGHUP reload handler started for assignment updates")
|
||||||
|
} else {
|
||||||
|
runtime.Logger.Info("⚪ No ASSIGN_URL configured, using static configuration")
|
||||||
|
}
|
||||||
runtime.Logger.Info("🤖 Agent ID: %s", cfg.Agent.ID)
|
runtime.Logger.Info("🤖 Agent ID: %s", cfg.Agent.ID)
|
||||||
runtime.Logger.Info("🎯 Specialization: %s", cfg.Agent.Specialization)
|
runtime.Logger.Info("🎯 Specialization: %s", cfg.Agent.Specialization)
|
||||||
|
|
||||||
@@ -225,12 +248,17 @@ func Initialize(appMode string) (*SharedRuntime, error) {
|
|||||||
runtime.HypercoreLog = hlog
|
runtime.HypercoreLog = hlog
|
||||||
runtime.Logger.Info("📝 Hypercore logger initialized")
|
runtime.Logger.Info("📝 Hypercore logger initialized")
|
||||||
|
|
||||||
// Initialize mDNS discovery
|
// Initialize mDNS discovery (disabled in container environments for scaling)
|
||||||
mdnsDiscovery, err := discovery.NewMDNSDiscovery(ctx, node.Host(), "chorus-peer-discovery")
|
if cfg.V2.DHT.MDNSEnabled {
|
||||||
if err != nil {
|
mdnsDiscovery, err := discovery.NewMDNSDiscovery(ctx, node.Host(), "chorus-peer-discovery")
|
||||||
return nil, fmt.Errorf("failed to create mDNS discovery: %v", err)
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create mDNS discovery: %v", err)
|
||||||
|
}
|
||||||
|
runtime.MDNSDiscovery = mdnsDiscovery
|
||||||
|
runtime.Logger.Info("🔍 mDNS discovery enabled for local network")
|
||||||
|
} else {
|
||||||
|
runtime.Logger.Info("⚪ mDNS discovery disabled (recommended for container/swarm deployments)")
|
||||||
}
|
}
|
||||||
runtime.MDNSDiscovery = mdnsDiscovery
|
|
||||||
|
|
||||||
// Initialize PubSub with hypercore logging
|
// Initialize PubSub with hypercore logging
|
||||||
ps, err := pubsub.NewPubSubWithLogger(ctx, node.Host(), "chorus/coordination/v1", "hmmm/meta-discussion/v1", hlog)
|
ps, err := pubsub.NewPubSubWithLogger(ctx, node.Host(), "chorus/coordination/v1", "hmmm/meta-discussion/v1", hlog)
|
||||||
@@ -283,6 +311,7 @@ func (r *SharedRuntime) Cleanup() {
|
|||||||
|
|
||||||
if r.MDNSDiscovery != nil {
|
if r.MDNSDiscovery != nil {
|
||||||
r.MDNSDiscovery.Close()
|
r.MDNSDiscovery.Close()
|
||||||
|
r.Logger.Info("🔍 mDNS discovery closed")
|
||||||
}
|
}
|
||||||
|
|
||||||
if r.PubSub != nil {
|
if r.PubSub != nil {
|
||||||
@@ -407,8 +436,20 @@ func (r *SharedRuntime) initializeDHTStorage() error {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// Connect to bootstrap peers if configured
|
// Connect to bootstrap peers (with assignment override support)
|
||||||
for _, addrStr := range r.Config.V2.DHT.BootstrapPeers {
|
bootstrapPeers := r.RuntimeConfig.GetBootstrapPeers()
|
||||||
|
if len(bootstrapPeers) == 0 {
|
||||||
|
bootstrapPeers = r.Config.V2.DHT.BootstrapPeers
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply join stagger if configured
|
||||||
|
joinStagger := r.RuntimeConfig.GetJoinStagger()
|
||||||
|
if joinStagger > 0 {
|
||||||
|
r.Logger.Info("⏱️ Applying join stagger delay: %v", joinStagger)
|
||||||
|
time.Sleep(joinStagger)
|
||||||
|
}
|
||||||
|
|
||||||
|
for _, addrStr := range bootstrapPeers {
|
||||||
addr, err := multiaddr.NewMultiaddr(addrStr)
|
addr, err := multiaddr.NewMultiaddr(addrStr)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
r.Logger.Warn("⚠️ Invalid bootstrap address %s: %v", addrStr, err)
|
r.Logger.Warn("⚠️ Invalid bootstrap address %s: %v", addrStr, err)
|
||||||
|
|||||||
23
p2p/node.go
23
p2p/node.go
@@ -6,16 +6,18 @@ import (
|
|||||||
"time"
|
"time"
|
||||||
|
|
||||||
"chorus/pkg/dht"
|
"chorus/pkg/dht"
|
||||||
|
|
||||||
"github.com/libp2p/go-libp2p"
|
"github.com/libp2p/go-libp2p"
|
||||||
|
kaddht "github.com/libp2p/go-libp2p-kad-dht"
|
||||||
"github.com/libp2p/go-libp2p/core/host"
|
"github.com/libp2p/go-libp2p/core/host"
|
||||||
"github.com/libp2p/go-libp2p/core/peer"
|
"github.com/libp2p/go-libp2p/core/peer"
|
||||||
|
"github.com/libp2p/go-libp2p/p2p/net/connmgr"
|
||||||
"github.com/libp2p/go-libp2p/p2p/security/noise"
|
"github.com/libp2p/go-libp2p/p2p/security/noise"
|
||||||
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
|
"github.com/libp2p/go-libp2p/p2p/transport/tcp"
|
||||||
kaddht "github.com/libp2p/go-libp2p-kad-dht"
|
|
||||||
"github.com/multiformats/go-multiaddr"
|
"github.com/multiformats/go-multiaddr"
|
||||||
)
|
)
|
||||||
|
|
||||||
// Node represents a Bzzz P2P node
|
// Node represents a CHORUS P2P node
|
||||||
type Node struct {
|
type Node struct {
|
||||||
host host.Host
|
host host.Host
|
||||||
ctx context.Context
|
ctx context.Context
|
||||||
@@ -44,13 +46,26 @@ func NewNode(ctx context.Context, opts ...Option) (*Node, error) {
|
|||||||
listenAddrs = append(listenAddrs, ma)
|
listenAddrs = append(listenAddrs, ma)
|
||||||
}
|
}
|
||||||
|
|
||||||
// Create libp2p host with security and transport options
|
// Create connection manager with scaling-optimized limits
|
||||||
|
connManager, err := connmgr.NewConnManager(
|
||||||
|
config.LowWatermark, // Low watermark (32)
|
||||||
|
config.HighWatermark, // High watermark (128)
|
||||||
|
connmgr.WithGracePeriod(30*time.Second), // Grace period before pruning
|
||||||
|
)
|
||||||
|
if err != nil {
|
||||||
|
cancel()
|
||||||
|
return nil, fmt.Errorf("failed to create connection manager: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create libp2p host with security, transport, and scaling options
|
||||||
h, err := libp2p.New(
|
h, err := libp2p.New(
|
||||||
libp2p.ListenAddrs(listenAddrs...),
|
libp2p.ListenAddrs(listenAddrs...),
|
||||||
libp2p.Security(noise.ID, noise.New),
|
libp2p.Security(noise.ID, noise.New),
|
||||||
libp2p.Transport(tcp.NewTCPTransport),
|
libp2p.Transport(tcp.NewTCPTransport),
|
||||||
libp2p.DefaultMuxers,
|
libp2p.DefaultMuxers,
|
||||||
libp2p.EnableRelay(),
|
libp2p.EnableRelay(),
|
||||||
|
libp2p.ConnectionManager(connManager), // Add connection management
|
||||||
|
libp2p.EnableAutoRelay(), // Enable AutoRelay for container environments
|
||||||
)
|
)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
cancel()
|
cancel()
|
||||||
@@ -157,7 +172,7 @@ func (n *Node) startBackgroundTasks() {
|
|||||||
// logConnectionStatus logs the current connection status
|
// logConnectionStatus logs the current connection status
|
||||||
func (n *Node) logConnectionStatus() {
|
func (n *Node) logConnectionStatus() {
|
||||||
peers := n.Peers()
|
peers := n.Peers()
|
||||||
fmt.Printf("🐝 Bzzz Node Status - ID: %s, Connected Peers: %d\n",
|
fmt.Printf("CHORUS Node Status - ID: %s, Connected Peers: %d\n",
|
||||||
n.ID().ShortString(), len(peers))
|
n.ID().ShortString(), len(peers))
|
||||||
|
|
||||||
if len(peers) > 0 {
|
if len(peers) > 0 {
|
||||||
|
|||||||
517
pkg/config/assignment.go
Normal file
517
pkg/config/assignment.go
Normal file
@@ -0,0 +1,517 @@
|
|||||||
|
package config
|
||||||
|
|
||||||
|
import (
|
||||||
|
"context"
|
||||||
|
"encoding/json"
|
||||||
|
"fmt"
|
||||||
|
"io"
|
||||||
|
"net/http"
|
||||||
|
"os"
|
||||||
|
"os/signal"
|
||||||
|
"strings"
|
||||||
|
"sync"
|
||||||
|
"syscall"
|
||||||
|
"time"
|
||||||
|
)
|
||||||
|
|
||||||
|
// RuntimeConfig manages runtime configuration with assignment overrides
|
||||||
|
type RuntimeConfig struct {
|
||||||
|
Base *Config `json:"base"`
|
||||||
|
Override *AssignmentConfig `json:"override"`
|
||||||
|
mu sync.RWMutex
|
||||||
|
reloadCh chan struct{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// AssignmentConfig represents runtime assignment from WHOOSH
|
||||||
|
type AssignmentConfig struct {
|
||||||
|
// Assignment metadata
|
||||||
|
AssignmentID string `json:"assignment_id"`
|
||||||
|
TaskSlot string `json:"task_slot"`
|
||||||
|
TaskID string `json:"task_id"`
|
||||||
|
ClusterID string `json:"cluster_id"`
|
||||||
|
AssignedAt time.Time `json:"assigned_at"`
|
||||||
|
ExpiresAt time.Time `json:"expires_at,omitempty"`
|
||||||
|
|
||||||
|
// Agent configuration overrides
|
||||||
|
Agent *AgentConfig `json:"agent,omitempty"`
|
||||||
|
Network *NetworkConfig `json:"network,omitempty"`
|
||||||
|
AI *AIConfig `json:"ai,omitempty"`
|
||||||
|
Logging *LoggingConfig `json:"logging,omitempty"`
|
||||||
|
|
||||||
|
// Bootstrap configuration for scaling
|
||||||
|
BootstrapPeers []string `json:"bootstrap_peers,omitempty"`
|
||||||
|
JoinStagger int `json:"join_stagger_ms,omitempty"`
|
||||||
|
|
||||||
|
// Runtime capabilities
|
||||||
|
RuntimeCapabilities []string `json:"runtime_capabilities,omitempty"`
|
||||||
|
|
||||||
|
// Key derivation for encryption
|
||||||
|
RoleKey string `json:"role_key,omitempty"`
|
||||||
|
ClusterSecret string `json:"cluster_secret,omitempty"`
|
||||||
|
|
||||||
|
// Custom fields
|
||||||
|
Custom map[string]interface{} `json:"custom,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// AssignmentRequest represents a request for assignment from WHOOSH
|
||||||
|
type AssignmentRequest struct {
|
||||||
|
ClusterID string `json:"cluster_id"`
|
||||||
|
TaskSlot string `json:"task_slot,omitempty"`
|
||||||
|
TaskID string `json:"task_id,omitempty"`
|
||||||
|
AgentID string `json:"agent_id"`
|
||||||
|
NodeID string `json:"node_id"`
|
||||||
|
Timestamp time.Time `json:"timestamp"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// NewRuntimeConfig creates a new runtime configuration manager
|
||||||
|
func NewRuntimeConfig(baseConfig *Config) *RuntimeConfig {
|
||||||
|
return &RuntimeConfig{
|
||||||
|
Base: baseConfig,
|
||||||
|
Override: nil,
|
||||||
|
reloadCh: make(chan struct{}, 1),
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Get returns the effective configuration value, with override taking precedence
|
||||||
|
func (rc *RuntimeConfig) Get(field string) interface{} {
|
||||||
|
rc.mu.RLock()
|
||||||
|
defer rc.mu.RUnlock()
|
||||||
|
|
||||||
|
// Try override first
|
||||||
|
if rc.Override != nil {
|
||||||
|
if value := rc.getFromAssignment(field); value != nil {
|
||||||
|
return value
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fall back to base configuration
|
||||||
|
return rc.getFromBase(field)
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetConfig returns a merged configuration with overrides applied
|
||||||
|
func (rc *RuntimeConfig) GetConfig() *Config {
|
||||||
|
rc.mu.RLock()
|
||||||
|
defer rc.mu.RUnlock()
|
||||||
|
|
||||||
|
if rc.Override == nil {
|
||||||
|
return rc.Base
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create a copy of base config
|
||||||
|
merged := *rc.Base
|
||||||
|
|
||||||
|
// Apply overrides
|
||||||
|
if rc.Override.Agent != nil {
|
||||||
|
rc.mergeAgentConfig(&merged.Agent, rc.Override.Agent)
|
||||||
|
}
|
||||||
|
if rc.Override.Network != nil {
|
||||||
|
rc.mergeNetworkConfig(&merged.Network, rc.Override.Network)
|
||||||
|
}
|
||||||
|
if rc.Override.AI != nil {
|
||||||
|
rc.mergeAIConfig(&merged.AI, rc.Override.AI)
|
||||||
|
}
|
||||||
|
if rc.Override.Logging != nil {
|
||||||
|
rc.mergeLoggingConfig(&merged.Logging, rc.Override.Logging)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &merged
|
||||||
|
}
|
||||||
|
|
||||||
|
// LoadAssignment fetches assignment from WHOOSH and applies it
|
||||||
|
func (rc *RuntimeConfig) LoadAssignment(ctx context.Context, assignURL string) error {
|
||||||
|
if assignURL == "" {
|
||||||
|
return nil // No assignment URL configured
|
||||||
|
}
|
||||||
|
|
||||||
|
// Build assignment request
|
||||||
|
agentID := rc.Base.Agent.ID
|
||||||
|
if agentID == "" {
|
||||||
|
agentID = "unknown"
|
||||||
|
}
|
||||||
|
|
||||||
|
req := AssignmentRequest{
|
||||||
|
ClusterID: rc.Base.License.ClusterID,
|
||||||
|
TaskSlot: os.Getenv("TASK_SLOT"),
|
||||||
|
TaskID: os.Getenv("TASK_ID"),
|
||||||
|
AgentID: agentID,
|
||||||
|
NodeID: os.Getenv("NODE_ID"),
|
||||||
|
Timestamp: time.Now(),
|
||||||
|
}
|
||||||
|
|
||||||
|
// Make HTTP request to WHOOSH
|
||||||
|
assignment, err := rc.fetchAssignment(ctx, assignURL, req)
|
||||||
|
if err != nil {
|
||||||
|
return fmt.Errorf("failed to fetch assignment: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Apply assignment
|
||||||
|
rc.mu.Lock()
|
||||||
|
rc.Override = assignment
|
||||||
|
rc.mu.Unlock()
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// StartReloadHandler starts a signal handler for SIGHUP configuration reloads
|
||||||
|
func (rc *RuntimeConfig) StartReloadHandler(ctx context.Context, assignURL string) {
|
||||||
|
sigCh := make(chan os.Signal, 1)
|
||||||
|
signal.Notify(sigCh, syscall.SIGHUP)
|
||||||
|
|
||||||
|
go func() {
|
||||||
|
for {
|
||||||
|
select {
|
||||||
|
case <-ctx.Done():
|
||||||
|
return
|
||||||
|
case <-sigCh:
|
||||||
|
fmt.Println("📡 Received SIGHUP, reloading assignment configuration...")
|
||||||
|
if err := rc.LoadAssignment(ctx, assignURL); err != nil {
|
||||||
|
fmt.Printf("❌ Failed to reload assignment: %v\n", err)
|
||||||
|
} else {
|
||||||
|
fmt.Println("✅ Assignment configuration reloaded successfully")
|
||||||
|
}
|
||||||
|
case <-rc.reloadCh:
|
||||||
|
// Manual reload trigger
|
||||||
|
if err := rc.LoadAssignment(ctx, assignURL); err != nil {
|
||||||
|
fmt.Printf("❌ Failed to reload assignment: %v\n", err)
|
||||||
|
} else {
|
||||||
|
fmt.Println("✅ Assignment configuration reloaded successfully")
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}()
|
||||||
|
}
|
||||||
|
|
||||||
|
// Reload triggers a manual configuration reload
|
||||||
|
func (rc *RuntimeConfig) Reload() {
|
||||||
|
select {
|
||||||
|
case rc.reloadCh <- struct{}{}:
|
||||||
|
default:
|
||||||
|
// Channel full, reload already pending
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// fetchAssignment makes HTTP request to WHOOSH assignment API
|
||||||
|
func (rc *RuntimeConfig) fetchAssignment(ctx context.Context, assignURL string, req AssignmentRequest) (*AssignmentConfig, error) {
|
||||||
|
// Build query parameters
|
||||||
|
queryParams := fmt.Sprintf("?cluster_id=%s&agent_id=%s&node_id=%s",
|
||||||
|
req.ClusterID, req.AgentID, req.NodeID)
|
||||||
|
|
||||||
|
if req.TaskSlot != "" {
|
||||||
|
queryParams += "&task_slot=" + req.TaskSlot
|
||||||
|
}
|
||||||
|
if req.TaskID != "" {
|
||||||
|
queryParams += "&task_id=" + req.TaskID
|
||||||
|
}
|
||||||
|
|
||||||
|
// Create HTTP request
|
||||||
|
httpReq, err := http.NewRequestWithContext(ctx, "GET", assignURL+queryParams, nil)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to create assignment request: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
httpReq.Header.Set("Accept", "application/json")
|
||||||
|
httpReq.Header.Set("User-Agent", "CHORUS-Agent/0.1.0")
|
||||||
|
|
||||||
|
// Make request with timeout
|
||||||
|
client := &http.Client{Timeout: 10 * time.Second}
|
||||||
|
resp, err := client.Do(httpReq)
|
||||||
|
if err != nil {
|
||||||
|
return nil, fmt.Errorf("assignment request failed: %w", err)
|
||||||
|
}
|
||||||
|
defer resp.Body.Close()
|
||||||
|
|
||||||
|
if resp.StatusCode == http.StatusNotFound {
|
||||||
|
// No assignment available
|
||||||
|
return nil, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
if resp.StatusCode != http.StatusOK {
|
||||||
|
body, _ := io.ReadAll(resp.Body)
|
||||||
|
return nil, fmt.Errorf("assignment request failed with status %d: %s", resp.StatusCode, string(body))
|
||||||
|
}
|
||||||
|
|
||||||
|
// Parse assignment response
|
||||||
|
var assignment AssignmentConfig
|
||||||
|
if err := json.NewDecoder(resp.Body).Decode(&assignment); err != nil {
|
||||||
|
return nil, fmt.Errorf("failed to decode assignment response: %w", err)
|
||||||
|
}
|
||||||
|
|
||||||
|
return &assignment, nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper methods for getting values from different sources
|
||||||
|
func (rc *RuntimeConfig) getFromAssignment(field string) interface{} {
|
||||||
|
if rc.Override == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Simple field mapping - in a real implementation, you'd use reflection
|
||||||
|
// or a more sophisticated field mapping system
|
||||||
|
switch field {
|
||||||
|
case "agent.id":
|
||||||
|
if rc.Override.Agent != nil && rc.Override.Agent.ID != "" {
|
||||||
|
return rc.Override.Agent.ID
|
||||||
|
}
|
||||||
|
case "agent.role":
|
||||||
|
if rc.Override.Agent != nil && rc.Override.Agent.Role != "" {
|
||||||
|
return rc.Override.Agent.Role
|
||||||
|
}
|
||||||
|
case "agent.capabilities":
|
||||||
|
if len(rc.Override.RuntimeCapabilities) > 0 {
|
||||||
|
return rc.Override.RuntimeCapabilities
|
||||||
|
}
|
||||||
|
case "bootstrap_peers":
|
||||||
|
if len(rc.Override.BootstrapPeers) > 0 {
|
||||||
|
return rc.Override.BootstrapPeers
|
||||||
|
}
|
||||||
|
case "join_stagger":
|
||||||
|
if rc.Override.JoinStagger > 0 {
|
||||||
|
return rc.Override.JoinStagger
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check custom fields
|
||||||
|
if rc.Override.Custom != nil {
|
||||||
|
if val, exists := rc.Override.Custom[field]; exists {
|
||||||
|
return val
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *RuntimeConfig) getFromBase(field string) interface{} {
|
||||||
|
// Simple field mapping for base config
|
||||||
|
switch field {
|
||||||
|
case "agent.id":
|
||||||
|
return rc.Base.Agent.ID
|
||||||
|
case "agent.role":
|
||||||
|
return rc.Base.Agent.Role
|
||||||
|
case "agent.capabilities":
|
||||||
|
return rc.Base.Agent.Capabilities
|
||||||
|
default:
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Helper methods for merging configuration sections
|
||||||
|
func (rc *RuntimeConfig) mergeAgentConfig(base *AgentConfig, override *AgentConfig) {
|
||||||
|
if override.ID != "" {
|
||||||
|
base.ID = override.ID
|
||||||
|
}
|
||||||
|
if override.Specialization != "" {
|
||||||
|
base.Specialization = override.Specialization
|
||||||
|
}
|
||||||
|
if override.MaxTasks > 0 {
|
||||||
|
base.MaxTasks = override.MaxTasks
|
||||||
|
}
|
||||||
|
if len(override.Capabilities) > 0 {
|
||||||
|
base.Capabilities = override.Capabilities
|
||||||
|
}
|
||||||
|
if len(override.Models) > 0 {
|
||||||
|
base.Models = override.Models
|
||||||
|
}
|
||||||
|
if override.Role != "" {
|
||||||
|
base.Role = override.Role
|
||||||
|
}
|
||||||
|
if override.Project != "" {
|
||||||
|
base.Project = override.Project
|
||||||
|
}
|
||||||
|
if len(override.Expertise) > 0 {
|
||||||
|
base.Expertise = override.Expertise
|
||||||
|
}
|
||||||
|
if override.ReportsTo != "" {
|
||||||
|
base.ReportsTo = override.ReportsTo
|
||||||
|
}
|
||||||
|
if len(override.Deliverables) > 0 {
|
||||||
|
base.Deliverables = override.Deliverables
|
||||||
|
}
|
||||||
|
if override.ModelSelectionWebhook != "" {
|
||||||
|
base.ModelSelectionWebhook = override.ModelSelectionWebhook
|
||||||
|
}
|
||||||
|
if override.DefaultReasoningModel != "" {
|
||||||
|
base.DefaultReasoningModel = override.DefaultReasoningModel
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *RuntimeConfig) mergeNetworkConfig(base *NetworkConfig, override *NetworkConfig) {
|
||||||
|
if override.P2PPort > 0 {
|
||||||
|
base.P2PPort = override.P2PPort
|
||||||
|
}
|
||||||
|
if override.APIPort > 0 {
|
||||||
|
base.APIPort = override.APIPort
|
||||||
|
}
|
||||||
|
if override.HealthPort > 0 {
|
||||||
|
base.HealthPort = override.HealthPort
|
||||||
|
}
|
||||||
|
if override.BindAddr != "" {
|
||||||
|
base.BindAddr = override.BindAddr
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *RuntimeConfig) mergeAIConfig(base *AIConfig, override *AIConfig) {
|
||||||
|
if override.Provider != "" {
|
||||||
|
base.Provider = override.Provider
|
||||||
|
}
|
||||||
|
// Merge Ollama config if present
|
||||||
|
if override.Ollama.Endpoint != "" {
|
||||||
|
base.Ollama.Endpoint = override.Ollama.Endpoint
|
||||||
|
}
|
||||||
|
if override.Ollama.Timeout > 0 {
|
||||||
|
base.Ollama.Timeout = override.Ollama.Timeout
|
||||||
|
}
|
||||||
|
// Merge ResetData config if present
|
||||||
|
if override.ResetData.BaseURL != "" {
|
||||||
|
base.ResetData.BaseURL = override.ResetData.BaseURL
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
func (rc *RuntimeConfig) mergeLoggingConfig(base *LoggingConfig, override *LoggingConfig) {
|
||||||
|
if override.Level != "" {
|
||||||
|
base.Level = override.Level
|
||||||
|
}
|
||||||
|
if override.Format != "" {
|
||||||
|
base.Format = override.Format
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// BootstrapConfig represents JSON bootstrap configuration
|
||||||
|
type BootstrapConfig struct {
|
||||||
|
Peers []BootstrapPeer `json:"peers"`
|
||||||
|
Metadata BootstrapMeta `json:"metadata,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// BootstrapPeer represents a single bootstrap peer
|
||||||
|
type BootstrapPeer struct {
|
||||||
|
Address string `json:"address"`
|
||||||
|
Priority int `json:"priority,omitempty"`
|
||||||
|
Region string `json:"region,omitempty"`
|
||||||
|
Roles []string `json:"roles,omitempty"`
|
||||||
|
Enabled bool `json:"enabled"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// BootstrapMeta contains metadata about the bootstrap configuration
|
||||||
|
type BootstrapMeta struct {
|
||||||
|
GeneratedAt time.Time `json:"generated_at,omitempty"`
|
||||||
|
ClusterID string `json:"cluster_id,omitempty"`
|
||||||
|
Version string `json:"version,omitempty"`
|
||||||
|
Notes string `json:"notes,omitempty"`
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetBootstrapPeers returns bootstrap peers with assignment override support and JSON config
|
||||||
|
func (rc *RuntimeConfig) GetBootstrapPeers() []string {
|
||||||
|
rc.mu.RLock()
|
||||||
|
defer rc.mu.RUnlock()
|
||||||
|
|
||||||
|
// First priority: Assignment override from WHOOSH
|
||||||
|
if rc.Override != nil && len(rc.Override.BootstrapPeers) > 0 {
|
||||||
|
return rc.Override.BootstrapPeers
|
||||||
|
}
|
||||||
|
|
||||||
|
// Second priority: JSON bootstrap configuration
|
||||||
|
if jsonPeers := rc.loadBootstrapJSON(); len(jsonPeers) > 0 {
|
||||||
|
return jsonPeers
|
||||||
|
}
|
||||||
|
|
||||||
|
// Third priority: Environment variable (CSV format)
|
||||||
|
if bootstrapEnv := os.Getenv("CHORUS_BOOTSTRAP_PEERS"); bootstrapEnv != "" {
|
||||||
|
peers := strings.Split(bootstrapEnv, ",")
|
||||||
|
// Trim whitespace from each peer
|
||||||
|
for i, peer := range peers {
|
||||||
|
peers[i] = strings.TrimSpace(peer)
|
||||||
|
}
|
||||||
|
return peers
|
||||||
|
}
|
||||||
|
|
||||||
|
return []string{}
|
||||||
|
}
|
||||||
|
|
||||||
|
// loadBootstrapJSON loads bootstrap peers from JSON file
|
||||||
|
func (rc *RuntimeConfig) loadBootstrapJSON() []string {
|
||||||
|
jsonPath := os.Getenv("BOOTSTRAP_JSON")
|
||||||
|
if jsonPath == "" {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check if file exists
|
||||||
|
if _, err := os.Stat(jsonPath); os.IsNotExist(err) {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Read and parse JSON file
|
||||||
|
data, err := os.ReadFile(jsonPath)
|
||||||
|
if err != nil {
|
||||||
|
fmt.Printf("⚠️ Failed to read bootstrap JSON file %s: %v\n", jsonPath, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
var config BootstrapConfig
|
||||||
|
if err := json.Unmarshal(data, &config); err != nil {
|
||||||
|
fmt.Printf("⚠️ Failed to parse bootstrap JSON file %s: %v\n", jsonPath, err)
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract enabled peer addresses, sorted by priority
|
||||||
|
var peers []string
|
||||||
|
enabledPeers := make([]BootstrapPeer, 0, len(config.Peers))
|
||||||
|
|
||||||
|
// Filter enabled peers
|
||||||
|
for _, peer := range config.Peers {
|
||||||
|
if peer.Enabled && peer.Address != "" {
|
||||||
|
enabledPeers = append(enabledPeers, peer)
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Sort by priority (higher priority first)
|
||||||
|
for i := 0; i < len(enabledPeers)-1; i++ {
|
||||||
|
for j := i + 1; j < len(enabledPeers); j++ {
|
||||||
|
if enabledPeers[j].Priority > enabledPeers[i].Priority {
|
||||||
|
enabledPeers[i], enabledPeers[j] = enabledPeers[j], enabledPeers[i]
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Extract addresses
|
||||||
|
for _, peer := range enabledPeers {
|
||||||
|
peers = append(peers, peer.Address)
|
||||||
|
}
|
||||||
|
|
||||||
|
if len(peers) > 0 {
|
||||||
|
fmt.Printf("📋 Loaded %d bootstrap peers from JSON: %s\n", len(peers), jsonPath)
|
||||||
|
}
|
||||||
|
|
||||||
|
return peers
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetJoinStagger returns join stagger delay with assignment override support
|
||||||
|
func (rc *RuntimeConfig) GetJoinStagger() time.Duration {
|
||||||
|
rc.mu.RLock()
|
||||||
|
defer rc.mu.RUnlock()
|
||||||
|
|
||||||
|
if rc.Override != nil && rc.Override.JoinStagger > 0 {
|
||||||
|
return time.Duration(rc.Override.JoinStagger) * time.Millisecond
|
||||||
|
}
|
||||||
|
|
||||||
|
// Fall back to environment variable
|
||||||
|
if staggerEnv := os.Getenv("CHORUS_JOIN_STAGGER_MS"); staggerEnv != "" {
|
||||||
|
if ms, err := time.ParseDuration(staggerEnv + "ms"); err == nil {
|
||||||
|
return ms
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return 0
|
||||||
|
}
|
||||||
|
|
||||||
|
// GetAssignmentInfo returns current assignment metadata
|
||||||
|
func (rc *RuntimeConfig) GetAssignmentInfo() *AssignmentConfig {
|
||||||
|
rc.mu.RLock()
|
||||||
|
defer rc.mu.RUnlock()
|
||||||
|
|
||||||
|
if rc.Override == nil {
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
|
||||||
|
// Return a copy to prevent external modification
|
||||||
|
assignment := *rc.Override
|
||||||
|
return &assignment
|
||||||
|
}
|
||||||
@@ -100,6 +100,7 @@ type V2Config struct {
|
|||||||
type DHTConfig struct {
|
type DHTConfig struct {
|
||||||
Enabled bool `yaml:"enabled"`
|
Enabled bool `yaml:"enabled"`
|
||||||
BootstrapPeers []string `yaml:"bootstrap_peers"`
|
BootstrapPeers []string `yaml:"bootstrap_peers"`
|
||||||
|
MDNSEnabled bool `yaml:"mdns_enabled"`
|
||||||
}
|
}
|
||||||
|
|
||||||
// UCXLConfig defines UCXL protocol settings
|
// UCXLConfig defines UCXL protocol settings
|
||||||
@@ -192,6 +193,7 @@ func LoadFromEnvironment() (*Config, error) {
|
|||||||
DHT: DHTConfig{
|
DHT: DHTConfig{
|
||||||
Enabled: getEnvBoolOrDefault("CHORUS_DHT_ENABLED", true),
|
Enabled: getEnvBoolOrDefault("CHORUS_DHT_ENABLED", true),
|
||||||
BootstrapPeers: getEnvArrayOrDefault("CHORUS_BOOTSTRAP_PEERS", []string{}),
|
BootstrapPeers: getEnvArrayOrDefault("CHORUS_BOOTSTRAP_PEERS", []string{}),
|
||||||
|
MDNSEnabled: getEnvBoolOrDefault("CHORUS_MDNS_ENABLED", true),
|
||||||
},
|
},
|
||||||
},
|
},
|
||||||
UCXL: UCXLConfig{
|
UCXL: UCXLConfig{
|
||||||
|
|||||||
@@ -41,10 +41,16 @@ type HybridUCXLConfig struct {
|
|||||||
}
|
}
|
||||||
|
|
||||||
type DiscoveryConfig struct {
|
type DiscoveryConfig struct {
|
||||||
MDNSEnabled bool `env:"CHORUS_MDNS_ENABLED" default:"true" json:"mdns_enabled" yaml:"mdns_enabled"`
|
MDNSEnabled bool `env:"CHORUS_MDNS_ENABLED" default:"true" json:"mdns_enabled" yaml:"mdns_enabled"`
|
||||||
DHTDiscovery bool `env:"CHORUS_DHT_DISCOVERY" default:"false" json:"dht_discovery" yaml:"dht_discovery"`
|
DHTDiscovery bool `env:"CHORUS_DHT_DISCOVERY" default:"false" json:"dht_discovery" yaml:"dht_discovery"`
|
||||||
AnnounceInterval time.Duration `env:"CHORUS_ANNOUNCE_INTERVAL" default:"30s" json:"announce_interval" yaml:"announce_interval"`
|
AnnounceInterval time.Duration `env:"CHORUS_ANNOUNCE_INTERVAL" default:"30s" json:"announce_interval" yaml:"announce_interval"`
|
||||||
ServiceName string `env:"CHORUS_SERVICE_NAME" default:"CHORUS" json:"service_name" yaml:"service_name"`
|
ServiceName string `env:"CHORUS_SERVICE_NAME" default:"CHORUS" json:"service_name" yaml:"service_name"`
|
||||||
|
|
||||||
|
// Rate limiting for scaling (as per WHOOSH issue #7)
|
||||||
|
DialsPerSecond int `env:"CHORUS_DIALS_PER_SEC" default:"5" json:"dials_per_second" yaml:"dials_per_second"`
|
||||||
|
MaxConcurrentDHT int `env:"CHORUS_MAX_CONCURRENT_DHT" default:"16" json:"max_concurrent_dht" yaml:"max_concurrent_dht"`
|
||||||
|
MaxConcurrentDials int `env:"CHORUS_MAX_CONCURRENT_DIALS" default:"10" json:"max_concurrent_dials" yaml:"max_concurrent_dials"`
|
||||||
|
JoinStaggerMS int `env:"CHORUS_JOIN_STAGGER_MS" default:"0" json:"join_stagger_ms" yaml:"join_stagger_ms"`
|
||||||
}
|
}
|
||||||
|
|
||||||
type MonitoringConfig struct {
|
type MonitoringConfig struct {
|
||||||
@@ -79,10 +85,16 @@ func LoadHybridConfig() (*HybridConfig, error) {
|
|||||||
|
|
||||||
// Load Discovery configuration
|
// Load Discovery configuration
|
||||||
config.Discovery = DiscoveryConfig{
|
config.Discovery = DiscoveryConfig{
|
||||||
MDNSEnabled: getEnvBool("CHORUS_MDNS_ENABLED", true),
|
MDNSEnabled: getEnvBool("CHORUS_MDNS_ENABLED", true),
|
||||||
DHTDiscovery: getEnvBool("CHORUS_DHT_DISCOVERY", false),
|
DHTDiscovery: getEnvBool("CHORUS_DHT_DISCOVERY", false),
|
||||||
AnnounceInterval: getEnvDuration("CHORUS_ANNOUNCE_INTERVAL", 30*time.Second),
|
AnnounceInterval: getEnvDuration("CHORUS_ANNOUNCE_INTERVAL", 30*time.Second),
|
||||||
ServiceName: getEnvString("CHORUS_SERVICE_NAME", "CHORUS"),
|
ServiceName: getEnvString("CHORUS_SERVICE_NAME", "CHORUS"),
|
||||||
|
|
||||||
|
// Rate limiting for scaling (as per WHOOSH issue #7)
|
||||||
|
DialsPerSecond: getEnvInt("CHORUS_DIALS_PER_SEC", 5),
|
||||||
|
MaxConcurrentDHT: getEnvInt("CHORUS_MAX_CONCURRENT_DHT", 16),
|
||||||
|
MaxConcurrentDials: getEnvInt("CHORUS_MAX_CONCURRENT_DIALS", 10),
|
||||||
|
JoinStaggerMS: getEnvInt("CHORUS_JOIN_STAGGER_MS", 0),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Load Monitoring configuration
|
// Load Monitoring configuration
|
||||||
|
|||||||
@@ -1,354 +0,0 @@
|
|||||||
package config
|
|
||||||
|
|
||||||
import (
|
|
||||||
"context"
|
|
||||||
"encoding/json"
|
|
||||||
"fmt"
|
|
||||||
"io/ioutil"
|
|
||||||
"net/http"
|
|
||||||
"net/url"
|
|
||||||
"os"
|
|
||||||
"os/signal"
|
|
||||||
"sync"
|
|
||||||
"syscall"
|
|
||||||
"time"
|
|
||||||
)
|
|
||||||
|
|
||||||
// RuntimeConfig provides dynamic configuration with assignment override support
|
|
||||||
type RuntimeConfig struct {
|
|
||||||
mu sync.RWMutex
|
|
||||||
base *Config // Base configuration from environment
|
|
||||||
over *Config // Override configuration from assignment
|
|
||||||
}
|
|
||||||
|
|
||||||
// AssignmentConfig represents configuration received from WHOOSH assignment
|
|
||||||
type AssignmentConfig struct {
|
|
||||||
Role string `json:"role,omitempty"`
|
|
||||||
Model string `json:"model,omitempty"`
|
|
||||||
PromptUCXL string `json:"prompt_ucxl,omitempty"`
|
|
||||||
Specialization string `json:"specialization,omitempty"`
|
|
||||||
Capabilities []string `json:"capabilities,omitempty"`
|
|
||||||
Environment map[string]string `json:"environment,omitempty"`
|
|
||||||
BootstrapPeers []string `json:"bootstrap_peers,omitempty"`
|
|
||||||
JoinStaggerMS int `json:"join_stagger_ms,omitempty"`
|
|
||||||
DialsPerSecond int `json:"dials_per_second,omitempty"`
|
|
||||||
MaxConcurrentDHT int `json:"max_concurrent_dht,omitempty"`
|
|
||||||
AssignmentID string `json:"assignment_id,omitempty"`
|
|
||||||
ConfigEpoch int64 `json:"config_epoch,omitempty"`
|
|
||||||
}
|
|
||||||
|
|
||||||
// NewRuntimeConfig creates a new runtime configuration manager
|
|
||||||
func NewRuntimeConfig(baseConfig *Config) *RuntimeConfig {
|
|
||||||
return &RuntimeConfig{
|
|
||||||
base: baseConfig,
|
|
||||||
over: &Config{}, // Empty override initially
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Get retrieves a configuration value with override precedence
|
|
||||||
func (rc *RuntimeConfig) Get(key string) interface{} {
|
|
||||||
rc.mu.RLock()
|
|
||||||
defer rc.mu.RUnlock()
|
|
||||||
|
|
||||||
// Check override first, then base
|
|
||||||
if value := rc.getFromConfig(rc.over, key); value != nil {
|
|
||||||
return value
|
|
||||||
}
|
|
||||||
return rc.getFromConfig(rc.base, key)
|
|
||||||
}
|
|
||||||
|
|
||||||
// getFromConfig extracts a value from a config struct by key
|
|
||||||
func (rc *RuntimeConfig) getFromConfig(cfg *Config, key string) interface{} {
|
|
||||||
if cfg == nil {
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
switch key {
|
|
||||||
case "agent.role":
|
|
||||||
if cfg.Agent.Role != "" {
|
|
||||||
return cfg.Agent.Role
|
|
||||||
}
|
|
||||||
case "agent.specialization":
|
|
||||||
if cfg.Agent.Specialization != "" {
|
|
||||||
return cfg.Agent.Specialization
|
|
||||||
}
|
|
||||||
case "agent.capabilities":
|
|
||||||
if len(cfg.Agent.Capabilities) > 0 {
|
|
||||||
return cfg.Agent.Capabilities
|
|
||||||
}
|
|
||||||
case "agent.models":
|
|
||||||
if len(cfg.Agent.Models) > 0 {
|
|
||||||
return cfg.Agent.Models
|
|
||||||
}
|
|
||||||
case "agent.default_reasoning_model":
|
|
||||||
if cfg.Agent.DefaultReasoningModel != "" {
|
|
||||||
return cfg.Agent.DefaultReasoningModel
|
|
||||||
}
|
|
||||||
case "v2.dht.bootstrap_peers":
|
|
||||||
if len(cfg.V2.DHT.BootstrapPeers) > 0 {
|
|
||||||
return cfg.V2.DHT.BootstrapPeers
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetString retrieves a string configuration value
|
|
||||||
func (rc *RuntimeConfig) GetString(key string) string {
|
|
||||||
if value := rc.Get(key); value != nil {
|
|
||||||
if str, ok := value.(string); ok {
|
|
||||||
return str
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return ""
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetStringSlice retrieves a string slice configuration value
|
|
||||||
func (rc *RuntimeConfig) GetStringSlice(key string) []string {
|
|
||||||
if value := rc.Get(key); value != nil {
|
|
||||||
if slice, ok := value.([]string); ok {
|
|
||||||
return slice
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetInt retrieves an integer configuration value
|
|
||||||
func (rc *RuntimeConfig) GetInt(key string) int {
|
|
||||||
if value := rc.Get(key); value != nil {
|
|
||||||
if i, ok := value.(int); ok {
|
|
||||||
return i
|
|
||||||
}
|
|
||||||
}
|
|
||||||
return 0
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadAssignment loads configuration from WHOOSH assignment endpoint
|
|
||||||
func (rc *RuntimeConfig) LoadAssignment(ctx context.Context) error {
|
|
||||||
assignURL := os.Getenv("ASSIGN_URL")
|
|
||||||
if assignURL == "" {
|
|
||||||
return nil // No assignment URL configured
|
|
||||||
}
|
|
||||||
|
|
||||||
// Build assignment request URL with task identity
|
|
||||||
params := url.Values{}
|
|
||||||
if taskSlot := os.Getenv("TASK_SLOT"); taskSlot != "" {
|
|
||||||
params.Set("slot", taskSlot)
|
|
||||||
}
|
|
||||||
if taskID := os.Getenv("TASK_ID"); taskID != "" {
|
|
||||||
params.Set("task", taskID)
|
|
||||||
}
|
|
||||||
if clusterID := os.Getenv("CHORUS_CLUSTER_ID"); clusterID != "" {
|
|
||||||
params.Set("cluster", clusterID)
|
|
||||||
}
|
|
||||||
|
|
||||||
fullURL := assignURL
|
|
||||||
if len(params) > 0 {
|
|
||||||
fullURL += "?" + params.Encode()
|
|
||||||
}
|
|
||||||
|
|
||||||
// Fetch assignment with timeout
|
|
||||||
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
|
|
||||||
defer cancel()
|
|
||||||
|
|
||||||
req, err := http.NewRequestWithContext(ctx, "GET", fullURL, nil)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to create assignment request: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
client := &http.Client{Timeout: 10 * time.Second}
|
|
||||||
resp, err := client.Do(req)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("assignment request failed: %w", err)
|
|
||||||
}
|
|
||||||
defer resp.Body.Close()
|
|
||||||
|
|
||||||
if resp.StatusCode != http.StatusOK {
|
|
||||||
return fmt.Errorf("assignment request failed with status %d", resp.StatusCode)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Parse assignment response
|
|
||||||
var assignment AssignmentConfig
|
|
||||||
if err := json.NewDecoder(resp.Body).Decode(&assignment); err != nil {
|
|
||||||
return fmt.Errorf("failed to decode assignment response: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply assignment to override config
|
|
||||||
if err := rc.applyAssignment(&assignment); err != nil {
|
|
||||||
return fmt.Errorf("failed to apply assignment: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("📥 Loaded assignment: role=%s, model=%s, epoch=%d\n",
|
|
||||||
assignment.Role, assignment.Model, assignment.ConfigEpoch)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// LoadAssignmentFromFile loads configuration from a file (for config objects)
|
|
||||||
func (rc *RuntimeConfig) LoadAssignmentFromFile(filePath string) error {
|
|
||||||
if filePath == "" {
|
|
||||||
return nil // No file configured
|
|
||||||
}
|
|
||||||
|
|
||||||
data, err := ioutil.ReadFile(filePath)
|
|
||||||
if err != nil {
|
|
||||||
return fmt.Errorf("failed to read assignment file %s: %w", filePath, err)
|
|
||||||
}
|
|
||||||
|
|
||||||
var assignment AssignmentConfig
|
|
||||||
if err := json.Unmarshal(data, &assignment); err != nil {
|
|
||||||
return fmt.Errorf("failed to parse assignment file: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
if err := rc.applyAssignment(&assignment); err != nil {
|
|
||||||
return fmt.Errorf("failed to apply file assignment: %w", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
fmt.Printf("📁 Loaded assignment from file: role=%s, model=%s\n",
|
|
||||||
assignment.Role, assignment.Model)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// applyAssignment applies an assignment to the override configuration
|
|
||||||
func (rc *RuntimeConfig) applyAssignment(assignment *AssignmentConfig) error {
|
|
||||||
rc.mu.Lock()
|
|
||||||
defer rc.mu.Unlock()
|
|
||||||
|
|
||||||
// Create new override config
|
|
||||||
override := &Config{
|
|
||||||
Agent: AgentConfig{
|
|
||||||
Role: assignment.Role,
|
|
||||||
Specialization: assignment.Specialization,
|
|
||||||
Capabilities: assignment.Capabilities,
|
|
||||||
DefaultReasoningModel: assignment.Model,
|
|
||||||
},
|
|
||||||
V2: V2Config{
|
|
||||||
DHT: DHTConfig{
|
|
||||||
BootstrapPeers: assignment.BootstrapPeers,
|
|
||||||
},
|
|
||||||
},
|
|
||||||
}
|
|
||||||
|
|
||||||
// Handle models array
|
|
||||||
if assignment.Model != "" {
|
|
||||||
override.Agent.Models = []string{assignment.Model}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Apply environment variables from assignment
|
|
||||||
for key, value := range assignment.Environment {
|
|
||||||
os.Setenv(key, value)
|
|
||||||
}
|
|
||||||
|
|
||||||
rc.over = override
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
|
|
||||||
// StartReloadHandler starts a signal handler for configuration reload (SIGHUP)
|
|
||||||
func (rc *RuntimeConfig) StartReloadHandler(ctx context.Context) {
|
|
||||||
sigChan := make(chan os.Signal, 1)
|
|
||||||
signal.Notify(sigChan, syscall.SIGHUP)
|
|
||||||
|
|
||||||
go func() {
|
|
||||||
for {
|
|
||||||
select {
|
|
||||||
case <-ctx.Done():
|
|
||||||
return
|
|
||||||
case <-sigChan:
|
|
||||||
fmt.Println("🔄 Received SIGHUP, reloading configuration...")
|
|
||||||
if err := rc.LoadAssignment(ctx); err != nil {
|
|
||||||
fmt.Printf("⚠️ Failed to reload assignment: %v\n", err)
|
|
||||||
} else {
|
|
||||||
fmt.Println("✅ Configuration reloaded successfully")
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}()
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetBaseConfig returns the base configuration (from environment)
|
|
||||||
func (rc *RuntimeConfig) GetBaseConfig() *Config {
|
|
||||||
rc.mu.RLock()
|
|
||||||
defer rc.mu.RUnlock()
|
|
||||||
return rc.base
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetEffectiveConfig returns the effective merged configuration
|
|
||||||
func (rc *RuntimeConfig) GetEffectiveConfig() *Config {
|
|
||||||
rc.mu.RLock()
|
|
||||||
defer rc.mu.RUnlock()
|
|
||||||
|
|
||||||
// Start with base config
|
|
||||||
effective := *rc.base
|
|
||||||
|
|
||||||
// Apply overrides
|
|
||||||
if rc.over.Agent.Role != "" {
|
|
||||||
effective.Agent.Role = rc.over.Agent.Role
|
|
||||||
}
|
|
||||||
if rc.over.Agent.Specialization != "" {
|
|
||||||
effective.Agent.Specialization = rc.over.Agent.Specialization
|
|
||||||
}
|
|
||||||
if len(rc.over.Agent.Capabilities) > 0 {
|
|
||||||
effective.Agent.Capabilities = rc.over.Agent.Capabilities
|
|
||||||
}
|
|
||||||
if len(rc.over.Agent.Models) > 0 {
|
|
||||||
effective.Agent.Models = rc.over.Agent.Models
|
|
||||||
}
|
|
||||||
if rc.over.Agent.DefaultReasoningModel != "" {
|
|
||||||
effective.Agent.DefaultReasoningModel = rc.over.Agent.DefaultReasoningModel
|
|
||||||
}
|
|
||||||
if len(rc.over.V2.DHT.BootstrapPeers) > 0 {
|
|
||||||
effective.V2.DHT.BootstrapPeers = rc.over.V2.DHT.BootstrapPeers
|
|
||||||
}
|
|
||||||
|
|
||||||
return &effective
|
|
||||||
}
|
|
||||||
|
|
||||||
// GetAssignmentStats returns assignment statistics for monitoring
|
|
||||||
func (rc *RuntimeConfig) GetAssignmentStats() map[string]interface{} {
|
|
||||||
rc.mu.RLock()
|
|
||||||
defer rc.mu.RUnlock()
|
|
||||||
|
|
||||||
hasOverride := rc.over.Agent.Role != "" ||
|
|
||||||
rc.over.Agent.Specialization != "" ||
|
|
||||||
len(rc.over.Agent.Capabilities) > 0 ||
|
|
||||||
len(rc.over.V2.DHT.BootstrapPeers) > 0
|
|
||||||
|
|
||||||
stats := map[string]interface{}{
|
|
||||||
"has_assignment": hasOverride,
|
|
||||||
"assign_url": os.Getenv("ASSIGN_URL"),
|
|
||||||
"task_slot": os.Getenv("TASK_SLOT"),
|
|
||||||
"task_id": os.Getenv("TASK_ID"),
|
|
||||||
}
|
|
||||||
|
|
||||||
if hasOverride {
|
|
||||||
stats["assigned_role"] = rc.over.Agent.Role
|
|
||||||
stats["assigned_specialization"] = rc.over.Agent.Specialization
|
|
||||||
stats["assigned_capabilities"] = rc.over.Agent.Capabilities
|
|
||||||
stats["assigned_models"] = rc.over.Agent.Models
|
|
||||||
stats["bootstrap_peers_count"] = len(rc.over.V2.DHT.BootstrapPeers)
|
|
||||||
}
|
|
||||||
|
|
||||||
return stats
|
|
||||||
}
|
|
||||||
|
|
||||||
// InitializeAssignmentFromEnv initializes assignment from environment variables
|
|
||||||
func (rc *RuntimeConfig) InitializeAssignmentFromEnv(ctx context.Context) error {
|
|
||||||
// Try loading from assignment URL first
|
|
||||||
if err := rc.LoadAssignment(ctx); err != nil {
|
|
||||||
fmt.Printf("⚠️ Failed to load assignment from URL: %v\n", err)
|
|
||||||
}
|
|
||||||
|
|
||||||
// Try loading from file (for config objects)
|
|
||||||
if assignFile := os.Getenv("ASSIGNMENT_FILE"); assignFile != "" {
|
|
||||||
if err := rc.LoadAssignmentFromFile(assignFile); err != nil {
|
|
||||||
fmt.Printf("⚠️ Failed to load assignment from file: %v\n", err)
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
// Start reload handler for SIGHUP
|
|
||||||
rc.StartReloadHandler(ctx)
|
|
||||||
|
|
||||||
return nil
|
|
||||||
}
|
|
||||||
@@ -6,6 +6,7 @@ import (
|
|||||||
"fmt"
|
"fmt"
|
||||||
"log"
|
"log"
|
||||||
"math/rand"
|
"math/rand"
|
||||||
|
"os"
|
||||||
"sync"
|
"sync"
|
||||||
"time"
|
"time"
|
||||||
|
|
||||||
@@ -102,6 +103,11 @@ type ElectionManager struct {
|
|||||||
onAdminChanged func(oldAdmin, newAdmin string)
|
onAdminChanged func(oldAdmin, newAdmin string)
|
||||||
onElectionComplete func(winner string)
|
onElectionComplete func(winner string)
|
||||||
|
|
||||||
|
// Stability window to prevent election churn (Medium-risk fix 2.1)
|
||||||
|
lastElectionTime time.Time
|
||||||
|
electionStabilityWindow time.Duration
|
||||||
|
leaderStabilityWindow time.Duration
|
||||||
|
|
||||||
startTime time.Time
|
startTime time.Time
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -137,6 +143,10 @@ func NewElectionManager(
|
|||||||
votes: make(map[string]string),
|
votes: make(map[string]string),
|
||||||
electionTrigger: make(chan ElectionTrigger, 10),
|
electionTrigger: make(chan ElectionTrigger, 10),
|
||||||
startTime: time.Now(),
|
startTime: time.Now(),
|
||||||
|
|
||||||
|
// Initialize stability windows (as per WHOOSH issue #7)
|
||||||
|
electionStabilityWindow: getElectionStabilityWindow(cfg),
|
||||||
|
leaderStabilityWindow: getLeaderStabilityWindow(cfg),
|
||||||
}
|
}
|
||||||
|
|
||||||
// Initialize heartbeat manager
|
// Initialize heartbeat manager
|
||||||
@@ -220,11 +230,13 @@ func (em *ElectionManager) Stop() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
// TriggerElection manually triggers an election
|
// TriggerElection manually triggers an election with stability window checks
|
||||||
func (em *ElectionManager) TriggerElection(trigger ElectionTrigger) {
|
func (em *ElectionManager) TriggerElection(trigger ElectionTrigger) {
|
||||||
// Check if election already in progress
|
// Check if election already in progress
|
||||||
em.mu.RLock()
|
em.mu.RLock()
|
||||||
currentState := em.state
|
currentState := em.state
|
||||||
|
currentAdmin := em.currentAdmin
|
||||||
|
lastElection := em.lastElectionTime
|
||||||
em.mu.RUnlock()
|
em.mu.RUnlock()
|
||||||
|
|
||||||
if currentState != StateIdle {
|
if currentState != StateIdle {
|
||||||
@@ -232,6 +244,26 @@ func (em *ElectionManager) TriggerElection(trigger ElectionTrigger) {
|
|||||||
return
|
return
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Apply stability window to prevent election churn (WHOOSH issue #7)
|
||||||
|
now := time.Now()
|
||||||
|
if !lastElection.IsZero() {
|
||||||
|
timeSinceElection := now.Sub(lastElection)
|
||||||
|
|
||||||
|
// If we have a current admin, check leader stability window
|
||||||
|
if currentAdmin != "" && timeSinceElection < em.leaderStabilityWindow {
|
||||||
|
log.Printf("⏳ Leader stability window active (%.1fs remaining), ignoring trigger: %s",
|
||||||
|
(em.leaderStabilityWindow - timeSinceElection).Seconds(), trigger)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
|
||||||
|
// General election stability window
|
||||||
|
if timeSinceElection < em.electionStabilityWindow {
|
||||||
|
log.Printf("⏳ Election stability window active (%.1fs remaining), ignoring trigger: %s",
|
||||||
|
(em.electionStabilityWindow - timeSinceElection).Seconds(), trigger)
|
||||||
|
return
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
select {
|
select {
|
||||||
case em.electionTrigger <- trigger:
|
case em.electionTrigger <- trigger:
|
||||||
log.Printf("🗳️ Election triggered: %s", trigger)
|
log.Printf("🗳️ Election triggered: %s", trigger)
|
||||||
@@ -442,6 +474,7 @@ func (em *ElectionManager) beginElection(trigger ElectionTrigger) {
|
|||||||
em.mu.Lock()
|
em.mu.Lock()
|
||||||
em.state = StateElecting
|
em.state = StateElecting
|
||||||
em.currentTerm++
|
em.currentTerm++
|
||||||
|
em.lastElectionTime = time.Now() // Record election timestamp for stability window
|
||||||
term := em.currentTerm
|
term := em.currentTerm
|
||||||
em.candidates = make(map[string]*AdminCandidate)
|
em.candidates = make(map[string]*AdminCandidate)
|
||||||
em.votes = make(map[string]string)
|
em.votes = make(map[string]string)
|
||||||
@@ -1119,3 +1152,43 @@ func (hm *HeartbeatManager) GetHeartbeatStatus() map[string]interface{} {
|
|||||||
|
|
||||||
return status
|
return status
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Helper functions for stability window configuration
|
||||||
|
|
||||||
|
// getElectionStabilityWindow gets the minimum time between elections
|
||||||
|
func getElectionStabilityWindow(cfg *config.Config) time.Duration {
|
||||||
|
// Try to get from environment or use default
|
||||||
|
if stability := os.Getenv("CHORUS_ELECTION_MIN_TERM"); stability != "" {
|
||||||
|
if duration, err := time.ParseDuration(stability); err == nil {
|
||||||
|
return duration
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to get from config structure if it exists
|
||||||
|
if cfg.Security.ElectionConfig.DiscoveryTimeout > 0 {
|
||||||
|
// Use double the discovery timeout as default stability window
|
||||||
|
return cfg.Security.ElectionConfig.DiscoveryTimeout * 2
|
||||||
|
}
|
||||||
|
|
||||||
|
// Default fallback
|
||||||
|
return 30 * time.Second
|
||||||
|
}
|
||||||
|
|
||||||
|
// getLeaderStabilityWindow gets the minimum time before challenging a healthy leader
|
||||||
|
func getLeaderStabilityWindow(cfg *config.Config) time.Duration {
|
||||||
|
// Try to get from environment or use default
|
||||||
|
if stability := os.Getenv("CHORUS_LEADER_MIN_TERM"); stability != "" {
|
||||||
|
if duration, err := time.ParseDuration(stability); err == nil {
|
||||||
|
return duration
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Try to get from config structure if it exists
|
||||||
|
if cfg.Security.ElectionConfig.HeartbeatTimeout > 0 {
|
||||||
|
// Use 3x heartbeat timeout as default leader stability
|
||||||
|
return cfg.Security.ElectionConfig.HeartbeatTimeout * 3
|
||||||
|
}
|
||||||
|
|
||||||
|
// Default fallback
|
||||||
|
return 45 * time.Second
|
||||||
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user