Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
214 changes: 167 additions & 47 deletions ydb/library/yql/dq/opt/dq_opt_make_join_hypergraph.h
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
#include "dq_opt_join_hypergraph.h"
#include "dq_opt_conflict_rules_collector.h"

#include <library/cpp/disjoint_sets/disjoint_sets.h>
#include <yql/essentials/core/cbo/cbo_optimizer_new.h>
#include <yql/essentials/utils/log/log.h>

Expand Down Expand Up @@ -31,82 +32,100 @@ inline TVector<TString> GetConditionUsedRelationNames(const TVector<TJoinColumn>
return res;
}

inline bool AllJoinsAreInner(const std::shared_ptr<IBaseOptimizerNode>& joinTree) {
if (joinTree->Kind == RelNodeType) { return true; }
auto joinNode = std::static_pointer_cast<TJoinOptimizerNode>(joinTree);
return (joinNode->JoinType == EJoinKind::InnerJoin) && AllJoinsAreInner(joinNode->LeftArg) && AllJoinsAreInner(joinNode->RightArg);
}

template <typename TNodeSet>
typename TJoinHypergraph<TNodeSet>::TEdge MakeHyperedge(
TJoinHypergraph<TNodeSet>& graph,
const std::shared_ptr<TJoinOptimizerNode>& joinNode,
const TNodeSet& conditionUsedRels,
std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet>& subtreeNodes,
const TVector<TJoinColumn>& leftJoinKeys,
const TVector<TJoinColumn>& rightJoinKeys
) {
TVector<TString> relations = GetConditionUsedRelationNames(leftJoinKeys, rightJoinKeys);
TNodeSet SES = graph.GetNodesByRelNames(relations);

auto conflictRulesCollector = TConflictRulesCollector<TNodeSet>(joinNode, subtreeNodes);
auto conflictRules = conflictRulesCollector.CollectConflicts();

TNodeSet TES = ConvertConflictRulesIntoTES(conditionUsedRels, conflictRules);
bool isLHSReorderable = Overlaps(SES, subtreeNodes[joinNode->LeftArg]) && !joinNode->LeftAny;
bool isRHSReorderable = Overlaps(SES, subtreeNodes[joinNode->RightArg]) && !joinNode->RightAny;

/* For CROSS, Non-Reorderable, ANY Joins and degenerate predicates (if subtree tables and joinCondition tables do not intersect) */
if (!Overlaps(TES, subtreeNodes[joinNode->LeftArg]) || !joinNode->IsReorderable || joinNode->LeftAny) {
TES |= subtreeNodes[joinNode->LeftArg];
TES = ConvertConflictRulesIntoTES(TES, conflictRules);
if (!isLHSReorderable || !joinNode->IsReorderable) {
SES |= subtreeNodes[joinNode->LeftArg];
}

if (!Overlaps(TES, subtreeNodes[joinNode->RightArg]) || !joinNode->IsReorderable || joinNode->RightAny) {
TES |= subtreeNodes[joinNode->RightArg];
TES = ConvertConflictRulesIntoTES(TES, conflictRules);
if (!isRHSReorderable || !joinNode->IsReorderable) {
SES |= subtreeNodes[joinNode->RightArg];
}

TNodeSet TES = ConvertConflictRulesIntoTES(SES, conflictRules);

TNodeSet left = TES & subtreeNodes[joinNode->LeftArg];
TNodeSet right = TES & subtreeNodes[joinNode->RightArg];

bool isCommutative = OperatorIsCommutative(joinNode->JoinType) && (joinNode->IsReorderable);
return typename TJoinHypergraph<TNodeSet>::TEdge(left, right, joinNode->JoinType, joinNode->LeftAny, joinNode->RightAny, isCommutative, leftJoinKeys, rightJoinKeys);

typename TJoinHypergraph<TNodeSet>::TEdge edge(
left, right,
joinNode->JoinType,
joinNode->LeftAny, joinNode->RightAny,
isCommutative,
leftJoinKeys, rightJoinKeys
);

return edge;
}

/*
* In this routine we decompose AND condition for equijoin into many edges, instead of one hyperedge.
* We group conditions with same relations into one (for example A.id = B.id, A.z = B.z).
*/
template<typename TNodeSet>
void AddCycle(
template <typename TNodeSet>
void AddHyperedges(
TJoinHypergraph<TNodeSet>& graph,
const std::shared_ptr<TJoinOptimizerNode>& joinNode,
std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet>& subtreeNodes
std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet>& subtreeNodes,
const TVector<TJoinColumn>& leftJoinKeys,
const TVector<TJoinColumn>& rightJoinKeys
) {
auto zip = Zip(joinNode->LeftJoinKeys, joinNode->RightJoinKeys);
if (joinNode->JoinType != EJoinKind::InnerJoin || leftJoinKeys.size() <= 1) {
graph.AddEdge(MakeHyperedge(graph, joinNode, subtreeNodes, leftJoinKeys, rightJoinKeys));
return;
}

auto zip = Zip(leftJoinKeys, rightJoinKeys);

using TJoinCondition = std::pair<TJoinColumn, TJoinColumn>;
std::vector<TJoinCondition> joinConds{zip.begin(), zip.end()};
std::sort(joinConds.begin(), joinConds.end());
std::vector<TJoinCondition> joinConditions{zip.begin(), zip.end()};

std::sort(joinConditions.begin(), joinConditions.end());

auto isOneGroup = [](const TJoinCondition& lhs, const TJoinCondition& rhs) -> bool {
return lhs.first.RelName == rhs.first.RelName && lhs.second.RelName == rhs.second.RelName;
return lhs.first.RelName == rhs.first.RelName
&& lhs.second.RelName == rhs.second.RelName;
};

for (size_t i = 0; i < joinConds.size();) {
size_t groupBegin = i;
TVector<TJoinColumn> curGroupLhsJoinKeys, curGroupRhsJoinKeys;
while (i < joinConds.size() && isOneGroup(joinConds[groupBegin], joinConds[i])) {
curGroupLhsJoinKeys.push_back(joinConds[i].first);
curGroupRhsJoinKeys.push_back(joinConds[i].second);
++i;
for (ui32 i = 0; i < joinConditions.size(); ) {
TVector<TJoinColumn> currentGroupLhsJoinKeys, currentGroupRhsJoinKeys;

ui32 groupBegin = i;
while (i < joinConditions.size() &&
isOneGroup(joinConditions[groupBegin], joinConditions[i])) {

const auto &[lhs, rhs] = joinConditions[i];

currentGroupLhsJoinKeys.push_back(lhs);
currentGroupRhsJoinKeys.push_back(rhs);
++ i;
}

TNodeSet conditionUsedRels{};
conditionUsedRels = graph.GetNodesByRelNames(GetConditionUsedRelationNames(curGroupLhsJoinKeys, curGroupRhsJoinKeys));
graph.AddEdge(MakeHyperedge(joinNode, conditionUsedRels,subtreeNodes, curGroupLhsJoinKeys, curGroupRhsJoinKeys));
graph.AddEdge(
MakeHyperedge(graph, joinNode, subtreeNodes,
currentGroupLhsJoinKeys, currentGroupRhsJoinKeys));
}
}

template<typename TNodeSet>
void MakeJoinHypergraphRec(
TJoinHypergraph<TNodeSet>& graph,
const std::shared_ptr<IBaseOptimizerNode>& joinTree,
std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet>& subtreeNodes
std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet>& subtreeNodes,
TVector<typename TJoinHypergraph<TNodeSet>::TEdge>& crossJoins
) {
if (joinTree->Kind == RelNodeType) {
size_t nodeId = graph.AddNode(joinTree);
Expand All @@ -118,20 +137,113 @@ void MakeJoinHypergraphRec(

auto joinNode = std::static_pointer_cast<TJoinOptimizerNode>(joinTree);

MakeJoinHypergraphRec(graph, joinNode->LeftArg, subtreeNodes);
MakeJoinHypergraphRec(graph, joinNode->RightArg, subtreeNodes);
MakeJoinHypergraphRec(graph, joinNode->LeftArg, subtreeNodes, crossJoins);
MakeJoinHypergraphRec(graph, joinNode->RightArg, subtreeNodes, crossJoins);

subtreeNodes[joinTree] = subtreeNodes[joinNode->LeftArg] | subtreeNodes[joinNode->RightArg];

/* In case of inner equi-innerjoins we create a cycle, not a hyperedge */
if (joinNode->LeftJoinKeys.size() > 1 && AllJoinsAreInner(joinTree)) {
AddCycle(graph, joinNode, subtreeNodes);
if (joinNode->JoinType == EJoinKind::Cross) {
auto edge = MakeHyperedge(
graph, joinNode, subtreeNodes,
joinNode->LeftJoinKeys, joinNode->RightJoinKeys
);

crossJoins.emplace_back(std::move(edge));
return;
}

TNodeSet conditionUsedRels{};
conditionUsedRels = graph.GetNodesByRelNames(GetConditionUsedRelationNames(joinNode->LeftJoinKeys, joinNode->RightJoinKeys));
graph.AddEdge(MakeHyperedge<TNodeSet>(joinNode, conditionUsedRels, subtreeNodes, joinNode->LeftJoinKeys, joinNode->RightJoinKeys));
AddHyperedges<TNodeSet>(graph, joinNode, subtreeNodes, joinNode->LeftJoinKeys, joinNode->RightJoinKeys);
}

template<typename TNodeSet>
void AddCrossJoins(TJoinHypergraph<TNodeSet>& graph,
TVector<typename TJoinHypergraph<TNodeSet>::TEdge>& crossJoins) {

TDisjointSets connectedComponents(graph.GetNodes().size());

// If all nodes in nodeSet lie in the same connected component returns
// canonical element of that component, otherwise -1
auto getCommonComponent = [&connectedComponents](TNodeSet nodeSet) -> i32 {
i32 previousComponent = -1;
for (ui32 i = 0; i < nodeSet.size(); ++ i) {
if (nodeSet[i]) {
i32 component = connectedComponents.CanonicSetElement(i);
if (previousComponent != -1 && previousComponent != component) {
return -1;
}

previousComponent = component;
}
}

return previousComponent;
};

auto applyEdgeIfEnabled = [&](const typename TJoinHypergraph<TNodeSet>::TEdge& edge) {
// For edge to become "active", lhs nodes have to form a connected
// hypersubgraph and rhs nodes also have to form connected hypersubgraph

i32 componentLHS = getCommonComponent(edge.Left);
if (componentLHS == -1) {
return false; // lhs nodes are not connected, can't enable the edge
}

i32 componentRHS = getCommonComponent(edge.Right);
if (componentRHS == -1) {
return false; // rhs nodes are not connected, can't enable the edge
}

if (componentLHS == componentRHS) {
// Enabling the edge wouldn't change anything, both sides are already
// in the same connected component -- nothing changes
return false;
}

connectedComponents.UnionSets(componentLHS, componentRHS);
return true;
};

TVector<typename TJoinHypergraph<TNodeSet>::TEdge> enabledCrossJoins;

std::vector<bool> isEdgeActive(graph.GetEdges().size(), false);
bool hasChanged{};
do {
hasChanged = false;

// Try to re-enable any of the hyperedges that might have become active
for (ui32 i = 0; i < graph.GetEdges().size(); ++ i) {
if (isEdgeActive[i]) {
continue;
}

auto& edge = graph.GetEdge(i);
bool wasEdgeApplied = applyEdgeIfEnabled(edge);

// Mark edge as active not to recheck on every iteration:
isEdgeActive[i] |= wasEdgeApplied;
hasChanged |= wasEdgeApplied;
}

// Check if any of the cross joins reduce number of connected components
for (auto& edge : crossJoins) {
bool wasEdgeApplied = applyEdgeIfEnabled(edge);
if (wasEdgeApplied) {
enabledCrossJoins.push_back(edge);
}

hasChanged |= wasEdgeApplied;
}
} while (hasChanged);


// After all missing cross joins have been added, the graph
// has to become fully connected:
Y_ENSURE(connectedComponents.SetCount() == 1);

// Add all missing cross joins to the hypergraph
for (auto& edge : enabledCrossJoins) {
graph.AddEdge(edge);
}
}

template <typename TNodeSet>
Expand All @@ -142,7 +254,8 @@ TJoinHypergraph<TNodeSet> MakeJoinHypergraph(
) {
TJoinHypergraph<TNodeSet> graph{};
std::unordered_map<std::shared_ptr<IBaseOptimizerNode>, TNodeSet> subtreeNodes{};
MakeJoinHypergraphRec(graph, joinTree, subtreeNodes);
TVector<typename TJoinHypergraph<TNodeSet>::TEdge> crossJoins;
MakeJoinHypergraphRec(graph, joinTree, subtreeNodes, crossJoins);

if (logGraph && NYql::NLog::YqlLogger().NeedToLog(NYql::NLog::EComponent::CoreDq, NYql::NLog::ELevel::TRACE)) {
YQL_CLOG(TRACE, CoreDq) << "Hypergraph build: ";
Expand All @@ -166,6 +279,13 @@ TJoinHypergraph<TNodeSet> MakeJoinHypergraph(
YQL_CLOG(TRACE, CoreDq) << graph.String();
}

AddCrossJoins(graph, crossJoins);

if (logGraph && NYql::NLog::YqlLogger().NeedToLog(NYql::NLog::EComponent::CoreDq, NYql::NLog::ELevel::TRACE)) {
YQL_CLOG(TRACE, CoreDq) << "Hypergraph after adding cross joins: ";
YQL_CLOG(TRACE, CoreDq) << graph.String();
}

return graph;
}

Expand Down
Loading
Loading