feat(sync-service): Write transaction fragments directly to storage #3740
base: main
❌ 56 tests failed. 14 tests were flagged as flaky.
48b22be to e6b7397
Add append_fragment_to_log!/2 and signal_txn_commit!/2 callbacks to the Storage behaviour for streaming transaction fragments directly to storage without waiting for commit.

- Storage.ex: Define new callbacks in behaviour
- PureFileStorage: Implement callbacks (delegate to append_to_log! for now)
- InMemoryStorage: Implement callbacks
- CrashingFileStorage: Delegate to PureFileStorage
- TestStorage: Add wrapper implementations for testing

This lays groundwork for consumers without materializers to stream fragments directly to storage, reducing memory usage for large transactions.
Add state support for streaming transaction fragments directly to storage for shapes without subquery dependencies.

- Create PendingTxn module to track in-progress transaction metadata
- Add fragment_direct? flag to State (true when no shape_dependencies)
- Add pending_txn field to track current incomplete transaction

The fragment_direct? flag is automatically set based on whether the shape has subquery dependencies. Shapes without dependencies can stream directly to storage.
…t dependencies

Add fragment-direct streaming mode that writes transaction fragments directly to storage without buffering the complete transaction in memory. This reduces memory pressure for shapes that don't have subquery dependencies.

Key changes:
- Add handle_fragment_direct/2 to process fragments directly to storage
- Add can_use_fragment_direct?/1 guard that checks 4 conditions:
  1. Shape has no subquery dependencies (fragment_direct? flag)
  2. Not buffering for initial snapshot
  3. No materializer subscribed (inner shapes need full txn handling)
  4. Initial snapshot filtering is complete
- Add helper functions for fragment processing:
  - skip_fragment?/2 - skip already processed fragments
  - maybe_start_pending_txn/2 - initialize PendingTxn on begin
  - write_fragment_to_storage/2 - write changes via append_fragment_to_log!
  - maybe_complete_pending_txn/2 - finalize on commit, notify clients
  - notify_clients_of_new_changes/2 - notify without materializer
- Update tests with assert_storage_append/refute_storage_append macros to handle both append_to_log! and append_fragment_to_log! calls
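The four-condition guard described above can be sketched roughly as follows. This is only an illustration, not the code from this PR: the module name, the plain-map state, and every field name except fragment_direct? (buffering?, materializer_subscribed?, filtering?) are assumptions chosen to mirror the wording of the commit message.

```elixir
defmodule FragmentDirectGuardSketch do
  # Hypothetical consumer state as a plain map. Only `fragment_direct?`
  # is named in the commit message; the other keys are assumptions.
  def new_state(overrides \\ []) do
    defaults = %{
      fragment_direct?: true,
      buffering?: false,
      materializer_subscribed?: false,
      filtering?: false
    }

    Map.merge(defaults, Map.new(overrides))
  end

  # Returns true only when all four conditions hold:
  # 1. the shape has no subquery dependencies,
  # 2. the consumer is not buffering for the initial snapshot,
  # 3. no materializer is subscribed,
  # 4. initial snapshot filtering is complete.
  def can_use_fragment_direct?(state) do
    state.fragment_direct? and
      not state.buffering? and
      not state.materializer_subscribed? and
      not state.filtering?
  end
end

state = FragmentDirectGuardSketch.new_state(filtering?: true)
IO.inspect(FragmentDirectGuardSketch.can_use_fragment_direct?(state))
```

Keeping the check as a single predicate means any one failing condition sends the fragment down the existing full-transaction path.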
- Add consider_flushed_fragment/2 to properly notify flush boundaries when a transaction has no relevant changes (empty transaction handling)
- Handle nil pending_txn in write_fragment_to_storage/2 for recovery scenarios where a middle fragment arrives without the begin fragment
- Handle nil pending_txn in maybe_complete_pending_txn/2 for commit-only fragments or other recovery edge cases
Add new test suite for fragment-direct streaming edge cases:

- Multi-fragment transaction handling: Verifies that large transactions spanning multiple fragments are correctly accumulated and written
- Empty transaction flush boundary: Tests that transactions with no relevant changes still notify the flush boundary
- Truncate operation: Verifies truncate triggers shape removal
- Skipped fragments during recovery: Tests idempotency when fragments are replayed
- Shapes with subquery dependencies: Verifies inner shapes use TransactionBuilder (not fragment-direct mode)
- Initial filtering phase: Tests that fragment-direct mode is disabled during initial snapshot filtering
- Different table filtering: Verifies changes for other tables are filtered out
- Mixed fragments: Tests fragments with both relevant and irrelevant changes
Fragment-direct streaming was bypassing Shape.convert_change which handles:

1. Filtering changes for the shape's root table
2. Applying WHERE clause filtering
3. Converting UPDATEs to INSERTs/DELETEs for move-in/move-out scenarios

Also add maybe_mark_last_change/2 to set last?: true on the final change in commit fragments, which is needed for clients to know when a transaction is complete.

This fixes the router test 'GET returns correct INSERT and DELETE operations that have been converted from UPDATEs' which was failing because UPDATEs that moved rows in/out of a shape's filter were not being converted.
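To illustrate the move-in/move-out conversion mentioned above, here is a minimal sketch. It is not Shape.convert_change itself: the module name, the change maps (op, old, new) and the in_shape? predicate that stands in for the shape's WHERE clause are all hypothetical, and root-table filtering is left out for brevity.

```elixir
defmodule ConvertChangeSketch do
  # Hypothetical change representation:
  #   %{op: :insert | :update | :delete, old: map | nil, new: map | nil}
  # `in_shape?` is an assumed one-argument predicate standing in for
  # the shape's WHERE clause.

  # An UPDATE is kept, converted, or dropped depending on whether the
  # old and new row versions match the shape's filter.
  def convert_change(%{op: :update, old: old, new: new} = change, in_shape?) do
    case {in_shape?.(old), in_shape?.(new)} do
      # Row stays inside the shape: pass the UPDATE through.
      {true, true} -> [change]
      # Move-in: clients have never seen this row, so emit an INSERT.
      {false, true} -> [%{op: :insert, old: nil, new: new}]
      # Move-out: the row leaves the shape, so emit a DELETE.
      {true, false} -> [%{op: :delete, old: old, new: nil}]
      # Row was never in the shape and still is not: drop the change.
      {false, false} -> []
    end
  end

  def convert_change(%{op: :insert, new: new} = change, in_shape?) do
    if in_shape?.(new), do: [change], else: []
  end

  def convert_change(%{op: :delete, old: old} = change, in_shape?) do
    if in_shape?.(old), do: [change], else: []
  end
end

active? = fn row -> row.status == "active" end

update = %{
  op: :update,
  old: %{id: 1, status: "done"},
  new: %{id: 1, status: "active"}
}

IO.inspect(ConvertChangeSketch.convert_change(update, active?))
```

Without this step, a client would receive an UPDATE for a row it has never seen (move-in) or keep a row that no longer matches the filter (move-out).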
Remove repeated in-function alias calls for PendingTxn and add a single alias at the top of the module with the other Consumer.* aliases. Also alphabetize the Consumer.* alias block.
Update append_fragment_to_log! and signal_txn_commit! in both PureFileStorage and InMemoryStorage to ensure that:

- append_fragment_to_log! writes log lines but does NOT advance last_seen_txn_offset or update @latest_offset_key
- signal_txn_commit! completes transaction tracking and updates the latest offset

This ensures that on crash/recovery, fetch_latest_offset returns the last committed transaction offset, not a mid-transaction offset. This is critical for correct recovery behavior when using fragment-direct streaming mode.
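The commit-gated semantics described above can be illustrated with a toy in-memory log. This is a sketch under stated assumptions, not the PureFileStorage or InMemoryStorage implementation: the module name, the map-based state, the argument order, and the {tx_offset, op_offset} tuple offsets are all hypothetical. Only the function names and arities come from the commit messages.

```elixir
defmodule CommitGatedLogSketch do
  # Hypothetical in-memory stand-in for a storage backend.
  def new, do: %{lines: [], latest_offset: nil}

  # Appends log lines but deliberately leaves `latest_offset` untouched,
  # so a crash mid-transaction never exposes a mid-transaction offset.
  def append_fragment_to_log!(log_items, state) do
    %{state | lines: state.lines ++ log_items}
  end

  # Only a commit advances the offset that recovery reads back.
  def signal_txn_commit!(commit_offset, state) do
    %{state | latest_offset: commit_offset}
  end

  def fetch_latest_offset(state), do: state.latest_offset
end

state = CommitGatedLogSketch.new()
state = CommitGatedLogSketch.append_fragment_to_log!([{{10, 0}, "insert"}], state)

# The fragment is in the log, but the latest offset has not moved yet.
IO.inspect(CommitGatedLogSketch.fetch_latest_offset(state))

state = CommitGatedLogSketch.signal_txn_commit!({10, 0}, state)
IO.inspect(CommitGatedLogSketch.fetch_latest_offset(state))
```

Separating the write from the offset update is what lets uncommitted fragment data reach disk without changing what a restarted consumer treats as its resume point.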
Add explicit check in maybe_start_pending_txn/2 to raise an error when receiving a begin fragment for a new transaction while another transaction is still pending. This is a defensive measure to catch unexpected replication behavior early rather than silently corrupting state.
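A rough sketch of such a defensive check follows. The module name, the fragment map with has_begin? and xid keys, and the choice of ArgumentError are assumptions for illustration; the actual PendingTxn struct and error type in the PR may differ.

```elixir
defmodule PendingTxnGuardSketch do
  # Hypothetical fragment representation: %{xid: integer, has_begin?: boolean}.
  # Hypothetical state: %{pending_txn: nil | %{xid: integer}}.

  # No transaction is pending and the fragment carries a begin:
  # start tracking the new transaction.
  def maybe_start_pending_txn(%{pending_txn: nil} = state, %{has_begin?: true, xid: xid}) do
    %{state | pending_txn: %{xid: xid}}
  end

  # A begin arrives while another transaction is still pending:
  # fail loudly instead of silently overwriting the tracked transaction.
  def maybe_start_pending_txn(%{pending_txn: pending}, %{has_begin?: true, xid: xid}) do
    raise ArgumentError,
          "begin for xid #{xid} arrived while xid #{pending.xid} is still pending"
  end

  # Middle or commit-only fragments leave the pending transaction as is.
  def maybe_start_pending_txn(state, _fragment), do: state
end

state = %{pending_txn: nil}
state = PendingTxnGuardSketch.maybe_start_pending_txn(state, %{has_begin?: true, xid: 7})
IO.inspect(state)
```

Raising here trades availability for safety: the consumer crashes on unexpected replication input rather than mixing changes from two transactions.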
Add tests to verify:

- Interleaved begin fragments raise an error
- Crash/restart with partial fragments persisted recovers correctly
- Commit-only fragment with no relevant changes still signals commit
- Flush-before-commit does not advance flush boundary beyond last committed offset

These tests ensure the commit-gated storage semantics work correctly in the fragment-direct streaming mode.
…eaming

Update the expected log message from 'Txn received in Shapes.Consumer' to 'Completed fragment-direct transaction' since the consumer now uses fragment-direct streaming which emits different log messages.
It doesn't matter whether the xids of the pending txn and the newly arrived one match. Seeing a Begin before a Commit is an error regardless.
- Test mid-fragment without prior begin creates pending_txn on-the-fly
- Test commit-only fragment with no prior fragments processes correctly
- Test uncommitted fragments flushed to disk do not advance latest_offset

These tests verify recovery scenarios and the core crash-safety invariant that fetch_latest_offset only returns committed transaction offsets.
…assert_receive

Replace assert_storage_append and refute_storage_append helper macros with direct assert_receive/refute_receive calls using :append_to_log!.

The tests in the 'event handling' describe block use :append_to_log! because the Consumer does not use fragment-direct streaming during the initial filtering phase (filtering? flag is true until a transaction with xid > xmax is processed).
Multiple calls to Storage.for_stack(...) inside PureFileStorage weren't
prepared to deal with a possible return value of {TestStorage, ....}.
This change provides a more direct function for fetching the wrapped
storage options inside that storage's implementation module.
e6b7397 to 9a65547
✅ Deploy Preview for electric-next ready!
Found 92 test failures on Blacksmith runners.
Summary
Implements fragment-direct streaming for shape consumers, allowing transaction fragments to be written directly to storage as they arrive instead of buffering the entire transaction in memory until commit.
Closes #3415
Motivation
Previously, the consumer would buffer all changes in a transaction in memory until a Commit message arrived. For large transactions, this could cause significant memory pressure. This PR enables shapes without subquery dependencies to stream fragments directly to storage, reducing memory usage while maintaining crash-safety guarantees.
Key Changes
append_fragment_to_log!/2
fetch_latest_offset only returns committed transaction offsets, ensuring recovery correctness even if uncommitted fragment data is flushed to disk
Limitations
Fragment-direct mode is only enabled when: