From 9464d474da0942f0df83cf6c60b52f02fc09a031 Mon Sep 17 00:00:00 2001 From: morvencao Date: Tue, 7 Jan 2025 12:26:41 +0000 Subject: [PATCH] enable grpc broker work with openshift route and customize certificate. Signed-off-by: morvencao --- cmd/maestro/server/auth_interceptor.go | 6 +++ cmd/maestro/server/grpc_broker.go | 57 ++++++++++++++++++++++--- cmd/maestro/server/grpc_server.go | 9 +++- go.mod | 2 +- go.sum | 4 +- pkg/client/cloudevents/source_client.go | 3 ++ pkg/config/grpc_server.go | 46 +++++++++++++------- pkg/event/event.go | 4 +- templates/route-template.yml | 19 ++++++++- templates/service-template.yml | 17 +++++++- test/e2e/pkg/suite_test.go | 3 ++ test/e2e/setup/e2e_setup.sh | 23 ++++++++-- test/helper.go | 3 +- 13 files changed, 160 insertions(+), 36 deletions(-) diff --git a/cmd/maestro/server/auth_interceptor.go b/cmd/maestro/server/auth_interceptor.go index c8d042b3..1b22836f 100644 --- a/cmd/maestro/server/auth_interceptor.go +++ b/cmd/maestro/server/auth_interceptor.go @@ -110,6 +110,9 @@ func newAuthUnaryInterceptor(authNType string, authorizer grpcauthorizer.GRPCAut klog.Errorf("unable to get user and groups from certificate: %v", err) return nil, err } + case "mock": + user = "mock" + groups = []string{"mock-group"} default: return nil, fmt.Errorf("unsupported authentication type %s", authNType) } @@ -165,6 +168,9 @@ func newAuthStreamInterceptor(authNType string, authorizer grpcauthorizer.GRPCAu klog.Errorf("unable to get user and groups from certificate: %v", err) return err } + case "mock": + user = "mock" + groups = []string{"mock-group"} default: return fmt.Errorf("unsupported authentication Type %s", authNType) } diff --git a/cmd/maestro/server/grpc_broker.go b/cmd/maestro/server/grpc_broker.go index eff5bf39..15021aed 100644 --- a/cmd/maestro/server/grpc_broker.go +++ b/cmd/maestro/server/grpc_broker.go @@ -2,8 +2,11 @@ package server import ( "context" + "crypto/tls" + "crypto/x509" "fmt" "net" + "os" "strconv" "sync" "time" @@ -14,6 +17,7 @@ import ( "github.com/google/uuid" "google.golang.org/grpc" "google.golang.org/grpc/codes" + "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" "google.golang.org/grpc/status" "google.golang.org/protobuf/types/known/emptypb" @@ -71,11 +75,55 @@ func NewGRPCBroker(eventBroadcaster *event.EventBroadcaster) EventServer { grpcServerOptions = append(grpcServerOptions, grpc.ConnectionTimeout(config.ConnectionTimeout)) grpcServerOptions = append(grpcServerOptions, grpc.WriteBufferSize(config.WriteBufferSize)) grpcServerOptions = append(grpcServerOptions, grpc.ReadBufferSize(config.ReadBufferSize)) + grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: config.ClientMinPingInterval, + PermitWithoutStream: config.PermitPingWithoutStream, + })) grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{ MaxConnectionAge: config.MaxConnectionAge, + Time: config.ServerPingInterval, + Timeout: config.ServerPingTimeout, })) - klog.Infof("Serving gRPC broker without TLS at %s", config.BrokerBindPort) + if !config.DisableTLS { + // Check tls cert and key path path + if config.BrokerTLSCertFile == "" || config.BrokerTLSKeyFile == "" { + check( + fmt.Errorf("unspecified required --grpc-broker-tls-cert-file, --grpc-broker-tls-key-file"), + "Can't start gRPC broker", + ) + } + // Serve with TLS + serverCerts, err := tls.LoadX509KeyPair(config.BrokerTLSCertFile, config.BrokerTLSKeyFile) + if err != nil { + check(fmt.Errorf("failed to load broker certificates: %v", err), "Can't start gRPC broker") + } + tlsConfig := &tls.Config{ + Certificates: []tls.Certificate{serverCerts}, + MinVersion: tls.VersionTLS13, + MaxVersion: tls.VersionTLS13, + } + if config.BrokerClientCAFile != "" { + certPool, err := x509.SystemCertPool() + if err != nil { + check(fmt.Errorf("failed to load system cert pool: %v", err), "Can't start gRPC broker") + } + caPEM, err := os.ReadFile(config.BrokerClientCAFile) + if err != nil { + check(fmt.Errorf("failed to read broker client CA file: %v", err), "Can't start gRPC broker") + } + if ok := certPool.AppendCertsFromPEM(caPEM); !ok { + check(fmt.Errorf("failed to append broker client CA to cert pool"), "Can't start gRPC broker") + } + tlsConfig.ClientCAs = certPool + tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert + } + grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig))) + klog.Infof("Serving gRPC broker with TLS at %s", config.ServerBindPort) + } else { + klog.Infof("Serving gRPC broker without TLS at %s", config.ServerBindPort) + } + sessionFactory := env().Database.SessionFactory return &GRPCBroker{ grpcServer: grpc.NewServer(grpcServerOptions...), @@ -160,8 +208,7 @@ func (bkr *GRPCBroker) register(clusterName string, handler resourceHandler) (st errChan: errChan, } - klog.V(4).Infof("register a subscriber %s (cluster name = %s)", id, clusterName) - + klog.V(4).Infof("registered a subscriber %s (cluster name = %s)", id, clusterName) return id, errChan } @@ -170,9 +217,9 @@ func (bkr *GRPCBroker) unregister(id string) { bkr.mu.Lock() defer bkr.mu.Unlock() - klog.V(10).Infof("unregister subscriber %s", id) close(bkr.subscribers[id].errChan) delete(bkr.subscribers, id) + klog.V(4).Infof("unregistered subscriber %s", id) } // Subscribe in stub implementation for maestro agent subscribe resource spec from maestro server. @@ -215,7 +262,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv case err := <-errChan: // When reaching this point, an unrecoverable error occurred while sending the event, // such as the connection being closed. Unregister the subscriber to trigger agent reconnection. - klog.Errorf("unregister subscriber %s, error= %v", subscriberID, err) + klog.Infof("unregistering subscriber %s because unrecoverable error= %v", subscriberID, err) bkr.unregister(subscriberID) return err case <-subServer.Context().Done(): diff --git a/cmd/maestro/server/grpc_server.go b/cmd/maestro/server/grpc_server.go index bde8bdbc..801aa6e3 100644 --- a/cmd/maestro/server/grpc_server.go +++ b/cmd/maestro/server/grpc_server.go @@ -54,8 +54,14 @@ func NewGRPCServer(resourceService services.ResourceService, eventBroadcaster *e grpcServerOptions = append(grpcServerOptions, grpc.ConnectionTimeout(config.ConnectionTimeout)) grpcServerOptions = append(grpcServerOptions, grpc.WriteBufferSize(config.WriteBufferSize)) grpcServerOptions = append(grpcServerOptions, grpc.ReadBufferSize(config.ReadBufferSize)) + grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{ + MinTime: config.ClientMinPingInterval, + PermitWithoutStream: config.PermitPingWithoutStream, + })) grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{ MaxConnectionAge: config.MaxConnectionAge, + Time: config.ServerPingInterval, + Timeout: config.ServerPingTimeout, })) if !config.DisableTLS { @@ -266,11 +272,10 @@ func (svr *GRPCServer) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv select { case err := <-errChan: - klog.Errorf("unregister client %s, error= %v", clientID, err) + klog.Infof("unregistering client %s due to error= %v", clientID, err) svr.eventBroadcaster.Unregister(clientID) return err case <-subServer.Context().Done(): - klog.V(10).Infof("unregister client %s", clientID) svr.eventBroadcaster.Unregister(clientID) return nil } diff --git a/go.mod b/go.mod index 1d487778..248f8437 100755 --- a/go.mod +++ b/go.mod @@ -55,7 +55,7 @@ require ( k8s.io/klog/v2 v2.130.1 open-cluster-management.io/api v0.15.1-0.20241210025410-0ba6809d0ae2 open-cluster-management.io/ocm v0.15.1-0.20250108154653-2397c4e91119 - open-cluster-management.io/sdk-go v0.15.1-0.20241224013925-71378a533f22 + open-cluster-management.io/sdk-go v0.15.1-0.20250106052515-7c50bbf220a9 sigs.k8s.io/yaml v1.4.0 ) diff --git a/go.sum b/go.sum index efe8481c..915a0f53 100644 --- a/go.sum +++ b/go.sum @@ -893,8 +893,8 @@ open-cluster-management.io/api v0.15.1-0.20241210025410-0ba6809d0ae2 h1:zkp3VJnv open-cluster-management.io/api v0.15.1-0.20241210025410-0ba6809d0ae2/go.mod h1:9erZEWEn4bEqh0nIX2wA7f/s3KCuFycQdBrPrRzi0QM= open-cluster-management.io/ocm v0.15.1-0.20250108154653-2397c4e91119 h1:Ftx7vxDumTB9d4+ZdcqYwQavTOVzgF5h6vAXJ/gh0IE= open-cluster-management.io/ocm v0.15.1-0.20250108154653-2397c4e91119/go.mod h1:T9pfSm3EYHnysEP9JYfCojV2pI44IYMz3zaZNylulz8= -open-cluster-management.io/sdk-go v0.15.1-0.20241224013925-71378a533f22 h1:w15NHc6cBfYxKHtF6zGLeQ1iTUqtN53sdONi9XXy5Xc= -open-cluster-management.io/sdk-go v0.15.1-0.20241224013925-71378a533f22/go.mod h1:fi5WBsbC5K3txKb8eRLuP0Sim/Oqz/PHX18skAEyjiA= +open-cluster-management.io/sdk-go v0.15.1-0.20250106052515-7c50bbf220a9 h1:yxkdser0gmaUryPRA33Fb3I+DDzXlPsDUNFIK5DVbPI= +open-cluster-management.io/sdk-go v0.15.1-0.20250106052515-7c50bbf220a9/go.mod h1:fi5WBsbC5K3txKb8eRLuP0Sim/Oqz/PHX18skAEyjiA= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 h1:2770sDpzrjjsAtVhSeUFseziht227YAWYHLGNM8QPwY= sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw= sigs.k8s.io/controller-runtime v0.19.3 h1:XO2GvC9OPftRst6xWCpTgBZO04S2cbp0Qqkj8bX1sPw= diff --git a/pkg/client/cloudevents/source_client.go b/pkg/client/cloudevents/source_client.go index 0e4d9a52..62c2444e 100644 --- a/pkg/client/cloudevents/source_client.go +++ b/pkg/client/cloudevents/source_client.go @@ -161,6 +161,9 @@ func (s *SourceClientImpl) ReconnectedChan() <-chan struct{} { // with the agent's status calculation. The resource status is converted to // manifestwork status based on resource type before calculating the hash. func ResourceStatusHashGetter(res *api.Resource) (string, error) { + if len(res.Status) == 0 { + return fmt.Sprintf("%x", sha256.Sum256([]byte(""))), nil + } evt, err := api.JSONMAPToCloudEvent(res.Status) if err != nil { return "", fmt.Errorf("failed to convert resource status to cloud event, %v", err) diff --git a/pkg/config/grpc_server.go b/pkg/config/grpc_server.go index cdbb5eb7..6d5929aa 100755 --- a/pkg/config/grpc_server.go +++ b/pkg/config/grpc_server.go @@ -8,22 +8,29 @@ import ( ) type GRPCServerConfig struct { - EnableGRPCServer bool `json:"enable_grpc_server"` - DisableTLS bool `json:"disable_grpc_tls"` - TLSCertFile string `json:"grpc_tls_cert_file"` - TLSKeyFile string `json:"grpc_tls_key_file"` - GRPCAuthNType string `json:"grpc_authn_type"` - GRPCAuthorizerConfig string `json:"grpc_authorizer_config"` - ClientCAFile string `json:"grpc_client_ca_file"` - ServerBindPort string `json:"server_bind_port"` - BrokerBindPort string `json:"broker_bind_port"` - MaxConcurrentStreams uint32 `json:"max_concurrent_steams"` - MaxReceiveMessageSize int `json:"max_receive_message_size"` - MaxSendMessageSize int `json:"max_send_message_size"` - ConnectionTimeout time.Duration `json:"connection_timeout"` - WriteBufferSize int `json:"write_buffer_size"` - ReadBufferSize int `json:"read_buffer_size"` - MaxConnectionAge time.Duration `json:"max_connection_age"` + EnableGRPCServer bool `json:"enable_grpc_server"` + DisableTLS bool `json:"disable_grpc_tls"` + TLSCertFile string `json:"grpc_tls_cert_file"` + TLSKeyFile string `json:"grpc_tls_key_file"` + BrokerTLSCertFile string `json:"grpc_broker_tls_cert_file"` + BrokerTLSKeyFile string `json:"grpc_broker_tls_key_file"` + GRPCAuthNType string `json:"grpc_authn_type"` + GRPCAuthorizerConfig string `json:"grpc_authorizer_config"` + ClientCAFile string `json:"grpc_client_ca_file"` + BrokerClientCAFile string `json:"grpc_broker_client_ca_file"` + ServerBindPort string `json:"server_bind_port"` + BrokerBindPort string `json:"broker_bind_port"` + MaxConcurrentStreams uint32 `json:"max_concurrent_steams"` + MaxReceiveMessageSize int `json:"max_receive_message_size"` + MaxSendMessageSize int `json:"max_send_message_size"` + ConnectionTimeout time.Duration `json:"connection_timeout"` + WriteBufferSize int `json:"write_buffer_size"` + ReadBufferSize int `json:"read_buffer_size"` + MaxConnectionAge time.Duration `json:"max_connection_age"` + ClientMinPingInterval time.Duration `json:"client_min_ping_interval"` + ServerPingInterval time.Duration `json:"server_ping_interval"` + ServerPingTimeout time.Duration `json:"server_ping_timeout"` + PermitPingWithoutStream bool `json:"permit_ping_without_stream"` } func NewGRPCServerConfig() *GRPCServerConfig { @@ -39,12 +46,19 @@ func (s *GRPCServerConfig) AddFlags(fs *pflag.FlagSet) { fs.IntVar(&s.MaxSendMessageSize, "grpc-max-send-message-size", math.MaxInt32, "gPRC max send message size") fs.DurationVar(&s.ConnectionTimeout, "grpc-connection-timeout", 120*time.Second, "gPRC connection timeout") fs.DurationVar(&s.MaxConnectionAge, "grpc-max-connection-age", time.Duration(math.MaxInt64), "A duration for the maximum amount of time connection may exist before closing") + fs.DurationVar(&s.ClientMinPingInterval, "grpc-client-min-ping-interval", 5*time.Second, "Server will terminate the connection if the client pings more than once within this duration") + fs.DurationVar(&s.ServerPingInterval, "grpc-server-ping-interval", 30*time.Second, "Duration after which the server pings the client if no activity is detected") + fs.DurationVar(&s.ServerPingTimeout, "grpc-server-ping-timeout", 10*time.Second, "Duration the client waits for a response after sending a keepalive ping") + fs.BoolVar(&s.PermitPingWithoutStream, "permit-ping-without-stream", false, "Allow keepalive pings even when there are no active streams") fs.IntVar(&s.WriteBufferSize, "grpc-write-buffer-size", 32*1024, "gPRC write buffer size") fs.IntVar(&s.ReadBufferSize, "grpc-read-buffer-size", 32*1024, "gPRC read buffer size") fs.BoolVar(&s.DisableTLS, "disable-grpc-tls", false, "Disable TLS for gRPC server, default is false") fs.StringVar(&s.TLSCertFile, "grpc-tls-cert-file", "", "The path to the tls.crt file") fs.StringVar(&s.TLSKeyFile, "grpc-tls-key-file", "", "The path to the tls.key file") + fs.StringVar(&s.BrokerTLSCertFile, "grpc-broker-tls-cert-file", "", "The path to the broker tls.crt file") + fs.StringVar(&s.BrokerTLSKeyFile, "grpc-broker-tls-key-file", "", "The path to the broker tls.key file") fs.StringVar(&s.GRPCAuthNType, "grpc-authn-type", "mock", "Specify the gRPC authentication type (e.g., mock, mtls or token)") fs.StringVar(&s.GRPCAuthorizerConfig, "grpc-authorizer-config", "", "Path to the gRPC authorizer configuration file") fs.StringVar(&s.ClientCAFile, "grpc-client-ca-file", "", "The path to the client ca file, must specify if using mtls authentication type") + fs.StringVar(&s.BrokerClientCAFile, "grpc-broker-client-ca-file", "", "The path to the broker client ca file") } diff --git a/pkg/event/event.go b/pkg/event/event.go index f9b24990..d053dad3 100644 --- a/pkg/event/event.go +++ b/pkg/event/event.go @@ -51,8 +51,7 @@ func (h *EventBroadcaster) Register(source string, handler resourceHandler) (str errChan: errChan, } - klog.V(4).Infof("register a broadcaster client %s (source=%s)", id, source) - + klog.V(4).Infof("registered a broadcaster client %s (source=%s)", id, source) return id, errChan } @@ -63,6 +62,7 @@ func (h *EventBroadcaster) Unregister(id string) { close(h.clients[id].errChan) delete(h.clients, id) + klog.V(4).Infof("unregistered broadcaster client %s", id) } // Broadcast broadcasts a resource status change event to all registered clients. diff --git a/templates/route-template.yml b/templates/route-template.yml index a782ca84..03e91cdb 100755 --- a/templates/route-template.yml +++ b/templates/route-template.yml @@ -39,5 +39,20 @@ objects: kind: Service name: maestro-grpc tls: - termination: reencrypt - insecureEdgeTerminationPolicy: Redirect + termination: passthrough + insecureEdgeTerminationPolicy: None + +- apiVersion: route.openshift.io/v1 + kind: Route + metadata: + name: maestro-grpc-broker + labels: + app: maestro-grpc-broker + spec: + host: maestro-grpc-broker.${EXTERNAL_APPS_DOMAIN} + to: + kind: Service + name: maestro-grpc-broker + tls: + termination: passthrough + insecureEdgeTerminationPolicy: None diff --git a/templates/service-template.yml b/templates/service-template.yml index 296235da..c548061b 100755 --- a/templates/service-template.yml +++ b/templates/service-template.yml @@ -265,6 +265,12 @@ objects: - name: tls secret: secretName: maestro-tls + - name: grpc-server-tls + secret: + secretName: maestro-grpc-server-tls + - name: grpc-broker-tls + secret: + secretName: maestro-grpc-broker-tls - name: service secret: secretName: maestro @@ -305,6 +311,10 @@ objects: volumeMounts: - name: tls mountPath: /secrets/tls + - name: grpc-server-tls + mountPath: /secrets/grpc-server-tls + - name: grpc-broker-tls + mountPath: /secrets/grpc-broker-tls - name: service mountPath: /secrets/service - name: rds @@ -342,8 +352,10 @@ objects: - --https-key-file=/secrets/tls/tls.key - --enable-grpc-server=${ENABLE_GRPC_SERVER} - --disable-grpc-tls=${DISABLE_GRPC_TLS} - - --grpc-tls-cert-file=/secrets/tls/tls.crt - - --grpc-tls-key-file=/secrets/tls/tls.key + - --grpc-tls-cert-file=/secrets/grpc-server-tls/tls.crt + - --grpc-tls-key-file=/secrets/grpc-server-tls/tls.key + - --grpc-broker-tls-cert-file=/secrets/grpc-broker-tls/tls.crt + - --grpc-broker-tls-key-file=/secrets/grpc-broker-tls/tls.key - --acl-file=/configs/authentication/acl.yml - --jwk-cert-file=/configs/authentication/jwks.json - --jwk-cert-url=${JWKS_URL} @@ -442,6 +454,7 @@ objects: port: grpc annotations: description: Exposes and load balances the maestro pods grpc endpoint + service.alpha.openshift.io/serving-cert-secret-name: maestro-grpc-server-tls spec: selector: app: maestro diff --git a/test/e2e/pkg/suite_test.go b/test/e2e/pkg/suite_test.go index f8cb9b3b..e9cfe06c 100644 --- a/test/e2e/pkg/suite_test.go +++ b/test/e2e/pkg/suite_test.go @@ -131,6 +131,9 @@ var _ = BeforeSuite(func() { // initialize the grpc source options grpcOptions = grpcoptions.NewGRPCOptions() grpcOptions.URL = grpcServerAddress + grpcOptions.KeepAliveOptions.Enable = true + grpcOptions.KeepAliveOptions.Time = 6 * time.Second + grpcOptions.KeepAliveOptions.Timeout = 1 * time.Second sourceID = "sourceclient-test" + rand.String(5) grpcCertSrt, err := serverTestOpts.kubeClientSet.CoreV1().Secrets(serverTestOpts.serverNamespace).Get(ctx, "maestro-grpc-cert", metav1.GetOptions{}) if !errors.IsNotFound(err) { diff --git a/test/e2e/setup/e2e_setup.sh b/test/e2e/setup/e2e_setup.sh index 8ea67bd7..0f9916a1 100755 --- a/test/e2e/setup/e2e_setup.sh +++ b/test/e2e/setup/e2e_setup.sh @@ -58,7 +58,7 @@ if command -v docker &> /dev/null; then kind load docker-image ${external_image_registry}/${namespace}/maestro:$image_tag --name maestro elif command -v podman &> /dev/null; then podman save ${external_image_registry}/${namespace}/maestro:$image_tag -o /tmp/maestro.tar - kind load image-archive /tmp/maestro.tar --name maestro + kind load image-archive /tmp/maestro.tar --name maestro rm /tmp/maestro.tar else echo "Neither Docker nor Podman is installed, exiting" @@ -120,6 +120,9 @@ kubectl patch service maestro -n $namespace -p '{"spec":{"type":"NodePort", "por # expose the maestro grpc server via nodeport kubectl patch service maestro-grpc -n $namespace -p '{"spec":{"type":"NodePort", "ports": [{"nodePort": 30090, "port": 8090, "targetPort": 8090}]}}' --type merge +# annotate maestro broker service for serving cert +kubectl -n $namespace annotate svc/maestro-grpc-broker service.alpha.openshift.io/serving-cert-secret-name=maestro-grpc-broker-tls + # 5. create a self-signed certificate for mqtt mqttCertDir=$(mktemp -d) step certificate create "maestro-mqtt-ca" ${mqttCertDir}/ca.crt ${mqttCertDir}/ca.key --profile root-ca --no-password --insecure @@ -253,10 +256,12 @@ echo $consumer_name > ./test/e2e/.consumer_name # 7. deploy maestro agent into maestro-agent namespace export agent_namespace=maestro-agent kubectl create namespace ${agent_namespace} || true +kubectl -n ${agent_namespace} create cm maestro-grpc-broker-ca +kubectl -n ${agent_namespace} annotate cm/maestro-grpc-broker-ca service.alpha.openshift.io/inject-cabundle=true make agent-template kubectl apply -n ${agent_namespace} --filename="templates/agent-template.json" | egrep --color=auto 'configured|$$' -# apply the maestro-mqtt secret +# apply the maestro-agent-mqtt secret cat << EOF | kubectl -n ${agent_namespace} apply -f - apiVersion: v1 kind: Secret @@ -273,9 +278,21 @@ stringData: agentEvents: sources/maestro/consumers/${consumer_name}/agentevents EOF +# apply the maestro-agent-grpc secret +cat << EOF | kubectl -n ${agent_namespace} apply -f - +apiVersion: v1 +kind: Secret +metadata: + name: maestro-agent-grpc +stringData: + config.yaml: | + url: maestro-grpc-broker.maestro.svc:8091 + caFile: /configmaps/grpc-broker-ca/service-ca.crt +EOF + # create secret containing the client certs to mqtt broker and patch the maestro-agent deployment kubectl create secret generic maestro-agent-certs -n ${agent_namespace} --from-file=ca.crt=${mqttCertDir}/ca.crt --from-file=client.crt=${mqttCertDir}/agent-client.crt --from-file=client.key=${mqttCertDir}/agent-client.key -kubectl patch deploy/maestro-agent -n ${agent_namespace} --type='json' -p='[{"op":"add","path":"/spec/template/spec/containers/0/command/-","value":"--appliedmanifestwork-eviction-grace-period=30s"},{"op":"add","path":"/spec/template/spec/volumes/-","value":{"name":"mqtt-certs","secret":{"secretName":"maestro-agent-certs"}}},{"op":"add","path":"/spec/template/spec/containers/0/volumeMounts/-","value":{"name":"mqtt-certs","mountPath":"/secrets/mqtt-certs"}}]' +kubectl patch deploy/maestro-agent -n ${agent_namespace} --type='json' -p='[{"op":"add","path":"/spec/template/spec/containers/0/command/-","value":"--appliedmanifestwork-eviction-grace-period=30s"},{"op":"add","path":"/spec/template/spec/volumes/-","value":{"name":"mqtt-certs","secret":{"secretName":"maestro-agent-certs"}}},{"op":"add","path":"/spec/template/spec/containers/0/volumeMounts/-","value":{"name":"mqtt-certs","mountPath":"/secrets/mqtt-certs"}},{"op":"add","path":"/spec/template/spec/volumes/-","value":{"name":"grpc-broker-ca","configMap":{"name":"maestro-grpc-broker-ca"}}},{"op":"add","path":"/spec/template/spec/containers/0/volumeMounts/-","value":{"name":"grpc-broker-ca","mountPath":"/configmaps/grpc-broker-ca"}}]' kubectl wait deploy/maestro-agent -n ${agent_namespace} --for condition=Available=True --timeout=200s # remove the certs diff --git a/test/helper.go b/test/helper.go index 331a79e4..4af5f48e 100755 --- a/test/helper.go +++ b/test/helper.go @@ -139,6 +139,8 @@ func NewHelper(t *testing.T) *Helper { // Set the healthcheck interval to 1 second for testing helper.Env().Config.HealthCheck.HeartbeartInterval = 1 helper.HealthCheckServer = server.NewHealthCheckServer() + // Disable TLS for testing + helper.Env().Config.GRPCServer.DisableTLS = true if helper.Broker != "grpc" { statusDispatcher := dispatcher.NewHashDispatcher( @@ -195,7 +197,6 @@ func (helper *Helper) Teardown() { func (helper *Helper) startAPIServer() { // TODO jwk mock server needs to be refactored out of the helper and into the testing environment helper.Env().Config.HTTPServer.JwkCertURL = jwkURL - helper.Env().Config.GRPCServer.DisableTLS = true helper.APIServer = server.NewAPIServer(helper.EventBroadcaster) go func() { klog.V(10).Info("Test API server started")