Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
28 changes: 16 additions & 12 deletions src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,8 @@ async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(boo
TraceContext = currentRequestTraceContext,
};

if (!this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances, out string warningMessage))
string warningMessage = await this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances);
if (!string.IsNullOrEmpty(warningMessage))
{
// If all messages belong to the same execution ID, then all of them need to be discarded.
// However, it's also possible to have messages for *any* execution ID batched together with messages
Expand Down Expand Up @@ -1049,28 +1050,33 @@ internal static void TraceMessageReceived(AzureStorageOrchestrationServiceSettin
data.Episode.GetValueOrDefault(-1));
}

bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMessage> newMessages, bool allowReplayingTerminalInstances, out string message)
async Task<string> IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMessage> newMessages, bool allowReplayingTerminalInstances)
{
if (runtimeState.ExecutionStartedEvent == null && !newMessages.Any(msg => msg.Event is ExecutionStartedEvent))
{
var instanceId = newMessages[0].OrchestrationInstance.InstanceId;

if (DurableTask.Core.Common.Entities.AutoStart(instanceId, newMessages))
{
message = null;
return true;
return null;
}
else
{
TaskMessage executionTerminatedEventMessage = newMessages.LastOrDefault(msg => msg.Event is ExecutionTerminatedEvent);
if (executionTerminatedEventMessage is not null)
{
await this.trackingStore.UpdateStatusForTerminationAsync(instanceId, ((ExecutionTerminatedEvent)executionTerminatedEventMessage.Event).Input);
return $"Instance is {OrchestrationStatus.Terminated}";
}

// A non-zero event count usually happens when an instance's history is overwritten by a
// new instance or by a ContinueAsNew. When history is overwritten by new instances, we
// overwrite the old history with new history (with a new execution ID), but this is done
// gradually as we build up the new history over time. If we haven't yet overwritten *all*
// the old history and we receive a message from the old instance (this happens frequently
// with canceled durable timer messages) we'll end up loading just the history that hasn't
// been fully overwritten. We know it's invalid because it's missing the ExecutionStartedEvent.
message = runtimeState.Events.Count == 0 ? "No such instance" : "Invalid history (may have been overwritten by a newer instance)";
return false;
return runtimeState.Events.Count == 0 ? "No such instance" : "Invalid history (may have been overwritten by a newer instance)";
}
}

Expand All @@ -1080,12 +1086,10 @@ bool IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMess
runtimeState.OrchestrationStatus != OrchestrationStatus.Pending &&
runtimeState.OrchestrationStatus != OrchestrationStatus.Suspended)
{
message = $"Instance is {runtimeState.OrchestrationStatus}";
return false;
return $"Instance is {runtimeState.OrchestrationStatus}";
}

message = null;
return true;
return null;
}

async Task AbandonAndReleaseSessionAsync(OrchestrationSession session)
Expand Down Expand Up @@ -1909,15 +1913,15 @@ public async Task<IList<OrchestrationState>> GetOrchestrationStateAsync(string i
/// </summary>
/// <param name="instanceId">Instance ID of the orchestration to terminate.</param>
/// <param name="reason">The user-friendly reason for terminating.</param>
public Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason)
public async Task ForceTerminateTaskOrchestrationAsync(string instanceId, string reason)
{
var taskMessage = new TaskMessage
{
OrchestrationInstance = new OrchestrationInstance { InstanceId = instanceId },
Event = new ExecutionTerminatedEvent(-1, reason)
};

return SendTaskOrchestrationMessageAsync(taskMessage);
await SendTaskOrchestrationMessageAsync(taskMessage);
}

/// <summary>
Expand Down
24 changes: 24 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -795,6 +795,30 @@ public override async Task UpdateStatusForRewindAsync(string instanceId, Cancell
stopwatch.ElapsedMilliseconds);
}

/// <inheritdoc />
public override async Task UpdateStatusForTerminationAsync(string instanceId, string output, CancellationToken cancellationToken = default)
{
string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId);
TableEntity entity = new TableEntity(sanitizedInstanceId, "")
{
["RuntimeStatus"] = OrchestrationStatus.Terminated.ToString("G"),
["LastUpdatedTime"] = DateTime.UtcNow,
[OutputProperty] = output
};

Stopwatch stopwatch = Stopwatch.StartNew();
await this.InstancesTable.MergeEntityAsync(entity, ETag.All, cancellationToken);

this.settings.Logger.InstanceStatusUpdate(
this.storageAccountName,
this.taskHubName,
instanceId,
string.Empty,
OrchestrationStatus.Terminated,
episode: 0,
stopwatch.ElapsedMilliseconds);
}


/// <inheritdoc />
public override Task StartAsync(CancellationToken cancellationToken = default)
Expand Down
8 changes: 8 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,14 @@ interface ITrackingStore
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
Task UpdateStatusForRewindAsync(string instanceId, CancellationToken cancellationToken = default);

/// <summary>
/// Used to update the instance status to "Terminated" whend a pending orchestration is terminated.
/// </summary>
/// <param name="instanceId">The instance being terminated</param>
/// <param name="output">The output of the orchestration</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
Task UpdateStatusForTerminationAsync(string instanceId, string output, CancellationToken cancellationToken = default);

/// <summary>
/// Purge The History and state which is older than thresholdDateTimeUtc based on the timestamp type specified by timeRangeFilterType
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -176,5 +176,15 @@ await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[]

return null;
}

public override async Task UpdateStatusForTerminationAsync(string instanceId, string output, CancellationToken cancellationToken = default)
{
// Get the most recent execution and update its status to terminated
IEnumerable<OrchestrationStateInstanceEntity> instanceEntity = await this.instanceStore.GetOrchestrationStateAsync(instanceId, allInstances: false);
instanceEntity.Single().State.OrchestrationStatus = OrchestrationStatus.Terminated;
instanceEntity.Single().State.LastUpdatedTime = DateTime.UtcNow;
instanceEntity.Single().State.Output = output;
await this.instanceStore.WriteEntitiesAsync(instanceEntity);
}
}
}
3 changes: 3 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -100,6 +100,9 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo
throw new NotSupportedException();
}

/// <inheritdoc />
public abstract Task UpdateStatusForTerminationAsync(string instanceId, string output, CancellationToken cancellationToken = default);

/// <inheritdoc />
public abstract Task StartAsync(CancellationToken cancellationToken = default);

Expand Down
34 changes: 34 additions & 0 deletions test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -965,6 +965,40 @@ public async Task TerminateSuspendedOrchestration(bool enableExtendedSessions)
}
}

/// <summary>
/// Test that a pending orchestration can be terminated.
/// </summary>
[DataTestMethod]
[DataRow(true)]
[DataRow(false)]
public async Task TerminatePendingOrchestration(bool enableExtendedSessions)
{
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions))
{
await host.StartAsync();
// Schedule a start time to ensure that the orchestration is in a Pending state when we attempt to terminate.
var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Counter), 0, startAt: DateTime.UtcNow.AddMinutes(1));
await client.WaitForStatusChange(TimeSpan.FromSeconds(5), OrchestrationStatus.Pending);

await client.TerminateAsync("terminate");

var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10));

// Confirm the pending orchestration was terminated.
Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus);
Assert.AreEqual("terminate", status?.Output);

// Now sleep for a minute and confirm that the orchestration does not start after its scheduled time.
Thread.Sleep(TimeSpan.FromMinutes(1));

status = await client.GetStatusAsync();
Assert.AreEqual(OrchestrationStatus.Terminated, status?.OrchestrationStatus);
Assert.AreEqual("terminate", status?.Output);

await host.StopAsync();
}
}

/// <summary>
/// End-to-end test which validates the Rewind functionality on more than one orchestration.
/// </summary>
Expand Down
Loading