Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -432,11 +432,12 @@ def __init__(
)
),
resource=resource,
metric_readers=metric_readers,
views=views,
)
self._metric_readers = metric_readers
self._measurement_consumer = SynchronousMeasurementConsumer(
sdk_config=self._sdk_config
sdk_config=self._sdk_config,
metric_readers=metric_readers,
)
disabled = environ.get(OTEL_SDK_DISABLED, "")
self._disabled = disabled.lower().strip() == "true"
Expand All @@ -448,7 +449,7 @@ def __init__(
self._shutdown_once = Once()
self._shutdown = False

for metric_reader in self._sdk_config.metric_readers:
for metric_reader in self._metric_readers:
with self._all_metric_readers_lock:
if metric_reader in self._all_metric_readers:
# pylint: disable=broad-exception-raised
Expand All @@ -468,7 +469,7 @@ def force_flush(self, timeout_millis: float = 10_000) -> bool:

metric_reader_error = {}

for metric_reader in self._sdk_config.metric_readers:
for metric_reader in self._metric_readers:
current_ts = time_ns()
try:
if current_ts >= deadline_ns:
Expand Down Expand Up @@ -513,7 +514,7 @@ def _shutdown():

metric_reader_error = {}

for metric_reader in self._sdk_config.metric_readers:
for metric_reader in self._metric_readers:
current_ts = time_ns()
try:
if current_ts >= deadline_ns:
Expand Down Expand Up @@ -580,3 +581,31 @@ def get_meter(
self._measurement_consumer,
)
return self._meters[info]

def add_metric_reader(
self, metric_reader: "opentelemetry.sdk.metrics.export.MetricReader"
) -> None:
with self._all_metric_readers_lock:
if metric_reader in self._all_metric_readers:
raise ValueError(
f"MetricReader {metric_reader} has been registered already!"
)
self._measurement_consumer.add_metric_reader(metric_reader)
metric_reader._set_collect_callback(
self._measurement_consumer.collect
)
self._all_metric_readers.add(metric_reader)

def remove_metric_reader(
self,
metric_reader: "opentelemetry.sdk.metrics.export.MetricReader",
) -> None:
with self._all_metric_readers_lock:
if metric_reader not in self._all_metric_readers:
raise ValueError(
f"MetricReader {metric_reader} has not been registered!"
)
self._measurement_consumer.remove_metric_reader(metric_reader)
metric_reader._set_collect_callback(None)
metric_reader.shutdown()
self._all_metric_readers.remove(metric_reader)
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,12 @@ class SynchronousMeasurementConsumer(MeasurementConsumer):
def __init__(
self,
sdk_config: "opentelemetry.sdk.metrics._internal.SdkConfiguration",
metric_readers: Iterable["opentelemetry.sdk.metrics.MetricReader"],
) -> None:
self._lock = Lock()
self._sdk_config = sdk_config
# should never be mutated
self._metric_readers = tuple(metric_readers)
self._reader_storages: Mapping[
"opentelemetry.sdk.metrics.MetricReader", MetricReaderStorage
] = {
Expand All @@ -71,7 +73,7 @@ def __init__(
reader._instrument_class_temporality,
reader._instrument_class_aggregation,
)
for reader in sdk_config.metric_readers
for reader in self._metric_readers
}
self._async_instruments: List[
"opentelemetry.sdk.metrics._internal.instrument._Asynchronous"
Expand Down Expand Up @@ -143,3 +145,27 @@ def collect(
result = self._reader_storages[metric_reader].collect()

return result

def add_metric_reader(
self, metric_reader: "opentelemetry.sdk.metrics.MetricReader"
) -> None:
"""Registers a new metric reader."""
with self._lock:
self._metric_readers += (metric_reader,)
self._reader_storages[metric_reader] = MetricReaderStorage(
self._sdk_config,
metric_reader._instrument_class_temporality,
metric_reader._instrument_class_aggregation,
)

def remove_metric_reader(
self, metric_reader: "opentelemetry.sdk.metrics.MetricReader"
) -> None:
"""Unregisters the given metric reader."""
with self._lock:
self._reader_storages.pop(metric_reader)
self._metric_readers = tuple(
reader
for reader in self._metric_readers
if reader is not metric_reader
)
Original file line number Diff line number Diff line change
Expand Up @@ -26,5 +26,4 @@
class SdkConfiguration:
exemplar_filter: "opentelemetry.sdk.metrics.ExemplarFilter"
resource: "opentelemetry.sdk.resources.Resource"
metric_readers: Sequence["opentelemetry.sdk.metrics.MetricReader"]
views: Sequence["opentelemetry.sdk.metrics.View"]
27 changes: 14 additions & 13 deletions opentelemetry-sdk/tests/metrics/test_measurement_consumer.py
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,8 @@
class TestSynchronousMeasurementConsumer(TestCase):
def test_parent(self, _):
self.assertIsInstance(
SynchronousMeasurementConsumer(MagicMock()), MeasurementConsumer
SynchronousMeasurementConsumer(MagicMock(), metric_readers=()),
MeasurementConsumer,
)

def test_creates_metric_reader_storages(self, MockMetricReaderStorage):
Expand All @@ -44,9 +45,9 @@ def test_creates_metric_reader_storages(self, MockMetricReaderStorage):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=reader_mocks,
views=Mock(),
)
),
metric_readers=reader_mocks,
)
self.assertEqual(len(MockMetricReaderStorage.mock_calls), 5)

Expand All @@ -61,9 +62,9 @@ def test_measurements_passed_to_each_reader_storage(
SdkConfiguration(
exemplar_filter=Mock(should_sample=Mock(return_value=False)),
resource=Mock(),
metric_readers=reader_mocks,
views=Mock(),
)
),
metric_readers=reader_mocks,
)
measurement_mock = Mock()
consumer.consume_measurement(measurement_mock)
Expand All @@ -83,9 +84,9 @@ def test_collect_passed_to_reader_stage(self, MockMetricReaderStorage):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=reader_mocks,
views=Mock(),
)
),
metric_readers=reader_mocks,
)
for r_mock, rs_mock in zip(reader_mocks, reader_storage_mocks):
rs_mock.collect.assert_not_called()
Expand All @@ -102,9 +103,9 @@ def test_collect_calls_async_instruments(self, MockMetricReaderStorage):
SdkConfiguration(
exemplar_filter=Mock(should_sample=Mock(return_value=False)),
resource=Mock(),
metric_readers=[reader_mock],
views=Mock(),
)
),
metric_readers=[reader_mock],
)
async_instrument_mocks = [MagicMock() for _ in range(5)]
for i_mock in async_instrument_mocks:
Expand Down Expand Up @@ -133,9 +134,9 @@ def test_collect_timeout(self, MockMetricReaderStorage):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=[reader_mock],
views=Mock(),
)
),
metric_readers=[reader_mock],
)

def sleep_1(*args, **kwargs):
Expand Down Expand Up @@ -166,9 +167,9 @@ def test_collect_deadline(
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=[reader_mock],
views=Mock(),
)
),
metric_readers=[reader_mock],
)

def sleep_1(*args, **kwargs):
Expand Down
16 changes: 0 additions & 16 deletions opentelemetry-sdk/tests/metrics/test_metric_reader_storage.py
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ def test_creates_view_instrument_matches(
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(view1, view2),
),
MagicMock(
Expand Down Expand Up @@ -146,7 +145,6 @@ def test_forwards_calls_to_view_instrument_match(
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(view1, view2),
),
MagicMock(
Expand Down Expand Up @@ -256,7 +254,6 @@ def test_race_concurrent_measurements(self, MockViewInstrumentMatch: Mock):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(view1,),
),
MagicMock(
Expand Down Expand Up @@ -291,7 +288,6 @@ def test_default_view_enabled(self, MockViewInstrumentMatch: Mock):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(),
),
MagicMock(
Expand Down Expand Up @@ -327,7 +323,6 @@ def test_drop_aggregation(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(
instrument_name="name", aggregation=DropAggregation()
Expand Down Expand Up @@ -355,7 +350,6 @@ def test_same_collection_start(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(View(instrument_name="name"),),
),
MagicMock(
Expand Down Expand Up @@ -402,7 +396,6 @@ def test_conflicting_view_configuration(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(
instrument_name="observable_counter",
Expand Down Expand Up @@ -451,7 +444,6 @@ def test_view_instrument_match_conflict_0(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="observable_counter_0", name="foo"),
View(instrument_name="observable_counter_1", name="foo"),
Expand Down Expand Up @@ -509,7 +501,6 @@ def test_view_instrument_match_conflict_1(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="bar", name="foo"),
View(instrument_name="baz", name="foo"),
Expand Down Expand Up @@ -578,7 +569,6 @@ def test_view_instrument_match_conflict_2(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="foo"),
View(instrument_name="bar"),
Expand Down Expand Up @@ -631,7 +621,6 @@ def test_view_instrument_match_conflict_3(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="bar", name="foo"),
View(instrument_name="baz", name="foo"),
Expand Down Expand Up @@ -682,7 +671,6 @@ def test_view_instrument_match_conflict_4(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="bar", name="foo"),
View(instrument_name="baz", name="foo"),
Expand Down Expand Up @@ -729,7 +717,6 @@ def test_view_instrument_match_conflict_5(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="observable_counter_0", name="foo"),
View(instrument_name="observable_counter_1", name="foo"),
Expand Down Expand Up @@ -784,7 +771,6 @@ def test_view_instrument_match_conflict_6(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="observable_counter", name="foo"),
View(instrument_name="histogram", name="foo"),
Expand Down Expand Up @@ -839,7 +825,6 @@ def test_view_instrument_match_conflict_7(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="observable_counter_0", name="foo"),
View(instrument_name="observable_counter_1", name="foo"),
Expand Down Expand Up @@ -894,7 +879,6 @@ def test_view_instrument_match_conflict_8(self):
SdkConfiguration(
exemplar_filter=Mock(),
resource=Mock(),
metric_readers=(),
views=(
View(instrument_name="up_down_counter", name="foo"),
View(
Expand Down
34 changes: 32 additions & 2 deletions opentelemetry-sdk/tests/metrics/test_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,8 @@
# limitations under the License.

# pylint: disable=protected-access,no-self-use

import weakref
from logging import WARNING
from logging import DEBUG, WARNING
from time import sleep
from typing import Iterable, Sequence
from unittest.mock import MagicMock, Mock, patch
Expand All @@ -36,6 +35,7 @@
)
from opentelemetry.sdk.metrics._internal import SynchronousMeasurementConsumer
from opentelemetry.sdk.metrics.export import (
InMemoryMetricReader,
Metric,
MetricExporter,
MetricExportResult,
Expand Down Expand Up @@ -426,6 +426,36 @@ def test_consume_measurement_gauge(self, mock_sync_measurement_consumer):

sync_consumer_instance.consume_measurement.assert_called()

def test_addition_of_metric_reader(self):
# Suppress warnings for calling collect on an unregistered metric reader
with self.assertLogs(
"opentelemetry.sdk.metrics._internal.export", DEBUG
):
reader = InMemoryMetricReader()
meter_provider = MeterProvider()
meter = meter_provider.get_meter(__name__)
counter = meter.create_counter("counter")
counter.add(1)
self.assertIsNone(reader.get_metrics_data())

meter_provider.add_metric_reader(reader)
counter.add(1)
self.assertIsNotNone(reader.get_metrics_data())

with self.assertRaises(ValueError) as cm:
meter_provider.add_metric_reader(reader)
self.assertIn(
"has been registered already!", str(cm.exception)
)

meter_provider.remove_metric_reader(reader)
counter.add(1)
self.assertIsNone(reader.get_metrics_data())

with self.assertRaises(ValueError) as cm:
meter_provider.remove_metric_reader(reader)
self.assertIn("has not been registered!", str(cm.exception))


class TestMeter(TestCase):
def setUp(self):
Expand Down
Loading