57 changes: 42 additions & 15 deletions lib/resque_stuck_queue.rb
@@ -61,6 +61,14 @@ def abort_on_exception
         end
       end
 
+      def disconnect_recovery
+        config[:disconnect_recovery]
+      end
+
+      def recovery_interval
+        config[:recovery_interval] || 2.seconds
+      end
+
       def start_in_background
         Thread.new do
           Thread.current.abort_on_exception = abort_on_exception
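Note on the two accessors just added: :recovery_interval falls back to 2 seconds when unset (2.seconds presumes ActiveSupport-style numeric extensions), and :disconnect_recovery defaults to nil, i.e. falsy. A hedged sketch of that fallback behavior, assuming the accessors read as module methods like the surrounding ones:

    Resque::StuckQueue.config[:recovery_interval]  # => nil until explicitly set
    Resque::StuckQueue.recovery_interval           # => 2 seconds, via the || default
    Resque::StuckQueue.disconnect_recovery         # => nil, i.e. falsy, so errors re-raise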
@@ -171,22 +179,25 @@ def setup_watcher_thread
           Thread.current.abort_on_exception = abort_on_exception
           log_starting_thread(:watcher)
           while @running
-            mutex = RedisMutex.new(:resque_stuck_queue, :block => 0)
-            if mutex.lock
-              begin
-                queues.each do |queue_name|
-                  log_watcher_info(queue_name)
-                  if should_trigger?(queue_name)
-                    trigger_handler(queue_name, :triggered)
-                  elsif should_recover?(queue_name)
-                    trigger_handler(queue_name, :recovered)
-                  end
-                end
-              ensure
-                mutex.unlock
-              end
-            end
-            wait_for_it(:watcher_interval)
+            error_lambda = lambda { |e| "Watcher thread couldn't access redis: #{e.inspect}" }
+            handle_redis_disconnect(error_lambda) do
+              mutex = RedisMutex.new(:resque_stuck_queue, :block => 0)
+              if mutex.lock
+                begin
+                  queues.each do |queue_name|
+                    log_watcher_info(queue_name)
+                    if should_trigger?(queue_name)
+                      trigger_handler(queue_name, :triggered)
+                    elsif should_recover?(queue_name)
+                      trigger_handler(queue_name, :recovered)
+                    end
+                  end
+                ensure
+                  mutex.unlock
+                end
+              end
+              wait_for_it(:watcher_interval)
+            end
           end
         end
       end
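What the wrapper buys the watcher thread: a Redis error during one pass no longer kills the thread; with :disconnect_recovery set, the helper sleeps and returns, and `while @running` loops into the next attempt. A standalone stand-in sketch (none of these names are the gem's API):

    require 'redis'

    # Stand-in for the watcher loop's new error handling; redis.ping plays
    # the role of the mutex/trigger work that can hit a dead connection.
    def watch_loop(redis, disconnect_recovery: true, recovery_interval: 2)
      loop do
        begin
          redis.ping
        rescue Redis::BaseError, SocketError
          raise unless disconnect_recovery
          sleep recovery_interval   # wait out the outage, then retry
        end
        sleep 1                     # stand-in for wait_for_it(:watcher_interval)
      end
    end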
@@ -227,9 +238,12 @@ def enqueue_jobs
           config[:heartbeat_job].call
         else
           queues.each do |queue_name|
-            # Redis::Namespace.new support as well as Redis.new
-            namespace = redis.respond_to?(:namespace) ? redis.namespace : nil
-            Resque.enqueue_to(queue_name, HeartbeatJob, heartbeat_key_for(queue_name), redis.client.host, redis.client.port, namespace, Time.now.to_i )
+            error_lambda = lambda { |e| "Enqueuing heartbeat job for #{queue_name} crashed: #{e.inspect}" }
+            handle_redis_disconnect(error_lambda) do
+              # Redis::Namespace.new support as well as Redis.new
+              namespace = redis.respond_to?(:namespace) ? redis.namespace : nil
+              Resque.enqueue_to(queue_name, HeartbeatJob, heartbeat_key_for(queue_name), redis.client.host, redis.client.port, namespace, Time.now.to_i )
+            end
           end
         end
       end
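For context on the namespace check inside the new block: Redis::Namespace exposes #namespace while a bare Redis client does not, which is what the respond_to? probe distinguishes. A small illustration using only the two gems the diff already involves:

    require 'redis'
    require 'redis-namespace'

    plain      = Redis.new
    namespaced = Redis::Namespace.new(:myapp, :redis => plain)

    plain.respond_to?(:namespace)        # => false, so nil is passed to the job
    namespaced.respond_to?(:namespace)   # => true,  so :myapp is passed along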
@@ -314,6 +328,19 @@ def pretty_process_name
         $0 = "rake --trace resque:stuck_queue #{redis.inspect} QUEUES=#{queues.join(",")}"
       end
 
+      def handle_redis_disconnect(error_message)
+        yield
+      rescue Redis::BaseError, SocketError => e
+        message = error_message.respond_to?(:call) ? error_message.call(e) : error_message
+        logger.error(message)
+        logger.error("\n#{e.backtrace.join("\n")}")
+        if disconnect_recovery
+          logger.error("Sleeping for #{recovery_interval} before trying again")
+          sleep recovery_interval
+        else
+          raise e
+        end
+      end
     end
   end
 end
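A hedged usage sketch of the new helper, called from inside the module's own threads as the diff does; error_message may be a plain String or a lambda receiving the exception, since the helper checks respond_to?(:call):

    handle_redis_disconnect(lambda { |e| "redis hiccup: #{e.inspect}" }) do
      redis.ping   # any Redis::BaseError or SocketError raised here is caught
    end
    # With config[:disconnect_recovery] left falsy (the default), the error is
    # logged and re-raised; with it set, the thread sleeps recovery_interval
    # and control returns to the surrounding loop for another attempt.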
30 changes: 16 additions & 14 deletions lib/resque_stuck_queue/config.rb
@@ -19,20 +19,22 @@ module StuckQueue
     class Config < Hash
 
       OPTIONS_DESCRIPTIONS = {
-        :triggered_handler => "set to what gets triggered when resque-stuck-queue will detect the latest heartbeat is older than the trigger_timeout time setting.\n\tExample:\n\tResque::StuckQueue.config[:triggered_handler] = proc { |queue_name, lagtime| send_email('queue \#{queue_name} isnt working, aaah the daemons') }",
-        :recovered_handler => "set to what gets triggered when resque-stuck-queue has triggered a problem, but then detects the queue went back down to functioning well again(it wont trigger again until it has recovered).\n\tExample:\n\tResque::StuckQueue.config[:recovered_handler] = proc { |queue_name, lagtime| send_email('phew, queue \#{queue_name} is ok') }",
-        :heartbeat_interval => "set to how often to push the 'heartbeat' job which will refresh the latest working time.\n\tExample:\n\tResque::StuckQueue.config[:heartbeat_interval] = 5.minutes",
-        :watcher_interval => "set to how often to check to see when the last time it worked was.\n\tExample:\n\tResque::StuckQueue.config[:watcher_interval] = 1.minute",
-        :trigger_timeout => "set to how much of a resque work lag you are willing to accept before being notified. note: take the :watcher_interval setting into account when setting this timeout.\n\tExample:\n\tResque::StuckQueue.config[:trigger_timeout] = 9.minutes",
-        :warn_interval => "optional: if set, it will continiously trigger/warn in spaces of this interval after first trigger. eg, as long as lagtime keeps on being above trigger_timeout/recover hasn't occured yet.",
-        :redis => "set the Redis StuckQueue will use. Either a Redis or Redis::Namespace instance.",
-        :heartbeat_key => "optional, name of keys to keep track of the last good resque heartbeat time",
-        :triggered_key => "optional, name of keys to keep track of the last trigger time",
-        :logger => "optional, pass a Logger. Default a ruby logger will be instantiated. Needs to respond to that interface.",
-        :queues => "optional, monitor specific queues you want to send a heartbeat/monitor to. default is [:app]",
-        :abort_on_exception => "optional, if you want the resque-stuck-queue threads to explicitly raise, default is true",
-        :heartbeat_job => "optional, your own custom refreshing job. if you are using something other than resque",
-        :enable_signals => "optional, allow resque::stuck's signal_handlers which do mostly nothing at this point. possible future plan: log info, reopen log file, etc.",
+        :triggered_handler => "set to what gets called when resque-stuck-queue detects the latest heartbeat is older than the trigger_timeout setting.\n\tExample:\n\tResque::StuckQueue.config[:triggered_handler] = proc { |queue_name, lagtime| send_email('queue \#{queue_name} isnt working, aaah the daemons') }",
+        :recovered_handler => "set to what gets called when resque-stuck-queue has triggered a problem, but then detects the queue is functioning well again (it won't trigger again until it has recovered).\n\tExample:\n\tResque::StuckQueue.config[:recovered_handler] = proc { |queue_name, lagtime| send_email('phew, queue \#{queue_name} is ok') }",
+        :heartbeat_interval => "set to how often to push the 'heartbeat' job which will refresh the latest working time.\n\tExample:\n\tResque::StuckQueue.config[:heartbeat_interval] = 5.minutes",
+        :watcher_interval => "set to how often to check when the queue last worked.\n\tExample:\n\tResque::StuckQueue.config[:watcher_interval] = 1.minute",
+        :trigger_timeout => "set to how much resque work lag you are willing to accept before being notified. note: take the :watcher_interval setting into account when setting this timeout.\n\tExample:\n\tResque::StuckQueue.config[:trigger_timeout] = 9.minutes",
+        :warn_interval => "optional: if set, it will continuously trigger/warn at this interval after the first trigger, as long as lagtime stays above trigger_timeout and recovery hasn't occurred yet.",
+        :redis => "set the Redis StuckQueue will use. Either a Redis or Redis::Namespace instance.",
+        :heartbeat_key => "optional, name of keys to keep track of the last good resque heartbeat time",
+        :triggered_key => "optional, name of keys to keep track of the last trigger time",
+        :logger => "optional, pass a Logger. By default a ruby Logger will be instantiated. Needs to respond to that interface.",
+        :queues => "optional, monitor specific queues you want to send a heartbeat/monitor to. default is [:app]",
+        :abort_on_exception => "optional, if you want the resque-stuck-queue threads to explicitly raise, default is true",
+        :heartbeat_job => "optional, your own custom refreshing job, if you are using something other than resque",
+        :enable_signals => "optional, allow resque::stuck's signal_handlers which do mostly nothing at this point. possible future plan: log info, reopen log file, etc.",
+        :disconnect_recovery => "optional, keeps resque-stuck-queue running even if there is an issue connecting to redis. It will keep looping and retrying until the connection is re-established. Default is false",
+        :recovery_interval => "optional, set to how often to retry connecting to redis after a network failure. Defaults to 2 seconds.",
       }
 
       OPTIONS = OPTIONS_DESCRIPTIONS.keys
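Putting the two new options together with the existing ones, a sketch of an initializer; the values are illustrative and the require path is assumed from the gem name:

    require 'resque_stuck_queue'

    Resque::StuckQueue.config[:queues]              = [:app, :mailers]
    Resque::StuckQueue.config[:trigger_timeout]     = 9 * 60   # seconds
    Resque::StuckQueue.config[:triggered_handler]   = proc { |q, lag| warn "#{q} stuck for #{lag}s" }
    Resque::StuckQueue.config[:disconnect_recovery] = true     # survive redis outages
    Resque::StuckQueue.config[:recovery_interval]   = 10       # retry every 10s instead of the 2s default
    Resque::StuckQueue.start_in_background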