Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,26 @@ class ColumnarCachedBatchE2ESuite
.cache()
}

// Asserts that `df` is served from a cache built by ColumnarCachedBatchSerializer
// and, when expectPrune is true, that the scan's numOutputRows metric is at most
// 2 * (N / P).
//
// The metric is read from df.queryExecution.executedPlan, so the caller must have
// triggered execution of that plan so numOutputRows is populated.
// NOTE(review): Dataset.count() appears to execute a derived aggregate
// QueryExecution rather than df's own plan, which would leave this metric at 0
// and make the bound trivially true - confirm callers run collect() (or another
// action on df itself) before calling this with expectPrune = true.
//
// expectPrune=false is path-only: numOutputRows is unreliable for "no prune"
// on the Gluten native path (surviving rows bypass the IMS counter, see
// baseline "numOutputRows reflects post-filter row count" test).
private def assertGlutenCachedPlanAndPrune(df: DataFrame, expectPrune: Boolean): Unit = {
  val plan = df.queryExecution.executedPlan
  val scanOpt = find(plan) {
    case _: InMemoryTableScanExec => true
    case _ => false
  }
  // Fail with the plan text instead of a bare NoSuchElementException when the
  // query did not read from the cache at all.
  val ims = scanOpt match {
    case Some(scan: InMemoryTableScanExec) => scan
    case _ => fail(s"no InMemoryTableScanExec found in executed plan:\n$plan")
  }
  val serName = ims.relation.cacheBuilder.serializer.getClass.getSimpleName
  assert(serName == "ColumnarCachedBatchSerializer", s"got $serName")
  if (expectPrune) {
    val outRows = ims.metrics("numOutputRows").value
    val upperBound = (N / P) * 2
    assert(outRows <= upperBound, s"numOutputRows=$outRows > $upperBound (N=$N, P=$P)")
  }
}

test("e2e cache + equality filter: no crash + correct result") {
val cached = cacheRange()
try {
Expand Down Expand Up @@ -385,4 +405,87 @@ class ColumnarCachedBatchE2ESuite
}
}
}

// Cross-config: build with stats enabled, read with stats disabled.
// Wire format is build-time-decided, so reader-time SQLConf must not affect prune.
test("cross-config: build with stats enabled, read with stats disabled") {
  // The build and read phases run in separate withSQLConf scopes, hence the vars.
  var cached: DataFrame = null
  var filtered: DataFrame = null
  var result: Long = -1L
  try {
    withSQLConf(
      GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key -> "true") {
      // Assign before materializing so a failing count() is still unpersisted
      // and cannot leak a cache entry into later tests.
      cached = cacheRange()
      cached.count()
    }
    withSQLConf(
      GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key -> "false") {
      filtered = cached.filter(col("k") === pivot)
      result = filtered.count()
      // count() executes a derived aggregate query, not filtered's own plan.
      // collect() runs filtered.queryExecution.executedPlan so the numOutputRows
      // metric checked below is actually populated.
      filtered.collect()
    }
    assert(result == 1L, s"got $result")
    assertGlutenCachedPlanAndPrune(filtered, expectPrune = true)
  } finally {
    // cached stays null only if cacheRange() itself threw.
    if (cached != null) cached.unpersist()
  }
}

// Reverse: legacy v1 payload at build (stats=null), reader cannot fabricate
// stats. Distinct from the same-config legacy test: this forces cross-config.
test("cross-config: build with stats disabled, read with stats enabled") {
  // The build and read phases run in separate withSQLConf scopes, hence the vars.
  var cached: DataFrame = null
  var filtered: DataFrame = null
  var result: Long = -1L
  try {
    withSQLConf(
      GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key -> "false") {
      // Assign before materializing so a failing count() is still unpersisted
      // and cannot leak a cache entry into later tests.
      cached = cacheRange()
      cached.count()
    }
    withSQLConf(
      GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key -> "true") {
      filtered = cached.filter(col("k") === pivot)
      result = filtered.count()
    }
    assert(result == 1L, s"got $result")
    // Path-only check: no metric is read when expectPrune = false.
    assertGlutenCachedPlanAndPrune(filtered, expectPrune = false)
  } finally {
    // cached stays null only if cacheRange() itself threw.
    if (cached != null) cached.unpersist()
  }
}

// Round 2 must re-honor the new SQLConf, not reuse stale gate decision /
// payload from round 1.
test("cross-build-cycle: unpersist + toggle stats config + rebuild same query") {
  // Round 1: build and read with stats enabled; pruning is expected.
  withSQLConf(
    GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key -> "true") {
    val df = cacheRange()
    try {
      df.count()
      val filtered = df.filter(col("k") === pivot)
      val resultA = filtered.count()
      assert(resultA == 1L, s"round 1: got $resultA")
      // count() executes a derived aggregate query, not filtered's own plan.
      // collect() runs filtered.queryExecution.executedPlan so the numOutputRows
      // metric checked below is actually populated.
      filtered.collect()
      assertGlutenCachedPlanAndPrune(filtered, expectPrune = true)
    } finally {
      // Blocking, so round 2 starts only after this call has returned.
      df.unpersist(blocking = true)
    }
  }
  // Round 2: same query rebuilt with stats disabled; path-only check.
  withSQLConf(
    GlutenConfig.COLUMNAR_TABLE_CACHE_PARTITION_STATS_ENABLED.key -> "false") {
    val df = cacheRange()
    try {
      df.count()
      val filtered = df.filter(col("k") === pivot)
      val resultB = filtered.count()
      assert(resultB == 1L, s"round 2: got $resultB")
      assertGlutenCachedPlanAndPrune(filtered, expectPrune = false)
    } finally {
      df.unpersist(blocking = true)
    }
  }
}
}
Loading