From 35f2b31440b0906bd79f3d47bf8f606e5f2d6e02 Mon Sep 17 00:00:00 2001 From: Javier Cuevas Date: Wed, 22 May 2024 10:32:06 +0200 Subject: [PATCH] Improve sending messages while reconnecting (#63) * Add sync.Cond and CanSendMessages flag to `messagix.Client` * Enable sending messages on Event_Ready, and wait for it on handleMatrixMessages * Add `EnableSendingMessages` and `WaitUntilCanSendMessages` to `Client` * Add sending retry to Meta messages * Improve error handling * Improve error handling * Add `disableSendingMessages` and improve `WaitUntilCanSendMessages` * Move `disableSendingMessages` call --- messagix/client.go | 58 ++++++++++++++++++++++++++++++++++++++++------ messagix/events.go | 3 +++ portal.go | 20 ++++++++++++++-- 3 files changed, 72 insertions(+), 9 deletions(-) diff --git a/messagix/client.go b/messagix/client.go index 1bf14fe..f6c9b19 100644 --- a/messagix/client.go +++ b/messagix/client.go @@ -72,6 +72,9 @@ type Client struct { unnecessaryCATRequests int stopCurrentConnection atomic.Pointer[context.CancelFunc] + + canSendMessages bool + sendMessagesCond *sync.Cond } func NewClient(cookies *cookies.Cookies, logger zerolog.Logger) *Client { @@ -88,13 +91,15 @@ func NewClient(cookies *cookies.Cookies, logger zerolog.Logger) *Client { }, Timeout: 60 * time.Second, }, - cookies: cookies, - Logger: logger, - lsRequests: 0, - graphQLRequests: 1, - platform: cookies.Platform, - activeTasks: make([]int, 0), - taskMutex: &sync.Mutex{}, + cookies: cookies, + Logger: logger, + lsRequests: 0, + graphQLRequests: 1, + platform: cookies.Platform, + activeTasks: make([]int, 0), + taskMutex: &sync.Mutex{}, + canSendMessages: false, + sendMessagesCond: sync.NewCond(&sync.Mutex{}), } cli.http.CheckRedirect = cli.checkHTTPRedirect @@ -222,6 +227,7 @@ func (c *Client) Connect() error { for { connectStart := time.Now() err := c.socket.Connect() + c.disableSendingMessages() if ctx.Err() != nil { return } @@ -388,3 +394,41 @@ func (c *Client) GetTaskId() int { c.activeTasks = append(c.activeTasks, id) return id } + +func (c *Client) EnableSendingMessages() { + c.sendMessagesCond.L.Lock() + c.canSendMessages = true + c.sendMessagesCond.Broadcast() + c.sendMessagesCond.L.Unlock() +} + +func (c *Client) disableSendingMessages() { + c.sendMessagesCond.L.Lock() + c.canSendMessages = false + c.sendMessagesCond.L.Unlock() +} + +func (c *Client) WaitUntilCanSendMessages(timeout time.Duration) error { + timer := time.NewTimer(timeout) + defer timer.Stop() + + done := make(chan struct{}) + go func() { + c.sendMessagesCond.L.Lock() + defer c.sendMessagesCond.L.Unlock() + c.sendMessagesCond.Wait() + close(done) + }() + + for !c.canSendMessages { + select { + case <-timer.C: + return fmt.Errorf("timeout waiting for canSendMessages") + case <-done: + if c.canSendMessages { + return nil + } + } + } + return nil +} diff --git a/messagix/events.go b/messagix/events.go index 3c273cb..73af30e 100644 --- a/messagix/events.go +++ b/messagix/events.go @@ -109,6 +109,9 @@ func (s *Socket) handleReadyEvent(data *Event_Ready) error { data.client = s.client s.client.eventHandler(data.Finish()) s.previouslyConnected = true + + s.client.EnableSendingMessages() + return nil } diff --git a/portal.go b/portal.go index a556f8b..45fc81c 100644 --- a/portal.go +++ b/portal.go @@ -54,6 +54,8 @@ import ( "go.mau.fi/mautrix-meta/msgconv" ) +const MaxMetaSendAttempts = 5 + func (br *MetaBridge) GetPortalByMXID(mxid id.RoomID) *Portal { br.portalsLock.Lock() defer br.portalsLock.Unlock() @@ -651,10 +653,24 @@ func (portal *Portal) handleMatrixMessage(ctx context.Context, sender *User, evt portal.pendingMessages[otid] = evt.ID messageTS := time.Now() var resp *table.LSTable - resp, err = sender.Client.ExecuteTasks(tasks...) + + retries := 0 + for retries < MaxMetaSendAttempts { + if err = sender.Client.WaitUntilCanSendMessages(15 * time.Second); err != nil { + log.Err(err).Msg("Error waiting to be able to send messages, retrying") + } else { + resp, err = sender.Client.ExecuteTasks(tasks...) + if err == nil { + break + } + log.Err(err).Msg("Failed to send message to Meta, retrying") + } + retries++ + } + log.Trace().Any("response", resp).Msg("Meta send response") var msgID string - if err == nil { + if resp != nil && err == nil { for _, replace := range resp.LSReplaceOptimsiticMessage { if replace.OfflineThreadingId == otidStr { msgID = replace.MessageId