Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.hbase.Tag;
import org.apache.hadoop.hbase.io.util.Dictionary;
import org.apache.hadoop.hbase.io.util.StreamUtils;
Expand All @@ -39,6 +41,9 @@
@InterfaceAudience.Private
public class TagCompressionContext {
private final Dictionary tagDict;
private boolean deferAdditions = false;
private final List<byte[]> deferredAdditions = new ArrayList<>();
private int deferredBaseIndex;

public TagCompressionContext(Class<? extends Dictionary> dictType, int dictCapacity)
throws SecurityException, NoSuchMethodException, InstantiationException, IllegalAccessException,
Expand All @@ -52,6 +57,27 @@ public void clear() {
tagDict.clear();
}

/**
 * Turns deferred-addition mode on or off.
 * <p>
 * When turned on, the current size of the tag dictionary is recorded as the base index for
 * deferred entries and any previously buffered entries are dropped. When turned off, only the
 * flag changes; buffered entries are left as they are.
 * @param defer {@code true} to start buffering additions, {@code false} to stop
 */
public void setDeferAdditions(boolean defer) {
  deferAdditions = defer;
  if (!defer) {
    return;
  }
  deferredBaseIndex = tagDict.size();
  deferredAdditions.clear();
}

/**
 * Adds every buffered entry to the tag dictionary, in the order it was buffered, then empties
 * the buffer and leaves deferred-addition mode.
 */
public void commitDeferredAdditions() {
  final int bufferedCount = deferredAdditions.size();
  for (int i = 0; i < bufferedCount; i++) {
    byte[] tagBytes = deferredAdditions.get(i);
    tagDict.addEntry(tagBytes, 0, tagBytes.length);
  }
  deferredAdditions.clear();
  deferAdditions = false;
}

/**
 * Discards all buffered entries without adding them to the tag dictionary and leaves
 * deferred-addition mode.
 */
public void clearDeferredAdditions() {
  deferAdditions = false;
  deferredAdditions.clear();
}

/**
* Compress tags one by one and writes to the OutputStream.
* @param out Stream to which the compressed tags to be written
Expand Down Expand Up @@ -112,11 +138,17 @@ public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
int tagLen = StreamUtils.readRawVarint32(src);
offset = Bytes.putAsShort(dest, offset, tagLen);
IOUtils.readFully(src, dest, offset, tagLen);
tagDict.addEntry(dest, offset, tagLen);
if (deferAdditions) {
byte[] copy = new byte[tagLen];
System.arraycopy(dest, offset, copy, 0, tagLen);
deferredAdditions.add(copy);
} else {
tagDict.addEntry(dest, offset, tagLen);
}
offset += tagLen;
} else {
short dictIdx = StreamUtils.toShort(status, StreamUtils.readByte(src));
byte[] entry = tagDict.getEntry(dictIdx);
byte[] entry = getDeferredOrDictEntry(dictIdx);
if (entry == null) {
throw new IOException("Missing dictionary entry for index " + dictIdx);
}
Expand All @@ -127,6 +159,20 @@ public void uncompressTags(InputStream src, byte[] dest, int offset, int length)
}
}

/**
 * Looks up a tag entry by dictionary index, consulting the deferred buffer first while
 * deferred-addition mode is active.
 * <p>
 * An index at or above the recorded base index that falls inside the deferred buffer resolves
 * to the buffered entry; everything else is delegated to the tag dictionary.
 * NOTE(review): this mapping assumes the dictionary would have assigned the buffered entries
 * consecutive indices starting at its size when deferral began -- confirm this also holds for
 * dictionaries that are already at capacity.
 * @param dictIdx index read from the compressed stream
 * @return the entry bytes, or {@code null} if the dictionary lookup throws
 *         {@link IndexOutOfBoundsException}
 */
private byte[] getDeferredOrDictEntry(short dictIdx) {
  if (deferAdditions) {
    int bufferSlot = dictIdx - deferredBaseIndex;
    boolean withinBuffer = bufferSlot >= 0 && bufferSlot < deferredAdditions.size();
    if (withinBuffer) {
      return deferredAdditions.get(bufferSlot);
    }
  }
  byte[] committed;
  try {
    committed = tagDict.getEntry(dictIdx);
  } catch (IndexOutOfBoundsException e) {
    // Reported to the caller as a missing entry rather than propagated.
    committed = null;
  }
  return committed;
}

/**
* Uncompress tags from the input ByteBuffer and writes to the destination array.
* @param src Buffer where the compressed tags are available
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,11 @@ public interface Dictionary {
*/
short addEntry(byte[] data, int offset, int length);

/**
 * Returns the number of entries in the dictionary.
 * @return the number of entries currently held
 */
int size();

/**
* Flushes the dictionary, empties all values.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,11 @@ private short addEntryInternal(byte[] data, int offset, int length, boolean copy
return backingStore.put(data, offset, length, copy);
}

/**
 * Reports the entry count tracked by the backing store.
 * @return the backing store's current size
 */
@Override
public int size() {
  final int entryCount = backingStore.currSize;
  return entryCount;
}

@Override
public void clear() {
backingStore.clear();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,10 @@ public class ProtobufWALTailingReader extends AbstractProtobufWALReader

private DelegatingInputStream delegatingInput;

private Entry pendingEntry = null;
private int pendingRemainingCells = 0;
private long pendingResumePosition = -1;

private static final class ReadWALKeyResult {
final State state;
final Entry entry;
Expand Down Expand Up @@ -184,37 +188,55 @@ private Result editError() {
}

/**
 * Reads the cells that follow a freshly read WAL key into the given entry.
 * @param entry            entry whose edit receives the cells
 * @param followingKvCount number of cells expected after the key
 * @return the outcome of reading the cells, as produced by {@code readCellsIntoEntry}
 */
private Result readWALEdit(Entry entry, int followingKvCount) {
  // A fresh read, not the continuation of a previously interrupted entry.
  final boolean resuming = false;
  return readCellsIntoEntry(entry, followingKvCount, resuming);
}

private Result readCellsIntoEntry(Entry entry, int remainingCells, boolean isResume) {
long posBefore;
try {
posBefore = inputStream.getPos();
} catch (IOException e) {
LOG.warn("failed to get position", e);
return State.ERROR_AND_RESET.getResult();
}
if (followingKvCount == 0) {
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
posBefore);
if (remainingCells == 0) {
if (!isResume) {
LOG.trace("WALKey has no KVs that follow it; trying the next one. current offset={}",
posBefore);
}
return new Result(State.NORMAL, entry, posBefore);
}
int actualCells;
try {
actualCells = entry.getEdit().readFromCells(cellDecoder, followingKvCount);
} catch (Exception e) {
String message = " while reading " + followingKvCount + " WAL KVs; started reading at "
+ posBefore + " and read up to " + getPositionQuietly();
IOException realEofEx = extractHiddenEof(e);
if (realEofEx != null) {
LOG.warn("EOF " + message, realEofEx);
return editEof();
} else {
LOG.warn("Error " + message, e);
long lastGoodPos = posBefore;
int cellsRead = 0;
for (int i = 0; i < remainingCells; i++) {
try {
lastGoodPos = inputStream.getPos();
} catch (IOException e) {
LOG.warn("failed to get position before cell read", e);
return editError();
}
}
if (actualCells != followingKvCount) {
LOG.warn("Only read {} cells, expected {}; started reading at {} and read up to {}",
actualCells, followingKvCount, posBefore, getPositionQuietly());
return editEof();
boolean advanced;
try {
advanced = cellDecoder.advance();
} catch (Exception e) {
IOException realEofEx = extractHiddenEof(e);
if (realEofEx != null) {
LOG.debug("EOF after reading {} of {} cells; started reading at {}, last good pos={}",
cellsRead, remainingCells, posBefore, lastGoodPos, realEofEx);
return savePendingAndReturnEof(entry, remainingCells - cellsRead, lastGoodPos);
} else {
LOG.warn("Error after reading {} of {} cells; started reading at {}, read up to {}",
cellsRead, remainingCells, posBefore, getPositionQuietly(), e);
return editError();
}
}
if (!advanced) {
LOG.debug("EOF (advance returned false) after reading {} of {} cells; started at {},"
+ " last good pos={}", cellsRead, remainingCells, posBefore, lastGoodPos);
return savePendingAndReturnEof(entry, remainingCells - cellsRead, lastGoodPos);
}
entry.getEdit().add(cellDecoder.current());
cellsRead++;
}
long posAfter;
try {
Expand All @@ -231,8 +253,45 @@ private Result readWALEdit(Entry entry, int followingKvCount) {
return new Result(State.NORMAL, entry, posAfter);
}

/**
 * Handles hitting EOF part-way through an entry's cells.
 * <p>
 * Without compression this simply yields the regular edit-EOF result. With compression the
 * partially read entry, the number of cells still missing and the position to resume from are
 * remembered, and an {@code EOF_AND_RESET} result carrying the resume position is returned.
 * @param entry     the partially populated entry
 * @param remaining number of cells not yet read
 * @param resumePos stream position recorded before the cell that could not be read
 * @return the EOF result to hand back to the caller
 */
private Result savePendingAndReturnEof(Entry entry, int remaining, long resumePos) {
  if (!hasCompression) {
    return editEof();
  }
  pendingEntry = entry;
  pendingRemainingCells = remaining;
  pendingResumePosition = resumePos;
  return new Result(State.EOF_AND_RESET, null, resumePos);
}

/**
 * Forgets any partially read entry: drops the entry reference, zeroes the outstanding cell
 * count and sets the resume position back to its {@code -1} "none" value.
 */
private void clearPendingState() {
  pendingResumePosition = -1;
  pendingRemainingCells = 0;
  pendingEntry = null;
}

@Override
public Result next(long limit) {
if (pendingEntry != null) {
long originalPosition;
try {
originalPosition = inputStream.getPos();
} catch (IOException e) {
LOG.warn("failed to get position", e);
clearPendingState();
return State.EOF_AND_RESET.getResult();
}
if (limit < 0) {
delegatingInput.setDelegate(inputStream);
} else if (limit <= originalPosition) {
return State.EOF_AND_RESET.getResult();
} else {
delegatingInput.setDelegate(ByteStreams.limit(inputStream, limit - originalPosition));
}
Entry entry = pendingEntry;
int remaining = pendingRemainingCells;
clearPendingState();
return readCellsIntoEntry(entry, remaining, true);
}
long originalPosition;
try {
originalPosition = inputStream.getPos();
Expand Down Expand Up @@ -268,6 +327,13 @@ private void skipHeader(FSDataInputStream stream) throws IOException {

@Override
public void resetTo(long position, boolean resetCompression) throws IOException {
if (resetCompression) {
clearPendingState();
}
long seekPosition = position;
if (!resetCompression && pendingResumePosition > 0) {
seekPosition = pendingResumePosition;
}
close();
Pair<FSDataInputStream, FileStatus> pair = open();
boolean resetSucceed = false;
Expand All @@ -283,6 +349,7 @@ public void resetTo(long position, boolean resetCompression) throws IOException
if (compressionCtx != null) {
compressionCtx.clear();
}
clearPendingState();
skipHeader(inputStream);
} else if (resetCompression && compressionCtx != null) {
// clear compressCtx and skip to the expected position, to fill up the dictionary
Expand All @@ -293,7 +360,7 @@ public void resetTo(long position, boolean resetCompression) throws IOException
}
} else {
// just seek to the expected position
inputStream.seek(position);
inputStream.seek(seekPosition);
}
resetSucceed = true;
} finally {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.Cell;
import org.apache.hadoop.hbase.ExtendedCell;
Expand Down Expand Up @@ -280,15 +282,77 @@ static class CompressedKvDecoder extends BaseDecoder {
private final boolean hasValueCompression;
private final boolean hasTagCompression;

// When the WAL tailing reader hits EOF mid-cell, the compression dictionaries must remain
// in the state they were after the last fully-read cell. Otherwise the reader would need
// an expensive O(n) reset (re-read from the start of the file to rebuild dictionary state).
// To achieve this, dictionary additions for ROW, FAMILY, and QUALIFIER are buffered here
// and only flushed on successful cell parse. On failure, they are discarded.
// Tag dictionary additions are deferred similarly via TagCompressionContext.
private final List<PendingDictAddition> pendingDictAdditions = new ArrayList<>();

// Tracks whether we are in the value decompression phase of parseCellInner(), so that on
// IOException we know whether the ValueCompressor's internal state needs to be reset.
private boolean readingValue = false;

/**
 * A dictionary addition that has been recorded but not yet applied: the target dictionary plus
 * the array region (array reference, offset, length) to add. The array is held by reference,
 * not copied.
 */
private static class PendingDictAddition {
  final Dictionary dict;
  final byte[] data;
  final int offset;
  final int length;

  PendingDictAddition(Dictionary targetDict, byte[] source, int sourceOffset, int sourceLength) {
    dict = targetDict;
    data = source;
    offset = sourceOffset;
    length = sourceLength;
  }
}

/**
 * Creates a decoder reading from {@code in} and caches which optional compression features
 * the supplied context reports as enabled.
 * @param in          stream to decode cells from
 * @param compression compression context supplying the dictionaries and feature flags
 */
public CompressedKvDecoder(InputStream in, CompressionContext compression) {
  super(in);
  this.hasTagCompression = compression.hasTagCompression();
  this.hasValueCompression = compression.hasValueCompression();
  this.compression = compression;
}

/**
 * Applies every buffered dictionary addition in the order it was recorded, empties the buffer,
 * and, when tag compression is enabled, commits the tag context's deferred additions too.
 */
private void commitPendingAdditions() {
  for (int i = 0, n = pendingDictAdditions.size(); i < n; i++) {
    PendingDictAddition addition = pendingDictAdditions.get(i);
    addition.dict.addEntry(addition.data, addition.offset, addition.length);
  }
  pendingDictAdditions.clear();
  if (!hasTagCompression) {
    return;
  }
  compression.tagCompressionContext.commitDeferredAdditions();
}

/**
 * Throws away every buffered dictionary addition without applying it and, when tag compression
 * is enabled, discards the tag context's deferred additions as well.
 */
private void clearPendingAdditions() {
  pendingDictAdditions.clear();
  if (!hasTagCompression) {
    return;
  }
  compression.tagCompressionContext.clearDeferredAdditions();
}

/**
 * Parses one cell, applying its dictionary additions only if the whole cell is read.
 * <p>
 * Additions made while parsing are buffered (and tag additions deferred) and committed only
 * after {@code parseCellInner()} returns. If parsing fails, the buffered additions are dropped
 * and, when the failure happened during value decompression, the value compressor is cleared.
 * The rollback runs for unchecked exceptions as well as {@link IOException}, so a failed
 * parse cannot leave the tag context in deferred mode or a half-consumed value compressor
 * behind.
 * @return the parsed cell
 * @throws IOException if reading or decoding the cell fails; the original exception is
 *         rethrown after the rollback
 */
@Override
protected ExtendedCell parseCell() throws IOException {
  // Start from a clean slate in case a previous attempt left anything buffered.
  clearPendingAdditions();
  if (hasTagCompression) {
    compression.tagCompressionContext.setDeferAdditions(true);
  }
  readingValue = false;
  try {
    ExtendedCell cell = parseCellInner();
    commitPendingAdditions();
    return cell;
  } catch (IOException | RuntimeException e) {
    clearPendingAdditions();
    // readingValue is only true while parseCellInner() is inside value decompression.
    if (readingValue && hasValueCompression) {
      compression.getValueCompressor().clear();
    }
    throw e;
  }
}

private ExtendedCell parseCellInner() throws IOException {
int keylength = StreamUtils.readRawVarint32(in);
int vlength = StreamUtils.readRawVarint32(in);
int tagsLength = StreamUtils.readRawVarint32(in);
Expand Down Expand Up @@ -334,7 +398,9 @@ protected ExtendedCell parseCell() throws IOException {
pos = Bytes.putByte(backingArray, pos, (byte) in.read());
int valLen = typeValLen - 1;
if (hasValueCompression) {
readingValue = true;
readCompressedValue(in, backingArray, pos, valLen);
readingValue = false;
pos += valLen;
} else {
IOUtils.readFully(in, backingArray, pos, valLen);
Expand All @@ -359,7 +425,7 @@ private int readIntoArray(byte[] to, int offset, Dictionary dict) throws IOExcep
// if this isn't in the dictionary, we need to add to the dictionary.
int length = StreamUtils.readRawVarint32(in);
IOUtils.readFully(in, to, offset, length);
dict.addEntry(to, offset, length);
pendingDictAdditions.add(new PendingDictAddition(dict, to, offset, length));
return length;
} else {
// the status byte also acts as the higher order byte of the dictionary entry.
Expand Down
Loading