@@ -170,6 +170,7 @@ private void merge(

Parquet.WriteBuilder writeBuilder =
Parquet.write(outputFile)
.setAll(table.properties())
.overwrite(false)
.createWriterFunc(GenericParquetWriter::buildWriter)
.metricsConfig(metricsConfig)
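Note on the hunk above: Parquet.WriteBuilder.setAll(table.properties()) copies the table's write properties, including write.parquet.compression-codec (TableProperties.PARQUET_COMPRESSION), into the writer configuration, and a later set(...) call for the same key overrides whatever setAll supplied. A minimal sketch of that precedence, assuming an Iceberg Table named table and an OutputFile named outputFile are already in scope:

    // Sketch only: table-level defaults first, explicit override second.
    Parquet.WriteBuilder builder =
        Parquet.write(outputFile)
            .setAll(table.properties())                        // e.g. write.parquet.compression-codec=gzip
            .set(TableProperties.PARQUET_COMPRESSION, "zstd")  // an explicit set(...) wins over setAll(...)
            .createWriterFunc(GenericParquetWriter::buildWriter)
            .schema(table.schema());

The same pattern is applied to the copy paths in Insert.java below.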
6 changes: 6 additions & 0 deletions ice/src/main/java/com/altinity/ice/cli/Main.java
@@ -414,6 +414,11 @@ void insert(
description = "Number of threads to use for inserting data",
defaultValue = "-1")
int threadCount,
@CommandLine.Option(
names = {"--compression"},
description =
"Parquet compression codec: gzip (default), zstd, snappy, lz4, brotli, uncompressed, or as-source")
String compression,
@CommandLine.Option(
names = {"--watch"},
description = "Event queue. Supported: AWS SQS")
@@ -506,6 +511,7 @@ void insert(
.sortOrderList(sortOrders)
.threadCount(
threadCount < 1 ? Runtime.getRuntime().availableProcessors() : threadCount)
.compression(compression)
.build();

if (!watchMode) {
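For reference, a hypothetical invocation of the new option might look like "ice insert my_ns.events data/events.parquet --compression=zstd"; only the insert subcommand and the --compression option come from this diff, the table and file arguments are made up. Passing --compression=as-source rewrites each input file with the codec the source file already uses instead of the table's configured codec.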
178 changes: 121 additions & 57 deletions ice/src/main/java/com/altinity/ice/cli/internal/cmd/Insert.java
@@ -28,8 +28,10 @@
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Locale;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
@@ -76,6 +78,7 @@
import org.apache.iceberg.parquet.Parquet;
import org.apache.iceberg.parquet.ParquetUtil;
import org.apache.iceberg.rest.RESTCatalog;
import org.apache.parquet.hadoop.metadata.CompressionCodecName;
import org.apache.parquet.hadoop.metadata.ParquetMetadata;
import org.apache.parquet.schema.MessageType;
import org.slf4j.Logger;
@@ -108,6 +111,19 @@ public static Result run(
return new Result(0, 0);
}

if (options.compression() != null) {
Set<String> valid =
Arrays.stream(CompressionCodecName.values())
.map(c -> c.name().toLowerCase(Locale.ENGLISH))
.collect(Collectors.toCollection(HashSet::new));
valid.add("as-source");
if (!valid.contains(options.compression().toLowerCase(Locale.ENGLISH))) {
String accepted = String.join(", ", new TreeSet<>(valid));
throw new IllegalArgumentException(
"Unknown --compression value: " + options.compression() + ". Accepted: " + accepted);
}
}

Table table = catalog.loadTable(nsTable);

// Create transaction and pass it to updatePartitionAndSortOrderMetadata
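Side note on the validation above: the accepted set is built from CompressionCodecName.values() plus "as-source", so codec names that exist in the parquet-mr enum but are not listed in the --compression help text (for example lzo, and lz4_raw on recent parquet-mr releases) will also pass this check. A small self-contained illustration, assuming only org.apache.parquet:parquet-hadoop on the classpath:

    import java.util.Locale;
    import org.apache.parquet.hadoop.metadata.CompressionCodecName;

    public class ListAcceptedCodecs {
      public static void main(String[] args) {
        // Prints the lower-cased enum names the validation accepts, e.g.:
        // uncompressed, snappy, gzip, lzo, brotli, lz4, zstd (plus lz4_raw on newer parquet-mr)
        for (CompressionCodecName codec : CompressionCodecName.values()) {
          System.out.println(codec.name().toLowerCase(Locale.ENGLISH));
        }
      }
    }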
@@ -501,6 +517,92 @@ private static List<DataFile> processFile(
.build());
dataFileSizeInBytes = inputFile.getLength();
dataFile = dstDataFile;
} else if (partitionSpec.isPartitioned() && partitionKey == null) {
return copyPartitionedAndSorted(
file,
tableSchema,
partitionSpec,
sortOrder,
metricsConfig,
tableIO,
inputFile,
dstDataFileSource);
} else if (sortOrder.isSorted() && !sorted) {
return Collections.singletonList(
copySorted(
file,
dstDataFileSource.get(file),
tableSchema,
partitionSpec,
sortOrder,
metricsConfig,
tableIO,
inputFile,
dataFileNamingStrategy,
partitionKey));
} else {
// Table isn't partitioned or sorted. Copy as is.
String dstDataFile = dstDataFileSource.get(file);
if (checkNotExists.apply(dstDataFile)) {
return Collections.emptyList();
// Copy path: compute compression override from CLI or as-source
String compressionCodecOverride = null;
if (options.compression() != null) {
if ("as-source".equalsIgnoreCase(options.compression())) {
var blocks = metadata.getBlocks();
if (!blocks.isEmpty()) {
compressionCodecOverride =
blocks.get(0).getColumns().get(0).getCodec().name().toLowerCase();
}
} else {
compressionCodecOverride = options.compression().toLowerCase();
}
}
OutputFile outputFile =
tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://"));
// TODO: support transferTo below (note that compression, etc. might be different)
// try (var d = outputFile.create()) {
// try (var s = inputFile.newStream()) { s.transferTo(d); }
// }
Parquet.ReadBuilder readBuilder =
Parquet.read(inputFile)
.createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
.project(tableSchema)
.reuseContainers();

Parquet.WriteBuilder writeBuilder =
Parquet.write(outputFile)
.overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL)
.createWriterFunc(GenericParquetWriter::buildWriter)
.metricsConfig(metricsConfig)
.schema(tableSchema);
if (partitionSpec.isPartitioned() && partitionKey == null) {
return copyPartitionedAndSorted(
file,
tableSchema,
partitionSpec,
sortOrder,
metricsConfig,
tableIO,
inputFile,
dstDataFileSource,
table.properties(),
compressionCodecOverride);
} else if (sortOrder.isSorted() && !sorted) {
return Collections.singletonList(
copySorted(
file,
dstDataFileSource.get(file),
tableSchema,
partitionSpec,
sortOrder,
metricsConfig,
tableIO,
inputFile,
dataFileNamingStrategy,
partitionKey,
table.properties(),
compressionCodecOverride));
} else {
// Table isn't partitioned or sorted. Copy as is.
String dstDataFile = dstDataFileSource.get(file);
if (checkNotExists.apply(dstDataFile)) {
return Collections.emptyList();
}
OutputFile outputFile =
tableIO.newOutputFile(Strings.replacePrefix(dstDataFile, "s3://", "s3a://"));
// TODO: support transferTo below (note that compression, etc. might be different)
// try (var d = outputFile.create()) {
// try (var s = inputFile.newStream()) { s.transferTo(d); }
// }
Parquet.ReadBuilder readBuilder =
Parquet.read(inputFile)
.createReaderFunc(s -> GenericParquetReaders.buildReader(tableSchema, s))
.project(tableSchema)
.reuseContainers();

Parquet.WriteBuilder writeBuilder =
Parquet.write(outputFile)
.setAll(table.properties())
.overwrite(dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL)
.createWriterFunc(GenericParquetWriter::buildWriter)
.metricsConfig(metricsConfig)
.schema(tableSchema);
if (compressionCodecOverride != null) {

writeBuilder =
writeBuilder.set(TableProperties.PARQUET_COMPRESSION, compressionCodecOverride);
}

logger.info("{}: copying to {}", file, dstDataFile);
logger.info("{}: copying to {}", file, dstDataFile);

try (CloseableIterable<Record> parquetReader = readBuilder.build()) {
try (FileAppender<Record> writer = writeBuilder.build()) {
writer.addAll(parquetReader);
writer.close(); // for write.length()
dataFileSizeInBytes = writer.length();
metrics = writer.metrics();
try (CloseableIterable<Record> parquetReader = readBuilder.build()) {
try (FileAppender<Record> writer = writeBuilder.build()) {
writer.addAll(parquetReader);
writer.close(); // for write.length()
dataFileSizeInBytes = writer.length();
metrics = writer.metrics();
}
}
}

dataFile = dstDataFile;
dataFile = dstDataFile;
}
}
logger.info(
"{}: adding data file (copy took {}s)", file, (System.currentTimeMillis() - start) / 1000);
@@ -588,7 +630,9 @@ private static List<DataFile> copyPartitionedAndSorted(
MetricsConfig metricsConfig,
FileIO tableIO,
InputFile inputFile,
DataFileNamingStrategy dstDataFileSource)
DataFileNamingStrategy dstDataFileSource,
Map<String, String> tableProperties,
@Nullable String compressionCodecOverride)
throws IOException {
logger.info("{}: partitioning{}", file, sortOrder.isSorted() ? "+sorting" : "");

@@ -622,10 +666,15 @@

Parquet.WriteBuilder writeBuilder =
Parquet.write(outputFile)
.setAll(tableProperties)
.overwrite(true) // FIXME
.createWriterFunc(GenericParquetWriter::buildWriter)
.metricsConfig(metricsConfig)
.schema(tableSchema);
if (compressionCodecOverride != null) {
writeBuilder =
writeBuilder.set(TableProperties.PARQUET_COMPRESSION, compressionCodecOverride);
}

try (FileAppender<Record> writer = writeBuilder.build()) {
for (Record record : records) {
@@ -668,7 +717,9 @@ private static DataFile copySorted(
FileIO tableIO,
InputFile inputFile,
DataFileNamingStrategy.Name dataFileNamingStrategy,
PartitionKey partitionKey)
PartitionKey partitionKey,
Map<String, String> tableProperties,
@Nullable String compressionCodecOverride)
throws IOException {
logger.info("{}: copying (sorted) to {}", file, dstDataFile);

@@ -698,11 +749,16 @@
// Write sorted records to outputFile
Parquet.WriteBuilder writeBuilder =
Parquet.write(outputFile)
.setAll(tableProperties)
.overwrite(
dataFileNamingStrategy == DataFileNamingStrategy.Name.PRESERVE_ORIGINAL) // FIXME
.createWriterFunc(GenericParquetWriter::buildWriter)
.metricsConfig(metricsConfig)
.schema(tableSchema);
if (compressionCodecOverride != null) {
writeBuilder =
writeBuilder.set(TableProperties.PARQUET_COMPRESSION, compressionCodecOverride);
}

long fileSizeInBytes;
Metrics metrics;
@@ -793,7 +849,8 @@ public record Options(
@Nullable String retryListFile,
@Nullable List<Main.IcePartition> partitionList,
@Nullable List<Main.IceSortOrder> sortOrderList,
int threadCount) {
int threadCount,
@Nullable String compression) {

public static Builder builder() {
return new Builder();
@@ -816,6 +873,7 @@ public static final class Builder {
private List<Main.IcePartition> partitionList = List.of();
private List<Main.IceSortOrder> sortOrderList = List.of();
private int threadCount = Runtime.getRuntime().availableProcessors();
private String compression;

private Builder() {}

@@ -899,6 +957,11 @@ public Builder threadCount(int threadCount) {
return this;
}

public Builder compression(String compression) {
this.compression = compression;
return this;
}

public Options build() {
return new Options(
dataFileNamingStrategy,
@@ -916,7 +979,8 @@ public Options build() {
retryListFile,
partitionList,
sortOrderList,
threadCount);
threadCount,
compression);
}
}
}
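With the builder extended above, callers can opt into a codec programmatically as well. A hypothetical usage sketch, assuming (as the file layout suggests) that Options is nested in Insert; only builder(), threadCount(...), compression(...), and build() are taken from this diff, the surrounding values are made up:

    Insert.Options options =
        Insert.Options.builder()
            .threadCount(Runtime.getRuntime().availableProcessors())
            .compression("zstd") // or "as-source" to reuse each input file's codec; omit to keep the table's configured codec
            .build();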