diff --git a/internal/council/models.go b/internal/council/models.go index d554a2d..212c8ba 100644 --- a/internal/council/models.go +++ b/internal/council/models.go @@ -35,14 +35,18 @@ type CouncilComposition struct { // CouncilAgent represents a single agent in the council type CouncilAgent struct { - AgentID string `json:"agent_id"` - RoleName string `json:"role_name"` - AgentName string `json:"agent_name"` - Required bool `json:"required"` - Deployed bool `json:"deployed"` - ServiceID string `json:"service_id,omitempty"` - DeployedAt *time.Time `json:"deployed_at,omitempty"` - Status string `json:"status"` // pending, deploying, active, failed + AgentID string `json:"agent_id"` + RoleName string `json:"role_name"` + AgentName string `json:"agent_name"` + Required bool `json:"required"` + Deployed bool `json:"deployed"` + ServiceID string `json:"service_id,omitempty"` + DeployedAt *time.Time `json:"deployed_at,omitempty"` + Status string `json:"status"` // pending, assigned, deploying, active, failed + PersonaStatus *string `json:"persona_status,omitempty"` // pending, loading, loaded, failed + PersonaLoadedAt *time.Time `json:"persona_loaded_at,omitempty"` + EndpointURL *string `json:"endpoint_url,omitempty"` + PersonaAckPayload map[string]interface{} `json:"persona_ack_payload,omitempty"` } // CouncilDeploymentResult represents the result of council agent deployment @@ -81,15 +85,10 @@ type CouncilArtifacts struct { } // CoreCouncilRoles defines the required roles for any project kickoff council +// Reduced to minimal set for faster formation and easier debugging var CoreCouncilRoles = []string{ - "systems-analyst", - "senior-software-architect", "tpm", - "security-architect", - "devex-platform-engineer", - "qa-test-engineer", - "sre-observability-lead", - "technical-writer", + "senior-software-architect", } // OptionalCouncilRoles defines the optional roles that may be included based on project needs diff --git a/internal/monitor/monitor.go b/internal/monitor/monitor.go index 459aaa4..2026e55 100644 --- a/internal/monitor/monitor.go +++ b/internal/monitor/monitor.go @@ -13,6 +13,8 @@ import ( "github.com/chorus-services/whoosh/internal/council" "github.com/chorus-services/whoosh/internal/gitea" "github.com/chorus-services/whoosh/internal/orchestrator" + "github.com/chorus-services/whoosh/internal/p2p" + "github.com/chorus-services/whoosh/internal/tasks" "github.com/chorus-services/whoosh/internal/tracing" "github.com/google/uuid" "github.com/jackc/pgx/v5" @@ -28,18 +30,20 @@ type Monitor struct { composer *composer.Service council *council.CouncilComposer agentDeployer *orchestrator.AgentDeployer + broadcaster *p2p.Broadcaster stopCh chan struct{} syncInterval time.Duration } // NewMonitor creates a new repository monitor -func NewMonitor(db *pgxpool.Pool, giteaCfg config.GITEAConfig, composerService *composer.Service, councilComposer *council.CouncilComposer, agentDeployer *orchestrator.AgentDeployer) *Monitor { +func NewMonitor(db *pgxpool.Pool, giteaCfg config.GITEAConfig, composerService *composer.Service, councilComposer *council.CouncilComposer, agentDeployer *orchestrator.AgentDeployer, broadcaster *p2p.Broadcaster) *Monitor { return &Monitor{ db: db, gitea: gitea.NewClient(giteaCfg), composer: composerService, council: councilComposer, agentDeployer: agentDeployer, + broadcaster: broadcaster, stopCh: make(chan struct{}), syncInterval: 5 * time.Minute, // Default sync every 5 minutes } @@ -53,22 +57,22 @@ func (m *Monitor) GetGiteaClient() *gitea.Client { // Start begins the monitoring process func (m *Monitor) Start(ctx context.Context) error { log.Info().Msg("🔍 Starting repository monitoring service") - + // Test Gitea connection if err := m.gitea.TestConnection(ctx); err != nil { log.Error().Err(err).Msg("Failed to connect to Gitea") return fmt.Errorf("gitea connection failed: %w", err) } - + log.Info().Msg("✅ Gitea connection established") - + // Start monitoring loop ticker := time.NewTicker(m.syncInterval) defer ticker.Stop() - + // Initial sync m.syncAllRepositories(ctx) - + for { select { case <-ctx.Done(): @@ -92,25 +96,25 @@ func (m *Monitor) Stop() { func (m *Monitor) syncAllRepositories(ctx context.Context) { ctx, span := tracing.StartMonitorSpan(ctx, "sync_all_repositories", "all") defer span.End() - + log.Info().Msg("🔄 Starting repository sync cycle") - + repos, err := m.getMonitoredRepositories(ctx) if err != nil { tracing.SetSpanError(span, err) log.Error().Err(err).Msg("Failed to get monitored repositories") return } - + span.SetAttributes(attribute.Int("repositories.count", len(repos))) - + if len(repos) == 0 { log.Info().Msg("No repositories to monitor") return } - + log.Info().Int("count", len(repos)).Msg("Syncing repositories") - + for _, repo := range repos { select { case <-ctx.Done(): @@ -119,7 +123,7 @@ func (m *Monitor) syncAllRepositories(ctx context.Context) { m.syncRepository(ctx, repo) } } - + span.SetAttributes(attribute.String("sync.status", "completed")) log.Info().Msg("✅ Repository sync cycle completed") } @@ -128,7 +132,7 @@ func (m *Monitor) syncAllRepositories(ctx context.Context) { func (m *Monitor) syncRepository(ctx context.Context, repo RepositoryConfig) { ctx, span := tracing.StartMonitorSpan(ctx, "sync_repository", repo.FullName) defer span.End() - + span.SetAttributes( attribute.String("repository.id", repo.ID), attribute.String("repository.owner", repo.Owner), @@ -136,26 +140,26 @@ func (m *Monitor) syncRepository(ctx context.Context, repo RepositoryConfig) { attribute.String("repository.sync_status", repo.SyncStatus), attribute.Bool("repository.chorus_enabled", repo.EnableChorusIntegration), ) - + log.Info(). Str("repository", repo.FullName). Msg("Syncing repository") - + startTime := time.Now() - + // Update repository status to active if err := m.updateRepositoryStatus(ctx, repo.ID, "active", nil); err != nil { log.Error().Err(err). Str("repository", repo.FullName). Msg("Failed to update repository status to active") } - + // Get issues since last sync opts := gitea.IssueListOptions{ State: "open", Limit: 100, } - + // Only use Since parameter for repositories that have completed initial scan // For initial_scan or pending status, we want to scan ALL issues to find Design Briefs and UCXL content if repo.LastIssueSync != nil && repo.SyncStatus != "initial_scan" && repo.SyncStatus != "pending" { @@ -170,34 +174,34 @@ func (m *Monitor) syncRepository(ctx context.Context, repo RepositoryConfig) { Str("sync_status", repo.SyncStatus). Msg("Performing full scan (no Since parameter) - initial scan or looking for Design Briefs") } - + // Filter by CHORUS task labels if enabled if repo.EnableChorusIntegration && len(repo.ChorusTaskLabels) > 0 { opts.Labels = strings.Join(repo.ChorusTaskLabels, ",") } - + issues, err := m.gitea.GetIssues(ctx, repo.Owner, repo.Name, opts) if err != nil { m.logSyncError(ctx, repo.ID, "fetch_issues", fmt.Sprintf("Failed to fetch issues: %v", err)) log.Error().Err(err). Str("repository", repo.FullName). Msg("Failed to fetch issues") - + if err := m.updateRepositoryStatus(ctx, repo.ID, "error", err); err != nil { log.Error().Err(err).Msg("Failed to update repository status to error") } return } - + created := 0 updated := 0 - + for _, issue := range issues { // Skip issues without CHORUS labels if CHORUS integration is enabled if repo.EnableChorusIntegration && !m.hasChorusLabels(issue, repo.ChorusTaskLabels) { continue } - + taskID, isNew, err := m.createOrUpdateTask(ctx, repo, issue) if err != nil { log.Error().Err(err). @@ -206,7 +210,7 @@ func (m *Monitor) syncRepository(ctx context.Context, repo RepositoryConfig) { Msg("Failed to create/update task") continue } - + if isNew { created++ log.Info(). @@ -217,15 +221,15 @@ func (m *Monitor) syncRepository(ctx context.Context, repo RepositoryConfig) { } else { updated++ } - + // Check if this issue should trigger council formation if m.isProjectKickoffBrief(issue, repo) { m.triggerCouncilFormation(ctx, taskID, issue, repo) } } - + duration := time.Since(startTime) - + // Add span attributes for the sync results span.SetAttributes( attribute.Int("issues.processed", len(issues)), @@ -233,21 +237,21 @@ func (m *Monitor) syncRepository(ctx context.Context, repo RepositoryConfig) { attribute.Int("tasks.updated", updated), attribute.Int64("duration.ms", duration.Milliseconds()), ) - + // Check if repository should transition from initial scan to active status if repo.SyncStatus == "initial_scan" || repo.SyncStatus == "pending" { // Repository has completed initial scan // For now, transition to active if we processed any issues or found Design Briefs // Future: Add UCXL content detection logic here shouldActivate := (created > 0 || updated > 0) - + if shouldActivate { log.Info(). Str("repository", repo.FullName). Int("tasks_created", created). Int("tasks_updated", updated). Msg("Transitioning repository from initial scan to active status - content found") - + if err := m.updateRepositoryStatus(ctx, repo.ID, "active", nil); err != nil { tracing.SetSpanError(span, err) log.Error().Err(err). @@ -263,7 +267,7 @@ func (m *Monitor) syncRepository(ctx context.Context, repo RepositoryConfig) { span.SetAttributes(attribute.String("repository.transition", "initial_scan_no_content")) } } - + // Update repository sync timestamps and statistics if err := m.updateRepositorySyncInfo(ctx, repo.ID, time.Now(), created, updated); err != nil { tracing.SetSpanError(span, err) @@ -271,10 +275,10 @@ func (m *Monitor) syncRepository(ctx context.Context, repo RepositoryConfig) { Str("repository", repo.FullName). Msg("Failed to update repository sync info") } - + // Log successful sync m.logSyncSuccess(ctx, repo.ID, "full_sync", duration, len(issues), created, updated) - + log.Info(). Str("repository", repo.FullName). Int("issues_processed", len(issues)). @@ -290,34 +294,34 @@ func (m *Monitor) createOrUpdateTask(ctx context.Context, repo RepositoryConfig, var existingTaskID string query := `SELECT id FROM tasks WHERE external_id = $1 AND source_type = $2` err := m.db.QueryRow(ctx, query, strconv.FormatInt(issue.Number, 10), repo.SourceType).Scan(&existingTaskID) - + taskExists := err == nil - + // Handle errors other than "no rows found" if err != nil && err != pgx.ErrNoRows { return "", false, fmt.Errorf("failed to check existing task: %w", err) } - + // Prepare labels labels := make([]string, len(issue.Labels)) for i, label := range issue.Labels { labels[i] = label.Name } labelsJSON, _ := json.Marshal(labels) - + // Determine task status status := "open" if issue.State == "closed" { status = "completed" } - + // Determine priority from labels priority := m.extractPriorityFromLabels(issue.Labels) - + // Extract tech stack from labels and description techStack := m.extractTechStackFromIssue(issue) techStackJSON, _ := json.Marshal(techStack) - + if taskExists { // Task exists - update it updateQuery := ` @@ -332,7 +336,7 @@ func (m *Monitor) createOrUpdateTask(ctx context.Context, repo RepositoryConfig, updated_at = NOW() WHERE id = $8 ` - + _, err = m.db.Exec(ctx, updateQuery, issue.Title, issue.Body, @@ -343,11 +347,11 @@ func (m *Monitor) createOrUpdateTask(ctx context.Context, repo RepositoryConfig, issue.UpdatedAt, existingTaskID, ) - + if err != nil { return "", false, fmt.Errorf("failed to update task: %w", err) } - + return existingTaskID, false, nil } else { // Create new task @@ -362,15 +366,15 @@ func (m *Monitor) createOrUpdateTask(ctx context.Context, repo RepositoryConfig, $1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14 ) RETURNING id ` - + // Prepare source config sourceConfig := map[string]interface{}{ "repository_id": repo.ID, "issue_number": issue.Number, - "issue_id": issue.ID, + "issue_id": issue.ID, } sourceConfigJSON, _ := json.Marshal(sourceConfig) - + err = m.db.QueryRow(ctx, insertQuery, strconv.FormatInt(issue.Number, 10), // external_id issue.HTMLURL, // external_url @@ -381,28 +385,28 @@ func (m *Monitor) createOrUpdateTask(ctx context.Context, repo RepositoryConfig, status, // status priority, // priority repo.FullName, // repository - repo.ID, // repository_id - labelsJSON, // labels - techStackJSON, // tech_stack - issue.CreatedAt, // external_created_at - issue.UpdatedAt, // external_updated_at + repo.ID, // repository_id + labelsJSON, // labels + techStackJSON, // tech_stack + issue.CreatedAt, // external_created_at + issue.UpdatedAt, // external_updated_at ).Scan(&taskID) - + if err != nil { return "", false, fmt.Errorf("failed to create task: %w", err) } - + // For newly created bzzz-task issues, check if it's a council formation trigger if m.composer != nil && m.shouldTriggerTeamComposition(issue.Labels) { if m.isProjectKickoffBrief(issue, repo) { // This is a project kickoff - trigger council formation go m.triggerCouncilFormation(context.Background(), taskID, issue, repo) } else { - // Regular bzzz-task - trigger normal team composition + // Regular bzzz-task - trigger normal team composition go m.triggerTeamComposition(context.Background(), taskID, issue, repo) } } - + return taskID, true, nil } } @@ -412,18 +416,18 @@ func (m *Monitor) hasChorusLabels(issue gitea.Issue, requiredLabels []string) bo if len(requiredLabels) == 0 { return true // If no specific labels required, all issues qualify } - + issueLabels := make(map[string]bool) for _, label := range issue.Labels { issueLabels[strings.ToLower(label.Name)] = true } - + for _, required := range requiredLabels { if issueLabels[strings.ToLower(required)] { return true } } - + return false } @@ -437,20 +441,20 @@ func (m *Monitor) extractPriorityFromLabels(labels []gitea.Label) string { "low": "low", "minor": "low", } - + for _, label := range labels { if priority, exists := priorities[strings.ToLower(label.Name)]; exists { return priority } } - + return "medium" // Default priority } // extractTechStackFromIssue extracts technology stack from issue labels and content func (m *Monitor) extractTechStackFromIssue(issue gitea.Issue) []string { techStack := make(map[string]bool) - + // Extract from labels techLabels := []string{ "go", "golang", "javascript", "typescript", "python", "rust", "java", @@ -458,7 +462,7 @@ func (m *Monitor) extractTechStackFromIssue(issue gitea.Issue) []string { "postgresql", "mysql", "mongodb", "redis", "docker", "kubernetes", "api", "frontend", "backend", "database", "devops", "ci/cd", } - + for _, label := range issue.Labels { labelName := strings.ToLower(label.Name) for _, tech := range techLabels { @@ -467,30 +471,30 @@ func (m *Monitor) extractTechStackFromIssue(issue gitea.Issue) []string { } } } - + // Convert map to slice result := make([]string, 0, len(techStack)) for tech := range techStack { result = append(result, tech) } - + return result } // RepositoryConfig represents a monitored repository configuration type RepositoryConfig struct { - ID string `db:"id"` - Name string `db:"name"` - Owner string `db:"owner"` - FullName string `db:"full_name"` - URL string `db:"url"` - SourceType string `db:"source_type"` - MonitorIssues bool `db:"monitor_issues"` - EnableChorusIntegration bool `db:"enable_chorus_integration"` - ChorusTaskLabels []string `db:"chorus_task_labels"` - LastSync *time.Time `db:"last_sync_at"` - LastIssueSync *time.Time `db:"last_issue_sync"` - SyncStatus string `db:"sync_status"` + ID string `db:"id"` + Name string `db:"name"` + Owner string `db:"owner"` + FullName string `db:"full_name"` + URL string `db:"url"` + SourceType string `db:"source_type"` + MonitorIssues bool `db:"monitor_issues"` + EnableChorusIntegration bool `db:"enable_chorus_integration"` + ChorusTaskLabels []string `db:"chorus_task_labels"` + LastSync *time.Time `db:"last_sync_at"` + LastIssueSync *time.Time `db:"last_issue_sync"` + SyncStatus string `db:"sync_status"` } // getMonitoredRepositories retrieves all repositories that should be monitored @@ -503,18 +507,18 @@ func (m *Monitor) getMonitoredRepositories(ctx context.Context) ([]RepositoryCon WHERE monitor_issues = true AND sync_status != 'disabled' ORDER BY last_sync_at ASC NULLS FIRST ` - + rows, err := m.db.Query(ctx, query) if err != nil { return nil, fmt.Errorf("failed to query repositories: %w", err) } defer rows.Close() - + var repos []RepositoryConfig for rows.Next() { var repo RepositoryConfig var chorusLabelsJSON []byte - + err := rows.Scan( &repo.ID, &repo.Name, &repo.Owner, &repo.FullName, &repo.URL, &repo.SourceType, &repo.MonitorIssues, &repo.EnableChorusIntegration, @@ -524,16 +528,16 @@ func (m *Monitor) getMonitoredRepositories(ctx context.Context) ([]RepositoryCon log.Error().Err(err).Msg("Failed to scan repository row") continue } - + // Parse CHORUS task labels if err := json.Unmarshal(chorusLabelsJSON, &repo.ChorusTaskLabels); err != nil { log.Error().Err(err).Str("repository", repo.FullName).Msg("Failed to parse CHORUS task labels") repo.ChorusTaskLabels = []string{"bzzz-task", "chorus-task", "chorus-entrypoint"} // Default labels } - + repos = append(repos, repo) } - + return repos, nil } @@ -544,18 +548,18 @@ func (m *Monitor) updateRepositoryStatus(ctx context.Context, repoID, status str errStr := err.Error() errorMsg = &errStr } - + query := ` UPDATE repositories SET sync_status = $1, sync_error = $2, updated_at = NOW() WHERE id = $3 ` - + _, dbErr := m.db.Exec(ctx, query, status, errorMsg, repoID) if dbErr != nil { return fmt.Errorf("failed to update repository status: %w", dbErr) } - + return nil } @@ -568,12 +572,12 @@ func (m *Monitor) updateRepositorySyncInfo(ctx context.Context, repoID string, s updated_at = NOW() WHERE id = $3 ` - + _, err := m.db.Exec(ctx, query, syncTime, created, repoID) if err != nil { return fmt.Errorf("failed to update repository sync info: %w", err) } - + return nil } @@ -585,9 +589,9 @@ func (m *Monitor) logSyncSuccess(ctx context.Context, repoID, operation string, items_processed, items_created, items_updated, duration_ms ) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9) ` - + message := fmt.Sprintf("Processed %d items, created %d tasks, updated %d tasks in %v", processed, created, updated, duration) - + _, err := m.db.Exec(ctx, query, repoID, "full_sync", operation, "success", message, processed, created, updated, duration.Milliseconds(), @@ -604,13 +608,13 @@ func (m *Monitor) logSyncError(ctx context.Context, repoID, operation, errorMsg repository_id, sync_type, operation, status, message, error_details ) VALUES ($1, $2, $3, $4, $5, $6) ` - + errorDetails := map[string]interface{}{ "error": errorMsg, "timestamp": time.Now().Format(time.RFC3339), } errorDetailsJSON, _ := json.Marshal(errorDetails) - + _, err := m.db.Exec(ctx, query, repoID, "full_sync", operation, "error", errorMsg, errorDetailsJSON, ) @@ -622,16 +626,16 @@ func (m *Monitor) logSyncError(ctx context.Context, repoID, operation, errorMsg // SyncRepository manually syncs a specific repository by ID func (m *Monitor) SyncRepository(ctx context.Context, repoID string) error { log.Info().Str("repository_id", repoID).Msg("Manual repository sync requested") - + // Get repository configuration repo, err := m.getRepositoryByID(ctx, repoID) if err != nil { return fmt.Errorf("failed to get repository: %w", err) } - + // Sync the repository m.syncRepository(ctx, *repo) - + return nil } @@ -644,10 +648,10 @@ func (m *Monitor) getRepositoryByID(ctx context.Context, repoID string) (*Reposi FROM repositories WHERE id = $1 ` - + var repo RepositoryConfig var chorusLabelsJSON []byte - + err := m.db.QueryRow(ctx, query, repoID).Scan( &repo.ID, &repo.Name, &repo.Owner, &repo.FullName, &repo.URL, &repo.SourceType, &repo.MonitorIssues, &repo.EnableChorusIntegration, @@ -659,13 +663,13 @@ func (m *Monitor) getRepositoryByID(ctx context.Context, repoID string) (*Reposi } return nil, fmt.Errorf("failed to query repository: %w", err) } - + // Parse CHORUS task labels if err := json.Unmarshal(chorusLabelsJSON, &repo.ChorusTaskLabels); err != nil { log.Error().Err(err).Str("repository", repo.FullName).Msg("Failed to parse CHORUS task labels") repo.ChorusTaskLabels = []string{"bzzz-task", "chorus-task", "chorus-entrypoint"} // Default labels } - + return &repo, nil } @@ -692,21 +696,21 @@ func (m *Monitor) isProjectKickoffBrief(issue gitea.Issue, repo RepositoryConfig if !hasChorusEntrypoint { return false } - + // Check if the issue title contains "Design Brief" title := strings.ToLower(issue.Title) if !strings.Contains(title, "design brief") { return false } - + // Additional validation: this should be a new/empty repository // For now, we'll rely on the title check, but could add repo analysis later - + log.Info(). Str("repository", repo.FullName). Str("issue_title", issue.Title). Msg("🎭 Detected project kickoff brief - council formation required") - + return true } @@ -717,11 +721,11 @@ func (m *Monitor) triggerTeamComposition(ctx context.Context, taskID string, iss Int64("issue_id", issue.ID). Str("repository", repo.FullName). Msg("🎯 Triggering team composition for bzzz-task") - + // Convert Gitea issue to TaskAnalysisInput techStack := m.extractTechStackFromIssue(issue) requirements := m.extractRequirementsFromIssue(issue) - + analysisInput := &composer.TaskAnalysisInput{ Title: issue.Title, Description: issue.Body, @@ -730,14 +734,14 @@ func (m *Monitor) triggerTeamComposition(ctx context.Context, taskID string, iss Priority: m.mapPriorityToComposer(m.extractPriorityFromLabels(issue.Labels)), TechStack: techStack, Metadata: map[string]interface{}{ - "task_id": taskID, - "issue_id": issue.ID, - "issue_number": issue.Number, + "task_id": taskID, + "issue_id": issue.ID, + "issue_number": issue.Number, "repository_id": repo.ID, - "external_url": issue.HTMLURL, + "external_url": issue.HTMLURL, }, } - + // Perform team composition analysis result, err := m.composer.AnalyzeAndComposeTeam(ctx, analysisInput) if err != nil { @@ -746,14 +750,14 @@ func (m *Monitor) triggerTeamComposition(ctx context.Context, taskID string, iss Msg("Failed to perform team composition analysis") return } - + log.Info(). Str("task_id", taskID). Str("team_id", result.TeamComposition.TeamID.String()). Int("team_size", result.TeamComposition.EstimatedSize). Float64("confidence", result.TeamComposition.ConfidenceScore). Msg("✅ Team composition analysis completed") - + // Create the team in the database team, err := m.composer.CreateTeam(ctx, result.TeamComposition, analysisInput) if err != nil { @@ -762,7 +766,7 @@ func (m *Monitor) triggerTeamComposition(ctx context.Context, taskID string, iss Msg("Failed to create team") return } - + // Update task with team assignment err = m.assignTaskToTeam(ctx, taskID, team.ID.String()) if err != nil { @@ -772,12 +776,12 @@ func (m *Monitor) triggerTeamComposition(ctx context.Context, taskID string, iss Msg("Failed to assign task to team") return } - + // Deploy agents for the newly formed team if agent deployer is available if m.agentDeployer != nil { go m.deployTeamAgents(ctx, taskID, team, result.TeamComposition, repo) } - + log.Info(). Str("task_id", taskID). Str("team_id", team.ID.String()). @@ -785,6 +789,80 @@ func (m *Monitor) triggerTeamComposition(ctx context.Context, taskID string, iss Msg("🚀 Task successfully assigned to team") } +// TriggerTeamCompositionForCouncil runs team composition for the task associated with a council once the council is active. +func (m *Monitor) TriggerTeamCompositionForCouncil(ctx context.Context, taskID string) { + logger := log.With().Str("task_id", taskID).Logger() + logger.Info().Msg("🔁 Triggering team composition for council task") + + if m.composer == nil { + logger.Warn().Msg("Composer service unavailable; cannot trigger team composition") + return + } + + taskUUID, err := uuid.Parse(taskID) + if err != nil { + logger.Error().Err(err).Msg("Invalid task ID format; skipping team composition") + return + } + + // Load task details so we can build the analysis input + taskService := tasks.NewService(m.db) + task, err := taskService.GetTask(ctx, taskUUID) + if err != nil { + logger.Error().Err(err).Msg("Failed to load task for council team composition") + return + } + + analysisInput := &composer.TaskAnalysisInput{ + Title: task.Title, + Description: task.Description, + Repository: task.Repository, + Requirements: task.Requirements, + Priority: m.mapPriorityToComposer(string(task.Priority)), + TechStack: task.TechStack, + Metadata: map[string]interface{}{ + "task_id": task.ID.String(), + "source_type": string(task.SourceType), + "source_config": task.SourceConfig, + "labels": task.Labels, + }, + } + + // Perform team composition analysis + result, err := m.composer.AnalyzeAndComposeTeam(ctx, analysisInput) + if err != nil { + logger.Error().Err(err).Msg("Team composition analysis failed for council task") + return + } + + logger.Info(). + Str("team_id", result.TeamComposition.TeamID.String()). + Int("estimated_size", result.TeamComposition.EstimatedSize). + Float64("confidence", result.TeamComposition.ConfidenceScore). + Msg("✅ Council task team composition analysis completed") + + team, err := m.composer.CreateTeam(ctx, result.TeamComposition, analysisInput) + if err != nil { + logger.Error().Err(err).Msg("Failed to create team for council task") + return + } + + if err := m.assignTaskToTeam(ctx, taskID, team.ID.String()); err != nil { + logger.Error().Err(err).Str("team_id", team.ID.String()).Msg("Failed to assign council task to team") + } + + // Optionally deploy agents for the team if our orchestrator is available + if m.agentDeployer != nil { + repo := RepositoryConfig{FullName: task.Repository} + go m.deployTeamAgents(ctx, taskID, team, result.TeamComposition, repo) + } + + logger.Info(). + Str("team_id", team.ID.String()). + Str("team_name", team.Name). + Msg("🚀 Council task assigned to team") +} + // deployTeamAgents deploys Docker containers for the assigned team agents func (m *Monitor) deployTeamAgents(ctx context.Context, taskID string, team *composer.Team, teamComposition *composer.TeamComposition, repo RepositoryConfig) { log.Info(). @@ -792,14 +870,14 @@ func (m *Monitor) deployTeamAgents(ctx context.Context, taskID string, team *com Str("team_id", team.ID.String()). Int("agents_to_deploy", len(teamComposition.AgentMatches)). Msg("🚀 Starting agent deployment for team") - + // Convert string UUIDs to uuid.UUID type taskUUID, err := uuid.Parse(taskID) if err != nil { log.Error().Err(err).Str("task_id", taskID).Msg("Invalid task ID format") return } - + // Create deployment request for the entire team deploymentRequest := &orchestrator.DeploymentRequest{ TaskID: taskUUID, @@ -810,16 +888,25 @@ func (m *Monitor) deployTeamAgents(ctx context.Context, taskID string, team *com IssueDescription: team.Description, // TODO: Extract actual issue description Repository: repo.FullName, TechStack: []string{"go", "docker", "ai"}, // TODO: Extract from analysis - Requirements: []string{}, // TODO: Extract from issue - Priority: "medium", // TODO: Extract from team data - ExternalURL: "", // TODO: Add issue URL + Requirements: []string{}, // TODO: Extract from issue + Priority: "medium", // TODO: Extract from team data + ExternalURL: "", // TODO: Add issue URL Metadata: map[string]interface{}{ "task_type": "development", }, }, DeploymentMode: "immediate", } - + + // Check if agent deployment is available (Docker enabled) + if m.agentDeployer == nil { + log.Info(). + Str("task_id", taskID). + Str("team_id", team.ID.String()). + Msg("Docker disabled - team assignment completed without agent deployment") + return + } + // Deploy all agents for this team deploymentResult, err := m.agentDeployer.DeployTeamAgents(deploymentRequest) if err != nil { @@ -829,14 +916,14 @@ func (m *Monitor) deployTeamAgents(ctx context.Context, taskID string, team *com Msg("Failed to deploy team agents") return } - + log.Info(). Str("task_id", taskID). Str("team_id", team.ID.String()). Str("status", deploymentResult.Status). Int("agents_deployed", len(deploymentResult.DeployedServices)). Msg("🎉 Successfully deployed team agents") - + // TODO: Update database with deployment information // This could include service IDs, container names, deployment status, etc. } @@ -844,7 +931,7 @@ func (m *Monitor) deployTeamAgents(ctx context.Context, taskID string, team *com // extractRequirementsFromIssue extracts requirements from issue description func (m *Monitor) extractRequirementsFromIssue(issue gitea.Issue) []string { requirements := []string{} - + // Split description into lines and look for bullet points or numbered lists lines := strings.Split(issue.Body, "\n") for _, line := range lines { @@ -862,7 +949,7 @@ func (m *Monitor) extractRequirementsFromIssue(issue gitea.Issue) []string { } } } - + return requirements } @@ -887,12 +974,12 @@ func (m *Monitor) assignTaskToTeam(ctx context.Context, taskID, teamID string) e SET assigned_team_id = $1, status = $2, updated_at = NOW() WHERE id = $3 ` - + _, err := m.db.Exec(ctx, query, teamID, "claimed", taskID) if err != nil { return fmt.Errorf("failed to assign task to team: %w", err) } - + return nil } @@ -900,7 +987,7 @@ func (m *Monitor) assignTaskToTeam(ctx context.Context, taskID, teamID string) e func (m *Monitor) triggerCouncilFormation(ctx context.Context, taskID string, issue gitea.Issue, repo RepositoryConfig) { ctx, span := tracing.StartCouncilSpan(ctx, "trigger_council_formation", "") defer span.End() - + span.SetAttributes( attribute.String("task.id", taskID), attribute.Int64("issue.id", issue.ID), @@ -908,14 +995,14 @@ func (m *Monitor) triggerCouncilFormation(ctx context.Context, taskID string, is attribute.String("repository.name", repo.FullName), attribute.String("issue.title", issue.Title), ) - + log.Info(). Str("task_id", taskID). Int64("issue_id", issue.ID). Str("repository", repo.FullName). Str("issue_title", issue.Title). Msg("🎭 Triggering council formation for project kickoff") - + // Convert task ID to UUID taskUUID, err := uuid.Parse(taskID) if err != nil { @@ -926,30 +1013,30 @@ func (m *Monitor) triggerCouncilFormation(ctx context.Context, taskID string, is Msg("Failed to parse task ID as UUID") return } - + // Extract project name from repository name (remove owner prefix) projectName := strings.Split(repo.FullName, "/")[1] span.SetAttributes(attribute.String("project.name", projectName)) - + // Create council formation request councilRequest := &council.CouncilFormationRequest{ - ProjectName: projectName, - Repository: repo.FullName, - ProjectBrief: issue.Body, - TaskID: taskUUID, - IssueID: issue.ID, - ExternalURL: issue.HTMLURL, + ProjectName: projectName, + Repository: repo.FullName, + ProjectBrief: issue.Body, + TaskID: taskUUID, + IssueID: issue.ID, + ExternalURL: issue.HTMLURL, Metadata: map[string]interface{}{ - "task_id": taskID, - "issue_id": issue.ID, - "issue_number": issue.Number, - "repository_id": repo.ID, - "created_by": issue.User.Login, - "labels": m.extractLabelNames(issue.Labels), - "milestone": m.extractMilestone(issue), + "task_id": taskID, + "issue_id": issue.ID, + "issue_number": issue.Number, + "repository_id": repo.ID, + "created_by": issue.User.Login, + "labels": m.extractLabelNames(issue.Labels), + "milestone": m.extractMilestone(issue), }, } - + // Form the council composition, err := m.council.FormCouncil(ctx, councilRequest) if err != nil { @@ -960,25 +1047,80 @@ func (m *Monitor) triggerCouncilFormation(ctx context.Context, taskID string, is Msg("Failed to form project kickoff council") return } - + span.SetAttributes( attribute.String("council.id", composition.CouncilID.String()), attribute.Int("council.core_agents", len(composition.CoreAgents)), attribute.Int("council.optional_agents", len(composition.OptionalAgents)), ) - + log.Info(). Str("task_id", taskID). Str("council_id", composition.CouncilID.String()). Int("core_agents", len(composition.CoreAgents)). Int("optional_agents", len(composition.OptionalAgents)). Msg("✅ Council composition formed") - + + // Broadcast council opportunity to CHORUS agents + if m.broadcaster != nil { + go func() { + // Build council opportunity for broadcast + coreRoles := make([]p2p.CouncilRole, len(composition.CoreAgents)) + for i, agent := range composition.CoreAgents { + coreRoles[i] = p2p.CouncilRole{ + RoleName: agent.RoleName, + AgentName: agent.AgentName, + Required: agent.Required, + Description: fmt.Sprintf("Core role: %s", agent.AgentName), + } + } + + optionalRoles := make([]p2p.CouncilRole, len(composition.OptionalAgents)) + for i, agent := range composition.OptionalAgents { + optionalRoles[i] = p2p.CouncilRole{ + RoleName: agent.RoleName, + AgentName: agent.AgentName, + Required: agent.Required, + Description: fmt.Sprintf("Optional role: %s", agent.AgentName), + } + } + + opportunity := &p2p.CouncilOpportunity{ + CouncilID: composition.CouncilID, + ProjectName: projectName, + Repository: repo.FullName, + ProjectBrief: issue.Body, + CoreRoles: coreRoles, + OptionalRoles: optionalRoles, + UCXLAddress: fmt.Sprintf("ucxl://team:council@project:%s:council/councils/%s", strings.ReplaceAll(projectName, " ", "-"), composition.CouncilID.String()), + FormationDeadline: time.Now().Add(24 * time.Hour), + CreatedAt: composition.CreatedAt, + Metadata: councilRequest.Metadata, + } + + broadcastCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second) + defer cancel() + + if err := m.broadcaster.BroadcastCouncilOpportunity(broadcastCtx, opportunity); err != nil { + log.Error(). + Err(err). + Str("council_id", composition.CouncilID.String()). + Msg("Failed to broadcast council opportunity to CHORUS agents") + } else { + log.Info(). + Str("council_id", composition.CouncilID.String()). + Int("core_roles", len(coreRoles)). + Int("optional_roles", len(optionalRoles)). + Msg("📡 Successfully broadcast council opportunity to CHORUS agents") + } + }() + } + // Deploy council agents if agent deployer is available if m.agentDeployer != nil { go m.deployCouncilAgents(ctx, taskID, composition, councilRequest, repo) } - + // Update task status to indicate council formation err = m.assignTaskToCouncil(ctx, taskID, composition.CouncilID.String()) if err != nil { @@ -988,7 +1130,7 @@ func (m *Monitor) triggerCouncilFormation(ctx context.Context, taskID string, is Str("council_id", composition.CouncilID.String()). Msg("Failed to assign task to council") } - + log.Info(). Str("task_id", taskID). Str("council_id", composition.CouncilID.String()). @@ -1000,7 +1142,7 @@ func (m *Monitor) triggerCouncilFormation(ctx context.Context, taskID string, is func (m *Monitor) deployCouncilAgents(ctx context.Context, taskID string, composition *council.CouncilComposition, request *council.CouncilFormationRequest, repo RepositoryConfig) { ctx, span := tracing.StartDeploymentSpan(ctx, "deploy_council_agents", composition.CouncilID.String()) defer span.End() - + span.SetAttributes( attribute.String("task.id", taskID), attribute.String("council.id", composition.CouncilID.String()), @@ -1009,59 +1151,77 @@ func (m *Monitor) deployCouncilAgents(ctx context.Context, taskID string, compos attribute.Int("council.optional_agents", len(composition.OptionalAgents)), attribute.String("repository.name", repo.FullName), ) - + log.Info(). Str("task_id", taskID). Str("council_id", composition.CouncilID.String()). Int("core_agents", len(composition.CoreAgents)). Int("optional_agents", len(composition.OptionalAgents)). Msg("🚀 Starting council agent deployment") - + // Create council deployment request deploymentRequest := &orchestrator.CouncilDeploymentRequest{ - CouncilID: composition.CouncilID, - ProjectName: composition.ProjectName, + CouncilID: composition.CouncilID, + ProjectName: composition.ProjectName, CouncilComposition: composition, ProjectContext: &orchestrator.CouncilProjectContext{ - ProjectName: composition.ProjectName, - Repository: request.Repository, - ProjectBrief: request.ProjectBrief, - Constraints: request.Constraints, - TechLimits: request.TechLimits, + ProjectName: composition.ProjectName, + Repository: request.Repository, + ProjectBrief: request.ProjectBrief, + Constraints: request.Constraints, + TechLimits: request.TechLimits, ComplianceNotes: request.ComplianceNotes, - Targets: request.Targets, - ExternalURL: request.ExternalURL, + Targets: request.Targets, + ExternalURL: request.ExternalURL, }, DeploymentMode: "immediate", } - - // Deploy the council agents - result, err := m.agentDeployer.DeployCouncilAgents(deploymentRequest) + + // Check if agent deployment is available (Docker enabled) + if m.agentDeployer == nil { + log.Info(). + Str("council_id", composition.CouncilID.String()). + Msg("Docker disabled - council formation completed without agent deployment") + + // Update council status to active since formation is complete + m.council.UpdateCouncilStatus(ctx, composition.CouncilID, "active") + + span.SetAttributes( + attribute.String("deployment.status", "skipped"), + attribute.String("deployment.reason", "docker_disabled"), + attribute.Int("deployment.deployed_agents", 0), + attribute.Int("deployment.errors", 0), + ) + return + } + + // Assign the council agents to available CHORUS agents instead of deploying new services + result, err := m.agentDeployer.AssignCouncilAgents(deploymentRequest) if err != nil { tracing.SetSpanError(span, err) log.Error(). Err(err). Str("council_id", composition.CouncilID.String()). Msg("Failed to deploy council agents") - + // Update council status to failed m.council.UpdateCouncilStatus(ctx, composition.CouncilID, "failed") return } - + span.SetAttributes( attribute.String("deployment.status", result.Status), attribute.Int("deployment.deployed_agents", len(result.DeployedAgents)), attribute.Int("deployment.errors", len(result.Errors)), ) - + log.Info(). Str("council_id", composition.CouncilID.String()). Str("deployment_status", result.Status). Int("deployed_agents", len(result.DeployedAgents)). Int("errors", len(result.Errors)). Msg("✅ Council agent deployment completed") - + // Log deployment details for each agent for _, agent := range result.DeployedAgents { log.Info(). @@ -1071,7 +1231,7 @@ func (m *Monitor) deployCouncilAgents(ctx context.Context, taskID string, compos Str("agent_id", agent.AgentID). Msg("🤖 Council agent deployed") } - + if len(result.Errors) > 0 { for _, errMsg := range result.Errors { log.Warn(). @@ -1089,13 +1249,13 @@ func (m *Monitor) assignTaskToCouncil(ctx context.Context, taskID, councilID str SET assigned_team_id = $1, status = $2, updated_at = NOW() WHERE id = $3 ` - + // Use council ID as team ID for consistency with existing schema _, err := m.db.Exec(ctx, query, councilID, "council_forming", taskID) if err != nil { return fmt.Errorf("failed to assign task to council: %w", err) } - + return nil } @@ -1114,4 +1274,3 @@ func (m *Monitor) extractMilestone(issue gitea.Issue) string { // For now, return empty string to avoid build issues return "" } - diff --git a/internal/p2p/broadcaster.go b/internal/p2p/broadcaster.go new file mode 100644 index 0000000..9b7a514 --- /dev/null +++ b/internal/p2p/broadcaster.go @@ -0,0 +1,325 @@ +package p2p + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + "time" + + "github.com/google/uuid" + "github.com/rs/zerolog/log" +) + +// Broadcaster handles P2P broadcasting of opportunities and events to CHORUS agents +type Broadcaster struct { + discovery *Discovery + ctx context.Context + cancel context.CancelFunc +} + +// NewBroadcaster creates a new P2P broadcaster +func NewBroadcaster(discovery *Discovery) *Broadcaster { + ctx, cancel := context.WithCancel(context.Background()) + + return &Broadcaster{ + discovery: discovery, + ctx: ctx, + cancel: cancel, + } +} + +// Close shuts down the broadcaster +func (b *Broadcaster) Close() error { + b.cancel() + return nil +} + +// CouncilOpportunity represents a council formation opportunity for agents to claim +type CouncilOpportunity struct { + CouncilID uuid.UUID `json:"council_id"` + ProjectName string `json:"project_name"` + Repository string `json:"repository"` + ProjectBrief string `json:"project_brief"` + CoreRoles []CouncilRole `json:"core_roles"` + OptionalRoles []CouncilRole `json:"optional_roles"` + UCXLAddress string `json:"ucxl_address"` + FormationDeadline time.Time `json:"formation_deadline"` + CreatedAt time.Time `json:"created_at"` + Metadata map[string]interface{} `json:"metadata"` +} + +// CouncilRole represents a role within a council that can be claimed +type CouncilRole struct { + RoleName string `json:"role_name"` + AgentName string `json:"agent_name"` + Required bool `json:"required"` + RequiredSkills []string `json:"required_skills"` + Description string `json:"description"` +} + +// RoleCounts provides claimed vs total counts for a role category +type RoleCounts struct { + Total int `json:"total"` + Claimed int `json:"claimed"` +} + +// PersonaCounts captures persona readiness across council roles. +type PersonaCounts struct { + Total int `json:"total"` + Loaded int `json:"loaded"` + CoreLoaded int `json:"core_loaded"` +} + +// CouncilStatusUpdate notifies agents about council staffing progress +type CouncilStatusUpdate struct { + CouncilID uuid.UUID `json:"council_id"` + ProjectName string `json:"project_name"` + Status string `json:"status"` + Message string `json:"message,omitempty"` + Timestamp time.Time `json:"timestamp"` + CoreRoles RoleCounts `json:"core_roles"` + Optional RoleCounts `json:"optional_roles"` + Personas PersonaCounts `json:"personas,omitempty"` + BriefDispatched bool `json:"brief_dispatched"` +} + +// BroadcastCouncilOpportunity broadcasts a council formation opportunity to all available CHORUS agents +func (b *Broadcaster) BroadcastCouncilOpportunity(ctx context.Context, opportunity *CouncilOpportunity) error { + log.Info(). + Str("council_id", opportunity.CouncilID.String()). + Str("project_name", opportunity.ProjectName). + Int("core_roles", len(opportunity.CoreRoles)). + Int("optional_roles", len(opportunity.OptionalRoles)). + Msg("📡 Broadcasting council opportunity to CHORUS agents") + + // Get all discovered agents + agents := b.discovery.GetAgents() + + if len(agents) == 0 { + log.Warn().Msg("No CHORUS agents discovered to broadcast opportunity to") + return fmt.Errorf("no agents available to receive broadcast") + } + + successCount := 0 + errorCount := 0 + + // Broadcast to each agent + for _, agent := range agents { + err := b.sendOpportunityToAgent(ctx, agent, opportunity) + if err != nil { + log.Error(). + Err(err). + Str("agent_id", agent.ID). + Str("endpoint", agent.Endpoint). + Msg("Failed to send opportunity to agent") + errorCount++ + continue + } + successCount++ + } + + log.Info(). + Int("success_count", successCount). + Int("error_count", errorCount). + Int("total_agents", len(agents)). + Msg("✅ Council opportunity broadcast completed") + + if successCount == 0 { + return fmt.Errorf("failed to broadcast to any agents") + } + + return nil +} + +// sendOpportunityToAgent sends a council opportunity to a specific CHORUS agent +func (b *Broadcaster) sendOpportunityToAgent(ctx context.Context, agent *Agent, opportunity *CouncilOpportunity) error { + // Construct the agent's opportunity endpoint + // CHORUS agents should expose /api/v1/opportunities endpoint to receive opportunities + opportunityURL := fmt.Sprintf("%s/api/v1/opportunities/council", agent.Endpoint) + + // Marshal opportunity to JSON + payload, err := json.Marshal(opportunity) + if err != nil { + return fmt.Errorf("failed to marshal opportunity: %w", err) + } + + // Create HTTP request + req, err := http.NewRequestWithContext(ctx, "POST", opportunityURL, bytes.NewBuffer(payload)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-WHOOSH-Broadcast", "council-opportunity") + req.Header.Set("X-Council-ID", opportunity.CouncilID.String()) + + // Send request with timeout + client := &http.Client{ + Timeout: 10 * time.Second, + } + + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send opportunity to agent: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + return fmt.Errorf("agent returned non-success status: %d", resp.StatusCode) + } + + log.Debug(). + Str("agent_id", agent.ID). + Str("council_id", opportunity.CouncilID.String()). + Int("status_code", resp.StatusCode). + Msg("Successfully sent council opportunity to agent") + + return nil +} + +// BroadcastAgentAssignment notifies an agent that they've been assigned to a council role +func (b *Broadcaster) BroadcastAgentAssignment(ctx context.Context, agentID string, assignment *AgentAssignment) error { + // Find the agent + agents := b.discovery.GetAgents() + var targetAgent *Agent + + for _, agent := range agents { + if agent.ID == agentID { + targetAgent = agent + break + } + } + + if targetAgent == nil { + return fmt.Errorf("agent %s not found in discovery", agentID) + } + + // Send assignment to agent + assignmentURL := fmt.Sprintf("%s/api/v1/assignments/council", targetAgent.Endpoint) + + payload, err := json.Marshal(assignment) + if err != nil { + return fmt.Errorf("failed to marshal assignment: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", assignmentURL, bytes.NewBuffer(payload)) + if err != nil { + return fmt.Errorf("failed to create request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-WHOOSH-Broadcast", "council-assignment") + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send assignment to agent: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + return fmt.Errorf("agent returned non-success status: %d", resp.StatusCode) + } + + log.Info(). + Str("agent_id", agentID). + Str("council_id", assignment.CouncilID.String()). + Str("role", assignment.RoleName). + Msg("✅ Successfully notified agent of council assignment") + + return nil +} + +// BroadcastCouncilStatusUpdate notifies all discovered agents about council staffing status +func (b *Broadcaster) BroadcastCouncilStatusUpdate(ctx context.Context, update *CouncilStatusUpdate) error { + log.Info(). + Str("council_id", update.CouncilID.String()). + Str("status", update.Status). + Msg("📢 Broadcasting council status update to CHORUS agents") + + agents := b.discovery.GetAgents() + if len(agents) == 0 { + log.Warn().Str("council_id", update.CouncilID.String()).Msg("No CHORUS agents discovered for council status update") + return fmt.Errorf("no agents available to receive council status update") + } + + successCount := 0 + errorCount := 0 + + for _, agent := range agents { + if err := b.sendCouncilStatusToAgent(ctx, agent, update); err != nil { + log.Error(). + Err(err). + Str("agent_id", agent.ID). + Str("council_id", update.CouncilID.String()). + Msg("Failed to send council status update to agent") + errorCount++ + continue + } + successCount++ + } + + log.Info(). + Str("council_id", update.CouncilID.String()). + Int("success_count", successCount). + Int("error_count", errorCount). + Int("total_agents", len(agents)). + Msg("✅ Council status update broadcast completed") + + if successCount == 0 { + return fmt.Errorf("failed to broadcast council status update to any agents") + } + + return nil +} + +func (b *Broadcaster) sendCouncilStatusToAgent(ctx context.Context, agent *Agent, update *CouncilStatusUpdate) error { + statusURL := fmt.Sprintf("%s/api/v1/councils/status", agent.Endpoint) + + payload, err := json.Marshal(update) + if err != nil { + return fmt.Errorf("failed to marshal council status update: %w", err) + } + + req, err := http.NewRequestWithContext(ctx, "POST", statusURL, bytes.NewBuffer(payload)) + if err != nil { + return fmt.Errorf("failed to create council status request: %w", err) + } + + req.Header.Set("Content-Type", "application/json") + req.Header.Set("X-WHOOSH-Broadcast", "council-status") + req.Header.Set("X-Council-ID", update.CouncilID.String()) + + client := &http.Client{Timeout: 10 * time.Second} + resp, err := client.Do(req) + if err != nil { + return fmt.Errorf("failed to send council status to agent: %w", err) + } + defer resp.Body.Close() + + if resp.StatusCode != http.StatusOK && resp.StatusCode != http.StatusAccepted { + return fmt.Errorf("agent returned non-success status: %d", resp.StatusCode) + } + + log.Debug(). + Str("agent_id", agent.ID). + Str("council_id", update.CouncilID.String()). + Int("status_code", resp.StatusCode). + Msg("Successfully sent council status update to agent") + + return nil +} + +// AgentAssignment represents an assignment of an agent to a council role +type AgentAssignment struct { + CouncilID uuid.UUID `json:"council_id"` + ProjectName string `json:"project_name"` + RoleName string `json:"role_name"` + UCXLAddress string `json:"ucxl_address"` + ProjectBrief string `json:"project_brief"` + Repository string `json:"repository"` + AssignedAt time.Time `json:"assigned_at"` + Metadata map[string]interface{} `json:"metadata"` +} diff --git a/internal/server/server.go b/internal/server/server.go index 1632b52..fdbbf98 100644 --- a/internal/server/server.go +++ b/internal/server/server.go @@ -179,8 +179,8 @@ func NewServer(cfg *config.Config, db *database.DB) (*Server, error) { log.Warn().Msg("🐳 Docker integration disabled - scaling system and council agent deployment unavailable") } - // Initialize repository monitor with team composer, council composer, and agent deployer - repoMonitor := monitor.NewMonitor(db.Pool, cfg.GITEA, teamComposer, councilComposer, agentDeployer) + // Initialize repository monitor with team composer, council composer, agent deployer, and broadcaster + repoMonitor := monitor.NewMonitor(db.Pool, cfg.GITEA, teamComposer, councilComposer, agentDeployer, p2pBroadcaster) s := &Server{ config: cfg,