feat: custom Rust UDFs [experimental] [skip ci] #4283
Create the comet-udf-sdk workspace crate with the ABI version constant and a single unit test. Register it in native/Cargo.toml default-members and members so it builds as part of the native workspace.
Add homepage, repository, and authors from workspace.package to match the pattern used by other Comet crates. Add publish = false to make the intent explicit. Fix a broken intra-doc link on COMET_UDF_ABI_VERSION that referenced the not-yet-existing export! macro.
Re-export field_to_ipc_bytes and field_from_ipc_bytes from the crate root so callers don't need to reach into the types module directly. Tighten the struct_type_via_ipc test to assert full Field equality (name, nullability, metadata) rather than just data_type equality. Update doc comments on both IPC helpers to clearly state these are raw FlatBuffer-encoded Schema messages (not framed Arrow IPC streams), and fix a pre-existing broken intra-doc link on to_data_type.
Rename UdfErrorCode::None to Reserved (None reads as absent value), add compile-time layout assertions for the repr(C) UdfError struct, and make UdfError::set auto-free any existing message so repeat calls are safe without manual free_in_place between them.
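A minimal sketch of the two ideas in this commit, assuming a two-field `UdfError` (a `u32` code plus an owned C string). The field names, the `User` variant, and `empty` are illustrative assumptions; only `UdfErrorCode::Reserved`, `set`, and `free_in_place` are named in the commit message, and the real struct may differ.

```rust
use std::ffi::{c_char, CString};
use std::mem::{align_of, offset_of, size_of};
use std::ptr;

/// Hypothetical error codes; only `Reserved` comes from the commit message.
#[repr(u32)]
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum UdfErrorCode {
    Reserved = 0,
    User = 1,
}

/// Hypothetical repr(C) error slot shared across the FFI boundary.
#[repr(C)]
pub struct UdfError {
    pub code: u32,
    pub message: *mut c_char,
}

// Compile-time layout assertions: the build fails if a field is reordered
// or the struct changes size, instead of corrupting memory at run time.
const _: () = {
    assert!(size_of::<UdfError>() == 2 * size_of::<usize>());
    assert!(align_of::<UdfError>() == align_of::<usize>());
    assert!(offset_of!(UdfError, code) == 0);
    assert!(offset_of!(UdfError, message) == size_of::<usize>());
};

impl UdfError {
    pub const fn empty() -> Self {
        UdfError { code: UdfErrorCode::Reserved as u32, message: ptr::null_mut() }
    }

    /// Frees the owned message, if any, and nulls the pointer.
    pub fn free_in_place(&mut self) {
        if !self.message.is_null() {
            // SAFETY: `message` is only ever produced by `CString::into_raw` in `set`.
            drop(unsafe { CString::from_raw(self.message) });
            self.message = ptr::null_mut();
        }
    }

    /// Stores a new code and message, releasing any previous message first,
    /// so repeated calls do not leak.
    pub fn set(&mut self, code: UdfErrorCode, msg: &str) {
        self.free_in_place();
        // Interior NUL bytes would make `CString::new` fail, so strip them.
        let clean: Vec<u8> = msg.bytes().filter(|b| *b != 0).collect();
        self.code = code as u32;
        self.message = CString::new(clean).expect("NUL bytes were removed").into_raw();
    }
}
```

Calling `set` twice in a row leaves exactly one live allocation, which is the property the commit describes.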
Add a compile-time layout assertion block for UdfDescriptor (mirroring the existing one for UdfError) to catch field reorders at compile time. Move use std::ffi::c_char to the top-of-file imports block.
Add null-pointer checks at the start of invoke_impl (rc=3) so caller misuse cannot cause UB; reorder the null check in comet_udf_describe before OnceLock initialization; add cdylib crate-type requirement to the export! macro doc; tighten name_len NUL semantics comment.
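The null-pointer guard could look roughly like the sketch below. The function signature and the doubling body are invented for illustration; only the name `invoke_impl` and the return code 3 for null arguments come from the commit message.

```rust
/// Return codes. Only rc=3 for null arguments is taken from the commit
/// message; the value for success is an assumption.
pub const RC_OK: i32 = 0;
pub const RC_NULL_ARG: i32 = 3;

/// Hypothetical entry point: doubles each input value into `out`.
///
/// # Safety
/// If non-null, `input` and `out` must each point to `len` valid `i64`s.
pub unsafe extern "C" fn invoke_impl(input: *const i64, len: usize, out: *mut i64) -> i32 {
    // Validate before any dereference so caller misuse cannot cause UB.
    if input.is_null() || out.is_null() {
        return RC_NULL_ARG;
    }
    // SAFETY: both pointers are non-null and the caller guarantees `len` elements.
    let src = unsafe { std::slice::from_raw_parts(input, len) };
    let dst = unsafe { std::slice::from_raw_parts_mut(out, len) };
    for (d, s) in dst.iter_mut().zip(src) {
        *d = s.wrapping_mul(2);
    }
    RC_OK
}
```

Returning a dedicated code rather than panicking keeps the failure on the caller's side of the boundary, where it can be turned into an ordinary error.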
…tions Make `from_scalar_udf_impl` return `Result<impl CometScalarUdf, CometUdfError>` so non-Exact signatures and return_type failures surface immediately at construction time instead of silently producing a broken adapter. Cache `ConfigOptions` on the struct rather than allocating per `invoke` call. Add tests for scalar-return materialization and rejection of non-Exact signatures.
Add `libloading = "0.8"` to datafusion-comet dependencies, create the `execution::rust_udf` module with stub submodules (loader, cache, adapter), add `test_support` helpers, and introduce `core/build.rs` that sets `COMET_TEST_UDFS_LIB` so tests can locate the comet-test-udfs cdylib.
Move comet-udf-sdk from dev-dependencies to dependencies so production code in the UDF loader compiles outside of test builds. Add SAFETY comments to all unsafe blocks in loader.rs and implement Error::source() for LoaderError to chain the underlying libloading error.
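For readers unfamiliar with error chaining, here is a sketch of what implementing `Error::source()` on a loader error might involve. The variants and `check_abi` are hypothetical, and `std::io::Error` stands in for `libloading::Error` so the example needs no external crate.

```rust
use std::error::Error;
use std::fmt;
use std::io;

/// ABI version the host expects (the PR description gives the current value as 1).
pub const COMET_UDF_ABI_VERSION: u32 = 1;

/// Hypothetical loader error; the real `LoaderError` variants are not shown in the PR text.
#[derive(Debug)]
pub enum LoaderError {
    /// The library could not be opened. `io::Error` is a stand-in for `libloading::Error`.
    Open { path: String, source: io::Error },
    /// The library reports an ABI version the host does not support.
    AbiMismatch { expected: u32, found: u32 },
}

impl fmt::Display for LoaderError {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        match self {
            LoaderError::Open { path, .. } => write!(f, "failed to open UDF library {path}"),
            LoaderError::AbiMismatch { expected, found } => {
                write!(f, "UDF ABI mismatch: host expects {expected}, library reports {found}")
            }
        }
    }
}

impl Error for LoaderError {
    // Expose the underlying cause so callers can walk the full chain.
    fn source(&self) -> Option<&(dyn Error + 'static)> {
        match self {
            LoaderError::Open { source, .. } => Some(source),
            LoaderError::AbiMismatch { .. } => None,
        }
    }
}

/// Rejects libraries built against a different ABI version.
pub fn check_abi(found: u32) -> Result<(), LoaderError> {
    if found == COMET_UDF_ABI_VERSION {
        Ok(())
    } else {
        Err(LoaderError::AbiMismatch { expected: COMET_UDF_ABI_VERSION, found })
    }
}
```

Keeping the cause in `source()` rather than flattening it into the message lets error reporters print the chain without duplicating text.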
Remove per-call dlsym lookups by caching InvokeFn and FreeErrFn raw function pointers on RustUdfHandle at construction time. Remove mem::forget of input Vecs so partial-failure paths free live FFI_ArrowArray slots correctly via Vec::drop. Add a return-type drift check after the existing row-count check.
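The partial-failure point can be illustrated with a small sketch. `ExportedSlot` is a hypothetical stand-in for an exported `FFI_ArrowArray`, and a counter replaces the real release callback; the mechanism shown is simply that an early return drops the `Vec` and therefore every slot exported so far.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Stand-in for an exported FFI slot: it releases itself on drop.
pub struct ExportedSlot {
    released: Arc<AtomicUsize>,
}

impl Drop for ExportedSlot {
    fn drop(&mut self) {
        // The real type would invoke the C Data Interface release callback here.
        self.released.fetch_add(1, Ordering::SeqCst);
    }
}

/// Exports `n` inputs, failing at index `fail_at` if one is given.
pub fn export_inputs(
    n: usize,
    fail_at: Option<usize>,
    released: &Arc<AtomicUsize>,
) -> Result<Vec<ExportedSlot>, String> {
    let mut slots = Vec::with_capacity(n);
    for i in 0..n {
        if Some(i) == fail_at {
            // Early return drops `slots`, releasing every slot exported so far.
            // A `mem::forget(slots)` on this path would leak them instead.
            return Err(format!("export of input {i} failed"));
        }
        slots.push(ExportedSlot { released: Arc::clone(released) });
    }
    Ok(slots)
}
```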
Add CometRustUdfBridge JNI entry points (validateLibrary, listUdfs) in native/core and corresponding Java class in org.apache.comet.udf so the driver can validate cdylib contents before distributing to executors.
Add CometRustUdfSuite covering both happy-path (Task 20) and failure-path (Task 21) scenarios:
- add_one executes natively and returns id+1
- add_one result verified row-by-row against Spark reference
- registerAll succeeds for primitive-typed UDFs (struct-type parse limitation documented as a cancelled/skipped test with TODO)
- always_err surfaces "intentional failure" in the exception chain
- always_panic surfaces a panic-related error
- missing library path throws CometRustUdfLoadException at register time
- wrong arity in declared signature throws CometRustUdfSignatureException
Also forward the comet.test.udfs.lib system property through the scalatest-maven-plugin configuration in the root pom.xml, so the property set via -D on the Maven command line reaches the test JVM.
- Exception classes now extend Comet hierarchy (CometNativeException / CometRuntimeException) instead of RuntimeException directly
- Replace hand-rolled JSON in JNI bridge with serde_json for correctness
- Remove forward-reference Task N comments from docs and tests
- Extract symbol-name byte literals into a symbols module
- Add RUST_UDFS_CONF_KEY constant for the conf key
- Simplify propagateConf Option round-trip
- Remove redundant field_tag check in loader's decode_type
- Honor declared volatility in registerAll (Volatile -> non-deterministic)
- Dedupe InvokeFn / FreeErrFn type definitions in rust_udf::mod
- Skip canonicalize() syscall on cache hit (fast path on raw path)
- Pre-compute arg_fields / return_field on DataFusionAdapter
- Rewrite spark.comet.rustUdfs from registry snapshot to dedupe entries
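The "fast path on raw path" item above can be sketched as a cache that is keyed by both the path as given and its canonical form. `LoadedLibrary` and `LibraryCache` here are simplified stand-ins, not the PR's actual types, and no library is really loaded.

```rust
use std::collections::HashMap;
use std::io;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex};

/// Stand-in for a loaded cdylib; it records only the canonical path.
pub struct LoadedLibrary {
    pub canonical: PathBuf,
}

/// Hypothetical process-wide cache, keyed by raw and canonical paths.
#[derive(Default)]
pub struct LibraryCache {
    entries: Mutex<HashMap<PathBuf, Arc<LoadedLibrary>>>,
}

impl LibraryCache {
    pub fn get_or_load(&self, raw: &Path) -> io::Result<Arc<LoadedLibrary>> {
        // The lock is held across the syscall for simplicity in this sketch.
        let mut entries = self.entries.lock().unwrap();
        // Fast path: a hit on the path as given needs no canonicalize() syscall.
        if let Some(lib) = entries.get(raw) {
            return Ok(Arc::clone(lib));
        }
        // Slow path: resolve symlinks and relative components once.
        let canonical = raw.canonicalize()?;
        let lib = entries
            .entry(canonical.clone())
            .or_insert_with(|| Arc::new(LoadedLibrary { canonical }))
            .clone();
        // Remember the raw spelling so the next lookup takes the fast path.
        entries.insert(raw.to_path_buf(), Arc::clone(&lib));
        Ok(lib)
    }
}
```

Two different spellings of the same file still resolve to one shared entry, because the canonical key is consulted before anything new is created.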
This is great news, Andy. I was wondering whether we could use abi_stable instead of the native memory layout? cc: @timsaucer
Ok, I asked my agent to help me with a pro/con list of switching to use I read through
What the rework looks like concretely
The Comet SDK has five concerns, and four of them are already in
What stays in Comet is roughly:
That's roughly ~1000 of the ~1300 lines under
Benefits
The big functional wins come from inheriting things the PR explicitly punts on. The user guide and
The other big win is that aggregate and window UDFs (deferred in the PR) become a straight extension:
From a correctness angle, stabby handles a few sharp edges in the current code:
Drawbacks: the honest list
Arrow-version decoupling is the one claim that genuinely weakens. The PR states user libraries are "decoupled from Comet's
Moving to
User-facing API surface is larger.
Velocity coupling to datafusion-ffi. If you hit a bug in stabby's behavior or in an
Panic semantics. The Comet SDK has very deliberate
Loss of "Comet owns the wire format." Today the wire format is in
Recommendation
Two things to verify before committing to the direction:
paleolimbot
left a comment
Right now we have some instability in datafusion-ffi which leads to requiring datafusion major versions to match. This has been an annoyance, but not a deal breaker for datafusion-python.
This has been tricky for us to deal with (because we're not on bleeding-edge DataFusion and the extension libraries are, so bugfixes don't always get backported and rereleased). Our other constraint is that we implement some things in C++ (for reasons out of our control), so we need the C side to be explicit.
We're using this:
...and on the Rust side
(I'd love to remove all of that in favour of datafusion-ffi, but I think it's reasonable for projects to use a DataFusion-version-independent workaround until it stabilizes)
/// DataFusion adapter for a Rust UDF loaded from a cdylib.
///
/// Holds:
/// - an `Arc<LoadedLibrary>` so the cdylib outlives the adapter,
/// - a clone of the per-UDF descriptor (`LoadedUdf`),
/// - a precomputed DataFusion `Signature`,
/// - cached raw function pointers to avoid per-call `dlsym` lookups.
///
/// On every batch, `invoke_with_args` exports each input via the Arrow
/// C Data Interface, calls the cdylib's `comet_udf_invoke`, and imports
/// the result.
pub struct RustUdfHandle {
It's probably worth separating the dynamic library from the C Udf implementation (this seems like it couples them closely). Dynamic libraries are one way to manage these but passing the pointer address through Java (like the C Data interface) might be more flexible.
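One way to read this suggestion, sketched under assumptions: the UDF entry point travels as a plain 64-bit address (a JNI `jlong` maps to `i64`), and the receiver rebuilds the function pointer without caring how the code was loaded. `InvokeFn`, `add_one`, `to_address`, and `from_address` are all illustrative names, not part of the PR.

```rust
/// Hypothetical C-ABI UDF entry point taking and returning one value.
pub type InvokeFn = extern "C" fn(i64) -> i64;

/// Example UDF body, for illustration only.
pub extern "C" fn add_one(x: i64) -> i64 {
    x.wrapping_add(1)
}

/// Producer side: expose the function pointer as an integer address.
pub fn to_address(f: InvokeFn) -> i64 {
    f as usize as i64
}

/// Consumer side: rebuild the function pointer from the address.
///
/// # Safety
/// `addr` must be 0 or an address previously produced by `to_address`
/// for code that is still loaded in this process.
pub unsafe fn from_address(addr: i64) -> Option<InvokeFn> {
    if addr == 0 {
        return None;
    }
    // SAFETY: the caller guarantees `addr` came from a valid `InvokeFn`.
    Some(unsafe { std::mem::transmute::<usize, InvokeFn>(addr as usize) })
}
```

With this shape the consumer depends only on the calling convention, so a dynamic library becomes one possible producer of the address rather than a requirement.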
Posted #4459 as an alternative implementation for comparison, using only arrow-stable FFI surfaces as suggested in the feedback above. The new PR provides two flavors side-by-side so reviewers can compare:
Same JVM API as this PR (
Which issue does this PR close?
Closes #.
Rationale for this change
This PR adds support for custom UDFs in Rust. The user implements a `CometScalarUDF` trait, builds their crate as a `cdylib`, and registers the resulting `.so`/`.dylib` from Scala. Comet loads the library inside the executor and dispatches to it directly during native execution.
Example user UDF:
The cross-`.so` boundary uses the Arrow C Data Interface (`FFI_ArrowArray`/`FFI_ArrowSchema`), so user libraries are decoupled from Comet's `arrow-rs` and `datafusion` versions: the only stability contract is the SDK ABI version (currently `1`).
What changes are included in this PR?
Three new pieces, plus narrow integration in existing Comet:
- `comet-udf-sdk`: public Rust crate. Defines `CometScalarUdf`, signature / type-tag / error types, an `export!` macro emitting versioned `extern "C"` entry points, and an optional `from_scalar_udf_impl` adapter behind the `datafusion-adapter` feature.
- `comet-test-udfs`: in-tree test cdylib exposing five UDFs (happy path, struct-typed, user error, panic, length mismatch) used by host and end-to-end tests.
- `rust_udf` module in `native/core`: `loader` (libloading + ABI check + descriptor parse), process-wide `cache`, and `RustUdfAdapter` impl `ScalarUDFImpl`.
- `RustUdfCall` proto in `expr.proto` and a planner branch in `create_expr` that resolves the call against the cache and wraps the adapter as a `ScalarUDF`.
- `CometRustUdfBridge` / `comet_rust_udf_bridge.rs` for driver-side `validateLibrary` / `listUdfs`.
- `CometRustUDF.register` / `registerAll`, `CometRustUdfRegistry`, typed exception classes.
- `QueryPlanSerde` branch that recognizes a `ScalaUDF` whose name is registered and emits `RustUdfCall` instead.
- `docs/source/user-guide/latest/custom-rust-udfs.md`.
Marked experimental: scope is intentionally scalar-only, dynamic-library loading only, no JVM fallback, library distribution is the user's responsibility (Spark `--files` or pre-install). Aggregate / window / table-valued UDFs and richer nested-type signature mapping are deliberately deferred.
How are these changes tested?
- `comet-udf-sdk`: 11 tests covering type-tag round-trip, IPC field encoding, error types, layout assertions for both `UdfError` and `UdfDescriptor`, the `EncodedSignature` builder, and the optional DataFusion adapter (signature derivation, scalar materialization, non-Exact rejection).
- `native/core/src/execution/rust_udf/`: 9 tests covering library load + ABI check, descriptor parse for primitive and struct-typed UDFs, process-wide cache identity, and four async tokio adapter tests that run UDFs end-to-end through DataFusion (happy path, user error, panic, length mismatch).
- `CometRustUdfRegistrySuite`: 3 tests covering register / re-register / snapshot semantics on the driver registry.
- `CometRustUdfSuite`: 6 tests pass, covering native execution of `add_one`, error / panic surfacing, missing-path failure, signature mismatch failure, and `registerAll` for primitive-typed UDFs. One test (`registerAll` over the struct-typed fixture) is currently cancelled: it hits a v1 limitation around mapping Arrow's `DataType::to_string` output for `Struct` to a Spark DDL parser-acceptable form. Documented in the user guide's Limitations section; works fine via explicit `register` with declared types.
The end-to-end suite is gated on `-Dcomet.test.udfs.lib=<path to libcomet_test_udfs>`; the path is plumbed through scalatest's `systemProperties` in the root `pom.xml`. The Rust test crate's `core/build.rs` exposes the same path to native tests via the `COMET_TEST_UDFS_LIB` env var.
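As a rough illustration of the `build.rs` plumbing, a build script can hand a path to the crate's tests with Cargo's `cargo:rustc-env` directive. The helper names and the `target/<profile>` layout below are assumptions; the PR's actual `core/build.rs` is not shown here.

```rust
use std::env;
use std::path::{Path, PathBuf};

/// Builds the platform-specific file name of the test cdylib,
/// for example `libcomet_test_udfs.so` on Linux.
pub fn test_udfs_lib_path(target_dir: &Path, profile: &str) -> PathBuf {
    let file = format!(
        "{}comet_test_udfs{}",
        env::consts::DLL_PREFIX,
        env::consts::DLL_SUFFIX
    );
    target_dir.join(profile).join(file)
}

/// Formats the Cargo directive that makes `key` available to the crate
/// at compile time through `env!`.
pub fn rustc_env_directive(key: &str, value: &Path) -> String {
    format!("cargo:rustc-env={}={}", key, value.display())
}

/// What a build script's `main` would call: printing the directive on
/// stdout is how a build script communicates with Cargo.
pub fn emit(target_dir: &Path, profile: &str) {
    let lib = test_udfs_lib_path(target_dir, profile);
    println!("{}", rustc_env_directive("COMET_TEST_UDFS_LIB", &lib));
}
```

A test could then read the value with `env!("COMET_TEST_UDFS_LIB")` and skip itself when the file does not exist.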