@@ -37,10 +37,9 @@
import java.security.NoSuchAlgorithmException;
import java.util.Comparator;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.stream.Stream;
import org.apache.directory.api.util.Hex;
import org.apache.commons.codec.binary.Hex;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.io.ByteStreams;
@@ -87,7 +86,7 @@ static String md5sum(InputStream is) throws IOException {
while ((numBytes = is.read(bytes)) != -1) {
md.update(bytes, 0, numBytes);
}
return new String(Hex.encodeHex(md.digest())).toUpperCase(Locale.ROOT);
return Hex.encodeHexString(md.digest(), false);
Member: why is this changed?

Member Author: org.apache.directory.api.util.Hex is not available after switching to Hadoop 3. Also, I think it is more reasonable to use a function from Apache Commons for this purpose.
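
For context, a minimal sketch of what the new encoding path does, using the commons-codec call shown in the diff (Hex.encodeHexString(byte[], boolean) requires commons-codec 1.11 or newer); the class name and buffer size below are illustrative, not the actual file:

import java.io.IOException;
import java.io.InputStream;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import org.apache.commons.codec.binary.Hex;

class Md5SumSketch {
  // Streams the input through an MD5 digest and returns upper-case hex.
  static String md5sum(InputStream is) throws IOException {
    MessageDigest md;
    try {
      md = MessageDigest.getInstance("MD5");
    } catch (NoSuchAlgorithmException e) {
      throw new IllegalStateException(e);
    }
    byte[] bytes = new byte[8192];
    int numBytes;
    while ((numBytes = is.read(bytes)) != -1) {
      md.update(bytes, 0, numBytes);
    }
    // encodeHexString(data, false) emits upper-case hex directly, which is why the old
    // new String(Hex.encodeHex(...)).toUpperCase(Locale.ROOT) call and the
    // java.util.Locale import are no longer needed.
    return Hex.encodeHexString(md.digest(), false);
  }
}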

}

private static void inputStreamToFile(InputStream inputStream, File targetFile)
30 changes: 15 additions & 15 deletions build.gradle
@@ -348,7 +348,7 @@ project(':iceberg-core') {
implementation libs.jackson.databind
implementation libs.caffeine
implementation libs.roaringbitmap
compileOnly(libs.hadoop2.client) {
compileOnly(libs.hadoop3.client) {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
@@ -373,7 +373,7 @@ project(':iceberg-data') {
implementation project(':iceberg-core')
compileOnly project(':iceberg-parquet')
compileOnly project(':iceberg-orc')
compileOnly(libs.hadoop2.common) {
compileOnly(libs.hadoop3.common) {
exclude group: 'commons-beanutils'
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
@@ -396,7 +396,7 @@ project(':iceberg-data') {

compileOnly libs.avro.avro

testImplementation(libs.hadoop2.client) {
testImplementation(libs.hadoop3.client) {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
@@ -427,7 +427,7 @@ project(':iceberg-aliyun') {
compileOnly libs.jaxb.api
compileOnly libs.activation
compileOnly libs.jaxb.runtime
compileOnly(libs.hadoop2.common) {
compileOnly(libs.hadoop3.common) {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'javax.servlet', module: 'servlet-api'
@@ -470,7 +470,7 @@ project(':iceberg-aws') {
compileOnly("software.amazon.awssdk:dynamodb")
compileOnly("software.amazon.awssdk:lakeformation")

compileOnly(libs.hadoop2.common) {
compileOnly(libs.hadoop3.common) {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'javax.servlet', module: 'servlet-api'
@@ -572,7 +572,7 @@ project(':iceberg-delta-lake') {

compileOnly "io.delta:delta-standalone_${scalaVersion}:${libs.versions.delta.standalone.get()}"

compileOnly(libs.hadoop2.common) {
compileOnly(libs.hadoop3.common) {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'javax.servlet', module: 'servlet-api'
if (sparkVersions.contains("3.5")) {
integrationImplementation "io.delta:delta-spark_${scalaVersion}:${libs.versions.delta.spark.get()}"
integrationImplementation project(path: ":iceberg-spark:iceberg-spark-3.5_${scalaVersion}")
integrationImplementation(libs.hadoop2.minicluster) {
integrationImplementation(libs.hadoop3.minicluster) {
exclude group: 'org.apache.avro', module: 'avro'
// to make sure netty libs only come from project(':iceberg-arrow')
exclude group: 'io.netty', module: 'netty-buffer'
@@ -645,7 +645,7 @@ project(':iceberg-gcp') {
testImplementation project(path: ':iceberg-api', configuration: 'testArtifacts')
testImplementation project(path: ':iceberg-core', configuration: 'testArtifacts')

testImplementation(libs.hadoop2.common) {
testImplementation(libs.hadoop3.common) {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
exclude group: 'javax.servlet', module: 'servlet-api'
@@ -722,7 +722,7 @@ project(':iceberg-hive-metastore') {
exclude group: 'com.zaxxer', module: 'HikariCP'
}

compileOnly(libs.hadoop2.client) {
compileOnly(libs.hadoop3.client) {
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
@@ -754,12 +754,12 @@ project(':iceberg-orc') {
exclude group: 'org.apache.hive', module: 'hive-storage-api'
}

compileOnly(libs.hadoop2.common) {
compileOnly(libs.hadoop3.common) {
exclude group: 'commons-beanutils'
exclude group: 'org.apache.avro', module: 'avro'
exclude group: 'org.slf4j', module: 'slf4j-log4j12'
}
compileOnly(libs.hadoop2.client) {
compileOnly(libs.hadoop3.client) {
exclude group: 'org.apache.avro', module: 'avro'
}

Expand Down Expand Up @@ -788,7 +788,7 @@ project(':iceberg-parquet') {
}

compileOnly libs.avro.avro
compileOnly(libs.hadoop2.client) {
compileOnly(libs.hadoop3.client) {
exclude group: 'org.apache.avro', module: 'avro'
}

@@ -832,8 +832,8 @@ project(':iceberg-arrow') {
// We import :netty-common through :arrow-memory-netty
// so that the same version as used by the :arrow-memory-netty module is picked.
testImplementation libs.arrow.memory.netty
testImplementation libs.hadoop2.common
testImplementation libs.hadoop2.mapreduce.client.core
testImplementation libs.hadoop3.common
testImplementation libs.hadoop3.mapreduce.client.core
}
}

@@ -854,7 +854,7 @@ project(':iceberg-nessie') {
implementation libs.jackson.core
implementation libs.jackson.databind

compileOnly libs.hadoop2.common
compileOnly libs.hadoop3.common
// Only there to prevent "warning: unknown enum constant SchemaType.OBJECT" compile messages
compileOnly libs.microprofile.openapi.api

12 changes: 6 additions & 6 deletions flink/v1.18/build.gradle
@@ -42,9 +42,9 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
compileOnly libs.flink118.connector.base
compileOnly libs.flink118.connector.files

compileOnly libs.hadoop2.hdfs
compileOnly libs.hadoop2.common
compileOnly(libs.hadoop2.minicluster) {
compileOnly libs.hadoop3.hdfs
compileOnly libs.hadoop3.common
compileOnly(libs.hadoop3.minicluster) {
exclude group: 'org.apache.avro', module: 'avro'
}

@@ -186,9 +186,9 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
integrationImplementation libs.flink118.table.api.java.bridge
integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink118.get()}"

integrationImplementation libs.hadoop2.common
integrationImplementation libs.hadoop2.hdfs
integrationImplementation(libs.hadoop2.minicluster) {
integrationImplementation libs.hadoop3.common
integrationImplementation libs.hadoop3.hdfs
integrationImplementation(libs.hadoop3.minicluster) {
exclude group: 'org.apache.avro', module: 'avro'
}

12 changes: 6 additions & 6 deletions flink/v1.19/build.gradle
@@ -42,9 +42,9 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
compileOnly libs.flink119.connector.base
compileOnly libs.flink119.connector.files

compileOnly libs.hadoop2.hdfs
compileOnly libs.hadoop2.common
compileOnly(libs.hadoop2.minicluster) {
compileOnly libs.hadoop3.hdfs
compileOnly libs.hadoop3.common
compileOnly(libs.hadoop3.minicluster) {
exclude group: 'org.apache.avro', module: 'avro'
}

@@ -187,9 +187,9 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
integrationImplementation libs.flink119.table.api.java.bridge
integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink119.get()}"

integrationImplementation libs.hadoop2.common
integrationImplementation libs.hadoop2.hdfs
integrationImplementation(libs.hadoop2.minicluster) {
integrationImplementation libs.hadoop3.common
integrationImplementation libs.hadoop3.hdfs
integrationImplementation(libs.hadoop3.minicluster) {
exclude group: 'org.apache.avro', module: 'avro'
}

12 changes: 6 additions & 6 deletions flink/v1.20/build.gradle
@@ -42,9 +42,9 @@ project(":iceberg-flink:iceberg-flink-${flinkMajorVersion}") {
compileOnly libs.flink120.connector.base
compileOnly libs.flink120.connector.files

compileOnly libs.hadoop2.hdfs
compileOnly libs.hadoop2.common
compileOnly(libs.hadoop2.minicluster) {
compileOnly libs.hadoop3.hdfs
compileOnly libs.hadoop3.common
compileOnly(libs.hadoop3.minicluster) {
exclude group: 'org.apache.avro', module: 'avro'
}

@@ -187,9 +187,9 @@ project(":iceberg-flink:iceberg-flink-runtime-${flinkMajorVersion}") {
integrationImplementation libs.flink120.table.api.java.bridge
integrationImplementation "org.apache.flink:flink-table-planner_${scalaVersion}:${libs.versions.flink120.get()}"

integrationImplementation libs.hadoop2.common
integrationImplementation libs.hadoop2.hdfs
integrationImplementation(libs.hadoop2.minicluster) {
integrationImplementation libs.hadoop3.common
integrationImplementation libs.hadoop3.hdfs
integrationImplementation(libs.hadoop3.minicluster) {
exclude group: 'org.apache.avro', module: 'avro'
}

9 changes: 3 additions & 6 deletions gradle/libs.versions.toml
@@ -46,7 +46,6 @@ flink119 = { strictly = "1.19.1"}
flink120 = { strictly = "1.20.0"}
google-libraries-bom = "26.54.0"
guava = "33.4.0-jre"
hadoop2 = "2.7.3"
hadoop3 = "3.4.1"
httpcomponents-httpclient5 = "5.4.2"
hive2 = { strictly = "2.3.10"} # see rich version usage explanation above
@@ -124,13 +123,11 @@ flink120-streaming-java = { module = "org.apache.flink:flink-streaming-java", ve
flink120-table-api-java-bridge = { module = "org.apache.flink:flink-table-api-java-bridge", version.ref = "flink120" }
google-libraries-bom = { module = "com.google.cloud:libraries-bom", version.ref = "google-libraries-bom" }
guava-guava = { module = "com.google.guava:guava", version.ref = "guava" }
hadoop2-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop2" }
hadoop2-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "hadoop2" }
hadoop2-hdfs = { module = "org.apache.hadoop:hadoop-hdfs", version.ref = "hadoop2" }
hadoop2-mapreduce-client-core = { module = "org.apache.hadoop:hadoop-mapreduce-client-core", version.ref = "hadoop2" }
hadoop2-minicluster = { module = "org.apache.hadoop:hadoop-minicluster", version.ref = "hadoop2" }
hadoop3-client = { module = "org.apache.hadoop:hadoop-client", version.ref = "hadoop3" }
hadoop3-common = { module = "org.apache.hadoop:hadoop-common", version.ref = "hadoop3" }
hadoop3-hdfs = { module = "org.apache.hadoop:hadoop-hdfs", version.ref = "hadoop3" }
hadoop3-mapreduce-client-core = { module = "org.apache.hadoop:hadoop-mapreduce-client-core", version.ref = "hadoop3" }
hadoop3-minicluster = { module = "org.apache.hadoop:hadoop-minicluster", version.ref = "hadoop3" }
hive2-exec = { module = "org.apache.hive:hive-exec", version.ref = "hive2" }
hive2-metastore = { module = "org.apache.hive:hive-metastore", version.ref = "hive2" }
hive2-service = { module = "org.apache.hive:hive-service", version.ref = "hive2" }
2 changes: 1 addition & 1 deletion mr/build.gradle
@@ -37,7 +37,7 @@ project(':iceberg-mr') {
implementation project(':iceberg-orc')
implementation project(':iceberg-parquet')

compileOnly(libs.hadoop2.client) {
compileOnly(libs.hadoop3.client) {
exclude group: 'org.apache.avro', module: 'avro'
}

2 changes: 1 addition & 1 deletion spark/v3.4/build.gradle
@@ -96,7 +96,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {

implementation libs.caffeine

testImplementation(libs.hadoop2.minicluster) {
testImplementation(libs.hadoop3.minicluster) {
exclude group: 'org.apache.avro', module: 'avro'
// to make sure netty libs only come from project(':iceberg-arrow')
exclude group: 'io.netty', module: 'netty-buffer'
@@ -70,6 +70,7 @@
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;
import org.junit.AfterClass;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
@@ -106,6 +107,13 @@ public static void startSpark() {
TestCompressionSettings.spark = SparkSession.builder().master("local[2]").getOrCreate();
}

@Before
public void resetSpecificConfigurations() {
spark.conf().unset(COMPRESSION_CODEC);
spark.conf().unset(COMPRESSION_LEVEL);
spark.conf().unset(COMPRESSION_STRATEGY);
}
Member Author (on lines +110 to +115): spark.sql.iceberg.compression-level = 1 is not a valid compression level setting for gzip. If we don't unset all these options before running each test, the old configs set by the previous run will be left over and make the gzip tests fail.

Contributor: For other reviewers: the same is already being done in the Spark 3.5 version of the test and was added by #11333.
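
A hedged illustration of the leftover-configuration problem described above (the property name is taken from the comment; the standalone session wiring is a sketch, not the actual TestCompressionSettings code):

import org.apache.spark.sql.SparkSession;

public class LeftoverConfSketch {
  public static void main(String[] args) {
    SparkSession spark = SparkSession.builder().master("local[2]").getOrCreate();

    // One parameterized run sets a codec-specific level on the shared session.
    spark.conf().set("spark.sql.iceberg.compression-level", "1");

    // The same session is reused for the next parameter, so the stale level is still visible
    // there; per the comment above, the gzip runs then fail on it unless a @Before hook
    // clears it first.
    spark.conf().unset("spark.sql.iceberg.compression-level");

    spark.stop();
  }
}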


@Parameterized.AfterParam
public static void clearSourceCache() {
spark.sql(String.format("DROP TABLE IF EXISTS %s", TABLE_NAME));
@@ -22,6 +22,8 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
@@ -118,6 +120,7 @@ public void testStreamingWriteAppendMode() throws Exception {
// remove the last commit to force Spark to reprocess batch #1
File lastCommitFile = new File(checkpoint + "/commits/1");
Assert.assertTrue("The commit file must be deleted", lastCommitFile.delete());
Files.deleteIfExists(Paths.get(checkpoint + "/commits/.1.crc"));
Member: why do we need this now?

Member Author (@Kontinuation, Feb 21, 2025): The .crc file has been renamed along with the main file since HADOOP-16255, so deleting the main file without deleting the .crc file results in a failure when renaming to the main file again:

org.apache.hadoop.fs.FileAlreadyExistsException: Rename destination file:/var/folders/jw/nz45tb550rbgjkndd37m8rrh0000gn/T/junit-12664551068658781194/parquet/checkpoint/commits/.1.crc already exists.
	at org.apache.hadoop.fs.AbstractFileSystem.renameInternal(AbstractFileSystem.java:876)
	at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:807)
	at org.apache.hadoop.fs.ChecksumFs.renameInternal(ChecksumFs.java:519)
	at org.apache.hadoop.fs.AbstractFileSystem.rename(AbstractFileSystem.java:807)
	at org.apache.hadoop.fs.FileContext.rename(FileContext.java:1044)
	at org.apache.spark.sql.execution.streaming.FileContextBasedCheckpointFileManager.renameTempFile(CheckpointFileManager.scala:372)
	at org.apache.spark.sql.execution.streaming.CheckpointFileManager$RenameBasedFSDataOutputStream.close(CheckpointFileManager.scala:154)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.write(HDFSMetadataLog.scala:204)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.addNewBatchByStream(HDFSMetadataLog.scala:237)
	at org.apache.spark.sql.execution.streaming.HDFSMetadataLog.add(HDFSMetadataLog.scala:130)
	at org.apache.spark.sql.execution.streaming.MicroBatchExecution.$anonfun$markMicroBatchEnd$1(MicroBatchExecution.scala:785)
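
For illustration, the cleanup pattern repeated in these tests could be factored into a small helper (hypothetical, not part of the PR); it deletes a commit file together with the ".<name>.crc" sidecar that Hadoop's ChecksumFileSystem now renames in lockstep with it:

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;

final class CheckpointCleanup {
  private CheckpointCleanup() {}

  // Deletes commits/<batchId> and its checksum sidecar commits/.<batchId>.crc so that a
  // restarted query can rename a fresh commit file into place without hitting
  // FileAlreadyExistsException on the leftover .crc file.
  static void deleteCommitAndChecksum(String checkpointDir, String batchId) throws IOException {
    Path commit = Paths.get(checkpointDir, "commits", batchId);
    Path crc = commit.resolveSibling("." + batchId + ".crc");
    Files.deleteIfExists(commit);
    Files.deleteIfExists(crc);
  }
}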


// restart the query from the checkpoint
StreamingQuery restartedQuery = streamWriter.start();
@@ -178,6 +181,7 @@ public void testStreamingWriteCompleteMode() throws Exception {
// remove the last commit to force Spark to reprocess batch #1
File lastCommitFile = new File(checkpoint + "/commits/1");
Assert.assertTrue("The commit file must be deleted", lastCommitFile.delete());
Files.deleteIfExists(Paths.get(checkpoint + "/commits/.1.crc"));

// restart the query from the checkpoint
StreamingQuery restartedQuery = streamWriter.start();
@@ -238,6 +242,7 @@ public void testStreamingWriteCompleteModeWithProjection() throws Exception {
// remove the last commit to force Spark to reprocess batch #1
File lastCommitFile = new File(checkpoint + "/commits/1");
Assert.assertTrue("The commit file must be deleted", lastCommitFile.delete());
Files.deleteIfExists(Paths.get(checkpoint + "/commits/.1.crc"));

// restart the query from the checkpoint
StreamingQuery restartedQuery = streamWriter.start();
2 changes: 1 addition & 1 deletion spark/v3.5/build.gradle
@@ -96,7 +96,7 @@ project(":iceberg-spark:iceberg-spark-${sparkMajorVersion}_${scalaVersion}") {

implementation libs.caffeine

testImplementation(libs.hadoop2.minicluster) {
testImplementation(libs.hadoop3.minicluster) {
exclude group: 'org.apache.avro', module: 'avro'
// to make sure netty libs only come from project(':iceberg-arrow')
exclude group: 'io.netty', module: 'netty-buffer'
@@ -23,7 +23,9 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

import java.io.File;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.iceberg.PartitionSpec;
@@ -117,6 +119,7 @@ public void testStreamingWriteAppendMode() throws Exception {
// remove the last commit to force Spark to reprocess batch #1
File lastCommitFile = new File(checkpoint + "/commits/1");
assertThat(lastCommitFile.delete()).as("The commit file must be deleted").isTrue();
Files.deleteIfExists(Paths.get(checkpoint + "/commits/.1.crc"));

// restart the query from the checkpoint
StreamingQuery restartedQuery = streamWriter.start();
@@ -178,6 +181,7 @@ public void testStreamingWriteCompleteMode() throws Exception {
// remove the last commit to force Spark to reprocess batch #1
File lastCommitFile = new File(checkpoint + "/commits/1");
assertThat(lastCommitFile.delete()).as("The commit file must be deleted").isTrue();
Files.deleteIfExists(Paths.get(checkpoint + "/commits/.1.crc"));

// restart the query from the checkpoint
StreamingQuery restartedQuery = streamWriter.start();
@@ -239,6 +243,7 @@ public void testStreamingWriteCompleteModeWithProjection() throws Exception {
// remove the last commit to force Spark to reprocess batch #1
File lastCommitFile = new File(checkpoint + "/commits/1");
assertThat(lastCommitFile.delete()).as("The commit file must be deleted").isTrue();
Files.deleteIfExists(Paths.get(checkpoint + "/commits/.1.crc"));

// restart the query from the checkpoint
StreamingQuery restartedQuery = streamWriter.start();