Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,9 @@ public enum ControllerGauge implements AbstractMetrics.Gauge {
// TODO: Unify below subtask metrics into a single metric with status label
NUM_MINION_TASKS_IN_PROGRESS("NumMinionTasksInProgress", true),
NUM_MINION_SUBTASKS_WAITING("NumMinionSubtasksWaiting", true),
MAX_SUBTASK_WAIT_TIME_MS("MaxSubtaskWaitTimeMs", false),
NUM_MINION_SUBTASKS_RUNNING("NumMinionSubtasksRunning", true),
MAX_SUBTASK_RUNNING_TIME_MS("MaxSubtaskRunningTimeMs", false),
NUM_MINION_SUBTASKS_ERROR("NumMinionSubtasksError", true),
NUM_MINION_SUBTASKS_UNKNOWN("NumMinionSubtasksUnknown", true),
NUM_MINION_SUBTASKS_DROPPED("NumMinionSubtasksDropped", true),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,10 +38,7 @@ public enum ControllerTimer implements AbstractMetrics.Timer {
DEEP_STORE_SEGMENT_WRITE_TIME_MS("deepStoreSegmentWriteTimeMs", true),
// Audit logging timers
AUDIT_REQUEST_PROCESSING_TIME("auditRequestProcessingTime", true),
AUDIT_RESPONSE_PROCESSING_TIME("auditResponseProcessingTime", true),
// Log subtask waiting (until not started) and running (until not completed) time
SUBTASK_WAITING_TIME("subtaskWaitingTime", false),
SUBTASK_RUNNING_TIME("subtaskRunningTime", false);
AUDIT_RESPONSE_PROCESSING_TIME("auditResponseProcessingTime", true);


private final String _timerName;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,10 +25,8 @@
import java.util.Map;
import java.util.Properties;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import org.apache.pinot.common.metrics.ControllerGauge;
import org.apache.pinot.common.metrics.ControllerMetrics;
import org.apache.pinot.common.metrics.ControllerTimer;
import org.apache.pinot.controller.ControllerConf;
import org.apache.pinot.controller.LeadControllerManager;
import org.apache.pinot.controller.helix.core.PinotHelixResourceManager;
Expand Down Expand Up @@ -109,6 +107,8 @@ protected final void runTask(Properties periodicTaskProperties) {
for (String taskType : taskTypes) {
TaskCount taskTypeAccumulatedCount = new TaskCount();
Map<String, TaskCount> tableAccumulatedCount = new HashMap<>();
Map<String, Long> tableMaxWaitTimeMs = new HashMap<>();
Map<String, Long> tableMaxRunningTimeMs = new HashMap<>();
try {
// Capture the current execution timestamp for this task type collection cycle
long currentExecutionTimestamp = System.currentTimeMillis();
Expand Down Expand Up @@ -147,12 +147,10 @@ protected final void runTask(Properties periodicTaskProperties) {
}
count.accumulate(taskCount);
taskStatusSummary.getSubtaskWaitingTimes().values().forEach(subtaskWaitingTime -> {
_controllerMetrics.addTimedTableValue(tableNameWithType, ControllerTimer.SUBTASK_WAITING_TIME,
subtaskWaitingTime, TimeUnit.MILLISECONDS);
tableMaxWaitTimeMs.merge(tableNameWithType, subtaskWaitingTime, Math::max);
});
taskStatusSummary.getSubtaskRunningTimes().values().forEach(subtaskRunningTime -> {
_controllerMetrics.addTimedTableValue(tableNameWithType, ControllerTimer.SUBTASK_RUNNING_TIME,
subtaskRunningTime, TimeUnit.MILLISECONDS);
tableMaxRunningTimeMs.merge(tableNameWithType, subtaskRunningTime, Math::max);
});
return count;
});
Expand Down Expand Up @@ -211,6 +209,16 @@ protected final void runTask(Properties periodicTaskProperties) {
ControllerGauge.PERCENT_MINION_SUBTASKS_IN_ERROR, tablePercent);
});

// Emit 0 for tables with no waiting/running subtasks so the gauge (and alert) self-resolves
tableAccumulatedCount.keySet().forEach(tableNameWithType -> {
_controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType,
ControllerGauge.MAX_SUBTASK_WAIT_TIME_MS,
tableMaxWaitTimeMs.getOrDefault(tableNameWithType, 0L));
_controllerMetrics.setOrUpdateTableGauge(tableNameWithType, taskType,
ControllerGauge.MAX_SUBTASK_RUNNING_TIME_MS,
tableMaxRunningTimeMs.getOrDefault(tableNameWithType, 0L));
});

if (_preReportedTables.containsKey(taskType)) {
Set<String> tableNameWithTypeSet = _preReportedTables.get(taskType);
tableNameWithTypeSet.removeAll(tableAccumulatedCount.keySet());
Expand Down Expand Up @@ -286,6 +294,8 @@ private void removeTableTaskTypeMetrics(Set<String> tableNameWithTypeSet, String
ControllerGauge.PERCENT_MINION_SUBTASKS_IN_QUEUE);
_controllerMetrics.removeTableGauge(tableNameWithType, taskType,
ControllerGauge.PERCENT_MINION_SUBTASKS_IN_ERROR);
_controllerMetrics.removeTableGauge(tableNameWithType, taskType, ControllerGauge.MAX_SUBTASK_WAIT_TIME_MS);
_controllerMetrics.removeTableGauge(tableNameWithType, taskType, ControllerGauge.MAX_SUBTASK_RUNNING_TIME_MS);
});
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -174,8 +174,9 @@ public void taskType1WithTwoTablesEmitMetricTwice() {
private void runAndAssertForTaskType1WithTwoTables() {
PinotMetricsRegistry metricsRegistry = _controllerMetrics.getMetricsRegistry();
_taskMetricsEmitter.runTask(null);
// Expected 31 metrics: 29 original + 2 timing metrics (SUBTASK_WAITING_TIME and SUBTASK_RUNNING_TIME)
Assert.assertEquals(metricsRegistry.allMetrics().size(), 31);
// Expected 33 metrics: 29 original + MAX_SUBTASK_WAIT_TIME_MS gauge (x2 tables)
// + MAX_SUBTASK_RUNNING_TIME_MS gauge (x2 tables)
Assert.assertEquals(metricsRegistry.allMetrics().size(), 33);

Assert.assertTrue(metricsRegistry.allMetrics().containsKey(
new YammerMetricName(ControllerMetrics.class, "pinot.controller.onlineMinionInstances")));
Expand Down Expand Up @@ -250,6 +251,26 @@ private void runAndAssertForTaskType1WithTwoTables() {
new YammerMetricName(ControllerMetrics.class,
"pinot.controller.percentMinionSubtasksInError.table2_OFFLINE.taskType1"))
.getMetric()).value(), 50L);

// table1 has a waiting subtask (subtask2, 3000ms); table2 has none (0ms)
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class,
"pinot.controller.maxSubtaskWaitTimeMs.table1_OFFLINE.taskType1"))
.getMetric()).value(), 3000L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class,
"pinot.controller.maxSubtaskWaitTimeMs.table2_OFFLINE.taskType1"))
.getMetric()).value(), 0L);

// table2 has a running subtask (subtask1, 5000ms); table1 has none (0ms)
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class,
"pinot.controller.maxSubtaskRunningTimeMs.table1_OFFLINE.taskType1"))
.getMetric()).value(), 0L);
Assert.assertEquals(((YammerSettableGauge<?>) metricsRegistry.allMetrics().get(
new YammerMetricName(ControllerMetrics.class,
"pinot.controller.maxSubtaskRunningTimeMs.table2_OFFLINE.taskType1"))
.getMetric()).value(), 5000L);
}

@Test
Expand Down Expand Up @@ -281,9 +302,9 @@ private void oneTaskTypeWithOneTable(String taskType, String taskName1, String t

PinotMetricsRegistry metricsRegistry = _controllerMetrics.getMetricsRegistry();
_taskMetricsEmitter.runTask(null);
// Expected at least 21 metrics: 20 original + 1 timing metric (SUBTASK_WAITING_TIME)
// The actual count may vary slightly based on test execution order
Assert.assertTrue(metricsRegistry.allMetrics().size() >= 21);
// Expected at least 22 metrics: 20 original + MAX_SUBTASK_WAIT_TIME_MS gauge (x1 table)
// + MAX_SUBTASK_RUNNING_TIME_MS gauge (x1 table). Count may vary based on test execution order.
Assert.assertTrue(metricsRegistry.allMetrics().size() >= 22);

Assert.assertTrue(metricsRegistry.allMetrics().containsKey(
new YammerMetricName(ControllerMetrics.class, "pinot.controller.onlineMinionInstances")));
Expand Down
Loading