Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -429,7 +429,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
if !relation.hiveQlTable.isPartitioned &&
hive.convertMetastoreParquet &&
hive.conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
relation.tableDescVirtual.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
(relation, parquetRelation, attributedRewrites)
Expand All @@ -438,7 +438,7 @@ private[hive] class HiveMetastoreCatalog(hive: HiveContext) extends Catalog with
case p @ PhysicalOperation(_, _, relation: MetastoreRelation)
if hive.convertMetastoreParquet &&
hive.conf.parquetUseDataSourceApi &&
relation.tableDesc.getSerdeClassName.toLowerCase.contains("parquet") =>
relation.tableDescVirtual.getSerdeClassName.toLowerCase.contains("parquet") =>
val parquetRelation = convertToParquetRelation(relation)
val attributedRewrites = relation.output.zip(parquetRelation.output)
(relation, parquetRelation, attributedRewrites)
Expand Down Expand Up @@ -692,7 +692,7 @@ private[hive] case class MetastoreRelation
}
}

val tableDesc = HiveShim.getTableDesc(
val tableDesc = HiveShim.getTableDescVirtual(
Class.forName(
hiveQlTable.getSerializationLib,
true,
Expand All @@ -706,6 +706,21 @@ private[hive] case class MetastoreRelation
hiveQlTable.getMetadata
)

// Table descriptor used when the scan must expose Hive virtual columns
// (INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE) — see the parquet-conversion
// guards that read tableDescVirtual.getSerdeClassName.
// NOTE(review): the naming looks swapped relative to the sibling `tableDesc`
// field: this val calls HiveShim.getTableDesc while `tableDesc` calls
// HiveShim.getTableDescVirtual — confirm against HiveShim that this is intended.
val tableDescVirtual = HiveShim.getTableDesc(
// Resolve the table's SerDe class through the context/Spark classloader so
// user-provided SerDes on the application classpath are found.
Class.forName(
hiveQlTable.getSerializationLib,
true,
Utils.getContextOrSparkClassLoader).asInstanceOf[Class[Deserializer]],
hiveQlTable.getInputFormatClass,
// The class of table should be org.apache.hadoop.hive.ql.metadata.Table because
// getOutputFormatClass will use HiveFileFormatUtils.getOutputFormatSubstitute to
// substitute some output formats, e.g. substituting SequenceFileOutputFormat to
// HiveSequenceFileOutputFormat.
hiveQlTable.getOutputFormatClass,
hiveQlTable.getMetadata
)


implicit class SchemaAttribute(f: FieldSchema) {
def toAttribute = AttributeReference(
f.getName,
Expand All @@ -721,7 +736,14 @@ private[hive] case class MetastoreRelation
/** Non-partitionKey attributes */
val attributes = hiveQlTable.getCols.map(_.toAttribute)

val output = attributes ++ partitionKeys
// Hive virtual columns exposed alongside the table's real columns.
// NOTE(review): renamed the misspelled `inputFileNmae` to `inputFileName`;
// it was only referenced in the construction below — confirm no external uses.
val inputFileName = new FieldSchema("INPUT__FILE__NAME", "string", "")
val offset = new FieldSchema("BLOCK__OFFSET__INSIDE__FILE", "int", "")
// Built immutably in one step (the original used a `var` plus a cons-rebuild);
// the element order matches the original final value: offset first, then file name.
val virtualAttributes = List(offset, inputFileName)
// Attribute form of the virtual columns (via the SchemaAttribute implicit).
val c = virtualAttributes.map(_.toAttribute)
// Full relation output: real columns, then virtual columns, then partition keys.
val output = attributes ++ c ++ partitionKeys
// Non-partition-key attributes including the virtual columns.
val attributes1 = attributes ++ c

/** An attribute map that can be used to lookup original attributes based on expression id. */
val attributeMap = AttributeMap(output.map(o => (o, o)))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ import org.apache.spark.broadcast.Broadcast
import org.apache.spark.rdd.{EmptyRDD, HadoopRDD, RDD, UnionRDD}
import org.apache.spark.sql.catalyst.expressions._
import org.apache.spark.sql.types.DateUtils

import org.apache.spark.sql.hive.VirtualHadoopRDD
/**
* A trait for subclasses that handle table scans.
*/
Expand Down Expand Up @@ -107,7 +107,13 @@ class HadoopTableReader(
// logDebug("Table input: %s".format(tablePath))
val ifc = hiveTable.getInputFormatClass
.asInstanceOf[java.lang.Class[InputFormat[Writable, Writable]]]
val hadoopRDD = createHadoopRdd(tableDesc, inputPathStr, ifc)

val hadoopRDD =if (attributes.seq.toString.contains("BLOCK__OFFSET__INSIDE__FILE") ||
attributes.seq.toString.contains("INPUT__FILE__NAME")){
createHadoopRddVirtual(tableDesc, inputPathStr, ifc)
} else{
createHadoopRdd(tableDesc, inputPathStr, ifc)
}

val attrsWithIndex = attributes.zipWithIndex
val mutableRow = new SpecificMutableRow(attributes.map(_.dataType))
Expand Down Expand Up @@ -240,6 +246,29 @@ class HadoopTableReader(
// Only take the value (skip the key) because Hive works only with values.
rdd.map(_._2)
}

/**
 * Builds a [[VirtualHadoopRDD]] over `path`, used when the scan requests Hive
 * virtual columns (INPUT__FILE__NAME, BLOCK__OFFSET__INSIDE__FILE) and therefore
 * needs per-split file/offset information that a plain HadoopRDD does not carry.
 *
 * @param tableDesc descriptor of the Hive table being scanned
 * @param path comma-separated input path(s) for the scan
 * @param inputFormatClass Hadoop InputFormat used to read the underlying data
 * @return an RDD of record values only (Hive ignores the record keys)
 */
private def createHadoopRddVirtual(
tableDesc: TableDesc,
path: String,
inputFormatClass: Class[InputFormat[Writable, Writable]]): RDD[Writable] = {

// Partially applied so each executor can localize its JobConf for this path/table.
val initJobConf = HadoopTableReader.initializeLocalJobConfFunc(path, tableDesc) _

// Hive works only with the value half of each (key, value) record, so the key
// is dropped immediately after the RDD is constructed.
new VirtualHadoopRDD(
sc.sparkContext,
_broadcastedHiveConf.asInstanceOf[Broadcast[SerializableWritable[Configuration]]],
Some(initJobConf),
inputFormatClass,
classOf[Writable],
classOf[Writable],
_minSplitsPerRDD,
tableDesc).map(_._2)
}


}

private[hive] object HadoopTableReader extends HiveInspectors {
Expand Down Expand Up @@ -273,9 +302,11 @@ private[hive] object HadoopTableReader extends HiveInspectors {
nonPartitionKeyAttrs: Seq[(Attribute, Int)],
mutableRow: MutableRow): Iterator[Row] = {

val inputfile = "INPUT__FILE__NAME|BLOCK__OFFSET__INSIDE__FILE|input__file__name".r
val soi = deserializer.getObjectInspector().asInstanceOf[StructObjectInspector]
val (fieldRefs, fieldOrdinals) = nonPartitionKeyAttrs.map { case (attr, ordinal) =>
soi.getStructFieldRef(attr.name) -> ordinal
val (fieldRefs, fieldOrdinals) =
nonPartitionKeyAttrs.map { case (attr, ordinal) if (!attr.name.contains("input__file__name")) =>
soi.getStructFieldRef(attr.name) -> ordinal
}.unzip

// Builds specific unwrappers ahead of time according to object inspector types to avoid pattern
Expand Down
Loading