Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,8 @@ The options are:
- `silence_polling` - whether to silence Active Record logs emitted when polling (Defaults to true)
- `use_skip_locked` - whether to use `FOR UPDATE SKIP LOCKED` when performing trimming. This will be automatically detected in the future, and for now, you'd only need to set this to `false` if your database doesn't support it. For MySQL, that'd be versions < 8, and for PostgreSQL, versions < 9.5. If you use SQLite, this has no effect, as writes are sequential. (Defaults to true)
- `trim_batch_size` - the batch size to use when deleting old records (default: `100`)
- `reconnect_attempts` - Supports a number of connection attempts or an array of
durations to wait between attempts. (Defaults to 1 retry attempt)


## Trimming
Expand Down
26 changes: 25 additions & 1 deletion lib/action_cable/subscription_adapter/solid_cable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ def listener
end

class Listener < ::ActionCable::SubscriptionAdapter::SubscriberMap
CONNECTION_ERRORS = [ ActiveRecord::ConnectionFailed ]
Stop = Class.new(Exception)

def initialize(event_loop)
Expand All @@ -51,11 +52,17 @@ def initialize(event_loop)
# for specific sections of code, rather than acquired.
@critical = Concurrent::Semaphore.new(0)

@reconnect_attempt = 0

@thread = Thread.new do
Thread.current.name = "solid_cable_listener"
Thread.current.report_on_exception = true

listen
begin
listen
rescue *CONNECTION_ERRORS
retry if retry_connecting?
end
end
end

Expand Down Expand Up @@ -107,6 +114,7 @@ def invoke_callback(*)
private
attr_reader :event_loop, :thread
attr_writer :last_id
attr_accessor :reconnect_attempt

def last_id
@last_id ||= last_message_id
Expand Down Expand Up @@ -146,6 +154,22 @@ def with_polling_volume
yield
end
end

def reconnect_attempts
@reconnect_attempts ||= ::SolidCable.reconnect_attempts
end

def retry_connecting?
self.reconnect_attempt += 1

return false if reconnect_attempt > reconnect_attempts.size

sleep_t = reconnect_attempts[reconnect_attempt - 1]

sleep(sleep_t) if sleep_t > 0

true
end
end
end
end
Expand Down
6 changes: 6 additions & 0 deletions lib/solid_cable.rb
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,12 @@ def trim_chance
2
end

def reconnect_attempts
attempts = cable_config.fetch(:reconnect_attempts, 1)
attempts = Array.new(attempts, 0) if attempts.is_a?(Integer)
attempts
end

private
def cable_config
Rails.application.config_for("cable")
Expand Down
24 changes: 24 additions & 0 deletions test/lib/action_cable/subscription_adapter/solid_cable_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,30 @@ class ActionCable::SubscriptionAdapter::SolidCableTest < ActionCable::TestCase
end
end

test "retries after a connection failure and keeps listening" do
with_cable_config reconnect_attempts: [0] do
raised = false
original = SolidCable::Message.method(:broadcastable)

SolidCable::Message.stub(:broadcastable, lambda { |channels, last_id|
if raised
original.call(channels, last_id)
else
raised = true
raise ActiveRecord::ConnectionFailed, "boom"
end
}) do
subscribe_as_queue("reconnect-channel") do |queue|
@tx_adapter.broadcast("reconnect-channel", "hello")

assert_equal "hello", next_message_in_queue(queue)
end
end

assert raised
end
end

private
def cable_config
{ adapter: "solid_cable", message_retention: "1.second",
Expand Down
16 changes: 16 additions & 0 deletions test/solid_cable_test.rb
Original file line number Diff line number Diff line change
Expand Up @@ -46,4 +46,20 @@ class SolidCableTest < ActiveSupport::TestCase
assert_equal 42, SolidCable.trim_batch_size
end
end

test "reconnect_attempts defaults to a single zero" do
assert_equal [ 0 ], SolidCable.reconnect_attempts
end

test "reconnect_attempts accepts an integer" do
with_cable_config reconnect_attempts: 3 do
assert_equal [ 0, 0, 0 ], SolidCable.reconnect_attempts
end
end

test "reconnect_attempts accepts an array" do
with_cable_config reconnect_attempts: [ 0, 1, 2 ] do
assert_equal [ 0, 1, 2 ], SolidCable.reconnect_attempts
end
end
end