Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -435,15 +435,7 @@ static DataType mergeDecimalType(DataType lType, DataType rType) {
lhsDecimal.getPrecision() - lhsDecimal.getScale(),
rhsDecimal.getPrecision() - rhsDecimal.getScale());
int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
Preconditions.checkArgument(
resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
String.format(
"Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
lType,
rType,
resultIntDigits + resultScale,
DecimalType.MAX_PRECISION));
return DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
return createDecimalBounded(resultIntDigits + resultScale, resultScale);
} else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
return mergeExactNumericsIntoDecimal((DecimalType) lType, rType);
Expand Down Expand Up @@ -935,4 +927,13 @@ private static Map<Class<? extends DataType>, List<DataType>> getTypeMergingTree
mergingTree.put(VariantType.class, ImmutableList.of(stringType));
return mergingTree;
}

static DecimalType createDecimalBounded(int precision, int scale) {
if (precision > DecimalType.MAX_PRECISION) {
int lossDigits = precision - DecimalType.MAX_PRECISION;
return DataTypes.DECIMAL(precision - lossDigits, scale - lossDigits);
} else {
return DataTypes.DECIMAL(precision, scale);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,8 @@
import java.util.stream.Collectors;
import java.util.stream.IntStream;

import static org.apache.flink.cdc.common.utils.SchemaMergingUtils.createDecimalBounded;

/** Utils for {@link Schema} to perform the ability of evolution. */
@PublicEvolving
public class SchemaUtils {
Expand Down Expand Up @@ -575,15 +577,7 @@ public static DataType inferWiderType(DataType lType, DataType rType) {
lhsDecimal.getPrecision() - lhsDecimal.getScale(),
rhsDecimal.getPrecision() - rhsDecimal.getScale());
int resultScale = Math.max(lhsDecimal.getScale(), rhsDecimal.getScale());
Preconditions.checkArgument(
resultIntDigits + resultScale <= DecimalType.MAX_PRECISION,
String.format(
"Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
lType,
rType,
resultIntDigits + resultScale,
DecimalType.MAX_PRECISION));
mergedType = DataTypes.DECIMAL(resultIntDigits + resultScale, resultScale);
mergedType = createDecimalBounded(resultIntDigits + resultScale, resultScale);
} else if (lType instanceof DecimalType && rType.is(DataTypeFamily.EXACT_NUMERIC)) {
// Merge decimal and int
mergedType = mergeExactNumericsIntoDecimal((DecimalType) lType, rType);
Expand All @@ -608,12 +602,7 @@ private static DataType mergeExactNumericsIntoDecimal(
Math.max(
decimalType.getPrecision(),
decimalType.getScale() + getNumericPrecision(otherType));
Preconditions.checkArgument(
resultPrecision <= DecimalType.MAX_PRECISION,
String.format(
"Failed to merge %s and %s type into DECIMAL. %d precision digits required, %d available",
decimalType, otherType, resultPrecision, DecimalType.MAX_PRECISION));
return DataTypes.DECIMAL(resultPrecision, decimalType.getScale());
return createDecimalBounded(resultPrecision, decimalType.getScale());
}

@Deprecated
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -314,21 +314,15 @@ void testInferWiderType() {
.isEqualTo(DataTypes.DECIMAL(12, 4));

// Test overflow decimal conversions
Assertions.assertThatThrownBy(
() ->
SchemaUtils.inferWiderType(
DataTypes.DECIMAL(5, 5), DataTypes.DECIMAL(38, 0)))
.isExactlyInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Failed to merge DECIMAL(5, 5) NOT NULL and DECIMAL(38, 0) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");
Assertions.assertThat(
SchemaUtils.inferWiderType(
DataTypes.DECIMAL(5, 5), DataTypes.DECIMAL(38, 0)))
.isEqualTo(DataTypes.DECIMAL(38, 0));

Assertions.assertThatThrownBy(
() ->
SchemaUtils.inferWiderType(
DataTypes.DECIMAL(38, 0), DataTypes.DECIMAL(5, 5)))
.isExactlyInstanceOf(IllegalArgumentException.class)
.hasMessage(
"Failed to merge DECIMAL(38, 0) NOT NULL and DECIMAL(5, 5) NOT NULL type into DECIMAL. 43 precision digits required, 38 available");
Assertions.assertThat(
SchemaUtils.inferWiderType(
DataTypes.DECIMAL(38, 0), DataTypes.DECIMAL(5, 5)))
.isEqualTo(DataTypes.DECIMAL(38, 0));

// Test merging with nullability
Assertions.assertThat(
Expand Down
Loading
Loading