feature/resetdata-docker-secrets-integration #10

Merged
tony merged 3 commits from feature/resetdata-docker-secrets-integration into main 2025-09-24 00:50:37 +00:00
10 changed files with 666 additions and 14 deletions
Showing only changes of commit 26e4ef7d8b - Show all commits

View File

@@ -15,14 +15,16 @@ RUN addgroup -g 1000 chorus && \
RUN mkdir -p /app/data && \
chown -R chorus:chorus /app
# Copy pre-built binary
COPY chorus-agent /app/chorus-agent
# Copy pre-built binary from build directory (ensure it exists and is the correct one)
COPY build/chorus-agent /app/chorus-agent
RUN chmod +x /app/chorus-agent && chown chorus:chorus /app/chorus-agent
# Switch to non-root user
USER chorus
WORKDIR /app
# Note: Using correct chorus-agent binary built with 'make build-agent'
# Expose ports
EXPOSE 8080 8081 9000

Binary file not shown.

View File

@@ -2,7 +2,7 @@ version: "3.9"
services:
chorus:
image: anthonyrawlins/chorus:resetdata-secrets-v1.0.6
image: anthonyrawlins/chorus:discovery-debug
# REQUIRED: License configuration (CHORUS will not start without this)
environment:
@@ -15,7 +15,7 @@ services:
- CHORUS_AGENT_ID=${CHORUS_AGENT_ID:-} # Auto-generated if not provided
- CHORUS_SPECIALIZATION=${CHORUS_SPECIALIZATION:-general_developer}
- CHORUS_MAX_TASKS=${CHORUS_MAX_TASKS:-3}
- CHORUS_CAPABILITIES=${CHORUS_CAPABILITIES:-general_development,task_coordination}
- CHORUS_CAPABILITIES=${CHORUS_CAPABILITIES:-general_development,task_coordination,admin_election}
# Network configuration
- CHORUS_API_PORT=8080
@@ -71,7 +71,7 @@ services:
# Container resource limits
deploy:
mode: replicated
replicas: ${CHORUS_REPLICAS:-1}
replicas: ${CHORUS_REPLICAS:-9}
update_config:
parallelism: 1
delay: 10s

View File

@@ -216,7 +216,7 @@ func LoadFromEnvironment() (*Config, error) {
AuditLogging: getEnvBoolOrDefault("CHORUS_AUDIT_LOGGING", true),
AuditPath: getEnvOrDefault("CHORUS_AUDIT_PATH", "/tmp/chorus-audit.log"),
ElectionConfig: ElectionConfig{
DiscoveryTimeout: getEnvDurationOrDefault("CHORUS_DISCOVERY_TIMEOUT", 10*time.Second),
DiscoveryTimeout: getEnvDurationOrDefault("CHORUS_DISCOVERY_TIMEOUT", 15*time.Second),
HeartbeatTimeout: getEnvDurationOrDefault("CHORUS_HEARTBEAT_TIMEOUT", 30*time.Second),
ElectionTimeout: getEnvDurationOrDefault("CHORUS_ELECTION_TIMEOUT", 60*time.Second),
DiscoveryBackoff: getEnvDurationOrDefault("CHORUS_DISCOVERY_BACKOFF", 5*time.Second),

View File

@@ -167,10 +167,18 @@ func (em *ElectionManager) Start() error {
}
// Start discovery process
go em.startDiscoveryLoop()
log.Printf("🔍 About to start discovery loop goroutine...")
go func() {
log.Printf("🔍 Discovery loop goroutine started successfully")
em.startDiscoveryLoop()
}()
// Start election coordinator
go em.electionCoordinator()
log.Printf("🗳️ About to start election coordinator goroutine...")
go func() {
log.Printf("🗳️ Election coordinator goroutine started successfully")
em.electionCoordinator()
}()
// Start heartbeat if this node is already admin at startup
if em.IsCurrentAdmin() {
@@ -214,6 +222,16 @@ func (em *ElectionManager) Stop() {
// TriggerElection manually triggers an election
func (em *ElectionManager) TriggerElection(trigger ElectionTrigger) {
// Check if election already in progress
em.mu.RLock()
currentState := em.state
em.mu.RUnlock()
if currentState != StateIdle {
log.Printf("🗳️ Election already in progress (state: %s), ignoring trigger: %s", currentState, trigger)
return
}
select {
case em.electionTrigger <- trigger:
log.Printf("🗳️ Election triggered: %s", trigger)
@@ -262,13 +280,27 @@ func (em *ElectionManager) GetHeartbeatStatus() map[string]interface{} {
// startDiscoveryLoop starts the admin discovery loop
func (em *ElectionManager) startDiscoveryLoop() {
log.Printf("🔍 Starting admin discovery loop")
defer func() {
if r := recover(); r != nil {
log.Printf("🔍 PANIC in discovery loop: %v", r)
}
log.Printf("🔍 Discovery loop goroutine exiting")
}()
log.Printf("🔍 ENHANCED-DEBUG: Starting admin discovery loop with timeout: %v", em.config.Security.ElectionConfig.DiscoveryTimeout)
log.Printf("🔍 ENHANCED-DEBUG: Context status: err=%v", em.ctx.Err())
log.Printf("🔍 ENHANCED-DEBUG: Node ID: %s, Can be admin: %v", em.nodeID, em.canBeAdmin())
for {
log.Printf("🔍 Discovery loop iteration starting, waiting for timeout...")
log.Printf("🔍 Context status before select: err=%v", em.ctx.Err())
select {
case <-em.ctx.Done():
log.Printf("🔍 Discovery loop cancelled via context: %v", em.ctx.Err())
return
case <-time.After(em.config.Security.ElectionConfig.DiscoveryTimeout):
log.Printf("🔍 Discovery timeout triggered! Calling performAdminDiscovery()...")
em.performAdminDiscovery()
}
}
@@ -281,8 +313,12 @@ func (em *ElectionManager) performAdminDiscovery() {
lastHeartbeat := em.lastHeartbeat
em.mu.Unlock()
log.Printf("🔍 Discovery check: state=%s, lastHeartbeat=%v, canAdmin=%v",
currentState, lastHeartbeat, em.canBeAdmin())
// Only discover if we're idle or the heartbeat is stale
if currentState != StateIdle {
log.Printf("🔍 Skipping discovery - not in idle state (current: %s)", currentState)
return
}
@@ -294,13 +330,66 @@ func (em *ElectionManager) performAdminDiscovery() {
}
// If we haven't heard from an admin recently, try to discover one
if lastHeartbeat.IsZero() || time.Since(lastHeartbeat) > em.config.Security.ElectionConfig.DiscoveryTimeout/2 {
timeSinceHeartbeat := time.Since(lastHeartbeat)
discoveryThreshold := em.config.Security.ElectionConfig.DiscoveryTimeout / 2
log.Printf("🔍 Heartbeat check: isZero=%v, timeSince=%v, threshold=%v",
lastHeartbeat.IsZero(), timeSinceHeartbeat, discoveryThreshold)
if lastHeartbeat.IsZero() || timeSinceHeartbeat > discoveryThreshold {
log.Printf("🔍 Sending discovery request...")
em.sendDiscoveryRequest()
// 🚨 CRITICAL FIX: If we have no admin and can become admin, trigger election after discovery timeout
em.mu.Lock()
currentAdmin := em.currentAdmin
em.mu.Unlock()
if currentAdmin == "" && em.canBeAdmin() {
log.Printf("🗳️ No admin discovered and we can be admin - scheduling election check")
go func() {
// Add randomization to prevent simultaneous elections from all nodes
baseDelay := em.config.Security.ElectionConfig.DiscoveryTimeout * 2
randomDelay := time.Duration(rand.Intn(int(em.config.Security.ElectionConfig.DiscoveryTimeout)))
totalDelay := baseDelay + randomDelay
log.Printf("🗳️ Waiting %v before checking if election needed", totalDelay)
time.Sleep(totalDelay)
// Check again if still no admin and no one else started election
em.mu.RLock()
stillNoAdmin := em.currentAdmin == ""
stillIdle := em.state == StateIdle
em.mu.RUnlock()
if stillNoAdmin && stillIdle && em.canBeAdmin() {
log.Printf("🗳️ Election grace period expired with no admin - triggering election")
em.TriggerElection(TriggerDiscoveryFailure)
} else {
log.Printf("🗳️ Election check: admin=%s, state=%s - skipping election", em.currentAdmin, em.state)
}
}()
}
} else {
log.Printf("🔍 Discovery threshold not met - waiting")
}
}
// sendDiscoveryRequest broadcasts admin discovery request
func (em *ElectionManager) sendDiscoveryRequest() {
em.mu.RLock()
currentAdmin := em.currentAdmin
em.mu.RUnlock()
// WHOAMI debug message
if currentAdmin == "" {
log.Printf("🤖 WHOAMI: I'm %s and I have no leader", em.nodeID)
} else {
log.Printf("🤖 WHOAMI: I'm %s and my leader is %s", em.nodeID, currentAdmin)
}
log.Printf("📡 Sending admin discovery request from node %s", em.nodeID)
discoveryMsg := ElectionMessage{
Type: "admin_discovery_request",
NodeID: em.nodeID,
@@ -309,6 +398,8 @@ func (em *ElectionManager) sendDiscoveryRequest() {
if err := em.publishElectionMessage(discoveryMsg); err != nil {
log.Printf("❌ Failed to send admin discovery request: %v", err)
} else {
log.Printf("✅ Admin discovery request sent successfully")
}
}
@@ -652,6 +743,9 @@ func (em *ElectionManager) handleAdminDiscoveryRequest(msg ElectionMessage) {
state := em.state
em.mu.RUnlock()
log.Printf("📩 Received admin discovery request from %s (my leader: %s, state: %s)",
msg.NodeID, currentAdmin, state)
// Only respond if we know who the current admin is and we're idle
if currentAdmin != "" && state == StateIdle {
responseMsg := ElectionMessage{
@@ -663,23 +757,43 @@ func (em *ElectionManager) handleAdminDiscoveryRequest(msg ElectionMessage) {
},
}
log.Printf("📤 Responding to discovery with admin: %s", currentAdmin)
if err := em.publishElectionMessage(responseMsg); err != nil {
log.Printf("❌ Failed to send admin discovery response: %v", err)
} else {
log.Printf("✅ Admin discovery response sent successfully")
}
} else {
log.Printf("🔇 Not responding to discovery (admin=%s, state=%s)", currentAdmin, state)
}
}
// handleAdminDiscoveryResponse processes admin discovery responses
func (em *ElectionManager) handleAdminDiscoveryResponse(msg ElectionMessage) {
log.Printf("📥 Received admin discovery response from %s", msg.NodeID)
if data, ok := msg.Data.(map[string]interface{}); ok {
if admin, ok := data["current_admin"].(string); ok && admin != "" {
em.mu.Lock()
oldAdmin := em.currentAdmin
if em.currentAdmin == "" {
log.Printf("📡 Discovered admin: %s", admin)
log.Printf("📡 Discovered admin: %s (reported by %s)", admin, msg.NodeID)
em.currentAdmin = admin
em.lastHeartbeat = time.Now() // Set initial heartbeat
} else if em.currentAdmin != admin {
log.Printf("⚠️ Admin conflict: I know %s, but %s reports %s", em.currentAdmin, msg.NodeID, admin)
} else {
log.Printf("📡 Admin confirmed: %s (reported by %s)", admin, msg.NodeID)
}
em.mu.Unlock()
// Trigger callback if admin changed
if oldAdmin != admin && em.onAdminChanged != nil {
em.onAdminChanged(oldAdmin, admin)
}
}
} else {
log.Printf("❌ Invalid admin discovery response from %s", msg.NodeID)
}
}

View File

@@ -292,7 +292,7 @@ func (ehc *EnhancedHealthChecks) createElectionHealthCheck() *HealthCheck {
return &HealthCheck{
Name: "election-health",
Description: "Election system health and leadership stability check",
Enabled: true,
Enabled: false, // Temporarily disabled to prevent shutdown loops
Critical: false,
Interval: ehc.config.ElectionProbeInterval,
Timeout: ehc.config.ElectionProbeTimeout,

21
vendor/github.com/sony/gobreaker/LICENSE generated vendored Normal file
View File

@@ -0,0 +1,21 @@
The MIT License (MIT)
Copyright 2015 Sony Corporation
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in
all copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN
THE SOFTWARE.

132
vendor/github.com/sony/gobreaker/README.md generated vendored Normal file
View File

@@ -0,0 +1,132 @@
gobreaker
=========
[![GoDoc](https://godoc.org/github.com/sony/gobreaker?status.svg)](http://godoc.org/github.com/sony/gobreaker)
[gobreaker][repo-url] implements the [Circuit Breaker pattern](https://msdn.microsoft.com/en-us/library/dn589784.aspx) in Go.
Installation
------------
```
go get github.com/sony/gobreaker
```
Usage
-----
The struct `CircuitBreaker` is a state machine to prevent sending requests that are likely to fail.
The function `NewCircuitBreaker` creates a new `CircuitBreaker`.
```go
func NewCircuitBreaker(st Settings) *CircuitBreaker
```
You can configure `CircuitBreaker` by the struct `Settings`:
```go
type Settings struct {
Name string
MaxRequests uint32
Interval time.Duration
Timeout time.Duration
ReadyToTrip func(counts Counts) bool
OnStateChange func(name string, from State, to State)
IsSuccessful func(err error) bool
}
```
- `Name` is the name of the `CircuitBreaker`.
- `MaxRequests` is the maximum number of requests allowed to pass through
when the `CircuitBreaker` is half-open.
If `MaxRequests` is 0, `CircuitBreaker` allows only 1 request.
- `Interval` is the cyclic period of the closed state
for `CircuitBreaker` to clear the internal `Counts`, described later in this section.
If `Interval` is 0, `CircuitBreaker` doesn't clear the internal `Counts` during the closed state.
- `Timeout` is the period of the open state,
after which the state of `CircuitBreaker` becomes half-open.
If `Timeout` is 0, the timeout value of `CircuitBreaker` is set to 60 seconds.
- `ReadyToTrip` is called with a copy of `Counts` whenever a request fails in the closed state.
If `ReadyToTrip` returns true, `CircuitBreaker` will be placed into the open state.
If `ReadyToTrip` is `nil`, default `ReadyToTrip` is used.
Default `ReadyToTrip` returns true when the number of consecutive failures is more than 5.
- `OnStateChange` is called whenever the state of `CircuitBreaker` changes.
- `IsSuccessful` is called with the error returned from a request.
If `IsSuccessful` returns true, the error is counted as a success.
Otherwise the error is counted as a failure.
If `IsSuccessful` is nil, default `IsSuccessful` is used, which returns false for all non-nil errors.
The struct `Counts` holds the numbers of requests and their successes/failures:
```go
type Counts struct {
Requests uint32
TotalSuccesses uint32
TotalFailures uint32
ConsecutiveSuccesses uint32
ConsecutiveFailures uint32
}
```
`CircuitBreaker` clears the internal `Counts` either
on the change of the state or at the closed-state intervals.
`Counts` ignores the results of the requests sent before clearing.
`CircuitBreaker` can wrap any function to send a request:
```go
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error)
```
The method `Execute` runs the given request if `CircuitBreaker` accepts it.
`Execute` returns an error instantly if `CircuitBreaker` rejects the request.
Otherwise, `Execute` returns the result of the request.
If a panic occurs in the request, `CircuitBreaker` handles it as an error
and causes the same panic again.
Example
-------
```go
var cb *breaker.CircuitBreaker
func Get(url string) ([]byte, error) {
body, err := cb.Execute(func() (interface{}, error) {
resp, err := http.Get(url)
if err != nil {
return nil, err
}
defer resp.Body.Close()
body, err := ioutil.ReadAll(resp.Body)
if err != nil {
return nil, err
}
return body, nil
})
if err != nil {
return nil, err
}
return body.([]byte), nil
}
```
See [example](https://github.com/sony/gobreaker/blob/master/example) for details.
License
-------
The MIT License (MIT)
See [LICENSE](https://github.com/sony/gobreaker/blob/master/LICENSE) for details.
[repo-url]: https://github.com/sony/gobreaker

380
vendor/github.com/sony/gobreaker/gobreaker.go generated vendored Normal file
View File

@@ -0,0 +1,380 @@
// Package gobreaker implements the Circuit Breaker pattern.
// See https://msdn.microsoft.com/en-us/library/dn589784.aspx.
package gobreaker
import (
"errors"
"fmt"
"sync"
"time"
)
// State is a type that represents a state of CircuitBreaker.
type State int
// These constants are states of CircuitBreaker.
const (
StateClosed State = iota
StateHalfOpen
StateOpen
)
var (
// ErrTooManyRequests is returned when the CB state is half open and the requests count is over the cb maxRequests
ErrTooManyRequests = errors.New("too many requests")
// ErrOpenState is returned when the CB state is open
ErrOpenState = errors.New("circuit breaker is open")
)
// String implements stringer interface.
func (s State) String() string {
switch s {
case StateClosed:
return "closed"
case StateHalfOpen:
return "half-open"
case StateOpen:
return "open"
default:
return fmt.Sprintf("unknown state: %d", s)
}
}
// Counts holds the numbers of requests and their successes/failures.
// CircuitBreaker clears the internal Counts either
// on the change of the state or at the closed-state intervals.
// Counts ignores the results of the requests sent before clearing.
type Counts struct {
Requests uint32
TotalSuccesses uint32
TotalFailures uint32
ConsecutiveSuccesses uint32
ConsecutiveFailures uint32
}
func (c *Counts) onRequest() {
c.Requests++
}
func (c *Counts) onSuccess() {
c.TotalSuccesses++
c.ConsecutiveSuccesses++
c.ConsecutiveFailures = 0
}
func (c *Counts) onFailure() {
c.TotalFailures++
c.ConsecutiveFailures++
c.ConsecutiveSuccesses = 0
}
func (c *Counts) clear() {
c.Requests = 0
c.TotalSuccesses = 0
c.TotalFailures = 0
c.ConsecutiveSuccesses = 0
c.ConsecutiveFailures = 0
}
// Settings configures CircuitBreaker:
//
// Name is the name of the CircuitBreaker.
//
// MaxRequests is the maximum number of requests allowed to pass through
// when the CircuitBreaker is half-open.
// If MaxRequests is 0, the CircuitBreaker allows only 1 request.
//
// Interval is the cyclic period of the closed state
// for the CircuitBreaker to clear the internal Counts.
// If Interval is less than or equal to 0, the CircuitBreaker doesn't clear internal Counts during the closed state.
//
// Timeout is the period of the open state,
// after which the state of the CircuitBreaker becomes half-open.
// If Timeout is less than or equal to 0, the timeout value of the CircuitBreaker is set to 60 seconds.
//
// ReadyToTrip is called with a copy of Counts whenever a request fails in the closed state.
// If ReadyToTrip returns true, the CircuitBreaker will be placed into the open state.
// If ReadyToTrip is nil, default ReadyToTrip is used.
// Default ReadyToTrip returns true when the number of consecutive failures is more than 5.
//
// OnStateChange is called whenever the state of the CircuitBreaker changes.
//
// IsSuccessful is called with the error returned from a request.
// If IsSuccessful returns true, the error is counted as a success.
// Otherwise the error is counted as a failure.
// If IsSuccessful is nil, default IsSuccessful is used, which returns false for all non-nil errors.
type Settings struct {
Name string
MaxRequests uint32
Interval time.Duration
Timeout time.Duration
ReadyToTrip func(counts Counts) bool
OnStateChange func(name string, from State, to State)
IsSuccessful func(err error) bool
}
// CircuitBreaker is a state machine to prevent sending requests that are likely to fail.
type CircuitBreaker struct {
name string
maxRequests uint32
interval time.Duration
timeout time.Duration
readyToTrip func(counts Counts) bool
isSuccessful func(err error) bool
onStateChange func(name string, from State, to State)
mutex sync.Mutex
state State
generation uint64
counts Counts
expiry time.Time
}
// TwoStepCircuitBreaker is like CircuitBreaker but instead of surrounding a function
// with the breaker functionality, it only checks whether a request can proceed and
// expects the caller to report the outcome in a separate step using a callback.
type TwoStepCircuitBreaker struct {
cb *CircuitBreaker
}
// NewCircuitBreaker returns a new CircuitBreaker configured with the given Settings.
func NewCircuitBreaker(st Settings) *CircuitBreaker {
cb := new(CircuitBreaker)
cb.name = st.Name
cb.onStateChange = st.OnStateChange
if st.MaxRequests == 0 {
cb.maxRequests = 1
} else {
cb.maxRequests = st.MaxRequests
}
if st.Interval <= 0 {
cb.interval = defaultInterval
} else {
cb.interval = st.Interval
}
if st.Timeout <= 0 {
cb.timeout = defaultTimeout
} else {
cb.timeout = st.Timeout
}
if st.ReadyToTrip == nil {
cb.readyToTrip = defaultReadyToTrip
} else {
cb.readyToTrip = st.ReadyToTrip
}
if st.IsSuccessful == nil {
cb.isSuccessful = defaultIsSuccessful
} else {
cb.isSuccessful = st.IsSuccessful
}
cb.toNewGeneration(time.Now())
return cb
}
// NewTwoStepCircuitBreaker returns a new TwoStepCircuitBreaker configured with the given Settings.
func NewTwoStepCircuitBreaker(st Settings) *TwoStepCircuitBreaker {
return &TwoStepCircuitBreaker{
cb: NewCircuitBreaker(st),
}
}
const defaultInterval = time.Duration(0) * time.Second
const defaultTimeout = time.Duration(60) * time.Second
func defaultReadyToTrip(counts Counts) bool {
return counts.ConsecutiveFailures > 5
}
func defaultIsSuccessful(err error) bool {
return err == nil
}
// Name returns the name of the CircuitBreaker.
func (cb *CircuitBreaker) Name() string {
return cb.name
}
// State returns the current state of the CircuitBreaker.
func (cb *CircuitBreaker) State() State {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, _ := cb.currentState(now)
return state
}
// Counts returns internal counters
func (cb *CircuitBreaker) Counts() Counts {
cb.mutex.Lock()
defer cb.mutex.Unlock()
return cb.counts
}
// Execute runs the given request if the CircuitBreaker accepts it.
// Execute returns an error instantly if the CircuitBreaker rejects the request.
// Otherwise, Execute returns the result of the request.
// If a panic occurs in the request, the CircuitBreaker handles it as an error
// and causes the same panic again.
func (cb *CircuitBreaker) Execute(req func() (interface{}, error)) (interface{}, error) {
generation, err := cb.beforeRequest()
if err != nil {
return nil, err
}
defer func() {
e := recover()
if e != nil {
cb.afterRequest(generation, false)
panic(e)
}
}()
result, err := req()
cb.afterRequest(generation, cb.isSuccessful(err))
return result, err
}
// Name returns the name of the TwoStepCircuitBreaker.
func (tscb *TwoStepCircuitBreaker) Name() string {
return tscb.cb.Name()
}
// State returns the current state of the TwoStepCircuitBreaker.
func (tscb *TwoStepCircuitBreaker) State() State {
return tscb.cb.State()
}
// Counts returns internal counters
func (tscb *TwoStepCircuitBreaker) Counts() Counts {
return tscb.cb.Counts()
}
// Allow checks if a new request can proceed. It returns a callback that should be used to
// register the success or failure in a separate step. If the circuit breaker doesn't allow
// requests, it returns an error.
func (tscb *TwoStepCircuitBreaker) Allow() (done func(success bool), err error) {
generation, err := tscb.cb.beforeRequest()
if err != nil {
return nil, err
}
return func(success bool) {
tscb.cb.afterRequest(generation, success)
}, nil
}
func (cb *CircuitBreaker) beforeRequest() (uint64, error) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, generation := cb.currentState(now)
if state == StateOpen {
return generation, ErrOpenState
} else if state == StateHalfOpen && cb.counts.Requests >= cb.maxRequests {
return generation, ErrTooManyRequests
}
cb.counts.onRequest()
return generation, nil
}
func (cb *CircuitBreaker) afterRequest(before uint64, success bool) {
cb.mutex.Lock()
defer cb.mutex.Unlock()
now := time.Now()
state, generation := cb.currentState(now)
if generation != before {
return
}
if success {
cb.onSuccess(state, now)
} else {
cb.onFailure(state, now)
}
}
func (cb *CircuitBreaker) onSuccess(state State, now time.Time) {
switch state {
case StateClosed:
cb.counts.onSuccess()
case StateHalfOpen:
cb.counts.onSuccess()
if cb.counts.ConsecutiveSuccesses >= cb.maxRequests {
cb.setState(StateClosed, now)
}
}
}
func (cb *CircuitBreaker) onFailure(state State, now time.Time) {
switch state {
case StateClosed:
cb.counts.onFailure()
if cb.readyToTrip(cb.counts) {
cb.setState(StateOpen, now)
}
case StateHalfOpen:
cb.setState(StateOpen, now)
}
}
func (cb *CircuitBreaker) currentState(now time.Time) (State, uint64) {
switch cb.state {
case StateClosed:
if !cb.expiry.IsZero() && cb.expiry.Before(now) {
cb.toNewGeneration(now)
}
case StateOpen:
if cb.expiry.Before(now) {
cb.setState(StateHalfOpen, now)
}
}
return cb.state, cb.generation
}
func (cb *CircuitBreaker) setState(state State, now time.Time) {
if cb.state == state {
return
}
prev := cb.state
cb.state = state
cb.toNewGeneration(now)
if cb.onStateChange != nil {
cb.onStateChange(cb.name, prev, state)
}
}
func (cb *CircuitBreaker) toNewGeneration(now time.Time) {
cb.generation++
cb.counts.clear()
var zero time.Time
switch cb.state {
case StateClosed:
if cb.interval == 0 {
cb.expiry = zero
} else {
cb.expiry = now.Add(cb.interval)
}
case StateOpen:
cb.expiry = now.Add(cb.timeout)
default: // StateHalfOpen
cb.expiry = zero
}
}

7
vendor/modules.txt vendored
View File

@@ -123,7 +123,7 @@ github.com/blevesearch/zapx/v16
# github.com/cespare/xxhash/v2 v2.2.0
## explicit; go 1.11
github.com/cespare/xxhash/v2
# github.com/chorus-services/backbeat v0.0.0-00010101000000-000000000000 => /home/tony/chorus/project-queues/active/BACKBEAT/backbeat/prototype
# github.com/chorus-services/backbeat v0.0.0-00010101000000-000000000000 => ../BACKBEAT/backbeat/prototype
## explicit; go 1.22
github.com/chorus-services/backbeat/pkg/sdk
# github.com/containerd/cgroups v1.1.0
@@ -614,6 +614,9 @@ github.com/robfig/cron/v3
github.com/sashabaranov/go-openai
github.com/sashabaranov/go-openai/internal
github.com/sashabaranov/go-openai/jsonschema
# github.com/sony/gobreaker v0.5.0
## explicit; go 1.12
github.com/sony/gobreaker
# github.com/spaolacci/murmur3 v1.1.0
## explicit
github.com/spaolacci/murmur3
@@ -844,4 +847,4 @@ gopkg.in/yaml.v3
# lukechampine.com/blake3 v1.2.1
## explicit; go 1.17
lukechampine.com/blake3
# github.com/chorus-services/backbeat => /home/tony/chorus/project-queues/active/BACKBEAT/backbeat/prototype
# github.com/chorus-services/backbeat => ../BACKBEAT/backbeat/prototype