Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
133 changes: 133 additions & 0 deletions src/MappedFileQueues/CrossPlatformProcessLock.cs
Original file line number Diff line number Diff line change
@@ -0,0 +1,133 @@
using System.Runtime.InteropServices;
using System.Security.Cryptography;

namespace MappedFileQueues;

/// <summary>
/// A cross-platform process lock implementation. Only Windows and Linux are supported.
/// On Windows, a named mutex is used. On Linux, a file lock is used. On other platforms, a no-op lock is used.
/// </summary>
internal sealed class CrossPlatformProcessLock : IDisposable
{
private readonly IProcessLock _lock;

/// <summary>
/// Initializes a new instance of the <see cref="CrossPlatformProcessLock"/> class.
/// </summary>
/// <param name="name">The name of the process lock.</param>
/// <param name="storePath">The store path of the <see cref="MappedFileQueue{T}"/>.</param>
public CrossPlatformProcessLock(string name, string storePath)
{
if (RuntimeInformation.IsOSPlatform(OSPlatform.Windows))
{
var hashBytes = MD5.HashData(System.Text.Encoding.UTF8.GetBytes(storePath));
var hashString = Convert.ToHexString(hashBytes);
name = $"{name}_{hashString}";
_lock = new MutexProcessLock(name);
}
else if (RuntimeInformation.IsOSPlatform(OSPlatform.Linux))
{
var lockFilePath = Path.Combine(storePath, $"{name}.lock");
_lock = new FileProcessLock(lockFilePath);
}
else
{
_lock = new EmptyProcessLock();
}
}

public void Acquire() => _lock.Acquire();
public void Release() => _lock.Release();

public void Dispose() => _lock.Dispose();

#region Process Lock Interface

private interface IProcessLock : IDisposable
{
void Acquire();
void Release();
}

#endregion

#region Windows Named Mutex Lock

private class MutexProcessLock(string name) : IProcessLock
{
private Mutex? _mutex;
// Global\ Make the mutex available across sessions
private readonly string _name = $@"Global\{name}";

public void Acquire()
{
_mutex = new Mutex(false, _name);
_mutex.WaitOne();
}

public void Release()
{
_mutex?.ReleaseMutex();
_mutex?.Dispose();
}

public void Dispose() => Release();
}

#endregion

#region Linux File Lock

private class FileProcessLock(string lockFilePath) : IProcessLock
{
private FileStream? _lockFileStream;

public void Acquire()
{
while (true)
{
try
{
_lockFileStream = new FileStream(lockFilePath, FileMode.OpenOrCreate,
FileAccess.ReadWrite, FileShare.ReadWrite);
_lockFileStream.Lock(0, 0);

Check warning on line 93 in src/MappedFileQueues/CrossPlatformProcessLock.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

This call site is reachable on all platforms. 'FileStream.Lock(long, long)' is unsupported on: 'macOS/OSX'.

Check warning on line 93 in src/MappedFileQueues/CrossPlatformProcessLock.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

This call site is reachable on all platforms. 'FileStream.Lock(long, long)' is unsupported on: 'macOS/OSX'. (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca1416)

Check warning on line 93 in src/MappedFileQueues/CrossPlatformProcessLock.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

This call site is reachable on all platforms. 'FileStream.Lock(long, long)' is unsupported on: 'macOS/OSX'. (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca1416)
return;
}
catch (IOException)
{
// Lock is held by another process, wait and retry
Thread.Sleep(200);
}
}
}

public void Release()
{
_lockFileStream?.Unlock(0, 0);

Check warning on line 106 in src/MappedFileQueues/CrossPlatformProcessLock.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

This call site is reachable on all platforms. 'FileStream.Unlock(long, long)' is unsupported on: 'macOS/OSX'.

Check warning on line 106 in src/MappedFileQueues/CrossPlatformProcessLock.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

This call site is reachable on all platforms. 'FileStream.Unlock(long, long)' is unsupported on: 'macOS/OSX'. (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca1416)

Check warning on line 106 in src/MappedFileQueues/CrossPlatformProcessLock.cs

View workflow job for this annotation

GitHub Actions / dotnet-build

This call site is reachable on all platforms. 'FileStream.Unlock(long, long)' is unsupported on: 'macOS/OSX'. (https://learn.microsoft.com/dotnet/fundamentals/code-analysis/quality-rules/ca1416)
_lockFileStream?.Dispose();
}

public void Dispose() => Release();
}

#endregion

#region Empty Lock (for unsupported platforms)

private class EmptyProcessLock : IProcessLock
{
public void Acquire()
{
}

public void Release()
{
}

public void Dispose()
{
}
}

#endregion
}
20 changes: 4 additions & 16 deletions src/MappedFileQueues/MappedFileQueueT.cs
Original file line number Diff line number Diff line change
Expand Up @@ -50,22 +50,10 @@ public void Dispose()
// producer's offset to the position of the data that has been confirmed to be persisted.
private void RecoverProducerOffsetIfNeeded()
{
// On Windows, use "Global\" prefix to make the named semaphore visible across all user sessions (system-wide on this machine).
var semName = $"Global\\MappedFileQueueSem_{_options.StorePath.GetHashCode()}";
Semaphore? semaphore = null;
try
{
semaphore = new Semaphore(1, 1, semName);
}
catch (NotSupportedException)
{
// Named semaphores are not supported on this platform, use unnamed semaphore instead.
semaphore = new Semaphore(1, 1);
}

using var sem = semaphore;
var lockName = "recovery_lock";
using var processLock = new CrossPlatformProcessLock(lockName, _options.StorePath);

semaphore.WaitOne();
processLock.Acquire();

try
{
Expand Down Expand Up @@ -108,7 +96,7 @@ private void RecoverProducerOffsetIfNeeded()
_producer = null;
_consumer = null;

semaphore.Release();
processLock.Release();
}
}
}
1 change: 1 addition & 0 deletions src/MappedFileQueues/OffsetMappedFile.cs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ public void MoveTo(long offset, bool flushToDisk = false)

public void Dispose()
{
Flush();
_fileStream.Dispose();
_mmf.Dispose();
_vierAccessor.Dispose();
Expand Down
Loading