Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions datadog_checks_base/changelog.d/22975.added
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
Add support for a timeout parameter to use with process_isolation.
95 changes: 64 additions & 31 deletions datadog_checks_base/datadog_checks/base/utils/replay/execute.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,8 @@
import os
import subprocess
import sys
import threading
from contextlib import contextmanager, nullcontext

from datadog_checks.base.utils.common import ensure_bytes, to_native_string
from datadog_checks.base.utils.format import json
Expand All @@ -12,6 +14,16 @@
from .constants import KNOWN_DATADOG_AGENT_SETTER_METHODS, EnvVars


@contextmanager
def _timer(timeout, callback):
timer = threading.Timer(timeout, callback)
timer.start()
try:
yield
finally:
timer.cancel()


def run_with_isolation(check, aggregator, datadog_agent):
message_indicator = os.urandom(8).hex()
instance = dict(check.instance)
Expand All @@ -21,6 +33,10 @@ def run_with_isolation(check, aggregator, datadog_agent):
instance.pop('process_isolation', None)
init_config.pop('process_isolation', None)

timeout = instance.pop('process_isolation_timeout', init_config.pop('process_isolation_timeout', None))
if timeout is not None:
timeout = float(timeout)

env_vars = dict(os.environ)
env_vars[EnvVars.MESSAGE_INDICATOR] = message_indicator
env_vars[EnvVars.CHECK_NAME] = check.name
Expand All @@ -47,37 +63,54 @@ def run_with_isolation(check, aggregator, datadog_agent):
stderr=subprocess.STDOUT,
env=env_vars,
)
timed_out = False

def _kill_on_timeout():
nonlocal timed_out
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What's stopping us from having this as an instance variable?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

run_with_isolation is a standalone method; we could set it on the check itself since we pass it to run_with_isolation but then we'd have the timed_out variable on the check class when the majority of the time we don't even run with process isolation. I don't have a strong opinion either way, though

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

can be done separately, but wdyt of having a process_isolation_config or process_isolation_state on the check class? We'd define some dataclass with timed_out as a field, but mb others later.

That way we acknowledge the theoretical need for this state without making claims about it being a specific value?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Hm that would be cleaner! I will think about it a little more and follow up in a separate PR

timed_out = True
# currently this only kills the parent process; does not support checks that spawn sub-processes
process.kill()
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

P1 Badge Terminate the entire isolated process tree on timeout

process.kill() only stops the wrapper Python process here. Checks can legitimately spawn child commands (for example process/datadog_checks/process/process.py:244-246 and tibco_ems/datadog_checks/tibco_ems/tibco_ems.py:81-88), and with process_isolation_timeout enabled those descendants will keep running after the wrapper is killed because this subprocess was not started in its own session/process group. In those integrations, repeated timeouts would leak orphaned commands and the check is not actually "stopped" as advertised.

Useful? React with 👍 / 👎.

Copy link
Copy Markdown
Contributor Author

@sarah-witt sarah-witt Mar 19, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is a good point however the process_isolation feature is still experimental and the timeout in particular only will be used on vSphere, which doesn't spawn processes. I added a TODO, but reviewers let me know if I should address!

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

imho TODOs really should reflect things we plan to do later. This sounds more like something we don't know we want yet. So I'd rather we have a comment that says something about that.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Mhm ok good point, I'll make a note to that extent


timer = _timer(timeout, _kill_on_timeout) if timeout is not None else nullcontext()

with process:
check.log.info('Running check in a separate process')
with timer:
# To avoid blocking never use a pipe's file descriptor iterator. See https://bugs.python.org/issue3907
for line in iter(process.stdout.readline, b''):
line = line.rstrip().decode('utf-8')
indicator, _, procedure = line.partition(':')
if indicator != message_indicator:
check.log.debug(line)
continue

check.log.trace(line)

message_type, _, message = procedure.partition(':')
message = json.decode(message)
if message_type == 'aggregator':
getattr(aggregator, message['method'])(check, *message['args'], **message['kwargs'])
elif message_type == 'log':
getattr(check.log, message['method'])(*message['args'])
elif message_type == 'datadog_agent':
method = message['method']
value = getattr(datadog_agent, method)(*message['args'], **message['kwargs'])
if method not in KNOWN_DATADOG_AGENT_SETTER_METHODS:
try:
process.stdin.write(b'%s\n' % ensure_bytes(json.encode({'value': value})))
process.stdin.flush()
except BrokenPipeError:
break
elif message_type == 'error':
check.log.error(message[0]['traceback'])
break
else:
check.log.error(
'Unknown message type encountered during communication with the isolated process: %s',
message_type,
)
break

# To avoid blocking never use a pipe's file descriptor iterator. See https://bugs.python.org/issue3907
for line in iter(process.stdout.readline, b''):
line = line.rstrip().decode('utf-8')
indicator, _, procedure = line.partition(':')
if indicator != message_indicator:
check.log.debug(line)
continue

check.log.trace(line)

message_type, _, message = procedure.partition(':')
message = json.decode(message)
if message_type == 'aggregator':
getattr(aggregator, message['method'])(check, *message['args'], **message['kwargs'])
elif message_type == 'log':
getattr(check.log, message['method'])(*message['args'])
elif message_type == 'datadog_agent':
method = message['method']
value = getattr(datadog_agent, method)(*message['args'], **message['kwargs'])
if method not in KNOWN_DATADOG_AGENT_SETTER_METHODS:
process.stdin.write(b'%s\n' % ensure_bytes(json.encode({'value': value})))
process.stdin.flush()
elif message_type == 'error':
check.log.error(message[0]['traceback'])
break
else:
check.log.error(
'Unknown message type encountered during communication with the isolated process: %s',
message_type,
)
break
if timed_out:
check.log.error('Check timed out after %s seconds', timeout)
check.warning('Check timed out and possibly reported incomplete data.')
19 changes: 19 additions & 0 deletions datadog_checks_base/tests/base/utils/replay/test_check.py
Original file line number Diff line number Diff line change
Expand Up @@ -62,3 +62,22 @@ def test_replay_all(caplog, dd_run_check, aggregator, datadog_agent, init_config
break
else:
raise AssertionError('Expected DEBUG log with message: {}'.format(expected_message))


class SlowReplayCheck(AgentCheck):
    """Check whose run sleeps far longer than any sane timeout.

    Used to exercise the `process_isolation_timeout` kill path: the isolated
    process must be terminated long before this sleep completes.
    """

    __NAMESPACE__ = 'slow_replay'

    def check(self, _):
        # Imported locally so the module itself stays import-side-effect free.
        from time import sleep

        sleep(9999)


def test_replay_timeout(caplog, dd_run_check):
    """A check exceeding `process_isolation_timeout` is killed and the timeout is logged as an error."""
    check = SlowReplayCheck(
        'slow_replay',
        {},
        [{'process_isolation': True, 'process_isolation_timeout': 1}],
    )

    with caplog.at_level(logging.ERROR):
        dd_run_check(check)

    assert 'Check timed out' in caplog.text
Loading