Initial SWOOSH executor and reducer implementation

Codex Agent
2025-10-24 18:35:13 +11:00
commit 38707dd182
9 changed files with 1931 additions and 0 deletions

424
SWOOSH_PLAN.md Normal file

@@ -0,0 +1,424 @@
Here's your **Codex briefing document**, ready to hand over.
It defines the SWOOSH architecture, constraints, coding style, and behavioral contract in a single authoritative form.
---
### 📄 **SWOOSH_Codex_Design_Brief.md**
````markdown
# SWOOSH — Deterministic Orchestrator (Codex Implementation Brief)
> **Purpose:**
> Codex is to implement the SWOOSH orchestrator exactly as described in this brief.
> SWOOSH replaces the WHOOSH orchestration module with a deterministic state-machine core built in **Go**, designed for **predictable, audit-ready, chaos-resilient orchestration**.
---
## 1. Overview
SWOOSH governs:
- Project ingestion (DISCOVER → READY)
- Council formation (PLAN_ROLES → READY)
- Environment provisioning (ALLOCATE → READY)
- Execution (PLAN → WORK → REVIEW → REVERB)
It guarantees:
- **Deterministic state transitions** (pure reducer pattern)
- **Idempotent application** of transitions
- **Write-ahead logging (WAL)** with append-only semantics
- **Periodic snapshots** and deterministic replay
- **Hybrid Logical Clock (HLC)** ordering across replicas
- **Immutable audit evidence** (UCXL + BUBBLE Decision Records)
---
## 2. Language and Runtime
- **Language:** Go 1.22+
- **Dependencies:** stdlib only, except for BadgerDB (WAL store)
- **Concurrency:** single-threaded state executor goroutine; NO mutexes on state
- **Architecture:** pure reducer + guard providers + WAL + snapshot
- **All state mutation must occur within the executor loop**
---
## 3. Core Contracts
Codex must **not invent or modify** these structs, fields, or method signatures.
### 3.1 OrchestratorState
```go
type OrchestratorState struct {
Meta struct {
Version string
SchemaHash string
}
Boot struct {
Licensed bool
LicenseExpiry time.Time
NodeID string
}
Ingestion struct {
Phase string // DISCOVER|FETCH|VALIDATE|INDEX|READY
ContentHash string
SourceSet []string
LastError string
Epoch uint64
}
Council struct {
Phase string // PLAN_ROLES|ELECT|TOOLING_SYNC|READY
PlannedRoles []string
Members []CouncilMember
QuorumCertHash string
MCPHealthGreen bool
Epoch uint64
}
Environment struct {
Phase string // ALLOCATE|PROVISION|HEALTHCHECK|READY|DEGRADED
CapacityOK bool
Health string // green|amber
Resources []EnvResource
Epoch uint64
}
Execution struct {
Phase string // PLAN|WORK|REVIEW|REVERB
ActiveWindowID string
BeatIndex uint64
PlanLocked bool
Approvals uint32
Epoch uint64
}
Control struct {
Paused bool
Degraded bool
Recovering bool
}
Policy struct {
Quarantined bool
Rationale string
}
HLCLast string // last applied hlc
StateHash string // sha256 of canonical serialization
}
````
---
### 3.2 TransitionProposal
```go
type TransitionProposal struct {
CurrentStateHash string `json:"current_state_hash"`
TransitionName string `json:"transition"`
InputsHash string `json:"inputs_hash"`
Signer string `json:"signer"`
IdemKey string `json:"idem_key"`
HLC string `json:"hlc"`
WindowID string `json:"window_id"`
Evidence []string `json:"evidence"`
}
```
---
### 3.3 GuardOutcome
```go
type GuardOutcome struct {
LicenseOK bool
BackbeatOK bool
QuorumOK bool
PolicyOK bool
MCPHealthy bool
Rationale []string
}
```
Guards are computed **outside** the reducer. Reducer consumes them as verified input.
---
### 3.4 WALRecord
```go
type WALRecord struct {
StatePreHash string `json:"state_pre_hash"`
StatePostHash string `json:"state_post_hash"`
Transition TransitionProposal `json:"transition"`
Guard GuardOutcome `json:"guard"`
AppliedAtHLC string `json:"applied_hlc"`
AppliedAtUnixNs int64 `json:"applied_unix_ns"`
Index uint64 `json:"index"`
}
```
---
## 4. Reducer Contract
```go
// Reduce applies a validated transition to the given state and returns a new state.
// It MUST be deterministic, pure, and side-effect free.
// It MUST NOT perform I/O, logging, or time reads.
func Reduce(oldState OrchestratorState, t TransitionProposal, g GuardOutcome) (OrchestratorState, error)
```
* Reducer logic implemented as a **switch on TransitionName**.
* Guard outcomes are already evaluated.
* Each case:
* Updates state fields
* Increments relevant Epoch
* Computes new StateHash
* Returns new state, no side-effects.
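
The bullet points above can be sketched on a cut-down state. This is a minimal illustration, not the SWOOSH reducer: `miniState`, `reduce`, `hashState`, the error variables, and the two transition names are hypothetical stand-ins, and the real `Reduce` takes a `TransitionProposal` and a `GuardOutcome` rather than a string and a bool.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"errors"
	"fmt"
)

// miniState is a hypothetical, cut-down stand-in for OrchestratorState.
type miniState struct {
	Phase     string
	Epoch     uint64
	StateHash string
}

var (
	errUnknownTransition = errors.New("unknown transition")
	errInvalidPhase      = errors.New("invalid phase for transition")
	errGuardRejected     = errors.New("guard rejected transition")
)

// hashState hashes the state with StateHash cleared, so the hash never
// depends on its own previous value.
func hashState(s miniState) (string, error) {
	s.StateHash = ""
	payload, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(payload)
	return hex.EncodeToString(sum[:]), nil
}

// reduce reads only its arguments and returns a new value; old is never mutated.
func reduce(old miniState, transition string, guardOK bool) (miniState, error) {
	next := old // value copy
	switch transition {
	case "SOURCES_RESOLVED":
		if old.Phase != "DISCOVER" {
			return miniState{}, fmt.Errorf("%w: %q", errInvalidPhase, old.Phase)
		}
		next.Phase = "FETCH"
	case "BYTES_OK":
		if old.Phase != "FETCH" {
			return miniState{}, fmt.Errorf("%w: %q", errInvalidPhase, old.Phase)
		}
		if !guardOK {
			return miniState{}, errGuardRejected
		}
		next.Phase = "VALIDATE"
	default:
		// Unknown names are rejected explicitly instead of being ignored.
		return miniState{}, fmt.Errorf("%w: %q", errUnknownTransition, transition)
	}
	next.Epoch++
	hash, err := hashState(next)
	if err != nil {
		return miniState{}, err
	}
	next.StateHash = hash
	return next, nil
}

func main() {
	s, err := reduce(miniState{Phase: "DISCOVER"}, "SOURCES_RESOLVED", true)
	if err != nil {
		fmt.Println("error:", err)
		return
	}
	fmt.Println(s.Phase, s.Epoch, len(s.StateHash)) // prints: FETCH 1 64
}
```

Because `reduce` touches no clock, random source, or I/O, calling it twice with the same arguments yields the same `StateHash`.
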
---
## 5. Executor Model
Codex must implement an **Executor** struct that:
* Runs as a single goroutine
* Serializes all writes to state
* Exposes a safe interface for transition submission
```go
type Executor struct {
state OrchestratorState
wal WALStore
snapshot SnapshotStore
applyCh chan TransitionProposal
resultsCh chan ApplyResult
}
type ApplyResult struct {
Success bool
Error error
NewState OrchestratorState
GuardInfo GuardOutcome
}
```
### Rules
* No other goroutine may mutate `state`.
* All writes funnel through `applyCh`.
* Each accepted transition:
1. Validate → Guard → Reduce
2. Append to WAL (fsync)
3. Update canonical state
4. Snapshot periodically
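
The rules above boil down to one goroutine that owns the state and commits it only after the log write succeeds. The sketch below shows that ordering on a toy integer state. `miniExecutor`, `applyRequest`, `applyResult`, and `limitLog` are hypothetical names, and the `appendLog` callback stands in for "append to WAL and fsync".

```go
package main

import (
	"errors"
	"fmt"
)

type applyResult struct {
	state int
	err   error
}

type applyRequest struct {
	delta int
	reply chan<- applyResult
}

// miniExecutor is a hypothetical executor whose "state" is a single int.
type miniExecutor struct {
	applyCh   chan applyRequest
	done      chan struct{}
	appendLog func(next int) error // stand-in for WAL append + fsync
}

func newMiniExecutor(appendLog func(int) error) *miniExecutor {
	e := &miniExecutor{
		applyCh:   make(chan applyRequest),
		done:      make(chan struct{}),
		appendLog: appendLog,
	}
	go e.loop()
	return e
}

// loop is the only code that touches state, so no mutex is needed.
func (e *miniExecutor) loop() {
	state := 0
	for {
		select {
		case req := <-e.applyCh:
			next := state + req.delta // "reduce": compute the candidate state
			if err := e.appendLog(next); err != nil {
				req.reply <- applyResult{state: state, err: err} // state unchanged
				continue
			}
			state = next // commit only after the log write succeeded
			req.reply <- applyResult{state: state}
		case <-e.done:
			return
		}
	}
}

// submit sends one request and waits for its result.
func (e *miniExecutor) submit(delta int) applyResult {
	reply := make(chan applyResult, 1)
	e.applyCh <- applyRequest{delta: delta, reply: reply}
	return <-reply
}

func (e *miniExecutor) stop() { close(e.done) }

// limitLog returns a fake log that refuses any state above max.
func limitLog(max int) func(int) error {
	return func(next int) error {
		if next > max {
			return errors.New("log rejected write")
		}
		return nil
	}
}

func main() {
	exec := newMiniExecutor(limitLog(5))
	defer exec.stop()
	fmt.Println(exec.submit(2).state)       // 2
	fmt.Println(exec.submit(10).err != nil) // true, state stays 2
	fmt.Println(exec.submit(3).state)       // 5
}
```

The buffered reply channel lets the loop answer without blocking even if the caller has stopped waiting.
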
---
## 6. WAL Layer
Codex must implement:
```go
type WALStore interface {
Append(record WALRecord) error
Replay(fromIndex uint64) ([]WALRecord, error)
Sync() error
LastIndex() uint64
}
```
Implementation details:
* Backed by BadgerDB or flat append-only file.
* Records must be fsynced to disk by the time `Append` returns or, at the latest, when `Sync()` returns.
* Replay must yield records in index order.
* No concurrent writers (single executor owns WAL).
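
For the flat-file option, a minimal sketch of an append-only log with fsync-on-append and ordered replay might look like the following. It is an illustration under assumptions: `walEntry` is a cut-down stand-in for `WALRecord`, `fileWAL` and `demoWAL` are hypothetical names, one JSON object per line is an assumed encoding, and recovering the last index from an existing file is left out.

```go
package main

import (
	"bufio"
	"encoding/json"
	"fmt"
	"os"
	"path/filepath"
)

// walEntry is a hypothetical, cut-down stand-in for WALRecord.
type walEntry struct {
	Index      uint64 `json:"index"`
	Transition string `json:"transition"`
}

// fileWAL appends one JSON object per line to a single file.
type fileWAL struct {
	path string
	f    *os.File
	last uint64
}

// openFileWAL assumes a fresh log; recovering last from an existing file is omitted.
func openFileWAL(path string) (*fileWAL, error) {
	f, err := os.OpenFile(path, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0o600)
	if err != nil {
		return nil, err
	}
	return &fileWAL{path: path, f: f}, nil
}

// Append writes the entry and fsyncs before reporting success.
func (w *fileWAL) Append(e walEntry) error {
	if e.Index != w.last+1 {
		return fmt.Errorf("wal: index %d out of order, want %d", e.Index, w.last+1)
	}
	line, err := json.Marshal(e)
	if err != nil {
		return err
	}
	if _, err := w.f.Write(append(line, '\n')); err != nil {
		return err
	}
	if err := w.f.Sync(); err != nil {
		return err
	}
	w.last = e.Index
	return nil
}

// Replay returns entries with Index >= from, in the order they were written.
func (w *fileWAL) Replay(from uint64) ([]walEntry, error) {
	f, err := os.Open(w.path)
	if err != nil {
		return nil, err
	}
	defer f.Close()
	var out []walEntry
	sc := bufio.NewScanner(f) // the default line limit is enough for this sketch
	for sc.Scan() {
		var e walEntry
		if err := json.Unmarshal(sc.Bytes(), &e); err != nil {
			return nil, err
		}
		if e.Index >= from {
			out = append(out, e)
		}
	}
	return out, sc.Err()
}

func (w *fileWAL) Close() error { return w.f.Close() }

// demoWAL writes three entries to a temporary file and replays from index 2.
func demoWAL() ([]walEntry, error) {
	dir, err := os.MkdirTemp("", "wal-demo")
	if err != nil {
		return nil, err
	}
	defer os.RemoveAll(dir)
	w, err := openFileWAL(filepath.Join(dir, "wal.log"))
	if err != nil {
		return nil, err
	}
	defer w.Close()
	names := []string{"LICENSE_GRANTED", "INGESTION_SOURCES_RESOLVED", "INGESTION_BYTES_OK"}
	for i, name := range names {
		if err := w.Append(walEntry{Index: uint64(i + 1), Transition: name}); err != nil {
			return nil, err
		}
	}
	return w.Replay(2)
}

func main() {
	entries, err := demoWAL()
	fmt.Println(len(entries), err)
}
```

Rejecting out-of-order indices in `Append` keeps the file itself in index order, so `Replay` can return records in file order without sorting.
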
---
## 7. Snapshot Layer
```go
type Snapshot struct {
State OrchestratorState
LastAppliedHLC string
LastAppliedIndex uint64
}
type SnapshotStore interface {
Save(s Snapshot) error
LoadLatest() (Snapshot, error)
}
```
* Writes as atomic replace (temp → fsync → rename).
* Snapshot frequency configurable (every N transitions or REVERB phase).
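
The "temp → fsync → rename" sequence can be sketched with the standard library alone. `writeFileAtomic` and `demoSnapshot` are hypothetical helpers, not part of the brief. The sketch relies on `os.Rename` replacing the destination atomically, which holds on POSIX filesystems and may not hold on every platform, and it omits the directory fsync that a stricter durability guarantee would need.

```go
package main

import (
	"fmt"
	"os"
	"path/filepath"
)

// writeFileAtomic replaces path with data so readers see either the old
// or the new content, never a partial write.
func writeFileAtomic(path string, data []byte) error {
	// Create the temp file in the target directory so the rename stays on one filesystem.
	tmp, err := os.CreateTemp(filepath.Dir(path), ".snapshot-*.tmp")
	if err != nil {
		return err
	}
	tmpName := tmp.Name()
	defer os.Remove(tmpName) // cleans up on failure; fails harmlessly after a successful rename

	if _, err := tmp.Write(data); err != nil {
		tmp.Close()
		return err
	}
	if err := tmp.Sync(); err != nil { // flush file contents before publishing
		tmp.Close()
		return err
	}
	if err := tmp.Close(); err != nil {
		return err
	}
	return os.Rename(tmpName, path)
}

// demoSnapshot writes two snapshots to the same path and reads back the result.
func demoSnapshot() (string, error) {
	dir, err := os.MkdirTemp("", "snap-demo")
	if err != nil {
		return "", err
	}
	defer os.RemoveAll(dir)
	path := filepath.Join(dir, "snapshot.json")
	for _, body := range []string{`{"index":1}`, `{"index":2}`} {
		if err := writeFileAtomic(path, []byte(body)); err != nil {
			return "", err
		}
	}
	got, err := os.ReadFile(path)
	return string(got), err
}

func main() {
	got, err := demoSnapshot()
	fmt.Println(got, err)
}
```
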
---
## 8. Guard Providers
Codex must define interfaces only (not implementations):
```go
type GuardProvider interface {
Evaluate(t TransitionProposal, s OrchestratorState) (GuardOutcome, error)
}
```
Specific guard providers (to be implemented later):
* KACHINGGuard
* BACKBEATGuard
* HMMMQuorumGuard
* SHHHPolicyGuard
* MCPHealthGuard
---
## 9. API Layer
Codex will expose a minimal HTTP API:
| Method | Path | Description |
| ------ | ------------- | ------------------------------------------------------- |
| POST | `/transition` | Submit a TransitionProposal |
| GET | `/state` | Return current OrchestratorState (optionally projected) |
| GET | `/health` | Returns readiness, WAL lag, last snapshot info |
Rules:
* Transition proposals must include `idem_key`, `hlc`, `window_id`.
* Transition acceptance is serialized by executor.
* Response must include resulting StateHash or failure reason.
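
A handler that enforces these rules could look roughly like the sketch below. It is an assumption-laden illustration: `proposal` is a cut-down copy of `TransitionProposal` using the same JSON tags, `stubApply` stands in for handing the proposal to the executor, and the status codes (400 for missing fields, 409 for a rejected transition) and the `state_hash` / `error` response keys are choices made here, not requirements of the brief.

```go
package main

import (
	"encoding/json"
	"fmt"
	"net/http"
	"net/http/httptest"
	"strings"
)

// proposal is a hypothetical subset of TransitionProposal.
type proposal struct {
	TransitionName string `json:"transition"`
	IdemKey        string `json:"idem_key"`
	HLC            string `json:"hlc"`
	WindowID       string `json:"window_id"`
}

// transitionHandler validates the required fields and forwards the proposal to apply.
func transitionHandler(apply func(proposal) (string, error)) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		if r.Method != http.MethodPost {
			http.Error(w, "method not allowed", http.StatusMethodNotAllowed)
			return
		}
		var p proposal
		if err := json.NewDecoder(r.Body).Decode(&p); err != nil {
			http.Error(w, "invalid JSON body", http.StatusBadRequest)
			return
		}
		if p.IdemKey == "" || p.HLC == "" || p.WindowID == "" {
			http.Error(w, "idem_key, hlc and window_id are required", http.StatusBadRequest)
			return
		}
		hash, err := apply(p)
		w.Header().Set("Content-Type", "application/json")
		if err != nil {
			w.WriteHeader(http.StatusConflict)
			json.NewEncoder(w).Encode(map[string]string{"error": err.Error()})
			return
		}
		json.NewEncoder(w).Encode(map[string]string{"state_hash": hash})
	}
}

// stubApply stands in for submitting to the executor and waiting for its result.
func stubApply(p proposal) (string, error) {
	if p.TransitionName == "" {
		return "", fmt.Errorf("unknown transition")
	}
	return "hash-after-" + p.TransitionName, nil
}

// postTransition sends body to the handler in-process and returns status and response body.
func postTransition(body string) (int, string) {
	req := httptest.NewRequest(http.MethodPost, "/transition", strings.NewReader(body))
	rec := httptest.NewRecorder()
	transitionHandler(stubApply).ServeHTTP(rec, req)
	return rec.Code, rec.Body.String()
}

func main() {
	code, body := postTransition(`{"transition":"LICENSE_GRANTED","idem_key":"k1","hlc":"1","window_id":"w1"}`)
	fmt.Print(code, " ", body)
	code, _ = postTransition(`{"transition":"LICENSE_GRANTED"}`)
	fmt.Println(code)
}
```

Serialization still happens in the executor; the handler only validates input and relays the result.
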
---
## 10. Determinism Requirements
* Reducer must be **referentially transparent**.
* No access to:
* System time
* Random generators
* Environment variables
* Network sockets
* Hashing, sorting, or serialization must use deterministic algorithms.
* WAL replay must yield identical `StatePostHash` as original execution.
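
One way to meet the deterministic hashing requirement is to hash the `encoding/json` output, which emits struct fields in declaration order and map keys in sorted order. The sketch below assumes that approach. `ingestion`, `canonicalHash`, and `hashIngestion` are hypothetical names, the `Labels` map is invented for illustration, and treating `SourceSet` as order-insensitive is an assumption, not something the brief states.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
	"sort"
)

// ingestion is a hypothetical fragment of state with a set-like slice and a map.
type ingestion struct {
	Phase     string
	SourceSet []string
	Labels    map[string]string
}

// canonicalHash returns the hex SHA-256 of the JSON encoding of v.
func canonicalHash(v any) (string, error) {
	payload, err := json.Marshal(v)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(payload)
	return hex.EncodeToString(sum[:]), nil
}

// hashIngestion sorts the set-like slice on a copy before hashing, so the
// caller's slice is untouched and element order cannot change the hash.
func hashIngestion(in ingestion) (string, error) {
	sorted := append([]string(nil), in.SourceSet...)
	sort.Strings(sorted)
	in.SourceSet = sorted
	return canonicalHash(in)
}

func main() {
	a := ingestion{
		Phase:     "FETCH",
		SourceSet: []string{"repo-b", "repo-a"},
		Labels:    map[string]string{"z": "1", "a": "2"},
	}
	b := ingestion{
		Phase:     "FETCH",
		SourceSet: []string{"repo-a", "repo-b"},
		Labels:    map[string]string{"a": "2", "z": "1"},
	}
	ha, _ := hashIngestion(a)
	hb, _ := hashIngestion(b)
	fmt.Println(ha == hb) // prints: true
}
```

Slices whose order is meaningful should be hashed as-is; only set-like fields need the sort.
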
---
## 11. Concurrency Rules
* Only the **executor goroutine** writes to state.
* All other goroutines are read-only or handle I/O.
* Mutexes are forbidden on state objects.
* Communication via typed channels only.
---
## 12. Logging and Metrics
Codex may stub structured logging, but **no I/O inside reducer**.
Metrics hooks (to be implemented later):
* `state_advance_latency`
* `retry_count`
* `quarantine_rate`
* `recovery_time`
* `pending_transitions`
---
## 13. Testing Hooks
Codex must include:
* In-memory WAL and snapshot mocks for property tests.
* `Replay()` function that:
* Loads snapshot + WAL
* Applies transitions deterministically
* Returns resulting `StateHash`
* Determinism test: assert identical hashes across runs.
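
A determinism check along these lines can be sketched without any storage at all. In the sketch, `miniState`, `step`, `replay`, and `checkDeterminism` are hypothetical stand-ins for `OrchestratorState`, `Reduce`, `Replay()`, and the test itself, and the log is a plain slice of transition names. It compares two full replays with a replay that resumes from a mid-log snapshot, so the WAL tail is exercised.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"fmt"
)

// miniState is a hypothetical stand-in for OrchestratorState.
type miniState struct {
	Phase string
	Epoch uint64
}

// step is a pure transition function over miniState.
func step(s miniState, transition string) (miniState, error) {
	switch {
	case transition == "SOURCES_RESOLVED" && s.Phase == "":
		s.Phase = "FETCH"
	case transition == "BYTES_OK" && s.Phase == "FETCH":
		s.Phase = "VALIDATE"
	case transition == "SCHEMA_OK" && s.Phase == "VALIDATE":
		s.Phase = "INDEX"
	case transition == "CORPUS_BUILT" && s.Phase == "INDEX":
		s.Phase = "READY"
	default:
		return miniState{}, fmt.Errorf("transition %q not valid in phase %q", transition, s.Phase)
	}
	s.Epoch++
	return s, nil
}

func hashState(s miniState) string {
	sum := sha256.Sum256([]byte(fmt.Sprintf("%s|%d", s.Phase, s.Epoch)))
	return hex.EncodeToString(sum[:])
}

// replay folds step over the log, starting from the given state.
func replay(start miniState, log []string) (miniState, error) {
	s := start
	for _, t := range log {
		next, err := step(s, t)
		if err != nil {
			return miniState{}, err
		}
		s = next
	}
	return s, nil
}

// checkDeterminism replays the full log twice, then resumes from a snapshot
// taken after snapshotAt records, and reports whether all final hashes agree.
func checkDeterminism(log []string, snapshotAt int) (bool, error) {
	first, err := replay(miniState{}, log)
	if err != nil {
		return false, err
	}
	second, err := replay(miniState{}, log)
	if err != nil {
		return false, err
	}
	snap, err := replay(miniState{}, log[:snapshotAt])
	if err != nil {
		return false, err
	}
	resumed, err := replay(snap, log[snapshotAt:])
	if err != nil {
		return false, err
	}
	want := hashState(first)
	return want == hashState(second) && want == hashState(resumed), nil
}

func main() {
	log := []string{"SOURCES_RESOLVED", "BYTES_OK", "SCHEMA_OK", "CORPUS_BUILT"}
	ok, err := checkDeterminism(log, 2)
	fmt.Println(ok, err) // prints: true <nil>
}
```
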
---
## 14. Generation Scope
Codex is to generate **Go code only**, adhering to this spec.
Modules to generate (one per iteration):
1. `state.go` — definitions from §3
2. `reducer.go` — function skeleton for `Reduce()`
3. `wal.go` — WALStore interface + Badger impl
4. `snapshot.go` — SnapshotStore interface + atomic file impl
5. `executor.go` — single-goroutine executor
6. `api.go` — HTTP handlers for `/transition` and `/state`
---
## 15. Non-Goals
* No database migrations
* No gRPC
* No background workers beyond executor
* No dynamic schema changes
* No plugins or reflection
---
## 16. Style & Discipline
* Idiomatic Go formatting and error handling.
* Deterministic serialization (canonical JSON or sorted keys).
* No use of global variables or shared mutable state.
* Reducer logic must be explicit and exhaustive — no default fallthrough.
---
## 17. Example Flow (Codex reference)
1. Client submits `TransitionProposal` (e.g. `"ELECT_QUORUM_CERT"`).
2. API enqueues proposal to executor's `applyCh`.
3. Executor calls `GuardProvider.Evaluate()`.
4. Guards OK → `Reduce(oldState, proposal, guard)` → new state.
5. Append WALRecord.
6. Update state.
7. Broadcast new snapshot hash to metrics/log.
---
## 18. Acceptance Criteria
Codex output is accepted when:
* All interfaces and structs match this doc exactly.
* Code compiles with `go build ./...`.
* Reducer is pure, single-threaded, and deterministic.
* WAL replay reproduces identical state.
* HTTP API handles at least POST `/transition` and GET `/state`.
---
## 19. Keywords
`deterministic`, `state-machine`, `pure reducer`, `single executor`, `idempotent`, `WAL`, `snapshot`, `HLC`, `auditability`, `no concurrency races`.
---
## 20. End of Brief
> Codex, implement the SWOOSH orchestrator **exactly as specified**,
> using Go and following the constraints in this document.
> Do not add dependencies, types, or concurrency patterns not explicitly described here.

152
determinism_test.go Normal file

@@ -0,0 +1,152 @@
package swoosh
import (
"testing"
"time"
)
type staticGuardProvider struct {
outcomes map[string]GuardOutcome
}
func newStaticGuardProvider() *staticGuardProvider {
return &staticGuardProvider{outcomes: make(map[string]GuardOutcome)}
}
func (g *staticGuardProvider) setOutcome(name string, outcome GuardOutcome) {
g.outcomes[name] = outcome
}
func (g *staticGuardProvider) Evaluate(t TransitionProposal, s OrchestratorState) (GuardOutcome, error) {
if outcome, ok := g.outcomes[t.TransitionName]; ok {
return outcome, nil
}
return GuardOutcome{
LicenseOK: true,
BackbeatOK: true,
QuorumOK: true,
PolicyOK: true,
MCPHealthy: true,
}, nil
}
func newTestExecutor(t *testing.T, guard GuardProvider, wal WALStore, snap SnapshotStore) *Executor {
t.Helper()
initial := Snapshot{}
exec := NewExecutor(wal, snap, guard, initial)
return exec
}
func waitResult(t *testing.T, ch <-chan ApplyResult) ApplyResult {
t.Helper()
select {
case res := <-ch:
return res
case <-time.After(2 * time.Second):
t.Fatalf("timed out waiting for apply result")
return ApplyResult{}
}
}
func TestDeterministicReplay(t *testing.T) {
walStore := &InMemoryWAL{}
snapStore := &InMemorySnapshotStore{}
guard := newStaticGuardProvider()
guard.setOutcome("TRANSITION_QUARANTINE_ENTER", GuardOutcome{PolicyOK: false})
exec := newTestExecutor(t, guard, walStore, snapStore)
transitions := []TransitionProposal{
{TransitionName: "LICENSE_GRANTED", HLC: "1"},
{TransitionName: "INGESTION_SOURCES_RESOLVED", HLC: "2"},
{TransitionName: "INGESTION_BYTES_OK", HLC: "3"},
{TransitionName: "INGESTION_SCHEMA_OK", HLC: "4"},
{TransitionName: "INGESTION_CORPUS_BUILT", HLC: "5"},
{TransitionName: "COUNCIL_PROFILES_LOADED", HLC: "6"},
{TransitionName: "COUNCIL_QUORUM_CERT", HLC: "7"},
{TransitionName: "COUNCIL_MCP_GREEN", HLC: "8"},
{TransitionName: "ENV_CAPACITY_OK", HLC: "9"},
{TransitionName: "ENV_INSTALLED", HLC: "10"},
{TransitionName: "ENV_HEALTH_GREEN", HLC: "11"},
{TransitionName: "EXEC_PLAN_LOCKED", HLC: "12", WindowID: "window-1"},
{TransitionName: "EXEC_BEAT_REVIEW_GATE", HLC: "13", WindowID: "window-1"},
{TransitionName: "EXEC_APPROVALS_THRESHOLD", HLC: "14", WindowID: "window-1"},
{TransitionName: "EXEC_NEXT_WINDOW", HLC: "15", WindowID: "window-2"},
}
var finalState OrchestratorState
for _, proposal := range transitions {
ch, err := exec.SubmitTransition(proposal)
if err != nil {
t.Fatalf("submit transition %s: %v", proposal.TransitionName, err)
}
res := waitResult(t, ch)
if !res.Success || res.Error != nil {
t.Fatalf("transition %s failed: success=%t error=%v", proposal.TransitionName, res.Success, res.Error)
}
finalState = res.NewState
}
finalHash := finalState.StateHash
if finalHash == "" {
t.Fatal("final state hash empty")
}
snapshot := Snapshot{
State: finalState,
LastAppliedHLC: finalState.HLCLast,
LastAppliedIndex: walStore.LastIndex(),
}
if err := snapStore.Save(snapshot); err != nil {
t.Fatalf("save snapshot: %v", err)
}
replayState, err := Replay(walStore, snapshot)
if err != nil {
t.Fatalf("replay failed: %v", err)
}
if replayState.StateHash != finalHash {
t.Fatalf("state hash mismatch after replay: got %s want %s", replayState.StateHash, finalHash)
}
}
func TestQuarantineEnforced(t *testing.T) {
walStore := &InMemoryWAL{}
snapStore := &InMemorySnapshotStore{}
guard := newStaticGuardProvider()
guard.setOutcome("TRANSITION_QUARANTINE_ENTER", GuardOutcome{PolicyOK: false})
exec := newTestExecutor(t, guard, walStore, snapStore)
enterProposal := TransitionProposal{TransitionName: "TRANSITION_QUARANTINE_ENTER", HLC: "1"}
ch, err := exec.SubmitTransition(enterProposal)
if err != nil {
t.Fatalf("submit quarantine enter: %v", err)
}
res := waitResult(t, ch)
if !res.Success || res.Error != nil {
t.Fatalf("quarantine enter failed: %v", res.Error)
}
quarantinedState := res.NewState
if !quarantinedState.Policy.Quarantined {
t.Fatal("state not quarantined after enter transition")
}
hashBefore := quarantinedState.StateHash
illegalProposal := TransitionProposal{TransitionName: "EXEC_BEAT_REVIEW_GATE", HLC: "2", WindowID: "window-1"}
ch, err = exec.SubmitTransition(illegalProposal)
if err != nil {
t.Fatalf("submit illegal transition: %v", err)
}
res = waitResult(t, ch)
if res.Success || res.Error == nil {
t.Fatalf("expected failure for illegal transition in quarantine, got success=%t error=%v", res.Success, res.Error)
}
stateAfter := exec.GetStateSnapshot()
if stateAfter.StateHash != hashBefore {
t.Fatalf("state hash changed after illegal transition: got %s want %s", stateAfter.StateHash, hashBefore)
}
}

248
executor.go Normal file

@@ -0,0 +1,248 @@
package swoosh
import (
"encoding/json"
"fmt"
"time"
)
const defaultSnapshotInterval uint64 = 32
// GuardProvider evaluates transition guards prior to reducer execution.
type GuardProvider interface {
Evaluate(t TransitionProposal, s OrchestratorState) (GuardOutcome, error)
}
// ApplyResult communicates the outcome of an executor-applied transition.
type ApplyResult struct {
Success bool
Error error
NewState OrchestratorState
GuardInfo GuardOutcome
}
type applyRequest struct {
proposal TransitionProposal
response chan<- ApplyResult
}
type stateRequest struct {
response chan<- OrchestratorState
}
// Executor serializes state mutations for the orchestrator.
type Executor struct {
applyCh chan applyRequest
stateReqCh chan stateRequest
wal WALStore
snapshotStore SnapshotStore
guard GuardProvider
state OrchestratorState
snapshotEvery uint64
appliedSinceSn uint64
lastIndex uint64
}
// NewExecutor constructs and starts an Executor using the provided collaborators.
func NewExecutor(wal WALStore, snap SnapshotStore, guard GuardProvider, initial Snapshot) *Executor {
state := initial.State
if cloned, err := cloneState(state); err == nil {
state = cloned
}
if state.StateHash == "" {
if hash, err := computeStateHash(state); err == nil {
state.StateHash = hash
}
}
lastIdx := initial.LastAppliedIndex
if walIdx := wal.LastIndex(); walIdx > lastIdx {
lastIdx = walIdx
}
exec := &Executor{
applyCh: make(chan applyRequest),
stateReqCh: make(chan stateRequest),
wal: wal,
snapshotStore: snap,
guard: guard,
state: state,
snapshotEvery: defaultSnapshotInterval,
appliedSinceSn: 0,
lastIndex: lastIdx,
}
go exec.loop()
return exec
}
// SubmitTransition enqueues a TransitionProposal and returns a channel for its result.
func (e *Executor) SubmitTransition(p TransitionProposal) (<-chan ApplyResult, error) {
if p.TransitionName == "" {
return nil, fmt.Errorf("%w: %s", ErrUnknownTransition, "empty transition name")
}
resCh := make(chan ApplyResult, 1)
req := applyRequest{proposal: p, response: resCh}
e.applyCh <- req
return resCh, nil
}
// GetStateSnapshot returns a deep copy of the current orchestrator state.
func (e *Executor) GetStateSnapshot() OrchestratorState {
respCh := make(chan OrchestratorState, 1)
e.stateReqCh <- stateRequest{response: respCh}
snapshot := <-respCh
return snapshot
}
func (e *Executor) loop() {
for {
select {
case req := <-e.applyCh:
e.handleApply(req)
case req := <-e.stateReqCh:
clone, err := cloneState(e.state)
if err != nil {
clone = e.state
}
req.response <- clone
}
}
}
func (e *Executor) handleApply(req applyRequest) {
proposal := req.proposal
result := ApplyResult{}
if e.state.Policy.Quarantined && proposal.TransitionName != "TRANSITION_QUARANTINE_RELEASE" && proposal.TransitionName != "TRANSITION_QUARANTINE_CONFIRM_BLOCK" {
result.Error = fmt.Errorf("%w: %s", ErrQuarantined, proposal.TransitionName)
req.response <- result
return
}
guardOutcome, err := e.evaluateGuard(proposal)
if err != nil {
result.Error = fmt.Errorf("guard evaluation: %w", err)
req.response <- result
return
}
newState, err := Reduce(e.state, proposal, guardOutcome)
if err != nil {
result.Error = fmt.Errorf("reduce: %w", err)
result.GuardInfo = guardOutcome
req.response <- result
return
}
if newState.StateHash == e.state.StateHash {
result.Success = true
result.NewState = e.cloneForResult(newState)
result.GuardInfo = guardOutcome
req.response <- result
return
}
walRecord := WALRecord{
StatePreHash: e.state.StateHash,
StatePostHash: newState.StateHash,
Transition: proposal,
Guard: guardOutcome,
AppliedAtHLC: proposal.HLC,
AppliedAtUnixNs: time.Now().UnixNano(),
Index: e.nextIndex(),
}
if err := e.appendAndSync(walRecord); err != nil {
result.Error = fmt.Errorf("wal append: %w", err)
result.GuardInfo = guardOutcome
req.response <- result
return
}
e.state = newState
e.lastIndex = walRecord.Index
e.appliedSinceSn++
if e.shouldSnapshot() {
snap := Snapshot{
State: e.state,
LastAppliedHLC: e.state.HLCLast,
LastAppliedIndex: e.lastIndex,
}
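if err := e.saveSnapshot(snap); err != nil {
// The transition is already durable in the WAL and applied to e.state,
// so return the advanced state alongside the snapshot error.
result.Error = fmt.Errorf("snapshot save: %w", err)
result.NewState = e.cloneForResult(e.state)
result.GuardInfo = guardOutcome
req.response <- result
return
}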
e.appliedSinceSn = 0
}
result.Success = true
result.NewState = e.cloneForResult(e.state)
result.GuardInfo = guardOutcome
req.response <- result
}
func (e *Executor) evaluateGuard(p TransitionProposal) (GuardOutcome, error) {
if e.guard == nil {
return GuardOutcome{
LicenseOK: true,
BackbeatOK: true,
QuorumOK: true,
PolicyOK: true,
MCPHealthy: true,
}, nil
}
return e.guard.Evaluate(p, e.state)
}
func (e *Executor) appendAndSync(record WALRecord) error {
if err := e.wal.Append(record); err != nil {
return err
}
if err := e.wal.Sync(); err != nil {
return err
}
return nil
}
func (e *Executor) saveSnapshot(s Snapshot) error {
if e.snapshotStore == nil {
return nil
}
return e.snapshotStore.Save(s)
}
func (e *Executor) shouldSnapshot() bool {
return e.snapshotEvery > 0 && e.appliedSinceSn >= e.snapshotEvery && e.snapshotStore != nil
}
func (e *Executor) nextIndex() uint64 {
// WAL indices start at 1, so a zero lastIndex yields 1 without a special case.
return e.lastIndex + 1
}
func (e *Executor) cloneForResult(state OrchestratorState) OrchestratorState {
cloned, err := cloneState(state)
if err != nil {
return state
}
return cloned
}
func cloneState(state OrchestratorState) (OrchestratorState, error) {
payload, err := json.Marshal(state)
if err != nil {
return OrchestratorState{}, err
}
var out OrchestratorState
if err := json.Unmarshal(payload, &out); err != nil {
return OrchestratorState{}, err
}
return out, nil
}

3
go.mod Normal file

@@ -0,0 +1,3 @@
module swoosh
go 1.22

633
reducer.go Normal file

@@ -0,0 +1,633 @@
package swoosh
import (
"crypto/sha256"
"encoding/hex"
"encoding/json"
"errors"
"fmt"
"strings"
)
var (
// ErrGuardRejected is returned when guard outcomes prevent a transition from applying.
ErrGuardRejected = errors.New("guard rejected transition")
// ErrUnknownTransition is returned when Reduce encounters an unhandled transition name.
ErrUnknownTransition = errors.New("unknown transition")
// ErrQuarantined indicates transitions are blocked while policy quarantine is active.
ErrQuarantined = errors.New("transition blocked by quarantine")
// ErrInvalidPhase signals an unexpected phase for the requested transition.
ErrInvalidPhase = errors.New("invalid phase for transition")
// ErrPrecondition signals a precondition failure unrelated to phases or guards.
ErrPrecondition = errors.New("precondition failed")
)
// Reduce applies a validated transition to the given state and returns a new state.
// Reducer must remain deterministic and side-effect free.
func Reduce(oldState OrchestratorState, t TransitionProposal, g GuardOutcome) (OrchestratorState, error) {
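// Struct assignment copies value fields; slice fields still share backing arrays,
// so transitions must replace slices rather than mutate them in place.
newState := oldState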
changed, err := applyTransition(&newState, t, g)
if err != nil {
return OrchestratorState{}, err
}
if !changed {
return oldState, nil
}
newState.HLCLast = t.HLC
hash, err := computeStateHash(newState)
if err != nil {
return OrchestratorState{}, fmt.Errorf("compute state hash: %w", err)
}
newState.StateHash = hash
return newState, nil
}
func applyTransition(state *OrchestratorState, t TransitionProposal, g GuardOutcome) (bool, error) {
name := t.TransitionName
if name == "" {
return false, fmt.Errorf("%w: empty transition name", ErrUnknownTransition)
}
if state.Policy.Quarantined && name != "TRANSITION_QUARANTINE_RELEASE" && name != "TRANSITION_QUARANTINE_CONFIRM_BLOCK" {
return false, fmt.Errorf("%w: %s", ErrQuarantined, name)
}
switch name {
case "BOOT_CONFIG_OK":
if state.Boot.Licensed {
return false, fmt.Errorf("%w: boot already licensed", ErrPrecondition)
}
return false, nil
case "LICENSE_GRANTED":
if state.Boot.Licensed {
return false, nil
}
if !g.LicenseOK {
return false, fmt.Errorf("%w: license check failed", ErrGuardRejected)
}
state.Boot.Licensed = true
return true, nil
case "INGESTION_SOURCES_RESOLVED":
if state.Ingestion.Phase == "FETCH" && state.Ingestion.LastError == "" {
return false, nil
}
if state.Ingestion.Phase != "" && state.Ingestion.Phase != "DISCOVER" {
return false, fmt.Errorf("%w: ingestion phase %q", ErrInvalidPhase, state.Ingestion.Phase)
}
changed := false
if state.Ingestion.Phase != "FETCH" {
state.Ingestion.Phase = "FETCH"
changed = true
}
if state.Ingestion.LastError != "" {
state.Ingestion.LastError = ""
changed = true
}
if changed {
state.Ingestion.Epoch++
}
return changed, nil
case "INGESTION_BYTES_OK":
if state.Ingestion.Phase == "VALIDATE" && state.Ingestion.LastError == "" {
return false, nil
}
if state.Ingestion.Phase != "FETCH" {
return false, fmt.Errorf("%w: ingestion phase %q", ErrInvalidPhase, state.Ingestion.Phase)
}
changed := false
if state.Ingestion.Phase != "VALIDATE" {
state.Ingestion.Phase = "VALIDATE"
changed = true
}
if state.Ingestion.LastError != "" {
state.Ingestion.LastError = ""
changed = true
}
if changed {
state.Ingestion.Epoch++
}
return changed, nil
case "INGESTION_SCHEMA_OK":
if state.Ingestion.Phase == "INDEX" && state.Ingestion.LastError == "" {
return false, nil
}
if state.Ingestion.Phase != "VALIDATE" {
return false, fmt.Errorf("%w: ingestion phase %q", ErrInvalidPhase, state.Ingestion.Phase)
}
if !g.PolicyOK || !g.BackbeatOK {
return false, fmt.Errorf("%w: policy=%t backbeat=%t", ErrGuardRejected, g.PolicyOK, g.BackbeatOK)
}
changed := false
if state.Ingestion.Phase != "INDEX" {
state.Ingestion.Phase = "INDEX"
changed = true
}
if state.Ingestion.LastError != "" {
state.Ingestion.LastError = ""
changed = true
}
if changed {
state.Ingestion.Epoch++
}
return changed, nil
case "INGESTION_CORPUS_BUILT":
if state.Ingestion.Phase == "READY" && state.Ingestion.LastError == "" {
return false, nil
}
if state.Ingestion.Phase != "INDEX" {
return false, fmt.Errorf("%w: ingestion phase %q", ErrInvalidPhase, state.Ingestion.Phase)
}
changed := false
if state.Ingestion.Phase != "READY" {
state.Ingestion.Phase = "READY"
changed = true
}
if state.Ingestion.LastError != "" {
state.Ingestion.LastError = ""
changed = true
}
if changed {
state.Ingestion.Epoch++
}
return changed, nil
case "INGESTION_POLICY_VIOLATION":
if g.PolicyOK {
return false, fmt.Errorf("%w: policy guard true", ErrGuardRejected)
}
if state.Ingestion.Phase != "VALIDATE" && state.Ingestion.Phase != "INDEX" {
return false, fmt.Errorf("%w: ingestion phase %q", ErrInvalidPhase, state.Ingestion.Phase)
}
rationale := joinRationale(g.Rationale)
changed := false
if !state.Policy.Quarantined {
state.Policy.Quarantined = true
changed = true
}
if state.Policy.Rationale != rationale {
state.Policy.Rationale = rationale
changed = true
}
if !state.Control.Degraded {
state.Control.Degraded = true
changed = true
}
if changed {
state.Ingestion.Epoch++
}
return changed, nil
case "COUNCIL_PROFILES_LOADED":
if state.Council.Phase == "ELECT" {
return false, nil
}
if state.Council.Phase != "" && state.Council.Phase != "PLAN_ROLES" {
return false, fmt.Errorf("%w: council phase %q", ErrInvalidPhase, state.Council.Phase)
}
state.Council.Phase = "ELECT"
state.Council.Epoch++
return true, nil
case "COUNCIL_QUORUM_CERT":
if state.Council.Phase == "TOOLING_SYNC" {
return false, nil
}
if state.Council.Phase != "ELECT" {
return false, fmt.Errorf("%w: council phase %q", ErrInvalidPhase, state.Council.Phase)
}
if !g.QuorumOK || !g.PolicyOK {
return false, fmt.Errorf("%w: quorum=%t policy=%t", ErrGuardRejected, g.QuorumOK, g.PolicyOK)
}
state.Council.Phase = "TOOLING_SYNC"
state.Council.Epoch++
return true, nil
case "COUNCIL_MCP_GREEN":
if state.Council.Phase == "READY" && state.Council.MCPHealthGreen {
return false, nil
}
if state.Council.Phase != "TOOLING_SYNC" {
return false, fmt.Errorf("%w: council phase %q", ErrInvalidPhase, state.Council.Phase)
}
if !g.MCPHealthy {
return false, fmt.Errorf("%w: mcp unhealthy", ErrGuardRejected)
}
changed := false
if state.Council.Phase != "READY" {
state.Council.Phase = "READY"
changed = true
}
if !state.Council.MCPHealthGreen {
state.Council.MCPHealthGreen = true
changed = true
}
if changed {
state.Council.Epoch++
}
return changed, nil
case "ENV_CAPACITY_OK":
if state.Environment.Phase == "PROVISION" && state.Environment.CapacityOK && state.Environment.Health == "" {
return false, nil
}
if state.Environment.Phase != "" && state.Environment.Phase != "ALLOCATE" {
return false, fmt.Errorf("%w: environment phase %q", ErrInvalidPhase, state.Environment.Phase)
}
if !g.BackbeatOK {
return false, fmt.Errorf("%w: backbeat guard false", ErrGuardRejected)
}
changed := false
if state.Environment.Phase != "PROVISION" {
state.Environment.Phase = "PROVISION"
changed = true
}
if !state.Environment.CapacityOK {
state.Environment.CapacityOK = true
changed = true
}
if state.Environment.Health != "" {
state.Environment.Health = ""
changed = true
}
if changed {
state.Environment.Epoch++
}
return changed, nil
case "ENV_INSTALLED":
if state.Environment.Phase == "HEALTHCHECK" {
return false, nil
}
if state.Environment.Phase != "PROVISION" {
return false, fmt.Errorf("%w: environment phase %q", ErrInvalidPhase, state.Environment.Phase)
}
state.Environment.Phase = "HEALTHCHECK"
state.Environment.Epoch++
return true, nil
case "ENV_HEALTH_GREEN":
if state.Environment.Phase == "READY" && state.Environment.Health == "green" && !state.Control.Degraded {
return false, nil
}
if state.Environment.Phase != "HEALTHCHECK" {
return false, fmt.Errorf("%w: environment phase %q", ErrInvalidPhase, state.Environment.Phase)
}
changed := false
if state.Environment.Phase != "READY" {
state.Environment.Phase = "READY"
changed = true
}
if state.Environment.Health != "green" {
state.Environment.Health = "green"
changed = true
}
if state.Control.Degraded {
state.Control.Degraded = false
changed = true
}
if changed {
state.Environment.Epoch++
}
return changed, nil
case "ENV_HEALTH_AMBER":
if state.Environment.Phase == "DEGRADED" && state.Environment.Health == "amber" && state.Control.Degraded {
return false, nil
}
if state.Environment.Phase != "HEALTHCHECK" {
return false, fmt.Errorf("%w: environment phase %q", ErrInvalidPhase, state.Environment.Phase)
}
changed := false
if state.Environment.Phase != "DEGRADED" {
state.Environment.Phase = "DEGRADED"
changed = true
}
if state.Environment.Health != "amber" {
state.Environment.Health = "amber"
changed = true
}
if !state.Control.Degraded {
state.Control.Degraded = true
changed = true
}
if changed {
state.Environment.Epoch++
}
return changed, nil
case "ENV_RECOVERED_GREEN":
if state.Environment.Phase == "READY" && state.Environment.Health == "green" && !state.Control.Degraded {
return false, nil
}
if state.Environment.Phase != "DEGRADED" {
return false, fmt.Errorf("%w: environment phase %q", ErrInvalidPhase, state.Environment.Phase)
}
if !g.MCPHealthy {
return false, fmt.Errorf("%w: mcp unhealthy", ErrGuardRejected)
}
changed := false
if state.Environment.Phase != "READY" {
state.Environment.Phase = "READY"
changed = true
}
if state.Environment.Health != "green" {
state.Environment.Health = "green"
changed = true
}
if state.Control.Degraded {
state.Control.Degraded = false
changed = true
}
if changed {
state.Environment.Epoch++
}
return changed, nil
case "EXEC_PLAN_LOCKED":
if state.Execution.Phase == "WORK" && state.Execution.PlanLocked && state.Execution.ActiveWindowID == t.WindowID {
return false, nil
}
if state.Execution.Phase != "" && state.Execution.Phase != "PLAN" {
return false, fmt.Errorf("%w: execution phase %q", ErrInvalidPhase, state.Execution.Phase)
}
if !g.BackbeatOK {
return false, fmt.Errorf("%w: backbeat guard false", ErrGuardRejected)
}
if state.Ingestion.Phase != "READY" || state.Council.Phase != "READY" || state.Environment.Phase != "READY" {
return false, fmt.Errorf("%w: prerequisites not ready", ErrPrecondition)
}
changed := false
if state.Execution.Phase != "WORK" {
state.Execution.Phase = "WORK"
changed = true
}
if !state.Execution.PlanLocked {
state.Execution.PlanLocked = true
changed = true
}
if state.Execution.ActiveWindowID != t.WindowID {
state.Execution.ActiveWindowID = t.WindowID
changed = true
}
if changed {
state.Execution.Epoch++
}
return changed, nil
case "EXEC_BEAT_REVIEW_GATE":
if state.Execution.Phase == "REVIEW" {
return false, nil
}
if state.Execution.Phase != "WORK" {
return false, fmt.Errorf("%w: execution phase %q", ErrInvalidPhase, state.Execution.Phase)
}
if !g.BackbeatOK {
return false, fmt.Errorf("%w: backbeat guard false", ErrGuardRejected)
}
state.Execution.Phase = "REVIEW"
state.Execution.PlanLocked = true
state.Execution.Epoch++
return true, nil
case "EXEC_APPROVALS_THRESHOLD":
if state.Execution.Phase == "REVERB" {
return false, nil
}
if state.Execution.Phase != "REVIEW" {
return false, fmt.Errorf("%w: execution phase %q", ErrInvalidPhase, state.Execution.Phase)
}
if !g.QuorumOK || !g.PolicyOK {
return false, fmt.Errorf("%w: quorum=%t policy=%t", ErrGuardRejected, g.QuorumOK, g.PolicyOK)
}
state.Execution.Phase = "REVERB"
state.Execution.Epoch++
return true, nil
case "EXEC_CHANGES_REQUESTED":
if state.Execution.Phase == "WORK" {
return false, nil
}
if state.Execution.Phase != "REVIEW" {
return false, fmt.Errorf("%w: execution phase %q", ErrInvalidPhase, state.Execution.Phase)
}
if !g.PolicyOK {
return false, fmt.Errorf("%w: policy guard false", ErrGuardRejected)
}
state.Execution.Phase = "WORK"
state.Execution.PlanLocked = true
state.Execution.Epoch++
return true, nil
case "EXEC_NEXT_WINDOW":
if state.Execution.Phase == "PLAN" && !state.Execution.PlanLocked && state.Execution.ActiveWindowID == t.WindowID && state.Execution.Approvals == 0 {
return false, nil
}
if state.Execution.Phase != "REVERB" {
return false, fmt.Errorf("%w: execution phase %q", ErrInvalidPhase, state.Execution.Phase)
}
if !g.BackbeatOK {
return false, fmt.Errorf("%w: backbeat guard false", ErrGuardRejected)
}
changed := false
if state.Execution.Phase != "PLAN" {
state.Execution.Phase = "PLAN"
changed = true
}
if state.Execution.PlanLocked {
state.Execution.PlanLocked = false
changed = true
}
if state.Execution.ActiveWindowID != t.WindowID {
state.Execution.ActiveWindowID = t.WindowID
changed = true
}
if state.Execution.Approvals != 0 {
state.Execution.Approvals = 0
changed = true
}
if changed {
state.Execution.Epoch++
}
return changed, nil
case "CONTROL_PAUSE":
needPause := !state.Control.Paused
needRecoverReset := state.Control.Recovering
if !needPause && !needRecoverReset {
return false, nil
}
if !g.PolicyOK {
return false, fmt.Errorf("%w: policy guard false", ErrGuardRejected)
}
if needPause {
state.Control.Paused = true
}
if needRecoverReset {
state.Control.Recovering = false
}
return true, nil
case "CONTROL_RESUME":
if !state.Control.Paused {
return false, fmt.Errorf("%w: system not paused", ErrPrecondition)
}
if !g.PolicyOK {
return false, fmt.Errorf("%w: policy guard false", ErrGuardRejected)
}
changed := false
if state.Control.Paused {
state.Control.Paused = false
changed = true
}
if state.Control.Recovering {
state.Control.Recovering = false
changed = true
}
return changed, nil
case "CONTROL_RECOVERY_ENTER":
if state.Control.Recovering && state.Control.Degraded {
return false, nil
}
if g.QuorumOK && !state.Control.Degraded {
return false, fmt.Errorf("%w: no recovery trigger", ErrGuardRejected)
}
changed := false
if !state.Control.Recovering {
state.Control.Recovering = true
changed = true
}
if !state.Control.Degraded {
state.Control.Degraded = true
changed = true
}
return changed, nil
case "CONTROL_RECOVERY_RESOLVED":
if !state.Control.Recovering {
return false, fmt.Errorf("%w: not in recovery", ErrPrecondition)
}
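// Reject only when both recovery signals are missing: quorum is still lost and the environment is not green.
if !g.QuorumOK && state.Environment.Health != "green" {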
return false, fmt.Errorf("%w: recovery not satisfied", ErrGuardRejected)
}
changed := false
if state.Control.Recovering {
state.Control.Recovering = false
changed = true
}
if state.Control.Degraded {
state.Control.Degraded = false
changed = true
}
return changed, nil
case "TRANSITION_QUARANTINE_ENTER":
if g.PolicyOK {
return false, fmt.Errorf("%w: policy guard true", ErrGuardRejected)
}
rationale := joinRationale(g.Rationale)
changed := false
if !state.Policy.Quarantined {
state.Policy.Quarantined = true
changed = true
}
if state.Policy.Rationale != rationale {
state.Policy.Rationale = rationale
changed = true
}
if !state.Control.Paused {
state.Control.Paused = true
changed = true
}
if state.Control.Recovering {
state.Control.Recovering = false
changed = true
}
if !state.Control.Degraded {
state.Control.Degraded = true
changed = true
}
return changed, nil
case "TRANSITION_QUARANTINE_RELEASE":
if !state.Policy.Quarantined {
return false, fmt.Errorf("%w: not quarantined", ErrPrecondition)
}
if !g.PolicyOK {
return false, fmt.Errorf("%w: policy guard false", ErrGuardRejected)
}
changed := false
if state.Policy.Quarantined {
state.Policy.Quarantined = false
changed = true
}
if state.Control.Paused {
state.Control.Paused = false
changed = true
}
if state.Control.Degraded {
state.Control.Degraded = false
changed = true
}
if state.Control.Recovering {
state.Control.Recovering = false
changed = true
}
return changed, nil
case "TRANSITION_QUARANTINE_CONFIRM_BLOCK":
if !state.Policy.Quarantined {
return false, fmt.Errorf("%w: not quarantined", ErrPrecondition)
}
changed := false
if !state.Control.Paused {
state.Control.Paused = true
changed = true
}
if !state.Control.Degraded {
state.Control.Degraded = true
changed = true
}
if state.Control.Recovering {
state.Control.Recovering = false
changed = true
}
return changed, nil
default:
return false, fmt.Errorf("%w: %s", ErrUnknownTransition, name)
}
}
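// joinRationale flattens guard rationale entries into one "; "-separated
// string. strings.Join already returns "" for an empty slice; the explicit
// check only makes that case obvious.
func joinRationale(r []string) string {
	if len(r) == 0 {
		return ""
	}
	return strings.Join(r, "; ")
}

// computeStateHash returns the hex-encoded SHA-256 of the state's canonical
// JSON encoding. The digest covers every field, StateHash included, so the
// value held in StateHash at call time affects the result.
func computeStateHash(state OrchestratorState) (string, error) {
	payload, err := canonicalJSON(state)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(payload)
	return hex.EncodeToString(sum[:]), nil
}

// canonicalJSON encodes value with encoding/json. The output is deterministic
// because struct fields are emitted in declaration order and map keys are
// sorted.
func canonicalJSON(value any) ([]byte, error) {
	return json.Marshal(value)
}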
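
`computeStateHash` marshals the whole struct, so the digest depends on whatever `StateHash` holds when it is called. Whether the caller blanks that field first is not visible in this section. The sketch below assumes it does, and shows why that keeps the digest stable once it is stored back into the state. `miniState` and `hashState` are hypothetical names used only for illustration.

```go
package main

import (
	"crypto/sha256"
	"encoding/hex"
	"encoding/json"
	"fmt"
)

// miniState is a hypothetical stand-in for OrchestratorState.
type miniState struct {
	Phase     string
	Epoch     uint64
	StateHash string
}

// hashState hashes the JSON encoding of s with StateHash blanked, so the
// digest never depends on a previously stored digest.
func hashState(s miniState) (string, error) {
	s.StateHash = "" // s is a copy; the caller's value is untouched
	payload, err := json.Marshal(s)
	if err != nil {
		return "", err
	}
	sum := sha256.Sum256(payload)
	return hex.EncodeToString(sum[:]), nil
}

func main() {
	a := miniState{Phase: "WORK", Epoch: 3}
	h1, _ := hashState(a)
	a.StateHash = h1 // store the digest in the state
	h2, _ := hashState(a)
	fmt.Println(h1 == h2) // prints "true"
}
```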

45
replay.go Normal file

@@ -0,0 +1,45 @@
package swoosh
import "fmt"
// Replay deterministically replays WAL records on top of a snapshot and returns the resulting state.
func Replay(wal WALStore, snapshot Snapshot) (OrchestratorState, error) {
base, err := cloneState(snapshot.State)
if err != nil {
return OrchestratorState{}, fmt.Errorf("clone snapshot state: %w", err)
}
if base.StateHash == "" {
hash, err := computeStateHash(base)
if err != nil {
return OrchestratorState{}, fmt.Errorf("compute snapshot hash: %w", err)
}
base.StateHash = hash
}
start := snapshot.LastAppliedIndex + 1
records, err := wal.Replay(start)
if err != nil {
return OrchestratorState{}, fmt.Errorf("replay wal: %w", err)
}
state := base
for _, record := range records {
if state.StateHash != record.StatePreHash {
return OrchestratorState{}, fmt.Errorf("wal pre-hash mismatch at index %d", record.Index)
}
next, err := Reduce(state, record.Transition, record.Guard)
if err != nil {
return OrchestratorState{}, fmt.Errorf("reduce wal record %d: %w", record.Index, err)
}
if next.StateHash != record.StatePostHash {
return OrchestratorState{}, fmt.Errorf("wal post-hash mismatch at index %d", record.Index)
}
state = next
}
return state, nil
}

117
snapshot.go Normal file

@@ -0,0 +1,117 @@
package swoosh
import (
"encoding/json"
"errors"
"fmt"
"os"
"path/filepath"
)
// Snapshot captures a serialized view of the orchestrator state and replay cursor.
type Snapshot struct {
State OrchestratorState `json:"state"`
LastAppliedHLC string `json:"last_applied_hlc"`
LastAppliedIndex uint64 `json:"last_applied_index"`
}
// SnapshotStore persists and loads orchestrator snapshots.
type SnapshotStore interface {
Save(s Snapshot) error
LoadLatest() (Snapshot, error)
}
// ErrSnapshotNotFound indicates there is no stored snapshot.
var ErrSnapshotNotFound = errors.New("snapshot not found")
// FileSnapshotStore stores snapshots using atomic file replacement.
type FileSnapshotStore struct {
path string
}
// NewFileSnapshotStore creates a snapshot store at the supplied path.
func NewFileSnapshotStore(path string) *FileSnapshotStore {
return &FileSnapshotStore{path: path}
}
// Save writes the snapshot with atomic replace semantics.
func (s *FileSnapshotStore) Save(snapshot Snapshot) error {
dir := filepath.Dir(s.path)
if err := os.MkdirAll(dir, 0o755); err != nil {
return fmt.Errorf("create snapshot directory: %w", err)
}
payload, err := canonicalJSON(snapshot)
if err != nil {
return fmt.Errorf("marshal snapshot: %w", err)
}
temp, err := os.CreateTemp(dir, "snapshot-*.tmp")
if err != nil {
return fmt.Errorf("create temp snapshot: %w", err)
}
tempName := temp.Name()
if _, err := temp.Write(payload); err != nil {
temp.Close()
os.Remove(tempName)
return fmt.Errorf("write snapshot: %w", err)
}
if err := temp.Sync(); err != nil {
temp.Close()
os.Remove(tempName)
return fmt.Errorf("sync snapshot: %w", err)
}
if err := temp.Close(); err != nil {
os.Remove(tempName)
return fmt.Errorf("close snapshot temp file: %w", err)
}
if err := os.Rename(tempName, s.path); err != nil {
os.Remove(tempName)
return fmt.Errorf("rename snapshot: %w", err)
}
return nil
}
// LoadLatest returns the persisted snapshot or ErrSnapshotNotFound if absent.
func (s *FileSnapshotStore) LoadLatest() (Snapshot, error) {
payload, err := os.ReadFile(s.path)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return Snapshot{}, ErrSnapshotNotFound
}
return Snapshot{}, fmt.Errorf("read snapshot: %w", err)
}
var snapshot Snapshot
if err := json.Unmarshal(payload, &snapshot); err != nil {
return Snapshot{}, fmt.Errorf("decode snapshot: %w", err)
}
return snapshot, nil
}
// InMemorySnapshotStore fulfills SnapshotStore in memory for testing.
type InMemorySnapshotStore struct {
snapshot Snapshot
has bool
}
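// Save stores the snapshot in memory. The value is copied shallowly, so slices
// inside the state still share backing arrays with the caller's copy.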
func (s *InMemorySnapshotStore) Save(snapshot Snapshot) error {
s.snapshot = snapshot
s.has = true
return nil
}
// LoadLatest retrieves the stored snapshot or ErrSnapshotNotFound.
func (s *InMemorySnapshotStore) LoadLatest() (Snapshot, error) {
if !s.has {
return Snapshot{}, ErrSnapshotNotFound
}
return s.snapshot, nil
}

123
state.go Normal file

@@ -0,0 +1,123 @@
package swoosh
import "time"
// CouncilMember represents one elected or proposed council participant.
// Its field set and serialization order must remain stable across replicas,
// because the encoded form feeds the state hash.
type CouncilMember struct {
ID string `json:"id"`
Role string `json:"role"`
PublicKey string `json:"public_key"`
VoteWeight int `json:"vote_weight"`
Signatures []string `json:"signatures"`
Status string `json:"status"`
Epoch uint64 `json:"epoch"`
}
// EnvResource defines one environment allocation or provisioned asset.
// It is used by the Environment phase to track compute or service resources.
type EnvResource struct {
ID string `json:"id"`
Type string `json:"type"`
Address string `json:"address"`
Capacity string `json:"capacity"`
AllocatedAt time.Time `json:"allocated_at"`
ReleasedAt time.Time `json:"released_at"`
HealthStatus string `json:"health_status"`
Provider string `json:"provider"`
}
// OrchestratorState captures the full replicated SWOOSH state machine.
type OrchestratorState struct {
Meta struct {
Version string
SchemaHash string
}
Boot struct {
Licensed bool
LicenseExpiry time.Time
NodeID string
}
Ingestion struct {
Phase string
ContentHash string
SourceSet []string
LastError string
Epoch uint64
}
Council struct {
Phase string
PlannedRoles []string
Members []CouncilMember
QuorumCertHash string
MCPHealthGreen bool
Epoch uint64
}
Environment struct {
Phase string
CapacityOK bool
Health string
Resources []EnvResource
Epoch uint64
}
Execution struct {
Phase string
ActiveWindowID string
BeatIndex uint64
PlanLocked bool
Approvals uint32
Epoch uint64
}
Control struct {
Paused bool
Degraded bool
Recovering bool
}
Policy struct {
Quarantined bool
Rationale string
}
HLCLast string
StateHash string
}
// TransitionProposal encapsulates a requested state transition.
type TransitionProposal struct {
CurrentStateHash string `json:"current_state_hash"`
TransitionName string `json:"transition"`
InputsHash string `json:"inputs_hash"`
Signer string `json:"signer"`
IdemKey string `json:"idem_key"`
HLC string `json:"hlc"`
WindowID string `json:"window_id"`
Evidence []string `json:"evidence"`
}
// GuardOutcome aggregates guard evaluation for a transition.
type GuardOutcome struct {
LicenseOK bool
BackbeatOK bool
QuorumOK bool
PolicyOK bool
MCPHealthy bool
Rationale []string
}
// WALRecord captures append-only transition metadata for replay and audit.
type WALRecord struct {
StatePreHash string `json:"state_pre_hash"`
StatePostHash string `json:"state_post_hash"`
Transition TransitionProposal `json:"transition"`
Guard GuardOutcome `json:"guard"`
AppliedAtHLC string `json:"applied_hlc"`
AppliedAtUnixNs int64 `json:"applied_unix_ns"`
Index uint64 `json:"index"`
}

186
wal.go Normal file

@@ -0,0 +1,186 @@
package swoosh
import (
"bufio"
"encoding/json"
"errors"
"fmt"
"os"
)
// WALStore persists state transition records for deterministic replay.
type WALStore interface {
Append(record WALRecord) error
Replay(fromIndex uint64) ([]WALRecord, error)
Sync() error
LastIndex() uint64
}
// FileWAL implements WALStore using an append-only file.
type FileWAL struct {
path string
file *os.File
lastIndex uint64
}
// NewFileWAL constructs a file-backed write-ahead log at the given path.
func NewFileWAL(path string) (*FileWAL, error) {
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o600)
if err != nil {
return nil, fmt.Errorf("open wal: %w", err)
}
wal := &FileWAL{
path: path,
file: file,
}
if err := wal.bootstrapLastIndex(); err != nil {
_ = file.Close()
return nil, err
}
return wal, nil
}
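// Append writes the record as one JSON line and fsyncs the file. Indexes must be
// strictly increasing; gaps are accepted. If the write succeeds but the sync
// fails, lastIndex is not advanced even though the line may already be in the file.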
func (w *FileWAL) Append(record WALRecord) error {
if record.Index == 0 {
return errors.New("wal record index must be > 0")
}
if record.Index <= w.lastIndex {
return fmt.Errorf("wal index regression: %d <= %d", record.Index, w.lastIndex)
}
payload, err := canonicalJSON(record)
if err != nil {
return fmt.Errorf("marshal wal record: %w", err)
}
if _, err := w.file.Write(append(payload, '\n')); err != nil {
return fmt.Errorf("append wal: %w", err)
}
if err := w.Sync(); err != nil {
return err
}
w.lastIndex = record.Index
return nil
}
// Replay reads records with index >= fromIndex in order.
func (w *FileWAL) Replay(fromIndex uint64) ([]WALRecord, error) {
reader, err := os.Open(w.path)
if err != nil {
if errors.Is(err, os.ErrNotExist) {
return nil, nil
}
return nil, fmt.Errorf("open wal for replay: %w", err)
}
defer reader.Close()
scanner := bufio.NewScanner(reader)
buf := make([]byte, 0, 64*1024)
scanner.Buffer(buf, 16*1024*1024)
var records []WALRecord
for scanner.Scan() {
line := scanner.Bytes()
if len(line) == 0 {
continue
}
var record WALRecord
if err := json.Unmarshal(line, &record); err != nil {
return nil, fmt.Errorf("decode wal record: %w", err)
}
if record.Index >= fromIndex {
records = append(records, record)
}
}
if err := scanner.Err(); err != nil {
return nil, fmt.Errorf("scan wal: %w", err)
}
return records, nil
}
// Sync fsyncs the underlying file to persist appended records.
func (w *FileWAL) Sync() error {
if err := w.file.Sync(); err != nil {
return fmt.Errorf("sync wal: %w", err)
}
return nil
}
// LastIndex returns the highest record index stored in the WAL.
func (w *FileWAL) LastIndex() uint64 {
return w.lastIndex
}
// Close releases resources associated with the FileWAL.
func (w *FileWAL) Close() error {
if w.file == nil {
return nil
}
return w.file.Close()
}
func (w *FileWAL) bootstrapLastIndex() error {
records, err := w.Replay(0)
if err != nil {
return err
}
if len(records) == 0 {
w.lastIndex = 0
return nil
}
w.lastIndex = records[len(records)-1].Index
return nil
}
// InMemoryWAL provides an in-memory WAL implementation for tests.
type InMemoryWAL struct {
records []WALRecord
}
// Append adds the record to the in-memory sequence.
func (w *InMemoryWAL) Append(record WALRecord) error {
if record.Index == 0 {
return errors.New("wal record index must be > 0")
}
if last := w.LastIndex(); record.Index <= last {
return fmt.Errorf("wal index regression: %d <= %d", record.Index, last)
}
w.records = append(w.records, record)
return nil
}
// Replay returns a copy of stored records from the requested index onward.
func (w *InMemoryWAL) Replay(fromIndex uint64) ([]WALRecord, error) {
if len(w.records) == 0 {
return nil, nil
}
var out []WALRecord
for _, record := range w.records {
if record.Index >= fromIndex {
out = append(out, record)
}
}
return out, nil
}
// Sync is a no-op for the in-memory WAL.
func (w *InMemoryWAL) Sync() error { return nil }
// LastIndex returns the latest stored index.
func (w *InMemoryWAL) LastIndex() uint64 {
if len(w.records) == 0 {
return 0
}
return w.records[len(w.records)-1].Index
}