diff --git a/calrissian/dask.py b/calrissian/dask.py
new file mode 100644
index 0000000..6d4d0ff
--- /dev/null
+++ b/calrissian/dask.py
@@ -0,0 +1,333 @@
+import os
+import logging
+from typing import Optional
+import yaml
+
+from cwltool.utils import CWLObjectType
+
+from calrissian.job import (
+    CalrissianCommandLineJob,
+    KubernetesPodBuilder,
+    quoted_arg_list,
+    DEFAULT_INIT_IMAGE,
+    INIT_IMAGE_ENV_VARIABLE,
+)
+
+log = logging.getLogger("calrissian.dask")
+log_main = logging.getLogger("calrissian.main")
+
+def dask_req_validate(requirement: Optional[CWLObjectType]) -> bool:
+    if requirement is None:
+        return False
+
+    required_keys = ["workerCores",
+                     "workerCoresLimit",
+                     "workerMemory",
+                     "clustermaxCore",
+                     "clusterMaxMemory",
+                     "class"]
+
+    return set(requirement.keys()) == set(required_keys)
+
+
+class KubernetesDaskPodBuilder(KubernetesPodBuilder):
+
+    def __init__(self,
+                 name,
+                 container_image,
+                 environment,
+                 volume_mounts,
+                 volumes,
+                 command_line,
+                 stdout,
+                 stderr,
+                 stdin,
+                 resources,
+                 labels,
+                 nodeselectors,
+                 security_context,
+                 serviceaccount,
+                 gateway_url,
+                 requirements=None,
+                 hints=None):
+        self.name = name
+        self.container_image = container_image
+        self.environment = environment
+        self.volume_mounts = volume_mounts
+        self.volumes = volumes
+        self.command_line = command_line
+        self.stdout = stdout
+        self.stderr = stderr
+        self.stdin = stdin
+        self.resources = resources
+        self.labels = labels
+        self.nodeselectors = nodeselectors
+        self.security_context = security_context
+        self.serviceaccount = serviceaccount
+        self.gateway_url = gateway_url
+        self.requirements = {} if requirements is None else requirements
+        self.hints = [] if hints is None else hints
+
+    def container_command(self):
+        super_command = super().container_command()
+        super_args = super().container_args()
+
+        shell_script = f"""\
+set -e;
+trap 'touch /shared/completed' EXIT;
+export DASK_CLUSTER=$(cat /shared/dask_cluster_name.txt);
+python -m app {super_args[0]}
+"""
+        super_command.append(shell_script)
+        return super_command
+
+
+    def container_environment(self):
+        environment = []
+        for name, value in sorted(self.environment.items()):
+            environment.append({'name': name, 'value': value})
+
+        environment.append({'name': 'PYTHONPATH', 'value': '/app'})
+
+        return environment
+
+
+    def init_containers(self):
+        containers = []
+        # get dirname for any actual paths
+        dirs_to_create = [os.path.dirname(p) for p in [self.stdout, self.stderr] if p]
+        # Remove empty strings
+        dirs_to_create = [d for d in dirs_to_create if d]
+        # Quote if necessary
+        dirs_to_create = quoted_arg_list(dirs_to_create)
+        command_list = ['mkdir -p {};'.format(d) for d in dirs_to_create]
+        if command_list:
+            containers.append({
+                'name': self.init_container_name(),
+                'image': os.environ.get(INIT_IMAGE_ENV_VARIABLE, DEFAULT_INIT_IMAGE),
+                'command': ['/bin/sh', '-c', ' '.join(command_list)],
+                'workingDir': self.container_workingdir(),
+                'volumeMounts': self.volume_mounts,
+            })
+
+        dask_requirement = next((elem for elem in self.requirements if elem['class'] == 'https://calrissian-cwl.github.io/schema#DaskGatewayRequirement'), None)
+
+        init_dask_command = [
+            'python',
+            '/app/init-dask.py',
+            '--target',
+            '/shared/dask_cluster_name.txt',
+            '--gateway-url',
+            self.gateway_url,
+            '--image',
+            str(self.container_image),
+            '--worker-cores',
+            str(dask_requirement["workerCores"]),
+            '--worker-memory',
+            str(dask_requirement["workerMemory"]),
+            '--worker-cores-limit',
+            str(dask_requirement["workerCoresLimit"]),
+            '--max-cores',
+            str(dask_requirement["clustermaxCore"]),
+            '--max-ram',
+            str(dask_requirement["clusterMaxMemory"])
+        ]
+
+        init_dask_cluster = {
+            # give this init container its own name so it cannot collide with
+            # the mkdir init container appended above
+            'name': '{}-dask'.format(self.init_container_name()),
+            'image': str(self.container_image),
+            'env': [{'name': 'PYTHONPATH', 'value': '/app'}],
+            'command': init_dask_command,
+            'workingDir': self.container_workingdir(),
+            'volumeMounts': self.volume_mounts
+        }
+
+        containers.append(init_dask_cluster)
+
+        return containers
+
+
+    def build(self):
+
+        sidecar_command = [
+            'python',
+            '/app/dispose-dask.py',
+            '--source',
+            '/shared/dask_cluster_name.txt',
+            '--gateway-url',
+            self.gateway_url,
+            '--signal',
+            '/shared/completed'
+        ]
+
+        spec = {
+            'metadata': {
+                'name': self.pod_name(),
+                'labels': self.pod_labels(),
+            },
+            'apiVersion': 'v1',
+            'kind': 'Pod',
+            'spec': {
+                'initContainers': self.init_containers(),
+                'containers': [
+                    {
+                        'name': 'main-container',
+                        'image': str(self.container_image),
+                        'command': self.container_command(),
+                        'env': self.container_environment(),
+                        'resources': self.container_resources(),
+                        'volumeMounts': self.volume_mounts,
+                        'workingDir': self.container_workingdir(),
+                    },
+                    {
+                        'name': 'sidecar-container',
+                        'image': str(self.container_image),
+                        'command': sidecar_command,
+                        'env': self.container_environment(),
+                        'resources': self.container_resources(),
+                        'volumeMounts': self.volume_mounts,
+                        'workingDir': self.container_workingdir(),
+                    }
+                ],
+                'restartPolicy': 'Never',
+                'volumes': self.volumes,
+                'securityContext': self.security_context,
+                'nodeSelector': self.pod_nodeselectors()
+            }
+        }
+
+        if self.serviceaccount:
+            spec['spec']['serviceAccountName'] = self.serviceaccount
+
+        return spec
+
+
+class CalrissianCommandLineDaskJob(CalrissianCommandLineJob):
+
+    container_shared_dir = '/shared'
+    daskGateway_config_dir = '/etc/dask'
+    daskGateway_cm_name = 'dask-gateway-cm'
+    daskGateway_cm = 'dask-gateway-cm'
+
+    def __init__(self, *args, **kwargs):
+        super(CalrissianCommandLineDaskJob, self).__init__(*args, **kwargs)
+
+    def wait_for_kubernetes_pod(self):
+        return self.client.wait_for_dask_completion()
+
+    def get_dask_gateway_url(self, runtimeContext):
+        return runtimeContext.gateway_url
+
+    def _add_configmap_volume_and_binding(self, name, cm_name, target):
+        self.volume_builder.add_configmap_volume(name, cm_name)
+        self.volume_builder.add_configmap_volume_binding(cm_name, target)
+
+    def create_kubernetes_runtime(self, runtimeContext):
+        # In cwltool, the runtime list starts as something like ['docker','run'] and these various builder methods
+        # append to that list with docker (or singularity) options like volume mount paths
+        # As we build up kubernetes, these aren't really used this way so we leave it empty
+        runtime = []
+
+        # Append volume for outdir
+        self._add_volume_binding(os.path.realpath(self.outdir), self.builder.outdir, writable=True)
+        # Use a kubernetes emptyDir: {} volume for /tmp
+        # Note that below add_volumes() may result in other temporary files being mounted
+        # from the calrissian host's tmpdir prefix into an absolute container path, but this will
+        # not conflict with '/tmp' as an emptyDir
+        self._add_emptydir_volume_and_binding('tmpdir', self.container_tmpdir)
+
+        # Call the ContainerCommandLineJob add_volumes method
+        self.add_volumes(self.pathmapper,
+                         runtime,
+                         tmpdir_prefix=runtimeContext.tmpdir_prefix,
+                         secret_store=runtimeContext.secret_store,
+                         any_path_okay=True)
+
+        if self.generatemapper is not None:
+            # Seems to be true if docker is a hard requirement
+            # This evaluates to true if docker_is_required is true
+            # Used only for generatemapper add volumes
+            any_path_okay = self.builder.get_requirement("DockerRequirement")[1] or False
+            self.add_volumes(
+                self.generatemapper,
+                runtime,
+                tmpdir_prefix=runtimeContext.tmpdir_prefix,
+                secret_store=runtimeContext.secret_store,
+                any_path_okay=any_path_okay)
+
+
+        # self.client.create_dask_gateway_cofig_map(gateway_url=self.get_dask_gateway_url(runtimeContext))
+
+        # emptyDir volume at /shared for sharing the Dask cluster name between containers
+        self._add_emptydir_volume_and_binding('shared-data', self.container_shared_dir)
+
+
+        # Need this ConfigMap to simplify configuration by providing defaults,
+        # as explained here: https://gateway.dask.org/configuration-user.html
+        self._add_configmap_volume_and_binding(
+            name=self.daskGateway_cm,
+            cm_name=self.daskGateway_cm_name,
+            target=self.daskGateway_config_dir)
+
+
+        k8s_builder = KubernetesDaskPodBuilder(
+            self.name,
+            self._get_container_image(),
+            self.environment,
+            self.volume_builder.volume_mounts,
+            self.volume_builder.volumes,
+            self.quoted_command_line(),
+            self.stdout,
+            self.stderr,
+            self.stdin,
+            self.builder.resources,
+            self.get_pod_labels(runtimeContext),
+            self.get_pod_nodeselectors(runtimeContext),
+            self.get_security_context(runtimeContext),
+            self.get_pod_serviceaccount(runtimeContext),
+            self.get_dask_gateway_url(runtimeContext),
+            requirements=self.builder.requirements,
+            hints=self.builder.hints,
+        )
+
+        built = k8s_builder.build()
+        log.debug('{}\n{}{}\n'.format('-' * 80, yaml.dump(built), '-' * 80))
+        # Report an error if anything was added to the runtime list
+        if runtime:
+            log.error('Runtime list is not empty. 
k8s does not use that, so you should see who put something there:\n{}'.format(' '.join(runtime))) + return built + + def run(self, runtimeContext, tmpdir_lock=None): + def get_pod_command(pod): + if 'args' in pod['spec']['containers'][0].keys(): + return pod['spec']['containers'][0]['args'] + + def get_pod_name(pod): + return pod['spec']['containers'][0]['name'] + + self.check_requirements(runtimeContext) + + if tmpdir_lock: + with tmpdir_lock: + self.make_tmpdir() + else: + self.make_tmpdir() + self.populate_env_vars(runtimeContext) + + # specific setup for Kubernetes + self.setup_kubernetes(runtimeContext) + + self._setup(runtimeContext) + + pod = self.create_kubernetes_runtime(runtimeContext) # analogous to create_runtime() + self.execute_kubernetes_pod(pod) # analogous to _execute() + completion_result = self.wait_for_kubernetes_pod() + if completion_result.exit_code != 0: + log_main.error(f"ERROR the command below failed in pod {get_pod_name(pod)}:") + log_main.error("\t" + " ".join(get_pod_command(pod))) + self.finish(completion_result, runtimeContext) \ No newline at end of file diff --git a/calrissian/job.py b/calrissian/job.py index b3d3a41..631834f 100644 --- a/calrissian/job.py +++ b/calrissian/job.py @@ -1,5 +1,6 @@ from typing import Dict from cwltool.job import ContainerCommandLineJob, needs_shell_quoting_re + # override cwltool.cuda.cuda_check def _cuda_check(cuda_req, requestCount): return 1 @@ -12,7 +13,6 @@ def _cuda_check(cuda_req, requestCount): from calrissian.k8s import KubernetesClient, CompletionResult from calrissian.report import Reporter, TimedResourceReport from cwltool.builder import Builder -import sys import logging import os import yaml @@ -401,187 +401,6 @@ def build(self): return spec -class KubernetesDaskPodBuilder(KubernetesPodBuilder): - - def __init__(self, - name, - container_image, - environment, - volume_mounts, - volumes, - command_line, - stdout, - stderr, - stdin, - resources, - labels, - nodeselectors, - security_context, - serviceaccount, - gateway_url, - requirements=None, - hints=None): - self.name = name - self.container_image = container_image - self.environment = environment - self.volume_mounts = volume_mounts - self.volumes = volumes - self.command_line = command_line - self.stdout = stdout - self.stderr = stderr - self.stdin = stdin - self.resources = resources - self.labels = labels - self.nodeselectors = nodeselectors - self.security_context = security_context - self.serviceaccount = serviceaccount - self.gateway_url = gateway_url - self.requirements = {} if requirements is None else requirements - self.hints = [] if hints is None else hints - - -# INFO:calrissian.job:ARGS -# ['--pre-event https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2B_10TFK_20210713_0_L2A --post-event https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2A_10TFK_20210718_0_L2A'] -# INFO:calrissian.job:['--pre-event https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2B_10TFK_20210713_0_L2A --post-event https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2A_10TFK_20210718_0_L2A'] - -# INFO:calrissian.job:COMMAND -# ['/bin/sh', '-c', ['set -e \\', 'trap touch /shared/completed EXIT \\', 'export DASK_CLUSTER=`cat /shared/dask_cluster_name.txt` \\', 'python -m app'], ['--pre-event https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2B_10TFK_20210713_0_L2A --post-event 
https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2A_10TFK_20210718_0_L2A']] -# INFO:calrissian.job:['/bin/sh', '-c', ['set -e \\', 'trap touch /shared/completed EXIT \\', 'export DASK_CLUSTER=`cat /shared/dask_cluster_name.txt` \\', 'python -m app'], ['--pre-event https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2B_10TFK_20210713_0_L2A --post-event https://earth-search.aws.element84.com/v0/collections/sentinel-s2-l2a-cogs/items/S2A_10TFK_20210718_0_L2A']] - - def container_command(self): - super_command = super().container_command() - super_args = super().container_args() - - shell_script = f"""\ -set -e; -trap 'touch /shared/completed' EXIT; -export DASK_CLUSTER=$(cat /shared/dask_cluster_name.txt); -python -m app {super_args[0]} -""" - super_command.append(shell_script) - return super_command - - - - def container_environment(self): - environment = [] - for name, value in sorted(self.environment.items()): - environment.append({'name': name, 'value': value}) - - environment.append({'name': 'PYTHONPATH', 'value': '/app'}) - - return environment - - def init_containers(self): - containers = [] - # get dirname for any actual paths - dirs_to_create = [os.path.dirname(p) for p in [self.stdout, self.stderr] if p] - # Remove empty strings - dirs_to_create = [d for d in dirs_to_create if d] - # Quote if necessary - dirs_to_create = quoted_arg_list(dirs_to_create) - command_list = ['mkdir -p {};'.format(d) for d in dirs_to_create] - if command_list: - containers.append({ - 'name': self.init_container_name(), - 'image': os.environ.get(INIT_IMAGE_ENV_VARIABLE, DEFAULT_INIT_IMAGE), - 'command': ['/bin/sh', '-c', ' '.join(command_list)], - 'workingDir': self.container_workingdir(), - 'volumeMounts': self.volume_mounts, - }) - - dask_requirement = next((elem for elem in self.requirements if elem['class'] == 'https://calrissian-cwl.github.io/schema#DaskGatewayRequirement'), None) - - init_dask_command = [ - 'python', - '/app/init-dask.py', - '--target', - '/shared/dask_cluster_name.txt', - '--gateway-url', - self.gateway_url, - '--image', - str(self.container_image), - '--worker-cores', - str(dask_requirement["workerCores"]), - '--worker-memory', - str(dask_requirement["workerMemory"]), - '--worker-cores-limit', - str(dask_requirement["workerCoresLimit"]), - '--max-cores', - str(dask_requirement["clustermaxCore"]), - '--max-ram', - str(dask_requirement["clusterMaxMemory"]) - ] - - init_dask_cluster = { - 'name': self.init_container_name(), - 'image': str(self.container_image), - 'env': [{'name': 'PYTHONPATH', 'value': '/app'}], - 'command': init_dask_command, - 'workingDir': self.container_workingdir(), - 'volumeMounts': self.volume_mounts - } - - containers.append(init_dask_cluster) - - return containers - - - def build(self): - - sidecar_command = [ - 'python', - '/app/dispose-dask.py', - '--source', - '/shared/dask_cluster_name.txt', - '--gateway-url', - self.gateway_url, - '--signal', - '/shared/completed' - ] - - spec = { - 'metadata': { - 'name': self.pod_name(), - 'labels': self.pod_labels(), - }, - 'apiVersion': 'v1', - 'kind':'Pod', - 'spec': { - 'initContainers': self.init_containers(), - 'containers': [ - { - 'name': 'main-container', - 'image': str(self.container_image), - 'command': self.container_command(), - 'env': self.container_environment(), - 'resources': self.container_resources(), - 'volumeMounts': self.volume_mounts, - 'workingDir': self.container_workingdir(), - }, - { - 'name': 'sidecar-container', - 'image': 
str(self.container_image), - 'command': sidecar_command, - 'env': self.container_environment(), - 'resources': self.container_resources(), - 'volumeMounts': self.volume_mounts, - 'workingDir': self.container_workingdir(), - } - ], - 'restartPolicy': 'Never', - 'volumes': self.volumes, - 'securityContext': self.security_context, - 'nodeSelector': self.pod_nodeselectors() - } - } - - if ( self.serviceaccount ): - spec['spec']['serviceAccountName'] = self.serviceaccount - - return spec - - # This now subclasses ContainerCommandLineJob, but only uses two of its methods: # create_file_and_add_volume and add_volumes class CalrissianCommandLineJob(ContainerCommandLineJob): @@ -692,6 +511,8 @@ def check_requirements(self, runtimeContext): if not field in self.supported_features[feature]: raise UnsupportedRequirement('Error: feature {}.{} is not supported'.format(feature, field)) + + def _get_container_image(self): docker_requirement, _ = self.get_requirement('DockerRequirement') if docker_requirement: @@ -737,9 +558,6 @@ def get_pod_env_vars(self, runtimeContext): else: return {} - def get_dask_gateway_url(self, runtimeContext): - return runtimeContext.gateway_url - def create_kubernetes_runtime(self, runtimeContext): # In cwltool, the runtime list starts as something like ['docker','run'] and these various builder methods # append to that list with docker (or singularity) options like volume mount paths @@ -772,60 +590,25 @@ def create_kubernetes_runtime(self, runtimeContext): tmpdir_prefix=runtimeContext.tmpdir_prefix, secret_store=runtimeContext.secret_store, any_path_okay=any_path_okay) - - if self.get_requirement('https://calrissian-cwl.github.io/schema#DaskGatewayRequirement'): - dask_default_conf = '/etc/dask' - shared_data = '/shared' - - # self.client.create_dask_gateway_cofig_map(gateway_url=self.get_dask_gateway_url(runtimeContext)) - - self._add_configmap_volume_and_binding( - name='dask-gateway-cm', - cm_name='dask-gateway-cm', - target=dask_default_conf) - - self._add_emptydir_volume_and_binding('shared-data', shared_data) - - k8s_builder = KubernetesDaskPodBuilder( - self.name, - self._get_container_image(), - self.environment, - self.volume_builder.volume_mounts, - self.volume_builder.volumes, - self.quoted_command_line(), - self.stdout, - self.stderr, - self.stdin, - self.builder.resources, - self.get_pod_labels(runtimeContext), - self.get_pod_nodeselectors(runtimeContext), - self.get_security_context(runtimeContext), - self.get_pod_serviceaccount(runtimeContext), - self.get_dask_gateway_url(runtimeContext), - requirements=self.builder.requirements, - hints=self.builder.hints, - ) - - else: - k8s_builder = KubernetesPodBuilder( - self.name, - self._get_container_image(), - self.environment, - self.volume_builder.volume_mounts, - self.volume_builder.volumes, - self.quoted_command_line(), - self.stdout, - self.stderr, - self.stdin, - self.builder.resources, - self.get_pod_labels(runtimeContext), - self.get_pod_nodeselectors(runtimeContext), - self.get_security_context(runtimeContext), - self.get_pod_serviceaccount(runtimeContext), - self.builder.requirements, - self.builder.hints - ) + k8s_builder = KubernetesPodBuilder( + self.name, + self._get_container_image(), + self.environment, + self.volume_builder.volume_mounts, + self.volume_builder.volumes, + self.quoted_command_line(), + self.stdout, + self.stderr, + self.stdin, + self.builder.resources, + self.get_pod_labels(runtimeContext), + self.get_pod_nodeselectors(runtimeContext), + self.get_security_context(runtimeContext), + 
self.get_pod_serviceaccount(runtimeContext), + self.builder.requirements, + self.builder.hints + ) built = k8s_builder.build() log.debug('{}\n{}{}\n'.format('-' * 80, yaml.dump(built), '-' * 80)) # Report an error if anything was added to the runtime list @@ -843,10 +626,6 @@ def _add_emptydir_volume_and_binding(self, name, target): def _add_volume_binding(self, source, target, writable=False): self.volume_builder.add_volume_binding(source, target, writable) - def _add_configmap_volume_and_binding(self, name, cm_name, target): - self.volume_builder.add_configmap_volume(name, cm_name) - self.volume_builder.add_configmap_volume_binding(cm_name, target) - # Below are concrete implementations of methods called by add_volumes # They are based on https://github.com/common-workflow-language/cwltool/blob/1.0.20181201184214/cwltool/docker.py # But the key difference is that docker is invoked via command-line, so the ones in docker.py append to @@ -933,8 +712,7 @@ def _required_env(self) -> Dict[str, str]: def run(self, runtimeContext, tmpdir_lock=None): def get_pod_command(pod): - if 'args' in pod['spec']['containers'][0].keys(): - return pod['spec']['containers'][0]['args'] + return pod['spec']['containers'][0]['args'] def get_pod_name(pod): return pod['spec']['containers'][0]['name'] diff --git a/calrissian/k8s.py b/calrissian/k8s.py index 83b1cdd..22d9df3 100644 --- a/calrissian/k8s.py +++ b/calrissian/k8s.py @@ -177,7 +177,7 @@ def follow_container_logs(self, status): log.info('[{}] follow_logs end'.format(pod_name)) @retry_exponential_if_exception_type((ApiException, HTTPError, IncompleteStatusException), log) - def wait_for_completion(self) -> CompletionResult: + def wait_for_dask_completion(self) -> CompletionResult: w = watch.Watch() for event in w.stream(self.core_api_instance.list_namespaced_pod, self.namespace, field_selector=self._get_pod_field_selector()): pod = event['object'] @@ -221,40 +221,40 @@ def wait_for_completion(self) -> CompletionResult: return self.completion_result - # @retry_exponential_if_exception_type((ApiException, HTTPError, IncompleteStatusException), log) - # def wait_for_completion(self) -> CompletionResult: - # w = watch.Watch() - # for event in w.stream(self.core_api_instance.list_namespaced_pod, self.namespace, field_selector=self._get_pod_field_selector()): - # pod = event['object'] - # status = self.get_first_or_none(pod.status.container_statuses) - # log.info('pod name {} with id {} has status {}'.format(pod.metadata.name, pod.metadata.uid, status)) - # if status is None: - # continue - # if self.state_is_waiting(status.state): - # continue - # elif self.state_is_running(status.state): - # # Can only get logs once container is running - # self.follow_logs() # This will not return until pod completes - # elif self.state_is_terminated(status.state): - # log.info('Handling terminated pod name {} with id {}'.format(pod.metadata.name, pod.metadata.uid)) - # container = self.get_first_or_none(pod.spec.containers) - # self._handle_completion(status.state, container) - # if self.should_delete_pod(): - # with PodMonitor() as monitor: - # self.delete_pod_name(pod.metadata.name) - # monitor.remove(pod) - # self._clear_pod() - # # stop watching for events, our pod is done. 
Causes wait loop to exit - # w.stop() - # else: - # raise CalrissianJobException('Unexpected pod container status', status) + @retry_exponential_if_exception_type((ApiException, HTTPError, IncompleteStatusException), log) + def wait_for_completion(self) -> CompletionResult: + w = watch.Watch() + for event in w.stream(self.core_api_instance.list_namespaced_pod, self.namespace, field_selector=self._get_pod_field_selector()): + pod = event['object'] + status = self.get_first_or_none(pod.status.container_statuses) + log.info('pod name {} with id {} has status {}'.format(pod.metadata.name, pod.metadata.uid, status)) + if status is None: + continue + if self.state_is_waiting(status.state): + continue + elif self.state_is_running(status.state): + # Can only get logs once container is running + self.follow_logs() # This will not return until pod completes + elif self.state_is_terminated(status.state): + log.info('Handling terminated pod name {} with id {}'.format(pod.metadata.name, pod.metadata.uid)) + container = self.get_first_or_none(pod.spec.containers) + self._handle_completion(status.state, container) + if self.should_delete_pod(): + with PodMonitor() as monitor: + self.delete_pod_name(pod.metadata.name) + monitor.remove(pod) + self._clear_pod() + # stop watching for events, our pod is done. Causes wait loop to exit + w.stop() + else: + raise CalrissianJobException('Unexpected pod container status', status) - # # When the pod is done we should have a completion result - # # Otherwise it will lead to further exceptions - # if self.completion_result is None: - # raise IncompleteStatusException + # When the pod is done we should have a completion result + # Otherwise it will lead to further exceptions + if self.completion_result is None: + raise IncompleteStatusException - # return self.completion_result + return self.completion_result def _set_pod(self, pod): log.info('k8s pod \'{}\' started'.format(pod.metadata.name)) @@ -282,12 +282,6 @@ def state_is_terminated(state): @staticmethod def get_list_or_none(container_list: List[Union[V1ContainerStatus, V1Container]]) -> Union[V1ContainerStatus, V1Container]: - """ - Check the list. Should be 0 or 1 items. If 0, there's no container yet. If 1, there's a - container. If > 1, there's more than 1 container and that's unexpected behavior - :param containers_or_container_statuses: list of V1ContainerStatus or V1Container - :return: first item if len of list is 1, None if 0, and raises CalrissianJobException if > 1 - """ if not container_list: # None or empty list return None else: @@ -295,12 +289,6 @@ def get_list_or_none(container_list: List[Union[V1ContainerStatus, V1Container]] @staticmethod def get_last_or_none(container_list: List[Union[V1ContainerStatus, V1Container]]) -> Union[V1ContainerStatus, V1Container]: - """ - Check the list. Should be 0 or 1 items. If 0, there's no container yet. If 1, there's a - container. 
If > 1, there's more than 1 container and that's unexpected behavior - :param containers_or_container_statuses: list of V1ContainerStatus or V1Container - :return: first item if len of list is 1, None if 0, and raises CalrissianJobException if > 1 - """ if not container_list: # None or empty list return None else: diff --git a/calrissian/main.py b/calrissian/main.py index e6771b7..1e9bd85 100644 --- a/calrissian/main.py +++ b/calrissian/main.py @@ -53,7 +53,7 @@ def add_arguments(parser): parser.add_argument('--stderr', type=Text, nargs='?', help='Output file name to tee standard error to (includes tool logs)') parser.add_argument('--tool-logs-basepath', type=Text, nargs='?', help='Base path for saving the tool logs') parser.add_argument('--conf', help='Defines the default values for the CLI arguments', action='append') - parser.add_argument('--gateway-url', type=Text, nargs='?', help='Defines the Dask Gateway URL') + parser.add_argument('--gateway-url', type=Text, nargs='?', help='Defines the Dask Gateway URL', required=False) def print_version(): diff --git a/calrissian/tool.py b/calrissian/tool.py index e20704d..c9721c2 100644 --- a/calrissian/tool.py +++ b/calrissian/tool.py @@ -1,5 +1,6 @@ from cwltool.command_line_tool import CommandLineTool from cwltool.workflow import default_make_tool +from calrissian.dask import CalrissianCommandLineDaskJob, dask_req_validate from calrissian.job import CalrissianCommandLineJob import logging @@ -36,9 +37,7 @@ def make_job_runner(self, runtimeContext): if not runtimeContext.use_container: raise CalrissianCommandLineToolException('Unable to create a CalrissianCommandLineTool - use_container is disabled') docker_requirement, _ = self.get_requirement('DockerRequirement') - log.info(f"Hints: {self.hints}") - dask_requirement, _ = self.get_requirement('https://calrissian-cwl.github.io/schema#DaskGatewayRequirement') - log.info(f"DaskGatewayRequirement: {dask_requirement}") + if not docker_requirement: # no docker requirement specified, inject one default_container = runtimeContext.find_default_container(self) @@ -50,6 +49,11 @@ def make_job_runner(self, runtimeContext): 'dockerPull': default_container }) + dask_requirement, _ = self.get_requirement('https://calrissian-cwl.github.io/schema#DaskGatewayRequirement') + + if dask_req_validate(dask_requirement): + return CalrissianCommandLineDaskJob + return CalrissianCommandLineJob diff --git a/tests/test_main.py b/tests/test_main.py index e925f8d..209d7f6 100644 --- a/tests/test_main.py +++ b/tests/test_main.py @@ -63,7 +63,7 @@ def test_main_calls_cwlmain_returns_exit_code(self, mock_activate_logging, mock_ def test_add_arguments(self): mock_parser = Mock() add_arguments(mock_parser) - self.assertEqual(mock_parser.add_argument.call_count, 12) + self.assertEqual(mock_parser.add_argument.call_count, 13) @patch('calrissian.main.sys') def test_parse_arguments_exits_without_ram_or_cores(self, mock_sys):
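Note on usage: a tool is dispatched to the Dask-enabled job class only when its DaskGatewayRequirement carries exactly the keys checked by dask_req_validate(). The sketch below illustrates that dispatch; the requirement values are illustrative placeholders rather than values taken from this patch, and it assumes calrissian is importable with these changes applied.

# Minimal sketch of the dispatch added in tool.py: dask_req_validate() accepts a
# requirement only if it has exactly these six keys, and make_job_runner() then
# returns CalrissianCommandLineDaskJob instead of CalrissianCommandLineJob.
from calrissian.dask import CalrissianCommandLineDaskJob, dask_req_validate
from calrissian.job import CalrissianCommandLineJob

dask_requirement = {
    "class": "https://calrissian-cwl.github.io/schema#DaskGatewayRequirement",
    "workerCores": 2,            # illustrative values only
    "workerCoresLimit": 2,
    "workerMemory": "4G",
    "clustermaxCore": 8,
    "clusterMaxMemory": "16G",
}

job_class = (
    CalrissianCommandLineDaskJob
    if dask_req_validate(dask_requirement)
    else CalrissianCommandLineJob
)
assert job_class is CalrissianCommandLineDaskJob

When this class is selected, init-dask.py runs in an init container to create the Dask Gateway cluster and writes its name to /shared/dask_cluster_name.txt, the main container exports it as DASK_CLUSTER before running the tool, and dispose-dask.py in the sidecar tears the cluster down once /shared/completed appears.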