Files
WHOOSH/internal/orchestrator/swarm_manager.go
Claude Code 9f57e48cef
Some checks failed
WHOOSH CI / speclint (push) Has been cancelled
WHOOSH CI / contracts (push) Has been cancelled
WHOOSH CI / speclint (pull_request) Has been cancelled
WHOOSH CI / contracts (pull_request) Has been cancelled
fix: resolve Docker Client API compilation error in swarm_manager.go
Fixed undefined types.ContainerLogsOptions error by using an inline struct
that matches the Docker API interface. This resolves compilation issues
with the GetServiceLogs method while maintaining compatibility with the
existing Docker v24.0.7 dependency.

🤖 Generated with [Claude Code](https://claude.ai/code)

Co-Authored-By: Claude <noreply@anthropic.com>
2025-09-21 21:46:54 +10:00

616 lines
18 KiB
Go

package orchestrator
import (
	"context"
	"encoding/json"
	"fmt"
	"io"
	"sort"
	"time"

	"github.com/docker/docker/api/types"
	"github.com/docker/docker/api/types/container"
	"github.com/docker/docker/api/types/filters"
	"github.com/docker/docker/api/types/mount"
	"github.com/docker/docker/api/types/swarm"
	"github.com/docker/docker/client"
	"github.com/rs/zerolog/log"
	"go.opentelemetry.io/otel/attribute"

	"github.com/chorus-services/whoosh/internal/tracing"
)
// SwarmManager manages Docker Swarm services for agent deployment.
//
// It pairs a Docker Engine API client with a manager-wide context. The Swarm
// calls made by its methods use that context (DeployAgent uses it as the
// parent of its tracing span), and Close cancels it before closing the client.
// Construct it with NewSwarmManager.
type SwarmManager struct {
// client is the Docker Engine API client used for every Swarm operation.
client *client.Client
// ctx is the manager-wide context; cancel cancels it and is invoked by Close.
ctx context.Context
cancel context.CancelFunc
// NOTE(review): registry is set (with a default) by NewSwarmManager but is not
// read by any code visible in this file section — confirm it is used elsewhere.
registry string // Docker registry for agent images
}
// NewSwarmManager creates a Docker Swarm manager and verifies that the Docker
// daemon is reachable.
//
// dockerHost selects the daemon endpoint; when it is empty the client is
// configured from the standard Docker environment variables instead. In both
// cases the client negotiates the API version with the daemon. registry is the
// image registry recorded on the manager; it defaults to
// "registry.home.deepblack.cloud" when empty.
//
// It returns an error if the client cannot be created or the daemon does not
// answer a ping. On error no resources are leaked: the manager context is
// cancelled and, if it was created, the client is closed.
func NewSwarmManager(dockerHost, registry string) (*SwarmManager, error) {
	ctx, cancel := context.WithCancel(context.Background())

	// An explicit host wins; otherwise fall back to DOCKER_HOST and friends.
	var hostOpt client.Opt = client.FromEnv
	if dockerHost != "" {
		hostOpt = client.WithHost(dockerHost)
	}

	dockerClient, err := client.NewClientWithOpts(hostOpt, client.WithAPIVersionNegotiation())
	if err != nil {
		cancel()
		return nil, fmt.Errorf("failed to create Docker client: %w", err)
	}

	// Fail fast if the daemon is unreachable.
	if _, err := dockerClient.Ping(ctx); err != nil {
		cancel()
		// The manager is never returned, so nothing else would close this client.
		_ = dockerClient.Close()
		return nil, fmt.Errorf("failed to connect to Docker daemon: %w", err)
	}

	if registry == "" {
		registry = "registry.home.deepblack.cloud" // Default private registry
	}

	return &SwarmManager{
		client:   dockerClient,
		ctx:      ctx,
		cancel:   cancel,
		registry: registry,
	}, nil
}
// Close shuts the manager down. It first cancels the manager-wide context, so
// any call still running with it is cancelled. It then closes the Docker
// client and returns whatever error that close produces.
func (sm *SwarmManager) Close() error {
	stop := sm.cancel
	stop()

	dockerClient := sm.client
	return dockerClient.Close()
}
// AgentDeploymentConfig defines configuration for deploying an agent.
//
// DeployAgent consumes it:
//   - TeamID, TaskID and AgentRole feed the Swarm service name and the
//     service/container labels.
//   - Image, Replicas, Resources, Environment, TaskContext, Networks, Volumes
//     and Placement are translated into the service spec.
//   - GoalID and PulseID are only recorded as tracing attributes.
type AgentDeploymentConfig struct {
TeamID string `json:"team_id"`
TaskID string `json:"task_id"`
AgentRole string `json:"agent_role"` // executor, coordinator, reviewer
AgentType string `json:"agent_type"` // general, specialized
Image string `json:"image"` // Docker image to use
// Replicas is passed straight through as the replicated-mode count; 0 is not
// replaced by a default.
Replicas uint64 `json:"replicas"` // Number of instances
Resources ResourceLimits `json:"resources"` // CPU/Memory limits
// Environment entries are added as KEY=value after the built-in WHOOSH_* and
// TASK_* variables.
Environment map[string]string `json:"environment"` // Environment variables
TaskContext TaskContext `json:"task_context"` // Task-specific context
// Networks defaults to "chorus_default" when empty (see buildNetworks).
Networks []string `json:"networks"` // Docker networks to join
// Volumes are mounted in addition to the shared workspace volume that
// buildMounts adds at /workspace.
Volumes []VolumeMount `json:"volumes"` // Volume mounts
Placement PlacementConfig `json:"placement"` // Node placement constraints
// GoalID and PulseID are optional and only recorded as span attributes.
GoalID string `json:"goal_id,omitempty"`
PulseID string `json:"pulse_id,omitempty"`
}
// ResourceLimits defines CPU and memory limits for containers.
//
// buildResources maps the *Limit fields to Swarm resource limits and the
// *Request fields to Swarm reservations. Values that are zero or negative are
// ignored, meaning no limit/reservation is set for that resource.
type ResourceLimits struct {
CPULimit int64 `json:"cpu_limit"` // CPU limit in nano CPUs (1e9 = 1 CPU)
MemoryLimit int64 `json:"memory_limit"` // Memory limit in bytes
CPURequest int64 `json:"cpu_request"` // CPU request in nano CPUs
MemoryRequest int64 `json:"memory_request"` // Memory request in bytes
}
// TaskContext provides task-specific information to agents.
//
// buildEnvironment exports the following as TASK_* environment variables,
// each only when non-empty:
//   - IssueTitle
//   - Repository
//   - Priority
//   - ExternalURL
//   - TechStack and Requirements, as JSON arrays
//
// IssueDescription and Metadata are not exported by it.
type TaskContext struct {
IssueTitle string `json:"issue_title"`
IssueDescription string `json:"issue_description"`
Repository string `json:"repository"`
TechStack []string `json:"tech_stack"`
Requirements []string `json:"requirements"`
Priority string `json:"priority"`
ExternalURL string `json:"external_url"`
Metadata map[string]interface{} `json:"metadata"`
}
// VolumeMount defines a volume mount for containers.
//
// buildMounts treats Type "volume" and "tmpfs" specially; any other value,
// including the empty string, results in a bind mount.
type VolumeMount struct {
Source string `json:"source"` // Host path or volume name
Target string `json:"target"` // Container path
ReadOnly bool `json:"readonly"` // Read-only mount
Type string `json:"type"` // bind, volume, tmpfs
}
// PlacementConfig defines where containers should be placed.
//
// buildPlacement copies Constraints verbatim into the Swarm placement spec and
// converts each preference and platform into its Swarm equivalent.
type PlacementConfig struct {
Constraints []string `json:"constraints"` // Node constraints
Preferences []PlacementPref `json:"preferences"` // Placement preferences
Platforms []Platform `json:"platforms"` // Target platforms
}
// PlacementPref defines placement preferences.
//
// Spread is used as the Swarm spread descriptor. It is presumably a node
// label/attribute expression such as "node.labels.zone"; confirm against
// callers.
type PlacementPref struct {
Spread string `json:"spread"` // Spread across nodes
}
// Platform defines target platform for containers.
// buildPlacement copies both fields unchanged into a Swarm platform entry.
type Platform struct {
Architecture string `json:"architecture"` // amd64, arm64, etc.
OS string `json:"os"` // linux, windows
}
// DeployAgent creates a replicated Docker Swarm service that runs one WHOOSH
// agent, then returns the service as reported by the daemon right after
// creation.
//
// Naming and labels:
//   - The service is named "whoosh-agent-<team>-<task>-<role>". <team> and
//     <task> are the first 8 bytes of config.TeamID and config.TaskID, or the
//     whole ID when it is shorter.
//   - Service and container are labelled with the team/task/role identifiers.
//   - The service also carries whoosh.managed_by=whoosh so that
//     ListAgentServices can find it.
//
// The work is traced in a deployment span tagged with the team, task, role,
// type and image, plus the goal and pulse IDs when they are set.
//
// It returns an error if config is nil, if the service cannot be created, or
// if the newly created service cannot be inspected.
func (sm *SwarmManager) DeployAgent(config *AgentDeploymentConfig) (*swarm.Service, error) {
	if config == nil {
		return nil, fmt.Errorf("agent deployment config is nil")
	}

	ctx, span := tracing.StartDeploymentSpan(sm.ctx, "deploy_agent", config.AgentRole)
	defer span.End()

	// Add tracing attributes
	span.SetAttributes(
		attribute.String("agent.team_id", config.TeamID),
		attribute.String("agent.task_id", config.TaskID),
		attribute.String("agent.role", config.AgentRole),
		attribute.String("agent.type", config.AgentType),
		attribute.String("agent.image", config.Image),
	)

	// Add goal.id and pulse.id if available in config
	if config.GoalID != "" {
		span.SetAttributes(attribute.String("goal.id", config.GoalID))
	}
	if config.PulseID != "" {
		span.SetAttributes(attribute.String("pulse.id", config.PulseID))
	}

	log.Info().
		Str("team_id", config.TeamID).
		Str("task_id", config.TaskID).
		Str("agent_role", config.AgentRole).
		Str("image", config.Image).
		Msg("🚀 Deploying agent to Docker Swarm")

	// Build the service name from short ID prefixes. Slicing [:8] directly
	// would panic on IDs shorter than 8 bytes.
	serviceName := fmt.Sprintf("whoosh-agent-%s-%s-%s",
		agentIDPrefix(config.TeamID, 8),
		agentIDPrefix(config.TaskID, 8),
		config.AgentRole,
	)

	env := sm.buildEnvironment(config)
	mounts := sm.buildMounts(config.Volumes)
	resources := sm.buildResources(config.Resources)
	placement := sm.buildPlacement(config.Placement)

	// Copy the replica count so the spec does not point into the caller's struct.
	replicas := config.Replicas

	serviceSpec := swarm.ServiceSpec{
		Annotations: swarm.Annotations{
			Name: serviceName,
			Labels: map[string]string{
				"whoosh.team_id":    config.TeamID,
				"whoosh.task_id":    config.TaskID,
				"whoosh.agent_role": config.AgentRole,
				"whoosh.agent_type": config.AgentType,
				"whoosh.managed_by": "whoosh",
				"whoosh.created_at": time.Now().Format(time.RFC3339),
			},
		},
		TaskTemplate: swarm.TaskSpec{
			ContainerSpec: &swarm.ContainerSpec{
				Image:  config.Image,
				Env:    env,
				Mounts: mounts,
				Labels: map[string]string{
					"whoosh.team_id":    config.TeamID,
					"whoosh.task_id":    config.TaskID,
					"whoosh.agent_role": config.AgentRole,
				},
				// Health check: curl against localhost:8080/health inside the
				// container. Presumably the image must ship curl; confirm.
				Healthcheck: &container.HealthConfig{
					Test:     []string{"CMD-SHELL", "curl -f http://localhost:8080/health || exit 1"},
					Interval: 30 * time.Second,
					Timeout:  10 * time.Second,
					Retries:  3,
				},
			},
			Resources: resources,
			Placement: placement,
			Networks:  sm.buildNetworks(config.Networks),
		},
		Mode: swarm.ServiceMode{
			Replicated: &swarm.ReplicatedService{
				Replicas: &replicas,
			},
		},
		UpdateConfig: &swarm.UpdateConfig{
			Parallelism: 1,
			Order:       "start-first",
		},
		// RollbackConfig removed for compatibility
	}

	response, err := sm.client.ServiceCreate(ctx, serviceSpec, types.ServiceCreateOptions{})
	if err != nil {
		tracing.SetSpanError(span, err)
		span.SetAttributes(
			attribute.String("deployment.status", "failed"),
			attribute.String("deployment.service_name", serviceName),
		)
		return nil, fmt.Errorf("failed to create agent service: %w", err)
	}

	// Add success metrics to span
	span.SetAttributes(
		attribute.String("deployment.status", "success"),
		attribute.String("deployment.service_id", response.ID),
		attribute.String("deployment.service_name", serviceName),
		attribute.Int64("deployment.replicas", int64(config.Replicas)),
	)

	log.Info().
		Str("service_id", response.ID).
		Str("service_name", serviceName).
		Msg("✅ Agent service created successfully")

	// Read back the stored service; this does not wait for tasks to start.
	// Use the traced ctx so this call is part of the deployment span.
	service, _, err := sm.client.ServiceInspectWithRaw(ctx, response.ID, types.ServiceInspectOptions{})
	if err != nil {
		tracing.SetSpanError(span, err)
		return nil, fmt.Errorf("failed to inspect created service: %w", err)
	}

	return &service, nil
}

// agentIDPrefix returns the first n bytes of id, or id unchanged when it is
// not longer than n.
func agentIDPrefix(id string, n int) string {
	if len(id) <= n {
		return id
	}
	return id[:n]
}
// buildEnvironment constructs the container environment for an agent as
// "KEY=value" strings, in this order:
//  1. WHOOSH_TEAM_ID, WHOOSH_TASK_ID, WHOOSH_AGENT_ROLE and WHOOSH_AGENT_TYPE.
//  2. TASK_TITLE, TASK_REPOSITORY, TASK_PRIORITY and TASK_EXTERNAL_URL, each
//     only when the corresponding task-context field is non-empty.
//  3. TASK_TECH_STACK and TASK_REQUIREMENTS as JSON arrays, each only when
//     non-empty.
//  4. config.Environment entries, sorted by key.
//
// Custom entries are sorted because Go randomises map iteration order;
// without sorting, identical configs would yield differently ordered env
// lists and therefore different service specs.
func (sm *SwarmManager) buildEnvironment(config *AgentDeploymentConfig) []string {
	tc := config.TaskContext
	env := make([]string, 0, 10+len(config.Environment))

	env = append(env,
		fmt.Sprintf("WHOOSH_TEAM_ID=%s", config.TeamID),
		fmt.Sprintf("WHOOSH_TASK_ID=%s", config.TaskID),
		fmt.Sprintf("WHOOSH_AGENT_ROLE=%s", config.AgentRole),
		fmt.Sprintf("WHOOSH_AGENT_TYPE=%s", config.AgentType),
	)

	if tc.IssueTitle != "" {
		env = append(env, fmt.Sprintf("TASK_TITLE=%s", tc.IssueTitle))
	}
	if tc.Repository != "" {
		env = append(env, fmt.Sprintf("TASK_REPOSITORY=%s", tc.Repository))
	}
	if tc.Priority != "" {
		env = append(env, fmt.Sprintf("TASK_PRIORITY=%s", tc.Priority))
	}
	if tc.ExternalURL != "" {
		env = append(env, fmt.Sprintf("TASK_EXTERNAL_URL=%s", tc.ExternalURL))
	}

	// json.Marshal cannot fail for a []string, so its error is safe to ignore.
	if len(tc.TechStack) > 0 {
		techStackJSON, _ := json.Marshal(tc.TechStack)
		env = append(env, fmt.Sprintf("TASK_TECH_STACK=%s", string(techStackJSON)))
	}
	if len(tc.Requirements) > 0 {
		requirementsJSON, _ := json.Marshal(tc.Requirements)
		env = append(env, fmt.Sprintf("TASK_REQUIREMENTS=%s", string(requirementsJSON)))
	}

	// Emit custom variables in a stable, sorted order.
	keys := make([]string, 0, len(config.Environment))
	for key := range config.Environment {
		keys = append(keys, key)
	}
	sort.Strings(keys)
	for _, key := range keys {
		env = append(env, fmt.Sprintf("%s=%s", key, config.Environment[key]))
	}

	return env
}
// buildMounts converts the requested volume mounts into Docker mount specs.
// It appends the shared "whoosh-workspace" named volume at /workspace unless
// a requested mount already targets /workspace; Docker refuses duplicate
// mount points.
//
// Type handling:
//   - "volume" maps to a named-volume mount.
//   - "tmpfs" maps to a tmpfs mount with its Source cleared, because Docker
//     rejects tmpfs mounts that specify a source.
//   - Any other Type, including "" and "bind", becomes a bind mount.
func (sm *SwarmManager) buildMounts(volumes []VolumeMount) []mount.Mount {
	const workspaceTarget = "/workspace"

	mounts := make([]mount.Mount, 0, len(volumes)+1)
	hasWorkspace := false

	for _, vol := range volumes {
		m := mount.Mount{
			Type:     mount.TypeBind,
			Source:   vol.Source,
			Target:   vol.Target,
			ReadOnly: vol.ReadOnly,
		}
		switch vol.Type {
		case "volume":
			m.Type = mount.TypeVolume
		case "tmpfs":
			m.Type = mount.TypeTmpfs
			m.Source = ""
		}
		if vol.Target == workspaceTarget {
			hasWorkspace = true
		}
		mounts = append(mounts, m)
	}

	// Add default workspace volume unless the caller already mounts one there.
	if !hasWorkspace {
		mounts = append(mounts, mount.Mount{
			Type:     mount.TypeVolume,
			Source:   "whoosh-workspace", // Shared workspace volume
			Target:   workspaceTarget,
			ReadOnly: false,
		})
	}

	return mounts
}
// buildResources translates ResourceLimits into Swarm resource requirements.
//
// Only strictly positive values are applied; anything else becomes zero.
// A Limits section is attached only when CPULimit or MemoryLimit is positive,
// and a Reservations section only when CPURequest or MemoryRequest is
// positive. The returned pointer is never nil.
func (sm *SwarmManager) buildResources(limits ResourceLimits) *swarm.ResourceRequirements {
	positive := func(v int64) int64 {
		if v > 0 {
			return v
		}
		return 0
	}

	req := new(swarm.ResourceRequirements)

	if cpu, mem := positive(limits.CPULimit), positive(limits.MemoryLimit); cpu != 0 || mem != 0 {
		req.Limits = &swarm.Limit{NanoCPUs: cpu, MemoryBytes: mem}
	}

	if cpu, mem := positive(limits.CPURequest), positive(limits.MemoryRequest); cpu != 0 || mem != 0 {
		req.Reservations = &swarm.Resources{NanoCPUs: cpu, MemoryBytes: mem}
	}

	return req
}
// buildPlacement converts a PlacementConfig into a Swarm placement spec.
//
// Node constraints are passed through unchanged. Each preference becomes a
// spread-over preference, and each platform an architecture/OS pair.
// Preferences and Platforms stay nil when the config supplies none.
func (sm *SwarmManager) buildPlacement(config PlacementConfig) *swarm.Placement {
	var prefs []swarm.PlacementPreference
	for i := range config.Preferences {
		prefs = append(prefs, swarm.PlacementPreference{
			Spread: &swarm.SpreadOver{SpreadDescriptor: config.Preferences[i].Spread},
		})
	}

	var platforms []swarm.Platform
	for i := range config.Platforms {
		p := config.Platforms[i]
		platforms = append(platforms, swarm.Platform{Architecture: p.Architecture, OS: p.OS})
	}

	return &swarm.Placement{
		Constraints: config.Constraints,
		Preferences: prefs,
		Platforms:   platforms,
	}
}
// buildNetworks turns network names into Swarm network attachments, one per
// name and in the given order. With no names, the agent is attached to the
// "chorus_default" network. The caller's slice is never modified.
func (sm *SwarmManager) buildNetworks(networks []string) []swarm.NetworkAttachmentConfig {
	targets := networks
	if len(targets) == 0 {
		targets = []string{"chorus_default"}
	}

	attachments := make([]swarm.NetworkAttachmentConfig, 0, len(targets))
	for _, target := range targets {
		attachments = append(attachments, swarm.NetworkAttachmentConfig{Target: target})
	}
	return attachments
}
// RemoveAgent deletes the Swarm service identified by serviceID using the
// manager's context. It logs before and after the removal and returns the
// daemon's error, wrapped, if the removal fails.
func (sm *SwarmManager) RemoveAgent(serviceID string) error {
	log.Info().Str("service_id", serviceID).Msg("🗑️ Removing agent service from Docker Swarm")

	if err := sm.client.ServiceRemove(sm.ctx, serviceID); err != nil {
		return fmt.Errorf("failed to remove service: %w", err)
	}

	log.Info().Str("service_id", serviceID).Msg("✅ Agent service removed successfully")
	return nil
}
// ListAgentServices lists the Swarm services managed by WHOOSH, i.e. those
// whose spec carries the label whoosh.managed_by=whoosh. It returns a nil
// slice when there are none.
func (sm *SwarmManager) ListAgentServices() ([]swarm.Service, error) {
	const managedByKey, managedByValue = "whoosh.managed_by", "whoosh"

	// Let the daemon filter by label rather than transferring every service
	// in the swarm.
	services, err := sm.client.ServiceList(sm.ctx, types.ServiceListOptions{
		Filters: filters.NewArgs(filters.Arg("label", managedByKey+"="+managedByValue)),
	})
	if err != nil {
		return nil, fmt.Errorf("failed to list services: %w", err)
	}

	// Re-check locally. This is cheap and guarantees the same result set (and
	// nil-when-empty) as filtering purely on the client.
	var agentServices []swarm.Service
	for _, svc := range services {
		if svc.Spec.Labels[managedByKey] == managedByValue {
			agentServices = append(agentServices, svc)
		}
	}

	return agentServices, nil
}
// GetServiceLogs retrieves a service's recent logs as text.
//
// Stdout and stderr are both requested, with timestamps. lines is passed to
// the daemon as the tail count; how it is applied across tasks is up to the
// daemon.
//
// For services without a TTY the daemon multiplexes stdout and stderr into
// frames, each preceded by an 8-byte header. Those headers are stripped so
// the result is plain log text. If the stream does not look multiplexed
// (e.g. a TTY service), the raw bytes are returned unchanged.
func (sm *SwarmManager) GetServiceLogs(serviceID string, lines int) (string, error) {
	// Unnamed struct that Go's assignability rules let us pass where the SDK's
	// logs-options type is expected. It must stay field-for-field identical
	// (names, types and order) to that type.
	options := struct {
		ShowStdout bool
		ShowStderr bool
		Since      string
		Until      string
		Timestamps bool
		Follow     bool
		Tail       string
		Details    bool
	}{
		ShowStdout: true,
		ShowStderr: true,
		Tail:       fmt.Sprintf("%d", lines),
		Timestamps: true,
	}

	reader, err := sm.client.ServiceLogs(sm.ctx, serviceID, options)
	if err != nil {
		return "", fmt.Errorf("failed to get service logs: %w", err)
	}
	defer reader.Close()

	raw, err := io.ReadAll(reader)
	if err != nil {
		return "", fmt.Errorf("failed to read service logs: %w", err)
	}

	return string(stripLogStreamHeaders(raw)), nil
}

// stripLogStreamHeaders removes Docker's stream-multiplexing frame headers and
// returns the concatenated payloads.
//
// Frame layout: byte 0 is the stream id (0 stdin, 1 stdout, 2 stderr, 3 daemon
// system error), bytes 1-3 are zero, and bytes 4-7 are the big-endian payload
// length. If raw does not parse as a sequence of such frames, it is returned
// unchanged.
func stripLogStreamHeaders(raw []byte) []byte {
	const headerLen = 8

	out := make([]byte, 0, len(raw))
	rest := raw
	for len(rest) > 0 {
		if len(rest) < headerLen || rest[0] > 3 || rest[1] != 0 || rest[2] != 0 || rest[3] != 0 {
			return raw
		}
		// Compute in uint64 so a large length cannot overflow int on 32-bit platforms.
		size := uint64(rest[4])<<24 | uint64(rest[5])<<16 | uint64(rest[6])<<8 | uint64(rest[7])
		rest = rest[headerLen:]
		if size > uint64(len(rest)) {
			return raw
		}
		n := int(size)
		out = append(out, rest[:n]...)
		rest = rest[n:]
	}
	return out
}
// ScaleService sets the desired replica count of a replicated-mode service.
//
// It reads the current spec, changes only the replica count, and submits the
// update against the spec version it read. A concurrent modification of the
// service therefore makes the daemon reject the update instead of silently
// overwriting it.
//
// Services not in replicated mode (e.g. global services) have no replica
// count; for them an error is returned instead of panicking. Any warnings the
// daemon returns for the update are logged.
func (sm *SwarmManager) ScaleService(serviceID string, replicas uint64) error {
	log.Info().
		Str("service_id", serviceID).
		Uint64("replicas", replicas).
		Msg("📈 Scaling agent service")

	service, _, err := sm.client.ServiceInspectWithRaw(sm.ctx, serviceID, types.ServiceInspectOptions{})
	if err != nil {
		return fmt.Errorf("failed to inspect service: %w", err)
	}

	if service.Spec.Mode.Replicated == nil {
		return fmt.Errorf("failed to scale service %s: service is not in replicated mode", serviceID)
	}
	service.Spec.Mode.Replicated.Replicas = &replicas

	resp, err := sm.client.ServiceUpdate(sm.ctx, serviceID, service.Version, service.Spec, types.ServiceUpdateOptions{})
	if err != nil {
		return fmt.Errorf("failed to scale service: %w", err)
	}
	if len(resp.Warnings) > 0 {
		log.Warn().
			Str("service_id", serviceID).
			Strs("warnings", resp.Warnings).
			Msg("Docker reported warnings while scaling service")
	}

	log.Info().
		Str("service_id", serviceID).
		Uint64("replicas", replicas).
		Msg("✅ Service scaled successfully")

	return nil
}
// GetServiceStatus returns a snapshot of a service's configuration and task
// states.
//
// Replicas is the desired count for replicated-mode services and 0 otherwise.
// Image is empty when the service has no container spec, for example
// plugin-based services. Counts cover every task the daemon reports for the
// service, which may include tasks retained in its task history, so
// FailedTasks can include earlier failures. TaskStates counts tasks per state
// name.
func (sm *SwarmManager) GetServiceStatus(serviceID string) (*ServiceStatus, error) {
	service, _, err := sm.client.ServiceInspectWithRaw(sm.ctx, serviceID, types.ServiceInspectOptions{})
	if err != nil {
		return nil, fmt.Errorf("failed to inspect service: %w", err)
	}

	tasks, err := sm.client.TaskList(sm.ctx, types.TaskListOptions{
		Filters: filters.NewArgs(filters.Arg("service", serviceID)),
	})
	if err != nil {
		return nil, fmt.Errorf("failed to list tasks: %w", err)
	}

	// ContainerSpec is optional in a task template; guard against nil.
	image := ""
	if cs := service.Spec.TaskTemplate.ContainerSpec; cs != nil {
		image = cs.Image
	}

	status := &ServiceStatus{
		ServiceID:    serviceID,
		ServiceName:  service.Spec.Name,
		Image:        image,
		Replicas:     0,
		RunningTasks: 0,
		FailedTasks:  0,
		TaskStates:   make(map[string]int),
		CreatedAt:    service.CreatedAt,
		UpdatedAt:    service.UpdatedAt,
	}

	if service.Spec.Mode.Replicated != nil && service.Spec.Mode.Replicated.Replicas != nil {
		status.Replicas = *service.Spec.Mode.Replicated.Replicas
	}

	for _, task := range tasks {
		status.TaskStates[string(task.Status.State)]++
		switch task.Status.State {
		case swarm.TaskStateRunning:
			status.RunningTasks++
		case swarm.TaskStateFailed:
			status.FailedTasks++
		}
	}

	return status, nil
}
// ServiceStatus represents the current status of a service, as assembled by
// GetServiceStatus from a service inspect plus the service's task list.
type ServiceStatus struct {
ServiceID string `json:"service_id"`
ServiceName string `json:"service_name"`
Image string `json:"image"`
// Replicas is the desired replica count; 0 when the service is not in
// replicated mode.
Replicas uint64 `json:"replicas"`
// RunningTasks and FailedTasks count listed tasks in the "running" and
// "failed" states respectively.
RunningTasks uint64 `json:"running_tasks"`
FailedTasks uint64 `json:"failed_tasks"`
// TaskStates maps each task state name to the number of listed tasks in it.
TaskStates map[string]int `json:"task_states"`
CreatedAt time.Time `json:"created_at"`
UpdatedAt time.Time `json:"updated_at"`
}
// CleanupFailedServices removes WHOOSH-managed services that look dead. A
// service is removed when at least one of its tasks is in the failed state
// and none is running. It does not require that every task has failed.
//
// Cleanup is best-effort:
//   - A service whose status cannot be read is logged and skipped.
//   - A removal failure is logged and the loop continues.
//
// The only error returned comes from listing the services.
//
// NOTE(review): a service restarting after a failure (replacement task not
// yet running) also matches this condition — confirm that is intended.
func (sm *SwarmManager) CleanupFailedServices() error {
	services, err := sm.ListAgentServices()
	if err != nil {
		return fmt.Errorf("failed to list services: %w", err)
	}

	for i := range services {
		svc := services[i]

		status, statusErr := sm.GetServiceStatus(svc.ID)
		if statusErr != nil {
			log.Error().
				Err(statusErr).
				Str("service_id", svc.ID).
				Msg("Failed to get service status")
			continue
		}

		// Keep services with no failures or with at least one running task.
		if status.FailedTasks == 0 || status.RunningTasks > 0 {
			continue
		}

		log.Warn().
			Str("service_id", svc.ID).
			Str("service_name", svc.Spec.Name).
			Uint64("failed_tasks", status.FailedTasks).
			Msg("Removing failed service")

		if removeErr := sm.RemoveAgent(svc.ID); removeErr != nil {
			log.Error().
				Err(removeErr).
				Str("service_id", svc.ID).
				Msg("Failed to remove failed service")
		}
	}

	return nil
}