Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
## master / unreleased
* [CHANGE] Querier: Make query time range configurations per-tenant: `query_ingesters_within`, `query_store_after`, and `shuffle_sharding_ingesters_lookback_period`. Uses `model.Duration` instead of `time.Duration` to support serialization but has minimum unit of 1ms (nanoseconds/microseconds not supported). #7160
* [CHANGE] Cache: Setting `-blocks-storage.bucket-store.metadata-cache.bucket-index-content-ttl` to 0 will disable the bucket-index cache. #7446
* [CHANGE] HA Tracker: Add per-tenant configurable failover timeout via `-distributor.ha-tracker.failover-timeout-override` runtime config option. When set to a non-zero value, overrides the global `-distributor.ha-tracker.failover-timeout` for that tenant. #7481
* [FEATURE] Ruler: Add per-tenant `ruler_alert_generator_url_template` runtime config option to customize alert generator URLs using Go templates. Supports Grafana Explore, Perses, and other UIs. #7302
* [FEATURE] Distributor: Add experimental `-distributor.enable-start-timestamp` flag for Prometheus Remote Write 2.0. When enabled, `StartTimestamp (ST)` is ingested. #7371
* [FEATURE] Memberlist: Add `-memberlist.cluster-label` and `-memberlist.cluster-label-verification-disabled` to prevent accidental cross-cluster gossip joins and support rolling label rollout. #7385
Expand Down
5 changes: 5 additions & 0 deletions docs/configuration/config-file-reference.md
Original file line number Diff line number Diff line change
Expand Up @@ -4027,6 +4027,11 @@ The `limits_config` configures default and per-tenant limits imposed by Cortex s
# CLI flag: -distributor.ha-tracker.max-clusters
[ha_max_clusters: <int> | default = 0]

# Per-tenant failover timeout for the HA tracker. If set to 0, the global
# -distributor.ha-tracker.failover-timeout value is used.
# CLI flag: -distributor.ha-tracker.failover-timeout-override
[ha_tracker_failover_timeout: <duration> | default = 0s]

# This flag can be used to specify label names to drop during sample ingestion
# within the distributor and can be repeated in order to drop multiple labels.
Expand Down
16 changes: 13 additions & 3 deletions pkg/ha/ha_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,10 @@ type HATrackerLimits interface {
// MaxHAReplicaGroups returns max number of replica groups that HA tracker should track for a user.
// Samples from additional replicaGroups are rejected.
MaxHAReplicaGroups(user string) int

// HATrackerFailoverTimeout returns the failover timeout for a user.
// If 0, the global config value is used.
HATrackerFailoverTimeout(user string) time.Duration
}

// ProtoReplicaDescFactory makes new InstanceDescs
Expand Down Expand Up @@ -612,7 +616,7 @@ func (c *HATracker) CheckReplica(ctx context.Context, userID, replicaGroup, repl
}
}

err := c.checkKVStore(ctx, key, replica, now)
err := c.checkKVStore(ctx, key, replica, userID, now)
c.kvCASCalls.WithLabelValues(userID, replicaGroup).Inc()
if err != nil {
// The callback within checkKVStore will return a ReplicasNotMatchError if the sample is being deduped,
Expand All @@ -624,7 +628,7 @@ func (c *HATracker) CheckReplica(ctx context.Context, userID, replicaGroup, repl
return err
}

func (c *HATracker) checkKVStore(ctx context.Context, key, replica string, now time.Time) error {
func (c *HATracker) checkKVStore(ctx context.Context, key, replica, userID string, now time.Time) error {
return c.client.CAS(ctx, key, func(in any) (out any, retry bool, err error) {
if desc, ok := in.(*ReplicaDesc); ok && desc.DeletedAt == 0 {
// We don't need to CAS and update the timestamp in the KV store if the timestamp we've received
Expand All @@ -635,7 +639,13 @@ func (c *HATracker) checkKVStore(ctx context.Context, key, replica string, now t

// We shouldn't failover to accepting a new replica if the timestamp we've received this sample at
// is less than failover timeout amount of time since the timestamp in the KV store.
if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < c.cfg.FailoverTimeout {
failoverTimeout := c.cfg.FailoverTimeout
if c.limits != nil {
if t := c.limits.HATrackerFailoverTimeout(userID); t > 0 {
failoverTimeout = t
}
}
if desc.Replica != replica && now.Sub(timestamp.Time(desc.ReceivedAt)) < failoverTimeout {
return nil, false, ReplicasNotMatchError{replica: replica, elected: desc.Replica}
}
}
Expand Down
9 changes: 8 additions & 1 deletion pkg/ha/ha_tracker_http.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,13 +76,20 @@ func (h *HATracker) ServeHTTP(w http.ResponseWriter, req *http.Request) {
for key, desc := range h.elected {
chunks := strings.SplitN(key, "/", 2)

failoverTimeout := h.cfg.FailoverTimeout
if h.limits != nil {
if t := h.limits.HATrackerFailoverTimeout(chunks[0]); t > 0 {
failoverTimeout = t
}
}

electedReplicas = append(electedReplicas, replica{
UserID: chunks[0],
Cluster: chunks[1],
Replica: desc.Replica,
ElectedAt: timestamp.Time(desc.ReceivedAt),
UpdateTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.cfg.UpdateTimeout)),
FailoverTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(h.cfg.FailoverTimeout)),
FailoverTime: time.Until(timestamp.Time(desc.ReceivedAt).Add(failoverTimeout)),
})
}
h.electedLock.RUnlock()
Expand Down
5 changes: 5 additions & 0 deletions pkg/ha/ha_tracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -767,12 +767,17 @@ func TestReplicasNotMatchError(t *testing.T) {

// trackerLimits is a static HATrackerLimits implementation for tests:
// it returns the same configured values for every user.
type trackerLimits struct {
	maxReplicaGroups int           // returned by MaxHAReplicaGroups for any user
	failoverTimeout  time.Duration // returned by HATrackerFailoverTimeout; 0 means "use the global config value"
}

// MaxHAReplicaGroups implements HATrackerLimits. The user argument is
// ignored: the same limit applies to every user in tests.
func (l trackerLimits) MaxHAReplicaGroups(_ string) int {
	return l.maxReplicaGroups
}

// HATrackerFailoverTimeout implements HATrackerLimits. The user argument is
// ignored: the same timeout applies to every user in tests. A zero value
// makes the tracker fall back to its global FailoverTimeout config.
func (l trackerLimits) HATrackerFailoverTimeout(_ string) time.Duration {
	return l.failoverTimeout
}

func TestHATracker_MetricsCleanup(t *testing.T) {
t.Parallel()
reg := prometheus.NewPedanticRegistry()
Expand Down
1 change: 1 addition & 0 deletions pkg/util/validation/exporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ func TestOverridesExporter_withConfig(t *testing.T) {
cortex_overrides{limit_name="enforce_metadata_metric_name",user="tenant-a"} 1
cortex_overrides{limit_name="enforce_metric_name",user="tenant-a"} 1
cortex_overrides{limit_name="ha_max_clusters",user="tenant-a"} 0
cortex_overrides{limit_name="ha_tracker_failover_timeout",user="tenant-a"} 0
cortex_overrides{limit_name="ingestion_burst_size",user="tenant-a"} 50000
cortex_overrides{limit_name="ingestion_rate",user="tenant-a"} 25000
cortex_overrides{limit_name="ingestion_tenant_shard_size",user="tenant-a"} 0
Expand Down
8 changes: 8 additions & 0 deletions pkg/util/validation/limits.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,6 +138,7 @@ type Limits struct {
HAClusterLabel string `yaml:"ha_cluster_label" json:"ha_cluster_label"`
HAReplicaLabel string `yaml:"ha_replica_label" json:"ha_replica_label"`
HAMaxClusters int `yaml:"ha_max_clusters" json:"ha_max_clusters"`
HATrackerFailoverTimeout model.Duration `yaml:"ha_tracker_failover_timeout" json:"ha_tracker_failover_timeout"`
DropLabels flagext.StringSlice `yaml:"drop_labels" json:"drop_labels"`
MaxLabelNameLength int `yaml:"max_label_name_length" json:"max_label_name_length"`
MaxLabelValueLength int `yaml:"max_label_value_length" json:"max_label_value_length"`
Expand Down Expand Up @@ -281,6 +282,8 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) {
f.StringVar(&l.HAClusterLabel, "distributor.ha-tracker.cluster", "cluster", "Prometheus label to look for in samples to identify a Prometheus HA cluster.")
f.StringVar(&l.HAReplicaLabel, "distributor.ha-tracker.replica", "__replica__", "Prometheus label to look for in samples to identify a Prometheus HA replica.")
f.IntVar(&l.HAMaxClusters, "distributor.ha-tracker.max-clusters", 0, "Maximum number of clusters that HA tracker will keep track of for single user. 0 to disable the limit.")
_ = l.HATrackerFailoverTimeout.Set("0s")
f.Var(&l.HATrackerFailoverTimeout, "distributor.ha-tracker.failover-timeout-override", "Per-tenant failover timeout for the HA tracker. If set to 0, the global -distributor.ha-tracker.failover-timeout value is used.")
f.Var((*flagext.StringSliceCSV)(&l.PromoteResourceAttributes), "distributor.promote-resource-attributes", "Comma separated list of resource attributes that should be converted to labels.")
f.Var(&l.DropLabels, "distributor.drop-label", "This flag can be used to specify label names that to drop during sample ingestion within the distributor and can be repeated in order to drop multiple labels.")
f.BoolVar(&l.EnableTypeAndUnitLabels, "distributor.enable-type-and-unit-labels", false, "EXPERIMENTAL: If true, the __type__ and __unit__ labels are added to metrics. This applies to remote write v2 and OTLP requests.")
Expand Down Expand Up @@ -1070,6 +1073,11 @@ func (o *Overrides) MaxHAReplicaGroups(user string) int {
return o.GetOverridesForUser(user).HAMaxClusters
}

// HATrackerFailoverTimeout returns the per-tenant HA tracker failover timeout.
// A zero return value means the tenant has no override; callers (the HA
// tracker) then fall back to the global -distributor.ha-tracker.failover-timeout.
// The stored limit is a model.Duration, converted here to time.Duration.
func (o *Overrides) HATrackerFailoverTimeout(user string) time.Duration {
	return time.Duration(o.GetOverridesForUser(user).HATrackerFailoverTimeout)
}

// S3SSEType returns the per-tenant S3 SSE type.
func (o *Overrides) S3SSEType(user string) string {
return o.GetOverridesForUser(user).S3SSEType
Expand Down
7 changes: 7 additions & 0 deletions schemas/cortex-config-schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -5119,6 +5119,13 @@
"type": "string",
"x-cli-flag": "distributor.ha-tracker.replica"
},
"ha_tracker_failover_timeout": {
"default": "0s",
"description": "Per-tenant failover timeout for the HA tracker. If set to 0, the global -distributor.ha-tracker.failover-timeout value is used.",
"type": "string",
"x-cli-flag": "distributor.ha-tracker.failover-timeout-override",
"x-format": "duration"
},
"ingestion_burst_size": {
"default": 50000,
"description": "Per-user allowed ingestion burst size (in number of samples).",
Expand Down
Loading