Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
50 changes: 38 additions & 12 deletions netboxlabs/diode/sdk/client.py
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,12 @@
_INGEST_SCOPE = "diode:ingest"
_LOGGER = logging.getLogger(__name__)
_MAX_RETRIES_ENVVAR_NAME = "DIODE_MAX_AUTH_RETRIES"
# server policy (MinTime 10s so client pings must be >= 10s, e.g. 30s interval).
_GRPC_KEEPALIVE_TIME_MS = 30_000
_GRPC_KEEPALIVE_TIMEOUT_MS = 10_000
_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS = 1
# 0 = no cap on keepalive pings without data (matches reconciler Python client).
_GRPC_HTTP2_MAX_PINGS_WITHOUT_DATA = 0


def load_dryrun_entities(file_path: str | Path) -> Iterable[Entity]:
Expand Down Expand Up @@ -138,6 +144,32 @@ def _get_optional_config_value(
return value


def _otlp_grpc_channel_options(primary_user_agent_value: str) -> list[tuple[str, Any]]:
"""
Build gRPC channel argument list for generic OTLP collectors (user-agent only).

Avoid aggressive HTTP/2 keepalive here: many OTLP backends enforce strict ping
limits and may GOAWAY idle exporters when permit-without-stream or unlimited
pings are enabled.
"""
return [
("grpc.primary_user_agent", primary_user_agent_value),
]


def _diode_ingest_grpc_channel_options(primary_user_agent_value: str) -> list[tuple[str, Any]]:
"""Build gRPC channel argument list for the Diode ingester API (with keepalive)."""
return _otlp_grpc_channel_options(primary_user_agent_value) + [
("grpc.keepalive_time_ms", _GRPC_KEEPALIVE_TIME_MS),
("grpc.keepalive_timeout_ms", _GRPC_KEEPALIVE_TIMEOUT_MS),
(
"grpc.keepalive_permit_without_calls",
_GRPC_KEEPALIVE_PERMIT_WITHOUT_CALLS,
),
("grpc.http2.max_pings_without_data", _GRPC_HTTP2_MAX_PINGS_WITHOUT_DATA),
]


def _get_proxy_env_var(var_name: str) -> str | None:
"""Get proxy environment variable (case-insensitive)."""
value = os.getenv(var_name.upper())
Expand Down Expand Up @@ -335,12 +367,9 @@ def __init__(

self._authenticate(_INGEST_SCOPE)

channel_opts = [
(
"grpc.primary_user_agent",
f"{self._name}/{self._version} {self._app_name}/{self._app_version}",
),
]
channel_opts = _diode_ingest_grpc_channel_options(
f"{self._name}/{self._version} {self._app_name}/{self._app_version}"
)

proxy_url = _get_grpc_proxy_url(self._target, self._tls_verify)
if proxy_url:
Expand Down Expand Up @@ -631,12 +660,9 @@ def __init__(
else None
)

channel_opts = [
(
"grpc.primary_user_agent",
f"{self._name}/{self._version} {self._app_name}/{self._app_version}",
),
]
channel_opts = _otlp_grpc_channel_options(
f"{self._name}/{self._version} {self._app_name}/{self._app_version}"
)

proxy_url = _get_grpc_proxy_url(self._target, self._tls_verify)
if proxy_url:
Expand Down
44 changes: 34 additions & 10 deletions tests/test_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@
DiodeMethodClientInterceptor,
DiodeOTLPClient,
_ClientCallDetails,
_diode_ingest_grpc_channel_options,
_DiodeAuthentication,
_get_sentry_dsn,
_load_certs,
_otlp_grpc_channel_options,
load_dryrun_entities,
parse_target,
)
Expand Down Expand Up @@ -268,11 +270,10 @@ def test_insecure_channel_options_with_primary_user_agent(mock_diode_authenticat

mock_insecure_channel.assert_called_once()
_, kwargs = mock_insecure_channel.call_args
assert kwargs["options"] == (
(
"grpc.primary_user_agent",
f"{client.name}/{client.version} {client.app_name}/{client.app_version}",
),
assert kwargs["options"] == tuple(
_diode_ingest_grpc_channel_options(
f"{client.name}/{client.version} {client.app_name}/{client.app_version}"
)
)


Expand All @@ -289,11 +290,10 @@ def test_secure_channel_options_with_primary_user_agent(mock_diode_authenticatio

mock_secure_channel.assert_called_once()
_, kwargs = mock_secure_channel.call_args
assert kwargs["options"] == (
(
"grpc.primary_user_agent",
f"{client.name}/{client.version} {client.app_name}/{client.app_version}",
),
assert kwargs["options"] == tuple(
_diode_ingest_grpc_channel_options(
f"{client.name}/{client.version} {client.app_name}/{client.app_version}"
)
)


Expand Down Expand Up @@ -884,6 +884,30 @@ def test_otlp_client_grpcs_uses_secure_channel():
base_channel.close.assert_called_once()


def test_otlp_insecure_channel_options_exclude_diode_keepalive():
"""OTLP targets arbitrary collectors; only user-agent is forced (Codex/OBS-2873)."""
with (
patch("netboxlabs.diode.sdk.client.grpc.insecure_channel") as mock_insecure,
patch("netboxlabs.diode.sdk.client.logs_service_pb2_grpc.LogsServiceStub"),
):
client = DiodeOTLPClient(
target="grpc://collector:4317",
app_name="orb-producer",
app_version="1.2.3",
)

mock_insecure.assert_called_once()
_, kwargs = mock_insecure.call_args
ua = (
f"{client.name}/{client.version} "
f"{client.app_name}/{client.app_version}"
)
assert kwargs["options"] == tuple(_otlp_grpc_channel_options(ua))
assert all(
opt[0] != "grpc.keepalive_time_ms" for opt in kwargs["options"]
)


def test_diode_authentication_with_custom_certificates():
"""Test _DiodeAuthentication with custom certificates - covers SSL context creation."""
# Create test certificate content
Expand Down
Loading