1 Commits

6 changed files with 438 additions and 68 deletions

View File

@@ -9,11 +9,10 @@ import (
"chorus/internal/logging" "chorus/internal/logging"
"chorus/pubsub" "chorus/pubsub"
"github.com/gorilla/mux" "github.com/gorilla/mux"
) )
// HTTPServer provides HTTP API endpoints for CHORUS // HTTPServer provides HTTP API endpoints for Bzzz
type HTTPServer struct { type HTTPServer struct {
port int port int
hypercoreLog *logging.HypercoreLog hypercoreLog *logging.HypercoreLog
@@ -21,7 +20,7 @@ type HTTPServer struct {
server *http.Server server *http.Server
} }
// NewHTTPServer creates a new HTTP server for CHORUS API // NewHTTPServer creates a new HTTP server for Bzzz API
func NewHTTPServer(port int, hlog *logging.HypercoreLog, ps *pubsub.PubSub) *HTTPServer { func NewHTTPServer(port int, hlog *logging.HypercoreLog, ps *pubsub.PubSub) *HTTPServer {
return &HTTPServer{ return &HTTPServer{
port: port, port: port,
@@ -33,38 +32,38 @@ func NewHTTPServer(port int, hlog *logging.HypercoreLog, ps *pubsub.PubSub) *HTT
// Start starts the HTTP server // Start starts the HTTP server
func (h *HTTPServer) Start() error { func (h *HTTPServer) Start() error {
router := mux.NewRouter() router := mux.NewRouter()
// Enable CORS for all routes // Enable CORS for all routes
router.Use(func(next http.Handler) http.Handler { router.Use(func(next http.Handler) http.Handler {
return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Access-Control-Allow-Origin", "*") w.Header().Set("Access-Control-Allow-Origin", "*")
w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS") w.Header().Set("Access-Control-Allow-Methods", "GET, POST, PUT, DELETE, OPTIONS")
w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization") w.Header().Set("Access-Control-Allow-Headers", "Content-Type, Authorization")
if r.Method == "OPTIONS" { if r.Method == "OPTIONS" {
w.WriteHeader(http.StatusOK) w.WriteHeader(http.StatusOK)
return return
} }
next.ServeHTTP(w, r) next.ServeHTTP(w, r)
}) })
}) })
// API routes // API routes
api := router.PathPrefix("/api").Subrouter() api := router.PathPrefix("/api").Subrouter()
// Hypercore log endpoints // Hypercore log endpoints
api.HandleFunc("/hypercore/logs", h.handleGetLogs).Methods("GET") api.HandleFunc("/hypercore/logs", h.handleGetLogs).Methods("GET")
api.HandleFunc("/hypercore/logs/recent", h.handleGetRecentLogs).Methods("GET") api.HandleFunc("/hypercore/logs/recent", h.handleGetRecentLogs).Methods("GET")
api.HandleFunc("/hypercore/logs/stats", h.handleGetLogStats).Methods("GET") api.HandleFunc("/hypercore/logs/stats", h.handleGetLogStats).Methods("GET")
api.HandleFunc("/hypercore/logs/since/{index}", h.handleGetLogsSince).Methods("GET") api.HandleFunc("/hypercore/logs/since/{index}", h.handleGetLogsSince).Methods("GET")
// Health check // Health check
api.HandleFunc("/health", h.handleHealth).Methods("GET") api.HandleFunc("/health", h.handleHealth).Methods("GET")
// Status endpoint // Status endpoint
api.HandleFunc("/status", h.handleStatus).Methods("GET") api.HandleFunc("/status", h.handleStatus).Methods("GET")
h.server = &http.Server{ h.server = &http.Server{
Addr: fmt.Sprintf(":%d", h.port), Addr: fmt.Sprintf(":%d", h.port),
Handler: router, Handler: router,
@@ -72,7 +71,7 @@ func (h *HTTPServer) Start() error {
WriteTimeout: 15 * time.Second, WriteTimeout: 15 * time.Second,
IdleTimeout: 60 * time.Second, IdleTimeout: 60 * time.Second,
} }
fmt.Printf("🌐 Starting HTTP API server on port %d\n", h.port) fmt.Printf("🌐 Starting HTTP API server on port %d\n", h.port)
return h.server.ListenAndServe() return h.server.ListenAndServe()
} }
@@ -88,16 +87,16 @@ func (h *HTTPServer) Stop() error {
// handleGetLogs returns hypercore log entries // handleGetLogs returns hypercore log entries
func (h *HTTPServer) handleGetLogs(w http.ResponseWriter, r *http.Request) { func (h *HTTPServer) handleGetLogs(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
// Parse query parameters // Parse query parameters
query := r.URL.Query() query := r.URL.Query()
startStr := query.Get("start") startStr := query.Get("start")
endStr := query.Get("end") endStr := query.Get("end")
limitStr := query.Get("limit") limitStr := query.Get("limit")
var start, end uint64 var start, end uint64
var err error var err error
if startStr != "" { if startStr != "" {
start, err = strconv.ParseUint(startStr, 10, 64) start, err = strconv.ParseUint(startStr, 10, 64)
if err != nil { if err != nil {
@@ -105,7 +104,7 @@ func (h *HTTPServer) handleGetLogs(w http.ResponseWriter, r *http.Request) {
return return
} }
} }
if endStr != "" { if endStr != "" {
end, err = strconv.ParseUint(endStr, 10, 64) end, err = strconv.ParseUint(endStr, 10, 64)
if err != nil { if err != nil {
@@ -115,7 +114,7 @@ func (h *HTTPServer) handleGetLogs(w http.ResponseWriter, r *http.Request) {
} else { } else {
end = h.hypercoreLog.Length() end = h.hypercoreLog.Length()
} }
var limit int = 100 // Default limit var limit int = 100 // Default limit
if limitStr != "" { if limitStr != "" {
limit, err = strconv.Atoi(limitStr) limit, err = strconv.Atoi(limitStr)
@@ -123,7 +122,7 @@ func (h *HTTPServer) handleGetLogs(w http.ResponseWriter, r *http.Request) {
limit = 100 limit = 100
} }
} }
// Get log entries // Get log entries
var entries []logging.LogEntry var entries []logging.LogEntry
if endStr != "" || startStr != "" { if endStr != "" || startStr != "" {
@@ -131,87 +130,87 @@ func (h *HTTPServer) handleGetLogs(w http.ResponseWriter, r *http.Request) {
} else { } else {
entries, err = h.hypercoreLog.GetRecentEntries(limit) entries, err = h.hypercoreLog.GetRecentEntries(limit)
} }
if err != nil { if err != nil {
http.Error(w, fmt.Sprintf("Failed to get log entries: %v", err), http.StatusInternalServerError) http.Error(w, fmt.Sprintf("Failed to get log entries: %v", err), http.StatusInternalServerError)
return return
} }
response := map[string]interface{}{ response := map[string]interface{}{
"entries": entries, "entries": entries,
"count": len(entries), "count": len(entries),
"timestamp": time.Now().Unix(), "timestamp": time.Now().Unix(),
"total": h.hypercoreLog.Length(), "total": h.hypercoreLog.Length(),
} }
json.NewEncoder(w).Encode(response) json.NewEncoder(w).Encode(response)
} }
// handleGetRecentLogs returns the most recent log entries // handleGetRecentLogs returns the most recent log entries
func (h *HTTPServer) handleGetRecentLogs(w http.ResponseWriter, r *http.Request) { func (h *HTTPServer) handleGetRecentLogs(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
// Parse limit parameter // Parse limit parameter
query := r.URL.Query() query := r.URL.Query()
limitStr := query.Get("limit") limitStr := query.Get("limit")
limit := 50 // Default limit := 50 // Default
if limitStr != "" { if limitStr != "" {
if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 { if l, err := strconv.Atoi(limitStr); err == nil && l > 0 && l <= 1000 {
limit = l limit = l
} }
} }
entries, err := h.hypercoreLog.GetRecentEntries(limit) entries, err := h.hypercoreLog.GetRecentEntries(limit)
if err != nil { if err != nil {
http.Error(w, fmt.Sprintf("Failed to get recent entries: %v", err), http.StatusInternalServerError) http.Error(w, fmt.Sprintf("Failed to get recent entries: %v", err), http.StatusInternalServerError)
return return
} }
response := map[string]interface{}{ response := map[string]interface{}{
"entries": entries, "entries": entries,
"count": len(entries), "count": len(entries),
"timestamp": time.Now().Unix(), "timestamp": time.Now().Unix(),
"total": h.hypercoreLog.Length(), "total": h.hypercoreLog.Length(),
} }
json.NewEncoder(w).Encode(response) json.NewEncoder(w).Encode(response)
} }
// handleGetLogsSince returns log entries since a given index // handleGetLogsSince returns log entries since a given index
func (h *HTTPServer) handleGetLogsSince(w http.ResponseWriter, r *http.Request) { func (h *HTTPServer) handleGetLogsSince(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
vars := mux.Vars(r) vars := mux.Vars(r)
indexStr := vars["index"] indexStr := vars["index"]
index, err := strconv.ParseUint(indexStr, 10, 64) index, err := strconv.ParseUint(indexStr, 10, 64)
if err != nil { if err != nil {
http.Error(w, "Invalid index parameter", http.StatusBadRequest) http.Error(w, "Invalid index parameter", http.StatusBadRequest)
return return
} }
entries, err := h.hypercoreLog.GetEntriesSince(index) entries, err := h.hypercoreLog.GetEntriesSince(index)
if err != nil { if err != nil {
http.Error(w, fmt.Sprintf("Failed to get entries since index: %v", err), http.StatusInternalServerError) http.Error(w, fmt.Sprintf("Failed to get entries since index: %v", err), http.StatusInternalServerError)
return return
} }
response := map[string]interface{}{ response := map[string]interface{}{
"entries": entries, "entries": entries,
"count": len(entries), "count": len(entries),
"since_index": index, "since_index": index,
"timestamp": time.Now().Unix(), "timestamp": time.Now().Unix(),
"total": h.hypercoreLog.Length(), "total": h.hypercoreLog.Length(),
} }
json.NewEncoder(w).Encode(response) json.NewEncoder(w).Encode(response)
} }
// handleGetLogStats returns statistics about the hypercore log // handleGetLogStats returns statistics about the hypercore log
func (h *HTTPServer) handleGetLogStats(w http.ResponseWriter, r *http.Request) { func (h *HTTPServer) handleGetLogStats(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
stats := h.hypercoreLog.GetStats() stats := h.hypercoreLog.GetStats()
json.NewEncoder(w).Encode(stats) json.NewEncoder(w).Encode(stats)
} }
@@ -219,26 +218,26 @@ func (h *HTTPServer) handleGetLogStats(w http.ResponseWriter, r *http.Request) {
// handleHealth returns health status // handleHealth returns health status
func (h *HTTPServer) handleHealth(w http.ResponseWriter, r *http.Request) { func (h *HTTPServer) handleHealth(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
health := map[string]interface{}{ health := map[string]interface{}{
"status": "healthy", "status": "healthy",
"timestamp": time.Now().Unix(), "timestamp": time.Now().Unix(),
"log_entries": h.hypercoreLog.Length(), "log_entries": h.hypercoreLog.Length(),
} }
json.NewEncoder(w).Encode(health) json.NewEncoder(w).Encode(health)
} }
// handleStatus returns detailed status information // handleStatus returns detailed status information
func (h *HTTPServer) handleStatus(w http.ResponseWriter, r *http.Request) { func (h *HTTPServer) handleStatus(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json") w.Header().Set("Content-Type", "application/json")
status := map[string]interface{}{ status := map[string]interface{}{
"status": "running", "status": "running",
"timestamp": time.Now().Unix(), "timestamp": time.Now().Unix(),
"hypercore": h.hypercoreLog.GetStats(), "hypercore": h.hypercoreLog.GetStats(),
"api_version": "1.0.0", "api_version": "1.0.0",
} }
json.NewEncoder(w).Encode(status) json.NewEncoder(w).Encode(status)
} }

View File

@@ -115,6 +115,7 @@ services:
memory: 128M memory: 128M
placement: placement:
constraints: constraints:
- node.hostname != rosewood
- node.hostname != acacia - node.hostname != acacia
preferences: preferences:
- spread: node.hostname - spread: node.hostname
@@ -193,13 +194,6 @@ services:
WHOOSH_SCALING_KACHING_URL: "https://kaching.chorus.services" WHOOSH_SCALING_KACHING_URL: "https://kaching.chorus.services"
WHOOSH_SCALING_BACKBEAT_URL: "http://backbeat-pulse:8080" WHOOSH_SCALING_BACKBEAT_URL: "http://backbeat-pulse:8080"
WHOOSH_SCALING_CHORUS_URL: "http://chorus:9000" WHOOSH_SCALING_CHORUS_URL: "http://chorus:9000"
# BACKBEAT integration configuration (temporarily disabled)
WHOOSH_BACKBEAT_ENABLED: "false"
WHOOSH_BACKBEAT_CLUSTER_ID: "chorus-production"
WHOOSH_BACKBEAT_AGENT_ID: "whoosh"
WHOOSH_BACKBEAT_NATS_URL: "nats://backbeat-nats:4222"
secrets: secrets:
- whoosh_db_password - whoosh_db_password
- gitea_token - gitea_token
@@ -252,6 +246,7 @@ services:
- traefik.http.middlewares.whoosh-auth.basicauth.users=admin:$2y$10$example_hash - traefik.http.middlewares.whoosh-auth.basicauth.users=admin:$2y$10$example_hash
networks: networks:
- tengig - tengig
- whoosh-backend
- chorus_net - chorus_net
healthcheck: healthcheck:
test: ["CMD", "/app/whoosh", "--health-check"] test: ["CMD", "/app/whoosh", "--health-check"]
@@ -289,13 +284,14 @@ services:
memory: 256M memory: 256M
cpus: '0.5' cpus: '0.5'
networks: networks:
- whoosh-backend
- chorus_net - chorus_net
healthcheck: healthcheck:
test: ["CMD-SHELL", "pg_isready -h localhost -p 5432 -U whoosh -d whoosh"] test: ["CMD-SHELL", "pg_isready -U whoosh"]
interval: 30s interval: 30s
timeout: 10s timeout: 10s
retries: 5 retries: 5
start_period: 40s start_period: 30s
redis: redis:
@@ -323,6 +319,7 @@ services:
memory: 64M memory: 64M
cpus: '0.1' cpus: '0.1'
networks: networks:
- whoosh-backend
- chorus_net - chorus_net
healthcheck: healthcheck:
test: ["CMD", "sh", "-c", "redis-cli --no-auth-warning -a $$(cat /run/secrets/redis_password) ping"] test: ["CMD", "sh", "-c", "redis-cli --no-auth-warning -a $$(cat /run/secrets/redis_password) ping"]
@@ -354,6 +351,9 @@ services:
- "9099:9090" # Expose Prometheus UI - "9099:9090" # Expose Prometheus UI
deploy: deploy:
replicas: 1 replicas: 1
placement:
constraints:
- node.hostname != rosewood
labels: labels:
- traefik.enable=true - traefik.enable=true
- traefik.http.routers.prometheus.rule=Host(`prometheus.chorus.services`) - traefik.http.routers.prometheus.rule=Host(`prometheus.chorus.services`)
@@ -383,6 +383,9 @@ services:
- "3300:3000" # Expose Grafana UI - "3300:3000" # Expose Grafana UI
deploy: deploy:
replicas: 1 replicas: 1
placement:
constraints:
- node.hostname != rosewood
labels: labels:
- traefik.enable=true - traefik.enable=true
- traefik.http.routers.grafana.rule=Host(`grafana.chorus.services`) - traefik.http.routers.grafana.rule=Host(`grafana.chorus.services`)
@@ -445,6 +448,8 @@ services:
placement: placement:
preferences: preferences:
- spread: node.hostname - spread: node.hostname
constraints:
- node.hostname != rosewood # Avoid intermittent gaming PC
resources: resources:
limits: limits:
memory: 256M memory: 256M
@@ -512,6 +517,8 @@ services:
placement: placement:
preferences: preferences:
- spread: node.hostname - spread: node.hostname
constraints:
- node.hostname != rosewood
resources: resources:
limits: limits:
memory: 512M # Larger for window aggregation memory: 512M # Larger for window aggregation
@@ -544,6 +551,7 @@ services:
backbeat-nats: backbeat-nats:
image: nats:2.9-alpine image: nats:2.9-alpine
command: ["--jetstream"] command: ["--jetstream"]
deploy: deploy:
replicas: 1 replicas: 1
restart_policy: restart_policy:
@@ -554,6 +562,8 @@ services:
placement: placement:
preferences: preferences:
- spread: node.hostname - spread: node.hostname
constraints:
- node.hostname != rosewood
resources: resources:
limits: limits:
memory: 256M memory: 256M
@@ -561,8 +571,10 @@ services:
reservations: reservations:
memory: 128M memory: 128M
cpus: '0.25' cpus: '0.25'
networks: networks:
- chorus_net - chorus_net
# Container logging # Container logging
logging: logging:
driver: "json-file" driver: "json-file"
@@ -615,9 +627,17 @@ networks:
tengig: tengig:
external: true external: true
whoosh-backend:
driver: overlay
attachable: false
chorus_net: chorus_net:
driver: overlay driver: overlay
attachable: true attachable: true
ipam:
config:
- subnet: 10.201.0.0/24
configs: configs:

View File

@@ -6,18 +6,17 @@ import (
"time" "time"
"chorus/pkg/dht" "chorus/pkg/dht"
"github.com/libp2p/go-libp2p" "github.com/libp2p/go-libp2p"
kaddht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/p2p/net/connmgr" "github.com/libp2p/go-libp2p/p2p/net/connmgr"
"github.com/libp2p/go-libp2p/p2p/security/noise" "github.com/libp2p/go-libp2p/p2p/security/noise"
"github.com/libp2p/go-libp2p/p2p/transport/tcp" "github.com/libp2p/go-libp2p/p2p/transport/tcp"
kaddht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/multiformats/go-multiaddr" "github.com/multiformats/go-multiaddr"
) )
// Node represents a CHORUS P2P node // Node represents a Bzzz P2P node
type Node struct { type Node struct {
host host.Host host host.Host
ctx context.Context ctx context.Context
@@ -48,8 +47,8 @@ func NewNode(ctx context.Context, opts ...Option) (*Node, error) {
// Create connection manager with scaling-optimized limits // Create connection manager with scaling-optimized limits
connManager, err := connmgr.NewConnManager( connManager, err := connmgr.NewConnManager(
config.LowWatermark, // Low watermark (32) config.LowWatermark, // Low watermark (32)
config.HighWatermark, // High watermark (128) config.HighWatermark, // High watermark (128)
connmgr.WithGracePeriod(30*time.Second), // Grace period before pruning connmgr.WithGracePeriod(30*time.Second), // Grace period before pruning
) )
if err != nil { if err != nil {
@@ -65,7 +64,7 @@ func NewNode(ctx context.Context, opts ...Option) (*Node, error) {
libp2p.DefaultMuxers, libp2p.DefaultMuxers,
libp2p.EnableRelay(), libp2p.EnableRelay(),
libp2p.ConnectionManager(connManager), // Add connection management libp2p.ConnectionManager(connManager), // Add connection management
libp2p.EnableAutoRelay(), // Enable AutoRelay for container environments libp2p.EnableAutoNATv2(), // Enable AutoNAT for container environments
) )
if err != nil { if err != nil {
cancel() cancel()
@@ -172,9 +171,9 @@ func (n *Node) startBackgroundTasks() {
// logConnectionStatus logs the current connection status // logConnectionStatus logs the current connection status
func (n *Node) logConnectionStatus() { func (n *Node) logConnectionStatus() {
peers := n.Peers() peers := n.Peers()
fmt.Printf("CHORUS Node Status - ID: %s, Connected Peers: %d\n", fmt.Printf("🐝 Bzzz Node Status - ID: %s, Connected Peers: %d\n",
n.ID().ShortString(), len(peers)) n.ID().ShortString(), len(peers))
if len(peers) > 0 { if len(peers) > 0 {
fmt.Printf(" Connected to: ") fmt.Printf(" Connected to: ")
for i, p := range peers { for i, p := range peers {
@@ -212,4 +211,4 @@ func (n *Node) Close() error {
} }
n.cancel() n.cancel()
return n.host.Close() return n.host.Close()
} }

View File

@@ -7,7 +7,7 @@ import (
"io" "io"
"net/http" "net/http"
"os" "os"
"os/signal" "signal"
"strings" "strings"
"sync" "sync"
"syscall" "syscall"

View File

@@ -100,7 +100,6 @@ type V2Config struct {
type DHTConfig struct { type DHTConfig struct {
Enabled bool `yaml:"enabled"` Enabled bool `yaml:"enabled"`
BootstrapPeers []string `yaml:"bootstrap_peers"` BootstrapPeers []string `yaml:"bootstrap_peers"`
MDNSEnabled bool `yaml:"mdns_enabled"`
} }
// UCXLConfig defines UCXL protocol settings // UCXLConfig defines UCXL protocol settings
@@ -193,7 +192,6 @@ func LoadFromEnvironment() (*Config, error) {
DHT: DHTConfig{ DHT: DHTConfig{
Enabled: getEnvBoolOrDefault("CHORUS_DHT_ENABLED", true), Enabled: getEnvBoolOrDefault("CHORUS_DHT_ENABLED", true),
BootstrapPeers: getEnvArrayOrDefault("CHORUS_BOOTSTRAP_PEERS", []string{}), BootstrapPeers: getEnvArrayOrDefault("CHORUS_BOOTSTRAP_PEERS", []string{}),
MDNSEnabled: getEnvBoolOrDefault("CHORUS_MDNS_ENABLED", true),
}, },
}, },
UCXL: UCXLConfig{ UCXL: UCXLConfig{

View File

@@ -0,0 +1,354 @@
package config
import (
"context"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/url"
"os"
"os/signal"
"sync"
"syscall"
"time"
)
// RuntimeConfig provides dynamic configuration with assignment override support
type RuntimeConfig struct {
mu sync.RWMutex
base *Config // Base configuration from environment
over *Config // Override configuration from assignment
}
// AssignmentConfig represents configuration received from WHOOSH assignment
type AssignmentConfig struct {
Role string `json:"role,omitempty"`
Model string `json:"model,omitempty"`
PromptUCXL string `json:"prompt_ucxl,omitempty"`
Specialization string `json:"specialization,omitempty"`
Capabilities []string `json:"capabilities,omitempty"`
Environment map[string]string `json:"environment,omitempty"`
BootstrapPeers []string `json:"bootstrap_peers,omitempty"`
JoinStaggerMS int `json:"join_stagger_ms,omitempty"`
DialsPerSecond int `json:"dials_per_second,omitempty"`
MaxConcurrentDHT int `json:"max_concurrent_dht,omitempty"`
AssignmentID string `json:"assignment_id,omitempty"`
ConfigEpoch int64 `json:"config_epoch,omitempty"`
}
// NewRuntimeConfig creates a new runtime configuration manager
func NewRuntimeConfig(baseConfig *Config) *RuntimeConfig {
return &RuntimeConfig{
base: baseConfig,
over: &Config{}, // Empty override initially
}
}
// Get retrieves a configuration value with override precedence
func (rc *RuntimeConfig) Get(key string) interface{} {
rc.mu.RLock()
defer rc.mu.RUnlock()
// Check override first, then base
if value := rc.getFromConfig(rc.over, key); value != nil {
return value
}
return rc.getFromConfig(rc.base, key)
}
// getFromConfig extracts a value from a config struct by key
func (rc *RuntimeConfig) getFromConfig(cfg *Config, key string) interface{} {
if cfg == nil {
return nil
}
switch key {
case "agent.role":
if cfg.Agent.Role != "" {
return cfg.Agent.Role
}
case "agent.specialization":
if cfg.Agent.Specialization != "" {
return cfg.Agent.Specialization
}
case "agent.capabilities":
if len(cfg.Agent.Capabilities) > 0 {
return cfg.Agent.Capabilities
}
case "agent.models":
if len(cfg.Agent.Models) > 0 {
return cfg.Agent.Models
}
case "agent.default_reasoning_model":
if cfg.Agent.DefaultReasoningModel != "" {
return cfg.Agent.DefaultReasoningModel
}
case "v2.dht.bootstrap_peers":
if len(cfg.V2.DHT.BootstrapPeers) > 0 {
return cfg.V2.DHT.BootstrapPeers
}
}
return nil
}
// GetString retrieves a string configuration value
func (rc *RuntimeConfig) GetString(key string) string {
if value := rc.Get(key); value != nil {
if str, ok := value.(string); ok {
return str
}
}
return ""
}
// GetStringSlice retrieves a string slice configuration value
func (rc *RuntimeConfig) GetStringSlice(key string) []string {
if value := rc.Get(key); value != nil {
if slice, ok := value.([]string); ok {
return slice
}
}
return nil
}
// GetInt retrieves an integer configuration value
func (rc *RuntimeConfig) GetInt(key string) int {
if value := rc.Get(key); value != nil {
if i, ok := value.(int); ok {
return i
}
}
return 0
}
// LoadAssignment loads configuration from WHOOSH assignment endpoint
func (rc *RuntimeConfig) LoadAssignment(ctx context.Context) error {
assignURL := os.Getenv("ASSIGN_URL")
if assignURL == "" {
return nil // No assignment URL configured
}
// Build assignment request URL with task identity
params := url.Values{}
if taskSlot := os.Getenv("TASK_SLOT"); taskSlot != "" {
params.Set("slot", taskSlot)
}
if taskID := os.Getenv("TASK_ID"); taskID != "" {
params.Set("task", taskID)
}
if clusterID := os.Getenv("CHORUS_CLUSTER_ID"); clusterID != "" {
params.Set("cluster", clusterID)
}
fullURL := assignURL
if len(params) > 0 {
fullURL += "?" + params.Encode()
}
// Fetch assignment with timeout
ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
defer cancel()
req, err := http.NewRequestWithContext(ctx, "GET", fullURL, nil)
if err != nil {
return fmt.Errorf("failed to create assignment request: %w", err)
}
client := &http.Client{Timeout: 10 * time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("assignment request failed: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK {
return fmt.Errorf("assignment request failed with status %d", resp.StatusCode)
}
// Parse assignment response
var assignment AssignmentConfig
if err := json.NewDecoder(resp.Body).Decode(&assignment); err != nil {
return fmt.Errorf("failed to decode assignment response: %w", err)
}
// Apply assignment to override config
if err := rc.applyAssignment(&assignment); err != nil {
return fmt.Errorf("failed to apply assignment: %w", err)
}
fmt.Printf("📥 Loaded assignment: role=%s, model=%s, epoch=%d\n",
assignment.Role, assignment.Model, assignment.ConfigEpoch)
return nil
}
// LoadAssignmentFromFile loads configuration from a file (for config objects)
func (rc *RuntimeConfig) LoadAssignmentFromFile(filePath string) error {
if filePath == "" {
return nil // No file configured
}
data, err := ioutil.ReadFile(filePath)
if err != nil {
return fmt.Errorf("failed to read assignment file %s: %w", filePath, err)
}
var assignment AssignmentConfig
if err := json.Unmarshal(data, &assignment); err != nil {
return fmt.Errorf("failed to parse assignment file: %w", err)
}
if err := rc.applyAssignment(&assignment); err != nil {
return fmt.Errorf("failed to apply file assignment: %w", err)
}
fmt.Printf("📁 Loaded assignment from file: role=%s, model=%s\n",
assignment.Role, assignment.Model)
return nil
}
// applyAssignment applies an assignment to the override configuration
func (rc *RuntimeConfig) applyAssignment(assignment *AssignmentConfig) error {
rc.mu.Lock()
defer rc.mu.Unlock()
// Create new override config
override := &Config{
Agent: AgentConfig{
Role: assignment.Role,
Specialization: assignment.Specialization,
Capabilities: assignment.Capabilities,
DefaultReasoningModel: assignment.Model,
},
V2: V2Config{
DHT: DHTConfig{
BootstrapPeers: assignment.BootstrapPeers,
},
},
}
// Handle models array
if assignment.Model != "" {
override.Agent.Models = []string{assignment.Model}
}
// Apply environment variables from assignment
for key, value := range assignment.Environment {
os.Setenv(key, value)
}
rc.over = override
return nil
}
// StartReloadHandler starts a signal handler for configuration reload (SIGHUP)
func (rc *RuntimeConfig) StartReloadHandler(ctx context.Context) {
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGHUP)
go func() {
for {
select {
case <-ctx.Done():
return
case <-sigChan:
fmt.Println("🔄 Received SIGHUP, reloading configuration...")
if err := rc.LoadAssignment(ctx); err != nil {
fmt.Printf("⚠️ Failed to reload assignment: %v\n", err)
} else {
fmt.Println("✅ Configuration reloaded successfully")
}
}
}
}()
}
// GetBaseConfig returns the base configuration (from environment)
func (rc *RuntimeConfig) GetBaseConfig() *Config {
rc.mu.RLock()
defer rc.mu.RUnlock()
return rc.base
}
// GetEffectiveConfig returns the effective merged configuration
func (rc *RuntimeConfig) GetEffectiveConfig() *Config {
rc.mu.RLock()
defer rc.mu.RUnlock()
// Start with base config
effective := *rc.base
// Apply overrides
if rc.over.Agent.Role != "" {
effective.Agent.Role = rc.over.Agent.Role
}
if rc.over.Agent.Specialization != "" {
effective.Agent.Specialization = rc.over.Agent.Specialization
}
if len(rc.over.Agent.Capabilities) > 0 {
effective.Agent.Capabilities = rc.over.Agent.Capabilities
}
if len(rc.over.Agent.Models) > 0 {
effective.Agent.Models = rc.over.Agent.Models
}
if rc.over.Agent.DefaultReasoningModel != "" {
effective.Agent.DefaultReasoningModel = rc.over.Agent.DefaultReasoningModel
}
if len(rc.over.V2.DHT.BootstrapPeers) > 0 {
effective.V2.DHT.BootstrapPeers = rc.over.V2.DHT.BootstrapPeers
}
return &effective
}
// GetAssignmentStats returns assignment statistics for monitoring
func (rc *RuntimeConfig) GetAssignmentStats() map[string]interface{} {
rc.mu.RLock()
defer rc.mu.RUnlock()
hasOverride := rc.over.Agent.Role != "" ||
rc.over.Agent.Specialization != "" ||
len(rc.over.Agent.Capabilities) > 0 ||
len(rc.over.V2.DHT.BootstrapPeers) > 0
stats := map[string]interface{}{
"has_assignment": hasOverride,
"assign_url": os.Getenv("ASSIGN_URL"),
"task_slot": os.Getenv("TASK_SLOT"),
"task_id": os.Getenv("TASK_ID"),
}
if hasOverride {
stats["assigned_role"] = rc.over.Agent.Role
stats["assigned_specialization"] = rc.over.Agent.Specialization
stats["assigned_capabilities"] = rc.over.Agent.Capabilities
stats["assigned_models"] = rc.over.Agent.Models
stats["bootstrap_peers_count"] = len(rc.over.V2.DHT.BootstrapPeers)
}
return stats
}
// InitializeAssignmentFromEnv initializes assignment from environment variables
func (rc *RuntimeConfig) InitializeAssignmentFromEnv(ctx context.Context) error {
// Try loading from assignment URL first
if err := rc.LoadAssignment(ctx); err != nil {
fmt.Printf("⚠️ Failed to load assignment from URL: %v\n", err)
}
// Try loading from file (for config objects)
if assignFile := os.Getenv("ASSIGNMENT_FILE"); assignFile != "" {
if err := rc.LoadAssignmentFromFile(assignFile); err != nil {
fmt.Printf("⚠️ Failed to load assignment from file: %v\n", err)
}
}
// Start reload handler for SIGHUP
rc.StartReloadHandler(ctx)
return nil
}