Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.flink.annotation.Internal;
import org.apache.flink.configuration.GlobalConfiguration;
import org.apache.flink.table.catalog.AbstractCatalog;
import org.apache.flink.table.catalog.CatalogBaseTable;
import org.apache.flink.table.catalog.CatalogDatabase;
Expand Down Expand Up @@ -111,7 +112,10 @@ public FlinkCatalog(
long cacheExpirationIntervalMs) {
super(catalogName, defaultDatabase);
this.catalogLoader = catalogLoader;
this.catalogProps = catalogProps;
this.catalogProps =
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If this property is needed in order to create Iceberg catalog, then filtering it here breaks CREATE TABLE LIKE functionality.
https://github.com/apache/iceberg/blob/main/flink/v2.1/flink/src/main/java/org/apache/iceberg/flink/FlinkDynamicTableFactory.java#L190-L204

catalogProps.entrySet().stream()
.filter(e -> !GlobalConfiguration.isSensitive(e.getKey()))
.collect(ImmutableMap.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue));
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seems like a reasonable short-term fix to me.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ConfigurationUtils.hideSensitiveValues is even more simple.

Copy link
Copy Markdown
Contributor

@talatuyarer talatuyarer May 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@pvary The isSensitive list doesn't cover all Iceberg configurations. For example, the credential keyword used for REST catalog OAuth2 client credentials or S3 access key was only added to Flink's list in a later release than the one we're using. See https://github.com/apache/flink/blob/release-2.1.1/flink-core/src/main/java/org/apache/flink/configuration/GlobalConfiguration.java#L49

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is true for ConfigurationUtils.hideSensitiveValues also...

this.baseNamespace = baseNamespace;
this.cacheEnabled = cacheEnabled;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.flink.table.catalog.CommonCatalogOptions;
import org.apache.flink.table.catalog.ObjectPath;
import org.apache.flink.table.catalog.exceptions.TableNotExistException;
import org.apache.flink.util.ArrayUtils;
import org.apache.iceberg.BaseTable;
import org.apache.iceberg.ContentFile;
import org.apache.iceberg.DataFile;
Expand Down Expand Up @@ -202,45 +203,177 @@ public void testCreateTableLikeInDiffIcebergCatalog() throws TableNotExistExcept

String catalog2 = catalogName + "2";
sql("CREATE CATALOG %s WITH %s", catalog2, toWithClause(config));
sql("CREATE DATABASE %s", catalog2 + ".testdb");
sql("CREATE TABLE %s LIKE tl", catalog2 + ".testdb.tl2");
try {
sql("CREATE DATABASE %s.testdb", catalog2);
sql("CREATE TABLE %s.testdb.tl2 LIKE tl", catalog2);

CatalogTable catalogTable = catalogTable(catalog2, "testdb", "tl2");
assertThat(catalogTable.getUnresolvedSchema())
.isEqualTo(
org.apache.flink.table.api.Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.build());
} finally {
sql("DROP TABLE IF EXISTS %s.testdb.tl2", catalog2);
sql("DROP DATABASE IF EXISTS %s.testdb", catalog2);
dropCatalog(catalog2, true);
}
}

CatalogTable catalogTable = catalogTable(catalog2, "testdb", "tl2");
assertThat(catalogTable.getUnresolvedSchema())
.isEqualTo(
org.apache.flink.table.api.Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.build());
@TestTemplate
public void testCreateTableLikeCopiesTableProperties() {
sql("CREATE TABLE tl(id BIGINT) WITH ('k1'='v1', 'k2'='v2')");
sql("CREATE TABLE tl2 LIKE tl");

Table source = table("tl");
Table copy = table("tl2");
assertThat(copy.properties()).isEqualTo(source.properties());
}

@TestTemplate
public void testCreateTableLikeInDiffIcebergCatalogCopiesTableProperties() {
sql("CREATE TABLE tl(id BIGINT) WITH ('k1'='v1', 'k2'='v2')");

String catalog2 = catalogName + "2";
sql("CREATE CATALOG %s WITH %s", catalog2, toWithClause(config));
try {
sql("CREATE DATABASE %s.testdb", catalog2);
sql("CREATE TABLE %s.testdb.tl2 LIKE tl", catalog2);

Table source = table("tl");
Table copy =
validationCatalog.loadTable(
TableIdentifier.of(
ArrayUtils.concat(baseNamespace.levels(), new String[] {"testdb", "tl2"})));
assertThat(copy.properties()).isEqualTo(source.properties());
} finally {
sql("DROP TABLE IF EXISTS %s.testdb.tl2", catalog2);
sql("DROP DATABASE IF EXISTS %s.testdb", catalog2);
dropCatalog(catalog2, true);
}
}

dropCatalog(catalog2, true);
@TestTemplate
public void testShowCreateTableRetractsSensitiveCatalogProperties() {
String otherCatalog = catalogName + "_other";
String sensitiveKey = "my.password";
String sensitiveValue = "super-secret-value";
String benignKey = "my.harmless.option";
String benignValue = "safe-to-show";

Map<String, String> leakyConfig = Maps.newHashMap(config);
leakyConfig.put(sensitiveKey, sensitiveValue);
leakyConfig.put(benignKey, benignValue);

sql("CREATE CATALOG %s WITH %s", otherCatalog, toWithClause(leakyConfig));
try {
sql("CREATE DATABASE %s.testdb", otherCatalog);
sql("CREATE TABLE %s.testdb.tl(id BIGINT)", otherCatalog);

String showCreate =
(String)
Iterables.getOnlyElement(sql("SHOW CREATE TABLE %s.testdb.tl", otherCatalog))
.getField(0);

assertThat(showCreate)
.as("SHOW CREATE TABLE should still expose the synthetic src-catalog options blob")
.contains(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY)
.contains(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY)
.as("Benign catalog property should still be present")
.contains(benignValue)
.as("Sensitive catalog property value must not be there")
.doesNotContain(sensitiveValue);
} finally {
sql("DROP TABLE IF EXISTS %s.testdb.tl", otherCatalog);
sql("DROP DATABASE IF EXISTS %s.testdb", otherCatalog);
dropCatalog(otherCatalog, true);
}
}

@TestTemplate
public void testCreateTableLikeInFlinkCatalogFiltersSensitiveCatalogProperties()
throws TableNotExistException {
String sourceCatalog = catalogName + "_with_secrets";
String sensitiveKey = "my.password";
String sensitiveValue = "super-secret-value";
String benignKey = "my.harmless.option";
String benignValue = "safe-to-show";

Map<String, String> sourceCatalogConfig = Maps.newHashMap(config);
sourceCatalogConfig.put(sensitiveKey, sensitiveValue);
sourceCatalogConfig.put(benignKey, benignValue);

sql("CREATE CATALOG %s WITH %s", sourceCatalog, toWithClause(sourceCatalogConfig));
try {
sql("CREATE DATABASE %s.testdb", sourceCatalog);
sql("CREATE TABLE %s.testdb.tl(id BIGINT)", sourceCatalog);

// CREATE TABLE LIKE from the Iceberg-backed source catalog into Flink's default
// (non-Iceberg) in-memory catalog. The target catalog will persist the table options
// map verbatim, so anything FlinkCatalog#getTable injects will be visible there.
sql("CREATE TABLE `default_catalog`.`default_database`.tl2 LIKE %s.testdb.tl", sourceCatalog);

CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2");
Map<String, String> options = catalogTable.getOptions();

assertThat(options)
.as("Connector and src-catalog props are required for the LIKE-target to work")
.containsKey(FlinkCreateTableOptions.CONNECTOR_PROPS_KEY)
.containsKey(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY);

String srcCatalogProps = options.get(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY);
assertThat(srcCatalogProps)
.as("Benign source-catalog property should still be propagated")
.contains(benignValue)
.as("Sensitive source-catalog property value must not leak into the target catalog")
.doesNotContain(sensitiveValue);

// Belt-and-braces: also check the rendered DDL on the target side.
String showCreate =
(String)
Iterables.getOnlyElement(
sql("SHOW CREATE TABLE `default_catalog`.`default_database`.tl2"))
.getField(0);
assertThat(showCreate)
.as("SHOW CREATE TABLE on the target catalog must not contain the secret")
.doesNotContain(sensitiveValue);
} finally {
sql("DROP TABLE IF EXISTS `default_catalog`.`default_database`.tl2");
sql("DROP TABLE IF EXISTS %s.testdb.tl", sourceCatalog);
sql("DROP DATABASE IF EXISTS %s.testdb", sourceCatalog);
dropCatalog(sourceCatalog, true);
}
}

@TestTemplate
public void testCreateTableLikeInFlinkCatalog() throws TableNotExistException {
sql("CREATE TABLE tl(id BIGINT)");

sql("CREATE TABLE `default_catalog`.`default_database`.tl2 LIKE tl");

CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2");
assertThat(catalogTable.getUnresolvedSchema())
.isEqualTo(
org.apache.flink.table.api.Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.build());

// `type` option is filtered out by Flink
// https://github.com/apache/flink/blob/edc3d68736de73665440f4313ddcfd9142d8d42b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java#L378
Map<String, String> filteredOptions = Maps.newHashMap(config);
filteredOptions.remove(CommonCatalogOptions.CATALOG_TYPE.key());

String srcCatalogProps =
FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl", filteredOptions);
Map<String, String> options = catalogTable.getOptions();
assertThat(options)
.containsEntry(
FlinkCreateTableOptions.CONNECTOR_PROPS_KEY,
FlinkDynamicTableFactory.FACTORY_IDENTIFIER)
.containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps);
try {
CatalogTable catalogTable = catalogTable("default_catalog", "default_database", "tl2");
assertThat(catalogTable.getUnresolvedSchema())
.isEqualTo(
org.apache.flink.table.api.Schema.newBuilder()
.column("id", DataTypes.BIGINT())
.build());

// `type` option is filtered out by Flink
// https://github.com/apache/flink/blob/edc3d68736de73665440f4313ddcfd9142d8d42b/flink-table/flink-table-common/src/main/java/org/apache/flink/table/factories/FactoryUtil.java#L378
Map<String, String> filteredOptions = Maps.newHashMap(config);
filteredOptions.remove(CommonCatalogOptions.CATALOG_TYPE.key());

String srcCatalogProps =
FlinkCreateTableOptions.toJson(catalogName, DATABASE, "tl", filteredOptions);
Map<String, String> options = catalogTable.getOptions();
assertThat(options)
.containsEntry(
FlinkCreateTableOptions.CONNECTOR_PROPS_KEY,
FlinkDynamicTableFactory.FACTORY_IDENTIFIER)
.containsEntry(FlinkCreateTableOptions.SRC_CATALOG_PROPS_KEY, srcCatalogProps);
} finally {
sql("DROP TABLE IF EXISTS `default_catalog`.`default_database`.tl2");
}
}

@TestTemplate
Expand Down