Skip to content

Commit 8115469

Browse files
authored
Merge pull request #3 from python/mv
port Minivisor
2 parents 3fb29a6 + a43994c commit 8115469

File tree

4 files changed

+396
-2
lines changed

4 files changed

+396
-2
lines changed

Dockerfile

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,8 @@ RUN pip install --no-cache-dir -r requirements.txt
4343
# Copy the backend source code
4444
COPY ./backend/ .
4545

46-
4746
# Copy the nextjs application
4847
COPY --from=frontend-builder --chown=nobody /app/.next/standalone ./
4948
COPY --from=frontend-builder --chown=nobody /app/.next/static ./.next/static
49+
50+
COPY ./scripts ./scripts

Procfile

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,3 @@
11
web-backend: python -m uvicorn app.main:app --uds /var/run/cabotage/cabotage.sock
2-
web-frontend: node server.js & socat UNIX-LISTEN:/var/run/cabotage/cabotage.sock,fork TCP:127.0.0.1:3000
2+
web-frontend: ./scripts/start-frontend
33
release: echo 'doin deploy things'

scripts/mv.py

Lines changed: 371 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,371 @@
1+
#!/usr/bin/env python3.7
2+
# This file runs on Debian Buster and needs to be Python 3.7 compatible.
3+
4+
from __future__ import annotations
5+
from typing import Callable, Coroutine, TYPE_CHECKING
6+
from types import FrameType
7+
8+
import asyncio
9+
import asyncio.subprocess
10+
import dataclasses
11+
import shutil
12+
import signal
13+
import sys
14+
15+
16+
if TYPE_CHECKING:
17+
# a coroutine function that doesn't accept arguments and whose coroutine doesn't
18+
# return anything
19+
SimpleCoroutineFunction = Callable[[], Coroutine[None, None, None]]
20+
21+
22+
async def gracefully_close(proc: asyncio.subprocess.Process, cmdline: str) -> int:
23+
prefix = make_prefix(cmdline)
24+
prefix_e = make_prefix(cmdline, err=True)
25+
if proc.returncode is not None:
26+
print(
27+
f"{prefix}PID {proc.pid} exited with status code {proc.returncode}",
28+
flush=True,
29+
)
30+
return proc.returncode
31+
32+
print(f"{prefix}Asking PID {proc.pid} to terminate...", flush=True)
33+
proc.terminate()
34+
try:
35+
await asyncio.wait_for(proc.wait(), timeout=10.0)
36+
if proc.returncode is not None:
37+
print(f"{prefix}PID {proc.pid} successfully terminated", flush=True)
38+
return proc.returncode
39+
except BaseException:
40+
pass
41+
42+
print(f"{prefix_e}Killing PID {proc.pid} forcefully...", flush=True)
43+
proc.kill()
44+
try:
45+
await asyncio.wait_for(proc.wait(), timeout=2.0)
46+
if proc.returncode is not None:
47+
print(f"{prefix_e}PID {proc.pid} successfully killed", flush=True)
48+
return proc.returncode
49+
except BaseException:
50+
pass
51+
52+
return -1024
53+
54+
55+
def make_prefix(
56+
cmdline: str, out: bool = False, err: bool = False, maxlength: int = 24
57+
) -> str:
58+
if len(cmdline) > maxlength:
59+
cmdline = cmdline[: maxlength - 1] + "…"
60+
kind = " "
61+
if out:
62+
kind = " →"
63+
elif err:
64+
kind = "!!"
65+
padding = " " * (maxlength - len(cmdline))
66+
return f"{cmdline}{padding} {kind} "
67+
68+
69+
def censor(s: str) -> str:
70+
if s.startswith("--backend-dsn="):
71+
return "--backend-dsn=********"
72+
if s.startswith("--dsn="):
73+
return "--dsn=********"
74+
return s
75+
76+
77+
@dataclasses.dataclass(init=False)
78+
class Minivisor:
79+
"""A tiny process supervisor.
80+
81+
It only gathers output from subprocesses and closes all if any of them dies.
82+
It passes SIGHUP, SIGINT, and SIGTERM but it doesn't multiplex sockets or do
83+
anything else fancy.
84+
"""
85+
86+
processes: dict[str, asyncio.subprocess.Process]
87+
waiters: list[asyncio.Task[int]]
88+
followers: list[asyncio.Task[None]]
89+
out: asyncio.Queue[bytes]
90+
display: asyncio.Task[None]
91+
92+
def __init__(self):
93+
self.processes = {}
94+
self.waiters = []
95+
self.followers = []
96+
self.out = asyncio.Queue()
97+
self.display = asyncio.create_task(self.display_out())
98+
self._is_shutting_down = False
99+
100+
loop = asyncio.get_event_loop()
101+
loop.add_signal_handler(signal.SIGHUP, self.signal_passer)
102+
loop.add_signal_handler(signal.SIGINT, self.signal_passer)
103+
loop.add_signal_handler(signal.SIGTERM, self.signal_passer)
104+
105+
def signal_passer(self, sig: int = 0, frame: FrameType | None = None) -> None:
106+
if not sig:
107+
return
108+
109+
for proc in reversed(self.processes):
110+
proc.send_signal(sig)
111+
112+
async def spawn(
113+
self,
114+
*args: str,
115+
with_healthcheck: SimpleCoroutineFunction | None = None,
116+
grace_period: float = 10.0,
117+
sleep_period: float = 60.0,
118+
) -> None:
119+
"""Spawn a new process with `exec` and wait for initial healthcheck to pass."""
120+
121+
exe = shutil.which(args[0])
122+
if not exe:
123+
raise RuntimeError(f"Missing {args[0]} executable")
124+
125+
cmdline = " ".join(censor(a) for a in args)
126+
prefix_str = make_prefix(cmdline)
127+
prefix_out = make_prefix(cmdline, out=True).encode()
128+
prefix_err = make_prefix(cmdline, err=True).encode()
129+
proc = await asyncio.create_subprocess_exec(
130+
exe,
131+
*args[1:],
132+
stdout=asyncio.subprocess.PIPE,
133+
stderr=asyncio.subprocess.PIPE,
134+
)
135+
await self.out.put(
136+
f"{prefix_str}PID {proc.pid} spawned daemon '{cmdline}'".encode("utf8")
137+
)
138+
initial_pass = asyncio.Future()
139+
waiter_task = asyncio.create_task(
140+
self.check_health(
141+
proc,
142+
cmdline,
143+
with_healthcheck or empty_healthcheck,
144+
initial_pass=initial_pass,
145+
grace_period=grace_period,
146+
sleep_period=sleep_period,
147+
)
148+
)
149+
stdout_task = asyncio.create_task(self.follow(prefix_out, proc.stdout))
150+
stderr_task = asyncio.create_task(self.follow(prefix_err, proc.stderr))
151+
self.processes[cmdline] = proc
152+
self.waiters.append(waiter_task)
153+
self.followers.append(stdout_task)
154+
self.followers.append(stderr_task)
155+
if not await initial_pass:
156+
# Healthchecks are not optional.
157+
await self.out.put(
158+
prefix_err + b"Initial health check failed, shutting down."
159+
)
160+
await self.shutdown()
161+
raise RuntimeError("Cannot continue without all processes healthy")
162+
else:
163+
await self.out.put(
164+
prefix_str.encode("utf8") + b"Initial health check passed."
165+
)
166+
167+
async def once(
168+
self,
169+
*args: str,
170+
input: bytes | None = None,
171+
require_clean_return_code: bool = True,
172+
) -> int:
173+
"""Spawn a short-lived process."""
174+
175+
exe = shutil.which(args[0])
176+
if not exe:
177+
raise RuntimeError(f"Missing {args[0]} executable")
178+
179+
cmdline = " ".join(censor(a) for a in args)
180+
prefix_str = make_prefix(cmdline)
181+
prefix_out = make_prefix(cmdline, out=True).encode()
182+
prefix_err = make_prefix(cmdline, err=True).encode()
183+
proc = await asyncio.create_subprocess_exec(
184+
exe,
185+
*args[1:],
186+
stdout=asyncio.subprocess.PIPE,
187+
stderr=asyncio.subprocess.PIPE,
188+
stdin=asyncio.subprocess.PIPE,
189+
)
190+
await self.out.put(
191+
f"{prefix_str}PID {proc.pid} running command '{cmdline}'".encode("utf8")
192+
)
193+
stdout_task = asyncio.create_task(self.follow(prefix_out, proc.stdout))
194+
stderr_task = asyncio.create_task(self.follow(prefix_err, proc.stderr))
195+
try:
196+
try:
197+
if input is not None:
198+
proc.stdin.write(input)
199+
try:
200+
await proc.stdin.drain()
201+
except (BrokenPipeError, ConnectionResetError):
202+
pass
203+
proc.stdin.close()
204+
205+
await proc.wait()
206+
finally:
207+
return_code = await gracefully_close(proc, cmdline)
208+
stdout_task.cancel()
209+
stderr_task.cancel()
210+
await asyncio.wait([stdout_task, stderr_task], timeout=2.0)
211+
finally:
212+
if not require_clean_return_code or return_code == 0:
213+
return return_code # continue, even if the world is burning
214+
215+
await self.out.put(
216+
prefix_err + b"Return code isn't zero: " + f"{return_code}".encode()
217+
)
218+
await self.shutdown()
219+
raise RuntimeError("Cannot continue without this command succeeding")
220+
221+
async def wait_until_any_terminates(self) -> None:
222+
if self._is_shutting_down:
223+
return
224+
225+
try:
226+
await asyncio.wait(self.waiters, return_when=asyncio.FIRST_COMPLETED)
227+
finally:
228+
await self.shutdown()
229+
230+
async def display_out(self) -> None:
231+
while True:
232+
line = await self.out.get()
233+
if line[-1] != b"\n":
234+
line += b"\n"
235+
sys.stdout.buffer.write(line)
236+
sys.stdout.flush()
237+
238+
async def follow(self, prefix: bytes, s: asyncio.StreamReader) -> None:
239+
"""Generates lines."""
240+
accu = prefix
241+
while not s.at_eof():
242+
try:
243+
line = await asyncio.wait_for(s.readuntil(b"\n"), timeout=1.0)
244+
for li in line.splitlines():
245+
if li.strip():
246+
await self.out.put(accu + li)
247+
accu = prefix
248+
except asyncio.LimitOverrunError:
249+
# a lot of characters without a newline; let's just accumulate them
250+
accu += await s.read(2 ** 16)
251+
except asyncio.TimeoutError:
252+
# no data coming or no \n; wait a bit longer
253+
continue
254+
except asyncio.IncompleteReadError as ire:
255+
# reached EOF without a newline; let's display what we got and exit
256+
if ire.partial:
257+
await self.out.put(accu + ire.partial)
258+
return
259+
except asyncio.CancelledError:
260+
# follow() is being cancelled, let's flush what we got so far
261+
if accu != prefix:
262+
try:
263+
self.out.put_nowait(accu)
264+
except asyncio.QueueFull:
265+
pass
266+
raise
267+
268+
async def shutdown(self) -> None:
269+
if self._is_shutting_down:
270+
return
271+
272+
self._is_shutting_down = True
273+
for waiter in self.waiters:
274+
# Sic, cancel all waiters, including possibly done ones, because
275+
# in this `finally:` block we might be in the middle of an exception.
276+
waiter.cancel()
277+
for cmdline, proc in reversed(list(self.processes.items())):
278+
# Sic, serially close in reverse order.
279+
await gracefully_close(proc, cmdline=cmdline)
280+
281+
# At this point all followers should be finished but let's ensure that.
282+
if self.followers:
283+
for follower in self.followers:
284+
follower.cancel()
285+
await asyncio.wait(self.followers, timeout=2.0)
286+
287+
# Finally we can close our output queue display.
288+
self.display.cancel()
289+
await asyncio.wait([self.display], timeout=2.0)
290+
291+
async def is_unhealthy(
292+
self,
293+
proc: asyncio.subprocess.Process,
294+
cmdline: str,
295+
hc: SimpleCoroutineFunction,
296+
) -> bool:
297+
"""Return True if healthcheck failed."""
298+
299+
prefix = make_prefix(cmdline, err=True)
300+
failed = False
301+
try:
302+
await hc()
303+
except Exception as exc:
304+
failed = True
305+
for line in str(exc).splitlines():
306+
if line.strip():
307+
line = "Health: " + prefix + line
308+
await self.out.put(line.encode())
309+
return failed or proc.returncode is not None
310+
311+
async def check_health(
312+
self,
313+
proc: asyncio.subprocess.Process,
314+
cmdline: str,
315+
hc: SimpleCoroutineFunction,
316+
grace_period: float = 10.0,
317+
sleep_period: float = 60.0,
318+
initial_pass: asyncio.Future | None = None,
319+
) -> None:
320+
failures = 0
321+
await asyncio.sleep(grace_period)
322+
while True:
323+
if await self.is_unhealthy(proc, cmdline, hc):
324+
failures += 1
325+
else:
326+
if initial_pass is not None:
327+
initial_pass.set_result(True)
328+
initial_pass = None
329+
failures = 0
330+
if failures == 3:
331+
await gracefully_close(proc, cmdline)
332+
if initial_pass is not None:
333+
initial_pass.set_result(False)
334+
initial_pass = None
335+
return
336+
try:
337+
sleep_sec = sleep_period if initial_pass is None else grace_period
338+
await asyncio.wait_for(proc.wait(), timeout=sleep_sec)
339+
if initial_pass is not None:
340+
initial_pass.set_result(False)
341+
initial_pass = None
342+
return
343+
except asyncio.TimeoutError:
344+
continue
345+
346+
347+
async def empty_healthcheck() -> None:
348+
return
349+
350+
351+
async def selftest() -> None:
352+
i = 0
353+
async def _failing_recovering_healthcheck():
354+
nonlocal i
355+
await asyncio.sleep(2.0)
356+
if i % 3 == 0:
357+
i += 1
358+
raise RuntimeError("healthcheck failed synthetically")
359+
i += 1
360+
361+
mv = Minivisor()
362+
await mv.spawn("tail", "-F", "/var/log/system.log")
363+
await mv.spawn(
364+
"python3", "-u", "-m", "http.server", with_healthcheck=_failing_recovering_healthcheck
365+
)
366+
await mv.spawn("tail", "-F", "/var/log/syslog")
367+
await mv.wait_until_any_terminates()
368+
369+
370+
if __name__ == "__main__":
371+
asyncio.run(selftest())

0 commit comments

Comments
 (0)