From cf104b8f73d9a462c81686f722756fc9dcae170b Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Mon, 15 Apr 2024 14:52:58 +0100 Subject: [PATCH 1/7] =?UTF-8?q?Fixes=20warning=20=E2=80=9CRunning=20http?= =?UTF-8?q?=20and=20grpc=20server=20on=20single=20port.=20This=20is=20not?= =?UTF-8?q?=20recommended=20for=20production.=E2=80=9D?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Signed-off-by: Elena Kolevska --- cmd/scheduler/app/app.go | 23 ++--- cmd/scheduler/options/options.go | 10 ++- pkg/scheduler/server/config.go | 12 +++ pkg/scheduler/server/server.go | 73 ++++++++------- pkg/scheduler/server/server_test.go | 132 ++++++++++++++++++++++++++++ 5 files changed, 204 insertions(+), 46 deletions(-) create mode 100644 pkg/scheduler/server/server_test.go diff --git a/cmd/scheduler/app/app.go b/cmd/scheduler/app/app.go index 03de44a7651..6618340cd7d 100644 --- a/cmd/scheduler/app/app.go +++ b/cmd/scheduler/app/app.go @@ -83,17 +83,18 @@ func Run() { } server := server.New(server.Options{ - AppID: appID, - HostAddress: hostAddress, - ListenAddress: opts.ListenAddress, - DataDir: opts.EtcdDataDir, - EtcdID: opts.EtcdID, - EtcdInitialPeers: opts.EtcdInitialPeers, - EtcdClientPorts: opts.EtcdClientPorts, - Port: opts.Port, - Security: secHandler, - PlacementAddress: opts.PlacementAddress, - Mode: modes.DaprMode(opts.Mode), + AppID: appID, + HostAddress: hostAddress, + ListenAddress: opts.ListenAddress, + DataDir: opts.EtcdDataDir, + EtcdID: opts.EtcdID, + EtcdInitialPeers: opts.EtcdInitialPeers, + EtcdClientPorts: opts.EtcdClientPorts, + EtcdClientHttpPorts: opts.EtcdClientHttpPorts, + Port: opts.Port, + Security: secHandler, + PlacementAddress: opts.PlacementAddress, + Mode: modes.DaprMode(opts.Mode), }) return server.Run(ctx) diff --git a/cmd/scheduler/options/options.go b/cmd/scheduler/options/options.go index 2e69d27c7fb..d3c75a9bdf3 100644 --- a/cmd/scheduler/options/options.go +++ b/cmd/scheduler/options/options.go @@ -37,10 +37,11 @@ type Options struct { PlacementAddress string Mode string - EtcdID string - EtcdInitialPeers []string - EtcdDataDir string - EtcdClientPorts []string + EtcdID string + EtcdInitialPeers []string + EtcdDataDir string + EtcdClientPorts []string + EtcdClientHttpPorts []string Logger logger.Options Metrics *metrics.Options @@ -83,6 +84,7 @@ func New(origArgs []string) *Options { fs.StringSliceVar(&opts.EtcdInitialPeers, "initial-cluster", []string{"dapr-scheduler-server-0=http://localhost:2380"}, "Initial etcd cluster peers") fs.StringVar(&opts.EtcdDataDir, "etcd-data-dir", "./data", "Directory to store scheduler etcd data") fs.StringSliceVar(&opts.EtcdClientPorts, "etcd-client-ports", []string{"dapr-scheduler-server-0=2379"}, "Ports for etcd client communication") + fs.StringSliceVar(&opts.EtcdClientHttpPorts, "etcd-client-http-ports", []string{""}, "Ports for etcd client http communication") opts.Logger = logger.DefaultOptions() opts.Logger.AttachCmdFlags(fs.StringVar, fs.BoolVar) diff --git a/pkg/scheduler/server/config.go b/pkg/scheduler/server/config.go index a5ff636d15c..28fc55c8990 100644 --- a/pkg/scheduler/server/config.go +++ b/pkg/scheduler/server/config.go @@ -58,6 +58,12 @@ func (s *Server) conf() *embed.Config { Scheme: "http", Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientPorts[s.etcdID]), }} + if len(s.etcdClientHttpPorts) > 0 { + config.ListenClientHttpUrls = []url.URL{{ + Scheme: "http", + Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientHttpPorts[s.etcdID]), + }} + } default: config.ListenPeerUrls = []url.URL{{ Scheme: "http", @@ -67,6 +73,12 @@ func (s *Server) conf() *embed.Config { Scheme: "http", Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.etcdID]), }} + if len(s.etcdClientHttpPorts) > 0 { + config.ListenClientHttpUrls = []url.URL{{ + Scheme: "http", + Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientHttpPorts[s.etcdID]), + }} + } } config.LogLevel = "info" // Only supports debug, info, warn, error, panic, or fatal. Default 'info'. diff --git a/pkg/scheduler/server/server.go b/pkg/scheduler/server/server.go index bd5454f69f0..d3566c73627 100644 --- a/pkg/scheduler/server/server.go +++ b/pkg/scheduler/server/server.go @@ -44,15 +44,16 @@ import ( var log = logger.NewLogger("dapr.scheduler.server") type Options struct { - AppID string - HostAddress string - ListenAddress string - DataDir string - EtcdID string - EtcdInitialPeers []string - EtcdClientPorts []string - Mode modes.DaprMode - Port int + AppID string + HostAddress string + ListenAddress string + DataDir string + EtcdID string + EtcdInitialPeers []string + EtcdClientPorts []string + EtcdClientHttpPorts []string + Mode modes.DaprMode + Port int Security security.Handler @@ -66,40 +67,33 @@ type Server struct { listenAddress string mode modes.DaprMode - dataDir string - etcdID string - etcdInitialPeers []string - etcdClientPorts map[string]string - cron *etcdcron.Cron - readyCh chan struct{} + dataDir string + etcdID string + etcdInitialPeers []string + etcdClientPorts map[string]string + etcdClientHttpPorts map[string]string + cron *etcdcron.Cron + readyCh chan struct{} grpcManager *manager.Manager actorRuntime actors.ActorRuntime } func New(opts Options) *Server { - clientPorts := make(map[string]string) - for _, input := range opts.EtcdClientPorts { - idAndPort := strings.Split(input, "=") - if len(idAndPort) != 2 { - log.Warnf("Incorrect format for client ports: %s. Should contain =", input) - continue - } - schedulerID := strings.TrimSpace(idAndPort[0]) - port := strings.TrimSpace(idAndPort[1]) - clientPorts[schedulerID] = port - } + clientPorts := parseClientPorts(opts.EtcdClientPorts) + clientHttpPorts := parseClientPorts(opts.EtcdClientHttpPorts) s := &Server{ port: opts.Port, listenAddress: opts.ListenAddress, mode: opts.Mode, - etcdID: opts.EtcdID, - etcdInitialPeers: opts.EtcdInitialPeers, - etcdClientPorts: clientPorts, - dataDir: opts.DataDir, - readyCh: make(chan struct{}), + etcdID: opts.EtcdID, + etcdInitialPeers: opts.EtcdInitialPeers, + etcdClientPorts: clientPorts, + etcdClientHttpPorts: clientHttpPorts, + dataDir: opts.DataDir, + readyCh: make(chan struct{}), } s.srv = grpc.NewServer(opts.Security.GRPCServerOptionMTLS()) @@ -141,6 +135,23 @@ func New(opts Options) *Server { return s } +func parseClientPorts(opts []string) map[string]string { + ports := make(map[string]string) + + for _, input := range opts { + idAndPort := strings.Split(input, "=") + if len(idAndPort) != 2 { + log.Warnf("Incorrect format for client http ports: %s. Should contain =", input) + continue + } + schedulerID := strings.TrimSpace(idAndPort[0]) + port := strings.TrimSpace(idAndPort[1]) + ports[schedulerID] = port + } + + return ports +} + func (s *Server) Run(ctx context.Context) error { log.Info("Dapr Scheduler is starting...") diff --git a/pkg/scheduler/server/server_test.go b/pkg/scheduler/server/server_test.go new file mode 100644 index 00000000000..f3be89ceb88 --- /dev/null +++ b/pkg/scheduler/server/server_test.go @@ -0,0 +1,132 @@ +package server + +import ( + "net/url" + "testing" + + "github.com/stretchr/testify/assert" + + "github.com/dapr/dapr/pkg/modes" +) + +func TestParseClientPorts(t *testing.T) { + t.Run("parses client ports", func(t *testing.T) { + ports := []string{ + "scheduler0=5000", + "scheduler1=5001", + "scheduler2=5002", + } + + clientPorts := parseClientPorts(ports) + assert.Equal(t, 3, len(clientPorts)) + assert.Equal(t, "5000", clientPorts["scheduler0"]) + assert.Equal(t, "5001", clientPorts["scheduler1"]) + assert.Equal(t, "5002", clientPorts["scheduler2"]) + }) + + t.Run("parses client ports with invalid format", func(t *testing.T) { + ports := []string{ + "scheduler0=5000", + "scheduler1=5001", + "scheduler2", + } + + clientPorts := parseClientPorts(ports) + assert.Equal(t, 2, len(clientPorts)) + assert.Equal(t, "5000", clientPorts["scheduler0"]) + assert.Equal(t, "5001", clientPorts["scheduler1"]) + }) + + t.Run("trims whitespace", func(t *testing.T) { + ports := []string{ + " scheduler0=5000 ", + "scheduler1 = 5001", + } + + clientPorts := parseClientPorts(ports) + assert.Equal(t, 2, len(clientPorts)) + assert.Equal(t, "5000", clientPorts["scheduler0"]) + assert.Equal(t, "5001", clientPorts["scheduler1"]) + }) +} + +func TestServerConf(t *testing.T) { + t.Run("KubernetesMode", func(t *testing.T) { + s := &Server{ + mode: modes.KubernetesMode, + etcdID: "id2", + etcdClientPorts: map[string]string{"id1": "5001", "id2": "5002"}, + etcdClientHttpPorts: map[string]string{"id1": "5003", "id2": "5004"}, + etcdInitialPeers: []string{"id1=http://localhost:5001", "id2=http://localhost:5002"}, + } + + config := s.conf() + + assert.Equal(t, "id1=http://localhost:5001,id2=http://localhost:5002", config.InitialCluster) + + clientUrl := url.URL{ + Scheme: "http", + Host: "0.0.0.0:5002", + } + + assert.Equal(t, clientUrl, config.ListenPeerUrls[0]) + assert.Equal(t, clientUrl, config.ListenClientUrls[0]) + + clientHttpUrl := url.URL{ + Scheme: "http", + Host: "0.0.0.0:5004", + } + assert.Equal(t, clientHttpUrl, config.ListenClientHttpUrls[0]) + }) + + t.Run("DefaultMode with client http ports", func(t *testing.T) { + s := &Server{ + mode: modes.StandaloneMode, + etcdID: "id2", + etcdClientPorts: map[string]string{"id1": "5001", "id2": "5002"}, + etcdClientHttpPorts: map[string]string{"id1": "5003", "id2": "5004"}, + etcdInitialPeers: []string{"id1=http://localhost:5001", "id2=http://localhost:5002"}, + } + + config := s.conf() + + assert.Equal(t, "id1=http://localhost:5001,id2=http://localhost:5002", config.InitialCluster) + + clientUrl := url.URL{ + Scheme: "http", + Host: "localhost:5002", + } + + assert.Equal(t, clientUrl, config.ListenPeerUrls[0]) + assert.Equal(t, clientUrl, config.ListenClientUrls[0]) + + clientHttpUrl := url.URL{ + Scheme: "http", + Host: "localhost:5004", + } + assert.Equal(t, clientHttpUrl, config.ListenClientHttpUrls[0]) + }) + + t.Run("DefaultMode without client http ports", func(t *testing.T) { + s := &Server{ + mode: modes.StandaloneMode, + etcdID: "id2", + etcdClientPorts: map[string]string{"id1": "5001", "id2": "5002"}, + etcdInitialPeers: []string{"id1=http://localhost:5001", "id2=http://localhost:5002"}, + } + + config := s.conf() + + assert.Equal(t, "id1=http://localhost:5001,id2=http://localhost:5002", config.InitialCluster) + + clientUrl := url.URL{ + Scheme: "http", + Host: "localhost:5002", + } + + assert.Equal(t, clientUrl, config.ListenPeerUrls[0]) + assert.Equal(t, clientUrl, config.ListenClientUrls[0]) + + assert.Empty(t, config.ListenClientHttpUrls) + }) +} From 4281ba2e22a086894408d2de716b7d250f23ea4f Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Mon, 15 Apr 2024 15:43:23 +0100 Subject: [PATCH 2/7] Suffixes data dirs with instance id Signed-off-by: Elena Kolevska --- pkg/scheduler/server/config.go | 2 +- pkg/scheduler/server/server_test.go | 4 +++- 2 files changed, 4 insertions(+), 2 deletions(-) diff --git a/pkg/scheduler/server/config.go b/pkg/scheduler/server/config.go index 28fc55c8990..f182faf3a9b 100644 --- a/pkg/scheduler/server/config.go +++ b/pkg/scheduler/server/config.go @@ -28,7 +28,7 @@ func (s *Server) conf() *embed.Config { config := embed.NewConfig() config.Name = s.etcdID - config.Dir = s.dataDir + config.Dir = s.dataDir + "-" + s.etcdID config.InitialCluster = strings.Join(s.etcdInitialPeers, ",") etcdURL, peerPort, err := peerHostAndPort(s.etcdID, s.etcdInitialPeers) diff --git a/pkg/scheduler/server/server_test.go b/pkg/scheduler/server/server_test.go index f3be89ceb88..d3cdf552e10 100644 --- a/pkg/scheduler/server/server_test.go +++ b/pkg/scheduler/server/server_test.go @@ -79,10 +79,11 @@ func TestServerConf(t *testing.T) { assert.Equal(t, clientHttpUrl, config.ListenClientHttpUrls[0]) }) - t.Run("DefaultMode with client http ports", func(t *testing.T) { + t.Run("DefaultMode", func(t *testing.T) { s := &Server{ mode: modes.StandaloneMode, etcdID: "id2", + dataDir: "./data", etcdClientPorts: map[string]string{"id1": "5001", "id2": "5002"}, etcdClientHttpPorts: map[string]string{"id1": "5003", "id2": "5004"}, etcdInitialPeers: []string{"id1=http://localhost:5001", "id2=http://localhost:5002"}, @@ -91,6 +92,7 @@ func TestServerConf(t *testing.T) { config := s.conf() assert.Equal(t, "id1=http://localhost:5001,id2=http://localhost:5002", config.InitialCluster) + assert.Equal(t, "./data-id2", config.Dir) clientUrl := url.URL{ Scheme: "http", From e958d8b39c771850da10c6f020e19d6fad5e31e0 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Fri, 26 Apr 2024 15:35:46 +0100 Subject: [PATCH 3/7] Adds space quota parameter Signed-off-by: Elena Kolevska --- cmd/scheduler/app/app.go | 1 + cmd/scheduler/options/options.go | 2 ++ pkg/scheduler/server/config.go | 1 + pkg/scheduler/server/server.go | 3 +++ 4 files changed, 7 insertions(+) diff --git a/cmd/scheduler/app/app.go b/cmd/scheduler/app/app.go index 6618340cd7d..96ecccdcccf 100644 --- a/cmd/scheduler/app/app.go +++ b/cmd/scheduler/app/app.go @@ -91,6 +91,7 @@ func Run() { EtcdInitialPeers: opts.EtcdInitialPeers, EtcdClientPorts: opts.EtcdClientPorts, EtcdClientHttpPorts: opts.EtcdClientHttpPorts, + EtcdSpaceQuota: opts.EtcdSpaceQuota, Port: opts.Port, Security: secHandler, PlacementAddress: opts.PlacementAddress, diff --git a/cmd/scheduler/options/options.go b/cmd/scheduler/options/options.go index d3c75a9bdf3..16225481823 100644 --- a/cmd/scheduler/options/options.go +++ b/cmd/scheduler/options/options.go @@ -42,6 +42,7 @@ type Options struct { EtcdDataDir string EtcdClientPorts []string EtcdClientHttpPorts []string + EtcdSpaceQuota int64 Logger logger.Options Metrics *metrics.Options @@ -85,6 +86,7 @@ func New(origArgs []string) *Options { fs.StringVar(&opts.EtcdDataDir, "etcd-data-dir", "./data", "Directory to store scheduler etcd data") fs.StringSliceVar(&opts.EtcdClientPorts, "etcd-client-ports", []string{"dapr-scheduler-server-0=2379"}, "Ports for etcd client communication") fs.StringSliceVar(&opts.EtcdClientHttpPorts, "etcd-client-http-ports", []string{""}, "Ports for etcd client http communication") + fs.Int64Var(&opts.EtcdSpaceQuota, "etcd-space-quota", 2*1024*1024, "Space quota for etcd") opts.Logger = logger.DefaultOptions() opts.Logger.AttachCmdFlags(fs.StringVar, fs.BoolVar) diff --git a/pkg/scheduler/server/config.go b/pkg/scheduler/server/config.go index f182faf3a9b..bc54638bac5 100644 --- a/pkg/scheduler/server/config.go +++ b/pkg/scheduler/server/config.go @@ -30,6 +30,7 @@ func (s *Server) conf() *embed.Config { config.Name = s.etcdID config.Dir = s.dataDir + "-" + s.etcdID config.InitialCluster = strings.Join(s.etcdInitialPeers, ",") + config.QuotaBackendBytes = s.etcdSpaceQuota etcdURL, peerPort, err := peerHostAndPort(s.etcdID, s.etcdInitialPeers) if err != nil { diff --git a/pkg/scheduler/server/server.go b/pkg/scheduler/server/server.go index d3566c73627..9a75ddf8bfd 100644 --- a/pkg/scheduler/server/server.go +++ b/pkg/scheduler/server/server.go @@ -52,6 +52,7 @@ type Options struct { EtcdInitialPeers []string EtcdClientPorts []string EtcdClientHttpPorts []string + EtcdSpaceQuota int64 Mode modes.DaprMode Port int @@ -72,6 +73,7 @@ type Server struct { etcdInitialPeers []string etcdClientPorts map[string]string etcdClientHttpPorts map[string]string + etcdSpaceQuota int64 cron *etcdcron.Cron readyCh chan struct{} @@ -92,6 +94,7 @@ func New(opts Options) *Server { etcdInitialPeers: opts.EtcdInitialPeers, etcdClientPorts: clientPorts, etcdClientHttpPorts: clientHttpPorts, + etcdSpaceQuota: opts.EtcdSpaceQuota, dataDir: opts.DataDir, readyCh: make(chan struct{}), } From cdba733afe7a25aa508ce0d713b0c0b287d70a11 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Fri, 26 Apr 2024 16:57:10 +0100 Subject: [PATCH 4/7] Sets default quota to 2GB Signed-off-by: Elena Kolevska --- cmd/scheduler/options/options.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/cmd/scheduler/options/options.go b/cmd/scheduler/options/options.go index 16225481823..ad62bf7f546 100644 --- a/cmd/scheduler/options/options.go +++ b/cmd/scheduler/options/options.go @@ -86,7 +86,7 @@ func New(origArgs []string) *Options { fs.StringVar(&opts.EtcdDataDir, "etcd-data-dir", "./data", "Directory to store scheduler etcd data") fs.StringSliceVar(&opts.EtcdClientPorts, "etcd-client-ports", []string{"dapr-scheduler-server-0=2379"}, "Ports for etcd client communication") fs.StringSliceVar(&opts.EtcdClientHttpPorts, "etcd-client-http-ports", []string{""}, "Ports for etcd client http communication") - fs.Int64Var(&opts.EtcdSpaceQuota, "etcd-space-quota", 2*1024*1024, "Space quota for etcd") + fs.Int64Var(&opts.EtcdSpaceQuota, "etcd-space-quota", 2*1024*1024*1024, "Space quota for etcd") opts.Logger = logger.DefaultOptions() opts.Logger.AttachCmdFlags(fs.StringVar, fs.BoolVar) From 6915aaa25a46cfd5a7f6f72e94cc29f55adc51d7 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Fri, 26 Apr 2024 17:15:29 +0100 Subject: [PATCH 5/7] Adds compaction parameters Signed-off-by: Elena Kolevska --- cmd/scheduler/app/app.go | 28 ++++++++------- cmd/scheduler/options/options.go | 16 +++++---- pkg/scheduler/server/config.go | 2 ++ pkg/scheduler/server/server.go | 58 ++++++++++++++++++-------------- 4 files changed, 59 insertions(+), 45 deletions(-) diff --git a/cmd/scheduler/app/app.go b/cmd/scheduler/app/app.go index 96ecccdcccf..516fec40ffb 100644 --- a/cmd/scheduler/app/app.go +++ b/cmd/scheduler/app/app.go @@ -83,19 +83,21 @@ func Run() { } server := server.New(server.Options{ - AppID: appID, - HostAddress: hostAddress, - ListenAddress: opts.ListenAddress, - DataDir: opts.EtcdDataDir, - EtcdID: opts.EtcdID, - EtcdInitialPeers: opts.EtcdInitialPeers, - EtcdClientPorts: opts.EtcdClientPorts, - EtcdClientHttpPorts: opts.EtcdClientHttpPorts, - EtcdSpaceQuota: opts.EtcdSpaceQuota, - Port: opts.Port, - Security: secHandler, - PlacementAddress: opts.PlacementAddress, - Mode: modes.DaprMode(opts.Mode), + AppID: appID, + HostAddress: hostAddress, + ListenAddress: opts.ListenAddress, + DataDir: opts.EtcdDataDir, + EtcdID: opts.EtcdID, + EtcdInitialPeers: opts.EtcdInitialPeers, + EtcdClientPorts: opts.EtcdClientPorts, + EtcdClientHttpPorts: opts.EtcdClientHttpPorts, + EtcdSpaceQuota: opts.EtcdSpaceQuota, + EtcdCompactionMode: opts.EtcdCompactionMode, + EtcdCompactionRetention: opts.EtcdCompactionRetention, + Port: opts.Port, + Security: secHandler, + PlacementAddress: opts.PlacementAddress, + Mode: modes.DaprMode(opts.Mode), }) return server.Run(ctx) diff --git a/cmd/scheduler/options/options.go b/cmd/scheduler/options/options.go index ad62bf7f546..d9ab37530ec 100644 --- a/cmd/scheduler/options/options.go +++ b/cmd/scheduler/options/options.go @@ -37,12 +37,14 @@ type Options struct { PlacementAddress string Mode string - EtcdID string - EtcdInitialPeers []string - EtcdDataDir string - EtcdClientPorts []string - EtcdClientHttpPorts []string - EtcdSpaceQuota int64 + EtcdID string + EtcdInitialPeers []string + EtcdDataDir string + EtcdClientPorts []string + EtcdClientHttpPorts []string + EtcdSpaceQuota int64 + EtcdCompactionMode string + EtcdCompactionRetention string Logger logger.Options Metrics *metrics.Options @@ -87,6 +89,8 @@ func New(origArgs []string) *Options { fs.StringSliceVar(&opts.EtcdClientPorts, "etcd-client-ports", []string{"dapr-scheduler-server-0=2379"}, "Ports for etcd client communication") fs.StringSliceVar(&opts.EtcdClientHttpPorts, "etcd-client-http-ports", []string{""}, "Ports for etcd client http communication") fs.Int64Var(&opts.EtcdSpaceQuota, "etcd-space-quota", 2*1024*1024*1024, "Space quota for etcd") + fs.StringVar(&opts.EtcdCompactionMode, "etcd-compaction-mode", "periodic", "Compaction mode for etcd. Can be 'periodic' or 'revision'") + fs.StringVar(&opts.EtcdCompactionRetention, "etcd-compaction-retention", "24h", "Compaction retention for etcd. Can express time or number of revisions, depending on the value of 'etcd-compaction-mode'") opts.Logger = logger.DefaultOptions() opts.Logger.AttachCmdFlags(fs.StringVar, fs.BoolVar) diff --git a/pkg/scheduler/server/config.go b/pkg/scheduler/server/config.go index bc54638bac5..e97f4ed3eb3 100644 --- a/pkg/scheduler/server/config.go +++ b/pkg/scheduler/server/config.go @@ -31,6 +31,8 @@ func (s *Server) conf() *embed.Config { config.Dir = s.dataDir + "-" + s.etcdID config.InitialCluster = strings.Join(s.etcdInitialPeers, ",") config.QuotaBackendBytes = s.etcdSpaceQuota + config.AutoCompactionMode = s.etcdCompactionMode + config.AutoCompactionRetention = s.etcdCompactionRetention etcdURL, peerPort, err := peerHostAndPort(s.etcdID, s.etcdInitialPeers) if err != nil { diff --git a/pkg/scheduler/server/server.go b/pkg/scheduler/server/server.go index 9a75ddf8bfd..a9837da0b9d 100644 --- a/pkg/scheduler/server/server.go +++ b/pkg/scheduler/server/server.go @@ -44,17 +44,19 @@ import ( var log = logger.NewLogger("dapr.scheduler.server") type Options struct { - AppID string - HostAddress string - ListenAddress string - DataDir string - EtcdID string - EtcdInitialPeers []string - EtcdClientPorts []string - EtcdClientHttpPorts []string - EtcdSpaceQuota int64 - Mode modes.DaprMode - Port int + AppID string + HostAddress string + ListenAddress string + DataDir string + EtcdID string + EtcdInitialPeers []string + EtcdClientPorts []string + EtcdClientHttpPorts []string + EtcdSpaceQuota int64 + EtcdCompactionMode string + EtcdCompactionRetention string + Mode modes.DaprMode + Port int Security security.Handler @@ -68,14 +70,16 @@ type Server struct { listenAddress string mode modes.DaprMode - dataDir string - etcdID string - etcdInitialPeers []string - etcdClientPorts map[string]string - etcdClientHttpPorts map[string]string - etcdSpaceQuota int64 - cron *etcdcron.Cron - readyCh chan struct{} + dataDir string + etcdID string + etcdInitialPeers []string + etcdClientPorts map[string]string + etcdClientHttpPorts map[string]string + etcdSpaceQuota int64 + etcdCompactionMode string + etcdCompactionRetention string + cron *etcdcron.Cron + readyCh chan struct{} grpcManager *manager.Manager actorRuntime actors.ActorRuntime @@ -90,13 +94,15 @@ func New(opts Options) *Server { listenAddress: opts.ListenAddress, mode: opts.Mode, - etcdID: opts.EtcdID, - etcdInitialPeers: opts.EtcdInitialPeers, - etcdClientPorts: clientPorts, - etcdClientHttpPorts: clientHttpPorts, - etcdSpaceQuota: opts.EtcdSpaceQuota, - dataDir: opts.DataDir, - readyCh: make(chan struct{}), + etcdID: opts.EtcdID, + etcdInitialPeers: opts.EtcdInitialPeers, + etcdClientPorts: clientPorts, + etcdClientHttpPorts: clientHttpPorts, + etcdSpaceQuota: opts.EtcdSpaceQuota, + etcdCompactionMode: opts.EtcdCompactionMode, + etcdCompactionRetention: opts.EtcdCompactionRetention, + dataDir: opts.DataDir, + readyCh: make(chan struct{}), } s.srv = grpc.NewServer(opts.Security.GRPCServerOptionMTLS()) From 7c26a784b5e65a29b2d9850a30fcd552791761a8 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Sat, 27 Apr 2024 16:42:37 +0100 Subject: [PATCH 6/7] Updates helm charts Signed-off-by: Elena Kolevska --- .../dapr_scheduler/templates/_helpers.tpl | 22 +++++++++++++++++++ .../templates/dapr_scheduler_service.yaml | 2 ++ .../templates/dapr_scheduler_statefulset.yaml | 7 ++++++ charts/dapr/charts/dapr_scheduler/values.yaml | 5 +++++ 4 files changed, 36 insertions(+) diff --git a/charts/dapr/charts/dapr_scheduler/templates/_helpers.tpl b/charts/dapr/charts/dapr_scheduler/templates/_helpers.tpl index 4e858a7450d..85bb94ac731 100644 --- a/charts/dapr/charts/dapr_scheduler/templates/_helpers.tpl +++ b/charts/dapr/charts/dapr_scheduler/templates/_helpers.tpl @@ -63,3 +63,25 @@ Create etcd client ports list dynamically based on replicaCount. {{- $etcdClientPorts -}} {{- end -}} +{{/* +Create etcd client http ports list dynamically based on replicaCount. +*/}} +{{- define "dapr_scheduler.etcdclienthttpports" -}} +{{- $etcdClientHttpPorts := "" -}} +{{- $namespace := .Release.Namespace -}} +{{- $replicaCount := int .Values.replicaCount -}} +{{- range $i, $e := until $replicaCount -}} +{{- $instanceName := printf "dapr-scheduler-server-%d" $i -}} +{{/*{{- $svcName := printf "%s.%s" $instanceName $namespace -}}*/}} +{{- $clientPort := int $.Values.ports.etcdHttpClientPort -}} +{{- $instancePortPair := printf "%s=%d" $instanceName $clientPort -}} +{{- if gt $i 0 -}} +{{- $etcdClientHttpPorts = printf "%s,%s" $etcdClientHttpPorts $instancePortPair -}} +{{- else -}} +{{- $etcdClientHttpPorts = $instancePortPair -}} +{{- end -}} +{{- end -}} +{{- $etcdClientHttpPorts -}} +{{- end -}} + + diff --git a/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_service.yaml b/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_service.yaml index c748e690e35..e6c5730afec 100644 --- a/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_service.yaml +++ b/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_service.yaml @@ -23,6 +23,8 @@ spec: port: {{ .Values.ports.apiPort }} - name: etcd-client port: {{ .Values.ports.etcdRPCClientPort }} + - name: etcd-http-client + port: {{ .Values.ports.etcdHttpClientPort }} - name: etcd-peer port: {{ .Values.ports.etcdRPCPeerPort }} clusterIP: None # make the service headless diff --git a/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_statefulset.yaml b/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_statefulset.yaml index b7fe3b342c0..5ff7dabdeeb 100644 --- a/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_statefulset.yaml +++ b/charts/dapr/charts/dapr_scheduler/templates/dapr_scheduler_statefulset.yaml @@ -88,6 +88,8 @@ spec: name: api - containerPort: {{ .Values.ports.etcdRPCClientPort }} name: etcd-client + - containerPort: {{ .Values.ports.etcdHttpClientPort }} + name: etcd-http-client - containerPort: {{ .Values.ports.etcdRPCPeerPort }} name: etcd-peer {{- if eq .Values.global.prometheus.enabled true }} @@ -117,6 +119,8 @@ spec: - {{ include "dapr_scheduler.initialcluster" . | toYaml | trim }} - "--etcd-client-ports" - {{ include "dapr_scheduler.etcdclientports" . | toYaml | trim }} + - "--etcd-client-http-ports" + - {{ include "dapr_scheduler.etcdclienthttpports" . | toYaml | trim }} - "--log-level" - {{ .Values.logLevel }} {{- if eq .Values.global.logAsJson true }} @@ -130,6 +134,9 @@ spec: - "--enable-metrics=false" {{- end }} - "--etcd-data-dir={{ if eq .Values.global.daprControlPlaneOs "windows" }}{{ .Values.cluster.EtcdDataDirWinPath }}{{- else }}{{ .Values.cluster.EtcdDataDirPath }}" + - "--etcd-space-quota={{ .Values.etcdSpaceQuota }}" + - "--etcd-compaction-mode={{ .Values.etcdCompactionMode }}" + - "--etcd-compaction-retention={{ .Values.etcdCompactionRetention }}" - "--tls-enabled" - "--trust-domain={{ .Values.global.mtls.controlPlaneTrustDomain }}" - "--trust-anchors-file=/var/run/secrets/dapr.io/tls/ca.crt" diff --git a/charts/dapr/charts/dapr_scheduler/values.yaml b/charts/dapr/charts/dapr_scheduler/values.yaml index 36c2b5683d8..0241f2a51f7 100644 --- a/charts/dapr/charts/dapr_scheduler/values.yaml +++ b/charts/dapr/charts/dapr_scheduler/values.yaml @@ -23,6 +23,7 @@ ports: protocol: TCP apiPort: 50006 etcdRPCClientPort: 2379 + etcdHttpClientPort: 2330 etcdRPCPeerPort: 2380 replicaCount: 1 @@ -31,6 +32,10 @@ cluster: EtcdDataDirPath: /var/run/data/dapr-scheduler/etcd-data-dir EtcdDataDirWinPath: C:\\etcd-data-dir +etcdSpaceQuota: 2147483648 +etcdCompactionMode: periodic +etcdCompactionRetention: 24h + volumeclaims: storageSize: 1Gi storageClassName: From abcfd819783a1adaf630c719cd616e35a1766415 Mon Sep 17 00:00:00 2001 From: Elena Kolevska Date: Mon, 29 Apr 2024 18:35:48 +0100 Subject: [PATCH 7/7] Adds namespace to data dir name. Renames etcdID to just ID. Signed-off-by: Elena Kolevska --- cmd/scheduler/app/app.go | 2 +- cmd/scheduler/options/options.go | 4 ++-- pkg/scheduler/server/config.go | 17 +++++++++-------- pkg/scheduler/server/server.go | 6 +++--- pkg/scheduler/server/server_test.go | 8 ++++---- 5 files changed, 19 insertions(+), 18 deletions(-) diff --git a/cmd/scheduler/app/app.go b/cmd/scheduler/app/app.go index f9369cfea89..6d3b50be246 100644 --- a/cmd/scheduler/app/app.go +++ b/cmd/scheduler/app/app.go @@ -92,7 +92,7 @@ func Run() { Security: secHandler, DataDir: opts.EtcdDataDir, - EtcdID: opts.EtcdID, + Id: opts.Id, EtcdInitialPeers: opts.EtcdInitialPeers, EtcdClientPorts: opts.EtcdClientPorts, EtcdClientHttpPorts: opts.EtcdClientHttpPorts, diff --git a/cmd/scheduler/options/options.go b/cmd/scheduler/options/options.go index d9ab37530ec..b38f805ea00 100644 --- a/cmd/scheduler/options/options.go +++ b/cmd/scheduler/options/options.go @@ -37,7 +37,7 @@ type Options struct { PlacementAddress string Mode string - EtcdID string + Id string EtcdInitialPeers []string EtcdDataDir string EtcdClientPorts []string @@ -83,7 +83,7 @@ func New(origArgs []string) *Options { fs.StringVar(&opts.PlacementAddress, "placement-address", "", "Addresses for Dapr Actor Placement service") fs.StringVar(&opts.Mode, "mode", string(modes.StandaloneMode), "Runtime mode for Dapr Scheduler") - fs.StringVar(&opts.EtcdID, "id", "dapr-scheduler-server-0", "Scheduler server ID") + fs.StringVar(&opts.Id, "id", "dapr-scheduler-server-0", "Scheduler server ID") fs.StringSliceVar(&opts.EtcdInitialPeers, "initial-cluster", []string{"dapr-scheduler-server-0=http://localhost:2380"}, "Initial etcd cluster peers") fs.StringVar(&opts.EtcdDataDir, "etcd-data-dir", "./data", "Directory to store scheduler etcd data") fs.StringSliceVar(&opts.EtcdClientPorts, "etcd-client-ports", []string{"dapr-scheduler-server-0=2379"}, "Ports for etcd client communication") diff --git a/pkg/scheduler/server/config.go b/pkg/scheduler/server/config.go index e97f4ed3eb3..3ce1255c847 100644 --- a/pkg/scheduler/server/config.go +++ b/pkg/scheduler/server/config.go @@ -22,19 +22,20 @@ import ( "go.etcd.io/etcd/server/v3/embed" "github.com/dapr/dapr/pkg/modes" + "github.com/dapr/dapr/pkg/security" ) func (s *Server) conf() *embed.Config { config := embed.NewConfig() - config.Name = s.etcdID - config.Dir = s.dataDir + "-" + s.etcdID + config.Name = s.id + config.Dir = s.dataDir + "-" + security.CurrentNamespace() + "-" + s.id config.InitialCluster = strings.Join(s.etcdInitialPeers, ",") config.QuotaBackendBytes = s.etcdSpaceQuota config.AutoCompactionMode = s.etcdCompactionMode config.AutoCompactionRetention = s.etcdCompactionRetention - etcdURL, peerPort, err := peerHostAndPort(s.etcdID, s.etcdInitialPeers) + etcdURL, peerPort, err := peerHostAndPort(s.id, s.etcdInitialPeers) if err != nil { log.Warnf("Invalid format for initial cluster. Make sure to include 'http://' in Scheduler URL") } @@ -46,7 +47,7 @@ func (s *Server) conf() *embed.Config { config.AdvertiseClientUrls = []url.URL{{ Scheme: "http", - Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.etcdID]), + Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.id]), }} switch s.mode { @@ -59,12 +60,12 @@ func (s *Server) conf() *embed.Config { }} config.ListenClientUrls = []url.URL{{ Scheme: "http", - Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientPorts[s.etcdID]), + Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientPorts[s.id]), }} if len(s.etcdClientHttpPorts) > 0 { config.ListenClientHttpUrls = []url.URL{{ Scheme: "http", - Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientHttpPorts[s.etcdID]), + Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientHttpPorts[s.id]), }} } default: @@ -74,12 +75,12 @@ func (s *Server) conf() *embed.Config { }} config.ListenClientUrls = []url.URL{{ Scheme: "http", - Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.etcdID]), + Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.id]), }} if len(s.etcdClientHttpPorts) > 0 { config.ListenClientHttpUrls = []url.URL{{ Scheme: "http", - Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientHttpPorts[s.etcdID]), + Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientHttpPorts[s.id]), }} } } diff --git a/pkg/scheduler/server/server.go b/pkg/scheduler/server/server.go index 76347a1980d..4fa445f562f 100644 --- a/pkg/scheduler/server/server.go +++ b/pkg/scheduler/server/server.go @@ -54,7 +54,7 @@ type Options struct { Security security.Handler DataDir string - EtcdID string + Id string EtcdInitialPeers []string EtcdClientPorts []string EtcdClientHttpPorts []string @@ -71,7 +71,7 @@ type Server struct { mode modes.DaprMode dataDir string - etcdID string + id string etcdInitialPeers []string etcdClientPorts map[string]string etcdClientHttpPorts map[string]string @@ -100,7 +100,7 @@ func New(opts Options) *Server { listenAddress: opts.ListenAddress, mode: opts.Mode, - etcdID: opts.EtcdID, + id: opts.Id, etcdInitialPeers: opts.EtcdInitialPeers, etcdClientPorts: clientPorts, etcdClientHttpPorts: clientHttpPorts, diff --git a/pkg/scheduler/server/server_test.go b/pkg/scheduler/server/server_test.go index d3cdf552e10..688bb165310 100644 --- a/pkg/scheduler/server/server_test.go +++ b/pkg/scheduler/server/server_test.go @@ -54,7 +54,7 @@ func TestServerConf(t *testing.T) { t.Run("KubernetesMode", func(t *testing.T) { s := &Server{ mode: modes.KubernetesMode, - etcdID: "id2", + id: "id2", etcdClientPorts: map[string]string{"id1": "5001", "id2": "5002"}, etcdClientHttpPorts: map[string]string{"id1": "5003", "id2": "5004"}, etcdInitialPeers: []string{"id1=http://localhost:5001", "id2=http://localhost:5002"}, @@ -82,7 +82,7 @@ func TestServerConf(t *testing.T) { t.Run("DefaultMode", func(t *testing.T) { s := &Server{ mode: modes.StandaloneMode, - etcdID: "id2", + id: "id2", dataDir: "./data", etcdClientPorts: map[string]string{"id1": "5001", "id2": "5002"}, etcdClientHttpPorts: map[string]string{"id1": "5003", "id2": "5004"}, @@ -92,7 +92,7 @@ func TestServerConf(t *testing.T) { config := s.conf() assert.Equal(t, "id1=http://localhost:5001,id2=http://localhost:5002", config.InitialCluster) - assert.Equal(t, "./data-id2", config.Dir) + assert.Equal(t, "./data-default-id2", config.Dir) clientUrl := url.URL{ Scheme: "http", @@ -112,7 +112,7 @@ func TestServerConf(t *testing.T) { t.Run("DefaultMode without client http ports", func(t *testing.T) { s := &Server{ mode: modes.StandaloneMode, - etcdID: "id2", + id: "id2", etcdClientPorts: map[string]string{"id1": "5001", "id2": "5002"}, etcdInitialPeers: []string{"id1=http://localhost:5001", "id2=http://localhost:5002"}, }