Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
74 changes: 37 additions & 37 deletions drift/instrumentation/urllib3/instrumentation.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@
import base64
import json
import logging
from contextvars import ContextVar
from functools import partial
from typing import Any
from urllib.parse import parse_qs, urlencode, urlparse

Expand Down Expand Up @@ -55,6 +57,10 @@ def __init__(self, message: str, method: str, url: str):
# When these are active, urllib3 should skip creating duplicate spans
HIGHER_LEVEL_HTTP_INSTRUMENTATIONS = {"RequestsInstrumentation"}

# Set to True inside the PoolManager patch so that the HTTPConnectionPool patch
# can detect the call originated from PoolManager and skip duplicate span creation.
_inside_poolmanager = ContextVar("_inside_poolmanager", default=False)

HEADER_SCHEMA_MERGES = {
"headers": SchemaMerge(match_importance=0.0),
}
Expand Down Expand Up @@ -167,6 +173,9 @@ def patched_urlopen(
# Set calling_library_context to suppress socket instrumentation warnings
# for internal socket calls made by urllib3
context_token = calling_library_context.set("urllib3")
# Signal to the HTTPConnectionPool patch that this call originated
# from PoolManager so it should skip creating a duplicate span.
pm_token = _inside_poolmanager.set(True)
try:

def original_call():
Expand All @@ -189,6 +198,7 @@ def original_call():
span_kind=OTelSpanKind.CLIENT,
)
finally:
_inside_poolmanager.reset(pm_token)
calling_library_context.reset(context_token)

module.PoolManager.urlopen = patched_urlopen
Expand Down Expand Up @@ -231,46 +241,36 @@ def patched_urlopen(
port_str = f":{port}" if port and port not in (80, 443) else ""
full_url = f"{scheme}://{host}{port_str}{url}"

# Pass through if SDK is disabled
_passthrough = partial(
original_urlopen,
pool_self,
method,
url,
body=body,
headers=headers,
retries=retries,
redirect=redirect,
assert_same_host=assert_same_host,
timeout=timeout,
pool_timeout=pool_timeout,
release_conn=release_conn,
chunked=chunked,
body_pos=body_pos,
preload_content=preload_content,
decode_content=decode_content,
**response_kw,
)

if sdk.mode == TuskDriftMode.DISABLED:
return original_urlopen(
pool_self,
method,
url,
body=body,
headers=headers,
retries=retries,
redirect=redirect,
assert_same_host=assert_same_host,
timeout=timeout,
pool_timeout=pool_timeout,
release_conn=release_conn,
chunked=chunked,
body_pos=body_pos,
preload_content=preload_content,
decode_content=decode_content,
**response_kw,
)
return _passthrough()

# PoolManager.urlopen already created the span for this request;
# skip to avoid a duplicate child span.
if _inside_poolmanager.get():
return _passthrough()

if instrumentation_self._is_already_instrumented_by_higher_level():
return original_urlopen(
pool_self,
method,
url,
body=body,
headers=headers,
retries=retries,
redirect=redirect,
assert_same_host=assert_same_host,
timeout=timeout,
pool_timeout=pool_timeout,
release_conn=release_conn,
chunked=chunked,
body_pos=body_pos,
preload_content=preload_content,
decode_content=decode_content,
**response_kw,
)
return _passthrough()

# Set calling_library_context to suppress socket instrumentation warnings
# for internal socket calls made by urllib3
Expand Down
Loading