@eemario
Contributor

eemario commented Dec 23, 2025

What is the purpose of the change

This pull request introduces SingleJobApplication, which is used in session mode to run a single job inside an application.

Brief change log

  • Add the SingleJobApplication implementation
  • Refactor JarRunHandler and JobSubmitHandler to use SingleJobApplication instead of submitJob directly

Verifying this change

This change added tests and can be verified as follows:

  • Added tests that validate the core interfaces of SingleJobApplication
  • Added tests that validate the Dispatcher's application management behavior
  • Updated integration tests that validate the new behavior of JarRunHandler and JobSubmitHandler
  • Manually verified the change by running a standalone cluster in session mode

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: (no)
  • The S3 file system connector: (no)

Documentation

  • Does this pull request introduce a new feature? (no)
  • If yes, how is the feature documented? (not applicable)

@flinkbot
Collaborator

flinkbot commented Dec 23, 2025

CI report:

Bot commands

The @flinkbot bot supports the following commands:
  • @flinkbot run azure: re-run the last Azure build

eemario marked this pull request as ready for review December 23, 2025 02:12
eemario force-pushed the FLIP549-5 branch 2 times, most recently from f9f006f to 933fcc1, December 24, 2025 03:05
            .thenApply(ack -> streamGraph.getJobID());
}

protected CompletableFuture<Acknowledge> internalSubmit(
Contributor

can be package private


- private void runRecoveredJob(final ExecutionPlan recoveredJob) {
+ private void runRecoveredJob(
+         final ExecutionPlan recoveredJob, final boolean wrapIntoApplication) {
Contributor

Could you add some comments to explain the behavior with regard to wrapIntoApplication?

                executionPlan.getJobID()));
}

private Optional<AbstractApplication> getApplicationForJob(ExecutionPlan executionPlan) {
Contributor

I guess this method should be used in internalSubmitJob?

        instanceof FlinkJobNotFoundException) {
    log.warn(
            "Ignore job {} which may be expired when requesting application {}",
Contributor

It's odd that a job has expired from the history while its hosting application has not.

JobManagerJobMetricGroupFactory jobManagerJobMetricGroupFactory,
FatalErrorHandler fatalErrorHandler,
Collection<FailureEnricher> failureEnrichers,
JobStatusListener singleJobApplication,
Contributor

singleJobApplication -> jobStatusListener

The handler should simply treat it as a JobStatusListener instead of a SingleJobApplication.

        executionPlan.getApplicationId().map(applications::get);
if (optionalApplication.isPresent()) {
    AbstractApplication application = optionalApplication.get();
    if (application instanceof JobStatusListener) {
Contributor

I think the clearer logic would be: if an application is a SingleJobApplication, it needs to be passed to the JobManager as a JobStatusListener.
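
For illustration, the check suggested in this comment could look roughly like the sketch below. It is a minimal, self-contained example with simplified stand-in types: JobStatusListener, AbstractApplication, SingleJobApplication, MultiJobApplication, and the listenerFor helper are all hypothetical here and are not Flink's actual classes or signatures.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;

public class ListenerSelectionSketch {

    /** Hypothetical stand-in for a job status callback; not Flink's actual interface. */
    interface JobStatusListener {
        void jobStatusChanged(String jobId, String newStatus);
    }

    /** Hypothetical stand-in for the application base class. */
    abstract static class AbstractApplication {}

    /** Hypothetical single-job application that also listens to its job's status. */
    static final class SingleJobApplication extends AbstractApplication
            implements JobStatusListener {
        String lastStatus;

        @Override
        public void jobStatusChanged(String jobId, String newStatus) {
            lastStatus = newStatus;
        }
    }

    /** Hypothetical application type that is not handed over as a listener. */
    static final class MultiJobApplication extends AbstractApplication {}

    /**
     * Returns the application as a listener only when it is a SingleJobApplication.
     * The check names the concrete type, so the intent is explicit at the call site;
     * the consumer still only sees the JobStatusListener interface.
     */
    static Optional<JobStatusListener> listenerFor(Optional<AbstractApplication> application) {
        if (application.isPresent() && application.get() instanceof SingleJobApplication) {
            return Optional.<JobStatusListener>of((SingleJobApplication) application.get());
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Assumed registry of applications keyed by a made-up application id.
        Map<String, AbstractApplication> applications = new HashMap<>();
        applications.put("app-1", new SingleJobApplication());
        applications.put("app-2", new MultiJobApplication());

        System.out.println(listenerFor(Optional.ofNullable(applications.get("app-1"))).isPresent());
        System.out.println(listenerFor(Optional.ofNullable(applications.get("app-2"))).isPresent());
        System.out.println(listenerFor(Optional.ofNullable(applications.get("missing"))).isPresent());
    }
}
```

Checking for the concrete application type, rather than for the listener interface, keeps the decision tied to the application kind, while the receiving side depends only on JobStatusListener.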
