From 5ea6ceb9d7ecc9251ec0779f62040338e4f236c9 Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Tue, 10 Dec 2024 15:33:16 +0000 Subject: [PATCH 01/19] Cleanup release logic --- .github/workflows/release.yml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/.github/workflows/release.yml b/.github/workflows/release.yml index a1caccf..8020dad 100644 --- a/.github/workflows/release.yml +++ b/.github/workflows/release.yml @@ -20,4 +20,4 @@ jobs: uses: ncipollo/release-action@v1 with: token: "${{ secrets.GITHUB_TOKEN }}" - artifacts: cbng + artifacts: botng From aff51c03973b6656ad89986b82c8cb518d8c3a4d Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Tue, 10 Dec 2024 17:44:39 +0000 Subject: [PATCH 02/19] Allow moving the log file --- main.go | 8 +++++++- pkg/cbng/config/config.go | 2 +- 2 files changed, 8 insertions(+), 2 deletions(-) diff --git a/main.go b/main.go index 11be84e..efc9463 100644 --- a/main.go +++ b/main.go @@ -18,6 +18,7 @@ import ( "github.com/spf13/pflag" "gopkg.in/natefinch/lumberjack.v2" "net/http" + "os" "sync" "time" ) @@ -80,8 +81,13 @@ func main() { logrus.FieldKeyMsg: "message", }, }) + + logFile := "cbng.log" + if value, ok := os.LookupEnv("BOTNG_LOG"); ok { + logFile = value + } logrus.AddHook(helpers.NewLogFileHook(&lumberjack.Logger{ - Filename: "cbng.log", + Filename: logFile, MaxBackups: 31, MaxAge: 1, Compress: true, diff --git a/pkg/cbng/config/config.go b/pkg/cbng/config/config.go index 0e35e80..f3c5999 100644 --- a/pkg/cbng/config/config.go +++ b/pkg/cbng/config/config.go @@ -103,7 +103,7 @@ func NewConfiguration() *Configuration { configuration := Configuration{} var configPath string - if val, ok := os.LookupEnv("BOT_CFG"); ok { + if val, ok := os.LookupEnv("BOTNG_CFG"); ok { configPath = val } if configPath != "" { From 7729f68b6051e5902a106e5fcdf2cc3f5244c4f7 Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Tue, 10 Dec 2024 18:30:21 +0000 Subject: [PATCH 03/19] core - increase timeout --- pkg/cbng/processor/core.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cbng/processor/core.go b/pkg/cbng/processor/core.go index a225b7e..30c5277 100644 --- a/pkg/cbng/processor/core.go +++ b/pkg/cbng/processor/core.go @@ -100,7 +100,7 @@ func isVandalism(l *logrus.Entry, parentCtx context.Context, configuration *conf coreUrl := fmt.Sprintf("%s:%d", coreHost, configuration.Core.Port) logger.Tracef("Connecting to %v", coreUrl) - dialer := net.Dialer{Timeout: time.Second * 2} + dialer := net.Dialer{Timeout: time.Second * 10} conn, err := dialer.Dial("tcp", coreUrl) if err != nil { scoreSpan.SetStatus(codes.Error, err.Error()) From 1cda69f8a086eb43819e2c84f7cfbede5ca8771d Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Tue, 10 Dec 2024 18:31:08 +0000 Subject: [PATCH 04/19] More runtime tweaks --- .gitignore | 2 +- cluebot.sql | 62 ++--------------------------------------- main.go | 2 +- pkg/cbng/relay/relay.go | 8 ++++-- 4 files changed, 10 insertions(+), 64 deletions(-) diff --git a/.gitignore b/.gitignore index 8d6b537..b7c6d70 100644 --- a/.gitignore +++ b/.gitignore @@ -1,5 +1,5 @@ .idea *.iml -cbng.log* +botng.log* vendor/ config.yaml diff --git a/cluebot.sql b/cluebot.sql index 5617b30..5c63471 100644 --- a/cluebot.sql +++ b/cluebot.sql @@ -1,22 +1,4 @@ --- MySQL dump 10.11 --- --- Host: localhost Database: cluebot_enwiki --- ------------------------------------------------------ --- Server version 5.0.32-Debian_7etch3-log -/*!40101 SET @OLD_CHARACTER_SET_CLIENT=@@CHARACTER_SET_CLIENT */; -/*!40101 SET @OLD_CHARACTER_SET_RESULTS=@@CHARACTER_SET_RESULTS */; -/*!40101 SET @OLD_COLLATION_CONNECTION=@@COLLATION_CONNECTION */; -/*!40101 SET NAMES utf8 */; -/*!40103 SET @OLD_TIME_ZONE=@@TIME_ZONE */; -/*!40103 SET TIME_ZONE='+00:00' */; -/*!40014 SET @OLD_UNIQUE_CHECKS=@@UNIQUE_CHECKS, UNIQUE_CHECKS=0 */; -/*!40014 SET @OLD_FOREIGN_KEY_CHECKS=@@FOREIGN_KEY_CHECKS, FOREIGN_KEY_CHECKS=0 */; -/*!40101 SET @OLD_SQL_MODE=@@SQL_MODE, SQL_MODE='NO_AUTO_VALUE_ON_ZERO' */; -/*!40111 SET @OLD_SQL_NOTES=@@SQL_NOTES, SQL_NOTES=0 */; --- --- Table structure for table `beaten` --- -DROP TABLE IF EXISTS `beaten`; +CREATE TABLE IF NOT EXISTS `beaten`; CREATE TABLE `beaten` ( `id` int(11) NOT NULL auto_increment, @@ -26,26 +8,8 @@ CREATE TABLE `beaten` `user` varchar(256) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; --- --- Table structure for table `trr` --- -DROP TABLE IF EXISTS `trr`; -CREATE TABLE `trr` -( - `id` int(11) NOT NULL auto_increment, - `timestamp` timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP, - `user` varchar(256) NOT NULL, - `title` varchar(256) NOT NULL, - `url` varchar(512) NOT NULL, - `revid` int(11) NOT NULL, - `md5` char(32) default NULL, - PRIMARY KEY (`id`) -) ENGINE=MyISAM AUTO_INCREMENT=488749 DEFAULT CHARSET=latin1; --- --- Table structure for table `vandalism` --- -DROP TABLE IF EXISTS `vandalism`; -CREATE TABLE `vandalism` + +CREATE TABLE IF NOT EXISTS `vandalism` ( `id` int(11) NOT NULL auto_increment, `timestamp` timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP, @@ -60,23 +24,3 @@ CREATE TABLE `vandalism` `reverted` tinyint(1) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; --- --- Table structure for table `cluster_node` --- -DROP TABLE IF EXISTS `cluster_node`; -CREATE TABLE `cluster_node` -( - `node` varchar(256) NOT NULL, - `port` int(11) NOT NULL, - `type` varchar(256) NOT NULL, - PRIMARY KEY (`type`) -) ENGINE=InnoDB AUTO_INCREMENT=0 DEFAULT CHARSET=latin1; -/*!40103 SET TIME_ZONE=@OLD_TIME_ZONE */; -/*!40101 SET SQL_MODE=@OLD_SQL_MODE */; -/*!40014 SET FOREIGN_KEY_CHECKS=@OLD_FOREIGN_KEY_CHECKS */; -/*!40014 SET UNIQUE_CHECKS=@OLD_UNIQUE_CHECKS */; -/*!40101 SET CHARACTER_SET_CLIENT=@OLD_CHARACTER_SET_CLIENT */; -/*!40101 SET CHARACTER_SET_RESULTS=@OLD_CHARACTER_SET_RESULTS */; -/*!40101 SET COLLATION_CONNECTION=@OLD_COLLATION_CONNECTION */; -/*!40111 SET SQL_NOTES=@OLD_SQL_NOTES */; --- Dump completed on 2008-01-14 2:31:27 diff --git a/main.go b/main.go index efc9463..1d8698f 100644 --- a/main.go +++ b/main.go @@ -82,7 +82,7 @@ func main() { }, }) - logFile := "cbng.log" + logFile := "botng.log" if value, ok := os.LookupEnv("BOTNG_LOG"); ok { logFile = value } diff --git a/pkg/cbng/relay/relay.go b/pkg/cbng/relay/relay.go index de620f4..42eb68b 100644 --- a/pkg/cbng/relay/relay.go +++ b/pkg/cbng/relay/relay.go @@ -125,7 +125,7 @@ func NewRelays(wg *sync.WaitGroup, enableIrc bool, host string, port int, nick, password: password, sendChan: make(chan string, 10000), reConnectSignal: make(chan bool, 1), - limiter: rate.NewLimiter(6, 6), + limiter: rate.NewLimiter(2, 4), channel: channel, } go f.reader(wg) @@ -234,15 +234,17 @@ func (f *IrcServer) writer(wg *sync.WaitGroup) { logger.Warn("Stopping writing due to no connection") break } + message := <-f.sendChan + // Just drop the message if we cannot send right now, + // otherwise we just make a huge backlog if f.limiter.Allow() { - message := <-f.sendChan logger.Tracef("Sending: %+v\n", message) if !f.send(fmt.Sprintf("PRIVMSG #%s :%s", f.channel, message)) { logger.Warn("IRC write error") break } } else { - logger.Infof("Not permitted to write") + logger.Tracef("Not sending due to rate limit: %+v\n", message) } } f.close() From 132e5458619386e6f7f975dd73963c4d8c6c684c Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Tue, 10 Dec 2024 19:29:38 +0000 Subject: [PATCH 05/19] replica - add connection pool --- main.go | 1 + pkg/cbng/config/config.go | 2 +- pkg/cbng/database/datasource.go | 4 ++ pkg/cbng/database/replica/replica.go | 81 +++++++++++++++------------- 4 files changed, 49 insertions(+), 39 deletions(-) diff --git a/main.go b/main.go index 1d8698f..fb189d0 100644 --- a/main.go +++ b/main.go @@ -133,6 +133,7 @@ func main() { r := relay.NewRelays(&wg, useIrcRelay, configuration.Irc.Server, configuration.Irc.Port, configuration.Irc.Username, configuration.Irc.Password, configuration.Irc.Channel) db := database.NewDatabaseConnection(configuration) + defer db.Disconnect() // Processing channels toReplicationWatcher := make(chan *model.ProcessEvent, 10000) diff --git a/pkg/cbng/config/config.go b/pkg/cbng/config/config.go index f3c5999..a9bae07 100644 --- a/pkg/cbng/config/config.go +++ b/pkg/cbng/config/config.go @@ -41,7 +41,7 @@ type ReplicaSqlConfiguration struct { } type SqlConfiguration struct { - Replica ReplicaSqlConfiguration + Replica []ReplicaSqlConfiguration Cluebot CluebotSqlConfiguration } diff --git a/pkg/cbng/database/datasource.go b/pkg/cbng/database/datasource.go index feb52ef..c2edd78 100644 --- a/pkg/cbng/database/datasource.go +++ b/pkg/cbng/database/datasource.go @@ -18,3 +18,7 @@ func NewDatabaseConnection(configuration *config.Configuration) *DatabaseConnect } return &c } + +func (dbc *DatabaseConnection) Disconnect() { + dbc.Replica.DisconnectFromDatabase() +} diff --git a/pkg/cbng/database/replica/replica.go b/pkg/cbng/database/replica/replica.go index e09eff1..aacc850 100644 --- a/pkg/cbng/database/replica/replica.go +++ b/pkg/cbng/database/replica/replica.go @@ -10,52 +10,57 @@ import ( _ "github.com/go-sql-driver/mysql" "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/codes" + "math/rand" "net" "strings" + "time" ) type ReplicaInstance struct { - config config.ReplicaSqlConfiguration - cur *sql.DB - connectionId string + handlers []*sql.DB } func NewReplicaInstance(configuration *config.Configuration) *ReplicaInstance { - ri := ReplicaInstance{config: configuration.Sql.Replica} - if err := ri.ConnectToDatabase(); err != nil { - panic(err) - } - return &ri -} - -func (ri *ReplicaInstance) ConnectToDatabase() error { logger := logrus.WithFields(logrus.Fields{ "function": "database.replica.getDatabaseConnection", }) - cur, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?timeout=1s", ri.config.Username, ri.config.Password, ri.config.Host, ri.config.Port, ri.config.Schema)) - if err != nil { - logger.Fatalf("Error connecting to MySQL: %v", err) - } - cur.SetMaxIdleConns(1) - cur.SetMaxOpenConns(1) + var handlers []*sql.DB + for _, replica := range configuration.Sql.Replica { + db, err := sql.Open("mysql", fmt.Sprintf("%s:%s@tcp(%s:%d)/%s?timeout=1s", replica.Username, replica.Password, replica.Host, replica.Port, replica.Schema)) + if err != nil { + logger.Fatalf("Error connecting to MySQL: %v", err) + } + db.SetConnMaxLifetime(time.Minute * 5) + db.SetMaxOpenConns(10) + db.SetMaxIdleConns(10) - logger.Tracef("Connected to %s:xxx@tcp(%s:%d)/%s", ri.config.Username, ri.config.Host, ri.config.Port, ri.config.Schema) - ri.cur = cur - if err := ri.cur.QueryRow("SELECT CONNECTION_ID()").Scan(&ri.connectionId); err != nil { - return err + if err := db.Ping(); err != nil { + logger.Warnf("Could not use connection to MySQL: %v", err) + continue + } + + logger.Tracef("Connected to %s:xxx@tcp(%s:%d)/%s", replica.Username, replica.Host, replica.Port, replica.Schema) + handlers = append(handlers, db) } - return nil + + ri := ReplicaInstance{handlers: handlers} + return &ri } -func (ri *ReplicaInstance) DisconnectFromDatabase() error { - if ri.connectionId != "" { - if _, err := ri.cur.Exec("KILL CONNECTION ?", ri.connectionId); err != nil { - return err +func (ri *ReplicaInstance) getHandle() *sql.DB { + return ri.handlers[rand.Intn(len(ri.handlers))] +} + +func (ri *ReplicaInstance) DisconnectFromDatabase() { + logger := logrus.WithFields(logrus.Fields{ + "function": "database.replica.DisconnectFromDatabase", + }) + for _, handler := range ri.handlers { + if err := handler.Close(); err != nil { + logger.Warnf("Error closing connection to MySQL: %v", err) } - ri.connectionId = "" } - return ri.cur.Close() } func (ri *ReplicaInstance) GetPageCreatedTimeAndUser(l *logrus.Entry, ctx context.Context, namespaceId int64, title string) (string, int64, error) { @@ -65,7 +70,7 @@ func (ri *ReplicaInstance) GetPageCreatedTimeAndUser(l *logrus.Entry, ctx contex var timestamp int64 var user string - rows, err := ri.cur.Query("SELECT `rev_timestamp`, `actor_name` FROM `page` "+ + rows, err := ri.getHandle().Query("SELECT `rev_timestamp`, `actor_name` FROM `page` "+ "JOIN `revision` ON `rev_page` = `page_id` "+ "JOIN `actor` ON `actor_id` = `rev_actor` "+ "WHERE `page_namespace` = ? AND `page_title` = ? "+ @@ -103,7 +108,7 @@ func (ri *ReplicaInstance) GetPageRecentEditCount(l *logrus.Entry, ctx context.C defer span.End() var recentEditCount int64 - rows, err := ri.cur.Query("SELECT COUNT(*) as count FROM `page` "+ + rows, err := ri.getHandle().Query("SELECT COUNT(*) as count FROM `page` "+ "JOIN `revision` ON `rev_page` = `page_id` "+ "WHERE `page_namespace` = ? AND `page_title` = ? AND `rev_timestamp` > ?", namespaceId, title, timestamp) @@ -141,7 +146,7 @@ func (ri *ReplicaInstance) GetPageRecentRevertCount(l *logrus.Entry, ctx context defer span.End() var recentRevertCount int64 - rows, err := ri.cur.Query("SELECT COUNT(*) as count FROM `page` "+ + rows, err := ri.getHandle().Query("SELECT COUNT(*) as count FROM `page` "+ "JOIN `revision` ON `rev_page` = `page_id` "+ "JOIN `comment` ON `comment_id` = `rev_comment_id` "+ "WHERE `page_namespace` = ? AND `page_title` = ? AND `rev_timestamp` > ? AND `comment_text` "+ @@ -184,7 +189,7 @@ func (ri *ReplicaInstance) GetUserEditCount(l *logrus.Entry, parentCtx context.C defer span.End() logger.Debugf("Querying revision_userindex for anonymous user") - rows, err := ri.cur.Query("SELECT COUNT(*) AS `user_editcount` FROM `revision_userindex` "+ + rows, err := ri.getHandle().Query("SELECT COUNT(*) AS `user_editcount` FROM `revision_userindex` "+ "WHERE `rev_actor` = "+ "(SELECT actor_id FROM actor WHERE `actor_name` = ?)", user) if err != nil { @@ -207,7 +212,7 @@ func (ri *ReplicaInstance) GetUserEditCount(l *logrus.Entry, parentCtx context.C defer span.End() logger.Debugf("Querying user_editcount for user") - userCountRows, err := ri.cur.Query("SET STATEMENT max_statement_time=1 "+ + userCountRows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=1 "+ "FOR SELECT `user_editcount` FROM `user` WHERE `user_name` = ?", user) if err != nil { span.SetStatus(codes.Error, err.Error()) @@ -243,7 +248,7 @@ func (ri *ReplicaInstance) GetUserRegistrationTime(l *logrus.Entry, parentCtx co // Anon users have no registration time so are a noop if net.ParseIP(user) == nil { logger.Debugf("Using registered lookup") - userRegRows, err := ri.cur.Query("SELECT `user_registration` FROM `user` WHERE `user_name` = ? AND `user_registration` is not NULL", user) + userRegRows, err := ri.getHandle().Query("SELECT `user_registration` FROM `user` WHERE `user_name` = ? AND `user_registration` is not NULL", user) if err != nil { span.SetStatus(codes.Error, err.Error()) return registrationTime, err @@ -259,7 +264,7 @@ func (ri *ReplicaInstance) GetUserRegistrationTime(l *logrus.Entry, parentCtx co _, subSpan := metrics.OtelTracer.Start(ctx, "database.replica.ReplicaInstance.GetUserRegistrationTime.fallback") defer subSpan.End() logger.Debugf("Querying (fallback) revision_userindex for registered user") - userRevRows, err := ri.cur.Query("SELECT `rev_timestamp` FROM `revision_userindex` WHERE `rev_actor` = "+ + userRevRows, err := ri.getHandle().Query("SELECT `rev_timestamp` FROM `revision_userindex` WHERE `rev_actor` = "+ "(SELECT actor_id FROM actor WHERE `actor_name` = ?) "+ " ORDER BY `rev_timestamp` LIMIT 0,1", user) if err != nil { @@ -294,7 +299,7 @@ func (ri *ReplicaInstance) GetUserWarnCount(l *logrus.Entry, ctx context.Context defer span.End() var warningCount int64 - rows, err := ri.cur.Query("SELECT COUNT(*) as count FROM `page` "+ + rows, err := ri.getHandle().Query("SELECT COUNT(*) as count FROM `page` "+ "JOIN `revision` ON `rev_page` = `page_id` "+ "JOIN `comment` ON `comment_id` = `rev_comment_id` "+ "WHERE `page_namespace` = 3 AND `page_title` = ? AND "+ @@ -331,7 +336,7 @@ func (ri *ReplicaInstance) GetUserDistinctPagesCount(l *logrus.Entry, ctx contex defer span.End() var distinctPageCount int64 - rows, err := ri.cur.Query("SELECT COUNT(DISTINCT rev_page) AS count FROM `revision_userindex` WHERE `rev_actor` = "+ + rows, err := ri.getHandle().Query("SELECT COUNT(DISTINCT rev_page) AS count FROM `revision_userindex` WHERE `rev_actor` = "+ "(SELECT actor_id FROM actor WHERE `actor_name` = ?)", strings.ReplaceAll(user, " ", "_")) if err != nil { span.SetStatus(codes.Error, err.Error()) @@ -358,7 +363,7 @@ func (ri *ReplicaInstance) GetLatestChangeTimestamp(l *logrus.Entry, ctx context defer span.End() var replicationDelay []uint8 - rows, err := ri.cur.Query("SELECT UNIX_TIMESTAMP(MAX(rc_timestamp)) FROM `recentchanges`") + rows, err := ri.getHandle().Query("SELECT UNIX_TIMESTAMP(MAX(rc_timestamp)) FROM `recentchanges`") if err != nil { logger.Errorf("Failed to query replication delay: %+v", err) span.SetStatus(codes.Error, err.Error()) From 6b35a028baf86fbd58bc4b6dd5434d516fd311df Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Tue, 10 Dec 2024 19:29:50 +0000 Subject: [PATCH 06/19] page_metadata - add success metric --- pkg/cbng/loader/page_metadata.go | 1 + 1 file changed, 1 insertion(+) diff --git a/pkg/cbng/loader/page_metadata.go b/pkg/cbng/loader/page_metadata.go index 411968c..b237946 100644 --- a/pkg/cbng/loader/page_metadata.go +++ b/pkg/cbng/loader/page_metadata.go @@ -24,6 +24,7 @@ func loadSinglePageMetadata(logger *logrus.Entry, ctx context.Context, change *m metrics.EditStatus.With(prometheus.Labels{"state": "verify_namespace", "status": "skipped"}).Inc() return nil } + metrics.EditStatus.With(prometheus.Labels{"state": "verify_namespace", "status": "success"}).Inc() // Load the page created metadata pageCreatedUser, pageCreatedTimestamp, err := db.Replica.GetPageCreatedTimeAndUser(logger, ctx, change.Common.NamespaceId, helpers.PageTitleWithoutNamespace(change.Common.Title)) From 06b5b2940273385965666b325c98d3fb0f3bacf1 Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Tue, 10 Dec 2024 19:30:28 +0000 Subject: [PATCH 07/19] feed - improve processing & use base stream --- pkg/cbng/feed/feed.go | 14 ++++---------- 1 file changed, 4 insertions(+), 10 deletions(-) diff --git a/pkg/cbng/feed/feed.go b/pkg/cbng/feed/feed.go index dc9b435..17cacde 100644 --- a/pkg/cbng/feed/feed.go +++ b/pkg/cbng/feed/feed.go @@ -41,8 +41,7 @@ type httpChangeEvent struct { } func handleLine(logger *logrus.Entry, line string, configuration *config.Configuration, changeFeed chan<- *model.ProcessEvent) { - parts := strings.Split(line, ": ") - if len(parts) == 2 && parts[0] == "data" { + if len(line) > 5 && line[0:5] == "data:" { rootCtx, rootSpan := metrics.OtelTracer.Start(context.Background(), "feed.ConsumeHttpChangeEvents.event") rootUUID := uuid.NewV4().String() rootSpan.SetAttributes(attribute.String("uuid", rootUUID)) @@ -50,7 +49,7 @@ func handleLine(logger *logrus.Entry, line string, configuration *config.Configu _, decodeSpan := metrics.OtelTracer.Start(rootCtx, "feed.ConsumeHttpChangeEvents.event.unmarshal") httpChange := httpChangeEvent{} - if err := json.Unmarshal([]byte(parts[1]), &httpChange); err != nil { + if err := json.Unmarshal([]byte(line[5:]), &httpChange); err != nil { logger.Warnf("Decoding failed: %v", err) decodeSpan.SetStatus(codes.Error, err.Error()) decodeSpan.End() @@ -86,19 +85,14 @@ func handleLine(logger *logrus.Entry, line string, configuration *config.Configu logger.WithFields(logrus.Fields{"uuid": change.Uuid, "change": change}).Debug("Received new event") metrics.EditStatus.With(prometheus.Labels{"state": "received_new", "status": "success"}).Inc() - - select { - case changeFeed <- &change: - default: - logger.Errorf("Failed to write to change feed") - } + changeFeed <- &change } } } func streamFeed(logger *logrus.Entry, configuration *config.Configuration, changeFeed chan<- *model.ProcessEvent) bool { logger.Info("Connecting to feed") - req, err := http.NewRequest("GET", "https://stream.wikimedia.org/v2/stream/recentchange", nil) + req, err := http.NewRequest("GET", "https://stream.wikimedia.org/v2/stream/mediawiki.recentchange", nil) if err != nil { logger.Errorf("Could not build request: %v", err) return false From 3311eb41d0869ac46728f9d5c125f85dc3d7ea27 Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Tue, 10 Dec 2024 19:36:31 +0000 Subject: [PATCH 08/19] core - tweak timeouts --- pkg/cbng/processor/core.go | 6 +++--- 1 file changed, 3 insertions(+), 3 deletions(-) diff --git a/pkg/cbng/processor/core.go b/pkg/cbng/processor/core.go index 30c5277..9c6474e 100644 --- a/pkg/cbng/processor/core.go +++ b/pkg/cbng/processor/core.go @@ -100,7 +100,7 @@ func isVandalism(l *logrus.Entry, parentCtx context.Context, configuration *conf coreUrl := fmt.Sprintf("%s:%d", coreHost, configuration.Core.Port) logger.Tracef("Connecting to %v", coreUrl) - dialer := net.Dialer{Timeout: time.Second * 10} + dialer := net.Dialer{Timeout: time.Second * 5} conn, err := dialer.Dial("tcp", coreUrl) if err != nil { scoreSpan.SetStatus(codes.Error, err.Error()) @@ -109,12 +109,12 @@ func isVandalism(l *logrus.Entry, parentCtx context.Context, configuration *conf } defer conn.Close() - if err := conn.SetDeadline(time.Now().Add(time.Second * 2)); err != nil { + if err := conn.SetDeadline(time.Now().Add(time.Second * 10)); err != nil { scoreSpan.SetStatus(codes.Error, err.Error()) logger.Errorf("Could not set deadline: %v", err) return false, err } - if err := conn.SetReadDeadline(time.Now().Add(time.Second * 2)); err != nil { + if err := conn.SetReadDeadline(time.Now().Add(time.Second * 10)); err != nil { scoreSpan.SetStatus(codes.Error, err.Error()) logger.Errorf("Could not set read deadline: %v", err) return false, err From 50fda744805644ef4b30323ca8d1ef20d9c00bcf Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Tue, 10 Dec 2024 19:41:33 +0000 Subject: [PATCH 09/19] core - move read deadline to after write --- pkg/cbng/processor/core.go | 11 +++++------ 1 file changed, 5 insertions(+), 6 deletions(-) diff --git a/pkg/cbng/processor/core.go b/pkg/cbng/processor/core.go index 9c6474e..9a4956e 100644 --- a/pkg/cbng/processor/core.go +++ b/pkg/cbng/processor/core.go @@ -109,23 +109,22 @@ func isVandalism(l *logrus.Entry, parentCtx context.Context, configuration *conf } defer conn.Close() - if err := conn.SetDeadline(time.Now().Add(time.Second * 10)); err != nil { + if err := conn.SetDeadline(time.Now().Add(time.Second * 20)); err != nil { scoreSpan.SetStatus(codes.Error, err.Error()) logger.Errorf("Could not set deadline: %v", err) return false, err } - if err := conn.SetReadDeadline(time.Now().Add(time.Second * 10)); err != nil { + if _, err := conn.Write(xmlData); err != nil { scoreSpan.SetStatus(codes.Error, err.Error()) - logger.Errorf("Could not set read deadline: %v", err) + logger.Infof("Could not write payload: %v", err) return false, err } - if _, err := conn.Write(xmlData); err != nil { + if err := conn.SetReadDeadline(time.Now().Add(time.Second * 20)); err != nil { scoreSpan.SetStatus(codes.Error, err.Error()) - logger.Infof("Could not write payload: %v", err) + logger.Errorf("Could not set read deadline: %v", err) return false, err } - response := []byte{} tmp := make([]byte, 4096) i := 0 From 8d327813b4443f3701ec7e44acc8c782dea18423 Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Tue, 10 Dec 2024 19:45:27 +0000 Subject: [PATCH 10/19] core - drop deadline timeouts for now --- pkg/cbng/processor/core.go | 11 ----------- 1 file changed, 11 deletions(-) diff --git a/pkg/cbng/processor/core.go b/pkg/cbng/processor/core.go index 9a4956e..68a5a25 100644 --- a/pkg/cbng/processor/core.go +++ b/pkg/cbng/processor/core.go @@ -109,22 +109,11 @@ func isVandalism(l *logrus.Entry, parentCtx context.Context, configuration *conf } defer conn.Close() - if err := conn.SetDeadline(time.Now().Add(time.Second * 20)); err != nil { - scoreSpan.SetStatus(codes.Error, err.Error()) - logger.Errorf("Could not set deadline: %v", err) - return false, err - } if _, err := conn.Write(xmlData); err != nil { scoreSpan.SetStatus(codes.Error, err.Error()) logger.Infof("Could not write payload: %v", err) return false, err } - - if err := conn.SetReadDeadline(time.Now().Add(time.Second * 20)); err != nil { - scoreSpan.SetStatus(codes.Error, err.Error()) - logger.Errorf("Could not set read deadline: %v", err) - return false, err - } response := []byte{} tmp := make([]byte, 4096) i := 0 From 7643f91a8bd40e45d1a4385ade810b37d222ab71 Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Tue, 10 Dec 2024 21:36:27 +0000 Subject: [PATCH 11/19] Hack in basic db metrics --- config.yaml.example | 13 ++++++------- main.go | 6 ++++-- pkg/cbng/database/datasource.go | 4 ++++ pkg/cbng/database/replica/replica.go | 26 +++++++++++++++++++++++++- pkg/cbng/metrics/metrics.go | 4 ++++ 5 files changed, 43 insertions(+), 10 deletions(-) diff --git a/config.yaml.example b/config.yaml.example index 59ba5e1..3fa1566 100644 --- a/config.yaml.example +++ b/config.yaml.example @@ -12,10 +12,9 @@ wikipedia: username: ClueBot_NG irc: - server: irc.freenode.org + server: irc.libera.chat port: 6697 username: CBNGRelay - password: xxxxxxxx channel: spam: wikipedia-en-cbngfeed2 revert: wikipedia-en-cbngrevertfeed2 @@ -23,12 +22,12 @@ irc: sql: replica: - username: root - host: 127.0.0.1 - port: 3306 - schema: enwiki_p + - username: root + host: 127.0.0.1 + port: 3306 + schema: enwiki_p cluebot: username: root host: 127.0.0.1 port: 3306 - schema: cbng \ No newline at end of file + schema: cbng diff --git a/main.go b/main.go index fb189d0..a6da25d 100644 --- a/main.go +++ b/main.go @@ -23,7 +23,7 @@ import ( "time" ) -func RunMetricPoller(wg *sync.WaitGroup, toReplicationWatcher, toPageMetadataLoader, toPageRecentEditCountLoader, toPageRecentRevertCountLoader, toUserEditCountLoader, toUserWarnsCountLoader, toUserDistinctPagesCountLoader, toRevisionLoader, toScoringProcessor, toRevertProcessor chan *model.ProcessEvent, r *relay.Relays) { +func RunMetricPoller(wg *sync.WaitGroup, toPageMetadataLoader, toPageRecentEditCountLoader, toPageRecentRevertCountLoader, toUserEditCountLoader, toUserWarnsCountLoader, toUserDistinctPagesCountLoader, toRevisionLoader, toScoringProcessor, toRevertProcessor chan *model.ProcessEvent, r *relay.Relays, db *database.DatabaseConnection) { wg.Add(1) defer wg.Done() @@ -42,6 +42,8 @@ func RunMetricPoller(wg *sync.WaitGroup, toReplicationWatcher, toPageMetadataLoa metrics.IrcNotificationsPending.With(prometheus.Labels{"channel": "debug"}).Set(float64(r.GetPendingDebugMessages())) metrics.IrcNotificationsPending.With(prometheus.Labels{"channel": "revert"}).Set(float64(r.GetPendingRevertMessages())) metrics.IrcNotificationsPending.With(prometheus.Labels{"channel": "spam"}).Set(float64(r.GetPendingSpamMessages())) + + db.UpdateMetrics() } } @@ -148,7 +150,7 @@ func main() { toScoringProcessor := make(chan *model.ProcessEvent, 10000) toRevertProcessor := make(chan *model.ProcessEvent, 10000) - go RunMetricPoller(&wg, toReplicationWatcher, toPageMetadataLoader, toPageRecentEditCountLoader, toPageRecentRevertCountLoader, toUserEditCountLoader, toUserWarnsCountLoader, toUserDistinctPagesCountLoader, toRevisionLoader, toScoringProcessor, toRevertProcessor, r) + go RunMetricPoller(&wg, toPageMetadataLoader, toPageRecentEditCountLoader, toPageRecentRevertCountLoader, toUserEditCountLoader, toUserWarnsCountLoader, toUserDistinctPagesCountLoader, toRevisionLoader, toScoringProcessor, toRevertProcessor, r, db) go feed.ConsumeHttpChangeEvents(&wg, configuration, toReplicationWatcher) go processor.ReplicationWatcher(&wg, configuration, db, ignoreReplicationDelay, toReplicationWatcher, toPageMetadataLoader) diff --git a/pkg/cbng/database/datasource.go b/pkg/cbng/database/datasource.go index c2edd78..a4b3087 100644 --- a/pkg/cbng/database/datasource.go +++ b/pkg/cbng/database/datasource.go @@ -22,3 +22,7 @@ func NewDatabaseConnection(configuration *config.Configuration) *DatabaseConnect func (dbc *DatabaseConnection) Disconnect() { dbc.Replica.DisconnectFromDatabase() } + +func (dbc *DatabaseConnection) UpdateMetrics() { + dbc.Replica.UpdateMetrics() +} diff --git a/pkg/cbng/database/replica/replica.go b/pkg/cbng/database/replica/replica.go index aacc850..2824d9e 100644 --- a/pkg/cbng/database/replica/replica.go +++ b/pkg/cbng/database/replica/replica.go @@ -8,6 +8,7 @@ import ( "github.com/cluebotng/botng/pkg/cbng/config" "github.com/cluebotng/botng/pkg/cbng/metrics" _ "github.com/go-sql-driver/mysql" + "github.com/prometheus/client_golang/prometheus" "github.com/sirupsen/logrus" "go.opentelemetry.io/otel/codes" "math/rand" @@ -49,7 +50,15 @@ func NewReplicaInstance(configuration *config.Configuration) *ReplicaInstance { } func (ri *ReplicaInstance) getHandle() *sql.DB { - return ri.handlers[rand.Intn(len(ri.handlers))] + logger := logrus.WithFields(logrus.Fields{ + "function": "database.replica.DisconnectFromDatabase", + }) + numberOfHandlers := len(ri.handlers) + if numberOfHandlers > 0 { + return ri.handlers[rand.Intn(len(ri.handlers))] + } + logger.Warnf("Could not find handler: %d", numberOfHandlers) + return nil } func (ri *ReplicaInstance) DisconnectFromDatabase() { @@ -387,3 +396,18 @@ func (ri *ReplicaInstance) GetLatestChangeTimestamp(l *logrus.Entry, ctx context return int64(replicationDelay[0]), nil } + +func (ri *ReplicaInstance) UpdateMetrics() { + for i, handler := range ri.handlers { + stats := handler.Stats() + metrics.ReplicaStats.With(prometheus.Labels{"instance": fmt.Sprintf("%d", i), "metric": "max_open"}).Set(float64(stats.MaxOpenConnections)) + metrics.ReplicaStats.With(prometheus.Labels{"instance": fmt.Sprintf("%d", i), "metric": "idle"}).Set(float64(stats.Idle)) + metrics.ReplicaStats.With(prometheus.Labels{"instance": fmt.Sprintf("%d", i), "metric": "open"}).Set(float64(stats.OpenConnections)) + metrics.ReplicaStats.With(prometheus.Labels{"instance": fmt.Sprintf("%d", i), "metric": "in_use"}).Set(float64(stats.InUse)) + metrics.ReplicaStats.With(prometheus.Labels{"instance": fmt.Sprintf("%d", i), "metric": "wait"}).Set(float64(stats.WaitCount)) + metrics.ReplicaStats.With(prometheus.Labels{"instance": fmt.Sprintf("%d", i), "metric": "wait_duration"}).Set(float64(stats.WaitDuration)) + metrics.ReplicaStats.With(prometheus.Labels{"instance": fmt.Sprintf("%d", i), "metric": "idle_closed"}).Set(float64(stats.MaxIdleClosed)) + metrics.ReplicaStats.With(prometheus.Labels{"instance": fmt.Sprintf("%d", i), "metric": "idle_time_closed"}).Set(float64(stats.MaxIdleTimeClosed)) + metrics.ReplicaStats.With(prometheus.Labels{"instance": fmt.Sprintf("%d", i), "metric": "lifetime_closed"}).Set(float64(stats.MaxLifetimeClosed)) + } +} diff --git a/pkg/cbng/metrics/metrics.go b/pkg/cbng/metrics/metrics.go index b79a810..c1832e7 100644 --- a/pkg/cbng/metrics/metrics.go +++ b/pkg/cbng/metrics/metrics.go @@ -39,6 +39,8 @@ var LoaderUserDistinctPageCountInUse prometheus.Gauge var LoaderUserWarnsCountInUse prometheus.Gauge var LoaderPageRevisionInUse prometheus.Gauge +var ReplicaStats *prometheus.GaugeVec + var OtelTracer trace.Tracer func init() { @@ -76,4 +78,6 @@ func init() { IrcNotificationsPending = promauto.NewGaugeVec(prometheus.GaugeOpts{Name: "cbng_irc_notifications_pending"}, []string{"channel"}) IrcNotificationsSent = promauto.NewCounterVec(prometheus.CounterOpts{Name: "cbng_irc_notifications_sent"}, []string{"channel"}) + + ReplicaStats = promauto.NewGaugeVec(prometheus.GaugeOpts{Name: "cbng_database_replica"}, []string{"instance", "metric"}) } From 6e1ada8fc28e84fbdcab34df120ae63a96e836a8 Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Wed, 11 Dec 2024 01:01:12 +0000 Subject: [PATCH 12/19] replica - add max_statement_time --- pkg/cbng/database/cluebot/cluebot.go | 10 ++---- pkg/cbng/database/replica/replica.go | 34 +++++++++++++------- pkg/cbng/loader/page_metadata.go | 2 +- pkg/cbng/loader/page_recent_edit_count.go | 2 +- pkg/cbng/loader/user_distinct_pages_count.go | 7 ---- 5 files changed, 26 insertions(+), 29 deletions(-) diff --git a/pkg/cbng/database/cluebot/cluebot.go b/pkg/cbng/database/cluebot/cluebot.go index 7c0173d..0e9dbb0 100644 --- a/pkg/cbng/database/cluebot/cluebot.go +++ b/pkg/cbng/database/cluebot/cluebot.go @@ -153,13 +153,10 @@ func (ci *CluebotInstance) GetLastRevertTime(l *logrus.Entry, ctx context.Contex defer span.End() var revertTime int64 - timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*300) - defer cancel() - db := ci.getDatabaseConnection() defer db.Close() - rows, err := db.QueryContext(timeoutCtx, "SELECT `time` FROM `last_revert` WHERE title=? AND user=?", title, user) + rows, err := db.Query("SELECT `time` FROM `last_revert` WHERE title=? AND user=?", title, user) if err != nil { logger.Infof("Error running query: %v", err) span.SetStatus(codes.Error, err.Error()) @@ -190,13 +187,10 @@ func (ci *CluebotInstance) SaveRevertTime(l *logrus.Entry, ctx context.Context, defer span.End() var revertTime int64 - timeoutCtx, cancel := context.WithTimeout(context.Background(), time.Millisecond*300) - defer cancel() - db := ci.getDatabaseConnection() defer db.Close() - rows, err := db.QueryContext(timeoutCtx, "INSERT INTO `last_revert` (`title`, `user`, `time`) "+ + rows, err := db.Query("INSERT INTO `last_revert` (`title`, `user`, `time`) "+ "VALUES (?, ?, ?) ON DUPLICATE KEY UPDATE `time`=`time`", title, user, time.Now().UTC().Unix()) if err != nil { logger.Infof("Error running query: %v", err) diff --git a/pkg/cbng/database/replica/replica.go b/pkg/cbng/database/replica/replica.go index 2824d9e..93dc4bc 100644 --- a/pkg/cbng/database/replica/replica.go +++ b/pkg/cbng/database/replica/replica.go @@ -34,7 +34,7 @@ func NewReplicaInstance(configuration *config.Configuration) *ReplicaInstance { } db.SetConnMaxLifetime(time.Minute * 5) db.SetMaxOpenConns(10) - db.SetMaxIdleConns(10) + db.SetMaxIdleConns(0) if err := db.Ping(); err != nil { logger.Warnf("Could not use connection to MySQL: %v", err) @@ -79,7 +79,8 @@ func (ri *ReplicaInstance) GetPageCreatedTimeAndUser(l *logrus.Entry, ctx contex var timestamp int64 var user string - rows, err := ri.getHandle().Query("SELECT `rev_timestamp`, `actor_name` FROM `page` "+ + rows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+ + "SELECT `rev_timestamp`, `actor_name` FROM `page` "+ "JOIN `revision` ON `rev_page` = `page_id` "+ "JOIN `actor` ON `actor_id` = `rev_actor` "+ "WHERE `page_namespace` = ? AND `page_title` = ? "+ @@ -117,7 +118,8 @@ func (ri *ReplicaInstance) GetPageRecentEditCount(l *logrus.Entry, ctx context.C defer span.End() var recentEditCount int64 - rows, err := ri.getHandle().Query("SELECT COUNT(*) as count FROM `page` "+ + rows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+ + "SELECT COUNT(*) as count FROM `page` "+ "JOIN `revision` ON `rev_page` = `page_id` "+ "WHERE `page_namespace` = ? AND `page_title` = ? AND `rev_timestamp` > ?", namespaceId, title, timestamp) @@ -155,7 +157,8 @@ func (ri *ReplicaInstance) GetPageRecentRevertCount(l *logrus.Entry, ctx context defer span.End() var recentRevertCount int64 - rows, err := ri.getHandle().Query("SELECT COUNT(*) as count FROM `page` "+ + rows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+ + "SELECT COUNT(*) as count FROM `page` "+ "JOIN `revision` ON `rev_page` = `page_id` "+ "JOIN `comment` ON `comment_id` = `rev_comment_id` "+ "WHERE `page_namespace` = ? AND `page_title` = ? AND `rev_timestamp` > ? AND `comment_text` "+ @@ -198,7 +201,8 @@ func (ri *ReplicaInstance) GetUserEditCount(l *logrus.Entry, parentCtx context.C defer span.End() logger.Debugf("Querying revision_userindex for anonymous user") - rows, err := ri.getHandle().Query("SELECT COUNT(*) AS `user_editcount` FROM `revision_userindex` "+ + rows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+ + "SELECT COUNT(*) AS `user_editcount` FROM `revision_userindex` "+ "WHERE `rev_actor` = "+ "(SELECT actor_id FROM actor WHERE `actor_name` = ?)", user) if err != nil { @@ -221,8 +225,9 @@ func (ri *ReplicaInstance) GetUserEditCount(l *logrus.Entry, parentCtx context.C defer span.End() logger.Debugf("Querying user_editcount for user") - userCountRows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=1 "+ - "FOR SELECT `user_editcount` FROM `user` WHERE `user_name` = ?", user) + userCountRows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+ + "SET STATEMENT max_statement_time=10 FOR "+ + "SELECT `user_editcount` FROM `user` WHERE `user_name` = ?", user) if err != nil { span.SetStatus(codes.Error, err.Error()) return editCount, err @@ -257,7 +262,8 @@ func (ri *ReplicaInstance) GetUserRegistrationTime(l *logrus.Entry, parentCtx co // Anon users have no registration time so are a noop if net.ParseIP(user) == nil { logger.Debugf("Using registered lookup") - userRegRows, err := ri.getHandle().Query("SELECT `user_registration` FROM `user` WHERE `user_name` = ? AND `user_registration` is not NULL", user) + userRegRows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+ + "SELECT `user_registration` FROM `user` WHERE `user_name` = ? AND `user_registration` is not NULL", user) if err != nil { span.SetStatus(codes.Error, err.Error()) return registrationTime, err @@ -273,7 +279,8 @@ func (ri *ReplicaInstance) GetUserRegistrationTime(l *logrus.Entry, parentCtx co _, subSpan := metrics.OtelTracer.Start(ctx, "database.replica.ReplicaInstance.GetUserRegistrationTime.fallback") defer subSpan.End() logger.Debugf("Querying (fallback) revision_userindex for registered user") - userRevRows, err := ri.getHandle().Query("SELECT `rev_timestamp` FROM `revision_userindex` WHERE `rev_actor` = "+ + userRevRows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+ + "SELECT `rev_timestamp` FROM `revision_userindex` WHERE `rev_actor` = "+ "(SELECT actor_id FROM actor WHERE `actor_name` = ?) "+ " ORDER BY `rev_timestamp` LIMIT 0,1", user) if err != nil { @@ -308,7 +315,8 @@ func (ri *ReplicaInstance) GetUserWarnCount(l *logrus.Entry, ctx context.Context defer span.End() var warningCount int64 - rows, err := ri.getHandle().Query("SELECT COUNT(*) as count FROM `page` "+ + rows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+ + "SELECT COUNT(*) as count FROM `page` "+ "JOIN `revision` ON `rev_page` = `page_id` "+ "JOIN `comment` ON `comment_id` = `rev_comment_id` "+ "WHERE `page_namespace` = 3 AND `page_title` = ? AND "+ @@ -345,7 +353,8 @@ func (ri *ReplicaInstance) GetUserDistinctPagesCount(l *logrus.Entry, ctx contex defer span.End() var distinctPageCount int64 - rows, err := ri.getHandle().Query("SELECT COUNT(DISTINCT rev_page) AS count FROM `revision_userindex` WHERE `rev_actor` = "+ + rows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR "+ + "SELECT COUNT(DISTINCT rev_page) AS count FROM `revision_userindex` WHERE `rev_actor` = "+ "(SELECT actor_id FROM actor WHERE `actor_name` = ?)", strings.ReplaceAll(user, " ", "_")) if err != nil { span.SetStatus(codes.Error, err.Error()) @@ -372,7 +381,8 @@ func (ri *ReplicaInstance) GetLatestChangeTimestamp(l *logrus.Entry, ctx context defer span.End() var replicationDelay []uint8 - rows, err := ri.getHandle().Query("SELECT UNIX_TIMESTAMP(MAX(rc_timestamp)) FROM `recentchanges`") + rows, err := ri.getHandle().Query("SET STATEMENT max_statement_time=10 FOR " + + "SELECT UNIX_TIMESTAMP(MAX(rc_timestamp)) FROM `recentchanges`") if err != nil { logger.Errorf("Failed to query replication delay: %+v", err) span.SetStatus(codes.Error, err.Error()) diff --git a/pkg/cbng/loader/page_metadata.go b/pkg/cbng/loader/page_metadata.go index b237946..7bac01c 100644 --- a/pkg/cbng/loader/page_metadata.go +++ b/pkg/cbng/loader/page_metadata.go @@ -20,7 +20,7 @@ func loadSinglePageMetadata(logger *logrus.Entry, ctx context.Context, change *m // Skip namespaces we're not interested in if change.Common.NamespaceId != 0 && !helpers.StringItemInSlice(change.Common.Namespace, configuration.Dynamic.NamespaceOptIn) { - logger.Debugf("Skipping change due to namespace: %v (%v)", change.Common.Namespace, change.Common.NamespaceId) + logger.Debugf("Skipping change due to namespace: %s (%d)", change.Common.Namespace, change.Common.NamespaceId) metrics.EditStatus.With(prometheus.Labels{"state": "verify_namespace", "status": "skipped"}).Inc() return nil } diff --git a/pkg/cbng/loader/page_recent_edit_count.go b/pkg/cbng/loader/page_recent_edit_count.go index 1813624..d89e851 100644 --- a/pkg/cbng/loader/page_recent_edit_count.go +++ b/pkg/cbng/loader/page_recent_edit_count.go @@ -19,7 +19,7 @@ import ( func loadSinglePageRecentEditCount(logger *logrus.Entry, ctx context.Context, change *model.ProcessEvent, configuration *config.Configuration, db *database.DatabaseConnection, outChangeFeed chan *model.ProcessEvent) error { // Load the page recent edit count - pageRecentEditCount, err := db.Replica.GetPageRecentEditCount(logger, ctx, change.Common.NamespaceId, helpers.PageTitleWithoutNamespace(change.Common.Title), change.ReceivedTime.Unix()) + pageRecentEditCount, err := db.Replica.GetPageRecentEditCount(logger, ctx, change.Common.NamespaceId, helpers.PageTitleWithoutNamespace(change.Common.Title), change.ReceivedTime.Unix()-14*86400) if err != nil { metrics.EditStatus.With(prometheus.Labels{"state": "lookup_page_recent_edits", "status": "failed"}).Inc() return err diff --git a/pkg/cbng/loader/user_distinct_pages_count.go b/pkg/cbng/loader/user_distinct_pages_count.go index 0d7f027..7606091 100644 --- a/pkg/cbng/loader/user_distinct_pages_count.go +++ b/pkg/cbng/loader/user_distinct_pages_count.go @@ -20,13 +20,6 @@ func loadSingleDistinctPagesCount(logger *logrus.Entry, ctx context.Context, cha // Load the user distinct pages count userDistinctPagesCount, err := db.Replica.GetUserDistinctPagesCount(logger, ctx, change.User.Username) if err != nil { - // If the user has a super high edit count, then fake it out as a non-error.... - // This query will run successfully but take multiple mins, which we can't afford - // Since the user has a super high edit count, we're going to skip reverting them anyway :shrug: - if change.User.EditCount > 10000 { - return nil - } - metrics.EditStatus.With(prometheus.Labels{"state": "lookup_user_distinct_count", "status": "failed"}).Inc() return err } From 6b8d0bddf16bb8e66611d5e062f2abc6ea1f8663 Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Wed, 11 Dec 2024 12:18:44 +0000 Subject: [PATCH 13/19] core - correct XML payload --- pkg/cbng/database/replica/replica.go | 4 ++-- pkg/cbng/feed/feed.go | 8 +++++++- pkg/cbng/loader/page_revision.go | 2 ++ pkg/cbng/model/processor.go | 4 +--- pkg/cbng/processor/core.go | 11 ++++++++--- 5 files changed, 20 insertions(+), 9 deletions(-) diff --git a/pkg/cbng/database/replica/replica.go b/pkg/cbng/database/replica/replica.go index 93dc4bc..81424ec 100644 --- a/pkg/cbng/database/replica/replica.go +++ b/pkg/cbng/database/replica/replica.go @@ -101,7 +101,7 @@ func (ri *ReplicaInstance) GetPageCreatedTimeAndUser(l *logrus.Entry, ctx contex return user, timestamp, err } - logger.Debugf("Found user '%v', timestamp '%v'", user, timestamp) + logger.Debugf("Found creator %v @ %v", user, timestamp) return user, timestamp, nil } @@ -300,7 +300,7 @@ func (ri *ReplicaInstance) GetUserRegistrationTime(l *logrus.Entry, parentCtx co } } } - logger.Debugf("Found registration time '%v'", registrationTime) + logger.Debugf("Found registration time: %v", registrationTime) return registrationTime, nil } diff --git a/pkg/cbng/feed/feed.go b/pkg/cbng/feed/feed.go index 17cacde..3b49ba1 100644 --- a/pkg/cbng/feed/feed.go +++ b/pkg/cbng/feed/feed.go @@ -61,11 +61,17 @@ func handleLine(logger *logrus.Entry, line string, configuration *config.Configu if httpChange.Type == "edit" && httpChange.ServerName == configuration.Wikipedia.Host { _, emitterSpan := metrics.OtelTracer.Start(rootCtx, "feed.ConsumeHttpChangeEvents.event.emit") defer emitterSpan.End() + + namespace := strings.TrimRight(httpChange.Namespace, ":") + if namespace == "" { + namespace = "main" + } + change := model.ProcessEvent{ Uuid: rootUUID, ReceivedTime: time.Now().UTC(), Common: model.ProcessEventCommon{ - Namespace: strings.TrimRight(httpChange.Namespace, ":"), + Namespace: namespace, NamespaceId: httpChange.NamespaceId, Title: httpChange.Title, }, diff --git a/pkg/cbng/loader/page_revision.go b/pkg/cbng/loader/page_revision.go index 6b4cabd..678acb8 100644 --- a/pkg/cbng/loader/page_revision.go +++ b/pkg/cbng/loader/page_revision.go @@ -32,11 +32,13 @@ func loadSinglePageRevision(logger *logrus.Entry, ctx context.Context, change *m Timestamp: revisionData.Current.Timestamp, Text: revisionData.Current.Data, Id: revisionData.Current.Id, + Username: revisionData.Current.User, } change.Previous = model.ProcessEventRevision{ Timestamp: revisionData.Previous.Timestamp, Text: revisionData.Previous.Data, Id: revisionData.Previous.Id, + Username: revisionData.Previous.User, } metrics.EditStatus.With(prometheus.Labels{"state": "lookup_page_revisions", "status": "success"}).Inc() diff --git a/pkg/cbng/model/processor.go b/pkg/cbng/model/processor.go index e474054..bee7880 100644 --- a/pkg/cbng/model/processor.go +++ b/pkg/cbng/model/processor.go @@ -20,6 +20,7 @@ type ProcessEventRevision struct { Timestamp int64 Text string `json:"-"` Id int64 + Username string } type ProcessEventUser struct { @@ -34,12 +35,9 @@ type ProcessEvent struct { Uuid string ReceivedTime time.Time Attempts int32 - EditType string - EditId int64 User ProcessEventUser Comment string Length int64 - PreviousUser string Common ProcessEventCommon Current ProcessEventRevision Previous ProcessEventRevision diff --git a/pkg/cbng/processor/core.go b/pkg/cbng/processor/core.go index 68a5a25..472b3d0 100644 --- a/pkg/cbng/processor/core.go +++ b/pkg/cbng/processor/core.go @@ -48,14 +48,14 @@ func generateXML(pe *model.ProcessEvent) ([]byte, error) { data := WPEditSet{ WPEdit: WPEdit{ - EditType: pe.EditType, - EditId: pe.EditId, + EditType: "change", + EditId: pe.Current.Id, Comment: pe.Comment, User: pe.User.Username, UserEditCount: pe.User.EditCount, UserDistinctPagesCount: pe.User.DistinctPages, UserWarningsCount: pe.User.Warns, - PreviousUser: pe.PreviousUser, + PreviousUser: pe.Previous.Username, UserRegistrationTime: pe.User.RegistrationTime, Common: WPEditCommon{ PageMadeTime: pe.Common.PageMadeTime, @@ -69,6 +69,10 @@ func generateXML(pe *model.ProcessEvent) ([]byte, error) { Text: pe.Current.Text, Timestamp: pe.Current.Timestamp, }, + Previous: WPEditRevision{ + Text: pe.Previous.Text, + Timestamp: pe.Previous.Timestamp, + }, }, } @@ -93,6 +97,7 @@ func isVandalism(l *logrus.Entry, parentCtx context.Context, configuration *conf return false, err } XmlSpan.End() + logger = logger.WithField("request", XmlSpan) _, scoreSpan := metrics.OtelTracer.Start(ctx, "core.isVandalism.score") defer scoreSpan.End() From f2939ed3e821090c97338fb0e6dd47a2bd951388 Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Wed, 11 Dec 2024 12:34:10 +0000 Subject: [PATCH 14/19] db - Add last_revert --- README.md | 1 + cluebot.sql | 37 ++++++++++++++++++++------------- pkg/cbng/processor/revert.go | 11 +++++----- pkg/cbng/wikipedia/wikipedia.go | 18 +++++++++------- 4 files changed, 40 insertions(+), 27 deletions(-) diff --git a/README.md b/README.md index 9fad057..19b8955 100644 --- a/README.md +++ b/README.md @@ -22,6 +22,7 @@ Not supported: * `oftenvandalized.txt` - This was used to emit messages into IRC, but hasn't been used since, but haven't been used since 2018 (80cab4) * `irc.wikimedia.org` - In favour of the HTTP event stream (which backs the IRC relay) +* `titles.txt` - This is replaced by a MySQL table TODO ---- diff --git a/cluebot.sql b/cluebot.sql index 5c63471..ac083e3 100644 --- a/cluebot.sql +++ b/cluebot.sql @@ -1,7 +1,6 @@ -CREATE TABLE IF NOT EXISTS `beaten`; -CREATE TABLE `beaten` +CREATE TABLE IF NOT EXISTS `beaten` ( - `id` int(11) NOT NULL auto_increment, + `id` int(11) NOT NULL auto_increment, `timestamp` timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP, `article` varchar(256) NOT NULL, `diff` varchar(512) NOT NULL, @@ -11,16 +10,26 @@ CREATE TABLE `beaten` CREATE TABLE IF NOT EXISTS `vandalism` ( - `id` int(11) NOT NULL auto_increment, - `timestamp` timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP, - `user` varchar(256) NOT NULL, - `article` varchar(256) NOT NULL, - `heuristic` varchar(64) NOT NULL, - `regex` varchar(2048) default NULL, - `reason` varchar(512) NOT NULL, - `diff` varchar(512) NOT NULL, - `old_id` int(11) NOT NULL, - `new_id` int(11) NOT NULL, - `reverted` tinyint(1) NOT NULL, + `id` int(11) NOT NULL auto_increment, + `timestamp` timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP, + `user` varchar(256) NOT NULL, + `article` varchar(256) NOT NULL, + `heuristic` varchar(64) NOT NULL, + `regex` varchar(2048) default NULL, + `reason` varchar(512) NOT NULL, + `diff` varchar(512) NOT NULL, + `old_id` int(11) NOT NULL, + `new_id` int(11) NOT NULL, + `reverted` tinyint(1) NOT NULL, PRIMARY KEY (`id`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; + +CREATE TABLE IF NOT EXISTS `last_revert` +( + `id` int(11) NOT NULL auto_increment, + `title` varchar(256) NOT NULL, + `user` varchar(256) NOT NULL, + `time` timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP, + PRIMARY KEY (`id`), + INDEX (`title`, `user`) +) ENGINE=InnoDB DEFAULT CHARSET=latin1; diff --git a/pkg/cbng/processor/revert.go b/pkg/cbng/processor/revert.go index 3aa2651..6394c6c 100644 --- a/pkg/cbng/processor/revert.go +++ b/pkg/cbng/processor/revert.go @@ -52,13 +52,14 @@ func revertChange(l *logrus.Entry, parentCtx context.Context, api *wikipedia.Wik revComment := "older version" if revertRevision.Id != 0 { metrics.RevertStatus.With(prometheus.Labels{"state": "revert", "status": "failed", "meta": "revision_is_old"}).Inc() - revComment = fmt.Sprintf("version by %+v", revertRevision.User) + revComment = fmt.Sprintf("version by %s", revertRevision.User) } - comment := fmt.Sprintf("Reverting possible vandalism by [[Special:Contribs/%+v|%+v]] to %+v. [[WP:CBFP|Report False Positive?]] Thanks, [[WP:CBNG|%+v]]. (%v) (Bot)", - change.User, change.User, + comment := fmt.Sprintf("Reverting possible vandalism by [[Special:Contribs/%s|%s]] to %s. [[WP:CBFP|Report False Positive?]] Thanks, [[WP:%s|%s]]. (%d) (Bot)", + change.User.Username, change.User.Username, revComment, configuration.Wikipedia.Username, + configuration.Wikipedia.Username, mysqlVandalismId, ) @@ -221,7 +222,7 @@ func shouldRevert(l *logrus.Entry, parentCtx context.Context, configuration *con metrics.RevertStatus.With(prometheus.Labels{"state": "should_revert", "status": "failed", "meta": "high_edit_count"}).Inc() return false } - logger.Infof("Found user edit count, but high warns (%+v)", userWarnRatio) + logger.Infof("Found user edit count, but high warns (%f)", userWarnRatio) change.RevertReason = "User has edit count, but warns > 10%" metrics.RevertStatus.With(prometheus.Labels{"state": "should_revert", "status": "success", "meta": "edit_count_warn_perc"}).Inc() return true @@ -265,7 +266,7 @@ func processSingleRevertChange(logger *logrus.Entry, parentCtx context.Context, ctx, change.User.Username, change.Common.Title, - fmt.Sprintf("ANN scored at %+v", change.VandalismScore), + fmt.Sprintf("ANN scored at %f", change.VandalismScore), change.GetDiffUrl(), change.Previous.Id, change.Current.Id) diff --git a/pkg/cbng/wikipedia/wikipedia.go b/pkg/cbng/wikipedia/wikipedia.go index af71bf0..c2041ed 100644 --- a/pkg/cbng/wikipedia/wikipedia.go +++ b/pkg/cbng/wikipedia/wikipedia.go @@ -459,14 +459,16 @@ func (w *WikipediaApi) GetWarningLevel(l *logrus.Entry, parentCtx context.Contex logger.Warnf("Failed to parse '%v' into int: %v", match[1], err) continue } - t, err := time.Parse("15:04, 02 January 2006 (MST)", match[2]) - if err != nil { - span.SetStatus(codes.Error, err.Error()) - logger.Warnf("Failed to parse '%v' into time: %v", match[2], err) - continue - } - if mlevel > level && t.Second() <= (2*24*60*60) { - level = mlevel + if match[2] != "" { + t, err := time.Parse("15:04, 02 January 2006 (MST)", match[2]) + if err != nil { + span.SetStatus(codes.Error, err.Error()) + logger.Warnf("Failed to parse '%v' into time: %v", match[2], err) + continue + } + if mlevel > level && t.Second() <= (2*24*60*60) { + level = mlevel + } } } return level From 942dc68edc9a4a92990f7474dd275bebc3024080 Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Wed, 11 Dec 2024 12:50:29 +0000 Subject: [PATCH 15/19] Cleanup mysql generation --- pkg/cbng/database/cluebot/cluebot.go | 10 +++++----- pkg/cbng/wikipedia/wikipedia.go | 20 +++++--------------- 2 files changed, 10 insertions(+), 20 deletions(-) diff --git a/pkg/cbng/database/cluebot/cluebot.go b/pkg/cbng/database/cluebot/cluebot.go index 0e9dbb0..0300861 100644 --- a/pkg/cbng/database/cluebot/cluebot.go +++ b/pkg/cbng/database/cluebot/cluebot.go @@ -41,8 +41,6 @@ func (ci *CluebotInstance) GenerateVandalismId(logger *logrus.Entry, ctx context _, span := metrics.OtelTracer.Start(ctx, "database.cluebot.GenerateVandalismId") defer span.End() - var vandalismId int64 - db := ci.getDatabaseConnection() defer db.Close() @@ -50,12 +48,14 @@ func (ci *CluebotInstance) GenerateVandalismId(logger *logrus.Entry, ctx context if err != nil { logger.Errorf("Error running query: %v", err) span.SetStatus(codes.Error, err.Error()) - return vandalismId, err + return 0, err } - if vandalismId, err := res.LastInsertId(); err != nil { + + vandalismId, err := res.LastInsertId() + if err != nil { logger.Errorf("Failed to get insert id: %v", err) span.SetStatus(codes.Error, err.Error()) - return vandalismId, err + return 0, err } logger.Debugf("Generated id %v", vandalismId) diff --git a/pkg/cbng/wikipedia/wikipedia.go b/pkg/cbng/wikipedia/wikipedia.go index c2041ed..739d828 100644 --- a/pkg/cbng/wikipedia/wikipedia.go +++ b/pkg/cbng/wikipedia/wikipedia.go @@ -453,21 +453,11 @@ func (w *WikipediaApi) GetWarningLevel(l *logrus.Entry, parentCtx context.Contex matches := regexp.MustCompile(`.*(\d{2}:\d{2}, \d+ [a-zA-Z]+ \d{4} \(UTC\))`).FindAllStringSubmatch(page.Data, -1) level := 0 for _, match := range matches { - mlevel, err := strconv.Atoi(match[1]) - if err != nil { - span.SetStatus(codes.Error, err.Error()) - logger.Warnf("Failed to parse '%v' into int: %v", match[1], err) - continue - } - if match[2] != "" { - t, err := time.Parse("15:04, 02 January 2006 (MST)", match[2]) - if err != nil { - span.SetStatus(codes.Error, err.Error()) - logger.Warnf("Failed to parse '%v' into time: %v", match[2], err) - continue - } - if mlevel > level && t.Second() <= (2*24*60*60) { - level = mlevel + if matchLevel, err := strconv.Atoi(match[1]); err == nil { + if t, err := time.Parse("15:04, 02 January 2006 (MST)", match[2]); err == nil { + if matchLevel > level && t.Second() <= (2*24*60*60) { + level = matchLevel + } } } } From 0c718a21ed874dacfdaf094caf0b77478ee43b67 Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Wed, 11 Dec 2024 13:00:49 +0000 Subject: [PATCH 16/19] revert - fix recent revert tracking --- cluebot.sql | 2 +- pkg/cbng/database/cluebot/cluebot.go | 6 ++++-- pkg/cbng/processor/revert.go | 27 +++++++++++++++++++-------- 3 files changed, 24 insertions(+), 11 deletions(-) diff --git a/cluebot.sql b/cluebot.sql index ac083e3..b70c993 100644 --- a/cluebot.sql +++ b/cluebot.sql @@ -29,7 +29,7 @@ CREATE TABLE IF NOT EXISTS `last_revert` `id` int(11) NOT NULL auto_increment, `title` varchar(256) NOT NULL, `user` varchar(256) NOT NULL, - `time` timestamp NOT NULL default CURRENT_TIMESTAMP on update CURRENT_TIMESTAMP, + `time` int NOT NULL, PRIMARY KEY (`id`), INDEX (`title`, `user`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; diff --git a/pkg/cbng/database/cluebot/cluebot.go b/pkg/cbng/database/cluebot/cluebot.go index 0300861..4c3aea1 100644 --- a/pkg/cbng/database/cluebot/cluebot.go +++ b/pkg/cbng/database/cluebot/cluebot.go @@ -175,7 +175,7 @@ func (ci *CluebotInstance) GetLastRevertTime(l *logrus.Entry, ctx context.Contex return revertTime } -func (ci *CluebotInstance) SaveRevertTime(l *logrus.Entry, ctx context.Context, title, user string) int64 { +func (ci *CluebotInstance) SaveRevertTime(l *logrus.Entry, ctx context.Context, title, user string) error { logger := l.WithFields(logrus.Fields{ "function": "database.cluebot.SaveRevertTime", "args": map[string]interface{}{ @@ -195,6 +195,7 @@ func (ci *CluebotInstance) SaveRevertTime(l *logrus.Entry, ctx context.Context, if err != nil { logger.Infof("Error running query: %v", err) span.SetStatus(codes.Error, err.Error()) + return err } else { defer rows.Close() if !rows.Next() { @@ -203,9 +204,10 @@ func (ci *CluebotInstance) SaveRevertTime(l *logrus.Entry, ctx context.Context, if err := rows.Scan(&revertTime); err != nil { logger.Errorf("Error reading rows for query: %v", err) span.SetStatus(codes.Error, err.Error()) + return err } } } - return revertTime + return nil } diff --git a/pkg/cbng/processor/revert.go b/pkg/cbng/processor/revert.go index 6394c6c..370e94b 100644 --- a/pkg/cbng/processor/revert.go +++ b/pkg/cbng/processor/revert.go @@ -246,7 +246,7 @@ func shouldRevert(l *logrus.Entry, parentCtx context.Context, configuration *con // If we reverted this user/page before in the last 24 hours, don't lastRevertTime := db.ClueBot.GetLastRevertTime(logger, ctx, change.Common.Title, change.User.Username) - if lastRevertTime != 0 && lastRevertTime < 86400 { + if lastRevertTime != 0 && lastRevertTime > time.Now().UTC().Unix()-86400 { change.RevertReason = "Reverted before" metrics.RevertStatus.With(prometheus.Labels{"state": "should_revert", "status": "failed", "meta": "recent_revert"}).Inc() return false @@ -276,6 +276,11 @@ func processSingleRevertChange(logger *logrus.Entry, parentCtx context.Context, } logger.Infof("Generated vandalism id %v", mysqlVandalismId) + // Log the revert time for later + if err := db.ClueBot.SaveRevertTime(logger, ctx, change.Common.Title, change.User.Username); err != nil { + logger.Warnf("Failed to save revert time: %v", err) + } + // Revert or not if !shouldRevert(logger, ctx, configuration, db, change) { metrics.EditStatus.With(prometheus.Labels{"state": "revert", "status": "skipped"}).Inc() @@ -294,15 +299,21 @@ func processSingleRevertChange(logger *logrus.Entry, parentCtx context.Context, r.SendRevert(fmt.Sprintf("%s (Reverted) (%s) (%d s)", change.FormatIrcRevert(), change.RevertReason, time.Now().Unix()-change.ReceivedTime.Unix())) r.SendSpam(fmt.Sprintf("%s # %f # %s # Reverted", change.FormatIrcChange(), change.VandalismScore, change.RevertReason)) } else { - metrics.EditStatus.With(prometheus.Labels{"state": "revert", "status": "failed"}).Inc() logger.Infof("Failed to revert") revision := api.GetPage(logger, ctx, helpers.PageTitle(change.Common.Namespace, change.Common.Title)) - if revision != nil && change.User.Username != revision.User { - change.RevertReason = fmt.Sprintf("Beaten by %s", revision.User) - db.ClueBot.MarkVandalismRevertBeaten(logger, ctx, mysqlVandalismId, change.Common.Title, change.GetDiffUrl(), revision.User) - - r.SendRevert(fmt.Sprintf("%s (Not Reverted) (%s) (%d s)", change.FormatIrcRevert(), change.RevertReason, time.Now().Unix()-change.ReceivedTime.Unix())) - r.SendSpam(fmt.Sprintf("%s # %f # %s # Not Reverted", change.FormatIrcChange(), change.VandalismScore, change.RevertReason)) + if revision != nil { + if change.User.Username == revision.User { + metrics.EditStatus.With(prometheus.Labels{"state": "revert", "status": "self_beaten"}).Inc() + } else { + metrics.EditStatus.With(prometheus.Labels{"state": "revert", "status": "beaten"}).Inc() + change.RevertReason = fmt.Sprintf("Beaten by %s", revision.User) + db.ClueBot.MarkVandalismRevertBeaten(logger, ctx, mysqlVandalismId, change.Common.Title, change.GetDiffUrl(), revision.User) + + r.SendRevert(fmt.Sprintf("%s (Not Reverted) (%s) (%d s)", change.FormatIrcRevert(), change.RevertReason, time.Now().Unix()-change.ReceivedTime.Unix())) + r.SendSpam(fmt.Sprintf("%s # %f # %s # Not Reverted", change.FormatIrcChange(), change.VandalismScore, change.RevertReason)) + } + } else { + metrics.EditStatus.With(prometheus.Labels{"state": "revert", "status": "failed"}).Inc() } } return nil From eb05213d1f0cbe00d6e9d01b471c555415fe3466 Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Wed, 11 Dec 2024 13:16:59 +0000 Subject: [PATCH 17/19] Purge last revert times after threshold --- cluebot.sql | 4 +--- main.go | 11 +++++++++++ pkg/cbng/config/config.go | 1 + pkg/cbng/database/cluebot/cluebot.go | 17 +++++++++++++++++ pkg/cbng/processor/revert.go | 2 +- 5 files changed, 31 insertions(+), 4 deletions(-) diff --git a/cluebot.sql b/cluebot.sql index b70c993..0c5ef08 100644 --- a/cluebot.sql +++ b/cluebot.sql @@ -26,10 +26,8 @@ CREATE TABLE IF NOT EXISTS `vandalism` CREATE TABLE IF NOT EXISTS `last_revert` ( - `id` int(11) NOT NULL auto_increment, `title` varchar(256) NOT NULL, `user` varchar(256) NOT NULL, `time` int NOT NULL, - PRIMARY KEY (`id`), - INDEX (`title`, `user`) + PRIMARY KEY (`title`, `user`) ) ENGINE=InnoDB DEFAULT CHARSET=latin1; diff --git a/main.go b/main.go index a6da25d..0d7dd3e 100644 --- a/main.go +++ b/main.go @@ -47,6 +47,16 @@ func RunMetricPoller(wg *sync.WaitGroup, toPageMetadataLoader, toPageRecentEditC } } +func RunDatabasePurger(wg *sync.WaitGroup, db *database.DatabaseConnection) { + wg.Add(1) + defer wg.Done() + + timer := time.NewTicker(time.Hour) + for range timer.C { + db.ClueBot.PurgeOldRevertTimes() + } +} + func main() { var wg sync.WaitGroup var debugLogging bool @@ -151,6 +161,7 @@ func main() { toRevertProcessor := make(chan *model.ProcessEvent, 10000) go RunMetricPoller(&wg, toPageMetadataLoader, toPageRecentEditCountLoader, toPageRecentRevertCountLoader, toUserEditCountLoader, toUserWarnsCountLoader, toUserDistinctPagesCountLoader, toRevisionLoader, toScoringProcessor, toRevertProcessor, r, db) + go RunDatabasePurger(&wg, db) go feed.ConsumeHttpChangeEvents(&wg, configuration, toReplicationWatcher) go processor.ReplicationWatcher(&wg, configuration, db, ignoreReplicationDelay, toReplicationWatcher, toPageMetadataLoader) diff --git a/pkg/cbng/config/config.go b/pkg/cbng/config/config.go index a9bae07..be7ce4c 100644 --- a/pkg/cbng/config/config.go +++ b/pkg/cbng/config/config.go @@ -9,6 +9,7 @@ import ( ) var ReleaseTag = "development" +var RecentRevertThreshold = int64(86400) type BotConfiguration struct { Owner string diff --git a/pkg/cbng/database/cluebot/cluebot.go b/pkg/cbng/database/cluebot/cluebot.go index 4c3aea1..e76ffd9 100644 --- a/pkg/cbng/database/cluebot/cluebot.go +++ b/pkg/cbng/database/cluebot/cluebot.go @@ -211,3 +211,20 @@ func (ci *CluebotInstance) SaveRevertTime(l *logrus.Entry, ctx context.Context, return nil } + +func (ci *CluebotInstance) PurgeOldRevertTimes() { + logger := logrus.WithFields(logrus.Fields{ + "function": "database.cluebot.PurgeOldRevertTimes", + }) + _, span := metrics.OtelTracer.Start(context.Background(), "database.cluebot.PurgeOldRevertTimes") + defer span.End() + + db := ci.getDatabaseConnection() + defer db.Close() + + _, err := db.Exec("DELETE FROM `last_revert` WHERE `time` < ?", time.Now().UTC().Unix()-(config.RecentRevertThreshold+10)) + if err != nil { + logger.Warnf("Error purging database: %v", err) + span.SetStatus(codes.Error, err.Error()) + } +} diff --git a/pkg/cbng/processor/revert.go b/pkg/cbng/processor/revert.go index 370e94b..5eca910 100644 --- a/pkg/cbng/processor/revert.go +++ b/pkg/cbng/processor/revert.go @@ -246,7 +246,7 @@ func shouldRevert(l *logrus.Entry, parentCtx context.Context, configuration *con // If we reverted this user/page before in the last 24 hours, don't lastRevertTime := db.ClueBot.GetLastRevertTime(logger, ctx, change.Common.Title, change.User.Username) - if lastRevertTime != 0 && lastRevertTime > time.Now().UTC().Unix()-86400 { + if lastRevertTime != 0 && lastRevertTime > time.Now().UTC().Unix()-config.RecentRevertThreshold { change.RevertReason = "Reverted before" metrics.RevertStatus.With(prometheus.Labels{"state": "should_revert", "status": "failed", "meta": "recent_revert"}).Inc() return false From 62688665fb0a40cd367d56a14d04b1fa65ea2acc Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Wed, 11 Dec 2024 13:25:32 +0000 Subject: [PATCH 18/19] core - set request to data, not span --- pkg/cbng/processor/core.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/cbng/processor/core.go b/pkg/cbng/processor/core.go index 472b3d0..118c4b7 100644 --- a/pkg/cbng/processor/core.go +++ b/pkg/cbng/processor/core.go @@ -97,7 +97,7 @@ func isVandalism(l *logrus.Entry, parentCtx context.Context, configuration *conf return false, err } XmlSpan.End() - logger = logger.WithField("request", XmlSpan) + logger = logger.WithField("request", xmlData) _, scoreSpan := metrics.OtelTracer.Start(ctx, "core.isVandalism.score") defer scoreSpan.End() From a179b13f7175cee53070fbc5b6c5ef8652f16309 Mon Sep 17 00:00:00 2001 From: Damian Zaremba Date: Thu, 12 Dec 2024 14:16:11 +0000 Subject: [PATCH 19/19] core - split core span into 2 --- pkg/cbng/processor/core.go | 9 ++++----- 1 file changed, 4 insertions(+), 5 deletions(-) diff --git a/pkg/cbng/processor/core.go b/pkg/cbng/processor/core.go index 118c4b7..34611c0 100644 --- a/pkg/cbng/processor/core.go +++ b/pkg/cbng/processor/core.go @@ -99,12 +99,10 @@ func isVandalism(l *logrus.Entry, parentCtx context.Context, configuration *conf XmlSpan.End() logger = logger.WithField("request", xmlData) - _, scoreSpan := metrics.OtelTracer.Start(ctx, "core.isVandalism.score") - defer scoreSpan.End() - coreUrl := fmt.Sprintf("%s:%d", coreHost, configuration.Core.Port) logger.Tracef("Connecting to %v", coreUrl) + _, scoreSpan := metrics.OtelTracer.Start(ctx, "core.isVandalism.score") dialer := net.Dialer{Timeout: time.Second * 5} conn, err := dialer.Dial("tcp", coreUrl) if err != nil { @@ -121,7 +119,6 @@ func isVandalism(l *logrus.Entry, parentCtx context.Context, configuration *conf } response := []byte{} tmp := make([]byte, 4096) - i := 0 for { n, err := conn.Read(tmp) if err != nil { @@ -134,10 +131,12 @@ func isVandalism(l *logrus.Entry, parentCtx context.Context, configuration *conf if strings.Contains(string(response), "") { break } - i += 1 } + defer scoreSpan.End() logger = logger.WithField("response", response) + _, scoreDecodeSpan := metrics.OtelTracer.Start(ctx, "core.isVandalism.score.decode") + defer scoreDecodeSpan.End() editSet := model.WPEditScoreSet{} if err := xml.Unmarshal(response, &editSet); err != nil { scoreSpan.SetStatus(codes.Error, err.Error())