add methods to read directly off the board over USB
This commit is contained in:
parent
772ae953f0
commit
10dcd149cc
|
@ -2,27 +2,39 @@
|
|||
|
||||
Much of this code is a partial Python implementation of picotool.
|
||||
"""
|
||||
import logging
|
||||
import struct
|
||||
|
||||
import usb.core
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
PICO_VENDOR = 0x2e8a
|
||||
PICO_PRODUCT = 0x0003
|
||||
|
||||
PICOBOOT_CMD_STRUCT = '<LLBBxxL'
|
||||
PICOBOOT_CMD_EXCLUSIVE_ACCESS_SUFFIX_STRUCT = 'L12x'
|
||||
PICOBOOT_CMD_EXIT_XIP_SUFFIX_STRUCT = '16x'
|
||||
PICOBOOT_CMD_READ_SUFFIX_STRUCT = 'LL8x'
|
||||
PICOBOOT_CMD_REBOOT_SUFFIX_STRUCT = 'LLL4x'
|
||||
|
||||
PICO_MAGIC = 0x431fd10b
|
||||
PICO_SRAM_END = 0x20042000
|
||||
PICO_TOKEN = 0
|
||||
# only a partial implementation...
|
||||
PICO_COMMANDS = {
|
||||
'EXCLUSIVE_ACCESS': 0x1,
|
||||
'REBOOT': 0x2,
|
||||
'READ': 0x4,
|
||||
'EXIT_XIP': 0x6,
|
||||
}
|
||||
|
||||
|
||||
def get_bootsel_out_endpoint() -> usb.core.Endpoint:
|
||||
"""Retrieve the USB endpoint for purposes of interacting with a Pico in BOOTSEL mode."""
|
||||
def get_bootsel_endpoints() -> tuple[usb.core.Endpoint, usb.core.Endpoint]:
|
||||
"""Retrieve the USB endpoint for purposes of interacting with a Pico in BOOTSEL mode.
|
||||
|
||||
Returns:
|
||||
the out and in endpoints for the BOOTSEL interface
|
||||
"""
|
||||
# get the device and claim it from whatever else might have in the kernel
|
||||
pico_device = usb.core.find(idVendor=PICO_VENDOR, idProduct=PICO_PRODUCT)
|
||||
|
||||
|
@ -40,21 +52,101 @@ def get_bootsel_out_endpoint() -> usb.core.Endpoint:
|
|||
out_endpoint = usb.util.find_descriptor(pico_bootsel_interface,
|
||||
custom_match=lambda e: (usb.util.endpoint_direction(e.bEndpointAddress) ==
|
||||
usb.util.ENDPOINT_OUT))
|
||||
return out_endpoint
|
||||
in_endpoint = usb.util.find_descriptor(pico_bootsel_interface,
|
||||
custom_match=lambda e: (usb.util.endpoint_direction(e.bEndpointAddress) ==
|
||||
usb.util.ENDPOINT_IN))
|
||||
return out_endpoint, in_endpoint
|
||||
|
||||
|
||||
def reboot() -> None:
|
||||
"""Reboot a Pico in BOOTSEL mode."""
|
||||
global PICO_TOKEN
|
||||
endpoint = get_bootsel_out_endpoint()
|
||||
def exclusive_access(out_end: usb.core.Endpoint, in_end: usb.core.Endpoint, is_exclusive: bool = True) -> None:
|
||||
"""Enable exclusive access mode on a Pico in BOOTSEL.
|
||||
|
||||
Args:
|
||||
out_endpoint: the out direction USB endpoint to write to
|
||||
in_endpoint: the in direction USB endpoint to read from
|
||||
"""
|
||||
# set up the data
|
||||
PICO_TOKEN += 1
|
||||
pico_token = 1
|
||||
command_size = 1
|
||||
transfer_len = 0
|
||||
exclusive = 1 if is_exclusive else 0
|
||||
payload = struct.pack(PICOBOOT_CMD_STRUCT + PICOBOOT_CMD_EXCLUSIVE_ACCESS_SUFFIX_STRUCT,
|
||||
PICO_MAGIC, pico_token, PICO_COMMANDS['EXCLUSIVE_ACCESS'], command_size, transfer_len,
|
||||
exclusive)
|
||||
logger.debug("EXCLUSIVE_ACCESS: %s", payload)
|
||||
out_end.write(payload)
|
||||
_ = in_end.read(256)
|
||||
|
||||
|
||||
def exit_xip(out_end: usb.core.Endpoint, in_end: usb.core.Endpoint) -> None:
|
||||
"""Exit XIP on a Pico in BOOTSEL.
|
||||
|
||||
Args:
|
||||
out_endpoint: the out direction USB endpoint to write to
|
||||
in_endpoint: the in direction USB endpoint to read from
|
||||
"""
|
||||
# set up the data
|
||||
pico_token = 1
|
||||
command_size = 0
|
||||
transfer_len = 0
|
||||
payload = struct.pack(PICOBOOT_CMD_STRUCT + PICOBOOT_CMD_EXIT_XIP_SUFFIX_STRUCT,
|
||||
PICO_MAGIC, pico_token, PICO_COMMANDS['EXIT_XIP'], command_size, transfer_len)
|
||||
logger.debug("EXIT_XIP: %s", payload)
|
||||
out_end.write(payload)
|
||||
_ = in_end.read(256)
|
||||
|
||||
|
||||
def read(out_end: usb.core.Endpoint, in_end: usb.core.Endpoint, location: int, size: int) -> bytearray:
|
||||
"""Read a requested number of bytes from a Pico in BOOTSEL, starting from the specified location.
|
||||
|
||||
This also prepares the USB device for reading, so it expects to be able to grab
|
||||
exclusive access.
|
||||
|
||||
Args:
|
||||
out_endpoint: the out direction USB endpoint to write to
|
||||
in_endpoint: the in direction USB endpoint to read from
|
||||
location: memory address of where to start reading from
|
||||
size: number of bytes to read
|
||||
Returns:
|
||||
the read bytes as a byte array
|
||||
"""
|
||||
# set up the data
|
||||
chunk_size = 256
|
||||
command_size = 8
|
||||
|
||||
read_location = location
|
||||
read_size = 0
|
||||
content = bytearray()
|
||||
exclusive_access(out_end, in_end, is_exclusive=True)
|
||||
while read_size < size:
|
||||
exit_xip(out_end, in_end)
|
||||
pico_token = 1
|
||||
payload = struct.pack(PICOBOOT_CMD_STRUCT + PICOBOOT_CMD_READ_SUFFIX_STRUCT,
|
||||
PICO_MAGIC, pico_token, PICO_COMMANDS['READ'] + 128, command_size, chunk_size,
|
||||
read_location, chunk_size)
|
||||
logger.debug("READ: %s", payload)
|
||||
out_end.write(payload)
|
||||
res = in_end.read(chunk_size)
|
||||
logger.debug("res: %s", res)
|
||||
content += res
|
||||
read_size += chunk_size
|
||||
read_location += chunk_size
|
||||
out_end.write(b'\xc0')
|
||||
exclusive_access(out_end, in_end, is_exclusive=False)
|
||||
logger.debug("final content: %s", content[:size])
|
||||
return content[:size]
|
||||
|
||||
|
||||
def reboot(out_end: usb.core.Endpoint) -> None:
|
||||
"""Reboot a Pico in BOOTSEL mode."""
|
||||
# set up the data
|
||||
pico_token = 1
|
||||
command_size = 12
|
||||
transfer_len = 0
|
||||
boot_start = 0
|
||||
boot_end = PICO_SRAM_END
|
||||
boot_delay_ms = 500
|
||||
endpoint.write(struct.pack(PICOBOOT_CMD_STRUCT + PICOBOOT_CMD_REBOOT_SUFFIX_STRUCT,
|
||||
PICO_MAGIC, PICO_TOKEN, PICO_COMMANDS['REBOOT'], command_size, transfer_len,
|
||||
boot_start, boot_end, boot_delay_ms))
|
||||
out_end.write(struct.pack(PICOBOOT_CMD_STRUCT + PICOBOOT_CMD_REBOOT_SUFFIX_STRUCT,
|
||||
PICO_MAGIC, pico_token, PICO_COMMANDS['REBOOT'], command_size, transfer_len,
|
||||
boot_start, boot_end, boot_delay_ms))
|
||||
# we don't even bother reading here because it may have already rebooted
|
||||
|
|
|
@ -0,0 +1,118 @@
|
|||
"""Test operations for interfacing directly with a Pico."""
|
||||
import os
|
||||
import struct
|
||||
import sys
|
||||
import unittest.mock as mock
|
||||
from array import array
|
||||
|
||||
from decorator import decorator
|
||||
|
||||
import gp2040ce_bintools.pico as pico
|
||||
|
||||
HERE = os.path.dirname(os.path.abspath(__file__))
|
||||
|
||||
|
||||
@decorator
|
||||
def with_pb2s(test, *args, **kwargs):
|
||||
"""Wrap a test with precompiled pb2 files on the path."""
|
||||
proto_path = os.path.join(HERE, 'test-files', 'pb2-files')
|
||||
sys.path.append(proto_path)
|
||||
|
||||
test(*args, **kwargs)
|
||||
|
||||
sys.path.pop()
|
||||
del sys.modules['config_pb2']
|
||||
|
||||
|
||||
def test_exclusive_access():
|
||||
"""Test that we can get exclusive access to a BOOTSEL board."""
|
||||
end_out, end_in = mock.MagicMock(), mock.MagicMock()
|
||||
pico.exclusive_access(end_out, end_in)
|
||||
|
||||
payload = struct.pack('<LLBBxxLL12x', 0x431fd10b, 1, 0x1, 1, 0, 1)
|
||||
end_out.write.assert_called_with(payload)
|
||||
end_in.read.assert_called_once()
|
||||
|
||||
end_out.reset_mock()
|
||||
end_in.reset_mock()
|
||||
pico.exclusive_access(end_out, end_in, is_exclusive=False)
|
||||
|
||||
payload = struct.pack('<LLBBxxLL12x', 0x431fd10b, 1, 0x1, 1, 0, 0)
|
||||
end_out.write.assert_called_with(payload)
|
||||
end_in.read.assert_called_once()
|
||||
|
||||
|
||||
def test_exit_xip():
|
||||
"""Test that we can exit XIP on a BOOTSEL board."""
|
||||
end_out, end_in = mock.MagicMock(), mock.MagicMock()
|
||||
pico.exit_xip(end_out, end_in)
|
||||
|
||||
payload = struct.pack('<LLBBxxL16x', 0x431fd10b, 1, 0x6, 0, 0)
|
||||
end_out.write.assert_called_with(payload)
|
||||
end_in.read.assert_called_once()
|
||||
|
||||
|
||||
def test_read():
|
||||
"""Test that we can read a memory of a BOOTSEL board in a variety of conditions."""
|
||||
end_out, end_in = mock.MagicMock(), mock.MagicMock()
|
||||
end_in.read.return_value = array('B', b'\x11' * 256)
|
||||
content = pico.read(end_out, end_in, 0x101FE000, 256)
|
||||
|
||||
expected_writes = [
|
||||
mock.call(struct.pack('<LLBBxxLL12x', 0x431fd10b, 1, 0x1, 1, 0, 1)),
|
||||
mock.call(struct.pack('<LLBBxxL16x', 0x431fd10b, 1, 0x6, 0, 0)),
|
||||
mock.call(struct.pack('<LLBBxxLLL8x', 0x431fd10b, 1, 0x84, 8, 256, 0x101FE000, 256)),
|
||||
mock.call(b'\xc0'),
|
||||
mock.call(struct.pack('<LLBBxxLL12x', 0x431fd10b, 1, 0x1, 1, 0, 0)),
|
||||
]
|
||||
end_out.write.assert_has_calls(expected_writes)
|
||||
assert end_in.read.call_count == 4
|
||||
assert len(content) == 256
|
||||
|
||||
|
||||
def test_read_shorter_than_chunk():
|
||||
"""Test that we can read a memory of a BOOTSEL board in a variety of conditions."""
|
||||
end_out, end_in = mock.MagicMock(), mock.MagicMock()
|
||||
end_in.read.return_value = array('B', b'\x11' * 256)
|
||||
content = pico.read(end_out, end_in, 0x101FE000, 128)
|
||||
|
||||
expected_writes = [
|
||||
mock.call(struct.pack('<LLBBxxLL12x', 0x431fd10b, 1, 0x1, 1, 0, 1)),
|
||||
mock.call(struct.pack('<LLBBxxL16x', 0x431fd10b, 1, 0x6, 0, 0)),
|
||||
mock.call(struct.pack('<LLBBxxLLL8x', 0x431fd10b, 1, 0x84, 8, 256, 0x101FE000, 256)),
|
||||
mock.call(b'\xc0'),
|
||||
mock.call(struct.pack('<LLBBxxLL12x', 0x431fd10b, 1, 0x1, 1, 0, 0)),
|
||||
]
|
||||
end_out.write.assert_has_calls(expected_writes)
|
||||
assert end_in.read.call_count == 4
|
||||
assert len(content) == 128
|
||||
|
||||
|
||||
def test_read_bigger_than_chunk():
|
||||
"""Test that we can read a memory of a BOOTSEL board in a variety of conditions."""
|
||||
end_out, end_in = mock.MagicMock(), mock.MagicMock()
|
||||
end_in.read.return_value = array('B', b'\x11' * 256)
|
||||
content = pico.read(end_out, end_in, 0x101FE000, 512)
|
||||
|
||||
expected_writes = [
|
||||
mock.call(struct.pack('<LLBBxxLL12x', 0x431fd10b, 1, 0x1, 1, 0, 1)),
|
||||
mock.call(struct.pack('<LLBBxxL16x', 0x431fd10b, 1, 0x6, 0, 0)),
|
||||
mock.call(struct.pack('<LLBBxxLLL8x', 0x431fd10b, 1, 0x84, 8, 256, 0x101FE000, 256)),
|
||||
mock.call(b'\xc0'),
|
||||
mock.call(struct.pack('<LLBBxxL16x', 0x431fd10b, 1, 0x6, 0, 0)),
|
||||
mock.call(struct.pack('<LLBBxxLLL8x', 0x431fd10b, 1, 0x84, 8, 256, 0x101FE000+256, 256)),
|
||||
mock.call(b'\xc0'),
|
||||
mock.call(struct.pack('<LLBBxxLL12x', 0x431fd10b, 1, 0x1, 1, 0, 0)),
|
||||
]
|
||||
end_out.write.assert_has_calls(expected_writes)
|
||||
assert end_in.read.call_count == 6
|
||||
assert len(content) == 512
|
||||
|
||||
|
||||
def test_reboot():
|
||||
"""Test that we can reboot a BOOTSEL board."""
|
||||
end_out = mock.MagicMock()
|
||||
pico.reboot(end_out)
|
||||
|
||||
payload = struct.pack('<LLBBxxLLLL4x', 0x431fd10b, 1, 0x2, 12, 0, 0, 0x20042000, 500)
|
||||
end_out.write.assert_called_with(payload)
|
Loading…
Reference in New Issue