Skip to content

Commit

Permalink
Merge branch 'main' into devstack-config
Browse files Browse the repository at this point in the history
  • Loading branch information
wdbaruni authored Jan 26, 2025
2 parents 9461497 + ab21916 commit dc7afea
Showing 1 changed file with 103 additions and 14 deletions.
117 changes: 103 additions & 14 deletions pkg/orchestrator/planner/logging_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,54 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/orchestrator"
)

type LoggingPlanner struct {
}
const defaultLogLevel = zerolog.TraceLevel

// LoggingPlanner is a debug-focused component that logs the execution of plans.
// It tracks job state transitions, execution lifecycle, and evaluation scheduling
// at different verbosity levels:
// - Job completions and failures are logged at INFO and WARN respectively
// - Job state transitions are logged at TRACE level
// - Execution and evaluation details are logged at TRACE level when enabled
//
// This planner is meant to be used in conjunction with other planners for debugging purposes.
type LoggingPlanner struct{}

func NewLoggingPlanner() *LoggingPlanner {
return &LoggingPlanner{}
}

func (s *LoggingPlanner) Process(ctx context.Context, plan *models.Plan) error {
// Log state changes first as they're most important
s.logJobStateChanges(ctx, plan)

// Only log detailed execution and evaluation info at trace level
if zerolog.GlobalLevel() <= defaultLogLevel {
s.logNewExecutions(ctx, plan)
s.logExecutionUpdates(ctx, plan)
s.logEvaluations(ctx, plan)
}

return nil
}

func (s *LoggingPlanner) logJobStateChanges(ctx context.Context, plan *models.Plan) {
if plan.DesiredJobState.IsUndefined() {
return
}

dict := zerolog.Dict()
var eventMessage string
var delim string
for _, event := range plan.JobEvents {
for key, value := range event.Details {
dict = dict.Str(key, value)
var eventMsg string
for i, event := range plan.JobEvents {
if i > 0 {
eventMsg += ". "
}
eventMsg += event.Message
for k, v := range event.Details {
dict = dict.Str(k, v)
}
eventMessage += delim + event.Message
delim = ". "
}

level := zerolog.TraceLevel
level := defaultLogLevel
message := "Job updated"
switch plan.DesiredJobState {
case models.JobStateTypeCompleted:
Expand All @@ -41,12 +69,73 @@ func (s *LoggingPlanner) Process(ctx context.Context, plan *models.Plan) error {
default:
}

log.Ctx(ctx).WithLevel(level).
logger := log.Ctx(ctx).WithLevel(level).
Dict("Details", dict).
Str("Event", eventMessage).
Str("Event", eventMsg).
Str("JobID", plan.Job.ID).
Msg(message)
return nil
Str("OldState", plan.Job.State.StateType.String()).
Str("NewState", plan.DesiredJobState.String()).
Uint64("OldRevision", plan.Job.Revision)

if plan.UpdateMessage != "" {
logger = logger.Str("Reason", plan.UpdateMessage)
}

logger.Msg(message)
}

func (s *LoggingPlanner) logNewExecutions(ctx context.Context, plan *models.Plan) {
for _, exec := range plan.NewExecutions {
log.Ctx(ctx).WithLevel(defaultLogLevel).
Str("JobID", plan.Job.ID).
Str("ExecutionID", exec.ID).
Str("NodeID", exec.NodeID).
Int("PartitionIndex", exec.PartitionIndex).
Str("DesiredState", exec.DesiredState.StateType.String()).
Str("ComputeState", exec.ComputeState.StateType.String()).
Msg("New execution created")
}
}

func (s *LoggingPlanner) logExecutionUpdates(ctx context.Context, plan *models.Plan) {
for execID, update := range plan.UpdatedExecutions {
logger := log.Ctx(ctx).WithLevel(defaultLogLevel).
Str("JobID", update.Execution.JobID).
Str("ExecutionID", execID).
Str("NodeID", update.Execution.NodeID).
Int("PartitionIndex", update.Execution.PartitionIndex).
Str("OldState", update.Execution.DesiredState.StateType.String()).
Str("NewState", update.DesiredState.String()).
Str("OldComputeState", update.Execution.ComputeState.StateType.String()).
Str("NewComputeState", update.ComputeState.String())

// Include event details if present
if update.Event.Message != "" {
logger = logger.Str("Reason", update.Event.Message)
}
for k, v := range update.Event.Details {
logger = logger.Str(k, v)
}

logger.Msg("Execution updated")
}
}

func (s *LoggingPlanner) logEvaluations(ctx context.Context, plan *models.Plan) {
for _, eval := range plan.NewEvaluations {
logger := log.Ctx(ctx).WithLevel(defaultLogLevel).
Str("JobID", plan.Job.ID).
Str("EvalID", eval.ID).
Str("TriggeredBy", eval.TriggeredBy)

if !eval.WaitUntil.IsZero() {
logger = logger.Time("WaitUntil", eval.WaitUntil)
}
if eval.Comment != "" {
logger = logger.Str("Reason", eval.Comment)
}
logger.Msg("New evaluation")
}
}

// compile-time check whether the LoggingPlanner implements the Planner interface.
Expand Down

0 comments on commit dc7afea

Please sign in to comment.