Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
20 changes: 16 additions & 4 deletions src/DurableTask.AzureStorage/MessageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,23 @@ public Task<bool> DeleteBlobAsync(string blobName, CancellationToken cancellatio

private async Task<string> DownloadAndDecompressAsBytesAsync(Blob blob, CancellationToken cancellationToken = default)
{
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken);
using GZipStream decompressedBlobStream = new GZipStream(result.Content, CompressionMode.Decompress);
using StreamReader reader = new StreamReader(decompressedBlobStream, Encoding.UTF8);
try
{
using BlobDownloadStreamingResult result = await blob.DownloadStreamingAsync(cancellationToken);
using GZipStream decompressedBlobStream = new GZipStream(result.Content, CompressionMode.Decompress);
using StreamReader reader = new StreamReader(decompressedBlobStream, Encoding.UTF8);

return await reader.ReadToEndAsync();
}
catch (Exception)
{
this.settings.Logger.GeneralWarning(
this.azureStorageClient.BlobAccountName,
this.settings.TaskHubName,
$"Failed to download or decompress blob {blob.Name}.");

return await reader.ReadToEndAsync();
throw;
}
}

public string GetBlobUrl(string blobName)
Expand Down
24 changes: 14 additions & 10 deletions src/DurableTask.AzureStorage/Tracking/AzureTableTrackingStore.cs
Original file line number Diff line number Diff line change
Expand Up @@ -155,14 +155,21 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in
.GetHistoryEntitiesResponseInfoAsync(instanceId, expectedExecutionId, null, cancellationToken)
.GetResultsAsync(cancellationToken: cancellationToken);

// The sentinel row should always be the last row
TableEntity sentinel = results.Entities.LastOrDefault(e => e.RowKey == SentinelRowKey);

IList<HistoryEvent> historyEvents;
string executionId;
TableEntity sentinel = null;
TrackingStoreContext trackingStoreContext = new TrackingStoreContext();
if (results.Entities.Count > 0)

// If expectedExecutionId is provided but it does not match the sentinel executionId,
// it may belong to a previous generation. In that case, treat it as an unknown executionId
// and skip loading history.
if (results.Entities.Count > 0 && (expectedExecutionId == null ||
expectedExecutionId == sentinel?.GetString("ExecutionId")))
{
// The most recent generation will always be in the first history event.
executionId = results.Entities[0].GetString("ExecutionId");
executionId = sentinel?.GetString("ExecutionId") ?? results.Entities[0].GetString("ExecutionId");

// Convert the table entities into history events.
var events = new List<HistoryEvent>(results.Entities.Count);
Expand All @@ -175,11 +182,9 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in
break;
}

// The sentinel row does not contain any history events, so save it for later
// and continue
if (entity.RowKey == SentinelRowKey)
// The sentinel row does not contain any history events, so ignore and continue
if (entity == sentinel)
{
sentinel = entity;
continue;
}

Expand All @@ -197,11 +202,10 @@ public override async Task<OrchestrationHistory> GetHistoryEventsAsync(string in
executionId = expectedExecutionId;
}

// Read the checkpoint completion time from the sentinel row, which should always be the last row.
// Read the checkpoint completion time from the sentinel row.
// A sentinel won't exist only if no instance of this ID has ever existed or the instance history
// was purged.The IsCheckpointCompleteProperty was newly added _after_ v1.6.4.
// was purged. The IsCheckpointCompleteProperty was newly added _after_ v1.6.4.
DateTime checkpointCompletionTime = DateTime.MinValue;
sentinel = sentinel ?? results.Entities.LastOrDefault(e => e.RowKey == SentinelRowKey);
ETag? eTagValue = sentinel?.ETag;
if (sentinel != null &&
sentinel.TryGetValue(CheckpointCompletedTimestampProperty, out object timestampObj) &&
Expand Down
Loading