Skip to content

Commit 8ab513b

Browse files
committed
fix
1 parent 2830a29 commit 8ab513b

20 files changed

Lines changed: 626 additions & 234 deletions

Eocron.DependencyInjection.Interceptors/DecoratorChainExtensions.cs

Lines changed: 53 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -2,6 +2,7 @@
22
using System.Reflection;
33
using Castle.DynamicProxy;
44
using Eocron.DependencyInjection.Interceptors.Caching;
5+
using Eocron.DependencyInjection.Interceptors.Locking;
56
using Eocron.DependencyInjection.Interceptors.Logging;
67
using Eocron.DependencyInjection.Interceptors.Retry;
78
using Eocron.DependencyInjection.Interceptors.Timeout;
@@ -16,20 +17,49 @@ public static class DecoratorChainExtensions
1617
public static DecoratorChain AddInterceptor(this DecoratorChain decoratorChain,
1718
Func<IServiceProvider, IAsyncInterceptor> interceptorFactory)
1819
{
19-
decoratorChain.Add((sp, instance) => InterceptionHelper.CreateProxy(instance, interceptorFactory(sp)));
20+
decoratorChain.Add((sp, keyPrefix, instance, lifetime) => InterceptionHelper.CreateProxy(decoratorChain.ServiceType, instance, interceptorFactory(sp)));
2021
return decoratorChain;
2122
}
2223

2324
public static DecoratorChain AddInterceptor(this DecoratorChain decoratorChain,
24-
IAsyncInterceptor interceptor)
25+
Func<IServiceProvider, string, ServiceLifetime, IAsyncInterceptor> interceptorFactory,
26+
DecoratorConfiguratorDelegate interceptorConfigurator)
2527
{
26-
decoratorChain.Add((sp, instance) => InterceptionHelper.CreateProxy(instance, interceptor));
28+
decoratorChain.Add((sp, keyPrefix, instance, lifetime) => InterceptionHelper.CreateProxy(decoratorChain.ServiceType, instance, interceptorFactory(sp, keyPrefix, lifetime)), interceptorConfigurator);
2729
return decoratorChain;
2830
}
2931

30-
public static DecoratorChain AddTracing(this DecoratorChain decoratorChain)
32+
public static DecoratorChain AddLock(this DecoratorChain decoratorChain,
33+
Func<IServiceProvider, ILockProvider> lockProvider)
34+
{
35+
decoratorChain.AddInterceptor(sp =>
36+
new LockAsyncInterceptor(
37+
lockProvider(sp),
38+
false));
39+
return decoratorChain;
40+
}
41+
42+
public static DecoratorChain AddSemaphoreSlimLock(this DecoratorChain decoratorChain, int initialCount = 1)
43+
{
44+
decoratorChain.AddInterceptor((sp, keyPrefix, lifetime) =>
45+
new LockAsyncInterceptor(sp.GetRequiredKeyedService<ILockProvider>(keyPrefix), lifetime == ServiceLifetime.Transient),
46+
(services, keyPrefix, lifetime) =>
47+
{
48+
services.Add(new ServiceDescriptor(typeof(ILockProvider), keyPrefix, (_,_)=> new SemaphoreSlimLockProvider(initialCount), lifetime));
49+
});
50+
return decoratorChain;
51+
}
52+
53+
public static DecoratorChain AddMonitorLock(this DecoratorChain decoratorChain)
3154
{
3255
decoratorChain.AddInterceptor((sp) =>
56+
new LockAsyncInterceptor(new MonitorLockProvider(), false));
57+
return decoratorChain;
58+
}
59+
60+
public static DecoratorChain AddTracing(this DecoratorChain decoratorChain)
61+
{
62+
decoratorChain.AddInterceptor(sp =>
3363
new LoggingAsyncInterceptor(
3464
sp.GetService<ILoggerFactory>().CreateLogger(decoratorChain.ServiceType.FullName),
3565
LogLevel.Trace,
@@ -44,15 +74,15 @@ public static DecoratorChain AddTimeout(this DecoratorChain decoratorChain, Time
4474
return decoratorChain;
4575
}
4676

47-
decoratorChain.AddInterceptor(new TimeoutAsyncInterceptor(timeout));
77+
decoratorChain.AddInterceptor(_ => new TimeoutAsyncInterceptor(timeout));
4878
return decoratorChain;
4979
}
5080

5181
public static DecoratorChain AddRetry(this DecoratorChain decoratorChain,
5282
Func<int, Exception, bool> exceptionPredicate,
5383
Func<int, Exception, TimeSpan> retryIntervalProvider)
5484
{
55-
decoratorChain.AddInterceptor((sp) => new RetryUntilConditionAsyncInterceptor(exceptionPredicate,
85+
decoratorChain.AddInterceptor(sp => new RetryAsyncInterceptor(exceptionPredicate,
5686
retryIntervalProvider,
5787
sp.GetService<ILoggerFactory>()?.CreateLogger(decoratorChain.ServiceType.Name)));
5888
return decoratorChain;
@@ -66,7 +96,7 @@ public static DecoratorChain AddConstantBackoff(this DecoratorChain decoratorCha
6696
{
6797
return decoratorChain.AddRetry(
6898
(c, ex) => c <= maxAttempts && (isRetryable?.Invoke(ex) ?? true),
69-
(c, _) => ConstantBackoff.Calculate(StaticRandom.Value, retryInterval, jittered));
99+
(_, _) => ConstantBackoff.Calculate(StaticRandom.Value, retryInterval, jittered));
70100
}
71101

72102
public static DecoratorChain AddExponentialBackoff(this DecoratorChain decoratorChain,
@@ -81,27 +111,36 @@ public static DecoratorChain AddExponentialBackoff(this DecoratorChain decorator
81111
(c, _) => CorrelatedExponentialBackoff.Calculate(StaticRandom.Value, c, minPropagationDuration, maxPropagationDuration, jittered));
82112
}
83113

114+
public static DecoratorChain AddCache(this DecoratorChain decoratorChain,
115+
Func<MethodInfo, object[], object> keyProvider)
116+
{
117+
decoratorChain.AddInterceptor(sp => new MemoryCacheAsyncInterceptor(sp.GetRequiredService<IMemoryCache>(),
118+
keyProvider,
119+
(_,_,_)=> {}));
120+
return decoratorChain;
121+
}
122+
84123
public static DecoratorChain AddSlidingTimeoutCache(this DecoratorChain decoratorChain,
85-
Func<MethodInfo, object[], object> keyProvider,
86-
TimeSpan cacheDuration)
124+
TimeSpan cacheDuration,
125+
Func<MethodInfo, object[], object> keyProvider)
87126
{
88127
if (cacheDuration <= TimeSpan.Zero)
89128
return decoratorChain;
90129

91-
decoratorChain.AddInterceptor((sp) => new MemoryCacheAsyncInterceptor(sp.GetRequiredService<IMemoryCache>(),
130+
decoratorChain.AddInterceptor(sp => new MemoryCacheAsyncInterceptor(sp.GetRequiredService<IMemoryCache>(),
92131
keyProvider,
93132
(_,_,ce)=> ce.SetSlidingExpiration(cacheDuration)));
94133
return decoratorChain;
95134
}
96135

97-
public static DecoratorChain AddTimeoutCache(this DecoratorChain decoratorChain,
98-
Func<MethodInfo, object[], object> keyProvider,
99-
TimeSpan cacheDuration)
136+
public static DecoratorChain AddAbsoluteTimeoutCache(this DecoratorChain decoratorChain,
137+
TimeSpan cacheDuration,
138+
Func<MethodInfo, object[], object> keyProvider)
100139
{
101140
if (cacheDuration <= TimeSpan.Zero)
102141
return decoratorChain;
103142

104-
decoratorChain.AddInterceptor((sp) => new MemoryCacheAsyncInterceptor(sp.GetRequiredService<IMemoryCache>(),
143+
decoratorChain.AddInterceptor(sp => new MemoryCacheAsyncInterceptor(sp.GetRequiredService<IMemoryCache>(),
105144
keyProvider,
106145
(_,_,ce)=> ce.SetAbsoluteExpiration(cacheDuration)));
107146
return decoratorChain;

Eocron.DependencyInjection.Interceptors/InterceptionHelper.cs

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -39,7 +39,12 @@ private static bool TryGetCancellationTokenIndex(IInvocation invocation, out int
3939
return false;
4040
}
4141

42-
public static async Task SafeDelay(TimeSpan delay, CancellationToken ct = default)
42+
public static void SafeDelay(TimeSpan delay, CancellationToken ct = default)
43+
{
44+
Thread.Sleep(delay);
45+
}
46+
47+
public static async Task SafeDelayAsync(TimeSpan delay, CancellationToken ct = default)
4348
{
4449
try
4550
{
@@ -56,4 +61,10 @@ public static T CreateProxy<T>(T target, params IAsyncInterceptor[] interceptors
5661
ProxyGenerator generator = new ProxyGenerator();
5762
return generator.CreateInterfaceProxyWithTargetInterface<T>(target, interceptors.Select(x=> x.ToInterceptor()).ToArray());
5863
}
64+
65+
public static object CreateProxy(Type interfaceType, object target, params IAsyncInterceptor[] interceptors)
66+
{
67+
ProxyGenerator generator = new ProxyGenerator();
68+
return generator.CreateInterfaceProxyWithTargetInterface(interfaceType, target, interceptors.Select(x=> x.ToInterceptor()).ToArray());
69+
}
5970
}
Lines changed: 13 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,13 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace Eocron.DependencyInjection.Interceptors.Locking
6+
{
7+
public interface ILockProvider
8+
{
9+
Task<IAsyncDisposable> AcquireAsync(CancellationToken ct);
10+
11+
IDisposable Acquire(CancellationToken ct);
12+
}
13+
}
Lines changed: 68 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,68 @@
1+
using System;
2+
using System.Threading.Tasks;
3+
using Castle.DynamicProxy;
4+
5+
namespace Eocron.DependencyInjection.Interceptors.Locking
6+
{
7+
public sealed class LockAsyncInterceptor : IAsyncInterceptor, IDisposable
8+
{
9+
private readonly ILockProvider _lockProvider;
10+
private readonly bool _disposeProvider;
11+
12+
public LockAsyncInterceptor(
13+
ILockProvider lockProvider,
14+
bool disposeProvider)
15+
{
16+
_lockProvider = lockProvider;
17+
_disposeProvider = disposeProvider;
18+
}
19+
20+
public void InterceptSynchronous(IInvocation invocation)
21+
{
22+
ExecuteSync(invocation);
23+
}
24+
25+
public void InterceptAsynchronous(IInvocation invocation)
26+
{
27+
invocation.ReturnValue = ExecuteAsync(invocation);
28+
}
29+
30+
public void InterceptAsynchronous<TResult>(IInvocation invocation)
31+
{
32+
invocation.ReturnValue = ExecuteAsync<TResult>(invocation);
33+
}
34+
35+
private void ExecuteSync(IInvocation invocation)
36+
{
37+
var ct = InterceptionHelper.GetCancellationTokenOrDefault(invocation);
38+
using var sync = _lockProvider.Acquire(ct);
39+
invocation.Proceed();
40+
}
41+
42+
private async Task ExecuteAsync(IInvocation invocation)
43+
{
44+
var ct = InterceptionHelper.GetCancellationTokenOrDefault(invocation);
45+
await using var sync = await _lockProvider.AcquireAsync(ct).ConfigureAwait(false);
46+
invocation.Proceed();
47+
var task = (Task)invocation.ReturnValue;
48+
await task.ConfigureAwait(false);
49+
}
50+
51+
private async Task<T> ExecuteAsync<T>(IInvocation invocation)
52+
{
53+
var ct = InterceptionHelper.GetCancellationTokenOrDefault(invocation);
54+
await using var sync = await _lockProvider.AcquireAsync(ct).ConfigureAwait(false);
55+
invocation.Proceed();
56+
var task = (Task<T>)invocation.ReturnValue;
57+
return await task.ConfigureAwait(false);
58+
}
59+
60+
public void Dispose()
61+
{
62+
if (_disposeProvider)
63+
{
64+
(_lockProvider as IDisposable)?.Dispose();
65+
}
66+
}
67+
}
68+
}
Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace Eocron.DependencyInjection.Interceptors.Locking
6+
{
7+
public sealed class MonitorLockProvider : ILockProvider
8+
{
9+
private readonly object _sync = new();
10+
private readonly TimeSpan _waitInterval = TimeSpan.FromMilliseconds(100);
11+
12+
public Task<IAsyncDisposable> AcquireAsync(CancellationToken ct)
13+
{
14+
throw new NotSupportedException("This lock is only supported on IDisposable.");
15+
}
16+
17+
public IDisposable Acquire(CancellationToken ct)
18+
{
19+
while(!Monitor.TryEnter(_sync, _waitInterval))
20+
{
21+
ct.ThrowIfCancellationRequested();
22+
}
23+
return new Releaser(_sync);
24+
}
25+
26+
private sealed class Releaser(object sync) : IDisposable
27+
{
28+
public void Dispose()
29+
{
30+
Monitor.Exit(sync);
31+
}
32+
}
33+
}
34+
}
Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,47 @@
1+
using System;
2+
using System.Threading;
3+
using System.Threading.Tasks;
4+
5+
namespace Eocron.DependencyInjection.Interceptors.Locking
6+
{
7+
public sealed class SemaphoreSlimLockProvider : ILockProvider, IDisposable
8+
{
9+
private readonly SemaphoreSlim _sync;
10+
11+
public SemaphoreSlimLockProvider(int count)
12+
{
13+
_sync = new SemaphoreSlim(count);
14+
}
15+
16+
public async Task<IAsyncDisposable> AcquireAsync(CancellationToken ct)
17+
{
18+
await _sync.WaitAsync(ct).ConfigureAwait(false);
19+
return new Releaser(_sync);
20+
}
21+
22+
public IDisposable Acquire(CancellationToken ct)
23+
{
24+
_sync.Wait(ct);
25+
return new Releaser(_sync);
26+
}
27+
28+
private sealed class Releaser(SemaphoreSlim semaphoreSlim) : IAsyncDisposable, IDisposable
29+
{
30+
public ValueTask DisposeAsync()
31+
{
32+
semaphoreSlim.Release();
33+
return ValueTask.CompletedTask;
34+
}
35+
36+
public void Dispose()
37+
{
38+
semaphoreSlim.Release();
39+
}
40+
}
41+
42+
public void Dispose()
43+
{
44+
_sync?.Dispose();
45+
}
46+
}
47+
}

Eocron.DependencyInjection.Interceptors/Logging/LoggingAsyncInterceptor.cs

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ private void ExecuteSync(IInvocation invocation)
4949
}
5050
catch (Exception ex)
5151
{
52-
_logger?.Log(_onError, ex, "Failed to invoke {invocation}. Elapsed: {elapsed}", invocation.Method.Name, sw.Elapsed);
52+
_logger?.Log(_onError, ex, "Failed to call {invocation}. Elapsed: {elapsed}", invocation.Method.Name, sw.Elapsed);
5353
throw;
5454
}
5555
}
@@ -67,7 +67,7 @@ private async Task ExecuteAsync(IInvocation invocation)
6767
}
6868
catch (Exception ex)
6969
{
70-
_logger?.Log(_onError, ex, "Failed to invoke {invocation}. Elapsed: {elapsed}", invocation.Method.Name, sw.Elapsed);
70+
_logger?.Log(_onError, ex, "Failed to call {invocation}. Elapsed: {elapsed}", invocation.Method.Name, sw.Elapsed);
7171
throw;
7272
}
7373
}
@@ -86,7 +86,7 @@ private async Task<T> ExecuteAsync<T>(IInvocation invocation)
8686
}
8787
catch (Exception ex)
8888
{
89-
_logger?.Log(_onError, ex, "Failed to invoke {invocation}. Elapsed: {elapsed}", invocation.Method.Name, sw.Elapsed);
89+
_logger?.Log(_onError, ex, "Failed to call {invocation}. Elapsed: {elapsed}", invocation.Method.Name, sw.Elapsed);
9090
throw;
9191
}
9292
}

Eocron.DependencyInjection.Interceptors/Retry/CorrelatedExponentialBackoff.cs

Lines changed: 14 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -4,19 +4,28 @@ namespace Eocron.DependencyInjection.Interceptors.Retry
44
{
55
public static class CorrelatedExponentialBackoff
66
{
7-
public static TimeSpan Calculate(Random random, int attempt, TimeSpan propagationDuration,
7+
/// <summary>
8+
/// Calculates timeframes in which multiple clients can invoke requests at single channel and scatters them in this timeframe.
9+
/// Formula: timeToWait = random(0, min(minPropagation * 2^(attempt-1), maxPropagation))
10+
/// </summary>
11+
/// <param name="minPropagationDuration">Minimum time it takes to complete request, not less than 1ms</param>
12+
/// <param name="maxPropagationDuration">Maximum time it takes to complete request</param>
13+
/// <param name="jittered">Enables jittering to avoid clustering of invocations between different systems. Disable to make things worse.</param>
14+
/// <returns>Time to wait</returns>
15+
/// <exception cref="ArgumentOutOfRangeException"></exception>
16+
public static TimeSpan Calculate(Random random, int attempt, TimeSpan minPropagationDuration,
817
TimeSpan maxPropagationDuration, bool jittered)
918
{
1019
if(attempt < 1)
1120
throw new ArgumentOutOfRangeException(nameof(attempt));
12-
if(propagationDuration >= maxPropagationDuration)
13-
throw new ArgumentOutOfRangeException(nameof(propagationDuration), "Minimum propagation duration must be less than max propagation duration.");
21+
if(minPropagationDuration >= maxPropagationDuration)
22+
throw new ArgumentOutOfRangeException(nameof(minPropagationDuration), "Minimum propagation duration must be less than maximum propagation duration.");
1423
if (maxPropagationDuration <= TimeSpan.Zero)
1524
throw new ArgumentOutOfRangeException(nameof(maxPropagationDuration), "Maximum propagation duration must be greater than zero.");
1625

1726
var power = attempt - 1;
18-
var minPropagationMs = Math.Max((int)propagationDuration.TotalMilliseconds, 5); //min time it takes to process single request
19-
var maxPropagationMs = Math.Max(minPropagationMs, (int)maxPropagationDuration.TotalMilliseconds); //max time it takes to process single request
27+
var minPropagationMs = Math.Max((int)minPropagationDuration.TotalMilliseconds, 20); //min time it takes to process single request
28+
var maxPropagationMs = Math.Max(minPropagationMs<<1, (int)maxPropagationDuration.TotalMilliseconds); //max time it takes to process single request
2029
var maxPower = (int)Math.Floor(Math.Log2((maxPropagationMs - minPropagationMs) / minPropagationMs));
2130
if (maxPower >= power)
2231
{

0 commit comments

Comments
 (0)