-
Notifications
You must be signed in to change notification settings - Fork 5
/
Copy path49.py
251 lines (212 loc) · 8.8 KB
/
49.py
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
import dataclasses
from .. import aes
from .. import mac
from .. import random_helper
from .. import xor
@dataclasses.dataclass
class Account:
    """One bank customer record: identity, credentials and balance."""

    # Key under which the account is stored in `Secrets.accounts`.
    id_: int
    # Login name matched by `BankServer.login`.
    name: str
    # Kept in plaintext (bad practice, tolerated for this exercise).
    password: str
    # Current balance.
    spacebucks: int
class Secrets:
    """Server-side state: the account table and the shared MAC key."""

    def __init__(self):
        # Accounts every fresh bank starts with.
        roster = (
            Account(id_=0, name="Alice", password="@l1c3!",
                    spacebucks=1000500),
            Account(id_=1, name="Bob", password="b0b", spacebucks=100),
            Account(id_=2, name="Mallory", password="hunter2", spacebucks=1),
        )
        # Index the accounts by their id for direct lookup.
        self.accounts = {}
        for account in roster:
            self.accounts[account.id_] = account
        # Fresh 16-byte key for every instance.
        self.key = random_helper.random_bytes(16)
class BankApi:
    """What would be our API endpoint.

    Checks the CBC-MAC appended to an incoming transfer message and, when it
    matches, moves spacebucks between the accounts held in ``secrets``.
    """

    def __init__(self, secrets):
        # `secrets` must expose `.accounts` (id -> account) and `.key`.
        self.secrets = secrets

    def _settle(self, from_acc, transfers):
        """Debit `from_acc` and credit every ``(to_id, amount)`` in `transfers`.

        Nothing is modified unless the whole batch can be applied.

        Raises:
            AssertionError: the batch total exceeds the sender's balance.
            KeyError: a destination id is not a known account.
        """
        total = sum(amount for _, amount in transfers)
        # Explicit raise instead of `assert` so the check survives `python -O`;
        # the exception type is kept for backward compatibility.
        if total > from_acc.spacebucks:
            raise AssertionError("Insufficient funds.")
        # Resolve every destination first: an unknown id must not leave the
        # sender debited for the transfers that preceded it.
        accounts = self.secrets.accounts
        credits = [(accounts[to_id], amount) for to_id, amount in transfers]
        for to_acc, amount in credits:
            from_acc.spacebucks -= amount
            to_acc.spacebucks += amount

    def transfer_v1(self, message):
        """V1: msg||iv||mac.

        `message` is ``from=<id>&to=<id>&amount=<n>`` followed by a 16-byte IV
        and a 16-byte MAC.

        Raises:
            ValueError: the MAC does not match, or a field is malformed.
            KeyError: a required field or account id is missing.
            AssertionError: the sender cannot cover the amount.
        """
        mac_, message = message[-16:], message[:-16]
        iv, message = message[-16:], message[:-16]
        # NOTE(review): `!=` is not a constant-time comparison.
        if mac.cbc_mac(self.secrets.key, message, iv) != mac_:
            raise ValueError("Invalid CBC-MAC.")
        parts = [part.split(b"=") for part in message.split(b"&")]
        values = {key: int(value) for key, value in parts}
        from_acc = self.secrets.accounts[values[b"from"]]
        to_id = values[b"to"]
        amount = values[b"amount"]
        self._settle(from_acc, [(to_id, amount)])

    def transfer_v2(self, message):
        """V2: msg||mac (fixed iv)

        `message` is ``from=<id>&tx_list=<to>:<n>;<to>:<n>...`` followed by a
        16-byte MAC. Messages lacking ``from`` or ``tx_list`` are ignored.

        Raises:
            ValueError: the MAC does not match, or ``from`` is not an integer.
            KeyError: the sender or a destination id is not a known account.
            AssertionError: the sender cannot cover the batch total.
        """
        # Be lenient with validation to allow a scrambled block to go through.
        mac_, message = message[-16:], message[:-16]
        # NOTE(review): `!=` is not a constant-time comparison.
        if mac.cbc_mac(self.secrets.key, message) != mac_:
            raise ValueError("Invalid CBC-MAC.")
        values = {}
        for key_value in message.split(b"&"):
            parts = key_value.split(b"=", 2)
            # Fields without exactly one '=' are silently dropped.
            if len(parts) != 2:
                continue
            key, value = parts
            values[key] = value
        if b"from" not in values or b"tx_list" not in values:
            return
        from_acc = self.secrets.accounts[int(values[b"from"])]
        transfers = []
        for txn in values[b"tx_list"].split(b";"):
            parts = txn.split(b":")
            # Entries that are not `<int>:<int>` are skipped, not rejected.
            if len(parts) != 2:
                continue
            try:
                transfers.append((int(parts[0]), int(parts[1])))
            except ValueError:
                continue
        self._settle(from_acc, transfers)

    def get_balance(self, id_):
        """Return the balance of account `id_` (KeyError if unknown)."""
        return self.secrets.accounts[id_].spacebucks
class BankServer:
    """What would be the web API.

    Authenticates users against ``secrets.accounts`` and hands out client
    sessions that can build MAC'd transfer messages.
    """

    class ClientSession:
        """A logged-in client bound to one source account.

        The session holds the MAC key, so every message it builds carries a
        MAC computed with that key.
        """

        def __init__(self, from_id, key):
            self.from_id = from_id
            self._key = key

        def transfer_v1(self, to_id, amount, api):
            """Send `amount` to `to_id` via ``api.transfer_v1``.

            The message is ``from=..&to=..&amount=..`` followed by a fresh
            16-byte IV and the MAC computed over the message with that IV.
            """
            msg = f"from={self.from_id}&to={to_id}&amount={amount}"
            msg = msg.encode("ascii")
            iv = random_helper.random_bytes(16)
            mac_ = mac.cbc_mac(self._key, msg, iv)
            api.transfer_v1(msg + iv + mac_)

        def transfer_v2(self, txns, api):
            """Send a batch of ``(to, amount)`` pairs via ``api.transfer_v2``.

            The message is ``from=..&tx_list=to:amount;to:amount...`` followed
            by its MAC; no IV is transmitted.
            """
            msg = f"from={self.from_id}&tx_list="
            msg += ";".join(f"{to}:{amount}" for to, amount in txns)
            msg = msg.encode("ascii")
            mac_ = mac.cbc_mac(self._key, msg)
            api.transfer_v2(msg + mac_)

    def __init__(self, secrets):
        # `secrets` must expose `.accounts` (id -> account) and `.key`.
        self.secrets = secrets

    def login(self, name, password) -> ClientSession:
        """Return a session for the account matching `name` and `password`.

        Raises:
            ValueError: no account has this name/password pair.
        """
        for acc in self.secrets.accounts.values():
            if acc.name == name and acc.password == password:
                return BankServer.ClientSession(acc.id_, self.secrets.key)
        # Previously a bare StopIteration escaped from `next()` here.
        raise ValueError("Invalid credentials.")
def reset():
    """Build a fresh bank and return ``(api, server, alice, me)``.

    `alice` and `me` are sessions for the seeded "Alice" and "Mallory"
    accounts; both are created through the server's normal login.
    """
    vault = Secrets()
    bank_api = BankApi(vault)
    bank_server = BankServer(vault)
    # Alice's credentials are used here only to set the scene; the attacker
    # is not supposed to know them.
    alice_session = bank_server.login("Alice", "@l1c3!")
    my_session = bank_server.login("Mallory", "hunter2")
    return bank_api, bank_server, alice_session, my_session
# Sanity-check the regular V1 bank logic on a fresh bank.
api, _, alice, me = reset()
assert (api.get_balance(id_=0), api.get_balance(id_=1)) == (1000500, 100)
alice.transfer_v1(to_id=1, amount=500, api=api)
assert (api.get_balance(id_=0), api.get_balance(id_=1)) == (1000000, 600)

# A client that does not know the key must be rejected.
bad_client = BankServer.ClientSession(from_id=0, key=b"1234123412341234")
try:
    bad_client.transfer_v1(to_id=0, amount=500, api=api)
except ValueError:
    pass
else:
    assert False, "accepted a bad transaction"

# Start over with a fresh bank.
api, bank, _, me = reset()
# A man-in-the-middle stand-in for the bank API.
class MitmApi:
    """Records the latest transfer message instead of processing it."""

    def __init__(self):
        self._intercepted = None

    def _capture(self, msg):
        # Only the most recent message is kept.
        self._intercepted = msg

    def transfer_v1(self, msg):
        """Intercept a V1 message."""
        self._capture(msg)

    def transfer_v2(self, msg):
        """Intercept a V2 message."""
        self._capture(msg)

    def read(self):
        """Return the intercepted message (or None) and clear the slot."""
        captured, self._intercepted = self._intercepted, None
        return captured
# CBC encryption computes `AES_k(pt ^ iv)` for the first block. Any bit we
# flip in the first plaintext block can therefore be cancelled by flipping the
# same bit in the IV, which leaves the final CBC-MAC unchanged.
mitm = MitmApi()
print("Getting rich with protocol V1.")
while api.get_balance(id_=2) < 1000000:  # Until we're rich...
    # Never ask for more than Alice still has.
    amount = min(api.get_balance(id_=2), api.get_balance(id_=0))
    # Capture a genuine transfer to ourselves to use as a template.
    me.transfer_v1(to_id=2, amount=amount, api=mitm)
    captured = mitm.read()
    msg, iv, mac_ = captured[:-32], captured[-32:-16], captured[-16:]
    # Captured:  from=2&to=2&amount=XYZ
    #            |||||||||||||||| (first block)
    # Wanted:    from=0&to=2&amou
    target = b"from=0&to=2&amou"
    assert len(target) == 16
    # Bits that differ between the real first block and the one we want.
    xors = xor.xor_bytes(msg[:16], target)
    forged_iv = xor.xor_bytes(iv, xors)
    api.transfer_v1(target + msg[16:] + forged_iv + mac_)
    print(f"We now have: {api.get_balance(id_=2)}$!")
# Protocol V2 uses a fixed IV. Check its regular logic on a fresh bank.
api, bank, alice, me = reset()
alice.transfer_v2(txns=[(1, 500), (1, 1000)], api=api)
assert api.get_balance(id_=1) == 1600

# The MAC must be checked: a client with the wrong key gets rejected.
bad_client = BankServer.ClientSession(from_id=0, key=b"1234123412341234")
try:
    bad_client.transfer_v2(txns=[(1, 500), (1, 1000)], api=api)
except ValueError:
    pass
else:
    assert False, "accepted a bad transaction"
# We know the MAC of Alice's message, i.e. the last ciphertext block of her
# CBC chain, and we can obtain the MAC of any message of our own.
# XORing our first plaintext block with Alice's MAC lets us append our message
# to hers so that our part is processed exactly as it was under IV=0; the
# combined message therefore ends with our original MAC.
print("Getting rich with protocol V2.")
for attempt in range(40):
    # The XOR scrambles our first block, so protocol characters ('&', ':',
    # ...) can appear in it by accident and break parsing. A real attacker
    # would tweak the payload or retry with new MACs; we simply allow several
    # attempts so bad luck does not break CI.

    # Fresh bank (and fresh key) for this attempt.
    api, bank, alice, me = reset()

    # Intercept one of Alice's transactions, then let the genuine message
    # through so the interception goes unnoticed.
    alice.transfer_v2(txns=[(1, 1)], api=mitm)
    msg = mitm.read()
    alice_msg, alice_mac = msg[:-16], msg[-16:]
    api.transfer_v2(msg)

    while api.get_balance(id_=2) < 1000000:  # Until we're rich...
        # Cap at what we can steal (-1 because Alice is transferring $1 to
        # Bob).
        amount = min(api.get_balance(id_=2), api.get_balance(id_=0) - 1)
        # Get a MAC for a transfer to ourselves. The first block will be
        # scrambled, so what matters is what lands in the second block:
        #   from=2&tx_list=2:0;2:XYZ;
        #   ||||||||||||||||----------------
        # Sending two transactions guarantees a ';' separator in there.
        me.transfer_v2(txns=[(2, 0), (2, amount)], api=mitm)
        msg = mitm.read()
        me_msg, me_mac = msg[:-16], msg[-16:]
        target_iv = b"\x00" * 16
        chain_fix = xor.xor_bytes(target_iv, alice_mac)
        first_block = xor.xor_bytes(me_msg[:16], chain_fix)
        rest = me_msg[16:]
        prev_cash = api.get_balance(id_=2)
        api.transfer_v2(aes.pad(alice_msg) + first_block + rest + me_mac)
        print(f"We now have: {api.get_balance(id_=2)}$!")
        if api.get_balance(id_=2) == prev_cash:
            # The forged message had no effect, presumably because of an
            # unlucky scramble. Give up on this key and retry with a new one.
            print(f"[!!!] Balance did not change ({attempt}). Retrying.")
            break

    if api.get_balance(id_=2) >= 1000000:
        break  # We're rich! We can leave now.