Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions src/client/StripeReader.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -311,10 +311,8 @@ void StripeReader::readStripe() {
returnedChunk->state = StripedBlockUtil::StripingChunk::FETCHED;
alignedStripe.fetchedChunksNum++;
updateState4SuccessRead(r);
if (alignedStripe.fetchedChunksNum == dataBlkNum) {
clearFutures();
break;
}
// Do not exit early when fetchedChunksNum reaches dataBlkNum: parity
// reads and decode retries also enqueue futures and must be drained.
} else {
returnedChunk->state = StripedBlockUtil::StripingChunk::MISSING;
dfsStripedInputStream->closeReader(readerInfos[r.index]);
Expand Down
5 changes: 3 additions & 2 deletions src/client/StripedBlockUtil.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,8 @@ void StripedBlockUtil::constructInternalBlock(LocatedBlock & bg, int32_t idxInRe

void StripedBlockUtil::divideOneStripe(shared_ptr<ECPolicy> ecPolicy,
int cellSize, LocatedBlock & blockGroup, long rangeStartInBlockGroup,
long rangeEndInBlockGroup, ByteBuffer * buf, std::vector<AlignedStripe*> & stripes) {
long rangeEndInBlockGroup, ByteBuffer * buf, int32_t bufBaseOffset,
std::vector<AlignedStripe*> & stripes) {
int dataBlkNum = ecPolicy->getNumDataUnits();
// Step 1: map the byte range to StripingCells
std::vector<StripingCell> cells;
Expand Down Expand Up @@ -105,7 +106,7 @@ void StripedBlockUtil::divideOneStripe(shared_ptr<ECPolicy> ecPolicy,
chunk = shared_ptr<StripingChunk>(new StripingChunk());
s->chunks[cell.idxInStripe] = chunk;
}
int pos = static_cast<int>(done + overlapStart - cellStart);
int pos = static_cast<int>(bufBaseOffset + done + overlapStart - cellStart);
chunk->getChunkBuffer()->addSlice(buf, pos, overLapLen);
}
}
Expand Down
37 changes: 22 additions & 15 deletions src/client/StripedBlockUtil.h
Original file line number Diff line number Diff line change
Expand Up @@ -329,26 +329,26 @@ class StripedBlockUtil {

StripingCell() = default;

StripingCell(shared_ptr<ECPolicy> ecPolicy, int cellSize, long idxInBlkGroup,
StripingCell(shared_ptr<ECPolicy> policy, int portionSize, long blkGroupCellIndex,
long cellOffset) {
ecPolicy = ecPolicy;
idxInBlkGroup = idxInBlkGroup;
idxInInternalBlk = idxInBlkGroup / ecPolicy->getNumDataUnits();
idxInStripe = static_cast<int>(idxInBlkGroup -
idxInInternalBlk * ecPolicy->getNumDataUnits());
ecPolicy = policy;
idxInBlkGroup = blkGroupCellIndex;
idxInInternalBlk = blkGroupCellIndex / policy->getNumDataUnits();
idxInStripe = static_cast<int>(blkGroupCellIndex -
idxInInternalBlk * policy->getNumDataUnits());
offset = cellOffset;
size = cellSize;
size = portionSize;
}

void init(shared_ptr<ECPolicy> ecPolicy, int cellSize, long idxInBlkGroup,
void init(shared_ptr<ECPolicy> policy, int portionSize, long blkGroupCellIndex,
long cellOffset) {
ecPolicy = ecPolicy;
idxInBlkGroup = idxInBlkGroup;
idxInInternalBlk = idxInBlkGroup / ecPolicy->getNumDataUnits();
idxInStripe = static_cast<int>(idxInBlkGroup -
idxInInternalBlk * ecPolicy->getNumDataUnits());
ecPolicy = policy;
idxInBlkGroup = blkGroupCellIndex;
idxInInternalBlk = blkGroupCellIndex / policy->getNumDataUnits();
idxInStripe = static_cast<int>(blkGroupCellIndex -
idxInInternalBlk * policy->getNumDataUnits());
offset = cellOffset;
size = cellSize;
size = portionSize;
}

int getIdxInStripe() const {
Expand Down Expand Up @@ -426,9 +426,16 @@ class StripedBlockUtil {
static void checkBlocks(ExtendedBlock blockGroup, int i, ExtendedBlock blocki);
static void constructInternalBlock(LocatedBlock & bg, int32_t idxInReturnedLocs, int32_t cellSize,
int32_t dataBlkNum, int32_t idxInBlockGroup, LocatedBlock & lb);
/**
* @param bufBaseOffset byte offset in buf where the logical range
* [rangeStartInBlockGroup, rangeEndInBlockGroup] is laid out contiguously.
* Use 0 when buf only holds that range; use (offsetInBlockGroup % stripeLen)
* when buf is the full stripe buffer in StripedInputStreamImpl::readOneStripe.
*/
static void divideOneStripe(shared_ptr<ECPolicy> ecPolicy,
int cellSize, LocatedBlock & blockGroup, long rangeStartInBlockGroup,
long rangeEndInBlockGroup, ByteBuffer * buf, std::vector<AlignedStripe*> & stripes);
long rangeEndInBlockGroup, ByteBuffer * buf, int32_t bufBaseOffset,
std::vector<AlignedStripe*> & stripes);

static int64_t getInternalBlockLength(int64_t dataSize, int32_t cellSize,
int32_t numDataBlocks, int32_t idxInBlockGroup);
Expand Down
15 changes: 12 additions & 3 deletions src/client/StripedInputStreamImpl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

#include <inttypes.h>
#include <algorithm>
#include <cstring>


namespace Hdfs {
Expand Down Expand Up @@ -108,6 +109,14 @@ void StripedInputStreamImpl::resetCurStripeBuffer(bool force) {

if (curStripeBuf != nullptr) {
curStripeBuf->clear();
// EC stripes can have logical holes (ALLZERO / padding). Zero the whole
// buffer so copyToTarget never exposes stale bytes from a prior stripe.
std::memset(curStripeBuf->getBuffer(), 0, curStripeBuf->capacity());
}

if (parityBuf != nullptr) {
parityBuf->clear();
std::memset(parityBuf->getBuffer(), 0, parityBuf->capacity());
}

curStripeRange->setLength(0);
Expand Down Expand Up @@ -255,7 +264,7 @@ void StripedInputStreamImpl::fetchBlockByteRange(shared_ptr<LocatedBlock> curBlo
std::vector<StripedBlockUtil::AlignedStripe*> stripes;
StripedBlockUtil::divideOneStripe(
ecPolicy, cellSize, *blockGroup, start,
end, byteBuffer.get(), stripes);
end, byteBuffer.get(), 0, stripes);
std::vector<LocatedBlock> blks;
StripedBlockUtil::parseStripedBlockGroup(*blockGroup, cellSize, dataBlkNum, parityBlkNum, blks);
for (int i = 0; i < static_cast<int>(blockGroup->getIndices().size()); ++i) {
Expand Down Expand Up @@ -322,8 +331,8 @@ void StripedInputStreamImpl::readOneStripe() {
int64_t stripeRangeLen = stripeLimit - stripeBufOffset;
std::vector<StripedBlockUtil::AlignedStripe*> stripes;
StripedBlockUtil::divideOneStripe(
ecPolicy, cellSize, *curBlock, offsetInBlockGroup,
offsetInBlockGroup + stripeRangeLen - 1, curStripeBuf, stripes);
ecPolicy, cellSize, *curBlock, offsetInBlockGroup,
offsetInBlockGroup + stripeRangeLen - 1, curStripeBuf, stripeBufOffset, stripes);

// 3. read stripe
for (int i = 0; i < static_cast<int>(stripes.size()); ++i) {
Expand Down