Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions lib/beetle/deduplication_store.rb
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,7 @@ module Beetle
# * how often a message has already been seen by some consumer
# * whether a message has been processed successfully
# * how many attempts have been made to execute a message handler for a given message
# * how long we should wait before trying to execute the message handler after a failure
# * how many exceptions have been raised during previous execution attempts
# * how long we should wait before trying to perform the next execution attempt
# * whether some other process is already trying to execute the message handler
#
# It also provides a method to garbage collect keys for expired messages.
Expand Down Expand Up @@ -39,7 +37,7 @@ def redis

# list of key suffixes to use for storing values in Redis. 'status'
# always needs to be the first element of the array.
KEY_SUFFIXES = [:status, :ack_count, :timeout, :delay, :attempts, :exceptions, :mutex, :expires]
KEY_SUFFIXES = [:status, :ack_count, :timeout, :attempts, :exceptions, :mutex, :expires]

# build a Redis key out of a message id and a given suffix
def key(msg_id, suffix)
Expand Down
36 changes: 3 additions & 33 deletions lib/beetle/message.rb
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,6 @@ class Message
TIMEOUT_GRACE_PERIOD = 10.seconds
# how many times we should try to run a handler before giving up
DEFAULT_HANDLER_EXECUTION_ATTEMPTS = 1
# how many seconds we should wait before retrying handler execution
DEFAULT_HANDLER_EXECUTION_ATTEMPTS_DELAY = 10.seconds
# how many exceptions should be tolerated before giving up
DEFAULT_EXCEPTION_LIMIT = 0

Expand All @@ -46,10 +44,6 @@ class Message
attr_reader :expires_at
# how many seconds the handler is allowed to execute
attr_reader :timeout
# how long to wait before retrying the message handler
attr_reader :delay
# maximum wait time for message handler retries (uses exponential backoff)
attr_reader :max_delay
# how many times we should try to run the handler
attr_reader :attempts_limit
# how many exceptions we should tolerate before giving up
Expand All @@ -75,14 +69,11 @@ def initialize(queue, header, body, logger, opts = {})
def setup(opts) #:nodoc:
@server = opts[:server]
@timeout = opts[:timeout] || DEFAULT_HANDLER_TIMEOUT.to_i
@delay = (opts[:delay] || DEFAULT_HANDLER_EXECUTION_ATTEMPTS_DELAY).ceil
@attempts_limit = opts[:attempts] || DEFAULT_HANDLER_EXECUTION_ATTEMPTS
@exceptions_limit = opts[:exceptions] || DEFAULT_EXCEPTION_LIMIT
@attempts_limit = @exceptions_limit + 1 if @attempts_limit <= @exceptions_limit
@retry_on = opts[:retry_on] || nil
@store = opts[:store]
max_delay = opts[:max_delay] || @delay
@max_delay = max_delay.ceil if max_delay >= 2*@delay
end

# extracts various values from the AMQP header properties
Expand Down Expand Up @@ -190,16 +181,6 @@ def completed!
@store.mset(msg_id, :status => "completed", :timeout => 0)
end

# whether we should wait before running the handler
def delayed?(t = nil)
(t ||= @store.get(msg_id, :delay)) && t.to_i > now
end

# store delay value in the deduplication store
def set_delay!
@store.set(msg_id, :delay, now + next_delay(attempts))
end

# how many times we already tried running the handler
def attempts
@store.get(msg_id, :attempts).to_i
Expand Down Expand Up @@ -269,8 +250,8 @@ def delivery_count
cnt.to_i
end

def fetch_status_delay_timeout_attempts_exceptions
@store.mget(msg_id, [:status, :delay, :timeout, :attempts, :exceptions])
def fetch_status_timeout_attempts_exceptions
@store.mget(msg_id, [:status, :timeout, :attempts, :exceptions])
end

# process this message and do not allow any exception to escape to the caller
Expand Down Expand Up @@ -320,13 +301,10 @@ def process_internal(handler)
elsif !key_exists?
run_handler!(handler)
else
status, delay, timeout, attempts, exceptions = fetch_status_delay_timeout_attempts_exceptions
status, timeout, attempts, exceptions = fetch_status_timeout_attempts_exceptions
if status == "completed"
ack!
RC::OK
elsif delay && delayed?(delay)
logger.warn "Beetle: ignored delayed message (#{msg_id})!"
RC::Delayed
elsif !(timeout && timed_out?(timeout))
RC::HandlerNotYetTimedOut
elsif attempts && attempts_limit_reached?(attempts)
Expand Down Expand Up @@ -394,7 +372,6 @@ def handler_failed!(result)
else
delete_mutex!
timed_out!
set_delay!
result
end
end
Expand All @@ -415,12 +392,5 @@ def ack!
end
end

def next_delay(n)
if max_delay
[delay * (2**n), max_delay].min
else
delay
end
end
end
end
1 change: 0 additions & 1 deletion lib/beetle/r_c.rb
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,6 @@ def self.rc(name, *args)
rc :AttemptsLimitReached, :failure
rc :ExceptionsLimitReached, :failure
rc :ExceptionNotAccepted, :failure
rc :Delayed, :reject
rc :HandlerCrash, :reject
rc :HandlerNotYetTimedOut, :reject
rc :MutexLocked, :reject
Expand Down
55 changes: 0 additions & 55 deletions test/beetle/message/settings_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -19,61 +19,6 @@ def setup
assert_equal "completed", @store.get(message.msg_id, :status)
end

test "set_delay! should store the current time plus the delay offset in the database" do
message = Message.new("somequeue", header_with_params, 'foo', logger, :delay => 2, :store => @store)
message.expects(:now).returns(9)
message.set_delay!
assert_equal "11", @store.get(message.msg_id, :delay)
message.expects(:now).returns(12)
assert !message.delayed?
message.expects(:now).returns(10)
assert message.delayed?
end

test "set_delay! should store the current time plus the exponential delay offset in the database" do
message = Message.new("somequeue", header_with_params, 'foo', logger, :delay => 3, :max_delay => 30, :store => @store)
message.stubs(:now).returns(1)

[4, 7, 13, 25].each do |exp_delay|
message.set_delay!
assert_equal exp_delay, @store.get(message.msg_id, :delay).to_i
message.increment_execution_attempts!
end
end

test "set_delay! should store the current time plus the exponential delay offset in the database up to given max value" do
message = Message.new("somequeue", header_with_params, 'foo', logger, :delay => 3, :max_delay => 10, :store => @store)
message.stubs(:now).returns(1)

[4, 7, 11, 11].each do |exp_delay|
message.set_delay!
assert_equal exp_delay, @store.get(message.msg_id, :delay).to_i
message.increment_execution_attempts!
end
end

test "set_delay! should store the current time plus the linear delay offset in the database" do
delay = 32
message = Message.new("somequeue", header_with_params, 'foo', logger, :delay => delay, :store => @store)
[3, 5, 6].each do |now_offset|
message.stubs(:now).returns(now_offset)
message.set_delay!
assert_equal @store.get(message.msg_id, :delay).to_i, now_offset + delay
message.increment_execution_attempts!
end
end

test "set_delay! should use the default delay if the delay hasn't been set on the message instance" do
message = Message.new("somequeue", header_with_params, 'foo', logger, :store => @store)
message.expects(:now).returns(0)
message.set_delay!
assert_equal "#{Message::DEFAULT_HANDLER_EXECUTION_ATTEMPTS_DELAY}", @store.get(message.msg_id, :delay)
message.expects(:now).returns(message.delay)
assert !message.delayed?
message.expects(:now).returns(0)
assert message.delayed?
end

test "set_timeout! should store the current time plus the number of timeout seconds in the database" do
message = Message.new("somequeue", header_with_params, 'foo', logger, :timeout => 1, :store => @store)
message.expects(:now).returns(1)
Expand Down
44 changes: 4 additions & 40 deletions test/beetle/message_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -236,7 +236,7 @@ def setup
assert_equal 0, @store.redis.exists(*keys)
end

test "successful processing of a redundant message once should insert all but the delay key and the exception count key into the database" do
test "successful processing of a redundant message once should insert all but the exception count key into the database" do
header = header_with_params({:redundant => true})
header.expects(:ack)
message = Message.new("somequeue", header, 'foo', logger, :store => @store)
Expand All @@ -251,7 +251,6 @@ def setup
assert @store.exists(message.msg_id, :attempts)
assert @store.exists(message.msg_id, :timeout)
assert @store.exists(message.msg_id, :ack_count)
assert !@store.exists(message.msg_id, :delay)
assert !@store.exists(message.msg_id, :exceptions)
end
end
Expand All @@ -278,19 +277,6 @@ def setup
assert_equal :no, processed
end

test "a delayed message should not be acked and the handler should not be called" do
header = header_with_params()
header.expects(:ack).never
message = Message.new("somequeue", header, 'foo', logger, :attempts => 2, :store => @store)
message.set_delay!
assert !message.key_exists?
assert message.delayed?

processed = :no
message.process(Handler.create(lambda {|*args| processed = true}, logger))
assert_equal :no, processed
end

test "acking a non redundant message should remove the ack_count key" do
header = header_with_params({})
header.expects(:ack)
Expand Down Expand Up @@ -447,7 +433,7 @@ def setup

test "a message should not be acked if the handler crashes and the exception limit has not been reached" do
header = header_with_params({})
message = Message.new("somequeue", header, 'foo', logger, :delay => 42, :timeout => 10.seconds, :exceptions => 1, :store => @store)
message = Message.new("somequeue", header, 'foo', logger, :timeout => 10.seconds, :exceptions => 1, :store => @store)
assert !message.attempts_limit_reached?
assert !message.exceptions_limit_reached?
assert !message.timed_out?
Expand All @@ -460,13 +446,13 @@ def setup
assert !message.completed?
assert_equal "1", @store.get(message.msg_id, :exceptions)
assert_equal "0", @store.get(message.msg_id, :timeout)
assert_equal "52", @store.get(message.msg_id, :delay)
assert !@store.exists(message.msg_id, :mutex)
end

test "a message should delete the mutex before resetting the timer if attempts and exception limits haven't been reached" do
Message.stubs(:now).returns(9)
header = header_with_params({})
message = Message.new("somequeue", header, 'foo', logger, :delay => 42, :timeout => 10.seconds, :exceptions => 1, :store => @store)
message = Message.new("somequeue", header, 'foo', logger, :timeout => 10.seconds, :exceptions => 1, :store => @store)
assert !message.attempts_limit_reached?
assert !message.exceptions_limit_reached?
assert !@store.get(message.msg_id, :mutex)
Expand Down Expand Up @@ -592,36 +578,18 @@ def setup
assert_equal RC::OK, message.__send__(:process_internal, proc)
end

test "an incomplete, delayed existing message should be processed later" do
header = header_with_params({})
message = Message.new("somequeue", header, 'foo', logger, :delay => 10.seconds, :attempts => 2, :store => @store)
assert !message.key_exists?
assert !message.completed?
message.set_delay!
assert message.delayed?

proc = mock("proc")
header.expects(:ack).never
proc.expects(:call).never
assert_equal RC::Delayed, message.__send__(:process_internal, proc)
assert message.delayed?
assert !message.completed?
end

test "an incomplete, undelayed, not yet timed out, existing message should be processed later" do
header = header_with_params({})
message = Message.new("somequeue", header, 'foo', logger, :timeout => 10.seconds, :attempts => 2, :store => @store)
assert !message.key_exists?
assert !message.completed?
assert !message.delayed?
message.set_timeout!
assert !message.timed_out?

proc = mock("proc")
header.expects(:ack).never
proc.expects(:call).never
assert_equal RC::HandlerNotYetTimedOut, message.__send__(:process_internal, proc)
assert !message.delayed?
assert !message.completed?
assert !message.timed_out?
end
Expand All @@ -632,7 +600,6 @@ def setup
message.increment_execution_attempts!
assert !message.key_exists?
assert !message.completed?
assert !message.delayed?
message.timed_out!
assert message.timed_out?

Expand All @@ -652,7 +619,6 @@ def setup
message.increment_execution_attempts!
assert !message.key_exists?
assert !message.completed?
assert !message.delayed?
message.timed_out!
assert message.timed_out?
assert !message.attempts_limit_reached?
Expand All @@ -670,7 +636,6 @@ def setup
message = Message.new("somequeue", header, 'foo', logger, :store => @store)
assert !message.key_exists?
assert !message.completed?
assert !message.delayed?
message.timed_out!
assert message.timed_out?
assert !message.attempts_limit_reached?
Expand All @@ -690,7 +655,6 @@ def setup
message = Message.new("somequeue", header, 'foo', logger, :store => @store)
assert !message.key_exists?
assert !message.completed?
assert !message.delayed?
message.timed_out!
assert message.timed_out?
assert !message.attempts_limit_reached?
Expand Down