Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

chore: refactor type hints #17

Merged
merged 1 commit into from
Jan 22, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -159,4 +159,5 @@ cython_debug/
# be found at https://github.com/github/gitignore/blob/main/Global/JetBrains.gitignore
# and can be added to the global gitignore or merged into this file. For a more nuclear
# option (not recommended) you can uncomment the following to ignore the entire idea folder.
.idea/
.idea/
uv.lock
49 changes: 20 additions & 29 deletions dagster_uc/manage_user_code_deployments.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,12 +8,11 @@
import subprocess
import time
from dataclasses import asdict
from typing import Annotated, cast
from typing import Annotated

import kr8s
import typer
from kr8s.objects import (
APIObject,
ConfigMap,
Pod,
)
Expand All @@ -32,12 +31,12 @@
app.add_typer(deployment_app)
deployment_delete_app = typer.Typer(
name="delete",
help="Contains subcommands for deleting one or more user code deployments from the cluster",
help="Delete one or more user code deployments",
)
deployment_app.add_typer(deployment_delete_app)
deployment_check_app = typer.Typer(
name="check",
help="Contains subcommands for checking the status of a deployment",
help="Check status of the user code deployment",
)
deployment_app.add_typer(deployment_check_app)

Expand Down Expand Up @@ -77,10 +76,8 @@ def default(
if verbose:
config.verbose = True
logger.debug(f"Switching kubernetes context to {config.environment}...")
kr8s_api = cast(
kr8s.Api,
kr8s.api(context=f"{config.kubernetes_context}", namespace=config.namespace),
)
kr8s_api = kr8s.api(context=f"{config.kubernetes_context}", namespace=config.namespace)

handler = DagsterUserCodeHandler(config, kr8s_api)
handler._ensure_dagster_version_match()
handler.maybe_create_user_deployments_configmap()
Expand Down Expand Up @@ -191,7 +188,7 @@ def optional_prompt(text: str) -> str | None:

@deployment_app.command(
name="list",
help="List user code deployments that are currently active on the cluster",
help="List user code deployments that are currently active",
)
def deployment_list():
"""Outputs a list of currently active deployments"""
Expand All @@ -205,7 +202,7 @@ def deployment_list():

@deployment_app.command(
name="revive",
help="Redeploy an old user-code deployment, without rebuilding and uploading a docker image but instead using the latest existing image from the acr.",
help="Redeploy an old user code deployment using the latest existing image from the container registry",
)
def deployment_revive(
name: Annotated[
Expand Down Expand Up @@ -258,13 +255,10 @@ def deployment_delete(
label_selector="app.kubernetes.io/name=dagster-user-deployments",
)
handler.delete_k8s_resources(label_selector="app=dagster-user-deployments")
for item in cast(
list[APIObject],
handler.api.get(
ConfigMap,
namespace=config.namespace,
label_selector="app=dagster-user-deployments",
),
for item in handler.api.get(
ConfigMap,
namespace=config.namespace,
label_selector="app=dagster-user-deployments",
):
item.delete() # type: ignore
handler.delete_k8s_resources(label_selector="dagster/code-location")
Expand Down Expand Up @@ -318,9 +312,10 @@ def check_deployment(
f"Deployment with name '{name}' does not seem to exist in environment '{config.environment}'. Attempting to proceed with status check anyways.",
)
typer.echo(f"\033[1mStatus for deployment {name}\033[0m")
for pod in cast(
list[Pod],
handler.api.get(Pod, label_selector=f"deployment={name}", namespace=config.namespace),
for pod in handler.api.get(
Pod,
label_selector=f"deployment={name}",
namespace=config.namespace,
):
with contextlib.suppress(Exception):
for line in pod.logs(pretty=True, follow=True, timeout=timeout): # type: ignore
Expand All @@ -331,8 +326,7 @@ def check_deployment(

@deployment_app.command(
name="deploy",
help="Deploys the currently checked out git branch to the cluster as a user code deployment",
short_help="hello",
help="Deploys the currently checked out git branch as a user code deployment",
)
def deployment_deploy(
force: Annotated[
Expand Down Expand Up @@ -488,13 +482,10 @@ def is_command_available(command: str) -> bool:

while True:
code_pods = list(
cast(
list[APIObject],
handler.api.get(
Pod,
label_selector=f"deployment={deployment_name}",
namespace=config.namespace,
),
handler.api.get(
Pod,
label_selector=f"deployment={deployment_name}",
namespace=config.namespace,
),
)
if len(code_pods) == 0:
Expand Down
139 changes: 52 additions & 87 deletions dagster_uc/uc_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -3,11 +3,10 @@
import re
import subprocess
from collections.abc import Callable
from typing import NamedTuple, cast
from typing import NamedTuple

import kr8s
import yaml
from kr8s._objects import APIObject
from kr8s.objects import (
ConfigMap,
Deployment,
Expand Down Expand Up @@ -58,7 +57,7 @@ def maybe_create_user_deployments_configmap(self) -> None:
resource=dagster_user_deployments_values_yaml_configmap,
namespace=self.config.namespace,
api=self.api,
).create() # type: ignore
).create()

def remove_all_deployments(self) -> None:
"""This function removes in its entirety the values.yaml for dagster's user-code deployment chart from the k8s
Expand Down Expand Up @@ -210,10 +209,7 @@ def deploy_to_k8s(

if reload_dagster:
for deployment_name in ["dagster-daemon", "dagster-dagster-webserver"]:
deployment = cast(
APIObject,
Deployment.get(deployment_name, namespace=self.config.namespace),
)
deployment = Deployment.get(deployment_name, namespace=self.config.namespace)
reload_patch = {
"spec": {
"template": {
Expand All @@ -227,7 +223,7 @@ def deploy_to_k8s(
},
},
}
deployment.patch(reload_patch) # type: ignore
deployment.patch(reload_patch)

def delete_k8s_resources_for_user_deployment(
self,
Expand All @@ -237,14 +233,11 @@ def delete_k8s_resources_for_user_deployment(
"""Deletes all k8s resources related to a specific user code deployment.
Returns a boolean letting you know if pod was found
"""
for pod in cast(
list[APIObject],
self.api.get(
Pod,
label_selector=f"dagster/code-location={label}",
field_selector="status.phase=Succeeded",
namespace=self.config.namespace,
),
for pod in self.api.get(
Pod,
label_selector=f"dagster/code-location={label}",
field_selector="status.phase=Succeeded",
namespace=self.config.namespace,
):
logger.info(f"Deleting pod {pod.name}")
pod.delete() # type: ignore
Expand All @@ -257,12 +250,12 @@ def delete_k8s_resources_for_user_deployment(
namespace=self.config.namespace,
label_selector=f"deployment={label}",
api=self.api,
).delete() # type: ignore
).delete()
Deployment.get(
namespace=self.config.namespace,
label_selector=f"dagster/code-location={label}",
api=self.api,
).delete() # type: ignore
).delete()

def gen_new_deployment_yaml(
self,
Expand Down Expand Up @@ -330,12 +323,9 @@ def gen_new_deployment_yaml(
def _read_namespaced_config_map(
self,
name: str,
) -> APIObject:
) -> ConfigMap:
"""Read a configmap that exists on the k8s cluster"""
configmap = cast(
APIObject,
ConfigMap.get(name=name, namespace=self.config.namespace, api=self.api),
)
configmap = ConfigMap.get(name=name, namespace=self.config.namespace, api=self.api)
return configmap

def add_user_deployment_to_configmap(
Expand Down Expand Up @@ -423,8 +413,7 @@ def _modify_user_deployments(
"kubectl.kubernetes.io/last-applied-configuration": last_applied_configuration,
},
}

configmap.patch(new_configmap) # type: ignore
configmap.patch(new_configmap)

def get_deployment_name( # noqa: D102
self,
Expand Down Expand Up @@ -457,16 +446,11 @@ def _ensure_dagster_version_match(self) -> None:
local_dagster_version = Version(self.config.dagster_version)

## GETS cluster version from dagster deamon pod
deamon_pod = list(
cast(
list[Pod],
self.api.get(
Pod,
label_selector="deployment=daemon",
namespace=self.config.namespace,
),
),
)[0]
deamon_pod = Pod.get(
label_selector="deployment=daemon",
namespace=self.config.namespace,
api=self.api,
)

ex = deamon_pod.exec(command=["dagster", "--version"])
output = ex.stdout.decode("ascii") # type: ignore
Expand All @@ -488,13 +472,10 @@ def _ensure_dagster_version_match(self) -> None:
def check_if_code_pod_exists(self, label: str) -> bool:
"""Checks if the code location pod of specific label is available"""
running_pods = list(
cast(
list[APIObject],
self.api.get(
Pod,
label_selector=f"deployment={label}",
namespace=self.config.namespace,
),
self.api.get(
Pod,
label_selector=f"deployment={label}",
namespace=self.config.namespace,
),
)
return len(running_pods) > 0
Expand All @@ -513,50 +494,38 @@ def delete_k8s_resources(self, label_selector: str):
"CronJob",
"Job",
]:
for item in cast(
list[APIObject],
self.api.get(
resource,
namespace=self.config.namespace,
label_selector=label_selector,
),
for item in self.api.get(
resource,
namespace=self.config.namespace,
label_selector=label_selector,
):
item.delete() # type: ignore

def acquire_semaphore(self, reset_lock: bool = False) -> bool:
"""Acquires a semaphore by creating a configmap"""
if reset_lock:
semaphore_list = list(
cast(
list[APIObject],
self.api.get(
ConfigMap,
self.config.uc_deployment_semaphore_name,
namespace=self.config.namespace,
),
),
)
if len(semaphore_list):
semaphore_list[0].delete() # type: ignore

semaphore_list = list(
cast(
list[ConfigMap],
self.api.get(
ConfigMap,
try:
semaphore = ConfigMap.get(
self.config.uc_deployment_semaphore_name,
namespace=self.config.namespace,
),
),
)
if len(semaphore_list):
semaphore = semaphore_list[0]
api=self.api,
)
semaphore.delete()
except kr8s.NotFoundError:
pass

try:
semaphore = ConfigMap.get(
self.config.uc_deployment_semaphore_name,
namespace=self.config.namespace,
api=self.api,
)
if semaphore.data.get("locked") == "true":
return False

semaphore.patch({"data": {"locked": "true"}}) # type: ignore
return True
else:
else:
semaphore.patch({"data": {"locked": "true"}})
return True
except kr8s.NotFoundError:
# Create semaphore if it does not exist
semaphore = ConfigMap(
{
Expand All @@ -566,23 +535,19 @@ def acquire_semaphore(self, reset_lock: bool = False) -> bool:
},
"data": {"locked": "true"},
},
api=self.api,
).create()
return True

def release_semaphore(self) -> None:
"""Releases the semaphore lock"""
try:
semaphore = list(
cast(
list[ConfigMap],
self.api.get(
ConfigMap,
self.config.uc_deployment_semaphore_name,
namespace=self.config.namespace,
),
),
)[0]
semaphore.patch({"data": {"locked": "false"}}) # type: ignore
semaphore = ConfigMap.get(
self.config.uc_deployment_semaphore_name,
namespace=self.config.namespace,
api=self.api,
)
semaphore.patch({"data": {"locked": "false"}})
logger.debug("patched semaphore to locked: false")
except Exception as e:
logger.error(f"Failed to release deployment lock: {e}")
Expand Down
4 changes: 2 additions & 2 deletions pyproject.toml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
[project]
name = "dagster-uc"
version = "0.3.1"
version = "0.3.2"
authors = [
{name = "Stefan Verbruggen"},
{name = "Ion Koutsouris"},
Expand All @@ -17,7 +17,7 @@ classifiers = [
"Programming Language :: Python :: 3.13"
]
dependencies = [
"kr8s < 1.0",
"kr8s>=0.20,<1.0",
"pyhelm3==0.3.3",
"typer==0.12.3",
"tomli",
Expand Down
Loading