Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,18 @@ public class MessageReceptionBuilder

public Type HandlerType => _registration.HandlerType;

public Type PayloadType => _registration.PayloadType;
public Type? PayloadType => _registration.PayloadType;

public MessageReceptionBuilder(ClientOptions clientOptions, Type payloadType, Type handlerType)
{
_registration = new MessageReceptionRegistration(clientOptions, payloadType, handlerType);
}

public MessageReceptionBuilder(ClientOptions clientOptions, string payloadTypeId, Type handlerType)
{
_registration = new MessageReceptionRegistration(clientOptions, payloadTypeId, handlerType);
}

/// <summary>
/// Sets the PayloadTypeId (by default it will take the <see cref="MemberInfo.Name"/> of the payload <see cref="Type"/> object)
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,12 @@

namespace Ev.ServiceBus.Abstractions;

public enum HandlerMode
{
Typed = 1,
Generic
}

public class MessageReceptionRegistration
{
public MessageReceptionRegistration(ClientOptions clientOptions, Type payloadType, Type handlerType)
Expand All @@ -10,6 +16,16 @@ public MessageReceptionRegistration(ClientOptions clientOptions, Type payloadTyp
PayloadType = payloadType;
HandlerType = handlerType;
PayloadTypeId = PayloadType.Name;
HandlerMode = HandlerMode.Typed;
}

public MessageReceptionRegistration(ClientOptions clientOptions, string payloadTypeId, Type handlerType)
{
Options = clientOptions;
PayloadType = null;
HandlerType = handlerType;
PayloadTypeId = payloadTypeId;
HandlerMode = HandlerMode.Generic;
}

/// <summary>
Expand All @@ -20,7 +36,7 @@ public MessageReceptionRegistration(ClientOptions clientOptions, Type payloadTyp
/// <summary>
/// The type the receiving message wil be deserialized into.
/// </summary>
public Type PayloadType { get; }
public Type? PayloadType { get; }

/// <summary>
/// The class that will be resolved to process the incoming message.
Expand All @@ -31,4 +47,9 @@ public MessageReceptionRegistration(ClientOptions clientOptions, Type payloadTyp
/// The unique identifier of this payload's type.
/// </summary>
public string PayloadTypeId { get; internal set; }

/// <summary>
/// The unique identifier of this payload's type.
/// </summary>
public HandlerMode HandlerMode { get; internal set; }
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
using System.Threading;
using System;
using System.Threading;
using System.Threading.Tasks;
using Ev.ServiceBus.Abstractions.MessageReception;

namespace Ev.ServiceBus.Reception;

Expand All @@ -16,4 +18,18 @@ public interface IMessageReceptionHandler<in TMessagePayload>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task Handle(TMessagePayload @event, CancellationToken cancellationToken);
}

/// <summary>
/// Base interface for a message reception handler.
/// </summary>
public interface IMessageReceptionHandler
{
/// <summary>
/// Called whenever a message of linked payloadTypeId is received.
/// </summary>
/// <param name="event"></param>
/// <param name="cancellationToken"></param>
/// <returns></returns>
Task Handle(BinaryData body, IMessageMetadata messageMetadata, CancellationToken cancellationToken);
}
17 changes: 14 additions & 3 deletions src/Ev.ServiceBus/Reception/MessageReceptionHandler.cs
Original file line number Diff line number Diff line change
Expand Up @@ -62,9 +62,20 @@ public async Task HandleMessageAsync(MessageContext context)
if (context.CancellationToken.IsCancellationRequested)
return;

var @event = _messagePayloadSerializer.DeSerializeBody(context.Message.Body.ToArray(), context.ReceptionRegistration!.PayloadType);
var methodInfo = _callHandlerInfo.MakeGenericMethod(context.ReceptionRegistration.PayloadType);
await ((Task) methodInfo.Invoke(this, new[] { context.ReceptionRegistration, @event, context.CancellationToken })!);
switch (context.ReceptionRegistration.HandlerMode)
{
case HandlerMode.Typed:
var @event = _messagePayloadSerializer.DeSerializeBody(context.Message.Body.ToArray(), context.ReceptionRegistration!.PayloadType);
var methodInfo = _callHandlerInfo.MakeGenericMethod(context.ReceptionRegistration.PayloadType);
await ((Task) methodInfo.Invoke(this, new[] { context.ReceptionRegistration, @event, context.CancellationToken })!);
break;
case HandlerMode.Generic:
var handler = (IMessageReceptionHandler) _provider.GetRequiredService(context.ReceptionRegistration.HandlerType);

await handler.Handle(context.Message.Body, _messageMetadataAccessor.Metadata!, context.CancellationToken);
break;
default: throw new ArgumentOutOfRangeException();
}
}
catch (Exception ex)
{
Expand Down
18 changes: 18 additions & 0 deletions src/Ev.ServiceBus/Reception/ReceptionRegistrationBuilder.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,24 @@ public MessageReceptionBuilder RegisterReception<TReceptionModel, THandler>()
return builder;
}

/// <summary>
/// Registers a generic handler to receive message from a given PayloadTypeId.
/// </summary>
/// <param name="payloadTypeId"></param>
/// <typeparam name="THandler">The handler that will receive the raw data</typeparam>
/// <returns></returns>
public MessageReceptionBuilder RegisterReception<THandler>(string payloadTypeId)
where THandler : class, IMessageReceptionHandler
{
_services.TryAddScoped<THandler>();
var builder = new MessageReceptionBuilder(_options, payloadTypeId, typeof(THandler));
_services.Configure<ServiceBusOptions>(options =>
{
options.RegisterReception(builder);
});
return builder;
}

/// <summary>
/// Registers a class as a payload to receive and deserialize through the current resource.
/// </summary>
Expand Down