Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
13 changes: 10 additions & 3 deletions src/Client/Core/DurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -369,7 +369,14 @@ public virtual Task<PurgeResult> PurgeInstanceAsync(string instanceId, Cancellat
/// This method returns a <see cref="PurgeResult"/> object after the operation has completed with a
/// <see cref="PurgeResult.PurgedInstanceCount"/> indicating the number of orchestration instances that were purged,
/// including the count of sub-orchestrations purged if any.
/// </returns>
/// </returns>
/// <exception cref="ArgumentNullException">Thrown if <paramref name="instanceId"/> is null.</exception>
/// <exception cref="ArgumentException">Thrown if <paramref name="instanceId"/> is empty or starts with the null character.</exception>
/// <exception cref="InvalidOperationException">Thrown if the orchestration is not in a
/// <see cref="OrchestrationRuntimeStatus.Completed"/>, <see cref="OrchestrationRuntimeStatus.Failed"/>,
/// or <see cref="OrchestrationRuntimeStatus.Terminated"/> state.</exception>
/// <exception cref="NotImplementedException">Thrown if the backend storage provider does not support purging instances.</exception>
/// <exception cref="OperationCanceledException">Thrown if the operation is canceled via the <paramref name="cancellation"/> token.</exception>
public virtual Task<PurgeResult> PurgeInstanceAsync(
string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
Expand Down Expand Up @@ -447,7 +454,7 @@ public virtual Task<string> RestartAsync(
/// The orchestration's history will be replaced with a new history that excludes the failed Activities and suborchestrations,
/// and a new execution ID will be generated for the rewound orchestration instance. As the failed Activities and suborchestrations
/// re-execute, the history will be appended with new TaskScheduled, TaskCompleted, and SubOrchestrationInstanceCompleted events.
/// Note that only orchestrations in a "Failed" state can be rewound.
/// Note that only orchestrations in a <see cref="OrchestrationRuntimeStatus.Failed"/> state can be rewound.
/// </remarks>
/// <param name="instanceId">The instance ID of the orchestration to rewind.</param>
/// <param name="reason">The reason for the rewind.</param>
Expand All @@ -460,7 +467,7 @@ public virtual Task<string> RestartAsync(
/// <exception cref="ArgumentException">Thrown if an orchestration with the specified <paramref name="instanceId"/> does not exist,
/// or if <paramref name="instanceId"/> is the instance ID of an entity.</exception>
/// <exception cref="InvalidOperationException">Thrown if a precondition of the operation fails, for example if the specified
/// orchestration is not in a "Failed" state.</exception>
/// orchestration is not in a <see cref="OrchestrationRuntimeStatus.Failed"/> state.</exception>
/// <exception cref="OperationCanceledException">Thrown if the operation is canceled via the <paramref name="cancellation"/> token.</exception>
public virtual Task RewindInstanceAsync(
string instanceId,
Expand Down
16 changes: 15 additions & 1 deletion src/Client/Grpc/GrpcDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -394,10 +394,16 @@ public override async Task<OrchestrationMetadata> WaitForInstanceCompletionAsync
public override Task<PurgeResult> PurgeInstanceAsync(
string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
Check.NotNullOrEmpty(instanceId);
bool recursive = options?.Recursive ?? false;
this.logger.PurgingInstanceMetadata(instanceId);

P.PurgeInstancesRequest request = new() { InstanceId = instanceId, Recursive = recursive };
P.PurgeInstancesRequest request = new()
{
InstanceId = instanceId,
Recursive = recursive,
IsOrchestration = !this.options.EnableEntitySupport || instanceId[0] != '@',
};
return this.PurgeInstancesCoreAsync(request, cancellation);
}

Expand Down Expand Up @@ -598,6 +604,14 @@ async Task<PurgeResult> PurgeInstancesCoreAsync(
throw new OperationCanceledException(
$"The {nameof(this.PurgeAllInstancesAsync)} operation was canceled.", e, cancellation);
}
catch (RpcException e) when (e.StatusCode == StatusCode.FailedPrecondition)
{
throw new InvalidOperationException(e.Status.Detail);
}
catch (RpcException e) when (e.StatusCode == StatusCode.Unimplemented)
{
throw new NotImplementedException(e.Status.Detail);
}
}

OrchestrationMetadata CreateMetadata(P.OrchestrationState state, bool includeInputsAndOutputs)
Expand Down
15 changes: 15 additions & 0 deletions src/Client/OrchestrationServiceClientShim/ShimDurableTaskClient.cs
Original file line number Diff line number Diff line change
Expand Up @@ -122,6 +122,21 @@ public override async Task<PurgeResult> PurgeInstanceAsync(
string instanceId, PurgeInstanceOptions? options = null, CancellationToken cancellation = default)
{
Check.NotNullOrEmpty(instanceId);
OrchestrationMetadata? orchestrationState = await this.GetInstanceAsync(instanceId, cancellation);

// The orchestration doesn't exist, nothing to purge
if (orchestrationState == null)
{
return new PurgeResult(0);
}

bool isEntity = this.options.EnableEntitySupport && instanceId[0] == '@';
if (!isEntity && !orchestrationState.IsCompleted)
{
throw new InvalidOperationException($"Only orchestrations in a terminal state can be purged, " +
$"but the orchestration with instance ID {instanceId} has status {orchestrationState.RuntimeStatus}");
}

cancellation.ThrowIfCancellationRequested();

// TODO: Support recursive purge of sub-orchestrations
Expand Down
26 changes: 25 additions & 1 deletion src/Grpc/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -115,6 +115,7 @@ message SubOrchestrationInstanceCreatedEvent {
google.protobuf.StringValue version = 3;
google.protobuf.StringValue input = 4;
TraceContext parentTraceContext = 5;
map<string, string> tags = 6;
}

message SubOrchestrationInstanceCompletedEvent {
Expand Down Expand Up @@ -225,6 +226,11 @@ message ExecutionRewoundEvent {
google.protobuf.StringValue parentExecutionId = 2; // used only for rewinding suborchestrations, null otherwise
google.protobuf.StringValue instanceId = 3; // used only for rewinding suborchestrations, null otherwise
TraceContext parentTraceContext = 4; // used only for rewinding suborchestrations, null otherwise
google.protobuf.StringValue name = 5; // used by DTS backend only
google.protobuf.StringValue version = 6; // used by DTS backend only
google.protobuf.StringValue input = 7; // used by DTS backend only
ParentInstanceInfo parentInstance = 8; // used by DTS backend only
map<string, string> tags = 9; // used by DTS backend only
}

message HistoryEvent {
Expand Down Expand Up @@ -483,13 +489,28 @@ message QueryInstancesResponse {
google.protobuf.StringValue continuationToken = 2;
}

message ListInstanceIdsRequest {
repeated OrchestrationStatus runtimeStatus = 1;
google.protobuf.Timestamp completedTimeFrom = 2;
google.protobuf.Timestamp completedTimeTo = 3;
int32 pageSize = 4;
google.protobuf.StringValue lastInstanceKey = 5;
}

message ListInstanceIdsResponse {
repeated string instanceIds = 1;
google.protobuf.StringValue lastInstanceKey = 2;
}

message PurgeInstancesRequest {
oneof request {
string instanceId = 1;
PurgeInstanceFilter purgeInstanceFilter = 2;
InstanceBatch instanceBatch = 4;
}
bool recursive = 3;
// used in the case when an instanceId is specified to determine if the purge request is for an orchestration (as opposed to an entity)
bool isOrchestration = 5;
}

message PurgeInstanceFilter {
Expand Down Expand Up @@ -750,6 +771,9 @@ service TaskHubSidecarService {
// rpc DeleteInstance(DeleteInstanceRequest) returns (DeleteInstanceResponse);

rpc QueryInstances(QueryInstancesRequest) returns (QueryInstancesResponse);

rpc ListInstanceIds(ListInstanceIdsRequest) returns (ListInstanceIdsResponse);

rpc PurgeInstances(PurgeInstancesRequest) returns (PurgeInstancesResponse);

rpc GetWorkItems(GetWorkItemsRequest) returns (stream WorkItem);
Expand Down Expand Up @@ -854,4 +878,4 @@ message HistoryChunk {
message InstanceBatch {
// A maximum of 500 instance IDs can be provided in this list.
repeated string instanceIds = 1;
}
}
4 changes: 2 additions & 2 deletions src/Grpc/versions.txt
Original file line number Diff line number Diff line change
@@ -1,2 +1,2 @@
# The following files were downloaded from branch main at 2025-12-29 22:13:55 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/b7e260ad7b84740a2ed5cb4600ce73bef702a979/protos/orchestrator_service.proto
# The following files were downloaded from branch main at 2026-01-13 00:01:21 UTC
https://raw.githubusercontent.com/microsoft/durabletask-protobuf/026329c53fe6363985655857b9ca848ec7238bd2/protos/orchestrator_service.proto
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,9 @@ public async Task GetInstances_Results(bool getInputs)
public async Task PurgeInstanceMetadata()
{
// arrange
string instanceId = Guid.NewGuid().ToString();
List<Core.OrchestrationState> states = [CreateState("input", "output")];
string instanceId = states.First().OrchestrationInstance.InstanceId;
this.orchestrationClient.Setup(m => m.GetOrchestrationStateAsync(instanceId, false)).ReturnsAsync(states);
this.purgeClient.Setup(m => m.PurgeInstanceStateAsync(instanceId)).ReturnsAsync(new Core.PurgeResult(1));

// act
Expand Down
Loading