Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions .github/workflows/preview.yml
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,5 @@ jobs:
-
name: Create and push NuGet package
run: |
dotnet pack -c Debug -o nuget
dotnet nuget push nuget/**/*.nupkg --skip-duplicate --api-key ${{ secrets.MYGET_API_KEY }} --source https://www.myget.org/F/${{ vars.MYGET_FEED_NAME || 'eventuous' }}/api/v3/index.json
env:
NUGET_AUTH_TOKEN: ${{ github.token }}
dotnet pack -c Release -o nuget -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg
dotnet nuget push nuget/**/*.nupkg --api-key "${{ secrets.NUGET_API_KEY }}" --source https://api.nuget.org/v3/index.json --skip-duplicate
14 changes: 10 additions & 4 deletions .github/workflows/publish.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,9 @@ on:
jobs:
nuget:
runs-on: [ self-hosted, type-cpx52, setup-docker, volume-cache-50GB ]
permissions:
id-token: write
contents: read
env:
NUGET_PACKAGES: "/mnt/cache/.nuget/packages"
DOTNET_INSTALL_DIR: "/mnt/cache/.dotnet"
Expand Down Expand Up @@ -48,11 +51,14 @@ jobs:
files: |
test-results/**/*.xml
test-results/**/*.trx
-
name: NuGet trusted publishing login
id: nuget-login
uses: NuGet/login@v1
with:
user: ${{ vars.NUGET_USER || 'Eventuous' }}
-
name: Create and push NuGet package
run: |
dotnet pack -c Release -o nuget -p:IncludeSymbols=true -p:SymbolPackageFormat=snupkg
dotnet nuget push nuget/**/*.nupkg --api-key ${{ secrets.NUGET_API_KEY }} --source https://api.nuget.org/v3/index.json --skip-duplicate
dotnet nuget push nuget/**/*.nupkg --api-key ${{ secrets.MYGET_API_KEY }} --source https://www.myget.org/F/${{ vars.MYGET_FEED_NAME || 'eventuous' }}/api/v2/package --skip-duplicate
env:
NUGET_AUTH_TOKEN: ${{ github.token }}
dotnet nuget push nuget/**/*.nupkg --api-key "${{ steps.nuget-login.outputs.NUGET_API_KEY }}" --source https://api.nuget.org/v3/index.json --skip-duplicate
15 changes: 8 additions & 7 deletions Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,8 @@
<PackageVersion Include="System.Diagnostics.DiagnosticSource" Version="$(MicrosoftExtensionsVer)" />
<PackageVersion Include="System.Linq.AsyncEnumerable" Version="10.0.0" />
<PackageVersion Include="System.Reactive" Version="6.0.1" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.13.1" />
<PackageVersion Include="OpenTelemetry.Api" Version="1.15.3" />
<PackageVersion Include="OpenTelemetry.Extensions.Hosting" Version="1.15.3" />
<PackageVersion Include="MongoDB.Driver" Version="3.4.0" />
<PackageVersion Include="Google.Cloud.PubSub.V1" Version="3.19.0" />
<PackageVersion Include="Confluent.Kafka" Version="2.6.1" />
Expand Down Expand Up @@ -105,12 +106,12 @@
<PackageVersion Include="MongoDB.Driver.Core.Extensions.DiagnosticSources" Version="2.1.0" />
<PackageVersion Include="MongoDB.Driver.Core" Version="2.30.0" />
<PackageVersion Include="Npgsql.OpenTelemetry" Version="$(NpgsqlVersion)" />
<PackageVersion Include="OpenTelemetry" Version="1.13.1" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.13.1" />
<PackageVersion Include="OpenTelemetry.Exporter.Prometheus.AspNetCore" Version="1.13.1-beta.1" />
<PackageVersion Include="OpenTelemetry.Exporter.Zipkin" Version="1.13.1" />
<PackageVersion Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.13.0" />
<PackageVersion Include="OpenTelemetry.Instrumentation.GrpcNetClient" Version="1.13.0-beta.1" />
<PackageVersion Include="OpenTelemetry" Version="1.15.3" />
<PackageVersion Include="OpenTelemetry.Exporter.OpenTelemetryProtocol" Version="1.15.3" />
<PackageVersion Include="OpenTelemetry.Exporter.Prometheus.AspNetCore" Version="1.15.3-beta.1" />
<PackageVersion Include="OpenTelemetry.Exporter.Zipkin" Version="1.15.3" />
<PackageVersion Include="OpenTelemetry.Instrumentation.AspNetCore" Version="1.15.2" />
<PackageVersion Include="OpenTelemetry.Instrumentation.GrpcNetClient" Version="1.15.1-beta.1" />
<PackageVersion Include="Serilog.AspNetCore" Version="9.0.0" />
<PackageVersion Include="Serilog.Extensions.Hosting" Version="9.0.0" />
<PackageVersion Include="Serilog.Sinks.Console" Version="6.0.0" />
Expand Down
32 changes: 23 additions & 9 deletions src/Experimental/src/Eventuous.Spyglass/SpyglassRegistry.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,20 +25,34 @@ public record SpyglassLoadResult(object State, SpyglassEventInfo[] Events);
public record SpyglassEventInfo(string EventType, object? Payload);

public static class SpyglassRegistry {
static readonly List<SpyglassAggregateInfo> Aggregates = [];

public static void Register(SpyglassAggregateInfo info)
=> Aggregates.Add(info with { Id = Guid.NewGuid() });
// Module initializers from different assemblies can call Register concurrently when the assemblies
// are loaded by parallel test fixtures (or any parallel host startup), so the backing store has to
// be thread-safe. Reads also enumerate the snapshot, so we publish a fresh array on every write.
static SpyglassAggregateInfo[] _aggregates = [];
static readonly object _lock = new();

public static void Register(SpyglassAggregateInfo info) {
lock (_lock) {
var entry = info with { Id = Guid.NewGuid() };
var next = new SpyglassAggregateInfo[_aggregates.Length + 1];
Array.Copy(_aggregates, next, _aggregates.Length);
next[^1] = entry;
_aggregates = next;
}
}

public static SpyglassAggregateEntry[] GetAggregates()
=> Aggregates.Select(a => new SpyglassAggregateEntry(a.Id, a.AggregateType, a.StateType, a.Methods, a.Events)).ToArray();
=> _aggregates.Select(a => new SpyglassAggregateEntry(a.Id, a.AggregateType, a.StateType, a.Methods, a.Events)).ToArray();

public static SpyglassAggregateInfo? FindById(Guid id)
=> Aggregates.FirstOrDefault(x => x.Id == id);
=> Array.Find(_aggregates, x => x.Id == id);

public static SpyglassAggregateInfo? FindByTypeName(string typeName) {
var snapshot = _aggregates;

public static SpyglassAggregateInfo? FindByTypeName(string typeName)
=> Aggregates.FirstOrDefault(x => x.AggregateType == typeName)
?? Aggregates.FirstOrDefault(x => StripStateSuffix(x.StateType) == typeName);
return Array.Find(snapshot, x => x.AggregateType == typeName)
?? Array.Find(snapshot, x => StripStateSuffix(x.StateType) == typeName);
}

static string StripStateSuffix(string s)
=> s.EndsWith("State") && s.Length > 5 ? s[..^5] : s;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
</PropertyGroup>
<ItemGroup>
<PackageReference Include="KurrentDB.Client"/>
<!-- KurrentDB.Client transitively pulls OpenTelemetry.Api 1.12.0 which is vulnerable
(GHSA-g94r-2vxg-569j). Forcing the central pinned version overrides the transitive. -->
<PackageReference Include="OpenTelemetry.Api"/>
</ItemGroup>
<ItemGroup>
<ProjectReference Include="$(CoreRoot)\Eventuous.Producers\Eventuous.Producers.csproj"/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -185,9 +185,11 @@ async ValueTask Nack(MessageConsumeContext ctx, Exception exception) {
return;
}

ctx.LogContext.MessageHandlingFailed(Options.SubscriptionId, ctx, exception);

if (Options.ThrowOnError) throw exception;
// Handler-pipeline failures are already logged via context.Nack inside EventSubscription.Handler.
// Anything else reaching here (e.g. an Ack failure after the handler returned) needs its own log entry.
if (!ctx.HasFailed()) {
ctx.LogContext.MessageHandlingFailed(Options.SubscriptionId, ctx, exception);
}

var re = ctx.Items.GetItem<ResolvedEvent>(ResolvedEventKey);
var subscription = ctx.Items.GetItem<PersistentSubscription>(SubscriptionKey)!;
Expand Down Expand Up @@ -248,6 +250,12 @@ static Task DefaultEventProcessingFailureHandler(
PersistentSubscription subscription,
ResolvedEvent resolvedEvent,
Exception exception
)
=> subscription.Nack(PersistentSubscriptionNakEventAction.Retry, exception.Message, resolvedEvent);
) {
// When ThrowOnError is enabled, Handler wraps the original exception in SubscriptionException;
// unwrap it so the parked-message reason carries the actual handler error rather than a generic
// "Error processing event ..." string.
var cause = exception is SubscriptionException { InnerException: { } inner } ? inner : exception;

return subscription.Nack(PersistentSubscriptionNakEventAction.Retry, cause.Message, resolvedEvent);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@
using Eventuous.KurrentDB.Producers;
using Eventuous.KurrentDB.Subscriptions;
using Eventuous.Tests.Subscriptions.Base;
using KurrentDB.Client;
using LoggingExtensions = Eventuous.TestHelpers.TUnit.Logging.LoggingExtensions;

namespace Eventuous.Tests.KurrentDB.Subscriptions.Fixtures;
Expand All @@ -15,12 +16,14 @@ public class PersistentSubscriptionFixture<TSubscription, TOptions, THandler>(
where THandler : class, IEventHandler
where TSubscription : PersistentSubscriptionBase<TOptions>
where TOptions : PersistentSubscriptionOptions {
public StreamName Stream { get; } = new($"test-{Guid.NewGuid():N}");
public THandler Handler { get; } = handler;
public KurrentDBProducer Producer { get; private set; } = null!;
protected ILogger Log { get; set; } = null!;
protected StoreFixture Fixture { get; } = new(logLevel);
TSubscription Subscription { get; set; } = null!;
public StreamName Stream { get; } = new($"test-{Guid.NewGuid():N}");
public THandler Handler { get; } = handler;
public KurrentDBProducer Producer { get; private set; } = null!;
public string SubscriptionId { get; private set; } = null!;
public KurrentDBClient Client => Fixture.Client;
protected ILogger Log { get; set; } = null!;
protected StoreFixture Fixture { get; } = new(logLevel);
TSubscription Subscription { get; set; } = null!;

public ValueTask Start() => Subscription.SubscribeWithLog(Log);

Expand All @@ -32,13 +35,13 @@ public async ValueTask InitializeAsync() {
Fixture.TypeMapper.RegisterKnownEventTypes(typeof(TestEvent).Assembly);
await Fixture.InitializeAsync();
Producer = new(Fixture.Client);
var loggerFactory = LoggingExtensions.GetLoggerFactory(logLevel);
var subscriptionId = $"test-{Guid.NewGuid():N}";
Log = loggerFactory.CreateLogger(GetType());
var loggerFactory = LoggingExtensions.GetLoggerFactory(logLevel);
SubscriptionId = $"test-{Guid.NewGuid():N}";
Log = loggerFactory.CreateLogger(GetType());

_listener = new(loggerFactory);

Subscription = subscriptionFactory(subscriptionId, Fixture.Container.GetConnectionString(), Stream, Handler, loggerFactory);
Subscription = subscriptionFactory(SubscriptionId, Fixture.Container.GetConnectionString(), Stream, Handler, loggerFactory);
if (autoStart) await Start();
}

Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
using Eventuous.KurrentDB.Subscriptions;
using Eventuous.Producers;
using Eventuous.Subscriptions.Context;
using Eventuous.Subscriptions.Filters;
using Eventuous.Tests.KurrentDB.Subscriptions.Fixtures;
using Eventuous.Tests.Subscriptions.Base;
using KurrentDB.Client;

namespace Eventuous.Tests.KurrentDB.Subscriptions;

public class PersistentSubscriptionFailureTests {
[Test]
[Category("Persistent subscription")]
public async Task Esdb_ShouldParkMessageWhenHandlerFails(CancellationToken cancellationToken) {
var fixture = new PersistentSubscriptionFixture<StreamPersistentSubscription, StreamPersistentSubscriptionOptions, AlwaysFailingHandler>(
new(),
CreateSubscription,
autoStart: false
);

await fixture.InitializeAsync();
var started = false;

try {
await fixture.Start();
started = true;

var testEvent = TestEvent.Create();
await fixture.Producer.Produce(fixture.Stream, testEvent, new(), cancellationToken: cancellationToken);

var parkedStream = $"$persistentsubscription-{fixture.Stream}::{fixture.SubscriptionId}-parked";
var parked = await ReadFirstParkedEvent(fixture.Client, parkedStream, TimeSpan.FromSeconds(20), cancellationToken)
?? throw new TimeoutException($"No event was parked on {parkedStream} within the timeout");

await Assert.That(parked.Event.EventStreamId).IsEqualTo(fixture.Stream.ToString());
await Assert.That(parked.Event.EventType).IsEqualTo(TestEvent.TypeName);
await Assert.That(fixture.Handler.Failures).IsGreaterThan(0);
} finally {
// Fixture only auto-stops when autoStart is true, so stop explicitly to avoid leaking the subscription.
if (started) await fixture.Stop();
await fixture.DisposeAsync();
Comment on lines +38 to +41
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P2 Badge Stop started subscription in teardown

This test explicitly starts a persistent subscription with autoStart: false, but the finally block only calls DisposeAsync(). In this fixture, DisposeAsync() only invokes Stop() when autoStart is true, so the subscription remains active after the test. In longer test runs this can leak active consumers/connections and introduce cross-test flakiness; stop the subscription explicitly before disposing.

Useful? React with 👍 / 👎.

}
}

static StreamPersistentSubscription CreateSubscription(string id, string connectionString, StreamName stream, AlwaysFailingHandler handler, ILoggerFactory loggerFactory) {
var settings = KurrentDBClientSettings.Create(connectionString);

return new(
new KurrentDBClient(settings),
new() {
StreamName = stream,
SubscriptionId = id,
ThrowOnError = true,
SubscriptionSettings = new PersistentSubscriptionSettings(
resolveLinkTos: false,
messageTimeout: TimeSpan.FromSeconds(2),
maxRetryCount: 0
)
},
new ConsumePipe().AddDefaultConsumer(handler),
loggerFactory
);
}

static async Task<ResolvedEvent?> ReadFirstParkedEvent(KurrentDBClient client, string parkedStream, TimeSpan timeout, CancellationToken cancellationToken) {
using var cts = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);
cts.CancelAfter(timeout);

while (!cts.Token.IsCancellationRequested) {
try {
var read = client.ReadStreamAsync(
Direction.Forwards,
parkedStream,
StreamPosition.Start,
maxCount: 1,
resolveLinkTos: true,
cancellationToken: cts.Token
);
var state = await read.ReadState;

if (state == ReadState.Ok) {
await foreach (var resolved in read) {
return resolved;
}
}
} catch (OperationCanceledException) when (cts.Token.IsCancellationRequested) {
return null;
}

try {
await Task.Delay(200, cts.Token);
} catch (OperationCanceledException) when (cts.Token.IsCancellationRequested) {
return null;
}
}

return null;
}
}

public class AlwaysFailingHandler : BaseEventHandler {
int _failures;

public int Failures => Volatile.Read(ref _failures);

public override ValueTask<EventHandlingStatus> HandleEvent(IMessageConsumeContext context) {
Interlocked.Increment(ref _failures);

throw new InvalidOperationException("Simulated handler failure");
}
}
Loading