259 lines
6.3 KiB
Go
259 lines
6.3 KiB
Go
package examples
|
|
|
|
import (
|
|
"context"
|
|
"crypto/ed25519"
|
|
"crypto/rand"
|
|
"fmt"
|
|
"log/slog"
|
|
"math"
|
|
mathRand "math/rand"
|
|
"os"
|
|
"os/signal"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/chorus-services/backbeat/pkg/sdk"
|
|
)
|
|
|
|
// Task represents a work item with beat budget requirements
|
|
type Task struct {
|
|
ID string
|
|
Description string
|
|
BeatBudget int // Maximum beats allowed for completion
|
|
WorkTime time.Duration // Simulated work duration
|
|
Created time.Time
|
|
}
|
|
|
|
// TaskProcessor demonstrates beat budget usage and timeout management
|
|
// This example shows how to use beat budgets for reliable task execution
|
|
func TaskProcessor() {
|
|
// Generate a signing key for this example
|
|
_, signingKey, err := ed25519.GenerateKey(rand.Reader)
|
|
if err != nil {
|
|
slog.Error("Failed to generate signing key", "error", err)
|
|
return
|
|
}
|
|
|
|
// Create SDK configuration
|
|
config := sdk.DefaultConfig()
|
|
config.ClusterID = "chorus-dev"
|
|
config.AgentID = "task-processor"
|
|
config.NATSUrl = "nats://localhost:4222"
|
|
config.SigningKey = signingKey
|
|
config.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{
|
|
Level: slog.LevelDebug,
|
|
}))
|
|
|
|
// Create BACKBEAT client
|
|
client := sdk.NewClient(config)
|
|
|
|
// Task management
|
|
var (
|
|
taskQueue = make(chan *Task, 100)
|
|
activeTasks = make(map[string]*Task)
|
|
completedTasks = 0
|
|
failedTasks = 0
|
|
taskMutex sync.RWMutex
|
|
)
|
|
|
|
// Register beat callback for status reporting
|
|
client.OnBeat(func(beat sdk.BeatFrame) {
|
|
taskMutex.RLock()
|
|
activeCount := len(activeTasks)
|
|
taskMutex.RUnlock()
|
|
|
|
// Emit status every 2 beats
|
|
if beat.BeatIndex%2 == 0 {
|
|
state := "waiting"
|
|
if activeCount > 0 {
|
|
state = "executing"
|
|
}
|
|
|
|
progress := float64(completedTasks) / float64(completedTasks+failedTasks+activeCount+len(taskQueue))
|
|
if math.IsNaN(progress) {
|
|
progress = 0.0
|
|
}
|
|
|
|
err := client.EmitStatusClaim(sdk.StatusClaim{
|
|
State: state,
|
|
BeatsLeft: activeCount * 5, // Estimate 5 beats per active task
|
|
Progress: progress,
|
|
Notes: fmt.Sprintf("Active: %d, Completed: %d, Failed: %d, Queue: %d",
|
|
activeCount, completedTasks, failedTasks, len(taskQueue)),
|
|
})
|
|
if err != nil {
|
|
slog.Error("Failed to emit status claim", "error", err)
|
|
}
|
|
}
|
|
})
|
|
|
|
// Register downbeat callback to create new tasks
|
|
client.OnDownbeat(func(beat sdk.BeatFrame) {
|
|
slog.Info("New bar - creating tasks",
|
|
"beat_index", beat.BeatIndex,
|
|
"window_id", beat.WindowID)
|
|
|
|
// Create 1-3 new tasks each bar
|
|
numTasks := mathRand.Intn(3) + 1
|
|
for i := 0; i < numTasks; i++ {
|
|
task := &Task{
|
|
ID: fmt.Sprintf("task-%d-%d", beat.BeatIndex, i),
|
|
Description: fmt.Sprintf("Process data batch %d", i),
|
|
BeatBudget: mathRand.Intn(8) + 2, // 2-10 beat budget
|
|
WorkTime: time.Duration(mathRand.Intn(3)+1) * time.Second, // 1-4 seconds of work
|
|
Created: time.Now(),
|
|
}
|
|
|
|
select {
|
|
case taskQueue <- task:
|
|
slog.Debug("Task created", "task_id", task.ID, "budget", task.BeatBudget)
|
|
default:
|
|
slog.Warn("Task queue full, dropping task", "task_id", task.ID)
|
|
}
|
|
}
|
|
})
|
|
|
|
// Setup graceful shutdown
|
|
ctx, cancel := context.WithCancel(context.Background())
|
|
defer cancel()
|
|
|
|
// Handle shutdown signals
|
|
sigChan := make(chan os.Signal, 1)
|
|
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
|
|
|
|
go func() {
|
|
<-sigChan
|
|
slog.Info("Shutdown signal received")
|
|
cancel()
|
|
}()
|
|
|
|
// Start the client
|
|
if err := client.Start(ctx); err != nil {
|
|
slog.Error("Failed to start BACKBEAT client", "error", err)
|
|
return
|
|
}
|
|
defer client.Stop()
|
|
|
|
slog.Info("Task processor started - use Ctrl+C to stop")
|
|
|
|
// Start task workers
|
|
const numWorkers = 3
|
|
for i := 0; i < numWorkers; i++ {
|
|
go func(workerID int) {
|
|
for {
|
|
select {
|
|
case <-ctx.Done():
|
|
return
|
|
case task := <-taskQueue:
|
|
processTaskWithBudget(ctx, client, task, workerID, &taskMutex, activeTasks, &completedTasks, &failedTasks)
|
|
}
|
|
}
|
|
}(i)
|
|
}
|
|
|
|
// Wait for shutdown
|
|
<-ctx.Done()
|
|
slog.Info("Task processor shutting down")
|
|
}
|
|
|
|
// processTaskWithBudget processes a task using BACKBEAT beat budgets
|
|
func processTaskWithBudget(
|
|
ctx context.Context,
|
|
client sdk.Client,
|
|
task *Task,
|
|
workerID int,
|
|
taskMutex *sync.RWMutex,
|
|
activeTasks map[string]*Task,
|
|
completedTasks *int,
|
|
failedTasks *int,
|
|
) {
|
|
// Add task to active tasks
|
|
taskMutex.Lock()
|
|
activeTasks[task.ID] = task
|
|
taskMutex.Unlock()
|
|
|
|
// Remove from active tasks when done
|
|
defer func() {
|
|
taskMutex.Lock()
|
|
delete(activeTasks, task.ID)
|
|
taskMutex.Unlock()
|
|
}()
|
|
|
|
slog.Info("Processing task",
|
|
"worker", workerID,
|
|
"task_id", task.ID,
|
|
"budget", task.BeatBudget,
|
|
"work_time", task.WorkTime)
|
|
|
|
// Use beat budget to execute the task
|
|
err := client.WithBeatBudget(task.BeatBudget, func() error {
|
|
// Emit starting status
|
|
client.EmitStatusClaim(sdk.StatusClaim{
|
|
TaskID: task.ID,
|
|
State: "executing",
|
|
BeatsLeft: task.BeatBudget,
|
|
Progress: 0.0,
|
|
Notes: fmt.Sprintf("Worker %d processing %s", workerID, task.Description),
|
|
})
|
|
|
|
// Simulate work with progress updates
|
|
steps := 5
|
|
stepDuration := task.WorkTime / time.Duration(steps)
|
|
|
|
for step := 0; step < steps; step++ {
|
|
select {
|
|
case <-ctx.Done():
|
|
return ctx.Err()
|
|
case <-time.After(stepDuration):
|
|
progress := float64(step+1) / float64(steps)
|
|
|
|
client.EmitStatusClaim(sdk.StatusClaim{
|
|
TaskID: task.ID,
|
|
State: "executing",
|
|
BeatsLeft: int(float64(task.BeatBudget) * (1.0 - progress)),
|
|
Progress: progress,
|
|
Notes: fmt.Sprintf("Worker %d step %d/%d", workerID, step+1, steps),
|
|
})
|
|
}
|
|
}
|
|
|
|
return nil
|
|
})
|
|
|
|
// Handle completion or timeout
|
|
if err != nil {
|
|
slog.Warn("Task failed or timed out",
|
|
"worker", workerID,
|
|
"task_id", task.ID,
|
|
"error", err)
|
|
|
|
*failedTasks++
|
|
|
|
// Emit failure status
|
|
client.EmitStatusClaim(sdk.StatusClaim{
|
|
TaskID: task.ID,
|
|
State: "failed",
|
|
BeatsLeft: 0,
|
|
Progress: 0.0,
|
|
Notes: fmt.Sprintf("Worker %d failed: %s", workerID, err.Error()),
|
|
})
|
|
} else {
|
|
slog.Info("Task completed successfully",
|
|
"worker", workerID,
|
|
"task_id", task.ID,
|
|
"duration", time.Since(task.Created))
|
|
|
|
*completedTasks++
|
|
|
|
// Emit completion status
|
|
client.EmitStatusClaim(sdk.StatusClaim{
|
|
TaskID: task.ID,
|
|
State: "done",
|
|
BeatsLeft: 0,
|
|
Progress: 1.0,
|
|
Notes: fmt.Sprintf("Worker %d completed %s", workerID, task.Description),
|
|
})
|
|
}
|
|
} |