Skip to content

feat(PubSub): Add NackImmediately and WaitForProcessing shutdown mode#15482

Closed
robertvoinescu-work wants to merge 5 commits intogoogleapis:mainfrom
robertvoinescu-work:pubsub/shutdownOptions
Closed

feat(PubSub): Add NackImmediately and WaitForProcessing shutdown mode#15482
robertvoinescu-work wants to merge 5 commits intogoogleapis:mainfrom
robertvoinescu-work:pubsub/shutdownOptions

Conversation

@robertvoinescu-work
Copy link
Copy Markdown
Contributor

b/427314526

@robertvoinescu-work robertvoinescu-work requested a review from a team as a code owner February 24, 2026 16:53
@product-auto-label product-auto-label Bot added the api: pubsub Issues related to the Pub/Sub API. label Feb 24, 2026
@gemini-code-assist
Copy link
Copy Markdown
Contributor

Summary of Changes

Hello @robertvoinescu-work, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed!

This pull request significantly improves the SubscriberClient shutdown mechanism by introducing explicit shutdown settings: NackImmediately and WaitForProcessing. These settings provide developers with greater control over how messages are handled when a subscriber is stopped, allowing for either immediate negative acknowledgment of unhandled messages or a graceful period to complete processing before Nacking. This change enhances the reliability and predictability of subscriber termination, ensuring messages are managed appropriately based on application requirements.

Highlights

  • New Subscriber Shutdown Modes: Introduced 'NackImmediately' and 'WaitForProcessing' settings for SubscriberClient shutdown, offering fine-grained control over message handling during termination.
  • Enhanced StopAsync Method: Added a new StopAsync overload that accepts SubscriberShutdownSetting, an optional timeout, and a cancellation token, replacing previous StopAsync methods which are now marked obsolete.
  • Graceful Shutdown Logic: Implemented logic for 'WaitForProcessing' mode, which attempts to process all in-flight messages within a timeout, with a fallback to 'NackImmediately' if the timeout is approached.
  • Updated Test Coverage: Extensive unit and integration tests were added or modified to validate the behavior of the new shutdown settings, including scenarios for immediate Nack, waiting for processing, timeout handling, and cancellation.
  • Documentation Update: The docs/index.md file was updated to explain the new subscriber shutdown options and their usage.
Changelog
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.IntegrationTests/PubSubClientTest.cs
    • Updated StopAsync calls in integration tests to use the new SubscriberClient.SubscriberShutdownSetting.NackImmediately for consistent shutdown behavior.
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.OrderingKeyTester/Program.cs
    • Modified the StopAsync call to explicitly use SubscriberClient.SubscriberShutdownSetting.NackImmediately for shutdown.
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberClientSnippets.cs
    • Updated StopAsync in snippets to demonstrate the use of SubscriberClient.SubscriberShutdownSetting.NackImmediately.
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Snippets/SubscriberServiceApiClientSnippets.cs
    • Changed StopAsync call to use SubscriberClient.SubscriberShutdownSetting.NackImmediately in the sample code.
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1.Tests/SubscriberClientTest.cs
    • Removed System.Text.RegularExpressions import.
    • Added a using alias for SubscriberShutdownSetting.
    • Renamed several existing test methods to include _Obsolete suffix and introduced new test methods to cover the new SubscriberShutdownSetting enum.
    • Updated StopAsync calls across numerous tests to incorporate the new SubscriberShutdownSetting parameter.
    • Added new dedicated tests for various shutdown scenarios, including Shutdown_SoftStop_NacksMessages, Shutdown_NackImmediately_Success, Shutdown_WaitForProcessing_CompletesBeforeNack, Shutdown_WaitForProcessing_NacksOnTimeout, Shutdown_CancellationToken_AbortsWaitForProcessing, Shutdown_WaitForProcessing_NacksWhenTimeoutLessThanMinimum, and Shutdown_NackImmediately_LeaseExtensionStops.
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs
    • Defined a new SubscriberShutdownSetting enum with WaitForProcessing and NackImmediately options.
    • Marked the existing StopAsync(CancellationToken) and StopAsync(TimeSpan) methods as obsolete.
    • Introduced a new public virtual Task StopAsync(SubscriberShutdownSetting shutdownSetting, TimeSpan? timeout = null, CancellationToken cancellationToken = default) method.
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.SingleChannel.cs
    • Added new CancellationTokenSource fields (_nackImmediatelyCts, _waitForProcessingCts, _jointNackImmediatelyOrSoftStopCts) to manage different shutdown phases.
    • Modified internal logic in StartAsync, StartStreamingPull, HandlePullMoveNext, ProcessPullMessagesAsync, HandleExtendLease, and HandleStreamPing to respect the new shutdown cancellation tokens.
    • Updated the Nack helper method to accept leaseTracking for more precise message management during shutdown.
    • Changed LeaseCancellation constructor to accept a leaseStopCts.
  • apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs
    • Added s_nackGracePeriod constant and _isStopInitiated property.
    • Introduced _globalWaitForProcessingCts and _globalNackImmediatelyCts as new global cancellation token sources.
    • Modified the StartAsync method to initialize these new cancellation token sources.
    • Updated DisposeAsync to call the new StopAsync overload.
    • Refactored the StopAsync implementation to handle the new SubscriberShutdownSetting, including logic for CancelAfterDelay and CalculateNackDelay to manage the shutdown timeline.
    • Added a CancelTargetOnTrigger helper method for managing cancellation token registrations.
  • apis/Google.Cloud.PubSub.V1/docs/index.md
    • Added a new section detailing the Subscriber shutdown process, explaining the NackImmediately and WaitForProcessing settings, their behavior, and configurable timeouts.
Activity
  • The pull request was created by robertvoinescu-work.
  • It introduces a significant feature to the Pub/Sub client library, enhancing control over subscriber shutdown.
  • The changes span across core logic, testing, and documentation, indicating a comprehensive implementation.
Using Gemini Code Assist

The full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips.

Invoking Gemini

You can request assistance from Gemini at any point by creating a comment using either /gemini <command> or @gemini-code-assist <command>. Below is a summary of the supported commands on the current page.

Feature Command Description
Code Review /gemini review Performs a code review for the current pull request in its current state.
Pull Request Summary /gemini summary Provides a summary of the current pull request in its current state.
Comment @gemini-code-assist Responds in comments when explicitly tagged, both in pull request comments and review comments.
Help /gemini help Displays a list of available commands.

Customization

To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a .gemini/ folder in the base of the repository. Detailed instructions can be found here.

Limitations & Feedback

Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here.

You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension.

Footnotes

  1. Review the Privacy Notices, Generative AI Prohibited Use Policy, Terms of Service, and learn how to configure Gemini Code Assist in GitHub here. Gemini can make mistakes, so double check it and use code with caution.

Copy link
Copy Markdown
Contributor

@gemini-code-assist gemini-code-assist Bot left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Code Review

This pull request introduces two new shutdown modes for SubscriberClient: NackImmediately and WaitForProcessing, providing users more control over handling outstanding messages during shutdown. A security audit found no security vulnerabilities in the changes related to the graceful shutdown process. However, there is a potential memory leak issue related to orphaned delay tasks during shutdown that should be addressed.

Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs Outdated
Copy link
Copy Markdown
Contributor

@amanda-tarafa amanda-tarafa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reviewed some of this, but I have some questions. And also a lot of my comments are about rewording, etc. Let's chat tomorrow.

Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs Outdated
Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs Outdated
Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs Outdated
Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs Outdated
Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClient.cs Outdated
Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs Outdated
Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs Outdated
Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs Outdated
@robertvoinescu-work robertvoinescu-work force-pushed the pubsub/shutdownOptions branch 4 times, most recently from 331fb79 to 2a357a0 Compare March 12, 2026 18:48
Copy link
Copy Markdown
Contributor

@amanda-tarafa amanda-tarafa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I have two main change requests:

  • Create the ShutdownOptions type to hold the parameters for shutdown, as future proofing ourselves against breaking changes due to extra parameters.
  • Refine the token setup, where we have shutdown mode tokens and operation tokens. It's almost there now, but can be improved:
    • We have one shutdown token per shutdown mode.
    • We have one operation token per operation that needs to be stopped on shutdown.
    • Operation tokens are linked to the mode tokens, e.g. pull is linked to all mode tokens, but lease is not linked to the wait for processing token.
    • Calling Stop triggers the right mode token as per shutdown configuration.
    • Each operation only checks its own operation token to determine whether it should continue or stop.

Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs Outdated
Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs Outdated
Comment thread apis/Google.Cloud.PubSub.V1/docs/index.md
@robertvoinescu-work robertvoinescu-work force-pushed the pubsub/shutdownOptions branch 2 times, most recently from 8d40a69 to 7daaa55 Compare March 19, 2026 18:14
Copy link
Copy Markdown
Contributor

@amanda-tarafa amanda-tarafa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've reviewed the first commit only and I have lots of questions. And maybe some proposed renamings/groupings of tokens, other than that it looks good.

Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs Outdated
@robertvoinescu-work robertvoinescu-work force-pushed the pubsub/shutdownOptions branch 2 times, most recently from 1862658 to 6474dcb Compare March 20, 2026 21:13
Copy link
Copy Markdown
Contributor

@amanda-tarafa amanda-tarafa left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I've left comments one commit at a time, so that's the best way to see them.
I'm fine if the next PR comes with the first and second commit squashed in one. Happy to chat more about these.

@robertvoinescu-work robertvoinescu-work force-pushed the pubsub/shutdownOptions branch 2 times, most recently from bdeb795 to d69101b Compare March 25, 2026 01:46
@@ -167,7 +169,7 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null)
private readonly RequeueableQueue<string> _nackQueue = new RequeueableQueue<string>();

private int _pushInFlight = 0;
private int _userHandlerInFlight = 0;
private readonly HashSet<string> _messagesInFlight = new HashSet<string>();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Before, this was _userHandlerInFlight which hinted at where messages "were flying to". With just _messagesInFlight we lost that information. I think the previous name was better, or at least a comment.

This can be done in a follow up PR.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before _userHandlerInFlight was used to count the number of user handlers in flight, this is now keeping track of messages currently being handled. I don't think the old name conveys to me that it is now tracking messages. How about _messagesInHandler?

// Stop waiting for data to push.
_pushStopCts.Cancel();
// Stop waiting for data to push and handler tasks.
_waitForHandlerCts.Cancel();
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we need this one here?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If NackAndWait finishes it's processing there might still be Leasing Tasks queued up with empty lease tracking lists, but the LeasHandler won't know to stop and check and see it has no leases to extends. We are then stuck waiting for this to finish before shutdown can be marked as complete.

// Nack messages that will not be sent to handler freeing them for redelivery.
// We continue extending leases for in flight messages.
var messagesNotBeingHandled = leaseTracking.Except(_messagesInFlight);
_nackedOnShutdownIds.UnionWith(messagesNotBeingHandled);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Same comment as before, move this into the Nack method and there, into the lock. _nackedOnShutdownIds needs to be accessed from withing the lock. At the very least it needs to be a concurrent collection.

Copy link
Copy Markdown
Contributor Author

@robertvoinescu-work robertvoinescu-work Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, thanks for clarifying the async flow of this object! I've added a lock on the object itself.

@@ -167,7 +169,7 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null)
private readonly RequeueableQueue<string> _nackQueue = new RequeueableQueue<string>();

private int _pushInFlight = 0;
private int _userHandlerInFlight = 0;
private readonly HashSet<string> _messagesInFlight = new HashSet<string>();
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Before _userHandlerInFlight was used to count the number of user handlers in flight, this is now keeping track of messages currently being handled. I don't think the old name conveys to me that it is now tracking messages. How about _messagesInHandler?

Comment thread apis/Google.Cloud.PubSub.V1/Google.Cloud.PubSub.V1/SubscriberClientImpl.cs Outdated
Add(_scheduler.Delay(_maxExtensionDuration, cancellation.Token), Next(false, () =>
{
// On cancellation we may have not reached the max extension duration so don't clear the lease
Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think this way is cleaner thanks amanda. I've added NAcking here.

// Nack messages that will not be sent to handler freeing them for redelivery.
// We continue extending leases for in flight messages.
var messagesNotBeingHandled = leaseTracking.Except(_messagesInFlight);
_nackedOnShutdownIds.UnionWith(messagesNotBeingHandled);
Copy link
Copy Markdown
Contributor Author

@robertvoinescu-work robertvoinescu-work Mar 26, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done, thanks for clarifying the async flow of this object! I've added a lock on the object itself.

@amanda-tarafa
Copy link
Copy Markdown
Contributor

Closing in favour of #15545 , which adds some minor improvements and changes how ack/nack responses are communicated to calling code during handler to better match requirements.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

api: pubsub Issues related to the Pub/Sub API.

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants