Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

WIP: Productionises distributed scheduler #22

Merged
22 changes: 22 additions & 0 deletions charts/dapr/charts/dapr_scheduler/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -63,3 +63,25 @@ Create etcd client ports list dynamically based on replicaCount.
{{- $etcdClientPorts -}}
{{- end -}}

{{/*
Create etcd client http ports list dynamically based on replicaCount.
*/}}
{{- define "dapr_scheduler.etcdclienthttpports" -}}
{{- $etcdClientHttpPorts := "" -}}
{{- $namespace := .Release.Namespace -}}
{{- $replicaCount := int .Values.replicaCount -}}
{{- range $i, $e := until $replicaCount -}}
{{- $instanceName := printf "dapr-scheduler-server-%d" $i -}}
{{/*{{- $svcName := printf "%s.%s" $instanceName $namespace -}}*/}}
{{- $clientPort := int $.Values.ports.etcdHttpClientPort -}}
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This looks good, since it's just changing the port variable from the logic above, which I've thoroughly tested. I'm not sure whether there is a way to pass a variable into the Helm template helper logic.

{{- $instancePortPair := printf "%s=%d" $instanceName $clientPort -}}
{{- if gt $i 0 -}}
{{- $etcdClientHttpPorts = printf "%s,%s" $etcdClientHttpPorts $instancePortPair -}}
{{- else -}}
{{- $etcdClientHttpPorts = $instancePortPair -}}
{{- end -}}
{{- end -}}
{{- $etcdClientHttpPorts -}}
{{- end -}}


Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ spec:
port: {{ .Values.ports.apiPort }}
- name: etcd-client
port: {{ .Values.ports.etcdRPCClientPort }}
- name: etcd-http-client
port: {{ .Values.ports.etcdHttpClientPort }}
- name: etcd-peer
port: {{ .Values.ports.etcdRPCPeerPort }}
clusterIP: None # make the service headless
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,8 @@ spec:
name: api
- containerPort: {{ .Values.ports.etcdRPCClientPort }}
name: etcd-client
- containerPort: {{ .Values.ports.etcdHttpClientPort }}
name: etcd-http-client
- containerPort: {{ .Values.ports.etcdRPCPeerPort }}
name: etcd-peer
{{- if eq .Values.global.prometheus.enabled true }}
Expand Down Expand Up @@ -117,6 +119,8 @@ spec:
- {{ include "dapr_scheduler.initialcluster" . | toYaml | trim }}
- "--etcd-client-ports"
- {{ include "dapr_scheduler.etcdclientports" . | toYaml | trim }}
- "--etcd-client-http-ports"
- {{ include "dapr_scheduler.etcdclienthttpports" . | toYaml | trim }}
- "--log-level"
- {{ .Values.logLevel }}
{{- if eq .Values.global.logAsJson true }}
Expand All @@ -130,6 +134,9 @@ spec:
- "--enable-metrics=false"
{{- end }}
- "--etcd-data-dir={{ if eq .Values.global.daprControlPlaneOs "windows" }}{{ .Values.cluster.EtcdDataDirWinPath }}{{- else }}{{ .Values.cluster.EtcdDataDirPath }}"
- "--etcd-space-quota={{ .Values.etcdSpaceQuota }}"
- "--etcd-compaction-mode={{ .Values.etcdCompactionMode }}"
- "--etcd-compaction-retention={{ .Values.etcdCompactionRetention }}"
- "--tls-enabled"
- "--trust-domain={{ .Values.global.mtls.controlPlaneTrustDomain }}"
- "--trust-anchors-file=/var/run/secrets/dapr.io/tls/ca.crt"
Expand Down
5 changes: 5 additions & 0 deletions charts/dapr/charts/dapr_scheduler/values.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ ports:
protocol: TCP
apiPort: 50006
etcdRPCClientPort: 2379
etcdHttpClientPort: 2330
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this be 2381?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I considered the same, and it's open for discussion, but I thought a second scheduler instance would be using 2381 for the main (gRPC) port. So let's say if we had 5 instances, we'd have 2379, 2381, 2383, 2385 and 2387 for the client ports. That's why I chose a random port that's reasonably far away for http.

Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

fair enough

etcdRPCPeerPort: 2380

replicaCount: 1
Expand All @@ -31,6 +32,10 @@ cluster:
EtcdDataDirPath: /var/run/data/dapr-scheduler/etcd-data-dir
EtcdDataDirWinPath: C:\\etcd-data-dir

etcdSpaceQuota: 2147483648
etcdCompactionMode: periodic
etcdCompactionRetention: 24h

volumeclaims:
storageSize: 1Gi
storageClassName:
Expand Down
12 changes: 8 additions & 4 deletions cmd/scheduler/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,10 +91,14 @@ func Run() {
Port: opts.Port,
Security: secHandler,

DataDir: opts.EtcdDataDir,
EtcdID: opts.EtcdID,
EtcdInitialPeers: opts.EtcdInitialPeers,
EtcdClientPorts: opts.EtcdClientPorts,
DataDir: opts.EtcdDataDir,
Id: opts.Id,
EtcdInitialPeers: opts.EtcdInitialPeers,
EtcdClientPorts: opts.EtcdClientPorts,
EtcdClientHttpPorts: opts.EtcdClientHttpPorts,
EtcdSpaceQuota: opts.EtcdSpaceQuota,
EtcdCompactionMode: opts.EtcdCompactionMode,
EtcdCompactionRetention: opts.EtcdCompactionRetention,
})

return server.Run(ctx)
Expand Down
18 changes: 13 additions & 5 deletions cmd/scheduler/options/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,10 +37,14 @@ type Options struct {
PlacementAddress string
Mode string

EtcdID string
EtcdInitialPeers []string
EtcdDataDir string
EtcdClientPorts []string
Id string
EtcdInitialPeers []string
EtcdDataDir string
EtcdClientPorts []string
EtcdClientHttpPorts []string
EtcdSpaceQuota int64
EtcdCompactionMode string
EtcdCompactionRetention string

Logger logger.Options
Metrics *metrics.Options
Expand Down Expand Up @@ -79,10 +83,14 @@ func New(origArgs []string) *Options {
fs.StringVar(&opts.PlacementAddress, "placement-address", "", "Addresses for Dapr Actor Placement service")
fs.StringVar(&opts.Mode, "mode", string(modes.StandaloneMode), "Runtime mode for Dapr Scheduler")

fs.StringVar(&opts.EtcdID, "id", "dapr-scheduler-server-0", "Scheduler server ID")
fs.StringVar(&opts.Id, "id", "dapr-scheduler-server-0", "Scheduler server ID")
Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I changed this because we can have scheduler instances without a running etcd; it would feel weird to have an "etcdID" for them.

fs.StringSliceVar(&opts.EtcdInitialPeers, "initial-cluster", []string{"dapr-scheduler-server-0=http://localhost:2380"}, "Initial etcd cluster peers")
fs.StringVar(&opts.EtcdDataDir, "etcd-data-dir", "./data", "Directory to store scheduler etcd data")
fs.StringSliceVar(&opts.EtcdClientPorts, "etcd-client-ports", []string{"dapr-scheduler-server-0=2379"}, "Ports for etcd client communication")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Suggested change
fs.StringSliceVar(&opts.EtcdClientPorts, "etcd-client-ports", []string{"dapr-scheduler-server-0=2379"}, "Ports for etcd client communication")
fs.StringSliceVar(&opts.EtcdClientPorts, "etcd-client-grpc-ports", []string{"dapr-scheduler-server-0=2379"}, "Ports for etcd client grpc communication")

if we are being explicit on http, it might be worth putting grpc here

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is related to the decision we're going to make above (re: having a default http port or not). If we do it like etcd does and serve http on the same port as grpc by default, it wouldn't make sense to add grpc to the parameter name.

fs.StringSliceVar(&opts.EtcdClientHttpPorts, "etcd-client-http-ports", []string{""}, "Ports for etcd client http communication")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we put a sane default here? Or should we leave it empty, with the default being a non-production setup?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the review Cassie 🙏
I don't know, my feeling is that we should stay as close as possible to etcd behaviour here, and keep it empty, so that we have http and grpc on the same port. This lowers the complexity for development, while still keeping open the possibility of adding a separate http port for production.

fs.Int64Var(&opts.EtcdSpaceQuota, "etcd-space-quota", 2*1024*1024*1024, "Space quota for etcd")
Copy link
Owner

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For consistency, should we put the computed val here of 2147483648 instead of 2*1024*1024*1024?

Suggested change
fs.Int64Var(&opts.EtcdSpaceQuota, "etcd-space-quota", 2*1024*1024*1024, "Space quota for etcd")
fs.Int64Var(&opts.EtcdSpaceQuota, "etcd-space-quota", 2147483648, "Space quota for etcd")

fs.StringVar(&opts.EtcdCompactionMode, "etcd-compaction-mode", "periodic", "Compaction mode for etcd. Can be 'periodic' or 'revision'")
fs.StringVar(&opts.EtcdCompactionRetention, "etcd-compaction-retention", "24h", "Compaction retention for etcd. Can express time or number of revisions, depending on the value of 'etcd-compaction-mode'")

opts.Logger = logger.DefaultOptions()
opts.Logger.AttachCmdFlags(fs.StringVar, fs.BoolVar)
Expand Down
28 changes: 22 additions & 6 deletions pkg/scheduler/server/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,16 +22,20 @@ import (
"go.etcd.io/etcd/server/v3/embed"

"github.com/dapr/dapr/pkg/modes"
"github.com/dapr/dapr/pkg/security"
)

func (s *Server) conf() *embed.Config {
config := embed.NewConfig()

config.Name = s.etcdID
config.Dir = s.dataDir
config.Name = s.id
config.Dir = s.dataDir + "-" + security.CurrentNamespace() + "-" + s.id
config.InitialCluster = strings.Join(s.etcdInitialPeers, ",")
config.QuotaBackendBytes = s.etcdSpaceQuota
config.AutoCompactionMode = s.etcdCompactionMode
config.AutoCompactionRetention = s.etcdCompactionRetention

etcdURL, peerPort, err := peerHostAndPort(s.etcdID, s.etcdInitialPeers)
etcdURL, peerPort, err := peerHostAndPort(s.id, s.etcdInitialPeers)
if err != nil {
log.Warnf("Invalid format for initial cluster. Make sure to include 'http://' in Scheduler URL")
}
Expand All @@ -43,7 +47,7 @@ func (s *Server) conf() *embed.Config {

config.AdvertiseClientUrls = []url.URL{{
Scheme: "http",
Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.etcdID]),
Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.id]),
}}

switch s.mode {
Expand All @@ -56,17 +60,29 @@ func (s *Server) conf() *embed.Config {
}}
config.ListenClientUrls = []url.URL{{
Scheme: "http",
Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientPorts[s.etcdID]),
Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientPorts[s.id]),
}}
if len(s.etcdClientHttpPorts) > 0 {
config.ListenClientHttpUrls = []url.URL{{
Scheme: "http",
Host: fmt.Sprintf("%s:%s", etcdIP, s.etcdClientHttpPorts[s.id]),
}}
}
default:
config.ListenPeerUrls = []url.URL{{
Scheme: "http",
Host: fmt.Sprintf("%s:%s", etcdURL, peerPort),
}}
config.ListenClientUrls = []url.URL{{
Scheme: "http",
Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.etcdID]),
Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientPorts[s.id]),
}}
if len(s.etcdClientHttpPorts) > 0 {
config.ListenClientHttpUrls = []url.URL{{
Scheme: "http",
Host: fmt.Sprintf("%s:%s", etcdURL, s.etcdClientHttpPorts[s.id]),
}}
}
}

config.LogLevel = "info" // Only supports debug, info, warn, error, panic, or fatal. Default 'info'.
Expand Down
88 changes: 54 additions & 34 deletions pkg/scheduler/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,10 +53,14 @@ type Options struct {
Port int
Security security.Handler

DataDir string
EtcdID string
EtcdInitialPeers []string
EtcdClientPorts []string
DataDir string
Id string
EtcdInitialPeers []string
EtcdClientPorts []string
EtcdClientHttpPorts []string
EtcdSpaceQuota int64
EtcdCompactionMode string
EtcdCompactionRetention string
}

// Server is the gRPC server for the Scheduler service.
Expand All @@ -66,14 +70,18 @@ type Server struct {
listenAddress string
mode modes.DaprMode

dataDir string
etcdID string
etcdInitialPeers []string
etcdClientPorts map[string]string
cron *etcdcron.Cron
readyCh chan struct{}
jobTriggerChan chan *schedulerv1pb.WatchJobsResponse // used to trigger the WatchJobs logic
jobWatcherWG sync.WaitGroup
dataDir string
id string
etcdInitialPeers []string
etcdClientPorts map[string]string
etcdClientHttpPorts map[string]string
etcdSpaceQuota int64
etcdCompactionMode string
etcdCompactionRetention string
cron *etcdcron.Cron
readyCh chan struct{}
jobTriggerChan chan *schedulerv1pb.WatchJobsResponse // used to trigger the WatchJobs logic
jobWatcherWG sync.WaitGroup

sidecarConnChan chan *internal.Connection
connectionPool *internal.Pool // Connection pool for sidecars
Expand All @@ -84,30 +92,25 @@ type Server struct {
}

func New(opts Options) *Server {
clientPorts := make(map[string]string)
for _, input := range opts.EtcdClientPorts {
idAndPort := strings.Split(input, "=")
if len(idAndPort) != 2 {
log.Warnf("Incorrect format for client ports: %s. Should contain <id>=<client-port>", input)
continue
}
schedulerID := strings.TrimSpace(idAndPort[0])
port := strings.TrimSpace(idAndPort[1])
clientPorts[schedulerID] = port
}
clientPorts := parseClientPorts(opts.EtcdClientPorts)
clientHttpPorts := parseClientPorts(opts.EtcdClientHttpPorts)

s := &Server{
port: opts.Port,
listenAddress: opts.ListenAddress,
mode: opts.Mode,

etcdID: opts.EtcdID,
etcdInitialPeers: opts.EtcdInitialPeers,
etcdClientPorts: clientPorts,
dataDir: opts.DataDir,
readyCh: make(chan struct{}),
jobTriggerChan: make(chan *schedulerv1pb.WatchJobsResponse),
jobWatcherWG: sync.WaitGroup{},
id: opts.Id,
etcdInitialPeers: opts.EtcdInitialPeers,
etcdClientPorts: clientPorts,
etcdClientHttpPorts: clientHttpPorts,
etcdSpaceQuota: opts.EtcdSpaceQuota,
etcdCompactionMode: opts.EtcdCompactionMode,
etcdCompactionRetention: opts.EtcdCompactionRetention,
dataDir: opts.DataDir,
readyCh: make(chan struct{}),
jobTriggerChan: make(chan *schedulerv1pb.WatchJobsResponse),
jobWatcherWG: sync.WaitGroup{},

sidecarConnChan: make(chan *internal.Connection),
connectionPool: &internal.Pool{
Expand Down Expand Up @@ -155,6 +158,23 @@ func New(opts Options) *Server {
return s
}

// parseClientPorts converts a list of "<id>=<client-port>" entries into a map
// from scheduler ID to port.
//
// Whitespace around both the ID and the port is trimmed. Blank entries (for
// example the single empty string produced by an empty flag default) are
// skipped silently, since they carry no configuration and would otherwise
// trigger a misleading warning. Any other entry that does not contain exactly
// one "=" is reported with a warning and skipped. If the same ID appears more
// than once, the last entry wins.
//
// The returned map is never nil; it is empty when no valid entry was given.
func parseClientPorts(opts []string) map[string]string {
	ports := make(map[string]string, len(opts))

	for _, input := range opts {
		// Nothing to parse; not a formatting mistake worth warning about.
		if strings.TrimSpace(input) == "" {
			continue
		}

		idAndPort := strings.Split(input, "=")
		if len(idAndPort) != 2 {
			// This helper is shared by every client port list, so the message
			// must not name a specific protocol.
			log.Warnf("Incorrect format for client ports: %s. Should contain <id>=<client-port>", input)
			continue
		}

		schedulerID := strings.TrimSpace(idAndPort[0])
		port := strings.TrimSpace(idAndPort[1])
		ports[schedulerID] = port
	}

	return ports
}

func (s *Server) Run(ctx context.Context) error {
log.Info("Dapr Scheduler is starting...")

Expand Down Expand Up @@ -314,10 +334,10 @@ func (s *Server) runJobWatcher(ctx context.Context) error {
s.handleJobStreaming(ctx)
}()

<-ctx.Done()
s.jobWatcherWG.Wait()
log.Info("JobWatcher exited")
return ctx.Err()
<-ctx.Done()
s.jobWatcherWG.Wait()
log.Info("JobWatcher exited")
return ctx.Err()
}

// handleJobStreaming handles the streaming of jobs to Dapr sidecars.
Expand Down
Loading