Skip to content

Commit 5eb3ac7

Browse files
committed
Add tests for large payload externalization and fix review issues
1 parent 264d083 commit 5eb3ac7

5 files changed

Lines changed: 86 additions & 51 deletions

File tree

azure-blob-payloads/build.gradle

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -72,7 +72,7 @@ test {
7272
}
7373
}
7474

75-
// Integration tests require DTS emulator on localhost:4001 and Azurite on localhost:10000.
75+
// Integration tests require DTS emulator (default localhost:8080) and Azurite on localhost:10000.
7676
task integrationTest(type: Test) {
7777
useJUnitPlatform {
7878
includeTags 'integration'

azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStore.java

Lines changed: 30 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -2,23 +2,25 @@
22
// Licensed under the MIT License.
33
package com.microsoft.durabletask.azureblobpayloads;
44

5-
import com.azure.core.credential.TokenCredential;
6-
import com.azure.core.http.rest.Response;
75
import com.azure.core.util.Context;
86
import com.azure.storage.blob.BlobClient;
97
import com.azure.storage.blob.BlobContainerClient;
108
import com.azure.storage.blob.BlobServiceClient;
119
import com.azure.storage.blob.BlobServiceClientBuilder;
10+
import com.azure.storage.blob.models.BlobDownloadResponse;
1211
import com.azure.storage.blob.models.BlobHttpHeaders;
12+
import com.azure.storage.blob.models.BlobRange;
1313
import com.azure.storage.blob.models.BlobStorageException;
14-
import com.azure.storage.blob.models.PublicAccessType;
14+
import com.azure.storage.blob.options.BlobDownloadToFileOptions;
15+
import com.azure.storage.common.policy.RequestRetryOptions;
16+
import com.azure.storage.common.policy.RetryPolicyType;
1517

1618
import java.io.ByteArrayInputStream;
1719
import java.io.ByteArrayOutputStream;
1820
import java.io.IOException;
1921
import java.io.InputStream;
20-
import java.io.OutputStream;
2122
import java.nio.charset.StandardCharsets;
23+
import java.time.Duration;
2224
import java.util.UUID;
2325
import java.util.zip.GZIPInputStream;
2426
import java.util.zip.GZIPOutputStream;
@@ -62,15 +64,27 @@ public BlobPayloadStore(LargePayloadStorageOptions options) {
6264
"Either ConnectionString or AccountUri and Credential must be provided.");
6365
}
6466

67+
// Retry policy: exponential (8 retries, 250ms base, 10s max, 2min network timeout)
68+
// Matches the .NET BlobPayloadStore retry configuration.
69+
RequestRetryOptions retryOptions = new RequestRetryOptions(
70+
RetryPolicyType.EXPONENTIAL,
71+
8, // maxTries
72+
120, // tryTimeoutInSeconds (2 min network timeout)
73+
250L, // retryDelayInMs (250ms base)
74+
10_000L, // maxRetryDelayInMs (10s max)
75+
null); // secondaryHost
76+
6577
BlobServiceClient serviceClient;
6678
if (hasIdentityAuth) {
6779
serviceClient = new BlobServiceClientBuilder()
6880
.endpoint(options.getAccountUri().toString())
6981
.credential(options.getCredential())
82+
.retryOptions(retryOptions)
7083
.buildClient();
7184
} else {
7285
serviceClient = new BlobServiceClientBuilder()
7386
.connectionString(options.getConnectionString())
87+
.retryOptions(retryOptions)
7488
.buildClient();
7589
}
7690

@@ -150,11 +164,20 @@ public String download(String token) {
150164

151165
try {
152166
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
153-
blob.downloadStream(outputStream);
167+
// Use downloadStreamWithResponse to get content-encoding header in the same call,
168+
// avoiding a separate getProperties() round-trip.
169+
BlobDownloadResponse downloadResponse = blob.downloadStreamWithResponse(
170+
outputStream,
171+
null, // range (full blob)
172+
null, // options
173+
null, // requestConditions
174+
false, // getMD5
175+
null, // timeout
176+
Context.NONE);
154177
byte[] rawBytes = outputStream.toByteArray();
155178

156-
// Check if the content is gzip-compressed by trying to read the gzip header
157-
String contentEncoding = blob.getProperties().getContentEncoding();
179+
// Check if the content is gzip-compressed via the response header
180+
String contentEncoding = downloadResponse.getDeserializedHeaders().getContentEncoding();
158181
boolean isGzip = CONTENT_ENCODING_GZIP.equalsIgnoreCase(contentEncoding);
159182

160183
if (isGzip) {

azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/LargePayloadInterceptor.java

Lines changed: 22 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@
88

99
import io.grpc.*;
1010

11-
import java.nio.charset.StandardCharsets;
1211
import java.util.logging.Level;
1312
import java.util.logging.Logger;
1413

@@ -712,7 +711,7 @@ private StringValue maybeExternalize(StringValue value) {
712711
}
713712

714713
String strValue = value.getValue();
715-
int size = strValue.getBytes(StandardCharsets.UTF_8).length;
714+
int size = utf8ByteLength(strValue);
716715

717716
if (size < this.options.getThresholdBytes()) {
718717
return value;
@@ -766,4 +765,25 @@ private static String getStackTraceString(Throwable t) {
766765
t.printStackTrace(new java.io.PrintWriter(sw));
767766
return sw.toString();
768767
}
768+
769+
/**
770+
* Computes the UTF-8 encoded byte length of a string without allocating a byte array.
771+
*/
772+
private static int utf8ByteLength(String s) {
773+
int count = 0;
774+
for (int i = 0; i < s.length(); i++) {
775+
char ch = s.charAt(i);
776+
if (ch <= 0x7F) {
777+
count++;
778+
} else if (ch <= 0x7FF) {
779+
count += 2;
780+
} else if (Character.isHighSurrogate(ch)) {
781+
count += 4;
782+
i++; // skip low surrogate
783+
} else {
784+
count += 3;
785+
}
786+
}
787+
return count;
788+
}
769789
}

azure-blob-payloads/src/test/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStoreTest.java

Lines changed: 31 additions & 39 deletions
Original file line numberDiff line numberDiff line change
@@ -4,8 +4,9 @@
44

55
import com.azure.storage.blob.BlobClient;
66
import com.azure.storage.blob.BlobContainerClient;
7+
import com.azure.storage.blob.models.BlobDownloadHeaders;
8+
import com.azure.storage.blob.models.BlobDownloadResponse;
79
import com.azure.storage.blob.models.BlobHttpHeaders;
8-
import com.azure.storage.blob.models.BlobProperties;
910
import com.azure.storage.blob.models.BlobStorageException;
1011

1112
import org.junit.jupiter.api.BeforeEach;
@@ -121,15 +122,7 @@ void download_uncompressedBlob_returnsOriginalPayload() {
121122
String expectedPayload = "Hello, uncompressed world!";
122123
byte[] payloadBytes = expectedPayload.getBytes(StandardCharsets.UTF_8);
123124

124-
doAnswer(inv -> {
125-
OutputStream out = inv.getArgument(0);
126-
out.write(payloadBytes);
127-
return null;
128-
}).when(mockBlobClient).downloadStream(any(OutputStream.class));
129-
130-
BlobProperties properties = mock(BlobProperties.class);
131-
when(properties.getContentEncoding()).thenReturn(null);
132-
when(mockBlobClient.getProperties()).thenReturn(properties);
125+
mockDownloadResponse(payloadBytes, null);
133126

134127
BlobPayloadStore store = new BlobPayloadStore(mockContainerClient, options);
135128
String token = BlobPayloadStore.encodeToken("durabletask-payloads", "testblob");
@@ -150,15 +143,7 @@ void download_gzipCompressedBlob_decompressesCorrectly() throws Exception {
150143
}
151144
byte[] compressedBytes = compressedBuffer.toByteArray();
152145

153-
doAnswer(inv -> {
154-
OutputStream out = inv.getArgument(0);
155-
out.write(compressedBytes);
156-
return null;
157-
}).when(mockBlobClient).downloadStream(any(OutputStream.class));
158-
159-
BlobProperties properties = mock(BlobProperties.class);
160-
when(properties.getContentEncoding()).thenReturn("gzip");
161-
when(mockBlobClient.getProperties()).thenReturn(properties);
146+
mockDownloadResponse(compressedBytes, "gzip");
162147

163148
BlobPayloadStore store = new BlobPayloadStore(mockContainerClient, options);
164149
String token = BlobPayloadStore.encodeToken("durabletask-payloads", "compressed-blob");
@@ -172,7 +157,8 @@ void download_gzipCompressedBlob_decompressesCorrectly() throws Exception {
172157
void download_blobNotFound_throwsPayloadStorageException() {
173158
BlobStorageException notFound = mock(BlobStorageException.class);
174159
when(notFound.getStatusCode()).thenReturn(404);
175-
doThrow(notFound).when(mockBlobClient).downloadStream(any(OutputStream.class));
160+
doThrow(notFound).when(mockBlobClient).downloadStreamWithResponse(
161+
any(OutputStream.class), isNull(), isNull(), isNull(), eq(false), isNull(), any());
176162

177163
BlobPayloadStore store = new BlobPayloadStore(mockContainerClient, options);
178164
String token = BlobPayloadStore.encodeToken("durabletask-payloads", "missing-blob");
@@ -194,7 +180,8 @@ void download_nonMatchingContainer_throwsIllegalArgument() {
194180
void download_blobStorageError_throwsPayloadStorageException() {
195181
BlobStorageException serverError = mock(BlobStorageException.class);
196182
when(serverError.getStatusCode()).thenReturn(500);
197-
doThrow(serverError).when(mockBlobClient).downloadStream(any(OutputStream.class));
183+
doThrow(serverError).when(mockBlobClient).downloadStreamWithResponse(
184+
any(OutputStream.class), isNull(), isNull(), isNull(), eq(false), isNull(), any());
198185

199186
BlobPayloadStore store = new BlobPayloadStore(mockContainerClient, options);
200187
String token = BlobPayloadStore.encodeToken("durabletask-payloads", "error-blob");
@@ -228,15 +215,7 @@ void upload_download_roundTrip_withCompression() throws Exception {
228215
String token = store.upload(originalPayload);
229216

230217
// Now set up download to return the captured bytes
231-
doAnswer(inv -> {
232-
OutputStream out = inv.getArgument(0);
233-
out.write(capturedBytes[0]);
234-
return null;
235-
}).when(mockBlobClient).downloadStream(any(OutputStream.class));
236-
237-
BlobProperties properties = mock(BlobProperties.class);
238-
when(properties.getContentEncoding()).thenReturn("gzip");
239-
when(mockBlobClient.getProperties()).thenReturn(properties);
218+
mockDownloadResponse(capturedBytes[0], "gzip");
240219

241220
// Need to map the token's blob name back to our mock
242221
String[] decoded = BlobPayloadStore.decodeToken(token);
@@ -269,15 +248,7 @@ void upload_download_roundTrip_withoutCompression() throws Exception {
269248
String token = store.upload(originalPayload);
270249

271250
// Set up download
272-
doAnswer(inv -> {
273-
OutputStream out = inv.getArgument(0);
274-
out.write(capturedBytes[0]);
275-
return null;
276-
}).when(mockBlobClient).downloadStream(any(OutputStream.class));
277-
278-
BlobProperties properties = mock(BlobProperties.class);
279-
when(properties.getContentEncoding()).thenReturn(null);
280-
when(mockBlobClient.getProperties()).thenReturn(properties);
251+
mockDownloadResponse(capturedBytes[0], null);
281252

282253
String[] decoded = BlobPayloadStore.decodeToken(token);
283254
when(mockContainerClient.getBlobClient(decoded[1])).thenReturn(mockBlobClient);
@@ -340,4 +311,25 @@ void upload_compressionProducesGzipHeaders() {
340311
BlobHttpHeaders capturedHeaders = headersCaptor.getValue();
341312
assertEquals("gzip", capturedHeaders.getContentEncoding());
342313
}
314+
315+
// ==================== Test helpers ====================
316+
317+
/**
318+
* Sets up the mockBlobClient to return a downloadStreamWithResponse that writes
319+
* the given bytes and returns headers with the specified content-encoding.
320+
*/
321+
private void mockDownloadResponse(byte[] content, String contentEncoding) {
322+
BlobDownloadHeaders downloadHeaders = mock(BlobDownloadHeaders.class);
323+
when(downloadHeaders.getContentEncoding()).thenReturn(contentEncoding);
324+
325+
BlobDownloadResponse downloadResponse = mock(BlobDownloadResponse.class);
326+
when(downloadResponse.getDeserializedHeaders()).thenReturn(downloadHeaders);
327+
328+
doAnswer(inv -> {
329+
OutputStream out = inv.getArgument(0);
330+
out.write(content);
331+
return downloadResponse;
332+
}).when(mockBlobClient).downloadStreamWithResponse(
333+
any(OutputStream.class), isNull(), isNull(), isNull(), eq(false), isNull(), any());
334+
}
343335
}

azure-blob-payloads/src/test/java/com/microsoft/durabletask/azureblobpayloads/LargePayloadIntegrationTest.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -24,8 +24,8 @@
2424
* <p>
2525
* These tests require:
2626
* <ul>
27-
* <li>DTS emulator running on localhost:4001:
28-
* {@code docker run --name durabletask-emulator -p 4001:8080 -d mcr.microsoft.com/dts/dts-emulator:latest}</li>
27+
* <li>DTS emulator running on localhost:8080:
28+
* {@code docker run --name durabletask-emulator -p 8080:8080 -d mcr.microsoft.com/dts/dts-emulator:latest}</li>
2929
* <li>Azurite running on localhost:10000:
3030
* {@code docker run --name azurite -p 10000:10000 -p 10001:10001 -p 10002:10002 -d mcr.microsoft.com/azure-storage/azurite}</li>
3131
* </ul>

0 commit comments

Comments
 (0)