Skip to content

Commit 8a8b217

Browse files
committed
addressed pr feedback
1 parent 35af38c commit 8a8b217

File tree

11 files changed

+150
-50
lines changed

11 files changed

+150
-50
lines changed

azure-blob-payloads/src/main/java/com/microsoft/durabletask/azureblobpayloads/BlobPayloadStore.java

Lines changed: 26 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,10 @@
66
import com.azure.storage.blob.BlobContainerClient;
77
import com.azure.storage.blob.BlobServiceClient;
88
import com.azure.storage.blob.BlobServiceClientBuilder;
9+
import com.azure.storage.blob.models.BlobDownloadContentResponse;
910
import com.azure.storage.blob.models.BlobHttpHeaders;
1011
import com.azure.storage.blob.models.BlobStorageException;
12+
import com.azure.storage.blob.options.BlobParallelUploadOptions;
1113
import com.microsoft.durabletask.PayloadStore;
1214

1315
import java.io.ByteArrayInputStream;
@@ -43,6 +45,7 @@ public final class BlobPayloadStore implements PayloadStore {
4345
private final String blobPrefix;
4446
private final String containerName;
4547
private final boolean compressPayloads;
48+
private volatile boolean containerEnsured;
4649

4750
/**
4851
* Creates a new BlobPayloadStore with the given options.
@@ -72,8 +75,6 @@ public BlobPayloadStore(BlobPayloadStoreOptions options) {
7275
this.blobPrefix = options.getBlobPrefix();
7376
this.containerName = options.getContainerName();
7477
this.compressPayloads = options.isCompressPayloads();
75-
76-
ensureContainerExists();
7778
}
7879

7980
@Override
@@ -82,6 +83,8 @@ public String upload(String payload) {
8283
throw new IllegalArgumentException("payload must not be null");
8384
}
8485

86+
ensureContainerExists();
87+
8588
String blobName = this.blobPrefix + UUID.randomUUID().toString().replace("-", "") + BLOB_EXTENSION;
8689
BlobClient blobClient = this.containerClient.getBlobClient(blobName);
8790

@@ -90,8 +93,11 @@ public String upload(String payload) {
9093

9194
if (this.compressPayloads) {
9295
data = gzipCompress(rawData);
93-
blobClient.upload(new ByteArrayInputStream(data), data.length, true);
94-
blobClient.setHttpHeaders(new BlobHttpHeaders().setContentEncoding(GZIP_CONTENT_ENCODING));
96+
BlobHttpHeaders headers = new BlobHttpHeaders().setContentEncoding(GZIP_CONTENT_ENCODING);
97+
BlobParallelUploadOptions uploadOptions = new BlobParallelUploadOptions(
98+
new ByteArrayInputStream(data), data.length)
99+
.setHeaders(headers);
100+
blobClient.uploadWithResponse(uploadOptions, null, null);
95101
} else {
96102
data = rawData;
97103
blobClient.upload(new ByteArrayInputStream(data), data.length, true);
@@ -112,13 +118,9 @@ public String download(String token) {
112118
String blobName = extractBlobName(token);
113119
BlobClient blobClient = this.containerClient.getBlobClient(blobName);
114120

115-
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
116-
blobClient.downloadStream(outputStream);
117-
118-
byte[] rawBytes = outputStream.toByteArray();
119-
120-
// Check if the blob was compressed by inspecting content encoding
121-
String contentEncoding = blobClient.getProperties().getContentEncoding();
121+
BlobDownloadContentResponse response = blobClient.downloadContentWithResponse(null, null, null, null);
122+
byte[] rawBytes = response.getValue().toBytes();
123+
String contentEncoding = response.getDeserializedHeaders().getContentEncoding();
122124
if (GZIP_CONTENT_ENCODING.equalsIgnoreCase(contentEncoding)) {
123125
rawBytes = gzipDecompress(rawBytes);
124126
}
@@ -137,21 +139,27 @@ public boolean isKnownPayloadToken(String value) {
137139
}
138140

139141
private void ensureContainerExists() {
142+
if (this.containerEnsured) {
143+
return;
144+
}
140145
try {
141146
if (!this.containerClient.exists()) {
142147
this.containerClient.create();
143148
logger.info(() -> String.format("Created blob container: %s", this.containerClient.getBlobContainerName()));
144149
}
150+
this.containerEnsured = true;
145151
} catch (BlobStorageException e) {
146152
// Container might have been created concurrently (409 Conflict)
147153
if (e.getStatusCode() != 409) {
148154
throw e;
149155
}
156+
this.containerEnsured = true;
150157
}
151158
}
152159

153160
/**
154-
* Extracts the blob name from a {@code blob:v1:<container>:<blobName>} token.
161+
* Extracts the blob name from a {@code blob:v1:<container>:<blobName>} token
162+
* and validates that the container matches the configured container.
155163
*/
156164
private String extractBlobName(String token) {
157165
if (!token.startsWith(TOKEN_PREFIX)) {
@@ -165,6 +173,12 @@ private String extractBlobName(String token) {
165173
throw new IllegalArgumentException(
166174
"Token does not have the expected format (blob:v1:<container>:<blobName>): " + token);
167175
}
176+
String tokenContainer = remainder.substring(0, colonIndex);
177+
if (!this.containerName.equals(tokenContainer)) {
178+
throw new IllegalArgumentException(String.format(
179+
"Token container '%s' does not match configured container '%s'",
180+
tokenContainer, this.containerName));
181+
}
168182
return remainder.substring(colonIndex + 1);
169183
}
170184

Lines changed: 41 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,41 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.microsoft.durabletask.azureblobpayloads;
4+
5+
import com.microsoft.durabletask.PayloadStore;
6+
import com.microsoft.durabletask.PayloadStoreProvider;
7+
8+
import java.util.logging.Level;
9+
import java.util.logging.Logger;
10+
11+
/**
12+
* {@link PayloadStoreProvider} implementation that creates a {@link BlobPayloadStore}
13+
* when the {@code DURABLETASK_LARGE_PAYLOADS_CONNECTION_STRING} environment variable is set.
14+
* <p>
15+
* This provider is discovered automatically via {@link java.util.ServiceLoader}.
16+
*/
17+
public final class BlobPayloadStoreProvider implements PayloadStoreProvider {
18+
19+
private static final Logger logger = Logger.getLogger(BlobPayloadStoreProvider.class.getName());
20+
private static final String ENV_STORAGE_CONNECTION_STRING = "DURABLETASK_LARGE_PAYLOADS_CONNECTION_STRING";
21+
22+
@Override
23+
public PayloadStore create() {
24+
String connectionString = System.getenv(ENV_STORAGE_CONNECTION_STRING);
25+
if (connectionString == null || connectionString.isEmpty()) {
26+
return null;
27+
}
28+
29+
try {
30+
BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder()
31+
.setConnectionString(connectionString)
32+
.build();
33+
logger.info("Large payload externalization enabled using Azure Blob Storage");
34+
return new BlobPayloadStore(options);
35+
} catch (Exception e) {
36+
logger.log(Level.WARNING,
37+
"Failed to initialize BlobPayloadStore; large payloads will not be externalized", e);
38+
return null;
39+
}
40+
}
41+
}
Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
com.microsoft.durabletask.azureblobpayloads.BlobPayloadStoreProvider

azurefunctions/build.gradle

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,6 @@ repositories {
3535

3636
dependencies {
3737
api project(':client')
38-
implementation project(':azure-blob-payloads')
3938
implementation group: 'com.microsoft.azure.functions', name: 'azure-functions-java-library', version: '3.2.3'
4039
implementation "com.google.protobuf:protobuf-java:${protocVersion}"
4140
compileOnly "com.microsoft.azure.functions:azure-functions-java-spi:1.1.0"

azurefunctions/src/main/java/com/microsoft/durabletask/azurefunctions/internal/middleware/OrchestrationMiddleware.java

Lines changed: 15 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -13,12 +13,12 @@
1313
import com.microsoft.durabletask.DataConverter;
1414
import com.microsoft.durabletask.OrchestrationRunner;
1515
import com.microsoft.durabletask.PayloadStore;
16+
import com.microsoft.durabletask.PayloadStoreProvider;
1617
import com.microsoft.durabletask.TaskFailedException;
17-
import com.microsoft.durabletask.azureblobpayloads.BlobPayloadStore;
18-
import com.microsoft.durabletask.azureblobpayloads.BlobPayloadStoreOptions;
1918
import com.microsoft.durabletask.interruption.ContinueAsNewInterruption;
2019
import com.microsoft.durabletask.interruption.OrchestratorBlockedException;
2120

21+
import java.util.ServiceLoader;
2222
import java.util.logging.Level;
2323
import java.util.logging.Logger;
2424

@@ -33,16 +33,6 @@ public class OrchestrationMiddleware implements Middleware {
3333
private static final String ORCHESTRATION_TRIGGER = "DurableOrchestrationTrigger";
3434
private static final Logger logger = Logger.getLogger(OrchestrationMiddleware.class.getName());
3535

36-
/**
37-
* Environment variable for the Azure Storage connection string used for large payload externalization.
38-
*/
39-
private static final String ENV_STORAGE_CONNECTION_STRING = "DURABLETASK_STORAGE_CONNECTION_STRING";
40-
41-
/**
42-
* Fallback environment variable (standard Azure Functions storage connection).
43-
*/
44-
private static final String ENV_AZURE_WEB_JOBS_STORAGE = "AzureWebJobsStorage";
45-
4636
private final PayloadStore payloadStore;
4737

4838
public OrchestrationMiddleware() {
@@ -98,27 +88,19 @@ public void invoke(MiddlewareContext context, MiddlewareChain chain) throws Exce
9888
}
9989

10090
private static PayloadStore initializePayloadStore() {
101-
// Check for explicit large-payload storage connection string
102-
String connectionString = System.getenv(ENV_STORAGE_CONNECTION_STRING);
103-
if (connectionString == null || connectionString.isEmpty()) {
104-
// Fall back to standard Azure Functions storage connection
105-
connectionString = System.getenv(ENV_AZURE_WEB_JOBS_STORAGE);
106-
}
107-
108-
if (connectionString == null || connectionString.isEmpty()) {
109-
logger.fine("No storage connection string configured for large payload externalization");
110-
return null;
111-
}
112-
113-
try {
114-
BlobPayloadStoreOptions options = new BlobPayloadStoreOptions.Builder()
115-
.setConnectionString(connectionString)
116-
.build();
117-
logger.info("Large payload externalization enabled using Azure Blob Storage");
118-
return new BlobPayloadStore(options);
119-
} catch (Exception e) {
120-
logger.log(Level.WARNING, "Failed to initialize BlobPayloadStore; large payloads will not be externalized", e);
121-
return null;
91+
ServiceLoader<PayloadStoreProvider> loader = ServiceLoader.load(PayloadStoreProvider.class);
92+
for (PayloadStoreProvider provider : loader) {
93+
try {
94+
PayloadStore store = provider.create();
95+
if (store != null) {
96+
return store;
97+
}
98+
} catch (Exception e) {
99+
logger.log(Level.WARNING,
100+
"PayloadStoreProvider " + provider.getClass().getName() + " failed to create store", e);
101+
}
122102
}
103+
logger.fine("No PayloadStoreProvider found or configured; large payload externalization is disabled");
104+
return null;
123105
}
124106
}

client/src/main/java/com/microsoft/durabletask/DurableTaskGrpcWorker.java

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -311,7 +311,7 @@ public void startAndBlock() {
311311
response = externalizeOrchestratorResponsePayloads(response);
312312
}
313313
sendOrchestratorResponse(response);
314-
} catch (IllegalArgumentException | IllegalStateException e) {
314+
} catch (PayloadTooLargeException e) {
315315
logger.log(Level.WARNING,
316316
"Failed to send orchestrator response for instance '" +
317317
orchestratorRequest.getInstanceId() + "': " + e.getMessage(), e);
@@ -320,6 +320,7 @@ public void startAndBlock() {
320320
.setFailureDetails(TaskFailureDetails.newBuilder()
321321
.setErrorType(e.getClass().getName())
322322
.setErrorMessage(e.getMessage())
323+
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
323324
.build())
324325
.build();
325326
OrchestratorResponse failResponse = OrchestratorResponse.newBuilder()

client/src/main/java/com/microsoft/durabletask/PayloadHelper.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,12 @@ String maybeExternalize(String value) {
4848
return value;
4949
}
5050

51+
// Fast path: if char count is below threshold, byte count is too
52+
// (each Java char encodes to 1-3 UTF-8 bytes, so length() <= UTF-8 byte length)
53+
if (value.length() <= this.options.getThresholdBytes()) {
54+
return value;
55+
}
56+
5157
int byteSize = value.getBytes(StandardCharsets.UTF_8).length;
5258

5359
// (2) below-threshold guard
@@ -57,7 +63,7 @@ String maybeExternalize(String value) {
5763

5864
// (3) above-max-cap rejection
5965
if (byteSize > this.options.getMaxExternalizedPayloadBytes()) {
60-
throw new IllegalArgumentException(String.format(
66+
throw new PayloadTooLargeException(String.format(
6167
"Payload size %d KB exceeds maximum of %d KB. " +
6268
"Reduce the payload size or increase maxExternalizedPayloadBytes.",
6369
byteSize / 1024,

client/src/main/java/com/microsoft/durabletask/PayloadInterceptionHelper.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -185,7 +185,7 @@ interface HistoryEventUpdater {
185185
private static HistoryEvent resolveStringValueField(HistoryEvent event, StringValue field,
186186
PayloadHelper payloadHelper,
187187
HistoryEventUpdater updater) {
188-
if (!field.isInitialized() || field.getValue().isEmpty()) {
188+
if (field.getValue().isEmpty()) {
189189
return event;
190190
}
191191
String resolved = payloadHelper.maybeResolve(field.getValue());
@@ -314,7 +314,7 @@ private static HistoryEvent externalizeHistoryEvent(HistoryEvent event, PayloadH
314314
private static HistoryEvent externalizeStringValueField(HistoryEvent event, StringValue field,
315315
PayloadHelper payloadHelper,
316316
HistoryEventUpdater updater) {
317-
if (!field.isInitialized() || field.getValue().isEmpty()) {
317+
if (field.getValue().isEmpty()) {
318318
return event;
319319
}
320320
String externalized = payloadHelper.maybeExternalize(field.getValue());

client/src/main/java/com/microsoft/durabletask/PayloadStore.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,12 @@
1313
* The store implementation is solely responsible for generating blob names/keys and
1414
* defining the token format. The core framework treats tokens as opaque strings and
1515
* delegates token recognition to {@link #isKnownPayloadToken(String)}.
16+
* <p>
17+
* <b>Payload retention:</b> This interface does not define a deletion mechanism.
18+
* Externalized payloads persist until removed by external means. When using Azure
19+
* Blob Storage, configure
20+
* <a href="https://learn.microsoft.com/azure/storage/blobs/lifecycle-management-overview">
21+
* lifecycle management policies</a> to automatically expire old payloads.
1622
*
1723
* @see LargePayloadOptions
1824
*/
Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,30 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.microsoft.durabletask;
4+
5+
/**
6+
* Service provider interface for discovering {@link PayloadStore} implementations at runtime.
7+
* <p>
8+
* Implementations are discovered via {@link java.util.ServiceLoader}. To register a provider,
9+
* create a file {@code META-INF/services/com.microsoft.durabletask.PayloadStoreProvider}
10+
* containing the fully qualified class name of the implementation.
11+
* <p>
12+
* The provider is responsible for reading its own configuration (e.g., environment variables)
13+
* and determining whether it can create a functional {@link PayloadStore}.
14+
*
15+
* @see PayloadStore
16+
*/
17+
public interface PayloadStoreProvider {
18+
19+
/**
20+
* Attempts to create a {@link PayloadStore} based on available configuration.
21+
* <p>
22+
* Implementations should inspect environment variables or other configuration sources
23+
* to determine if they can provide a store. If the required configuration is not present,
24+
* this method should return {@code null} rather than throwing an exception.
25+
*
26+
* @return a configured {@link PayloadStore}, or {@code null} if the required configuration
27+
* is not available
28+
*/
29+
PayloadStore create();
30+
}

0 commit comments

Comments
 (0)