Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 5 additions & 3 deletions sdks/python/apache_beam/runners/common.pxd
Original file line number Diff line number Diff line change
Expand Up @@ -102,9 +102,11 @@ cdef class PerWindowInvoker(DoFnInvoker):
cdef dict kwargs_for_process_batch
cdef list placeholders_for_process_batch
cdef bint has_windowed_inputs
cdef bint recalculate_window_args
cdef bint has_cached_window_args
cdef bint has_cached_window_batch_args
cdef bint should_cache_args
cdef list cached_args_for_process
cdef dict cached_kwargs_for_process
cdef list cached_args_for_process_batch
cdef dict cached_kwargs_for_process_batch
cdef object process_method
cdef object process_batch_method
cdef bint is_splittable
Expand Down
46 changes: 26 additions & 20 deletions sdks/python/apache_beam/runners/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -812,16 +812,15 @@ def __init__(
self.current_window_index = None
self.stop_window_index = None

# TODO(https://github.com/apache/beam/issues/28776): Remove caching after
# fully rolling out.
# If true, always recalculate window args. If false, has_cached_window_args
# and has_cached_window_batch_args will be set to true if the corresponding
# self.args_for_process,have been updated and should be reused directly.
self.recalculate_window_args = (
self.has_windowed_inputs or 'disable_global_windowed_args_caching'
in RuntimeValueProvider.experiments)
self.has_cached_window_args = False
self.has_cached_window_batch_args = False
# If true, after the first process invocation the args for process will be
# cached in cached_args_for_process and cached_kwargs_for_process and reused
# on subsequent invocations in the same bundle.
self.should_cache_args = (not self.has_windowed_inputs)
self.cached_args_for_process = None
self.cached_kwargs_for_process = None
# See above, similar cached args for process_batch invocations.
self.cached_args_for_process_batch = None
self.cached_kwargs_for_process_batch = None

# Try to prepare all the arguments that can just be filled in
# without any additional work in the process function.
Expand Down Expand Up @@ -984,9 +983,9 @@ def _invoke_process_per_window(
additional_kwargs,
):
# type: (...) -> Optional[SplitResultResidual]
if self.has_cached_window_args:
if self.cached_args_for_process:
args_for_process, kwargs_for_process = (
self.args_for_process, self.kwargs_for_process)
self.cached_args_for_process, self.cached_kwargs_for_process)
else:
if self.has_windowed_inputs:
assert len(windowed_value.windows) <= 1
Expand All @@ -997,10 +996,9 @@ def _invoke_process_per_window(
side_inputs.extend(additional_args)
args_for_process, kwargs_for_process = util.insert_values_in_args(
self.args_for_process, self.kwargs_for_process, side_inputs)
if not self.recalculate_window_args:
self.args_for_process, self.kwargs_for_process = (
if self.should_cache_args:
self.cached_args_for_process, self.cached_kwargs_for_process = (
args_for_process, kwargs_for_process)
self.has_cached_window_args = True

# Extract key in the case of a stateful DoFn. Note that in the case of a
# stateful DoFn, we set during __init__ self.has_windowed_inputs to be
Expand Down Expand Up @@ -1088,9 +1086,9 @@ def _invoke_process_batch_per_window(
):
# type: (...) -> Optional[SplitResultResidual]

if self.has_cached_window_batch_args:
if self.cached_args_for_process_batch:
args_for_process_batch, kwargs_for_process_batch = (
self.args_for_process_batch, self.kwargs_for_process_batch)
self.cached_args_for_process_batch, self.cached_kwargs_for_process_batch)
else:
if self.has_windowed_inputs:
assert isinstance(windowed_batch, HomogeneousWindowedBatch)
Expand All @@ -1107,10 +1105,9 @@ def _invoke_process_batch_per_window(
side_inputs,
)
)
if not self.recalculate_window_args:
self.args_for_process_batch, self.kwargs_for_process_batch = (
if self.should_cache_args:
self.cached_args_for_process_batch, self.cached_kwargs_for_process_batch = (
args_for_process_batch, kwargs_for_process_batch)
self.has_cached_window_batch_args = True

for i, p in self.placeholders_for_process_batch:
if core.DoFn.ElementParam == p:
Expand Down Expand Up @@ -1150,6 +1147,15 @@ def _invoke_process_batch_per_window(
*args_for_process_batch, **kwargs_for_process_batch),
self.threadsafe_watermark_estimator)

def invoke_finish_bundle(self):
  # type: () -> None
  """Resets the cached process arguments, then finishes the bundle.

  The cached args/kwargs for both ``process`` and ``process_batch`` are set
  back to None before delegating to the parent implementation, which allows
  side inputs to be refreshed across bundles.
  """
  # Element-wise process() cache.
  self.cached_args_for_process = None
  self.cached_kwargs_for_process = None
  # Batched process_batch() cache.
  self.cached_args_for_process_batch = None
  self.cached_kwargs_for_process_batch = None

  super(PerWindowInvoker, self).invoke_finish_bundle()

@staticmethod
def _try_split(
fraction,
Expand Down
4 changes: 3 additions & 1 deletion sdks/python/apache_beam/runners/direct/direct_runner.py
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,9 @@ def visit_transform(self, applied_ptransform):
evaluation_context)
# DirectRunner does not support injecting
# PipelineOptions values at runtime
RuntimeValueProvider.set_runtime_options({})
RuntimeValueProvider.set_runtime_options(
{'experiments': set(options.view_as(beam.options.pipeline_options.DebugOptions).experiments)}
)
# Start the executor. This is a non-blocking call, it will start the
# execution in background threads and return.
executor.start(self.consumer_tracking_visitor.root_transforms)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,7 @@ def run_pipeline(
if not 'beam_fn_api' in experiments:
experiments.append('beam_fn_api')
options.view_as(pipeline_options.DebugOptions).experiments = experiments
RuntimeValueProvider.set_runtime_options({'experiments': set(experiments)})

# This is sometimes needed if type checking is disabled
# to enforce that the inputs (and outputs) of GroupByKey operations
Expand Down
14 changes: 14 additions & 0 deletions sdks/python/apache_beam/testing/load_tests/microbenchmarks_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@

from apache_beam.testing.load_tests.load_test import LoadTest
from apache_beam.tools import fn_api_runner_microbenchmark
from apache_beam.tools import map_fn_microbenchmark
from apache_beam.tools import teststream_microbenchmark
from apache_beam.transforms.util import _BatchSizeEstimator

Expand Down Expand Up @@ -92,6 +93,19 @@ def _run_fn_api_runner_microbenchmark(self):
'fn_api_runner_microbenchmark_per_element_cost_ms': b * 1000,
}

def _run_map_fn_microbenchmark(self):
  """Runs the map_fn microbenchmark and reports timing metrics.

  A linear regression is fitted over the first series of sizes (taken from
  ``result[0]``) and the first series of costs (taken from ``result[1]``)
  returned by ``map_fn_microbenchmark.run_benchmark``.

  Returns:
    A dict with the wall-clock runtime of this method in seconds, and the
    two regression coefficients multiplied by 1000, reported as the fixed
    cost and the per-element cost in milliseconds.
  """
  began_at = time.perf_counter()
  benchmark_result = map_fn_microbenchmark.run_benchmark(verbose=False)
  first_sizes = list(benchmark_result[0].values())[0]
  first_costs = list(benchmark_result[1].values())[0]
  fixed_cost, per_element_cost = (
      _BatchSizeEstimator.linear_regression_no_numpy(first_sizes, first_costs))

  # The runtime is measured after the regression, so it includes it.
  metrics = {}
  metrics['map_fn_microbenchmark_runtime_sec'] = (
      time.perf_counter() - began_at)
  metrics['map_fn_microbenchmark_fixed_cost_ms'] = fixed_cost * 1000
  metrics['map_fn_microbenchmark_per_element_cost_ms'] = (
      per_element_cost * 1000)
  return metrics


if __name__ == '__main__':
logging.basicConfig(level=logging.INFO)
Expand Down
112 changes: 87 additions & 25 deletions sdks/python/apache_beam/tools/map_fn_microbenchmark.py
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@

This executes the same codepaths that are run on the Fn API (and Dataflow)
workers, but is generally easier to run (locally) and more stable. It does
not, on the other hand, excercise any non-trivial amount of IO (e.g. shuffle).
not, on the other hand, exercise any non-trivial amount of IO (e.g. shuffle).

Run as

Expand All @@ -32,41 +32,103 @@

# pytype: skip-file

import argparse
import logging
import time

from scipy import stats

import apache_beam as beam
import apache_beam.options.pipeline_options
from apache_beam.tools import utils
from apache_beam.transforms.window import FixedWindows


def run_benchmark(num_maps=100, num_runs=10, num_elements_step=1000):
timings = {}
for run in range(num_runs):
num_elements = num_elements_step * run + 1
start = time.time()
def map_pipeline(num_elements, num_maps=100):
def _pipeline_runner():
with beam.Pipeline() as p:
pc = p | beam.Create(list(range(num_elements)))
for ix in range(num_maps):
pc = pc | 'Map%d' % ix >> beam.FlatMap(lambda x: (None, ))
timings[num_elements] = time.time() - start
print(
"%6d element%s %g sec" % (
num_elements,
" " if num_elements == 1 else "s",
timings[num_elements]))

print()
# pylint: disable=unused-variable
gradient, intercept, r_value, p_value, std_err = stats.linregress(
*list(zip(*list(timings.items()))))
print("Fixed cost ", intercept)
print("Per-element ", gradient / num_maps)
print("R^2 ", r_value**2)
pc = pc | 'Map%d' % ix >> beam.FlatMap(lambda x: (None,))

return _pipeline_runner


def map_with_global_side_input_pipeline(num_elements, num_maps=100):
  """Returns a callable that runs a chain of Maps sharing one side input.

  Args:
    num_elements: number of integers (``range(num_elements)``) fed into the
      main input.
    num_maps: number of consecutive ``beam.Map`` steps applied to the main
      input.

  Returns:
    A zero-argument function that builds and runs the pipeline when called.
  """
  def add(element, side_input):
    return element + side_input

  def _pipeline_runner():
    with beam.Pipeline() as pipeline:
      side_pcoll = pipeline | 'CreateSide' >> beam.Create([1])
      main_pcoll = (
          pipeline | 'CreateMain' >> beam.Create(list(range(num_elements))))
      for map_index in range(num_maps):
        # Every step gets its own singleton view over the same side input.
        step_label = 'Map%d' % map_index
        main_pcoll = main_pcoll | step_label >> beam.Map(
            add, beam.pvalue.AsSingleton(side_pcoll))

  return _pipeline_runner

def map_with_global_side_input_pipeline_uncached(num_elements, num_maps=100):
  """Returns a callable running the side-input Map chain with an experiment.

  Same pipeline shape as the plain global side-input benchmark, except that
  the pipeline is created with the
  ``disable_global_windowed_args_caching`` experiment added to its
  DebugOptions.

  Args:
    num_elements: number of integers (``range(num_elements)``) fed into the
      main input.
    num_maps: number of consecutive ``beam.Map`` steps applied to the main
      input.

  Returns:
    A zero-argument function that builds and runs the pipeline when called.
  """
  def add(element, side_input):
    return element + side_input

  def _pipeline_runner():
    debug_options = beam.options.pipeline_options.DebugOptions()
    debug_options.add_experiment('disable_global_windowed_args_caching')
    with beam.Pipeline(options=debug_options) as pipeline:
      side_pcoll = pipeline | 'CreateSide' >> beam.Create([1])
      main_pcoll = (
          pipeline | 'CreateMain' >> beam.Create(list(range(num_elements))))
      for map_index in range(num_maps):
        step_label = 'Map%d' % map_index
        main_pcoll = main_pcoll | step_label >> beam.Map(
            add, beam.pvalue.AsSingleton(side_pcoll))

  return _pipeline_runner

def map_with_fixed_window_side_input_pipeline(num_elements, num_maps=100):
  """Returns a callable running the side-input Map chain in fixed windows.

  Both the side input and the main input are windowed with
  ``FixedWindows(1000)`` before the chain of Maps is applied.

  Args:
    num_elements: number of integers (``range(num_elements)``) fed into the
      main input.
    num_maps: number of consecutive ``beam.Map`` steps applied to the main
      input.

  Returns:
    A zero-argument function that builds and runs the pipeline when called.
  """
  def add(element, side_input):
    return element + side_input

  def _pipeline_runner():
    with beam.Pipeline() as pipeline:
      side_pcoll = pipeline | 'CreateSide' >> beam.Create([1])
      side_pcoll = side_pcoll | 'WindowSide' >> beam.WindowInto(
          FixedWindows(1000))

      main_pcoll = (
          pipeline | 'CreateMain' >> beam.Create(list(range(num_elements))))
      main_pcoll = main_pcoll | 'WindowMain' >> beam.WindowInto(
          FixedWindows(1000))

      for map_index in range(num_maps):
        step_label = 'Map%d' % map_index
        main_pcoll = main_pcoll | step_label >> beam.Map(
            add, beam.pvalue.AsSingleton(side_pcoll))

  return _pipeline_runner

def run_benchmark(
    starting_point=1,
    num_runs=10,
    num_elements_step=100,
    verbose=True,
    profile_filename_base=None,
):
  """Runs the map microbenchmark suite and returns the collected results.

  The suite consists of a linear-regression benchmark over ``map_pipeline``
  followed by two fixed-size benchmarks over the global and the
  fixed-window side-input pipelines.

  Args:
    starting_point: starting size handed to the linear-regression config;
      the side-input benchmarks use ``starting_point * 1000`` elements.
    num_runs: number of runs handed to every benchmark config.
    num_elements_step: step handed to the linear-regression config.
    verbose: forwarded to ``utils.run_benchmarks``.
    profile_filename_base: forwarded to ``utils.run_benchmarks``.

  Returns:
    Whatever ``utils.run_benchmarks`` returns for the suite.
  """
  regression_config = utils.LinearRegressionBenchmarkConfig(
      map_pipeline, starting_point, num_elements_step, num_runs)

  side_input_factories = (
      map_with_global_side_input_pipeline,
      map_with_fixed_window_side_input_pipeline,
  )
  side_input_configs = [
      utils.BenchmarkConfig(factory, starting_point * 1000, num_runs)
      for factory in side_input_factories
  ]

  return utils.run_benchmarks(
      [regression_config] + side_input_configs,
      verbose=verbose,
      profile_filename_base=profile_filename_base)


def _parse_bool_flag(value):
  """Converts a command-line string into a bool.

  ``type=bool`` must not be used with argparse: ``bool('False')`` is True
  because any non-empty string is truthy. This parser understands the usual
  spellings instead.

  Args:
    value: the raw argument; a bool is returned unchanged.

  Returns:
    True for 'true', 't', 'yes', 'y', '1'; False for 'false', 'f', 'no',
    'n', '0' and the empty string (case-insensitive, surrounding whitespace
    ignored).

  Raises:
    argparse.ArgumentTypeError: if the value is not a recognised spelling.
  """
  if isinstance(value, bool):
    return value
  normalized = value.strip().lower()
  if normalized in ('true', 't', 'yes', 'y', '1'):
    return True
  # The empty string stays False, as it was under bool('').
  if normalized in ('false', 'f', 'no', 'n', '0', ''):
    return False
  raise argparse.ArgumentTypeError(
      'Expected a boolean value, got %r' % (value, ))


if __name__ == '__main__':
  logging.basicConfig()
  utils.check_compiled('apache_beam.runners.common')

  # Parse the arguments before running anything, so that --help and invalid
  # arguments exit without starting a benchmark run.
  parser = argparse.ArgumentParser()
  parser.add_argument('--num_runs', default=10, type=int)
  parser.add_argument('--starting_point', default=1, type=int)
  parser.add_argument('--increment', default=100, type=int)
  parser.add_argument('--verbose', default=True, type=_parse_bool_flag)
  parser.add_argument('--profile_filename_base', default=None, type=str)
  options = parser.parse_args()

  # The suite is run exactly once, with the parsed options.
  run_benchmark(
      options.starting_point,
      options.num_runs,
      options.increment,
      options.verbose,
      options.profile_filename_base,
  )
19 changes: 15 additions & 4 deletions sdks/python/apache_beam/tools/utils.py
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
# pytype: skip-file

import collections
import cProfile
import gc
import importlib
import os
Expand Down Expand Up @@ -70,10 +71,11 @@ def __call__(self):
size: int
num_runs: int

def name(self):
  """Returns the benchmark's ``__name__``, or its ``str()`` if it has none."""
  target = self.benchmark
  fallback = str(target)
  return getattr(target, '__name__', fallback)

def __str__(self):
return "%s, %s element(s)" % (
getattr(self.benchmark, '__name__', str(self.benchmark)),
str(self.size))
return "%s, %s element(s)" % (self.name(), str(self.size))


class LinearRegressionBenchmarkConfig(NamedTuple):
Expand Down Expand Up @@ -109,7 +111,8 @@ def __str__(self):
str(self.increment))


def run_benchmarks(benchmark_suite, verbose=True):
def run_benchmarks(benchmark_suite, verbose=True,
profile_filename_base=None):
"""Runs benchmarks, and collects execution times.

A simple instrumentation to run a callable several times, collect and print
Expand All @@ -123,12 +126,15 @@ def run_benchmarks(benchmark_suite, verbose=True):
A dictionary of the form string -> list of floats. Keys of the dictionary
are benchmark names, values are execution times in seconds for each run.
"""
profiler = cProfile.Profile()
def run(benchmark: BenchmarkFactoryFn, size: int):
  """Runs one benchmark instance once and returns its duration in seconds.

  Args:
    benchmark: factory that is called with ``size`` and returns the callable
      to be timed.
    size: input size handed to the factory.

  Returns:
    Elapsed wall-clock time of the benchmark callable, as a float.
  """
  # Contain each run of a benchmark inside a function so that any temporary
  # objects can be garbage-collected after the run.
  benchmark_instance_callable = benchmark(size)
  # Only pay for profiling when a profile is going to be written; otherwise
  # the profiler's overhead would inflate the measured time for nothing.
  profiling = bool(profile_filename_base)
  start = time.time()
  if profiling:
    profiler.enable()
  try:
    _ = benchmark_instance_callable()
  finally:
    # Never leave the profiler running if the benchmark raises.
    if profiling:
      profiler.disable()
  return time.time() - start

cost_series = collections.defaultdict(list)
Expand Down Expand Up @@ -164,6 +170,11 @@ def run(benchmark: BenchmarkFactoryFn, size: int):
if verbose:
print("")

if profile_filename_base:
filename = profile_filename_base + benchmark_config.name() + '.prof'
print("Dumping profile to " + filename)
profiler.dump_stats(filename)

if verbose:
pad_length = max([len(str(bc)) for bc in benchmark_suite])

Expand Down
Loading