Skip to content

Commit

Permalink
replica - add max_statement_time
Browse files Browse the repository at this point in the history
  • Loading branch information
DamianZaremba committed Dec 11, 2024
1 parent 7643f91 commit 6e1ada8
Show file tree
Hide file tree
Showing 5 changed files with 26 additions and 29 deletions.
10 changes: 2 additions & 8 deletions pkg/cbng/database/cluebot/cluebot.go
Original file line number Diff line number Diff line change
Expand Up @@ -153,13 +153,10 @@ func (ci *CluebotInstance) GetLastRevertTime(l *logrus.Entry, ctx context.Contex
defer span.End()

var revertTime int64
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*300)
defer cancel()

db := ci.getDatabaseConnection()
defer db.Close()

rows, err := db.QueryContext(timeoutCtx, "SELECT `time` FROM `last_revert` WHERE title=? AND user=?", title, user)
rows, err := db.Query("SELECT `time` FROM `last_revert` WHERE title=? AND user=?", title, user)
if err != nil {
logger.Infof("Error running query: %v", err)
span.SetStatus(codes.Error, err.Error())
Expand Down Expand Up @@ -190,13 +187,10 @@ func (ci *CluebotInstance) SaveRevertTime(l *logrus.Entry, ctx context.Context,
defer span.End()

var revertTime int64
timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*300)
defer cancel()

db := ci.getDatabaseConnection()
defer db.Close()

rows, err := db.QueryContext(timeoutCtx, "INSERT INTO `last_revert` (`title`, `user`, `time`) "+
rows, err := db.Query("INSERT INTO `last_revert` (`title`, `user`, `time`) "+
"VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE `time`=`time`", title, user, time.Now().UTC().Unix())
if err != nil {
logger.Infof("Error running query: %v", err)
Expand Down
34 changes: 22 additions & 12 deletions pkg/cbng/database/replica/replica.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ func NewReplicaInstance(configuration *config.Configuration) *ReplicaInstance {
}
db.SetConnMaxLifetime(time.Minute * 5)
db.SetMaxOpenConns(10)
db.SetMaxIdleConns(10)
db.SetMaxIdleConns(0)

if err := db.Ping(); err != nil {
logger.Warnf("Could not use connection to MySQL: %v", err)
Expand Down Expand Up @@ -79,7 +79,8 @@ func (ri *ReplicaInstance) GetPageCreatedTimeAndUser(l *logrus.Entry, ctx contex

var timestamp int64
var user string
rows, err := ri.getHandle().Query("SELECT `rev_timestamp`, `actor_name` FROM `page` "+
rows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+
"SELECT `rev_timestamp`, `actor_name` FROM `page` "+
"JOIN `revision` ON `rev_page` = `page_id` "+
"JOIN `actor` ON `actor_id` = `rev_actor` "+
"WHERE `page_namespace` = ? AND `page_title` = ? "+
Expand Down Expand Up @@ -117,7 +118,8 @@ func (ri *ReplicaInstance) GetPageRecentEditCount(l *logrus.Entry, ctx context.C
defer span.End()

var recentEditCount int64
rows, err := ri.getHandle().Query("SELECT COUNT(*) as count FROM `page` "+
rows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+
"SELECT COUNT(*) as count FROM `page` "+
"JOIN `revision` ON `rev_page` = `page_id` "+
"WHERE `page_namespace` = ? AND `page_title` = ? AND `rev_timestamp` > ?", namespaceId, title, timestamp)

Expand Down Expand Up @@ -155,7 +157,8 @@ func (ri *ReplicaInstance) GetPageRecentRevertCount(l *logrus.Entry, ctx context
defer span.End()

var recentRevertCount int64
rows, err := ri.getHandle().Query("SELECT COUNT(*) as count FROM `page` "+
rows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+
"SELECT COUNT(*) as count FROM `page` "+
"JOIN `revision` ON `rev_page` = `page_id` "+
"JOIN `comment` ON `comment_id` = `rev_comment_id` "+
"WHERE `page_namespace` = ? AND `page_title` = ? AND `rev_timestamp` > ? AND `comment_text` "+
Expand Down Expand Up @@ -198,7 +201,8 @@ func (ri *ReplicaInstance) GetUserEditCount(l *logrus.Entry, parentCtx context.C
defer span.End()

logger.Debugf("Querying revision_userindex for anonymous user")
rows, err := ri.getHandle().Query("SELECT COUNT(*) AS `user_editcount` FROM `revision_userindex` "+
rows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+
"SELECT COUNT(*) AS `user_editcount` FROM `revision_userindex` "+
"WHERE `rev_actor` = "+
"(SELECT actor_id FROM actor WHERE `actor_name` = ?)", user)
if err != nil {
Expand All @@ -221,8 +225,9 @@ func (ri *ReplicaInstance) GetUserEditCount(l *logrus.Entry, parentCtx context.C
defer span.End()

logger.Debugf("Querying user_editcount for user")
userCountRows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=1 "+
"FOR SELECT `user_editcount` FROM `user` WHERE `user_name` = ?", user)
userCountRows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+
"SET STATEMENT max_statement_time=10 FOR "+
"SELECT `user_editcount` FROM `user` WHERE `user_name` = ?", user)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return editCount, err
Expand Down Expand Up @@ -257,7 +262,8 @@ func (ri *ReplicaInstance) GetUserRegistrationTime(l *logrus.Entry, parentCtx co
// Anon users have no registration time so are a noop
if net.ParseIP(user) == nil {
logger.Debugf("Using registered lookup")
userRegRows, err := ri.getHandle().Query("SELECT `user_registration` FROM `user` WHERE `user_name` = ? AND `user_registration` is not NULL", user)
userRegRows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+
"SELECT `user_registration` FROM `user` WHERE `user_name` = ? AND `user_registration` is not NULL", user)
if err != nil {
span.SetStatus(codes.Error, err.Error())
return registrationTime, err
Expand All @@ -273,7 +279,8 @@ func (ri *ReplicaInstance) GetUserRegistrationTime(l *logrus.Entry, parentCtx co
_, subSpan := metrics.OtelTracer.Start(ctx, "database.replica.ReplicaInstance.GetUserRegistrationTime.fallback")
defer subSpan.End()
logger.Debugf("Querying (fallback) revision_userindex for registered user")
userRevRows, err := ri.getHandle().Query("SELECT `rev_timestamp` FROM `revision_userindex` WHERE `rev_actor` = "+
userRevRows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+
"SELECT `rev_timestamp` FROM `revision_userindex` WHERE `rev_actor` = "+
"(SELECT actor_id FROM actor WHERE `actor_name` = ?) "+
" ORDER BY `rev_timestamp` LIMIT 0,1", user)
if err != nil {
Expand Down Expand Up @@ -308,7 +315,8 @@ func (ri *ReplicaInstance) GetUserWarnCount(l *logrus.Entry, ctx context.Context
defer span.End()

var warningCount int64
rows, err := ri.getHandle().Query("SELECT COUNT(*) as count FROM `page` "+
rows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+
"SELECT COUNT(*) as count FROM `page` "+
"JOIN `revision` ON `rev_page` = `page_id` "+
"JOIN `comment` ON `comment_id` = `rev_comment_id` "+
"WHERE `page_namespace` = 3 AND `page_title` = ? AND "+
Expand Down Expand Up @@ -345,7 +353,8 @@ func (ri *ReplicaInstance) GetUserDistinctPagesCount(l *logrus.Entry, ctx contex
defer span.End()

var distinctPageCount int64
rows, err := ri.getHandle().Query("SELECT COUNT(DISTINCT rev_page) AS count FROM `revision_userindex` WHERE `rev_actor` = "+
rows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+
"SELECT COUNT(DISTINCT rev_page) AS count FROM `revision_userindex` WHERE `rev_actor` = "+
"(SELECT actor_id FROM actor WHERE `actor_name` = ?)", strings.ReplaceAll(user, " ", "_"))
if err != nil {
span.SetStatus(codes.Error, err.Error())
Expand All @@ -372,7 +381,8 @@ func (ri *ReplicaInstance) GetLatestChangeTimestamp(l *logrus.Entry, ctx context
defer span.End()

var replicationDelay []uint8
rows, err := ri.getHandle().Query("SELECT UNIX_TIMESTAMP(MAX(rc_timestamp)) FROM `recentchanges`")
rows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR " +
"SELECT UNIX_TIMESTAMP(MAX(rc_timestamp)) FROM `recentchanges`")
if err != nil {
logger.Errorf("Failed to query replication delay: %+v", err)
span.SetStatus(codes.Error, err.Error())
Expand Down
2 changes: 1 addition & 1 deletion pkg/cbng/loader/page_metadata.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ func loadSinglePageMetadata(logger *logrus.Entry, ctx context.Context, change *m

// Skip namespaces we're not interested in
if change.Common.NamespaceId != 0 && !helpers.StringItemInSlice(change.Common.Namespace, configuration.Dynamic.NamespaceOptIn) {
logger.Debugf("Skipping change due to namespace: %v (%v)", change.Common.Namespace, change.Common.NamespaceId)
logger.Debugf("Skipping change due to namespace: %s (%d)", change.Common.Namespace, change.Common.NamespaceId)
metrics.EditStatus.With(prometheus.Labels{"state": "verify_namespace", "status": "skipped"}).Inc()
return nil
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/cbng/loader/page_recent_edit_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ import (
func loadSinglePageRecentEditCount(logger *logrus.Entry, ctx context.Context, change *model.ProcessEvent, configuration *config.Configuration, db *database.DatabaseConnection, outChangeFeed chan *model.ProcessEvent) error {

// Load the page recent edit count
pageRecentEditCount, err := db.Replica.GetPageRecentEditCount(logger, ctx, change.Common.NamespaceId, helpers.PageTitleWithoutNamespace(change.Common.Title), change.ReceivedTime.Unix())
pageRecentEditCount, err := db.Replica.GetPageRecentEditCount(logger, ctx, change.Common.NamespaceId, helpers.PageTitleWithoutNamespace(change.Common.Title), change.ReceivedTime.Unix()-14*86400)
if err != nil {
metrics.EditStatus.With(prometheus.Labels{"state": "lookup_page_recent_edits", "status": "failed"}).Inc()
return err
Expand Down
7 changes: 0 additions & 7 deletions pkg/cbng/loader/user_distinct_pages_count.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,6 @@ func loadSingleDistinctPagesCount(logger *logrus.Entry, ctx context.Context, cha
// Load the user distinct pages count
userDistinctPagesCount, err := db.Replica.GetUserDistinctPagesCount(logger, ctx, change.User.Username)
if err != nil {
// If the user has a super high edit count, then fake it out as a non-error....
// This query will run successfully but take multiple mins, which we can't afford
// Since the user has a super high edit count, we're going to skip reverting them anyway :shrug:
if change.User.EditCount > 10000 {
return nil
}

metrics.EditStatus.With(prometheus.Labels{"state": "lookup_user_distinct_count", "status": "failed"}).Inc()
return err
}
Expand Down

0 comments on commit 6e1ada8

Please sign in to comment.