Skip to content

Commit

Permalink
unified metrics naming
Browse files Browse the repository at this point in the history
  • Loading branch information
lukas016 committed Sep 24, 2024
1 parent b8bc0cc commit f0a1ec9
Show file tree
Hide file tree
Showing 3 changed files with 127 additions and 60 deletions.
2 changes: 1 addition & 1 deletion cmd/libvirt-provider/app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -342,7 +342,7 @@ func Run(ctx context.Context, opts Options) error {
eventStore := machineevent.NewEventStore(log, opts.MachineEventStore)

machineReconciler, err := controllers.NewMachineReconciler(
log.WithName("machine-reconciler"),
log.WithName(controllers.MachineReconcilerName),
libvirt,
machineStore,
machineEvents,
Expand Down
94 changes: 69 additions & 25 deletions internal/controllers/machine_controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/ironcore-dev/libvirt-provider/internal/raw"
"github.com/ironcore-dev/libvirt-provider/internal/store"
"github.com/ironcore-dev/libvirt-provider/internal/utils"
"github.com/prometheus/client_golang/prometheus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/util/wait"
"k8s.io/client-go/util/workqueue"
Expand All @@ -45,6 +46,7 @@ const (
rootFSAlias = "ua-rootfs"
libvirtDomainXMLIgnitionKeyName = "opt/com.coreos/config"
networkInterfaceAliasPrefix = "ua-networkinterface-"
MachineReconcilerName = "machine-reconciler"
)

var (
Expand Down Expand Up @@ -102,24 +104,27 @@ func NewMachineReconciler(
log: log,
queue: workqueue.NewRateLimitingQueueWithConfig(workqueue.DefaultControllerRateLimiter(), workqueue.RateLimitingQueueConfig{
MetricsProvider: metrics.WorkqueueMetricsProvider{},
Name: "machines",
Name: MachineReconcilerName,
}),
libvirt: libvirt,
machines: machines,
machineEvents: machineEvents,
EventRecorder: eventRecorder,
guestCapabilities: opts.GuestCapabilities,
tcMallocLibPath: opts.TCMallocLibPath,
host: opts.Host,
imageCache: opts.ImageCache,
raw: opts.Raw,
volumePluginManager: opts.VolumePluginManager,
networkInterfacePlugin: opts.NetworkInterfacePlugin,
resyncIntervalVolumeSize: opts.ResyncIntervalVolumeSize,
resyncIntervalGarbageCollector: opts.ResyncIntervalGarbageCollector,
enableHugepages: opts.EnableHugepages,
gcVMGracefulShutdownTimeout: opts.GCVMGracefulShutdownTimeout,
volumeCachePolicy: opts.VolumeCachePolicy,
libvirt: libvirt,
machines: machines,
machineEvents: machineEvents,
EventRecorder: eventRecorder,
guestCapabilities: opts.GuestCapabilities,
tcMallocLibPath: opts.TCMallocLibPath,
host: opts.Host,
imageCache: opts.ImageCache,
raw: opts.Raw,
volumePluginManager: opts.VolumePluginManager,
networkInterfacePlugin: opts.NetworkInterfacePlugin,
resyncIntervalVolumeSize: opts.ResyncIntervalVolumeSize,
resyncIntervalGarbageCollector: opts.ResyncIntervalGarbageCollector,
enableHugepages: opts.EnableHugepages,
gcVMGracefulShutdownTimeout: opts.GCVMGracefulShutdownTimeout,
volumeCachePolicy: opts.VolumeCachePolicy,
metricsReconcileDuration: metrics.ControllerRuntimeReconcileDuration.WithLabelValues(MachineReconcilerName),
metricsControllerRuntimeActiveWorker: metrics.ControllerRuntimeActiveWorker.WithLabelValues(MachineReconcilerName),
metricsControllerRuntimeReconcileErrors: metrics.ControllerRuntimeReconcileErrors.WithLabelValues(MachineReconcilerName),
}, nil
}

Expand Down Expand Up @@ -149,13 +154,18 @@ type MachineReconciler struct {
resyncIntervalGarbageCollector time.Duration

volumeCachePolicy string

metricsReconcileDuration prometheus.Observer
metricsControllerRuntimeActiveWorker prometheus.Gauge
metricsControllerRuntimeReconcileErrors prometheus.Counter
}

func (r *MachineReconciler) Start(ctx context.Context) error {
log := r.log

// TODO: make the number of workers configurable
workerSize := 1
workerSize := 15
metrics.ControllerRuntimeMaxConccurrentReconciles.WithLabelValues("machine-reconciler").Set(float64(workerSize))

r.imageCache.AddListener(providerimage.ListenerFuncs{
HandlePullDoneFunc: func(evt providerimage.PullDoneEvent) {
Expand Down Expand Up @@ -191,19 +201,19 @@ func (r *MachineReconciler) Start(ctx context.Context) error {
wg.Add(1)
go func() {
defer wg.Done()
r.startCheckAndEnqueueVolumeResize(ctx, r.log.WithName("volume-size"))
r.startCheckAndEnqueueVolumeResize(ctx, r.log)
}()

wg.Add(1)
go func() {
defer wg.Done()
r.startEnqueueMachineByLibvirtEvent(ctx, r.log.WithName("libvirt-event"))
r.startEnqueueMachineByLibvirtEvent(ctx, r.log)
}()

wg.Add(1)
go func() {
defer wg.Done()
r.startGarbageCollector(ctx, r.log.WithName("garbage-collector"))
r.startGarbageCollector(ctx, r.log)
}()

go func() {
Expand All @@ -225,9 +235,20 @@ func (r *MachineReconciler) Start(ctx context.Context) error {
}

func (r *MachineReconciler) startCheckAndEnqueueVolumeResize(ctx context.Context, log logr.Logger) {
const name = "volume-size"
log = log.WithName(name)
opsDuration := metrics.OperationDuration.WithLabelValues(name)
opsErrors := metrics.OperationErrors.WithLabelValues(name)

wait.UntilWithContext(ctx, func(ctx context.Context) {
startTime := time.Now()
defer func() {
opsDuration.Observe(float64(time.Since(startTime).Milliseconds()) / 1000)
}()

machines, err := r.machines.List(ctx)
if err != nil {
opsErrors.Inc()
log.Error(err, "failed to list machines")
return
}
Expand All @@ -241,18 +262,21 @@ func (r *MachineReconciler) startCheckAndEnqueueVolumeResize(ctx context.Context
for _, volume := range machine.Spec.Volumes {
plugin, err := r.volumePluginManager.FindPluginBySpec(volume)
if err != nil {
opsErrors.Inc()
log.Error(err, "failed to get volume plugin", "machineID", machine.ID, "volumeName", volume.Name)
continue
}

volumeID, err := plugin.GetBackingVolumeID(volume)
if err != nil {
opsErrors.Inc()
log.Error(err, "failed to get volume id", "machineID", machine.ID, "volumeName", volume.Name)
continue
}

volumeSize, err := plugin.GetSize(ctx, volume)
if err != nil {
opsErrors.Inc()
log.Error(err, "failed to get volume size", "machineID", machine.ID, "volumeName", volume.Name, "volumeID", volumeID)
continue
}
Expand All @@ -273,8 +297,13 @@ func (r *MachineReconciler) startCheckAndEnqueueVolumeResize(ctx context.Context
}

func (r *MachineReconciler) startEnqueueMachineByLibvirtEvent(ctx context.Context, log logr.Logger) {
const name = "libvirt-event"
log = log.WithName(name)
opsErrors := metrics.OperationErrors.WithLabelValues(name)

lifecycleEvents, err := r.libvirt.LifecycleEvents(ctx)
if err != nil {
opsErrors.Inc()
log.Error(err, "failed to subscribe to libvirt lifecycle events")
return
}
Expand All @@ -295,6 +324,7 @@ func (r *MachineReconciler) startEnqueueMachineByLibvirtEvent(ctx context.Contex
log.V(2).Info("Skipped: not managed by libvirt-provider", "machineID", evt.Dom.Name)
continue
}
opsErrors.Inc()
log.Error(err, "failed to fetch machine from store")
continue
}
Expand All @@ -309,9 +339,20 @@ func (r *MachineReconciler) startEnqueueMachineByLibvirtEvent(ctx context.Contex
}

func (r *MachineReconciler) startGarbageCollector(ctx context.Context, log logr.Logger) {
const name = "garbage-collector"
log = log.WithName(name)
opsDuration := metrics.OperationDuration.WithLabelValues(name)
opsErrors := metrics.OperationErrors.WithLabelValues(name)

wait.UntilWithContext(ctx, func(ctx context.Context) {
startTime := time.Now()
defer func() {
opsDuration.Observe(float64(time.Since(startTime).Milliseconds()) / 1000)
}()

machines, err := r.machines.List(ctx)
if err != nil {
opsErrors.Inc()
log.Error(err, "failed to list machines")
return
}
Expand All @@ -323,6 +364,7 @@ func (r *MachineReconciler) startGarbageCollector(ctx context.Context, log logr.

logger := log.WithValues("machineID", machine.ID)
if err := r.processMachineDeletion(ctx, logger, machine); err != nil {
opsErrors.Inc()
logger.Error(err, "failed to garbage collect machine")
}
}
Expand Down Expand Up @@ -432,22 +474,24 @@ func (r *MachineReconciler) processNextWorkItem(ctx context.Context, log logr.Lo
if shutdown {
return false
}

r.metricsControllerRuntimeActiveWorker.Inc()
defer r.queue.Done(item)
defer r.metricsControllerRuntimeActiveWorker.Dec()

id := item.(string)
log = log.WithValues("machineID", id)
ctx = logr.NewContext(ctx, log)

start := time.Now()
startTime := time.Now()
defer func() {
metrics.OpsTime.Observe(float64(time.Now().Sub(start).Milliseconds()) / 1000)
r.metricsReconcileDuration.Observe(float64(time.Since(startTime).Milliseconds()) / 1000)
}()

if err := r.reconcileMachine(ctx, id); err != nil {
log.Error(err, "failed to reconcile machine")
r.metricsControllerRuntimeReconcileErrors.Inc()
r.queue.AddRateLimited(item)
metrics.OpsFailed.Inc()

return true
}

Expand Down
91 changes: 57 additions & 34 deletions internal/metrics/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,43 +20,53 @@ const (
)

var (
OpsFailed = prometheus.NewCounter(prometheus.CounterOpts{
Name: "reconcile_total_number_failed_loops_counter",
Help: "How many errors happened during run time of loop",
})

OpsTime = prometheus.NewSummary(prometheus.SummaryOpts{
Name: "reconcile_total_duration_sec",
Help: "How long it took for method to run",
})

depth = prometheus.NewGaugeVec(prometheus.GaugeOpts{
ControllerRuntimeReconcileErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "controller_runtime_reconcile_errors_total",
Help: "Total number of reconciliation errors per controller",
}, []string{"controller"})

ControllerRuntimeReconcileDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "controller_runtime_reconcile_duration_seconds",
Help: "Length of time per reconciliation per controller",
}, []string{"controller"})

ControllerRuntimeMaxConccurrentReconciles = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "controller_runtime_max_concurrent_reconciles",
Help: "Maximum number of concurrent reconciles per controller",
}, []string{"controller"})

ControllerRuntimeActiveWorker = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Name: "controller_runtime_active_workers",
Help: "Number of currently used workers per controller",
}, []string{"controller"})

workqueuDepth = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: DepthKey,
Help: "Current depth of workqueue",
}, []string{"name", "controller"})

adds = prometheus.NewCounterVec(prometheus.CounterOpts{
workqueueAdds = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: WorkQueueSubsystem,
Name: AddsKey,
Help: "Total number of adds handled by workqueue",
}, []string{"name", "controller"})

latency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
workqueueLatency = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem,
Name: QueueLatencyKey,
Help: "How long in seconds an item stays in workqueue before being requested",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12),
}, []string{"name", "controller"})

workDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
workqueueDuration = prometheus.NewHistogramVec(prometheus.HistogramOpts{
Subsystem: WorkQueueSubsystem,
Name: WorkDurationKey,
Help: "How long in seconds processing an item from workqueue takes.",
Buckets: prometheus.ExponentialBuckets(10e-9, 10, 12),
}, []string{"name", "controller"})

unfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{
workqueueUnfinished = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: UnfinishedWorkKey,
Help: "How many seconds of work has been done that " +
Expand All @@ -65,60 +75,73 @@ var (
"threads by observing the rate at which this increases.",
}, []string{"name", "controller"})

longestRunningProcessor = prometheus.NewGaugeVec(prometheus.GaugeOpts{
workqueueLongestRunningProcessor = prometheus.NewGaugeVec(prometheus.GaugeOpts{
Subsystem: WorkQueueSubsystem,
Name: LongestRunningProcessorKey,
Help: "How many seconds has the longest running " +
"processor for workqueue been running.",
}, []string{"name", "controller"})

retries = prometheus.NewCounterVec(prometheus.CounterOpts{
workqueueRetries = prometheus.NewCounterVec(prometheus.CounterOpts{
Subsystem: WorkQueueSubsystem,
Name: RetriesKey,
Help: "Total number of retries handled by workqueue",
}, []string{"name", "controller"})

OperationDuration = prometheus.NewSummaryVec(prometheus.SummaryOpts{
Name: "operation_duration_seconds",
Help: "Length of time per operation",
}, []string{"operation"})

OperationErrors = prometheus.NewCounterVec(prometheus.CounterOpts{
Name: "operation_errors_total",
Help: "Total number of errors which affect main logic of operation",
}, []string{"operation"})
)

// init registers the package's metric collectors with the default Prometheus
// registry and installs WorkqueueMetricsProvider as the client-go workqueue
// metrics provider. prometheus.MustRegister panics if a registration fails.
func init() {
prometheus.MustRegister(OpsFailed)
prometheus.MustRegister(OpsTime)
prometheus.MustRegister(depth)
prometheus.MustRegister(adds)
prometheus.MustRegister(latency)
prometheus.MustRegister(workDuration)
prometheus.MustRegister(unfinished)
prometheus.MustRegister(longestRunningProcessor)
prometheus.MustRegister(retries)

prometheus.MustRegister(ControllerRuntimeReconcileErrors)
prometheus.MustRegister(ControllerRuntimeReconcileDuration)
prometheus.MustRegister(ControllerRuntimeMaxConccurrentReconciles)
prometheus.MustRegister(ControllerRuntimeActiveWorker)
prometheus.MustRegister(OperationDuration)
prometheus.MustRegister(OperationErrors)
prometheus.MustRegister(workqueuDepth)
prometheus.MustRegister(workqueueAdds)
prometheus.MustRegister(workqueueLatency)
prometheus.MustRegister(workqueueDuration)
prometheus.MustRegister(workqueueUnfinished)
prometheus.MustRegister(workqueueLongestRunningProcessor)
prometheus.MustRegister(workqueueRetries)
// Hand the workqueue package a provider backed by the collectors registered above.
workqueue.SetProvider(WorkqueueMetricsProvider{})
}

// WorkqueueMetricsProvider is a stateless provider whose methods return the
// package-level Prometheus workqueue collectors, labelled with the queue name.
// It is passed to workqueue.SetProvider in init.
type WorkqueueMetricsProvider struct{}

// NewDepthMetric returns the workqueue depth gauge for the queue called name.
// The queue name is used as the value of both the "name" and "controller" labels.
func (WorkqueueMetricsProvider) NewDepthMetric(name string) workqueue.GaugeMetric {
return depth.WithLabelValues(name, name)
return workqueuDepth.WithLabelValues(name, name)
}

// NewAddsMetric returns the workqueue adds counter for the queue called name.
// The queue name is used as the value of both the "name" and "controller" labels.
func (WorkqueueMetricsProvider) NewAddsMetric(name string) workqueue.CounterMetric {
return adds.WithLabelValues(name, name)
return workqueueAdds.WithLabelValues(name, name)
}

// NewLatencyMetric returns the histogram of time items spend waiting in the
// queue called name. The queue name is used as the value of both the "name"
// and "controller" labels.
func (WorkqueueMetricsProvider) NewLatencyMetric(name string) workqueue.HistogramMetric {
return latency.WithLabelValues(name, name)
return workqueueLatency.WithLabelValues(name, name)
}

// NewWorkDurationMetric returns the histogram of per-item processing time for
// the queue called name. The queue name is used as the value of both the
// "name" and "controller" labels.
func (WorkqueueMetricsProvider) NewWorkDurationMetric(name string) workqueue.HistogramMetric {
return workDuration.WithLabelValues(name, name)
return workqueueDuration.WithLabelValues(name, name)
}

// NewUnfinishedWorkSecondsMetric returns the unfinished-work gauge for the
// queue called name. The queue name is used as the value of both the "name"
// and "controller" labels.
func (WorkqueueMetricsProvider) NewUnfinishedWorkSecondsMetric(name string) workqueue.SettableGaugeMetric {
return unfinished.WithLabelValues(name, name)
return workqueueUnfinished.WithLabelValues(name, name)
}

// NewLongestRunningProcessorSecondsMetric returns the longest-running-processor
// gauge for the queue called name. The queue name is used as the value of both
// the "name" and "controller" labels.
func (WorkqueueMetricsProvider) NewLongestRunningProcessorSecondsMetric(name string) workqueue.SettableGaugeMetric {
return longestRunningProcessor.WithLabelValues(name, name)
return workqueueLongestRunningProcessor.WithLabelValues(name, name)
}

// NewRetriesMetric returns the workqueue retries counter for the queue called
// name. The queue name is used as the value of both the "name" and
// "controller" labels.
func (WorkqueueMetricsProvider) NewRetriesMetric(name string) workqueue.CounterMetric {
return retries.WithLabelValues(name, name)
return workqueueRetries.WithLabelValues(name, name)
}

0 comments on commit f0a1ec9

Please sign in to comment.