Skip to content

Commit 2329174

Browse files
ennuiteVando Pereira
andauthored
GH-863: [JDBC] Suppress benign exceptions from gRPC layer on ArrowFlightSqlClientHandler#close (#910)
## What's Changed When using the Flight SQL JDBC driver with connection pooling and a catalog parameter, ArrowFlightSqlClientHandler.close() performs a CloseSession RPC that can fail during gRPC channel shutdown. These transient failures (UNAVAILABLE or INTERNAL with "Connection closed after GOAWAY") cause noisy errors in pooling frameworks like Apache Commons DBCP. With this PR these exceptions will instead be suppressed and logged, following the procedure that was used for [ARROW-17785](https://issues.apache.org/jira/browse/ARROW-17785) ### Are these changes tested? Yes Closes #863 --------- Co-authored-by: Vando Pereira <vando@dremio.com>
1 parent 033ecc3 commit 2329174

File tree

2 files changed

+160
-9
lines changed

2 files changed

+160
-9
lines changed

flight/flight-sql-jdbc-core/src/main/java/org/apache/arrow/driver/jdbc/client/ArrowFlightSqlClientHandler.java

Lines changed: 72 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -262,15 +262,85 @@ public FlightInfo getInfo(final String query) {
262262
@Override
263263
public void close() throws SQLException {
264264
if (catalog.isPresent()) {
265-
sqlClient.closeSession(new CloseSessionRequest(), getOptions());
265+
try {
266+
sqlClient.closeSession(new CloseSessionRequest(), getOptions());
267+
} catch (FlightRuntimeException fre) {
268+
handleBenignCloseException(
269+
fre, "Failed to close Flight SQL session.", "closing Flight SQL session");
270+
}
266271
}
267272
try {
268273
AutoCloseables.close(sqlClient);
274+
} catch (FlightRuntimeException fre) {
275+
handleBenignCloseException(
276+
fre, "Failed to clean up client resources.", "closing Flight SQL client");
269277
} catch (final Exception e) {
270278
throw new SQLException("Failed to clean up client resources.", e);
271279
}
272280
}
273281

282+
/**
283+
* Handles FlightRuntimeException during close operations, suppressing benign gRPC shutdown errors
284+
* while re-throwing genuine failures.
285+
*
286+
* @param fre the FlightRuntimeException to handle
287+
* @param sqlErrorMessage the SQLException message to use for genuine failures
288+
* @param operationDescription description of the operation for logging
289+
* @throws SQLException if the exception represents a genuine failure
290+
*/
291+
private void handleBenignCloseException(
292+
FlightRuntimeException fre, String sqlErrorMessage, String operationDescription)
293+
throws SQLException {
294+
if (isBenignCloseException(fre)) {
295+
logSuppressedCloseException(fre, operationDescription);
296+
} else {
297+
throw new SQLException(sqlErrorMessage, fre);
298+
}
299+
}
300+
301+
/**
302+
* Handles FlightRuntimeException during close operations, suppressing benign gRPC shutdown errors
303+
* while re-throwing genuine failures as FlightRuntimeException.
304+
*
305+
* @param fre the FlightRuntimeException to handle
306+
* @param operationDescription description of the operation for logging
307+
* @throws FlightRuntimeException if the exception represents a genuine failure
308+
*/
309+
private void handleBenignCloseException(FlightRuntimeException fre, String operationDescription)
310+
throws FlightRuntimeException {
311+
if (isBenignCloseException(fre)) {
312+
logSuppressedCloseException(fre, operationDescription);
313+
} else {
314+
throw fre;
315+
}
316+
}
317+
318+
/**
319+
* Determines if a FlightRuntimeException represents a benign close operation error that should be
320+
* suppressed.
321+
*
322+
* @param fre the FlightRuntimeException to check
323+
* @return true if the exception should be suppressed, false otherwise
324+
*/
325+
private boolean isBenignCloseException(FlightRuntimeException fre) {
326+
return fre.status().code().equals(FlightStatusCode.UNAVAILABLE)
327+
|| (fre.status().code().equals(FlightStatusCode.INTERNAL)
328+
&& fre.getMessage() != null
329+
&& fre.getMessage().contains("Connection closed after GOAWAY"));
330+
}
331+
332+
/**
333+
* Logs a suppressed close exception with appropriate level based on debug settings.
334+
*
335+
* @param fre the FlightRuntimeException being suppressed
336+
* @param operationDescription description of the operation for logging
337+
*/
338+
private void logSuppressedCloseException(
339+
FlightRuntimeException fre, String operationDescription) {
340+
// ARROW-17785 and GH-863: suppress exceptions caused by flaky gRPC layer during shutdown
341+
LOGGER.debug("Suppressed error {}", operationDescription, fre);
342+
}
343+
274344
/** A prepared statement handler. */
275345
public interface PreparedStatement extends AutoCloseable {
276346
/**
@@ -386,14 +456,7 @@ public void close() {
386456
try {
387457
preparedStatement.close(getOptions());
388458
} catch (FlightRuntimeException fre) {
389-
// ARROW-17785: suppress exceptions caused by flaky gRPC layer
390-
if (fre.status().code().equals(FlightStatusCode.UNAVAILABLE)
391-
|| (fre.status().code().equals(FlightStatusCode.INTERNAL)
392-
&& fre.getMessage().contains("Connection closed after GOAWAY"))) {
393-
LOGGER.warn("Supressed error closing PreparedStatement", fre);
394-
return;
395-
}
396-
throw fre;
459+
handleBenignCloseException(fre, "closing PreparedStatement");
397460
}
398461
}
399462
};
Lines changed: 88 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,88 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one or more
3+
* contributor license agreements. See the NOTICE file distributed with
4+
* this work for additional information regarding copyright ownership.
5+
* The ASF licenses this file to You under the Apache License, Version 2.0
6+
* (the "License"); you may not use this file except in compliance with
7+
* the License. You may obtain a copy of the License at
8+
*
9+
* http://www.apache.org/licenses/LICENSE-2.0
10+
*
11+
* Unless required by applicable law or agreed to in writing, software
12+
* distributed under the License is distributed on an "AS IS" BASIS,
13+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
14+
* See the License for the specific language governing permissions and
15+
* limitations under the License.
16+
*/
17+
package org.apache.arrow.driver.jdbc.client;
18+
19+
import static org.junit.jupiter.api.Assertions.assertDoesNotThrow;
20+
import static org.junit.jupiter.api.Assertions.assertThrows;
21+
import static org.mockito.ArgumentMatchers.any;
22+
import static org.mockito.Mockito.doThrow;
23+
import static org.mockito.Mockito.mock;
24+
25+
import java.sql.SQLException;
26+
import java.util.ArrayList;
27+
import java.util.Collection;
28+
import java.util.Optional;
29+
import org.apache.arrow.flight.CallOption;
30+
import org.apache.arrow.flight.CallStatus;
31+
import org.apache.arrow.flight.CloseSessionRequest;
32+
import org.apache.arrow.flight.FlightStatusCode;
33+
import org.apache.arrow.flight.sql.FlightSqlClient;
34+
import org.junit.jupiter.params.ParameterizedTest;
35+
import org.junit.jupiter.params.provider.MethodSource;
36+
37+
public class ArrowFlightSqlClientHandlerTest {
38+
39+
@ParameterizedTest
40+
@MethodSource
41+
public void testCloseHandlesFlightRuntimeException(
42+
boolean throwFromCloseSession, CallStatus callStatus, boolean shouldSuppress)
43+
throws Exception {
44+
FlightSqlClient sqlClient = mock(FlightSqlClient.class);
45+
String cacheKey = "cacheKey";
46+
Optional<String> catalog =
47+
throwFromCloseSession ? Optional.of("test_catalog") : Optional.empty();
48+
final Collection<CallOption> credentialOptions = new ArrayList<>();
49+
ArrowFlightSqlClientHandler.Builder builder = new ArrowFlightSqlClientHandler.Builder();
50+
51+
if (throwFromCloseSession) {
52+
doThrow(callStatus.toRuntimeException())
53+
.when(sqlClient)
54+
.closeSession(any(CloseSessionRequest.class), any(CallOption[].class));
55+
} else {
56+
doThrow(callStatus.toRuntimeException()).when(sqlClient).close();
57+
}
58+
59+
ArrowFlightSqlClientHandler sqlClientHandler =
60+
new ArrowFlightSqlClientHandler(
61+
cacheKey, sqlClient, builder, credentialOptions, catalog, null);
62+
63+
if (shouldSuppress) {
64+
assertDoesNotThrow(sqlClientHandler::close);
65+
} else {
66+
assertThrows(SQLException.class, sqlClientHandler::close);
67+
}
68+
}
69+
70+
private static Object[] testCloseHandlesFlightRuntimeException() {
71+
CallStatus benignInternalError =
72+
new CallStatus(FlightStatusCode.INTERNAL, null, "Connection closed after GOAWAY", null);
73+
CallStatus notBenignInternalError =
74+
new CallStatus(FlightStatusCode.INTERNAL, null, "Not a benign internal error", null);
75+
CallStatus unavailableError = new CallStatus(FlightStatusCode.UNAVAILABLE, null, null, null);
76+
CallStatus unknownError = new CallStatus(FlightStatusCode.UNKNOWN, null, null, null);
77+
return new Object[] {
78+
new Object[] {true, benignInternalError, true},
79+
new Object[] {false, benignInternalError, true},
80+
new Object[] {true, notBenignInternalError, false},
81+
new Object[] {false, notBenignInternalError, false},
82+
new Object[] {true, unavailableError, true},
83+
new Object[] {false, unavailableError, true},
84+
new Object[] {true, unknownError, false},
85+
new Object[] {false, unknownError, false},
86+
};
87+
}
88+
}

0 commit comments

Comments
 (0)