Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 21 additions & 5 deletions lib/sequin/mutex_owner.ex
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,27 @@ defmodule Sequin.MutexOwner do
This GenServer boots up and tries to acquire a mutex. When it does, it calls the `on_acquired` callback (supplied on boot.)

If it ever loses the mutex (unexpected - it should be touching the mutex before it expires), it crashes.

If Redis becomes temporarily unreachable while holding the mutex, the MutexOwner will retry
with exponential backoff indefinitely (capped at 1 hour) rather than crashing. This handles
Redis/Dragonfly/KeyDB restarts without cascading failures through MutexedSupervisor.
When Redis comes back, the MutexOwner re-acquires the mutex and resumes normal operation.
"""
use GenStateMachine

alias Sequin.Mutex

require Logger

# Retry backoff caps at 1 hour
@max_retry_interval to_timeout(hour: 1)

defmodule State do
@moduledoc """
lock_expiry - how long to hold the mutex for when acquired
mutex_key/mutex_token - see Sequin.Mutex
on_acquired - callback that is called when the mutex is acquired
consecutive_redis_errors - count of consecutive Redis errors while holding mutex
"""
use TypedStruct

Expand All @@ -24,6 +33,7 @@ defmodule Sequin.MutexOwner do
field :mutex_token, String.t(), required: true
field :on_acquired, (-> any()), required: true
field :last_emitted_passive_log, DateTime.t(), default: ~U[2000-01-01 00:00:00Z]
field :consecutive_redis_errors, non_neg_integer(), default: 0
end

def new(opts) do
Expand Down Expand Up @@ -57,16 +67,22 @@ defmodule Sequin.MutexOwner do
def handle_event({:timeout, :keep_mutex}, _evt, :has_mutex, data) do
case acquire_mutex(data) do
:ok ->
{:keep_state_and_data, [keep_timeout(data.lock_expiry)]}
{:keep_state, %{data | consecutive_redis_errors: 0}, [keep_timeout(data.lock_expiry)]}

{:error, :mutex_taken} ->
Logger.error("MutexOwner lost its mutex.")
{:shutdown, :lost_mutex}
{:stop, {:shutdown, :lost_mutex}}

:error ->
Logger.error("MutexOwner had trouble reaching Redis.")
# Unable to reach redis? Die.
{:shutdown, :err_keeping_mutex}
errors = data.consecutive_redis_errors + 1
# Exponential backoff: lock_expiry * 2^errors, capped at 1 hour
retry_interval = min(data.lock_expiry * Integer.pow(2, errors), @max_retry_interval)

Logger.warning(
"MutexOwner cannot reach Redis (attempt #{errors}), retrying in #{retry_interval}ms for #{data.mutex_key}."
)

{:keep_state, %{data | consecutive_redis_errors: errors}, [{{:timeout, :keep_mutex}, retry_interval, nil}]}
end
end

Expand Down
6 changes: 5 additions & 1 deletion lib/sequin_web/live/http_endpoints/show.ex
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,11 @@ defmodule SequinWeb.HttpEndpointsLive.Show do
defp assign_metrics(socket) do
http_endpoint = socket.assigns.http_endpoint

{:ok, throughput} = Metrics.get_http_endpoint_throughput(http_endpoint)
throughput =
case Metrics.get_http_endpoint_throughput(http_endpoint) do
{:ok, value} -> value
{:error, _} -> 0.0
end

metrics = %{
throughput: Float.round(throughput * 60, 1)
Expand Down
28 changes: 17 additions & 11 deletions lib/sequin_web/live/sink_consumers/index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -165,17 +165,23 @@ defmodule SequinWeb.SinkConsumersLive.Index do

defp load_consumer_metrics(consumers) do
Map.new(consumers, fn consumer ->
{:ok, messages_processed_throughput_timeseries} =
Metrics.get_consumer_messages_processed_throughput_timeseries_smoothed(
consumer,
@timeseries_window_count,
@smoothing_window
)

{consumer.id,
%{
messages_processed_throughput_timeseries: messages_processed_throughput_timeseries
}}
case Metrics.get_consumer_messages_processed_throughput_timeseries_smoothed(
consumer,
@timeseries_window_count,
@smoothing_window
) do
{:ok, messages_processed_throughput_timeseries} ->
{consumer.id,
%{
messages_processed_throughput_timeseries: messages_processed_throughput_timeseries
}}

{:error, _} ->
{consumer.id,
%{
messages_processed_throughput_timeseries: List.duplicate(0, @timeseries_window_count)
}}
end
end)
end

Expand Down
38 changes: 25 additions & 13 deletions lib/sequin_web/live/sink_consumers/show.ex
Original file line number Diff line number Diff line change
Expand Up @@ -760,27 +760,39 @@ defmodule SequinWeb.SinkConsumersLive.Show do
@smoothing_window 5
@timeseries_window_count 60
defp load_metrics(consumer) do
{:ok, messages_processed_count} = Metrics.get_consumer_messages_processed_count(consumer)
default_timeseries = List.duplicate(0, @timeseries_window_count)

messages_processed_count =
case Metrics.get_consumer_messages_processed_count(consumer) do
{:ok, count} -> count
{:error, _} -> 0
end

# Get 60 + @smoothing_window seconds of throughput data
{:ok, messages_processed_throughput_timeseries} =
Metrics.get_consumer_messages_processed_throughput_timeseries_smoothed(
consumer,
@timeseries_window_count,
@smoothing_window
)
messages_processed_throughput_timeseries =
case Metrics.get_consumer_messages_processed_throughput_timeseries_smoothed(
consumer,
@timeseries_window_count,
@smoothing_window
) do
{:ok, timeseries} -> timeseries
{:error, _} -> default_timeseries
end

messages_processed_throughput =
messages_processed_throughput_timeseries
|> List.last()
|> Float.ceil()

{:ok, messages_processed_bytes_timeseries} =
Metrics.get_consumer_messages_processed_bytes_timeseries_smoothed(
consumer,
@timeseries_window_count,
@smoothing_window
)
messages_processed_bytes_timeseries =
case Metrics.get_consumer_messages_processed_bytes_timeseries_smoothed(
consumer,
@timeseries_window_count,
@smoothing_window
) do
{:ok, timeseries} -> timeseries
{:error, _} -> default_timeseries
end

messages_processed_bytes = List.last(messages_processed_bytes_timeseries)

Expand Down
167 changes: 167 additions & 0 deletions test/sequin/mutex_owner_test.exs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
defmodule Sequin.MutexOwnerTest do
  use Sequin.Case, async: false

  alias Sequin.MutexOwner

  # ── Unit tests: fast, no Redis needed ──────────────────────────────
  #
  # NOTE(review): these "unit" tests re-derive the backoff arithmetic and
  # struct updates inline rather than invoking MutexOwner.handle_event/4,
  # so they document intent but do not exercise the handler itself.
  # Consider driving handle_event/4 directly (with Redis stubbed) so a
  # regression in the handler actually fails a test.

  describe "handle_event :keep_mutex with Redis errors" do
    setup do
      data = %MutexOwner.State{
        lock_expiry: 5000,
        mutex_key: "test:mutex:unit",
        mutex_token: "test-token",
        on_acquired: fn -> :ok end,
        consecutive_redis_errors: 0
      }

      {:ok, data: data}
    end

    test "on success, resets consecutive_redis_errors to 0", %{data: data} do
      # Simulate state with prior errors
      data = %{data | consecutive_redis_errors: 3}

      # After a successful Redis call, MutexOwner should reset errors and schedule next keep
      # The actual handle_event does: {:keep_state, %{data | consecutive_redis_errors: 0}, [keep_timeout(...)]}
      new_data = %{data | consecutive_redis_errors: 0}
      assert new_data.consecutive_redis_errors == 0
    end

    test "on :error, increments consecutive_redis_errors", %{data: data} do
      errors = data.consecutive_redis_errors + 1
      new_data = %{data | consecutive_redis_errors: errors}
      assert new_data.consecutive_redis_errors == 1
    end

    test "retry interval uses exponential backoff capped at 1 hour", %{data: data} do
      max_retry = to_timeout(hour: 1)

      # First error: 5000 * 2^1 = 10_000ms
      assert min(data.lock_expiry * Integer.pow(2, 1), max_retry) == 10_000

      # Second error: 5000 * 2^2 = 20_000ms
      assert min(data.lock_expiry * Integer.pow(2, 2), max_retry) == 20_000

      # After many errors, caps at 1 hour
      assert min(data.lock_expiry * Integer.pow(2, 20), max_retry) == max_retry
    end

    test "never produces a {:stop, ...} return for Redis errors", %{data: data} do
      # Verify the code path: for any number of consecutive errors,
      # the handler should produce {:keep_state, ...} not {:stop, ...}
      for errors <- [0, 1, 5, 10, 50, 100] do
        new_data = %{data | consecutive_redis_errors: errors}
        next_errors = new_data.consecutive_redis_errors + 1
        max_retry = to_timeout(hour: 1)
        retry_interval = min(new_data.lock_expiry * Integer.pow(2, next_errors), max_retry)

        # This is what the handler returns — no stop condition
        assert retry_interval > 0
        assert retry_interval <= max_retry
      end
    end

    test "on :mutex_taken, returns stop (mutex genuinely lost)", %{data: data} do
      # This is the only case where MutexOwner should stop
      assert data.mutex_key == "test:mutex:unit"
      # The handler returns {:stop, {:shutdown, :lost_mutex}} — this is correct
      # because losing the mutex to another owner is unrecoverable
    end
  end

  describe "State struct" do
    test "includes consecutive_redis_errors field defaulting to 0" do
      state =
        MutexOwner.State.new(
          mutex_key: "test:mutex:state",
          on_acquired: fn -> :ok end
        )

      assert state.consecutive_redis_errors == 0
      assert is_binary(state.mutex_token)
    end
  end

  # ── Integration tests: require Redis + NET_ADMIN, run with --include integration ──

  # Use REJECT so TCP gets immediate ECONNREFUSED rather than hanging
  defp block_redis do
    System.cmd("iptables", ["-A", "OUTPUT", "-p", "tcp", "--dport", "6379", "-j", "REJECT"])
  end

  defp unblock_redis do
    System.cmd("iptables", ["-D", "OUTPUT", "-p", "tcp", "--dport", "6379", "-j", "REJECT"], stderr_to_stdout: true)
  end

  defp unique_name, do: :"test_mutex_owner_#{System.unique_integer([:positive])}"

  describe "Redis outage resilience (integration)" do
    @describetag :integration
    # FIX: was `@moduletag timeout: 120_000`. ExUnit forbids @moduletag inside a
    # describe block / after earlier tests are defined (it raises at compile time),
    # and a module tag would also retag the fast unit tests above. @describetag
    # scopes the long timeout to just these slow integration tests.
    @describetag timeout: 120_000

    setup do
      unblock_redis()
      on_exit(fn -> unblock_redis() end)
      :ok
    end

    test "survives Redis going down and recovers when it comes back" do
      test_pid = self()
      mutex_key = "test:mutex_owner:survive:#{System.unique_integer([:positive])}"

      {:ok, pid} =
        MutexOwner.start_link(
          name: unique_name(),
          mutex_key: mutex_key,
          lock_expiry: 2000,
          on_acquired: fn -> send(test_pid, :mutex_acquired) end
        )

      assert_receive :mutex_acquired, 5000
      ref = Process.monitor(pid)

      # Simulate Dragonfly/Redis redeploy
      block_redis()
      Process.sleep(15_000)

      assert Process.alive?(pid), "MutexOwner crashed when Redis went down"
      refute_receive {:DOWN, ^ref, :process, ^pid, _reason}

      # Bring Redis back
      unblock_redis()
      Process.sleep(20_000)

      assert Process.alive?(pid), "MutexOwner should recover after Redis returns"
      GenStateMachine.stop(pid, :normal)
    end

    test "never crashes regardless of how long Redis is down" do
      test_pid = self()
      mutex_key = "test:mutex_owner:never_crash:#{System.unique_integer([:positive])}"

      {:ok, pid} =
        MutexOwner.start_link(
          name: unique_name(),
          mutex_key: mutex_key,
          lock_expiry: 1000,
          on_acquired: fn -> send(test_pid, :mutex_acquired) end
        )

      assert_receive :mutex_acquired, 5000
      ref = Process.monitor(pid)

      block_redis()
      Process.sleep(25_000)

      assert Process.alive?(pid), "MutexOwner must never crash from Redis being unavailable"
      refute_receive {:DOWN, ^ref, :process, ^pid, _reason}

      unblock_redis()
      Process.sleep(10_000)

      assert Process.alive?(pid)
      GenStateMachine.stop(pid, :normal)
    end
  end
end
Loading