Skip to content
Draft
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
18 changes: 17 additions & 1 deletion sdks/python/apache_beam/runners/portability/prism_runner_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@
import os.path
import queue
import shlex
import threading
import time
import typing
import unittest
Expand Down Expand Up @@ -189,6 +188,7 @@ def create_options(self):
options.view_as(StandardOptions).streaming = self.streaming
options.view_as(
TypeOptions).allow_unsafe_triggers = self.allow_unsafe_triggers
options.view_as(PortableOptions).job_server_timeout = 60
return options

# Can't read host files from within docker, read a "local" file there.
Expand Down Expand Up @@ -300,6 +300,22 @@ def test_after_count_trigger_streaming(self):
('B-3', {10, 15, 16}),
])))

def test_dofn_failure_clean_exit(self):
class FailDoFn(beam.DoFn):
def process(self, element):
raise ValueError("Failing as intended")

class BlockDoFn(beam.DoFn):
def process(self, element):
time.sleep(1000)
yield element

with self.assertRaisesRegex(Exception, "Failing as intended"):
with self.create_pipeline() as p:
imp = p | beam.Create([1, 2])
_ = imp | 'Block' >> beam.ParDo(BlockDoFn())
_ = imp | 'Fail' >> beam.ParDo(FailDoFn())


class PrismJobServerTest(unittest.TestCase):
def setUp(self) -> None:
Expand Down
Loading