feat: custom Rust UDFs [experimental] [skip ci] #4283

Draft
andygrove wants to merge 31 commits into apache:main from andygrove:feat/rust-udfs

@andygrove (Member) commented May 10, 2026

Which issue does this PR close?

Closes #.

Rationale for this change

This PR adds support for custom UDFs written in Rust. The user implements the CometScalarUdf trait, builds their crate as a cdylib, and registers the resulting .so / .dylib from Scala. Comet loads the library inside the executor and dispatches to it directly during native execution.

Example user UDF:

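// Imports assumed for this example; the SDK types (CometScalarUdf,
// CometUdfSignature, CometUdfError) come from the comet-udf-sdk crate.
use std::sync::Arc;
use arrow::array::{Array, ArrayRef, Int64Array};

impl CometScalarUdf for AddOne {
    // Name under which the UDF is registered.
    fn name(&self) -> &str {
        "add_one"
    }
    // Declared argument and return types, stored on the struct as `sig`.
    fn signature(&self) -> &CometUdfSignature {
        &self.sig
    }
    // Called once per batch; adds 1 to every non-null value, keeping nulls.
    fn invoke(&self, args: &[ArrayRef]) -> Result<ArrayRef, CometUdfError> {
        let arr = args[0]
            .as_any()
            .downcast_ref::<Int64Array>()
            .ok_or_else(|| CometUdfError::new("expected Int64Array"))?;
        let out: Int64Array = arr.iter().map(|v| v.map(|x| x + 1)).collect();
        Ok(Arc::new(out))
    }
}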

The cross-.so boundary uses the Arrow C Data Interface (FFI_ArrowArray / FFI_ArrowSchema), so user libraries are decoupled from Comet's arrow-rs and datafusion versions: the only stability contract is the SDK ABI version (currently 1).
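To make the "only stability contract is the SDK ABI version" idea concrete, here is a minimal, std-only sketch of a versioned entry point and the host-side check. The names `SDK_ABI_VERSION`, `abi_version_entry`, and `check_abi` are hypothetical and not taken from the PR; only the current version value (1) comes from the description above.

```rust
// Hypothetical names throughout; only the version value 1 is from the PR text.

/// ABI version the host was built against.
pub const SDK_ABI_VERSION: u32 = 1;

/// Signature of the version entry point a UDF library would export.
pub type AbiVersionFn = extern "C" fn() -> u32;

/// What a user library might export. In a real cdylib this would also be
/// marked `#[no_mangle]` so the host can look it up by symbol name.
pub extern "C" fn abi_version_entry() -> u32 {
    SDK_ABI_VERSION
}

/// Host-side check: call the entry point before touching anything else
/// in the library, and refuse to proceed on a mismatch.
pub fn check_abi(entry: AbiVersionFn) -> Result<(), String> {
    let found = entry();
    if found == SDK_ABI_VERSION {
        Ok(())
    } else {
        Err(format!(
            "UDF library reports ABI version {}, host expects {}",
            found, SDK_ABI_VERSION
        ))
    }
}
```

Checking the version first means every other symbol in the library can be interpreted under a known layout, which is what lets the user crate and the host differ in their arrow-rs and datafusion versions.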

What changes are included in this PR?

Three new pieces, plus narrow integration in existing Comet:

  • comet-udf-sdk — public Rust crate. Defines CometScalarUdf, signature / type-tag / error types, an export! macro emitting versioned extern "C" entry points, and an optional from_scalar_udf_impl adapter behind the datafusion-adapter feature.
  • comet-test-udfs — in-tree test cdylib exposing five UDFs (happy path, struct-typed, user error, panic, length mismatch) used by host and end-to-end tests.
  • rust_udf module in native/core: loader (libloading + ABI check + descriptor parse), process-wide cache, and RustUdfAdapter impl ScalarUDFImpl.
  • RustUdfCall proto in expr.proto and a planner branch in create_expr that resolves the call against the cache and wraps the adapter as a ScalarUDF.
  • JNI bridge (CometRustUdfBridge / comet_rust_udf_bridge.rs) for driver-side validateLibrary / listUdfs.
  • Scala API: CometRustUDF.register / registerAll, CometRustUdfRegistry, typed exception classes.
  • QueryPlanSerde branch that recognizes a ScalaUDF whose name is registered and emits RustUdfCall instead.
  • User guide at docs/source/user-guide/latest/custom-rust-udfs.md.
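The process-wide cache mentioned in the list above could look roughly like the following std-only sketch. `LoadedLibrary` is a stand-in type and `get_or_load` is a hypothetical helper; the real loader would own a `libloading::Library` and the parsed descriptors, none of which are shown here.

```rust
use std::collections::HashMap;
use std::path::{Path, PathBuf};
use std::sync::{Arc, Mutex, OnceLock};

/// Stand-in for a loaded library handle (hypothetical).
pub struct LoadedLibrary {
    pub path: PathBuf,
}

/// Lazily initialised, process-wide map from library path to handle.
fn cache() -> &'static Mutex<HashMap<PathBuf, Arc<LoadedLibrary>>> {
    static CACHE: OnceLock<Mutex<HashMap<PathBuf, Arc<LoadedLibrary>>>> = OnceLock::new();
    CACHE.get_or_init(|| Mutex::new(HashMap::new()))
}

/// Return the cached handle for `path`, loading it at most once per process.
/// `load` is supplied by the caller so the sketch needs no real dlopen call.
pub fn get_or_load<F>(path: &Path, load: F) -> Result<Arc<LoadedLibrary>, String>
where
    F: FnOnce(&Path) -> Result<LoadedLibrary, String>,
{
    let mut map = cache().lock().map_err(|e| e.to_string())?;
    if let Some(hit) = map.get(path) {
        // Cache hit: hand back the same Arc, so identity is preserved.
        return Ok(Arc::clone(hit));
    }
    let lib = Arc::new(load(path)?);
    map.insert(path.to_path_buf(), Arc::clone(&lib));
    Ok(lib)
}
```

This version holds the lock while loading, which keeps the sketch simple and guarantees a single load per path at the cost of serialising concurrent first loads.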

Marked experimental. The scope is intentionally narrow: scalar-only, dynamic-library loading only, no JVM fallback, and library distribution is the user's responsibility (Spark --files or pre-install). Aggregate / window / table-valued UDFs and richer nested-type signature mapping are deliberately deferred.

How are these changes tested?

  • SDK unit tests (comet-udf-sdk) — 11 tests covering type-tag round-trip, IPC field encoding, error types, layout assertions for both UdfError and UdfDescriptor, the EncodedSignature builder, and the optional DataFusion adapter (signature derivation, scalar materialization, non-Exact rejection).
  • Native host tests (native/core/src/execution/rust_udf/) — 9 tests covering library load + ABI check, descriptor parse for primitive and struct-typed UDFs, process-wide cache identity, and four async tokio adapter tests that run UDFs end-to-end through DataFusion (happy path, user error, panic, length mismatch).
  • Driver-side Scala suite (CometRustUdfRegistrySuite) — 3 tests covering register / re-register / snapshot semantics on the driver registry.
  • End-to-end Spark suite (CometRustUdfSuite) — 6 tests pass: native execution of add_one, error / panic surfacing, missing-path failure, signature mismatch failure, and registerAll for primitive-typed UDFs. One test (registerAll over the struct-typed fixture) is currently cancelled — it hits a v1 limitation around mapping Arrow's DataType::to_string output for Struct to a Spark DDL parser-acceptable form. Documented in the user guide's Limitations section; works fine via explicit register with declared types.

The end-to-end suite is gated on -Dcomet.test.udfs.lib=<path to libcomet_test_udfs>; the path is plumbed through scalatest's systemProperties in the root pom.xml. The Rust test crate's core/build.rs exposes the same path to native tests via the COMET_TEST_UDFS_LIB env var.
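The layout assertions covered by the SDK unit tests can be illustrated with a small compile-time check. This is a sketch only: the field names and order of `UdfError` below are assumptions, not the SDK's actual definition.

```rust
use std::ffi::c_char;
use std::mem::{align_of, offset_of, size_of};

/// Illustrative `repr(C)` error struct; field names and order are assumptions.
#[repr(C)]
pub struct UdfError {
    pub code: u32,
    pub message: *mut c_char,
}

// Compile-time layout checks, gated to 64-bit targets where the offsets
// below hold. Reordering or resizing a field fails the build.
#[cfg(target_pointer_width = "64")]
const _: () = {
    assert!(offset_of!(UdfError, code) == 0);
    assert!(offset_of!(UdfError, message) == 8);
    assert!(size_of::<UdfError>() == 16);
    assert!(align_of::<UdfError>() == 8);
};
```

Because the assertions run in a `const` item, a layout drift is caught when the SDK is compiled rather than when a mismatched library is loaded.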

andygrove added 30 commits May 10, 2026 09:23
Create the comet-udf-sdk workspace crate with the ABI version constant
and a single unit test. Register it in native/Cargo.toml default-members
and members so it builds as part of the native workspace.
Add homepage, repository, and authors from workspace.package to match
the pattern used by other Comet crates. Add publish = false to make the
intent explicit. Fix a broken intra-doc link on COMET_UDF_ABI_VERSION
that referenced the not-yet-existing export! macro.
Re-export field_to_ipc_bytes and field_from_ipc_bytes from the crate
root so callers don't need to reach into the types module directly.

Tighten the struct_type_via_ipc test to assert full Field equality
(name, nullability, metadata) rather than just data_type equality.

Update doc comments on both IPC helpers to clearly state these are raw
FlatBuffer-encoded Schema messages (not framed Arrow IPC streams), and
fix a pre-existing broken intra-doc link on to_data_type.
Rename UdfErrorCode::None to Reserved (None reads as absent value),
add compile-time layout assertions for the repr(C) UdfError struct,
and make UdfError::set auto-free any existing message so repeat calls
are safe without manual free_in_place between them.
Add a compile-time layout assertion block for UdfDescriptor (mirroring
the existing one for UdfError) to catch field reorders at compile time.
Move use std::ffi::c_char to the top-of-file imports block.
Add null-pointer checks at the start of invoke_impl (rc=3) so caller
misuse cannot cause UB; reorder the null check in comet_udf_describe
before OnceLock initialization; add cdylib crate-type requirement to
the export! macro doc; tighten name_len NUL semantics comment.
…tions

Make `from_scalar_udf_impl` return `Result<impl CometScalarUdf, CometUdfError>`
so non-Exact signatures and return_type failures surface immediately at
construction time instead of silently producing a broken adapter. Cache
`ConfigOptions` on the struct rather than allocating per `invoke` call.
Add tests for scalar-return materialization and rejection of non-Exact
signatures.
Add `libloading = "0.8"` to datafusion-comet dependencies, create the
`execution::rust_udf` module with stub submodules (loader, cache, adapter),
add `test_support` helpers, and introduce `core/build.rs` that sets
`COMET_TEST_UDFS_LIB` so tests can locate the comet-test-udfs cdylib.
Move comet-udf-sdk from dev-dependencies to dependencies so production
code in the UDF loader compiles outside of test builds. Add SAFETY
comments to all unsafe blocks in loader.rs and implement Error::source()
for LoaderError to chain the underlying libloading error.
Remove per-call dlsym lookups by caching InvokeFn and FreeErrFn raw
function pointers on RustUdfHandle at construction time. Remove
mem::forget of input Vecs so partial-failure paths free live
FFI_ArrowArray slots correctly via Vec::drop. Add a return-type drift
check after the existing row-count check.
Add CometRustUdfBridge JNI entry points (validateLibrary, listUdfs) in
native/core and corresponding Java class in org.apache.comet.udf so the
driver can validate cdylib contents before distributing to executors.
Add CometRustUdfSuite covering both happy-path (Task 20) and
failure-path (Task 21) scenarios:

- add_one executes natively and returns id+1
- add_one result verified row-by-row against Spark reference
- registerAll succeeds for primitive-typed UDFs (struct-type parse
  limitation documented as a cancelled/skipped test with TODO)
- always_err surfaces "intentional failure" in the exception chain
- always_panic surfaces a panic-related error
- missing library path throws CometRustUdfLoadException at register time
- wrong arity in declared signature throws CometRustUdfSignatureException

Also forward comet.test.udfs.lib system property through the
scalatest-maven-plugin configuration in the root pom.xml, so the
property set via -D on the Maven command line reaches the test JVM.
@andygrove changed the title from "[experimental] feat: custom Rust UDFs" to "feat: custom Rust UDFs [experimental]" on May 10, 2026
- Exception classes now extend Comet hierarchy (CometNativeException /
  CometRuntimeException) instead of RuntimeException directly
- Replace hand-rolled JSON in JNI bridge with serde_json for correctness
- Remove forward-reference Task N comments from docs and tests
- Extract symbol-name byte literals into a symbols module
- Add RUST_UDFS_CONF_KEY constant for the conf key
- Simplify propagateConf Option round-trip
- Remove redundant field_tag check in loader's decode_type
- Honor declared volatility in registerAll (Volatile -> non-deterministic)
- Dedupe InvokeFn / FreeErrFn type definitions in rust_udf::mod
- Skip canonicalize() syscall on cache hit (fast path on raw path)
- Pre-compute arg_fields / return_field on DataFusionAdapter
- Rewrite spark.comet.rustUdfs from registry snapshot to dedupe entries
@coderfender (Contributor) commented

This is great news, Andy. I was wondering whether we could use abi_stable instead of the native memory layout? cc: @timsaucer

@andygrove changed the title from "feat: custom Rust UDFs [experimental]" to "feat: custom Rust UDFs [experimental] [skip ci]" on May 11, 2026
@timsaucer timsaucer self-requested a review May 22, 2026 21:28
@timsaucer (Member) commented May 27, 2026

OK, I asked my agent to help me with a pro/con list of switching to the datafusion-ffi crate for the scalar UDFs. Here is its review. For me the big question is whether the versioning requirement would be a deal breaker. Right now we have some instability in datafusion-ffi, which leads to requiring DataFusion major versions to match. This has been an annoyance, but not a deal breaker, for datafusion-python. The major advantage is that it will make adding aggregates and windows trivial once you have the scalar UDFs.


I read through native/comet-udf-sdk/src/{lib,types,error,macros,adapter}.rs and the datafusion-ffi udf module. Short answer: yes, this can be reworked to lean on datafusion-ffi, and a large fraction of the SDK becomes deletable. The harder question is whether you should — and the answer there is mostly yes, with one real tradeoff worth surfacing in review.

What the rework looks like concretely

The Comet SDK has five concerns, and four of them are already in datafusion-ffi:

| Comet SDK piece | datafusion-ffi equivalent |
| --- | --- |
| CometScalarUdf trait (name/signature/invoke) | ScalarUDFImpl directly |
| UdfDescriptor #[repr(C)] + offset_of asserts | FFI_ScalarUDF with stabby-checked layout |
| ArrowTypeTag + IPC FlatBuffer bytes for non-primitives | WrappedSchema (wraps FFI_ArrowSchema for everything, primitive or not) |
| UdfError (#[repr(C)] + code + raw C-string) | FFI_Result<...> + DataFusion's Result round-tripped through stabby |
| from_scalar_udf_impl adapter (feature-gated) | this is the FFI; no adapter needed |
| Custom Volatility enum | FFI_Volatility |
| EncodedSignature, manual descriptor lifetime mgmt | stabby's SString/SVec handle this |

What stays in Comet is roughly:

  1. A very thin discovery ABI — something like extern "C" fn comet_udf_list_v1(out: *mut SVec<FFI_ScalarUDF>) -> i32. You still need this because datafusion-ffi describes a single UDF; it doesn't define the "open this .so, ask it what UDFs it has" entry point. That's the only piece Comet legitimately needs to own. It's maybe 30-50 lines plus an export! macro.
  2. The host-side loader (loader.rs, cache.rs) — basically unchanged, but RustUdfHandle holds an Arc<ScalarUDF> built from a ForeignScalarUDF instead of raw InvokeFn/FreeErrFn pointers.
  3. The planner branch and JNI bridge — unchanged in shape; the JNI bridge actually gets simpler because you can clone() an FFI_ScalarUDF on the driver and ask it directly for name/signature/return type instead of round-tripping through hand-coded JSON.
  4. RustUdfAdapter — deletable. ForeignScalarUDF already implements ScalarUDFImpl, so it goes straight into the existing ScalarUDF::new_from_impl(...) path in PhysicalPlanner.

That's roughly 1000 of the ~1300 lines under native/comet-udf-sdk/ and native/core/src/execution/rust_udf/ either deleted or significantly simplified.

Benefits

The big functional wins come from inheriting things the PR explicitly punts on. The user guide and adapter.rs both call out "TypeSignature::Exact only" — that's because deriving an Arrow type list at describe-time is the only thing the custom ArrowTypeTag scheme can express. With FFI_ScalarUDF, coerce_types and return_field_from_args are part of the surface, so variadic signatures, type coercion, and metadata/nullability-dependent return types all work without redesign. Same goes for aliases (Comet has none today), short_circuits, and full ConfigOptions propagation. The struct-type DDL parsing limitation called out as a registerAll test cancellation goes away too — FFI_ArrowSchema round-trips arbitrary Arrow types including nested structs without going through Spark's DDL parser.

The other big win is that aggregate and window UDFs (deferred in the PR) become a straight extension: FFI_AggregateUDF and FFI_WindowUDF already exist with the same shape, so the discovery ABI just grows from comet_udf_list_v1 to a small enum of UDF kinds, instead of needing a parallel comet_aggregate_udf_* ABI later.

From a correctness angle, stabby handles a few sharp edges in the current code:

  • The UdfError::set / free_in_place design correctly forces the host to call comet_udf_free_error in the same cdylib (you documented this), but it's load-bearing: if a future contributor calls CString::from_raw in one cdylib on a pointer allocated by another cdylib's allocator, it's UB. stabby's SString is allocator-agnostic.
  • The #[cfg(target_pointer_width = "64")] gating on layout asserts means 32-bit is implicitly unsupported — fine, but stabby makes that explicit and refuses to compile on layouts it can't guarantee.
  • The hand-rolled EncodedSignature pinning into a OnceLock to keep raw pointers stable for the process lifetime is replaced by stabby-owned SVec/SString whose lifetimes are tied to the FFI_ScalarUDF value.
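The allocator concern in the first bullet is easier to see in code. The following std-only sketch shows one way an error slot could own its message so that allocation and deallocation always happen in the same library. The struct layout, the `empty` / `message_of` helpers, and the `free_error` signature are assumptions; the PR only confirms that `set` auto-frees a previous message and that a `comet_udf_free_error` symbol exists.

```rust
use std::ffi::{c_char, CStr, CString};
use std::ptr;

/// Illustrative error slot; layout and names are assumptions.
#[repr(C)]
pub struct UdfError {
    pub code: u32,
    pub message: *mut c_char,
}

impl UdfError {
    pub fn empty() -> Self {
        UdfError { code: 0, message: ptr::null_mut() }
    }

    /// Store a new message, freeing any previous one first so repeated
    /// calls do not leak.
    pub fn set(&mut self, code: u32, msg: &str) {
        self.free_in_place();
        // Interior NULs cannot appear in a C string; replace them.
        let owned = CString::new(msg.replace('\0', " ")).expect("NULs were replaced");
        self.code = code;
        self.message = owned.into_raw();
    }

    /// Release the message. Must run in the library that called `set`,
    /// because `CString::from_raw` returns the buffer to that library's allocator.
    pub fn free_in_place(&mut self) {
        if !self.message.is_null() {
            // SAFETY: `message` came from `CString::into_raw` in `set` and
            // is nulled immediately after being freed.
            unsafe { drop(CString::from_raw(self.message)) };
            self.message = ptr::null_mut();
        }
        self.code = 0;
    }
}

/// Entry point the host would call to free an error it received
/// (hypothetical signature).
pub unsafe extern "C" fn free_error(err: *mut UdfError) {
    if let Some(e) = unsafe { err.as_mut() } {
        e.free_in_place();
    }
}

/// Host-side helper: copy the message out before freeing it.
pub fn message_of(err: &UdfError) -> Option<String> {
    if err.message.is_null() {
        return None;
    }
    // SAFETY: a non-null `message` always points at a NUL-terminated buffer from `set`.
    Some(unsafe { CStr::from_ptr(err.message) }.to_string_lossy().into_owned())
}
```

The host copies the text with `message_of` and then calls back into the library through `free_error`; it never frees the pointer itself, which is the invariant the bullet describes as load-bearing.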

Drawbacks — the honest list

Arrow-version decoupling is the one claim that genuinely weakens. The PR states user libraries are "decoupled from Comet's arrow-rs and datafusion versions: the only stability contract is the SDK ABI version." That claim is already softer than it sounds — lib.rs's docs note "You also need a direct dependency on arrow (matching the version used by Comet's host)." So the decoupling is at the Arrow C Data Interface level (the data layout), not the Rust type level. The user already needs a matching arrow crate.

Moving to datafusion-ffi adds datafusion-ffi itself (plus transitively datafusion-common and datafusion-expr) to the user's Cargo.toml. The whole point of datafusion-ffi is to be ABI-stable across DataFusion versions, but the user does pull more code in. Realistically that's an extra 10-30 MB of stripped cdylib and a chunk of build time. For Spark --files distribution this isn't great but isn't a dealbreaker either.

User-facing API surface is larger. CometScalarUdf is three methods. ScalarUDFImpl is ~ten plus a Signature. For someone writing "downcast to Int64Array, return Int64Array," the current SDK is friendlier. Users who already know DataFusion get reuse, and the example in the user guide barely changes; users who don't get a steeper ramp. You could ease this with a thin re-export / type alias in comet-udf-sdk so the public surface still looks like "Comet's SDK" while the trait underneath is ScalarUDFImpl.

Velocity coupling to datafusion-ffi. If you hit a bug in stabby's behavior or in an FFI_ScalarUDF field, you're now waiting on a datafusion release (or vendoring). The flip side is the bugs being fixed are not Comet's to fix.

Panic semantics. The Comet SDK has very deliberate catch_unwind placement in invoke_impl with a distinct UdfErrorCode::Panic so the host can attribute panics versus user errors. datafusion-ffi catches panics inside its FFI wrappers and surfaces them as FFI_Result::Err, but the user-error / panic distinction is collapsed — both become DataFusionError. If the Spark-facing exception classification (CometRustUdfPanicException etc.) is meant to stay distinct, you'd want to verify that distinction survives or layer it back on (e.g. checking message prefix, or adding a panic flag to FFI_Result upstream).
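A minimal sketch of the panic-versus-user-error distinction discussed above, using only the standard library. The return codes and the `invoke_guarded` helper are illustrative; the PR text confirms only that a distinct `UdfErrorCode::Panic` exists.

```rust
use std::panic::{catch_unwind, AssertUnwindSafe};

// Illustrative return codes, not the SDK's actual values.
pub const RC_OK: i32 = 0;
pub const RC_USER_ERROR: i32 = 1;
pub const RC_PANIC: i32 = 2;

/// Run a fallible UDF body at the FFI boundary. A panic must not unwind
/// into the host, so it is caught and reported with its own code.
pub fn invoke_guarded<F>(body: F, out: &mut i64) -> i32
where
    F: FnOnce() -> Result<i64, String>,
{
    match catch_unwind(AssertUnwindSafe(body)) {
        // Normal completion: write the result and report success.
        Ok(Ok(v)) => {
            *out = v;
            RC_OK
        }
        // The UDF returned an error of its own.
        Ok(Err(_user_error)) => RC_USER_ERROR,
        // The UDF panicked; the payload is dropped here.
        Err(_panic_payload) => RC_PANIC,
    }
}
```

If both failure arms were collapsed into one code, the host could no longer map them to separate exception classes, which is the concern raised for the datafusion-ffi path.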

Loss of "Comet owns the wire format." Today the wire format is in comet-udf-sdk and Comet controls when ABI v2 happens. Switch to datafusion-ffi and the wire format is FFI_ScalarUDF's layout, which is owned upstream and stabilized through stabby. That's fine — but it does mean a layout change in datafusion-ffi forces a Comet bump, and COMET_UDF_ABI_VERSION either goes away or becomes a (Comet discovery version, datafusion-ffi version) tuple.

Recommendation

Two things to verify before committing to the direction:

  • Whether ForeignScalarUDF preserves the user-error vs panic distinction the way Comet's typed exceptions need it to.
  • Whether the discovery ABI (comet_udf_list_v1) can return SVec<FFI_ScalarUDF> cleanly, or whether you want a small wrapper enum to leave room for FFI_AggregateUDF / FFI_WindowUDF in the same discovery call without breaking the discovery ABI later.
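One possible shape for the "small wrapper enum" in the second point is sketched below, std-only. Everything here is hypothetical: the kind values, the `UdfEntry` struct, and the `classify` helper are not part of the PR or of datafusion-ffi. The sketch carries the kind as a plain `u32` rather than a Rust enum, so that a tag from a newer library is never an invalid discriminant on the host side.

```rust
use std::ffi::c_void;

// Kind tags for a hypothetical discovery ABI; values are assumptions.
pub const KIND_SCALAR: u32 = 0;
pub const KIND_AGGREGATE: u32 = 1;
pub const KIND_WINDOW: u32 = 2;

/// One discovered UDF: a kind tag plus an opaque pointer whose pointee
/// type depends on the tag.
#[repr(C)]
#[derive(Clone, Copy)]
pub struct UdfEntry {
    pub kind: u32,
    pub udf: *const c_void,
}

#[derive(Debug, PartialEq, Eq)]
pub enum Discovered {
    Scalar,
    Aggregate,
    Window,
    /// A kind added by a newer library; the host can skip it instead of failing.
    Unknown(u32),
}

/// Host-side classification of discovery entries.
pub fn classify(entries: &[UdfEntry]) -> Vec<Discovered> {
    entries
        .iter()
        .map(|e| match e.kind {
            KIND_SCALAR => Discovered::Scalar,
            KIND_AGGREGATE => Discovered::Aggregate,
            KIND_WINDOW => Discovered::Window,
            other => Discovered::Unknown(other),
        })
        .collect()
}
```

With this shape, adding aggregate or window UDFs later extends the set of tags without changing the entry layout or the discovery function's signature.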

@paleolimbot (Member) left a comment

> Right now we have some instability in datafusion-ffi which leads to requiring datafusion major versions to match. This has been an annoyance, but not a deal breaker for datafusion-python.

This has been tricky for us to deal with (because we're not on bleeding-edge DataFusion and the extension libraries are, so bugfixes don't always get backported and rereleased). Our other constraint is that we implement some things in C++ (for reasons out of our control), so we need the C side to be explicit.

We're using this:

https://github.com/apache/sedona-db/blob/6fe541cade2fcd7132feffb17091f83ac1411777/c/sedona-extension/src/sedona_extension.h#L114-L204

...and on the Rust side

https://github.com/apache/sedona-db/blob/6fe541cade2fcd7132feffb17091f83ac1411777/c/sedona-extension/src/scalar_kernel.rs#L172-L330

(I'd love to remove all of that in favour of datafusion-ffi, but I think it's reasonable for projects to use a DataFusion-version-independent workaround until it stabilizes.)

Comment on lines +36 to +47
/// DataFusion adapter for a Rust UDF loaded from a cdylib.
///
/// Holds:
/// - an `Arc<LoadedLibrary>` so the cdylib outlives the adapter,
/// - a clone of the per-UDF descriptor (`LoadedUdf`),
/// - a precomputed DataFusion `Signature`,
/// - cached raw function pointers to avoid per-call `dlsym` lookups.
///
/// On every batch, `invoke_with_args` exports each input via the Arrow
/// C Data Interface, calls the cdylib's `comet_udf_invoke`, and imports
/// the result.
pub struct RustUdfHandle {

It's probably worth separating the dynamic library from the C UDF implementation (this seems to couple them closely). Dynamic libraries are one way to manage these, but passing the pointer address through Java (as is done with the C Data Interface) might be more flexible.
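To illustrate the decoupling suggested here, the following std-only sketch exports a kernel as a struct of function pointers and passes it around as an integer address, the way a JVM `long` could carry it across JNI. All names (`ScalarKernel`, `export_address`, `call_and_release`) are hypothetical, and a plain `i64` argument stands in for Arrow arrays.

```rust
/// Hypothetical C-callable kernel: a struct of function pointers with a
/// `release` callback, in the style of the Arrow C Data Interface.
#[repr(C)]
pub struct ScalarKernel {
    /// Apply the kernel to one value (a stand-in for an Arrow array argument).
    pub invoke: extern "C" fn(x: i64) -> i64,
    /// Free the kernel; called exactly once by whoever imports it.
    pub release: unsafe extern "C" fn(this: *mut ScalarKernel),
}

extern "C" fn add_one(x: i64) -> i64 {
    x.wrapping_add(1)
}

unsafe extern "C" fn release_boxed(this: *mut ScalarKernel) {
    if !this.is_null() {
        // SAFETY: `this` was produced by `Box::into_raw` in `export_address`.
        drop(unsafe { Box::from_raw(this) });
    }
}

/// Producer side: heap-allocate the kernel and hand out its address as an integer.
pub fn export_address() -> i64 {
    let kernel = Box::new(ScalarKernel { invoke: add_one, release: release_boxed });
    Box::into_raw(kernel) as i64
}

/// Consumer side: rebuild the pointer from the address, call through it,
/// then release it. How the address arrived is irrelevant to this code.
///
/// # Safety
/// `addr` must come from `export_address` and must not be used again afterwards.
pub unsafe fn call_and_release(addr: i64, arg: i64) -> i64 {
    let ptr = addr as *mut ScalarKernel;
    let out = unsafe { ((*ptr).invoke)(arg) };
    unsafe { ((*ptr).release)(ptr) };
    out
}
```

Since the consumer only sees an address and a known struct layout, the same kernel could come from a dynamically loaded library, from JNI, or from code linked into the same process.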

@andygrove (Member, Author) commented

Posted #4459 as an alternative implementation for comparison, using only arrow-stable FFI surfaces as suggested in the feedback above.

The new PR provides two flavors side-by-side so reviewers can compare:

  1. C ABI (sedona-style) — pure C-callable struct of fn pointers, parameterized only by the Arrow C Data Interface. No DataFusion types in the FFI surface; portable to C/C++. Modeled on @paleolimbot's sedona-db reference.
  2. datafusion-ffi (FFI_ScalarUDF) — wraps user ScalarUDFImpls as FFI_ScalarUDF. Inherits the full ScalarUDFImpl surface (variadic, type coercion, metadata-aware return types) for free, at the cost of a major-version pin against datafusion-ffi.

Same JVM API as this PR (CometRustUDF.register, registry, JNI bridge, RustUdfCall proto, QueryPlanSerde branch). Scope is intentionally narrower — scalar-only, happy-path e2e tests only, no docs — so the diff stays focused on the ABI strategy comparison.
