Skip to content

Commit

Permalink
Fix udp payload extraction code (ipv6 support)
Browse files Browse the repository at this point in the history
  • Loading branch information
ChillerDragon committed Feb 19, 2024
1 parent 436f5ad commit 5711a46
Show file tree
Hide file tree
Showing 2 changed files with 41 additions and 42 deletions.
30 changes: 16 additions & 14 deletions src/udp.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,36 +12,38 @@ def extract_udp_payload(data: bytes) -> Tuple[bytes, List[str]]:
"""
messages: List[str] = []
udp_payload = data
# ethernet
try:
ip = dpkt.ethernet.Ethernet(data).data
if not isinstance(ip.data, dpkt.udp.UDP):
print("not ethernet")
print("ethernet but not udp payload")
raise ValueError("not udp")
udp_payload = ip.data.data
messages.append("extracting udp payload from ethernet packet ...")
except:
pass
# ipv6
try:
ip = dpkt.ip6.IP6(data)
print(ip.data)
if not isinstance(ip.data, dpkt.udp.UDP):
print("ipv6 but not udp payload")
raise ValueError("not udp")
udp_payload = ip.data.data
messages.append("extracting udp payload from ipv6 packet ...")
except:
pass
# ipv4
try:
ip = dpkt.ip.IP(data)
print(ip.data)
if not isinstance(ip.data, dpkt.udp.UDP):
print("not udp")
print("ipv4 but not udp payload")
raise ValueError("not udp")
udp_payload = ip.data.data
messages.append("extracting udp payload from ip packet ...")
messages.append("extracting udp payload from ipv4 packet ...")
except:
pass

data = udp_payload
return (data, messages)



# data = \
# b'\x60\x0a\xa5\x6d\x00\x1d\x11\x40\x00\x00\x00\x00\x00\x00\x00\x00' \
# b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00' \
# b'\x00\x00\x00\x00\x00\x00\x00\x01\x20\x6f\xd9\xc8\x00\x1d\x00\x30' \
# b'\x00\x19\x02\x23\xec\x92\x03\x00\x05\x15\x9e\xa0\x05\x0c\x00\x05' \
# b'\x0f\x9e\xa0\x05\x02'
#
# print(extract_udp_payload(data))
53 changes: 25 additions & 28 deletions tests/extract_udp_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -9,37 +9,34 @@ def test_extract_udp():
b'\xff\x8c\x14\x37\x00'

got = extract_udp_payload(data)
expect = (b'\x10\x0c\x01Bx\r\x88U\xe9\xf0\x87\xe6\x07h\xd6\xd0[\xf8i/\xff\x8c\x147\x00', ['extracting udp payload from ip packet ...'])
expect = (b'\x10\x0c\x01Bx\r\x88U\xe9\xf0\x87\xe6\x07h\xd6\xd0[\xf8i/\xff\x8c\x147\x00', ['extracting udp payload from ipv4 packet ...'])
assert got == expect


# def test_extract_udp_ctrl_disconnect():
# # throws invalid header length?!
# # this is copied from tcpdump and wireshark parses it fine too wtf
# data = \
# b'\x60\x04\x33\xb1\x00\x10\x11\x40\x00\x00\x00\x00\x00\x00\x00\x00' \
# b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00' \
# b'\x00\x00\x00\x00\x00\x00\x00\x01\xd9\xc8\x20\x6f\x00\x10\x00\x23' \
# b'\x04\x0f\x00\x26\x3e\x5a\x37\x04'
# got = extract_udp_payload(data)
# expect = (b'\x10\x0c\x01Bx\r\x88U\xe9\xf0\x87\xe6\x07h\xd6\xd0[\xf8i/\xff\x8c\x147\x00', ['extracting udp payload from ip packet ...'])
# assert got == expect


# def test_extract_udp_input_timing_and_snap_empty():
# # also throws invalid header?!
# # what am i doing wrong here omg
# # this is also straight from wireshark/tcpdump
# # https://zillyhuhn.com/cs/.1708314568.png
# # https://github.com/ChillerDragon/teeworlds-web-traffic-decoder/issues/1
# data = \
# b'\x60\x0a\xa5\x6d\x00\x1d\x11\x40\x00\x00\x00\x00\x00\x00\x00\x00' \
# b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00' \
# b'\x00\x00\x00\x00\x00\x00\x00\x01\x20\x6f\xd9\xc8\x00\x1d\x00\x30' \
# b'\x00\x19\x02\x23\xec\x92\x03\x00\x05\x15\x9e\xa0\x05\x0c\x00\x05' \
# b'\x0f\x9e\xa0\x05\x02'
#
# print(extract_udp_payload(data))
def test_extract_ctrl_disconnect_from_ipv6():
# tcpdump -x
data = \
b'\x60\x04\x33\xb1\x00\x10\x11\x40\x00\x00\x00\x00\x00\x00\x00\x00' \
b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00' \
b'\x00\x00\x00\x00\x00\x00\x00\x01\xd9\xc8\x20\x6f\x00\x10\x00\x23' \
b'\x04\x0f\x00\x26\x3e\x5a\x37\x04'
got = extract_udp_payload(data)
expect = (b'\x04\x0f\x00&>Z7\x04', ['extracting udp payload from ipv6 packet ...'])
assert got == expect


def test_extract_snap_from_ipv6():
# tcpdump -x
data = \
b'\x60\x0a\xa5\x6d\x00\x1d\x11\x40\x00\x00\x00\x00\x00\x00\x00\x00' \
b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00' \
b'\x00\x00\x00\x00\x00\x00\x00\x01\x20\x6f\xd9\xc8\x00\x1d\x00\x30' \
b'\x00\x19\x02\x23\xec\x92\x03\x00\x05\x15\x9e\xa0\x05\x0c\x00\x05' \
b'\x0f\x9e\xa0\x05\x02'

expected = (b'\x00\x19\x02#\xec\x92\x03\x00\x05\x15\x9e\xa0\x05\x0c\x00\x05\x0f\x9e\xa0\x05\x02', ['extracting udp payload from ipv6 packet ...'])
assert extract_udp_payload(data) == expected


def test_extract_ethernet_input_timing_and_snap_empty():
# real packet from tcpdump -xx
Expand Down

0 comments on commit 5711a46

Please sign in to comment.