Skip to content

Conversation

@alco
Copy link
Member

@alco alco commented Jan 20, 2026

Summary

Implements fragment-direct streaming for shape consumers, allowing transaction fragments to be written directly to storage as they arrive instead of buffering the entire transaction in memory until commit.

Closes #3415

Motivation

Previously, the consumer would buffer all changes in a transaction in memory until a Commit message arrived. For large transactions, this could cause significant memory pressure. This PR enables shapes without subquery dependencies to stream fragments directly to storage, reducing memory usage while maintaining crash-safety guarantees.

Key Changes

  • Fragment-direct mode: Shapes without subquery dependencies now write transaction fragments directly to storage via append_fragment_to_log!/2
  • Crash-safety invariant: fetch_latest_offset only returns committed transaction offsets, ensuring recovery correctness even if uncommitted fragment data is flushed to disk
  • Fallback to TransactionBuilder: Shapes with dependencies, during initial filtering, or with materializer subscriptions continue using the existing buffered approach

Limitations

Fragment-direct mode is only enabled when:

  • Shape has no subquery dependencies
  • Not buffering for initial snapshot
  • No materializer subscribed (inner shapes need full transaction handling)
  • Initial snapshot filtering is complete

@coderabbitai
Copy link

coderabbitai bot commented Jan 20, 2026

Important

Review skipped

Draft detected.

Please check the settings in the CodeRabbit UI or the .coderabbit.yaml file in this repository. To trigger a single review, invoke the @coderabbitai review command.

You can disable this status message by setting the reviews.review_status to false in the CodeRabbit configuration file.


Comment @coderabbitai help to get the list of available commands and usage tips.

@codecov
Copy link

codecov bot commented Jan 20, 2026

❌ 56 Tests Failed:

Tests completed Failed Passed Skipped
2134 56 2078 0
View the top 3 failed test(s) by shortest run time
test/integration.test.ts > HTTP Sync > should revalidate etags
Stack Traces | 0.0587s run time
AssertionError: expected 1 to deeply equal 10

- Expected
+ Received

- 10
+ 1

 ❯ test/integration.test.ts:679:29
test/integration.test.ts > HTTP Sync > should get initial data and then receive updates (liveSSE=false)
Stack Traces | 0.0702s run time
AssertionError: expected Map{ …(2) } to deeply equal Map{ …(2) }

- Expected
+ Received

@@ -4,8 +4,9 @@
      "priority": 10,
      "title": "foo2",
    },
    "\"electric_test\".\"issues for 509504068_1_10_0_191acc7f248238\"/\"bcae034c-bb5d-4c74-9465-ce348dff2f8f\"" => {
      "id": "bcae034c-bb5d-4c74-9465-ce348dff2f8f",
+     "priority": 10,
      "title": "foo1",
    },
  }

 ❯ test/integration.test.ts:428:25
test/multi-shape-stream.test.ts > MultiShapeStream > should continually sync multiple shapes
Stack Traces | 0.102s run time
AssertionError: expected 3 to be 2 // Object.is equality

- Expected
+ Received

- 2
+ 3

 ❯ test/multi-shape-stream.test.ts:238:7
Elixir.Electric.Shapes.ConsumerTest::test fragment-direct streaming crash/restart with partial fragments persisted recovers correctly
Stack Traces | 0.168s run time
24) test fragment-direct streaming crash/restart with partial fragments persisted recovers correctly (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:1703
     Assertion with == failed
     code:  assert latest_offset == initial_offset
     left:  LogOffset.last_before_real_offsets()
     right: LogOffset.new(0, 0)
     stacktrace:
       .../electric/shapes/consumer_test.exs:1757: (test)
Elixir.Electric.Shapes.ConsumerTest::test fragment-direct streaming uncommitted fragments flushed to disk do not advance latest_offset
Stack Traces | 0.181s run time
30) test fragment-direct streaming uncommitted fragments flushed to disk do not advance latest_offset (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:2111
     Assertion with == failed
     code:  assert offset_after_committed == LogOffset.new(lsn_init, 0)
     left:  LogOffset.last_before_real_offsets()
     right: LogOffset.new(50, 0)
     stacktrace:
       .../electric/shapes/consumer_test.exs:2144: (test)
Elixir.Electric.Shapes.ConsumerTest::test fragment-direct streaming interleaved begin fragments raise an error
Stack Traces | 0.195s run time
7) test fragment-direct streaming interleaved begin fragments raise an error (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:1619
     Expected truthy, got false
     code: assert Process.alive?(consumer_pid)
     arguments:

         # 1
         #PID<0.16906.0>

     stacktrace:
       .../electric/shapes/consumer_test.exs:1686: (test)
Elixir.Electric.Shapes.ConsumerTest::test transaction handling with real storage transactions are buffered until snapshot xmin is known
Stack Traces | 0.221s run time
28) test transaction handling with real storage transactions are buffered until snapshot xmin is known (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:660
     ** (MatchError) no match of right hand side value:

         {:error, "Shape terminated before snapshot was ready"}

     code: :started = ShapeCache.await_snapshot_start(shape_handle, ctx.stack_id)
     stacktrace:
       .../electric/shapes/consumer_test.exs:703: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes GET returns updated schema in header after column nullability changes
Stack Traces | 0.237s run time
43) test /v1/shapes GET returns updated schema in header after column nullability changes (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:1766
     match (=) failed
     code:  assert %{status: 200} = Task.await(task)
     left:  %{status: 200}
     right: %Plug.Conn{adapter: {Plug.Adapters.Test.Conn, :...}, assigns: %{request: %Electric.Shapes.Api.Request{chunk_end_offset: LogOffset.new(678739032, 0), handle: "122524439-1769094670061637", last_offset: LogOffset.new(678739032, 0), global_last_seen_lsn: 678739032, new_changes_ref: #Reference<0.520905798.2411200514.195068>, new_changes_pid: #PID<0.26679.0>, api: %Electric.Shapes.Api{inspector: {Electric.Postgres.Inspector.EtsInspector, [stack_id: "Electric.Plug.RouterTest test /v1/shapes GET returns updated schema in header after column nullability changes", server: {:via, Registry, {:"Electric.ProcessRegistry:Electric.Plug.RouterTest test /v1/shapes GET returns updated schema in header after column nullability changes", {Electric.Postgres.Inspector.EtsInspector, nil}}}]}, shape: nil, stack_id: "Electric.Plug.RouterTest test /v1/shapes GET returns updated schema in header after column nullability changes", feature_flags: ["allow_subqueries", "tagged_subqueries"], max_concurrent_requests: %{existing: 1000, initial: 300}, allow_shape_deletion: true, keepalive_interval: 21000, long_poll_timeout: 4000, sse_timeout: 60000, max_age: 60, stack_ready_timeout: 5000, stale_age: 300, send_cache_headers?: true, encoder: Electric.Shapes.Api.Encoder.JSON, sse_encoder: Electric.Shapes.Api.Encoder.SSE, configured: true}, params: %Electric.Shapes.Api.Params{table: "nullability_test", offset: LogOffset.new(678739032, 0), handle: "122524439-1769094670061637", live: true, where: nil, columns: nil, shape_definition: Shape.new!({21765, "public.nullability_test"}), replica: :default, params: %{}, experimental_compaction: false, live_sse: false, log: :full, subset: nil}, response: %Electric.Shapes.Api.Response{handle: "122524439-1769094670061637", offset: LogOffset.new(678739032, 0), shape_definition: Shape.new!({21765, "public.nullability_test"}), known_error: nil, retry_after: nil, api: %Electric.Shapes.Api{inspector: {Electric.Postgres.Inspector.EtsInspector, [stack_id: "Electric.Plug.RouterTest test /v1/shapes GET returns updated schema in header after column nullability changes", server: {:via, Registry, {:"Electric.ProcessRegistry:Electric.Plug.RouterTest test /v1/shapes GET returns updated schema in header after column nullability changes", {Electric.Postgres.Inspector.EtsInspector, nil}}}]}, shape: nil, stack_id: "Electric.Plug.RouterTest test /v1/shapes GET returns updated schema in header after column nullability changes", feature_flags: ["allow_subqueries", "tagged_subqueries"], max_concurrent_requests: %{existing: 1000, initial: 300}, allow_shape_deletion: true, keepalive_interval: 21000, long_poll_timeout: 4000, sse_timeout: 60000, max_age: 60, stack_ready_timeout: 5000, stale_age: 300, send_cache_headers?: true, encoder: Electric.Shapes.Api.Encoder.JSON, sse_encoder: Electric.Shapes.Api.Encoder.SSE, configured: true}, chunked: false, up_to_date: true, no_changes: false, response_type: :normal_log, params: %Electric.Shapes.Api.Params{...}, ...}}, ...}, ...}
     stacktrace:
       .../electric/plug/router_test.exs:1827: (test)
Elixir.Electric.Shapes.ConsumerTest::test transaction handling with real storage should hibernate not suspend if has dependencies
Stack Traces | 0.24s run time
18) test transaction handling with real storage should hibernate not suspend if has dependencies (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:851
     match (=) failed
     code:  assert {:current_function, {:gen_server, :loop_hibernate, 4}} =
              Process.info(consumer_pid, :current_function)
     left:  {:current_function, {:gen_server, :loop_hibernate, 4}}
     right: {:current_function, {:gen_server, :loop, 5}}
     stacktrace:
       .../electric/shapes/consumer_test.exs:884: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries subqueries can reference composite PKs
Stack Traces | 0.254s run time
47) test /v1/shapes - subqueries subqueries can reference composite PKs (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2672
     match (=) failed
     code:  assert {req, 200, [%{"value" => %{"id" => "1", "role" => "Admin"}}, _]} = Task.await(task)
     left:  {req, 200, [%{"value" => %{"id" => "1", "role" => "Admin"}}, _]}
     right: {%{handle: "83372609-1769094669103496", offset: "0_0", table: "member_details", where: "(user_id, team_id) IN (SELECT user_id, team_id FROM members WHERE flag = TRUE)", live: true}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2692: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries allows 3 level subquery in where clauses
Stack Traces | 0.269s run time
3) test /v1/shapes - subqueries allows 3 level subquery in where clauses (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2431
     match (=) failed
     code:  assert {req, 200, [%{"value" => %{"id" => "1", "value" => "2"}}, _]} = Task.await(task)
     left:  {req, 200, [%{"value" => %{"id" => "1", "value" => "2"}}, _]}
     right: {%{handle: "41011198-1769094714921776", offset: "0_0", table: "child", where: "parent_id in (SELECT id FROM parent WHERE grandparent_id in (SELECT id FROM grandparent WHERE value = 10))", live: true}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2455: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries allows subquery in where clause
Stack Traces | 0.287s run time
12) test /v1/shapes - subqueries allows subquery in where clause (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2198
     match (=) failed
     code:  assert %{status: 200} = conn = Task.await(task)
     left:  %{status: 200}
     right: %Plug.Conn{adapter: {Plug.Adapters.Test.Conn, :...}, assigns: %{request: %Electric.Shapes.Api.Request{chunk_end_offset: LogOffset.last_before_real_offsets(), handle: "29101195-1769094702400053", last_offset: LogOffset.last_before_real_offsets(), global_last_seen_lsn: 0, new_changes_ref: #Reference<0.520905798.2411200514.221837>, new_changes_pid: #PID<0.29481.0>, api: %Electric.Shapes.Api{inspector: {Electric.Postgres.Inspector.EtsInspector, [stack_id: "Electric.Plug.RouterTest test /v1/shapes - subqueries allows subquery in where clause", server: {:via, Registry, {:"Electric.ProcessRegistry:Electric.Plug.RouterTest test /v1/shapes - subqueries allows subquery in where clause", {Electric.Postgres.Inspector.EtsInspector, nil}}}]}, shape: nil, stack_id: "Electric.Plug.RouterTest test /v1/shapes - subqueries allows subquery in where clause", feature_flags: ["allow_subqueries", "tagged_subqueries"], max_concurrent_requests: %{existing: 1000, initial: 300}, allow_shape_deletion: true, keepalive_interval: 21000, long_poll_timeout: 4000, sse_timeout: 60000, max_age: 60, stack_ready_timeout: 5000, stale_age: 300, send_cache_headers?: true, encoder: Electric.Shapes.Api.Encoder.JSON, sse_encoder: Electric.Shapes.Api.Encoder.SSE, configured: true}, params: %Electric.Shapes.Api.Params{table: "child", offset: LogOffset.new(0, 0), handle: "29101195-1769094702400053", live: true, where: "parent_id in (SELECT id FROM parent WHERE value = 1)", columns: nil, shape_definition: Shape.new!({22423, "public.child"}, where: "parent_id IN (SELECT id FROM public.parent WHERE value = 1)", deps: [Shape.new!({22418, "public.parent"}, where: "value = 1", columns: ["id"])]), replica: :default, params: %{}, experimental_compaction: false, live_sse: false, log: :full, subset: nil}, response: %Electric.Shapes.Api.Response{handle: "29101195-1769094702400053", offset: LogOffset.last_before_real_offsets(), shape_definition: Shape.new!({22423, "public.child"}, where: "parent_id IN (SELECT id FROM public.parent WHERE value = 1)", deps: [Shape.new!({22418, "public.parent"}, where: "value = 1", columns: ["id"])]), known_error: nil, retry_after: nil, api: %Electric.Shapes.Api{inspector: {Electric.Postgres.Inspector.EtsInspector, [stack_id: "Electric.Plug.RouterTest test /v1/shapes - subqueries allows subquery in where clause", server: {:via, Registry, {:"Electric.ProcessRegistry:Electric.Plug.RouterTest test /v1/shapes - subqueries allows subquery in where clause", {Electric.Postgres.Inspector.EtsInspector, nil}}}]}, shape: nil, stack_id: "Electric.Plug.RouterTest test /v1/shapes - subqueries allows subquery in where clause", feature_flags: ["allow_subqueries", "tagged_subqueries"], max_concurrent_requests: %{existing: 1000, initial: 300}, allow_shape_deletion: true, keepalive_interval: 21000, long_poll_timeout: 4000, sse_timeout: 60000, max_age: 60, stack_ready_timeout: 5000, stale_age: 300, send_cache_headers?: true, encoder: Electric.Shapes.Api.Encoder.JSON, sse_encoder: Electric.Shapes.Api.Encoder.SSE, configured: true}, chunked: false, up_to_date: true, no_changes: false, response_type: :normal_log, params: %Electric.Shapes.Api.Params{...}, ...}}, ...}, ...}
     stacktrace:
       .../electric/plug/router_test.exs:2238: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries subqueries work with params
Stack Traces | 0.295s run time
56) test /v1/shapes - subqueries subqueries work with params (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2745
     match (=) failed
     code:  assert {req, 200, [%{"value" => %{"id" => "2", "other_value" => "20"}}, _]} = Task.await(task)
     left:  {req, 200, [%{"value" => %{"id" => "2", "other_value" => "20"}}, _]}
     right: {%{handle: "69757972-1769094647221524", offset: "0_0", table: "child", where: "value in (SELECT value FROM parent WHERE other_value >= $2) AND other_value >= $1", params: %{"1" => "10", "2" => "6"}, live: true}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2767: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes GET returns correct INSERT and DELETE operations that have been converted from UPDATEs
Stack Traces | 0.305s run time
44) test /v1/shapes GET returns correct INSERT and DELETE operations that have been converted from UPDATEs (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:1179
     match (=) failed
     code:  assert %{status: 200} = conn = Task.await(task)
     left:  %{status: 200}
     right: %Plug.Conn{adapter: {Plug.Adapters.Test.Conn, :...}, assigns: %{request: %Electric.Shapes.Api.Request{chunk_end_offset: LogOffset.new(674289024, 2), handle: "24505061-1769094669818819", last_offset: LogOffset.new(674289024, 2), global_last_seen_lsn: 674289024, new_changes_ref: #Reference<0.520905798.2411462657.99213>, new_changes_pid: #PID<0.26575.0>, api: %Electric.Shapes.Api{inspector: {Electric.Postgres.Inspector.EtsInspector, [stack_id: "Electric.Plug.RouterTest test /v1/shapes GET returns correct INSERT and DELETE operations that have been converted from UPDATEs", server: {:via, Registry, {:"Electric.ProcessRegistry:Electric.Plug.RouterTest test /v1/shapes GET returns correct INSERT and DELETE operations that have been converted from UPDATEs", {Electric.Postgres.Inspector.EtsInspector, nil}}}]}, shape: nil, stack_id: "Electric.Plug.RouterTest test /v1/shapes GET returns correct INSERT and DELETE operations that have been converted from UPDATEs", feature_flags: ["allow_subqueries", "tagged_subqueries"], max_concurrent_requests: %{existing: 1000, initial: 300}, allow_shape_deletion: true, keepalive_interval: 21000, long_poll_timeout: 4000, sse_timeout: 60000, max_age: 60, stack_ready_timeout: 5000, stale_age: 300, send_cache_headers?: true, encoder: Electric.Shapes.Api.Encoder.JSON, sse_encoder: Electric.Shapes.Api.Encoder.SSE, configured: true}, params: %Electric.Shapes.Api.Params{table: "serial_ids", offset: LogOffset.new(674289024, 2), handle: "24505061-1769094669818819", live: true, where: "num > 5", columns: nil, shape_definition: Shape.new!({21735, "public.serial_ids"}, where: "num > 5"), replica: :default, params: %{}, experimental_compaction: false, live_sse: false, log: :full, subset: nil}, response: %Electric.Shapes.Api.Response{handle: "24505061-1769094669818819", offset: LogOffset.new(674289024, 2), shape_definition: Shape.new!({21735, "public.serial_ids"}, where: "num > 5"), known_error: nil, retry_after: nil, api: %Electric.Shapes.Api{inspector: {Electric.Postgres.Inspector.EtsInspector, [stack_id: "Electric.Plug.RouterTest test /v1/shapes GET returns correct INSERT and DELETE operations that have been converted from UPDATEs", server: {:via, Registry, {:"Electric.ProcessRegistry:Electric.Plug.RouterTest test /v1/shapes GET returns correct INSERT and DELETE operations that have been converted from UPDATEs", {Electric.Postgres.Inspector.EtsInspector, nil}}}]}, shape: nil, stack_id: "Electric.Plug.RouterTest test /v1/shapes GET returns correct INSERT and DELETE operations that have been converted from UPDATEs", feature_flags: ["allow_subqueries", "tagged_subqueries"], max_concurrent_requests: %{existing: 1000, initial: 300}, allow_shape_deletion: true, keepalive_interval: 21000, long_poll_timeout: 4000, sse_timeout: 60000, max_age: 60, stack_ready_timeout: 5000, stale_age: 300, send_cache_headers?: true, encoder: Electric.Shapes.Api.Encoder.JSON, sse_encoder: Electric.Shapes.Api.Encoder.SSE, configured: true}, chunked: false, up_to_date: true, no_changes: false, response_type: :normal_log, params: %Electric.Shapes.Api.Params{...}, ...}}, ...}, ...}
     stacktrace:
       .../electric/plug/router_test.exs:1258: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries table with a composite PK can be used in a subquery
Stack Traces | 0.308s run time
72) test /v1/shapes - subqueries table with a composite PK can be used in a subquery (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2599
     match (=) failed
     code:  assert {req, 200, [%{"value" => %{"id" => "1", "name" => "Team C"}}, _]} = Task.await(task)
     left:  {req, 200, [%{"value" => %{"id" => "1", "name" => "Team C"}}, _]}
     right: {%{handle: "93678359-1769094633059928", offset: "0_0", table: "teams", where: "id IN (SELECT team_id FROM members WHERE user_id = 1)", live: true}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2619: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries allows subquery in where clauses that reference non-PK columns
Stack Traces | 0.318s run time
29) test /v1/shapes - subqueries allows subquery in where clauses that reference non-PK columns (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2373
     match (=) failed
     code:  assert {req, 200, [%{"value" => %{"id" => "1", "other_value" => "2"}}, _]} = Task.await(task)
     left:  {req, 200, [%{"value" => %{"id" => "1", "other_value" => "2"}}, _]}
     right: {%{handle: "128600544-1769094690414784", offset: "0_0", table: "child", where: "value in (SELECT value FROM parent WHERE other_value >= 10)", live: true}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2398: (test)
test/integration.test.ts > HTTP Sync > should get initial data and then receive updates (liveSSE=true)
Stack Traces | 0.343s run time
AssertionError: expected Map{ …(2) } to deeply equal Map{ …(2) }

- Expected
+ Received

@@ -1,8 +1,9 @@
  Map {
    "\"electric_test\".\"issues for 509504068_1_11_0_c6328db3c2dc88\"/\"0e520603-f862-4e9a-b9f6-03ab4022758f\"" => {
      "id": "0e520603-f862-4e9a-b9f6-03ab4022758f",
+     "priority": 10,
      "title": "foo1",
    },
    "\"electric_test\".\"issues for 509504068_1_11_0_c6328db3c2dc88\"/\"e6ec8a28-d8de-4722-85b9-3dbf837c09c0\"" => {
      "id": "e6ec8a28-d8de-4722-85b9-3dbf837c09c0",
      "priority": 10,

 ❯ test/integration.test.ts:428:25
Elixir.Electric.Shapes.ConsumerTest::test event handling appends to log when xid >= xmin
Stack Traces | 0.429s run time
19) test event handling appends to log when xid >= xmin (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:188
     Assertion failed, no matching message after 400ms
     Showing 10 of 12 messages in the mailbox
     code: assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _}
     mailbox:
       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :fetch_pg_snapshot, "Electric.Shapes.ConsumerTest-shape1"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :snapshot_started?, "Electric.Shapes.ConsumerTest-shape1"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "Electric.Shapes.ConsumerTest-shape1"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :for_shape, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :fetch_latest_offset, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :fetch_pg_snapshot, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :snapshot_started?, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :append_fragment_to_log!, "Electric.Shapes.ConsumerTest-shape1", [{LogOffset.new(16, 0), "\"public\".\"test_table\"/\"1\"", :insert, "{\"value\":{\"id\":\"1\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"1\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"16\",\"txids\":[100],\"op_position\":0}}"}]}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :signal_txn_commit!, "Electric.Shapes.ConsumerTest-shape1", 100}
     stacktrace:
       .../electric/shapes/consumer_test.exs:211: (test)
Elixir.Electric.Shapes.ConsumerTest::test event handling notifies listeners of new changes
Stack Traces | 0.479s run time
6) test event handling notifies listeners of new changes (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:394
     Assertion failed, no matching message after 400ms
     Showing 10 of 13 messages in the mailbox
     code: assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1, _}
     mailbox:
       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :snapshot_started?, "Electric.Shapes.ConsumerTest-shape1"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "Electric.Shapes.ConsumerTest-shape1"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :for_shape, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :fetch_latest_offset, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :fetch_pg_snapshot, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :snapshot_started?, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :set_pg_snapshot, "Electric.Shapes.ConsumerTest-shape1", %{xmin: 100, xmax: 101, xip_list: ~c"d", filter_txns?: false}}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :append_fragment_to_log!, "Electric.Shapes.ConsumerTest-shape1", [{LogOffset.new(16, 0), "\"public\".\"test_table\"/\"1\"", :insert, "{\"value\":{\"id\":\"1\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"1\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"16\",\"txids\":[150],\"op_position\":0}}"}]}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1", _}
       value:   {Support.TestStorage, :signal_txn_commit!, "Electric.Shapes.ConsumerTest-shape1", 150}
     stacktrace:
       .../electric/shapes/consumer_test.exs:413: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries multiple actions result in a correct event sequence
Stack Traces | 0.482s run time
28) test /v1/shapes - subqueries multiple actions result in a correct event sequence (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2823
     match (=) failed
     The following variables were pinned:
       tag = "76558cc5a7532b5182fba5dc9f8247e7"
     code:  assert {req, 200,
             [
               %{"headers" => %{"event" => "move-out"}},
               %{
                 "headers" => %{"operation" => "insert", "is_move_in" => true, "tags" => [tag2]},
                 "value" => %{"parent_id" => "2", "value" => "12"}
               },
               %{"headers" => %{"control" => "snapshot-end"}},
               %{
                 "headers" => %{"operation" => "insert", "is_move_in" => true, "tags" => [^tag]},
                 "value" => %{"id" => "1", "parent_id" => "1", "value" => "13"}
               },
               %{"headers" => %{"control" => "snapshot-end"}},
               up_to_date_ctl()
             ]} = shape_req(req, ctx.opts)
     left:  {req, 200,
             [
               %{"headers" => %{"event" => "move-out"}},
               %{
                 "headers" => %{"operation" => "insert", "is_move_in" => true, "tags" => [tag2]},
                 "value" => %{"parent_id" => "2", "value" => "12"}
               },
               %{"headers" => %{"control" => "snapshot-end"}},
               %{
                 "headers" => %{"operation" => "insert", "is_move_in" => true, "tags" => [^tag]},
                 "value" => %{"id" => "1", "parent_id" => "1", "value" => "13"}
               },
               %{"headers" => %{"control" => "snapshot-end"}},
               up_to_date_ctl()
             ]}
     right: {%{handle: "91561037-1769094690826568", offset: "0_0", table: "child", where: "parent_id in (SELECT id FROM parent WHERE value = 1)", live: false}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2848: (test)
Elixir.Electric.Shapes.ConsumerTest::test event handling correctly writes only relevant changes to multiple shape logs
Stack Traces | 0.514s run time
9) test event handling correctly writes only relevant changes to multiple shape logs (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:229
     Assertion failed, no matching message after 400ms
     Showing 10 of 16 messages in the mailbox
     code: assert_receive {Support.TestStorage, :append_to_log!, @shape_handle1,
            [{_offset, _key, _type, serialized_record}]}
     mailbox:
       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :fetch_latest_offset, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :fetch_pg_snapshot, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :snapshot_started?, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :set_pg_snapshot, "Electric.Shapes.ConsumerTest-shape1", %{xmin: 100, xmax: 101, xip_list: ~c"d", filter_txns?: false}}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :set_pg_snapshot, "Electric.Shapes.ConsumerTest-shape2", %{xmin: 120, xmax: 121, xip_list: ~c"x", filter_txns?: false}}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :append_fragment_to_log!, "Electric.Shapes.ConsumerTest-shape1", [{LogOffset.new(16, 0), "\"public\".\"test_table\"/\"1\"", :insert, "{\"value\":{\"id\":\"1\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"1\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"16\",\"txids\":[150],\"op_position\":0}}"}]}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :signal_txn_commit!, "Electric.Shapes.ConsumerTest-shape1", 150}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :append_fragment_to_log!, "Electric.Shapes.ConsumerTest-shape2", [{LogOffset.new(16, 1), "\"public\".\"other_table\"/\"2\"", :insert, "{\"value\":{\"id\":\"2\"},\"key\":\"\\\"public\\\".\\\"other_table\\\"/\\\"2\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"other_table\"],\"operation\":\"insert\",\"lsn\":\"16\",\"txids\":[150],\"op_position\":1}}"}]}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape1",
                 [{_offset, _key, _type, serialized_record}]}
       value:   {Support.TestStorage, :signal_txn_commit!, "Electric.Shapes.ConsumerTest-shape2", 150}
     stacktrace:
       .../electric/shapes/consumer_test.exs:269: (test)
Elixir.Electric.Shapes.ConsumerTest::test event handling doesn't append to log when change is irrelevant for active shapes
Stack Traces | 0.59s run time
33) test event handling doesn't append to log when change is irrelevant for active shapes (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:286
     Assertion failed, no matching message after 400ms
     Showing 10 of 14 messages in the mailbox
     code: assert_receive {Support.TestStorage, :append_to_log!, @shape_handle2, _}
     mailbox:
       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "Electric.Shapes.ConsumerTest-shape1"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :for_shape, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :fetch_latest_offset, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :fetch_pg_snapshot, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :snapshot_started?, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "Electric.Shapes.ConsumerTest-shape2"}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :set_pg_snapshot, "Electric.Shapes.ConsumerTest-shape2", %{xmin: 120, xmax: 121, xip_list: ~c"x", filter_txns?: false}}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :append_fragment_to_log!, "Electric.Shapes.ConsumerTest-shape2", [{LogOffset.new(16, 0), "\"public\".\"test_table\"/\"1\"", :insert, "{\"value\":{\"id\":\"1\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"1\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"16\",\"txids\":[150],\"op_position\":0}}"}]}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {Support.TestStorage, :signal_txn_commit!, "Electric.Shapes.ConsumerTest-shape2", 150}

       pattern: {Support.TestStorage, :append_to_log!, "Electric.Shapes.ConsumerTest-shape2", _}
       value:   {#Reference<0.520905798.2410938372.86925>, :new_changes, LogOffset.new(16, 0)}
     stacktrace:
       .../electric/shapes/consumer_test.exs:305: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes GET after a compaction proceeds correctly
Stack Traces | 0.8s run time
48) test /v1/shapes GET after a compaction proceeds correctly (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:151
     match (=) failed
     code:  assert [%{"headers" => %{"control" => "snapshot-end"}}] = response
     left:  [%{"headers" => %{"control" => "snapshot-end"}}]
     right: [%{"headers" => %{"control" => "must-refetch"}}]
     stacktrace:
       .../electric/plug/router_test.exs:182: anonymous fn/4 in Electric.Plug.RouterTest."test /v1/shapes GET after a compaction proceeds correctly"/1
       (elixir 1.19.1) lib/enum.ex:4562: Enum.reduce/3
       .../electric/plug/router_test.exs:171: (test)
test/client.test.ts > Shape - changes_only mode (liveSSE=false) > should ignore updates/deletes for unseen keys until insert observed
Stack Traces | 1.06s run time
AssertionError: expected false to be true // Object.is equality

- Expected
+ Received

- true
+ false

 ❯ test/client.test.ts:1992:11
test/client.test.ts > Shape - changes_only mode (liveSSE=true) > should ignore updates/deletes for unseen keys until insert observed
Stack Traces | 1.09s run time
AssertionError: expected false to be true // Object.is equality

- Expected
+ Received

- true
+ false

 ❯ test/client.test.ts:1992:11
Elixir.Electric.Shapes.ConsumerTest::test fragment-direct streaming commit-only fragment with no prior fragments processes correctly
Stack Traces | 1.12s run time
1) test fragment-direct streaming commit-only fragment with no prior fragments processes correctly (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:2033
     Assertion failed, no matching message after 1000ms
     The following variables were pinned:
       ref = #Reference<0.520905798.2411462660.25286>
     Showing 10 of 16 messages in the mailbox
     code: assert_receive {^ref, :new_changes, offset}
     mailbox:
       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :make_new_snapshot!, "27335419-1769094598941422", []}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :set_pg_snapshot, "27335419-1769094598941422", %{xmin: 10, xmax: 11, xip_list: [], filter_txns?: true}}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "27335419-1769094598941422"}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :set_pg_snapshot, "27335419-1769094598941422", %{xmin: 10, xmax: 11, xip_list: [], filter_txns?: false}}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :append_fragment_to_log!, "27335419-1769094598941422", [{LogOffset.new(50, 0), "\"public\".\"test_table\"/\"init\"", :insert, "{\"value\":{\"id\":\"init\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"init\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"50\",\"txids\":[100],\"op_position\":0}}"}]}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :signal_txn_commit!, "27335419-1769094598941422", 100}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :terminate, "27335419-1769094598941422"}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :cleanup!, "27335419-1769094598941422"}

       pattern: {^ref, :new_changes, offset}
       value:   {#Reference<0.520905798.2411462660.25286>, :shape_rotation}

       pattern: {^ref, :new_changes, offset}
       value:   {Electric.ShapeCache.ShapeCleaner, :cleanup, "27335419-1769094598941422"}
     stacktrace:
       .../electric/shapes/consumer_test.exs:2090: (test)
Elixir.Electric.Shapes.ConsumerTest::test fragment-direct streaming fragment-direct mode disabled during initial filtering phase
Stack Traces | 1.14s run time
20) test fragment-direct streaming fragment-direct mode disabled during initial filtering phase (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:1469
     Assertion failed, no matching message after 1000ms
     Showing 9 of 9 messages in the mailbox
     code: assert_receive {:flush_boundary_updated, 100}
     mailbox:
       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :for_shape, "27335419-1769094587547973"}

       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :init_writer!, "27335419-1769094587547973", Shape.new!({36849815, "public.test_table"})}

       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :fetch_latest_offset, "27335419-1769094587547973"}

       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :fetch_pg_snapshot, "27335419-1769094587547973"}

       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :snapshot_started?, "27335419-1769094587547973"}

       pattern: {:flush_boundary_updated, 100}
       value:   {:snapshot, "27335419-1769094587547973"}

       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :make_new_snapshot!, "27335419-1769094587547973", []}

       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :set_pg_snapshot, "27335419-1769094587547973", %{xmin: 10, xmax: 15, xip_list: ~c"\f", filter_txns?: true}}

       pattern: {:flush_boundary_updated, 100}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "27335419-1769094587547973"}
     stacktrace:
       .../electric/shapes/consumer_test.exs:1504: (test)
Elixir.Electric.Shapes.ConsumerTest::test transaction handling with real storage should notify txns skipped because of xmin/xip as flushed
Stack Traces | 1.16s run time
29) test transaction handling with real storage should notify txns skipped because of xmin/xip as flushed (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:779
     Assertion failed, no matching message after 1000ms
     Showing 1 of 1 message in the mailbox
     code: assert_receive {:flush_boundary_updated, 300}
     mailbox:
       pattern: {:flush_boundary_updated, 300}
       value:   {:snapshot, "27335419-1769094584470354"}
     stacktrace:
       .../electric/shapes/consumer_test.exs:810: (test)
Elixir.Electric.Shapes.ConsumerTest::test fragment-direct streaming skipped fragments during recovery still notify flush boundary
Stack Traces | 1.18s run time
12) test fragment-direct streaming skipped fragments during recovery still notify flush boundary (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:1364
     Assertion failed, no matching message after 1000ms
     The following variables were pinned:
       ref = #Reference<0.520905798.2411200513.16774>
     Showing 10 of 17 messages in the mailbox
     code: assert_receive {^ref, :new_changes, _}
     mailbox:
       pattern: {^ref, :new_changes, _}
       value:   {Support.TestStorage, :set_pg_snapshot, "27335419-1769094592261248", %{xmin: 10, xmax: 11, xip_list: ~c"\n", filter_txns?: true}}

       pattern: {^ref, :new_changes, _}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "27335419-1769094592261248"}

       pattern: {^ref, :new_changes, _}
       value:   {Support.TestStorage, :set_pg_snapshot, "27335419-1769094592261248", %{xmin: 10, xmax: 11, xip_list: ~c"\n", filter_txns?: false}}

       pattern: {^ref, :new_changes, _}
       value:   {Support.TestStorage, :append_fragment_to_log!, "27335419-1769094592261248", [{LogOffset.new(100, 0), "\"public\".\"test_table\"/\"1\"", :insert, "{\"value\":{\"id\":\"1\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"1\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"100\",\"txids\":[50],\"op_position\":0}}"}]}

       pattern: {^ref, :new_changes, _}
       value:   {Support.TestStorage, :signal_txn_commit!, "27335419-1769094592261248", 50}

       pattern: {^ref, :new_changes, _}
       value:   {Support.TestStorage, :terminate, "27335419-1769094592261248"}

       pattern: {^ref, :new_changes, _}
       value:   {Support.TestStorage, :cleanup!, "27335419-1769094592261248"}

       pattern: {^ref, :new_changes, _}
       value:   {#Reference<0.520905798.2411200513.16774>, :shape_rotation}

       pattern: {^ref, :new_changes, _}
       value:   {Electric.ShapeCache.ShapeCleaner, :cleanup, "27335419-1769094592261248"}

       pattern: {^ref, :new_changes, _}
       value:   {:flush_boundary_updated, 200}
     stacktrace:
       .../electric/shapes/consumer_test.exs:1403: (test)
Elixir.Electric.Shapes.ConsumerTest::test transaction handling with real storage should terminate after :hibernate_after ms
Stack Traces | 1.2s run time
15) test transaction handling with real storage should terminate after :hibernate_after ms (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:816
     Assertion failed, no matching message after 1000ms
     Showing 1 of 1 message in the mailbox
     code: assert_receive {:flush_boundary_updated, 300}
     mailbox:
       pattern: {:flush_boundary_updated, 300}
       value:   {:snapshot, "27335419-1769094590259804"}
     stacktrace:
       .../electric/shapes/consumer_test.exs:840: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes GET receives only specified columns out of wide table
Stack Traces | 1.24s run time
10) test /v1/shapes GET receives only specified columns out of wide table (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:1016
     match (=) failed
     code:  assert %{status: 200} = conn = Task.await(task)
     left:  %{status: 200}
     right: %Plug.Conn{adapter: {Plug.Adapters.Test.Conn, :...}, assigns: %{request: %Electric.Shapes.Api.Request{chunk_end_offset: LogOffset.last_before_real_offsets(), handle: "58823085-1769094703089178", last_offset: LogOffset.last_before_real_offsets(), global_last_seen_lsn: 0, new_changes_ref: #Reference<0.520905798.2411462657.131189>, new_changes_pid: #PID<0.29682.0>, api: %Electric.Shapes.Api{inspector: {Electric.Postgres.Inspector.EtsInspector, [stack_id: "Electric.Plug.RouterTest test /v1/shapes GET receives only specified columns out of wide table", server: {:via, Registry, {:"Electric.ProcessRegistry:Electric.Plug.RouterTest test /v1/shapes GET receives only specified columns out of wide table", {Electric.Postgres.Inspector.EtsInspector, nil}}}]}, shape: nil, stack_id: "Electric.Plug.RouterTest test /v1/shapes GET receives only specified columns out of wide table", feature_flags: ["allow_subqueries", "tagged_subqueries"], max_concurrent_requests: %{existing: 1000, initial: 300}, allow_shape_deletion: true, keepalive_interval: 21000, long_poll_timeout: 4000, sse_timeout: 60000, max_age: 60, stack_ready_timeout: 5000, stale_age: 300, send_cache_headers?: true, encoder: Electric.Shapes.Api.Encoder.JSON, sse_encoder: Electric.Shapes.Api.Encoder.SSE, configured: true}, params: %Electric.Shapes.Api.Params{table: "wide_table", offset: LogOffset.new(0, 0), handle: "58823085-1769094703089178", live: true, where: nil, columns: ["id", "value1"], shape_definition: Shape.new!({22468, "public.wide_table"}, columns: ["id", "value1"]), replica: :default, params: %{}, experimental_compaction: false, live_sse: false, log: :full, subset: nil}, response: %Electric.Shapes.Api.Response{handle: "58823085-1769094703089178", offset: LogOffset.last_before_real_offsets(), shape_definition: Shape.new!({22468, "public.wide_table"}, columns: ["id", "value1"]), known_error: nil, retry_after: nil, api: %Electric.Shapes.Api{inspector: {Electric.Postgres.Inspector.EtsInspector, [stack_id: "Electric.Plug.RouterTest test /v1/shapes GET receives only specified columns out of wide table", server: {:via, Registry, {:"Electric.ProcessRegistry:Electric.Plug.RouterTest test /v1/shapes GET receives only specified columns out of wide table", {Electric.Postgres.Inspector.EtsInspector, nil}}}]}, shape: nil, stack_id: "Electric.Plug.RouterTest test /v1/shapes GET receives only specified columns out of wide table", feature_flags: ["allow_subqueries", "tagged_subqueries"], max_concurrent_requests: %{existing: 1000, initial: 300}, allow_shape_deletion: true, keepalive_interval: 21000, long_poll_timeout: 4000, sse_timeout: 60000, max_age: 60, stack_ready_timeout: 5000, stale_age: 300, send_cache_headers?: true, encoder: Electric.Shapes.Api.Encoder.JSON, sse_encoder: Electric.Shapes.Api.Encoder.SSE, configured: true}, chunked: false, up_to_date: true, no_changes: false, ...}}, ...}, ...}
     stacktrace:
       .../electric/plug/router_test.exs:1058: (test)
Elixir.Electric.Shapes.ConsumerTest::test fragment-direct streaming mid-fragment without prior begin creates pending_txn on-the-fly
Stack Traces | 1.26s run time
3) test fragment-direct streaming mid-fragment without prior begin creates pending_txn on-the-fly (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:1934
     Assertion failed, no matching message after 1000ms
     The following variables were pinned:
       ref = #Reference<0.520905798.2411462660.16698>
     Showing 10 of 16 messages in the mailbox
     code: assert_receive {^ref, :new_changes, offset}
     mailbox:
       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :make_new_snapshot!, "27335419-1769094596488275", []}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :set_pg_snapshot, "27335419-1769094596488275", %{xmin: 10, xmax: 11, xip_list: [], filter_txns?: true}}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "27335419-1769094596488275"}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :set_pg_snapshot, "27335419-1769094596488275", %{xmin: 10, xmax: 11, xip_list: [], filter_txns?: false}}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :append_fragment_to_log!, "27335419-1769094596488275", [{LogOffset.new(50, 0), "\"public\".\"test_table\"/\"init\"", :insert, "{\"value\":{\"id\":\"init\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"init\\\"\",\"headers\":{\"last\":true,\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"50\",\"txids\":[100],\"op_position\":0}}"}]}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :signal_txn_commit!, "27335419-1769094596488275", 100}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :terminate, "27335419-1769094596488275"}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :cleanup!, "27335419-1769094596488275"}

       pattern: {^ref, :new_changes, offset}
       value:   {#Reference<0.520905798.2411462660.16698>, :shape_rotation}

       pattern: {^ref, :new_changes, offset}
       value:   {Electric.ShapeCache.ShapeCleaner, :cleanup, "27335419-1769094596488275"}
     stacktrace:
       .../electric/shapes/consumer_test.exs:2015: (test)
Elixir.Electric.Shapes.ConsumerTest::test fragment-direct streaming commit-only fragment with no relevant changes still signals commit
Stack Traces | 1.27s run time
36) test fragment-direct streaming commit-only fragment with no relevant changes still signals commit (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:1798
     Assertion failed, no matching message after 1000ms
     The following variables were pinned:
       ref = #Reference<0.520905798.2410938370.91543>
     Showing 10 of 16 messages in the mailbox
     code: assert_receive {^ref, :new_changes, offset}
     mailbox:
       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :make_new_snapshot!, "44903071-1769094580805810", []}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :set_pg_snapshot, "44903071-1769094580805810", %{xmin: 10, xmax: 11, xip_list: ~c"\n", filter_txns?: true}}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :mark_snapshot_as_started, "44903071-1769094580805810"}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :set_pg_snapshot, "44903071-1769094580805810", %{xmin: 10, xmax: 11, xip_list: ~c"\n", filter_txns?: false}}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :append_fragment_to_log!, "44903071-1769094580805810", [{LogOffset.new(100, 0), "\"public\".\"test_table\"/\"1\"", :insert, "{\"value\":{\"id\":\"1\"},\"key\":\"\\\"public\\\".\\\"test_table\\\"/\\\"1\\\"\",\"headers\":{\"relation\":[\"public\",\"test_table\"],\"operation\":\"insert\",\"lsn\":\"100\",\"txids\":[50],\"op_position\":0}}"}]}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :terminate, "44903071-1769094580805810"}

       pattern: {^ref, :new_changes, offset}
       value:   {Support.TestStorage, :cleanup!, "44903071-1769094580805810"}

       pattern: {^ref, :new_changes, offset}
       value:   {#Reference<0.520905798.2410938370.91543>, :shape_rotation}

       pattern: {^ref, :new_changes, offset}
       value:   {Electric.ShapeCache.ShapeCleaner, :cleanup, "44903071-1769094580805810"}

       pattern: {^ref, :new_changes, offset}
       value:   {:flush_boundary_updated, 100}
     stacktrace:
       .../electric/shapes/consumer_test.exs:1851: (test)
test/multi-shape-stream.test.ts > TransactionalMultiShapeStream > should maintain transaction boundaries across multiple transactions
Stack Traces | 1.31s run time
AssertionError: expected 3 to be 2 // Object.is equality

- Expected
+ Received

- 2
+ 3

 ❯ test/multi-shape-stream.test.ts:510:35
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries move-in into move-out into move-in of the same parent results in a 
Stack Traces | 1.35s run time
69) test /v1/shapes - subqueries move-in into move-out into move-in of the same parent results in a  (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2890
     match (=) failed
     code:  assert {_req, 200,
             [
               %{"headers" => %{"event" => "move-out", "patterns" => p1}},
               %{"headers" => %{"event" => "move-out", "patterns" => p1}},
               %{"headers" => %{"control" => "snapshot-end"}},
               %{
                 "headers" => %{"operation" => "insert", "is_move_in" => true},
                 "value" => %{"id" => "3", "parent_id" => "3", "value" => "30"}
               },
               %{"headers" => %{"control" => "snapshot-end"}},
               up_to_date_ctl()
             ]} = shape_req(req, ctx.opts)
     left:  {_req, 200,
             [
               %{"headers" => %{"event" => "move-out", "patterns" => p1}},
               %{"headers" => %{"event" => "move-out", "patterns" => p1}},
               %{"headers" => %{"control" => "snapshot-end"}},
               %{
                 "headers" => %{"operation" => "insert", "is_move_in" => true},
                 "value" => %{"id" => "3", "parent_id" => "3", "value" => "30"}
               },
               %{"headers" => %{"control" => "snapshot-end"}},
               up_to_date_ctl()
             ]}
     right: {%{handle: "58602694-1769094638826463", offset: "0_0", table: "child", where: "parent_id in (SELECT id FROM parent WHERE value = 1)", live: false}, 409, [%{"headers" => %{"control" => "must-refetch"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2920: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries a move-out from the inner shape is propagated to the outer shape
Stack Traces | 4.24s run time
51) test /v1/shapes - subqueries a move-out from the inner shape is propagated to the outer shape (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2259
     match (=) failed
     code:  assert {_req, 200, [data, %{"headers" => %{"control" => "up-to-date"}}]} = Task.await(task)
     left:  {_req, 200, [data, %{"headers" => %{"control" => "up-to-date"}}]}
     right: {%{handle: "29169213-1769094658617704", offset: "0_inf", table: "child", where: "parent_id in (SELECT id FROM parent WHERE value = 1)", live: true}, 200, [%{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => "643155120"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2278: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries move-outs while processing move-ins are handled correctly
Stack Traces | 4.29s run time
19) test /v1/shapes - subqueries move-outs while processing move-ins are handled correctly (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2938
     match (=) failed
     code:  assert {req, 200,
             [
               %{
                 "value" => %{"id" => "1", "parent_id" => "1", "value" => "10"},
                 "headers" => %{"operation" => "insert", "tags" => [tag]}
               },
               %{"headers" => %{"control" => "snapshot-end"}},
               up_to_date_ctl()
             ]} = Task.await(task)
     left:  {req, 200,
             [
               %{
                 "value" => %{"id" => "1", "parent_id" => "1", "value" => "10"},
                 "headers" => %{"operation" => "insert", "tags" => [tag]}
               },
               %{"headers" => %{"control" => "snapshot-end"}},
               up_to_date_ctl()
             ]}
     right: {%{handle: "98445294-1769094696753713", offset: "0_inf", table: "child", where: "parent_id in (SELECT id FROM parent WHERE value = 1)", live: true}, 200, [%{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => "808835416"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2949: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries a move-in from the inner shape causes a query and new entries in the outer shape
Stack Traces | 4.29s run time
30) test /v1/shapes - subqueries a move-in from the inner shape causes a query and new entries in the outer shape (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2294
     match (=) failed
     code:  assert {_, 200, [data, %{"headers" => %{"control" => "snapshot-end"}}, up_to_date_ctl()]} =
              Task.await(task)
     left:  {_, 200, [data, %{"headers" => %{"control" => "snapshot-end"}}, up_to_date_ctl()]}
     right: {%{handle: "115235574-1769094686132255", offset: "0_inf", table: "child", where: "parent_id in (SELECT id FROM parent WHERE value = 1)", live: true}, 200, [%{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => "732445080"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2319: (test)
test/client.test.ts > Shape  (liveSSE=false) > should resync from scratch on a shape rotation
Stack Traces | 5.03s run time
Unknown Error: Timed out waiting for data changes
Elixir.Electric.Shapes.PartitionedTablesTest::test new partition tables prompt reload of relation info
Stack Traces | 5.47s run time
2) test new partition tables prompt reload of relation info (Electric.Shapes.PartitionedTablesTest)
     .../electric/shapes/partitioned_tables_test.exs:106
     Assertion failed, no matching message after 5000ms
     The following variables were pinned:
       ref = #Reference<0.520905798.2410938370.212873>
     Showing 10 of 11 messages in the mailbox
     code: assert_receive {^ref, :new_changes, _latest_log_offset}
     mailbox:
       pattern: {^ref, :new_changes, _latest_log_offset}
       value:   {:stack_status, #Reference<0.520905798.2410938369.222630>, :connection_lock_acquired}

       pattern: {^ref, :new_changes, _latest_log_offset}
       value:   {Electric.PersistentKV.Memory, {:set, "timeline_id_Electric.Shapes.PartitionedTablesTest test new partition tables prompt reload of relation info", "[\"7598203181650014242\",\"1\"]"}}

       pattern: {^ref, :new_changes, _latest_log_offset}
       value:   {Electric.PersistentKV.Memory, {:set, "Electric.Shapes.PartitionedTablesTest test new partition tables prompt reload of relation info:ets_inspector_state", [version: 1, data: [{{:oid_info, 17092}, %{parent: nil, relation: {"public", "partitioned_items"}, kind: :partitioned_table, children: [{"public", "partitioned_items_100"}, {"public", "partitioned_items_200"}], relation_id: 17092}, [%{name: "a", type: "int4", formatted_type: "integer", type_kind: :base, type_id: {23, -1}, is_generated: false, relation_id: 17092, pk_position: 0, array_type: nil, type_mod: -1, array_dimensions: 0, not_null: true}, %{name: "b", type: "int4", formatted_type: "integer", type_kind: :base, type_id: {23, -1}, is_generated: false, relation_id: 17092, pk_position: 1, array_type: nil, type_mod: -1, array_dimensions: 0, not_null: true}]}, {{:relation_to_oid, {"public", "partitioned_items"}}, 17092}]]}}

       pattern: {^ref, :new_changes, _latest_log_offset}
       value:   {Electric.PersistentKV.Memory, {:set, "Electric.Shapes.PartitionedTablesTest test new partition tables prompt reload of relation info:tracked_relations", %{id_to_table_info: %{17102 => %Electric.Replication.Changes.Relation{id: 17102, schema: "public", table: "partitioned_items_200", columns: [%Electric.Replication.Changes.Column{name: "a", type_oid: 23}, %Electric.Replication.Changes.Column{name: "b", type_oid: 23}], affected_columns: []}}, table_to_id: %{{"public", "partitioned_items_200"} => 17102}}}}

       pattern: {^ref, :new_changes, _latest_log_offset}
       value:   {Electric.PersistentKV.Memory, {:set, "Electric.Shapes.PartitionedTablesTest test new partition tables prompt reload of relation info:ets_inspector_state", [version: 1, data: [{:supported_features, %{supports_generated_column_replication: false}}, {{:oid_info, 17092}, %{parent: nil, relation: {"public", "partitioned_items"}, kind: :partitioned_table, children: [{"public", "partitioned_items_100"}, {"public", "partitioned_items_200"}], relation_id: 17092}, [%{name: "a", type: "int4", formatted_type: "integer", type_kind: :base, type_id: {23, -1}, is_generated: false, relation_id: 17092, pk_position: 0, array_type: nil, type_mod: -1, array_dimensions: 0, not_null: true}, %{name: "b", type: "int4", formatted_type: "integer", type_kind: :base, type_id: {23, -1}, is_generated: false, relation_id: 17092, pk_position: 1, array_type: nil, type_mod: -1, array_dimensions: 0, not_null: true}]}, {{:oid_info, 17102}, %{parent: {"public", "partitioned_items"}, relation: {"public", "partitioned_items_200"}, kind: :ordinary_table, children: nil, relation_id: 17102}, [%{name: "a", type: "int4", formatted_type: "integer", type_kind: :base, type_id: {23, -1}, is_generated: false, relation_id: 17102, pk_position: 0, array_type: nil, type_mod: -1, array_dimensions: 0, not_null: true}, %{name: "b", type: "int4", formatted_type: "integer", type_kind: :base, type_id: {23, -1}, is_generated: false, relation_id: 17102, ...}]}, ...]]}}

       pattern: {^ref, :new_changes, _latest_log_offset}
       value:   {Electric.PersistentKV.Memory, {:set, "Electric.Shapes.PartitionedTablesTest test new partition tables prompt reload of relation info:tracked_relations", %{id_to_table_info: %{17102 => %Electric.Replication.Changes.Relation{id: 17102, schema: "public", table: "partitioned_items_200", columns: [%Electric.Replication.Changes.Column{name: "a", type_oid: 23}, %Electric.Replication.Changes.Column{name: "b", type_oid: 23}], affected_columns: []}, 17115 => %Electric.Replication.Changes.Relation{id: 17115, schema: "public", table: "partitioned_items_300", columns: [%Electric.Replication.Changes.Column{name: "a", type_oid: 23}, %Electric.Replication.Changes.Column{name: "b", type_oid: 23}], affected_columns: []}}, table_to_id: %{{"public", "partitioned_items_200"} => 17102, {"public", "partitioned_items_300"} => 17115}}}}

       pattern: {^ref, :new_changes, _latest_log_offset}
       value:   {Electric.PersistentKV.Memory, {:set, "Electric.Shapes.PartitionedTablesTest test new partition tables prompt reload of relation info:ets_inspector_state", [version: 1, data: [{:supported_features, %{supports_generated_column_replication: false}}, {{:oid_info, 17102}, %{parent: {"public", "partitioned_items"}, relation: {"public", "partitioned_items_200"}, kind: :ordinary_table, children: nil, relation_id: 17102}, [%{name: "a", type: "int4", formatted_type: "integer", type_kind: :base, type_id: {23, -1}, is_generated: false, relation_id: 17102, pk_position: 0, array_type: nil, type_mod: -1, array_dimensions: 0, not_null: true}, %{name: "b", type: "int4", formatted_type: "integer", type_kind: :base, type_id: {23, -1}, is_generated: false, relation_id: 17102, pk_position: 1, array_type: nil, type_mod: -1, array_dimensions: 0, not_null: true}]}, {{:oid_info, 17115}, %{parent: {"public", "partitioned_items"}, relation: {"public", "partitioned_items_300"}, kind: :ordinary_table, children: nil, relation_id: 17115}, [%{name: "a", type: "int4", formatted_type: "integer", type_kind: :base, type_id: {23, -1}, is_generated: false, relation_id: 17115, pk_position: 0, array_type: nil, type_mod: -1, array_dimensions: 0, not_null: true}, %{name: "b", type: "int4", formatted_type: "integer", type_kind: :base, type_id: {23, -1}, is_generated: false, relation_id: 17115, pk_position: 1, array_type: nil, type_mod: -1, array_dimensions: 0, ...}]}, ...]]}}

       pattern: {^ref, :new_changes, _latest_log_offset}
       value:   {Electric.PersistentKV.Memory, {:set, "Electric.Shapes.PartitionedTablesTest test new partition tables prompt reload of relation info:ets_inspector_state", [version: 1, data: [{:supported_features, %{supports_generated_column_replication: false}}, {{:oid_info, 17092}, %{parent: nil, relation: {"public", "partitioned_items"}, kind: :partitioned_table, children: [{"public", "partitioned_items_100"}, {"public", "partitioned_items_200"}, {"public", "partitioned_items_300"}], relation_id: 17092}, [%{name: "a", type: "int4", formatted_type: "integer", type_kind: :base, type_id: {23, -1}, is_generated: false, relation_id: 17092, pk_position: 0, array_type: nil, type_mod: -1, array_dimensions: 0, not_null: true}, %{name: "b", type: "int4", formatted_type: "integer", type_kind: :base, type_id: {23, -1}, is_generated: false, relation_id: 17092, pk_position: 1, array_type: nil, type_mod: -1, array_dimensions: 0, not_null: true}]}, {{:oid_info, 17102}, %{parent: {"public", "partitioned_items"}, relation: {"public", "partitioned_items_200"}, kind: :ordinary_table, children: nil, relation_id: 17102}, [%{name: "a", type: "int4", formatted_type: "integer", type_kind: :base, type_id: {23, -1}, is_generated: false, relation_id: 17102, pk_position: 0, array_type: nil, type_mod: -1, array_dimensions: 0, not_null: true}, %{name: "b", type: "int4", formatted_type: "integer", type_kind: :base, type_id: {23, ...}, ...}]}, ...]]}}

       pattern: {^ref, :new_changes, _latest_log_offset}
       value:   {#Reference<0.520905798.2410938370.212873>, :shape_rotation}

       pattern: {^ref, :new_changes, _latest_log_offset}
       value:   {Electric.ShapeCache.ShapeCleaner, :cleanup, "125914160-1769094587557457"}
     stacktrace:
       .../electric/shapes/partitioned_tables_test.exs:148: (test)
test/client.test.ts > Shape  (liveSSE=true) > should continually sync a shape/table
Stack Traces | 30s run time
Error: Test timed out in 30000ms.
If this is a long-running test, pass a timeout value as the last argument or configure it globally with "testTimeout".
 ❯ test/client.test.ts:151:3
test/client.test.ts > Shape  (liveSSE=false) > should continually sync a shape/table
Stack Traces | 30s run time
Error: Test timed out in 30000ms.
If this is a long-running test, pass a timeout value as the last argument or configure it globally with "testTimeout".
 ❯ test/client.test.ts:151:3
View the full list of 14 ❄️ flaky test(s)
Elixir.Electric.Integration.SubqueryMoveOutTest::test resume preserves move-out state move-out after resume generates synthetic delete

Flake rate in main: 22.81% (Passed 132 times, Failed 39 times)

Stack Traces | 4.23s run time
4) test resume preserves move-out state move-out after resume generates synthetic delete (Electric.Integration.SubqueryMoveOutTest)
     test/integration/subquery_move_out_test.exs:330
     Received unexpected message: %Electric.Client.Message.ControlMessage{control: :up_to_date, global_last_seen_lsn: 915460680, handle: "123218933-1769094737987086", request_timestamp: ~U[2026-01-22 15:12:18.070544Z]}
     code: assert_delete(consumer, %{"id" => "child-1"})
     stacktrace:
       test/integration/subquery_move_out_test.exs:355: (test)
Elixir.Electric.Integration.SubqueryMoveOutTest::test subquery move-out with parent/child tables deleting parent row triggers move-out

Flake rate in main: 22.81% (Passed 132 times, Failed 39 times)

Stack Traces | 0.25s run time
2) test subquery move-out with parent/child tables deleting parent row triggers move-out (Electric.Integration.SubqueryMoveOutTest)
     test/integration/subquery_move_out_test.exs:113
     Received unexpected message: %Electric.Client.Message.ControlMessage{control: :must_refetch, global_last_seen_lsn: nil, handle: "117754858-1769094750370917-next", request_timestamp: ~U[2026-01-22 15:12:30.469364Z]}
     code: assert_delete(consumer, %{"id" => "child-1"})
     stacktrace:
       test/integration/subquery_move_out_test.exs:129: (test)
Elixir.Electric.Integration.SubqueryMoveOutTest::test subquery move-out with parent/child tables move-in after row becomes visible through different parent

Flake rate in main: 22.81% (Passed 132 times, Failed 39 times)

Stack Traces | 4.26s run time
5) test subquery move-out with parent/child tables move-in after row becomes visible through different parent (Electric.Integration.SubqueryMoveOutTest)
     test/integration/subquery_move_out_test.exs:138
     Received unexpected message: %Electric.Client.Message.ControlMessage{control: :up_to_date, global_last_seen_lsn: 911062104, handle: "84270689-1769094729752093", request_timestamp: ~U[2026-01-22 15:12:09.833898Z]}
     code: assert_delete(consumer, %{"id" => "child-1"})
     stacktrace:
       test/integration/subquery_move_out_test.exs:154: (test)
Elixir.Electric.Integration.SubqueryMoveOutTest::test subquery move-out with parent/child tables move-out generates synthetic deletes for all affected child rows

Flake rate in main: 22.81% (Passed 132 times, Failed 39 times)

Stack Traces | 5.22s run time
9) test subquery move-out with parent/child tables move-out generates synthetic deletes for all affected child rows (Electric.Integration.SubqueryMoveOutTest)
     test/integration/subquery_move_out_test.exs:80
     ** (MatchError) no match of right hand side value:

         {:error, :timeout}

     code: {:ok, deletes} =
     stacktrace:
       test/integration/subquery_move_out_test.exs:97: (test)
Elixir.Electric.Integration.SubqueryMoveOutTest::test subquery move-out with parent/child tables receives move-out control message when parent is deactivated

Flake rate in main: 22.81% (Passed 132 times, Failed 39 times)

Stack Traces | 4.14s run time
3) test subquery move-out with parent/child tables receives move-out control message when parent is deactivated (Electric.Integration.SubqueryMoveOutTest)
     test/integration/subquery_move_out_test.exs:55
     Received unexpected message: %Electric.Client.Message.ControlMessage{control: :up_to_date, global_last_seen_lsn: 919859200, handle: "22165123-1769094742199517", request_timestamp: ~U[2026-01-22 15:12:22.220325Z]}
     code: assert_delete(consumer, %{"id" => "child-1"})
     stacktrace:
       test/integration/subquery_move_out_test.exs:71: (test)
Elixir.Electric.Integration.SubqueryMoveOutTest::test tag handling during updates deactivating old parent after child changed parents should not generate delete

Flake rate in main: 22.81% (Passed 132 times, Failed 39 times)

Stack Traces | 0.168s run time
7) test tag handling during updates deactivating old parent after child changed parents should not generate delete (Electric.Integration.SubqueryMoveOutTest)
     test/integration/subquery_move_out_test.exs:225
     Received unexpected message: %Electric.Client.Message.ControlMessage{control: :must_refetch, global_last_seen_lsn: nil, handle: "8914986-1769094729363553-next", request_timestamp: ~U[2026-01-22 15:12:09.393600Z]}
     code: assert_update(consumer, %{"id" => "child-1"})
     stacktrace:
       test/integration/subquery_move_out_test.exs:251: (test)
Elixir.Electric.Integration.SubqueryMoveOutTest::test tag handling during updates update that changes parent reference updates tags

Flake rate in main: 22.81% (Passed 132 times, Failed 39 times)

Stack Traces | 0.165s run time
1) test tag handling during updates update that changes parent reference updates tags (Electric.Integration.SubqueryMoveOutTest)
     test/integration/subquery_move_out_test.exs:186
     Received unexpected message: %Electric.Client.Message.ControlMessage{control: :must_refetch, global_last_seen_lsn: nil, handle: "71804294-1769094750637532-next", request_timestamp: ~U[2026-01-22 15:12:30.662529Z]}
     code: update_msg = assert_update(consumer, %{"id" => "child-1"})
     stacktrace:
       test/integration/subquery_move_out_test.exs:209: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries NOT IN subquery should return 409 on move-in to subquery

Flake rate in main: 22.81% (Passed 132 times, Failed 39 times)

Stack Traces | 4.27s run time
41) test /v1/shapes - subqueries NOT IN subquery should return 409 on move-in to subquery (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2334
     match (=) failed
     code:  assert {_req, 409, _response} = Task.await(task)
     left:  {_req, 409, _response}
     right: {%{handle: "48257360-1769094670579084", offset: "0_inf", table: "child", where: "parent_id NOT IN (SELECT id FROM parent WHERE excluded = true)", live: true}, 200, [%{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => "687646384"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2364: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries nested subquery combined with OR should return a 409 on move-in

Flake rate in main: 20.72% (Passed 199 times, Failed 52 times)

Stack Traces | 4.32s run time
32) test /v1/shapes - subqueries nested subquery combined with OR should return a 409 on move-in (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2561
     match (=) failed
     code:  assert {_req, 409, _response} = Task.await(task)
     left:  {_req, 409, _response}
     right: {%{handle: "102897730-1769094681615495", offset: "0_inf", table: "child", where: "parent_id in (SELECT id FROM parent WHERE include_parent = true OR grandparent_id in (SELECT id FROM grandparent WHERE include_grandparent = true))", live: true}, 200, [%{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => "723188344"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2588: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries subquery combined with OR should return a 409 on move-in

Flake rate in main: 20.32% (Passed 200 times, Failed 51 times)

Stack Traces | 4.25s run time
8) test /v1/shapes - subqueries subquery combined with OR should return a 409 on move-in (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2527
     match (=) failed
     code:  assert {_req, 409, _response} = Task.await(task)
     left:  {_req, 409, _response}
     right: {%{handle: "122289184-1769094709608906", offset: "0_inf", table: "child", where: "parent_id in (SELECT id FROM parent WHERE include_parent = true) OR include_child = true", live: true}, 200, [%{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => "853661656"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2550: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries subquery combined with OR should return a 409 on move-out

Flake rate in main: 21.91% (Passed 196 times, Failed 55 times)

Stack Traces | 4.29s run time
66) test /v1/shapes - subqueries subquery combined with OR should return a 409 on move-out (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2495
     match (=) failed
     code:  assert {_req, 409, _response} = Task.await(task)
     left:  {_req, 409, _response}
     right: {%{handle: "27221551-1769094640755811", offset: "0_inf", table: "child", where: "parent_id in (SELECT id FROM parent WHERE include_parent = true) OR include_child = true", live: true}, 200, [%{"headers" => %{"control" => "up-to-date", "global_last_seen_lsn" => "575997568"}}]}
     stacktrace:
       .../electric/plug/router_test.exs:2518: (test)
Elixir.Electric.Plug.RouterTest::test /v1/shapes - subqueries supports two subqueries at the same level but returns 409 on move-in

Flake rate in main: 23.11% (Passed 163 times, Failed 49 times)

Stack Traces | 0.391s run time
65) test /v1/shapes - subqueries supports two subqueries at the same level but returns 409 on move-in (Electric.Plug.RouterTest)
     .../electric/plug/router_test.exs:2985
     match (=) failed
     code:  assert %{status: 200} = conn = Task.await(task)
     left:  %{status: 200}
     right: %Plug.Conn{adapter: {Plug.Adapters.Test.Conn, :...}, assigns: %{request: %Electric.Shapes.Api.Request{chunk_end_offset: LogOffset.new(580472200, 0), handle: "37888362-1769094645043578", last_offset: LogOffset.new(580472200, 0), global_last_seen_lsn: 0, new_changes_ref: nil, new_changes_pid: nil, api: %Electric.Shapes.Api{inspector: {Electric.Postgres.Inspector.EtsInspector, [stack_id: "Electric.Plug.RouterTest test /v1/shapes - subqueries supports two subqueries at the same level but returns 409 on move-in", server: {:via, Registry, {:"Electric.ProcessRegistry:Electric.Plug.RouterTest test /v1/shapes - subqueries supports two subqueries at the same level but returns 409 on move-in", {Electric.Postgres.Inspector.EtsInspector, nil}}}]}, shape: nil, stack_id: "Electric.Plug.RouterTest test /v1/shapes - subqueries supports two subqueries at the same level but returns 409 on move-in", feature_flags: ["allow_subqueries", "tagged_subqueries"], max_concurrent_requests: %{existing: 1000, initial: 300}, allow_shape_deletion: true, keepalive_interval: 21000, long_poll_timeout: 4000, sse_timeout: 60000, max_age: 60, stack_ready_timeout: 5000, stale_age: 300, send_cache_headers?: true, encoder: Electric.Shapes.Api.Encoder.JSON, sse_encoder: Electric.Shapes.Api.Encoder.SSE, configured: true}, params: %Electric.Shapes.Api.Params{table: "projects", offset: LogOffset.new(0, 0), handle: "37888362-1769094645043578", live: true, where: "workspace_id IN (SELECT workspace_id FROM workspace_members WHERE user_id = 100) AND id IN (SELECT project_id FROM project_members WHERE user_id = 100)", columns: nil, shape_definition: Shape.new!({21271, "public.projects"}, where: "workspace_id IN (SELECT workspace_id FROM public.workspace_members WHERE user_id = 100) AND id IN (SELECT project_id FROM public.project_members WHERE user_id = 100)", deps: [Shape.new!({21278, "public.workspace_members"}, where: "user_id = 100", columns: ["workspace_id"]), Shape.new!({21283, "public.project_members"}, where: "user_id = 100", columns: ["project_id"])]), replica: :default, params: %{}, experimental_compaction: false, live_sse: false, log: :full, subset: nil}, response: %Electric.Shapes.Api.Response{handle: "37888362-1769094645043578", offset: LogOffset.new(580472200, 0), shape_definition: Shape.new!({21271, "public.projects"}, where: "workspace_id IN (SELECT workspace_id FROM public.workspace_members WHERE user_id = 100) AND id IN (SELECT project_id FROM public.project_members WHERE user_id = 100)", deps: [Shape.new!({21278, "public.workspace_members"}, where: "user_id = 100", columns: ["workspace_id"]), Shape.new!({21283, "public.project_members"}, where: "user_id = 100", columns: ["project_id"])]), known_error: nil, retry_after: nil, api: %Electric.Shapes.Api{inspector: {Electric.Postgres.Inspector.EtsInspector, [stack_id: "Electric.Plug.RouterTest test /v1/shapes - subqueries supports two subqueries at the same level but returns 409 on move-in", server: {:via, Registry, {:"Electric.ProcessRegistry:Electric.Plug.RouterTest test /v1/shapes - subqueries supports two subqueries at the same level but returns 409 on move-in", {Electric.Postgres.Inspector.EtsInspector, nil}}}]}, shape: nil, stack_id: "Electric.Plug.RouterTest test /v1/shapes - subqueries supports two subqueries at the same level but returns 409 on move-in", feature_flags: ["allow_subqueries", "tagged_subqueries"], max_concurrent_requests: %{existing: 1000, initial: 300}, allow_shape_deletion: true, keepalive_interval: 21000, long_poll_timeout: 4000, sse_timeout: 60000, max_age: 60, stack_ready_timeout: 5000, stale_age: 300, send_cache_headers?: true, encoder: Electric.Shapes.Api.Encoder.JSON, sse_encoder: Electric.Shapes.Api.Encoder.SSE, configured: true}, chunked: false, up_to_date: true, no_changes: false, response_type: :normal_log, params: %Electric.Shapes.Api.Params{...}, ...}}, ...}, ...}
     stacktrace:
       .../electric/plug/router_test.exs:3024: (test)
Elixir.Electric.Shapes.ConsumerTest::test transaction handling with real storage ConsumerRegistry.enable_suspend should suspend hibernated consumers

Flake rate in main: 24.16% (Passed 135 times, Failed 43 times)

Stack Traces | 1.05s run time
2) test transaction handling with real storage ConsumerRegistry.enable_suspend should suspend hibernated consumers (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:904
     Assertion failed, no matching message after 1000ms
     Showing 1 of 1 message in the mailbox
     code: assert_receive {:flush_boundary_updated, 300}
     mailbox:
       pattern: {:flush_boundary_updated, 300}
       value:   {:snapshot, "27335419-1769094597804762"}
     stacktrace:
       .../electric/shapes/consumer_test.exs:928: (test)
Elixir.Electric.Shapes.ConsumerTest::test transaction handling with real storage UPDATE during pending move-in is converted to INSERT and query result skips duplicate key

Flake rate in main: 19.26% (Passed 218 times, Failed 52 times)

Stack Traces | 0.65s run time
37) test transaction handling with real storage UPDATE during pending move-in is converted to INSERT and query result skips duplicate key (Electric.Shapes.ConsumerTest)
     .../electric/shapes/consumer_test.exs:987
     Assertion failed, no matching message after 400ms
     The following variables were pinned:
       consumer_pid = #PID<0.5534.0>
     Showing 2 of 2 messages in the mailbox
     code: assert_receive {:query_requested, name, ^consumer_pid, results_fn}
     mailbox:
       pattern: {:query_requested, name, ^consumer_pid, results_fn}
       value:   {:snapshot, "26585834-1769094580205296"}

       pattern: {:query_requested, name, ^consumer_pid, results_fn}
       value:   {:snapshot, "51496110-1769094580220024"}
     stacktrace:
       .../electric/shapes/consumer_test.exs:1038: (test)

To view more test analytics, go to the Test Analytics Dashboard
📋 Got 3 mins? Take this short survey to help us improve Test Analytics.

@blacksmith-sh

This comment has been minimized.

@alco alco force-pushed the alco/consumer-write-txn-fragment-to-storage branch from 48b22be to e6b7397 Compare January 20, 2026 14:40
@alco alco changed the title Alco/consumer write txn fragment to storage feat(sync-service): Write transaction fragments directly to storage Jan 20, 2026
alco added 22 commits January 21, 2026 15:29
Add append_fragment_to_log!/2 and signal_txn_commit!/2 callbacks to
Storage behaviour for streaming transaction fragments directly to
storage without waiting for commit.

- Storage.ex: Define new callbacks in behaviour
- PureFileStorage: Implement callbacks (delegate to append_to_log! for now)
- InMemoryStorage: Implement callbacks
- CrashingFileStorage: Delegate to PureFileStorage
- TestStorage: Add wrapper implementations for testing

This lays groundwork for consumers without materializers to stream
fragments directly to storage, reducing memory usage for large
transactions.
Add state support for streaming transaction fragments directly to
storage for shapes without subquery dependencies.

- Create PendingTxn module to track in-progress transaction metadata
- Add fragment_direct? flag to State (true when no shape_dependencies)
- Add pending_txn field to track current incomplete transaction

The fragment_direct? flag is automatically set based on whether the
shape has subquery dependencies. Shapes without dependencies can
stream directly to storage.
…t dependencies

Add fragment-direct streaming mode that writes transaction fragments directly
to storage without buffering the complete transaction in memory. This reduces
memory pressure for shapes that don't have subquery dependencies.

Key changes:
- Add handle_fragment_direct/2 to process fragments directly to storage
- Add can_use_fragment_direct?/1 guard that checks 4 conditions:
  1. Shape has no subquery dependencies (fragment_direct? flag)
  2. Not buffering for initial snapshot
  3. No materializer subscribed (inner shapes need full txn handling)
  4. Initial snapshot filtering is complete
- Add helper functions for fragment processing:
  - skip_fragment?/2 - skip already processed fragments
  - maybe_start_pending_txn/2 - initialize PendingTxn on begin
  - write_fragment_to_storage/2 - write changes via append_fragment_to_log!
  - maybe_complete_pending_txn/2 - finalize on commit, notify clients
  - notify_clients_of_new_changes/2 - notify without materializer
- Update tests with assert_storage_append/refute_storage_append macros
  to handle both append_to_log! and append_fragment_to_log! calls
- Add consider_flushed_fragment/2 to properly notify flush boundaries when
  a transaction has no relevant changes (empty transaction handling)
- Handle nil pending_txn in write_fragment_to_storage/2 for recovery scenarios
  where a middle fragment arrives without the begin fragment
- Handle nil pending_txn in maybe_complete_pending_txn/2 for commit-only
  fragments or other recovery edge cases
Add new test suite for fragment-direct streaming edge cases:

- Multi-fragment transaction handling: Verifies that large transactions
  spanning multiple fragments are correctly accumulated and written
- Empty transaction flush boundary: Tests that transactions with no
  relevant changes still notify the flush boundary
- Truncate operation: Verifies truncate triggers shape removal
- Skipped fragments during recovery: Tests idempotency when fragments
  are replayed
- Shapes with subquery dependencies: Verifies inner shapes use
  TransactionBuilder (not fragment-direct mode)
- Initial filtering phase: Tests that fragment-direct mode is disabled
  during initial snapshot filtering
- Different table filtering: Verifies changes for other tables are
  filtered out
- Mixed fragments: Tests fragments with both relevant and irrelevant
  changes
Fragment-direct streaming was bypassing Shape.convert_change which handles:
1. Filtering changes for the shape's root table
2. Applying WHERE clause filtering
3. Converting UPDATEs to INSERTs/DELETEs for move-in/move-out scenarios

Also add maybe_mark_last_change/2 to set last?: true on the final change
in commit fragments, which is needed for clients to know when a transaction
is complete.

This fixes the router test 'GET returns correct INSERT and DELETE operations
that have been converted from UPDATEs' which was failing because UPDATEs
that moved rows in/out of a shape's filter were not being converted.
Remove repeated in-function alias calls for PendingTxn and add a single
alias at the top of the module with the other Consumer.* aliases.
Also alphabetize the Consumer.* alias block.
Update append_fragment_to_log! and signal_txn_commit! in both
PureFileStorage and InMemoryStorage to ensure that:

- append_fragment_to_log! writes log lines but does NOT advance
  last_seen_txn_offset or update @latest_offset_key
- signal_txn_commit! completes transaction tracking and updates
  the latest offset

This ensures that on crash/recovery, fetch_latest_offset returns
the last committed transaction offset, not a mid-transaction offset.
This is critical for correct recovery behavior when using fragment-
direct streaming mode.
Add explicit check in maybe_start_pending_txn/2 to raise an error when
receiving a begin fragment for a new transaction while another transaction
is still pending. This is a defensive measure to catch unexpected
replication behavior early rather than silently corrupting state.
Add tests to verify:
- Interleaved begin fragments raise an error
- Crash/restart with partial fragments persisted recovers correctly
- Commit-only fragment with no relevant changes still signals commit
- Flush-before-commit does not advance flush boundary beyond last
  committed offset

These tests ensure the commit-gated storage semantics work correctly
in the fragment-direct streaming mode.
…eaming

Update the expected log message from 'Txn received in Shapes.Consumer'
to 'Completed fragment-direct transaction' since the consumer now uses
fragment-direct streaming which emits different log messages.
It doesn't matter whether the xid of the pending txn and the newly
arrived one is. Seeing a Begin before a Commit is an error regardless.
- Test mid-fragment without prior begin creates pending_txn on-the-fly
- Test commit-only fragment with no prior fragments processes correctly
- Test uncommitted fragments flushed to disk do not advance latest_offset

These tests verify recovery scenarios and the core crash-safety invariant
that fetch_latest_offset only returns committed transaction offsets.
…assert_receive

Replace assert_storage_append and refute_storage_append helper macros with
direct assert_receive/refute_receive calls using :append_to_log!.

The tests in the 'event handling' describe block use :append_to_log! because
the Consumer does not use fragment-direct streaming during the initial
filtering phase (filtering? flag is true until a transaction with xid > xmax
is processed).
Multiple calls to Storage.for_stack(...) inside PureFileStorage weren't
prepared to deal with a possible return value of {TestStorage, ....}.

This change provides a more direct function for fetching the wrapped
storage options inside that storage's implementation module.
@alco alco force-pushed the alco/consumer-write-txn-fragment-to-storage branch from e6b7397 to 9a65547 Compare January 22, 2026 13:48
@netlify
Copy link

netlify bot commented Jan 22, 2026

Deploy Preview for electric-next ready!

Name Link
🔨 Latest commit 9a65547
🔍 Latest deploy log https://app.netlify.com/projects/electric-next/deploys/69722ab88cf319000874df2f
😎 Deploy Preview https://deploy-preview-3740--electric-next.netlify.app
📱 Preview on mobile
Toggle QR Code...

QR Code

Use your smartphone camera to open QR code link.

To edit notification comments on pull requests, go to your Netlify project configuration.

@blacksmith-sh
Copy link
Contributor

blacksmith-sh bot commented Jan 22, 2026

Found 92 test failures on Blacksmith runners:

Failures

Test View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test resume preserves move-out state move-out after resume generates synthetic delete
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test resume preserves move-out state move-out after resume generates synthetic delete
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables deleting parent row triggers move-out
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables deleting parent row triggers move-out
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables move-in after row becomes visible through different parent
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables move-in after row becomes visible through different parent
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables move-out generates synthetic deletes for all affected child rows
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables move-out generates synthetic deletes for all affected child rows
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables receives move-out control message when parent is deactivated
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/test subquery move-out with parent/
child tables receives move-out control message when parent is deactivated
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test tag handling during updates deactivating old parent after child changed parents sh
ould not generate delete
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test tag handling during updates deactivating old parent after child changed parents sh
ould not generate delete
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test tag handling during updates update that changes parent reference updates tags
View Logs
Elixir.Electric.Integration.SubqueryMoveOutTest/
test tag handling during updates update that changes parent reference updates tags
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries a move-in from the inner shape causes a query and new entries in th
e outer shape
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries a move-in from the inner shape causes a query and new entries in th
e outer shape
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries a move-out from the inner shape is propagated to the outer shape
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries a move-out from the inner shape is propagated to the outer shape
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows 3 level subquery in where clauses
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows 3 level subquery in where clauses
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows subquery in where clause
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows subquery in where clause
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows subquery in where clauses that reference non-PK columns
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries allows subquery in where clauses that reference non-PK columns
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries move-in into move-out into move-in of the same parent results in a
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries move-in into move-out into move-in of the same parent results in a
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries move-outs while processing move-ins are handled correctly
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries move-outs while processing move-ins are handled correctly
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries multiple actions result in a correct event sequence
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries multiple actions result in a correct event sequence
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries nested subquery combined with OR should return a 409 on move-in
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries nested subquery combined with OR should return a 409 on move-in
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries NOT IN subquery should return 409 on move-in to subquery
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries NOT IN subquery should return 409 on move-in to subquery
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subqueries can reference composite PKs
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subqueries can reference composite PKs
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subqueries work with params
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subqueries work with params
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subquery combined with OR should return a 409 on move-in
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subquery combined with OR should return a 409 on move-in
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subquery combined with OR should return a 409 on move-out
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries subquery combined with OR should return a 409 on move-out
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries supports two subqueries at the same level but returns 409 on move-i
n
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries supports two subqueries at the same level but returns 409 on move-i
n
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries table with a composite PK can be used in a subquery
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes - subqueries table with a composite PK can be used in a subquery
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET after a compaction proceeds correctly
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET after a compaction proceeds correctly
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET receives only specified columns out of wide table
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET receives only specified columns out of wide table
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET returns correct INSERT and DELETE operations that have been converted from U
PDATEs
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET returns correct INSERT and DELETE operations that have been converted from U
PDATEs
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET returns updated schema in header after column nullability changes
View Logs
Elixir.Electric.Plug.RouterTest/test /v1/
shapes GET returns updated schema in header after column nullability changes
View Logs
Elixir.Electric.Shapes.ConsumerTest/test event handling appends to log when xid >= xmin View Logs
Elixir.Electric.Shapes.ConsumerTest/test event handling appends to log when xid >= xmin View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling correctly writes only relevant changes to multiple shape logs
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling correctly writes only relevant changes to multiple shape logs
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling doesn't append to log when change is irrelevant for active shapes
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling doesn't append to log when change is irrelevant for active shapes
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling notifies listeners of new changes
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test event handling notifies listeners of new changes
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming commit-only fragment with no prior fragments processes c
orrectly
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming commit-only fragment with no prior fragments processes c
orrectly
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming commit-only fragment with no relevant changes still sign
als commit
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming commit-only fragment with no relevant changes still sign
als commit
View Logs
Elixir.Electric.Shapes.ConsumerTest/test fragment-direct streaming crash/
restart with partial fragments persisted recovers correctly
View Logs
Elixir.Electric.Shapes.ConsumerTest/test fragment-direct streaming crash/
restart with partial fragments persisted recovers correctly
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming fragment-direct mode disabled during initial filtering p
hase
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming fragment-direct mode disabled during initial filtering p
hase
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming interleaved begin fragments raise an error
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming interleaved begin fragments raise an error
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming mid-fragment without prior begin creates pending_txn on-
the-fly
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming mid-fragment without prior begin creates pending_txn on-
the-fly
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming skipped fragments during recovery still notify flush bou
ndary
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming skipped fragments during recovery still notify flush bou
ndary
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming uncommitted fragments flushed to disk do not advance lat
est_offset
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test fragment-direct streaming uncommitted fragments flushed to disk do not advance lat
est_offset
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage ConsumerRegistry.enable_suspend should susp
end hibernated consumers
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage ConsumerRegistry.enable_suspend should susp
end hibernated consumers
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should hibernate not suspend if has depende
ncies
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should hibernate not suspend if has depende
ncies
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should notify txns skipped because of xmin/
xip as flushed
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should notify txns skipped because of xmin/
xip as flushed
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should terminate after :hibernate_after ms
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage should terminate after :hibernate_after ms
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage transactions are buffered until snapshot xm
in is known
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage transactions are buffered until snapshot xm
in is known
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage UPDATE during pending move-in is converted
to INSERT and query result skips duplicate key
View Logs
Elixir.Electric.Shapes.ConsumerTest/
test transaction handling with real storage UPDATE during pending move-in is converted
to INSERT and query result skips duplicate key
View Logs
Elixir.Electric.Shapes.PartitionedTablesTest/
test new partition tables prompt reload of relation info
View Logs
Elixir.Electric.Shapes.PartitionedTablesTest/
test new partition tables prompt reload of relation info
View Logs

Fix in Cursor

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

Avoid holding whole transaction in consumer memory

2 participants