[SPARK-56413] Add gRPC UDF execution protocol #55657
Conversation
sven-weber-db left a comment:

Thank you for addressing the comments.

@hvanhovell could you please help take another pass?
PR Review: SPARK-56413 — gRPC UDF execution protocol

Summary: The PR adds …

Overall this is a high-quality, well-documented wire contract. The doc comments are unusually thorough (lifecycle, ordering, "Required/Optional" tags, escape-hatch conventions, reserved ranges), and the engine/client split — typed engine-side fields vs. opaque … Below are mostly questions / suggested clarifications; only a couple are blocking-ish.

Strengths: …

Concerns / suggestions:

1. Cancel-vs-finish race contradicts wire-level "mutually exclusive" wording (medium)

…but the new Scala doc on … If …

As written, the Scala doc invites engine implementations that the proto's strict reading prohibits.

2. …
```proto
// wire protocol and ordering invariants.
//
// Error contract: if the gRPC connection breaks at any point, gRPC
// surfaces an error on the stream. The engine therefore never needs
```
I don't think this is true. The UDF server can hang when you give it a poorly written UDF. I am not sure that you should give the UDF server this responsibility in the first place; the client side should have timeouts to deal with this type of scenario.
IMO, detecting hanging UDF code is out of scope for this protocol, since in theory a customer can intentionally define a long-sleeping UDF. But yes, we should be able to set a timeout for that, e.g., a max timeout for processing one row/unit of data.

A client-side cancel mechanism may not be sufficient to deal with that: if the gRPC buffer is filled with input batches and that causes a hang in the UDF worker, the cancel message would never make it to the UDF worker.

However, we should be able to handle it with a UDF-worker-side timeout: if processing one row of data takes longer than a threshold, we trigger a hanging-user-code exception.

The comments here are not well written; I will polish them.
Basically, a specific worker implementation can easily add an application-level timeout on top of this gRPC proto to deal with hanging UDFs (a timeout in the worker code); we don't have to complicate the UDF protocol with extra logic to tell real hangs from false positives.
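For illustration, a minimal sketch of such a worker-side per-row timeout, assuming a hypothetical evaluation callback (none of these names come from the PR):

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}

// Hypothetical sketch: bound each row evaluation so a hanging UDF surfaces
// as an application-level error instead of silently stalling the stream.
object UdfWatchdog {
  private val pool = Executors.newCachedThreadPool()

  def evalWithTimeout[T](timeoutMs: Long)(evalRow: => T): T = {
    val future = pool.submit(new Callable[T] { override def call(): T = evalRow })
    try future.get(timeoutMs, TimeUnit.MILLISECONDS)
    catch {
      case _: TimeoutException =>
        future.cancel(true) // best effort; user code may ignore interrupts
        // A real worker would report this via ExecutionError (UserError).
        throw new RuntimeException(s"UDF row evaluation exceeded ${timeoutMs}ms")
    }
  }
}
```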
```proto
// eventually arrive. This applies to every in-flight response, not
// only [[CancelResponse]].
//
// Stream lifecycle: the engine MUST half-close the request stream
```
Isn't this how gRPC stream-stream works to begin with? In general, I don't understand what this section is trying to accomplish.
Yeah, I agree this is a bit redundant; I can trim the comments.
```proto
// Worker-scoped management RPC, independent of any per-execution
// stream. Used for heartbeat, capability query, and graceful
// shutdown. Kept unary so it does not depend on the lifecycle of an
```
I don't think I buy the "kept unary" rationale... I think unary is simply the simpler choice, and the RPC does not really need anything more.
Fixing the comment.
```proto
// MUST do so over the default connection of the worker specification.
//
// In future, additional connections (e.g. a separate channel) may be
// reserved by the worker spec for other purposes.
```
E.g., we may use a dedicated side channel for the streaming state store.
```proto
// language) acts as the gRPC server.
// =====================================================================
```

```proto
// The default UDF gRPC service. A worker that exposes this service
```
Should this make statements about how many concurrent Execute calls are allowed, and that an implementation needs to make sure some capacity is reserved for Manage calls?
Yeah, it's possible. It's related to worker reuse and idle pooling: since we need to allocate sessions to workers for reuse, we will carefully add that part when we introduce idle pooling in the dispatcher. Btw, this should be a property in the worker specification, as it's a worker property that is transparent to the protocol here.
```proto
// is the canonical end-of-chunking signal; the worker MUST NOT send
// [[InitResponse]] before receiving it.
//
// When [[UdfPayload.payload_size]] is set on [[Init.udf]], receivers
```
Should this include a checksum?
For security or for integrity? How could this break?
```proto
// of the stream.
//
// Backpressure: this protocol currently relies on gRPC's transport-level
// (HTTP/2) flow control for backpressure. Application-level backpressure
```
This is probably going to be good enough, provided that client -> server streaming backpressure works properly...
```proto
message FinishResponse {
  // Final metrics aggregated over the whole session (e.g. rows
  // in/out, time per phase). Free-form; names are worker-defined.
  map<string, string> metrics = 1;
```
I am very much in favor of metrics; I am just wondering if this is the right place to emit them. I guess folks might want more real-time metrics, for example. Perhaps put them in a separate message? Another question is whether metrics should be part of the UDF execution flow or the maintenance RPC.
Good idea.
- The maintenance RPC can give global metrics, as it covers the whole worker.
- The finish response can give metrics accumulated over the UDF execution time.
- We are open to more instant metrics along the way: we can introduce a periodic ack from the worker to the engine reporting the status of the UDF execution, which could also be used to implement application-level backpressure if needed.

Do we need that now? IMO, this can be added as an optional message in the future without breaking any existing assumptions, and not every UDF worker needs to implement it.
```proto
// (Optional) Language-specific error class name (e.g. "ValueError"
// in Python, "RuntimeException" in Java).
optional string error_class = 3;
```
Should we add NERF support here?
NERF (Non-Error Return Flow)? What do you mean by NERF support here? Could you please give an example?
```proto
// to decode. The engine forwards this string unchanged.
string format = 2;
```

```proto
// (Optional) Total payload size in bytes. Useful when chunked
```
Would that be expensive, and is it really necessary? I assume that if the payload is malformed, it would fail anyway; what would be the extra benefit of a checksum? Is it for a security concern?
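For what it's worth, the declared `payload_size` already provides a cheap length check for chunked delivery without hashing; a hypothetical sketch (not the PR's actual worker code):

```scala
import com.google.protobuf.ByteString

// Hypothetical accumulator: validate the declared payload_size (when set)
// at end-of-chunking instead of computing a checksum.
final class ChunkAccumulator(declaredSize: Option[Long]) {
  private var buf = ByteString.EMPTY

  def append(chunk: ByteString): Unit = buf = buf.concat(chunk)

  // Called when the end-of-chunking signal arrives.
  def finish(): ByteString = {
    declaredSize.foreach { expected =>
      require(buf.size.toLong == expected,
        s"payload_size mismatch: declared $expected, received ${buf.size}")
    }
    buf
  }
}
```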
```proto
// Liveness probe. The engine may send this periodically to detect a
// hung worker process. The worker SHOULD reply within a small bounded
```
As noted earlier, this puts a somewhat interesting constraint on the worker implementation: it has to reserve a thread to make sure these requests can get served. Even then, things can get wonky if the worker is using too many threads.
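To make the constraint concrete, here is a hypothetical sketch of one way a worker could reserve capacity: answer `Heartbeat` inline on the transport thread and push all `Execute` work onto a separate bounded pool (the names are illustrative, not from the PR):

```scala
import java.util.concurrent.{ExecutorService, Executors}

// Hypothetical wiring: Heartbeat never competes with UDF evaluation.
final class WorkerThreads(executeParallelism: Int) {
  // Pool used exclusively for Execute processing; it may saturate or queue.
  val executePool: ExecutorService = Executors.newFixedThreadPool(executeParallelism)

  // Constant-time reply, safe to run directly on the gRPC event thread.
  def onHeartbeat(reply: () => Unit): Unit = reply()

  // Heavy work is handed off so stream callbacks return promptly.
  def onExecuteWork(task: Runnable): Unit = executePool.submit(task)
}
```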
```proto
// Acknowledgment for [[Heartbeat]].
message HeartbeatResponse {
```
It'd be nice if we'd return some metrics here as well.
Sure, I can add that.
```scala
responseObserver.onCompleted()
```

```scala
case WorkerRequest.Manage.Shutdown(req) =>
  // FINDING 2: The proto says the worker SHOULD exit after all Execute
```
Should this be addressed or reworded?
Rephrased; this was a temporary comment that I forgot to clean up.
```scala
  return
}
val inlinePayload = init.getUdf.getPayload
state = AwaitingData(inlinePayload)
```
Shouldn't you send a response for non-chunking mode here?
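A hypothetical sketch of what the fix could look like (the message and builder names are guesses based on the proto described in this PR, not verified against the generated code):

```scala
// In non-chunking mode the payload is already complete at Init time, so the
// worker can acknowledge immediately; there is no end-of-chunking signal to
// wait for. WorkerResponse/InitResponse stand in for the generated classes.
if (!init.getIsChunkingPayload) {
  state = AwaitingData(init.getUdf.getPayload)
  responseObserver.onNext(
    WorkerResponse.newBuilder()
      .setInitResponse(InitResponse.getDefaultInstance)
      .build())
}
```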
```scala
private def handleInit(init: Init): Unit = state match {
  case AwaitingInit =>
    // FINDING 4 (resolved): unsupported protocol_version is now surfaced
```
Not sure if I understand the FINDING comments.
Leftover comment; removing it...
```scala
case Chunking(existing) =>
  val updated = existing.concat(chunk.getData)
```
You seem to be discarding the payload. It is probably fine for testing, but it is a bit weird.
Yeah, this is a test-only dummy server, as we don't want to do any serious serialization/deserialization.
```scala
private def handleFinish(): Unit = state match {
  case AwaitingData(_) =>
    // Generator-style UDF: engine sends Finish directly after Init.
```
Should not be the case here.
```scala
// gRPC transport error from the engine side (connection dropped).
// Per the protocol: treat as equivalent to Cancel for cleanup purposes;
// do NOT attempt to send CancelResponse (stream is dead).
state = Done
```
Shouldn't you call handleCancel here?
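A hypothetical sketch of that suggestion: funnel the transport error into the same cleanup path as an explicit `Cancel`, while never writing to the dead stream (`cleanupAsCancelled` is an assumed helper, not the PR's actual code):

```scala
// Inside the worker's StreamObserver implementation (sketch). onError fires
// when the engine-side connection drops. Reuse the Cancel cleanup (resource
// release, state reset) but skip CancelResponse: any write would fail.
override def onError(t: Throwable): Unit = {
  cleanupAsCancelled() // assumed shared helper, same work as handleCancel
  state = Done         // never touch responseObserver here; stream is gone
}
```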
This PR introduces the gRPC-based UDF execution protocol for the language-agnostic UDF worker framework (SPIP SPARK-55278).

`udf/worker/proto/src/main/protobuf/udf_protocol.proto` — defines the full `UdfWorker` gRPC service:

`Execute` RPC (bidirectional streaming): carries one UDF execution. The wire protocol is:

```
Engine -> Worker: Init -> PayloadChunk* -> (DataRequest)* -> Finish (Cancel)?
                                                           | Cancel
Worker -> Engine: InitResponse -> (DataResponse)* -> (ExecutionError)? -> (FinishResponse | CancelResponse)
```
Key design decisions:
- `Init.protocol_version` (field 1) is the first field on every stream, enabling early version-mismatch detection.
- `Init.is_chunking_payload` explicitly signals that `PayloadChunk` messages will follow, removing ambiguity about when the worker should send `InitResponse`.
- `DataRequest` and `DataResponse` are independent streams: the worker may emit `DataResponse` before the first `DataRequest` (generator-style UDFs), and the engine may pipeline `DataRequest`s before `InitResponse` arrives.
- Cancel-after-Finish: `Cancel` MAY follow `Finish` on the same stream, allowing the engine to abort processing of already-submitted data without waiting for `FinishResponse`. The worker sends `CancelResponse` if `Cancel` arrives before `FinishResponse` is sent, otherwise `FinishResponse`; the engine must accept either (see the sketch after this list).
- `ExecutionError` carries three subtypes: `UserError` (UDF code exception), `WorkerError` (worker implementation error), and `ProtocolError` (protocol violation). After sending `ExecutionError` the worker waits for the engine's `Finish`/`Cancel` before sending the terminator; the engine sends `Cancel` if the stream is still open, or waits for `FinishResponse` if already closed.
- Error contract: gRPC errors are reserved for transport failures only. All application-level errors go through `ExecutionError`, keeping the stream lifecycle intact.
- Backpressure: the current protocol relies on gRPC's HTTP/2 transport-level flow control; application-level backpressure is not yet defined and may be introduced in a future revision.
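As an illustration of the Cancel-after-Finish rule, here is a minimal engine-side sketch (the message types are stand-ins for the generated protobuf classes, not the PR's actual API):

```scala
// Stand-in types for the generated stubs; illustration only.
sealed trait WorkerMsg
case class DataResponse(payload: Array[Byte]) extends WorkerMsg
case object FinishResponse extends WorkerMsg
case object CancelResponse extends WorkerMsg

// After the engine sends Finish (and possibly Cancel), exactly one
// terminator ends the stream, and either one is legal: the worker may
// have finished before the Cancel landed, or observed the Cancel first.
def drainUntilTerminator(stream: Iterator[WorkerMsg]): WorkerMsg =
  stream.collectFirst {
    case FinishResponse => FinishResponse // worker completed normally
    case CancelResponse => CancelResponse // cancel won the race
  }.getOrElse(sys.error("protocol violation: stream ended without a terminator"))
```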
`Manage` RPC (unary): worker-scoped operations independent of any `Execute` stream — `Heartbeat` (application-level liveness probe, distinct from gRPC keepalive), and `ShutdownRequest`/`ShutdownResponse` (the `cancel_sessions` flag controls whether in-flight streams are aborted or drained; `sessions_settled` on the response confirms the worker's state).

`WorkerSession.scala` — updated lifecycle contract (a sketch of the `close()` behavior follows this change list):
- `cancel()` is valid even after all data has been submitted (post-`Finish`); implementations must send `Cancel` if `FinishResponse` has not yet been received, and accept either terminator.
- `close()` (now `final`) automatically sends `Cancel` when `init()` was called but `process()` was not, preventing the worker from seeing a bare gRPC transport error.
- `doInit()` contract: implementations must not open the Execute stream before this call, so `cancel()` before `init()` remains a transport-level no-op.
- `doProcess()` contract: on a gRPC transport error the worker must not be returned to any reuse pool.

`EchoProtocolSuite.scala` — a protocol validation test that implements a minimal echo worker (gRPC server) and engine client against the generated stubs. It exercises the state machine `AwaitingInit → AwaitingData → (Chunking) → Data → Finishing → Done`, `ExecutionError` with an engine-driven terminator, `ProtocolError` on protocol violations, concurrent producer/consumer interleaving, and the `Manage` RPC.

`README.md` — updated wire protocol notation and added a backpressure note.
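A hypothetical sketch of the `WorkerSession.close()` contract described above (the state and helper names are assumed, not the PR's actual code):

```scala
// Sketch: close() guarantees the worker sees a clean terminator when the
// Execute stream was opened by init() but process() never ran.
abstract class WorkerSessionSketch {
  protected sealed trait State
  protected case object Fresh       extends State // init() not yet called
  protected case object Initialized extends State // init() done, no process()
  protected case object Terminated  extends State

  protected var state: State = Fresh
  protected def sendCancel(): Unit            // emits Cancel on the stream
  protected def awaitEitherTerminator(): Unit // FinishResponse or CancelResponse

  final def close(): Unit = state match {
    case Initialized =>
      sendCancel()
      awaitEitherTerminator()
      state = Terminated
    case _ => () // stream never opened or already terminated: nothing to send
  }
}
```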
### Why are the changes needed?

The SPIP requires a language-agnostic, versioned, and extensible execution protocol between the Spark engine and UDF workers implemented in any language. This PR defines that protocol in protobuf/gRPC, with precise lifecycle semantics that guide both the Spark engine implementation and external worker authors.
### Does this PR introduce any user-facing change?

No. The protocol and its Scala abstractions are marked `@Experimental`. No existing user-facing APIs are modified.

### How was this patch tested?
`EchoProtocolSuite` implements a self-contained gRPC server (echo worker) and client (engine) against the protocol, exercising normal paths, error paths, cancellation (including Cancel-after-Finish), chunked payload delivery, the concurrent producer/consumer pattern, and the `Manage` RPC. The suite serves as a live specification check: if the proto comments and the test disagree, the test fails.

```
build/sbt 'udf-worker-core/testOnly *EchoProtocolSuite'
```
### Was this patch authored or co-authored using generative AI tooling?

Yes