Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 2 additions & 3 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
/buildSrc/build
/buildSrc/subprojects/*/build
/site/.jekyll-cache
**/generated/

# VSCode Java plugin
/example/*/bin
Expand All @@ -33,9 +34,7 @@
/.vscode/*

# IDEA
/out
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why change .gitignore file?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

When I run with intellij, git detected a ton of extra files.

/*/out/
/example/*/out
**/out/
# The star is required for further !/.idea/ to work, see https://git-scm.com/docs/gitignore
/.idea/*
# Icon for JetBrains Toolbox
Expand Down
16 changes: 16 additions & 0 deletions core/src/main/java/org/apache/calcite/plan/RelOptUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -303,6 +303,22 @@ public static Set<CorrelationId> getVariablesUsed(RelNode rel) {
return visitor.vuv.variables;
}

/**
* Returns the set of variables used by an expression or its
* descendants.
*
* <p>The set may contain "duplicates" (variables with different ids that,
* when resolved, will reference the same source relational expression).
*
* <p>The item type is the same as
* {@link org.apache.calcite.rex.RexCorrelVariable#id}.
*/
public static Set<CorrelationId> getVariablesUsed(RexNode node) {
CorrelationCollector visitor = new CorrelationCollector();
node.accept(visitor.vuv);
return visitor.vuv.variables;
}

/**
* Returns the set of variables used by the given list of sub-queries and its descendants.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,18 @@ public static RelNode go(RexBuilder builder, CorrelationId canonicalId,
ImmutableSet.copyOf(alternateIds)));
}

/**
* Rewrites an expression, replacing alternate correlation variables
* with a canonical correlation variable.
*/
public static RexNode go(RexBuilder builder, CorrelationId canonicalId,
Iterable<? extends CorrelationId> alternateIds, RexNode r) {
DeduplicateCorrelateVariables shuttle =
new DeduplicateCorrelateVariables(builder, canonicalId,
ImmutableSet.copyOf(alternateIds));
return r.accept(shuttle.dedupRex);
}

@Override public RelNode visit(RelNode other) {
RelNode next = super.visit(other);
return next.accept(dedupRex);
Expand Down
233 changes: 168 additions & 65 deletions core/src/main/java/org/apache/calcite/sql2rel/SqlToRelConverter.java
Original file line number Diff line number Diff line change
Expand Up @@ -217,6 +217,7 @@
import java.util.TreeSet;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
import java.util.function.Consumer;
import java.util.function.Supplier;
import java.util.function.UnaryOperator;
import java.util.stream.Collectors;
Expand Down Expand Up @@ -3261,12 +3262,12 @@ protected RelNode createAsofJoin(
return node;
}

private @Nullable CorrelationUse getCorrelationUse(Blackboard bb, final RelNode r0) {
final Set<CorrelationId> correlatedVariables =
RelOptUtil.getVariablesUsed(r0);
if (correlatedVariables.isEmpty()) {
return null;
}
/** Common utility for resolving correlation names, required columns and field mappings used by
* both {@link #massageExpressionsForCorrelation} and {@link #getCorrelationUse}.
* returns null if no correlation names are detected for this scope.
*/
private @Nullable ResolvedCorrelationInfo getCorrelationInfo(Blackboard bb,
final Set<CorrelationId> correlatedVariables) {
final ImmutableBitSet.Builder requiredColumns = ImmutableBitSet.builder();
final List<CorrelationId> correlNames = new ArrayList<>();
// Mapping from (correlId, originalFieldIndex) to projectedFieldIndex for aggregation
Expand Down Expand Up @@ -3350,29 +3351,92 @@ protected RelNode createAsofJoin(
if (correlNames.isEmpty()) {
// None of the correlating variables originated in this scope.
return null;
} else {
return new ResolvedCorrelationInfo(correlNames, requiredColumns.build(), fieldMapping);
}
}

private @Nullable CorrelationUse getCorrelationUse(Blackboard bb, final RelNode r0) {
final Set<CorrelationId> correlatedVariables =
RelOptUtil.getVariablesUsed(r0);
if (correlatedVariables.isEmpty()) {
return null;
}

ResolvedCorrelationInfo correlationInfo = getCorrelationInfo(bb, correlatedVariables);
if (correlationInfo == null) {
// None of the correlating variables originated in this scope.
return null;
}

RelNode r = r0;
if (correlNames.size() > 1) {
if (correlationInfo.correlNames.size() > 1) {
// The same table was referenced more than once.
// So we deduplicate.
r =
DeduplicateCorrelateVariables.go(rexBuilder, correlNames.get(0),
Util.skip(correlNames), r0);
DeduplicateCorrelateVariables.go(rexBuilder, correlationInfo.correlNames.get(0),
Util.skip(correlationInfo.correlNames), r0);
// Add new node to leaves.
leaves.put(r, r.getRowType().getFieldCount());
}

// If there are field mappings (due to aggregation), rewrite the RelNode tree
// to update correlation variable row type and field indices
if (!fieldMapping.isEmpty()) {
if (!correlationInfo.fieldMapping.isEmpty()) {
r =
r.accept(
new CorrelationFieldMappingShuttle(rexBuilder, correlNames.get(0),
bb.root().getRowType(), fieldMapping));
new CorrelationFieldMappingShuttle(rexBuilder, correlationInfo.correlNames.get(0),
bb.root().getRowType(), correlationInfo.fieldMapping));
}

return new CorrelationUse(correlNames.get(0), requiredColumns.build(), r);
return new CorrelationUse(correlationInfo.correlNames.get(0), correlationInfo.requiredColumns,
r);
}

/** Before building a RelNode tree with the provided expressions, detect and resolve correlation
* names, rewrite expressions, and return a callback to be called after the tree is built.
* Returns null if no correlation is detected.
*/
private @Nullable MassagedCorrelationExpressions massageExpressionsForCorrelation(Blackboard bb,
final List<RexNode> exprs) {
Set<CorrelationId> correlatedVariables = new HashSet<>();
for (RexNode e : exprs) {
correlatedVariables.addAll(RelOptUtil.getVariablesUsed(e));
}
if (correlatedVariables.isEmpty()) {
return null;
}

ResolvedCorrelationInfo correlationInfo = getCorrelationInfo(bb, correlatedVariables);
if (correlationInfo == null) {
// None of the correlating variables originated in this scope.
return null;
}

List<RexNode> newExprs = new ArrayList<>(exprs);
Consumer<RelNode> callback = (RelNode r) -> { };
if (correlationInfo.correlNames.size() > 1) {
// The same table was referenced more than once.
// So we deduplicate.
CorrelationId canonicalId = correlationInfo.correlNames.get(0);
List<CorrelationId> tail = Util.skip(correlationInfo.correlNames);
newExprs.replaceAll(e ->
DeduplicateCorrelateVariables.go(rexBuilder, canonicalId, tail, e));
// Add new node to leaves.
callback = r -> leaves.put(r, r.getRowType().getFieldCount());
}

// If there are field mappings (due to aggregation), rewrite the RelNode tree
// to update correlation variable row type and field indices
if (!correlationInfo.fieldMapping.isEmpty()) {
CorrelationFieldMappingRexShuttle shuttle =
new CorrelationFieldMappingRexShuttle(rexBuilder, correlationInfo.correlNames.get(0),
bb.root().getRowType(), correlationInfo.fieldMapping);
newExprs.replaceAll(e -> e.accept(shuttle));
}

return new MassagedCorrelationExpressions(newExprs, correlationInfo.correlNames.get(0),
callback);
}

/**
Expand Down Expand Up @@ -3773,26 +3837,22 @@ private void createAggImpl(Blackboard bb,

final RelNode inputRel = bb.root();

// Project the expressions required by agg and having.
RelNode intermediateProject = relBuilder.push(inputRel)
.projectNamed(preExprs.leftList(), preExprs.rightList(), false)
.build();
final RelNode r2;
// deal with correlation
final CorrelationUse p = getCorrelationUse(bb, intermediateProject);
if (p != null) {
assert p.r instanceof Project;
// correlation variables have been normalized in p.r, we should use expressions
// in p.r instead of the original exprs
Project project1 = (Project) p.r;
r2 = relBuilder.push(bb.root())
.projectNamed(project1.getProjects(), project1.getRowType().getFieldNames(),
true, ImmutableSet.of(p.id))
.build();
final @Nullable MassagedCorrelationExpressions massagedResult =
massageExpressionsForCorrelation(bb, preExprs.leftList());
relBuilder.push(inputRel);
if (massagedResult == null) {
relBuilder.projectNamed(preExprs.leftList(), preExprs.rightList(), false);
} else {
r2 = intermediateProject;
relBuilder.projectNamed(massagedResult.exprs, preExprs.rightList(), false,
ImmutableSet.of(massagedResult.correlId));
}

RelNode project = relBuilder.build();
if (massagedResult != null) {
massagedResult.callback.accept(project);
}
bb.setRoot(r2, false);

bb.setRoot(project, false);
bb.mapRootRelToFieldProjection.put(bb.root(), r.groupExprProjection);

// REVIEW jvs 31-Oct-2007: doesn't the declaration of
Expand Down Expand Up @@ -5006,27 +5066,22 @@ private void convertNonAggregateSelectList(
SqlValidatorUtil.uniquify(fieldNames,
catalogReader.nameMatcher().isCaseSensitive());

relBuilder.push(bb.root())
.projectNamed(exprs, uniqueFieldNames, true);
final @Nullable MassagedCorrelationExpressions massagedResult =
massageExpressionsForCorrelation(bb, exprs);
relBuilder.push(bb.root());
if (massagedResult == null) {
relBuilder.projectNamed(exprs, uniqueFieldNames, true);
} else {
relBuilder.projectNamed(massagedResult.exprs, uniqueFieldNames, true,
ImmutableSet.of(massagedResult.correlId));
}

RelNode project = relBuilder.build();

final RelNode r;
final CorrelationUse p = getCorrelationUse(bb, project);
if (p != null) {
assert p.r instanceof Project;
// correlation variables have been normalized in p.r, we should use expressions
// in p.r instead of the original exprs
Project project1 = (Project) p.r;
r = relBuilder.push(bb.root())
.projectNamed(project1.getProjects(), uniqueFieldNames, true,
ImmutableSet.of(p.id))
.build();
} else {
r = project;
if (massagedResult != null) {
massagedResult.callback.accept(project);
}

bb.setRoot(r, false);
bb.setRoot(project, false);

assert bb.columnMonotonicities.isEmpty();
bb.columnMonotonicities.addAll(columnMonotonicityList);
Expand Down Expand Up @@ -6179,13 +6234,13 @@ RexFieldAccess getFieldAccess(CorrelationId name) {
* Shuttle that rewrites correlation field accesses to use projected field indices
* when correlation references aggregated relations.
*/
private static class CorrelationFieldMappingShuttle extends RelHomogeneousShuttle {
private static class CorrelationFieldMappingRexShuttle extends RexShuttle {
private final RexBuilder rexBuilder;
private final CorrelationId targetCorrelId;
private final RelDataType newCorrelRowType;
private final Map<Pair<CorrelationId, Integer>, Integer> fieldMapping;

CorrelationFieldMappingShuttle(RexBuilder rexBuilder,
CorrelationFieldMappingRexShuttle(RexBuilder rexBuilder,
CorrelationId targetCorrelId,
RelDataType newCorrelRowType,
Map<Pair<CorrelationId, Integer>, Integer> fieldMapping) {
Expand All @@ -6195,23 +6250,40 @@ private static class CorrelationFieldMappingShuttle extends RelHomogeneousShuttl
this.fieldMapping = fieldMapping;
}

@Override public RelNode visit(RelNode other) {
return super.visit(other).accept(new RexShuttle() {
@Override public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
if (fieldAccess.getReferenceExpr() instanceof RexCorrelVariable) {
RexCorrelVariable correlVar = (RexCorrelVariable) fieldAccess.getReferenceExpr();
if (correlVar.id.equals(targetCorrelId)) {
Integer newIndex =
fieldMapping.get(Pair.of(correlVar.id, fieldAccess.getField().getIndex()));
if (newIndex != null) {
return rexBuilder.makeFieldAccess(
rexBuilder.makeCorrel(newCorrelRowType, correlVar.id), newIndex);
}
}
@Override public RexNode visitFieldAccess(RexFieldAccess fieldAccess) {
if (fieldAccess.getReferenceExpr() instanceof RexCorrelVariable) {
RexCorrelVariable correlVar = (RexCorrelVariable) fieldAccess.getReferenceExpr();
if (correlVar.id.equals(targetCorrelId)) {
Integer newIndex =
fieldMapping.get(Pair.of(correlVar.id, fieldAccess.getField().getIndex()));
if (newIndex != null) {
return rexBuilder.makeFieldAccess(
rexBuilder.makeCorrel(newCorrelRowType, correlVar.id), newIndex);
}
return super.visitFieldAccess(fieldAccess);
}
});
}
return super.visitFieldAccess(fieldAccess);
}
}

/**
* Shuttle that rewrites correlation field accesses to use projected field indices
* when correlation references aggregated relations.
*/
private static class CorrelationFieldMappingShuttle extends RelHomogeneousShuttle {
private final CorrelationFieldMappingRexShuttle rexShuttle;

CorrelationFieldMappingShuttle(RexBuilder rexBuilder,
CorrelationId targetCorrelId,
RelDataType newCorrelRowType,
Map<Pair<CorrelationId, Integer>, Integer> fieldMapping) {
this.rexShuttle =
new CorrelationFieldMappingRexShuttle(rexBuilder, targetCorrelId, newCorrelRowType,
fieldMapping);
}

@Override public RelNode visit(RelNode other) {
return super.visit(other).accept(this.rexShuttle);
}
}

Expand Down Expand Up @@ -6581,6 +6653,37 @@ private static class CorrelationUse {
}
}

/** Wrapper around information collected by {@link #getCorrelationInfo}. */
private static class ResolvedCorrelationInfo {
public final List<CorrelationId> correlNames;
public final ImmutableBitSet requiredColumns;
public final Map<Pair<CorrelationId, Integer>, Integer> fieldMapping;

ResolvedCorrelationInfo(List<CorrelationId> correlNames, ImmutableBitSet requiredColumns,
Map<Pair<CorrelationId, Integer>, Integer> fieldMapping) {
this.correlNames = correlNames;
this.requiredColumns = requiredColumns;
this.fieldMapping = fieldMapping;
}
}

/** Wrapper around optionally returned results from {@link #massageExpressionsForCorrelation}. */
private static class MassagedCorrelationExpressions {
/** Modified expressions. */
public final List<RexNode> exprs;
/** CorrelationId to use when building the relational expression. */
public final CorrelationId correlId;
/** Callback to be called after the RelNode tree which uses these expressions is built. */
public final Consumer<RelNode> callback;

MassagedCorrelationExpressions(List<RexNode> exprs, CorrelationId correlId,
Consumer<RelNode> callback) {
this.exprs = exprs;
this.correlId = correlId;
this.callback = callback;
}
}

/** Returns a default {@link Config}. */
public static Config config() {
return CONFIG;
Expand Down
Loading
Loading