Skip to content

[FLINK-39226][python] Fix embedded PyIterator class cast after recovery#27849

Open
bowenli86 wants to merge 1 commit intoapache:masterfrom
bowenli86:flink-39226-pyiterator-classcast-fix
Open

[FLINK-39226][python] Fix embedded PyIterator class cast after recovery#27849
bowenli86 wants to merge 1 commit intoapache:masterfrom
bowenli86:flink-39226-pyiterator-classcast-fix

Conversation

@bowenli86
Copy link
Copy Markdown
Member

@bowenli86 bowenli86 commented Mar 30, 2026

What is the purpose of the change

This pull request fixes FLINK-39226 by avoiding direct casts to pemja.core.object.PyIterator in embedded Python operators. After recovery, the returned iterator object can be loaded by a different user-code classloader, so hard casts can fail with ClassCastException. The change introduces a reflective wrapper that can consume iterator-like results without depending on the local PemJa class identity.

Brief change log

  • Add EmbeddedPythonIterator as a reflective adapter for embedded Python result iterators
  • Replace direct PyIterator casts in embedded one-input, two-input, keyed timer, window, and table operators
  • Add tests that reproduce the cross-classloader cast failure and verify the new iterator wrapper

Verifying this change

Please make sure both new and modified tests in this PR follow the conventions for tests defined in our code quality guide.

This change added tests and can be verified as follows:

  • Added EmbeddedPythonIteratorTest to cover foreign-classloader iterators and direct-cast failure reproduction
  • Ran ./mvnw -pl flink-python -am -Dfast -DskipITs -Dtest=EmbeddedPythonIteratorTest -Dsurefire.failIfNoSpecifiedTests=false test

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): no
  • The public API, i.e., is any changed class annotated with @Public(Evolving): no
  • The serializers: no
  • The runtime per-record code paths (performance sensitive): yes
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes
  • The S3 file system connector: no

Documentation

  • Does this pull request introduce a new feature? no
  • If yes, how is the feature documented? not applicable

@flinkbot
Copy link
Copy Markdown
Collaborator

flinkbot commented Mar 30, 2026

CI report:

Bot commands The @flinkbot bot supports the following commands:
  • @flinkbot run azure re-run the last Azure build

@bowenli86 bowenli86 force-pushed the flink-39226-pyiterator-classcast-fix branch 3 times, most recently from f666478 to 393a6a8 Compare March 30, 2026 02:09
@bowenli86 bowenli86 force-pushed the flink-39226-pyiterator-classcast-fix branch from 393a6a8 to f96b414 Compare March 30, 2026 02:09
package org.apache.flink.streaming.api.operators.python.embedded;

/** Test-only iterator that can be loaded through an isolated classloader. */
public class ForeignClassLoaderIterator {
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Added iterator implementation for testing, enabling cross-classloader behavior validation.

@github-actions github-actions bot added the community-reviewed PR has been reviewed by the community. label Mar 30, 2026
@bowenli86 bowenli86 requested a review from dianfu March 30, 2026 15:25
@bowenli86
Copy link
Copy Markdown
Member Author

bowenli86 commented Mar 30, 2026

@featzhang thanks for reviewing
@dianfu can you help review as well?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

community-reviewed PR has been reviewed by the community.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants