Description
We are interested in adding the Apache Sedona extension to run geospatial queries on our Iceberg data lake (with the BigLake REST catalog).
I'm trying to add the extension by setting spark.jars and spark.sql.extensions, but I'm getting an error that I think means the extensions were not loaded or aren't working properly.
On the other hand, the IcebergSparkSessionExtensions extension works: if I just comment out the lines that add the Sedona support, my non-geospatial queries run without issues.
My main doubts are: am I doing anything wrong in the way I'm adding the Sedona jars? Is adding these extensions and extra jars supported?
The code
import sedona.spark.geopandas as geopandas
import pandas as pd
from google.cloud.dataproc_spark_connect import DataprocSparkSession
from google.cloud.dataproc_v1 import Session, RuntimeConfig, EnvironmentConfig, ExecutionConfig
"""Create a Dataproc Serverless session with Sedona."""
session_config = Session()
session_config.runtime_config = RuntimeConfig()
session_config.runtime_config.version = "2.2"
# Configure for BigLake Iceberg + Sedona
session_config.runtime_config.properties = {
    # JAR dependencies
    "spark.jars": "gs://er-dwh-dataflow-staging-uswest1/jars/sedona-spark-shaded-3.5_2.13-1.5.1.jar,gs://er-dwh-dataflow-staging-uswest1/jars/geotools-wrapper-1.5.1-28.2.jar",
    # Serializer configuration
    "spark.serializer": "org.apache.spark.serializer.KryoSerializer",
    "spark.kryo.registrator": "org.apache.sedona.core.serde.SedonaKryoRegistrator",
    # Extensions
    "spark.sql.extensions": "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions;org.apache.sedona.sql.SedonaSqlExtensions",
    # BigLake Iceberg REST catalog configuration
    "spark.sql.defaultCatalog": "earthranger_catalog",
    "spark.sql.catalog.earthranger_catalog": "org.apache.iceberg.spark.SparkCatalog",
    "spark.sql.catalog.earthranger_catalog.type": "rest",
    "spark.sql.catalog.earthranger_catalog.uri": "https://biglake.googleapis.com/iceberg/v1/restcatalog",
    "spark.sql.catalog.earthranger_catalog.warehouse": "gs://er-dwh-uswest1/",
    "spark.sql.catalog.earthranger_catalog.header.x-goog-user-project": "er-reporting-dev",
    "spark.sql.catalog.earthranger_catalog.rest.auth.type": "org.apache.iceberg.gcp.auth.GoogleAuthManager",
    "spark.sql.catalog.earthranger_catalog.rest-metrics-reporting-enabled": "false",
    "spark.sql.catalog.earthranger_catalog.header.X-Iceberg-Access-Delegation": "vended-credentials",
    # Performance optimizations
    "spark.sql.adaptive.enabled": "true",
    "spark.sql.adaptive.coalescePartitions.enabled": "true",
}
spark = (
    DataprocSparkSession.builder
    .appName("GeoPandas with Sedona Backend")
    .dataprocSessionConfig(session_config)
    .getOrCreate()
)
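# Sanity probe (just a sketch): check whether the Sedona SQL extension registered
# its functions. ST_Point is only used as a test function here; if the extension
# isn't loaded, this should fail with an "undefined function" error rather than
# the Kryo one. Note that spark.conf.get may not surface every server-side
# property over Spark Connect, so the extensions printout is best-effort.
print(spark.conf.get("spark.sql.extensions", "not set"))
try:
    spark.sql("SELECT ST_Point(1.0, 2.0) AS geom").show()
    print("Sedona SQL functions are available")
except Exception as exc:
    print(f"Sedona probe failed: {exc}")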
# Note: this query isn't using any Sedona SQL feature yet, and it works without the Sedona extension.
# Once the Sedona extensions load correctly I'll update the query to use a geospatial function like ST_Area or similar (see the sketch after this block).
print("Querying BigLake Iceberg table: earthranger_catalog.er_warehouse.observations")
query = """
SELECT
id,
source_id,
location,
recorded_at
FROM earthranger_catalog.er_warehouse.observations
WHERE location IS NOT NULL
LIMIT 1000
"""
# Execute query (lazy - creates execution plan)
iceberg_df = spark.sql(query)
iceberg_df.show(5, truncate=False)
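For context, once the extensions load this is roughly the geospatial version of the query I want to run. It's only a sketch: I'm assuming the location column holds WKB, so ST_GeomFromWKB/ST_Area are illustrative and might change depending on the actual column type.
geo_query = """
SELECT
    id,
    source_id,
    ST_Area(ST_GeomFromWKB(location)) AS area,
    recorded_at
FROM earthranger_catalog.er_warehouse.observations
WHERE location IS NOT NULL
LIMIT 1000
"""
# spark.sql(geo_query).show(5, truncate=False)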
The error
iceberg_df = spark.sql(query)
^^^^^^^^^^^^^^^^
File "/home/dev/earthranger/dwh/earthranger-warehouse-pipeline/.venv/lib/python3.12/site-packages/pyspark/sql/connect/session.py", line 550, in sql
data, properties = self.client.execute_command(cmd.command(self._client))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/dev/earthranger/dwh/earthranger-warehouse-pipeline/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", line 982, in execute_command
data, _, _, _, properties = self._execute_and_fetch(req)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/dev/earthranger/dwh/earthranger-warehouse-pipeline/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", line 1283, in _execute_and_fetch
for response in self._execute_and_fetch_as_iterator(req):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/home/dev/earthranger/dwh/earthranger-warehouse-pipeline/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", line 1264, in _execute_and_fetch_as_iterator
self._handle_error(error)
File "/home/dev/earthranger/dwh/earthranger-warehouse-pipeline/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", line 1503, in _handle_error
self._handle_rpc_error(error)
File "/home/dev/earthranger/dwh/earthranger-warehouse-pipeline/.venv/lib/python3.12/site-packages/pyspark/sql/connect/client/core.py", line 1539, in _handle_rpc_error
raise convert_exception(info, status.message) from None
pyspark.errors.exceptions.connect.SparkConnectGrpcException: (org.apache.spark.SparkException) Failed to register classes with Kryo
INFO:py4j.clientserver:Closing down clientserver connection
Session details (for one of the failed ones):
Name: sc-20251028-162446-kbpjbb
UUID: 9acb3015-2a12-49c4-af69-6f5bc625f85a
Project: er-reporting-dev
Region: us-west1
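For reference, a minimal plain-PySpark session with the same jar and Kryo settings looks roughly like this (local placeholder paths; a local Spark install would need to match the jars' Scala 2.13 build). If the same Kryo error reproduces here, it would point at the registrator/jar combination rather than the managed Dataproc Spark Connect session:
from pyspark.sql import SparkSession

# Same serializer/registrator/extension settings as above, just on a local
# session. Jar paths are placeholders for locally downloaded copies of the jars.
local_spark = (
    SparkSession.builder
    .master("local[*]")
    .config("spark.jars", "/tmp/sedona-spark-shaded-3.5_2.13-1.5.1.jar,/tmp/geotools-wrapper-1.5.1-28.2.jar")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.kryo.registrator", "org.apache.sedona.core.serde.SedonaKryoRegistrator")
    .config("spark.sql.extensions", "org.apache.sedona.sql.SedonaSqlExtensions")
    .getOrCreate()
)
local_spark.sql("SELECT ST_Point(1.0, 2.0) AS geom").show()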
Any help is appreciated, thanks!