Skip to content

Commit

Permalink
merge with base
Browse files Browse the repository at this point in the history
  • Loading branch information
almostinf committed Dec 22, 2023
1 parent baa2e85 commit e54f806
Show file tree
Hide file tree
Showing 8 changed files with 209 additions and 143 deletions.
16 changes: 3 additions & 13 deletions checker/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package worker

import (
"errors"
"runtime"
"sync/atomic"
"time"

Expand Down Expand Up @@ -127,17 +126,8 @@ func (check *Checker) startCheckerWorker(w checkerWorker) error {
return nil
}

maxParallelChecks := w.MaxParallelChecks()
if maxParallelChecks == 0 {
maxParallelChecks = runtime.NumCPU()

check.Logger.Info().
Int("number_of_cpu", maxParallelChecks).
Msg("MaxParallel" + w.Name() + "Checks is not configured, set it to the number of CPU")
}

const maxParallelChecksMaxValue = 1024 * 8
if maxParallelChecks > maxParallelChecksMaxValue {
if w.MaxParallelChecks() > maxParallelChecksMaxValue {
return errors.New("MaxParallel" + w.Name() + "Checks value is too large")
}

Expand All @@ -146,10 +136,10 @@ func (check *Checker) startCheckerWorker(w checkerWorker) error {

triggerIdsToCheckChan := check.startTriggerToCheckGetter(
w.GetTriggersToCheck,
maxParallelChecks,
w.MaxParallelChecks(),
)

for i := 0; i < maxParallelChecks; i++ {
for i := 0; i < w.MaxParallelChecks(); i++ {
check.tomb.Go(func() error {
return check.startTriggerHandler(
triggerIdsToCheckChan,
Expand Down
32 changes: 32 additions & 0 deletions cmd/checker/config.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package main

import (
"runtime"

"github.com/moira-alert/moira"
"github.com/moira-alert/moira/checker"
"github.com/moira-alert/moira/cmd"
Expand Down Expand Up @@ -39,6 +41,8 @@ type checkerConfig struct {
MaxParallelChecks int `yaml:"max_parallel_checks"`
// Max concurrent remote checkers to run. Equal to the number of processor cores found on the Moira host by default or when the variable is set to 0.
MaxParallelRemoteChecks int `yaml:"max_parallel_remote_checks"`
// Max concurrent prometheus checkers to run. Equal to the number of processor cores found on the Moira host by default or when the variable is set to 0.
MaxParallelPrometheusChecks int `yaml:"max_parallel_prometheus_checks"`
// Specify log level by entities
SetLogLevel triggersLogConfig `yaml:"set_log_level"`
// Metric event pop operation batch size
Expand All @@ -47,6 +51,15 @@ type checkerConfig struct {
MetricEventPopDelay string `yaml:"metric_event_pop_delay"`
}

// handleParallelChecks replaces a non-positive "max parallel checks" value
// with the number of logical CPUs reported by runtime.NumCPU.
//
// It returns true when the value behind parallelChecks was replaced, so the
// caller can log that the default has been applied. A nil pointer or a
// positive value is left untouched and false is returned.
func handleParallelChecks(parallelChecks *int) bool {
	// A negative number of parallel checks is never meaningful, so it is
	// handled the same way as 0 (not configured) instead of being passed on.
	if parallelChecks == nil || *parallelChecks > 0 {
		return false
	}

	*parallelChecks = runtime.NumCPU()

	return true
}

func (config *checkerConfig) getSettings(logger moira.Logger) *checker.Config {
logTriggersToLevel := make(map[string]string)
for _, v := range config.SetLogLevel.TriggersToLevel {
Expand All @@ -56,13 +69,32 @@ func (config *checkerConfig) getSettings(logger moira.Logger) *checker.Config {
Int("number_of_triggers", len(logTriggersToLevel)).
Msg("Found dynamic log rules in config for some triggers")

if handleParallelChecks(&config.MaxParallelChecks) {
logger.Info().
Int("number_of_cpu", config.MaxParallelChecks).
Msg("MaxParallelChecks is not configured, set it to the number of CPU")
}

if handleParallelChecks(&config.MaxParallelRemoteChecks) {
logger.Info().
Int("number_of_cpu", config.MaxParallelRemoteChecks).
Msg("MaxParallelRemoteChecks is not configured, set it to the number of CPU")
}

if handleParallelChecks(&config.MaxParallelPrometheusChecks) {
logger.Info().
Int("number_of_cpu", config.MaxParallelPrometheusChecks).
Msg("MaxParallelPrometheusChecks is not configured, set it to the number of CPU")
}

return &checker.Config{
CheckInterval: to.Duration(config.CheckInterval),
LazyTriggersCheckInterval: to.Duration(config.LazyTriggersCheckInterval),
NoDataCheckInterval: to.Duration(config.NoDataCheckInterval),
StopCheckingIntervalSeconds: int64(to.Duration(config.StopCheckingInterval).Seconds()),
MaxParallelLocalChecks: config.MaxParallelChecks,
MaxParallelRemoteChecks: config.MaxParallelRemoteChecks,
MaxParallelPrometheusChecks: config.MaxParallelPrometheusChecks,
LogTriggersToLevel: logTriggersToLevel,
MetricEventPopBatchSize: int64(config.MetricEventPopBatchSize),
MetricEventPopDelay: to.Duration(config.MetricEventPopDelay),
Expand Down
127 changes: 78 additions & 49 deletions database/redis/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

type notificationTypes struct {
valid, toRemove, toResave []*moira.ScheduledNotification
Valid, ToRemove, ToResaveNew, ToResaveOld []*moira.ScheduledNotification
}

// Drops all notifications with last timestamp
Expand Down Expand Up @@ -121,19 +121,12 @@ func (connector *DbConnector) removeNotifications(ctx context.Context, pipe redi
return 0, fmt.Errorf("failed to remove notifications: %w", err)
}

total := int64(0)
var total int64
for _, val := range response {
intVal, _ := val.(*redis.IntCmd).Result()
total += intVal
}

if int(total) != len(notifications) {
connector.logger.Warning().
Int("need_to_delete", len(notifications)).
Int("deleted", int(total)).
Msg("Number of deletions does not match the number of notifications to be deleted")
}

return total, nil
}

Expand Down Expand Up @@ -195,7 +188,7 @@ func (connector *DbConnector) getNotificationsTriggerChecks(notifications []*moi
}

// Helper function for logging information about notifications that are to be removed
func logRemovedNotifications(logger moira.Logger, toRemoveNotifications []*moira.ScheduledNotification) {
func logToRemoveNotifications(logger moira.Logger, toRemoveNotifications []*moira.ScheduledNotification) {
if len(toRemoveNotifications) == 0 {
return
}
Expand All @@ -218,45 +211,44 @@ func logRemovedNotifications(logger moira.Logger, toRemoveNotifications []*moira

logger.Info().
Interface("notification_trigger_ids", triggerIDs).
Int("removed_count", len(toRemoveNotifications)).
Msg("Removed notifications")
Int("to_remove_count", len(toRemoveNotifications)).
Msg("To remove notifications")
}

// filterNotificationsByState filters notifications based on their state to the corresponding arrays
func (connector *DbConnector) filterNotificationsByState(notifications []*moira.ScheduledNotification) (notificationTypes, error) {
types := notificationTypes{
valid: make([]*moira.ScheduledNotification, 0, len(notifications)),
toRemove: make([]*moira.ScheduledNotification, 0, len(notifications)),
toResave: make([]*moira.ScheduledNotification, 0, len(notifications)),
Valid: make([]*moira.ScheduledNotification, 0, len(notifications)),
ToRemove: make([]*moira.ScheduledNotification, 0, len(notifications)),
ToResaveNew: make([]*moira.ScheduledNotification, 0, len(notifications)),
ToResaveOld: make([]*moira.ScheduledNotification, 0, len(notifications)),
}

triggerChecks, err := connector.getNotificationsTriggerChecks(notifications)
if err != nil {
return notificationTypes{}, fmt.Errorf("failed to get notifications trigger checks: %w", err)
}

removedNotifications := make([]*moira.ScheduledNotification, 0, len(notifications))

for i, notification := range notifications {
if notification != nil {
switch notification.GetState(triggerChecks[i]) {
case moira.ValidNotification:
types.valid = append(types.valid, notification)
types.Valid = append(types.Valid, notification)

case moira.RemovedNotification:
types.toRemove = append(types.toRemove, notification)
removedNotifications = append(removedNotifications, notification)
types.ToRemove = append(types.ToRemove, notification)

case moira.ResavedNotification:
types.toRemove = append(types.toRemove, notification)
types.ToResaveOld = append(types.ToResaveOld, notification)

updatedNotification := *notification
updatedNotification.Timestamp = time.Now().Add(connector.notification.ResaveTime).Unix()
types.toResave = append(types.toResave, &updatedNotification)
types.ToResaveNew = append(types.ToResaveNew, &updatedNotification)
}
}
}

logRemovedNotifications(connector.logger, removedNotifications)
logToRemoveNotifications(connector.logger, types.ToRemove)

return types, nil
}
Expand All @@ -277,8 +269,8 @@ func (connector *DbConnector) handleNotifications(notifications []*moira.Schedul

if len(delayedNotifications) == 0 {
return notificationTypes{
valid: notDelayedNotifications,
toRemove: notDelayedNotifications,
Valid: notDelayedNotifications,
ToRemove: notDelayedNotifications,
}, nil
}

Expand All @@ -289,22 +281,24 @@ func (connector *DbConnector) handleNotifications(notifications []*moira.Schedul

connector.logger.Debug().
Fields(map[string]interface{}{
"valid": types.valid,
"valid_count": len(types.valid),
"removed": types.toRemove,
"removed_count": len(types.toRemove),
"resaved": types.toResave,
"resaved_count": len(types.toResave),
"valid": types.Valid,
"valid_count": len(types.Valid),
"removed": types.ToRemove,
"removed_count": len(types.ToRemove),
"resaved_old": types.ToResaveOld,
"resaved_old_count": len(types.ToResaveOld),
"resaved_new": types.ToResaveNew,
"resaved_new_count": len(types.ToResaveNew),
},
).
Msg("Notification types")

types.valid, err = moira.MergeToSorted[*moira.ScheduledNotification](types.valid, notDelayedNotifications)
types.Valid, err = moira.MergeToSorted[*moira.ScheduledNotification](types.Valid, notDelayedNotifications)
if err != nil {
return notificationTypes{}, fmt.Errorf("failed to merge valid and not delayed notifications into sorted array: %w", err)
}

types.toRemove = append(types.toRemove, types.valid...)
types.ToRemove = append(types.ToRemove, types.Valid...)

return types, nil
}
Expand Down Expand Up @@ -472,23 +466,29 @@ func (connector *DbConnector) fetchNotificationsDo(to int64, limit int64) ([]*mo
return fmt.Errorf("failed to validate notifications: %w", err)
}

result = types.valid
result = types.Valid

_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
// someone has changed notifierNotificationsKey while we do our job
// and transaction fail (no notifications were deleted) :(
var deleted int64
deleted, err = connector.removeNotifications(ctx, pipe, types.toRemove)
deleted, err = connector.removeNotifications(ctx, pipe, types.ToRemove)
if err != nil {
return fmt.Errorf("failed to remove notifications in transaction: %w", err)
}

if int(deleted) != len(types.toRemove) {
return fmt.Errorf("number of deletions: %d does not match the number of notifications to be deleted: %d", int(deleted), len(types.toRemove))
if int(deleted) != len(types.ToRemove) {
return fmt.Errorf("number of deleted: %d does not match the number of notifications to be deleted: %d", int(deleted), len(types.ToRemove))
}

if err = connector.saveNotifications(ctx, pipe, types.toResave); err != nil {
return fmt.Errorf("failed to save notifications in transaction: %w", err)
var affected int
affected, err = connector.resaveNotifications(ctx, pipe, types.ToResaveOld, types.ToResaveNew)
if err != nil {
return fmt.Errorf("failed to resave notifications in transaction")
}

if affected != len(types.ToResaveOld)+len(types.ToResaveNew) {
return fmt.Errorf("number of affected: %d does not match the number of notifications to be affected: %d", affected, len(types.ToResaveOld)+len(types.ToResaveNew))
}

return nil
Expand Down Expand Up @@ -545,22 +545,51 @@ func (connector *DbConnector) AddNotifications(notifications []*moira.ScheduledN
return nil
}

func (connector *DbConnector) saveNotifications(ctx context.Context, pipe redis.Pipeliner, notifications []*moira.ScheduledNotification) error {
for _, notification := range notifications {
bytes, err := reply.GetNotificationBytes(*notification)
if err != nil {
return err
func (connector *DbConnector) resaveNotifications(
ctx context.Context,
pipe redis.Pipeliner,
oldNotifications []*moira.ScheduledNotification,
newNotifications []*moira.ScheduledNotification,
) (int, error) {
for _, notification := range oldNotifications {
if notification != nil {
notificationString, err := reply.GetNotificationBytes(*notification)
if err != nil {
return 0, err
}

pipe.ZRem(ctx, notifierNotificationsKey, notificationString)
}
}

z := &redis.Z{Score: float64(notification.Timestamp), Member: bytes}
pipe.ZAdd(ctx, notifierNotificationsKey, z)
for _, notification := range newNotifications {
if notification != nil {
notificationString, err := reply.GetNotificationBytes(*notification)
if err != nil {
return 0, err
}

z := &redis.Z{Score: float64(notification.Timestamp), Member: notificationString}
pipe.ZAdd(ctx, notifierNotificationsKey, z)
}
}

response, err := pipe.Exec(ctx)
if err != nil {
return 0, fmt.Errorf("failed to EXEC: %w", err)
}

if _, err := pipe.Exec(ctx); err != nil {
return fmt.Errorf("failed to EXEC: %w", err)
var total int
for _, val := range response {
intVal, err := val.(*redis.IntCmd).Result()
if err != nil {
return 0, fmt.Errorf("failed to get result of intCmd response value: %w", err)
}

total += int(intVal)
}

return nil
return total, nil
}

var notifierNotificationsKey = "moira-notifier-notifications"
Loading

0 comments on commit e54f806

Please sign in to comment.