package swoosh import ( "encoding/json" "fmt" "time" ) const defaultSnapshotInterval uint64 = 32 // GuardProvider evaluates transition guards prior to reducer execution. type GuardProvider interface { Evaluate(t TransitionProposal, s OrchestratorState) (GuardOutcome, error) } // ApplyResult communicates the outcome of an executor-applied transition. type ApplyResult struct { Success bool Error error NewState OrchestratorState GuardInfo GuardOutcome } type applyRequest struct { proposal TransitionProposal response chan<- ApplyResult } type stateRequest struct { response chan<- OrchestratorState } // Executor serializes state mutations for the orchestrator. type Executor struct { applyCh chan applyRequest stateReqCh chan stateRequest wal WALStore snapshotStore SnapshotStore guard GuardProvider state OrchestratorState snapshotEvery uint64 appliedSinceSn uint64 lastIndex uint64 } // NewExecutor constructs and starts an Executor using the provided collaborators. func NewExecutor(wal WALStore, snap SnapshotStore, guard GuardProvider, initial Snapshot) *Executor { state := initial.State if cloned, err := cloneState(state); err == nil { state = cloned } if state.StateHash == "" { if hash, err := computeStateHash(state); err == nil { state.StateHash = hash } } lastIdx := initial.LastAppliedIndex if walIdx := wal.LastIndex(); walIdx > lastIdx { lastIdx = walIdx } exec := &Executor{ applyCh: make(chan applyRequest), stateReqCh: make(chan stateRequest), wal: wal, snapshotStore: snap, guard: guard, state: state, snapshotEvery: defaultSnapshotInterval, appliedSinceSn: 0, lastIndex: lastIdx, } go exec.loop() return exec } // SubmitTransition enqueues a TransitionProposal and returns a channel for its result. func (e *Executor) SubmitTransition(p TransitionProposal) (<-chan ApplyResult, error) { if p.TransitionName == "" { return nil, fmt.Errorf("%w: %s", ErrUnknownTransition, "empty transition name") } resCh := make(chan ApplyResult, 1) req := applyRequest{proposal: p, response: resCh} e.applyCh <- req return resCh, nil } // GetStateSnapshot returns a deep copy of the current orchestrator state. func (e *Executor) GetStateSnapshot() OrchestratorState { respCh := make(chan OrchestratorState, 1) e.stateReqCh <- stateRequest{response: respCh} snapshot := <-respCh return snapshot } func (e *Executor) loop() { for { select { case req := <-e.applyCh: e.handleApply(req) case req := <-e.stateReqCh: clone, err := cloneState(e.state) if err != nil { clone = e.state } req.response <- clone } } } func (e *Executor) handleApply(req applyRequest) { proposal := req.proposal result := ApplyResult{} if e.state.Policy.Quarantined && proposal.TransitionName != "TRANSITION_QUARANTINE_RELEASE" && proposal.TransitionName != "TRANSITION_QUARANTINE_CONFIRM_BLOCK" { result.Error = fmt.Errorf("%w: %s", ErrQuarantined, proposal.TransitionName) req.response <- result return } guardOutcome, err := e.evaluateGuard(proposal) if err != nil { result.Error = fmt.Errorf("guard evaluation: %w", err) req.response <- result return } newState, err := Reduce(e.state, proposal, guardOutcome) if err != nil { result.Error = fmt.Errorf("reduce: %w", err) result.GuardInfo = guardOutcome req.response <- result return } if newState.StateHash == e.state.StateHash { result.Success = true result.NewState = e.cloneForResult(newState) result.GuardInfo = guardOutcome req.response <- result return } walRecord := WALRecord{ StatePreHash: e.state.StateHash, StatePostHash: newState.StateHash, Transition: proposal, Guard: guardOutcome, AppliedAtHLC: proposal.HLC, AppliedAtUnixNs: time.Now().UnixNano(), Index: e.nextIndex(), } if err := e.appendAndSync(walRecord); err != nil { result.Error = fmt.Errorf("wal append: %w", err) result.GuardInfo = guardOutcome req.response <- result return } e.state = newState e.lastIndex = walRecord.Index e.appliedSinceSn++ if e.shouldSnapshot() { snap := Snapshot{ State: e.state, LastAppliedHLC: e.state.HLCLast, LastAppliedIndex: e.lastIndex, } if err := e.saveSnapshot(snap); err != nil { result.Error = fmt.Errorf("snapshot save: %w", err) result.GuardInfo = guardOutcome req.response <- result return } e.appliedSinceSn = 0 } result.Success = true result.NewState = e.cloneForResult(e.state) result.GuardInfo = guardOutcome req.response <- result } func (e *Executor) evaluateGuard(p TransitionProposal) (GuardOutcome, error) { if e.guard == nil { return GuardOutcome{ LicenseOK: true, BackbeatOK: true, QuorumOK: true, PolicyOK: true, MCPHealthy: true, }, nil } return e.guard.Evaluate(p, e.state) } func (e *Executor) appendAndSync(record WALRecord) error { if err := e.wal.Append(record); err != nil { return err } if err := e.wal.Sync(); err != nil { return err } return nil } func (e *Executor) saveSnapshot(s Snapshot) error { if e.snapshotStore == nil { return nil } return e.snapshotStore.Save(s) } func (e *Executor) shouldSnapshot() bool { return e.snapshotEvery > 0 && e.appliedSinceSn >= e.snapshotEvery && e.snapshotStore != nil } func (e *Executor) nextIndex() uint64 { if e.lastIndex == 0 { return 1 } return e.lastIndex + 1 } func (e *Executor) cloneForResult(state OrchestratorState) OrchestratorState { cloned, err := cloneState(state) if err != nil { return state } return cloned } func cloneState(state OrchestratorState) (OrchestratorState, error) { payload, err := json.Marshal(state) if err != nil { return OrchestratorState{}, err } var out OrchestratorState if err := json.Unmarshal(payload, &out); err != nil { return OrchestratorState{}, err } return out, nil }