diff --git a/charts/dapr/charts/dapr_scheduler/templates/_helpers.tpl b/charts/dapr/charts/dapr_scheduler/templates/_helpers.tpl index 4e858a7450d..85bb94ac731 100644 --- a/charts/dapr/charts/dapr_scheduler/templates/_helpers.tpl +++ b/charts/dapr/charts/dapr_scheduler/templates/_helpers.tpl @@ -63,3 +63,25 @@ Create etcd client ports list dynamically based on replicaCount. {{- $etcdClientPorts -}} {{- end -}} +{{/* +Create etcd client http ports list dynamically based on replicaCount. +*/}} +{{- define "dapr_scheduler.etcdclienthttpports" -}} +{{- $etcdClientHttpPorts := "" -}} +{{- $namespace := .Release.Namespace -}} +{{- $replicaCount := int .Values.replicaCount -}} +{{- range $i, $e := until $replicaCount -}} +{{- $instanceName := printf "dapr-scheduler-server-%d" $i -}} +{{/*{{- $svcName := printf "%s.%s" $instanceName $namespace -}}*/}} +{{- $clientPort := int $.Values.ports.etcdHttpClientPort -}} +{{- $instancePortPair := printf "%s=%d" $instanceName $clientPort -}} +{{- if gt $i 0 -}} +{{- $etcdClientHttpPorts = printf "%s,%s" $etcdClientHttpPorts $instancePortPair -}} +{{- else -}} +{{- $etcdClientHttpPorts = $instancePortPair -}} +{{- end -}} +{{- end -}} +{{- $etcdClientHttpPorts -}} +{{- end -}} + + diff --git a/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_service.yaml b/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_service.yaml index c748e690e35..e6c5730afec 100644 --- a/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_service.yaml +++ b/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_service.yaml @@ -23,6 +23,8 @@ spec: port: {{ .Values.ports.apiPort }} - name: etcd-client port: {{ .Values.ports.etcdRPCClientPort }} + - name: etcd-http-client + port: {{ .Values.ports.etcdHttpClientPort }} - name: etcd-peer port: {{ .Values.ports.etcdRPCPeerPort }} clusterIP: None # make the service headless diff --git a/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_statefulset.yaml b/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_statefulset.yaml index b7fe3b342c0..5ff7dabdeeb 100644 --- a/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_statefulset.yaml +++ b/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_statefulset.yaml @@ -88,6 +88,8 @@ spec: name: api - containerPort: {{ .Values.ports.etcdRPCClientPort }} name: etcd-client + - containerPort: {{ .Values.ports.etcdHttpClientPort }} + name: etcd-http-client - containerPort: {{ .Values.ports.etcdRPCPeerPort }} name: etcd-peer {{- if eq .Values.global.prometheus.enabled true }} @@ -117,6 +119,8 @@ spec: - {{ include "dapr_scheduler.initialcluster" . | toYaml | trim }} - "--etcd-client-ports" - {{ include "dapr_scheduler.etcdclientports" . | toYaml | trim }} + - "--etcd-client-http-ports" + - {{ include "dapr_scheduler.etcdclienthttpports" . | toYaml | trim }} - "--log-level" - {{ .Values.logLevel }} {{- if eq .Values.global.logAsJson true }} @@ -130,6 +134,9 @@ spec: - "--enable-metrics=false" {{- end }} - "--etcd-data-dir={{ if eq .Values.global.daprControlPlaneOs "windows" }}{{ .Values.cluster.EtcdDataDirWinPath }}{{- else }}{{ .Values.cluster.EtcdDataDirPath }}" + - "--etcd-space-quota={{ .Values.etcdSpaceQuota }}" + - "--etcd-compaction-mode={{ .Values.etcdCompactionMode }}" + - "--etcd-compaction-retention={{ .Values.etcdCompactionRetention }}" - "--tls-enabled" - "--trust-domain={{ .Values.global.mtls.controlPlaneTrustDomain }}" - "--trust-anchors-file=/var/run/secrets/dapr.io/tls/ca.crt" diff --git a/charts/dapr/charts/dapr_scheduler/values.yaml b/charts/dapr/charts/dapr_scheduler/values.yaml index 36c2b5683d8..0241f2a51f7 100644 --- a/charts/dapr/charts/dapr_scheduler/values.yaml +++ b/charts/dapr/charts/dapr_scheduler/values.yaml @@ -23,6 +23,7 @@ ports: protocol: TCP apiPort: 50006 etcdRPCClientPort: 2379 + etcdHttpClientPort: 2330 etcdRPCPeerPort: 2380 replicaCount: 1 @@ -31,6 +32,10 @@ cluster: EtcdDataDirPath: /var/run/data/dapr-scheduler/etcd-data-dir EtcdDataDirWinPath: C:\\etcd-data-dir +etcdSpaceQuota: 2147483648 +etcdCompactionMode: periodic +etcdCompactionRetention: 24h + volumeclaims: storageSize: 1Gi storageClassName: diff --git a/cmd/scheduler/app/app.go b/cmd/scheduler/app/app.go index e54c98209a9..6d3b50be246 100644 --- a/cmd/scheduler/app/app.go +++ b/cmd/scheduler/app/app.go @@ -91,10 +91,14 @@ func Run() { Port: opts.Port, Security: secHandler, - DataDir: opts.EtcdDataDir, - EtcdID: opts.EtcdID, - EtcdInitialPeers: opts.EtcdInitialPeers, - EtcdClientPorts: opts.EtcdClientPorts, + DataDir: opts.EtcdDataDir, + Id: opts.Id, + EtcdInitialPeers: opts.EtcdInitialPeers, + EtcdClientPorts: opts.EtcdClientPorts, + EtcdClientHttpPorts: opts.EtcdClientHttpPorts, + EtcdSpaceQuota: opts.EtcdSpaceQuota, + EtcdCompactionMode: opts.EtcdCompactionMode, + EtcdCompactionRetention: opts.EtcdCompactionRetention, }) return server.Run(ctx) diff --git a/cmd/scheduler/options/options.go b/cmd/scheduler/options/options.go index 2e69d27c7fb..b38f805ea00 100644 --- a/cmd/scheduler/options/options.go +++ b/cmd/scheduler/options/options.go @@ -37,10 +37,14 @@ type Options struct { PlacementAddress string Mode string - EtcdID string - EtcdInitialPeers []string - EtcdDataDir string - EtcdClientPorts []string + Id string + EtcdInitialPeers []string + EtcdDataDir string + EtcdClientPorts []string + EtcdClientHttpPorts []string + EtcdSpaceQuota int64 + EtcdCompactionMode string + EtcdCompactionRetention string Logger logger.Options Metrics *metrics.Options @@ -79,10 +83,14 @@ func New(origArgs []string) *Options { fs.StringVar(&opts.PlacementAddress, "placement-address", "", "Addresses for Dapr Actor Placement service") fs.StringVar(&opts.Mode, "mode", string(modes.StandaloneMode), "Runtime mode for Dapr Scheduler") - fs.StringVar(&opts.EtcdID, "id", "dapr-scheduler-server-0", "Scheduler server ID") + fs.StringVar(&opts.Id, "id", "dapr-scheduler-server-0", "Scheduler server ID") fs.StringSliceVar(&opts.EtcdInitialPeers, "initial-cluster", []string{"dapr-scheduler-server-0=http://localhost:2380"}, "Initial etcd cluster peers") fs.StringVar(&opts.EtcdDataDir, "etcd-data-dir", "./data", "Directory to store scheduler etcd data") fs.StringSliceVar(&opts.EtcdClientPorts, "etcd-client-ports", []string{"dapr-scheduler-server-0=2379"}, "Ports for etcd client communication") + fs.StringSliceVar(&opts.EtcdClientHttpPorts, "etcd-client-http-ports", []string{""}, "Ports for etcd client http communication") + fs.Int64Var(&opts.EtcdSpaceQuota, "etcd-space-quota", 2*1024*1024*1024, "Space quota for etcd") + fs.StringVar(&opts.EtcdCompactionMode, "etcd-compaction-mode", "periodic", "Compaction mode for etcd. Can be 'periodic' or 'revision'") + fs.StringVar(&opts.EtcdCompactionRetention, "etcd-compaction-retention", "24h", "Compaction retention for etcd. Can express time or number of revisions, depending on the value of 'etcd-compaction-mode'") opts.Logger = logger.DefaultOptions() opts.Logger.AttachCmdFlags(fs.StringVar, fs.BoolVar) diff --git a/pkg/scheduler/server/config.go b/pkg/scheduler/server/config.go index a5ff636d15c..3ce1255c847 100644 --- a/pkg/scheduler/server/config.go +++ b/pkg/scheduler/server/config.go @@ -22,16 +22,20 @@ import ( "go.etcd.io/etcd/server/v3/embed" "github.com/dapr/dapr/pkg/modes" + "github.com/dapr/dapr/pkg/security" ) func (s *Server) conf() *embed.Config { config := embed.NewConfig() - config.Name = s.etcdID - config.Dir = s.dataDir + config.Name = s.id + config.Dir = s.dataDir + "-" + security.CurrentNamespace() + "-" + s.id config.InitialCluster = strings.Join(s.etcdInitialPeers, ",") + config.QuotaBackendBytes = s.etcdSpaceQuota + config.AutoCompactionMode = s.etcdCompactionMode + config.AutoCompactionRetention = s.etcdCompactionRetention - etcdURL, peerPort, err := peerHostAndPort(s.etcdID, s.etcdInitialPeers) + etcdURL, peerPort, err := peerHostAndPort(s.id, s.etcdInitialPeers) if err != nil { log.Warnf("Invalid format for initial cluster. Make sure to include 'http://' in Scheduler URL") } @@ -43,7 +47,7 @@ func (s *Server) conf() *embed.Config { config.AdvertiseClientUrls = []url.URL{{ Scheme: "http", - Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.etcdID]), + Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.id]), }} switch s.mode { @@ -56,8 +60,14 @@ func (s *Server) conf() *embed.Config { }} config.ListenClientUrls = []url.URL{{ Scheme: "http", - Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientPorts[s.etcdID]), + Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientPorts[s.id]), }} + if len(s.etcdClientHttpPorts) > 0 { + config.ListenClientHttpUrls = []url.URL{{ + Scheme: "http", + Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientHttpPorts[s.id]), + }} + } default: config.ListenPeerUrls = []url.URL{{ Scheme: "http", @@ -65,8 +75,14 @@ func (s *Server) conf() *embed.Config { }} config.ListenClientUrls = []url.URL{{ Scheme: "http", - Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.etcdID]), + Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.id]), }} + if len(s.etcdClientHttpPorts) > 0 { + config.ListenClientHttpUrls = []url.URL{{ + Scheme: "http", + Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientHttpPorts[s.id]), + }} + } } config.LogLevel = "info" // Only supports debug, info, warn, error, panic, or fatal. Default 'info'. diff --git a/pkg/scheduler/server/server.go b/pkg/scheduler/server/server.go index 4befff4c0a3..4fa445f562f 100644 --- a/pkg/scheduler/server/server.go +++ b/pkg/scheduler/server/server.go @@ -53,10 +53,14 @@ type Options struct { Port int Security security.Handler - DataDir string - EtcdID string - EtcdInitialPeers []string - EtcdClientPorts []string + DataDir string + Id string + EtcdInitialPeers []string + EtcdClientPorts []string + EtcdClientHttpPorts []string + EtcdSpaceQuota int64 + EtcdCompactionMode string + EtcdCompactionRetention string } // Server is the gRPC server for the Scheduler service. @@ -66,14 +70,18 @@ type Server struct { listenAddress string mode modes.DaprMode - dataDir string - etcdID string - etcdInitialPeers []string - etcdClientPorts map[string]string - cron *etcdcron.Cron - readyCh chan struct{} - jobTriggerChan chan *schedulerv1pb.WatchJobsResponse // used to trigger the WatchJobs logic - jobWatcherWG sync.WaitGroup + dataDir string + id string + etcdInitialPeers []string + etcdClientPorts map[string]string + etcdClientHttpPorts map[string]string + etcdSpaceQuota int64 + etcdCompactionMode string + etcdCompactionRetention string + cron *etcdcron.Cron + readyCh chan struct{} + jobTriggerChan chan *schedulerv1pb.WatchJobsResponse // used to trigger the WatchJobs logic + jobWatcherWG sync.WaitGroup sidecarConnChan chan *internal.Connection connectionPool *internal.Pool // Connection pool for sidecars @@ -84,30 +92,25 @@ type Server struct { } func New(opts Options) *Server { - clientPorts := make(map[string]string) - for _, input := range opts.EtcdClientPorts { - idAndPort := strings.Split(input, "=") - if len(idAndPort) != 2 { - log.Warnf("Incorrect format for client ports: %s. Should contain =", input) - continue - } - schedulerID := strings.TrimSpace(idAndPort[0]) - port := strings.TrimSpace(idAndPort[1]) - clientPorts[schedulerID] = port - } + clientPorts := parseClientPorts(opts.EtcdClientPorts) + clientHttpPorts := parseClientPorts(opts.EtcdClientHttpPorts) s := &Server{ port: opts.Port, listenAddress: opts.ListenAddress, mode: opts.Mode, - etcdID: opts.EtcdID, - etcdInitialPeers: opts.EtcdInitialPeers, - etcdClientPorts: clientPorts, - dataDir: opts.DataDir, - readyCh: make(chan struct{}), - jobTriggerChan: make(chan *schedulerv1pb.WatchJobsResponse), - jobWatcherWG: sync.WaitGroup{}, + id: opts.Id, + etcdInitialPeers: opts.EtcdInitialPeers, + etcdClientPorts: clientPorts, + etcdClientHttpPorts: clientHttpPorts, + etcdSpaceQuota: opts.EtcdSpaceQuota, + etcdCompactionMode: opts.EtcdCompactionMode, + etcdCompactionRetention: opts.EtcdCompactionRetention, + dataDir: opts.DataDir, + readyCh: make(chan struct{}), + jobTriggerChan: make(chan *schedulerv1pb.WatchJobsResponse), + jobWatcherWG: sync.WaitGroup{}, sidecarConnChan: make(chan *internal.Connection), connectionPool: &internal.Pool{ @@ -155,6 +158,23 @@ func New(opts Options) *Server { return s } +func parseClientPorts(opts []string) map[string]string { + ports := make(map[string]string) + + for _, input := range opts { + idAndPort := strings.Split(input, "=") + if len(idAndPort) != 2 { + log.Warnf("Incorrect format for client http ports: %s. Should contain =", input) + continue + } + schedulerID := strings.TrimSpace(idAndPort[0]) + port := strings.TrimSpace(idAndPort[1]) + ports[schedulerID] = port + } + + return ports +} + func (s *Server) Run(ctx context.Context) error { log.Info("Dapr Scheduler is starting...") @@ -314,10 +334,10 @@ func (s *Server) runJobWatcher(ctx context.Context) error { s.handleJobStreaming(ctx) }() - <-ctx.Done() - s.jobWatcherWG.Wait() - log.Info("JobWatcher exited") - return ctx.Err() + <-ctx.Done() + s.jobWatcherWG.Wait() + log.Info("JobWatcher exited") + return ctx.Err() } // handleJobStreaming handles the streaming of jobs to Dapr sidecars. diff --git a/pkg/scheduler/server/server_test.go b/pkg/scheduler/server/server_test.go new file mode 100644 index 00000000000..688bb165310 --- /dev/null +++ b/pkg/scheduler/server/server_test.go @@ -0,0 +1,134 @@ +package server + +import ( + "net/url" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/dapr/dapr/pkg/modes" +) + +func TestParseClientPorts(t *testing.T) { + t.Run("parses client ports", func(t *testing.T) { + ports := []string{ + "scheduler0=5000", + "scheduler1=5001", + "scheduler2=5002", + } + + clientPorts := parseClientPorts(ports) + assert.Equal(t, 3, len(clientPorts)) + assert.Equal(t, "5000", clientPorts["scheduler0"]) + assert.Equal(t, "5001", clientPorts["scheduler1"]) + assert.Equal(t, "5002", clientPorts["scheduler2"]) + }) + + t.Run("parses client ports with invalid format", func(t *testing.T) { + ports := []string{ + "scheduler0=5000", + "scheduler1=5001", + "scheduler2", + } + + clientPorts := parseClientPorts(ports) + assert.Equal(t, 2, len(clientPorts)) + assert.Equal(t, "5000", clientPorts["scheduler0"]) + assert.Equal(t, "5001", clientPorts["scheduler1"]) + }) + + t.Run("trims whitespace", func(t *testing.T) { + ports := []string{ + " scheduler0=5000 ", + "scheduler1 = 5001", + } + + clientPorts := parseClientPorts(ports) + assert.Equal(t, 2, len(clientPorts)) + assert.Equal(t, "5000", clientPorts["scheduler0"]) + assert.Equal(t, "5001", clientPorts["scheduler1"]) + }) +} + +func TestServerConf(t *testing.T) { + t.Run("KubernetesMode", func(t *testing.T) { + s := &Server{ + mode: modes.KubernetesMode, + id: "id2", + etcdClientPorts: map[string]string{"id1": "5001", "id2": "5002"}, + etcdClientHttpPorts: map[string]string{"id1": "5003", "id2": "5004"}, + etcdInitialPeers: []string{"id1=http://localhost:5001", "id2=http://localhost:5002"}, + } + + config := s.conf() + + assert.Equal(t, "id1=http://localhost:5001,id2=http://localhost:5002", config.InitialCluster) + + clientUrl := url.URL{ + Scheme: "http", + Host: "0.0.0.0:5002", + } + + assert.Equal(t, clientUrl, config.ListenPeerUrls[0]) + assert.Equal(t, clientUrl, config.ListenClientUrls[0]) + + clientHttpUrl := url.URL{ + Scheme: "http", + Host: "0.0.0.0:5004", + } + assert.Equal(t, clientHttpUrl, config.ListenClientHttpUrls[0]) + }) + + t.Run("DefaultMode", func(t *testing.T) { + s := &Server{ + mode: modes.StandaloneMode, + id: "id2", + dataDir: "./data", + etcdClientPorts: map[string]string{"id1": "5001", "id2": "5002"}, + etcdClientHttpPorts: map[string]string{"id1": "5003", "id2": "5004"}, + etcdInitialPeers: []string{"id1=http://localhost:5001", "id2=http://localhost:5002"}, + } + + config := s.conf() + + assert.Equal(t, "id1=http://localhost:5001,id2=http://localhost:5002", config.InitialCluster) + assert.Equal(t, "./data-default-id2", config.Dir) + + clientUrl := url.URL{ + Scheme: "http", + Host: "localhost:5002", + } + + assert.Equal(t, clientUrl, config.ListenPeerUrls[0]) + assert.Equal(t, clientUrl, config.ListenClientUrls[0]) + + clientHttpUrl := url.URL{ + Scheme: "http", + Host: "localhost:5004", + } + assert.Equal(t, clientHttpUrl, config.ListenClientHttpUrls[0]) + }) + + t.Run("DefaultMode without client http ports", func(t *testing.T) { + s := &Server{ + mode: modes.StandaloneMode, + id: "id2", + etcdClientPorts: map[string]string{"id1": "5001", "id2": "5002"}, + etcdInitialPeers: []string{"id1=http://localhost:5001", "id2=http://localhost:5002"}, + } + + config := s.conf() + + assert.Equal(t, "id1=http://localhost:5001,id2=http://localhost:5002", config.InitialCluster) + + clientUrl := url.URL{ + Scheme: "http", + Host: "localhost:5002", + } + + assert.Equal(t, clientUrl, config.ListenPeerUrls[0]) + assert.Equal(t, clientUrl, config.ListenClientUrls[0]) + + assert.Empty(t, config.ListenClientHttpUrls) + }) +}