
[SPARK-56413] Add gRPC UDF execution protocol #55657

Open

haiyangsun-db wants to merge 11 commits into apache:master from haiyangsun-db:SPARK-56413

Conversation

@haiyangsun-db (Contributor) commented May 3, 2026:

This PR introduces the gRPC-based UDF execution protocol for the language-agnostic UDF worker framework (SPIP SPARK-55278).

udf/worker/proto/src/main/protobuf/udf_protocol.proto — defines the full UdfWorker gRPC service:

  • Execute RPC (bidirectional streaming): carries one UDF execution. The wire protocol is:
    Engine -> Worker: Init -> PayloadChunk* -> DataRequest* -> (Finish Cancel? | Cancel)
    Worker -> Engine: InitResponse -> DataResponse* -> ExecutionError? -> (FinishResponse | CancelResponse)
    (A sketch of an engine driving this order follows the list of design decisions below.)
    Key design decisions:

  • Init.protocol_version (field 1) is the first field on every stream for early version mismatch detection.

  • Init.is_chunking_payload explicitly signals that PayloadChunk messages will follow, removing ambiguity about when the worker should send InitResponse.

  • DataRequest and DataResponse are independent streams: the worker may emit DataResponse before the first DataRequest (generator-style UDFs), and the engine may pipeline DataRequests before InitResponse arrives.

  • Cancel-after-Finish: Cancel MAY follow Finish on the same stream, allowing the engine to abort processing of already-submitted data without waiting for FinishResponse. The worker sends CancelResponse if Cancel arrives before
    FinishResponse is sent, otherwise FinishResponse; the engine must accept either.

  • ExecutionError carries three subtypes: UserError (UDF code exception), WorkerError (worker implementation error), ProtocolError (protocol violation). After sending ExecutionError the worker waits for the engine's
    Finish/Cancel before sending the terminator; the engine sends Cancel if the stream is still open, or waits for FinishResponse if already closed.

  • Error contract: gRPC errors are reserved for transport failures only. All application-level errors go through ExecutionError, keeping the stream lifecycle intact.

  • Backpressure: current protocol relies on gRPC's HTTP/2 transport-level flow control; application-level backpressure is not yet defined and may be introduced in a future revision.

  • Manage RPC (unary): worker-scoped operations independent of any Execute stream — Heartbeat (application-level liveness probe, distinct from gRPC keepalive), and ShutdownRequest/ShutdownResponse (cancel_sessions flag
    controls whether in-flight streams are aborted or drained; sessions_settled on the response confirms the worker's state).
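
As a minimal illustration of the wire order above, here is an engine-side sketch that drives one Execute stream using grpc-java's async StreamObserver API. The generated names (UdfWorkerGrpc, UdfRequest, UdfResponse, the oneof accessors, and flattened Finish/Cancel setters) and the consume callback are assumptions inferred from this description, not verbatim from the generated stubs.

    import java.util.concurrent.CountDownLatch
    import com.google.protobuf.ByteString
    import io.grpc.stub.StreamObserver

    def runOnce(
        stub: UdfWorkerGrpc.UdfWorkerStub,      // assumed generated async stub
        init: Init,
        batches: Iterator[Array[Byte]],
        consume: ByteString => Unit): Unit = {  // hypothetical engine-side sink
      val done = new CountDownLatch(1)
      // Worker -> Engine: InitResponse, DataResponse*, then one terminator.
      val responses = new StreamObserver[UdfResponse] {
        override def onNext(r: UdfResponse): Unit =
          if (r.hasFinishResponse || r.hasCancelResponse) done.countDown()
          else if (r.hasDataResponse) consume(r.getDataResponse.getData)
          // ExecutionError handling elided for brevity.
        override def onError(t: Throwable): Unit = done.countDown() // transport failure
        override def onCompleted(): Unit = done.countDown()
      }
      // Engine -> Worker: Init, DataRequest*, Finish, then half-close.
      val requests: StreamObserver[UdfRequest] = stub.execute(responses)
      requests.onNext(UdfRequest.newBuilder().setInit(init).build())
      batches.foreach { b => // may be pipelined before InitResponse arrives
        requests.onNext(UdfRequest.newBuilder()
          .setDataRequest(DataRequest.newBuilder().setData(ByteString.copyFrom(b)))
          .build())
      }
      requests.onNext(UdfRequest.newBuilder().setFinish(Finish.getDefaultInstance).build())
      requests.onCompleted()
      done.await()
    }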

WorkerSession.scala — updated lifecycle contract (a sketch follows the list):

  • cancel() is valid even after all data has been submitted (post-Finish); implementations must send Cancel if FinishResponse has not yet been received and accept either terminator.
  • close() (now final) automatically sends Cancel when init() was called but process() was not, preventing the worker from seeing a bare gRPC transport error.
  • doInit() contract: implementations must not open the Execute stream before this call, so cancel() before init() remains a transport-level no-op.
  • doProcess() contract: on gRPC transport error the worker must not be returned to any reuse pool.
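
A minimal sketch of how that contract could be realized. The real class lives in WorkerSession.scala; the state flags and class shape here are illustrative only, with doInit/doProcess/cancel/close semantics taken from the bullets above.

    abstract class WorkerSessionSketch {
      @volatile private var initialized = false
      @volatile private var processed = false
      @volatile private var cancelled = false

      protected def doInit(init: Init): Unit  // must not open the Execute stream before this call
      protected def doProcess(): Unit         // on transport error, never return the worker to a reuse pool
      protected def doCancel(): Unit          // sends Cancel iff FinishResponse has not been received

      def init(i: Init): Unit = { doInit(i); initialized = true }
      def process(): Unit = { processed = true; doProcess() }

      // Idempotent; before init() nothing is on the wire, so this is a no-op.
      def cancel(): Unit = if (initialized && !cancelled) { cancelled = true; doCancel() }

      // Final: if init() ran but process() never did, send Cancel so the worker
      // sees a clean protocol terminator instead of a bare transport error.
      final def close(): Unit = if (initialized && !processed) cancel()
    }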

EchoProtocolSuite.scala — a protocol validation test that implements a minimal echo worker (gRPC server) and engine client against the generated stubs (its state machine is sketched after the list):

  • Covers the full state machine: AwaitingInit → AwaitingData → (Chunking) → Data → Finishing → Done.
  • Exercises all protocol paths: normal echo, chunked payload, generator-style (zero DataRequests), mid-stream cancel, Cancel-after-Finish, ExecutionError with engine-driven terminator, ProtocolError on protocol violations, concurrent
    producer/consumer interleaving, and Manage RPC.
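
Schematically, the suite's worker-side states and legal transitions look like the following. This is reduced from the coverage list above, not the suite's actual code.

    import com.google.protobuf.ByteString

    sealed trait State
    case object AwaitingInit extends State
    final case class AwaitingData(payload: ByteString) extends State // inline payload received
    final case class Chunking(partial: ByteString) extends State     // is_chunking_payload = true
    case object Data extends State
    case object Finishing extends State
    case object Done extends State

    // AwaitingInit --Init (inline payload)--> AwaitingData
    // AwaitingInit --Init (chunking)-->       Chunking
    // Chunking     --PayloadChunk-->          Chunking (chunks concatenated in order)
    // Chunking     --first DataRequest-->     Data     (canonical end-of-chunking signal)
    // AwaitingData --DataRequest-->           Data
    // AwaitingData --Finish-->                Finishing (generator-style: zero DataRequests)
    // Data         --Finish-->                Finishing --FinishResponse sent--> Done
    // any state    --Cancel-->                Done      (after CancelResponse)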

README.md — updated wire protocol notation and added a backpressure note.

Why are the changes needed?

The SPIP requires a language-agnostic, versioned, and extensible execution protocol between the Spark engine and UDF workers implemented in any language. This PR defines that protocol in protobuf/gRPC, with precise lifecycle semantics that
guide both the Spark engine implementation and external worker authors.

Does this PR introduce any user-facing change?

No. The protocol and its Scala abstractions are marked @Experimental. No existing user-facing APIs are modified.

How was this patch tested?

EchoProtocolSuite implements a self-contained gRPC server (echo worker) and client (engine) against the protocol, exercising normal paths, error paths, cancellation (including Cancel-after-Finish), chunked payload delivery, the concurrent
producer/consumer pattern, and the Manage RPC. The suite serves as a live specification check: if the proto comments and the test disagree, the test fails.

build/sbt 'udf-worker-core/testOnly *EchoProtocolSuite'

Was this patch authored or co-authored using generative AI tooling?

Yes

@haiyangsun-db marked this pull request as ready for review (May 3, 2026 16:13)
@haiyangsun-db changed the title from "[SPARK-56413] Introduce the grpc protocol for UDF execution." to "[SPARK-56413] Add gRPC UDF execution protocol" (May 3, 2026)

@sven-weber-db (Contributor) left a comment:

Thank you for addressing the comments.

@haiyangsun-db (Contributor, Author):

@hvanhovell could you please help take another pass?

@dtenedor (Contributor) commented:

PR Review: SPARK-56413 — gRPC UDF execution protocol

Summary

The PR adds udf/worker/proto/src/main/protobuf/udf_protocol.proto, swaps the placeholder InitMessage case class on WorkerSession for the generated Init proto, fixes two typos in common.proto, and updates the README and a test. The protocol defines:

  • service Worker { rpc Execute(stream UdfRequest) returns (stream UdfResponse); rpc Manage(WorkerRequest) returns (WorkerResponse); }
  • Execute wire order: Init → PayloadChunk* → (DataRequest/DataResponse)* → exactly one Finish or Cancel.
  • Manage operations: Heartbeat, ShutdownRequest.

Overall this is a high-quality, well-documented wire contract. The doc comments are unusually thorough (lifecycle, ordering, "Required/Optional" tags, escape-hatch conventions, reserved ranges), and the engine/client split — typed engine-side fields vs. opaque UdfPayload carrying everything the client packs — is a good factoring.

Below are mostly questions / suggested clarifications; only a couple are blocking-ish.


Strengths

  • Clear separation of envelope (UdfRequest/UdfResponse) vs. control (UdfControlRequest/UdfControlResponse) vs. data (DataRequest/DataResponse).
  • Top-level bytes data on DataRequest/DataResponse (not nested) — explicit copy-avoidance rationale is great.
  • Reserved field range 8 to 99 for graduated session_conf keys, with the timezone precedent already showing the pattern.
  • >= 100 convention for opaque escape-hatch fields (parameters, future siblings).
  • PayloadChunk semantics (concatenation order, single InitResponse covering Init + all chunks, first DataRequest ending the chunk phase) are well-specified.
  • Scala-side: the new cancel-lifecycle doc + idempotency of cancel/close is exactly the kind of contract that pays off in practice.

Concerns / suggestions

1. Cancel-vs-finish race contradicts wire-level "mutually exclusive" wording (medium)

Finish doc in udf_protocol.proto says:

// Exactly one of [[Finish]] or [[Cancel]] is sent per Execute stream;
// they are mutually exclusive. If the engine has already sent
// [[Finish]] it MUST NOT send [[Cancel]] afterwards (and vice versa).
message Finish {}

…but the new Scala doc on WorkerSession describes a benign race:

 * Cancel-vs-finish race: when the session driver has finished
 * sending input (and therefore queued an implicit finish on the
 * underlying transport) and a [[cancel]] arrives concurrently, both
 * are valid stream-terminating actions; the response side carries
 * either a `FinishResponse` or a `CancelResponse` depending on which
 * the worker observes first, and either is acceptable to the caller.

If Finish is already queued on the transport and cancel() then writes Cancel, the engine has, on the wire, sent both — violating the proto's MUST NOT. Two reconciliations are possible, please pick one and state it clearly in the proto:

  • a) The wire really does forbid both, so cancel() after a buffered Finish is a no-op on the transport (just an early gRPC half-close). Reword the Scala "queued an implicit finish" sentence to match.
  • b) The wire tolerates a Cancel after Finish only before any FinishResponse has been observed (the engine raced with itself, not with another client). Then the proto MUST NOT needs softening to "MUST NOT send Cancel after observing FinishResponse" or similar.

As written, the Scala doc invites engine implementations that the proto's strict reading prohibits.
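
For concreteness, reading (b) could look like this on the engine side. finishResponseSeen is a hypothetical engine-side flag, and the generated names are the same assumptions as elsewhere in this review.

    // cancel() under reading (b): Cancel is legal after Finish only while no
    // FinishResponse has been observed; the engine then accepts whichever
    // terminator (FinishResponse or CancelResponse) the worker emits first.
    @volatile private var finishResponseSeen = false

    def cancel(requests: StreamObserver[UdfRequest]): Unit =
      if (!finishResponseSeen) {
        requests.onNext(UdfRequest.newBuilder()
          .setCancel(Cancel.getDefaultInstance).build())
      }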

2. InitResponse timing vs. PayloadChunk is ambiguous (low/medium)

PayloadChunk says "The single InitResponse covers Init plus all of its chunks together", implying the worker must wait until chunking completes. But the order diagram doesn't say this explicitly, and it interacts with the optional PayloadChunk.last early-completion hint:

    bytes data = 1;

    // (Optional) Set to true on the final chunk. Receivers MAY use
    // this as an early signal that the payload is complete and
    // decoding can begin; receivers that prefer to wait for the
    // first [[DataRequest]] (which marks the end of the chunking
    // phase) MAY ignore this. When unset, the receiver determines
    // completeness by the arrival of the first [[DataRequest]].
    optional bool last = 2;

Two questions worth pinning down:

  • Is the worker permitted to send InitResponse before observing all PayloadChunk messages (e.g., as soon as it's decoded enough to start)? Or must it wait until the chunking phase is complete?
  • If last is unset, the receiver can only detect end-of-chunking by the first DataRequest. That's fine for the worker, but it means InitResponse can never be sent until the first DataRequest arrives — i.e., the engine cannot block sending data on receiving InitResponse. If that's the intent, say so; otherwise consider making last true mandatory when chunking is used.

3. No protocol/version field on Init (medium)

WorkerCapabilities (in worker_spec.proto) presumably handles up-front capability negotiation, but a protocol_version (or min_required_version) on Init would make per-stream rollout and rollback much simpler, especially before the engine has fully read the worker spec. Worth at least reserving a field number for it now, even if not populated.

4. Unknown UDFWorkerDataFormat values (low)

Init says receivers MUST reject UDF_WORKER_DATA_FORMAT_UNSPECIFIED, which is good, but doesn't say what to do with values outside the worker's supported set (e.g., a newer client picks PARQUET=2 that the worker doesn't speak). proto3 will pass unknown enums through as numeric values; the doc should say the worker MUST reject any value not in its declared WorkerCapabilities.supported_data_formats.
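
A sketch of the suggested guard on the worker side, assuming Init exposes the format through standard proto3 Java accessors (getDataFormat / getDataFormatValue; the field name is an assumption). proto3 surfaces unknown numeric values as UNRECOGNIZED, so a single match covers unspecified, unknown, and unsupported values; protocolError(...) is a hypothetical helper that builds a ProtocolError.

    def validateFormat(
        init: Init,
        supported: Set[UDFWorkerDataFormat]): Option[ProtocolError] =
      init.getDataFormat match {
        case UDFWorkerDataFormat.UDF_WORKER_DATA_FORMAT_UNSPECIFIED =>
          Some(protocolError("data_format must be set"))
        case UDFWorkerDataFormat.UNRECOGNIZED =>
          Some(protocolError(s"unknown data_format value ${init.getDataFormatValue}"))
        case f if !supported.contains(f) =>
          Some(protocolError(s"data_format $f not in supported_data_formats"))
        case _ => None
      }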

5. WorkerResponse should require matching branch (low)

message WorkerResponse {
    // Exactly one branch MUST be set, mirroring the request oneof.
    oneof manage {
        HeartbeatResponse heartbeat = 1;
        ShutdownResponse  shutdown  = 2;
    }
}

"Mirroring" is implicit. Suggest tightening to "The engine MUST receive a response whose oneof branch matches the request's branch; a mismatched response is a protocol error."

6. Empty DataRequest/DataResponse (low)

message DataRequest {
    // (Required, non-empty.) Encoded data bytes for one batch in the
    // session-declared format.
    bytes data = 1;
}

// Worker -> Engine per-batch payload. The worker emits zero or more
// [[DataResponse]]s between [[InitResponse]] and [[FinishResponse]] /
// [[CancelResponse]]. Sink-style UDFs (which consume input but
// produce no output rows on the data plane) emit exactly zero.
message DataResponse {
    // (Required, non-empty.) Encoded data bytes for one batch in the
    // session-declared format.
    bytes data = 1;
}

For Arrow this is fine (an empty IPC batch still has a header). For future formats that allow truly empty batches, requiring non-empty may be inconvenient. Either narrow the wording to "non-empty as defined by the session's data_format" or accept zero-length payloads and let the decoder reject. Not blocking.

7. Behavior of cancel() between init and process (low)

 * '''Lifecycle:''' [[cancel]] is idempotent and safe at any point in
 * the session's life:
 *  - before [[init]] -- nothing has been sent on the transport yet,
 *    so [[cancel]] is a no-op (the session may still be closed
 *    normally via [[close]]).
 *  - between [[init]] and [[process]] -- transitions the session
 *    into a cancelled state; subsequent [[process]] calls observe
 *    the cancellation.

"Subsequent process calls observe the cancellation" — by throwing? returning an empty iterator? returning a special exception class? Worth nailing down. My read of the surrounding contract is that callers expect an exception, since process returns an iterator and a silent empty result would mask cancellations.

8. Manage(Shutdown) while Execute streams are active (low)

The proto says Manage is "independent of any per-execution stream" but doesn't say what the worker is supposed to do if ShutdownRequest arrives while Execute streams are still open. Options:

  • Reject (return an error in ShutdownResponse?)
  • Accept and let active streams drain.
  • Accept and cancel active streams.

Worth specifying. Today there's no error channel on ShutdownResponse; if the worker can reject, you may want one.

9. PayloadChunk size validation vs. payload_size (nit)

UdfPayload.payload_size is documented for buffer pre-allocation. It would also be useful for chunk validation (sum of chunk bytes + inline payload == payload_size). Worth mentioning that validation use, or adding a sentence to PayloadChunk that the receiver MAY validate against payload_size.
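
The validation is one line of arithmetic; a worker-side sketch (accessor names assumed from the field names quoted in this review):

    import com.google.protobuf.ByteString

    // Accumulates PayloadChunk bytes and, when the chunk phase ends, checks
    // inline-payload size plus chunk sizes against the declared payload_size.
    final class ChunkAccumulator(declaredSize: Option[Long]) {
      private val out = ByteString.newOutput()

      def add(chunk: PayloadChunk): Unit = chunk.getData.writeTo(out)

      def finish(inline: ByteString): Either[String, ByteString] = {
        val chunks = out.toByteString
        val total = inline.size().toLong + chunks.size()
        declaredSize match {
          case Some(n) if n != total =>
            Left(s"payload_size mismatch: declared $n, received $total")
          case _ => Right(inline.concat(chunks))
        }
      }
    }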

10. Service naming (nit)

service Worker is quite generic in a project the size of Spark; another service named Worker could land elsewhere and confuse generated code lookups. Consider UdfWorker (matches UDFWorkerSpecification, UDFWorkerDataFormat).

11. Builder usage in README example (nit)

.setUdf(UdfPayload.newBuilder()
  .setPayload(ByteString.copyFrom(serializedFunction))
  .setFormat(payloadFormat))   // worker-recognised tag

This works because setUdf accepts a builder, but Spark's other proto-builder snippets tend to call .build() explicitly. Either pattern is fine, just suggest picking one for consistency across docs.

12. eval_type shape (discussion, not blocking)

    // (Optional) Worker / language-specific dispatch hint. A
    // free-form string the worker uses to pick the code path that
    // handles this payload. The protocol does not enumerate eval
    // types because they are language-specific; the client side of
    // the protocol and the worker agree on the namespace and the
    // values.

This deliberately mirrors PySpark's PythonEvalType. The "any string both sides agree on" approach is flexible, but in practice every worker will hard-code recognised values, so this is effectively an enum with no central registry. Worth at least linking from here to a doc that lists the recognised values per worker once the first worker lands, so users don't have to grep server source.

13. Coupling of abstract WorkerSession to generated proto class (discussion)

doInit(message: Init) now binds every WorkerSession implementation directly to the generated class. Pros: zero conversion overhead, single source of truth. Cons: any proto-incompatible change ripples into every implementation. Given the @Experimental annotation this is fine for now, but it does mean we should be conservative about breaking changes in Init going forward (more conservative than the proto itself would suggest).


Nits

  • common.proto: typo fixes look good.
  • The doc-link style [[Init]] / [[FinishResponse]] in proto comments isn't standard protoc-gen-doc markup but is harmless plain text — fine.
  • Heartbeat/HeartbeatResponse empty messages: if you'd like richer health probes later (e.g., a server-side load hint), reserve a field number now to make additive evolution easy.
  • Consider an explicit note on Heartbeat about its relationship to gRPC keepalive — they aren't the same thing, and users will ask.

Verdict

LGTM in shape; the protocol is well-thought-through and the docs are noticeably above average. I'd want at least the cancel-vs-finish wire contradiction (#1) reconciled and the unknown-data_format rejection rule (#4) made explicit before merge; the rest are doc clarifications and minor questions.

// wire protocol and ordering invariants.
//
// Error contract: if the gRPC connection breaks at any point, gRPC
// surfaces an error on the stream. The engine therefore never needs

Reviewer comment:

I don't think this is true. The UDF server can hang when you give it a poorly written UDF. I am not sure that you should give the UDF server this responsibility in the first place; the client side should have timeouts to deal with this type of scenario.

Author reply:

IMO, detecting hanging UDF code is out of scope for this protocol, as in theory a customer can intentionally define a long-sleep UDF. But yes, we should be able to set a timeout for that, e.g. a max timeout for processing one row/unit of data.

A client-side cancel mechanism here may not be sufficient to deal with that: if the gRPC buffer is filled with input batches and that causes a hang in the UDF worker, the cancel message would never make it to the UDF worker.

However, we should be able to handle it with a UDF-worker-side timeout: if processing one row of data takes longer than a threshold, we trigger a hanging-user-code exception.

The comments here are not well written; I will polish them.

Author follow-up:

Basically, a specific worker implementation can easily build on top of this gRPC proto to deal with hanging UDFs at the application level (introduce a timeout in the worker code); we don't have to complicate the UDF protocol with extra logic to tell real hangs from false positives.
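
A sketch of that worker-side guard, using plain JDK primitives (evalBatch is a hypothetical per-batch UDF entry point; only the timeout pattern is the point):

    import java.util.concurrent.{Callable, Executors, TimeUnit, TimeoutException}

    val udfPool = Executors.newSingleThreadExecutor()

    // Runs one batch through the UDF with a bounded wait; a timeout becomes an
    // application-level "hanging user code" error instead of a wedged stream.
    def processWithTimeout(batch: Array[Byte], timeoutMs: Long): Either[String, Array[Byte]] = {
      val task = udfPool.submit(new Callable[Array[Byte]] {
        override def call(): Array[Byte] = evalBatch(batch)
      })
      try Right(task.get(timeoutMs, TimeUnit.MILLISECONDS))
      catch {
        case _: TimeoutException =>
          task.cancel(true) // best-effort interrupt of the stuck UDF thread
          Left(s"batch not processed within ${timeoutMs}ms: likely hanging user code")
      }
    }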

// eventually arrive. This applies to every in-flight response, not
// only [[CancelResponse]].
//
// Stream lifecycle: the engine MUST half-close the request stream

Reviewer comment:

Isn't this how GRPC stream-stream works to begin with? In general I don't understand what this section is trying to accomplish.

Author reply:

Yeah, I agree this is a bit redundant; I can trim the comments.


// Worker-scoped management RPC, independent of any per-execution
// stream. Used for heartbeat, capability query, and graceful
// shutdown. Kept unary so it does not depend on the lifecycle of an

Reviewer comment:

I don't think I buy the comment on kept unary... I think it is simpler, and it does not really need it.

Author reply:

fixing the comment.

// MUST do so over the default connection of the worker specification.
//
// In future, additional connections (e.g. a separate channel) may be
// reserved by the worker spec for other purposes.

Reviewer comment:

Like?

Author reply:

e.g., we may use a dedicated side-channel for streaming state store.

// language) acts as the gRPC server.
// =====================================================================

// The default UDF gRPC service. A worker that exposes this service

Reviewer comment:

Should this make statements about how many concurrent Execute calls are allowed, and say that an implementation needs to reserve some capacity for Manage calls?

Author reply:

Yeah, it's possible.
It's related to worker reuse and idle pooling: since we need to allocate sessions to workers for reuse, we will carefully add that part when we introduce idle pooling in the dispatcher.

Btw, this should be a property in the worker specification, as it's a worker property that is transparent to the protocol here.

// is the canonical end-of-chunking signal; the worker MUST NOT send
// [[InitResponse]] before receiving it.
//
// When [[UdfPayload.payload_size]] is set on [[Init.udf]], receivers

Reviewer comment:

should this include a checksum?

Author reply:

For security or for integrity? How could this break?

// of the stream.
//
// Backpressure: this protocol currently relies on gRPC's transport-level
// (HTTP/2) flow control for backpressure. Application-level backpressure

Reviewer comment:

This is probably going to be good enough, provided that client -> server streaming backpressure works properly....

message FinishResponse {
// Final metrics aggregated over the whole session (e.g. rows
// in/out, time per phase). Free-form; names are worker-defined.
map<string, string> metrics = 1;

Reviewer comment:

I am very much in favor of metrics. I am just wondering if this is the right place to emit them. I guess folks might want more realtime metrics, for example. Perhaps put them in a separate message? Another question would be whether metrics should be part of the UDF execution flow or the maintenance RPC.

Author reply:

Good idea.

  1. The maintenance RPC can give global metrics, as that's for the whole worker.
  2. The finish response can give metrics accumulated over the UDF execution time.
  3. We are open to more instant metrics along the way: we can introduce a periodic ack from the worker to the engine reporting the status of the UDF execution, which could also be used to implement application-level backpressure if needed.

Do we need that now? IMO, this can be added as an optional message in the future without breaking any existing assumptions, and not every UDF worker needs to implement it.


// (Optional) Language-specific error class name (e.g. "ValueError"
// in Python, "RuntimeException" in Java).
optional string error_class = 3;

Reviewer comment:

Should we add NERF support here?

Author reply:

NERF (Non-Error Return Flow)? What do you mean by NERF support here? Could you please give an example?

// to decode. The engine forwards this string unchanged.
string format = 2;

// (Optional) Total payload size in bytes. Useful when chunked

Reviewer comment:

Add a CRC as well?

Author reply:

Would that be expensive, and is it really necessary? I assume that if the payload is malformed it would fail anyway; what would be the extra benefit of using a checksum? Is it for a security concern?

}

// Liveness probe. The engine may send this periodically to detect a
// hung worker process. The worker SHOULD reply within a small bounded

Reviewer comment:

As noted earlier, this puts a somewhat interesting constraint on the worker implementation: it has to reserve a thread to make sure these can get served. Even then, things can get wonky if the worker is using too many threads.

}

// Acknowledgment for [[Heartbeat]].
message HeartbeatResponse {

Reviewer comment:

It'd be nice if we'd return some metrics here as well.

Author reply:

Sure, I can add that.

responseObserver.onCompleted()

case WorkerRequest.Manage.Shutdown(req) =>
// FINDING 2: The proto says the worker SHOULD exit after all Execute

@hvanhovell (Contributor) commented May 13, 2026:

Should this be addressed or reworded?

Author reply:

Rephrased; this was a temporary comment that I forgot to clean up.

return
}
val inlinePayload = init.getUdf.getPayload
state = AwaitingData(inlinePayload)

@hvanhovell (Contributor) commented May 13, 2026:

Shouldn't you send a response for non-chunking mode here?


private def handleInit(init: Init): Unit = state match {
case AwaitingInit =>
// FINDING 4 (resolved): unsupported protocol_version is now surfaced

Reviewer comment:

Not sure if I understand the FINDING comments.

Author reply:

Non-cleaned-up comment; removing...

}

case Chunking(existing) =>
val updated = existing.concat(chunk.getData)

Reviewer comment:

You seem to be discarding the payload. It is probably fine for testing, but it is a bit weird.

Author reply:

Yeah, this is a test-only dummy server, as we don't want to do any serious serialization/deserialization.


private def handleFinish(): Unit = state match {
case AwaitingData(_) =>
// Generator-style UDF: engine sends Finish directly after Init.

Reviewer comment:

Should not be the case here.

// gRPC transport error from the engine side (connection dropped).
// Per the protocol: treat as equivalent to Cancel for cleanup purposes;
// do NOT attempt to send CancelResponse (stream is dead).
state = Done

Reviewer comment:

Shouldn't you call handleCancel here?
