Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
34 commits
Select commit Hold shift + click to select a range
396e826
Consume new metrics added in HBase for bytes read from cache vs FS vs…
Sep 10, 2025
5cf01f2
Account for cross RS scans from Uncovered indexes and read repairs
Sep 21, 2025
8b362b8
Add tests for different query plans
Sep 22, 2025
89226cf
Added tests for different query plans
Sep 23, 2025
55644e9
Added tests for different query plans
Sep 23, 2025
d2bbca3
Prepare to create PR
Sep 24, 2025
7c485d1
Prepare to create PR
Sep 24, 2025
4d4f05b
Make it backward compatible
Sep 24, 2025
440fecf
Add phoenix-hbase-compat-2.6.4
Oct 8, 2025
d923638
Merge branch 'master' into new-hbase-metrics
Oct 8, 2025
dee0a5f
Fix 2.6.4 compat module build
Oct 8, 2025
56f80f0
Fix modules list with new phoenix-hbase-compat-2.6.4
Oct 8, 2025
16174de
Slowest scan metrics initial metrics
Oct 15, 2025
58f4f1b
Add flag for noop
Oct 15, 2025
88c2f26
Add iterator Id
Oct 16, 2025
1c61986
Capture top N slowest scan paths and scan metrics by region
Oct 18, 2025
1005785
Integrate scan metrics by region
Oct 18, 2025
b01edfb
Expose toJson()
Oct 23, 2025
395de72
Get a working implementation for top N slowest scans
Oct 28, 2025
31f9332
Commit perf validated changes
Nov 5, 2025
a813351
Adding ITs
Nov 6, 2025
3e8c4ee
Adding ITs
Nov 6, 2025
0cbfcf1
Adding ITs
Nov 7, 2025
5301a38
Adding ITs
Nov 8, 2025
a9baadb
Adding ITs
Nov 9, 2025
40a71ef
Adding ITs for non-Ultra HA
Nov 11, 2025
2c247f9
Add Ultra HA ITs and UTs
Nov 13, 2025
314e846
Merge remote-tracking branch 'apache/master' into slowest-scan-metrics
Nov 13, 2025
065438b
Add paging related IT
Nov 13, 2025
64150af
Merge remote-tracking branch 'apache/master' into new-hbase-metrics
Nov 13, 2025
c5a168f
Merge branch 'new-hbase-metrics' into slowest-scan-metrics
Nov 13, 2025
19de520
Prepare to create PR
Nov 13, 2025
b82f78f
For demo
Dec 4, 2025
903d67c
Merge branch 'master' into slowest-scan-metrics
Dec 26, 2025
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@

import java.sql.SQLException;
import java.text.Format;
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Collections;
import java.util.Deque;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -33,6 +36,9 @@
import org.apache.phoenix.log.QueryLogger;
import org.apache.phoenix.monitoring.OverAllQueryMetrics;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.monitoring.ScanMetricsGroup;
import org.apache.phoenix.monitoring.SlowestScanMetricsQueue;
import org.apache.phoenix.monitoring.TopNTreeMultiMap;
import org.apache.phoenix.parse.SelectStatement;
import org.apache.phoenix.query.QueryConstants;
import org.apache.phoenix.query.QueryServices;
Expand Down Expand Up @@ -91,6 +97,8 @@ public class StatementContext {
private Integer totalSegmentsValue;
private boolean hasRowSizeFunction = false;
private boolean hasRawRowSizeFunction = false;
private final SlowestScanMetricsQueue slowestScanMetricsQueue;
private final int slowestScanMetricsCount;

public StatementContext(PhoenixStatement statement) {
this(statement, new Scan());
Expand All @@ -116,6 +124,8 @@ public StatementContext(StatementContext context) {
this.subqueryResults = context.subqueryResults;
this.readMetricsQueue = context.readMetricsQueue;
this.overAllQueryMetrics = context.overAllQueryMetrics;
this.slowestScanMetricsQueue = context.slowestScanMetricsQueue;
this.slowestScanMetricsCount = context.slowestScanMetricsCount;
this.queryLogger = context.queryLogger;
this.isClientSideUpsertSelect = context.isClientSideUpsertSelect;
this.isUncoveredIndex = context.isUncoveredIndex;
Expand Down Expand Up @@ -180,6 +190,10 @@ public StatementContext(PhoenixStatement statement, ColumnResolver resolver, Bin
this.readMetricsQueue = new ReadMetricQueue(isRequestMetricsEnabled, connection.getLogLevel());
this.overAllQueryMetrics =
new OverAllQueryMetrics(isRequestMetricsEnabled, connection.getLogLevel());
this.slowestScanMetricsCount = connection.getSlowestScanMetricsCount();
this.slowestScanMetricsQueue = slowestScanMetricsCount > 0
? new SlowestScanMetricsQueue()
: SlowestScanMetricsQueue.NOOP_SLOWEST_SCAN_METRICS_QUEUE;
this.retryingPersistentCache = Maps.<Long, Boolean> newHashMap();
this.hasFirstValidResult = new AtomicBoolean(false);
this.subStatementContexts = Sets.newHashSet();
Expand Down Expand Up @@ -475,4 +489,63 @@ public Integer getTotalSegmentsValue() {
/**
 * Sets the total-segments value for this statement context.
 * Semantics of the value are defined by the callers of {@link #getTotalSegmentsValue()};
 * a {@code null} clears it.
 */
public void setTotalSegmentsValue(Integer totalSegmentsValue) {
  this.totalSegmentsValue = totalSegmentsValue;
}

/**
 * Returns the queue that accumulates per-scan metrics for slowest-scan reporting.
 * This is a real queue only when the connection's slowest-scan count is &gt; 0;
 * otherwise it is the shared no-op instance, so callers may add to it unconditionally.
 */
public SlowestScanMetricsQueue getSlowestScanMetricsQueue() {
  return slowestScanMetricsQueue;
}

/**
 * Computes the top-N slowest scan-metric chains for this statement, slowest first.
 *
 * @return up to {@code slowestScanMetricsCount} chains of {@link ScanMetricsGroup},
 *         or an empty list when slowest-scan tracking is disabled (count &lt;= 0)
 */
public List<List<ScanMetricsGroup>> getTopNSlowestScanMetrics() {
  if (slowestScanMetricsCount <= 0) {
    return Collections.emptyList();
  }
  // Descending order on the accumulated millis-between-nexts so the largest
  // totals are retained by the bounded multimap.
  TopNTreeMultiMap<Long, List<ScanMetricsGroup>> topCandidates =
    new TopNTreeMultiMap<>(slowestScanMetricsCount, (a, b) -> b.compareTo(a));
  getSlowestScanMetricsUtil(this, new ArrayDeque<>(), 0L, topCandidates);
  List<List<ScanMetricsGroup>> result = new ArrayList<>(topCandidates.size());
  for (Map.Entry<Long, List<ScanMetricsGroup>> entry : topCandidates.entries()) {
    result.add(entry.getValue());
  }
  return result;
}

/**
 * One DFS step over the statement-context tree: for each {@link ScanMetricsGroup}
 * recorded on {@code node}, extends the current path and running latency sum and
 * recurses into the sub-contexts; when the node recorded no scan metrics, recurses
 * with the path and sum unchanged so deeper contexts are still visited.
 */
private static void getSlowestScanMetricsUtil(StatementContext node,
  Deque<ScanMetricsGroup> currentScanMetricsGroups, long sumOfMillisBetweenNexts,
  TopNTreeMultiMap<Long, List<ScanMetricsGroup>> topNSlowestScanMetricsGroups) {
  Iterator<ScanMetricsGroup> groups = node.getSlowestScanMetricsQueue().getIterator();
  boolean sawAnyGroup = false;
  while (groups.hasNext()) {
    sawAnyGroup = true;
    ScanMetricsGroup group = groups.next();
    // Fold this scan's cumulative time-between-nexts into the running total.
    long extendedSum = sumOfMillisBetweenNexts + group.getSumOfMillisSecBetweenNexts();
    // Push onto the tail of the path deque, recurse, then backtrack.
    currentScanMetricsGroups.addLast(group);
    traverseSlowestScanMetricsTree(node, currentScanMetricsGroups, extendedSum,
      topNSlowestScanMetricsGroups);
    currentScanMetricsGroups.removeLast();
  }
  if (!sawAnyGroup) {
    // No scan metrics on this node: continue the traversal as-is.
    traverseSlowestScanMetricsTree(node, currentScanMetricsGroups, sumOfMillisBetweenNexts,
      topNSlowestScanMetricsGroups);
  }
}

/**
 * Descends into {@code node}'s sub-contexts; at a leaf (no sub-contexts) records the
 * accumulated latency keyed path into the bounded top-N multimap. The path snapshot is
 * supplied lazily so the copy is only made if the multimap actually keeps the entry.
 */
private static void traverseSlowestScanMetricsTree(StatementContext node,
  Deque<ScanMetricsGroup> currentScanMetricsGroups, long sumOfMillisBetweenNexts,
  TopNTreeMultiMap<Long, List<ScanMetricsGroup>> topNSlowestScanMetricsGroups) {
  Set<StatementContext> children = node.getSubStatementContexts();
  if (children == null || children.isEmpty()) {
    // Leaf of the context tree: snapshot the current path under its total latency.
    topNSlowestScanMetricsGroups.put(sumOfMillisBetweenNexts,
      () -> new ArrayList<>(currentScanMetricsGroups));
    return;
  }
  for (StatementContext child : children) {
    getSlowestScanMetricsUtil(child, currentScanMetricsGroups, sumOfMillisBetweenNexts,
      topNSlowestScanMetricsGroups);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import org.apache.phoenix.cache.ServerCacheClient.ServerCache;
import org.apache.phoenix.compile.QueryPlan;
import org.apache.phoenix.hbase.index.util.ImmutableBytesPtr;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.job.JobManager.JobCallable;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
Expand Down Expand Up @@ -115,8 +116,10 @@ protected void submitWork(final List<List<Scan>> nestedScans,
context.getConnection().getQueryServices().getRenewLeaseThresholdMilliSeconds();
for (final ScanLocator scanLocation : scanLocations) {
final Scan scan = scanLocation.getScan();
final ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics,
physicalTableName, scan, context.getConnection().getLogLevel());
PhoenixConnection connection = context.getConnection();
final ScanMetricsHolder scanMetricsHolder =
ScanMetricsHolder.getInstance(readMetrics, physicalTableName, scan,
connection.getLogLevel(), connection.isScanMetricsByRegionEnabled());
final TaskExecutionMetricsHolder taskMetrics =
new TaskExecutionMetricsHolder(readMetrics, physicalTableName);
final TableResultIterator tableResultItr =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,18 +48,22 @@
import java.sql.SQLException;
import java.util.List;
import java.util.Map;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.Result;
import org.apache.hadoop.hbase.client.ResultScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.metrics.ScanMetrics;
import org.apache.hadoop.hbase.client.metrics.ScanMetricsRegionInfo;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.phoenix.compat.hbase.CompatScanMetrics;
import org.apache.phoenix.compile.ExplainPlanAttributes.ExplainPlanAttributesBuilder;
import org.apache.phoenix.compile.StatementContext;
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.jdbc.PhoenixConnection;
import org.apache.phoenix.monitoring.CombinableMetric;
import org.apache.phoenix.monitoring.GlobalClientMetrics;
import org.apache.phoenix.monitoring.ScanMetricsGroup;
import org.apache.phoenix.monitoring.ScanMetricsHolder;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.schema.tuple.ResultTuple;
Expand All @@ -83,6 +87,8 @@ public class ScanningResultIterator implements ResultIterator {

private final boolean isMapReduceContext;
private final long maxQueryEndTime;
private final TableName tableName;
private final boolean isScanMetricsByRegionEnabled;

private long dummyRowCounter = 0;

Expand All @@ -91,8 +97,9 @@ public class ScanningResultIterator implements ResultIterator {

public ScanningResultIterator(ResultScanner scanner, Scan scan,
ScanMetricsHolder scanMetricsHolder, StatementContext context, boolean isMapReduceContext,
long maxQueryEndTime) {
long maxQueryEndTime, TableName tableName) {
this.scanner = scanner;
this.tableName = tableName;
this.scanMetricsHolder = scanMetricsHolder;
this.context = context;
scanMetricsUpdated = false;
Expand All @@ -111,6 +118,7 @@ public ScanningResultIterator(ResultScanner scanner, Scan scan,
ScanningResultPostValidResultCaller.class);
this.scanningResultPostValidResultCaller = ReflectionUtils.newInstance(validResultCallerClazz,
context.getConnection().getQueryServices().getConfiguration());
this.isScanMetricsByRegionEnabled = scan.isScanMetricsByRegionEnabled();
}

@Override
Expand Down Expand Up @@ -203,6 +211,21 @@ private void updateMetrics() {

changeMetric(GLOBAL_PAGED_ROWS_COUNTER, dummyRowCounter);

PhoenixConnection connection = context.getConnection();
int slowestScanMetricsCount = connection.getSlowestScanMetricsCount();
if (slowestScanMetricsCount > 0) {
ScanMetricsGroup scanMetricsGroup;
if (isScanMetricsByRegionEnabled) {
Map<ScanMetricsRegionInfo, Map<String, Long>> scanMetricsByRegion =
scanMetrics.collectMetricsByRegion();
scanMetricsGroup =
new ScanMetricsGroup(tableName.getNameAsString(), scanMetricsByRegion, scanMetricsMap);
} else {
scanMetricsGroup = new ScanMetricsGroup(tableName.getNameAsString(), scanMetricsMap);
}
context.getSlowestScanMetricsQueue().add(scanMetricsGroup);
}

scanMetricsUpdated = true;
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,8 +185,9 @@ private PeekingResultIterator nextIterator() throws SQLException {
currentScan.setAttribute(BaseScannerRegionObserverConstants.SCAN_OFFSET,
PInteger.INSTANCE.toBytes(remainingOffset));
}
PhoenixConnection connection = context.getConnection();
ScanMetricsHolder scanMetricsHolder = ScanMetricsHolder.getInstance(readMetrics, tableName,
currentScan, context.getConnection().getLogLevel());
currentScan, connection.getLogLevel(), connection.isScanMetricsByRegionEnabled());
TableResultIterator itr = new TableResultIterator(mutationState, currentScan,
scanMetricsHolder, renewLeaseThreshold, plan, scanGrouper, caches, maxQueryEndTime);
PeekingResultIterator peekingItr =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import javax.annotation.concurrent.GuardedBy;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.AbstractClientScanner;
import org.apache.hadoop.hbase.client.Scan;
import org.apache.hadoop.hbase.client.Table;
Expand Down Expand Up @@ -319,8 +320,9 @@ public void initScanner() throws SQLException {
if (ScanUtil.isReversed(scan)) {
ScanUtil.setupReverseScan(scan);
}
TableName tableName = htable.getName();
this.scanIterator = new ScanningResultIterator(htable.getScanner(scan), scan,
scanMetricsHolder, plan.getContext(), isMapReduceContext, maxQueryEndTime);
scanMetricsHolder, plan.getContext(), isMapReduceContext, maxQueryEndTime, tableName);
} catch (IOException e) {
Closeables.closeQuietly(htable);
throw ClientUtil.parseServerException(e);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@ public UnionResultIterators(List<QueryPlan> plans, StatementContext parentStmtCt
readMetricsList = Lists.newArrayListWithCapacity(nPlans);
overAllQueryMetricsList = Lists.newArrayListWithCapacity(nPlans);
for (QueryPlan plan : plans) {
parentStmtCtx.addSubStatementContext(plan.getContext());
readMetricsList.add(plan.getContext().getReadMetricsQueue());
overAllQueryMetricsList.add(plan.getContext().getOverallQueryMetrics());
iterators.add(LookAheadResultIterator.wrap(plan.iterator()));
Expand Down Expand Up @@ -102,11 +103,12 @@ public void close() throws SQLException {
/**
 * Rolls each child plan's metrics up into the parent statement context.
 * Overall metrics are reset after combining so a later invocation does not
 * double-count the same child metrics.
 */
private void setMetricsInParentContext() {
  ReadMetricQueue parentReadMetrics = parentStmtCtx.getReadMetricsQueue();
  // NOTE(review): the boolean argument's meaning is defined in ReadMetricQueue — verify there.
  readMetricsList.forEach(child -> parentReadMetrics.combineReadMetrics(child, true));
  OverAllQueryMetrics parentOverallMetrics = parentStmtCtx.getOverallQueryMetrics();
  for (OverAllQueryMetrics child : overAllQueryMetricsList) {
    parentOverallMetrics.combine(child);
    child.reset();
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -33,6 +34,7 @@
import org.apache.phoenix.exception.SQLExceptionCode;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.monitoring.ScanMetricsGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -240,6 +242,17 @@ public Map<String, Map<MetricType, Long>> getReadMetrics() {
return metrics;
}

/**
 * Returns the top-N slowest scan metrics from the underlying monitored result set,
 * or an empty list when no result set is present.
 */
@Override
public List<List<ScanMetricsGroup>> getTopNSlowestScanMetrics() {
  return rs == null
    ? Collections.emptyList()
    : ((PhoenixMonitoredResultSet) rs).getTopNSlowestScanMetrics();
}

@Override
public Map<MetricType, Long> getOverAllRequestReadMetrics() {
Map<MetricType, Long> metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@

import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
Expand All @@ -29,6 +30,7 @@
import java.util.function.Function;
import org.apache.phoenix.exception.SQLExceptionInfo;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.monitoring.ScanMetricsGroup;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -133,6 +135,17 @@ public Map<String, Map<MetricType, Long>> getReadMetrics() {
return metrics;
}

/**
 * Returns the top-N slowest scan metrics from the underlying monitored result set,
 * or an empty list when no result set is present.
 */
@Override
public List<List<ScanMetricsGroup>> getTopNSlowestScanMetrics() {
  return rs == null
    ? Collections.emptyList()
    : ((PhoenixMonitoredResultSet) rs).getTopNSlowestScanMetrics();
}

@Override
public Map<MetricType, Long> getOverAllRequestReadMetrics() {
Map<MetricType, Long> metrics;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,8 @@ public class PhoenixConnection
private String sourceOfOperation;
private volatile SQLException reasonForClose;
private static final String[] CONNECTION_PROPERTIES;
private final int slowestScanMetricsCount;
private final boolean isScanMetricsByRegionEnabled;

private final ConcurrentLinkedQueue<PhoenixConnection> childConnections =
new ConcurrentLinkedQueue<>();
Expand Down Expand Up @@ -320,6 +322,10 @@ public ReadOnlyProps getProps() {
this.mutateBatchSize = JDBCUtil.getMutateBatchSize(url, this.info, this.services.getProps());
this.mutateBatchSizeBytes =
JDBCUtil.getMutateBatchSizeBytes(url, this.info, this.services.getProps());
this.slowestScanMetricsCount =
JDBCUtil.getSlowestScanMetricsCount(url, this.info, this.services.getProps());
this.isScanMetricsByRegionEnabled =
JDBCUtil.isScanMetricsByRegionEnabled(url, this.info, this.services.getProps());
datePattern =
this.services.getProps().get(QueryServices.DATE_FORMAT_ATTRIB, DateUtil.DEFAULT_DATE_FORMAT);
timePattern =
Expand Down Expand Up @@ -1489,4 +1495,12 @@ public ConnectionActivityLogger getActivityLogger() {
public void setActivityLogger(ConnectionActivityLogger connectionActivityLogger) {
this.connectionActivityLogger = connectionActivityLogger;
}

/**
 * Returns the number of slowest scans to track for this connection, resolved from
 * connection properties at construction time. Values &lt;= 0 disable slowest-scan
 * metrics collection.
 */
public int getSlowestScanMetricsCount() {
  return slowestScanMetricsCount;
}

/**
 * Returns whether per-region scan metrics should be collected. The flag is gated on
 * slowest-scan tracking being enabled (count &gt; 0), since region-level metrics are only
 * consumed by the slowest-scan reporting path.
 */
public boolean isScanMetricsByRegionEnabled() {
  return slowestScanMetricsCount > 0 && isScanMetricsByRegionEnabled;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,17 @@
package org.apache.phoenix.jdbc;

import java.sql.ResultSet;
import java.util.List;
import java.util.Map;
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.monitoring.ScanMetricsGroup;

public interface PhoenixMonitoredResultSet extends ResultSet {

Map<String, Map<MetricType, Long>> getReadMetrics();

List<List<ScanMetricsGroup>> getTopNSlowestScanMetrics();

Map<MetricType, Long> getOverAllRequestReadMetrics();

void resetMetrics();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,7 @@
import org.apache.phoenix.monitoring.MetricType;
import org.apache.phoenix.monitoring.OverAllQueryMetrics;
import org.apache.phoenix.monitoring.ReadMetricQueue;
import org.apache.phoenix.monitoring.ScanMetricsGroup;
import org.apache.phoenix.monitoring.TableMetricsManager;
import org.apache.phoenix.schema.PColumn;
import org.apache.phoenix.schema.PColumnImpl;
Expand Down Expand Up @@ -1496,6 +1497,11 @@ public Map<String, Map<MetricType, Long>> getReadMetrics() {
return one.aggregate();
}

/** {@inheritDoc} Delegates to the statement context that accumulated the scan metrics. */
@Override
public List<List<ScanMetricsGroup>> getTopNSlowestScanMetrics() {
  return context.getTopNSlowestScanMetrics();
}

@Override
public Map<MetricType, Long> getOverAllRequestReadMetrics() {
return overAllQueryMetrics.publish();
Expand Down
Loading