Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Readme.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
[![GoDoc](https://godoc.org/github.com/oshankkumar/taskqueue-go?status.svg)](https://godoc.org/github.com/oshankkumar/taskqueue-go)
[![License: MIT](https://img.shields.io/badge/license-MIT-green.svg)](https://opensource.org/licenses/MIT)
![Build Status](https://github.com/oshankkumar/taskqueue-go/actions/workflows/go.yml/badge.svg?branch=main)
![Build Status](https://github.com/oshankkumar/taskqueue-go/actions/workflows/build_taskmanager.yml/badge.svg?branch=main)
[![Go Report Card](https://goreportcard.com/badge/github.com/oshankkumar/taskqueue-go)](https://goreportcard.com/report/github.com/oshankkumar/taskqueue-go)

**TaskQueue-Go** is a high-performance, distributed task queue library for Go, designed to simplify background job
Expand Down
8 changes: 4 additions & 4 deletions examples/basic-worker/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ func main() {

fmt.Printf("job processed queue_name=email_queue job_id=%s\n", job.ID)
return nil
}), taskqueue.WithConcurrency(8), taskqueue.WithMaxAttempts(1))
}), taskqueue.WithConcurrency(8), taskqueue.WithMaxAttempts(4))

worker.RegisterHandler("payment_queue", taskqueue.HandlerFunc(func(ctx context.Context, job *taskqueue.Job) error {
time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)
Expand All @@ -72,19 +72,19 @@ func main() {
paymentProcessed.Add(1)
fmt.Printf("job processed queue_name=payment_queue job_id=%s\n", job.ID)
return nil
}), taskqueue.WithConcurrency(8), taskqueue.WithMaxAttempts(1))
}), taskqueue.WithConcurrency(8), taskqueue.WithMaxAttempts(4))

worker.RegisterHandler("push_notification_queue", taskqueue.HandlerFunc(func(ctx context.Context, job *taskqueue.Job) error {
time.Sleep(time.Duration(rand.Intn(2000)) * time.Millisecond)

if rand.Intn(100) < 30 {
return errors.New("something bad happened")
return taskqueue.ErrSkipRetry{Err: errors.New("something bad happened"), SkipReason: "Don't want to send outdated notification"}
}

notifyProcessed.Add(1)
fmt.Printf("job processed queue_name=push_notification_queue job_id=%s\n", job.ID)
return nil
}), taskqueue.WithConcurrency(8), taskqueue.WithMaxAttempts(1))
}), taskqueue.WithConcurrency(8), taskqueue.WithMaxAttempts(4))

ctx, cancel := signal.NotifyContext(context.Background(), os.Interrupt, syscall.SIGTERM)
defer cancel()
Expand Down
6 changes: 6 additions & 0 deletions redis/luascripts/dequeue_inline.lua
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,12 @@ redis.call("ZADD", queueKey, "XX", newScore, jobID)

-- Fetch job details from the hash
local jobKey = jobKeyPrefix .. jobID

local fieldsAdded = redis.call("HSET", jobKey, "status", "Active")
if fieldsAdded ~= 0 then
return { err = "Failed in updating the job status to Active" }
end

local jobDetails = redis.call("HGETALL", jobKey)

-- Return the job ID and its details
Expand Down
6 changes: 3 additions & 3 deletions redis/redisq.go
Original file line number Diff line number Diff line change
Expand Up @@ -165,10 +165,10 @@ func (q *Queue) Ack(ctx context.Context, job *taskqueue.Job, opts *taskqueue.Ack
}

func (q *Queue) Nack(ctx context.Context, job *taskqueue.Job, opts *taskqueue.NackOptions) error {
if opts.MaxAttemptsExceeded {
return q.moveToDead(ctx, job, opts)
if opts.ShouldRetry {
return q.retry(ctx, job, opts)
}
return q.retry(ctx, job, opts)
return q.moveToDead(ctx, job, opts)
}

func (q *Queue) moveToDead(ctx context.Context, job *taskqueue.Job, opts *taskqueue.NackOptions) error {
Expand Down
18 changes: 9 additions & 9 deletions redis/redisq_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,6 @@ package redis
import (
"bytes"
"context"
"os"
"testing"
"time"

Expand All @@ -26,16 +25,13 @@ const testPayload = `{
"phone": "+1 (823) 515-3571"
}`

func TestRedisQueueEnqueue(t *testing.T) {
redisAddr := os.Getenv("REDIS_ADDR")
if redisAddr == "" {
t.Skip("skipping test since REDIS_ADDR is not set")
}

client := redis.NewClient(&redis.Options{Addr: redisAddr})
func TestRedisQueue(t *testing.T) {
client := redis.NewClient(&redis.Options{Addr: "localhost:6379"})

q := NewQueue(client, WithCompletedJobTTL(time.Minute*30))

client.Del(context.Background(), redisKeyPendingQueue(taskqueue.DefaultNameSpace, "test_redis_queue"))

job := taskqueue.NewJob()
job.Payload = []byte(testPayload)

Expand All @@ -60,7 +56,11 @@ func TestRedisQueueEnqueue(t *testing.T) {
}

if deqJob.ID != job.ID {
t.Fatal("expected ID to be equal to job ID")
t.Fatalf("expected ID to be equal to job ID, expected: %s got: %s", job.ID, deqJob.ID)
}

if deqJob.Status != taskqueue.JobStatusActive {
t.Error("expected status to be active after dequeue got:", deqJob.Status)
}

deqJob.Status = taskqueue.JobStatusCompleted
Expand Down
6 changes: 3 additions & 3 deletions redis/redisqwithjobstorage.go
Original file line number Diff line number Diff line change
Expand Up @@ -127,10 +127,10 @@ func (q *QueueWithExternalJobStorage) Nack(ctx context.Context, job *taskqueue.J
return err
}

if opts.MaxAttemptsExceeded {
return q.nackDead(ctx, job.ID, opts)
if opts.ShouldRetry {
return q.nack(ctx, job.ID, opts)
}
return q.nack(ctx, job.ID, opts)
return q.nackDead(ctx, job.ID, opts)
}

func (q *QueueWithExternalJobStorage) nackDead(ctx context.Context, jobID string, opts *taskqueue.NackOptions) error {
Expand Down
6 changes: 3 additions & 3 deletions taskqueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,9 +126,9 @@ type AckOptions struct {
}

type NackOptions struct {
QueueName string
RetryAfter time.Duration
MaxAttemptsExceeded bool
QueueName string
RetryAfter time.Duration
ShouldRetry bool
}

type Acker interface {
Expand Down
34 changes: 26 additions & 8 deletions worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,13 +3,15 @@ package taskqueue
import (
"context"
"errors"
"github.com/shirou/gopsutil/v4/cpu"
"github.com/shirou/gopsutil/v4/mem"
"fmt"
"log/slog"
"math"
"os"
"sync"
"time"

"github.com/shirou/gopsutil/v4/cpu"
"github.com/shirou/gopsutil/v4/mem"
)

type Handler interface {
Expand Down Expand Up @@ -255,6 +257,19 @@ func (w *Worker) dequeueJob(ctx context.Context, jobCh chan<- *Job, h *queueHand
}
}

type ErrSkipRetry struct {
Err error
SkipReason string
}

func (e ErrSkipRetry) Error() string {
return fmt.Sprintf("skip retry: %s, reason: %s", e.Err, e.SkipReason)
}

func (e ErrSkipRetry) Unwrap() error {
return e.Err
}

func (w *Worker) processJob(ctx context.Context, job *Job, h *queueHandler) error {
ctx, cancel := context.WithTimeout(context.WithoutCancel(ctx), h.jobOptions.Timeout)
defer cancel()
Expand All @@ -271,26 +286,27 @@ func (w *Worker) processJob(ctx context.Context, job *Job, h *queueHandler) erro
job.UpdatedAt = time.Now()
job.Attempts++

var skipErr ErrSkipRetry
switch {
case jobErr == nil:
job.Status = JobStatusCompleted
case job.Attempts >= h.jobOptions.MaxAttempts:
case job.Attempts >= h.jobOptions.MaxAttempts, errors.As(jobErr, &skipErr):
job.Status = JobStatusDead
default:
job.Status = JobStatusFailed
}

if jobErr == nil {
if job.Status == JobStatusCompleted {
_ = w.metricsBackend.IncrementCounter(ctx, Metric{Name: MetricJobProcessedCount}, 1, time.Now())
return w.queue.Ack(ctx, job, &AckOptions{QueueName: h.queueName})
}

_ = w.metricsBackend.IncrementCounter(ctx, Metric{Name: MetricJobFailedCount}, 1, time.Now())

nackOpts := &NackOptions{
QueueName: h.queueName,
RetryAfter: h.jobOptions.BackoffFunc(job.Attempts),
MaxAttemptsExceeded: job.Attempts >= h.jobOptions.MaxAttempts,
QueueName: h.queueName,
RetryAfter: h.jobOptions.BackoffFunc(job.Attempts),
ShouldRetry: job.Status == JobStatusFailed,
}

return w.queue.Nack(ctx, job, nackOpts)
Expand Down Expand Up @@ -348,7 +364,7 @@ func (w *Worker) startHeartBeat(ctx context.Context) {

sendHearBeat()

heartBeatTicker := time.NewTicker(time.Second * 10)
heartBeatTicker := time.NewTicker(heartBeatInterval)
defer heartBeatTicker.Stop()

for {
Expand Down Expand Up @@ -466,6 +482,8 @@ func (w *Worker) monitorQueues(ctx context.Context) {
}
}

const heartBeatInterval = time.Second * 30

type HeartbeatQueueData struct {
Name string
Concurrency int
Expand Down