50 changes: 49 additions & 1 deletion controller/internal/service/controller_service.go
@@ -69,6 +69,15 @@ import (
"google.golang.org/protobuf/proto"
)

// listenQueueCleanupDelay is how long to keep a listen queue alive after the
// exporter's stream disconnects with a transient error. It must be long enough
// for the exporter to reconnect and consume any Dial token already buffered in
// the queue, yet short enough to bound the memory overhead of orphaned queues.
// The exporter's default retry window (5 retries × ~1 s backoff) is well under
// 30 s, so 2 minutes is a conservative upper bound.
// Exposed as a var so tests can shorten it without rebuilding.
var listenQueueCleanupDelay = 2 * time.Minute

// ControllerService exposes a gRPC service
type ControllerService struct {
pb.UnimplementedControllerServiceServer
@@ -79,7 +88,8 @@ type ControllerService struct {
Attr authorization.ContextAttributesGetter
ServerOptions []grpc.ServerOption
Router config.Router
listenQueues sync.Map
listenQueues sync.Map // key: leaseName, value: chan *pb.ListenResponse
listenTimers sync.Map // key: leaseName, value: *time.Timer -- deferred cleanup after transient disconnect
}

type wrappedStream struct {
@@ -439,13 +449,51 @@ func (s *ControllerService) Listen(req *pb.ListenRequest, stream pb.ControllerSe
return err
}

// Cancel any pending cleanup timer -- the exporter is reconnecting and
// should inherit the existing queue (which may hold a buffered Dial token).
if t, ok := s.listenTimers.LoadAndDelete(leaseName); ok {
t.(*time.Timer).Stop()
}

queue, _ := s.listenQueues.LoadOrStore(leaseName, make(chan *pb.ListenResponse, 8))
for {
select {
case <-ctx.Done():
// Clean shutdown (lease ended / server stopping): cancel any timer
// and remove the queue immediately.
if t, ok := s.listenTimers.LoadAndDelete(leaseName); ok {
t.(*time.Timer).Stop()
}
s.listenQueues.Delete(leaseName)
return nil
case msg := <-queue.(chan *pb.ListenResponse):
if err := stream.Send(msg); err != nil {
// Transient stream error: schedule deferred cleanup so a
// reconnecting exporter can still inherit the queue and consume
// any Dial token that was buffered between the error and now.
// listenQueueCleanupDelay gives the exporter time to reconnect.
//
// Known limitation: there is a narrow race at timer expiry --
// if Dial() calls LoadOrStore and obtains the queue just before
// this callback deletes it, the buffered token is lost. The
// window is microseconds wide and only opens after
// listenQueueCleanupDelay has fully elapsed (i.e. the exporter
// has been gone for 2+ minutes), at which point the lease is
// typically being reclaimed anyway. Replacing sync.Map with a
// mutex-protected map would close this race by holding the lock
// across both the timer check and the queue access.
if old, ok := s.listenTimers.LoadAndDelete(leaseName); ok {
old.(*time.Timer).Stop()
}
t := time.AfterFunc(listenQueueCleanupDelay, func() {
if q, ok := s.listenQueues.LoadAndDelete(leaseName); ok {
logger.Info("listen queue cleanup timer fired",
"lease", leaseName,
"bufferedTokens", len(q.(chan *pb.ListenResponse)))
}
s.listenTimers.Delete(leaseName)
})
Comment on lines +488 to +495
Member:

[Low] When the timer fires, up to 8 buffered Dial tokens in the channel are silently dropped. Adding a log line that includes len(queue) when a non-empty queue is deleted would make it much easier to notice lost tokens in production logs.

AI-generated, human reviewed

Contributor Author:

Agreed -- implemented in c239709. The timer callback now uses LoadAndDelete and logs a logger.Info with the lease name and bufferedTokens count (len(queue)) when a non-empty queue is deleted. The logger from the enclosing Listen() scope is captured by the closure.

s.listenTimers.Store(leaseName, t)
Comment thread
raballew marked this conversation as resolved.
Comment on lines +488 to +496
Member:

[Medium] When multiple Send() errors fire in rapid succession, time.AfterFunc creates a new timer and Store overwrites the previous map entry without calling Stop() on the old one. The old timer still fires and deletes the queue, so the earliest timer wins rather than the latest. Double-delete on sync.Map is safe, but the cleanup window silently shrinks.

Suggested fix: load and stop the existing timer before creating the new one:

if old, ok := s.listenTimers.LoadAndDelete(leaseName); ok {
    old.(*time.Timer).Stop()
}
t := time.AfterFunc(listenQueueCleanupDelay, func() { ... })
s.listenTimers.Store(leaseName, t)

AI-generated, human reviewed

Contributor Author:

Good catch. Fixed in c239709 -- the error path now calls LoadAndDelete + Stop() on any existing timer before creating the new one. This ensures the latest timer always governs the cleanup window.

return err
}
}
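The code comment in the hunk above names a mutex-protected map as the way to close the narrow timer-expiry race. That refactor is not part of this patch; the following is only a sketch of what it might look like, with illustrative names and `chan int` standing in for `chan *pb.ListenResponse`. Holding one lock across both the lookup and the deletion means an expiring timer can never remove a queue that Dial() has just obtained.

```go
package main

import (
	"fmt"
	"sync"
)

// listenState is a hypothetical mutex-protected replacement for the two
// sync.Maps. All queue access goes through one lock.
type listenState struct {
	mu     sync.Mutex
	queues map[string]chan int // stand-in for chan *pb.ListenResponse
}

// take returns the queue for lease, creating it if absent. Because the
// timer callback (expire) takes the same lock, there is no window in
// which a caller holds a queue that is concurrently being deleted.
func (s *listenState) take(lease string) chan int {
	s.mu.Lock()
	defer s.mu.Unlock()
	q, ok := s.queues[lease]
	if !ok {
		q = make(chan int, 8)
		s.queues[lease] = q
	}
	return q
}

// expire is what the cleanup timer would call when it fires.
func (s *listenState) expire(lease string) {
	s.mu.Lock()
	defer s.mu.Unlock()
	delete(s.queues, lease)
}

func main() {
	s := &listenState{queues: map[string]chan int{}}
	q1 := s.take("lease-a") // created
	q2 := s.take("lease-a") // same queue inherited
	s.expire("lease-a")     // timer fired
	q3 := s.take("lease-a") // fresh queue after expiry
	fmt.Println(q1 == q2, q1 == q3)
}
```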
146 changes: 146 additions & 0 deletions controller/internal/service/controller_service_test.go
@@ -18,6 +18,7 @@ package service

import (
"testing"
"time"

jumpstarterdevv1alpha1 "github.com/jumpstarter-dev/jumpstarter-controller/api/v1alpha1"
pb "github.com/jumpstarter-dev/jumpstarter-controller/internal/protocol/jumpstarter/v1"
@@ -299,6 +300,151 @@ func TestSyncOnlineConditionWithStatus(t *testing.T) {
}
}

// TestListenQueueTimerCleanup verifies that the listen queue is NOT removed
// immediately when a transient stream error occurs, giving a reconnecting
// exporter time to inherit the queue (and any buffered Dial token), and that
// the queue IS removed once the cleanup timer fires.
func TestListenQueueTimerCleanup(t *testing.T) {
original := listenQueueCleanupDelay
t.Cleanup(func() { listenQueueCleanupDelay = original })

svc := &ControllerService{}
leaseName := "test-lease"

// Seed the queue as Listen() would via LoadOrStore.
ch := make(chan *pb.ListenResponse, 8)
svc.listenQueues.Store(leaseName, ch)

// Use a long delay for the first two subtests so the timer cannot fire
// between sequential subtests under CI load (fixes flake when >50ms
// elapses between subtest boundaries).
listenQueueCleanupDelay = 10 * time.Second

// Simulate the stream-error path: schedule deferred cleanup.
t.Run("queue survives transient error", func(t *testing.T) {
timer := time.AfterFunc(listenQueueCleanupDelay, func() {
svc.listenQueues.Delete(leaseName)
svc.listenTimers.Delete(leaseName)
})
Comment on lines +307 to +328
Member:

[medium] These tests directly replicate the production timer logic (time.AfterFunc + Delete callbacks) rather than exercising the actual Listen() code path. For example, the timer callback on lines 322-325 is a verbatim copy of the production callback. If the production logic changes, these tests would still pass.

This is understandable -- calling Listen() directly would require substantial mock infrastructure (gRPC stream, K8s client, authentication). But it does mean a refactoring bug in Listen() could go undetected by these tests.

Consider extracting the timer-scheduling and cleanup logic into a small, testable helper method on ControllerService that both Listen() and the tests can call. Alternatively, a follow-up integration test that exercises Listen() through a mock gRPC stream would close the gap.

AI-generated, human reviewed

Contributor Author:

Fair point about the tests replicating the timer callback verbatim. However, extracting the timer-scheduling logic into a separate helper would meaningfully increase the scope of this fix -- the current patch is intentionally minimal (a targeted fix for a production issue where orphaned queues accumulate after transient exporter disconnects).

A follow-up PR that refactors the timer logic into a testable helper and adds integration-style tests (with a mock gRPC stream) would be the right way to close this gap without bloating the fix itself. Happy to file an issue to track that.
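The extraction discussed in this thread is deferred to a follow-up, but its shape is easy to picture. Below is a hedged sketch with hypothetical names (`queueCleaner`, `scheduleCleanup`, `cancelCleanup`) and `chan int` standing in for `chan *pb.ListenResponse`; both Listen() and the tests could call these two methods instead of duplicating the timer logic.

```go
package main

import (
	"fmt"
	"sync"
	"time"
)

// queueCleaner bundles the queue map, timer map, and cleanup delay.
type queueCleaner struct {
	queues sync.Map // key: leaseName, value: chan int
	timers sync.Map // key: leaseName, value: *time.Timer
	delay  time.Duration
}

// scheduleCleanup mirrors the stream-error path: stop any existing timer
// so the latest one governs the window, then arm a new deferred delete.
func (c *queueCleaner) scheduleCleanup(lease string) {
	if old, ok := c.timers.LoadAndDelete(lease); ok {
		old.(*time.Timer).Stop()
	}
	t := time.AfterFunc(c.delay, func() {
		c.queues.LoadAndDelete(lease)
		c.timers.Delete(lease)
	})
	c.timers.Store(lease, t)
}

// cancelCleanup mirrors the reconnect path: the exporter is back, so the
// pending deletion must not fire.
func (c *queueCleaner) cancelCleanup(lease string) {
	if t, ok := c.timers.LoadAndDelete(lease); ok {
		t.(*time.Timer).Stop()
	}
}

func main() {
	c := &queueCleaner{delay: 20 * time.Millisecond}
	c.queues.Store("lease-a", make(chan int, 8))

	// Reconnect before expiry: the queue survives.
	c.scheduleCleanup("lease-a")
	c.cancelCleanup("lease-a")
	time.Sleep(80 * time.Millisecond)
	_, survived := c.queues.Load("lease-a")

	// No reconnect: the timer fires and removes the queue.
	c.scheduleCleanup("lease-a")
	time.Sleep(80 * time.Millisecond)
	_, stillThere := c.queues.Load("lease-a")

	fmt.Println(survived, stillThere)
}
```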

svc.listenTimers.Store(leaseName, timer)

// Queue must still be present immediately after the error.
if _, ok := svc.listenQueues.Load(leaseName); !ok {
t.Fatal("listen queue was removed immediately after stream error -- Dial token would be lost")
}
})

t.Run("buffered token survives disconnect and is readable after reconnect", func(t *testing.T) {
// Write a Dial token into the queue while the exporter is disconnected.
token := &pb.ListenResponse{}
ch <- token

// Simulate Listen() reconnect: cancel the timer and call LoadOrStore.
if raw, ok := svc.listenTimers.LoadAndDelete(leaseName); ok {
raw.(*time.Timer).Stop()
}
got, _ := svc.listenQueues.LoadOrStore(leaseName, make(chan *pb.ListenResponse, 8))
inherited := got.(chan *pb.ListenResponse)
if inherited != ch {
t.Fatal("reconnecting Listen() did not inherit the existing queue")
}

// Read the token back -- this is the primary scenario this fix targets.
select {
case msg := <-inherited:
if msg != token {
t.Fatal("token read from inherited queue does not match the one written during disconnect")
}
default:
t.Fatal("no token available in inherited queue -- Dial token was lost")
}
Member:

[Low] Em-dash character (U+2014) in this test message. Project guidelines prefer -- over non-ASCII punctuation.

AI-generated, human reviewed

Contributor Author:

Fixed in c239709 -- replaced all em-dashes with -- in both the test file and production code.

})

t.Run("reconnecting exporter cancels cleanup timer", func(t *testing.T) {
// Re-arm a timer to simulate another transient error.
timer := time.AfterFunc(listenQueueCleanupDelay, func() {
svc.listenQueues.Delete(leaseName)
svc.listenTimers.Delete(leaseName)
})
svc.listenTimers.Store(leaseName, timer)

// Simulate Listen() reconnect: cancel the timer and call LoadOrStore.
if raw, ok := svc.listenTimers.LoadAndDelete(leaseName); ok {
raw.(*time.Timer).Stop()
}
got, _ := svc.listenQueues.LoadOrStore(leaseName, make(chan *pb.ListenResponse, 8))
if got != ch {
t.Fatal("reconnecting Listen() did not inherit the existing queue")
}

// Verify the queue is still present -- the stopped timer must not
// have fired.
if _, ok := svc.listenQueues.Load(leaseName); !ok {
t.Fatal("listen queue was removed even though cleanup timer was cancelled")
}
Member:

[Low] Same em-dash (U+2014) issue here -- swap to -- per project guidelines.

AI-generated, human reviewed

Contributor Author:

Fixed in the same commit (c239709).

})

t.Run("timer fires and removes queue when exporter does not reconnect", func(t *testing.T) {
// Shorten the delay so this subtest completes quickly.
listenQueueCleanupDelay = 50 * time.Millisecond

// Use a channel to detect when the timer callback completes,
// avoiding time.Sleep-based flakiness under CI load.
done := make(chan struct{})
timer := time.AfterFunc(listenQueueCleanupDelay, func() {
svc.listenQueues.Delete(leaseName)
svc.listenTimers.Delete(leaseName)
close(done)
})
svc.listenTimers.Store(leaseName, timer)

// Wait for the timer callback to signal completion.
select {
case <-done:
case <-time.After(5 * time.Second):
t.Fatal("cleanup timer did not fire within timeout")
}
if _, ok := svc.listenQueues.Load(leaseName); ok {
t.Fatal("listen queue was not removed after cleanup timer fired")
}
})
}

// TestListenQueueCleanShutdown verifies that a clean context cancellation
// (lease end / server stop) removes the queue immediately without waiting for
// the cleanup timer.
func TestListenQueueCleanShutdown(t *testing.T) {
original := listenQueueCleanupDelay
listenQueueCleanupDelay = 2 * time.Minute // keep long -- must NOT fire during test
t.Cleanup(func() { listenQueueCleanupDelay = original })

svc := &ControllerService{}
leaseName := "test-lease-shutdown"

ch := make(chan *pb.ListenResponse, 8)
svc.listenQueues.Store(leaseName, ch)

// Arm a timer that should be cancelled before it fires.
timer := time.AfterFunc(listenQueueCleanupDelay, func() {
svc.listenQueues.Delete(leaseName)
svc.listenTimers.Delete(leaseName)
})
svc.listenTimers.Store(leaseName, timer)

// Simulate the ctx.Done() path in Listen().
if raw, ok := svc.listenTimers.LoadAndDelete(leaseName); ok {
raw.(*time.Timer).Stop()
}
svc.listenQueues.Delete(leaseName)

if _, ok := svc.listenQueues.Load(leaseName); ok {
t.Fatal("listen queue was not removed on clean shutdown")
}
if _, ok := svc.listenTimers.Load(leaseName); ok {
t.Fatal("cleanup timer was not cancelled on clean shutdown")
}
}
Comment on lines +307 to +446
Member:

[Medium] Both TestListenQueueTimerCleanup and TestListenQueueCleanShutdown replicate the timer scheduling, cancellation, and map operations from Listen() inline rather than calling Listen() with a mock gRPC stream. This validates the pattern but not the implementation -- a refactoring bug in the timer path of Listen() would go undetected.

Consider adding at least one test that invokes Listen() directly (e.g., with a mock ControllerService_ListenServer that returns an error on the second Send()) to cover the production code path end-to-end.

AI-generated, human reviewed

Contributor Author:

This was discussed in the previous review round (see reply to comment r3075337964). The mock gRPC stream infrastructure required to call Listen() directly is substantial -- it needs a mock K8s client, authentication layer, lease objects, and a ControllerService_ListenServer implementation. That level of integration testing would be better suited to a follow-up PR to keep this fix minimal and focused on the production race condition.

The new "buffered token survives disconnect" subtest (added in c239709) does cover the primary user story end-to-end at the sync.Map level, which is where the actual bug lived.
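A mock stream for the suggested end-to-end test need not stub all of grpc.ServerStream to demonstrate the failure shape; the sketch below uses a simplified local interface and string messages. A real test would implement the generated pb.ControllerService_ListenServer (which also embeds grpc.ServerStream), so take the names here as illustrative only.

```go
package main

import (
	"errors"
	"fmt"
)

// listenStream is a simplified stand-in for the generated
// pb.ControllerService_ListenServer interface.
type listenStream interface {
	Send(msg string) error
}

// failAfterN accepts the first n Sends, then returns a transient error --
// the shape needed to drive Listen() into its deferred-cleanup path.
type failAfterN struct {
	n    int
	sent []string
}

func (s *failAfterN) Send(msg string) error {
	if len(s.sent) >= s.n {
		return errors.New("transient stream error")
	}
	s.sent = append(s.sent, msg)
	return nil
}

func main() {
	mock := &failAfterN{n: 2}
	var stream listenStream = mock

	var err error
	for _, msg := range []string{"a", "b", "c"} {
		if err = stream.Send(msg); err != nil {
			break // where Listen() would schedule deferred cleanup
		}
	}
	fmt.Println(len(mock.sent), err != nil)
}
```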

Member:

[Medium] The core scenario this fix targets is that a Dial() token buffered during a transient disconnect survives until the exporter reconnects. Subtest 2 verifies queue inheritance by channel identity, but no test actually writes a token into the queue and reads it back after reconnect.

Adding a subtest that writes a ListenResponse into the channel between the "transient error" and "reconnect" phases, then reads it back from the inherited queue, would directly cover the primary user story.

AI-generated, human reviewed

Contributor Author:

Great suggestion. Added in c239709 -- new subtest "buffered token survives disconnect and is readable after reconnect" writes a ListenResponse into the channel during the disconnect window, simulates the reconnect path, then reads the token back from the inherited queue and verifies identity.


// contains checks if substr is contained in s
func contains(s, substr string) bool {
return len(s) >= len(substr) && (s == substr || len(substr) == 0 ||