Merged
12 changes: 12 additions & 0 deletions README.md
@@ -43,6 +43,7 @@ Under the storage path specified by the `StorePath` configuration option, Mapped
│ └── ...
├── offset
│ ├── producer.offset
│ ├── producer.confirmed.offset
│ └── consumer.offset
```

@@ -51,6 +52,9 @@ Details:
- The `commitlog` directory stores the actual Segment files.

- The `offset` directory stores the offset files for both the producer and the consumer.
- The `producer.offset` file records the producer's write offset.
- The `producer.confirmed.offset` file records the offset up to which the producer has confirmed that data is written to disk. After an abnormal restart, the producer resumes writing from this offset, so the consumer never waits for data that was lost.
- The `consumer.offset` file records the consumer's consumption offset.
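
As a rough illustration of how the confirmed offset is used during recovery, the sketch below mirrors the rollback rule from `RecoverProducerOffsetIfNeeded` in this change. `ResolveProducerOffset` and its parameters are hypothetical names introduced for this example; they are not part of the MappedFileQueues API.

```csharp
using System;

// Hypothetical helper (an assumption for illustration, not library code).
// Returns the offset the producer should resume from after an abnormal restart.
static long ResolveProducerOffset(
    long producerOffset,
    long confirmedOffset,
    long consumerOffset,
    bool nextMessageReadable)
{
    // Nothing to repair if the consumer can still read its next message,
    // or if it has already caught up with the producer.
    if (nextMessageReadable || producerOffset <= consumerOffset)
    {
        return producerOffset;
    }

    // Roll back to the confirmed offset, but never behind what the consumer has consumed.
    var rollbackOffset = Math.Max(consumerOffset, confirmedOffset);

    // Only ever move the producer backwards.
    return Math.Min(producerOffset, rollbackOffset);
}

Console.WriteLine(ResolveProducerOffset(
    producerOffset: 900, confirmedOffset: 450, consumerOffset: 630, nextMessageReadable: false));
```

With these example values the producer resumes at 630. The confirmed offset (450) lies behind the consumer, so the consumer's offset becomes the lower bound for the rollback.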

### Usage Example

@@ -64,6 +68,8 @@ Details:

- **ConsumerSpinWaitDuration**: The maximum duration for a single spin-wait for data by the consumer, default is 100 milliseconds.

- **ProducerForceFlushIntervalCount**: The number of messages after which the producer forcibly flushes data to disk. The default is 1000. Until that count is reached, written data may reside only in memory and can be lost if the system crashes or loses power before the operating system flushes it. Setting the value to 1 maximizes data safety but may reduce performance. During recovery, the producer's offset is rolled back so that the consumer does not wait for lost data.
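
To make the trade-off concrete, here is a minimal simulation of the flush cadence, modeled on the counter logic in `MappedFileProducer.Commit`. It does not use the library. The 8-byte payload, the 1-byte end marker, and the message count of 2500 are assumptions chosen for illustration.

```csharp
using System;

// Illustrative values only: an assumed 8-byte payload plus an assumed 1-byte end marker.
const int recordSize = 8 + 1;
const long flushInterval = 1000; // the documented default

long offset = 0;          // next writable offset (what producer.offset tracks)
long confirmedOffset = 0; // last offset known to be flushed (what producer.confirmed.offset tracks)
long producedCount = 0;

for (var i = 0; i < 2500; i++)
{
    offset += recordSize;
    producedCount++;

    // Every flushInterval-th message triggers a forced flush, after which
    // the confirmed offset catches up with the write offset.
    if (producedCount % flushInterval == 0)
    {
        confirmedOffset = offset;
    }
}

// Messages written after the last forced flush are the ones a crash could lose.
long atRisk = (offset - confirmedOffset) / recordSize;
Console.WriteLine($"offset={offset}, confirmed={confirmedOffset}, messages at risk={atRisk}");
```

In this simulation the last 500 messages sit beyond the confirmed offset. With an interval of 1, the confirmed offset would follow every write, at the cost of one flush per message.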

#### Producing and Consuming Data

The producer and consumer interfaces in MappedFileQueues are as follows:
@@ -76,6 +82,12 @@ public interface IMappedFileProducer<T> where T : struct
// Observes the next writable offset for the current producer
public long Offset { get; }

// Observes the offset that the current producer has confirmed to be written to disk
public long ConfirmedOffset { get; }

// Adjusts the offset for the current producer
public void AdjustOffset(long offset);

public void Produce(ref T item);
}

13 changes: 12 additions & 1 deletion README.zh-CN.md
@@ -43,6 +43,7 @@ The offset is stored as a long, so the maximum supported value is 2^63-1.
│ └── ...
├── offset
│ ├── producer.offset
│ ├── producer.confirmed.offset
│ └── consumer.offset
```

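@@ -51,7 +52,9 @@ The offset is stored as a long, so the maximum supported value is 2^63-1.
- The `commitlog` directory stores the actual Segment files.

- The `offset` directory stores the offset files for both the producer and the consumer.

- The `producer.offset` file records the producer's write offset.
- The `producer.confirmed.offset` file records the offset up to which the producer has confirmed that data is written to disk. After an abnormal system restart, the producer can resume writing from this offset, so the consumer does not end up waiting for data that was lost.
- The `consumer.offset` file records the consumer's consumption offset.
### Usage Example

#### Configuration Options (MappedFileQueueOptions)
@@ -64,6 +67,8 @@ The offset is stored as a long, so the maximum supported value is 2^63-1.

- **ConsumerSpinWaitDuration**: The maximum time the consumer waits in a single spin-wait for data. The default is 100 milliseconds.

- **ProducerForceFlushIntervalCount**: The number of messages after which the producer forcibly flushes data to disk. The default is 1000 messages. Until that count is reached, data may be held in memory and is at risk of loss until the system flushes it to disk automatically. Setting this value to 1 maximizes data safety but affects performance. After a sudden power loss or a similar failure, data that was not yet written to disk may be lost. During recovery, the producer's offset is rolled back so that the consumer does not wait for lost data.

#### Producing and Consuming Data

The producer and consumer interfaces in MappedFileQueues are as follows:
@@ -74,6 +79,12 @@ public interface IMappedFileProducer<T> where T : struct
// Observes the next writable offset for the current producer
public long Offset { get; }

// Observes the offset that the current producer has confirmed as written to disk
public long ConfirmedOffset { get; }

// Adjusts the offset for the current producer
public void AdjustOffset(long offset);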

public void Produce(ref T message);
}

2 changes: 2 additions & 0 deletions src/MappedFileQueues/Constants.cs
@@ -8,6 +8,8 @@ internal class Constants

public const string ProducerOffsetFile = "producer.offset";

public const string ProducerConfirmedOffsetFile = "producer.confirmed.offset";

public const string ConsumerOffsetFile = "consumer.offset";

public const byte EndMarker = 0xFF;
8 changes: 8 additions & 0 deletions src/MappedFileQueues/IMappedFileConsumer.cs
@@ -11,6 +11,8 @@ public interface IMappedFileConsumer<T> where T : struct
/// Adjusts the offset to consume from the mapped file queue.
/// </summary>
/// <param name="offset">The new offset to set.</param>
/// <exception cref="ArgumentOutOfRangeException">Thrown when the provided offset is negative.</exception>
/// <exception cref="InvalidOperationException">Thrown when the consumer has already started consuming messages.</exception>
public void AdjustOffset(long offset);

/// <summary>
@@ -23,4 +25,10 @@ public interface IMappedFileConsumer<T> where T : struct
/// Commits the offset of the last consumed message.
/// </summary>
public void Commit();

/// <summary>
/// Checks if there is a next message available to consume.
/// </summary>
/// <returns>True if there is a next message available; otherwise, false.</returns>
internal bool NextMessageAvailable();
}
13 changes: 13 additions & 0 deletions src/MappedFileQueues/IMappedFileProducer.cs
@@ -7,6 +7,19 @@ public interface IMappedFileProducer<T> where T : struct
/// </summary>
public long Offset { get; }

/// <summary>
/// The last offset that has been fully persisted (confirmed written to storage).
/// </summary>
public long ConfirmedOffset { get; }

/// <summary>
/// Adjusts the offset to produce to the mapped file queue.
/// </summary>
/// <param name="offset">The new offset to set.</param>
/// <exception cref="ArgumentOutOfRangeException">Thrown when the provided offset is negative.</exception>
/// <exception cref="InvalidOperationException">Thrown when the producer has already started producing messages.</exception>
public void AdjustOffset(long offset);

/// <summary>
/// Produces a message to the mapped file queue.
/// </summary>
9 changes: 8 additions & 1 deletion src/MappedFileQueues/MappedFileConsumer.cs
@@ -53,7 +53,7 @@ public void AdjustOffset(long offset)
"Cannot adjust offset while there is an active segment. Please adjust the offset before consuming any messages.");
}

- _offsetFile.MoveTo(offset);
_offsetFile.MoveTo(offset, true);
}

public void Consume(out T message)
@@ -120,6 +120,13 @@ public void Dispose()
_segment?.Dispose();
}

public bool NextMessageAvailable() =>
_segment switch
{
null when !TryFindSegmentByOffset(out _segment) => false,
_ => _segment.TryRead(_offsetFile.Offset, out _)
};

private bool TryFindSegmentByOffset([MaybeNullWhen(false)] out MappedFileSegment<T> segment) =>
MappedFileSegment<T>.TryFind(
_segmentDirectory,
38 changes: 38 additions & 0 deletions src/MappedFileQueues/MappedFileProducer.cs
@@ -9,6 +9,9 @@ internal class MappedFileProducer<T> : IMappedFileProducer<T>, IDisposable where
// Memory mapped file to store the producer offset
private readonly OffsetMappedFile _offsetFile;

// Memory mapped file to store the confirmed producer offset
private readonly OffsetMappedFile _confirmedOffsetFile;

private readonly int _payloadSize;

private readonly string _segmentDirectory;
@@ -17,6 +20,8 @@ internal class MappedFileProducer<T> : IMappedFileProducer<T>, IDisposable where

private bool _disposed;

private long _producedCount = 0;

public MappedFileProducer(MappedFileQueueOptions options)
{
_options = options;
@@ -30,13 +35,36 @@ public MappedFileProducer(MappedFileQueueOptions options)
var offsetPath = Path.Combine(offsetDir, Constants.ProducerOffsetFile);
_offsetFile = new OffsetMappedFile(offsetPath);

var confirmedOffsetPath = Path.Combine(offsetDir, Constants.ProducerConfirmedOffsetFile);
_confirmedOffsetFile = new OffsetMappedFile(confirmedOffsetPath);

_payloadSize = Unsafe.SizeOf<T>();

_segmentDirectory = Path.Combine(options.StorePath, Constants.CommitLogDirectory);
}

public long Offset => _offsetFile.Offset;

public long ConfirmedOffset => _confirmedOffsetFile.Offset;

public void AdjustOffset(long offset)
{
ObjectDisposedException.ThrowIf(_disposed, this);

if (offset < 0)
{
throw new ArgumentOutOfRangeException(nameof(offset), "Offset must be greater than or equal to zero.");
}

if (_segment != null)
{
throw new InvalidOperationException(
"Cannot adjust offset while there is an active segment. Please adjust the offset before producing any messages.");
}

_offsetFile.MoveTo(offset, true);
}

public void Produce(ref T message)
{
ObjectDisposedException.ThrowIf(_disposed, this);
@@ -57,6 +85,7 @@ public void Dispose()

_disposed = true;
_offsetFile.Dispose();
_confirmedOffsetFile.Dispose();
_segment?.Dispose();
}

@@ -71,6 +100,15 @@ private void Commit()

_offsetFile.Advance(_payloadSize + Constants.EndMarkerSize);

_producedCount++;

if (_producedCount % _options.ProducerForceFlushIntervalCount == 0)
{
// Force flush the segment and update the confirmed offset
_segment.Flush();
_confirmedOffsetFile.MoveTo(_offsetFile.Offset, true);
}

// Check if the segment has reached its limit
if (_segment.AllowedLastOffsetToWrite < _offsetFile.Offset)
{
5 changes: 5 additions & 0 deletions src/MappedFileQueues/MappedFileQueueOptions.cs
@@ -21,4 +21,9 @@ public class MappedFileQueueOptions
/// The maximum duration a consumer will spin-wait each time for an item to become available.
/// </summary>
public TimeSpan ConsumerSpinWaitDuration { get; set; } = TimeSpan.FromMilliseconds(100);

/// <summary>
/// Number of produced items after which the producer will perform a forced flush.
/// </summary>
public long ProducerForceFlushIntervalCount { get; set; } = 1000;
}
88 changes: 80 additions & 8 deletions src/MappedFileQueues/MappedFileQueueT.cs
@@ -9,26 +9,30 @@ public sealed class MappedFileQueue<T> : IDisposable where T : struct

public MappedFileQueue(MappedFileQueueOptions options)
{
_options = options;

ArgumentException.ThrowIfNullOrWhiteSpace(options.StorePath, nameof(options.StorePath));

if (options.SegmentSize <= 0)
{
throw new ArgumentOutOfRangeException(nameof(options.SegmentSize),
"SegmentSize must be greater than zero.");
}

if (File.Exists(options.StorePath))
{
throw new ArgumentException($"The path '{options.StorePath}' is a file, not a directory.",
nameof(options.StorePath));
}

- if (!Directory.Exists(options.StorePath))
if (Directory.Exists(options.StorePath))
{
- Directory.CreateDirectory(options.StorePath);
RecoverProducerOffsetIfNeeded();
}
-
- if (options.SegmentSize <= 0)
else
{
- throw new ArgumentOutOfRangeException(nameof(options.SegmentSize),
- "SegmentSize must be greater than zero.");
Directory.CreateDirectory(options.StorePath);
}
-
- _options = options;
}

public IMappedFileProducer<T> Producer => _producer ??= new MappedFileProducer<T>(_options);
@@ -39,4 +43,72 @@ public void Dispose()
_producer?.Dispose();
_consumer?.Dispose();
}

// Check the last data. If the producer's offset is greater than the consumer's offset,
// and the consumer cannot consume the next piece of data, it means that there is data
// in the queue that was not persisted before the crash. We need to roll back the
// producer's offset to the position of the data that has been confirmed to be persisted.
private void RecoverProducerOffsetIfNeeded()
{
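// On Windows, use "Global\" prefix to make the named semaphore visible across all user sessions (system-wide on this machine).
// Note: string.GetHashCode() is randomized per process on .NET Core and later, so this name may differ between processes.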
var semName = $"Global\\MappedFileQueueSem_{_options.StorePath.GetHashCode()}";
Semaphore? semaphore = null;
try
{
semaphore = new Semaphore(1, 1, semName);
}
catch (NotSupportedException)
{
// Named semaphores are not supported on this platform, use unnamed semaphore instead.
semaphore = new Semaphore(1, 1);
}

using var sem = semaphore;

semaphore.WaitOne();

try
{
var consumer = Consumer;

if (consumer.NextMessageAvailable())
{
return;
}

var producer = Producer;

if (producer.Offset <= consumer.Offset)
{
// the consumer can continue to consume when
// the producer produces a new message
return;
}

var rollbackOffset = Math.Max(consumer.Offset, producer.ConfirmedOffset);

if (producer.Offset > rollbackOffset)
{
producer.AdjustOffset(rollbackOffset);
}

if (producer.Offset > consumer.Offset
&& !consumer.NextMessageAvailable())
{
throw new InvalidOperationException(
"After recovering the producer's offset, the consumer still cannot consume the next message; the data may be corrupted.");
}
}
finally
{
// The producer and consumer may not be used after this recovery process,
// so we dispose them here.
_producer?.Dispose();
_consumer?.Dispose();
_producer = null;
_consumer = null;

semaphore.Release();
}
}
}
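
One possible way to make the semaphore name identical across processes is to derive it from a stable digest of the store path instead of `string.GetHashCode()`, whose value is randomized per process on .NET Core and later. The sketch below is only an illustration and not what this change does. `StableSemaphoreName` is a hypothetical helper, and using the first 8 bytes of a SHA-256 digest is an arbitrary choice.

```csharp
using System;
using System.IO;
using System.Security.Cryptography;
using System.Text;

// Hypothetical helper (not part of this PR): builds a semaphore name that is
// the same in every process for the same store path.
static string StableSemaphoreName(string storePath)
{
    // Normalize to an absolute path so relative spellings of the same
    // directory, resolved from the same working directory, agree.
    var fullPath = Path.GetFullPath(storePath);

    // SHA-256 is deterministic across processes, unlike string.GetHashCode().
    var digest = SHA256.HashData(Encoding.UTF8.GetBytes(fullPath));

    // The first 8 bytes (16 hex characters) keep the name short.
    return $"Global\\MappedFileQueueSem_{Convert.ToHexString(digest, 0, 8)}";
}

Console.WriteLine(StableSemaphoreName("./queue-data"));
```

Two processes that open the same directory would then compute the same name, which is a prerequisite for the named semaphore to serialize recovery between them. Path casing differences on case-insensitive file systems are not handled in this sketch.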
13 changes: 13 additions & 0 deletions src/MappedFileQueues/MappedFileSegment.cs
@@ -130,8 +130,21 @@ public bool TryRead(long offset, out T message)
return true;
}


/// <summary>
/// Forces any changes made to the memory-mapped view to be written to the underlying file.
/// It is not usually necessary to call this method, as the system will flush changes automatically,
/// but it can be useful in scenarios where data integrity is critical.
/// </summary>
public void Flush()
{
_viewAccessor.Flush();
_fileStream.Flush(true);
}

public void Dispose()
{
Flush();
_viewAccessor.Dispose();
_mmf.Dispose();
_fileStream.Dispose();