Implement timeouts for process executions.
This is required because sometimes you need to benchmark VMs and/or
benchmark suites which freeze up. We've seen this numerous times with
Java VMs and benchmarks. Without a timeout to kill frozen process
executions, you can't make progress and have to restart the experiment
(possibly skipping that benchmark).

Further, restarting the experiment biases your results: it gives the
benchmarks with the worst performance (that is what a non-terminating
benchmark is) another chance to redeem themselves.

Timed-out process executions now show as a 'T' flag in the manifest file.
Flags for finished process executions are carried through to the results
file in a new `pexec_flags` field. Each flag is one of {C, E, T}
(completed, errored, timed out).
vext01 committed May 14, 2020
1 parent 68532f3 commit a02368f
Showing 14 changed files with 134 additions and 50 deletions.
5 changes: 5 additions & 0 deletions README.md
@@ -19,6 +19,7 @@ You need to have the following programs installed:
* GNU make, a C compiler and libc (`build-essential` package in Debian)
* cpufrequtils (Linux only. `cpufrequtils` package in Debian)
* cffi (`python-cffi` package in Debian)
* subprocess32 Python module.
* cset (for pinning on Linux only. `cpuset` package in Debian)
* virt-what (Linux only. `virt-what` package in Debian)
* Our custom Linux kernel (see below).
@@ -464,6 +465,10 @@ The structure of the JSON results is as follows:
# (structure same as 'core_cycle_counts')
'mperf_counts': {...} # Per-core MPERF deltas
# (structure same as 'core_cycle_counts')
'pexec_flags': {...} # A flag for each process execution:
# 'C' completed OK.
# 'E' benchmark crashed.
# 'T' benchmark timed out.
'eta_estimates': {u"bmark:VM:variant": [t_0, t_1, ...], ...} # A dict mapping
# benchmark keys to rough process execution times. Used internally:
# users can ignore this.
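The snippet below is a minimal sketch of how the new field might be consumed when post-processing a results file. It assumes the bzip2-compressed JSON layout documented above; the filename is a placeholder, not part of this commit.

```python
# Hedged sketch: count flags per benchmark key in a Krun results file.
# "example_results.json.bz2" is a placeholder name.
import json
from bz2 import BZ2File
from collections import Counter

with BZ2File("example_results.json.bz2") as fh:
    results = json.loads(fh.read())

for key, flags in sorted(results["pexec_flags"].items()):
    counts = Counter(flags)
    print("%s: completed=%d errored=%d timed-out=%d"
          % (key, counts.get("C", 0), counts.get("E", 0), counts.get("T", 0)))
```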
2 changes: 2 additions & 0 deletions examples/example.krun
@@ -13,6 +13,8 @@ LUAJIT_BIN = find_executable("luajit")
if LUAJIT_BIN is None:
fatal("luajit binary not found in path")

EXECUTION_TIMEOUT = 60 # time allowance for each process execution in seconds.

# Who to mail
MAIL_TO = []

4 changes: 3 additions & 1 deletion krun/config.py
@@ -32,6 +32,7 @@ def __init__(self, config_file=None):
self.AMPERF_RATIO_BOUNDS = None
self.PRE_EXECUTION_CMDS = []
self.POST_EXECUTION_CMDS = []
self.EXECUTION_TIMEOUT = None

# config defaults (callbacks)
self.custom_dmesg_whitelist = None
@@ -139,4 +140,5 @@ def __eq__(self, other):
(self.SKIP == other.SKIP) and
(self.N_EXECUTIONS == other.N_EXECUTIONS) and
(self.PRE_EXECUTION_CMDS == other.PRE_EXECUTION_CMDS) and
(self.POST_EXECUTION_CMDS == other.POST_EXECUTION_CMDS))
(self.POST_EXECUTION_CMDS == other.POST_EXECUTION_CMDS) and
(self.EXECUTION_TIMEOUT == other.EXECUTION_TIMEOUT))
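A small hedged sketch of how the new default is expected to behave: `EXECUTION_TIMEOUT` stays `None` when a config never assigns it, which we assume means no timeout is enforced. Loading a config file by passing its path to the `Config` constructor is our assumption about the loading path, not something this diff shows.

```python
from krun.config import Config

# With no config file loaded, the new attribute keeps its default.
config = Config()
assert config.EXECUTION_TIMEOUT is None  # we assume None means "no timeout"

# A user config such as examples/example.krun (above) overrides this with,
# e.g., EXECUTION_TIMEOUT = 60. Note that loading example.krun requires the
# VMs it references (e.g. luajit) to be installed.
loaded = Config("examples/example.krun")
assert loaded.EXECUTION_TIMEOUT == 60
```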
5 changes: 3 additions & 2 deletions krun/mail.py
@@ -2,7 +2,7 @@
import socket
import textwrap
import logging
from subprocess import Popen, PIPE
import subprocess32


FROM_USER = "noreply"
@@ -79,7 +79,8 @@ def _sendmail(self, msg):
logging.debug("Sending email to '%s' subject line '%s'" %
(msg['To'], msg['Subject']))

pipe = Popen([SENDMAIL, "-t", "-oi"], stdin=PIPE)
pipe = subprocess32.Popen([SENDMAIL, "-t", "-oi"],
stdin=subprocess32.PIPE)
pipe.communicate(msg.as_string())

rc = pipe.returncode
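The swap from `subprocess` to `subprocess32` throughout the code base is what makes killing overrunning children possible on Python 2: `subprocess32` backports the Python 3 subprocess API, whose `communicate()` accepts a `timeout` argument and raises `TimeoutExpired` when it expires. A minimal sketch of that pattern (not krun's actual helper) follows:

```python
import subprocess32

# Run a child with a 2-second allowance; kill and reap it if it overruns.
proc = subprocess32.Popen(["/bin/sleep", "10"],
                          stdout=subprocess32.PIPE,
                          stderr=subprocess32.PIPE)
try:
    out, err = proc.communicate(timeout=2)
    timed_out = False
except subprocess32.TimeoutExpired:
    proc.kill()                    # child is still running: terminate it
    out, err = proc.communicate()  # drain the pipes and collect the exit status
    timed_out = True
```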
4 changes: 2 additions & 2 deletions krun/platform.py
@@ -5,7 +5,7 @@
import difflib
import sys
import glob
import subprocess
import subprocess32
import re
import pwd
import cffi
@@ -491,7 +491,7 @@ def sync_disks(self):
"""Force pending I/O to physical disks"""

debug("sync disks...")
rc = subprocess.call("/bin/sync")
rc = subprocess32.call("/bin/sync")
if rc != 0:
fatal("sync failed")

16 changes: 15 additions & 1 deletion krun/results.py
@@ -32,6 +32,9 @@ def __init__(self, config, platform, results_file=None):
self.aperf_counts = dict()
self.mperf_counts = dict()

# Record the flag for each process execution.
self.pexec_flags = dict()

# Record how long execs are taking so we can give the user a rough ETA.
# Maps "bmark:vm:variant" -> [t_0, t_1, ...]
self.eta_estimates = dict()
@@ -79,6 +82,7 @@ def init_from_config(self):
self.core_cycle_counts[key] = []
self.aperf_counts[key] = []
self.mperf_counts[key] = []
self.pexec_flags[key] = []
self.eta_estimates[key] = []

def read_from_file(self, results_file):
@@ -104,6 +108,7 @@ def integrity_check(self):
cycles_len = len(self.core_cycle_counts[key])
aperf_len = len(self.aperf_counts[key])
mperf_len = len(self.mperf_counts[key])
pexec_flags_len = len(self.pexec_flags[key])

if eta_len != wct_len:
fatal("inconsistent etas length: %s: %d vs %d" % (key, eta_len, wct_len))
@@ -117,6 +122,9 @@
if mperf_len != wct_len:
fatal("inconsistent mperf length: %s: %d vs %d" % (key, mperf_len, wct_len))

if pexec_flags_len != wct_len:
fatal("inconsistent pexec flags length: %s: %d vs %d" % (key, pexec_flags_len, wct_len))

# Check the length of the different measurements match and that the
# number of per-core measurements is consistent.
for exec_idx in xrange(len(self.wallclock_times[key])):
@@ -167,6 +175,7 @@ def write_to_file(self):
"core_cycle_counts": self.core_cycle_counts,
"aperf_counts": self.aperf_counts,
"mperf_counts": self.mperf_counts,
"pexec_flags": self.pexec_flags,
"audit": self.audit.audit,
"eta_estimates": self.eta_estimates,
"error_flag": self.error_flag,
@@ -189,17 +198,22 @@ def __eq__(self, other):
self.core_cycle_counts == other.core_cycle_counts and
self.aperf_counts == other.aperf_counts and
self.mperf_counts == other.mperf_counts and
self.pexec_flags == other.pexec_flags and
self.audit == other.audit and
self.eta_estimates == other.eta_estimates and
self.error_flag == other.error_flag)

def append_exec_measurements(self, key, measurements):
def append_exec_measurements(self, key, measurements, flag):
"""Unpacks a measurements dict into the Results instance"""

# Only a subset of flags can arise at this time.
assert flag in ("C", "E", "T")

# Consistently format monotonic time doubles
wallclock_times = format_raw_exec_results(
measurements["wallclock_times"])

self.pexec_flags[key].append(flag)
self.wallclock_times[key].append(wallclock_times)
self.core_cycle_counts[key].append(measurements["core_cycle_counts"])
self.aperf_counts[key].append(measurements["aperf_counts"])
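A hedged illustration of the new `append_exec_measurements()` signature. The `results` object and the benchmark key are placeholders, and the empty-measurements dict simply mirrors the keys unpacked above; the scheduler's real `empty_measurements` value may differ.

```python
# `results` is assumed to be the krun.results.Results instance for a running
# experiment, and the key a placeholder that init_from_config() (above) has
# already created entries for.
empty = {"wallclock_times": [], "core_cycle_counts": [],
         "aperf_counts": [], "mperf_counts": []}
results.append_exec_measurements("bench:SomeVM:default", empty, "T")

# The per-key flag list stays aligned with the other per-execution lists,
# which is what the extended integrity_check() above now verifies.
assert len(results.pexec_flags["bench:SomeVM:default"]) == \
    len(results.wallclock_times["bench:SomeVM:default"])
```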
27 changes: 19 additions & 8 deletions krun/scheduler.py
@@ -4,7 +4,7 @@

from logging import warn, info, error, debug

import os, subprocess, sys, time
import os, sys, time
import krun.util as util

# Wait this many seconds for the init system to finish bringing up services.
@@ -139,7 +139,7 @@ def _parse(self):
if not key in self.completed_exec_counts:
self.completed_exec_counts[key] = 0

if flag in ["S", "E", "C"]: # skip, error, completed
if flag in ["S", "E", "C", "T"]: # skip, error, completed, timeout
pass
elif flag == "O": # outstanding
self.outstanding_exec_counts[key] += 1
@@ -158,7 +158,7 @@ def _parse(self):
else:
self.skipped_keys |= set([key])

if flag in ["E", "C"]:
if flag in ["E", "C", "T"]:
self.completed_exec_counts[key] += 1

exec_idx += 1
@@ -370,12 +370,16 @@ def run(self, mailer, dry_run=False):
if not dry_run:
self.sched.platform.collect_starting_throttle_counts()

stdout, stderr, rc, envlog_filename = \
stdout, stderr, rc, envlog_filename, timed_out = \
vm_def.run_exec(entry_point, in_proc_iters, self.parameter,
heap_limit_kb, stack_limit_kb, self.key,
self.key_pexec_idx)

if not dry_run:
if timed_out:
measurements = self.empty_measurements
instr_data = {}
flag = "T"
elif not dry_run:
try:
self.sched.platform.check_throttle_counts(self.sched.manifest)
measurements = util.check_and_parse_execution_results(
@@ -416,8 +420,15 @@
info("Finished '%s(%d)' (%s variant) under '%s'" %
(self.benchmark, self.parameter, self.variant, self.vm_name))

# Move the environment log out of /tmp
if not dry_run and flag != "O":
# Move the environment log out of /tmp.
#
# We don't do this for re-runs (O) as the log for the re-run pexec is
# the one we want.
#
# We don't do this for timeouts (T) because the wrapper script is
# killed upon timeout, and thus doesn't get a chance to log the
# environment.
if not dry_run and flag not in ("O", "T"):
key_exec_num = self.sched.manifest.completed_exec_counts[self.key]
util.stash_envlog(envlog_filename, self.sched.config,
self.sched.platform, self.key, key_exec_num)
@@ -557,7 +568,7 @@ def run(self):
raise RuntimeError("reached unreachable code!")

# Store new result.
results.append_exec_measurements(job.key, measurements)
results.append_exec_measurements(job.key, measurements, flag)

# Store instrumentation data in a separate file
if job.vm_info["vm_def"].instrument:
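For reference, the full set of per-process-execution flags that `_parse()` above now recognises, summarised as a small mapping (the descriptions are our own wording, not code from the repository):

```python
MANIFEST_FLAGS = {
    "O": "outstanding -- not yet run",
    "S": "skipped",
    "E": "errored",
    "C": "completed OK",
    "T": "timed out (new in this commit)",
}
```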
2 changes: 2 additions & 0 deletions krun/tests/test_results.py
@@ -28,6 +28,7 @@ def fake_results(mock_platform, no_results_instantiation_check):
[[[4., 4.], [4., 4.,]], [[4., 4.], [4., 4.]]]}
results.mperf_counts = {"bench:vm:variant":
[[[5., 5.], [5., 5.,]], [[5., 5.], [5., 5.]]]}
results.pexec_flags = {"bench:vm:variant": ["C", "T"]}
return results


@@ -91,6 +92,7 @@ def test_write_results_to_disk(self, mock_platform,
results0.core_cycle_counts = {u"dummy:Java:default-java": [[[2], [3], [4], [5]]]}
results0.aperf_counts = {u"dummy:Java:default-java": [[[3], [4], [5], [6]]]}
results0.mperf_counts = {u"dummy:Java:default-java": [[[4], [5], [6], [7]]]}
results0.pexec_flags = {u"dummy:Java:default-java": [[["C"], ["C"], ["C"], ["C"]]]}
results0.reboots = 5
results0.error_flag = False
results0.write_to_file()
19 changes: 13 additions & 6 deletions krun/tests/test_util.py
@@ -3,18 +3,20 @@
run_shell_cmd_bench, get_git_version, ExecutionFailed,
get_session_info, run_shell_cmd_list, FatalKrunError,
stash_envlog, dump_instr_json, RerunExecution,
make_instr_dir)
make_instr_dir, read_popen_output_carefully)
from krun.tests.mocks import MockMailer
from krun.tests import TEST_DIR
from krun.config import Config
from krun.scheduler import ManifestManager
from krun.tests.mocks import mock_platform, mock_manifest, mock_mailer
from krun.platform import detect_platform
from bz2 import BZ2File

import json
import logging
import pytest
import os
import subprocess32
from tempfile import NamedTemporaryFile


@@ -74,27 +76,25 @@ def test_run_shell_cmd_fatal():
assert out == ""

def test_run_shell_cmd_bench():
from krun.platform import detect_platform
platform = detect_platform(None, None)
msg = "example text\n"
out, err, rc = run_shell_cmd_bench("echo " + msg, platform)
out, err, rc, _ = run_shell_cmd_bench("echo " + msg, platform)
assert out == msg
assert err == ""
assert rc == 0

msg2 = "another example\n"
out, err, rc = run_shell_cmd_bench(
out, err, rc, _ = run_shell_cmd_bench(
"(>&2 echo %s) && (echo %s)" % (msg2, msg),
platform)
assert out == msg
assert err == msg2
assert rc == 0

def test_run_shell_cmd_bench_fatal():
from krun.platform import detect_platform
cmd = "nonsensecommand"
platform = detect_platform(None, None)
out, err, rc = run_shell_cmd_bench(cmd, platform, False)
out, err, rc, _ = run_shell_cmd_bench(cmd, platform, False)
assert rc != 0
assert cmd in err
assert out == ""
Expand Down Expand Up @@ -405,3 +405,10 @@ def test_dump_instr_json0001():
os.rmdir(dump_dir)

assert js == instr_data


def test_read_popen_output_carefully_0001():
platform = detect_platform(None, None)
process = subprocess32.Popen(["/bin/sleep", "5"], stdout=subprocess32.PIPE)
_, _, _, timed_out = read_popen_output_carefully(process, platform, timeout=1)
assert timed_out
10 changes: 6 additions & 4 deletions krun/tests/test_vmdef.py
@@ -85,7 +85,7 @@ def fake_sync_disks():
monkeypatch.setattr(platform, "sync_disks", fake_sync_disks)

def fake_run_exec_popen(args, stderr_file=None):
return "[1]", "", 0 # stdout, stderr, exit_code
return "[1]", "", 0, False # stdout, stderr, exit_code, timed_out
monkeypatch.setattr(vm_def, "_run_exec_popen", fake_run_exec_popen)

vm_def.run_exec(ep, 1, 1, 1, 1, "test:dummyvm:default", 0)
@@ -113,7 +113,7 @@ def fake_sync_disks():
monkeypatch.setattr(platform, "sync_disks", fake_sync_disks)

def fake_run_exec_popen(args, stderr_file=None):
return stdout, "", 0 # stdout, stderr, exit_code
return stdout, "", 0, False # stdout, stderr, exit_code, timed_out

monkeypatch.setattr(vm_def, "_run_exec_popen", fake_run_exec_popen)

@@ -130,11 +130,12 @@ def test_run_exec_popen0001(self, monkeypatch):

args = [sys.executable, "-c",
"import sys; sys.stdout.write('STDOUT'); sys.stderr.write('STDERR')"]
out, err, rv = vm_def._run_exec_popen(args)
out, err, rv, timed_out = vm_def._run_exec_popen(args)

assert err == "STDERR"
assert out == "STDOUT"
assert rv == 0
assert timed_out == False

def test_run_exec_popen0002(self, monkeypatch):
"""Check that writing stderr to a file works. Used for instrumentation"""
@@ -149,11 +150,12 @@ def test_run_exec_popen0002(self, monkeypatch):

with NamedTemporaryFile(delete=False, prefix="kruntest") as fh:
filename = fh.name
out, err, rv = vm_def._run_exec_popen(args, fh)
out, err, rv, timed_out = vm_def._run_exec_popen(args, fh)

assert err == "" # not here due to redirection
assert out == "STDOUT" # behaviour should be unchanged
assert rv == 0
assert timed_out == False

# stderr should be in this file
with open(filename) as fh: