From 69acd6911434f051fbd2f4ff3489ccf10d81ca81 Mon Sep 17 00:00:00 2001 From: Clearfield Christopher Date: Tue, 4 Aug 2015 13:50:52 -0700 Subject: [PATCH 1/7] Working on addiing extend to all sections --- starcluster/config.py | 10 ++++++++ starcluster/plugins/ipcluster.py | 35 +++++++++++++++++++-------- starcluster/static.py | 2 ++ starcluster/tests/templates/config.py | 5 ++++ 4 files changed, 42 insertions(+), 10 deletions(-) diff --git a/starcluster/config.py b/starcluster/config.py index e15c866e3..c4fae1492 100644 --- a/starcluster/config.py +++ b/starcluster/config.py @@ -340,16 +340,20 @@ def _load_extends_settings(self, section_name, store): if DEBUG_CONFIG: log.debug('%s extends %s' % (section_name, extends)) extensions = [section] + print "Extensions1 ", extensions while extends is not None: try: section = store[extends] if section in extensions: + print "Extensions2 ", section exts = ', '.join([self._get_section_name(x['__name__']) for x in extensions]) raise exception.ConfigError( "Cyclical dependency between sections %s. " "Check your EXTENDS settings." % exts) + print "Section: ", section extensions.insert(0, section) + print "Extensions2", extensions except KeyError: raise exception.ConfigError( "%s can't extend non-existent section %s" % @@ -358,6 +362,7 @@ def _load_extends_settings(self, section_name, store): transform = AttributeDict() for extension in extensions: transform.update(extension) + print "Transform", transform store[section_name] = transform def _load_keypairs(self, store): @@ -531,6 +536,11 @@ def _load_sections(self, section_prefix, section_settings, sections_store = AttributeDict() for sec in sections: name = self._get_section_name(sec) + sections_store[name] = AttributeDict() + self._load_settings(sec, section_settings, sections_store[name]) + for sec in sections: + name = self._get_section_name(sec) + self._load_extends_settings(name, sections_store) sections_store[name] = self._load_section(sec, section_settings, filter_settings) return sections_store diff --git a/starcluster/plugins/ipcluster.py b/starcluster/plugins/ipcluster.py index 1939a76a2..3f4dbdf01 100644 --- a/starcluster/plugins/ipcluster.py +++ b/starcluster/plugins/ipcluster.py @@ -71,7 +71,7 @@ def _start_engines(node, user, n_engines=None, kill_existing=False): n_engines = node.num_processors node.ssh.switch_user(user) if kill_existing: - node.ssh.execute("pkill -f ipengineapp", ignore_exit_status=True) + node.ssh.execute("pkill -f IPython.parallel.engine", ignore_exit_status=True) node.ssh.execute("ipcluster engines --n=%i --daemonize" % n_engines) node.ssh.switch_user('root') @@ -91,7 +91,7 @@ class IPCluster(DefaultClusterSetup): """ def __init__(self, enable_notebook=False, notebook_passwd=None, - notebook_directory=None, packer=None, log_level='INFO'): + notebook_directory=None, packer=None, master_engines=None, node_engines=None, log_level='INFO'): super(IPCluster, self).__init__() if isinstance(enable_notebook, basestring): self.enable_notebook = enable_notebook.lower().strip() == 'true' @@ -100,6 +100,8 @@ def __init__(self, enable_notebook=False, notebook_passwd=None, self.notebook_passwd = notebook_passwd or utils.generate_passwd(16) self.notebook_directory = notebook_directory self.log_level = log_level + self.master_engines = master_engines + self.node_engines = node_engines if packer not in (None, 'json', 'pickle', 'msgpack'): log.error("Unsupported packer: %s", packer) self.packer = None @@ -163,7 +165,10 @@ def _write_config(self, master, user, profile_dir): f.close() def _start_cluster(self, master, profile_dir): - n_engines = max(1, master.num_processors - 1) + if not self.master_engines: + n_engines = max(1, master.num_processors - 1) + else: + n_engines = int(self.master_engines) log.info("Starting the IPython controller and %i engines on master" % n_engines) # cleanup existing connection files, to prevent their use @@ -215,7 +220,7 @@ def _start_cluster(self, master, profile_dir): self._authorize_port(master, (1000, 65535), "IPython controller") return local_json, n_engines - def _start_notebook(self, master, user, profile_dir): + def _start_notebook(self, master, user, profile_dir, time_to_dead=30.0): log.info("Setting up IPython web notebook for user: %s" % user) user_cert = posixpath.join(profile_dir, '%s.pem' % user) ssl_cert = posixpath.join(profile_dir, '%s.pem' % user) @@ -242,6 +247,7 @@ def _start_notebook(self, master, user, profile_dir): "c.NotebookApp.open_browser = False", "c.NotebookApp.password = u'%s'" % sha1pass, "c.NotebookApp.port = %d" % notebook_port, + "c.NotebookApp.time_to_dead = %d" % time_to_dead, ])) f.close() if self.notebook_directory is not None: @@ -289,10 +295,15 @@ def run(self, nodes, master, user, user_shell, volumes): # Start engines on each of the non-master nodes non_master_nodes = [node for node in nodes if not node.is_master()] for node in non_master_nodes: + if not self.node_engines: + n_engines = node.num_processors + else: + n_engines = int(self.node_engines) self.pool.simple_job( - _start_engines, (node, user, node.num_processors), + _start_engines, (node, user, n_engines), jobid=node.alias) - n_engines_non_master = sum(node.num_processors + n_engines_non_master = 0 + n_engines_non_master += sum(node.num_processors if not self.node_engines else int(self.node_engines) for node in non_master_nodes) if len(non_master_nodes) > 0: log.info("Adding %d engines on %d nodes", @@ -310,7 +321,10 @@ def run(self, nodes, master, user, user_shell, volumes): def on_add_node(self, node, nodes, master, user, user_shell, volumes): self._check_ipython_installed(node) - n_engines = node.num_processors + if not self.node_engines: + n_engines = node.num_processors + else: + n_engines = self.node_engines log.info("Adding %d engines on %s", n_engines, node.alias) _start_engines(node, user) @@ -332,7 +346,7 @@ def run(self, nodes, master, user, user_shell, volumes): master.ssh.execute("ipcluster stop", ignore_exit_status=True) time.sleep(2) log.info("Stopping IPython controller on %s", master.alias) - master.ssh.execute("pkill -f ipcontrollerapp", + master.ssh.execute("pkill -f IPython.parallel.controller", ignore_exit_status=True) master.ssh.execute("pkill -f 'ipython notebook'", ignore_exit_status=True) @@ -344,7 +358,7 @@ def run(self, nodes, master, user, user_shell, volumes): def _stop_engines(self, node, user): node.ssh.switch_user(user) - node.ssh.execute("pkill -f ipengineapp", ignore_exit_status=True) + node.ssh.execute("pkill -f IPython.parallel.engine", ignore_exit_status=True) node.ssh.switch_user('root') def on_add_node(self, node, nodes, master, user, user_shell, volumes): @@ -364,8 +378,9 @@ class IPClusterRestartEngines(DefaultClusterSetup): This plugin is meant to be run manually with: starcluster runplugin plugin_conf_name cluster_name - """ + # XXX TODO have RestartEngines respect the + # number of nodes we've enforced above def run(self, nodes, master, user, user_shell, volumes): n_total = 0 for node in nodes: diff --git a/starcluster/static.py b/starcluster/static.py index 72bb4e896..2d4513477 100644 --- a/starcluster/static.py +++ b/starcluster/static.py @@ -271,6 +271,7 @@ def create_sc_config_dirs(): PLUGIN_SETTINGS = { 'setup_class': (str, True, None, None, None), + 'extends': (str, False, None, None, None), } PERMISSION_SETTINGS = { @@ -287,6 +288,7 @@ def create_sc_config_dirs(): # 'source_group_owner': (int, False, None), } +# func, required, default, options, callback CLUSTER_SETTINGS = { 'spot_bid': (float, False, None, None, None), 'cluster_size': (int, True, None, None, None), diff --git a/starcluster/tests/templates/config.py b/starcluster/tests/templates/config.py index 33b29852f..417f787e8 100644 --- a/starcluster/tests/templates/config.py +++ b/starcluster/tests/templates/config.py @@ -52,6 +52,7 @@ 'p3_param1': 'bon', 'p3_param2': 'jour', 'p3_param3': 'monsignour', + 'p4_param1': 'oui', 's1_protocol': 'udp', 's1_from_port': 20, 's1_to_port': 20, @@ -150,6 +151,10 @@ MY_OTHER_ARG = %(p3_param2)s MY_OTHER_OTHER_ARG = %(p3_param3)s +[plugin p4] +EXTENDS = p3 +MY_OTHER_OTHER_ARG = %(p4_param1)s + [permission s1] protocol = %(s1_protocol)s from_port = %(s1_from_port)s From 10e2246b585784a249da5f14570b055cf15770e3 Mon Sep 17 00:00:00 2001 From: Clearfield Christopher Date: Tue, 4 Aug 2015 14:06:32 -0700 Subject: [PATCH 2/7] Allowing arbitrary sections to inherit properties --- starcluster/config.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/starcluster/config.py b/starcluster/config.py index c4fae1492..3f70aceb2 100644 --- a/starcluster/config.py +++ b/starcluster/config.py @@ -360,7 +360,9 @@ def _load_extends_settings(self, section_name, store): (section_name, extends)) extends = section.get('extends') transform = AttributeDict() + print "Extensions3", extensions for extension in extensions: + print "Ext: ", extension transform.update(extension) print "Transform", transform store[section_name] = transform @@ -537,12 +539,14 @@ def _load_sections(self, section_prefix, section_settings, for sec in sections: name = self._get_section_name(sec) sections_store[name] = AttributeDict() - self._load_settings(sec, section_settings, sections_store[name]) + self._load_settings(sec, section_settings, sections_store[name], filter_settings) for sec in sections: name = self._get_section_name(sec) self._load_extends_settings(name, sections_store) - sections_store[name] = self._load_section(sec, section_settings, - filter_settings) + self._load_defaults(section_settings, sections_store[name]) + self._check_required(name, section_settings, sections_store[name]) + #sections_store[name] = self._load_section(sec, section_settings, + #filter_settings) return sections_store def _load_cluster_sections(self, cluster_sections): From d19cdf5f9f3517af585d64e40af5dcd1e25d2346 Mon Sep 17 00:00:00 2001 From: Clearfield Christopher Date: Tue, 4 Aug 2015 22:43:17 -0700 Subject: [PATCH 3/7] Generalizing EXTENDS to other prefixes (e.g., plugin), include test for plugin --- starcluster/config.py | 9 --------- starcluster/tests/templates/config.py | 2 +- starcluster/tests/test_config.py | 12 ++++++++++-- 3 files changed, 11 insertions(+), 12 deletions(-) diff --git a/starcluster/config.py b/starcluster/config.py index 3f70aceb2..67504ea99 100644 --- a/starcluster/config.py +++ b/starcluster/config.py @@ -340,31 +340,24 @@ def _load_extends_settings(self, section_name, store): if DEBUG_CONFIG: log.debug('%s extends %s' % (section_name, extends)) extensions = [section] - print "Extensions1 ", extensions while extends is not None: try: section = store[extends] if section in extensions: - print "Extensions2 ", section exts = ', '.join([self._get_section_name(x['__name__']) for x in extensions]) raise exception.ConfigError( "Cyclical dependency between sections %s. " "Check your EXTENDS settings." % exts) - print "Section: ", section extensions.insert(0, section) - print "Extensions2", extensions except KeyError: raise exception.ConfigError( "%s can't extend non-existent section %s" % (section_name, extends)) extends = section.get('extends') transform = AttributeDict() - print "Extensions3", extensions for extension in extensions: - print "Ext: ", extension transform.update(extension) - print "Transform", transform store[section_name] = transform def _load_keypairs(self, store): @@ -545,8 +538,6 @@ def _load_sections(self, section_prefix, section_settings, self._load_extends_settings(name, sections_store) self._load_defaults(section_settings, sections_store[name]) self._check_required(name, section_settings, sections_store[name]) - #sections_store[name] = self._load_section(sec, section_settings, - #filter_settings) return sections_store def _load_cluster_sections(self, cluster_sections): diff --git a/starcluster/tests/templates/config.py b/starcluster/tests/templates/config.py index 417f787e8..1aaec6dae 100644 --- a/starcluster/tests/templates/config.py +++ b/starcluster/tests/templates/config.py @@ -72,7 +72,7 @@ 'c1_master_type': 'm1.small', 'c1_node_type': 'm1.small', 'c1_vols': 'v1,v2,v3', - 'c1_plugs': 'p1,p2,p3', + 'c1_plugs': 'p1,p2,p3,p4', 'c1_zone': 'us-east-1c', 'c2_extends': 'c1', 'c2_keyname': 'k2', diff --git a/starcluster/tests/test_config.py b/starcluster/tests/test_config.py index 784d9c8e8..6069ffbd2 100644 --- a/starcluster/tests/test_config.py +++ b/starcluster/tests/test_config.py @@ -169,15 +169,17 @@ def test_order_invariance(self): def test_plugins(self): c1 = self.config.get_cluster_template('c1') plugs = c1.plugins - assert len(plugs) == 3 + assert len(plugs) == 4 # test that order is preserved - p1, p2, p3 = plugs + p1, p2, p3, p4 = plugs p1_name = p1.__name__ p1_class = utils.get_fq_class_name(p1) p2_name = p2.__name__ p2_class = utils.get_fq_class_name(p2) p3_name = p3.__name__ p3_class = utils.get_fq_class_name(p3) + p4_name = p4.__name__ + p4_class = utils.get_fq_class_name(p4) assert p1_name == 'p1' assert p1_class == 'starcluster.tests.mytestplugin.SetupClass' assert p1.my_arg == '23' @@ -193,6 +195,12 @@ def test_plugins(self): assert p3.my_arg == 'bon' assert p3.my_other_arg == 'jour' assert p3.my_other_other_arg == 'monsignour' + # Test that p4 inherits from p3 + assert p4_class == setup_class3 + assert p4.my_arg == 'bon' + assert p4.my_other_arg == 'jour' + # And test that the p4 argument overides the p3 default + assert p4.my_other_other_arg == 'oui' def test_plugin_not_defined(self): try: From 40050078f7c2f4c4cd235fc2fca4dc649f699863 Mon Sep 17 00:00:00 2001 From: Clearfield Christopher Date: Tue, 4 Aug 2015 22:43:58 -0700 Subject: [PATCH 4/7] Allowing for MASTER_ENGINES and NODE_ENGINES as a parameter --- starcluster/plugins/ipcluster.py | 16 ++++++++++------ 1 file changed, 10 insertions(+), 6 deletions(-) diff --git a/starcluster/plugins/ipcluster.py b/starcluster/plugins/ipcluster.py index 3f4dbdf01..2d607a0b6 100644 --- a/starcluster/plugins/ipcluster.py +++ b/starcluster/plugins/ipcluster.py @@ -368,7 +368,7 @@ def on_remove_node(self, node, nodes, master, user, user_shell, volumes): raise NotImplementedError("on_remove_node method not implemented") -class IPClusterRestartEngines(DefaultClusterSetup): +class IPClusterRestartEngines(IPCluster): """Plugin to kill and restart all engines of an IPython cluster This plugin can be useful to hard-reset the all the engines, for instance @@ -379,14 +379,18 @@ class IPClusterRestartEngines(DefaultClusterSetup): starcluster runplugin plugin_conf_name cluster_name """ - # XXX TODO have RestartEngines respect the - # number of nodes we've enforced above def run(self, nodes, master, user, user_shell, volumes): n_total = 0 for node in nodes: - n_engines = node.num_processors - if node.is_master() and n_engines > 2: - n_engines -= 1 + if node.is_master() and self.master_engines: + n_engines = int(self.master_engines) + elif self.node_engines: + n_engines = int(self.node_engines) + elif node.is_master(): + # and n_engines > 2: # XXX I'm not sure I understand this logic yet. + n_engines = node.num_processors - 1 + else: + n_engines = node.num_processors self.pool.simple_job( _start_engines, (node, user, n_engines, True), jobid=node.alias) From 0332e9af0ecdd390c2f02d34ff3809d635ebf0c5 Mon Sep 17 00:00:00 2001 From: Clearfield Christopher Date: Tue, 4 Aug 2015 22:44:40 -0700 Subject: [PATCH 5/7] Adding example of MASTER_ENGINES and NODE_ENGINES and EXTENDS for plugins to config --- starcluster/templates/config.py | 12 ++++++++++++ 1 file changed, 12 insertions(+) diff --git a/starcluster/templates/config.py b/starcluster/templates/config.py index 885d741c3..772ac0b99 100644 --- a/starcluster/templates/config.py +++ b/starcluster/templates/config.py @@ -317,6 +317,18 @@ # # Set a custom packer. Must be one of 'json', 'pickle', or 'msgpack' # # This is optional. # PACKER = pickle +# # MASTER_ENGINES = 0 # Avoid calculations on master if you are worried about memory/resources +# +# # [plugin ipcluster-joblib] +# # EXTENDS = ipcluster +# # NODE_ENGINES = 1 +# +# # [plugin ipclusterrestart] +# # SETUP_CLASS = starcluster.plugins.ipcluster.IPClusterRestartEngines +# +# # [plugin ipclusterrestart-joblib] +# # SETUP_CLASS = starcluster.plugins.ipcluster.IPClusterRestartEngines +# # EXTENDS = ipcluster-joblib # # Use this plugin to create a cluster SSH "dashboard" using tmux. The plugin # creates a tmux session on the master node that automatically connects to all From 93651e9d338494a62e0749d4eddfefdb1ad6b4e2 Mon Sep 17 00:00:00 2001 From: Clearfield Christopher Date: Tue, 4 Aug 2015 23:03:01 -0700 Subject: [PATCH 6/7] Adding check for p4 name in testing --- starcluster/tests/test_config.py | 1 + 1 file changed, 1 insertion(+) diff --git a/starcluster/tests/test_config.py b/starcluster/tests/test_config.py index 6069ffbd2..192e7346e 100644 --- a/starcluster/tests/test_config.py +++ b/starcluster/tests/test_config.py @@ -196,6 +196,7 @@ def test_plugins(self): assert p3.my_other_arg == 'jour' assert p3.my_other_other_arg == 'monsignour' # Test that p4 inherits from p3 + assert p4_name == 'p4' assert p4_class == setup_class3 assert p4.my_arg == 'bon' assert p4.my_other_arg == 'jour' From 5336ff32479cdf4c1593746170829b12ab058185 Mon Sep 17 00:00:00 2001 From: Clearfield Christopher Date: Wed, 5 Aug 2015 13:39:15 -0700 Subject: [PATCH 7/7] Unrolling ipplugin fixes --- starcluster/plugins/ipcluster.py | 47 ++++++++++---------------------- 1 file changed, 14 insertions(+), 33 deletions(-) diff --git a/starcluster/plugins/ipcluster.py b/starcluster/plugins/ipcluster.py index 2d607a0b6..1939a76a2 100644 --- a/starcluster/plugins/ipcluster.py +++ b/starcluster/plugins/ipcluster.py @@ -71,7 +71,7 @@ def _start_engines(node, user, n_engines=None, kill_existing=False): n_engines = node.num_processors node.ssh.switch_user(user) if kill_existing: - node.ssh.execute("pkill -f IPython.parallel.engine", ignore_exit_status=True) + node.ssh.execute("pkill -f ipengineapp", ignore_exit_status=True) node.ssh.execute("ipcluster engines --n=%i --daemonize" % n_engines) node.ssh.switch_user('root') @@ -91,7 +91,7 @@ class IPCluster(DefaultClusterSetup): """ def __init__(self, enable_notebook=False, notebook_passwd=None, - notebook_directory=None, packer=None, master_engines=None, node_engines=None, log_level='INFO'): + notebook_directory=None, packer=None, log_level='INFO'): super(IPCluster, self).__init__() if isinstance(enable_notebook, basestring): self.enable_notebook = enable_notebook.lower().strip() == 'true' @@ -100,8 +100,6 @@ def __init__(self, enable_notebook=False, notebook_passwd=None, self.notebook_passwd = notebook_passwd or utils.generate_passwd(16) self.notebook_directory = notebook_directory self.log_level = log_level - self.master_engines = master_engines - self.node_engines = node_engines if packer not in (None, 'json', 'pickle', 'msgpack'): log.error("Unsupported packer: %s", packer) self.packer = None @@ -165,10 +163,7 @@ def _write_config(self, master, user, profile_dir): f.close() def _start_cluster(self, master, profile_dir): - if not self.master_engines: - n_engines = max(1, master.num_processors - 1) - else: - n_engines = int(self.master_engines) + n_engines = max(1, master.num_processors - 1) log.info("Starting the IPython controller and %i engines on master" % n_engines) # cleanup existing connection files, to prevent their use @@ -220,7 +215,7 @@ def _start_cluster(self, master, profile_dir): self._authorize_port(master, (1000, 65535), "IPython controller") return local_json, n_engines - def _start_notebook(self, master, user, profile_dir, time_to_dead=30.0): + def _start_notebook(self, master, user, profile_dir): log.info("Setting up IPython web notebook for user: %s" % user) user_cert = posixpath.join(profile_dir, '%s.pem' % user) ssl_cert = posixpath.join(profile_dir, '%s.pem' % user) @@ -247,7 +242,6 @@ def _start_notebook(self, master, user, profile_dir, time_to_dead=30.0): "c.NotebookApp.open_browser = False", "c.NotebookApp.password = u'%s'" % sha1pass, "c.NotebookApp.port = %d" % notebook_port, - "c.NotebookApp.time_to_dead = %d" % time_to_dead, ])) f.close() if self.notebook_directory is not None: @@ -295,15 +289,10 @@ def run(self, nodes, master, user, user_shell, volumes): # Start engines on each of the non-master nodes non_master_nodes = [node for node in nodes if not node.is_master()] for node in non_master_nodes: - if not self.node_engines: - n_engines = node.num_processors - else: - n_engines = int(self.node_engines) self.pool.simple_job( - _start_engines, (node, user, n_engines), + _start_engines, (node, user, node.num_processors), jobid=node.alias) - n_engines_non_master = 0 - n_engines_non_master += sum(node.num_processors if not self.node_engines else int(self.node_engines) + n_engines_non_master = sum(node.num_processors for node in non_master_nodes) if len(non_master_nodes) > 0: log.info("Adding %d engines on %d nodes", @@ -321,10 +310,7 @@ def run(self, nodes, master, user, user_shell, volumes): def on_add_node(self, node, nodes, master, user, user_shell, volumes): self._check_ipython_installed(node) - if not self.node_engines: - n_engines = node.num_processors - else: - n_engines = self.node_engines + n_engines = node.num_processors log.info("Adding %d engines on %s", n_engines, node.alias) _start_engines(node, user) @@ -346,7 +332,7 @@ def run(self, nodes, master, user, user_shell, volumes): master.ssh.execute("ipcluster stop", ignore_exit_status=True) time.sleep(2) log.info("Stopping IPython controller on %s", master.alias) - master.ssh.execute("pkill -f IPython.parallel.controller", + master.ssh.execute("pkill -f ipcontrollerapp", ignore_exit_status=True) master.ssh.execute("pkill -f 'ipython notebook'", ignore_exit_status=True) @@ -358,7 +344,7 @@ def run(self, nodes, master, user, user_shell, volumes): def _stop_engines(self, node, user): node.ssh.switch_user(user) - node.ssh.execute("pkill -f IPython.parallel.engine", ignore_exit_status=True) + node.ssh.execute("pkill -f ipengineapp", ignore_exit_status=True) node.ssh.switch_user('root') def on_add_node(self, node, nodes, master, user, user_shell, volumes): @@ -368,7 +354,7 @@ def on_remove_node(self, node, nodes, master, user, user_shell, volumes): raise NotImplementedError("on_remove_node method not implemented") -class IPClusterRestartEngines(IPCluster): +class IPClusterRestartEngines(DefaultClusterSetup): """Plugin to kill and restart all engines of an IPython cluster This plugin can be useful to hard-reset the all the engines, for instance @@ -378,19 +364,14 @@ class IPClusterRestartEngines(IPCluster): This plugin is meant to be run manually with: starcluster runplugin plugin_conf_name cluster_name + """ def run(self, nodes, master, user, user_shell, volumes): n_total = 0 for node in nodes: - if node.is_master() and self.master_engines: - n_engines = int(self.master_engines) - elif self.node_engines: - n_engines = int(self.node_engines) - elif node.is_master(): - # and n_engines > 2: # XXX I'm not sure I understand this logic yet. - n_engines = node.num_processors - 1 - else: - n_engines = node.num_processors + n_engines = node.num_processors + if node.is_master() and n_engines > 2: + n_engines -= 1 self.pool.simple_job( _start_engines, (node, user, n_engines, True), jobid=node.alias)