Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
45 commits
Select commit Hold shift + click to select a range
5291f9a
[WIP] Migrate to Google Cloud Dataflow Client
jrmccluskey Feb 18, 2026
e66962e
Trigger relevant postcommits
jrmccluskey Feb 18, 2026
533f061
base image update
jrmccluskey Feb 18, 2026
6302b39
fix camel case
jrmccluskey Feb 18, 2026
5944415
update dataflow runner + tests
jrmccluskey Feb 18, 2026
7ae8dc3
slide import to avoid triggering unit tests
jrmccluskey Feb 18, 2026
7ad855c
yapf stuff
jrmccluskey Feb 18, 2026
d08fcb2
remove extra print
jrmccluskey Feb 18, 2026
94bc621
further spec structs, fix incorrect piplineUrl option, remove old cli…
jrmccluskey Feb 19, 2026
9bde92d
suppress line-too-longs
jrmccluskey Feb 19, 2026
00cf579
formatting
jrmccluskey Feb 19, 2026
164ef09
linting, tweak metrics tests
jrmccluskey Feb 24, 2026
716f375
Proto-specific changes to metric processing tests
jrmccluskey Feb 24, 2026
e1acf9e
try to dump logging
jrmccluskey Feb 24, 2026
d9c37b0
handle more straightforward metrics values
jrmccluskey Feb 24, 2026
f63bde2
add skips since the unit tests now depend on the proto library
jrmccluskey Feb 24, 2026
92a746f
testing if there's a disconnect between proto behavior locally and in…
jrmccluskey Feb 25, 2026
9ed32e4
correct scalar access
jrmccluskey Feb 25, 2026
fddb2d2
clean up dist accesses
jrmccluskey Feb 25, 2026
1cbcad2
linting, various fixes
jrmccluskey Feb 25, 2026
d727e6d
fix unit test setup for direct accesses
jrmccluskey Feb 25, 2026
7fbf62f
linting
jrmccluskey Feb 25, 2026
4d7b9dc
more linting
jrmccluskey Feb 26, 2026
269735a
re-enable histograms
jrmccluskey Mar 25, 2026
4ff4f73
Bump dataflow client version, restore pausing/paused concept
jrmccluskey Apr 9, 2026
4889b33
formatting
jrmccluskey Apr 9, 2026
366092d
fix enum selection
jrmccluskey Apr 16, 2026
41a8871
handle the proto hash PR
jrmccluskey May 7, 2026
9c75420
remove unnecessary try/except block
jrmccluskey May 7, 2026
feec693
re-delete old messsages
jrmccluskey May 12, 2026
3859d11
fix disk_provisioned_iops/throughput_mibps tests
jrmccluskey May 12, 2026
6a0b9ba
code bot suggestions
jrmccluskey May 12, 2026
c1650ca
revert pipeline options interaction
jrmccluskey May 12, 2026
46c08b6
leftover update
jrmccluskey May 12, 2026
e1579e2
yapf
jrmccluskey May 12, 2026
b9e2558
swap credential loading
jrmccluskey May 12, 2026
30882bb
Fix message importance parsing
jrmccluskey May 26, 2026
f794e07
fix apiclient_test case w/ cloud deps and no auth
jrmccluskey May 26, 2026
87abbca
Local runner SSL fix
jrmccluskey May 27, 2026
bfa9e77
last local change
jrmccluskey May 27, 2026
cd83ba3
apitools test fix
jrmccluskey May 27, 2026
08e9194
yapf
jrmccluskey May 27, 2026
1966b5b
Apply suggestions from code review
jrmccluskey May 27, 2026
904282a
gemini suggestions
jrmccluskey May 27, 2026
a44b251
fix message importance
jrmccluskey May 27, 2026
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions .github/trigger_files/beam_PostCommit_Python.json
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run.",
"pr": "38069",
"modification": 41
}
"modification": 40
}
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
{
"comment": "Modify this file in a trivial way to cause this test suite to run",
"modification": 2
"modification": 3
}
65 changes: 19 additions & 46 deletions sdks/python/apache_beam/runners/dataflow/dataflow_metrics.py
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ def _get_match(proto, filter_fn):


# V1b3 MetricStructuredName keys to accept and copy to the MetricKey labels.
STEP_LABEL = 'step'
STRUCTURED_NAME_LABELS = set(
['execution_step', 'original_name', 'output_user_name'])

Expand Down Expand Up @@ -112,9 +111,7 @@ def _translate_step_name(self, internal_name):
try:
step = _get_match(
self._job_graph.proto.steps, lambda x: x.name == internal_name)
user_step_name = _get_match(
step.properties.additionalProperties,
lambda x: x.key == 'user_name').value.string_value
user_step_name = step.properties.get('user_name')
except ValueError:
pass # Exception is handled below.
if not user_step_name:
Expand All @@ -135,24 +132,19 @@ def _get_metric_key(self, metric):
# step name (only happens for unstructured-named metrics).
# 2. Unable to unpack [step] or [namespace]; which should only happen
# for unstructured names.
step = _get_match(
metric.name.context.additionalProperties,
lambda x: x.key == STEP_LABEL).value
step = metric.name.context['step']
step = self._translate_step_name(step)
except ValueError:
except (KeyError, ValueError):
pass

namespace = "dataflow/v1b3" # Try to extract namespace or add a default.
try:
namespace = _get_match(
metric.name.context.additionalProperties,
lambda x: x.key == 'namespace').value
except ValueError:
pass
carried_namespace = metric.name.context.get('namespace', None)
if carried_namespace:
namespace = carried_namespace

for kv in metric.name.context.additionalProperties:
if kv.key in STRUCTURED_NAME_LABELS:
labels[kv.key] = kv.value
for key in metric.name.context:
if key in STRUCTURED_NAME_LABELS:
labels[key] = metric.name.context[key]
# Package everything besides namespace and name the labels as well,
# including unmodified step names to assist in integration the exact
# unmodified values which come from dataflow.
Expand Down Expand Up @@ -185,10 +177,7 @@ def _populate_metrics(self, response, result, user_metrics=False):
# in the service.
# The second way is only useful for the UI, and should be ignored.
continue
is_tentative = [
prop for prop in metric.name.context.additionalProperties
if prop.key == 'tentative' and prop.value == 'true'
]
is_tentative = metric.name.context.get('tentative') == 'true'
tentative_or_committed = 'tentative' if is_tentative else 'committed'

metric_key = self._get_metric_key(metric)
Expand All @@ -209,32 +198,16 @@ def _get_metric_value(self, metric):
return None

if metric.scalar is not None:
return metric.scalar.integer_value
# This will always be a single value if there is any data in the field.
val = metric.scalar
if isinstance(val, float) and val.is_integer():
return int(val)
return val
elif metric.distribution is not None:
dist_count = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'count').value.integer_value
dist_min = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'min').value.integer_value
dist_max = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'max').value.integer_value
dist_sum = _get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'sum').value.integer_value
if dist_sum is None:
# distribution metric is not meant to use on large values, but in case
# it is, the value can overflow and become double_value, the correctness
# of the value may not be guaranteed.
_LOGGER.info(
"Distribution metric sum value seems to have "
"overflowed integer_value range, the correctness of sum or mean "
"value may not be guaranteed: %s" % metric.distribution)
dist_sum = int(
_get_match(
metric.distribution.object_value.properties,
lambda x: x.key == 'sum').value.double_value)
dist_count = int(metric.distribution['count'])
dist_min = int(metric.distribution['min'])
dist_max = int(metric.distribution['max'])
dist_sum = int(metric.distribution['sum'])
return DistributionResult(
DistributionData(dist_sum, dist_count, dist_min, dist_max))
Comment thread
jrmccluskey marked this conversation as resolved.
#TODO(https://github.com/apache/beam/issues/31788) support StringSet after
Expand Down
Loading
Loading