[SPARK-55275] SQL State Coverage: IllegalStateException #54056
garlandz-db wants to merge 6 commits into apache:master
JIRA Issue Information: Improvement SPARK-55275. This comment was automatically generated by GitHub Actions.
@@ -90,7 +90,7 @@ class SparkDeclarativePipelinesServerTest extends SparkConnectServerTest with St
       if (iter.hasNext) {
         iter.next()
       } else {
-        throw new IllegalStateException(s"Invalid response: $iter")
+        throw IllegalStateErrors.noBatchesAvailable()
@@ -332,7 +333,7 @@ case class SessionHolder(userId: String, sessionId: String, session: SparkSessio
     // called only once, since removing the session from SparkConnectSessionManager.sessionStore is
     // synchronized and guaranteed to happen only once.
     if (closedTimeMs.isDefined) {
-      throw new IllegalStateException(s"Session $key is already closed.")
+      throw IllegalStateErrors.sessionAlreadyClosed(sessionId)
        "value" -> value,
        "context" -> context))

  def cleanerAlreadySet(queryKey: String): SparkIllegalStateException =
I was wondering if it makes sense to have two message parameters—userId and sessionId—instead of queryKey.
The query ID and run ID are what was previously thrown. We can also include those.
-          sessionId: $sessionId with status ${status}
-          is not within statuses $validStatuses for event $eventStatus
-          """)
+      throw IllegalStateErrors.sessionStateTransitionInvalid(status.toString, eventStatus.toString)
sessionId and validStatuses are missing?
@heyihong thanks for the reviews. I made the exceptions take the sessionHolder key and executionHolder key where possible, so that each exception carries more information.
heyihong left a comment
Overall, it looks good. I left a few more comments.
      errorClass = "SPARK_CONNECT_ILLEGAL_STATE." +
        "STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_OPERATION_STATUS_MISMATCH",
      messageParameters = Map(
        "operationId" -> executeHolder.operationId,
The function or method parameters should accept the most general abstraction. In this case, we should pass the operationId string instead of executeHolder (consider that there might be another use case that wants to reuse this code but only has access to operationId and not executeHolder).
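The suggestion above can be illustrated with a small, self-contained sketch. It does not use the Spark classes: `SketchIllegalStateException`, `ExecuteHolderSketch`, `ErrorsSketch`, and the error-class string are hypothetical stand-ins, chosen only to show a factory that takes the plain `operationId` string rather than a holder object.

```scala
// Hypothetical stand-in for SparkIllegalStateException; not the Spark class.
final class SketchIllegalStateException(
    val errorClass: String,
    val messageParameters: Map[String, String])
  extends IllegalStateException(
    s"[$errorClass] " +
      messageParameters.map { case (k, v) => s"$k=$v" }.mkString(", "))

// Hypothetical holder type, standing in for something like ExecuteHolder.
final case class ExecuteHolderSketch(operationId: String, sessionId: String)

object ErrorsSketch {
  // The factory asks only for the String it needs, so a caller that has
  // an operation id but no holder object can still reuse it.
  def operationStatusMismatch(operationId: String): SketchIllegalStateException =
    new SketchIllegalStateException(
      errorClass = "SKETCH_ILLEGAL_STATE.OPERATION_STATUS_MISMATCH", // made-up name
      messageParameters = Map("operationId" -> operationId))
}
```

A caller that holds an `ExecuteHolderSketch` passes `holder.operationId`; a caller that only knows the id passes the string directly. Both go through the same factory.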
      errorClass = "SPARK_CONNECT_ILLEGAL_STATE." +
        "STATE_CONSISTENCY_EXECUTION_STATE_TRANSITION_INVALID_SESSION_NOT_STARTED",
      messageParameters = Map(
        "sessionId" -> sessionHolder.sessionId,
Ditto. Consider passing sessionId instead of sessionHolder.
        "EXECUTION_STATE_EXECUTE_HOLDER_ALREADY_EXISTS_GRAPH",
      messageParameters = Map("graphId" -> graphId))

  def sessionAlreadyClosed(sessionHolder: SessionHolder): SparkIllegalStateException =
Ditto. Consider passing sessionKey instead of sessionHolder.
      errorClass = "SPARK_CONNECT_ILLEGAL_STATE.SESSION_MANAGEMENT_SESSION_ALREADY_CLOSED",
      messageParameters = Map("key" -> sessionHolder.key.toString))

  def operationOrphaned(executeHolder: ExecuteHolder): SparkIllegalStateException =
Ditto. Consider passing executeKey instead of executeHolder.
      messageParameters = Map("key" -> executeHolder.key.toString))

  def sessionStateTransitionInvalid(
      sessionHolder: SessionHolder,

      messageParameters = Map.empty)

  def streamingQueryUnexpectedReturnValue(
      sessionHolder: SessionHolder,

        "value" -> value.toString,
        "context" -> context))

  def cleanerAlreadySet(

        "key" -> sessionHolder.key.toString,
        "queryKey" -> queryKey.toString))

  def eventSendAfterShutdown(sessionHolder: SessionHolder): SparkIllegalStateException =
  def streamingQueryUnexpectedReturnValue(
      key: String,
      value: Any,

  def cleanerAlreadySet(
      key: String,
      queryKey: Any): SparkIllegalStateException =
Any is too general; it should be at least a String, right?
      errorClass = "SPARK_CONNECT_ILLEGAL_STATE.STATE_CONSISTENCY_CLEANER_ALREADY_SET",
      messageParameters = Map(
        "key" -> key,
        "queryKey" -> queryKey.toString))
Suggested change:
-        "queryKey" -> queryKey.toString))
+        "queryKey" -> queryKey))
Early LGTM. Please fix the test failures accordingly. I also noticed a few unused subclasses (e.g., DATA_INTEGRITY_OPERATION_ID_MISMATCH, SESSION_MANAGEMENT_SERVER_SESSION_ID_CHANGED). Could you clean those up as well? One final thing: some error messages (e.g., “Attempting to stop the Spark Connect service that has not been started”) are being lost. Could you fix that too?
@@ -46,11 +46,13 @@ object InvalidPlanInput {
     InvalidPlanInput(
       errorCondition = "INTERNAL_ERROR",
       messageParameters = Map("message" -> message),
-      causeOpt = None)
+      causeOpt = None,
+      sqlState = Some("56K00"))
On second thought, it may not be a good idea to include this custom mapping in the code. It breaks the assumption that each error condition corresponds to a single SQL state. You might also need to change the error condition to a new one and store this mapping in error-conditions.json.
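The concern is that a per-call `sqlState` argument lets one error condition surface with more than one SQL state. A minimal sketch of the alternative, assuming a lookup table keyed by condition name: the exception derives its SQL state from the condition and callers never pass it. `ErrorConditionRegistrySketch`, `InvalidPlanInputSketch`, the condition names, and the pairing of names to states are all hypothetical; per the comment above, the real mapping would be stored in error-conditions.json.

```scala
// Hypothetical in-memory stand-in for the mapping kept in error-conditions.json.
object ErrorConditionRegistrySketch {
  // Each condition name maps to exactly one SQL state.
  // Names and states below are illustrative only.
  private val sqlStates: Map[String, String] = Map(
    "SKETCH_INTERNAL_ERROR" -> "XX000",
    "SKETCH_CONNECT_INVALID_PLAN" -> "56K00")

  def sqlStateOf(errorCondition: String): Option[String] =
    sqlStates.get(errorCondition)
}

// Hypothetical exception type; not Spark's InvalidPlanInput.
final class InvalidPlanInputSketch(
    val errorCondition: String,
    val messageParameters: Map[String, String])
  extends RuntimeException(errorCondition) {

  // The SQL state is looked up from the condition, so two exceptions with the
  // same condition can never report different states.
  def sqlState: Option[String] =
    ErrorConditionRegistrySketch.sqlStateOf(errorCondition)
}
```

Under this shape, giving plan-input errors a different SQL state means introducing a new condition name in the registry rather than overriding the state at the call site.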

   def apply(errorCondition: String, messageParameters: Map[String, String]): InvalidPlanInput =
     InvalidPlanInput(
       errorCondition = errorCondition,
       messageParameters = messageParameters,
-      causeOpt = None)
+      causeOpt = None,
+      sqlState = Some("56K00"))
One good practice is to split a larger PR into smaller ones so that it is easier for reviewers to handle. In this case, it may be easier to review if we split the changes for IllegalStateExceptions and InvalidPlanInput into separate PRs.
@@ -455,8 +456,7 @@ object SparkConnectService extends Logging {
     }

     if (!started) {
-      throw new IllegalStateException(
-        "Attempting to stop the Spark Connect service that has not been started.")
+      throw IllegalStateErrors.serviceNotStarted()
Friendly ping on this. It seems that some error messages (e.g., “Attempting to stop the Spark Connect service that has not been started”) are still being lost
Updated, good catch!
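One way to keep the original wording when moving from a hand-written message to an error class is to store that wording as the class's message template. The sketch below is an assumption about the general shape, not the code in this PR: `MessageTemplatesSketch`, the error-class names, and the `<name>` placeholder convention used here are hypothetical. The two message texts are the ones shown in the diffs above.

```scala
object MessageTemplatesSketch {
  // Hypothetical templates keyed by error class. The message text is carried
  // over from the IllegalStateException calls that were replaced.
  private val templates: Map[String, String] = Map(
    "SKETCH_ILLEGAL_STATE.SERVICE_NOT_STARTED" ->
      "Attempting to stop the Spark Connect service that has not been started.",
    "SKETCH_ILLEGAL_STATE.SESSION_ALREADY_CLOSED" ->
      "Session <key> is already closed.")

  // Fills each <name> placeholder with the matching parameter value.
  def render(errorClass: String, params: Map[String, String]): String = {
    val template = templates.getOrElse(
      errorClass,
      throw new NoSuchElementException(s"Unknown error class: $errorClass"))
    params.foldLeft(template) { case (msg, (name, value)) =>
      msg.replace(s"<$name>", value)
    }
  }
}
```

With the template in place, a test can compare the rendered message against the text the old exception carried, which catches a message being dropped during the migration.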
What changes were proposed in this pull request?
Replace IllegalStateException with SparkIllegalStateException in the Spark Connect layer.
Why are the changes needed?
This makes errors raised in the Spark Connect layer easier to trace.
Does this PR introduce any user-facing change?
Yes. Exception types are changed to their Spark equivalents.
How was this patch tested?
Updated the tests.
Was this patch authored or co-authored using generative AI tooling?
Yes