Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
34 changes: 20 additions & 14 deletions MessageBusReader/Program.cs
Original file line number Diff line number Diff line change
@@ -1,4 +1,6 @@
namespace MessageBusReader
using MessageBusReader.QueueProcessingHandlers;
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Opinions vary on whether usings should be inside or outside of the namespace. However, please don't do both in the same class.


namespace MessageBusReader
{
using System;
using System.Collections.Generic;
Expand All @@ -15,6 +17,13 @@ class Program
private static ServiceBusClient _client;
private static ServiceBusProcessor _processor;

// Customise your operational params.
private const string QueueName = "error";
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we call this new variable errorQueueName please?

private const string Env = "DEV_CONNECTION_STRING";
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would expect a variable called Env to have values "dev", "qa" and "prod". We are conflating the environment with the only value (at present) that changes by environment. I think we can do this better?


// Default handler unless overridden in Main.
private static Func<ProcessMessageEventArgs, Task> _messageHandler = ProcessMessagesAsync;

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that we have several different error queues I though we might benefit from a global variable to switch which queue we are processing.

private static TaskCompletionSource<int> _taskCompletionSource;
private static Task<int> _loopTask;
private static int _completeCounter = 0;
Expand All @@ -30,15 +39,14 @@ static async Task Main(string[] args)
var dotenv = Path.Combine(root, ".env");
DotEnv.Load(dotenv);

string env;
env = "PRODUCTION_CONNECTION_STRING";
// env = "QA_CONNECTION_STRING";
// env = "DEV_CONNECTION_STRING";
var connectionString = Environment.GetEnvironmentVariable(env);
var connectionString = Environment.GetEnvironmentVariable(Env);

// Connect to error queue
_client = new ServiceBusClient(connectionString);

// _messageHandler = AcademyProcessing.ProcessAcademyMessagesAsync;
// _messageHandler = AcademyProcessing.ProcessAcademyDeadletterAsync;
// _messageHandler = AcademyProcessing.ProcessCheckTypesAsync;
// await MainAsync();

// Switch to this to move deadletter back to the error queue
Expand All @@ -54,9 +62,9 @@ static async Task MainAsync()
ReceiveMode = ServiceBusReceiveMode.PeekLock,
};

_processor = _client.CreateProcessor("error", options);
_processor = _client.CreateProcessor(QueueName, options);

_processor.ProcessMessageAsync += ProcessMessagesAsync;
_processor.ProcessMessageAsync += _messageHandler;
_processor.ProcessErrorAsync += ExceptionReceivedHandler;

_taskCompletionSource = new TaskCompletionSource<int>();
Expand Down Expand Up @@ -256,11 +264,9 @@ static async Task MoveDeadletter()
SubQueue = SubQueue.DeadLetter
};

string queueName = "error";

_processor = _client.CreateProcessor(queueName, options);
_processor = _client.CreateProcessor(QueueName, options);

_processor.ProcessMessageAsync += args => ReturnDeadletterAsync(args, queueName);
_processor.ProcessMessageAsync += args => ReturnDeadletterAsync(args, QueueName);
_processor.ProcessErrorAsync += ExceptionReceivedHandler;

_taskCompletionSource = new TaskCompletionSource<int>();
Expand Down Expand Up @@ -293,7 +299,7 @@ private static async Task ReturnDeadletterAsync(ProcessMessageEventArgs args, st
await CompleteMessage(args);
}

private static async Task CompleteMessage(ProcessMessageEventArgs args)
public static async Task CompleteMessage(ProcessMessageEventArgs args)
{
await args.CompleteMessageAsync(args.Message);

Expand All @@ -309,7 +315,7 @@ private static async Task ReturnToSource(ProcessMessageEventArgs args)
await ReturnToSource(args, 0);
}

private static async Task ReturnToSource(ProcessMessageEventArgs args, int delay)
public static async Task ReturnToSource(ProcessMessageEventArgs args, int delay)
{
string source = GetSource(args);

Expand Down
137 changes: 137 additions & 0 deletions MessageBusReader/QueueProcessingHandlers/AcademyProcessing.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using Azure.Messaging.ServiceBus;

namespace MessageBusReader.QueueProcessingHandlers;

public class AcademyProcessing
{
private static int _messageCount = 0;
private static List<string> _foundTypes = new List<string>();
private static int _delay = 0;

public static async Task ProcessAcademyDeadletterAsync(ProcessMessageEventArgs args)
{
ServiceBusReceivedMessage message = args.Message;

_messageCount = Interlocked.Increment(ref _messageCount);

Console.WriteLine($"Processing {_messageCount}: {message.MessageId}");

if (message.ApplicationProperties.TryGetValue("rbs2-msg-type", out object typeValue) == false)
{
Console.WriteLine("Unable to get message type - Moving to deadletter");
await args.DeadLetterMessageAsync(args.Message);
return;
}

var academyTypes = new[]
{
"Edrington.Academy.Contracts.Events.AcademyInteracted, Edrington.Academy.Contracts",
"Edrington.Academy.Contracts.Events.AcademyCourseInteracted, Edrington.Academy.Contracts",
};
string type = typeValue.ToString();

// Academy class/chapter create race condition.
if (!academyTypes.Contains(type))
{
Console.WriteLine($"Type {type} is irrelevant - Moving to deadletter");
await args.DeadLetterMessageAsync(args.Message);
return;
}

return;
}


public static async Task ProcessCheckTypesAsync(ProcessMessageEventArgs args)
{
ServiceBusReceivedMessage message = args.Message;

_messageCount = Interlocked.Increment(ref _messageCount);

// Console.WriteLine($"Processing {_messageCount}: {message.MessageId}");

if (message.ApplicationProperties.TryGetValue("rbs2-msg-type", out object typeValue) == false)
{
// Console.WriteLine("Unable to get message type - Moving to deadletter");
await args.DeadLetterMessageAsync(args.Message);
return;
}

string type = typeValue.ToString();

if (_foundTypes.Contains(type))
{
return;
}

_foundTypes.Add(type);
Console.WriteLine($"Found new type {type}");
return;
}

public static async Task ProcessAcademyMessagesAsync(ProcessMessageEventArgs args)
{
ServiceBusReceivedMessage message = args.Message;

_messageCount = Interlocked.Increment(ref _messageCount);

Console.WriteLine($"Processing {_messageCount}: {message.MessageId}");

if (message.ApplicationProperties.TryGetValue("rbs2-msg-type", out object typeValue) == false)
{
return;
}

var academyTypes = new[]
{
"Edrington.Academy.Contracts.Events.AcademyInteracted, Edrington.Academy.Contracts",
"Edrington.Academy.Contracts.Events.AcademyCourseInteracted, Edrington.Academy.Contracts",
};
string type = typeValue.ToString();

if (!academyTypes.Contains(type))
{
return;
}

if (message.ContainsError("VALIDATION_ERROR"))
{
Console.WriteLine($"Found validation error: {message}");
await Program.ReturnToSource(args, _delay);
_delay++;
return;
}

if (message.ContainsError("Object reference not set to an instance of an object"))
{
Console.WriteLine($"Found missing user error: {message}");
await Program.ReturnToSource(args, _delay);
_delay++;
return;
}

Console.WriteLine("Error message:");
Console.WriteLine(message.GetErrorMessage());

var removeMessageIds = new[]
{
"d9d0141a-f997-468c-831d-a4c155e74b64",
"a72de5ea-4bd1-49e7-9463-0166ce04479e",
"2354c8c2-c86d-495d-8a97-520990e1631e",
};
if (removeMessageIds.Contains(message.MessageId))
{
Console.WriteLine($"Removing message.");
await Program.CompleteMessage(args);
return;
}

// Console.WriteLine($"Not processed message: {message}");
return;
}
}