[runtime] Refactor ActionExecutionOperator into focused manager classes #546
Open
weiqingy wants to merge 6 commits into apache:main from
Move 7 Flink state fields, 2 constants, and state management methods into a package-private OperatorStateManager class. The operator delegates all state access through the manager. Moved fields: actionTasksKState, pendingInputEventsKState, currentProcessingKeysOpState, sequenceNumberKState, sensoryMemState, shortTermMemState, jobIdentifier. Part of apache#545.
Move action state persistence, recovery markers, checkpoint maps, and durable/continuation context maps into a package-private DurableExecutionManager class. The manager implements ActionStatePersister (moved from the operator). Moved fields: actionStateStore, recoveryMarkerOpState, checkpointIdToSeqNums, actionTaskDurableContexts, continuationContexts, pythonAwaitableRefs. Test reflection accesses to actionStateStore updated to use @VisibleForTesting getter chain. Part of apache#545.
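The move from reflection to a getter chain described in this commit could look roughly like the sketch below. All class names prefixed with `Sketch` are hypothetical stand-ins, not the real runtime classes, and the real getters would carry `@VisibleForTesting` (omitted here so the block compiles without extra dependencies).

```java
import java.lang.reflect.Field;

// Hypothetical stand-in for the action state store.
class SketchStateStore {
    private final String name;

    SketchStateStore(String name) {
        this.name = name;
    }

    String getName() {
        return name;
    }
}

// Hypothetical stand-in for DurableExecutionManager: it now owns the store.
class SketchDurableManager {
    private final SketchStateStore actionStateStore;

    SketchDurableManager(SketchStateStore actionStateStore) {
        this.actionStateStore = actionStateStore;
    }

    // In the real code this getter would be annotated @VisibleForTesting.
    SketchStateStore getActionStateStore() {
        return actionStateStore;
    }
}

// Hypothetical stand-in for the operator: it only holds the manager.
class SketchOperator {
    private final SketchDurableManager durableExecutionManager;

    SketchOperator(SketchDurableManager durableExecutionManager) {
        this.durableExecutionManager = durableExecutionManager;
    }

    SketchDurableManager getDurableExecutionManager() {
        return durableExecutionManager;
    }
}

class GetterChainSketch {
    // Old test style: look up a private field by name. A moved field is only
    // detected at runtime, as a NoSuchFieldException.
    static Object readViaReflection(Object target, String fieldName)
            throws ReflectiveOperationException {
        Field field = target.getClass().getDeclaredField(fieldName);
        field.setAccessible(true);
        return field.get(target);
    }

    public static void main(String[] args) {
        SketchOperator op =
                new SketchOperator(new SketchDurableManager(new SketchStateStore("in-memory")));

        // New test style: a getter chain that the compiler checks.
        SketchStateStore store = op.getDurableExecutionManager().getActionStateStore();
        System.out.println("store via getters: " + store.getName());

        // The old lookup fails because the field no longer lives on the operator.
        try {
            readViaReflection(op, "actionStateStore");
        } catch (ReflectiveOperationException e) {
            System.out.println("reflection lookup failed: " + e);
        }
    }
}
```

The getter chain trades a slightly wider package-private surface for compile-time breakage when a field moves again, which is the failure mode a structural refactoring like this one is most likely to hit.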
…idgeManager
Extract the remaining 3 manager classes from ActionExecutionOperator:
- ActionTaskContextManager: runner context creation, memory contexts, continuation executor
- EventRouter: event wrapping/routing, notification, watermarks, logging
- PythonBridgeManager: Python env, interpreter, executor, resource adapters
Test reflection for eventLogger updated to use @VisibleForTesting getter. Part of apache#545.
Move all 7 state fields, 2 constants, and state methods from ActionExecutionOperator into OperatorStateManager. All state access now goes through stateManager delegation calls. Add transferContexts() to ActionTaskContextManager to encapsulate the context transfer logic for unfinished async action tasks. Operator shrinks from 700 to 562 lines. Part of apache#545.
Collaborator
Author
Hey @Sxnan, could you please take a look at this PR when you have a chance? Thank you!
OperatorStateManager was created in both initializeState() and open(), causing the instance with jobIdentifier (set in initializeState) to be overwritten by a fresh instance in open(). This passed null jobIdentifier to PythonBridgeManager, causing VectorStoreLongTermMemory validation failure and visit_count memory scoping issues in e2e tests. Fix: create stateManager only in initializeState() (which runs first in Flink lifecycle), then initialize state descriptors in open(). Part of apache#545.
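The overwrite described in this commit can be reproduced in a few lines. This is a minimal sketch with hypothetical stand-in classes (`SketchStateManager`, `BuggyOperator`, `FixedOperator`); it assumes only what the commit message states, namely that `initializeState()` runs before `open()` and that the job identifier is supplied in `initializeState()`.

```java
// Hypothetical stand-in for OperatorStateManager.
class SketchStateManager {
    private final String jobIdentifier;
    private boolean descriptorsInitialized;

    SketchStateManager(String jobIdentifier) {
        this.jobIdentifier = jobIdentifier;
    }

    String getJobIdentifier() {
        return jobIdentifier;
    }

    void initStateDescriptors() {
        descriptorsInitialized = true;
    }

    boolean isReady() {
        return descriptorsInitialized;
    }
}

// Before the fix: the manager is created in both lifecycle methods.
class BuggyOperator {
    SketchStateManager stateManager;

    void initializeState(String jobIdentifier) {
        stateManager = new SketchStateManager(jobIdentifier);
    }

    void open() {
        // Bug: the fresh instance discards the jobIdentifier set above.
        stateManager = new SketchStateManager(null);
        stateManager.initStateDescriptors();
    }
}

// After the fix: create once, then only finish setup in open().
class FixedOperator {
    SketchStateManager stateManager;

    void initializeState(String jobIdentifier) {
        stateManager = new SketchStateManager(jobIdentifier);
    }

    void open() {
        stateManager.initStateDescriptors();
    }
}

class LifecycleSketch {
    public static void main(String[] args) {
        BuggyOperator buggy = new BuggyOperator();
        buggy.initializeState("job-1");
        buggy.open();
        System.out.println(buggy.stateManager.getJobIdentifier()); // prints "null"

        FixedOperator fixed = new FixedOperator();
        fixed.initializeState("job-1");
        fixed.open();
        System.out.println(fixed.stateManager.getJobIdentifier()); // prints "job-1"
    }
}
```

Anything that reads the identifier after `open()`, such as the Python bridge in the real operator, sees `null` in the buggy variant, which matches the downstream validation failures reported in the commit message.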
wenjin272 reviewed Apr 17, 2026
Collaborator
Hi, @weiqingy, LGTM, only one comment. Please take a look at your convenience @xintongsong.
        actionTask.setRunnerContext(context);
    }

    Resource getResource(String name, ResourceType type, ResourceCache resourceCache) {
Collaborator
Looks like this method is never invoked.
Collaborator
Author
Good catch — removed in 300a0fa. The method was dead code (callers go through RunnerContextImpl.getResource or ResourceCache directly), so I deleted it along with the now-unused Resource/ResourceType imports.
Addresses review feedback on PR apache#546: the getResource method was never invoked. Callers go through RunnerContextImpl.getResource or directly through ResourceCache. Also removes the now-unused Resource/ResourceType imports.
Linked issue: #545
Purpose of change
Decomposes ActionExecutionOperator (1,166 lines) into 5 package-private manager classes, each owning a single concern:
- OperatorStateManager
- DurableExecutionManager (implements ActionStatePersister)
- ActionTaskContextManager
- EventRouter
- PythonBridgeManager

The operator shrinks from 1,166 to 562 lines (52% reduction) of pure coordination logic.

Key design decisions:
- sequenceNumberKState is placed in OperatorStateManager (not DurableExecutionManager) because it is used in non-durable mode
- ActionStatePersister interface moved from the operator to DurableExecutionManager
- ResourceCache stays operator-owned; it is closed first, before the Python stack

Backward compatibility:
- ActionExecutionOperatorFactory constructor signature preserved

Tests
- ActionExecutionOperatorTest tests pass as regression gates
- ./tools/lint.sh -c: formatting compliance verified
- ./tools/ut.sh -j: full Java test suite passes
- ./tools/build.sh -j: full Java build passes

Test reflection accesses to actionStateStore (6 tests) and eventLogger (1 test) updated to use @VisibleForTesting getter chains instead of reflection on private fields.

No new tests added. This is a pure structural refactoring: every code path flows through ActionExecutionOperator, which the existing integration tests exercise via KeyedOneInputStreamOperatorTestHarness. The managers are package-private and not independently consumable APIs.

API
No public API changes. All new classes are package-private.
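The design decision that ResourceCache stays operator-owned and is closed before the Python stack amounts to a fixed shutdown order. The sketch below illustrates that ordering only; `RecordingCloseable` and `CloseOrderOperatorSketch` are hypothetical names, and the reason for the order (cached resources possibly depending on the Python side) is an assumption, not something the PR states.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical closeable that records its name when closed.
class RecordingCloseable implements AutoCloseable {
    private final String name;
    private final List<String> closeLog;

    RecordingCloseable(String name, List<String> closeLog) {
        this.name = name;
        this.closeLog = closeLog;
    }

    @Override
    public void close() {
        closeLog.add(name);
    }
}

// Hypothetical operator: owns the cache directly and holds the Python manager.
class CloseOrderOperatorSketch implements AutoCloseable {
    private final RecordingCloseable resourceCache;
    private final RecordingCloseable pythonBridgeManager;

    CloseOrderOperatorSketch(List<String> closeLog) {
        this.resourceCache = new RecordingCloseable("resourceCache", closeLog);
        this.pythonBridgeManager = new RecordingCloseable("pythonBridgeManager", closeLog);
    }

    @Override
    public void close() {
        try {
            // Cache first, while the Python stack is still alive.
            resourceCache.close();
        } finally {
            // The Python stack is closed even if closing the cache throws.
            pythonBridgeManager.close();
        }
    }
}

class CloseOrderDemo {
    public static void main(String[] args) {
        List<String> closeLog = new ArrayList<>();
        new CloseOrderOperatorSketch(closeLog).close();
        System.out.println(closeLog); // prints [resourceCache, pythonBridgeManager]
    }
}
```

Keeping the cache on the operator rather than inside one of the managers makes this ordering visible in a single `close()` method instead of spreading it across classes.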
Documentation
- doc-needed
- doc-not-needed
- doc-included