CHORUS/vendor/github.com/ipfs/go-datastore/autobatch/autobatch.go

// Package autobatch provides a go-datastore implementation that
// automatically batches together writes by holding puts in memory until
// a certain threshold is met.
package autobatch

import (
	"context"

	ds "github.com/ipfs/go-datastore"
	dsq "github.com/ipfs/go-datastore/query"
)

// Datastore implements a go-datastore.
type Datastore struct {
	child ds.Batching

	// TODO: discuss making ds.Batch implement the full ds.Datastore interface
	buffer           map[ds.Key]op
	maxBufferEntries int
}

var _ ds.Datastore = (*Datastore)(nil)
var _ ds.PersistentDatastore = (*Datastore)(nil)

type op struct {
	delete bool
	value  []byte
}

// NewAutoBatching returns a new datastore that automatically
// batches writes using the given Batching datastore. The size
// of the memory pool is given by size.
func NewAutoBatching(d ds.Batching, size int) *Datastore {
	return &Datastore{
		child:            d,
		buffer:           make(map[ds.Key]op, size),
		maxBufferEntries: size,
	}
}
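
// Illustrative usage sketch (not part of the upstream file; assumes
// ds.NewMapDatastore as the child, which implements ds.Batching):
//
//	ctx := context.Background()
//	d := NewAutoBatching(ds.NewMapDatastore(), 16)
//	_ = d.Put(ctx, ds.NewKey("/a"), []byte("hello"))
//	v, _ := d.Get(ctx, ds.NewKey("/a")) // served from the in-memory buffer
//	_ = d.Flush(ctx)                    // persist buffered writes to the child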

// Delete buffers a deletion of the given key; once the buffer grows
// past the configured threshold, it is flushed to the child datastore.
func (d *Datastore) Delete(ctx context.Context, k ds.Key) error {
	d.buffer[k] = op{delete: true}
	if len(d.buffer) > d.maxBufferEntries {
		return d.Flush(ctx)
	}
	return nil
}

// Get retrieves a value given a key.
func (d *Datastore) Get(ctx context.Context, k ds.Key) ([]byte, error) {
	o, ok := d.buffer[k]
	if ok {
		if o.delete {
			return nil, ds.ErrNotFound
		}
		return o.value, nil
	}

	return d.child.Get(ctx, k)
}

// Put buffers the key/value write; once the buffer grows past the
// configured threshold, it is flushed to the child datastore.
func (d *Datastore) Put(ctx context.Context, k ds.Key, val []byte) error {
	d.buffer[k] = op{value: val}
	if len(d.buffer) > d.maxBufferEntries {
		return d.Flush(ctx)
	}
	return nil
}
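
// Illustrative note (child and v are placeholders): the flush fires only
// when the buffer exceeds the configured size, so with size = 2 the third
// distinct buffered key triggers it:
//
//	d := NewAutoBatching(child, 2)
//	_ = d.Put(ctx, ds.NewKey("/a"), v) // buffered (1 entry)
//	_ = d.Put(ctx, ds.NewKey("/b"), v) // buffered (2 entries)
//	_ = d.Put(ctx, ds.NewKey("/c"), v) // 3 > 2: triggers Flush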

// Sync flushes all operations on keys at or under the prefix
// from the current batch to the underlying datastore.
func (d *Datastore) Sync(ctx context.Context, prefix ds.Key) error {
	b, err := d.child.Batch(ctx)
	if err != nil {
		return err
	}

	for k, o := range d.buffer {
		if !(k.Equal(prefix) || k.IsDescendantOf(prefix)) {
			continue
		}

		var err error
		if o.delete {
			err = b.Delete(ctx, k)
		} else {
			err = b.Put(ctx, k, o.value)
		}
		if err != nil {
			return err
		}

		delete(d.buffer, k)
	}

	return b.Commit(ctx)
}
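
// Illustrative note (data and meta are placeholders): Sync persists only
// the buffered keys at or under the prefix, leaving the rest in memory:
//
//	_ = d.Put(ctx, ds.NewKey("/blocks/a"), data)
//	_ = d.Put(ctx, ds.NewKey("/pins/b"), meta)
//	_ = d.Sync(ctx, ds.NewKey("/blocks")) // writes /blocks/a; /pins/b stays buffered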

// Flush flushes the current batch to the underlying datastore.
func (d *Datastore) Flush(ctx context.Context) error {
	b, err := d.child.Batch(ctx)
	if err != nil {
		return err
	}

	for k, o := range d.buffer {
		var err error
		if o.delete {
			err = b.Delete(ctx, k)
		} else {
			err = b.Put(ctx, k, o.value)
		}
		if err != nil {
			return err
		}
	}
	// clear out buffer
	d.buffer = make(map[ds.Key]op, d.maxBufferEntries)

	return b.Commit(ctx)
}

// Has checks if a key is stored.
func (d *Datastore) Has(ctx context.Context, k ds.Key) (bool, error) {
	o, ok := d.buffer[k]
	if ok {
		return !o.delete, nil
	}

	return d.child.Has(ctx, k)
}

// GetSize implements Datastore.GetSize
func (d *Datastore) GetSize(ctx context.Context, k ds.Key) (int, error) {
	o, ok := d.buffer[k]
	if ok {
		if o.delete {
			return -1, ds.ErrNotFound
		}
		return len(o.value), nil
	}

	return d.child.GetSize(ctx, k)
}

// Query flushes any buffered writes, then queries the underlying datastore.
func (d *Datastore) Query(ctx context.Context, q dsq.Query) (dsq.Results, error) {
	err := d.Flush(ctx)
	if err != nil {
		return nil, err
	}

	return d.child.Query(ctx, q)
}
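
// Illustrative note: because Query flushes first, buffered writes are
// visible in the results:
//
//	_ = d.Put(ctx, ds.NewKey("/a"), []byte("v"))
//	res, _ := d.Query(ctx, dsq.Query{Prefix: "/"}) // results include /a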

// DiskUsage implements the PersistentDatastore interface.
func (d *Datastore) DiskUsage(ctx context.Context) (uint64, error) {
	return ds.DiskUsage(ctx, d.child)
}

// Close flushes any buffered writes and then closes the underlying
// datastore, reporting the flush error first if both fail.
func (d *Datastore) Close() error {
	ctx := context.Background()
	err1 := d.Flush(ctx)
	err2 := d.child.Close()
	if err1 != nil {
		return err1
	}
	if err2 != nil {
		return err2
	}
	return nil
}