1 change: 1 addition & 0 deletions python/docs/source/migration_guide/pyspark_upgrade.rst
@@ -23,6 +23,7 @@ Upgrading from PySpark 4.1 to 4.2
 ---------------------------------
 * In Spark 4.2, the minimum supported version for PyArrow has been raised from 15.0.0 to 18.0.0 in PySpark.
 * In Spark 4.2, ``DataFrame.__getattr__`` on Spark Connect Python Client no longer eagerly validates the column name. To restore the legacy behavior, set the ``PYSPARK_VALIDATE_COLUMN_NAME_LEGACY`` environment variable to ``1``.
+* In Spark 4.2, ``DataFrame[Stream]Reader/Writer.option`` and ``.options`` now filter out ``None`` values (treating them as "unset") instead of forwarding ``None`` to the JVM as Java ``null``, matching the Spark Connect Python client (SPARK-49263) and ``OptionUtils._set_opts``. To set an option to its default, omit it or pass ``None``; to set it to an empty string, pass ``""`` explicitly.
 * In Spark 4.2, columnar data exchange between PySpark and the JVM uses Apache Arrow by default. The configuration ``spark.sql.execution.arrow.pyspark.enabled`` now defaults to true. To restore the legacy (non-Arrow) row-based data exchange, set ``spark.sql.execution.arrow.pyspark.enabled`` to ``false``.
 * In Spark 4.2, regular Python UDFs are Arrow-optimized by default. The configuration ``spark.sql.execution.pythonUDF.arrow.enabled`` now defaults to true. To restore the legacy behavior for Python UDF execution, set ``spark.sql.execution.pythonUDF.arrow.enabled`` to ``false``.
 * In Spark 4.2, regular Python UDTFs are Arrow-optimized by default. The configuration ``spark.sql.execution.pythonUDTF.arrow.enabled`` now defaults to true. To restore the legacy behavior for Python UDTF execution, set ``spark.sql.execution.pythonUDTF.arrow.enabled`` to ``false``.
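A minimal sketch of the semantics described in the new migration note, assuming an active SparkSession bound to ``spark`` (the path and option names are illustrative):

    # Spark 4.2: passing None leaves the option unset rather than sending null.
    spark.read.option("nullValue", None).csv("/tmp/data.csv")  # option is unset
    spark.read.csv("/tmp/data.csv")                            # equivalent call
    spark.read.option("nullValue", "").csv("/tmp/data.csv")    # explicit empty string

    # The legacy non-Arrow paths mentioned above can still be restored:
    spark.conf.set("spark.sql.execution.arrow.pyspark.enabled", "false")
    spark.conf.set("spark.sql.execution.pythonUDF.arrow.enabled", "false")
    spark.conf.set("spark.sql.execution.pythonUDTF.arrow.enabled", "false")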
8 changes: 6 additions & 2 deletions python/pyspark/sql/connect/readwriter.py
@@ -602,14 +602,16 @@ def format(self, source: str) -> "DataFrameWriter":
     format.__doc__ = PySparkDataFrameWriter.format.__doc__

     def option(self, key: str, value: "OptionalPrimitiveType") -> "DataFrameWriter":
+        if value is None:
+            return self
         self._write.options[key] = to_str(value)
         return self

     option.__doc__ = PySparkDataFrameWriter.option.__doc__

     def options(self, **options: "OptionalPrimitiveType") -> "DataFrameWriter":
         for k in options:
-            self._write.options[k] = to_str(options[k])
+            self.option(k, options[k])
         return self

     options.__doc__ = PySparkDataFrameWriter.options.__doc__
@@ -978,14 +980,16 @@ def using(self, provider: str) -> "DataFrameWriterV2":
     using.__doc__ = PySparkDataFrameWriterV2.using.__doc__

     def option(self, key: str, value: "OptionalPrimitiveType") -> "DataFrameWriterV2":
+        if value is None:
+            return self
         self._write.options[key] = to_str(value)
         return self

     option.__doc__ = PySparkDataFrameWriterV2.option.__doc__

     def options(self, **options: "OptionalPrimitiveType") -> "DataFrameWriterV2":
         for k in options:
-            self._write.options[k] = to_str(options[k])
+            self.option(k, options[k])
         return self

     options.__doc__ = PySparkDataFrameWriterV2.options.__doc__
4 changes: 4 additions & 0 deletions python/pyspark/sql/connect/streaming/readwriter.py
@@ -90,6 +90,8 @@ def schema(self, schema: Union[StructType, str]) -> "DataStreamReader":
     schema.__doc__ = PySparkDataStreamReader.schema.__doc__

     def option(self, key: str, value: "OptionalPrimitiveType") -> "DataStreamReader":
+        if value is None:
+            return self
         self._options[key] = str(value)
         return self

@@ -488,6 +490,8 @@ def format(self, source: str) -> "DataStreamWriter":
     format.__doc__ = PySparkDataStreamWriter.format.__doc__

     def option(self, key: str, value: "OptionalPrimitiveType") -> "DataStreamWriter":
+        if value is None:
+            return self
         self._write_proto.options[key] = cast(str, to_str(value))
         return self

18 changes: 13 additions & 5 deletions python/pyspark/sql/readwriter.py
@@ -201,6 +201,8 @@ def option(self, key: str, value: "OptionalPrimitiveType") -> "DataFrameReader":
         |100|NULL|
         +---+----+
         """
+        if value is None:
+            return self
         self._jreader = self._jreader.option(key, to_str(value))
         return self

@@ -248,8 +250,9 @@ def options(self, **options: "OptionalPrimitiveType") -> "DataFrameReader":
         |100|NULL|
         +---+----+
         """
-        for k in options:
-            self._jreader = self._jreader.option(k, to_str(options[k]))
+        for k, v in options.items():
+            if v is not None:
+                self._jreader = self._jreader.option(k, to_str(v))
         return self

     def load(
@@ -1433,6 +1436,8 @@ def option(self, key: str, value: "OptionalPrimitiveType") -> "DataFrameWriter":
         +---+------------+
         """

+        if value is None:
+            return self
         self._jwrite = self._jwrite.option(key, to_str(value))
         return self

@@ -1483,8 +1488,9 @@ def options(self, **options: "OptionalPrimitiveType") -> "DataFrameWriter":
         |100|Hyukjin Kwon|
         +---+------------+
         """
-        for k in options:
-            self._jwrite = self._jwrite.option(k, to_str(options[k]))
+        for k, v in options.items():
+            if v is not None:
+                self._jwrite = self._jwrite.option(k, to_str(v))
         return self

     @overload
@@ -2469,6 +2475,8 @@ def option(self, key: str, value: "OptionalPrimitiveType") -> "DataFrameWriterV2":

         .. versionadded: 3.1.0
         """
+        if value is None:
+            return self
         self._jwriter.option(key, to_str(value))
         return self

@@ -2478,7 +2486,7 @@ def options(self, **options: "OptionalPrimitiveType") -> "DataFrameWriterV2":

         .. versionadded: 3.1.0
         """
-        options = {k: to_str(v) for k, v in options.items()}
+        options = {k: to_str(v) for k, v in options.items() if v is not None}
         self._jwriter.options(options)
         return self
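A hedged illustration of how ``DataFrameWriterV2`` option values are normalized after this change. It relies on the documented behavior of ``to_str`` (booleans lowercased, other primitives stringified); the table and option names are hypothetical:

    # Assumes `df` is an existing DataFrame; "catalog.db.tbl" is illustrative.
    (df.writeTo("catalog.db.tbl")
        .options(
            mergeSchema=True,     # forwarded to the JVM as "true"
            targetFileSize=1024,  # forwarded as "1024"
            comment=None,         # dropped in 4.2 instead of being sent as null
        ))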

14 changes: 10 additions & 4 deletions python/pyspark/sql/streaming/readwriter.py
@@ -207,6 +207,8 @@ def option(self, key: str, value: "OptionalPrimitiveType") -> "DataStreamReader":
         >>> time.sleep(3)
         >>> q.stop()
         """
+        if value is None:
+            return self
         self._jreader = self._jreader.option(key, to_str(value))
         return self

@@ -242,8 +244,9 @@ def options(self, **options: "OptionalPrimitiveType") -> "DataStreamReader":
         >>> time.sleep(3)
         >>> q.stop()
         """
-        for k in options:
-            self._jreader = self._jreader.option(k, to_str(options[k]))
+        for k, v in options.items():
+            if v is not None:
+                self._jreader = self._jreader.option(k, to_str(v))
         return self

     def name(self, source_name: str) -> "DataStreamReader":
@@ -1143,6 +1146,8 @@ def option(self, key: str, value: "OptionalPrimitiveType") -> "DataStreamWriter":
         >>> time.sleep(3)
         >>> q.stop()
         """
+        if value is None:
+            return self
         self._jwrite = self._jwrite.option(key, to_str(value))
         return self

@@ -1179,8 +1184,9 @@ def options(self, **options: "OptionalPrimitiveType") -> "DataStreamWriter":
         >>> time.sleep(3)
         >>> q.stop()
         """
-        for k in options:
-            self._jwrite = self._jwrite.option(k, to_str(options[k]))
+        for k, v in options.items():
+            if v is not None:
+                self._jwrite = self._jwrite.option(k, to_str(v))
         return self

     @overload
20 changes: 20 additions & 0 deletions python/pyspark/sql/tests/streaming/test_streaming.py
@@ -667,6 +667,26 @@ def test_streaming_drop_duplicate_within_watermark(self):
         result = self.spark.sql("SELECT * FROM test_streaming_drop_duplicates_within_wm").collect()
         self.assertTrue(len(result) >= 6 and len(result) <= 9)

+    def test_stream_reader_option_none_chains_safely(self):
+        df = (
+            self.spark.readStream.format("rate")
+            .option("rowsPerSecond", None)
+            .options(numPartitions=None)
+            .option("rowsPerSecond", "5")
+            .load()
+        )
+        self.assertIsNotNone(df.schema)
+
+    def test_stream_writer_option_none_chains_safely(self):
+        df = self.spark.readStream.format("rate").option("rowsPerSecond", "5").load()
+        writer = (
+            df.writeStream.format("memory")
+            .queryName("opt_none_test")
+            .option("checkpointLocation", None)
+            .options(checkpointLocation=None)
+        )
+        self.assertIsNotNone(writer)
+

 class StreamingTests(StreamingTestsMixin, ReusedSQLTestCase):
     def _assert_exception_tree_contains_msg(self, exception, msg):
26 changes: 26 additions & 0 deletions python/pyspark/sql/tests/test_readwriter.py
@@ -313,6 +313,27 @@ def test_streaming_changes_rejects_user_schema(self):
             ).changes("nonexistent_table")
         self.assertIn("changes", str(ctx.exception))

+    def test_option_none_is_filtered(self):
+        with tempfile.TemporaryDirectory() as d:
+            path = os.path.join(d, "data.csv")
+            with open(path, "w") as f:
+                f.write('"",val\n')
+            schema = "a STRING, b STRING"
+            expected = [Row(a=None, b="val")]
+            self.assertEqual(
+                self.spark.read.schema(schema).option("nullValue", None).csv(path).collect(),
+                expected,
+            )
+            self.assertEqual(
+                self.spark.read.schema(schema).options(nullValue=None).csv(path).collect(),
+                expected,
+            )
+
+    def test_writer_option_none_chains_safely(self):
+        df = self.spark.createDataFrame([(1,)], "x INT")
+        self.assertIsNotNone(df.write.option("foo", None).option("bar", "baz"))
+        self.assertIsNotNone(df.write.options(foo=None, bar="baz"))
+

 class ReadwriterV2TestsMixin:
     def test_api(self):
@@ -419,6 +440,11 @@ def get_cluster_by_cols(table="pyspark_cluster_by"):
         self.assertEqual(get_cluster_by_cols(), ["x"])
         self.assertSetEqual(set(data), set(self.spark.table(table_name).collect()))

+    def test_v2_writer_option_none_chains_safely(self):
+        df = self.spark.createDataFrame([(1,)], "x INT")
+        self.assertIsNotNone(df.writeTo("notexist").option("foo", None).option("bar", "baz"))
+        self.assertIsNotNone(df.writeTo("notexist").options(foo=None, bar="baz"))
+

 class ReadwriterTests(ReadwriterTestsMixin, ReusedSQLTestCase):
     pass