-
Notifications
You must be signed in to change notification settings - Fork 1
By yanniboi: Added error replay for HubSpot Academy validation errors. #2
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
349e811
0fe9d01
c3c87ae
d40099d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -1,4 +1,6 @@ | ||
| namespace MessageBusReader | ||
| using MessageBusReader.QueueProcessingHandlers; | ||
|
|
||
| namespace MessageBusReader | ||
| { | ||
| using System; | ||
| using System.Collections.Generic; | ||
|
|
@@ -15,6 +17,13 @@ class Program | |
| private static ServiceBusClient _client; | ||
| private static ServiceBusProcessor _processor; | ||
|
|
||
| // Customise your operational params. | ||
| private const string QueueName = "error"; | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Can we call this new variable |
||
| private const string Env = "DEV_CONNECTION_STRING"; | ||
|
Owner
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. I would expect a variable called |
||
|
|
||
| // Default handler unless overridden in Main. | ||
| private static Func<ProcessMessageEventArgs, Task> _messageHandler = ProcessMessagesAsync; | ||
|
|
||
|
Collaborator
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Now that we have several different error queues I though we might benefit from a global variable to switch which queue we are processing. |
||
| private static TaskCompletionSource<int> _taskCompletionSource; | ||
| private static Task<int> _loopTask; | ||
| private static int _completeCounter = 0; | ||
|
|
@@ -30,15 +39,14 @@ static async Task Main(string[] args) | |
| var dotenv = Path.Combine(root, ".env"); | ||
| DotEnv.Load(dotenv); | ||
|
|
||
| string env; | ||
| env = "PRODUCTION_CONNECTION_STRING"; | ||
| // env = "QA_CONNECTION_STRING"; | ||
| // env = "DEV_CONNECTION_STRING"; | ||
| var connectionString = Environment.GetEnvironmentVariable(env); | ||
| var connectionString = Environment.GetEnvironmentVariable(Env); | ||
|
|
||
| // Connect to error queue | ||
| _client = new ServiceBusClient(connectionString); | ||
|
|
||
| // _messageHandler = AcademyProcessing.ProcessAcademyMessagesAsync; | ||
| // _messageHandler = AcademyProcessing.ProcessAcademyDeadletterAsync; | ||
| // _messageHandler = AcademyProcessing.ProcessCheckTypesAsync; | ||
| // await MainAsync(); | ||
|
|
||
| // Switch to this to move deadletter back to the error queue | ||
|
|
@@ -54,9 +62,9 @@ static async Task MainAsync() | |
| ReceiveMode = ServiceBusReceiveMode.PeekLock, | ||
| }; | ||
|
|
||
| _processor = _client.CreateProcessor("error", options); | ||
| _processor = _client.CreateProcessor(QueueName, options); | ||
|
|
||
| _processor.ProcessMessageAsync += ProcessMessagesAsync; | ||
| _processor.ProcessMessageAsync += _messageHandler; | ||
| _processor.ProcessErrorAsync += ExceptionReceivedHandler; | ||
|
|
||
| _taskCompletionSource = new TaskCompletionSource<int>(); | ||
|
|
@@ -256,11 +264,9 @@ static async Task MoveDeadletter() | |
| SubQueue = SubQueue.DeadLetter | ||
| }; | ||
|
|
||
| string queueName = "error"; | ||
|
|
||
| _processor = _client.CreateProcessor(queueName, options); | ||
| _processor = _client.CreateProcessor(QueueName, options); | ||
|
|
||
| _processor.ProcessMessageAsync += args => ReturnDeadletterAsync(args, queueName); | ||
| _processor.ProcessMessageAsync += args => ReturnDeadletterAsync(args, QueueName); | ||
| _processor.ProcessErrorAsync += ExceptionReceivedHandler; | ||
|
|
||
| _taskCompletionSource = new TaskCompletionSource<int>(); | ||
|
|
@@ -293,7 +299,7 @@ private static async Task ReturnDeadletterAsync(ProcessMessageEventArgs args, st | |
| await CompleteMessage(args); | ||
| } | ||
|
|
||
| private static async Task CompleteMessage(ProcessMessageEventArgs args) | ||
| public static async Task CompleteMessage(ProcessMessageEventArgs args) | ||
| { | ||
| await args.CompleteMessageAsync(args.Message); | ||
|
|
||
|
|
@@ -309,7 +315,7 @@ private static async Task ReturnToSource(ProcessMessageEventArgs args) | |
| await ReturnToSource(args, 0); | ||
| } | ||
|
|
||
| private static async Task ReturnToSource(ProcessMessageEventArgs args, int delay) | ||
| public static async Task ReturnToSource(ProcessMessageEventArgs args, int delay) | ||
| { | ||
| string source = GetSource(args); | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,137 @@ | ||
| using System; | ||
| using System.Collections.Generic; | ||
| using System.Linq; | ||
| using System.Threading; | ||
| using System.Threading.Tasks; | ||
| using Azure.Messaging.ServiceBus; | ||
|
|
||
| namespace MessageBusReader.QueueProcessingHandlers; | ||
|
|
||
| public class AcademyProcessing | ||
| { | ||
| private static int _messageCount = 0; | ||
| private static List<string> _foundTypes = new List<string>(); | ||
| private static int _delay = 0; | ||
|
|
||
| public static async Task ProcessAcademyDeadletterAsync(ProcessMessageEventArgs args) | ||
| { | ||
| ServiceBusReceivedMessage message = args.Message; | ||
|
|
||
| _messageCount = Interlocked.Increment(ref _messageCount); | ||
|
|
||
| Console.WriteLine($"Processing {_messageCount}: {message.MessageId}"); | ||
|
|
||
| if (message.ApplicationProperties.TryGetValue("rbs2-msg-type", out object typeValue) == false) | ||
| { | ||
| Console.WriteLine("Unable to get message type - Moving to deadletter"); | ||
| await args.DeadLetterMessageAsync(args.Message); | ||
| return; | ||
| } | ||
|
|
||
| var academyTypes = new[] | ||
| { | ||
| "Edrington.Academy.Contracts.Events.AcademyInteracted, Edrington.Academy.Contracts", | ||
| "Edrington.Academy.Contracts.Events.AcademyCourseInteracted, Edrington.Academy.Contracts", | ||
| }; | ||
| string type = typeValue.ToString(); | ||
|
|
||
| // Academy class/chapter create race condition. | ||
| if (!academyTypes.Contains(type)) | ||
| { | ||
| Console.WriteLine($"Type {type} is irrelevant - Moving to deadletter"); | ||
| await args.DeadLetterMessageAsync(args.Message); | ||
| return; | ||
| } | ||
|
|
||
| return; | ||
| } | ||
|
|
||
|
|
||
| public static async Task ProcessCheckTypesAsync(ProcessMessageEventArgs args) | ||
| { | ||
| ServiceBusReceivedMessage message = args.Message; | ||
|
|
||
| _messageCount = Interlocked.Increment(ref _messageCount); | ||
|
|
||
| // Console.WriteLine($"Processing {_messageCount}: {message.MessageId}"); | ||
|
|
||
| if (message.ApplicationProperties.TryGetValue("rbs2-msg-type", out object typeValue) == false) | ||
| { | ||
| // Console.WriteLine("Unable to get message type - Moving to deadletter"); | ||
| await args.DeadLetterMessageAsync(args.Message); | ||
| return; | ||
| } | ||
|
|
||
| string type = typeValue.ToString(); | ||
|
|
||
| if (_foundTypes.Contains(type)) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| _foundTypes.Add(type); | ||
| Console.WriteLine($"Found new type {type}"); | ||
| return; | ||
| } | ||
|
|
||
| public static async Task ProcessAcademyMessagesAsync(ProcessMessageEventArgs args) | ||
| { | ||
| ServiceBusReceivedMessage message = args.Message; | ||
|
|
||
| _messageCount = Interlocked.Increment(ref _messageCount); | ||
|
|
||
| Console.WriteLine($"Processing {_messageCount}: {message.MessageId}"); | ||
|
|
||
| if (message.ApplicationProperties.TryGetValue("rbs2-msg-type", out object typeValue) == false) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| var academyTypes = new[] | ||
| { | ||
| "Edrington.Academy.Contracts.Events.AcademyInteracted, Edrington.Academy.Contracts", | ||
| "Edrington.Academy.Contracts.Events.AcademyCourseInteracted, Edrington.Academy.Contracts", | ||
| }; | ||
| string type = typeValue.ToString(); | ||
|
|
||
| if (!academyTypes.Contains(type)) | ||
| { | ||
| return; | ||
| } | ||
|
|
||
| if (message.ContainsError("VALIDATION_ERROR")) | ||
| { | ||
| Console.WriteLine($"Found validation error: {message}"); | ||
| await Program.ReturnToSource(args, _delay); | ||
| _delay++; | ||
| return; | ||
| } | ||
|
|
||
| if (message.ContainsError("Object reference not set to an instance of an object")) | ||
| { | ||
| Console.WriteLine($"Found missing user error: {message}"); | ||
| await Program.ReturnToSource(args, _delay); | ||
| _delay++; | ||
| return; | ||
| } | ||
|
|
||
| Console.WriteLine("Error message:"); | ||
| Console.WriteLine(message.GetErrorMessage()); | ||
|
|
||
| var removeMessageIds = new[] | ||
| { | ||
| "d9d0141a-f997-468c-831d-a4c155e74b64", | ||
| "a72de5ea-4bd1-49e7-9463-0166ce04479e", | ||
| "2354c8c2-c86d-495d-8a97-520990e1631e", | ||
| }; | ||
| if (removeMessageIds.Contains(message.MessageId)) | ||
| { | ||
| Console.WriteLine($"Removing message."); | ||
| await Program.CompleteMessage(args); | ||
| return; | ||
| } | ||
|
|
||
| // Console.WriteLine($"Not processed message: {message}"); | ||
| return; | ||
| } | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Opinions vary on whether usings should be inside or outside of the namespace. However, please don't do both in the same class.