-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathRxTxBytes.py
More file actions
executable file
·140 lines (112 loc) · 2.96 KB
/
RxTxBytes.py
File metadata and controls
executable file
·140 lines (112 loc) · 2.96 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
#!/usr/bin/env python
"""
Get the number of bytes received and sent from a network interface.
"""
import appindicator
import gtk
import os.path
import signal
import subprocess
import sys
# Catch CTRL+C
def signal_handler(signal, frame):
    """
    Handle an interrupt signal: print a short notice and exit with status 0.
    @param signal [ The signal number (unused) ]
    @param frame  [ The interrupted stack frame (unused) ]
    """
    # NOTE: the "signal" parameter shadows the signal module inside this
    # function only; the name is kept so the handler signature is unchanged.
    print('\nAborted!')
    sys.exit(0)
# Route SIGINT (CTRL+C) to signal_handler so the program ends with a short
# notice instead of a KeyboardInterrupt.
signal.signal(signal.SIGINT, signal_handler)
# Suppress stack frames in reports of uncaught exceptions.
sys.tracebacklimit = 0
class RxTxBytes(object):
    """
    Bytes received and sent Object

    Reads the rx/tx byte counters of one network interface from the
    statistics files located at
    PATH + interface + PATH_STATS + ('rx' | 'tx') + FILE_SUFFIX
    and shows them as the label of an AppIndicator, refreshed every
    PING_FREQUENCY seconds.
    """

    PATH = "/sys/class/net/"
    PATH_STATS = "/statistics/"
    FILE_SUFFIX = "_bytes"
    PING_FREQUENCY = 1  # every 1 second

    # Class-level default kept only for backward compatibility; __init__
    # binds a fresh dict on every instance so instances never share paths.
    bytesData = {}

    # Integer types accepted by getHumanReadableBytes. "long" only exists on
    # Python 2, so fall back to plain int when it is not defined.
    try:
        _INTEGER_TYPES = (int, long)  # noqa: F821
    except NameError:
        _INTEGER_TYPES = (int,)

    def __init__(self, interface):
        """
        Constructor
        @param String [ The network interface name, e.g. eth0 ]
        """
        stats_dir = self.PATH + interface + self.PATH_STATS
        # Per-instance mapping: mode ('rx' / 'tx') -> statistics file path.
        self.bytesData = {
            'rx': stats_dir + 'rx' + self.FILE_SUFFIX,
            'tx': stats_dir + 'tx' + self.FILE_SUFFIX,
        }

        # AppIndicator
        self.ind = appindicator.Indicator(
            "RxTx",
            "utilities-system-monitor",
            appindicator.CATEGORY_APPLICATION_STATUS)
        self.ind.set_status(appindicator.STATUS_ACTIVE)
        self.ind.set_label("Loading ...")

        # Set menu
        self.menu_setup()
        self.ind.set_menu(self.menu)

    def getBytes(self, mode):
        """
        Get the bytes received or sent
        @param String [ The mode (tx or rx) ]
        @return Integer [ Size ], Message [ If error, empty string otherwise ]
        """
        if mode not in self.bytesData:
            return 0, "Error: the mode is invalid"

        filename = self.bytesData[mode]
        if not os.path.isfile(filename):
            return 0, "The " + filename + " cannot be found"

        # Read the counter directly instead of spawning "cat", and report a
        # message (rather than raising) when the file is unreadable or does
        # not hold a number, so the periodic refresh keeps running.
        try:
            with open(filename) as stats_file:
                return int(stats_file.read()), ""
        except (IOError, OSError) as exc:
            return 0, "The " + filename + " cannot be read: " + str(exc)
        except ValueError:
            return 0, "The " + filename + " does not contain a byte count"

    def getHumanReadableBytes(self, size, suffix='B'):
        """
        Get the human readable version of bytes received or sent
        @param size   [ The size (an integer) ]
        @param suffix [ The unit suffix appended after the binary prefix ]
        @return String [ Human readable version, or "Error" if size is not an integer ]
        """
        if not isinstance(size, self._INTEGER_TYPES):
            return "Error"

        # Divide by 1024 until the value fits the current binary prefix.
        for unit in ['', 'Ki', 'Mi', 'Gi', 'Ti', 'Pi', 'Ei', 'Zi']:
            if abs(size) < 1024.0:
                return "%3.4f %s%s" % (size, unit, suffix)
            size /= 1024.0
        return "%.4f %s%s" % (size, 'Yi', suffix)

    def main(self):
        """
        Show a first reading, schedule the periodic refresh and run the
        GTK main loop
        """
        self.fetch()
        # PING_FREQUENCY is in seconds; the timeout is given in milliseconds.
        gtk.timeout_add(self.PING_FREQUENCY * 1000, self.fetch)
        gtk.main()

    def fetch(self):
        """
        Fetch the data from the statistic files and update the indicator label
        @return Boolean [ Always True, so the periodic timeout keeps firing ]
        """
        rx, rxErr = self.getBytes('rx')
        tx, txErr = self.getBytes('tx')

        errors = [err for err in (rxErr, txErr) if err]
        if errors:
            # Separate the messages so two errors do not run into each other.
            msg = "; ".join(errors)
        else:
            msg = self.getHumanReadableBytes(rx) + ' / ' + self.getHumanReadableBytes(tx)

        self.ind.set_label(msg)
        return True

    # App Indicator methods
    def menu_setup(self):
        """
        Setup the AppIndicator Menu (a single "Quit" entry)
        """
        self.menu = gtk.Menu()

        quit_item = gtk.MenuItem("Quit")
        quit_item.connect("activate", self.quit)
        quit_item.show()
        self.menu.append(quit_item)

    def quit(self, which):
        """
        Close the program
        @param which [ The menu item that triggered the callback (unused) ]
        """
        sys.exit(0)
if __name__ == "__main__":
    # Monitor the interface named as the first command-line argument; fall
    # back to 'eth0' when none is given.
    interface = sys.argv[1] if len(sys.argv) > 1 else 'eth0'
    indicator = RxTxBytes(interface)
    indicator.main()