Skip to content

Commit

Permalink
Update kafka helpers
Browse files Browse the repository at this point in the history
Signed-off-by: obaydullahmhs <obaydullah@appscode.com>
  • Loading branch information
obaydullahmhs authored and raihankhan committed Jan 19, 2024
1 parent eac46c3 commit 353cb99
Show file tree
Hide file tree
Showing 4 changed files with 72 additions and 48 deletions.
20 changes: 5 additions & 15 deletions apis/kafka/v1alpha1/connect_cluster_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,23 +173,13 @@ func validateEnvVars(connect *ConnectCluster) error {
return nil
}

var availableVersions = []string{
"3.3.0",
"3.3.2",
"3.4.0",
"3.4.1",
"3.5.1",
"3.6.0",
}

func validateVersion(connect *ConnectCluster) error {
version := connect.Spec.Version
for _, v := range availableVersions {
if v == version {
return nil
}
kccVersion := &catalog.KafkaVersion{}
err := DefaultClient.Get(context.TODO(), types.NamespacedName{Name: connect.Spec.Version}, kccVersion)
if err != nil {
return errors.New("version not supported")
}
return errors.New("version not supported")
return nil
}

var reservedVolumes = []string{
Expand Down
37 changes: 37 additions & 0 deletions apis/kafka/v1alpha1/connector_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,11 +17,48 @@ limitations under the License.
package v1alpha1

import (
"fmt"

"kubedb.dev/apimachinery/apis/kafka"
"kubedb.dev/apimachinery/crds"

meta "k8s.io/apimachinery/pkg/apis/meta/v1"
"kmodules.xyz/client-go/apiextensions"
)

func (_ *Connector) CustomResourceDefinition() *apiextensions.CustomResourceDefinition {
return crds.MustCustomResourceDefinition(SchemeGroupVersion.WithResource(ResourcePluralConnector))
}

func (k *Connector) AsOwner() *meta.OwnerReference {
return meta.NewControllerRef(k, SchemeGroupVersion.WithKind(ResourceKindConnector))
}

func (k *Connector) ResourceShortCode() string {
return ResourceCodeConnector
}

func (k *Connector) ResourceKind() string {
return ResourceKindConnector
}

func (k *Connector) ResourceSingular() string {
return ResourceSingularConnector
}

func (k *Connector) ResourcePlural() string {
return ResourcePluralConnector
}

func (k *Connector) ResourceFQN() string {
return fmt.Sprintf("%s.%s", k.ResourcePlural(), kafka.GroupName)
}

// Owner returns owner reference to resources
func (k *Connector) Owner() *meta.OwnerReference {
return meta.NewControllerRef(k, SchemeGroupVersion.WithKind(k.ResourceKind()))
}

func (k *Connector) OffshootName() string {
return k.Name
}
39 changes: 21 additions & 18 deletions apis/kubedb/v1alpha2/kafka_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -294,6 +294,23 @@ func (k *Kafka) SetDefaults() {
k.Spec.StorageType = StorageTypeDurable
}

var kfVersion catalog.KafkaVersion
err := DefaultClient.Get(context.TODO(), types.NamespacedName{Name: k.Spec.Version}, &kfVersion)
if err != nil {
klog.Errorf("can't get the kafka version object %s for %s \n", err.Error(), k.Spec.Version)
return
}

k.setDefaultContainerSecurityContext(&kfVersion, &k.Spec.PodTemplate)
if k.Spec.CruiseControl != nil {
k.setDefaultContainerSecurityContext(&kfVersion, &k.Spec.CruiseControl.PodTemplate)
}

k.Spec.Monitor.SetDefaults()
if k.Spec.Monitor != nil && k.Spec.Monitor.Prometheus != nil && k.Spec.Monitor.Prometheus.Exporter.SecurityContext.RunAsUser == nil {
k.Spec.Monitor.Prometheus.Exporter.SecurityContext.RunAsUser = kfVersion.Spec.SecurityContext.RunAsUser
}

if k.Spec.Topology != nil {
if k.Spec.Topology.Controller != nil {
if k.Spec.Topology.Controller.Suffix == "" {
Expand All @@ -315,29 +332,15 @@ func (k *Kafka) SetDefaults() {
apis.SetDefaultResourceLimits(&k.Spec.Topology.Broker.Resources, DefaultResources)
}
} else {
apis.SetDefaultResourceLimits(&k.Spec.PodTemplate.Spec.Containers[0].Resources, DefaultResources)
dbContainer := coreutil.GetContainerByName(k.Spec.PodTemplate.Spec.Containers, KafkaContainerName)
if dbContainer != nil {
apis.SetDefaultResourceLimits(&dbContainer.Resources, DefaultResources)
}
if k.Spec.Replicas == nil {
k.Spec.Replicas = pointer.Int32P(1)
}
}

var kfVersion catalog.KafkaVersion
err := DefaultClient.Get(context.TODO(), types.NamespacedName{Name: k.Spec.Version}, &kfVersion)
if err != nil {
klog.Errorf("can't get the kafka version object %s for %s \n", err.Error(), k.Spec.Version)
return
}

k.setDefaultContainerSecurityContext(&kfVersion, &k.Spec.PodTemplate)
if k.Spec.CruiseControl != nil {
k.setDefaultContainerSecurityContext(&kfVersion, &k.Spec.CruiseControl.PodTemplate)
}

k.Spec.Monitor.SetDefaults()
if k.Spec.Monitor != nil && k.Spec.Monitor.Prometheus != nil && k.Spec.Monitor.Prometheus.Exporter.SecurityContext.RunAsUser == nil {
k.Spec.Monitor.Prometheus.Exporter.SecurityContext.RunAsUser = kfVersion.Spec.SecurityContext.RunAsUser
}

if k.Spec.EnableSSL {
k.SetTLSDefaults()
}
Expand Down
24 changes: 9 additions & 15 deletions apis/kubedb/v1alpha2/kafka_webhook.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,17 @@ limitations under the License.
package v1alpha2

import (
"context"
"errors"

catalog "kubedb.dev/apimachinery/apis/catalog/v1alpha1"

errors2 "github.com/pkg/errors"
"gomodules.xyz/pointer"
apierrors "k8s.io/apimachinery/pkg/api/errors"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/apimachinery/pkg/runtime/schema"
"k8s.io/apimachinery/pkg/types"
"k8s.io/apimachinery/pkg/util/validation/field"
ofst "kmodules.xyz/offshoot-api/api/v2"
ctrl "sigs.k8s.io/controller-runtime"
Expand Down Expand Up @@ -204,23 +208,13 @@ func (k *Kafka) ValidateCreateOrUpdate() error {
return apierrors.NewInvalid(schema.GroupKind{Group: "kafka.kubedb.com", Kind: "Kafka"}, k.Name, allErr)
}

var kafkaAvailableVersions = []string{
"3.3.0",
"3.3.2",
"3.4.0",
"3.4.1",
"3.5.1",
"3.6.0",
}

func (k *Kafka) validateVersion(db *Kafka) error {
version := db.Spec.Version
for _, v := range kafkaAvailableVersions {
if v == version {
return nil
}
kfVersion := &catalog.KafkaVersion{}
err := DefaultClient.Get(context.TODO(), types.NamespacedName{Name: db.Spec.Version}, kfVersion)
if err != nil {
return errors.New("version not supported")
}
return errors.New("version not supported")
return nil
}

func (k *Kafka) validateNodeSuffix(topology *KafkaClusterTopology) error {
Expand Down

0 comments on commit 353cb99

Please sign in to comment.