Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,180 @@
using System;
using System.Linq;
using System.Security.Claims;
using System.Threading;
using System.Threading.Tasks;
using EventStore.Core.Bus;
using EventStore.Core.Data;
using EventStore.Core.Helpers;
using EventStore.Core.LogAbstraction;
using EventStore.Core.Messages;
using EventStore.Core.Metrics;
using EventStore.Core.Services;
using EventStore.Core.Services.PersistentSubscription;
using EventStore.Core.Services.PersistentSubscription.ConsumerStrategy;
using EventStore.Core.Services.Storage.ReaderIndex;
using EventStore.Core.Tests.Services.Replication;
using EventStore.Core.Tests.TransactionLog;
using NUnit.Framework;

namespace EventStore.Core.Tests.Services.PersistentSubscription;

[TestFixture]
public class PersistentSubscriptionServiceNotReadyTests {
private PersistentSubscriptionService<string> _sut;

[SetUp]
public void SetUp() {
var bus = new SynchronousScheduler();
var trackers = new Trackers();

_sut = new PersistentSubscriptionService<string>(
new QueuedHandlerThreadPool(bus, "test", new QueueStatsManager(), new QueueTrackers()),
new FakeReadIndex<LogFormat.V2, string>(_ => false, new MetaStreamLookup()),
new IODispatcher(bus, bus), bus,
new PersistentSubscriptionConsumerStrategyRegistry(
bus,
bus,
Array.Empty<IPersistentSubscriptionConsumerStrategyFactory>()),
trackers.PersistentSubscriptionTracker);
}

[Test]
public void create_stream_replies_not_ready() {
var envelope = new FakeEnvelope();
var correlationId = Guid.NewGuid();

_sut.Handle(new ClientMessage.CreatePersistentSubscriptionToStream(
Guid.NewGuid(), correlationId, envelope, "stream", "group", false, 0L, 10_000, false, 10, 10, 10, 5,
1_000, 10, 100, 10, SystemConsumerStrategies.RoundRobin, ClaimsPrincipal.Current));

AssertNotReady(envelope, correlationId);
}

[Test]
public void create_all_replies_not_ready() {
var envelope = new FakeEnvelope();
var correlationId = Guid.NewGuid();

_sut.Handle(new ClientMessage.CreatePersistentSubscriptionToAll(
Guid.NewGuid(), correlationId, envelope, "group", EventFilter.DefaultAllFilter, false, new TFPos(0, 0),
10_000, false, 10, 10, 10, 5, 1_000, 10, 100, 10, SystemConsumerStrategies.RoundRobin,
ClaimsPrincipal.Current));

AssertNotReady(envelope, correlationId);
}

[Test]
public void update_stream_replies_not_ready() {
var envelope = new FakeEnvelope();
var correlationId = Guid.NewGuid();

_sut.Handle(new ClientMessage.UpdatePersistentSubscriptionToStream(
Guid.NewGuid(), correlationId, envelope, "stream", "group", false, 0L, 10_000, false, 10, 10, 10, 5,
1_000, 10, 100, 10, SystemConsumerStrategies.RoundRobin, ClaimsPrincipal.Current));

AssertNotReady(envelope, correlationId);
}

[Test]
public void update_all_replies_not_ready() {
var envelope = new FakeEnvelope();
var correlationId = Guid.NewGuid();

_sut.Handle(new ClientMessage.UpdatePersistentSubscriptionToAll(
Guid.NewGuid(), correlationId, envelope, "group", false, new TFPos(0, 0), 10_000, false, 10, 10, 10, 5,
1_000, 10, 100, 10, SystemConsumerStrategies.RoundRobin, ClaimsPrincipal.Current));

AssertNotReady(envelope, correlationId);
}

[Test]
public void delete_stream_replies_not_ready() {
var envelope = new FakeEnvelope();
var correlationId = Guid.NewGuid();

_sut.Handle(new ClientMessage.DeletePersistentSubscriptionToStream(
Guid.NewGuid(), correlationId, envelope, "stream", "group", ClaimsPrincipal.Current));

AssertNotReady(envelope, correlationId);
}

[Test]
public void delete_all_replies_not_ready() {
var envelope = new FakeEnvelope();
var correlationId = Guid.NewGuid();

_sut.Handle(new ClientMessage.DeletePersistentSubscriptionToAll(
Guid.NewGuid(), correlationId, envelope, "group", ClaimsPrincipal.Current));

AssertNotReady(envelope, correlationId);
}

[Test]
public async Task connect_stream_replies_not_ready() {
var envelope = new FakeEnvelope();
var correlationId = Guid.NewGuid();

await ((IAsyncHandle<ClientMessage.ConnectToPersistentSubscriptionToStream>)_sut).HandleAsync(
new ClientMessage.ConnectToPersistentSubscriptionToStream(
Guid.NewGuid(), correlationId, envelope, Guid.NewGuid(), "connection", "group", "stream", 1,
"source", ClaimsPrincipal.Current),
CancellationToken.None);

AssertNotReady(envelope, correlationId);
}

[Test]
public async Task connect_all_replies_not_ready() {
var envelope = new FakeEnvelope();
var correlationId = Guid.NewGuid();

await ((IAsyncHandle<ClientMessage.ConnectToPersistentSubscriptionToAll>)_sut).HandleAsync(
new ClientMessage.ConnectToPersistentSubscriptionToAll(
Guid.NewGuid(), correlationId, envelope, Guid.NewGuid(), "connection", "group", 1, "source",
ClaimsPrincipal.Current),
CancellationToken.None);

AssertNotReady(envelope, correlationId);
}

[Test]
public void read_next_replies_not_ready() {
var envelope = new FakeEnvelope();
var correlationId = Guid.NewGuid();

_sut.Handle(new ClientMessage.ReadNextNPersistentMessages(
Guid.NewGuid(), correlationId, envelope, "stream", "group", 10, ClaimsPrincipal.Current));

AssertNotReady(envelope, correlationId);
}

[Test]
public void replay_parked_replies_not_ready() {
var envelope = new FakeEnvelope();
var correlationId = Guid.NewGuid();

_sut.Handle(new ClientMessage.ReplayParkedMessages(
Guid.NewGuid(), correlationId, envelope, "stream", "group", null, ClaimsPrincipal.Current));

AssertNotReady(envelope, correlationId);
}

private static void AssertNotReady(FakeEnvelope envelope, Guid correlationId) {
Assert.That(envelope.Replies, Has.Count.EqualTo(1));
var reply = envelope.Replies.Single();

Assert.That(reply, Is.TypeOf<ClientMessage.NotHandled>());
var notHandled = (ClientMessage.NotHandled)reply;
Assert.That(notHandled.CorrelationId, Is.EqualTo(correlationId));
Assert.That(notHandled.Reason, Is.EqualTo(ClientMessage.NotHandled.Types.NotHandledReason.NotReady));
}

private sealed class MetaStreamLookup : IMetastreamLookup<string> {
public bool IsMetaStream(string streamId) => throw new NotSupportedException();

public string MetaStreamOf(string streamId) => throw new NotSupportedException();

public string OriginalStreamOf(string streamId) => throw new NotSupportedException();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,6 @@ private void CreatePersistentSubscription(
string user
)
{
if (!_started) return;
var stream = eventSource.ToString();
var key = BuildSubscriptionGroupKey(stream, groupName);
Log.Debug("Creating persistent subscription {subscriptionKey}", key);
Expand Down Expand Up @@ -347,6 +346,12 @@ string user

public void Handle(ClientMessage.CreatePersistentSubscriptionToStream message)
{
if (!_started)
{
ReplyWithNotReady(message.Envelope, message.CorrelationId);
return;
}

if (string.IsNullOrEmpty(message.EventStreamId))
{
message.Envelope.ReplyWith(new ClientMessage.CreatePersistentSubscriptionToStreamCompleted(
Expand Down Expand Up @@ -433,6 +438,12 @@ public void Handle(ClientMessage.CreatePersistentSubscriptionToStream message)

public void Handle(ClientMessage.CreatePersistentSubscriptionToAll message)
{
if (!_started)
{
ReplyWithNotReady(message.Envelope, message.CorrelationId);
return;
}

try
{
CreatePersistentSubscription(
Expand Down Expand Up @@ -522,8 +533,6 @@ private void UpdatePersistentSubscription(
string user
)
{
if (!_started) return;

var key = BuildSubscriptionGroupKey(stream, groupName);
Log.Debug("Updating persistent subscription {subscriptionKey}", key);

Expand Down Expand Up @@ -602,6 +611,12 @@ string user

public void Handle(ClientMessage.UpdatePersistentSubscriptionToStream message)
{
if (!_started)
{
ReplyWithNotReady(message.Envelope, message.CorrelationId);
return;
}

if (string.IsNullOrEmpty(message.EventStreamId))
{
message.Envelope.ReplyWith(new ClientMessage.UpdatePersistentSubscriptionToStreamCompleted(
Expand Down Expand Up @@ -689,6 +704,12 @@ public void Handle(ClientMessage.UpdatePersistentSubscriptionToStream message)

public void Handle(ClientMessage.UpdatePersistentSubscriptionToAll message)
{
if (!_started)
{
ReplyWithNotReady(message.Envelope, message.CorrelationId);
return;
}

try
{
UpdatePersistentSubscription(
Expand Down Expand Up @@ -816,7 +837,6 @@ private void DeletePersistentSubscription(
string user
)
{
if (!_started) return;
var stream = eventSource.ToString();
var key = BuildSubscriptionGroupKey(stream, groupName);
Log.Debug("Deleting persistent subscription {subscriptionKey}", key);
Expand All @@ -843,6 +863,12 @@ string user

public void Handle(ClientMessage.DeletePersistentSubscriptionToStream message)
{
if (!_started)
{
ReplyWithNotReady(message.Envelope, message.CorrelationId);
return;
}

if (string.IsNullOrEmpty(message.EventStreamId))
{
message.Envelope.ReplyWith(new ClientMessage.DeletePersistentSubscriptionToStreamCompleted(
Expand Down Expand Up @@ -903,6 +929,12 @@ public void Handle(ClientMessage.DeletePersistentSubscriptionToStream message)

public void Handle(ClientMessage.DeletePersistentSubscriptionToAll message)
{
if (!_started)
{
ReplyWithNotReady(message.Envelope, message.CorrelationId);
return;
}

DeletePersistentSubscription(
new PersistentSubscriptionAllStreamEventSource(),
message.GroupName,
Expand Down Expand Up @@ -1069,7 +1101,11 @@ public async ValueTask ConnectToPersistentSubscription(
string user,
CancellationToken token)
{
if (!_started) return;
if (!_started)
{
ReplyWithNotReady(envelope, correlationId);
return;
}

var stream = eventSource.ToString();
if (!_subscriptionTopics.TryGetValue(stream, out _))
Expand Down Expand Up @@ -1228,7 +1264,11 @@ public void Handle(ClientMessage.PersistentSubscriptionNackEvents message)

public void Handle(ClientMessage.ReadNextNPersistentMessages message)
{
if (!_started) return;
if (!_started)
{
ReplyWithNotReady(message.Envelope, message.CorrelationId);
return;
}
Comment thread
yordis marked this conversation as resolved.

if (string.IsNullOrEmpty(message.EventStreamId))
{
Expand Down Expand Up @@ -1283,6 +1323,12 @@ public void Handle(ClientMessage.ReadNextNPersistentMessages message)

public void Handle(ClientMessage.ReplayParkedMessages message)
{
if (!_started)
{
ReplyWithNotReady(message.Envelope, message.CorrelationId);
return;
}

PersistentSubscription subscription;
var key = BuildSubscriptionGroupKey(message.EventStreamId, message.GroupName);
Log.Debug("Replaying parked messages for persistent subscription {subscriptionKey} {to}",
Expand Down Expand Up @@ -1464,6 +1510,14 @@ public void Handle(TelemetryMessage.Request message)
new JsonObject { ["count"] = _subscriptionsById?.Count ?? 0 }));
}

private static void ReplyWithNotReady(IEnvelope envelope, Guid correlationId)
{
envelope.ReplyWith(new ClientMessage.NotHandled(
correlationId,
ClientMessage.NotHandled.Types.NotHandledReason.NotReady,
(string)null));
}

public void Handle(MonitoringMessage.GetPersistentSubscriptionStats message)
{
if (!_started)
Expand Down
2 changes: 1 addition & 1 deletion src/EventStore.Core/Services/Transport/Http/Configure.cs
Original file line number Diff line number Diff line change
Expand Up @@ -119,7 +119,7 @@ public static ResponseConfiguration Unauthorized(string description = null) {
Helper.UTF8NoBom);
}

private static ResponseConfiguration HandleNotHandled(Uri requestedUri, ClientMessage.NotHandled notHandled) {
public static ResponseConfiguration HandleNotHandled(Uri requestedUri, ClientMessage.NotHandled notHandled) {
switch (notHandled.Reason) {
case ClientMessage.NotHandled.Types.NotHandledReason.NotReady:
return ServiceUnavailable("Server Is Not Ready");
Expand Down
Loading
Loading