187 lines
4.1 KiB
Go
187 lines
4.1 KiB
Go
package swoosh
|
|
|
|
import (
|
|
"bufio"
|
|
"encoding/json"
|
|
"errors"
|
|
"fmt"
|
|
"os"
|
|
)
|
|
|
|
// WALStore persists state transition records for deterministic replay.
|
|
type WALStore interface {
|
|
Append(record WALRecord) error
|
|
Replay(fromIndex uint64) ([]WALRecord, error)
|
|
Sync() error
|
|
LastIndex() uint64
|
|
}
|
|
|
|
// FileWAL implements WALStore using an append-only file.
|
|
type FileWAL struct {
|
|
path string
|
|
file *os.File
|
|
lastIndex uint64
|
|
}
|
|
|
|
// NewFileWAL constructs a file-backed write-ahead log at the given path.
|
|
func NewFileWAL(path string) (*FileWAL, error) {
|
|
file, err := os.OpenFile(path, os.O_CREATE|os.O_RDWR|os.O_APPEND, 0o600)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("open wal: %w", err)
|
|
}
|
|
|
|
wal := &FileWAL{
|
|
path: path,
|
|
file: file,
|
|
}
|
|
|
|
if err := wal.bootstrapLastIndex(); err != nil {
|
|
_ = file.Close()
|
|
return nil, err
|
|
}
|
|
|
|
return wal, nil
|
|
}
|
|
|
|
// Append writes the WALRecord to the log and fsyncs the file.
|
|
func (w *FileWAL) Append(record WALRecord) error {
|
|
if record.Index == 0 {
|
|
return errors.New("wal record index must be > 0")
|
|
}
|
|
|
|
if record.Index <= w.lastIndex {
|
|
return fmt.Errorf("wal index regression: %d <= %d", record.Index, w.lastIndex)
|
|
}
|
|
|
|
payload, err := canonicalJSON(record)
|
|
if err != nil {
|
|
return fmt.Errorf("marshal wal record: %w", err)
|
|
}
|
|
|
|
if _, err := w.file.Write(append(payload, '\n')); err != nil {
|
|
return fmt.Errorf("append wal: %w", err)
|
|
}
|
|
|
|
if err := w.Sync(); err != nil {
|
|
return err
|
|
}
|
|
|
|
w.lastIndex = record.Index
|
|
return nil
|
|
}
|
|
|
|
// Replay reads records with index >= fromIndex in order.
|
|
func (w *FileWAL) Replay(fromIndex uint64) ([]WALRecord, error) {
|
|
reader, err := os.Open(w.path)
|
|
if err != nil {
|
|
if errors.Is(err, os.ErrNotExist) {
|
|
return nil, nil
|
|
}
|
|
return nil, fmt.Errorf("open wal for replay: %w", err)
|
|
}
|
|
defer reader.Close()
|
|
|
|
scanner := bufio.NewScanner(reader)
|
|
buf := make([]byte, 0, 64*1024)
|
|
scanner.Buffer(buf, 16*1024*1024)
|
|
|
|
var records []WALRecord
|
|
for scanner.Scan() {
|
|
line := scanner.Bytes()
|
|
if len(line) == 0 {
|
|
continue
|
|
}
|
|
|
|
var record WALRecord
|
|
if err := json.Unmarshal(line, &record); err != nil {
|
|
return nil, fmt.Errorf("decode wal record: %w", err)
|
|
}
|
|
|
|
if record.Index >= fromIndex {
|
|
records = append(records, record)
|
|
}
|
|
}
|
|
|
|
if err := scanner.Err(); err != nil {
|
|
return nil, fmt.Errorf("scan wal: %w", err)
|
|
}
|
|
|
|
return records, nil
|
|
}
|
|
|
|
// Sync fsyncs the underlying file to persist appended records.
|
|
func (w *FileWAL) Sync() error {
|
|
if err := w.file.Sync(); err != nil {
|
|
return fmt.Errorf("sync wal: %w", err)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
// LastIndex returns the highest record index stored in the WAL.
|
|
func (w *FileWAL) LastIndex() uint64 {
|
|
return w.lastIndex
|
|
}
|
|
|
|
// Close releases resources associated with the FileWAL.
|
|
func (w *FileWAL) Close() error {
|
|
if w.file == nil {
|
|
return nil
|
|
}
|
|
return w.file.Close()
|
|
}
|
|
|
|
func (w *FileWAL) bootstrapLastIndex() error {
|
|
records, err := w.Replay(0)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
if len(records) == 0 {
|
|
w.lastIndex = 0
|
|
return nil
|
|
}
|
|
w.lastIndex = records[len(records)-1].Index
|
|
return nil
|
|
}
|
|
|
|
// InMemoryWAL provides an in-memory WAL implementation for tests.
|
|
type InMemoryWAL struct {
|
|
records []WALRecord
|
|
}
|
|
|
|
// Append adds the record to the in-memory sequence.
|
|
func (w *InMemoryWAL) Append(record WALRecord) error {
|
|
if record.Index == 0 {
|
|
return errors.New("wal record index must be > 0")
|
|
}
|
|
if len(w.records) > 0 && record.Index <= w.records[len(w.records)-1].Index {
|
|
return fmt.Errorf("wal index regression: %d", record.Index)
|
|
}
|
|
w.records = append(w.records, record)
|
|
return nil
|
|
}
|
|
|
|
// Replay returns a copy of stored records from the requested index onward.
|
|
func (w *InMemoryWAL) Replay(fromIndex uint64) ([]WALRecord, error) {
|
|
if len(w.records) == 0 {
|
|
return nil, nil
|
|
}
|
|
var out []WALRecord
|
|
for _, record := range w.records {
|
|
if record.Index >= fromIndex {
|
|
out = append(out, record)
|
|
}
|
|
}
|
|
return out, nil
|
|
}
|
|
|
|
// Sync is a no-op for the in-memory WAL.
|
|
func (w *InMemoryWAL) Sync() error { return nil }
|
|
|
|
// LastIndex returns the latest stored index.
|
|
func (w *InMemoryWAL) LastIndex() uint64 {
|
|
if len(w.records) == 0 {
|
|
return 0
|
|
}
|
|
return w.records[len(w.records)-1].Index
|
|
}
|