-
Notifications
You must be signed in to change notification settings - Fork 4.6k
[Python] Honor disableCounterMetrics, disableStringSetMetrics, and disableBoundedTrieMetrics experiments #38749
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
1163c11
93c99a9
f860d86
222c8d6
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -46,18 +46,60 @@ | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from apache_beam.metrics.metricbase import Histogram | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from apache_beam.metrics.metricbase import MetricName | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from apache_beam.metrics.metricbase import StringSet | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from apache_beam.options.pipeline_options import DebugOptions | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if TYPE_CHECKING: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from apache_beam.internal.metrics.metric import MetricLogger | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from apache_beam.metrics.execution import MetricKey | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from apache_beam.metrics.metricbase import Metric | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from apache_beam.options.pipeline_options import PipelineOptions | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| from apache_beam.utils.histogram import BucketType | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| __all__ = ['Metrics', 'MetricsFilter', 'Lineage'] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _LOGGER = logging.getLogger(__name__) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class MetricsFlag(object): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Process-wide flags controlling which user metric kinds are emitted. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Mirrors the Java SDK ``Metrics.MetricsFlag`` behavior. The flags are read | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| once at worker harness initialization from pipeline experiments and apply | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| for the lifetime of the worker. Exposed as public class attributes so | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Delegating* gates can read them with a single attribute load on the hot | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| path of metric emission. | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """ | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| counter_disabled = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| string_set_disabled = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| bounded_trie_disabled = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _initialized = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @classmethod | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def set_default_pipeline_options(cls, options: 'PipelineOptions') -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Initialize the flags from pipeline experiments. Idempotent.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if cls._initialized: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| debug_options = options.view_as(DebugOptions) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if debug_options.lookup_experiment('disableCounterMetrics'): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cls.counter_disabled = True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _LOGGER.info('Counter metrics are disabled.') | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if debug_options.lookup_experiment('disableStringSetMetrics'): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cls.string_set_disabled = True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _LOGGER.info('StringSet metrics are disabled.') | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if debug_options.lookup_experiment('disableBoundedTrieMetrics'): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cls.bounded_trie_disabled = True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| _LOGGER.info('BoundedTrie metrics are disabled.') | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cls._initialized = True | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @classmethod | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def reset(cls) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Reset flags. Test-only.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cls.counter_disabled = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cls.string_set_disabled = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cls.bounded_trie_disabled = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cls._initialized = False | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class Metrics(object): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Lets users create/access metric objects during pipeline execution.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| @staticmethod | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -204,12 +246,17 @@ class DelegatingCounter(Counter): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def __init__( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self, metric_name: MetricName, process_wide: bool = False) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| super().__init__(metric_name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.inc = MetricUpdater( # type: ignore[method-assign] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._updater = MetricUpdater( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| cells.CounterCell, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| metric_name, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| default_value=1, | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| process_wide=process_wide) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def inc(self, n: int = 1) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if MetricsFlag.counter_disabled: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._updater(n) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class DelegatingDistribution(Distribution): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Metrics Distribution Delegates functionality to MetricsEnvironment.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def __init__( | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
@@ -231,13 +278,23 @@ class DelegatingStringSet(StringSet): | |||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def __init__(self, metric_name: MetricName) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| super().__init__(metric_name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.add = MetricUpdater(cells.StringSetCell, metric_name) # type: ignore[method-assign] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._updater = MetricUpdater(cells.StringSetCell, metric_name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def add(self, value: str) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if MetricsFlag.string_set_disabled: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._updater(value) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class DelegatingBoundedTrie(BoundedTrie): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Metrics StringSet that Delegates functionality to MetricsEnvironment.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| """Metrics BoundedTrie that Delegates functionality to MetricsEnvironment.""" | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def __init__(self, metric_name: MetricName) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| super().__init__(metric_name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self.add = MetricUpdater(cells.BoundedTrieCell, metric_name) # type: ignore[method-assign] | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._updater = MetricUpdater(cells.BoundedTrieCell, metric_name) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| def add(self, value) -> None: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| if MetricsFlag.bounded_trie_disabled: | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| return | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| self._updater(value) | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+283
to
+297
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Access the
Suggested change
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| class MetricResults(object): | ||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -32,7 +32,9 @@ | |
| from apache_beam.metrics.metric import MetricResults | ||
| from apache_beam.metrics.metric import Metrics | ||
| from apache_beam.metrics.metric import MetricsFilter | ||
| from apache_beam.metrics.metric import MetricsFlag | ||
| from apache_beam.metrics.metricbase import MetricName | ||
| from apache_beam.options.pipeline_options import PipelineOptions | ||
| from apache_beam.runners.direct.direct_runner import BundleBasedDirectRunner | ||
| from apache_beam.runners.worker import statesampler | ||
| from apache_beam.testing.metric_result_matchers import DistributionMatcher | ||
|
|
@@ -121,6 +123,78 @@ def test_get_namespace_error(self): | |
| with self.assertRaises(ValueError): | ||
| Metrics.get_namespace(object()) | ||
|
|
||
| def test_metrics_flag(self): | ||
| """Mirrors Java MetricsTest.testMetricsFlag for the three disable* experiments.""" | ||
| MetricsFlag.reset() | ||
| self.assertFalse(MetricsFlag.counter_disabled) | ||
| self.assertFalse(MetricsFlag.string_set_disabled) | ||
| self.assertFalse(MetricsFlag.bounded_trie_disabled) | ||
|
|
||
| options = PipelineOptions(['--experiments=disableCounterMetrics']) | ||
| MetricsFlag.set_default_pipeline_options(options) | ||
| self.assertTrue(MetricsFlag.counter_disabled) | ||
| self.assertFalse(MetricsFlag.string_set_disabled) | ||
| self.assertFalse(MetricsFlag.bounded_trie_disabled) | ||
|
|
||
| MetricsFlag.reset() | ||
| options = PipelineOptions(['--experiments=disableStringSetMetrics']) | ||
| MetricsFlag.set_default_pipeline_options(options) | ||
| self.assertFalse(MetricsFlag.counter_disabled) | ||
| self.assertTrue(MetricsFlag.string_set_disabled) | ||
| self.assertFalse(MetricsFlag.bounded_trie_disabled) | ||
|
|
||
| MetricsFlag.reset() | ||
| options = PipelineOptions(['--experiments=disableBoundedTrieMetrics']) | ||
| MetricsFlag.set_default_pipeline_options(options) | ||
| self.assertFalse(MetricsFlag.counter_disabled) | ||
| self.assertFalse(MetricsFlag.string_set_disabled) | ||
| self.assertTrue(MetricsFlag.bounded_trie_disabled) | ||
|
|
||
| MetricsFlag.reset() | ||
| options = PipelineOptions([ | ||
| '--experiments=disableCounterMetrics', | ||
| '--experiments=disableStringSetMetrics', | ||
| '--experiments=disableBoundedTrieMetrics', | ||
| ]) | ||
| MetricsFlag.set_default_pipeline_options(options) | ||
| self.assertTrue(MetricsFlag.counter_disabled) | ||
| self.assertTrue(MetricsFlag.string_set_disabled) | ||
| self.assertTrue(MetricsFlag.bounded_trie_disabled) | ||
|
|
||
| MetricsFlag.reset() | ||
|
Comment on lines
+126
to
+164
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Update the tests to assert on the public class attributes directly instead of calling the removed getter methods. def test_metrics_flag(self):
"""Mirrors Java MetricsTest.testMetricsFlag for the three disable* experiments."""
MetricsFlag.reset()
self.assertFalse(MetricsFlag.counter_disabled)
self.assertFalse(MetricsFlag.string_set_disabled)
self.assertFalse(MetricsFlag.bounded_trie_disabled)
options = PipelineOptions(['--experiments=disableCounterMetrics'])
MetricsFlag.set_default_pipeline_options(options)
self.assertTrue(MetricsFlag.counter_disabled)
self.assertFalse(MetricsFlag.string_set_disabled)
self.assertFalse(MetricsFlag.bounded_trie_disabled)
MetricsFlag.reset()
options = PipelineOptions(['--experiments=disableStringSetMetrics'])
MetricsFlag.set_default_pipeline_options(options)
self.assertFalse(MetricsFlag.counter_disabled)
self.assertTrue(MetricsFlag.string_set_disabled)
self.assertFalse(MetricsFlag.bounded_trie_disabled)
MetricsFlag.reset()
options = PipelineOptions(['--experiments=disableBoundedTrieMetrics'])
MetricsFlag.set_default_pipeline_options(options)
self.assertFalse(MetricsFlag.counter_disabled)
self.assertFalse(MetricsFlag.string_set_disabled)
self.assertTrue(MetricsFlag.bounded_trie_disabled)
MetricsFlag.reset()
options = PipelineOptions([
'--experiments=disableCounterMetrics',
'--experiments=disableStringSetMetrics',
'--experiments=disableBoundedTrieMetrics',
])
MetricsFlag.set_default_pipeline_options(options)
self.assertTrue(MetricsFlag.counter_disabled)
self.assertTrue(MetricsFlag.string_set_disabled)
self.assertTrue(MetricsFlag.bounded_trie_disabled)
MetricsFlag.reset() |
||
|
|
||
| def test_disabled_counter_is_noop(self): | ||
| MetricsFlag.reset() | ||
| options = PipelineOptions(['--experiments=disableCounterMetrics']) | ||
| MetricsFlag.set_default_pipeline_options(options) | ||
| try: | ||
| counter = Metrics.counter('ns', 'disabled_counter') | ||
| counter.inc() | ||
| counter.inc(5) | ||
| counter.dec() | ||
| finally: | ||
| MetricsFlag.reset() | ||
|
|
||
| def test_disabled_string_set_is_noop(self): | ||
| MetricsFlag.reset() | ||
| options = PipelineOptions(['--experiments=disableStringSetMetrics']) | ||
| MetricsFlag.set_default_pipeline_options(options) | ||
| try: | ||
| string_set = Metrics.string_set('ns', 'disabled_set') | ||
| string_set.add('value') | ||
| finally: | ||
| MetricsFlag.reset() | ||
|
|
||
| def test_disabled_bounded_trie_is_noop(self): | ||
| MetricsFlag.reset() | ||
| options = PipelineOptions(['--experiments=disableBoundedTrieMetrics']) | ||
| MetricsFlag.set_default_pipeline_options(options) | ||
| try: | ||
| bounded_trie = Metrics.bounded_trie('ns', 'disabled_trie') | ||
| bounded_trie.add(['a', 'b']) | ||
| finally: | ||
| MetricsFlag.reset() | ||
|
|
||
| def test_counter_empty_name(self): | ||
| with self.assertRaises(ValueError): | ||
| Metrics.counter("namespace", "") | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Access the
counter_disabledclass attribute directly instead of calling a classmethod getter to avoid method call overhead on the hot path.