@afterincomparableyum afterincomparableyum commented Dec 16, 2025

What changes were proposed in this pull request?

Add support for celeborn.client.push.maxBytesSizeInFlight in CppClient, similar to InFlightRequestTracker.java
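For readers unfamiliar with the idea, a byte-based in-flight limit can be sketched roughly as below. This is only an illustration: `InFlightBytesBudget` and its methods are hypothetical names, not the classes added by this PR and not a port of InFlightRequestTracker.java.

```cpp
#include <atomic>
#include <cstdint>

// Hypothetical sketch: tracks how many bytes are currently being pushed and
// reports whether a configured budget has been exceeded.
class InFlightBytesBudget {
 public:
  explicit InFlightBytesBudget(std::int64_t maxBytes) : maxBytes_(maxBytes) {}

  // Called when a batch of `bytes` bytes is handed to the network layer.
  void onPushStarted(std::int64_t bytes) {
    inFlight_.fetch_add(bytes);
  }

  // Called when the push for that batch succeeds or fails.
  void onPushFinished(std::int64_t bytes) {
    inFlight_.fetch_sub(bytes);
  }

  // True when more bytes are in flight than the budget allows.
  bool overLimit() const {
    return inFlight_.load() > maxBytes_;
  }

  std::int64_t inFlight() const {
    return inFlight_.load();
  }

 private:
  const std::int64_t maxBytes_;
  std::atomic<std::int64_t> inFlight_{0};
};
```

A caller would typically check `overLimit()` before submitting another batch and wait, with a timeout, while it returns true.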

Does this PR resolve a correctness bug?

No

How was this patch tested?

Compiled locally; CI/CD will run the unit tests.

@HolyLow HolyLow self-requested a review December 17, 2025 08:59
@afterincomparableyum afterincomparableyum marked this pull request as ready for review December 21, 2025 23:11
@afterincomparableyum
Author

@HolyLow @SteNicholas @RexXiong @FMX Could you please help review this PR? Thanks a lot for helping improve this as needed!

NUM_PROP(kShuffleCompressionZstdCompressLevel, 1),
STR_PROP(kClientPushBufferMaxSize, "64k"),
BOOL_PROP(kClientPushMaxBytesSizeInFlightEnabled, false),
NONE_PROP(kClientPushMaxBytesSizeInFlightTotal),
Contributor

Why is this NONE_PROP?

My understanding is that NONE_PROP is correct: it matches Java's .createOptional behavior.

The property is optional, and the default is computed dynamically when it is not set.

Do you think it should be something other than NONE_PROP? I think NONE_PROP fits because these parameters are optional.

pushStrategy_->clear();

if (maxInFlightBytesSizeEnabled_) {
LOG(INFO) << "Cleanup " << totalInflightBytes_.load() << " bytes in flight.";
Contributor

Please do not use LOG(INFO) here; it is too verbose.

int>>
inflightBatchBytesSizes_;
folly::Synchronized<std::unique_ptr<std::exception>> exception_;
volatile bool cleaned_{false};
Contributor

Why should this be volatile? The volatile keyword in C++ has a different meaning from the one in Java.
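To illustrate the point: in C++, `volatile` gives neither atomicity nor inter-thread ordering, so a flag shared between threads is usually declared as `std::atomic<bool>`. The sketch below models only the flag; `CleanupFlag` is a hypothetical stand-in, not the PushState class from this PR.

```cpp
#include <atomic>

// Hypothetical stand-in that models only the "cleaned" flag.
class CleanupFlag {
 public:
  // Marks the state as cleaned. Returns true only for the first caller,
  // so cleanup work guarded by this call runs at most once.
  bool markCleaned() {
    return !cleaned_.exchange(true, std::memory_order_acq_rel);
  }

  // Safe to call from any thread; observes the latest published value.
  bool isCleaned() const {
    return cleaned_.load(std::memory_order_acquire);
  }

 private:
  std::atomic<bool> cleaned_{false};
};
```

Threads that poll `isCleaned()` will see the update made by `markCleaned()` without any additional locking.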

kShuffleCompressionCodec,
protocol::toString(protocol::CompressionCodec::NONE)),
NUM_PROP(kShuffleCompressionZstdCompressLevel, 1),
STR_PROP(kClientPushBufferMaxSize, "64k"),
Contributor

How will this be translated? Is this option tested?
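As a rough illustration of what translating a value such as "64k" into a byte count could look like, here is a small sketch. `parseByteSize` is a hypothetical helper, and the choice of binary multiples (1k = 1024 bytes) with a single-letter suffix is an assumption; the actual conversion in the C++ client may differ.

```cpp
#include <cctype>
#include <cstdint>
#include <optional>
#include <string>

// Hypothetical helper: parses strings such as "64k", "1M" or "512" into bytes.
// Assumes binary multiples and at most one suffix character.
// Overflow is not checked in this sketch.
std::optional<std::int64_t> parseByteSize(const std::string& text) {
  std::size_t pos = 0;
  std::int64_t value = 0;
  while (pos < text.size() &&
         std::isdigit(static_cast<unsigned char>(text[pos]))) {
    value = value * 10 + (text[pos] - '0');
    ++pos;
  }
  if (pos == 0) {
    return std::nullopt;  // no leading digits
  }
  if (pos == text.size()) {
    return value;  // no suffix: plain bytes
  }
  if (pos + 1 != text.size()) {
    return std::nullopt;  // more than one suffix character
  }
  switch (std::tolower(static_cast<unsigned char>(text[pos]))) {
    case 'k':
      return value * 1024;
    case 'm':
      return value * 1024 * 1024;
    case 'g':
      return value * 1024 * 1024 * 1024;
    default:
      return std::nullopt;  // unknown suffix
  }
}
```

A unit test for the option could assert on the parsed byte count rather than on the raw string.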

optionalProperty(kShuffleCompressionZstdCompressLevel).value());
}
} // namespace conf
} // namespace celeborn
Contributor

Why is this change necessary?

}

std::unique_ptr<PushState> pushState_;
static constexpr int pushTimeoutMs_ = 100;
Contributor

It might be more consistent to name static constexpr members in the kXxxXxx_ style.

pushState_->cleanup();

EXPECT_FALSE(pushState_->limitMaxInFlight(hostAndPushPort));
}
Contributor

There should be an empty line at the end of the file.

std::string,
std::shared_ptr<utils::ConcurrentHashSet<int>>>
inflightBatchesPerAddress_;
std::optional<utils::ConcurrentHashMap<
Contributor

If we already have flags, why should we use std::optional here?

long CelebornConf::clientPushMaxBytesSizeInFlightPerWorker() const {
auto optionalValue =
optionalProperty(kClientPushMaxBytesSizeInFlightPerWorker);
long maxBytesSizeInFlight =
Contributor

Why should this be that complicated?

Author

@afterincomparableyum afterincomparableyum Dec 25, 2025

Any suggestions for how to simplify it, @HolyLow? In the Java client, the function looks like this:

  def clientPushMaxBytesSizeInFlightPerWorker: Long = {
    val maxBytesSizeInFlight = get(CLIENT_PUSH_MAX_BYTES_SIZE_IN_FLIGHT_PERWORKER).getOrElse(0L)
    if (clientPushMaxBytesSizeInFlightEnabled && maxBytesSizeInFlight > 0L) {
      maxBytesSizeInFlight
    } else {
      clientPushMaxReqsInFlightPerWorker * clientPushBufferMaxSize
    }
  }

The C++ implementation follows similar logic.
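For comparison, the same fallback rule can be written compactly in C++ with `std::optional::value_or`. This is a sketch under assumptions, not the code in this PR: `PushLimits` and `effectiveMaxBytesInFlightPerWorker` are hypothetical names, and the default values are placeholders chosen for the example.

```cpp
#include <cstdint>
#include <optional>

// Hypothetical flattened view of the settings involved in the rule.
// The default values below are placeholders, not Celeborn's defaults.
struct PushLimits {
  bool maxBytesSizeInFlightEnabled = false;
  std::optional<std::int64_t> maxBytesSizeInFlightPerWorker;  // unset by default
  std::int64_t maxReqsInFlightPerWorker = 32;
  std::int64_t pushBufferMaxSize = 64 * 1024;
};

// Uses the explicit byte limit only when the feature is enabled and the limit
// is positive; otherwise derives it as (max requests) * (buffer size).
std::int64_t effectiveMaxBytesInFlightPerWorker(const PushLimits& conf) {
  const std::int64_t configured =
      conf.maxBytesSizeInFlightPerWorker.value_or(0);
  if (conf.maxBytesSizeInFlightEnabled && configured > 0) {
    return configured;
  }
  return conf.maxReqsInFlightPerWorker * conf.pushBufferMaxSize;
}
```

Keeping the rule in one small function makes it straightforward to unit test the three cases: unset, set but disabled, and set and enabled.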

}

bool CelebornConf::clientPushMaxBytesSizeInFlightEnabled() const {
return optionalProperty(kClientPushMaxBytesSizeInFlightEnabled).value() ==
Contributor

I think this is not consistent with the existing code.


long CelebornConf::clientPushMaxBytesSizeInFlightTotal() const {
auto optionalValue = optionalProperty(kClientPushMaxBytesSizeInFlightTotal);
long maxBytesSizeInFlight =
Contributor

Ditto. Why is this so complicated?

It's exactly how the Java client function is written:

  def clientPushMaxBytesSizeInFlightTotal: Long = {
    val maxBytesSizeInFlight = get(CLIENT_PUSH_MAX_BYTES_SIZE_IN_FLIGHT_TOTAL).getOrElse(0L)
    if (clientPushMaxBytesSizeInFlightEnabled && maxBytesSizeInFlight > 0L) {
      maxBytesSizeInFlight
    } else {
      clientPushMaxReqsInFlightTotal * clientPushBufferMaxSize
    }
  }

Any suggestions for how to simplify it, @HolyLow?

Contributor

@HolyLow HolyLow left a comment

Thanks for your contribution. I've left some comments, and I think we should fix all the CI tests first.

@afterincomparableyum
Author

Thank you for the comments, @HolyLow. I will address them soon.

@afterincomparableyum
Author

The most recent commit should fix the C++ unit test errors and clang-format issues; the next commit will address the remaining comments.

@afterincomparableyum
Author

@HolyLow, could you please approve the GitHub workflows again? I've addressed all comments, or replied to the ones that did not need code changes. The C++ unit tests pass, and I've applied clang-format.

@afterincomparableyum
Author

@HolyLow, is it safe to ignore the CI/CD failures for Celeborn CI? All of them are due to:

curl: (28) Failed to connect to archive.apache.org port 443 after 135159 ms: Connection timed out

As for the Celeborn SBT CI failures, I will look into those.

@afterincomparableyum
Author

After a closer look, the Celeborn SBT CI failures do not appear to be related to my C++ code changes. They look flaky to me.

The same workflows also passed on my previous commit: 41ac021

I also ran the tests locally:

[info] Passed: Total 10, Failed 0, Errors 0, Passed 10
[info] Passed: Total 11, Failed 0, Errors 0, Passed 11
[info] ScalaTest
[info] Run completed in 1 hour, 16 seconds.
[info] Total number of tests run: 1
[info] Suites: completed 1, aborted 0
[info] Tests: succeeded 1, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[info] Passed: Total 26, Failed 0, Errors 0, Passed 26
[info] Run completed in 1 hour, 1 second.
[info] Total number of tests run: 111
[info] Suites: completed 28, aborted 0
[info] Tests: succeeded 111, failed 0, canceled 0, ignored 0, pending 0
[info] All tests passed.
[success] Total time: 3737 s (01:02:17), completed Dec 29, 2025 9:50:22 PM

@HolyLow let me know if there are any other concerns with my PR that you would like me to address. Thank you for your review!
