Skip to content

[CELEBORN-2319] Standalone LifecycleManager && rust sdk#3677

Open
gavin9402 wants to merge 18 commits into
apache:mainfrom
gavin9402:standalone_and_rust
Open

[CELEBORN-2319] Standalone LifecycleManager && rust sdk#3677
gavin9402 wants to merge 18 commits into
apache:mainfrom
gavin9402:standalone_and_rust

Conversation

@gavin9402
Copy link
Copy Markdown

What changes were proposed in this pull request?

This PR introduces two major features to support non-JVM (C++/Rust) clients using Apache Celeborn for shuffle:

1. Standalone LifecycleManager Daemon (Scala/JVM)

  • Added LifecycleManagerDaemon — a standalone JVM process that hosts a LifecycleManager independently from any compute engine (Spark/Flink) Driver.
  • Added LifecycleManagerDaemonArguments for CLI argument parsing (--app-id, --master-endpoints, --port, --host, --properties-file).
  • Added sbin/start-lifecycle-manager.sh launch script with classpath assembly, environment loading, and required-argument validation.
  • Updated CelebornBuild.scala and service/pom.xml to add celeborn-client as a dependency of the service module (needed because the Daemon instantiates LifecycleManager from the client module).

2. Rust SDK via C++ FFI (rust/ directory)

  • celeborn-client-sys: Low-level FFI crate using cxx to bridge Rust ↔ C++ Celeborn client. Includes:
    • wrapper.h / wrapper.cc: C++ shim exposing 7 functions (create_client, setup_lifecycle_manager, shutdown, push_data, mapper_end, update_reducer_file_group, read_partition_full).
    • build.rs: Build script linking Celeborn C++ static libs and system dependencies (folly, protobuf, abseil, boost, etc.) for macOS and Linux.
  • celeborn-client: Safe, ergonomic Rust wrapper providing ShuffleClient with:
    • Input validation (app_id non-empty, port > 0, codec ∈ {NONE, LZ4, ZSTD}).
    • Drop-safe shutdown (prevents double ffi::shutdown via UniquePtr::null() swap).
    • Convenience method read_partition_all.
  • Two example programs (data_sum_writer.rs, data_sum_reader.rs) mirroring the existing C++ DataSumWithWriterClient / DataSumWithReaderClient test programs.

Why are the changes needed?

Currently, LifecycleManager can only run embedded inside a JVM-based compute engine Driver (e.g., Spark Driver). This makes it impossible for non-JVM applications (Daft engine, etc.) to use Celeborn as their shuffle service, because:

  1. The C++ client requires a running LifecycleManager to coordinate shuffle metadata (register shuffles, allocate slots, manage partition locations) with Celeborn Masters and Workers.
  2. Without a standalone LifecycleManager, non-JVM applications have no way to bootstrap this coordination layer.

By decoupling the LifecycleManager into a standalone daemon process, any client — regardless of language runtime — can connect to it via RPC. The Rust SDK then leverages this architecture to provide first-class Rust support by bridging to the existing, battle-tested C++ client implementation via FFI.

Does this PR resolve a correctness bug?

No

Does this PR introduce any user-facing change?

Yes.

  • New component: Users can now start a standalone LifecycleManager daemon via sbin/start-lifecycle-manager.sh --app-id <id> --master-endpoints <eps> --port <port>.
  • New SDK: Rust applications can now use the celeborn-client crate to perform shuffle read/write operations against a Celeborn cluster.
  • Limitation: The standalone LifecycleManager does not support auth (celeborn.auth.enabled must be false), as the C++/Rust clients lack SASL support.

How was this patch tested?

  • The Rust SDK was validated using the data_sum_writer and data_sum_reader example programs, which are Rust ports of the existing C++ integration tests (DataSumWithWriterClient.cpp / DataSumWithReaderClient.cpp). These write random numeric data across partitions and verify correctness by comparing partition sums between writer and reader.
  • The LifecycleManagerDaemon was tested by starting it against a local Celeborn cluster (Master + Workers) and verifying that the Rust examples can successfully connect, push data, and read data through the daemon.

@FMX
Copy link
Copy Markdown
Contributor

FMX commented May 8, 2026

This is amazing. What compute engine do you use?

@gavin9402
Copy link
Copy Markdown
Author

This is amazing. What compute engine do you use?

@FMX We plan to integrate it into the Daft engine.

@afterincomparableyum
Copy link
Copy Markdown
Contributor

This is really good @gavin9402 , thanks for contributing. I will drop some review comments soon if I have any.

Copy link
Copy Markdown
Contributor

@afterincomparableyum afterincomparableyum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

added some small comments, overall though looks good to me.

I am fine with service --> client dependency leaks into master/worker, but others may think the Daemon doesn't need to live in service, and would propose that you consider putting it in a new module that depends on both service and client without these dependency leaks.

shutdownLatch.await()
}

private[lifecyclemanager] def applyArgsToConf(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The --host argument is basically ignored. LifecycleManagerDaemonArguments parses --host into host: Option[String], but applyArgsToConf only writes MASTER_ENDPOINTS and CLIENT_SHUFFLE_MANAGER_PORT. The host is never propagated. LifecycleManager binds to lifecycleHost = Utils.localHostName(conf) (LifecycleManager.scala:81), and Utils.localHostName only looks at the CELEBORN_LOCAL_HOSTNAME env or the auto resolved hostname. There's no conf key path for it.

So someone running start-lifecycle-manager.sh --host 10.0.0.5 gets the default hostname with no warning. Either drop the option, set CELEBORN_LOCAL_HOSTNAME from the script, or call Utils.setCustomHostname(host) before constructing LifecycleManager.

Comment thread rust/celeborn-client/src/lib.rs Outdated
/// already be torn down by `IOThreadPoolExecutor::join()`.
pub fn shutdown(mut self) -> Result<()> {
if let Some(pinned) = self.inner.as_mut() {
ffi::shutdown(pinned)?;
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If ffi::shutdown returns Err, the ? propagates and self is dropped normally. Drop::drop then calls ffi::shutdown(pinned) a second time.

The comment on shutdown says the handle is intentionally leaked to avoid a folly EventBase use-after-free during destruction, so calling shutdown twice could re-trigger that teardown path.

I propose either ensuring the handle is leaked even when ffi::shutdown returns an error (so Drop cannot call ffi::shutdown a second time), or adding a “shutdown attempted” flag so Drop skips the shutdown call and only leaks the handle.

Comment thread rust/celeborn-client-sys/src/wrapper.cc Outdated
Comment on lines +115 to +130
rust::Vec<uint8_t> out;
out.reserve(64 * 1024);
std::vector<uint8_t> buf(64 * 1024);

while (true) {
int n = stream->read(buf.data(), 0, buf.size());
if (n == -1) {
break;
}
if (n <= 0) {
throw std::runtime_error(
"celeborn-ffi: CelebornInputStream::read returned unexpected non-positive " +
std::to_string(n));
}
for (int i = 0; i < n; ++i) {
out.push_back(buf[i]);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Per-byte push_back into rust::Vec<uint8_t> can introduce noticeable overhead for large partition reads, and the initial reserve(64 * 1024) only avoids reallocations for the first buffer chunk.

Consider accumulating into a std::vector<uint8_t> (or appending in larger chunks) and copying once at the end, or resizing the destination per chunk and using memcpy instead of pushing one byte at a time.

This doesn’t need to be addressed in the current PR, but it may be worth leaving a TODO here to revisit for performance improvements.

return
}

if (daemonArgs.port < 1024) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LifecycleManagerDaemonArguments already sys.exit(1) on this condition. Dead code.

@gavin9402
Copy link
Copy Markdown
Author

@afterincomparableyum Thank you for your thorough review. I have made the requested changes by your suggestions above.

@gavin9402 gavin9402 force-pushed the standalone_and_rust branch from 391b486 to c08f576 Compare May 13, 2026 06:39
@afterincomparableyum
Copy link
Copy Markdown
Contributor

@gavin9402 can you please fix failing CI. particularly the style check, license check, cpp integration test (which is also a style failure), and the celeborn integration test. I took a look, and all you need to do is fix the format/style.

Copy link
Copy Markdown
Contributor

@afterincomparableyum afterincomparableyum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fix format/style failures in CI pls.

夷羿 added 2 commits May 18, 2026 14:47
- Add Apache license headers to all rust source files
- Fix clang-format violations in cpp source files
- Fix license header format in sbin/start-lifecycle-manager.sh
- Add cpp/build/** and rust/Cargo.lock to .rat-excludes
@gavin9402
Copy link
Copy Markdown
Author

@afterincomparableyum Done. Fixed all format/style failures

Please re-trigger CI. Thanks!

@afterincomparableyum
Copy link
Copy Markdown
Contributor

ping @SteNicholas or @FMX, could one of your please re-trigger CI.

夷羿 and others added 3 commits May 25, 2026 18:12
…red library

Replace the cxx-based bridge with a hand-written C ABI exported from
libceleborn_client.{so,dylib}. The shared library is now the sole external
link dependency — folly / protobuf / glog / abseil stay hidden inside it,
and Rust no longer needs cxx, rust::Vec set_len access tricks, or C++
header includes during the build.

C++ side:
  - cpp/celeborn/ffi/CelebornFfi.{h,cc}: extern "C" surface
    (celeborn_ffi_create_client / push_data / mapper_end / read_partition_full
     / open_partition_reader / read_partition_chunk / close_partition_reader / shutdown).
  - cpp/exports.{map,txt} + cpp/dummy.cc: limit exported symbols to
    celeborn::* and celeborn_ffi_* on both ld.bfd/lld (version script) and
    macOS ld64 (-exported_symbols_list).
  - cpp/CMakeLists.txt aggregates the static archives into one shared lib.

Rust side:
  - celeborn-client-sys is now pure `extern "C"` bindings; wrapper.{h,cc}
    are deleted.
  - build.rs locates libceleborn_client.* via CELEBORN_CPP_PREFIX, then
    rust/resource/lib/<target-triple>/, then falls back to a from-source
    cmake build into OUT_DIR.
  - celeborn-client re-implements the streaming open_partition /
    PartitionReader (std::io::Read) on top of the new C ABI, so the
    rust/examples/data_sum_reader.rs example keeps working.

Misc:
  - rust/resource/lib/README.md documents the layout and lookup precedence.
  - rust/.gitignore excludes prebuilt *.so / *.dylib / *.dll artifacts so
    the resource/lib/ directory only holds README.md in source control.

Verified with `cargo check --workspace --examples`.

Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
Copy link
Copy Markdown

Copilot AI left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Pull request overview

This PR extends Celeborn to better support non-JVM clients by (1) introducing a standalone JVM LifecycleManager daemon that can be launched independently of Spark/Flink drivers, and (2) adding a Rust SDK built on top of a C ABI shim and an aggregated C++ shared library.

Changes:

  • Added lifecycle-manager Maven module with a LifecycleManagerDaemon and CLI argument parsing + tests.
  • Added Rust workspace (rust/) with celeborn-client-sys (raw FFI) and celeborn-client (safe wrapper) plus example programs.
  • Updated C++ build to produce an aggregated libceleborn_client.{so,dylib} and updated distribution packaging/scripts to include the new daemon.

Reviewed changes

Copilot reviewed 29 out of 30 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
sbin/stop-lifecycle-manager.sh New stop script for standalone LifecycleManager daemon
sbin/start-lifecycle-manager.sh New start script with classpath assembly and optional port selection
rust/resource/lib/README.md Documents lookup/layout for prebuilt native client artifacts
rust/examples/data_sum_writer.rs Rust writer example mirroring existing C++ integration program
rust/examples/data_sum_reader.rs Rust reader example mirroring existing C++ integration program
rust/celeborn-client/src/lib.rs Safe Rust wrapper API over the raw FFI
rust/celeborn-client/Cargo.toml Rust wrapper crate manifest + example wiring
rust/celeborn-client/build.rs Emits rpath for downstream artifacts based on sys crate metadata
rust/celeborn-client-sys/src/lib.rs Raw extern "C" declarations and helper to take/free error strings
rust/celeborn-client-sys/Cargo.toml Sys crate manifest (links = "celeborn_client")
rust/celeborn-client-sys/build.rs Builds/locates libceleborn_client and sets link search + metadata
rust/Cargo.toml Declares Rust workspace members
rust/.gitignore Ignores Cargo build output and prebuilt native blobs
pom.xml Adds lifecycle-manager module to the Maven reactor
lifecycle-manager/src/test/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemonArgumentsSuite.scala Unit tests for daemon CLI parsing
lifecycle-manager/src/main/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemonArguments.scala CLI args parser + usage text
lifecycle-manager/src/main/scala/org/apache/celeborn/server/lifecyclemanager/LifecycleManagerDaemon.scala Standalone daemon main + shutdown hook + auth constraint
lifecycle-manager/pom.xml New Maven module definition and dependencies
cpp/exports.txt macOS exported symbols list for the aggregated shared library
cpp/exports.map ELF version-script controlling exports for the shared library
cpp/dummy.cc Placeholder TU for creating the aggregate shared library target
cpp/CMakeLists.txt Builds aggregated celeborn_client shared lib and adjusts link/export behavior
cpp/celeborn/ffi/CMakeLists.txt Adds static celeborn_ffi target (C ABI shim)
cpp/celeborn/ffi/CelebornFfi.h C ABI header for language bindings
cpp/celeborn/ffi/CelebornFfi.cc C ABI implementation wrapping the C++ ShuffleClient
cpp/celeborn/CMakeLists.txt Wires the new ffi/ subdirectory into the C++ build
cpp/celeborn/client/writer/PushMergedDataCallback.cpp Minor formatting adjustment
cpp/celeborn/client/tests/CelebornInputStreamRetryTest.cpp Minor formatting adjustments in tests
build/make-distribution.sh Packages lifecycle-manager jars into the distribution tarball
.rat-excludes Excludes generated build dirs and Rust lockfile from RAT checks

💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.

Comment thread sbin/stop-lifecycle-manager.sh
Comment thread sbin/start-lifecycle-manager.sh
Comment thread rust/resource/lib/README.md
Comment thread rust/celeborn-client-sys/build.rs
Comment thread cpp/celeborn/ffi/CelebornFfi.cc
Comment thread cpp/celeborn/ffi/CelebornFfi.cc
Comment thread cpp/celeborn/ffi/CelebornFfi.cc
Comment thread cpp/celeborn/ffi/CelebornFfi.cc
Comment thread cpp/celeborn/ffi/CelebornFfi.cc
Comment thread rust/celeborn-client-sys/build.rs
gavin9402 and others added 2 commits May 26, 2026 11:29
Co-authored-by: Copilot Autofix powered by AI <175728472+Copilot@users.noreply.github.com>
@SteNicholas SteNicholas force-pushed the main branch 2 times, most recently from 1d92a40 to cf8d472 Compare May 27, 2026 02:11
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

4 participants