Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -317,12 +317,15 @@ class HiveTableCatalog(sparkSession: SparkSession)
val (partitionColumns, maybeBucketSpec) = partitions.toSeq.convertTransforms
val location = Option(properties.get(TableCatalog.PROP_LOCATION))
val maybeProvider = Option(properties.get(TableCatalog.PROP_PROVIDER))
val tableProps = properties.asScala.toMap
val (optionsProps, serdeProps) = toOptionsAndSerdeProps(tableProps)
val (storage, provider) =
getStorageFormatAndProvider(
maybeProvider,
location,
properties.asScala.toMap)
val tableProperties = properties.asScala
tableProps,
optionsProps,
serdeProps)
val isExternal = properties.containsKey(TableCatalog.PROP_EXTERNAL)
val tableType =
if (isExternal || location.isDefined) {
Expand All @@ -339,7 +342,7 @@ class HiveTableCatalog(sparkSession: SparkSession)
provider = Some(provider),
partitionColumnNames = partitionColumns,
bucketSpec = maybeBucketSpec,
properties = tableProperties.toMap,
properties = tableProps,
tracksPartitionsInCatalog = conf.manageFilesourcePartitions,
comment = Option(properties.get(TableCatalog.PROP_COMMENT)))

Expand Down Expand Up @@ -431,10 +434,25 @@ class HiveTableCatalog(sparkSession: SparkSession)
catalog.renameTable(oldIdent.asTableIdentifier, newIdent.asTableIdentifier)
}

private def toOptions(properties: Map[String, String]): Map[String, String] = {
properties.filterKeys(_.startsWith(TableCatalog.OPTION_PREFIX)).map {
case (key, value) => key.drop(TableCatalog.OPTION_PREFIX.length) -> value
}.toMap
/**
 * Splits properties into optionsProps and serdeProps based on the `options.` prefix.
 *
 * - optionsProps: keys with the "options." prefix whose stripped key already exists in
 *   properties, indicating they were originally specified via the OPTIONS clause.
 * - serdeProps: keys with the "options." prefix whose stripped key does NOT exist in
 *   properties, indicating they were originally specified via the SERDEPROPERTIES clause.
 *
 * @param properties the full properties map
 * @return a tuple of (optionsProps, serdeProps), both with the "options." prefix stripped
 */
private[hive] def toOptionsAndSerdeProps(
    properties: Map[String, String]): (Map[String, String], Map[String, String]) = {
  // Use `collect` rather than the 2.13-deprecated `Map.filterKeys` (which returns a
  // lazy MapView); this builds the stripped-key map in a single strict pass.
  val strippedProps = properties.collect {
    case (key, value) if key.startsWith(TableCatalog.OPTION_PREFIX) =>
      key.drop(TableCatalog.OPTION_PREFIX.length) -> value
  }
  // An entry whose stripped key also appears un-prefixed came from OPTIONS;
  // everything else came from SERDEPROPERTIES.
  strippedProps.partition { case (strippedKey, _) => properties.contains(strippedKey) }
}

override def listNamespaces(): Array[Array[String]] =
Expand Down Expand Up @@ -583,23 +601,25 @@ private object HiveTableCatalog extends Logging {
private def getStorageFormatAndProvider(
provider: Option[String],
location: Option[String],
options: Map[String, String]): (CatalogStorageFormat, String) = {
tableProps: Map[String, String],
optionsProps: Map[String, String],
serdeProps: Map[String, String]): (CatalogStorageFormat, String) = {
val nonHiveStorageFormat = CatalogStorageFormat.empty.copy(
locationUri = location.map(CatalogUtils.stringToURI),
properties = options)
properties = optionsProps)

val conf = SQLConf.get
val defaultHiveStorage = HiveSerDe.getDefaultStorage(conf).copy(
locationUri = location.map(CatalogUtils.stringToURI),
properties = options)
properties = optionsProps)

if (provider.isDefined) {
(nonHiveStorageFormat, provider.get)
} else if (serdeIsDefined(options)) {
val maybeSerde = options.get("hive.serde")
val maybeStoredAs = options.get("hive.stored-as")
val maybeInputFormat = options.get("hive.input-format")
val maybeOutputFormat = options.get("hive.output-format")
} else if (serdeIsDefined(tableProps)) {
val maybeSerde = tableProps.get("hive.serde")
val maybeStoredAs = tableProps.get("hive.stored-as")
val maybeInputFormat = tableProps.get("hive.input-format")
val maybeOutputFormat = tableProps.get("hive.output-format")
val storageFormat = if (maybeStoredAs.isDefined) {
// If `STORED AS fileFormat` is used, infer inputFormat, outputFormat and serde from it.
HiveSerDe.sourceToSerDe(maybeStoredAs.get) match {
Expand All @@ -609,7 +629,7 @@ private object HiveTableCatalog extends Logging {
outputFormat = hiveSerde.outputFormat.orElse(defaultHiveStorage.outputFormat),
// User specified serde takes precedence over the one inferred from file format.
serde = maybeSerde.orElse(hiveSerde.serde).orElse(defaultHiveStorage.serde),
properties = options ++ defaultHiveStorage.properties)
properties = serdeProps ++ defaultHiveStorage.properties)
case _ => throw KyuubiHiveConnectorException(s"Unsupported serde ${maybeSerde.get}.")
}
} else {
Expand All @@ -619,7 +639,7 @@ private object HiveTableCatalog extends Logging {
outputFormat =
maybeOutputFormat.orElse(defaultHiveStorage.outputFormat),
serde = maybeSerde.orElse(defaultHiveStorage.serde),
properties = options ++ defaultHiveStorage.properties)
properties = serdeProps ++ defaultHiveStorage.properties)
}
(storageFormat, DDLUtils.HIVE_PROVIDER)
} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -287,6 +287,46 @@ class HiveCatalogSuite extends KyuubiHiveTest {
catalog.dropTable(testIdent)
}

test("toOptionsAndSerdeProps") {
  // Mix of plain properties and "options."-prefixed properties. "header" and
  // "delimiter" appear BOTH bare and prefixed, so their prefixed forms should be
  // classified as OPTIONS; "field.delim" and "line.delim" appear only prefixed,
  // so they should be classified as SERDEPROPERTIES.
  val properties = Map(
    "hive.serde" -> "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe",
    "owner" -> "hadoop",
    "header" -> "false",
    "delimiter" -> "#",
    TableCatalog.OPTION_PREFIX + "header" -> "false",
    TableCatalog.OPTION_PREFIX + "delimiter" -> "#",
    TableCatalog.OPTION_PREFIX + "field.delim" -> ",",
    TableCatalog.OPTION_PREFIX + "line.delim" -> "\n")

  val (optionsProps, serdeProps) = catalog.toOptionsAndSerdeProps(properties)

  // Both result maps must have the "options." prefix stripped from their keys.
  assert(optionsProps == Map(
    "header" -> "false",
    "delimiter" -> "#"))
  assert(serdeProps == Map(
    "field.delim" -> ",",
    "line.delim" -> "\n"))
}

test("createTable: SERDEPROPERTIES") {
  // "options.field.delim" has no bare "field.delim" counterpart, so it should be
  // routed to the table's storage (serde) properties with the prefix stripped.
  val properties = new util.HashMap[String, String]()
  properties.put("hive.serde", "org.apache.hadoop.hive.serde2.lazy.LazySimpleSerDe")
  properties.put(TableCatalog.OPTION_PREFIX + "field.delim", ",")
  assert(!catalog.tableExists(testIdent))

  val table = catalog.createTable(
    testIdent,
    schema,
    Array.empty[Transform],
    properties).asInstanceOf[HiveTable]

  // Storage properties must not retain the raw "options." prefix,
  assert(!table.catalogTable.storage.properties.keys.exists(
    _.startsWith(TableCatalog.OPTION_PREFIX)))
  // must not leak serde-selection table properties such as "hive.serde",
  assert(!table.catalogTable.storage.properties.contains("hive.serde"))
  // and must contain the stripped SERDEPROPERTIES key.
  assert(table.catalogTable.storage.properties.contains("field.delim"))
  catalog.dropTable(testIdent)
}

test("loadTable") {
val table = catalog.createTable(testIdent, schema, Array.empty[Transform], emptyProps)
val loaded = catalog.loadTable(testIdent)
Expand Down
Loading