From b6634e4c1b1fa4f7117383b8657f11ca93eab15f Mon Sep 17 00:00:00 2001 From: anthonyrawlins Date: Sat, 6 Sep 2025 14:47:41 +1000 Subject: [PATCH] refactor CHORUS --- HAP_ACTION_PLAN.md | 10 +- api/setup_manager.go | 8 +- p2p/config.go | 4 +- pkg/config/config_test.go | 12 +- pkg/config/hybrid_config.go | 76 +++++----- pkg/crypto/README.md | 14 +- pkg/crypto/security_test.go | 4 +- pkg/mcp/server.go | 14 +- pkg/metrics/prometheus_metrics.go | 176 ++++++++++++------------ pkg/protocol/uri.go | 10 +- pkg/slurp/leader/config.go | 58 ++++---- pkg/slurp/leader/enhanced_manager.go | 4 +- pkg/slurp/leader/integration_example.go | 4 +- pkg/ucxi/server.go | 2 +- pubsub/pubsub.go | 52 +++---- pubsub/pubsub_test.go | 2 +- 16 files changed, 225 insertions(+), 225 deletions(-) diff --git a/HAP_ACTION_PLAN.md b/HAP_ACTION_PLAN.md index 8bc1cf9..ffb3b82 100644 --- a/HAP_ACTION_PLAN.md +++ b/HAP_ACTION_PLAN.md @@ -133,7 +133,7 @@ CHORUS currently implements a **comprehensive P2P autonomous agent system** with | **HAP Interface** | ❌ Not implemented | Required | **Phase 2-3** | ### Shared Runtime Components -Both `bzzz-agent` and `bzzz-hap` will share: +Both `chorus-agent` and `chorus-hap` will share: - **P2P networking** and peer discovery - **Agent identity** and cryptographic signing @@ -155,7 +155,7 @@ Both `bzzz-agent` and `bzzz-hap` will share: 4. **Gradual enhancement** - start with basic HAP, add features incrementally ### Key Principles -- **Backward compatibility**: Existing BZZZ deployments unaffected +- **Backward compatibility**: Existing CHORUS deployments unaffected - **Shared protocols**: Human and machine agents are indistinguishable on P2P mesh - **Common codebase**: Maximum code reuse between binaries - **Incremental delivery**: Each phase delivers working functionality @@ -171,7 +171,7 @@ Both `bzzz-agent` and `bzzz-hap` will share: ## 📈 Success Metrics ### Phase 1 Success -- [ ] `make build` produces both `bzzz-agent` and `bzzz-hap` binaries +- [ ] `make build` produces both `chorus-agent` and `chorus-hap` binaries - [ ] Existing autonomous agent functionality unchanged - [ ] Both binaries can join same P2P mesh @@ -217,7 +217,7 @@ Both `bzzz-agent` and `bzzz-hap` will share: ## 📚 Resources & References -- **Original HAP Plan**: `archive/bzzz_hap_dev_plan.md` +- **Original HAP Plan**: `archive/chorus_hap_dev_plan.md` - **Current Architecture**: `pkg/` directory structure - **P2P Infrastructure**: `p2p/`, `pubsub/`, `pkg/dht/` - **Agent Identity**: `pkg/agentid/`, `pkg/crypto/` @@ -225,4 +225,4 @@ Both `bzzz-agent` and `bzzz-hap` will share: - **Context System**: `pkg/ucxl/`, `pkg/ucxi/` - **Configuration**: `pkg/config/`, role definitions -The current BZZZ implementation provides an excellent foundation for the HAP vision. The primary challenge is architectural restructuring rather than building new functionality from scratch. +The current CHORUS implementation provides an excellent foundation for the HAP vision. The primary challenge is architectural restructuring rather than building new functionality from scratch. diff --git a/api/setup_manager.go b/api/setup_manager.go index 3d6d2c4..b0361ad 100644 --- a/api/setup_manager.go +++ b/api/setup_manager.go @@ -1018,9 +1018,9 @@ func (s *SetupManager) executeSudoCommand(client *ssh.Client, password string, c // SECURITY: Use here-document to avoid password exposure in process list // This keeps the password out of command line arguments and process lists escapedPassword := strings.ReplaceAll(password, "'", "'\"'\"'") - secureCommand := fmt.Sprintf(`sudo -S %s <<'BZZZ_EOF' + secureCommand := fmt.Sprintf(`sudo -S %s <<'CHORUS_EOF' %s -BZZZ_EOF`, safeCommand, escapedPassword) +CHORUS_EOF`, safeCommand, escapedPassword) return s.executeSSHCommand(client, secureCommand) } else { @@ -1977,7 +1977,7 @@ github: p2p: service_tag: "CHORUS-peer-discovery" - bzzz_topic: "CHORUS/coordination/v1" + chorus_topic: "CHORUS/coordination/v1" hmmm_topic: "hmmm/meta-discussion/v1" discovery_timeout: 10s escalation_webhook: "https://n8n.home.deepblack.cloud/webhook-test/human-escalation" @@ -2160,7 +2160,7 @@ github: p2p: service_tag: "CHORUS-peer-discovery" - bzzz_topic: "CHORUS/coordination/v1" + chorus_topic: "CHORUS/coordination/v1" hmmm_topic: "hmmm/meta-discussion/v1" discovery_timeout: 10s escalation_webhook: "" diff --git a/p2p/config.go b/p2p/config.go index fd0e001..4d35f94 100644 --- a/p2p/config.go +++ b/p2p/config.go @@ -131,9 +131,9 @@ func WithPubsub(enabled bool) Option { } // WithTopics sets the Bzzz and HMMM topic names -func WithTopics(bzzzTopic, hmmmTopic string) Option { +func WithTopics(chorusTopic, hmmmTopic string) Option { return func(c *Config) { - c.BzzzTopic = bzzzTopic + c.BzzzTopic = chorusTopic c.HmmmTopic = hmmmTopic } } diff --git a/pkg/config/config_test.go b/pkg/config/config_test.go index 21ce85d..0ff4996 100644 --- a/pkg/config/config_test.go +++ b/pkg/config/config_test.go @@ -241,13 +241,13 @@ func TestConfig_GetBootstrapPeers(t *testing.T) { func TestConfigWithEnvironmentOverrides(t *testing.T) { // Set environment variables - os.Setenv("BZZZ_AGENT_ID", "env-test-agent") - os.Setenv("BZZZ_P2P_PORT", "9999") - os.Setenv("BZZZ_ENCRYPTION_ENABLED", "false") + os.Setenv("CHORUS_AGENT_ID", "env-test-agent") + os.Setenv("CHORUS_P2P_PORT", "9999") + os.Setenv("CHORUS_ENCRYPTION_ENABLED", "false") defer func() { - os.Unsetenv("BZZZ_AGENT_ID") - os.Unsetenv("BZZZ_P2P_PORT") - os.Unsetenv("BZZZ_ENCRYPTION_ENABLED") + os.Unsetenv("CHORUS_AGENT_ID") + os.Unsetenv("CHORUS_P2P_PORT") + os.Unsetenv("CHORUS_ENCRYPTION_ENABLED") }() cfg := DefaultConfig() diff --git a/pkg/config/hybrid_config.go b/pkg/config/hybrid_config.go index 0c562b6..368fb55 100644 --- a/pkg/config/hybrid_config.go +++ b/pkg/config/hybrid_config.go @@ -24,34 +24,34 @@ type HybridConfig struct { } type HybridDHTConfig struct { - Backend string `env:"BZZZ_DHT_BACKEND" default:"mock" json:"backend" yaml:"backend"` - BootstrapNodes []string `env:"BZZZ_DHT_BOOTSTRAP_NODES" json:"bootstrap_nodes" yaml:"bootstrap_nodes"` - FallbackOnError bool `env:"BZZZ_FALLBACK_ON_ERROR" default:"true" json:"fallback_on_error" yaml:"fallback_on_error"` - HealthCheckInterval time.Duration `env:"BZZZ_HEALTH_CHECK_INTERVAL" default:"30s" json:"health_check_interval" yaml:"health_check_interval"` - MaxRetries int `env:"BZZZ_DHT_MAX_RETRIES" default:"3" json:"max_retries" yaml:"max_retries"` - RetryBackoff time.Duration `env:"BZZZ_DHT_RETRY_BACKOFF" default:"1s" json:"retry_backoff" yaml:"retry_backoff"` - OperationTimeout time.Duration `env:"BZZZ_DHT_OPERATION_TIMEOUT" default:"10s" json:"operation_timeout" yaml:"operation_timeout"` + Backend string `env:"CHORUS_DHT_BACKEND" default:"mock" json:"backend" yaml:"backend"` + BootstrapNodes []string `env:"CHORUS_DHT_BOOTSTRAP_NODES" json:"bootstrap_nodes" yaml:"bootstrap_nodes"` + FallbackOnError bool `env:"CHORUS_FALLBACK_ON_ERROR" default:"true" json:"fallback_on_error" yaml:"fallback_on_error"` + HealthCheckInterval time.Duration `env:"CHORUS_HEALTH_CHECK_INTERVAL" default:"30s" json:"health_check_interval" yaml:"health_check_interval"` + MaxRetries int `env:"CHORUS_DHT_MAX_RETRIES" default:"3" json:"max_retries" yaml:"max_retries"` + RetryBackoff time.Duration `env:"CHORUS_DHT_RETRY_BACKOFF" default:"1s" json:"retry_backoff" yaml:"retry_backoff"` + OperationTimeout time.Duration `env:"CHORUS_DHT_OPERATION_TIMEOUT" default:"10s" json:"operation_timeout" yaml:"operation_timeout"` } type HybridUCXLConfig struct { - CacheEnabled bool `env:"BZZZ_UCXL_CACHE_ENABLED" default:"true" json:"cache_enabled" yaml:"cache_enabled"` - CacheTTL time.Duration `env:"BZZZ_UCXL_CACHE_TTL" default:"5m" json:"cache_ttl" yaml:"cache_ttl"` - UseDistributed bool `env:"BZZZ_UCXL_USE_DISTRIBUTED" default:"false" json:"use_distributed" yaml:"use_distributed"` - MaxCacheSize int `env:"BZZZ_UCXL_MAX_CACHE_SIZE" default:"10000" json:"max_cache_size" yaml:"max_cache_size"` + CacheEnabled bool `env:"CHORUS_UCXL_CACHE_ENABLED" default:"true" json:"cache_enabled" yaml:"cache_enabled"` + CacheTTL time.Duration `env:"CHORUS_UCXL_CACHE_TTL" default:"5m" json:"cache_ttl" yaml:"cache_ttl"` + UseDistributed bool `env:"CHORUS_UCXL_USE_DISTRIBUTED" default:"false" json:"use_distributed" yaml:"use_distributed"` + MaxCacheSize int `env:"CHORUS_UCXL_MAX_CACHE_SIZE" default:"10000" json:"max_cache_size" yaml:"max_cache_size"` } type DiscoveryConfig struct { - MDNSEnabled bool `env:"BZZZ_MDNS_ENABLED" default:"true" json:"mdns_enabled" yaml:"mdns_enabled"` - DHTDiscovery bool `env:"BZZZ_DHT_DISCOVERY" default:"false" json:"dht_discovery" yaml:"dht_discovery"` - AnnounceInterval time.Duration `env:"BZZZ_ANNOUNCE_INTERVAL" default:"30s" json:"announce_interval" yaml:"announce_interval"` - ServiceName string `env:"BZZZ_SERVICE_NAME" default:"CHORUS" json:"service_name" yaml:"service_name"` + MDNSEnabled bool `env:"CHORUS_MDNS_ENABLED" default:"true" json:"mdns_enabled" yaml:"mdns_enabled"` + DHTDiscovery bool `env:"CHORUS_DHT_DISCOVERY" default:"false" json:"dht_discovery" yaml:"dht_discovery"` + AnnounceInterval time.Duration `env:"CHORUS_ANNOUNCE_INTERVAL" default:"30s" json:"announce_interval" yaml:"announce_interval"` + ServiceName string `env:"CHORUS_SERVICE_NAME" default:"CHORUS" json:"service_name" yaml:"service_name"` } type MonitoringConfig struct { - Enabled bool `env:"BZZZ_MONITORING_ENABLED" default:"true" json:"enabled" yaml:"enabled"` - MetricsInterval time.Duration `env:"BZZZ_METRICS_INTERVAL" default:"15s" json:"metrics_interval" yaml:"metrics_interval"` - HealthEndpoint string `env:"BZZZ_HEALTH_ENDPOINT" default:"/health" json:"health_endpoint" yaml:"health_endpoint"` - MetricsEndpoint string `env:"BZZZ_METRICS_ENDPOINT" default:"/metrics" json:"metrics_endpoint" yaml:"metrics_endpoint"` + Enabled bool `env:"CHORUS_MONITORING_ENABLED" default:"true" json:"enabled" yaml:"enabled"` + MetricsInterval time.Duration `env:"CHORUS_METRICS_INTERVAL" default:"15s" json:"metrics_interval" yaml:"metrics_interval"` + HealthEndpoint string `env:"CHORUS_HEALTH_ENDPOINT" default:"/health" json:"health_endpoint" yaml:"health_endpoint"` + MetricsEndpoint string `env:"CHORUS_METRICS_ENDPOINT" default:"/metrics" json:"metrics_endpoint" yaml:"metrics_endpoint"` } // LoadHybridConfig loads configuration from environment variables with defaults @@ -60,37 +60,37 @@ func LoadHybridConfig() (*HybridConfig, error) { // Load DHT configuration config.DHT = HybridDHTConfig{ - Backend: getEnvString("BZZZ_DHT_BACKEND", "mock"), - BootstrapNodes: getEnvStringSlice("BZZZ_DHT_BOOTSTRAP_NODES", []string{}), - FallbackOnError: getEnvBool("BZZZ_FALLBACK_ON_ERROR", true), - HealthCheckInterval: getEnvDuration("BZZZ_HEALTH_CHECK_INTERVAL", 30*time.Second), - MaxRetries: getEnvInt("BZZZ_DHT_MAX_RETRIES", 3), - RetryBackoff: getEnvDuration("BZZZ_DHT_RETRY_BACKOFF", 1*time.Second), - OperationTimeout: getEnvDuration("BZZZ_DHT_OPERATION_TIMEOUT", 10*time.Second), + Backend: getEnvString("CHORUS_DHT_BACKEND", "mock"), + BootstrapNodes: getEnvStringSlice("CHORUS_DHT_BOOTSTRAP_NODES", []string{}), + FallbackOnError: getEnvBool("CHORUS_FALLBACK_ON_ERROR", true), + HealthCheckInterval: getEnvDuration("CHORUS_HEALTH_CHECK_INTERVAL", 30*time.Second), + MaxRetries: getEnvInt("CHORUS_DHT_MAX_RETRIES", 3), + RetryBackoff: getEnvDuration("CHORUS_DHT_RETRY_BACKOFF", 1*time.Second), + OperationTimeout: getEnvDuration("CHORUS_DHT_OPERATION_TIMEOUT", 10*time.Second), } // Load UCXL configuration config.UCXL = HybridUCXLConfig{ - CacheEnabled: getEnvBool("BZZZ_UCXL_CACHE_ENABLED", true), - CacheTTL: getEnvDuration("BZZZ_UCXL_CACHE_TTL", 5*time.Minute), - UseDistributed: getEnvBool("BZZZ_UCXL_USE_DISTRIBUTED", false), - MaxCacheSize: getEnvInt("BZZZ_UCXL_MAX_CACHE_SIZE", 10000), + CacheEnabled: getEnvBool("CHORUS_UCXL_CACHE_ENABLED", true), + CacheTTL: getEnvDuration("CHORUS_UCXL_CACHE_TTL", 5*time.Minute), + UseDistributed: getEnvBool("CHORUS_UCXL_USE_DISTRIBUTED", false), + MaxCacheSize: getEnvInt("CHORUS_UCXL_MAX_CACHE_SIZE", 10000), } // Load Discovery configuration config.Discovery = DiscoveryConfig{ - MDNSEnabled: getEnvBool("BZZZ_MDNS_ENABLED", true), - DHTDiscovery: getEnvBool("BZZZ_DHT_DISCOVERY", false), - AnnounceInterval: getEnvDuration("BZZZ_ANNOUNCE_INTERVAL", 30*time.Second), - ServiceName: getEnvString("BZZZ_SERVICE_NAME", "CHORUS"), + MDNSEnabled: getEnvBool("CHORUS_MDNS_ENABLED", true), + DHTDiscovery: getEnvBool("CHORUS_DHT_DISCOVERY", false), + AnnounceInterval: getEnvDuration("CHORUS_ANNOUNCE_INTERVAL", 30*time.Second), + ServiceName: getEnvString("CHORUS_SERVICE_NAME", "CHORUS"), } // Load Monitoring configuration config.Monitoring = MonitoringConfig{ - Enabled: getEnvBool("BZZZ_MONITORING_ENABLED", true), - MetricsInterval: getEnvDuration("BZZZ_METRICS_INTERVAL", 15*time.Second), - HealthEndpoint: getEnvString("BZZZ_HEALTH_ENDPOINT", "/health"), - MetricsEndpoint: getEnvString("BZZZ_METRICS_ENDPOINT", "/metrics"), + Enabled: getEnvBool("CHORUS_MONITORING_ENABLED", true), + MetricsInterval: getEnvDuration("CHORUS_METRICS_INTERVAL", 15*time.Second), + HealthEndpoint: getEnvString("CHORUS_HEALTH_ENDPOINT", "/health"), + MetricsEndpoint: getEnvString("CHORUS_METRICS_ENDPOINT", "/metrics"), } // Validate configuration diff --git a/pkg/crypto/README.md b/pkg/crypto/README.md index 0e3f407..0fe3791 100644 --- a/pkg/crypto/README.md +++ b/pkg/crypto/README.md @@ -606,9 +606,9 @@ services: CHORUS-crypto: image: CHORUS/crypto-service:latest environment: - - BZZZ_CONFIG_PATH=/etc/CHORUS/config.yaml - - BZZZ_LOG_LEVEL=info - - BZZZ_AUDIT_STORAGE=postgresql + - CHORUS_CONFIG_PATH=/etc/CHORUS/config.yaml + - CHORUS_LOG_LEVEL=info + - CHORUS_AUDIT_STORAGE=postgresql volumes: - ./config:/etc/CHORUS - ./logs:/var/log/CHORUS @@ -621,7 +621,7 @@ services: postgresql: image: postgres:13 environment: - - POSTGRES_DB=bzzz_audit + - POSTGRES_DB=chorus_audit - POSTGRES_USER=CHORUS - POSTGRES_PASSWORD_FILE=/run/secrets/db_password volumes: @@ -676,9 +676,9 @@ spec: - containerPort: 8443 name: https env: - - name: BZZZ_CONFIG_PATH + - name: CHORUS_CONFIG_PATH value: "/etc/CHORUS/config.yaml" - - name: BZZZ_LOG_LEVEL + - name: CHORUS_LOG_LEVEL value: "info" volumeMounts: - name: config @@ -761,7 +761,7 @@ key_integrity_status{role="backend_developer",status="valid"} ```yaml # Prometheus alerting rules groups: -- name: bzzz_crypto_security +- name: chorus_crypto_security rules: - alert: HighSecurityRiskAccess expr: security_risk_score > 0.8 diff --git a/pkg/crypto/security_test.go b/pkg/crypto/security_test.go index cba34cb..19639e0 100644 --- a/pkg/crypto/security_test.go +++ b/pkg/crypto/security_test.go @@ -15,7 +15,7 @@ import ( // TestSecurityConfig tests SecurityConfig enforcement func TestSecurityConfig(t *testing.T) { // Create temporary audit log file - tmpDir, err := ioutil.TempDir("", "bzzz_security_test") + tmpDir, err := ioutil.TempDir("", "chorus_security_test") if err != nil { t.Fatalf("Failed to create temp dir: %v", err) } @@ -259,7 +259,7 @@ func TestDHTSecurityIntegration(t *testing.T) { // TestAuditLogging tests comprehensive audit logging func TestAuditLogging(t *testing.T) { - tmpDir, err := ioutil.TempDir("", "bzzz_audit_test") + tmpDir, err := ioutil.TempDir("", "chorus_audit_test") if err != nil { t.Fatalf("Failed to create temp dir: %v", err) } diff --git a/pkg/mcp/server.go b/pkg/mcp/server.go index f340b0d..f45da5d 100644 --- a/pkg/mcp/server.go +++ b/pkg/mcp/server.go @@ -445,24 +445,24 @@ func (s *McpServer) callTool(params map[string]interface{}) (map[string]interfac args, _ := params["arguments"].(map[string]interface{}) switch toolName { - case "bzzz_announce": + case "chorus_announce": return s.handleBzzzAnnounce(args) - case "bzzz_lookup": + case "chorus_lookup": return s.handleBzzzLookup(args) - case "bzzz_get": + case "chorus_get": return s.handleBzzzGet(args) - case "bzzz_post": + case "chorus_post": return s.handleBzzzPost(args) - case "bzzz_thread": + case "chorus_thread": return s.handleBzzzThread(args) - case "bzzz_subscribe": + case "chorus_subscribe": return s.handleBzzzSubscribe(args) default: return nil, fmt.Errorf("unknown tool: %s", toolName) } } -// handleBzzzAnnounce implements the bzzz_announce tool +// handleBzzzAnnounce implements the chorus_announce tool func (s *McpServer) handleBzzzAnnounce(args map[string]interface{}) (map[string]interface{}, error) { agentID, ok := args["agent_id"].(string) if !ok { diff --git a/pkg/metrics/prometheus_metrics.go b/pkg/metrics/prometheus_metrics.go index 08b1cc9..465b0ce 100644 --- a/pkg/metrics/prometheus_metrics.go +++ b/pkg/metrics/prometheus_metrics.go @@ -13,8 +13,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" ) -// BZZZMetrics provides comprehensive Prometheus metrics for the CHORUS system -type BZZZMetrics struct { +// CHORUSMetrics provides comprehensive Prometheus metrics for the CHORUS system +type CHORUSMetrics struct { registry *prometheus.Registry httpServer *http.Server @@ -138,15 +138,15 @@ func DefaultMetricsConfig() *MetricsConfig { } } -// NewBZZZMetrics creates a new metrics collector -func NewBZZZMetrics(config *MetricsConfig) *BZZZMetrics { +// NewCHORUSMetrics creates a new metrics collector +func NewCHORUSMetrics(config *MetricsConfig) *CHORUSMetrics { if config == nil { config = DefaultMetricsConfig() } registry := prometheus.NewRegistry() - metrics := &BZZZMetrics{ + metrics := &CHORUSMetrics{ registry: registry, startTime: time.Now(), } @@ -161,11 +161,11 @@ func NewBZZZMetrics(config *MetricsConfig) *BZZZMetrics { } // initializeMetrics initializes all Prometheus metrics -func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { +func (m *CHORUSMetrics) initializeMetrics(config *MetricsConfig) { // System metrics m.systemInfo = promauto.NewGaugeVec( prometheus.GaugeOpts{ - Name: "bzzz_system_info", + Name: "chorus_system_info", Help: "System information", }, []string{"node_id", "version", "go_version", "cluster", "environment"}, @@ -173,7 +173,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.uptime = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "bzzz_uptime_seconds", + Name: "chorus_uptime_seconds", Help: "System uptime in seconds", }, ) @@ -181,14 +181,14 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { // P2P metrics m.p2pConnectedPeers = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "bzzz_p2p_connected_peers", + Name: "chorus_p2p_connected_peers", Help: "Number of connected P2P peers", }, ) m.p2pMessagesSent = promauto.NewCounterVec( prometheus.CounterOpts{ - Name: "bzzz_p2p_messages_sent_total", + Name: "chorus_p2p_messages_sent_total", Help: "Total number of P2P messages sent", }, []string{"message_type", "peer_id"}, @@ -196,7 +196,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.p2pMessagesReceived = promauto.NewCounterVec( prometheus.CounterOpts{ - Name: "bzzz_p2p_messages_received_total", + Name: "chorus_p2p_messages_received_total", Help: "Total number of P2P messages received", }, []string{"message_type", "peer_id"}, @@ -204,7 +204,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.p2pMessageLatency = promauto.NewHistogramVec( prometheus.HistogramOpts{ - Name: "bzzz_p2p_message_latency_seconds", + Name: "chorus_p2p_message_latency_seconds", Help: "P2P message round-trip latency", Buckets: config.LatencyBuckets, }, @@ -214,7 +214,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { // DHT metrics m.dhtPutOperations = promauto.NewCounterVec( prometheus.CounterOpts{ - Name: "bzzz_dht_put_operations_total", + Name: "chorus_dht_put_operations_total", Help: "Total number of DHT put operations", }, []string{"status"}, @@ -222,7 +222,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.dhtGetOperations = promauto.NewCounterVec( prometheus.CounterOpts{ - Name: "bzzz_dht_get_operations_total", + Name: "chorus_dht_get_operations_total", Help: "Total number of DHT get operations", }, []string{"status"}, @@ -230,7 +230,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.dhtOperationLatency = promauto.NewHistogramVec( prometheus.HistogramOpts{ - Name: "bzzz_dht_operation_latency_seconds", + Name: "chorus_dht_operation_latency_seconds", Help: "DHT operation latency", Buckets: config.LatencyBuckets, }, @@ -239,21 +239,21 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.dhtProviderRecords = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "bzzz_dht_provider_records", + Name: "chorus_dht_provider_records", Help: "Number of DHT provider records", }, ) m.dhtContentKeys = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "bzzz_dht_content_keys", + Name: "chorus_dht_content_keys", Help: "Number of DHT content keys", }, ) m.dhtReplicationFactor = promauto.NewGaugeVec( prometheus.GaugeOpts{ - Name: "bzzz_dht_replication_factor", + Name: "chorus_dht_replication_factor", Help: "DHT replication factor by key", }, []string{"key_hash"}, @@ -262,14 +262,14 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { // PubSub metrics m.pubsubTopics = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "bzzz_pubsub_topics", + Name: "chorus_pubsub_topics", Help: "Number of active PubSub topics", }, ) m.pubsubMessages = promauto.NewCounterVec( prometheus.CounterOpts{ - Name: "bzzz_pubsub_messages_total", + Name: "chorus_pubsub_messages_total", Help: "Total number of PubSub messages", }, []string{"topic", "direction", "message_type"}, @@ -277,7 +277,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.pubsubMessageLatency = promauto.NewHistogramVec( prometheus.HistogramOpts{ - Name: "bzzz_pubsub_message_latency_seconds", + Name: "chorus_pubsub_message_latency_seconds", Help: "PubSub message latency", Buckets: config.LatencyBuckets, }, @@ -287,14 +287,14 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { // Election metrics m.electionTerm = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "bzzz_election_term", + Name: "chorus_election_term", Help: "Current election term", }, ) m.electionState = promauto.NewGaugeVec( prometheus.GaugeOpts{ - Name: "bzzz_election_state", + Name: "chorus_election_state", Help: "Current election state (1 for active state)", }, []string{"state"}, @@ -302,21 +302,21 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.heartbeatsSent = promauto.NewCounter( prometheus.CounterOpts{ - Name: "bzzz_heartbeats_sent_total", + Name: "chorus_heartbeats_sent_total", Help: "Total number of heartbeats sent", }, ) m.heartbeatsReceived = promauto.NewCounter( prometheus.CounterOpts{ - Name: "bzzz_heartbeats_received_total", + Name: "chorus_heartbeats_received_total", Help: "Total number of heartbeats received", }, ) m.leadershipChanges = promauto.NewCounter( prometheus.CounterOpts{ - Name: "bzzz_leadership_changes_total", + Name: "chorus_leadership_changes_total", Help: "Total number of leadership changes", }, ) @@ -324,7 +324,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { // Health metrics m.healthChecksPassed = promauto.NewCounterVec( prometheus.CounterOpts{ - Name: "bzzz_health_checks_passed_total", + Name: "chorus_health_checks_passed_total", Help: "Total number of health checks passed", }, []string{"check_name"}, @@ -332,7 +332,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.healthChecksFailed = promauto.NewCounterVec( prometheus.CounterOpts{ - Name: "bzzz_health_checks_failed_total", + Name: "chorus_health_checks_failed_total", Help: "Total number of health checks failed", }, []string{"check_name", "reason"}, @@ -340,14 +340,14 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.systemHealthScore = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "bzzz_system_health_score", + Name: "chorus_system_health_score", Help: "Overall system health score (0-1)", }, ) m.componentHealthScore = promauto.NewGaugeVec( prometheus.GaugeOpts{ - Name: "bzzz_component_health_score", + Name: "chorus_component_health_score", Help: "Component health score (0-1)", }, []string{"component"}, @@ -356,21 +356,21 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { // Task metrics m.tasksActive = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "bzzz_tasks_active", + Name: "chorus_tasks_active", Help: "Number of active tasks", }, ) m.tasksQueued = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "bzzz_tasks_queued", + Name: "chorus_tasks_queued", Help: "Number of queued tasks", }, ) m.tasksCompleted = promauto.NewCounterVec( prometheus.CounterOpts{ - Name: "bzzz_tasks_completed_total", + Name: "chorus_tasks_completed_total", Help: "Total number of completed tasks", }, []string{"status", "task_type"}, @@ -378,7 +378,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.taskDuration = promauto.NewHistogramVec( prometheus.HistogramOpts{ - Name: "bzzz_task_duration_seconds", + Name: "chorus_task_duration_seconds", Help: "Task execution duration", Buckets: config.LatencyBuckets, }, @@ -388,7 +388,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { // SLURP metrics m.slurpGenerated = promauto.NewCounterVec( prometheus.CounterOpts{ - Name: "bzzz_slurp_contexts_generated_total", + Name: "chorus_slurp_contexts_generated_total", Help: "Total number of contexts generated by SLURP", }, []string{"role", "status"}, @@ -396,7 +396,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.slurpGenerationTime = promauto.NewHistogram( prometheus.HistogramOpts{ - Name: "bzzz_slurp_generation_time_seconds", + Name: "chorus_slurp_generation_time_seconds", Help: "SLURP context generation time", Buckets: []float64{0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0, 60.0, 120.0}, }, @@ -404,7 +404,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.slurpQueueLength = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "bzzz_slurp_queue_length", + Name: "chorus_slurp_queue_length", Help: "Length of SLURP generation queue", }, ) @@ -412,7 +412,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { // UCXI metrics m.ucxiRequests = promauto.NewCounterVec( prometheus.CounterOpts{ - Name: "bzzz_ucxi_requests_total", + Name: "chorus_ucxi_requests_total", Help: "Total number of UCXI requests", }, []string{"method", "status"}, @@ -420,7 +420,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.ucxiResolutionLatency = promauto.NewHistogram( prometheus.HistogramOpts{ - Name: "bzzz_ucxi_resolution_latency_seconds", + Name: "chorus_ucxi_resolution_latency_seconds", Help: "UCXI address resolution latency", Buckets: config.LatencyBuckets, }, @@ -429,21 +429,21 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { // Resource metrics m.cpuUsage = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "bzzz_cpu_usage_ratio", + Name: "chorus_cpu_usage_ratio", Help: "CPU usage ratio (0-1)", }, ) m.memoryUsage = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "bzzz_memory_usage_bytes", + Name: "chorus_memory_usage_bytes", Help: "Memory usage in bytes", }, ) m.diskUsage = promauto.NewGaugeVec( prometheus.GaugeOpts{ - Name: "bzzz_disk_usage_ratio", + Name: "chorus_disk_usage_ratio", Help: "Disk usage ratio (0-1)", }, []string{"mount_point"}, @@ -451,7 +451,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.goroutines = promauto.NewGauge( prometheus.GaugeOpts{ - Name: "bzzz_goroutines", + Name: "chorus_goroutines", Help: "Number of goroutines", }, ) @@ -459,7 +459,7 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { // Error metrics m.errors = promauto.NewCounterVec( prometheus.CounterOpts{ - Name: "bzzz_errors_total", + Name: "chorus_errors_total", Help: "Total number of errors", }, []string{"component", "error_type"}, @@ -467,20 +467,20 @@ func (m *BZZZMetrics) initializeMetrics(config *MetricsConfig) { m.panics = promauto.NewCounter( prometheus.CounterOpts{ - Name: "bzzz_panics_total", + Name: "chorus_panics_total", Help: "Total number of panics", }, ) } // registerMetrics registers all metrics with the registry -func (m *BZZZMetrics) registerMetrics() { +func (m *CHORUSMetrics) registerMetrics() { // All metrics are auto-registered with the default registry // For custom registry, we would need to register manually } // StartServer starts the Prometheus metrics HTTP server -func (m *BZZZMetrics) StartServer(config *MetricsConfig) error { +func (m *CHORUSMetrics) StartServer(config *MetricsConfig) error { mux := http.NewServeMux() // Use custom registry @@ -511,7 +511,7 @@ func (m *BZZZMetrics) StartServer(config *MetricsConfig) error { } // StopServer stops the metrics HTTP server -func (m *BZZZMetrics) StopServer() error { +func (m *CHORUSMetrics) StopServer() error { if m.httpServer != nil { ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) defer cancel() @@ -522,69 +522,69 @@ func (m *BZZZMetrics) StopServer() error { // P2P Metrics Methods -func (m *BZZZMetrics) SetConnectedPeers(count int) { +func (m *CHORUSMetrics) SetConnectedPeers(count int) { m.p2pConnectedPeers.Set(float64(count)) } -func (m *BZZZMetrics) IncrementMessagesSent(messageType, peerID string) { +func (m *CHORUSMetrics) IncrementMessagesSent(messageType, peerID string) { m.p2pMessagesSent.WithLabelValues(messageType, peerID).Inc() } -func (m *BZZZMetrics) IncrementMessagesReceived(messageType, peerID string) { +func (m *CHORUSMetrics) IncrementMessagesReceived(messageType, peerID string) { m.p2pMessagesReceived.WithLabelValues(messageType, peerID).Inc() } -func (m *BZZZMetrics) ObserveMessageLatency(messageType string, latency time.Duration) { +func (m *CHORUSMetrics) ObserveMessageLatency(messageType string, latency time.Duration) { m.p2pMessageLatency.WithLabelValues(messageType).Observe(latency.Seconds()) } // DHT Metrics Methods -func (m *BZZZMetrics) IncrementDHTPutOperations(status string) { +func (m *CHORUSMetrics) IncrementDHTPutOperations(status string) { m.dhtPutOperations.WithLabelValues(status).Inc() } -func (m *BZZZMetrics) IncrementDHTGetOperations(status string) { +func (m *CHORUSMetrics) IncrementDHTGetOperations(status string) { m.dhtGetOperations.WithLabelValues(status).Inc() } -func (m *BZZZMetrics) ObserveDHTOperationLatency(operation, status string, latency time.Duration) { +func (m *CHORUSMetrics) ObserveDHTOperationLatency(operation, status string, latency time.Duration) { m.dhtOperationLatency.WithLabelValues(operation, status).Observe(latency.Seconds()) } -func (m *BZZZMetrics) SetDHTProviderRecords(count int) { +func (m *CHORUSMetrics) SetDHTProviderRecords(count int) { m.dhtProviderRecords.Set(float64(count)) } -func (m *BZZZMetrics) SetDHTContentKeys(count int) { +func (m *CHORUSMetrics) SetDHTContentKeys(count int) { m.dhtContentKeys.Set(float64(count)) } -func (m *BZZZMetrics) SetDHTReplicationFactor(keyHash string, factor float64) { +func (m *CHORUSMetrics) SetDHTReplicationFactor(keyHash string, factor float64) { m.dhtReplicationFactor.WithLabelValues(keyHash).Set(factor) } // PubSub Metrics Methods -func (m *BZZZMetrics) SetPubSubTopics(count int) { +func (m *CHORUSMetrics) SetPubSubTopics(count int) { m.pubsubTopics.Set(float64(count)) } -func (m *BZZZMetrics) IncrementPubSubMessages(topic, direction, messageType string) { +func (m *CHORUSMetrics) IncrementPubSubMessages(topic, direction, messageType string) { m.pubsubMessages.WithLabelValues(topic, direction, messageType).Inc() } -func (m *BZZZMetrics) ObservePubSubMessageLatency(topic string, latency time.Duration) { +func (m *CHORUSMetrics) ObservePubSubMessageLatency(topic string, latency time.Duration) { m.pubsubMessageLatency.WithLabelValues(topic).Observe(latency.Seconds()) } // Election Metrics Methods -func (m *BZZZMetrics) SetElectionTerm(term int) { +func (m *CHORUSMetrics) SetElectionTerm(term int) { m.electionTerm.Set(float64(term)) } -func (m *BZZZMetrics) SetElectionState(state string) { +func (m *CHORUSMetrics) SetElectionState(state string) { // Reset all state gauges states := []string{"idle", "discovering", "electing", "reconstructing", "complete"} for _, s := range states { @@ -594,118 +594,118 @@ func (m *BZZZMetrics) SetElectionState(state string) { m.electionState.WithLabelValues(state).Set(1) } -func (m *BZZZMetrics) IncrementHeartbeatsSent() { +func (m *CHORUSMetrics) IncrementHeartbeatsSent() { m.heartbeatsSent.Inc() } -func (m *BZZZMetrics) IncrementHeartbeatsReceived() { +func (m *CHORUSMetrics) IncrementHeartbeatsReceived() { m.heartbeatsReceived.Inc() } -func (m *BZZZMetrics) IncrementLeadershipChanges() { +func (m *CHORUSMetrics) IncrementLeadershipChanges() { m.leadershipChanges.Inc() } // Health Metrics Methods -func (m *BZZZMetrics) IncrementHealthCheckPassed(checkName string) { +func (m *CHORUSMetrics) IncrementHealthCheckPassed(checkName string) { m.healthChecksPassed.WithLabelValues(checkName).Inc() } -func (m *BZZZMetrics) IncrementHealthCheckFailed(checkName, reason string) { +func (m *CHORUSMetrics) IncrementHealthCheckFailed(checkName, reason string) { m.healthChecksFailed.WithLabelValues(checkName, reason).Inc() } -func (m *BZZZMetrics) SetSystemHealthScore(score float64) { +func (m *CHORUSMetrics) SetSystemHealthScore(score float64) { m.systemHealthScore.Set(score) } -func (m *BZZZMetrics) SetComponentHealthScore(component string, score float64) { +func (m *CHORUSMetrics) SetComponentHealthScore(component string, score float64) { m.componentHealthScore.WithLabelValues(component).Set(score) } // Task Metrics Methods -func (m *BZZZMetrics) SetActiveTasks(count int) { +func (m *CHORUSMetrics) SetActiveTasks(count int) { m.tasksActive.Set(float64(count)) } -func (m *BZZZMetrics) SetQueuedTasks(count int) { +func (m *CHORUSMetrics) SetQueuedTasks(count int) { m.tasksQueued.Set(float64(count)) } -func (m *BZZZMetrics) IncrementTasksCompleted(status, taskType string) { +func (m *CHORUSMetrics) IncrementTasksCompleted(status, taskType string) { m.tasksCompleted.WithLabelValues(status, taskType).Inc() } -func (m *BZZZMetrics) ObserveTaskDuration(taskType, status string, duration time.Duration) { +func (m *CHORUSMetrics) ObserveTaskDuration(taskType, status string, duration time.Duration) { m.taskDuration.WithLabelValues(taskType, status).Observe(duration.Seconds()) } // SLURP Metrics Methods -func (m *BZZZMetrics) IncrementSLURPGenerated(role, status string) { +func (m *CHORUSMetrics) IncrementSLURPGenerated(role, status string) { m.slurpGenerated.WithLabelValues(role, status).Inc() } -func (m *BZZZMetrics) ObserveSLURPGenerationTime(duration time.Duration) { +func (m *CHORUSMetrics) ObserveSLURPGenerationTime(duration time.Duration) { m.slurpGenerationTime.Observe(duration.Seconds()) } -func (m *BZZZMetrics) SetSLURPQueueLength(length int) { +func (m *CHORUSMetrics) SetSLURPQueueLength(length int) { m.slurpQueueLength.Set(float64(length)) } // UCXI Metrics Methods -func (m *BZZZMetrics) IncrementUCXIRequests(method, status string) { +func (m *CHORUSMetrics) IncrementUCXIRequests(method, status string) { m.ucxiRequests.WithLabelValues(method, status).Inc() } -func (m *BZZZMetrics) ObserveUCXIResolutionLatency(latency time.Duration) { +func (m *CHORUSMetrics) ObserveUCXIResolutionLatency(latency time.Duration) { m.ucxiResolutionLatency.Observe(latency.Seconds()) } // Resource Metrics Methods -func (m *BZZZMetrics) SetCPUUsage(usage float64) { +func (m *CHORUSMetrics) SetCPUUsage(usage float64) { m.cpuUsage.Set(usage) } -func (m *BZZZMetrics) SetMemoryUsage(usage float64) { +func (m *CHORUSMetrics) SetMemoryUsage(usage float64) { m.memoryUsage.Set(usage) } -func (m *BZZZMetrics) SetDiskUsage(mountPoint string, usage float64) { +func (m *CHORUSMetrics) SetDiskUsage(mountPoint string, usage float64) { m.diskUsage.WithLabelValues(mountPoint).Set(usage) } -func (m *BZZZMetrics) SetGoroutines(count int) { +func (m *CHORUSMetrics) SetGoroutines(count int) { m.goroutines.Set(float64(count)) } // Error Metrics Methods -func (m *BZZZMetrics) IncrementErrors(component, errorType string) { +func (m *CHORUSMetrics) IncrementErrors(component, errorType string) { m.errors.WithLabelValues(component, errorType).Inc() } -func (m *BZZZMetrics) IncrementPanics() { +func (m *CHORUSMetrics) IncrementPanics() { m.panics.Inc() } // System Metrics Methods -func (m *BZZZMetrics) UpdateSystemInfo(nodeID, version, goVersion, cluster, environment string) { +func (m *CHORUSMetrics) UpdateSystemInfo(nodeID, version, goVersion, cluster, environment string) { m.systemInfo.WithLabelValues(nodeID, version, goVersion, cluster, environment).Set(1) } -func (m *BZZZMetrics) UpdateUptime() { +func (m *CHORUSMetrics) UpdateUptime() { m.uptime.Set(time.Since(m.startTime).Seconds()) } // CollectMetrics starts background metric collection -func (m *BZZZMetrics) CollectMetrics(config *MetricsConfig) { +func (m *CHORUSMetrics) CollectMetrics(config *MetricsConfig) { systemTicker := time.NewTicker(config.SystemMetricsInterval) resourceTicker := time.NewTicker(config.ResourceMetricsInterval) diff --git a/pkg/protocol/uri.go b/pkg/protocol/uri.go index fcf16be..5117c60 100644 --- a/pkg/protocol/uri.go +++ b/pkg/protocol/uri.go @@ -49,7 +49,7 @@ var ( pathPattern = regexp.MustCompile(`^/[a-zA-Z0-9\-_/\.]*$|^$`) // Full URI pattern for validation - bzzzURIPattern = regexp.MustCompile(`^CHORUS://([a-zA-Z0-9\-_*]|any):([a-zA-Z0-9\-_*]|any)@([a-zA-Z0-9\-_*]|any):([a-zA-Z0-9\-_*]|any)(/[a-zA-Z0-9\-_/\.]*)?(\?[^#]*)?(\#.*)?$`) + chorusURIPattern = regexp.MustCompile(`^CHORUS://([a-zA-Z0-9\-_*]|any):([a-zA-Z0-9\-_*]|any)@([a-zA-Z0-9\-_*]|any):([a-zA-Z0-9\-_*]|any)(/[a-zA-Z0-9\-_/\.]*)?(\?[^#]*)?(\#.*)?$`) ) // ParseBzzzURI parses a CHORUS:// URI string into a BzzzURI struct @@ -104,7 +104,7 @@ func ParseBzzzURI(uri string) (*BzzzURI, error) { task := parts[1] // Create BzzzURI instance - bzzzURI := &BzzzURI{ + chorusURI := &BzzzURI{ Agent: agent, Role: role, Project: project, @@ -116,11 +116,11 @@ func ParseBzzzURI(uri string) (*BzzzURI, error) { } // Validate components - if err := bzzzURI.Validate(); err != nil { + if err := chorusURI.Validate(); err != nil { return nil, fmt.Errorf("validation failed: %w", err) } - return bzzzURI, nil + return chorusURI, nil } // Validate validates all components of the BzzzURI @@ -298,7 +298,7 @@ func ValidateBzzzURIString(uri string) error { return fmt.Errorf("empty URI") } - if !bzzzURIPattern.MatchString(uri) { + if !chorusURIPattern.MatchString(uri) { return fmt.Errorf("invalid CHORUS:// URI format") } diff --git a/pkg/slurp/leader/config.go b/pkg/slurp/leader/config.go index db34379..8efda56 100644 --- a/pkg/slurp/leader/config.go +++ b/pkg/slurp/leader/config.go @@ -579,146 +579,146 @@ func (cfg *SLURPLeaderConfig) GetEffectiveConfig() *SLURPLeaderConfig { return &effective } -// ToBaseBZZZConfig converts SLURP leader config to base CHORUS config format -func (cfg *SLURPLeaderConfig) ToBaseBZZZConfig() *config.Config { +// ToBaseCHORUSConfig converts SLURP leader config to base CHORUS config format +func (cfg *SLURPLeaderConfig) ToBaseCHORUSConfig() *config.Config { // TODO: Convert to base CHORUS config structure // This would map SLURP-specific configuration to the existing // CHORUS configuration structure for compatibility - bzzzConfig := &config.Config{ + chorusConfig := &config.Config{ // Map core settings // Map agent settings // Map security settings // etc. } - return bzzzConfig + return chorusConfig } // overrideWithEnvironment applies environment variable overrides to configuration func overrideWithEnvironment(cfg *SLURPLeaderConfig) error { // Core configuration overrides - if val := os.Getenv("BZZZ_NODE_ID"); val != "" { + if val := os.Getenv("CHORUS_NODE_ID"); val != "" { cfg.Core.NodeID = val } - if val := os.Getenv("BZZZ_CLUSTER_ID"); val != "" { + if val := os.Getenv("CHORUS_CLUSTER_ID"); val != "" { cfg.Core.ClusterID = val } - if val := os.Getenv("BZZZ_DATA_DIRECTORY"); val != "" { + if val := os.Getenv("CHORUS_DATA_DIRECTORY"); val != "" { cfg.Core.DataDirectory = val } - if val := os.Getenv("BZZZ_LISTEN_ADDRESS"); val != "" { + if val := os.Getenv("CHORUS_LISTEN_ADDRESS"); val != "" { cfg.Core.ListenAddress = val } - if val := os.Getenv("BZZZ_ADVERTISE_ADDRESS"); val != "" { + if val := os.Getenv("CHORUS_ADVERTISE_ADDRESS"); val != "" { cfg.Core.AdvertiseAddress = val } - if val := os.Getenv("BZZZ_DEBUG_MODE"); val != "" { + if val := os.Getenv("CHORUS_DEBUG_MODE"); val != "" { if debug, err := strconv.ParseBool(val); err == nil { cfg.Core.DebugMode = debug } } - if val := os.Getenv("BZZZ_VERBOSE_LOGGING"); val != "" { + if val := os.Getenv("CHORUS_VERBOSE_LOGGING"); val != "" { if verbose, err := strconv.ParseBool(val); err == nil { cfg.Core.VerboseLogging = verbose } } // Capabilities override - if val := os.Getenv("BZZZ_CAPABILITIES"); val != "" { + if val := os.Getenv("CHORUS_CAPABILITIES"); val != "" { cfg.Core.Capabilities = strings.Split(val, ",") } - if val := os.Getenv("BZZZ_PROJECT_MANAGER_ENABLED"); val != "" { + if val := os.Getenv("CHORUS_PROJECT_MANAGER_ENABLED"); val != "" { if enabled, err := strconv.ParseBool(val); err == nil { cfg.Core.ProjectManagerEnabled = enabled } } - if val := os.Getenv("BZZZ_CONTEXT_CURATION_ENABLED"); val != "" { + if val := os.Getenv("CHORUS_CONTEXT_CURATION_ENABLED"); val != "" { if enabled, err := strconv.ParseBool(val); err == nil { cfg.Core.ContextCurationEnabled = enabled } } // Election configuration overrides - if val := os.Getenv("BZZZ_ELECTION_TIMEOUT"); val != "" { + if val := os.Getenv("CHORUS_ELECTION_TIMEOUT"); val != "" { if duration, err := time.ParseDuration(val); err == nil { cfg.Election.ElectionTimeout = duration } } - if val := os.Getenv("BZZZ_HEARTBEAT_INTERVAL"); val != "" { + if val := os.Getenv("CHORUS_HEARTBEAT_INTERVAL"); val != "" { if duration, err := time.ParseDuration(val); err == nil { cfg.Election.HeartbeatInterval = duration } } - if val := os.Getenv("BZZZ_HEARTBEAT_TIMEOUT"); val != "" { + if val := os.Getenv("CHORUS_HEARTBEAT_TIMEOUT"); val != "" { if duration, err := time.ParseDuration(val); err == nil { cfg.Election.HeartbeatTimeout = duration } } - if val := os.Getenv("BZZZ_MIN_QUORUM_SIZE"); val != "" { + if val := os.Getenv("CHORUS_MIN_QUORUM_SIZE"); val != "" { if size, err := strconv.Atoi(val); err == nil { cfg.Election.MinQuorumSize = size } } - if val := os.Getenv("BZZZ_REQUIRE_QUORUM"); val != "" { + if val := os.Getenv("CHORUS_REQUIRE_QUORUM"); val != "" { if require, err := strconv.ParseBool(val); err == nil { cfg.Election.RequireQuorum = require } } // Context management configuration overrides - if val := os.Getenv("BZZZ_MAX_CONCURRENT_GENERATION"); val != "" { + if val := os.Getenv("CHORUS_MAX_CONCURRENT_GENERATION"); val != "" { if max, err := strconv.Atoi(val); err == nil { cfg.ContextManagement.MaxConcurrentGeneration = max } } - if val := os.Getenv("BZZZ_GENERATION_TIMEOUT"); val != "" { + if val := os.Getenv("CHORUS_GENERATION_TIMEOUT"); val != "" { if duration, err := time.ParseDuration(val); err == nil { cfg.ContextManagement.GenerationTimeout = duration } } - if val := os.Getenv("BZZZ_CONTEXT_CACHE_SIZE"); val != "" { + if val := os.Getenv("CHORUS_CONTEXT_CACHE_SIZE"); val != "" { if size, err := strconv.Atoi(val); err == nil { cfg.ContextManagement.ContextCacheSize = size } } // Health monitoring overrides - if val := os.Getenv("BZZZ_HEALTH_CHECK_INTERVAL"); val != "" { + if val := os.Getenv("CHORUS_HEALTH_CHECK_INTERVAL"); val != "" { if duration, err := time.ParseDuration(val); err == nil { cfg.Health.HealthCheckInterval = duration } } - if val := os.Getenv("BZZZ_HEALTH_CHECK_TIMEOUT"); val != "" { + if val := os.Getenv("CHORUS_HEALTH_CHECK_TIMEOUT"); val != "" { if duration, err := time.ParseDuration(val); err == nil { cfg.Health.HealthCheckTimeout = duration } } // Performance overrides - if val := os.Getenv("BZZZ_WORKER_POOL_SIZE"); val != "" { + if val := os.Getenv("CHORUS_WORKER_POOL_SIZE"); val != "" { if size, err := strconv.Atoi(val); err == nil { cfg.Performance.WorkerPoolSize = size } } - if val := os.Getenv("BZZZ_QUEUE_BUFFER_SIZE"); val != "" { + if val := os.Getenv("CHORUS_QUEUE_BUFFER_SIZE"); val != "" { if size, err := strconv.Atoi(val); err == nil { cfg.Performance.QueueBufferSize = size } } // Observability overrides - if val := os.Getenv("BZZZ_METRICS_ENABLED"); val != "" { + if val := os.Getenv("CHORUS_METRICS_ENABLED"); val != "" { if enabled, err := strconv.ParseBool(val); err == nil { cfg.Observability.MetricsEnabled = enabled } } - if val := os.Getenv("BZZZ_METRICS_PORT"); val != "" { + if val := os.Getenv("CHORUS_METRICS_PORT"); val != "" { if port, err := strconv.Atoi(val); err == nil { cfg.Observability.MetricsPort = port } } - if val := os.Getenv("BZZZ_LOG_LEVEL"); val != "" { + if val := os.Getenv("CHORUS_LOG_LEVEL"); val != "" { cfg.Observability.LogLevel = val } diff --git a/pkg/slurp/leader/enhanced_manager.go b/pkg/slurp/leader/enhanced_manager.go index f297994..48984d3 100644 --- a/pkg/slurp/leader/enhanced_manager.go +++ b/pkg/slurp/leader/enhanced_manager.go @@ -21,7 +21,7 @@ type EnhancedLeaderManager struct { // Enhanced components healthMonitor *SLURPHealthMonitor - metricsCollector *metrics.BZZZMetrics + metricsCollector *metrics.CHORUSMetrics leadershipHistory *LeadershipHistory // Lifecycle management @@ -269,7 +269,7 @@ func NewEnhancedLeaderManager( intelligence intelligence.IntelligenceEngine, storage storage.ContextStore, resolver slurpContext.ContextResolver, - metricsCollector *metrics.BZZZMetrics, + metricsCollector *metrics.CHORUSMetrics, config *EnhancedManagerConfig, ) *EnhancedLeaderManager { if config == nil { diff --git a/pkg/slurp/leader/integration_example.go b/pkg/slurp/leader/integration_example.go index b28a1ea..0fee23b 100644 --- a/pkg/slurp/leader/integration_example.go +++ b/pkg/slurp/leader/integration_example.go @@ -283,7 +283,7 @@ func (sys *SLURPLeaderSystem) initializeElectionSystem(ctx context.Context) erro sys.logger.Debug("Initializing election system") // Convert to base CHORUS config - bzzzConfig := sys.config.ToBaseBZZZConfig() + chorusConfig := sys.config.ToBaseCHORUSConfig() // Create SLURP election configuration slurpElectionConfig := &election.SLURPElectionConfig{ @@ -311,7 +311,7 @@ func (sys *SLURPLeaderSystem) initializeElectionSystem(ctx context.Context) erro // Create SLURP election manager sys.slurpElection = election.NewSLURPElectionManager( ctx, - bzzzConfig, + chorusConfig, sys.host, sys.pubsub, sys.nodeID, diff --git a/pkg/ucxi/server.go b/pkg/ucxi/server.go index bea47ed..da804e2 100644 --- a/pkg/ucxi/server.go +++ b/pkg/ucxi/server.go @@ -839,7 +839,7 @@ func (s *Server) getCollaborationStatus() map[string]interface{} { }, "pubsub": map[string]interface{}{ "topics": map[string]interface{}{ - "bzzz_coordination": "CHORUS/coordination/v1", + "chorus_coordination": "CHORUS/coordination/v1", "hmmm_meta_discussion": "hmmm/meta-discussion/v1", "context_feedback": "CHORUS/context-feedback/v1", }, diff --git a/pubsub/pubsub.go b/pubsub/pubsub.go index e85986d..7a28473 100644 --- a/pubsub/pubsub.go +++ b/pubsub/pubsub.go @@ -21,12 +21,12 @@ type PubSub struct { cancel context.CancelFunc // Topic subscriptions - bzzzTopic *pubsub.Topic + chorusTopic *pubsub.Topic hmmmTopic *pubsub.Topic contextTopic *pubsub.Topic // Message subscriptions - bzzzSub *pubsub.Subscription + chorusSub *pubsub.Subscription hmmmSub *pubsub.Subscription contextSub *pubsub.Subscription @@ -37,7 +37,7 @@ type PubSub struct { dynamicSubsMux sync.RWMutex // Configuration - bzzzTopicName string + chorusTopicName string hmmmTopicName string contextTopicName string @@ -121,14 +121,14 @@ type Message struct { } // NewPubSub creates a new PubSub instance for Bzzz coordination and HMMM meta-discussion -func NewPubSub(ctx context.Context, h host.Host, bzzzTopic, hmmmTopic string) (*PubSub, error) { - return NewPubSubWithLogger(ctx, h, bzzzTopic, hmmmTopic, nil) +func NewPubSub(ctx context.Context, h host.Host, chorusTopic, hmmmTopic string) (*PubSub, error) { + return NewPubSubWithLogger(ctx, h, chorusTopic, hmmmTopic, nil) } // NewPubSubWithLogger creates a new PubSub instance with hypercore logging -func NewPubSubWithLogger(ctx context.Context, h host.Host, bzzzTopic, hmmmTopic string, logger HypercoreLogger) (*PubSub, error) { - if bzzzTopic == "" { - bzzzTopic = "CHORUS/coordination/v1" +func NewPubSubWithLogger(ctx context.Context, h host.Host, chorusTopic, hmmmTopic string, logger HypercoreLogger) (*PubSub, error) { + if chorusTopic == "" { + chorusTopic = "CHORUS/coordination/v1" } if hmmmTopic == "" { hmmmTopic = "hmmm/meta-discussion/v1" @@ -154,7 +154,7 @@ func NewPubSubWithLogger(ctx context.Context, h host.Host, bzzzTopic, hmmmTopic host: h, ctx: pubsubCtx, cancel: cancel, - bzzzTopicName: bzzzTopic, + chorusTopicName: chorusTopic, hmmmTopicName: hmmmTopic, contextTopicName: contextTopic, dynamicTopics: make(map[string]*pubsub.Topic), @@ -173,7 +173,7 @@ func NewPubSubWithLogger(ctx context.Context, h host.Host, bzzzTopic, hmmmTopic go p.handleHmmmMessages() go p.handleContextFeedbackMessages() - fmt.Printf("📡 PubSub initialized - Bzzz: %s, HMMM: %s, Context: %s\n", bzzzTopic, hmmmTopic, contextTopic) + fmt.Printf("📡 PubSub initialized - Bzzz: %s, HMMM: %s, Context: %s\n", chorusTopic, hmmmTopic, contextTopic) return p, nil } @@ -190,17 +190,17 @@ func (p *PubSub) SetContextFeedbackHandler(handler func(msg Message, from peer.I // joinStaticTopics joins the main Bzzz, HMMM, and Context Feedback topics func (p *PubSub) joinStaticTopics() error { // Join Bzzz coordination topic - bzzzTopic, err := p.ps.Join(p.bzzzTopicName) + chorusTopic, err := p.ps.Join(p.chorusTopicName) if err != nil { return fmt.Errorf("failed to join Bzzz topic: %w", err) } - p.bzzzTopic = bzzzTopic + p.chorusTopic = chorusTopic - bzzzSub, err := bzzzTopic.Subscribe() + chorusSub, err := chorusTopic.Subscribe() if err != nil { return fmt.Errorf("failed to subscribe to Bzzz topic: %w", err) } - p.bzzzSub = bzzzSub + p.chorusSub = chorusSub // Join HMMM meta-discussion topic hmmmTopic, err := p.ps.Join(p.hmmmTopicName) @@ -366,8 +366,8 @@ func (p *PubSub) PublishRaw(topicName string, payload []byte) error { // Static topics by name switch topicName { - case p.bzzzTopicName: - return p.bzzzTopic.Publish(p.ctx, payload) + case p.chorusTopicName: + return p.chorusTopic.Publish(p.ctx, payload) case p.hmmmTopicName: return p.hmmmTopic.Publish(p.ctx, payload) case p.contextTopicName: @@ -391,7 +391,7 @@ func (p *PubSub) PublishBzzzMessage(msgType MessageType, data map[string]interfa return fmt.Errorf("failed to marshal message: %w", err) } - return p.bzzzTopic.Publish(p.ctx, msgBytes) + return p.chorusTopic.Publish(p.ctx, msgBytes) } // PublishHmmmMessage publishes a message to the HMMM meta-discussion topic @@ -468,7 +468,7 @@ func (p *PubSub) PublishRoleBasedMessage(msgType MessageType, data map[string]in ProjectUpdate, DeliverableReady: topic = p.hmmmTopic // Use HMMM topic for role-based messages default: - topic = p.bzzzTopic // Default to Bzzz topic + topic = p.chorusTopic // Default to Bzzz topic } return topic.Publish(p.ctx, msgBytes) @@ -521,7 +521,7 @@ type MessageOptions struct { // handleBzzzMessages processes incoming Bzzz coordination messages func (p *PubSub) handleBzzzMessages() { for { - msg, err := p.bzzzSub.Next(p.ctx) + msg, err := p.chorusSub.Next(p.ctx) if err != nil { if p.ctx.Err() != nil { return // Context cancelled @@ -534,13 +534,13 @@ func (p *PubSub) handleBzzzMessages() { continue } - var bzzzMsg Message - if err := json.Unmarshal(msg.Data, &bzzzMsg); err != nil { + var chorusMsg Message + if err := json.Unmarshal(msg.Data, &chorusMsg); err != nil { fmt.Printf("❌ Failed to unmarshal Bzzz message: %v\n", err) continue } - p.processBzzzMessage(bzzzMsg, msg.ReceivedFrom) + p.processBzzzMessage(chorusMsg, msg.ReceivedFrom) } } @@ -768,8 +768,8 @@ func (p *PubSub) processContextFeedbackMessage(msg Message, from peer.ID) { func (p *PubSub) Close() error { p.cancel() - if p.bzzzSub != nil { - p.bzzzSub.Cancel() + if p.chorusSub != nil { + p.chorusSub.Cancel() } if p.hmmmSub != nil { p.hmmmSub.Cancel() @@ -778,8 +778,8 @@ func (p *PubSub) Close() error { p.contextSub.Cancel() } - if p.bzzzTopic != nil { - p.bzzzTopic.Close() + if p.chorusTopic != nil { + p.chorusTopic.Close() } if p.hmmmTopic != nil { p.hmmmTopic.Close() diff --git a/pubsub/pubsub_test.go b/pubsub/pubsub_test.go index 714e711..8ecc16c 100644 --- a/pubsub/pubsub_test.go +++ b/pubsub/pubsub_test.go @@ -7,7 +7,7 @@ import ( // Note: This is a light test around name routing; full pubsub requires network. func TestPublishRaw_NameRouting_NoSubscription(t *testing.T) { // Build a minimal PubSub with names set but no subscriptions. - p := &PubSub{ bzzzTopicName: "CHORUS/coordination/v1", hmmmTopicName: "hmmm/meta-discussion/v1", contextTopicName: "CHORUS/context-feedback/v1" } + p := &PubSub{ chorusTopicName: "CHORUS/coordination/v1", hmmmTopicName: "hmmm/meta-discussion/v1", contextTopicName: "CHORUS/context-feedback/v1" } if err := p.PublishRaw("nonexistent/topic", []byte("{}")); err == nil { t.Fatalf("expected error for unknown topic") }