Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor AuthCredentials #3

Closed
wants to merge 4 commits into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions ops/ops/interface_kube_control/consts.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
from typing import Final

CLIENT_TOKEN_SECERT_FIELD: Final[str] = "client-token"
KUBELET_TOKEN_SECRET_FIELD: Final[str] = "kubelet-token"
PROXY_TOKEN_SECRET_FIELD: Final[str] = "proxy-token"
SECRET_LABEL_FORMAT: Final[str] = "{user}-creds"
louiseschmidtgen marked this conversation as resolved.
Show resolved Hide resolved
36 changes: 32 additions & 4 deletions ops/ops/interface_kube_control/model.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,13 @@
from typing import List, Dict, Optional
import re

from .consts import (
SECRET_LABEL_FORMAT,
CLIENT_TOKEN_SECERT_FIELD,
KUBELET_TOKEN_SECRET_FIELD,
PROXY_TOKEN_SECRET_FIELD,
)


class _ValidatedStr:
def __init__(self, value, *groups) -> None:
Expand Down Expand Up @@ -94,22 +101,25 @@ class Creds(BaseModel):
secret_id: Optional[str] = Field(alias="secret-id", default=None)

def _get_secret_content(self, model: ops.Model, user: str) -> Dict[str, str]:
secret = model.get_secret(id=self.secret_id, label=f"{user}-creds")
secret = model.get_secret(
id=self.secret_id,
label=SECRET_LABEL_FORMAT.format(user=user),
)
return secret.get_content(refresh=True)

def load_client_token(self, model: ops.Model, user: str) -> str:
if self.secret_id:
return self._get_secret_content(model, user)["client-token"]
return self._get_secret_content(model, user)[CLIENT_TOKEN_SECERT_FIELD]
return self.client_token

def load_kubelet_token(self, model, user: str) -> str:
if self.secret_id:
return self._get_secret_content(model, user)["kubelet-token"]
return self._get_secret_content(model, user)[KUBELET_TOKEN_SECRET_FIELD]
return self.kubelet_token

def load_proxy_token(self, model, user: str) -> str:
if self.secret_id:
return self._get_secret_content(model, user)["proxy-token"]
return self._get_secret_content(model, user)[PROXY_TOKEN_SECRET_FIELD]
return self.proxy_token


Expand Down Expand Up @@ -139,3 +149,21 @@ def get_ca_certificate(self, model: ops.Model) -> Optional[bytes]:
return secret.get_content(refresh=True)["ca-certificate"].encode()
except ops.SecretNotFoundError:
return None


class AuthCredentials(BaseModel):

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are there any tests around getting and setting these AuthCredentials?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think there is. The only thing that we use from that AuthCredentials is the client_token field, and that field is used to populate the token in the created kubeconfig. We have a test covering it here: https://github.com/charmed-kubernetes/interface-kube-control/blob/main/ops/tests/unit/test_ops_requires.py#L157

"""
AuthCredentials is a Pydantic model that holds authentication credentials
for accessing a Kubernetes cluster.

Attributes:
user (str): The username for authentication.
kubelet_token (str): The token used for authenticating to the kubelet.
proxy_token (str): The token used for authenticating to the proxy.
client_token (str): The token used for authenticating to the client.
"""

user: str
kubelet_token: str
proxy_token: str
client_token: str
Comment on lines +167 to +169

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Are these tokens going to be encoded?

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I don't think so, I just created this model because I have personal issues with a type-unsafe dict :D

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That'll be the go developer in you :D

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

You can make them "safe" using SecretStr from pydantic but I don't recommend it b/c users of this library would have to change to access them.

    thing.client_token.get_secret_value()

16 changes: 12 additions & 4 deletions ops/ops/interface_kube_control/provides.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,13 @@
from ops import CharmBase, Relation, SecretNotFoundError, Unit
from typing import Generator, List, Tuple

from .consts import (
CLIENT_TOKEN_SECERT_FIELD,
KUBELET_TOKEN_SECRET_FIELD,
PROXY_TOKEN_SECRET_FIELD,
SECRET_LABEL_FORMAT,
)

log = logging.getLogger("KubeControlProvides")


Expand Down Expand Up @@ -214,11 +221,12 @@ def sign_auth_request(
if 1 in request.schema_vers and request_relation:
# Requesting unit can use schema 1, use juju secrets
content = {
"client-token": client_token,
"kubelet-token": kubelet_token,
"proxy-token": proxy_token,
CLIENT_TOKEN_SECERT_FIELD: client_token,
KUBELET_TOKEN_SECRET_FIELD: kubelet_token,
PROXY_TOKEN_SECRET_FIELD: proxy_token,
}
label = f"{request.user}-creds"

label = SECRET_LABEL_FORMAT.format(user=request.user)
description = f"Credentials for {request.user}"
secret = self.refresh_secret_content(label, content, description)
if secret.id:
Expand Down
24 changes: 12 additions & 12 deletions ops/ops/interface_kube_control/requires.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

import yaml
from backports.cached_property import cached_property
from .model import AuthRequest, Creds, Data, Taint, Label
from .model import AuthCredentials, AuthRequest, Creds, Data, Taint, Label
from pydantic import ValidationError

from ops.charm import CharmBase, RelationBrokenEvent
Expand Down Expand Up @@ -84,7 +84,7 @@ def create_kubeconfig(
creds = self.get_auth_credentials(k8s_user)
endpoints = self.get_api_endpoints()
server = endpoints[0] if endpoints else None
token = creds["client_token"] if creds else None
token = creds.client_token if creds else None

if ca_content := self.get_ca_certificate():
ca_b64 = base64.b64encode(ca_content).decode("utf-8")
Expand Down Expand Up @@ -137,23 +137,23 @@ def get_ca_certificate(self) -> Optional[bytes]:

return self._data.get_ca_certificate(self.model)

def get_auth_credentials(self, user) -> Optional[Mapping[str, str]]:
def get_auth_credentials(self, user) -> Optional[AuthCredentials]:
"""Return the authentication credentials."""
if not self.is_ready:
return None

users: Dict[str, Creds] = self._data.creds
users_to_creds: Dict[str, Creds] = self._data.creds

if creds := users.get(user):
return {
"user": user,
"kubelet_token": creds.load_kubelet_token(self.model, user),
"proxy_token": creds.load_proxy_token(self.model, user),
"client_token": creds.load_client_token(self.model, user),
}
if creds := users_to_creds.get(user):
return AuthCredentials(
user=user,
kubelet_token=creds.load_kubelet_token(self.model, user),
proxy_token=creds.load_proxy_token(self.model, user),
client_token=creds.load_client_token(self.model, user),
)
return None

def get_dns(self) -> Mapping[str, str]:
def get_dns(self) -> Mapping[str, Optional[str]]:
"""
Return DNS info provided by the control-plane.
"""
Expand Down