-
Notifications
You must be signed in to change notification settings - Fork 850
Add source label (rw1/rw2) to incoming/received data #7237
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
Large diffs are not rendered by default.
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -329,7 +329,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove | |
| Namespace: "cortex", | ||
| Name: "distributor_received_samples_total", | ||
| Help: "The total number of received samples, excluding rejected and deduped samples.", | ||
| }, []string{"user", "type"}), | ||
| }, []string{"user", "type", "source"}), | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Personally I don't really have a usecase to keep track of received samples or other data types per source. Is it sufficient to keep track of number of remote write v2 vs v1 requests?
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. That sounds right, just counting requests enough for now. |
||
| receivedSamplesPerLabelSet: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ | ||
| Namespace: "cortex", | ||
| Name: "distributor_received_samples_per_labelset_total", | ||
|
|
@@ -339,27 +339,27 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove | |
| Namespace: "cortex", | ||
| Name: "distributor_received_exemplars_total", | ||
| Help: "The total number of received exemplars, excluding rejected and deduped exemplars.", | ||
| }, []string{"user"}), | ||
| }, []string{"user", "source"}), | ||
| receivedMetadata: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ | ||
| Namespace: "cortex", | ||
| Name: "distributor_received_metadata_total", | ||
| Help: "The total number of received metadata, excluding rejected.", | ||
| }, []string{"user"}), | ||
| }, []string{"user", "source"}), | ||
| incomingSamples: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ | ||
| Namespace: "cortex", | ||
| Name: "distributor_samples_in_total", | ||
| Help: "The total number of samples that have come in to the distributor, including rejected or deduped samples.", | ||
| }, []string{"user", "type"}), | ||
| }, []string{"user", "type", "source"}), | ||
| incomingExemplars: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ | ||
| Namespace: "cortex", | ||
| Name: "distributor_exemplars_in_total", | ||
| Help: "The total number of exemplars that have come in to the distributor, including rejected or deduped exemplars.", | ||
| }, []string{"user"}), | ||
| }, []string{"user", "source"}), | ||
| incomingMetadata: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ | ||
| Namespace: "cortex", | ||
| Name: "distributor_metadata_in_total", | ||
| Help: "The total number of metadata that have come in to the distributor, including rejected.", | ||
| }, []string{"user"}), | ||
| }, []string{"user", "source"}), | ||
| nonHASamples: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{ | ||
| Namespace: "cortex", | ||
| Name: "distributor_non_ha_samples_received_total", | ||
|
|
@@ -518,25 +518,42 @@ func (d *Distributor) running(ctx context.Context) error { | |
|
|
||
| func (d *Distributor) cleanupInactiveUser(userID string) { | ||
| d.ingestersRing.CleanupShuffleShardCache(userID) | ||
|
|
||
| d.HATracker.CleanupHATrackerMetricsForUser(userID) | ||
|
|
||
| d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeFloat) | ||
| d.receivedSamples.DeleteLabelValues(userID, sampleMetricTypeHistogram) | ||
| d.receivedExemplars.DeleteLabelValues(userID) | ||
| d.receivedMetadata.DeleteLabelValues(userID) | ||
| d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeFloat) | ||
| d.incomingSamples.DeleteLabelValues(userID, sampleMetricTypeHistogram) | ||
| d.incomingExemplars.DeleteLabelValues(userID) | ||
| d.incomingMetadata.DeleteLabelValues(userID) | ||
| labelsToMatch := map[string]string{"user": userID} | ||
|
|
||
| if err := util.DeleteMatchingLabels(d.receivedSamples, labelsToMatch); err != nil { | ||
| level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_samples_total metric for user", "user", userID, "err", err) | ||
| } | ||
|
|
||
| if err := util.DeleteMatchingLabels(d.receivedExemplars, labelsToMatch); err != nil { | ||
| level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_exemplars_total metric for user", "user", userID, "err", err) | ||
| } | ||
|
|
||
| if err := util.DeleteMatchingLabels(d.receivedMetadata, labelsToMatch); err != nil { | ||
| level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_metadata_total metric for user", "user", userID, "err", err) | ||
| } | ||
|
|
||
| if err := util.DeleteMatchingLabels(d.incomingSamples, labelsToMatch); err != nil { | ||
| level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_samples_in_total metric for user", "user", userID, "err", err) | ||
| } | ||
|
|
||
| if err := util.DeleteMatchingLabels(d.incomingExemplars, labelsToMatch); err != nil { | ||
| level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_exemplars_in_total metric for user", "user", userID, "err", err) | ||
| } | ||
|
|
||
| if err := util.DeleteMatchingLabels(d.incomingMetadata, labelsToMatch); err != nil { | ||
| level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_metadata_in_total metric for user", "user", userID, "err", err) | ||
| } | ||
|
|
||
| d.nonHASamples.DeleteLabelValues(userID) | ||
| d.latestSeenSampleTimestampPerUser.DeleteLabelValues(userID) | ||
|
|
||
| if err := util.DeleteMatchingLabels(d.dedupedSamples, map[string]string{"user": userID}); err != nil { | ||
| if err := util.DeleteMatchingLabels(d.dedupedSamples, labelsToMatch); err != nil { | ||
| level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_deduped_samples_total metric for user", "user", userID, "err", err) | ||
| } | ||
|
|
||
| if err := util.DeleteMatchingLabels(d.receivedSamplesPerLabelSet, map[string]string{"user": userID}); err != nil { | ||
| if err := util.DeleteMatchingLabels(d.receivedSamplesPerLabelSet, labelsToMatch); err != nil { | ||
| level.Warn(d.log).Log("msg", "failed to remove cortex_distributor_received_samples_per_labelset_total metric for user", "user", userID, "err", err) | ||
| } | ||
|
|
||
|
|
@@ -742,12 +759,14 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co | |
| numHistogramSamples += len(ts.Histograms) | ||
| numExemplars += len(ts.Exemplars) | ||
| } | ||
| sourceLabel := getSourceLabel(req.Source) | ||
|
|
||
| // Count the total samples, exemplars in, prior to validation or deduplication, for comparison with other metrics. | ||
| d.incomingSamples.WithLabelValues(userID, sampleMetricTypeFloat).Add(float64(numFloatSamples)) | ||
| d.incomingSamples.WithLabelValues(userID, sampleMetricTypeHistogram).Add(float64(numHistogramSamples)) | ||
| d.incomingExemplars.WithLabelValues(userID).Add(float64(numExemplars)) | ||
| d.incomingSamples.WithLabelValues(userID, sampleMetricTypeFloat, sourceLabel).Add(float64(numFloatSamples)) | ||
| d.incomingSamples.WithLabelValues(userID, sampleMetricTypeHistogram, sourceLabel).Add(float64(numHistogramSamples)) | ||
| d.incomingExemplars.WithLabelValues(userID, sourceLabel).Add(float64(numExemplars)) | ||
| // Count the total number of metadata in. | ||
| d.incomingMetadata.WithLabelValues(userID).Add(float64(len(req.Metadata))) | ||
| d.incomingMetadata.WithLabelValues(userID, sourceLabel).Add(float64(len(req.Metadata))) | ||
|
|
||
| if d.cfg.InstanceLimits.MaxInflightPushRequests > 0 && inflight > int64(d.cfg.InstanceLimits.MaxInflightPushRequests) { | ||
| return nil, httpgrpc.Errorf(http.StatusServiceUnavailable, "too many inflight push requests in distributor") | ||
|
|
@@ -798,10 +817,10 @@ func (d *Distributor) Push(ctx context.Context, req *cortexpb.WriteRequest) (*co | |
| } | ||
| metadataKeys, validatedMetadata, firstPartialErr := d.prepareMetadataKeys(req, limits, userID, firstPartialErr) | ||
|
|
||
| d.receivedSamples.WithLabelValues(userID, sampleMetricTypeFloat).Add(float64(validatedFloatSamples)) | ||
| d.receivedSamples.WithLabelValues(userID, sampleMetricTypeHistogram).Add(float64(validatedHistogramSamples)) | ||
| d.receivedExemplars.WithLabelValues(userID).Add(float64(validatedExemplars)) | ||
| d.receivedMetadata.WithLabelValues(userID).Add(float64(len(validatedMetadata))) | ||
| d.receivedSamples.WithLabelValues(userID, sampleMetricTypeFloat, sourceLabel).Add(float64(validatedFloatSamples)) | ||
| d.receivedSamples.WithLabelValues(userID, sampleMetricTypeHistogram, sourceLabel).Add(float64(validatedHistogramSamples)) | ||
| d.receivedExemplars.WithLabelValues(userID, sourceLabel).Add(float64(validatedExemplars)) | ||
| d.receivedMetadata.WithLabelValues(userID, sourceLabel).Add(float64(len(validatedMetadata))) | ||
|
|
||
| if len(seriesKeys) == 0 && len(nhSeriesKeys) == 0 && len(metadataKeys) == 0 { | ||
| return &cortexpb.WriteResponse{}, firstPartialErr | ||
|
|
@@ -1757,3 +1776,16 @@ func getLimitFromSelectHints(hints *storage.SelectHints) int { | |
| } | ||
| return 0 | ||
| } | ||
|
|
||
| func getSourceLabel(s cortexpb.SourceEnum) string { | ||
| switch s { | ||
| case cortexpb.API: | ||
| return "rw1" | ||
| case cortexpb.API_V2: | ||
| return "rw2" | ||
| case cortexpb.RULE: | ||
| return "rule" | ||
| default: | ||
| return "unknown" | ||
| } | ||
| } | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
For me I think API v2 is not another source. It should be still API source