Skip to content

Commit 264d083

Browse files
committed
initial commit
1 parent b047247 commit 264d083

25 files changed

Lines changed: 4241 additions & 3 deletions

azure-blob-payloads/build.gradle

Lines changed: 116 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,116 @@
1+
plugins {
2+
id 'java-library'
3+
id 'maven-publish'
4+
id 'signing'
5+
id 'com.github.spotbugs' version '6.4.8'
6+
}
7+
8+
group 'com.microsoft'
9+
version = '1.8.0'
10+
archivesBaseName = 'durabletask-azure-blob-payloads'
11+
12+
def grpcVersion = '1.78.0'
13+
def azureCoreVersion = '1.57.1'
14+
def azureStorageBlobVersion = '12.29.1'
15+
16+
// Java 11 is used to compile and run all tests. Set the JDK_11 env var to your
17+
// local JDK 11 home directory, e.g. C:/Program Files/Java/openjdk-11.0.12_7/
18+
// If unset, falls back to the current JDK running Gradle.
19+
def rawJdkPath = System.env.JDK_11 ?: System.getProperty("java.home")
20+
def PATH_TO_TEST_JAVA_RUNTIME = rawJdkPath
21+
if (rawJdkPath != null) {
22+
def f = new File(rawJdkPath)
23+
if (f.isFile()) {
24+
PATH_TO_TEST_JAVA_RUNTIME = f.parentFile.parentFile.absolutePath
25+
}
26+
}
27+
def isWindows = System.getProperty("os.name").toLowerCase().contains("win")
28+
def exeSuffix = isWindows ? ".exe" : ""
29+
30+
dependencies {
31+
api project(':client')
32+
33+
// Azure Storage Blobs
34+
implementation "com.azure:azure-storage-blob:${azureStorageBlobVersion}"
35+
36+
// TokenCredential abstraction (from azure-core)
37+
implementation "com.azure:azure-core:${azureCoreVersion}"
38+
39+
// gRPC interceptor API
40+
implementation "io.grpc:grpc-api:${grpcVersion}"
41+
implementation "io.grpc:grpc-protobuf:${grpcVersion}"
42+
implementation "io.grpc:grpc-stub:${grpcVersion}"
43+
44+
// NOTE: azure-identity is NOT included here. Users who need
45+
// DefaultAzureCredential should add it to their own project.
46+
47+
testImplementation 'org.mockito:mockito-core:5.21.0'
48+
testImplementation 'org.mockito:mockito-junit-jupiter:5.21.0'
49+
testImplementation project(':azuremanaged')
50+
}
51+
52+
compileJava {
53+
sourceCompatibility = JavaVersion.VERSION_1_8
54+
targetCompatibility = JavaVersion.VERSION_1_8
55+
}
56+
compileTestJava {
57+
sourceCompatibility = JavaVersion.VERSION_11
58+
targetCompatibility = JavaVersion.VERSION_11
59+
options.fork = true
60+
options.forkOptions.executable = "${PATH_TO_TEST_JAVA_RUNTIME}/bin/javac${exeSuffix}"
61+
}
62+
63+
tasks.withType(Test) {
64+
executable = new File("${PATH_TO_TEST_JAVA_RUNTIME}", "bin/java${exeSuffix}")
65+
}
66+
67+
test {
68+
useJUnitPlatform {
69+
// Skip tests tagged as "integration" since those require
70+
// external dependencies (DTS emulator + Azurite).
71+
excludeTags "integration"
72+
}
73+
}
74+
75+
// Integration tests require DTS emulator on localhost:4001 and Azurite on localhost:10000.
76+
task integrationTest(type: Test) {
77+
useJUnitPlatform {
78+
includeTags 'integration'
79+
}
80+
dependsOn build
81+
shouldRunAfter test
82+
testLogging.showStandardStreams = true
83+
ignoreFailures = false
84+
}
85+
86+
spotbugs {
87+
toolVersion = '4.9.8'
88+
effort = com.github.spotbugs.snom.Effort.valueOf('MAX')
89+
reportLevel = com.github.spotbugs.snom.Confidence.valueOf('HIGH')
90+
ignoreFailures = true
91+
excludeFilter = file('spotbugs-exclude.xml')
92+
}
93+
94+
spotbugsMain {
95+
reports {
96+
html {
97+
required = true
98+
stylesheet = 'fancy-hist.xsl'
99+
}
100+
xml {
101+
required = true
102+
}
103+
}
104+
}
105+
106+
spotbugsTest {
107+
reports {
108+
html {
109+
required = true
110+
stylesheet = 'fancy-hist.xsl'
111+
}
112+
xml {
113+
required = true
114+
}
115+
}
116+
}
Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,17 @@
1+
<?xml version="1.0" encoding="UTF-8"?>
2+
<FindBugsFilter>
3+
<!-- Exclude test classes -->
4+
<Match>
5+
<Class name="~.*Test"/>
6+
</Match>
7+
8+
<!-- Exclude common false positives -->
9+
<Match>
10+
<BugPattern name="DM_CONVERT_CASE"/>
11+
</Match>
12+
13+
<!-- Exclude serialization related warnings -->
14+
<Match>
15+
<BugPattern name="SE_NO_SERIALVERSIONID"/>
16+
</Match>
17+
</FindBugsFilter>
Lines changed: 208 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,208 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.microsoft.durabletask.azureblobpayloads;
4+
5+
import com.azure.core.credential.TokenCredential;
6+
import com.azure.core.http.rest.Response;
7+
import com.azure.core.util.Context;
8+
import com.azure.storage.blob.BlobClient;
9+
import com.azure.storage.blob.BlobContainerClient;
10+
import com.azure.storage.blob.BlobServiceClient;
11+
import com.azure.storage.blob.BlobServiceClientBuilder;
12+
import com.azure.storage.blob.models.BlobHttpHeaders;
13+
import com.azure.storage.blob.models.BlobStorageException;
14+
import com.azure.storage.blob.models.PublicAccessType;
15+
16+
import java.io.ByteArrayInputStream;
17+
import java.io.ByteArrayOutputStream;
18+
import java.io.IOException;
19+
import java.io.InputStream;
20+
import java.io.OutputStream;
21+
import java.nio.charset.StandardCharsets;
22+
import java.util.UUID;
23+
import java.util.zip.GZIPInputStream;
24+
import java.util.zip.GZIPOutputStream;
25+
26+
/**
27+
* Azure Blob Storage implementation of {@link PayloadStore}.
28+
* <p>
29+
* Stores payloads as blobs and returns opaque tokens in the form {@code blob:v1:<container>:<blobName>}.
30+
* Supports optional gzip compression. The blob container is created automatically on first upload.
31+
*/
32+
public final class BlobPayloadStore extends PayloadStore {
33+
34+
static final String TOKEN_PREFIX = "blob:v1:";
35+
private static final String CONTENT_ENCODING_GZIP = "gzip";
36+
37+
private final BlobContainerClient containerClient;
38+
private final LargePayloadStorageOptions options;
39+
40+
/**
41+
* Creates a new {@code BlobPayloadStore} from the given options.
42+
*
43+
* @param options the storage options
44+
* @throws IllegalArgumentException if neither connection string nor account URI/credential are provided
45+
*/
46+
public BlobPayloadStore(LargePayloadStorageOptions options) {
47+
if (options == null) {
48+
throw new IllegalArgumentException("options must not be null.");
49+
}
50+
51+
String containerName = options.getContainerName();
52+
if (containerName == null || containerName.isEmpty()) {
53+
throw new IllegalArgumentException("Container name must not be null or empty.");
54+
}
55+
56+
boolean hasConnectionString = options.getConnectionString() != null
57+
&& !options.getConnectionString().isEmpty();
58+
boolean hasIdentityAuth = options.getAccountUri() != null && options.getCredential() != null;
59+
60+
if (!hasConnectionString && !hasIdentityAuth) {
61+
throw new IllegalArgumentException(
62+
"Either ConnectionString or AccountUri and Credential must be provided.");
63+
}
64+
65+
BlobServiceClient serviceClient;
66+
if (hasIdentityAuth) {
67+
serviceClient = new BlobServiceClientBuilder()
68+
.endpoint(options.getAccountUri().toString())
69+
.credential(options.getCredential())
70+
.buildClient();
71+
} else {
72+
serviceClient = new BlobServiceClientBuilder()
73+
.connectionString(options.getConnectionString())
74+
.buildClient();
75+
}
76+
77+
this.containerClient = serviceClient.getBlobContainerClient(containerName);
78+
this.options = options;
79+
}
80+
81+
/**
82+
* Package-private constructor for testing with an injected {@link BlobContainerClient}.
83+
*/
84+
BlobPayloadStore(BlobContainerClient containerClient, LargePayloadStorageOptions options) {
85+
this.containerClient = containerClient;
86+
this.options = options;
87+
}
88+
89+
@Override
90+
public String upload(String payload) {
91+
String blobName = UUID.randomUUID().toString().replace("-", "");
92+
BlobClient blob = this.containerClient.getBlobClient(blobName);
93+
94+
byte[] payloadBytes = payload.getBytes(StandardCharsets.UTF_8);
95+
96+
// Ensure container exists (idempotent)
97+
try {
98+
this.containerClient.createIfNotExists();
99+
} catch (BlobStorageException e) {
100+
// 409 Conflict means it already exists — safe to ignore
101+
if (e.getStatusCode() != 409) {
102+
throw new PayloadStorageException(
103+
"Failed to create blob container '" + this.containerClient.getBlobContainerName() + "'.", e);
104+
}
105+
}
106+
107+
try {
108+
if (this.options.isCompressionEnabled()) {
109+
ByteArrayOutputStream compressedBuffer = new ByteArrayOutputStream();
110+
try (GZIPOutputStream gzip = new GZIPOutputStream(compressedBuffer)) {
111+
gzip.write(payloadBytes);
112+
}
113+
byte[] compressedBytes = compressedBuffer.toByteArray();
114+
BlobHttpHeaders headers = new BlobHttpHeaders().setContentEncoding(CONTENT_ENCODING_GZIP);
115+
try (InputStream stream = new ByteArrayInputStream(compressedBytes)) {
116+
blob.uploadWithResponse(
117+
stream,
118+
compressedBytes.length,
119+
null, // parallelTransferOptions
120+
headers,
121+
null, // metadata
122+
null, // tier
123+
null, // requestConditions
124+
null, // timeout
125+
Context.NONE);
126+
}
127+
} else {
128+
try (InputStream stream = new ByteArrayInputStream(payloadBytes)) {
129+
blob.upload(stream, payloadBytes.length, true);
130+
}
131+
}
132+
} catch (IOException e) {
133+
throw new PayloadStorageException("Failed to upload payload blob '" + blobName + "'.", e);
134+
}
135+
136+
return encodeToken(this.containerClient.getBlobContainerName(), blobName);
137+
}
138+
139+
@Override
140+
public String download(String token) {
141+
String[] decoded = decodeToken(token);
142+
String container = decoded[0];
143+
String name = decoded[1];
144+
145+
if (!container.equals(this.containerClient.getBlobContainerName())) {
146+
throw new IllegalArgumentException("Token container does not match configured container.");
147+
}
148+
149+
BlobClient blob = this.containerClient.getBlobClient(name);
150+
151+
try {
152+
ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
153+
blob.downloadStream(outputStream);
154+
byte[] rawBytes = outputStream.toByteArray();
155+
156+
// Check if the content is gzip-compressed by trying to read the gzip header
157+
String contentEncoding = blob.getProperties().getContentEncoding();
158+
boolean isGzip = CONTENT_ENCODING_GZIP.equalsIgnoreCase(contentEncoding);
159+
160+
if (isGzip) {
161+
try (GZIPInputStream gzip = new GZIPInputStream(new ByteArrayInputStream(rawBytes));
162+
ByteArrayOutputStream decompressedBuffer = new ByteArrayOutputStream()) {
163+
byte[] buffer = new byte[8192];
164+
int len;
165+
while ((len = gzip.read(buffer)) != -1) {
166+
decompressedBuffer.write(buffer, 0, len);
167+
}
168+
return decompressedBuffer.toString(StandardCharsets.UTF_8.name());
169+
}
170+
}
171+
172+
return new String(rawBytes, StandardCharsets.UTF_8);
173+
} catch (BlobStorageException e) {
174+
if (e.getStatusCode() == 404) {
175+
throw new PayloadStorageException(
176+
"The blob '" + name + "' was not found in container '" + container + "'. " +
177+
"The payload may have been deleted or the container was never created.", e);
178+
}
179+
throw new PayloadStorageException("Failed to download payload blob '" + name + "'.", e);
180+
} catch (IOException e) {
181+
throw new PayloadStorageException("Failed to decompress payload blob '" + name + "'.", e);
182+
}
183+
}
184+
185+
@Override
186+
public boolean isKnownPayloadToken(String value) {
187+
if (value == null || value.isEmpty()) {
188+
return false;
189+
}
190+
return value.startsWith(TOKEN_PREFIX);
191+
}
192+
193+
static String encodeToken(String container, String name) {
194+
return TOKEN_PREFIX + container + ":" + name;
195+
}
196+
197+
static String[] decodeToken(String token) {
198+
if (!token.startsWith(TOKEN_PREFIX)) {
199+
throw new IllegalArgumentException("Invalid external payload token.");
200+
}
201+
String rest = token.substring(TOKEN_PREFIX.length());
202+
int sep = rest.indexOf(':');
203+
if (sep <= 0 || sep >= rest.length() - 1) {
204+
throw new IllegalArgumentException("Invalid external payload token format.");
205+
}
206+
return new String[] { rest.substring(0, sep), rest.substring(sep + 1) };
207+
}
208+
}
Lines changed: 53 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,53 @@
1+
// Copyright (c) Microsoft Corporation. All rights reserved.
2+
// Licensed under the MIT License.
3+
package com.microsoft.durabletask.azureblobpayloads;
4+
5+
import com.microsoft.durabletask.DurableTaskGrpcClientBuilder;
6+
7+
/**
8+
* Extension methods for enabling externalized payload storage on a {@link DurableTaskGrpcClientBuilder}.
9+
* <p>
10+
* This mirrors the .NET {@code UseExternalizedPayloads()} extension method pattern.
11+
*/
12+
public final class LargePayloadClientExtensions {
13+
14+
private LargePayloadClientExtensions() {
15+
}
16+
17+
/**
18+
* Enables externalized payload storage using Azure Blob Storage for the specified client builder.
19+
* <p>
20+
* Creates a new {@link BlobPayloadStore} from the given options and registers the
21+
* {@link LargePayloadInterceptor} on the builder's gRPC channel.
22+
*
23+
* @param builder the client builder to configure
24+
* @param options the storage options
25+
* @return the builder, for call chaining
26+
*/
27+
public static DurableTaskGrpcClientBuilder useExternalizedPayloads(
28+
DurableTaskGrpcClientBuilder builder,
29+
LargePayloadStorageOptions options) {
30+
PayloadStore store = new BlobPayloadStore(options);
31+
builder.addInterceptor(new LargePayloadInterceptor(store, options));
32+
return builder;
33+
}
34+
35+
/**
36+
* Enables externalized payload storage using a pre-configured shared payload store
37+
* for the specified client builder.
38+
* <p>
39+
* This overload ensures the client and worker share the same {@link PayloadStore} instance.
40+
*
41+
* @param builder the client builder to configure
42+
* @param store the shared payload store
43+
* @param options the storage options (used for threshold/max-size configuration)
44+
* @return the builder, for call chaining
45+
*/
46+
public static DurableTaskGrpcClientBuilder useExternalizedPayloads(
47+
DurableTaskGrpcClientBuilder builder,
48+
PayloadStore store,
49+
LargePayloadStorageOptions options) {
50+
builder.addInterceptor(new LargePayloadInterceptor(store, options));
51+
return builder;
52+
}
53+
}

0 commit comments

Comments
 (0)