Skip to content

Commit 689619f

Browse files
committed
refactor(timer): centralize timer management with utility functions
1 parent 4550129 commit 689619f

9 files changed

Lines changed: 187 additions & 58 deletions

File tree

pkg/ring/basic_lifecycler.go

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616

1717
"github.com/cortexproject/cortex/pkg/ring/kv"
1818
"github.com/cortexproject/cortex/pkg/util/services"
19+
utiltimer "github.com/cortexproject/cortex/pkg/util/timer"
1920
)
2021

2122
type BasicLifecyclerDelegate interface {
@@ -328,7 +329,7 @@ func (l *BasicLifecycler) waitStableTokens(ctx context.Context, period time.Dura
328329
// The first observation will occur after the specified period.
329330
level.Info(l.logger).Log("msg", "waiting stable tokens", "ring", l.ringName)
330331
observeTimer := time.NewTimer(period)
331-
defer stopAndDrainTimer(observeTimer)
332+
defer utiltimer.StopAndDrainTimer(observeTimer)
332333
observeChan := observeTimer.C
333334

334335
for {
@@ -337,7 +338,7 @@ func (l *BasicLifecycler) waitStableTokens(ctx context.Context, period time.Dura
337338
if !l.verifyTokens(ctx) {
338339
// The verification has failed
339340
level.Info(l.logger).Log("msg", "tokens verification failed, keep observing", "ring", l.ringName)
340-
resetTimer(observeTimer, period)
341+
utiltimer.ResetTimer(observeTimer, period)
341342
break
342343
}
343344

pkg/ring/kv/dynamodb/client.go

Lines changed: 3 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/cortexproject/cortex/pkg/ring/kv/codec"
1515
"github.com/cortexproject/cortex/pkg/util/backoff"
16+
utiltimer "github.com/cortexproject/cortex/pkg/util/timer"
1617
)
1718

1819
const (
@@ -267,7 +268,7 @@ func (c *Client) WatchKey(ctx context.Context, key string, f func(any) bool) {
267268
}
268269

269270
bo.Reset()
270-
resetTimer(syncTimer, c.pullerSyncTime)
271+
utiltimer.ResetTimer(syncTimer, c.pullerSyncTime)
271272
select {
272273
case <-ctx.Done():
273274
return
@@ -305,7 +306,7 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
305306
}
306307

307308
bo.Reset()
308-
resetTimer(syncTimer, c.pullerSyncTime)
309+
utiltimer.ResetTimer(syncTimer, c.pullerSyncTime)
309310
select {
310311
case <-ctx.Done():
311312
return
@@ -314,16 +315,6 @@ func (c *Client) WatchPrefix(ctx context.Context, prefix string, f func(string,
314315
}
315316
}
316317

317-
func resetTimer(timer *time.Timer, d time.Duration) {
318-
if !timer.Stop() {
319-
select {
320-
case <-timer.C:
321-
default:
322-
}
323-
}
324-
timer.Reset(d)
325-
}
326-
327318
func (c *Client) decodeMultikey(data map[string]dynamodbItem) (codec.MultiKey, error) {
328319
multiKeyData := make(map[string][]byte, len(data))
329320
for key, ddbItem := range data {

pkg/ring/kv/dynamodb/client_timer_benchmark_test.go

Lines changed: 3 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,8 @@ package dynamodb
33
import (
44
"testing"
55
"time"
6+
7+
utiltimer "github.com/cortexproject/cortex/pkg/util/timer"
68
)
79

810
func BenchmarkWatchLoopWaitWithTimeAfter(b *testing.B) {
@@ -30,7 +32,7 @@ func BenchmarkWatchLoopWaitWithReusableTimer(b *testing.B) {
3032
b.ReportAllocs()
3133

3234
for b.Loop() {
33-
resetTimer(timer, interval)
35+
utiltimer.ResetTimer(timer, interval)
3436

3537
select {
3638
case <-ctx.Done():

pkg/ring/lifecycler.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@ import (
2121
"github.com/cortexproject/cortex/pkg/ring/kv"
2222
"github.com/cortexproject/cortex/pkg/util/flagext"
2323
"github.com/cortexproject/cortex/pkg/util/services"
24+
utiltimer "github.com/cortexproject/cortex/pkg/util/timer"
2425
)
2526

2627
var (
@@ -535,7 +536,7 @@ func (i *Lifecycler) loop(ctx context.Context) error {
535536
if autoJoinTimer == nil {
536537
autoJoinTimer = time.NewTimer(d)
537538
} else {
538-
resetTimer(autoJoinTimer, d)
539+
utiltimer.ResetTimer(autoJoinTimer, d)
539540
}
540541
autoJoinAfter = autoJoinTimer.C
541542
}
@@ -544,13 +545,13 @@ func (i *Lifecycler) loop(ctx context.Context) error {
544545
if observeTimer == nil {
545546
observeTimer = time.NewTimer(d)
546547
} else {
547-
resetTimer(observeTimer, d)
548+
utiltimer.ResetTimer(observeTimer, d)
548549
}
549550
observeChan = observeTimer.C
550551
}
551552

552-
defer stopAndDrainTimer(autoJoinTimer)
553-
defer stopAndDrainTimer(observeTimer)
553+
defer utiltimer.StopAndDrainTimer(autoJoinTimer)
554+
defer utiltimer.StopAndDrainTimer(observeTimer)
554555

555556
if i.autoJoinOnStartup {
556557
setAutoJoinAfter(i.cfg.JoinAfter)
@@ -616,7 +617,7 @@ func (i *Lifecycler) loop(ctx context.Context) error {
616617
// When observing is done, observeChan is set to nil.
617618

618619
observeChan = nil
619-
stopAndDrainTimer(observeTimer)
620+
utiltimer.StopAndDrainTimer(observeTimer)
620621
if s := i.GetState(); s != JOINING {
621622
level.Error(i.logger).Log("msg", "unexpected state while observing tokens", "state", s, "ring", i.RingName)
622623
}

pkg/ring/ticker.go

Lines changed: 0 additions & 18 deletions
Original file line numberDiff line numberDiff line change
@@ -12,21 +12,3 @@ func newDisableableTicker(interval time.Duration) (func(), <-chan time.Time) {
1212
tick := time.NewTicker(interval)
1313
return func() { tick.Stop() }, tick.C
1414
}
15-
16-
func stopAndDrainTimer(timer *time.Timer) {
17-
if timer == nil {
18-
return
19-
}
20-
21-
if !timer.Stop() {
22-
select {
23-
case <-timer.C:
24-
default:
25-
}
26-
}
27-
}
28-
29-
func resetTimer(timer *time.Timer, d time.Duration) {
30-
stopAndDrainTimer(timer)
31-
timer.Reset(d)
32-
}

pkg/util/backoff/backoff.go

Lines changed: 4 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,8 @@ import (
66
"fmt"
77
"math/rand"
88
"time"
9+
10+
utiltimer "github.com/cortexproject/cortex/pkg/util/timer"
911
)
1012

1113
// Config configures a Backoff
@@ -81,12 +83,12 @@ func (b *Backoff) Wait() {
8183
if b.waitTimer == nil {
8284
b.waitTimer = time.NewTimer(sleepTime)
8385
} else {
84-
resetTimer(b.waitTimer, sleepTime)
86+
utiltimer.ResetTimer(b.waitTimer, sleepTime)
8587
}
8688

8789
select {
8890
case <-b.ctx.Done():
89-
stopAndDrainTimer(b.waitTimer)
91+
utiltimer.StopAndDrainTimer(b.waitTimer)
9092
case <-b.waitTimer.C:
9193
}
9294
}
@@ -123,21 +125,3 @@ func doubleDuration(value time.Duration, max time.Duration) time.Duration {
123125

124126
return max
125127
}
126-
127-
func stopAndDrainTimer(timer *time.Timer) {
128-
if timer == nil {
129-
return
130-
}
131-
132-
if !timer.Stop() {
133-
select {
134-
case <-timer.C:
135-
default:
136-
}
137-
}
138-
}
139-
140-
func resetTimer(timer *time.Timer, d time.Duration) {
141-
stopAndDrainTimer(timer)
142-
timer.Reset(d)
143-
}

pkg/util/backoff/backoff_test.go

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,8 @@ import (
44
"context"
55
"testing"
66
"time"
7+
8+
utiltimer "github.com/cortexproject/cortex/pkg/util/timer"
79
)
810

911
func TestBackoff_NextDelay(t *testing.T) {
@@ -100,3 +102,61 @@ func TestBackoff_NextDelay(t *testing.T) {
100102
})
101103
}
102104
}
105+
106+
func TestBackoff_WaitReusesTimer(t *testing.T) {
107+
t.Parallel()
108+
109+
b := New(context.Background(), Config{
110+
MinBackoff: time.Nanosecond,
111+
MaxBackoff: time.Nanosecond,
112+
MaxRetries: 0,
113+
})
114+
115+
b.Wait()
116+
if b.waitTimer == nil {
117+
t.Fatal("expected wait timer to be initialized")
118+
}
119+
120+
firstTimer := b.waitTimer
121+
122+
b.Wait()
123+
if b.waitTimer != firstTimer {
124+
t.Fatal("expected wait timer to be reused")
125+
}
126+
}
127+
128+
func TestBackoff_WaitReturnsWhenContextCancelled(t *testing.T) {
129+
t.Parallel()
130+
131+
ctx, cancel := context.WithCancel(context.Background())
132+
t.Cleanup(cancel)
133+
134+
b := New(ctx, Config{
135+
MinBackoff: time.Second,
136+
MaxBackoff: time.Second,
137+
MaxRetries: 0,
138+
})
139+
140+
go func() {
141+
time.Sleep(10 * time.Millisecond)
142+
cancel()
143+
}()
144+
145+
startedAt := time.Now()
146+
b.Wait()
147+
148+
if time.Since(startedAt) >= 900*time.Millisecond {
149+
t.Fatal("expected Wait to return quickly after context cancellation")
150+
}
151+
152+
if b.waitTimer == nil {
153+
t.Fatal("expected wait timer to be initialized")
154+
}
155+
156+
utiltimer.ResetTimer(b.waitTimer, time.Nanosecond)
157+
select {
158+
case <-b.waitTimer.C:
159+
case <-time.After(100 * time.Millisecond):
160+
t.Fatal("expected wait timer to be reusable after cancellation")
161+
}
162+
}

pkg/util/timer/timer.go

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,23 @@
1+
package timer
2+
3+
import "time"
4+
5+
// StopAndDrainTimer stops the timer and drains its channel if a tick was already queued.
6+
func StopAndDrainTimer(timer *time.Timer) {
7+
if timer == nil {
8+
return
9+
}
10+
11+
if !timer.Stop() {
12+
select {
13+
case <-timer.C:
14+
default:
15+
}
16+
}
17+
}
18+
19+
// ResetTimer safely resets timer, handling the required stop+drain sequence first.
20+
func ResetTimer(timer *time.Timer, d time.Duration) {
21+
StopAndDrainTimer(timer)
22+
timer.Reset(d)
23+
}

pkg/util/timer/timer_test.go

Lines changed: 85 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,85 @@
1+
package timer
2+
3+
import (
4+
"testing"
5+
"time"
6+
)
7+
8+
func TestStopAndDrainTimer_NilTimer(t *testing.T) {
9+
// Should not panic on nil timer.
10+
StopAndDrainTimer(nil)
11+
}
12+
13+
func TestStopAndDrainTimer_UnfiredTimer(t *testing.T) {
14+
timer := time.NewTimer(time.Hour)
15+
StopAndDrainTimer(timer)
16+
17+
// Channel should be empty after stop+drain.
18+
select {
19+
case <-timer.C:
20+
t.Fatal("expected timer channel to be drained")
21+
default:
22+
}
23+
}
24+
25+
func TestStopAndDrainTimer_FiredTimer(t *testing.T) {
26+
timer := time.NewTimer(time.Nanosecond)
27+
// Wait for it to fire.
28+
time.Sleep(time.Millisecond)
29+
30+
StopAndDrainTimer(timer)
31+
32+
// Channel should be empty after stop+drain.
33+
select {
34+
case <-timer.C:
35+
t.Fatal("expected timer channel to be drained")
36+
default:
37+
}
38+
}
39+
40+
func TestResetTimer(t *testing.T) {
41+
timer := time.NewTimer(time.Hour)
42+
43+
// Reset to a very short duration.
44+
ResetTimer(timer, time.Nanosecond)
45+
46+
select {
47+
case <-timer.C:
48+
// Expected.
49+
case <-time.After(100 * time.Millisecond):
50+
t.Fatal("expected timer to fire after reset")
51+
}
52+
}
53+
54+
func TestResetTimer_AfterFired(t *testing.T) {
55+
timer := time.NewTimer(time.Nanosecond)
56+
// Wait for it to fire.
57+
time.Sleep(time.Millisecond)
58+
<-timer.C
59+
60+
// Reset after consuming the fired event.
61+
ResetTimer(timer, time.Nanosecond)
62+
63+
select {
64+
case <-timer.C:
65+
// Expected.
66+
case <-time.After(100 * time.Millisecond):
67+
t.Fatal("expected timer to fire after reset")
68+
}
69+
}
70+
71+
func TestResetTimer_MultipleTimes(t *testing.T) {
72+
timer := time.NewTimer(time.Hour)
73+
defer timer.Stop()
74+
75+
for i := 0; i < 10; i++ {
76+
ResetTimer(timer, time.Nanosecond)
77+
78+
select {
79+
case <-timer.C:
80+
// Expected.
81+
case <-time.After(100 * time.Millisecond):
82+
t.Fatalf("iteration %d: expected timer to fire after reset", i)
83+
}
84+
}
85+
}

0 commit comments

Comments
 (0)