Skip to content

Commit

Permalink
Added Adaptable CA
Browse files Browse the repository at this point in the history
  • Loading branch information
tyler.spens committed Aug 27, 2024
1 parent b3c270a commit c4029b0
Show file tree
Hide file tree
Showing 4 changed files with 132 additions and 5 deletions.
2 changes: 1 addition & 1 deletion pyvenafi/_about.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,4 +2,4 @@
__author_email__ = 'spi@venafi.com'
__project_name__ = 'pyvenafi'
__project_url__ = 'https://github.com/Venafi/pyVenafi'
__version__ = '1.0.9'
__version__ = '1.0.10'
16 changes: 12 additions & 4 deletions pyvenafi/tpp/features/bases/feature_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -170,21 +170,27 @@ def _is_obj_guid(obj: str):
regex = '^[{]?[0-9a-fA-F]{8}-([0-9a-fA-F]{4}-){3}[0-9a-fA-F]{12}[}]?$'
return isinstance(obj, str) and bool(re.match(pattern=regex, string=obj))

def _get_prefixed_name(self, identity: 'Union[ident.Identity, str]'):
def _get_prefixed_name(self, identity: 'Union[ident.Identity, str, None]'):
if identity is None:
return None
if hasattr(identity, 'prefixed_name'):
return identity.prefixed_name
if self._is_prefixed_universal(identity):
return self._get_identity_object(prefixed_universal=identity).prefixed_name
return identity

def _get_prefixed_universal(self, identity: 'Union[ident.Identity, str]'):
def _get_prefixed_universal(self, identity: 'Union[ident.Identity, str, None]'):
if identity is None:
return None
if hasattr(identity, 'prefixed_universal'):
return identity.prefixed_universal
if self._is_prefixed_universal(identity):
return identity
return self._get_identity_object(prefixed_name=identity).prefixed_universal

def _get_dn(self, obj: 'Union[config.Object, str]', parent_dn: str = None):
def _get_dn(self, obj: 'Union[config.Object, str, None]', parent_dn: str = None):
if obj is None:
return None
if hasattr(obj, 'dn'):
return obj.dn
if self._is_obj_guid(obj):
Expand All @@ -196,7 +202,9 @@ def _get_dn(self, obj: 'Union[config.Object, str]', parent_dn: str = None):
else:
return obj

def _get_guid(self, obj: 'Union[config.Object, str]', parent_dn: str = None):
def _get_guid(self, obj: 'Union[config.Object, str, None]', parent_dn: str = None):
if obj is None:
return None
if hasattr(obj, 'guid'):
return obj.guid
if self._is_obj_guid(obj):
Expand Down
112 changes: 112 additions & 0 deletions pyvenafi/tpp/features/certificate_authorities.py
Original file line number Diff line number Diff line change
@@ -1,10 +1,18 @@
from __future__ import annotations

import base64
import hashlib
from typing import (
TYPE_CHECKING,
Union,
)

from pyvenafi.tpp.api.websdk.enums.secret_store import (
KeyNames,
Namespaces,
VaultTypes,
)
from pyvenafi.tpp.attributes.adaptable_ca import AdaptableCAAttributes
from pyvenafi.tpp.attributes.microsoft_ca import MicrosoftCAAttributes
from pyvenafi.tpp.attributes.self_signed_ca import SelfSignedCAAttributes
from pyvenafi.tpp.features.bases.feature_base import (
Expand Down Expand Up @@ -185,3 +193,107 @@ def create(
attributes=ca_attrs,
get_if_already_exists=get_if_already_exists
)

@feature('Self-Signed CA')
class AdaptableCA(_CertificateAuthorityBase):
def __init__(self, api):
super().__init__(api=api)

def create(
self, name: str, parent_folder: 'Union[config.Object, str]', powershell_script_name: str,
powershell_script_content: bytes, oauth_application_id: str,
oauth_token_credential: 'Union[config.Object, str]', description: 'str' = None,
contacts: 'list[ident.Identity, str]' = None, username_credential: 'Union[config.Object, str]' = None,
certificate_credential: 'Union[config.Object, str]' = None,
secondary_credential: 'Union[config.Object, str]' = None, service_address: 'str' = None,
profile_string: 'str' = None, renewal_window: int = None,
allow_reissuance: bool = False, subject_alt_name_enabled: bool = False,
fix_certificate_errors: bool = False, enabled_debug_logging: bool = False,
oauth_scope: str = None, attributes: dict = None, get_if_already_exists: bool = True
):
"""
Args:
name: Name of the CA object.
parent_folder: `:ref:`config_object` or :ref:`dn` of the parent folder of this certificate authority object.
powershell_script_name: Powershell script name on the TPP server.
powershell_script_content: Powershell script content on the TPP server.
oauth_application_id: Application ID of the OAuth application.
oauth_token_credential: `:ref:`config_object` or :ref:`dn` of the parent folder of this certificate authority object.
description: Description of the CA object.
contacts: List of :ref:`identity_object` or :ref:`prefixed_name` of the contacts.
username_credential: `:ref:`config_object` or :ref:`dn` of a username credential.
certificate_credential: `:ref:`config_object` or :ref:`dn` of a certificate credential.
secondary_credential: `:ref:`config_object` or :ref:`dn` of a secondary credential.
service_address: Any service address.
profile_string: Profile string.
renewal_window: Renewal window of the CA object.
allow_reissuance: Allow certificate reissuance.
subject_alt_name_enabled: Enable subject alternate names.
fix_certificate_errors: Fix certificate errors when the script is updated.
enabled_debug_logging: Enable debug logging.
oauth_scope: Scope of the OAuth application.
attributes: Additional attributes associated to the CA object.
get_if_already_exists: If the objects already exists, just return it as is.
Returns:
:ref:`config_object` of the certificate authority.
"""
bool_to_str = lambda x: "1" if x else None
ca_attrs = {
AdaptableCAAttributes.driver_name : 'caadaptable',
AdaptableCAAttributes.description : description,
AdaptableCAAttributes.contact : [
self._get_prefixed_universal(c) for c in
contacts
] if contacts else None,
AdaptableCAAttributes.credential : self._get_dn(username_credential),
AdaptableCAAttributes.certificate_credential : self._get_dn(certificate_credential),
AdaptableCAAttributes.secondary_credential : self._get_dn(secondary_credential),
AdaptableCAAttributes.service_address : service_address,
AdaptableCAAttributes.profile_string : profile_string,
AdaptableCAAttributes.renewal_window : renewal_window,
AdaptableCAAttributes.allow_reissue : bool_to_str(allow_reissuance),
AdaptableCAAttributes.san_enabled : bool_to_str(subject_alt_name_enabled),
AdaptableCAAttributes.retry_after_script_hash_mismatch: bool_to_str(fix_certificate_errors),
AdaptableCAAttributes.log_debug : bool_to_str(enabled_debug_logging),
AdaptableCAAttributes.oauth_token_application_id : oauth_application_id,
AdaptableCAAttributes.oauth_token_credential : self._get_dn(oauth_token_credential),
AdaptableCAAttributes.oauth_token_scope : oauth_scope,
}
if attributes:
ca_attrs.update(attributes)
ca = self._config_create(
name=name,
parent_folder_dn=self._get_dn(parent_folder),
config_class=AdaptableCAAttributes.__config_class__,
attributes=ca_attrs,
get_if_already_exists=get_if_already_exists
)

self._api.websdk.Config.AddValue.post(
object_dn=ca.dn,
attribute_name=AdaptableCAAttributes.interoperability_script,
value=powershell_script_name
)

script_hash = base64.b64encode(
hashlib.sha256(
powershell_script_content.decode().encode('utf-32-le')
).hexdigest().encode()
).decode()

vault_id = self._api.websdk.SecretStore.Add.post(
base_64_data=script_hash,
keyname=KeyNames.software_default,
vault_type=VaultTypes.blob,
namespace=Namespaces.config,
owner=ca.dn,
).vault_id

self._api.websdk.Config.WriteDn.post(
object_dn=ca.dn,
attribute_name=AdaptableCAAttributes.powershell_script_hash_vault_id,
values=[vault_id]
)

return ca
7 changes: 7 additions & 0 deletions pyvenafi/tpp/features/definitions/features.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@
)
from pyvenafi.tpp.features.certificate import Certificate
from pyvenafi.tpp.features.certificate_authorities import (
AdaptableCA,
MSCA,
SelfSignedCA,
)
Expand Down Expand Up @@ -278,6 +279,7 @@ def __init__(self, api):

self._msca = None
self._self_signed = None
self._adaptable = None

@property
def msca(self) -> MSCA:
Expand All @@ -289,6 +291,11 @@ def self_signed(self) -> SelfSignedCA:
self._self_signed = self._self_signed or SelfSignedCA(self._api)
return self._self_signed

@property
def adaptable(self) -> AdaptableCA:
self._adaptable = self._adaptable or AdaptableCA(self._api)
return self._adaptable

class _CodeSign:
def __init__(self, api):
self._api = api
Expand Down

0 comments on commit c4029b0

Please sign in to comment.