Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dagvis #303

Open
wants to merge 11 commits into
base: develop
Choose a base branch
from
8 changes: 8 additions & 0 deletions maestrowf/conductor.py
Original file line number Diff line number Diff line change
Expand Up @@ -304,6 +304,14 @@ def initialize(self, batch_info, sleeptime=60):
# Write metadata
self._exec_dag.set_adapter(batch_info)
self._study.store_metadata()

LOGGER.debug("Exporting dag.")
if self._study.draw:
# Setup base name for each format option
dag_basename = os.path.join(self._study.output_path,
'dag_{}_'.format(self._study.name))
self._exec_dag.export_dag_vis(dag_basename, self._study.draw)

self._setup = True

def monitor_study(self):
Expand Down
25 changes: 17 additions & 8 deletions maestrowf/datastructures/core/executiongraph.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,10 +55,10 @@ def __init__(self, workspace, step, **kwargs):
self.script = kwargs.get("script", "")
self.restart_script = kwargs.get("restart", "")
self.to_be_scheduled = False
self.step = step
self._step = step
self.restart_limit = kwargs.get("restart_limit", 3)

# Status Information
# Status Information
self._num_restarts = 0
self._submit_time = None
self._start_time = None
Expand All @@ -82,11 +82,13 @@ def generate_script(self, adapter, tmp_dir=""):
else:
scr_dir = self.workspace.value

self.step.run["cmd"] = self.workspace.substitute(self.step.run["cmd"])
self._step.run["cmd"] = self.workspace.substitute(
self._step.run["cmd"]
)

LOGGER.info("Generating script for %s into %s", self.name, scr_dir)
self.to_be_scheduled, self.script, self.restart_script = \
adapter.write_script(scr_dir, self.step)
adapter.write_script(scr_dir, self._step)
LOGGER.info("Script: %s\nRestart: %s\nScheduled?: %s",
self.script, self.restart_script, self.to_be_scheduled)

Expand Down Expand Up @@ -122,12 +124,12 @@ def can_restart(self):
def _execute(self, adapter, script):
if self.to_be_scheduled:
srecord = adapter.submit(
self.step, script, self.workspace.value)
self._step, script, self.workspace.value)
else:
self.mark_running()
ladapter = ScriptAdapterFactory.get_adapter("local")()
srecord = ladapter.submit(
self.step, script, self.workspace.value)
self._step, script, self.workspace.value)

retcode = srecord.submission_code
jobid = srecord.job_identifier
Expand Down Expand Up @@ -230,7 +232,14 @@ def name(self):

:returns: The name of the StudyStep contained within the record.
"""
return self.step.name
return self._step.name

@property
def step(self):
"""
Get the study step object represented by the record instance
"""
return self._step

@property
def walltime(self):
Expand All @@ -239,7 +248,7 @@ def walltime(self):

:returns: A string representing the requested computing time.
"""
return self.step.run["walltime"]
return self._step.run["walltime"]

@property
def time_submitted(self):
Expand Down
14 changes: 14 additions & 0 deletions maestrowf/datastructures/core/parameters.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,20 @@ def apply(self, item):
# substrings.
return item

@property
def param_vals(self):
"""
Return dict of parameter values
"""
return self._params

@property
def param_labels(self):
"""
Return dict of parameter labels
"""
return self._labels


class ParameterGenerator:
"""
Expand Down
62 changes: 46 additions & 16 deletions maestrowf/datastructures/core/study.py
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ class StudyStep:
def __init__(self):
"""Object that represents a single workflow step."""
self.name = ""
self.base_name = "" # Stores unparameterized name
self.description = ""
self.run = {
"cmd": "",
Expand All @@ -83,6 +84,9 @@ def __init__(self):
"reservation": ""
}

self._param_vals = {} # Better to be None?
self._param_labels = {}

def apply_parameters(self, combo):
"""
Apply a parameter combination to the StudyStep.
Expand All @@ -92,8 +96,13 @@ def apply_parameters(self, combo):
"""
# Create a new StudyStep and populate it with substituted values.
tmp = StudyStep()

base_name = tmp.name
tmp.__dict__ = apply_function(self.__dict__, combo.apply)
# Return if the new step is modified and the step itself.
tmp.base_name = base_name # Why doesn't this work here?
tmp._param_vals = combo.param_vals
tmp._param_labels = combo.param_labels

return self.__ne__(tmp), tmp

Expand Down Expand Up @@ -122,6 +131,20 @@ def __ne__(self, other):
"""
return not self.__eq__(other)

@property
def param_vals(self):
"""
Return dict of parameter values for this step
"""
return self._param_vals

@property
def param_labels(self):
"""
Return dict of parameter labels for this step
"""
return self._param_labels


class Study(DAG, PickleInterface):
"""
Expand Down Expand Up @@ -164,7 +187,8 @@ class Study(DAG, PickleInterface):
"""

def __init__(self, name, description,
studyenv=None, parameters=None, steps=None, out_path="./"):
studyenv=None, parameters=None, steps=None, out_path="./",
draw=False):
FrankD412 marked this conversation as resolved.
Show resolved Hide resolved
"""
Study object used to represent the full workflow of a study.

Expand Down Expand Up @@ -200,11 +224,13 @@ def __init__(self, name, description,
self.is_configured = False
self.add_node(SOURCE, None)

# Settings for handling restarts and submission attempts.
self._restart_limit = 0
self._submission_attempts = 0
self._use_tmp = False
self._dry_run = False
# Settings for handling restarts and submission attempts. Just set to
# defaults here.
self._restart_limit = 0 # Number of restarts before fail
self._submission_attempts = 0 # Submit attempts before fail
self._use_tmp = False # tmp dir for script/lock writing.
self._dry_run = False # Enables dry-run (disables submit)
self.draw = False # Set dag vis flag

# Management structures
# The workspace used by each step.
Expand Down Expand Up @@ -391,24 +417,25 @@ def setup_environment(self):

def configure_study(self, submission_attempts=1, restart_limit=1,
throttle=0, use_tmp=False, hash_ws=False,
dry_run=False):
dry_run=False, draw=[]):
"""
Perform initial configuration of a study. \
Perform initial configuration of a study.

The method is used for going through and actually acquiring each \
dependency, substituting variables, sources and labels. \
dependency, substituting variables, sources and labels.

:param submission_attempts: Number of attempted submissions before \
marking a step as failed. \
marking a step as failed.
:param restart_limit: Upper limit on the number of times a step with \
a restart command can be resubmitted before it is considered failed. \
:param throttle: The maximum number of in-progress jobs allowed. [0 \
denotes no cap].\
denotes no cap].
:param use_tmp: Boolean value specifying if the generated \
ExecutionGraph dumps its information into a temporary directory. \
ExecutionGraph dumps its information into a temporary directory.
:param dry_run: Boolean value that toggles dry run to just generate \
study workspaces and scripts without execution or status checking. \
:returns: True if the Study is successfully setup, False otherwise. \
study workspaces and scripts without execution or status checking.
:param draw: List of visualization dot style options [empty = no draw].
:returns: True if the Study is successfully setup, False otherwise.
"""

self._submission_attempts = submission_attempts
Expand All @@ -417,6 +444,7 @@ def configure_study(self, submission_attempts=1, restart_limit=1,
self._use_tmp = use_tmp
self._hash_ws = hash_ws
self._dry_run = dry_run
self.draw = draw

LOGGER.info(
"\n------------------------------------------\n"
Expand All @@ -426,10 +454,11 @@ def configure_study(self, submission_attempts=1, restart_limit=1,
"Use temporary directory = %s\n"
"Hash workspaces = %s\n"
"Dry run enabled = %s\n"
"Graph vis options = %s\n"
"Output path = %s\n"
"------------------------------------------",
submission_attempts, restart_limit, throttle,
use_tmp, hash_ws, dry_run, self._out_path
use_tmp, hash_ws, dry_run, self.draw, self._out_path
)

self.is_configured = True
Expand All @@ -446,7 +475,7 @@ def _stage(self, dag):
# Items to store that should be reset.
LOGGER.info(
"\n==================================================\n"
"Constructing parameter study '%s'\n"
"Constructing study '%s'\n"
"==================================================\n",
self.name
)
Expand Down Expand Up @@ -644,6 +673,7 @@ def _stage(self, dag):
self.step_combos[step].add(combo_str)

modified, step_exp = node.apply_parameters(combo)
step_exp.base_name = step_exp.name
step_exp.name = combo_str

# Substitute workspaces into the combination.
Expand Down
122 changes: 122 additions & 0 deletions maestrowf/datastructures/dag.py
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@

from collections import deque, OrderedDict
import logging
from math import sqrt

from maestrowf.abstracts.graph import Graph

Expand Down Expand Up @@ -248,3 +249,124 @@ def _detect_cycle(self, v, visited, rstack):
rstack.remove(v)
logger.debug("No cycle originating from '%s'", v)
return False

def export_dag_vis(self, dag_basename, draw_opts):
"""
Export hierarchical representation of this study's dag to the list of
formats specified in draw_opts.

:param dag_basename: Basename of output file, in study output path
:param draw_opts: specifies one or more file output formats.
mpl (matplotlib png), mpl-dot (dot layout mpl), dot
(graphviz dot file), graphml (graphml file)

NOTE: must this re-call topological sort for safety?
NOTE: Add optional node annotations/attributes (colors, shape, etc)
NOTE: Add skeleton only format (unexpanded steps)
NOTE: Add partial expansion of dag -> large workflows
NOTE: What about node attributes when here are too many parameters
to enumerate?
"""

logger.debug("Exporting hierarchical representation of dag")

# Put these at the top of file, maybe decorate this function to handle
# the disablement?
try:
import matplotlib.pyplot as plt
import networkx as nx

except ImportError:
logger.exception("Couldn't import graph drawing utilities; "
"disabling graph visualzation.")
return

try:
from networkx import nx_agraph
have_pygv = True

except ImportError:
logger.exception("Error importing pygraphviz: dot "
"layout/output disabled.")

have_pygv = False

dagnx = nx.DiGraph()

nodelist = self.topological_sort()
node_labels = {}
for idx, node in enumerate(nodelist):

if node == '_source':
node_label = 'Study' # Try to get study name instead?
else:
this_step = self.values[node].step
node_label = '{}\n'.format(this_step.base_name)
for var, value in this_step.param_vals.items():
varname = var[2:-1]
node_label += '{}:{}\n'.format(varname, value)

logger.debug("Adding label to node {}: {}".format(node,
node_label))

node_labels[node] = node_label # draw these later
dagnx.add_node(node,
label=node_label)

for node in nodelist:
edges = self.adjacency_table[node]

dagnx.add_edges_from([(node, child) for child in edges])
logger.debug("Node {} has children: {}".format(node, edges))

# Compute node positions for two layouts
# Note: work on something better for sizing/layout than these hacks
# NOTE: check if this longest path computation is expensive
longest_chain = len(nx.algorithms.dag_longest_path(dagnx))
pos_spring = nx.spring_layout(dagnx, k=1/sqrt(longest_chain))

# Convert to pygraphviz agraph for dot layout
if have_pygv:
pos_dot = nx_agraph.pygraphviz_layout(dagnx, prog='dot')
else:
# Fail-safe for matplotlib rendering
pos_dot = pos_spring

for viz_format in draw_opts:

# For matplotlib, have to do extra work to compute image size
if viz_format == "mpl" or viz_format == "mpl-dot":
fig, ax = plt.subplots(figsize=(3*longest_chain,
2*longest_chain))

if viz_format == "mpl" or viz_format == "graphml":
pos = pos_spring
else:
pos = pos_dot

if viz_format == "mpl" or viz_format == "mpl-dot":
# Possible to iteratively compute node size and figure size?
nx.draw_networkx(dagnx,
pos=pos,
ax=ax,
labels=node_labels,
node_size=500)
# May need to render labels separately?
# nx.draw(dagnx, with_labels=False)
# nx.draw_networkx_labels(dagnx,
plt.savefig(dag_basename + '.png', dpi=150)

if viz_format == "dot" and have_pygv:
# Possible to pass networkx/pygraphviz agraph object around
# when imports aren't available?
nx_agraph.write_dot(dagnx, dag_basename + '.dot')

if viz_format == "graphml" or viz_format == "graphml-dot":
# NOTE: find implementation that avoids this copy
graphml_dag = dagnx
# Add positions as node attributes (NEEDS VERIFICATION)
for node, (x, y) in pos.items():
graphml_dag.node[node]['x'] = float(x)
graphml_dag.node[node]['y'] = float(y)

nx.write_graphml(graphml_dag, dag_basename + '.graphml')
Loading