-
-
Notifications
You must be signed in to change notification settings - Fork 96
Fix persistent subscription failure handler never being invoked #546
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Merged
+183
−39
Merged
Changes from all commits
Commits
Show all changes
6 commits
Select commit
Hold shift + click to select a range
3356010
Fix persistent subscription failure handler never being invoked
alexeyzimarev 40756d6
Bump OpenTelemetry to 1.15.x and switch preview pushes to nuget.org
alexeyzimarev 0c81f95
Switch release pipeline to NuGet trusted publishing and drop MyGet
alexeyzimarev ff64448
Tidy NuGet push steps in preview/publish workflows
alexeyzimarev 7b8e35b
Address review feedback on PR #546
alexeyzimarev 8f20989
Make SpyglassRegistry thread-safe
alexeyzimarev File filter
Filter by extension
Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
There are no files selected for viewing
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
111 changes: 111 additions & 0 deletions
111
...rentDB/test/Eventuous.Tests.KurrentDB/Subscriptions/PersistentSubscriptionFailureTests.cs
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,111 @@ | ||
| using Eventuous.KurrentDB.Subscriptions; | ||
| using Eventuous.Producers; | ||
| using Eventuous.Subscriptions.Context; | ||
| using Eventuous.Subscriptions.Filters; | ||
| using Eventuous.Tests.KurrentDB.Subscriptions.Fixtures; | ||
| using Eventuous.Tests.Subscriptions.Base; | ||
| using KurrentDB.Client; | ||
|
|
||
| namespace Eventuous.Tests.KurrentDB.Subscriptions; | ||
|
|
||
| public class PersistentSubscriptionFailureTests { | ||
| [Test] | ||
| [Category("Persistent subscription")] | ||
| public async Task Esdb_ShouldParkMessageWhenHandlerFails(CancellationToken cancellationToken) { | ||
| var fixture = new PersistentSubscriptionFixture<StreamPersistentSubscription, StreamPersistentSubscriptionOptions, AlwaysFailingHandler>( | ||
| new(), | ||
| CreateSubscription, | ||
| autoStart: false | ||
| ); | ||
|
|
||
| await fixture.InitializeAsync(); | ||
| var started = false; | ||
|
|
||
| try { | ||
| await fixture.Start(); | ||
| started = true; | ||
|
|
||
| var testEvent = TestEvent.Create(); | ||
| await fixture.Producer.Produce(fixture.Stream, testEvent, new(), cancellationToken: cancellationToken); | ||
|
|
||
| var parkedStream = $"$persistentsubscription-{fixture.Stream}::{fixture.SubscriptionId}-parked"; | ||
| var parked = await ReadFirstParkedEvent(fixture.Client, parkedStream, TimeSpan.FromSeconds(20), cancellationToken) | ||
| ?? throw new TimeoutException($"No event was parked on {parkedStream} within the timeout"); | ||
|
|
||
| await Assert.That(parked.Event.EventStreamId).IsEqualTo(fixture.Stream.ToString()); | ||
| await Assert.That(parked.Event.EventType).IsEqualTo(TestEvent.TypeName); | ||
| await Assert.That(fixture.Handler.Failures).IsGreaterThan(0); | ||
| } finally { | ||
| // Fixture only auto-stops when autoStart is true, so stop explicitly to avoid leaking the subscription. | ||
| if (started) await fixture.Stop(); | ||
| await fixture.DisposeAsync(); | ||
| } | ||
| } | ||
|
|
||
| static StreamPersistentSubscription CreateSubscription(string id, string connectionString, StreamName stream, AlwaysFailingHandler handler, ILoggerFactory loggerFactory) { | ||
| var settings = KurrentDBClientSettings.Create(connectionString); | ||
|
|
||
| return new( | ||
| new KurrentDBClient(settings), | ||
| new() { | ||
| StreamName = stream, | ||
| SubscriptionId = id, | ||
| ThrowOnError = true, | ||
| SubscriptionSettings = new PersistentSubscriptionSettings( | ||
| resolveLinkTos: false, | ||
| messageTimeout: TimeSpan.FromSeconds(2), | ||
| maxRetryCount: 0 | ||
| ) | ||
| }, | ||
| new ConsumePipe().AddDefaultConsumer(handler), | ||
| loggerFactory | ||
| ); | ||
| } | ||
|
|
||
| static async Task<ResolvedEvent?> ReadFirstParkedEvent(KurrentDBClient client, string parkedStream, TimeSpan timeout, CancellationToken cancellationToken) { | ||
| using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken); | ||
| cts.CancelAfter(timeout); | ||
|
|
||
| while (!cts.Token.IsCancellationRequested) { | ||
| try { | ||
| var read = client.ReadStreamAsync( | ||
| Direction.Forwards, | ||
| parkedStream, | ||
| StreamPosition.Start, | ||
| maxCount: 1, | ||
| resolveLinkTos: true, | ||
| cancellationToken: cts.Token | ||
| ); | ||
| var state = await read.ReadState; | ||
|
|
||
| if (state == ReadState.Ok) { | ||
| await foreach (var resolved in read) { | ||
| return resolved; | ||
| } | ||
| } | ||
| } catch (OperationCanceledException) when (cts.Token.IsCancellationRequested) { | ||
| return null; | ||
| } | ||
|
|
||
| try { | ||
| await Task.Delay(200, cts.Token); | ||
| } catch (OperationCanceledException) when (cts.Token.IsCancellationRequested) { | ||
| return null; | ||
| } | ||
| } | ||
|
|
||
| return null; | ||
| } | ||
| } | ||
|
|
||
| public class AlwaysFailingHandler : BaseEventHandler { | ||
| int _failures; | ||
|
|
||
| public int Failures => Volatile.Read(ref _failures); | ||
|
|
||
| public override ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext context) { | ||
| Interlocked.Increment(ref _failures); | ||
|
|
||
| throw new InvalidOperationException("Simulated handler failure"); | ||
| } | ||
| } | ||
Oops, something went wrong.
Add this suggestion to a batch that can be applied as a single commit.
This suggestion is invalid because no changes were made to the code.
Suggestions cannot be applied while the pull request is closed.
Suggestions cannot be applied while viewing a subset of changes.
Only one suggestion per line can be applied in a batch.
Add this suggestion to a batch that can be applied as a single commit.
Applying suggestions on deleted lines is not supported.
You must change the existing code in this line in order to create a valid suggestion.
Outdated suggestions cannot be applied.
This suggestion has been applied or marked resolved.
Suggestions cannot be applied from pending reviews.
Suggestions cannot be applied on multi-line comments.
Suggestions cannot be applied while the pull request is queued to merge.
Suggestion cannot be applied right now. Please check back later.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This test explicitly starts a persistent subscription with
autoStart: false, but thefinallyblock only callsDisposeAsync(). In this fixture,DisposeAsync()only invokesStop()whenautoStartis true, so the subscription remains active after the test. In longer test runs this can leak active consumers/connections and introduce cross-test flakiness; stop the subscription explicitly before disposing.Useful? React with 👍 / 👎.