Fix scheduler initialisation startup race #4132

Merged: 27 commits, Feb 4, 2025
Commits
2a340df
Fix scheduler initialisation startup race
JamesMurkin Jan 10, 2025
63bf987
Change executor update frequency - test if this fixes CI
JamesMurkin Jan 10, 2025
fb8db61
Fix ci
JamesMurkin Jan 10, 2025
ea56300
Make sure queue cache and submit check are initialise on startup
JamesMurkin Jan 13, 2025
77e1c4f
Merge branch 'master' into fix_queue_cache_startup_race
JamesMurkin Jan 13, 2025
8909468
Revert config changes
JamesMurkin Jan 13, 2025
2d5884f
Respect JobLeaseRequestTimeout, reduce CI JobLeaseRequestTimeout to 5…
JamesMurkin Jan 13, 2025
cb88f9f
Format
JamesMurkin Jan 13, 2025
c7a054b
Merge branch 'master' into fix_queue_cache_startup_race
JamesMurkin Jan 16, 2025
32f5959
Merge branch 'master' into fix_queue_cache_startup_race
JamesMurkin Jan 17, 2025
cf0dcaa
Merge branch 'master' into fix_queue_cache_startup_race
JamesMurkin Jan 20, 2025
4b80aac
Set scheduler depending on server
JamesMurkin Jan 20, 2025
622d071
Merge branch 'master' into fix_queue_cache_startup_race
JamesMurkin Jan 20, 2025
f4b4b6f
Wait for postgres to start running
JamesMurkin Jan 20, 2025
c41abb1
Merge branch 'fix_queue_cache_startup_race' of https://github.com/G-R…
JamesMurkin Jan 20, 2025
eab6098
Remove server dependency
JamesMurkin Jan 20, 2025
b563b48
Improve startup tests
JamesMurkin Jan 22, 2025
8ff764f
Fix func calls
JamesMurkin Jan 22, 2025
4f90086
Move to separate funcs for checking running/ready
JamesMurkin Jan 22, 2025
0bb7add
Merge branch 'master' into fix_queue_cache_startup_race
JamesMurkin Jan 22, 2025
7b2747a
Add timeouts + log errors on startup rather than exit
JamesMurkin Jan 23, 2025
df6b0ff
Merge branch 'fix_queue_cache_startup_race' of https://github.com/G-R…
JamesMurkin Jan 23, 2025
488d34b
Pass in timeout
JamesMurkin Jan 23, 2025
ed8b62b
Set longer timeout
JamesMurkin Jan 23, 2025
d33da75
Merge branch 'master' into fix_queue_cache_startup_race
JamesMurkin Jan 30, 2025
4685d4f
Merge branch 'master' into fix_queue_cache_startup_race
JamesMurkin Feb 3, 2025
a8fa919
Merge branch 'master' into fix_queue_cache_startup_race
JamesMurkin Feb 4, 2025
1 change: 1 addition & 0 deletions developer/env/docker/executor.env
@@ -1,2 +1,3 @@
ARMADA_EXECUTORAPICONNECTION_ARMADAURL="scheduler:50052"
ARMADA_EXECUTORAPICONNECTION_FORCENOTLS=true
ARMADA_APPLICATION_JOBLEASEREQUESTTIMEOUT=5s
1 change: 1 addition & 0 deletions internal/executor/application.go
@@ -218,6 +218,7 @@ func setupExecutorApiComponents(
clusterUtilisationService,
config.Kubernetes.PodDefaults,
config.Application.MaxLeasedJobs,
config.Application.JobLeaseRequestTimeout,
)
clusterAllocationService := service.NewClusterAllocationService(
clusterContext,
5 changes: 4 additions & 1 deletion internal/executor/service/job_requester.go
@@ -25,6 +25,7 @@ type JobRequester struct {
podDefaults *configuration.PodDefaults
jobRunStateStore job.RunStateStore
maxLeasedJobs int
maxRequestDuration time.Duration
}

func NewJobRequester(
@@ -35,6 +36,7 @@ func NewJobRequester(
utilisationService utilisation.UtilisationService,
podDefaults *configuration.PodDefaults,
maxLeasedJobs int,
maxRequestDuration time.Duration,
) *JobRequester {
return &JobRequester{
leaseRequester: leaseRequester,
@@ -44,6 +46,7 @@ func NewJobRequester(
clusterId: clusterId,
podDefaults: podDefaults,
maxLeasedJobs: maxLeasedJobs,
maxRequestDuration: maxRequestDuration,
}
}

@@ -53,7 +56,7 @@ func (r *JobRequester) RequestJobsRuns() {
log.Errorf("Failed to create lease request because %s", err)
return
}
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 30*time.Second)
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), r.maxRequestDuration)
defer cancel()
leaseResponse, err := r.leaseRequester.LeaseJobRuns(ctx, leaseRequest)
if err != nil {
16 changes: 14 additions & 2 deletions internal/executor/service/job_requester_test.go
@@ -3,6 +3,7 @@ package service
import (
"fmt"
"testing"
"time"

"github.com/google/uuid"
"github.com/stretchr/testify/assert"
@@ -23,7 +24,10 @@ import (
"github.com/armadaproject/armada/pkg/executorapi"
)

const defaultMaxLeasedJobs int = 5
const (
defaultMaxLeasedJobs int = 5
defaultMaxRequestDuration = 30 * time.Second
)

func TestRequestJobsRuns_HandlesLeaseRequestError(t *testing.T) {
jobRequester, eventReporter, leaseRequester, stateStore, _ := setupJobRequesterTest([]*job.RunState{})
@@ -257,7 +261,15 @@ func setupJobRequesterTest(initialJobRuns []*job.RunState) (*JobRequester, *mock
utilisationService.ClusterAvailableCapacityReport = &utilisation.ClusterAvailableCapacityReport{
AvailableCapacity: &armadaresource.ComputeResources{},
}
jobRequester := NewJobRequester(clusterId, eventReporter, leaseRequester, stateStore, utilisationService, podDefaults, defaultMaxLeasedJobs)
jobRequester := NewJobRequester(
clusterId,
eventReporter,
leaseRequester,
stateStore,
utilisationService,
podDefaults,
defaultMaxLeasedJobs,
defaultMaxRequestDuration)
return jobRequester, eventReporter, leaseRequester, stateStore, utilisationService
}

9 changes: 9 additions & 0 deletions internal/scheduler/queue/queue_cache.go
@@ -31,6 +31,15 @@ func NewQueueCache(apiClient api.SubmitClient, updateFrequency time.Duration) *A
}
}

func (c *ApiQueueCache) Initialise(ctx *armadacontext.Context) error {
err := c.fetchQueues(ctx)
if err != nil {
ctx.Errorf("Error initialising queue cache, failed fetching queues: %v", err)
}

return err
}

func (c *ApiQueueCache) Run(ctx *armadacontext.Context) error {
if err := c.fetchQueues(ctx); err != nil {
ctx.Warnf("Error fetching queues: %v", err)
12 changes: 12 additions & 0 deletions internal/scheduler/schedulerapp.go
@@ -126,6 +126,12 @@ func Run(config schedulerconfig.Configuration) error {
}()
armadaClient := api.NewSubmitClient(conn)
queueCache := queue.NewQueueCache(armadaClient, config.QueueRefreshPeriod)
queueCacheInitTimeout, cancel := armadacontext.WithTimeout(ctx, time.Second*30)
defer cancel()
err = queueCache.Initialise(queueCacheInitTimeout)
if err != nil {
ctx.Errorf("error initialising queue cache - %v", err)
}
services = append(services, func() error { return queueCache.Run(ctx) })

// ////////////////////////////////////////////////////////////////////////
@@ -262,6 +268,12 @@ func Run(config schedulerconfig.Configuration) error {
floatingResourceTypes,
resourceListFactory,
)
submitCheckerInitTimeout, cancel := armadacontext.WithTimeout(ctx, time.Second*30)
defer cancel()
err = submitChecker.Initialise(submitCheckerInitTimeout)
if err != nil {
ctx.Errorf("error initialising submit checker - %v", err)
}
services = append(services, func() error {
return submitChecker.Run(ctx)
})
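Editorial note: the startup fix above boils down to an initialise-then-run pattern. Each component gets a one-off, timeout-bounded warm-up before the long-running services start, and a warm-up failure is logged rather than treated as fatal because Run retries on its normal refresh cycle. The following is a minimal, self-contained sketch of that pattern; the initialisable interface and warmUp helper are illustrative names, not Armada code.

package sketch

import (
	"context"
	"log"
	"time"
)

// initialisable is an illustrative sketch of the pattern added in this PR:
// a one-off Initialise to warm state at startup, plus the existing
// long-running Run loop.
type initialisable interface {
	Initialise(ctx context.Context) error
	Run(ctx context.Context) error
}

// warmUp gives each component a bounded chance to populate its state before
// the scheduler starts serving; errors are logged, not fatal, because Run
// keeps refreshing on its normal cycle.
func warmUp(ctx context.Context, timeout time.Duration, components ...initialisable) {
	for _, c := range components {
		initCtx, cancel := context.WithTimeout(ctx, timeout)
		if err := c.Initialise(initCtx); err != nil {
			log.Printf("initialisation failed, Run will retry: %v", err)
		}
		cancel()
	}
}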
34 changes: 22 additions & 12 deletions internal/scheduler/submitcheck.go
@@ -72,36 +72,44 @@ func NewSubmitChecker(
}
}

func (srv *SubmitChecker) Initialise(ctx *armadacontext.Context) error {
err := srv.updateExecutors(ctx)
if err != nil {
ctx.Logger().WithStacktrace(err).Errorf("Error initialising submit checker")
}

return err
}

func (srv *SubmitChecker) Run(ctx *armadacontext.Context) error {
ctx.Infof("Will refresh executor state every %s", srv.schedulingConfig.ExecutorUpdateFrequency)
srv.updateExecutors(ctx)
if err := srv.updateExecutors(ctx); err != nil {
ctx.Logger().WithStacktrace(err).Error("Failed updating executors")
}

ticker := time.NewTicker(srv.schedulingConfig.ExecutorUpdateFrequency)
for {
select {
case <-ctx.Done():
return nil
case <-ticker.C:
srv.updateExecutors(ctx)
if err := srv.updateExecutors(ctx); err != nil {
ctx.Logger().WithStacktrace(err).Error("Failed updating executors")
}
}
}
}

func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) {
func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) error {
queues, err := srv.queueCache.GetAll(ctx)
if err != nil {
ctx.Logger().
WithStacktrace(err).
Error("Error fetching queues")
return
return fmt.Errorf("failed fetching queues from queue cache - %s", err)
}

executors, err := srv.executorRepository.GetExecutors(ctx)
if err != nil {
ctx.Logger().
WithStacktrace(err).
Error("Error fetching executors")
return
return fmt.Errorf("failed fetching executors from db - %s", err)
}

ctx.Infof("Retrieved %d executors", len(executors))
jobSchedulingResultsCache, err := lru.New(10000)
if err != nil {
@@ -166,6 +174,8 @@ func (srv *SubmitChecker) updateExecutors(ctx *armadacontext.Context) {
constraintsByPool: constraintsByPool,
jobSchedulingResultsCache: jobSchedulingResultsCache,
})

return nil
}

func (srv *SubmitChecker) Check(ctx *armadacontext.Context, jobs []*jobdb.Job) (map[string]schedulingResult, error) {
61 changes: 60 additions & 1 deletion internal/scheduler/submitcheck_test.go
@@ -1,6 +1,7 @@
package scheduler

import (
"fmt"
"testing"
"time"

@@ -298,7 +299,8 @@ func TestSubmitChecker_CheckJobDbJobs(t *testing.T) {
floatingResources,
testfixtures.TestResourceListFactory)
submitCheck.clock = fakeClock
submitCheck.updateExecutors(ctx)
err := submitCheck.Initialise(ctx)
assert.NoError(t, err)
results, err := submitCheck.Check(ctx, tc.jobs)
require.NoError(t, err)
require.Equal(t, len(tc.expectedResult), len(results))
@@ -316,6 +318,63 @@ func TestSubmitChecker_CheckJobDbJobs(t *testing.T) {
}
}

func TestSubmitChecker_Initialise(t *testing.T) {
tests := map[string]struct {
queueCacheErr error
executorRepoErr error
expectError bool
}{
"Successful initialisation": {
expectError: false,
},
"error on queue cache error": {
expectError: true,
queueCacheErr: fmt.Errorf("failed to get queues"),
},
"error on executor repo err": {
expectError: true,
queueCacheErr: fmt.Errorf("failed to get executors"),
},
}
for name, tc := range tests {
t.Run(name, func(t *testing.T) {
ctx, cancel := armadacontext.WithTimeout(armadacontext.Background(), 5*time.Second)
defer cancel()

queue := &api.Queue{Name: "queue"}
executors := []*schedulerobjects.Executor{Executor(SmallNode("cpu"))}

ctrl := gomock.NewController(t)
mockExecutorRepo := schedulermocks.NewMockExecutorRepository(ctrl)
if tc.executorRepoErr != nil {
mockExecutorRepo.EXPECT().GetExecutors(ctx).Return(nil, tc.executorRepoErr).AnyTimes()
} else {
mockExecutorRepo.EXPECT().GetExecutors(ctx).Return(executors, nil).AnyTimes()
}

mockQueueCache := schedulermocks.NewMockQueueCache(ctrl)
if tc.queueCacheErr != nil {
mockQueueCache.EXPECT().GetAll(ctx).Return(nil, tc.queueCacheErr).AnyTimes()
} else {
mockQueueCache.EXPECT().GetAll(ctx).Return([]*api.Queue{queue}, nil).AnyTimes()
}

submitCheck := NewSubmitChecker(testfixtures.TestSchedulingConfig(),
mockExecutorRepo,
mockQueueCache,
testfixtures.TestFloatingResources,
testfixtures.TestResourceListFactory)

err := submitCheck.Initialise(ctx)
if tc.expectError {
assert.Error(t, err)
} else {
assert.NoError(t, err)
}
})
}
}

func Executor(nodes ...*schedulerobjects.Node) *schedulerobjects.Executor {
executorId := uuid.NewString()
for _, node := range nodes {
5 changes: 4 additions & 1 deletion magefiles/ci.go
@@ -52,7 +52,10 @@ func TestSuite() error {

// Checks if Armada is ready to accept jobs.
func CheckForArmadaRunning() error {
time.Sleep(30 * time.Second)
// This is a bit of a shonky check; it confirms the scheduler is up and receiving reports from the executor,
// at which point the system should be ready.
// TODO Make a good check to confirm the system is ready, such as seeing armadactl get executors return a value
mg.Deps(CheckSchedulerReady)
mg.Deps(createQueue)

// Set high to take compile time into account
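Editorial note: CheckSchedulerReady (defined in magefiles/developer.go below) waits for the "Retrieved N executors" line that SubmitChecker.updateExecutors logs once executor reports are flowing. A small, self-contained sketch of that match, assuming only Go's standard regexp package:

package main

import (
	"fmt"
	"regexp"
)

func main() {
	// Illustrative sketch: an example scheduler log line of the kind emitted by updateExecutors.
	logs := "INFO Retrieved 2 executors"
	// Same pattern used by CheckSchedulerReady in magefiles/developer.go.
	ready := regexp.MustCompile(`Retrieved [1-9]+ executors`)
	fmt.Println(ready.MatchString(logs)) // prints: true
}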
45 changes: 38 additions & 7 deletions magefiles/developer.go
@@ -4,6 +4,7 @@ import (
"fmt"
"os"
"os/exec"
"regexp"
"strings"
"time"

@@ -111,28 +112,58 @@ func StopComponents() error {
return nil
}

// Repeatedly check logs until Pulsar is ready.
func CheckForPulsarRunning() error {
func CheckPulsarRunning() error {
return CheckDockerContainerRunning("pulsar", "alive")
}

func CheckPostgresRunning() error {
return CheckDockerContainerRunning("postgres", "database system is ready to accept connections")
}

func CheckServerRunning() error {
return CheckDockerContainerRunning("server", "Starting http server listening on")
}

func CheckSchedulerRunning() error {
return CheckDockerContainerRunning("scheduler", "Starting http server listening on")
}

func CheckExecutorRunning() error {
return CheckDockerContainerRunning("executor", "Starting http server listening on")
}

func CheckSchedulerReady() error {
return CheckDockerContainerRunning("scheduler", "Retrieved [1-9]+ executors")
}

// Repeatedly check logs until container is ready.
func CheckDockerContainerRunning(containerName string, expectedLogRegex string) error {
timeout := time.After(1 * time.Minute)
tick := time.Tick(1 * time.Second)
seconds := 0

logMatchRegex, err := regexp.Compile(expectedLogRegex)
if err != nil {
return fmt.Errorf("invalid log regex %s - %s", expectedLogRegex, err)
}

for {
select {
case <-timeout:
return fmt.Errorf("timed out waiting for Pulsar to start")
return fmt.Errorf("timed out waiting for %s to start", containerName)
case <-tick:
out, err := dockerOutput("compose", "logs", "pulsar")
out, err := dockerOutput("compose", "logs", containerName)
if err != nil {
return err
}
if strings.Contains(out, "alive") {
if len(logMatchRegex.FindStringSubmatch(out)) > 0 {
// if seconds is less than 1, it means that the container had already started
if seconds < 1 {
fmt.Printf("\nPulsar had already started!\n\n")
fmt.Printf("\n%s had already started!\n\n", containerName)
return nil
}

fmt.Printf("\nPulsar took %d seconds to start!\n\n", seconds)
fmt.Printf("\n%s took %d seconds to start!\n\n", containerName, seconds)
return nil
}
seconds++
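Editorial note: with the check generalised above, adding a readiness probe for another compose service is a one-liner. The example below is purely illustrative; the "lookout" container and its log line are hypothetical and not something this PR adds.

// Hypothetical example only, not part of this PR: a readiness check for a
// "lookout" compose service using the generalised helper above.
func CheckLookoutRunning() error {
	return CheckDockerContainerRunning("lookout", "Starting http server listening on")
}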
8 changes: 7 additions & 1 deletion magefiles/main.go
@@ -214,12 +214,18 @@ func LocalDev(arg string) error {

mg.Deps(StartDependencies)
fmt.Println("Waiting for dependencies to start...")
mg.Deps(CheckForPulsarRunning)
mg.Deps(CheckPulsarRunning)
mg.Deps(CheckPostgresRunning)

switch arg {
case "minimal":
os.Setenv("ARMADA_COMPONENTS", "executor,server,scheduler")
mg.Deps(StartComponents)
// This is a naive check to confirm the containers are running; it doesn't check that they are ready
// TODO Make a good check to confirm the system is ready, such as seeing armadactl get executors return a value
mg.Deps(CheckServerRunning)
mg.Deps(CheckSchedulerRunning)
mg.Deps(CheckExecutorRunning)
case "debug", "no-build":
fmt.Println("Dependencies started, ending localdev...")
return nil