Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
35 changes: 29 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -71,21 +71,39 @@ BufferQueue supports two consumption modes: pull mode and push mode.
Pull mode consumer example:

```csharp

builder.Services.AddBufferQueue(options =>
builder.Services.AddBufferQueue(bufferOptionsBuilder =>
{
options.UseMemory(bufferOptions =>
bufferOptionsBuilder
.UseMemory(memoryBufferOptionsBuilder =>
{
// Each pair of Topic and data type corresponds to an independent buffer, and partitionNumber can be set
bufferOptions.AddTopic<Foo>("topic-foo1", partitionNumber: 6);
bufferOptions.AddTopic<Foo>("topic-foo2", partitionNumber: 4);
bufferOptions.AddTopic<Bar>("topic-bar", partitionNumber: 8);
memoryBufferOptionsBuilder
.AddTopic<Foo>(options =>
{
options.TopicName = "topic-foo1";
options.PartitionNumber = 6;
})
.AddTopic<Foo>(options =>
{
options.TopicName = "topic-foo2";
options.PartitionNumber = 4;
})
.AddTopic<Bar>(options =>
{
options.TopicName = "topic-bar";
options.PartitionNumber = 8;
// 可以设置缓冲区的最大容量
options.BoundedCapacity = 100_000;
});
})
// Add push mode consumers,
// scan the specified assembly for classes marked with
// BufferPushCustomerAttribute and register them as push mode consumers
.AddPushCustomers(typeof(Program).Assembly);
});

// Pull mode consumers can be implemented as HostedService.
builder.Services.AddHostedService<Foo1PullConsumerHostService>();
```

Pull mode consumer example:
Expand Down Expand Up @@ -197,6 +215,9 @@ Producer example:

Get the specified producer through the IBufferQueue service and send the data by calling the ProduceAsync method.

If bounded capacity is set, when the buffer is full, the ProduceAsync method will discard the data and throw a MemoryBufferQueueFullException.
You can use the TryProduceAsync method to check if the data was successfully sent.

```csharp
[ApiController]
[Route("/api/[controller]")]
Expand All @@ -223,6 +244,8 @@ public class TestController(IBufferQueue bufferQueue) : ControllerBase
{
var producer = bufferQueue.GetProducer<Bar>("topic-bar");
await producer.ProduceAsync(bar);
// TryProduceAsync will return a boolean indicating whether the data was successfully sent.
// bool success = await producer.TryProduceAsync(bar);
return Ok();
}
}
Expand Down
33 changes: 25 additions & 8 deletions README.zh-CN.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,15 +64,30 @@ dotnet add package BufferQueue
BufferQueue 支持两种消费模式:pull 模式和 push 模式。

```csharp

builder.Services.AddBufferQueue(options =>
builder.Services.AddBufferQueue(bufferOptionsBuilder =>
{
options.UseMemory(bufferOptions =>
bufferOptionsBuilder
.UseMemory(memoryBufferOptionsBuilder =>
{
// 每一对 Topic 和数据类型对应一个独立的缓冲区,可以设置 partitionNumber
bufferOptions.AddTopic<Foo>("topic-foo1", partitionNumber: 6);
bufferOptions.AddTopic<Foo>("topic-foo2", partitionNumber: 4);
bufferOptions.AddTopic<Bar>("topic-bar", partitionNumber: 8);
memoryBufferOptionsBuilder
.AddTopic<Foo>(options =>
{
options.TopicName = "topic-foo1";
options.PartitionNumber = 6;
})
.AddTopic<Foo>(options =>
{
options.TopicName = "topic-foo2";
options.PartitionNumber = 4;
})
.AddTopic<Bar>(options =>
{
options.TopicName = "topic-bar";
options.PartitionNumber = 8;
// 可以设置缓冲区的最大容量
options.BoundedCapacity = 100_000;
});
})
// 添加 push 模式的消费者
// 扫描指定程序集中的标记了 BufferPushCustomerAttribute 的类,
Expand Down Expand Up @@ -141,9 +156,7 @@ push consumer 会被注册到 DI 容器中,可以通过构造函数注入其

BufferPushCustomerAttribute 中的 concurrency 参数用于设置 push consumer 的消费并发数,对应 pull consumer 的 consumerNumber。


```csharp

[BufferPushCustomer(
topicName: "topic-foo2",
groupName: "group-foo2",
Expand Down Expand Up @@ -194,6 +207,8 @@ Producer 示例:

通过 IBufferQueue 获取到指定的 Producer,然后调用 ProduceAsync 方法发送数据。

如果设置了 BoundedCapacity,当缓冲区满时,ProduceAsync 方法会丢弃数据并抛出 MemoryBufferQueueFullException。可以使用 TryProduceAsync 方法来检查数据是否成功发送。

```csharp
[ApiController]
[Route("/api/[controller]")]
Expand All @@ -220,6 +235,8 @@ public class TestController(IBufferQueue bufferQueue) : ControllerBase
{
var producer = bufferQueue.GetProducer<Bar>("topic-bar");
await producer.ProduceAsync(bar);
// TryProduceAsync 会返回一个布尔值,表示数据是否成功发送
// bool success = await producer.TryProduceAsync(bar);
return Ok();
}
}
Expand Down
2 changes: 2 additions & 0 deletions samples/WebAPI/Controllers/TestController.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ public async Task<IActionResult> PostBar([FromBody] Bar bar)
{
var producer = bufferQueue.GetProducer<Bar>("topic-bar");
await producer.ProduceAsync(bar);
// TryProduceAsync can be used if you want to check if the item was produced successfully
// bool success = await producer.TryProduceAsync(bar);
return Ok();
}
}
25 changes: 20 additions & 5 deletions samples/WebAPI/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,28 @@
builder.Services.AddSwaggerGen();

// Configure the BufferQueue
builder.Services.AddBufferQueue(options =>
builder.Services.AddBufferQueue(bufferOptionsBuilder =>
{
options.UseMemory(bufferOptions =>
bufferOptionsBuilder
.UseMemory(memoryBufferOptionsBuilder =>
{
bufferOptions.AddTopic<Foo>("topic-foo1", partitionNumber: 6);
bufferOptions.AddTopic<Foo>("topic-foo2", partitionNumber: 4);
bufferOptions.AddTopic<Bar>("topic-bar", partitionNumber: 8);
memoryBufferOptionsBuilder
.AddTopic<Foo>(options =>
{
options.TopicName = "topic-foo1";
options.PartitionNumber = 6;
})
.AddTopic<Foo>(options =>
{
options.TopicName = "topic-foo2";
options.PartitionNumber = 4;
})
.AddTopic<Bar>(options =>
{
options.TopicName = "topic-bar";
options.PartitionNumber = 8;
options.BoundedCapacity = 100_000; // Set a bounded capacity for the Bar topic
});
})
.AddPushCustomers(typeof(Program).Assembly);
});
Expand Down
8 changes: 3 additions & 5 deletions src/BufferQueue/BufferPullConsumerOptions.cs
Original file line number Diff line number Diff line change
@@ -1,17 +1,15 @@
// Licensed to the .NET Core Community under one or more agreements.
// The .NET Core Community licenses this file to you under the MIT license.

using System.Diagnostics.CodeAnalysis;

namespace BufferQueue;

public class BufferPullConsumerOptions
{
public string TopicName { get; init; } = string.Empty;
public required string TopicName { get; init; }

public string GroupName { get; init; } = string.Empty;
public required string GroupName { get; init; }

public bool AutoCommit { get; init; } = false;
public bool AutoCommit { get; init; }

public int BatchSize { get; init; } = 100;
}
2 changes: 1 addition & 1 deletion src/BufferQueue/BufferQueue.csproj
Original file line number Diff line number Diff line change
@@ -1,10 +1,10 @@
<Project Sdk="Microsoft.NET.Sdk">

<PropertyGroup>
<TargetFramework>net8.0</TargetFramework>
<Nullable>enable</Nullable>
<LangVersion>latest</LangVersion>
<EnforceCodeStyleInBuild>true</EnforceCodeStyleInBuild>
<TargetFrameworks>net6.0;net7.0;net8.0</TargetFrameworks>
</PropertyGroup>

<ItemGroup>
Expand Down
2 changes: 2 additions & 0 deletions src/BufferQueue/IBufferProducer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,4 +10,6 @@ public interface IBufferProducer<in T>
string TopicName { get; }

ValueTask ProduceAsync(T item);

ValueTask<bool> TryProduceAsync(T item);
}
9 changes: 6 additions & 3 deletions src/BufferQueue/Memory/MemoryBuferQueue.cs
Original file line number Diff line number Diff line change
Expand Up @@ -19,17 +19,20 @@ internal sealed class MemoryBufferQueue<T> : IBufferQueue<T>
private readonly object _consumersLock;
private readonly Dictionary<string /* GroupName */, List<MemoryBufferConsumer<T>>> _consumers;

public MemoryBufferQueue(string topicName, int partitionNumber)
public MemoryBufferQueue(MemoryBufferQueueOptions options)
{
var topicName = options.TopicName!;
var partitionNumber = options.PartitionNumber;

TopicName = topicName;
_partitionNumber = partitionNumber;
_partitions = new MemoryBufferPartition<T>[partitionNumber];
for (var i = 0; i < partitionNumber; i++)
{
_partitions[i] = new MemoryBufferPartition<T>(i);
_partitions[i] = new MemoryBufferPartition<T>(i, options.SegmentSize);
}

_producer = new MemoryBufferProducer<T>(topicName, _partitions);
_producer = new MemoryBufferProducer<T>(options, _partitions);

_consumers = new Dictionary<string, List<MemoryBufferConsumer<T>>>();
_consumersLock = new object();
Expand Down

This file was deleted.

2 changes: 1 addition & 1 deletion src/BufferQueue/Memory/MemoryBufferConsumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ internal sealed class MemoryBufferConsumer<T> : IBufferPullConsumer<T>
public MemoryBufferConsumer(BufferPullConsumerOptions options)
{
_options = options;
_assignedPartitions = Array.Empty<MemoryBufferPartition<T>>();
_assignedPartitions = [];
_pendingDataValueTaskSource = new PendingDataValueTaskSource<MemoryBufferPartition<T>>();
_pendingDataVersion = 0;
_pendingDataLock = new ReaderWriterLockSlim();
Expand Down
15 changes: 0 additions & 15 deletions src/BufferQueue/Memory/MemoryBufferOptions.cs

This file was deleted.

38 changes: 38 additions & 0 deletions src/BufferQueue/Memory/MemoryBufferOptionsBuilder.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,38 @@
// Licensed to the .NET Core Community under one or more agreements.
// The .NET Core Community licenses this file to you under the MIT license.

using System;
using Microsoft.Extensions.DependencyInjection;

namespace BufferQueue.Memory;

public class MemoryBufferOptionsBuilder(IServiceCollection services)
{
public MemoryBufferOptionsBuilder AddTopic<T>(
Action<MemoryBufferQueueOptions> configure)
where T : notnull
{
ArgumentNullException.ThrowIfNull(configure);

var options = new MemoryBufferQueueOptions();
configure(options);

var topicName = options.TopicName;
var partitionNumber = options.PartitionNumber;

if (string.IsNullOrWhiteSpace(topicName))
{
throw new ArgumentException("Topic name cannot be null or whitespace.", nameof(options.TopicName));
}

if (partitionNumber <= 0)
{
throw new ArgumentOutOfRangeException(nameof(options.PartitionNumber),
"Partition number must be greater than zero.");
}

services.AddKeyedSingleton<IBufferQueue<T>>(
topicName, new MemoryBufferQueue<T>(options));
return this;
}
}
23 changes: 23 additions & 0 deletions src/BufferQueue/Memory/MemoryBufferOptionsBuilderExtensions.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
// Licensed to the .NET Core Community under one or more agreements.
// The .NET Core Community licenses this file to you under the MIT license.

using System;
using BufferQueue.Memory;

namespace BufferQueue;

public static class MemoryBufferOptionsBuilderExtensions
{
public static BufferOptionsBuilder UseMemory(
this BufferOptionsBuilder builder,
Action<MemoryBufferOptionsBuilder> configure)
{
ArgumentNullException.ThrowIfNull(builder);
ArgumentNullException.ThrowIfNull(configure);

var options = new MemoryBufferOptionsBuilder(builder.Services);
configure(options);

return builder;
}
}
10 changes: 5 additions & 5 deletions src/BufferQueue/Memory/MemoryBufferPartition.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ namespace BufferQueue.Memory;
internal sealed class MemoryBufferPartition<T>
{
// internal for test
internal readonly int _segmentLength;
internal readonly int _segmentSize;

private volatile MemoryBufferSegment<T> _head;
private volatile MemoryBufferSegment<T> _tail;
Expand All @@ -26,11 +26,11 @@ internal sealed class MemoryBufferPartition<T>

private readonly object _createSegmentLock;

public MemoryBufferPartition(int id, int segmentLength = 1024)
public MemoryBufferPartition(int id, int segmentSize)
{
_segmentLength = segmentLength;
_segmentSize = segmentSize;
PartitionId = id;
_head = _tail = new MemoryBufferSegment<T>(_segmentLength, default);
_head = _tail = new MemoryBufferSegment<T>(_segmentSize, default);
_consumerReaders = new ConcurrentDictionary<string, Reader>();
_consumers = new HashSet<MemoryBufferConsumer<T>>();

Expand Down Expand Up @@ -79,7 +79,7 @@ public void Enqueue(T item)
var newSegmentStartOffset = tail.EndOffset + 1;
var newSegment = TryRecycleSegment(newSegmentStartOffset, out var recycledSegment)
? recycledSegment
: new MemoryBufferSegment<T>(_segmentLength, newSegmentStartOffset);
: new MemoryBufferSegment<T>(_segmentSize, newSegmentStartOffset);
tail.NextSegment = newSegment;
_tail = newSegment;
}
Expand Down
Loading