Files
BACKBEAT/pkg/sdk/examples/task_processor.go
2025-10-17 08:56:25 +11:00

259 lines
6.3 KiB
Go

package examples
import (
"context"
"crypto/ed25519"
"crypto/rand"
"fmt"
"log/slog"
"math"
mathRand "math/rand"
"os"
"os/signal"
"sync"
"syscall"
"time"
"github.com/chorus-services/backbeat/pkg/sdk"
)
// Task describes one unit of simulated work together with the beat budget
// it is allowed to consume.
type Task struct {
	// ID and Description identify the task in logs and status notes.
	ID, Description string
	// BeatBudget is the maximum number of beats allowed for completion.
	BeatBudget int
	// WorkTime is the simulated duration of the work.
	WorkTime time.Duration
	// Created records when the task was created.
	Created time.Time
}
// TaskProcessor demonstrates beat budget usage and timeout management.
//
// It builds a BACKBEAT client with a freshly generated ed25519 signing key,
// enqueues a few random tasks on every downbeat, processes them with a fixed
// pool of worker goroutines via processTaskWithBudget, and reports aggregate
// status on every second beat. It blocks until SIGINT or SIGTERM is received,
// then waits for the workers to exit before the client is stopped.
func TaskProcessor() {
	// Generate a signing key for this example
	_, signingKey, err := ed25519.GenerateKey(rand.Reader)
	if err != nil {
		slog.Error("Failed to generate signing key", "error", err)
		return
	}

	// Create SDK configuration
	config := sdk.DefaultConfig()
	config.ClusterID = "chorus-dev"
	config.AgentID = "task-processor"
	config.NATSUrl = "nats://localhost:4222"
	config.SigningKey = signingKey
	config.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
		Level: slog.LevelDebug,
	}))

	// Create BACKBEAT client
	client := sdk.NewClient(config)

	// Task management. activeTasks and the two counters are shared between
	// the beat callback and the worker goroutines.
	var (
		taskQueue      = make(chan *Task, 100)
		activeTasks    = make(map[string]*Task)
		completedTasks = 0
		failedTasks    = 0
		taskMutex      sync.RWMutex
	)

	// Register beat callback for status reporting
	client.OnBeat(func(beat sdk.BeatFrame) {
		// Emit status every 2 beats
		if beat.BeatIndex%2 != 0 {
			return
		}

		// Snapshot all shared state in one critical section: the counters are
		// mutated by worker goroutines, so reading them outside the lock
		// would be a data race.
		taskMutex.RLock()
		activeCount := len(activeTasks)
		completed := completedTasks
		failed := failedTasks
		taskMutex.RUnlock()
		queued := len(taskQueue)

		state := "waiting"
		if activeCount > 0 {
			state = "executing"
		}

		// With no tasks at all this is 0/0, which is NaN for floats.
		progress := float64(completed) / float64(completed+failed+activeCount+queued)
		if math.IsNaN(progress) {
			progress = 0.0
		}

		err := client.EmitStatusClaim(sdk.StatusClaim{
			State:     state,
			BeatsLeft: activeCount * 5, // Estimate 5 beats per active task
			Progress:  progress,
			Notes: fmt.Sprintf("Active: %d, Completed: %d, Failed: %d, Queue: %d",
				activeCount, completed, failed, queued),
		})
		if err != nil {
			slog.Error("Failed to emit status claim", "error", err)
		}
	})

	// Register downbeat callback to create new tasks
	client.OnDownbeat(func(beat sdk.BeatFrame) {
		slog.Info("New bar - creating tasks",
			"beat_index", beat.BeatIndex,
			"window_id", beat.WindowID)

		// Create 1-3 new tasks each bar
		numTasks := mathRand.Intn(3) + 1
		for i := 0; i < numTasks; i++ {
			task := &Task{
				ID:          fmt.Sprintf("task-%d-%d", beat.BeatIndex, i),
				Description: fmt.Sprintf("Process data batch %d", i),
				BeatBudget:  mathRand.Intn(8) + 2,                             // 2-9 beat budget
				WorkTime:    time.Duration(mathRand.Intn(3)+1) * time.Second, // 1-3 seconds of work
				Created:     time.Now(),
			}

			// Non-blocking send: never stall the beat callback on a full queue.
			select {
			case taskQueue <- task:
				slog.Debug("Task created", "task_id", task.ID, "budget", task.BeatBudget)
			default:
				slog.Warn("Task queue full, dropping task", "task_id", task.ID)
			}
		}
	})

	// Setup graceful shutdown
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Handle shutdown signals. The goroutine also watches ctx so it exits
	// when this function returns for any other reason (e.g. Start failing).
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
	defer signal.Stop(sigChan)
	go func() {
		select {
		case <-sigChan:
			slog.Info("Shutdown signal received")
			cancel()
		case <-ctx.Done():
		}
	}()

	// Start the client
	if err := client.Start(ctx); err != nil {
		slog.Error("Failed to start BACKBEAT client", "error", err)
		return
	}
	defer client.Stop()

	slog.Info("Task processor started - use Ctrl+C to stop")

	// Start task workers
	const numWorkers = 3
	var workers sync.WaitGroup
	for i := 0; i < numWorkers; i++ {
		workers.Add(1)
		go func(workerID int) {
			defer workers.Done()
			for {
				select {
				case <-ctx.Done():
					return
				case task := <-taskQueue:
					processTaskWithBudget(ctx, client, task, workerID, &taskMutex, activeTasks, &completedTasks, &failedTasks)
				}
			}
		}(i)
	}

	// Wait for shutdown
	<-ctx.Done()
	slog.Info("Task processor shutting down")

	// Let in-flight workers finish before the deferred client.Stop runs, so
	// their final status claims are emitted against a live client.
	// NOTE(review): assumes WithBeatBudget returns once its callback returns.
	workers.Wait()
}
// processTaskWithBudget processes a single task using BACKBEAT beat budgets.
//
// The task is registered in activeTasks for the duration of the call and
// removed again on return. The simulated work runs inside
// client.WithBeatBudget with task.BeatBudget beats: task.WorkTime is split
// into five equal steps, and a status claim is emitted before the first step
// and after each one. If ctx is cancelled mid-work the callback returns
// ctx.Err(). Any error from WithBeatBudget increments *failedTasks; success
// increments *completedTasks. A final "failed" or "done" status claim is
// emitted either way.
//
// activeTasks, completedTasks and failedTasks are shared with other
// goroutines; every access to them here happens while holding taskMutex.
func processTaskWithBudget(
	ctx context.Context,
	client sdk.Client,
	task *Task,
	workerID int,
	taskMutex *sync.RWMutex,
	activeTasks map[string]*Task,
	completedTasks *int,
	failedTasks *int,
) {
	// Add task to active tasks
	taskMutex.Lock()
	activeTasks[task.ID] = task
	taskMutex.Unlock()

	// Remove from active tasks when done
	defer func() {
		taskMutex.Lock()
		delete(activeTasks, task.ID)
		taskMutex.Unlock()
	}()

	// emit sends a status claim and logs (rather than drops) any error.
	// Status reporting is best-effort and must not fail the task itself.
	emit := func(claim sdk.StatusClaim) {
		if err := client.EmitStatusClaim(claim); err != nil {
			slog.Error("Failed to emit status claim",
				"worker", workerID,
				"task_id", task.ID,
				"error", err)
		}
	}

	slog.Info("Processing task",
		"worker", workerID,
		"task_id", task.ID,
		"budget", task.BeatBudget,
		"work_time", task.WorkTime)

	// Use beat budget to execute the task
	err := client.WithBeatBudget(task.BeatBudget, func() error {
		// Emit starting status
		emit(sdk.StatusClaim{
			TaskID:    task.ID,
			State:     "executing",
			BeatsLeft: task.BeatBudget,
			Progress:  0.0,
			Notes:     fmt.Sprintf("Worker %d processing %s", workerID, task.Description),
		})

		// Simulate work with progress updates
		const steps = 5
		stepDuration := task.WorkTime / time.Duration(steps)
		for step := 0; step < steps; step++ {
			select {
			case <-ctx.Done():
				return ctx.Err()
			case <-time.After(stepDuration):
				progress := float64(step+1) / float64(steps)
				emit(sdk.StatusClaim{
					TaskID:    task.ID,
					State:     "executing",
					BeatsLeft: int(float64(task.BeatBudget) * (1.0 - progress)),
					Progress:  progress,
					Notes:     fmt.Sprintf("Worker %d step %d/%d", workerID, step+1, steps),
				})
			}
		}
		return nil
	})

	// Handle completion or timeout
	if err != nil {
		slog.Warn("Task failed or timed out",
			"worker", workerID,
			"task_id", task.ID,
			"error", err)

		// The counter is shared across workers; an unguarded ++ would race.
		taskMutex.Lock()
		*failedTasks++
		taskMutex.Unlock()

		// Emit failure status
		emit(sdk.StatusClaim{
			TaskID:    task.ID,
			State:     "failed",
			BeatsLeft: 0,
			Progress:  0.0,
			Notes:     fmt.Sprintf("Worker %d failed: %s", workerID, err.Error()),
		})
		return
	}

	slog.Info("Task completed successfully",
		"worker", workerID,
		"task_id", task.ID,
		"duration", time.Since(task.Created))

	// The counter is shared across workers; an unguarded ++ would race.
	taskMutex.Lock()
	*completedTasks++
	taskMutex.Unlock()

	// Emit completion status
	emit(sdk.StatusClaim{
		TaskID:    task.ID,
		State:     "done",
		BeatsLeft: 0,
		Progress:  1.0,
		Notes:     fmt.Sprintf("Worker %d completed %s", workerID, task.Description),
	})
}