Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions adapter/sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,18 @@ func (s *SQSServer) Run() error {
// request hot path never pays the O(N) sweep cost. Cleaned up by
// the same reaperCtx cancellation that stops the message reaper.
go s.throttle.runSweepLoop(s.reaperCtx)
if s.listen == nil {
// Listenless mode: the SQS adapter was constructed without a
// public HTTP listener (--sqsAddress empty). Admin endpoints
// in adapter/sqs_admin.go still work because they go through
// the coordinator/store — only the SigV4 wire surface is
// suppressed. Block until Stop() cancels reaperCtx so the
// errgroup task lifetime matches the listening branch
// (callers wait on Run() returning to know it's safe to tear
// down the underlying coordinator/store).
<-s.reaperCtx.Done()
return nil
}
if err := s.httpServer.Serve(s.listen); err != nil && !errors.Is(err, http.ErrServerClosed) {
return errors.WithStack(err)
}
Expand Down
30 changes: 30 additions & 0 deletions adapter/sqs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -205,3 +205,33 @@ func TestSQSServer_StopShutsDown(t *testing.T) {
t.Fatal("Run did not return within timeout after Stop")
}
}

// TestSQSServer_ListenlessRunStopRoundTrip pins the listenless
// construction path used by startSQSServer when --sqsAddress is
// empty: NewSQSServer must accept a nil net.Listener, Run() must
// return cleanly when Stop() cancels the reaper context, and the
// reaper / throttle-sweep goroutines must wind down with the same
// cancellation. Regression guard for the admin-only deployment
// shape — without this branch the SQS admin endpoints would be
// dark on a build whose operator deliberately closed the public
// SigV4 surface.
func TestSQSServer_ListenlessRunStopRoundTrip(t *testing.T) {
t.Parallel()
srv := NewSQSServer(nil, nil, nil)
done := make(chan error, 1)
go func() { done <- srv.Run() }()
// Give Run() a tick to enter the reaperCtx.Done() block. Unlike
// the listening path there's no socket Accept loop to enter, so
// a shorter pause is enough.
time.Sleep(20 * time.Millisecond)
srv.Stop()

select {
case err := <-done:
if err != nil {
t.Fatalf("run: %v", err)
}
case <-time.After(2 * time.Second):
t.Fatal("listenless Run did not return within timeout after Stop")
}
}
8 changes: 5 additions & 3 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -1469,9 +1469,11 @@ type runtimeServerRunner struct {
s3Server *adapter.S3Server

// sqsServer plays the same role for the SQS admin entrypoints
// (adapter/sqs_admin.go). Nil when --sqsAddress is empty; the
// admin listener then leaves /admin/api/v1/sqs/* off the wire
// (the mux 404s those paths).
// (adapter/sqs_admin.go). Always non-nil after startup —
// startSQSServer constructs a listenless SQSServer when
// --sqsAddress is empty (the public SigV4 listener is
// suppressed but the admin bridge stays wired since the admin
// handlers only need the coordinator/store, not the listener).
sqsServer *adapter.SQSServer

// sqsPartitionResolver is the concrete pointer to the same
Expand Down
33 changes: 21 additions & 12 deletions main_sqs.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,11 +11,17 @@ import (
"golang.org/x/sync/errgroup"
)

// startSQSServer stands up the SQS adapter on sqsAddr and returns the
// running *adapter.SQSServer so the admin listener can call SigV4-bypass
// admin entrypoints against it (see adapter/sqs_admin.go). Returns
// (nil, nil) when sqsAddr is empty — that is the "SQS disabled" branch
// and the admin listener leaves /admin/api/v1/sqs/* off the wire.
// startSQSServer constructs the SQS adapter and returns the running
// *adapter.SQSServer. The admin listener calls SigV4-bypass admin
// entrypoints against this server (see adapter/sqs_admin.go); those
// admin methods only need the coordinator/store, NOT the public SQS
// HTTP listener. So when sqsAddr is empty the function still
// constructs the server (with a nil net.Listener) — Run() then skips
// httpServer.Serve while the reaper and throttle-sweep goroutines
// still run, keeping retention math behind the admin counters
// correct. The admin bridge in main_admin.go therefore wires
// /admin/api/v1/sqs/* on the wire even on builds that disabled the
// public SigV4 endpoint.
func startSQSServer(
ctx context.Context,
lc *net.ListenConfig,
Expand All @@ -30,16 +36,19 @@ func startSQSServer(
partitionObserver adapter.SQSPartitionObserver,
) (*adapter.SQSServer, error) {
sqsAddr = strings.TrimSpace(sqsAddr)
if sqsAddr == "" {
return nil, nil
}
sqsL, err := lc.Listen(ctx, "tcp", sqsAddr)
if err != nil {
return nil, errors.Wrapf(err, "failed to listen on %s", sqsAddr)
var sqsL net.Listener
if sqsAddr != "" {
var err error
sqsL, err = lc.Listen(ctx, "tcp", sqsAddr)
if err != nil {
return nil, errors.Wrapf(err, "failed to listen on %s", sqsAddr)
}
}
staticCreds, err := loadSigV4StaticCredentialsFile(credentialsFile, "sqs")
if err != nil {
_ = sqsL.Close()
if sqsL != nil {
_ = sqsL.Close()
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

The error from sqsL.Close() is silently ignored. According to the general rules, errors from Close() methods on resources like network listeners should be logged to ensure that resource leaks or other cleanup problems are visible to operators.

References
  1. Do not silently ignore errors from Close() methods on resources like network connections. Log them to ensure resource leaks or other cleanup problems are visible.

}
Comment on lines 47 to +51
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Skip SQS credentials validation in listenless mode

When --sqsAddress is empty, this function still parses --sqsCredentialsFile and returns an error on any file issue before constructing the listenless server. That makes admin-only SQS deployments fail to start if the credentials path is stale/malformed, even though admin queue handlers do not use SigV4 credentials. This is a behavior regression from the previous "SQS disabled" path and can unintentionally keep /admin/api/v1/sqs/* unavailable in the exact mode this change is meant to support.

Useful? React with 👍 / 👎.

return nil, err
}
sqsServer := adapter.NewSQSServer(
Expand Down
Loading