Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,15 @@
import java.io.IOException;
import java.io.OutputStream;
import java.nio.ByteBuffer;
import java.util.zip.DataFormatException;
import java.util.zip.Deflater;
import java.util.zip.DeflaterOutputStream;
import java.util.zip.Inflater;
import java.util.zip.InflaterOutputStream;

import org.apache.avro.AvroRuntimeException;
import org.apache.avro.util.NonCopyingByteArrayOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

/**
* Implements DEFLATE (RFC1951) compression and decompression.
Expand All @@ -37,7 +40,33 @@
*/
public class DeflateCodec extends Codec {

private static final Logger LOG = LoggerFactory.getLogger(DeflateCodec.class);

private static final int DEFAULT_BUFFER_SIZE = 8192;
private static final String MAX_DECOMPRESS_LENGTH_PROPERTY = "org.apache.avro.limits.decompress.maxLength";
private static final long DEFAULT_MAX_DECOMPRESS_LENGTH = 200L * 1024 * 1024; // 200MB default limit

private static final long MAX_DECOMPRESS_LENGTH;

static {
String prop = System.getProperty(MAX_DECOMPRESS_LENGTH_PROPERTY);
long limit = DEFAULT_MAX_DECOMPRESS_LENGTH;
if (prop != null) {
try {
long parsed = Long.parseLong(prop);
if (parsed <= 0) {
LOG.warn("Invalid value '{}' for property '{}': must be positive. Using default: {}", prop,
MAX_DECOMPRESS_LENGTH_PROPERTY, DEFAULT_MAX_DECOMPRESS_LENGTH);
} else {
limit = parsed;
}
} catch (NumberFormatException e) {
LOG.warn("Could not parse property '{}' value '{}'. Using default: {}", MAX_DECOMPRESS_LENGTH_PROPERTY, prop,
DEFAULT_MAX_DECOMPRESS_LENGTH);
}
}
MAX_DECOMPRESS_LENGTH = limit;
}

static class Option extends CodecFactory {
private final int compressionLevel;
Expand Down Expand Up @@ -79,9 +108,30 @@ public ByteBuffer compress(ByteBuffer data) throws IOException {
@Override
public ByteBuffer decompress(ByteBuffer data) throws IOException {
NonCopyingByteArrayOutputStream baos = new NonCopyingByteArrayOutputStream(DEFAULT_BUFFER_SIZE);
try (OutputStream outputStream = new InflaterOutputStream(baos, getInflater())) {
outputStream.write(data.array(), computeOffset(data), data.remaining());
byte[] buffer = new byte[DEFAULT_BUFFER_SIZE];
long totalBytes = 0;

Inflater inflater = getInflater();
inflater.setInput(data.array(), computeOffset(data), data.remaining());

try {
while (!inflater.finished()) {
int len = inflater.inflate(buffer);
if (len == 0 && inflater.needsInput()) {
break;
}
totalBytes += len;
if (totalBytes > MAX_DECOMPRESS_LENGTH) {
throw new AvroRuntimeException(
"Decompressed size " + totalBytes + " (bytes) exceeds maximum allowed size " + MAX_DECOMPRESS_LENGTH
+ ". This can be configured by setting the system property '" + MAX_DECOMPRESS_LENGTH_PROPERTY + "'");
}
baos.write(buffer, 0, len);
}
} catch (DataFormatException e) {
throw new IOException("Invalid deflate data", e);
}

return baos.asByteBuffer();
}

Expand Down