@@ -16,13 +16,16 @@
*/
package org.apache.spark.sql.execution.ui

import scala.annotation.nowarn

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerEvent}
import org.apache.spark.status.ElementTrackingStore

import org.apache.auron.spark.ui.AuronBuildInfoEvent

@nowarn("cat=unused") // conf temporarily unused
class AuronSQLAppStatusListener(conf: SparkConf, kvstore: ElementTrackingStore)
extends SparkListener
with Logging {
7 changes: 7 additions & 0 deletions pom.xml
@@ -377,7 +377,10 @@
<target>${javaVersion}</target>
<scalaVersion>${scalaLongVersion}</scalaVersion>
<args>
<arg>-deprecation</arg>
<arg>-feature</arg>
<arg>-Ywarn-unused</arg>
<arg>-Xfatal-warnings</arg>
</args>
</configuration>
<dependencies>
@@ -862,11 +865,15 @@
<artifactId>scala-maven-plugin</artifactId>
<configuration>
<args>
<arg>-deprecation</arg>
<arg>-feature</arg>
<arg>-Ywarn-unused</arg>
<!-- https://github.com/scalamacros/paradise is no longer actively developed. -->
<!-- In Scala 2.13, the plugin's functionality has been included in the compiler -->
<!-- directly under the -Ymacro-annotations flag. -->
<arg>-Ymacro-annotations</arg>
<arg>-Xfatal-warnings</arg>
<arg>-Wconf:cat=deprecation:w</arg>
</args>
<compilerPlugins>
<compilerPlugin>
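The added arguments turn on deprecation, feature, and unused-symbol warnings and make any remaining warning fail the build; the `-Wconf:cat=deprecation:w` entry in the second configuration looks intended to keep deprecations at warning level. A minimal sketch (not from this PR) of the kind of code the stricter flags reject, and how a targeted `@nowarn` exempts a single definition:

```scala
import scala.annotation.nowarn

object FatalWarningsDemo {
  // -Ywarn-unused flags the unused parameter below; with
  // -Xfatal-warnings that warning becomes a compile error,
  // so the version stub opts out explicitly.
  @nowarn("cat=unused")
  def versionStub(unusedPlan: String): Unit =
    throw new UnsupportedOperationException("not supported on this Spark version")

  @deprecated("use versionStub instead", since = "0.1")
  def oldStub(plan: String): Unit = versionStub(plan)
}
```

Compiled with `scalac -deprecation -feature -Ywarn-unused -Xfatal-warnings`, deleting the `@nowarn` line is enough to break the build.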
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.auron

import scala.annotation.nowarn

import org.apache.spark.internal.Logging
import org.apache.spark.sql.execution.SparkPlan

@@ -71,6 +73,7 @@ object InterceptedValidateSparkPlan extends Logging {
}
}

@nowarn("cat=unused") // plan unused
@sparkver("3.0 / 3.1")
def validate(plan: SparkPlan): Unit = {
throw new UnsupportedOperationException("validate is not supported in spark 3.0.3 or 3.1.3")
@@ -19,6 +19,7 @@ package org.apache.spark.sql.auron
import java.io.File
import java.util.UUID

import scala.annotation.nowarn
import scala.collection.mutable

import org.apache.commons.lang3.reflect.FieldUtils
@@ -965,6 +966,7 @@ class ShimsImpl extends Shims with Logging {
}
}

@nowarn("cat=unused") // Some params temporarily unused
@sparkver("3.4 / 3.5")
private def convertPromotePrecision(
e: Expression,
@@ -997,6 +999,7 @@ class ShimsImpl extends Shims with Logging {
}
}

@nowarn("cat=unused") // Some params temporarily unused
@sparkver("3.0 / 3.1 / 3.2")
private def convertBloomFilterAgg(agg: AggregateFunction): Option[pb.PhysicalAggExprNode] = None

@@ -1023,6 +1026,7 @@ class ShimsImpl extends Shims with Logging {
}
}

@nowarn("cat=unused") // Some params temporarily unused
@sparkver("3.0 / 3.1 / 3.2")
private def convertBloomFilterMightContain(
e: Expression,
@@ -37,6 +37,9 @@ class AuronBlockStoreShuffleReader[K, C](
extends AuronBlockStoreShuffleReaderBase[K, C](handle, context)
with Logging {

// Touch mapOutputTracker to suppress the unused warning, which -Xfatal-warnings would make fatal (used in Spark 3.2+, unused in 3.0/3.1)
private val _ = mapOutputTracker

override def readBlocks(): Iterator[InputStream] = {
@sparkver("3.2 / 3.3 / 3.4 / 3.5")
def fetchIterator = new ShuffleBlockFetcherIterator(
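Where an annotation is less convenient (the field above comes in through the constructor), the same effect comes from reading the value once: `val _ = x` compiles to a bare read that marks `x` as used without binding a name. A hypothetical sketch of the pattern:

```scala
// Hypothetical sketch: `reader` is only needed on some build targets,
// so a lone discarded read keeps -Ywarn-unused quiet on the others
// (where the warning would be fatal under -Xfatal-warnings).
class VersionGatedConsumer(reader: java.io.Reader) {
  private val _ = reader
}
```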
@@ -16,13 +16,16 @@
*/
package org.apache.spark.sql.execution.auron.shuffle

import scala.annotation.nowarn

import org.apache.spark.{ShuffleDependency, SparkConf, TaskContext}
import org.apache.spark.internal.Logging
import org.apache.spark.shuffle._
import org.apache.spark.sql.execution.auron.shuffle.AuronShuffleDependency.isArrowShuffle

import org.apache.auron.sparkver

@nowarn("cat=unused") // _conf temporarily unused
abstract class AuronRssShuffleManagerBase(_conf: SparkConf) extends ShuffleManager with Logging {
override def registerShuffle[K, V, C](
shuffleId: Int,
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.joins.auron.plan

import scala.annotation.nowarn

import org.apache.spark.sql.auron.join.JoinBuildSides.JoinBuildSide
import org.apache.spark.sql.catalyst.expressions.Expression
import org.apache.spark.sql.catalyst.plans.JoinType
@@ -80,6 +82,7 @@ case object NativeShuffledHashJoinExecProvider {
NativeShuffledHashJoinExec(left, right, leftKeys, rightKeys, joinType, buildSide, isSkewJoin)
}

@nowarn("cat=unused") // Some params temporarily unused
@sparkver("3.1")
def provide(
left: SparkPlan,
@@ -127,6 +130,7 @@ case object NativeShuffledHashJoinExecProvider {
NativeShuffledHashJoinExec(left, right, leftKeys, rightKeys, joinType, buildSide)
}

@nowarn("cat=unused") // Some params temporarily unused
@sparkver("3.0")
def provide(
left: SparkPlan,
@@ -278,8 +278,6 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
val dateTimeStampMin = format.parse(dateStringMin).getTime
val dateTimeStampMax = format.parse(dateStringMax).getTime
format = new SimpleDateFormat("yyyy-MM-dd")
val dateString = "2015-01-01"
val date = format.parse(dateString)

val functions =
s"""
@@ -320,8 +318,6 @@ class AuronFunctionSuite extends AuronQueryTest with BaseAuronSQLSuite {
val dateTimeStampMin = format.parse(dateStringMin).getTime
val dateTimeStampMax = format.parse(dateStringMax).getTime
format = new SimpleDateFormat("yyyy-MM-dd")
val dateString = "2015-07-01"
val date = format.parse(dateString)

val functions =
s"""
@@ -210,7 +210,7 @@ class AuronQuerySuite extends AuronQueryTest with BaseAuronSQLSuite with AuronSQ
withTable("t") {
sql(s"CREATE EXTERNAL TABLE t(c3 INT, c2 INT) USING ORC LOCATION '$path'")

val expected = if (forcePositionalEvolution) {
val _ = if (forcePositionalEvolution) {
correctAnswer
} else {
Seq(Row(null, 2), Row(null, 4), Row(null, 6), Row(null, null))
@@ -247,7 +247,7 @@ class AuronQuerySuite extends AuronQueryTest with BaseAuronSQLSuite with AuronSQ
|LOCATION '$path'
|""".stripMargin)
sql("MSCK REPAIR TABLE t")
if (forcePositionalEvolution) {
val _ = if (forcePositionalEvolution) {
correctAnswer
} else {
Seq(Row(null, 2, 1), Row(null, 4, 2), Row(null, 6, 3), Row(null, null, 4))
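The suite changes above bind results that are computed but never asserted to `_`: the expression is still evaluated, while both the unused-local warning and the pure-expression-in-statement-position warning are silenced. A standalone sketch:

```scala
object DiscardedValueDemo {
  def demo(forcePositional: Boolean): Unit = {
    // An unused `val expected = ...` trips -Ywarn-unused, and a bare
    // if-expression in statement position is reported as a pure
    // expression that does nothing; binding to `_` avoids both.
    val _ = if (forcePositional) Seq(1, 2, 3) else Seq(4, 5, 6)
  }
}
```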
@@ -22,6 +22,7 @@ import java.nio.file.Files
import java.nio.file.StandardCopyOption
import java.util.concurrent.atomic.AtomicReference

import scala.annotation.nowarn
import scala.collection.mutable.ArrayBuffer

import org.apache.arrow.c.ArrowArray
@@ -53,6 +54,7 @@ import org.apache.auron.protobuf.TaskDefinition
* This class has been deprecated and migrated to {@link
* org.apache.auron.jni.AuronCallNativeWrapper}. Will be removed in the future.
*/
@nowarn("cat=deprecation") // JniBridge is temporarily used (deprecated)
@Deprecated
case class AuronCallNativeWrapper(
nativePlan: PhysicalPlanNode,
@@ -193,6 +195,7 @@ case class AuronCallNativeWrapper(
}
}

@nowarn("cat=deprecation") // JniBridge is temporarily used (deprecated)
object AuronCallNativeWrapper extends Logging {
def initNative(): Unit = {
lazyInitNative
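Annotating the class and its companion with `@nowarn("cat=deprecation")` blankets every use of the deprecated JniBridge inside this wrapper, which is itself slated for removal. A minimal sketch of the idea, with hypothetical names:

```scala
import scala.annotation.nowarn

object LegacyBridge { // hypothetical stand-in for the deprecated API
  @deprecated("migrated to the new wrapper", since = "0.1")
  def callNative(): Unit = ()
}

// One annotation on the enclosing object exempts every deprecated
// call in its body, even when deprecation warnings are fatal.
@nowarn("cat=deprecation")
object LegacyCaller {
  def run(): Unit = LegacyBridge.callNative()
}
```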
@@ -18,7 +18,7 @@ package org.apache.spark.sql.auron

import java.util.ServiceLoader

import scala.annotation.tailrec
import scala.annotation.{nowarn, tailrec}
import scala.collection.JavaConverters._
import scala.collection.mutable

@@ -418,12 +420,14 @@ object AuronConverters extends Logging {
@sparkver(" 3.2 / 3.3 / 3.4 / 3.5")
def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = exec.isSkewJoin

@nowarn("cat=unused")
@sparkver("3.0 / 3.1")
def getIsSkewJoinFromSHJ(exec: ShuffledHashJoinExec): Boolean = false

@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = Some(exec.shuffleOrigin)

@nowarn("cat=unused")
@sparkver("3.0")
def getShuffleOrigin(exec: ShuffleExchangeExec): Option[Any] = None

@@ -649,6 +651,7 @@ object AuronConverters extends Logging {
@sparkver("3.1 / 3.2 / 3.3 / 3.4 / 3.5")
def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean = exec.isNullAwareAntiJoin

@nowarn("cat=unused")
@sparkver("3.0")
def isNullAwareAntiJoin(exec: BroadcastHashJoinExec): Boolean = false

@@ -1001,12 +1001,12 @@ object NativeConverters extends Logging {
val children = e.children.map(Cast(_, e.dataType))
buildScalarFunction(pb.ScalarFunction.Coalesce, children, e.dataType)

case e @ StringLPad(str, len, pad) =>
case _ @StringLPad(str, len, pad) =>
buildScalarFunction(
pb.ScalarFunction.Lpad,
Seq(str, castIfNecessary(len, LongType), pad),
StringType)
case e @ StringRPad(str, len, pad) =>
case _ @StringRPad(str, len, pad) =>
buildScalarFunction(
pb.ScalarFunction.Rpad,
Seq(str, castIfNecessary(len, LongType), pad),
@@ -100,7 +100,7 @@ object NativeHelper extends Logging {
if (nativePlan == null) {
return Iterator.empty
}
var auronCallNativeWrapper = new org.apache.auron.jni.AuronCallNativeWrapper(
val auronCallNativeWrapper = new org.apache.auron.jni.AuronCallNativeWrapper(
ROOT_ALLOCATOR,
nativePlan,
metrics,
@@ -18,6 +18,8 @@ package org.apache.spark.sql.auron

import java.io.File

import scala.annotation.nowarn

import org.apache.spark.ShuffleDependency
import org.apache.spark.SparkContext
import org.apache.spark.TaskContext
@@ -259,6 +261,7 @@ abstract class Shims {

def getMinPartitionNum(sparkSession: SparkSession): Int

@nowarn("cat=unused") // Some params temporarily unused
def postTransform(plan: SparkPlan, sc: SparkContext): Unit = {}

def getAdaptiveInputPlan(exec: AdaptiveSparkPlanExec): SparkPlan
@@ -16,6 +16,8 @@
*/
package org.apache.spark.sql.execution.auron.columnar

import scala.annotation.nowarn

import org.apache.arrow.vector.BigIntVector
import org.apache.arrow.vector.BitVector
import org.apache.arrow.vector.DateDayVector
@@ -143,6 +145,7 @@ class AuronArrowColumnVector(vector: ValueVector)
}

object AuronArrowColumnVector {
@nowarn("cat=unused") // Data type get methods unimplemented (placeholder)
abstract private class ArrowVectorAccessor(private val vector: ValueVector) {
def isNullAt(rowId: Int): Boolean =
if (vector.getValueCount > 0 && vector.getValidityBuffer.capacity == 0) false
@@ -21,6 +21,7 @@ import java.security.PrivilegedExceptionAction
import java.util
import java.util.UUID

import scala.annotation.nowarn
import scala.collection.JavaConverters._

import org.apache.hadoop.conf.Configuration
@@ -80,7 +81,7 @@ abstract class NativeParquetSinkBase(
hiveQlTable.getMetadata)
val tableSchema = table.schema
val hadoopConf = newHadoopConf(tableDesc)
val job = new Job(hadoopConf)
val job = Job.getInstance(hadoopConf)
val parquetFileFormat = new ParquetFileFormat()
parquetFileFormat.prepareWrite(sparkSession, job, Map(), tableSchema)

@@ -114,7 +115,7 @@ abstract class NativeParquetSinkBase(
})

// init parquet schema
val job = new Job(new JobConf(serializableConf.value))
val job = Job.getInstance(new JobConf(serializableConf.value))
val tableProperties = tableDesc.getProperties
val columnNameProperty: String = tableProperties.getProperty(IOConstants.COLUMNS)
val columnTypeProperty: String = tableProperties.getProperty(IOConstants.COLUMNS_TYPES)
@@ -157,6 +158,7 @@ abstract class NativeParquetSinkBase(
friendlyName = "NativeRDD.ParquetSink")
}

@nowarn("cat=unused") // _tableDesc temporarily unused
protected def newHadoopConf(_tableDesc: TableDesc): Configuration =
sparkSession.sessionState.newHadoopConf()
}
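Both hunks above swap the long-deprecated `new Job(conf)` constructor from org.apache.hadoop.mapreduce for the supported static factory, which would otherwise now be a fatal deprecation warning. A self-contained sketch:

```scala
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.mapreduce.Job

object JobSetupDemo {
  def newJob(): Job = {
    val conf = new Configuration()
    // `new Job(conf)` is deprecated (and so now fails the build);
    // Job.getInstance is the non-deprecated equivalent.
    Job.getInstance(conf)
  }
}
```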
@@ -18,6 +18,7 @@ package org.apache.spark.sql.execution.auron.plan

import java.util.UUID

import scala.annotation.nowarn
import scala.collection.JavaConverters._
import scala.collection.mutable
import scala.collection.mutable.ArrayBuffer
@@ -308,6 +309,7 @@ abstract class NativeShuffleExchangeBase(
dependency
}

@nowarn("cat=unused") // Some params temporarily unused
private def rangePartitioningBound[K: Ordering: ClassTag, V](
partitions: Int,
rdd: RDD[_ <: Product2[K, V]],
@@ -82,6 +82,8 @@ abstract class NativeShuffledHashJoinBase(
private def nativeBuildSide = buildSide match {
case JoinBuildLeft => pb.JoinSide.LEFT_SIDE
case JoinBuildRight => pb.JoinSide.RIGHT_SIDE
case other =>
throw new IllegalArgumentException(s"Unknown Join buildSide: $other")
}

protected def rewriteKeyExprToLong(exprs: Seq[Expression]): Seq[Expression]
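The added default case turns an unexpected build side from an opaque MatchError into a descriptive IllegalArgumentException, and heads off non-exhaustive-match warnings where scalac emits them. A sketch of the same defensive pattern on a stand-in enumeration (JoinBuildSides itself is not reproduced here):

```scala
object BuildSides extends Enumeration { // stand-in, not the real type
  val BuildLeft, BuildRight = Value
}

object BuildSideDemo {
  def toProtoSide(side: BuildSides.Value): Int = side match {
    case BuildSides.BuildLeft => 0
    case BuildSides.BuildRight => 1
    // Defensive default: a value added later fails with a clear
    // message instead of an unexplained scala.MatchError.
    case other => throw new IllegalArgumentException(s"Unknown Join buildSide: $other")
  }
}
```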
@@ -70,7 +70,7 @@ abstract class NativeUnionBase(
val unionInputs = ArrayBuffer[(PhysicalPlanNode, Int)]()
partition match {
case p: UnionPartition[_] =>
val rdds = unionRDD.asInstanceOf[UnionRDD[_]].rdds
val rdds = unionRDD.asInstanceOf[UnionRDD[Any]].rdds
val nativeRDD = rdds(p.parentRddIndex).asInstanceOf[NativeRDD]
val input = nativeRDD.nativePlan(p.parentPartition, taskContext)
for (childIndex <- rdds.indices) {
@@ -81,7 +81,7 @@
}
}
case p: PartitionerAwareUnionRDDPartition =>
val rdds = unionRDD.asInstanceOf[PartitionerAwareUnionRDD[_]].rdds
val rdds = unionRDD.asInstanceOf[PartitionerAwareUnionRDD[Any]].rdds
for ((rdd, partition) <- rdds.zip(p.parents)) {
val nativeRDD = rdd.asInstanceOf[NativeRDD]
unionInputs.append((nativeRDD.nativePlan(partition, taskContext), partition.index))
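My reading of the `UnionRDD[_]` to `UnionRDD[Any]` change (the PR does not say): the wildcard cast leaves `rdds` with an inferred existential element type, which scalac reports as a scala.language.existentials feature warning that the new flags make fatal; type arguments are erased at runtime, so `UnionRDD[Any]` behaves identically without the warning. A hedged sketch:

```scala
import org.apache.spark.rdd.{RDD, UnionRDD}

object UnionCastDemo {
  def parentRDDs(union: RDD[_]): Seq[RDD[Any]] = {
    // Casting to UnionRDD[_] would give `rdds` an existential type
    // (a -feature warning); UnionRDD[Any] erases to the same class,
    // so the cast is equally unchecked but compiles warning-free.
    union.asInstanceOf[UnionRDD[Any]].rdds
  }
}
```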
@@ -180,5 +180,5 @@ trait BlockObject extends AutoCloseable {
def getFileLength: Long = throw new UnsupportedOperationException
def getByteBuffer: ByteBuffer = throw new UnsupportedOperationException
def getChannel: ReadableByteChannel = throw new UnsupportedOperationException
def throwFetchFailed(errmsg: String): Unit = throw new UnsupportedOperationException
def throwFetchFailed(errmsg: String): Unit = throw new UnsupportedOperationException(errmsg)
}