fix: timer-based cleanup of listenQueues after transient exporter disconnect #417
Changes from all commits: 0278e4d, fecd007, 0d372ec, c239709
```diff
@@ -69,6 +69,15 @@ import (
 	"google.golang.org/protobuf/proto"
 )

+// listenQueueCleanupDelay is how long to keep a listen queue alive after the
+// exporter's stream disconnects with a transient error. It must be long enough
+// for the exporter to reconnect and consume any Dial token already buffered in
+// the queue, yet short enough to bound the memory overhead of orphaned queues.
+// The exporter's default retry window (5 retries × ~1 s backoff) is well under
+// 30 s, so 2 minutes is a conservative upper bound.
+// Exposed as a var so tests can shorten it without rebuilding.
+var listenQueueCleanupDelay = 2 * time.Minute
+
 // ControllerService exposes a gRPC service
 type ControllerService struct {
 	pb.UnimplementedControllerServiceServer
@@ -79,7 +88,8 @@ type ControllerService struct {
 	Attr          authorization.ContextAttributesGetter
 	ServerOptions []grpc.ServerOption
 	Router        config.Router
-	listenQueues  sync.Map
+	listenQueues sync.Map // key: leaseName, value: chan *pb.ListenResponse
+	listenTimers sync.Map // key: leaseName, value: *time.Timer -- deferred cleanup after transient disconnect
 }

 type wrappedStream struct {
```
```diff
@@ -439,13 +449,51 @@ func (s *ControllerService) Listen(req *pb.ListenRequest, stream pb.ControllerSe
 		return err
 	}

+	// Cancel any pending cleanup timer -- the exporter is reconnecting and
+	// should inherit the existing queue (which may hold a buffered Dial token).
+	if t, ok := s.listenTimers.LoadAndDelete(leaseName); ok {
+		t.(*time.Timer).Stop()
+	}
+
 	queue, _ := s.listenQueues.LoadOrStore(leaseName, make(chan *pb.ListenResponse, 8))
 	for {
 		select {
 		case <-ctx.Done():
+			// Clean shutdown (lease ended / server stopping): cancel any timer
+			// and remove the queue immediately.
+			if t, ok := s.listenTimers.LoadAndDelete(leaseName); ok {
+				t.(*time.Timer).Stop()
+			}
 			s.listenQueues.Delete(leaseName)
 			return nil
 		case msg := <-queue.(chan *pb.ListenResponse):
 			if err := stream.Send(msg); err != nil {
+				// Transient stream error: schedule deferred cleanup so a
+				// reconnecting exporter can still inherit the queue and consume
+				// any Dial token that was buffered between the error and now.
+				// listenQueueCleanupDelay gives the exporter time to reconnect.
+				//
+				// Known limitation: there is a narrow race at timer expiry --
+				// if Dial() calls LoadOrStore and obtains the queue just before
+				// this callback deletes it, the buffered token is lost. The
+				// window is microseconds wide and only opens after
+				// listenQueueCleanupDelay has fully elapsed (i.e. the exporter
+				// has been gone for 2+ minutes), at which point the lease is
+				// typically being reclaimed anyway. Replacing sync.Map with a
+				// mutex-protected map would close this race by holding the lock
+				// across both the timer check and the queue access.
+				if old, ok := s.listenTimers.LoadAndDelete(leaseName); ok {
+					old.(*time.Timer).Stop()
+				}
+				t := time.AfterFunc(listenQueueCleanupDelay, func() {
+					if q, ok := s.listenQueues.LoadAndDelete(leaseName); ok {
+						logger.Info("listen queue cleanup timer fired",
+							"lease", leaseName,
+							"bufferedTokens", len(q.(chan *pb.ListenResponse)))
+					}
+					s.listenTimers.Delete(leaseName)
+				})
+				s.listenTimers.Store(leaseName, t)
 				return err
 			}
 		}
```

**Member** (comment on lines +488 to +495; AI-generated, human reviewed):

[Low] When the timer fires, up to 8 buffered Dial tokens in the channel are silently dropped. Adding a log line that includes the lease name and the number of buffered tokens would make the drop observable.

**Contributor (Author)**:

Agreed -- implemented in c239709. The timer callback now uses `LoadAndDelete` and logs a `logger.Info` with the lease name and `bufferedTokens` count (`len(queue)`) when a non-empty queue is deleted. The logger from the enclosing `Listen()` scope is captured by the closure.

*raballew marked this conversation as resolved.*

**Member** (comment on lines +488 to +496; AI-generated, human reviewed):

[Medium] When multiple transient errors occur for the same lease, each pass through the error path arms a fresh timer without stopping the previous one, so a stale timer can still fire and delete the queue.

Suggested fix: load and stop the existing timer before creating the new one:

```go
if old, ok := s.listenTimers.LoadAndDelete(leaseName); ok {
	old.(*time.Timer).Stop()
}
t := time.AfterFunc(listenQueueCleanupDelay, func() { ... })
s.listenTimers.Store(leaseName, t)
```

**Contributor (Author)**:

Good catch. Fixed in c239709 -- the error path now calls `LoadAndDelete` on `listenTimers` and stops any existing timer before arming the new one, as suggested.

*raballew marked this conversation as resolved.*
```diff
@@ -18,6 +18,7 @@ package service

 import (
 	"testing"
+	"time"

 	jumpstarterdevv1alpha1 "github.com/jumpstarter-dev/jumpstarter-controller/api/v1alpha1"
 	pb "github.com/jumpstarter-dev/jumpstarter-controller/internal/protocol/jumpstarter/v1"
```
```diff
@@ -299,6 +300,151 @@ func TestSyncOnlineConditionWithStatus(t *testing.T) {
 	}
 }

+// TestListenQueueTimerCleanup verifies that the listen queue is NOT removed
+// immediately when a transient stream error occurs, giving a reconnecting
+// exporter time to inherit the queue (and any buffered Dial token), and that
+// the queue IS removed once the cleanup timer fires.
+func TestListenQueueTimerCleanup(t *testing.T) {
+	original := listenQueueCleanupDelay
+	t.Cleanup(func() { listenQueueCleanupDelay = original })
+
+	svc := &ControllerService{}
+	leaseName := "test-lease"
+
+	// Seed the queue as Listen() would via LoadOrStore.
+	ch := make(chan *pb.ListenResponse, 8)
+	svc.listenQueues.Store(leaseName, ch)
+
+	// Use a long delay for the first two subtests so the timer cannot fire
+	// between sequential subtests under CI load (fixes flake when >50ms
+	// elapses between subtest boundaries).
+	listenQueueCleanupDelay = 10 * time.Second
+
+	// Simulate the stream-error path: schedule deferred cleanup.
+	t.Run("queue survives transient error", func(t *testing.T) {
+		timer := time.AfterFunc(listenQueueCleanupDelay, func() {
+			svc.listenQueues.Delete(leaseName)
+			svc.listenTimers.Delete(leaseName)
+		})
+		svc.listenTimers.Store(leaseName, timer)
+
+		// Queue must still be present immediately after the error.
+		if _, ok := svc.listenQueues.Load(leaseName); !ok {
+			t.Fatal("listen queue was removed immediately after stream error -- Dial token would be lost")
+		}
+	})
+
+	t.Run("buffered token survives disconnect and is readable after reconnect", func(t *testing.T) {
+		// Write a Dial token into the queue while the exporter is disconnected.
+		token := &pb.ListenResponse{}
+		ch <- token
+
+		// Simulate Listen() reconnect: cancel the timer and call LoadOrStore.
+		if raw, ok := svc.listenTimers.LoadAndDelete(leaseName); ok {
+			raw.(*time.Timer).Stop()
+		}
+		got, _ := svc.listenQueues.LoadOrStore(leaseName, make(chan *pb.ListenResponse, 8))
+		inherited := got.(chan *pb.ListenResponse)
+		if inherited != ch {
+			t.Fatal("reconnecting Listen() did not inherit the existing queue")
+		}
+
+		// Read the token back -- this is the primary scenario this fix targets.
+		select {
+		case msg := <-inherited:
+			if msg != token {
+				t.Fatal("token read from inherited queue does not match the one written during disconnect")
+			}
+		default:
+			t.Fatal("no token available in inherited queue -- Dial token was lost")
+		}
+	})
+
+	t.Run("reconnecting exporter cancels cleanup timer", func(t *testing.T) {
+		// Re-arm a timer to simulate another transient error.
+		timer := time.AfterFunc(listenQueueCleanupDelay, func() {
+			svc.listenQueues.Delete(leaseName)
+			svc.listenTimers.Delete(leaseName)
+		})
+		svc.listenTimers.Store(leaseName, timer)
+
+		// Simulate Listen() reconnect: cancel the timer and call LoadOrStore.
+		if raw, ok := svc.listenTimers.LoadAndDelete(leaseName); ok {
+			raw.(*time.Timer).Stop()
+		}
+		got, _ := svc.listenQueues.LoadOrStore(leaseName, make(chan *pb.ListenResponse, 8))
+		if got != ch {
+			t.Fatal("reconnecting Listen() did not inherit the existing queue")
+		}
+
+		// Verify the queue is still present -- the stopped timer must not
+		// have fired.
+		if _, ok := svc.listenQueues.Load(leaseName); !ok {
+			t.Fatal("listen queue was removed even though cleanup timer was cancelled")
+		}
+	})
+
+	t.Run("timer fires and removes queue when exporter does not reconnect", func(t *testing.T) {
+		// Shorten the delay so this subtest completes quickly.
+		listenQueueCleanupDelay = 50 * time.Millisecond
+
+		// Use a channel to detect when the timer callback completes,
+		// avoiding time.Sleep-based flakiness under CI load.
+		done := make(chan struct{})
+		timer := time.AfterFunc(listenQueueCleanupDelay, func() {
+			svc.listenQueues.Delete(leaseName)
+			svc.listenTimers.Delete(leaseName)
+			close(done)
+		})
+		svc.listenTimers.Store(leaseName, timer)
+
+		// Wait for the timer callback to signal completion.
+		select {
+		case <-done:
+		case <-time.After(5 * time.Second):
+			t.Fatal("cleanup timer did not fire within timeout")
+		}
+		if _, ok := svc.listenQueues.Load(leaseName); ok {
+			t.Fatal("listen queue was not removed after cleanup timer fired")
+		}
+	})
+}
+
+// TestListenQueueCleanShutdown verifies that a clean context cancellation
+// (lease end / server stop) removes the queue immediately without waiting for
+// the cleanup timer.
+func TestListenQueueCleanShutdown(t *testing.T) {
+	original := listenQueueCleanupDelay
+	listenQueueCleanupDelay = 2 * time.Minute // keep long -- must NOT fire during test
+	t.Cleanup(func() { listenQueueCleanupDelay = original })
+
+	svc := &ControllerService{}
+	leaseName := "test-lease-shutdown"
+
+	ch := make(chan *pb.ListenResponse, 8)
+	svc.listenQueues.Store(leaseName, ch)
+
+	// Arm a timer that should be cancelled before it fires.
+	timer := time.AfterFunc(listenQueueCleanupDelay, func() {
+		svc.listenQueues.Delete(leaseName)
+		svc.listenTimers.Delete(leaseName)
+	})
+	svc.listenTimers.Store(leaseName, timer)
+
+	// Simulate the ctx.Done() path in Listen().
+	if raw, ok := svc.listenTimers.LoadAndDelete(leaseName); ok {
+		raw.(*time.Timer).Stop()
+	}
+	svc.listenQueues.Delete(leaseName)
+
+	if _, ok := svc.listenQueues.Load(leaseName); ok {
+		t.Fatal("listen queue was not removed on clean shutdown")
+	}
+	if _, ok := svc.listenTimers.Load(leaseName); ok {
+		t.Fatal("cleanup timer was not cancelled on clean shutdown")
+	}
+}
+
 // contains checks if substr is contained in s
 func contains(s, substr string) bool {
 	return len(s) >= len(substr) && (s == substr || len(substr) == 0 ||
```

**Member** (comment on lines +307 to +328; AI-generated, human reviewed):

[Medium] These tests directly replicate the production timer logic (the `time.AfterFunc` callback body) rather than exercising `Listen()` itself, so a regression in the real handler would not be caught. This is understandable -- calling `Listen()` directly requires substantial mocking. Consider extracting the timer-scheduling and cleanup logic into a small, testable helper method on `ControllerService`.

**Contributor (Author)**:

Fair point about the tests replicating the timer callback verbatim. However, extracting the timer-scheduling logic into a separate helper would meaningfully increase the scope of this fix -- the current patch is intentionally minimal (a targeted fix for a production issue where orphaned queues accumulate after transient exporter disconnects). A follow-up PR that refactors the timer logic into a testable helper and adds integration-style tests (with a mock gRPC stream) would be the right way to close this gap without bloating the fix itself. Happy to file an issue to track that.

**Member** (AI-generated, human reviewed):

[Low] Em-dash character (U+2014) in this test message. Project guidelines prefer `--`.

**Contributor (Author)**:

Fixed in c239709 -- replaced all em-dashes with `--`.

**Member** (AI-generated, human reviewed):

[Low] Same em-dash (U+2014) issue here -- swap to `--`.

**Contributor (Author)**:

Fixed in the same commit (c239709).

*raballew marked this conversation as resolved.*

**Member** (comment on lines +307 to +446; AI-generated, human reviewed):

[Medium] Both `TestListenQueueTimerCleanup` and `TestListenQueueCleanShutdown` hand-copy the production paths instead of invoking `Listen()`, so they cannot detect the tests diverging from the real handler.

Consider adding at least one test that invokes `Listen()` through a mock stream.

**Contributor (Author)**:

This was discussed in the previous review round (see reply to comment r3075337964). The mock gRPC stream infrastructure required to call Listen() directly is substantial -- it needs a mock K8s client, authentication layer, lease objects, and a ControllerService_ListenServer implementation. That level of integration testing would be better suited to a follow-up PR to keep this fix minimal and focused on the production race condition. The new "buffered token survives disconnect" subtest (added in c239709) does cover the primary user story end-to-end at the sync.Map level, which is where the actual bug lived.

**Member** (AI-generated, human reviewed):

[Medium] The core scenario this fix targets is that a Dial token buffered while the exporter is disconnected survives until the exporter reconnects. Adding a subtest that writes a `ListenResponse` into the queue during the disconnect window and reads it back after the simulated reconnect would cover that scenario directly.

**Contributor (Author)**:

Great suggestion. Added in c239709 -- new subtest "buffered token survives disconnect and is readable after reconnect" writes a ListenResponse into the channel during the disconnect window, simulates the reconnect path, then reads the token back from the inherited queue and verifies identity.