Fix scheduler initialisation startup race (armadaproject#4132)
* Fix scheduler initialisation startup race

Currently SubmitCheck.Run loads all the queues from the queue cache and all executors from the database; this relies on the QueueCache already having loaded the queues.

Because SubmitCheck.Run and QueueCache.Run are started at the same time in separate goroutines, SubmitCheck.Run sometimes runs first and can't find any queues.
 - When this happens, scheduling is blocked for 1 minute (until the next SubmitCheck executor refresh)

I've added an Initialise method to QueueCache so we can call it during component creation, preventing the race described above.
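
A rough sketch of the new startup ordering (simplified from the schedulerapp.go change in this commit; variable names shortened): Initialise runs synchronously, with a timeout, before the Run goroutine is registered, so SubmitCheck always sees a populated cache.

    queueCache := queue.NewQueueCache(armadaClient, config.QueueRefreshPeriod)
    initCtx, cancel := armadacontext.WithTimeout(ctx, 30*time.Second)
    defer cancel()
    if err := queueCache.Initialise(initCtx); err != nil {
        // Failure is logged rather than fatal; the periodic refresh in Run will retry
        ctx.Errorf("error initialising queue cache - %v", err)
    }
    services = append(services, func() error { return queueCache.Run(ctx) })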

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Change executor update frequency - test if this fixes CI

* Fix ci

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Make sure queue cache and submit check are initialised on startup

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Revert config changes

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Respect JobLeaseRequestTimeout, reduce CI JobLeaseRequestTimeout to 5s, remove 70s sleep

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Format

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Set scheduler depending on server

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Wait for postgres to start running

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Remove server dependency

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Improve startup tests

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Fix func calls

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Move to separate funcs for checking running/ready

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Add timeouts + log errors on startup rather than exit

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Pass in timeout

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

* Set longer timeout

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>

---------

Signed-off-by: JamesMurkin <jamesmurkin@hotmail.com>
Signed-off-by: Rich Scott <richscott@sent.com>
JamesMurkin authored and richscott committed Feb 5, 2025
1 parent a3efe6a commit 0c3e28c
Showing 11 changed files with 172 additions and 25 deletions.
1 change: 1 addition & 0 deletions developer/env/docker/executor.env
@@ -1,2 +1,3 @@
ARMADA_EXECUTORAPICONNECTION_ARMADAURL="scheduler:50052"
ARMADA_EXECUTORAPICONNECTION_FORCENOTLS=true
ARMADA_APPLICATION_JOBLEASEREQUESTTIMEOUT=5s
1 change: 1 addition & 0 deletions internal/executor/application.go
@@ -218,6 +218,7 @@ func setupExecutorApiComponents(
clusterUtilisationService,
config.Kubernetes.PodDefaults,
config.Application.MaxLeasedJobs,
config.Application.JobLeaseRequestTimeout,
)
clusterAllocationService := service.NewClusterAllocationService(
clusterContext,
5 changes: 4 additions & 1 deletion internal/executor/service/job_requester.go
@@ -25,6 +25,7 @@ type JobRequester struct {
podDefaults *configuration.PodDefaults
jobRunStateStore job.RunStateStore
maxLeasedJobs int
maxRequestDuration time.Duration
}

func NewJobRequester(
@@ -35,6 +36,7 @@ func NewJobRequester(
utilisationService utilisation.UtilisationService,
podDefaults *configuration.PodDefaults,
maxLeasedJobs int,
maxRequestDuration time.Duration,
) *JobRequester {
return &JobRequester{
leaseRequester: leaseRequester,
@@ -44,6 +46,7 @@
clusterId: clusterId,
podDefaults: podDefaults,
maxLeasedJobs: maxLeasedJobs,
maxRequestDuration: maxRequestDuration,
}
}

@@ -53,7 +56,7 @@ func (r *JobRequester) RequestJobsRuns() {
log.Errorf("Failed to create lease request because %s", err)
return
}
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 30*time.Second)
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), r.maxRequestDuration)
defer cancel()
leaseResponse, err := r.leaseRequester.LeaseJobRuns(ctx, leaseRequest)
if err != nil {
16 changes: 14 additions & 2 deletions internal/executor/service/job_requester_test.go
@@ -3,6 +3,7 @@ package service
import (
"fmt"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
@@ -23,7 +24,10 @@
"github.com/armadaproject/armada/pkg/executorapi"
)

const defaultMaxLeasedJobs int = 5
const (
defaultMaxLeasedJobs int = 5
defaultMaxRequestDuration = 30 * time.Second
)

func TestRequestJobsRuns_HandlesLeaseRequestError(t *testing.T) {
jobRequester, eventReporter, leaseRequester, stateStore, _ := setupJobRequesterTest([]*job.RunState{})
@@ -257,7 +261,15 @@ func setupJobRequesterTest(initialJobRuns []*job.RunState) (*JobRequester, *mock
utilisationService.ClusterAvailableCapacityReport = &utilisation.ClusterAvailableCapacityReport{
AvailableCapacity: &armadaresource.ComputeResources{},
}
jobRequester := NewJobRequester(clusterId, eventReporter, leaseRequester, stateStore, utilisationService, podDefaults, defaultMaxLeasedJobs)
jobRequester := NewJobRequester(
clusterId,
eventReporter,
leaseRequester,
stateStore,
utilisationService,
podDefaults,
defaultMaxLeasedJobs,
defaultMaxRequestDuration)
return jobRequester, eventReporter, leaseRequester, stateStore, utilisationService
}

9 changes: 9 additions & 0 deletions internal/scheduler/queue/queue_cache.go
@@ -31,6 +31,15 @@ func NewQueueCache(apiClient api.SubmitClient, updateFrequency time.Duration) *A
}
}

func (c *ApiQueueCache) Initialise(ctx *armadacontext.Context) error {
err := c.fetchQueues(ctx)
if err != nil {
ctx.Errorf("Error initialising queue cache, failed fetching queues: %v", err)
}

return err
}

func (c *ApiQueueCache) Run(ctx *armadacontext.Context) error {
if err := c.fetchQueues(ctx); err != nil {
ctx.Warnf("Error fetching queues: %v", err)
12 changes: 12 additions & 0 deletions internal/scheduler/schedulerapp.go
@@ -126,6 +126,12 @@ func Run(config schedulerconfig.Configuration) error {
}()
armadaClient := api.NewSubmitClient(conn)
queueCache := queue.NewQueueCache(armadaClient, config.QueueRefreshPeriod)
queueCacheInitTimeout, cancel := armadacontext.WithTimeout(ctx, time.Second*30)
defer cancel()
err = queueCache.Initialise(queueCacheInitTimeout)
if err != nil {
ctx.Errorf("error initialising queue cache - %v", err)
}
services = append(services, func() error { return queueCache.Run(ctx) })

// ////////////////////////////////////////////////////////////////////////
Expand Down Expand Up @@ -262,6 +268,12 @@ func Run(config schedulerconfig.Configuration) error {
floatingResourceTypes,
resourceListFactory,
)
submitCheckerInitTimeout, cancel := armadacontext.WithTimeout(ctx, time.Second*30)
defer cancel()
err = submitChecker.Initialise(submitCheckerInitTimeout)
if err != nil {
ctx.Errorf("error initialising submit checker - %v", err)
}
services = append(services, func() error {
return submitChecker.Run(ctx)
})
34 changes: 22 additions & 12 deletions internal/scheduler/submitcheck.go
@@ -72,36 +72,44 @@ func NewSubmitChecker(
}
}

func (srv *SubmitChecker) Initialise(ctx *armadacontext.Context) error {
err := srv.updateExecutors(ctx)
if err != nil {
ctx.Logger().WithStacktrace(err).Errorf("Error initialising submit checker")
}

return err
}

func (srv *SubmitChecker) Run(ctx *armadacontext.Context) error {
ctx.Infof("Will refresh executor state every %s", srv.schedulingConfig.ExecutorUpdateFrequency)
srv.updateExecutors(ctx)
if err := srv.updateExecutors(ctx); err != nil {
ctx.Logger().WithStacktrace(err).Error("Failed updating executors")
}

ticker := time.NewTicker(srv.schedulingConfig.ExecutorUpdateFrequency)
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
srv.updateExecutors(ctx)
if err := srv.updateExecutors(ctx); err != nil {
ctx.Logger().WithStacktrace(err).Error("Failed updating executors")
}
}
}
}

func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) {
func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) error {
queues, err := srv.queueCache.GetAll(ctx)
if err != nil {
ctx.Logger().
WithStacktrace(err).
Error("Error fetching queues")
return
return fmt.Errorf("failed fetching queues from queue cache - %s", err)
}

executors, err := srv.executorRepository.GetExecutors(ctx)
if err != nil {
ctx.Logger().
WithStacktrace(err).
Error("Error fetching executors")
return
return fmt.Errorf("failed fetching executors from db - %s", err)
}

ctx.Infof("Retrieved %d executors", len(executors))
jobSchedulingResultsCache, err := lru.New(10000)
if err != nil {
@@ -166,6 +174,8 @@ func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) {
constraintsByPool: constraintsByPool,
jobSchedulingResultsCache: jobSchedulingResultsCache,
})

return nil
}

func (srv *SubmitChecker) Check(ctx *armadacontext.Context, jobs []*jobdb.Job) (map[string]schedulingResult, error) {
61 changes: 60 additions & 1 deletion internal/scheduler/submitcheck_test.go
@@ -1,6 +1,7 @@
package scheduler

import (
"fmt"
"testing"
"time"

@@ -298,7 +299,8 @@ func TestSubmitChecker_CheckJobDbJobs(t *testing.T) {
floatingResources,
testfixtures.TestResourceListFactory)
submitCheck.clock = fakeClock
submitCheck.updateExecutors(ctx)
err := submitCheck.Initialise(ctx)
assert.NoError(t, err)
results, err := submitCheck.Check(ctx, tc.jobs)
require.NoError(t, err)
require.Equal(t, len(tc.expectedResult), len(results))
@@ -316,6 +318,63 @@
}
}

func TestSubmitChecker_Initialise(t *testing.T) {
tests := map[string]struct {
queueCacheErr error
executorRepoErr error
expectError bool
}{
"Successful initialisation": {
expectError: false,
},
"error on queue cache error": {
expectError: true,
queueCacheErr: fmt.Errorf("failed to get queues"),
},
"error on executor repo err": {
expectError: true,
queueCacheErr: fmt.Errorf("failed to get executors"),
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second)
defer cancel()

queue := &api.Queue{Name: "queue"}
executors := []*schedulerobjects.Executor{Executor(SmallNode("cpu"))}

ctrl := gomock.NewController(t)
mockExecutorRepo := schedulermocks.NewMockExecutorRepository(ctrl)
if tc.executorRepoErr != nil {
mockExecutorRepo.EXPECT().GetExecutors(ctx).Return(nil, tc.executorRepoErr).AnyTimes()
} else {
mockExecutorRepo.EXPECT().GetExecutors(ctx).Return(executors, nil).AnyTimes()
}

mockQueueCache := schedulermocks.NewMockQueueCache(ctrl)
if tc.queueCacheErr != nil {
mockQueueCache.EXPECT().GetAll(ctx).Return(nil, tc.queueCacheErr).AnyTimes()
} else {
mockQueueCache.EXPECT().GetAll(ctx).Return([]*api.Queue{queue}, nil).AnyTimes()
}

submitCheck := NewSubmitChecker(testfixtures.TestSchedulingConfig(),
mockExecutorRepo,
mockQueueCache,
testfixtures.TestFloatingResources,
testfixtures.TestResourceListFactory)

err := submitCheck.Initialise(ctx)
if tc.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

func Executor(nodes ...*schedulerobjects.Node) *schedulerobjects.Executor {
executorId := uuid.NewString()
for _, node := range nodes {
5 changes: 4 additions & 1 deletion magefiles/ci.go
@@ -52,7 +52,10 @@ func TestSuite() error {

// Checks if Armada is ready to accept jobs.
func CheckForArmadaRunning() error {
time.Sleep(30 * time.Second)
// This is a bit of a shonky check: it confirms the scheduler is up and receiving reports from the executor,
// at which point the system should be ready.
// TODO: Make a better check to confirm the system is ready, such as seeing armadactl get executors return a value
mg.Deps(CheckSchedulerReady)
mg.Deps(createQueue)

// Set high to take compile time into account
45 changes: 38 additions & 7 deletions magefiles/developer.go
@@ -4,6 +4,7 @@ import (
"fmt"
"os"
"os/exec"
"regexp"
"strings"
"time"

@@ -111,28 +112,58 @@ func StopComponents() error {
return nil
}

// Repeatedly check logs until Pulsar is ready.
func CheckForPulsarRunning() error {
func CheckPulsarRunning() error {
return CheckDockerContainerRunning("pulsar", "alive")
}

func CheckPostgresRunning() error {
return CheckDockerContainerRunning("postgres", "database system is ready to accept connections")
}

func CheckServerRunning() error {
return CheckDockerContainerRunning("server", "Starting http server listening on")
}

func CheckSchedulerRunning() error {
return CheckDockerContainerRunning("scheduler", "Starting http server listening on")
}

func CheckExecutorRunning() error {
return CheckDockerContainerRunning("executor", "Starting http server listening on")
}

func CheckSchedulerReady() error {
return CheckDockerContainerRunning("scheduler", "Retrieved [1-9]+ executors")
}

// Repeatedly check logs until container is ready.
func CheckDockerContainerRunning(containerName string, expectedLogRegex string) error {
timeout := time.After(1 * time.Minute)
tick := time.Tick(1 * time.Second)
seconds := 0

logMatchRegex, err := regexp.Compile(expectedLogRegex)
if err != nil {
return fmt.Errorf("invalid log regex %s - %s", expectedLogRegex, err)
}

for {
select {
case <-timeout:
return fmt.Errorf("timed out waiting for Pulsar to start")
return fmt.Errorf("timed out waiting for %s to start", containerName)
case <-tick:
out, err := dockerOutput("compose", "logs", "pulsar")
out, err := dockerOutput("compose", "logs", containerName)
if err != nil {
return err
}
if strings.Contains(out, "alive") {
if len(logMatchRegex.FindStringSubmatch(out)) > 0 {
// if seconds is less than 1, it means that pulsar had already started
if seconds < 1 {
fmt.Printf("\nPulsar had already started!\n\n")
fmt.Printf("\n%s had already started!\n\n", containerName)
return nil
}

fmt.Printf("\nPulsar took %d seconds to start!\n\n", seconds)
fmt.Printf("\n%s took %d seconds to start!\n\n", containerName, seconds)
return nil
}
seconds++
Expand Down
8 changes: 7 additions & 1 deletion magefiles/main.go
@@ -214,12 +214,18 @@ func LocalDev(arg string) error {

mg.Deps(StartDependencies)
fmt.Println("Waiting for dependencies to start...")
mg.Deps(CheckForPulsarRunning)
mg.Deps(CheckPulsarRunning)
mg.Deps(CheckPostgresRunning)

switch arg {
case "minimal":
os.Setenv("ARMADA_COMPONENTS", "executor,server,scheduler")
mg.Deps(StartComponents)
// This is a naive check to confirm the containers are running; it doesn't check that they are ready
// TODO: Make a better check to confirm the system is ready, such as seeing armadactl get executors return a value
mg.Deps(CheckServerRunning)
mg.Deps(CheckSchedulerRunning)
mg.Deps(CheckExecutorRunning)
case "debug", "no-build":
fmt.Println("Dependencies started, ending localdev...")
return nil
Expand Down
