feat: add state materialization across regions#4490
aglinxinyuan wants to merge 21 commits into main
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## main #4490 +/- ##
============================================
+ Coverage 42.68% 42.70% +0.01%
- Complexity 2183 2185 +2
============================================
Files 1031 1005 -26
Lines 38110 37506 -604
Branches 4004 3921 -83
============================================
- Hits 16269 16017 -252
+ Misses 20824 20517 -307
+ Partials 1017 972 -45
Adds 9 unit tests targeting the codecov-flagged gaps in PR #4490:
- InputPortMaterializationReaderRunnable.run() inner state-reading try-block, including the missing-state-document path (ValueError swallow).
- DocumentFactory.create_document / open_document namespace routing for STATE and RESULT, plus the unsupported-resource-type and missing-table error paths.
Iceberg dependencies are mocked at the document_factory import site so the tests run without Postgres.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
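The missing-state-document path that these tests cover can be pictured roughly as follows. This is a minimal sketch, not the code in the PR: `read_state_rows` and the `open_document` callable are hypothetical names, and the only behavior assumed from the commit message is that a `ValueError` raised while opening the state document is swallowed.

```python
def read_state_rows(open_document, state_uri):
    """Return all state rows, or an empty list if no state document exists.

    `open_document` is a hypothetical callable that raises ValueError when
    the table behind `state_uri` is missing.
    """
    try:
        document = open_document(state_uri)
    except ValueError:
        # Missing state document: there is nothing to replay.
        return []
    return list(document)


def missing_opener(uri):
    # Hypothetical opener that simulates a missing state table.
    raise ValueError(f"no table for {uri}")


print(read_state_rows(missing_opener, "some-state-uri"))
print(read_state_rows(lambda uri: iter(["s1", "s2"]), "some-state-uri"))
```

A later commit in this PR provisions the state document at scheduling time, so this swallow may no longer be needed on the final branch.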
Pull request overview
This PR introduces state materialization as a first-class storage artifact alongside result materialization so that operator state can be persisted and later replayed when execution crosses region boundaries, using a shared serialization format across Scala/Java and Python.
Changes:
- Add a new VFS resource type (state) and route it to a dedicated Iceberg namespace/config on both Scala and Python sides.
- Persist state rows (serialized) into a separate Iceberg table per operator output, and read them back during input-port materialization.
- Add round-trip tests for state materialization in both Scala and Python.
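The namespace routing described in the first bullet can be sketched as a lookup from resource type to namespace. The enum, the namespace strings, and `namespace_for` below are illustrative assumptions; in the PR the namespaces come from StorageConfig and the routing lives in DocumentFactory.

```python
from enum import Enum


class ResourceType(Enum):
    # Hypothetical stand-in for the VFS resource types named in this PR.
    RESULT = "result"
    STATE = "state"


# Hypothetical namespace values; the real ones are read from StorageConfig.
NAMESPACES = {
    ResourceType.RESULT: "example-result-namespace",
    ResourceType.STATE: "example-state-namespace",
}


def namespace_for(resource_type):
    """Return the Iceberg namespace that a resource type is routed to."""
    try:
        return NAMESPACES[resource_type]
    except KeyError:
        # Mirrors the "unsupported resource type" error path in spirit only.
        raise ValueError(f"Unsupported resource type: {resource_type}") from None


print(namespace_for(ResourceType.STATE))
```

Keeping the mapping in one place means create and open cannot disagree about where a state table lives.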
Reviewed changes
Copilot reviewed 24 out of 24 changed files in this pull request and generated 8 comments.
| File | Description |
|---|---|
| common/workflow-core/src/test/scala/org/apache/texera/amber/storage/result/iceberg/IcebergDocumentSpec.scala | Adds Scala tests for round-tripping materialized state rows via Iceberg. |
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/VFSURIFactory.scala | Introduces STATE as a new VFS resource type. |
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/storage/DocumentFactory.scala | Routes STATE resource type to a configurable Iceberg namespace (create/open). |
| common/workflow-core/src/main/scala/org/apache/texera/amber/core/state/State.scala | Adds helper to derive a state URI from a result URI. |
| common/config/src/main/scala/org/apache/texera/amber/config/StorageConfig.scala | Adds icebergTableStateNamespace configuration and env var constant. |
| common/config/src/main/scala/org/apache/texera/amber/config/EnvironmentalVariable.scala | Adds env var constant for state namespace. |
| common/config/src/main/resources/storage.conf | Adds storage.iceberg.table.state-namespace default + env override. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/managers/InputPortMaterializationReaderThread.scala | Reads materialized state table and emits StateFrames to the input queue. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessor.scala | Attempts to persist state when processing state messages. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/scheduling/RegionExecutionCoordinator.scala | Creates the state table alongside the result table during scheduling/setup. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/pythonworker/PythonWorkflowWorker.scala | Passes state namespace into the Python worker process args. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/messaginglayer/OutputManager.scala | Adds state persistence to storage for output ports with materialization enabled. |
| amber/src/main/python/texera_run_python_worker.py | Extends CLI arg parsing to accept state namespace and initialize StorageConfig accordingly. |
| amber/src/main/python/pytexera/storage/test_large_binary_manager.py | Updates test StorageConfig initialization to include the state namespace. |
| amber/src/main/python/core/storage/vfs_uri_factory.py | Adds STATE as a Python VFS resource type. |
| amber/src/main/python/core/storage/test_document_factory.py | Adds Python unit tests verifying namespace routing for RESULT vs STATE. |
| amber/src/main/python/core/storage/storage_config.py | Adds ICEBERG_TABLE_STATE_NAMESPACE and wires it into initialization. |
| amber/src/main/python/core/storage/runnables/test_input_port_materialization_reader_runnable.py | Adds Python tests for emitting state frames and state-table read behavior. |
| amber/src/main/python/core/storage/runnables/input_port_materialization_reader_runnable.py | Reads and emits state materialization rows before tuple materialization. |
| amber/src/main/python/core/storage/iceberg/test_iceberg_document.py | Adds Python integration-ish tests for round-tripping materialized state rows. |
| amber/src/main/python/core/storage/document_factory.py | Routes STATE to the state namespace for create/open. |
| amber/src/main/python/core/models/state.py | Adds helper to derive a state URI from a result URI. |
| amber/src/main/python/core/architecture/packaging/test_output_manager.py | Adds unit tests for Python OutputManager state persistence behavior. |
| amber/src/main/python/core/architecture/packaging/output_manager.py | Adds state persistence API and records storage URIs for later state writes. |
Adds a state-materialization path alongside the existing tuple-result storage. State produced by an operator's processState is written to a companion Iceberg table whose URI is derived from the result URI. The input-port materialization reader replays both tuples and states into downstream workers.
Key pieces:
- New STATE resource type and a state-namespace storage config entry on both Python and Scala sides; namespaces are read from StorageConfig instead of hardcoded strings.
- RegionExecutionCoordinator provisions a state document next to every result document at scheduling time, so readers and writers can rely on its presence without try/catch.
- One long-lived BufferedItemWriter per output port, opened at port setup and closed at port completion, so a single Iceberg snapshot is produced per port instead of one per state.
- DataProcessor.processInputState (Scala) and MainLoop.process_input_state (Python) persist the executor's *output* state, matching the state that is also emitted downstream.
- New Python and Scala unit tests covering the State JSON wire format, the OutputManager state-writer lifecycle, the reader's state-replay block, and DocumentFactory namespace routing.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
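The writer lifecycle in the third bullet can be illustrated with a toy model. `CountingWriter` and `PortStateWriters` are hypothetical classes, not the PR's BufferedItemWriter or OutputManager; the sketch assumes only that a commit happens when a writer is closed, which is why one open/close pair per port yields one snapshot.

```python
class CountingWriter:
    """Hypothetical buffered writer that counts commits ("snapshots")."""

    def __init__(self):
        self.buffer = []
        self.committed = []
        self.snapshots = 0
        self.is_open = False

    def open(self):
        self.is_open = True

    def put_one(self, item):
        if not self.is_open:
            raise RuntimeError("writer is closed")
        self.buffer.append(item)

    def close(self):
        # Assumption: all buffered rows are committed together on close.
        if self.buffer:
            self.committed.extend(self.buffer)
            self.buffer.clear()
            self.snapshots += 1
        self.is_open = False


class PortStateWriters:
    """Keeps one long-lived state writer per output port."""

    def __init__(self):
        self._writers = {}

    def setup_port(self, port_id):
        writer = CountingWriter()
        writer.open()
        self._writers[port_id] = writer

    def save_state(self, state):
        # Every port's state table receives the same state row.
        for writer in self._writers.values():
            writer.put_one(state)

    def complete_port(self, port_id):
        writer = self._writers.pop(port_id)
        writer.close()
        return writer


ports = PortStateWriters()
ports.setup_port("out-0")
ports.save_state({"count": 1})
ports.save_state({"count": 2})
closed = ports.complete_port("out-0")
print(closed.snapshots, len(closed.committed))
```

Opening and closing a writer per state would instead increment the snapshot count once per state, which is the cost this design avoids.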
10f243f to 581d574
Pull request overview
Copilot reviewed 26 out of 26 changed files in this pull request and generated 1 comment.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Pull request overview
Copilot reviewed 25 out of 25 changed files in this pull request and generated 2 comments.
Xiao-zhen-Liu left a comment
Left some comments.
Address PR #4490 review comment 3192875005: document why the input-port materialization reader replays states before tuples (downstream operators typically need their state in place before processing the incoming tuples). Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
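The ordering rationale can be shown with a small replay sketch. The `replay` function and the `("state", ...)` / `("tuple", ...)` tagging are illustrative assumptions, not the reader's actual frame types.

```python
from collections import deque


def replay(state_rows, tuple_rows):
    """Enqueue all materialized states first, then all tuples."""
    queue = deque()
    for state in state_rows:
        queue.append(("state", state))
    for row in tuple_rows:
        queue.append(("tuple", row))
    return queue


queue = replay([{"threshold": 10}], [{"value": 3}, {"value": 42}])
print([kind for kind, _ in queue])
```

A downstream operator draining this queue sees its state before the first tuple arrives, so tuple processing never runs against missing state.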
Address PR #4490 review comment 3192889029: explain why the state loop intentionally enqueues every row to every downstream worker while the tuple loop filters by partitioner -- state is shared context, not per-key data. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
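The difference between the two loops can be sketched as below. The `route` function is hypothetical, and hash partitioning on a key is an assumption made for illustration; the PR only says that tuples are filtered by the partitioner while states are not.

```python
def route(state_rows, tuple_rows, num_workers, key):
    """Broadcast every state to every worker; partition tuples by key."""
    inboxes = [[] for _ in range(num_workers)]
    for state in state_rows:
        # State is shared context: each worker gets the full set.
        for inbox in inboxes:
            inbox.append(("state", state))
    for row in tuple_rows:
        # Tuples are per-key data: each one goes to exactly one worker.
        # (Assumed hash partitioning; integer keys keep this deterministic.)
        inboxes[hash(key(row)) % num_workers].append(("tuple", row))
    return inboxes


inboxes = route(["s1"], [{"k": 0}, {"k": 1}, {"k": 2}], 2, lambda row: row["k"])
for worker_id, inbox in enumerate(inboxes):
    print(worker_id, inbox)
```

With two workers, each inbox contains the state `s1`, while the three tuples are split between the inboxes.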
Address PR #4490 review comment 3192916602: writing the same state row to every output port's state table mirrors the broadcast-to-all-workers behavior on the emit side -- state is shared context, not per-key data, so every downstream operator needs the full set. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
…eader Address PR #4490 review comment 3192901027: the partitioner detour in the Python reader was a no-op (every worker is supposed to see every state, so the broadcast-then-filter round-trip just reduced back to the input). Emit StateFrame(State.from_tuple(row)) directly in run(), matching the Scala reader. Test class TestEmitStateWithFilter is dropped; the run() block test asserts partitioner.flush_state is not called. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Drop the result-as-primary asymmetry in VFSURIFactory: ports now build a single base URI, with result and state URIs as equal first-class derivatives via resultURI(base) / stateURI(base). Removes the substring-replace `siblingStateURI` helper and the asymmetric createResultURI / createStateURI pair.
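The symmetric derivation can be sketched as two functions over a shared base. The URI layout, the path segments, and the function names here are hypothetical and do not reproduce the format VFSURIFactory actually emits; the point is only that neither URI is computed from the other.

```python
def base_uri(workflow_id, execution_id, operator_id, port_id):
    # Hypothetical layout, for illustration only.
    return (
        f"vfs:///wid/{workflow_id}/eid/{execution_id}"
        f"/opid/{operator_id}/portid/{port_id}"
    )


def result_uri(base):
    """Derive the result URI from the shared base."""
    return f"{base}/result"


def state_uri(base):
    """Derive the state URI from the same base, with no string replacement."""
    return f"{base}/state"


base = base_uri(1, 7, "filter-op", 0)
print(result_uri(base))
print(state_uri(base))
```

Compared with replacing a substring of the result URI, this cannot misfire when an operator or workflow identifier happens to contain the word being replaced.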
All comments have been addressed. @Xiao-zhen-Liu, please review again.
What changes were proposed in this PR?
This PR adds state materialization as a general mechanism for passing state across different regions.
Any related issues, documentation, discussions?
Closes #4489
How was this PR tested?
Was this PR authored or co-authored using generative AI tooling?
Generated-by: ChatGPT (Codex), Claude Code (Claude Opus 4.7)