diff --git a/internal/server/server.go b/internal/server/server.go index 06e788c..6baafb1 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -17,6 +17,7 @@ import ( "github.com/chorus-services/whoosh/internal/database" "github.com/chorus-services/whoosh/internal/gitea" "github.com/chorus-services/whoosh/internal/p2p" + "github.com/chorus-services/whoosh/internal/tasks" "github.com/go-chi/chi/v5" "github.com/go-chi/chi/v5/middleware" "github.com/go-chi/cors" @@ -35,16 +36,24 @@ type Server struct { p2pDiscovery *p2p.Discovery backbeat *backbeat.Integration teamComposer *composer.Service + taskService *tasks.Service + giteaIntegration *tasks.GiteaIntegration } func NewServer(cfg *config.Config, db *database.DB) (*Server, error) { + // Initialize core services + taskService := tasks.NewService(db.Pool) + giteaIntegration := tasks.NewGiteaIntegration(taskService, gitea.NewClient(cfg.GITEA), nil) + s := &Server{ - config: cfg, - db: db, - giteaClient: gitea.NewClient(cfg.GITEA), - webhookHandler: gitea.NewWebhookHandler(cfg.GITEA.WebhookToken), - p2pDiscovery: p2p.NewDiscovery(), - teamComposer: composer.NewService(db.Pool, nil), // Use default config + config: cfg, + db: db, + giteaClient: gitea.NewClient(cfg.GITEA), + webhookHandler: gitea.NewWebhookHandler(cfg.GITEA.WebhookToken), + p2pDiscovery: p2p.NewDiscovery(), + teamComposer: composer.NewService(db.Pool, nil), // Use default config + taskService: taskService, + giteaIntegration: giteaIntegration, } // Initialize BACKBEAT integration if enabled @@ -496,58 +505,53 @@ func (s *Server) analyzeTeamCompositionHandler(w http.ResponseWriter, r *http.Re } func (s *Server) listTasksHandler(w http.ResponseWriter, r *http.Request) { - // Get query parameters - status := r.URL.Query().Get("status") // active, queued, completed - if status == "" { - status = "all" + // Parse query parameters + statusParam := r.URL.Query().Get("status") + priorityParam := r.URL.Query().Get("priority") + repositoryParam := r.URL.Query().Get("repository") + limitStr := r.URL.Query().Get("limit") + offsetStr := r.URL.Query().Get("offset") + + // Build filter + filter := &tasks.TaskFilter{} + + if statusParam != "" && statusParam != "all" { + filter.Status = []tasks.TaskStatus{tasks.TaskStatus(statusParam)} } - - // For MVP, we'll simulate task data that would come from GITEA issues - // In full implementation, this would query GITEA API for bzzz-task issues - tasks := []map[string]interface{}{ - { - "id": "task-001", - "title": "Implement user authentication system", - "description": "Add JWT-based authentication with login and registration endpoints", - "status": "active", - "priority": "high", - "repository": "example/backend-api", - "issue_url": "https://gitea.chorus.services/example/backend-api/issues/1", - "assigned_to": "team-001", - "created_at": "2025-09-03T20:00:00Z", - "updated_at": "2025-09-04T00:00:00Z", - "labels": []string{"bzzz-task", "backend", "security"}, - }, - { - "id": "task-002", - "title": "Fix database connection pooling", - "description": "Connection pool is not releasing connections properly under high load", - "status": "queued", - "priority": "medium", - "repository": "example/backend-api", - "issue_url": "https://gitea.chorus.services/example/backend-api/issues/2", - "assigned_to": nil, - "created_at": "2025-09-04T00:15:00Z", - "updated_at": "2025-09-04T00:15:00Z", - "labels": []string{"bzzz-task", "database", "performance"}, - }, + + if priorityParam != "" { + filter.Priority = []tasks.TaskPriority{tasks.TaskPriority(priorityParam)} } - - // Filter tasks by status if specified - if status != "all" { - filtered := []map[string]interface{}{} - for _, task := range tasks { - if task["status"] == status { - filtered = append(filtered, task) - } + + if repositoryParam != "" { + filter.Repository = repositoryParam + } + + if limitStr != "" { + if limit, err := strconv.Atoi(limitStr); err == nil && limit > 0 && limit <= 100 { + filter.Limit = limit } - tasks = filtered } - + + if offsetStr != "" { + if offset, err := strconv.Atoi(offsetStr); err == nil && offset >= 0 { + filter.Offset = offset + } + } + + // Get tasks from database + taskList, total, err := s.taskService.ListTasks(r.Context(), filter) + if err != nil { + log.Error().Err(err).Msg("Failed to list tasks") + render.Status(r, http.StatusInternalServerError) + render.JSON(w, r, map[string]string{"error": "failed to retrieve tasks"}) + return + } + render.JSON(w, r, map[string]interface{}{ - "tasks": tasks, - "total": len(tasks), - "status": status, + "tasks": taskList, + "total": total, + "filter": filter, }) } @@ -603,18 +607,39 @@ func (s *Server) ingestTaskHandler(w http.ResponseWriter, r *http.Request) { // For MVP, we'll create the task record and attempt team composition // In production, this would persist to a tasks table and queue for processing - // Convert to TaskAnalysisInput for team composition - taskInput := &composer.TaskAnalysisInput{ + // Create task in database first + createInput := &tasks.CreateTaskInput{ + ExternalID: taskID, // Use generated ID as external ID for manual tasks + ExternalURL: taskData.IssueURL, + SourceType: tasks.SourceType(taskData.Source), Title: taskData.Title, Description: taskData.Description, + Priority: tasks.TaskPriority(taskData.Priority), Repository: taskData.Repository, - Requirements: []string{}, // Could parse from description or labels - Priority: composer.TaskPriority(taskData.Priority), - TechStack: s.inferTechStackFromLabels(taskData.Labels), + Labels: taskData.Labels, + } + + createdTask, err := s.taskService.CreateTask(r.Context(), createInput) + if err != nil { + log.Error().Err(err).Str("task_id", taskID).Msg("Failed to create task") + render.Status(r, http.StatusInternalServerError) + render.JSON(w, r, map[string]string{"error": "failed to create task"}) + return + } + + // Convert to TaskAnalysisInput for team composition + taskInput := &composer.TaskAnalysisInput{ + Title: createdTask.Title, + Description: createdTask.Description, + Repository: createdTask.Repository, + Requirements: createdTask.Requirements, + Priority: composer.TaskPriority(createdTask.Priority), + TechStack: createdTask.TechStack, Metadata: map[string]interface{}{ + "task_id": createdTask.ID.String(), "source": taskData.Source, "issue_url": taskData.IssueURL, - "labels": taskData.Labels, + "labels": createdTask.Labels, }, } @@ -672,46 +697,31 @@ func (s *Server) ingestTaskHandler(w http.ResponseWriter, r *http.Request) { } func (s *Server) getTaskHandler(w http.ResponseWriter, r *http.Request) { - taskID := chi.URLParam(r, "taskID") - - // For MVP, we'll simulate task retrieval since we don't have a tasks table yet - // In production, this would query the database for the task details - - log.Info(). - Str("task_id", taskID). - Msg("Retrieving task details") - - // Mock task data for demonstration - // In production, this would query: SELECT * FROM tasks WHERE id = $1 - task := map[string]interface{}{ - "id": taskID, - "title": "Sample Task", - "description": "This is a mock task for MVP demonstration", - "status": "active", - "priority": "medium", - "repository": "example/project", - "source": "manual", - "created_at": time.Now().Add(-2 * time.Hour).Format(time.RFC3339), - "updated_at": time.Now().Add(-30 * time.Minute).Format(time.RFC3339), - "labels": []string{"backend", "api", "go"}, + taskIDStr := chi.URLParam(r, "taskID") + taskID, err := uuid.Parse(taskIDStr) + if err != nil { + render.Status(r, http.StatusBadRequest) + render.JSON(w, r, map[string]string{"error": "invalid task ID format"}) + return } - // Try to find associated team (search teams by task metadata) - teams, _, err := s.teamComposer.ListTeams(r.Context(), 10, 0) - var assignedTeam *composer.Team - - if err == nil { - // In a real implementation, we'd have proper task-to-team relationships - // For MVP, we'll return the most recent team as a placeholder - if len(teams) > 0 { - assignedTeam = teams[0] - task["assigned_team"] = assignedTeam + // Get task from database + task, err := s.taskService.GetTask(r.Context(), taskID) + if err != nil { + if strings.Contains(err.Error(), "not found") { + render.Status(r, http.StatusNotFound) + render.JSON(w, r, map[string]string{"error": "task not found"}) + return } + + log.Error().Err(err).Str("task_id", taskIDStr).Msg("Failed to get task") + render.Status(r, http.StatusInternalServerError) + render.JSON(w, r, map[string]string{"error": "failed to retrieve task"}) + return } render.JSON(w, r, map[string]interface{}{ - "task": task, - "message": "Task details retrieved (MVP mock data)", + "task": task, }) } @@ -998,7 +1008,13 @@ func (s *Server) getProjectTaskHandler(w http.ResponseWriter, r *http.Request) { } func (s *Server) claimTaskHandler(w http.ResponseWriter, r *http.Request) { - taskID := chi.URLParam(r, "taskID") + taskIDStr := chi.URLParam(r, "taskID") + taskID, err := uuid.Parse(taskIDStr) + if err != nil { + render.Status(r, http.StatusBadRequest) + render.JSON(w, r, map[string]string{"error": "invalid task ID format"}) + return + } var claimData struct { TeamID string `json:"team_id"` @@ -1041,21 +1057,47 @@ func (s *Server) claimTaskHandler(w http.ResponseWriter, r *http.Request) { return } + // Parse agent ID if provided + var agentUUID *uuid.UUID + if claimData.AgentID != "" { + agentID, err := uuid.Parse(claimData.AgentID) + if err != nil { + render.Status(r, http.StatusBadRequest) + render.JSON(w, r, map[string]string{"error": "invalid agent_id format"}) + return + } + agentUUID = &agentID + } + + // Assign task to team/agent + assignment := &tasks.TaskAssignment{ + TaskID: taskID, + TeamID: &teamUUID, + AgentID: agentUUID, + Reason: claimData.Reason, + } + + err = s.taskService.AssignTask(r.Context(), assignment) + if err != nil { + log.Error().Err(err).Str("task_id", taskIDStr).Msg("Failed to assign task") + render.Status(r, http.StatusInternalServerError) + render.JSON(w, r, map[string]string{"error": "failed to assign task"}) + return + } + log.Info(). - Str("task_id", taskID). + Str("task_id", taskIDStr). Str("team_id", claimData.TeamID). Str("agent_id", claimData.AgentID). - Msg("Task claimed by team") + Msg("Task assigned to team") - // For MVP, we'll just return success - // In production, this would update task assignment in database render.JSON(w, r, map[string]interface{}{ - "task_id": taskID, + "task_id": taskIDStr, "team_id": claimData.TeamID, "agent_id": claimData.AgentID, "status": "claimed", "claimed_at": time.Now().Format(time.RFC3339), - "message": "Task claimed successfully (MVP mode)", + "message": "Task assigned successfully", }) } diff --git a/internal/tasks/gitea_integration.go b/internal/tasks/gitea_integration.go new file mode 100644 index 0000000..3b88599 --- /dev/null +++ b/internal/tasks/gitea_integration.go @@ -0,0 +1,370 @@ +package tasks + +import ( + "context" + "encoding/json" + "fmt" + "strconv" + "strings" + "time" + + "github.com/chorus-services/whoosh/internal/gitea" + "github.com/rs/zerolog/log" +) + +// GiteaIntegration handles synchronization with GITEA issues +type GiteaIntegration struct { + taskService *Service + giteaClient *gitea.Client + config *GiteaConfig +} + +// GiteaConfig contains GITEA integration configuration +type GiteaConfig struct { + BaseURL string `json:"base_url"` + TaskLabel string `json:"task_label"` // e.g., "bzzz-task" + Repositories []string `json:"repositories"` // repositories to monitor + TeamMapping map[string]string `json:"team_mapping"` // label -> team mapping +} + +// NewGiteaIntegration creates a new GITEA integration +func NewGiteaIntegration(taskService *Service, giteaClient *gitea.Client, config *GiteaConfig) *GiteaIntegration { + if config == nil { + config = &GiteaConfig{ + TaskLabel: "bzzz-task", + Repositories: []string{}, + TeamMapping: make(map[string]string), + } + } + + return &GiteaIntegration{ + taskService: taskService, + giteaClient: giteaClient, + config: config, + } +} + +// GiteaIssue represents a GITEA issue response +type GiteaIssue struct { + ID int `json:"id"` + Number int `json:"number"` + Title string `json:"title"` + Body string `json:"body"` + State string `json:"state"` // "open", "closed" + URL string `json:"html_url"` + Labels []GiteaLabel `json:"labels"` + Repository GiteaRepo `json:"repository"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + Assignees []GiteaUser `json:"assignees"` +} + +type GiteaLabel struct { + Name string `json:"name"` + Color string `json:"color"` + Description string `json:"description"` +} + +type GiteaRepo struct { + FullName string `json:"full_name"` + HTMLURL string `json:"html_url"` +} + +type GiteaUser struct { + ID int `json:"id"` + Login string `json:"login"` + FullName string `json:"full_name"` +} + +// SyncIssuesFromGitea fetches issues from GITEA and creates/updates tasks +func (g *GiteaIntegration) SyncIssuesFromGitea(ctx context.Context, repository string) error { + log.Info(). + Str("repository", repository). + Msg("Starting GITEA issue sync") + + // Fetch issues from GITEA API + issues, err := g.fetchIssuesFromGitea(ctx, repository) + if err != nil { + return fmt.Errorf("failed to fetch GITEA issues: %w", err) + } + + syncedCount := 0 + errorCount := 0 + + for _, issue := range issues { + // Check if issue has task label + if !g.hasTaskLabel(issue) { + continue + } + + err := g.syncIssue(ctx, issue) + if err != nil { + log.Error().Err(err). + Int("issue_id", issue.ID). + Str("repository", repository). + Msg("Failed to sync issue") + errorCount++ + continue + } + + syncedCount++ + } + + log.Info(). + Str("repository", repository). + Int("synced", syncedCount). + Int("errors", errorCount). + Msg("GITEA issue sync completed") + + return nil +} + +// SyncIssue synchronizes a single GITEA issue with the task system +func (g *GiteaIntegration) syncIssue(ctx context.Context, issue GiteaIssue) error { + externalID := fmt.Sprintf("%d", issue.ID) + + // Check if task already exists + existingTask, err := g.taskService.GetTaskByExternalID(ctx, externalID, SourceTypeGitea) + if err != nil && !strings.Contains(err.Error(), "not found") { + return fmt.Errorf("failed to check existing task: %w", err) + } + + if existingTask != nil { + // Update existing task + return g.updateTaskFromIssue(ctx, existingTask, issue) + } else { + // Create new task + return g.createTaskFromIssue(ctx, issue) + } +} + +// createTaskFromIssue creates a new task from a GITEA issue +func (g *GiteaIntegration) createTaskFromIssue(ctx context.Context, issue GiteaIssue) error { + labels := make([]string, len(issue.Labels)) + for i, label := range issue.Labels { + labels[i] = label.Name + } + + // Determine priority from labels + priority := g.determinePriorityFromLabels(labels) + + // Extract estimated hours from issue body (look for patterns like "Estimated: 4 hours") + estimatedHours := g.extractEstimatedHours(issue.Body) + + input := &CreateTaskInput{ + ExternalID: fmt.Sprintf("%d", issue.ID), + ExternalURL: issue.URL, + SourceType: SourceTypeGitea, + SourceConfig: map[string]interface{}{ + "gitea_number": issue.Number, + "repository": issue.Repository.FullName, + "assignees": issue.Assignees, + }, + Title: issue.Title, + Description: issue.Body, + Priority: priority, + Repository: issue.Repository.FullName, + Labels: labels, + EstimatedHours: estimatedHours, + ExternalCreatedAt: &issue.CreatedAt, + ExternalUpdatedAt: &issue.UpdatedAt, + } + + task, err := g.taskService.CreateTask(ctx, input) + if err != nil { + return fmt.Errorf("failed to create task from GITEA issue: %w", err) + } + + log.Info(). + Str("task_id", task.ID.String()). + Int("gitea_issue_id", issue.ID). + Str("repository", issue.Repository.FullName). + Msg("Created task from GITEA issue") + + return nil +} + +// updateTaskFromIssue updates an existing task from a GITEA issue +func (g *GiteaIntegration) updateTaskFromIssue(ctx context.Context, task *Task, issue GiteaIssue) error { + // Check if issue was updated since last sync + if task.ExternalUpdatedAt != nil && !issue.UpdatedAt.After(*task.ExternalUpdatedAt) { + return nil // No updates needed + } + + // Determine new status based on GITEA state + var newStatus TaskStatus + switch issue.State { + case "open": + if task.Status == TaskStatusClosed { + newStatus = TaskStatusOpen + } + case "closed": + if task.Status != TaskStatusClosed { + newStatus = TaskStatusClosed + } + } + + // Update status if changed + if newStatus != "" && newStatus != task.Status { + update := &TaskStatusUpdate{ + TaskID: task.ID, + Status: newStatus, + Reason: fmt.Sprintf("GITEA issue state changed to %s", issue.State), + } + + err := g.taskService.UpdateTaskStatus(ctx, update) + if err != nil { + return fmt.Errorf("failed to update task status: %w", err) + } + + log.Info(). + Str("task_id", task.ID.String()). + Int("gitea_issue_id", issue.ID). + Str("old_status", string(task.Status)). + Str("new_status", string(newStatus)). + Msg("Updated task status from GITEA issue") + } + + // TODO: Update other fields like title, description, labels if needed + // This would require additional database operations + + return nil +} + +// ProcessGiteaWebhook processes a GITEA webhook payload +func (g *GiteaIntegration) ProcessGiteaWebhook(ctx context.Context, payload []byte) error { + var webhookData struct { + Action string `json:"action"` + Issue GiteaIssue `json:"issue"` + Repository GiteaRepo `json:"repository"` + } + + if err := json.Unmarshal(payload, &webhookData); err != nil { + return fmt.Errorf("failed to parse GITEA webhook payload: %w", err) + } + + // Only process issues with task label + if !g.hasTaskLabel(webhookData.Issue) { + log.Debug(). + Int("issue_id", webhookData.Issue.ID). + Str("action", webhookData.Action). + Msg("Ignoring GITEA issue without task label") + return nil + } + + log.Info(). + Str("action", webhookData.Action). + Int("issue_id", webhookData.Issue.ID). + Str("repository", webhookData.Repository.FullName). + Msg("Processing GITEA webhook") + + switch webhookData.Action { + case "opened", "edited", "reopened", "closed": + return g.syncIssue(ctx, webhookData.Issue) + case "labeled", "unlabeled": + // Re-sync to update task labels and tech stack + return g.syncIssue(ctx, webhookData.Issue) + default: + log.Debug(). + Str("action", webhookData.Action). + Msg("Ignoring GITEA webhook action") + return nil + } +} + +// Helper methods + +func (g *GiteaIntegration) fetchIssuesFromGitea(ctx context.Context, repository string) ([]GiteaIssue, error) { + // This would make actual HTTP calls to GITEA API + // For MVP, we'll return mock data based on known structure + + // In production, this would be: + // url := fmt.Sprintf("%s/repos/%s/issues", g.config.BaseURL, repository) + // resp, err := g.giteaClient.Get(url) + // ... parse response + + // Mock issues for testing + mockIssues := []GiteaIssue{ + { + ID: 123, + Number: 1, + Title: "Implement user authentication system", + Body: "Add JWT-based authentication with login and registration endpoints\n\n- JWT token generation\n- User registration\n- Password hashing\n\nEstimated: 8 hours", + State: "open", + URL: fmt.Sprintf("https://gitea.chorus.services/%s/issues/1", repository), + Labels: []GiteaLabel{ + {Name: "bzzz-task", Color: "0052cc"}, + {Name: "backend", Color: "1d76db"}, + {Name: "high-priority", Color: "d93f0b"}, + }, + Repository: GiteaRepo{FullName: repository}, + CreatedAt: time.Now().Add(-24 * time.Hour), + UpdatedAt: time.Now().Add(-2 * time.Hour), + }, + { + ID: 124, + Number: 2, + Title: "Fix database connection pooling", + Body: "Connection pool is not releasing connections properly under high load\n\nSteps to reproduce:\n1. Start application\n2. Generate high load\n3. Monitor connection count", + State: "open", + URL: fmt.Sprintf("https://gitea.chorus.services/%s/issues/2", repository), + Labels: []GiteaLabel{ + {Name: "bzzz-task", Color: "0052cc"}, + {Name: "database", Color: "5319e7"}, + {Name: "bug", Color: "d93f0b"}, + }, + Repository: GiteaRepo{FullName: repository}, + CreatedAt: time.Now().Add(-12 * time.Hour), + UpdatedAt: time.Now().Add(-1 * time.Hour), + }, + } + + log.Debug(). + Str("repository", repository). + Int("mock_issues", len(mockIssues)). + Msg("Returning mock GITEA issues for MVP") + + return mockIssues, nil +} + +func (g *GiteaIntegration) hasTaskLabel(issue GiteaIssue) bool { + for _, label := range issue.Labels { + if label.Name == g.config.TaskLabel { + return true + } + } + return false +} + +func (g *GiteaIntegration) determinePriorityFromLabels(labels []string) TaskPriority { + for _, label := range labels { + switch strings.ToLower(label) { + case "critical", "urgent", "critical-priority": + return TaskPriorityCritical + case "high", "high-priority", "important": + return TaskPriorityHigh + case "low", "low-priority", "minor": + return TaskPriorityLow + } + } + return TaskPriorityMedium +} + +func (g *GiteaIntegration) extractEstimatedHours(body string) int { + // Look for patterns like "Estimated: 4 hours", "Est: 8h", etc. + lines := strings.Split(strings.ToLower(body), "\n") + for _, line := range lines { + if strings.Contains(line, "estimated:") || strings.Contains(line, "est:") { + // Extract number from line + words := strings.Fields(line) + for i, word := range words { + if (word == "estimated:" || word == "est:") && i+1 < len(words) { + if hours, err := strconv.Atoi(strings.TrimSuffix(words[i+1], "h")); err == nil { + return hours + } + } + } + } + } + return 0 +} \ No newline at end of file diff --git a/internal/tasks/models.go b/internal/tasks/models.go new file mode 100644 index 0000000..56533bd --- /dev/null +++ b/internal/tasks/models.go @@ -0,0 +1,142 @@ +package tasks + +import ( + "time" + + "github.com/google/uuid" +) + +// TaskStatus represents the current status of a task +type TaskStatus string + +const ( + TaskStatusOpen TaskStatus = "open" + TaskStatusClaimed TaskStatus = "claimed" + TaskStatusInProgress TaskStatus = "in_progress" + TaskStatusCompleted TaskStatus = "completed" + TaskStatusClosed TaskStatus = "closed" + TaskStatusBlocked TaskStatus = "blocked" +) + +// TaskPriority represents task priority levels +type TaskPriority string + +const ( + TaskPriorityLow TaskPriority = "low" + TaskPriorityMedium TaskPriority = "medium" + TaskPriorityHigh TaskPriority = "high" + TaskPriorityCritical TaskPriority = "critical" +) + +// SourceType represents different task management systems +type SourceType string + +const ( + SourceTypeGitea SourceType = "gitea" + SourceTypeGitHub SourceType = "github" + SourceTypeJira SourceType = "jira" + SourceTypeManual SourceType = "manual" +) + +// Task represents a development task from any source system +type Task struct { + ID uuid.UUID `json:"id" db:"id"` + ExternalID string `json:"external_id" db:"external_id"` + ExternalURL string `json:"external_url" db:"external_url"` + SourceType SourceType `json:"source_type" db:"source_type"` + SourceConfig map[string]interface{} `json:"source_config" db:"source_config"` + + // Core task data + Title string `json:"title" db:"title"` + Description string `json:"description" db:"description"` + Status TaskStatus `json:"status" db:"status"` + Priority TaskPriority `json:"priority" db:"priority"` + + // Assignment data + AssignedTeamID *uuid.UUID `json:"assigned_team_id,omitempty" db:"assigned_team_id"` + AssignedAgentID *uuid.UUID `json:"assigned_agent_id,omitempty" db:"assigned_agent_id"` + + // Context data + Repository string `json:"repository,omitempty" db:"repository"` + ProjectID string `json:"project_id,omitempty" db:"project_id"` + Labels []string `json:"labels" db:"labels"` + TechStack []string `json:"tech_stack" db:"tech_stack"` + Requirements []string `json:"requirements" db:"requirements"` + EstimatedHours int `json:"estimated_hours,omitempty" db:"estimated_hours"` + ComplexityScore float64 `json:"complexity_score,omitempty" db:"complexity_score"` + + // Workflow timestamps + ClaimedAt *time.Time `json:"claimed_at,omitempty" db:"claimed_at"` + StartedAt *time.Time `json:"started_at,omitempty" db:"started_at"` + CompletedAt *time.Time `json:"completed_at,omitempty" db:"completed_at"` + + // Timestamps + CreatedAt time.Time `json:"created_at" db:"created_at"` + UpdatedAt time.Time `json:"updated_at" db:"updated_at"` + ExternalCreatedAt *time.Time `json:"external_created_at,omitempty" db:"external_created_at"` + ExternalUpdatedAt *time.Time `json:"external_updated_at,omitempty" db:"external_updated_at"` +} + +// CreateTaskInput represents input for creating a new task +type CreateTaskInput struct { + ExternalID string `json:"external_id"` + ExternalURL string `json:"external_url"` + SourceType SourceType `json:"source_type"` + SourceConfig map[string]interface{} `json:"source_config,omitempty"` + + Title string `json:"title"` + Description string `json:"description"` + Priority TaskPriority `json:"priority,omitempty"` + + Repository string `json:"repository,omitempty"` + ProjectID string `json:"project_id,omitempty"` + Labels []string `json:"labels,omitempty"` + EstimatedHours int `json:"estimated_hours,omitempty"` + + ExternalCreatedAt *time.Time `json:"external_created_at,omitempty"` + ExternalUpdatedAt *time.Time `json:"external_updated_at,omitempty"` +} + +// TaskFilter represents filtering options for task queries +type TaskFilter struct { + Status []TaskStatus `json:"status,omitempty"` + Priority []TaskPriority `json:"priority,omitempty"` + SourceType []SourceType `json:"source_type,omitempty"` + Repository string `json:"repository,omitempty"` + ProjectID string `json:"project_id,omitempty"` + AssignedTeam *uuid.UUID `json:"assigned_team,omitempty"` + AssignedAgent *uuid.UUID `json:"assigned_agent,omitempty"` + TechStack []string `json:"tech_stack,omitempty"` + Limit int `json:"limit,omitempty"` + Offset int `json:"offset,omitempty"` +} + +// TaskAssignment represents assigning a task to a team or agent +type TaskAssignment struct { + TaskID uuid.UUID `json:"task_id"` + TeamID *uuid.UUID `json:"team_id,omitempty"` + AgentID *uuid.UUID `json:"agent_id,omitempty"` + Reason string `json:"reason,omitempty"` +} + +// TaskStatusUpdate represents updating a task's status +type TaskStatusUpdate struct { + TaskID uuid.UUID `json:"task_id"` + Status TaskStatus `json:"status"` + Reason string `json:"reason,omitempty"` + Metadata map[string]interface{} `json:"metadata,omitempty"` +} + +// ExternalTask represents a task from an external system (GITEA, GitHub, etc.) +type ExternalTask struct { + ID string `json:"id"` + Title string `json:"title"` + Description string `json:"description"` + State string `json:"state"` // open, closed, etc. + URL string `json:"url"` + Repository string `json:"repository"` + Labels []string `json:"labels"` + CreatedAt time.Time `json:"created_at"` + UpdatedAt time.Time `json:"updated_at"` + Metadata map[string]interface{} `json:"metadata"` +} \ No newline at end of file diff --git a/internal/tasks/service.go b/internal/tasks/service.go new file mode 100644 index 0000000..cd213b1 --- /dev/null +++ b/internal/tasks/service.go @@ -0,0 +1,482 @@ +package tasks + +import ( + "context" + "encoding/json" + "fmt" + "strings" + "time" + + "github.com/google/uuid" + "github.com/jackc/pgx/v5" + "github.com/jackc/pgx/v5/pgxpool" + "github.com/rs/zerolog/log" +) + +// Service manages tasks from various external sources +type Service struct { + db *pgxpool.Pool +} + +// NewService creates a new task management service +func NewService(db *pgxpool.Pool) *Service { + return &Service{ + db: db, + } +} + +// CreateTask creates a new task in the database +func (s *Service) CreateTask(ctx context.Context, input *CreateTaskInput) (*Task, error) { + task := &Task{ + ID: uuid.New(), + ExternalID: input.ExternalID, + ExternalURL: input.ExternalURL, + SourceType: input.SourceType, + SourceConfig: input.SourceConfig, + Title: input.Title, + Description: input.Description, + Status: TaskStatusOpen, + Priority: input.Priority, + Repository: input.Repository, + ProjectID: input.ProjectID, + Labels: input.Labels, + EstimatedHours: input.EstimatedHours, + CreatedAt: time.Now(), + UpdatedAt: time.Now(), + ExternalCreatedAt: input.ExternalCreatedAt, + ExternalUpdatedAt: input.ExternalUpdatedAt, + } + + // Set defaults + if task.Priority == "" { + task.Priority = TaskPriorityMedium + } + if task.SourceConfig == nil { + task.SourceConfig = make(map[string]interface{}) + } + if task.Labels == nil { + task.Labels = []string{} + } + + // Infer tech stack from labels and description + task.TechStack = s.inferTechStack(task.Labels, task.Description) + task.Requirements = s.extractRequirements(task.Description) + task.ComplexityScore = s.estimateComplexity(task) + + // Marshal JSON fields + sourceConfigJSON, _ := json.Marshal(task.SourceConfig) + labelsJSON, _ := json.Marshal(task.Labels) + techStackJSON, _ := json.Marshal(task.TechStack) + requirementsJSON, _ := json.Marshal(task.Requirements) + + // Insert into database + query := ` + INSERT INTO tasks ( + id, external_id, external_url, source_type, source_config, + title, description, status, priority, + repository, project_id, labels, tech_stack, requirements, + estimated_hours, complexity_score, + created_at, updated_at, external_created_at, external_updated_at + ) VALUES ( + $1, $2, $3, $4, $5, + $6, $7, $8, $9, + $10, $11, $12, $13, $14, + $15, $16, + $17, $18, $19, $20 + ) + ` + + _, err := s.db.Exec(ctx, query, + task.ID, task.ExternalID, task.ExternalURL, task.SourceType, sourceConfigJSON, + task.Title, task.Description, task.Status, task.Priority, + task.Repository, task.ProjectID, labelsJSON, techStackJSON, requirementsJSON, + task.EstimatedHours, task.ComplexityScore, + task.CreatedAt, task.UpdatedAt, task.ExternalCreatedAt, task.ExternalUpdatedAt, + ) + + if err != nil { + return nil, fmt.Errorf("failed to create task: %w", err) + } + + log.Info(). + Str("task_id", task.ID.String()). + Str("external_id", task.ExternalID). + Str("source_type", string(task.SourceType)). + Str("title", task.Title). + Msg("Task created") + + return task, nil +} + +// GetTask retrieves a task by ID +func (s *Service) GetTask(ctx context.Context, taskID uuid.UUID) (*Task, error) { + query := ` + SELECT id, external_id, external_url, source_type, source_config, + title, description, status, priority, + assigned_team_id, assigned_agent_id, + repository, project_id, labels, tech_stack, requirements, + estimated_hours, complexity_score, + claimed_at, started_at, completed_at, + created_at, updated_at, external_created_at, external_updated_at + FROM tasks WHERE id = $1 + ` + + row := s.db.QueryRow(ctx, query, taskID) + task, err := s.scanTask(row) + if err != nil { + if err == pgx.ErrNoRows { + return nil, fmt.Errorf("task not found") + } + return nil, fmt.Errorf("failed to get task: %w", err) + } + + return task, nil +} + +// GetTaskByExternalID retrieves a task by external ID and source type +func (s *Service) GetTaskByExternalID(ctx context.Context, externalID string, sourceType SourceType) (*Task, error) { + query := ` + SELECT id, external_id, external_url, source_type, source_config, + title, description, status, priority, + assigned_team_id, assigned_agent_id, + repository, project_id, labels, tech_stack, requirements, + estimated_hours, complexity_score, + claimed_at, started_at, completed_at, + created_at, updated_at, external_created_at, external_updated_at + FROM tasks WHERE external_id = $1 AND source_type = $2 + ` + + row := s.db.QueryRow(ctx, query, externalID, sourceType) + task, err := s.scanTask(row) + if err != nil { + if err == pgx.ErrNoRows { + return nil, fmt.Errorf("task not found") + } + return nil, fmt.Errorf("failed to get task: %w", err) + } + + return task, nil +} + +// ListTasks retrieves tasks with filtering and pagination +func (s *Service) ListTasks(ctx context.Context, filter *TaskFilter) ([]*Task, int, error) { + if filter == nil { + filter = &TaskFilter{} + } + + // Build WHERE clause + conditions := []string{} + args := []interface{}{} + argIndex := 1 + + if len(filter.Status) > 0 { + placeholders := make([]string, len(filter.Status)) + for i, status := range filter.Status { + placeholders[i] = fmt.Sprintf("$%d", argIndex) + args = append(args, status) + argIndex++ + } + conditions = append(conditions, fmt.Sprintf("status IN (%s)", strings.Join(placeholders, ","))) + } + + if len(filter.Priority) > 0 { + placeholders := make([]string, len(filter.Priority)) + for i, priority := range filter.Priority { + placeholders[i] = fmt.Sprintf("$%d", argIndex) + args = append(args, priority) + argIndex++ + } + conditions = append(conditions, fmt.Sprintf("priority IN (%s)", strings.Join(placeholders, ","))) + } + + if filter.Repository != "" { + conditions = append(conditions, fmt.Sprintf("repository = $%d", argIndex)) + args = append(args, filter.Repository) + argIndex++ + } + + if filter.AssignedTeam != nil { + conditions = append(conditions, fmt.Sprintf("assigned_team_id = $%d", argIndex)) + args = append(args, *filter.AssignedTeam) + argIndex++ + } + + whereClause := "" + if len(conditions) > 0 { + whereClause = "WHERE " + strings.Join(conditions, " AND ") + } + + // Get total count + countQuery := fmt.Sprintf("SELECT COUNT(*) FROM tasks %s", whereClause) + var total int + err := s.db.QueryRow(ctx, countQuery, args...).Scan(&total) + if err != nil { + return nil, 0, fmt.Errorf("failed to count tasks: %w", err) + } + + // Get tasks with pagination + limit := 20 + offset := 0 + if filter.Limit > 0 && filter.Limit <= 100 { + limit = filter.Limit + } + if filter.Offset >= 0 { + offset = filter.Offset + } + + query := fmt.Sprintf(` + SELECT id, external_id, external_url, source_type, source_config, + title, description, status, priority, + assigned_team_id, assigned_agent_id, + repository, project_id, labels, tech_stack, requirements, + estimated_hours, complexity_score, + claimed_at, started_at, completed_at, + created_at, updated_at, external_created_at, external_updated_at + FROM tasks %s + ORDER BY created_at DESC + LIMIT $%d OFFSET $%d + `, whereClause, argIndex, argIndex+1) + + args = append(args, limit, offset) + + rows, err := s.db.Query(ctx, query, args...) + if err != nil { + return nil, 0, fmt.Errorf("failed to query tasks: %w", err) + } + defer rows.Close() + + var tasks []*Task + for rows.Next() { + task, err := s.scanTask(rows) + if err != nil { + return nil, 0, fmt.Errorf("failed to scan task: %w", err) + } + tasks = append(tasks, task) + } + + return tasks, total, rows.Err() +} + +// AssignTask assigns a task to a team or agent +func (s *Service) AssignTask(ctx context.Context, assignment *TaskAssignment) error { + var query string + var args []interface{} + + if assignment.TeamID != nil && assignment.AgentID != nil { + query = ` + UPDATE tasks + SET assigned_team_id = $1, assigned_agent_id = $2, status = $3, claimed_at = $4, updated_at = $4 + WHERE id = $5 + ` + args = []interface{}{assignment.TeamID, assignment.AgentID, TaskStatusClaimed, time.Now(), assignment.TaskID} + } else if assignment.TeamID != nil { + query = ` + UPDATE tasks + SET assigned_team_id = $1, status = $2, claimed_at = $3, updated_at = $3 + WHERE id = $4 + ` + args = []interface{}{assignment.TeamID, TaskStatusClaimed, time.Now(), assignment.TaskID} + } else if assignment.AgentID != nil { + query = ` + UPDATE tasks + SET assigned_agent_id = $1, status = $2, claimed_at = $3, updated_at = $3 + WHERE id = $4 + ` + args = []interface{}{assignment.AgentID, TaskStatusClaimed, time.Now(), assignment.TaskID} + } else { + return fmt.Errorf("either team_id or agent_id must be provided") + } + + result, err := s.db.Exec(ctx, query, args...) + if err != nil { + return fmt.Errorf("failed to assign task: %w", err) + } + + if result.RowsAffected() == 0 { + return fmt.Errorf("task not found") + } + + log.Info(). + Str("task_id", assignment.TaskID.String()). + Interface("team_id", assignment.TeamID). + Interface("agent_id", assignment.AgentID). + Str("reason", assignment.Reason). + Msg("Task assigned") + + return nil +} + +// UpdateTaskStatus updates a task's status +func (s *Service) UpdateTaskStatus(ctx context.Context, update *TaskStatusUpdate) error { + query := ` + UPDATE tasks + SET status = $1, updated_at = $2 + WHERE id = $3 + ` + args := []interface{}{update.Status, time.Now(), update.TaskID} + + // Handle special status transitions + switch update.Status { + case TaskStatusInProgress: + query = ` + UPDATE tasks + SET status = $1, started_at = $2, updated_at = $2 + WHERE id = $3 + ` + case TaskStatusCompleted: + query = ` + UPDATE tasks + SET status = $1, completed_at = $2, updated_at = $2 + WHERE id = $3 + ` + } + + result, err := s.db.Exec(ctx, query, args...) + if err != nil { + return fmt.Errorf("failed to update task status: %w", err) + } + + if result.RowsAffected() == 0 { + return fmt.Errorf("task not found") + } + + log.Info(). + Str("task_id", update.TaskID.String()). + Str("status", string(update.Status)). + Str("reason", update.Reason). + Msg("Task status updated") + + return nil +} + +// Helper methods + +func (s *Service) scanTask(scanner interface{ Scan(...interface{}) error }) (*Task, error) { + task := &Task{} + var sourceConfigJSON, labelsJSON, techStackJSON, requirementsJSON []byte + + err := scanner.Scan( + &task.ID, &task.ExternalID, &task.ExternalURL, &task.SourceType, &sourceConfigJSON, + &task.Title, &task.Description, &task.Status, &task.Priority, + &task.AssignedTeamID, &task.AssignedAgentID, + &task.Repository, &task.ProjectID, &labelsJSON, &techStackJSON, &requirementsJSON, + &task.EstimatedHours, &task.ComplexityScore, + &task.ClaimedAt, &task.StartedAt, &task.CompletedAt, + &task.CreatedAt, &task.UpdatedAt, &task.ExternalCreatedAt, &task.ExternalUpdatedAt, + ) + + if err != nil { + return nil, err + } + + // Parse JSON fields + if len(sourceConfigJSON) > 0 { + json.Unmarshal(sourceConfigJSON, &task.SourceConfig) + } + if len(labelsJSON) > 0 { + json.Unmarshal(labelsJSON, &task.Labels) + } + if len(techStackJSON) > 0 { + json.Unmarshal(techStackJSON, &task.TechStack) + } + if len(requirementsJSON) > 0 { + json.Unmarshal(requirementsJSON, &task.Requirements) + } + + return task, nil +} + +func (s *Service) inferTechStack(labels []string, description string) []string { + techMap := map[string]bool{ + "go": true, "golang": true, "javascript": true, "react": true, "node": true, + "python": true, "java": true, "rust": true, "docker": true, "postgres": true, + "mysql": true, "redis": true, "api": true, "backend": true, "frontend": true, + "database": true, "typescript": true, "vue": true, "angular": true, + } + + techSet := make(map[string]bool) + + // Check labels + for _, label := range labels { + if techMap[strings.ToLower(label)] { + techSet[strings.ToLower(label)] = true + } + } + + // Check description for tech keywords + descLower := strings.ToLower(description) + for tech := range techMap { + if strings.Contains(descLower, tech) { + techSet[tech] = true + } + } + + // Convert to slice + var techStack []string + for tech := range techSet { + techStack = append(techStack, tech) + } + + return techStack +} + +func (s *Service) extractRequirements(description string) []string { + // Simple requirement extraction - look for bullet points, numbered lists, etc. + // This could be enhanced with NLP in the future + lines := strings.Split(description, "\n") + var requirements []string + + for _, line := range lines { + line = strings.TrimSpace(line) + if strings.HasPrefix(line, "- ") || strings.HasPrefix(line, "* ") || + strings.HasPrefix(line, "• ") || len(line) > 0 && line[0] >= '1' && line[0] <= '9' && strings.Contains(line, ".") { + req := strings.TrimPrefix(line, "- ") + req = strings.TrimPrefix(req, "* ") + req = strings.TrimPrefix(req, "• ") + if len(req) > 10 { // Filter out very short items + requirements = append(requirements, strings.TrimSpace(req)) + } + } + } + + return requirements +} + +func (s *Service) estimateComplexity(task *Task) float64 { + complexity := 0.3 // Base complexity + + // Factor in description length + if len(task.Description) > 500 { + complexity += 0.2 + } else if len(task.Description) > 200 { + complexity += 0.1 + } + + // Factor in tech stack diversity + if len(task.TechStack) > 3 { + complexity += 0.2 + } else if len(task.TechStack) > 1 { + complexity += 0.1 + } + + // Factor in requirements count + if len(task.Requirements) > 5 { + complexity += 0.2 + } else if len(task.Requirements) > 2 { + complexity += 0.1 + } + + // Factor in estimated hours + if task.EstimatedHours > 20 { + complexity += 0.2 + } else if task.EstimatedHours > 8 { + complexity += 0.1 + } + + // Cap at 1.0 + if complexity > 1.0 { + complexity = 1.0 + } + + return complexity +} \ No newline at end of file diff --git a/migrations/002_add_tasks_table.down.sql b/migrations/002_add_tasks_table.down.sql new file mode 100644 index 0000000..a419c68 --- /dev/null +++ b/migrations/002_add_tasks_table.down.sql @@ -0,0 +1,11 @@ +-- Rollback tasks table creation +ALTER TABLE teams DROP COLUMN IF EXISTS current_task_id; +DROP INDEX IF EXISTS idx_tasks_external_id_source; +DROP INDEX IF EXISTS idx_tasks_created_at; +DROP INDEX IF EXISTS idx_tasks_priority; +DROP INDEX IF EXISTS idx_tasks_repository; +DROP INDEX IF EXISTS idx_tasks_assigned_agent; +DROP INDEX IF EXISTS idx_tasks_assigned_team; +DROP INDEX IF EXISTS idx_tasks_source_type; +DROP INDEX IF EXISTS idx_tasks_status; +DROP TABLE IF EXISTS tasks; \ No newline at end of file diff --git a/migrations/002_add_tasks_table.up.sql b/migrations/002_add_tasks_table.up.sql new file mode 100644 index 0000000..cf09d41 --- /dev/null +++ b/migrations/002_add_tasks_table.up.sql @@ -0,0 +1,57 @@ +-- Tasks table for generic task management +-- Supports GITEA, GitHub, Jira, and other task sources +CREATE TABLE tasks ( + id UUID PRIMARY KEY DEFAULT gen_random_uuid(), + external_id VARCHAR(255) NOT NULL, -- GITEA issue ID, GitHub issue ID, etc. + external_url TEXT NOT NULL, -- Full URL to external task + source_type VARCHAR(50) NOT NULL DEFAULT 'gitea', -- 'gitea', 'github', 'jira', 'manual' + source_config JSONB DEFAULT '{}', -- Source-specific configuration + + -- Core task data + title VARCHAR(500) NOT NULL, + description TEXT, + status VARCHAR(50) NOT NULL DEFAULT 'open', + priority VARCHAR(20) NOT NULL DEFAULT 'medium', + + -- Assignment and team data + assigned_team_id UUID REFERENCES teams(id) ON DELETE SET NULL, + assigned_agent_id UUID REFERENCES agents(id) ON DELETE SET NULL, + + -- Repository and project context + repository VARCHAR(255), -- e.g., "owner/repo" + project_id VARCHAR(255), + + -- Task metadata + labels JSONB DEFAULT '[]', + tech_stack JSONB DEFAULT '[]', + requirements JSONB DEFAULT '[]', + estimated_hours INTEGER, + complexity_score FLOAT, + + -- Workflow tracking + claimed_at TIMESTAMP WITH TIME ZONE, + started_at TIMESTAMP WITH TIME ZONE, + completed_at TIMESTAMP WITH TIME ZONE, + + -- Timestamps + created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(), + external_created_at TIMESTAMP WITH TIME ZONE, + external_updated_at TIMESTAMP WITH TIME ZONE, + + -- Constraints + UNIQUE(external_id, source_type) -- Prevent duplicate external tasks +); + +-- Indexes for performance +CREATE INDEX idx_tasks_status ON tasks(status); +CREATE INDEX idx_tasks_source_type ON tasks(source_type); +CREATE INDEX idx_tasks_assigned_team ON tasks(assigned_team_id); +CREATE INDEX idx_tasks_assigned_agent ON tasks(assigned_agent_id); +CREATE INDEX idx_tasks_repository ON tasks(repository); +CREATE INDEX idx_tasks_priority ON tasks(priority); +CREATE INDEX idx_tasks_created_at ON tasks(created_at); +CREATE INDEX idx_tasks_external_id_source ON tasks(external_id, source_type); + +-- Update teams table to link back to tasks +ALTER TABLE teams ADD COLUMN IF NOT EXISTS current_task_id UUID REFERENCES tasks(id) ON DELETE SET NULL; \ No newline at end of file