From 71837b3d2bffb1cc5299626a9e68032ac812e756 Mon Sep 17 00:00:00 2001 From: Walid Baruni Date: Sun, 26 Jan 2025 13:14:53 +0200 Subject: [PATCH] clean up logs --- pkg/models/plan.go | 16 --------- pkg/orchestrator/planner/state_updater.go | 2 +- pkg/orchestrator/planner/utils_test.go | 34 +++++++++---------- .../scheduler/batch_service_job.go | 32 +---------------- 4 files changed, 19 insertions(+), 65 deletions(-) diff --git a/pkg/models/plan.go b/pkg/models/plan.go index c114765eb2..e0ee983a96 100644 --- a/pkg/models/plan.go +++ b/pkg/models/plan.go @@ -1,9 +1,5 @@ package models -import ( - "github.com/rs/zerolog/log" -) - type PlanExecutionUpdate struct { Execution *Execution `json:"Execution"` DesiredState ExecutionDesiredStateType `json:"DesiredState"` @@ -58,12 +54,6 @@ func (p *Plan) AppendExecution(execution *Execution, event Event) { // AppendStoppedExecution marks an execution to be stopped. func (p *Plan) AppendStoppedExecution(execution *Execution, event Event, computeState ExecutionStateType) { - log.Debug(). - Str("OldDesiredState", execution.DesiredState.StateType.String()). - Str("NewDesiredState", ExecutionDesiredStateStopped.String()). - Str("ExecutionID", execution.ID). - Str("OldComputeState", execution.ComputeState.StateType.String()). - Msgf("Plan: AppendStoppedExecution: %s", execution.ID) updateRequest := &PlanExecutionUpdate{ Execution: execution, DesiredState: ExecutionDesiredStateStopped, @@ -76,12 +66,6 @@ func (p *Plan) AppendStoppedExecution(execution *Execution, event Event, compute // AppendApprovedExecution marks an execution as accepted and ready to be started. func (p *Plan) AppendApprovedExecution(execution *Execution, event Event) { - log.Debug(). - Str("OldDesiredState", execution.DesiredState.StateType.String()). - Str("NewDesiredState", ExecutionDesiredStateRunning.String()). - Str("ExecutionID", execution.ID). - Str("OldComputeState", execution.ComputeState.StateType.String()). - Msgf("Plan: AppendApprovedExecution: %s", execution.ID) updateRequest := &PlanExecutionUpdate{ Execution: execution, DesiredState: ExecutionDesiredStateRunning, diff --git a/pkg/orchestrator/planner/state_updater.go b/pkg/orchestrator/planner/state_updater.go index 3787f8b535..182c2a09bf 100644 --- a/pkg/orchestrator/planner/state_updater.go +++ b/pkg/orchestrator/planner/state_updater.go @@ -129,7 +129,7 @@ func (s *StateUpdater) processJobState( NewState: plan.DesiredJobState, Message: plan.UpdateMessage, Condition: jobstore.UpdateJobCondition{ - ExpectedState: plan.Job.State.StateType, + ExpectedRevision: plan.Job.Revision, }, }); err != nil { return err diff --git a/pkg/orchestrator/planner/utils_test.go b/pkg/orchestrator/planner/utils_test.go index 9c8b4a1507..ab7b210a7b 100644 --- a/pkg/orchestrator/planner/utils_test.go +++ b/pkg/orchestrator/planner/utils_test.go @@ -126,34 +126,34 @@ func (m *UpdateExecutionMatcher) String() string { // UpdateJobMatcher is a matcher for the UpdateJobState method of the JobStore interface. type UpdateJobMatcher struct { - t *testing.T - job *models.Job - newState models.JobStateType - comment string - expectedState models.JobStateType + t *testing.T + job *models.Job + newState models.JobStateType + comment string + expectedRevision uint64 } type UpdateJobMatcherParams struct { - NewState models.JobStateType - Comment string - ExpectedState models.JobStateType + NewState models.JobStateType + Comment string + ExpectedRevision uint64 } func NewUpdateJobMatcher(t *testing.T, job *models.Job, params UpdateJobMatcherParams) *UpdateJobMatcher { return &UpdateJobMatcher{ - t: t, - job: job, - newState: params.NewState, - comment: params.Comment, - expectedState: params.ExpectedState, + t: t, + job: job, + newState: params.NewState, + comment: params.Comment, + expectedRevision: params.ExpectedRevision, } } func NewUpdateJobMatcherFromPlanUpdate(t *testing.T, plan *models.Plan) *UpdateJobMatcher { return NewUpdateJobMatcher(t, plan.Job, UpdateJobMatcherParams{ - NewState: plan.DesiredJobState, - Comment: plan.UpdateMessage, - ExpectedState: plan.Job.State.StateType, + NewState: plan.DesiredJobState, + Comment: plan.UpdateMessage, + ExpectedRevision: plan.Job.Revision, }) } @@ -169,7 +169,7 @@ func (m *UpdateJobMatcher) Matches(x interface{}) bool { NewState: m.newState, Message: m.comment, Condition: jobstore.UpdateJobCondition{ - ExpectedState: m.expectedState, + ExpectedRevision: m.expectedRevision, }, } return reflect.DeepEqual(expectedRequest, req) diff --git a/pkg/orchestrator/scheduler/batch_service_job.go b/pkg/orchestrator/scheduler/batch_service_job.go index 4ba654fa4d..438038d33d 100644 --- a/pkg/orchestrator/scheduler/batch_service_job.go +++ b/pkg/orchestrator/scheduler/batch_service_job.go @@ -186,12 +186,6 @@ func (b *BatchServiceJobScheduler) loadJobState(ctx context.Context, metrics *te } metrics.Latency(ctx, processPartDuration, AttrOperationPartGetExecs) metrics.Histogram(ctx, executionsExisting, float64(len(jobExecutions))) - - // loop and log execution id, desired state and compute state - for _, exec := range jobExecutions { - log.Ctx(ctx).Debug().Msgf("Found Execution %s: DesiredState=%s, ComputeState=%s", exec.ID, exec.DesiredState, - exec.ComputeState) - } return job, jobExecutions, nil } @@ -232,32 +226,8 @@ func (b *BatchServiceJobScheduler) handleTimeouts(ctx context.Context, metrics * func (b *BatchServiceJobScheduler) approveRejectExecs(nonDiscardedExecs execSet, plan *models.Plan) { // Process each partition independently, ensuring only one execution // can be active per partition at any time - for partitionIndex, partitionExecs := range nonDiscardedExecs.groupByPartition() { + for _, partitionExecs := range nonDiscardedExecs.groupByPartition() { execsByApprovalStatus := partitionExecs.getApprovalStatuses() - - // loop and log the partition index, and the number of executions in each state - log.Debug().Msgf("Partition %d: %d to approve, %d to reject, %d to cancel", partitionIndex, - len(execsByApprovalStatus.toApprove), len(execsByApprovalStatus.toReject), len(execsByApprovalStatus.toCancel)) - - for _, exec := range partitionExecs { - log.Debug().Msgf("approveRejectExecs Execution %s: DesiredState=%s, ComputeState=%s", exec.ID, exec.DesiredState, - exec.ComputeState) - } - - // loop and log the execution id, desired state and compute state for each toApprove, toReject and toCancel - for _, exec := range execsByApprovalStatus.toApprove { - log.Debug().Msgf("ToApprove Execution %s: DesiredState=%s, ComputeState=%s", exec.ID, exec.DesiredState, - exec.ComputeState) - } - for _, exec := range execsByApprovalStatus.toReject { - log.Debug().Msgf("ToReject Execution %s: DesiredState=%s, ComputeState=%s", exec.ID, exec.DesiredState, - exec.ComputeState) - } - for _, exec := range execsByApprovalStatus.toCancel { - log.Debug().Msgf("ToCancel Execution %s: DesiredState=%s, ComputeState=%s", exec.ID, exec.DesiredState, - exec.ComputeState) - } - execsByApprovalStatus.toApprove.markApproved(plan, orchestrator.ExecRunningEvent()) execsByApprovalStatus.toReject.markRejected(plan, orchestrator.ExecStoppedByNodeRejectedEvent()) execsByApprovalStatus.toCancel.markCancelled(plan, orchestrator.ExecStoppedByOversubscriptionEvent())