Skip to content

Commit d4f9054

Browse files
committed
Fix #729 and #731: Telemetry lifecycle management
1 parent 4b7df5b commit d4f9054

File tree

3 files changed: 51 additions and 6 deletions.

src/databricks/sql/client.py

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -306,6 +306,8 @@ def read(self) -> Optional[OAuthToken]:
306306
)
307307
self.session.open()
308308
except Exception as e:
309+
# Respect user's telemetry preference even during connection failure
310+
enable_telemetry = kwargs.get("enable_telemetry", True)
309311
TelemetryClientFactory.connection_failure_log(
310312
error_name="Exception",
311313
error_message=str(e),
@@ -316,6 +318,7 @@ def read(self) -> Optional[OAuthToken]:
316318
user_agent=self.session.useragent_header
317319
if hasattr(self, "session")
318320
else None,
321+
enable_telemetry=enable_telemetry,
319322
)
320323
raise e
321324

src/databricks/sql/common/unified_http_client.py

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -217,15 +217,15 @@ def _should_use_proxy(self, target_host: str) -> bool:
217217
logger.debug("Error checking proxy bypass for host %s: %s", target_host, e)
218218
return True
219219

220-
def _get_pool_manager_for_url(self, url: str) -> urllib3.PoolManager:
220+
def _get_pool_manager_for_url(self, url: str) -> Optional[urllib3.PoolManager]:
221221
"""
222222
Get the appropriate pool manager for the given URL.
223223
224224
Args:
225225
url: The target URL
226226
227227
Returns:
228-
PoolManager instance (either direct or proxy)
228+
PoolManager instance (either direct or proxy), or None if client is closed
229229
"""
230230
parsed_url = urllib.parse.urlparse(url)
231231
target_host = parsed_url.hostname
@@ -291,6 +291,12 @@ def request_context(
291291
# Select appropriate pool manager based on target URL
292292
pool_manager = self._get_pool_manager_for_url(url)
293293

294+
# DEFENSIVE: Check if pool_manager is None (client closing/closed)
295+
# This prevents AttributeError race condition when telemetry cleanup happens
296+
if pool_manager is None:
297+
logger.debug("HTTP client closing or closed, cannot make request to %s", url)
298+
raise RequestError("HTTP client is closing or has been closed")
299+
294300
response = None
295301

296302
try:

src/databricks/sql/telemetry/telemetry_client.py

Lines changed: 40 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -42,6 +42,7 @@
4242
from databricks.sql.common.feature_flag import FeatureFlagsContextFactory
4343
from databricks.sql.common.unified_http_client import UnifiedHttpClient
4444
from databricks.sql.common.http import HttpMethod
45+
from databricks.sql.exc import RequestError
4546
from databricks.sql.telemetry.telemetry_push_client import (
4647
ITelemetryPushClient,
4748
TelemetryPushClient,
@@ -295,7 +296,7 @@ def _send_telemetry(self, events):
295296
url,
296297
data=request.to_json(),
297298
headers=headers,
298-
timeout=900,
299+
timeout=30,
299300
)
300301

301302
future.add_done_callback(
@@ -417,10 +418,38 @@ def export_latency_log(
417418
)
418419

419420
def close(self):
420-
"""Flush remaining events before closing"""
421+
"""Flush remaining events before closing
422+
423+
IMPORTANT: This method does NOT close self._http_client.
424+
425+
Rationale:
426+
- _flush() submits async work to the executor that uses _http_client
427+
- If we closed _http_client here, async callbacks would fail with AttributeError
428+
- Instead, we let _http_client live as long as needed:
429+
* Pending futures hold references to self (via bound methods)
430+
* This keeps self alive, which keeps self._http_client alive
431+
* When all futures complete, Python GC will clean up naturally
432+
- The __del__ method ensures eventual cleanup during garbage collection
433+
434+
This design prevents race conditions while keeping telemetry truly async.
435+
"""
421436
logger.debug("Closing TelemetryClient for connection %s", self._session_id_hex)
422437
self._flush()
423438

439+
def __del__(self):
440+
"""Cleanup when TelemetryClient is garbage collected
441+
442+
This ensures _http_client is eventually closed when the TelemetryClient
443+
object is destroyed. By this point, all async work should be complete
444+
(since the futures held references keeping us alive), so it's safe to
445+
close the http client.
446+
"""
447+
try:
448+
if hasattr(self, '_http_client') and self._http_client:
449+
self._http_client.close()
450+
except Exception:
451+
pass
452+
424453

425454
class _TelemetryClientHolder:
426455
"""
@@ -674,7 +703,8 @@ def close(host_url):
674703
)
675704
try:
676705
TelemetryClientFactory._stop_flush_thread()
677-
TelemetryClientFactory._executor.shutdown(wait=True)
706+
# Use wait=False to allow process to exit immediately
707+
TelemetryClientFactory._executor.shutdown(wait=False)
678708
except Exception as e:
679709
logger.debug("Failed to shutdown thread pool executor: %s", e)
680710
TelemetryClientFactory._executor = None
@@ -689,13 +719,19 @@ def connection_failure_log(
689719
port: int,
690720
client_context,
691721
user_agent: Optional[str] = None,
722+
enable_telemetry: bool = True,
692723
):
693724
"""Send error telemetry when connection creation fails, using provided client context"""
694725

726+
# Respect user's telemetry preference - don't force-enable
727+
if not enable_telemetry:
728+
logger.debug("Telemetry disabled, skipping connection failure log")
729+
return
730+
695731
UNAUTH_DUMMY_SESSION_ID = "unauth_session_id"
696732

697733
TelemetryClientFactory.initialize_telemetry_client(
698-
telemetry_enabled=True,
734+
telemetry_enabled=enable_telemetry,
699735
session_id_hex=UNAUTH_DUMMY_SESSION_ID,
700736
auth_provider=None,
701737
host_url=host_url,

0 commit comments

Comments
 (0)