feat(PubSub): Add NackImmediately and WaitForProcessing shutdown mode#15482
feat(PubSub): Add NackImmediately and WaitForProcessing shutdown mode#15482robertvoinescu-work wants to merge 5 commits intogoogleapis:mainfrom
Conversation
Summary of ChangesHello @robertvoinescu-work, I'm Gemini Code Assist1! I'm currently reviewing this pull request and will post my feedback shortly. In the meantime, here's a summary to help you and other reviewers quickly get up to speed! This pull request significantly improves the SubscriberClient shutdown mechanism by introducing explicit shutdown settings: NackImmediately and WaitForProcessing. These settings provide developers with greater control over how messages are handled when a subscriber is stopped, allowing for either immediate negative acknowledgment of unhandled messages or a graceful period to complete processing before Nacking. This change enhances the reliability and predictability of subscriber termination, ensuring messages are managed appropriately based on application requirements. Highlights
Changelog
Activity
Using Gemini Code AssistThe full guide for Gemini Code Assist can be found on our documentation page, here are some quick tips. Invoking Gemini You can request assistance from Gemini at any point by creating a comment using either
Customization To customize Gemini Code Assist for GitHub experience, repository maintainers can create a configuration file and/or provide a custom code review style guide (such as PEP-8 for Python) by creating and adding files to a Limitations & Feedback Gemini Code Assist may make mistakes. Please leave feedback on any instances where its feedback is incorrect or counter productive. You can react with 👍 and 👎 on @gemini-code-assist comments. If you're interested in giving your feedback about your experience with Gemini Code Assist for Github and other Google products, sign up here. You can also get AI-powered code generation, chat, as well as code reviews directly in the IDE at no cost with the Gemini Code Assist IDE Extension. Footnotes
|
There was a problem hiding this comment.
Code Review
This pull request introduces two new shutdown modes for SubscriberClient: NackImmediately and WaitForProcessing, providing users more control over handling outstanding messages during shutdown. A security audit found no security vulnerabilities in the changes related to the graceful shutdown process. However, there is a potential memory leak issue related to orphaned delay tasks during shutdown that should be addressed.
amanda-tarafa
left a comment
There was a problem hiding this comment.
I've reviewed some of this, but I have some questions. And also a lot of my comments are about rewording, etc. Let's chat tomorrow.
331fb79 to
2a357a0
Compare
amanda-tarafa
left a comment
There was a problem hiding this comment.
I have two main change requests:
- Create the ShutdownOptions type to hold the parameters for shutdown, as future proofing ourselves against breaking changes due to extra parameters.
- Refine the token setup, where we have shutdown mode tokens and operation tokens. It's almost there now, but can be improved:
- We have one shutdown token per shutdown mode.
- We have one operation token per operation that needs to be stopped on shutdown.
- Operation tokens are linked to the mode tokens, e.g. pull is linked to all mode tokens, but lease is not linked to the wait for processing token.
- Calling Stop triggers the right mode token as per shutdown configuration.
- Each operation only checks its own operation token to determine whether it should continue or stop.
8d40a69 to
7daaa55
Compare
amanda-tarafa
left a comment
There was a problem hiding this comment.
I've reviewed the first commit only and I have lots of questions. And maybe some proposed renamings/groupings of tokens, other than that it looks good.
1862658 to
6474dcb
Compare
amanda-tarafa
left a comment
There was a problem hiding this comment.
I've left comments one commit at a time, so that's the best way to see them.
I'm fine if the next PR comes with the first and second commit squashed in one. Happy to chat more about these.
bdeb795 to
d69101b
Compare
d69101b to
c7f0c9e
Compare
| @@ -167,7 +169,7 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null) | |||
| private readonly RequeueableQueue<string> _nackQueue = new RequeueableQueue<string>(); | |||
|
|
|||
| private int _pushInFlight = 0; | |||
| private int _userHandlerInFlight = 0; | |||
| private readonly HashSet<string> _messagesInFlight = new HashSet<string>(); | |||
There was a problem hiding this comment.
nit: Before, this was _userHandlerInFlight which hinted at where messages "were flying to". With just _messagesInFlight we lost that information. I think the previous name was better, or at least a comment.
This can be done in a follow up PR.
There was a problem hiding this comment.
Before _userHandlerInFlight was used to count the number of user handlers in flight, this is now keeping track of messages currently being handled. I don't think the old name conveys to me that it is now tracking messages. How about _messagesInHandler?
| // Stop waiting for data to push. | ||
| _pushStopCts.Cancel(); | ||
| // Stop waiting for data to push and handler tasks. | ||
| _waitForHandlerCts.Cancel(); |
There was a problem hiding this comment.
Why do we need this one here?
There was a problem hiding this comment.
If NackAndWait finishes it's processing there might still be Leasing Tasks queued up with empty lease tracking lists, but the LeasHandler won't know to stop and check and see it has no leases to extends. We are then stuck waiting for this to finish before shutdown can be marked as complete.
| // Nack messages that will not be sent to handler freeing them for redelivery. | ||
| // We continue extending leases for in flight messages. | ||
| var messagesNotBeingHandled = leaseTracking.Except(_messagesInFlight); | ||
| _nackedOnShutdownIds.UnionWith(messagesNotBeingHandled); |
There was a problem hiding this comment.
Same comment as before, move this into the Nack method and there, into the lock. _nackedOnShutdownIds needs to be accessed from withing the lock. At the very least it needs to be a concurrent collection.
There was a problem hiding this comment.
Done, thanks for clarifying the async flow of this object! I've added a lock on the object itself.
c7f0c9e to
6508fda
Compare
6508fda to
73fccea
Compare
| @@ -167,7 +169,7 @@ internal RetryInfo(DateTime firstTimeOfFailureInUtc, TimeSpan? backoff = null) | |||
| private readonly RequeueableQueue<string> _nackQueue = new RequeueableQueue<string>(); | |||
|
|
|||
| private int _pushInFlight = 0; | |||
| private int _userHandlerInFlight = 0; | |||
| private readonly HashSet<string> _messagesInFlight = new HashSet<string>(); | |||
There was a problem hiding this comment.
Before _userHandlerInFlight was used to count the number of user handlers in flight, this is now keeping track of messages currently being handled. I don't think the old name conveys to me that it is now tracking messages. How about _messagesInHandler?
| Add(_scheduler.Delay(_maxExtensionDuration, cancellation.Token), Next(false, () => | ||
| { | ||
| // On cancellation we may have not reached the max extension duration so don't clear the lease |
There was a problem hiding this comment.
I think this way is cleaner thanks amanda. I've added NAcking here.
| // Nack messages that will not be sent to handler freeing them for redelivery. | ||
| // We continue extending leases for in flight messages. | ||
| var messagesNotBeingHandled = leaseTracking.Except(_messagesInFlight); | ||
| _nackedOnShutdownIds.UnionWith(messagesNotBeingHandled); |
There was a problem hiding this comment.
Done, thanks for clarifying the async flow of this object! I've added a lock on the object itself.
|
Closing in favour of #15545 , which adds some minor improvements and changes how ack/nack responses are communicated to calling code during handler to better match requirements. |
b/427314526