Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions sdks/python/apache_beam/internal/dill_pickler.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,8 +39,6 @@
import types
import zlib
from typing import Any
from typing import Dict
from typing import Tuple

import dill

Expand All @@ -50,7 +48,7 @@

settings = {'dill_byref': None}

patch_save_code = sys.version_info >= (3, 10) and dill.__version__ == "0.3.1.1"
patch_save_code = dill.__version__ == "0.3.1.1"
Comment thread
jrmccluskey marked this conversation as resolved.

if patch_save_code:
# The following function is based on 'save_code' from 'dill'
Expand Down Expand Up @@ -315,7 +313,7 @@ def save_module(pickler, obj):
# Pickle module dictionaries (commonly found in lambda's globals)
# by referencing their module.
old_save_module_dict = dill.dill.save_module_dict
known_module_dicts: Dict[int, Tuple[types.ModuleType, Dict[str, Any]]] = {}
known_module_dicts: dict[int, tuple[types.ModuleType, dict[str, Any]]] = {}
Comment thread
jrmccluskey marked this conversation as resolved.

@dill.dill.register(dict)
def new_save_module_dict(pickler, obj):
Expand Down
12 changes: 5 additions & 7 deletions sdks/python/apache_beam/internal/metrics/metric.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,7 @@
import threading
import time
from typing import TYPE_CHECKING
from typing import Dict
from typing import Optional
from typing import Type
from typing import Union

from apache_beam.metrics import monitoring_infos
Expand All @@ -59,7 +57,7 @@ class Metrics(object):
@staticmethod
def counter(
urn: str,
labels: Optional[Dict[str, str]] = None,
labels: Optional[dict[str, str]] = None,
Comment thread
jrmccluskey marked this conversation as resolved.
process_wide: bool = False) -> UserMetrics.DelegatingCounter:
"""Obtains or creates a Counter metric.

Expand All @@ -82,22 +80,22 @@ def counter(
class MetricLogger(object):
"""Simple object to locally aggregate and log metrics."""
def __init__(self) -> None:
self._metric: Dict[MetricName, 'MetricCell'] = {}
self._metric: dict[MetricName, 'MetricCell'] = {}
self._lock = threading.Lock()
self._last_logging_millis = int(time.time() * 1000)
self.minimum_logging_frequency_msec = 180000

def update(
self,
cell_type: Union[Type['MetricCell'], 'MetricCellFactory'],
cell_type: Union[type['MetricCell'], 'MetricCellFactory'],
metric_name: MetricName,
value: object) -> None:
cell = self._get_metric_cell(cell_type, metric_name)
cell.update(value)

def _get_metric_cell(
self,
cell_type: Union[Type['MetricCell'], 'MetricCellFactory'],
cell_type: Union[type['MetricCell'], 'MetricCellFactory'],
metric_name: MetricName) -> 'MetricCell':
with self._lock:
if metric_name not in self._metric:
Expand Down Expand Up @@ -139,7 +137,7 @@ class ServiceCallMetric(object):
def __init__(
self,
request_count_urn: str,
base_labels: Optional[Dict[str, str]] = None) -> None:
base_labels: Optional[dict[str, str]] = None) -> None:
self.base_labels = base_labels if base_labels else {}
self.request_count_urn = request_count_urn

Expand Down
12 changes: 4 additions & 8 deletions sdks/python/apache_beam/internal/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,13 +25,9 @@
import logging
import threading
import weakref
from collections.abc import Iterable
from multiprocessing.pool import ThreadPool
from typing import Any
from typing import Dict
from typing import Iterable
from typing import List
from typing import Tuple
from typing import Type
from typing import TypeVar
from typing import Union

Expand Down Expand Up @@ -68,9 +64,9 @@ def __hash__(self):

def remove_objects_from_args(
args: Iterable[Any],
kwargs: Dict[str, Any],
pvalue_class: Union[Type[T], Tuple[Type[T], ...]]
) -> Tuple[List[Any], Dict[str, Any], List[T]]:
kwargs: dict[str, Any],
pvalue_class: Union[type[T], tuple[type[T], ...]]
) -> tuple[list[Any], dict[str, Any], list[T]]:
Comment thread
jrmccluskey marked this conversation as resolved.
"""For internal use only; no backwards-compatibility guarantees.

Replaces all objects of a given type in args/kwargs with a placeholder.
Expand Down
Loading