Files
SWOOSH/executor.go
2025-10-24 18:35:13 +11:00

249 lines
5.8 KiB
Go

package swoosh
import (
"encoding/json"
"fmt"
"time"
)
const defaultSnapshotInterval uint64 = 32
// GuardProvider evaluates transition guards prior to reducer execution.
type GuardProvider interface {
Evaluate(t TransitionProposal, s OrchestratorState) (GuardOutcome, error)
}
// ApplyResult communicates the outcome of an executor-applied transition.
type ApplyResult struct {
Success bool
Error error
NewState OrchestratorState
GuardInfo GuardOutcome
}
type applyRequest struct {
proposal TransitionProposal
response chan<- ApplyResult
}
type stateRequest struct {
response chan<- OrchestratorState
}
// Executor serializes state mutations for the orchestrator.
type Executor struct {
applyCh chan applyRequest
stateReqCh chan stateRequest
wal WALStore
snapshotStore SnapshotStore
guard GuardProvider
state OrchestratorState
snapshotEvery uint64
appliedSinceSn uint64
lastIndex uint64
}
// NewExecutor constructs and starts an Executor using the provided collaborators.
func NewExecutor(wal WALStore, snap SnapshotStore, guard GuardProvider, initial Snapshot) *Executor {
state := initial.State
if cloned, err := cloneState(state); err == nil {
state = cloned
}
if state.StateHash == "" {
if hash, err := computeStateHash(state); err == nil {
state.StateHash = hash
}
}
lastIdx := initial.LastAppliedIndex
if walIdx := wal.LastIndex(); walIdx > lastIdx {
lastIdx = walIdx
}
exec := &Executor{
applyCh: make(chan applyRequest),
stateReqCh: make(chan stateRequest),
wal: wal,
snapshotStore: snap,
guard: guard,
state: state,
snapshotEvery: defaultSnapshotInterval,
appliedSinceSn: 0,
lastIndex: lastIdx,
}
go exec.loop()
return exec
}
// SubmitTransition enqueues a TransitionProposal and returns a channel for its result.
func (e *Executor) SubmitTransition(p TransitionProposal) (<-chan ApplyResult, error) {
if p.TransitionName == "" {
return nil, fmt.Errorf("%w: %s", ErrUnknownTransition, "empty transition name")
}
resCh := make(chan ApplyResult, 1)
req := applyRequest{proposal: p, response: resCh}
e.applyCh <- req
return resCh, nil
}
// GetStateSnapshot returns a deep copy of the current orchestrator state.
func (e *Executor) GetStateSnapshot() OrchestratorState {
respCh := make(chan OrchestratorState, 1)
e.stateReqCh <- stateRequest{response: respCh}
snapshot := <-respCh
return snapshot
}
func (e *Executor) loop() {
for {
select {
case req := <-e.applyCh:
e.handleApply(req)
case req := <-e.stateReqCh:
clone, err := cloneState(e.state)
if err != nil {
clone = e.state
}
req.response <- clone
}
}
}
func (e *Executor) handleApply(req applyRequest) {
proposal := req.proposal
result := ApplyResult{}
if e.state.Policy.Quarantined && proposal.TransitionName != "TRANSITION_QUARANTINE_RELEASE" && proposal.TransitionName != "TRANSITION_QUARANTINE_CONFIRM_BLOCK" {
result.Error = fmt.Errorf("%w: %s", ErrQuarantined, proposal.TransitionName)
req.response <- result
return
}
guardOutcome, err := e.evaluateGuard(proposal)
if err != nil {
result.Error = fmt.Errorf("guard evaluation: %w", err)
req.response <- result
return
}
newState, err := Reduce(e.state, proposal, guardOutcome)
if err != nil {
result.Error = fmt.Errorf("reduce: %w", err)
result.GuardInfo = guardOutcome
req.response <- result
return
}
if newState.StateHash == e.state.StateHash {
result.Success = true
result.NewState = e.cloneForResult(newState)
result.GuardInfo = guardOutcome
req.response <- result
return
}
walRecord := WALRecord{
StatePreHash: e.state.StateHash,
StatePostHash: newState.StateHash,
Transition: proposal,
Guard: guardOutcome,
AppliedAtHLC: proposal.HLC,
AppliedAtUnixNs: time.Now().UnixNano(),
Index: e.nextIndex(),
}
if err := e.appendAndSync(walRecord); err != nil {
result.Error = fmt.Errorf("wal append: %w", err)
result.GuardInfo = guardOutcome
req.response <- result
return
}
e.state = newState
e.lastIndex = walRecord.Index
e.appliedSinceSn++
if e.shouldSnapshot() {
snap := Snapshot{
State: e.state,
LastAppliedHLC: e.state.HLCLast,
LastAppliedIndex: e.lastIndex,
}
if err := e.saveSnapshot(snap); err != nil {
result.Error = fmt.Errorf("snapshot save: %w", err)
result.GuardInfo = guardOutcome
req.response <- result
return
}
e.appliedSinceSn = 0
}
result.Success = true
result.NewState = e.cloneForResult(e.state)
result.GuardInfo = guardOutcome
req.response <- result
}
func (e *Executor) evaluateGuard(p TransitionProposal) (GuardOutcome, error) {
if e.guard == nil {
return GuardOutcome{
LicenseOK: true,
BackbeatOK: true,
QuorumOK: true,
PolicyOK: true,
MCPHealthy: true,
}, nil
}
return e.guard.Evaluate(p, e.state)
}
func (e *Executor) appendAndSync(record WALRecord) error {
if err := e.wal.Append(record); err != nil {
return err
}
if err := e.wal.Sync(); err != nil {
return err
}
return nil
}
func (e *Executor) saveSnapshot(s Snapshot) error {
if e.snapshotStore == nil {
return nil
}
return e.snapshotStore.Save(s)
}
func (e *Executor) shouldSnapshot() bool {
return e.snapshotEvery > 0 && e.appliedSinceSn >= e.snapshotEvery && e.snapshotStore != nil
}
func (e *Executor) nextIndex() uint64 {
if e.lastIndex == 0 {
return 1
}
return e.lastIndex + 1
}
func (e *Executor) cloneForResult(state OrchestratorState) OrchestratorState {
cloned, err := cloneState(state)
if err != nil {
return state
}
return cloned
}
func cloneState(state OrchestratorState) (OrchestratorState, error) {
payload, err := json.Marshal(state)
if err != nil {
return OrchestratorState{}, err
}
var out OrchestratorState
if err := json.Unmarshal(payload, &out); err != nil {
return OrchestratorState{}, err
}
return out, nil
}