Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions changelog/68678.fixed.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
Handle corrupted grains cache msgpack data by refreshing the cache.
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the wrong changelog


32 changes: 32 additions & 0 deletions salt/client/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -2068,6 +2068,29 @@ def __exit__(self, *args):
self.destroy()


class LoadedMod:
"""
Proxy a module namespace on a FunctionWrapper instance.
"""

__slots__ = ("mod", "wrapper")

def __init__(self, mod, wrapper):
self.mod = mod
self.wrapper = wrapper

def __getattr__(self, name):
try:
return self.wrapper[f"{self.mod}.{name}"]
except KeyError:
raise AttributeError(
f"No attribute by the name of {name} was found on {self.mod}"
)

def __repr__(self):
return f"<FunctionWrapper module='{self.mod}'>"


class FunctionWrapper(dict):
"""
Create a function wrapper that looks like the functions dict on the minion
Expand Down Expand Up @@ -2118,6 +2141,15 @@ def func(*args, **kwargs):

return func

def __getattr__(self, mod_or_func):
    """
    Support dotted module access (e.g. __salt__.cp.get_file_str()).
    """
    looks_like_dunder = mod_or_func.startswith("__") and mod_or_func.endswith("__")
    if looks_like_dunder:
        # Don't pretend dunders are set.
        raise AttributeError(mod_or_func)
    return LoadedMod(mod_or_func, self)


class Caller:
"""
Expand Down
15 changes: 15 additions & 0 deletions salt/config/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -312,6 +312,16 @@ def _gather_buffer_space():
"multiprocessing": bool,
# Maximum number of concurrently active processes at any given point in time
"process_count_max": int,
# Ratio of open-file usage above which new jobs will be delayed
"minion_open_file_limit_ratio": float,
# Minimum number of free file descriptors to keep available
"minion_open_file_limit_min": int,
# Backoff in seconds when delaying jobs due to open-file pressure
"minion_open_file_backoff": float,
# Minimum seconds between open-file pressure log messages
"minion_open_file_log_interval": float,
# Retry count for starting processes when EMFILE/ENFILE is raised
"minion_open_file_retries": int,
# Whether or not the salt minion should run scheduled mine updates
"mine_enabled": bool,
# Whether or not scheduled mine updates should be accompanied by a job return for the job cache
Expand Down Expand Up @@ -1205,6 +1215,11 @@ def _gather_buffer_space():
"autosign_timeout": 120,
"multiprocessing": True,
"process_count_max": -1,
"minion_open_file_limit_ratio": 0.9,
"minion_open_file_limit_min": 32,
"minion_open_file_backoff": 1.0,
"minion_open_file_log_interval": 30.0,
"minion_open_file_retries": 3,
"mine_enabled": True,
"mine_return_job": False,
"mine_interval": 60,
Expand Down
36 changes: 27 additions & 9 deletions salt/loader/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -1043,6 +1043,20 @@ def _format_cached_grains(cached_grains):
return cached_grains


def _invalidate_grains_cache(cfn, reason=None):
    """
    Remove an invalid grains cache file to allow refresh.

    :param cfn: path to the grains cache file to delete.
    :param reason: optional short description included in the warning log.
    """
    if not reason:
        log.warning("Invalid grains cache. Removing %s and refreshing.", cfn)
    else:
        log.warning(
            "Invalid grains cache (%s). Removing %s and refreshing.", reason, cfn
        )
    try:
        os.remove(cfn)
    except OSError as err:
        # Best effort: a stale cache file will simply be overwritten later.
        log.debug("Failed to remove grains cache file %s: %s", cfn, err)


def _load_cached_grains(opts, cfn):
"""
Returns the grains cached in cfn, or None if the cache is too old or is
Expand Down Expand Up @@ -1070,17 +1084,21 @@ def _load_cached_grains(opts, cfn):
log.debug("Retrieving grains from cache")
try:
with salt.utils.files.fopen(cfn, "rb") as fp_:
cached_grains = salt.utils.data.decode(
salt.payload.load(fp_), preserve_tuples=True
)
if not cached_grains:
log.debug("Cached grains are empty, cache might be corrupted. Refreshing.")
return None

return _format_cached_grains(cached_grains)
except OSError:
cached_grains = salt.payload.load(fp_, raise_on_error=False)
except OSError as exc:
log.debug("Failed to read grains cache file %s: %s", cfn, exc)
return None
if not cached_grains:
_invalidate_grains_cache(cfn, "empty or unreadable")
return None
try:
cached_grains = salt.utils.data.decode(cached_grains, preserve_tuples=True)
except Exception as exc: # pylint: disable=broad-except
_invalidate_grains_cache(cfn, f"decode error: {exc}")
return None

return _format_cached_grains(cached_grains)


def grains(opts, force_refresh=False, proxy=None, context=None, loaded_base_name=None):
"""
Expand Down
67 changes: 63 additions & 4 deletions salt/minion.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@
"""

import asyncio
import errno
import binascii
import collections
import contextlib
Expand Down Expand Up @@ -1878,6 +1879,41 @@ async def _handle_decoded_payload(self, data):
await asyncio.sleep(10)
process_count = len(salt.utils.minion.running(self.opts))

try:
fd_backoff = max(
0.1, float(self.opts.get("minion_open_file_backoff", 1.0))
)
except (TypeError, ValueError):
fd_backoff = 1.0
try:
fd_log_interval = max(
1.0, float(self.opts.get("minion_open_file_log_interval", 30.0))
)
except (TypeError, ValueError):
fd_log_interval = 30.0

while salt.utils.process.should_throttle_open_files(self.opts):
now = time.time()
last_log = getattr(self, "_last_open_file_log", 0)
if (now - last_log) >= fd_log_interval:
self._last_open_file_log = now
pressure = salt.utils.process.open_file_pressure()
if pressure:
count, limit, usage = pressure
log.warning(
"Open file usage %s/%s (%.0f%%) while executing jid %s, waiting...",
count,
limit,
usage * 100.0,
data["jid"],
)
else:
log.warning(
"Open file usage at or above limit while executing jid %s, waiting...",
data["jid"],
)
await asyncio.sleep(fd_backoff)

# We stash an instance references to allow for the socket
# communication in Windows. You can't pickle functions, and thus
# python needs to be able to reconstruct the reference on the other
Expand Down Expand Up @@ -1906,10 +1942,33 @@ async def _handle_decoded_payload(self, data):
)

if multiprocessing_enabled:
with default_signals(signal.SIGINT, signal.SIGTERM):
# Reset current signals before starting the process in
# order not to inherit the current signal handlers
process.start()
start_retries = self.opts.get("minion_open_file_retries", 3)
try:
start_retries = max(0, int(start_retries))
except (TypeError, ValueError):
start_retries = 3
for attempt in range(start_retries + 1):
with default_signals(signal.SIGINT, signal.SIGTERM):
try:
# Reset current signals before starting the process in
# order not to inherit the current signal handlers
process.start()
break
except OSError as exc:
if exc.errno not in (errno.EMFILE, errno.ENFILE):
raise
if attempt >= start_retries:
log.error(
"Failed to start process for jid %s due to open file limits",
data["jid"],
)
return
log.warning(
"Too many open files to start jid %s, retrying in %ss",
data["jid"],
fd_backoff,
)
await asyncio.sleep(fd_backoff)
else:
process.start()
self.subprocess_list.add(process)
Expand Down
32 changes: 20 additions & 12 deletions salt/payload.py
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ def format_payload(enc, **kwargs):
return package(payload)


def loads(msg, encoding=None, raw=False):
def loads(msg, encoding=None, raw=False, log_error=True):
"""
Run the correct loads serialization format

Expand All @@ -70,6 +70,7 @@ def loads(msg, encoding=None, raw=False):
been lost in this case) to what the encoding is
set as. In this case, it will fail if any of
the contents cannot be converted.
:param log_error: Log deserialization failures when True.
"""
try:

Expand Down Expand Up @@ -97,14 +98,15 @@ def ext_type_decoder(code, data):
if encoding is None and not raw:
ret = salt.transport.frame.decode_embedded_strs(ret)
except Exception as exc: # pylint: disable=broad-except
log.critical(
"Could not deserialize msgpack message. This often happens "
"when trying to read a file not in binary mode. "
"To see message payload, enable debug logging and retry. "
"Exception: %s",
exc,
)
log.debug("Msgpack deserialization failure on message: %s", msg)
if log_error:
log.critical(
"Could not deserialize msgpack message. This often happens "
"when trying to read a file not in binary mode. "
"To see message payload, enable debug logging and retry. "
"Exception: %s",
salt.utils.msgpack.format_exception(exc),
)
log.debug("Msgpack deserialization failure on message: %s", msg)
exc_msg = "Could not deserialize msgpack message. See log for more info."
raise SaltDeserializationError(exc_msg) from exc
finally:
Expand Down Expand Up @@ -200,14 +202,20 @@ def verylong_encoder(obj, context):
)


def load(fn_, raise_on_error=True):
    """
    Run the correct serialization to load a file.

    :param fn_: an open binary file-like object; it is always closed before
        this function returns, even if reading it fails.
    :param raise_on_error: when False, return None instead of raising
        SaltDeserializationError on corrupt msgpack data (and suppress the
        critical log from ``loads``).
    :return: the deserialized payload, or None for empty/unreadable data.
    """
    try:
        data = fn_.read()
    finally:
        # Close even when read() raises, so the descriptor never leaks.
        fn_.close()
    if not data:
        return None
    try:
        return loads(data, encoding="utf-8", log_error=raise_on_error)
    except SaltDeserializationError:
        if raise_on_error:
            raise
        return None


def dump(msg, fn_):
Expand Down
7 changes: 5 additions & 2 deletions salt/renderers/py.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,11 +142,14 @@ def render(template, saltenv="base", sls="", tmplpath=None, **kws):
if not os.path.isfile(template):
raise SaltRenderError(f"Template {template} is not a file!")

# Ensure python templates get a __salt__ object that supports dot notation
# consistently (including under salt-ssh wrappers).
salt_funcs = salt.utils.templates.wrap_salt_funcs(__salt__)
tmp_data = salt.utils.templates.py(
template,
True,
__salt__=__salt__,
salt=__salt__,
__salt__=salt_funcs,
salt=salt_funcs,
__grains__=__grains__,
grains=__grains__,
__opts__=__opts__,
Expand Down
23 changes: 23 additions & 0 deletions salt/utils/msgpack.py
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,29 @@ class _exceptions:

exceptions = _exceptions()


def is_extra_data_exception(exc):
    """
    Return True if the exception is a msgpack ExtraData error.
    """
    extra_data_cls = getattr(exceptions, "ExtraData", None)
    if extra_data_cls is None:
        # The msgpack build in use does not expose ExtraData at all.
        return False
    return isinstance(exc, extra_data_cls)


def format_exception(exc):
    """
    Return a helpful string for msgpack exceptions.

    For ExtraData errors, append the length of the trailing bytes when it
    can be determined; otherwise fall back to the plain string form.
    """
    if not is_extra_data_exception(exc):
        return str(exc)
    extra = getattr(exc, "extra", None)
    if extra is None:
        return str(exc)
    try:
        trailing = len(extra)
    except TypeError:
        # The extra payload has no length (unexpected type); skip the detail.
        return str(exc)
    return f"{exc} (extra {trailing} bytes)"

# One-to-one mappings
Packer = msgpack.Packer
ExtType = msgpack.ExtType
Expand Down
Loading