 131868bdca
			
		
	
	131868bdca
	
	
	
		
			
			Major security, observability, and configuration improvements:
## Security Hardening
- Implemented configurable CORS (no more wildcards)
- Added comprehensive auth middleware for admin endpoints
- Enhanced webhook HMAC validation
- Added input validation and rate limiting
- Security headers and CSP policies
## Configuration Management
- Made N8N webhook URL configurable (WHOOSH_N8N_BASE_URL)
- Replaced all hardcoded endpoints with environment variables
- Added feature flags for LLM vs heuristic composition
- Gitea fetch hardening with EAGER_FILTER and FULL_RESCAN options
## API Completeness
- Implemented GetCouncilComposition function
- Added GET /api/v1/councils/{id} endpoint
- Council artifacts API (POST/GET /api/v1/councils/{id}/artifacts)
- /admin/health/details endpoint with component status
- Database lookup for repository URLs (no hardcoded fallbacks)
## Observability & Performance
- Added OpenTelemetry distributed tracing with goal/pulse correlation
- Performance optimization database indexes
- Comprehensive health monitoring
- Enhanced logging and error handling
## Infrastructure
- Production-ready P2P discovery (replaces mock implementation)
- Removed unused Redis configuration
- Enhanced Docker Swarm integration
- Added migration files for performance indexes
## Code Quality
- Comprehensive input validation
- Graceful error handling and failsafe fallbacks
- Backwards compatibility maintained
- Following security best practices
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
		
	
		
			
				
	
	
		
			494 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			494 lines
		
	
	
		
			14 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| //go:build go1.9
 | |
| // +build go1.9
 | |
| 
 | |
| package postgres
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"database/sql"
 | |
| 	"fmt"
 | |
| 	"io"
 | |
| 	nurl "net/url"
 | |
| 	"regexp"
 | |
| 	"strconv"
 | |
| 	"strings"
 | |
| 	"time"
 | |
| 
 | |
| 	"go.uber.org/atomic"
 | |
| 
 | |
| 	"github.com/golang-migrate/migrate/v4"
 | |
| 	"github.com/golang-migrate/migrate/v4/database"
 | |
| 	"github.com/golang-migrate/migrate/v4/database/multistmt"
 | |
| 	"github.com/hashicorp/go-multierror"
 | |
| 	"github.com/lib/pq"
 | |
| )
 | |
| 
 | |
| func init() {
 | |
| 	db := Postgres{}
 | |
| 	database.Register("postgres", &db)
 | |
| 	database.Register("postgresql", &db)
 | |
| }
 | |
| 
 | |
| var (
 | |
| 	multiStmtDelimiter = []byte(";")
 | |
| 
 | |
| 	DefaultMigrationsTable       = "schema_migrations"
 | |
| 	DefaultMultiStatementMaxSize = 10 * 1 << 20 // 10 MB
 | |
| )
 | |
| 
 | |
| var (
 | |
| 	ErrNilConfig      = fmt.Errorf("no config")
 | |
| 	ErrNoDatabaseName = fmt.Errorf("no database name")
 | |
| 	ErrNoSchema       = fmt.Errorf("no schema")
 | |
| 	ErrDatabaseDirty  = fmt.Errorf("database is dirty")
 | |
| )
 | |
| 
 | |
| type Config struct {
 | |
| 	MigrationsTable       string
 | |
| 	MigrationsTableQuoted bool
 | |
| 	MultiStatementEnabled bool
 | |
| 	DatabaseName          string
 | |
| 	SchemaName            string
 | |
| 	migrationsSchemaName  string
 | |
| 	migrationsTableName   string
 | |
| 	StatementTimeout      time.Duration
 | |
| 	MultiStatementMaxSize int
 | |
| }
 | |
| 
 | |
| type Postgres struct {
 | |
| 	// Locking and unlocking need to use the same connection
 | |
| 	conn     *sql.Conn
 | |
| 	db       *sql.DB
 | |
| 	isLocked atomic.Bool
 | |
| 
 | |
| 	// Open and WithInstance need to guarantee that config is never nil
 | |
| 	config *Config
 | |
| }
 | |
| 
 | |
| func WithConnection(ctx context.Context, conn *sql.Conn, config *Config) (*Postgres, error) {
 | |
| 	if config == nil {
 | |
| 		return nil, ErrNilConfig
 | |
| 	}
 | |
| 
 | |
| 	if err := conn.PingContext(ctx); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	if config.DatabaseName == "" {
 | |
| 		query := `SELECT CURRENT_DATABASE()`
 | |
| 		var databaseName string
 | |
| 		if err := conn.QueryRowContext(ctx, query).Scan(&databaseName); err != nil {
 | |
| 			return nil, &database.Error{OrigErr: err, Query: []byte(query)}
 | |
| 		}
 | |
| 
 | |
| 		if len(databaseName) == 0 {
 | |
| 			return nil, ErrNoDatabaseName
 | |
| 		}
 | |
| 
 | |
| 		config.DatabaseName = databaseName
 | |
| 	}
 | |
| 
 | |
| 	if config.SchemaName == "" {
 | |
| 		query := `SELECT CURRENT_SCHEMA()`
 | |
| 		var schemaName sql.NullString
 | |
| 		if err := conn.QueryRowContext(ctx, query).Scan(&schemaName); err != nil {
 | |
| 			return nil, &database.Error{OrigErr: err, Query: []byte(query)}
 | |
| 		}
 | |
| 
 | |
| 		if !schemaName.Valid {
 | |
| 			return nil, ErrNoSchema
 | |
| 		}
 | |
| 
 | |
| 		config.SchemaName = schemaName.String
 | |
| 	}
 | |
| 
 | |
| 	if len(config.MigrationsTable) == 0 {
 | |
| 		config.MigrationsTable = DefaultMigrationsTable
 | |
| 	}
 | |
| 
 | |
| 	config.migrationsSchemaName = config.SchemaName
 | |
| 	config.migrationsTableName = config.MigrationsTable
 | |
| 	if config.MigrationsTableQuoted {
 | |
| 		re := regexp.MustCompile(`"(.*?)"`)
 | |
| 		result := re.FindAllStringSubmatch(config.MigrationsTable, -1)
 | |
| 		config.migrationsTableName = result[len(result)-1][1]
 | |
| 		if len(result) == 2 {
 | |
| 			config.migrationsSchemaName = result[0][1]
 | |
| 		} else if len(result) > 2 {
 | |
| 			return nil, fmt.Errorf("\"%s\" MigrationsTable contains too many dot characters", config.MigrationsTable)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	px := &Postgres{
 | |
| 		conn:   conn,
 | |
| 		config: config,
 | |
| 	}
 | |
| 
 | |
| 	if err := px.ensureVersionTable(); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return px, nil
 | |
| }
 | |
| 
 | |
| func WithInstance(instance *sql.DB, config *Config) (database.Driver, error) {
 | |
| 	ctx := context.Background()
 | |
| 
 | |
| 	if err := instance.Ping(); err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	conn, err := instance.Conn(ctx)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	px, err := WithConnection(ctx, conn, config)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 	px.db = instance
 | |
| 	return px, nil
 | |
| }
 | |
| 
 | |
| func (p *Postgres) Open(url string) (database.Driver, error) {
 | |
| 	purl, err := nurl.Parse(url)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	db, err := sql.Open("postgres", migrate.FilterCustomQuery(purl).String())
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	migrationsTable := purl.Query().Get("x-migrations-table")
 | |
| 	migrationsTableQuoted := false
 | |
| 	if s := purl.Query().Get("x-migrations-table-quoted"); len(s) > 0 {
 | |
| 		migrationsTableQuoted, err = strconv.ParseBool(s)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("Unable to parse option x-migrations-table-quoted: %w", err)
 | |
| 		}
 | |
| 	}
 | |
| 	if (len(migrationsTable) > 0) && (migrationsTableQuoted) && ((migrationsTable[0] != '"') || (migrationsTable[len(migrationsTable)-1] != '"')) {
 | |
| 		return nil, fmt.Errorf("x-migrations-table must be quoted (for instance '\"migrate\".\"schema_migrations\"') when x-migrations-table-quoted is enabled, current value is: %s", migrationsTable)
 | |
| 	}
 | |
| 
 | |
| 	statementTimeoutString := purl.Query().Get("x-statement-timeout")
 | |
| 	statementTimeout := 0
 | |
| 	if statementTimeoutString != "" {
 | |
| 		statementTimeout, err = strconv.Atoi(statementTimeoutString)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	multiStatementMaxSize := DefaultMultiStatementMaxSize
 | |
| 	if s := purl.Query().Get("x-multi-statement-max-size"); len(s) > 0 {
 | |
| 		multiStatementMaxSize, err = strconv.Atoi(s)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 		if multiStatementMaxSize <= 0 {
 | |
| 			multiStatementMaxSize = DefaultMultiStatementMaxSize
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	multiStatementEnabled := false
 | |
| 	if s := purl.Query().Get("x-multi-statement"); len(s) > 0 {
 | |
| 		multiStatementEnabled, err = strconv.ParseBool(s)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("Unable to parse option x-multi-statement: %w", err)
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	px, err := WithInstance(db, &Config{
 | |
| 		DatabaseName:          purl.Path,
 | |
| 		MigrationsTable:       migrationsTable,
 | |
| 		MigrationsTableQuoted: migrationsTableQuoted,
 | |
| 		StatementTimeout:      time.Duration(statementTimeout) * time.Millisecond,
 | |
| 		MultiStatementEnabled: multiStatementEnabled,
 | |
| 		MultiStatementMaxSize: multiStatementMaxSize,
 | |
| 	})
 | |
| 
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return px, nil
 | |
| }
 | |
| 
 | |
| func (p *Postgres) Close() error {
 | |
| 	connErr := p.conn.Close()
 | |
| 	var dbErr error
 | |
| 	if p.db != nil {
 | |
| 		dbErr = p.db.Close()
 | |
| 	}
 | |
| 
 | |
| 	if connErr != nil || dbErr != nil {
 | |
| 		return fmt.Errorf("conn: %v, db: %v", connErr, dbErr)
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // https://www.postgresql.org/docs/9.6/static/explicit-locking.html#ADVISORY-LOCKS
 | |
| func (p *Postgres) Lock() error {
 | |
| 	return database.CasRestoreOnErr(&p.isLocked, false, true, database.ErrLocked, func() error {
 | |
| 		aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		// This will wait indefinitely until the lock can be acquired.
 | |
| 		query := `SELECT pg_advisory_lock($1)`
 | |
| 		if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
 | |
| 			return &database.Error{OrigErr: err, Err: "try lock failed", Query: []byte(query)}
 | |
| 		}
 | |
| 
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (p *Postgres) Unlock() error {
 | |
| 	return database.CasRestoreOnErr(&p.isLocked, true, false, database.ErrNotLocked, func() error {
 | |
| 		aid, err := database.GenerateAdvisoryLockId(p.config.DatabaseName, p.config.migrationsSchemaName, p.config.migrationsTableName)
 | |
| 		if err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 
 | |
| 		query := `SELECT pg_advisory_unlock($1)`
 | |
| 		if _, err := p.conn.ExecContext(context.Background(), query, aid); err != nil {
 | |
| 			return &database.Error{OrigErr: err, Query: []byte(query)}
 | |
| 		}
 | |
| 		return nil
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (p *Postgres) Run(migration io.Reader) error {
 | |
| 	if p.config.MultiStatementEnabled {
 | |
| 		var err error
 | |
| 		if e := multistmt.Parse(migration, multiStmtDelimiter, p.config.MultiStatementMaxSize, func(m []byte) bool {
 | |
| 			if err = p.runStatement(m); err != nil {
 | |
| 				return false
 | |
| 			}
 | |
| 			return true
 | |
| 		}); e != nil {
 | |
| 			return e
 | |
| 		}
 | |
| 		return err
 | |
| 	}
 | |
| 	migr, err := io.ReadAll(migration)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	return p.runStatement(migr)
 | |
| }
 | |
| 
 | |
| func (p *Postgres) runStatement(statement []byte) error {
 | |
| 	ctx := context.Background()
 | |
| 	if p.config.StatementTimeout != 0 {
 | |
| 		var cancel context.CancelFunc
 | |
| 		ctx, cancel = context.WithTimeout(ctx, p.config.StatementTimeout)
 | |
| 		defer cancel()
 | |
| 	}
 | |
| 	query := string(statement)
 | |
| 	if strings.TrimSpace(query) == "" {
 | |
| 		return nil
 | |
| 	}
 | |
| 	if _, err := p.conn.ExecContext(ctx, query); err != nil {
 | |
| 		if pgErr, ok := err.(*pq.Error); ok {
 | |
| 			var line uint
 | |
| 			var col uint
 | |
| 			var lineColOK bool
 | |
| 			if pgErr.Position != "" {
 | |
| 				if pos, err := strconv.ParseUint(pgErr.Position, 10, 64); err == nil {
 | |
| 					line, col, lineColOK = computeLineFromPos(query, int(pos))
 | |
| 				}
 | |
| 			}
 | |
| 			message := fmt.Sprintf("migration failed: %s", pgErr.Message)
 | |
| 			if lineColOK {
 | |
| 				message = fmt.Sprintf("%s (column %d)", message, col)
 | |
| 			}
 | |
| 			if pgErr.Detail != "" {
 | |
| 				message = fmt.Sprintf("%s, %s", message, pgErr.Detail)
 | |
| 			}
 | |
| 			return database.Error{OrigErr: err, Err: message, Query: statement, Line: line}
 | |
| 		}
 | |
| 		return database.Error{OrigErr: err, Err: "migration failed", Query: statement}
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func computeLineFromPos(s string, pos int) (line uint, col uint, ok bool) {
 | |
| 	// replace crlf with lf
 | |
| 	s = strings.Replace(s, "\r\n", "\n", -1)
 | |
| 	// pg docs: pos uses index 1 for the first character, and positions are measured in characters not bytes
 | |
| 	runes := []rune(s)
 | |
| 	if pos > len(runes) {
 | |
| 		return 0, 0, false
 | |
| 	}
 | |
| 	sel := runes[:pos]
 | |
| 	line = uint(runesCount(sel, newLine) + 1)
 | |
| 	col = uint(pos - 1 - runesLastIndex(sel, newLine))
 | |
| 	return line, col, true
 | |
| }
 | |
| 
 | |
| const newLine = '\n'
 | |
| 
 | |
| func runesCount(input []rune, target rune) int {
 | |
| 	var count int
 | |
| 	for _, r := range input {
 | |
| 		if r == target {
 | |
| 			count++
 | |
| 		}
 | |
| 	}
 | |
| 	return count
 | |
| }
 | |
| 
 | |
| func runesLastIndex(input []rune, target rune) int {
 | |
| 	for i := len(input) - 1; i >= 0; i-- {
 | |
| 		if input[i] == target {
 | |
| 			return i
 | |
| 		}
 | |
| 	}
 | |
| 	return -1
 | |
| }
 | |
| 
 | |
| func (p *Postgres) SetVersion(version int, dirty bool) error {
 | |
| 	tx, err := p.conn.BeginTx(context.Background(), &sql.TxOptions{})
 | |
| 	if err != nil {
 | |
| 		return &database.Error{OrigErr: err, Err: "transaction start failed"}
 | |
| 	}
 | |
| 
 | |
| 	query := `TRUNCATE ` + pq.QuoteIdentifier(p.config.migrationsSchemaName) + `.` + pq.QuoteIdentifier(p.config.migrationsTableName)
 | |
| 	if _, err := tx.Exec(query); err != nil {
 | |
| 		if errRollback := tx.Rollback(); errRollback != nil {
 | |
| 			err = multierror.Append(err, errRollback)
 | |
| 		}
 | |
| 		return &database.Error{OrigErr: err, Query: []byte(query)}
 | |
| 	}
 | |
| 
 | |
| 	// Also re-write the schema version for nil dirty versions to prevent
 | |
| 	// empty schema version for failed down migration on the first migration
 | |
| 	// See: https://github.com/golang-migrate/migrate/issues/330
 | |
| 	if version >= 0 || (version == database.NilVersion && dirty) {
 | |
| 		query = `INSERT INTO ` + pq.QuoteIdentifier(p.config.migrationsSchemaName) + `.` + pq.QuoteIdentifier(p.config.migrationsTableName) + ` (version, dirty) VALUES ($1, $2)`
 | |
| 		if _, err := tx.Exec(query, version, dirty); err != nil {
 | |
| 			if errRollback := tx.Rollback(); errRollback != nil {
 | |
| 				err = multierror.Append(err, errRollback)
 | |
| 			}
 | |
| 			return &database.Error{OrigErr: err, Query: []byte(query)}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if err := tx.Commit(); err != nil {
 | |
| 		return &database.Error{OrigErr: err, Err: "transaction commit failed"}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (p *Postgres) Version() (version int, dirty bool, err error) {
 | |
| 	query := `SELECT version, dirty FROM ` + pq.QuoteIdentifier(p.config.migrationsSchemaName) + `.` + pq.QuoteIdentifier(p.config.migrationsTableName) + ` LIMIT 1`
 | |
| 	err = p.conn.QueryRowContext(context.Background(), query).Scan(&version, &dirty)
 | |
| 	switch {
 | |
| 	case err == sql.ErrNoRows:
 | |
| 		return database.NilVersion, false, nil
 | |
| 
 | |
| 	case err != nil:
 | |
| 		if e, ok := err.(*pq.Error); ok {
 | |
| 			if e.Code.Name() == "undefined_table" {
 | |
| 				return database.NilVersion, false, nil
 | |
| 			}
 | |
| 		}
 | |
| 		return 0, false, &database.Error{OrigErr: err, Query: []byte(query)}
 | |
| 
 | |
| 	default:
 | |
| 		return version, dirty, nil
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *Postgres) Drop() (err error) {
 | |
| 	// select all tables in current schema
 | |
| 	query := `SELECT table_name FROM information_schema.tables WHERE table_schema=(SELECT current_schema()) AND table_type='BASE TABLE'`
 | |
| 	tables, err := p.conn.QueryContext(context.Background(), query)
 | |
| 	if err != nil {
 | |
| 		return &database.Error{OrigErr: err, Query: []byte(query)}
 | |
| 	}
 | |
| 	defer func() {
 | |
| 		if errClose := tables.Close(); errClose != nil {
 | |
| 			err = multierror.Append(err, errClose)
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	// delete one table after another
 | |
| 	tableNames := make([]string, 0)
 | |
| 	for tables.Next() {
 | |
| 		var tableName string
 | |
| 		if err := tables.Scan(&tableName); err != nil {
 | |
| 			return err
 | |
| 		}
 | |
| 		if len(tableName) > 0 {
 | |
| 			tableNames = append(tableNames, tableName)
 | |
| 		}
 | |
| 	}
 | |
| 	if err := tables.Err(); err != nil {
 | |
| 		return &database.Error{OrigErr: err, Query: []byte(query)}
 | |
| 	}
 | |
| 
 | |
| 	if len(tableNames) > 0 {
 | |
| 		// delete one by one ...
 | |
| 		for _, t := range tableNames {
 | |
| 			query = `DROP TABLE IF EXISTS ` + pq.QuoteIdentifier(t) + ` CASCADE`
 | |
| 			if _, err := p.conn.ExecContext(context.Background(), query); err != nil {
 | |
| 				return &database.Error{OrigErr: err, Query: []byte(query)}
 | |
| 			}
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| // ensureVersionTable checks if versions table exists and, if not, creates it.
 | |
| // Note that this function locks the database, which deviates from the usual
 | |
| // convention of "caller locks" in the Postgres type.
 | |
| func (p *Postgres) ensureVersionTable() (err error) {
 | |
| 	if err = p.Lock(); err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 
 | |
| 	defer func() {
 | |
| 		if e := p.Unlock(); e != nil {
 | |
| 			if err == nil {
 | |
| 				err = e
 | |
| 			} else {
 | |
| 				err = multierror.Append(err, e)
 | |
| 			}
 | |
| 		}
 | |
| 	}()
 | |
| 
 | |
| 	// This block checks whether the `MigrationsTable` already exists. This is useful because it allows read only postgres
 | |
| 	// users to also check the current version of the schema. Previously, even if `MigrationsTable` existed, the
 | |
| 	// `CREATE TABLE IF NOT EXISTS...` query would fail because the user does not have the CREATE permission.
 | |
| 	// Taken from https://github.com/mattes/migrate/blob/master/database/postgres/postgres.go#L258
 | |
| 	query := `SELECT COUNT(1) FROM information_schema.tables WHERE table_schema = $1 AND table_name = $2 LIMIT 1`
 | |
| 	row := p.conn.QueryRowContext(context.Background(), query, p.config.migrationsSchemaName, p.config.migrationsTableName)
 | |
| 
 | |
| 	var count int
 | |
| 	err = row.Scan(&count)
 | |
| 	if err != nil {
 | |
| 		return &database.Error{OrigErr: err, Query: []byte(query)}
 | |
| 	}
 | |
| 
 | |
| 	if count == 1 {
 | |
| 		return nil
 | |
| 	}
 | |
| 
 | |
| 	query = `CREATE TABLE IF NOT EXISTS ` + pq.QuoteIdentifier(p.config.migrationsSchemaName) + `.` + pq.QuoteIdentifier(p.config.migrationsTableName) + ` (version bigint not null primary key, dirty boolean not null)`
 | |
| 	if _, err = p.conn.ExecContext(context.Background(), query); err != nil {
 | |
| 		return &database.Error{OrigErr: err, Query: []byte(query)}
 | |
| 	}
 | |
| 
 | |
| 	return nil
 | |
| }
 |