Skip to content

Commit

Permalink
clean up logs
Browse files Browse the repository at this point in the history
  • Loading branch information
wdbaruni committed Jan 26, 2025
1 parent d237180 commit 71837b3
Show file tree
Hide file tree
Showing 4 changed files with 19 additions and 65 deletions.
16 changes: 0 additions & 16 deletions pkg/models/plan.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,5 @@
package models

import (
"github.com/rs/zerolog/log"
)

type PlanExecutionUpdate struct {
Execution *Execution `json:"Execution"`
DesiredState ExecutionDesiredStateType `json:"DesiredState"`
Expand Down Expand Up @@ -58,12 +54,6 @@ func (p *Plan) AppendExecution(execution *Execution, event Event) {

// AppendStoppedExecution marks an execution to be stopped.
func (p *Plan) AppendStoppedExecution(execution *Execution, event Event, computeState ExecutionStateType) {
log.Debug().
Str("OldDesiredState", execution.DesiredState.StateType.String()).
Str("NewDesiredState", ExecutionDesiredStateStopped.String()).
Str("ExecutionID", execution.ID).
Str("OldComputeState", execution.ComputeState.StateType.String()).
Msgf("Plan: AppendStoppedExecution: %s", execution.ID)
updateRequest := &PlanExecutionUpdate{
Execution: execution,
DesiredState: ExecutionDesiredStateStopped,
Expand All @@ -76,12 +66,6 @@ func (p *Plan) AppendStoppedExecution(execution *Execution, event Event, compute

// AppendApprovedExecution marks an execution as accepted and ready to be started.
func (p *Plan) AppendApprovedExecution(execution *Execution, event Event) {
log.Debug().
Str("OldDesiredState", execution.DesiredState.StateType.String()).
Str("NewDesiredState", ExecutionDesiredStateRunning.String()).
Str("ExecutionID", execution.ID).
Str("OldComputeState", execution.ComputeState.StateType.String()).
Msgf("Plan: AppendApprovedExecution: %s", execution.ID)
updateRequest := &PlanExecutionUpdate{
Execution: execution,
DesiredState: ExecutionDesiredStateRunning,
Expand Down
2 changes: 1 addition & 1 deletion pkg/orchestrator/planner/state_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ func (s *StateUpdater) processJobState(
NewState: plan.DesiredJobState,
Message: plan.UpdateMessage,
Condition: jobstore.UpdateJobCondition{
ExpectedState: plan.Job.State.StateType,
ExpectedRevision: plan.Job.Revision,
},
}); err != nil {
return err
Expand Down
34 changes: 17 additions & 17 deletions pkg/orchestrator/planner/utils_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,34 +126,34 @@ func (m *UpdateExecutionMatcher) String() string {

// UpdateJobMatcher is a matcher for the UpdateJobState method of the JobStore interface.
type UpdateJobMatcher struct {
t *testing.T
job *models.Job
newState models.JobStateType
comment string
expectedState models.JobStateType
t *testing.T
job *models.Job
newState models.JobStateType
comment string
expectedRevision uint64
}

type UpdateJobMatcherParams struct {
NewState models.JobStateType
Comment string
ExpectedState models.JobStateType
NewState models.JobStateType
Comment string
ExpectedRevision uint64
}

func NewUpdateJobMatcher(t *testing.T, job *models.Job, params UpdateJobMatcherParams) *UpdateJobMatcher {
return &UpdateJobMatcher{
t: t,
job: job,
newState: params.NewState,
comment: params.Comment,
expectedState: params.ExpectedState,
t: t,
job: job,
newState: params.NewState,
comment: params.Comment,
expectedRevision: params.ExpectedRevision,
}
}

func NewUpdateJobMatcherFromPlanUpdate(t *testing.T, plan *models.Plan) *UpdateJobMatcher {
return NewUpdateJobMatcher(t, plan.Job, UpdateJobMatcherParams{
NewState: plan.DesiredJobState,
Comment: plan.UpdateMessage,
ExpectedState: plan.Job.State.StateType,
NewState: plan.DesiredJobState,
Comment: plan.UpdateMessage,
ExpectedRevision: plan.Job.Revision,
})
}

Expand All @@ -169,7 +169,7 @@ func (m *UpdateJobMatcher) Matches(x interface{}) bool {
NewState: m.newState,
Message: m.comment,
Condition: jobstore.UpdateJobCondition{
ExpectedState: m.expectedState,
ExpectedRevision: m.expectedRevision,
},
}
return reflect.DeepEqual(expectedRequest, req)
Expand Down
32 changes: 1 addition & 31 deletions pkg/orchestrator/scheduler/batch_service_job.go
Original file line number Diff line number Diff line change
Expand Up @@ -186,12 +186,6 @@ func (b *BatchServiceJobScheduler) loadJobState(ctx context.Context, metrics *te
}
metrics.Latency(ctx, processPartDuration, AttrOperationPartGetExecs)
metrics.Histogram(ctx, executionsExisting, float64(len(jobExecutions)))

// loop and log execution id, desired state and compute state
for _, exec := range jobExecutions {
log.Ctx(ctx).Debug().Msgf("Found Execution %s: DesiredState=%s, ComputeState=%s", exec.ID, exec.DesiredState,
exec.ComputeState)
}
return job, jobExecutions, nil
}

Expand Down Expand Up @@ -232,32 +226,8 @@ func (b *BatchServiceJobScheduler) handleTimeouts(ctx context.Context, metrics *
func (b *BatchServiceJobScheduler) approveRejectExecs(nonDiscardedExecs execSet, plan *models.Plan) {
// Process each partition independently, ensuring only one execution
// can be active per partition at any time
for partitionIndex, partitionExecs := range nonDiscardedExecs.groupByPartition() {
for _, partitionExecs := range nonDiscardedExecs.groupByPartition() {
execsByApprovalStatus := partitionExecs.getApprovalStatuses()

// loop and log the partition index, and the number of executions in each state
log.Debug().Msgf("Partition %d: %d to approve, %d to reject, %d to cancel", partitionIndex,
len(execsByApprovalStatus.toApprove), len(execsByApprovalStatus.toReject), len(execsByApprovalStatus.toCancel))

for _, exec := range partitionExecs {
log.Debug().Msgf("approveRejectExecs Execution %s: DesiredState=%s, ComputeState=%s", exec.ID, exec.DesiredState,
exec.ComputeState)
}

// loop and log the execution id, desired state and compute state for each toApprove, toReject and toCancel
for _, exec := range execsByApprovalStatus.toApprove {
log.Debug().Msgf("ToApprove Execution %s: DesiredState=%s, ComputeState=%s", exec.ID, exec.DesiredState,
exec.ComputeState)
}
for _, exec := range execsByApprovalStatus.toReject {
log.Debug().Msgf("ToReject Execution %s: DesiredState=%s, ComputeState=%s", exec.ID, exec.DesiredState,
exec.ComputeState)
}
for _, exec := range execsByApprovalStatus.toCancel {
log.Debug().Msgf("ToCancel Execution %s: DesiredState=%s, ComputeState=%s", exec.ID, exec.DesiredState,
exec.ComputeState)
}

execsByApprovalStatus.toApprove.markApproved(plan, orchestrator.ExecRunningEvent())
execsByApprovalStatus.toReject.markRejected(plan, orchestrator.ExecStoppedByNodeRejectedEvent())
execsByApprovalStatus.toCancel.markCancelled(plan, orchestrator.ExecStoppedByOversubscriptionEvent())
Expand Down

0 comments on commit 71837b3

Please sign in to comment.