11package redisqueue
22
33import (
4+ "context"
5+ "log/slog"
46 "net"
57 "os"
68 "sync"
79 "time"
810
9- "github.com/go-redis/redis/v7"
1011 "github.com/pkg/errors"
12+ "github.com/redis/go-redis/v9"
1113)
1214
1315// ConsumerFunc is a type alias for the functions that will be used to handle
@@ -52,13 +54,19 @@ type ConsumerOptions struct {
5254 BufferSize int
5355 // Concurrency dictates how many goroutines to spawn to handle the messages.
5456 Concurrency int
57+
58+ // MaxDeliveryCount is the maximum number of times a message can be delivered
59+ // before it is considered failed. If this is set to 0, the message will be
60+ // retried indefinitely
61+ MaxDeliveryCount int64
62+
5563 // RedisClient supersedes the RedisOptions field, and allows you to inject
5664 // an already-made Redis Client for use in the consumer. This may be either
5765 // the standard client or a cluster client.
5866 RedisClient redis.UniversalClient
5967 // RedisOptions allows you to configure the underlying Redis connection.
6068 // More info here:
61- // https://pkg.go.dev/github.com/go-redis/redis/v7?tab=doc #Options.
69+ // https://pkg.go.dev/github.com/redis/ go-redis/v9 #Options.
6270 //
6371 // This field is used if RedisClient field is nil.
6472 RedisOptions * RedisOptions
@@ -99,14 +107,14 @@ var defaultConsumerOptions = &ConsumerOptions{
99107// BufferSize to 100, and Concurrency to 10. In most production environments,
100108// you'll want to use NewConsumerWithOptions.
101109func NewConsumer () (* Consumer , error ) {
102- return NewConsumerWithOptions (defaultConsumerOptions )
110+ return NewConsumerWithOptions (context . Background (), defaultConsumerOptions )
103111}
104112
105113// NewConsumerWithOptions creates a Consumer with custom ConsumerOptions. If
106114// Name is left empty, it defaults to the hostname; if GroupName is left empty,
107115// it defaults to "redisqueue"; if BlockingTimeout is 0, it defaults to 5
108116// seconds; if ReclaimInterval is 0, it defaults to 1 second.
109- func NewConsumerWithOptions (options * ConsumerOptions ) (* Consumer , error ) {
117+ func NewConsumerWithOptions (ctx context. Context , options * ConsumerOptions ) (* Consumer , error ) {
110118 hostname , _ := os .Hostname ()
111119
112120 if options .Name == "" {
@@ -130,7 +138,7 @@ func NewConsumerWithOptions(options *ConsumerOptions) (*Consumer, error) {
130138 r = newRedisClient (options .RedisOptions )
131139 }
132140
133- if err := redisPreflightChecks (r ); err != nil {
141+ if err := redisPreflightChecks (ctx , r ); err != nil {
134142 return nil , err
135143 }
136144
@@ -182,15 +190,15 @@ func (c *Consumer) Register(stream string, fn ConsumerFunc) {
182190// Run will terminate early. The same will happen if an error occurs when
183191// creating the consumer group in Redis. Run will block until Shutdown is called
184192// and all of the in-flight messages have been processed.
185- func (c * Consumer ) Run () {
193+ func (c * Consumer ) Run (ctx context. Context ) {
186194 if len (c .consumers ) == 0 {
187195 c .Errors <- errors .New ("at least one consumer function needs to be registered" )
188196 return
189197 }
190198
191199 for stream , consumer := range c .consumers {
192200 c .streams = append (c .streams , stream )
193- err := c .redis .XGroupCreateMkStream (stream , c .options .GroupName , consumer .id ).Err ()
201+ err := c .redis .XGroupCreateMkStream (ctx , stream , c .options .GroupName , consumer .id ).Err ()
194202 // ignoring the BUSYGROUP error makes this a noop
195203 if err != nil && err .Error () != "BUSYGROUP Consumer Group name already exists" {
196204 c .Errors <- errors .Wrap (err , "error creating consumer group" )
@@ -202,8 +210,8 @@ func (c *Consumer) Run() {
202210 c .streams = append (c .streams , ">" )
203211 }
204212
205- go c .reclaim ()
206- go c .poll ()
213+ go c .reclaim (ctx )
214+ go c .poll (ctx )
207215
208216 stop := newSignalHandler ()
209217 go func () {
@@ -214,7 +222,7 @@ func (c *Consumer) Run() {
214222 c .wg .Add (c .options .Concurrency )
215223
216224 for i := 0 ; i < c .options .Concurrency ; i ++ {
217- go c .work ()
225+ go c .work (ctx )
218226 }
219227
220228 c .wg .Wait ()
@@ -237,7 +245,7 @@ func (c *Consumer) Shutdown() {
237245// If VisibilityTimeout is 0, this function returns early and no messages are
238246// reclaimed. It checks the list of pending messages according to
239247// ReclaimInterval.
240- func (c * Consumer ) reclaim () {
248+ func (c * Consumer ) reclaim (ctx context. Context ) {
241249 if c .options .VisibilityTimeout == 0 {
242250 return
243251 }
@@ -256,7 +264,7 @@ func (c *Consumer) reclaim() {
256264 end := "+"
257265
258266 for {
259- res , err := c .redis .XPendingExt (& redis.XPendingExtArgs {
267+ res , err := c .redis .XPendingExt (ctx , & redis.XPendingExtArgs {
260268 Stream : stream ,
261269 Group : c .options .GroupName ,
262270 Start : start ,
@@ -275,8 +283,17 @@ func (c *Consumer) reclaim() {
275283 msgs := make ([]string , 0 )
276284
277285 for _ , r := range res {
286+ slog .Info ("pending message" , "id" , r .ID , "count" , r .RetryCount , "max" , c .options .MaxDeliveryCount )
287+ if c .options .MaxDeliveryCount > 0 && r .RetryCount >= c .options .MaxDeliveryCount {
288+ slog .Info ("message exceeded delivery count limit" , "id" , r .ID , "count" , r .RetryCount , "max" , c .options .MaxDeliveryCount )
289+ err = c .redis .XAck (ctx , stream , c .options .GroupName , r .ID ).Err ()
290+ if err != nil {
291+ c .Errors <- errors .Wrapf (err , "error acknowledging after retry count exceeded for %q stream and %q message, " , stream , r .ID )
292+ continue
293+ }
294+ }
278295 if r .Idle >= c .options .VisibilityTimeout {
279- claimres , err := c .redis .XClaim (& redis.XClaimArgs {
296+ claimres , err := c .redis .XClaim (ctx , & redis.XClaimArgs {
280297 Stream : stream ,
281298 Group : c .options .GroupName ,
282299 Consumer : c .options .Name ,
@@ -297,7 +314,7 @@ func (c *Consumer) reclaim() {
297314 // exists, the only way we can get it out of the
298315 // pending state is to acknowledge it.
299316 if err == redis .Nil {
300- err = c .redis .XAck (stream , c .options .GroupName , r .ID ).Err ()
317+ err = c .redis .XAck (ctx , stream , c .options .GroupName , r .ID ).Err ()
301318 if err != nil {
302319 c .Errors <- errors .Wrapf (err , "error acknowledging after failed claim for %q stream and %q message" , stream , r .ID )
303320 continue
@@ -324,18 +341,18 @@ func (c *Consumer) reclaim() {
324341// messages for this consumer to process. It blocks for up to 5 seconds instead
325342// of blocking indefinitely so that it can periodically check to see if Shutdown
326343// was called.
327- func (c * Consumer ) poll () {
344+ func (c * Consumer ) poll (ctx context. Context ) {
328345 for {
329346 select {
330347 case <- c .stopPoll :
331348 // once the polling has stopped (i.e. there will be no more messages
332349 // put onto c.queue), stop all of the workers
333- for i := 0 ; i < c .options .Concurrency ; i ++ {
350+ for range c .options .Concurrency {
334351 c .stopWorkers <- struct {}{}
335352 }
336353 return
337354 default :
338- res , err := c .redis .XReadGroup (& redis.XReadGroupArgs {
355+ res , err := c .redis .XReadGroup (ctx , & redis.XReadGroupArgs {
339356 Group : c .options .GroupName ,
340357 Consumer : c .options .Name ,
341358 Streams : c .streams ,
@@ -378,7 +395,7 @@ func (c *Consumer) enqueue(stream string, msgs []redis.XMessage) {
378395// channel, it calls the corrensponding ConsumerFunc depending on the stream it
379396// came from. If no error is returned from the ConsumerFunc, the message is
380397// acknowledged in Redis.
381- func (c * Consumer ) work () {
398+ func (c * Consumer ) work (ctx context. Context ) {
382399 defer c .wg .Done ()
383400
384401 for {
@@ -389,7 +406,7 @@ func (c *Consumer) work() {
389406 c .Errors <- errors .Wrapf (err , "error calling ConsumerFunc for %q stream and %q message" , msg .Stream , msg .ID )
390407 continue
391408 }
392- err = c .redis .XAck (msg .Stream , c .options .GroupName , msg .ID ).Err ()
409+ err = c .redis .XAck (ctx , msg .Stream , c .options .GroupName , msg .ID ).Err ()
393410 if err != nil {
394411 c .Errors <- errors .Wrapf (err , "error acknowledging after success for %q stream and %q message" , msg .Stream , msg .ID )
395412 continue
0 commit comments