Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
* [FEATURE] Ingester: Add experimental active series queried metric. #7173
* [ENHANCEMENT] Distributor: Add `cortex_distributor_push_requests_total` metric to track the number of push requests by type. #7239
* [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203
* [ENHANCEMENT] HATracker: Add `-distributor.ha-tracker.enable-startup-sync` flag. If enabled, the ha-tracker fetches all tracked keys on startup to populate the local cache. #7213
* [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191
Expand Down
2 changes: 2 additions & 0 deletions integration/remote_write_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -368,6 +368,8 @@ func TestIngest(t *testing.T) {
require.NoError(t, err)
testPushHeader(t, writeStats, 0, 1, 0)

require.NoError(t, cortex.WaitSumMetricsWithOptions(e2e.Equals(5), []string{"cortex_distributor_push_requests_total"}, e2e.WithLabelMatchers(labels.MustNewMatcher(labels.MatchEqual, "type", "prw2"))))

testHistogramTimestamp := now.Add(blockRangePeriod * 2)
expectedNH := tsdbutil.GenerateTestHistogram(int64(histogramIdx))
result, err = c.Query(`test_nh`, testHistogramTimestamp)
Expand Down
18 changes: 13 additions & 5 deletions pkg/api/api.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import (
"github.com/grafana/regexp"
"github.com/klauspost/compress/gzhttp"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
"github.com/prometheus/prometheus/model/relabel"
"github.com/prometheus/prometheus/storage"
"github.com/prometheus/prometheus/util/httputil"
Expand Down Expand Up @@ -280,10 +282,16 @@ func (a *API) RegisterRuntimeConfig(runtimeConfigHandler http.HandlerFunc) {
}

// RegisterDistributor registers the endpoints associated with the distributor.
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides) {
func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distributor.Config, overrides *validation.Overrides, reg prometheus.Registerer) {
distributorpb.RegisterDistributorServer(a.server.GRPC, d)

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
requestTotal := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_push_requests_total",
Help: "Total number of push requests by type.",
}, []string{"type"})

a.RegisterRoute("/api/v1/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST")
a.RegisterRoute("/api/v1/otlp/v1/metrics", push.OTLPHandler(pushConfig.OTLPMaxRecvMsgSize, overrides, pushConfig.OTLPConfig, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")

a.indexPage.AddLink(SectionAdminEndpoints, "/distributor/ring", "Distributor Ring Status")
Expand All @@ -295,7 +303,7 @@ func (a *API) RegisterDistributor(d *distributor.Distributor, pushConfig distrib
a.RegisterRoute("/distributor/ha_tracker", d.HATracker, false, "GET")

// Legacy Routes
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d)), true, "POST")
a.RegisterRoute(path.Join(a.cfg.LegacyHTTPPrefix, "/push"), push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, a.cfg.wrapDistributorPush(d), requestTotal), true, "POST")
a.RegisterRoute("/all_user_stats", http.HandlerFunc(d.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ha-tracker", d.HATracker, false, "GET")
}
Expand Down Expand Up @@ -328,12 +336,12 @@ func (a *API) RegisterIngester(i Ingester, pushConfig distributor.Config, overri
a.RegisterRoute("/ingester/renewTokens", http.HandlerFunc(i.RenewTokenHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/all_user_stats", http.HandlerFunc(i.AllUserStatsHandler), false, "GET")
a.RegisterRoute("/ingester/mode", http.HandlerFunc(i.ModeHandler), false, "GET", "POST")
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
a.RegisterRoute("/ingester/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging.

// Legacy Routes
a.RegisterRoute("/flush", http.HandlerFunc(i.FlushHandler), false, "GET", "POST")
a.RegisterRoute("/shutdown", http.HandlerFunc(i.ShutdownHandler), false, "GET", "POST")
a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push), true, "POST") // For testing and debugging.
a.RegisterRoute("/push", push.Handler(pushConfig.RemoteWriteV2Enabled, pushConfig.MaxRecvMsgSize, overrides, a.sourceIPs, i.Push, nil), true, "POST") // For testing and debugging.
}

func (a *API) RegisterTenantDeletion(api *purger.TenantDeletionAPI) {
Expand Down
2 changes: 1 addition & 1 deletion pkg/cortex/modules.go
Original file line number Diff line number Diff line change
Expand Up @@ -258,7 +258,7 @@ func (t *Cortex) initGrpcClientServices() (serv services.Service, err error) {
}

func (t *Cortex) initDistributor() (serv services.Service, err error) {
t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor, t.Overrides)
t.API.RegisterDistributor(t.Distributor, t.Cfg.Distributor, t.Overrides, prometheus.DefaultRegisterer)

return nil, nil
}
Expand Down
18 changes: 17 additions & 1 deletion pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"github.com/go-kit/log/level"
"github.com/prometheus/client_golang/exp/api/remote"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/schema"
Expand All @@ -34,13 +35,16 @@ const (
rw20WrittenSamplesHeader = "X-Prometheus-Remote-Write-Samples-Written"
rw20WrittenHistogramsHeader = "X-Prometheus-Remote-Write-Histograms-Written"
rw20WrittenExemplarsHeader = "X-Prometheus-Remote-Write-Exemplars-Written"

labelValuePRW1 = "prw1"
labelValuePRW2 = "prw2"
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is prw2 a standard name we should use? I wonder if users know what it means.
It is mentioned in https://prometheus.io/docs/specs/prw/remote_write_spec_2_0 though so maybe we can use it

)

// Func defines the type of the push. It is similar to http.HandlerFunc.
type Func func(context.Context, *cortexpb.WriteRequest) (*cortexpb.WriteResponse, error)

// Handler is a http.Handler which accepts WriteRequests.
func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func) http.Handler {
func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation.Overrides, sourceIPs *middleware.SourceIPExtractor, push Func, requestTotal *prometheus.CounterVec) http.Handler {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we also count OTLP? That's a different push handler though

return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
ctx := r.Context()
logger := log.WithContext(ctx, log.Logger)
Expand Down Expand Up @@ -151,6 +155,10 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation
return
}

if requestTotal != nil {
requestTotal.WithLabelValues(getTypeLabel(msgType)).Inc()
}

if msgType != remote.WriteV1MessageType && msgType != remote.WriteV2MessageType {
level.Error(logger).Log("Not accepted msg type", "msgType", msgType, "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
Expand Down Expand Up @@ -290,3 +298,11 @@ func convertV2ToV1Exemplars(b *labels.ScratchBuilder, symbols []string, v2Exempl
}
return v1Exemplars, nil
}

func getTypeLabel(msgType remote.WriteMessageType) string {
if msgType == remote.WriteV1MessageType {
return labelValuePRW1
}

return labelValuePRW2
}
53 changes: 46 additions & 7 deletions pkg/util/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"time"

"github.com/golang/snappy"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/testutil"
"github.com/prometheus/prometheus/model/histogram"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/prompb"
Expand Down Expand Up @@ -126,7 +128,7 @@ func Benchmark_Handler(b *testing.B) {
testSeriesNums := []int{10, 100, 500, 1000}
for _, seriesNum := range testSeriesNums {
b.Run(fmt.Sprintf("PRW1 with %d series", seriesNum), func(b *testing.B) {
handler := Handler(true, 1000000, overrides, nil, mockHandler)
handler := Handler(true, 1000000, overrides, nil, mockHandler, nil)
req, err := createPRW1HTTPRequest(seriesNum)
require.NoError(b, err)

Expand All @@ -140,7 +142,7 @@ func Benchmark_Handler(b *testing.B) {
}
})
b.Run(fmt.Sprintf("PRW2 with %d series", seriesNum), func(b *testing.B) {
handler := Handler(true, 1000000, overrides, nil, mockHandler)
handler := Handler(true, 1000000, overrides, nil, mockHandler, nil)
req, err := createPRW2HTTPRequest(seriesNum)
require.NoError(b, err)

Expand Down Expand Up @@ -453,7 +455,7 @@ func TestHandler_remoteWrite(t *testing.T) {
overrides := validation.NewOverrides(limits, nil)

t.Run("remote write v1", func(t *testing.T) {
handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API))
handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), nil)
req := createRequest(t, createPrometheusRemoteWriteProtobuf(t), false)
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
Expand All @@ -463,7 +465,7 @@ func TestHandler_remoteWrite(t *testing.T) {
ctx := context.Background()
ctx = user.InjectOrgID(ctx, "user-1")

handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API))
handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), nil)
req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true)
req = req.WithContext(ctx)
resp := httptest.NewRecorder()
Expand All @@ -484,7 +486,7 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) {
overrides := validation.NewOverrides(limits, nil)

sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)")
handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API))
handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API), nil)

tests := []struct {
description string
Expand Down Expand Up @@ -612,7 +614,7 @@ func TestHandler_cortexWriteRequest(t *testing.T) {
overrides := validation.NewOverrides(limits, nil)

sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)")
handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API))
handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API), nil)

t.Run("remote write v1", func(t *testing.T) {
req := createRequest(t, createCortexWriteRequestProtobuf(t, false, cortexpb.API), false)
Expand Down Expand Up @@ -642,12 +644,49 @@ func TestHandler_ignoresSkipLabelNameValidationIfSet(t *testing.T) {
createRequest(t, createCortexWriteRequestProtobuf(t, true, cortexpb.RULE), false),
} {
resp := httptest.NewRecorder()
handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.RULE))
handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.RULE), nil)
handler.ServeHTTP(resp, req)
assert.Equal(t, 200, resp.Code)
}
}

func TestHandler_MetricCollection(t *testing.T) {
var limits validation.Limits
flagext.DefaultValues(&limits)
overrides := validation.NewOverrides(limits, nil)

counter := prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "test_counter",
Help: "test help",
}, []string{"type"})

handler := Handler(true, 100000, overrides, nil, verifyWriteRequestHandler(t, cortexpb.API), counter)

t.Run("counts v1 requests", func(t *testing.T) {
req := createRequest(t, createPrometheusRemoteWriteProtobuf(t), false)
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
assert.Equal(t, http.StatusOK, resp.Code)

val := testutil.ToFloat64(counter.WithLabelValues("prw1"))
assert.Equal(t, 1.0, val)
})

t.Run("counts v2 requests", func(t *testing.T) {
ctx := context.Background()
ctx = user.InjectOrgID(ctx, "user-1")

req := createRequest(t, createPrometheusRemoteWriteV2Protobuf(t), true)
req = req.WithContext(ctx)
resp := httptest.NewRecorder()
handler.ServeHTTP(resp, req)
assert.Equal(t, http.StatusNoContent, resp.Code)

val := testutil.ToFloat64(counter.WithLabelValues("prw2"))
assert.Equal(t, 1.0, val)
})
}

func verifyWriteRequestHandler(t *testing.T, expectSource cortexpb.SourceEnum) func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) {
t.Helper()
return func(ctx context.Context, request *cortexpb.WriteRequest) (response *cortexpb.WriteResponse, err error) {
Expand Down
Loading