Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@
## New Features / Improvements

* X feature added (Java/Python) ([#X](https://github.com/apache/beam/issues/X)).
* Python SDK now honors the `disableCounterMetrics`, `disableStringSetMetrics`, and `disableBoundedTrieMetrics` pipeline experiments to opt out of emitting the corresponding user metric kinds, matching Java SDK behavior ([#38746](https://github.com/apache/beam/issues/38746)).

## Breaking Changes

Expand Down
65 changes: 61 additions & 4 deletions sdks/python/apache_beam/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -46,18 +46,60 @@
from apache_beam.metrics.metricbase import Histogram
from apache_beam.metrics.metricbase import MetricName
from apache_beam.metrics.metricbase import StringSet
from apache_beam.options.pipeline_options import DebugOptions

if TYPE_CHECKING:
from apache_beam.internal.metrics.metric import MetricLogger
from apache_beam.metrics.execution import MetricKey
from apache_beam.metrics.metricbase import Metric
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.utils.histogram import BucketType

__all__ = ['Metrics', 'MetricsFilter', 'Lineage']

_LOGGER = logging.getLogger(__name__)


class MetricsFlag(object):
"""Process-wide flags controlling which user metric kinds are emitted.

Mirrors the Java SDK ``Metrics.MetricsFlag`` behavior. The flags are read
once at worker harness initialization from pipeline experiments and apply
for the lifetime of the worker. Exposed as public class attributes so
Delegating* gates can read them with a single attribute load on the hot
path of metric emission.
"""
counter_disabled = False
string_set_disabled = False
bounded_trie_disabled = False
_initialized = False

@classmethod
def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None:
"""Initialize the flags from pipeline experiments. Idempotent."""
if cls._initialized:
return
debug_options = options.view_as(DebugOptions)
if debug_options.lookup_experiment('disableCounterMetrics'):
cls.counter_disabled = True
_LOGGER.info('Counter metrics are disabled.')
if debug_options.lookup_experiment('disableStringSetMetrics'):
cls.string_set_disabled = True
_LOGGER.info('StringSet metrics are disabled.')
if debug_options.lookup_experiment('disableBoundedTrieMetrics'):
cls.bounded_trie_disabled = True
_LOGGER.info('BoundedTrie metrics are disabled.')
cls._initialized = True

@classmethod
def reset(cls) -> None:
"""Reset flags. Test-only."""
cls.counter_disabled = False
cls.string_set_disabled = False
cls.bounded_trie_disabled = False
cls._initialized = False


class Metrics(object):
"""Lets users create/access metric objects during pipeline execution."""
@staticmethod
Expand Down Expand Up @@ -204,12 +246,17 @@ class DelegatingCounter(Counter):
def __init__(
self, metric_name: MetricName, process_wide: bool = False) -> None:
super().__init__(metric_name)
self.inc = MetricUpdater( # type: ignore[method-assign]
self._updater = MetricUpdater(
cells.CounterCell,
metric_name,
default_value=1,
process_wide=process_wide)

def inc(self, n: int = 1) -> None:
if MetricsFlag.counter_disabled:
return
self._updater(n)
Comment on lines +255 to +258
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Access the counter_disabled class attribute directly instead of calling a classmethod getter to avoid method call overhead on the hot path.

Suggested change
def inc(self, n: int = 1) -> None:
if MetricsFlag.counter_disabled():
return
self._updater(n)
def inc(self, n: int = 1) -> None:
if MetricsFlag.counter_disabled:
return
self._updater(n)


class DelegatingDistribution(Distribution):
"""Metrics Distribution Delegates functionality to MetricsEnvironment."""
def __init__(
Expand All @@ -231,13 +278,23 @@ class DelegatingStringSet(StringSet):
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
self.add = MetricUpdater(cells.StringSetCell, metric_name) # type: ignore[method-assign]
self._updater = MetricUpdater(cells.StringSetCell, metric_name)

def add(self, value: str) -> None:
if MetricsFlag.string_set_disabled:
return
self._updater(value)

class DelegatingBoundedTrie(BoundedTrie):
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""
"""Metrics BoundedTrie that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign]
self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name)

def add(self, value) -> None:
if MetricsFlag.bounded_trie_disabled:
return
self._updater(value)
Comment on lines +283 to +297
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Access the string_set_disabled and bounded_trie_disabled class attributes directly instead of calling classmethod getters to avoid method call overhead on the hot path.

Suggested change
def add(self, value: str) -> None:
if MetricsFlag.string_set_disabled():
return
self._updater(value)
class DelegatingBoundedTrie(BoundedTrie):
"""Metrics StringSet that Delegates functionality to MetricsEnvironment."""
"""Metrics BoundedTrie that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign]
self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name)
def add(self, value) -> None:
if MetricsFlag.bounded_trie_disabled():
return
self._updater(value)
def add(self, value: str) -> None:
if MetricsFlag.string_set_disabled:
return
self._updater(value)
class DelegatingBoundedTrie(BoundedTrie):
"""Metrics BoundedTrie that Delegates functionality to MetricsEnvironment."""
def __init__(self, metric_name: MetricName) -> None:
super().__init__(metric_name)
self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name)
def add(self, value) -> None:
if MetricsFlag.bounded_trie_disabled:
return
self._updater(value)



class MetricResults(object):
Expand Down
74 changes: 74 additions & 0 deletions sdks/python/apache_beam/metrics/metric_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,9 @@
from apache_beam.metrics.metric import MetricResults
from apache_beam.metrics.metric import Metrics
from apache_beam.metrics.metric import MetricsFilter
from apache_beam.metrics.metric import MetricsFlag
from apache_beam.metrics.metricbase import MetricName
from apache_beam.options.pipeline_options import PipelineOptions
from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner
from apache_beam.runners.worker import statesampler
from apache_beam.testing.metric_result_matchers import DistributionMatcher
Expand Down Expand Up @@ -121,6 +123,78 @@ def test_get_namespace_error(self):
with self.assertRaises(ValueError):
Metrics.get_namespace(object())

def test_metrics_flag(self):
"""Mirrors Java MetricsTest.testMetricsFlag for the three disable* experiments."""
MetricsFlag.reset()
self.assertFalse(MetricsFlag.counter_disabled)
self.assertFalse(MetricsFlag.string_set_disabled)
self.assertFalse(MetricsFlag.bounded_trie_disabled)

options = PipelineOptions(['--experiments=disableCounterMetrics'])
MetricsFlag.set_default_pipeline_options(options)
self.assertTrue(MetricsFlag.counter_disabled)
self.assertFalse(MetricsFlag.string_set_disabled)
self.assertFalse(MetricsFlag.bounded_trie_disabled)

MetricsFlag.reset()
options = PipelineOptions(['--experiments=disableStringSetMetrics'])
MetricsFlag.set_default_pipeline_options(options)
self.assertFalse(MetricsFlag.counter_disabled)
self.assertTrue(MetricsFlag.string_set_disabled)
self.assertFalse(MetricsFlag.bounded_trie_disabled)

MetricsFlag.reset()
options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
MetricsFlag.set_default_pipeline_options(options)
self.assertFalse(MetricsFlag.counter_disabled)
self.assertFalse(MetricsFlag.string_set_disabled)
self.assertTrue(MetricsFlag.bounded_trie_disabled)

MetricsFlag.reset()
options = PipelineOptions([
'--experiments=disableCounterMetrics',
'--experiments=disableStringSetMetrics',
'--experiments=disableBoundedTrieMetrics',
])
MetricsFlag.set_default_pipeline_options(options)
self.assertTrue(MetricsFlag.counter_disabled)
self.assertTrue(MetricsFlag.string_set_disabled)
self.assertTrue(MetricsFlag.bounded_trie_disabled)

MetricsFlag.reset()
Comment on lines +126 to +164
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

medium

Update the tests to assert on the public class attributes directly instead of calling the removed getter methods.

  def test_metrics_flag(self):
    """Mirrors Java MetricsTest.testMetricsFlag for the three disable* experiments."""
    MetricsFlag.reset()
    self.assertFalse(MetricsFlag.counter_disabled)
    self.assertFalse(MetricsFlag.string_set_disabled)
    self.assertFalse(MetricsFlag.bounded_trie_disabled)

    options = PipelineOptions(['--experiments=disableCounterMetrics'])
    MetricsFlag.set_default_pipeline_options(options)
    self.assertTrue(MetricsFlag.counter_disabled)
    self.assertFalse(MetricsFlag.string_set_disabled)
    self.assertFalse(MetricsFlag.bounded_trie_disabled)

    MetricsFlag.reset()
    options = PipelineOptions(['--experiments=disableStringSetMetrics'])
    MetricsFlag.set_default_pipeline_options(options)
    self.assertFalse(MetricsFlag.counter_disabled)
    self.assertTrue(MetricsFlag.string_set_disabled)
    self.assertFalse(MetricsFlag.bounded_trie_disabled)

    MetricsFlag.reset()
    options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
    MetricsFlag.set_default_pipeline_options(options)
    self.assertFalse(MetricsFlag.counter_disabled)
    self.assertFalse(MetricsFlag.string_set_disabled)
    self.assertTrue(MetricsFlag.bounded_trie_disabled)

    MetricsFlag.reset()
    options = PipelineOptions([
        '--experiments=disableCounterMetrics',
        '--experiments=disableStringSetMetrics',
        '--experiments=disableBoundedTrieMetrics',
    ])
    MetricsFlag.set_default_pipeline_options(options)
    self.assertTrue(MetricsFlag.counter_disabled)
    self.assertTrue(MetricsFlag.string_set_disabled)
    self.assertTrue(MetricsFlag.bounded_trie_disabled)

    MetricsFlag.reset()


def test_disabled_counter_is_noop(self):
MetricsFlag.reset()
options = PipelineOptions(['--experiments=disableCounterMetrics'])
MetricsFlag.set_default_pipeline_options(options)
try:
counter = Metrics.counter('ns', 'disabled_counter')
counter.inc()
counter.inc(5)
counter.dec()
finally:
MetricsFlag.reset()

def test_disabled_string_set_is_noop(self):
MetricsFlag.reset()
options = PipelineOptions(['--experiments=disableStringSetMetrics'])
MetricsFlag.set_default_pipeline_options(options)
try:
string_set = Metrics.string_set('ns', 'disabled_set')
string_set.add('value')
finally:
MetricsFlag.reset()

def test_disabled_bounded_trie_is_noop(self):
MetricsFlag.reset()
options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
MetricsFlag.set_default_pipeline_options(options)
try:
bounded_trie = Metrics.bounded_trie('ns', 'disabled_trie')
bounded_trie.add(['a', 'b'])
finally:
MetricsFlag.reset()

def test_counter_empty_name(self):
with self.assertRaises(ValueError):
Metrics.counter("namespace", "")
Expand Down
2 changes: 2 additions & 0 deletions sdks/python/apache_beam/runners/worker/sdk_worker_main.py
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@

from apache_beam.internal import pickler
from apache_beam.io import filesystems
from apache_beam.metrics import metric
from apache_beam.options.pipeline_options import DebugOptions
from apache_beam.options.pipeline_options import GoogleCloudOptions
from apache_beam.options.pipeline_options import PipelineOptions
Expand Down Expand Up @@ -123,6 +124,7 @@ def create_harness(environment, dry_run=False):
RuntimeValueProvider.set_runtime_options(pipeline_options_dict)
sdk_pipeline_options = PipelineOptions.from_dictionary(pipeline_options_dict)
filesystems.FileSystems.set_options(sdk_pipeline_options)
metric.MetricsFlag.set_default_pipeline_options(sdk_pipeline_options)
pickle_library = sdk_pipeline_options.view_as(SetupOptions).pickle_library
pickler.set_library(pickle_library)

Expand Down
Loading