Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 14 additions & 4 deletions pkg/queue/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,11 @@ func (manager *Manager) Run(ctx context.Context) error {
// Run task loop with handler (if any queues registered)
if len(queues) > 0 {
if err := manager.RunTaskLoop(ctx, func(ctx context.Context, task *schema.Task) error {
return manager.runTaskWorker(ctx, task, manager.tracer)
err := manager.runTaskWorker(ctx, task, manager.tracer)
if err != nil {
Copy link

Copilot AI Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The log variable may be nil, which would cause a panic when calling log.With. Add a nil check before using the logger, similar to the pattern used elsewhere in this file (see lines 106-111).

Suggested change
if err != nil {
if err != nil && log != nil {

Copilot uses AI. Check for mistakes.
log.With("task", task).Print(ctx, err)
}
return err
}, queues...); err != nil {
if !errors.Is(err, context.Canceled) {
mu.Lock()
Expand Down Expand Up @@ -206,13 +210,12 @@ func (manager *Manager) runTaskWorker(ctx context.Context, task *schema.Task, tr
return fmt.Errorf("no worker registered for queue %q", task.Queue)
}

var result error

// Set deadline based on task dies_at
child, cancel := withDeadline(ctx, types.PtrTime(task.DiesAt))
defer cancel()

// Create the span
var result error
child2, endfunc := otel.StartSpan(tracer, child, spanManagerName("task."+task.Queue),
attribute.String("task", task.String()),
)
Expand All @@ -222,10 +225,17 @@ func (manager *Manager) runTaskWorker(ctx context.Context, task *schema.Task, tr
result = worker.Run(child2, task.Payload)

// Release the task back to the queue as success/failure (use child2 to nest the span)
if _, releaseErr := manager.ReleaseTask(child2, task.Id, result == nil, result, nil); releaseErr != nil {
var status string
if _, releaseErr := manager.ReleaseTask(child2, task.Id, result == nil, result, &status); releaseErr != nil {
result = errors.Join(result, releaseErr)
}

// If the status is not 'released', log a warning
Copy link

Copilot AI Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The comment says "log a warning" but the code creates an error instead. Either the comment should be updated to match the implementation, or the implementation should use a logging approach instead of creating an error.

Suggested change
// If the status is not 'released', log a warning
// If the status is not 'released', record this in the result error

Copilot uses AI. Check for mistakes.
if status != "released" {
Comment on lines +233 to +234
Copy link

Copilot AI Jan 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This logic incorrectly treats all non-"released" statuses as errors. According to the database schema, valid task statuses include 'expired', 'new', 'failed', 'retry', 'retained', 'released', and 'unknown'. Status 'retry' is a valid outcome when a task is being retried and should not be treated as an error. Consider checking for explicitly problematic statuses (e.g., 'failed', 'expired') rather than checking for the absence of 'released'.

Suggested change
// If the status is not 'released', log a warning
if status != "released" {
// Only treat explicitly problematic statuses as errors
if status == "failed" || status == "expired" || status == "unknown" {

Copilot uses AI. Check for mistakes.
result = errors.Join(result, fmt.Errorf("task status: %s", status))
}

// Return any errors
return result
}

Expand Down