(Not yet upstream) Reset to last successful cell on EOF with WAL Compression #236
base: hubspot-2.6
Conversation
This hasn't been reviewed with an upstream PR, so I'd really appreciate any feedback on this change. I have verified this hasn't caused any data loss by doing a full table comparison after a few hours of running this in QA.
charlesconnell
left a comment
I recommend changing the title+desc to make clear that this affects tag compression only, which uses a different scheme than compression on other parts of the WAL.
I'm a little unconvinced that this is the best approach to solve the problem. Dictionary is just a glorified map. Additions to the map are effectively atomic (you're not adding half a tag value to it, it's all or nothing). What condition makes it contain invalid data?
This does not only affect tag compression. Rather, the solution affects all compression, including both key and value compression. Those changes are primarily in WALCellCodec.
The Dictionary uses an LRU cache to save data. Therefore, if we read a partial entry, we may add only some of its cells to the cache, which causes the oldest entries to be evicted. If we then simply seek back to the old starting point, we are left with a "corrupt" cache.
|
A relevant PR and JIRA, which introduced support for turning on WAL Compression and Replication simultaneously, are linked here. This PR attempts to speed up that process.
hbase-server/src/main/java/org/apache/hadoop/hbase/regionserver/wal/WALCellCodec.java
```java
}
try {
  return tagDict.getEntry(dictIdx);
} catch (IndexOutOfBoundsException e) {
```
ooc -- why is this necessary now?
If we were referencing a tag that was previously in the cell, we would need to search the deferred additions to find that tag. This handles that case.
Theoretically, this should be impossible because there should only be at most one of each tag, but this was some defensive programming just in case.
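For context, a minimal sketch of what such a fallback lookup might look like. All names here (`DeferredDictLookup`, `addDeferred`, `commit`) are illustrative, not the actual HBase code; the real `Dictionary` is an LRU structure, while this uses plain lists for brevity.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: resolve a dictionary index against entries that are
// buffered (deferred) but not yet committed to the dictionary.
class DeferredDictLookup {
  private final List<byte[]> committed = new ArrayList<>();         // stands in for tagDict
  private final List<byte[]> deferredAdditions = new ArrayList<>();

  void addDeferred(byte[] tag) {
    deferredAdditions.add(tag);
  }

  void commit() {
    committed.addAll(deferredAdditions);
    deferredAdditions.clear();
  }

  byte[] getEntry(int dictIdx) {
    if (dictIdx < committed.size()) {
      return committed.get(dictIdx);   // normal dictionary hit
    }
    // The index points past the committed entries, so it must refer to a tag
    // added earlier in this same cell; if not, this throws
    // IndexOutOfBoundsException, which the caller handles.
    return deferredAdditions.get(dictIdx - committed.size());
  }
}
```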
```java
public class TagCompressionContext {
  private final Dictionary tagDict;
  private boolean deferAdditions = false;
  private List<byte[]> deferredAdditions;
```
If you initialize this to an empty ArrayList rather than null, I think it simplifies some of the logic below
Good call, fixed.
I know we had discussed this, but now I'm looking at the code and I'm forgetting why we can't just continue reading from the offset and continue appending to the LRU. When we read the values, first we read the length of the tag/value, so we should only insert valid entries into the dictionary. I'm also forgetting why we can't just seek back to the position we were last at; it's not clear to me why the evicted value is important if we just pick up from where we left off. We definitely discussed this yesterday but my memory is admittedly fuzzy, mind elaborating a bit?
Here's an example. Before reading a cell, both writer and reader dictionaries are in sync (say indices 0-2 are occupied, currSize = 3).

The reader starts reading a cell. ROW = "row-3" is NOT_IN_DICTIONARY on the wire, so the reader reads the raw bytes and calls addEntry("row-3"); it is added at index 3, currSize = 4. Then EOF hits while reading FAMILY. The reader seeks back to the cell start and retries. The stream still starts with NOT_IN_DICTIONARY + "row-3", so the reader calls addEntry("row-3") again; it is added at index 4, currSize = 5.

Now the reader has "row-3" at both index 3 and index 4 (currSize = 5), but the writer has it only at index 3 (currSize = 4). From this point on, every index the writer assigns is off-by-one from what the reader expects. The next new entry the writer encodes goes at index 4, but the reader's index 4 is the duplicate "row-3". The reader will decode garbage or throw. The same problem gets worse at capacity: extra entries cause different LRU evictions, so the writer and reader disagree on which entry gets evicted and which index gets reused.
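The example above can be reduced to a toy model. Here the dictionary is just an append-only list (no LRU, for brevity), and "replaying" a partial cell re-adds an entry; this is purely an illustration, not the real HBase Dictionary.

```java
import java.util.ArrayList;
import java.util.List;

// Toy model of the writer/reader dictionary desync: replaying a partially-read
// cell adds a duplicate entry on the reader side, shifting all later indices.
class DictDesyncDemo {
  public static void main(String[] args) {
    List<String> writer = new ArrayList<>(List.of("row-0", "row-1", "row-2"));
    List<String> reader = new ArrayList<>(List.of("row-0", "row-1", "row-2"));

    writer.add("row-3");  // writer adds "row-3" once, at index 3

    reader.add("row-3");  // reader's first attempt: index 3... then EOF mid-cell
    reader.add("row-3");  // after seeking back, the same bytes add it again at index 4

    writer.add("fam");    // writer's next new entry lands at index 4
    reader.add("fam");    // but the reader puts it at index 5

    System.out.println("writer index of fam = " + writer.indexOf("fam"));  // 4
    System.out.println("reader index of fam = " + reader.indexOf("fam"));  // 5
  }
}
```

Every dictionary reference the writer emits after this point resolves to the wrong entry on the reader.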
For reference, I have now opened an upstream PR here.
ah yup, thanks for writing out the example |
Description
When replication tails a WAL with compression enabled, hitting EOF mid-entry currently triggers EOF_AND_RESET_COMPRESSION. This clears all compression dictionaries, reopens the file from the beginning, and re-reads every entry to rebuild dictionary state — an O(n) operation that grows more expensive the further into the file we are.
This change defers dictionary updates in `CompressedKvDecoder` until an entire cell is successfully parsed. If EOF interrupts mid-cell, the pending additions are discarded and all dictionaries remain in the exact state they were in after the last fully-read cell. This allows `ProtobufWALTailingReader` to return `EOF_AND_RESET` instead of `EOF_AND_RESET_COMPRESSION`, replacing the O(n) replay with an O(1) seek to the last cell boundary.

How it works
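A rough sketch of the defer-then-commit flow, under stated assumptions: all names here (`PendingDictionary`, `parseCell`, the `available` parameter modeling EOF) are illustrative stand-ins, not the actual HBase classes.

```java
import java.util.ArrayList;
import java.util.List;

// Illustrative sketch: dictionary additions are buffered during a cell parse,
// published only when the whole cell is read, and discarded on mid-cell EOF.
class PendingDictionary {
  private final List<String> entries = new ArrayList<>();
  private final List<String> pending = new ArrayList<>();

  void addEntry(String e) { pending.add(e); }                    // buffered, not yet visible
  void commit()   { entries.addAll(pending); pending.clear(); }  // cell fully parsed
  void rollback() { pending.clear(); }                           // EOF mid-cell: discard
  int size()      { return entries.size(); }                     // analogous to Dictionary.size()
}

class CellParser {
  // 'available' models how many fields the stream contains before EOF.
  static boolean parseCell(PendingDictionary dict, String[] fields, int available) {
    for (int i = 0; i < fields.length; i++) {
      if (i >= available) {   // simulated EOF in the middle of the cell
        dict.rollback();      // dictionaries stay in the last-good state
        return false;
      }
      dict.addEntry(fields[i]);
    }
    dict.commit();            // whole cell read: publish all additions at once
    return true;
  }
}
```

Because a failed parse leaves the dictionary untouched, the reader can simply seek back to the cell boundary and retry without any replay.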
- `CompressedKvDecoder.parseCell()` now buffers dictionary additions in a pending list, and puts `TagCompressionContext` into deferred mode. On successful parse, all additions are committed. On failure, they're discarded.
- `ProtobufWALTailingReader` switches from batch cell reading (`readFromCells`) to cell-by-cell reading, tracking the stream position before each cell. On EOF mid-entry, it saves the partially-read entry, the remaining cell count, and the resume position. On the next `next()` call after reset, it resumes reading only the remaining cells.
- The `Dictionary` interface gains a `size()` method so `TagCompressionContext` can resolve within-cell dictionary references against deferred entries (when a later tag in the same cell was encoded as a dictionary hit referencing an earlier tag that hasn't been committed yet). This is defensive -- it is currently not possible to have a Put with two tags of the same byte on the same cell, but IMO it's better to be safe here.

Tests