From 5711a4682b9e3e7354df12d07afbebe6244c35d7 Mon Sep 17 00:00:00 2001 From: ChillerDragon Date: Mon, 19 Feb 2024 12:27:37 +0800 Subject: [PATCH] Fix udp payload extraction code (ipv6 support) --- src/udp.py | 30 +++++++++++----------- tests/extract_udp_test.py | 53 ++++++++++++++++++--------------------- 2 files changed, 41 insertions(+), 42 deletions(-) diff --git a/src/udp.py b/src/udp.py index 6c48bde..6fe4a8e 100644 --- a/src/udp.py +++ b/src/udp.py @@ -12,36 +12,38 @@ def extract_udp_payload(data: bytes) -> Tuple[bytes, List[str]]: """ messages: List[str] = [] udp_payload = data + # ethernet try: ip = dpkt.ethernet.Ethernet(data).data if not isinstance(ip.data, dpkt.udp.UDP): - print("not ethernet") + print("ethernet but not udp payload") raise ValueError("not udp") udp_payload = ip.data.data messages.append("extracting udp payload from ethernet packet ...") except: pass + # ipv6 + try: + ip = dpkt.ip6.IP6(data) + print(ip.data) + if not isinstance(ip.data, dpkt.udp.UDP): + print("ipv6 but not udp payload") + raise ValueError("not udp") + udp_payload = ip.data.data + messages.append("extracting udp payload from ipv6 packet ...") + except: + pass + # ipv4 try: ip = dpkt.ip.IP(data) print(ip.data) if not isinstance(ip.data, dpkt.udp.UDP): - print("not udp") + print("ipv4 but not udp payload") raise ValueError("not udp") udp_payload = ip.data.data - messages.append("extracting udp payload from ip packet ...") + messages.append("extracting udp payload from ipv4 packet ...") except: pass data = udp_payload return (data, messages) - - - -# data = \ -# b'\x60\x0a\xa5\x6d\x00\x1d\x11\x40\x00\x00\x00\x00\x00\x00\x00\x00' \ -# b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00' \ -# b'\x00\x00\x00\x00\x00\x00\x00\x01\x20\x6f\xd9\xc8\x00\x1d\x00\x30' \ -# b'\x00\x19\x02\x23\xec\x92\x03\x00\x05\x15\x9e\xa0\x05\x0c\x00\x05' \ -# b'\x0f\x9e\xa0\x05\x02' -# -# print(extract_udp_payload(data)) diff --git a/tests/extract_udp_test.py b/tests/extract_udp_test.py index 77f2ab3..855b22c 100644 --- a/tests/extract_udp_test.py +++ b/tests/extract_udp_test.py @@ -9,37 +9,34 @@ def test_extract_udp(): b'\xff\x8c\x14\x37\x00' got = extract_udp_payload(data) - expect = (b'\x10\x0c\x01Bx\r\x88U\xe9\xf0\x87\xe6\x07h\xd6\xd0[\xf8i/\xff\x8c\x147\x00', ['extracting udp payload from ip packet ...']) + expect = (b'\x10\x0c\x01Bx\r\x88U\xe9\xf0\x87\xe6\x07h\xd6\xd0[\xf8i/\xff\x8c\x147\x00', ['extracting udp payload from ipv4 packet ...']) assert got == expect -# def test_extract_udp_ctrl_disconnect(): -# # throws invalid header length?! -# # this is copied from tcpdump and wireshark parses it fine too wtf -# data = \ -# b'\x60\x04\x33\xb1\x00\x10\x11\x40\x00\x00\x00\x00\x00\x00\x00\x00' \ -# b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00' \ -# b'\x00\x00\x00\x00\x00\x00\x00\x01\xd9\xc8\x20\x6f\x00\x10\x00\x23' \ -# b'\x04\x0f\x00\x26\x3e\x5a\x37\x04' -# got = extract_udp_payload(data) -# expect = (b'\x10\x0c\x01Bx\r\x88U\xe9\xf0\x87\xe6\x07h\xd6\xd0[\xf8i/\xff\x8c\x147\x00', ['extracting udp payload from ip packet ...']) -# assert got == expect - - -# def test_extract_udp_input_timing_and_snap_empty(): -# # also throws invalid header?! -# # what am i doing wrong here omg -# # this is also straight from wireshark/tcpdump -# # https://zillyhuhn.com/cs/.1708314568.png -# # https://github.com/ChillerDragon/teeworlds-web-traffic-decoder/issues/1 -# data = \ -# b'\x60\x0a\xa5\x6d\x00\x1d\x11\x40\x00\x00\x00\x00\x00\x00\x00\x00' \ -# b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00' \ -# b'\x00\x00\x00\x00\x00\x00\x00\x01\x20\x6f\xd9\xc8\x00\x1d\x00\x30' \ -# b'\x00\x19\x02\x23\xec\x92\x03\x00\x05\x15\x9e\xa0\x05\x0c\x00\x05' \ -# b'\x0f\x9e\xa0\x05\x02' -# -# print(extract_udp_payload(data)) +def test_extract_ctrl_disconnect_from_ipv6(): + # tcpdump -x + data = \ + b'\x60\x04\x33\xb1\x00\x10\x11\x40\x00\x00\x00\x00\x00\x00\x00\x00' \ + b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00' \ + b'\x00\x00\x00\x00\x00\x00\x00\x01\xd9\xc8\x20\x6f\x00\x10\x00\x23' \ + b'\x04\x0f\x00\x26\x3e\x5a\x37\x04' + got = extract_udp_payload(data) + expect = (b'\x04\x0f\x00&>Z7\x04', ['extracting udp payload from ipv6 packet ...']) + assert got == expect + + +def test_extract_snap_from_ipv6(): + # tcpdump -x + data = \ + b'\x60\x0a\xa5\x6d\x00\x1d\x11\x40\x00\x00\x00\x00\x00\x00\x00\x00' \ + b'\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x00' \ + b'\x00\x00\x00\x00\x00\x00\x00\x01\x20\x6f\xd9\xc8\x00\x1d\x00\x30' \ + b'\x00\x19\x02\x23\xec\x92\x03\x00\x05\x15\x9e\xa0\x05\x0c\x00\x05' \ + b'\x0f\x9e\xa0\x05\x02' + + expected = (b'\x00\x19\x02#\xec\x92\x03\x00\x05\x15\x9e\xa0\x05\x0c\x00\x05\x0f\x9e\xa0\x05\x02', ['extracting udp payload from ipv6 packet ...']) + assert extract_udp_payload(data) == expected + def test_extract_ethernet_input_timing_and_snap_empty(): # real packet from tcpdump -xx