Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion sdks/python/apache_beam/options/pipeline_options.py
Original file line number Diff line number Diff line change
Expand Up @@ -1810,7 +1810,7 @@ def _add_argparse_args(cls, parser):
parser.add_argument(
'--job_server_timeout',
'--job-server-timeout', # For backwards compatibility.
default=300,
default=600,
type=int,
help=(
'Job service request timeout in seconds. The timeout '
Expand Down
6 changes: 5 additions & 1 deletion sdks/python/apache_beam/runners/portability/job_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -88,7 +88,11 @@ def start(self):
self._endpoint = self._job_server.start()
self._started = True
atexit.register(self.stop)
signal.signal(signal.SIGINT, self._sigint_handler)
try:
signal.signal(signal.SIGINT, self._sigint_handler)
except Exception as e:
logging.warning("Unable to install signal handler for SIGINT: %s", e)
pass
return self._endpoint

def stop(self):
Expand Down
15 changes: 8 additions & 7 deletions sdks/python/apache_beam/utils/subprocess_server.py
Original file line number Diff line number Diff line change
Expand Up @@ -85,15 +85,16 @@ def _next_id(self):

def register(self):
owner = self._next_id()
self._live_owners.add(owner)
with self._lock:
self._live_owners.add(owner)
return owner

def purge(self, owner):
if owner not in self._live_owners:
raise ValueError(f"{owner} not in {self._live_owners}")
self._live_owners.remove(owner)
to_delete = []
with self._lock:
if owner not in self._live_owners:
raise ValueError(f"{owner} not in {self._live_owners}")
self._live_owners.remove(owner)
for key, entry in list(self._cache.items()):
if owner in entry.owners:
entry.owners.remove(owner)
Expand All @@ -105,9 +106,9 @@ def purge(self, owner):
self._destructor(value)

def get(self, *key):
if not self._live_owners:
raise RuntimeError("At least one owner must be registered.")
with self._lock:
if not self._live_owners:
raise RuntimeError("At least one owner must be registered.")
if key not in self._cache:
self._cache[key] = _SharedCacheEntry(self._constructor(*key), set())
for owner in self._live_owners:
Expand Down Expand Up @@ -439,7 +440,7 @@ def path_to_beam_jar(
def _download_jar_to_cache(
cls, download_url, cached_jar_path, user_agent=None):
"""Downloads a jar from the given URL to the specified cache path.

Args:
download_url (str): The URL to download from.
cached_jar_path (str): The local path where the jar should be cached.
Expand Down
1 change: 1 addition & 0 deletions sdks/python/setup.py
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
# It is recommended to import setuptools prior to importing distutils to avoid
# using legacy behavior from distutils.
# https://setuptools.readthedocs.io/en/latest/history.html#v48-0-0
# Add something here to retrigger tests.
from distutils.errors import DistutilsError # isort:skip


Expand Down
Loading