Skip to content

Commit df9784a

Browse files
committed
capture process output
1 parent 4cf62a5 commit df9784a

4 files changed

Lines changed: 115 additions & 21 deletions

File tree

.pre-commit-config.yaml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -11,7 +11,7 @@ repos:
1111
hooks:
1212
- id: black
1313
args: ["--line-length=120"]
14-
language_version: python3.9
14+
language_version: python3.10
1515
- repo: https://github.com/pre-commit/pre-commit-hooks
1616
rev: v4.3.0
1717
hooks:

taskbadger/cli.py

Lines changed: 26 additions & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,3 @@
1-
import subprocess
2-
import time
3-
from datetime import datetime
41
from typing import Optional, Tuple
52

63
import typer
@@ -10,6 +7,7 @@
107
from taskbadger import Action, StatusEnum, Task, __version__, integrations
118
from taskbadger.config import get_config, write_config
129
from taskbadger.exceptions import ConfigurationError
10+
from taskbadger.process import ProcessRunner
1311

1412
app = typer.Typer(
1513
rich_markup_mode="rich",
@@ -49,6 +47,7 @@ def run(
4947
show_default=False,
5048
help="Action definition e.g. 'success,error email to:me@email.com'",
5149
),
50+
capture_output: bool = typer.Option(False, help="Capture stdout and stderr."),
5251
):
5352
"""Execute a command using the CLI and create a Task to track its outcome.
5453
@@ -81,33 +80,43 @@ def run(
8180
else:
8281
print(f"Task created: {task.public_url}")
8382
env = {"TASKBADGER_TASK_ID": task.id} if task else None
84-
last_update = datetime.utcnow()
8583
try:
86-
process = subprocess.Popen(ctx.args, env=env, shell=True)
87-
while process.poll() is None:
88-
try:
89-
time.sleep(0.1)
90-
if task and _should_update_task(last_update, update_frequency):
91-
last_update = datetime.utcnow()
92-
task.ping()
93-
except Exception as e:
94-
err_console.print(f"Error updating task status: {e}")
84+
process = ProcessRunner(ctx.args, env, capture_output=capture_output, update_frequency=update_frequency)
85+
for output in process.run():
86+
_update_task(task, **(output or {}))
9587
except Exception as e:
96-
task and task.error(data={"exception": str(e)})
88+
_update_task(task, exception=str(e))
9789
raise typer.Exit(1)
9890

9991
if task:
10092
if process.returncode == 0:
10193
task.success(value=100)
10294
else:
103-
task.error(data={"return_code": process.returncode})
95+
_update_task(task, status=StatusEnum.ERROR, return_code=process.returncode)
10496

10597
if process.returncode != 0:
10698
raise typer.Exit(process.returncode)
10799

108100

109-
def _should_update_task(last_update: datetime, update_frequency_seconds):
110-
return (datetime.utcnow() - last_update).total_seconds() >= update_frequency_seconds
101+
def _update_task(task, status=None, **data_kwargs):
102+
"""Update the task and merge the data"""
103+
if not task:
104+
return
105+
106+
task_data = task.data or {}
107+
for key, value in data_kwargs.items():
108+
if key in ("stdout", "stderr"):
109+
if key in task_data and value:
110+
task_data[key] += value
111+
elif value:
112+
task_data[key] = value
113+
else:
114+
task_data[key] = value
115+
116+
try:
117+
task.update(status=status, data=task_data or None)
118+
except Exception as e:
119+
err_console.print(f"Error updating task status: {e}")
111120

112121

113122
@app.command()

taskbadger/process.py

Lines changed: 72 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,72 @@
1+
import subprocess
2+
import threading
3+
import time
4+
from datetime import datetime
5+
6+
7+
class ProcessRunner:
8+
def __init__(self, process_args, env, capture_output: bool, update_frequency: int = 5):
9+
self.process_args = process_args
10+
self.env = env
11+
self.capture_output = capture_output
12+
self.update_frequency = update_frequency
13+
self.returncode = None
14+
15+
def run(self):
16+
last_update = datetime.utcnow()
17+
18+
kwargs = {}
19+
if self.capture_output:
20+
kwargs["stdout"] = subprocess.PIPE
21+
kwargs["stderr"] = subprocess.PIPE
22+
23+
process = subprocess.Popen(self.process_args, env=self.env, shell=True, **kwargs)
24+
if self.capture_output:
25+
stdout = Reader(process.stdout).start()
26+
stderr = Reader(process.stderr).start()
27+
28+
while process.poll() is None:
29+
time.sleep(0.1)
30+
if _should_update(last_update, self.update_frequency):
31+
last_update = datetime.utcnow()
32+
if self.capture_output:
33+
yield {"stdout": stdout.read(), "stderr": stderr.read()}
34+
else:
35+
yield
36+
37+
self.returncode = process.returncode
38+
39+
40+
class Reader:
41+
def __init__(self, source):
42+
self.source = source
43+
self.data = []
44+
self._lock = threading.Lock()
45+
46+
def start(self):
47+
self._thread = threading.Thread(name="reader-thread", target=self._reader, daemon=True)
48+
self._thread.start()
49+
return self
50+
51+
def _reader(self):
52+
"""Read data from source until EOF, adding it to collector."""
53+
while True:
54+
data = self.source.read1().decode()
55+
self._lock.acquire()
56+
self.data.append(data)
57+
self._lock.release()
58+
if not data:
59+
break
60+
return
61+
62+
def read(self):
63+
"""Read data written by the process to its standard output."""
64+
self._lock.acquire()
65+
outdata = "".join(self.data)
66+
del self.data[:]
67+
self._lock.release()
68+
return outdata
69+
70+
71+
def _should_update(last_update: datetime, update_frequency_seconds):
72+
return (datetime.utcnow() - last_update).total_seconds() >= update_frequency_seconds

tests/test_cli_run.py

Lines changed: 16 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -7,7 +7,7 @@
77

88
from taskbadger.cli import app
99
from taskbadger.internal.models import PatchedTaskRequest, PatchedTaskRequestData, StatusEnum, TaskRequest
10-
from taskbadger.internal.types import Response
10+
from taskbadger.internal.types import UNSET, Response
1111
from taskbadger.sdk import Badger
1212
from tests.utils import task_for_test
1313

@@ -36,8 +36,20 @@ def test_cli_long_run():
3636
def _should_update_task(last_update, update_frequency_seconds):
3737
return True
3838

39-
with mock.patch("taskbadger.cli._should_update_task", new=_should_update_task):
40-
_test_cli_run(["echo test; sleep 0.11"], 0, args=["task_name"], update_call_count=3)
39+
with mock.patch("taskbadger.process._should_update", new=_should_update_task):
40+
update_patch = _test_cli_run(
41+
["echo test; sleep 0.11"], 0, args=["task_name", "--capture-output"], update_call_count=3
42+
)
43+
44+
# make sure the output was captured
45+
body = PatchedTaskRequest(status=UNSET, data=PatchedTaskRequestData.from_dict({"stdout": "test\n"}))
46+
update_patch.assert_any_call(
47+
client=Badger.current.settings.client,
48+
organization_slug="org",
49+
project_slug="project",
50+
id="test_id",
51+
json_body=body,
52+
)
4153

4254

4355
def test_cli_run_error():
@@ -95,3 +107,4 @@ def _test_cli_run(command, return_code, args=None, action=None, update_call_coun
95107
update.assert_called_with(
96108
client=settings.client, organization_slug="org", project_slug="project", id="test_id", json_body=body
97109
)
110+
return update

0 commit comments

Comments
 (0)