Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 27 additions & 4 deletions src/DurableTask.AzureStorage/AzureStorageOrchestrationService.cs
Original file line number Diff line number Diff line change
Expand Up @@ -793,7 +793,12 @@ async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(boo
TraceContext = currentRequestTraceContext,
};

string warningMessage = await this.IsExecutableInstance(session.RuntimeState, orchestrationWorkItem.NewMessages, settings.AllowReplayingTerminalInstances);
string warningMessage = await this.IsExecutableInstanceAsync(
session.RuntimeState,
orchestrationWorkItem.NewMessages,
settings.AllowReplayingTerminalInstances,
session.TrackingStoreContext,
cancellationToken);
if (!string.IsNullOrEmpty(warningMessage))
{
// If all messages belong to the same execution ID, then all of them need to be discarded.
Expand Down Expand Up @@ -1050,7 +1055,12 @@ internal static void TraceMessageReceived(AzureStorageOrchestrationServiceSettin
data.Episode.GetValueOrDefault(-1));
}

async Task<string> IsExecutableInstance(OrchestrationRuntimeState runtimeState, IList<TaskMessage> newMessages, bool allowReplayingTerminalInstances)
async Task<string> IsExecutableInstanceAsync(
OrchestrationRuntimeState runtimeState,
IList<TaskMessage> newMessages,
bool allowReplayingTerminalInstances,
object trackingStoreContext,
CancellationToken cancellationToken)
{
if (runtimeState.ExecutionStartedEvent == null && !newMessages.Any(msg => msg.Event is ExecutionStartedEvent))
{
Expand Down Expand Up @@ -1085,12 +1095,25 @@ await this.trackingStore.UpdateStatusForTerminationAsync(
}

if (runtimeState.ExecutionStartedEvent != null &&
!allowReplayingTerminalInstances &&
runtimeState.OrchestrationStatus != OrchestrationStatus.Running &&
runtimeState.OrchestrationStatus != OrchestrationStatus.Pending &&
runtimeState.OrchestrationStatus != OrchestrationStatus.Suspended)
{
return $"Instance is {runtimeState.OrchestrationStatus}";
InstanceStatus instanceStatus = await this.trackingStore.FetchInstanceStatusAsync(runtimeState.OrchestrationInstance.InstanceId);
if (instanceStatus.State.OrchestrationInstance.ExecutionId == runtimeState.OrchestrationInstance.ExecutionId
&& instanceStatus.State.OrchestrationStatus != runtimeState.OrchestrationStatus)
{
await this.trackingStore.UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(
runtimeState.OrchestrationInstance.InstanceId,
runtimeState.OrchestrationInstance.ExecutionId,
runtimeState,
trackingStoreContext,
cancellationToken);
}
if (!allowReplayingTerminalInstances)
{
return $"Instance is {runtimeState.OrchestrationStatus}";
}
}

return null;
Expand Down
51 changes: 51 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -1048,6 +1048,57 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
return eTagValue;
}

/// <summary>
/// Rewrites the Instances table row of a finished orchestration from <paramref name="runtimeState"/>
/// after deleting every blob listed in <paramref name="trackingStoreContext"/>.
/// Does nothing unless the runtime status is Completed, Canceled, Failed or Terminated.
/// </summary>
/// <param name="instanceId">The ID of the orchestration; escaped before being used as the partition key.</param>
/// <param name="executionId">The execution ID written to the row and to the status log entry.</param>
/// <param name="runtimeState">Source of the status, custom status, completion time and last event timestamp.</param>
/// <param name="trackingStoreContext">Must be a <c>TrackingStoreContext</c>; its <c>Blobs</c> collection names the blobs to delete.</param>
/// <param name="cancellationToken">Accepted for interface compatibility; this implementation does not consult it.</param>
public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(
    string instanceId,
    string executionId,
    OrchestrationRuntimeState runtimeState,
    object trackingStoreContext,
    CancellationToken cancellationToken = default)
{
    // Only terminal orchestrations are reconciled; every other status is a no-op.
    switch (runtimeState.OrchestrationStatus)
    {
        case OrchestrationStatus.Completed:
        case OrchestrationStatus.Canceled:
        case OrchestrationStatus.Failed:
        case OrchestrationStatus.Terminated:
            break;
        default:
            return;
    }

    // Start all blob deletions first, then wait for the whole batch before touching the table.
    var storeContext = (TrackingStoreContext)trackingStoreContext;
    int blobCount = storeContext.Blobs.Count;
    if (blobCount > 0)
    {
        var pendingDeletes = new List<Task>(blobCount);
        foreach (string name in storeContext.Blobs)
        {
            pendingDeletes.Add(this.messageManager.DeleteBlobAsync(name));
        }

        await Task.WhenAll(pendingDeletes);
    }

    // The row key is empty: the instance row is addressed by its (escaped) partition key alone.
    var statusRow = new TableEntity(KeySanitation.EscapePartitionKey(instanceId), string.Empty);

    // TODO: Translating null to "null" is a temporary workaround. We should prioritize
    // https://github.com/Azure/durabletask/issues/477 so that this is no longer necessary.
    statusRow["CustomStatus"] = runtimeState.Status ?? "null";
    statusRow["ExecutionId"] = executionId;
    statusRow["LastUpdatedTime"] = runtimeState.Events.Last().Timestamp;
    statusRow["RuntimeStatus"] = runtimeState.OrchestrationStatus.ToString();
    statusRow["CompletedTime"] = runtimeState.CompletedTime;

    // Time only the table write so the logged latency reflects the instance update itself.
    Stopwatch updateTimer = Stopwatch.StartNew();
    await this.InstancesTable.InsertOrMergeEntityAsync(statusRow);

    this.settings.Logger.InstanceStatusUpdate(
        this.storageAccountName,
        this.taskHubName,
        instanceId,
        executionId,
        runtimeState.OrchestrationStatus,
        Utils.GetEpisodeNumber(runtimeState),
        updateTimer.ElapsedMilliseconds);
}

static int GetEstimatedByteCount(TableEntity entity)
{
// Assume at least 1 KB of data per entity to account for static-length properties
Expand Down
13 changes: 13 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,19 @@ interface ITrackingStore
/// <returns>Returns the instance status or <c>null</c> if none was found.</returns>
Task<InstanceStatus> FetchInstanceStatusAsync(string instanceId, CancellationToken cancellationToken = default);

/// <summary>
/// Updates the instance status of the specified orchestration instance to match that of <paramref name="runtimeState"/> for a completed orchestration.
/// Also deletes any orphaned blobs of <paramref name="trackingStoreContext"/>.
/// This method is meant to be called in the case that there is an inconsistency between the instance and history table due to a failure during a call to
/// <see cref="UpdateStateAsync"/> for a completing orchestration. If the orchestration is not in a terminal state, the method will immediately return and do nothing.
/// </summary>
/// <param name="instanceId">The ID of the orchestration.</param>
/// <param name="executionId">The execution ID of the orchestration.</param>
/// <param name="runtimeState">The runtime state of the orchestration.</param>
/// <param name="trackingStoreContext">Additional context for the execution that is maintained by the tracking store.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
/// <returns>A task that represents the asynchronous update operation.</returns>
Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default);

/// <summary>
/// Get The Orchestration State for querying all orchestration instances
/// </summary>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -191,5 +191,31 @@ public override async Task UpdateStatusForTerminationAsync(
instanceEntity.Single().State.Output = output;
await this.instanceStore.WriteEntitiesAsync(instanceEntity);
}

/// <summary>
/// Writes the orchestration state built from <paramref name="runtimeState"/> to the instance store
/// when the orchestration is Completed, Canceled, Failed or Terminated; otherwise does nothing.
/// </summary>
/// <param name="instanceId">Not read by this implementation.</param>
/// <param name="executionId">Not read by this implementation.</param>
/// <param name="runtimeState">The runtime state the stored orchestration state is built from.</param>
/// <param name="trackingStoreContext">Not read by this implementation; there are no blobs to clean up here.</param>
/// <param name="cancellationToken">Not read by this implementation.</param>
public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(
    string instanceId,
    string executionId,
    OrchestrationRuntimeState runtimeState,
    object trackingStoreContext,
    CancellationToken cancellationToken = default)
{
    // Only terminal orchestrations are reconciled; every other status is a no-op.
    switch (runtimeState.OrchestrationStatus)
    {
        case OrchestrationStatus.Completed:
        case OrchestrationStatus.Canceled:
        case OrchestrationStatus.Failed:
        case OrchestrationStatus.Terminated:
            break;
        default:
            return;
    }

    // No blobs to delete for this tracking store implementation
    var stateEntity = new OrchestrationStateInstanceEntity()
    {
        State = Core.Common.Utils.BuildOrchestrationState(runtimeState),
        // The number of history events is used as the sequence number of the written entity.
        SequenceNumber = runtimeState.Events.Count
    };

    await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[] { stateEntity });
}
}
}
3 changes: 3 additions & 0 deletions src/DurableTask.AzureStorage/Tracking/TrackingStoreBase.cs
Original file line number Diff line number Diff line change
Expand Up @@ -108,5 +108,8 @@ public virtual Task UpdateStatusForRewindAsync(string instanceId, CancellationTo

/// <inheritdoc />
public abstract Task<ETag?> UpdateStateAsync(OrchestrationRuntimeState newRuntimeState, OrchestrationRuntimeState oldRuntimeState, string instanceId, string executionId, ETag? eTag, object executionData, CancellationToken cancellationToken = default);

/// <inheritdoc />
public abstract Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default);
}
}
74 changes: 74 additions & 0 deletions test/DurableTask.AzureStorage.Tests/AzureStorageScenarioTests.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2401,6 +2401,7 @@ await Task.WhenAll(
[DataRow(false, true, false)]
[DataRow(false, false, true)]
[DataRow(false, false, false)]
[Ignore("Skipping since this functionality has since changed, see TestWorkerFailingDuringCompleteWorkItemCall")]
public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSessions, bool sendTerminateEvent, bool allowReplayingTerminalInstances)
{
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(
Expand Down Expand Up @@ -2475,6 +2476,79 @@ public async Task TestAllowReplayingTerminalInstances(bool enableExtendedSession
}
}

/// <summary>
/// Confirm that if a worker fails after committing the new history but before updating the instance state in a call to
/// <see cref="AzureStorageOrchestrationService.CompleteTaskOrchestrationWorkItemAsync"/> for an orchestration that has
/// reached a terminal state, then storage is brought to a consistent state by the call to
/// <see cref="AzureStorageOrchestrationService.LockNextTaskOrchestrationWorkItemAsync"/>.
/// Since we cannot simulate a worker failure at this precise point, this test instead
/// lets an orchestration run to completion, and then manually changes the instance table state back to "Running".
/// We then send an event to the orchestration, which triggers a call to lock the next task work item, at which point
/// the inconsistent state in storage for the terminal instance is recognized, the instance state is updated, and the work item discarded.
/// Note that this test does not confirm that orphaned blobs are deleted by the call to lock the next orchestration work item
/// in the case of a terminal orchestration with inconsistent state in storage. This is because there is no easy way to mock/inject
/// the tracking store context object that is part of the orchestration session state which keeps track of the blobs.
/// </summary>
/// <param name="enableExtendedSessions">Whether the test host and the storage settings are created with extended sessions enabled.</param>
/// <param name="terminate">When <c>true</c> a terminate request is sent to the finished instance; otherwise an external event named "Foo" is raised.</param>
/// <returns>A task that completes when the test has finished.</returns>
[DataTestMethod]
[DataRow(true, true)]
[DataRow(false, true)]
[DataRow(true, false)]
[DataRow(false, false)]
public async Task TestWorkerFailingDuringCompleteWorkItemCall(bool enableExtendedSessions, bool terminate)
{
using (TestOrchestrationHost host = TestHelpers.GetTestOrchestrationHost(enableExtendedSessions))
{
await host.StartAsync();

// Run simple orchestrator to completion, this will help us obtain a valid terminal history for the orchestrator
var client = await host.StartOrchestrationAsync(typeof(Orchestrations.Echo), "hello!");
var status = await client.WaitForCompletionAsync(TimeSpan.FromSeconds(10));
Assert.AreEqual(OrchestrationStatus.Completed, status?.OrchestrationStatus);

// Simulate having an "out of date" Instance table, by setting its runtime status to "Running".
// This simulates the scenario where the History table was updated, but not the Instance table.
var instanceId = client.InstanceId;
AzureStorageOrchestrationServiceSettings settings = TestHelpers.GetTestAzureStorageOrchestrationServiceSettings(
enableExtendedSessions);
AzureStorageClient azureStorageClient = new AzureStorageClient(settings);

// Only RuntimeStatus is set on the entity, so the merge leaves every other column of the row untouched.
Table instanceTable = azureStorageClient.GetTableReference(azureStorageClient.Settings.InstanceTableName);
TableEntity entity = new TableEntity(instanceId, "")
{
["RuntimeStatus"] = OrchestrationStatus.Running.ToString("G")
};
// ETag.All makes the merge unconditional, regardless of the row's current version.
await instanceTable.MergeEntityAsync(entity, Azure.ETag.All);

// Assert that the status in the Instance table reads "Running"
IList<OrchestrationState> state = await client.GetStateAsync(instanceId);
OrchestrationStatus forcedStatus = state.First().OrchestrationStatus;
Assert.AreEqual(OrchestrationStatus.Running, forcedStatus);

// The type of event sent should not matter - the event itself should be discarded, and the instance table updated
// to reflect the status in the history table.
if (terminate)
{
await client.TerminateAsync("testing");
}
else
{
await client.RaiseEventAsync("Foo", "Bar");
}
// NOTE(review): fixed 30-second wait for the worker to pick up the message and repair the instance table;
// presumably there is no completion signal to await for a discarded message - confirm before shortening.
await Task.Delay(TimeSpan.FromSeconds(30));

// A replay should have occurred, forcing the instance table to be updated with a terminal status
state = await client.GetStateAsync(instanceId);
Assert.AreEqual(1, state.Count);

// Regardless of which message was sent, the status must be back to the orchestration's real outcome: Completed.
status = state.First();
OrchestrationStatus expectedStatus = OrchestrationStatus.Completed;
Assert.AreEqual(expectedStatus, status.OrchestrationStatus);

await host.StopAsync();
}
}

[TestMethod]
[DataRow(VersioningSettings.VersionMatchStrategy.Strict)]
[DataRow(VersioningSettings.VersionMatchStrategy.CurrentOrOlder)]
Expand Down
Loading