Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 49 additions & 0 deletions udf/worker/core/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,55 @@
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
</dependency>
<!--
gRPC Netty transport is the transport used by the direct gRPC dispatcher
to talk to spawned worker processes over a Unix domain socket.
-->
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-netty</artifactId>
<version>${io.grpc.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-codec-http2</artifactId>
<version>${netty.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-handler-proxy</artifactId>
<version>${netty.version}</version>
</dependency>
<!-- Required for DomainSocketAddress across Linux (epoll) and macOS (kqueue). -->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-unix-common</artifactId>
<version>${netty.version}</version>
</dependency>
<!--
OS-specific native transports for UDS. Versions are managed via the
classifier-bound entries in the parent pom's dependencyManagement.
-->
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-epoll</artifactId>
<classifier>linux-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-x86_64</classifier>
</dependency>
<dependency>
<groupId>io.netty</groupId>
<artifactId>netty-transport-native-kqueue</artifactId>
<classifier>osx-aarch_64</classifier>
</dependency>
<dependency>
<groupId>io.grpc</groupId>
<artifactId>grpc-inprocess</artifactId>
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,123 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.udf.worker.core;

import java.util.Iterator;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.udf.worker.Init;

/**
* Pure protocol-side handle for one UDF execution.
*
* <p>A {@code ProtocolSession} knows only the UDF wire protocol -- how
* to send {@link Init}, stream input batches, request cancellation, and
* read result batches. It is completely unaware of:
* <ul>
* <li>how the worker process is created (direct spawn, daemon fork,
* remote service);</li>
* <li>how the worker is shared / pooled / torn down (ref counting,
* idle timeout, restart);</li>
* <li>how the engine identifies the worker (process id, registration
* token, etc).</li>
* </ul>
*
* <p>This is the Implementor side of the Bridge between two orthogonal
* axes of the worker framework:
* <ul>
* <li><b>Lifecycle (Abstraction)</b>: {@link WorkerDispatcher}
* subclasses such as
* {@code DirectWorkerDispatcher} (spawned local process) or a
* future {@code IndirectWorkerDispatcher} (workers obtained from
* a provisioning service). They own the {@link WorkerSession}
* returned to the engine and the bookkeeping that follows its
* lifecycle.</li>
* <li><b>Protocol (Implementor)</b>: {@link WorkerBackend}s such as a
* gRPC-over-UDS backend, a gRPC-over-TCP backend, or a custom
* raw-socket backend. They produce
* {@code ProtocolSession}s.</li>
* </ul>
*
* <p>Each dispatcher composes a backend by storing a reference to it
* and delegating connection / session creation. Adding a new lifecycle
* or a new protocol does NOT require touching the other axis.
*
* <p>Threading and ordering rules are protocol-specific and documented
* on the concrete implementation (e.g. {@code GrpcProtocolSession}).
*
* <p><b>Lifecycle guarantees expected from the wrapper:</b>
* <ul>
* <li>{@link #init} is invoked at most once before
* {@link #process}.</li>
* <li>{@link #process} is invoked at most once and only after
* {@link #init} succeeded.</li>
* <li>{@link #cancel} may be invoked from any thread at any
* point.</li>
* <li>{@link #close} is invoked exactly once, after {@link #process}
* returns or throws (or instead of {@link #process} when init
* failed).</li>
* </ul>
*
* <p>Implementations therefore do NOT need to re-guard the call order;
* {@link WorkerSession}'s lifecycle CAS already enforces it for the
* wrapping session.
*/
@Experimental
public interface ProtocolSession extends AutoCloseable {

/**
* Sends the {@link Init} handshake to the worker and waits for its
* acknowledgement.
*
* <p>Per the lifecycle guarantees listed on this interface, the
* wrapper invokes this at most once, before {@link #process}.
*
* @param message the {@link Init} handshake message to send to the
* worker
*/
void init(Init message);

/**
* Streams {@code input} batches to the worker and returns an
* iterator of result batches.
*
* <p>Per the lifecycle guarantees listed on this interface, the
* wrapper invokes this at most once, and only after {@link #init}
* succeeded.
*
* @param input iterator over the input batches, each one a
* {@code byte[]}
* @return iterator over the result batches, each one a
* {@code byte[]}
*/
Iterator<byte[]> process(Iterator<byte[]> input);

/**
* Best-effort cooperative cancellation. Safe to call at any
* lifecycle point and from any thread.
*/
void cancel();

/**
* Releases protocol-level resources (e.g. half-closes the request
* stream).
*
* <p>Narrows {@link AutoCloseable#close()}: this override declares
* no checked exception, so callers do not need to handle
* {@code Exception}.
*/
@Override
void close();

/**
* Whether the underlying worker is still in a state safe to reuse
* after this session ends. A return of {@code false} signals that
* the worker MUST NOT be returned to any reuse pool -- typically
* because the wire protocol observed a transport failure, an
* unexpected disconnect, or any other error that leaves the worker
* state ambiguous.
*
* <p>The default is {@code true}: clean end-of-session.
* Implementations override when they can detect unhealthy
* termination. Dispatchers consult this in their session-release
* path to decide between terminate and pool-return.
*
* @return {@code true} if the worker may be returned to a reuse
* pool; {@code false} if it must not be reused. This default
* implementation always returns {@code true}.
*/
default boolean isWorkerSalvageable() {
return true;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.udf.worker.core;

import java.io.File;

import org.apache.spark.annotation.Experimental;

/**
* A {@link WorkerConnection} over a Unix domain socket. Owns the socket
* path and removes the socket file on {@link #close}. Subclasses provide
* the protocol-specific channel (e.g. gRPC over UDS) and may override
* {@code close} to add transport-level shutdown -- they should call
* {@code super.close()} to ensure the socket file is removed.
*
* <p>{@code close} is idempotent: deleting an already-removed file is a
* no-op.
*/
@Experimental
public abstract class UnixSocketWorkerConnection extends WorkerConnection {

  /** Filesystem path of the Unix domain socket; fixed at construction. */
  private final String socketPath;

  /**
   * Records the socket path owned by this connection.
   *
   * @param socketPath filesystem path of the worker's Unix domain socket
   */
  protected UnixSocketWorkerConnection(String socketPath) {
    this.socketPath = socketPath;
  }

  /**
   * Returns the path of the Unix domain socket the worker is bound to.
   *
   * @return the socket path supplied at construction
   */
  public String socketPath() {
    return socketPath;
  }

  /**
   * Removes the socket file if it is still present on disk.
   *
   * <p>The deletion outcome is intentionally not checked, so a file that
   * is already gone -- or that cannot be removed -- does not raise an
   * error here.
   */
  @Override
  public void close() {
    File socketFile = new File(socketPath);
    if (!socketFile.exists()) {
      // Nothing left to clean up.
      return;
    }
    // Best-effort only: the worker process usually unlinks its own socket
    // on shutdown, so this delete may lose the race against it.
    socketFile.delete();
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,119 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.spark.udf.worker.core;

import org.apache.spark.annotation.Experimental;
import org.apache.spark.udf.worker.UDFWorkerSpecification;

/**
* Strategy for talking to a UDF worker over a particular wire protocol.
*
* <p>{@code WorkerBackend} is the Implementor (protocol) side of a Bridge
* whose Abstraction (lifecycle) side is {@link WorkerDispatcher}:
*
* <pre>{@code
* WorkerDispatcher (lifecycle: spawn / lookup / pool)
* o------> WorkerBackend (protocol: how to connect, how to talk)
* |
* +-- newConnection(address) --> WorkerConnection
* +-- newProtocolSession(conn) -> ProtocolSession
* }</pre>
*
* <p>A backend is responsible for:
* <ul>
* <li>Validating that the supplied {@link UDFWorkerSpecification} is
* compatible with this protocol (e.g. a gRPC backend rejects
* specs missing the required communication pattern).
* {@link #validateSpec}</li>
* <li>Opening a transport-level connection to a worker reachable at
* the given address (a UDS path, a host:port pair, etc.).
* {@link #newConnection}</li>
* <li>Opening per-session protocol handles on an established
* connection. {@link #newProtocolSession}</li>
* <li>Releasing any shared protocol-level state on shutdown.
* {@link #close}</li>
* </ul>
*
* <p>A backend is <b>stateless w.r.t. specific workers</b> -- the
* dispatcher passes addresses to the backend, never worker handles.
* This is what keeps the bridge symmetric: any dispatcher can compose
* with any backend.
*
* <p>The address format is a free-form {@code String} whose
* interpretation is agreed between the dispatcher and the backend,
* matching the {@code WorkerConnectionSpec} declared in the spec (a UDS
* path for {@code UnixDomainSocket}, a port number or {@code host:port}
* for {@code LocalTcpConnection}, etc.). It is the same value the
* dispatcher passes to the worker subprocess via {@code --connection}.
*/
@Experimental
public interface WorkerBackend extends AutoCloseable {

/**
* Short, human-readable backend name for logs/diagnostics (e.g. "grpc-uds").
*
* @return the backend's display name
*/
String name();

/**
* Validates that this backend can serve the given worker
* specification.
*
* <p>Called once per dispatcher at construction time. Implementations
* should throw {@link IllegalArgumentException} on incompatible
* specs -- e.g. unsupported communication pattern, wrong transport
* choice on {@code WorkerConnectionSpec}, missing data format.
*
* <p>Dispatchers also do their own lifecycle-side validation (e.g.
* "spec must have {@code direct} set"); the two checks complement
* each other.
*
* @param spec the worker specification to check against this
* backend's protocol requirements
* @throws IllegalArgumentException if {@code spec} is incompatible
* with this backend
*/
void validateSpec(UDFWorkerSpecification spec);

/**
* Opens the transport-level {@link WorkerConnection} to a worker
* reachable at {@code address}. Called once per worker; the
* resulting connection is shared by all sessions on that worker.
*
* @param address transport-specific endpoint string (matches the
* value injected into the worker subprocess via
* {@code --connection}).
* @param logger logger for transport-level diagnostics.
* @return the transport connection to the worker at {@code address}
*/
WorkerConnection newConnection(String address, WorkerLogger logger);

/**
* Opens one protocol session on an established
* {@link WorkerConnection}.
*
* <p>Each call returns a fresh, independent {@link ProtocolSession}.
* The returned session owns no lifecycle state of the underlying
* connection -- closing it must NOT close the connection. The
* dispatcher is responsible for connection teardown.
*
* @param connection the worker's transport connection (returned
* earlier from {@link #newConnection} for the
* same worker).
* @param logger logger for protocol-level diagnostics.
* @return a new {@link ProtocolSession} bound to {@code connection}
*/
ProtocolSession newProtocolSession(WorkerConnection connection, WorkerLogger logger);

/**
* Releases backend-shared protocol resources (event loops shared
* across connections, thread pools, ...). Most backends are
* stateless and the default no-op is fine.
*
* <p>Narrows {@link AutoCloseable#close()}: this override declares
* no checked exception. The default implementation does nothing.
*/
@Override
default void close() {}
}
Loading