-
Notifications
You must be signed in to change notification settings - Fork 1.5k
[AI-4964] Add support for timeout in process isolation #22975
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
8c5ea0a
bf90e1b
f3de605
f0185ea
c14de53
5681255
d1675a4
58168c7
6565292
ff6c92c
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| Add support for timeout paramter to use with process_isolation. |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -4,6 +4,8 @@ | |
| import os | ||
| import subprocess | ||
| import sys | ||
| import threading | ||
| from contextlib import contextmanager, nullcontext | ||
|
|
||
| from datadog_checks.base.utils.common import ensure_bytes, to_native_string | ||
| from datadog_checks.base.utils.format import json | ||
|
|
@@ -12,6 +14,16 @@ | |
| from .constants import KNOWN_DATADOG_AGENT_SETTER_METHODS, EnvVars | ||
|
|
||
|
|
||
| @contextmanager | ||
| def _timer(timeout, callback): | ||
| timer = threading.Timer(timeout, callback) | ||
| timer.start() | ||
| try: | ||
| yield | ||
| finally: | ||
| timer.cancel() | ||
|
|
||
|
|
||
| def run_with_isolation(check, aggregator, datadog_agent): | ||
| message_indicator = os.urandom(8).hex() | ||
| instance = dict(check.instance) | ||
|
|
@@ -21,6 +33,10 @@ def run_with_isolation(check, aggregator, datadog_agent): | |
| instance.pop('process_isolation', None) | ||
| init_config.pop('process_isolation', None) | ||
|
|
||
| timeout = instance.pop('process_isolation_timeout', init_config.pop('process_isolation_timeout', None)) | ||
| if timeout is not None: | ||
| timeout = float(timeout) | ||
|
|
||
| env_vars = dict(os.environ) | ||
| env_vars[EnvVars.MESSAGE_INDICATOR] = message_indicator | ||
| env_vars[EnvVars.CHECK_NAME] = check.name | ||
|
|
@@ -47,37 +63,54 @@ def run_with_isolation(check, aggregator, datadog_agent): | |
| stderr=subprocess.STDOUT, | ||
| env=env_vars, | ||
| ) | ||
| timed_out = False | ||
|
|
||
| def _kill_on_timeout(): | ||
| nonlocal timed_out | ||
| timed_out = True | ||
| # currently this only kills the parent process; does not support checks that spawn sub-processes | ||
| process.kill() | ||
|
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Useful? React with 👍 / 👎.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. This is a good point however the
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. imho TODOs really should reflect things we plan to do later. This sounds more like something we don't know we want yet. So I'd rather we have a comment that says something about that.
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Mhm ok good point, I'll make a note to that extent |
||
|
|
||
| timer = _timer(timeout, _kill_on_timeout) if timeout is not None else nullcontext() | ||
|
|
||
| with process: | ||
| check.log.info('Running check in a separate process') | ||
| with timer: | ||
| # To avoid blocking never use a pipe's file descriptor iterator. See https://bugs.python.org/issue3907 | ||
| for line in iter(process.stdout.readline, b''): | ||
| line = line.rstrip().decode('utf-8') | ||
| indicator, _, procedure = line.partition(':') | ||
| if indicator != message_indicator: | ||
| check.log.debug(line) | ||
| continue | ||
|
|
||
| check.log.trace(line) | ||
|
|
||
| message_type, _, message = procedure.partition(':') | ||
| message = json.decode(message) | ||
| if message_type == 'aggregator': | ||
| getattr(aggregator, message['method'])(check, *message['args'], **message['kwargs']) | ||
| elif message_type == 'log': | ||
| getattr(check.log, message['method'])(*message['args']) | ||
| elif message_type == 'datadog_agent': | ||
| method = message['method'] | ||
| value = getattr(datadog_agent, method)(*message['args'], **message['kwargs']) | ||
| if method not in KNOWN_DATADOG_AGENT_SETTER_METHODS: | ||
| try: | ||
| process.stdin.write(b'%s\n' % ensure_bytes(json.encode({'value': value}))) | ||
| process.stdin.flush() | ||
| except BrokenPipeError: | ||
| break | ||
| elif message_type == 'error': | ||
| check.log.error(message[0]['traceback']) | ||
| break | ||
| else: | ||
| check.log.error( | ||
| 'Unknown message type encountered during communication with the isolated process: %s', | ||
| message_type, | ||
| ) | ||
| break | ||
|
|
||
| # To avoid blocking never use a pipe's file descriptor iterator. See https://bugs.python.org/issue3907 | ||
| for line in iter(process.stdout.readline, b''): | ||
| line = line.rstrip().decode('utf-8') | ||
| indicator, _, procedure = line.partition(':') | ||
| if indicator != message_indicator: | ||
| check.log.debug(line) | ||
| continue | ||
|
|
||
| check.log.trace(line) | ||
|
|
||
| message_type, _, message = procedure.partition(':') | ||
| message = json.decode(message) | ||
| if message_type == 'aggregator': | ||
| getattr(aggregator, message['method'])(check, *message['args'], **message['kwargs']) | ||
| elif message_type == 'log': | ||
| getattr(check.log, message['method'])(*message['args']) | ||
| elif message_type == 'datadog_agent': | ||
| method = message['method'] | ||
| value = getattr(datadog_agent, method)(*message['args'], **message['kwargs']) | ||
| if method not in KNOWN_DATADOG_AGENT_SETTER_METHODS: | ||
| process.stdin.write(b'%s\n' % ensure_bytes(json.encode({'value': value}))) | ||
| process.stdin.flush() | ||
| elif message_type == 'error': | ||
| check.log.error(message[0]['traceback']) | ||
| break | ||
| else: | ||
| check.log.error( | ||
| 'Unknown message type encountered during communication with the isolated process: %s', | ||
| message_type, | ||
| ) | ||
| break | ||
| if timed_out: | ||
| check.log.error('Check timed out after %s seconds', timeout) | ||
| check.warning('Check timed out and possibly reported incomplete data.') | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What's stopping us from having this as an instance variable?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
run_with_isolationis a standalone method; we could set it on the check itself since we pass it torun_with_isolationbut then we'd have thetimed_outvariable on the check class when the majority of the time we don't even run with process isolation. I don't have a strong opinion either way, thoughThere was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
can be done separately, but wdyt of having a
process_isolation_configorprocess_isolation_stateon the check class? We'd define some dataclass withtimed_outas a field, but mb others later.That way we acknowledge the theoretical need for this state without making claims about it being a specific value?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Hm that would be cleaner! I will think about it a little more and follow up in a separate PR