3 Commits

Author SHA1 Message Date
anthonyrawlins
f7130b327c feat: Implement council brief processing loop for task execution
Add processBriefs() polling loop that checks for assigned council briefs
and executes them using the ExecutionEngine infrastructure.

Changes:
- Add GetCurrentAssignment() public method to council.Manager
- Make HTTPServer.CouncilManager public for brief access
- Add processBriefs() 15-second polling loop in agent_support.go
- Add executeBrief() to initialize and run ExecutionEngine
- Add buildExecutionRequest() to convert briefs to execution requests
- Add uploadResults() to send completed work to WHOOSH
- Wire processBriefs() into StartAgentMode() as background goroutine

This addresses the root cause of task execution not happening: briefs
were being stored but never polled or executed. The execution
infrastructure (ExecutionEngine, AI providers, prompt system) was
complete but not connected to the council workflow.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-11 12:15:49 +11:00
anthonyrawlins
7381137db5 feat(chorus): run chorus-agent (replace deprecated wrapper); deterministic council role-claim shuffle; compose: WHOOSH UI env + Traefik label fixes + rotated JWT secret 2025-10-08 23:52:06 +11:00
anthonyrawlins
9f480986fa Deprecate Alpine-based Dockerfile to prevent glibc compatibility issues
Changes:
- Renamed Dockerfile.simple → Dockerfile.simple.DEPRECATED
- Added prominent warning about Alpine/musl libc incompatibility
- Updated Makefile docker-agent target to use Dockerfile.ubuntu
- Added production deployment notes in Makefile
- Updated docker-compose.yml with LightRAG environment variables

Reason:
The chorus-agent binary built with 'make build-agent' is linked against
glibc and cannot run on Alpine's musl libc. This causes the runtime error:
"exec /app/chorus-agent: no such file or directory"

Production deployments MUST use Dockerfile.ubuntu for glibc compatibility.

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-01 08:49:37 +10:00
7 changed files with 1009 additions and 29 deletions

View File

@@ -1,3 +1,19 @@
# ⚠️ DEPRECATED: DO NOT USE THIS DOCKERFILE ⚠️
#
# This Alpine-based Dockerfile is INCOMPATIBLE with the chorus-agent binary
# built by 'make build-agent'. The binary is compiled with glibc dependencies
# and will NOT run on Alpine's musl libc.
#
# ERROR when used: "exec /app/chorus-agent: no such file or directory"
#
# ✅ USE Dockerfile.ubuntu INSTEAD
#
# This file is kept for reference only and should not be used for builds.
# Last failed: 2025-10-01
# Reason: Alpine musl libc incompatibility with glibc-linked binary
#
# -------------------------------------------------------------------
# CHORUS - Simple Docker image using pre-built binary
FROM alpine:3.18

View File

@@ -90,10 +90,13 @@ run-hap: build-hap
./$(BUILD_DIR)/$(BINARY_NAME_HAP)
# Docker builds
# NOTE: Always use Dockerfile.ubuntu for production builds!
# Dockerfile.simple.DEPRECATED uses Alpine which is incompatible with glibc-linked binaries
.PHONY: docker-agent
docker-agent:
@echo "🐳 Building Docker image for CHORUS agent..."
docker build -f docker/Dockerfile.agent -t chorus-agent:$(VERSION) .
docker build -f Dockerfile.ubuntu -t chorus-agent:$(VERSION) .
@echo "⚠️ IMPORTANT: Production images MUST use Dockerfile.ubuntu (glibc compatibility)"
.PHONY: docker-hap
docker-hap:

View File

@@ -4,10 +4,15 @@ import (
"encoding/json"
"fmt"
"net/http"
"os"
"strconv"
"strings"
"time"
"chorus/internal/council"
"chorus/internal/logging"
"chorus/p2p"
"chorus/pkg/config"
"chorus/pubsub"
"github.com/gorilla/mux"
@@ -15,19 +20,96 @@ import (
// HTTPServer provides HTTP API endpoints for CHORUS
type HTTPServer struct {
port int
hypercoreLog *logging.HypercoreLog
pubsub *pubsub.PubSub
server *http.Server
port int
hypercoreLog *logging.HypercoreLog
pubsub *pubsub.PubSub
server *http.Server
CouncilManager *council.Manager // Exported for brief processing
whooshEndpoint string
}
// NewHTTPServer creates a new HTTP server for CHORUS API
func NewHTTPServer(port int, hlog *logging.HypercoreLog, ps *pubsub.PubSub) *HTTPServer {
return &HTTPServer{
port: port,
hypercoreLog: hlog,
pubsub: ps,
func NewHTTPServer(cfg *config.Config, node *p2p.Node, hlog *logging.HypercoreLog, ps *pubsub.PubSub) *HTTPServer {
agentID := cfg.Agent.ID
agentName := deriveAgentName(cfg)
endpoint := deriveAgentEndpoint(cfg)
p2pAddr := deriveAgentP2PAddress(cfg, node)
capabilities := cfg.Agent.Capabilities
if len(capabilities) == 0 {
capabilities = []string{"general_development", "task_coordination"}
}
councilMgr := council.NewManager(agentID, agentName, endpoint, p2pAddr, capabilities)
whooshEndpoint := overrideWhooshEndpoint(cfg)
return &HTTPServer{
port: cfg.Network.APIPort,
hypercoreLog: hlog,
pubsub: ps,
CouncilManager: councilMgr,
whooshEndpoint: strings.TrimRight(whooshEndpoint, "/"),
}
}
func deriveAgentName(cfg *config.Config) string {
if v := strings.TrimSpace(os.Getenv("CHORUS_AGENT_NAME")); v != "" {
return v
}
if cfg.Agent.Specialization != "" {
return cfg.Agent.Specialization
}
return cfg.Agent.ID
}
func deriveAgentEndpoint(cfg *config.Config) string {
if v := strings.TrimSpace(os.Getenv("CHORUS_AGENT_ENDPOINT")); v != "" {
return strings.TrimRight(v, "/")
}
host := strings.TrimSpace(os.Getenv("CHORUS_AGENT_SERVICE_HOST"))
if host == "" {
host = "chorus"
}
scheme := strings.TrimSpace(os.Getenv("CHORUS_AGENT_ENDPOINT_SCHEME"))
if scheme == "" {
scheme = "http"
}
return fmt.Sprintf("%s://%s:%d", scheme, host, cfg.Network.APIPort)
}
func deriveAgentP2PAddress(cfg *config.Config, node *p2p.Node) string {
if v := strings.TrimSpace(os.Getenv("CHORUS_AGENT_P2P_ENDPOINT")); v != "" {
return v
}
if node != nil {
addrs := node.Addresses()
if len(addrs) > 0 {
return fmt.Sprintf("%s/p2p/%s", addrs[0], node.ID())
}
}
host := strings.TrimSpace(os.Getenv("CHORUS_AGENT_SERVICE_HOST"))
if host == "" {
host = "chorus"
}
return fmt.Sprintf("%s:%d", host, cfg.Network.P2PPort)
}
func overrideWhooshEndpoint(cfg *config.Config) string {
if v := strings.TrimSpace(os.Getenv("CHORUS_WHOOSH_ENDPOINT")); v != "" {
return strings.TrimRight(v, "/")
}
candidate := cfg.WHOOSHAPI.BaseURL
if candidate == "" {
candidate = cfg.WHOOSHAPI.URL
}
if candidate == "" {
return "http://whoosh:8080"
}
trimmed := strings.TrimRight(candidate, "/")
if strings.Contains(trimmed, "localhost") || strings.Contains(trimmed, "127.0.0.1") {
return "http://whoosh:8080"
}
return trimmed
}
// Start starts the HTTP server
@@ -65,6 +147,12 @@ func (h *HTTPServer) Start() error {
// Status endpoint
api.HandleFunc("/status", h.handleStatus).Methods("GET")
// Council opportunity endpoints (v1)
v1 := api.PathPrefix("/v1").Subrouter()
v1.HandleFunc("/opportunities/council", h.handleCouncilOpportunity).Methods("POST")
v1.HandleFunc("/councils/status", h.handleCouncilStatusUpdate).Methods("POST")
v1.HandleFunc("/councils/{councilID}/roles/{roleName}/brief", h.handleCouncilBrief).Methods("POST")
h.server = &http.Server{
Addr: fmt.Sprintf(":%d", h.port),
Handler: router,
@@ -242,3 +330,209 @@ func (h *HTTPServer) handleStatus(w http.ResponseWriter, r *http.Request) {
json.NewEncoder(w).Encode(status)
}
// handleCouncilOpportunity receives council formation opportunities from WHOOSH
func (h *HTTPServer) handleCouncilOpportunity(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
var opportunity council.CouncilOpportunity
if err := json.NewDecoder(r.Body).Decode(&opportunity); err != nil {
http.Error(w, fmt.Sprintf("Invalid JSON payload: %v", err), http.StatusBadRequest)
return
}
// Log the received opportunity to hypercore
logData := map[string]interface{}{
"event": "council_opportunity_received",
"council_id": opportunity.CouncilID,
"project_name": opportunity.ProjectName,
"repository": opportunity.Repository,
"core_roles": len(opportunity.CoreRoles),
"optional_roles": len(opportunity.OptionalRoles),
"ucxl_address": opportunity.UCXLAddress,
"message": fmt.Sprintf("📡 Received council opportunity for project: %s", opportunity.ProjectName),
}
if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil {
fmt.Printf("Failed to log council opportunity: %v\n", err)
}
// Log to console for immediate visibility
fmt.Printf("\n📡 COUNCIL OPPORTUNITY RECEIVED\n")
fmt.Printf(" Council ID: %s\n", opportunity.CouncilID)
fmt.Printf(" Project: %s\n", opportunity.ProjectName)
fmt.Printf(" Repository: %s\n", opportunity.Repository)
fmt.Printf(" Core Roles: %d\n", len(opportunity.CoreRoles))
fmt.Printf(" Optional Roles: %d\n", len(opportunity.OptionalRoles))
fmt.Printf(" UCXL: %s\n", opportunity.UCXLAddress)
fmt.Printf("\n Available Roles:\n")
for _, role := range opportunity.CoreRoles {
fmt.Printf(" - %s (%s) [CORE]\n", role.AgentName, role.RoleName)
}
for _, role := range opportunity.OptionalRoles {
fmt.Printf(" - %s (%s) [OPTIONAL]\n", role.AgentName, role.RoleName)
}
fmt.Printf("\n")
// Evaluate the opportunity and claim a role if suitable
go func() {
if err := h.CouncilManager.EvaluateOpportunity(&opportunity, h.whooshEndpoint); err != nil {
fmt.Printf("Failed to evaluate/claim council role: %v\n", err)
}
}()
response := map[string]interface{}{
"status": "received",
"council_id": opportunity.CouncilID,
"message": "Council opportunity received and being evaluated",
"timestamp": time.Now().Unix(),
"agent_id": h.CouncilManager.AgentID(),
}
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(response)
}
// handleCouncilStatusUpdate receives council staffing updates from WHOOSH
func (h *HTTPServer) handleCouncilStatusUpdate(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
type roleCountsPayload struct {
Total int `json:"total"`
Claimed int `json:"claimed"`
}
type councilStatusPayload struct {
CouncilID string `json:"council_id"`
ProjectName string `json:"project_name"`
Status string `json:"status"`
Message string `json:"message"`
Timestamp time.Time `json:"timestamp"`
CoreRoles roleCountsPayload `json:"core_roles"`
Optional roleCountsPayload `json:"optional_roles"`
}
var payload councilStatusPayload
if err := json.NewDecoder(r.Body).Decode(&payload); err != nil {
http.Error(w, fmt.Sprintf("Invalid JSON payload: %v", err), http.StatusBadRequest)
return
}
if payload.CouncilID == "" {
http.Error(w, "council_id is required", http.StatusBadRequest)
return
}
if payload.Status == "" {
payload.Status = "unknown"
}
if payload.Timestamp.IsZero() {
payload.Timestamp = time.Now()
}
if payload.Message == "" {
payload.Message = fmt.Sprintf("Council status update: %s (core %d/%d, optional %d/%d)",
payload.Status,
payload.CoreRoles.Claimed, payload.CoreRoles.Total,
payload.Optional.Claimed, payload.Optional.Total,
)
}
logData := map[string]interface{}{
"event": "council_status_update",
"council_id": payload.CouncilID,
"project_name": payload.ProjectName,
"status": payload.Status,
"message": payload.Message,
"timestamp": payload.Timestamp.Format(time.RFC3339),
"core_roles_total": payload.CoreRoles.Total,
"core_roles_claimed": payload.CoreRoles.Claimed,
"optional_roles_total": payload.Optional.Total,
"optional_roles_claimed": payload.Optional.Claimed,
}
if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil {
fmt.Printf("Failed to log council status update: %v\n", err)
}
fmt.Printf("\n🏁 COUNCIL STATUS UPDATE\n")
fmt.Printf(" Council ID: %s\n", payload.CouncilID)
if payload.ProjectName != "" {
fmt.Printf(" Project: %s\n", payload.ProjectName)
}
fmt.Printf(" Status: %s\n", payload.Status)
fmt.Printf(" Core Roles: %d/%d claimed\n", payload.CoreRoles.Claimed, payload.CoreRoles.Total)
fmt.Printf(" Optional Roles: %d/%d claimed\n", payload.Optional.Claimed, payload.Optional.Total)
fmt.Printf(" Message: %s\n\n", payload.Message)
response := map[string]interface{}{
"status": "received",
"council_id": payload.CouncilID,
"timestamp": payload.Timestamp.Unix(),
}
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(response)
}
func (h *HTTPServer) handleCouncilBrief(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
vars := mux.Vars(r)
councilID := vars["councilID"]
roleName := vars["roleName"]
if councilID == "" || roleName == "" {
http.Error(w, "councilID and roleName are required", http.StatusBadRequest)
return
}
var brief council.CouncilBrief
if err := json.NewDecoder(r.Body).Decode(&brief); err != nil {
http.Error(w, fmt.Sprintf("Invalid JSON payload: %v", err), http.StatusBadRequest)
return
}
brief.CouncilID = councilID
brief.RoleName = roleName
fmt.Printf("\n📦 Received council brief for %s (%s)\n", councilID, roleName)
if brief.BriefURL != "" {
fmt.Printf(" Brief URL: %s\n", brief.BriefURL)
}
if brief.Summary != "" {
fmt.Printf(" Summary: %s\n", brief.Summary)
}
if h.CouncilManager != nil {
h.CouncilManager.HandleCouncilBrief(councilID, roleName, &brief)
}
logData := map[string]interface{}{
"event": "council_brief_received",
"council_id": councilID,
"role_name": roleName,
"project_name": brief.ProjectName,
"repository": brief.Repository,
"brief_url": brief.BriefURL,
"ucxl_address": brief.UCXLAddress,
"hmmm_topic": brief.HMMMTopic,
"expected_artifacts": brief.ExpectedArtifacts,
"timestamp": time.Now().Format(time.RFC3339),
}
if _, err := h.hypercoreLog.Append(logging.NetworkEvent, logData); err != nil {
fmt.Printf("Failed to log council brief: %v\n", err)
}
response := map[string]interface{}{
"status": "received",
"council_id": councilID,
"role_name": roleName,
"timestamp": time.Now().Unix(),
}
w.WriteHeader(http.StatusAccepted)
json.NewEncoder(w).Encode(response)
}

View File

@@ -11,18 +11,18 @@ WORKDIR /build
# Copy go mod files first (for better caching)
COPY go.mod go.sum ./
# Download dependencies
RUN go mod download
# Skip go mod download; we rely on vendored deps to avoid local replaces
RUN echo "Using vendored dependencies (skipping go mod download)"
# Copy source code
COPY . .
# Build the CHORUS binary with mod mode
# Build the CHORUS agent binary with vendored deps
RUN CGO_ENABLED=0 GOOS=linux go build \
-mod=mod \
-mod=vendor \
-ldflags='-w -s -extldflags "-static"' \
-o chorus \
./cmd/chorus
-o chorus-agent \
./cmd/agent
# Final minimal runtime image
FROM alpine:3.18
@@ -42,8 +42,8 @@ RUN mkdir -p /app/data && \
chown -R chorus:chorus /app
# Copy binary from builder stage
COPY --from=builder /build/chorus /app/chorus
RUN chmod +x /app/chorus
COPY --from=builder /build/chorus-agent /app/chorus-agent
RUN chmod +x /app/chorus-agent
# Switch to non-root user
USER chorus
@@ -64,5 +64,5 @@ ENV LOG_LEVEL=info \
CHORUS_HEALTH_PORT=8081 \
CHORUS_P2P_PORT=9000
# Start CHORUS
ENTRYPOINT ["/app/chorus"]
# Start CHORUS Agent
ENTRYPOINT ["/app/chorus-agent"]

View File

@@ -29,8 +29,8 @@ services:
- CHORUS_MAX_CONCURRENT_DHT=16 # Limit concurrent DHT queries
# Election stability windows (Medium-risk fix 2.1)
- CHORUS_ELECTION_MIN_TERM=30s # Minimum time between elections to prevent churn
- CHORUS_LEADER_MIN_TERM=45s # Minimum time before challenging healthy leader
- CHORUS_ELECTION_MIN_TERM=120s # Minimum time between elections to prevent churn
- CHORUS_LEADER_MIN_TERM=240s # Minimum time before challenging healthy leader
# Assignment system for runtime configuration (Medium-risk fix 2.2)
- ASSIGN_URL=${ASSIGN_URL:-} # Optional: WHOOSH assignment endpoint
@@ -57,6 +57,13 @@ services:
- CHORUS_MODELS=${CHORUS_MODELS:-meta/llama-3.1-8b-instruct}
- CHORUS_DEFAULT_REASONING_MODEL=${CHORUS_DEFAULT_REASONING_MODEL:-meta/llama-3.1-8b-instruct}
# LightRAG configuration (optional RAG enhancement)
- CHORUS_LIGHTRAG_ENABLED=${CHORUS_LIGHTRAG_ENABLED:-false}
- CHORUS_LIGHTRAG_BASE_URL=${CHORUS_LIGHTRAG_BASE_URL:-http://lightrag:9621}
- CHORUS_LIGHTRAG_TIMEOUT=${CHORUS_LIGHTRAG_TIMEOUT:-30s}
- CHORUS_LIGHTRAG_API_KEY=${CHORUS_LIGHTRAG_API_KEY:-your-secure-api-key-here}
- CHORUS_LIGHTRAG_DEFAULT_MODE=${CHORUS_LIGHTRAG_DEFAULT_MODE:-hybrid}
# Logging configuration
- LOG_LEVEL=${LOG_LEVEL:-info}
- LOG_FORMAT=${LOG_FORMAT:-structured}
@@ -95,7 +102,7 @@ services:
# Container resource limits
deploy:
mode: replicated
replicas: ${CHORUS_REPLICAS:-9}
replicas: ${CHORUS_REPLICAS:-20}
update_config:
parallelism: 1
delay: 10s
@@ -166,6 +173,8 @@ services:
WHOOSH_SERVER_READ_TIMEOUT: "30s"
WHOOSH_SERVER_WRITE_TIMEOUT: "30s"
WHOOSH_SERVER_SHUTDOWN_TIMEOUT: "30s"
# UI static directory (served at site root by WHOOSH)
WHOOSH_UI_DIR: "/app/ui"
# GITEA configuration
WHOOSH_GITEA_BASE_URL: https://gitea.chorus.services
@@ -210,7 +219,8 @@ services:
- jwt_secret
- service_tokens
- redis_password
# volumes:
volumes:
- whoosh_ui:/app/ui:ro
# - /var/run/docker.sock:/var/run/docker.sock # Disabled for agent assignment architecture
deploy:
replicas: 2
@@ -247,11 +257,11 @@ services:
- traefik.enable=true
- traefik.docker.network=tengig
- traefik.http.routers.whoosh.rule=Host(`whoosh.chorus.services`)
- traefik.http.routers.whoosh.entrypoints=web,web-secured
- traefik.http.routers.whoosh.tls=true
- traefik.http.routers.whoosh.tls.certresolver=letsencryptresolver
- traefik.http.routers.photoprism.entrypoints=web,web-secured
- traefik.http.services.whoosh.loadbalancer.server.port=8080
- traefik.http.services.photoprism.loadbalancer.passhostheader=true
- traefik.http.services.whoosh.loadbalancer.passhostheader=true
- traefik.http.middlewares.whoosh-auth.basicauth.users=admin:$2y$10$example_hash
networks:
- tengig
@@ -407,7 +417,7 @@ services:
# REQ: BACKBEAT-REQ-001 - Single BeatFrame publisher per cluster
# REQ: BACKBEAT-OPS-001 - One replica prefers leadership
backbeat-pulse:
image: anthonyrawlins/backbeat-pulse:v1.0.5
image: anthonyrawlins/backbeat-pulse:v1.0.6
command: >
./pulse
-cluster=chorus-production
@@ -574,6 +584,14 @@ services:
max-file: "3"
tag: "nats/{{.Name}}/{{.ID}}"
watchtower:
image: containrrr/watchtower
volumes:
- /var/run/docker.sock:/var/run/docker.sock
command: --interval 300 --cleanup --revive-stopped --include-stopped
restart: always
# KACHING services are deployed separately in their own stack
# License validation will access https://kaching.chorus.services/api
@@ -611,6 +629,12 @@ volumes:
type: none
o: bind
device: /rust/containers/WHOOSH/redis
whoosh_ui:
driver: local
driver_opts:
type: none
o: bind
device: /rust/containers/WHOOSH/ui
# Networks for CHORUS communication
@@ -645,7 +669,7 @@ secrets:
name: whoosh_webhook_token
jwt_secret:
external: true
name: whoosh_jwt_secret
name: whoosh_jwt_secret_v4
service_tokens:
external: true
name: whoosh_service_tokens

451
internal/council/manager.go Normal file
View File

@@ -0,0 +1,451 @@
package council
import (
"bytes"
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"hash/fnv"
"math/rand"
"net/http"
"strings"
"sync"
"time"
"chorus/internal/persona"
)
// CouncilOpportunity represents a council formation opportunity from WHOOSH.
type CouncilOpportunity struct {
CouncilID string `json:"council_id"`
ProjectName string `json:"project_name"`
Repository string `json:"repository"`
ProjectBrief string `json:"project_brief"`
CoreRoles []CouncilRole `json:"core_roles"`
OptionalRoles []CouncilRole `json:"optional_roles"`
UCXLAddress string `json:"ucxl_address"`
FormationDeadline time.Time `json:"formation_deadline"`
CreatedAt time.Time `json:"created_at"`
Metadata map[string]interface{} `json:"metadata"`
}
// CouncilRole represents a single role available within a council.
type CouncilRole struct {
RoleName string `json:"role_name"`
AgentName string `json:"agent_name"`
Required bool `json:"required"`
RequiredSkills []string `json:"required_skills"`
Description string `json:"description"`
}
// RoleProfile mirrors WHOOSH role profile metadata included in claim responses.
type RoleProfile struct {
RoleName string `json:"role_name"`
DisplayName string `json:"display_name"`
PromptKey string `json:"prompt_key"`
PromptPack string `json:"prompt_pack"`
Capabilities []string `json:"capabilities"`
BriefRoutingHint string `json:"brief_routing_hint"`
DefaultBriefOwner bool `json:"default_brief_owner"`
}
// CouncilBrief carries the high-level brief metadata for an activated council.
type CouncilBrief struct {
CouncilID string `json:"council_id"`
RoleName string `json:"role_name"`
ProjectName string `json:"project_name"`
Repository string `json:"repository"`
Summary string `json:"summary"`
BriefURL string `json:"brief_url"`
IssueID *int64 `json:"issue_id"`
UCXLAddress string `json:"ucxl_address"`
ExpectedArtifacts []string `json:"expected_artifacts"`
HMMMTopic string `json:"hmmm_topic"`
}
// RoleAssignment keeps track of the agent's current council engagement.
type RoleAssignment struct {
CouncilID string
RoleName string
UCXLAddress string
AssignedAt time.Time
Profile RoleProfile
Brief *CouncilBrief
Persona *persona.Persona
PersonaHash string
}
var ErrRoleConflict = errors.New("council role already claimed")
const defaultModelProvider = "ollama"
// Manager handles council opportunity evaluation, persona preparation, and brief handoff.
type Manager struct {
agentID string
agentName string
endpoint string
p2pAddr string
capabilities []string
httpClient *http.Client
personaLoader *persona.Loader
mu sync.Mutex
currentAssignment *RoleAssignment
}
// NewManager creates a new council manager.
func NewManager(agentID, agentName, endpoint, p2pAddr string, capabilities []string) *Manager {
loader, err := persona.NewLoader()
if err != nil {
fmt.Printf("⚠️ Persona loader initialisation failed: %v\n", err)
}
return &Manager{
agentID: agentID,
agentName: agentName,
endpoint: endpoint,
p2pAddr: p2pAddr,
capabilities: capabilities,
httpClient: &http.Client{Timeout: 10 * time.Second},
personaLoader: loader,
}
}
// AgentID returns the agent's identifier.
func (m *Manager) AgentID() string {
return m.agentID
}
// EvaluateOpportunity analyzes a council opportunity and decides whether to claim a role.
func (m *Manager) EvaluateOpportunity(opportunity *CouncilOpportunity, whooshEndpoint string) error {
fmt.Printf("\n🤔 Evaluating council opportunity for: %s\n", opportunity.ProjectName)
if current := m.currentAssignmentSnapshot(); current != nil {
fmt.Printf(" Agent already assigned to council %s as %s; skipping new claims\n", current.CouncilID, current.RoleName)
return nil
}
const maxAttempts = 10
const retryDelay = 3 * time.Second
var attemptedAtLeastOne bool
for attempt := 1; attempt <= maxAttempts; attempt++ {
assignment, attemptedCore, err := m.tryClaimRoles(opportunity.CoreRoles, opportunity, whooshEndpoint, "CORE")
attemptedAtLeastOne = attemptedAtLeastOne || attemptedCore
if assignment != nil {
m.setCurrentAssignment(assignment)
return nil
}
if err != nil && !errors.Is(err, ErrRoleConflict) {
return err
}
assignment, attemptedOptional, err := m.tryClaimRoles(opportunity.OptionalRoles, opportunity, whooshEndpoint, "OPTIONAL")
attemptedAtLeastOne = attemptedAtLeastOne || attemptedOptional
if assignment != nil {
m.setCurrentAssignment(assignment)
return nil
}
if err != nil && !errors.Is(err, ErrRoleConflict) {
return err
}
if !attemptedAtLeastOne {
fmt.Printf(" ✗ No suitable roles found for this agent\n\n")
return nil
}
fmt.Printf(" ↻ Attempt %d did not secure a council role; retrying in %s...\n", attempt, retryDelay)
time.Sleep(retryDelay)
}
return fmt.Errorf("exhausted council role claim attempts for council %s", opportunity.CouncilID)
}
func (m *Manager) tryClaimRoles(roles []CouncilRole, opportunity *CouncilOpportunity, whooshEndpoint string, roleType string) (*RoleAssignment, bool, error) {
var attempted bool
// Shuffle roles deterministically per agent+council to reduce herd on the first role
shuffled := append([]CouncilRole(nil), roles...)
if len(shuffled) > 1 {
h := fnv.New64a()
_, _ = h.Write([]byte(m.agentID))
_, _ = h.Write([]byte(opportunity.CouncilID))
seed := int64(h.Sum64())
r := rand.New(rand.NewSource(seed))
r.Shuffle(len(shuffled), func(i, j int) { shuffled[i], shuffled[j] = shuffled[j], shuffled[i] })
}
for _, role := range shuffled {
if !m.shouldClaimRole(role, opportunity) {
continue
}
attempted = true
fmt.Printf(" ✓ Attempting to claim %s role: %s (%s)\n", roleType, role.AgentName, role.RoleName)
assignment, err := m.claimRole(opportunity, role, whooshEndpoint)
if assignment != nil {
return assignment, attempted, nil
}
if errors.Is(err, ErrRoleConflict) {
fmt.Printf(" ⚠️ Role %s already claimed by another agent, trying next role...\n", role.RoleName)
continue
}
if err != nil {
return nil, attempted, err
}
}
return nil, attempted, nil
}
func (m *Manager) shouldClaimRole(role CouncilRole, _ *CouncilOpportunity) bool {
if m.hasActiveAssignment() {
return false
}
// TODO: implement capability-based selection. For now, opportunistically claim any available role.
return true
}
func (m *Manager) claimRole(opportunity *CouncilOpportunity, role CouncilRole, whooshEndpoint string) (*RoleAssignment, error) {
claimURL := fmt.Sprintf("%s/api/v1/councils/%s/claims", strings.TrimRight(whooshEndpoint, "/"), opportunity.CouncilID)
claim := map[string]interface{}{
"agent_id": m.agentID,
"agent_name": m.agentName,
"role_name": role.RoleName,
"capabilities": m.capabilities,
"confidence": 0.75, // TODO: calculate based on capability match quality.
"reasoning": fmt.Sprintf("Agent has capabilities matching role: %s", role.RoleName),
"endpoint": m.endpoint,
"p2p_addr": m.p2pAddr,
}
payload, err := json.Marshal(claim)
if err != nil {
return nil, fmt.Errorf("failed to marshal claim: %w", err)
}
req, err := http.NewRequest(http.MethodPost, claimURL, bytes.NewBuffer(payload))
if err != nil {
return nil, fmt.Errorf("failed to create claim request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := m.httpClient.Do(req)
if err != nil {
return nil, fmt.Errorf("failed to send claim: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusCreated {
var errorResp map[string]interface{}
_ = json.NewDecoder(resp.Body).Decode(&errorResp)
if resp.StatusCode == http.StatusConflict {
reason := "role already claimed"
if msg, ok := errorResp["error"].(string); ok && msg != "" {
reason = msg
}
return nil, fmt.Errorf("%w: %s", ErrRoleConflict, reason)
}
return nil, fmt.Errorf("claim rejected (status %d): %v", resp.StatusCode, errorResp)
}
var claimResp roleClaimResponse
if err := json.NewDecoder(resp.Body).Decode(&claimResp); err != nil {
return nil, fmt.Errorf("failed to decode claim response: %w", err)
}
assignment := &RoleAssignment{
CouncilID: opportunity.CouncilID,
RoleName: role.RoleName,
UCXLAddress: claimResp.UCXLAddress,
Profile: claimResp.RoleProfile,
}
if t, err := time.Parse(time.RFC3339, claimResp.AssignedAt); err == nil {
assignment.AssignedAt = t
}
if claimResp.CouncilBrief != nil {
assignment.Brief = claimResp.CouncilBrief
}
fmt.Printf("\n✅ ROLE CLAIM ACCEPTED!\n")
fmt.Printf(" Council ID: %s\n", opportunity.CouncilID)
fmt.Printf(" Role: %s (%s)\n", role.AgentName, role.RoleName)
fmt.Printf(" UCXL: %s\n", assignment.UCXLAddress)
fmt.Printf(" Assigned At: %s\n", claimResp.AssignedAt)
if err := m.preparePersonaAndAck(opportunity.CouncilID, role.RoleName, &assignment.Profile, claimResp.CouncilBrief, whooshEndpoint, assignment); err != nil {
fmt.Printf(" ⚠️ Persona preparation encountered an issue: %v\n", err)
}
fmt.Printf("\n")
return assignment, nil
}
func (m *Manager) preparePersonaAndAck(councilID, roleName string, profile *RoleProfile, brief *CouncilBrief, whooshEndpoint string, assignment *RoleAssignment) error {
if m.personaLoader == nil {
return m.sendPersonaAck(councilID, roleName, whooshEndpoint, nil, "", "failed", []string{"persona loader unavailable"})
}
promptKey := profile.PromptKey
if promptKey == "" {
promptKey = roleName
}
personaCapabilities := profile.Capabilities
personaCapabilities = append([]string{}, personaCapabilities...)
personaEntry, err := m.personaLoader.Compose(promptKey, profile.DisplayName, "", personaCapabilities)
if err != nil {
return m.sendPersonaAck(councilID, roleName, whooshEndpoint, nil, "", "failed", []string{err.Error()})
}
hash := sha256.Sum256([]byte(personaEntry.SystemPrompt))
personaHash := hex.EncodeToString(hash[:])
assignment.Persona = personaEntry
assignment.PersonaHash = personaHash
if err := m.sendPersonaAck(councilID, roleName, whooshEndpoint, personaEntry, personaHash, "loaded", nil); err != nil {
return err
}
return nil
}
func (m *Manager) sendPersonaAck(councilID, roleName, whooshEndpoint string, personaEntry *persona.Persona, personaHash string, status string, errs []string) error {
ackURL := fmt.Sprintf("%s/api/v1/councils/%s/roles/%s/personas", strings.TrimRight(whooshEndpoint, "/"), councilID, roleName)
payload := map[string]interface{}{
"agent_id": m.agentID,
"status": status,
"model_provider": defaultModelProvider,
"capabilities": m.capabilities,
"metadata": map[string]interface{}{
"endpoint": m.endpoint,
"p2p_addr": m.p2pAddr,
"agent_name": m.agentName,
},
}
if personaEntry != nil {
payload["system_prompt_hash"] = personaHash
payload["model_name"] = personaEntry.Model
if len(personaEntry.Capabilities) > 0 {
payload["capabilities"] = personaEntry.Capabilities
}
}
if len(errs) > 0 {
payload["errors"] = errs
}
body, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("marshal persona ack: %w", err)
}
req, err := http.NewRequest(http.MethodPost, ackURL, bytes.NewBuffer(body))
if err != nil {
return fmt.Errorf("create persona ack request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
resp, err := m.httpClient.Do(req)
if err != nil {
return fmt.Errorf("send persona ack: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
return fmt.Errorf("persona ack rejected with status %d", resp.StatusCode)
}
fmt.Printf(" 📫 Persona status '%s' acknowledged by WHOOSH\n", status)
return nil
}
// HandleCouncilBrief records the design brief assigned to this agent once WHOOSH dispatches it.
func (m *Manager) HandleCouncilBrief(councilID, roleName string, brief *CouncilBrief) {
if brief == nil {
return
}
m.mu.Lock()
defer m.mu.Unlock()
if m.currentAssignment == nil {
fmt.Printf("⚠️ Received council brief for %s (%s) but agent has no active assignment\n", councilID, roleName)
return
}
if m.currentAssignment.CouncilID != councilID || !strings.EqualFold(m.currentAssignment.RoleName, roleName) {
fmt.Printf("⚠️ Received council brief for %s (%s) but agent is assigned to %s (%s)\n", councilID, roleName, m.currentAssignment.CouncilID, m.currentAssignment.RoleName)
return
}
brief.CouncilID = councilID
brief.RoleName = roleName
m.currentAssignment.Brief = brief
fmt.Printf("📦 Design brief received for council %s (%s)\n", councilID, roleName)
if brief.BriefURL != "" {
fmt.Printf(" Brief URL: %s\n", brief.BriefURL)
}
if brief.Summary != "" {
fmt.Printf(" Summary: %s\n", brief.Summary)
}
if len(brief.ExpectedArtifacts) > 0 {
fmt.Printf(" Expected Artifacts: %v\n", brief.ExpectedArtifacts)
}
if brief.HMMMTopic != "" {
fmt.Printf(" HMMM Topic: %s\n", brief.HMMMTopic)
}
}
func (m *Manager) hasActiveAssignment() bool {
m.mu.Lock()
defer m.mu.Unlock()
return m.currentAssignment != nil
}
func (m *Manager) setCurrentAssignment(assignment *RoleAssignment) {
m.mu.Lock()
defer m.mu.Unlock()
m.currentAssignment = assignment
}
func (m *Manager) currentAssignmentSnapshot() *RoleAssignment {
m.mu.Lock()
defer m.mu.Unlock()
return m.currentAssignment
}
// GetCurrentAssignment returns the current role assignment (public accessor)
func (m *Manager) GetCurrentAssignment() *RoleAssignment {
return m.currentAssignmentSnapshot()
}
// roleClaimResponse mirrors WHOOSH role claim response payload.
type roleClaimResponse struct {
Status string `json:"status"`
CouncilID string `json:"council_id"`
RoleName string `json:"role_name"`
UCXLAddress string `json:"ucxl_address"`
AssignedAt string `json:"assigned_at"`
RoleProfile RoleProfile `json:"role_profile"`
CouncilBrief *CouncilBrief `json:"council_brief"`
PersonaStatus string `json:"persona_status"`
}

View File

@@ -1,12 +1,18 @@
package runtime
import (
"bytes"
"context"
"encoding/json"
"fmt"
"net/http"
"time"
"chorus/internal/council"
"chorus/internal/logging"
"chorus/pkg/ai"
"chorus/pkg/dht"
"chorus/pkg/execution"
"chorus/pkg/health"
"chorus/pkg/shutdown"
"chorus/pubsub"
@@ -39,6 +45,10 @@ func (r *SharedRuntime) StartAgentMode() error {
// Start status reporting
go r.statusReporter()
// Start council brief processing
ctx := context.Background()
go r.processBriefs(ctx)
r.Logger.Info("🔍 Listening for peers on container network...")
r.Logger.Info("📡 Ready for task coordination and meta-discussion")
r.Logger.Info("🎯 HMMM collaborative reasoning enabled")
@@ -321,3 +331,185 @@ func (r *SharedRuntime) setupGracefulShutdown(shutdownManager *shutdown.Manager,
r.Logger.Info("🛡️ Graceful shutdown components registered")
}
// processBriefs polls for council briefs and executes them
func (r *SharedRuntime) processBriefs(ctx context.Context) {
ticker := time.NewTicker(15 * time.Second)
defer ticker.Stop()
r.Logger.Info("📦 Brief processing loop started")
for {
select {
case <-ctx.Done():
r.Logger.Info("📦 Brief processing loop stopped")
return
case <-ticker.C:
if r.HTTPServer == nil || r.HTTPServer.CouncilManager == nil {
continue
}
assignment := r.HTTPServer.CouncilManager.GetCurrentAssignment()
if assignment == nil || assignment.Brief == nil {
continue
}
// Check if we have a brief to execute
brief := assignment.Brief
if brief.BriefURL == "" && brief.Summary == "" {
continue
}
r.Logger.Info("📦 Processing design brief for council %s, role %s", assignment.CouncilID, assignment.RoleName)
// Execute the brief
if err := r.executeBrief(ctx, assignment); err != nil {
r.Logger.Error("❌ Failed to execute brief: %v", err)
continue
}
r.Logger.Info("✅ Brief execution completed for council %s", assignment.CouncilID)
// Clear the brief after execution to prevent re-execution
assignment.Brief = nil
}
}
}
// executeBrief executes a council brief using the ExecutionEngine
func (r *SharedRuntime) executeBrief(ctx context.Context, assignment *council.RoleAssignment) error {
brief := assignment.Brief
if brief == nil {
return fmt.Errorf("no brief to execute")
}
// Create execution engine
engine := execution.NewTaskExecutionEngine()
// Create AI provider factory
aiFactory := ai.NewProviderFactory()
engineConfig := &execution.EngineConfig{
AIProviderFactory: aiFactory,
MaxConcurrentTasks: 1,
DefaultTimeout: time.Hour,
EnableMetrics: true,
LogLevel: "info",
}
if err := engine.Initialize(ctx, engineConfig); err != nil {
return fmt.Errorf("failed to initialize execution engine: %w", err)
}
defer engine.Shutdown()
// Build execution request
request := r.buildExecutionRequest(assignment)
r.Logger.Info("🚀 Executing brief for council %s, role %s", assignment.CouncilID, assignment.RoleName)
// Track task
taskID := fmt.Sprintf("council-%s-%s", assignment.CouncilID, assignment.RoleName)
r.TaskTracker.AddTask(taskID)
defer r.TaskTracker.RemoveTask(taskID)
// Execute the task
result, err := engine.ExecuteTask(ctx, request)
if err != nil {
return fmt.Errorf("task execution failed: %w", err)
}
r.Logger.Info("✅ Task execution successful. Output: %s", result.Output)
// Upload results to WHOOSH
if err := r.uploadResults(assignment, result); err != nil {
r.Logger.Error("⚠️ Failed to upload results to WHOOSH: %v", err)
// Don't fail the execution if upload fails
}
return nil
}
// buildExecutionRequest converts a council brief to an execution request
func (r *SharedRuntime) buildExecutionRequest(assignment *council.RoleAssignment) *execution.TaskExecutionRequest {
brief := assignment.Brief
// Build task description from brief
taskDescription := brief.Summary
if taskDescription == "" {
taskDescription = "Execute council brief"
}
// Add additional context
additionalContext := map[string]interface{}{
"council_id": assignment.CouncilID,
"role_name": assignment.RoleName,
"brief_url": brief.BriefURL,
"expected_artifacts": brief.ExpectedArtifacts,
"hmmm_topic": brief.HMMMTopic,
"persona": assignment.Persona,
}
return &execution.TaskExecutionRequest{
ID: fmt.Sprintf("council-%s-%s", assignment.CouncilID, assignment.RoleName),
Type: "council_brief",
Description: taskDescription,
Context: additionalContext,
Requirements: &execution.TaskRequirements{
AIModel: r.Config.AI.Provider,
SandboxType: "docker",
RequiredTools: []string{},
},
Timeout: time.Hour,
}
}
// uploadResults uploads execution results to WHOOSH
func (r *SharedRuntime) uploadResults(assignment *council.RoleAssignment, result *execution.TaskExecutionResult) error {
// Get WHOOSH endpoint from environment or config
whooshEndpoint := r.Config.WHOOSHAPI.BaseURL
if whooshEndpoint == "" {
whooshEndpoint = "http://whoosh:8080"
}
// Build result payload
payload := map[string]interface{}{
"council_id": assignment.CouncilID,
"role_name": assignment.RoleName,
"agent_id": r.Config.Agent.ID,
"ucxl_address": assignment.UCXLAddress,
"output": result.Output,
"artifacts": result.Artifacts,
"success": result.Success,
"error_message": result.ErrorMessage,
"execution_time": result.Metrics.Duration.Seconds(),
"timestamp": time.Now().Unix(),
}
jsonData, err := json.Marshal(payload)
if err != nil {
return fmt.Errorf("failed to marshal result payload: %w", err)
}
// Send to WHOOSH
url := fmt.Sprintf("%s/api/councils/%s/results", whooshEndpoint, assignment.CouncilID)
req, err := http.NewRequest("POST", url, bytes.NewBuffer(jsonData))
if err != nil {
return fmt.Errorf("failed to create HTTP request: %w", err)
}
req.Header.Set("Content-Type", "application/json")
client := &http.Client{Timeout: 30 * time.Second}
resp, err := client.Do(req)
if err != nil {
return fmt.Errorf("failed to send results to WHOOSH: %w", err)
}
defer resp.Body.Close()
if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted {
return fmt.Errorf("WHOOSH returned status %d", resp.StatusCode)
}
r.Logger.Info("📤 Results uploaded to WHOOSH for council %s", assignment.CouncilID)
return nil
}