Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor LoggingPlanner for better debugging #4826

Merged
merged 2 commits into from
Jan 26, 2025
Merged
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
117 changes: 103 additions & 14 deletions pkg/orchestrator/planner/logging_planner.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,26 +10,54 @@ import (
"github.com/bacalhau-project/bacalhau/pkg/orchestrator"
)

type LoggingPlanner struct {
}
const defaultLogLevel = zerolog.TraceLevel

// LoggingPlanner is a debug-focused component that logs the execution of plans.
// It tracks job state transitions, execution lifecycle, and evaluation scheduling
// at different verbosity levels:
// - Job completions and failures are logged at INFO and WARN respectively
// - Job state transitions are logged at TRACE level
// - Execution and evaluation details are logged at TRACE level when enabled
//
// This planner is meant to be used in conjunction with other planners for debugging purposes.
type LoggingPlanner struct{}

func NewLoggingPlanner() *LoggingPlanner {
return &LoggingPlanner{}
}

func (s *LoggingPlanner) Process(ctx context.Context, plan *models.Plan) error {
// Log state changes first as they're most important
s.logJobStateChanges(ctx, plan)

// Only log detailed execution and evaluation info at trace level
if zerolog.GlobalLevel() <= defaultLogLevel {
s.logNewExecutions(ctx, plan)
s.logExecutionUpdates(ctx, plan)
s.logEvaluations(ctx, plan)
}

return nil
}

func (s *LoggingPlanner) logJobStateChanges(ctx context.Context, plan *models.Plan) {
if plan.DesiredJobState.IsUndefined() {
return
}

dict := zerolog.Dict()
var eventMessage string
var delim string
for _, event := range plan.JobEvents {
for key, value := range event.Details {
dict = dict.Str(key, value)
var eventMsg string
for i, event := range plan.JobEvents {
if i > 0 {
eventMsg += ". "
}
eventMsg += event.Message
for k, v := range event.Details {
dict = dict.Str(k, v)
}
eventMessage += delim + event.Message
delim = ". "
}

level := zerolog.TraceLevel
level := defaultLogLevel
message := "Job updated"
switch plan.DesiredJobState {
case models.JobStateTypeCompleted:
Expand All @@ -41,12 +69,73 @@ func (s *LoggingPlanner) Process(ctx context.Context, plan *models.Plan) error {
default:
}

log.Ctx(ctx).WithLevel(level).
logger := log.Ctx(ctx).WithLevel(level).
Dict("Details", dict).
Str("Event", eventMessage).
Str("Event", eventMsg).
Str("JobID", plan.Job.ID).
Msg(message)
return nil
Str("OldState", plan.Job.State.StateType.String()).
Str("NewState", plan.DesiredJobState.String()).
Uint64("OldRevision", plan.Job.Revision)

if plan.UpdateMessage != "" {
logger = logger.Str("Reason", plan.UpdateMessage)
}

logger.Msg(message)
}

func (s *LoggingPlanner) logNewExecutions(ctx context.Context, plan *models.Plan) {
for _, exec := range plan.NewExecutions {
log.Ctx(ctx).WithLevel(defaultLogLevel).
Str("JobID", plan.Job.ID).
Str("ExecutionID", exec.ID).
Str("NodeID", exec.NodeID).
Int("PartitionIndex", exec.PartitionIndex).
Str("DesiredState", exec.DesiredState.StateType.String()).
Str("ComputeState", exec.ComputeState.StateType.String()).
Msg("New execution created")
}
}

func (s *LoggingPlanner) logExecutionUpdates(ctx context.Context, plan *models.Plan) {
for execID, update := range plan.UpdatedExecutions {
logger := log.Ctx(ctx).WithLevel(defaultLogLevel).
Str("JobID", update.Execution.JobID).
Str("ExecutionID", execID).
Str("NodeID", update.Execution.NodeID).
Int("PartitionIndex", update.Execution.PartitionIndex).
Str("OldState", update.Execution.DesiredState.StateType.String()).
Str("NewState", update.DesiredState.String()).
Str("OldComputeState", update.Execution.ComputeState.StateType.String()).
Str("NewComputeState", update.ComputeState.String())

// Include event details if present
if update.Event.Message != "" {
logger = logger.Str("Reason", update.Event.Message)
}
for k, v := range update.Event.Details {
logger = logger.Str(k, v)
}

logger.Msg("Execution updated")
}
}

func (s *LoggingPlanner) logEvaluations(ctx context.Context, plan *models.Plan) {
for _, eval := range plan.NewEvaluations {
logger := log.Ctx(ctx).WithLevel(defaultLogLevel).
Str("JobID", plan.Job.ID).
Str("EvalID", eval.ID).
Str("TriggeredBy", eval.TriggeredBy)

if !eval.WaitUntil.IsZero() {
logger = logger.Time("WaitUntil", eval.WaitUntil)
}
if eval.Comment != "" {
logger = logger.Str("Reason", eval.Comment)
}
logger.Msg("New evaluation")
}
}

// compile-time check whether the LoggingPlanner implements the Planner interface.
Expand Down
Loading