Skip to content

Commit 575a295

Browse files
committed
more fixes
1 parent 5eb3ac7 commit 575a295

3 files changed

Lines changed: 46 additions & 24 deletions

File tree

azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStore.java

Lines changed: 0 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -9,9 +9,7 @@
99
import com.azure.storage.blob.BlobServiceClientBuilder;
1010
import com.azure.storage.blob.models.BlobDownloadResponse;
1111
import com.azure.storage.blob.models.BlobHttpHeaders;
12-
import com.azure.storage.blob.models.BlobRange;
1312
import com.azure.storage.blob.models.BlobStorageException;
14-
import com.azure.storage.blob.options.BlobDownloadToFileOptions;
1513
import com.azure.storage.common.policy.RequestRetryOptions;
1614
import com.azure.storage.common.policy.RetryPolicyType;
1715

@@ -20,7 +18,6 @@
2018
import java.io.IOException;
2119
import java.io.InputStream;
2220
import java.nio.charset.StandardCharsets;
23-
import java.time.Duration;
2421
import java.util.UUID;
2522
import java.util.zip.GZIPInputStream;
2623
import java.util.zip.GZIPOutputStream;

azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/LargePayloadInterceptor.java

Lines changed: 6 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@
88

99
import io.grpc.*;
1010

11+
import java.nio.charset.StandardCharsets;
1112
import java.util.logging.Level;
1213
import java.util.logging.Logger;
1314

@@ -767,23 +768,12 @@ private static String getStackTraceString(Throwable t) {
767768
}
768769

769770
/**
770-
* Computes the UTF-8 encoded byte length of a string without allocating a byte array.
771+
* Computes the UTF-8 encoded byte length of a string.
772+
* <p>
773+
* Uses Java's canonical UTF-8 encoding behavior so malformed surrogate
774+
* sequences are measured exactly the same way as payload serialization.
771775
*/
772776
private static int utf8ByteLength(String s) {
773-
int count = 0;
774-
for (int i = 0; i < s.length(); i++) {
775-
char ch = s.charAt(i);
776-
if (ch <= 0x7F) {
777-
count++;
778-
} else if (ch <= 0x7FF) {
779-
count += 2;
780-
} else if (Character.isHighSurrogate(ch)) {
781-
count += 4;
782-
i++; // skip low surrogate
783-
} else {
784-
count += 3;
785-
}
786-
}
787-
return count;
777+
return s.getBytes(StandardCharsets.UTF_8).length;
788778
}
789779
}

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java

Lines changed: 40 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -649,6 +649,7 @@ private void completeOrchestratorTaskWithChunking(OrchestratorResponse response)
649649
int chunkIndex = 0;
650650
boolean isPartial = true;
651651
boolean isChunkedMode = false;
652+
boolean hasTraceContext = response.hasOrchestrationTraceContext();
652653

653654
while (isPartial) {
654655
OrchestratorResponse.Builder chunk = OrchestratorResponse.newBuilder()
@@ -657,15 +658,23 @@ private void completeOrchestratorTaskWithChunking(OrchestratorResponse response)
657658
.setCompletionToken(response.getCompletionToken())
658659
.setRequiresHistory(response.getRequiresHistory());
659660

660-
int chunkPayloadSize = 0;
661661
while (actionsCompleted < allActions.size()) {
662-
int actionSize = allActions.get(actionsCompleted).getSerializedSize();
662+
OrchestratorAction nextAction = allActions.get(actionsCompleted);
663+
664+
chunk.addActions(nextAction);
665+
666+
int estimatedSize = estimateChunkSerializedSize(
667+
chunk,
668+
chunkIndex,
669+
hasTraceContext,
670+
response.getOrchestrationTraceContext());
671+
663672
// Always accept the first action in an empty chunk to avoid infinite loops
664-
if (chunkPayloadSize + actionSize > maxChunkBytes && chunkPayloadSize > 0) {
673+
if (estimatedSize > maxChunkBytes && chunk.getActionsCount() > 1) {
674+
chunk.removeActions(chunk.getActionsCount() - 1);
665675
break;
666676
}
667-
chunk.addActions(allActions.get(actionsCompleted));
668-
chunkPayloadSize += actionSize;
677+
669678
actionsCompleted++;
670679
}
671680

@@ -696,6 +705,32 @@ private void completeOrchestratorTaskWithChunking(OrchestratorResponse response)
696705
}
697706
}
698707

708+
/**
709+
* Estimates the serialized size of a candidate chunk including envelope overhead fields
710+
* that may be added later in the chunking flow.
711+
*/
712+
private static int estimateChunkSerializedSize(
713+
OrchestratorResponse.Builder chunk,
714+
int chunkIndex,
715+
boolean hasTraceContext,
716+
OrchestrationTraceContext traceContext) {
717+
OrchestratorResponse.Builder estimate = chunk.clone();
718+
719+
// Include potential overhead fields to avoid under-estimating chunk size.
720+
estimate.setIsPartial(true);
721+
estimate.setChunkIndex(Int32Value.of(chunkIndex));
722+
723+
if (chunkIndex == 0) {
724+
if (hasTraceContext) {
725+
estimate.setOrchestrationTraceContext(traceContext);
726+
}
727+
} else {
728+
estimate.setNumEventsProcessed(Int32Value.of(0));
729+
}
730+
731+
return estimate.build().getSerializedSize();
732+
}
733+
699734
static WorkItemFilters toProtoWorkItemFilters(WorkItemFilter filter) {
700735
WorkItemFilters.Builder builder = WorkItemFilters.newBuilder();
701736
for (WorkItemFilter.OrchestrationFilter orch : filter.getOrchestrations()) {

0 commit comments

Comments
 (0)