Skip to content

Commit

Permalink
refs #15: Fix subproc reclamation and implement auto-plotting in PPC …
Browse files Browse the repository at this point in the history
…experiment scripts.

 * In Python, read() returns an empty string when it reaches EOF.
   The zombie process + infinite loop problem was due to missing this.

 * Automated PPC plotting using matplotlib.
   It can be reused for other types of load balancers as well.
  • Loading branch information
achimnol committed Jul 10, 2015
1 parent a46fb51 commit 04023bc
Show file tree
Hide file tree
Showing 4 changed files with 228 additions and 75 deletions.
45 changes: 22 additions & 23 deletions scripts/exprlib.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ def __init__(self, verbose=False):
self._cpu_timer = None
self._start_time = None
self._loop = asyncio.get_event_loop()
self._loop.add_signal_handler(signal.SIGINT, self._signal_coro)
#self._loop.add_signal_handler(signal.SIGINT, self._signal_coro)

# Configurations.
self._nba_env = {
Expand Down Expand Up @@ -351,44 +351,52 @@ def execute_main(self, config_name, click_name,
self._main_proc = yield from asyncio.create_subprocess_exec('bin/main', *args,
loop=self._loop,
stdout=subprocess.PIPE,
stderr=None,
stdin=None,
start_new_session=True,
env=self.get_merged_env())

assert self._main_proc.stdout is not None

# Run the readers.
if custom_stdout_coro:
self._main_tasks.append(custom_stdout_coro(self._main_proc.stdout))
asyncio.async(custom_stdout_coro(self._main_proc.stdout), loop=self._loop)
else:
self._main_tasks.append(self._main_read_stdout_coro(self._main_proc.stdout))
asyncio.async(self._main_read_stdout_coro(self._main_proc.stdout), loop=self._loop)

# Create timers.
self._start_time = self._loop.time()
if running_time > 0:
self._delayed_calls.append(self._loop.call_later(running_time, self._main_finish_cb))
self._cpu_timer_fires = 0
if self._cpu_measure_time is not None:
self._delayed_calls.append(self._loop.call_later(self._cpu_measure_time, self._cpu_timer_cb))

# Run the stdout/stderr reader tasks.
# (There may be other tasks in _main_tasks.)
if self._main_tasks:
done, pending = yield from asyncio.wait(self._main_tasks, loop=self._loop, timeout=running_time + 0.1)

# Wait.
if running_time > 0:
yield from asyncio.sleep(running_time + 1)
yield from asyncio.sleep(running_time)
else:
while not self._break:
yield from asyncio.sleep(1)
yield from asyncio.sleep(0.5)
yield from asyncio.sleep(0.2)

# Reclaim the child.
try:
sid = os.getsid(self._main_proc.pid)
os.killpg(sid, signal.SIGTERM)
except ProcessLookupError:
# when the program automatically terminates (e.g., alb_measure)
# it might already have terminated.
pass
for handle in self._delayed_calls:
handle.cancel()
try:
exitcode = yield from asyncio.wait_for(self._main_proc.wait(), 3)
except asyncio.TimeoutError:
# If the termination times out, kill it.
# (GPU/ALB configurations often hang...)
print('The main process hangs during termination. Killing it...', file=sys.stderr)
os.killpg(os.getpgid(self._main_proc.pid), signal.SIGKILL)
exitcode = -9
os.killpg(os.getsid(self._main_proc.pid), signal.SIGKILL)
exitcode = -signal.SIGKILL
# We don't wait it...

self._main_proc = None
self._main_tasks.clear()
Expand Down Expand Up @@ -479,15 +487,6 @@ def _main_read_stdout_coro(self, stdout):
self._total_gbps.clear()
self._port_records.clear()

def _main_finish_cb(self):
print('Finished')
if self._main_proc:
self._main_proc.stdout.feed_eof()
yield from asyncio.sleep(0.2)
os.killpg(os.getpgid(self._main_proc.pid), signal.SIGTERM)
for handle in self._delayed_calls:
handle.cancel()

@asyncio.coroutine
def _cpu_measure_coro(self):
cur_ts = self._loop.time() - self._start_time
Expand Down
128 changes: 103 additions & 25 deletions scripts/run_ppc_adaptive.py
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@
from pathlib import Path
from statistics import mean
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from matplotlib.font_manager import FontProperties
from matplotlib.colors import hex2color
from matplotlib.font_manager import findSystemFonts, FontProperties
from matplotlib.gridspec import GridSpec
from exprlib import execute, execute_async_simple, comma_sep_numbers, host_port_pair, ExperimentEnv
from pspgen import PktGenRunner

Expand All @@ -25,11 +28,8 @@ def read_stdout_coro(records, stdout):
rx_thr_node = re.compile(r'node (\d+)$')
last_node_thruputs = [0, 0]
while True:
try:
line = yield from stdout.readline()
except asyncio.CancelledError:
break
if line is None: break
line = yield from stdout.readline()
if not line: break
line = line.decode('utf8')
if line.startswith('[PPC:') and 'converge' not in line:
pieces = tuple(s.replace(',', '') for s in line.split())
Expand All @@ -45,6 +45,19 @@ def read_stdout_coro(records, stdout):
# [PPC:1] CPU 30 GPU 47 PPC 30 CPU-Ratio 1.000
# Total forwarded pkts: 39.53 Mpps, 27.83 Gbps in node 1

def find_times(bold=False, italic=False):
fonts = findSystemFonts()
for fontpath in fonts:
fprop = FontProperties(fname=fontpath)
name = fprop.get_name()
name_matched = 'Times New Roman' in name
pname = os.path.splitext(os.path.basename(fontpath))[0]
style_matched = ((not bold) or (bold and (pname.endswith('Bold') or (pname.lower() == pname and pname.endswith('bd'))))) and \
((not italic) or (italic and (pname.endswith('Italic') or (pname.lower() == pname and pname.endswith('i')))))
if name_matched and style_matched:
return fprop
return None

def draw_plot(records, confname, pktsize, base_path='~/Dropbox/temp/plots/nba/'):
category = 'ppc-adaptive'
now = datetime.now()
Expand All @@ -57,35 +70,92 @@ def draw_plot(records, confname, pktsize, base_path='~/Dropbox/temp/plots/nba/')
#plt.rc('font', family='Times New Roman')
rec_count = min(len(recs) for recs in records)
print('# records: {0}'.format(rec_count))

fig, ax = plt.subplots()
ax.set_xlim(0, rec_count)
ax.set_xlabel('Ticks', fontweight='bold')
ax.set_ylabel('Throughput (Gbps)', fontweight='bold')
ax2 = ax.twinx()
ax2.set_ylabel('Offloading Ratio', fontweight='bold')
ax2.set_ylim(0, 1)

line_colors = ['b', 'g']
bar_colors = ['r', 'y']
times_bold = find_times(True)
times_bold.set_size(6)
plt.rcParams.update({
'font.family': 'Times New Roman',
'font.size': 5,
'axes.labelsize': 5,
'axes.titlesize': 6,
'legend.fontsize': 5,
'axes.linewidth': 0.64,
})

fig = plt.figure(figsize=(3.3, 1.8), dpi=150)
gs = GridSpec(2, 1, height_ratios=(4, 1.5))
ax_perf = plt.subplot(gs[0]) # sharex?
ax_ratio = plt.subplot(gs[1])

ax_perf.set_xlim(0, rec_count - 1)
ax_perf.set_ylabel(' Throughput (Gbps)', fontproperties=times_bold, labelpad=2)
ax_ppc = ax_perf.twinx()
ax_ppc.set_ylabel('PPC (cycles)', fontproperties=times_bold, labelpad=3)
ax_ppc.set_xlim(0, rec_count - 1)
ax_ratio.set_xlabel('Ticks', fontproperties=times_bold, labelpad=1)
ax_ratio.set_xlim(0, rec_count - 1)
ax_ratio.set_ylabel('Offloading Weight', fontproperties=times_bold, labelpad=2)
ax_ratio.set_ylim(0, 1)
for l in ax_perf.get_xticklabels(): l.set_visible(False)
for l in ax_ppc.get_xticklabels(): l.set_visible(False)
l = ax_perf.get_yticklabels()[0]
l.set_visible(False)

box = ax_perf.get_position()
ax_perf.set_position([box.x0 - 0.03, box.y0 + 0.05, box.width*0.62, box.height])
box = ax_ppc.get_position()
ax_ppc.set_position([box.x0 - 0.03, box.y0 + 0.05, box.width*0.62, box.height])
box2 = ax_ratio.get_position()
ax_ratio.set_position([box2.x0 - 0.03, box2.y0 + 0.05, box2.width*0.62, box.y0 - box2.y0])

ppc_colors = [hex2color('#ff375d'), hex2color('#bf0000')]
ratio_colors = [hex2color('#376bff'), hex2color('#0033bf')]
thr_colors = [hex2color('#d8d8d8'), hex2color('#7f7f7f')]

cpu_ratio = []
thruput = []
legend_items_thr = []
legend_items_pcpu = []
legend_items_pgpu = []
legend_items_pest = []
legend_items_ratio = []

x_ind = np.arange(rec_count)

for node_id in range(2):
cpu_ratio.append(tuple(r.cpu_ratio for r in records[node_id][:rec_count]))
for node_id in range(len(records)):
ppc_cpu = tuple(r.ppc_cpu for r in records[node_id][:rec_count])
ppc_gpu = tuple(r.ppc_gpu for r in records[node_id][:rec_count])
ppc_est = tuple(r.ppc_est for r in records[node_id][:rec_count])
thruput.append(tuple(r.thruput for r in records[node_id][:rec_count]))
gpu_ratio = 1 - np.array(cpu_ratio[node_id][:rec_count])
ax.bar(x_ind, thruput[node_id], bottom=thruput[node_id - 1] if node_id > 0 else None, color=bar_colors[node_id], edgecolor='none', width=1.05)
ax2.plot(x_ind, gpu_ratio, color=line_colors[node_id])
cpu_ratio.append(tuple(r.cpu_ratio for r in records[node_id][:rec_count]))
offl_ratio = 1 - np.array(cpu_ratio[node_id][:rec_count])
h_thr = ax_perf.bar(x_ind, thruput[node_id], bottom=thruput[node_id - 1] if node_id > 0 else None,
color=thr_colors[node_id], edgecolor=thr_colors[node_id], width=1.0, align='center')
#step_thr = np.array(thruput[0])
#for thr in thruput[1:node_id+1]:
# step_thr += np.array(thr)
#ax_perf.step(x_ind, step_thr, color='black', where='mid')
h_pcpu, = ax_ppc.plot(x_ind, ppc_cpu, color=ppc_colors[node_id], lw=0.8)
h_pgpu, = ax_ppc.plot(x_ind, ppc_gpu, color=ppc_colors[node_id], lw=0.8)
h_pest, = ax_ppc.plot(x_ind, ppc_est, color=ppc_colors[node_id], lw=1.6)
h_ratio, = ax_ratio.plot(x_ind, offl_ratio, color=ratio_colors[node_id], lw=0.8)
h_pcpu.set_dashes([0.6, 0.6])
h_pgpu.set_dashes([2.4, 0.6])
legend_items_thr.append((h_thr, 'Throughput (Node {0})'.format(node_id)))
legend_items_pcpu.append((h_pcpu, 'PPC of CPU (Node {0})'.format(node_id)))
legend_items_pgpu.append((h_pgpu, 'PPC of GPU (Node {0})'.format(node_id)))
legend_items_pest.append((h_pest, 'Estimated PPC (Node {0})'.format(node_id)))
legend_items_ratio.append((h_ratio, 'Offloading Weight (Node {0})'.format(node_id)))

legend_items = legend_items_thr + legend_items_pcpu + legend_items_pgpu + legend_items_pest + legend_items_ratio
legend = fig.legend([li[0] for li in legend_items], [li[1] for li in legend_items], bbox_to_anchor=(0.99, 0.5), loc='right', ncol=1, borderaxespad=0)
legend.get_frame().set_linewidth(0.64)

try:
path.parent.mkdir(parents=True)
except FileExistsError:
pass
print('Saving figure to {0}...'.format(path))
plt.savefig(str(path), transparent=True)
plt.savefig(str(path), transparent=True, dpi=300)

if __name__ == '__main__':

Expand All @@ -108,6 +178,8 @@ def draw_plot(records, confname, pktsize, base_path='~/Dropbox/temp/plots/nba/')
args = parser.parse_args()

env = ExperimentEnv(verbose=args.verbose)
loop = asyncio.get_event_loop()

pktgens = []
if not args.emulate_io:
for host, port in args.pktgen:
Expand Down Expand Up @@ -147,20 +219,26 @@ def draw_plot(records, confname, pktsize, base_path='~/Dropbox/temp/plots/nba/')
# Run.
with ExitStack() as stack:
_ = [stack.enter_context(pktgen) for pktgen in pktgens]
loop = asyncio.get_event_loop()
retcode = loop.run_until_complete(env.execute_main(args.sys_config_to_use, args.element_config_to_use,
running_time=32.0, emulate_opts=emulate_opts,
custom_stdout_coro=partial(read_stdout_coro, records)))
if retcode != 0 and retcode != -9:
print('The main program exited abnormaly, and we ignore the results! (exit code: {0})'.format(retcode))

#records[0].append(PPCRecord(10, 12, 10, 0.5, 25.5))
#records[0].append(PPCRecord(11, 13, 12, 0.6, 23.5))
#records[0].append(PPCRecord(10, 10, 13, 0.5, 22.5))
#records[0].append(PPCRecord(11, 9, 15, 0.4, 28.5))
#records[0].append(PPCRecord(11, 12, 16, 0.3, 29.5))
#records[1].append(PPCRecord(13, 10, 9, 0.3, 25.5))
#records[1].append(PPCRecord(14, 15, 15, 0.25, 24.8))
#records[1].append(PPCRecord(15, 11, 13, 0.4, 24.2))
#records[1].append(PPCRecord(12, 12, 12, 0.48, 28.8))
#records[1].append(PPCRecord(13, 13, 13, 0.5, 29.2))

sys.stdout.flush()
time.sleep(1)

draw_plot(records, args.element_config_to_use, pktsize)

loop.close()
Loading

0 comments on commit 04023bc

Please sign in to comment.