// Package orchestrator deploys and manages WHOOSH agent services on Docker Swarm.
package orchestrator

import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/client"
	"github.com/rs/zerolog/log"
	"go.opentelemetry.io/otel/attribute"

	"github.com/chorus-services/whoosh/internal/tracing"
)

// SwarmManager manages Docker Swarm services for agent deployment.
type SwarmManager struct {
	client   *client.Client
	ctx      context.Context
	cancel   context.CancelFunc
	registry string // Docker registry for agent images
}

// NewSwarmManager creates a new Docker Swarm manager.
//
// When dockerHost is empty the client is configured from the environment;
// otherwise it connects to dockerHost. When registry is empty a default
// private registry is used. The daemon is pinged once before returning, and
// an error is returned if the client cannot be created or the ping fails.
func NewSwarmManager(dockerHost, registry string) (*SwarmManager, error) {
	ctx, cancel := context.WithCancel(context.Background())

	// Create Docker client
	var (
		dockerClient *client.Client
		err          error
	)
	if dockerHost != "" {
		dockerClient, err = client.NewClientWithOpts(
			client.WithHost(dockerHost),
			client.WithAPIVersionNegotiation(),
		)
	} else {
		dockerClient, err = client.NewClientWithOpts(
			client.FromEnv,
			client.WithAPIVersionNegotiation(),
		)
	}
	if err != nil {
		cancel()
		return nil, fmt.Errorf("failed to create Docker client: %w", err)
	}

	// Test connection
	if _, err = dockerClient.Ping(ctx); err != nil {
		cancel()
		// Best-effort cleanup so the client is not leaked; the ping error is
		// the one worth reporting to the caller.
		_ = dockerClient.Close()
		return nil, fmt.Errorf("failed to connect to Docker daemon: %w", err)
	}

	if registry == "" {
		registry = "registry.home.deepblack.cloud" // Default private registry
	}

	return &SwarmManager{
		client:   dockerClient,
		ctx:      ctx,
		cancel:   cancel,
		registry: registry,
	}, nil
}

// Close cancels the manager's context and closes the Docker client.
func (sm *SwarmManager) Close() error {
	sm.cancel()
	return sm.client.Close()
}

// AgentDeploymentConfig defines configuration for deploying an agent.
type AgentDeploymentConfig struct {
	TeamID string `json:"team_id"`
	TaskID string `json:"task_id"`
	// AgentRole is the agent's role: executor, coordinator, reviewer.
	AgentRole string `json:"agent_role"`
	// AgentType is the agent's type: general, specialized.
	AgentType string `json:"agent_type"`
	// Image is the Docker image to use.
	Image string `json:"image"`
	// Replicas is the number of instances.
	Replicas uint64 `json:"replicas"`
	// Resources holds CPU/memory limits.
	Resources ResourceLimits `json:"resources"`
	// Environment holds extra environment variables.
	Environment map[string]string `json:"environment"`
	// TaskContext holds task-specific context.
	TaskContext TaskContext `json:"task_context"`
	// Networks lists the Docker networks to join.
	Networks []string `json:"networks"`
	// Volumes lists the volume mounts.
	Volumes []VolumeMount `json:"volumes"`
	// Placement holds node placement constraints.
	Placement PlacementConfig `json:"placement"`
	GoalID    string          `json:"goal_id,omitempty"`
	PulseID   string          `json:"pulse_id,omitempty"`
}

// ResourceLimits defines CPU and memory limits for containers.
type ResourceLimits struct {
	// CPULimit is the CPU limit in nano CPUs (1e9 = 1 CPU).
	CPULimit int64 `json:"cpu_limit"`
	// MemoryLimit is the memory limit in bytes.
	MemoryLimit int64 `json:"memory_limit"`
	// CPURequest is the CPU request in nano CPUs.
	CPURequest int64 `json:"cpu_request"`
	// MemoryRequest is the memory request in bytes.
	MemoryRequest int64 `json:"memory_request"`
}

// TaskContext provides task-specific information to agents.
type TaskContext struct {
	IssueTitle       string                 `json:"issue_title"`
	IssueDescription string                 `json:"issue_description"`
	Repository       string                 `json:"repository"`
	TechStack        []string               `json:"tech_stack"`
	Requirements     []string               `json:"requirements"`
	Priority         string                 `json:"priority"`
	ExternalURL      string                 `json:"external_url"`
	Metadata         map[string]interface{} `json:"metadata"`
}

// VolumeMount defines a volume mount for containers.
type VolumeMount struct {
	// Source is the host path or volume name.
	Source string `json:"source"`
	// Target is the container path.
	Target string `json:"target"`
	// ReadOnly marks the mount read-only.
	ReadOnly bool `json:"readonly"`
	// Type is the mount type: bind, volume, tmpfs.
	Type string `json:"type"`
}

// PlacementConfig defines where containers should be placed.
type PlacementConfig struct {
	// Constraints lists node constraints.
	Constraints []string `json:"constraints"`
	// Preferences lists placement preferences.
	Preferences []PlacementPref `json:"preferences"`
	// Platforms lists target platforms.
	Platforms []Platform `json:"platforms"`
}

// PlacementPref defines placement preferences.
type PlacementPref struct {
	// Spread is the descriptor to spread across nodes by.
	Spread string `json:"spread"`
}

// Platform defines target platform for containers.
type Platform struct {
	// Architecture is the CPU architecture: amd64, arm64, etc.
	Architecture string `json:"architecture"`
	// OS is the operating system: linux, windows.
	OS string `json:"os"`
}

// DeployAgent deploys an agent service to Docker Swarm.
//
// The service is named "whoosh-agent-<team>-<task>-<role>", where <team> and
// <task> are at most the first 8 bytes of the respective IDs. After creation
// the service is inspected and the inspected value is returned. An error is
// returned if config is nil, or if creating or inspecting the service fails.
func (sm *SwarmManager) DeployAgent(config *AgentDeploymentConfig) (*swarm.Service, error) {
	if config == nil {
		return nil, fmt.Errorf("failed to create agent service: deployment config is nil")
	}

	ctx, span := tracing.StartDeploymentSpan(sm.ctx, "deploy_agent", config.AgentRole)
	defer span.End()

	// Add tracing attributes
	span.SetAttributes(
		attribute.String("agent.team_id", config.TeamID),
		attribute.String("agent.task_id", config.TaskID),
		attribute.String("agent.role", config.AgentRole),
		attribute.String("agent.type", config.AgentType),
		attribute.String("agent.image", config.Image),
	)

	// Add goal.id and pulse.id if available in config
	if config.GoalID != "" {
		span.SetAttributes(attribute.String("goal.id", config.GoalID))
	}
	if config.PulseID != "" {
		span.SetAttributes(attribute.String("pulse.id", config.PulseID))
	}

	log.Info().
		Str("team_id", config.TeamID).
		Str("task_id", config.TaskID).
		Str("agent_role", config.AgentRole).
		Str("image", config.Image).
		Msg("🚀 Deploying agent to Docker Swarm")

	// IDs shorter than 8 bytes are used whole; slicing them unconditionally
	// would panic.
	shortID := func(id string) string {
		if len(id) > 8 {
			return id[:8]
		}
		return id
	}

	// Generate unique service name
	serviceName := fmt.Sprintf("whoosh-agent-%s-%s-%s",
		shortID(config.TeamID),
		shortID(config.TaskID),
		config.AgentRole,
	)

	// Build environment variables
	env := sm.buildEnvironment(config)

	// Build volume mounts
	mounts := sm.buildMounts(config.Volumes)

	// Build resource specifications
	resources := sm.buildResources(config.Resources)

	// Build placement constraints
	placement := sm.buildPlacement(config.Placement)

	// Copy the replica count so the spec does not point into the caller's config.
	replicas := config.Replicas

	// Create service specification
	serviceSpec := swarm.ServiceSpec{
		Annotations: swarm.Annotations{
			Name: serviceName,
			Labels: map[string]string{
				"whoosh.team_id":    config.TeamID,
				"whoosh.task_id":    config.TaskID,
				"whoosh.agent_role": config.AgentRole,
				"whoosh.agent_type": config.AgentType,
				"whoosh.managed_by": "whoosh",
				"whoosh.created_at": time.Now().Format(time.RFC3339),
			},
		},
		TaskTemplate: swarm.TaskSpec{
			ContainerSpec: &swarm.ContainerSpec{
				Image:  config.Image,
				Env:    env,
				Mounts: mounts,
				Labels: map[string]string{
					"whoosh.team_id":    config.TeamID,
					"whoosh.task_id":    config.TaskID,
					"whoosh.agent_role": config.AgentRole,
				},
				// Add healthcheck
				Healthcheck: &container.HealthConfig{
					Test:     []string{"CMD-SHELL", "curl -f http://localhost:8080/health || exit 1"},
					Interval: 30 * time.Second,
					Timeout:  10 * time.Second,
					Retries:  3,
				},
			},
			Resources: resources,
			Placement: placement,
			Networks:  sm.buildNetworks(config.Networks),
		},
		Mode: swarm.ServiceMode{
			Replicated: &swarm.ReplicatedService{
				Replicas: &replicas,
			},
		},
		UpdateConfig: &swarm.UpdateConfig{
			Parallelism: 1,
			Order:       "start-first",
		},
		// RollbackConfig removed for compatibility
	}

	// Create the service
	response, err := sm.client.ServiceCreate(ctx, serviceSpec, types.ServiceCreateOptions{})
	if err != nil {
		tracing.SetSpanError(span, err)
		span.SetAttributes(
			attribute.String("deployment.status", "failed"),
			attribute.String("deployment.service_name", serviceName),
		)
		return nil, fmt.Errorf("failed to create agent service: %w", err)
	}

	// Add success metrics to span
	span.SetAttributes(
		attribute.String("deployment.status", "success"),
		attribute.String("deployment.service_id", response.ID),
		attribute.String("deployment.service_name", serviceName),
		attribute.Int64("deployment.replicas", int64(replicas)),
	)

	log.Info().
		Str("service_id", response.ID).
		Str("service_name", serviceName).
		Msg("✅ Agent service created successfully")

	// Inspect the new service under the span's context so the call is part of
	// the same trace, and record a failure on the span.
	service, _, err := sm.client.ServiceInspectWithRaw(ctx, response.ID, types.ServiceInspectOptions{})
	if err != nil {
		tracing.SetSpanError(span, err)
		return nil, fmt.Errorf("failed to inspect created service: %w", err)
	}

	return &service, nil
}

// buildEnvironment constructs environment variables for the container.
//
// It always emits the WHOOSH_* identity variables, adds TASK_* variables for
// the non-empty task context fields, and finally appends every entry of
// config.Environment as KEY=VALUE. Custom entries follow map iteration order,
// so their relative order is not stable between calls.
func (sm *SwarmManager) buildEnvironment(config *AgentDeploymentConfig) []string {
	env := []string{
		fmt.Sprintf("WHOOSH_TEAM_ID=%s", config.TeamID),
		fmt.Sprintf("WHOOSH_TASK_ID=%s", config.TaskID),
		fmt.Sprintf("WHOOSH_AGENT_ROLE=%s", config.AgentRole),
		fmt.Sprintf("WHOOSH_AGENT_TYPE=%s", config.AgentType),
	}

	// Add task context as environment variables
	taskCtx := config.TaskContext
	if taskCtx.IssueTitle != "" {
		env = append(env, fmt.Sprintf("TASK_TITLE=%s", taskCtx.IssueTitle))
	}
	if taskCtx.Repository != "" {
		env = append(env, fmt.Sprintf("TASK_REPOSITORY=%s", taskCtx.Repository))
	}
	if taskCtx.Priority != "" {
		env = append(env, fmt.Sprintf("TASK_PRIORITY=%s", taskCtx.Priority))
	}
	if taskCtx.ExternalURL != "" {
		env = append(env, fmt.Sprintf("TASK_EXTERNAL_URL=%s", taskCtx.ExternalURL))
	}

	// Add tech stack as JSON
	if len(taskCtx.TechStack) > 0 {
		// Marshalling a []string cannot fail, so the error is deliberately ignored.
		techStackJSON, _ := json.Marshal(taskCtx.TechStack)
		env = append(env, fmt.Sprintf("TASK_TECH_STACK=%s", string(techStackJSON)))
	}

	// Add requirements as JSON
	if len(taskCtx.Requirements) > 0 {
		// Marshalling a []string cannot fail, so the error is deliberately ignored.
		requirementsJSON, _ := json.Marshal(taskCtx.Requirements)
		env = append(env, fmt.Sprintf("TASK_REQUIREMENTS=%s", string(requirementsJSON)))
	}

	// Add custom environment variables
	for key, value := range config.Environment {
		env = append(env, fmt.Sprintf("%s=%s", key, value))
	}

	return env
}

// buildMounts constructs volume mounts for the container.
//
// Each VolumeMount is converted in order; a Type other than "volume" or
// "tmpfs" is treated as a bind mount. A shared "whoosh-workspace" volume is
// appended at /workspace unless one of the given mounts already targets
// /workspace.
func (sm *SwarmManager) buildMounts(volumes []VolumeMount) []mount.Mount {
	const workspaceTarget = "/workspace"

	mounts := make([]mount.Mount, 0, len(volumes)+1)
	hasWorkspace := false
	for _, vol := range volumes {
		mountType := mount.TypeBind
		switch vol.Type {
		case "volume":
			mountType = mount.TypeVolume
		case "tmpfs":
			mountType = mount.TypeTmpfs
		}

		if vol.Target == workspaceTarget {
			hasWorkspace = true
		}

		mounts = append(mounts, mount.Mount{
			Type:     mountType,
			Source:   vol.Source,
			Target:   vol.Target,
			ReadOnly: vol.ReadOnly,
		})
	}

	// Add default workspace volume. Skipped when the caller already mounts
	// /workspace, since two mounts on one target would be a duplicate.
	if !hasWorkspace {
		mounts = append(mounts, mount.Mount{
			Type:     mount.TypeVolume,
			Source:   "whoosh-workspace", // Shared workspace volume
			Target:   workspaceTarget,
			ReadOnly: false,
		})
	}

	return mounts
}

// buildResources constructs resource specifications.
//
// Limits and Reservations are each populated only when at least one of their
// CPU/memory values is positive; otherwise they are left nil.
func (sm *SwarmManager) buildResources(limits ResourceLimits) *swarm.ResourceRequirements {
	resources := &swarm.ResourceRequirements{}

	// Set limits
	if limits.CPULimit > 0 || limits.MemoryLimit > 0 {
		resources.Limits = &swarm.Limit{}
		if limits.CPULimit > 0 {
			resources.Limits.NanoCPUs = limits.CPULimit
		}
		if limits.MemoryLimit > 0 {
			resources.Limits.MemoryBytes = limits.MemoryLimit
		}
	}

	// Set requests/reservations
	if limits.CPURequest > 0 || limits.MemoryRequest > 0 {
		resources.Reservations = &swarm.Resources{}
		if limits.CPURequest > 0 {
			resources.Reservations.NanoCPUs = limits.CPURequest
		}
		if limits.MemoryRequest > 0 {
			resources.Reservations.MemoryBytes = limits.MemoryRequest
		}
	}

	return resources
}

// buildPlacement constructs placement specifications from the given
// constraints, spread preferences and target platforms.
func (sm *SwarmManager) buildPlacement(config PlacementConfig) *swarm.Placement {
	placement := &swarm.Placement{
		Constraints: config.Constraints,
	}

	// Add preferences
	for _, pref := range config.Preferences {
		placement.Preferences = append(placement.Preferences, swarm.PlacementPreference{
			Spread: &swarm.SpreadOver{
				SpreadDescriptor: pref.Spread,
			},
		})
	}

	// Add platforms
	for _, platform := range config.Platforms {
		placement.Platforms = append(placement.Platforms, swarm.Platform{
			Architecture: platform.Architecture,
			OS:           platform.OS,
		})
	}

	return placement
}

// buildNetworks constructs network specifications, one attachment per name.
// An empty list falls back to the "chorus_default" network.
func (sm *SwarmManager) buildNetworks(networks []string) []swarm.NetworkAttachmentConfig {
	if len(networks) == 0 {
		// Default to chorus_default network
		networks = []string{"chorus_default"}
	}

	networkConfigs := make([]swarm.NetworkAttachmentConfig, len(networks))
	for i, networkName := range networks {
		networkConfigs[i] = swarm.NetworkAttachmentConfig{
			Target: networkName,
		}
	}

	return networkConfigs
}

// RemoveAgent removes an agent service from Docker Swarm.
func (sm *SwarmManager) RemoveAgent(serviceID string) error {
	log.Info().
		Str("service_id", serviceID).
		Msg("🗑️ Removing agent service from Docker Swarm")

	if err := sm.client.ServiceRemove(sm.ctx, serviceID); err != nil {
		return fmt.Errorf("failed to remove service: %w", err)
	}

	log.Info().
		Str("service_id", serviceID).
		Msg("✅ Agent service removed successfully")

	return nil
}

// ListAgentServices lists all agent services managed by WHOOSH, i.e. those
// whose service label "whoosh.managed_by" equals "whoosh".
func (sm *SwarmManager) ListAgentServices() ([]swarm.Service, error) {
	// Ask the daemon to filter by label so unrelated services are not
	// transferred; the client-side check below is kept as a safeguard.
	services, err := sm.client.ServiceList(sm.ctx, types.ServiceListOptions{
		Filters: filters.NewArgs(filters.Arg("label", "whoosh.managed_by=whoosh")),
	})
	if err != nil {
		return nil, fmt.Errorf("failed to list services: %w", err)
	}

	// Filter for WHOOSH-managed services
	var agentServices []swarm.Service
	for _, service := range services {
		if service.Spec.Labels["whoosh.managed_by"] == "whoosh" {
			agentServices = append(agentServices, service)
		}
	}

	return agentServices, nil
}

// GetServiceLogs retrieves logs for a service.
//
// It requests stdout and stderr with timestamps, limited to the last `lines`
// entries, reads the whole stream into memory and returns it as a string.
func (sm *SwarmManager) GetServiceLogs(serviceID string, lines int) (string, error) {
	// Create logs options struct inline to avoid import issues.
	// NOTE(review): this only compiles while the field list matches the
	// options type ServiceLogs expects, field for field — confirm when
	// upgrading the Docker client.
	options := struct {
		ShowStdout bool
		ShowStderr bool
		Since      string
		Until      string
		Timestamps bool
		Follow     bool
		Tail       string
		Details    bool
	}{
		ShowStdout: true,
		ShowStderr: true,
		Tail:       fmt.Sprintf("%d", lines),
		Timestamps: true,
	}

	reader, err := sm.client.ServiceLogs(sm.ctx, serviceID, options)
	if err != nil {
		return "", fmt.Errorf("failed to get service logs: %w", err)
	}
	defer reader.Close()

	// NOTE(review): the stream is returned exactly as read; confirm whether
	// callers expect stdout/stderr to be demultiplexed first.
	logs, err := io.ReadAll(reader)
	if err != nil {
		return "", fmt.Errorf("failed to read service logs: %w", err)
	}

	return string(logs), nil
}

// ScaleService scales a service to the specified number of replicas.
//
// It returns an error if the service cannot be inspected, is not running in
// replicated mode, or the update fails.
func (sm *SwarmManager) ScaleService(serviceID string, replicas uint64) error {
	log.Info().
		Str("service_id", serviceID).
		Uint64("replicas", replicas).
		Msg("📈 Scaling agent service")

	// Get current service spec
	service, _, err := sm.client.ServiceInspectWithRaw(sm.ctx, serviceID, types.ServiceInspectOptions{})
	if err != nil {
		return fmt.Errorf("failed to inspect service: %w", err)
	}

	// Only replicated services carry a replica count; writing through a nil
	// Replicated pointer would panic.
	if service.Spec.Mode.Replicated == nil {
		return fmt.Errorf("failed to scale service: service %s is not in replicated mode", serviceID)
	}

	// Update replicas
	service.Spec.Mode.Replicated.Replicas = &replicas

	// Update the service
	_, err = sm.client.ServiceUpdate(sm.ctx, serviceID, service.Version, service.Spec, types.ServiceUpdateOptions{})
	if err != nil {
		return fmt.Errorf("failed to scale service: %w", err)
	}

	log.Info().
		Str("service_id", serviceID).
		Uint64("replicas", replicas).
		Msg("✅ Service scaled successfully")

	return nil
}

// GetServiceStatus returns the current status of a service.
//
// The result combines the inspected service (name, image, desired replicas,
// timestamps) with a tally of the states of every task listed for it.
func (sm *SwarmManager) GetServiceStatus(serviceID string) (*ServiceStatus, error) {
	service, _, err := sm.client.ServiceInspectWithRaw(sm.ctx, serviceID, types.ServiceInspectOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to inspect service: %w", err)
	}

	// Get task status
	tasks, err := sm.client.TaskList(sm.ctx, types.TaskListOptions{
		Filters: filters.NewArgs(filters.Arg("service", serviceID)),
	})
	if err != nil {
		return nil, fmt.Errorf("failed to list tasks: %w", err)
	}

	// ContainerSpec is a pointer; leave Image empty rather than panic when it is nil.
	var image string
	if spec := service.Spec.TaskTemplate.ContainerSpec; spec != nil {
		image = spec.Image
	}

	status := &ServiceStatus{
		ServiceID:   serviceID,
		ServiceName: service.Spec.Name,
		Image:       image,
		TaskStates:  make(map[string]int),
		CreatedAt:   service.CreatedAt,
		UpdatedAt:   service.UpdatedAt,
	}

	if replicated := service.Spec.Mode.Replicated; replicated != nil && replicated.Replicas != nil {
		status.Replicas = *replicated.Replicas
	}

	// Count task states
	for _, task := range tasks {
		status.TaskStates[string(task.Status.State)]++

		switch task.Status.State {
		case swarm.TaskStateRunning:
			status.RunningTasks++
		case swarm.TaskStateFailed:
			status.FailedTasks++
		}
	}

	return status, nil
}

// ServiceStatus represents the current status of a service.
type ServiceStatus struct {
	ServiceID    string         `json:"service_id"`
	ServiceName  string         `json:"service_name"`
	Image        string         `json:"image"`
	Replicas     uint64         `json:"replicas"`
	RunningTasks uint64         `json:"running_tasks"`
	FailedTasks  uint64         `json:"failed_tasks"`
	TaskStates   map[string]int `json:"task_states"`
	CreatedAt    time.Time      `json:"created_at"`
	UpdatedAt    time.Time      `json:"updated_at"`
}

// CleanupFailedServices removes failed services.
//
// A WHOOSH-managed service is removed when it has at least one failed task
// and no running task. Failures to query or remove an individual service are
// logged and skipped; only a failure to list the services is returned.
func (sm *SwarmManager) CleanupFailedServices() error {
	services, err := sm.ListAgentServices()
	if err != nil {
		return fmt.Errorf("failed to list services: %w", err)
	}

	for _, service := range services {
		status, err := sm.GetServiceStatus(service.ID)
		if err != nil {
			log.Error().
				Err(err).
				Str("service_id", service.ID).
				Msg("Failed to get service status")
			continue
		}

		// Keep services that have no failed task or still have a running one.
		if status.FailedTasks == 0 || status.RunningTasks != 0 {
			continue
		}

		log.Warn().
			Str("service_id", service.ID).
			Str("service_name", service.Spec.Name).
			Uint64("failed_tasks", status.FailedTasks).
			Msg("Removing failed service")

		if err := sm.RemoveAgent(service.ID); err != nil {
			log.Error().
				Err(err).
				Str("service_id", service.ID).
				Msg("Failed to remove failed service")
		}
	}

	return nil
}