Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

enable grpc broker work with openshift route and customize certificate. #241

Merged
merged 1 commit into from
Jan 14, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions cmd/maestro/server/auth_interceptor.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,9 @@ func newAuthUnaryInterceptor(authNType string, authorizer grpcauthorizer.GRPCAut
klog.Errorf("unable to get user and groups from certificate: %v", err)
return nil, err
}
case "mock":
user = "mock"
groups = []string{"mock-group"}
default:
return nil, fmt.Errorf("unsupported authentication type %s", authNType)
}
Expand Down Expand Up @@ -165,6 +168,9 @@ func newAuthStreamInterceptor(authNType string, authorizer grpcauthorizer.GRPCAu
klog.Errorf("unable to get user and groups from certificate: %v", err)
return err
}
case "mock":
user = "mock"
groups = []string{"mock-group"}
default:
return fmt.Errorf("unsupported authentication Type %s", authNType)
}
Expand Down
57 changes: 52 additions & 5 deletions cmd/maestro/server/grpc_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,11 @@ package server

import (
"context"
"crypto/tls"
"crypto/x509"
"fmt"
"net"
"os"
"strconv"
"sync"
"time"
Expand All @@ -14,6 +17,7 @@ import (
"github.com/google/uuid"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/keepalive"
"google.golang.org/grpc/status"
"google.golang.org/protobuf/types/known/emptypb"
Expand Down Expand Up @@ -71,11 +75,55 @@ func NewGRPCBroker(eventBroadcaster *event.EventBroadcaster) EventServer {
grpcServerOptions = append(grpcServerOptions, grpc.ConnectionTimeout(config.ConnectionTimeout))
grpcServerOptions = append(grpcServerOptions, grpc.WriteBufferSize(config.WriteBufferSize))
grpcServerOptions = append(grpcServerOptions, grpc.ReadBufferSize(config.ReadBufferSize))
grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: config.ClientMinPingInterval,
PermitWithoutStream: config.PermitPingWithoutStream,
}))
grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionAge: config.MaxConnectionAge,
Time: config.ServerPingInterval,
Timeout: config.ServerPingTimeout,
}))

klog.Infof("Serving gRPC broker without TLS at %s", config.BrokerBindPort)
if !config.DisableTLS {
// Check tls cert and key path path
if config.BrokerTLSCertFile == "" || config.BrokerTLSKeyFile == "" {
check(
fmt.Errorf("unspecified required --grpc-broker-tls-cert-file, --grpc-broker-tls-key-file"),
"Can't start gRPC broker",
)
}
// Serve with TLS
serverCerts, err := tls.LoadX509KeyPair(config.BrokerTLSCertFile, config.BrokerTLSKeyFile)
if err != nil {
check(fmt.Errorf("failed to load broker certificates: %v", err), "Can't start gRPC broker")
}
tlsConfig := &tls.Config{
Certificates: []tls.Certificate{serverCerts},
MinVersion: tls.VersionTLS13,
MaxVersion: tls.VersionTLS13,
}
if config.BrokerClientCAFile != "" {
certPool, err := x509.SystemCertPool()
if err != nil {
check(fmt.Errorf("failed to load system cert pool: %v", err), "Can't start gRPC broker")
}
caPEM, err := os.ReadFile(config.BrokerClientCAFile)
if err != nil {
check(fmt.Errorf("failed to read broker client CA file: %v", err), "Can't start gRPC broker")
}
if ok := certPool.AppendCertsFromPEM(caPEM); !ok {
check(fmt.Errorf("failed to append broker client CA to cert pool"), "Can't start gRPC broker")
}
tlsConfig.ClientCAs = certPool
tlsConfig.ClientAuth = tls.RequireAndVerifyClientCert
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use verify-full?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

}
grpcServerOptions = append(grpcServerOptions, grpc.Creds(credentials.NewTLS(tlsConfig)))
klog.Infof("Serving gRPC broker with TLS at %s", config.ServerBindPort)
} else {
klog.Infof("Serving gRPC broker without TLS at %s", config.ServerBindPort)
}

sessionFactory := env().Database.SessionFactory
return &GRPCBroker{
grpcServer: grpc.NewServer(grpcServerOptions...),
Expand Down Expand Up @@ -160,8 +208,7 @@ func (bkr *GRPCBroker) register(clusterName string, handler resourceHandler) (st
errChan: errChan,
}

klog.V(4).Infof("register a subscriber %s (cluster name = %s)", id, clusterName)

klog.V(4).Infof("registered a subscriber %s (cluster name = %s)", id, clusterName)
return id, errChan
}

Expand All @@ -170,9 +217,9 @@ func (bkr *GRPCBroker) unregister(id string) {
bkr.mu.Lock()
defer bkr.mu.Unlock()

klog.V(10).Infof("unregister subscriber %s", id)
close(bkr.subscribers[id].errChan)
delete(bkr.subscribers, id)
klog.V(4).Infof("unregistered subscriber %s", id)
}

// Subscribe in stub implementation for maestro agent subscribe resource spec from maestro server.
Expand Down Expand Up @@ -215,7 +262,7 @@ func (bkr *GRPCBroker) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv
case err := <-errChan:
// When reaching this point, an unrecoverable error occurred while sending the event,
// such as the connection being closed. Unregister the subscriber to trigger agent reconnection.
klog.Errorf("unregister subscriber %s, error= %v", subscriberID, err)
klog.Infof("unregistering subscriber %s because unrecoverable error= %v", subscriberID, err)
bkr.unregister(subscriberID)
return err
case <-subServer.Context().Done():
Expand Down
9 changes: 7 additions & 2 deletions cmd/maestro/server/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,14 @@ func NewGRPCServer(resourceService services.ResourceService, eventBroadcaster *e
grpcServerOptions = append(grpcServerOptions, grpc.ConnectionTimeout(config.ConnectionTimeout))
grpcServerOptions = append(grpcServerOptions, grpc.WriteBufferSize(config.WriteBufferSize))
grpcServerOptions = append(grpcServerOptions, grpc.ReadBufferSize(config.ReadBufferSize))
grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveEnforcementPolicy(keepalive.EnforcementPolicy{
MinTime: config.ClientMinPingInterval,
PermitWithoutStream: config.PermitPingWithoutStream,
}))
grpcServerOptions = append(grpcServerOptions, grpc.KeepaliveParams(keepalive.ServerParameters{
MaxConnectionAge: config.MaxConnectionAge,
Time: config.ServerPingInterval,
Timeout: config.ServerPingTimeout,
}))

if !config.DisableTLS {
Expand Down Expand Up @@ -266,11 +272,10 @@ func (svr *GRPCServer) Subscribe(subReq *pbv1.SubscriptionRequest, subServer pbv

select {
case err := <-errChan:
klog.Errorf("unregister client %s, error= %v", clientID, err)
klog.Infof("unregistering client %s due to error= %v", clientID, err)
svr.eventBroadcaster.Unregister(clientID)
return err
case <-subServer.Context().Done():
klog.V(10).Infof("unregister client %s", clientID)
svr.eventBroadcaster.Unregister(clientID)
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ require (
k8s.io/klog/v2 v2.130.1
open-cluster-management.io/api v0.15.1-0.20241210025410-0ba6809d0ae2
open-cluster-management.io/ocm v0.15.1-0.20250108154653-2397c4e91119
open-cluster-management.io/sdk-go v0.15.1-0.20241224013925-71378a533f22
open-cluster-management.io/sdk-go v0.15.1-0.20250106052515-7c50bbf220a9
sigs.k8s.io/yaml v1.4.0
)

Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -893,8 +893,8 @@ open-cluster-management.io/api v0.15.1-0.20241210025410-0ba6809d0ae2 h1:zkp3VJnv
open-cluster-management.io/api v0.15.1-0.20241210025410-0ba6809d0ae2/go.mod h1:9erZEWEn4bEqh0nIX2wA7f/s3KCuFycQdBrPrRzi0QM=
open-cluster-management.io/ocm v0.15.1-0.20250108154653-2397c4e91119 h1:Ftx7vxDumTB9d4+ZdcqYwQavTOVzgF5h6vAXJ/gh0IE=
open-cluster-management.io/ocm v0.15.1-0.20250108154653-2397c4e91119/go.mod h1:T9pfSm3EYHnysEP9JYfCojV2pI44IYMz3zaZNylulz8=
open-cluster-management.io/sdk-go v0.15.1-0.20241224013925-71378a533f22 h1:w15NHc6cBfYxKHtF6zGLeQ1iTUqtN53sdONi9XXy5Xc=
open-cluster-management.io/sdk-go v0.15.1-0.20241224013925-71378a533f22/go.mod h1:fi5WBsbC5K3txKb8eRLuP0Sim/Oqz/PHX18skAEyjiA=
open-cluster-management.io/sdk-go v0.15.1-0.20250106052515-7c50bbf220a9 h1:yxkdser0gmaUryPRA33Fb3I+DDzXlPsDUNFIK5DVbPI=
open-cluster-management.io/sdk-go v0.15.1-0.20250106052515-7c50bbf220a9/go.mod h1:fi5WBsbC5K3txKb8eRLuP0Sim/Oqz/PHX18skAEyjiA=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3 h1:2770sDpzrjjsAtVhSeUFseziht227YAWYHLGNM8QPwY=
sigs.k8s.io/apiserver-network-proxy/konnectivity-client v0.30.3/go.mod h1:Ve9uj1L+deCXFrPOk1LpFXqTg7LCFzFso6PA48q/XZw=
sigs.k8s.io/controller-runtime v0.19.3 h1:XO2GvC9OPftRst6xWCpTgBZO04S2cbp0Qqkj8bX1sPw=
Expand Down
3 changes: 3 additions & 0 deletions pkg/client/cloudevents/source_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,9 @@ func (s *SourceClientImpl) ReconnectedChan() <-chan struct{} {
// with the agent's status calculation. The resource status is converted to
// manifestwork status based on resource type before calculating the hash.
func ResourceStatusHashGetter(res *api.Resource) (string, error) {
if len(res.Status) == 0 {
return fmt.Sprintf("%x", sha256.Sum256([]byte(""))), nil
}
evt, err := api.JSONMAPToCloudEvent(res.Status)
if err != nil {
return "", fmt.Errorf("failed to convert resource status to cloud event, %v", err)
Expand Down
46 changes: 30 additions & 16 deletions pkg/config/grpc_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,22 +8,29 @@ import (
)

type GRPCServerConfig struct {
EnableGRPCServer bool `json:"enable_grpc_server"`
DisableTLS bool `json:"disable_grpc_tls"`
TLSCertFile string `json:"grpc_tls_cert_file"`
TLSKeyFile string `json:"grpc_tls_key_file"`
GRPCAuthNType string `json:"grpc_authn_type"`
GRPCAuthorizerConfig string `json:"grpc_authorizer_config"`
ClientCAFile string `json:"grpc_client_ca_file"`
ServerBindPort string `json:"server_bind_port"`
BrokerBindPort string `json:"broker_bind_port"`
MaxConcurrentStreams uint32 `json:"max_concurrent_steams"`
MaxReceiveMessageSize int `json:"max_receive_message_size"`
MaxSendMessageSize int `json:"max_send_message_size"`
ConnectionTimeout time.Duration `json:"connection_timeout"`
WriteBufferSize int `json:"write_buffer_size"`
ReadBufferSize int `json:"read_buffer_size"`
MaxConnectionAge time.Duration `json:"max_connection_age"`
EnableGRPCServer bool `json:"enable_grpc_server"`
DisableTLS bool `json:"disable_grpc_tls"`
TLSCertFile string `json:"grpc_tls_cert_file"`
TLSKeyFile string `json:"grpc_tls_key_file"`
BrokerTLSCertFile string `json:"grpc_broker_tls_cert_file"`
BrokerTLSKeyFile string `json:"grpc_broker_tls_key_file"`
GRPCAuthNType string `json:"grpc_authn_type"`
GRPCAuthorizerConfig string `json:"grpc_authorizer_config"`
ClientCAFile string `json:"grpc_client_ca_file"`
BrokerClientCAFile string `json:"grpc_broker_client_ca_file"`
ServerBindPort string `json:"server_bind_port"`
BrokerBindPort string `json:"broker_bind_port"`
MaxConcurrentStreams uint32 `json:"max_concurrent_steams"`
MaxReceiveMessageSize int `json:"max_receive_message_size"`
MaxSendMessageSize int `json:"max_send_message_size"`
ConnectionTimeout time.Duration `json:"connection_timeout"`
WriteBufferSize int `json:"write_buffer_size"`
ReadBufferSize int `json:"read_buffer_size"`
MaxConnectionAge time.Duration `json:"max_connection_age"`
ClientMinPingInterval time.Duration `json:"client_min_ping_interval"`
ServerPingInterval time.Duration `json:"server_ping_interval"`
ServerPingTimeout time.Duration `json:"server_ping_timeout"`
PermitPingWithoutStream bool `json:"permit_ping_without_stream"`
}

func NewGRPCServerConfig() *GRPCServerConfig {
Expand All @@ -39,12 +46,19 @@ func (s *GRPCServerConfig) AddFlags(fs *pflag.FlagSet) {
fs.IntVar(&s.MaxSendMessageSize, "grpc-max-send-message-size", math.MaxInt32, "gPRC max send message size")
fs.DurationVar(&s.ConnectionTimeout, "grpc-connection-timeout", 120*time.Second, "gPRC connection timeout")
fs.DurationVar(&s.MaxConnectionAge, "grpc-max-connection-age", time.Duration(math.MaxInt64), "A duration for the maximum amount of time connection may exist before closing")
fs.DurationVar(&s.ClientMinPingInterval, "grpc-client-min-ping-interval", 5*time.Second, "Server will terminate the connection if the client pings more than once within this duration")
fs.DurationVar(&s.ServerPingInterval, "grpc-server-ping-interval", 30*time.Second, "Duration after which the server pings the client if no activity is detected")
fs.DurationVar(&s.ServerPingTimeout, "grpc-server-ping-timeout", 10*time.Second, "Duration the client waits for a response after sending a keepalive ping")
fs.BoolVar(&s.PermitPingWithoutStream, "permit-ping-without-stream", false, "Allow keepalive pings even when there are no active streams")
fs.IntVar(&s.WriteBufferSize, "grpc-write-buffer-size", 32*1024, "gPRC write buffer size")
fs.IntVar(&s.ReadBufferSize, "grpc-read-buffer-size", 32*1024, "gPRC read buffer size")
fs.BoolVar(&s.DisableTLS, "disable-grpc-tls", false, "Disable TLS for gRPC server, default is false")
fs.StringVar(&s.TLSCertFile, "grpc-tls-cert-file", "", "The path to the tls.crt file")
fs.StringVar(&s.TLSKeyFile, "grpc-tls-key-file", "", "The path to the tls.key file")
fs.StringVar(&s.BrokerTLSCertFile, "grpc-broker-tls-cert-file", "", "The path to the broker tls.crt file")
fs.StringVar(&s.BrokerTLSKeyFile, "grpc-broker-tls-key-file", "", "The path to the broker tls.key file")
fs.StringVar(&s.GRPCAuthNType, "grpc-authn-type", "mock", "Specify the gRPC authentication type (e.g., mock, mtls or token)")
fs.StringVar(&s.GRPCAuthorizerConfig, "grpc-authorizer-config", "", "Path to the gRPC authorizer configuration file")
fs.StringVar(&s.ClientCAFile, "grpc-client-ca-file", "", "The path to the client ca file, must specify if using mtls authentication type")
fs.StringVar(&s.BrokerClientCAFile, "grpc-broker-client-ca-file", "", "The path to the broker client ca file")
}
4 changes: 2 additions & 2 deletions pkg/event/event.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,8 +51,7 @@ func (h *EventBroadcaster) Register(source string, handler resourceHandler) (str
errChan: errChan,
}

klog.V(4).Infof("register a broadcaster client %s (source=%s)", id, source)

klog.V(4).Infof("registered a broadcaster client %s (source=%s)", id, source)
return id, errChan
}

Expand All @@ -63,6 +62,7 @@ func (h *EventBroadcaster) Unregister(id string) {

close(h.clients[id].errChan)
delete(h.clients, id)
klog.V(4).Infof("unregistered broadcaster client %s", id)
}

// Broadcast broadcasts a resource status change event to all registered clients.
Expand Down
19 changes: 17 additions & 2 deletions templates/route-template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -39,5 +39,20 @@ objects:
kind: Service
name: maestro-grpc
tls:
termination: reencrypt
insecureEdgeTerminationPolicy: Redirect
termination: passthrough
insecureEdgeTerminationPolicy: None

- apiVersion: route.openshift.io/v1
kind: Route
metadata:
name: maestro-grpc-broker
labels:
app: maestro-grpc-broker
spec:
host: maestro-grpc-broker.${EXTERNAL_APPS_DOMAIN}
to:
kind: Service
name: maestro-grpc-broker
tls:
termination: passthrough
insecureEdgeTerminationPolicy: None
17 changes: 15 additions & 2 deletions templates/service-template.yml
Original file line number Diff line number Diff line change
Expand Up @@ -265,6 +265,12 @@ objects:
- name: tls
secret:
secretName: maestro-tls
- name: grpc-server-tls
secret:
secretName: maestro-grpc-server-tls
- name: grpc-broker-tls
secret:
secretName: maestro-grpc-broker-tls
- name: service
secret:
secretName: maestro
Expand Down Expand Up @@ -305,6 +311,10 @@ objects:
volumeMounts:
- name: tls
mountPath: /secrets/tls
- name: grpc-server-tls
mountPath: /secrets/grpc-server-tls
- name: grpc-broker-tls
mountPath: /secrets/grpc-broker-tls
- name: service
mountPath: /secrets/service
- name: rds
Expand Down Expand Up @@ -342,8 +352,10 @@ objects:
- --https-key-file=/secrets/tls/tls.key
- --enable-grpc-server=${ENABLE_GRPC_SERVER}
- --disable-grpc-tls=${DISABLE_GRPC_TLS}
- --grpc-tls-cert-file=/secrets/tls/tls.crt
- --grpc-tls-key-file=/secrets/tls/tls.key
- --grpc-tls-cert-file=/secrets/grpc-server-tls/tls.crt
- --grpc-tls-key-file=/secrets/grpc-server-tls/tls.key
- --grpc-broker-tls-cert-file=/secrets/grpc-broker-tls/tls.crt
- --grpc-broker-tls-key-file=/secrets/grpc-broker-tls/tls.key
- --acl-file=/configs/authentication/acl.yml
- --jwk-cert-file=/configs/authentication/jwks.json
- --jwk-cert-url=${JWKS_URL}
Expand Down Expand Up @@ -442,6 +454,7 @@ objects:
port: grpc
annotations:
description: Exposes and load balances the maestro pods grpc endpoint
service.alpha.openshift.io/serving-cert-secret-name: maestro-grpc-server-tls
spec:
selector:
app: maestro
Expand Down
3 changes: 3 additions & 0 deletions test/e2e/pkg/suite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,9 @@ var _ = BeforeSuite(func() {
// initialize the grpc source options
grpcOptions = grpcoptions.NewGRPCOptions()
grpcOptions.URL = grpcServerAddress
grpcOptions.KeepAliveOptions.Enable = true
grpcOptions.KeepAliveOptions.Time = 6 * time.Second
grpcOptions.KeepAliveOptions.Timeout = 1 * time.Second
sourceID = "sourceclient-test" + rand.String(5)
grpcCertSrt, err := serverTestOpts.kubeClientSet.CoreV1().Secrets(serverTestOpts.serverNamespace).Get(ctx, "maestro-grpc-cert", metav1.GetOptions{})
if !errors.IsNotFound(err) {
Expand Down
Loading