Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
{
"https://github.com/apache/beam/pull/35951": "triggering sideinput test"
}
72 changes: 72 additions & 0 deletions sdks/python/apache_beam/transforms/sideinputs_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -19,13 +19,20 @@

# pytype: skip-file

import hashlib
import itertools
import logging
import unittest
from typing import Any
from typing import Dict
from typing import Iterable
from typing import Tuple
from typing import Union

import pytest

import apache_beam as beam
from apache_beam.testing.synthetic_pipeline import SyntheticSDFAsSource
from apache_beam.testing.test_pipeline import TestPipeline
from apache_beam.testing.test_stream import TestStream
from apache_beam.testing.util import assert_that
Expand Down Expand Up @@ -417,6 +424,71 @@ def process(
use_global_window=False,
label='assert per window')

@pytest.mark.it_validatesrunner
def test_side_input_with_sdf(self):
"""Test a side input with SDF.

This test verifies consisency of side input when it is split due to
SDF (Splittable DoFns). The consistency is verified by checking the size
and fingerprint of the side input.

This test needs to run with at least 2 workers (--num_workers=2) and
autoscaling disabled (--autoscaling_algorithm=NONE). Otherwise it might
provide false positives (i.e. not fail on bad state).
"""
initial_elements = 1000
num_records = 10000
key_size = 10
value_size = 100
expected_fingerprint = '00f7eeac8514721e2683d14a504b33d1'

class GetSyntheticSDFOptions(beam.DoFn):
"""A DoFn that emits elements for genenrating SDF."""
def process(self, element: Any) -> Iterable[Dict[str, Union[int, str]]]:
yield {
'num_records': num_records // initial_elements,
'key_size': key_size,
'value_size': value_size,
'initial_splitting_num_bundles': 0,
'initial_splitting_desired_bundle_size': 0,
'sleep_per_input_record_sec': 0,
'initial_splitting': 'const',
}

class SideInputTrackingDoFn(beam.DoFn):
"""A DoFn that emits the size and fingerprint of the side input.

In this context, the size is the number of elements and the fingerprint
is the hash of the sorted serialized elements.
"""
def process(
self, element: Any,
side_input: Iterable[Tuple[bytes,
bytes]]) -> Iterable[Tuple[int, str]]:

# Sort for consistent hashing.
sorted_side_input = sorted(side_input)
size = len(sorted_side_input)
m = hashlib.md5()
for key, value in sorted_side_input:
m.update(key)
m.update(value)
yield (size, m.hexdigest())

pipeline = self.create_pipeline()
main_input = pipeline | 'Main input: Create' >> beam.Create([0])
side_input = pipeline | 'Side input: Create' >> beam.Create(
range(initial_elements))
side_input |= 'Side input: Get synthetic SDF options' >> beam.ParDo(
GetSyntheticSDFOptions())
side_input |= 'Side input: Process and split' >> beam.ParDo(
SyntheticSDFAsSource())
results = main_input | 'Emit side input' >> beam.ParDo(
SideInputTrackingDoFn(), beam.pvalue.AsIter(side_input))

assert_that(results, equal_to([(num_records, expected_fingerprint)]))
pipeline.run()


if __name__ == '__main__':
logging.getLogger().setLevel(logging.DEBUG)
Expand Down
Loading