Implement comprehensive task management system with GITEA integration
Replace mock endpoints with real database-backed task management:
- Add tasks table with full relationships and indexes
- Create generic task management service supporting multiple sources
- Implement GITEA integration service for issue synchronization
- Add task creation, retrieval, assignment, and status updates
Database schema changes:
- New tasks table with external_id mapping for GITEA/GitHub/Jira
- Foreign key relationships to teams and agents
- Task workflow tracking (claimed_at, started_at, completed_at)
- JSONB fields for labels, tech_stack, requirements
Task management features:
- Generic TaskFilter with pagination and multi-field filtering
- Automatic tech stack inference from labels and descriptions
- Complexity scoring based on multiple factors
- Real task assignment to teams and agents
- GITEA webhook integration for automated task sync
API endpoints now use real database operations:
- GET /api/v1/tasks (real filtering and pagination)
- GET /api/v1/tasks/{id} (database lookup)
- POST /api/v1/tasks/ingest (creates actual task records)
- POST /api/v1/tasks/{id}/claim (real assignment operations)
GITEA integration includes:
- Issue-to-task synchronization with configurable task labels
- Priority mapping from issue labels
- Estimated hours extraction from issue descriptions
- Webhook processing for real-time updates
This removes the major mocked components and provides
a foundation for genuine E2E testing with real data.
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
This commit is contained in:
482
internal/tasks/service.go
Normal file
482
internal/tasks/service.go
Normal file
@@ -0,0 +1,482 @@
|
||||
package tasks
|
||||
|
||||
import (
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/google/uuid"
|
||||
"github.com/jackc/pgx/v5"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// Service manages tasks from various external sources
|
||||
type Service struct {
|
||||
db *pgxpool.Pool
|
||||
}
|
||||
|
||||
// NewService creates a new task management service
|
||||
func NewService(db *pgxpool.Pool) *Service {
|
||||
return &Service{
|
||||
db: db,
|
||||
}
|
||||
}
|
||||
|
||||
// CreateTask creates a new task in the database
|
||||
func (s *Service) CreateTask(ctx context.Context, input *CreateTaskInput) (*Task, error) {
|
||||
task := &Task{
|
||||
ID: uuid.New(),
|
||||
ExternalID: input.ExternalID,
|
||||
ExternalURL: input.ExternalURL,
|
||||
SourceType: input.SourceType,
|
||||
SourceConfig: input.SourceConfig,
|
||||
Title: input.Title,
|
||||
Description: input.Description,
|
||||
Status: TaskStatusOpen,
|
||||
Priority: input.Priority,
|
||||
Repository: input.Repository,
|
||||
ProjectID: input.ProjectID,
|
||||
Labels: input.Labels,
|
||||
EstimatedHours: input.EstimatedHours,
|
||||
CreatedAt: time.Now(),
|
||||
UpdatedAt: time.Now(),
|
||||
ExternalCreatedAt: input.ExternalCreatedAt,
|
||||
ExternalUpdatedAt: input.ExternalUpdatedAt,
|
||||
}
|
||||
|
||||
// Set defaults
|
||||
if task.Priority == "" {
|
||||
task.Priority = TaskPriorityMedium
|
||||
}
|
||||
if task.SourceConfig == nil {
|
||||
task.SourceConfig = make(map[string]interface{})
|
||||
}
|
||||
if task.Labels == nil {
|
||||
task.Labels = []string{}
|
||||
}
|
||||
|
||||
// Infer tech stack from labels and description
|
||||
task.TechStack = s.inferTechStack(task.Labels, task.Description)
|
||||
task.Requirements = s.extractRequirements(task.Description)
|
||||
task.ComplexityScore = s.estimateComplexity(task)
|
||||
|
||||
// Marshal JSON fields
|
||||
sourceConfigJSON, _ := json.Marshal(task.SourceConfig)
|
||||
labelsJSON, _ := json.Marshal(task.Labels)
|
||||
techStackJSON, _ := json.Marshal(task.TechStack)
|
||||
requirementsJSON, _ := json.Marshal(task.Requirements)
|
||||
|
||||
// Insert into database
|
||||
query := `
|
||||
INSERT INTO tasks (
|
||||
id, external_id, external_url, source_type, source_config,
|
||||
title, description, status, priority,
|
||||
repository, project_id, labels, tech_stack, requirements,
|
||||
estimated_hours, complexity_score,
|
||||
created_at, updated_at, external_created_at, external_updated_at
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5,
|
||||
$6, $7, $8, $9,
|
||||
$10, $11, $12, $13, $14,
|
||||
$15, $16,
|
||||
$17, $18, $19, $20
|
||||
)
|
||||
`
|
||||
|
||||
_, err := s.db.Exec(ctx, query,
|
||||
task.ID, task.ExternalID, task.ExternalURL, task.SourceType, sourceConfigJSON,
|
||||
task.Title, task.Description, task.Status, task.Priority,
|
||||
task.Repository, task.ProjectID, labelsJSON, techStackJSON, requirementsJSON,
|
||||
task.EstimatedHours, task.ComplexityScore,
|
||||
task.CreatedAt, task.UpdatedAt, task.ExternalCreatedAt, task.ExternalUpdatedAt,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create task: %w", err)
|
||||
}
|
||||
|
||||
log.Info().
|
||||
Str("task_id", task.ID.String()).
|
||||
Str("external_id", task.ExternalID).
|
||||
Str("source_type", string(task.SourceType)).
|
||||
Str("title", task.Title).
|
||||
Msg("Task created")
|
||||
|
||||
return task, nil
|
||||
}
|
||||
|
||||
// GetTask retrieves a task by ID
|
||||
func (s *Service) GetTask(ctx context.Context, taskID uuid.UUID) (*Task, error) {
|
||||
query := `
|
||||
SELECT id, external_id, external_url, source_type, source_config,
|
||||
title, description, status, priority,
|
||||
assigned_team_id, assigned_agent_id,
|
||||
repository, project_id, labels, tech_stack, requirements,
|
||||
estimated_hours, complexity_score,
|
||||
claimed_at, started_at, completed_at,
|
||||
created_at, updated_at, external_created_at, external_updated_at
|
||||
FROM tasks WHERE id = $1
|
||||
`
|
||||
|
||||
row := s.db.QueryRow(ctx, query, taskID)
|
||||
task, err := s.scanTask(row)
|
||||
if err != nil {
|
||||
if err == pgx.ErrNoRows {
|
||||
return nil, fmt.Errorf("task not found")
|
||||
}
|
||||
return nil, fmt.Errorf("failed to get task: %w", err)
|
||||
}
|
||||
|
||||
return task, nil
|
||||
}
|
||||
|
||||
// GetTaskByExternalID retrieves a task by external ID and source type
|
||||
func (s *Service) GetTaskByExternalID(ctx context.Context, externalID string, sourceType SourceType) (*Task, error) {
|
||||
query := `
|
||||
SELECT id, external_id, external_url, source_type, source_config,
|
||||
title, description, status, priority,
|
||||
assigned_team_id, assigned_agent_id,
|
||||
repository, project_id, labels, tech_stack, requirements,
|
||||
estimated_hours, complexity_score,
|
||||
claimed_at, started_at, completed_at,
|
||||
created_at, updated_at, external_created_at, external_updated_at
|
||||
FROM tasks WHERE external_id = $1 AND source_type = $2
|
||||
`
|
||||
|
||||
row := s.db.QueryRow(ctx, query, externalID, sourceType)
|
||||
task, err := s.scanTask(row)
|
||||
if err != nil {
|
||||
if err == pgx.ErrNoRows {
|
||||
return nil, fmt.Errorf("task not found")
|
||||
}
|
||||
return nil, fmt.Errorf("failed to get task: %w", err)
|
||||
}
|
||||
|
||||
return task, nil
|
||||
}
|
||||
|
||||
// ListTasks retrieves tasks with filtering and pagination
|
||||
func (s *Service) ListTasks(ctx context.Context, filter *TaskFilter) ([]*Task, int, error) {
|
||||
if filter == nil {
|
||||
filter = &TaskFilter{}
|
||||
}
|
||||
|
||||
// Build WHERE clause
|
||||
conditions := []string{}
|
||||
args := []interface{}{}
|
||||
argIndex := 1
|
||||
|
||||
if len(filter.Status) > 0 {
|
||||
placeholders := make([]string, len(filter.Status))
|
||||
for i, status := range filter.Status {
|
||||
placeholders[i] = fmt.Sprintf("$%d", argIndex)
|
||||
args = append(args, status)
|
||||
argIndex++
|
||||
}
|
||||
conditions = append(conditions, fmt.Sprintf("status IN (%s)", strings.Join(placeholders, ",")))
|
||||
}
|
||||
|
||||
if len(filter.Priority) > 0 {
|
||||
placeholders := make([]string, len(filter.Priority))
|
||||
for i, priority := range filter.Priority {
|
||||
placeholders[i] = fmt.Sprintf("$%d", argIndex)
|
||||
args = append(args, priority)
|
||||
argIndex++
|
||||
}
|
||||
conditions = append(conditions, fmt.Sprintf("priority IN (%s)", strings.Join(placeholders, ",")))
|
||||
}
|
||||
|
||||
if filter.Repository != "" {
|
||||
conditions = append(conditions, fmt.Sprintf("repository = $%d", argIndex))
|
||||
args = append(args, filter.Repository)
|
||||
argIndex++
|
||||
}
|
||||
|
||||
if filter.AssignedTeam != nil {
|
||||
conditions = append(conditions, fmt.Sprintf("assigned_team_id = $%d", argIndex))
|
||||
args = append(args, *filter.AssignedTeam)
|
||||
argIndex++
|
||||
}
|
||||
|
||||
whereClause := ""
|
||||
if len(conditions) > 0 {
|
||||
whereClause = "WHERE " + strings.Join(conditions, " AND ")
|
||||
}
|
||||
|
||||
// Get total count
|
||||
countQuery := fmt.Sprintf("SELECT COUNT(*) FROM tasks %s", whereClause)
|
||||
var total int
|
||||
err := s.db.QueryRow(ctx, countQuery, args...).Scan(&total)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("failed to count tasks: %w", err)
|
||||
}
|
||||
|
||||
// Get tasks with pagination
|
||||
limit := 20
|
||||
offset := 0
|
||||
if filter.Limit > 0 && filter.Limit <= 100 {
|
||||
limit = filter.Limit
|
||||
}
|
||||
if filter.Offset >= 0 {
|
||||
offset = filter.Offset
|
||||
}
|
||||
|
||||
query := fmt.Sprintf(`
|
||||
SELECT id, external_id, external_url, source_type, source_config,
|
||||
title, description, status, priority,
|
||||
assigned_team_id, assigned_agent_id,
|
||||
repository, project_id, labels, tech_stack, requirements,
|
||||
estimated_hours, complexity_score,
|
||||
claimed_at, started_at, completed_at,
|
||||
created_at, updated_at, external_created_at, external_updated_at
|
||||
FROM tasks %s
|
||||
ORDER BY created_at DESC
|
||||
LIMIT $%d OFFSET $%d
|
||||
`, whereClause, argIndex, argIndex+1)
|
||||
|
||||
args = append(args, limit, offset)
|
||||
|
||||
rows, err := s.db.Query(ctx, query, args...)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("failed to query tasks: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var tasks []*Task
|
||||
for rows.Next() {
|
||||
task, err := s.scanTask(rows)
|
||||
if err != nil {
|
||||
return nil, 0, fmt.Errorf("failed to scan task: %w", err)
|
||||
}
|
||||
tasks = append(tasks, task)
|
||||
}
|
||||
|
||||
return tasks, total, rows.Err()
|
||||
}
|
||||
|
||||
// AssignTask assigns a task to a team or agent
|
||||
func (s *Service) AssignTask(ctx context.Context, assignment *TaskAssignment) error {
|
||||
var query string
|
||||
var args []interface{}
|
||||
|
||||
if assignment.TeamID != nil && assignment.AgentID != nil {
|
||||
query = `
|
||||
UPDATE tasks
|
||||
SET assigned_team_id = $1, assigned_agent_id = $2, status = $3, claimed_at = $4, updated_at = $4
|
||||
WHERE id = $5
|
||||
`
|
||||
args = []interface{}{assignment.TeamID, assignment.AgentID, TaskStatusClaimed, time.Now(), assignment.TaskID}
|
||||
} else if assignment.TeamID != nil {
|
||||
query = `
|
||||
UPDATE tasks
|
||||
SET assigned_team_id = $1, status = $2, claimed_at = $3, updated_at = $3
|
||||
WHERE id = $4
|
||||
`
|
||||
args = []interface{}{assignment.TeamID, TaskStatusClaimed, time.Now(), assignment.TaskID}
|
||||
} else if assignment.AgentID != nil {
|
||||
query = `
|
||||
UPDATE tasks
|
||||
SET assigned_agent_id = $1, status = $2, claimed_at = $3, updated_at = $3
|
||||
WHERE id = $4
|
||||
`
|
||||
args = []interface{}{assignment.AgentID, TaskStatusClaimed, time.Now(), assignment.TaskID}
|
||||
} else {
|
||||
return fmt.Errorf("either team_id or agent_id must be provided")
|
||||
}
|
||||
|
||||
result, err := s.db.Exec(ctx, query, args...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to assign task: %w", err)
|
||||
}
|
||||
|
||||
if result.RowsAffected() == 0 {
|
||||
return fmt.Errorf("task not found")
|
||||
}
|
||||
|
||||
log.Info().
|
||||
Str("task_id", assignment.TaskID.String()).
|
||||
Interface("team_id", assignment.TeamID).
|
||||
Interface("agent_id", assignment.AgentID).
|
||||
Str("reason", assignment.Reason).
|
||||
Msg("Task assigned")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateTaskStatus updates a task's status
|
||||
func (s *Service) UpdateTaskStatus(ctx context.Context, update *TaskStatusUpdate) error {
|
||||
query := `
|
||||
UPDATE tasks
|
||||
SET status = $1, updated_at = $2
|
||||
WHERE id = $3
|
||||
`
|
||||
args := []interface{}{update.Status, time.Now(), update.TaskID}
|
||||
|
||||
// Handle special status transitions
|
||||
switch update.Status {
|
||||
case TaskStatusInProgress:
|
||||
query = `
|
||||
UPDATE tasks
|
||||
SET status = $1, started_at = $2, updated_at = $2
|
||||
WHERE id = $3
|
||||
`
|
||||
case TaskStatusCompleted:
|
||||
query = `
|
||||
UPDATE tasks
|
||||
SET status = $1, completed_at = $2, updated_at = $2
|
||||
WHERE id = $3
|
||||
`
|
||||
}
|
||||
|
||||
result, err := s.db.Exec(ctx, query, args...)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update task status: %w", err)
|
||||
}
|
||||
|
||||
if result.RowsAffected() == 0 {
|
||||
return fmt.Errorf("task not found")
|
||||
}
|
||||
|
||||
log.Info().
|
||||
Str("task_id", update.TaskID.String()).
|
||||
Str("status", string(update.Status)).
|
||||
Str("reason", update.Reason).
|
||||
Msg("Task status updated")
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// Helper methods
|
||||
|
||||
func (s *Service) scanTask(scanner interface{ Scan(...interface{}) error }) (*Task, error) {
|
||||
task := &Task{}
|
||||
var sourceConfigJSON, labelsJSON, techStackJSON, requirementsJSON []byte
|
||||
|
||||
err := scanner.Scan(
|
||||
&task.ID, &task.ExternalID, &task.ExternalURL, &task.SourceType, &sourceConfigJSON,
|
||||
&task.Title, &task.Description, &task.Status, &task.Priority,
|
||||
&task.AssignedTeamID, &task.AssignedAgentID,
|
||||
&task.Repository, &task.ProjectID, &labelsJSON, &techStackJSON, &requirementsJSON,
|
||||
&task.EstimatedHours, &task.ComplexityScore,
|
||||
&task.ClaimedAt, &task.StartedAt, &task.CompletedAt,
|
||||
&task.CreatedAt, &task.UpdatedAt, &task.ExternalCreatedAt, &task.ExternalUpdatedAt,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
// Parse JSON fields
|
||||
if len(sourceConfigJSON) > 0 {
|
||||
json.Unmarshal(sourceConfigJSON, &task.SourceConfig)
|
||||
}
|
||||
if len(labelsJSON) > 0 {
|
||||
json.Unmarshal(labelsJSON, &task.Labels)
|
||||
}
|
||||
if len(techStackJSON) > 0 {
|
||||
json.Unmarshal(techStackJSON, &task.TechStack)
|
||||
}
|
||||
if len(requirementsJSON) > 0 {
|
||||
json.Unmarshal(requirementsJSON, &task.Requirements)
|
||||
}
|
||||
|
||||
return task, nil
|
||||
}
|
||||
|
||||
func (s *Service) inferTechStack(labels []string, description string) []string {
|
||||
techMap := map[string]bool{
|
||||
"go": true, "golang": true, "javascript": true, "react": true, "node": true,
|
||||
"python": true, "java": true, "rust": true, "docker": true, "postgres": true,
|
||||
"mysql": true, "redis": true, "api": true, "backend": true, "frontend": true,
|
||||
"database": true, "typescript": true, "vue": true, "angular": true,
|
||||
}
|
||||
|
||||
techSet := make(map[string]bool)
|
||||
|
||||
// Check labels
|
||||
for _, label := range labels {
|
||||
if techMap[strings.ToLower(label)] {
|
||||
techSet[strings.ToLower(label)] = true
|
||||
}
|
||||
}
|
||||
|
||||
// Check description for tech keywords
|
||||
descLower := strings.ToLower(description)
|
||||
for tech := range techMap {
|
||||
if strings.Contains(descLower, tech) {
|
||||
techSet[tech] = true
|
||||
}
|
||||
}
|
||||
|
||||
// Convert to slice
|
||||
var techStack []string
|
||||
for tech := range techSet {
|
||||
techStack = append(techStack, tech)
|
||||
}
|
||||
|
||||
return techStack
|
||||
}
|
||||
|
||||
func (s *Service) extractRequirements(description string) []string {
|
||||
// Simple requirement extraction - look for bullet points, numbered lists, etc.
|
||||
// This could be enhanced with NLP in the future
|
||||
lines := strings.Split(description, "\n")
|
||||
var requirements []string
|
||||
|
||||
for _, line := range lines {
|
||||
line = strings.TrimSpace(line)
|
||||
if strings.HasPrefix(line, "- ") || strings.HasPrefix(line, "* ") ||
|
||||
strings.HasPrefix(line, "• ") || len(line) > 0 && line[0] >= '1' && line[0] <= '9' && strings.Contains(line, ".") {
|
||||
req := strings.TrimPrefix(line, "- ")
|
||||
req = strings.TrimPrefix(req, "* ")
|
||||
req = strings.TrimPrefix(req, "• ")
|
||||
if len(req) > 10 { // Filter out very short items
|
||||
requirements = append(requirements, strings.TrimSpace(req))
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return requirements
|
||||
}
|
||||
|
||||
func (s *Service) estimateComplexity(task *Task) float64 {
|
||||
complexity := 0.3 // Base complexity
|
||||
|
||||
// Factor in description length
|
||||
if len(task.Description) > 500 {
|
||||
complexity += 0.2
|
||||
} else if len(task.Description) > 200 {
|
||||
complexity += 0.1
|
||||
}
|
||||
|
||||
// Factor in tech stack diversity
|
||||
if len(task.TechStack) > 3 {
|
||||
complexity += 0.2
|
||||
} else if len(task.TechStack) > 1 {
|
||||
complexity += 0.1
|
||||
}
|
||||
|
||||
// Factor in requirements count
|
||||
if len(task.Requirements) > 5 {
|
||||
complexity += 0.2
|
||||
} else if len(task.Requirements) > 2 {
|
||||
complexity += 0.1
|
||||
}
|
||||
|
||||
// Factor in estimated hours
|
||||
if task.EstimatedHours > 20 {
|
||||
complexity += 0.2
|
||||
} else if task.EstimatedHours > 8 {
|
||||
complexity += 0.1
|
||||
}
|
||||
|
||||
// Cap at 1.0
|
||||
if complexity > 1.0 {
|
||||
complexity = 1.0
|
||||
}
|
||||
|
||||
return complexity
|
||||
}
|
||||
Reference in New Issue
Block a user