Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 20 additions & 2 deletions cpp/celeborn/client/ShuffleClient.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -67,7 +67,11 @@ ShuffleClientImpl::ShuffleClientImpl(
[conf]() {
return compress::Compressor::createCompressor(*conf);
})
: std::function<std::unique_ptr<compress::Compressor>()>()) {
: std::function<std::unique_ptr<compress::Compressor>()>()),
pushReplicateEnabled_(conf->clientPushReplicateEnabled()),
fetchExcludeWorkerOnFailureEnabled_(
conf->clientFetchExcludeWorkerOnFailureEnabled()),
fetchExcludedWorkers_(std::make_shared<FetchExcludedWorkers>()) {
CELEBORN_CHECK_NOT_NULL(clientFactory_);
CELEBORN_CHECK_NOT_NULL(pushDataRetryPool_);
}
Expand Down Expand Up @@ -348,7 +352,21 @@ std::unique_ptr<CelebornInputStream> ShuffleClientImpl::readPartition(
attemptNumber,
startMapIndex,
endMapIndex,
needCompression);
needCompression,
fetchExcludedWorkers_,
this);
}

// Adds `hostAndFetchPort` to the fetch-exclusion map, stamped with the current
// steady-clock time in milliseconds. Nothing is recorded unless push
// replication and fetch-failure exclusion are both enabled and
// utils::isCriticalCauseForFetch(e) accepts the exception.
void ShuffleClientImpl::excludeFailedFetchLocation(
    const std::string& hostAndFetchPort,
    const std::exception& e) {
  const bool exclusionActive =
      pushReplicateEnabled_ && fetchExcludeWorkerOnFailureEnabled_;
  // The cause check is only evaluated once both switches are known to be on.
  if (!exclusionActive || !utils::isCriticalCauseForFetch(e)) {
    return;
  }

  const auto sinceEpoch = std::chrono::steady_clock::now().time_since_epoch();
  const auto nowMs =
      std::chrono::duration_cast<std::chrono::milliseconds>(sinceEpoch).count();
  fetchExcludedWorkers_->set(hostAndFetchPort, nowMs);
}

void ShuffleClientImpl::updateReducerFileGroup(int shuffleId) {
Expand Down
14 changes: 14 additions & 0 deletions cpp/celeborn/client/ShuffleClient.h
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,8 @@ class ShuffleClient {

virtual void updateReducerFileGroup(int shuffleId) = 0;

using FetchExcludedWorkers = utils::ConcurrentHashMap<std::string, int64_t>;

virtual std::unique_ptr<CelebornInputStream> readPartition(
int shuffleId,
int partitionId,
Expand All @@ -70,6 +72,10 @@ class ShuffleClient {
int endMapIndex,
bool needCompression) = 0;

virtual void excludeFailedFetchLocation(
const std::string& hostAndFetchPort,
const std::exception& e) = 0;

Comment thread
afterincomparableyum marked this conversation as resolved.
virtual bool cleanupShuffle(int shuffleId) = 0;

virtual void shutdown() = 0;
Expand Down Expand Up @@ -163,6 +169,10 @@ class ShuffleClientImpl
int endMapIndex,
bool needCompression) override;

void excludeFailedFetchLocation(
const std::string& hostAndFetchPort,
const std::exception& e) override;

void updateReducerFileGroup(int shuffleId) override;

bool cleanupShuffle(int shuffleId) override;
Expand Down Expand Up @@ -272,6 +282,10 @@ class ShuffleClientImpl
// Factory for creating compressor instances on demand to avoid sharing a
// single non-thread-safe compressor across concurrent operations.
std::function<std::unique_ptr<compress::Compressor>()> compressorFactory_;
bool pushReplicateEnabled_;
bool fetchExcludeWorkerOnFailureEnabled_;
std::shared_ptr<FetchExcludedWorkers> fetchExcludedWorkers_;

// TODO: pushExcludedWorker is not supported yet
};
} // namespace client
Expand Down
112 changes: 107 additions & 5 deletions cpp/celeborn/client/reader/CelebornInputStream.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,11 @@

#include "celeborn/client/reader/CelebornInputStream.h"
#include <lz4.h>
#include <thread>
#include "CelebornInputStream.h"
#include "celeborn/client/ShuffleClient.h"
#include "celeborn/client/compress/Decompressor.h"
#include "celeborn/utils/CelebornUtils.h"

namespace celeborn {
namespace client {
Expand All @@ -30,7 +34,9 @@ CelebornInputStream::CelebornInputStream(
int attemptNumber,
int startMapIndex,
int endMapIndex,
bool needCompression)
bool needCompression,
const std::shared_ptr<FetchExcludedWorkers>& fetchExcludedWorkers,
ShuffleClient* shuffleClient)
: shuffleKey_(shuffleKey),
conf_(conf),
clientFactory_(clientFactory),
Expand All @@ -45,7 +51,22 @@ CelebornInputStream::CelebornInputStream(
shouldDecompress_(
conf_->shuffleCompressionCodec() !=
protocol::CompressionCodec::NONE &&
needCompression) {
needCompression),
fetchChunkRetryCnt_(0),
fetchChunkMaxRetry_(
conf_->clientPushReplicateEnabled()
? conf_->clientFetchMaxRetriesForEachReplica() * 2
: conf_->clientFetchMaxRetriesForEachReplica()),
retryWait_(conf_->networkIoRetryWait()),
fetchExcludedWorkers_(fetchExcludedWorkers),
fetchExcludedWorkerExpireTimeoutMs_(
std::chrono::duration_cast<std::chrono::milliseconds>(
conf_->clientFetchExcludedWorkerExpireTimeout())
.count()),
readSkewPartitionWithoutMapRange_(
conf_->clientAdaptiveOptimizeSkewedPartitionReadEnabled() &&
startMapIndex > endMapIndex),
shuffleClient_(shuffleClient) {
if (shouldDecompress_) {
decompressor_ = compress::Decompressor::createDecompressor(
conf_->shuffleCompressionCodec());
Expand Down Expand Up @@ -178,6 +199,7 @@ void CelebornInputStream::moveToNextReader() {
if (!location) {
return;
}
fetchChunkRetryCnt_ = 0;
currReader_ = createReaderWithRetry(*location);
currLocationIndex_++;
if (currReader_->hasNext()) {
Expand All @@ -189,9 +211,62 @@ void CelebornInputStream::moveToNextReader() {

// Creates a PartitionReader for `location`, retrying until the shared retry
// counter fetchChunkRetryCnt_ reaches fetchChunkMaxRetry_.
//
// On each failed attempt the failing location is reported to the shuffle
// client (which may add it to the fetch-exclusion map) and the retry counter
// is incremented. If the location has a peer and this is not a skew-partition
// read without map range, the next attempt targets the peer, sleeping
// retryWait_ after every second failure. Otherwise the same location is
// retried after sleeping retryWait_.
//
// Returns the reader from the first successful attempt.
// Throws utils::CelebornRuntimeError wrapping the last captured exception once
// the retry budget is exhausted.
std::shared_ptr<PartitionReader> CelebornInputStream::createReaderWithRetry(
    const protocol::PartitionLocation& location) {
  const protocol::PartitionLocation* currentLocation = &location;
  std::exception_ptr lastException;

  while (fetchChunkRetryCnt_ < fetchChunkMaxRetry_) {
    // TODO (Investigate): When a location is already in the exclusion list, the
    // code throws and then retries/sleeps (especially in the no-peer branch).
    // Since isExcluded(*currentLocation) will keep returning true until the
    // exclusion expires, these retries are guaranteed to fail and just add
    // delay. Consider failing fast (no sleep/retry) or skipping to another
    // available location/peer when the current location is excluded.
    try {
      VLOG(1) << "Create reader for location " << currentLocation->host << ":"
              << currentLocation->fetchPort;
      if (isExcluded(*currentLocation)) {
        CELEBORN_FAIL(
            "Fetch data from excluded worker! {}",
            currentLocation->hostAndFetchPort());
      }
      return createReader(*currentLocation);
    } catch (const std::exception& e) {
      lastException = std::current_exception();
      // shuffleClient_ is a raw pointer; skip the exclusion report instead of
      // dereferencing null inside an exception handler.
      if (shuffleClient_ != nullptr) {
        shuffleClient_->excludeFailedFetchLocation(
            currentLocation->hostAndFetchPort(), e);
      }
      fetchChunkRetryCnt_++;

      if (currentLocation->hasPeer() && !readSkewPartitionWithoutMapRange_) {
        // Two consecutive failures have been recorded at this point, so wait
        // before the next attempt.
        if (fetchChunkRetryCnt_ % 2 == 0) {
          std::this_thread::sleep_for(retryWait_);
        }
        LOG(WARNING) << "CreatePartitionReader failed " << fetchChunkRetryCnt_
                     << "/" << fetchChunkMaxRetry_ << " times for location "
                     << currentLocation->hostAndFetchPort()
                     << ", change to peer. Error: " << e.what();
        // TODO: When stream handlers are supported, send BUFFER_STREAM_END
        // to close the active stream before switching to peer, matching the
        // Java CelebornInputStream behavior. Currently, the C++ client does
        // not have pre-opened stream handlers, so stream cleanup is handled
        // by WorkerPartitionReader's destructor.
        currentLocation = currentLocation->getPeer();
      } else {
        LOG(WARNING) << "CreatePartitionReader failed " << fetchChunkRetryCnt_
                     << "/" << fetchChunkMaxRetry_ << " times for location "
                     << currentLocation->hostAndFetchPort()
                     << ", retry the same location. Error: " << e.what();
        std::this_thread::sleep_for(retryWait_);
      }
    }
  }

  // Max retries exceeded, rethrow the last exception wrapped with context
  std::string errorMessage = "createPartitionReader failed after " +
      std::to_string(fetchChunkRetryCnt_) +
      " retries. Original location: " + location.hostAndFetchPort() +
      ", last attempted location: " + currentLocation->hostAndFetchPort();
  throw utils::CelebornRuntimeError(lastException, errorMessage, false);
}

std::shared_ptr<PartitionReader> CelebornInputStream::createReader(
Expand Down Expand Up @@ -236,6 +311,33 @@ std::unordered_set<int>& CelebornInputStream::getBatchRecord(int mapId) {
return *batchRecords_[mapId];
}

// Reports whether `location` should currently be treated as excluded from
// fetching.
//
// - No entry in the exclusion map: not excluded.
// - Entry older than fetchExcludedWorkerExpireTimeoutMs_: the entry is erased
//   and the location is not excluded.
// - Live entry without a peer: excluded.
// - Live entry with a peer: excluded only if the peer has no entry or the
//   peer's entry is older than this location's, so that both replicas are
//   never excluded at the same time.
bool CelebornInputStream::isExcluded(
    const protocol::PartitionLocation& location) {
  const auto nowMs = std::chrono::duration_cast<std::chrono::milliseconds>(
                         std::chrono::steady_clock::now().time_since_epoch())
                         .count();
  const auto workerKey = location.hostAndFetchPort();

  const auto excludedAt = fetchExcludedWorkers_->get(workerKey);
  if (!excludedAt.has_value()) {
    return false;
  }

  const auto elapsedMs = nowMs - excludedAt.value();
  if (elapsedMs > fetchExcludedWorkerExpireTimeoutMs_) {
    // The exclusion has expired; drop the stale entry.
    fetchExcludedWorkers_->erase(workerKey);
    return false;
  }

  if (!location.hasPeer()) {
    return true;
  }

  // To avoid both replicate locations being excluded, if peer was added to
  // excluded list earlier, change to try peer.
  const auto peerExcludedAt =
      fetchExcludedWorkers_->get(location.getPeer()->hostAndFetchPort());
  return !peerExcludedAt.has_value() ||
      peerExcludedAt.value() < excludedAt.value();
}

// Releases this stream's reference to the current partition reader, leaving
// currReader_ empty.
void CelebornInputStream::cleanupReader() {
  currReader_.reset();
}
Expand Down
19 changes: 18 additions & 1 deletion cpp/celeborn/client/reader/CelebornInputStream.h
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,16 @@
#include "celeborn/client/compress/Decompressor.h"
#include "celeborn/client/reader/WorkerPartitionReader.h"
#include "celeborn/conf/CelebornConf.h"
#include "celeborn/utils/CelebornUtils.h"

namespace celeborn {
namespace client {
class ShuffleClient;

class CelebornInputStream {
public:
using FetchExcludedWorkers = utils::ConcurrentHashMap<std::string, int64_t>;

CelebornInputStream(
const std::string& shuffleKey,
const std::shared_ptr<const conf::CelebornConf>& conf,
Expand All @@ -35,7 +40,9 @@ class CelebornInputStream {
int attemptNumber,
int startMapIndex,
int endMapIndex,
bool needCompression);
bool needCompression,
const std::shared_ptr<FetchExcludedWorkers>& fetchExcludedWorkers,
ShuffleClient* shuffleClient);

int read(uint8_t* buffer, size_t offset, size_t len);

Expand All @@ -56,6 +63,8 @@ class CelebornInputStream {
std::shared_ptr<PartitionReader> createReader(
const protocol::PartitionLocation& location);

bool isExcluded(const protocol::PartitionLocation& location);

std::shared_ptr<const protocol::PartitionLocation> nextReadableLocation();

std::unordered_set<int>& getBatchRecord(int mapId);
Expand All @@ -81,6 +90,14 @@ class CelebornInputStream {
size_t currBatchSize_;
std::shared_ptr<PartitionReader> currReader_;
std::vector<std::unique_ptr<std::unordered_set<int>>> batchRecords_;

int fetchChunkRetryCnt_;
int fetchChunkMaxRetry_;
utils::Timeout retryWait_;
std::shared_ptr<FetchExcludedWorkers> fetchExcludedWorkers_;
int64_t fetchExcludedWorkerExpireTimeoutMs_;
bool readSkewPartitionWithoutMapRange_;
ShuffleClient* shuffleClient_;
};
} // namespace client
} // namespace celeborn
1 change: 1 addition & 0 deletions cpp/celeborn/client/tests/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ add_executable(
celeborn_client_test
WorkerPartitionReaderTest.cpp
PushDataCallbackTest.cpp
CelebornInputStreamRetryTest.cpp
PushStateTest.cpp
ReviveManagerTest.cpp
Lz4DecompressorTest.cpp
Expand Down
Loading
Loading