Skip to content

Commit b8af0b6

Browse files
committed
Add settings for outbox processor (PollingIntervalSeconds and MaxEventsPerCycle).
1 parent 587fcf2 commit b8af0b6

7 files changed

Lines changed: 61 additions & 14 deletions

File tree

src/OpenDDD/API/Extensions/OpenDddServiceCollectionExtensions.cs

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,7 @@
5050
using OpenDDD.Infrastructure.TransactionalOutbox.EfCore;
5151
using OpenDDD.Infrastructure.TransactionalOutbox.OpenDdd.InMemory;
5252
using OpenDDD.Infrastructure.TransactionalOutbox.OpenDdd.Postgres;
53+
using OpenDDD.Infrastructure.TransactionalOutbox.Options;
5354
using OpenDDD.Infrastructure.Utils;
5455

5556
namespace OpenDDD.API.Extensions
@@ -183,6 +184,11 @@ private static void AddMessaging(this IServiceCollection services, OpenDddOption
183184

184185
private static void AddTransactionalOutbox(this IServiceCollection services)
185186
{
187+
services.Configure<OpenDddOutboxProcessorOptions>(options =>
188+
{
189+
var config = services.BuildServiceProvider().GetRequiredService<IConfiguration>();
190+
config.GetSection("OpenDDD:OutboxProcessor").Bind(options);
191+
});
186192
services.AddHostedService<OutboxProcessor>();
187193
}
188194

src/OpenDDD/Infrastructure/TransactionalOutbox/EfCore/EfCoreOutboxRepository.cs

Lines changed: 9 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -40,14 +40,18 @@ public async Task SaveEventAsync<TEvent>(TEvent @event, CancellationToken ct) wh
4040
await _session.DbContext.SaveChangesAsync(ct);
4141
}
4242

43-
public async Task<List<OutboxEntry>> GetPendingEventsAsync(CancellationToken ct)
43+
public async Task<List<OutboxEntry>> GetPendingEventsAsync(int? maxCount = null, CancellationToken ct = default)
4444
{
4545
await _session.OpenConnectionAsync(ct);
46-
47-
return await _session.DbContext.Set<OutboxEntry>()
46+
47+
IQueryable<OutboxEntry> query = _session.DbContext.Set<OutboxEntry>()
4848
.Where(e => e.ProcessedAt == null)
49-
.OrderBy(e => e.CreatedAt)
50-
.ToListAsync(ct);
49+
.OrderBy(e => e.CreatedAt);
50+
51+
if (maxCount.HasValue)
52+
query = query.Take(maxCount.Value);
53+
54+
return await query.ToListAsync(ct);
5155
}
5256

5357
public async Task MarkEventAsProcessedAsync(Guid eventId, CancellationToken ct)

src/OpenDDD/Infrastructure/TransactionalOutbox/IOutboxRepository.cs

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -5,7 +5,7 @@ namespace OpenDDD.Infrastructure.TransactionalOutbox
55
public interface IOutboxRepository
66
{
77
Task SaveEventAsync<TEvent>(TEvent @event, CancellationToken ct) where TEvent : IEvent;
8-
Task<List<OutboxEntry>> GetPendingEventsAsync(CancellationToken ct);
8+
Task<List<OutboxEntry>> GetPendingEventsAsync(int? maxCount = null, CancellationToken ct = default);
99
Task MarkEventAsProcessedAsync(Guid eventId, CancellationToken ct);
1010
}
1111
}

src/OpenDDD/Infrastructure/TransactionalOutbox/OpenDdd/InMemory/InMemoryOpenDddOutboxRepository.cs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -42,10 +42,16 @@ public async Task SaveEventAsync<TEvent>(TEvent @event, CancellationToken ct) wh
4242
_logger.LogDebug("Added event to in-memory outbox: {EventName}", outboxEntry.EventName);
4343
}
4444

45-
public async Task<List<OutboxEntry>> GetPendingEventsAsync(CancellationToken ct)
45+
public async Task<List<OutboxEntry>> GetPendingEventsAsync(int? maxCount = null, CancellationToken ct = default)
4646
{
4747
var entries = await _session.SelectAllAsync<OutboxEntry>(OutboxTable, ct);
48-
return entries.Where(entry => entry.ProcessedAt == null).ToList();
48+
var pending = entries
49+
.Where(entry => entry.ProcessedAt == null)
50+
.OrderBy(entry => entry.CreatedAt);
51+
52+
return maxCount.HasValue
53+
? pending.Take(maxCount.Value).ToList()
54+
: pending.ToList();
4955
}
5056

5157
public async Task MarkEventAsProcessedAsync(Guid eventId, CancellationToken ct)

src/OpenDDD/Infrastructure/TransactionalOutbox/OpenDdd/Postgres/PostgresOpenDddOutboxRepository.cs

Lines changed: 12 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,13 +43,23 @@ INSERT INTO outbox_entries (id, event_type, event_name, payload, created_at, pro
4343
await cmd.ExecuteNonQueryAsync(ct);
4444
}
4545

46-
public async Task<List<OutboxEntry>> GetPendingEventsAsync(CancellationToken ct)
46+
public async Task<List<OutboxEntry>> GetPendingEventsAsync(int? maxCount = null, CancellationToken ct = default)
4747
{
4848
await _session.OpenConnectionAsync(ct);
49+
50+
var query = @"
51+
SELECT id, event_type, event_name, payload, created_at, processed_at
52+
FROM outbox_entries
53+
WHERE processed_at IS NULL
54+
ORDER BY created_at";
4955

50-
const string query = "SELECT id, event_type, event_name, payload, created_at, processed_at FROM outbox_entries WHERE processed_at IS NULL ORDER BY created_at;";
56+
query = maxCount.HasValue ? query + " LIMIT @maxCount;" : query + ";";
5157

5258
await using var cmd = new NpgsqlCommand(query, _session.Connection, _session.Transaction);
59+
60+
if (maxCount.HasValue)
61+
cmd.Parameters.AddWithValue("maxCount", maxCount.Value);
62+
5363
await using var reader = await cmd.ExecuteReaderAsync(ct);
5464

5565
var events = new List<OutboxEntry>();
Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,8 @@
1+
namespace OpenDDD.Infrastructure.TransactionalOutbox.Options
2+
{
3+
public class OpenDddOutboxProcessorOptions
4+
{
5+
public int PollingIntervalSeconds { get; set; } = 3;
6+
public int? MaxEventsPerCycle { get; set; } = null;
7+
}
8+
}

src/OpenDDD/Infrastructure/TransactionalOutbox/OutboxProcessor.cs

Lines changed: 17 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@
77
using OpenDDD.Domain.Model.Helpers;
88
using OpenDDD.Infrastructure.Events;
99
using OpenDDD.Infrastructure.Persistence.DatabaseSession;
10+
using OpenDDD.Infrastructure.TransactionalOutbox.Options;
1011

1112
namespace OpenDDD.Infrastructure.TransactionalOutbox
1213
{
@@ -16,24 +17,29 @@ public class OutboxProcessor : BackgroundService
1617
private readonly StartupHostedService _startupService;
1718
private readonly ILogger<OutboxProcessor> _logger;
1819
private readonly OpenDddOptions _options;
20+
private readonly OpenDddOutboxProcessorOptions _processorOptions;
1921

2022
public OutboxProcessor(
2123
IServiceScopeFactory serviceScopeFactory,
2224
StartupHostedService startupService,
2325
ILogger<OutboxProcessor> logger,
24-
IOptions<OpenDddOptions> options)
26+
IOptions<OpenDddOptions> options,
27+
IOptions<OpenDddOutboxProcessorOptions> processorOptions)
2528
{
2629
_serviceScopeFactory = serviceScopeFactory ?? throw new ArgumentNullException(nameof(serviceScopeFactory));
2730
_startupService = startupService;
2831
_logger = logger ?? throw new ArgumentNullException(nameof(logger));
2932
_options = options.Value ?? throw new ArgumentNullException(nameof(options));
33+
_processorOptions = processorOptions.Value ?? throw new ArgumentNullException(nameof(processorOptions));
3034
}
3135

3236
protected override async Task ExecuteAsync(CancellationToken stoppingToken)
3337
{
3438
_logger.LogInformation("Outbox Processor started.");
39+
_logger.LogDebug("Polling interval: {PollingIntervalSeconds}s, Max events per cycle: {MaxEventsPerCycle}",
40+
_processorOptions.PollingIntervalSeconds,
41+
_processorOptions.MaxEventsPerCycle);
3542

36-
// Ensure database setup is complete
3743
_logger.LogInformation("Waiting for database setup to complete before starting outbox processing...");
3844
await _startupService.StartupCompleted;
3945
_logger.LogInformation("Database setup completed. Starting outbox processing...");
@@ -56,7 +62,12 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
5662

5763
await databaseSession.OpenConnectionAsync(stoppingToken);
5864

59-
var pendingEvents = await outboxRepository.GetPendingEventsAsync(stoppingToken);
65+
var pendingEvents = await outboxRepository
66+
.GetPendingEventsAsync(_processorOptions.MaxEventsPerCycle, stoppingToken);
67+
68+
_logger.Log(
69+
pendingEvents.Any() ? LogLevel.Debug : LogLevel.Trace,
70+
"Fetched {EventCount} pending events.", pendingEvents.Count);
6071

6172
foreach (var outboxEntry in pendingEvents)
6273
{
@@ -72,6 +83,8 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
7283

7384
await messagingProvider.PublishAsync(topic, outboxEntry.Payload, stoppingToken);
7485
await outboxRepository.MarkEventAsProcessedAsync(outboxEntry.Id, stoppingToken);
86+
87+
_logger.LogDebug("Successfully published and marked event {EventId} as processed.", outboxEntry.Id);
7588
}
7689
catch (Exception ex)
7790
{
@@ -84,7 +97,7 @@ protected override async Task ExecuteAsync(CancellationToken stoppingToken)
8497
_logger.LogError(ex, "Unexpected error in Outbox Processor.");
8598
}
8699

87-
await Task.Delay(TimeSpan.FromSeconds(3), stoppingToken);
100+
await Task.Delay(TimeSpan.FromSeconds(_processorOptions.PollingIntervalSeconds), stoppingToken);
88101
}
89102

90103
_logger.LogInformation("Outbox Processor stopping.");

0 commit comments

Comments
 (0)