Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
87073cc
Add persistence docs
warwick-exia Jan 19, 2026
e47dc69
Remove local files
warwickschroeder Jan 27, 2026
bb65fc6
WIP - Initial commit. MongoDB persistence SPIKE
warwickschroeder Jan 29, 2026
62c7770
WIP - add body storage and failed message for mongo persistence
warwickschroeder Feb 2, 2026
5994958
Add mongo indexers
warwickschroeder Feb 2, 2026
9bc34c8
Add body storage and full text search
warwickschroeder Feb 3, 2026
5a5f691
Add additional logging
warwickschroeder Feb 4, 2026
51e0e3c
Update to use EnableFullTextSearchOnBodies setting
warwickschroeder Feb 5, 2026
836dffc
performance updates
warwickschroeder Feb 13, 2026
18cdd22
Add BLOB storage option
warwickschroeder Feb 16, 2026
e7a5707
Update tests for Spike
warwickschroeder Feb 16, 2026
76fb3e3
Update for .net10
warwickschroeder Feb 16, 2026
5e3e104
Add mongodb persister to project build
warwickschroeder Feb 17, 2026
beda65e
Fix issue when using installer
warwickschroeder Feb 17, 2026
215969d
Add retry logic to the audit ingestion
warwickschroeder Feb 18, 2026
8dd3e0c
add logging when deadlocks are detected
warwickschroeder Feb 18, 2026
1ec0050
Attempt to fix deadlocks
warwickschroeder Feb 18, 2026
574ed73
Add InlineMessageBody storage
warwickschroeder Feb 18, 2026
0dd27d4
Attempt to resolve throughput issues in Azure
warwickschroeder Feb 18, 2026
5bc900f
Fix tests. Add comments
warwickschroeder Feb 19, 2026
c6becff
make it past tests for amazon document db
jpalac Feb 19, 2026
6d0867f
Re-enable validation for AuditRetentionPeriod settings to ensure a mi…
warwickschroeder Feb 25, 2026
7d40f1f
Merge branch 'cloudxp-679-warwick' of https://github.com/Particular/S…
warwickschroeder Feb 25, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -100,4 +100,4 @@ src/scaffolding.config
*.sln.iml

# Visual Studio Code
.vscode
.vscode
3 changes: 3 additions & 0 deletions src/Directory.Packages.props
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@
<PackageVersion Include="Autofac" Version="9.0.0" />
<PackageVersion Include="AWSSDK.CloudWatch" Version="4.0.7" />
<PackageVersion Include="Azure.Identity" Version="1.17.1" />
<PackageVersion Include="Azure.Storage.Blobs" Version="12.27.0" />
<PackageVersion Include="Azure.Monitor.Query.Metrics" Version="1.0.0" />
<PackageVersion Include="Azure.ResourceManager.ServiceBus" Version="1.1.0" />
<PackageVersion Include="ByteSize" Version="2.1.2" />
Expand All @@ -19,6 +20,7 @@
<PackageVersion Include="Microsoft.AspNetCore.Authentication.JwtBearer" Version="10.0.3" />
<PackageVersion Include="Microsoft.AspNetCore.Authentication.OpenIdConnect" Version="10.0.3" />
<PackageVersion Include="Microsoft.AspNetCore.Mvc.Testing" Version="10.0.3" />
<PackageVersion Include="MongoDB.Driver" Version="3.6.0" />
<PackageVersion Include="Microsoft.AspNetCore.SignalR.Client" Version="10.0.3" />
<PackageVersion Include="Microsoft.Extensions.DependencyInjection" Version="10.0.3" />
<PackageVersion Include="Microsoft.Extensions.DependencyModel" Version="10.0.3" />
Expand Down Expand Up @@ -80,6 +82,7 @@
<PackageVersion Include="System.Reactive" Version="6.1.0" />
<PackageVersion Include="System.Reflection.MetadataLoadContext" Version="10.0.3" />
<PackageVersion Include="System.ServiceProcess.ServiceController" Version="10.0.3" />
<PackageVersion Include="Testcontainers.MongoDb" Version="4.3.0" />
<PackageVersion Include="Validar.Fody" Version="1.9.0" />
<PackageVersion Include="Yarp.ReverseProxy" Version="2.3.0" />
</ItemGroup>
Expand Down
1 change: 1 addition & 0 deletions src/ProjectReferences.Persisters.Audit.props
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
<ItemGroup Label="Persisters">
<ProjectReference Include="..\ServiceControl.Audit.Persistence.InMemory\ServiceControl.Audit.Persistence.InMemory.csproj" ReferenceOutputAssembly="false" Private="false" />
<ProjectReference Include="..\ServiceControl.Audit.Persistence.RavenDB\ServiceControl.Audit.Persistence.RavenDB.csproj" ReferenceOutputAssembly="false" Private="false" />
<ProjectReference Include="..\ServiceControl.Audit.Persistence.MongoDB\ServiceControl.Audit.Persistence.MongoDB.csproj" ReferenceOutputAssembly="false" Private="false" />
</ItemGroup>

</Project>
Original file line number Diff line number Diff line change
@@ -0,0 +1,134 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Auditing.BodyStorage;
using Azure;
using Azure.Storage.Blobs;
using Azure.Storage.Blobs.Models;
using Microsoft.Extensions.Logging;

/// <summary>
/// Body storage implementation that uses Azure Blob Storage to store message bodies. Each body is stored as a separate blob, with metadata for content type and size.
/// The implementation includes retry logic for transient failures when uploading blobs, and uses a batched writer to optimize performance when storing large volumes of messages.
/// Storage is best-effort: a body that still fails after all retries is logged and dropped rather than failing the whole batch.
/// </summary>
class AzureBlobBodyStorage(
    Channel<BodyWriteItem> channel,
    MongoSettings settings,
    ILogger<AzureBlobBodyStorage> logger)
    : BatchedBodyStorageWriter<BodyWriteItem>(channel, settings, logger), IBodyStorage, IBodyWriter
{
    const int MaxRetries = 3;
    readonly BlobContainerClient containerClient = new(settings.BlobConnectionString, settings.BlobContainerName);

    protected override string WriterName => "Azure Blob body storage writer";

    // Initialization

    /// <summary>
    /// Ensures the configured blob container exists before any bodies are read or written.
    /// </summary>
    public async Task Initialize(CancellationToken cancellationToken)
    {
        _ = await containerClient.CreateIfNotExistsAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
        logger.LogInformation("Azure Blob body storage initialized. Container: {ContainerName}", containerClient.Name);
    }

    // IBodyWriter

    public bool IsEnabled => true;

    /// <summary>
    /// Queues a body for asynchronous upload via the batched writer.
    /// The bytes are copied (<c>body.ToArray()</c>) because the caller may reuse the buffer after this call returns.
    /// </summary>
    public async ValueTask WriteAsync(string id, string contentType, ReadOnlyMemory<byte> body, DateTime expiresAt, CancellationToken cancellationToken)
    {
        await WriteToChannelAsync(new BodyWriteItem
        {
            Id = id,
            ContentType = contentType,
            BodySize = body.Length,
            Body = body.ToArray(),
            ExpiresAt = expiresAt
        }, cancellationToken).ConfigureAwait(false);
    }

    // IBodyStorage

    // Intentional no-op: bodies reach blob storage through the IBodyWriter channel path above,
    // so the synchronous IBodyStorage.Store entry point has nothing to do here.
    public Task Store(string bodyId, string contentType, int bodySize, Stream bodyStream, CancellationToken cancellationToken)
        => Task.CompletedTask;

    /// <summary>
    /// Fetches a body blob by id. Returns <c>HasResult = false</c> when the blob does not exist (HTTP 404).
    /// The body size is read back from the "bodySize" metadata written at upload time; it defaults to 0 if missing or unparsable.
    /// </summary>
    public async Task<StreamResult> TryFetch(string bodyId, CancellationToken cancellationToken)
    {
        var blobClient = containerClient.GetBlobClient(bodyId);

        try
        {
            var response = await blobClient.DownloadStreamingAsync(cancellationToken: cancellationToken).ConfigureAwait(false);
            var details = response.Value.Details;

            var bodySize = 0;
            if (details.Metadata.TryGetValue("bodySize", out var bodySizeStr))
            {
                _ = int.TryParse(bodySizeStr, out bodySize);
            }

            return new StreamResult
            {
                HasResult = true,
                Stream = response.Value.Content,
                ContentType = details.ContentType ?? "text/plain",
                BodySize = bodySize,
                Etag = details.ETag.ToString()
            };
        }
        catch (RequestFailedException ex) when (ex.Status == 404)
        {
            return new StreamResult { HasResult = false };
        }
    }

    // BatchedBodyStorageWriter

    protected override async Task FlushBatchAsync(List<BodyWriteItem> batch, CancellationToken cancellationToken)
    {
        // Upload all blobs in the batch concurrently; each upload handles its own retries.
        var uploadTasks = batch.Select(entry => UploadBlobWithRetry(entry, cancellationToken));
        await Task.WhenAll(uploadTasks).ConfigureAwait(false);
    }

    /// <summary>
    /// Uploads a single body blob with exponential backoff (1s, 2s, ...) on failure.
    /// Cancellation stops retrying immediately; a failure on the final attempt is logged and swallowed (best-effort storage).
    /// </summary>
    async Task UploadBlobWithRetry(BodyWriteItem entry, CancellationToken cancellationToken)
    {
        var blobClient = containerClient.GetBlobClient(entry.Id);

        for (var attempt = 1; attempt <= MaxRetries; attempt++)
        {
            try
            {
                using var stream = new MemoryStream(entry.Body);
                var options = new BlobUploadOptions
                {
                    HttpHeaders = new BlobHttpHeaders { ContentType = entry.ContentType.Trim() },
                    Metadata = new Dictionary<string, string>
                    {
                        ["messageId"] = entry.Id.Trim(),
                        ["bodySize"] = entry.BodySize.ToString(),
                        ["mongoExpiresAt"] = entry.ExpiresAt.ToString("O")
                    }
                };
                _ = await blobClient.UploadAsync(stream, options, cancellationToken).ConfigureAwait(false);
                return;
            }
            catch (OperationCanceledException) when (cancellationToken.IsCancellationRequested)
            {
                // Shutdown in progress: stop retrying right away instead of logging this as an upload failure.
                // (Previously this fell into the generic catch below, logged a misleading "after {MaxRetries}
                // attempts" error, and kept retrying despite cancellation.)
                throw;
            }
            catch (Exception ex) when (attempt < MaxRetries)
            {
                var delay = TimeSpan.FromSeconds(Math.Pow(2, attempt - 1));
                logger.LogWarning(ex, "Failed to upload blob {BlobId} (attempt {Attempt}/{MaxRetries}), retrying in {Delay}s",
                    entry.Id, attempt, MaxRetries, delay.TotalSeconds);
                await Task.Delay(delay, cancellationToken).ConfigureAwait(false);
            }
            catch (Exception ex)
            {
                // Final attempt failed: drop the body (best-effort) so one bad blob does not fail the whole batch.
                logger.LogError(ex, "Failed to upload blob {BlobId} after {MaxRetries} attempts", entry.Id, MaxRetries);
                return;
            }
        }
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,207 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Channels;
using System.Threading.Tasks;
using Microsoft.Extensions.Hosting;
using Microsoft.Extensions.Logging;

/// <summary>
/// Base class for body storage writers that batch write operations for improved performance.
/// The batcher assembles batches of entries from the input channel, and processes them in parallel using a configurable number of writer tasks.
/// </summary>
abstract class BatchedBodyStorageWriter<TEntry>(
    Channel<TEntry> channel,
    MongoSettings settings,
    ILogger logger)
    : BackgroundService
{
    readonly int BatchSize = settings.BodyWriterBatchSize;
    readonly int ParallelWriters = settings.BodyWriterParallelWriters;
    readonly TimeSpan BatchTimeout = settings.BodyWriterBatchTimeout;
    const int BacklogWarningThreshold = 5_000;

    // Incremented concurrently from every writer task; access only via Interlocked.
    long totalWritten;

    // Warning throttles. Races here are tolerated: at worst one extra warning is emitted.
    DateTime lastBacklogWarning;
    DateTime lastBackpressureWarning;

    // Bounded so a slow storage backend applies backpressure to the assembler (and, transitively,
    // to ingestion) instead of buffering an unbounded number of assembled batches in memory.
    readonly Channel<List<TEntry>> batchChannel = Channel.CreateBounded<List<TEntry>>(
        new BoundedChannelOptions(settings.BodyWriterParallelWriters * 2)
        {
            SingleReader = false,
            SingleWriter = true,
            AllowSynchronousContinuations = false,
            FullMode = BoundedChannelFullMode.Wait
        });

    protected ChannelWriter<TEntry> WriteChannel => channel.Writer;

    /// <summary>
    /// Enqueues an entry for batched writing. Fast path is a non-blocking TryWrite; when the
    /// channel is full the caller blocks (backpressure) and a throttled warning is logged.
    /// </summary>
    protected async ValueTask WriteToChannelAsync(TEntry entry, CancellationToken cancellationToken)
    {
        if (channel.Writer.TryWrite(entry))
        {
            return;
        }

        if (DateTime.UtcNow - lastBackpressureWarning > TimeSpan.FromSeconds(10))
        {
            lastBackpressureWarning = DateTime.UtcNow;
            logger.LogWarning("{WriterName} channel is full (backlog: {Backlog}). Body writes are blocking ingestion until the writer catches up",
                WriterName, channel.Reader.Count);
        }

        await channel.Writer.WriteAsync(entry, cancellationToken).ConfigureAwait(false);
    }

    protected abstract string WriterName { get; }

    protected abstract Task FlushBatchAsync(List<TEntry> batch, CancellationToken cancellationToken);

    /// <summary>
    /// Runs one batch-assembler task plus <see cref="ParallelWriters"/> writer tasks until shutdown.
    /// </summary>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        logger.LogInformation("{WriterName} started ({Writers} writers, batch size {BatchSize})", WriterName, ParallelWriters, BatchSize);

        var assemblerTask = Task.Run(() => BatchAssemblerLoop(stoppingToken), CancellationToken.None);

        var writerTasks = new Task[ParallelWriters];
        for (var i = 0; i < ParallelWriters; i++)
        {
            var writerId = i;
            writerTasks[i] = Task.Run(() => WriterLoop(writerId, stoppingToken), CancellationToken.None);
        }

        try
        {
            await Task.WhenAll(writerTasks.Append(assemblerTask)).ConfigureAwait(false);
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Expected during shutdown
        }

        logger.LogInformation("{WriterName} stopped", WriterName);
    }

    /// <summary>
    /// Single consumer of the input channel. Assembles full batches, or dispatches a partial batch
    /// once <see cref="BatchTimeout"/> elapses, so entries never sit indefinitely waiting for a full batch.
    /// </summary>
    async Task BatchAssemblerLoop(CancellationToken stoppingToken)
    {
        var batch = new List<TEntry>(BatchSize);

        try
        {
            while (await channel.Reader.WaitToReadAsync(stoppingToken).ConfigureAwait(false))
            {
                while (batch.Count < BatchSize && channel.Reader.TryRead(out var entry))
                {
                    batch.Add(entry);
                }

                if (batch.Count > 0 && batch.Count < BatchSize)
                {
                    // Partial batch: wait up to BatchTimeout for more entries before dispatching.
                    using var timeoutCts = CancellationTokenSource.CreateLinkedTokenSource(stoppingToken);
                    timeoutCts.CancelAfter(BatchTimeout);
                    try
                    {
                        while (batch.Count < BatchSize)
                        {
                            if (!await channel.Reader.WaitToReadAsync(timeoutCts.Token).ConfigureAwait(false))
                            {
                                break;
                            }

                            while (batch.Count < BatchSize && channel.Reader.TryRead(out var entry))
                            {
                                batch.Add(entry);
                            }
                        }
                    }
                    catch (OperationCanceledException) when (!stoppingToken.IsCancellationRequested)
                    {
                        // Timeout expired - dispatch partial batch
                    }
                }

                if (batch.Count > 0)
                {
                    await batchChannel.Writer.WriteAsync(batch, stoppingToken).ConfigureAwait(false);
                    batch = [];
                }
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Shutting down - drain channel into remaining batches
            while (channel.Reader.TryRead(out var entry))
            {
                batch.Add(entry);

                if (batch.Count >= BatchSize)
                {
                    await batchChannel.Writer.WriteAsync(batch, CancellationToken.None).ConfigureAwait(false);
                    batch = [];
                }
            }

            if (batch.Count > 0)
            {
                await batchChannel.Writer.WriteAsync(batch, CancellationToken.None).ConfigureAwait(false);
            }
        }
        finally
        {
            // Completing the batch channel is what lets the writer drain loops terminate.
            batchChannel.Writer.Complete();
        }
    }

    /// <summary>
    /// One of <see cref="ParallelWriters"/> consumers of the batch channel.
    /// </summary>
    async Task WriterLoop(int writerId, CancellationToken stoppingToken)
    {
        logger.LogDebug("{WriterName} writer {WriterId} started", WriterName, writerId);

        try
        {
            // Use CancellationToken.None for FlushBatch so in-flight writes complete
            // during shutdown. ReadAllAsync(stoppingToken) controls when we stop
            // accepting new batches.
            await foreach (var batch in batchChannel.Reader.ReadAllAsync(stoppingToken).ConfigureAwait(false))
            {
                await FlushBatchAsync(batch, CancellationToken.None).ConfigureAwait(false);
                ReportBatchWritten(batch.Count);
            }
        }
        catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
        {
            // Expected during shutdown
        }

        // Drain remaining batches after cancellation. Reading with CancellationToken.None waits
        // until the assembler completes the channel, so batches the assembler is still draining
        // from the input channel during shutdown are not abandoned. (A plain TryRead loop here
        // could observe an empty channel and exit before the assembler finished its drain.)
        await foreach (var batch in batchChannel.Reader.ReadAllAsync(CancellationToken.None).ConfigureAwait(false))
        {
            try
            {
                await FlushBatchAsync(batch, CancellationToken.None).ConfigureAwait(false);
                ReportBatchWritten(batch.Count);
            }
            catch (Exception ex)
            {
                logger.LogError(ex, "Failed to flush {Count} entries during shutdown", batch.Count);
            }
        }

        logger.LogDebug("{WriterName} writer {WriterId} stopped", WriterName, writerId);
    }

    /// <summary>
    /// Records throughput and warns (throttled) when the input backlog indicates the writers
    /// are not keeping up with ingestion.
    /// </summary>
    void ReportBatchWritten(int batchCount)
    {
        // Interlocked: this runs concurrently on every writer task, so a plain += would lose updates.
        var total = Interlocked.Add(ref totalWritten, batchCount);
        var backlog = channel.Reader.Count;
        logger.LogDebug("{WriterName}: batch={BatchCount}, total={TotalWritten}, backlog={Backlog}",
            WriterName, batchCount, total, backlog);
        if (backlog > BacklogWarningThreshold && DateTime.UtcNow - lastBacklogWarning > TimeSpan.FromSeconds(10))
        {
            lastBacklogWarning = DateTime.UtcNow;
            logger.LogWarning("{WriterName} is not keeping up with ingestion. Channel backlog: {Backlog} items", WriterName, backlog);
        }
    }
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@
namespace ServiceControl.Audit.Persistence.MongoDB.BodyStorage
{
using System;

/// <summary>
/// Immutable work item carried on the body-writer channel: one message body plus the
/// metadata needed to store it.
/// </summary>
readonly struct BodyWriteItem
{
    /// <summary>Unique message/body identifier; used as the storage key.</summary>
    public required string Id { get; init; }

    /// <summary>MIME content type of the body.</summary>
    public required string ContentType { get; init; }

    /// <summary>Body length in bytes, as reported by the producer.</summary>
    public required int BodySize { get; init; }

    /// <summary>Raw body bytes (a private copy owned by this item).</summary>
    public required byte[] Body { get; init; }

    /// <summary>Optional text rendering of the body; null when not supplied (intentionally not required).</summary>
    public string TextBody { get; init; }

    /// <summary>Point in time after which the stored body may be expired/deleted.</summary>
    public required DateTime ExpiresAt { get; init; }
}
}
Loading
Loading