forked from psychogenic/spasics
-
Notifications
You must be signed in to change notification settings - Fork 0
Expand file tree
/
Copy pathi2c_client_packets.py
More file actions
131 lines (100 loc) · 4.08 KB
/
i2c_client_packets.py
File metadata and controls
131 lines (100 loc) · 4.08 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
# translations from spasic.error_codes
ErrorCodes = {
0x01: 'Unknown Command',
0x02: 'Unknown Experiment',
0x03: 'Busy',
0x04: 'Unterminated Core1 Exp',
0x05: 'Runtime Exception',
0x06: 'Invalid Request',
0x07: 'Unknown Variable',
0x08: 'Cannot open file',
0x09: 'EOF',
0x0A: 'Write Failure',
0x0B: 'MakeDir Failure',
0x0C: 'Delete File Failure',
0x0D: 'Rename File Failure',
0x0E: 'POST Fail'
}
class ClientPacketGenerator:
def __init__(self):
pass
def abort(self):
return bytearray([ord('A')])
def time_sync(self, t_now:int):
bts = bytearray([ord('T')])
bts += t_now.to_bytes(4, 'little')
return bts
def status(self):
return bytearray([ord('S')])
def ping(self, count:int, extra_payload=b'PNG'):
bts = bytearray([ord('P'), count % 256])
bts += extra_payload[:7]
return bts
def info(self):
return bytearray([ord('I')])
def reboot(self, safe_mode:bool=False):
return bytearray([ord('R'), 1 if safe_mode else 0])
def experiment_result(self):
return bytearray([ord('E') + ord('I')])
def experiment_packets_list(self, cmd:int, experiment_id:int, args:bytearray=None):
ret_list = []
if args is not None and len(args):
for chunk in [args[i:i + 7] for i in range(0, len(args), 7)]:
bts = bytearray([ord('E') + ord('A')])
bts.extend(chunk)
ret_list.append(bts)
bts = bytearray([cmd])
bts += experiment_id.to_bytes(2, 'little')
ret_list.append(bts)
return ret_list
def run_experiment_now_list(self, experiment_id:int, args:bytearray=None):
return self.experiment_packets_list(ord('E'), experiment_id, args)
def experiment_queue(self, experiment_id:int, args:bytearray=None):
return self.experiment_packets_list(ord('E') + ord('Q'), experiment_id, args)
def filesize(self, varid:int):
return bytearray([ord('F'), ord('S'), varid])
def mkdir(self, varid:int):
return bytearray([ord('F'), ord('D'), varid])
def lsdir(self, varid:int):
return bytearray([ord('F'), ord('L'), varid])
def file_unlink(self, varid:int):
return bytearray([ord('F'), ord('U'), varid])
def file_move(self, srcvarid:int, destvarid:int):
return bytearray([ord('F'), ord('M'), srcvarid, destvarid])
def open_read(self, varid:int):
return bytearray([ord('F'), ord('O'), varid, ord('R')])
def file_close(self):
return bytearray([ord('F') + ord('C')])
def file_read(self, num_bytes:int=0):
if num_bytes:
return bytearray([ord('F') + ord('R'), num_bytes % 256])
else:
return bytearray([ord('F') + ord('R')])
def file_write_list(self, bts_to_write:bytearray):
cmdPrefix = ord('F') + ord('W')
ret_list = []
for chunk in [bts_to_write[i:i + 7] for i in range(0, len(bts_to_write), 7)]:
vals = [cmdPrefix]
vals.extend(chunk)
ret_list.append(bytearray(vals))
return ret_list
def open_write(self, varid:int):
return bytearray([ord('F'), ord('O'), varid, ord('W')])
def checksum(self, varid:int):
return bytearray([ord('F'), ord('Z'), varid])
def getvar(self, v:int):
return bytearray([ord('V'), v])
def setvar_list(self, v:int, val):
if isinstance(val, str):
val = bytearray(val, 'ascii')
chunks = [val[i:i + 6] for i in range(0, len(val), 6)]
setvar = bytearray([ord('V') + ord('S'), v])
setvar.extend(chunks[0])
ret_list = [setvar]
if len(chunks) == 1:
return ret_list
for i in range(1, len(chunks)):
appvar = bytearray([ord('V') + ord('A'), v])
appvar.extend(bytearray(chunks[i]))
ret_list.append(appvar)
return ret_list