commit 38707dd18206742ccebb3844eeb1147341020e06 Author: Codex Agent Date: Fri Oct 24 18:35:13 2025 +1100 Initial SWOOSH executor and reducer implementation diff --git a/SWOOSH_PLAN.md b/SWOOSH_PLAN.md new file mode 100644 index 0000000..4f5db66 --- /dev/null +++ b/SWOOSH_PLAN.md @@ -0,0 +1,424 @@ +Here’s your **Codex briefing document**, ready to hand over. +It defines the SWOOSH architecture, constraints, coding style, and behavioral contract in a single authoritative form. + +**Download:** [SWOOSH_Codex_Design_Brief.md](sandbox:/mnt/data/SWOOSH_Codex_Design_Brief.md) + +--- + +### πŸ“„ **SWOOSH_Codex_Design_Brief.md** + +````markdown +# SWOOSH β€” Deterministic Orchestrator (Codex Implementation Brief) + +> **Purpose:** +> Codex is to implement the SWOOSH orchestrator exactly as described in this brief. +> SWOOSH replaces the WHOOSH orchestration module with a deterministic state-machine core built in **Go**, designed for **predictable, audit-ready, chaos-resilient orchestration**. + +--- + +## 1. Overview + +SWOOSH governs: +- Project ingestion (DISCOVER β†’ READY) +- Council formation (PLAN_ROLES β†’ READY) +- Environment provisioning (ALLOCATE β†’ READY) +- Execution (PLAN β†’ WORK β†’ REVIEW β†’ REVERB) + +It guarantees: +- **Deterministic state transitions** (pure reducer pattern) +- **Idempotent application** of transitions +- **Write-ahead logging (WAL)** with append-only semantics +- **Periodic snapshots** and deterministic replay +- **Hybrid Logical Clock (HLC)** ordering across replicas +- **Immutable audit evidence** (UCXL + BUBBLE Decision Records) + +--- + +## 2. Language and Runtime + +- **Language:** Go 1.22+ +- **Dependencies:** stdlib only, except for BadgerDB (WAL store) +- **Concurrency:** single-threaded state executor goroutine; NO mutexes on state +- **Architecture:** pure reducer + guard providers + WAL + snapshot +- **All state mutation must occur within the executor loop** + +--- + +## 3. Core Contracts + +Codex must **not invent or modify** these structs, fields, or method signatures. + +### 3.1 OrchestratorState + +```go +type OrchestratorState struct { + Meta struct { + Version string + SchemaHash string + } + + Boot struct { + Licensed bool + LicenseExpiry time.Time + NodeID string + } + + Ingestion struct { + Phase string // DISCOVER|FETCH|VALIDATE|INDEX|READY + ContentHash string + SourceSet []string + LastError string + Epoch uint64 + } + + Council struct { + Phase string // PLAN_ROLES|ELECT|TOOLING_SYNC|READY + PlannedRoles []string + Members []CouncilMember + QuorumCertHash string + MCPHealthGreen bool + Epoch uint64 + } + + Environment struct { + Phase string // ALLOCATE|PROVISION|HEALTHCHECK|READY|DEGRADED + CapacityOK bool + Health string // green|amber + Resources []EnvResource + Epoch uint64 + } + + Execution struct { + Phase string // PLAN|WORK|REVIEW|REVERB + ActiveWindowID string + BeatIndex uint64 + PlanLocked bool + Approvals uint32 + Epoch uint64 + } + + Control struct { + Paused bool + Degraded bool + Recovering bool + } + + Policy struct { + Quarantined bool + Rationale string + } + + HLCLast string // last applied hlc + StateHash string // sha256 of canonical serialization +} +```` + +--- + +### 3.2 TransitionProposal + +```go +type TransitionProposal struct { + CurrentStateHash string `json:"current_state_hash"` + TransitionName string `json:"transition"` + InputsHash string `json:"inputs_hash"` + Signer string `json:"signer"` + IdemKey string `json:"idem_key"` + HLC string `json:"hlc"` + WindowID string `json:"window_id"` + Evidence []string `json:"evidence"` +} +``` + +--- + +### 3.3 GuardOutcome + +```go +type GuardOutcome struct { + LicenseOK bool + BackbeatOK bool + QuorumOK bool + PolicyOK bool + MCPHealthy bool + Rationale []string +} +``` + +Guards are computed **outside** the reducer. Reducer consumes them as verified input. + +--- + +### 3.4 WALRecord + +```go +type WALRecord struct { + StatePreHash string `json:"state_pre_hash"` + StatePostHash string `json:"state_post_hash"` + Transition TransitionProposal `json:"transition"` + Guard GuardOutcome `json:"guard"` + AppliedAtHLC string `json:"applied_hlc"` + AppliedAtUnixNs int64 `json:"applied_unix_ns"` + Index uint64 `json:"index"` +} +``` + +--- + +## 4. Reducer Contract + +```go +// Reduce applies a validated transition to the given state and returns a new state. +// It MUST be deterministic, pure, and side-effect free. +// It MUST NOT perform I/O, logging, or time reads. +func Reduce(oldState OrchestratorState, t TransitionProposal, g GuardOutcome) (OrchestratorState, error) +``` + +* Reducer logic implemented as a **switch on TransitionName**. +* Guard outcomes are already evaluated. +* Each case: + + * Updates state fields + * Increments relevant Epoch + * Computes new StateHash +* Returns new state, no side-effects. + +--- + +## 5. Executor Model + +Codex must implement an **Executor** struct that: + +* Runs as a single goroutine +* Serializes all writes to state +* Exposes a safe interface for transition submission + +```go +type Executor struct { + state OrchestratorState + wal WALStore + snapshot SnapshotStore + applyCh chan TransitionProposal + resultsCh chan ApplyResult +} + +type ApplyResult struct { + Success bool + Error error + NewState OrchestratorState + GuardInfo GuardOutcome +} +``` + +### Rules + +* No other goroutine may mutate `state`. +* All writes funnel through `applyCh`. +* Each accepted transition: + + 1. Validate β†’ Guard β†’ Reduce + 2. Append to WAL (fsync) + 3. Update canonical state + 4. Snapshot periodically + +--- + +## 6. WAL Layer + +Codex must implement: + +```go +type WALStore interface { + Append(record WALRecord) error + Replay(fromIndex uint64) ([]WALRecord, error) + Sync() error + LastIndex() uint64 +} +``` + +Implementation details: + +* Backed by BadgerDB or flat append-only file. +* Must fsync after Append or Sync(). +* Replay must yield records in index order. +* No concurrent writers (single executor owns WAL). + +--- + +## 7. Snapshot Layer + +```go +type Snapshot struct { + State OrchestratorState + LastAppliedHLC string + LastAppliedIndex uint64 +} + +type SnapshotStore interface { + Save(s Snapshot) error + LoadLatest() (Snapshot, error) +} +``` + +* Writes as atomic replace (temp β†’ fsync β†’ rename). +* Snapshot frequency configurable (every N transitions or REVERB phase). + +--- + +## 8. Guard Providers + +Codex must define interfaces only (not implementations): + +```go +type GuardProvider interface { + Evaluate(t TransitionProposal, s OrchestratorState) (GuardOutcome, error) +} +``` + +Specific guard providers (implemented later): + +* KACHINGGuard +* BACKBEATGuard +* HMMMQuorumGuard +* SHHHPolicyGuard +* MCPHealthGuard + +--- + +## 9. API Layer + +Codex will expose a minimal HTTP API: + +| Method | Path | Description | +| ------ | ------------- | ------------------------------------------------------- | +| POST | `/transition` | Submit a TransitionProposal | +| GET | `/state` | Return current OrchestratorState (optionally projected) | +| GET | `/health` | Returns readiness, WAL lag, last snapshot info | + +Rules: + +* Transition proposals must include `idem_key`, `hlc`, `window_id`. +* Transition acceptance is serialized by executor. +* Response must include resulting StateHash or failure reason. + +--- + +## 10. Determinism Requirements + +* Reducer must be **referentially transparent**. +* No access to: + + * System time + * Random generators + * Environment variables + * Network sockets +* Hashing, sorting, or serialization must use deterministic algorithms. +* WAL replay must yield identical `StatePostHash` as original execution. + +--- + +## 11. Concurrency Rules + +* Only the **executor goroutine** writes to state. +* All other goroutines are read-only or handle I/O. +* Mutexes are forbidden on state objects. +* Communication via typed channels only. + +--- + +## 12. Logging and Metrics + +Codex may stub structured logging, but **no I/O inside reducer**. + +Metrics hooks (later implemented): + +* `state_advance_latency` +* `retry_count` +* `quarantine_rate` +* `recovery_time` +* `pending_transitions` + +--- + +## 13. Testing Hooks + +Codex must include: + +* In-memory WAL and snapshot mocks for property tests. +* `Replay()` function that: + + * Loads snapshot + WAL + * Applies transitions deterministically + * Returns resulting `StateHash` +* Determinism test: assert identical hashes across runs. + +--- + +## 14. Generation Scope + +Codex is to generate **Go code only**, adhering to this spec. +Modules to generate (one per iteration): + +1. `state.go` β€” definitions from Β§3 +2. `reducer.go` β€” function skeleton for `Reduce()` +3. `wal.go` β€” WALStore interface + Badger impl +4. `snapshot.go` β€” SnapshotStore interface + atomic file impl +5. `executor.go` β€” single-goroutine executor +6. `api.go` β€” HTTP handlers for `/transition` and `/state` + +--- + +## 15. Non-Goals + +* No database migrations +* No gRPC +* No background workers beyond executor +* No dynamic schema changes +* No plugins or reflection + +--- + +## 16. Style & Discipline + +* Idiomatic Go formatting and error handling. +* Deterministic serialization (canonical JSON or sorted keys). +* No use of global variables or shared mutable state. +* Reducer logic must be explicit and exhaustive β€” no default fallthrough. + +--- + +## 17. Example Flow (Codex reference) + +1. Client submits `TransitionProposal` (e.g. `"ELECT_QUORUM_CERT"`). +2. API enqueues proposal to executor’s `applyCh`. +3. Executor calls `GuardProvider.Evaluate()`. +4. Guards OK β†’ `Reduce(oldState, proposal, guard)` β†’ new state. +5. Append WALRecord. +6. Update state. +7. Broadcast new snapshot hash to metrics/log. + +--- + +## 18. Acceptance Criteria + +Codex output is accepted when: + +* All interfaces and structs match this doc exactly. +* Code compiles with `go build ./...`. +* Reducer is pure, single-threaded, and deterministic. +* WAL replay reproduces identical state. +* HTTP API handles at least POST `/transition` and GET `/state`. + +--- + +## 19. Keywords + +`deterministic`, `state-machine`, `pure reducer`, `single executor`, `idempotent`, `WAL`, `snapshot`, `HLC`, `auditability`, `no concurrency races`. + +--- + +## 20. End of Brief + +> Codex, implement the SWOOSH orchestrator **exactly as specified**, +> using Go and following the constraints in this document. +> Do not add dependencies, types, or concurrency patterns not explicitly described here. diff --git a/determinism_test.go b/determinism_test.go new file mode 100644 index 0000000..3e6089e --- /dev/null +++ b/determinism_test.go @@ -0,0 +1,152 @@ +package swoosh + +import ( + "testing" + "time" +) + +type staticGuardProvider struct { + outcomes map[string]GuardOutcome +} + +func newStaticGuardProvider() *staticGuardProvider { + return &staticGuardProvider{outcomes: make(map[string]GuardOutcome)} +} + +func (g *staticGuardProvider) setOutcome(name string, outcome GuardOutcome) { + g.outcomes[name] = outcome +} + +func (g *staticGuardProvider) Evaluate(t TransitionProposal, s OrchestratorState) (GuardOutcome, error) { + if outcome, ok := g.outcomes[t.TransitionName]; ok { + return outcome, nil + } + return GuardOutcome{ + LicenseOK: true, + BackbeatOK: true, + QuorumOK: true, + PolicyOK: true, + MCPHealthy: true, + }, nil +} + +func newTestExecutor(t *testing.T, guard GuardProvider, wal WALStore, snap SnapshotStore) *Executor { + t.Helper() + initial := Snapshot{} + exec := NewExecutor(wal, snap, guard, initial) + return exec +} + +func waitResult(t *testing.T, ch <-chan ApplyResult) ApplyResult { + t.Helper() + select { + case res := <-ch: + return res + case <-time.After(2 * time.Second): + t.Fatalf("timed out waiting for apply result") + return ApplyResult{} + } +} + +func TestDeterministicReplay(t *testing.T) { + walStore := &InMemoryWAL{} + snapStore := &InMemorySnapshotStore{} + guard := newStaticGuardProvider() + guard.setOutcome("TRANSITION_QUARANTINE_ENTER", GuardOutcome{PolicyOK: false}) + + exec := newTestExecutor(t, guard, walStore, snapStore) + + transitions := []TransitionProposal{ + {TransitionName: "LICENSE_GRANTED", HLC: "1"}, + {TransitionName: "INGESTION_SOURCES_RESOLVED", HLC: "2"}, + {TransitionName: "INGESTION_BYTES_OK", HLC: "3"}, + {TransitionName: "INGESTION_SCHEMA_OK", HLC: "4"}, + {TransitionName: "INGESTION_CORPUS_BUILT", HLC: "5"}, + {TransitionName: "COUNCIL_PROFILES_LOADED", HLC: "6"}, + {TransitionName: "COUNCIL_QUORUM_CERT", HLC: "7"}, + {TransitionName: "COUNCIL_MCP_GREEN", HLC: "8"}, + {TransitionName: "ENV_CAPACITY_OK", HLC: "9"}, + {TransitionName: "ENV_INSTALLED", HLC: "10"}, + {TransitionName: "ENV_HEALTH_GREEN", HLC: "11"}, + {TransitionName: "EXEC_PLAN_LOCKED", HLC: "12", WindowID: "window-1"}, + {TransitionName: "EXEC_BEAT_REVIEW_GATE", HLC: "13", WindowID: "window-1"}, + {TransitionName: "EXEC_APPROVALS_THRESHOLD", HLC: "14", WindowID: "window-1"}, + {TransitionName: "EXEC_NEXT_WINDOW", HLC: "15", WindowID: "window-2"}, + } + + var finalState OrchestratorState + for _, proposal := range transitions { + ch, err := exec.SubmitTransition(proposal) + if err != nil { + t.Fatalf("submit transition %s: %v", proposal.TransitionName, err) + } + res := waitResult(t, ch) + if !res.Success || res.Error != nil { + t.Fatalf("transition %s failed: success=%t error=%v", proposal.TransitionName, res.Success, res.Error) + } + finalState = res.NewState + } + + finalHash := finalState.StateHash + if finalHash == "" { + t.Fatal("final state hash empty") + } + + snapshot := Snapshot{ + State: finalState, + LastAppliedHLC: finalState.HLCLast, + LastAppliedIndex: walStore.LastIndex(), + } + if err := snapStore.Save(snapshot); err != nil { + t.Fatalf("save snapshot: %v", err) + } + + replayState, err := Replay(walStore, snapshot) + if err != nil { + t.Fatalf("replay failed: %v", err) + } + + if replayState.StateHash != finalHash { + t.Fatalf("state hash mismatch after replay: got %s want %s", replayState.StateHash, finalHash) + } +} + +func TestQuarantineEnforced(t *testing.T) { + walStore := &InMemoryWAL{} + snapStore := &InMemorySnapshotStore{} + guard := newStaticGuardProvider() + guard.setOutcome("TRANSITION_QUARANTINE_ENTER", GuardOutcome{PolicyOK: false}) + + exec := newTestExecutor(t, guard, walStore, snapStore) + + enterProposal := TransitionProposal{TransitionName: "TRANSITION_QUARANTINE_ENTER", HLC: "1"} + ch, err := exec.SubmitTransition(enterProposal) + if err != nil { + t.Fatalf("submit quarantine enter: %v", err) + } + res := waitResult(t, ch) + if !res.Success || res.Error != nil { + t.Fatalf("quarantine enter failed: %v", res.Error) + } + + quarantinedState := res.NewState + if !quarantinedState.Policy.Quarantined { + t.Fatal("state not quarantined after enter transition") + } + hashBefore := quarantinedState.StateHash + + illegalProposal := TransitionProposal{TransitionName: "EXEC_BEAT_REVIEW_GATE", HLC: "2", WindowID: "window-1"} + ch, err = exec.SubmitTransition(illegalProposal) + if err != nil { + t.Fatalf("submit illegal transition: %v", err) + } + res = waitResult(t, ch) + if res.Success || res.Error == nil { + t.Fatalf("expected failure for illegal transition in quarantine, got success=%t error=%v", res.Success, res.Error) + } + + stateAfter := exec.GetStateSnapshot() + if stateAfter.StateHash != hashBefore { + t.Fatalf("state hash changed after illegal transition: got %s want %s", stateAfter.StateHash, hashBefore) + } +} diff --git a/executor.go b/executor.go new file mode 100644 index 0000000..90664b6 --- /dev/null +++ b/executor.go @@ -0,0 +1,248 @@ +package swoosh + +import ( + "encoding/json" + "fmt" + "time" +) + +const defaultSnapshotInterval uint64 = 32 + +// GuardProvider evaluates transition guards prior to reducer execution. +type GuardProvider interface { + Evaluate(t TransitionProposal, s OrchestratorState) (GuardOutcome, error) +} + +// ApplyResult communicates the outcome of an executor-applied transition. +type ApplyResult struct { + Success bool + Error error + NewState OrchestratorState + GuardInfo GuardOutcome +} + +type applyRequest struct { + proposal TransitionProposal + response chan<- ApplyResult +} + +type stateRequest struct { + response chan<- OrchestratorState +} + +// Executor serializes state mutations for the orchestrator. +type Executor struct { + applyCh chan applyRequest + stateReqCh chan stateRequest + wal WALStore + snapshotStore SnapshotStore + guard GuardProvider + state OrchestratorState + snapshotEvery uint64 + appliedSinceSn uint64 + lastIndex uint64 +} + +// NewExecutor constructs and starts an Executor using the provided collaborators. +func NewExecutor(wal WALStore, snap SnapshotStore, guard GuardProvider, initial Snapshot) *Executor { + state := initial.State + if cloned, err := cloneState(state); err == nil { + state = cloned + } + if state.StateHash == "" { + if hash, err := computeStateHash(state); err == nil { + state.StateHash = hash + } + } + + lastIdx := initial.LastAppliedIndex + if walIdx := wal.LastIndex(); walIdx > lastIdx { + lastIdx = walIdx + } + + exec := &Executor{ + applyCh: make(chan applyRequest), + stateReqCh: make(chan stateRequest), + wal: wal, + snapshotStore: snap, + guard: guard, + state: state, + snapshotEvery: defaultSnapshotInterval, + appliedSinceSn: 0, + lastIndex: lastIdx, + } + + go exec.loop() + return exec +} + +// SubmitTransition enqueues a TransitionProposal and returns a channel for its result. +func (e *Executor) SubmitTransition(p TransitionProposal) (<-chan ApplyResult, error) { + if p.TransitionName == "" { + return nil, fmt.Errorf("%w: %s", ErrUnknownTransition, "empty transition name") + } + + resCh := make(chan ApplyResult, 1) + req := applyRequest{proposal: p, response: resCh} + e.applyCh <- req + return resCh, nil +} + +// GetStateSnapshot returns a deep copy of the current orchestrator state. +func (e *Executor) GetStateSnapshot() OrchestratorState { + respCh := make(chan OrchestratorState, 1) + e.stateReqCh <- stateRequest{response: respCh} + snapshot := <-respCh + return snapshot +} + +func (e *Executor) loop() { + for { + select { + case req := <-e.applyCh: + e.handleApply(req) + case req := <-e.stateReqCh: + clone, err := cloneState(e.state) + if err != nil { + clone = e.state + } + req.response <- clone + } + } +} + +func (e *Executor) handleApply(req applyRequest) { + proposal := req.proposal + result := ApplyResult{} + + if e.state.Policy.Quarantined && proposal.TransitionName != "TRANSITION_QUARANTINE_RELEASE" && proposal.TransitionName != "TRANSITION_QUARANTINE_CONFIRM_BLOCK" { + result.Error = fmt.Errorf("%w: %s", ErrQuarantined, proposal.TransitionName) + req.response <- result + return + } + + guardOutcome, err := e.evaluateGuard(proposal) + if err != nil { + result.Error = fmt.Errorf("guard evaluation: %w", err) + req.response <- result + return + } + + newState, err := Reduce(e.state, proposal, guardOutcome) + if err != nil { + result.Error = fmt.Errorf("reduce: %w", err) + result.GuardInfo = guardOutcome + req.response <- result + return + } + + if newState.StateHash == e.state.StateHash { + result.Success = true + result.NewState = e.cloneForResult(newState) + result.GuardInfo = guardOutcome + req.response <- result + return + } + + walRecord := WALRecord{ + StatePreHash: e.state.StateHash, + StatePostHash: newState.StateHash, + Transition: proposal, + Guard: guardOutcome, + AppliedAtHLC: proposal.HLC, + AppliedAtUnixNs: time.Now().UnixNano(), + Index: e.nextIndex(), + } + + if err := e.appendAndSync(walRecord); err != nil { + result.Error = fmt.Errorf("wal append: %w", err) + result.GuardInfo = guardOutcome + req.response <- result + return + } + + e.state = newState + e.lastIndex = walRecord.Index + e.appliedSinceSn++ + + if e.shouldSnapshot() { + snap := Snapshot{ + State: e.state, + LastAppliedHLC: e.state.HLCLast, + LastAppliedIndex: e.lastIndex, + } + if err := e.saveSnapshot(snap); err != nil { + result.Error = fmt.Errorf("snapshot save: %w", err) + result.GuardInfo = guardOutcome + req.response <- result + return + } + e.appliedSinceSn = 0 + } + + result.Success = true + result.NewState = e.cloneForResult(e.state) + result.GuardInfo = guardOutcome + req.response <- result +} + +func (e *Executor) evaluateGuard(p TransitionProposal) (GuardOutcome, error) { + if e.guard == nil { + return GuardOutcome{ + LicenseOK: true, + BackbeatOK: true, + QuorumOK: true, + PolicyOK: true, + MCPHealthy: true, + }, nil + } + return e.guard.Evaluate(p, e.state) +} + +func (e *Executor) appendAndSync(record WALRecord) error { + if err := e.wal.Append(record); err != nil { + return err + } + if err := e.wal.Sync(); err != nil { + return err + } + return nil +} + +func (e *Executor) saveSnapshot(s Snapshot) error { + if e.snapshotStore == nil { + return nil + } + return e.snapshotStore.Save(s) +} + +func (e *Executor) shouldSnapshot() bool { + return e.snapshotEvery > 0 && e.appliedSinceSn >= e.snapshotEvery && e.snapshotStore != nil +} + +func (e *Executor) nextIndex() uint64 { + if e.lastIndex == 0 { + return 1 + } + return e.lastIndex + 1 +} + +func (e *Executor) cloneForResult(state OrchestratorState) OrchestratorState { + cloned, err := cloneState(state) + if err != nil { + return state + } + return cloned +} + +func cloneState(state OrchestratorState) (OrchestratorState, error) { + payload, err := json.Marshal(state) + if err != nil { + return OrchestratorState{}, err + } + var out OrchestratorState + if err := json.Unmarshal(payload, &out); err != nil { + return OrchestratorState{}, err + } + return out, nil +} diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..5af1f02 --- /dev/null +++ b/go.mod @@ -0,0 +1,3 @@ +module swoosh + +go 1.22 diff --git a/reducer.go b/reducer.go new file mode 100644 index 0000000..dc109bb --- /dev/null +++ b/reducer.go @@ -0,0 +1,633 @@ +package swoosh + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "errors" + "fmt" + "strings" +) + +var ( + // ErrGuardRejected is returned when guard outcomes prevent a transition from applying. + ErrGuardRejected = errors.New("guard rejected transition") + // ErrUnknownTransition is returned when Reduce encounters an unhandled transition name. + ErrUnknownTransition = errors.New("unknown transition") + // ErrQuarantined indicates transitions are blocked while policy quarantine is active. + ErrQuarantined = errors.New("transition blocked by quarantine") + // ErrInvalidPhase signals an unexpected phase for the requested transition. + ErrInvalidPhase = errors.New("invalid phase for transition") + // ErrPrecondition signals a precondition failure unrelated to phases or guards. + ErrPrecondition = errors.New("precondition failed") +) + +// Reduce applies a validated transition to the given state and returns a new state. +// Reducer must remain deterministic and side-effect free. +func Reduce(oldState OrchestratorState, t TransitionProposal, g GuardOutcome) (OrchestratorState, error) { + newState := oldState + changed, err := applyTransition(&newState, t, g) + if err != nil { + return OrchestratorState{}, err + } + if !changed { + return oldState, nil + } + + newState.HLCLast = t.HLC + hash, err := computeStateHash(newState) + if err != nil { + return OrchestratorState{}, fmt.Errorf("compute state hash: %w", err) + } + newState.StateHash = hash + + return newState, nil +} + +func applyTransition(state *OrchestratorState, t TransitionProposal, g GuardOutcome) (bool, error) { + name := t.TransitionName + switch name { + case "": + return false, fmt.Errorf("%w: empty transition name", ErrUnknownTransition) + } + + if state.Policy.Quarantined && name != "TRANSITION_QUARANTINE_RELEASE" && name != "TRANSITION_QUARANTINE_CONFIRM_BLOCK" { + return false, fmt.Errorf("%w: %s", ErrQuarantined, name) + } + + switch name { + case "BOOT_CONFIG_OK": + if state.Boot.Licensed { + return false, fmt.Errorf("%w: boot already licensed", ErrPrecondition) + } + return false, nil + + case "LICENSE_GRANTED": + if state.Boot.Licensed { + return false, nil + } + if !g.LicenseOK { + return false, fmt.Errorf("%w: license check failed", ErrGuardRejected) + } + state.Boot.Licensed = true + return true, nil + + case "INGESTION_SOURCES_RESOLVED": + if state.Ingestion.Phase == "FETCH" && state.Ingestion.LastError == "" { + return false, nil + } + if state.Ingestion.Phase != "" && state.Ingestion.Phase != "DISCOVER" { + return false, fmt.Errorf("%w: ingestion phase %q", ErrInvalidPhase, state.Ingestion.Phase) + } + changed := false + if state.Ingestion.Phase != "FETCH" { + state.Ingestion.Phase = "FETCH" + changed = true + } + if state.Ingestion.LastError != "" { + state.Ingestion.LastError = "" + changed = true + } + if changed { + state.Ingestion.Epoch++ + } + return changed, nil + + case "INGESTION_BYTES_OK": + if state.Ingestion.Phase == "VALIDATE" && state.Ingestion.LastError == "" { + return false, nil + } + if state.Ingestion.Phase != "FETCH" { + return false, fmt.Errorf("%w: ingestion phase %q", ErrInvalidPhase, state.Ingestion.Phase) + } + changed := false + if state.Ingestion.Phase != "VALIDATE" { + state.Ingestion.Phase = "VALIDATE" + changed = true + } + if state.Ingestion.LastError != "" { + state.Ingestion.LastError = "" + changed = true + } + if changed { + state.Ingestion.Epoch++ + } + return changed, nil + + case "INGESTION_SCHEMA_OK": + if state.Ingestion.Phase == "INDEX" && state.Ingestion.LastError == "" { + return false, nil + } + if state.Ingestion.Phase != "VALIDATE" { + return false, fmt.Errorf("%w: ingestion phase %q", ErrInvalidPhase, state.Ingestion.Phase) + } + if !g.PolicyOK || !g.BackbeatOK { + return false, fmt.Errorf("%w: policy=%t backbeat=%t", ErrGuardRejected, g.PolicyOK, g.BackbeatOK) + } + changed := false + if state.Ingestion.Phase != "INDEX" { + state.Ingestion.Phase = "INDEX" + changed = true + } + if state.Ingestion.LastError != "" { + state.Ingestion.LastError = "" + changed = true + } + if changed { + state.Ingestion.Epoch++ + } + return changed, nil + + case "INGESTION_CORPUS_BUILT": + if state.Ingestion.Phase == "READY" && state.Ingestion.LastError == "" { + return false, nil + } + if state.Ingestion.Phase != "INDEX" { + return false, fmt.Errorf("%w: ingestion phase %q", ErrInvalidPhase, state.Ingestion.Phase) + } + changed := false + if state.Ingestion.Phase != "READY" { + state.Ingestion.Phase = "READY" + changed = true + } + if state.Ingestion.LastError != "" { + state.Ingestion.LastError = "" + changed = true + } + if changed { + state.Ingestion.Epoch++ + } + return changed, nil + + case "INGESTION_POLICY_VIOLATION": + if g.PolicyOK { + return false, fmt.Errorf("%w: policy guard true", ErrGuardRejected) + } + if state.Ingestion.Phase != "VALIDATE" && state.Ingestion.Phase != "INDEX" { + return false, fmt.Errorf("%w: ingestion phase %q", ErrInvalidPhase, state.Ingestion.Phase) + } + rationale := joinRationale(g.Rationale) + changed := false + if !state.Policy.Quarantined { + state.Policy.Quarantined = true + changed = true + } + if state.Policy.Rationale != rationale { + state.Policy.Rationale = rationale + changed = true + } + if !state.Control.Degraded { + state.Control.Degraded = true + changed = true + } + if changed { + state.Ingestion.Epoch++ + } + return changed, nil + + case "COUNCIL_PROFILES_LOADED": + if state.Council.Phase == "ELECT" { + return false, nil + } + if state.Council.Phase != "" && state.Council.Phase != "PLAN_ROLES" { + return false, fmt.Errorf("%w: council phase %q", ErrInvalidPhase, state.Council.Phase) + } + state.Council.Phase = "ELECT" + state.Council.Epoch++ + return true, nil + + case "COUNCIL_QUORUM_CERT": + if state.Council.Phase == "TOOLING_SYNC" { + return false, nil + } + if state.Council.Phase != "ELECT" { + return false, fmt.Errorf("%w: council phase %q", ErrInvalidPhase, state.Council.Phase) + } + if !g.QuorumOK || !g.PolicyOK { + return false, fmt.Errorf("%w: quorum=%t policy=%t", ErrGuardRejected, g.QuorumOK, g.PolicyOK) + } + state.Council.Phase = "TOOLING_SYNC" + state.Council.Epoch++ + return true, nil + + case "COUNCIL_MCP_GREEN": + if state.Council.Phase == "READY" && state.Council.MCPHealthGreen { + return false, nil + } + if state.Council.Phase != "TOOLING_SYNC" { + return false, fmt.Errorf("%w: council phase %q", ErrInvalidPhase, state.Council.Phase) + } + if !g.MCPHealthy { + return false, fmt.Errorf("%w: mcp unhealthy", ErrGuardRejected) + } + changed := false + if state.Council.Phase != "READY" { + state.Council.Phase = "READY" + changed = true + } + if !state.Council.MCPHealthGreen { + state.Council.MCPHealthGreen = true + changed = true + } + if changed { + state.Council.Epoch++ + } + return changed, nil + + case "ENV_CAPACITY_OK": + if state.Environment.Phase == "PROVISION" && state.Environment.CapacityOK && state.Environment.Health == "" { + return false, nil + } + if state.Environment.Phase != "" && state.Environment.Phase != "ALLOCATE" { + return false, fmt.Errorf("%w: environment phase %q", ErrInvalidPhase, state.Environment.Phase) + } + if !g.BackbeatOK { + return false, fmt.Errorf("%w: backbeat guard false", ErrGuardRejected) + } + changed := false + if state.Environment.Phase != "PROVISION" { + state.Environment.Phase = "PROVISION" + changed = true + } + if !state.Environment.CapacityOK { + state.Environment.CapacityOK = true + changed = true + } + if state.Environment.Health != "" { + state.Environment.Health = "" + changed = true + } + if changed { + state.Environment.Epoch++ + } + return changed, nil + + case "ENV_INSTALLED": + if state.Environment.Phase == "HEALTHCHECK" { + return false, nil + } + if state.Environment.Phase != "PROVISION" { + return false, fmt.Errorf("%w: environment phase %q", ErrInvalidPhase, state.Environment.Phase) + } + state.Environment.Phase = "HEALTHCHECK" + state.Environment.Epoch++ + return true, nil + + case "ENV_HEALTH_GREEN": + if state.Environment.Phase == "READY" && state.Environment.Health == "green" && !state.Control.Degraded { + return false, nil + } + if state.Environment.Phase != "HEALTHCHECK" { + return false, fmt.Errorf("%w: environment phase %q", ErrInvalidPhase, state.Environment.Phase) + } + changed := false + if state.Environment.Phase != "READY" { + state.Environment.Phase = "READY" + changed = true + } + if state.Environment.Health != "green" { + state.Environment.Health = "green" + changed = true + } + if state.Control.Degraded { + state.Control.Degraded = false + changed = true + } + if changed { + state.Environment.Epoch++ + } + return changed, nil + + case "ENV_HEALTH_AMBER": + if state.Environment.Phase == "DEGRADED" && state.Environment.Health == "amber" && state.Control.Degraded { + return false, nil + } + if state.Environment.Phase != "HEALTHCHECK" { + return false, fmt.Errorf("%w: environment phase %q", ErrInvalidPhase, state.Environment.Phase) + } + changed := false + if state.Environment.Phase != "DEGRADED" { + state.Environment.Phase = "DEGRADED" + changed = true + } + if state.Environment.Health != "amber" { + state.Environment.Health = "amber" + changed = true + } + if !state.Control.Degraded { + state.Control.Degraded = true + changed = true + } + if changed { + state.Environment.Epoch++ + } + return changed, nil + + case "ENV_RECOVERED_GREEN": + if state.Environment.Phase == "READY" && state.Environment.Health == "green" && !state.Control.Degraded { + return false, nil + } + if state.Environment.Phase != "DEGRADED" { + return false, fmt.Errorf("%w: environment phase %q", ErrInvalidPhase, state.Environment.Phase) + } + if !g.MCPHealthy { + return false, fmt.Errorf("%w: mcp unhealthy", ErrGuardRejected) + } + changed := false + if state.Environment.Phase != "READY" { + state.Environment.Phase = "READY" + changed = true + } + if state.Environment.Health != "green" { + state.Environment.Health = "green" + changed = true + } + if state.Control.Degraded { + state.Control.Degraded = false + changed = true + } + if changed { + state.Environment.Epoch++ + } + return changed, nil + + case "EXEC_PLAN_LOCKED": + if state.Execution.Phase == "WORK" && state.Execution.PlanLocked && state.Execution.ActiveWindowID == t.WindowID { + return false, nil + } + if state.Execution.Phase != "" && state.Execution.Phase != "PLAN" { + return false, fmt.Errorf("%w: execution phase %q", ErrInvalidPhase, state.Execution.Phase) + } + if !g.BackbeatOK { + return false, fmt.Errorf("%w: backbeat guard false", ErrGuardRejected) + } + if state.Ingestion.Phase != "READY" || state.Council.Phase != "READY" || state.Environment.Phase != "READY" { + return false, fmt.Errorf("%w: prerequisites not ready", ErrPrecondition) + } + changed := false + if state.Execution.Phase != "WORK" { + state.Execution.Phase = "WORK" + changed = true + } + if !state.Execution.PlanLocked { + state.Execution.PlanLocked = true + changed = true + } + if state.Execution.ActiveWindowID != t.WindowID { + state.Execution.ActiveWindowID = t.WindowID + changed = true + } + if changed { + state.Execution.Epoch++ + } + return changed, nil + + case "EXEC_BEAT_REVIEW_GATE": + if state.Execution.Phase == "REVIEW" { + return false, nil + } + if state.Execution.Phase != "WORK" { + return false, fmt.Errorf("%w: execution phase %q", ErrInvalidPhase, state.Execution.Phase) + } + if !g.BackbeatOK { + return false, fmt.Errorf("%w: backbeat guard false", ErrGuardRejected) + } + state.Execution.Phase = "REVIEW" + state.Execution.PlanLocked = true + state.Execution.Epoch++ + return true, nil + + case "EXEC_APPROVALS_THRESHOLD": + if state.Execution.Phase == "REVERB" { + return false, nil + } + if state.Execution.Phase != "REVIEW" { + return false, fmt.Errorf("%w: execution phase %q", ErrInvalidPhase, state.Execution.Phase) + } + if !g.QuorumOK || !g.PolicyOK { + return false, fmt.Errorf("%w: quorum=%t policy=%t", ErrGuardRejected, g.QuorumOK, g.PolicyOK) + } + state.Execution.Phase = "REVERB" + state.Execution.Epoch++ + return true, nil + + case "EXEC_CHANGES_REQUESTED": + if state.Execution.Phase == "WORK" { + return false, nil + } + if state.Execution.Phase != "REVIEW" { + return false, fmt.Errorf("%w: execution phase %q", ErrInvalidPhase, state.Execution.Phase) + } + if !g.PolicyOK { + return false, fmt.Errorf("%w: policy guard false", ErrGuardRejected) + } + state.Execution.Phase = "WORK" + state.Execution.PlanLocked = true + state.Execution.Epoch++ + return true, nil + + case "EXEC_NEXT_WINDOW": + if state.Execution.Phase == "PLAN" && !state.Execution.PlanLocked && state.Execution.ActiveWindowID == t.WindowID && state.Execution.Approvals == 0 { + return false, nil + } + if state.Execution.Phase != "REVERB" { + return false, fmt.Errorf("%w: execution phase %q", ErrInvalidPhase, state.Execution.Phase) + } + if !g.BackbeatOK { + return false, fmt.Errorf("%w: backbeat guard false", ErrGuardRejected) + } + changed := false + if state.Execution.Phase != "PLAN" { + state.Execution.Phase = "PLAN" + changed = true + } + if state.Execution.PlanLocked { + state.Execution.PlanLocked = false + changed = true + } + if state.Execution.ActiveWindowID != t.WindowID { + state.Execution.ActiveWindowID = t.WindowID + changed = true + } + if state.Execution.Approvals != 0 { + state.Execution.Approvals = 0 + changed = true + } + if changed { + state.Execution.Epoch++ + } + return changed, nil + + case "CONTROL_PAUSE": + needPause := !state.Control.Paused + needRecoverReset := state.Control.Recovering + if !needPause && !needRecoverReset { + return false, nil + } + if !g.PolicyOK { + return false, fmt.Errorf("%w: policy guard false", ErrGuardRejected) + } + if needPause { + state.Control.Paused = true + } + if needRecoverReset { + state.Control.Recovering = false + } + return true, nil + + case "CONTROL_RESUME": + if !state.Control.Paused { + return false, fmt.Errorf("%w: system not paused", ErrPrecondition) + } + if !g.PolicyOK { + return false, fmt.Errorf("%w: policy guard false", ErrGuardRejected) + } + changed := false + if state.Control.Paused { + state.Control.Paused = false + changed = true + } + if state.Control.Recovering { + state.Control.Recovering = false + changed = true + } + return changed, nil + + case "CONTROL_RECOVERY_ENTER": + if state.Control.Recovering && state.Control.Degraded { + return false, nil + } + if g.QuorumOK && !state.Control.Degraded { + return false, fmt.Errorf("%w: no recovery trigger", ErrGuardRejected) + } + changed := false + if !state.Control.Recovering { + state.Control.Recovering = true + changed = true + } + if !state.Control.Degraded { + state.Control.Degraded = true + changed = true + } + return changed, nil + + case "CONTROL_RECOVERY_RESOLVED": + if !state.Control.Recovering { + return false, fmt.Errorf("%w: not in recovery", ErrPrecondition) + } + if !g.QuorumOK && state.Environment.Health != "green" { + return false, fmt.Errorf("%w: recovery not satisfied", ErrGuardRejected) + } + changed := false + if state.Control.Recovering { + state.Control.Recovering = false + changed = true + } + if state.Control.Degraded { + state.Control.Degraded = false + changed = true + } + return changed, nil + + case "TRANSITION_QUARANTINE_ENTER": + if g.PolicyOK { + return false, fmt.Errorf("%w: policy guard true", ErrGuardRejected) + } + rationale := joinRationale(g.Rationale) + changed := false + if !state.Policy.Quarantined { + state.Policy.Quarantined = true + changed = true + } + if state.Policy.Rationale != rationale { + state.Policy.Rationale = rationale + changed = true + } + if !state.Control.Paused { + state.Control.Paused = true + changed = true + } + if state.Control.Recovering { + state.Control.Recovering = false + changed = true + } + if !state.Control.Degraded { + state.Control.Degraded = true + changed = true + } + return changed, nil + + case "TRANSITION_QUARANTINE_RELEASE": + if !state.Policy.Quarantined { + return false, fmt.Errorf("%w: not quarantined", ErrPrecondition) + } + if !g.PolicyOK { + return false, fmt.Errorf("%w: policy guard false", ErrGuardRejected) + } + changed := false + if state.Policy.Quarantined { + state.Policy.Quarantined = false + changed = true + } + if state.Control.Paused { + state.Control.Paused = false + changed = true + } + if state.Control.Degraded { + state.Control.Degraded = false + changed = true + } + if state.Control.Recovering { + state.Control.Recovering = false + changed = true + } + return changed, nil + + case "TRANSITION_QUARANTINE_CONFIRM_BLOCK": + if !state.Policy.Quarantined { + return false, fmt.Errorf("%w: not quarantined", ErrPrecondition) + } + changed := false + if !state.Control.Paused { + state.Control.Paused = true + changed = true + } + if !state.Control.Degraded { + state.Control.Degraded = true + changed = true + } + if state.Control.Recovering { + state.Control.Recovering = false + changed = true + } + return changed, nil + + default: + return false, fmt.Errorf("%w: %s", ErrUnknownTransition, name) + } +} + +func joinRationale(r []string) string { + if len(r) == 0 { + return "" + } + return strings.Join(r, "; ") +} + +func computeStateHash(state OrchestratorState) (string, error) { + payload, err := canonicalJSON(state) + if err != nil { + return "", err + } + + sum := sha256.Sum256(payload) + return hex.EncodeToString(sum[:]), nil +} + +func canonicalJSON(value any) ([]byte, error) { + buf, err := json.Marshal(value) + if err != nil { + return nil, err + } + return buf, nil +} diff --git a/replay.go b/replay.go new file mode 100644 index 0000000..6a59701 --- /dev/null +++ b/replay.go @@ -0,0 +1,45 @@ +package swoosh + +import "fmt" + +// Replay deterministically replays WAL records on top of a snapshot and returns the resulting state. +func Replay(wal WALStore, snapshot Snapshot) (OrchestratorState, error) { + base, err := cloneState(snapshot.State) + if err != nil { + return OrchestratorState{}, fmt.Errorf("clone snapshot state: %w", err) + } + + if base.StateHash == "" { + hash, err := computeStateHash(base) + if err != nil { + return OrchestratorState{}, fmt.Errorf("compute snapshot hash: %w", err) + } + base.StateHash = hash + } + + start := snapshot.LastAppliedIndex + 1 + records, err := wal.Replay(start) + if err != nil { + return OrchestratorState{}, fmt.Errorf("replay wal: %w", err) + } + + state := base + for _, record := range records { + if state.StateHash != record.StatePreHash { + return OrchestratorState{}, fmt.Errorf("wal pre-hash mismatch at index %d", record.Index) + } + + next, err := Reduce(state, record.Transition, record.Guard) + if err != nil { + return OrchestratorState{}, fmt.Errorf("reduce wal record %d: %w", record.Index, err) + } + + if next.StateHash != record.StatePostHash { + return OrchestratorState{}, fmt.Errorf("wal post-hash mismatch at index %d", record.Index) + } + + state = next + } + + return state, nil +} diff --git a/snapshot.go b/snapshot.go new file mode 100644 index 0000000..8466b39 --- /dev/null +++ b/snapshot.go @@ -0,0 +1,117 @@ +package swoosh + +import ( + "encoding/json" + "errors" + "fmt" + "os" + "path/filepath" +) + +// Snapshot captures a serialized view of the orchestrator state and replay cursor. +type Snapshot struct { + State OrchestratorState `json:"state"` + LastAppliedHLC string `json:"last_applied_hlc"` + LastAppliedIndex uint64 `json:"last_applied_index"` +} + +// SnapshotStore persists and loads orchestrator snapshots. +type SnapshotStore interface { + Save(s Snapshot) error + LoadLatest() (Snapshot, error) +} + +// ErrSnapshotNotFound indicates there is no stored snapshot. +var ErrSnapshotNotFound = errors.New("snapshot not found") + +// FileSnapshotStore stores snapshots using atomic file replacement. +type FileSnapshotStore struct { + path string +} + +// NewFileSnapshotStore creates a snapshot store at the supplied path. +func NewFileSnapshotStore(path string) *FileSnapshotStore { + return &FileSnapshotStore{path: path} +} + +// Save writes the snapshot with atomic replace semantics. +func (s *FileSnapshotStore) Save(snapshot Snapshot) error { + dir := filepath.Dir(s.path) + if err := os.MkdirAll(dir, 0o755); err != nil { + return fmt.Errorf("create snapshot directory: %w", err) + } + + payload, err := canonicalJSON(snapshot) + if err != nil { + return fmt.Errorf("marshal snapshot: %w", err) + } + + temp, err := os.CreateTemp(dir, "snapshot-*.tmp") + if err != nil { + return fmt.Errorf("create temp snapshot: %w", err) + } + tempName := temp.Name() + + if _, err := temp.Write(payload); err != nil { + temp.Close() + os.Remove(tempName) + return fmt.Errorf("write snapshot: %w", err) + } + + if err := temp.Sync(); err != nil { + temp.Close() + os.Remove(tempName) + return fmt.Errorf("sync snapshot: %w", err) + } + + if err := temp.Close(); err != nil { + os.Remove(tempName) + return fmt.Errorf("close snapshot temp file: %w", err) + } + + if err := os.Rename(tempName, s.path); err != nil { + os.Remove(tempName) + return fmt.Errorf("rename snapshot: %w", err) + } + + return nil +} + +// LoadLatest returns the persisted snapshot or ErrSnapshotNotFound if absent. +func (s *FileSnapshotStore) LoadLatest() (Snapshot, error) { + payload, err := os.ReadFile(s.path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return Snapshot{}, ErrSnapshotNotFound + } + return Snapshot{}, fmt.Errorf("read snapshot: %w", err) + } + + var snapshot Snapshot + if err := json.Unmarshal(payload, &snapshot); err != nil { + return Snapshot{}, fmt.Errorf("decode snapshot: %w", err) + } + + return snapshot, nil +} + +// InMemorySnapshotStore fulfills SnapshotStore in memory for testing. +type InMemorySnapshotStore struct { + snapshot Snapshot + has bool +} + +// Save stores the snapshot in memory. +func (s *InMemorySnapshotStore) Save(snapshot Snapshot) error { + s.snapshot = snapshot + s.has = true + return nil +} + +// LoadLatest retrieves the stored snapshot or ErrSnapshotNotFound. +func (s *InMemorySnapshotStore) LoadLatest() (Snapshot, error) { + if !s.has { + return Snapshot{}, ErrSnapshotNotFound + } + return s.snapshot, nil +} diff --git a/state.go b/state.go new file mode 100644 index 0000000..7813254 --- /dev/null +++ b/state.go @@ -0,0 +1,123 @@ +package swoosh + +import "time" + +// CouncilMember represents one elected or proposed council participant. +// This struct must be stable across replicas and serialization order. +type CouncilMember struct { + ID string `json:"id"` + Role string `json:"role"` + PublicKey string `json:"public_key"` + VoteWeight int `json:"vote_weight"` + Signatures []string `json:"signatures"` + Status string `json:"status"` + Epoch uint64 `json:"epoch"` +} + +// EnvResource defines one environment allocation or provisioned asset. +// It is used by the Environment phase to track compute or service resources. +type EnvResource struct { + ID string `json:"id"` + Type string `json:"type"` + Address string `json:"address"` + Capacity string `json:"capacity"` + AllocatedAt time.Time `json:"allocated_at"` + ReleasedAt time.Time `json:"released_at"` + HealthStatus string `json:"health_status"` + Provider string `json:"provider"` +} + +// OrchestratorState captures the full replicated SWOOSH state machine. +type OrchestratorState struct { + Meta struct { + Version string + SchemaHash string + } + + Boot struct { + Licensed bool + LicenseExpiry time.Time + NodeID string + } + + Ingestion struct { + Phase string + ContentHash string + SourceSet []string + LastError string + Epoch uint64 + } + + Council struct { + Phase string + PlannedRoles []string + Members []CouncilMember + QuorumCertHash string + MCPHealthGreen bool + Epoch uint64 + } + + Environment struct { + Phase string + CapacityOK bool + Health string + Resources []EnvResource + Epoch uint64 + } + + Execution struct { + Phase string + ActiveWindowID string + BeatIndex uint64 + PlanLocked bool + Approvals uint32 + Epoch uint64 + } + + Control struct { + Paused bool + Degraded bool + Recovering bool + } + + Policy struct { + Quarantined bool + Rationale string + } + + HLCLast string + StateHash string +} + +// TransitionProposal encapsulates a requested state transition. +type TransitionProposal struct { + CurrentStateHash string `json:"current_state_hash"` + TransitionName string `json:"transition"` + InputsHash string `json:"inputs_hash"` + Signer string `json:"signer"` + IdemKey string `json:"idem_key"` + HLC string `json:"hlc"` + WindowID string `json:"window_id"` + Evidence []string `json:"evidence"` +} + +// GuardOutcome aggregates guard evaluation for a transition. +type GuardOutcome struct { + LicenseOK bool + BackbeatOK bool + QuorumOK bool + PolicyOK bool + MCPHealthy bool + Rationale []string +} + +// WALRecord captures append-only transition metadata for replay and audit. +type WALRecord struct { + StatePreHash string `json:"state_pre_hash"` + StatePostHash string `json:"state_post_hash"` + Transition TransitionProposal `json:"transition"` + Guard GuardOutcome `json:"guard"` + AppliedAtHLC string `json:"applied_hlc"` + AppliedAtUnixNs int64 `json:"applied_unix_ns"` + Index uint64 `json:"index"` +} diff --git a/wal.go b/wal.go new file mode 100644 index 0000000..810a1f8 --- /dev/null +++ b/wal.go @@ -0,0 +1,186 @@ +package swoosh + +import ( + "bufio" + "encoding/json" + "errors" + "fmt" + "os" +) + +// WALStore persists state transition records for deterministic replay. +type WALStore interface { + Append(record WALRecord) error + Replay(fromIndex uint64) ([]WALRecord, error) + Sync() error + LastIndex() uint64 +} + +// FileWAL implements WALStore using an append-only file. +type FileWAL struct { + path string + file *os.File + lastIndex uint64 +} + +// NewFileWAL constructs a file-backed write-ahead log at the given path. +func NewFileWAL(path string) (*FileWAL, error) { + file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o600) + if err != nil { + return nil, fmt.Errorf("open wal: %w", err) + } + + wal := &FileWAL{ + path: path, + file: file, + } + + if err := wal.bootstrapLastIndex(); err != nil { + _ = file.Close() + return nil, err + } + + return wal, nil +} + +// Append writes the WALRecord to the log and fsyncs the file. +func (w *FileWAL) Append(record WALRecord) error { + if record.Index == 0 { + return errors.New("wal record index must be > 0") + } + + if record.Index <= w.lastIndex { + return fmt.Errorf("wal index regression: %d <= %d", record.Index, w.lastIndex) + } + + payload, err := canonicalJSON(record) + if err != nil { + return fmt.Errorf("marshal wal record: %w", err) + } + + if _, err := w.file.Write(append(payload, '\n')); err != nil { + return fmt.Errorf("append wal: %w", err) + } + + if err := w.Sync(); err != nil { + return err + } + + w.lastIndex = record.Index + return nil +} + +// Replay reads records with index >= fromIndex in order. +func (w *FileWAL) Replay(fromIndex uint64) ([]WALRecord, error) { + reader, err := os.Open(w.path) + if err != nil { + if errors.Is(err, os.ErrNotExist) { + return nil, nil + } + return nil, fmt.Errorf("open wal for replay: %w", err) + } + defer reader.Close() + + scanner := bufio.NewScanner(reader) + buf := make([]byte, 0, 64*1024) + scanner.Buffer(buf, 16*1024*1024) + + var records []WALRecord + for scanner.Scan() { + line := scanner.Bytes() + if len(line) == 0 { + continue + } + + var record WALRecord + if err := json.Unmarshal(line, &record); err != nil { + return nil, fmt.Errorf("decode wal record: %w", err) + } + + if record.Index >= fromIndex { + records = append(records, record) + } + } + + if err := scanner.Err(); err != nil { + return nil, fmt.Errorf("scan wal: %w", err) + } + + return records, nil +} + +// Sync fsyncs the underlying file to persist appended records. +func (w *FileWAL) Sync() error { + if err := w.file.Sync(); err != nil { + return fmt.Errorf("sync wal: %w", err) + } + return nil +} + +// LastIndex returns the highest record index stored in the WAL. +func (w *FileWAL) LastIndex() uint64 { + return w.lastIndex +} + +// Close releases resources associated with the FileWAL. +func (w *FileWAL) Close() error { + if w.file == nil { + return nil + } + return w.file.Close() +} + +func (w *FileWAL) bootstrapLastIndex() error { + records, err := w.Replay(0) + if err != nil { + return err + } + if len(records) == 0 { + w.lastIndex = 0 + return nil + } + w.lastIndex = records[len(records)-1].Index + return nil +} + +// InMemoryWAL provides an in-memory WAL implementation for tests. +type InMemoryWAL struct { + records []WALRecord +} + +// Append adds the record to the in-memory sequence. +func (w *InMemoryWAL) Append(record WALRecord) error { + if record.Index == 0 { + return errors.New("wal record index must be > 0") + } + if len(w.records) > 0 && record.Index <= w.records[len(w.records)-1].Index { + return fmt.Errorf("wal index regression: %d", record.Index) + } + w.records = append(w.records, record) + return nil +} + +// Replay returns a copy of stored records from the requested index onward. +func (w *InMemoryWAL) Replay(fromIndex uint64) ([]WALRecord, error) { + if len(w.records) == 0 { + return nil, nil + } + var out []WALRecord + for _, record := range w.records { + if record.Index >= fromIndex { + out = append(out, record) + } + } + return out, nil +} + +// Sync is a no-op for the in-memory WAL. +func (w *InMemoryWAL) Sync() error { return nil } + +// LastIndex returns the latest stored index. +func (w *InMemoryWAL) LastIndex() uint64 { + if len(w.records) == 0 { + return 0 + } + return w.records[len(w.records)-1].Index +}