Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
38 changes: 36 additions & 2 deletions AzureBatchQueue/MessageQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,22 @@ async Task QuarantineMessage(QueueMessage queueMessage, CancellationToken ct = d
{
try
{
await quarantineQueue.SendMessageAsync(queueMessage.Body, cancellationToken: ct);
await queue.DeleteMessageAsync(queueMessage.MessageId, queueMessage.PopReceipt, ct);
BinaryData messageBody;

if (IsBlobRef(queueMessage.Body, out var blobRef) && blobRef != null)
{
var newBlobRef = await CopyBlobForQuarantine(blobRef, queueMessage.MessageId, ct);
messageBody = Payload.OffloadedToBlob(newBlobRef).Data;
}
else
{
messageBody = queueMessage.Body;
}

await quarantineQueue.SendMessageAsync(messageBody, cancellationToken: ct);

var messageId = new MessageId(queueMessage.MessageId, queueMessage.PopReceipt, blobRef?.BlobName);
await DeleteMessage(messageId, ct);

logger.LogInformation("QueueMessage {msgId} with {popReceipt} was quarantined after {dequeueCount} unsuccessful attempts.",
queueMessage.MessageId, queueMessage.PopReceipt, queueMessage.DequeueCount);
Expand All @@ -126,6 +140,26 @@ async Task QuarantineMessage(QueueMessage queueMessage, CancellationToken ct = d
}
}

// Why not just move the message from main queue to quarantine queue?
// Because on message quarantine, we can put the message into quarantine queue, and then the service stops and we don't delete it from the main queue.
// In this case, later we will insert a duplicate message to quarantine, with the same ref to blob.
// When one message is processed, we remove the blob. The other message will have a BlobNotFound error.
// So it's better to do some more heavy-lifting and copy blob content on quarantine.
async Task<BlobRef> CopyBlobForQuarantine(BlobRef sourceBlobRef, string messageId, CancellationToken ct)
{
var sourceBlobClient = container.GetBlobClient(sourceBlobRef.BlobName);
var newBlobRef = BlobRef.Create();
var destinationBlobClient = container.GetBlobClient(newBlobRef.BlobName);

var content = await sourceBlobClient.DownloadContentAsync(ct);
await destinationBlobClient.UploadAsync(content.Value.Content, ct);

logger.LogInformation("Copied blob {SourceBlob} to {DestBlob} for quarantine message {MsgId}",
sourceBlobRef.BlobName, newBlobRef.BlobName, messageId);

return newBlobRef;
}

public async Task Dequarantine(CancellationToken ct = default)
{
do
Expand Down
Loading