Integrate chorus-dev-images repository with automatic language detection and appropriate development container selection.

New features:
- ImageSelector for automatic language-to-image mapping
- Language detection from task context, description, and repository
- Standardized workspace environment variables
- Support for 7 development environments (Rust, Go, Python, Node, Java, C++)

Changes:
- pkg/execution/images.go (new): Image selection and language detection logic
- pkg/execution/engine.go: Modified createSandboxConfig to use ImageSelector

This ensures agents automatically get the right tools for their tasks without manual configuration.

Related: https://gitea.chorus.services/tony/chorus-dev-images

🤖 Generated with Claude Code (https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>
504 lines
16 KiB
Go
504 lines
16 KiB
Go
package execution
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"strings"
|
|
"time"
|
|
|
|
"chorus/pkg/ai"
|
|
)
|
|
|
|
// TaskExecutionEngine provides AI-powered task execution with isolated sandboxes
|
|
type TaskExecutionEngine interface {
|
|
ExecuteTask(ctx context.Context, request *TaskExecutionRequest) (*TaskExecutionResult, error)
|
|
Initialize(ctx context.Context, config *EngineConfig) error
|
|
Shutdown() error
|
|
GetMetrics() *EngineMetrics
|
|
}
|
|
|
|
// TaskExecutionRequest represents a task to be executed
|
|
type TaskExecutionRequest struct {
|
|
ID string `json:"id"`
|
|
Type string `json:"type"`
|
|
Description string `json:"description"`
|
|
Context map[string]interface{} `json:"context,omitempty"`
|
|
Requirements *TaskRequirements `json:"requirements,omitempty"`
|
|
Timeout time.Duration `json:"timeout,omitempty"`
|
|
}
|
|
|
|
// TaskRequirements specifies execution environment needs
|
|
type TaskRequirements struct {
|
|
AIModel string `json:"ai_model,omitempty"`
|
|
SandboxType string `json:"sandbox_type,omitempty"`
|
|
RequiredTools []string `json:"required_tools,omitempty"`
|
|
EnvironmentVars map[string]string `json:"environment_vars,omitempty"`
|
|
ResourceLimits *ResourceLimits `json:"resource_limits,omitempty"`
|
|
SecurityPolicy *SecurityPolicy `json:"security_policy,omitempty"`
|
|
}
|
|
|
|
// TaskExecutionResult contains the results of task execution
|
|
type TaskExecutionResult struct {
|
|
TaskID string `json:"task_id"`
|
|
Success bool `json:"success"`
|
|
Output string `json:"output"`
|
|
ErrorMessage string `json:"error_message,omitempty"`
|
|
Artifacts []TaskArtifact `json:"artifacts,omitempty"`
|
|
Metrics *ExecutionMetrics `json:"metrics"`
|
|
Metadata map[string]interface{} `json:"metadata,omitempty"`
|
|
}
|
|
|
|
// TaskArtifact represents a file or data produced during execution.
type TaskArtifact struct {
	// Name is the artifact's file name or target path.
	Name string `json:"name"`
	// Type classifies the artifact; the engine emits "file" for AI-declared
	// files and "generated_file" for files collected from the sandbox.
	Type string `json:"type"`
	// Path optionally records where the artifact lives.
	Path string `json:"path,omitempty"`
	// Content is the raw artifact payload.
	Content []byte `json:"content,omitempty"`
	// Size is the artifact size in bytes.
	Size int64 `json:"size"`
	// CreatedAt is when the artifact was created or last modified.
	CreatedAt time.Time `json:"created_at"`
	// Metadata carries optional string annotations.
	Metadata map[string]string `json:"metadata,omitempty"`
}
|
|
|
|
// ExecutionMetrics tracks resource usage and performance
|
|
type ExecutionMetrics struct {
|
|
StartTime time.Time `json:"start_time"`
|
|
EndTime time.Time `json:"end_time"`
|
|
Duration time.Duration `json:"duration"`
|
|
AIProviderTime time.Duration `json:"ai_provider_time"`
|
|
SandboxTime time.Duration `json:"sandbox_time"`
|
|
ResourceUsage *ResourceUsage `json:"resource_usage,omitempty"`
|
|
CommandsExecuted int `json:"commands_executed"`
|
|
FilesGenerated int `json:"files_generated"`
|
|
}
|
|
|
|
// EngineConfig configures the task execution engine
|
|
type EngineConfig struct {
|
|
AIProviderFactory *ai.ProviderFactory `json:"-"`
|
|
SandboxDefaults *SandboxConfig `json:"sandbox_defaults"`
|
|
DefaultTimeout time.Duration `json:"default_timeout"`
|
|
MaxConcurrentTasks int `json:"max_concurrent_tasks"`
|
|
EnableMetrics bool `json:"enable_metrics"`
|
|
LogLevel string `json:"log_level"`
|
|
}
|
|
|
|
// EngineMetrics tracks overall engine performance across all tasks.
type EngineMetrics struct {
	// TasksExecuted counts every finished task, successful or not.
	TasksExecuted int64 `json:"tasks_executed"`
	// TasksSuccessful and TasksFailed partition TasksExecuted.
	TasksSuccessful int64 `json:"tasks_successful"`
	TasksFailed     int64 `json:"tasks_failed"`
	// AverageTime is TotalExecutionTime divided by TasksExecuted.
	AverageTime time.Duration `json:"average_time"`
	// TotalExecutionTime is the sum of all task durations.
	TotalExecutionTime time.Duration `json:"total_execution_time"`
	// ActiveTasks is the number of tasks currently executing.
	ActiveTasks int `json:"active_tasks"`
}
|
|
|
|
// DefaultTaskExecutionEngine implements the TaskExecutionEngine interface
|
|
type DefaultTaskExecutionEngine struct {
|
|
config *EngineConfig
|
|
aiFactory *ai.ProviderFactory
|
|
metrics *EngineMetrics
|
|
activeTasks map[string]context.CancelFunc
|
|
logger *log.Logger
|
|
}
|
|
|
|
// NewTaskExecutionEngine creates a new task execution engine
|
|
func NewTaskExecutionEngine() *DefaultTaskExecutionEngine {
|
|
return &DefaultTaskExecutionEngine{
|
|
metrics: &EngineMetrics{},
|
|
activeTasks: make(map[string]context.CancelFunc),
|
|
logger: log.Default(),
|
|
}
|
|
}
|
|
|
|
// Initialize configures and prepares the execution engine
|
|
func (e *DefaultTaskExecutionEngine) Initialize(ctx context.Context, config *EngineConfig) error {
|
|
if config == nil {
|
|
return fmt.Errorf("engine config cannot be nil")
|
|
}
|
|
|
|
if config.AIProviderFactory == nil {
|
|
return fmt.Errorf("AI provider factory is required")
|
|
}
|
|
|
|
e.config = config
|
|
e.aiFactory = config.AIProviderFactory
|
|
|
|
// Set default values
|
|
if e.config.DefaultTimeout == 0 {
|
|
e.config.DefaultTimeout = 5 * time.Minute
|
|
}
|
|
if e.config.MaxConcurrentTasks == 0 {
|
|
e.config.MaxConcurrentTasks = 10
|
|
}
|
|
|
|
e.logger.Printf("TaskExecutionEngine initialized with %d max concurrent tasks", e.config.MaxConcurrentTasks)
|
|
return nil
|
|
}
|
|
|
|
// ExecuteTask executes a task using AI providers and isolated sandboxes
|
|
func (e *DefaultTaskExecutionEngine) ExecuteTask(ctx context.Context, request *TaskExecutionRequest) (*TaskExecutionResult, error) {
|
|
if e.config == nil {
|
|
return nil, fmt.Errorf("engine not initialized")
|
|
}
|
|
|
|
startTime := time.Now()
|
|
|
|
// Create task context with timeout
|
|
timeout := request.Timeout
|
|
if timeout == 0 {
|
|
timeout = e.config.DefaultTimeout
|
|
}
|
|
|
|
taskCtx, cancel := context.WithTimeout(ctx, timeout)
|
|
defer cancel()
|
|
|
|
// Track active task
|
|
e.activeTasks[request.ID] = cancel
|
|
defer delete(e.activeTasks, request.ID)
|
|
|
|
e.metrics.ActiveTasks++
|
|
defer func() { e.metrics.ActiveTasks-- }()
|
|
|
|
result := &TaskExecutionResult{
|
|
TaskID: request.ID,
|
|
Metrics: &ExecutionMetrics{StartTime: startTime},
|
|
}
|
|
|
|
// Execute the task
|
|
err := e.executeTaskInternal(taskCtx, request, result)
|
|
|
|
// Update metrics
|
|
result.Metrics.EndTime = time.Now()
|
|
result.Metrics.Duration = result.Metrics.EndTime.Sub(result.Metrics.StartTime)
|
|
|
|
e.metrics.TasksExecuted++
|
|
e.metrics.TotalExecutionTime += result.Metrics.Duration
|
|
|
|
if err != nil {
|
|
result.Success = false
|
|
result.ErrorMessage = err.Error()
|
|
e.metrics.TasksFailed++
|
|
e.logger.Printf("Task %s failed: %v", request.ID, err)
|
|
} else {
|
|
result.Success = true
|
|
e.metrics.TasksSuccessful++
|
|
e.logger.Printf("Task %s completed successfully in %v", request.ID, result.Metrics.Duration)
|
|
}
|
|
|
|
e.metrics.AverageTime = e.metrics.TotalExecutionTime / time.Duration(e.metrics.TasksExecuted)
|
|
|
|
return result, err
|
|
}
|
|
|
|
// executeTaskInternal performs the actual task execution
|
|
func (e *DefaultTaskExecutionEngine) executeTaskInternal(ctx context.Context, request *TaskExecutionRequest, result *TaskExecutionResult) error {
|
|
// Step 1: Determine AI model and get provider
|
|
aiStartTime := time.Now()
|
|
|
|
role := e.determineRoleFromTask(request)
|
|
provider, providerConfig, err := e.aiFactory.GetProviderForRole(role)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get AI provider for role %s: %w", role, err)
|
|
}
|
|
|
|
// Step 2: Create AI request
|
|
aiRequest := &ai.TaskRequest{
|
|
TaskID: request.ID,
|
|
TaskTitle: request.Type,
|
|
TaskDescription: request.Description,
|
|
Context: request.Context,
|
|
ModelName: providerConfig.DefaultModel,
|
|
AgentRole: role,
|
|
}
|
|
|
|
// Step 3: Get AI response
|
|
aiResponse, err := provider.ExecuteTask(ctx, aiRequest)
|
|
if err != nil {
|
|
return fmt.Errorf("AI provider execution failed: %w", err)
|
|
}
|
|
|
|
result.Metrics.AIProviderTime = time.Since(aiStartTime)
|
|
|
|
// Step 4: Parse AI response for executable commands
|
|
commands, artifacts, err := e.parseAIResponse(aiResponse)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to parse AI response: %w", err)
|
|
}
|
|
|
|
// Step 5: Execute commands in sandbox if needed
|
|
if len(commands) > 0 {
|
|
sandboxStartTime := time.Now()
|
|
|
|
sandboxResult, err := e.executeSandboxCommands(ctx, request, commands)
|
|
if err != nil {
|
|
return fmt.Errorf("sandbox execution failed: %w", err)
|
|
}
|
|
|
|
result.Metrics.SandboxTime = time.Since(sandboxStartTime)
|
|
result.Metrics.CommandsExecuted = len(commands)
|
|
result.Metrics.ResourceUsage = sandboxResult.ResourceUsage
|
|
|
|
// Merge sandbox artifacts
|
|
artifacts = append(artifacts, sandboxResult.Artifacts...)
|
|
}
|
|
|
|
// Step 6: Process results and artifacts
|
|
result.Output = e.formatOutput(aiResponse, artifacts)
|
|
result.Artifacts = artifacts
|
|
result.Metrics.FilesGenerated = len(artifacts)
|
|
|
|
// Add metadata
|
|
result.Metadata = map[string]interface{}{
|
|
"ai_provider": providerConfig.Type,
|
|
"ai_model": providerConfig.DefaultModel,
|
|
"role": role,
|
|
"commands": len(commands),
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// determineRoleFromTask analyzes the task to determine appropriate AI role
|
|
func (e *DefaultTaskExecutionEngine) determineRoleFromTask(request *TaskExecutionRequest) string {
|
|
taskType := strings.ToLower(request.Type)
|
|
description := strings.ToLower(request.Description)
|
|
|
|
// Determine role based on task type and description keywords
|
|
if strings.Contains(taskType, "code") || strings.Contains(description, "program") ||
|
|
strings.Contains(description, "script") || strings.Contains(description, "function") {
|
|
return "developer"
|
|
}
|
|
|
|
if strings.Contains(taskType, "analysis") || strings.Contains(description, "analyze") ||
|
|
strings.Contains(description, "review") {
|
|
return "analyst"
|
|
}
|
|
|
|
if strings.Contains(taskType, "test") || strings.Contains(description, "test") {
|
|
return "tester"
|
|
}
|
|
|
|
// Default to general purpose
|
|
return "general"
|
|
}
|
|
|
|
// parseAIResponse extracts executable commands and artifacts from AI response
|
|
func (e *DefaultTaskExecutionEngine) parseAIResponse(response *ai.TaskResponse) ([]string, []TaskArtifact, error) {
|
|
var commands []string
|
|
var artifacts []TaskArtifact
|
|
|
|
// Parse response content for commands and files
|
|
// This is a simplified parser - in reality would need more sophisticated parsing
|
|
|
|
if len(response.Actions) > 0 {
|
|
for _, action := range response.Actions {
|
|
switch action.Type {
|
|
case "command", "command_run":
|
|
// Extract command from content or target
|
|
if action.Content != "" {
|
|
commands = append(commands, action.Content)
|
|
} else if action.Target != "" {
|
|
commands = append(commands, action.Target)
|
|
}
|
|
case "file", "file_create", "file_edit":
|
|
// Create artifact from file action
|
|
if action.Target != "" && action.Content != "" {
|
|
artifact := TaskArtifact{
|
|
Name: action.Target,
|
|
Type: "file",
|
|
Content: []byte(action.Content),
|
|
Size: int64(len(action.Content)),
|
|
CreatedAt: time.Now(),
|
|
}
|
|
artifacts = append(artifacts, artifact)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return commands, artifacts, nil
|
|
}
|
|
|
|
// SandboxExecutionResult contains results from sandbox command execution
|
|
type SandboxExecutionResult struct {
|
|
Output string
|
|
Artifacts []TaskArtifact
|
|
ResourceUsage *ResourceUsage
|
|
}
|
|
|
|
// executeSandboxCommands runs commands in an isolated sandbox
|
|
func (e *DefaultTaskExecutionEngine) executeSandboxCommands(ctx context.Context, request *TaskExecutionRequest, commands []string) (*SandboxExecutionResult, error) {
|
|
// Create sandbox configuration
|
|
sandboxConfig := e.createSandboxConfig(request)
|
|
|
|
// Initialize sandbox
|
|
sandbox := NewDockerSandbox()
|
|
err := sandbox.Initialize(ctx, sandboxConfig)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to initialize sandbox: %w", err)
|
|
}
|
|
defer sandbox.Cleanup()
|
|
|
|
var outputs []string
|
|
var artifacts []TaskArtifact
|
|
|
|
// Execute each command
|
|
for _, cmdStr := range commands {
|
|
cmd := &Command{
|
|
Executable: "/bin/sh",
|
|
Args: []string{"-c", cmdStr},
|
|
WorkingDir: "/workspace",
|
|
Timeout: 30 * time.Second,
|
|
}
|
|
|
|
cmdResult, err := sandbox.ExecuteCommand(ctx, cmd)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("command execution failed: %w", err)
|
|
}
|
|
|
|
outputs = append(outputs, fmt.Sprintf("$ %s\n%s", cmdStr, cmdResult.Stdout))
|
|
|
|
if cmdResult.ExitCode != 0 {
|
|
outputs = append(outputs, fmt.Sprintf("Error (exit %d): %s", cmdResult.ExitCode, cmdResult.Stderr))
|
|
}
|
|
}
|
|
|
|
// Get resource usage
|
|
resourceUsage, _ := sandbox.GetResourceUsage(ctx)
|
|
|
|
// Collect any generated files as artifacts
|
|
files, err := sandbox.ListFiles(ctx, "/workspace")
|
|
if err == nil {
|
|
for _, file := range files {
|
|
if !file.IsDir && file.Size > 0 {
|
|
content, err := sandbox.ReadFile(ctx, "/workspace/"+file.Name)
|
|
if err == nil {
|
|
artifact := TaskArtifact{
|
|
Name: file.Name,
|
|
Type: "generated_file",
|
|
Content: content,
|
|
Size: file.Size,
|
|
CreatedAt: file.ModTime,
|
|
}
|
|
artifacts = append(artifacts, artifact)
|
|
}
|
|
}
|
|
}
|
|
}
|
|
|
|
return &SandboxExecutionResult{
|
|
Output: strings.Join(outputs, "\n"),
|
|
Artifacts: artifacts,
|
|
ResourceUsage: resourceUsage,
|
|
}, nil
|
|
}
|
|
|
|
// createSandboxConfig creates a sandbox configuration from task requirements
|
|
func (e *DefaultTaskExecutionEngine) createSandboxConfig(request *TaskExecutionRequest) *SandboxConfig {
|
|
// Use image selector to choose appropriate development environment
|
|
imageSelector := NewImageSelector()
|
|
selectedImage := imageSelector.SelectImageForTask(request)
|
|
|
|
config := &SandboxConfig{
|
|
Type: "docker",
|
|
Image: selectedImage, // Auto-selected based on task language
|
|
Architecture: "amd64",
|
|
WorkingDir: "/workspace/data", // Use standardized workspace structure
|
|
Timeout: 5 * time.Minute,
|
|
Environment: make(map[string]string),
|
|
}
|
|
|
|
// Add standardized workspace environment variables
|
|
config.Environment["WORKSPACE_ROOT"] = "/workspace"
|
|
config.Environment["WORKSPACE_INPUT"] = "/workspace/input"
|
|
config.Environment["WORKSPACE_DATA"] = "/workspace/data"
|
|
config.Environment["WORKSPACE_OUTPUT"] = "/workspace/output"
|
|
|
|
// Apply defaults from engine config
|
|
if e.config.SandboxDefaults != nil {
|
|
if e.config.SandboxDefaults.Image != "" {
|
|
config.Image = e.config.SandboxDefaults.Image
|
|
}
|
|
if e.config.SandboxDefaults.Resources.MemoryLimit > 0 {
|
|
config.Resources = e.config.SandboxDefaults.Resources
|
|
}
|
|
if e.config.SandboxDefaults.Security.NoNewPrivileges {
|
|
config.Security = e.config.SandboxDefaults.Security
|
|
}
|
|
}
|
|
|
|
// Apply task-specific requirements
|
|
if request.Requirements != nil {
|
|
if request.Requirements.SandboxType != "" {
|
|
config.Type = request.Requirements.SandboxType
|
|
}
|
|
|
|
if request.Requirements.EnvironmentVars != nil {
|
|
for k, v := range request.Requirements.EnvironmentVars {
|
|
config.Environment[k] = v
|
|
}
|
|
}
|
|
|
|
if request.Requirements.ResourceLimits != nil {
|
|
config.Resources = *request.Requirements.ResourceLimits
|
|
}
|
|
|
|
if request.Requirements.SecurityPolicy != nil {
|
|
config.Security = *request.Requirements.SecurityPolicy
|
|
}
|
|
}
|
|
|
|
return config
|
|
}
|
|
|
|
// formatOutput creates a formatted output string from AI response and artifacts
|
|
func (e *DefaultTaskExecutionEngine) formatOutput(aiResponse *ai.TaskResponse, artifacts []TaskArtifact) string {
|
|
var output strings.Builder
|
|
|
|
output.WriteString("AI Response:\n")
|
|
output.WriteString(aiResponse.Response)
|
|
output.WriteString("\n\n")
|
|
|
|
if len(artifacts) > 0 {
|
|
output.WriteString("Generated Artifacts:\n")
|
|
for _, artifact := range artifacts {
|
|
output.WriteString(fmt.Sprintf("- %s (%s, %d bytes)\n",
|
|
artifact.Name, artifact.Type, artifact.Size))
|
|
}
|
|
}
|
|
|
|
return output.String()
|
|
}
|
|
|
|
// GetMetrics returns current engine metrics
|
|
func (e *DefaultTaskExecutionEngine) GetMetrics() *EngineMetrics {
|
|
return e.metrics
|
|
}
|
|
|
|
// Shutdown gracefully shuts down the execution engine
|
|
func (e *DefaultTaskExecutionEngine) Shutdown() error {
|
|
e.logger.Printf("Shutting down TaskExecutionEngine...")
|
|
|
|
// Cancel all active tasks
|
|
for taskID, cancel := range e.activeTasks {
|
|
e.logger.Printf("Canceling active task: %s", taskID)
|
|
cancel()
|
|
}
|
|
|
|
// Wait for tasks to finish (with timeout)
|
|
shutdownCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
|
|
defer cancel()
|
|
|
|
for len(e.activeTasks) > 0 {
|
|
select {
|
|
case <-shutdownCtx.Done():
|
|
e.logger.Printf("Shutdown timeout reached, %d tasks may still be active", len(e.activeTasks))
|
|
return nil
|
|
case <-time.After(100 * time.Millisecond):
|
|
// Continue waiting
|
|
}
|
|
}
|
|
|
|
e.logger.Printf("TaskExecutionEngine shutdown complete")
|
|
return nil
|
|
} |