package monitor

import (
	"context"
	"database/sql"
	"encoding/json"
	"fmt"
	"strconv"
	"strings"
	"sync"
	"time"

	"github.com/chorus-services/whoosh/internal/config"
	"github.com/chorus-services/whoosh/internal/gitea"
	"github.com/jackc/pgx/v5/pgxpool"
	"github.com/rs/zerolog/log"
)

// Monitor manages repository monitoring and task creation.
//
// A Monitor must not be copied after first use: it embeds a sync.Once.
type Monitor struct {
	db           *pgxpool.Pool
	gitea        *gitea.Client
	stopCh       chan struct{}
	stopOnce     sync.Once // guards close(stopCh) so Stop may be called more than once
	syncInterval time.Duration
}

// NewMonitor creates a new repository monitor that uses db for persistence
// and a Gitea client built from giteaCfg. The sync interval defaults to five
// minutes.
func NewMonitor(db *pgxpool.Pool, giteaCfg config.GITEAConfig) *Monitor {
	return &Monitor{
		db:           db,
		gitea:        gitea.NewClient(giteaCfg),
		stopCh:       make(chan struct{}),
		syncInterval: 5 * time.Minute, // Default sync every 5 minutes
	}
}

// GetGiteaClient returns the Gitea client for external use.
func (m *Monitor) GetGiteaClient() *gitea.Client {
	return m.gitea
}

// Start begins the monitoring process and blocks until it ends.
//
// It tests the Gitea connection, runs one sync cycle immediately, and then
// runs a cycle on every tick of the sync interval. It returns a wrapped error
// if the connection test fails, ctx.Err() when ctx is cancelled, and nil when
// Stop is called.
func (m *Monitor) Start(ctx context.Context) error {
	log.Info().Msg("🔍 Starting repository monitoring service")

	// Test Gitea connection before entering the loop.
	if err := m.gitea.TestConnection(ctx); err != nil {
		log.Error().Err(err).Msg("Failed to connect to Gitea")
		return fmt.Errorf("gitea connection failed: %w", err)
	}
	log.Info().Msg("✅ Gitea connection established")

	ticker := time.NewTicker(m.syncInterval)
	defer ticker.Stop()

	// Initial sync, so callers do not wait a full interval for the first run.
	m.syncAllRepositories(ctx)

	for {
		select {
		case <-ctx.Done():
			log.Info().Msg("🛑 Repository monitoring service stopping")
			return ctx.Err()
		case <-m.stopCh:
			log.Info().Msg("🛑 Repository monitoring service stopped")
			return nil
		case <-ticker.C:
			m.syncAllRepositories(ctx)
		}
	}
}

// Stop stops the monitoring service by closing the stop channel that Start
// selects on. Calling Stop more than once is safe; only the first call closes
// the channel.
func (m *Monitor) Stop() {
	m.stopOnce.Do(func() { close(m.stopCh) })
}

// syncAllRepositories syncs all monitored repositories, one after another.
//
// Failures to load the repository list are logged and end the cycle. The
// cycle is abandoned between repositories when ctx is cancelled or Stop has
// been called.
func (m *Monitor) syncAllRepositories(ctx context.Context) {
	log.Info().Msg("🔄 Starting repository sync cycle")

	repos, err := m.getMonitoredRepositories(ctx)
	if err != nil {
		log.Error().Err(err).Msg("Failed to get monitored repositories")
		return
	}

	if len(repos) == 0 {
		log.Info().Msg("No repositories to monitor")
		return
	}

	log.Info().Int("count", len(repos)).Msg("Syncing repositories")

	for _, repo := range repos {
		// Non-blocking check: bail out promptly on shutdown instead of
		// working through the remaining repositories.
		select {
		case <-ctx.Done():
			return
		case <-m.stopCh:
			return
		default:
		}
		m.syncRepository(ctx, repo)
	}

	log.Info().Msg("✅ Repository sync cycle completed")
}

// syncRepository syncs a single repository: it fetches open issues from Gitea
// and creates or updates one task per qualifying issue.
//
// All failures are logged rather than returned. A failure to fetch issues is
// additionally written to the sync log and recorded as the repository's
// "error" status.
func (m *Monitor) syncRepository(ctx context.Context, repo RepositoryConfig) {
	log.Info().
		Str("repository", repo.FullName).
		Msg("Syncing repository")

	// startTime is also stored as the sync watermark below. Using the moment
	// *before* the fetch (rather than the time after processing) guarantees
	// that an issue modified while this sync is running falls inside the next
	// sync's Since window instead of being skipped.
	startTime := time.Now()

	// Update repository status to active (this also clears any stored error).
	if err := m.updateRepositoryStatus(ctx, repo.ID, "active", nil); err != nil {
		log.Error().Err(err).
			Str("repository", repo.FullName).
			Msg("Failed to update repository status to active")
	}

	// NOTE(review): a single request for at most 100 open issues is made
	// here; whether GetIssues paginates internally is not visible from this
	// file — confirm for repositories with many open issues.
	opts := gitea.IssueListOptions{
		State: "open",
		Limit: 100,
	}

	// Only ask for issues changed since the last recorded issue sync.
	if repo.LastIssueSync != nil {
		opts.Since = *repo.LastIssueSync
	}

	// Filter by CHORUS task labels if enabled.
	if repo.EnableChorusIntegration && len(repo.ChorusTaskLabels) > 0 {
		opts.Labels = strings.Join(repo.ChorusTaskLabels, ",")
	}

	issues, err := m.gitea.GetIssues(ctx, repo.Owner, repo.Name, opts)
	if err != nil {
		m.logSyncError(ctx, repo.ID, "fetch_issues", fmt.Sprintf("Failed to fetch issues: %v", err))
		log.Error().Err(err).
			Str("repository", repo.FullName).
			Msg("Failed to fetch issues")

		if statusErr := m.updateRepositoryStatus(ctx, repo.ID, "error", err); statusErr != nil {
			log.Error().Err(statusErr).Msg("Failed to update repository status to error")
		}
		return
	}

	created := 0
	updated := 0

	for _, issue := range issues {
		// Skip issues without CHORUS labels if CHORUS integration is enabled.
		if repo.EnableChorusIntegration && !m.hasChorusLabels(issue, repo.ChorusTaskLabels) {
			continue
		}

		taskID, isNew, err := m.createOrUpdateTask(ctx, repo, issue)
		if err != nil {
			log.Error().Err(err).
				Str("repository", repo.FullName).
				Int64("issue", issue.Number).
				Msg("Failed to create/update task")
			continue
		}

		if !isNew {
			updated++
			continue
		}

		created++
		log.Info().
			Str("repository", repo.FullName).
			Int64("issue", issue.Number).
			Str("task_id", taskID).
			Msg("Created task from issue")
	}

	duration := time.Since(startTime)

	// Update repository sync timestamps and statistics.
	if err := m.updateRepositorySyncInfo(ctx, repo.ID, startTime, created, updated); err != nil {
		log.Error().Err(err).
			Str("repository", repo.FullName).
			Msg("Failed to update repository sync info")
	}

	// Log successful sync.
	m.logSyncSuccess(ctx, repo.ID, "full_sync", duration, len(issues), created, updated)

	log.Info().
		Str("repository", repo.FullName).
		Int("issues_processed", len(issues)).
		Int("tasks_created", created).
		Int("tasks_updated", updated).
		Dur("duration", duration).
		Msg("Repository sync completed")
}

// createOrUpdateTask creates a new task or updates an existing one from a
// Gitea issue.
//
// It returns the task ID, whether the task was newly created, and an error if
// the lookup, JSON encoding, update, or insert fails. On error the other
// return values are zero.
func (m *Monitor) createOrUpdateTask(ctx context.Context, repo RepositoryConfig, issue gitea.Issue) (string, bool, error) {
	// The issue number (as text) is the task's external_id.
	externalID := strconv.FormatInt(issue.Number, 10)

	existingID, found, err := m.findTaskID(ctx, repo, externalID)
	if err != nil {
		return "", false, fmt.Errorf("failed to check existing task: %w", err)
	}

	// Prepare labels.
	labels := make([]string, len(issue.Labels))
	for i, label := range issue.Labels {
		labels[i] = label.Name
	}
	labelsJSON, err := json.Marshal(labels)
	if err != nil {
		return "", false, fmt.Errorf("failed to encode labels: %w", err)
	}

	// Determine task status: anything not closed is treated as open.
	status := "open"
	if issue.State == "closed" {
		status = "completed"
	}

	// Determine priority from labels.
	priority := m.extractPriorityFromLabels(issue.Labels)

	// Extract tech stack from labels.
	techStackJSON, err := json.Marshal(m.extractTechStackFromIssue(issue))
	if err != nil {
		return "", false, fmt.Errorf("failed to encode tech stack: %w", err)
	}

	if found {
		// Update existing task.
		updateQuery := `
			UPDATE tasks SET
				title = $1,
				description = $2,
				status = $3,
				priority = $4,
				labels = $5,
				tech_stack = $6,
				external_updated_at = $7,
				updated_at = NOW()
			WHERE id = $8
		`
		if _, err := m.db.Exec(ctx, updateQuery,
			issue.Title,
			issue.Body,
			status,
			priority,
			labelsJSON,
			techStackJSON,
			issue.UpdatedAt,
			existingID,
		); err != nil {
			return "", false, fmt.Errorf("failed to update task: %w", err)
		}
		return existingID, false, nil
	}

	// Create new task.
	insertQuery := `
		INSERT INTO tasks (
			external_id, external_url, source_type, source_config,
			title, description, status, priority,
			repository, repository_id, labels, tech_stack,
			external_created_at, external_updated_at
		) VALUES (
			$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
		) RETURNING id
	`

	// Prepare source config.
	sourceConfig := map[string]any{
		"repository_id": repo.ID,
		"issue_number":  issue.Number,
		"issue_id":      issue.ID,
	}
	sourceConfigJSON, err := json.Marshal(sourceConfig)
	if err != nil {
		return "", false, fmt.Errorf("failed to encode source config: %w", err)
	}

	var taskID string
	if err := m.db.QueryRow(ctx, insertQuery,
		externalID,       // external_id
		issue.HTMLURL,    // external_url
		repo.SourceType,  // source_type
		sourceConfigJSON, // source_config
		issue.Title,      // title
		issue.Body,       // description
		status,           // status
		priority,         // priority
		repo.FullName,    // repository
		repo.ID,          // repository_id
		labelsJSON,       // labels
		techStackJSON,    // tech_stack
		issue.CreatedAt,  // external_created_at
		issue.UpdatedAt,  // external_updated_at
	).Scan(&taskID); err != nil {
		return "", false, fmt.Errorf("failed to create task: %w", err)
	}

	return taskID, true, nil
}

// findTaskID looks up the task previously created for externalID in repo.
//
// It returns the task ID and true when a row with a non-NULL id exists, and
// ("", false, nil) when there is none. The lookup is scoped to the repository
// because issue numbers are only unique within one repository.
//
// The result set is iterated directly instead of using QueryRow: the pgx pool
// signals "no rows" with its own sentinel error, which is never equal to
// sql.ErrNoRows, so an equality check against that value would treat every
// missing task as a failure.
func (m *Monitor) findTaskID(ctx context.Context, repo RepositoryConfig, externalID string) (string, bool, error) {
	query := `
		SELECT id FROM tasks
		WHERE external_id = $1 AND source_type = $2 AND repository_id = $3
	`

	rows, err := m.db.Query(ctx, query, externalID, repo.SourceType, repo.ID)
	if err != nil {
		return "", false, err
	}
	// Closing the rows releases the pooled connection before the caller
	// issues its follow-up statement.
	defer rows.Close()

	if !rows.Next() {
		// Either no matching task (Err is nil) or the query failed.
		return "", false, rows.Err()
	}

	var id sql.NullString
	if err := rows.Scan(&id); err != nil {
		return "", false, err
	}

	// Only the first row is needed; close now so any deferred query error
	// surfaces through Err.
	rows.Close()
	if err := rows.Err(); err != nil {
		return "", false, err
	}

	return id.String, id.Valid, nil
}

// hasChorusLabels checks if an issue has any of the required CHORUS labels.
// Label names are compared case-insensitively. An empty requiredLabels list
// means every issue qualifies.
func (m *Monitor) hasChorusLabels(issue gitea.Issue, requiredLabels []string) bool {
	if len(requiredLabels) == 0 {
		return true // If no specific labels required, all issues qualify
	}

	issueLabels := make(map[string]bool, len(issue.Labels))
	for _, label := range issue.Labels {
		issueLabels[strings.ToLower(label.Name)] = true
	}

	for _, required := range requiredLabels {
		if issueLabels[strings.ToLower(required)] {
			return true
		}
	}

	return false
}

// extractPriorityFromLabels determines priority from issue labels.
//
// The first label (in the order given) whose lower-cased name is a known
// priority keyword decides the result; "medium" is returned when none match.
func (m *Monitor) extractPriorityFromLabels(labels []gitea.Label) string {
	priorities := map[string]string{
		"critical": "critical",
		"urgent":   "critical",
		"high":     "high",
		"medium":   "medium",
		"low":      "low",
		"minor":    "low",
	}

	for _, label := range labels {
		if priority, exists := priorities[strings.ToLower(label.Name)]; exists {
			return priority
		}
	}

	return "medium" // Default priority
}

// extractTechStackFromIssue extracts the technology stack from issue labels.
//
// A technology is reported when its keyword occurs as a substring of any
// lower-cased label name. Only labels are inspected; the issue title and body
// are not. The result is never nil (so it encodes as a JSON array) and is
// ordered by the keyword list below, which keeps the stored value stable from
// one sync to the next.
func (m *Monitor) extractTechStackFromIssue(issue gitea.Issue) []string {
	techLabels := []string{
		"go", "golang", "javascript", "typescript", "python", "rust", "java",
		"react", "vue", "angular", "node.js", "express", "fastapi", "gin",
		"postgresql", "mysql", "mongodb", "redis", "docker", "kubernetes",
		"api", "frontend", "backend", "database", "devops", "ci/cd",
	}

	// Lower-case each label once instead of once per keyword.
	labelNames := make([]string, len(issue.Labels))
	for i, label := range issue.Labels {
		labelNames[i] = strings.ToLower(label.Name)
	}

	result := make([]string, 0)
	for _, tech := range techLabels {
		for _, labelName := range labelNames {
			if strings.Contains(labelName, tech) {
				result = append(result, tech)
				break // each technology is listed at most once
			}
		}
	}

	return result
}

// RepositoryConfig represents a monitored repository configuration as read
// from the repositories table.
type RepositoryConfig struct {
	ID                      string     `db:"id"`
	Name                    string     `db:"name"`
	Owner                   string     `db:"owner"`
	FullName                string     `db:"full_name"`
	URL                     string     `db:"url"`
	SourceType              string     `db:"source_type"`
	MonitorIssues           bool       `db:"monitor_issues"`
	EnableChorusIntegration bool       `db:"enable_chorus_integration"`
	ChorusTaskLabels        []string   `db:"chorus_task_labels"`
	LastSync                *time.Time `db:"last_sync_at"`
	LastIssueSync           *time.Time `db:"last_issue_sync"`
	SyncStatus              string     `db:"sync_status"`
}

// getMonitoredRepositories retrieves all repositories that should be
// monitored: those with monitor_issues set whose sync_status is not
// 'disabled', least recently synced first.
//
// Rows that fail to scan are logged and skipped. Labels that fail to parse
// are logged and replaced by the default label set. An error is returned if
// the query fails or if iterating the result set fails.
func (m *Monitor) getMonitoredRepositories(ctx context.Context) ([]RepositoryConfig, error) {
	query := `
		SELECT id, name, owner, full_name, url, source_type, monitor_issues,
		       enable_chorus_integration, chorus_task_labels, last_sync_at,
		       last_issue_sync, sync_status
		FROM repositories
		WHERE monitor_issues = true AND sync_status != 'disabled'
		ORDER BY last_sync_at ASC NULLS FIRST
	`

	rows, err := m.db.Query(ctx, query)
	if err != nil {
		return nil, fmt.Errorf("failed to query repositories: %w", err)
	}
	defer rows.Close()

	var repos []RepositoryConfig
	for rows.Next() {
		var repo RepositoryConfig
		var chorusLabelsJSON []byte

		err := rows.Scan(
			&repo.ID, &repo.Name, &repo.Owner, &repo.FullName, &repo.URL,
			&repo.SourceType, &repo.MonitorIssues, &repo.EnableChorusIntegration,
			&chorusLabelsJSON, &repo.LastSync, &repo.LastIssueSync, &repo.SyncStatus,
		)
		if err != nil {
			log.Error().Err(err).Msg("Failed to scan repository row")
			continue
		}

		// Parse CHORUS task labels.
		if err := json.Unmarshal(chorusLabelsJSON, &repo.ChorusTaskLabels); err != nil {
			log.Error().Err(err).Str("repository", repo.FullName).Msg("Failed to parse CHORUS task labels")
			repo.ChorusTaskLabels = []string{"bzzz-task", "chorus-task"} // Default labels
		}

		repos = append(repos, repo)
	}

	// Next returning false can mean "no more rows" or "the read failed";
	// without this check a broken connection would look like a short list.
	if err := rows.Err(); err != nil {
		return nil, fmt.Errorf("failed to iterate repositories: %w", err)
	}

	return repos, nil
}

// updateRepositoryStatus updates the sync status of a repository.
//
// sync_error is set to err's message, or to NULL when err is nil. The
// returned error reports a failure of the UPDATE itself.
func (m *Monitor) updateRepositoryStatus(ctx context.Context, repoID, status string, err error) error {
	var errorMsg sql.NullString
	if err != nil {
		errorMsg.String = err.Error()
		errorMsg.Valid = true
	}

	query := `
		UPDATE repositories
		SET sync_status = $1, sync_error = $2, updated_at = NOW()
		WHERE id = $3
	`

	if _, dbErr := m.db.Exec(ctx, query, status, errorMsg, repoID); dbErr != nil {
		return fmt.Errorf("failed to update repository status: %w", dbErr)
	}

	return nil
}

// updateRepositorySyncInfo updates repository sync information: both sync
// timestamps are set to syncTime and total_tasks_created is increased by
// created. The updated count is accepted but not persisted by this query.
func (m *Monitor) updateRepositorySyncInfo(ctx context.Context, repoID string, syncTime time.Time, created, updated int) error {
	query := `
		UPDATE repositories
		SET last_sync_at = $1, last_issue_sync = $1,
		    total_tasks_created = total_tasks_created + $2,
		    updated_at = NOW()
		WHERE id = $3
	`

	if _, err := m.db.Exec(ctx, query, syncTime, created, repoID); err != nil {
		return fmt.Errorf("failed to update repository sync info: %w", err)
	}

	return nil
}

// logSyncSuccess logs a successful sync operation to repository_sync_logs.
// A failure to write the log row is itself only logged.
func (m *Monitor) logSyncSuccess(ctx context.Context, repoID, operation string, duration time.Duration, processed, created, updated int) {
	query := `
		INSERT INTO repository_sync_logs (
			repository_id, sync_type, operation, status, message,
			items_processed, items_created, items_updated, duration_ms
		) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
	`

	message := fmt.Sprintf("Processed %d items, created %d tasks, updated %d tasks in %v",
		processed, created, updated, duration)

	_, err := m.db.Exec(ctx, query,
		repoID, "full_sync", operation, "success", message,
		processed, created, updated, duration.Milliseconds(),
	)
	if err != nil {
		log.Error().Err(err).Msg("Failed to log sync success")
	}
}

// logSyncError logs a sync error to repository_sync_logs, storing errorMsg
// both as the message and, together with a timestamp, as JSON error details.
// A failure to encode or write the log row is itself only logged.
func (m *Monitor) logSyncError(ctx context.Context, repoID, operation, errorMsg string) {
	query := `
		INSERT INTO repository_sync_logs (
			repository_id, sync_type, operation, status, message, error_details
		) VALUES ($1, $2, $3, $4, $5, $6)
	`

	errorDetails := map[string]any{
		"error":     errorMsg,
		"timestamp": time.Now().Format(time.RFC3339),
	}
	errorDetailsJSON, err := json.Marshal(errorDetails)
	if err != nil {
		log.Error().Err(err).Msg("Failed to log sync error")
		return
	}

	_, err = m.db.Exec(ctx, query,
		repoID, "full_sync", operation, "error", errorMsg, errorDetailsJSON,
	)
	if err != nil {
		log.Error().Err(err).Msg("Failed to log sync error")
	}
}

// SyncRepository manually syncs a specific repository by ID.
//
// It returns an error only when the repository configuration cannot be
// loaded; failures during the sync itself are logged by syncRepository.
func (m *Monitor) SyncRepository(ctx context.Context, repoID string) error {
	log.Info().Str("repository_id", repoID).Msg("Manual repository sync requested")

	// Get repository configuration.
	repo, err := m.getRepositoryByID(ctx, repoID)
	if err != nil {
		return fmt.Errorf("failed to get repository: %w", err)
	}

	// Sync the repository.
	m.syncRepository(ctx, *repo)

	return nil
}

// getRepositoryByID retrieves a specific repository configuration by ID
func (m *Monitor) getRepositoryByID(ctx context.Context, repoID string) (*RepositoryConfig, error) { query := ` SELECT id, name, owner, full_name, url, source_type, monitor_issues, enable_chorus_integration, chorus_task_labels, last_sync_at, last_issue_sync, sync_status FROM repositories WHERE id = $1 ` var repo RepositoryConfig var chorusLabelsJSON []byte err := m.db.QueryRow(ctx, query, repoID).Scan( &repo.ID, &repo.Name, &repo.Owner, &repo.FullName, &repo.URL, &repo.SourceType, &repo.MonitorIssues, &repo.EnableChorusIntegration, &chorusLabelsJSON, &repo.LastSync, &repo.LastIssueSync, &repo.SyncStatus, ) if err != nil { if err == sql.ErrNoRows { return nil, fmt.Errorf("repository not found: %s", repoID) } return nil, fmt.Errorf("failed to query repository: %w", err) } // Parse CHORUS task labels if err := json.Unmarshal(chorusLabelsJSON, &repo.ChorusTaskLabels); err != nil { log.Error().Err(err).Str("repository", repo.FullName).Msg("Failed to parse CHORUS task labels") repo.ChorusTaskLabels = []string{"bzzz-task", "chorus-task"} // Default labels } return &repo, nil }