Skip to content

[SPARK-56661] Implementing UDFWorkerManager for new UDF worker sessions#55712

Open
sven-weber-db wants to merge 1 commit intoapache:masterfrom
sven-weber-db:sven-weber_data/SPARK-56661-udf-changes
Open

[SPARK-56661] Implementing UDFWorkerManager for new UDF worker sessions#55712
sven-weber-db wants to merge 1 commit intoapache:masterfrom
sven-weber-db:sven-weber_data/SPARK-56661-udf-changes

Conversation

@sven-weber-db
Copy link
Copy Markdown
Contributor

@sven-weber-db sven-weber-db commented May 6, 2026

What changes were proposed in this pull request?

This PR implements a UDFWorkerManager class in the new /udf package that was initiated by SPIP SPARK-55278. The purpose of the new Manager class is to provide a single entry-point for Spark with which a UDF session to a external UDF worker can be created, based on a WorkerSpecification instance. This manager and entry-point will be used by follow-up PRs to implement new, language agnostic Catalyst nodes.

Why are the changes needed?

The UDFWorkerManager serves two main purposes:

  1. Provide a single, unified entry-point to Spark for UDF worker/session creation
  2. Implement the management of UDF WorkerDispachter classes - depending on the UDFWorkerSpecification they are created for. This is required because the newly proposed UDF framework from SPIP SPARK-55278, enables clients to specify different UDF dispatchers for their UDFs. This implies:

2.1. Multiple, different dispatchers can exist at the same time
-> The right one needs to be selected to create a UDF session
2.2. Dispatcher lifetime needs to be managed
-> Dispatchers and their resources need to be cleaned-up if they are no longer needed by clients

Does this PR introduce any user-facing change?

No - All changes are marked as Experimental and not yet consumed.

How was this patch tested?

New unit-tests where added for the changes in the UDFWorkerManager and WorkerSession

Was this patch authored or co-authored using generative AI tooling?

Partially. However, the code was manually reviewed and adjusted.

@sven-weber-db sven-weber-db force-pushed the sven-weber_data/SPARK-56661-udf-changes branch from d4409b8 to 83e1033 Compare May 6, 2026 14:12
@sven-weber-db sven-weber-db force-pushed the sven-weber_data/SPARK-56661-udf-changes branch from 83e1033 to 184fc44 Compare May 6, 2026 14:25
@sven-weber-db sven-weber-db changed the title [SPARK-56324] Implementing UDFWorkerManager for new UDF worker sessions [SPARK-56661] Implementing UDFWorkerManager for new UDF worker sessions May 6, 2026
@sven-weber-db sven-weber-db marked this pull request as ready for review May 6, 2026 14:43
import org.apache.spark.udf.worker.UDFWorkerSpecification
import org.apache.spark.udf.worker.core.{UDFWorkerManager, WorkerDispatcher, WorkerLogger}

class DirectUDFWorkerManager(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The manager may not care about if the backend of a dispatcher. I suggest:

  1. Call it DispatcherManager, as it is a central place to hold the dispatchers
  2. instead of creating the dispatcher here, register a created dispatcher from callsite, so as this manager does not have to care about the creation logic or the backend of the dispatcher.

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems to be a dead class not used anywhere, let's remove it

// https://github.com/apache/spark/pull/55657

@Experimental
private[direct] class SimpleWorkerConnection(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

let's wait for the other PR to land and we can avoid touching this part of logic in this PR.


// Must be called while holding `lock`.
private def handleSessionTermination(
workerSpec: UDFWorkerSpecification
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe we shall also pass the session object here?

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants