Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
22 commits
Select commit Hold shift + click to select a range
b209631
Actually make StandardEncryptionManager serializable.
iamthomaspowell Dec 3, 2025
39fa0ae
unit tests for serializable KeyManagementClient
iamthomaspowell Dec 3, 2025
b1b0253
code review comments
iamthomaspowell Dec 8, 2025
88b526a
Merge commit 'a739cb3db89ad94a3525443cef9357ccb1897d3f' into tom-s-po…
iamthomaspowell Dec 8, 2025
d334c27
code review comment for GcpKeyManagementClient
iamthomaspowell Dec 8, 2025
45cf911
spotless
iamthomaspowell Dec 8, 2025
2395048
Merge branch 'main' into tom-s-powell/encrypted-serializable
iamthomaspowell Dec 17, 2025
3ab06c3
Merge branch 'main' into tom-s-powell/encrypted-serializable
iamthomaspowell Jan 6, 2026
5bf1a4c
make AzureKeyManagementClient serializable
iamthomaspowell Jan 6, 2026
042c831
Empty commit
iamthomaspowell Jan 7, 2026
b350593
fix
iamthomaspowell Jan 7, 2026
cd7b9a5
Merge branch 'main' into tom-s-powell/encrypted-serializable
iamthomaspowell Jan 7, 2026
fc6973d
add test
iamthomaspowell Jan 7, 2026
6aa39b8
fix checkstyle
iamthomaspowell Jan 8, 2026
c247c3b
fix
iamthomaspowell Jan 8, 2026
6d80689
Merge branch 'main' into tom-s-powell/encrypted-serializable
iamthomaspowell Jan 13, 2026
fc4b036
add test for Spark 4.1
iamthomaspowell Jan 13, 2026
762a69e
update serializer tests
iamthomaspowell Jan 16, 2026
f343a37
Empty commit
iamthomaspowell Jan 16, 2026
7072bec
code review suggestions
iamthomaspowell Jan 16, 2026
95fdd37
fix checkstyle
iamthomaspowell Jan 16, 2026
8f1f9c6
fix RuntimeIOException
iamthomaspowell Jan 16, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,11 @@
*/
package org.apache.iceberg.encryption;

import java.io.Serializable;
import java.nio.ByteBuffer;
import java.util.Map;

public interface EncryptedKey {
public interface EncryptedKey extends Serializable {
String keyId();

ByteBuffer encryptedKeyMetadata();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@

import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.encryption.KeyManagementClient;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.AfterAll;
Expand All @@ -31,13 +32,15 @@
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.junit.jupiter.params.provider.NullSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.kms.model.CreateKeyRequest;
import software.amazon.awssdk.services.kms.model.CreateKeyResponse;
import software.amazon.awssdk.services.kms.model.DataKeySpec;
import software.amazon.awssdk.services.kms.model.EncryptionAlgorithmSpec;
import software.amazon.awssdk.services.kms.model.KeySpec;
import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionRequest;
import software.amazon.awssdk.services.kms.model.ScheduleKeyDeletionResponse;
Expand Down Expand Up @@ -91,13 +94,42 @@ public void testKeyWrapping() {
try (AwsKeyManagementClient keyManagementClient = new AwsKeyManagementClient()) {
keyManagementClient.initialize(ImmutableMap.of());

ByteBuffer key = ByteBuffer.wrap(new String("super-secret-table-master-key").getBytes());
ByteBuffer key = ByteBuffer.wrap("super-secret-table-master-key".getBytes());
ByteBuffer encryptedKey = keyManagementClient.wrapKey(key, keyId);

assertThat(keyManagementClient.unwrapKey(encryptedKey, keyId)).isEqualTo(key);
}
}

@ParameterizedTest
@MethodSource("org.apache.iceberg.TestHelpers#serializers")
public void testSerialization(
TestHelpers.RoundTripSerializer<AwsKeyManagementClient> roundTripSerializer)
throws Exception {
try (AwsKeyManagementClient keyManagementClient = new AwsKeyManagementClient()) {
keyManagementClient.initialize(
ImmutableMap.of(
AwsProperties.KMS_ENCRYPTION_ALGORITHM_SPEC,
EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256.toString(),
AwsProperties.KMS_DATA_KEY_SPEC,
DataKeySpec.AES_128.toString()));
assertThat(keyManagementClient.encryptionAlgorithmSpec())
.isEqualTo(EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256);
assertThat(keyManagementClient.dataKeySpec()).isEqualTo(DataKeySpec.AES_128);

AwsKeyManagementClient result = roundTripSerializer.apply(keyManagementClient);

ByteBuffer key = ByteBuffer.wrap("super-secret-table-master-key".getBytes());
ByteBuffer encryptedKey = result.wrapKey(key, keyId);

assertThat(keyManagementClient.unwrapKey(encryptedKey, keyId)).isEqualTo(key);
assertThat(result.unwrapKey(encryptedKey, keyId)).isEqualTo(key);
assertThat(result.encryptionAlgorithmSpec())
.isEqualTo(EncryptionAlgorithmSpec.RSAES_OAEP_SHA_256);
assertThat(result.dataKeySpec()).isEqualTo(DataKeySpec.AES_128);
}
}

@ParameterizedTest
@NullSource
@EnumSource(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,9 @@

import java.nio.ByteBuffer;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.iceberg.encryption.KeyManagementClient;
import org.apache.iceberg.util.SerializableMap;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.kms.KmsClient;
import software.amazon.awssdk.services.kms.model.DataKeySpec;
Expand All @@ -39,14 +41,17 @@
*/
public class AwsKeyManagementClient implements KeyManagementClient {

private KmsClient kmsClient;
private final AtomicBoolean isResourceClosed = new AtomicBoolean(false);

private Map<String, String> allProperties;
private EncryptionAlgorithmSpec encryptionAlgorithmSpec;
private DataKeySpec dataKeySpec;

private transient volatile KmsClient kmsClient;

@Override
public void initialize(Map<String, String> properties) {
AwsClientFactory clientFactory = AwsClientFactories.from(properties);
this.kmsClient = clientFactory.kms();
this.allProperties = SerializableMap.copyOf(properties);

AwsProperties awsProperties = new AwsProperties(properties);
this.encryptionAlgorithmSpec = awsProperties.kmsEncryptionAlgorithmSpec();
Expand All @@ -62,7 +67,7 @@ public ByteBuffer wrapKey(ByteBuffer key, String wrappingKeyId) {
.plaintext(SdkBytes.fromByteBuffer(key))
.build();

EncryptResponse result = kmsClient.encrypt(request);
EncryptResponse result = kmsClient().encrypt(request);
return result.ciphertextBlob().asByteBuffer();
}

Expand All @@ -76,11 +81,9 @@ public KeyGenerationResult generateKey(String wrappingKeyId) {
GenerateDataKeyRequest request =
GenerateDataKeyRequest.builder().keyId(wrappingKeyId).keySpec(dataKeySpec).build();

GenerateDataKeyResponse response = kmsClient.generateDataKey(request);
KeyGenerationResult result =
new KeyGenerationResult(
response.plaintext().asByteBuffer(), response.ciphertextBlob().asByteBuffer());
return result;
GenerateDataKeyResponse response = kmsClient().generateDataKey(request);
return new KeyGenerationResult(
response.plaintext().asByteBuffer(), response.ciphertextBlob().asByteBuffer());
}

@Override
Expand All @@ -92,14 +95,36 @@ public ByteBuffer unwrapKey(ByteBuffer wrappedKey, String wrappingKeyId) {
.ciphertextBlob(SdkBytes.fromByteBuffer(wrappedKey))
.build();

DecryptResponse result = kmsClient.decrypt(request);
DecryptResponse result = kmsClient().decrypt(request);
return result.plaintext().asByteBuffer();
}

@Override
public void close() {
if (kmsClient != null) {
kmsClient.close();
if (isResourceClosed.compareAndSet(false, true)) {
if (kmsClient != null) {
kmsClient.close();
}
}
}

EncryptionAlgorithmSpec encryptionAlgorithmSpec() {
return encryptionAlgorithmSpec;
}

DataKeySpec dataKeySpec() {
return dataKeySpec;
}

private KmsClient kmsClient() {
if (kmsClient == null) {
synchronized (this) {
if (kmsClient == null) {
AwsClientFactory clientFactory = AwsClientFactories.from(allProperties);
kmsClient = clientFactory.kms();
}
}
}
return kmsClient;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -27,35 +27,38 @@
import com.azure.security.keyvault.keys.models.KeyType;
import java.nio.ByteBuffer;
import java.time.Duration;
import org.apache.iceberg.TestHelpers;
import org.apache.iceberg.encryption.KeyManagementClient;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableMap;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariable;
import org.junit.jupiter.api.condition.EnabledIfEnvironmentVariables;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

@EnabledIfEnvironmentVariables({
@EnabledIfEnvironmentVariable(named = "AZURE_KEYVAULT_URL", matches = ".*")
})
public class TestAzureKeyManagementClient {
private static final String ICEBERG_TEST_KEY_NAME = "iceberg-test-key";

private static KeyClient keyClient;
private static final String KEY_VAULT_URI = System.getenv("AZURE_KEYVAULT_URL");

private static KeyManagementClient azureKeyManagementClient;
private static KeyClient keyClient;

@BeforeAll
public static void beforeClass() {
String keyVaultUri = System.getenv("AZURE_KEYVAULT_URL");
keyClient =
new KeyClientBuilder()
.vaultUrl(keyVaultUri)
.vaultUrl(KEY_VAULT_URI)
.credential(new DefaultAzureCredentialBuilder().build())
.buildClient();
keyClient.createKey(ICEBERG_TEST_KEY_NAME, KeyType.RSA);
azureKeyManagementClient = new AzureKeyManagementClient();
azureKeyManagementClient.initialize(ImmutableMap.of(AZURE_KEYVAULT_URL, keyVaultUri));
azureKeyManagementClient.initialize(ImmutableMap.of(AZURE_KEYVAULT_URL, KEY_VAULT_URI));
}

@AfterAll
Expand All @@ -81,4 +84,22 @@ public void keyWrapping() {
public void keyGenerationNotSupported() {
assertThat(azureKeyManagementClient.supportsKeyGeneration()).isFalse();
}

@ParameterizedTest
@MethodSource("org.apache.iceberg.TestHelpers#serializers")
public void testSerialization(
TestHelpers.RoundTripSerializer<AzureKeyManagementClient> roundTripSerializer)
throws Exception {
try (AzureKeyManagementClient keyManagementClient = new AzureKeyManagementClient()) {
keyManagementClient.initialize(ImmutableMap.of(AZURE_KEYVAULT_URL, KEY_VAULT_URI));

AzureKeyManagementClient result = roundTripSerializer.apply(keyManagementClient);

ByteBuffer key = ByteBuffer.wrap("super-secret-table-master-key".getBytes());
ByteBuffer encryptedKey = result.wrapKey(key, ICEBERG_TEST_KEY_NAME);

assertThat(keyManagementClient.unwrapKey(encryptedKey, ICEBERG_TEST_KEY_NAME)).isEqualTo(key);
assertThat(result.unwrapKey(encryptedKey, ICEBERG_TEST_KEY_NAME)).isEqualTo(key);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -29,40 +29,81 @@
import org.apache.iceberg.azure.AzureProperties;
import org.apache.iceberg.encryption.KeyManagementClient;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.SerializableMap;

/** Azure key management client which connects to Azure Key Vault. */
public class AzureKeyManagementClient implements KeyManagementClient {
private KeyClient keyClient;
private KeyWrapAlgorithm keyWrapAlgorithm;

private Map<String, String> allProperties;

private transient volatile ClientState state;

@Override
public void initialize(Map<String, String> properties) {
AzureProperties azureProperties = new AzureProperties(properties);

this.keyWrapAlgorithm = azureProperties.keyWrapAlgorithm();
KeyClientBuilder keyClientBuilder = new KeyClientBuilder();
azureProperties.keyVaultUrl().ifPresent(keyClientBuilder::vaultUrl);
this.keyClient =
keyClientBuilder
.credential(AdlsTokenCredentialProviders.from(properties).credential())
.buildClient();
this.allProperties = SerializableMap.copyOf(properties);
}

@Override
public ByteBuffer wrapKey(ByteBuffer key, String wrappingKeyId) {
WrapResult wrapResult =
keyClient
keyClient()
.getCryptographyClient(wrappingKeyId)
.wrapKey(keyWrapAlgorithm, ByteBuffers.toByteArray(key));
.wrapKey(keyWrapAlgorithm(), ByteBuffers.toByteArray(key));
return ByteBuffer.wrap(wrapResult.getEncryptedKey());
}

@Override
public ByteBuffer unwrapKey(ByteBuffer wrappedKey, String wrappingKeyId) {
UnwrapResult unwrapResult =
keyClient
keyClient()
.getCryptographyClient(wrappingKeyId)
.unwrapKey(keyWrapAlgorithm, ByteBuffers.toByteArray(wrappedKey));
.unwrapKey(keyWrapAlgorithm(), ByteBuffers.toByteArray(wrappedKey));
return ByteBuffer.wrap(unwrapResult.getKey());
}

private KeyClient keyClient() {
return state().keyClient();
}

private KeyWrapAlgorithm keyWrapAlgorithm() {
return state().keyWrapAlgorithm();
}

private ClientState state() {
if (state == null) {
synchronized (this) {
if (state == null) {
AzureProperties azureProperties = new AzureProperties(allProperties);
KeyClientBuilder keyClientBuilder = new KeyClientBuilder();
azureProperties.keyVaultUrl().ifPresent(keyClientBuilder::vaultUrl);
KeyClient keyClient =
keyClientBuilder
.credential(AdlsTokenCredentialProviders.from(allProperties).credential())
.buildClient();
KeyWrapAlgorithm keyWrapAlgorithm = azureProperties.keyWrapAlgorithm();
state = new ClientState(keyClient, keyWrapAlgorithm);
}
}
}
return state;
}

private static class ClientState {

private final KeyClient keyClient;
private final KeyWrapAlgorithm keyWrapAlgorithm;

ClientState(KeyClient keyClient, KeyWrapAlgorithm keyWrapAlgorithm) {
this.keyClient = keyClient;
this.keyWrapAlgorithm = keyWrapAlgorithm;
}

KeyClient keyClient() {
return keyClient;
}

KeyWrapAlgorithm keyWrapAlgorithm() {
return keyWrapAlgorithm;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,10 +21,12 @@
import java.nio.ByteBuffer;
import java.util.Map;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.ByteBuffers;
import org.apache.iceberg.util.SerializableMap;

public class BaseEncryptedKey implements EncryptedKey {
private final String keyId;
private final ByteBuffer keyMetadata;
private final byte[] keyMetadata;
private final String encryptedById;
private final Map<String, String> properties;

Expand All @@ -33,9 +35,9 @@ public BaseEncryptedKey(
Preconditions.checkArgument(keyId != null, "Key id cannot be null");
Preconditions.checkArgument(keyMetadata != null, "Encrypted key metadata cannot be null");
this.keyId = keyId;
this.keyMetadata = keyMetadata;
this.keyMetadata = ByteBuffers.toByteArray(keyMetadata);
this.encryptedById = encryptedById;
this.properties = properties;
this.properties = SerializableMap.copyOf(properties);
}

@Override
Expand All @@ -45,7 +47,7 @@ public String keyId() {

@Override
public ByteBuffer encryptedKeyMetadata() {
return keyMetadata;
return ByteBuffer.wrap(keyMetadata);
}

@Override
Expand Down
Loading