Skip to content

SimpleOutbox - A simple, standalone Transactional Outbox library for .NET applications using EF Core and PostgreSQL

Notifications You must be signed in to change notification settings

alexandrereyes/SimpleOutbox

Folders and files

NameName
Last commit message
Last commit date

Latest commit

 

History

23 Commits
 
 
 
 
 
 
 
 
 
 
 
 

Repository files navigation

SimpleOutbox

A simple, standalone Transactional Outbox library for .NET applications using Entity Framework Core and PostgreSQL.

NuGet NuGet Downloads .NET PostgreSQL License GitHub


Why SimpleOutbox?

When you need to update your database and publish a message/event, you face the dual-write problem. If the message publish fails after the database commit, you have inconsistent state. SimpleOutbox solves this by storing messages in the same database transaction as your domain changes, then reliably delivering them asynchronously.

Database Transaction
├── Update Order status = "Paid"
├── Insert OutboxMessage (OrderPaidEvent)  ← Same transaction
└── Commit

Background Service (async)
├── SELECT ... FOR UPDATE SKIP LOCKED
├── Deserialize & Process message
└── Mark as Completed

Features

Feature Description
Transactional Outbox Messages persisted atomically with your domain changes
FIFO per Partition Messages with same PartitionKey processed in order
Concurrency Control Configurable MaxConcurrency per partition
Scheduling Delay message delivery with ScheduleAsync, cancel with CancelAsync
Multi-Instance Safe PostgreSQL FOR UPDATE SKIP LOCKED for distributed locking
Retry with Backoff Exponential backoff on failures
Cleanup Service Automatic removal of old processed messages
Unified Handler Single IOutboxMessageHandler for all message types
BDD/Integration Tests IOutboxTestHarness and IOutboxTestHarnessFactory for parallel test isolation

Quick Start

1. Install

dotnet add package SimpleOutbox

2. Configure

// Program.cs
builder.Services.AddSimpleOutbox<MyDbContext>(options =>
{
    options.Schema = "outbox";
    options.MaxRetries = 5;
    options.PollingInterval = TimeSpan.FromSeconds(5);
});

// Register your handler
builder.Services.AddScoped<IOutboxMessageHandler, MyOutboxHandler>();

3. Apply EF Configuration

public class MyDbContext : DbContext
{
    protected override void OnModelCreating(ModelBuilder modelBuilder)
    {
        modelBuilder.ApplyOutboxConfiguration();
    }
}

4. Use It

public class OrderService(MyDbContext db, IOutboxService outbox)
{
    public async Task CreateOrderAsync(Order order, CancellationToken ct)
    {
        db.Orders.Add(order);
        
        // Enqueue event in same transaction (use order.Id for idempotency)
        await outbox.EnqueueAsync(order.Id, new OrderCreatedEvent(order.Id), ct);
        
        // Both committed atomically
        await db.SaveChangesAsync(ct);
    }
}

5. Implement Handler

public class MyOutboxHandler(IMediator mediator) : IOutboxMessageHandler
{
    public async Task HandleAsync(object message, Type messageType, CancellationToken ct)
    {
        // Route to your message processing infrastructure (e.g., MediatR, custom dispatcher)
        await mediator.Send(message, ct);
    }
}

API Overview

IOutboxService

public interface IOutboxService
{
    // Enqueue for immediate processing (messageId for idempotency)
    Task EnqueueAsync<T>(Guid messageId, T message, CancellationToken ct = default);
    
    // Schedule for future processing (messageId for idempotency and cancellation)
    Task ScheduleAsync<T>(Guid messageId, T message, TimeSpan delay, CancellationToken ct = default);
    Task ScheduleAsync<T>(Guid messageId, T message, DateTime scheduledFor, CancellationToken ct = default);
    
    // Cancel a scheduled message
    Task<bool> CancelAsync(Guid messageId, CancellationToken ct = default);
}

IOutboxMessageHandler

public interface IOutboxMessageHandler
{
    Task HandleAsync(object message, Type messageType, CancellationToken ct);
}

IPartitionedMessage

public record OrderEvent(Guid OrderId) : IPartitionedMessage
{
    public string PartitionKey => $"order:{OrderId}";
    public int MaxConcurrency => 1; // Strict FIFO
}

Architecture

┌─────────────────────────────────────────────────────────────┐
│                     Your Application                         │
│                                                              │
│   dbContext.Add(order);                                      │
│   await outbox.EnqueueAsync(messageId, new OrderCreatedEvent(...)); │
│   await dbContext.SaveChangesAsync();  // Atomic commit      │
└──────────────────────────┬──────────────────────────────────┘
                           │
                           ▼
┌─────────────────────────────────────────────────────────────┐
│                  OutboxDeliveryService                       │
│                   (BackgroundService)                        │
│                                                              │
│   1. SELECT ... FOR UPDATE SKIP LOCKED                       │
│   2. Check partition concurrency limits                      │
│   3. Deserialize and call IOutboxMessageHandler              │
│   4. Mark as Completed or schedule retry                     │
│   5. Wait for notification or polling interval               │
└─────────────────────────────────────────────────────────────┘

Database Schema

SimpleOutbox creates a single table in your configured schema (default: outbox):

CREATE TABLE outbox.outbox_messages (
    id              BIGINT GENERATED ALWAYS AS IDENTITY PRIMARY KEY,
    message_id      UUID NOT NULL UNIQUE,
    message_type    VARCHAR(500) NOT NULL,
    payload         TEXT NOT NULL,
    partition_key   VARCHAR(200),
    max_concurrency INT DEFAULT 1,
    created_at      TIMESTAMP DEFAULT NOW(),
    scheduled_for   TIMESTAMP,
    processed_at    TIMESTAMP,
    retry_count     INT DEFAULT 0,
    last_error      TEXT,
    status          VARCHAR(20) DEFAULT 'Pending'
);

Indexes are automatically created for:

  • Pending message lookup
  • Partition FIFO ordering
  • Concurrency control
  • Cleanup queries
  • Message cancellation

Testing Support

SimpleOutbox provides comprehensive testing support for BDD/integration tests.

Basic Test Setup

// Register test support after AddSimpleOutbox
services.AddSimpleOutboxTestSupport<MyDbContext>();

// Use the harness in tests
var harness = serviceProvider.GetRequiredService<IOutboxTestHarness>();
var messageId = await harness.EnqueueAsync(new OrderCreatedEvent(orderId));
var result = await harness.WaitForMessageAsync(messageId, TimeSpan.FromSeconds(10));
result.Success.ShouldBeTrue();

Parallel Test Isolation (xUnit + ReqnRoll)

For parallel BDD tests, use IOutboxTestHarnessFactory to create scope-bound harnesses:

// DependencyInjection - simple setup
services.AddSimpleOutboxTestSupport<MyDbContext>();
services.AddScoped<FakeEmailService>();  // Scoped fakes work naturally!

// [BeforeScenario]
var scope = serviceProvider.CreateScope();
var factory = serviceProvider.GetRequiredService<IOutboxTestHarnessFactory>();
var harness = factory.CreateForScope(scope);
await harness.StartAsync();
scenarioContext.Set(harness);
scenarioContext.Set(scope);

// In test steps
var harness = scenarioContext.Get<IScopedOutboxTestHarness>();
await harness.EnqueueAsync(command);

// Assertions - fakes from the same scope just work!
var fake = scope.ServiceProvider.GetRequiredService<FakeEmailService>();
fake.SentEmails.ShouldContain(e => e.To == "customer@example.com");

// [AfterScenario]
await harness.DisposeAsync();
scope.Dispose();

Alternative: ScopeResolver

For more control, use ScopeResolver with a manual registry:

var scopeRegistry = new ConcurrentDictionary<string, IServiceScope>();

services.AddSimpleOutboxTestSupport<MyDbContext>(options =>
{
    options.ScopeResolver = (messageId, partitionKey) =>
        partitionKey != null && scopeRegistry.TryGetValue(partitionKey, out var scope)
            ? scope : null;
});

Migration from v1.x

Version 2.0 simplifies the API by removing typed handlers, middleware, and observers. If you were using these features:

v1.x Feature v2.0 Replacement
IMessageHandler<T> Use IOutboxMessageHandler with pattern matching or a dispatcher
IMessageMiddleware Implement cross-cutting concerns in your IOutboxMessageHandler
IMessageObserver Use OpenTelemetry or your logging infrastructure directly
options.AddHandler<T>() Register IOutboxMessageHandler directly in DI
options.AddHandlersFromAssembly() Not needed - single handler approach
options.UseMiddleware<T>() Not needed
options.AddObserver<T>() Not needed

Requirements

  • .NET 10.0+
  • PostgreSQL 14+
  • Entity Framework Core 10.0+

License

MIT License - see LICENSE for details.

About

SimpleOutbox - A simple, standalone Transactional Outbox library for .NET applications using EF Core and PostgreSQL

Topics

Resources

Stars

Watchers

Forks

Releases

No releases published

Packages

No packages published

Languages