Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
* then values in column 'b' must also be identical.
*/
public class DataTrait {

public static int UNIQUE_UNION_LIMIT = 16;
public static final DataTrait EMPTY_TRAIT
= new DataTrait(new UniqueDescription().toImmutable(),
new UniformDescription().toImmutable(), new ImmutableSet.Builder<FdItem>().build(),
Expand Down Expand Up @@ -138,6 +138,20 @@ public boolean isNullSafeEqual(Slot l, Slot r) {
return equalSet.isEqual(l, r);
}

/**
* Get all unique slot sets, including those containing nullable slots.
* Each returned set is a unique key of the relation (no two rows share the same value combination,
* under SQL's "NULL is distinct from NULL" semantics in the UNIQUE-set sense).
*/
public List<Set<Slot>> getAllUniqueSets() {
List<Set<Slot>> res = new ArrayList<>(uniqueSet.uniqueSlots.size() + uniqueSet.combinedUniqueSlotSet.size());
for (Slot slot : uniqueSet.uniqueSlots) {
res.add(ImmutableSet.of(slot));
}
res.addAll(uniqueSet.combinedUniqueSlotSet);
return res;
}

public FuncDeps getAllValidFuncDeps(Set<Slot> validSlots) {
return fdDg.findValidFuncDeps(validSlots);
}
Expand Down Expand Up @@ -268,23 +282,23 @@ public void addDepsByEqualSet(Set<Slot> equalSet) {

/**
* Extends a unique slot using an equivalence set.
* Within slots, if any slot in the equivalence set is unique,
* Within uniqueSlots, if any slot in the equivalence set is unique,
* then all slots in the set are considered unique.
* For slotSets, if there is an intersection with the equivalence set,
* For combinedUniqueSlotSet, if there is an intersection with the equivalence set,
* the slotSet can be substituted with the equivalence set.
* Example:
* Given an equivalence set {a1, a2, a3} and a uniqueSet {a1, b1, c1},
* the sets {a2, b1, c1} and {a3, b1, c1} are also treated as unique.
*/
public void addUniqueByEqualSet(Set<Slot> equalSet) {
if (uniqueSet.isIntersect(equalSet, uniqueSet.slots)) {
uniqueSet.slots.addAll(equalSet);
if (uniqueSet.isIntersect(equalSet, uniqueSet.uniqueSlots)) {
uniqueSet.uniqueSlots.addAll(equalSet);
return;
}
for (Set<Slot> slotSet : uniqueSet.slotSets) {
Set<Slot> intersection = Sets.intersection(equalSet, uniqueSet.slots);
for (Set<Slot> slotSet : uniqueSet.combinedUniqueSlotSet) {
Set<Slot> intersection = Sets.intersection(equalSet, uniqueSet.uniqueSlots);
if (intersection.size() > 2) {
uniqueSet.slotSets.remove(slotSet);
uniqueSet.combinedUniqueSlotSet.remove(slotSet);
slotSet.removeAll(intersection);
for (Slot slot : equalSet) {
ImmutableSet<Slot> uniqueSlotSet
Expand Down Expand Up @@ -330,12 +344,12 @@ public List<Set<Slot>> calEqualSetList() {
*/
public List<Set<Slot>> getAllUniqueAndNotNull() {
List<Set<Slot>> res = new ArrayList<>();
for (Slot slot : uniqueSet.slots) {
for (Slot slot : uniqueSet.uniqueSlots) {
if (!slot.nullable()) {
res.add(ImmutableSet.of(slot));
}
}
for (Set<Slot> slotSet : uniqueSet.slotSets) {
for (Set<Slot> slotSet : uniqueSet.combinedUniqueSlotSet) {
boolean containsNullable = false;
for (Slot slot : slotSet) {
if (slot.nullable()) {
Expand Down Expand Up @@ -380,7 +394,7 @@ public DataTrait build() {

public void pruneSlots(Set<Slot> outputSlots) {
uniformSet.removeNotContain(outputSlots);
uniqueSet.removeNotContain(outputSlots);
uniqueSet.removeNotContain(outputSlots, equalSetBuilder.build());
equalSetBuilder.removeNotContain(outputSlots);
fdDgBuilder.removeNotContain(outputSlots);
}
Expand All @@ -404,81 +418,115 @@ public void replaceEqualSetBy(Map<Slot, Slot> replaceMap) {
public void replaceFuncDepsBy(Map<Slot, Slot> replaceMap) {
fdDgBuilder.replace(replaceMap);
}

/** replace uniqueSet.slotSets slot to root in equalSets */
public void normalizeUniqueSetsToEqualSetRoot() {
ImmutableEqualSet<Slot> equalSet = equalSetBuilder.build();
Set<ImmutableSet<Slot>> newSlotSets = new HashSet<>();
for (ImmutableSet<Slot> uniqueSet : uniqueSet.combinedUniqueSlotSet) {
ImmutableSet.Builder<Slot> roots = ImmutableSet.builder();
for (Slot slot : uniqueSet) {
Slot root = equalSet.getRoot(slot);
if (root != null) {
roots.add(root);
} else {
roots.add(slot);
}
}
newSlotSets.add(roots.build());
}
uniqueSet.combinedUniqueSlotSet = newSlotSets;
}
}

static class UniqueDescription {
Set<Slot> slots;
Set<ImmutableSet<Slot>> slotSets;
Set<Slot> uniqueSlots;
Set<ImmutableSet<Slot>> combinedUniqueSlotSet;

UniqueDescription() {
slots = new HashSet<>();
slotSets = new HashSet<>();
uniqueSlots = new HashSet<>();
combinedUniqueSlotSet = new HashSet<>();
}

UniqueDescription(UniqueDescription o) {
this.slots = new HashSet<>(o.slots);
this.slotSets = new HashSet<>(o.slotSets);
this.uniqueSlots = new HashSet<>(o.uniqueSlots);
this.combinedUniqueSlotSet = new HashSet<>(o.combinedUniqueSlotSet);
}

UniqueDescription(Set<Slot> slots, Set<ImmutableSet<Slot>> slotSets) {
this.slots = slots;
this.slotSets = slotSets;
this.uniqueSlots = slots;
this.combinedUniqueSlotSet = slotSets;
}

public boolean contains(Slot slot) {
return slots.contains(slot);
return uniqueSlots.contains(slot);
}

public boolean contains(Set<Slot> slotSet) {
if (slotSet.size() == 1) {
return slots.contains(slotSet.iterator().next());
return uniqueSlots.contains(slotSet.iterator().next());
}
return slotSets.contains(ImmutableSet.copyOf(slotSet));
return combinedUniqueSlotSet.contains(ImmutableSet.copyOf(slotSet));
}

public boolean containsAnySub(Set<Slot> slotSet) {
return slotSet.stream().anyMatch(s -> slots.contains(s))
|| slotSets.stream().anyMatch(slotSet::containsAll);
return slotSet.stream().anyMatch(s -> uniqueSlots.contains(s))
|| combinedUniqueSlotSet.stream().anyMatch(slotSet::containsAll);
}

public void removeNotContain(Set<Slot> slotSet) {
if (!slotSet.isEmpty()) {
Set<Slot> newSlots = Sets.newLinkedHashSetWithExpectedSize(slots.size());
for (Slot slot : slots) {
if (slotSet.contains(slot)) {
public void removeNotContain(Set<Slot> outputSlots, ImmutableEqualSet<Slot> equalSet) {
if (!outputSlots.isEmpty()) {
Set<Slot> newSlots = Sets.newLinkedHashSetWithExpectedSize(uniqueSlots.size());
for (Slot slot : uniqueSlots) {
if (outputSlots.contains(slot)) {
newSlots.add(slot);
}
}
this.slots = newSlots;

Set<ImmutableSet<Slot>> newSlotSets = Sets.newLinkedHashSetWithExpectedSize(slots.size());
for (ImmutableSet<Slot> set : slotSets) {
if (slotSet.containsAll(set)) {
newSlotSets.add(set);
this.uniqueSlots = newSlots;

Set<ImmutableSet<Slot>> newCombinedUniqueSlotSet = Sets.newHashSetWithExpectedSize(uniqueSlots.size());
for (ImmutableSet<Slot> combinedUniqueSlots : combinedUniqueSlotSet) {
ImmutableSet.Builder<Slot> builder = ImmutableSet.builder();
boolean allCanFindReplacement = true;
for (Slot slot : combinedUniqueSlots) {
if (outputSlots.contains(slot)) {
builder.add(slot);
} else {
Set<Slot> equalSlots = equalSet.calEqualSet(slot);
Set<Slot> replaceSlots = Sets.intersection(outputSlots, equalSlots);
if (!replaceSlots.isEmpty()) {
builder.add(replaceSlots.iterator().next());
} else {
allCanFindReplacement = false;
}
}
}
if (allCanFindReplacement) {
newCombinedUniqueSlotSet.add(builder.build());
}
}
this.slotSets = newSlotSets;
this.combinedUniqueSlotSet = newCombinedUniqueSlotSet;
}
}

public void add(Slot slot) {
slots.add(slot);
uniqueSlots.add(slot);
}

public void add(ImmutableSet<Slot> slotSet) {
if (slotSet.isEmpty()) {
return;
}
if (slotSet.size() == 1) {
slots.add(slotSet.iterator().next());
uniqueSlots.add(slotSet.iterator().next());
return;
}
slotSets.add(slotSet);
combinedUniqueSlotSet.add(slotSet);
}

public void add(UniqueDescription uniqueDescription) {
slots.addAll(uniqueDescription.slots);
slotSets.addAll(uniqueDescription.slotSets);
uniqueSlots.addAll(uniqueDescription.uniqueSlots);
combinedUniqueSlotSet.addAll(uniqueDescription.combinedUniqueSlotSet);
}

public boolean isIntersect(Set<Slot> set1, Set<Slot> set2) {
Expand All @@ -496,26 +544,26 @@ public boolean isIntersect(Set<Slot> set1, Set<Slot> set2) {
}

public boolean isEmpty() {
return slots.isEmpty() && slotSets.isEmpty();
return uniqueSlots.isEmpty() && combinedUniqueSlotSet.isEmpty();
}

@Override
public String toString() {
return "{" + slots + slotSets + "}";
return "{" + uniqueSlots + combinedUniqueSlotSet + "}";
}

public void replace(Map<Slot, Slot> replaceMap) {
slots = slots.stream()
uniqueSlots = uniqueSlots.stream()
.map(s -> replaceMap.getOrDefault(s, s))
.collect(Collectors.toSet());
slotSets = slotSets.stream()
combinedUniqueSlotSet = combinedUniqueSlotSet.stream()
.map(set -> set.stream().map(s -> replaceMap.getOrDefault(s, s))
.collect(ImmutableSet.toImmutableSet()))
.collect(Collectors.toSet());
}

public UniqueDescription toImmutable() {
return new UniqueDescription(ImmutableSet.copyOf(slots), ImmutableSet.copyOf(slotSets));
return new UniqueDescription(ImmutableSet.copyOf(uniqueSlots), ImmutableSet.copyOf(combinedUniqueSlotSet));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,6 +281,8 @@ default DataTrait computeDataTrait() {
computeEqualSet(fdBuilder);
computeFd(fdBuilder);

fdBuilder.normalizeUniqueSetsToEqualSetRoot();

for (Slot slot : getOutput()) {
Set<Slot> o = ImmutableSet.of(slot);
// all slots dependent unique slot
Expand All @@ -298,6 +300,7 @@ default DataTrait computeDataTrait() {
fdBuilder.addUniformByEqualSet(validEqualSet);
fdBuilder.addUniqueByEqualSet(validEqualSet);
}

Set<Slot> output = this.getOutputSet();
for (Plan child : children()) {
if (!output.containsAll(child.getOutputSet())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.apache.doris.nereids.hint.DistributeHint;
import org.apache.doris.nereids.jobs.joinorder.hypergraph.bitmap.LongBitmap;
import org.apache.doris.nereids.memo.GroupExpression;
import org.apache.doris.nereids.properties.DataTrait;
import org.apache.doris.nereids.properties.DataTrait.Builder;
import org.apache.doris.nereids.properties.LogicalProperties;
import org.apache.doris.nereids.rules.exploration.join.JoinReorderContext;
Expand Down Expand Up @@ -635,6 +636,37 @@ public void computeUnique(Builder builder) {
} else if (joinType.isRightSemiOrAntiJoin() || joinType.isAsofRightJoin()) {
builder.addUniqueSlot(right().getLogicalProperties().getTrait());
}

// Union propagation:
// For INNER / CROSS / LEFT_OUTER / RIGHT_OUTER / FULL_OUTER joins, if the left side
// is unique on U_L and the right side is unique on U_R, then the join output is unique
// on U_L ∪ U_R.
// Proof sketch (INNER): two output rows (l_a, r_a) and (l_b, r_b) agreeing on U_L ∪ U_R
// must agree on U_L (so l_a = l_b by L's uniqueness on U_L) and on U_R (so r_a = r_b),
// hence the two rows are identical.
if (joinType.isInnerJoin() || joinType.isCrossJoin()
|| joinType.isLeftOuterJoin() || joinType.isRightOuterJoin()
|| joinType.isFullOuterJoin()) {
List<Set<Slot>> leftUniqueSets =
left().getLogicalProperties().getTrait().getAllUniqueSets();
List<Set<Slot>> rightUniqueSets =
right().getLogicalProperties().getTrait().getAllUniqueSets();
if (!leftUniqueSets.isEmpty() && !rightUniqueSets.isEmpty()) {
int count = 0;
outer:
for (Set<Slot> leftUnique : leftUniqueSets) {
for (Set<Slot> rightUnique : rightUniqueSets) {
if (count >= DataTrait.UNIQUE_UNION_LIMIT) {
break outer;
}
builder.addUniqueSlot(ImmutableSet.<Slot>builder()
.addAll(leftUnique).addAll(rightUnique).build());
count++;
}
}
}
}

// if there is non-equal join conditions, don't propagate unique
if (hashJoinConjuncts.isEmpty()) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public static <T> ImmutableEqualSet<T> empty() {
*/
public static class Builder<T> {
private Map<T, T> parent;
private ImmutableEqualSet<T> built;

Builder(Map<T, T> parent) {
this.parent = parent;
Expand All @@ -65,6 +66,7 @@ public Builder(ImmutableEqualSet<T> equalSet) {
* replace all key value according replace map
*/
public void replace(Map<T, T> replaceMap) {
built = null;
Map<T, T> newMap = new LinkedHashMap<>();
for (Entry<T, T> entry : parent.entrySet()) {
newMap.put(replaceMap.getOrDefault(entry.getKey(), entry.getKey()),
Expand All @@ -78,6 +80,7 @@ public void replace(Map<T, T> replaceMap) {
* @param containSet the set to contain
*/
public void removeNotContain(Set<T> containSet) {
built = null;
List<Set<T>> equalSetList = calEqualSetList();
this.parent.clear();
for (Set<T> equalSet : equalSetList) {
Expand All @@ -98,6 +101,7 @@ public void removeNotContain(Set<T> containSet) {
* Add a equal pair
*/
public void addEqualPair(T a, T b) {
built = null;
if (!parent.containsKey(a)) {
parent.put(a, a);
}
Expand Down Expand Up @@ -136,6 +140,7 @@ public List<Set<T>> calEqualSetList() {
}

public void addEqualSet(ImmutableEqualSet<T> equalSet) {
built = null;
this.parent.putAll(equalSet.root);
}

Expand All @@ -146,15 +151,23 @@ private T findRoot(T a) {
return findRoot(parent.get(a));
}

/** compute if built is null */
public ImmutableEqualSet<T> build() {
ImmutableMap.Builder<T, T> foldMapBuilder = new ImmutableMap.Builder<>();
for (T k : parent.keySet()) {
foldMapBuilder.put(k, findRoot(k));
if (built == null) {
ImmutableMap.Builder<T, T> foldMapBuilder = new ImmutableMap.Builder<>();
for (T k : parent.keySet()) {
foldMapBuilder.put(k, findRoot(k));
}
built = new ImmutableEqualSet<>(foldMapBuilder.build());
}
return new ImmutableEqualSet<>(foldMapBuilder.build());
return built;
}
}

public T getRoot(T node) {
return root.get(node);
}

/**
* Calculate equal set for a except self
*/
Expand Down
Loading
Loading