package tasks

import (
	"context"
	"encoding/json"
	"errors"
	"fmt"
	"sort"
	"strings"
	"time"

	"github.com/google/uuid"
	"github.com/jackc/pgx/v5"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/rs/zerolog/log"
)

// taskSelectColumns lists the columns read by every task SELECT in this file.
// The order must match the destination order used in scanTask.
const taskSelectColumns = `
	id, external_id, external_url, source_type, source_config,
	title, description, status, priority, assigned_team_id, assigned_agent_id,
	repository, project_id, labels, tech_stack, requirements,
	estimated_hours, complexity_score, claimed_at, started_at, completed_at,
	created_at, updated_at, external_created_at, external_updated_at`

// knownTechKeywords is the set of lower-case keywords inferTechStack looks for
// in task labels and descriptions. It is read-only after initialization.
var knownTechKeywords = map[string]struct{}{
	"go": {}, "golang": {}, "javascript": {}, "react": {}, "node": {},
	"python": {}, "java": {}, "rust": {}, "docker": {}, "postgres": {},
	"mysql": {}, "redis": {}, "api": {}, "backend": {}, "frontend": {},
	"database": {}, "typescript": {}, "vue": {}, "angular": {},
}

// Service manages tasks from various external sources.
type Service struct {
	db *pgxpool.Pool
}

// NewService creates a new task management service backed by db.
func NewService(db *pgxpool.Pool) *Service {
	return &Service{
		db: db,
	}
}

// CreateTask builds a Task from input, derives its tech stack, requirements
// and complexity score, and inserts it into the tasks table.
//
// The task gets a fresh UUID, TaskStatusOpen, and identical CreatedAt and
// UpdatedAt timestamps. An empty priority defaults to TaskPriorityMedium; nil
// SourceConfig and Labels are replaced by empty values so they are stored as
// JSON objects/arrays rather than null.
//
// It returns an error if input is nil, if any JSON column cannot be encoded,
// or if the INSERT fails.
func (s *Service) CreateTask(ctx context.Context, input *CreateTaskInput) (*Task, error) {
	if input == nil {
		return nil, fmt.Errorf("failed to create task: input must not be nil")
	}

	// One timestamp for both columns so created_at == updated_at on insert.
	now := time.Now()

	task := &Task{
		ID:                uuid.New(),
		ExternalID:        input.ExternalID,
		ExternalURL:       input.ExternalURL,
		SourceType:        input.SourceType,
		SourceConfig:      input.SourceConfig,
		Title:             input.Title,
		Description:       input.Description,
		Status:            TaskStatusOpen,
		Priority:          input.Priority,
		Repository:        input.Repository,
		ProjectID:         input.ProjectID,
		Labels:            input.Labels,
		EstimatedHours:    input.EstimatedHours,
		CreatedAt:         now,
		UpdatedAt:         now,
		ExternalCreatedAt: input.ExternalCreatedAt,
		ExternalUpdatedAt: input.ExternalUpdatedAt,
	}

	// Set defaults.
	if task.Priority == "" {
		task.Priority = TaskPriorityMedium
	}
	if task.SourceConfig == nil {
		task.SourceConfig = make(map[string]any)
	}
	if task.Labels == nil {
		task.Labels = []string{}
	}

	// Derived fields. estimateComplexity reads TechStack and Requirements, so
	// it must run last.
	task.TechStack = s.inferTechStack(task.Labels, task.Description)
	task.Requirements = s.extractRequirements(task.Description)
	task.ComplexityScore = s.estimateComplexity(task)

	// Marshal JSON columns.
	sourceConfigJSON, err := json.Marshal(task.SourceConfig)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal source config: %w", err)
	}
	labelsJSON, err := json.Marshal(task.Labels)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal labels: %w", err)
	}
	techStackJSON, err := json.Marshal(task.TechStack)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal tech stack: %w", err)
	}
	requirementsJSON, err := json.Marshal(task.Requirements)
	if err != nil {
		return nil, fmt.Errorf("failed to marshal requirements: %w", err)
	}

	const query = `
		INSERT INTO tasks (
			id, external_id, external_url, source_type, source_config,
			title, description, status, priority, repository, project_id,
			labels, tech_stack, requirements, estimated_hours, complexity_score,
			created_at, updated_at, external_created_at, external_updated_at
		) VALUES (
			$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14, $15, $16, $17, $18, $19, $20
		)
	`

	_, err = s.db.Exec(ctx, query,
		task.ID, task.ExternalID, task.ExternalURL, task.SourceType, sourceConfigJSON,
		task.Title, task.Description, task.Status, task.Priority, task.Repository, task.ProjectID,
		labelsJSON, techStackJSON, requirementsJSON, task.EstimatedHours, task.ComplexityScore,
		task.CreatedAt, task.UpdatedAt, task.ExternalCreatedAt, task.ExternalUpdatedAt,
	)
	if err != nil {
		return nil, fmt.Errorf("failed to create task: %w", err)
	}

	log.Info().
		Str("task_id", task.ID.String()).
		Str("external_id", task.ExternalID).
		Str("source_type", string(task.SourceType)).
		Str("title", task.Title).
		Msg("Task created")

	return task, nil
}

// GetTask retrieves a task by its ID.
//
// It returns a "task not found" error when no row matches and a wrapped error
// for any other query or scan failure.
func (s *Service) GetTask(ctx context.Context, taskID uuid.UUID) (*Task, error) {
	query := "SELECT " + taskSelectColumns + " FROM tasks WHERE id = $1"

	task, err := s.scanTask(s.db.QueryRow(ctx, query, taskID))
	if err != nil {
		// errors.Is also matches if the driver ever wraps ErrNoRows.
		if errors.Is(err, pgx.ErrNoRows) {
			return nil, fmt.Errorf("task not found")
		}
		return nil, fmt.Errorf("failed to get task: %w", err)
	}
	return task, nil
}

// GetTaskByExternalID retrieves a task by its external ID and source type.
//
// It returns a "task not found" error when no row matches and a wrapped error
// for any other query or scan failure.
func (s *Service) GetTaskByExternalID(ctx context.Context, externalID string, sourceType SourceType) (*Task, error) {
	query := "SELECT " + taskSelectColumns + " FROM tasks WHERE external_id = $1 AND source_type = $2"

	task, err := s.scanTask(s.db.QueryRow(ctx, query, externalID, sourceType))
	if err != nil {
		if errors.Is(err, pgx.ErrNoRows) {
			return nil, fmt.Errorf("task not found")
		}
		return nil, fmt.Errorf("failed to get task: %w", err)
	}
	return task, nil
}

// ListTasks retrieves tasks matching filter, newest first, together with the
// total number of matching rows (ignoring pagination).
//
// A nil filter matches every task. Status and Priority filter with IN,
// Repository and AssignedTeam with equality; all conditions are ANDed.
// Limit is honoured only in the range 1..100 and otherwise falls back to 20;
// a negative Offset is treated as 0. The count and the page are fetched with
// two separate queries.
func (s *Service) ListTasks(ctx context.Context, filter *TaskFilter) ([]*Task, int, error) {
	if filter == nil {
		filter = &TaskFilter{}
	}

	// Build the WHERE clause. Placeholder numbers are derived from len(args),
	// so each value is appended before its placeholder is rendered.
	var (
		conditions []string
		args       []any
	)
	if len(filter.Status) > 0 {
		var cond string
		cond, args = inCondition("status", filter.Status, args)
		conditions = append(conditions, cond)
	}
	if len(filter.Priority) > 0 {
		var cond string
		cond, args = inCondition("priority", filter.Priority, args)
		conditions = append(conditions, cond)
	}
	if filter.Repository != "" {
		args = append(args, filter.Repository)
		conditions = append(conditions, fmt.Sprintf("repository = $%d", len(args)))
	}
	if filter.AssignedTeam != nil {
		args = append(args, *filter.AssignedTeam)
		conditions = append(conditions, fmt.Sprintf("assigned_team_id = $%d", len(args)))
	}

	whereClause := ""
	if len(conditions) > 0 {
		whereClause = "WHERE " + strings.Join(conditions, " AND ")
	}

	// Total count uses the filter arguments only.
	countQuery := fmt.Sprintf("SELECT COUNT(*) FROM tasks %s", whereClause)
	var total int
	if err := s.db.QueryRow(ctx, countQuery, args...).Scan(&total); err != nil {
		return nil, 0, fmt.Errorf("failed to count tasks: %w", err)
	}

	// Pagination.
	limit, offset := 20, 0
	if filter.Limit > 0 && filter.Limit <= 100 {
		limit = filter.Limit
	}
	if filter.Offset >= 0 {
		offset = filter.Offset
	}

	limitIdx := len(args) + 1
	query := fmt.Sprintf(
		"SELECT %s FROM tasks %s ORDER BY created_at DESC LIMIT $%d OFFSET $%d",
		taskSelectColumns, whereClause, limitIdx, limitIdx+1,
	)
	args = append(args, limit, offset)

	rows, err := s.db.Query(ctx, query, args...)
	if err != nil {
		return nil, 0, fmt.Errorf("failed to query tasks: %w", err)
	}
	defer rows.Close()

	var tasks []*Task
	for rows.Next() {
		task, err := s.scanTask(rows)
		if err != nil {
			return nil, 0, fmt.Errorf("failed to scan task: %w", err)
		}
		tasks = append(tasks, task)
	}
	// Do not hand back a partial page together with an iteration error.
	if err := rows.Err(); err != nil {
		return nil, 0, fmt.Errorf("failed to iterate tasks: %w", err)
	}

	return tasks, total, nil
}

// inCondition appends values to args and returns a "column IN ($n,...)"
// condition whose placeholders refer to the appended positions, along with
// the extended args slice.
func inCondition[T any](column string, values []T, args []any) (string, []any) {
	placeholders := make([]string, len(values))
	for i, v := range values {
		args = append(args, v)
		placeholders[i] = fmt.Sprintf("$%d", len(args))
	}
	return fmt.Sprintf("%s IN (%s)", column, strings.Join(placeholders, ",")), args
}

// AssignTask assigns a task to a team, an agent, or both, sets its status to
// TaskStatusClaimed and stamps claimed_at and updated_at with the same time.
//
// Only the assignee columns that are provided are written; the other one is
// left unchanged. It returns an error if assignment is nil, if neither TeamID
// nor AgentID is set, if the UPDATE fails, or if no row has the given TaskID.
func (s *Service) AssignTask(ctx context.Context, assignment *TaskAssignment) error {
	if assignment == nil {
		return fmt.Errorf("failed to assign task: assignment must not be nil")
	}

	now := time.Now()

	var (
		query string
		args  []any
	)
	switch {
	case assignment.TeamID != nil && assignment.AgentID != nil:
		query = `
			UPDATE tasks
			SET assigned_team_id = $1, assigned_agent_id = $2, status = $3, claimed_at = $4, updated_at = $4
			WHERE id = $5
		`
		args = []any{assignment.TeamID, assignment.AgentID, TaskStatusClaimed, now, assignment.TaskID}
	case assignment.TeamID != nil:
		query = `
			UPDATE tasks
			SET assigned_team_id = $1, status = $2, claimed_at = $3, updated_at = $3
			WHERE id = $4
		`
		args = []any{assignment.TeamID, TaskStatusClaimed, now, assignment.TaskID}
	case assignment.AgentID != nil:
		query = `
			UPDATE tasks
			SET assigned_agent_id = $1, status = $2, claimed_at = $3, updated_at = $3
			WHERE id = $4
		`
		args = []any{assignment.AgentID, TaskStatusClaimed, now, assignment.TaskID}
	default:
		return fmt.Errorf("either team_id or agent_id must be provided")
	}

	result, err := s.db.Exec(ctx, query, args...)
	if err != nil {
		return fmt.Errorf("failed to assign task: %w", err)
	}
	if result.RowsAffected() == 0 {
		return fmt.Errorf("task not found")
	}

	log.Info().
		Str("task_id", assignment.TaskID.String()).
		Interface("team_id", assignment.TeamID).
		Interface("agent_id", assignment.AgentID).
		Str("reason", assignment.Reason).
		Msg("Task assigned")

	return nil
}

// UpdateTaskStatus sets a task's status and updated_at.
//
// Moving to TaskStatusInProgress also sets started_at, and moving to
// TaskStatusCompleted also sets completed_at, both to the same timestamp as
// updated_at. It returns an error if update is nil, if the UPDATE fails, or
// if no row has the given TaskID.
func (s *Service) UpdateTaskStatus(ctx context.Context, update *TaskStatusUpdate) error {
	if update == nil {
		return fmt.Errorf("failed to update task status: update must not be nil")
	}

	// Every variant takes the same three arguments: status, timestamp, id.
	var query string
	switch update.Status {
	case TaskStatusInProgress:
		query = `
			UPDATE tasks
			SET status = $1, started_at = $2, updated_at = $2
			WHERE id = $3
		`
	case TaskStatusCompleted:
		query = `
			UPDATE tasks
			SET status = $1, completed_at = $2, updated_at = $2
			WHERE id = $3
		`
	default:
		query = `
			UPDATE tasks
			SET status = $1, updated_at = $2
			WHERE id = $3
		`
	}

	result, err := s.db.Exec(ctx, query, update.Status, time.Now(), update.TaskID)
	if err != nil {
		return fmt.Errorf("failed to update task status: %w", err)
	}
	if result.RowsAffected() == 0 {
		return fmt.Errorf("task not found")
	}

	log.Info().
		Str("task_id", update.TaskID.String()).
		Str("status", string(update.Status)).
		Str("reason", update.Reason).
		Msg("Task status updated")

	return nil
}

// Helper methods

// scanTask reads one row, laid out as taskSelectColumns, into a Task.
//
// scanner is anything with a pgx-style Scan method (a single row or a rows
// cursor). NULL repository, project_id, estimated_hours and complexity_score
// leave the corresponding Task field at its zero value. Empty JSON columns
// are skipped; a non-empty JSON column that cannot be decoded is reported as
// an error. The Scan error is returned unwrapped so callers can test it with
// errors.Is.
func (s *Service) scanTask(scanner interface{ Scan(...any) error }) (*Task, error) {
	task := &Task{}

	var (
		sourceConfigJSON, labelsJSON, techStackJSON, requirementsJSON []byte
		repository, projectID                                         *string
		estimatedHours                                                *int
		complexityScore                                               *float64
	)

	err := scanner.Scan(
		&task.ID, &task.ExternalID, &task.ExternalURL, &task.SourceType, &sourceConfigJSON,
		&task.Title, &task.Description, &task.Status, &task.Priority,
		&task.AssignedTeamID, &task.AssignedAgentID,
		&repository, &projectID, &labelsJSON, &techStackJSON, &requirementsJSON,
		&estimatedHours, &complexityScore,
		&task.ClaimedAt, &task.StartedAt, &task.CompletedAt,
		&task.CreatedAt, &task.UpdatedAt,
		&task.ExternalCreatedAt, &task.ExternalUpdatedAt,
	)
	if err != nil {
		return nil, err
	}

	// Handle nullable fields.
	if repository != nil {
		task.Repository = *repository
	}
	if projectID != nil {
		task.ProjectID = *projectID
	}
	if estimatedHours != nil {
		task.EstimatedHours = *estimatedHours
	}
	if complexityScore != nil {
		task.ComplexityScore = *complexityScore
	}

	// Parse JSON fields.
	jsonFields := []struct {
		column string
		raw    []byte
		dst    any
	}{
		{"source_config", sourceConfigJSON, &task.SourceConfig},
		{"labels", labelsJSON, &task.Labels},
		{"tech_stack", techStackJSON, &task.TechStack},
		{"requirements", requirementsJSON, &task.Requirements},
	}
	for _, f := range jsonFields {
		if len(f.raw) == 0 {
			continue
		}
		if err := json.Unmarshal(f.raw, f.dst); err != nil {
			return nil, fmt.Errorf("failed to parse %s of task %s: %w", f.column, task.ID, err)
		}
	}

	return task, nil
}

// inferTechStack returns the known technology keywords found in labels or
// description, lower-cased, de-duplicated and sorted alphabetically.
//
// A label matches when it equals a keyword ignoring case. The description is
// matched by case-insensitive substring search, so a keyword embedded in a
// longer word also counts (for example "java" inside "javascript").
// The result is never nil, so it encodes to a JSON array even when empty.
func (s *Service) inferTechStack(labels []string, description string) []string {
	found := make(map[string]struct{})

	// Check labels.
	for _, label := range labels {
		key := strings.ToLower(label)
		if _, ok := knownTechKeywords[key]; ok {
			found[key] = struct{}{}
		}
	}

	// Check description for tech keywords.
	descLower := strings.ToLower(description)
	for tech := range knownTechKeywords {
		if strings.Contains(descLower, tech) {
			found[tech] = struct{}{}
		}
	}

	techStack := make([]string, 0, len(found))
	for tech := range found {
		techStack = append(techStack, tech)
	}
	// Map iteration order is random; sort so the stored value is stable.
	sort.Strings(techStack)

	return techStack
}

// extractRequirements returns the list items found in description.
//
// A line counts as a list item when, after trimming, it starts with a bullet
// ("- ", "* ", "• ") or a numbered marker ("1. ", "12) "). The marker is
// removed and items of 10 bytes or fewer are dropped as too short to be a
// requirement. The result is never nil, so it encodes to a JSON array even
// when empty.
func (s *Service) extractRequirements(description string) []string {
	requirements := []string{}

	for _, line := range strings.Split(description, "\n") {
		item, ok := listItemText(strings.TrimSpace(line))
		if !ok {
			continue
		}
		if len(item) > 10 { // Filter out very short items.
			requirements = append(requirements, item)
		}
	}

	return requirements
}

// listItemText reports whether line (already trimmed) starts with a bullet or
// numbered-list marker and, if so, returns the text after the marker with
// surrounding whitespace removed.
func listItemText(line string) (string, bool) {
	for _, bullet := range []string{"- ", "* ", "• "} {
		if strings.HasPrefix(line, bullet) {
			return strings.TrimSpace(line[len(bullet):]), true
		}
	}

	// Numbered marker: one or more ASCII digits, then '.' or ')', then
	// whitespace. Requiring the whitespace keeps lines such as "1.5x faster"
	// from being read as list items.
	digits := 0
	for digits < len(line) && line[digits] >= '0' && line[digits] <= '9' {
		digits++
	}
	if digits == 0 || digits+1 >= len(line) {
		return "", false
	}
	if sep := line[digits]; sep != '.' && sep != ')' {
		return "", false
	}
	if ws := line[digits+1]; ws != ' ' && ws != '\t' {
		return "", false
	}
	return strings.TrimSpace(line[digits+2:]), true
}

// estimateComplexity scores a task between 0.3 and 1.0.
//
// It starts from a base of 0.3 and adds 0.1 or 0.2 for each of four factors,
// depending on which threshold the factor exceeds: description length in
// bytes (>200 / >500), number of tech stack entries (>1 / >3), number of
// requirements (>2 / >5) and estimated hours (>8 / >20). The sum is capped
// at 1.0. task.TechStack and task.Requirements must already be populated.
func (s *Service) estimateComplexity(task *Task) float64 {
	complexity := 0.3 // Base complexity.

	// Factor in description length.
	if len(task.Description) > 500 {
		complexity += 0.2
	} else if len(task.Description) > 200 {
		complexity += 0.1
	}

	// Factor in tech stack diversity.
	if len(task.TechStack) > 3 {
		complexity += 0.2
	} else if len(task.TechStack) > 1 {
		complexity += 0.1
	}

	// Factor in requirements count.
	if len(task.Requirements) > 5 {
		complexity += 0.2
	} else if len(task.Requirements) > 2 {
		complexity += 0.1
	}

	// Factor in estimated hours.
	if task.EstimatedHours > 20 {
		complexity += 0.2
	} else if task.EstimatedHours > 8 {
		complexity += 0.1
	}

	// Cap at 1.0.
	if complexity > 1.0 {
		complexity = 1.0
	}

	return complexity
}