Skip to content
Original file line number Diff line number Diff line change
Expand Up @@ -797,7 +797,6 @@ async Task<TaskOrchestrationWorkItem> LockNextTaskOrchestrationWorkItemAsync(boo
session.RuntimeState,
orchestrationWorkItem.NewMessages,
settings.AllowReplayingTerminalInstances,
session.TrackingStoreContext,
cancellationToken);
if (!string.IsNullOrEmpty(warningMessage))
{
Expand Down Expand Up @@ -1059,7 +1058,6 @@ async Task<string> IsExecutableInstanceAsync(
OrchestrationRuntimeState runtimeState,
IList<TaskMessage> newMessages,
bool allowReplayingTerminalInstances,
object trackingStoreContext,
CancellationToken cancellationToken)
{
if (runtimeState.ExecutionStartedEvent == null && !newMessages.Any(msg => msg.Event is ExecutionStartedEvent))
Expand All @@ -1078,8 +1076,7 @@ async Task<string> IsExecutableInstanceAsync(
var executionTerminatedEvent = (ExecutionTerminatedEvent)executionTerminatedEventMessage.Event;
await this.trackingStore.UpdateStatusForTerminationAsync(
instanceId,
executionTerminatedEvent.Input,
executionTerminatedEvent.Timestamp);
executionTerminatedEvent);
return $"Instance is {OrchestrationStatus.Terminated}";
}

Expand All @@ -1103,11 +1100,11 @@ await this.trackingStore.UpdateStatusForTerminationAsync(
if (instanceStatus == null || (instanceStatus.State.OrchestrationInstance.ExecutionId == runtimeState.OrchestrationInstance.ExecutionId
&& instanceStatus.State.OrchestrationStatus != runtimeState.OrchestrationStatus))
{
await this.trackingStore.UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(
await this.trackingStore.UpdateInstanceStatusForCompletedOrchestrationAsync(
runtimeState.OrchestrationInstance.InstanceId,
runtimeState.OrchestrationInstance.ExecutionId,
runtimeState,
trackingStoreContext,
instanceStatus is not null,
cancellationToken);
}
if (!allowReplayingTerminalInstances)
Expand Down
130 changes: 106 additions & 24 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -798,21 +798,25 @@ public override async Task UpdateStatusForRewindAsync(string instanceId, Cancell
/// <inheritdoc />
public override async Task UpdateStatusForTerminationAsync(
string instanceId,
string output,
DateTime lastUpdatedTime,
ExecutionTerminatedEvent executionTerminatedEvent,
CancellationToken cancellationToken = default)
{
string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId);
TableEntity entity = new TableEntity(sanitizedInstanceId, "")
TableEntity instanceEntity = new TableEntity(sanitizedInstanceId, "")
{
["RuntimeStatus"] = OrchestrationStatus.Terminated.ToString("G"),
["LastUpdatedTime"] = lastUpdatedTime,
["LastUpdatedTime"] = executionTerminatedEvent.Timestamp,
["CompletedTime"] = DateTime.UtcNow,
[OutputProperty] = output
// In the case of terminating an orchestration, the termination reason becomes the orchestration's output.
[OutputProperty] = executionTerminatedEvent.Input,
};

// Setting addBlobPropertyName to false ensures that the blob URL is saved as the "Output" of the instance entity, which is the expected behavior
// for large orchestration outputs.
await this.CompressLargeMessageAsync(instanceEntity, listOfBlobs: null, cancellationToken: cancellationToken, addBlobPropertyName: false);

Stopwatch stopwatch = Stopwatch.StartNew();
await this.InstancesTable.MergeEntityAsync(entity, ETag.All, cancellationToken);
await this.InstancesTable.MergeEntityAsync(instanceEntity, ETag.All, cancellationToken);

this.settings.Logger.InstanceStatusUpdate(
this.storageAccountName,
Expand Down Expand Up @@ -864,6 +868,7 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
["CustomStatus"] = newRuntimeState.Status ?? "null",
["ExecutionId"] = executionId,
["LastUpdatedTime"] = newEvents.Last().Timestamp,
["TaskHubName"] = this.settings.TaskHubName,
};

// check if we are replacing a previous execution with blobs; those will be deleted from the store after the update. This could occur in a ContinueAsNew scenario
Expand Down Expand Up @@ -910,6 +915,8 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
instanceEntity["Version"] = executionStartedEvent.Version;
instanceEntity["CreatedTime"] = executionStartedEvent.Timestamp;
instanceEntity["RuntimeStatus"] = OrchestrationStatus.Running.ToString();
instanceEntity["Tags"] = TagsSerializer.Serialize(executionStartedEvent.Tags);
instanceEntity["Generation"] = executionStartedEvent.Generation;
if (executionStartedEvent.ScheduledStartTime.HasValue)
{
instanceEntity["ScheduledStartTime"] = executionStartedEvent.ScheduledStartTime;
Expand Down Expand Up @@ -1048,11 +1055,11 @@ public override Task StartAsync(CancellationToken cancellationToken = default)
return eTagValue;
}

public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(
public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync(
string instanceId,
string executionId,
OrchestrationRuntimeState runtimeState,
object trackingStoreContext,
bool instanceEntityExists,
CancellationToken cancellationToken = default)
{
if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed &&
Expand All @@ -1063,28 +1070,90 @@ public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForComplete
return;
}

TrackingStoreContext context = (TrackingStoreContext)trackingStoreContext;
if (context.Blobs.Count > 0)
{
var tasks = new List<Task>(context.Blobs.Count);
foreach (string blobName in context.Blobs)
{
tasks.Add(this.messageManager.DeleteBlobAsync(blobName));
}
await Task.WhenAll(tasks);
}

string sanitizedInstanceId = KeySanitation.EscapePartitionKey(instanceId);
ExecutionStartedEvent executionStartedEvent = runtimeState.ExecutionStartedEvent;

// We need to set all of the fields of the instance entity in the case that it was never created for the orchestration.
// This can be the case for a suborchestration that completed in one execution, for example.
var instanceEntity = new TableEntity(sanitizedInstanceId, string.Empty)
{
["Name"] = runtimeState.Name,
["Version"] = runtimeState.Version,
["CreatedTime"] = executionStartedEvent.Timestamp,
// TODO: Translating null to "null" is a temporary workaround. We should prioritize
// https://github.com/Azure/durabletask/issues/477 so that this is no longer necessary.
["CustomStatus"] = runtimeState.Status ?? "null",
["ExecutionId"] = executionId,
["LastUpdatedTime"] = runtimeState.Events.Last().Timestamp,
["RuntimeStatus"] = runtimeState.OrchestrationStatus.ToString(),
["CompletedTime"] = runtimeState.CompletedTime
["CompletedTime"] = runtimeState.CompletedTime,
["Tags"] = TagsSerializer.Serialize(executionStartedEvent.Tags),
["TaskHubName"] = this.settings.TaskHubName,
};
if (runtimeState.ExecutionStartedEvent.ScheduledStartTime.HasValue)
{
instanceEntity["ScheduledStartTime"] = executionStartedEvent.ScheduledStartTime;
}

static TableEntity GetSingleEntityFromHistoryTableResults(IReadOnlyList<TableEntity> entities, string dataType)
{
try
{
TableEntity singleEntity = entities.SingleOrDefault();

return singleEntity ?? throw new DurableTaskStorageException($"The history table query to determine the blob storage URL " +
$"for the large orchestration {dataType} returned no rows. Unable to extract the URL from these results.");
}
catch (InvalidOperationException)
{
throw new DurableTaskStorageException($"The history table query to determine the blob storage URL for the large orchestration " +
$"{dataType} returned more than one row, when exactly one row is expected. " +
$"Unable to extract the URL from these results.");
}
}

// Set the output.
// In the case that the output is too large and is stored in blob storage, extract the blob name from the ExecutionCompleted history entity.
if (this.ExceedsMaxTablePropertySize(runtimeState.Output))
{
string filter = $"{nameof(ITableEntity.PartitionKey)} eq '{KeySanitation.EscapePartitionKey(instanceId)}'" +
$" and {nameof(OrchestrationInstance.ExecutionId)} eq '{executionId}'" +
$" and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.ExecutionCompleted)}'";
TableEntity executionCompletedEntity = GetSingleEntityFromHistoryTableResults(await this.QueryHistoryAsync(filter, instanceId, cancellationToken), "output");
this.SetInstancesTablePropertyFromHistoryProperty(
executionCompletedEntity,
instanceEntity,
historyPropertyName: nameof(runtimeState.ExecutionCompletedEvent.Result),
instancePropertyName: OutputProperty,
data: runtimeState.Output);
}
else
{
instanceEntity[OutputProperty] = runtimeState.Output;
}

// If the input has not been set by a previous execution, set the input.
if (!instanceEntityExists)
{
// In the case that the input is too large and is stored in blob storage, extract the blob name from the ExecutionStarted history entity.
if (this.ExceedsMaxTablePropertySize(runtimeState.Input))
{
string filter = $"{nameof(ITableEntity.PartitionKey)} eq '{KeySanitation.EscapePartitionKey(instanceId)}'" +
$" and {nameof(OrchestrationInstance.ExecutionId)} eq '{executionId}'" +
$" and {nameof(HistoryEvent.EventType)} eq '{nameof(EventType.ExecutionStarted)}'";
TableEntity executionStartedEntity = GetSingleEntityFromHistoryTableResults(await this.QueryHistoryAsync(filter, instanceId, cancellationToken), "input");
this.SetInstancesTablePropertyFromHistoryProperty(
executionStartedEntity,
instanceEntity,
historyPropertyName: nameof(executionStartedEvent.Input),
instancePropertyName: InputProperty,
data: executionStartedEvent.Input);
}
else
{
instanceEntity[InputProperty] = runtimeState.Input;
}
}

Stopwatch orchestrationInstanceUpdateStopwatch = Stopwatch.StartNew();
await this.InstancesTable.InsertOrMergeEntityAsync(instanceEntity);
Expand Down Expand Up @@ -1161,7 +1230,7 @@ void SetInstancesTablePropertyFromHistoryProperty(
}
}

async Task CompressLargeMessageAsync(TableEntity entity, List<string> listOfBlobs, CancellationToken cancellationToken)
async Task CompressLargeMessageAsync(TableEntity entity, List<string> listOfBlobs, CancellationToken cancellationToken, bool addBlobPropertyName = true)
{
foreach (string propertyName in VariableSizeEntityProperties)
{
Expand All @@ -1176,9 +1245,16 @@ property is string stringProperty &&

// Clear out the original property value and create a new "*BlobName"-suffixed property.
// The runtime will look for the new "*BlobName"-suffixed column to know if a property is stored in a blob.
string blobPropertyName = GetBlobPropertyName(propertyName);
entity.Add(blobPropertyName, blobName);
entity[propertyName] = string.Empty;
if (addBlobPropertyName)
{
string blobPropertyName = GetBlobPropertyName(propertyName);
entity.Add(blobPropertyName, blobName);
entity[propertyName] = string.Empty;
}
else
{
entity[propertyName] = this.messageManager.GetBlobUrl(blobName);
}

// if necessary, keep track of all the blobs associated with this execution
listOfBlobs?.Add(blobName);
Expand Down Expand Up @@ -1226,6 +1302,12 @@ static string GetBlobName(TableEntity entity, string property)
// EventType. Use a hardcoded value to record the orchestration input.
eventType = "Input";
}
else if (property == "Output")
{
// This message is used to terminate an orchestration with no history, so it does not have a
// corresponding EventType. Use a hardcoded value to record the orchestration output.
eventType = "Output";
}
else if (property == "Tags")
{
eventType = "Tags";
Expand Down
12 changes: 5 additions & 7 deletions src/DurableTask.AzureStorage/Tracking/ITrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -105,16 +105,15 @@ interface ITrackingStore

/// <summary>
/// Updates the instance status of the specified orchestration instance to match that of <paramref name="runtimeState"/> for a completed orchestration.
/// Also deletes any orphaned blobs of <paramref name="trackingStoreContext"/>.
/// This method is meant to be called in the case that there is an inconsistency between the instance and history table due to a failure during a call to
/// <see cref="UpdateStateAsync"/> for a completing orchestration. If the orchestration is not in a terminal state, the method will immediately return and do nothing.
/// </summary>
/// <param name="instanceId">The ID of the orchestration.</param>
/// <param name="executionId">The execution ID of the orchestration.</param>
/// <param name="runtimeState">The runtime state of the orchestration.</param>
/// <param name="trackingStoreContext">Additional context for the execution that is maintained by the tracking store.</param>
/// <param name="instanceEntityExists">Whether the instance entity already exists in the instance store.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, object trackingStoreContext, CancellationToken cancellationToken = default);
Task UpdateInstanceStatusForCompletedOrchestrationAsync(string instanceId, string executionId, OrchestrationRuntimeState runtimeState, bool instanceEntityExists, CancellationToken cancellationToken = default);

/// <summary>
/// Get The Orchestration State for querying all orchestration instances
Expand Down Expand Up @@ -166,13 +165,12 @@ interface ITrackingStore
Task UpdateStatusForRewindAsync(string instanceId, CancellationToken cancellationToken = default);

/// <summary>
/// Used to update the instance status to "Terminated" whend a pending orchestration is terminated.
/// Used to update the instance status to "Terminated" when a pending orchestration is terminated.
/// </summary>
/// <param name="instanceId">The instance being terminated</param>
/// <param name="output">The output of the orchestration</param>
/// <param name="lastUpdatedTime">The last updated time of the orchestration (the time the termination request was created)</param>
/// <param name="executionTerminatedEvent">The termination history event.</param>
/// <param name="cancellationToken">The token to monitor for cancellation requests. The default value is <see cref="CancellationToken.None"/>.</param>
Task UpdateStatusForTerminationAsync(string instanceId, string output, DateTime lastUpdatedTime, CancellationToken cancellationToken = default);
Task UpdateStatusForTerminationAsync(string instanceId, ExecutionTerminatedEvent executionTerminatedEvent, CancellationToken cancellationToken = default);

/// <summary>
/// Purge The History and state which is older than thresholdDateTimeUtc based on the timestamp type specified by timeRangeFilterType
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -179,24 +179,23 @@ await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[]

public override async Task UpdateStatusForTerminationAsync(
string instanceId,
string output,
DateTime lastUpdatedTime,
ExecutionTerminatedEvent executionTerminatedEvent,
CancellationToken cancellationToken = default)
{
// Get the most recent execution and update its status to terminated
IEnumerable<OrchestrationStateInstanceEntity> instanceEntity = await this.instanceStore.GetOrchestrationStateAsync(instanceId, allInstances: false);
instanceEntity.Single().State.OrchestrationStatus = OrchestrationStatus.Terminated;
instanceEntity.Single().State.LastUpdatedTime = lastUpdatedTime;
instanceEntity.Single().State.LastUpdatedTime = executionTerminatedEvent.Timestamp;
instanceEntity.Single().State.CompletedTime = DateTime.UtcNow;
instanceEntity.Single().State.Output = output;
instanceEntity.Single().State.Output = executionTerminatedEvent.Input;
await this.instanceStore.WriteEntitiesAsync(instanceEntity);
}

public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForCompletedOrchestrationAsync(
public override async Task UpdateInstanceStatusForCompletedOrchestrationAsync(
string instanceId,
string executionId,
OrchestrationRuntimeState runtimeState,
object trackingStoreContext,
bool instanceEntityExists,
CancellationToken cancellationToken = default)
{
if (runtimeState.OrchestrationStatus != OrchestrationStatus.Completed &&
Expand All @@ -207,7 +206,6 @@ public override async Task UpdateInstanceStatusAndDeleteOrphanedBlobsForComplete
return;
}

// No blobs to delete for this tracking store implementation
await instanceStore.WriteEntitiesAsync(new InstanceEntityBase[]
{
new OrchestrationStateInstanceEntity()
Expand Down
Loading
Loading