Skip to content

Commit 19dc243

Browse files
committed
wrapper: add network throttling
Add network throttling using cgroups and tc. This is choice of tools was not the best one and it has quite important caveats: 1) Not easy to (re-)configure when there is already some trafic control setup in place. 2) We don't know in advance which interface should be throttled, that means we need to apply limit to all interfaces. We do this only in limited way because setting it up (and cleaning!) it properly is difficult (especially in connection with #1). 3) Most importantly, we can limit only output traffic! That is too bad as we are more interested in limiting the input traffic. Signed-off-by: Tomáš Golembiovský <tgolembi@redhat.com>
1 parent 03011c1 commit 19dc243

File tree

3 files changed

+271
-7
lines changed

3 files changed

+271
-7
lines changed

docs/Virt-v2v-wrapper.md

+21-3
Original file line numberDiff line numberDiff line change
@@ -222,17 +222,25 @@ Right before the wrapper terminates it updates the state with:
222222
## Conversion throttling (rate limiting)
223223

224224
It is possible to throttle resources used by the conversion. At the moment one
225-
can limit only CPU.
225+
can limit CPU and network bandwidth. Before wrapper detaches to background
226226

227227
Example of throttling file content:
228228

229229
```
230230
{
231231
"cpu": "50%",
232+
"network": "1048576"
232233
}
233234
```
234235

235-
This will assign half of single CPU to the process.
236+
This will assign half of single CPU to the process and limit network bandwidth
237+
to 1 MB/s.
238+
239+
Limits read from the throttling file are added to those already in place. That
240+
means one does not strictly need to include all the possible limits in the
241+
file. Although it is suggested to do so. Otherwise some information can be lost
242+
if multiple changes occur to the throttling file before the wrapper manages to
243+
read them.
236244

237245
To remove a limit one should assign value `unlimited`.
238246

@@ -251,4 +259,14 @@ From systemd.resource-control(5):
251259
hierarchy and "cpu.cfs_quota_us" on legacy.
252260

253261
For example, to assign half of one CPU use "50%" or to assign two CPUs use
254-
"200%".
262+
"200%". To remove any limit on CPU one can pass the string "unlimited".
263+
264+
265+
### Network Bandwidth
266+
267+
Due to design of cgroup filter on tc the network is limited on output only.
268+
(Input is limited only indirectly and is unreliable.)
269+
270+
The limit is specified in bytes per seconds without any units. E.g. "12345".
271+
Note that despite being a number it should be passed as string in JSON. To
272+
remove any limit one can pass the string "unlimited".

v2v-conversion-host.spec.in

+2
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,8 @@ Ansible role to setup hosts as conversion host for ManageIQ
5050

5151
%package wrapper
5252
Summary: Daemonizing wrapper for virt-v2v
53+
Requires: libcgroup-tools
54+
Requires: python
5355
BuildArch: noarch
5456

5557
%description wrapper

wrapper/virt-v2v-wrapper.py

+248-4
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
# limitations under the License.
1818
#
1919

20+
import atexit
2021
from contextlib import contextmanager
2122
import copy
2223
import errno
@@ -781,6 +782,7 @@ def __init__(self):
781782
'failed': False,
782783
'throttling': {
783784
'cpu': None,
785+
'network': None,
784786
}
785787
}
786788
self._filename = None
@@ -814,6 +816,23 @@ def __getattr__(self, name):
814816

815817

816818
# }}}
819+
820+
def atexit_command(cmd):
821+
"""
822+
Run command ignoring any errors. This is supposed to be used with atexit.
823+
"""
824+
def remove(cmd):
825+
try:
826+
logging.info('Running command at exit: %r', cmd)
827+
subprocess.check_call(cmd)
828+
except subprocess.CalledProcessError as e:
829+
logging.warning(
830+
'Ignoring failed command at exit,'
831+
'returncode=%d, output=\n%s\n',
832+
e.returncode, e.output)
833+
atexit.register(lambda: remove(cmd))
834+
835+
817836
def hard_error(msg):
818837
"""
819838
Function to produce an error and terminate the wrapper.
@@ -1142,6 +1161,12 @@ def run(self):
11421161

11431162
# }}}
11441163
class SystemdRunner(BaseRunner): # {{{
1164+
1165+
def __init__(self, host, arguments, environment, log):
1166+
super(SystemdRunner, self).__init__(host, arguments, environment, log)
1167+
self._service_name = None
1168+
self._tc = None
1169+
11451170
def is_running(self):
11461171
try:
11471172
subprocess.check_call([
@@ -1164,6 +1189,7 @@ def kill(self):
11641189
error('Failed to kill virt-v2v unit', exception=True)
11651190

11661191
def run(self):
1192+
net_cls_dir = self._prepare_net_cls()
11671193
unit = [
11681194
'systemd-run',
11691195
'--description=virt-v2v conversion',
@@ -1173,9 +1199,11 @@ def run(self):
11731199
for k, v in six.iteritems(self._environment):
11741200
unit.append('--setenv=%s=%s' % (k, v))
11751201
unit.extend([
1202+
'cgexec', '-g', 'net_cls:%s' % net_cls_dir,
11761203
'/bin/sh', '-c',
11771204
'exec "%s" "$@" > "%s" 2>&1' % (VIRT_V2V, self._log),
11781205
VIRT_V2V]) # First argument is command name
1206+
logging.info('systemd-run invocation: %r', unit)
11791207
unit.extend(self._arguments)
11801208

11811209
proc = subprocess.Popen(
@@ -1248,6 +1276,15 @@ def systemd_set_property(self, property_name, value):
12481276
property_name)
12491277
return False
12501278

1279+
def set_network_limit(self, limit):
1280+
if self._tc is None:
1281+
return False
1282+
return self._tc.set_limit(limit)
1283+
1284+
def _prepare_net_cls(self):
1285+
self._tc = TcController(self._host.get_tag())
1286+
return self._tc.cgroup
1287+
12511288
def _systemd_return_code(self):
12521289
""" Return code after the unit exited """
12531290
code = self._systemd_property('ExecMainStatus')
@@ -1262,6 +1299,187 @@ def _systemd_return_code(self):
12621299

12631300

12641301
# }}}
1302+
class TcController(object):
1303+
"""
1304+
Handles communication with tc (traffic control) and associated net_cls
1305+
cgroup.
1306+
"""
1307+
1308+
# TC store rates as a 32-bit unsigned integer in bps internally
1309+
MAX_RATE = 0xffffffff
1310+
1311+
@staticmethod
1312+
def class_id_to_hex(class_id):
1313+
"""
1314+
Convert class ID in the form <major>:<minor> into hex string where
1315+
upper 16b are for major and lower 16b are for minor number.
1316+
1317+
e.g.: '1a:2b' -> '0x001a002b'
1318+
"""
1319+
parts = class_id.split(':')
1320+
major = int(parts[0], base=16)
1321+
minor = int(parts[1], base=16)
1322+
return '0x{:04x}{:04x}'.format(major, minor)
1323+
1324+
def __init__(self, tag):
1325+
self._cgroup = 'v2v-conversion/%s' % tag
1326+
self._class_id = None
1327+
self._interfaces = []
1328+
self._prepare()
1329+
1330+
@property
1331+
def class_id(self):
1332+
return self._class_id
1333+
1334+
@property
1335+
def cgroup(self):
1336+
return self._cgroup
1337+
1338+
def set_limit(self, limit):
1339+
if limit is None or limit == 'unlimited':
1340+
limit = TcController.MAX_RATE
1341+
ret = True
1342+
for iface in self._interfaces:
1343+
if self._run_tc([
1344+
'class', 'change', 'dev', iface,
1345+
'classid', self._class_id, 'htb',
1346+
'rate', '{}bps'.format(limit),
1347+
]) is None:
1348+
ret = False
1349+
return ret
1350+
1351+
def _prepare(self):
1352+
logging.info('Preparing tc')
1353+
root_handle = self._create_qdiscs()
1354+
if root_handle is None:
1355+
return
1356+
for iface in self._interfaces[:]:
1357+
if not self._create_filter(root_handle, iface) or \
1358+
not self._create_class(root_handle, iface):
1359+
self._interfaces.remove(iface)
1360+
self._prepare_cgroup()
1361+
1362+
def _prepare_cgroup(self):
1363+
logging.info('Preparing net_cls cgroup %s', self._cgroup)
1364+
# Create cgroup -- we do this even when tc is not properly set
1365+
# otherwise cgexec would fail
1366+
cgroup_dir = '/sys/fs/cgroup/net_cls/%s' % self._cgroup
1367+
atexit_command(['/usr/bin/rmdir', '-p', cgroup_dir])
1368+
os.makedirs(cgroup_dir)
1369+
# Store class ID
1370+
if self._class_id is not None:
1371+
with open(os.path.join(cgroup_dir, 'net_cls.classid'), 'w') as f:
1372+
f.write(TcController.class_id_to_hex(self._class_id))
1373+
else:
1374+
logging.info(
1375+
'Not assigning class ID to net_cls cgroup'
1376+
' because of previous errors')
1377+
1378+
def _create_qdiscs(self):
1379+
qdiscs = self._run_tc(['qdisc', 'show'])
1380+
if qdiscs is None:
1381+
logging.error('Failed to query existing qdiscs')
1382+
return None
1383+
logging.debug('Found following qdiscs: %r', qdiscs)
1384+
1385+
root_handle = 'abc:'
1386+
ifaces = []
1387+
roots = None
1388+
try:
1389+
# (interface, type, root handle)
1390+
roots = [(qdisc[4], qdisc[1], qdisc[2])
1391+
for qdisc in qdiscs if qdisc[5] == 'root']
1392+
except IndexError:
1393+
logging.exception('Failed to process tc output')
1394+
logging.error('%r', qdiscs)
1395+
return None
1396+
logging.debug('Found following root qdiscs: %r', roots)
1397+
#
1398+
# Here we go through all interfaces and try to set our root handle.
1399+
# For interfaces that already have some configuration this will likely
1400+
# fail, we ignore those (but we give it a try first).
1401+
#
1402+
for qdisc in roots:
1403+
if qdisc[1] == 'htb' and qdisc[2] == root_handle:
1404+
# Already ours
1405+
ifaces.append(qdisc[0])
1406+
continue
1407+
# Try to change the qdisc type
1408+
if self._run_tc([
1409+
'qdisc', 'add', 'dev', qdisc[0],
1410+
'root', 'handle', root_handle, 'htb'
1411+
]) is None:
1412+
logging.info('Failed to setup HTB qdisc on %s', qdisc[0])
1413+
else:
1414+
ifaces.append(qdisc[0])
1415+
self._interfaces = ifaces
1416+
return root_handle
1417+
1418+
def _create_class(self, handle, iface):
1419+
# If there is no class ID assigned yet, try to find first free
1420+
if self._class_id is None:
1421+
# First list existing classes
1422+
classes = self._run_tc([
1423+
'class', 'show', 'dev', iface, 'parent', handle])
1424+
if classes is None:
1425+
logging.error(
1426+
'Failed to query existing classes for parent %s on %s',
1427+
handle, iface)
1428+
return False
1429+
logging.debug('Found existing tc classes: %r', classes)
1430+
# Gather IDs and find first free
1431+
ids = [class_[2] for class_ in classes]
1432+
new_id = None
1433+
logging.debug('Existing class IDs on %s: %r', iface, ids)
1434+
for i in xrange(1, 0x10000):
1435+
test_id = '{}{:x}'.format(handle, i)
1436+
if test_id not in ids:
1437+
new_id = test_id
1438+
break
1439+
if new_id is None:
1440+
logging.error(
1441+
'Could not find any free class ID on %s under %s',
1442+
iface, handle)
1443+
return False
1444+
else:
1445+
# We already chose ID before
1446+
new_id = self._class_id
1447+
# Create new class
1448+
logging.info('Creating new tc class on %s with class ID: %s',
1449+
iface, new_id)
1450+
if self._run_tc([
1451+
'class', 'add', 'dev', iface,
1452+
'parent', handle, 'classid', new_id,
1453+
'htb', 'rate', '{}bps'.format(TcController.MAX_RATE),
1454+
]) is None:
1455+
logging.error('Failed to create tc class')
1456+
return False
1457+
atexit_command(['tc', 'class', 'del', 'dev', iface, 'classid', new_id])
1458+
self._class_id = new_id
1459+
return True
1460+
1461+
def _create_filter(self, handle, iface):
1462+
# It is OK if same filter already exists. However, if there is already
1463+
# a different filter we're in trouble.
1464+
return self._run_tc([
1465+
'filter', 'add', 'dev', iface, 'parent', handle,
1466+
'protocol', 'ip', 'prio', '10', 'handle', '1:', 'cgroup'
1467+
]) is not None
1468+
1469+
def _run_tc(self, args):
1470+
try:
1471+
output = subprocess.check_output(['tc'] + args)
1472+
except subprocess.CalledProcessError as e:
1473+
logging.exception(
1474+
'tc command failed; return code %d, output:\n%s\n',
1475+
e.returncode, e.output)
1476+
return None
1477+
# Split into words by line
1478+
output = output.splitlines()
1479+
output = list(map(str.split, output))
1480+
return output
1481+
1482+
12651483
@contextmanager
12661484
def log_parser(v2v_log):
12671485
parser = None
@@ -1334,6 +1552,7 @@ def throttling_update(runner, initial=None):
13341552
# Remove file when finished to prevent spamming logs with repeated
13351553
# messages
13361554
os.remove(state['internal']['throttling_file'])
1555+
logging.info('Fetched updated throttling info from file')
13371556
except IOError as e:
13381557
if e.errno != errno.ENOENT:
13391558
error('Failed to read throttling file', exception=True)
@@ -1364,16 +1583,41 @@ def throttling_update(runner, initial=None):
13641583
set_val = val
13651584
else:
13661585
error(
1367-
'Failed to parse value for CPU quota',
1368-
'Failed to parse value for CPU quota: %s', v)
1586+
'Failed to parse value for CPU limit',
1587+
'Failed to parse value for CPU limit: %s', v)
13691588
continue
13701589
if val != state['throttling']['cpu'] and \
13711590
runner.systemd_set_property('CPUQuota', set_val):
13721591
processed[k] = val
13731592
else:
13741593
error(
1375-
'Failed to set CPU quota',
1376-
'Failed to set CPU quota to %s', val)
1594+
'Failed to set CPU limit',
1595+
'Failed to set CPU limit to %s', val)
1596+
elif k == 'network':
1597+
if v is None or v == 'unlimited':
1598+
# Treat empty value and 'unlimited' in the same way
1599+
val = 'unlimited'
1600+
set_val = 'unlimited'
1601+
else:
1602+
m = re.match("([+0-9]+)$", v)
1603+
if m is not None:
1604+
val = m.group(1)
1605+
set_val = val
1606+
else:
1607+
error(
1608+
'Failed to parse value for network limit',
1609+
'Failed to parse value for network limit: %s', v)
1610+
continue
1611+
if val != state['throttling']['network'] and \
1612+
runner.set_network_limit(set_val):
1613+
logging.debug(
1614+
'Changing network throttling to %s (previous: %s)',
1615+
val, state['throttling']['network'])
1616+
processed[k] = val
1617+
else:
1618+
error(
1619+
'Failed to set network limit',
1620+
'Failed to set network limit to %s', val)
13771621
else:
13781622
logging.debug('Ignoring unknown throttling request: %s', k)
13791623
state['throttling'].update(processed)

0 commit comments

Comments
 (0)