Compare commits
	
		
			7 Commits
		
	
	
		
			26e4ef7d8b
			...
			feature/ch
		
	
	| Author | SHA1 | Date | |
|---|---|---|---|
|   | 14b5125c12 | ||
|   | ea04378962 | ||
| 237e8699eb | |||
| 1de8695736 | |||
| c30c6dc480 | |||
|   | e523c4b543 | ||
| ef4bf1efe0 | 
							
								
								
									
										35
									
								
								README.md
									
									
									
									
									
								
							
							
						
						
									
										35
									
								
								README.md
									
									
									
									
									
								
							| @@ -8,7 +8,7 @@ CHORUS is the runtime that ties the CHORUS ecosystem together: libp2p mesh, DHT- | |||||||
| | --- | --- | --- | | | --- | --- | --- | | ||||||
| | libp2p node + PubSub | ✅ Running | `internal/runtime/shared.go` spins up the mesh, hypercore logging, availability broadcasts. | | | libp2p node + PubSub | ✅ Running | `internal/runtime/shared.go` spins up the mesh, hypercore logging, availability broadcasts. | | ||||||
| | DHT + DecisionPublisher | ✅ Running | Encrypted storage wired through `pkg/dht`; decisions written via `ucxl.DecisionPublisher`. | | | DHT + DecisionPublisher | ✅ Running | Encrypted storage wired through `pkg/dht`; decisions written via `ucxl.DecisionPublisher`. | | ||||||
| | Election manager | ✅ Running | Admin election integrated with Backbeat; metrics exposed under `pkg/metrics`. | | | **Leader Election System** | ✅ **FULLY FUNCTIONAL** | **🎉 MILESTONE: Complete admin election with consensus, discovery protocol, heartbeats, and SLURP activation!** | | ||||||
| | SLURP (context intelligence) | 🚧 Stubbed | `pkg/slurp/slurp.go` contains TODOs for resolver, temporal graphs, intelligence. Leader integration scaffolding exists but uses placeholder IDs/request forwarding. | | | SLURP (context intelligence) | 🚧 Stubbed | `pkg/slurp/slurp.go` contains TODOs for resolver, temporal graphs, intelligence. Leader integration scaffolding exists but uses placeholder IDs/request forwarding. | | ||||||
| | SHHH (secrets sentinel) | 🚧 Sentinel live | `pkg/shhh` redacts hypercore + PubSub payloads with audit + metrics hooks (policy replay TBD). | | | SHHH (secrets sentinel) | 🚧 Sentinel live | `pkg/shhh` redacts hypercore + PubSub payloads with audit + metrics hooks (policy replay TBD). | | ||||||
| | HMMM routing | 🚧 Partial | PubSub topics join, but capability/role announcements and HMMM router wiring are placeholders (`internal/runtime/agent_support.go`). | | | HMMM routing | 🚧 Partial | PubSub topics join, but capability/role announcements and HMMM router wiring are placeholders (`internal/runtime/agent_support.go`). | | ||||||
| @@ -35,6 +35,39 @@ You’ll get a single agent container with: | |||||||
|  |  | ||||||
| **Missing today:** SLURP context resolution, advanced SHHH policy replay, HMMM per-issue routing. Expect log warnings/TODOs for those paths. | **Missing today:** SLURP context resolution, advanced SHHH policy replay, HMMM per-issue routing. Expect log warnings/TODOs for those paths. | ||||||
|  |  | ||||||
|  | ## 🎉 Leader Election System (NEW!) | ||||||
|  |  | ||||||
|  | CHORUS now features a complete, production-ready leader election system: | ||||||
|  |  | ||||||
|  | ### Core Features | ||||||
|  | - **Consensus-based election** with weighted scoring (uptime, capabilities, resources) | ||||||
|  | - **Admin discovery protocol** for network-wide leader identification | ||||||
|  | - **Heartbeat system** with automatic failover (15-second intervals) | ||||||
|  | - **Concurrent election prevention** with randomized delays | ||||||
|  | - **SLURP activation** on elected admin nodes | ||||||
|  |  | ||||||
|  | ### How It Works | ||||||
|  | 1. **Bootstrap**: Nodes start in idle state, no admin known | ||||||
|  | 2. **Discovery**: Nodes send discovery requests to find existing admin | ||||||
|  | 3. **Election trigger**: If no admin found after grace period, trigger election | ||||||
|  | 4. **Candidacy**: Eligible nodes announce themselves with capability scores | ||||||
|  | 5. **Consensus**: Network selects winner based on highest score | ||||||
|  | 6. **Leadership**: Winner starts heartbeats, activates SLURP functionality | ||||||
|  | 7. **Monitoring**: Nodes continuously verify admin health via heartbeats | ||||||
|  |  | ||||||
|  | ### Debugging | ||||||
|  | Use these log patterns to monitor election health: | ||||||
|  | ```bash | ||||||
|  | # Monitor WHOAMI messages and leader identification | ||||||
|  | docker service logs CHORUS_chorus | grep "🤖 WHOAMI\|👑\|📡.*Discovered" | ||||||
|  |  | ||||||
|  | # Track election cycles | ||||||
|  | docker service logs CHORUS_chorus | grep "🗳️\|📢.*candidacy\|🏆.*winner" | ||||||
|  |  | ||||||
|  | # Watch discovery protocol | ||||||
|  | docker service logs CHORUS_chorus | grep "📩\|📤\|📥" | ||||||
|  | ``` | ||||||
|  |  | ||||||
| ## Roadmap Highlights | ## Roadmap Highlights | ||||||
|  |  | ||||||
| 1. **Security substrate** – land SHHH sentinel, finish SLURP leader-only operations, validate COOEE enrolment (see roadmap Phase 1). | 1. **Security substrate** – land SHHH sentinel, finish SLURP leader-only operations, validate COOEE enrolment (see roadmap Phase 1). | ||||||
|   | |||||||
| @@ -9,10 +9,11 @@ import ( | |||||||
|  |  | ||||||
| 	"chorus/internal/logging" | 	"chorus/internal/logging" | ||||||
| 	"chorus/pubsub" | 	"chorus/pubsub" | ||||||
|  |  | ||||||
| 	"github.com/gorilla/mux" | 	"github.com/gorilla/mux" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // HTTPServer provides HTTP API endpoints for Bzzz | // HTTPServer provides HTTP API endpoints for CHORUS | ||||||
| type HTTPServer struct { | type HTTPServer struct { | ||||||
| 	port         int | 	port         int | ||||||
| 	hypercoreLog *logging.HypercoreLog | 	hypercoreLog *logging.HypercoreLog | ||||||
| @@ -20,7 +21,7 @@ type HTTPServer struct { | |||||||
| 	server       *http.Server | 	server       *http.Server | ||||||
| } | } | ||||||
|  |  | ||||||
| // NewHTTPServer creates a new HTTP server for Bzzz API | // NewHTTPServer creates a new HTTP server for CHORUS API | ||||||
| func NewHTTPServer(port int, hlog *logging.HypercoreLog, ps *pubsub.PubSub) *HTTPServer { | func NewHTTPServer(port int, hlog *logging.HypercoreLog, ps *pubsub.PubSub) *HTTPServer { | ||||||
| 	return &HTTPServer{ | 	return &HTTPServer{ | ||||||
| 		port:         port, | 		port:         port, | ||||||
| @@ -197,11 +198,11 @@ func (h *HTTPServer) handleGetLogsSince(w http.ResponseWriter, r *http.Request) | |||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	response := map[string]interface{}{ | 	response := map[string]interface{}{ | ||||||
| 		"entries":    entries, | 		"entries":     entries, | ||||||
| 		"count":      len(entries), | 		"count":       len(entries), | ||||||
| 		"since_index": index, | 		"since_index": index, | ||||||
| 		"timestamp":  time.Now().Unix(), | 		"timestamp":   time.Now().Unix(), | ||||||
| 		"total":      h.hypercoreLog.Length(), | 		"total":       h.hypercoreLog.Length(), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	json.NewEncoder(w).Encode(response) | 	json.NewEncoder(w).Encode(response) | ||||||
| @@ -220,8 +221,8 @@ func (h *HTTPServer) handleHealth(w http.ResponseWriter, r *http.Request) { | |||||||
| 	w.Header().Set("Content-Type", "application/json") | 	w.Header().Set("Content-Type", "application/json") | ||||||
|  |  | ||||||
| 	health := map[string]interface{}{ | 	health := map[string]interface{}{ | ||||||
| 		"status":     "healthy", | 		"status":      "healthy", | ||||||
| 		"timestamp":  time.Now().Unix(), | 		"timestamp":   time.Now().Unix(), | ||||||
| 		"log_entries": h.hypercoreLog.Length(), | 		"log_entries": h.hypercoreLog.Length(), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| @@ -233,10 +234,10 @@ func (h *HTTPServer) handleStatus(w http.ResponseWriter, r *http.Request) { | |||||||
| 	w.Header().Set("Content-Type", "application/json") | 	w.Header().Set("Content-Type", "application/json") | ||||||
|  |  | ||||||
| 	status := map[string]interface{}{ | 	status := map[string]interface{}{ | ||||||
| 		"status":       "running", | 		"status":      "running", | ||||||
| 		"timestamp":    time.Now().Unix(), | 		"timestamp":   time.Now().Unix(), | ||||||
| 		"hypercore":    h.hypercoreLog.GetStats(), | 		"hypercore":   h.hypercoreLog.GetStats(), | ||||||
| 		"api_version":  "1.0.0", | 		"api_version": "1.0.0", | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	json.NewEncoder(w).Encode(status) | 	json.NewEncoder(w).Encode(status) | ||||||
|   | |||||||
							
								
								
									
										38
									
								
								docker/bootstrap.json
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										38
									
								
								docker/bootstrap.json
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,38 @@ | |||||||
|  | { | ||||||
|  |   "metadata": { | ||||||
|  |     "generated_at": "2024-12-19T10:00:00Z", | ||||||
|  |     "cluster_id": "production-cluster", | ||||||
|  |     "version": "1.0.0", | ||||||
|  |     "notes": "Bootstrap configuration for CHORUS scaling - managed by WHOOSH" | ||||||
|  |   }, | ||||||
|  |   "peers": [ | ||||||
|  |     { | ||||||
|  |       "address": "/ip4/10.0.1.10/tcp/9000/p2p/12D3KooWExample1234567890abcdef", | ||||||
|  |       "priority": 100, | ||||||
|  |       "region": "us-east-1", | ||||||
|  |       "roles": ["admin", "stable"], | ||||||
|  |       "enabled": true | ||||||
|  |     }, | ||||||
|  |     { | ||||||
|  |       "address": "/ip4/10.0.1.11/tcp/9000/p2p/12D3KooWExample1234567890abcde2", | ||||||
|  |       "priority": 90, | ||||||
|  |       "region": "us-east-1", | ||||||
|  |       "roles": ["worker", "stable"], | ||||||
|  |       "enabled": true | ||||||
|  |     }, | ||||||
|  |     { | ||||||
|  |       "address": "/ip4/10.0.2.10/tcp/9000/p2p/12D3KooWExample1234567890abcde3", | ||||||
|  |       "priority": 80, | ||||||
|  |       "region": "us-west-2", | ||||||
|  |       "roles": ["worker", "stable"], | ||||||
|  |       "enabled": true | ||||||
|  |     }, | ||||||
|  |     { | ||||||
|  |       "address": "/ip4/10.0.3.10/tcp/9000/p2p/12D3KooWExample1234567890abcde4", | ||||||
|  |       "priority": 70, | ||||||
|  |       "region": "eu-central-1", | ||||||
|  |       "roles": ["worker"], | ||||||
|  |       "enabled": false | ||||||
|  |     } | ||||||
|  |   ] | ||||||
|  | } | ||||||
| @@ -15,7 +15,7 @@ services: | |||||||
|       - CHORUS_AGENT_ID=${CHORUS_AGENT_ID:-}  # Auto-generated if not provided |       - CHORUS_AGENT_ID=${CHORUS_AGENT_ID:-}  # Auto-generated if not provided | ||||||
|       - CHORUS_SPECIALIZATION=${CHORUS_SPECIALIZATION:-general_developer} |       - CHORUS_SPECIALIZATION=${CHORUS_SPECIALIZATION:-general_developer} | ||||||
|       - CHORUS_MAX_TASKS=${CHORUS_MAX_TASKS:-3} |       - CHORUS_MAX_TASKS=${CHORUS_MAX_TASKS:-3} | ||||||
|       - CHORUS_CAPABILITIES=${CHORUS_CAPABILITIES:-general_development,task_coordination,admin_election} |       - CHORUS_CAPABILITIES=general_development,task_coordination,admin_election | ||||||
|        |        | ||||||
|       # Network configuration |       # Network configuration | ||||||
|       - CHORUS_API_PORT=8080 |       - CHORUS_API_PORT=8080 | ||||||
| @@ -23,6 +23,25 @@ services: | |||||||
|       - CHORUS_P2P_PORT=9000 |       - CHORUS_P2P_PORT=9000 | ||||||
|       - CHORUS_BIND_ADDRESS=0.0.0.0 |       - CHORUS_BIND_ADDRESS=0.0.0.0 | ||||||
|  |  | ||||||
|  |       # Scaling optimizations (as per WHOOSH issue #7) | ||||||
|  |       - CHORUS_MDNS_ENABLED=false  # Disabled for container/swarm environments | ||||||
|  |       - CHORUS_DIALS_PER_SEC=5     # Rate limit outbound connections to prevent storms | ||||||
|  |       - CHORUS_MAX_CONCURRENT_DHT=16  # Limit concurrent DHT queries | ||||||
|  |  | ||||||
|  |       # Election stability windows (Medium-risk fix 2.1) | ||||||
|  |       - CHORUS_ELECTION_MIN_TERM=30s  # Minimum time between elections to prevent churn | ||||||
|  |       - CHORUS_LEADER_MIN_TERM=45s    # Minimum time before challenging healthy leader | ||||||
|  |  | ||||||
|  |       # Assignment system for runtime configuration (Medium-risk fix 2.2) | ||||||
|  |       - ASSIGN_URL=${ASSIGN_URL:-}  # Optional: WHOOSH assignment endpoint | ||||||
|  |       - TASK_SLOT=${TASK_SLOT:-}    # Optional: Task slot identifier | ||||||
|  |       - TASK_ID=${TASK_ID:-}        # Optional: Task identifier | ||||||
|  |       - NODE_ID=${NODE_ID:-}        # Optional: Node identifier | ||||||
|  |  | ||||||
|  |       # Bootstrap pool configuration (supports JSON and CSV) | ||||||
|  |       - BOOTSTRAP_JSON=/config/bootstrap.json  # Optional: JSON bootstrap config | ||||||
|  |       - CHORUS_BOOTSTRAP_PEERS=${CHORUS_BOOTSTRAP_PEERS:-}  # CSV fallback | ||||||
|  |        | ||||||
|       # AI configuration - Provider selection |       # AI configuration - Provider selection | ||||||
|       - CHORUS_AI_PROVIDER=${CHORUS_AI_PROVIDER:-resetdata} |       - CHORUS_AI_PROVIDER=${CHORUS_AI_PROVIDER:-resetdata} | ||||||
|        |        | ||||||
| @@ -58,6 +77,11 @@ services: | |||||||
|       - chorus_license_id |       - chorus_license_id | ||||||
|       - resetdata_api_key |       - resetdata_api_key | ||||||
|  |  | ||||||
|  |     # Configuration files | ||||||
|  |     configs: | ||||||
|  |       - source: chorus_bootstrap | ||||||
|  |         target: /config/bootstrap.json | ||||||
|  |        | ||||||
|     # Persistent data storage |     # Persistent data storage | ||||||
|     volumes: |     volumes: | ||||||
|       - chorus_data:/app/data |       - chorus_data:/app/data | ||||||
| @@ -91,7 +115,6 @@ services: | |||||||
|           memory: 128M |           memory: 128M | ||||||
|       placement: |       placement: | ||||||
|         constraints: |         constraints: | ||||||
|           - node.hostname != rosewood |  | ||||||
|           - node.hostname != acacia |           - node.hostname != acacia | ||||||
|         preferences: |         preferences: | ||||||
|           - spread: node.hostname |           - spread: node.hostname | ||||||
| @@ -169,7 +192,14 @@ services: | |||||||
|       # Scaling system configuration |       # Scaling system configuration | ||||||
|       WHOOSH_SCALING_KACHING_URL: "https://kaching.chorus.services" |       WHOOSH_SCALING_KACHING_URL: "https://kaching.chorus.services" | ||||||
|       WHOOSH_SCALING_BACKBEAT_URL: "http://backbeat-pulse:8080" |       WHOOSH_SCALING_BACKBEAT_URL: "http://backbeat-pulse:8080" | ||||||
|       WHOOSH_SCALING_CHORUS_URL: "http://chorus:8080" |       WHOOSH_SCALING_CHORUS_URL: "http://chorus:9000" | ||||||
|  |  | ||||||
|  |       # BACKBEAT integration configuration (temporarily disabled) | ||||||
|  |       WHOOSH_BACKBEAT_ENABLED: "false" | ||||||
|  |       WHOOSH_BACKBEAT_CLUSTER_ID: "chorus-production" | ||||||
|  |       WHOOSH_BACKBEAT_AGENT_ID: "whoosh" | ||||||
|  |       WHOOSH_BACKBEAT_NATS_URL: "nats://backbeat-nats:4222" | ||||||
|  |  | ||||||
|     secrets: |     secrets: | ||||||
|       - whoosh_db_password |       - whoosh_db_password | ||||||
|       - gitea_token |       - gitea_token | ||||||
| @@ -222,7 +252,6 @@ services: | |||||||
|         - traefik.http.middlewares.whoosh-auth.basicauth.users=admin:$2y$10$example_hash |         - traefik.http.middlewares.whoosh-auth.basicauth.users=admin:$2y$10$example_hash | ||||||
|     networks: |     networks: | ||||||
|       - tengig |       - tengig | ||||||
|       - whoosh-backend |  | ||||||
|       - chorus_net |       - chorus_net | ||||||
|     healthcheck: |     healthcheck: | ||||||
|       test: ["CMD", "/app/whoosh", "--health-check"] |       test: ["CMD", "/app/whoosh", "--health-check"] | ||||||
| @@ -260,14 +289,13 @@ services: | |||||||
|           memory: 256M |           memory: 256M | ||||||
|           cpus: '0.5' |           cpus: '0.5' | ||||||
|     networks: |     networks: | ||||||
|       - whoosh-backend |  | ||||||
|       - chorus_net |       - chorus_net | ||||||
|     healthcheck: |     healthcheck: | ||||||
|       test: ["CMD-SHELL", "pg_isready -U whoosh"] |       test: ["CMD-SHELL", "pg_isready -h localhost -p 5432 -U whoosh -d whoosh"] | ||||||
|       interval: 30s |       interval: 30s | ||||||
|       timeout: 10s |       timeout: 10s | ||||||
|       retries: 5 |       retries: 5 | ||||||
|       start_period: 30s |       start_period: 40s | ||||||
|  |  | ||||||
|  |  | ||||||
|   redis: |   redis: | ||||||
| @@ -295,7 +323,6 @@ services: | |||||||
|           memory: 64M |           memory: 64M | ||||||
|           cpus: '0.1' |           cpus: '0.1' | ||||||
|     networks: |     networks: | ||||||
|       - whoosh-backend |  | ||||||
|       - chorus_net |       - chorus_net | ||||||
|     healthcheck: |     healthcheck: | ||||||
|       test: ["CMD", "sh", "-c", "redis-cli --no-auth-warning -a $$(cat /run/secrets/redis_password) ping"] |       test: ["CMD", "sh", "-c", "redis-cli --no-auth-warning -a $$(cat /run/secrets/redis_password) ping"] | ||||||
| @@ -327,9 +354,6 @@ services: | |||||||
|       - "9099:9090" # Expose Prometheus UI |       - "9099:9090" # Expose Prometheus UI | ||||||
|     deploy: |     deploy: | ||||||
|       replicas: 1 |       replicas: 1 | ||||||
|       placement: |  | ||||||
|         constraints: |  | ||||||
|           - node.hostname != rosewood |  | ||||||
|       labels: |       labels: | ||||||
|         - traefik.enable=true |         - traefik.enable=true | ||||||
|         - traefik.http.routers.prometheus.rule=Host(`prometheus.chorus.services`) |         - traefik.http.routers.prometheus.rule=Host(`prometheus.chorus.services`) | ||||||
| @@ -359,9 +383,6 @@ services: | |||||||
|       - "3300:3000" # Expose Grafana UI |       - "3300:3000" # Expose Grafana UI | ||||||
|     deploy: |     deploy: | ||||||
|       replicas: 1 |       replicas: 1 | ||||||
|       placement: |  | ||||||
|         constraints: |  | ||||||
|           - node.hostname != rosewood |  | ||||||
|       labels: |       labels: | ||||||
|         - traefik.enable=true |         - traefik.enable=true | ||||||
|         - traefik.http.routers.grafana.rule=Host(`grafana.chorus.services`) |         - traefik.http.routers.grafana.rule=Host(`grafana.chorus.services`) | ||||||
| @@ -424,8 +445,6 @@ services: | |||||||
|       placement: |       placement: | ||||||
|         preferences: |         preferences: | ||||||
|           - spread: node.hostname |           - spread: node.hostname | ||||||
|         constraints: |  | ||||||
|           - node.hostname != rosewood  # Avoid intermittent gaming PC |  | ||||||
|       resources: |       resources: | ||||||
|         limits: |         limits: | ||||||
|           memory: 256M |           memory: 256M | ||||||
| @@ -493,8 +512,6 @@ services: | |||||||
|       placement: |       placement: | ||||||
|         preferences: |         preferences: | ||||||
|           - spread: node.hostname |           - spread: node.hostname | ||||||
|         constraints: |  | ||||||
|           - node.hostname != rosewood |  | ||||||
|       resources: |       resources: | ||||||
|         limits: |         limits: | ||||||
|           memory: 512M         # Larger for window aggregation |           memory: 512M         # Larger for window aggregation | ||||||
| @@ -527,7 +544,6 @@ services: | |||||||
|   backbeat-nats: |   backbeat-nats: | ||||||
|     image: nats:2.9-alpine |     image: nats:2.9-alpine | ||||||
|     command: ["--jetstream"] |     command: ["--jetstream"] | ||||||
|      |  | ||||||
|     deploy: |     deploy: | ||||||
|       replicas: 1 |       replicas: 1 | ||||||
|       restart_policy: |       restart_policy: | ||||||
| @@ -538,8 +554,6 @@ services: | |||||||
|       placement: |       placement: | ||||||
|         preferences: |         preferences: | ||||||
|           - spread: node.hostname |           - spread: node.hostname | ||||||
|         constraints: |  | ||||||
|           - node.hostname != rosewood |  | ||||||
|       resources: |       resources: | ||||||
|         limits: |         limits: | ||||||
|           memory: 256M |           memory: 256M | ||||||
| @@ -547,10 +561,8 @@ services: | |||||||
|         reservations: |         reservations: | ||||||
|           memory: 128M |           memory: 128M | ||||||
|           cpus: '0.25' |           cpus: '0.25' | ||||||
|      |  | ||||||
|     networks: |     networks: | ||||||
|       - chorus_net |       - chorus_net | ||||||
|      |  | ||||||
|     # Container logging |     # Container logging | ||||||
|     logging: |     logging: | ||||||
|       driver: "json-file" |       driver: "json-file" | ||||||
| @@ -603,18 +615,14 @@ networks: | |||||||
|   tengig: |   tengig: | ||||||
|     external: true |     external: true | ||||||
|  |  | ||||||
|   whoosh-backend: |  | ||||||
|     driver: overlay |  | ||||||
|     attachable: false |  | ||||||
|  |  | ||||||
|   chorus_net: |   chorus_net: | ||||||
|     driver: overlay |     driver: overlay | ||||||
|     attachable: true |     attachable: true | ||||||
|     ipam: |  | ||||||
|       config: |  | ||||||
|         - subnet: 10.201.0.0/24 |  | ||||||
|  |  | ||||||
|  |  | ||||||
|  | configs: | ||||||
|  |   chorus_bootstrap: | ||||||
|  |     file: ./bootstrap.json | ||||||
|  |  | ||||||
| secrets: | secrets: | ||||||
|   chorus_license_id: |   chorus_license_id: | ||||||
|   | |||||||
| @@ -105,6 +105,7 @@ func (t *SimpleTaskTracker) publishTaskCompletion(taskID string, success bool, s | |||||||
| // SharedRuntime contains all the shared P2P infrastructure components | // SharedRuntime contains all the shared P2P infrastructure components | ||||||
| type SharedRuntime struct { | type SharedRuntime struct { | ||||||
| 	Config              *config.Config | 	Config              *config.Config | ||||||
|  | 	RuntimeConfig       *config.RuntimeConfig | ||||||
| 	Logger              *SimpleLogger | 	Logger              *SimpleLogger | ||||||
| 	Context             context.Context | 	Context             context.Context | ||||||
| 	Cancel              context.CancelFunc | 	Cancel              context.CancelFunc | ||||||
| @@ -149,6 +150,28 @@ func Initialize(appMode string) (*SharedRuntime, error) { | |||||||
| 	runtime.Config = cfg | 	runtime.Config = cfg | ||||||
|  |  | ||||||
| 	runtime.Logger.Info("✅ Configuration loaded successfully") | 	runtime.Logger.Info("✅ Configuration loaded successfully") | ||||||
|  |  | ||||||
|  | 	// Initialize runtime configuration with assignment support | ||||||
|  | 	runtime.RuntimeConfig = config.NewRuntimeConfig(cfg) | ||||||
|  |  | ||||||
|  | 	// Load assignment if ASSIGN_URL is configured | ||||||
|  | 	if assignURL := os.Getenv("ASSIGN_URL"); assignURL != "" { | ||||||
|  | 		runtime.Logger.Info("📡 Loading assignment from WHOOSH: %s", assignURL) | ||||||
|  |  | ||||||
|  | 		ctx, cancel := context.WithTimeout(runtime.Context, 10*time.Second) | ||||||
|  | 		if err := runtime.RuntimeConfig.LoadAssignment(ctx, assignURL); err != nil { | ||||||
|  | 			runtime.Logger.Warn("⚠️ Failed to load assignment (continuing with base config): %v", err) | ||||||
|  | 		} else { | ||||||
|  | 			runtime.Logger.Info("✅ Assignment loaded successfully") | ||||||
|  | 		} | ||||||
|  | 		cancel() | ||||||
|  |  | ||||||
|  | 		// Start reload handler for SIGHUP | ||||||
|  | 		runtime.RuntimeConfig.StartReloadHandler(runtime.Context, assignURL) | ||||||
|  | 		runtime.Logger.Info("📡 SIGHUP reload handler started for assignment updates") | ||||||
|  | 	} else { | ||||||
|  | 		runtime.Logger.Info("⚪ No ASSIGN_URL configured, using static configuration") | ||||||
|  | 	} | ||||||
| 	runtime.Logger.Info("🤖 Agent ID: %s", cfg.Agent.ID) | 	runtime.Logger.Info("🤖 Agent ID: %s", cfg.Agent.ID) | ||||||
| 	runtime.Logger.Info("🎯 Specialization: %s", cfg.Agent.Specialization) | 	runtime.Logger.Info("🎯 Specialization: %s", cfg.Agent.Specialization) | ||||||
|  |  | ||||||
| @@ -225,12 +248,17 @@ func Initialize(appMode string) (*SharedRuntime, error) { | |||||||
| 	runtime.HypercoreLog = hlog | 	runtime.HypercoreLog = hlog | ||||||
| 	runtime.Logger.Info("📝 Hypercore logger initialized") | 	runtime.Logger.Info("📝 Hypercore logger initialized") | ||||||
|  |  | ||||||
| 	// Initialize mDNS discovery | 	// Initialize mDNS discovery (disabled in container environments for scaling) | ||||||
| 	mdnsDiscovery, err := discovery.NewMDNSDiscovery(ctx, node.Host(), "chorus-peer-discovery") | 	if cfg.V2.DHT.MDNSEnabled { | ||||||
| 	if err != nil { | 		mdnsDiscovery, err := discovery.NewMDNSDiscovery(ctx, node.Host(), "chorus-peer-discovery") | ||||||
| 		return nil, fmt.Errorf("failed to create mDNS discovery: %v", err) | 		if err != nil { | ||||||
|  | 			return nil, fmt.Errorf("failed to create mDNS discovery: %v", err) | ||||||
|  | 		} | ||||||
|  | 		runtime.MDNSDiscovery = mdnsDiscovery | ||||||
|  | 		runtime.Logger.Info("🔍 mDNS discovery enabled for local network") | ||||||
|  | 	} else { | ||||||
|  | 		runtime.Logger.Info("⚪ mDNS discovery disabled (recommended for container/swarm deployments)") | ||||||
| 	} | 	} | ||||||
| 	runtime.MDNSDiscovery = mdnsDiscovery |  | ||||||
|  |  | ||||||
| 	// Initialize PubSub with hypercore logging | 	// Initialize PubSub with hypercore logging | ||||||
| 	ps, err := pubsub.NewPubSubWithLogger(ctx, node.Host(), "chorus/coordination/v1", "hmmm/meta-discussion/v1", hlog) | 	ps, err := pubsub.NewPubSubWithLogger(ctx, node.Host(), "chorus/coordination/v1", "hmmm/meta-discussion/v1", hlog) | ||||||
| @@ -283,6 +311,7 @@ func (r *SharedRuntime) Cleanup() { | |||||||
|  |  | ||||||
| 	if r.MDNSDiscovery != nil { | 	if r.MDNSDiscovery != nil { | ||||||
| 		r.MDNSDiscovery.Close() | 		r.MDNSDiscovery.Close() | ||||||
|  | 		r.Logger.Info("🔍 mDNS discovery closed") | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	if r.PubSub != nil { | 	if r.PubSub != nil { | ||||||
| @@ -407,8 +436,20 @@ func (r *SharedRuntime) initializeDHTStorage() error { | |||||||
| 				} | 				} | ||||||
| 			} | 			} | ||||||
|  |  | ||||||
| 			// Connect to bootstrap peers if configured | 			// Connect to bootstrap peers (with assignment override support) | ||||||
| 			for _, addrStr := range r.Config.V2.DHT.BootstrapPeers { | 			bootstrapPeers := r.RuntimeConfig.GetBootstrapPeers() | ||||||
|  | 			if len(bootstrapPeers) == 0 { | ||||||
|  | 				bootstrapPeers = r.Config.V2.DHT.BootstrapPeers | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			// Apply join stagger if configured | ||||||
|  | 			joinStagger := r.RuntimeConfig.GetJoinStagger() | ||||||
|  | 			if joinStagger > 0 { | ||||||
|  | 				r.Logger.Info("⏱️ Applying join stagger delay: %v", joinStagger) | ||||||
|  | 				time.Sleep(joinStagger) | ||||||
|  | 			} | ||||||
|  |  | ||||||
|  | 			for _, addrStr := range bootstrapPeers { | ||||||
| 				addr, err := multiaddr.NewMultiaddr(addrStr) | 				addr, err := multiaddr.NewMultiaddr(addrStr) | ||||||
| 				if err != nil { | 				if err != nil { | ||||||
| 					r.Logger.Warn("⚠️ Invalid bootstrap address %s: %v", addrStr, err) | 					r.Logger.Warn("⚠️ Invalid bootstrap address %s: %v", addrStr, err) | ||||||
|   | |||||||
							
								
								
									
										23
									
								
								p2p/node.go
									
									
									
									
									
								
							
							
						
						
									
										23
									
								
								p2p/node.go
									
									
									
									
									
								
							| @@ -6,16 +6,18 @@ import ( | |||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| 	"chorus/pkg/dht" | 	"chorus/pkg/dht" | ||||||
|  |  | ||||||
| 	"github.com/libp2p/go-libp2p" | 	"github.com/libp2p/go-libp2p" | ||||||
|  | 	kaddht "github.com/libp2p/go-libp2p-kad-dht" | ||||||
| 	"github.com/libp2p/go-libp2p/core/host" | 	"github.com/libp2p/go-libp2p/core/host" | ||||||
| 	"github.com/libp2p/go-libp2p/core/peer" | 	"github.com/libp2p/go-libp2p/core/peer" | ||||||
|  | 	"github.com/libp2p/go-libp2p/p2p/net/connmgr" | ||||||
| 	"github.com/libp2p/go-libp2p/p2p/security/noise" | 	"github.com/libp2p/go-libp2p/p2p/security/noise" | ||||||
| 	"github.com/libp2p/go-libp2p/p2p/transport/tcp" | 	"github.com/libp2p/go-libp2p/p2p/transport/tcp" | ||||||
| 	kaddht "github.com/libp2p/go-libp2p-kad-dht" |  | ||||||
| 	"github.com/multiformats/go-multiaddr" | 	"github.com/multiformats/go-multiaddr" | ||||||
| ) | ) | ||||||
|  |  | ||||||
| // Node represents a Bzzz P2P node | // Node represents a CHORUS P2P node | ||||||
| type Node struct { | type Node struct { | ||||||
| 	host   host.Host | 	host   host.Host | ||||||
| 	ctx    context.Context | 	ctx    context.Context | ||||||
| @@ -44,13 +46,26 @@ func NewNode(ctx context.Context, opts ...Option) (*Node, error) { | |||||||
| 		listenAddrs = append(listenAddrs, ma) | 		listenAddrs = append(listenAddrs, ma) | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Create libp2p host with security and transport options | 	// Create connection manager with scaling-optimized limits | ||||||
|  | 	connManager, err := connmgr.NewConnManager( | ||||||
|  | 		config.LowWatermark,                     // Low watermark (32) | ||||||
|  | 		config.HighWatermark,                    // High watermark (128) | ||||||
|  | 		connmgr.WithGracePeriod(30*time.Second), // Grace period before pruning | ||||||
|  | 	) | ||||||
|  | 	if err != nil { | ||||||
|  | 		cancel() | ||||||
|  | 		return nil, fmt.Errorf("failed to create connection manager: %w", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Create libp2p host with security, transport, and scaling options | ||||||
| 	h, err := libp2p.New( | 	h, err := libp2p.New( | ||||||
| 		libp2p.ListenAddrs(listenAddrs...), | 		libp2p.ListenAddrs(listenAddrs...), | ||||||
| 		libp2p.Security(noise.ID, noise.New), | 		libp2p.Security(noise.ID, noise.New), | ||||||
| 		libp2p.Transport(tcp.NewTCPTransport), | 		libp2p.Transport(tcp.NewTCPTransport), | ||||||
| 		libp2p.DefaultMuxers, | 		libp2p.DefaultMuxers, | ||||||
| 		libp2p.EnableRelay(), | 		libp2p.EnableRelay(), | ||||||
|  | 		libp2p.ConnectionManager(connManager), // Add connection management | ||||||
|  | 		libp2p.EnableAutoRelay(),              // Enable AutoRelay for container environments | ||||||
| 	) | 	) | ||||||
| 	if err != nil { | 	if err != nil { | ||||||
| 		cancel() | 		cancel() | ||||||
| @@ -157,7 +172,7 @@ func (n *Node) startBackgroundTasks() { | |||||||
| // logConnectionStatus logs the current connection status | // logConnectionStatus logs the current connection status | ||||||
| func (n *Node) logConnectionStatus() { | func (n *Node) logConnectionStatus() { | ||||||
| 	peers := n.Peers() | 	peers := n.Peers() | ||||||
| 	fmt.Printf("🐝 Bzzz Node Status - ID: %s, Connected Peers: %d\n",  | 	fmt.Printf("CHORUS Node Status - ID: %s, Connected Peers: %d\n", | ||||||
| 		n.ID().ShortString(), len(peers)) | 		n.ID().ShortString(), len(peers)) | ||||||
|  |  | ||||||
| 	if len(peers) > 0 { | 	if len(peers) > 0 { | ||||||
|   | |||||||
							
								
								
									
										517
									
								
								pkg/config/assignment.go
									
									
									
									
									
										Normal file
									
								
							
							
						
						
									
										517
									
								
								pkg/config/assignment.go
									
									
									
									
									
										Normal file
									
								
							| @@ -0,0 +1,517 @@ | |||||||
|  | package config | ||||||
|  |  | ||||||
|  | import ( | ||||||
|  | 	"context" | ||||||
|  | 	"encoding/json" | ||||||
|  | 	"fmt" | ||||||
|  | 	"io" | ||||||
|  | 	"net/http" | ||||||
|  | 	"os" | ||||||
|  | 	"os/signal" | ||||||
|  | 	"strings" | ||||||
|  | 	"sync" | ||||||
|  | 	"syscall" | ||||||
|  | 	"time" | ||||||
|  | ) | ||||||
|  |  | ||||||
// RuntimeConfig manages runtime configuration with assignment overrides.
// Values from Override (delivered by WHOOSH) take precedence over Base;
// all access goes through methods that hold mu.
type RuntimeConfig struct {
	Base     *Config           `json:"base"`     // static configuration loaded at startup
	Override *AssignmentConfig `json:"override"` // nil until an assignment is applied
	mu       sync.RWMutex                        // guards Base/Override reads and Override swaps
	reloadCh chan struct{}                       // buffered(1) manual-reload trigger, see Reload
}
|  |  | ||||||
// AssignmentConfig represents a runtime assignment received from WHOOSH.
// All fields are optional overrides; zero values mean "keep the base config".
type AssignmentConfig struct {
	// Assignment metadata identifying where this assignment came from.
	AssignmentID string    `json:"assignment_id"`
	TaskSlot     string    `json:"task_slot"`
	TaskID       string    `json:"task_id"`
	ClusterID    string    `json:"cluster_id"`
	AssignedAt   time.Time `json:"assigned_at"`
	ExpiresAt    time.Time `json:"expires_at,omitempty"`

	// Agent configuration overrides; nil sections leave the base untouched.
	Agent   *AgentConfig   `json:"agent,omitempty"`
	Network *NetworkConfig `json:"network,omitempty"`
	AI      *AIConfig      `json:"ai,omitempty"`
	Logging *LoggingConfig `json:"logging,omitempty"`

	// Bootstrap configuration for scaling: peer multiaddrs and a join
	// stagger (milliseconds) to spread out mesh joins.
	BootstrapPeers []string `json:"bootstrap_peers,omitempty"`
	JoinStagger    int      `json:"join_stagger_ms,omitempty"`

	// Runtime capabilities advertised for this assignment.
	RuntimeCapabilities []string `json:"runtime_capabilities,omitempty"`

	// Key derivation material for encryption.
	RoleKey       string `json:"role_key,omitempty"`
	ClusterSecret string `json:"cluster_secret,omitempty"`

	// Custom carries free-form fields looked up by Get via dotted keys.
	Custom map[string]interface{} `json:"custom,omitempty"`
}
|  |  | ||||||
// AssignmentRequest represents a request for assignment from WHOOSH.
// Its fields are sent as query parameters by fetchAssignment.
type AssignmentRequest struct {
	ClusterID string    `json:"cluster_id"`
	TaskSlot  string    `json:"task_slot,omitempty"` // Docker Swarm task slot, if any
	TaskID    string    `json:"task_id,omitempty"`
	AgentID   string    `json:"agent_id"`
	NodeID    string    `json:"node_id"`
	Timestamp time.Time `json:"timestamp"`
}
|  |  | ||||||
|  | // NewRuntimeConfig creates a new runtime configuration manager | ||||||
|  | func NewRuntimeConfig(baseConfig *Config) *RuntimeConfig { | ||||||
|  | 	return &RuntimeConfig{ | ||||||
|  | 		Base:     baseConfig, | ||||||
|  | 		Override: nil, | ||||||
|  | 		reloadCh: make(chan struct{}, 1), | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Get returns the effective configuration value, with override taking precedence | ||||||
|  | func (rc *RuntimeConfig) Get(field string) interface{} { | ||||||
|  | 	rc.mu.RLock() | ||||||
|  | 	defer rc.mu.RUnlock() | ||||||
|  |  | ||||||
|  | 	// Try override first | ||||||
|  | 	if rc.Override != nil { | ||||||
|  | 		if value := rc.getFromAssignment(field); value != nil { | ||||||
|  | 			return value | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Fall back to base configuration | ||||||
|  | 	return rc.getFromBase(field) | ||||||
|  | } | ||||||
|  |  | ||||||
// GetConfig returns a merged configuration with overrides applied.
//
// NOTE(review): the merge starts from a shallow copy of Base, so nested
// slices/maps in the result may still alias the base configuration —
// callers should treat the returned *Config as read-only.
func (rc *RuntimeConfig) GetConfig() *Config {
	rc.mu.RLock()
	defer rc.mu.RUnlock()

	// Fast path: no assignment applied, hand back the base as-is.
	if rc.Override == nil {
		return rc.Base
	}

	// Create a copy of base config (shallow — see note above).
	merged := *rc.Base

	// Apply overrides section by section; nil sections keep base values.
	if rc.Override.Agent != nil {
		rc.mergeAgentConfig(&merged.Agent, rc.Override.Agent)
	}
	if rc.Override.Network != nil {
		rc.mergeNetworkConfig(&merged.Network, rc.Override.Network)
	}
	if rc.Override.AI != nil {
		rc.mergeAIConfig(&merged.AI, rc.Override.AI)
	}
	if rc.Override.Logging != nil {
		rc.mergeLoggingConfig(&merged.Logging, rc.Override.Logging)
	}

	return &merged
}
|  |  | ||||||
// LoadAssignment fetches an assignment from WHOOSH and applies it as the
// runtime override. A blank assignURL is a no-op (assignment disabled).
//
// NOTE(review): on HTTP 404 fetchAssignment returns (nil, nil), so a 404
// CLEARS any previously applied override — confirm this "no assignment
// means revert to base" semantics is intended.
func (rc *RuntimeConfig) LoadAssignment(ctx context.Context, assignURL string) error {
	if assignURL == "" {
		return nil // No assignment URL configured
	}

	// Build assignment request, identifying this agent/task to WHOOSH.
	agentID := rc.Base.Agent.ID
	if agentID == "" {
		agentID = "unknown"
	}

	req := AssignmentRequest{
		ClusterID: rc.Base.License.ClusterID,
		TaskSlot:  os.Getenv("TASK_SLOT"), // set by Docker Swarm, may be empty
		TaskID:    os.Getenv("TASK_ID"),
		AgentID:   agentID,
		NodeID:    os.Getenv("NODE_ID"),
		Timestamp: time.Now(),
	}

	// Make HTTP request to WHOOSH.
	assignment, err := rc.fetchAssignment(ctx, assignURL, req)
	if err != nil {
		return fmt.Errorf("failed to fetch assignment: %w", err)
	}

	// Swap in the new assignment under the write lock.
	rc.mu.Lock()
	rc.Override = assignment
	rc.mu.Unlock()

	return nil
}
|  |  | ||||||
|  | // StartReloadHandler starts a signal handler for SIGHUP configuration reloads | ||||||
|  | func (rc *RuntimeConfig) StartReloadHandler(ctx context.Context, assignURL string) { | ||||||
|  | 	sigCh := make(chan os.Signal, 1) | ||||||
|  | 	signal.Notify(sigCh, syscall.SIGHUP) | ||||||
|  |  | ||||||
|  | 	go func() { | ||||||
|  | 		for { | ||||||
|  | 			select { | ||||||
|  | 			case <-ctx.Done(): | ||||||
|  | 				return | ||||||
|  | 			case <-sigCh: | ||||||
|  | 				fmt.Println("📡 Received SIGHUP, reloading assignment configuration...") | ||||||
|  | 				if err := rc.LoadAssignment(ctx, assignURL); err != nil { | ||||||
|  | 					fmt.Printf("❌ Failed to reload assignment: %v\n", err) | ||||||
|  | 				} else { | ||||||
|  | 					fmt.Println("✅ Assignment configuration reloaded successfully") | ||||||
|  | 				} | ||||||
|  | 			case <-rc.reloadCh: | ||||||
|  | 				// Manual reload trigger | ||||||
|  | 				if err := rc.LoadAssignment(ctx, assignURL); err != nil { | ||||||
|  | 					fmt.Printf("❌ Failed to reload assignment: %v\n", err) | ||||||
|  | 				} else { | ||||||
|  | 					fmt.Println("✅ Assignment configuration reloaded successfully") | ||||||
|  | 				} | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	}() | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // Reload triggers a manual configuration reload | ||||||
|  | func (rc *RuntimeConfig) Reload() { | ||||||
|  | 	select { | ||||||
|  | 	case rc.reloadCh <- struct{}{}: | ||||||
|  | 	default: | ||||||
|  | 		// Channel full, reload already pending | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // fetchAssignment makes HTTP request to WHOOSH assignment API | ||||||
|  | func (rc *RuntimeConfig) fetchAssignment(ctx context.Context, assignURL string, req AssignmentRequest) (*AssignmentConfig, error) { | ||||||
|  | 	// Build query parameters | ||||||
|  | 	queryParams := fmt.Sprintf("?cluster_id=%s&agent_id=%s&node_id=%s", | ||||||
|  | 		req.ClusterID, req.AgentID, req.NodeID) | ||||||
|  |  | ||||||
|  | 	if req.TaskSlot != "" { | ||||||
|  | 		queryParams += "&task_slot=" + req.TaskSlot | ||||||
|  | 	} | ||||||
|  | 	if req.TaskID != "" { | ||||||
|  | 		queryParams += "&task_id=" + req.TaskID | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Create HTTP request | ||||||
|  | 	httpReq, err := http.NewRequestWithContext(ctx, "GET", assignURL+queryParams, nil) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, fmt.Errorf("failed to create assignment request: %w", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	httpReq.Header.Set("Accept", "application/json") | ||||||
|  | 	httpReq.Header.Set("User-Agent", "CHORUS-Agent/0.1.0") | ||||||
|  |  | ||||||
|  | 	// Make request with timeout | ||||||
|  | 	client := &http.Client{Timeout: 10 * time.Second} | ||||||
|  | 	resp, err := client.Do(httpReq) | ||||||
|  | 	if err != nil { | ||||||
|  | 		return nil, fmt.Errorf("assignment request failed: %w", err) | ||||||
|  | 	} | ||||||
|  | 	defer resp.Body.Close() | ||||||
|  |  | ||||||
|  | 	if resp.StatusCode == http.StatusNotFound { | ||||||
|  | 		// No assignment available | ||||||
|  | 		return nil, nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if resp.StatusCode != http.StatusOK { | ||||||
|  | 		body, _ := io.ReadAll(resp.Body) | ||||||
|  | 		return nil, fmt.Errorf("assignment request failed with status %d: %s", resp.StatusCode, string(body)) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Parse assignment response | ||||||
|  | 	var assignment AssignmentConfig | ||||||
|  | 	if err := json.NewDecoder(resp.Body).Decode(&assignment); err != nil { | ||||||
|  | 		return nil, fmt.Errorf("failed to decode assignment response: %w", err) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return &assignment, nil | ||||||
|  | } | ||||||
|  |  | ||||||
// getFromAssignment maps a dotted field name onto the assignment override.
// Returns nil when the override has no value for the field (caller then
// falls back to the base config). Callers must hold rc.mu.
func (rc *RuntimeConfig) getFromAssignment(field string) interface{} {
	if rc.Override == nil {
		return nil
	}

	// Simple field mapping - in a real implementation, you'd use reflection
	// or a more sophisticated field mapping system
	switch field {
	case "agent.id":
		if rc.Override.Agent != nil && rc.Override.Agent.ID != "" {
			return rc.Override.Agent.ID
		}
	case "agent.role":
		if rc.Override.Agent != nil && rc.Override.Agent.Role != "" {
			return rc.Override.Agent.Role
		}
	case "agent.capabilities":
		// Capabilities come from RuntimeCapabilities, not the Agent section.
		if len(rc.Override.RuntimeCapabilities) > 0 {
			return rc.Override.RuntimeCapabilities
		}
	case "bootstrap_peers":
		if len(rc.Override.BootstrapPeers) > 0 {
			return rc.Override.BootstrapPeers
		}
	case "join_stagger":
		if rc.Override.JoinStagger > 0 {
			return rc.Override.JoinStagger
		}
	}

	// Unknown keys fall through to the free-form Custom map.
	if rc.Override.Custom != nil {
		if val, exists := rc.Override.Custom[field]; exists {
			return val
		}
	}

	return nil
}
|  |  | ||||||
|  | func (rc *RuntimeConfig) getFromBase(field string) interface{} { | ||||||
|  | 	// Simple field mapping for base config | ||||||
|  | 	switch field { | ||||||
|  | 	case "agent.id": | ||||||
|  | 		return rc.Base.Agent.ID | ||||||
|  | 	case "agent.role": | ||||||
|  | 		return rc.Base.Agent.Role | ||||||
|  | 	case "agent.capabilities": | ||||||
|  | 		return rc.Base.Agent.Capabilities | ||||||
|  | 	default: | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
// mergeAgentConfig copies every non-zero field of override onto base.
// Zero-valued override fields ("" / 0 / empty slice) leave base unchanged;
// slice fields replace (not append to) the base slices.
func (rc *RuntimeConfig) mergeAgentConfig(base *AgentConfig, override *AgentConfig) {
	if override.ID != "" {
		base.ID = override.ID
	}
	if override.Specialization != "" {
		base.Specialization = override.Specialization
	}
	if override.MaxTasks > 0 {
		base.MaxTasks = override.MaxTasks
	}
	if len(override.Capabilities) > 0 {
		base.Capabilities = override.Capabilities
	}
	if len(override.Models) > 0 {
		base.Models = override.Models
	}
	if override.Role != "" {
		base.Role = override.Role
	}
	if override.Project != "" {
		base.Project = override.Project
	}
	if len(override.Expertise) > 0 {
		base.Expertise = override.Expertise
	}
	if override.ReportsTo != "" {
		base.ReportsTo = override.ReportsTo
	}
	if len(override.Deliverables) > 0 {
		base.Deliverables = override.Deliverables
	}
	if override.ModelSelectionWebhook != "" {
		base.ModelSelectionWebhook = override.ModelSelectionWebhook
	}
	if override.DefaultReasoningModel != "" {
		base.DefaultReasoningModel = override.DefaultReasoningModel
	}
}
|  |  | ||||||
|  | func (rc *RuntimeConfig) mergeNetworkConfig(base *NetworkConfig, override *NetworkConfig) { | ||||||
|  | 	if override.P2PPort > 0 { | ||||||
|  | 		base.P2PPort = override.P2PPort | ||||||
|  | 	} | ||||||
|  | 	if override.APIPort > 0 { | ||||||
|  | 		base.APIPort = override.APIPort | ||||||
|  | 	} | ||||||
|  | 	if override.HealthPort > 0 { | ||||||
|  | 		base.HealthPort = override.HealthPort | ||||||
|  | 	} | ||||||
|  | 	if override.BindAddr != "" { | ||||||
|  | 		base.BindAddr = override.BindAddr | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (rc *RuntimeConfig) mergeAIConfig(base *AIConfig, override *AIConfig) { | ||||||
|  | 	if override.Provider != "" { | ||||||
|  | 		base.Provider = override.Provider | ||||||
|  | 	} | ||||||
|  | 	// Merge Ollama config if present | ||||||
|  | 	if override.Ollama.Endpoint != "" { | ||||||
|  | 		base.Ollama.Endpoint = override.Ollama.Endpoint | ||||||
|  | 	} | ||||||
|  | 	if override.Ollama.Timeout > 0 { | ||||||
|  | 		base.Ollama.Timeout = override.Ollama.Timeout | ||||||
|  | 	} | ||||||
|  | 	// Merge ResetData config if present | ||||||
|  | 	if override.ResetData.BaseURL != "" { | ||||||
|  | 		base.ResetData.BaseURL = override.ResetData.BaseURL | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | func (rc *RuntimeConfig) mergeLoggingConfig(base *LoggingConfig, override *LoggingConfig) { | ||||||
|  | 	if override.Level != "" { | ||||||
|  | 		base.Level = override.Level | ||||||
|  | 	} | ||||||
|  | 	if override.Format != "" { | ||||||
|  | 		base.Format = override.Format | ||||||
|  | 	} | ||||||
|  | } | ||||||
|  |  | ||||||
// BootstrapConfig represents the JSON bootstrap configuration file
// referenced by the BOOTSTRAP_JSON environment variable.
type BootstrapConfig struct {
	Peers    []BootstrapPeer `json:"peers"`
	Metadata BootstrapMeta   `json:"metadata,omitempty"`
}

// BootstrapPeer represents a single bootstrap peer entry.
// Only entries with Enabled=true and a non-empty Address are used;
// higher Priority peers are tried first.
type BootstrapPeer struct {
	Address  string   `json:"address"` // peer multiaddr / dial address
	Priority int      `json:"priority,omitempty"`
	Region   string   `json:"region,omitempty"`
	Roles    []string `json:"roles,omitempty"`
	Enabled  bool     `json:"enabled"`
}

// BootstrapMeta contains informational metadata about the bootstrap
// configuration; it does not affect peer selection.
type BootstrapMeta struct {
	GeneratedAt time.Time `json:"generated_at,omitempty"`
	ClusterID   string    `json:"cluster_id,omitempty"`
	Version     string    `json:"version,omitempty"`
	Notes       string    `json:"notes,omitempty"`
}
|  |  | ||||||
|  | // GetBootstrapPeers returns bootstrap peers with assignment override support and JSON config | ||||||
|  | func (rc *RuntimeConfig) GetBootstrapPeers() []string { | ||||||
|  | 	rc.mu.RLock() | ||||||
|  | 	defer rc.mu.RUnlock() | ||||||
|  |  | ||||||
|  | 	// First priority: Assignment override from WHOOSH | ||||||
|  | 	if rc.Override != nil && len(rc.Override.BootstrapPeers) > 0 { | ||||||
|  | 		return rc.Override.BootstrapPeers | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Second priority: JSON bootstrap configuration | ||||||
|  | 	if jsonPeers := rc.loadBootstrapJSON(); len(jsonPeers) > 0 { | ||||||
|  | 		return jsonPeers | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Third priority: Environment variable (CSV format) | ||||||
|  | 	if bootstrapEnv := os.Getenv("CHORUS_BOOTSTRAP_PEERS"); bootstrapEnv != "" { | ||||||
|  | 		peers := strings.Split(bootstrapEnv, ",") | ||||||
|  | 		// Trim whitespace from each peer | ||||||
|  | 		for i, peer := range peers { | ||||||
|  | 			peers[i] = strings.TrimSpace(peer) | ||||||
|  | 		} | ||||||
|  | 		return peers | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return []string{} | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // loadBootstrapJSON loads bootstrap peers from JSON file | ||||||
|  | func (rc *RuntimeConfig) loadBootstrapJSON() []string { | ||||||
|  | 	jsonPath := os.Getenv("BOOTSTRAP_JSON") | ||||||
|  | 	if jsonPath == "" { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Check if file exists | ||||||
|  | 	if _, err := os.Stat(jsonPath); os.IsNotExist(err) { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Read and parse JSON file | ||||||
|  | 	data, err := os.ReadFile(jsonPath) | ||||||
|  | 	if err != nil { | ||||||
|  | 		fmt.Printf("⚠️ Failed to read bootstrap JSON file %s: %v\n", jsonPath, err) | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	var config BootstrapConfig | ||||||
|  | 	if err := json.Unmarshal(data, &config); err != nil { | ||||||
|  | 		fmt.Printf("⚠️ Failed to parse bootstrap JSON file %s: %v\n", jsonPath, err) | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Extract enabled peer addresses, sorted by priority | ||||||
|  | 	var peers []string | ||||||
|  | 	enabledPeers := make([]BootstrapPeer, 0, len(config.Peers)) | ||||||
|  |  | ||||||
|  | 	// Filter enabled peers | ||||||
|  | 	for _, peer := range config.Peers { | ||||||
|  | 		if peer.Enabled && peer.Address != "" { | ||||||
|  | 			enabledPeers = append(enabledPeers, peer) | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Sort by priority (higher priority first) | ||||||
|  | 	for i := 0; i < len(enabledPeers)-1; i++ { | ||||||
|  | 		for j := i + 1; j < len(enabledPeers); j++ { | ||||||
|  | 			if enabledPeers[j].Priority > enabledPeers[i].Priority { | ||||||
|  | 				enabledPeers[i], enabledPeers[j] = enabledPeers[j], enabledPeers[i] | ||||||
|  | 			} | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Extract addresses | ||||||
|  | 	for _, peer := range enabledPeers { | ||||||
|  | 		peers = append(peers, peer.Address) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	if len(peers) > 0 { | ||||||
|  | 		fmt.Printf("📋 Loaded %d bootstrap peers from JSON: %s\n", len(peers), jsonPath) | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return peers | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // GetJoinStagger returns join stagger delay with assignment override support | ||||||
|  | func (rc *RuntimeConfig) GetJoinStagger() time.Duration { | ||||||
|  | 	rc.mu.RLock() | ||||||
|  | 	defer rc.mu.RUnlock() | ||||||
|  |  | ||||||
|  | 	if rc.Override != nil && rc.Override.JoinStagger > 0 { | ||||||
|  | 		return time.Duration(rc.Override.JoinStagger) * time.Millisecond | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Fall back to environment variable | ||||||
|  | 	if staggerEnv := os.Getenv("CHORUS_JOIN_STAGGER_MS"); staggerEnv != "" { | ||||||
|  | 		if ms, err := time.ParseDuration(staggerEnv + "ms"); err == nil { | ||||||
|  | 			return ms | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	return 0 | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // GetAssignmentInfo returns current assignment metadata | ||||||
|  | func (rc *RuntimeConfig) GetAssignmentInfo() *AssignmentConfig { | ||||||
|  | 	rc.mu.RLock() | ||||||
|  | 	defer rc.mu.RUnlock() | ||||||
|  |  | ||||||
|  | 	if rc.Override == nil { | ||||||
|  | 		return nil | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Return a copy to prevent external modification | ||||||
|  | 	assignment := *rc.Override | ||||||
|  | 	return &assignment | ||||||
|  | } | ||||||
| @@ -100,6 +100,7 @@ type V2Config struct { | |||||||
| type DHTConfig struct { | type DHTConfig struct { | ||||||
| 	Enabled        bool     `yaml:"enabled"` | 	Enabled        bool     `yaml:"enabled"` | ||||||
| 	BootstrapPeers []string `yaml:"bootstrap_peers"` | 	BootstrapPeers []string `yaml:"bootstrap_peers"` | ||||||
|  | 	MDNSEnabled    bool     `yaml:"mdns_enabled"` | ||||||
| } | } | ||||||
|  |  | ||||||
| // UCXLConfig defines UCXL protocol settings | // UCXLConfig defines UCXL protocol settings | ||||||
| @@ -192,6 +193,7 @@ func LoadFromEnvironment() (*Config, error) { | |||||||
| 			DHT: DHTConfig{ | 			DHT: DHTConfig{ | ||||||
| 				Enabled:        getEnvBoolOrDefault("CHORUS_DHT_ENABLED", true), | 				Enabled:        getEnvBoolOrDefault("CHORUS_DHT_ENABLED", true), | ||||||
| 				BootstrapPeers: getEnvArrayOrDefault("CHORUS_BOOTSTRAP_PEERS", []string{}), | 				BootstrapPeers: getEnvArrayOrDefault("CHORUS_BOOTSTRAP_PEERS", []string{}), | ||||||
|  | 				MDNSEnabled:    getEnvBoolOrDefault("CHORUS_MDNS_ENABLED", true), | ||||||
| 			}, | 			}, | ||||||
| 		}, | 		}, | ||||||
| 		UCXL: UCXLConfig{ | 		UCXL: UCXLConfig{ | ||||||
|   | |||||||
| @@ -41,10 +41,16 @@ type HybridUCXLConfig struct { | |||||||
| } | } | ||||||
|  |  | ||||||
| type DiscoveryConfig struct { | type DiscoveryConfig struct { | ||||||
| 	MDNSEnabled       bool          `env:"CHORUS_MDNS_ENABLED" default:"true" json:"mdns_enabled" yaml:"mdns_enabled"` | 	MDNSEnabled        bool          `env:"CHORUS_MDNS_ENABLED" default:"true" json:"mdns_enabled" yaml:"mdns_enabled"` | ||||||
| 	DHTDiscovery      bool          `env:"CHORUS_DHT_DISCOVERY" default:"false" json:"dht_discovery" yaml:"dht_discovery"` | 	DHTDiscovery       bool          `env:"CHORUS_DHT_DISCOVERY" default:"false" json:"dht_discovery" yaml:"dht_discovery"` | ||||||
| 	AnnounceInterval  time.Duration `env:"CHORUS_ANNOUNCE_INTERVAL" default:"30s" json:"announce_interval" yaml:"announce_interval"` | 	AnnounceInterval   time.Duration `env:"CHORUS_ANNOUNCE_INTERVAL" default:"30s" json:"announce_interval" yaml:"announce_interval"` | ||||||
| 	ServiceName       string        `env:"CHORUS_SERVICE_NAME" default:"CHORUS" json:"service_name" yaml:"service_name"` | 	ServiceName        string        `env:"CHORUS_SERVICE_NAME" default:"CHORUS" json:"service_name" yaml:"service_name"` | ||||||
|  |  | ||||||
|  | 	// Rate limiting for scaling (as per WHOOSH issue #7) | ||||||
|  | 	DialsPerSecond     int           `env:"CHORUS_DIALS_PER_SEC" default:"5" json:"dials_per_second" yaml:"dials_per_second"` | ||||||
|  | 	MaxConcurrentDHT   int           `env:"CHORUS_MAX_CONCURRENT_DHT" default:"16" json:"max_concurrent_dht" yaml:"max_concurrent_dht"` | ||||||
|  | 	MaxConcurrentDials int           `env:"CHORUS_MAX_CONCURRENT_DIALS" default:"10" json:"max_concurrent_dials" yaml:"max_concurrent_dials"` | ||||||
|  | 	JoinStaggerMS      int           `env:"CHORUS_JOIN_STAGGER_MS" default:"0" json:"join_stagger_ms" yaml:"join_stagger_ms"` | ||||||
| } | } | ||||||
|  |  | ||||||
| type MonitoringConfig struct { | type MonitoringConfig struct { | ||||||
| @@ -79,10 +85,16 @@ func LoadHybridConfig() (*HybridConfig, error) { | |||||||
| 	 | 	 | ||||||
| 	// Load Discovery configuration | 	// Load Discovery configuration | ||||||
| 	config.Discovery = DiscoveryConfig{ | 	config.Discovery = DiscoveryConfig{ | ||||||
| 		MDNSEnabled:      getEnvBool("CHORUS_MDNS_ENABLED", true), | 		MDNSEnabled:        getEnvBool("CHORUS_MDNS_ENABLED", true), | ||||||
| 		DHTDiscovery:     getEnvBool("CHORUS_DHT_DISCOVERY", false), | 		DHTDiscovery:       getEnvBool("CHORUS_DHT_DISCOVERY", false), | ||||||
| 		AnnounceInterval: getEnvDuration("CHORUS_ANNOUNCE_INTERVAL", 30*time.Second), | 		AnnounceInterval:   getEnvDuration("CHORUS_ANNOUNCE_INTERVAL", 30*time.Second), | ||||||
| 		ServiceName:      getEnvString("CHORUS_SERVICE_NAME", "CHORUS"), | 		ServiceName:        getEnvString("CHORUS_SERVICE_NAME", "CHORUS"), | ||||||
|  |  | ||||||
|  | 		// Rate limiting for scaling (as per WHOOSH issue #7) | ||||||
|  | 		DialsPerSecond:     getEnvInt("CHORUS_DIALS_PER_SEC", 5), | ||||||
|  | 		MaxConcurrentDHT:   getEnvInt("CHORUS_MAX_CONCURRENT_DHT", 16), | ||||||
|  | 		MaxConcurrentDials: getEnvInt("CHORUS_MAX_CONCURRENT_DIALS", 10), | ||||||
|  | 		JoinStaggerMS:      getEnvInt("CHORUS_JOIN_STAGGER_MS", 0), | ||||||
| 	} | 	} | ||||||
| 	 | 	 | ||||||
| 	// Load Monitoring configuration | 	// Load Monitoring configuration | ||||||
|   | |||||||
| @@ -1,354 +0,0 @@ | |||||||
| package config |  | ||||||
|  |  | ||||||
| import ( |  | ||||||
| 	"context" |  | ||||||
| 	"encoding/json" |  | ||||||
| 	"fmt" |  | ||||||
| 	"io/ioutil" |  | ||||||
| 	"net/http" |  | ||||||
| 	"net/url" |  | ||||||
| 	"os" |  | ||||||
| 	"os/signal" |  | ||||||
| 	"sync" |  | ||||||
| 	"syscall" |  | ||||||
| 	"time" |  | ||||||
| ) |  | ||||||
|  |  | ||||||
| // RuntimeConfig provides dynamic configuration with assignment override support |  | ||||||
| type RuntimeConfig struct { |  | ||||||
| 	mu   sync.RWMutex |  | ||||||
| 	base *Config // Base configuration from environment |  | ||||||
| 	over *Config // Override configuration from assignment |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // AssignmentConfig represents configuration received from WHOOSH assignment |  | ||||||
| type AssignmentConfig struct { |  | ||||||
| 	Role                  string            `json:"role,omitempty"` |  | ||||||
| 	Model                 string            `json:"model,omitempty"` |  | ||||||
| 	PromptUCXL           string            `json:"prompt_ucxl,omitempty"` |  | ||||||
| 	Specialization       string            `json:"specialization,omitempty"` |  | ||||||
| 	Capabilities         []string          `json:"capabilities,omitempty"` |  | ||||||
| 	Environment          map[string]string `json:"environment,omitempty"` |  | ||||||
| 	BootstrapPeers       []string          `json:"bootstrap_peers,omitempty"` |  | ||||||
| 	JoinStaggerMS        int               `json:"join_stagger_ms,omitempty"` |  | ||||||
| 	DialsPerSecond       int               `json:"dials_per_second,omitempty"` |  | ||||||
| 	MaxConcurrentDHT     int               `json:"max_concurrent_dht,omitempty"` |  | ||||||
| 	AssignmentID         string            `json:"assignment_id,omitempty"` |  | ||||||
| 	ConfigEpoch          int64             `json:"config_epoch,omitempty"` |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // NewRuntimeConfig creates a new runtime configuration manager |  | ||||||
| func NewRuntimeConfig(baseConfig *Config) *RuntimeConfig { |  | ||||||
| 	return &RuntimeConfig{ |  | ||||||
| 		base: baseConfig, |  | ||||||
| 		over: &Config{}, // Empty override initially |  | ||||||
| 	} |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // Get retrieves a configuration value with override precedence |  | ||||||
| func (rc *RuntimeConfig) Get(key string) interface{} { |  | ||||||
| 	rc.mu.RLock() |  | ||||||
| 	defer rc.mu.RUnlock() |  | ||||||
|  |  | ||||||
| 	// Check override first, then base |  | ||||||
| 	if value := rc.getFromConfig(rc.over, key); value != nil { |  | ||||||
| 		return value |  | ||||||
| 	} |  | ||||||
| 	return rc.getFromConfig(rc.base, key) |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // getFromConfig extracts a value from a config struct by key |  | ||||||
| func (rc *RuntimeConfig) getFromConfig(cfg *Config, key string) interface{} { |  | ||||||
| 	if cfg == nil { |  | ||||||
| 		return nil |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	switch key { |  | ||||||
| 	case "agent.role": |  | ||||||
| 		if cfg.Agent.Role != "" { |  | ||||||
| 			return cfg.Agent.Role |  | ||||||
| 		} |  | ||||||
| 	case "agent.specialization": |  | ||||||
| 		if cfg.Agent.Specialization != "" { |  | ||||||
| 			return cfg.Agent.Specialization |  | ||||||
| 		} |  | ||||||
| 	case "agent.capabilities": |  | ||||||
| 		if len(cfg.Agent.Capabilities) > 0 { |  | ||||||
| 			return cfg.Agent.Capabilities |  | ||||||
| 		} |  | ||||||
| 	case "agent.models": |  | ||||||
| 		if len(cfg.Agent.Models) > 0 { |  | ||||||
| 			return cfg.Agent.Models |  | ||||||
| 		} |  | ||||||
| 	case "agent.default_reasoning_model": |  | ||||||
| 		if cfg.Agent.DefaultReasoningModel != "" { |  | ||||||
| 			return cfg.Agent.DefaultReasoningModel |  | ||||||
| 		} |  | ||||||
| 	case "v2.dht.bootstrap_peers": |  | ||||||
| 		if len(cfg.V2.DHT.BootstrapPeers) > 0 { |  | ||||||
| 			return cfg.V2.DHT.BootstrapPeers |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // GetString retrieves a string configuration value |  | ||||||
| func (rc *RuntimeConfig) GetString(key string) string { |  | ||||||
| 	if value := rc.Get(key); value != nil { |  | ||||||
| 		if str, ok := value.(string); ok { |  | ||||||
| 			return str |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	return "" |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // GetStringSlice retrieves a string slice configuration value |  | ||||||
| func (rc *RuntimeConfig) GetStringSlice(key string) []string { |  | ||||||
| 	if value := rc.Get(key); value != nil { |  | ||||||
| 		if slice, ok := value.([]string); ok { |  | ||||||
| 			return slice |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // GetInt retrieves an integer configuration value |  | ||||||
| func (rc *RuntimeConfig) GetInt(key string) int { |  | ||||||
| 	if value := rc.Get(key); value != nil { |  | ||||||
| 		if i, ok := value.(int); ok { |  | ||||||
| 			return i |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
| 	return 0 |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // LoadAssignment loads configuration from WHOOSH assignment endpoint |  | ||||||
| func (rc *RuntimeConfig) LoadAssignment(ctx context.Context) error { |  | ||||||
| 	assignURL := os.Getenv("ASSIGN_URL") |  | ||||||
| 	if assignURL == "" { |  | ||||||
| 		return nil // No assignment URL configured |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Build assignment request URL with task identity |  | ||||||
| 	params := url.Values{} |  | ||||||
| 	if taskSlot := os.Getenv("TASK_SLOT"); taskSlot != "" { |  | ||||||
| 		params.Set("slot", taskSlot) |  | ||||||
| 	} |  | ||||||
| 	if taskID := os.Getenv("TASK_ID"); taskID != "" { |  | ||||||
| 		params.Set("task", taskID) |  | ||||||
| 	} |  | ||||||
| 	if clusterID := os.Getenv("CHORUS_CLUSTER_ID"); clusterID != "" { |  | ||||||
| 		params.Set("cluster", clusterID) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	fullURL := assignURL |  | ||||||
| 	if len(params) > 0 { |  | ||||||
| 		fullURL += "?" + params.Encode() |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Fetch assignment with timeout |  | ||||||
| 	ctx, cancel := context.WithTimeout(ctx, 10*time.Second) |  | ||||||
| 	defer cancel() |  | ||||||
|  |  | ||||||
| 	req, err := http.NewRequestWithContext(ctx, "GET", fullURL, nil) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return fmt.Errorf("failed to create assignment request: %w", err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	client := &http.Client{Timeout: 10 * time.Second} |  | ||||||
| 	resp, err := client.Do(req) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return fmt.Errorf("assignment request failed: %w", err) |  | ||||||
| 	} |  | ||||||
| 	defer resp.Body.Close() |  | ||||||
|  |  | ||||||
| 	if resp.StatusCode != http.StatusOK { |  | ||||||
| 		return fmt.Errorf("assignment request failed with status %d", resp.StatusCode) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Parse assignment response |  | ||||||
| 	var assignment AssignmentConfig |  | ||||||
| 	if err := json.NewDecoder(resp.Body).Decode(&assignment); err != nil { |  | ||||||
| 		return fmt.Errorf("failed to decode assignment response: %w", err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Apply assignment to override config |  | ||||||
| 	if err := rc.applyAssignment(&assignment); err != nil { |  | ||||||
| 		return fmt.Errorf("failed to apply assignment: %w", err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	fmt.Printf("📥 Loaded assignment: role=%s, model=%s, epoch=%d\n", |  | ||||||
| 		assignment.Role, assignment.Model, assignment.ConfigEpoch) |  | ||||||
|  |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // LoadAssignmentFromFile loads configuration from a file (for config objects) |  | ||||||
| func (rc *RuntimeConfig) LoadAssignmentFromFile(filePath string) error { |  | ||||||
| 	if filePath == "" { |  | ||||||
| 		return nil // No file configured |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	data, err := ioutil.ReadFile(filePath) |  | ||||||
| 	if err != nil { |  | ||||||
| 		return fmt.Errorf("failed to read assignment file %s: %w", filePath, err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	var assignment AssignmentConfig |  | ||||||
| 	if err := json.Unmarshal(data, &assignment); err != nil { |  | ||||||
| 		return fmt.Errorf("failed to parse assignment file: %w", err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if err := rc.applyAssignment(&assignment); err != nil { |  | ||||||
| 		return fmt.Errorf("failed to apply file assignment: %w", err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	fmt.Printf("📁 Loaded assignment from file: role=%s, model=%s\n", |  | ||||||
| 		assignment.Role, assignment.Model) |  | ||||||
|  |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // applyAssignment applies an assignment to the override configuration |  | ||||||
| func (rc *RuntimeConfig) applyAssignment(assignment *AssignmentConfig) error { |  | ||||||
| 	rc.mu.Lock() |  | ||||||
| 	defer rc.mu.Unlock() |  | ||||||
|  |  | ||||||
| 	// Create new override config |  | ||||||
| 	override := &Config{ |  | ||||||
| 		Agent: AgentConfig{ |  | ||||||
| 			Role:                  assignment.Role, |  | ||||||
| 			Specialization:        assignment.Specialization, |  | ||||||
| 			Capabilities:          assignment.Capabilities, |  | ||||||
| 			DefaultReasoningModel: assignment.Model, |  | ||||||
| 		}, |  | ||||||
| 		V2: V2Config{ |  | ||||||
| 			DHT: DHTConfig{ |  | ||||||
| 				BootstrapPeers: assignment.BootstrapPeers, |  | ||||||
| 			}, |  | ||||||
| 		}, |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Handle models array |  | ||||||
| 	if assignment.Model != "" { |  | ||||||
| 		override.Agent.Models = []string{assignment.Model} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Apply environment variables from assignment |  | ||||||
| 	for key, value := range assignment.Environment { |  | ||||||
| 		os.Setenv(key, value) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	rc.over = override |  | ||||||
|  |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // StartReloadHandler starts a signal handler for configuration reload (SIGHUP) |  | ||||||
| func (rc *RuntimeConfig) StartReloadHandler(ctx context.Context) { |  | ||||||
| 	sigChan := make(chan os.Signal, 1) |  | ||||||
| 	signal.Notify(sigChan, syscall.SIGHUP) |  | ||||||
|  |  | ||||||
| 	go func() { |  | ||||||
| 		for { |  | ||||||
| 			select { |  | ||||||
| 			case <-ctx.Done(): |  | ||||||
| 				return |  | ||||||
| 			case <-sigChan: |  | ||||||
| 				fmt.Println("🔄 Received SIGHUP, reloading configuration...") |  | ||||||
| 				if err := rc.LoadAssignment(ctx); err != nil { |  | ||||||
| 					fmt.Printf("⚠️ Failed to reload assignment: %v\n", err) |  | ||||||
| 				} else { |  | ||||||
| 					fmt.Println("✅ Configuration reloaded successfully") |  | ||||||
| 				} |  | ||||||
| 			} |  | ||||||
| 		} |  | ||||||
| 	}() |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // GetBaseConfig returns the base configuration (from environment) |  | ||||||
| func (rc *RuntimeConfig) GetBaseConfig() *Config { |  | ||||||
| 	rc.mu.RLock() |  | ||||||
| 	defer rc.mu.RUnlock() |  | ||||||
| 	return rc.base |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // GetEffectiveConfig returns the effective merged configuration |  | ||||||
| func (rc *RuntimeConfig) GetEffectiveConfig() *Config { |  | ||||||
| 	rc.mu.RLock() |  | ||||||
| 	defer rc.mu.RUnlock() |  | ||||||
|  |  | ||||||
| 	// Start with base config |  | ||||||
| 	effective := *rc.base |  | ||||||
|  |  | ||||||
| 	// Apply overrides |  | ||||||
| 	if rc.over.Agent.Role != "" { |  | ||||||
| 		effective.Agent.Role = rc.over.Agent.Role |  | ||||||
| 	} |  | ||||||
| 	if rc.over.Agent.Specialization != "" { |  | ||||||
| 		effective.Agent.Specialization = rc.over.Agent.Specialization |  | ||||||
| 	} |  | ||||||
| 	if len(rc.over.Agent.Capabilities) > 0 { |  | ||||||
| 		effective.Agent.Capabilities = rc.over.Agent.Capabilities |  | ||||||
| 	} |  | ||||||
| 	if len(rc.over.Agent.Models) > 0 { |  | ||||||
| 		effective.Agent.Models = rc.over.Agent.Models |  | ||||||
| 	} |  | ||||||
| 	if rc.over.Agent.DefaultReasoningModel != "" { |  | ||||||
| 		effective.Agent.DefaultReasoningModel = rc.over.Agent.DefaultReasoningModel |  | ||||||
| 	} |  | ||||||
| 	if len(rc.over.V2.DHT.BootstrapPeers) > 0 { |  | ||||||
| 		effective.V2.DHT.BootstrapPeers = rc.over.V2.DHT.BootstrapPeers |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return &effective |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // GetAssignmentStats returns assignment statistics for monitoring |  | ||||||
| func (rc *RuntimeConfig) GetAssignmentStats() map[string]interface{} { |  | ||||||
| 	rc.mu.RLock() |  | ||||||
| 	defer rc.mu.RUnlock() |  | ||||||
|  |  | ||||||
| 	hasOverride := rc.over.Agent.Role != "" || |  | ||||||
| 		rc.over.Agent.Specialization != "" || |  | ||||||
| 		len(rc.over.Agent.Capabilities) > 0 || |  | ||||||
| 		len(rc.over.V2.DHT.BootstrapPeers) > 0 |  | ||||||
|  |  | ||||||
| 	stats := map[string]interface{}{ |  | ||||||
| 		"has_assignment": hasOverride, |  | ||||||
| 		"assign_url":     os.Getenv("ASSIGN_URL"), |  | ||||||
| 		"task_slot":      os.Getenv("TASK_SLOT"), |  | ||||||
| 		"task_id":        os.Getenv("TASK_ID"), |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	if hasOverride { |  | ||||||
| 		stats["assigned_role"] = rc.over.Agent.Role |  | ||||||
| 		stats["assigned_specialization"] = rc.over.Agent.Specialization |  | ||||||
| 		stats["assigned_capabilities"] = rc.over.Agent.Capabilities |  | ||||||
| 		stats["assigned_models"] = rc.over.Agent.Models |  | ||||||
| 		stats["bootstrap_peers_count"] = len(rc.over.V2.DHT.BootstrapPeers) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	return stats |  | ||||||
| } |  | ||||||
|  |  | ||||||
| // InitializeAssignmentFromEnv initializes assignment from environment variables |  | ||||||
| func (rc *RuntimeConfig) InitializeAssignmentFromEnv(ctx context.Context) error { |  | ||||||
| 	// Try loading from assignment URL first |  | ||||||
| 	if err := rc.LoadAssignment(ctx); err != nil { |  | ||||||
| 		fmt.Printf("⚠️ Failed to load assignment from URL: %v\n", err) |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Try loading from file (for config objects) |  | ||||||
| 	if assignFile := os.Getenv("ASSIGNMENT_FILE"); assignFile != "" { |  | ||||||
| 		if err := rc.LoadAssignmentFromFile(assignFile); err != nil { |  | ||||||
| 			fmt.Printf("⚠️ Failed to load assignment from file: %v\n", err) |  | ||||||
| 		} |  | ||||||
| 	} |  | ||||||
|  |  | ||||||
| 	// Start reload handler for SIGHUP |  | ||||||
| 	rc.StartReloadHandler(ctx) |  | ||||||
|  |  | ||||||
| 	return nil |  | ||||||
| } |  | ||||||
| @@ -6,6 +6,7 @@ import ( | |||||||
| 	"fmt" | 	"fmt" | ||||||
| 	"log" | 	"log" | ||||||
| 	"math/rand" | 	"math/rand" | ||||||
|  | 	"os" | ||||||
| 	"sync" | 	"sync" | ||||||
| 	"time" | 	"time" | ||||||
|  |  | ||||||
| @@ -102,6 +103,11 @@ type ElectionManager struct { | |||||||
| 	onAdminChanged     func(oldAdmin, newAdmin string) | 	onAdminChanged     func(oldAdmin, newAdmin string) | ||||||
| 	onElectionComplete func(winner string) | 	onElectionComplete func(winner string) | ||||||
|  |  | ||||||
|  | 	// Stability window to prevent election churn (Medium-risk fix 2.1) | ||||||
|  | 	lastElectionTime    time.Time | ||||||
|  | 	electionStabilityWindow time.Duration | ||||||
|  | 	leaderStabilityWindow   time.Duration | ||||||
|  |  | ||||||
| 	startTime time.Time | 	startTime time.Time | ||||||
| } | } | ||||||
|  |  | ||||||
| @@ -137,6 +143,10 @@ func NewElectionManager( | |||||||
| 		votes:           make(map[string]string), | 		votes:           make(map[string]string), | ||||||
| 		electionTrigger: make(chan ElectionTrigger, 10), | 		electionTrigger: make(chan ElectionTrigger, 10), | ||||||
| 		startTime:       time.Now(), | 		startTime:       time.Now(), | ||||||
|  |  | ||||||
|  | 		// Initialize stability windows (as per WHOOSH issue #7) | ||||||
|  | 		electionStabilityWindow: getElectionStabilityWindow(cfg), | ||||||
|  | 		leaderStabilityWindow:   getLeaderStabilityWindow(cfg), | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
| 	// Initialize heartbeat manager | 	// Initialize heartbeat manager | ||||||
| @@ -220,11 +230,13 @@ func (em *ElectionManager) Stop() { | |||||||
| 	} | 	} | ||||||
| } | } | ||||||
|  |  | ||||||
| // TriggerElection manually triggers an election | // TriggerElection manually triggers an election with stability window checks | ||||||
| func (em *ElectionManager) TriggerElection(trigger ElectionTrigger) { | func (em *ElectionManager) TriggerElection(trigger ElectionTrigger) { | ||||||
| 	// Check if election already in progress | 	// Check if election already in progress | ||||||
| 	em.mu.RLock() | 	em.mu.RLock() | ||||||
| 	currentState := em.state | 	currentState := em.state | ||||||
|  | 	currentAdmin := em.currentAdmin | ||||||
|  | 	lastElection := em.lastElectionTime | ||||||
| 	em.mu.RUnlock() | 	em.mu.RUnlock() | ||||||
|  |  | ||||||
| 	if currentState != StateIdle { | 	if currentState != StateIdle { | ||||||
| @@ -232,6 +244,26 @@ func (em *ElectionManager) TriggerElection(trigger ElectionTrigger) { | |||||||
| 		return | 		return | ||||||
| 	} | 	} | ||||||
|  |  | ||||||
|  | 	// Apply stability window to prevent election churn (WHOOSH issue #7) | ||||||
|  | 	now := time.Now() | ||||||
|  | 	if !lastElection.IsZero() { | ||||||
|  | 		timeSinceElection := now.Sub(lastElection) | ||||||
|  |  | ||||||
|  | 		// If we have a current admin, check leader stability window | ||||||
|  | 		if currentAdmin != "" && timeSinceElection < em.leaderStabilityWindow { | ||||||
|  | 			log.Printf("⏳ Leader stability window active (%.1fs remaining), ignoring trigger: %s", | ||||||
|  | 				(em.leaderStabilityWindow - timeSinceElection).Seconds(), trigger) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  |  | ||||||
|  | 		// General election stability window | ||||||
|  | 		if timeSinceElection < em.electionStabilityWindow { | ||||||
|  | 			log.Printf("⏳ Election stability window active (%.1fs remaining), ignoring trigger: %s", | ||||||
|  | 				(em.electionStabilityWindow - timeSinceElection).Seconds(), trigger) | ||||||
|  | 			return | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
| 	select { | 	select { | ||||||
| 	case em.electionTrigger <- trigger: | 	case em.electionTrigger <- trigger: | ||||||
| 		log.Printf("🗳️ Election triggered: %s", trigger) | 		log.Printf("🗳️ Election triggered: %s", trigger) | ||||||
| @@ -442,6 +474,7 @@ func (em *ElectionManager) beginElection(trigger ElectionTrigger) { | |||||||
| 	em.mu.Lock() | 	em.mu.Lock() | ||||||
| 	em.state = StateElecting | 	em.state = StateElecting | ||||||
| 	em.currentTerm++ | 	em.currentTerm++ | ||||||
|  | 	em.lastElectionTime = time.Now() // Record election timestamp for stability window | ||||||
| 	term := em.currentTerm | 	term := em.currentTerm | ||||||
| 	em.candidates = make(map[string]*AdminCandidate) | 	em.candidates = make(map[string]*AdminCandidate) | ||||||
| 	em.votes = make(map[string]string) | 	em.votes = make(map[string]string) | ||||||
| @@ -1119,3 +1152,43 @@ func (hm *HeartbeatManager) GetHeartbeatStatus() map[string]interface{} { | |||||||
|  |  | ||||||
| 	return status | 	return status | ||||||
| } | } | ||||||
|  |  | ||||||
|  | // Helper functions for stability window configuration | ||||||
|  |  | ||||||
|  | // getElectionStabilityWindow gets the minimum time between elections | ||||||
|  | func getElectionStabilityWindow(cfg *config.Config) time.Duration { | ||||||
|  | 	// Try to get from environment or use default | ||||||
|  | 	if stability := os.Getenv("CHORUS_ELECTION_MIN_TERM"); stability != "" { | ||||||
|  | 		if duration, err := time.ParseDuration(stability); err == nil { | ||||||
|  | 			return duration | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Try to get from config structure if it exists | ||||||
|  | 	if cfg.Security.ElectionConfig.DiscoveryTimeout > 0 { | ||||||
|  | 		// Use double the discovery timeout as default stability window | ||||||
|  | 		return cfg.Security.ElectionConfig.DiscoveryTimeout * 2 | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Default fallback | ||||||
|  | 	return 30 * time.Second | ||||||
|  | } | ||||||
|  |  | ||||||
|  | // getLeaderStabilityWindow gets the minimum time before challenging a healthy leader | ||||||
|  | func getLeaderStabilityWindow(cfg *config.Config) time.Duration { | ||||||
|  | 	// Try to get from environment or use default | ||||||
|  | 	if stability := os.Getenv("CHORUS_LEADER_MIN_TERM"); stability != "" { | ||||||
|  | 		if duration, err := time.ParseDuration(stability); err == nil { | ||||||
|  | 			return duration | ||||||
|  | 		} | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Try to get from config structure if it exists | ||||||
|  | 	if cfg.Security.ElectionConfig.HeartbeatTimeout > 0 { | ||||||
|  | 		// Use 3x heartbeat timeout as default leader stability | ||||||
|  | 		return cfg.Security.ElectionConfig.HeartbeatTimeout * 3 | ||||||
|  | 	} | ||||||
|  |  | ||||||
|  | 	// Default fallback | ||||||
|  | 	return 45 * time.Second | ||||||
|  | } | ||||||
|   | |||||||
		Reference in New Issue
	
	Block a user