feat(amber): Re-enable operator reconfiguration in Amber#4220
feat(amber): Re-enable operator reconfiguration in Amber#4220shengquan-ni wants to merge 18 commits intomainfrom
Conversation
|
Thanks, @shengquan-ni. Can we get rid of the restriction of "not including source operators"? This restriction could suggest some part of our design is not elegant. We can discuss the complexity of removing this restriction. If needed, we can schedule a call to discuss. I am adding @zuozhiw so that he can review this PR as well. |
Earlier, I mentioned that the MCS could not include source operators due to certain ECM constraints. However, I have since identified a simple solution to address that issue, so this restriction has now been removed. That said, reconfiguring source operators themselves is still not supported. I’ve updated the PR description to explain the reasoning behind this decision. We can further discuss this if needed. |
|
Thank you, @shengquan-ni . @Yicong-Huang @zuozhiw and @aglinxinyuan : can you chime in? |
There was a problem hiding this comment.
Pull request overview
Re-enables Amber’s operator reconfiguration (backend) by adding new RPCs and a Fries-based reconfiguration planner, plus e2e coverage to validate reconfiguration behavior across Java and Python operators while disallowing source-operator modification.
Changes:
- Add controller/worker RPC support for
ReconfigureWorkflowandUpdateExecutor, including Fries MCS component computation. - Implement controller-side orchestration of reconfiguration (direct update for single-op scope; ECM alignment for multi-op scope).
- Add e2e tests covering Python UDF reconfiguration, Java operator reconfiguration, and source-operator constraints; update CI to install Python deps for e2e.
Reviewed changes
Copilot reviewed 17 out of 17 changed files in this pull request and generated 5 comments.
Show a summary per file
| File | Description |
|---|---|
| common/workflow-operator/src/main/scala/org/apache/texera/amber/operator/TestOperators.scala | Adds Python source-UDF test operator and adjusts Python UDF settings used by tests. |
| amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala | New e2e suite exercising workflow reconfiguration scenarios. |
| amber/src/main/scala/org/apache/texera/amber/engine/common/FriesReconfigurationAlgorithm.scala | Refactors Fries logic to output reconfiguration components for new request types. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala | New worker RPC handler to apply executor updates. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/promisehandlers/InitializeExecutorHandler.scala | Refactors initialization to reuse shared executor setup logic. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/worker/DataProcessorRPCHandlerInitializer.scala | Adds shared setupExecutor and wires in UpdateExecutorHandler. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala | New controller RPC handler implementing orchestration/alignment logic for reconfiguration. |
| amber/src/main/scala/org/apache/texera/amber/engine/architecture/controller/ControllerAsyncRPCHandlerInitializer.scala | Mixes in the new ReconfigurationHandler. |
| amber/src/main/python/proto/org/apache/texera/amber/engine/architecture/rpc/init.py | Regenerates Python RPC stubs/messages for new/updated services and requests. |
| amber/src/main/python/core/runnables/network_receiver.py | Adjusts is_control handling to stabilize hashing/queue behavior. |
| amber/src/main/python/core/runnables/main_loop.py | Extends control-draining loop to also handle ECM elements. |
| amber/src/main/python/core/architecture/rpc/async_rpc_handler_initializer.py | Wires in the Python UpdateExecutorHandler. |
| amber/src/main/python/core/architecture/handlers/control/update_executor_handler.py | Implements Python-side executor update on UpdateExecutorRequest. |
| amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/workerservice.proto | Adds UpdateExecutor RPC to WorkerService. |
| amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controllerservice.proto | Adds ReconfigureWorkflow RPC to ControllerService. |
| amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto | Replaces ModifyLogicRequest with WorkflowReconfigureRequest and updates related messages. |
| .github/workflows/github-action-build.yml | Installs Python dependencies in CI to support Python UDF e2e tests. |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
Outdated
Show resolved
Hide resolved
amber/src/test/scala/org/apache/texera/amber/engine/e2e/ModifyLogicSpec.scala
Show resolved
Hide resolved
...g/apache/texera/amber/engine/architecture/worker/promisehandlers/UpdateExecutorHandler.scala
Show resolved
Hide resolved
...che/texera/amber/engine/architecture/controller/promisehandlers/ReconfigurationHandler.scala
Outdated
Show resolved
Hide resolved
amber/src/main/protobuf/org/apache/texera/amber/engine/architecture/rpc/controlcommands.proto
Show resolved
Hide resolved
|
I asked Copilot to do a first round of review @shengquan-ni please see if comments are valid. |
…LogicSpec.scala Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Yicong Huang <17627829+Yicong-Huang@users.noreply.github.com>
I am actually fine with excluding source operators. On the technical side it might be hard. Our implementation of source is a process function of one single special kick off tuple. This means the entire state of a source operator is associated with this single special tuple, which is hard to be broken and transferred. On the use case side, reconfigure a source operator is less useful: if you need to change the source (e.g., read another csv, fetch from another api/db), it's likely you need to rerun the entire workflow. |
…re/worker/promisehandlers/UpdateExecutorHandler.scala Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Signed-off-by: Shengquan Ni <13672781+shengquan-ni@users.noreply.github.com>
…che/texera into shengquan-add-reconfigration
What changes were proposed in this PR?
Per the discussion in #4016, we decided to bring the operator reconfiguration feature back to the Amber engine. This PR includes only the backend changes for this feature, but it is enabled on both the Java and Python sides.
Since the code for the Fries Algorithm is still in the codebase, this feature is relatively straightforward to implement and maintain going forward.
This PR allows source operators to be included in the reconfiguration scope (MCS), but it does not allow source operators themselves to be modified. First, under the current iterator-based interface, the state of a source operator is fully encapsulated within its iterator. Reading or manipulating the iterator state is already very difficult in both Scala and Python. Second, even if we could access the state, it would still be hard for users to clearly define the expected state transition semantics—e.g., whether to preserve the old state, reset it, or partially transfer it to the new operator.
Due to the reasons above, we disable reconfiguration of source operators for now. If clear use cases emerge in the future, we can revisit this design decision.
Any related issues, documentation, discussions?
See #4016.
How was this PR tested?
Introduced unit tests for this feature.
This PR also updates scala CI to install python dependencies as we are using Python UDFs in our e2e tests.
Was this PR authored or co-authored using generative AI tooling?
No