-
Notifications
You must be signed in to change notification settings - Fork 3
Expand file tree
/
Copy pathmyfuncs.py
More file actions
143 lines (112 loc) · 4.6 KB
/
myfuncs.py
File metadata and controls
143 lines (112 loc) · 4.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
import errno
import logging
import os
import signal
import time
from collections.abc import Callable
from functools import wraps
from types import FrameType
from typing import Any, Optional, TypeVar
logging.basicConfig(format="%(asctime)s %(message)s")
F = TypeVar("F", bound=Callable[..., Any])
def timeit(some_func: F) -> F:
"""Decorator that logs how long the wrapped function took to run (at DEBUG level)."""
@wraps(some_func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
t1: float = time.perf_counter()
result: Any = some_func(*args, **kwargs)
diff: float = time.perf_counter() - t1
logging.debug("%s completed in %.5f seconds.", some_func.__name__, diff)
return result
return wrapper # type: ignore[return-value]
# Valid /proc/status memory scale suffixes → multiplier in bytes
_VM_SCALE: dict[str, float] = {
"kB": 1024.0,
"mB": 1024.0 * 1024.0, # non-standard but seen in the wild
"KB": 1024.0,
"MB": 1024.0 * 1024.0,
}
def memory_usage(vm_key: str = "VmRSS:") -> int:
"""Return the current process's memory usage in bytes for the given /proc/status key.
Defaults to VmRSS (resident set size). Returns 0 on non-Linux systems or on
any parse failure.
Common keys: VmRSS (resident), VmSize (virtual), VmPeak (peak virtual).
"""
try:
with open(f"/proc/{os.getpid()}/status") as f:
status: str = f.read()
except OSError:
return 0 # non-Linux or permission denied
try:
# Find the key line, e.g. "VmRSS: 9999 kB\n"
idx: int = status.index(vm_key)
parts: list[str] = status[idx:].split(None, 3)
if len(parts) < 3:
return 0
scale: float = _VM_SCALE.get(parts[2], 0.0)
if scale == 0.0:
return 0
return round(float(parts[1]) * scale)
except (ValueError, IndexError):
return 0
def daemonize() -> None:
"""Detach the current process from the terminal and run it as a daemon.
Uses the standard UNIX double-fork technique to ensure the daemon cannot
re-acquire a controlling terminal. Safe on Python 3 on any POSIX system.
"""
import resource
pid: int = os.fork()
if pid == 0: # First child
os.setsid() # Create a new session; detach from controlling terminal
signal.signal(signal.SIGHUP, signal.SIG_IGN) # Ignore SIGHUP
pid = os.fork() # Second fork: prevent re-acquiring a terminal
if pid != 0:
os._exit(0) # Exit first child; grandchild continues
os.umask(0) # Clear umask so daemon can create files with any permissions
else:
os._exit(0) # Exit the original parent
# Close all open file descriptors so the daemon doesn't hold onto
# inherited handles (sockets, pipes, log files, etc.)
maxfd: int = resource.getrlimit(resource.RLIMIT_NOFILE)[1]
if maxfd == resource.RLIM_INFINITY:
maxfd = 1024 # POSIX minimum; use as a safe fallback
for fd in range(maxfd):
try:
os.close(fd)
except OSError:
pass # fd wasn't open; that's fine
null_fd: int = os.open(os.devnull, os.O_RDWR) # Opens as fd 0 (stdin → /dev/null)
os.dup2(null_fd, 1) # stdout → /dev/null
os.dup2(null_fd, 2) # stderr → /dev/null
class TimedOutError(Exception):
"""Raised by the @timeout decorator when a function exceeds its time limit."""
pass
def timeout(
seconds: float = 10.0,
error_message: str = os.strerror(errno.ETIME),
) -> Callable[[F], F]:
"""Decorator that raises TimedOutError if the wrapped function runs too long.
Usage:
@timeout(2.5, 'That took way too long')
def my_func(some, args):
try:
do_something_that_might_hang()
except TimedOutError as e:
logging.debug("Timed out: %s", e)
Signal-based timeouts are not thread-safe. Only use this decorator
on the main thread. If you need thread-safe timeouts, use concurrent.futures
with a ThreadPoolExecutor or ProcessPoolExecutor instead.
"""
def decorator(func: F) -> F:
def _timed_out(signum: int, frame: Optional[FrameType]) -> None:
raise TimedOutError(error_message)
@wraps(func)
def wrapper(*args: Any, **kwargs: Any) -> Any:
signal.signal(signal.SIGALRM, _timed_out)
signal.setitimer(signal.ITIMER_REAL, seconds)
try:
return func(*args, **kwargs)
finally:
signal.setitimer(signal.ITIMER_REAL, 0)
return wrapper # type: ignore[return-value]
return decorator # type: ignore[return-value]