package examples import ( "context" "crypto/ed25519" "crypto/rand" "fmt" "log/slog" "math" mathRand "math/rand" "os" "os/signal" "sync" "syscall" "time" "github.com/chorus-services/backbeat/pkg/sdk" ) // Task represents a work item with beat budget requirements type Task struct { ID string Description string BeatBudget int // Maximum beats allowed for completion WorkTime time.Duration // Simulated work duration Created time.Time } // TaskProcessor demonstrates beat budget usage and timeout management // This example shows how to use beat budgets for reliable task execution func TaskProcessor() { // Generate a signing key for this example _, signingKey, err := ed25519.GenerateKey(rand.Reader) if err != nil { slog.Error("Failed to generate signing key", "error", err) return } // Create SDK configuration config := sdk.DefaultConfig() config.ClusterID = "chorus-dev" config.AgentID = "task-processor" config.NATSUrl = "nats://localhost:4222" config.SigningKey = signingKey config.Logger = slog.New(slog.NewTextHandler(os.Stdout, &slog.HandlerOptions{ Level: slog.LevelDebug, })) // Create BACKBEAT client client := sdk.NewClient(config) // Task management var ( taskQueue = make(chan *Task, 100) activeTasks = make(map[string]*Task) completedTasks = 0 failedTasks = 0 taskMutex sync.RWMutex ) // Register beat callback for status reporting client.OnBeat(func(beat sdk.BeatFrame) { taskMutex.RLock() activeCount := len(activeTasks) taskMutex.RUnlock() // Emit status every 2 beats if beat.BeatIndex%2 == 0 { state := "waiting" if activeCount > 0 { state = "executing" } progress := float64(completedTasks) / float64(completedTasks+failedTasks+activeCount+len(taskQueue)) if math.IsNaN(progress) { progress = 0.0 } err := client.EmitStatusClaim(sdk.StatusClaim{ State: state, BeatsLeft: activeCount * 5, // Estimate 5 beats per active task Progress: progress, Notes: fmt.Sprintf("Active: %d, Completed: %d, Failed: %d, Queue: %d", activeCount, completedTasks, failedTasks, len(taskQueue)), }) if err != nil { slog.Error("Failed to emit status claim", "error", err) } } }) // Register downbeat callback to create new tasks client.OnDownbeat(func(beat sdk.BeatFrame) { slog.Info("New bar - creating tasks", "beat_index", beat.BeatIndex, "window_id", beat.WindowID) // Create 1-3 new tasks each bar numTasks := mathRand.Intn(3) + 1 for i := 0; i < numTasks; i++ { task := &Task{ ID: fmt.Sprintf("task-%d-%d", beat.BeatIndex, i), Description: fmt.Sprintf("Process data batch %d", i), BeatBudget: mathRand.Intn(8) + 2, // 2-10 beat budget WorkTime: time.Duration(mathRand.Intn(3)+1) * time.Second, // 1-4 seconds of work Created: time.Now(), } select { case taskQueue <- task: slog.Debug("Task created", "task_id", task.ID, "budget", task.BeatBudget) default: slog.Warn("Task queue full, dropping task", "task_id", task.ID) } } }) // Setup graceful shutdown ctx, cancel := context.WithCancel(context.Background()) defer cancel() // Handle shutdown signals sigChan := make(chan os.Signal, 1) signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) go func() { <-sigChan slog.Info("Shutdown signal received") cancel() }() // Start the client if err := client.Start(ctx); err != nil { slog.Error("Failed to start BACKBEAT client", "error", err) return } defer client.Stop() slog.Info("Task processor started - use Ctrl+C to stop") // Start task workers const numWorkers = 3 for i := 0; i < numWorkers; i++ { go func(workerID int) { for { select { case <-ctx.Done(): return case task := <-taskQueue: processTaskWithBudget(ctx, client, task, workerID, &taskMutex, activeTasks, &completedTasks, &failedTasks) } } }(i) } // Wait for shutdown <-ctx.Done() slog.Info("Task processor shutting down") } // processTaskWithBudget processes a task using BACKBEAT beat budgets func processTaskWithBudget( ctx context.Context, client sdk.Client, task *Task, workerID int, taskMutex *sync.RWMutex, activeTasks map[string]*Task, completedTasks *int, failedTasks *int, ) { // Add task to active tasks taskMutex.Lock() activeTasks[task.ID] = task taskMutex.Unlock() // Remove from active tasks when done defer func() { taskMutex.Lock() delete(activeTasks, task.ID) taskMutex.Unlock() }() slog.Info("Processing task", "worker", workerID, "task_id", task.ID, "budget", task.BeatBudget, "work_time", task.WorkTime) // Use beat budget to execute the task err := client.WithBeatBudget(task.BeatBudget, func() error { // Emit starting status client.EmitStatusClaim(sdk.StatusClaim{ TaskID: task.ID, State: "executing", BeatsLeft: task.BeatBudget, Progress: 0.0, Notes: fmt.Sprintf("Worker %d processing %s", workerID, task.Description), }) // Simulate work with progress updates steps := 5 stepDuration := task.WorkTime / time.Duration(steps) for step := 0; step < steps; step++ { select { case <-ctx.Done(): return ctx.Err() case <-time.After(stepDuration): progress := float64(step+1) / float64(steps) client.EmitStatusClaim(sdk.StatusClaim{ TaskID: task.ID, State: "executing", BeatsLeft: int(float64(task.BeatBudget) * (1.0 - progress)), Progress: progress, Notes: fmt.Sprintf("Worker %d step %d/%d", workerID, step+1, steps), }) } } return nil }) // Handle completion or timeout if err != nil { slog.Warn("Task failed or timed out", "worker", workerID, "task_id", task.ID, "error", err) *failedTasks++ // Emit failure status client.EmitStatusClaim(sdk.StatusClaim{ TaskID: task.ID, State: "failed", BeatsLeft: 0, Progress: 0.0, Notes: fmt.Sprintf("Worker %d failed: %s", workerID, err.Error()), }) } else { slog.Info("Task completed successfully", "worker", workerID, "task_id", task.ID, "duration", time.Since(task.Created)) *completedTasks++ // Emit completion status client.EmitStatusClaim(sdk.StatusClaim{ TaskID: task.ID, State: "done", BeatsLeft: 0, Progress: 1.0, Notes: fmt.Sprintf("Worker %d completed %s", workerID, task.Description), }) } }