Compare commits
2 Commits
1a6ac007a4
...
4173c0c8c8
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
4173c0c8c8 | ||
|
|
982b63306a |
@@ -1,199 +1,350 @@
|
||||
package gitea
|
||||
|
||||
import (
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"net/http"
|
||||
"net/url"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/chorus-services/whoosh/internal/config"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// Client represents a Gitea API client
|
||||
type Client struct {
|
||||
baseURL string
|
||||
token string
|
||||
httpClient *http.Client
|
||||
client *http.Client
|
||||
}
|
||||
|
||||
// Issue represents a Gitea issue
|
||||
type Issue struct {
|
||||
ID int `json:"id"`
|
||||
Number int `json:"number"`
|
||||
ID int64 `json:"id"`
|
||||
Number int64 `json:"number"`
|
||||
Title string `json:"title"`
|
||||
Body string `json:"body"`
|
||||
State string `json:"state"`
|
||||
URL string `json:"html_url"`
|
||||
HTMLURL string `json:"html_url"`
|
||||
Labels []struct {
|
||||
Name string `json:"name"`
|
||||
Color string `json:"color"`
|
||||
} `json:"labels"`
|
||||
Repository struct {
|
||||
Name string `json:"name"`
|
||||
FullName string `json:"full_name"`
|
||||
} `json:"repository"`
|
||||
Labels []Label `json:"labels"`
|
||||
Assignees []User `json:"assignees"`
|
||||
CreatedAt time.Time `json:"created_at"`
|
||||
UpdatedAt time.Time `json:"updated_at"`
|
||||
ClosedAt *time.Time `json:"closed_at"`
|
||||
HTMLURL string `json:"html_url"`
|
||||
User User `json:"user"`
|
||||
Repository Repository `json:"repository,omitempty"`
|
||||
}
|
||||
|
||||
// Label represents a Gitea issue label
|
||||
type Label struct {
|
||||
ID int64 `json:"id"`
|
||||
Name string `json:"name"`
|
||||
Color string `json:"color"`
|
||||
Description string `json:"description"`
|
||||
}
|
||||
|
||||
// User represents a Gitea user
|
||||
type User struct {
|
||||
ID int64 `json:"id"`
|
||||
Login string `json:"login"`
|
||||
FullName string `json:"full_name"`
|
||||
Email string `json:"email"`
|
||||
AvatarURL string `json:"avatar_url"`
|
||||
}
|
||||
|
||||
// Repository represents a Gitea repository
|
||||
type Repository struct {
|
||||
ID int `json:"id"`
|
||||
ID int64 `json:"id"`
|
||||
Name string `json:"name"`
|
||||
FullName string `json:"full_name"`
|
||||
Owner User `json:"owner"`
|
||||
Description string `json:"description"`
|
||||
Private bool `json:"private"`
|
||||
HTMLURL string `json:"html_url"`
|
||||
CloneURL string `json:"clone_url"`
|
||||
SSHURL string `json:"ssh_url"`
|
||||
Language string `json:"language"`
|
||||
}
|
||||
|
||||
type WebhookPayload struct {
|
||||
Action string `json:"action"`
|
||||
Issue *Issue `json:"issue,omitempty"`
|
||||
Repository Repository `json:"repository"`
|
||||
Sender struct {
|
||||
Login string `json:"login"`
|
||||
} `json:"sender"`
|
||||
}
|
||||
|
||||
type CreateIssueRequest struct {
|
||||
Title string `json:"title"`
|
||||
Body string `json:"body"`
|
||||
Labels []string `json:"labels,omitempty"`
|
||||
Assignee string `json:"assignee,omitempty"`
|
||||
}
|
||||
|
||||
// NewClient creates a new Gitea API client
|
||||
func NewClient(cfg config.GITEAConfig) *Client {
|
||||
token := cfg.Token
|
||||
// TODO: Handle TokenFile if needed
|
||||
|
||||
return &Client{
|
||||
baseURL: cfg.BaseURL,
|
||||
token: cfg.Token,
|
||||
httpClient: &http.Client{
|
||||
token: token,
|
||||
client: &http.Client{
|
||||
Timeout: 30 * time.Second,
|
||||
},
|
||||
}
|
||||
}
|
||||
|
||||
func (c *Client) makeRequest(ctx context.Context, method, path string, body interface{}) (*http.Response, error) {
|
||||
url := c.baseURL + "/api/v1" + path
|
||||
// makeRequest makes an authenticated request to the Gitea API
|
||||
func (c *Client) makeRequest(ctx context.Context, method, endpoint string) (*http.Response, error) {
|
||||
url := fmt.Sprintf("%s/api/v1%s", c.baseURL, endpoint)
|
||||
|
||||
var reqBody *bytes.Buffer
|
||||
if body != nil {
|
||||
jsonData, err := json.Marshal(body)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to marshal request body: %w", err)
|
||||
}
|
||||
reqBody = bytes.NewBuffer(jsonData)
|
||||
}
|
||||
|
||||
var req *http.Request
|
||||
var err error
|
||||
if reqBody != nil {
|
||||
req, err = http.NewRequestWithContext(ctx, method, url, reqBody)
|
||||
} else {
|
||||
req, err = http.NewRequestWithContext(ctx, method, url, nil)
|
||||
}
|
||||
req, err := http.NewRequestWithContext(ctx, method, url, nil)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
if c.token != "" {
|
||||
req.Header.Set("Authorization", "token "+c.token)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Accept", "application/json")
|
||||
|
||||
resp, err := c.httpClient.Do(req)
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("request failed: %w", err)
|
||||
return nil, fmt.Errorf("failed to make request: %w", err)
|
||||
}
|
||||
|
||||
if resp.StatusCode >= 400 {
|
||||
defer resp.Body.Close()
|
||||
return nil, fmt.Errorf("API request failed with status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
return resp, nil
|
||||
}
|
||||
|
||||
func (c *Client) CreateIssue(ctx context.Context, owner, repo string, issue CreateIssueRequest) (*Issue, error) {
|
||||
path := fmt.Sprintf("/repos/%s/%s/issues", owner, repo)
|
||||
// GetRepository retrieves repository information
|
||||
func (c *Client) GetRepository(ctx context.Context, owner, repo string) (*Repository, error) {
|
||||
endpoint := fmt.Sprintf("/repos/%s/%s", url.PathEscape(owner), url.PathEscape(repo))
|
||||
|
||||
resp, err := c.makeRequest(ctx, "POST", path, issue)
|
||||
resp, err := c.makeRequest(ctx, "GET", endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to get repository: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusCreated {
|
||||
return nil, fmt.Errorf("failed to create issue: status %d", resp.StatusCode)
|
||||
var repository Repository
|
||||
if err := json.NewDecoder(resp.Body).Decode(&repository); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode repository: %w", err)
|
||||
}
|
||||
|
||||
var createdIssue Issue
|
||||
if err := json.NewDecoder(resp.Body).Decode(&createdIssue); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
log.Info().
|
||||
Str("repo", fmt.Sprintf("%s/%s", owner, repo)).
|
||||
Int("issue_number", createdIssue.Number).
|
||||
Str("title", createdIssue.Title).
|
||||
Msg("Created GITEA issue")
|
||||
|
||||
return &createdIssue, nil
|
||||
return &repository, nil
|
||||
}
|
||||
|
||||
func (c *Client) GetIssue(ctx context.Context, owner, repo string, issueNumber int) (*Issue, error) {
|
||||
path := fmt.Sprintf("/repos/%s/%s/issues/%d", owner, repo, issueNumber)
|
||||
// GetIssues retrieves issues from a repository
|
||||
func (c *Client) GetIssues(ctx context.Context, owner, repo string, opts IssueListOptions) ([]Issue, error) {
|
||||
endpoint := fmt.Sprintf("/repos/%s/%s/issues", url.PathEscape(owner), url.PathEscape(repo))
|
||||
|
||||
resp, err := c.makeRequest(ctx, "GET", path, nil)
|
||||
// Add query parameters
|
||||
params := url.Values{}
|
||||
if opts.State != "" {
|
||||
params.Set("state", opts.State)
|
||||
}
|
||||
if opts.Labels != "" {
|
||||
params.Set("labels", opts.Labels)
|
||||
}
|
||||
if opts.Page > 0 {
|
||||
params.Set("page", strconv.Itoa(opts.Page))
|
||||
}
|
||||
if opts.Limit > 0 {
|
||||
params.Set("limit", strconv.Itoa(opts.Limit))
|
||||
}
|
||||
if !opts.Since.IsZero() {
|
||||
params.Set("since", opts.Since.Format(time.RFC3339))
|
||||
}
|
||||
|
||||
if len(params) > 0 {
|
||||
endpoint += "?" + params.Encode()
|
||||
}
|
||||
|
||||
resp, err := c.makeRequest(ctx, "GET", endpoint)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to get issues: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("failed to get issue: status %d", resp.StatusCode)
|
||||
var issues []Issue
|
||||
if err := json.NewDecoder(resp.Body).Decode(&issues); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode issues: %w", err)
|
||||
}
|
||||
|
||||
// Set repository information on each issue for context
|
||||
for i := range issues {
|
||||
issues[i].Repository = Repository{
|
||||
Name: repo,
|
||||
FullName: fmt.Sprintf("%s/%s", owner, repo),
|
||||
Owner: User{Login: owner},
|
||||
}
|
||||
}
|
||||
|
||||
return issues, nil
|
||||
}
|
||||
|
||||
// GetIssue retrieves a specific issue
|
||||
func (c *Client) GetIssue(ctx context.Context, owner, repo string, issueNumber int64) (*Issue, error) {
|
||||
endpoint := fmt.Sprintf("/repos/%s/%s/issues/%d", url.PathEscape(owner), url.PathEscape(repo), issueNumber)
|
||||
|
||||
resp, err := c.makeRequest(ctx, "GET", endpoint)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get issue: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var issue Issue
|
||||
if err := json.NewDecoder(resp.Body).Decode(&issue); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
return nil, fmt.Errorf("failed to decode issue: %w", err)
|
||||
}
|
||||
|
||||
// Set repository information
|
||||
issue.Repository = Repository{
|
||||
Name: repo,
|
||||
FullName: fmt.Sprintf("%s/%s", owner, repo),
|
||||
Owner: User{Login: owner},
|
||||
}
|
||||
|
||||
return &issue, nil
|
||||
}
|
||||
|
||||
func (c *Client) ListRepositories(ctx context.Context) ([]Repository, error) {
|
||||
path := "/user/repos"
|
||||
// IssueListOptions contains options for listing issues
|
||||
type IssueListOptions struct {
|
||||
State string // "open", "closed", "all"
|
||||
Labels string // Comma-separated list of label names
|
||||
Page int // Page number (1-based)
|
||||
Limit int // Number of items per page (default: 20, max: 100)
|
||||
Since time.Time // Only show issues updated after this time
|
||||
}
|
||||
|
||||
resp, err := c.makeRequest(ctx, "GET", path, nil)
|
||||
// TestConnection tests the connection to Gitea API
|
||||
func (c *Client) TestConnection(ctx context.Context) error {
|
||||
resp, err := c.makeRequest(ctx, "GET", "/user")
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return fmt.Errorf("connection test failed: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("failed to list repositories: status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
var repos []Repository
|
||||
if err := json.NewDecoder(resp.Body).Decode(&repos); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
}
|
||||
|
||||
return repos, nil
|
||||
return nil
|
||||
}
|
||||
|
||||
func (c *Client) GetRepository(ctx context.Context, owner, repo string) (*Repository, error) {
|
||||
path := fmt.Sprintf("/repos/%s/%s", owner, repo)
|
||||
// WebhookPayload represents a Gitea webhook payload
|
||||
type WebhookPayload struct {
|
||||
Action string `json:"action"`
|
||||
Number int64 `json:"number,omitempty"`
|
||||
Issue *Issue `json:"issue,omitempty"`
|
||||
Repository Repository `json:"repository"`
|
||||
Sender User `json:"sender"`
|
||||
}
|
||||
|
||||
resp, err := c.makeRequest(ctx, "GET", path, nil)
|
||||
// CreateLabelRequest represents the request to create a new label
|
||||
type CreateLabelRequest struct {
|
||||
Name string `json:"name"`
|
||||
Color string `json:"color"`
|
||||
Description string `json:"description"`
|
||||
}
|
||||
|
||||
// CreateLabel creates a new label in a repository
|
||||
func (c *Client) CreateLabel(ctx context.Context, owner, repo string, label CreateLabelRequest) (*Label, error) {
|
||||
endpoint := fmt.Sprintf("/repos/%s/%s/labels", url.PathEscape(owner), url.PathEscape(repo))
|
||||
|
||||
jsonData, err := json.Marshal(label)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
return nil, fmt.Errorf("failed to marshal label data: %w", err)
|
||||
}
|
||||
|
||||
req, err := http.NewRequestWithContext(ctx, "POST", fmt.Sprintf("%s/api/v1%s", c.baseURL, endpoint), strings.NewReader(string(jsonData)))
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to create request: %w", err)
|
||||
}
|
||||
|
||||
if c.token != "" {
|
||||
req.Header.Set("Authorization", "token "+c.token)
|
||||
}
|
||||
req.Header.Set("Content-Type", "application/json")
|
||||
req.Header.Set("Accept", "application/json")
|
||||
|
||||
resp, err := c.client.Do(req)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to make request: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
if resp.StatusCode != http.StatusOK {
|
||||
return nil, fmt.Errorf("failed to get repository: status %d", resp.StatusCode)
|
||||
if resp.StatusCode >= 400 {
|
||||
return nil, fmt.Errorf("API request failed with status %d", resp.StatusCode)
|
||||
}
|
||||
|
||||
var repository Repository
|
||||
if err := json.NewDecoder(resp.Body).Decode(&repository); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode response: %w", err)
|
||||
var createdLabel Label
|
||||
if err := json.NewDecoder(resp.Body).Decode(&createdLabel); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode label: %w", err)
|
||||
}
|
||||
|
||||
return &repository, nil
|
||||
return &createdLabel, nil
|
||||
}
|
||||
|
||||
// GetLabels retrieves all labels from a repository
|
||||
func (c *Client) GetLabels(ctx context.Context, owner, repo string) ([]Label, error) {
|
||||
endpoint := fmt.Sprintf("/repos/%s/%s/labels", url.PathEscape(owner), url.PathEscape(repo))
|
||||
|
||||
resp, err := c.makeRequest(ctx, "GET", endpoint)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to get labels: %w", err)
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
var labels []Label
|
||||
if err := json.NewDecoder(resp.Body).Decode(&labels); err != nil {
|
||||
return nil, fmt.Errorf("failed to decode labels: %w", err)
|
||||
}
|
||||
|
||||
return labels, nil
|
||||
}
|
||||
|
||||
// EnsureRequiredLabels ensures that required labels exist in the repository
|
||||
func (c *Client) EnsureRequiredLabels(ctx context.Context, owner, repo string) error {
|
||||
requiredLabels := []CreateLabelRequest{
|
||||
{
|
||||
Name: "bzzz-task",
|
||||
Color: "ff6b6b",
|
||||
Description: "Issues that should be converted to BZZZ tasks for CHORUS",
|
||||
},
|
||||
{
|
||||
Name: "whoosh-monitored",
|
||||
Color: "4ecdc4",
|
||||
Description: "Repository is monitored by WHOOSH",
|
||||
},
|
||||
{
|
||||
Name: "priority-high",
|
||||
Color: "e74c3c",
|
||||
Description: "High priority task for immediate attention",
|
||||
},
|
||||
{
|
||||
Name: "priority-medium",
|
||||
Color: "f39c12",
|
||||
Description: "Medium priority task",
|
||||
},
|
||||
{
|
||||
Name: "priority-low",
|
||||
Color: "95a5a6",
|
||||
Description: "Low priority task",
|
||||
},
|
||||
}
|
||||
|
||||
// Get existing labels
|
||||
existingLabels, err := c.GetLabels(ctx, owner, repo)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get existing labels: %w", err)
|
||||
}
|
||||
|
||||
// Create a map of existing label names for quick lookup
|
||||
existingLabelNames := make(map[string]bool)
|
||||
for _, label := range existingLabels {
|
||||
existingLabelNames[label.Name] = true
|
||||
}
|
||||
|
||||
// Create missing required labels
|
||||
for _, requiredLabel := range requiredLabels {
|
||||
if !existingLabelNames[requiredLabel.Name] {
|
||||
_, err := c.CreateLabel(ctx, owner, repo, requiredLabel)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to create label %s: %w", requiredLabel.Name, err)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
@@ -178,7 +178,7 @@ func (h *WebhookHandler) ProcessWebhook(payload *WebhookPayload) *WebhookEvent {
|
||||
log.Info().
|
||||
Str("action", payload.Action).
|
||||
Str("repository", payload.Repository.FullName).
|
||||
Int("issue_number", payload.Issue.Number).
|
||||
Int64("issue_number", payload.Issue.Number).
|
||||
Str("title", payload.Issue.Title).
|
||||
Msg("Processing task issue webhook")
|
||||
}
|
||||
|
||||
571
internal/monitor/monitor.go
Normal file
571
internal/monitor/monitor.go
Normal file
@@ -0,0 +1,571 @@
|
||||
package monitor
|
||||
|
||||
import (
|
||||
"context"
|
||||
"database/sql"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"strconv"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/chorus-services/whoosh/internal/config"
|
||||
"github.com/chorus-services/whoosh/internal/gitea"
|
||||
"github.com/jackc/pgx/v5/pgxpool"
|
||||
"github.com/rs/zerolog/log"
|
||||
)
|
||||
|
||||
// Monitor manages repository monitoring and task creation
|
||||
type Monitor struct {
|
||||
db *pgxpool.Pool
|
||||
gitea *gitea.Client
|
||||
stopCh chan struct{}
|
||||
syncInterval time.Duration
|
||||
}
|
||||
|
||||
// NewMonitor creates a new repository monitor
|
||||
func NewMonitor(db *pgxpool.Pool, giteaCfg config.GITEAConfig) *Monitor {
|
||||
return &Monitor{
|
||||
db: db,
|
||||
gitea: gitea.NewClient(giteaCfg),
|
||||
stopCh: make(chan struct{}),
|
||||
syncInterval: 5 * time.Minute, // Default sync every 5 minutes
|
||||
}
|
||||
}
|
||||
|
||||
// GetGiteaClient returns the Gitea client for external use
|
||||
func (m *Monitor) GetGiteaClient() *gitea.Client {
|
||||
return m.gitea
|
||||
}
|
||||
|
||||
// Start begins the monitoring process
|
||||
func (m *Monitor) Start(ctx context.Context) error {
|
||||
log.Info().Msg("🔍 Starting repository monitoring service")
|
||||
|
||||
// Test Gitea connection
|
||||
if err := m.gitea.TestConnection(ctx); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to connect to Gitea")
|
||||
return fmt.Errorf("gitea connection failed: %w", err)
|
||||
}
|
||||
|
||||
log.Info().Msg("✅ Gitea connection established")
|
||||
|
||||
// Start monitoring loop
|
||||
ticker := time.NewTicker(m.syncInterval)
|
||||
defer ticker.Stop()
|
||||
|
||||
// Initial sync
|
||||
m.syncAllRepositories(ctx)
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
log.Info().Msg("🛑 Repository monitoring service stopping")
|
||||
return ctx.Err()
|
||||
case <-m.stopCh:
|
||||
log.Info().Msg("🛑 Repository monitoring service stopped")
|
||||
return nil
|
||||
case <-ticker.C:
|
||||
m.syncAllRepositories(ctx)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Stop stops the monitoring service
|
||||
func (m *Monitor) Stop() {
|
||||
close(m.stopCh)
|
||||
}
|
||||
|
||||
// syncAllRepositories syncs all monitored repositories
|
||||
func (m *Monitor) syncAllRepositories(ctx context.Context) {
|
||||
log.Info().Msg("🔄 Starting repository sync cycle")
|
||||
|
||||
repos, err := m.getMonitoredRepositories(ctx)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to get monitored repositories")
|
||||
return
|
||||
}
|
||||
|
||||
if len(repos) == 0 {
|
||||
log.Info().Msg("No repositories to monitor")
|
||||
return
|
||||
}
|
||||
|
||||
log.Info().Int("count", len(repos)).Msg("Syncing repositories")
|
||||
|
||||
for _, repo := range repos {
|
||||
select {
|
||||
case <-ctx.Done():
|
||||
return
|
||||
default:
|
||||
m.syncRepository(ctx, repo)
|
||||
}
|
||||
}
|
||||
|
||||
log.Info().Msg("✅ Repository sync cycle completed")
|
||||
}
|
||||
|
||||
// syncRepository syncs a single repository
|
||||
func (m *Monitor) syncRepository(ctx context.Context, repo RepositoryConfig) {
|
||||
log.Info().
|
||||
Str("repository", repo.FullName).
|
||||
Msg("Syncing repository")
|
||||
|
||||
startTime := time.Now()
|
||||
|
||||
// Update repository status to active
|
||||
if err := m.updateRepositoryStatus(ctx, repo.ID, "active", nil); err != nil {
|
||||
log.Error().Err(err).
|
||||
Str("repository", repo.FullName).
|
||||
Msg("Failed to update repository status to active")
|
||||
}
|
||||
|
||||
// Get issues since last sync
|
||||
opts := gitea.IssueListOptions{
|
||||
State: "open",
|
||||
Limit: 100,
|
||||
}
|
||||
|
||||
if repo.LastIssueSync != nil {
|
||||
opts.Since = *repo.LastIssueSync
|
||||
}
|
||||
|
||||
// Filter by CHORUS task labels if enabled
|
||||
if repo.EnableChorusIntegration && len(repo.ChorusTaskLabels) > 0 {
|
||||
opts.Labels = strings.Join(repo.ChorusTaskLabels, ",")
|
||||
}
|
||||
|
||||
issues, err := m.gitea.GetIssues(ctx, repo.Owner, repo.Name, opts)
|
||||
if err != nil {
|
||||
m.logSyncError(ctx, repo.ID, "fetch_issues", fmt.Sprintf("Failed to fetch issues: %v", err))
|
||||
log.Error().Err(err).
|
||||
Str("repository", repo.FullName).
|
||||
Msg("Failed to fetch issues")
|
||||
|
||||
if err := m.updateRepositoryStatus(ctx, repo.ID, "error", err); err != nil {
|
||||
log.Error().Err(err).Msg("Failed to update repository status to error")
|
||||
}
|
||||
return
|
||||
}
|
||||
|
||||
created := 0
|
||||
updated := 0
|
||||
|
||||
for _, issue := range issues {
|
||||
// Skip issues without CHORUS labels if CHORUS integration is enabled
|
||||
if repo.EnableChorusIntegration && !m.hasChorusLabels(issue, repo.ChorusTaskLabels) {
|
||||
continue
|
||||
}
|
||||
|
||||
taskID, isNew, err := m.createOrUpdateTask(ctx, repo, issue)
|
||||
if err != nil {
|
||||
log.Error().Err(err).
|
||||
Str("repository", repo.FullName).
|
||||
Int64("issue", issue.Number).
|
||||
Msg("Failed to create/update task")
|
||||
continue
|
||||
}
|
||||
|
||||
if isNew {
|
||||
created++
|
||||
log.Info().
|
||||
Str("repository", repo.FullName).
|
||||
Int64("issue", issue.Number).
|
||||
Str("task_id", taskID).
|
||||
Msg("Created task from issue")
|
||||
} else {
|
||||
updated++
|
||||
}
|
||||
}
|
||||
|
||||
duration := time.Since(startTime)
|
||||
|
||||
// Update repository sync timestamps and statistics
|
||||
if err := m.updateRepositorySyncInfo(ctx, repo.ID, time.Now(), created, updated); err != nil {
|
||||
log.Error().Err(err).
|
||||
Str("repository", repo.FullName).
|
||||
Msg("Failed to update repository sync info")
|
||||
}
|
||||
|
||||
// Log successful sync
|
||||
m.logSyncSuccess(ctx, repo.ID, "full_sync", duration, len(issues), created, updated)
|
||||
|
||||
log.Info().
|
||||
Str("repository", repo.FullName).
|
||||
Int("issues_processed", len(issues)).
|
||||
Int("tasks_created", created).
|
||||
Int("tasks_updated", updated).
|
||||
Dur("duration", duration).
|
||||
Msg("Repository sync completed")
|
||||
}
|
||||
|
||||
// createOrUpdateTask creates a new task or updates an existing one from a Gitea issue
|
||||
func (m *Monitor) createOrUpdateTask(ctx context.Context, repo RepositoryConfig, issue gitea.Issue) (string, bool, error) {
|
||||
// Check if task already exists
|
||||
var existingTaskID sql.NullString
|
||||
query := `SELECT id FROM tasks WHERE external_id = $1 AND source_type = $2`
|
||||
err := m.db.QueryRow(ctx, query, strconv.FormatInt(issue.Number, 10), repo.SourceType).Scan(&existingTaskID)
|
||||
|
||||
if err != nil && err != sql.ErrNoRows {
|
||||
return "", false, fmt.Errorf("failed to check existing task: %w", err)
|
||||
}
|
||||
|
||||
// Prepare labels
|
||||
labels := make([]string, len(issue.Labels))
|
||||
for i, label := range issue.Labels {
|
||||
labels[i] = label.Name
|
||||
}
|
||||
labelsJSON, _ := json.Marshal(labels)
|
||||
|
||||
// Determine task status
|
||||
status := "open"
|
||||
if issue.State == "closed" {
|
||||
status = "completed"
|
||||
}
|
||||
|
||||
// Determine priority from labels
|
||||
priority := m.extractPriorityFromLabels(issue.Labels)
|
||||
|
||||
// Extract tech stack from labels and description
|
||||
techStack := m.extractTechStackFromIssue(issue)
|
||||
techStackJSON, _ := json.Marshal(techStack)
|
||||
|
||||
if existingTaskID.Valid {
|
||||
// Update existing task
|
||||
updateQuery := `
|
||||
UPDATE tasks SET
|
||||
title = $1,
|
||||
description = $2,
|
||||
status = $3,
|
||||
priority = $4,
|
||||
labels = $5,
|
||||
tech_stack = $6,
|
||||
external_updated_at = $7,
|
||||
updated_at = NOW()
|
||||
WHERE id = $8
|
||||
`
|
||||
|
||||
_, err = m.db.Exec(ctx, updateQuery,
|
||||
issue.Title,
|
||||
issue.Body,
|
||||
status,
|
||||
priority,
|
||||
labelsJSON,
|
||||
techStackJSON,
|
||||
issue.UpdatedAt,
|
||||
existingTaskID.String,
|
||||
)
|
||||
|
||||
if err != nil {
|
||||
return "", false, fmt.Errorf("failed to update task: %w", err)
|
||||
}
|
||||
|
||||
return existingTaskID.String, false, nil
|
||||
} else {
|
||||
// Create new task
|
||||
var taskID string
|
||||
insertQuery := `
|
||||
INSERT INTO tasks (
|
||||
external_id, external_url, source_type, source_config,
|
||||
title, description, status, priority,
|
||||
repository, repository_id, labels, tech_stack,
|
||||
external_created_at, external_updated_at
|
||||
) VALUES (
|
||||
$1, $2, $3, $4, $5, $6, $7, $8, $9, $10, $11, $12, $13, $14
|
||||
) RETURNING id
|
||||
`
|
||||
|
||||
// Prepare source config
|
||||
sourceConfig := map[string]interface{}{
|
||||
"repository_id": repo.ID,
|
||||
"issue_number": issue.Number,
|
||||
"issue_id": issue.ID,
|
||||
}
|
||||
sourceConfigJSON, _ := json.Marshal(sourceConfig)
|
||||
|
||||
err = m.db.QueryRow(ctx, insertQuery,
|
||||
strconv.FormatInt(issue.Number, 10), // external_id
|
||||
issue.HTMLURL, // external_url
|
||||
repo.SourceType, // source_type
|
||||
sourceConfigJSON, // source_config
|
||||
issue.Title, // title
|
||||
issue.Body, // description
|
||||
status, // status
|
||||
priority, // priority
|
||||
repo.FullName, // repository
|
||||
repo.ID, // repository_id
|
||||
labelsJSON, // labels
|
||||
techStackJSON, // tech_stack
|
||||
issue.CreatedAt, // external_created_at
|
||||
issue.UpdatedAt, // external_updated_at
|
||||
).Scan(&taskID)
|
||||
|
||||
if err != nil {
|
||||
return "", false, fmt.Errorf("failed to create task: %w", err)
|
||||
}
|
||||
|
||||
return taskID, true, nil
|
||||
}
|
||||
}
|
||||
|
||||
// hasChorusLabels checks if an issue has any of the required CHORUS labels
|
||||
func (m *Monitor) hasChorusLabels(issue gitea.Issue, requiredLabels []string) bool {
|
||||
if len(requiredLabels) == 0 {
|
||||
return true // If no specific labels required, all issues qualify
|
||||
}
|
||||
|
||||
issueLabels := make(map[string]bool)
|
||||
for _, label := range issue.Labels {
|
||||
issueLabels[strings.ToLower(label.Name)] = true
|
||||
}
|
||||
|
||||
for _, required := range requiredLabels {
|
||||
if issueLabels[strings.ToLower(required)] {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
return false
|
||||
}
|
||||
|
||||
// extractPriorityFromLabels determines priority from issue labels
|
||||
func (m *Monitor) extractPriorityFromLabels(labels []gitea.Label) string {
|
||||
priorities := map[string]string{
|
||||
"critical": "critical",
|
||||
"urgent": "critical",
|
||||
"high": "high",
|
||||
"medium": "medium",
|
||||
"low": "low",
|
||||
"minor": "low",
|
||||
}
|
||||
|
||||
for _, label := range labels {
|
||||
if priority, exists := priorities[strings.ToLower(label.Name)]; exists {
|
||||
return priority
|
||||
}
|
||||
}
|
||||
|
||||
return "medium" // Default priority
|
||||
}
|
||||
|
||||
// extractTechStackFromIssue extracts technology stack from issue labels and content
|
||||
func (m *Monitor) extractTechStackFromIssue(issue gitea.Issue) []string {
|
||||
techStack := make(map[string]bool)
|
||||
|
||||
// Extract from labels
|
||||
techLabels := []string{
|
||||
"go", "golang", "javascript", "typescript", "python", "rust", "java",
|
||||
"react", "vue", "angular", "node.js", "express", "fastapi", "gin",
|
||||
"postgresql", "mysql", "mongodb", "redis", "docker", "kubernetes",
|
||||
"api", "frontend", "backend", "database", "devops", "ci/cd",
|
||||
}
|
||||
|
||||
for _, label := range issue.Labels {
|
||||
labelName := strings.ToLower(label.Name)
|
||||
for _, tech := range techLabels {
|
||||
if strings.Contains(labelName, tech) {
|
||||
techStack[tech] = true
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Convert map to slice
|
||||
result := make([]string, 0, len(techStack))
|
||||
for tech := range techStack {
|
||||
result = append(result, tech)
|
||||
}
|
||||
|
||||
return result
|
||||
}
|
||||
|
||||
// RepositoryConfig represents a monitored repository configuration
|
||||
type RepositoryConfig struct {
|
||||
ID string `db:"id"`
|
||||
Name string `db:"name"`
|
||||
Owner string `db:"owner"`
|
||||
FullName string `db:"full_name"`
|
||||
URL string `db:"url"`
|
||||
SourceType string `db:"source_type"`
|
||||
MonitorIssues bool `db:"monitor_issues"`
|
||||
EnableChorusIntegration bool `db:"enable_chorus_integration"`
|
||||
ChorusTaskLabels []string `db:"chorus_task_labels"`
|
||||
LastSync *time.Time `db:"last_sync_at"`
|
||||
LastIssueSync *time.Time `db:"last_issue_sync"`
|
||||
SyncStatus string `db:"sync_status"`
|
||||
}
|
||||
|
||||
// getMonitoredRepositories retrieves all repositories that should be monitored
|
||||
func (m *Monitor) getMonitoredRepositories(ctx context.Context) ([]RepositoryConfig, error) {
|
||||
query := `
|
||||
SELECT id, name, owner, full_name, url, source_type, monitor_issues,
|
||||
enable_chorus_integration, chorus_task_labels, last_sync_at,
|
||||
last_issue_sync, sync_status
|
||||
FROM repositories
|
||||
WHERE monitor_issues = true AND sync_status != 'disabled'
|
||||
ORDER BY last_sync_at ASC NULLS FIRST
|
||||
`
|
||||
|
||||
rows, err := m.db.Query(ctx, query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("failed to query repositories: %w", err)
|
||||
}
|
||||
defer rows.Close()
|
||||
|
||||
var repos []RepositoryConfig
|
||||
for rows.Next() {
|
||||
var repo RepositoryConfig
|
||||
var chorusLabelsJSON []byte
|
||||
|
||||
err := rows.Scan(
|
||||
&repo.ID, &repo.Name, &repo.Owner, &repo.FullName, &repo.URL,
|
||||
&repo.SourceType, &repo.MonitorIssues, &repo.EnableChorusIntegration,
|
||||
&chorusLabelsJSON, &repo.LastSync, &repo.LastIssueSync, &repo.SyncStatus,
|
||||
)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to scan repository row")
|
||||
continue
|
||||
}
|
||||
|
||||
// Parse CHORUS task labels
|
||||
if err := json.Unmarshal(chorusLabelsJSON, &repo.ChorusTaskLabels); err != nil {
|
||||
log.Error().Err(err).Str("repository", repo.FullName).Msg("Failed to parse CHORUS task labels")
|
||||
repo.ChorusTaskLabels = []string{"bzzz-task", "chorus-task"} // Default labels
|
||||
}
|
||||
|
||||
repos = append(repos, repo)
|
||||
}
|
||||
|
||||
return repos, nil
|
||||
}
|
||||
|
||||
// updateRepositoryStatus updates the sync status of a repository
|
||||
func (m *Monitor) updateRepositoryStatus(ctx context.Context, repoID, status string, err error) error {
|
||||
var errorMsg sql.NullString
|
||||
if err != nil {
|
||||
errorMsg.String = err.Error()
|
||||
errorMsg.Valid = true
|
||||
}
|
||||
|
||||
query := `
|
||||
UPDATE repositories
|
||||
SET sync_status = $1, sync_error = $2, updated_at = NOW()
|
||||
WHERE id = $3
|
||||
`
|
||||
|
||||
_, dbErr := m.db.Exec(ctx, query, status, errorMsg, repoID)
|
||||
if dbErr != nil {
|
||||
return fmt.Errorf("failed to update repository status: %w", dbErr)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// updateRepositorySyncInfo updates repository sync information
|
||||
func (m *Monitor) updateRepositorySyncInfo(ctx context.Context, repoID string, syncTime time.Time, created, updated int) error {
|
||||
query := `
|
||||
UPDATE repositories
|
||||
SET last_sync_at = $1, last_issue_sync = $1,
|
||||
total_tasks_created = total_tasks_created + $2,
|
||||
updated_at = NOW()
|
||||
WHERE id = $3
|
||||
`
|
||||
|
||||
_, err := m.db.Exec(ctx, query, syncTime, created, repoID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to update repository sync info: %w", err)
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// logSyncSuccess logs a successful sync operation
|
||||
func (m *Monitor) logSyncSuccess(ctx context.Context, repoID, operation string, duration time.Duration, processed, created, updated int) {
|
||||
query := `
|
||||
INSERT INTO repository_sync_logs (
|
||||
repository_id, sync_type, operation, status, message,
|
||||
items_processed, items_created, items_updated, duration_ms
|
||||
) VALUES ($1, $2, $3, $4, $5, $6, $7, $8, $9)
|
||||
`
|
||||
|
||||
message := fmt.Sprintf("Processed %d items, created %d tasks, updated %d tasks in %v", processed, created, updated, duration)
|
||||
|
||||
_, err := m.db.Exec(ctx, query,
|
||||
repoID, "full_sync", operation, "success", message,
|
||||
processed, created, updated, duration.Milliseconds(),
|
||||
)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to log sync success")
|
||||
}
|
||||
}
|
||||
|
||||
// logSyncError logs a sync error
|
||||
func (m *Monitor) logSyncError(ctx context.Context, repoID, operation, errorMsg string) {
|
||||
query := `
|
||||
INSERT INTO repository_sync_logs (
|
||||
repository_id, sync_type, operation, status, message, error_details
|
||||
) VALUES ($1, $2, $3, $4, $5, $6)
|
||||
`
|
||||
|
||||
errorDetails := map[string]interface{}{
|
||||
"error": errorMsg,
|
||||
"timestamp": time.Now().Format(time.RFC3339),
|
||||
}
|
||||
errorDetailsJSON, _ := json.Marshal(errorDetails)
|
||||
|
||||
_, err := m.db.Exec(ctx, query,
|
||||
repoID, "full_sync", operation, "error", errorMsg, errorDetailsJSON,
|
||||
)
|
||||
if err != nil {
|
||||
log.Error().Err(err).Msg("Failed to log sync error")
|
||||
}
|
||||
}
|
||||
|
||||
// SyncRepository manually syncs a specific repository by ID
|
||||
func (m *Monitor) SyncRepository(ctx context.Context, repoID string) error {
|
||||
log.Info().Str("repository_id", repoID).Msg("Manual repository sync requested")
|
||||
|
||||
// Get repository configuration
|
||||
repo, err := m.getRepositoryByID(ctx, repoID)
|
||||
if err != nil {
|
||||
return fmt.Errorf("failed to get repository: %w", err)
|
||||
}
|
||||
|
||||
// Sync the repository
|
||||
m.syncRepository(ctx, *repo)
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// getRepositoryByID retrieves a specific repository configuration by ID
|
||||
func (m *Monitor) getRepositoryByID(ctx context.Context, repoID string) (*RepositoryConfig, error) {
|
||||
query := `
|
||||
SELECT id, name, owner, full_name, url, source_type, monitor_issues,
|
||||
enable_chorus_integration, chorus_task_labels, last_sync_at,
|
||||
last_issue_sync, sync_status
|
||||
FROM repositories
|
||||
WHERE id = $1
|
||||
`
|
||||
|
||||
var repo RepositoryConfig
|
||||
var chorusLabelsJSON []byte
|
||||
|
||||
err := m.db.QueryRow(ctx, query, repoID).Scan(
|
||||
&repo.ID, &repo.Name, &repo.Owner, &repo.FullName, &repo.URL,
|
||||
&repo.SourceType, &repo.MonitorIssues, &repo.EnableChorusIntegration,
|
||||
&chorusLabelsJSON, &repo.LastSync, &repo.LastIssueSync, &repo.SyncStatus,
|
||||
)
|
||||
if err != nil {
|
||||
if err == sql.ErrNoRows {
|
||||
return nil, fmt.Errorf("repository not found: %s", repoID)
|
||||
}
|
||||
return nil, fmt.Errorf("failed to query repository: %w", err)
|
||||
}
|
||||
|
||||
// Parse CHORUS task labels
|
||||
if err := json.Unmarshal(chorusLabelsJSON, &repo.ChorusTaskLabels); err != nil {
|
||||
log.Error().Err(err).Str("repository", repo.FullName).Msg("Failed to parse CHORUS task labels")
|
||||
repo.ChorusTaskLabels = []string{"bzzz-task", "chorus-task"} // Default labels
|
||||
}
|
||||
|
||||
return &repo, nil
|
||||
}
|
||||
File diff suppressed because it is too large
Load Diff
9
migrations/003_add_repositories_table.down.sql
Normal file
9
migrations/003_add_repositories_table.down.sql
Normal file
@@ -0,0 +1,9 @@
|
||||
-- Rollback repository monitoring tables
|
||||
|
||||
-- Remove new column from tasks table
|
||||
ALTER TABLE tasks DROP COLUMN IF EXISTS repository_id;
|
||||
|
||||
-- Drop tables in reverse order
|
||||
DROP TABLE IF EXISTS repository_sync_logs;
|
||||
DROP TABLE IF EXISTS repository_webhooks;
|
||||
DROP TABLE IF EXISTS repositories;
|
||||
127
migrations/003_add_repositories_table.up.sql
Normal file
127
migrations/003_add_repositories_table.up.sql
Normal file
@@ -0,0 +1,127 @@
|
||||
-- Repository monitoring table for WHOOSH
|
||||
-- Tracks Gitea repositories for issue monitoring and CHORUS integration
|
||||
|
||||
CREATE TABLE repositories (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
|
||||
-- Repository identification
|
||||
name VARCHAR(255) NOT NULL, -- e.g., "WHOOSH", "CHORUS"
|
||||
owner VARCHAR(255) NOT NULL, -- e.g., "tony", "chorus-services"
|
||||
full_name VARCHAR(255) NOT NULL, -- e.g., "tony/WHOOSH"
|
||||
|
||||
-- Repository URLs and access
|
||||
url TEXT NOT NULL, -- Full Gitea URL, e.g., "https://gitea.chorus.services/tony/WHOOSH"
|
||||
clone_url TEXT, -- Git clone URL
|
||||
ssh_url TEXT, -- SSH clone URL
|
||||
|
||||
-- Repository configuration
|
||||
source_type VARCHAR(50) NOT NULL DEFAULT 'gitea', -- 'gitea', 'github', 'gitlab'
|
||||
source_config JSONB DEFAULT '{}', -- Source-specific configuration (API tokens, etc.)
|
||||
|
||||
-- Monitoring settings
|
||||
monitor_issues BOOLEAN NOT NULL DEFAULT true,
|
||||
monitor_pull_requests BOOLEAN NOT NULL DEFAULT false,
|
||||
monitor_releases BOOLEAN NOT NULL DEFAULT false,
|
||||
|
||||
-- CHORUS/BZZZ integration settings
|
||||
enable_chorus_integration BOOLEAN NOT NULL DEFAULT true,
|
||||
chorus_task_labels JSONB DEFAULT '["bzzz-task", "chorus-task"]', -- Labels that trigger CHORUS tasks
|
||||
auto_assign_teams BOOLEAN NOT NULL DEFAULT true,
|
||||
|
||||
-- Repository metadata
|
||||
description TEXT,
|
||||
default_branch VARCHAR(100) DEFAULT 'main',
|
||||
is_private BOOLEAN DEFAULT false,
|
||||
language VARCHAR(100),
|
||||
topics JSONB DEFAULT '[]',
|
||||
|
||||
-- Monitoring state
|
||||
last_sync_at TIMESTAMP WITH TIME ZONE,
|
||||
last_issue_sync TIMESTAMP WITH TIME ZONE,
|
||||
sync_status VARCHAR(50) NOT NULL DEFAULT 'pending', -- 'pending', 'active', 'error', 'disabled'
|
||||
sync_error TEXT,
|
||||
|
||||
-- Statistics
|
||||
open_issues_count INTEGER DEFAULT 0,
|
||||
closed_issues_count INTEGER DEFAULT 0,
|
||||
total_tasks_created INTEGER DEFAULT 0,
|
||||
|
||||
-- Timestamps
|
||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
||||
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
||||
|
||||
-- Constraints
|
||||
UNIQUE(full_name, source_type) -- Prevent duplicate repositories
|
||||
);
|
||||
|
||||
-- Repository webhooks for real-time updates
|
||||
CREATE TABLE repository_webhooks (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
repository_id UUID NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
|
||||
|
||||
-- Webhook configuration
|
||||
webhook_url TEXT NOT NULL, -- The webhook endpoint URL
|
||||
webhook_secret VARCHAR(255), -- Secret for webhook validation
|
||||
events JSONB NOT NULL DEFAULT '["issues", "pull_request"]', -- Events to listen for
|
||||
|
||||
-- Webhook state
|
||||
is_active BOOLEAN NOT NULL DEFAULT true,
|
||||
last_delivery_at TIMESTAMP WITH TIME ZONE,
|
||||
delivery_count INTEGER DEFAULT 0,
|
||||
failure_count INTEGER DEFAULT 0,
|
||||
|
||||
-- Timestamps
|
||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW(),
|
||||
updated_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
|
||||
);
|
||||
|
||||
-- Repository sync logs for debugging and monitoring
|
||||
CREATE TABLE repository_sync_logs (
|
||||
id UUID PRIMARY KEY DEFAULT gen_random_uuid(),
|
||||
repository_id UUID NOT NULL REFERENCES repositories(id) ON DELETE CASCADE,
|
||||
|
||||
-- Sync operation details
|
||||
sync_type VARCHAR(50) NOT NULL, -- 'full_sync', 'incremental_sync', 'webhook'
|
||||
operation VARCHAR(100) NOT NULL, -- 'fetch_issues', 'create_task', 'update_task'
|
||||
|
||||
-- Sync results
|
||||
status VARCHAR(50) NOT NULL, -- 'success', 'error', 'warning'
|
||||
message TEXT,
|
||||
error_details JSONB,
|
||||
|
||||
-- Metrics
|
||||
items_processed INTEGER DEFAULT 0,
|
||||
items_created INTEGER DEFAULT 0,
|
||||
items_updated INTEGER DEFAULT 0,
|
||||
duration_ms INTEGER,
|
||||
|
||||
-- Context
|
||||
external_id VARCHAR(255), -- Issue ID, PR ID, etc.
|
||||
external_url TEXT,
|
||||
|
||||
-- Timestamps
|
||||
created_at TIMESTAMP WITH TIME ZONE DEFAULT NOW()
|
||||
);
|
||||
|
||||
-- Indexes for performance
|
||||
CREATE INDEX idx_repositories_source_type ON repositories(source_type);
|
||||
CREATE INDEX idx_repositories_full_name ON repositories(full_name);
|
||||
CREATE INDEX idx_repositories_sync_status ON repositories(sync_status);
|
||||
CREATE INDEX idx_repositories_monitor_issues ON repositories(monitor_issues);
|
||||
CREATE INDEX idx_repositories_enable_chorus ON repositories(enable_chorus_integration);
|
||||
CREATE INDEX idx_repositories_last_sync ON repositories(last_sync_at);
|
||||
|
||||
CREATE INDEX idx_repository_webhooks_repository_id ON repository_webhooks(repository_id);
|
||||
CREATE INDEX idx_repository_webhooks_active ON repository_webhooks(is_active);
|
||||
|
||||
CREATE INDEX idx_repository_sync_logs_repository_id ON repository_sync_logs(repository_id);
|
||||
CREATE INDEX idx_repository_sync_logs_created_at ON repository_sync_logs(created_at);
|
||||
CREATE INDEX idx_repository_sync_logs_status ON repository_sync_logs(status);
|
||||
CREATE INDEX idx_repository_sync_logs_sync_type ON repository_sync_logs(sync_type);
|
||||
|
||||
-- Add repository relationship to tasks table
|
||||
ALTER TABLE tasks ADD COLUMN IF NOT EXISTS repository_id UUID REFERENCES repositories(id) ON DELETE SET NULL;
|
||||
CREATE INDEX IF NOT EXISTS idx_tasks_repository_id ON tasks(repository_id);
|
||||
|
||||
-- Update tasks table to improve repository tracking
|
||||
ALTER TABLE tasks ALTER COLUMN repository TYPE TEXT; -- Allow longer repository names
|
||||
Reference in New Issue
Block a user