Skip to content

Commit

Permalink
tests: gateware: Added initial end-to-end USB gateware test due to DF…
Browse files Browse the repository at this point in the history
…U bug.

Turns out it might be an issue in SOL not Squishy, so keeping the test but in it's own little world for now
  • Loading branch information
lethalbit committed Jan 4, 2025
1 parent 3b05f76 commit 62cd9ff
Show file tree
Hide file tree
Showing 2 changed files with 480 additions and 0 deletions.
282 changes: 282 additions & 0 deletions squishy/support/test.py
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,294 @@

__all__ = (
'SquishyGatewareTest',
'USBGatewarePHYTestHelpers',
'USBGatewareTestHelpers',
'DFUGatewareTestHelpers',
'SPIGatewareTestHelpers',
'SCSIGatewareTestHelpers',
)

class USBGatewarePHYTestHelpers:
'''
Unlike :py:class:`USBGatewareTestHelpers`, this mixin is used for end-to-end USB tests where we emulate a physical USB
bus, this helps test end-to-end integration.
To use this, the DUT platform rather than returning a ULPI or UTMI resource should return a DirectUSB resource, this
will cause SOL to use it's gateware PHY and we can then manually drive D+ and D- for tests.
'''

_USB_DP_RECORD = None
_last_frame: int = 0
_last_state: Literal['j', 'k'] | None = None
_last_trans: int = 0
_last_data: USBPacketID | None = None

def setup_helper(self, *, raw_record):
self.domains = (('usb', 12e6), ('usb_io', 48e6), *self.domains)

USBGatewarePHYTestHelpers._USB_DP_RECORD = raw_record

@staticmethod
def crc5(data: int, bit_len: int) -> int:
crc = 0x1F

for bit_idx in range(bit_len):
bit = (data >> bit_idx) & 1
crc <<= 1

if bit != (crc >> 5):
crc ^= 0x25
crc &= 0x1F

crc ^= 0x1F
return int(f'{crc:05b}'[::-1], base = 2)

@staticmethod
def crc16(data: int, bit_len: int, crc_in: int = 0) -> int:
crc = int(f'{crc_in ^ 0xFFFF:016b}'[::-1], base = 2)

for bit_idx in range(bit_len):
bit = (data >> bit_idx) & 1
crc <<= 1

if bit != (crc >> 16):
crc ^= 0x18005
crc &= 0xFFFF

crc ^= 0xFFFF
return int(f'{crc:016b}'[::-1], base = 2)

def usb_emit_bits(self, bits: int, count: int = 8):
for bit_idx in range(count):
bit = (bits >> bit_idx) & 1
if bit == 0:
self._last_trans = 0
match self._last_state:
case 'k':
yield from self.usb_j()
self._last_state = 'j'
case 'j':
yield from self.usb_k()
self._last_state = 'k'
case _:
yield from self.usb_j()
self._last_state = 'j'
else:
self._last_trans += 1
# Do bit-stuffing if we need to
if self._last_trans == 6:
self._last_trans = 0
match self._last_state:
case 'k':
yield from self.usb_j()
self._last_state = 'j'
case 'j':
yield from self.usb_k()
self._last_state = 'k'

match self._last_state:
case 'k':
yield from self.usb_k()
case 'j':
yield from self.usb_j()

def usb_single_zero(self):
yield self._USB_DP_RECORD.d_p.i.eq(0)
yield self._USB_DP_RECORD.d_n.i.eq(0)
yield Settle()
yield

def usb_single_one(self):
yield self._USB_DP_RECORD.d_p.i.eq(1)
yield self._USB_DP_RECORD.d_n.i.eq(1)
yield Settle()
yield

def usb_j(self):
yield self._USB_DP_RECORD.d_p.i.eq(1)
yield self._USB_DP_RECORD.d_n.i.eq(0)
yield Settle()
yield

def usb_wait_j(self):
yield from self.wait_until_high(self._USB_DP_RECORD.d_p.o, timeout = 1e4)
yield from self.usb_assert_j()

def usb_assert_j(self):
self.assertEqual((yield self._USB_DP_RECORD.d_p.o), 1)
self.assertEqual((yield self._USB_DP_RECORD.d_n.o), 0)

def usb_k(self):
yield self._USB_DP_RECORD.d_p.i.eq(0)
yield self._USB_DP_RECORD.d_n.i.eq(1)
yield Settle()
yield

def usb_wait_k(self):
yield from self.wait_until_high(self._USB_DP_RECORD.d_n.o, timeout = 1e4)
yield from self.usb_assert_k()

def usb_assert_k(self):
self.assertEqual((yield self._USB_DP_RECORD.d_n.o), 1)
self.assertEqual((yield self._USB_DP_RECORD.d_p.o), 0)


def usb_initialize(self):
yield from self.usb_single_zero()
yield from self.wait_for(30e-6, 'usb')
yield from self.usb_single_one()
yield from self.wait_for(1e-6, 'usb')

def usb_sync(self):
yield from self.usb_emit_bits(0x100, 9)

def usb_assert_sync(self):
yield from self.usb_wait_k()
for _ in range(3):
yield
yield from self.usb_assert_j()
yield
yield from self.usb_assert_k()
yield
yield from self.usb_assert_k()
yield


def usb_eop(self):
yield from self.usb_single_zero()
yield from self.usb_single_zero()
yield from self.usb_j()
yield from self.usb_single_one()
self._last_state = None

def usb_sof(self, frame_number: int | None = None):
yield from self.usb_sync()
yield from self.usb_emit_bits(USBPacketID.SOF.byte())

if frame_number is not None:
self._last_frame = frame_number

yield from self.usb_emit_bits(self._last_frame, 11)
yield from self.usb_emit_bits(self.crc5(self._last_frame, 11), 5)
yield from self.usb_eop()

self._last_frame += 1

def usb_in(self, addr: int, ep: int):
yield from self.usb_solicit(addr, ep, USBPacketID.IN)

def usb_out(self, addr: int, ep: int):
yield from self.usb_solicit(addr, ep, USBPacketID.OUT)

def usb_setup(self, addr: int):
self._last_data = None
yield from self.usb_solicit(addr, 0, USBPacketID.SETUP)

def usb_ack(self):
yield from self.usb_sync()
yield from self.usb_emit_bits(USBPacketID.ACK.byte())
yield from self.usb_eop()

def usb_solicit(self, addr: int, ep: int, pid: USBPacketID):
yield from self.usb_sync()
yield from self.usb_emit_bits(pid.byte())
yield from self.usb_emit_bits(addr, 7)
yield from self.usb_emit_bits(ep, 4)
yield from self.usb_emit_bits(self.crc5((addr | (ep << 7)), 11), 5)
yield from self.usb_eop()

def usb_data(self, data: Iterable[int]):
if self._last_data is None or self._last_data == USBPacketID.DATA1:
self._last_data = USBPacketID.DATA0
else:
self._last_data = USBPacketID.DATA1

yield from self.usb_sync()
yield from self.usb_emit_bits(self._last_data.byte())
crc = 0
for byte in data:
crc = self.crc16(byte, 8, crc)
yield from self.usb_emit_bits(byte)
yield from self.usb_emit_bits(crc, 16)
yield from self.usb_eop()

def usb_get_zlp(self):
yield from self.usb_consume_response((USBPacketID.DATA1.byte(), 0x00, 0x00))

def usb_send_zlp(self):
yield from self.usb_data(())

def usb_send_setup_pkt(self, addr: int, data: Iterable[int]):
yield from self.usb_setup(addr)
yield from self.usb_data(data)
yield from self.usb_consume_response((USBPacketID.ACK.byte(), ))
yield from self.step(10)


def usb_set_addr(self, addr: int):
yield from self.usb_send_setup_pkt(0, (
0x00, USBStandardRequests.SET_ADDRESS,
*addr.to_bytes(2, byteorder = 'little'), 0x00, 0x00, 0x00, 0x00
))
yield from self.usb_in(0x00, 0x00)
yield from self.usb_get_zlp()
yield from self.usb_ack()

def usb_set_config(self, addr: int, cfg: int):
yield from self.usb_send_setup_pkt(addr, (
0x00, USBStandardRequests.SET_CONFIGURATION,
*cfg.to_bytes(2, byteorder = 'little'), 0x00, 0x00, 0x00, 0x00
))
yield from self.usb_in(addr, 0x00)
yield from self.usb_get_zlp()
yield from self.usb_ack()


def usb_get_state(self):
dp = yield self._USB_DP_RECORD.d_p.o
dn = yield self._USB_DP_RECORD.d_n.o
yield
match (dp, dn):
case (0, 1):
return 'k'
case (1, 0):
return 'j'
case (0, 0):
return '0'
case (1, 1):
return '1'

def usb_consume_byte(self):
res = 0
for bit in range(8):
res >>= 1
state = yield from self.usb_get_state()
if self._last_state != state:
self._last_trans = 0
res |= 0x00
else:
self._last_trans += 1
if self._last_trans == 6:
state = yield from self.usb_get_state()
self.assertNotEqual(state, self._last_state)
self._last_trans = 0
res |= 0x80

self._last_state = state
return res

def usb_consume_response(self, data: Iterable[int]):
yield from self.usb_assert_sync()
self._last_state = 'k'

for byte in data:
self.assertEqual((yield from self.usb_consume_byte()), byte)

self.assertEqual((yield from self.usb_get_state()), '0')
self.assertEqual((yield from self.usb_get_state()), '0')
self.assertEqual((yield from self.usb_get_state()), 'j')

self._last_state = None


class USBGatewareTestHelpers:
Expand Down
Loading

0 comments on commit 62cd9ff

Please sign in to comment.