Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
40 commits
Select commit Hold shift + click to select a range
22531fe
naive poison message handler
davidmrdavid Apr 12, 2024
748b279
incorporate feedback
davidmrdavid Apr 13, 2024
40a00dd
add suffix, change to terminated
davidmrdavid Apr 15, 2024
b1b7fba
more changes to get poison message handling working E2E. It's hackier…
davidmrdavid Apr 16, 2024
b1808a1
simplify implementation
davidmrdavid Apr 16, 2024
45d523b
remove commented out code
davidmrdavid Apr 16, 2024
82e3531
remove csproj changes
davidmrdavid Apr 16, 2024
adf4579
undo change in message manager deps
davidmrdavid Apr 16, 2024
d20bb7e
undo csproj changeS
davidmrdavid Apr 16, 2024
40baca0
add activity pmh as well
davidmrdavid Apr 16, 2024
f896364
make configurable
davidmrdavid Apr 16, 2024
cef1410
move poison message handler to superclass
davidmrdavid Apr 16, 2024
eeea159
remove unecessary imports
davidmrdavid Apr 16, 2024
961d64b
remove unecessary import
davidmrdavid Apr 16, 2024
5dfe896
simplify code a bit
davidmrdavid Apr 16, 2024
4a25c5b
remove unused variable
davidmrdavid Apr 16, 2024
8afbfc2
simplify and unify guidance
davidmrdavid Apr 16, 2024
9057bfd
improve guidance
davidmrdavid Apr 16, 2024
6866828
call out backend-specificness
davidmrdavid Apr 16, 2024
b0d739c
clean up PR
davidmrdavid Apr 16, 2024
71e0b36
clean up csproj
davidmrdavid Apr 16, 2024
5934076
indent csproj comment
davidmrdavid Apr 16, 2024
a94cc4e
remove unused import
davidmrdavid Apr 16, 2024
37dbac4
have valid table-naming scheme
davidmrdavid Apr 18, 2024
865aa20
add log
davidmrdavid Apr 18, 2024
57bb966
add comments
davidmrdavid Apr 18, 2024
6c3bb79
create valid serializable activity failure
davidmrdavid Apr 18, 2024
b15dbb5
handle de-serialization errors as well
davidmrdavid Jun 14, 2024
cbb8274
add version suffix
davidmrdavid Jun 25, 2024
2acadbe
resolve conflicts
davidmrdavid Jun 25, 2024
16f38f1
rev patch
davidmrdavid Jun 25, 2024
74dc0f7
add dtfx.core
davidmrdavid Jun 25, 2024
584cf8d
merge mixed deserializtion hotfix
davidmrdavid Jun 27, 2024
51978a0
add imports
davidmrdavid Jun 27, 2024
65c29c4
pass nullable analysis
davidmrdavid Jun 27, 2024
de7e46b
make hotfix always occur
davidmrdavid Jun 27, 2024
a8b24e5
move nullable analysis
davidmrdavid Jun 27, 2024
a746b1e
make hotfix conditional on setting
davidmrdavid Jun 27, 2024
d219ffa
match diffs
davidmrdavid Jun 28, 2024
b2e1f0c
make hotfix always run
davidmrdavid Jun 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,11 @@ public class AzureStorageOrchestrationServiceSettings
/// </summary>
public int MaxConcurrentTaskEntityWorkItems { get; set; } = 100;

/// <summary>
/// Gets or sets the maximum dequeue count of any message before it is flagged as a "poison message".
/// The default value is 20.
/// </summary>
public int PoisonMessageDeuqueCountThreshold { get; set; } = 20;
/// <summary>
/// Gets or sets the maximum number of concurrent storage operations that can be executed in the context
/// of a single orchestration instance.
Expand Down
57 changes: 21 additions & 36 deletions src/DurableTask.AzureStorage/DataContractJsonConverter.cs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ namespace DurableTask.AzureStorage
/// </summary>
internal class DataContractJsonConverter : JsonConverter
{
public JsonSerializer alternativeSerializer = null;

public override bool CanConvert(Type objectType)
{
if (objectType == null)
Expand Down Expand Up @@ -59,50 +61,33 @@ public override object ReadJson(
throw new ArgumentNullException(nameof(serializer));
}

using (var stream = new MemoryStream())
using (var writer = new StreamWriter(stream))
using (var jsonWriter = new JsonTextWriter(writer))
// JsonReader is forward only, need to make a copy so we can read it twice.
using var stream = new MemoryStream();
using var writer = new StreamWriter(stream);
using var jsonWriter = new JsonTextWriter(writer);
jsonWriter.WriteToken(reader, writeChildren: true);
jsonWriter.Flush();
stream.Position = 0;

try
{
using var reader2 = new JsonTextReader(new StreamReader(stream));
reader2.CloseInput = false;
return this.alternativeSerializer.Deserialize(reader2, objectType);
}
catch
{
jsonWriter.WriteToken(reader, writeChildren: true);
jsonWriter.Flush();
stream.Position = 0;

var contractSerializer = CreateSerializer(objectType, serializer);
DataContractJsonSerializer contractSerializer = CreateSerializer(objectType, serializer);
return contractSerializer.ReadObject(stream);
}
}

/// <inheritdoc />
public override void WriteJson(JsonWriter writer, object value, JsonSerializer serializer)
{
if (writer == null)
{
throw new ArgumentNullException(nameof(writer));
}

if (value == null)
{
writer.WriteNull();
return;
}

if (serializer == null)
{
throw new ArgumentNullException(nameof(serializer));
}

using (var memoryStream = new MemoryStream())
{
var contractSerializer = CreateSerializer(value.GetType(), serializer);
contractSerializer.WriteObject(memoryStream, value);
memoryStream.Position = 0;

using (var streamReader = new StreamReader(memoryStream))
using (var jsonReader = new JsonTextReader(streamReader))
{
writer.WriteToken(jsonReader, writeChildren: true);
}
}
// Ignore data contract, use Newtonsoft
this.alternativeSerializer.Serialize(writer, value);
}

private static DataContractJsonSerializer CreateSerializer(Type type, JsonSerializer serializer)
Expand All @@ -115,4 +100,4 @@ private static DataContractJsonSerializer CreateSerializer(Type type, JsonSerial
});
}
}
}
}
3 changes: 2 additions & 1 deletion src/DurableTask.AzureStorage/DurableTask.AzureStorage.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,9 @@
<PropertyGroup>
<MajorVersion>1</MajorVersion>
<MinorVersion>17</MinorVersion>
<PatchVersion>3</PatchVersion>
<PatchVersion>4</PatchVersion>
<VersionPrefix>$(MajorVersion).$(MinorVersion).$(PatchVersion)</VersionPrefix>
<VersionSuffix>poisonmessagehandler.4</VersionSuffix>
<FileVersion>$(VersionPrefix).0</FileVersion>
<!-- FileVersionRevision is expected to be set by the CI. This is useful for distinguishing between multiple builds of the same version. -->
<FileVersion Condition="'$(FileVersionRevision)' != ''">$(VersionPrefix).$(FileVersionRevision)</FileVersion>
Expand Down
5 changes: 3 additions & 2 deletions src/DurableTask.AzureStorage/EntityTrackingStoreQueries.cs
Original file line number Diff line number Diff line change
Expand Up @@ -235,9 +235,10 @@ bool OrchestrationIsRunning(OrchestrationStatus? status)
{
// first, retrieve the entity scheduler state (= input of the orchestration state), possibly from blob storage.
string serializedSchedulerState;
if (MessageManager.TryGetLargeMessageReference(state.Input, out Uri blobUrl))
if (MessageManager.TryGetLargeMessageReference(state.Input, out Uri? blobUrl))
{
serializedSchedulerState = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl);
// we know blobUrl is not null because TryGetLargeMessageReference returned true
serializedSchedulerState = await this.messageManager.DownloadAndDecompressAsBytesAsync(blobUrl!);
Comment on lines +240 to +241
Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is here because I added nullable analysis

}
else
{
Expand Down
23 changes: 13 additions & 10 deletions src/DurableTask.AzureStorage/MessageManager.cs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@
// See the License for the specific language governing permissions and
// limitations under the License.
// ----------------------------------------------------------------------------------

namespace DurableTask.AzureStorage
{
using System;
Expand Down Expand Up @@ -50,7 +49,7 @@ public MessageManager(
{
this.settings = settings;
this.azureStorageClient = azureStorageClient;
this.blobContainer = this.azureStorageClient.GetBlobContainerReference(blobContainerName);
this.blobContainer = this.azureStorageClient?.GetBlobContainerReference(blobContainerName);
this.taskMessageSerializerSettings = new JsonSerializerSettings
{
TypeNameHandling = TypeNameHandling.Objects,
Expand All @@ -61,17 +60,19 @@ public MessageManager(
#endif
};

if (this.settings.UseDataContractSerialization)
{
this.taskMessageSerializerSettings.Converters.Add(new DataContractJsonConverter());
}
JsonSerializer newtonSoftSerializer = JsonSerializer.Create(taskMessageSerializerSettings);

// make hotfix always present
var dataConverter = new DataContractJsonConverter();
dataConverter.alternativeSerializer = newtonSoftSerializer;
this.taskMessageSerializerSettings.Converters.Add(dataConverter);

// We _need_ to create the Json serializer after providing the data converter,
// otherwise the converters will be ignored.
this.serializer = JsonSerializer.Create(taskMessageSerializerSettings);

}

#nullable enable
public async Task<bool> EnsureContainerAsync()
{
bool created = false;
Expand Down Expand Up @@ -122,17 +123,18 @@ public async Task<string> SerializeMessageDataAsync(MessageData messageData)
/// <returns>Actual string representation of message.</returns>
public async Task<string> FetchLargeMessageIfNecessary(string message)
{
if (TryGetLargeMessageReference(message, out Uri blobUrl))
if (TryGetLargeMessageReference(message, out Uri? blobUrl))
{
return await this.DownloadAndDecompressAsBytesAsync(blobUrl);
// we know blobUrl is not null because TryGetLargeMessageReference returned true
return await this.DownloadAndDecompressAsBytesAsync(blobUrl!);
}
else
{
return message;
}
}

internal static bool TryGetLargeMessageReference(string messagePayload, out Uri blobUrl)
internal static bool TryGetLargeMessageReference(string messagePayload, out Uri? blobUrl)
{
if (Uri.IsWellFormedUriString(messagePayload, UriKind.Absolute))
{
Expand Down Expand Up @@ -318,6 +320,7 @@ public async Task<int> DeleteLargeMessageBlobs(string sanitizedInstanceId)
return storageOperationCount;
}
}
#nullable disable

#if NETSTANDARD2_0
class TypeNameSerializationBinder : ISerializationBinder
Expand Down
Loading