Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -75,7 +75,7 @@ import org.apache.spark.util.ArrayImplicits._
* functions.
*/
object SimpleAnalyzer extends Analyzer(
new CatalogManager(
new DefaultCatalogManager(
FakeV2SessionCatalog,
new SessionCatalog(
new InMemoryCatalog,
Expand Down Expand Up @@ -323,7 +323,7 @@ class Analyzer(

// Only for tests.
def this(catalog: SessionCatalog) = {
this(new CatalogManager(FakeV2SessionCatalog, catalog))
this(new DefaultCatalogManager(FakeV2SessionCatalog, catalog))
}

def getRelationResolution: RelationResolution = relationResolution
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,7 +121,7 @@ class SessionCatalog(

/**
* Live PATH for session function kinds. Set from
* [[org.apache.spark.sql.connector.catalog.CatalogManager]]'s constructor via
* [[org.apache.spark.sql.connector.catalog.DefaultCatalogManager]]'s constructor via
* [[bindCatalogManagerForSessionFunctionKinds]] so unqualified lookups and the security check
* that blocks temp functions from shadowing builtins read the effective SQL PATH (post-`SET
* PATH`, with [[SQLConf.DEFAULT_PATH]] and [[SQLConf.defaultPathOrder]] fallbacks already
Expand All @@ -135,7 +135,8 @@ class SessionCatalog(

/**
* Wire live PATH-derived session function kinds from the session [[CatalogManager]].
* Called once from [[org.apache.spark.sql.connector.catalog.CatalogManager]]'s constructor.
* Called once from [[org.apache.spark.sql.connector.catalog.DefaultCatalogManager]]'s
* constructor.
*/
private[sql] def bindCatalogManagerForSessionFunctionKinds(cm: CatalogManager): Unit = {
catalogManagerForSessionFunctionKinds = Some(cm)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical.{LocalRelation, Project}
import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
import org.apache.spark.sql.catalyst.util.ResolveDefaultColumns.BuiltInFunctionCatalog
import org.apache.spark.sql.connector.catalog.{CatalogManager, Identifier, TableCatalog, TableCatalogCapability}
import org.apache.spark.sql.connector.catalog.{DefaultCatalogManager, Identifier, TableCatalog, TableCatalogCapability}
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.{DataType, StructField, StructType}
Expand Down Expand Up @@ -205,5 +205,5 @@ object GeneratedColumn {
* Analyzer for processing generated column expressions using built-in functions only.
*/
object GeneratedColumnAnalyzer extends Analyzer(
new CatalogManager(BuiltInFunctionCatalog, BuiltInFunctionCatalog.v1Catalog)) {
new DefaultCatalogManager(BuiltInFunctionCatalog, BuiltInFunctionCatalog.v1Catalog)) {
}
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ import org.apache.spark.sql.catalyst.optimizer.{ConstantFolding, Optimizer}
import org.apache.spark.sql.catalyst.parser.{CatalystSqlParser, ParseException}
import org.apache.spark.sql.catalyst.plans.logical._
import org.apache.spark.sql.catalyst.trees.TreePattern.PLAN_EXPRESSION
import org.apache.spark.sql.connector.catalog.{CatalogManager, Column, DefaultValue, FunctionCatalog, Identifier, TableCatalog, TableCatalogCapability}
import org.apache.spark.sql.connector.catalog.{CatalogManager, Column, DefaultCatalogManager, DefaultValue, FunctionCatalog, Identifier, TableCatalog, TableCatalogCapability}
import org.apache.spark.sql.connector.catalog.functions.UnboundFunction
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryErrorsBase}
import org.apache.spark.sql.internal.SQLConf
Expand Down Expand Up @@ -597,7 +597,7 @@ object ResolveDefaultColumns extends QueryErrorsBase
* This is an Analyzer for processing default column values using built-in functions only.
*/
object DefaultColumnAnalyzer extends Analyzer(
new CatalogManager(BuiltInFunctionCatalog, BuiltInFunctionCatalog.v1Catalog)) {
new DefaultCatalogManager(BuiltInFunctionCatalog, BuiltInFunctionCatalog.v1Catalog)) {
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,63 +26,115 @@ import org.apache.spark.internal.Logging
import org.apache.spark.sql.AnalysisException
import org.apache.spark.sql.catalyst.SQLConfHelper
import org.apache.spark.sql.catalyst.catalog.{SessionCatalog, TempVariableManager}
import org.apache.spark.sql.catalyst.catalog.SessionCatalog.SessionFunctionKind
import org.apache.spark.sql.catalyst.parser.CatalystSqlParser
import org.apache.spark.sql.catalyst.util.StringUtils
import org.apache.spark.sql.connector.catalog.CatalogManager.SessionPathEntry
import org.apache.spark.sql.connector.catalog.transactions.Transaction
import org.apache.spark.sql.errors.QueryCompilationErrors
import org.apache.spark.sql.internal.SQLConf

/**
* A thread-safe manager for [[CatalogPlugin]]s. It tracks all the registered catalogs, and allow
* the caller to look up a catalog by name.
* A thread-safe contract for managing [[CatalogPlugin]]s. Implementations resolve catalogs by
* name and maintain the current catalog and namespace for a session.
*
* There are still many commands (e.g. ANALYZE TABLE) that do not support v2 catalog API. They
* ignore the current catalog and blindly go to the v1 `SessionCatalog`. To avoid tracking current
* namespace in both `SessionCatalog` and `CatalogManger`, we let `CatalogManager` to set/get
* namespace in both `SessionCatalog` and `CatalogManager`, implementations set/get the
* current database of `SessionCatalog` when the current catalog is the session catalog.
*
* Two implementations exist: [[DefaultCatalogManager]] owns the mutable session state;
* [[TransactionAwareCatalogManager]] wraps another manager and redirects catalog lookups to the
* active transaction's catalog.
*/
// TODO: all commands should look up table from the current catalog. The `SessionCatalog` doesn't
// need to track current database at all.
private[sql]
class CatalogManager(
val defaultSessionCatalog: CatalogPlugin,
val v1SessionCatalog: SessionCatalog) extends SQLConfHelper with Logging {
private[sql] trait CatalogManager extends SQLConfHelper with Logging {

// ---- Underlying state exposed by implementations ----
def defaultSessionCatalog: CatalogPlugin
def v1SessionCatalog: SessionCatalog
def tempVariableManager: TempVariableManager

// ---- Catalog access ----
def catalog(name: String): CatalogPlugin
private[sql] def v2SessionCatalog: CatalogPlugin
def listCatalogs(pattern: Option[String]): Seq[String]
def currentCatalog: CatalogPlugin
def setCurrentCatalog(catalogName: String): Unit
/**
 * Reports whether `name` resolves to a catalog. The probe is performed by calling [[catalog]],
 * so it carries whatever side effects that lookup has in the concrete implementation.
 */
def isCatalogRegistered(name: String): Boolean =
  try {
    catalog(name)
    true
  } catch {
    // Only the "not found" signal means "unregistered"; any other failure propagates.
    case _: CatalogNotFoundException => false
  }

// ---- Transactions ----
def transaction: Option[Transaction] = None

def withTransaction(transaction: Transaction): CatalogManager

// ---- Namespace ----
def currentNamespace: Array[String]
def setCurrentNamespace(namespace: Array[String]): Unit

// ---- Session path ----
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@cloud-fan, could you help review this, please?

def sessionPathEntries: Option[Seq[SessionPathEntry]]
def storedSessionPathEntries: Option[Seq[SessionPathEntry]]
def confDefaultPathEntries: Option[Seq[SessionPathEntry]]
def setSessionPath(entries: Seq[SessionPathEntry]): Unit
def clearSessionPath(): Unit
private[sql] def copySessionPathFrom(other: CatalogManager): Unit
def currentPathString: String
def sqlResolutionPathEntries(
pathDefaultCatalog: String,
pathDefaultNamespace: Seq[String],
expandCatalog: String,
expandNamespace: Seq[String]): Seq[Seq[String]]
def sqlResolutionPathEntries(
currentCatalog: String,
currentNamespace: Seq[String]): Seq[Seq[String]]
def isSystemSessionOnPath: Boolean
def resolutionPathEntriesForAnalysis(
pinnedEntries: Option[Seq[Seq[String]]],
viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]]
def sessionFunctionKindsForUnqualifiedResolution(): Seq[SessionFunctionKind]

// Reset the manager to its initial state. Only used in tests.
private[sql] def reset(): Unit
}

/**
* Default [[CatalogManager]] implementation. Owns the mutable session state
* (registered catalogs, current catalog/namespace, session path).
*/
private[sql] class DefaultCatalogManager(
override val defaultSessionCatalog: CatalogPlugin,
override val v1SessionCatalog: SessionCatalog) extends CatalogManager {
import CatalogManager.SESSION_CATALOG_NAME
import CatalogV2Util._

private val catalogs = mutable.HashMap.empty[String, CatalogPlugin]

// TODO: create a real SYSTEM catalog to host `TempVariableManager` under the SESSION namespace.
val tempVariableManager: TempVariableManager = new TempVariableManager
override val tempVariableManager: TempVariableManager = new TempVariableManager

// Wire `SessionCatalog`'s fast-path kinds to the live SQL PATH. The kinds list itself is
// pure data conversion (system entries from the path, in path order); the *decision* to use
// path-order kinds for unqualified lookups lives at the Strategy layer (see callers of
// [[CatalogManager.systemFunctionKindsFromPath]]).
v1SessionCatalog.bindCatalogManagerForSessionFunctionKinds(this)

def catalog(name: String): CatalogPlugin = synchronized {
override def catalog(name: String): CatalogPlugin = synchronized {
  name match {
    // The session catalog name is compared case-insensitively and never stored under `name`.
    case sessionName if sessionName.equalsIgnoreCase(SESSION_CATALOG_NAME) =>
      v2SessionCatalog
    // Any other catalog is loaded on first use and then served from the `catalogs` map.
    case other =>
      catalogs.getOrElseUpdate(other, Catalogs.load(other, conf))
  }
}

def transaction: Option[Transaction] = None

def withTransaction(transaction: Transaction): CatalogManager =
new TransactionAwareCatalogManager(this, transaction)

def isCatalogRegistered(name: String): Boolean = {
try {
catalog(name)
true
} catch {
case _: CatalogNotFoundException => false
}
}

private def loadV2SessionCatalog(): CatalogPlugin = {
Catalogs.load(SESSION_CATALOG_NAME, conf) match {
case extension: CatalogExtension =>
Expand All @@ -101,16 +153,19 @@ class CatalogManager(
* This happens when the source implementation extends the v2 TableProvider API and is not listed
* in the fallback configuration, spark.sql.sources.useV1SourceList
*/
private[sql] def v2SessionCatalog: CatalogPlugin = {
override private[sql] def v2SessionCatalog: CatalogPlugin = {
  val implementation = conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION)
  if (implementation == "builtin") {
    // "builtin" means the catalog supplied at construction time is used as-is.
    defaultSessionCatalog
  } else {
    // Any other setting loads the configured session catalog once and caches it in `catalogs`.
    catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())
  }
}

override def withTransaction(transaction: Transaction): CatalogManager = {
  // The returned manager wraps this instance together with the given transaction.
  new TransactionAwareCatalogManager(this, transaction)
}

private var _currentNamespace: Option[Array[String]] = None

def currentNamespace: Array[String] = {
override def currentNamespace: Array[String] = {
val defaultNamespace = if (currentCatalog.name() == SESSION_CATALOG_NAME) {
Array(v1SessionCatalog.getCurrentDatabase)
} else {
Expand All @@ -132,7 +187,7 @@ class CatalogManager(
}
}

def setCurrentNamespace(namespace: Array[String]): Unit = {
override def setCurrentNamespace(namespace: Array[String]): Unit = {
// SPARK-56939: do NOT hold [[CatalogManager]]'s intrinsic lock across the callbacks below.
// [[v1SessionCatalog.setCurrentDatabaseWithNameCheck]] briefly synchronizes on
// [[SessionCatalog]], and concurrent unqualified function resolution acquires the
Expand Down Expand Up @@ -175,8 +230,6 @@ class CatalogManager(
}
}

import CatalogManager.SessionPathEntry

private var _sessionPath: Option[Seq[SessionPathEntry]] = None

/**
Expand All @@ -196,13 +249,14 @@ class CatalogManager(
* [[currentCatalog]] falls back to [[SQLConf#DEFAULT_CATALOG]]). Returns `None` when
* [[SQLConf#PATH_ENABLED]] is false or both sources are empty.
*/
def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized {
override def sessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized {
  // With PATH disabled there are no entries at all. Otherwise the stored path takes
  // precedence and the conf-derived default is consulted only when nothing is stored.
  if (conf.pathEnabled) _sessionPath.orElse(confDefaultPathEntries) else None
}

/** Raw `_sessionPath` (post-`SET PATH`), without the [[SQLConf#DEFAULT_PATH]] fallback. */
def storedSessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized { _sessionPath }
override def storedSessionPathEntries: Option[Seq[SessionPathEntry]] = synchronized {
  // Read under the manager's lock, like every other access to `_sessionPath`.
  _sessionPath
}

/**
* Parsed and expanded [[SQLConf#DEFAULT_PATH]] value, or `None` when the conf is empty.
Expand All @@ -216,7 +270,7 @@ class CatalogManager(
* `USE SCHEMA`) is dead code rather than an error. Cached so the hot path is a single
* atomic load on conf-stable sessions.
*/
def confDefaultPathEntries: Option[Seq[SessionPathEntry]] = {
override def confDefaultPathEntries: Option[Seq[SessionPathEntry]] = {
val confValue = conf.defaultPath
if (confValue == null || confValue.trim.isEmpty) {
confDefaultPathCache.set(None)
Expand All @@ -236,15 +290,15 @@ class CatalogManager(
}
}

def setSessionPath(entries: Seq[SessionPathEntry]): Unit = synchronized {
override def setSessionPath(entries: Seq[SessionPathEntry]): Unit = synchronized {
  // Stores the entries as given; an empty `entries` is still recorded as a stored path.
  val stored: Option[Seq[SessionPathEntry]] = Some(entries)
  _sessionPath = stored
}

def clearSessionPath(): Unit = synchronized {
override def clearSessionPath(): Unit = synchronized {
  // Drops the stored path so that `_sessionPath` is empty again.
  _sessionPath = None
}

private[sql] def copySessionPathFrom(other: CatalogManager): Unit = synchronized {
override private[sql] def copySessionPathFrom(other: CatalogManager): Unit = synchronized {
  // Copies only the raw stored path of `other`; the read happens while this manager's
  // lock is held.
  val copied = other.storedSessionPathEntries
  _sessionPath = copied
}

Expand All @@ -263,7 +317,7 @@ class CatalogManager(
* new SC->CM ordering must take `currentPathString` (or any other CM->SC nest) into
* account to avoid resurrecting the deadlock.
*/
def currentPathString: String = synchronized {
override def currentPathString: String = synchronized {
import CatalogV2Implicits._
sessionPathEntries match {
case Some(entries) =>
Expand All @@ -282,7 +336,7 @@ class CatalogManager(
* When PATH is in effect (stored or via the [[SQLConf#DEFAULT_PATH]] conf), uses the
* resolved entries.
*/
def sqlResolutionPathEntries(
override def sqlResolutionPathEntries(
pathDefaultCatalog: String,
pathDefaultNamespace: Seq[String],
expandCatalog: String,
Expand All @@ -299,7 +353,7 @@ class CatalogManager(
}

/** Session-catalog overload. */
def sqlResolutionPathEntries(
override def sqlResolutionPathEntries(
currentCatalog: String,
currentNamespace: Seq[String]): Seq[Seq[String]] =
sqlResolutionPathEntries(
Expand Down Expand Up @@ -330,7 +384,7 @@ class CatalogManager(
* [[org.apache.spark.sql.catalyst.analysis.FunctionResolution.isSessionBeforeBuiltinInPath]])
* MUST NOT hold [[SessionCatalog]]'s intrinsic lock when invoking this method.
*/
def sessionFunctionKindsForUnqualifiedResolution(): Seq[SessionCatalog.SessionFunctionKind] = {
override def sessionFunctionKindsForUnqualifiedResolution(): Seq[SessionFunctionKind] = {
// SPARK-56939: read v1's current database before taking the CM lock; see the method
// doc for why the resulting staleness is harmless for the kinds list.
val v1CurrentDb = v1SessionCatalog.getCurrentDatabase
Expand Down Expand Up @@ -358,7 +412,7 @@ class CatalogManager(
* invariant ever changes, this short-circuit must be revisited.
* Inspecting effective entries directly avoids loading the configured default catalog.
*/
def isSystemSessionOnPath: Boolean = synchronized {
override def isSystemSessionOnPath: Boolean = synchronized {
if (!conf.pathEnabled) return true
sessionPathEntries match {
case None => true
Expand All @@ -385,7 +439,7 @@ class CatalogManager(
* (typically `AnalysisContext.catalogAndNamespace`); empty when
* not resolving a view body.
*/
def resolutionPathEntriesForAnalysis(
override def resolutionPathEntriesForAnalysis(
pinnedEntries: Option[Seq[Seq[String]]],
viewCatalogAndNamespace: Seq[String]): Seq[Seq[String]] = {
pinnedEntries match {
Expand All @@ -409,11 +463,11 @@ class CatalogManager(

private var _currentCatalogName: Option[String] = None

def currentCatalog: CatalogPlugin = synchronized {
override def currentCatalog: CatalogPlugin = synchronized {
  // Use `_currentCatalogName` when it is set; otherwise fall back to SQLConf.DEFAULT_CATALOG.
  val effectiveName = _currentCatalogName.getOrElse(conf.getConf(SQLConf.DEFAULT_CATALOG))
  catalog(effectiveName)
}

def setCurrentCatalog(catalogName: String): Unit = {
override def setCurrentCatalog(catalogName: String): Unit = {
// SPARK-56939: see [[setCurrentNamespace]]. Avoid nesting [[CatalogManager]]'s lock
// across [[v1SessionCatalog.setCurrentDatabase]] (which synchronizes on
// [[SessionCatalog]]) to prevent a lock-order inversion with concurrent unqualified
Expand Down Expand Up @@ -451,7 +505,7 @@ class CatalogManager(
}
}

def listCatalogs(pattern: Option[String]): Seq[String] = {
override def listCatalogs(pattern: Option[String]): Seq[String] = {
  // Snapshot the registered names under the lock; everything below works on that copy.
  val registered = synchronized(catalogs.keys.toSeq)
  // The session catalog is always listed, whether or not it appears in `catalogs`.
  val names = (registered :+ SESSION_CATALOG_NAME).distinct.sorted
  pattern match {
    case Some(p) => StringUtils.filterPattern(names, p)
    case None => names
  }
}
Expand All @@ -463,7 +517,7 @@ class CatalogManager(
// calls back into [[v1SessionCatalog]]. Test-only callers don't race against unqualified
// function resolution today, but keeping the contract symmetric prevents future test
// helpers (e.g. session reset in a concurrent harness) from reintroducing the cycle.
private[sql] def reset(): Unit = {
override private[sql] def reset(): Unit = {
synchronized {
catalogs.clear()
_currentNamespace = None
Expand Down
Loading