-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathrequest-response.py
More file actions
249 lines (192 loc) · 7.84 KB
/
request-response.py
File metadata and controls
249 lines (192 loc) · 7.84 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
from serial import Serial
from signal import signal, SIGINT, SIGTERM
from time import time, sleep
from threading import Thread
from typing import NoReturn, Final, LiteralString
from utils import contsruct_payload_from_json, get_json_from_packet, KeyAsyncReader, PACKET_HEADER, PACKET_FOOTER
import threading
#####################################################################
### Internal variables
# Prompt text printed in front of the command the user is typing.
_RW_CMD_QUERY : Final[LiteralString] = 'Enter a command name to run: '
# The open serial connection; stays None until openSerial() assigns it.
_RW_SERIAL_CONN : Serial | None = None
# NOTE(review): the next two values are not read or assigned by any function visible in this file.
_RW_READER_THREAD : Thread | None = None
_RW_KEEP_ALIVE : bool = True
# Shutdown flag: set to True by exit_handler() and polled by the wait/read/main loops.
_RW_DO_EXIT : bool = False
# Characters typed so far for the pending command; edited by keyPressCb().
_RW_INPUT_BUFFER : str = ''
#####################################################################
### User variables
# When True, main() keeps retrying until the port can be opened, answers the
# device's "which_mode" request with USB mode and then waits for the app to load.
# When False, main() simply opens the port and starts sending commands.
IS_INTERCEPTIR : Final[bool] = True
# Name of the serial port to open (the trailing comments hold alternative port names).
PORT_NAME : Final[str] = 'COM8' #'/dev/cu.usbmodem2101' #'/dev/ttys016'
#####################################################################
### Funcs
def exit_handler(sig, frame) -> None:
    """Signal handler that requests a clean shutdown.

    Reports which signal arrived and raises the module-wide ``_RW_DO_EXIT``
    flag; the polling loops elsewhere in the script check that flag and stop.

    Args:
        sig: Number of the signal that was delivered.
        frame: Interrupted stack frame (unused; required by the handler signature).
    """
    global _RW_DO_EXIT

    print(f'Signal caught: {sig}')
    _RW_DO_EXIT = True
def keyPressCb(char: str):
    """Handle one key press and update the pending command line.

    Registered with ``KeyAsyncReader.startReading()`` in ``main()``.

    * Backspace removes the last typed character. Both BS (0x08) and DEL (0x7f)
      are accepted, because POSIX terminals usually send DEL for the Backspace
      key while Windows consoles send BS.
    * Enter (``\\r`` or ``\\n``) sends the typed text as a command over serial
      and starts a fresh prompt line.
    * ASCII letters and ``_`` are appended to the pending command.
    * Every other key is ignored.

    Args:
        char: The character produced by the key press.
    """
    global _RW_INPUT_BUFFER
    byte = char.encode()

    # Backspace (BS or DEL)
    if byte in (b'\x08', b'\x7f'):
        # Nothing to erase on an empty buffer, so the key is simply ignored.
        if len(_RW_INPUT_BUFFER) > 0:
            _RW_INPUT_BUFFER = _RW_INPUT_BUFFER[:-1]
            updateCmdQueryLine()

    # Enter
    elif byte == b'\r' or byte == b'\n':
        writeSerialCommand(_RW_INPUT_BUFFER)
        _RW_INPUT_BUFFER = ''
        print()  # move below the command that was just submitted
        updateCmdQueryLine()
        # NOTE(review): the purpose of this pause is not documented; presumably it
        # throttles repeated Enter presses - confirm before changing.
        sleep(0.25)

    # [a-zA-Z_] (bytes.isalpha() only matches ASCII letters)
    elif byte.isalpha() or byte == b'_':
        _RW_INPUT_BUFFER += char
        updateCmdQueryLine()
def clearCmdQueryLine():
    """Blank out the prompt line on the console.

    Returns the cursor to column 0 and prints enough spaces to cover the prompt,
    the text typed so far and one extra column. No newline is emitted, so the
    next output can reuse the same console line.
    """
    blank_width = len(_RW_CMD_QUERY) + len(_RW_INPUT_BUFFER) + 1
    # '\r' padded on the right with spaces: one carriage return + blank_width blanks.
    print('\r'.ljust(blank_width + 1), end='')
def updateCmdQueryLine():
    """Redraw the prompt followed by the text typed so far.

    The line is first overwritten with spaces (one column wider than the new
    text, which erases a character removed by backspace) and then rewritten from
    column 0. Everything is emitted in a single print call without a newline.
    """
    prompt_line = _RW_CMD_QUERY + _RW_INPUT_BUFFER
    eraser = ' ' * (len(prompt_line) + 1)
    print(''.join(('\r', eraser, '\r', prompt_line)), end='')
def openSerial(port_name: str) -> None:
    """Open the serial port and store the connection in ``_RW_SERIAL_CONN``.

    The port is opened at 115200 baud with a 0.2 second read timeout. Any
    exception raised by the ``Serial`` constructor propagates to the caller, in
    which case the module-level connection is left unchanged.

    Args:
        port_name: Name of the serial port to open.
    """
    global _RW_SERIAL_CONN

    connection = Serial(port_name, baudrate=115200, timeout=0.2)
    _RW_SERIAL_CONN = connection
def closeSerial() -> None:
    """Close the module-level serial connection if one is currently open.

    Does nothing when no connection has been created or when it is already
    closed, so it is safe to call more than once.
    """
    # Guard clause: nothing to do without an open connection.
    if not _RW_SERIAL_CONN or not _RW_SERIAL_CONN.is_open:
        return

    print('Closing serial connection...')
    _RW_SERIAL_CONN.close()
def wait_for_app():
# Decided to give user 1/2s interval feedback that script is still running
wait_str : Final[LiteralString] = 'Waiting for app and API to load. Please wait... '
start : Final[float] = time()
cutoff : int = -3
while True:
print( ' ' * len(wait_str), end='\r' ) # erase line
print( wait_str[:cutoff], end='\r' ) # update line
sleep( 0.5 )
if _RW_DO_EXIT:
closeSerial()
return 0
elif time() - start >= 60.0:
print() # newline
break
elif cutoff == -1:
cutoff = -3
else:
cutoff += 1
def readSerial() -> str|None:
    """Read one packet from the serial connection and return its JSON message.

    Bytes are read one at a time until the buffer ends with ``PACKET_FOOTER``.
    The first two bytes must match ``PACKET_HEADER``; otherwise the error is
    printed, the pending input is discarded and ``None`` is returned.

    A packet that is still incomplete 3 seconds after its first byte is also
    discarded. That deadline is checked on read timeouts as well as on received
    bytes, so a packet that stalls mid-way cannot block this function forever.

    Returns:
        Whatever ``get_json_from_packet`` returns for the received packet, or
        ``None`` on a malformed/incomplete packet or when an exit was requested.
    """
    # Accessing bytes by index returns an int, so re-cast to bytes for comparison.
    first_header_byte = PACKET_HEADER[0].to_bytes()
    second_header_byte = PACKET_HEADER[1].to_bytes()

    buffer = b''
    time_first_byte = None

    # Local function for handling invalid packet structure
    def _read_error(msg: str):
        print(msg)
        _RW_SERIAL_CONN.read_all()  # flush read buffer

    # Read serial buffer one byte at a time.
    # We explicitly AVOID use of `Serial.read_until(...)` due to existing bug(s)
    # which lead to undefined behavior.
    while not _RW_DO_EXIT:
        # Read one byte
        byte = _RW_SERIAL_CONN.read(1)  # 0.2s timeout (set upon Serial obj construction)

        # Read timed out: give up on a packet that has stalled, otherwise read again.
        if len(byte) == 0:
            if time_first_byte is not None and time() - time_first_byte > 3.0:
                _read_error('PACKET STILL INCOMPLETE AFTER 3.0 SECONDS')
                return None
            continue

        # Ensure first two bytes are the packet header
        if buffer == b'' and byte != first_header_byte:
            _read_error(f'FIRST BYTE WAS {byte}, NOT {first_header_byte}')
            return None
        if buffer == first_header_byte and byte != second_header_byte:
            _read_error(f'SECOND BYTE WAS {byte}, NOT {second_header_byte}')
            return None

        # Append byte to packet buffer
        buffer += byte

        # If the packet has taken longer than 3 seconds to receive, consider it invalid
        if time_first_byte is None:
            time_first_byte = time()
        elif time() - time_first_byte > 3.0:
            _read_error('PACKET STILL INCOMPLETE AFTER 3.0 SECONDS')
            return None
        # If we've received the packet footer, stop reading
        elif buffer.endswith(PACKET_FOOTER):
            break

    if _RW_DO_EXIT:
        return None

    # Parse JSON message from packet
    return get_json_from_packet(buffer)
def writeSerialCommand(cmd: str) -> int|None:
    """Send ``{"command": <cmd>}`` to the device over the serial connection.

    The JSON text is produced with ``json.dumps`` so that quotes, backslashes
    and control characters in ``cmd`` are escaped instead of yielding a
    malformed message. For ordinary command names the text is identical to the
    previous hand-built string (compact separators, no added whitespace).

    Args:
        cmd: Name of the command to run.

    Returns:
        The value returned by the serial connection's ``write`` call.
    """
    import json  # local import: this file has no module-level json import

    message = json.dumps({'command': cmd}, separators=(',', ':'), ensure_ascii=False)
    payload = contsruct_payload_from_json(message)
    return _RW_SERIAL_CONN.write(payload)
# The program is stopped when any of the following cases occur:
#   1. Entering ctrl+c from the console (Expected means of termination)
#   2. IS_INTERCEPTIR is true and the API mode request packet is not received
def main() -> int:
    """Run the interactive request/response console.

    Installs the SIGINT/SIGTERM handlers, opens the serial port (performing the
    API-mode handshake first when ``IS_INTERCEPTIR`` is set), starts keystroke
    capture and then prints every message received until an exit is requested.

    Returns:
        ``0`` on a normal or user-requested exit, ``1`` when the expected API
        mode request was not received.
    """
    signal(SIGINT, exit_handler)
    signal(SIGTERM, exit_handler)

    if IS_INTERCEPTIR:
        print('Waiting for InterceptIR to open serial connection...')

        # Wait for IIR to open serial port.
        # Once open, do not send any commands but instead listen for API mode request
        # NOTE(review): the retry has no delay, so this loop spins while the port is
        # unavailable; presumably intentional so the port is opened as early as
        # possible - confirm before adding a sleep.
        while True:
            try:
                openSerial(PORT_NAME)
                break
            except IOError:
                if _RW_DO_EXIT:
                    closeSerial()
                    return 0

        # Wait for API mode request
        print('Waiting for API mode request...')
        message = readSerial()

        if _RW_DO_EXIT:
            closeSerial()
            return 0
        if message != '{"request":"which_mode"}':
            print(f'Expected API mode request but instead received: "{message}"\nQuitting...')
            closeSerial()
            return 1

        # Choose USB mode
        _RW_SERIAL_CONN.write(contsruct_payload_from_json('{"mode":"usb"}', do_print=True))
        print('Selecting USB API mode...')

        # Give the app and API time to load...
        wait_for_app()
        # wait_for_app() closes the port when an exit is requested while waiting;
        # stop here instead of going on to write to the closed connection.
        if _RW_DO_EXIT:
            closeSerial()
            return 0
    else:
        openSerial(PORT_NAME)

    #################
    # Notice - we updated our API since this program was created. The new API has several
    # long-running commands, like start_cm, that continually return responses which this
    # program may not be suited for. For now, it's recommended to stick with single-response
    # commands like "get_device_info". In the future, we plan to have more single-response
    # commands that can be used with this script.
    #################

    # Start keystroke-capture thread
    keycap = KeyAsyncReader()
    keycap.startReading(keyPressCb)

    try:
        print('Getting device info...')
        writeSerialCommand('get_device_info')

        # Main program loop
        while not _RW_DO_EXIT:
            message = readSerial()
            if message:
                clearCmdQueryLine()
                print(f'\rReceived:\n{message}')
                updateCmdQueryLine()
    finally:
        # Always release the key reader and the port, even if serial I/O raised.
        try:
            # Join keystroke-capture thread
            keycap.stopReading()
        finally:
            # Close serial connection
            closeSerial()

    return 0
#####################################################################
### Script entry point
if __name__ == '__main__':
    # Hand main()'s status (0 or 1) to the shell as the process exit code
    # instead of discarding it.
    raise SystemExit(main())