Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cleipnir.ResilientFunctions
54 changes: 0 additions & 54 deletions Cleipnir.Tests/Flows/CorrelationIdFlowTests.cs

This file was deleted.

25 changes: 12 additions & 13 deletions Cleipnir/Flow.cs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,6 @@ namespace Cleipnir.Flows;
public abstract class BaseFlow
{
public Workflow Workflow { get; init; } = null!;
public Utilities Utilities => Workflow.Utilities;
public Effect Effect => Workflow.Effect;

#region Capture explicit id with ResiliencyLevel
Expand Down Expand Up @@ -63,18 +62,18 @@ public Task Capture(Action work, ResiliencyLevel resiliencyLevel = ResiliencyLev

#endregion

public Task<TMessage> Message<TMessage>(TimeSpan? maxWait = null) where TMessage : class
=> Workflow.Message<TMessage>(maxWait);
public Task<TMessage?> Message<TMessage>(DateTime waitUntil, TimeSpan? maxWait = null) where TMessage : class
=> Workflow.Message<TMessage>(waitUntil, maxWait);
public Task<TMessage?> Message<TMessage>(TimeSpan waitFor, TimeSpan? maxWait = null) where TMessage : class
=> Workflow.Message<TMessage>(waitFor, maxWait);
public Task<TMessage> Message<TMessage>(Func<TMessage, bool> filter, TimeSpan? maxWait = null) where TMessage : class
=> Workflow.Message(filter, maxWait);
public Task<TMessage?> Message<TMessage>(Func<TMessage, bool> filter, DateTime waitUntil, TimeSpan? maxWait = null) where TMessage : class
=> Workflow.Message(filter, waitUntil, maxWait);
public Task<TMessage?> Message<TMessage>(Func<TMessage, bool> filter, TimeSpan waitFor, TimeSpan? maxWait = null) where TMessage : class
=> Workflow.Message(filter, waitFor, maxWait);
public Task<TMessage> Message<TMessage>() where TMessage : class
=> Workflow.Message<TMessage>();
public Task<TMessage?> Message<TMessage>(DateTime waitUntil) where TMessage : class
=> Workflow.Message<TMessage>(waitUntil);
public Task<TMessage?> Message<TMessage>(TimeSpan waitFor) where TMessage : class
=> Workflow.Message<TMessage>(waitFor);
public Task<TMessage> Message<TMessage>(Func<TMessage, bool> filter) where TMessage : class
=> Workflow.Message(filter);
public Task<TMessage?> Message<TMessage>(Func<TMessage, bool> filter, DateTime waitUntil) where TMessage : class
=> Workflow.Message(filter, waitUntil);
public Task<TMessage?> Message<TMessage>(Func<TMessage, bool> filter, TimeSpan waitFor) where TMessage : class
=> Workflow.Message(filter, waitFor);

public Task Delay(TimeSpan @for, bool suspend = true) => Workflow.Delay(@for, suspend);
public Task Delay(DateTime until, bool suspend = true) => Workflow.Delay(until, suspend);
Expand Down
37 changes: 0 additions & 37 deletions Cleipnir/Flows.cs
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,6 @@ namespace Cleipnir.Flows;
public interface IBaseFlows
{
public static abstract Type FlowType { get; }

public Task RouteMessage<T>(T message, string correlationId, string? idempotencyKey = null) where T : class;
}

public abstract class BaseFlows<TFlow> : IBaseFlows where TFlow : notnull
Expand Down Expand Up @@ -44,8 +42,6 @@ protected static Action<TFlow, Workflow> CreateWorkflowSetter()
var setter = lambdaExpr.Compile();
return setter;
}

public abstract Task RouteMessage<T>(T message, string correlationId, string? idempotencyKey = null) where T : class;
}

public class Flows<TFlow> : BaseFlows<TFlow> where TFlow : Flow
Expand Down Expand Up @@ -143,17 +139,6 @@ public Task<Scheduled> Schedule(FlowInstance instanceId, InitialState? initialSt
/// <returns>A task which will complete when the flow has been persisted</returns>
public Task ScheduleIn(FlowInstance instanceId, TimeSpan delay) => _registration.ScheduleIn(instanceId.Value, delay);

/// <summary>
/// Route a message to the flow with registered correlation id
/// </summary>
/// <param name="message">Message to be delivered to the flow</param>
/// <param name="correlationId">Correlation id by which the flow is resolved</param>
/// <param name="idempotencyKey">Optional idempotency key to de-duplicate messages</param>
/// <typeparam name="T">Message type</typeparam>
/// <returns>A task which will complete when the message has been persisted</returns>
public override Task RouteMessage<T>(T message, string correlationId, string? idempotencyKey = null)
=> _registration.RouteMessage(message, correlationId, idempotencyKey);

/// <summary>
/// Schedule multiple flows at once
/// Execution of flows will be divided between the replicas
Expand Down Expand Up @@ -301,17 +286,6 @@ public Task ScheduleIn(
TimeSpan delay
) => _registration.ScheduleIn(instanceId.Value, param, delay);

/// <summary>
/// Route a message to the flow with registered correlation id
/// </summary>
/// <param name="message">Message to be delivered to the flow</param>
/// <param name="correlationId">Correlation id by which the flow is resolved</param>
/// <param name="idempotencyKey">Optional idempotency key to de-duplicate messages</param>
/// <typeparam name="T">Message type</typeparam>
/// <returns>A task which will complete when the message has been persisted</returns>
public override Task RouteMessage<T>(T message, string correlationId, string? idempotencyKey = null)
=> _registration.RouteMessage(message, correlationId, idempotencyKey);

/// <summary>
/// Emit interrupt signal to flows
/// Execution of suspended flows will be resumed. Already executing flows will be restarted on suspension.
Expand Down Expand Up @@ -455,17 +429,6 @@ public Task ScheduleIn(
TimeSpan delay
) => _registration.ScheduleIn(instanceId.Value, param, delay);

/// <summary>
/// Route a message to the flow with registered correlation id
/// </summary>
/// <param name="message">Message to be delivered to the flow</param>
/// <param name="correlationId">Correlation id by which the flow is resolved</param>
/// <param name="idempotencyKey">Optional idempotency key to de-duplicate messages</param>
/// <typeparam name="T">Message type</typeparam>
/// <returns>A task which will complete when the message has been persisted</returns>
public override Task RouteMessage<T>(T message, string correlationId, string? idempotencyKey = null)
=> _registration.RouteMessage(message, correlationId, idempotencyKey);

/// <summary>
/// Emit interrupt signal to flows
/// Execution of suspended flows will be resumed. Already executing flows will be restarted on suspension.
Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public override async Task Run(MailAndRecipients param)
await childFlows.BulkSchedule(bulkWork);

for (var i = 0; i < 3; i++)
await Message<EmailsSent>(maxWait: TimeSpan.FromMinutes(30));
await Message<EmailsSent>(waitFor: TimeSpan.FromMinutes(30));

Console.WriteLine("Finished NewsletterParentFlow");
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ public override async Task Run(string customerEmail)

for (var i = 0; i <= 3; i++)
{
var emailVerified = await Message<EmailVerified>(maxWait: TimeSpan.FromDays(1));
var emailVerified = await Message<EmailVerified>(waitFor: TimeSpan.FromDays(1));

if (emailVerified != null)
break;
Expand Down
29 changes: 0 additions & 29 deletions Samples/Cleipnir.Sample.Presentation/H_BankTransfer/Example.cs

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading
Loading