-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathbattery-arm32.py
More file actions
128 lines (116 loc) · 3.6 KB
/
battery-arm32.py
File metadata and controls
128 lines (116 loc) · 3.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
from BaseHTTPServer import BaseHTTPRequestHandler, HTTPServer
import subprocess
import simplejson as json
import time
import socket
import errno
from SocketServer import ThreadingMixIn
def get_percentage():
return run_cmd('batterydata | grep "Current Capacity" | awk \'{print $4}\'').strip()
def get_charging_status():
status = run_cmd('batterydata | grep "Is Charging" | awk \'{print $4}\'').strip()
return "Charging" if status == '1' else "NotCharging"
def run_cmd(cmd):
proc = subprocess.Popen(cmd, shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
start = time.time()
while True:
ret = proc.poll()
if ret is not None:
out, _ = proc.communicate()
try:
return out
except OSError:
return ''
if time.time() - start > 5:
try:
proc.terminate()
except (OSError, AttributeError):
pass
time.sleep(0.1)
if proc.poll() is None:
try:
proc.kill()
except (OSError, AttributeError):
pass
out, _ = proc.communicate()
return out or ''
time.sleep(0.1)
class serverHandler(BaseHTTPRequestHandler):
def do_GET(self):
try:
if self.path == '/status':
self.send_response(200)
self.send_header('Content-type', 'application/json')
self.end_headers()
body = json.dumps({'Battery': get_percentage(), 'Battery status': get_charging_status()})
safe_write(self.wfile, body)
else:
self.send_error(404)
except KeyboardInterrupt:
return
except socket.error, e:
try:
err_no = e.errno
except AttributeError:
try:
err_no = e[0]
except Exception:
err_no = None
if err_no == errno.EPIPE:
return
return
except IOError, e:
try:
err_no = e.errno
except AttributeError:
try:
err_no = e[0]
except Exception:
err_no = None
if err_no == errno.EPIPE:
return
return
class ThreadedHTTPServer(ThreadingMixIn, HTTPServer):
"""Handle requests in a separate thread."""
daemon_threads = True
def safe_write(wfile, data):
"""Write to wfile and ignore EPIPE/Broken pipe errors (client disconnected).
wfile is the socket-like file object (self.wfile). data should be a str
(text).
"""
try:
wfile.write(data)
except socket.error, e:
err_no = None
try:
err_no = e.errno
except Exception:
try:
idx = e[0]
if isinstance(idx, int):
err_no = idx
except Exception:
err_no = None
if err_no == errno.EPIPE:
return
raise
def main():
server = ThreadedHTTPServer(('0.0.0.0', 8000), serverHandler)
print('Starting server, use <Ctrl-C> to stop')
try:
server.serve_forever()
except KeyboardInterrupt:
print('Shutting down server')
try:
server.shutdown()
except Exception:
try:
server.socket.close()
except Exception:
pass
try:
server.server_close()
except Exception:
pass
if __name__ == '__main__':
main()