Skip to content

Commit

Permalink
DaskGatewayRequirement fully implemented - Begin code cleaning and re…
Browse files Browse the repository at this point in the history
…factoring
  • Loading branch information
lmizzoni committed Jan 24, 2025
1 parent 4911e8f commit befe99a
Show file tree
Hide file tree
Showing 2 changed files with 114 additions and 25 deletions.
21 changes: 10 additions & 11 deletions calrissian/job.py
Original file line number Diff line number Diff line change
Expand Up @@ -512,8 +512,6 @@ def init_containers(self):
'--max-ram',
str(dask_requirement["clusterMaxMemory"])
]

log.info(init_dask_command)

init_dask_cluster = {
'name': self.init_container_name(),
Expand Down Expand Up @@ -560,15 +558,16 @@ def build(self):
'resources': self.container_resources(),
'volumeMounts': self.volume_mounts,
'workingDir': self.container_workingdir(),
}
# {
# 'name': 'sidecar-container',
# 'image': str(self.container_image),
# 'command': sidecar_command,
# 'env': self.container_environment(),
# 'volumeMounts': self.volume_mounts,
# 'workingDir': self.container_workingdir(),
# }
},
{
'name': 'sidecar-container',
'image': str(self.container_image),
'command': sidecar_command,
'env': self.container_environment(),
'resources': self.container_resources(),
'volumeMounts': self.volume_mounts,
'workingDir': self.container_workingdir(),
}
],
'restartPolicy': 'Never',
'volumes': self.volumes,
Expand Down
118 changes: 104 additions & 14 deletions calrissian/k8s.py
Original file line number Diff line number Diff line change
Expand Up @@ -156,25 +156,54 @@ def follow_logs(self):

log.info('[{}] follow_logs end'.format(pod_name))

@retry_exponential_if_exception_type((ApiException, HTTPError,), log)
def follow_container_logs(self, status):
    """
    Stream the log of one container of the current pod into the tool log.

    The log is requested with ``follow=True`` for the container named
    ``status.name``. Every received line is decoded as utf-8, written to the
    debug log and appended to ``self.tool_log`` via ``self.format_log_entry``.

    :param status: container status object; ``status.name`` selects the
                   container and ``status.state.running`` is tested before
                   each line is recorded
    """
    pod_name = self.pod.metadata.name

    log.info('[{}] follow_logs start'.format(pod_name))
    # Keep a handle on the raw response so it can be cleaned up below.
    # .stream() is only available if _preload_content=False
    response = self.core_api_instance.read_namespaced_pod_log(pod_name, self.namespace, follow=True,
                                                              _preload_content=False, container=status.name)
    try:
        # .stream() returns a generator, each iteration yields bytes.
        for line in response.stream():
            # NOTE(review): ``status`` is the snapshot handed in by the caller and
            # is not refreshed inside this method, so this guard only reflects the
            # state at call time - confirm whether a fresh status lookup is intended.
            if not status.state.running:
                break
            # kubernetes-client decodes them as utf-8 when _preload_content is True
            # https://github.com/kubernetes-client/python/blob/fcda6fe96beb21cd05522c17f7f08c5a7c0e3dc3/kubernetes/client/rest.py#L215-L216
            # So we do the same here
            line = line.decode('utf-8', errors="ignore").rstrip()
            log.debug('[{}] {}'.format(pod_name, line))
            self.tool_log.append(self.format_log_entry(pod_name, line))
    finally:
        # Leaving the loop early (break or exception) abandons a half-read
        # streaming response; close it and give the connection back to the pool
        # instead of waiting for garbage collection.
        response.close()
        response.release_conn()

    log.info('[{}] follow_logs end'.format(pod_name))

@retry_exponential_if_exception_type((ApiException, HTTPError, IncompleteStatusException), log)
def wait_for_completion(self) -> CompletionResult:
w = watch.Watch()
for event in w.stream(self.core_api_instance.list_namespaced_pod, self.namespace, field_selector=self._get_pod_field_selector()):
pod = event['object']
status = self.get_first_or_none(pod.status.container_statuses)
log.info('pod name {} with id {} has status {}'.format(pod.metadata.name, pod.metadata.uid, status))
if status is None:
continue
if self.state_is_waiting(status.state):
continue
elif self.state_is_running(status.state):
# Can only get logs once container is running
self.follow_logs() # This will not return until pod completes
elif self.state_is_terminated(status.state):
# status = self.get_first_or_none(pod.status.container_statuses)
last_status = self.get_last_or_none(pod.status.container_statuses)
if last_status == None or not self.state_is_terminated(last_status.state):
statuses = self.get_list_or_none(pod.status.container_statuses)
if statuses == None:
continue
for status in statuses:
log.info('pod name {} with id {} has status {}'.format(pod.metadata.name, pod.metadata.uid, status))
if status is None:
continue
if self.state_is_waiting(status.state):
continue
elif self.state_is_running(status.state):
# Can only get logs once container is running
self.follow_container_logs(status) # This will not return until container completes
elif self.state_is_terminated(status.state):
continue
else:
raise CalrissianJobException('Unexpected pod container status', status)
elif self.state_is_terminated(last_status.state):
log.info('Handling terminated pod name {} with id {}'.format(pod.metadata.name, pod.metadata.uid))
container = self.get_first_or_none(pod.spec.containers)
self._handle_completion(status.state, container)
container = self.get_last_or_none(pod.spec.containers)
self._handle_completion(last_status.state, container)
if self.should_delete_pod():
with PodMonitor() as monitor:
self.delete_pod_name(pod.metadata.name)
Expand All @@ -183,14 +212,49 @@ def wait_for_completion(self) -> CompletionResult:
# stop watching for events, our pod is done. Causes wait loop to exit
w.stop()
else:
raise CalrissianJobException('Unexpected pod container status', status)
raise CalrissianJobException('Unexpected pod container status', last_status)

# When the pod is done we should have a completion result
# Otherwise it will lead to further exceptions
if self.completion_result is None:
raise IncompleteStatusException

return self.completion_result

# @retry_exponential_if_exception_type((ApiException, HTTPError, IncompleteStatusException), log)
# def wait_for_completion(self) -> CompletionResult:
# w = watch.Watch()
# for event in w.stream(self.core_api_instance.list_namespaced_pod, self.namespace, field_selector=self._get_pod_field_selector()):
# pod = event['object']
# status = self.get_first_or_none(pod.status.container_statuses)
# log.info('pod name {} with id {} has status {}'.format(pod.metadata.name, pod.metadata.uid, status))
# if status is None:
# continue
# if self.state_is_waiting(status.state):
# continue
# elif self.state_is_running(status.state):
# # Can only get logs once container is running
# self.follow_logs() # This will not return until pod completes
# elif self.state_is_terminated(status.state):
# log.info('Handling terminated pod name {} with id {}'.format(pod.metadata.name, pod.metadata.uid))
# container = self.get_first_or_none(pod.spec.containers)
# self._handle_completion(status.state, container)
# if self.should_delete_pod():
# with PodMonitor() as monitor:
# self.delete_pod_name(pod.metadata.name)
# monitor.remove(pod)
# self._clear_pod()
# # stop watching for events, our pod is done. Causes wait loop to exit
# w.stop()
# else:
# raise CalrissianJobException('Unexpected pod container status', status)

# # When the pod is done we should have a completion result
# # Otherwise it will lead to further exceptions
# if self.completion_result is None:
# raise IncompleteStatusException

# return self.completion_result

def _set_pod(self, pod):
log.info('k8s pod \'{}\' started'.format(pod.metadata.name))
Expand All @@ -216,6 +280,32 @@ def state_is_waiting(state):
def state_is_terminated(state):
return state.terminated

@staticmethod
def get_list_or_none(container_list: List[Union[V1ContainerStatus, V1Container]]) -> Union[V1ContainerStatus, V1Container]:
"""
Check the list. Should be 0 or 1 items. If 0, there's no container yet. If 1, there's a
container. If > 1, there's more than 1 container and that's unexpected behavior
:param containers_or_container_statuses: list of V1ContainerStatus or V1Container
:return: first item if len of list is 1, None if 0, and raises CalrissianJobException if > 1
"""
if not container_list: # None or empty list
return None
else:
return list(container_list)

@staticmethod
def get_last_or_none(container_list: List[Union[V1ContainerStatus, V1Container]]) -> Union[V1ContainerStatus, V1Container]:
"""
Check the list. Should be 0 or 1 items. If 0, there's no container yet. If 1, there's a
container. If > 1, there's more than 1 container and that's unexpected behavior
:param containers_or_container_statuses: list of V1ContainerStatus or V1Container
:return: first item if len of list is 1, None if 0, and raises CalrissianJobException if > 1
"""
if not container_list: # None or empty list
return None
else:
return container_list[-1]

@staticmethod
def get_first_or_none(container_list: List[Union[V1ContainerStatus, V1Container]]) -> Union[V1ContainerStatus, V1Container]:
"""
Expand Down

0 comments on commit befe99a

Please sign in to comment.