Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
* [FEATURE] Distributor: Add a per-tenant flag `-distributor.enable-type-and-unit-labels` that enables adding `__unit__` and `__type__` labels for remote write v2 and OTLP requests. This is a breaking change; the `-distributor.otlp.enable-type-and-unit-labels` flag is now deprecated, operates as a no-op, and has been consolidated into this new flag. #7077
* [FEATURE] Querier: Add experimental projection pushdown support in Parquet Queryable. #7152
* [FEATURE] Ingester: Add experimental active series queried metric. #7173
* [ENHANCEMENT] Distributor: Add `source` label to `cortex_distributor_samples_in_total`, `cortex_distributor_received_samples_total`, and related metadata/exemplar metrics to distinguish between Prometheus Remote Write v1 (`rw1`) and v2 (`rw2`) traffic. #7237
* [ENHANCEMENT] Querier: Add `-querier.store-gateway-series-batch-size` flag to configure the maximum number of series to be batched in a single gRPC response message from Store Gateways. #7203
* [ENHANCEMENT] HATracker: Add `-distributor.ha-tracker.enable-startup-sync` flag. If enabled, the ha-tracker fetches all tracked keys on startup to populate the local cache. #7213
* [ENHANCEMENT] Ingester: Add support for ingesting Native Histogram with Custom Buckets. #7191
Expand Down
200 changes: 102 additions & 98 deletions pkg/cortexpb/cortex.pb.go

Large diffs are not rendered by default.

1 change: 1 addition & 0 deletions pkg/cortexpb/cortex.proto
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ option (gogoproto.unmarshaler_all) = true;
enum SourceEnum {
API = 0;
RULE = 1;
API_V2 = 2;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For me I think API v2 is not another source. It should be still API source

}

message MessageWithBufRef {
Expand Down
82 changes: 57 additions & 25 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,7 +329,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Namespace: "cortex",
Name: "distributor_received_samples_total",
Help: "The total number of received samples, excluding rejected and deduped samples.",
}, []string{"user", "type"}),
}, []string{"user", "type", "source"}),
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Personally I don't really have a usecase to keep track of received samples or other data types per source.

Is it sufficient to keep track of number of remote write v2 vs v1 requests?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That sounds right, just counting requests enough for now.

receivedSamplesPerLabelSet: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_received_samples_per_labelset_total",
Expand All @@ -339,27 +339,27 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Namespace: "cortex",
Name: "distributor_received_exemplars_total",
Help: "The total number of received exemplars, excluding rejected and deduped exemplars.",
}, []string{"user"}),
}, []string{"user", "source"}),
receivedMetadata: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_received_metadata_total",
Help: "The total number of received metadata, excluding rejected.",
}, []string{"user"}),
}, []string{"user", "source"}),
incomingSamples: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_samples_in_total",
Help: "The total number of samples that have come in to the distributor, including rejected or deduped samples.",
}, []string{"user", "type"}),
}, []string{"user", "type", "source"}),
incomingExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_exemplars_in_total",
Help: "The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.",
}, []string{"user"}),
}, []string{"user", "source"}),
incomingMetadata: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_metadata_in_total",
Help: "The total number of metadata that have come in to the distributor, including rejected.",
}, []string{"user"}),
}, []string{"user", "source"}),
nonHASamples: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Namespace: "cortex",
Name: "distributor_non_ha_samples_received_total",
Expand Down Expand Up @@ -518,25 +518,42 @@ func (d *Distributor) running(ctx context.Context) error {

func (d *Distributor) cleanupInactiveUser(userID string) {
d.ingestersRing.CleanupShuffleShardCache(userID)

d.HATracker.CleanupHATrackerMetricsForUser(userID)

d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeFloat)
d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeHistogram)
d.receivedExemplars.DeleteLabelValues(userID)
d.receivedMetadata.DeleteLabelValues(userID)
d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeFloat)
d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeHistogram)
d.incomingExemplars.DeleteLabelValues(userID)
d.incomingMetadata.DeleteLabelValues(userID)
labelsToMatch := map[string]string{"user": userID}

if err := util.DeleteMatchingLabels(d.receivedSamples, labelsToMatch); err != nil {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_samples_total metric for user", "user", userID, "err", err)
}

if err := util.DeleteMatchingLabels(d.receivedExemplars, labelsToMatch); err != nil {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_exemplars_total metric for user", "user", userID, "err", err)
}

if err := util.DeleteMatchingLabels(d.receivedMetadata, labelsToMatch); err != nil {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_metadata_total metric for user", "user", userID, "err", err)
}

if err := util.DeleteMatchingLabels(d.incomingSamples, labelsToMatch); err != nil {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_samples_in_total metric for user", "user", userID, "err", err)
}

if err := util.DeleteMatchingLabels(d.incomingExemplars, labelsToMatch); err != nil {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_exemplars_in_total metric for user", "user", userID, "err", err)
}

if err := util.DeleteMatchingLabels(d.incomingMetadata, labelsToMatch); err != nil {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_metadata_in_total metric for user", "user", userID, "err", err)
}

d.nonHASamples.DeleteLabelValues(userID)
d.latestSeenSampleTimestampPerUser.DeleteLabelValues(userID)

if err := util.DeleteMatchingLabels(d.dedupedSamples, map[string]string{"user": userID}); err != nil {
if err := util.DeleteMatchingLabels(d.dedupedSamples, labelsToMatch); err != nil {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err)
}

if err := util.DeleteMatchingLabels(d.receivedSamplesPerLabelSet, map[string]string{"user": userID}); err != nil {
if err := util.DeleteMatchingLabels(d.receivedSamplesPerLabelSet, labelsToMatch); err != nil {
level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_samples_per_labelset_total metric for user", "user", userID, "err", err)
}

Expand Down Expand Up @@ -742,12 +759,14 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
numHistogramSamples += len(ts.Histograms)
numExemplars += len(ts.Exemplars)
}
sourceLabel := getSourceLabel(req.Source)

// Count the total samples, exemplars in, prior to validation or deduplication, for comparison with other metrics.
d.incomingSamples.WithLabelValues(userID, sampleMetricTypeFloat).Add(float64(numFloatSamples))
d.incomingSamples.WithLabelValues(userID, sampleMetricTypeHistogram).Add(float64(numHistogramSamples))
d.incomingExemplars.WithLabelValues(userID).Add(float64(numExemplars))
d.incomingSamples.WithLabelValues(userID, sampleMetricTypeFloat, sourceLabel).Add(float64(numFloatSamples))
d.incomingSamples.WithLabelValues(userID, sampleMetricTypeHistogram, sourceLabel).Add(float64(numHistogramSamples))
d.incomingExemplars.WithLabelValues(userID, sourceLabel).Add(float64(numExemplars))
// Count the total number of metadata in.
d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata)))
d.incomingMetadata.WithLabelValues(userID, sourceLabel).Add(float64(len(req.Metadata)))

if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) {
return nil, httpgrpc.Errorf(http.StatusServiceUnavailable, "too many inflight push requests in distributor")
Expand Down Expand Up @@ -798,10 +817,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co
}
metadataKeys, validatedMetadata, firstPartialErr := d.prepareMetadataKeys(req, limits, userID, firstPartialErr)

d.receivedSamples.WithLabelValues(userID, sampleMetricTypeFloat).Add(float64(validatedFloatSamples))
d.receivedSamples.WithLabelValues(userID, sampleMetricTypeHistogram).Add(float64(validatedHistogramSamples))
d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars))
d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata)))
d.receivedSamples.WithLabelValues(userID, sampleMetricTypeFloat, sourceLabel).Add(float64(validatedFloatSamples))
d.receivedSamples.WithLabelValues(userID, sampleMetricTypeHistogram, sourceLabel).Add(float64(validatedHistogramSamples))
d.receivedExemplars.WithLabelValues(userID, sourceLabel).Add(float64(validatedExemplars))
d.receivedMetadata.WithLabelValues(userID, sourceLabel).Add(float64(len(validatedMetadata)))

if len(seriesKeys) == 0 && len(nhSeriesKeys) == 0 && len(metadataKeys) == 0 {
return &cortexpb.WriteResponse{}, firstPartialErr
Expand Down Expand Up @@ -1757,3 +1776,16 @@ func getLimitFromSelectHints(hints *storage.SelectHints) int {
}
return 0
}

func getSourceLabel(s cortexpb.SourceEnum) string {
switch s {
case cortexpb.API:
return "rw1"
case cortexpb.API_V2:
return "rw2"
case cortexpb.RULE:
return "rule"
default:
return "unknown"
}
}
Loading
Loading