Skip to content

Commit

Permalink
fix data leak in solution
Browse files Browse the repository at this point in the history
  • Loading branch information
almostinf committed Dec 21, 2023
1 parent d8b0561 commit c32e76a
Show file tree
Hide file tree
Showing 2 changed files with 126 additions and 67 deletions.
111 changes: 78 additions & 33 deletions database/redis/notification.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@ import (
)

type notificationTypes struct {
valid, toRemove, toResave []*moira.ScheduledNotification
Valid, ToRemove, ToResaveNew, ToResaveOld []*moira.ScheduledNotification
}

// Drops all notifications with last timestamp
Expand Down Expand Up @@ -121,19 +121,12 @@ func (connector *DbConnector) removeNotifications(ctx context.Context, pipe redi
return 0, fmt.Errorf("failed to remove notifications: %w", err)
}

total := int64(0)
var total int64
for _, val := range response {
intVal, _ := val.(*redis.IntCmd).Result()
total += intVal
}

if int(total) != len(notifications) {
connector.logger.Warning().
Int("need_to_delete", len(notifications)).
Int("deleted", int(total)).
Msg("Number of deletions does not match the number of notifications to be deleted")
}

return total, nil
}

Expand Down Expand Up @@ -195,7 +188,7 @@ func (connector *DbConnector) getNotificationsTriggerChecks(notifications []*moi
}

// Helper function for logging information on to remove notifications
func logRemovedNotifications(logger moira.Logger, toRemoveNotifications []*moira.ScheduledNotification) {
func logToRemoveNotifications(logger moira.Logger, toRemoveNotifications []*moira.ScheduledNotification) {
if len(toRemoveNotifications) == 0 {
return
}
Expand All @@ -218,45 +211,44 @@ func logRemovedNotifications(logger moira.Logger, toRemoveNotifications []*moira

logger.Info().
Interface("notification_trigger_ids", triggerIDs).
Int("removed_count", len(toRemoveNotifications)).
Msg("Removed notifications")
Int("to_remove_count", len(toRemoveNotifications)).
Msg("To remove notifications")
}

// filterNotificationsByState filters notifications based on their state to the corresponding arrays
func (connector *DbConnector) filterNotificationsByState(notifications []*moira.ScheduledNotification) (notificationTypes, error) {
types := notificationTypes{
valid: make([]*moira.ScheduledNotification, 0, len(notifications)),
toRemove: make([]*moira.ScheduledNotification, 0, len(notifications)),
toResave: make([]*moira.ScheduledNotification, 0, len(notifications)),
Valid: make([]*moira.ScheduledNotification, 0, len(notifications)),
ToRemove: make([]*moira.ScheduledNotification, 0, len(notifications)),
ToResaveNew: make([]*moira.ScheduledNotification, 0, len(notifications)),
ToResaveOld: make([]*moira.ScheduledNotification, 0, len(notifications)),
}

triggerChecks, err := connector.getNotificationsTriggerChecks(notifications)
if err != nil {
return notificationTypes{}, fmt.Errorf("failed to get notifications trigger checks: %w", err)
}

removedNotifications := make([]*moira.ScheduledNotification, 0, len(notifications))

for i, notification := range notifications {
if notification != nil {
switch notification.GetState(triggerChecks[i]) {
case moira.ValidNotification:
types.valid = append(types.valid, notification)
types.Valid = append(types.Valid, notification)

case moira.RemovedNotification:
types.toRemove = append(types.toRemove, notification)
removedNotifications = append(removedNotifications, notification)
types.ToRemove = append(types.ToRemove, notification)

case moira.ResavedNotification:
types.toRemove = append(types.toRemove, notification)
types.ToResaveOld = append(types.ToResaveOld, notification)

updatedNotification := *notification
updatedNotification.Timestamp = time.Now().Add(connector.notification.ResaveTime).Unix()
types.toResave = append(types.toResave, &updatedNotification)
types.ToResaveNew = append(types.ToResaveNew, &updatedNotification)
}
}
}

logRemovedNotifications(connector.logger, removedNotifications)
logToRemoveNotifications(connector.logger, types.ToRemove)

return types, nil
}
Expand All @@ -277,8 +269,8 @@ func (connector *DbConnector) handleNotifications(notifications []*moira.Schedul

if len(delayedNotifications) == 0 {
return notificationTypes{
valid: notDelayedNotifications,
toRemove: notDelayedNotifications,
Valid: notDelayedNotifications,
ToRemove: notDelayedNotifications,
}, nil
}

Expand All @@ -287,12 +279,12 @@ func (connector *DbConnector) handleNotifications(notifications []*moira.Schedul
return notificationTypes{}, fmt.Errorf("failed to filter delayed notifications by state: %w", err)
}

types.valid, err = moira.MergeToSorted[*moira.ScheduledNotification](types.valid, notDelayedNotifications)
types.Valid, err = moira.MergeToSorted[*moira.ScheduledNotification](types.Valid, notDelayedNotifications)
if err != nil {
return notificationTypes{}, fmt.Errorf("failed to merge valid and not delayed notifications into sorted array: %w", err)
}

types.toRemove = append(types.toRemove, types.valid...)
types.ToRemove = append(types.ToRemove, types.Valid...)

return types, nil
}
Expand Down Expand Up @@ -455,23 +447,29 @@ func (connector *DbConnector) fetchNotificationsDo(to int64, limit int64) ([]*mo
return fmt.Errorf("failed to validate notifications: %w", err)
}

result = types.valid
result = types.Valid

_, err = tx.TxPipelined(ctx, func(pipe redis.Pipeliner) error {
// someone has changed notifierNotificationsKey while we do our job
// and transaction fail (no notifications were deleted) :(
var deleted int64
deleted, err = connector.removeNotifications(ctx, pipe, types.toRemove)
deleted, err = connector.removeNotifications(ctx, pipe, types.ToRemove)
if err != nil {
return fmt.Errorf("failed to remove notifications in transaction: %w", err)
}

if int(deleted) != len(types.toRemove) {
return fmt.Errorf("number of deletions: %d does not match the number of notifications to be deleted: %d", int(deleted), len(types.toRemove))
if int(deleted) != len(types.ToRemove) {
return fmt.Errorf("number of deleted: %d does not match the number of notifications to be deleted: %d", int(deleted), len(types.ToRemove))
}

if err = connector.saveNotifications(ctx, pipe, types.toResave); err != nil {
return fmt.Errorf("failed to save notifications in transaction: %w", err)
var affected int
affected, err = connector.resaveNotifications(ctx, pipe, types.ToResaveOld, types.ToResaveNew)
if err != nil {
return fmt.Errorf("failed to resave notifications in transaction")
}

if affected != len(types.ToResaveOld)+len(types.ToResaveNew) {
return fmt.Errorf("number of affected: %d does not match the number of notifications to be affected: %d", affected, len(types.ToResaveOld)+len(types.ToResaveNew))
}

return nil
Expand Down Expand Up @@ -546,4 +544,51 @@ func (connector *DbConnector) saveNotifications(ctx context.Context, pipe redis.
return nil
}

func (connector *DbConnector) resaveNotifications(
ctx context.Context,
pipe redis.Pipeliner,
oldNotifications []*moira.ScheduledNotification,
newNotifications []*moira.ScheduledNotification,
) (int, error) {
for _, notification := range oldNotifications {
if notification != nil {
notificationString, err := reply.GetNotificationBytes(*notification)
if err != nil {
return 0, err
}

pipe.ZRem(ctx, notifierNotificationsKey, notificationString)
}
}

for _, notification := range newNotifications {
if notification != nil {
notificationString, err := reply.GetNotificationBytes(*notification)
if err != nil {
return 0, err
}

z := &redis.Z{Score: float64(notification.Timestamp), Member: notificationString}
pipe.ZAdd(ctx, notifierNotificationsKey, z)
}
}

response, err := pipe.Exec(ctx)
if err != nil {
return 0, fmt.Errorf("failed to EXEC: %w", err)
}

var total int
for _, val := range response {
intVal, err := val.(*redis.IntCmd).Result()
if err != nil {
return 0, fmt.Errorf("failed to get result of intCmd response value: %w", err)
}

total += int(intVal)
}

return total, nil
}

var notifierNotificationsKey = "moira-notifier-notifications"
82 changes: 48 additions & 34 deletions database/redis/notification_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -544,17 +544,19 @@ func TestFilterNotificationsByState(t *testing.T) {
Convey("With empty notifications", func() {
types, err := database.filterNotificationsByState([]*moira.ScheduledNotification{})
So(err, ShouldBeNil)
So(types.valid, ShouldResemble, []*moira.ScheduledNotification{})
So(types.toRemove, ShouldResemble, []*moira.ScheduledNotification{})
So(types.toResave, ShouldResemble, []*moira.ScheduledNotification{})
So(types.Valid, ShouldResemble, []*moira.ScheduledNotification{})
So(types.ToRemove, ShouldResemble, []*moira.ScheduledNotification{})
So(types.ToResaveOld, ShouldResemble, []*moira.ScheduledNotification{})
So(types.ToResaveNew, ShouldResemble, []*moira.ScheduledNotification{})
})

Convey("With all valid notifications", func() {
types, err := database.filterNotificationsByState([]*moira.ScheduledNotification{notificationOld, notification, notificationNew})
So(err, ShouldBeNil)
So(types.valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notification, notificationNew})
So(types.toRemove, ShouldResemble, []*moira.ScheduledNotification{})
So(types.toResave, ShouldResemble, []*moira.ScheduledNotification{})
So(types.Valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notification, notificationNew})
So(types.ToRemove, ShouldResemble, []*moira.ScheduledNotification{})
So(types.ToResaveOld, ShouldResemble, []*moira.ScheduledNotification{})
So(types.ToResaveNew, ShouldResemble, []*moira.ScheduledNotification{})
})

Convey("With removed check data", func() {
Expand All @@ -565,9 +567,10 @@ func TestFilterNotificationsByState(t *testing.T) {

types, err := database.filterNotificationsByState([]*moira.ScheduledNotification{notificationOld, notification, notificationNew})
So(err, ShouldBeNil)
So(types.valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notification})
So(types.toRemove, ShouldResemble, []*moira.ScheduledNotification{notificationNew})
So(types.toResave, ShouldResemble, []*moira.ScheduledNotification{})
So(types.Valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notification})
So(types.ToRemove, ShouldResemble, []*moira.ScheduledNotification{notificationNew})
So(types.ToResaveOld, ShouldResemble, []*moira.ScheduledNotification{})
So(types.ToResaveNew, ShouldResemble, []*moira.ScheduledNotification{})
})

Convey("With metric on maintenance", func() {
Expand All @@ -576,10 +579,12 @@ func TestFilterNotificationsByState(t *testing.T) {

types, err := database.filterNotificationsByState([]*moira.ScheduledNotification{notificationOld, notification, notificationNew})
So(err, ShouldBeNil)
So(types.valid, ShouldResemble, []*moira.ScheduledNotification{notification, notificationNew})
So(types.toRemove, ShouldResemble, []*moira.ScheduledNotification{notificationOld})
So(len(types.toResave), ShouldResemble, 1)
So(types.toResave[0].SendFail, ShouldResemble, notificationOld.SendFail)
So(types.Valid, ShouldResemble, []*moira.ScheduledNotification{notification, notificationNew})
So(types.ToRemove, ShouldResemble, []*moira.ScheduledNotification{})
So(len(types.ToResaveOld), ShouldResemble, 1)
So(types.ToResaveOld[0], ShouldResemble, notificationOld)
So(len(types.ToResaveNew), ShouldResemble, 1)
So(types.ToResaveNew[0].SendFail, ShouldResemble, notificationOld.SendFail)
})

Convey("With trigger on maintenance", func() {
Expand All @@ -592,10 +597,12 @@ func TestFilterNotificationsByState(t *testing.T) {

types, err := database.filterNotificationsByState([]*moira.ScheduledNotification{notificationOld, notification, notificationNew})
So(err, ShouldBeNil)
So(types.valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notification})
So(types.toRemove, ShouldResemble, []*moira.ScheduledNotification{notificationNew})
So(len(types.toResave), ShouldResemble, 1)
So(types.toResave[0].SendFail, ShouldResemble, notificationNew.SendFail)
So(types.Valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notification})
So(types.ToRemove, ShouldResemble, []*moira.ScheduledNotification{})
So(len(types.ToResaveOld), ShouldResemble, 1)
So(types.ToResaveOld[0], ShouldResemble, notificationNew)
So(len(types.ToResaveNew), ShouldResemble, 1)
So(types.ToResaveNew[0].SendFail, ShouldResemble, notificationNew.SendFail)
})
})
}
Expand Down Expand Up @@ -678,18 +685,20 @@ func TestHandleNotifications(t *testing.T) {
Convey("Without delayed notifications", func() {
types, err := database.handleNotifications([]*moira.ScheduledNotification{notificationOld, notification, notificationNew})
So(err, ShouldBeNil)
So(types.valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notification, notificationNew})
So(types.toRemove, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notification, notificationNew})
So(types.Valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notification, notificationNew})
So(types.ToRemove, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notification, notificationNew})
var toResaveNotificationsExpected []*moira.ScheduledNotification
So(types.toResave, ShouldResemble, toResaveNotificationsExpected)
So(types.ToResaveOld, ShouldResemble, toResaveNotificationsExpected)
So(types.ToResaveNew, ShouldResemble, toResaveNotificationsExpected)
})

Convey("With both delayed and not delayed valid notifications", func() {
types, err := database.handleNotifications([]*moira.ScheduledNotification{notificationOld, notificationOld2, notification, notificationNew, notificationNew2, notificationNew3})
So(err, ShouldBeNil)
So(types.valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notificationOld2, notification, notificationNew, notificationNew2, notificationNew3})
So(types.toRemove, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notificationOld2, notification, notificationNew, notificationNew2, notificationNew3})
So(types.toResave, ShouldResemble, []*moira.ScheduledNotification{})
So(types.Valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notificationOld2, notification, notificationNew, notificationNew2, notificationNew3})
So(types.ToRemove, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notificationOld2, notification, notificationNew, notificationNew2, notificationNew3})
So(types.ToResaveOld, ShouldResemble, []*moira.ScheduledNotification{})
So(types.ToResaveNew, ShouldResemble, []*moira.ScheduledNotification{})
})

Convey("With both delayed and not delayed notifications and removed check data", func() {
Expand All @@ -700,9 +709,10 @@ func TestHandleNotifications(t *testing.T) {

types, err := database.handleNotifications([]*moira.ScheduledNotification{notificationOld, notificationOld2, notification, notificationNew, notificationNew2, notificationNew3})
So(err, ShouldBeNil)
So(types.valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notificationOld2, notification, notificationNew, notificationNew3})
So(types.toRemove, ShouldResemble, []*moira.ScheduledNotification{notificationNew2, notificationOld, notificationOld2, notification, notificationNew, notificationNew3})
So(types.toResave, ShouldResemble, []*moira.ScheduledNotification{})
So(types.Valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notificationOld2, notification, notificationNew, notificationNew3})
So(types.ToRemove, ShouldResemble, []*moira.ScheduledNotification{notificationNew2, notificationOld, notificationOld2, notification, notificationNew, notificationNew3})
So(types.ToResaveOld, ShouldResemble, []*moira.ScheduledNotification{})
So(types.ToResaveNew, ShouldResemble, []*moira.ScheduledNotification{})
})

Convey("With both delayed and not delayed valid notifications and metric on maintenance", func() {
Expand All @@ -711,10 +721,12 @@ func TestHandleNotifications(t *testing.T) {

types, err := database.handleNotifications([]*moira.ScheduledNotification{notificationOld, notificationOld2, notification, notificationNew, notificationNew2, notificationNew3})
So(err, ShouldBeNil)
So(types.valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notification, notificationNew, notificationNew2, notificationNew3})
So(types.toRemove, ShouldResemble, []*moira.ScheduledNotification{notificationOld2, notificationOld, notification, notificationNew, notificationNew2, notificationNew3})
So(len(types.toResave), ShouldResemble, 1)
So(types.toResave[0].SendFail, ShouldResemble, notificationOld2.SendFail)
So(types.Valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notification, notificationNew, notificationNew2, notificationNew3})
So(types.ToRemove, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notification, notificationNew, notificationNew2, notificationNew3})
So(len(types.ToResaveNew), ShouldResemble, 1)
So(types.ToResaveNew[0].SendFail, ShouldResemble, notificationOld2.SendFail)
So(len(types.ToResaveOld), ShouldResemble, 1)
So(types.ToResaveOld[0], ShouldResemble, notificationOld2)
})

Convey("With both delayed and not delayed valid notifications and trigger on maintenance", func() {
Expand All @@ -727,10 +739,12 @@ func TestHandleNotifications(t *testing.T) {

types, err := database.handleNotifications([]*moira.ScheduledNotification{notificationOld, notificationOld2, notification, notificationNew, notificationNew2, notificationNew3})
So(err, ShouldBeNil)
So(types.valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notificationOld2, notification, notificationNew, notificationNew3})
So(types.toRemove, ShouldResemble, []*moira.ScheduledNotification{notificationNew2, notificationOld, notificationOld2, notification, notificationNew, notificationNew3})
So(len(types.toResave), ShouldResemble, 1)
So(types.toResave[0].SendFail, ShouldResemble, notificationNew2.SendFail)
So(types.Valid, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notificationOld2, notification, notificationNew, notificationNew3})
So(types.ToRemove, ShouldResemble, []*moira.ScheduledNotification{notificationOld, notificationOld2, notification, notificationNew, notificationNew3})
So(len(types.ToResaveNew), ShouldResemble, 1)
So(types.ToResaveNew[0].SendFail, ShouldResemble, notificationNew2.SendFail)
So(len(types.ToResaveOld), ShouldResemble, 1)
So(types.ToResaveOld[0], ShouldResemble, notificationNew2)
})
})
}
Expand Down

0 comments on commit c32e76a

Please sign in to comment.