Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -2564,12 +2564,8 @@ object DecimalAggregates extends Rule[LogicalPlan] {
/** Maximum number of decimal digits representable precisely in a Double */
private val MAX_DOUBLE_DIGITS = 15

/** Tighter than the AVG fast path's `prec + 4 <= MAX_DOUBLE_DIGITS` (= 11):
* the strict-subset keeps SPARK-37024 Double-regime exposure unchanged. */
private val AVG_PEEL_MAX_INNER_PRECISION = 7

/** Matches a scale-preserving widening decimal Cast; refuses CheckOverflow
* to preserve overflow semantics on the unscaled value. */
/** Matches a scale-preserving widening decimal Cast.
* Refuses CheckOverflow so per-row overflow checks are not hoisted out. */
private object WidenedDecimalChild {
def unapply(e: Expression): Option[(Expression, Int, Int, Int)] = e match {
case Cast(inner @ DecimalExpression(p, s), DecimalType.Fixed(pPrime, sPrime), _, _)
Expand Down Expand Up @@ -2604,27 +2600,35 @@ object DecimalAggregates extends Rule[LogicalPlan] {
case _ => we
}
case ae @ AggregateExpression(af, _, _, _, _) => af match {
// Hoist a scale-preserving widening Cast out of Sum so the existing
// Long fast-path can fire on the inner. The MakeDecimal target type
// matches `Sum(Cast(x, dec(pPrime, s))).dataType` (see Sum.resultType)
// so the final-value overflow boundary is the same as the un-rewritten
// expression.
case s @ Sum(WidenedDecimalChild(inner, p, pPrime, s_scale), _)
if p + 10 <= MAX_LONG_DIGITS =>
Cast(
MakeDecimal(
ae.copy(aggregateFunction = s.copy(child = UnscaledValue(inner))),
p + 10, s_scale),
DecimalType.bounded(pPrime + 10, s_scale),
Option(conf.sessionLocalTimeZone))
val target = DecimalType.bounded(pPrime + 10, s_scale)
MakeDecimal(
ae.copy(aggregateFunction = s.copy(child = UnscaledValue(inner))),
target.precision, target.scale)

case s @ Sum(e @ DecimalExpression(prec, scale), _) if prec + 10 <= MAX_LONG_DIGITS =>
MakeDecimal(ae.copy(aggregateFunction = s.copy(child = UnscaledValue(e))),
prec + 10, scale)

// Ordered before the un-widened Average arm: when pPrime in [8, 11],
// the outer Cast's DecimalType would otherwise match that arm first.
// Hoist a scale-preserving widening Cast out of Average. Guarded on
// the OUTER precision `pPrime + 4 <= MAX_DOUBLE_DIGITS` so the
// rewrite only fires inside the existing Double-regime envelope;
// for wider outer casts the un-rewritten Decimal-exact path is
// preserved. Ordered before the un-widened arm so the outer Cast's
// dataType does not let that arm intercept first (when pPrime <= 11,
// it would also match -- but on the outer Cast, not the inner).
case a @ Average(WidenedDecimalChild(inner, p, pPrime, s_scale), _)
if p <= AVG_PEEL_MAX_INNER_PRECISION =>
if pPrime + 4 <= MAX_DOUBLE_DIGITS =>
val newAggExpr = ae.copy(aggregateFunction = a.copy(child = UnscaledValue(inner)))
Cast(
Divide(newAggExpr, Literal.create(math.pow(10.0, s_scale), DoubleType)),
DecimalType.bounded(pPrime + 4, s_scale + 4), Option(conf.sessionLocalTimeZone))
DecimalType(pPrime + 4, s_scale + 4), Option(conf.sessionLocalTimeZone))

case a @ Average(e @ DecimalExpression(prec, scale), _) if prec + 4 <= MAX_DOUBLE_DIGITS =>
val newAggExpr = ae.copy(aggregateFunction = a.copy(child = UnscaledValue(e)))
Expand Down

Large diffs are not rendered by default.

Original file line number Diff line number Diff line change
Expand Up @@ -257,7 +257,7 @@ Arguments: [[cs_quantity#4, cs_list_price#5, cs_sales_price#6, cs_coupon_amt#7,
(46) HashAggregate [codegen id : 13]
Input [12]: [cs_quantity#4, cs_list_price#5, cs_sales_price#6, cs_coupon_amt#7, cs_net_profit#8, cd_dep_count#14, c_birth_year#22, i_item_id#28, ca_country#29, ca_state#30, ca_county#31, spark_grouping_id#32]
Keys [5]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, spark_grouping_id#32]
Functions [7]: [partial_avg(cast(cs_quantity#4 as decimal(12,2))), partial_avg(UnscaledValue(cs_list_price#5)), partial_avg(UnscaledValue(cs_coupon_amt#7)), partial_avg(UnscaledValue(cs_sales_price#6)), partial_avg(UnscaledValue(cs_net_profit#8)), partial_avg(cast(c_birth_year#22 as decimal(12,2))), partial_avg(cast(cd_dep_count#14 as decimal(12,2)))]
Functions [7]: [partial_avg(cast(cs_quantity#4 as decimal(12,2))), partial_avg(cast(cs_list_price#5 as decimal(12,2))), partial_avg(cast(cs_coupon_amt#7 as decimal(12,2))), partial_avg(cast(cs_sales_price#6 as decimal(12,2))), partial_avg(cast(cs_net_profit#8 as decimal(12,2))), partial_avg(cast(c_birth_year#22 as decimal(12,2))), partial_avg(cast(cd_dep_count#14 as decimal(12,2)))]
Aggregate Attributes [14]: [sum#33, count#34, sum#35, count#36, sum#37, count#38, sum#39, count#40, sum#41, count#42, sum#43, count#44, sum#45, count#46]
Results [19]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, spark_grouping_id#32, sum#47, count#48, sum#49, count#50, sum#51, count#52, sum#53, count#54, sum#55, count#56, sum#57, count#58, sum#59, count#60]

Expand All @@ -268,9 +268,9 @@ Arguments: hashpartitioning(i_item_id#28, ca_country#29, ca_state#30, ca_county#
(48) HashAggregate [codegen id : 14]
Input [19]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, spark_grouping_id#32, sum#47, count#48, sum#49, count#50, sum#51, count#52, sum#53, count#54, sum#55, count#56, sum#57, count#58, sum#59, count#60]
Keys [5]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, spark_grouping_id#32]
Functions [7]: [avg(cast(cs_quantity#4 as decimal(12,2))), avg(UnscaledValue(cs_list_price#5)), avg(UnscaledValue(cs_coupon_amt#7)), avg(UnscaledValue(cs_sales_price#6)), avg(UnscaledValue(cs_net_profit#8)), avg(cast(c_birth_year#22 as decimal(12,2))), avg(cast(cd_dep_count#14 as decimal(12,2)))]
Aggregate Attributes [7]: [avg(cast(cs_quantity#4 as decimal(12,2)))#61, avg(UnscaledValue(cs_list_price#5))#62, avg(UnscaledValue(cs_coupon_amt#7))#63, avg(UnscaledValue(cs_sales_price#6))#64, avg(UnscaledValue(cs_net_profit#8))#65, avg(cast(c_birth_year#22 as decimal(12,2)))#66, avg(cast(cd_dep_count#14 as decimal(12,2)))#67]
Results [11]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, avg(cast(cs_quantity#4 as decimal(12,2)))#61 AS agg1#68, cast((avg(UnscaledValue(cs_list_price#5))#62 / 100.0) as decimal(16,6)) AS agg2#69, cast((avg(UnscaledValue(cs_coupon_amt#7))#63 / 100.0) as decimal(16,6)) AS agg3#70, cast((avg(UnscaledValue(cs_sales_price#6))#64 / 100.0) as decimal(16,6)) AS agg4#71, cast((avg(UnscaledValue(cs_net_profit#8))#65 / 100.0) as decimal(16,6)) AS agg5#72, avg(cast(c_birth_year#22 as decimal(12,2)))#66 AS agg6#73, avg(cast(cd_dep_count#14 as decimal(12,2)))#67 AS agg7#74]
Functions [7]: [avg(cast(cs_quantity#4 as decimal(12,2))), avg(cast(cs_list_price#5 as decimal(12,2))), avg(cast(cs_coupon_amt#7 as decimal(12,2))), avg(cast(cs_sales_price#6 as decimal(12,2))), avg(cast(cs_net_profit#8 as decimal(12,2))), avg(cast(c_birth_year#22 as decimal(12,2))), avg(cast(cd_dep_count#14 as decimal(12,2)))]
Aggregate Attributes [7]: [avg(cast(cs_quantity#4 as decimal(12,2)))#61, avg(cast(cs_list_price#5 as decimal(12,2)))#62, avg(cast(cs_coupon_amt#7 as decimal(12,2)))#63, avg(cast(cs_sales_price#6 as decimal(12,2)))#64, avg(cast(cs_net_profit#8 as decimal(12,2)))#65, avg(cast(c_birth_year#22 as decimal(12,2)))#66, avg(cast(cd_dep_count#14 as decimal(12,2)))#67]
Results [11]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, avg(cast(cs_quantity#4 as decimal(12,2)))#61 AS agg1#68, avg(cast(cs_list_price#5 as decimal(12,2)))#62 AS agg2#69, avg(cast(cs_coupon_amt#7 as decimal(12,2)))#63 AS agg3#70, avg(cast(cs_sales_price#6 as decimal(12,2)))#64 AS agg4#71, avg(cast(cs_net_profit#8 as decimal(12,2)))#65 AS agg5#72, avg(cast(c_birth_year#22 as decimal(12,2)))#66 AS agg6#73, avg(cast(cd_dep_count#14 as decimal(12,2)))#67 AS agg7#74]

(49) TakeOrderedAndProject
Input [11]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, agg1#68, agg2#69, agg3#70, agg4#71, agg5#72, agg6#73, agg7#74]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TakeOrderedAndProject [ca_country,ca_state,ca_county,i_item_id,agg1,agg2,agg3,agg4,agg5,agg6,agg7]
WholeStageCodegen (14)
HashAggregate [i_item_id,ca_country,ca_state,ca_county,spark_grouping_id,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count] [avg(cast(cs_quantity as decimal(12,2))),avg(UnscaledValue(cs_list_price)),avg(UnscaledValue(cs_coupon_amt)),avg(UnscaledValue(cs_sales_price)),avg(UnscaledValue(cs_net_profit)),avg(cast(c_birth_year as decimal(12,2))),avg(cast(cd_dep_count as decimal(12,2))),agg1,agg2,agg3,agg4,agg5,agg6,agg7,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count]
HashAggregate [i_item_id,ca_country,ca_state,ca_county,spark_grouping_id,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count] [avg(cast(cs_quantity as decimal(12,2))),avg(cast(cs_list_price as decimal(12,2))),avg(cast(cs_coupon_amt as decimal(12,2))),avg(cast(cs_sales_price as decimal(12,2))),avg(cast(cs_net_profit as decimal(12,2))),avg(cast(c_birth_year as decimal(12,2))),avg(cast(cd_dep_count as decimal(12,2))),agg1,agg2,agg3,agg4,agg5,agg6,agg7,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count]
InputAdapter
Exchange [i_item_id,ca_country,ca_state,ca_county,spark_grouping_id] #1
WholeStageCodegen (13)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -227,7 +227,7 @@ Arguments: [[cs_quantity#4, cs_list_price#5, cs_sales_price#6, cs_coupon_amt#7,
(40) HashAggregate [codegen id : 7]
Input [12]: [cs_quantity#4, cs_list_price#5, cs_sales_price#6, cs_coupon_amt#7, cs_net_profit#8, cd_dep_count#14, c_birth_year#19, i_item_id#28, ca_country#29, ca_state#30, ca_county#31, spark_grouping_id#32]
Keys [5]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, spark_grouping_id#32]
Functions [7]: [partial_avg(cast(cs_quantity#4 as decimal(12,2))), partial_avg(UnscaledValue(cs_list_price#5)), partial_avg(UnscaledValue(cs_coupon_amt#7)), partial_avg(UnscaledValue(cs_sales_price#6)), partial_avg(UnscaledValue(cs_net_profit#8)), partial_avg(cast(c_birth_year#19 as decimal(12,2))), partial_avg(cast(cd_dep_count#14 as decimal(12,2)))]
Functions [7]: [partial_avg(cast(cs_quantity#4 as decimal(12,2))), partial_avg(cast(cs_list_price#5 as decimal(12,2))), partial_avg(cast(cs_coupon_amt#7 as decimal(12,2))), partial_avg(cast(cs_sales_price#6 as decimal(12,2))), partial_avg(cast(cs_net_profit#8 as decimal(12,2))), partial_avg(cast(c_birth_year#19 as decimal(12,2))), partial_avg(cast(cd_dep_count#14 as decimal(12,2)))]
Aggregate Attributes [14]: [sum#33, count#34, sum#35, count#36, sum#37, count#38, sum#39, count#40, sum#41, count#42, sum#43, count#44, sum#45, count#46]
Results [19]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, spark_grouping_id#32, sum#47, count#48, sum#49, count#50, sum#51, count#52, sum#53, count#54, sum#55, count#56, sum#57, count#58, sum#59, count#60]

Expand All @@ -238,9 +238,9 @@ Arguments: hashpartitioning(i_item_id#28, ca_country#29, ca_state#30, ca_county#
(42) HashAggregate [codegen id : 8]
Input [19]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, spark_grouping_id#32, sum#47, count#48, sum#49, count#50, sum#51, count#52, sum#53, count#54, sum#55, count#56, sum#57, count#58, sum#59, count#60]
Keys [5]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, spark_grouping_id#32]
Functions [7]: [avg(cast(cs_quantity#4 as decimal(12,2))), avg(UnscaledValue(cs_list_price#5)), avg(UnscaledValue(cs_coupon_amt#7)), avg(UnscaledValue(cs_sales_price#6)), avg(UnscaledValue(cs_net_profit#8)), avg(cast(c_birth_year#19 as decimal(12,2))), avg(cast(cd_dep_count#14 as decimal(12,2)))]
Aggregate Attributes [7]: [avg(cast(cs_quantity#4 as decimal(12,2)))#61, avg(UnscaledValue(cs_list_price#5))#62, avg(UnscaledValue(cs_coupon_amt#7))#63, avg(UnscaledValue(cs_sales_price#6))#64, avg(UnscaledValue(cs_net_profit#8))#65, avg(cast(c_birth_year#19 as decimal(12,2)))#66, avg(cast(cd_dep_count#14 as decimal(12,2)))#67]
Results [11]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, avg(cast(cs_quantity#4 as decimal(12,2)))#61 AS agg1#68, cast((avg(UnscaledValue(cs_list_price#5))#62 / 100.0) as decimal(16,6)) AS agg2#69, cast((avg(UnscaledValue(cs_coupon_amt#7))#63 / 100.0) as decimal(16,6)) AS agg3#70, cast((avg(UnscaledValue(cs_sales_price#6))#64 / 100.0) as decimal(16,6)) AS agg4#71, cast((avg(UnscaledValue(cs_net_profit#8))#65 / 100.0) as decimal(16,6)) AS agg5#72, avg(cast(c_birth_year#19 as decimal(12,2)))#66 AS agg6#73, avg(cast(cd_dep_count#14 as decimal(12,2)))#67 AS agg7#74]
Functions [7]: [avg(cast(cs_quantity#4 as decimal(12,2))), avg(cast(cs_list_price#5 as decimal(12,2))), avg(cast(cs_coupon_amt#7 as decimal(12,2))), avg(cast(cs_sales_price#6 as decimal(12,2))), avg(cast(cs_net_profit#8 as decimal(12,2))), avg(cast(c_birth_year#19 as decimal(12,2))), avg(cast(cd_dep_count#14 as decimal(12,2)))]
Aggregate Attributes [7]: [avg(cast(cs_quantity#4 as decimal(12,2)))#61, avg(cast(cs_list_price#5 as decimal(12,2)))#62, avg(cast(cs_coupon_amt#7 as decimal(12,2)))#63, avg(cast(cs_sales_price#6 as decimal(12,2)))#64, avg(cast(cs_net_profit#8 as decimal(12,2)))#65, avg(cast(c_birth_year#19 as decimal(12,2)))#66, avg(cast(cd_dep_count#14 as decimal(12,2)))#67]
Results [11]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, avg(cast(cs_quantity#4 as decimal(12,2)))#61 AS agg1#68, avg(cast(cs_list_price#5 as decimal(12,2)))#62 AS agg2#69, avg(cast(cs_coupon_amt#7 as decimal(12,2)))#63 AS agg3#70, avg(cast(cs_sales_price#6 as decimal(12,2)))#64 AS agg4#71, avg(cast(cs_net_profit#8 as decimal(12,2)))#65 AS agg5#72, avg(cast(c_birth_year#19 as decimal(12,2)))#66 AS agg6#73, avg(cast(cd_dep_count#14 as decimal(12,2)))#67 AS agg7#74]

(43) TakeOrderedAndProject
Input [11]: [i_item_id#28, ca_country#29, ca_state#30, ca_county#31, agg1#68, agg2#69, agg3#70, agg4#71, agg5#72, agg6#73, agg7#74]
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
TakeOrderedAndProject [ca_country,ca_state,ca_county,i_item_id,agg1,agg2,agg3,agg4,agg5,agg6,agg7]
WholeStageCodegen (8)
HashAggregate [i_item_id,ca_country,ca_state,ca_county,spark_grouping_id,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count] [avg(cast(cs_quantity as decimal(12,2))),avg(UnscaledValue(cs_list_price)),avg(UnscaledValue(cs_coupon_amt)),avg(UnscaledValue(cs_sales_price)),avg(UnscaledValue(cs_net_profit)),avg(cast(c_birth_year as decimal(12,2))),avg(cast(cd_dep_count as decimal(12,2))),agg1,agg2,agg3,agg4,agg5,agg6,agg7,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count]
HashAggregate [i_item_id,ca_country,ca_state,ca_county,spark_grouping_id,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count] [avg(cast(cs_quantity as decimal(12,2))),avg(cast(cs_list_price as decimal(12,2))),avg(cast(cs_coupon_amt as decimal(12,2))),avg(cast(cs_sales_price as decimal(12,2))),avg(cast(cs_net_profit as decimal(12,2))),avg(cast(c_birth_year as decimal(12,2))),avg(cast(cd_dep_count as decimal(12,2))),agg1,agg2,agg3,agg4,agg5,agg6,agg7,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count,sum,count]
InputAdapter
Exchange [i_item_id,ca_country,ca_state,ca_county,spark_grouping_id] #1
WholeStageCodegen (7)
Expand Down
Loading