package swoosh import ( "encoding/json" "fmt" "sync" "github.com/dgraph-io/badger/v4" ) // BadgerWALStore implements WALStore using BadgerDB for durable, ordered persistence. // Each WALRecord is stored with a monotonic Index as the key. // Badger guarantees durability via LSM tree and WAL at the storage layer. type BadgerWALStore struct { db *badger.DB mu sync.Mutex // Protects lastIndex tracking } // NewBadgerWALStore opens or creates a BadgerDB instance at the given path. // The directory must exist or be creatable. func NewBadgerWALStore(path string) (*BadgerWALStore, error) { opts := badger.DefaultOptions(path) opts.Logger = nil // Suppress badger's internal logging db, err := badger.Open(opts) if err != nil { return nil, fmt.Errorf("open badger wal at %s: %w", path, err) } return &BadgerWALStore{db: db}, nil } // Append writes a WALRecord to disk with monotonic Index. // The record is serialized as JSON and written under key = uint64(Index). // Badger's internal WAL ensures durability before Append returns. func (b *BadgerWALStore) Append(record WALRecord) error { b.mu.Lock() defer b.mu.Unlock() data, err := json.Marshal(record) if err != nil { return fmt.Errorf("marshal wal record: %w", err) } key := indexToKey(record.Index) err = b.db.Update(func(txn *badger.Txn) error { return txn.Set(key, data) }) if err != nil { return fmt.Errorf("badger set index=%d: %w", record.Index, err) } return nil } // Replay returns all WALRecords with Index >= fromIndex in ascending order. // Badger's iterator guarantees key ordering. func (b *BadgerWALStore) Replay(fromIndex uint64) ([]WALRecord, error) { b.mu.Lock() defer b.mu.Unlock() var records []WALRecord err := b.db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.PrefetchValues = true it := txn.NewIterator(opts) defer it.Close() startKey := indexToKey(fromIndex) for it.Seek(startKey); it.Valid(); it.Next() { item := it.Item() var record WALRecord err := item.Value(func(val []byte) error { return json.Unmarshal(val, &record) }) if err != nil { return fmt.Errorf("unmarshal wal record: %w", err) } records = append(records, record) } return nil }) if err != nil { return nil, fmt.Errorf("badger replay from %d: %w", fromIndex, err) } return records, nil } // Sync forces Badger to flush any buffered writes to disk. // Badger uses an internal WAL and LSM tree; calling Sync() triggers ValueLogGC // and ensures durable writes up to this point. func (b *BadgerWALStore) Sync() error { b.mu.Lock() defer b.mu.Unlock() // Badger's Sync operation: run value log garbage collection with ratio 0.5 // This forces a flush of pending writes. err := b.db.RunValueLogGC(0.5) if err != nil && err != badger.ErrNoRewrite { return fmt.Errorf("badger sync: %w", err) } return nil } // LastIndex returns the highest Index in the WAL, or 0 if empty. func (b *BadgerWALStore) LastIndex() uint64 { b.mu.Lock() defer b.mu.Unlock() var lastIdx uint64 err := b.db.View(func(txn *badger.Txn) error { opts := badger.DefaultIteratorOptions opts.Reverse = true it := txn.NewIterator(opts) defer it.Close() it.Rewind() if it.Valid() { item := it.Item() var record WALRecord err := item.Value(func(val []byte) error { return json.Unmarshal(val, &record) }) if err == nil { lastIdx = record.Index } } return nil }) if err != nil { return 0 } return lastIdx } // Close closes the BadgerDB instance. func (b *BadgerWALStore) Close() error { return b.db.Close() } // indexToKey converts a uint64 index to a Badger key (8-byte big-endian). // This ensures lexicographic ordering matches numeric ordering. func indexToKey(idx uint64) []byte { key := make([]byte, 8) key[0] = byte(idx >> 56) key[1] = byte(idx >> 48) key[2] = byte(idx >> 40) key[3] = byte(idx >> 32) key[4] = byte(idx >> 24) key[5] = byte(idx >> 16) key[6] = byte(idx >> 8) key[7] = byte(idx) return key }