Files
BACKBEAT/pkg/sdk/examples/simple_agent.go
2025-10-17 08:56:25 +11:00

150 lines
3.7 KiB
Go

// Package examples demonstrates BACKBEAT SDK usage patterns
package examples
import (
"context"
"crypto/ed25519"
"crypto/rand"
"fmt"
"log/slog"
"os"
"os/signal"
"sync/atomic"
"syscall"
"time"
"github.com/chorus-services/backbeat/pkg/sdk"
)
// SimpleAgent demonstrates basic BACKBEAT SDK usage
// This example shows the minimal integration pattern for CHORUS services
func SimpleAgent() {
// Generate a signing key for this example
_, signingKey, err := ed25519.GenerateKey(rand.Reader)
if err != nil {
slog.Error("Failed to generate signing key", "error", err)
return
}
// Create SDK configuration
config := sdk.DefaultConfig()
config.ClusterID = "chorus-dev"
config.AgentID = "simple-agent"
config.NATSUrl = "nats://localhost:4222" // Adjust for your setup
config.SigningKey = signingKey
config.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
Level: slog.LevelInfo,
}))
// Create BACKBEAT client
client := sdk.NewClient(config)
// Track some simple state
var taskCounter int64
var completedTasks int64
// Register beat callback - this runs on every beat
client.OnBeat(func(beat sdk.BeatFrame) {
currentTasks := atomic.LoadInt64(&taskCounter)
completed := atomic.LoadInt64(&completedTasks)
// Emit status every few beats
if beat.BeatIndex%3 == 0 {
progress := 0.0
if currentTasks > 0 {
progress = float64(completed) / float64(currentTasks)
}
err := client.EmitStatusClaim(sdk.StatusClaim{
State: determineState(currentTasks, completed),
BeatsLeft: calculateBeatsLeft(currentTasks, completed),
Progress: progress,
Notes: fmt.Sprintf("Processing tasks: %d/%d", completed, currentTasks),
})
if err != nil {
slog.Error("Failed to emit status claim", "error", err)
}
}
})
// Register downbeat callback - this runs at the start of each bar
client.OnDownbeat(func(beat sdk.BeatFrame) {
slog.Info("Bar started",
"beat_index", beat.BeatIndex,
"window_id", beat.WindowID,
"phase", beat.Phase)
// Start new tasks at the beginning of bars
atomic.AddInt64(&taskCounter, 2) // Add 2 new tasks per bar
})
// Setup graceful shutdown
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// Handle shutdown signals
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
go func() {
<-sigChan
slog.Info("Shutdown signal received")
cancel()
}()
// Start the client
if err := client.Start(ctx); err != nil {
slog.Error("Failed to start BACKBEAT client", "error", err)
return
}
defer client.Stop()
slog.Info("Simple agent started - use Ctrl+C to stop")
// Simulate some work - complete tasks periodically
ticker := time.NewTicker(2 * time.Second)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
slog.Info("Shutting down simple agent")
return
case <-ticker.C:
// Complete a task if we have any pending
current := atomic.LoadInt64(&taskCounter)
completed := atomic.LoadInt64(&completedTasks)
if completed < current {
atomic.AddInt64(&completedTasks, 1)
slog.Debug("Completed a task",
"completed", completed+1,
"total", current)
}
}
}
}
// determineState calculates the current state based on task progress
func determineState(total, completed int64) string {
if total == 0 {
return "waiting"
}
if completed == total {
return "done"
}
if completed > 0 {
return "executing"
}
return "planning"
}
// calculateBeatsLeft estimates beats remaining based on current progress
func calculateBeatsLeft(total, completed int64) int {
if total == 0 || completed >= total {
return 0
}
remaining := total - completed
// Assume each task takes about 5 beats to complete
return int(remaining * 5)
}