Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.function.Supplier;
import org.apache.calcite.plan.RelTraitSet;
Expand All @@ -43,9 +42,7 @@
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAggregate;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalAsOfJoin;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalJoin;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalProject;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalSort;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalTableScan;
import org.apache.pinot.query.planner.physical.v2.nodes.PhysicalWindow;


Expand Down Expand Up @@ -134,9 +131,11 @@ RelNode assignSort(PhysicalSort sort) {
*/
@VisibleForTesting
RelNode assignJoin(Join join) {
// Case-1: Handle lookup joins.
// Case-1: Lookup joins — no distribution traits needed. LookupJoinRule (post-pass) handles
// fragment isolation by converting the right exchange to LOOKUP_LOCAL_EXCHANGE, ensuring the
// left has an exchange, and wrapping the join with IDENTITY_EXCHANGE above.
if (PinotHintOptions.JoinHintOptions.useLookupJoinStrategy(join)) {
return assignLookupJoin(join);
return join;
}
// Case-2: Handle dynamic filter for semi joins.
JoinInfo joinInfo = join.analyzeCondition();
Expand Down Expand Up @@ -257,28 +256,6 @@ RelNode assignWindow(PhysicalWindow window) {
return window.copy(window.getTraitSet(), List.of(input));
}

/**
 * Assigns a broadcast distribution trait to the right input of a lookup join.
 * <p>
 * Lookup join expects the right input to be exactly a project over a table scan, and is used with
 * Dimension tables only. Given this, the entire right input must be available in all workers
 * selected for the left input, so the broadcast trait is assigned to both the project and the
 * table scan. Worker assignment has to handle this explicitly regardless.
 *
 * @param join the lookup join whose right input gets the broadcast trait
 * @return a copy of the join whose right input carries {@code BROADCAST_DISTRIBUTED}
 * @throws IllegalStateException if the right input is not a project over a table scan
 */
private RelNode assignLookupJoin(Join join) {
  RelNode leftInput = join.getInputs().get(0);
  RelNode rightInput = join.getInputs().get(1);
  // Fixed message: this validates the right input of the lookup join, not of a table scan.
  Preconditions.checkState(rightInput instanceof PhysicalProject,
      "Expected project as right input of lookup join");
  Preconditions.checkState(rightInput.getInput(0) instanceof PhysicalTableScan,
      "Expected table scan under project for right input of lookup join");
  PhysicalProject oldProject = (PhysicalProject) rightInput;
  PhysicalTableScan oldTableScan = (PhysicalTableScan) oldProject.getInput(0);
  // Re-create scan and project with BROADCAST_DISTRIBUTED added to their trait sets; RelNodes are
  // immutable, so copy() is the only way to attach the new trait.
  PhysicalTableScan newTableScan = (PhysicalTableScan) oldTableScan.copy(
      oldTableScan.getTraitSet().plus(RelDistributions.BROADCAST_DISTRIBUTED), Collections.emptyList());
  PhysicalProject newProject = (PhysicalProject) oldProject.copy(
      oldProject.getTraitSet().plus(RelDistributions.BROADCAST_DISTRIBUTED), List.of(newTableScan));
  return join.copy(join.getTraitSet(), List.of(leftInput, newProject));
}

@SuppressWarnings("unused")
private RelNode assignDynamicFilterSemiJoin(PhysicalJoin join) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,13 @@ public enum ExchangeStrategy {
/**
* Records are sent randomly from a given worker in the sender to some worker in the receiver.
*/
RANDOM_EXCHANGE(false);
RANDOM_EXCHANGE(false),
/**
* Pseudo-exchange for lookup join right side. The dim table stays in the same plan fragment as
* the join (no fragment split). Inserted by {@code LookupJoinRule} after worker/exchange assignment;
* handled transparently by {@code PlanFragmentAndMailboxAssignment.processLookupLocalExchange}.
*/
LOOKUP_LOCAL_EXCHANGE(false);

/**
* This is true when the Exchange Strategy is such that it requires a List<Integer> representing the
Expand Down Expand Up @@ -110,6 +116,12 @@ public static RelDistribution getRelDistribution(ExchangeStrategy exchangeStrate
return RelDistributions.ROUND_ROBIN_DISTRIBUTED;
case RANDOM_EXCHANGE:
return RelDistributions.RANDOM_DISTRIBUTED;
case LOOKUP_LOCAL_EXCHANGE:
// LOOKUP_LOCAL is a pseudo-exchange (no fragment split, no mailbox). This mapping is only used
// by the PhysicalExchange constructor to satisfy the Calcite Exchange superclass — it has no
// runtime significance since PlanFragmentAndMailboxAssignment handles LOOKUP_LOCAL transparently.
// Uses RANDOM_DISTRIBUTED as placeholder (Calcite's Exchange constructor rejects ANY).
return RelDistributions.RANDOM_DISTRIBUTED;
default:
throw new IllegalStateException(String.format("Unexpected exchange strategy: %s", exchangeStrategy));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import org.apache.pinot.query.routing.MailboxInfos;
import org.apache.pinot.query.routing.QueryServerInstance;
import org.apache.pinot.query.routing.SharedMailboxInfos;
import org.apache.pinot.spi.config.table.TableType;


/**
Expand Down Expand Up @@ -87,11 +88,15 @@ private void process(PRelNode pRelNode, @Nullable PlanNode parent, int currentFr
processTableScan((PhysicalTableScan) pRelNode.unwrap(), currentFragmentId, context);
}
if (pRelNode.unwrap() instanceof PhysicalExchange) {
PhysicalExchange physicalExchange = (PhysicalExchange) pRelNode.unwrap();
if (physicalExchange.getExchangeStrategy() == ExchangeStrategy.LOOKUP_LOCAL_EXCHANGE) {
processLookupLocalExchange(pRelNode, parent, currentFragmentId, context);
return;
}
// Split an exchange into two fragments: one for the sender and one for the receiver.
// The sender fragment will have a MailboxSendNode and receiver a MailboxReceiveNode.
// It is possible that the receiver fragment doesn't exist yet (e.g. when PhysicalExchange is the root node).
// In that case, we also create it here. If it exists already, we simply re-use it.
PhysicalExchange physicalExchange = (PhysicalExchange) pRelNode.unwrap();
PlanFragment receiverFragment = context._planFragmentMap.get(currentFragmentId);
int senderFragmentId = context._planFragmentMap.size() + (receiverFragment == null ? 1 : 0);
final DataSchema inputFragmentSchema = PRelToPlanNodeConverter.toDataSchema(
Expand Down Expand Up @@ -173,6 +178,77 @@ private void processTableScan(PhysicalTableScan tableScan, int currentFragmentId
}
}

/**
 * Handles LOOKUP_LOCAL_EXCHANGE: a pseudo-exchange that does NOT split fragments; the dim table
 * stays in the join's fragment. Steps:
 * <ol>
 *   <li>Registers the dim table name so the fragment is classified as a leaf stage</li>
 *   <li>Sets fake empty segments per worker (the dim table is accessed via
 *       {@code DimensionTableDataManager} at runtime, not via segment routing)</li>
 *   <li>Converts children to PlanNodes within the same fragment (no MailboxSend/Receive)</li>
 * </ol>
 * This mirrors V1's {@code WorkerManager.assignWorkersToNonRootFragment}, where lookup joins are
 * detected and the dim table is registered with empty segments.
 */
private void processLookupLocalExchange(PRelNode pRelNode, @Nullable PlanNode parent, int currentFragmentId,
    Context context) {
  DispatchablePlanMetadata metadata = context._fragmentMetadataMap.get(currentFragmentId);
  List<PRelNode> inputs = pRelNode.getPRelInputs();
  // Pass 1: register every dim table reachable under this exchange (with fake empty segments)
  // before any plan-node conversion happens.
  for (PRelNode input : inputs) {
    registerDimTableInFragment(input, metadata);
  }
  // Pass 2: convert children to PlanNodes in the SAME fragment, bypassing processTableScan. The
  // right side of a lookup join is always [Project →] TableScan (at most 2 levels deep) — Calcite
  // pushes dim-side filters to post-join.
  for (PRelNode input : inputs) {
    PlanNode converted = PRelToPlanNodeConverter.toPlanNode(input, currentFragmentId);
    for (PRelNode leaf : input.getPRelInputs()) {
      Preconditions.checkState(leaf.getPRelInputs().isEmpty(),
          "LOOKUP_LOCAL_EXCHANGE right side deeper than 2 levels: found children under %s. "
              + "Expected [Project →] TableScan only.", leaf.unwrap().getClass().getSimpleName());
      converted.getInputs().add(PRelToPlanNodeConverter.toPlanNode(leaf, currentFragmentId));
    }
    // At the root there is no parent to attach to; otherwise splice the subtree in place.
    if (parent != null) {
      parent.getInputs().add(converted);
    }
  }
}

/**
 * Recursively finds TableScan nodes under {@code pRelNode} and registers the dim table name in
 * the fragment metadata with fake empty segments per worker, matching V1's
 * {@code WorkerManager.assignWorkersToNonRootFragment} behavior for lookup joins.
 */
private void registerDimTableInFragment(PRelNode pRelNode, DispatchablePlanMetadata fragmentMetadata) {
  if (!(pRelNode.unwrap() instanceof TableScan)) {
    // Not a scan: keep searching in the subtrees.
    for (PRelNode input : pRelNode.getPRelInputs()) {
      registerDimTableInFragment(input, fragmentMetadata);
    }
    return;
  }
  PhysicalTableScan scan = (PhysicalTableScan) pRelNode.unwrap();
  TableScanMetadata scanMetadata = Objects.requireNonNull(scan.getTableScanMetadata(),
      "No metadata in table scan PRelNode");
  String tableName = scanMetadata.getScannedTables().stream().findFirst().orElseThrow();
  fragmentMetadata.addScannedTable(tableName);
  // Fake empty segments per worker so isLeafStageWorker() returns true; the actual dim table data
  // comes from DimensionTableDataManager at runtime. putIfAbsent keeps this idempotent when the
  // method runs more than once for the same fragment.
  Map<Integer, QueryServerInstance> workerMap = fragmentMetadata.getWorkerIdToServerInstanceMap();
  if (workerMap != null) {
    Map<Integer, Map<String, List<String>>> current = fragmentMetadata.getWorkerIdToSegmentsMap();
    Map<Integer, Map<String, List<String>>> segmentsByWorker =
        current == null ? new HashMap<>() : new HashMap<>(current);
    for (Integer workerId : workerMap.keySet()) {
      segmentsByWorker.putIfAbsent(workerId, Map.of(TableType.OFFLINE.name(), List.of()));
    }
    fragmentMetadata.setWorkerIdToSegmentsMap(segmentsByWorker);
  }
  NodeHint nodeHint = NodeHint.fromRelHints(scan.getHints());
  fragmentMetadata.setTableOptions(nodeHint.getHintOptions().get(PinotHintOptions.TABLE_HINT_OPTIONS));
}

private PlanFragment createFragment(int fragmentId, PlanNode planNode, List<PlanFragment> inputFragments,
Context context, List<String> workers) {
// track new plan fragment
Expand Down Expand Up @@ -248,6 +324,9 @@ private void computeMailboxInfos(int senderStageId, int receiverStageId,
}
break;
}
case LOOKUP_LOCAL_EXCHANGE:
throw new IllegalStateException("LOOKUP_LOCAL_EXCHANGE should not reach computeMailboxInfos — "
+ "it must be handled as transparent in process() before fragment splitting");
default:
throw new UnsupportedOperationException("exchange desc not supported yet: " + exchangeDesc);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import org.apache.pinot.query.planner.physical.v2.opt.rules.LeafStageWorkerAssignmentRule;
import org.apache.pinot.query.planner.physical.v2.opt.rules.LiteModeSortInsertRule;
import org.apache.pinot.query.planner.physical.v2.opt.rules.LiteModeWorkerAssignmentRule;
import org.apache.pinot.query.planner.physical.v2.opt.rules.LookupJoinRule;
import org.apache.pinot.query.planner.physical.v2.opt.rules.RootExchangeInsertRule;
import org.apache.pinot.query.planner.physical.v2.opt.rules.SortPushdownRule;
import org.apache.pinot.query.planner.physical.v2.opt.rules.WorkerExchangeAssignmentRule;
Expand All @@ -44,6 +45,7 @@ public static List<PRelNodeTransformer> create(PhysicalPlannerContext context, T
context));
transformers.add(create(new LeafStageAggregateRule(context), RuleExecutors.Type.POST_ORDER, context));
transformers.add(createWorkerAssignmentRule(context));
transformers.add(new LookupJoinRule(context));
transformers.add(create(new AggregatePushdownRule(context), RuleExecutors.Type.POST_ORDER, context));
transformers.add(create(new SortPushdownRule(context), RuleExecutors.Type.POST_ORDER, context));
if (context.isUseLiteMode()) {
Expand Down
Loading
Loading