Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

## master / unreleased

* [ENHANCEMENT] Query Scheduler: Add `cortex_query_scheduler_tracked_requests` metric to track the current number of requests held by the scheduler. #7355

## 1.21.0 in progress

* [CHANGE] Ruler: Graduate Ruler API from experimental. #7312
Expand Down
27 changes: 20 additions & 7 deletions pkg/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ type Scheduler struct {

pendingRequestsMu sync.Mutex

pendingRequests map[requestKey]*schedulerRequest // Request is kept in this map even after being dispatched to querier. It can still be canceled at that time.
trackedRequests map[requestKey]*schedulerRequest // Request is kept in this map even after being dispatched to querier. It can still be canceled at that time.

// Subservices manager.
subservices *services.Manager
Expand All @@ -72,6 +72,7 @@ type Scheduler struct {
connectedQuerierClients prometheus.GaugeFunc
connectedFrontendClients prometheus.GaugeFunc
queueDuration prometheus.Histogram
trackedRequestsLength prometheus.GaugeFunc

// Enables or disables distributed query execution functionality
distributedExecEnabled bool
Expand Down Expand Up @@ -120,7 +121,7 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
log: log,
limits: limits,

pendingRequests: map[requestKey]*schedulerRequest{},
trackedRequests: map[requestKey]*schedulerRequest{},
connectedFrontends: map[string]*connectedFrontend{},

fragmentTable: fragment_table.NewFragmentTable(2 * time.Minute),
Expand Down Expand Up @@ -155,6 +156,11 @@ func NewScheduler(cfg Config, limits Limits, log log.Logger, registerer promethe
Help: "Number of query-frontend worker clients currently connected to the query-scheduler.",
}, s.getConnectedFrontendClientsMetric)

s.trackedRequestsLength = promauto.With(registerer).NewGaugeFunc(prometheus.GaugeOpts{
Name: "cortex_query_scheduler_tracked_requests",
Help: "Number of requests currently tracked by the scheduler.",
}, s.getPendingRequestsMetric)

s.activeUsers = users.NewActiveUsersCleanupWithDefaultValues(s.cleanupMetricsForInactiveUser)

var err error
Expand Down Expand Up @@ -434,7 +440,7 @@ func (s *Scheduler) enqueueRequest(frontendContext context.Context, frontendAddr

queryKey := queryKey{frontendAddr: frontendAddr, queryID: msg.QueryID}
s.queryFragmentRegistry[queryKey] = append(s.queryFragmentRegistry[queryKey], req.fragment.FragmentID)
s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: req.fragment.FragmentID}] = req
s.trackedRequests[requestKey{queryKey: queryKey, fragmentID: req.fragment.FragmentID}] = req
})
}

Expand All @@ -449,19 +455,19 @@ func (s *Scheduler) cancelRequestAndRemoveFromPending(frontendAddr string, query
// cancel all requests under the queryID
for _, fragID := range s.queryFragmentRegistry[querykey] {
key := requestKey{queryKey: querykey, fragmentID: fragID}
if req := s.pendingRequests[key]; req != nil {
if req := s.trackedRequests[key]; req != nil {
req.ctxCancel()
}
delete(s.pendingRequests, key)
delete(s.trackedRequests, key)
}
delete(s.queryFragmentRegistry, querykey)
} else {
// cancel specific fragment of the query by its queryID and fragmentID
key := requestKey{queryKey: querykey, fragmentID: fragmentID}
if req := s.pendingRequests[key]; req != nil {
if req := s.trackedRequests[key]; req != nil {
req.ctxCancel()
}
delete(s.pendingRequests, key)
delete(s.trackedRequests, key)

// Clean up queryFragmentRegistry for this specific fragment
if fragmentIDs, ok := s.queryFragmentRegistry[querykey]; ok {
Expand Down Expand Up @@ -710,3 +716,10 @@ func (s *Scheduler) getConnectedFrontendClientsMetric() float64 {

return float64(count)
}

// getPendingRequestsMetric reports the number of requests currently held in
// the scheduler's trackedRequests map. It is registered in NewScheduler as
// the value function backing the cortex_query_scheduler_tracked_requests gauge.
// NOTE(review): the method name still says "pending" while the map it reads
// was renamed to trackedRequests — consider renaming for consistency.
func (s *Scheduler) getPendingRequestsMetric() float64 {
	s.pendingRequestsMu.Lock()
	count := len(s.trackedRequests)
	s.pendingRequestsMu.Unlock()

	return float64(count)
}
121 changes: 102 additions & 19 deletions pkg/scheduler/scheduler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -257,9 +257,9 @@ func TestTracingContext(t *testing.T) {

scheduler.pendingRequestsMu.Lock()
defer scheduler.pendingRequestsMu.Unlock()
require.Equal(t, 1, len(scheduler.pendingRequests))
require.Equal(t, 1, len(scheduler.trackedRequests))

for _, r := range scheduler.pendingRequests {
for _, r := range scheduler.trackedRequests {
require.NotNil(t, r.parentSpanContext)
}
}
Expand Down Expand Up @@ -674,7 +674,7 @@ func verifyNoPendingRequestsLeft(t *testing.T, scheduler *Scheduler) {
test.Poll(t, 1*time.Second, 0, func() any {
scheduler.pendingRequestsMu.Lock()
defer scheduler.pendingRequestsMu.Unlock()
return len(scheduler.pendingRequests)
return len(scheduler.trackedRequests)
})
}

Expand Down Expand Up @@ -725,23 +725,23 @@ func TestQueryFragmentRegistryCleanupSingleFragment(t *testing.T) {
s.pendingRequestsMu.Lock()
queryKey := queryKey{frontendAddr: frontendAddr, queryID: queryID}
s.queryFragmentRegistry[queryKey] = []uint64{fragmentID}
s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID}] = req
s.trackedRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID}] = req
s.pendingRequestsMu.Unlock()

// Verify both entries exist
s.pendingRequestsMu.Lock()
require.Len(t, s.queryFragmentRegistry[queryKey], 1)
require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID})
require.Contains(t, s.trackedRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID})
s.pendingRequestsMu.Unlock()

// Simulate request completion (cancelAll=false)
s.cancelRequestAndRemoveFromPending(frontendAddr, queryID, fragmentID, false)

// Verify cleanup: both pendingRequests AND queryFragmentRegistry should be cleaned up
// Verify cleanup: both trackedRequests AND queryFragmentRegistry should be cleaned up
s.pendingRequestsMu.Lock()
_, registryExists := s.queryFragmentRegistry[queryKey]
require.False(t, registryExists, "queryFragmentRegistry should be cleaned up when last fragment completes")
require.NotContains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID}, "pendingRequests should be cleaned up")
require.NotContains(t, s.trackedRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID}, "trackedRequests should be cleaned up")
s.pendingRequestsMu.Unlock()
}

Expand Down Expand Up @@ -792,15 +792,15 @@ func TestQueryFragmentRegistryCleanupMultipleFragments(t *testing.T) {
s.pendingRequestsMu.Lock()
queryKey := queryKey{frontendAddr: frontendAddr, queryID: queryID}
s.queryFragmentRegistry[queryKey] = []uint64{fragmentID1, fragmentID2, fragmentID3}
s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID1}] = req1
s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID2}] = req2
s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID3}] = req3
s.trackedRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID1}] = req1
s.trackedRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID2}] = req2
s.trackedRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID3}] = req3
s.pendingRequestsMu.Unlock()

// Verify all three fragments exist
s.pendingRequestsMu.Lock()
require.Len(t, s.queryFragmentRegistry[queryKey], 3)
require.Len(t, s.pendingRequests, 3)
require.Len(t, s.trackedRequests, 3)
s.pendingRequestsMu.Unlock()

// Fragment 1 completes
Expand All @@ -810,9 +810,9 @@ func TestQueryFragmentRegistryCleanupMultipleFragments(t *testing.T) {
s.pendingRequestsMu.Lock()
require.Len(t, s.queryFragmentRegistry[queryKey], 2, "should have 2 fragments remaining")
require.ElementsMatch(t, []uint64{fragmentID2, fragmentID3}, s.queryFragmentRegistry[queryKey])
require.NotContains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID1})
require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID2})
require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID3})
require.NotContains(t, s.trackedRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID1})
require.Contains(t, s.trackedRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID2})
require.Contains(t, s.trackedRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID3})
s.pendingRequestsMu.Unlock()

// Fragment 2 completes
Expand All @@ -822,8 +822,8 @@ func TestQueryFragmentRegistryCleanupMultipleFragments(t *testing.T) {
s.pendingRequestsMu.Lock()
require.Len(t, s.queryFragmentRegistry[queryKey], 1, "should have 1 fragment remaining")
require.Equal(t, []uint64{fragmentID3}, s.queryFragmentRegistry[queryKey])
require.NotContains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID2})
require.Contains(t, s.pendingRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID3})
require.NotContains(t, s.trackedRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID2})
require.Contains(t, s.trackedRequests, requestKey{queryKey: queryKey, fragmentID: fragmentID3})
s.pendingRequestsMu.Unlock()

// Fragment 3 completes (last fragment)
Expand All @@ -833,7 +833,7 @@ func TestQueryFragmentRegistryCleanupMultipleFragments(t *testing.T) {
s.pendingRequestsMu.Lock()
_, registryExists := s.queryFragmentRegistry[queryKey]
require.False(t, registryExists, "queryFragmentRegistry should be deleted when last fragment completes")
require.Empty(t, s.pendingRequests, "all pendingRequests should be cleaned up")
require.Empty(t, s.trackedRequests, "all trackedRequests should be cleaned up")
s.pendingRequestsMu.Unlock()
}

Expand Down Expand Up @@ -865,7 +865,7 @@ func TestQueryFragmentRegistryNoLeak(t *testing.T) {
s.pendingRequestsMu.Lock()
queryKey := queryKey{frontendAddr: frontendAddr, queryID: queryID}
s.queryFragmentRegistry[queryKey] = []uint64{fragmentID}
s.pendingRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID}] = req
s.trackedRequests[requestKey{queryKey: queryKey, fragmentID: fragmentID}] = req
s.pendingRequestsMu.Unlock()

// Complete the request
Expand All @@ -877,6 +877,89 @@ func TestQueryFragmentRegistryNoLeak(t *testing.T) {
// Verify no leak: registry should be empty
s.pendingRequestsMu.Lock()
require.Empty(t, s.queryFragmentRegistry, "queryFragmentRegistry should be empty after all requests complete")
require.Empty(t, s.pendingRequests, "pendingRequests should be empty after all requests complete")
require.Empty(t, s.trackedRequests, "trackedRequests should be empty after all requests complete")
s.pendingRequestsMu.Unlock()
}

// TestSchedulerPendingRequestsMetric exercises the full lifecycle of the
// cortex_query_scheduler_tracked_requests gauge: each enqueue increments it,
// a frontend-side cancel decrements it immediately, a querier picking up a
// request leaves it unchanged (the request stays tracked while executing),
// and completion decrements it asynchronously via the scheduler's background
// forwarding goroutine.
func TestSchedulerPendingRequestsMetric(t *testing.T) {
	reg := prometheus.NewPedanticRegistry()

	_, frontendClient, querierClient := setupScheduler(t, reg, false)
	frontendLoop := initFrontendLoop(t, frontendClient, "frontend-12345")

	// Initial state validation: 0 tracked requests.
	require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_query_scheduler_tracked_requests Number of requests currently tracked by the scheduler.
# TYPE cortex_query_scheduler_tracked_requests gauge
cortex_query_scheduler_tracked_requests 0
`), "cortex_query_scheduler_tracked_requests"))

	// Enqueue the first request.
	frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{
		Type:        schedulerpb.ENQUEUE,
		QueryID:     1,
		UserID:      "test",
		HttpRequest: &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello"},
	})

	// Metric should increase to reflect 1 tracked request.
	require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_query_scheduler_tracked_requests Number of requests currently tracked by the scheduler.
# TYPE cortex_query_scheduler_tracked_requests gauge
cortex_query_scheduler_tracked_requests 1
`), "cortex_query_scheduler_tracked_requests"))

	// Enqueue the second request.
	frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{
		Type:        schedulerpb.ENQUEUE,
		QueryID:     2,
		UserID:      "test",
		HttpRequest: &httpgrpc.HTTPRequest{Method: "GET", Url: "/hello2"},
	})

	// Metric should increase to reflect 2 tracked requests.
	require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_query_scheduler_tracked_requests Number of requests currently tracked by the scheduler.
# TYPE cortex_query_scheduler_tracked_requests gauge
cortex_query_scheduler_tracked_requests 2
`), "cortex_query_scheduler_tracked_requests"))

	// Cancel the first request from the Query Frontend.
	frontendToScheduler(t, frontendLoop, &schedulerpb.FrontendToScheduler{
		Type:    schedulerpb.CANCEL,
		QueryID: 1,
	})

	// The canceled request is removed from the map immediately, so the metric should decrease to 1.
	require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_query_scheduler_tracked_requests Number of requests currently tracked by the scheduler.
# TYPE cortex_query_scheduler_tracked_requests gauge
cortex_query_scheduler_tracked_requests 1
`), "cortex_query_scheduler_tracked_requests"))

	// A Querier picks up the second request.
	querierLoop := initQuerierLoop(t, querierClient, "querier-1")
	msg, err := querierLoop.Recv()
	require.NoError(t, err)
	require.Equal(t, uint64(2), msg.QueryID)

	// The picked-up request is still executing on the querier but remains
	// tracked by the scheduler, so the metric should stay at 1.
	require.NoError(t, promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_query_scheduler_tracked_requests Number of requests currently tracked by the scheduler.
# TYPE cortex_query_scheduler_tracked_requests gauge
cortex_query_scheduler_tracked_requests 1
`), "cortex_query_scheduler_tracked_requests"))

	// The Querier finishes processing the request and reports back.
	require.NoError(t, querierLoop.Send(&schedulerpb.QuerierToScheduler{}))

	// The request is removed asynchronously by the scheduler's background
	// goroutine (forwardRequestToQuerier), so poll until the gauge drops to 0.
	test.Poll(t, 2*time.Second, true, func() any {
		err := promtest.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_query_scheduler_tracked_requests Number of requests currently tracked by the scheduler.
# TYPE cortex_query_scheduler_tracked_requests gauge
cortex_query_scheduler_tracked_requests 0
`), "cortex_query_scheduler_tracked_requests")
		return err == nil
	})
}
Loading