Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ import org.apache.spark.sql.execution.datasources.parquet.{ParquetFileFormat, Pa
import org.apache.spark.sql.hive.execution.HiveFileFormat
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types._
import org.apache.spark.util.SparkVersionUtil

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
Expand Down Expand Up @@ -113,6 +114,30 @@ object VeloxBackendSettings extends BackendSettingsApi {
hadoopConf: Configuration,
partitionFileFormats: Set[ReadFileFormat]): ValidationResult = {

def containsStructType(dataType: DataType): Boolean = {
dataType match {
case _: StructType => true
case ArrayType(elementType, _) => containsStructType(elementType)
case MapType(keyType, valueType, _) =>
containsStructType(keyType) || containsStructType(valueType)
case _ => false
}
}

def shouldFallbackBySpark41ParquetStructBehavior: Boolean = {
if (!SparkVersionUtil.gteSpark41) {
return false
}
if (!fields.exists(field => containsStructType(field.dataType))) {
return false
}
val returnNullStructIfAllFieldsMissingKey =
"spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing"
!SQLConf.get
.getConfString(returnNullStructIfAllFieldsMissingKey, "false")
.toBoolean
}

def validateScheme(): Option[String] = {
val filteredRootPaths = distinctRootPaths(rootPaths)
if (
Expand Down Expand Up @@ -156,6 +181,11 @@ object VeloxBackendSettings extends BackendSettingsApi {
if (parquetOptions.mergeSchema) {
// https://github.com/apache/gluten/issues/7174
Some(s"not support when merge schema is true")
} else if (shouldFallbackBySpark41ParquetStructBehavior) {
Some(
"Spark 4.1 Parquet struct compatibility (all requested struct fields missing) " +
"is not supported by Velox native scan yet when " +
"spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing=false")
} else {
None
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import org.apache.spark.sql.execution.adaptive.{AdaptiveSparkPlanHelper, AQEShuf
import org.apache.spark.sql.execution.exchange.ShuffleExchangeExec
import org.apache.spark.sql.execution.joins.{BroadcastHashJoinExec, BroadcastNestedLoopJoinExec, SortMergeJoinExec}
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{IntegerType, StructType}
import org.apache.spark.util.SparkVersionUtil
import org.apache.spark.utils.GlutenSuiteUtils

class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPlanHelper {
Expand Down Expand Up @@ -352,6 +354,26 @@ class FallbackSuite extends VeloxWholeStageTransformerSuite with AdaptiveSparkPl
}
}

test("fallback Spark 4.1 parquet missing all struct fields compatibility") {
if (!SparkVersionUtil.gteSpark41) {
cancel("Only applicable on Spark 4.1+")
}
withTempPath {
path =>
val schema = new StructType().add("s", new StructType().add("b", IntegerType))
val file = path.getCanonicalPath
spark.range(10).selectExpr("named_struct('a', cast(id as int)) as s").write.parquet(file)
withSQLConf("spark.sql.legacy.parquet.returnNullStructIfAllFieldsMissing" -> "false") {
spark.read.schema(schema).parquet(file).createOrReplaceTempView("struct_tbl")
runQueryAndCompare("select s is null as is_null from struct_tbl") {
df =>
val plan = df.queryExecution.executedPlan
assert(collect(plan) { case g: GlutenPlan => g }.isEmpty)
}
}
}
}

test("get correct fallback reason on nodes without logicalLink") {
withSQLConf(GlutenConfig.COLUMNAR_SORT_ENABLED.key -> "false") {
GlutenSuiteUtils.withFallbackEventListener(spark.sparkContext) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -548,9 +548,6 @@ class VeloxTestSettings extends BackendTestSettings {
.exclude("SPARK-40128 read DELTA_LENGTH_BYTE_ARRAY encoded strings")
// TODO: fix in Spark-4.0
.exclude("explode nested lists crossing a rowgroup boundary")
// TODO: fix on Spark-4.1
.excludeByPrefix("SPARK-53535") // see https://issues.apache.org/jira/browse/SPARK-53535
.excludeByPrefix("vectorized reader: missing all struct fields")
.excludeByPrefix("SPARK-54220") // https://issues.apache.org/jira/browse/SPARK-54220
enableSuite[GlutenParquetV1PartitionDiscoverySuite]
enableSuite[GlutenParquetV2PartitionDiscoverySuite]
Expand Down
Loading