Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 7 additions & 1 deletion airbyte_cdk/metrics/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,15 @@
import time
from typing import Any, Optional

from airbyte_cdk.metrics.memory import MemoryInfo, get_memory_info
from airbyte_cdk.metrics.memory import MemoryInfo, get_memory_info, get_python_heap_bytes

logger = logging.getLogger(__name__)

# Metric names
METRIC_MEMORY_USAGE_BYTES = "cdk.memory.usage_bytes"
METRIC_MEMORY_LIMIT_BYTES = "cdk.memory.limit_bytes"
METRIC_MEMORY_USAGE_PERCENT = "cdk.memory.usage_percent"
METRIC_MEMORY_PYTHON_HEAP_BYTES = "cdk.memory.python_heap_bytes"

# Default emission interval in seconds
DEFAULT_EMISSION_INTERVAL_SECONDS = 30.0
Expand Down Expand Up @@ -139,6 +140,7 @@ def emit_memory_metrics(self) -> None:
- cdk.memory.usage_bytes: Current container memory usage
- cdk.memory.limit_bytes: Container memory limit (if known)
- cdk.memory.usage_percent: Usage/limit ratio (if limit is known)
- cdk.memory.python_heap_bytes: Python heap via tracemalloc (only if CDK_TRACEMALLOC_ENABLED is set)
"""
if not self.enabled:
return
Expand All @@ -154,6 +156,10 @@ def emit_memory_metrics(self) -> None:
if info.usage_percent is not None:
self.gauge(METRIC_MEMORY_USAGE_PERCENT, info.usage_percent)

python_heap = get_python_heap_bytes()
if python_heap is not None:
self.gauge(METRIC_MEMORY_PYTHON_HEAP_BYTES, float(python_heap))

except Exception:
# Never let metric collection failures affect the sync
logger.debug("Failed to collect memory metrics", exc_info=True)
Expand Down
37 changes: 37 additions & 0 deletions airbyte_cdk/metrics/memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,14 +10,21 @@
"""

import logging
import os
import resource
import sys
import tracemalloc
from dataclasses import dataclass
from pathlib import Path
from typing import Optional

logger = logging.getLogger(__name__)

# Environment variable to opt in to tracemalloc-based Python heap metrics.
# tracemalloc.start() hooks into CPython's allocator and has ~10-30% runtime overhead,
# so it must not be enabled by default in production.
ENV_CDK_TRACEMALLOC_ENABLED = "CDK_TRACEMALLOC_ENABLED"

# cgroup v2 file paths (standard in modern K8s pods)
CGROUP_V2_MEMORY_CURRENT = Path("/sys/fs/cgroup/memory.current")
CGROUP_V2_MEMORY_MAX = Path("/sys/fs/cgroup/memory.max")
Expand Down Expand Up @@ -153,3 +160,33 @@ def get_memory_info() -> MemoryInfo:

# Fallback to rusage
return _read_rusage_memory()


def _is_tracemalloc_enabled() -> bool:
    """Report whether tracemalloc-based heap metrics have been opted into.

    The opt-in is read from the environment variable named by
    ``ENV_CDK_TRACEMALLOC_ENABLED``. It counts as enabled only when the
    value, compared case-insensitively, is ``1``, ``true`` or ``yes``;
    an unset or empty variable counts as disabled.
    """
    raw_value = os.environ.get(ENV_CDK_TRACEMALLOC_ENABLED, "")
    normalized = raw_value.lower()
    return normalized in ("1", "true", "yes")


def get_python_heap_bytes() -> Optional[int]:
    """Return the Python heap size in bytes as traced by tracemalloc.

    tracemalloc hooks into CPython's allocator and has ~10-30% runtime overhead.
    It is only activated when the ``CDK_TRACEMALLOC_ENABLED`` env var is set to a
    truthy value (``1``, ``true``, or ``yes``).

    If tracing is not already active, the first enabled call starts it. Tracing
    is never stopped by this function.

    Returns:
        The current size of memory blocks traced by tracemalloc, or ``None``
        when the opt-in is not set or tracemalloc could not be started.
    """
    if not _is_tracemalloc_enabled():
        return None

    if not tracemalloc.is_tracing():
        try:
            tracemalloc.start()
        except RuntimeError:
            logger.debug("tracemalloc failed to start", exc_info=True)
            return None
        else:
            # This call passes no formatting args, so logging emits the message
            # verbatim; the percent sign must therefore NOT be doubled.
            logger.info(
                "tracemalloc started (CDK_TRACEMALLOC_ENABLED is set). "
                "Expect ~10-30% runtime overhead."
            )

    # get_traced_memory() returns (current, peak); only the current size is reported.
    current, _ = tracemalloc.get_traced_memory()
    return current
66 changes: 66 additions & 0 deletions unit_tests/metrics/test_memory.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,17 +4,20 @@

"""Tests for airbyte_cdk.metrics.memory module."""

import tracemalloc
from pathlib import Path
from unittest.mock import patch

import pytest

from airbyte_cdk.metrics.memory import (
ENV_CDK_TRACEMALLOC_ENABLED,
MemoryInfo,
_read_cgroup_v1_memory,
_read_cgroup_v2_memory,
_read_rusage_memory,
get_memory_info,
get_python_heap_bytes,
)


Expand Down Expand Up @@ -176,3 +179,66 @@ def test_falls_back_to_rusage(self) -> None:

assert info.usage_bytes > 0
assert info.limit_bytes is None


class TestGetPythonHeapBytes:
    """Tests for the opt-in tracemalloc-based heap metric."""

    @staticmethod
    def _assert_heap_reported_for(env_value: str) -> None:
        """Assert that ``env_value`` enables the metric and yields a byte count.

        Sets ``CDK_TRACEMALLOC_ENABLED`` to ``env_value`` for the duration of the
        call, checks that a non-negative ``int`` is returned, and afterwards stops
        tracemalloc again unless it was already tracing before the call, so the
        tracing started here does not leak into other tests.
        """
        was_tracing = tracemalloc.is_tracing()
        try:
            with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: env_value}):
                result = get_python_heap_bytes()
            assert result is not None
            assert isinstance(result, int)
            assert result >= 0
        finally:
            # Only undo tracing that this helper itself caused to start.
            if not was_tracing:
                tracemalloc.stop()

    def test_returns_none_when_env_var_not_set(self) -> None:
        with patch.dict("os.environ", {}, clear=True):
            result = get_python_heap_bytes()
        assert result is None

    def test_returns_none_when_env_var_is_empty(self) -> None:
        with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: ""}):
            result = get_python_heap_bytes()
        assert result is None

    def test_returns_none_when_env_var_is_false(self) -> None:
        with patch.dict("os.environ", {ENV_CDK_TRACEMALLOC_ENABLED: "false"}):
            result = get_python_heap_bytes()
        assert result is None

    def test_returns_bytes_when_enabled_with_true(self) -> None:
        self._assert_heap_reported_for("true")

    def test_returns_bytes_when_enabled_with_1(self) -> None:
        self._assert_heap_reported_for("1")

    def test_returns_bytes_when_enabled_with_yes(self) -> None:
        self._assert_heap_reported_for("yes")

    def test_case_insensitive(self) -> None:
        self._assert_heap_reported_for("TRUE")
26 changes: 26 additions & 0 deletions unit_tests/metrics/test_metrics_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -150,6 +150,32 @@ def test_skips_limit_when_unknown(self) -> None:
assert "cdk.memory.limit_bytes" not in metric_names
assert "cdk.memory.usage_percent" not in metric_names

def test_emits_python_heap_when_tracemalloc_enabled(self) -> None:
    """A heap size returned by ``get_python_heap_bytes`` is emitted as a gauge.

    Both memory helpers are patched, so the test only checks that the value
    reaches the ``cdk.memory.python_heap_bytes`` gauge.
    """
    client, mock_instance = _make_enabled_client()
    memory_info = MemoryInfo(usage_bytes=100_000_000, limit_bytes=200_000_000)

    with (
        patch("airbyte_cdk.metrics.get_memory_info", return_value=memory_info),
        patch("airbyte_cdk.metrics.get_python_heap_bytes", return_value=5_000_000),
    ):
        client.emit_memory_metrics()

    # Map each emitted metric name to the value it was emitted with.
    emitted = {}
    for recorded in mock_instance.gauge.call_args_list:
        positional = recorded.args
        emitted[positional[0]] = positional[1]

    assert emitted["cdk.memory.python_heap_bytes"] == 5_000_000.0

def test_skips_python_heap_when_tracemalloc_disabled(self) -> None:
    """No heap gauge is emitted when ``get_python_heap_bytes`` returns ``None``."""
    client, mock_instance = _make_enabled_client()
    memory_info = MemoryInfo(usage_bytes=100_000_000, limit_bytes=200_000_000)

    with (
        patch("airbyte_cdk.metrics.get_memory_info", return_value=memory_info),
        patch("airbyte_cdk.metrics.get_python_heap_bytes", return_value=None),
    ):
        client.emit_memory_metrics()

    # Collect the name (first positional argument) of every gauge call made.
    emitted_names = []
    for recorded in mock_instance.gauge.call_args_list:
        emitted_names.append(recorded.args[0])

    assert "cdk.memory.python_heap_bytes" not in emitted_names

def test_noop_when_disabled(self) -> None:
client = MetricsClient()
# Should not raise
Expand Down