Files
WHOOSH/internal/orchestrator/agent_deployer.go
Claude Code 9aeaa433fc Fix Docker Swarm discovery network name mismatch
- Changed NetworkName from 'chorus_default' to 'chorus_net'
- This matches the actual network 'CHORUS_chorus_net' (service prefix added automatically)
- Fixes discovered_count:0 issue - now successfully discovering all 25 agents
- Updated IMPLEMENTATION-SUMMARY with deployment status

Result: All 25 CHORUS agents now discovered successfully via Docker Swarm API

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-10-10 10:35:25 +11:00

788 lines
28 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package orchestrator
import (
"context"
"fmt"
"sync"
"time"
"github.com/chorus-services/whoosh/internal/agents"
"github.com/chorus-services/whoosh/internal/composer"
"github.com/chorus-services/whoosh/internal/council"
"github.com/docker/docker/api/types/swarm"
"github.com/google/uuid"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/rs/zerolog/log"
)
// AgentDeployer manages deployment of agent containers for teams
type AgentDeployer struct {
swarmManager *SwarmManager
db *pgxpool.Pool
registry string
ctx context.Context
cancel context.CancelFunc
constraintMu sync.Mutex
}
// NewAgentDeployer creates a new agent deployer
func NewAgentDeployer(swarmManager *SwarmManager, db *pgxpool.Pool, registry string) *AgentDeployer {
ctx, cancel := context.WithCancel(context.Background())
if registry == "" {
registry = "registry.home.deepblack.cloud"
}
return &AgentDeployer{
swarmManager: swarmManager,
db: db,
registry: registry,
ctx: ctx,
cancel: cancel,
}
}
// Close shuts down the agent deployer
func (ad *AgentDeployer) Close() error {
ad.cancel()
return nil
}
// DeploymentRequest represents a request to deploy agents for a team
type DeploymentRequest struct {
TeamID uuid.UUID `json:"team_id"`
TaskID uuid.UUID `json:"task_id"`
TeamComposition *composer.TeamComposition `json:"team_composition"`
TaskContext *TaskContext `json:"task_context"`
DeploymentMode string `json:"deployment_mode"` // immediate, scheduled, manual
}
// DeploymentResult represents the result of a deployment operation
type DeploymentResult struct {
TeamID uuid.UUID `json:"team_id"`
TaskID uuid.UUID `json:"task_id"`
DeployedServices []DeployedService `json:"deployed_services"`
Status string `json:"status"` // success, partial, failed
Message string `json:"message"`
DeployedAt time.Time `json:"deployed_at"`
Errors []string `json:"errors,omitempty"`
}
// DeployedService represents a successfully deployed service
type DeployedService struct {
ServiceID string `json:"service_id"`
ServiceName string `json:"service_name"`
AgentRole string `json:"agent_role"`
AgentID string `json:"agent_id"`
Image string `json:"image"`
Status string `json:"status"`
}
// CouncilDeploymentRequest represents a request to deploy council agents
type CouncilDeploymentRequest struct {
CouncilID uuid.UUID `json:"council_id"`
ProjectName string `json:"project_name"`
CouncilComposition *council.CouncilComposition `json:"council_composition"`
ProjectContext *CouncilProjectContext `json:"project_context"`
DeploymentMode string `json:"deployment_mode"` // immediate, scheduled, manual
}
// CouncilProjectContext contains the project information for council agents
type CouncilProjectContext struct {
ProjectName string `json:"project_name"`
Repository string `json:"repository"`
ProjectBrief string `json:"project_brief"`
Constraints string `json:"constraints,omitempty"`
TechLimits string `json:"tech_limits,omitempty"`
ComplianceNotes string `json:"compliance_notes,omitempty"`
Targets string `json:"targets,omitempty"`
ExternalURL string `json:"external_url,omitempty"`
}
// DeployTeamAgents deploys all agents for a team
func (ad *AgentDeployer) DeployTeamAgents(request *DeploymentRequest) (*DeploymentResult, error) {
log.Info().
Str("team_id", request.TeamID.String()).
Str("task_id", request.TaskID.String()).
Int("agent_matches", len(request.TeamComposition.AgentMatches)).
Msg("🚀 Starting team agent deployment")
result := &DeploymentResult{
TeamID: request.TeamID,
TaskID: request.TaskID,
DeployedServices: []DeployedService{},
DeployedAt: time.Now(),
Errors: []string{},
}
// Deploy each agent in the team composition
for _, agentMatch := range request.TeamComposition.AgentMatches {
service, err := ad.deploySingleAgent(request, agentMatch)
if err != nil {
errorMsg := fmt.Sprintf("Failed to deploy agent %s for role %s: %v",
agentMatch.Agent.Name, agentMatch.Role.Name, err)
result.Errors = append(result.Errors, errorMsg)
log.Error().
Err(err).
Str("agent_id", agentMatch.Agent.ID.String()).
Str("role", agentMatch.Role.Name).
Msg("Failed to deploy agent")
continue
}
deployedService := DeployedService{
ServiceID: service.ID,
ServiceName: service.Spec.Name,
AgentRole: agentMatch.Role.Name,
AgentID: agentMatch.Agent.ID.String(),
Image: service.Spec.TaskTemplate.ContainerSpec.Image,
Status: "deploying",
}
result.DeployedServices = append(result.DeployedServices, deployedService)
// Update database with deployment info
err = ad.recordDeployment(request.TeamID, request.TaskID, agentMatch, service.ID)
if err != nil {
log.Error().
Err(err).
Str("service_id", service.ID).
Msg("Failed to record deployment in database")
}
}
// Determine overall deployment status
if len(result.Errors) == 0 {
result.Status = "success"
result.Message = fmt.Sprintf("Successfully deployed %d agents", len(result.DeployedServices))
} else if len(result.DeployedServices) > 0 {
result.Status = "partial"
result.Message = fmt.Sprintf("Deployed %d/%d agents with %d errors",
len(result.DeployedServices),
len(request.TeamComposition.AgentMatches),
len(result.Errors))
} else {
result.Status = "failed"
result.Message = "Failed to deploy any agents"
}
// Update team deployment status in database
err := ad.updateTeamDeploymentStatus(request.TeamID, result.Status, result.Message)
if err != nil {
log.Error().
Err(err).
Str("team_id", request.TeamID.String()).
Msg("Failed to update team deployment status")
}
log.Info().
Str("team_id", request.TeamID.String()).
Str("status", result.Status).
Int("deployed", len(result.DeployedServices)).
Int("errors", len(result.Errors)).
Msg("✅ Team agent deployment completed")
return result, nil
}
// selectAgentImage determines the appropriate CHORUS image for the agent role
func (ad *AgentDeployer) selectAgentImage(roleName string, agent *composer.Agent) string {
// All agents use the same CHORUS image, but with different configurations
// The image handles role specialization internally based on environment variables
return "docker.io/anthonyrawlins/chorus:backbeat-v2.0.1"
}
// buildAgentEnvironment creates environment variables for CHORUS agent configuration
func (ad *AgentDeployer) buildAgentEnvironment(request *DeploymentRequest, agentMatch *composer.AgentMatch) map[string]string {
env := map[string]string{
// Core CHORUS configuration - just pass the agent name from human-roles.yaml
// CHORUS will handle its own prompt composition and system behavior
"CHORUS_AGENT_NAME": agentMatch.Role.Name, // This maps to human-roles.yaml agent definition
"CHORUS_TEAM_ID": request.TeamID.String(),
"CHORUS_TASK_ID": request.TaskID.String(),
// Essential task context
"CHORUS_PROJECT": request.TaskContext.Repository,
"CHORUS_TASK_TITLE": request.TaskContext.IssueTitle,
"CHORUS_TASK_DESC": request.TaskContext.IssueDescription,
"CHORUS_PRIORITY": request.TaskContext.Priority,
"CHORUS_EXTERNAL_URL": request.TaskContext.ExternalURL,
// WHOOSH coordination
"WHOOSH_COORDINATOR": "true",
"WHOOSH_ENDPOINT": "http://whoosh:8080",
// Docker access for CHORUS sandbox management
"DOCKER_HOST": "unix:///var/run/docker.sock",
}
return env
}
// Note: CHORUS handles its own prompt composition from human-roles.yaml
// We just need to pass the agent name and essential task context
// determineAgentType maps role to agent type for resource allocation
func (ad *AgentDeployer) determineAgentType(agentMatch *composer.AgentMatch) string {
// Simple mapping for now - could be enhanced based on role complexity
return "standard"
}
// calculateResources determines resource requirements for the agent
func (ad *AgentDeployer) calculateResources(agentMatch *composer.AgentMatch) ResourceLimits {
// Standard resource allocation for CHORUS agents
// CHORUS handles its own resource management internally
return ResourceLimits{
CPULimit: 1000000000, // 1 CPU core
MemoryLimit: 1073741824, // 1GB RAM
CPURequest: 500000000, // 0.5 CPU core
MemoryRequest: 536870912, // 512MB RAM
}
}
// buildAgentVolumes creates volume mounts for CHORUS agents
func (ad *AgentDeployer) buildAgentVolumes(request *DeploymentRequest) []VolumeMount {
return []VolumeMount{
{
Type: "bind",
Source: "/var/run/docker.sock",
Target: "/var/run/docker.sock",
ReadOnly: false, // CHORUS needs Docker access for sandboxing
},
{
Type: "volume",
Source: fmt.Sprintf("whoosh-workspace-%s", request.TeamID.String()),
Target: "/workspace",
ReadOnly: false,
},
}
}
// buildAgentPlacement creates placement constraints for agents
func (ad *AgentDeployer) buildAgentPlacement(agentMatch *composer.AgentMatch) PlacementConfig {
return PlacementConfig{
Constraints: []string{
"node.role==worker", // Prefer worker nodes for agent containers
},
// Note: Placement preferences removed for compilation compatibility
}
}
// deploySingleAgent deploys a single agent for a specific role
func (ad *AgentDeployer) deploySingleAgent(request *DeploymentRequest, agentMatch *composer.AgentMatch) (*swarm.Service, error) {
// Determine agent image based on role
image := ad.selectAgentImage(agentMatch.Role.Name, agentMatch.Agent)
// Build deployment configuration
config := &AgentDeploymentConfig{
TeamID: request.TeamID.String(),
TaskID: request.TaskID.String(),
AgentRole: agentMatch.Role.Name,
AgentType: ad.determineAgentType(agentMatch),
Image: image,
Replicas: 1, // Start with single replica per agent
Resources: ad.calculateResources(agentMatch),
Environment: ad.buildAgentEnvironment(request, agentMatch),
TaskContext: *request.TaskContext,
Networks: []string{"chorus_default"},
Volumes: ad.buildAgentVolumes(request),
Placement: ad.buildAgentPlacement(agentMatch),
}
// Deploy the service
service, err := ad.swarmManager.DeployAgent(config)
if err != nil {
return nil, fmt.Errorf("failed to deploy agent service: %w", err)
}
return service, nil
}
// recordDeployment records agent deployment information in the database
func (ad *AgentDeployer) recordDeployment(teamID uuid.UUID, taskID uuid.UUID, agentMatch *composer.AgentMatch, serviceID string) error {
query := `
INSERT INTO agent_deployments (team_id, task_id, agent_id, role_id, service_id, status, deployed_at)
VALUES ($1, $2, $3, $4, $5, $6, NOW())
`
_, err := ad.db.Exec(ad.ctx, query, teamID, taskID, agentMatch.Agent.ID, agentMatch.Role.ID, serviceID, "deployed")
return err
}
// updateTeamDeploymentStatus updates the team deployment status in the database
func (ad *AgentDeployer) updateTeamDeploymentStatus(teamID uuid.UUID, status, message string) error {
query := `
UPDATE teams
SET deployment_status = $1, deployment_message = $2, updated_at = NOW()
WHERE id = $3
`
_, err := ad.db.Exec(ad.ctx, query, status, message, teamID)
return err
}
// AssignCouncilAgents assigns council roles to available CHORUS agents instead of deploying new services
func (ad *AgentDeployer) AssignCouncilAgents(request *CouncilDeploymentRequest) (*council.CouncilDeploymentResult, error) {
log.Info().
Str("council_id", request.CouncilID.String()).
Str("project_name", request.ProjectName).
Int("core_agents", len(request.CouncilComposition.CoreAgents)).
Int("optional_agents", len(request.CouncilComposition.OptionalAgents)).
Msg("🎭 Starting council agent assignment to available CHORUS agents")
result := &council.CouncilDeploymentResult{
CouncilID: request.CouncilID,
ProjectName: request.ProjectName,
DeployedAgents: []council.DeployedCouncilAgent{},
DeployedAt: time.Now(),
Errors: []string{},
}
// Get available CHORUS agents from the registry
availableAgents, err := ad.getAvailableChorusAgents()
if err != nil {
return result, fmt.Errorf("failed to get available CHORUS agents: %w", err)
}
if len(availableAgents) == 0 {
result.Status = "failed"
result.Message = "No available CHORUS agents found for council assignment"
result.Errors = append(result.Errors, "No available agents broadcasting availability")
return result, fmt.Errorf("no available CHORUS agents for council formation")
}
log.Info().
Int("available_agents", len(availableAgents)).
Msg("Found available CHORUS agents for council assignment")
// Assign core agents (required)
assignedCount := 0
for _, councilAgent := range request.CouncilComposition.CoreAgents {
if assignedCount >= len(availableAgents) {
errorMsg := fmt.Sprintf("Not enough available agents for role %s - need %d more agents",
councilAgent.RoleName, len(request.CouncilComposition.CoreAgents)+len(request.CouncilComposition.OptionalAgents)-assignedCount)
result.Errors = append(result.Errors, errorMsg)
break
}
// Select next available agent
chorusAgent := availableAgents[assignedCount]
// Assign the council role to this CHORUS agent
deployedAgent, err := ad.assignRoleToChorusAgent(request, councilAgent, chorusAgent)
if err != nil {
errorMsg := fmt.Sprintf("Failed to assign role %s to agent %s: %v",
councilAgent.RoleName, chorusAgent.Name, err)
result.Errors = append(result.Errors, errorMsg)
log.Error().
Err(err).
Str("council_agent_id", councilAgent.AgentID).
Str("chorus_agent_id", chorusAgent.ID.String()).
Str("role", councilAgent.RoleName).
Msg("Failed to assign council role to CHORUS agent")
continue
}
result.DeployedAgents = append(result.DeployedAgents, *deployedAgent)
assignedCount++
// Update database with assignment info
err = ad.recordCouncilAgentAssignment(request.CouncilID, councilAgent, chorusAgent.ID.String())
if err != nil {
log.Error().
Err(err).
Str("chorus_agent_id", chorusAgent.ID.String()).
Msg("Failed to record council agent assignment in database")
}
}
// Assign optional agents (best effort)
for _, councilAgent := range request.CouncilComposition.OptionalAgents {
if assignedCount >= len(availableAgents) {
log.Info().
Str("role", councilAgent.RoleName).
Msg("No more available agents for optional council role")
break
}
// Select next available agent
chorusAgent := availableAgents[assignedCount]
// Assign the optional council role to this CHORUS agent
deployedAgent, err := ad.assignRoleToChorusAgent(request, councilAgent, chorusAgent)
if err != nil {
// Optional agents failing is not critical
log.Warn().
Err(err).
Str("council_agent_id", councilAgent.AgentID).
Str("chorus_agent_id", chorusAgent.ID.String()).
Str("role", councilAgent.RoleName).
Msg("Failed to assign optional council role (non-critical)")
continue
}
result.DeployedAgents = append(result.DeployedAgents, *deployedAgent)
assignedCount++
// Update database with assignment info
err = ad.recordCouncilAgentAssignment(request.CouncilID, councilAgent, chorusAgent.ID.String())
if err != nil {
log.Error().
Err(err).
Str("chorus_agent_id", chorusAgent.ID.String()).
Msg("Failed to record council agent assignment in database")
}
}
// Determine overall assignment status
coreAgentsCount := len(request.CouncilComposition.CoreAgents)
assignedCoreAgents := 0
for _, deployedAgent := range result.DeployedAgents {
// Check if this assigned agent is a core agent
for _, coreAgent := range request.CouncilComposition.CoreAgents {
if coreAgent.RoleName == deployedAgent.RoleName {
assignedCoreAgents++
break
}
}
}
if assignedCoreAgents == coreAgentsCount {
result.Status = "success"
result.Message = fmt.Sprintf("Successfully assigned %d agents (%d core, %d optional) to council roles",
len(result.DeployedAgents), assignedCoreAgents, len(result.DeployedAgents)-assignedCoreAgents)
} else if assignedCoreAgents > 0 {
result.Status = "partial"
result.Message = fmt.Sprintf("Assigned %d/%d core agents with %d errors",
assignedCoreAgents, coreAgentsCount, len(result.Errors))
} else {
result.Status = "failed"
result.Message = "Failed to assign any core council agents"
}
// Update council assignment status in database
err = ad.updateCouncilDeploymentStatus(request.CouncilID, result.Status, result.Message)
if err != nil {
log.Error().
Err(err).
Str("council_id", request.CouncilID.String()).
Msg("Failed to update council assignment status")
}
log.Info().
Str("council_id", request.CouncilID.String()).
Str("status", result.Status).
Int("assigned", len(result.DeployedAgents)).
Int("errors", len(result.Errors)).
Msg("✅ Council agent assignment completed")
return result, nil
}
// deploySingleCouncilAgent deploys a single council agent
func (ad *AgentDeployer) deploySingleCouncilAgent(request *CouncilDeploymentRequest, agent council.CouncilAgent) (*council.DeployedCouncilAgent, error) {
// Use the CHORUS image for all council agents
image := "docker.io/anthonyrawlins/chorus:backbeat-v2.0.1"
// Build council-specific deployment configuration
config := &AgentDeploymentConfig{
TeamID: request.CouncilID.String(), // Use council ID as team ID
TaskID: request.CouncilID.String(), // Use council ID as task ID
AgentRole: agent.RoleName,
AgentType: "council",
Image: image,
Replicas: 1, // Single replica per council agent
Resources: ad.calculateCouncilResources(agent),
Environment: ad.buildCouncilAgentEnvironment(request, agent),
TaskContext: TaskContext{
Repository: request.ProjectContext.Repository,
IssueTitle: request.ProjectContext.ProjectName,
IssueDescription: request.ProjectContext.ProjectBrief,
Priority: "high", // Council formation is always high priority
ExternalURL: request.ProjectContext.ExternalURL,
},
Networks: []string{"chorus_default"}, // Connect to CHORUS network
Volumes: ad.buildCouncilAgentVolumes(request),
Placement: ad.buildCouncilAgentPlacement(agent),
}
// Deploy the service
service, err := ad.swarmManager.DeployAgent(config)
if err != nil {
return nil, fmt.Errorf("failed to deploy council agent service: %w", err)
}
// Create deployed agent result
deployedAgent := &council.DeployedCouncilAgent{
ServiceID: service.ID,
ServiceName: service.Spec.Name,
RoleName: agent.RoleName,
AgentID: agent.AgentID,
Image: image,
Status: "deploying",
DeployedAt: time.Now(),
}
return deployedAgent, nil
}
// buildCouncilAgentEnvironment creates environment variables for council agent configuration
func (ad *AgentDeployer) buildCouncilAgentEnvironment(request *CouncilDeploymentRequest, agent council.CouncilAgent) map[string]string {
env := map[string]string{
// Core CHORUS configuration for council mode
"CHORUS_AGENT_NAME": agent.RoleName, // Maps to human-roles.yaml agent definition
"CHORUS_COUNCIL_MODE": "true", // Enable council mode
"CHORUS_COUNCIL_ID": request.CouncilID.String(),
"CHORUS_PROJECT_NAME": request.ProjectContext.ProjectName,
// Council prompt and context
"CHORUS_COUNCIL_PROMPT": "/app/prompts/council.md",
"CHORUS_PROJECT_BRIEF": request.ProjectContext.ProjectBrief,
"CHORUS_CONSTRAINTS": request.ProjectContext.Constraints,
"CHORUS_TECH_LIMITS": request.ProjectContext.TechLimits,
"CHORUS_COMPLIANCE_NOTES": request.ProjectContext.ComplianceNotes,
"CHORUS_TARGETS": request.ProjectContext.Targets,
// Essential project context
"CHORUS_PROJECT": request.ProjectContext.Repository,
"CHORUS_EXTERNAL_URL": request.ProjectContext.ExternalURL,
"CHORUS_PRIORITY": "high",
// WHOOSH coordination
"WHOOSH_COORDINATOR": "true",
"WHOOSH_ENDPOINT": "http://whoosh:8080",
// Docker access for CHORUS sandbox management
"DOCKER_HOST": "unix:///var/run/docker.sock",
}
return env
}
// calculateCouncilResources determines resource requirements for council agents
func (ad *AgentDeployer) calculateCouncilResources(agent council.CouncilAgent) ResourceLimits {
// Council agents get slightly more resources since they handle complex analysis
return ResourceLimits{
CPULimit: 1500000000, // 1.5 CPU cores
MemoryLimit: 2147483648, // 2GB RAM
CPURequest: 750000000, // 0.75 CPU core
MemoryRequest: 1073741824, // 1GB RAM
}
}
// buildCouncilAgentVolumes creates volume mounts for council agents
func (ad *AgentDeployer) buildCouncilAgentVolumes(request *CouncilDeploymentRequest) []VolumeMount {
return []VolumeMount{
{
Type: "bind",
Source: "/var/run/docker.sock",
Target: "/var/run/docker.sock",
ReadOnly: false, // Council agents need Docker access for complex setup
},
{
Type: "volume",
Source: fmt.Sprintf("whoosh-council-%s", request.CouncilID.String()),
Target: "/workspace",
ReadOnly: false,
},
{
Type: "bind",
Source: "/rust/containers/WHOOSH/prompts",
Target: "/app/prompts",
ReadOnly: true, // Mount council prompts
},
}
}
// buildCouncilAgentPlacement creates placement constraints for council agents
func (ad *AgentDeployer) buildCouncilAgentPlacement(agent council.CouncilAgent) PlacementConfig {
return PlacementConfig{
Constraints: []string{
"node.role==worker", // Prefer worker nodes for council containers
},
}
}
// recordCouncilAgentDeployment records council agent deployment information in the database
func (ad *AgentDeployer) recordCouncilAgentDeployment(councilID uuid.UUID, agent council.CouncilAgent, serviceID string) error {
query := `
UPDATE council_agents
SET deployed = true, status = 'active', service_id = $1, deployed_at = NOW(), updated_at = NOW()
WHERE council_id = $2 AND agent_id = $3
`
_, err := ad.db.Exec(ad.ctx, query, serviceID, councilID, agent.AgentID)
return err
}
// updateCouncilDeploymentStatus updates the council deployment status in the database
func (ad *AgentDeployer) updateCouncilDeploymentStatus(councilID uuid.UUID, status, message string) error {
query := `
UPDATE councils
SET status = $1, updated_at = NOW()
WHERE id = $2
`
// Map deployment status to council status
councilStatus := "active"
if status == "failed" {
councilStatus = "failed"
} else if status == "partial" {
councilStatus = "active" // Partial deployment still allows council to function
}
_, err := ad.db.Exec(ad.ctx, query, councilStatus, councilID)
return err
}
// getAvailableChorusAgents gets available CHORUS agents from the registry
func (ad *AgentDeployer) getAvailableChorusAgents() ([]*agents.DatabaseAgent, error) {
// Create a registry instance to access available agents
registry := agents.NewRegistry(ad.db, nil) // No p2p discovery needed for querying
// Get available agents from the database
availableAgents, err := registry.GetAvailableAgents(ad.ctx)
if err != nil {
return nil, fmt.Errorf("failed to query available agents: %w", err)
}
log.Info().
Int("available_count", len(availableAgents)).
Msg("Retrieved available CHORUS agents from registry")
return availableAgents, nil
}
// assignRoleToChorusAgent assigns a council role to an available CHORUS agent
func (ad *AgentDeployer) assignRoleToChorusAgent(request *CouncilDeploymentRequest, councilAgent council.CouncilAgent, chorusAgent *agents.DatabaseAgent) (*council.DeployedCouncilAgent, error) {
// For now, we'll create a "virtual" assignment without actually deploying anything
// The CHORUS agents will receive role assignments via P2P messaging in a future implementation
// This approach uses the existing agent infrastructure instead of creating new services
log.Info().
Str("council_role", councilAgent.RoleName).
Str("chorus_agent_id", chorusAgent.ID.String()).
Str("chorus_agent_name", chorusAgent.Name).
Msg("🎯 Assigning council role to available CHORUS agent")
// Create a deployed agent record that represents the assignment
deployedAgent := &council.DeployedCouncilAgent{
ServiceID: fmt.Sprintf("assigned-%s", chorusAgent.ID.String()), // Virtual service ID
ServiceName: fmt.Sprintf("council-%s", councilAgent.RoleName),
RoleName: councilAgent.RoleName,
AgentID: chorusAgent.ID.String(), // Use the actual CHORUS agent ID
Image: "chorus:assigned", // Indicate this is an assignment, not a deployment
Status: "assigned", // Different from "deploying" to indicate assignment approach
DeployedAt: time.Now(),
}
// TODO: In a future implementation, send role assignment via P2P messaging
// This would involve:
// 1. Publishing a role assignment message to the P2P network
// 2. The target CHORUS agent receiving and acknowledging the assignment
// 3. The agent reconfiguring itself with the new council role
// 4. The agent updating its availability status to reflect the new role
log.Info().
Str("assignment_id", deployedAgent.ServiceID).
Str("role", deployedAgent.RoleName).
Str("agent", deployedAgent.AgentID).
Msg("✅ Council role assigned to CHORUS agent")
return deployedAgent, nil
}
// recordCouncilAgentAssignment records council agent assignment in the database
func (ad *AgentDeployer) recordCouncilAgentAssignment(councilID uuid.UUID, councilAgent council.CouncilAgent, chorusAgentID string) error {
query := `
UPDATE council_agents
SET deployed = true, status = 'assigned', service_id = $1, deployed_at = NOW(), updated_at = NOW()
WHERE council_id = $2 AND agent_id = $3
`
// Use the chorus agent ID as the "service ID" to track the assignment
assignmentID := fmt.Sprintf("assigned-%s", chorusAgentID)
retry := false
execUpdate := func() error {
_, err := ad.db.Exec(ad.ctx, query, assignmentID, councilID, councilAgent.AgentID)
return err
}
err := execUpdate()
if err != nil {
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "23514" {
retry = true
log.Warn().
Str("council_id", councilID.String()).
Str("role", councilAgent.RoleName).
Str("agent", councilAgent.AgentID).
Msg("Council agent assignment hit legacy status constraint attempting auto-remediation")
if ensureErr := ad.ensureCouncilAgentStatusConstraint(); ensureErr != nil {
return fmt.Errorf("failed to reconcile council agent status constraint: %w", ensureErr)
}
err = execUpdate()
}
}
if err != nil {
return fmt.Errorf("failed to record council agent assignment: %w", err)
}
if retry {
log.Info().
Str("council_id", councilID.String()).
Str("role", councilAgent.RoleName).
Msg("Council agent status constraint updated to support 'assigned' state")
}
log.Debug().
Str("council_id", councilID.String()).
Str("council_agent_id", councilAgent.AgentID).
Str("chorus_agent_id", chorusAgentID).
Str("role", councilAgent.RoleName).
Msg("Recorded council agent assignment in database")
return nil
}
func (ad *AgentDeployer) ensureCouncilAgentStatusConstraint() error {
ad.constraintMu.Lock()
defer ad.constraintMu.Unlock()
tx, err := ad.db.BeginTx(ad.ctx, pgx.TxOptions{})
if err != nil {
return fmt.Errorf("begin council agent status constraint update: %w", err)
}
dropStmt := `ALTER TABLE council_agents DROP CONSTRAINT IF EXISTS council_agents_status_check`
if _, err := tx.Exec(ad.ctx, dropStmt); err != nil {
tx.Rollback(ad.ctx)
return fmt.Errorf("drop council agent status constraint: %w", err)
}
addStmt := `ALTER TABLE council_agents ADD CONSTRAINT council_agents_status_check CHECK (status IN ('pending', 'deploying', 'assigned', 'active', 'failed', 'removed'))`
if _, err := tx.Exec(ad.ctx, addStmt); err != nil {
tx.Rollback(ad.ctx)
if pgErr, ok := err.(*pgconn.PgError); ok && pgErr.Code == "42710" {
// Constraint already exists with desired definition; treat as success.
return nil
}
return fmt.Errorf("add council agent status constraint: %w", err)
}
if err := tx.Commit(ad.ctx); err != nil {
return fmt.Errorf("commit council agent status constraint update: %w", err)
}
return nil
}