Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
# Changelog

## master / unreleased
* [ENHANCEMENT] Metrics Helper: Add native histogram support for aggregating and merging, including dual-format histogram handling that exposes both native and classic bucket formats. #6489
* [ENHANCEMENT] Cache: Add per-tenant TTL configuration for query results cache to control cache expiration on a per-tenant basis with separate TTLs for regular and out-of-order data. #5039

## 1.21.0 in progress
Expand Down
296 changes: 282 additions & 14 deletions pkg/util/metrics_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"errors"
"fmt"
"math"
"sort"
"strings"
"sync"

Expand Down Expand Up @@ -467,10 +468,77 @@ func (s *SummaryData) Metric(desc *prometheus.Desc, labelValues ...string) prome
}

// HistogramData keeps data required to build histogram Metric.
//
// For native histograms, the Schema and ZeroThreshold are set from the first
// native histogram encountered. All aggregated histograms should use the same
// schema, as bucket indices have different meanings across different schemas.
type HistogramData struct {
	// Classic histogram state: total observation count, sum of all
	// observations, and cumulative bucket counts keyed by upper bound.
	sampleCount uint64
	sampleSum   float64
	buckets     map[float64]uint64

	// Native histogram fields

	// nativeMode is flipped to true when the first native histogram is
	// aggregated; queried via hasNative.
	nativeMode bool
	// Schema is the native histogram resolution, copied from the first
	// native histogram added (see the type comment above).
	Schema int32
	// ZeroThreshold is the width of the zero bucket, copied from the first
	// native histogram added.
	ZeroThreshold float64
	// ZeroCount is the aggregated number of observations in the zero bucket.
	ZeroCount uint64
	// PositiveBuckets and NegativeBuckets hold aggregated counts for the
	// positive and negative ranges of the native histogram.
	PositiveBuckets map[int]int64 // bucket index -> count
	NegativeBuckets map[int]int64
}

// isNative reports whether the dto.Histogram carries native histogram data.
// The authoritative signal is a non-nil Schema field, which client_golang
// always populates for native histograms regardless of observation count.
// Spans alone are insufficient: a zero-observation dual histogram has no
// spans but still has Schema set.
func isNative(histo *dto.Histogram) bool {
	hasSchema := histo.Schema != nil
	return hasSchema
}

// hasNative reports whether any native histogram data has been aggregated
// into this HistogramData (nativeMode is set on the first native histogram added).
func (d *HistogramData) hasNative() bool {
	return d.nativeMode
}

// spansCountsToBucketMap converts native histogram spans and absolute bucket
// counts into a map of bucket index -> count for easier aggregation.
// Returns nil when there are no spans.
func spansCountsToBucketMap(spans []*dto.BucketSpan, counts []int64) map[int]int64 {
	if len(spans) == 0 {
		return nil
	}
	result := make(map[int]int64, len(counts))
	pos := 0 // position in counts
	var bucketIndex int32
	for _, span := range spans {
		bucketIndex += span.GetOffset()
		for j := uint32(0); j < span.GetLength(); j++ {
			if pos >= len(counts) {
				// Spans claim more buckets than counts provided; stop here.
				return result
			}
			result[int(bucketIndex)] += counts[pos]
			bucketIndex++
			pos++
		}
	}
	return result
}

// deltasToCountsInt converts delta-encoded bucket counts to absolute counts.
//
// Native histogram protos encode each bucket count as a delta relative to
// the previous bucket; this expands the deltas back into a running prefix
// sum. A nil or empty input yields an empty slice.
func deltasToCountsInt(deltas []int64) []int64 {
	counts := make([]int64, len(deltas))
	var cur int64
	for i, d := range deltas {
		// d is already int64 — the previous int64(d) conversion was redundant.
		cur += d
		counts[i] = cur
	}
	return counts
}

// mergeBucketMaps accumulates every bucket count from src into dst and
// returns dst, allocating it first when dst is nil. Counts sharing a bucket
// index are summed.
func mergeBucketMaps(dst, src map[int]int64) map[int]int64 {
	if dst == nil {
		dst = map[int]int64{}
	}
	for bucketIdx, c := range src {
		dst[bucketIdx] += c
	}
	return dst
}

// AddHistogram adds histogram from gathered metrics to this histogram data.
Expand All @@ -481,6 +549,26 @@ func (d *HistogramData) AddHistogram(histo *dto.Histogram) {
d.sampleCount += histo.GetSampleCount()
d.sampleSum += histo.GetSampleSum()

if isNative(histo) {
// Initialize schema/threshold on first native histogram
if !d.hasNative() {
d.Schema = histo.GetSchema()
d.ZeroThreshold = histo.GetZeroThreshold()
d.nativeMode = true
}
d.ZeroCount += histo.GetZeroCount()

posCounts := deltasToCountsInt(histo.GetPositiveDelta())
negCounts := deltasToCountsInt(histo.GetNegativeDelta())

posMap := spansCountsToBucketMap(histo.GetPositiveSpan(), posCounts)
negMap := spansCountsToBucketMap(histo.GetNegativeSpan(), negCounts)

d.PositiveBuckets = mergeBucketMaps(d.PositiveBuckets, posMap)
d.NegativeBuckets = mergeBucketMaps(d.NegativeBuckets, negMap)
}

// Always collect classic buckets
histoBuckets := histo.GetBucket()
if len(histoBuckets) > 0 && d.buckets == nil {
d.buckets = map[float64]uint64{}
Expand All @@ -499,7 +587,18 @@ func (d *HistogramData) AddHistogram(histo *dto.Histogram) {
func (d *HistogramData) AddHistogramData(histo HistogramData) {
d.sampleCount += histo.sampleCount
d.sampleSum += histo.sampleSum
if histo.hasNative() {
if !d.hasNative() {
d.Schema = histo.Schema
d.ZeroThreshold = histo.ZeroThreshold
d.nativeMode = true
}
d.ZeroCount += histo.ZeroCount
d.PositiveBuckets = mergeBucketMaps(d.PositiveBuckets, histo.PositiveBuckets)
d.NegativeBuckets = mergeBucketMaps(d.NegativeBuckets, histo.NegativeBuckets)
}

// Always merge classic buckets.
if len(histo.buckets) > 0 && d.buckets == nil {
d.buckets = map[float64]uint64{}
}
Expand All @@ -510,11 +609,84 @@ func (d *HistogramData) AddHistogramData(histo HistogramData) {
}
}

// nativeHistogramMetric is essentially the same as the constNativeHistogram
// struct in prometheus' histogram.go. We need this separate type because the
// existing prometheus.NewConstNativeHistogram constructor does not populate
// the classic histogram fields; without it, native-histogram-compatible
// metrics would be exposed only in the native format and the classic
// histogram buckets would be lost.
type nativeHistogramMetric struct {
	desc *prometheus.Desc
	dto.Histogram
	labelPairs []*dto.LabelPair
}

// Desc returns the descriptor for this metric.
func (m *nativeHistogramMetric) Desc() *prometheus.Desc { return m.desc }

// Write exposes the embedded histogram and label pairs via out. Note that
// out.Histogram points at the embedded struct rather than a copy.
func (m *nativeHistogramMetric) Write(out *dto.Metric) error {
	out.Histogram = &m.Histogram
	out.Label = m.labelPairs
	return nil
}

// Metric returns prometheus metric from this histogram data.
//
// Note that the returned metric shares state with this HistogramData (the
// classic bucket map in the non-native case), so avoid doing more
// modifications to this HistogramData after calling Metric.
func (d *HistogramData) Metric(desc *prometheus.Desc, labelValues ...string) prometheus.Metric {
	if !d.hasNative() {
		return prometheus.MustNewConstHistogram(desc, d.sampleCount, d.sampleSum, d.buckets, labelValues...)
	}

	// Re-encode the aggregated bucket maps into the spans+deltas form used
	// by the native histogram proto representation.
	positiveSpans, positiveDeltas := makeBucketsFromMap(d.PositiveBuckets)
	negativeSpans, negativeDeltas := makeBucketsFromMap(d.NegativeBuckets)

	schema := d.Schema
	zeroThreshold := d.ZeroThreshold
	sampleCount := d.sampleCount
	sampleSum := d.sampleSum
	zeroCount := d.ZeroCount

	// Expose classic buckets alongside the native data when available,
	// sorted by upper bound.
	var classicBuckets []*dto.Bucket
	if len(d.buckets) > 0 {
		classicBuckets = make([]*dto.Bucket, 0, len(d.buckets))
		for ub, cc := range d.buckets {
			upperBound, cumCount := ub, cc
			classicBuckets = append(classicBuckets, &dto.Bucket{
				UpperBound:      &upperBound,
				CumulativeCount: &cumCount,
			})
		}
		sort.Slice(classicBuckets, func(i, j int) bool {
			return classicBuckets[i].GetUpperBound() < classicBuckets[j].GetUpperBound()
		})
	}

	// A native histogram with no observations still needs a sentinel
	// zero-length span so consumers can distinguish it from a classic
	// histogram. This matches the prometheus behavior (histogram.go:1958).
	if zeroThreshold == 0 && zeroCount == 0 && len(positiveSpans) == 0 && len(negativeSpans) == 0 {
		positiveSpans = []*dto.BucketSpan{{
			Offset: proto.Int32(0),
			Length: proto.Uint32(0),
		}}
	}

	// Construct the histogram in place within the metric struct.
	return &nativeHistogramMetric{
		desc: desc,
		Histogram: dto.Histogram{
			Schema:        &schema,
			ZeroThreshold: &zeroThreshold,
			SampleCount:   &sampleCount,
			SampleSum:     &sampleSum,
			ZeroCount:     &zeroCount,
			PositiveSpan:  positiveSpans,
			PositiveDelta: positiveDeltas,
			NegativeSpan:  negativeSpans,
			NegativeDelta: negativeDeltas,
			Bucket:        classicBuckets,
		},
		labelPairs: prometheus.MakeLabelPairs(desc, labelValues),
	}
}

Expand Down Expand Up @@ -897,24 +1069,120 @@ func mergeCounter(mf1, mf2 *dto.Metric) {
}

// mergeHistogram merges the histogram of mf2 into mf1 in place. Both the
// native and classic representations are merged, so dual-format histograms
// keep exposing both formats after the merge.
func mergeHistogram(mf1, mf2 *dto.Metric) {
	h1 := mf1.Histogram
	h2 := mf2.Histogram

	// Aggregate observation count and sum. Use the nil-safe getters rather
	// than raw derefs: a histogram with these fields unset must not panic.
	newSampleCount := h1.GetSampleCount() + h2.GetSampleCount()
	newSampleSum := h1.GetSampleSum() + h2.GetSampleSum()
	h1.SampleCount = &newSampleCount
	h1.SampleSum = &newSampleSum

	// Merge native histogram data if present.
	// We process both native AND classic data and expose both formats.
	if isNative(h1) || isNative(h2) {
		// Use schema/threshold from whichever side has native data (they should match).
		if !isNative(h1) {
			schema := h2.GetSchema()
			h1.Schema = &schema
			zt := h2.GetZeroThreshold()
			h1.ZeroThreshold = &zt
		}

		// Merge zero bucket counts.
		zc := h1.GetZeroCount() + h2.GetZeroCount()
		h1.ZeroCount = &zc

		// Convert spans+deltas to bucket maps, merge them, then convert back.
		posMap := mergeBucketMaps(
			spansCountsToBucketMap(h1.GetPositiveSpan(), deltasToCountsInt(h1.GetPositiveDelta())),
			spansCountsToBucketMap(h2.GetPositiveSpan(), deltasToCountsInt(h2.GetPositiveDelta())),
		)
		negMap := mergeBucketMaps(
			spansCountsToBucketMap(h1.GetNegativeSpan(), deltasToCountsInt(h1.GetNegativeDelta())),
			spansCountsToBucketMap(h2.GetNegativeSpan(), deltasToCountsInt(h2.GetNegativeDelta())),
		)

		h1.PositiveSpan, h1.PositiveDelta = makeBucketsFromMap(posMap)
		h1.NegativeSpan, h1.NegativeDelta = makeBucketsFromMap(negMap)
	}

	// Merge classic histogram buckets if present. This is the single merge
	// path for classic buckets, so mf2's buckets are counted exactly once.
	if len(h1.GetBucket()) > 0 || len(h2.GetBucket()) > 0 {
		bucketMap := map[float64]uint64{}
		for _, bucket := range append(h1.GetBucket(), h2.GetBucket()...) {
			bucketMap[bucket.GetUpperBound()] += bucket.GetCumulativeCount()
		}

		newBucket := make([]*dto.Bucket, 0, len(bucketMap))
		for upperBound, cumulativeCount := range bucketMap {
			ubValue, ccValue := upperBound, cumulativeCount
			newBucket = append(newBucket, &dto.Bucket{UpperBound: &ubValue, CumulativeCount: &ccValue})
		}
		h1.Bucket = newBucket
	}
}

// makeBucketsFromMap converts a bucket index->count map back into the
// spans+deltas encoding used by the native histogram proto representation.
// (Go doc convention: the comment must start with the function's actual
// name — the previous comment referred to it as bucketMapToSpansDeltas.)
//
// This implementation is the same as makeBucketsFromMap from prometheus
// (histogram.go:2006) to include the gap-filling optimization.
// Returns (nil, nil) for an empty or nil input map.
func makeBucketsFromMap(buckets map[int]int64) ([]*dto.BucketSpan, []int64) {
	if len(buckets) == 0 {
		return nil, nil
	}
	// Process bucket indices in ascending order for a deterministic encoding.
	var ii []int
	for k := range buckets {
		ii = append(ii, k)
	}
	sort.Ints(ii)

	var (
		spans     []*dto.BucketSpan
		deltas    []int64
		prevCount int64
		nextI     int
	)

	// appendDelta extends the current (last) span by one bucket and records
	// the count as a delta from the previous bucket's count.
	appendDelta := func(count int64) {
		*spans[len(spans)-1].Length++
		deltas = append(deltas, count-prevCount)
		prevCount = count
	}

	for n, i := range ii {
		count := buckets[i]
		// Multiple spans with only small gaps in between are probably
		// encoded more efficiently as one larger span with a few empty
		// buckets. Needs some research to find the sweet spot. For now,
		// we assume that gaps of one or two buckets should not create
		// a new span.
		iDelta := int32(i - nextI)
		if n == 0 || iDelta > 2 {
			// We have to create a new span, either because we are
			// at the very beginning, or because we have found a gap
			// of more than two buckets.
			spans = append(spans, &dto.BucketSpan{
				Offset: proto.Int32(iDelta),
				Length: proto.Uint32(0),
			})
		} else {
			// We have found a small gap (or no gap at all).
			// Insert empty buckets as needed.
			for range iDelta {
				appendDelta(0)
			}
		}
		appendDelta(count)
		nextI = i + 1
	}
	return spans, deltas
}

func mergeSummary(mf1 *dto.Metric, mf2 *dto.Metric) {
Expand Down
Loading
Loading