Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,91 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector

import java.util

import scala.reflect.ClassTag

import org.apache.spark.sql.{DataFrame, QueryTest, Row, SparkSession}
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{BufferedRows, CatalogV2Util, Identifier, InMemoryBaseTable, TableCatalog, TableWritePrivilege}

/**
 * Common foundation for DSv2 tests in which a table is mutated externally (data writes, schema
 * changes, drop/recreate) through the catalog API.
 *
 * The hooks declared here are abstract so that one set of scenarios can be executed both in
 * classic mode (the test session IS the server session) and in Connect mode (the test session
 * is a Connect client, so catalog access has to go through the server session).
 *
 * A concrete suite implements the hooks and mixes in a scenario trait such as
 * [[DSv2RepeatedTableAccessWithExternalChangesTests]].
 *
 * This trait builds on [[QueryTest]] rather than
 * [[org.apache.spark.sql.test.SharedSparkSession]] so that it does not bring a SparkSession of
 * its own. The session is supplied by the concrete suite, via
 * [[org.apache.spark.sql.test.SharedSparkSession]] (classic) or
 * [[org.apache.spark.sql.connect.SparkConnectServerTest]] (Connect).
 */
trait DSv2ExternalMutationTestBase extends QueryTest {

  /** Text prepended to every test name: "[connect] " in Connect suites, "" in classic ones. */
  protected def testPrefix: String = ""

  /**
   * Runs a test body against a session.
   * Classic: `fn(spark)`. Connect: `withSession(fn)`.
   */
  protected def withTestSession(fn: SparkSession => Unit): Unit

  /**
   * Checks, ignoring row order, that a DataFrame produces exactly the expected rows.
   * Classic: forwards to [[org.apache.spark.sql.QueryTest.checkAnswer]].
   * Connect: collects the rows and compares them with
   * [[org.apache.spark.sql.QueryTest.sameRows]].
   */
  protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit

  /**
   * Looks up a server-side [[TableCatalog]] by its name.
   * Classic: the session already is the server session, so the catalog is read directly.
   * Connect: the server session behind the Connect client is resolved first.
   */
  protected def getTableCatalog[C <: TableCatalog: ClassTag](
      session: SparkSession,
      catalogName: String): C

  /**
   * Cleanup wrapper that drops the views and the table once the test body has run, including
   * when the body fails.
   * Classic: `withTable` plus manual view drops.
   * Connect: `session.sql("DROP ...")` in a finally block.
   */
  protected def withTestTableAndViews(
      session: SparkSession,
      table: String,
      views: Seq[String] = Seq.empty)(fn: => Unit): Unit

  /**
   * Adds one row to a DSv2 table directly through the catalog API, without going through the
   * session.
   *
   * The table is loaded with the INSERT write privilege and cast to [[InMemoryBaseTable]], so
   * only tables of that kind can be targeted.
   *
   * @param cat   catalog that owns the table
   * @param ident identifier of the table inside `cat`
   * @param row   row to append
   */
  protected def externalAppend(
      cat: TableCatalog,
      ident: Identifier,
      row: InternalRow): Unit = {
    val writePrivileges: util.Set[TableWritePrivilege] = util.Set.of(TableWritePrivilege.INSERT)
    val target = cat.loadTable(ident, writePrivileges).asInstanceOf[InMemoryBaseTable]
    // The buffered batch is built with the table's current columns converted to a StructType.
    val rowSchema = CatalogV2Util.v2ColumnsToStructType(target.columns())
    val batch = new BufferedRows(Seq.empty, rowSchema).withRow(row)
    target.withData(Array(batch))
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,232 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.spark.sql.connector

import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.connector.catalog.{CachingInMemoryTableCatalog, Column, Identifier, InMemoryTableCatalog, TableChange, TableInfo}
import org.apache.spark.sql.types.IntegerType

/**
 * Shared repeated sql() access tests for DSv2 tables. These tests verify that each
 * `sql("SELECT * FROM t")` call creates a fresh QueryExecution and always sees the latest
 * data, schema, and table identity (no CACHE TABLE involved).
 *
 * Mixed into both classic [[DataSourceV2DataFrameSuite]] and Connect suites.
 *
 * NOTE: Every DDL/DML `session.sql(...)` call appends `.collect()` because Connect client
 * DataFrames are lazy and require an action to trigger execution. In classic mode `.collect()`
 * on DDL is a no-op (DDL executes eagerly), so this is harmless. SELECT queries are not
 * collected here; they are passed to `checkRows` instead.
 */
trait DSv2RepeatedTableAccessWithExternalChangesTests extends DSv2ExternalMutationTestBase {

  private val T = "testcat.ns1.ns2.tbl"
  private val CT = "cachingcat.ns1.ns2.tbl"
  private val testIdent = Identifier.of(Array("ns1", "ns2"), "tbl")

  /**
   * Common preamble of every scenario: creates `table` with columns (id INT, salary INT),
   * inserts the row (1, 100) through the session and verifies that the row is visible.
   */
  private def createSeededTable(
      session: org.apache.spark.sql.SparkSession,
      table: String): Unit = {
    session.sql(s"CREATE TABLE $table (id INT, salary INT) USING foo").collect()
    session.sql(s"INSERT INTO $table VALUES (1, 100)").collect()
    assertTableRows(session, table, Seq(Row(1, 100)))
  }

  /** Issues a new `SELECT *` against `table` and compares the result with `expected`. */
  private def assertTableRows(
      session: org.apache.spark.sql.SparkSession,
      table: String,
      expected: Seq[Row]): Unit = {
    checkRows(session.sql(s"SELECT * FROM $table"), expected)
  }

  /** Table definition (id INT, salary INT) used when a table is recreated via the catalog. */
  private def idSalaryTableInfo: TableInfo =
    new TableInfo.Builder()
      .withColumns(Array(
        Column.create("id", IntegerType),
        Column.create("salary", IntegerType)))
      .build()

  /** Catalog-level change that adds the nullable INT column `new_col`. */
  private def addNewColChange: TableChange =
    TableChange.addColumn(Array("new_col"), IntegerType, true)

  // Scenario 1.1 (session write)
  test(s"${testPrefix}repeated sql() reflects session write") {
    withTestSession { session =>
      withTestTableAndViews(session, T) {
        createSeededTable(session, T)

        session.sql(s"INSERT INTO $T VALUES (2, 200)").collect()
        assertTableRows(session, T, Seq(Row(1, 100), Row(2, 200)))
      }
    }
  }

  // Scenario 1.2 (external write)
  test(s"${testPrefix}repeated sql() reflects external write") {
    withTestSession { session =>
      withTestTableAndViews(session, T) {
        createSeededTable(session, T)

        val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat")
        externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200))

        assertTableRows(session, T, Seq(Row(1, 100), Row(2, 200)))
      }
    }
  }

  // Scenario 1.2 connector w/ cache (external write, caching connector)
  test(s"${testPrefix}connector w/ cache: repeated sql() stale after external write") {
    withTestSession { session =>
      withTestTableAndViews(session, CT) {
        createSeededTable(session, CT)

        val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
        externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200))

        // Caching connector returns stale table: external write invisible
        assertTableRows(session, CT, Seq(Row(1, 100)))

        // REFRESH TABLE invalidates the connector cache, external write becomes visible
        session.sql(s"REFRESH TABLE $CT").collect()
        assertTableRows(session, CT, Seq(Row(1, 100), Row(2, 200)))
      }
    }
  }

  // Scenario 2.1 (session schema change)
  test(s"${testPrefix}repeated sql() reflects session schema change") {
    withTestSession { session =>
      withTestTableAndViews(session, T) {
        createSeededTable(session, T)

        session.sql(s"ALTER TABLE $T ADD COLUMN new_col INT").collect()
        session.sql(s"INSERT INTO $T VALUES (2, 200, -1)").collect()
        assertTableRows(session, T, Seq(Row(1, 100, null), Row(2, 200, -1)))
      }
    }
  }

  // Scenario 2.2 (external schema change)
  test(s"${testPrefix}repeated sql() reflects external schema change") {
    withTestSession { session =>
      withTestTableAndViews(session, T) {
        createSeededTable(session, T)

        // external schema change + data write via catalog API
        val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat")
        cat.alterTable(testIdent, addNewColChange)

        externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200, -1))

        assertTableRows(session, T, Seq(Row(1, 100, null), Row(2, 200, -1)))
      }
    }
  }

  // Scenario 2.2 connector w/ cache (external schema change, caching connector)
  test(s"${testPrefix}connector w/ cache: repeated sql() stale after external schema change") {
    withTestSession { session =>
      withTestTableAndViews(session, CT) {
        createSeededTable(session, CT)

        // external schema change + data via catalog API
        val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
        cat.alterTable(testIdent, addNewColChange)

        externalAppend(cat = cat, ident = testIdent, row = InternalRow(2, 200, -1))

        // Caching connector returns stale table: external changes invisible
        assertTableRows(session, CT, Seq(Row(1, 100)))

        // REFRESH TABLE invalidates the connector cache, schema change + data visible
        session.sql(s"REFRESH TABLE $CT").collect()
        assertTableRows(session, CT, Seq(Row(1, 100, null), Row(2, 200, -1)))
      }
    }
  }

  // Scenario 3.1 (session drop and recreate table)
  test(s"${testPrefix}repeated sql() reflects session drop/recreate") {
    withTestSession { session =>
      withTestTableAndViews(session, T) {
        createSeededTable(session, T)

        session.sql(s"DROP TABLE $T").collect()
        session.sql(s"CREATE TABLE $T (id INT, salary INT) USING foo").collect()
        assertTableRows(session, T, Seq.empty)
      }
    }
  }

  // Scenario 3.2 (external drop and recreate table)
  test(s"${testPrefix}repeated sql() reflects external drop/recreate") {
    withTestSession { session =>
      withTestTableAndViews(session, T) {
        createSeededTable(session, T)

        // external drop and recreate via catalog API
        val cat = getTableCatalog[InMemoryTableCatalog](session, "testcat")
        cat.dropTable(testIdent)
        cat.createTable(testIdent, idSalaryTableInfo)

        assertTableRows(session, T, Seq.empty)
      }
    }
  }

  // Scenario 3.2 connector w/ cache (external drop/recreate, caching connector)
  test(s"${testPrefix}connector w/ cache: repeated sql() stale after external drop/recreate") {
    withTestSession { session =>
      withTestTableAndViews(session, CT) {
        createSeededTable(session, CT)

        // external drop and recreate via catalog API
        val cat = getTableCatalog[CachingInMemoryTableCatalog](session, "cachingcat")
        cat.dropTable(testIdent)
        cat.createTable(testIdent, idSalaryTableInfo)

        // Caching connector returns stale table: drop/recreate invisible
        assertTableRows(session, CT, Seq(Row(1, 100)))

        // REFRESH TABLE invalidates the connector cache, new empty table visible
        session.sql(s"REFRESH TABLE $CT").collect()
        assertTableRows(session, CT, Seq.empty)
      }
    }
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,14 +21,15 @@ import java.util
import java.util.Collections

import scala.jdk.CollectionConverters._
import scala.reflect.ClassTag

import org.apache.spark.{SparkConf, SparkException}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode}
import org.apache.spark.sql.{AnalysisException, DataFrame, Row, SaveMode, SparkSession}
import org.apache.spark.sql.QueryTest.withQueryExecutionsCaptured
import org.apache.spark.sql.catalyst.InternalRow
import org.apache.spark.sql.catalyst.analysis.TableAlreadyExistsException
import org.apache.spark.sql.catalyst.plans.logical.{AppendData, CreateTableAsSelect, LogicalPlan, ReplaceTableAsSelect}
import org.apache.spark.sql.connector.catalog.{BufferedRows, CachingInMemoryTableCatalog, CatalogV2Util, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableInfo, TypeChangeResetsColIdTableCatalog}
import org.apache.spark.sql.connector.catalog.{BufferedRows, CachingInMemoryTableCatalog, CatalogV2Util, Column, ColumnDefaultValue, ComposedColumnIdTableCatalog, DefaultValue, Identifier, InMemoryBaseTable, InMemoryTableCatalog, MixedColumnIdTableCatalog, NullColumnIdInMemoryTableCatalog, NullTableIdInMemoryTableCatalog, SupportsV1OverwriteWithSaveAsTable, TableCatalog, TableInfo, TypeChangeResetsColIdTableCatalog}
import org.apache.spark.sql.connector.catalog.BasicInMemoryTableCatalog
import org.apache.spark.sql.connector.catalog.TableChange.{AddColumn, UpdateColumnDefaultValue}
import org.apache.spark.sql.connector.catalog.TableChange
Expand All @@ -46,7 +47,8 @@ import org.apache.spark.sql.util.QueryExecutionListener
import org.apache.spark.unsafe.types.UTF8String

class DataSourceV2DataFrameSuite
extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false) {
extends InsertIntoTests(supportsDynamicOverwrite = true, includeSQLOnlyTests = false)
with DSv2RepeatedTableAccessWithExternalChangesTests {
import org.apache.spark.sql.connector.catalog.CatalogV2Implicits._
import testImplicits._

Expand Down Expand Up @@ -87,6 +89,28 @@ class DataSourceV2DataFrameSuite
catalog.asInstanceOf[InMemoryTableCatalog]
}

// DSv2ExternalMutationTestBase implementations for classic mode
/** Classic mode: the test body is invoked directly with the suite's `spark` session. */
override protected def withTestSession(fn: SparkSession => Unit): Unit = {
  fn.apply(spark)
}

/**
 * Classic mode: the comparison is delegated to `checkAnswer`. The by-name `df` is forwarded
 * as-is rather than evaluated here.
 */
override protected def checkRows(df: => DataFrame, expected: Seq[Row]): Unit = {
  checkAnswer(df, expected)
}

/**
 * Classic-mode catalog lookup: resolves `catalogName` through this suite's `catalog(...)`
 * helper and returns it as `C`.
 *
 * A plain `asInstanceOf[C]` is erased to the `TableCatalog` upper bound and therefore checks
 * nothing inside this method. The `ClassTag` is used to verify the runtime class up front so
 * that a mismatch fails here with a descriptive message.
 *
 * @param session     not used by this implementation; the lookup goes through `catalog(...)`
 * @param catalogName name of the catalog to look up
 * @tparam C expected concrete catalog type
 * @throws IllegalArgumentException if the resolved catalog is not an instance of `C`
 */
override protected def getTableCatalog[C <: TableCatalog: ClassTag](
    session: SparkSession,
    catalogName: String): C = {
  val resolved = catalog(catalogName)
  val expectedClass = implicitly[ClassTag[C]].runtimeClass
  require(
    expectedClass.isInstance(resolved),
    s"Catalog '$catalogName' is not an instance of ${expectedClass.getName}: $resolved")
  resolved.asInstanceOf[C]
}

/**
 * Classic-mode cleanup wrapper: runs `fn` inside `withTable(table)` and, whether or not `fn`
 * throws, issues `DROP VIEW IF EXISTS` for every entry of `views` through `session`.
 */
override protected def withTestTableAndViews(
    session: SparkSession,
    table: String,
    views: Seq[String] = Seq.empty)(fn: => Unit): Unit = {
  // View cleanup runs inside the withTable block, i.e. before withTable returns.
  def dropViews(): Unit = views.foreach { viewName =>
    session.sql(s"DROP VIEW IF EXISTS $viewName")
  }

  withTable(table) {
    try {
      fn
    } finally {
      dropViews()
    }
  }
}

/** Checks that the contents of `tableName`, read via `spark.table`, match `expected`. */
override def verifyTable(tableName: String, expected: DataFrame): Unit = {
  // Kept as a `def` so the lookup is evaluated only when `checkAnswer` forces its argument.
  def actual: DataFrame = spark.table(tableName)
  checkAnswer(actual, expected)
}
Expand Down