Skip to content

Commit 20d95b6

Browse files
committed
addressed PR feedback
1 parent 709fd5f commit 20d95b6

5 files changed

Lines changed: 119 additions & 17 deletions

File tree

client/src/main/java/com/microsoft/durabletask/OrchestrationHistoryEventMapper.java

Lines changed: 12 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,8 @@
2222
*/
2323
final class OrchestrationHistoryEventMapper {
2424

25+
private static final int MAX_PROTO_NESTING_DEPTH = 10;
26+
2527
private static final Map<HistoryEvent.EventTypeCase, String> EVENT_TYPE_NAMES = new HashMap<>();
2628

2729
static {
@@ -104,9 +106,9 @@ private static Map<String, Object> extractEventData(
104106
// Proto map fields appear as repeated MapEntry messages in reflection;
105107
// convert them to plain Java Maps using the field descriptor.
106108
if (field.isMapField()) {
107-
data.put(field.getJsonName(), convertProtoMapField(field, value));
109+
data.put(field.getJsonName(), convertProtoMapField(field, value, 0));
108110
} else {
109-
data.put(field.getJsonName(), convertProtoValue(value));
111+
data.put(field.getJsonName(), convertProtoValue(value, 0));
110112
}
111113
}
112114

@@ -148,7 +150,7 @@ private static Message getEventMessage(HistoryEvent proto, HistoryEvent.EventTyp
148150
}
149151

150152
@SuppressWarnings("unchecked")
151-
private static Map<String, Object> convertProtoMapField(Descriptors.FieldDescriptor field, Object value) {
153+
private static Map<String, Object> convertProtoMapField(Descriptors.FieldDescriptor field, Object value, int depth) {
152154
// Proto map fields are represented as List<MapEntry> in reflection.
153155
// Each MapEntry is a Message with "key" and "value" fields.
154156
List<Message> entries = (List<Message>) value;
@@ -157,13 +159,16 @@ private static Map<String, Object> convertProtoMapField(Descriptors.FieldDescrip
157159
Descriptors.FieldDescriptor valueField = field.getMessageType().findFieldByName("value");
158160
for (Message entry : entries) {
159161
String key = String.valueOf(entry.getField(keyField));
160-
Object val = convertProtoValue(entry.getField(valueField));
162+
Object val = convertProtoValue(entry.getField(valueField), depth + 1);
161163
result.put(key, val);
162164
}
163165
return result;
164166
}
165167

166-
private static Object convertProtoValue(Object value) {
168+
private static Object convertProtoValue(Object value, int depth) {
169+
if (depth > MAX_PROTO_NESTING_DEPTH) {
170+
return "[nested object truncated at max depth " + MAX_PROTO_NESTING_DEPTH + "]";
171+
}
167172
if (value instanceof Timestamp) {
168173
Timestamp ts = (Timestamp) value;
169174
return toInstant(ts).toString();
@@ -182,7 +187,7 @@ private static Object convertProtoValue(Object value) {
182187
List<?> protoList = (List<?>) value;
183188
List<Object> converted = new ArrayList<>(protoList.size());
184189
for (Object item : protoList) {
185-
converted.add(convertProtoValue(item));
190+
converted.add(convertProtoValue(item, depth + 1));
186191
}
187192
return converted;
188193
}
@@ -191,7 +196,7 @@ private static Object convertProtoValue(Object value) {
191196
Map<String, Object> nested = new LinkedHashMap<>();
192197
for (Map.Entry<Descriptors.FieldDescriptor, Object> entry :
193198
((Message) value).getAllFields().entrySet()) {
194-
nested.put(entry.getKey().getJsonName(), convertProtoValue(entry.getValue()));
199+
nested.put(entry.getKey().getJsonName(), convertProtoValue(entry.getValue(), depth + 1));
195200
}
196201
return nested;
197202
}

exporthistory/src/main/java/com/microsoft/durabletask/exporthistory/client/DefaultExportHistoryJobClient.java

Lines changed: 18 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -169,20 +169,34 @@ private void terminateAndPurgeOrchestration(String orchestrationInstanceId) {
169169
/**
170170
* Detects "instance not found" responses from the sidecar without taking a hard dependency on
171171
* gRPC types. The Durable Task gRPC client surfaces "not found" as a {@code StatusRuntimeException}
172-
* with code {@code NOT_FOUND}; we match against the exception's {@code toString()} which embeds
173-
* both. This mirrors .NET's {@code catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound)}.
172+
* with code {@code NOT_FOUND}; we check the status code via reflection for exactness. Mirrors
173+
* .NET's {@code catch (RpcException ex) when (ex.StatusCode == StatusCode.NotFound)}.
174174
*/
175175
private static boolean isNotFound(Throwable t) {
176176
while (t != null) {
177177
String name = t.getClass().getName();
178178
if ("io.grpc.StatusRuntimeException".equals(name) || "io.grpc.StatusException".equals(name)) {
179-
String msg = String.valueOf(t.getMessage());
180-
if (msg.contains("NOT_FOUND")) {
179+
if (hasGrpcStatusCode(t, "NOT_FOUND")) {
181180
return true;
182181
}
183182
}
184183
t = t.getCause();
185184
}
186185
return false;
187186
}
187+
188+
/**
189+
* Extracts the gRPC status code name via reflection and compares it to {@code expected}.
190+
* Falls back to checking for a {@code "CODE:"} message prefix if the reflective lookup fails.
191+
*/
192+
private static boolean hasGrpcStatusCode(Throwable t, String expected) {
193+
try {
194+
Object status = t.getClass().getMethod("getStatus").invoke(t);
195+
Object code = status.getClass().getMethod("getCode").invoke(status);
196+
return expected.equals(code.toString());
197+
} catch (ReflectiveOperationException ignored) {
198+
// Fallback: gRPC formats messages as "CODE: description"
199+
return String.valueOf(t.getMessage()).startsWith(expected + ":");
200+
}
201+
}
188202
}

exporthistory/src/main/java/com/microsoft/durabletask/exporthistory/models/ExportJobCreationOptions.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@ public ExportJobCreationOptions() {
5555
* @param jobId the job ID, or {@code null} to auto-generate
5656
* @param mode the export mode (BATCH or CONTINUOUS)
5757
* @param completedTimeFrom inclusive start of the completed time window
58-
* @param completedTimeTo exclusive end of the completed time window — must be strictly after
58+
* @param completedTimeTo inclusive end of the completed time window — must be strictly after
5959
* {@code completedTimeFrom} (required for BATCH, null for CONTINUOUS)
6060
* @param destination blob storage destination, or {@code null} to use defaults
6161
* @param format export format, or {@code null} for default (JSONL+gzip)

exporthistory/src/main/java/com/microsoft/durabletask/exporthistory/orchestrations/ExportJobOrchestrator.java

Lines changed: 15 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -3,6 +3,7 @@
33
package com.microsoft.durabletask.exporthistory.orchestrations;
44

55
import com.microsoft.durabletask.Task;
6+
import com.microsoft.durabletask.TaskFailedException;
67
import com.microsoft.durabletask.TaskOptions;
78
import com.microsoft.durabletask.TaskOrchestration;
89
import com.microsoft.durabletask.TaskOrchestrationContext;
@@ -129,14 +130,19 @@ public void run(TaskOrchestrationContext ctx) {
129130
// SDK control-flow signals — must propagate so the runtime can suspend/continue.
130131
throw controlFlow;
131132
} catch (Exception ex) {
132-
// Mark as failed
133+
// Mark as failed (best-effort). The inner try MUST re-throw SDK control-flow signals;
134+
// otherwise the control-flow signal thrown by the entity call's await() on its first turn will be swallowed and the
135+
// orchestrator will fall through to `throw ex` in the same turn, leaving the ExportJob
136+
// entity stuck in ACTIVE and breaking replay determinism.
133137
try {
134138
ctx.callEntity(input.getJobEntityId(),
135139
ExportJobOperationNames.MARK_AS_FAILED,
136140
ex.getMessage(),
137141
Void.class).await();
142+
} catch (OrchestratorBlockedException | ContinueAsNewInterruption controlFlow) {
143+
throw controlFlow;
138144
} catch (Exception ignored) {
139-
// Best-effort failure marking
145+
// Best-effort failure marking — swallow only genuine application errors
140146
}
141147
throw ex instanceof RuntimeException ? (RuntimeException) ex : new RuntimeException(ex);
142148
}
@@ -213,11 +219,16 @@ List<ExportResult> exportBatch(
213219
ExportResult.class));
214220
}
215221

216-
// Wait for all exports in this chunk before scheduling the next
222+
// Wait for all exports in this chunk before scheduling the next.
223+
// IMPORTANT: only catch TaskFailedException — never a broad Exception. The SDK's
224+
// OrchestratorBlockedException / ContinueAsNewInterruption are control-flow signals
225+
// that MUST propagate so the runtime can suspend/continue the orchestration. Swallowing
226+
// them produces non-deterministic replay (e.g. fabricated failures + spurious timers in
227+
// history that don't match the replayed action sequence).
217228
for (int i = 0; i < exportTasks.size(); i++) {
218229
try {
219230
results.add(exportTasks.get(i).await());
220-
} catch (Exception ex) {
231+
} catch (TaskFailedException ex) {
221232
// Activity failure after all retries — preserve the instance ID for diagnostics
222233
results.add(new ExportResult(chunk.get(i), false, ex.getMessage()));
223234
}

exporthistory/src/test/java/com/microsoft/durabletask/exporthistory/orchestrations/ExportJobOrchestratorChunkingTest.java

Lines changed: 73 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -3,8 +3,11 @@
33
package com.microsoft.durabletask.exporthistory.orchestrations;
44

55
import com.microsoft.durabletask.Task;
6+
import com.microsoft.durabletask.TaskFailedException;
67
import com.microsoft.durabletask.TaskOptions;
78
import com.microsoft.durabletask.TaskOrchestrationContext;
9+
import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
10+
import com.microsoft.durabletask.interruption.OrchestratorBlockedException;
811
import com.microsoft.durabletask.exporthistory.models.ExportDestination;
912
import com.microsoft.durabletask.exporthistory.models.ExportFilter;
1013
import com.microsoft.durabletask.exporthistory.models.ExportFormat;
@@ -156,7 +159,9 @@ void exportBatch_activityFailure_capturesFailureWithInstanceId() {
156159
@SuppressWarnings("unchecked")
157160
Task<ExportResult> task = mock(Task.class);
158161
if ("fail-me".equals(req.getInstanceId())) {
159-
when(task.await()).thenThrow(new RuntimeException("activity exhausted retries"));
162+
TaskFailedException tfe = mock(TaskFailedException.class);
163+
when(tfe.getMessage()).thenReturn("activity exhausted retries");
164+
when(task.await()).thenThrow(tfe);
160165
} else {
161166
when(task.await()).thenReturn(new ExportResult(req.getInstanceId(), true, null));
162167
}
@@ -185,4 +190,71 @@ void exportBatch_activityFailure_capturesFailureWithInstanceId() {
185190
assertTrue(results.get(1).getError().contains("activity exhausted retries"));
186191
assertTrue(results.get(2).isSuccess());
187192
}
193+
194+
/**
195+
* A broad {@code catch (Exception)} around {@code Task.await()} would swallow
196+
* {@link OrchestratorBlockedException}, fabricating a "failed" {@link ExportResult} on the
197+
* first orchestrator turn and producing non-deterministic replay. The catch must be narrowed
198+
* to {@link TaskFailedException} so SDK control-flow signals propagate.
199+
*/
200+
@Test
201+
void exportBatch_orchestratorBlockedException_propagates() {
202+
TaskOrchestrationContext ctx = mock(TaskOrchestrationContext.class);
203+
when(ctx.callActivity(eq("ExportInstanceHistoryActivity"), any(ExportRequest.class),
204+
any(TaskOptions.class), eq(ExportResult.class)))
205+
.thenAnswer((InvocationOnMock inv) -> {
206+
@SuppressWarnings("unchecked")
207+
Task<ExportResult> task = mock(Task.class);
208+
when(task.await()).thenThrow(
209+
new OrchestratorBlockedException("awaiting incomplete task"));
210+
return task;
211+
});
212+
213+
List<String> instanceIds = new ArrayList<>();
214+
instanceIds.add("i-1");
215+
instanceIds.add("i-2");
216+
217+
ExportJobConfiguration config = new ExportJobConfiguration(
218+
ExportMode.BATCH,
219+
new ExportFilter(),
220+
new ExportDestination("test-container", null),
221+
ExportFormat.DEFAULT,
222+
32,
223+
100);
224+
225+
assertThrows(OrchestratorBlockedException.class,
226+
() -> new ExportJobOrchestrator().exportBatch(ctx, instanceIds, config),
227+
"OrchestratorBlockedException must propagate so the runtime can suspend");
228+
}
229+
230+
/**
231+
* Companion to the OrchestratorBlockedException test — {@link ContinueAsNewInterruption}
232+
* is the other SDK control-flow signal that must never be swallowed by user code.
233+
*/
234+
@Test
235+
void exportBatch_continueAsNewInterruption_propagates() {
236+
TaskOrchestrationContext ctx = mock(TaskOrchestrationContext.class);
237+
when(ctx.callActivity(eq("ExportInstanceHistoryActivity"), any(ExportRequest.class),
238+
any(TaskOptions.class), eq(ExportResult.class)))
239+
.thenAnswer((InvocationOnMock inv) -> {
240+
@SuppressWarnings("unchecked")
241+
Task<ExportResult> task = mock(Task.class);
242+
when(task.await()).thenThrow(new ContinueAsNewInterruption("continue-as-new"));
243+
return task;
244+
});
245+
246+
List<String> instanceIds = new ArrayList<>();
247+
instanceIds.add("i-1");
248+
249+
ExportJobConfiguration config = new ExportJobConfiguration(
250+
ExportMode.BATCH,
251+
new ExportFilter(),
252+
new ExportDestination("test-container", null),
253+
ExportFormat.DEFAULT,
254+
32,
255+
100);
256+
257+
assertThrows(ContinueAsNewInterruption.class,
258+
() -> new ExportJobOrchestrator().exportBatch(ctx, instanceIds, config));
259+
}
188260
}

0 commit comments

Comments
 (0)