diff --git a/bd_tools/src/bd_tools/bin/control_motor.py b/bd_tools/src/bd_tools/bin/control_motor.py index 674d46a..b937c0f 100755 --- a/bd_tools/src/bd_tools/bin/control_motor.py +++ b/bd_tools/src/bd_tools/bin/control_motor.py @@ -53,15 +53,18 @@ def make_type(x, to_type): boards.initMotor(client, board_ids) - def callback() -> bool: + def callback() -> int: boards.clearWDGRST(client) try: boards.driveMotor(client, board_ids, actuations, mode) - except (comms.ProtocolError, comms.MalformedPacketError): - return False + except (comms.ProtocolError, comms.MalformedPacketError) as e: + if "id: " in str(e): + return int(str(e).split("id: ")[1][0]) + else: + return -1 - return True + return 0 loop = utils.DebugLoop(callback, args.num_iters, iters_per_print=1000) diff --git a/bd_tools/src/bd_tools/bin/read_sensor.py b/bd_tools/src/bd_tools/bin/read_sensor.py index 230a437..fe70ae8 100755 --- a/bd_tools/src/bd_tools/bin/read_sensor.py +++ b/bd_tools/src/bd_tools/bin/read_sensor.py @@ -81,7 +81,7 @@ def action(args): num_boards = len(board_ids) - def callback() -> bool: + def callback() -> int: boards.clearWDGRST(client) try: @@ -98,8 +98,12 @@ def callback() -> bool: print("Board:", bid, message.format(args.sensor, val)) except (comms.MalformedPacketError, comms.ProtocolError) as e: - print(e) - return False + if "id: " in str(e): + return int(str(e).split("id: ")[1][0]) + else: + return -1 + + return 0 loop = utils.DebugLoop( callback=callback, num_iters=args.num_iters, iters_per_print=1000 diff --git a/bd_tools/src/bd_tools/utils.py b/bd_tools/src/bd_tools/utils.py index 7dd460a..58be449 100644 --- a/bd_tools/src/bd_tools/utils.py +++ b/bd_tools/src/bd_tools/utils.py @@ -1,5 +1,6 @@ """Defines a set of convenient utilities to use in scripts.""" import time +from collections import Counter from typing import Callable @@ -17,8 +18,8 @@ def __init__( Arguments: callback: called for each iteration of the loop. This callable - takes no arguments and should return True if successful else, - False. + takes no arguments and should return 0 if successful and an + error code otherwise (this will be used to report statistics). num_iters: number of iterations to run before exiting. iters_per_print: number of iterations between prints. """ @@ -26,32 +27,54 @@ def __init__( self._num_iters = num_iters self._iters_per_print = iters_per_print - self._errors = 0 + self._errors = {} + self._cummulative_errors = Counter({}) self._iters = 0 self._iter_start_time = None def _loop_func(self): - self._errors += 0 if self._callback() else 1 + err = self._callback() + assert type(err) is int + + if err != 0: + if err not in self._errors: + self._errors[err] = 1 + else: + self._errors[err] += 1 self._iters += 1 if self._iters % self._iters_per_print == 0: self._print_func() self._iter_start_time = time.time() + def _print_error_codes(self, errors): + total_errors = sum(errors.values()) + for error_code, count in sorted(errors.items()): + code_errors = count / total_errors * 100 + print( + f"Error code: {error_code} was {round(code_errors, 2)}% of " + f"errors." + ) + def _print_func(self): now = time.time() diff = now - self._iter_start_time freq = self._iters_per_print / diff - error_rate = self._errors / self._iters_per_print * 100 + total_errors = sum(self._errors.values()) + error_rate = total_errors / self._iters_per_print * 100 print( f"Loop frequency is {round(freq, 2)}hz at " + f"({total_errors}/{self._iters_per_print})" f"{round(error_rate, 2)}% error rate." ) + self._print_error_codes(self._errors) + # Reset statistics variables. self._start_time = now - self._errors = 0 + self._cummulative_errors += Counter(self._errors) + self._errors = {} def loop(self): """Loops the callback. Can be escaped with ctrl^c.""" @@ -67,4 +90,6 @@ def loop(self): f"Interrupted. Loop exiting. Completed {self._iters} " f"iterations." ) + print(f"Cummultive errors:") + self._print_error_codes(self._cummulative_errors) pass