diff --git a/docker/Dockerfile b/docker/Dockerfile index e361893..a105022 100644 --- a/docker/Dockerfile +++ b/docker/Dockerfile @@ -11,18 +11,18 @@ WORKDIR /build # Copy go mod files first (for better caching) COPY go.mod go.sum ./ -# Download dependencies -RUN go mod download +# Skip go mod download; we rely on vendored deps to avoid local replaces +RUN echo "Using vendored dependencies (skipping go mod download)" # Copy source code COPY . . -# Build the CHORUS binary with mod mode +# Build the CHORUS agent binary with vendored deps RUN CGO_ENABLED=0 GOOS=linux go build \ - -mod=mod \ + -mod=vendor \ -ldflags='-w -s -extldflags "-static"' \ - -o chorus \ - ./cmd/chorus + -o chorus-agent \ + ./cmd/agent # Final minimal runtime image FROM alpine:3.18 @@ -42,8 +42,8 @@ RUN mkdir -p /app/data && \ chown -R chorus:chorus /app # Copy binary from builder stage -COPY --from=builder /build/chorus /app/chorus -RUN chmod +x /app/chorus +COPY --from=builder /build/chorus-agent /app/chorus-agent +RUN chmod +x /app/chorus-agent # Switch to non-root user USER chorus @@ -64,5 +64,5 @@ ENV LOG_LEVEL=info \ CHORUS_HEALTH_PORT=8081 \ CHORUS_P2P_PORT=9000 -# Start CHORUS -ENTRYPOINT ["/app/chorus"] \ No newline at end of file +# Start CHORUS Agent +ENTRYPOINT ["/app/chorus-agent"] diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 150ca5c..da1a998 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -29,8 +29,8 @@ services: - CHORUS_MAX_CONCURRENT_DHT=16 # Limit concurrent DHT queries # Election stability windows (Medium-risk fix 2.1) - - CHORUS_ELECTION_MIN_TERM=30s # Minimum time between elections to prevent churn - - CHORUS_LEADER_MIN_TERM=45s # Minimum time before challenging healthy leader + - CHORUS_ELECTION_MIN_TERM=120s # Minimum time between elections to prevent churn + - CHORUS_LEADER_MIN_TERM=240s # Minimum time before challenging healthy leader # Assignment system for runtime configuration (Medium-risk fix 2.2) - ASSIGN_URL=${ASSIGN_URL:-} # Optional: WHOOSH assignment endpoint @@ -61,7 +61,7 @@ services: - CHORUS_LIGHTRAG_ENABLED=${CHORUS_LIGHTRAG_ENABLED:-false} - CHORUS_LIGHTRAG_BASE_URL=${CHORUS_LIGHTRAG_BASE_URL:-http://lightrag:9621} - CHORUS_LIGHTRAG_TIMEOUT=${CHORUS_LIGHTRAG_TIMEOUT:-30s} - - CHORUS_LIGHTRAG_API_KEY=${CHORUS_LIGHTRAG_API_KEY:-} + - CHORUS_LIGHTRAG_API_KEY=${CHORUS_LIGHTRAG_API_KEY:-your-secure-api-key-here} - CHORUS_LIGHTRAG_DEFAULT_MODE=${CHORUS_LIGHTRAG_DEFAULT_MODE:-hybrid} # Logging configuration @@ -102,7 +102,7 @@ services: # Container resource limits deploy: mode: replicated - replicas: ${CHORUS_REPLICAS:-9} + replicas: ${CHORUS_REPLICAS:-20} update_config: parallelism: 1 delay: 10s @@ -173,6 +173,8 @@ services: WHOOSH_SERVER_READ_TIMEOUT: "30s" WHOOSH_SERVER_WRITE_TIMEOUT: "30s" WHOOSH_SERVER_SHUTDOWN_TIMEOUT: "30s" + # UI static directory (served at site root by WHOOSH) + WHOOSH_UI_DIR: "/app/ui" # GITEA configuration WHOOSH_GITEA_BASE_URL: https://gitea.chorus.services @@ -217,7 +219,8 @@ services: - jwt_secret - service_tokens - redis_password - # volumes: + volumes: + - whoosh_ui:/app/ui:ro # - /var/run/docker.sock:/var/run/docker.sock # Disabled for agent assignment architecture deploy: replicas: 2 @@ -254,11 +257,11 @@ services: - traefik.enable=true - traefik.docker.network=tengig - traefik.http.routers.whoosh.rule=Host(`whoosh.chorus.services`) + - traefik.http.routers.whoosh.entrypoints=web,web-secured - traefik.http.routers.whoosh.tls=true - traefik.http.routers.whoosh.tls.certresolver=letsencryptresolver - - traefik.http.routers.photoprism.entrypoints=web,web-secured - traefik.http.services.whoosh.loadbalancer.server.port=8080 - - traefik.http.services.photoprism.loadbalancer.passhostheader=true + - traefik.http.services.whoosh.loadbalancer.passhostheader=true - traefik.http.middlewares.whoosh-auth.basicauth.users=admin:$2y$10$example_hash networks: - tengig @@ -414,7 +417,7 @@ services: # REQ: BACKBEAT-REQ-001 - Single BeatFrame publisher per cluster # REQ: BACKBEAT-OPS-001 - One replica prefers leadership backbeat-pulse: - image: anthonyrawlins/backbeat-pulse:v1.0.5 + image: anthonyrawlins/backbeat-pulse:v1.0.6 command: > ./pulse -cluster=chorus-production @@ -581,6 +584,14 @@ services: max-file: "3" tag: "nats/{{.Name}}/{{.ID}}" + watchtower: + image: containrrr/watchtower + volumes: + - /var/run/docker.sock:/var/run/docker.sock + command: --interval 300 --cleanup --revive-stopped --include-stopped + restart: always + + # KACHING services are deployed separately in their own stack # License validation will access https://kaching.chorus.services/api @@ -618,6 +629,12 @@ volumes: type: none o: bind device: /rust/containers/WHOOSH/redis + whoosh_ui: + driver: local + driver_opts: + type: none + o: bind + device: /rust/containers/WHOOSH/ui # Networks for CHORUS communication @@ -652,7 +669,7 @@ secrets: name: whoosh_webhook_token jwt_secret: external: true - name: whoosh_jwt_secret + name: whoosh_jwt_secret_v4 service_tokens: external: true name: whoosh_service_tokens diff --git a/internal/council/manager.go b/internal/council/manager.go new file mode 100644 index 0000000..3e60fe5 --- /dev/null +++ b/internal/council/manager.go @@ -0,0 +1,446 @@ +package council + +import ( + "bytes" + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "hash/fnv" + "math/rand" + "net/http" + "strings" + "sync" + "time" + + "chorus/internal/persona" +) + +// CouncilOpportunity represents a council formation opportunity from WHOOSH. +type CouncilOpportunity struct { + CouncilID string `json:"council_id"` + ProjectName string `json:"project_name"` + Repository string `json:"repository"` + ProjectBrief string `json:"project_brief"` + CoreRoles []CouncilRole `json:"core_roles"` + OptionalRoles []CouncilRole `json:"optional_roles"` + UCXLAddress string `json:"ucxl_address"` + FormationDeadline time.Time `json:"formation_deadline"` + CreatedAt time.Time `json:"created_at"` + Metadata map[string]interface{} `json:"metadata"` +} + +// CouncilRole represents a single role available within a council. +type CouncilRole struct { + RoleName string `json:"role_name"` + AgentName string `json:"agent_name"` + Required bool `json:"required"` + RequiredSkills []string `json:"required_skills"` + Description string `json:"description"` +} + +// RoleProfile mirrors WHOOSH role profile metadata included in claim responses. +type RoleProfile struct { + RoleName string `json:"role_name"` + DisplayName string `json:"display_name"` + PromptKey string `json:"prompt_key"` + PromptPack string `json:"prompt_pack"` + Capabilities []string `json:"capabilities"` + BriefRoutingHint string `json:"brief_routing_hint"` + DefaultBriefOwner bool `json:"default_brief_owner"` +} + +// CouncilBrief carries the high-level brief metadata for an activated council. +type CouncilBrief struct { + CouncilID string `json:"council_id"` + RoleName string `json:"role_name"` + ProjectName string `json:"project_name"` + Repository string `json:"repository"` + Summary string `json:"summary"` + BriefURL string `json:"brief_url"` + IssueID *int64 `json:"issue_id"` + UCXLAddress string `json:"ucxl_address"` + ExpectedArtifacts []string `json:"expected_artifacts"` + HMMMTopic string `json:"hmmm_topic"` +} + +// RoleAssignment keeps track of the agent's current council engagement. +type RoleAssignment struct { + CouncilID string + RoleName string + UCXLAddress string + AssignedAt time.Time + Profile RoleProfile + Brief *CouncilBrief + Persona *persona.Persona + PersonaHash string +} + +var ErrRoleConflict = errors.New("council role already claimed") + +const defaultModelProvider = "ollama" + +// Manager handles council opportunity evaluation, persona preparation, and brief handoff. +type Manager struct { + agentID string + agentName string + endpoint string + p2pAddr string + capabilities []string + + httpClient *http.Client + personaLoader *persona.Loader + + mu sync.Mutex + currentAssignment *RoleAssignment +} + +// NewManager creates a new council manager. +func NewManager(agentID, agentName, endpoint, p2pAddr string, capabilities []string) *Manager { + loader, err := persona.NewLoader() + if err != nil { + fmt.Printf("⚠️ Persona loader initialisation failed: %v\n", err) + } + + return &Manager{ + agentID: agentID, + agentName: agentName, + endpoint: endpoint, + p2pAddr: p2pAddr, + capabilities: capabilities, + httpClient: &http.Client{Timeout: 10 * time.Second}, + personaLoader: loader, + } +} + +// AgentID returns the agent's identifier. +func (m *Manager) AgentID() string { + return m.agentID +} + +// EvaluateOpportunity analyzes a council opportunity and decides whether to claim a role. +func (m *Manager) EvaluateOpportunity(opportunity *CouncilOpportunity, whooshEndpoint string) error { + fmt.Printf("\n🤔 Evaluating council opportunity for: %s\n", opportunity.ProjectName) + + if current := m.currentAssignmentSnapshot(); current != nil { + fmt.Printf(" ℹ️ Agent already assigned to council %s as %s; skipping new claims\n", current.CouncilID, current.RoleName) + return nil + } + + const maxAttempts = 10 + const retryDelay = 3 * time.Second + + var attemptedAtLeastOne bool + + for attempt := 1; attempt <= maxAttempts; attempt++ { + assignment, attemptedCore, err := m.tryClaimRoles(opportunity.CoreRoles, opportunity, whooshEndpoint, "CORE") + attemptedAtLeastOne = attemptedAtLeastOne || attemptedCore + if assignment != nil { + m.setCurrentAssignment(assignment) + return nil + } + if err != nil && !errors.Is(err, ErrRoleConflict) { + return err + } + + assignment, attemptedOptional, err := m.tryClaimRoles(opportunity.OptionalRoles, opportunity, whooshEndpoint, "OPTIONAL") + attemptedAtLeastOne = attemptedAtLeastOne || attemptedOptional + if assignment != nil { + m.setCurrentAssignment(assignment) + return nil + } + if err != nil && !errors.Is(err, ErrRoleConflict) { + return err + } + + if !attemptedAtLeastOne { + fmt.Printf(" ✗ No suitable roles found for this agent\n\n") + return nil + } + + fmt.Printf(" ↻ Attempt %d did not secure a council role; retrying in %s...\n", attempt, retryDelay) + time.Sleep(retryDelay) + } + + return fmt.Errorf("exhausted council role claim attempts for council %s", opportunity.CouncilID) +} + +func (m *Manager) tryClaimRoles(roles []CouncilRole, opportunity *CouncilOpportunity, whooshEndpoint string, roleType string) (*RoleAssignment, bool, error) { + var attempted bool + + // Shuffle roles deterministically per agent+council to reduce herd on the first role + shuffled := append([]CouncilRole(nil), roles...) + if len(shuffled) > 1 { + h := fnv.New64a() + _, _ = h.Write([]byte(m.agentID)) + _, _ = h.Write([]byte(opportunity.CouncilID)) + seed := int64(h.Sum64()) + r := rand.New(rand.NewSource(seed)) + r.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] }) + } + + for _, role := range shuffled { + if !m.shouldClaimRole(role, opportunity) { + continue + } + + attempted = true + fmt.Printf(" ✓ Attempting to claim %s role: %s (%s)\n", roleType, role.AgentName, role.RoleName) + + assignment, err := m.claimRole(opportunity, role, whooshEndpoint) + if assignment != nil { + return assignment, attempted, nil + } + + if errors.Is(err, ErrRoleConflict) { + fmt.Printf(" ⚠️ Role %s already claimed by another agent, trying next role...\n", role.RoleName) + continue + } + + if err != nil { + return nil, attempted, err + } + } + + return nil, attempted, nil +} + +func (m *Manager) shouldClaimRole(role CouncilRole, _ *CouncilOpportunity) bool { + if m.hasActiveAssignment() { + return false + } + // TODO: implement capability-based selection. For now, opportunistically claim any available role. + return true +} + +func (m *Manager) claimRole(opportunity *CouncilOpportunity, role CouncilRole, whooshEndpoint string) (*RoleAssignment, error) { + claimURL := fmt.Sprintf("%s/api/v1/councils/%s/claims", strings.TrimRight(whooshEndpoint, "/"), opportunity.CouncilID) + + claim := map[string]interface{}{ + "agent_id": m.agentID, + "agent_name": m.agentName, + "role_name": role.RoleName, + "capabilities": m.capabilities, + "confidence": 0.75, // TODO: calculate based on capability match quality. + "reasoning": fmt.Sprintf("Agent has capabilities matching role: %s", role.RoleName), + "endpoint": m.endpoint, + "p2p_addr": m.p2pAddr, + } + + payload, err := json.Marshal(claim) + if err != nil { + return nil, fmt.Errorf("failed to marshal claim: %w", err) + } + + req, err := http.NewRequest(http.MethodPost, claimURL, bytes.NewBuffer(payload)) + if err != nil { + return nil, fmt.Errorf("failed to create claim request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := m.httpClient.Do(req) + if err != nil { + return nil, fmt.Errorf("failed to send claim: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated { + var errorResp map[string]interface{} + _ = json.NewDecoder(resp.Body).Decode(&errorResp) + + if resp.StatusCode == http.StatusConflict { + reason := "role already claimed" + if msg, ok := errorResp["error"].(string); ok && msg != "" { + reason = msg + } + return nil, fmt.Errorf("%w: %s", ErrRoleConflict, reason) + } + + return nil, fmt.Errorf("claim rejected (status %d): %v", resp.StatusCode, errorResp) + } + + var claimResp roleClaimResponse + if err := json.NewDecoder(resp.Body).Decode(&claimResp); err != nil { + return nil, fmt.Errorf("failed to decode claim response: %w", err) + } + + assignment := &RoleAssignment{ + CouncilID: opportunity.CouncilID, + RoleName: role.RoleName, + UCXLAddress: claimResp.UCXLAddress, + Profile: claimResp.RoleProfile, + } + + if t, err := time.Parse(time.RFC3339, claimResp.AssignedAt); err == nil { + assignment.AssignedAt = t + } + + if claimResp.CouncilBrief != nil { + assignment.Brief = claimResp.CouncilBrief + } + + fmt.Printf("\n✅ ROLE CLAIM ACCEPTED!\n") + fmt.Printf(" Council ID: %s\n", opportunity.CouncilID) + fmt.Printf(" Role: %s (%s)\n", role.AgentName, role.RoleName) + fmt.Printf(" UCXL: %s\n", assignment.UCXLAddress) + fmt.Printf(" Assigned At: %s\n", claimResp.AssignedAt) + + if err := m.preparePersonaAndAck(opportunity.CouncilID, role.RoleName, &assignment.Profile, claimResp.CouncilBrief, whooshEndpoint, assignment); err != nil { + fmt.Printf(" ⚠️ Persona preparation encountered an issue: %v\n", err) + } + + fmt.Printf("\n") + return assignment, nil +} + +func (m *Manager) preparePersonaAndAck(councilID, roleName string, profile *RoleProfile, brief *CouncilBrief, whooshEndpoint string, assignment *RoleAssignment) error { + if m.personaLoader == nil { + return m.sendPersonaAck(councilID, roleName, whooshEndpoint, nil, "", "failed", []string{"persona loader unavailable"}) + } + + promptKey := profile.PromptKey + if promptKey == "" { + promptKey = roleName + } + + personaCapabilities := profile.Capabilities + personaCapabilities = append([]string{}, personaCapabilities...) + + personaEntry, err := m.personaLoader.Compose(promptKey, profile.DisplayName, "", personaCapabilities) + if err != nil { + return m.sendPersonaAck(councilID, roleName, whooshEndpoint, nil, "", "failed", []string{err.Error()}) + } + + hash := sha256.Sum256([]byte(personaEntry.SystemPrompt)) + personaHash := hex.EncodeToString(hash[:]) + + assignment.Persona = personaEntry + assignment.PersonaHash = personaHash + + if err := m.sendPersonaAck(councilID, roleName, whooshEndpoint, personaEntry, personaHash, "loaded", nil); err != nil { + return err + } + + return nil +} + +func (m *Manager) sendPersonaAck(councilID, roleName, whooshEndpoint string, personaEntry *persona.Persona, personaHash string, status string, errs []string) error { + ackURL := fmt.Sprintf("%s/api/v1/councils/%s/roles/%s/personas", strings.TrimRight(whooshEndpoint, "/"), councilID, roleName) + + payload := map[string]interface{}{ + "agent_id": m.agentID, + "status": status, + "model_provider": defaultModelProvider, + "capabilities": m.capabilities, + "metadata": map[string]interface{}{ + "endpoint": m.endpoint, + "p2p_addr": m.p2pAddr, + "agent_name": m.agentName, + }, + } + + if personaEntry != nil { + payload["system_prompt_hash"] = personaHash + payload["model_name"] = personaEntry.Model + if len(personaEntry.Capabilities) > 0 { + payload["capabilities"] = personaEntry.Capabilities + } + } + + if len(errs) > 0 { + payload["errors"] = errs + } + + body, err := json.Marshal(payload) + if err != nil { + return fmt.Errorf("marshal persona ack: %w", err) + } + + req, err := http.NewRequest(http.MethodPost, ackURL, bytes.NewBuffer(body)) + if err != nil { + return fmt.Errorf("create persona ack request: %w", err) + } + req.Header.Set("Content-Type", "application/json") + + resp, err := m.httpClient.Do(req) + if err != nil { + return fmt.Errorf("send persona ack: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + return fmt.Errorf("persona ack rejected with status %d", resp.StatusCode) + } + + fmt.Printf(" 📫 Persona status '%s' acknowledged by WHOOSH\n", status) + return nil +} + +// HandleCouncilBrief records the design brief assigned to this agent once WHOOSH dispatches it. +func (m *Manager) HandleCouncilBrief(councilID, roleName string, brief *CouncilBrief) { + if brief == nil { + return + } + + m.mu.Lock() + defer m.mu.Unlock() + + if m.currentAssignment == nil { + fmt.Printf("⚠️ Received council brief for %s (%s) but agent has no active assignment\n", councilID, roleName) + return + } + + if m.currentAssignment.CouncilID != councilID || !strings.EqualFold(m.currentAssignment.RoleName, roleName) { + fmt.Printf("⚠️ Received council brief for %s (%s) but agent is assigned to %s (%s)\n", councilID, roleName, m.currentAssignment.CouncilID, m.currentAssignment.RoleName) + return + } + + brief.CouncilID = councilID + brief.RoleName = roleName + m.currentAssignment.Brief = brief + + fmt.Printf("📦 Design brief received for council %s (%s)\n", councilID, roleName) + if brief.BriefURL != "" { + fmt.Printf(" Brief URL: %s\n", brief.BriefURL) + } + if brief.Summary != "" { + fmt.Printf(" Summary: %s\n", brief.Summary) + } + if len(brief.ExpectedArtifacts) > 0 { + fmt.Printf(" Expected Artifacts: %v\n", brief.ExpectedArtifacts) + } + if brief.HMMMTopic != "" { + fmt.Printf(" HMMM Topic: %s\n", brief.HMMMTopic) + } +} + +func (m *Manager) hasActiveAssignment() bool { + m.mu.Lock() + defer m.mu.Unlock() + return m.currentAssignment != nil +} + +func (m *Manager) setCurrentAssignment(assignment *RoleAssignment) { + m.mu.Lock() + defer m.mu.Unlock() + m.currentAssignment = assignment +} + +func (m *Manager) currentAssignmentSnapshot() *RoleAssignment { + m.mu.Lock() + defer m.mu.Unlock() + return m.currentAssignment +} + +// roleClaimResponse mirrors WHOOSH role claim response payload. +type roleClaimResponse struct { + Status string `json:"status"` + CouncilID string `json:"council_id"` + RoleName string `json:"role_name"` + UCXLAddress string `json:"ucxl_address"` + AssignedAt string `json:"assigned_at"` + RoleProfile RoleProfile `json:"role_profile"` + CouncilBrief *CouncilBrief `json:"council_brief"` + PersonaStatus string `json:"persona_status"` +}