WIP - Preserve duplicate keys for scan resumptions #5957
Open
DomGarguilo wants to merge 13 commits into apache:2.1 from DomGarguilo:droppedKeysFix
+444 −11
Changes from all 13 commits:
2fe1a3e  Add test case that proves bug (DomGarguilo)
ccbea52  Preserve duplicate keys for scan resumptions (DomGarguilo)
72fcfd9  Merge remote-tracking branch 'upstream/2.1' into droppedKeysFix (DomGarguilo)
b32c036  Merge remote-tracking branch 'upstream/2.1' into droppedKeysFix (DomGarguilo)
f8d426e  Merge remote-tracking branch 'upstream/2.1' into droppedKeysFix (DomGarguilo)
fc8b6c9  Fix hanging bug, keep dup keys across boundaries (DomGarguilo)
0f01109  Revert unrelated change (DomGarguilo)
dd3b3d3  add new test class (DomGarguilo)
c87e52e  Merge remote-tracking branch 'upstream/2.1' into droppedKeysFix (DomGarguilo)
cd44ba1  Merge remote-tracking branch 'upstream/2.1' into droppedKeysFix (DomGarguilo)
09dec0d  Merge remote-tracking branch 'upstream/2.1' into droppedKeysFix (DomGarguilo)
004d94c  Merge remote-tracking branch 'upstream/2.1' into droppedKeysFix (DomGarguilo)
9a7a523  Allow scan to grow via new prop (DomGarguilo)
@@ -60,6 +60,7 @@
 import org.apache.accumulo.server.conf.TableConfiguration;
 import org.apache.accumulo.server.fs.TooManyFilesException;
 import org.apache.accumulo.tserver.InMemoryMap;
+import org.apache.accumulo.tserver.MemKey;
 import org.apache.accumulo.tserver.TabletHostingServer;
 import org.apache.accumulo.tserver.TabletServerResourceManager;
 import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;

@@ -276,8 +277,8 @@ void recordScanTrace(Span span, List<KVEntry> batch, ScanParameters scanParamete
     }
   }

-  Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParameters scanParams)
-      throws IOException {
+  Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParameters scanParams,
+      int duplicatesToSkip) throws IOException {

     // log.info("In nextBatch..");

@@ -297,9 +298,33 @@ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParamet
     long resultBytes = 0L;

     long maxResultsSize = getTableConfiguration().getAsBytes(Property.TABLE_SCAN_MAXMEM);
+    int duplicateBatchMultiplier =
+        getTableConfiguration().getCount(Property.TABLE_SCAN_BATCH_DUPLICATE_MAX_MULTIPLIER);
+    if (duplicateBatchMultiplier < 1) {
+      duplicateBatchMultiplier = 1;
+    }
+    long maxResultsSizeWithDuplicates = maxResultsSize;
+    long maxEntriesWithDuplicates = scanParams.getMaxEntries();
+    if (duplicateBatchMultiplier > 1) {
+      try {
+        maxResultsSizeWithDuplicates =
+            Math.multiplyExact(maxResultsSize, (long) duplicateBatchMultiplier);
+      } catch (ArithmeticException e) {
+        maxResultsSizeWithDuplicates = Long.MAX_VALUE;
+        // TODO maybe log that this happened? Deduped somehow?
+      }
+      try {
+        maxEntriesWithDuplicates =
+            Math.multiplyExact(scanParams.getMaxEntries(), (long) duplicateBatchMultiplier);
+      } catch (ArithmeticException e) {
+        maxEntriesWithDuplicates = Long.MAX_VALUE;
+        // TODO maybe log that this happened? Deduped somehow?
+      }
+    }

     Key continueKey = null;
-    boolean skipContinueKey = false;
+    boolean skipContinueKey = true;
+    boolean resumeOnSameKey = false;

     YieldCallback<Key> yield = new YieldCallback<>();

@@ -314,32 +339,73 @@ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParamet
       iter.seek(range, LocalityGroupUtil.families(scanParams.getColumnSet()), true);
     }

+    skipReturnedDuplicates(iter, duplicatesToSkip, range);
+
+    Key rangeStartKey = range.getStartKey();
+    Key currentKey = null;
+    boolean resumingOnSameKey =
+        iter.hasTop() && rangeStartKey != null && rangeStartKey.equals(iter.getTopKey());
+    int previousDuplicates = resumingOnSameKey ? duplicatesToSkip : 0;
+    int duplicatesReturnedForCurrentKey = 0;
+    Key cutKey = null;
+    boolean cutPending = false;
+
     while (iter.hasTop()) {
       if (yield.hasYielded()) {
         throw new IOException(
             "Coding error: hasTop returned true but has yielded at " + yield.getPositionAndReset());
       }
       value = iter.getTopValue();
       key = iter.getTopKey();
+      if (cutPending && !key.equals(cutKey)) {
+        continueKey = copyResumeKey(cutKey);
+        resumeOnSameKey = true;
+        skipContinueKey = false;
+        break;
+      }
+      if (!key.equals(currentKey)) {
+        currentKey = copyResumeKey(key);
+        if (resumingOnSameKey && key.equals(rangeStartKey)) {
+          duplicatesReturnedForCurrentKey = previousDuplicates;
+        } else {
+          duplicatesReturnedForCurrentKey = 0;
+          resumingOnSameKey = false;
+        }
+      }

       KVEntry kvEntry = new KVEntry(key, value); // copies key and value
       results.add(kvEntry);
       resultSize += kvEntry.estimateMemoryUsed();
       resultBytes += kvEntry.numBytes();

+      duplicatesReturnedForCurrentKey++;
+
+      if (cutPending && (resultSize >= maxResultsSizeWithDuplicates
+          || results.size() >= maxEntriesWithDuplicates)) {
+        throw new IllegalStateException("Duplicate key run exceeded scan batch growth limit for "
+            + cutKey + ". Increase " + Property.TABLE_SCAN_BATCH_DUPLICATE_MAX_MULTIPLIER.getKey()
+            + " or reduce duplicates for this key.");
+      }

       boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) >= timeToRun;

       if (resultSize >= maxResultsSize || results.size() >= scanParams.getMaxEntries() || timesUp) {
-        continueKey = new Key(key);
-        skipContinueKey = true;
-        break;
+        if (!cutPending) {
+          cutPending = true;
+          cutKey = currentKey;
+        } else if (timesUp) {
+          throw new IllegalStateException("Duplicate key run exceeded scan batch timeout for "
+              + cutKey + ". Increase " + Property.TABLE_SCAN_BATCH_DUPLICATE_MAX_MULTIPLIER.getKey()
+              + " or batch timeout, or reduce duplicates for this key.");
+        }
Contributor comment on the block above: This if stmt used to have a break in it; it seems like that will still be needed. When we are in here, we have finished the batch and need to break out of the loop.
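For context on that comment, the diff replaces the old immediate break with a deferred one: when the batch fills, cutPending is set, and the loop only breaks at the top of a later iteration once the key changes. A condensed sketch of that control flow, paraphrased from the diff above (not code taken from the PR):

    while (iter.hasTop()) {
      key = iter.getTopKey();
      if (cutPending && !key.equals(cutKey)) {
        // deferred break: the run of identical keys has ended, so the batch can
        // be closed without splitting duplicates of cutKey across batches
        continueKey = copyResumeKey(cutKey);
        break;
      }
      // ... copy the current key/value into results ...
      if (resultSize >= maxResultsSize || results.size() >= scanParams.getMaxEntries() || timesUp) {
        if (!cutPending) {
          cutPending = true;   // the old code issued "break;" here immediately
          cutKey = currentKey;
        }
      }
      iter.next();
    }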
       }

       iter.next();
     }

     if (yield.hasYielded()) {
-      continueKey = new Key(yield.getPositionAndReset());
+      continueKey = copyResumeKey(yield.getPositionAndReset());
+      resumeOnSameKey = false;
       skipContinueKey = true;
       if (!range.contains(continueKey)) {
         throw new IOException("Underlying iterator yielded to a position outside of its range: "

@@ -362,7 +428,9 @@ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParamet
       }
     }

-    return new Batch(skipContinueKey, results, continueKey, resultBytes);
+    int duplicatesToSkipForNextBatch = resumeOnSameKey ? duplicatesReturnedForCurrentKey : 0;
+    return new Batch(skipContinueKey, results, continueKey, resultBytes,
+        duplicatesToSkipForNextBatch);
   }

   private Tablet.LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges,

@@ -515,7 +583,8 @@ private void handleTabletClosedDuringScan(List<KVEntry> results, Tablet.LookupRe
   private void addUnfinishedRange(Tablet.LookupResult lookupResult, Range range, Key key) {
     if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) {
-      Range nlur = new Range(new Key(key), false, range.getEndKey(), range.isEndKeyInclusive());
+      Key copy = copyResumeKey(key);
+      Range nlur = new Range(copy, false, range.getEndKey(), range.isEndKeyInclusive());
       lookupResult.unfinishedRanges.add(nlur);
     }
   }

@@ -526,4 +595,30 @@ public synchronized void updateQueryStats(int size, long numBytes) {
     this.queryResultBytes.addAndGet(numBytes);
     this.server.getScanMetrics().incrementQueryResultBytes(numBytes);
   }

+  private Key copyResumeKey(Key key) {
+    if (key instanceof MemKey) {
+      MemKey memKey = (MemKey) key;
+      return new MemKey(memKey, memKey.getKVCount());
+    }
+    return new Key(key);
+  }
+
+  private void skipReturnedDuplicates(SortedKeyValueIterator<Key,Value> iter, int duplicatesToSkip,
+      Range range) throws IOException {
+    if (duplicatesToSkip <= 0 || !range.isStartKeyInclusive()) {
+      return;
+    }
+
+    Key startKey = range.getStartKey();
+    if (startKey == null) {
+      return;
+    }
+
+    int skipped = 0;
+    while (skipped < duplicatesToSkip && iter.hasTop() && iter.getTopKey().equals(startKey)) {
+      iter.next();
+      skipped++;
+    }
+  }
 }
Reviewer comment: These changes add a lot of new work per key/value read. Normally that work will be wasted effort and never needed. Instead of doing that, a check could be done when closing off a batch to see whether we are in the situation where the keys are the same. Maybe something like the following; not sure how this would change the rest of the code, but hopefully this strategy could avoid adding any new work for each key/value.
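A minimal sketch of that idea, assuming the results list and the KVEntry type from the diff above; this only illustrates the reviewer's suggestion and is not code from the PR. The duplicate check runs once, when the batch is being closed, instead of once per key/value read:

    // Hypothetical helper: when the batch-full condition fires, count how many
    // trailing entries share the final key. A non-zero count means the cut would
    // split a run of identical keys, and only then is extra work needed (for
    // example, recording how many copies of the continue key were already
    // returned so the next batch can skip them on resume).
    private static int trailingDuplicates(List<KVEntry> results) {
      if (results.isEmpty()) {
        return 0;
      }
      Key lastKey = results.get(results.size() - 1).getKey();
      int count = 0;
      for (int i = results.size() - 2; i >= 0; i--) {
        if (!results.get(i).getKey().equals(lastKey)) {
          break;
        }
        count++;
      }
      return count;
    }

Calling something like this only at the cut point would keep the common path, where no duplicates straddle the batch boundary, free of per-entry bookkeeping.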