Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
64f47b9
Allow watch streaming option for lister watcher to prevent high memor…
zkdlin211 Nov 19, 2025
7035198
amend
zkdlin211 Jan 12, 2026
b5acb6e
break the loop if bookmark is received, and return error if no bookma…
zkdlin211 Jan 12, 2026
92bd2b3
amend
zkdlin211 Jan 12, 2026
59d3376
Merge pull request #1038 from zkdlin211/lister-watcher-streaming
zkdlin211 Jan 12, 2026
976746c
Bump event-exporter verion to v0.5.9
zkdlin211 Jan 12, 2026
7d9741d
Merge pull request #1039 from zkdlin211/lister-watcher-streaming
zkdlin211 Jan 12, 2026
c2d68b6
Add restart attempt and uid labels for jobset events (#1040)
chelseychen Jan 14, 2026
f27de56
iBump event-exporter verion to v0.5.10 (#1042)
chelseychen Jan 14, 2026
0e0b237
Enable streaming listWatcher only if the required k8s feature gate is…
zkdlin211 Jan 15, 2026
b6f1fb8
Enable streaming listWatcher only if the required k8s feature gate is…
zkdlin211 Jan 15, 2026
41eb8fb
Merge pull request #1043 from zkdlin211/lister-watcher-streaming
zkdlin211 Jan 15, 2026
439e26b
Bump up golang.org/x/crypto to 0.45.0 for event-exporter
zkdlin211 Jan 15, 2026
16f1492
Merge pull request #1044 from zkdlin211/vul-fix
zkdlin211 Jan 15, 2026
8c71d26
update extractAllLabels function to filter out labels that aren't met…
jinyigke Jan 20, 2026
a207279
add a unit test and bump the version
jinyigke Jan 20, 2026
a53abe2
fix the test after updating extractAllLabels
jinyigke Jan 22, 2026
30000d2
Merge pull request #1045 from jinyigke/jinyi_filterMetriclabel
waterloong Jan 23, 2026
fcd00d0
Add restart attempt and jobset uid labels in pod owner transform
chelseychen Jan 26, 2026
22cb3b5
Merge pull request #1046 from chelseychen/restart-attempt
chelseychen Jan 26, 2026
f075aa7
Bump event-exporter verion to v0.5.11
chelseychen Jan 26, 2026
6c60316
Merge pull request #1047 from chelseychen/restart-attempt
chelseychen Jan 26, 2026
c750d3c
Add CODEOWNERS for top-level components
erain Mar 9, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/CODEOWNERS
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
# Default owner for the repository.
* @erain

# Repository-level configuration and shared docs.
/.github/ @erain
/README.md @erain
/CONTRIBUTING.md @erain
/hack/ @erain
/archived/ @erain

# Component ownership.
/custom-metrics-stackdriver-adapter/ @juli4n
/event-exporter/ @courageJ @erain
/fluentd-gcp-scaler/ @erain
/kubelet-to-gcm/ @erain
/prometheus-to-sd/ @erain
2 changes: 1 addition & 1 deletion event-exporter/Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ ALL_ARCH=amd64 arm64
IMAGE_NAME = event-exporter

PREFIX ?= staging-k8s.gcr.io
TAG ?= v0.5.8
TAG ?= v0.5.11

IMAGE=$(PREFIX)/$(IMAGE_NAME)

Expand Down
19 changes: 10 additions & 9 deletions event-exporter/event_exporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,20 +37,21 @@ func (e *eventExporter) Run(stopCh <-chan struct{}) {
utils.RunConcurrentlyUntil(stopCh, e.sink.Run, e.watcher.Run)
}

func newEventExporter(client kubernetes.Interface, sink sinks.Sink, resyncPeriod time.Duration, eventLabelSelector labels.Selector, listerWatcherOptionsLimit int64, storageType watchers.StorageType) *eventExporter {
func newEventExporter(client kubernetes.Interface, sink sinks.Sink, resyncPeriod time.Duration, eventLabelSelector labels.Selector, listerWatcherOptionsLimit int64, listerWatcherEnableStreaming bool, storageType watchers.StorageType) *eventExporter {
return &eventExporter{
sink: sink,
watcher: createWatcher(client, sink, resyncPeriod, eventLabelSelector, listerWatcherOptionsLimit, storageType),
watcher: createWatcher(client, sink, resyncPeriod, eventLabelSelector, listerWatcherOptionsLimit, listerWatcherEnableStreaming, storageType),
}
}

func createWatcher(client kubernetes.Interface, sink sinks.Sink, resyncPeriod time.Duration, eventLabelSelector labels.Selector, listerWatcherOptionsLimit int64, storageType watchers.StorageType) watchers.Watcher {
func createWatcher(client kubernetes.Interface, sink sinks.Sink, resyncPeriod time.Duration, eventLabelSelector labels.Selector, listerWatcherOptionsLimit int64, listerWatcherEnableStreaming bool, storageType watchers.StorageType) watchers.Watcher {
return events.NewEventWatcher(client, &events.EventWatcherConfig{
OnList: sink.OnList,
ResyncPeriod: resyncPeriod,
Handler: sink,
EventLabelSelector: eventLabelSelector,
ListerWatcherOptionsLimit: listerWatcherOptionsLimit,
StorageType: storageType,
OnList: sink.OnList,
ResyncPeriod: resyncPeriod,
Handler: sink,
EventLabelSelector: eventLabelSelector,
ListerWatcherOptionsLimit: listerWatcherOptionsLimit,
ListerWatcherEnableStreaming: listerWatcherEnableStreaming,
StorageType: storageType,
})
}
10 changes: 5 additions & 5 deletions event-exporter/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ require (
github.com/golang/glog v1.2.4
github.com/google/go-cmp v0.7.0
github.com/prometheus/client_golang v1.21.1
golang.org/x/net v0.39.0
golang.org/x/net v0.47.0
google.golang.org/api v0.231.0
k8s.io/api v0.33.3
k8s.io/apimachinery v0.33.3
Expand Down Expand Up @@ -55,11 +55,11 @@ require (
go.opentelemetry.io/otel v1.35.0 // indirect
go.opentelemetry.io/otel/metric v1.35.0 // indirect
go.opentelemetry.io/otel/trace v1.35.0 // indirect
golang.org/x/crypto v0.37.0 // indirect
golang.org/x/crypto v0.45.0 // indirect
golang.org/x/oauth2 v0.29.0 // indirect
golang.org/x/sys v0.32.0 // indirect
golang.org/x/term v0.31.0 // indirect
golang.org/x/text v0.24.0 // indirect
golang.org/x/sys v0.38.0 // indirect
golang.org/x/term v0.37.0 // indirect
golang.org/x/text v0.31.0 // indirect
golang.org/x/time v0.11.0 // indirect
google.golang.org/genproto/googleapis/api v0.0.0-20250414145226-207652e42e2e // indirect
google.golang.org/genproto/googleapis/rpc v0.0.0-20250425173222-7b384671a197 // indirect
Expand Down
28 changes: 14 additions & 14 deletions event-exporter/go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -136,42 +136,42 @@ go.uber.org/goleak v1.3.0/go.mod h1:CoHD4mav9JJNrW/WLlf7HGZPjdw8EucARQHekz1X6bE=
golang.org/x/crypto v0.0.0-20190308221718-c2843e01d9a2/go.mod h1:djNgcEr1/C05ACkg1iLfiJU5Ep61QUkGW8qpdssI0+w=
golang.org/x/crypto v0.0.0-20191011191535-87dc89f01550/go.mod h1:yigFU9vqHzYiE8UmvKecakEJjdnWj3jj499lnFckfCI=
golang.org/x/crypto v0.0.0-20200622213623-75b288015ac9/go.mod h1:LzIPMQfyMNhhGPhUkYOs5KpL4U8rLKemX1yGLhDgUto=
golang.org/x/crypto v0.37.0 h1:kJNSjF/Xp7kU0iB2Z+9viTPMW4EqqsrywMXLJOOsXSE=
golang.org/x/crypto v0.37.0/go.mod h1:vg+k43peMZ0pUMhYmVAWysMK35e6ioLh3wB8ZCAfbVc=
golang.org/x/crypto v0.45.0 h1:jMBrvKuj23MTlT0bQEOBcAE0mjg8mK9RXFhRH6nyF3Q=
golang.org/x/crypto v0.45.0/go.mod h1:XTGrrkGJve7CYK7J8PEww4aY7gM3qMCElcJQ8n8JdX4=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20190404232315-eb5bcb51f2a3/go.mod h1:t9HGtf8HONx5eT2rtn7q6eTqICYqUVnKs3thJo3Qplg=
golang.org/x/net v0.0.0-20190620200207-3b0461eec859/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20200226121028-0de0cce0169b/go.mod h1:z5CRVTTTmAJ677TzLLGU+0bjPO0LkuOLi4/5GtJWs/s=
golang.org/x/net v0.0.0-20201021035429-f5854403a974/go.mod h1:sp8m0HH+o8qH0wwXwYZr8TS3Oi6o0r6Gce1SSxlDquU=
golang.org/x/net v0.39.0 h1:ZCu7HMWDxpXpaiKdhzIfaltL9Lp31x/3fCP11bc6/fY=
golang.org/x/net v0.39.0/go.mod h1:X7NRbYVEA+ewNkCNyJ513WmMdQ3BineSwVtN2zD/d+E=
golang.org/x/net v0.47.0 h1:Mx+4dIFzqraBXUugkia1OOvlD6LemFo1ALMHjrXDOhY=
golang.org/x/net v0.47.0/go.mod h1:/jNxtkgq5yWUGYkaZGqo27cfGZ1c5Nen03aYrrKpVRU=
golang.org/x/oauth2 v0.29.0 h1:WdYw2tdTK1S8olAzWHdgeqfy+Mtm9XNhv/xJsY65d98=
golang.org/x/oauth2 v0.29.0/go.mod h1:onh5ek6nERTohokkhCD/y2cV4Do3fxFHFuAejCkRWT8=
golang.org/x/sync v0.0.0-20190423024810-112230192c58/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.0.0-20201020160332-67f06af15bc9/go.mod h1:RxMgew5VJxzue5/jJTE5uejpjVlOe/izrB70Jof72aM=
golang.org/x/sync v0.13.0 h1:AauUjRAJ9OSnvULf/ARrrVywoJDy0YS2AwQ98I37610=
golang.org/x/sync v0.13.0/go.mod h1:1dzgHSNfp02xaA81J2MS99Qcpr2w7fw1gpm99rleRqA=
golang.org/x/sync v0.18.0 h1:kr88TuHDroi+UVf+0hZnirlk8o8T+4MrK6mr60WkH/I=
golang.org/x/sync v0.18.0/go.mod h1:9KTHXmSnoGruLpwFjVSX0lNNA75CykiMECbovNTZqGI=
golang.org/x/sys v0.0.0-20190215142949-d0b11bdaac8a/go.mod h1:STP8DvDyc/dI5b8T5hshtkjS+E42TnysNCUPdjciGhY=
golang.org/x/sys v0.0.0-20190412213103-97732733099d/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.0.0-20200930185726-fdedc70b468f/go.mod h1:h1NjWce9XRLGQEsW7wpKNCjG9DtNlClVuFLEZdDNbEs=
golang.org/x/sys v0.32.0 h1:s77OFDvIQeibCmezSnk/q6iAfkdiQaJi4VzroCFrN20=
golang.org/x/sys v0.32.0/go.mod h1:BJP2sWEmIv4KK5OTEluFJCKSidICx8ciO85XgH3Ak8k=
golang.org/x/term v0.31.0 h1:erwDkOK1Msy6offm1mOgvspSkslFnIGsFnxOKoufg3o=
golang.org/x/term v0.31.0/go.mod h1:R4BeIy7D95HzImkxGkTW1UQTtP54tio2RyHz7PwK0aw=
golang.org/x/sys v0.38.0 h1:3yZWxaJjBmCWXqhN1qh02AkOnCQ1poK6oF+a7xWL6Gc=
golang.org/x/sys v0.38.0/go.mod h1:OgkHotnGiDImocRcuBABYBEXf8A9a87e/uXjp9XT3ks=
golang.org/x/term v0.37.0 h1:8EGAD0qCmHYZg6J17DvsMy9/wJ7/D/4pV/wfnld5lTU=
golang.org/x/term v0.37.0/go.mod h1:5pB4lxRNYYVZuTLmy8oR2BH8dflOR+IbTYFD8fi3254=
golang.org/x/text v0.3.0/go.mod h1:NqM8EUOU14njkJ3fqMW+pc6Ldnwhi/IjpwHt7yyuwOQ=
golang.org/x/text v0.3.3/go.mod h1:5Zoc/QRtKVWzQhOtBMvqHzDpF6irO9z98xDceosuGiQ=
golang.org/x/text v0.24.0 h1:dd5Bzh4yt5KYA8f9CJHCP4FB4D51c2c6JvN37xJJkJ0=
golang.org/x/text v0.24.0/go.mod h1:L8rBsPeo2pSS+xqN0d5u2ikmjtmoJbDBT1b7nHvFCdU=
golang.org/x/text v0.31.0 h1:aC8ghyu4JhP8VojJ2lEHBnochRno1sgL6nEi9WGFGMM=
golang.org/x/text v0.31.0/go.mod h1:tKRAlv61yKIjGGHX/4tP1LTbc13YSec1pxVEWXzfoeM=
golang.org/x/time v0.11.0 h1:/bpjEDfN9tkoN/ryeYHnv5hcMlc8ncjMcM4XBk5NWV0=
golang.org/x/time v0.11.0/go.mod h1:CDIdPxbZBQxdj6cxyCIdrNogrJKMJ7pr37NYpMcMDSg=
golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ=
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20200619180055-7c47624df98f/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/tools v0.0.0-20210106214847-113979e3529a/go.mod h1:emZCQorbCU4vsT4fOWvOPXz4eW1wZW4PmDk9uLelYpA=
golang.org/x/tools v0.26.0 h1:v/60pFQmzmT9ExmjDv2gGIfi3OqfKoEP6I5+umXlbnQ=
golang.org/x/tools v0.26.0/go.mod h1:TPVVj70c7JJ3WCazhD8OdXcZg/og+b9+tH/KxylGwH0=
golang.org/x/tools v0.38.0 h1:Hx2Xv8hISq8Lm16jvBZ2VQf+RLmbd7wVUsALibYI/IQ=
golang.org/x/tools v0.38.0/go.mod h1:yEsQ/d/YK8cjh0L6rZlY8tgtlKiBNTL14pGDJPJpYQs=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191204190536-9bdfabe68543/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down
8 changes: 5 additions & 3 deletions event-exporter/kubernetes/podlabels/label_utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,9 +5,11 @@ import (
)

const (
ownerTypeKeyName = "logging.gke.io/top_level_controller_type"
ownerNameKeyName = "logging.gke.io/top_level_controller_name"
jobSetNameLabelKey = "jobset.sigs.k8s.io/jobset-name"
ownerTypeKeyName = "logging.gke.io/top_level_controller_type"
ownerNameKeyName = "logging.gke.io/top_level_controller_name"
jobSetNameLabelKey = "jobset.sigs.k8s.io/jobset-name"
jobSetRestartAttemptLabelKey = "jobset.sigs.k8s.io/restart-attempt"
jobsetUIDLabelKey = "jobset.sigs.k8s.io/jobset-uid"
)

// matches suffixes containing number between 20000000 to 59999999
Expand Down
14 changes: 14 additions & 0 deletions event-exporter/kubernetes/podlabels/pod_labels_informer.go
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,12 @@ func NewPodLabelsSharedInformerFactory(client kubernetes.Interface, ignoredNames
if v, ok := pod.Labels[jobSetNameLabelKey]; ok {
labels[jobSetNameLabelKey] = v
}
if v, ok := pod.Labels[jobSetRestartAttemptLabelKey]; ok {
labels[jobSetRestartAttemptLabelKey] = v
}
if v, ok := pod.Labels[jobsetUIDLabelKey]; ok {
labels[jobsetUIDLabelKey] = v
}
return &corev1.Pod{
ObjectMeta: metav1.ObjectMeta{
Name: pod.Name,
Expand Down Expand Up @@ -121,6 +127,14 @@ func getLabelsFromPod(pod *corev1.Pod) map[string]string {
// Pod that is eventually owned by a JobSet has the jobset name label set.
transformedLabels[ownerTypeKeyName] = "JobSet"
transformedLabels[ownerNameKeyName] = jobsetName

// Add restart_attempt and uid labels for JobSet events.
if restartAttempt, ok := pod.GetObjectMeta().GetLabels()[jobSetRestartAttemptLabelKey]; ok {
transformedLabels[jobSetRestartAttemptLabelKey] = restartAttempt
}
if uid, ok := pod.GetObjectMeta().GetLabels()[jobsetUIDLabelKey]; ok {
transformedLabels[jobsetUIDLabelKey] = uid
}
} else {
transformedLabels[ownerTypeKeyName] = "Job"
transformedLabels[ownerNameKeyName] = owner.Name
Expand Down
20 changes: 20 additions & 0 deletions event-exporter/kubernetes/podlabels/pod_labels_informer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,6 +153,26 @@ func TestGetLabelsFromPod(t *testing.T) {
"logging.gke.io/top_level_controller_name": "training-sample",
},
},
{
description: "correct jobset owner, restart attempt and uid labels returned for pod",
pod: makePod(mockPodOptions{
Labels: map[string]string{
"jobset.sigs.k8s.io/jobset-name": "training-sample",
"jobset.sigs.k8s.io/restart-attempt": "5",
"jobset.sigs.k8s.io/jobset-uid": "fake-uid",
},
OwnerReference: metav1.OwnerReference{
Kind: "Job",
Name: "training-sample-0",
},
}),
wantLabels: map[string]string{
"logging.gke.io/top_level_controller_type": "JobSet",
"logging.gke.io/top_level_controller_name": "training-sample",
"jobset.sigs.k8s.io/restart-attempt": "5",
"jobset.sigs.k8s.io/jobset-uid": "fake-uid",
},
},
}

for _, tc := range testCases {
Expand Down
133 changes: 117 additions & 16 deletions event-exporter/kubernetes/watchers/events/watcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ package events

import (
"context"
"errors"
"regexp"
"time"

corev1 "k8s.io/api/core/v1"
Expand All @@ -29,6 +31,7 @@ import (
"k8s.io/client-go/tools/cache"

"github.com/GoogleCloudPlatform/k8s-stackdriver/event-exporter/kubernetes/watchers"
"github.com/golang/glog"
)

const (
Expand Down Expand Up @@ -59,32 +62,38 @@ type EventWatcherConfig struct {
// there can be many, e.g. because of network problems. Note also, that
// items in the List response WILL NOT trigger OnAdd method in handler,
// instead Store contents will be completely replaced.
OnList OnListFunc
ResyncPeriod time.Duration
Handler EventHandler
EventLabelSelector labels.Selector
ListerWatcherOptionsLimit int64
StorageType watchers.StorageType
OnList OnListFunc
ResyncPeriod time.Duration
Handler EventHandler
EventLabelSelector labels.Selector
ListerWatcherOptionsLimit int64
ListerWatcherEnableStreaming bool
StorageType watchers.StorageType
}

// NewEventWatcher create a new watcher that only watches the events resource.
func NewEventWatcher(client kubernetes.Interface, config *EventWatcherConfig) watchers.Watcher {
watchListFeatureGateEnabled := IsFeatureGateEnabled(client, "WatchList")
glog.Infof("Feature gate WatchList is enabled: %v", watchListFeatureGateEnabled)
return watchers.NewWatcher(&watchers.WatcherConfig{
// List and watch events in all namespaces.
ListerWatcher: &cache.ListWatch{
ListFunc: func(options meta_v1.ListOptions) (runtime.Object, error) {
if config.ListerWatcherOptionsLimit > 0 {
options.Limit = config.ListerWatcherOptionsLimit
if config.ListerWatcherEnableStreaming && watchListFeatureGateEnabled {
return streamingListEvents(client, config, options)
} else {
if config.ListerWatcherOptionsLimit > 0 {
options.Limit = config.ListerWatcherOptionsLimit
}
options.LabelSelector = config.EventLabelSelector.String()
list, err := client.CoreV1().Events(meta_v1.NamespaceAll).List(context.TODO(), options)
if err == nil {
config.OnList(list)
}
return list, err
}
options.LabelSelector = config.EventLabelSelector.String()
list, err := client.CoreV1().Events(meta_v1.NamespaceAll).List(context.TODO(), options)
if err == nil {
config.OnList(list)
// Clear items to prevent Reflector from buffering them in memeory.
list.Items = []corev1.Event{}
}
return list, err
},

WatchFunc: func(options meta_v1.ListOptions) (watch.Interface, error) {
options.LabelSelector = config.EventLabelSelector.String()
return client.CoreV1().Events(meta_v1.NamespaceAll).Watch(context.TODO(), options)
Expand All @@ -101,3 +110,95 @@ func NewEventWatcher(client kubernetes.Interface, config *EventWatcherConfig) wa
WatchListPageSize: eventWatchListPageSize,
})
}

// streamingListEvents uses Streaming List (SendInitialEvents=true) to avoid buffering.
// This allows us to process initial events incrementally.
func streamingListEvents(client kubernetes.Interface, config *EventWatcherConfig, options meta_v1.ListOptions) (runtime.Object, error) {
sendInitialEvents := true
options.SendInitialEvents = &sendInitialEvents
options.ResourceVersionMatch = meta_v1.ResourceVersionMatchNotOlderThan
options.Watch = true
options.LabelSelector = config.EventLabelSelector.String()
options.AllowWatchBookmarks = true

// Perform the streaming list (actually a Watch)
watcher, err := client.CoreV1().Events(meta_v1.NamespaceAll).Watch(context.TODO(), options)
if err != nil {
return nil, err
}
defer watcher.Stop()

// Call OnList once to start the sink (it just logs "Started watching")
config.OnList(&corev1.EventList{})

lastRV := ""
bookmarkReceived := false

eventLoop:
for event := range watcher.ResultChan() {
if meta, ok := event.Object.(meta_v1.Object); ok {
lastRV = meta.GetResourceVersion()
}

switch event.Type {
case watch.Added:
if e, ok := event.Object.(*corev1.Event); ok {
// Manually pass to handler since we bypass Reflector's store
config.Handler.OnAdd(e)
}
case watch.Bookmark:
// Check for the annotation that signals the initial list is done.
if m, ok := event.Object.(meta_v1.Object); ok {
if val, ok := m.GetAnnotations()["k8s.io/initial-events-end"]; ok && val == "true" {
// Close the channel and break the loop
bookmarkReceived = true
break eventLoop
}
}
case watch.Error:
// If we get an error, Reflector will retry ListFunc anyway.
// We can return the error here to trigger that retry.
if status, ok := event.Object.(*meta_v1.Status); ok {
return nil, errors.New(status.Message)
}
}
}

if !bookmarkReceived {
// If we exited the loop without receiving the bookmark, something went wrong.
return nil, errors.New("streaming list ended without receiving initial-events-end bookmark")
}

// Return an empty list with the correct ResourceVersion.
// Reflector will then start Watching from this version.
return &corev1.EventList{
ListMeta: meta_v1.ListMeta{
ResourceVersion: lastRV,
},
Items: []corev1.Event{},
}, nil
}

func IsFeatureGateEnabled(client kubernetes.Interface, featureName string) bool {
// Request raw metrics from the API server
data, err := client.CoreV1().RESTClient().Get().
AbsPath("/metrics").
SetHeader("Accept", "text/plain").
DoRaw(context.TODO())

if err != nil {
glog.Errorf("fail to get raw metrics: %v", err)
return false
}

// Pattern explained:
// 1. Match the metric name and the feature name label
// 2. [^}]* matches any other labels (like stage="BETA")
// 3. \s+ matches the whitespace before the value
// 4. (1(\.0+)?) matches "1" or "1.0", "1.00", etc.
// 5. (\s+|$) ensures it's the end of the value (whitespace or end of line)
pattern := `kubernetes_feature_enabled\{name="` + featureName + `"[^}]*\}\s+(1(\.0+)?)(\s+|$)`
re := regexp.MustCompile(pattern)

return re.Match(data)
}
Loading
Loading