11"""Build HAT handling functionality"""
22
3+ import logging
34import queue
5+ import tempfile
46import threading
57import time
68from enum import Enum
@@ -69,13 +71,14 @@ class BuildHAT:
6971 RESET_GPIO_NUMBER = 4
7072 BOOT0_GPIO_NUMBER = 22
7173
72- def __init__ (self , firmware , signature , version , device = "/dev/serial0" ):
74+ def __init__ (self , firmware , signature , version , device = "/dev/serial0" , debug = False ):
7375 """Interact with Build HAT
7476
7577 :param firmware: Firmware file
7678 :param signature: Signature file
7779 :param version: Firmware version
7880 :param device: Serial device to use
81+ :param debug: Optional boolean to log debug information
7982 :raises BuildHATError: Occurs if can't find HAT
8083 """
8184 self .cond = Condition ()
@@ -88,6 +91,10 @@ def __init__(self, firmware, signature, version, device="/dev/serial0"):
8891 self .running = True
8992 self .vincond = Condition ()
9093 self .vin = None
94+ if debug :
95+ tmp = tempfile .NamedTemporaryFile (suffix = ".log" , prefix = "buildhat-" , delete = False )
96+ logging .basicConfig (filename = tmp .name , format = '%(asctime)s %(message)s' ,
97+ level = logging .INFO )
9198
9299 for _ in range (4 ):
93100 self .connections .append (Connection ())
@@ -99,16 +106,18 @@ def __init__(self, firmware, signature, version, device="/dev/serial0"):
99106 # Check if we're in the bootloader or the firmware
100107 self .write (b"version\r " )
101108
109+ emptydata = 0
102110 incdata = 0
103111 while True :
104- try :
105- line = self .ser .readline ().decode ('utf-8' , 'ignore' )
106- except serial .SerialException :
107- pass
112+ line = self .read ()
108113 if len (line ) == 0 :
109- # Didn't recieve any data
110- break
111- if line [:len (BuildHAT .FIRMWARE )] == BuildHAT .FIRMWARE :
114+ # Didn't receive any data
115+ emptydata += 1
116+ if emptydata > 3 :
117+ break
118+ else :
119+ continue
120+ if cmp (line , BuildHAT .FIRMWARE ):
112121 self .state = HatState .FIRMWARE
113122 ver = line [len (BuildHAT .FIRMWARE ):].split (' ' )
114123 if int (ver [0 ]) == version :
@@ -117,7 +126,7 @@ def __init__(self, firmware, signature, version, device="/dev/serial0"):
117126 else :
118127 self .state = HatState .NEEDNEWFIRMWARE
119128 break
120- elif line [: len ( BuildHAT . BOOTLOADER )] == BuildHAT .BOOTLOADER :
129+ elif cmp ( line , BuildHAT .BOOTLOADER ) :
121130 self .state = HatState .BOOTLOADER
122131 break
123132 else :
@@ -184,15 +193,15 @@ def loadfirmware(self, firmware, signature):
184193 self .getprompt ()
185194 self .write ("load {} {}\r " .format (len (firm ), self .checksum (firm )).encode ())
186195 time .sleep (0.1 )
187- self .write (b"\x02 " )
188- self .write (firm )
189- self .write (b"\x03 \r " )
196+ self .write (b"\x02 " , replace = "0x02" )
197+ self .write (firm , replace = "--firmware file--" )
198+ self .write (b"\x03 \r " , replace = "0x03" )
190199 self .getprompt ()
191200 self .write ("signature {}\r " .format (len (sig )).encode ())
192201 time .sleep (0.1 )
193- self .write (b"\x02 " )
194- self .write (sig )
195- self .write (b"\x03 \r " )
202+ self .write (b"\x02 " , replace = "0x02" )
203+ self .write (sig , replace = "--signature file--" )
204+ self .write (b"\x03 \r " , replace = "0x03" )
196205 self .getprompt ()
197206
198207 def getprompt (self ):
@@ -201,12 +210,8 @@ def getprompt(self):
201210 Need to decide what we will do, when no prompt
202211 """
203212 while True :
204- line = b""
205- try :
206- line = self .ser .readline ().decode ('utf-8' , 'ignore' )
207- except serial .SerialException :
208- pass
209- if line [:len (BuildHAT .PROMPT )] == BuildHAT .PROMPT :
213+ line = self .read ()
214+ if cmp (line , BuildHAT .PROMPT ):
210215 break
211216
212217 def checksum (self , data ):
@@ -224,12 +229,33 @@ def checksum(self, data):
224229 u = (u ^ data [i ]) & 0xFFFFFFFF
225230 return u
226231
227- def write (self , data ):
232+ def write (self , data , log = True , replace = "" ):
228233 """Write data to the serial port of Build HAT
229234
230235 :param data: Data to write to Build HAT
236+ :param log: Whether to log line or not
237+ :param replace: Whether to log an alternative string
231238 """
232239 self .ser .write (data )
240+ if not self .fin and log :
241+ if replace != "" :
242+ logging .info ("> {}" .format (replace ))
243+ else :
244+ logging .info ("> {}" .format (data .decode ('utf-8' , 'ignore' ).strip ()))
245+
246+ def read (self ):
247+ """Read data from the serial port of Build HAT
248+
249+ :return: Line that has been read
250+ """
251+ line = ""
252+ try :
253+ line = self .ser .readline ().decode ('utf-8' , 'ignore' ).strip ()
254+ except serial .SerialException :
255+ pass
256+ if line != "" :
257+ logging .info ("< {}" .format (line ))
258+ return line
233259
234260 def shutdown (self ):
235261 """Turn off the Build HAT devices"""
@@ -275,11 +301,7 @@ def loop(self, cond, uselist, q):
275301 """
276302 count = 0
277303 while self .running :
278- line = b""
279- try :
280- line = self .ser .readline ().decode ('utf-8' , 'ignore' )
281- except serial .SerialException :
282- pass
304+ line = self .read ()
283305 if len (line ) == 0 :
284306 continue
285307 if line [0 ] == "P" and line [2 ] == ":" :
@@ -326,7 +348,7 @@ def runit():
326348
327349 if line [0 ] == "P" and (line [2 ] == "C" or line [2 ] == "M" ):
328350 portid = int (line [1 ])
329- data = line [5 :].strip (). split (" " )
351+ data = line [5 :].split (" " )
330352 newdata = []
331353 for d in data :
332354 if "." in d :
@@ -341,8 +363,8 @@ def runit():
341363 with self .portcond [portid ]:
342364 self .portcond [portid ].notify ()
343365
344- if len (line ) >= 5 and line [1 ] == "." and line .strip (). endswith (" V" ):
345- vin = float (line .strip (). split (" " )[0 ])
366+ if len (line ) >= 5 and line [1 ] == "." and line .endswith (" V" ):
367+ vin = float (line .split (" " )[0 ])
346368 self .vin = vin
347369 with self .vincond :
348370 self .vincond .notify ()
0 commit comments