Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1198,6 +1198,12 @@ public enum Property {
"The maximum amount of memory that will be used to cache results of a client query/scan. "
+ "Once this limit is reached, the buffered data is sent to the client.",
"1.3.5"),
TABLE_SCAN_BATCH_DUPLICATE_MAX_MULTIPLIER("table.scan.batch.duplicate.max.multiplier", "3",
PropertyType.COUNT,
"When a scan batch would end on a duplicate key, allow the batch to grow by this "
+ "multiplier of the scan batch size and table scan max memory to avoid splitting duplicate keys. "
+ "If the duplicate run still exceeds this limit, the scan fails to avoid dropping keys.",
"2.1.5"),
TABLE_SHUFFLE_SOURCES("table.shuffle.sources", "false", PropertyType.BOOLEAN,
"Shuffle the opening order for Rfiles to reduce thread contention on file open operations.",
"2.1.5"),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,12 +27,15 @@ final class Batch {
private final List<KVEntry> results;
private final Key continueKey;
private final long numBytes;
private final int duplicatesToSkip;

/**
 * Creates a batch of scan results together with the information needed to resume the scan.
 *
 * @param skipContinueKey whether the next read should start after {@code continueKey} rather than
 *        at it
 * @param results the key/value entries collected for this batch
 * @param continueKey the key from which the next batch should resume
 * @param numBytes the byte count accumulated for the entries in {@code results}
 * @param duplicatesToSkip the count handed back to the caller for the next batch read; presumably
 *        the number of entries equal to {@code continueKey} that were already returned (confirm
 *        against the producer of this batch)
 */
Batch(boolean skipContinueKey, List<KVEntry> results, Key continueKey, long numBytes,
    int duplicatesToSkip) {
  this.skipContinueKey = skipContinueKey;
  this.results = results;
  this.continueKey = continueKey;
  this.numBytes = numBytes;
  this.duplicatesToSkip = duplicatesToSkip;
}

public boolean isSkipContinueKey() {
Expand All @@ -50,4 +53,8 @@ public Key getContinueKey() {
/**
 * Returns the byte count recorded for this batch.
 *
 * @return the value of {@code numBytes} supplied at construction
 */
public long getNumBytes() {
  return this.numBytes;
}

/**
 * Returns the duplicate-skip count recorded for this batch.
 *
 * @return the value of {@code duplicatesToSkip} supplied at construction
 */
public int getDuplicatesToSkip() {
  return this.duplicatesToSkip;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -62,6 +62,7 @@ public class Scanner {
private final AtomicBoolean interruptFlag;

private boolean readInProgress = false;
private int duplicatesToSkip = 0;

Scanner(TabletBase tablet, Range range, ScanParameters scanParams, AtomicBoolean interruptFlag) {
this.tablet = tablet;
Expand Down Expand Up @@ -138,7 +139,8 @@ private Pair<ScanBatch,ScanDataSource> readInternal() throws IOException, Tablet
iter = new SourceSwitchingIterator(dataSource, false);
}

results = tablet.nextBatch(iter, range, scanParams);
results = tablet.nextBatch(iter, range, scanParams, duplicatesToSkip);
duplicatesToSkip = 0;

if (results.getResults() == null) {
range = null;
Expand All @@ -148,6 +150,7 @@ private Pair<ScanBatch,ScanDataSource> readInternal() throws IOException, Tablet
} else {
range = new Range(results.getContinueKey(), !results.isSkipContinueKey(), range.getEndKey(),
range.isEndKeyInclusive());
duplicatesToSkip = results.getDuplicatesToSkip();
return new Pair<>(new ScanBatch(results.getResults(), true), dataSource);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import org.apache.accumulo.server.conf.TableConfiguration;
import org.apache.accumulo.server.fs.TooManyFilesException;
import org.apache.accumulo.tserver.InMemoryMap;
import org.apache.accumulo.tserver.MemKey;
import org.apache.accumulo.tserver.TabletHostingServer;
import org.apache.accumulo.tserver.TabletServerResourceManager;
import org.apache.accumulo.tserver.metrics.TabletServerScanMetrics;
Expand Down Expand Up @@ -276,8 +277,8 @@ void recordScanTrace(Span span, List<KVEntry> batch, ScanParameters scanParamete
}
}

Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParameters scanParams)
throws IOException {
Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParameters scanParams,
int duplicatesToSkip) throws IOException {

// log.info("In nextBatch..");

Expand All @@ -297,9 +298,33 @@ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParamet
long resultBytes = 0L;

long maxResultsSize = getTableConfiguration().getAsBytes(Property.TABLE_SCAN_MAXMEM);
int duplicateBatchMultiplier =
getTableConfiguration().getCount(Property.TABLE_SCAN_BATCH_DUPLICATE_MAX_MULTIPLIER);
if (duplicateBatchMultiplier < 1) {
duplicateBatchMultiplier = 1;
}
long maxResultsSizeWithDuplicates = maxResultsSize;
long maxEntriesWithDuplicates = scanParams.getMaxEntries();
if (duplicateBatchMultiplier > 1) {
try {
maxResultsSizeWithDuplicates =
Math.multiplyExact(maxResultsSize, (long) duplicateBatchMultiplier);
} catch (ArithmeticException e) {
maxResultsSizeWithDuplicates = Long.MAX_VALUE;
// TODO maybe log that this happened? Deduped somehow?
}
try {
maxEntriesWithDuplicates =
Math.multiplyExact(scanParams.getMaxEntries(), (long) duplicateBatchMultiplier);
} catch (ArithmeticException e) {
maxEntriesWithDuplicates = Long.MAX_VALUE;
// TODO maybe log that this happened? Deduped somehow?
}
}

Key continueKey = null;
boolean skipContinueKey = false;
boolean skipContinueKey = true;
boolean resumeOnSameKey = false;

YieldCallback<Key> yield = new YieldCallback<>();

Expand All @@ -314,32 +339,73 @@ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParamet
iter.seek(range, LocalityGroupUtil.families(scanParams.getColumnSet()), true);
}

skipReturnedDuplicates(iter, duplicatesToSkip, range);

Key rangeStartKey = range.getStartKey();
Key currentKey = null;
boolean resumingOnSameKey =
iter.hasTop() && rangeStartKey != null && rangeStartKey.equals(iter.getTopKey());
int previousDuplicates = resumingOnSameKey ? duplicatesToSkip : 0;
int duplicatesReturnedForCurrentKey = 0;
Key cutKey = null;
boolean cutPending = false;

while (iter.hasTop()) {
if (yield.hasYielded()) {
throw new IOException(
"Coding error: hasTop returned true but has yielded at " + yield.getPositionAndReset());
}
value = iter.getTopValue();
key = iter.getTopKey();
if (cutPending && !key.equals(cutKey)) {
continueKey = copyResumeKey(cutKey);
resumeOnSameKey = true;
skipContinueKey = false;
break;
}
if (!key.equals(currentKey)) {
currentKey = copyResumeKey(key);
if (resumingOnSameKey && key.equals(rangeStartKey)) {
duplicatesReturnedForCurrentKey = previousDuplicates;
} else {
duplicatesReturnedForCurrentKey = 0;
resumingOnSameKey = false;
}
}

KVEntry kvEntry = new KVEntry(key, value); // copies key and value
results.add(kvEntry);
resultSize += kvEntry.estimateMemoryUsed();
resultBytes += kvEntry.numBytes();

duplicatesReturnedForCurrentKey++;

if (cutPending && (resultSize >= maxResultsSizeWithDuplicates
|| results.size() >= maxEntriesWithDuplicates)) {
throw new IllegalStateException("Duplicate key run exceeded scan batch growth limit for "
+ cutKey + ". Increase " + Property.TABLE_SCAN_BATCH_DUPLICATE_MAX_MULTIPLIER.getKey()
+ " or reduce duplicates for this key.");
}

boolean timesUp = batchTimeOut > 0 && (System.nanoTime() - startNanos) >= timeToRun;

if (resultSize >= maxResultsSize || results.size() >= scanParams.getMaxEntries() || timesUp) {
continueKey = new Key(key);
skipContinueKey = true;
break;
if (!cutPending) {
Copy link
Contributor

@keith-turner keith-turner Jan 16, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These changes add a lot of new work per key/value read. Normally that work will be wasted effort and never needed. Instead, we could do a check when closing off a batch to see whether we are in the situation where the keys are the same. Maybe something like the following; I am not sure how this would change the rest of the code, but hopefully this strategy could avoid adding any new work for each key/value.

if (resultSize >= maxResultsSize || results.size() >= scanParams.getMaxEntries() || timesUp) {
   // closing off a batch, need to see if the last key in the batch and the next unread key are the same
   if(iter.hasTop()) {
          iter.next();
          if(iter.hasTop() && results.get(results.size()-1).getKey().equals(iter.getTopKey())) {
            // the last key in the batch and the next key match, so can not terminate batch yet...

            // not sure what this should do... it could start looping here or set a boolean that
            // modifies the outer loop behavior... could also read the config here to determine what to
            // do... in any case do not need to do extra work until now

            // The best thing to do for efficiency is probably to completely handle the case of duplicates here,
            // maybe call a function that reads the duplicate related config and loops.  This would mean
            // the normal loop can spend zero computation on this edge case.  However, not sure if that
            // could be done cleanly w/o introducing duplicate code.  Depends on if the main loop and this
            // new loop could share code maybe.
          }
        }

cutPending = true;
cutKey = currentKey;
} else if (timesUp) {
throw new IllegalStateException("Duplicate key run exceeded scan batch timeout for "
+ cutKey + ". Increase " + Property.TABLE_SCAN_BATCH_DUPLICATE_MAX_MULTIPLIER.getKey()
+ " or batch timeout, or reduce duplicates for this key.");
}
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This if statement used to have a break in it, and it seems like that will still be needed. Once in here, we have finished the batch and need to break out of the loop.

}

iter.next();
}

if (yield.hasYielded()) {
continueKey = new Key(yield.getPositionAndReset());
continueKey = copyResumeKey(yield.getPositionAndReset());
resumeOnSameKey = false;
skipContinueKey = true;
if (!range.contains(continueKey)) {
throw new IOException("Underlying iterator yielded to a position outside of its range: "
Expand All @@ -362,7 +428,9 @@ Batch nextBatch(SortedKeyValueIterator<Key,Value> iter, Range range, ScanParamet
}
}

return new Batch(skipContinueKey, results, continueKey, resultBytes);
int duplicatesToSkipForNextBatch = resumeOnSameKey ? duplicatesReturnedForCurrentKey : 0;
return new Batch(skipContinueKey, results, continueKey, resultBytes,
duplicatesToSkipForNextBatch);
}

private Tablet.LookupResult lookup(SortedKeyValueIterator<Key,Value> mmfi, List<Range> ranges,
Expand Down Expand Up @@ -515,7 +583,8 @@ private void handleTabletClosedDuringScan(List<KVEntry> results, Tablet.LookupRe

/**
 * Records the portion of {@code range} that lies after {@code key} as an unfinished range on the
 * lookup result.
 *
 * <p>
 * Nothing is added when {@code range} has an end key and {@code key} is not strictly before it.
 *
 * @param lookupResult the result whose {@code unfinishedRanges} collection receives the new range
 * @param range the range that was being read
 * @param key the key after which reading should resume
 */
private void addUnfinishedRange(Tablet.LookupResult lookupResult, Range range, Key key) {
  if (range.getEndKey() == null || key.compareTo(range.getEndKey()) < 0) {
    // Copy through copyResumeKey so a MemKey keeps its kvCount instead of being
    // flattened to a plain Key.
    Key copy = copyResumeKey(key);
    // The start key is exclusive: the new range begins just after the copied key.
    Range nlur = new Range(copy, false, range.getEndKey(), range.isEndKeyInclusive());
    lookupResult.unfinishedRanges.add(nlur);
  }
}
Expand All @@ -526,4 +595,30 @@ public synchronized void updateQueryStats(int size, long numBytes) {
this.queryResultBytes.addAndGet(numBytes);
this.server.getScanMetrics().incrementQueryResultBytes(numBytes);
}

/**
 * Makes a copy of {@code key} for use as a resume position.
 *
 * <p>
 * A {@link MemKey} is copied as a {@link MemKey} carrying the same kvCount; any other key is
 * copied as a plain {@link Key}.
 *
 * @param key the key to copy
 * @return a new key instance copied from {@code key}
 */
private Key copyResumeKey(Key key) {
  if (!(key instanceof MemKey)) {
    return new Key(key);
  }
  MemKey source = (MemKey) key;
  return new MemKey(source, source.getKVCount());
}

/**
 * Advances {@code iter} past entries whose key equals the start key of {@code range}, up to
 * {@code duplicatesToSkip} entries.
 *
 * <p>
 * Does nothing when {@code duplicatesToSkip} is not positive, when the range start is exclusive,
 * or when the range has no start key. Stops early once the iterator has no top entry or its top
 * key differs from the start key.
 *
 * @param iter the iterator to advance
 * @param duplicatesToSkip the maximum number of entries to skip
 * @param range the range whose start key identifies the entries to skip
 * @throws IOException if advancing the iterator fails
 */
private void skipReturnedDuplicates(SortedKeyValueIterator<Key,Value> iter, int duplicatesToSkip,
    Range range) throws IOException {
  boolean nothingToSkip = duplicatesToSkip <= 0 || !range.isStartKeyInclusive();
  if (nothingToSkip) {
    return;
  }

  Key resumeKey = range.getStartKey();
  if (resumeKey == null) {
    return;
  }

  // Count down the remaining skips; bail out as soon as the run of matching keys ends.
  for (int remaining = duplicatesToSkip; remaining > 0; remaining--) {
    if (!iter.hasTop() || !iter.getTopKey().equals(resumeKey)) {
      break;
    }
    iter.next();
  }
}
}
Loading