Skip to content

Commit dfcde79

Browse files
committed
✅ Clean up tests
Goal here is to reduce flakiness and remove extraneous log messages
1 parent 0aae944 commit dfcde79

19 files changed

Lines changed: 171 additions & 78 deletions

config/test.exs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -41,10 +41,10 @@ config :sequin, Sequin.Repo,
4141
hostname: "localhost",
4242
database: "sequin_test",
4343
pool: Ecto.Adapters.SQL.Sandbox,
44-
pool_size: 20,
44+
pool_size: 40,
4545
port: 5432,
46-
queue_target: 100,
47-
queue_interval: 1000,
46+
queue_target: 500,
47+
queue_interval: 2000,
4848
ssl: false,
4949
types: PostgrexTypes
5050

lib/sequin/application.ex

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -54,6 +54,7 @@ defmodule Sequin.Application do
5454
children =
5555
base_children() ++
5656
[
57+
Sequin.SystemMetricsServer,
5758
SequinWeb.Telemetry,
5859
MutexedSupervisor.child_spec(
5960
Sequin.Runtime.MutexedSupervisor,
@@ -91,7 +92,6 @@ defmodule Sequin.Application do
9192
Sequin.Sinks.Nats.ConnectionCache,
9293
Sequin.Sinks.RabbitMq.ConnectionCache,
9394
SequinWeb.Presence,
94-
Sequin.SystemMetricsServer,
9595
{Task, fn -> enqueue_workers() end},
9696
# Start to serve requests, typically the last entry
9797
SequinWeb.Endpoint,

lib/sequin/runtime/message_consistency_check_worker.ex

Lines changed: 5 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -25,15 +25,17 @@ defmodule Sequin.Runtime.MessageConsistencyCheckWorker do
2525
def audit_and_trim_undelivered_cursors(consumer_id, older_than_timestamp) do
2626
case MessageLedgers.count_undelivered_wal_cursors(consumer_id, older_than_timestamp) do
2727
{:ok, 0} ->
28-
:ok
28+
{:ok, 0}
2929

3030
{:ok, undelivered_cursor_count} ->
31-
Logger.warning("[MessageConsistencyCheckWorker] Found undelivered cursors (count=#{undelivered_cursor_count})",
31+
Logger.warning(
32+
"[MessageConsistencyCheckWorker] Found undelivered cursors (count=#{undelivered_cursor_count})",
3233
consumer_id: consumer_id,
3334
undelivered_cursor_count: undelivered_cursor_count
3435
)
3536

36-
MessageLedgers.trim_stale_undelivered_wal_cursors(consumer_id, older_than_timestamp)
37+
:ok = MessageLedgers.trim_stale_undelivered_wal_cursors(consumer_id, older_than_timestamp)
38+
{:ok, undelivered_cursor_count}
3739
end
3840
end
3941

mix.exs

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,14 @@ defmodule Sequin.MixProject do
2323
def application do
2424
[
2525
mod: {Sequin.Application, []},
26-
extra_applications: [:logger, :runtime_tools, :os_mon] ++ extra_applications(Mix.env()),
26+
extra_applications: [:logger, :runtime_tools] ++ extra_applications(Mix.env()),
2727
included_applications: [:aws_credentials]
2828
]
2929
end
3030

31-
defp extra_applications(:dev), do: [:wx, :observer]
32-
defp extra_applications(_), do: []
31+
defp extra_applications(:dev), do: [:os_mon, :wx, :observer]
32+
defp extra_applications(:test), do: []
33+
defp extra_applications(_), do: [:os_mon]
3334

3435
# Specifies which paths to compile per environment.
3536
defp elixirc_paths(:test), do: ["lib", "test/support"]

test/sequin/gcp_pubsub_pipeline_test.exs

Lines changed: 46 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -12,34 +12,30 @@ defmodule Sequin.Runtime.GcpPubsubPipelineTest do
1212
setup do
1313
consumer = ConsumersFactory.insert_sink_consumer!(type: :gcp_pubsub, batch_size: 10)
1414

15-
# Setup auth token expectation for all tests
16-
Req.Test.expect(PubSub, fn conn ->
17-
if conn.host == "oauth2.googleapis.com" do
18-
Req.Test.json(conn, %{"access_token" => "test_token"})
19-
end
20-
end)
21-
2215
{:ok, %{consumer: consumer}}
2316
end
2417

2518
test "events are sent to PubSub", %{consumer: consumer} do
2619
message = ConsumersFactory.consumer_message()
2720

28-
# Mock PubSub publish
29-
Req.Test.expect(PubSub, fn conn ->
30-
assert conn.method == "POST"
31-
assert conn.host == "pubsub.googleapis.com"
32-
assert String.contains?(conn.request_path, ":publish")
21+
Req.Test.stub(PubSub, fn conn ->
22+
if conn.host == "oauth2.googleapis.com" do
23+
Req.Test.json(conn, %{"access_token" => "test_token"})
24+
else
25+
assert conn.method == "POST"
26+
assert conn.host == "pubsub.googleapis.com"
27+
assert String.contains?(conn.request_path, ":publish")
3328

34-
{:ok, body, _} = Plug.Conn.read_body(conn)
35-
body = Jason.decode!(body)
29+
{:ok, body, _} = Plug.Conn.read_body(conn)
30+
body = Jason.decode!(body)
3631

37-
data = get_in(body, ["messages", Access.at(0), "data"])
38-
data = data |> Base.decode64!() |> Jason.decode!()
39-
assert Map.has_key?(data, "record")
40-
assert Map.has_key?(data, "metadata")
32+
data = get_in(body, ["messages", Access.at(0), "data"])
33+
data = data |> Base.decode64!() |> Jason.decode!()
34+
assert Map.has_key?(data, "record")
35+
assert Map.has_key?(data, "metadata")
4136

42-
Req.Test.json(conn, %{})
37+
Req.Test.json(conn, %{})
38+
end
4339
end)
4440

4541
start_pipeline!(consumer)
@@ -55,16 +51,19 @@ defmodule Sequin.Runtime.GcpPubsubPipelineTest do
5551
message1 = ConsumersFactory.consumer_message(group_id: group_id)
5652
message2 = ConsumersFactory.consumer_message(group_id: group_id)
5753

58-
# Mock PubSub publish and verify batch
59-
Req.Test.expect(PubSub, fn conn ->
60-
assert conn.method == "POST"
61-
assert conn.host == "pubsub.googleapis.com"
54+
Req.Test.stub(PubSub, fn conn ->
55+
if conn.host == "oauth2.googleapis.com" do
56+
Req.Test.json(conn, %{"access_token" => "test_token"})
57+
else
58+
assert conn.method == "POST"
59+
assert conn.host == "pubsub.googleapis.com"
6260

63-
{:ok, body, _} = Plug.Conn.read_body(conn)
64-
body = Jason.decode!(body)
65-
assert length(body["messages"]) == 2
61+
{:ok, body, _} = Plug.Conn.read_body(conn)
62+
body = Jason.decode!(body)
63+
assert length(body["messages"]) == 2
6664

67-
Req.Test.json(conn, %{})
65+
Req.Test.json(conn, %{})
66+
end
6867
end)
6968

7069
start_pipeline!(consumer)
@@ -78,7 +77,9 @@ defmodule Sequin.Runtime.GcpPubsubPipelineTest do
7877
message1 = ConsumersFactory.consumer_message(group_id: group_id)
7978
message2 = ConsumersFactory.consumer_message(group_id: group_id)
8079

81-
Req.Test.expect(PubSub, 3, fn conn ->
80+
# Use stub to handle non-deterministic auth request count (1 or 2 auth requests
81+
# depending on whether the token is cached before the second batch processor starts)
82+
Req.Test.stub(PubSub, fn conn ->
8283
if conn.host == "oauth2.googleapis.com" do
8384
Req.Test.json(conn, %{"access_token" => "test_token"})
8485
else
@@ -102,11 +103,14 @@ defmodule Sequin.Runtime.GcpPubsubPipelineTest do
102103

103104
@tag capture_log: true
104105
test "failed PubSub publish results in failed events", %{consumer: consumer} do
105-
# Mock failed PubSub publish
106-
Req.Test.expect(PubSub, fn conn ->
107-
conn
108-
|> Plug.Conn.put_status(500)
109-
|> Req.Test.json(%{"error" => "Failed to publish to PubSub"})
106+
Req.Test.stub(PubSub, fn conn ->
107+
if conn.host == "oauth2.googleapis.com" do
108+
Req.Test.json(conn, %{"access_token" => "test_token"})
109+
else
110+
conn
111+
|> Plug.Conn.put_status(500)
112+
|> Req.Test.json(%{"error" => "Failed to publish to PubSub"})
113+
end
110114
end)
111115

112116
start_pipeline!(consumer)
@@ -133,13 +137,16 @@ defmodule Sequin.Runtime.GcpPubsubPipelineTest do
133137

134138
message = ConsumersFactory.consumer_message()
135139

136-
# Mock PubSub publish and verify topic
137-
Req.Test.expect(PubSub, fn conn ->
138-
assert conn.method == "POST"
139-
assert conn.host == "pubsub.googleapis.com"
140-
assert String.contains?(conn.request_path, "/my_topic:publish")
140+
Req.Test.stub(PubSub, fn conn ->
141+
if conn.host == "oauth2.googleapis.com" do
142+
Req.Test.json(conn, %{"access_token" => "test_token"})
143+
else
144+
assert conn.method == "POST"
145+
assert conn.host == "pubsub.googleapis.com"
146+
assert String.contains?(conn.request_path, "/my_topic:publish")
141147

142-
Req.Test.json(conn, %{})
148+
Req.Test.json(conn, %{})
149+
end
143150
end)
144151

145152
start_pipeline!(consumer)

test/sequin/message_consistency_check_worker_test.exs

Lines changed: 18 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,6 @@
11
defmodule Sequin.MessageConsistencyCheckWorkerTest do
22
use Sequin.DataCase, async: true
33

4-
import ExUnit.CaptureLog
5-
64
alias Sequin.Factory.ConsumersFactory
75
alias Sequin.Runtime.MessageConsistencyCheckWorker
86
alias Sequin.Runtime.MessageLedgers
@@ -26,10 +24,11 @@ defmodule Sequin.MessageConsistencyCheckWorkerTest do
2624
# Set timestamp to 2 minutes ago
2725
two_minutes_ago = DateTime.add(DateTime.utc_now(), -2 * 60, :second)
2826

29-
# Capture logs to verify output
30-
assert capture_log(fn ->
31-
MessageConsistencyCheckWorker.audit_and_trim_undelivered_cursors(consumer.id, two_minutes_ago)
32-
end) =~ "Found undelivered cursors (count=3)"
27+
assert {:ok, 3} =
28+
MessageConsistencyCheckWorker.audit_and_trim_undelivered_cursors(
29+
consumer.id,
30+
two_minutes_ago
31+
)
3332

3433
# Verify that the undelivered cursors set was trimmed
3534
assert {:ok, 0} = MessageLedgers.count_undelivered_wal_cursors(consumer.id, two_minutes_ago)
@@ -42,10 +41,12 @@ defmodule Sequin.MessageConsistencyCheckWorkerTest do
4241
# Set timestamp to 2 minutes ago
4342
two_minutes_ago = DateTime.add(DateTime.utc_now(), -2 * 60, :second)
4443

45-
# Capture logs to verify output
46-
assert capture_log(fn ->
47-
MessageConsistencyCheckWorker.audit_and_trim_undelivered_cursors(consumer.id, two_minutes_ago)
48-
end) == ""
44+
# Should return :ok without logging (0-count path)
45+
assert {:ok, 0} =
46+
MessageConsistencyCheckWorker.audit_and_trim_undelivered_cursors(
47+
consumer.id,
48+
two_minutes_ago
49+
)
4950

5051
# Verify that the undelivered cursors set is empty
5152
assert {:ok, 0} = MessageLedgers.count_undelivered_wal_cursors(consumer.id, two_minutes_ago)
@@ -67,13 +68,15 @@ defmodule Sequin.MessageConsistencyCheckWorkerTest do
6768
# Set timestamp to 1 minute ago (not stale enough)
6869
one_minute_ago = DateTime.add(DateTime.utc_now(), -60, :second)
6970

70-
# Capture logs to verify output
71-
assert capture_log(fn ->
72-
MessageConsistencyCheckWorker.audit_and_trim_undelivered_cursors(consumer.id, one_minute_ago)
73-
end) == ""
71+
assert {:ok, 0} =
72+
MessageConsistencyCheckWorker.audit_and_trim_undelivered_cursors(
73+
consumer.id,
74+
one_minute_ago
75+
)
7476

7577
# Verify that the undelivered cursors set still contains the messages
76-
assert {:ok, 2} = MessageLedgers.count_undelivered_wal_cursors(consumer.id, DateTime.utc_now())
78+
assert {:ok, 2} =
79+
MessageLedgers.count_undelivered_wal_cursors(consumer.id, DateTime.utc_now())
7780
end
7881
end
7982
end

test/sequin/message_handler_test.exs

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -143,8 +143,8 @@ defmodule Sequin.MessageHandlerTest do
143143
database = DatabasesFactory.insert_postgres_database!(account_id: account.id)
144144

145145
field = ReplicationFactory.field()
146-
table_schema1 = Factory.postgres_object()
147-
table_schema2 = Factory.postgres_object()
146+
table_schema1 = "schema_one_#{Factory.sequence()}"
147+
table_schema2 = "schema_two_#{Factory.sequence()}"
148148

149149
message1 =
150150
ReplicationFactory.postgres_message(table_oid: 123, action: :insert, fields: [field], table_schema: table_schema1)

test/sequin/postgres/benchmark_source_test.exs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -173,7 +173,7 @@ defmodule Sequin.Postgres.BenchmarkSourceTest do
173173
receive do
174174
{:messages_received, messages} -> receive_messages(count, acc ++ messages)
175175
after
176-
100 -> acc
176+
1_000 -> acc
177177
end
178178
end
179179
end

test/sequin/postgres_replication_test.exs

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -44,6 +44,7 @@ defmodule Sequin.PostgresReplicationTest do
4444
alias Sequin.TestSupport.SimpleHttpServer
4545

4646
@moduletag :unboxed
47+
@moduletag :capture_log
4748

4849
@publication "characters_publication"
4950

@@ -1499,6 +1500,21 @@ defmodule Sequin.PostgresReplicationTest do
14991500
defp stop_replication!(pg_replication) do
15001501
# Stop the supervisor using its via_tuple
15011502
stop_supervised!(Supervisor.via_tuple(pg_replication.id))
1503+
# Wait for :syn to clean up the process registration asynchronously
1504+
wait_for_syn_cleanup({Supervisor, pg_replication.id})
1505+
end
1506+
1507+
defp wait_for_syn_cleanup(key, attempts \\ 50) do
1508+
if :syn.lookup(:replication, key) == :undefined do
1509+
:ok
1510+
else
1511+
if attempts > 0 do
1512+
Process.sleep(10)
1513+
wait_for_syn_cleanup(key, attempts - 1)
1514+
else
1515+
raise "Timed out waiting for :syn to clean up registration for #{inspect(key)}"
1516+
end
1517+
end
15021518
end
15031519

15041520
# defp config do

test/sequin/postgres_test.exs

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,8 @@ defmodule Sequin.PostgresTest do
77
alias Sequin.Repo
88
alias Sequin.Test.UnboxedRepo
99

10+
@moduletag :capture_log
11+
1012
setup do
1113
{:ok, conn} = Postgrex.start_link(UnboxedRepo.config())
1214
%{conn: conn}

0 commit comments

Comments
 (0)