diff --git a/lunadev/base_bashrc_append.sh b/lunadev/base_bashrc_append.sh index 5a64ed01..2b9847ea 100644 --- a/lunadev/base_bashrc_append.sh +++ b/lunadev/base_bashrc_append.sh @@ -17,6 +17,9 @@ alias startvnc='python3 /root/lunadev-2024/lunadev/start_vnc.py' # Start telemetry tunnel alias starttele='python3 /root/lunadev-2024/lunadev/start_telemetry_tunnel.py' +# Tool for resetting a USB device +alias resetusb='python3 /root/lunadev-2024/lunadev/reset_usb.py' + # Try srcinstall if [ -f /root/lunadev-2024/install/setup.bash ]; then source /root/lunadev-2024/install/setup.bash diff --git a/lunadev/client_telemetry.py b/lunadev/client_telemetry.py index 97fd06a6..48640d3f 100644 --- a/lunadev/client_telemetry.py +++ b/lunadev/client_telemetry.py @@ -21,12 +21,12 @@ args = parser.parse_args() UDP_HEADER_SIZE = 8 # 8 bytes -DIAGNOSTIC_WINDOW = 10 +DIAGNOSTIC_WINDOW = 3 DIAGNOSTIC_DELAY = 2 -# First item in list is time step -# Second item is bandwidth during that time step -diagnostic_buffer = [[0, 0] for _ in range(DIAGNOSTIC_WINDOW)] +# First item in list is timestamp +# Second item is packet size +diagnostic_buffer = [] diagnostics_enabled = args.diagnostics running = True @@ -34,6 +34,14 @@ def print_diagnostics(): while running: time.sleep(DIAGNOSTIC_DELAY) + + for i, (timestamp, _) in enumerate(diagnostic_buffer): + if time.time() - timestamp < DIAGNOSTIC_WINDOW: + del diagnostic_buffer[0:i] + break + else: + diagnostic_buffer.clear() + print( "Bandwidth:", round(sum((n for _, n in diagnostic_buffer)) / 1000 / DIAGNOSTIC_WINDOW * 8, 3), @@ -48,14 +56,10 @@ def track_data(data: bytes): if not diagnostics_enabled: return - current_time_step = int(time.time()) - diagnostic = diagnostic_buffer[current_time_step % DIAGNOSTIC_WINDOW] - - if diagnostic[0] != current_time_step: - diagnostic[0] = current_time_step - diagnostic[1] = 0 - - diagnostic[1] += len(data) + UDP_HEADER_SIZE + diagnostic_buffer.append(( + time.time(), + len(data) + UDP_HEADER_SIZE + )) tcp_sock = None diff --git a/lunadev/reset_usb.py b/lunadev/reset_usb.py new file mode 100644 index 00000000..d1140bf0 --- /dev/null +++ b/lunadev/reset_usb.py @@ -0,0 +1,172 @@ +#!/usr/bin/env python +import os +import sys +from subprocess import Popen, PIPE +import fcntl + +instructions = ''' +Usage: python reset_usb.py help : Show this help + sudo python reset_usb.py list : List all USB devices + sudo python reset_usb.py path /dev/bus/usb/XXX/YYY : Reset USB device using path /dev/bus/usb/XXX/YYY + sudo python reset_usb.py search "search terms" : Search for USB device using the search terms within the search string returned by list and reset matching device + sudo python reset_usb.py listpci : List all PCI USB devices + sudo python reset_usb.py pathpci /sys/bus/pci/drivers/.../XXXX:XX:XX.X : Reset PCI USB device using path + sudo python reset_usb.py searchpci "search terms" : Search for PCI USB device using the search terms within the search string returned by listpci and reset matching device + ''' + + +if len(sys.argv) < 2: + print(instructions) + sys.exit(0) + +option = sys.argv[1].lower() +if 'help' in option: + print(instructions) + sys.exit(0) + + +def create_pci_list(): + pci_usb_list = list() + try: + lspci_out = Popen('lspci -Dvmm', shell=True, bufsize=64, stdin=PIPE, stdout=PIPE, close_fds=True).stdout.read().strip().decode('utf-8') + pci_devices = lspci_out.split('%s%s' % (os.linesep, os.linesep)) + for pci_device in pci_devices: + device_dict = dict() + categories = pci_device.split(os.linesep) + for category in categories: + key, value = category.split('\t') + device_dict[key[:-1]] = value.strip() + if 'USB' not in device_dict['Class']: + continue + for root, dirs, files in os.walk('/sys/bus/pci/drivers/'): + slot = device_dict['Slot'] + if slot in dirs: + device_dict['path'] = os.path.join(root, slot) + break + pci_usb_list.append(device_dict) + except Exception as ex: + print('Failed to list pci devices! Error: %s' % ex) + sys.exit(-1) + return pci_usb_list + + +def create_usb_list(): + device_list = list() + try: + lsusb_out = Popen('lsusb -v', shell=True, bufsize=64, stdin=PIPE, stdout=PIPE, close_fds=True).stdout.read().strip().decode('utf-8') + usb_devices = lsusb_out.split('%s%s' % (os.linesep, os.linesep)) + for device_categories in usb_devices: + if not device_categories: + continue + categories = device_categories.split(os.linesep) + device_stuff = categories[0].strip().split() + bus = device_stuff[1] + device = device_stuff[3][:-1] + device_dict = {'bus': bus, 'device': device} + device_info = ' '.join(device_stuff[6:]) + device_dict['description'] = device_info + for category in categories: + if not category: + continue + categoryinfo = category.strip().split() + if categoryinfo[0] == 'iManufacturer': + manufacturer_info = ' '.join(categoryinfo[2:]) + device_dict['manufacturer'] = manufacturer_info + if categoryinfo[0] == 'iProduct': + device_info = ' '.join(categoryinfo[2:]) + device_dict['device'] = device_info + path = '/dev/bus/usb/%s/%s' % (bus, device) + device_dict['path'] = path + + device_list.append(device_dict) + except Exception as ex: + print('Failed to list usb devices! Error: %s' % ex) + sys.exit(-1) + return device_list + + +if 'listpci' in option: + pci_usb_list = create_pci_list() + for device in pci_usb_list: + print('path=%s' % device['path']) + print(' manufacturer=%s' % device['SVendor']) + print(' device=%s' % device['SDevice']) + print(' search string=%s %s' % (device['SVendor'], device['SDevice'])) + sys.exit(0) + +if 'list' in option: + usb_list = create_usb_list() + for device in usb_list: + print('path=%s' % device['path']) + print(' description=%s' % device['description']) + print(' manufacturer=%s' % device['manufacturer']) + print(' device=%s' % device['device']) + print(' search string=%s %s %s' % (device['description'], device['manufacturer'], device['device'])) + sys.exit(0) + +if len(sys.argv) < 3: + print(instructions) + sys.exit(0) + +option2 = sys.argv[2] + +print('Resetting device: %s' % option2) + + +# echo -n "0000:39:00.0" | tee /sys/bus/pci/drivers/xhci_hcd/unbind;echo -n "0000:39:00.0" | tee /sys/bus/pci/drivers/xhci_hcd/bind +def reset_pci_usb_device(dev_path): + folder, slot = os.path.split(dev_path) + try: + fp = open(os.path.join(folder, 'unbind'), 'wt') + fp.write(slot) + fp.close() + fp = open(os.path.join(folder, 'bind'), 'wt') + fp.write(slot) + fp.close() + print('Successfully reset %s' % dev_path) + sys.exit(0) + except Exception as ex: + print('Failed to reset device! Error: %s' % ex) + sys.exit(-1) + + +if 'pathpci' in option: + reset_pci_usb_device(option2) + + +if 'searchpci' in option: + pci_usb_list = create_pci_list() + for device in pci_usb_list: + text = '%s %s' % (device['SVendor'], device['SDevice']) + if option2 in text: + reset_pci_usb_device(device['path']) + print('Failed to find device!') + sys.exit(-1) + + +def reset_usb_device(dev_path): + USBDEVFS_RESET = 21780 + try: + f = open(dev_path, 'w', os.O_WRONLY) + fcntl.ioctl(f, USBDEVFS_RESET, 0) + print('Successfully reset %s' % dev_path) + sys.exit(0) + except Exception as ex: + print('Failed to reset device! Error: %s' % ex) + sys.exit(-1) + + +if 'path' in option: + reset_usb_device(option2) + + +if 'search' in option: + usb_list = create_usb_list() + for device in usb_list: + text = '%s %s %s' % (device['description'], device['manufacturer'], device['device']) + if option2 in text: + reset_usb_device(device['path']) + print('Failed to find device!') + sys.exit(-1) + + diff --git a/lunadev/start_telemetry_tunnel.py b/lunadev/start_telemetry_tunnel.py index a64e421c..58e2db0f 100644 --- a/lunadev/start_telemetry_tunnel.py +++ b/lunadev/start_telemetry_tunnel.py @@ -14,8 +14,7 @@ def main(): "socat", "UNIX-LISTEN:lunadev/tele.sock,reuseaddr,fork,", "UDP4-LISTEN:43721" - ], - stderr=subprocess.DEVNULL + ] ) while not os.path.exists(full_sock_path): diff --git a/src/camera/camera/camera_compress.py b/src/camera/camera/camera_compress.py index fff4dbb1..8d0bfa46 100644 --- a/src/camera/camera/camera_compress.py +++ b/src/camera/camera/camera_compress.py @@ -22,8 +22,7 @@ def __init__(self): "-i", "-", "-c:v", "libvpx-vp9", "-b:v", "1500k", - # "-g", "72", - "-f", "matroska", "-" + "-f", "mpegts", "-" ], stdin=subprocess.PIPE, stdout=subprocess.PIPE, diff --git a/src/telemetry/telemetry/telemetry.py b/src/telemetry/telemetry/telemetry.py index 3b33e697..6d9abcd7 100644 --- a/src/telemetry/telemetry/telemetry.py +++ b/src/telemetry/telemetry/telemetry.py @@ -6,8 +6,7 @@ from global_msgs.msg import Steering from global_msgs.msg import CompressedImagePacket from std_msgs.msg import Float32 -from queue import Queue -from multiprocessing import Process +from multiprocessing import Process, Queue, Value class Channels(IntEnum): @@ -54,11 +53,16 @@ def __init__(self, scheme: int): else: self.steering_struct = Struct("bbb") + self.connected = Value('b', False) self.thr = Process(target=self.run) self.thr.start() def receive_image(self, img: CompressedImagePacket): - self.camera_image_buffer.put(img.data) + with self.connected.get_lock(): + if not self.connected.value: + return + + self.camera_image_buffer.put(bytes(img.data)) def run(self): logger = self.get_logger() @@ -87,6 +91,13 @@ def run(self): logger.info("Connected to lunabase!") + # Empty camera buffer + while not self.camera_image_buffer.empty(): + self.camera_image_buffer.get() + + with self.connected.get_lock(): + self.connected.value = True + # Main Loop while True: event = host.service(1000) @@ -102,6 +113,8 @@ def run(self): f"{event.peer.address}" ) continue + with self.connected.get_lock(): + self.connected.value = True logger.error("Disconnected from lunabase!") break @@ -120,10 +133,27 @@ def run(self): logger.error("Host has returned an error! Restarting...") break + data = bytearray() while not self.camera_image_buffer.empty(): + data += self.camera_image_buffer.get() + + while len(data) >= 512: + peer.send( + Channels.CAMERA, + enet.Packet( + data[0:512], + enet.PACKET_FLAG_UNSEQUENCED | enet.PACKET_FLAG_UNRELIABLE_FRAGMENT + ) + ) + del data[0:512] + + if len(data) > 0: peer.send( Channels.CAMERA, - enet.Packet(self.camera_image_buffer.get(), enet.PACKET_FLAG_UNSEQUENCED) + enet.Packet( + data, + enet.PACKET_FLAG_UNSEQUENCED | enet.PACKET_FLAG_UNRELIABLE_FRAGMENT + ) ) def on_receive(self, channel: int, data: bytes, peer) -> None: