Major security, observability, and configuration improvements:
## Security Hardening
- Implemented configurable CORS (no more wildcards)
- Added comprehensive auth middleware for admin endpoints
- Enhanced webhook HMAC validation
- Added input validation and rate limiting
- Security headers and CSP policies
## Configuration Management
- Made N8N webhook URL configurable (WHOOSH_N8N_BASE_URL)
- Replaced all hardcoded endpoints with environment variables
- Added feature flags for LLM vs heuristic composition
- Gitea fetch hardening with EAGER_FILTER and FULL_RESCAN options
## API Completeness
- Implemented GetCouncilComposition function
- Added GET /api/v1/councils/{id} endpoint
- Council artifacts API (POST/GET /api/v1/councils/{id}/artifacts)
- /admin/health/details endpoint with component status
- Database lookup for repository URLs (no hardcoded fallbacks)
## Observability & Performance
- Added OpenTelemetry distributed tracing with goal/pulse correlation
- Performance optimization database indexes
- Comprehensive health monitoring
- Enhanced logging and error handling
## Infrastructure
- Production-ready P2P discovery (replaces mock implementation)
- Removed unused Redis configuration
- Enhanced Docker Swarm integration
- Added migration files for performance indexes
## Code Quality
- Comprehensive input validation
- Graceful error handling and failsafe fallbacks
- Backwards compatibility maintained
- Following security best practices
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
1118 lines
34 KiB
Go
1118 lines
34 KiB
Go
package monitor
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strconv"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/chorus-services/whoosh/internal/composer"
|
|
"github.com/chorus-services/whoosh/internal/config"
|
|
"github.com/chorus-services/whoosh/internal/council"
|
|
"github.com/chorus-services/whoosh/internal/gitea"
|
|
"github.com/chorus-services/whoosh/internal/orchestrator"
|
|
"github.com/chorus-services/whoosh/internal/tracing"
|
|
"github.com/google/uuid"
|
|
"github.com/jackc/pgx/v5"
|
|
"github.com/jackc/pgx/v5/pgxpool"
|
|
"github.com/rs/zerolog/log"
|
|
"go.opentelemetry.io/otel/attribute"
|
|
)
|
|
|
|
// Monitor manages repository monitoring and task creation.
//
// It periodically polls Gitea for issues in monitored repositories,
// mirrors them into the local tasks table, and — for qualifying issues —
// kicks off team composition or council formation.
type Monitor struct {
	db            *pgxpool.Pool              // task/repository persistence
	gitea         *gitea.Client              // issue source (built from config in NewMonitor)
	composer      *composer.Service          // team composition; may be nil (checked before use)
	council       *council.CouncilComposer   // council formation for project kickoff briefs
	agentDeployer *orchestrator.AgentDeployer // optional agent deployment; may be nil (checked before use)
	stopCh        chan struct{}              // closed by Stop() to end the Start() loop
	syncInterval  time.Duration              // polling cadence (defaults to 5 minutes)
}
|
|
|
|
// NewMonitor creates a new repository monitor
|
|
func NewMonitor(db *pgxpool.Pool, giteaCfg config.GITEAConfig, composerService *composer.Service, councilComposer *council.CouncilComposer, agentDeployer *orchestrator.AgentDeployer) *Monitor {
|
|
return &Monitor{
|
|
db: db,
|
|
gitea: gitea.NewClient(giteaCfg),
|
|
composer: composerService,
|
|
council: councilComposer,
|
|
agentDeployer: agentDeployer,
|
|
stopCh: make(chan struct{}),
|
|
syncInterval: 5 * time.Minute, // Default sync every 5 minutes
|
|
}
|
|
}
|
|
|
|
// GetGiteaClient returns the monitor's configured Gitea client so other
// components can reuse the same connection settings for external calls.
func (m *Monitor) GetGiteaClient() *gitea.Client {
	return m.gitea
}
|
|
|
|
// Start begins the monitoring process. It verifies Gitea connectivity,
// performs an immediate initial sync, then re-syncs every syncInterval
// until the context is cancelled (returns ctx.Err()) or Stop is called
// (returns nil). Start blocks for the monitor's lifetime; run it in its
// own goroutine.
func (m *Monitor) Start(ctx context.Context) error {
	log.Info().Msg("🔍 Starting repository monitoring service")

	// Fail fast if Gitea is unreachable rather than polling uselessly.
	if err := m.gitea.TestConnection(ctx); err != nil {
		log.Error().Err(err).Msg("Failed to connect to Gitea")
		return fmt.Errorf("gitea connection failed: %w", err)
	}

	log.Info().Msg("✅ Gitea connection established")

	// Start monitoring loop.
	ticker := time.NewTicker(m.syncInterval)
	defer ticker.Stop()

	// Initial sync, so the first pass doesn't wait a full interval.
	m.syncAllRepositories(ctx)

	for {
		select {
		case <-ctx.Done():
			log.Info().Msg("🛑 Repository monitoring service stopping")
			return ctx.Err()
		case <-m.stopCh:
			log.Info().Msg("🛑 Repository monitoring service stopped")
			return nil
		case <-ticker.C:
			m.syncAllRepositories(ctx)
		}
	}
}
|
|
|
|
// Stop signals the loop started by Start to exit.
//
// NOTE(review): Stop closes stopCh unconditionally, so calling it more than
// once panics ("close of closed channel"). Callers must invoke it at most
// once; guard with sync.Once if multiple callers are possible.
func (m *Monitor) Stop() {
	close(m.stopCh)
}
|
|
|
|
// syncAllRepositories syncs all monitored repositories
|
|
func (m *Monitor) syncAllRepositories(ctx context.Context) {
|
|
ctx, span := tracing.StartMonitorSpan(ctx, "sync_all_repositories", "all")
|
|
defer span.End()
|
|
|
|
log.Info().Msg("🔄 Starting repository sync cycle")
|
|
|
|
repos, err := m.getMonitoredRepositories(ctx)
|
|
if err != nil {
|
|
tracing.SetSpanError(span, err)
|
|
log.Error().Err(err).Msg("Failed to get monitored repositories")
|
|
return
|
|
}
|
|
|
|
span.SetAttributes(attribute.Int("repositories.count", len(repos)))
|
|
|
|
if len(repos) == 0 {
|
|
log.Info().Msg("No repositories to monitor")
|
|
return
|
|
}
|
|
|
|
log.Info().Int("count", len(repos)).Msg("Syncing repositories")
|
|
|
|
for _, repo := range repos {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
default:
|
|
m.syncRepository(ctx, repo)
|
|
}
|
|
}
|
|
|
|
span.SetAttributes(attribute.String("sync.status", "completed"))
|
|
log.Info().Msg("✅ Repository sync cycle completed")
|
|
}
|
|
|
|
// syncRepository syncs a single repository: it fetches open issues from
// Gitea (incrementally when possible), creates or updates local task rows,
// triggers council formation for project kickoff briefs, manages the
// repository's sync-status lifecycle, and records metrics/sync logs.
// All failures are logged and recorded; nothing is returned to the caller.
func (m *Monitor) syncRepository(ctx context.Context, repo RepositoryConfig) {
	ctx, span := tracing.StartMonitorSpan(ctx, "sync_repository", repo.FullName)
	defer span.End()

	span.SetAttributes(
		attribute.String("repository.id", repo.ID),
		attribute.String("repository.owner", repo.Owner),
		attribute.String("repository.name", repo.Name),
		attribute.String("repository.sync_status", repo.SyncStatus),
		attribute.Bool("repository.chorus_enabled", repo.EnableChorusIntegration),
	)

	log.Info().
		Str("repository", repo.FullName).
		Msg("Syncing repository")

	startTime := time.Now()

	// Mark the repository active up front; a failure here is logged but does
	// not abort the sync.
	if err := m.updateRepositoryStatus(ctx, repo.ID, "active", nil); err != nil {
		log.Error().Err(err).
			Str("repository", repo.FullName).
			Msg("Failed to update repository status to active")
	}

	// Fetch up to 100 open issues per cycle.
	opts := gitea.IssueListOptions{
		State: "open",
		Limit: 100,
	}

	// Only use Since parameter for repositories that have completed initial scan.
	// For initial_scan or pending status, we want to scan ALL issues to find
	// Design Briefs and UCXL content.
	if repo.LastIssueSync != nil && repo.SyncStatus != "initial_scan" && repo.SyncStatus != "pending" {
		opts.Since = *repo.LastIssueSync
		log.Debug().
			Str("repository", repo.FullName).
			Time("since", *repo.LastIssueSync).
			Msg("Using incremental sync with Since parameter")
	} else {
		log.Info().
			Str("repository", repo.FullName).
			Str("sync_status", repo.SyncStatus).
			Msg("Performing full scan (no Since parameter) - initial scan or looking for Design Briefs")
	}

	// Ask Gitea to filter by CHORUS task labels server-side when enabled.
	if repo.EnableChorusIntegration && len(repo.ChorusTaskLabels) > 0 {
		opts.Labels = strings.Join(repo.ChorusTaskLabels, ",")
	}

	issues, err := m.gitea.GetIssues(ctx, repo.Owner, repo.Name, opts)
	if err != nil {
		m.logSyncError(ctx, repo.ID, "fetch_issues", fmt.Sprintf("Failed to fetch issues: %v", err))
		log.Error().Err(err).
			Str("repository", repo.FullName).
			Msg("Failed to fetch issues")

		if err := m.updateRepositoryStatus(ctx, repo.ID, "error", err); err != nil {
			log.Error().Err(err).Msg("Failed to update repository status to error")
		}
		return
	}

	created := 0
	updated := 0

	for _, issue := range issues {
		// Client-side label check as defense in depth (the server-side
		// opts.Labels filter above should already have applied).
		if repo.EnableChorusIntegration && !m.hasChorusLabels(issue, repo.ChorusTaskLabels) {
			continue
		}

		taskID, isNew, err := m.createOrUpdateTask(ctx, repo, issue)
		if err != nil {
			// One bad issue must not abort the whole repository sync.
			log.Error().Err(err).
				Str("repository", repo.FullName).
				Int64("issue", issue.Number).
				Msg("Failed to create/update task")
			continue
		}

		if isNew {
			created++
			log.Info().
				Str("repository", repo.FullName).
				Int64("issue", issue.Number).
				Str("task_id", taskID).
				Msg("Created task from issue")
		} else {
			updated++
		}

		// Check if this issue should trigger council formation.
		// NOTE(review): createOrUpdateTask also spawns council formation for
		// newly created kickoff tasks, so a new Design Brief appears to be
		// triggered twice (once async there, once synchronously here) —
		// confirm council formation is idempotent.
		if m.isProjectKickoffBrief(issue, repo) {
			m.triggerCouncilFormation(ctx, taskID, issue, repo)
		}
	}

	duration := time.Since(startTime)

	// Record the sync results on the span for observability.
	span.SetAttributes(
		attribute.Int("issues.processed", len(issues)),
		attribute.Int("tasks.created", created),
		attribute.Int("tasks.updated", updated),
		attribute.Int64("duration.ms", duration.Milliseconds()),
	)

	// Check if repository should transition from initial scan to active status.
	if repo.SyncStatus == "initial_scan" || repo.SyncStatus == "pending" {
		// Repository has completed initial scan.
		// For now, transition to active if we processed any issues or found Design Briefs.
		// Future: Add UCXL content detection logic here.
		shouldActivate := (created > 0 || updated > 0)

		if shouldActivate {
			log.Info().
				Str("repository", repo.FullName).
				Int("tasks_created", created).
				Int("tasks_updated", updated).
				Msg("Transitioning repository from initial scan to active status - content found")

			if err := m.updateRepositoryStatus(ctx, repo.ID, "active", nil); err != nil {
				tracing.SetSpanError(span, err)
				log.Error().Err(err).
					Str("repository", repo.FullName).
					Msg("Failed to transition repository to active status")
			} else {
				span.SetAttributes(attribute.String("repository.transition", "initial_scan_to_active"))
			}
		} else {
			// No content yet: stay in initial_scan so the next cycle does
			// another full scan (no Since parameter).
			log.Info().
				Str("repository", repo.FullName).
				Msg("Initial scan completed - no content found, keeping in initial_scan status")
			span.SetAttributes(attribute.String("repository.transition", "initial_scan_no_content"))
		}
	}

	// Update repository sync timestamps and statistics.
	if err := m.updateRepositorySyncInfo(ctx, repo.ID, time.Now(), created, updated); err != nil {
		tracing.SetSpanError(span, err)
		log.Error().Err(err).
			Str("repository", repo.FullName).
			Msg("Failed to update repository sync info")
	}

	// Log successful sync.
	m.logSyncSuccess(ctx, repo.ID, "full_sync", duration, len(issues), created, updated)

	log.Info().
		Str("repository", repo.FullName).
		Int("issues_processed", len(issues)).
		Int("tasks_created", created).
		Int("tasks_updated", updated).
		Dur("duration", duration).
		Msg("Repository sync completed")
}
|
|
|
|
// createOrUpdateTask creates a new task or updates an existing one from a
// Gitea issue. It returns the task ID, whether the task was newly created,
// and any database error.
//
// Side effect: for brand-new tasks carrying a bzzz-task label, team
// composition or council formation is launched in a background goroutine.
func (m *Monitor) createOrUpdateTask(ctx context.Context, repo RepositoryConfig, issue gitea.Issue) (string, bool, error) {
	// Tasks are keyed by (external_id, source_type), where external_id is
	// the issue number rendered as a string.
	var existingTaskID string
	query := `SELECT id FROM tasks WHERE external_id = $1 AND source_type = $2`
	err := m.db.QueryRow(ctx, query, strconv.FormatInt(issue.Number, 10), repo.SourceType).Scan(&existingTaskID)

	taskExists := err == nil

	// Handle errors other than "no rows found".
	if err != nil && err != pgx.ErrNoRows {
		return "", false, fmt.Errorf("failed to check existing task: %w", err)
	}

	// Flatten label objects to a JSON array of names for storage.
	labels := make([]string, len(issue.Labels))
	for i, label := range issue.Labels {
		labels[i] = label.Name
	}
	labelsJSON, _ := json.Marshal(labels) // marshaling []string cannot fail

	// Closed issues map to completed tasks; everything else is open.
	status := "open"
	if issue.State == "closed" {
		status = "completed"
	}

	// Determine priority from labels.
	priority := m.extractPriorityFromLabels(issue.Labels)

	// Extract tech stack from labels.
	techStack := m.extractTechStackFromIssue(issue)
	techStackJSON, _ := json.Marshal(techStack) // marshaling []string cannot fail

	if taskExists {
		// Task exists - update the mutable fields in place.
		updateQuery := `
			UPDATE tasks SET
				title = $1,
				description = $2,
				status = $3,
				priority = $4,
				labels = $5,
				tech_stack = $6,
				external_updated_at = $7,
				updated_at = NOW()
			WHERE id = $8
		`

		_, err = m.db.Exec(ctx, updateQuery,
			issue.Title,
			issue.Body,
			status,
			priority,
			labelsJSON,
			techStackJSON,
			issue.UpdatedAt,
			existingTaskID,
		)

		if err != nil {
			return "", false, fmt.Errorf("failed to update task: %w", err)
		}

		return existingTaskID, false, nil
	} else {
		// Create new task; the database generates and returns its ID.
		var taskID string
		insertQuery := `
			INSERT INTO tasks (
				external_id, external_url, source_type, source_config,
				title, description, status, priority,
				repository, repository_id, labels, tech_stack,
				external_created_at, external_updated_at
			) VALUES (
				$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
			) RETURNING id
		`

		// Prepare source config: provenance pointers back to the Gitea issue.
		sourceConfig := map[string]interface{}{
			"repository_id": repo.ID,
			"issue_number":  issue.Number,
			"issue_id":      issue.ID,
		}
		sourceConfigJSON, _ := json.Marshal(sourceConfig)

		err = m.db.QueryRow(ctx, insertQuery,
			strconv.FormatInt(issue.Number, 10), // external_id
			issue.HTMLURL,                       // external_url
			repo.SourceType,                     // source_type
			sourceConfigJSON,                    // source_config
			issue.Title,                         // title
			issue.Body,                          // description
			status,                              // status
			priority,                            // priority
			repo.FullName,                       // repository
			repo.ID,                             // repository_id
			labelsJSON,                          // labels
			techStackJSON,                       // tech_stack
			issue.CreatedAt,                     // external_created_at
			issue.UpdatedAt,                     // external_updated_at
		).Scan(&taskID)

		if err != nil {
			return "", false, fmt.Errorf("failed to create task: %w", err)
		}

		// For newly created bzzz-task issues, check if it's a council formation trigger.
		// Both paths run in goroutines with context.Background(), so the work
		// is detached from this sync cycle's cancellation and trace context.
		if m.composer != nil && m.shouldTriggerTeamComposition(issue.Labels) {
			if m.isProjectKickoffBrief(issue, repo) {
				// This is a project kickoff - trigger council formation.
				go m.triggerCouncilFormation(context.Background(), taskID, issue, repo)
			} else {
				// Regular bzzz-task - trigger normal team composition.
				go m.triggerTeamComposition(context.Background(), taskID, issue, repo)
			}
		}

		return taskID, true, nil
	}
}
|
|
|
|
// hasChorusLabels checks if an issue has any of the required CHORUS labels
|
|
func (m *Monitor) hasChorusLabels(issue gitea.Issue, requiredLabels []string) bool {
|
|
if len(requiredLabels) == 0 {
|
|
return true // If no specific labels required, all issues qualify
|
|
}
|
|
|
|
issueLabels := make(map[string]bool)
|
|
for _, label := range issue.Labels {
|
|
issueLabels[strings.ToLower(label.Name)] = true
|
|
}
|
|
|
|
for _, required := range requiredLabels {
|
|
if issueLabels[strings.ToLower(required)] {
|
|
return true
|
|
}
|
|
}
|
|
|
|
return false
|
|
}
|
|
|
|
// extractPriorityFromLabels determines priority from issue labels
|
|
func (m *Monitor) extractPriorityFromLabels(labels []gitea.Label) string {
|
|
priorities := map[string]string{
|
|
"critical": "critical",
|
|
"urgent": "critical",
|
|
"high": "high",
|
|
"medium": "medium",
|
|
"low": "low",
|
|
"minor": "low",
|
|
}
|
|
|
|
for _, label := range labels {
|
|
if priority, exists := priorities[strings.ToLower(label.Name)]; exists {
|
|
return priority
|
|
}
|
|
}
|
|
|
|
return "medium" // Default priority
|
|
}
|
|
|
|
// extractTechStackFromIssue extracts technology stack from issue labels and content
|
|
func (m *Monitor) extractTechStackFromIssue(issue gitea.Issue) []string {
|
|
techStack := make(map[string]bool)
|
|
|
|
// Extract from labels
|
|
techLabels := []string{
|
|
"go", "golang", "javascript", "typescript", "python", "rust", "java",
|
|
"react", "vue", "angular", "node.js", "express", "fastapi", "gin",
|
|
"postgresql", "mysql", "mongodb", "redis", "docker", "kubernetes",
|
|
"api", "frontend", "backend", "database", "devops", "ci/cd",
|
|
}
|
|
|
|
for _, label := range issue.Labels {
|
|
labelName := strings.ToLower(label.Name)
|
|
for _, tech := range techLabels {
|
|
if strings.Contains(labelName, tech) {
|
|
techStack[tech] = true
|
|
}
|
|
}
|
|
}
|
|
|
|
// Convert map to slice
|
|
result := make([]string, 0, len(techStack))
|
|
for tech := range techStack {
|
|
result = append(result, tech)
|
|
}
|
|
|
|
return result
|
|
}
|
|
|
|
// RepositoryConfig represents a monitored repository configuration as
// loaded from the repositories table.
type RepositoryConfig struct {
	ID                      string     `db:"id"`
	Name                    string     `db:"name"`
	Owner                   string     `db:"owner"`
	FullName                string     `db:"full_name"` // "owner/name"
	URL                     string     `db:"url"`
	SourceType              string     `db:"source_type"`
	MonitorIssues           bool       `db:"monitor_issues"`            // only rows with true are polled
	EnableChorusIntegration bool       `db:"enable_chorus_integration"` // when true, issues are filtered by ChorusTaskLabels
	ChorusTaskLabels        []string   `db:"chorus_task_labels"`        // stored as JSON; defaulted when unparsable
	LastSync                *time.Time `db:"last_sync_at"`              // nil until the first successful sync
	LastIssueSync           *time.Time `db:"last_issue_sync"`           // nil triggers a full (non-incremental) issue scan
	SyncStatus              string     `db:"sync_status"` // e.g. "pending", "initial_scan", "active", "error", "disabled"
}
|
|
|
|
// getMonitoredRepositories retrieves all repositories that should be monitored
|
|
func (m *Monitor) getMonitoredRepositories(ctx context.Context) ([]RepositoryConfig, error) {
|
|
query := `
|
|
SELECT id, name, owner, full_name, url, source_type, monitor_issues,
|
|
enable_chorus_integration, chorus_task_labels, last_sync_at,
|
|
last_issue_sync, sync_status
|
|
FROM repositories
|
|
WHERE monitor_issues = true AND sync_status != 'disabled'
|
|
ORDER BY last_sync_at ASC NULLS FIRST
|
|
`
|
|
|
|
rows, err := m.db.Query(ctx, query)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to query repositories: %w", err)
|
|
}
|
|
defer rows.Close()
|
|
|
|
var repos []RepositoryConfig
|
|
for rows.Next() {
|
|
var repo RepositoryConfig
|
|
var chorusLabelsJSON []byte
|
|
|
|
err := rows.Scan(
|
|
&repo.ID, &repo.Name, &repo.Owner, &repo.FullName, &repo.URL,
|
|
&repo.SourceType, &repo.MonitorIssues, &repo.EnableChorusIntegration,
|
|
&chorusLabelsJSON, &repo.LastSync, &repo.LastIssueSync, &repo.SyncStatus,
|
|
)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to scan repository row")
|
|
continue
|
|
}
|
|
|
|
// Parse CHORUS task labels
|
|
if err := json.Unmarshal(chorusLabelsJSON, &repo.ChorusTaskLabels); err != nil {
|
|
log.Error().Err(err).Str("repository", repo.FullName).Msg("Failed to parse CHORUS task labels")
|
|
repo.ChorusTaskLabels = []string{"bzzz-task", "chorus-task", "chorus-entrypoint"} // Default labels
|
|
}
|
|
|
|
repos = append(repos, repo)
|
|
}
|
|
|
|
return repos, nil
|
|
}
|
|
|
|
// updateRepositoryStatus updates the sync status of a repository
|
|
func (m *Monitor) updateRepositoryStatus(ctx context.Context, repoID, status string, err error) error {
|
|
var errorMsg *string
|
|
if err != nil {
|
|
errStr := err.Error()
|
|
errorMsg = &errStr
|
|
}
|
|
|
|
query := `
|
|
UPDATE repositories
|
|
SET sync_status = $1, sync_error = $2, updated_at = NOW()
|
|
WHERE id = $3
|
|
`
|
|
|
|
_, dbErr := m.db.Exec(ctx, query, status, errorMsg, repoID)
|
|
if dbErr != nil {
|
|
return fmt.Errorf("failed to update repository status: %w", dbErr)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// updateRepositorySyncInfo updates repository sync bookkeeping after a sync
// pass: both last_sync_at and last_issue_sync are set to syncTime, and the
// running total_tasks_created counter is incremented by `created`.
//
// NOTE(review): the `updated` parameter is accepted but never persisted —
// only `created` feeds total_tasks_created. Confirm whether a
// total_tasks_updated column was intended.
func (m *Monitor) updateRepositorySyncInfo(ctx context.Context, repoID string, syncTime time.Time, created, updated int) error {
	query := `
		UPDATE repositories 
		SET last_sync_at = $1, last_issue_sync = $1,
		    total_tasks_created = total_tasks_created + $2,
		    updated_at = NOW()
		WHERE id = $3
	`

	_, err := m.db.Exec(ctx, query, syncTime, created, repoID)
	if err != nil {
		return fmt.Errorf("failed to update repository sync info: %w", err)
	}

	return nil
}
|
|
|
|
// logSyncSuccess logs a successful sync operation
|
|
func (m *Monitor) logSyncSuccess(ctx context.Context, repoID, operation string, duration time.Duration, processed, created, updated int) {
|
|
query := `
|
|
INSERT INTO repository_sync_logs (
|
|
repository_id, sync_type, operation, status, message,
|
|
items_processed, items_created, items_updated, duration_ms
|
|
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
|
`
|
|
|
|
message := fmt.Sprintf("Processed %d items, created %d tasks, updated %d tasks in %v", processed, created, updated, duration)
|
|
|
|
_, err := m.db.Exec(ctx, query,
|
|
repoID, "full_sync", operation, "success", message,
|
|
processed, created, updated, duration.Milliseconds(),
|
|
)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to log sync success")
|
|
}
|
|
}
|
|
|
|
// logSyncError logs a sync error
|
|
func (m *Monitor) logSyncError(ctx context.Context, repoID, operation, errorMsg string) {
|
|
query := `
|
|
INSERT INTO repository_sync_logs (
|
|
repository_id, sync_type, operation, status, message, error_details
|
|
) VALUES ($1, $2, $3, $4, $5, $6)
|
|
`
|
|
|
|
errorDetails := map[string]interface{}{
|
|
"error": errorMsg,
|
|
"timestamp": time.Now().Format(time.RFC3339),
|
|
}
|
|
errorDetailsJSON, _ := json.Marshal(errorDetails)
|
|
|
|
_, err := m.db.Exec(ctx, query,
|
|
repoID, "full_sync", operation, "error", errorMsg, errorDetailsJSON,
|
|
)
|
|
if err != nil {
|
|
log.Error().Err(err).Msg("Failed to log sync error")
|
|
}
|
|
}
|
|
|
|
// SyncRepository manually syncs a specific repository by ID
|
|
func (m *Monitor) SyncRepository(ctx context.Context, repoID string) error {
|
|
log.Info().Str("repository_id", repoID).Msg("Manual repository sync requested")
|
|
|
|
// Get repository configuration
|
|
repo, err := m.getRepositoryByID(ctx, repoID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get repository: %w", err)
|
|
}
|
|
|
|
// Sync the repository
|
|
m.syncRepository(ctx, *repo)
|
|
|
|
return nil
|
|
}
|
|
|
|
// getRepositoryByID retrieves a specific repository configuration by ID.
// Returns a "repository not found" error for a missing row; like
// getMonitoredRepositories, unparsable chorus_task_labels JSON falls back
// to the default label set rather than failing the lookup.
func (m *Monitor) getRepositoryByID(ctx context.Context, repoID string) (*RepositoryConfig, error) {
	query := `
		SELECT id, name, owner, full_name, url, source_type, monitor_issues,
		       enable_chorus_integration, chorus_task_labels, last_sync_at,
		       last_issue_sync, sync_status
		FROM repositories 
		WHERE id = $1
	`

	var repo RepositoryConfig
	var chorusLabelsJSON []byte

	err := m.db.QueryRow(ctx, query, repoID).Scan(
		&repo.ID, &repo.Name, &repo.Owner, &repo.FullName, &repo.URL,
		&repo.SourceType, &repo.MonitorIssues, &repo.EnableChorusIntegration,
		&chorusLabelsJSON, &repo.LastSync, &repo.LastIssueSync, &repo.SyncStatus,
	)
	if err != nil {
		if err == pgx.ErrNoRows {
			return nil, fmt.Errorf("repository not found: %s", repoID)
		}
		return nil, fmt.Errorf("failed to query repository: %w", err)
	}

	// Parse CHORUS task labels (stored as JSON); fall back to defaults.
	if err := json.Unmarshal(chorusLabelsJSON, &repo.ChorusTaskLabels); err != nil {
		log.Error().Err(err).Str("repository", repo.FullName).Msg("Failed to parse CHORUS task labels")
		repo.ChorusTaskLabels = []string{"bzzz-task", "chorus-task", "chorus-entrypoint"} // Default labels
	}

	return &repo, nil
}
|
|
|
|
// shouldTriggerTeamComposition checks if the issue has labels that should trigger team composition
|
|
func (m *Monitor) shouldTriggerTeamComposition(labels []gitea.Label) bool {
|
|
for _, label := range labels {
|
|
if strings.ToLower(label.Name) == "bzzz-task" {
|
|
return true
|
|
}
|
|
}
|
|
return false
|
|
}
|
|
|
|
// isProjectKickoffBrief checks if the issue represents a new project kickoff council trigger
|
|
func (m *Monitor) isProjectKickoffBrief(issue gitea.Issue, repo RepositoryConfig) bool {
|
|
// Check if it has the chorus-entrypoint label
|
|
hasChorusEntrypoint := false
|
|
for _, label := range issue.Labels {
|
|
if strings.ToLower(label.Name) == "chorus-entrypoint" {
|
|
hasChorusEntrypoint = true
|
|
break
|
|
}
|
|
}
|
|
if !hasChorusEntrypoint {
|
|
return false
|
|
}
|
|
|
|
// Check if the issue title contains "Design Brief"
|
|
title := strings.ToLower(issue.Title)
|
|
if !strings.Contains(title, "design brief") {
|
|
return false
|
|
}
|
|
|
|
// Additional validation: this should be a new/empty repository
|
|
// For now, we'll rely on the title check, but could add repo analysis later
|
|
|
|
log.Info().
|
|
Str("repository", repo.FullName).
|
|
Str("issue_title", issue.Title).
|
|
Msg("🎭 Detected project kickoff brief - council formation required")
|
|
|
|
return true
|
|
}
|
|
|
|
// triggerTeamComposition initiates team composition for a newly created
// task. The pipeline is: analyze the issue → create the team → assign the
// task to it (status "claimed") → optionally deploy agents in the
// background. Each step aborts the pipeline on failure; errors are logged,
// never returned (this runs detached from the sync cycle).
func (m *Monitor) triggerTeamComposition(ctx context.Context, taskID string, issue gitea.Issue, repo RepositoryConfig) {
	log.Info().
		Str("task_id", taskID).
		Int64("issue_id", issue.ID).
		Str("repository", repo.FullName).
		Msg("🎯 Triggering team composition for bzzz-task")

	// Convert Gitea issue to TaskAnalysisInput.
	techStack := m.extractTechStackFromIssue(issue)
	requirements := m.extractRequirementsFromIssue(issue)

	analysisInput := &composer.TaskAnalysisInput{
		Title:        issue.Title,
		Description:  issue.Body,
		Requirements: requirements,
		Repository:   repo.FullName,
		Priority:     m.mapPriorityToComposer(m.extractPriorityFromLabels(issue.Labels)),
		TechStack:    techStack,
		// Provenance pointers back to the Gitea issue and local task.
		Metadata: map[string]interface{}{
			"task_id":       taskID,
			"issue_id":      issue.ID,
			"issue_number":  issue.Number,
			"repository_id": repo.ID,
			"external_url":  issue.HTMLURL,
		},
	}

	// Step 1: analyze the task and propose a team composition.
	result, err := m.composer.AnalyzeAndComposeTeam(ctx, analysisInput)
	if err != nil {
		log.Error().Err(err).
			Str("task_id", taskID).
			Msg("Failed to perform team composition analysis")
		return
	}

	log.Info().
		Str("task_id", taskID).
		Str("team_id", result.TeamComposition.TeamID.String()).
		Int("team_size", result.TeamComposition.EstimatedSize).
		Float64("confidence", result.TeamComposition.ConfidenceScore).
		Msg("✅ Team composition analysis completed")

	// Step 2: persist the team in the database.
	team, err := m.composer.CreateTeam(ctx, result.TeamComposition, analysisInput)
	if err != nil {
		log.Error().Err(err).
			Str("task_id", taskID).
			Msg("Failed to create team")
		return
	}

	// Step 3: link the task to the new team (moves it to "claimed").
	err = m.assignTaskToTeam(ctx, taskID, team.ID.String())
	if err != nil {
		log.Error().Err(err).
			Str("task_id", taskID).
			Str("team_id", team.ID.String()).
			Msg("Failed to assign task to team")
		return
	}

	// Step 4 (optional): deploy agents for the newly formed team in the
	// background, when an agent deployer is configured.
	if m.agentDeployer != nil {
		go m.deployTeamAgents(ctx, taskID, team, result.TeamComposition, repo)
	}

	log.Info().
		Str("task_id", taskID).
		Str("team_id", team.ID.String()).
		Str("team_name", team.Name).
		Msg("🚀 Task successfully assigned to team")
}
|
|
|
|
// deployTeamAgents deploys Docker containers for the assigned team agents.
// It builds a single DeploymentRequest covering the whole team and hands it
// to the agent deployer. Intended to run in its own goroutine; all failures
// are logged, never returned.
//
// NOTE(review): several TaskContext fields are placeholders (see TODOs) —
// the issue description, tech stack, requirements, priority, and URL are
// not yet threaded through from the originating issue.
func (m *Monitor) deployTeamAgents(ctx context.Context, taskID string, team *composer.Team, teamComposition *composer.TeamComposition, repo RepositoryConfig) {
	log.Info().
		Str("task_id", taskID).
		Str("team_id", team.ID.String()).
		Int("agents_to_deploy", len(teamComposition.AgentMatches)).
		Msg("🚀 Starting agent deployment for team")

	// Convert the string task ID to a uuid.UUID; abort on malformed input.
	taskUUID, err := uuid.Parse(taskID)
	if err != nil {
		log.Error().Err(err).Str("task_id", taskID).Msg("Invalid task ID format")
		return
	}

	// Create deployment request for the entire team.
	deploymentRequest := &orchestrator.DeploymentRequest{
		TaskID:          taskUUID,
		TeamID:          team.ID,
		TeamComposition: teamComposition,
		TaskContext: &orchestrator.TaskContext{
			IssueTitle:       team.Description, // Use team description which comes from issue title
			IssueDescription: team.Description, // TODO: Extract actual issue description
			Repository:       repo.FullName,
			TechStack:        []string{"go", "docker", "ai"}, // TODO: Extract from analysis
			Requirements:     []string{},                     // TODO: Extract from issue
			Priority:         "medium",                       // TODO: Extract from team data
			ExternalURL:      "",                             // TODO: Add issue URL
			Metadata: map[string]interface{}{
				"task_type": "development",
			},
		},
		DeploymentMode: "immediate",
	}

	// Deploy all agents for this team.
	deploymentResult, err := m.agentDeployer.DeployTeamAgents(deploymentRequest)
	if err != nil {
		log.Error().Err(err).
			Str("task_id", taskID).
			Str("team_id", team.ID.String()).
			Msg("Failed to deploy team agents")
		return
	}

	log.Info().
		Str("task_id", taskID).
		Str("team_id", team.ID.String()).
		Str("status", deploymentResult.Status).
		Int("agents_deployed", len(deploymentResult.DeployedServices)).
		Msg("🎉 Successfully deployed team agents")

	// TODO: Update database with deployment information
	// This could include service IDs, container names, deployment status, etc.
}
|
|
|
|
// extractRequirementsFromIssue extracts requirements from issue description
|
|
func (m *Monitor) extractRequirementsFromIssue(issue gitea.Issue) []string {
|
|
requirements := []string{}
|
|
|
|
// Split description into lines and look for bullet points or numbered lists
|
|
lines := strings.Split(issue.Body, "\n")
|
|
for _, line := range lines {
|
|
line = strings.TrimSpace(line)
|
|
// Look for bullet points (-, *, +) or numbers (1., 2., etc.)
|
|
if strings.HasPrefix(line, "-") || strings.HasPrefix(line, "*") || strings.HasPrefix(line, "+") {
|
|
req := strings.TrimSpace(line[1:])
|
|
if req != "" {
|
|
requirements = append(requirements, req)
|
|
}
|
|
} else if len(line) > 2 && line[1] == '.' && line[0] >= '0' && line[0] <= '9' {
|
|
req := strings.TrimSpace(line[2:])
|
|
if req != "" {
|
|
requirements = append(requirements, req)
|
|
}
|
|
}
|
|
}
|
|
|
|
return requirements
|
|
}
|
|
|
|
// mapPriorityToComposer converts internal priority to composer priority
|
|
func (m *Monitor) mapPriorityToComposer(priority string) composer.TaskPriority {
|
|
switch strings.ToLower(priority) {
|
|
case "critical":
|
|
return composer.PriorityCritical
|
|
case "high":
|
|
return composer.PriorityHigh
|
|
case "low":
|
|
return composer.PriorityLow
|
|
default:
|
|
return composer.PriorityMedium
|
|
}
|
|
}
|
|
|
|
// assignTaskToTeam updates the task record with the assigned team ID
|
|
func (m *Monitor) assignTaskToTeam(ctx context.Context, taskID, teamID string) error {
|
|
query := `
|
|
UPDATE tasks
|
|
SET assigned_team_id = $1, status = $2, updated_at = NOW()
|
|
WHERE id = $3
|
|
`
|
|
|
|
_, err := m.db.Exec(ctx, query, teamID, "claimed", taskID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to assign task to team: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// triggerCouncilFormation initiates council formation for a project kickoff
|
|
func (m *Monitor) triggerCouncilFormation(ctx context.Context, taskID string, issue gitea.Issue, repo RepositoryConfig) {
|
|
ctx, span := tracing.StartCouncilSpan(ctx, "trigger_council_formation", "")
|
|
defer span.End()
|
|
|
|
span.SetAttributes(
|
|
attribute.String("task.id", taskID),
|
|
attribute.Int64("issue.id", issue.ID),
|
|
attribute.Int64("issue.number", issue.Number),
|
|
attribute.String("repository.name", repo.FullName),
|
|
attribute.String("issue.title", issue.Title),
|
|
)
|
|
|
|
log.Info().
|
|
Str("task_id", taskID).
|
|
Int64("issue_id", issue.ID).
|
|
Str("repository", repo.FullName).
|
|
Str("issue_title", issue.Title).
|
|
Msg("🎭 Triggering council formation for project kickoff")
|
|
|
|
// Convert task ID to UUID
|
|
taskUUID, err := uuid.Parse(taskID)
|
|
if err != nil {
|
|
tracing.SetSpanError(span, err)
|
|
log.Error().
|
|
Err(err).
|
|
Str("task_id", taskID).
|
|
Msg("Failed to parse task ID as UUID")
|
|
return
|
|
}
|
|
|
|
// Extract project name from repository name (remove owner prefix)
|
|
projectName := strings.Split(repo.FullName, "/")[1]
|
|
span.SetAttributes(attribute.String("project.name", projectName))
|
|
|
|
// Create council formation request
|
|
councilRequest := &council.CouncilFormationRequest{
|
|
ProjectName: projectName,
|
|
Repository: repo.FullName,
|
|
ProjectBrief: issue.Body,
|
|
TaskID: taskUUID,
|
|
IssueID: issue.ID,
|
|
ExternalURL: issue.HTMLURL,
|
|
Metadata: map[string]interface{}{
|
|
"task_id": taskID,
|
|
"issue_id": issue.ID,
|
|
"issue_number": issue.Number,
|
|
"repository_id": repo.ID,
|
|
"created_by": issue.User.Login,
|
|
"labels": m.extractLabelNames(issue.Labels),
|
|
"milestone": m.extractMilestone(issue),
|
|
},
|
|
}
|
|
|
|
// Form the council
|
|
composition, err := m.council.FormCouncil(ctx, councilRequest)
|
|
if err != nil {
|
|
tracing.SetSpanError(span, err)
|
|
log.Error().Err(err).
|
|
Str("task_id", taskID).
|
|
Str("project_name", projectName).
|
|
Msg("Failed to form project kickoff council")
|
|
return
|
|
}
|
|
|
|
span.SetAttributes(
|
|
attribute.String("council.id", composition.CouncilID.String()),
|
|
attribute.Int("council.core_agents", len(composition.CoreAgents)),
|
|
attribute.Int("council.optional_agents", len(composition.OptionalAgents)),
|
|
)
|
|
|
|
log.Info().
|
|
Str("task_id", taskID).
|
|
Str("council_id", composition.CouncilID.String()).
|
|
Int("core_agents", len(composition.CoreAgents)).
|
|
Int("optional_agents", len(composition.OptionalAgents)).
|
|
Msg("✅ Council composition formed")
|
|
|
|
// Deploy council agents if agent deployer is available
|
|
if m.agentDeployer != nil {
|
|
go m.deployCouncilAgents(ctx, taskID, composition, councilRequest, repo)
|
|
}
|
|
|
|
// Update task status to indicate council formation
|
|
err = m.assignTaskToCouncil(ctx, taskID, composition.CouncilID.String())
|
|
if err != nil {
|
|
log.Error().
|
|
Err(err).
|
|
Str("task_id", taskID).
|
|
Str("council_id", composition.CouncilID.String()).
|
|
Msg("Failed to assign task to council")
|
|
}
|
|
|
|
log.Info().
|
|
Str("task_id", taskID).
|
|
Str("council_id", composition.CouncilID.String()).
|
|
Str("project_name", projectName).
|
|
Msg("🚀 Project kickoff council successfully formed and deploying")
|
|
}
|
|
|
|
// deployCouncilAgents deploys Docker containers for the council agents
|
|
func (m *Monitor) deployCouncilAgents(ctx context.Context, taskID string, composition *council.CouncilComposition, request *council.CouncilFormationRequest, repo RepositoryConfig) {
|
|
ctx, span := tracing.StartDeploymentSpan(ctx, "deploy_council_agents", composition.CouncilID.String())
|
|
defer span.End()
|
|
|
|
span.SetAttributes(
|
|
attribute.String("task.id", taskID),
|
|
attribute.String("council.id", composition.CouncilID.String()),
|
|
attribute.String("project.name", composition.ProjectName),
|
|
attribute.Int("council.core_agents", len(composition.CoreAgents)),
|
|
attribute.Int("council.optional_agents", len(composition.OptionalAgents)),
|
|
attribute.String("repository.name", repo.FullName),
|
|
)
|
|
|
|
log.Info().
|
|
Str("task_id", taskID).
|
|
Str("council_id", composition.CouncilID.String()).
|
|
Int("core_agents", len(composition.CoreAgents)).
|
|
Int("optional_agents", len(composition.OptionalAgents)).
|
|
Msg("🚀 Starting council agent deployment")
|
|
|
|
// Create council deployment request
|
|
deploymentRequest := &orchestrator.CouncilDeploymentRequest{
|
|
CouncilID: composition.CouncilID,
|
|
ProjectName: composition.ProjectName,
|
|
CouncilComposition: composition,
|
|
ProjectContext: &orchestrator.CouncilProjectContext{
|
|
ProjectName: composition.ProjectName,
|
|
Repository: request.Repository,
|
|
ProjectBrief: request.ProjectBrief,
|
|
Constraints: request.Constraints,
|
|
TechLimits: request.TechLimits,
|
|
ComplianceNotes: request.ComplianceNotes,
|
|
Targets: request.Targets,
|
|
ExternalURL: request.ExternalURL,
|
|
},
|
|
DeploymentMode: "immediate",
|
|
}
|
|
|
|
// Deploy the council agents
|
|
result, err := m.agentDeployer.DeployCouncilAgents(deploymentRequest)
|
|
if err != nil {
|
|
tracing.SetSpanError(span, err)
|
|
log.Error().
|
|
Err(err).
|
|
Str("council_id", composition.CouncilID.String()).
|
|
Msg("Failed to deploy council agents")
|
|
|
|
// Update council status to failed
|
|
m.council.UpdateCouncilStatus(ctx, composition.CouncilID, "failed")
|
|
return
|
|
}
|
|
|
|
span.SetAttributes(
|
|
attribute.String("deployment.status", result.Status),
|
|
attribute.Int("deployment.deployed_agents", len(result.DeployedAgents)),
|
|
attribute.Int("deployment.errors", len(result.Errors)),
|
|
)
|
|
|
|
log.Info().
|
|
Str("council_id", composition.CouncilID.String()).
|
|
Str("deployment_status", result.Status).
|
|
Int("deployed_agents", len(result.DeployedAgents)).
|
|
Int("errors", len(result.Errors)).
|
|
Msg("✅ Council agent deployment completed")
|
|
|
|
// Log deployment details for each agent
|
|
for _, agent := range result.DeployedAgents {
|
|
log.Info().
|
|
Str("council_id", composition.CouncilID.String()).
|
|
Str("service_id", agent.ServiceID).
|
|
Str("role", agent.RoleName).
|
|
Str("agent_id", agent.AgentID).
|
|
Msg("🤖 Council agent deployed")
|
|
}
|
|
|
|
if len(result.Errors) > 0 {
|
|
for _, errMsg := range result.Errors {
|
|
log.Warn().
|
|
Str("council_id", composition.CouncilID.String()).
|
|
Str("error", errMsg).
|
|
Msg("⚠️ Council agent deployment error")
|
|
}
|
|
}
|
|
}
|
|
|
|
// assignTaskToCouncil updates the task record with the assigned council ID
|
|
func (m *Monitor) assignTaskToCouncil(ctx context.Context, taskID, councilID string) error {
|
|
query := `
|
|
UPDATE tasks
|
|
SET assigned_team_id = $1, status = $2, updated_at = NOW()
|
|
WHERE id = $3
|
|
`
|
|
|
|
// Use council ID as team ID for consistency with existing schema
|
|
_, err := m.db.Exec(ctx, query, councilID, "council_forming", taskID)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to assign task to council: %w", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
// extractLabelNames extracts label names from gitea labels
|
|
func (m *Monitor) extractLabelNames(labels []gitea.Label) []string {
|
|
names := make([]string, len(labels))
|
|
for i, label := range labels {
|
|
names[i] = label.Name
|
|
}
|
|
return names
|
|
}
|
|
|
|
// extractMilestone extracts milestone information if present.
//
// Currently a deliberate stub: it always returns the empty string, because
// the shape of the milestone field differs across Gitea SDK versions and
// accessing it here broke the build. TODO(review): read the issue's
// milestone once the SDK version is pinned — confirm field name against the
// vendored gitea package.
func (m *Monitor) extractMilestone(issue gitea.Issue) string {
	// Note: Milestone field access depends on Gitea SDK version
	// For now, return empty string to avoid build issues
	return ""
}
|
|
|