Skip to content

[SPARK-55275] SQL State Coverage: IllegalStateException#54056

Open
garlandz-db wants to merge 6 commits intoapache:masterfrom
garlandz-db:SPARK-55275
Open

[SPARK-55275] SQL State Coverage: IllegalStateException#54056
garlandz-db wants to merge 6 commits intoapache:masterfrom
garlandz-db:SPARK-55275

Conversation

@garlandz-db
Copy link
Contributor

@garlandz-db garlandz-db commented Jan 29, 2026

What changes were proposed in this pull request?

Update IllegalStateException => SparkIllegalStateException in spark connect layer

Why are the changes needed?

This keeps the Spark Connect layer more traceable for errors

Does this PR introduce any user-facing change?

Yes. Changes exception type to their Spark equivalent

How was this patch tested?

Updated testing

Was this patch authored or co-authored using generative AI tooling?

Yes

@github-actions
Copy link

JIRA Issue Information

=== Improvement SPARK-55275 ===
Summary: add a specific SQL state for IllegalStateException errors at ExecuteEventsManager.assertStatus to ensure reliable error reporting
Assignee: None
Status: Open
Affected: ["4.0.2"]


This comment was automatically generated by GitHub Actions

@garlandz-db garlandz-db force-pushed the SPARK-55275 branch 2 times, most recently from 6d21971 to ae01b8c Compare February 5, 2026 08:37
@@ -90,7 +90,7 @@ class SparkDeclarativePipelinesServerTest extends SparkConnectServerTest with St
if (iter.hasNext) {
iter.next()
} else {
throw new IllegalStateException(s"Invalid response: $iter")
throw IllegalStateErrors.noBatchesAvailable()
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

iter is missing?

@garlandz-db garlandz-db force-pushed the SPARK-55275 branch 3 times, most recently from aa03ab3 to 3ad3fb4 Compare February 9, 2026 13:25
@@ -332,7 +333,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
// called only once, since removing the session from SparkConnectSessionManager.sessionStore is
// synchronized and guaranteed to happen only once.
if (closedTimeMs.isDefined) {
throw new IllegalStateException(s"Session $key is already closed.")
throw IllegalStateErrors.sessionAlreadyClosed(sessionId)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

key is missing?

"value" -> value,
"context" -> context))

def cleanerAlreadySet(queryKey: String): SparkIllegalStateException =
Copy link
Contributor

@heyihong heyihong Feb 9, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I was wondering if it makes sense to have two message parameters—userId and sessionId—instead of queryKey.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the query id and run id are whats previously thrown. we can also include that

sessionId: $sessionId with status ${status}
is not within statuses $validStatuses for event $eventStatus
""")
throw IllegalStateErrors.sessionStateTransitionInvalid(status.toString, eventStatus.toString)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sessionId and validStatuses are missing?

@garlandz-db garlandz-db requested a review from heyihong February 10, 2026 08:29
@garlandz-db garlandz-db force-pushed the SPARK-55275 branch 4 times, most recently from 28c7b12 to 684123f Compare February 10, 2026 09:12
@garlandz-db garlandz-db changed the title [SPARK-55275] SQL State Coverage: IllegalStateException [SPARK-55275] SQL State Coverage: IllegalStateException AND InvalidPlanInput Feb 10, 2026
@garlandz-db
Copy link
Contributor Author

@heyihong thanks for the reviews. made the exceptions pass the sessionHolder key and executionHolder key where possible to populate the exception with more information

Copy link
Contributor

@heyihong heyihong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall, it looks good. I left a few more comments.

errorClass = "SPARK_CONNECT_ILLEGAL_STATE." +
"STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_OPERATION_STATUS_MISMATCH",
messageParameters = Map(
"operationId" -> executeHolder.operationId,
Copy link
Contributor

@heyihong heyihong Feb 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The function or method parameters should accept the most general abstraction. In this case, we should pass the operationId string instead of executeHolder (consider that there might be another use case that wants to reuse this code but only has access to operationId and not executeHolder).

errorClass = "SPARK_CONNECT_ILLEGAL_STATE." +
"STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_SESSION_NOT_STARTED",
messageParameters = Map(
"sessionId" -> sessionHolder.sessionId,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. Consider passing sessionId instead of sessionHolder.

"EXECUTION_STATE_EXECUTE_HOLDER_ALREADY_EXISTS_GRAPH",
messageParameters = Map("graphId" -> graphId))

def sessionAlreadyClosed(sessionHolder: SessionHolder): SparkIllegalStateException =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ditto. Consider passing sessionKey instead of sessionHolder.

errorClass = "SPARK_CONNECT_ILLEGAL_STATE.SESSION_MANAGEMENT_SESSION_ALREADY_CLOSED",
messageParameters = Map("key" -> sessionHolder.key.toString))

def operationOrphaned(executeHolder: ExecuteHolder): SparkIllegalStateException =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto. Consider passing executeKey instead of executeHolder.

messageParameters = Map("key" -> executeHolder.key.toString))

def sessionStateTransitionInvalid(
sessionHolder: SessionHolder,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

messageParameters = Map.empty)

def streamingQueryUnexpectedReturnValue(
sessionHolder: SessionHolder,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

"value" -> value.toString,
"context" -> context))

def cleanerAlreadySet(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

"key" -> sessionHolder.key.toString,
"queryKey" -> queryKey.toString))

def eventSendAfterShutdown(sessionHolder: SessionHolder): SparkIllegalStateException =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto


def streamingQueryUnexpectedReturnValue(
key: String,
value: Any,
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the type should be Int?


def cleanerAlreadySet(
key: String,
queryKey: Any): SparkIllegalStateException =
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Any is too general; it should be at least a String, right?

errorClass = "SPARK_CONNECT_ILLEGAL_STATE.STATE_CONSISTENCY_CLEANER_ALREADY_SET",
messageParameters = Map(
"key" -> key,
"queryKey" -> queryKey.toString))
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
"queryKey" -> queryKey.toString))
"queryKey" -> queryKey))

Copy link
Contributor

@heyihong heyihong left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some final comment

@heyihong
Copy link
Contributor

heyihong commented Feb 13, 2026

Early LGTM — please fix the test failures accordingly. I also noticed a few unused subclasses (e.g., DATA_INTEGRITY_OPERATION_ID_MISMATCH, SESSION_MANAGEMENT_SERVER_SESSION_ID_CHANGED). Could you clean those up accordingly? One final thing: some error messages (e.g., “Attempting to stop the Spark Connect service that has not been started”) are being lost. Could you fix that as well?

@@ -46,11 +46,13 @@ object InvalidPlanInput {
InvalidPlanInput(
errorCondition = "INTERNAL_ERROR",
messageParameters = Map("message" -> message),
causeOpt = None)
causeOpt = None,
sqlState = Some("56K00"))
Copy link
Contributor

@heyihong heyihong Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

On second thought, it may not be a good idea to include this custom mapping in the code. It breaks the assumption that each error condition corresponds to a single SQL state. You might also need to change the error condition to a new one and store this mapping in error-conditions.json.


def apply(errorCondition: String, messageParameters: Map[String, String]): InvalidPlanInput =
InvalidPlanInput(
errorCondition = errorCondition,
messageParameters = messageParameters,
causeOpt = None)
causeOpt = None,
sqlState = Some("56K00"))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ditto

@heyihong
Copy link
Contributor

One good practice is to split a larger PR into smaller ones so that it is easier for reviewers to handle. In this case, it may be easier to review if we split the changes for IllegalStateExceptions and InvalidPlanInput into separate PRs.

@@ -455,8 +456,7 @@ object SparkConnectService extends Logging {
}

if (!started) {
throw new IllegalStateException(
"Attempting to stop the Spark Connect service that has not been started.")
throw IllegalStateErrors.serviceNotStarted()
Copy link
Contributor

@heyihong heyihong Feb 13, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Friendly ping on this. It seems that some error messages (e.g., “Attempting to stop the Spark Connect service that has not been started”) are still being lost

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

updated good catch!

@garlandz-db garlandz-db changed the title [SPARK-55275] SQL State Coverage: IllegalStateException AND InvalidPlanInput [SPARK-55275] SQL State Coverage: IllegalStateException Feb 13, 2026
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants