Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
* [ENHANCEMENT] Ingester: Add fetch timeout for Ingester expanded postings cache. #7185
* [ENHANCEMENT] Ingester: Add feature flag to collect metrics of how expensive an unoptimized regex matcher is and new limits to protect Ingester query path against expensive unoptimized regex matchers. #7194 #7210
* [ENHANCEMENT] Compactor: Add partition group creation time to visit marker. #7217
* [BUGFIX] Distributor: If remote write v2 is disabled, explicitly return HTTP 415 (Unsupported Media Type) for Remote Write V2 requests instead of attempting to parse them as V1. #7238
* [BUGFIX] Ring: Change DynamoDB KV to retry indefinitely for WatchKey. #7088
* [BUGFIX] Ruler: Add XFunctions validation support. #7111
* [BUGFIX] Querier: propagate Prometheus info annotations in protobuf responses. #7132
Expand Down
2 changes: 1 addition & 1 deletion integration/remote_write_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestIngest_SenderSendPRW2_DistributorNotAllowPRW2(t *testing.T) {
symbols1, series, _ := e2e.GenerateSeriesV2("test_series", now, prompb.Label{Name: "job", Value: "test"}, prompb.Label{Name: "foo", Value: "bar"})
_, err = c.PushV2(symbols1, series)
require.Error(t, err)
require.Contains(t, err.Error(), "sent v2 request; got 2xx, but PRW 2.0 response header statistics indicate 0 samples, 0 histograms and 0 exemplars were accepted")
require.Contains(t, err.Error(), "io.prometheus.write.v2.Request protobuf message is not accepted by this server; only accepts prometheus.WriteRequest")

// sample
result, err := c.Query("test_series", now)
Expand Down
65 changes: 33 additions & 32 deletions pkg/util/push/push.go
Original file line number Diff line number Diff line change
Expand Up @@ -137,43 +137,44 @@ func Handler(remoteWrite2Enabled bool, maxRecvMsgSize int, overrides *validation
}
}

if remoteWrite2Enabled {
// follow Prometheus https://github.com/prometheus/prometheus/blob/v3.3.1/storage/remote/write_handler.go#L121
contentType := r.Header.Get("Content-Type")
if contentType == "" {
contentType = appProtoContentType
}
// follow Prometheus https://github.com/prometheus/prometheus/blob/v3.3.1/storage/remote/write_handler.go#L121
contentType := r.Header.Get("Content-Type")
if contentType == "" {
contentType = appProtoContentType
}

msgType, err := remote.ParseProtoMsg(contentType)
if err != nil {
level.Error(logger).Log("Error decoding remote write request", "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
return
}
msgType, err := remote.ParseProtoMsg(contentType)
if err != nil {
level.Error(logger).Log("Error decoding remote write request", "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
return
}

if msgType != remote.WriteV1MessageType && msgType != remote.WriteV2MessageType {
level.Error(logger).Log("Not accepted msg type", "msgType", msgType, "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
return
}
if msgType != remote.WriteV1MessageType && msgType != remote.WriteV2MessageType {
level.Error(logger).Log("Not accepted msg type", "msgType", msgType, "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
return
}

enc := r.Header.Get("Content-Encoding")
if enc == "" {
} else if enc != compression.Snappy {
err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, compression.Snappy)
level.Error(logger).Log("Error decoding remote write request", "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
return
}
enc := r.Header.Get("Content-Encoding")
if enc == "" {
} else if enc != compression.Snappy {
err := fmt.Errorf("%v encoding (compression) is not accepted by this server; only %v is acceptable", enc, compression.Snappy)
level.Error(logger).Log("Error decoding remote write request", "err", err)
http.Error(w, err.Error(), http.StatusUnsupportedMediaType)
return
}

switch msgType {
case remote.WriteV1MessageType:
handlePRW1()
case remote.WriteV2MessageType:
handlePRW2()
}
} else {
switch msgType {
case remote.WriteV1MessageType:
handlePRW1()
case remote.WriteV2MessageType:
if !remoteWrite2Enabled {
errMsg := fmt.Sprintf("%v protobuf message is not accepted by this server; only accepts %v", msgType, remote.WriteV1MessageType)
http.Error(w, errMsg, http.StatusUnsupportedMediaType)
return
}
handlePRW2()
}
})
}
Expand Down
65 changes: 43 additions & 22 deletions pkg/util/push/push_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -484,13 +484,13 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) {
overrides := validation.NewOverrides(limits, nil)

sourceIPs, _ := middleware.NewSourceIPs("SomeField", "(.*)")
handler := Handler(true, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API))

tests := []struct {
description string
reqHeaders map[string]string
expectedCode int
isV2 bool
description string
reqHeaders map[string]string
expectedCode int
isV2 bool
remoteWrite2Enabled bool
}{
{
description: "[RW 2.0] correct content-type",
Expand All @@ -499,8 +499,9 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) {
"Content-Encoding": "snappy",
remoteWriteVersionHeader: "2.0.0",
},
expectedCode: http.StatusNoContent,
isV2: true,
expectedCode: http.StatusNoContent,
isV2: true,
remoteWrite2Enabled: true,
},
{
description: "[RW 1.0] correct content-type",
Expand Down Expand Up @@ -529,8 +530,9 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) {
"Content-Encoding": "snappy",
remoteWriteVersionHeader: "2.0.0",
},
expectedCode: http.StatusUnsupportedMediaType,
isV2: true,
expectedCode: http.StatusUnsupportedMediaType,
isV2: true,
remoteWrite2Enabled: true,
},
{
description: "[RW 2.0] wrong content-encoding",
Expand All @@ -539,54 +541,73 @@ func TestHandler_ContentTypeAndEncoding(t *testing.T) {
"Content-Encoding": "zstd",
remoteWriteVersionHeader: "2.0.0",
},
expectedCode: http.StatusUnsupportedMediaType,
isV2: true,
expectedCode: http.StatusUnsupportedMediaType,
isV2: true,
remoteWrite2Enabled: true,
},
{
description: "no header, should treated as RW 1.0",
expectedCode: http.StatusOK,
isV2: false,
description: "[RW 2.0] V2 disabled",
reqHeaders: map[string]string{
"Content-Type": appProtoV2ContentType,
"Content-Encoding": "snappy",
remoteWriteVersionHeader: "2.0.0",
},
expectedCode: http.StatusUnsupportedMediaType,
isV2: true,
remoteWrite2Enabled: false,
},
{
description: "no header, should treated as RW 1.0",
expectedCode: http.StatusOK,
isV2: false,
remoteWrite2Enabled: true,
},
{
description: "missing content-type, should treated as RW 1.0",
reqHeaders: map[string]string{
"Content-Encoding": "snappy",
remoteWriteVersionHeader: "2.0.0",
},
expectedCode: http.StatusOK,
isV2: false,
expectedCode: http.StatusOK,
isV2: false,
remoteWrite2Enabled: true,
},
{
description: "missing content-encoding",
reqHeaders: map[string]string{
"Content-Type": appProtoV2ContentType,
remoteWriteVersionHeader: "2.0.0",
},
expectedCode: http.StatusNoContent,
isV2: true,
expectedCode: http.StatusNoContent,
isV2: true,
remoteWrite2Enabled: true,
},
{
description: "missing remote write version, should treated based on Content-type",
reqHeaders: map[string]string{
"Content-Type": appProtoV2ContentType,
"Content-Encoding": "snappy",
},
expectedCode: http.StatusNoContent,
isV2: true,
expectedCode: http.StatusNoContent,
isV2: true,
remoteWrite2Enabled: true,
},
{
description: "missing remote write version, should treated based on Content-type",
reqHeaders: map[string]string{
"Content-Type": appProtoV1ContentType,
"Content-Encoding": "snappy",
},
expectedCode: http.StatusOK,
isV2: false,
expectedCode: http.StatusOK,
isV2: false,
remoteWrite2Enabled: true,
},
}

for _, test := range tests {
t.Run(test.description, func(t *testing.T) {
handler := Handler(test.remoteWrite2Enabled, 100000, overrides, sourceIPs, verifyWriteRequestHandler(t, cortexpb.API))

if test.isV2 {
ctx := context.Background()
ctx = user.InjectOrgID(ctx, "user-1")
Expand Down
Loading