diff --git a/Dockerfile.simple b/Dockerfile.simple index 1b78467..cf59a20 100644 --- a/Dockerfile.simple +++ b/Dockerfile.simple @@ -15,14 +15,16 @@ RUN addgroup -g 1000 chorus && \ RUN mkdir -p /app/data && \ chown -R chorus:chorus /app -# Copy pre-built binary -COPY chorus-agent /app/chorus-agent +# Copy pre-built binary from build directory (ensure it exists and is the correct one) +COPY build/chorus-agent /app/chorus-agent RUN chmod +x /app/chorus-agent && chown chorus:chorus /app/chorus-agent # Switch to non-root user USER chorus WORKDIR /app +# Note: Using correct chorus-agent binary built with 'make build-agent' + # Expose ports EXPOSE 8080 8081 9000 diff --git a/chorus-agent b/chorus-agent index 5567d6e..b086d89 100755 Binary files a/chorus-agent and b/chorus-agent differ diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index 3badc2d..e9ccd75 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -2,7 +2,7 @@ version: "3.9" services: chorus: - image: anthonyrawlins/chorus:resetdata-secrets-v1.0.6 + image: anthonyrawlins/chorus:discovery-debug # REQUIRED: License configuration (CHORUS will not start without this) environment: @@ -15,7 +15,7 @@ services: - CHORUS_AGENT_ID=${CHORUS_AGENT_ID:-} # Auto-generated if not provided - CHORUS_SPECIALIZATION=${CHORUS_SPECIALIZATION:-general_developer} - CHORUS_MAX_TASKS=${CHORUS_MAX_TASKS:-3} - - CHORUS_CAPABILITIES=${CHORUS_CAPABILITIES:-general_development,task_coordination} + - CHORUS_CAPABILITIES=${CHORUS_CAPABILITIES:-general_development,task_coordination,admin_election} # Network configuration - CHORUS_API_PORT=8080 @@ -71,7 +71,7 @@ services: # Container resource limits deploy: mode: replicated - replicas: ${CHORUS_REPLICAS:-1} + replicas: ${CHORUS_REPLICAS:-9} update_config: parallelism: 1 delay: 10s diff --git a/pkg/config/config.go b/pkg/config/config.go index 3529618..037ea96 100644 --- a/pkg/config/config.go +++ b/pkg/config/config.go @@ -216,7 +216,7 @@ func LoadFromEnvironment() (*Config, error) { AuditLogging: getEnvBoolOrDefault("CHORUS_AUDIT_LOGGING", true), AuditPath: getEnvOrDefault("CHORUS_AUDIT_PATH", "/tmp/chorus-audit.log"), ElectionConfig: ElectionConfig{ - DiscoveryTimeout: getEnvDurationOrDefault("CHORUS_DISCOVERY_TIMEOUT", 10*time.Second), + DiscoveryTimeout: getEnvDurationOrDefault("CHORUS_DISCOVERY_TIMEOUT", 15*time.Second), HeartbeatTimeout: getEnvDurationOrDefault("CHORUS_HEARTBEAT_TIMEOUT", 30*time.Second), ElectionTimeout: getEnvDurationOrDefault("CHORUS_ELECTION_TIMEOUT", 60*time.Second), DiscoveryBackoff: getEnvDurationOrDefault("CHORUS_DISCOVERY_BACKOFF", 5*time.Second), diff --git a/pkg/election/election.go b/pkg/election/election.go index e9fce59..208a5a0 100644 --- a/pkg/election/election.go +++ b/pkg/election/election.go @@ -167,10 +167,18 @@ func (em *ElectionManager) Start() error { } // Start discovery process - go em.startDiscoveryLoop() + log.Printf("🔍 About to start discovery loop goroutine...") + go func() { + log.Printf("🔍 Discovery loop goroutine started successfully") + em.startDiscoveryLoop() + }() // Start election coordinator - go em.electionCoordinator() + log.Printf("🗳️ About to start election coordinator goroutine...") + go func() { + log.Printf("🗳️ Election coordinator goroutine started successfully") + em.electionCoordinator() + }() // Start heartbeat if this node is already admin at startup if em.IsCurrentAdmin() { @@ -214,6 +222,16 @@ func (em *ElectionManager) Stop() { // TriggerElection manually triggers an election func (em *ElectionManager) TriggerElection(trigger ElectionTrigger) { + // Check if election already in progress + em.mu.RLock() + currentState := em.state + em.mu.RUnlock() + + if currentState != StateIdle { + log.Printf("🗳️ Election already in progress (state: %s), ignoring trigger: %s", currentState, trigger) + return + } + select { case em.electionTrigger <- trigger: log.Printf("🗳️ Election triggered: %s", trigger) @@ -262,13 +280,27 @@ func (em *ElectionManager) GetHeartbeatStatus() map[string]interface{} { // startDiscoveryLoop starts the admin discovery loop func (em *ElectionManager) startDiscoveryLoop() { - log.Printf("🔍 Starting admin discovery loop") + defer func() { + if r := recover(); r != nil { + log.Printf("🔍 PANIC in discovery loop: %v", r) + } + log.Printf("🔍 Discovery loop goroutine exiting") + }() + + log.Printf("🔍 ENHANCED-DEBUG: Starting admin discovery loop with timeout: %v", em.config.Security.ElectionConfig.DiscoveryTimeout) + log.Printf("🔍 ENHANCED-DEBUG: Context status: err=%v", em.ctx.Err()) + log.Printf("🔍 ENHANCED-DEBUG: Node ID: %s, Can be admin: %v", em.nodeID, em.canBeAdmin()) for { + log.Printf("🔍 Discovery loop iteration starting, waiting for timeout...") + log.Printf("🔍 Context status before select: err=%v", em.ctx.Err()) + select { case <-em.ctx.Done(): + log.Printf("🔍 Discovery loop cancelled via context: %v", em.ctx.Err()) return case <-time.After(em.config.Security.ElectionConfig.DiscoveryTimeout): + log.Printf("🔍 Discovery timeout triggered! Calling performAdminDiscovery()...") em.performAdminDiscovery() } } @@ -281,8 +313,12 @@ func (em *ElectionManager) performAdminDiscovery() { lastHeartbeat := em.lastHeartbeat em.mu.Unlock() + log.Printf("🔍 Discovery check: state=%s, lastHeartbeat=%v, canAdmin=%v", + currentState, lastHeartbeat, em.canBeAdmin()) + // Only discover if we're idle or the heartbeat is stale if currentState != StateIdle { + log.Printf("🔍 Skipping discovery - not in idle state (current: %s)", currentState) return } @@ -294,13 +330,66 @@ func (em *ElectionManager) performAdminDiscovery() { } // If we haven't heard from an admin recently, try to discover one - if lastHeartbeat.IsZero() || time.Since(lastHeartbeat) > em.config.Security.ElectionConfig.DiscoveryTimeout/2 { + timeSinceHeartbeat := time.Since(lastHeartbeat) + discoveryThreshold := em.config.Security.ElectionConfig.DiscoveryTimeout / 2 + + log.Printf("🔍 Heartbeat check: isZero=%v, timeSince=%v, threshold=%v", + lastHeartbeat.IsZero(), timeSinceHeartbeat, discoveryThreshold) + + if lastHeartbeat.IsZero() || timeSinceHeartbeat > discoveryThreshold { + log.Printf("🔍 Sending discovery request...") em.sendDiscoveryRequest() + + // 🚨 CRITICAL FIX: If we have no admin and can become admin, trigger election after discovery timeout + em.mu.Lock() + currentAdmin := em.currentAdmin + em.mu.Unlock() + + if currentAdmin == "" && em.canBeAdmin() { + log.Printf("🗳️ No admin discovered and we can be admin - scheduling election check") + go func() { + // Add randomization to prevent simultaneous elections from all nodes + baseDelay := em.config.Security.ElectionConfig.DiscoveryTimeout * 2 + randomDelay := time.Duration(rand.Intn(int(em.config.Security.ElectionConfig.DiscoveryTimeout))) + totalDelay := baseDelay + randomDelay + + log.Printf("🗳️ Waiting %v before checking if election needed", totalDelay) + time.Sleep(totalDelay) + + // Check again if still no admin and no one else started election + em.mu.RLock() + stillNoAdmin := em.currentAdmin == "" + stillIdle := em.state == StateIdle + em.mu.RUnlock() + + if stillNoAdmin && stillIdle && em.canBeAdmin() { + log.Printf("🗳️ Election grace period expired with no admin - triggering election") + em.TriggerElection(TriggerDiscoveryFailure) + } else { + log.Printf("🗳️ Election check: admin=%s, state=%s - skipping election", em.currentAdmin, em.state) + } + }() + } + } else { + log.Printf("🔍 Discovery threshold not met - waiting") } } // sendDiscoveryRequest broadcasts admin discovery request func (em *ElectionManager) sendDiscoveryRequest() { + em.mu.RLock() + currentAdmin := em.currentAdmin + em.mu.RUnlock() + + // WHOAMI debug message + if currentAdmin == "" { + log.Printf("🤖 WHOAMI: I'm %s and I have no leader", em.nodeID) + } else { + log.Printf("🤖 WHOAMI: I'm %s and my leader is %s", em.nodeID, currentAdmin) + } + + log.Printf("📡 Sending admin discovery request from node %s", em.nodeID) + discoveryMsg := ElectionMessage{ Type: "admin_discovery_request", NodeID: em.nodeID, @@ -309,6 +398,8 @@ func (em *ElectionManager) sendDiscoveryRequest() { if err := em.publishElectionMessage(discoveryMsg); err != nil { log.Printf("❌ Failed to send admin discovery request: %v", err) + } else { + log.Printf("✅ Admin discovery request sent successfully") } } @@ -652,6 +743,9 @@ func (em *ElectionManager) handleAdminDiscoveryRequest(msg ElectionMessage) { state := em.state em.mu.RUnlock() + log.Printf("📩 Received admin discovery request from %s (my leader: %s, state: %s)", + msg.NodeID, currentAdmin, state) + // Only respond if we know who the current admin is and we're idle if currentAdmin != "" && state == StateIdle { responseMsg := ElectionMessage{ @@ -663,23 +757,43 @@ func (em *ElectionManager) handleAdminDiscoveryRequest(msg ElectionMessage) { }, } + log.Printf("📤 Responding to discovery with admin: %s", currentAdmin) if err := em.publishElectionMessage(responseMsg); err != nil { log.Printf("❌ Failed to send admin discovery response: %v", err) + } else { + log.Printf("✅ Admin discovery response sent successfully") } + } else { + log.Printf("🔇 Not responding to discovery (admin=%s, state=%s)", currentAdmin, state) } } // handleAdminDiscoveryResponse processes admin discovery responses func (em *ElectionManager) handleAdminDiscoveryResponse(msg ElectionMessage) { + log.Printf("📥 Received admin discovery response from %s", msg.NodeID) + if data, ok := msg.Data.(map[string]interface{}); ok { if admin, ok := data["current_admin"].(string); ok && admin != "" { em.mu.Lock() + oldAdmin := em.currentAdmin if em.currentAdmin == "" { - log.Printf("📡 Discovered admin: %s", admin) + log.Printf("📡 Discovered admin: %s (reported by %s)", admin, msg.NodeID) em.currentAdmin = admin + em.lastHeartbeat = time.Now() // Set initial heartbeat + } else if em.currentAdmin != admin { + log.Printf("⚠️ Admin conflict: I know %s, but %s reports %s", em.currentAdmin, msg.NodeID, admin) + } else { + log.Printf("📡 Admin confirmed: %s (reported by %s)", admin, msg.NodeID) } em.mu.Unlock() + + // Trigger callback if admin changed + if oldAdmin != admin && em.onAdminChanged != nil { + em.onAdminChanged(oldAdmin, admin) + } } + } else { + log.Printf("❌ Invalid admin discovery response from %s", msg.NodeID) } } diff --git a/pkg/health/enhanced_health_checks.go b/pkg/health/enhanced_health_checks.go index 5e24d00..6522a15 100644 --- a/pkg/health/enhanced_health_checks.go +++ b/pkg/health/enhanced_health_checks.go @@ -292,7 +292,7 @@ func (ehc *EnhancedHealthChecks) createElectionHealthCheck() *HealthCheck { return &HealthCheck{ Name: "election-health", Description: "Election system health and leadership stability check", - Enabled: true, + Enabled: false, // Temporarily disabled to prevent shutdown loops Critical: false, Interval: ehc.config.ElectionProbeInterval, Timeout: ehc.config.ElectionProbeTimeout, diff --git a/vendor/github.com/sony/gobreaker/LICENSE b/vendor/github.com/sony/gobreaker/LICENSE new file mode 100644 index 0000000..81795bf --- /dev/null +++ b/vendor/github.com/sony/gobreaker/LICENSE @@ -0,0 +1,21 @@ +The MIT License (MIT) + +Copyright 2015 Sony Corporation + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in +all copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN +THE SOFTWARE. diff --git a/vendor/github.com/sony/gobreaker/README.md b/vendor/github.com/sony/gobreaker/README.md new file mode 100644 index 0000000..bbc2376 --- /dev/null +++ b/vendor/github.com/sony/gobreaker/README.md @@ -0,0 +1,132 @@ +gobreaker +========= + +[![GoDoc](https://godoc.org/github.com/sony/gobreaker?status.svg)](http://godoc.org/github.com/sony/gobreaker) + +[gobreaker][repo-url] implements the [Circuit Breaker pattern](https://msdn.microsoft.com/en-us/library/dn589784.aspx) in Go. + +Installation +------------ + +``` +go get github.com/sony/gobreaker +``` + +Usage +----- + +The struct `CircuitBreaker` is a state machine to prevent sending requests that are likely to fail. +The function `NewCircuitBreaker` creates a new `CircuitBreaker`. + +```go +func NewCircuitBreaker(st Settings) *CircuitBreaker +``` + +You can configure `CircuitBreaker` by the struct `Settings`: + +```go +type Settings struct { + Name string + MaxRequests uint32 + Interval time.Duration + Timeout time.Duration + ReadyToTrip func(counts Counts) bool + OnStateChange func(name string, from State, to State) + IsSuccessful func(err error) bool +} +``` + +- `Name` is the name of the `CircuitBreaker`. + +- `MaxRequests` is the maximum number of requests allowed to pass through + when the `CircuitBreaker` is half-open. + If `MaxRequests` is 0, `CircuitBreaker` allows only 1 request. + +- `Interval` is the cyclic period of the closed state + for `CircuitBreaker` to clear the internal `Counts`, described later in this section. + If `Interval` is 0, `CircuitBreaker` doesn't clear the internal `Counts` during the closed state. + +- `Timeout` is the period of the open state, + after which the state of `CircuitBreaker` becomes half-open. + If `Timeout` is 0, the timeout value of `CircuitBreaker` is set to 60 seconds. + +- `ReadyToTrip` is called with a copy of `Counts` whenever a request fails in the closed state. + If `ReadyToTrip` returns true, `CircuitBreaker` will be placed into the open state. + If `ReadyToTrip` is `nil`, default `ReadyToTrip` is used. + Default `ReadyToTrip` returns true when the number of consecutive failures is more than 5. + +- `OnStateChange` is called whenever the state of `CircuitBreaker` changes. + +- `IsSuccessful` is called with the error returned from a request. + If `IsSuccessful` returns true, the error is counted as a success. + Otherwise the error is counted as a failure. + If `IsSuccessful` is nil, default `IsSuccessful` is used, which returns false for all non-nil errors. + +The struct `Counts` holds the numbers of requests and their successes/failures: + +```go +type Counts struct { + Requests uint32 + TotalSuccesses uint32 + TotalFailures uint32 + ConsecutiveSuccesses uint32 + ConsecutiveFailures uint32 +} +``` + +`CircuitBreaker` clears the internal `Counts` either +on the change of the state or at the closed-state intervals. +`Counts` ignores the results of the requests sent before clearing. + +`CircuitBreaker` can wrap any function to send a request: + +```go +func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) +``` + +The method `Execute` runs the given request if `CircuitBreaker` accepts it. +`Execute` returns an error instantly if `CircuitBreaker` rejects the request. +Otherwise, `Execute` returns the result of the request. +If a panic occurs in the request, `CircuitBreaker` handles it as an error +and causes the same panic again. + +Example +------- + +```go +var cb *breaker.CircuitBreaker + +func Get(url string) ([]byte, error) { + body, err := cb.Execute(func() (interface{}, error) { + resp, err := http.Get(url) + if err != nil { + return nil, err + } + + defer resp.Body.Close() + body, err := ioutil.ReadAll(resp.Body) + if err != nil { + return nil, err + } + + return body, nil + }) + if err != nil { + return nil, err + } + + return body.([]byte), nil +} +``` + +See [example](https://github.com/sony/gobreaker/blob/master/example) for details. + +License +------- + +The MIT License (MIT) + +See [LICENSE](https://github.com/sony/gobreaker/blob/master/LICENSE) for details. + + +[repo-url]: https://github.com/sony/gobreaker diff --git a/vendor/github.com/sony/gobreaker/gobreaker.go b/vendor/github.com/sony/gobreaker/gobreaker.go new file mode 100644 index 0000000..7503a27 --- /dev/null +++ b/vendor/github.com/sony/gobreaker/gobreaker.go @@ -0,0 +1,380 @@ +// Package gobreaker implements the Circuit Breaker pattern. +// See https://msdn.microsoft.com/en-us/library/dn589784.aspx. +package gobreaker + +import ( + "errors" + "fmt" + "sync" + "time" +) + +// State is a type that represents a state of CircuitBreaker. +type State int + +// These constants are states of CircuitBreaker. +const ( + StateClosed State = iota + StateHalfOpen + StateOpen +) + +var ( + // ErrTooManyRequests is returned when the CB state is half open and the requests count is over the cb maxRequests + ErrTooManyRequests = errors.New("too many requests") + // ErrOpenState is returned when the CB state is open + ErrOpenState = errors.New("circuit breaker is open") +) + +// String implements stringer interface. +func (s State) String() string { + switch s { + case StateClosed: + return "closed" + case StateHalfOpen: + return "half-open" + case StateOpen: + return "open" + default: + return fmt.Sprintf("unknown state: %d", s) + } +} + +// Counts holds the numbers of requests and their successes/failures. +// CircuitBreaker clears the internal Counts either +// on the change of the state or at the closed-state intervals. +// Counts ignores the results of the requests sent before clearing. +type Counts struct { + Requests uint32 + TotalSuccesses uint32 + TotalFailures uint32 + ConsecutiveSuccesses uint32 + ConsecutiveFailures uint32 +} + +func (c *Counts) onRequest() { + c.Requests++ +} + +func (c *Counts) onSuccess() { + c.TotalSuccesses++ + c.ConsecutiveSuccesses++ + c.ConsecutiveFailures = 0 +} + +func (c *Counts) onFailure() { + c.TotalFailures++ + c.ConsecutiveFailures++ + c.ConsecutiveSuccesses = 0 +} + +func (c *Counts) clear() { + c.Requests = 0 + c.TotalSuccesses = 0 + c.TotalFailures = 0 + c.ConsecutiveSuccesses = 0 + c.ConsecutiveFailures = 0 +} + +// Settings configures CircuitBreaker: +// +// Name is the name of the CircuitBreaker. +// +// MaxRequests is the maximum number of requests allowed to pass through +// when the CircuitBreaker is half-open. +// If MaxRequests is 0, the CircuitBreaker allows only 1 request. +// +// Interval is the cyclic period of the closed state +// for the CircuitBreaker to clear the internal Counts. +// If Interval is less than or equal to 0, the CircuitBreaker doesn't clear internal Counts during the closed state. +// +// Timeout is the period of the open state, +// after which the state of the CircuitBreaker becomes half-open. +// If Timeout is less than or equal to 0, the timeout value of the CircuitBreaker is set to 60 seconds. +// +// ReadyToTrip is called with a copy of Counts whenever a request fails in the closed state. +// If ReadyToTrip returns true, the CircuitBreaker will be placed into the open state. +// If ReadyToTrip is nil, default ReadyToTrip is used. +// Default ReadyToTrip returns true when the number of consecutive failures is more than 5. +// +// OnStateChange is called whenever the state of the CircuitBreaker changes. +// +// IsSuccessful is called with the error returned from a request. +// If IsSuccessful returns true, the error is counted as a success. +// Otherwise the error is counted as a failure. +// If IsSuccessful is nil, default IsSuccessful is used, which returns false for all non-nil errors. +type Settings struct { + Name string + MaxRequests uint32 + Interval time.Duration + Timeout time.Duration + ReadyToTrip func(counts Counts) bool + OnStateChange func(name string, from State, to State) + IsSuccessful func(err error) bool +} + +// CircuitBreaker is a state machine to prevent sending requests that are likely to fail. +type CircuitBreaker struct { + name string + maxRequests uint32 + interval time.Duration + timeout time.Duration + readyToTrip func(counts Counts) bool + isSuccessful func(err error) bool + onStateChange func(name string, from State, to State) + + mutex sync.Mutex + state State + generation uint64 + counts Counts + expiry time.Time +} + +// TwoStepCircuitBreaker is like CircuitBreaker but instead of surrounding a function +// with the breaker functionality, it only checks whether a request can proceed and +// expects the caller to report the outcome in a separate step using a callback. +type TwoStepCircuitBreaker struct { + cb *CircuitBreaker +} + +// NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings. +func NewCircuitBreaker(st Settings) *CircuitBreaker { + cb := new(CircuitBreaker) + + cb.name = st.Name + cb.onStateChange = st.OnStateChange + + if st.MaxRequests == 0 { + cb.maxRequests = 1 + } else { + cb.maxRequests = st.MaxRequests + } + + if st.Interval <= 0 { + cb.interval = defaultInterval + } else { + cb.interval = st.Interval + } + + if st.Timeout <= 0 { + cb.timeout = defaultTimeout + } else { + cb.timeout = st.Timeout + } + + if st.ReadyToTrip == nil { + cb.readyToTrip = defaultReadyToTrip + } else { + cb.readyToTrip = st.ReadyToTrip + } + + if st.IsSuccessful == nil { + cb.isSuccessful = defaultIsSuccessful + } else { + cb.isSuccessful = st.IsSuccessful + } + + cb.toNewGeneration(time.Now()) + + return cb +} + +// NewTwoStepCircuitBreaker returns a new TwoStepCircuitBreaker configured with the given Settings. +func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker { + return &TwoStepCircuitBreaker{ + cb: NewCircuitBreaker(st), + } +} + +const defaultInterval = time.Duration(0) * time.Second +const defaultTimeout = time.Duration(60) * time.Second + +func defaultReadyToTrip(counts Counts) bool { + return counts.ConsecutiveFailures > 5 +} + +func defaultIsSuccessful(err error) bool { + return err == nil +} + +// Name returns the name of the CircuitBreaker. +func (cb *CircuitBreaker) Name() string { + return cb.name +} + +// State returns the current state of the CircuitBreaker. +func (cb *CircuitBreaker) State() State { + cb.mutex.Lock() + defer cb.mutex.Unlock() + + now := time.Now() + state, _ := cb.currentState(now) + return state +} + +// Counts returns internal counters +func (cb *CircuitBreaker) Counts() Counts { + cb.mutex.Lock() + defer cb.mutex.Unlock() + + return cb.counts +} + +// Execute runs the given request if the CircuitBreaker accepts it. +// Execute returns an error instantly if the CircuitBreaker rejects the request. +// Otherwise, Execute returns the result of the request. +// If a panic occurs in the request, the CircuitBreaker handles it as an error +// and causes the same panic again. +func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) { + generation, err := cb.beforeRequest() + if err != nil { + return nil, err + } + + defer func() { + e := recover() + if e != nil { + cb.afterRequest(generation, false) + panic(e) + } + }() + + result, err := req() + cb.afterRequest(generation, cb.isSuccessful(err)) + return result, err +} + +// Name returns the name of the TwoStepCircuitBreaker. +func (tscb *TwoStepCircuitBreaker) Name() string { + return tscb.cb.Name() +} + +// State returns the current state of the TwoStepCircuitBreaker. +func (tscb *TwoStepCircuitBreaker) State() State { + return tscb.cb.State() +} + +// Counts returns internal counters +func (tscb *TwoStepCircuitBreaker) Counts() Counts { + return tscb.cb.Counts() +} + +// Allow checks if a new request can proceed. It returns a callback that should be used to +// register the success or failure in a separate step. If the circuit breaker doesn't allow +// requests, it returns an error. +func (tscb *TwoStepCircuitBreaker) Allow() (done func(success bool), err error) { + generation, err := tscb.cb.beforeRequest() + if err != nil { + return nil, err + } + + return func(success bool) { + tscb.cb.afterRequest(generation, success) + }, nil +} + +func (cb *CircuitBreaker) beforeRequest() (uint64, error) { + cb.mutex.Lock() + defer cb.mutex.Unlock() + + now := time.Now() + state, generation := cb.currentState(now) + + if state == StateOpen { + return generation, ErrOpenState + } else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests { + return generation, ErrTooManyRequests + } + + cb.counts.onRequest() + return generation, nil +} + +func (cb *CircuitBreaker) afterRequest(before uint64, success bool) { + cb.mutex.Lock() + defer cb.mutex.Unlock() + + now := time.Now() + state, generation := cb.currentState(now) + if generation != before { + return + } + + if success { + cb.onSuccess(state, now) + } else { + cb.onFailure(state, now) + } +} + +func (cb *CircuitBreaker) onSuccess(state State, now time.Time) { + switch state { + case StateClosed: + cb.counts.onSuccess() + case StateHalfOpen: + cb.counts.onSuccess() + if cb.counts.ConsecutiveSuccesses >= cb.maxRequests { + cb.setState(StateClosed, now) + } + } +} + +func (cb *CircuitBreaker) onFailure(state State, now time.Time) { + switch state { + case StateClosed: + cb.counts.onFailure() + if cb.readyToTrip(cb.counts) { + cb.setState(StateOpen, now) + } + case StateHalfOpen: + cb.setState(StateOpen, now) + } +} + +func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) { + switch cb.state { + case StateClosed: + if !cb.expiry.IsZero() && cb.expiry.Before(now) { + cb.toNewGeneration(now) + } + case StateOpen: + if cb.expiry.Before(now) { + cb.setState(StateHalfOpen, now) + } + } + return cb.state, cb.generation +} + +func (cb *CircuitBreaker) setState(state State, now time.Time) { + if cb.state == state { + return + } + + prev := cb.state + cb.state = state + + cb.toNewGeneration(now) + + if cb.onStateChange != nil { + cb.onStateChange(cb.name, prev, state) + } +} + +func (cb *CircuitBreaker) toNewGeneration(now time.Time) { + cb.generation++ + cb.counts.clear() + + var zero time.Time + switch cb.state { + case StateClosed: + if cb.interval == 0 { + cb.expiry = zero + } else { + cb.expiry = now.Add(cb.interval) + } + case StateOpen: + cb.expiry = now.Add(cb.timeout) + default: // StateHalfOpen + cb.expiry = zero + } +} diff --git a/vendor/modules.txt b/vendor/modules.txt index 0560132..1b0ec29 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -123,7 +123,7 @@ github.com/blevesearch/zapx/v16 # github.com/cespare/xxhash/v2 v2.2.0 ## explicit; go 1.11 github.com/cespare/xxhash/v2 -# github.com/chorus-services/backbeat v0.0.0-00010101000000-000000000000 => /home/tony/chorus/project-queues/active/BACKBEAT/backbeat/prototype +# github.com/chorus-services/backbeat v0.0.0-00010101000000-000000000000 => ../BACKBEAT/backbeat/prototype ## explicit; go 1.22 github.com/chorus-services/backbeat/pkg/sdk # github.com/containerd/cgroups v1.1.0 @@ -614,6 +614,9 @@ github.com/robfig/cron/v3 github.com/sashabaranov/go-openai github.com/sashabaranov/go-openai/internal github.com/sashabaranov/go-openai/jsonschema +# github.com/sony/gobreaker v0.5.0 +## explicit; go 1.12 +github.com/sony/gobreaker # github.com/spaolacci/murmur3 v1.1.0 ## explicit github.com/spaolacci/murmur3 @@ -844,4 +847,4 @@ gopkg.in/yaml.v3 # lukechampine.com/blake3 v1.2.1 ## explicit; go 1.17 lukechampine.com/blake3 -# github.com/chorus-services/backbeat => /home/tony/chorus/project-queues/active/BACKBEAT/backbeat/prototype +# github.com/chorus-services/backbeat => ../BACKBEAT/backbeat/prototype