 131868bdca
			
		
	
	131868bdca
	
	
	
		
			
			Major security, observability, and configuration improvements:
## Security Hardening
- Implemented configurable CORS (no more wildcards)
- Added comprehensive auth middleware for admin endpoints
- Enhanced webhook HMAC validation
- Added input validation and rate limiting
- Security headers and CSP policies
## Configuration Management
- Made N8N webhook URL configurable (WHOOSH_N8N_BASE_URL)
- Replaced all hardcoded endpoints with environment variables
- Added feature flags for LLM vs heuristic composition
- Gitea fetch hardening with EAGER_FILTER and FULL_RESCAN options
## API Completeness
- Implemented GetCouncilComposition function
- Added GET /api/v1/councils/{id} endpoint
- Council artifacts API (POST/GET /api/v1/councils/{id}/artifacts)
- /admin/health/details endpoint with component status
- Database lookup for repository URLs (no hardcoded fallbacks)
## Observability & Performance
- Added OpenTelemetry distributed tracing with goal/pulse correlation
- Performance optimization database indexes
- Comprehensive health monitoring
- Enhanced logging and error handling
## Infrastructure
- Production-ready P2P discovery (replaces mock implementation)
- Removed unused Redis configuration
- Enhanced Docker Swarm integration
- Added migration files for performance indexes
## Code Quality
- Comprehensive input validation
- Graceful error handling and failsafe fallbacks
- Backwards compatibility maintained
- Following security best practices
🤖 Generated with [Claude Code](https://claude.ai/code)
Co-Authored-By: Claude <noreply@anthropic.com>
		
	
		
			
				
	
	
		
			696 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
			
		
		
	
	
			696 lines
		
	
	
		
			22 KiB
		
	
	
	
		
			Go
		
	
	
	
	
	
| package pgxpool
 | |
| 
 | |
| import (
 | |
| 	"context"
 | |
| 	"fmt"
 | |
| 	"math/rand"
 | |
| 	"runtime"
 | |
| 	"strconv"
 | |
| 	"sync"
 | |
| 	"sync/atomic"
 | |
| 	"time"
 | |
| 
 | |
| 	"github.com/jackc/pgx/v5"
 | |
| 	"github.com/jackc/pgx/v5/pgconn"
 | |
| 	"github.com/jackc/puddle/v2"
 | |
| )
 | |
| 
 | |
| var defaultMaxConns = int32(4)
 | |
| var defaultMinConns = int32(0)
 | |
| var defaultMaxConnLifetime = time.Hour
 | |
| var defaultMaxConnIdleTime = time.Minute * 30
 | |
| var defaultHealthCheckPeriod = time.Minute
 | |
| 
 | |
| type connResource struct {
 | |
| 	conn       *pgx.Conn
 | |
| 	conns      []Conn
 | |
| 	poolRows   []poolRow
 | |
| 	poolRowss  []poolRows
 | |
| 	maxAgeTime time.Time
 | |
| }
 | |
| 
 | |
| func (cr *connResource) getConn(p *Pool, res *puddle.Resource[*connResource]) *Conn {
 | |
| 	if len(cr.conns) == 0 {
 | |
| 		cr.conns = make([]Conn, 128)
 | |
| 	}
 | |
| 
 | |
| 	c := &cr.conns[len(cr.conns)-1]
 | |
| 	cr.conns = cr.conns[0 : len(cr.conns)-1]
 | |
| 
 | |
| 	c.res = res
 | |
| 	c.p = p
 | |
| 
 | |
| 	return c
 | |
| }
 | |
| 
 | |
| func (cr *connResource) getPoolRow(c *Conn, r pgx.Row) *poolRow {
 | |
| 	if len(cr.poolRows) == 0 {
 | |
| 		cr.poolRows = make([]poolRow, 128)
 | |
| 	}
 | |
| 
 | |
| 	pr := &cr.poolRows[len(cr.poolRows)-1]
 | |
| 	cr.poolRows = cr.poolRows[0 : len(cr.poolRows)-1]
 | |
| 
 | |
| 	pr.c = c
 | |
| 	pr.r = r
 | |
| 
 | |
| 	return pr
 | |
| }
 | |
| 
 | |
| func (cr *connResource) getPoolRows(c *Conn, r pgx.Rows) *poolRows {
 | |
| 	if len(cr.poolRowss) == 0 {
 | |
| 		cr.poolRowss = make([]poolRows, 128)
 | |
| 	}
 | |
| 
 | |
| 	pr := &cr.poolRowss[len(cr.poolRowss)-1]
 | |
| 	cr.poolRowss = cr.poolRowss[0 : len(cr.poolRowss)-1]
 | |
| 
 | |
| 	pr.c = c
 | |
| 	pr.r = r
 | |
| 
 | |
| 	return pr
 | |
| }
 | |
| 
 | |
| // Pool allows for connection reuse.
 | |
| type Pool struct {
 | |
| 	// 64 bit fields accessed with atomics must be at beginning of struct to guarantee alignment for certain 32-bit
 | |
| 	// architectures. See BUGS section of https://pkg.go.dev/sync/atomic and https://github.com/jackc/pgx/issues/1288.
 | |
| 	newConnsCount        int64
 | |
| 	lifetimeDestroyCount int64
 | |
| 	idleDestroyCount     int64
 | |
| 
 | |
| 	p                     *puddle.Pool[*connResource]
 | |
| 	config                *Config
 | |
| 	beforeConnect         func(context.Context, *pgx.ConnConfig) error
 | |
| 	afterConnect          func(context.Context, *pgx.Conn) error
 | |
| 	beforeAcquire         func(context.Context, *pgx.Conn) bool
 | |
| 	afterRelease          func(*pgx.Conn) bool
 | |
| 	beforeClose           func(*pgx.Conn)
 | |
| 	minConns              int32
 | |
| 	maxConns              int32
 | |
| 	maxConnLifetime       time.Duration
 | |
| 	maxConnLifetimeJitter time.Duration
 | |
| 	maxConnIdleTime       time.Duration
 | |
| 	healthCheckPeriod     time.Duration
 | |
| 
 | |
| 	healthCheckChan chan struct{}
 | |
| 
 | |
| 	closeOnce sync.Once
 | |
| 	closeChan chan struct{}
 | |
| }
 | |
| 
 | |
| // Config is the configuration struct for creating a pool. It must be created by [ParseConfig] and then it can be
 | |
| // modified.
 | |
| type Config struct {
 | |
| 	ConnConfig *pgx.ConnConfig
 | |
| 
 | |
| 	// BeforeConnect is called before a new connection is made. It is passed a copy of the underlying pgx.ConnConfig and
 | |
| 	// will not impact any existing open connections.
 | |
| 	BeforeConnect func(context.Context, *pgx.ConnConfig) error
 | |
| 
 | |
| 	// AfterConnect is called after a connection is established, but before it is added to the pool.
 | |
| 	AfterConnect func(context.Context, *pgx.Conn) error
 | |
| 
 | |
| 	// BeforeAcquire is called before a connection is acquired from the pool. It must return true to allow the
 | |
| 	// acquisition or false to indicate that the connection should be destroyed and a different connection should be
 | |
| 	// acquired.
 | |
| 	BeforeAcquire func(context.Context, *pgx.Conn) bool
 | |
| 
 | |
| 	// AfterRelease is called after a connection is released, but before it is returned to the pool. It must return true to
 | |
| 	// return the connection to the pool or false to destroy the connection.
 | |
| 	AfterRelease func(*pgx.Conn) bool
 | |
| 
 | |
| 	// BeforeClose is called right before a connection is closed and removed from the pool.
 | |
| 	BeforeClose func(*pgx.Conn)
 | |
| 
 | |
| 	// MaxConnLifetime is the duration since creation after which a connection will be automatically closed.
 | |
| 	MaxConnLifetime time.Duration
 | |
| 
 | |
| 	// MaxConnLifetimeJitter is the duration after MaxConnLifetime to randomly decide to close a connection.
 | |
| 	// This helps prevent all connections from being closed at the exact same time, starving the pool.
 | |
| 	MaxConnLifetimeJitter time.Duration
 | |
| 
 | |
| 	// MaxConnIdleTime is the duration after which an idle connection will be automatically closed by the health check.
 | |
| 	MaxConnIdleTime time.Duration
 | |
| 
 | |
| 	// MaxConns is the maximum size of the pool. The default is the greater of 4 or runtime.NumCPU().
 | |
| 	MaxConns int32
 | |
| 
 | |
| 	// MinConns is the minimum size of the pool. After connection closes, the pool might dip below MinConns. A low
 | |
| 	// number of MinConns might mean the pool is empty after MaxConnLifetime until the health check has a chance
 | |
| 	// to create new connections.
 | |
| 	MinConns int32
 | |
| 
 | |
| 	// HealthCheckPeriod is the duration between checks of the health of idle connections.
 | |
| 	HealthCheckPeriod time.Duration
 | |
| 
 | |
| 	createdByParseConfig bool // Used to enforce created by ParseConfig rule.
 | |
| }
 | |
| 
 | |
| // Copy returns a deep copy of the config that is safe to use and modify.
 | |
| // The only exception is the tls.Config:
 | |
| // according to the tls.Config docs it must not be modified after creation.
 | |
| func (c *Config) Copy() *Config {
 | |
| 	newConfig := new(Config)
 | |
| 	*newConfig = *c
 | |
| 	newConfig.ConnConfig = c.ConnConfig.Copy()
 | |
| 	return newConfig
 | |
| }
 | |
| 
 | |
| // ConnString returns the connection string as parsed by pgxpool.ParseConfig into pgxpool.Config.
 | |
| func (c *Config) ConnString() string { return c.ConnConfig.ConnString() }
 | |
| 
 | |
| // New creates a new Pool. See [ParseConfig] for information on connString format.
 | |
| func New(ctx context.Context, connString string) (*Pool, error) {
 | |
| 	config, err := ParseConfig(connString)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return NewWithConfig(ctx, config)
 | |
| }
 | |
| 
 | |
| // NewWithConfig creates a new Pool. config must have been created by [ParseConfig].
 | |
| func NewWithConfig(ctx context.Context, config *Config) (*Pool, error) {
 | |
| 	// Default values are set in ParseConfig. Enforce initial creation by ParseConfig rather than setting defaults from
 | |
| 	// zero values.
 | |
| 	if !config.createdByParseConfig {
 | |
| 		panic("config must be created by ParseConfig")
 | |
| 	}
 | |
| 
 | |
| 	p := &Pool{
 | |
| 		config:                config,
 | |
| 		beforeConnect:         config.BeforeConnect,
 | |
| 		afterConnect:          config.AfterConnect,
 | |
| 		beforeAcquire:         config.BeforeAcquire,
 | |
| 		afterRelease:          config.AfterRelease,
 | |
| 		beforeClose:           config.BeforeClose,
 | |
| 		minConns:              config.MinConns,
 | |
| 		maxConns:              config.MaxConns,
 | |
| 		maxConnLifetime:       config.MaxConnLifetime,
 | |
| 		maxConnLifetimeJitter: config.MaxConnLifetimeJitter,
 | |
| 		maxConnIdleTime:       config.MaxConnIdleTime,
 | |
| 		healthCheckPeriod:     config.HealthCheckPeriod,
 | |
| 		healthCheckChan:       make(chan struct{}, 1),
 | |
| 		closeChan:             make(chan struct{}),
 | |
| 	}
 | |
| 
 | |
| 	var err error
 | |
| 	p.p, err = puddle.NewPool(
 | |
| 		&puddle.Config[*connResource]{
 | |
| 			Constructor: func(ctx context.Context) (*connResource, error) {
 | |
| 				atomic.AddInt64(&p.newConnsCount, 1)
 | |
| 				connConfig := p.config.ConnConfig.Copy()
 | |
| 
 | |
| 				// Connection will continue in background even if Acquire is canceled. Ensure that a connect won't hang forever.
 | |
| 				if connConfig.ConnectTimeout <= 0 {
 | |
| 					connConfig.ConnectTimeout = 2 * time.Minute
 | |
| 				}
 | |
| 
 | |
| 				if p.beforeConnect != nil {
 | |
| 					if err := p.beforeConnect(ctx, connConfig); err != nil {
 | |
| 						return nil, err
 | |
| 					}
 | |
| 				}
 | |
| 
 | |
| 				conn, err := pgx.ConnectConfig(ctx, connConfig)
 | |
| 				if err != nil {
 | |
| 					return nil, err
 | |
| 				}
 | |
| 
 | |
| 				if p.afterConnect != nil {
 | |
| 					err = p.afterConnect(ctx, conn)
 | |
| 					if err != nil {
 | |
| 						conn.Close(ctx)
 | |
| 						return nil, err
 | |
| 					}
 | |
| 				}
 | |
| 
 | |
| 				jitterSecs := rand.Float64() * config.MaxConnLifetimeJitter.Seconds()
 | |
| 				maxAgeTime := time.Now().Add(config.MaxConnLifetime).Add(time.Duration(jitterSecs) * time.Second)
 | |
| 
 | |
| 				cr := &connResource{
 | |
| 					conn:       conn,
 | |
| 					conns:      make([]Conn, 64),
 | |
| 					poolRows:   make([]poolRow, 64),
 | |
| 					poolRowss:  make([]poolRows, 64),
 | |
| 					maxAgeTime: maxAgeTime,
 | |
| 				}
 | |
| 
 | |
| 				return cr, nil
 | |
| 			},
 | |
| 			Destructor: func(value *connResource) {
 | |
| 				ctx, cancel := context.WithTimeout(context.Background(), 15*time.Second)
 | |
| 				conn := value.conn
 | |
| 				if p.beforeClose != nil {
 | |
| 					p.beforeClose(conn)
 | |
| 				}
 | |
| 				conn.Close(ctx)
 | |
| 				select {
 | |
| 				case <-conn.PgConn().CleanupDone():
 | |
| 				case <-ctx.Done():
 | |
| 				}
 | |
| 				cancel()
 | |
| 			},
 | |
| 			MaxSize: config.MaxConns,
 | |
| 		},
 | |
| 	)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	go func() {
 | |
| 		p.createIdleResources(ctx, int(p.minConns))
 | |
| 		p.backgroundHealthCheck()
 | |
| 	}()
 | |
| 
 | |
| 	return p, nil
 | |
| }
 | |
| 
 | |
| // ParseConfig builds a Config from connString. It parses connString with the same behavior as [pgx.ParseConfig] with the
 | |
| // addition of the following variables:
 | |
| //
 | |
| //   - pool_max_conns: integer greater than 0
 | |
| //   - pool_min_conns: integer 0 or greater
 | |
| //   - pool_max_conn_lifetime: duration string
 | |
| //   - pool_max_conn_idle_time: duration string
 | |
| //   - pool_health_check_period: duration string
 | |
| //   - pool_max_conn_lifetime_jitter: duration string
 | |
| //
 | |
| // See Config for definitions of these arguments.
 | |
| //
 | |
| //	# Example DSN
 | |
| //	user=jack password=secret host=pg.example.com port=5432 dbname=mydb sslmode=verify-ca pool_max_conns=10
 | |
| //
 | |
| //	# Example URL
 | |
| //	postgres://jack:secret@pg.example.com:5432/mydb?sslmode=verify-ca&pool_max_conns=10
 | |
| func ParseConfig(connString string) (*Config, error) {
 | |
| 	connConfig, err := pgx.ParseConfig(connString)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	config := &Config{
 | |
| 		ConnConfig:           connConfig,
 | |
| 		createdByParseConfig: true,
 | |
| 	}
 | |
| 
 | |
| 	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conns"]; ok {
 | |
| 		delete(connConfig.Config.RuntimeParams, "pool_max_conns")
 | |
| 		n, err := strconv.ParseInt(s, 10, 32)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("cannot parse pool_max_conns: %w", err)
 | |
| 		}
 | |
| 		if n < 1 {
 | |
| 			return nil, fmt.Errorf("pool_max_conns too small: %d", n)
 | |
| 		}
 | |
| 		config.MaxConns = int32(n)
 | |
| 	} else {
 | |
| 		config.MaxConns = defaultMaxConns
 | |
| 		if numCPU := int32(runtime.NumCPU()); numCPU > config.MaxConns {
 | |
| 			config.MaxConns = numCPU
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_min_conns"]; ok {
 | |
| 		delete(connConfig.Config.RuntimeParams, "pool_min_conns")
 | |
| 		n, err := strconv.ParseInt(s, 10, 32)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("cannot parse pool_min_conns: %w", err)
 | |
| 		}
 | |
| 		config.MinConns = int32(n)
 | |
| 	} else {
 | |
| 		config.MinConns = defaultMinConns
 | |
| 	}
 | |
| 
 | |
| 	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime"]; ok {
 | |
| 		delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime")
 | |
| 		d, err := time.ParseDuration(s)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("invalid pool_max_conn_lifetime: %w", err)
 | |
| 		}
 | |
| 		config.MaxConnLifetime = d
 | |
| 	} else {
 | |
| 		config.MaxConnLifetime = defaultMaxConnLifetime
 | |
| 	}
 | |
| 
 | |
| 	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_idle_time"]; ok {
 | |
| 		delete(connConfig.Config.RuntimeParams, "pool_max_conn_idle_time")
 | |
| 		d, err := time.ParseDuration(s)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("invalid pool_max_conn_idle_time: %w", err)
 | |
| 		}
 | |
| 		config.MaxConnIdleTime = d
 | |
| 	} else {
 | |
| 		config.MaxConnIdleTime = defaultMaxConnIdleTime
 | |
| 	}
 | |
| 
 | |
| 	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_health_check_period"]; ok {
 | |
| 		delete(connConfig.Config.RuntimeParams, "pool_health_check_period")
 | |
| 		d, err := time.ParseDuration(s)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("invalid pool_health_check_period: %w", err)
 | |
| 		}
 | |
| 		config.HealthCheckPeriod = d
 | |
| 	} else {
 | |
| 		config.HealthCheckPeriod = defaultHealthCheckPeriod
 | |
| 	}
 | |
| 
 | |
| 	if s, ok := config.ConnConfig.Config.RuntimeParams["pool_max_conn_lifetime_jitter"]; ok {
 | |
| 		delete(connConfig.Config.RuntimeParams, "pool_max_conn_lifetime_jitter")
 | |
| 		d, err := time.ParseDuration(s)
 | |
| 		if err != nil {
 | |
| 			return nil, fmt.Errorf("invalid pool_max_conn_lifetime_jitter: %w", err)
 | |
| 		}
 | |
| 		config.MaxConnLifetimeJitter = d
 | |
| 	}
 | |
| 
 | |
| 	return config, nil
 | |
| }
 | |
| 
 | |
| // Close closes all connections in the pool and rejects future Acquire calls. Blocks until all connections are returned
 | |
| // to pool and closed.
 | |
| func (p *Pool) Close() {
 | |
| 	p.closeOnce.Do(func() {
 | |
| 		close(p.closeChan)
 | |
| 		p.p.Close()
 | |
| 	})
 | |
| }
 | |
| 
 | |
| func (p *Pool) isExpired(res *puddle.Resource[*connResource]) bool {
 | |
| 	return time.Now().After(res.Value().maxAgeTime)
 | |
| }
 | |
| 
 | |
| func (p *Pool) triggerHealthCheck() {
 | |
| 	go func() {
 | |
| 		// Destroy is asynchronous so we give it time to actually remove itself from
 | |
| 		// the pool otherwise we might try to check the pool size too soon
 | |
| 		time.Sleep(500 * time.Millisecond)
 | |
| 		select {
 | |
| 		case p.healthCheckChan <- struct{}{}:
 | |
| 		default:
 | |
| 		}
 | |
| 	}()
 | |
| }
 | |
| 
 | |
| func (p *Pool) backgroundHealthCheck() {
 | |
| 	ticker := time.NewTicker(p.healthCheckPeriod)
 | |
| 	defer ticker.Stop()
 | |
| 	for {
 | |
| 		select {
 | |
| 		case <-p.closeChan:
 | |
| 			return
 | |
| 		case <-p.healthCheckChan:
 | |
| 			p.checkHealth()
 | |
| 		case <-ticker.C:
 | |
| 			p.checkHealth()
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| func (p *Pool) checkHealth() {
 | |
| 	for {
 | |
| 		// If checkMinConns failed we don't destroy any connections since we couldn't
 | |
| 		// even get to minConns
 | |
| 		if err := p.checkMinConns(); err != nil {
 | |
| 			// Should we log this error somewhere?
 | |
| 			break
 | |
| 		}
 | |
| 		if !p.checkConnsHealth() {
 | |
| 			// Since we didn't destroy any connections we can stop looping
 | |
| 			break
 | |
| 		}
 | |
| 		// Technically Destroy is asynchronous but 500ms should be enough for it to
 | |
| 		// remove it from the underlying pool
 | |
| 		select {
 | |
| 		case <-p.closeChan:
 | |
| 			return
 | |
| 		case <-time.After(500 * time.Millisecond):
 | |
| 		}
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // checkConnsHealth will check all idle connections, destroy a connection if
 | |
| // it's idle or too old, and returns true if any were destroyed
 | |
| func (p *Pool) checkConnsHealth() bool {
 | |
| 	var destroyed bool
 | |
| 	totalConns := p.Stat().TotalConns()
 | |
| 	resources := p.p.AcquireAllIdle()
 | |
| 	for _, res := range resources {
 | |
| 		// We're okay going under minConns if the lifetime is up
 | |
| 		if p.isExpired(res) && totalConns >= p.minConns {
 | |
| 			atomic.AddInt64(&p.lifetimeDestroyCount, 1)
 | |
| 			res.Destroy()
 | |
| 			destroyed = true
 | |
| 			// Since Destroy is async we manually decrement totalConns.
 | |
| 			totalConns--
 | |
| 		} else if res.IdleDuration() > p.maxConnIdleTime && totalConns > p.minConns {
 | |
| 			atomic.AddInt64(&p.idleDestroyCount, 1)
 | |
| 			res.Destroy()
 | |
| 			destroyed = true
 | |
| 			// Since Destroy is async we manually decrement totalConns.
 | |
| 			totalConns--
 | |
| 		} else {
 | |
| 			res.ReleaseUnused()
 | |
| 		}
 | |
| 	}
 | |
| 	return destroyed
 | |
| }
 | |
| 
 | |
| func (p *Pool) checkMinConns() error {
 | |
| 	// TotalConns can include ones that are being destroyed but we should have
 | |
| 	// sleep(500ms) around all of the destroys to help prevent that from throwing
 | |
| 	// off this check
 | |
| 	toCreate := p.minConns - p.Stat().TotalConns()
 | |
| 	if toCreate > 0 {
 | |
| 		return p.createIdleResources(context.Background(), int(toCreate))
 | |
| 	}
 | |
| 	return nil
 | |
| }
 | |
| 
 | |
| func (p *Pool) createIdleResources(parentCtx context.Context, targetResources int) error {
 | |
| 	ctx, cancel := context.WithCancel(parentCtx)
 | |
| 	defer cancel()
 | |
| 
 | |
| 	errs := make(chan error, targetResources)
 | |
| 
 | |
| 	for i := 0; i < targetResources; i++ {
 | |
| 		go func() {
 | |
| 			err := p.p.CreateResource(ctx)
 | |
| 			// Ignore ErrNotAvailable since it means that the pool has become full since we started creating resource.
 | |
| 			if err == puddle.ErrNotAvailable {
 | |
| 				err = nil
 | |
| 			}
 | |
| 			errs <- err
 | |
| 		}()
 | |
| 	}
 | |
| 
 | |
| 	var firstError error
 | |
| 	for i := 0; i < targetResources; i++ {
 | |
| 		err := <-errs
 | |
| 		if err != nil && firstError == nil {
 | |
| 			cancel()
 | |
| 			firstError = err
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return firstError
 | |
| }
 | |
| 
 | |
| // Acquire returns a connection (*Conn) from the Pool
 | |
| func (p *Pool) Acquire(ctx context.Context) (*Conn, error) {
 | |
| 	for {
 | |
| 		res, err := p.p.Acquire(ctx)
 | |
| 		if err != nil {
 | |
| 			return nil, err
 | |
| 		}
 | |
| 
 | |
| 		cr := res.Value()
 | |
| 
 | |
| 		if res.IdleDuration() > time.Second {
 | |
| 			err := cr.conn.Ping(ctx)
 | |
| 			if err != nil {
 | |
| 				res.Destroy()
 | |
| 				continue
 | |
| 			}
 | |
| 		}
 | |
| 
 | |
| 		if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) {
 | |
| 			return cr.getConn(p, res), nil
 | |
| 		}
 | |
| 
 | |
| 		res.Destroy()
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // AcquireFunc acquires a *Conn and calls f with that *Conn. ctx will only affect the Acquire. It has no effect on the
 | |
| // call of f. The return value is either an error acquiring the *Conn or the return value of f. The *Conn is
 | |
| // automatically released after the call of f.
 | |
| func (p *Pool) AcquireFunc(ctx context.Context, f func(*Conn) error) error {
 | |
| 	conn, err := p.Acquire(ctx)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer conn.Release()
 | |
| 
 | |
| 	return f(conn)
 | |
| }
 | |
| 
 | |
| // AcquireAllIdle atomically acquires all currently idle connections. Its intended use is for health check and
 | |
| // keep-alive functionality. It does not update pool statistics.
 | |
| func (p *Pool) AcquireAllIdle(ctx context.Context) []*Conn {
 | |
| 	resources := p.p.AcquireAllIdle()
 | |
| 	conns := make([]*Conn, 0, len(resources))
 | |
| 	for _, res := range resources {
 | |
| 		cr := res.Value()
 | |
| 		if p.beforeAcquire == nil || p.beforeAcquire(ctx, cr.conn) {
 | |
| 			conns = append(conns, cr.getConn(p, res))
 | |
| 		} else {
 | |
| 			res.Destroy()
 | |
| 		}
 | |
| 	}
 | |
| 
 | |
| 	return conns
 | |
| }
 | |
| 
 | |
| // Reset closes all connections, but leaves the pool open. It is intended for use when an error is detected that would
 | |
| // disrupt all connections (such as a network interruption or a server state change).
 | |
| //
 | |
| // It is safe to reset a pool while connections are checked out. Those connections will be closed when they are returned
 | |
| // to the pool.
 | |
| func (p *Pool) Reset() {
 | |
| 	p.p.Reset()
 | |
| }
 | |
| 
 | |
| // Config returns a copy of config that was used to initialize this pool.
 | |
| func (p *Pool) Config() *Config { return p.config.Copy() }
 | |
| 
 | |
| // Stat returns a pgxpool.Stat struct with a snapshot of Pool statistics.
 | |
| func (p *Pool) Stat() *Stat {
 | |
| 	return &Stat{
 | |
| 		s:                    p.p.Stat(),
 | |
| 		newConnsCount:        atomic.LoadInt64(&p.newConnsCount),
 | |
| 		lifetimeDestroyCount: atomic.LoadInt64(&p.lifetimeDestroyCount),
 | |
| 		idleDestroyCount:     atomic.LoadInt64(&p.idleDestroyCount),
 | |
| 	}
 | |
| }
 | |
| 
 | |
| // Exec acquires a connection from the Pool and executes the given SQL.
 | |
| // SQL can be either a prepared statement name or an SQL string.
 | |
| // Arguments should be referenced positionally from the SQL string as $1, $2, etc.
 | |
| // The acquired connection is returned to the pool when the Exec function returns.
 | |
| func (p *Pool) Exec(ctx context.Context, sql string, arguments ...any) (pgconn.CommandTag, error) {
 | |
| 	c, err := p.Acquire(ctx)
 | |
| 	if err != nil {
 | |
| 		return pgconn.CommandTag{}, err
 | |
| 	}
 | |
| 	defer c.Release()
 | |
| 
 | |
| 	return c.Exec(ctx, sql, arguments...)
 | |
| }
 | |
| 
 | |
| // Query acquires a connection and executes a query that returns pgx.Rows.
 | |
| // Arguments should be referenced positionally from the SQL string as $1, $2, etc.
 | |
| // See pgx.Rows documentation to close the returned Rows and return the acquired connection to the Pool.
 | |
| //
 | |
| // If there is an error, the returned pgx.Rows will be returned in an error state.
 | |
| // If preferred, ignore the error returned from Query and handle errors using the returned pgx.Rows.
 | |
| //
 | |
| // For extra control over how the query is executed, the types QuerySimpleProtocol, QueryResultFormats, and
 | |
| // QueryResultFormatsByOID may be used as the first args to control exactly how the query is executed. This is rarely
 | |
| // needed. See the documentation for those types for details.
 | |
| func (p *Pool) Query(ctx context.Context, sql string, args ...any) (pgx.Rows, error) {
 | |
| 	c, err := p.Acquire(ctx)
 | |
| 	if err != nil {
 | |
| 		return errRows{err: err}, err
 | |
| 	}
 | |
| 
 | |
| 	rows, err := c.Query(ctx, sql, args...)
 | |
| 	if err != nil {
 | |
| 		c.Release()
 | |
| 		return errRows{err: err}, err
 | |
| 	}
 | |
| 
 | |
| 	return c.getPoolRows(rows), nil
 | |
| }
 | |
| 
 | |
| // QueryRow acquires a connection and executes a query that is expected
 | |
| // to return at most one row (pgx.Row). Errors are deferred until pgx.Row's
 | |
| // Scan method is called. If the query selects no rows, pgx.Row's Scan will
 | |
| // return ErrNoRows. Otherwise, pgx.Row's Scan scans the first selected row
 | |
| // and discards the rest. The acquired connection is returned to the Pool when
 | |
| // pgx.Row's Scan method is called.
 | |
| //
 | |
| // Arguments should be referenced positionally from the SQL string as $1, $2, etc.
 | |
| //
 | |
| // For extra control over how the query is executed, the types QuerySimpleProtocol, QueryResultFormats, and
 | |
| // QueryResultFormatsByOID may be used as the first args to control exactly how the query is executed. This is rarely
 | |
| // needed. See the documentation for those types for details.
 | |
| func (p *Pool) QueryRow(ctx context.Context, sql string, args ...any) pgx.Row {
 | |
| 	c, err := p.Acquire(ctx)
 | |
| 	if err != nil {
 | |
| 		return errRow{err: err}
 | |
| 	}
 | |
| 
 | |
| 	row := c.QueryRow(ctx, sql, args...)
 | |
| 	return c.getPoolRow(row)
 | |
| }
 | |
| 
 | |
| func (p *Pool) SendBatch(ctx context.Context, b *pgx.Batch) pgx.BatchResults {
 | |
| 	c, err := p.Acquire(ctx)
 | |
| 	if err != nil {
 | |
| 		return errBatchResults{err: err}
 | |
| 	}
 | |
| 
 | |
| 	br := c.SendBatch(ctx, b)
 | |
| 	return &poolBatchResults{br: br, c: c}
 | |
| }
 | |
| 
 | |
| // Begin acquires a connection from the Pool and starts a transaction. Unlike database/sql, the context only affects the begin command. i.e. there is no
 | |
| // auto-rollback on context cancellation. Begin initiates a transaction block without explicitly setting a transaction mode for the block (see BeginTx with TxOptions if transaction mode is required).
 | |
| // *pgxpool.Tx is returned, which implements the pgx.Tx interface.
 | |
| // Commit or Rollback must be called on the returned transaction to finalize the transaction block.
 | |
| func (p *Pool) Begin(ctx context.Context) (pgx.Tx, error) {
 | |
| 	return p.BeginTx(ctx, pgx.TxOptions{})
 | |
| }
 | |
| 
 | |
| // BeginTx acquires a connection from the Pool and starts a transaction with pgx.TxOptions determining the transaction mode.
 | |
| // Unlike database/sql, the context only affects the begin command. i.e. there is no auto-rollback on context cancellation.
 | |
| // *pgxpool.Tx is returned, which implements the pgx.Tx interface.
 | |
| // Commit or Rollback must be called on the returned transaction to finalize the transaction block.
 | |
| func (p *Pool) BeginTx(ctx context.Context, txOptions pgx.TxOptions) (pgx.Tx, error) {
 | |
| 	c, err := p.Acquire(ctx)
 | |
| 	if err != nil {
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	t, err := c.BeginTx(ctx, txOptions)
 | |
| 	if err != nil {
 | |
| 		c.Release()
 | |
| 		return nil, err
 | |
| 	}
 | |
| 
 | |
| 	return &Tx{t: t, c: c}, nil
 | |
| }
 | |
| 
 | |
| func (p *Pool) CopyFrom(ctx context.Context, tableName pgx.Identifier, columnNames []string, rowSrc pgx.CopyFromSource) (int64, error) {
 | |
| 	c, err := p.Acquire(ctx)
 | |
| 	if err != nil {
 | |
| 		return 0, err
 | |
| 	}
 | |
| 	defer c.Release()
 | |
| 
 | |
| 	return c.Conn().CopyFrom(ctx, tableName, columnNames, rowSrc)
 | |
| }
 | |
| 
 | |
| // Ping acquires a connection from the Pool and executes an empty sql statement against it.
 | |
| // If the sql returns without error, the database Ping is considered successful, otherwise, the error is returned.
 | |
| func (p *Pool) Ping(ctx context.Context) error {
 | |
| 	c, err := p.Acquire(ctx)
 | |
| 	if err != nil {
 | |
| 		return err
 | |
| 	}
 | |
| 	defer c.Release()
 | |
| 	return c.Ping(ctx)
 | |
| }
 |