From 10dcd149ccbf1048160c7a39453d11e0ed2273b6 Mon Sep 17 00:00:00 2001 From: "Brian S. Stephan" Date: Fri, 7 Jul 2023 16:45:07 -0500 Subject: [PATCH] add methods to read directly off the board over USB --- gp2040ce_bintools/pico.py | 116 +++++++++++++++++++++++++++++++++---- tests/test_pico.py | 118 ++++++++++++++++++++++++++++++++++++++ 2 files changed, 222 insertions(+), 12 deletions(-) create mode 100644 tests/test_pico.py diff --git a/gp2040ce_bintools/pico.py b/gp2040ce_bintools/pico.py index 7f3a703..301093a 100644 --- a/gp2040ce_bintools/pico.py +++ b/gp2040ce_bintools/pico.py @@ -2,27 +2,39 @@ Much of this code is a partial Python implementation of picotool. """ +import logging import struct import usb.core +logger = logging.getLogger(__name__) + PICO_VENDOR = 0x2e8a PICO_PRODUCT = 0x0003 PICOBOOT_CMD_STRUCT = ' usb.core.Endpoint: - """Retrieve the USB endpoint for purposes of interacting with a Pico in BOOTSEL mode.""" +def get_bootsel_endpoints() -> tuple[usb.core.Endpoint, usb.core.Endpoint]: + """Retrieve the USB endpoint for purposes of interacting with a Pico in BOOTSEL mode. + + Returns: + the out and in endpoints for the BOOTSEL interface + """ # get the device and claim it from whatever else might have in the kernel pico_device = usb.core.find(idVendor=PICO_VENDOR, idProduct=PICO_PRODUCT) @@ -40,21 +52,101 @@ def get_bootsel_out_endpoint() -> usb.core.Endpoint: out_endpoint = usb.util.find_descriptor(pico_bootsel_interface, custom_match=lambda e: (usb.util.endpoint_direction(e.bEndpointAddress) == usb.util.ENDPOINT_OUT)) - return out_endpoint + in_endpoint = usb.util.find_descriptor(pico_bootsel_interface, + custom_match=lambda e: (usb.util.endpoint_direction(e.bEndpointAddress) == + usb.util.ENDPOINT_IN)) + return out_endpoint, in_endpoint -def reboot() -> None: - """Reboot a Pico in BOOTSEL mode.""" - global PICO_TOKEN - endpoint = get_bootsel_out_endpoint() +def exclusive_access(out_end: usb.core.Endpoint, in_end: usb.core.Endpoint, is_exclusive: bool = True) -> None: + """Enable exclusive access mode on a Pico in BOOTSEL. + Args: + out_endpoint: the out direction USB endpoint to write to + in_endpoint: the in direction USB endpoint to read from + """ # set up the data - PICO_TOKEN += 1 + pico_token = 1 + command_size = 1 + transfer_len = 0 + exclusive = 1 if is_exclusive else 0 + payload = struct.pack(PICOBOOT_CMD_STRUCT + PICOBOOT_CMD_EXCLUSIVE_ACCESS_SUFFIX_STRUCT, + PICO_MAGIC, pico_token, PICO_COMMANDS['EXCLUSIVE_ACCESS'], command_size, transfer_len, + exclusive) + logger.debug("EXCLUSIVE_ACCESS: %s", payload) + out_end.write(payload) + _ = in_end.read(256) + + +def exit_xip(out_end: usb.core.Endpoint, in_end: usb.core.Endpoint) -> None: + """Exit XIP on a Pico in BOOTSEL. + + Args: + out_endpoint: the out direction USB endpoint to write to + in_endpoint: the in direction USB endpoint to read from + """ + # set up the data + pico_token = 1 + command_size = 0 + transfer_len = 0 + payload = struct.pack(PICOBOOT_CMD_STRUCT + PICOBOOT_CMD_EXIT_XIP_SUFFIX_STRUCT, + PICO_MAGIC, pico_token, PICO_COMMANDS['EXIT_XIP'], command_size, transfer_len) + logger.debug("EXIT_XIP: %s", payload) + out_end.write(payload) + _ = in_end.read(256) + + +def read(out_end: usb.core.Endpoint, in_end: usb.core.Endpoint, location: int, size: int) -> bytearray: + """Read a requested number of bytes from a Pico in BOOTSEL, starting from the specified location. + + This also prepares the USB device for reading, so it expects to be able to grab + exclusive access. + + Args: + out_endpoint: the out direction USB endpoint to write to + in_endpoint: the in direction USB endpoint to read from + location: memory address of where to start reading from + size: number of bytes to read + Returns: + the read bytes as a byte array + """ + # set up the data + chunk_size = 256 + command_size = 8 + + read_location = location + read_size = 0 + content = bytearray() + exclusive_access(out_end, in_end, is_exclusive=True) + while read_size < size: + exit_xip(out_end, in_end) + pico_token = 1 + payload = struct.pack(PICOBOOT_CMD_STRUCT + PICOBOOT_CMD_READ_SUFFIX_STRUCT, + PICO_MAGIC, pico_token, PICO_COMMANDS['READ'] + 128, command_size, chunk_size, + read_location, chunk_size) + logger.debug("READ: %s", payload) + out_end.write(payload) + res = in_end.read(chunk_size) + logger.debug("res: %s", res) + content += res + read_size += chunk_size + read_location += chunk_size + out_end.write(b'\xc0') + exclusive_access(out_end, in_end, is_exclusive=False) + logger.debug("final content: %s", content[:size]) + return content[:size] + + +def reboot(out_end: usb.core.Endpoint) -> None: + """Reboot a Pico in BOOTSEL mode.""" + # set up the data + pico_token = 1 command_size = 12 transfer_len = 0 boot_start = 0 boot_end = PICO_SRAM_END boot_delay_ms = 500 - endpoint.write(struct.pack(PICOBOOT_CMD_STRUCT + PICOBOOT_CMD_REBOOT_SUFFIX_STRUCT, - PICO_MAGIC, PICO_TOKEN, PICO_COMMANDS['REBOOT'], command_size, transfer_len, - boot_start, boot_end, boot_delay_ms)) + out_end.write(struct.pack(PICOBOOT_CMD_STRUCT + PICOBOOT_CMD_REBOOT_SUFFIX_STRUCT, + PICO_MAGIC, pico_token, PICO_COMMANDS['REBOOT'], command_size, transfer_len, + boot_start, boot_end, boot_delay_ms)) + # we don't even bother reading here because it may have already rebooted diff --git a/tests/test_pico.py b/tests/test_pico.py new file mode 100644 index 0000000..1d8194f --- /dev/null +++ b/tests/test_pico.py @@ -0,0 +1,118 @@ +"""Test operations for interfacing directly with a Pico.""" +import os +import struct +import sys +import unittest.mock as mock +from array import array + +from decorator import decorator + +import gp2040ce_bintools.pico as pico + +HERE = os.path.dirname(os.path.abspath(__file__)) + + +@decorator +def with_pb2s(test, *args, **kwargs): + """Wrap a test with precompiled pb2 files on the path.""" + proto_path = os.path.join(HERE, 'test-files', 'pb2-files') + sys.path.append(proto_path) + + test(*args, **kwargs) + + sys.path.pop() + del sys.modules['config_pb2'] + + +def test_exclusive_access(): + """Test that we can get exclusive access to a BOOTSEL board.""" + end_out, end_in = mock.MagicMock(), mock.MagicMock() + pico.exclusive_access(end_out, end_in) + + payload = struct.pack('