Skip to content

Commit

Permalink
Productionises distributed scheduler (#22)
Browse files Browse the repository at this point in the history
* Fixes warning “Running http and grpc server on single port. This is not recommended for production.”

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Suffixes data dirs with instance id

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Adds space quota parameter

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Sets default quota to 2GB

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Adds compaction parameters

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Updates helm charts

Signed-off-by: Elena Kolevska <elena@kolevska.com>

* Adds namespace to data dir name. Renames etcdID to just ID.

Signed-off-by: Elena Kolevska <elena@kolevska.com>

---------

Signed-off-by: Elena Kolevska <elena@kolevska.com>
  • Loading branch information
elena-kolevska authored May 2, 2024
1 parent 5fde696 commit 8ccef92
Show file tree
Hide file tree
Showing 9 changed files with 267 additions and 49 deletions.
22 changes: 22 additions & 0 deletions charts/dapr/charts/dapr_scheduler/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,25 @@ Create etcd client ports list dynamically based on replicaCount.
{{- $etcdClientPorts -}}
{{- end -}}

{{/*
Create the etcd client HTTP ports list dynamically based on replicaCount.
Renders a comma-separated list of "<instance-name>=<port>" pairs, one per
replica, e.g. "dapr-scheduler-server-0=2330,dapr-scheduler-server-1=2330"
for two replicas and a port value of 2330. Every replica gets the same port,
read from .Values.ports.etcdHttpClientPort. Renders an empty string when
replicaCount is 0.
*/}}
{{- define "dapr_scheduler.etcdclienthttpports" -}}
{{- $etcdClientHttpPorts := "" -}}
{{- $replicaCount := int .Values.replicaCount -}}
{{- /* The port is identical for every replica, so resolve it once outside the loop. */ -}}
{{- $clientPort := int .Values.ports.etcdHttpClientPort -}}
{{- range $i, $e := until $replicaCount -}}
{{- $instanceName := printf "dapr-scheduler-server-%d" $i -}}
{{- $instancePortPair := printf "%s=%d" $instanceName $clientPort -}}
{{- if gt $i 0 -}}
{{- $etcdClientHttpPorts = printf "%s,%s" $etcdClientHttpPorts $instancePortPair -}}
{{- else -}}
{{- $etcdClientHttpPorts = $instancePortPair -}}
{{- end -}}
{{- end -}}
{{- $etcdClientHttpPorts -}}
{{- end -}}


Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ spec:
port: {{ .Values.ports.apiPort }}
- name: etcd-client
port: {{ .Values.ports.etcdRPCClientPort }}
- name: etcd-http-client
port: {{ .Values.ports.etcdHttpClientPort }}
- name: etcd-peer
port: {{ .Values.ports.etcdRPCPeerPort }}
clusterIP: None # make the service headless
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ spec:
name: api
- containerPort: {{ .Values.ports.etcdRPCClientPort }}
name: etcd-client
- containerPort: {{ .Values.ports.etcdHttpClientPort }}
name: etcd-http-client
- containerPort: {{ .Values.ports.etcdRPCPeerPort }}
name: etcd-peer
{{- if eq .Values.global.prometheus.enabled true }}
Expand Down Expand Up @@ -117,6 +119,8 @@ spec:
- {{ include "dapr_scheduler.initialcluster" . | toYaml | trim }}
- "--etcd-client-ports"
- {{ include "dapr_scheduler.etcdclientports" . | toYaml | trim }}
- "--etcd-client-http-ports"
- {{ include "dapr_scheduler.etcdclienthttpports" . | toYaml | trim }}
- "--log-level"
- {{ .Values.logLevel }}
{{- if eq .Values.global.logAsJson true }}
Expand All @@ -130,6 +134,9 @@ spec:
- "--enable-metrics=false"
{{- end }}
- "--etcd-data-dir={{ if eq .Values.global.daprControlPlaneOs "windows" }}{{ .Values.cluster.EtcdDataDirWinPath }}{{- else }}{{ .Values.cluster.EtcdDataDirPath }}"
- "--etcd-space-quota={{ .Values.etcdSpaceQuota }}"
- "--etcd-compaction-mode={{ .Values.etcdCompactionMode }}"
- "--etcd-compaction-retention={{ .Values.etcdCompactionRetention }}"
- "--tls-enabled"
- "--trust-domain={{ .Values.global.mtls.controlPlaneTrustDomain }}"
- "--trust-anchors-file=/var/run/secrets/dapr.io/tls/ca.crt"
Expand Down
5 changes: 5 additions & 0 deletions charts/dapr/charts/dapr_scheduler/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ ports:
protocol: TCP
apiPort: 50006
etcdRPCClientPort: 2379
etcdHttpClientPort: 2330
etcdRPCPeerPort: 2380

replicaCount: 1
Expand All @@ -31,6 +32,10 @@ cluster:
EtcdDataDirPath: /var/run/data/dapr-scheduler/etcd-data-dir
EtcdDataDirWinPath: C:\\etcd-data-dir

etcdSpaceQuota: 2147483648
etcdCompactionMode: periodic
etcdCompactionRetention: 24h

volumeclaims:
storageSize: 1Gi
storageClassName:
Expand Down
12 changes: 8 additions & 4 deletions cmd/scheduler/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,14 @@ func Run() {
Port: opts.Port,
Security: secHandler,

DataDir: opts.EtcdDataDir,
EtcdID: opts.EtcdID,
EtcdInitialPeers: opts.EtcdInitialPeers,
EtcdClientPorts: opts.EtcdClientPorts,
DataDir: opts.EtcdDataDir,
Id: opts.Id,
EtcdInitialPeers: opts.EtcdInitialPeers,
EtcdClientPorts: opts.EtcdClientPorts,
EtcdClientHttpPorts: opts.EtcdClientHttpPorts,
EtcdSpaceQuota: opts.EtcdSpaceQuota,
EtcdCompactionMode: opts.EtcdCompactionMode,
EtcdCompactionRetention: opts.EtcdCompactionRetention,
})

return server.Run(ctx)
Expand Down
18 changes: 13 additions & 5 deletions cmd/scheduler/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@ type Options struct {
PlacementAddress string
Mode string

EtcdID string
EtcdInitialPeers []string
EtcdDataDir string
EtcdClientPorts []string
Id string
EtcdInitialPeers []string
EtcdDataDir string
EtcdClientPorts []string
EtcdClientHttpPorts []string
EtcdSpaceQuota int64
EtcdCompactionMode string
EtcdCompactionRetention string

Logger logger.Options
Metrics *metrics.Options
Expand Down Expand Up @@ -79,10 +83,14 @@ func New(origArgs []string) *Options {
fs.StringVar(&opts.PlacementAddress, "placement-address", "", "Addresses for Dapr Actor Placement service")
fs.StringVar(&opts.Mode, "mode", string(modes.StandaloneMode), "Runtime mode for Dapr Scheduler")

fs.StringVar(&opts.EtcdID, "id", "dapr-scheduler-server-0", "Scheduler server ID")
fs.StringVar(&opts.Id, "id", "dapr-scheduler-server-0", "Scheduler server ID")
fs.StringSliceVar(&opts.EtcdInitialPeers, "initial-cluster", []string{"dapr-scheduler-server-0=http://localhost:2380"}, "Initial etcd cluster peers")
fs.StringVar(&opts.EtcdDataDir, "etcd-data-dir", "./data", "Directory to store scheduler etcd data")
fs.StringSliceVar(&opts.EtcdClientPorts, "etcd-client-ports", []string{"dapr-scheduler-server-0=2379"}, "Ports for etcd client communication")
fs.StringSliceVar(&opts.EtcdClientHttpPorts, "etcd-client-http-ports", []string{""}, "Ports for etcd client http communication")
fs.Int64Var(&opts.EtcdSpaceQuota, "etcd-space-quota", 2*1024*1024*1024, "Space quota for etcd")
fs.StringVar(&opts.EtcdCompactionMode, "etcd-compaction-mode", "periodic", "Compaction mode for etcd. Can be 'periodic' or 'revision'")
fs.StringVar(&opts.EtcdCompactionRetention, "etcd-compaction-retention", "24h", "Compaction retention for etcd. Can express time or number of revisions, depending on the value of 'etcd-compaction-mode'")

opts.Logger = logger.DefaultOptions()
opts.Logger.AttachCmdFlags(fs.StringVar, fs.BoolVar)
Expand Down
28 changes: 22 additions & 6 deletions pkg/scheduler/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@ import (
"go.etcd.io/etcd/server/v3/embed"

"github.com/dapr/dapr/pkg/modes"
"github.com/dapr/dapr/pkg/security"
)

func (s *Server) conf() *embed.Config {
config := embed.NewConfig()

config.Name = s.etcdID
config.Dir = s.dataDir
config.Name = s.id
config.Dir = s.dataDir + "-" + security.CurrentNamespace() + "-" + s.id
config.InitialCluster = strings.Join(s.etcdInitialPeers, ",")
config.QuotaBackendBytes = s.etcdSpaceQuota
config.AutoCompactionMode = s.etcdCompactionMode
config.AutoCompactionRetention = s.etcdCompactionRetention

etcdURL, peerPort, err := peerHostAndPort(s.etcdID, s.etcdInitialPeers)
etcdURL, peerPort, err := peerHostAndPort(s.id, s.etcdInitialPeers)
if err != nil {
log.Warnf("Invalid format for initial cluster. Make sure to include 'http://' in Scheduler URL")
}
Expand All @@ -43,7 +47,7 @@ func (s *Server) conf() *embed.Config {

config.AdvertiseClientUrls = []url.URL{{
Scheme: "http",
Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.etcdID]),
Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.id]),
}}

switch s.mode {
Expand All @@ -56,17 +60,29 @@ func (s *Server) conf() *embed.Config {
}}
config.ListenClientUrls = []url.URL{{
Scheme: "http",
Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientPorts[s.etcdID]),
Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientPorts[s.id]),
}}
if len(s.etcdClientHttpPorts) > 0 {
config.ListenClientHttpUrls = []url.URL{{
Scheme: "http",
Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientHttpPorts[s.id]),
}}
}
default:
config.ListenPeerUrls = []url.URL{{
Scheme: "http",
Host: fmt.Sprintf("%s:%s", etcdURL, peerPort),
}}
config.ListenClientUrls = []url.URL{{
Scheme: "http",
Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.etcdID]),
Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.id]),
}}
if len(s.etcdClientHttpPorts) > 0 {
config.ListenClientHttpUrls = []url.URL{{
Scheme: "http",
Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientHttpPorts[s.id]),
}}
}
}

config.LogLevel = "info" // Only supports debug, info, warn, error, panic, or fatal. Default 'info'.
Expand Down
88 changes: 54 additions & 34 deletions pkg/scheduler/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,14 @@ type Options struct {
Port int
Security security.Handler

DataDir string
EtcdID string
EtcdInitialPeers []string
EtcdClientPorts []string
DataDir string
Id string
EtcdInitialPeers []string
EtcdClientPorts []string
EtcdClientHttpPorts []string
EtcdSpaceQuota int64
EtcdCompactionMode string
EtcdCompactionRetention string
}

// Server is the gRPC server for the Scheduler service.
Expand All @@ -66,14 +70,18 @@ type Server struct {
listenAddress string
mode modes.DaprMode

dataDir string
etcdID string
etcdInitialPeers []string
etcdClientPorts map[string]string
cron *etcdcron.Cron
readyCh chan struct{}
jobTriggerChan chan *schedulerv1pb.WatchJobsResponse // used to trigger the WatchJobs logic
jobWatcherWG sync.WaitGroup
dataDir string
id string
etcdInitialPeers []string
etcdClientPorts map[string]string
etcdClientHttpPorts map[string]string
etcdSpaceQuota int64
etcdCompactionMode string
etcdCompactionRetention string
cron *etcdcron.Cron
readyCh chan struct{}
jobTriggerChan chan *schedulerv1pb.WatchJobsResponse // used to trigger the WatchJobs logic
jobWatcherWG sync.WaitGroup

sidecarConnChan chan *internal.Connection
connectionPool *internal.Pool // Connection pool for sidecars
Expand All @@ -84,30 +92,25 @@ type Server struct {
}

func New(opts Options) *Server {
clientPorts := make(map[string]string)
for _, input := range opts.EtcdClientPorts {
idAndPort := strings.Split(input, "=")
if len(idAndPort) != 2 {
log.Warnf("Incorrect format for client ports: %s. Should contain <id>=<client-port>", input)
continue
}
schedulerID := strings.TrimSpace(idAndPort[0])
port := strings.TrimSpace(idAndPort[1])
clientPorts[schedulerID] = port
}
clientPorts := parseClientPorts(opts.EtcdClientPorts)
clientHttpPorts := parseClientPorts(opts.EtcdClientHttpPorts)

s := &Server{
port: opts.Port,
listenAddress: opts.ListenAddress,
mode: opts.Mode,

etcdID: opts.EtcdID,
etcdInitialPeers: opts.EtcdInitialPeers,
etcdClientPorts: clientPorts,
dataDir: opts.DataDir,
readyCh: make(chan struct{}),
jobTriggerChan: make(chan *schedulerv1pb.WatchJobsResponse),
jobWatcherWG: sync.WaitGroup{},
id: opts.Id,
etcdInitialPeers: opts.EtcdInitialPeers,
etcdClientPorts: clientPorts,
etcdClientHttpPorts: clientHttpPorts,
etcdSpaceQuota: opts.EtcdSpaceQuota,
etcdCompactionMode: opts.EtcdCompactionMode,
etcdCompactionRetention: opts.EtcdCompactionRetention,
dataDir: opts.DataDir,
readyCh: make(chan struct{}),
jobTriggerChan: make(chan *schedulerv1pb.WatchJobsResponse),
jobWatcherWG: sync.WaitGroup{},

sidecarConnChan: make(chan *internal.Connection),
connectionPool: &internal.Pool{
Expand Down Expand Up @@ -155,6 +158,23 @@ func New(opts Options) *Server {
return s
}

// parseClientPorts converts a list of "<id>=<client-port>" entries into a map
// from scheduler ID to port. It is shared by the etcd client ports and the
// etcd client HTTP ports options, so its warning is worded generically.
//
// Whitespace around the ID and the port is trimmed. Blank entries are skipped
// without a warning, because an unset list flag can arrive as a single empty
// string. Any other entry that does not contain exactly one "=" is logged as
// a warning and ignored. When the same ID appears more than once, the last
// entry wins. The returned map is never nil.
func parseClientPorts(opts []string) map[string]string {
	ports := make(map[string]string, len(opts))

	for _, input := range opts {
		// An empty entry means "not configured", not "misconfigured".
		if strings.TrimSpace(input) == "" {
			continue
		}

		idAndPort := strings.Split(input, "=")
		if len(idAndPort) != 2 {
			log.Warnf("Incorrect format for client ports: %s. Should contain <id>=<client-port>", input)
			continue
		}

		schedulerID := strings.TrimSpace(idAndPort[0])
		port := strings.TrimSpace(idAndPort[1])
		ports[schedulerID] = port
	}

	return ports
}

func (s *Server) Run(ctx context.Context) error {
log.Info("Dapr Scheduler is starting...")

Expand Down Expand Up @@ -314,10 +334,10 @@ func (s *Server) runJobWatcher(ctx context.Context) error {
s.handleJobStreaming(ctx)
}()

<-ctx.Done()
s.jobWatcherWG.Wait()
log.Info("JobWatcher exited")
return ctx.Err()
<-ctx.Done()
s.jobWatcherWG.Wait()
log.Info("JobWatcher exited")
return ctx.Err()
}

// handleJobStreaming handles the streaming of jobs to Dapr sidecars.
Expand Down
Loading

0 comments on commit 8ccef92

Please sign in to comment.