-
Notifications
You must be signed in to change notification settings - Fork 408
[CELEBORN-2229][CIP-14] Add support for celeborn.client.push.maxBytesSizeInFlight in CppClient #3568
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
[CELEBORN-2229][CIP-14] Add support for celeborn.client.push.maxBytesSizeInFlight in CppClient #3568
Conversation
|
@HolyLow @SteNicholas @RexXiong @FMX Could you please help review this PR? Thanks a lot for helping improve this as needed! |
| NUM_PROP(kShuffleCompressionZstdCompressLevel, 1), | ||
| STR_PROP(kClientPushBufferMaxSize, "64k"), | ||
| BOOL_PROP(kClientPushMaxBytesSizeInFlightEnabled, false), | ||
| NONE_PROP(kClientPushMaxBytesSizeInFlightTotal), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this NONE_PROP?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
my understanding is that NONE_PROP is correct. It matches Java's .createOptional behavior.
the property is optional, and the default is computed dynamically when not set.
Do you believe it should be something else other than NONE_PROP? I feel like NONE_PROP is good because these parameters are optional.
| pushStrategy_->clear(); | ||
|
|
||
| if (maxInFlightBytesSizeEnabled_) { | ||
| LOG(INFO) << "Cleanup " << totalInflightBytes_.load() << " bytes in flight."; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Pls do not use LOG(INFO) here which is too verbose.
| int>> | ||
| inflightBatchBytesSizes_; | ||
| folly::Synchronized<std::unique_ptr<std::exception>> exception_; | ||
| volatile bool cleaned_{false}; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why should this be volatile? The volatile keyword in CPP has different meaning from Java.
cpp/celeborn/conf/CelebornConf.cpp
Outdated
| kShuffleCompressionCodec, | ||
| protocol::toString(protocol::CompressionCodec::NONE)), | ||
| NUM_PROP(kShuffleCompressionZstdCompressLevel, 1), | ||
| STR_PROP(kClientPushBufferMaxSize, "64k"), |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How will this be translated? Is this option tested?
| optionalProperty(kShuffleCompressionZstdCompressLevel).value()); | ||
| } | ||
| } // namespace conf | ||
| } // namespace celeborn |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why is this change necessary?
| } | ||
|
|
||
| std::unique_ptr<PushState> pushState_; | ||
| static constexpr int pushTimeoutMs_ = 100; |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It might be more consistent if we change the naming of static constexpr namings to kXxxXxx_
| pushState_->cleanup(); | ||
|
|
||
| EXPECT_FALSE(pushState_->limitMaxInFlight(hostAndPushPort)); | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should be an empty line at end of file.
| std::string, | ||
| std::shared_ptr<utils::ConcurrentHashSet<int>>> | ||
| inflightBatchesPerAddress_; | ||
| std::optional<utils::ConcurrentHashMap< |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
If we already have flags, why should we use std::optional here?
cpp/celeborn/conf/CelebornConf.cpp
Outdated
| long CelebornConf::clientPushMaxBytesSizeInFlightPerWorker() const { | ||
| auto optionalValue = | ||
| optionalProperty(kClientPushMaxBytesSizeInFlightPerWorker); | ||
| long maxBytesSizeInFlight = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why should this be that complicated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
any suggestions for how to simplify it @HolyLow ? I see in the java client, this is how the function is:
def clientPushMaxBytesSizeInFlightPerWorker: Long = {
val maxBytesSizeInFlight = get(CLIENT_PUSH_MAX_BYTES_SIZE_IN_FLIGHT_PERWORKER).getOrElse(0L)
if (clientPushMaxBytesSizeInFlightEnabled && maxBytesSizeInFlight > 0L) {
maxBytesSizeInFlight
} else {
clientPushMaxReqsInFlightPerWorker * clientPushBufferMaxSize
}
}
The logic is similar
cpp/celeborn/conf/CelebornConf.cpp
Outdated
| } | ||
|
|
||
| bool CelebornConf::clientPushMaxBytesSizeInFlightEnabled() const { | ||
| return optionalProperty(kClientPushMaxBytesSizeInFlightEnabled).value() == |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I think this is not consistent to existing code.
cpp/celeborn/conf/CelebornConf.cpp
Outdated
|
|
||
| long CelebornConf::clientPushMaxBytesSizeInFlightTotal() const { | ||
| auto optionalValue = optionalProperty(kClientPushMaxBytesSizeInFlightTotal); | ||
| long maxBytesSizeInFlight = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
ditto. Why is this so complicated?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It's exactly how the java client function is:
def clientPushMaxBytesSizeInFlightTotal: Long = {
val maxBytesSizeInFlight = get(CLIENT_PUSH_MAX_BYTES_SIZE_IN_FLIGHT_TOTAL).getOrElse(0L)
if (clientPushMaxBytesSizeInFlightEnabled && maxBytesSizeInFlight > 0L) {
maxBytesSizeInFlight
} else {
clientPushMaxReqsInFlightTotal * clientPushBufferMaxSize
}
}
Any suggestions for how to simplify it @HolyLow ?
HolyLow
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for your contribution. I've left some comments, and I think we should fix all the CI tests firstly.
|
Thank you for comments @HolyLow wiil address them soon |
|
most recent commit should fix C++ unit test errors and clang formatting issues, next commit will address remaining comments. |
|
@HolyLow are you able to approve the github workflows again please? I've addressed all comments/responded to ones I did not make any code changes for. C++ unit tests pass, and I've applied clang format. |
|
@HolyLow is it safe to ignore the CI/CD Failures for Celeborn CI. All of them are due to:
As for the Celeborn SBT CI failures, I will look into those. |
|
After taking a closer look it looks like the CI/CD failures for Celeborn SBT CI are not related to my C++ Code changes. It looks flaky to me. I also see the exact workflows passed in my previous commit it ran for: 41ac021 I also ran the tests locally: @HolyLow let me know if there are any other concerns with my PR that you would like me to address. Thank you for your review! |
What changes were proposed in this pull request?
Add support for celeborn.client.push.maxBytesSizeInFlight in CppClient, similar to InFlightRequestTracker.java
Does this PR resolve a correctness bug?
No
How was this patch tested?
Compile locally, CI/CD will run unit tests