Skip to content

Commit

Permalink
rollback previous changes and fix migration for running on all shards
Browse files Browse the repository at this point in the history
  • Loading branch information
almostinf committed Jun 28, 2024
1 parent 4324713 commit 249f270
Show file tree
Hide file tree
Showing 6 changed files with 220 additions and 189 deletions.
4 changes: 2 additions & 2 deletions cmd/cli/from_2.11_to_2.12.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ import (
func updateFrom211(logger moira.Logger, database moira.Database) error {
logger.Info().Msg("Update 2.11 -> 2.12 was started")

if err := database.UpdateTelegramUsersRecords(); err != nil {
if err := updateTelegramUsersRecords(logger, database); err != nil {
return err
}

Expand All @@ -19,7 +19,7 @@ func updateFrom211(logger moira.Logger, database moira.Database) error {
func downgradeTo211(logger moira.Logger, database moira.Database) error {
logger.Info().Msg("Downgrade 2.12 -> 2.11 started")

if err := database.DowngradeTelegramUsersRecords(); err != nil {
if err := downgradeTelegramUsersRecords(logger, database); err != nil {
return err
}

Expand Down
173 changes: 173 additions & 0 deletions cmd/cli/telegram_utils.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,173 @@
package main

import (
"context"
"encoding/json"
"strconv"
"strings"

goredis "github.com/go-redis/redis/v8"
"github.com/moira-alert/moira"
"github.com/moira-alert/moira/database/redis"
"github.com/moira-alert/moira/senders/telegram"
)

var (
telegramUsersKey = "moira-telegram-users:"
telegramLockPrefix = "moira-telegram-users:moira-bot-host:"
)

// callFunc calls the fn dependent of Redis client type (cluster or standalone).
func callFunc(connector *redis.DbConnector, fn func(connector *redis.DbConnector, client goredis.UniversalClient) error) error {
client := connector.Client()
ctx := connector.Context()

switch c := client.(type) {
case *goredis.ClusterClient:
return c.ForEachMaster(ctx, func(ctx context.Context, shard *goredis.Client) error {
return fn(connector, shard)
})
default:
return fn(connector, client)
}
}

func updateTelegramUsersRecords(logger moira.Logger, database moira.Database) error {
logger.Info().Msg("Start updateTelegramUsersRecords")

switch d := database.(type) {
case *redis.DbConnector:
if err := callFunc(d, func(connector *redis.DbConnector, client goredis.UniversalClient) error {
return updateTelegramUsersRecordsOnRedisNode(connector, client, logger)
}); err != nil {
return err
}

default:
return makeUnknownDBError(database)
}

logger.Info().Msg("Successfully finished updateTelegramUsersRecords")

return nil
}

func updateTelegramUsersRecordsOnRedisNode(connector *redis.DbConnector, client goredis.UniversalClient, logger moira.Logger) error {
ctx := connector.Context()
iter := client.Scan(ctx, 0, telegramUsersKey+"*", 0).Iterator()
pipe := client.TxPipeline()

for iter.Next(ctx) {
key := iter.Val()
if strings.HasPrefix(key, telegramLockPrefix) {
continue
}

oldValue, err := client.Get(ctx, key).Result()
if err != nil {
return err
}

chatID, err := strconv.ParseInt(oldValue, 10, 64)
if err != nil {
logger.Error().
String("old_value", oldValue).
Error(err).
Msg("failed to parse chatID as int")

continue
}

var chat *telegram.Chat
if chatID < 0 {
chat = &telegram.Chat{
Type: "group",
ID: chatID,
}
} else {
chat = &telegram.Chat{
Type: "private",
ID: chatID,
}
}

chatRaw, err := json.Marshal(chat)
if err != nil {
return err
}

pipe.Set(ctx, key, string(chatRaw), goredis.KeepTTL)
}

if _, err := pipe.Exec(ctx); err != nil {
return err
}

return nil
}

func downgradeTelegramUsersRecords(logger moira.Logger, database moira.Database) error {
logger.Info().Msg("Start downgradeTelegramUsersRecords")

switch d := database.(type) {
case *redis.DbConnector:
if err := callFunc(d, func(connector *redis.DbConnector, client goredis.UniversalClient) error {
return downgradeTelegramUsersRecordsOnRedisNode(connector, client, logger)
}); err != nil {
return err
}

default:
return makeUnknownDBError(database)
}

logger.Info().Msg("Successfully finished downgradeTelegramUsersRecords")

return nil
}

func downgradeTelegramUsersRecordsOnRedisNode(connector *redis.DbConnector, client goredis.UniversalClient, logger moira.Logger) error {
ctx := connector.Context()
iter := client.Scan(ctx, 0, telegramUsersKey+"*", 0).Iterator()
pipe := client.TxPipeline()

for iter.Next(ctx) {
key := iter.Val()
if strings.HasPrefix(key, telegramLockPrefix) {
continue
}

oldValue, err := client.Get(ctx, key).Result()
if err != nil {
return err
}

chat := &telegram.Chat{}
if err = json.Unmarshal([]byte(oldValue), chat); err != nil {
logger.Error().
String("old_value", oldValue).
Error(err).
Msg("failed to unmarshal old value chat json")

continue
}

var newValue string
if chat.ID == 0 {
logger.Error().
Msg("chat ID is null")

continue
} else {
newValue = strconv.FormatInt(chat.ID, 10)
}

pipe.Set(ctx, key, newValue, goredis.KeepTTL)
}

if _, err := pipe.Exec(ctx); err != nil {
return err
}

return nil
}
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package redis
package main

import (
"context"
"testing"

goredis "github.com/go-redis/redis/v8"
"github.com/moira-alert/moira"
"github.com/moira-alert/moira/database/redis"
logging "github.com/moira-alert/moira/logging/zerolog_adapter"

goredis "github.com/go-redis/redis/v8"
. "github.com/smartystreets/goconvey/convey"
)

Expand All @@ -14,9 +16,13 @@ func TestUpdateTelegramUsersRecords(t *testing.T) {
t.Skip("skipping test in short mode.")
}

logger, _ := logging.GetLogger("database")
conf := getDefault()
logger, err := logging.ConfigureLog(conf.LogFile, "error", "cli", conf.LogPrettyFormat)
if err != nil {
t.Fatal(err)
}

database := NewTestDatabase(logger)
database := redis.NewTestDatabase(logger)
database.Flush()
defer database.Flush()

Expand All @@ -25,10 +31,10 @@ func TestUpdateTelegramUsersRecords(t *testing.T) {

Convey("Test data migration forwards", t, func() {
Convey("Given old database", func() {
createOldTelegramUserRecords(ctx, client)
createOldTelegramUserRecords(database)

Convey("When migration was applied", func() {
err := database.UpdateTelegramUsersRecords()
err := updateTelegramUsersRecords(logger, database)
So(err, ShouldBeNil)

Convey("Database should be new", func() {
Expand Down Expand Up @@ -58,21 +64,24 @@ func TestDowngradeTelegramUsersRecords(t *testing.T) {
t.Skip("skipping test in short mode.")
}

logger, _ := logging.GetLogger("database")
conf := getDefault()
logger, err := logging.ConfigureLog(conf.LogFile, "error", "cli", conf.LogPrettyFormat)
if err != nil {
t.Fatal(err)
}

database := NewTestDatabase(logger)
database := redis.NewTestDatabase(logger)
database.Flush()
defer database.Flush()

client := database.Client()
ctx := database.Context()

Convey("Test data migration backwards", t, func() {
Convey("Given new database", func() {
createNewTelegramUserRecords(ctx, client)
createNewTelegramUserRecords(database)

Convey("When migration was applied", func() {
err := database.DowngradeTelegramUsersRecords()
err := downgradeTelegramUsersRecords(logger, database)
So(err, ShouldBeNil)

Convey("Database should be old", func() {
Expand All @@ -97,16 +106,30 @@ func TestDowngradeTelegramUsersRecords(t *testing.T) {
})
}

func createOldTelegramUserRecords(ctx context.Context, client goredis.UniversalClient) {
client.Set(ctx, "moira-telegram-users:some telegram group", "-1001494975744", goredis.KeepTTL)
client.Set(ctx, "moira-telegram-users:some telegram group failed migration", `{"chatId":-1001494975755,"type":"group"}`, goredis.KeepTTL)
client.Set(ctx, "moira-telegram-users:@durov", "1", goredis.KeepTTL)
client.Set(ctx, "moira-telegram-users:moira-bot-host:123", "D4VdnzZDTS/xXF87THARWw==", goredis.KeepTTL)
func createOldTelegramUserRecords(database moira.Database) {
switch d := database.(type) {
case *redis.DbConnector:
d.Flush()
client := d.Client()
ctx := d.Context()

client.Set(ctx, "moira-telegram-users:some telegram group", "-1001494975744", goredis.KeepTTL)
client.Set(ctx, "moira-telegram-users:some telegram group failed migration", `{"chatId":-1001494975755,"type":"group"}`, goredis.KeepTTL)
client.Set(ctx, "moira-telegram-users:@durov", "1", goredis.KeepTTL)
client.Set(ctx, "moira-telegram-users:moira-bot-host:123", "D4VdnzZDTS/xXF87THARWw==", goredis.KeepTTL)
}
}

func createNewTelegramUserRecords(ctx context.Context, client goredis.UniversalClient) {
client.Set(ctx, "moira-telegram-users:some telegram group", `{"type":"group","chatId":-1001494975744}`, goredis.KeepTTL)
client.Set(ctx, "moira-telegram-users:@durov", `{"type":"private","chatId":1}`, goredis.KeepTTL)
client.Set(ctx, "moira-telegram-users:@failed_migration", `2`, goredis.KeepTTL)
client.Set(ctx, "moira-telegram-users:moira-bot-host:123", "D4VdnzZDTS/xXF87THARWw==", goredis.KeepTTL)
func createNewTelegramUserRecords(database moira.Database) {
switch d := database.(type) {
case *redis.DbConnector:
d.Flush()
client := d.Client()
ctx := d.Context()

client.Set(ctx, "moira-telegram-users:some telegram group", `{"type":"group","chatId":-1001494975744}`, goredis.KeepTTL)
client.Set(ctx, "moira-telegram-users:@durov", `{"type":"private","chatId":1}`, goredis.KeepTTL)
client.Set(ctx, "moira-telegram-users:@failed_migration", `2`, goredis.KeepTTL)
client.Set(ctx, "moira-telegram-users:moira-bot-host:123", "D4VdnzZDTS/xXF87THARWw==", goredis.KeepTTL)
}
}
Loading

0 comments on commit 249f270

Please sign in to comment.