Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
29 commits
Select commit Hold shift + click to select a range
3605889
[SPARK-56643][CONNECT][TESTS] Add DSv2 temp view with stored plan tes…
longvu-db May 1, 2026
1863720
Add schema verification assertions for empty table results
longvu-db May 1, 2026
4a15374
Add Scenario 3.1 session column removal test
longvu-db May 1, 2026
c9ab367
Empty commit to retrigger CI
longvu-db May 1, 2026
8d9cf90
Address review: add helpers, fix assertRows, generic serverCatalog
longvu-db May 8, 2026
97add45
Add session and cache variants for Scenario 7, named args, clearCachi…
longvu-db May 8, 2026
3d71703
Format withTableAndView calls: one argument per line
longvu-db May 8, 2026
8d9dc56
Remove unnecessary ClassTag from serverCatalog helper
longvu-db May 8, 2026
2aa5db5
Add externalAppend helper to reduce boilerplate in connect tests
longvu-db May 8, 2026
f4d06f8
Remove trailing empty line
longvu-db May 8, 2026
9a646de
Rename withTableAndView to withTableAndViews
longvu-db May 8, 2026
5615254
Fix scalafmt formatting in DataSourceV2TempViewConnectSuite
longvu-db May 11, 2026
78cd846
Address review: add row= named arg, document cache and schemaAfterDro…
longvu-db May 19, 2026
b50ccec
Remove CachingInMemoryTableCatalog.scala and InMemoryTableCatalog.sca…
longvu-db May 20, 2026
dca00df
Restore catalog files to master version
longvu-db May 20, 2026
dea00ca
Fix compilation error: call clearCache() on catalog instance, not com…
longvu-db May 20, 2026
23dc92f
Address review comments from cloud-fan
longvu-db May 21, 2026
cdaf848
Refactor: share temp view tests between classic and Connect via trait
longvu-db May 21, 2026
1052517
Fix self-review findings: remove missing trait mixin, dead imports, b…
longvu-db May 21, 2026
63ec689
Fix scalafmt formatting in DataSourceV2TempViewConnectSuite
longvu-db May 21, 2026
cf811e9
Add comment explaining why sameRows is used instead of checkAnswer fo…
longvu-db May 22, 2026
6253739
Address auto-review findings: narrow base trait, clarify Connect comm…
longvu-db May 22, 2026
62998c3
Clean up comments and naming: rename cat to catalog, simplify Scaladoc
longvu-db May 24, 2026
9f98149
Make testPrefix abstract, require explicit prefix in each suite
longvu-db May 24, 2026
5923e0c
Simplify withTestTableAndViews Scaladoc: remove implementation details
longvu-db May 24, 2026
2a084ce
Fully qualify cross-package Scaladoc link for DataSourceV2TempViewCon…
longvu-db May 24, 2026
c22ffbe
Remove redundant mixin list from DSv2TempViewWithStoredPlanTests Scal…
longvu-db May 24, 2026
8fd6b01
Address review: preserve classic test names, add ClassTag require check
longvu-db May 25, 2026
a8f1353
Simplify testPrefix Scaladoc
longvu-db May 25, 2026
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,83 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connect

import scala.reflect.ClassTag

import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
import org.apache.spark.sql.connector.DSv2TempViewWithStoredPlanTests
import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, InMemoryTableCatalog, TableCatalog}

/**
 * Runs the shared [[DSv2TempViewWithStoredPlanTests]] scenarios through Spark Connect. The
 * scenarios themselves are defined in the shared trait; this suite only supplies the
 * Connect-specific pieces: the session, the catalog lookup, and the row comparison.
 */
class DataSourceV2TempViewConnectSuite
    extends SparkConnectServerTest
    with DSv2TempViewWithStoredPlanTests {

  /** Adds the `testcat` and `cachingcat` catalog settings on top of the inherited conf. */
  override def sparkConf: SparkConf = {
    val catalogSettings = Seq(
      "spark.sql.catalog.testcat" -> classOf[InMemoryTableCatalog].getName,
      "spark.sql.catalog.testcat.copyOnLoad" -> "true",
      "spark.sql.catalog.cachingcat" -> classOf[CachingInMemoryTableCatalog].getName,
      "spark.sql.catalog.cachingcat.copyOnLoad" -> "true")
    // Apply the settings in declaration order, threading the conf through each `set` call.
    catalogSettings.foldLeft(super.sparkConf) { case (conf, (key, value)) =>
      conf.set(key, value)
    }
  }

  override protected def testPrefix: String = "[connect] "

  /** Hands the test body to `withSession`, which supplies the session. */
  override protected def withTestSession(fn: SparkSession => Unit): Unit = {
    withSession(fn)
  }

  // QueryTest.checkAnswer is not used here: it reads df.logicalPlan, df.queryExecution and
  // df.materializedRdd, none of which exist on Connect *client* DataFrames (they throw
  // ConnectClientUnsupportedErrors). checkAnswer does work in Connect server tests that hold
  // classic server-side DataFrames, but in this suite `df` comes from session.table() /
  // session.sql() and is therefore a Connect client DataFrame.
  // The rows are collected and handed to QueryTest.sameRows instead, which is the same
  // value-based, order-agnostic comparison that checkAnswer relies on internally.
  override protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit = {
    val actual = df.collect().toSeq
    // sameRows yields a message only when the two row sets differ.
    for (mismatch <- QueryTest.sameRows(expected, actual)) {
      fail(mismatch)
    }
  }

  /**
   * Looks up `catalogName` in the catalog manager of the server session that backs `session`
   * and returns it as `C`, failing the `require` check when the catalog is not an instance of
   * `C`'s runtime class.
   */
  override protected def getTableCatalog[C <: TableCatalog: ClassTag](
      session: SparkSession,
      catalogName: String): C = {
    val resolved =
      getServerSession(session).sessionState.catalogManager.catalog(catalogName)
    val expectedClass = implicitly[ClassTag[C]].runtimeClass
    require(
      expectedClass.isInstance(resolved),
      s"Expected ${expectedClass.getName} but got ${resolved.getClass.getName}")
    resolved.asInstanceOf[C]
  }

  // Unlike the classic suite, no explicit clearCache() on cachingcat is required here.
  // Every withSession call creates a freshly isolated SparkSession on the server side
  // (via SparkConnectSessionManager.newIsolatedSession), and afterEach invalidates all
  // sessions, so each test gets its own CachingInMemoryTableCatalog instance.
  override protected def withTestTableAndViews(
      session: SparkSession,
      table: String,
      views: Seq[String] = Seq.empty)(fn: => Unit): Unit = {
    try {
      fn
    } finally {
      // Views are dropped before the table.
      for (view <- views) {
        session.sql(s"DROP VIEW IF EXISTS $view").collect()
      }
      session.sql(s"DROP TABLE IF EXISTS $table").collect()
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector

import java.util

import scala.reflect.ClassTag

import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{BufferedRows, CatalogV2Util, Identifier, InMemoryBaseTable, TableCatalog, TableWritePrivilege}

/**
 * Shared foundation for DSv2 tests in which a table is mutated externally (writes, schema
 * changes, drop/recreate) through the catalog API.
 *
 * The abstract members let one set of scenarios run in classic mode, where the test session
 * IS the server session, and in Connect mode, where the test session is a Connect client and
 * the catalog has to be reached through the server session.
 *
 * Concrete suites implement the abstract members and mix in the test trait
 * [[DSv2TempViewWithStoredPlanTests]].
 */
trait DSv2ExternalMutationTestBase extends QueryTest {

  /** Text prepended to test names, e.g. "" or "[connect] ". */
  protected def testPrefix: String

  /** Runs the given test body with a session. */
  protected def withTestSession(fn: SparkSession => Unit): Unit

  /**
   * Asserts that the rows of `df` equal `expected`, ignoring row order.
   */
  protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit

  /**
   * Returns the [[TableCatalog]] registered under `catalogName` in the underlying session.
   */
  protected def getTableCatalog[C <: TableCatalog: ClassTag](
      session: SparkSession,
      catalogName: String): C

  /** Cleanup wrapper: drops the views and the table after the test body, even on failure. */
  protected def withTestTableAndViews(
      session: SparkSession,
      table: String,
      views: Seq[String] = Seq.empty)(fn: => Unit): Unit

  /** Appends one row to a DSv2 table through the catalog API, bypassing the session. */
  protected def externalAppend(
      catalog: TableCatalog,
      ident: Identifier,
      row: InternalRow): Unit = {
    // Load the table with the INSERT write privilege.
    val insertOnly: util.Set[TableWritePrivilege] = util.Set.of(TableWritePrivilege.INSERT)
    val target = catalog.loadTable(ident, insertOnly).asInstanceOf[InMemoryBaseTable]
    // Build a single-row buffer using the table's current columns as its schema.
    val tableSchema = CatalogV2Util.v2ColumnsToStructType(target.columns())
    val singleRow = new BufferedRows(Seq.empty, tableSchema).withRow(row)
    target.withData(Array(singleRow))
  }
}
Loading