Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
49 changes: 42 additions & 7 deletions src/java/org/apache/cassandra/db/rows/Row.java
Original file line number Diff line number Diff line change
Expand Up @@ -47,7 +47,9 @@
import org.apache.cassandra.utils.MergeIterator;
import org.apache.cassandra.utils.ObjectSizes;
import org.apache.cassandra.utils.SearchIterator;
import org.apache.cassandra.utils.BulkIterator;
import org.apache.cassandra.utils.btree.BTree;
import org.apache.cassandra.utils.btree.UpdateFunction;
import org.apache.cassandra.utils.memory.Cloner;

/**
Expand Down Expand Up @@ -716,7 +718,10 @@ public static class Merger
private int rowsToMerge;
private int lastRowSet = -1;

private final List<ColumnData> dataBuffer = new ArrayList<>();
private static final ColumnData[] EMPTY_DATA_BUFFER = new ColumnData[0];

private ColumnData[] dataBuffer = EMPTY_DATA_BUFFER;
private int dataBufferSize;
private final ColumnDataReducer columnDataReducer;

public Merger(int size, boolean hasComplex)
Expand All @@ -728,7 +733,8 @@ public Merger(int size, boolean hasComplex)

public void clear()
{
dataBuffer.clear();
Arrays.fill(dataBuffer, 0, dataBufferSize, null);
dataBufferSize = 0;
Arrays.fill(rows, null);
columnDataIterators.clear();
rowsToMerge = 0;
Expand Down Expand Up @@ -778,22 +784,51 @@ public Row merge(DeletionTime activeDeletion)
if (activeDeletion.deletes(rowInfo))
rowInfo = LivenessInfo.EMPTY;

int columnsCountEstimation = 0;
for (Row row : rows)
columnDataIterators.add(row == null ? Collections.emptyIterator() : row.iterator());
{
if (row != null)
{
columnDataIterators.add(row.iterator());
columnsCountEstimation = Math.max(columnsCountEstimation, row.columnCount());
}
else
{
columnDataIterators.add(Collections.emptyIterator());
}
}
// try to estimate and set a potential target capacity
if (dataBuffer.length < columnsCountEstimation)
dataBuffer = Arrays.copyOf(dataBuffer, columnsCountEstimation);

columnDataReducer.setActiveDeletion(activeDeletion);
Iterator<ColumnData> merged = MergeIterator.get(columnDataIterators, ColumnData.comparator, columnDataReducer);
while (merged.hasNext())
{
ColumnData data = merged.next();
if (data != null)
dataBuffer.add(data);
{
ensureDataBufferCapacity();
dataBuffer[dataBufferSize++] = data;
}
}

// Because some data might have been shadowed by the 'activeDeletion', we could have an empty row
return rowInfo.isEmpty() && rowDeletion.isLive() && dataBuffer.isEmpty()
? null
: BTreeRow.create(clustering, rowInfo, rowDeletion, BTree.build(dataBuffer));
if (rowInfo.isEmpty() && rowDeletion.isLive() && dataBufferSize == 0)
return null;

try (BulkIterator<ColumnData> it = BulkIterator.of(dataBuffer))
{
return BTreeRow.create(clustering, rowInfo, rowDeletion,
BTree.build(it, dataBufferSize, UpdateFunction.noOp()));
}
}

private void ensureDataBufferCapacity()
{
if (dataBufferSize == dataBuffer.length)
// increase capacity by 50%, use 4 as a default capacity
dataBuffer = Arrays.copyOf(dataBuffer, Math.max(dataBuffer.length + (dataBuffer.length >> 1), 4));
}

public Clustering<?> mergedClustering()
Expand Down
21 changes: 11 additions & 10 deletions src/java/org/apache/cassandra/utils/MergeIterator.java
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,8 @@ static final class ManyToOne<In,Out> extends MergeIterator<In,Out>
{
protected final Candidate<In>[] heap;

private final Comparator<? super In> comp;

/** Number of non-exhausted iterators. */
int size;

Expand All @@ -174,9 +176,10 @@ public ManyToOne(List<? extends Iterator<In>> iters, Comparator<? super In> comp
this.heap = heap;
size = 0;

this.comp = comp;
for (int i = 0; i < iters.size(); i++)
{
Candidate<In> candidate = new Candidate<>(i, iters.get(i), comp);
Candidate<In> candidate = new Candidate<>(i, iters.get(i));
heap[size++] = candidate;
}
needingAdvance = size;
Expand Down Expand Up @@ -292,7 +295,7 @@ private void replaceAndSink(Candidate<In> candidate, int currIdx)
{
if (!heap[nextIdx].equalParent) // if we were greater then an (or were the) equal parent, we are >= the child
{
int cmp = candidate.compareTo(heap[nextIdx]);
int cmp = candidate.compareTo(heap[nextIdx], comp);
if (cmp <= 0)
{
heap[nextIdx].equalParent = cmp == 0;
Expand All @@ -316,12 +319,12 @@ private void replaceAndSink(Candidate<In> candidate, int currIdx)
if (!heap[nextIdx + 1].equalParent)
{
// pick the smallest of the two children
int siblingCmp = heap[nextIdx + 1].compareTo(heap[nextIdx]);
int siblingCmp = heap[nextIdx + 1].compareTo(heap[nextIdx], comp);
if (siblingCmp < 0)
++nextIdx;

// if we're smaller than this, we are done, and must only restore the heap and equalParent properties
int cmp = candidate.compareTo(heap[nextIdx]);
int cmp = candidate.compareTo(heap[nextIdx], comp);
if (cmp <= 0)
{
if (cmp == 0)
Expand Down Expand Up @@ -362,7 +365,7 @@ private void replaceAndSink(Candidate<In> candidate, int currIdx)
// ... but sometimes we will have one last child to compare against, that has no siblings
if (!heap[nextIdx].equalParent)
{
int cmp = candidate.compareTo(heap[nextIdx]);
int cmp = candidate.compareTo(heap[nextIdx], comp);
if (cmp <= 0)
{
heap[nextIdx].equalParent = cmp == 0;
Expand All @@ -377,19 +380,17 @@ private void replaceAndSink(Candidate<In> candidate, int currIdx)
}

// Holds and is comparable by the head item of an iterator it owns
protected static final class Candidate<In> implements Comparable<Candidate<In>>
protected static final class Candidate<In>
{
private final Iterator<? extends In> iter;
private final Comparator<? super In> comp;
private final int idx;
private In item;
private In lowerBound;
boolean equalParent;

public Candidate(int idx, Iterator<? extends In> iter, Comparator<? super In> comp)
public Candidate(int idx, Iterator<? extends In> iter)
{
this.iter = iter;
this.comp = comp;
this.idx = idx;
this.lowerBound = iter instanceof IteratorWithLowerBound ? ((IteratorWithLowerBound<In>)iter).lowerBound() : null;
}
Expand All @@ -410,7 +411,7 @@ protected Candidate<In> advance()
return this;
}

public int compareTo(Candidate<In> that)
public int compareTo(Candidate<In> that, Comparator<? super In> comp)
{
assert this.item != null && that.item != null;
int ret = comp.compare(this.item, that.item);
Expand Down