diff --git a/.gitignore b/.gitignore index 56bf8f4..7282230 100644 --- a/.gitignore +++ b/.gitignore @@ -159,4 +159,5 @@ cython_debug/ # be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore # and can be added to the global gitignore or merged into this file. For a more nuclear # option (not recommended) you can uncomment the following to ignore the entire idea folder. -.idea/ \ No newline at end of file +.idea/ +uv.lock diff --git a/dagster_uc/manage_user_code_deployments.py b/dagster_uc/manage_user_code_deployments.py index 7702a4e..db5b86b 100644 --- a/dagster_uc/manage_user_code_deployments.py +++ b/dagster_uc/manage_user_code_deployments.py @@ -8,12 +8,11 @@ import subprocess import time from dataclasses import asdict -from typing import Annotated, cast +from typing import Annotated import kr8s import typer from kr8s.objects import ( - APIObject, ConfigMap, Pod, ) @@ -32,12 +31,12 @@ app.add_typer(deployment_app) deployment_delete_app = typer.Typer( name="delete", - help="Contains subcommands for deleting one or more user code deployments from the cluster", + help="Delete one or more user code deployments", ) deployment_app.add_typer(deployment_delete_app) deployment_check_app = typer.Typer( name="check", - help="Contains subcommands for checking the status of a deployment", + help="Check status of the user code deployment", ) deployment_app.add_typer(deployment_check_app) @@ -77,10 +76,8 @@ def default( if verbose: config.verbose = True logger.debug(f"Switching kubernetes context to {config.environment}...") - kr8s_api = cast( - kr8s.Api, - kr8s.api(context=f"{config.kubernetes_context}", namespace=config.namespace), - ) + kr8s_api = kr8s.api(context=f"{config.kubernetes_context}", namespace=config.namespace) + handler = DagsterUserCodeHandler(config, kr8s_api) handler._ensure_dagster_version_match() handler.maybe_create_user_deployments_configmap() @@ -191,7 +188,7 @@ def optional_prompt(text: str) -> str | None: @deployment_app.command( name="list", - help="List user code deployments that are currently active on the cluster", + help="List user code deployments that are currently active", ) def deployment_list(): """Outputs a list of currently active deployments""" @@ -205,7 +202,7 @@ def deployment_list(): @deployment_app.command( name="revive", - help="Redeploy an old user-code deployment, without rebuilding and uploading a docker image but instead using the latest existing image from the acr.", + help="Redeploy an old user code deployment using the latest existing image from the container registry", ) def deployment_revive( name: Annotated[ @@ -258,13 +255,10 @@ def deployment_delete( label_selector="app.kubernetes.io/name=dagster-user-deployments", ) handler.delete_k8s_resources(label_selector="app=dagster-user-deployments") - for item in cast( - list[APIObject], - handler.api.get( - ConfigMap, - namespace=config.namespace, - label_selector="app=dagster-user-deployments", - ), + for item in handler.api.get( + ConfigMap, + namespace=config.namespace, + label_selector="app=dagster-user-deployments", ): item.delete() # type: ignore handler.delete_k8s_resources(label_selector="dagster/code-location") @@ -318,9 +312,10 @@ def check_deployment( f"Deployment with name '{name}' does not seem to exist in environment '{config.environment}'. Attempting to proceed with status check anyways.", ) typer.echo(f"\033[1mStatus for deployment {name}\033[0m") - for pod in cast( - list[Pod], - handler.api.get(Pod, label_selector=f"deployment={name}", namespace=config.namespace), + for pod in handler.api.get( + Pod, + label_selector=f"deployment={name}", + namespace=config.namespace, ): with contextlib.suppress(Exception): for line in pod.logs(pretty=True, follow=True, timeout=timeout): # type: ignore @@ -331,8 +326,7 @@ def check_deployment( @deployment_app.command( name="deploy", - help="Deploys the currently checked out git branch to the cluster as a user code deployment", - short_help="hello", + help="Deploys the currently checked out git branch as a user code deployment", ) def deployment_deploy( force: Annotated[ @@ -488,13 +482,10 @@ def is_command_available(command: str) -> bool: while True: code_pods = list( - cast( - list[APIObject], - handler.api.get( - Pod, - label_selector=f"deployment={deployment_name}", - namespace=config.namespace, - ), + handler.api.get( + Pod, + label_selector=f"deployment={deployment_name}", + namespace=config.namespace, ), ) if len(code_pods) == 0: diff --git a/dagster_uc/uc_handler.py b/dagster_uc/uc_handler.py index 991cde9..ffef581 100644 --- a/dagster_uc/uc_handler.py +++ b/dagster_uc/uc_handler.py @@ -3,11 +3,10 @@ import re import subprocess from collections.abc import Callable -from typing import NamedTuple, cast +from typing import NamedTuple import kr8s import yaml -from kr8s._objects import APIObject from kr8s.objects import ( ConfigMap, Deployment, @@ -58,7 +57,7 @@ def maybe_create_user_deployments_configmap(self) -> None: resource=dagster_user_deployments_values_yaml_configmap, namespace=self.config.namespace, api=self.api, - ).create() # type: ignore + ).create() def remove_all_deployments(self) -> None: """This function removes in its entirety the values.yaml for dagster's user-code deployment chart from the k8s @@ -210,10 +209,7 @@ def deploy_to_k8s( if reload_dagster: for deployment_name in ["dagster-daemon", "dagster-dagster-webserver"]: - deployment = cast( - APIObject, - Deployment.get(deployment_name, namespace=self.config.namespace), - ) + deployment = Deployment.get(deployment_name, namespace=self.config.namespace) reload_patch = { "spec": { "template": { @@ -227,7 +223,7 @@ def deploy_to_k8s( }, }, } - deployment.patch(reload_patch) # type: ignore + deployment.patch(reload_patch) def delete_k8s_resources_for_user_deployment( self, @@ -237,14 +233,11 @@ def delete_k8s_resources_for_user_deployment( """Deletes all k8s resources related to a specific user code deployment. Returns a boolean letting you know if pod was found """ - for pod in cast( - list[APIObject], - self.api.get( - Pod, - label_selector=f"dagster/code-location={label}", - field_selector="status.phase=Succeeded", - namespace=self.config.namespace, - ), + for pod in self.api.get( + Pod, + label_selector=f"dagster/code-location={label}", + field_selector="status.phase=Succeeded", + namespace=self.config.namespace, ): logger.info(f"Deleting pod {pod.name}") pod.delete() # type: ignore @@ -257,12 +250,12 @@ def delete_k8s_resources_for_user_deployment( namespace=self.config.namespace, label_selector=f"deployment={label}", api=self.api, - ).delete() # type: ignore + ).delete() Deployment.get( namespace=self.config.namespace, label_selector=f"dagster/code-location={label}", api=self.api, - ).delete() # type: ignore + ).delete() def gen_new_deployment_yaml( self, @@ -330,12 +323,9 @@ def gen_new_deployment_yaml( def _read_namespaced_config_map( self, name: str, - ) -> APIObject: + ) -> ConfigMap: """Read a configmap that exists on the k8s cluster""" - configmap = cast( - APIObject, - ConfigMap.get(name=name, namespace=self.config.namespace, api=self.api), - ) + configmap = ConfigMap.get(name=name, namespace=self.config.namespace, api=self.api) return configmap def add_user_deployment_to_configmap( @@ -423,8 +413,7 @@ def _modify_user_deployments( "kubectl.kubernetes.io/last-applied-configuration": last_applied_configuration, }, } - - configmap.patch(new_configmap) # type: ignore + configmap.patch(new_configmap) def get_deployment_name( # noqa: D102 self, @@ -457,16 +446,11 @@ def _ensure_dagster_version_match(self) -> None: local_dagster_version = Version(self.config.dagster_version) ## GETS cluster version from dagster deamon pod - deamon_pod = list( - cast( - list[Pod], - self.api.get( - Pod, - label_selector="deployment=daemon", - namespace=self.config.namespace, - ), - ), - )[0] + deamon_pod = Pod.get( + label_selector="deployment=daemon", + namespace=self.config.namespace, + api=self.api, + ) ex = deamon_pod.exec(command=["dagster", "--version"]) output = ex.stdout.decode("ascii") # type: ignore @@ -488,13 +472,10 @@ def _ensure_dagster_version_match(self) -> None: def check_if_code_pod_exists(self, label: str) -> bool: """Checks if the code location pod of specific label is available""" running_pods = list( - cast( - list[APIObject], - self.api.get( - Pod, - label_selector=f"deployment={label}", - namespace=self.config.namespace, - ), + self.api.get( + Pod, + label_selector=f"deployment={label}", + namespace=self.config.namespace, ), ) return len(running_pods) > 0 @@ -513,50 +494,38 @@ def delete_k8s_resources(self, label_selector: str): "CronJob", "Job", ]: - for item in cast( - list[APIObject], - self.api.get( - resource, - namespace=self.config.namespace, - label_selector=label_selector, - ), + for item in self.api.get( + resource, + namespace=self.config.namespace, + label_selector=label_selector, ): item.delete() # type: ignore def acquire_semaphore(self, reset_lock: bool = False) -> bool: """Acquires a semaphore by creating a configmap""" if reset_lock: - semaphore_list = list( - cast( - list[APIObject], - self.api.get( - ConfigMap, - self.config.uc_deployment_semaphore_name, - namespace=self.config.namespace, - ), - ), - ) - if len(semaphore_list): - semaphore_list[0].delete() # type: ignore - - semaphore_list = list( - cast( - list[ConfigMap], - self.api.get( - ConfigMap, + try: + semaphore = ConfigMap.get( self.config.uc_deployment_semaphore_name, namespace=self.config.namespace, - ), - ), - ) - if len(semaphore_list): - semaphore = semaphore_list[0] + api=self.api, + ) + semaphore.delete() + except kr8s.NotFoundError: + pass + + try: + semaphore = ConfigMap.get( + self.config.uc_deployment_semaphore_name, + namespace=self.config.namespace, + api=self.api, + ) if semaphore.data.get("locked") == "true": return False - - semaphore.patch({"data": {"locked": "true"}}) # type: ignore - return True - else: + else: + semaphore.patch({"data": {"locked": "true"}}) + return True + except kr8s.NotFoundError: # Create semaphore if it does not exist semaphore = ConfigMap( { @@ -566,23 +535,19 @@ def acquire_semaphore(self, reset_lock: bool = False) -> bool: }, "data": {"locked": "true"}, }, + api=self.api, ).create() return True def release_semaphore(self) -> None: """Releases the semaphore lock""" try: - semaphore = list( - cast( - list[ConfigMap], - self.api.get( - ConfigMap, - self.config.uc_deployment_semaphore_name, - namespace=self.config.namespace, - ), - ), - )[0] - semaphore.patch({"data": {"locked": "false"}}) # type: ignore + semaphore = ConfigMap.get( + self.config.uc_deployment_semaphore_name, + namespace=self.config.namespace, + api=self.api, + ) + semaphore.patch({"data": {"locked": "false"}}) logger.debug("patched semaphore to locked: false") except Exception as e: logger.error(f"Failed to release deployment lock: {e}") diff --git a/pyproject.toml b/pyproject.toml index 7788122..4866ff4 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "dagster-uc" -version = "0.3.1" +version = "0.3.2" authors = [ {name = "Stefan Verbruggen"}, {name = "Ion Koutsouris"}, @@ -17,7 +17,7 @@ classifiers = [ "Programming Language :: Python :: 3.13" ] dependencies = [ - "kr8s < 1.0", + "kr8s>=0.20,<1.0", "pyhelm3==0.3.3", "typer==0.12.3", "tomli",