Skip to content

[runtime] Refactor ActionExecutionOperator into focused manager classes#546

Open
weiqingy wants to merge 6 commits intoapache:mainfrom
weiqingy:issue_545
Open

[runtime] Refactor ActionExecutionOperator into focused manager classes#546
weiqingy wants to merge 6 commits intoapache:mainfrom
weiqingy:issue_545

Conversation

@weiqingy
Copy link
Copy Markdown
Collaborator

@weiqingy weiqingy commented Feb 22, 2026

Linked issue: #545

Purpose of change

Decomposes ActionExecutionOperator (1,166 lines) into 5 package-private manager classes, each owning a single concern:

New Class Lines Responsibility
OperatorStateManager 225 Flink state handles (7 fields), sequence numbers, key-group filtering
DurableExecutionManager 285 ActionStateStore, persistence, recovery, checkpoint pruning (implements ActionStatePersister)
ActionTaskContextManager 215 Runner context creation, memory contexts, continuation executor, context transfer
EventRouter 175 Event wrapping/routing, notification (logger + listeners), watermark management
PythonBridgeManager 177 Python interpreter, action executor, resource adapters lifecycle

The operator shrinks from 1,166 to 562 lines (52% reduction) of pure coordination logic.

Key design decisions:

  • No manager-to-manager references — all cross-cutting dependencies flow through method parameters with the operator as mediator
  • sequenceNumberKState placed in OperatorStateManager (not DurableExecutionManager) because it's used in non-durable mode
  • ActionStatePersister interface moved from the operator to DurableExecutionManager
  • ResourceCache stays operator-owned — closed first before Python stack
  • Close order preserved: resourceCache → contextManager → pythonBridge → eventRouter → durableExecManager → super

Backward compatibility:

  • All new classes are package-private — no public API changes
  • Flink state descriptor names/types unchanged — savepoint compatible
  • ActionExecutionOperatorFactory constructor signature preserved

Tests

  • All 21 existing ActionExecutionOperatorTest tests pass as regression gates
  • ./tools/lint.sh -c — formatting compliance verified
  • ./tools/ut.sh -j — full Java test suite passes
  • ./tools/build.sh -j — full Java build passes

Test reflection accesses to actionStateStore (6 tests) and eventLogger (1 test) updated to use @VisibleForTesting getter chains instead of reflection on private fields.

No new tests added. This is a pure structural refactoring — every code path flows through ActionExecutionOperator which the existing integration tests exercise via KeyedOneInputStreamOperatorTestHarness. The managers are package-private and not independently consumable APIs.

API

No public API changes. All new classes are package-private.

Documentation

  • doc-needed
  • doc-not-needed
  • doc-included

@github-actions github-actions bot added priority/major Default priority of the PR or issue. fixVersion/0.3.0 The feature or bug should be implemented/fixed in the 0.3.0 version. doc-not-needed Your PR changes do not impact docs labels Feb 22, 2026
@xintongsong xintongsong requested a review from Sxnan February 24, 2026 09:32
weiqingy added 3 commits April 5, 2026 22:31
Move 7 Flink state fields, 2 constants, and state management methods
into a package-private OperatorStateManager class. The operator
delegates all state access through the manager.

Moved fields: actionTasksKState, pendingInputEventsKState,
currentProcessingKeysOpState, sequenceNumberKState, sensoryMemState,
shortTermMemState, jobIdentifier.

Part of apache#545.
Move action state persistence, recovery markers, checkpoint maps, and
durable/continuation context maps into a package-private
DurableExecutionManager class. The manager implements ActionStatePersister
(moved from the operator).

Moved fields: actionStateStore, recoveryMarkerOpState,
checkpointIdToSeqNums, actionTaskDurableContexts, continuationContexts,
pythonAwaitableRefs.

Test reflection accesses to actionStateStore updated to use
@VisibleForTesting getter chain.

Part of apache#545.
…idgeManager

Extract the remaining 3 manager classes from ActionExecutionOperator:

- ActionTaskContextManager: runner context creation, memory contexts,
  continuation executor
- EventRouter: event wrapping/routing, notification, watermarks, logging
- PythonBridgeManager: Python env, interpreter, executor, resource adapters

Test reflection for eventLogger updated to use @VisibleForTesting getter.

Part of apache#545.
@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-not-needed Your PR changes do not impact docs labels Apr 6, 2026
Move all 7 state fields, 2 constants, and state methods from
ActionExecutionOperator into OperatorStateManager. All state access
now goes through stateManager delegation calls.

Add transferContexts() to ActionTaskContextManager to encapsulate
the context transfer logic for unfinished async action tasks.

Operator shrinks from 700 to 562 lines.

Part of apache#545.
@github-actions github-actions bot added doc-not-needed Your PR changes do not impact docs and removed doc-not-needed Your PR changes do not impact docs labels Apr 6, 2026
@weiqingy
Copy link
Copy Markdown
Collaborator Author

weiqingy commented Apr 6, 2026

Hey @Sxnan Could you please take a look at this PR when you have a chance? Thank you!

OperatorStateManager was created in both initializeState() and open(),
causing the instance with jobIdentifier (set in initializeState) to be
overwritten by a fresh instance in open(). This passed null jobIdentifier
to PythonBridgeManager, causing VectorStoreLongTermMemory validation
failure and visit_count memory scoping issues in e2e tests.

Fix: create stateManager only in initializeState() (which runs first
in Flink lifecycle), then initialize state descriptors in open().

Part of apache#545.
Copy link
Copy Markdown
Collaborator

@wenjin272 wenjin272 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hi, @weiqingy, LGTM, only one comment. Please take a look at your convenience @xintongsong.

actionTask.setRunnerContext(context);
}

Resource getResource(String name, ResourceType type, ResourceCache resourceCache) {
Copy link
Copy Markdown
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like this method is never invoked.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch — removed in 300a0fa. The method was dead code (callers go through RunnerContextImpl.getResource or ResourceCache directly), so I deleted it along with the now-unused Resource/ResourceType imports.

Addresses review feedback on PR apache#546: the getResource method was never
invoked. Callers go through RunnerContextImpl.getResource or directly
through ResourceCache. Also removes the now-unused Resource/ResourceType
imports.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

doc-not-needed Your PR changes do not impact docs fixVersion/0.3.0 The feature or bug should be implemented/fixed in the 0.3.0 version. priority/major Default priority of the PR or issue.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants