diff --git a/micropython/usbd/__init__.py b/micropython/usbd/__init__.py index 90739c27e..c6fadc4a7 100644 --- a/micropython/usbd/__init__.py +++ b/micropython/usbd/__init__.py @@ -1,4 +1,5 @@ from .device import get_usbdevice, USBInterface from .hid import HIDInterface, MouseInterface from .midi import DummyAudioInterface, MIDIInterface, MidiUSB +from .cdc import CDC from . import utils diff --git a/micropython/usbd/cdc.py b/micropython/usbd/cdc.py new file mode 100644 index 000000000..0d194e43f --- /dev/null +++ b/micropython/usbd/cdc.py @@ -0,0 +1,459 @@ +# MicroPython USB CDC module +# MIT license; Copyright (c) 2022 Martin Fischer, 2023 Angus Gratton +import io +import ustruct +import time +import errno +import machine +import struct +from micropython import const + +from .device import USBInterface, get_usbdevice +from .utils import ( + Buffer, + endpoint_descriptor, + split_bmRequestType, + STAGE_SETUP, + STAGE_DATA, + STAGE_ACK, + REQ_TYPE_STANDARD, + REQ_TYPE_CLASS, + EP_IN_FLAG, +) + +_DEV_CLASS_MISC = const(0xEF) +_CS_DESC_TYPE = const(0x24) # CS Interface type communication descriptor +_ITF_ASSOCIATION_DESC_TYPE = const(0xB) # Interface Association descriptor + +# CDC control interface definitions +_INTERFACE_CLASS_CDC = const(2) +_INTERFACE_SUBCLASS_CDC = const(2) # Abstract Control Mode +_PROTOCOL_NONE = const(0) # no protocol + +# CDC descriptor subtype +# see also CDC120.pdf, table 13 +_CDC_FUNC_DESC_HEADER = const(0) +_CDC_FUNC_DESC_CALL_MANAGEMENT = const(1) +_CDC_FUNC_DESC_ABSTRACT_CONTROL = const(2) +_CDC_FUNC_DESC_UNION = const(6) + +# CDC class requests, table 13, PSTN subclass +_SET_LINE_CODING_REQ = const(0x20) +_GET_LINE_CODING_REQ = const(0x21) +_SET_CONTROL_LINE_STATE = const(0x22) +_SEND_BREAK_REQ = const(0x23) + +_LINE_CODING_STOP_BIT_1 = const(0) +_LINE_CODING_STOP_BIT_1_5 = const(1) +_LINE_CODING_STOP_BIT_2 = const(2) + +_LINE_CODING_PARITY_NONE = const(0) +_LINE_CODING_PARITY_ODD = const(1) +_LINE_CODING_PARITY_EVEN = const(2) +_LINE_CODING_PARITY_MARK = const(3) +_LINE_CODING_PARITY_SPACE = const(4) + +_LINE_STATE_DTR = const(1) +_LINE_STATE_RTS = const(2) + +_PARITY_BITS_REPR = "NOEMS" +_STOP_BITS_REPR = ("1", "1.5", "2") + +# Other definitions +_CDC_VERSION = const(0x0120) # release number in binary-coded decimal + + +# CDC data interface definitions +_CDC_ITF_DATA_CLASS = const(0xA) +_CDC_ITF_DATA_SUBCLASS = const(0) +_CDC_ITF_DATA_PROT = const(0) # no protocol + +# Length of the bulk transfer endpoints. Maybe should be configurable? +_BULK_EP_LEN = const(64) + +# MicroPython error constants (negated as IOBase.ioctl uses negative return values for error codes) +# these must match values in py/mperrno.h +_MP_EINVAL = const(-22) +_MP_ETIMEDOUT = const(-110) + +# MicroPython stream ioctl requests, same as py/stream.h +_MP_STREAM_FLUSH = const(1) +_MP_STREAM_POLL = const(3) + +# MicroPython ioctl poll values, same as py/stream.h +_MP_STREAM_POLL_WR = const(0x04) +_MP_STREAM_POLL_RD = const(0x01) +_MP_STREAM_POLL_HUP = const(0x10) + + +class CDC(io.IOBase): + # USB CDC serial device class, designed to resemble machine.UART + # with some additional methods. + # + # This is a standalone class, instead of a USBInterface subclass, because + # CDC consists of multiple interfaces (CDC control and CDC data) + def __init__(self, **kwargs): + # For CDC to work, the device class must be set to Interface Association + usb_device = get_usbdevice() + usb_device.device_class = _DEV_CLASS_MISC + usb_device.device_subclass = 2 + usb_device.device_protocol = 1 # Itf association descriptor + + self._ctrl = CDCControlInterface() + self._data = CDCDataInterface() + # The data interface *must* be added immediately after the control interface + usb_device.add_interface(self._ctrl) + usb_device.add_interface(self._data) + + self.init(**kwargs) + + def init( + self, baudrate=9600, bits=8, parity="N", stop=1, timeout=None, txbuf=256, rxbuf=256, flow=0 + ): + # Configure the CDC serial port. Note that many of these settings like + # baudrate, bits, parity, stop don't change the USB-CDC device behavior + # at all, only the "line coding" as communicated from/to the USB host. + + # Store initial line coding parameters in the USB CDC binary format + # (there is nothing implemented to further change these from Python + # code, the USB host sets them.) + struct.pack_into( + " self._timeout: + return len(buf) - len(mv) + + def _wr_xfer(self): + # Submit a new IN transfer from the _wb buffer, if needed + if self.is_open() and not self.xfer_pending(self.ep_in) and self._wb.readable(): + self.submit_xfer(self.ep_in, self._wb.pend_read(), self._wr_cb) + + def _wr_cb(self, ep, res, num_bytes): + # Whenever an IN transfer ends + if res == 0: + self._wb.finish_read(num_bytes) + self._wr_xfer() + + def _rd_xfer(self): + # Keep an active OUT transfer to read data from the host, + # whenever the receive buffer has room for new data + if self.is_open() and not self.xfer_pending(self.ep_out) and self._rb.writable(): + # Can only submit up to the endpoint length per transaction, otherwise we won't + # get any transfer callback until the full transaction completes. + self.submit_xfer(self.ep_out, self._rb.pend_write(_BULK_EP_LEN), self._rd_cb) + + def _rd_cb(self, ep, res, num_bytes): + if res == 0: + self._rb.finish_write(num_bytes) + self._rd_xfer() + + def handle_open(self): + super().handle_open() + # kick off any transfers that may have queued while the device was not open + self._rd_xfer() + self._wr_xfer() + + def read(self, size): + start = time.ticks_ms() + + # Allocate a suitable buffer to read into + if size >= 0: + b = bytearray(size) + else: + # for size == -1, return however many bytes are ready + b = bytearray(self._rb.readable()) + + n = self._readinto(b, start) + if not b: + return None + if n < len(b): + return b[:n] + return b + + def readinto(self, b): + return self._readinto(b, time.ticks_ms()) + + def _readinto(self, b, start): + if len(b) == 0: + return 0 + + n = 0 + m = memoryview(b) + while n < len(b): + # copy out of the read buffer if there is anything available + if self._rb.readable(): + n += self._rb.readinto(m if n == 0 else m[n:]) + self._rd_xfer() # if _rd was previously full, no transfer will be running + if n == len(b): + break # Done, exit before we reach the sleep + + if time.ticks_diff(time.ticks_ms(), start) > self._timeout: + break # Timed out + + machine.idle() + + return n or None + + def ioctl(self, req, arg): + if req == _MP_STREAM_POLL: + return ( + (_MP_STREAM_POLL_WR if (arg & _MP_STREAM_POLL_WR) and self._wb.writable() else 0) + | (_MP_STREAM_POLL_RD if (arg & _MP_STREAM_POLL_RD) and self._rd.readable() else 0) + | + # using the USB level "open" (i.e. connected to host) for !HUP, not !DTR (port is open) + (_MP_STREAM_POLL_HUP if (arg & _MP_STREAM_POLL_HUP) and not self.is_open() else 0) + ) + elif req == _MP_STREAM_FLUSH: + start = time.ticks_ms() + # Wait until write buffer contains no bytes for the lower TinyUSB layer to "read" + while self._wb.readable(): + if not self.is_open(): + return _MP_EINVAL + if time.ticks_diff(time.ticks_ms(), start) > self._timeout: + return _MP_ETIMEDOUT + machine.idle() + return 0 + + return _MP_EINVAL diff --git a/micropython/usbd/cdc_example.py b/micropython/usbd/cdc_example.py new file mode 100644 index 000000000..9d0f454f9 --- /dev/null +++ b/micropython/usbd/cdc_example.py @@ -0,0 +1,24 @@ +from usbd import CDC, get_usbdevice +import os +import time + +cdc = CDC() # adds itself automatically +cdc.init(timeout=0) # zero timeout makes this non-blocking, suitable for os.dupterm() + +print("Triggering reenumerate...") + +ud = get_usbdevice() +ud.reenumerate() + +print("Waiting for CDC port to open...") + +# cdc.is_open() returns true after enumeration finishes. +# cdc.dtr is not set until the host opens the port and asserts DTR +while not (cdc.is_open() and cdc.dtr): + time.sleep_ms(20) + +print("CDC port is open, duplicating REPL...") + +old_term = os.dupterm(cdc) + +print("Welcome to REPL on CDC implemented in Python?") diff --git a/micropython/usbd/cdc_rate_benchmark.py b/micropython/usbd/cdc_rate_benchmark.py new file mode 100755 index 000000000..85b10cfef --- /dev/null +++ b/micropython/usbd/cdc_rate_benchmark.py @@ -0,0 +1,290 @@ +#!/usr/bin/env python +# +# Internal performance and reliability test for USB CDC. +# +# MIT License; Original Copyright (c) Damien George, updated by Angus Gratton 2023. +# +# Runs on the host, not the device. +# +# Usage: +# cdc_rate_benchmark.py [REPL serial device] [DATA serial device] +# +# - If both REPL and DATA serial devices are specified, script is loaded onto the REPL device +# and data is measured over the DATA device. +# - If only REPL serial device argument is specified, same port is used for both "REPL" and "DATA" +# - If neither serial device is specified, defaults to /dev/ttyACM0. +# + +import sys +import time +import argparse +import serial + + +def drain_input(ser): + time.sleep(0.1) + while ser.inWaiting() > 0: + data = ser.read(ser.inWaiting()) + time.sleep(0.1) + + +test_script_common = """ +try: + import pyb + p = pyb.USB_VCP(vcp_id) + pyb.LED(1).on() + led = pyb.LED(2) +except ImportError: + try: + from usbd import CDC, get_usbdevice + cdc = CDC(timeout=60_000) # adds itself automatically + ud = get_usbdevice() + ud.reenumerate() + p = cdc + led = None + except ImportError: + import sys + p = sys.stdout.buffer + led = None +""" + +read_test_script = """ +vcp_id = %u +b=bytearray(%u) +assert p.read(1) == b'G' # Trigger +for i in range(len(b)): + b[i] = i & 0xff +for _ in range(%d): + if led: + led.toggle() + n = p.write(b) + assert n == len(b) +p.flush() # for dynamic CDC, need to send all bytes before 'p' may be garbage collected +""" + + +def read_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf): + assert bufsize % 256 == 0 # for verify to work + + # Load and run the read_test_script. + ser_repl.write(b"\x03\x03\x01\x04") # break, break, raw-repl, soft-reboot + drain_input(ser_repl) + ser_repl.write(bytes(test_script_common, "ascii")) + ser_repl.write(bytes(read_test_script % (usb_vcp_id, bufsize, nbuf), "ascii")) + ser_repl.write(b"\x04") # eof + ser_repl.flush() + response = ser_repl.read(2) + assert response == b"OK", response + + # for dynamic USB CDC this port doesn't exist until shortly after the script runs, and we need + # to reopen it for each test run + dynamic_cdc = False + if isinstance(ser_data, str): + time.sleep(2) # hacky, but need some time for old port to close and new one to appear + ser_data = serial.Serial(ser_data, baudrate=115200) + dynamic_cdc = True + + ser_data.write(b"G") # trigger script to start sending + + # Read data from the device, check it is correct, and measure throughput. + n = 0 + last_byte = None + t_start = time.time() + remain = nbuf * bufsize + READ_TIMEOUT = 1e9 + total_data = bytearray(remain) + while remain: + t0 = time.monotonic_ns() + while ser_data.inWaiting() == 0: + if time.monotonic_ns() - t0 > READ_TIMEOUT: + # timeout waiting for data from device + break + time.sleep(0.0001) + if not ser_data.inWaiting(): + print(f"ERROR: timeout waiting for data. remain={remain}") + break + to_read = min(ser_data.inWaiting(), remain) + data = ser_data.read(to_read) + # verify bytes coming in are in sequence + # if last_byte is not None: + # if data[0] != (last_byte + 1) & 0xff: + # print('ERROR: first byte is not in sequence:', last_byte, data[0]) + # last_byte = data[-1] + # for i in range(1, len(data)): + # if data[i] != (data[i - 1] + 1) & 0xff: + # print('ERROR: data not in sequence at position %d:' % i, data[i - 1], data[i]) + remain -= len(data) + # print(n, nbuf * bufsize, end="\r") + total_data[n : n + len(data)] = data + n += len(data) + t_end = time.time() + for i in range(len(total_data)): + if total_data[i] != i & 0xFF: + print("fail", i, i & 0xFF, total_data[i]) + ser_repl.write(b"\x03") # break + t = t_end - t_start + + # Print results. + print( + "READ: bufsize=%u, read %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec" + % (bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t) + ) + + if dynamic_cdc: + ser_data.close() + + return t + + +write_test_script = """ +import sys +vcp_id = %u +b=bytearray(%u) +while 1: + if led: + led.toggle() + n = p.readinto(b) + assert n is not None # timeout + fail = 0 + er = b'ER' + if %u: + for i in range(n): + if b[i] != 32 + (i & 0x3f): + fail += 1 + if n != len(b): + er = b'BL' + fail = n or -1 + + if fail: + sys.stdout.write(er + b'%%04u' %% fail) + else: + sys.stdout.write(b'OK%%04u' %% n) +""" + + +def write_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf, verified): + # Load and run the write_test_script. + # ser_repl.write(b'\x03\x03\x01\x04') # break, break, raw-repl, soft-reboot + ser_repl.write(b"\x03\x01\x04") # break, raw-repl, soft-reboot + drain_input(ser_repl) + ser_repl.write(bytes(test_script_common, "ascii")) + ser_repl.write(bytes(write_test_script % (usb_vcp_id, bufsize, 1 if verified else 0), "ascii")) + ser_repl.write(b"\x04") # eof + ser_repl.flush() + drain_input(ser_repl) + + # for dynamic USB CDC this port doesn't exist until shortly after the script runs, and we need + # to reopen it for each test run + dynamic_cdc = False + if isinstance(ser_data, str): + time.sleep(2) # hacky, but need some time for old port to close and new one to appear + ser_data = serial.Serial(ser_data, baudrate=115200) + dynamic_cdc = True + + # Write data to the device, check it is correct, and measure throughput. + n = 0 + t_start = time.time() + buf = bytearray(bufsize) + for i in range(len(buf)): + buf[i] = 32 + (i & 0x3F) # don't want to send ctrl chars! + for i in range(nbuf): + ser_data.write(buf) + n += len(buf) + # while ser_data.inWaiting() == 0: + # time.sleep(0.001) + # response = ser_data.read(ser_data.inWaiting()) + response = ser_repl.read(6) + if response != b"OK%04u" % bufsize: + response += ser_repl.read(ser_repl.inWaiting()) + print("bad response, expecting OK%04u, got %r" % (bufsize, response)) + t_end = time.time() + ser_repl.write(b"\x03") # break + t = t_end - t_start + + # Print results. + print( + "WRITE: verified=%d, bufsize=%u, wrote %u bytes in %.2f msec = %.2f kibytes/sec = %.2f MBits/sec" + % (verified, bufsize, n, t * 1000, n / 1024 / t, n * 8 / 1000000 / t) + ) + + if dynamic_cdc: + ser_data.close() + + return t + + +def main(): + dev_repl = "/dev/ttyACM0" + dev_data = None + if len(sys.argv) >= 2: + dev_repl = sys.argv[1] + if len(sys.argv) >= 3: + assert len(sys.argv) >= 4 + dev_data = sys.argv[2] + usb_vcp_id = int(sys.argv[3]) + + if dev_data is None: + print("REPL and data on", dev_repl) + ser_repl = serial.Serial(dev_repl, baudrate=115200) + ser_data = ser_repl + usb_vcp_id = 0 + else: + print("REPL on", dev_repl) + print("data on", dev_data) + print("USB VCP", usb_vcp_id) + ser_repl = serial.Serial(dev_repl, baudrate=115200) + ser_data = dev_data # can't open this port until it exists + + if 0: + for i in range(1000): + print("======== TEST %04u ========" % i) + read_test(ser_repl, ser_data, usb_vcp_id, 8000, 32) + write_test(ser_repl, ser_data, usb_vcp_id, 8000, 32, True) + return + + read_test_params = [ + (256, 128), + (512, 64), + (1024, 64), + (2048, 64), + (4096, 64), + (8192, 64), + (16384, 64), + ] + write_test_params = [(512, 16), (1024, 16)] # for high speed mode due to lack of flow ctrl + write_test_params = [ + (128, 32), + (256, 16), + (512, 16), + (1024, 16), + (2048, 16), + (4096, 16), + (8192, 64), + (9999, 64), + ] + + # ambiq + # read_test_params = ((256, 512),) + # write_test_params = () + # ambiq + + for bufsize, nbuf in read_test_params: + t = read_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf) + if t > 8: + break + + for bufsize, nbuf in write_test_params: + t = write_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf, True) + if t > 8: + break + + for bufsize, nbuf in write_test_params: + t = write_test(ser_repl, ser_data, usb_vcp_id, bufsize, nbuf, False) + if t > 8: + break + + ser_repl.close() + + +if __name__ == "__main__": + main() diff --git a/micropython/usbd/device.py b/micropython/usbd/device.py index 492b42ea3..7d6ce3bf8 100644 --- a/micropython/usbd/device.py +++ b/micropython/usbd/device.py @@ -210,7 +210,7 @@ def _descriptor_config_cb(self): self._eps = {} # rebuild endpoint mapping as we enumerate each interface self._ep_cbs = {} itf_idx = static.itf_max - ep_addr = static.ep_max + ep_addr = max(static.ep_max, 1) # Endpoint 0 reserved for control str_idx = static.str_max + len(strs) for itf in self._itfs: # Get the endpoint descriptors first so we know how many endpoints there are @@ -324,9 +324,8 @@ def _submit_xfer(self, ep_addr, data, done_cb=None): # # Generally, drivers should call USBInterface.submit_xfer() instead. See # that function for documentation about the possible parameter values. - cb = self._ep_cbs[ep_addr] - if cb: - raise RuntimeError(f"Pending xfer on EP {ep_addr}") + if self._ep_cbs[ep_addr]: + raise RuntimeError("xfer_pending") # USBD callback may be called immediately, before Python execution # continues @@ -334,8 +333,7 @@ def _submit_xfer(self, ep_addr, data, done_cb=None): if not self._usbd.submit_xfer(ep_addr, data): self._ep_cbs[ep_addr] = None - return False - return True + raise RuntimeError("submit failed") def _xfer_cb(self, ep_addr, result, xferred_bytes): # Singleton callback from TinyUSB custom class driver when a transfer completes. @@ -386,7 +384,7 @@ def _control_xfer_cb(self, stage, request): # True - Continue transfer, no data # False - STALL transfer # Object with buffer interface - submit this data for the control transfer - if type(result) == bool: + if isinstance(result, bool): return result return self._usbd.control_xfer(request, result) @@ -544,7 +542,7 @@ def handle_device_control_xfer(self, stage, request): # # - True to continue the request, False to STALL the endpoint. # - Buffer interface object to provide a buffer to the host as part of the - # transfer, if possible. + # transfer, if applicable. return False def handle_interface_control_xfer(self, stage, request): @@ -582,6 +580,12 @@ def handle_endpoint_control_xfer(self, stage, request): # possible return values. return False + def xfer_pending(self, ep_addr): + # Return True if a transfer is already pending on ep_addr. + # + # Only one transfer can be submitted at a time. + return bool(get_usbdevice()._ep_cbs[ep_addr]) + def submit_xfer(self, ep_addr, data, done_cb=None): # Submit a USB transfer (of any type except control) # @@ -599,11 +603,27 @@ def submit_xfer(self, ep_addr, data, done_cb=None): # xferred_bytes) where result is one of xfer_result_t enum (see top of # this file), and xferred_bytes is an integer. # + # If the function returns, the transfer is queued. + # + # The function will raise RuntimeError under the following conditions: + # + # - The interface is not "open" (i.e. has not been enumerated and configured + # by the host yet.) + # + # - A transfer is already pending on this endpoint (use xfer_pending() to check + # before sending if needed.) + # + # - A DCD error occurred when queueing the transfer on the hardware. + # + # + # Will raise TypeError if 'data' isn't he correct type of buffer for the + # endpoint transfer direction. + # # Note that done_cb may be called immediately, possibly before this # function has returned to the caller. if not self._open: - raise RuntimeError - return get_usbdevice()._submit_xfer(ep_addr, data, done_cb) + raise RuntimeError("Not open") + get_usbdevice()._submit_xfer(ep_addr, data, done_cb) def set_ep_stall(self, ep_addr, stall): # Set or clear endpoint STALL state, according to the bool "stall" parameter. diff --git a/micropython/usbd/manifest.py b/micropython/usbd/manifest.py index 78b2c69fb..c6e710af8 100644 --- a/micropython/usbd/manifest.py +++ b/micropython/usbd/manifest.py @@ -5,6 +5,7 @@ "usbd", files=( "__init__.py", + "cdc.py", "device.py", "hid.py", "hid_keypad.py", diff --git a/micropython/usbd/test_utils_buffer.py b/micropython/usbd/test_utils_buffer.py new file mode 100644 index 000000000..871f125bf --- /dev/null +++ b/micropython/usbd/test_utils_buffer.py @@ -0,0 +1,94 @@ +import micropython + +from utils import Buffer +import utils + +if not hasattr(utils.machine, "disable_irq"): + # Allow testing on the unix port, and as a bonus have tests fail if the buffer + # allocates inside a critical section. + class FakeMachine: + def disable_irq(self): + return micropython.heap_lock() + + def enable_irq(self, was_locked): + if not was_locked: + micropython.heap_unlock() + + utils.machine = FakeMachine() + + +# TODO: Makes this a test which can be integrated somewhere + +b = Buffer(16) + +# Check buffer is empty +assert b.readable() == 0 +assert b.writable() == 16 + +# Single write then read +w = b.pend_write() +assert len(w) == 16 +w[:8] = b"12345678" +b.finish_write(8) + +# Empty again +assert b.readable() == 8 +assert b.writable() == 8 + +r = b.pend_read() +assert len(r) == 8 +assert r == b"12345678" +b.finish_read(8) + +# Empty buffer again +assert b.readable() == 0 +assert b.writable() == 16 + +# Single write then split reads +b.write(b"abcdefghijklmnop") +assert b.writable() == 0 # full buffer + +r = b.pend_read() +assert r == b"abcdefghijklmnop" +b.finish_read(2) + +r = b.pend_read() +assert r == b"cdefghijklmnop" +b.finish_read(3) + +# write to end of buffer +b.write(b"AB") + +r = b.pend_read() +assert r == b"fghijklmnopAB" + +# write while a read is pending +b.write(b"XY") + +# previous pend_read() memoryview should be the same +assert r == b"fghijklmnopAB" + +b.finish_read(4) +r = b.pend_read() +assert r == b"jklmnopABXY" # four bytes consumed from head, one new byte at tail + +# read while a write is pending +w = b.pend_write() +assert len(w) == 5 +r = b.pend_read() +assert len(r) == 11 +b.finish_read(3) +w[:2] = b"12" +b.finish_write(2) + +# Expected final state of buffer +tmp = bytearray(b.readable()) +assert b.readinto(tmp) == len(tmp) +assert tmp == b"mnopABXY12" + +# Now buffer is empty again +assert b.readable() == 0 +assert b.readinto(tmp) == 0 +assert b.writable() == 16 + +print("Done!") diff --git a/micropython/usbd/utils.py b/micropython/usbd/utils.py index 017019575..c94d093db 100644 --- a/micropython/usbd/utils.py +++ b/micropython/usbd/utils.py @@ -3,6 +3,7 @@ # # Some constants and stateless utility functions for working with USB descriptors and requests. from micropython import const +import machine import ustruct # Shared constants @@ -75,3 +76,131 @@ def split_bmRequestType(bmRequestType): (bmRequestType >> 5) & 0x03, (bmRequestType >> 7) & 0x01, ) + + +class Buffer: + # An interrupt-safe producer/consumer buffer that wraps a bytearray object. + # + # Kind of like a ring buffer, but supports the idea of returning a + # memoryview for either read or write of multiple bytes (suitable for + # passing to a buffer function without needing to allocate another buffer to + # read into.) + # + # Consumer can call pend_read() to get a memoryview to read from, and then + # finish_read(n) when done to indicate it read 'n' bytes from the + # memoryview. There is also a readinto() convenience function. + # + # Producer must call pend_write() to get a memorybuffer to write into, and + # then finish_write(n) when done to indicate it wrote 'n' bytes into the + # memoryview. There is also a normal write() convenience function. + # + # - Only one producer and one consumer is supported. + # + # - Calling pend_read() and pend_write() is effectively idempotent, they can be + # called more than once without a corresponding finish_x() call if necessary + # (provided only one thread does this, as per the previous point.) + # + # - Calling finish_write() and finish_read() is hard interrupt safe (does + # not allocate). pend_read() and pend_write() each allocate 1 block for + # the memoryview that is returned. + # + # The buffer contents are always laid out as: + # + # - Slice [:_n] = bytes of valid data waiting to read + # - Slice [_n:_w] = unused space + # - Slice [_w:] = bytes of pending write buffer waiting to be written + # + # This buffer should be fast when most reads and writes are balanced and use + # the whole buffer. When this doesn't happen, performance degrades to + # approximate a Python-based single byte ringbuffer. + # + def __init__(self, length): + self._b = memoryview(bytearray(length)) + # number of bytes in buffer read to read, starting at index 0. Updated + # by both producer & consumer. + self._n = 0 + # start index of a pending write into the buffer, if any. equals + # len(self._b) if no write is pending. Updated by producer only. + self._w = length + + def writable(self): + # Number of writable bytes in the buffer. Assumes no pending write is outstanding. + return len(self._b) - self._n + + def readable(self): + # Number of readable bytes in the buffer. Assumes no pending read is outstanding. + return self._n + + def pend_write(self, wmax=None): + # Returns a memoryview that the producer can write bytes into. + # start the write at self._n, the end of data waiting to read + # + # If wmax is set then the memoryview is pre-sliced to be at most + # this many bytes long. + # + # (No critical section needed as self._w is only updated by the producer.) + self._w = self._n + end = (self._w + wmax) if wmax else len(self._b) + return self._b[self._w : end] + + def finish_write(self, nbytes): + # Called by the producer to indicate it wrote nbytes into the buffer. + ist = machine.disable_irq() + try: + assert nbytes <= len(self._b) - self._w # can't say we wrote more than was pended + if self._n == self._w: + # no data was read while the write was happening, so the buffer is already in place + # (this is the fast path) + self._n += nbytes + else: + # Slow path: data was read while the write was happening, so + # shuffle the newly written bytes back towards index 0 to avoid fragmentation + # + # As this updates self._n we have to do it in the critical + # section, so do it byte by byte to avoid allocating. + while nbytes > 0: + self._b[self._n] = self._b[self._w] + self._n += 1 + self._w += 1 + nbytes -= 1 + + self._w = len(self._b) + finally: + machine.enable_irq(ist) + + def write(self, w): + # Helper method for the producer to write into the buffer in one call + pw = self.pend_write() + to_w = min(len(w), len(pw)) + if to_w: + pw[:to_w] = w[:to_w] + self.finish_write(to_w) + return to_w + + def pend_read(self): + # Return a memoryview slice that the consumer can read bytes from + return self._b[: self._n] + + def finish_read(self, nbytes): + # Called by the consumer to indicate it read nbytes from the buffer. + ist = machine.disable_irq() + try: + assert nbytes <= self._n # can't say we read more than was available + i = 0 + self._n -= nbytes + while i < self._n: + # consumer only read part of the buffer, so shuffle remaining + # read data back towards index 0 to avoid fragmentation + self._b[i] = self._b[i + nbytes] + i += 1 + finally: + machine.enable_irq(ist) + + def readinto(self, b): + # Helper method for the consumer to read out of the buffer in one call + pr = self.pend_read() + to_r = min(len(pr), len(b)) + if to_r: + b[:to_r] = pr[:to_r] + self.finish_read(to_r) + return to_r