From 97c08df59019536a8284e9b93d81bbe8eed0cae0 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Tue, 13 Aug 2024 13:19:00 +0300 Subject: [PATCH] client: add periodic reconnect and connection errors --- go.mod | 4 +- go.sum | 8 +-- pkg/connector/client.go | 108 +++++++++++++++++++++++++++++--- pkg/connector/config.go | 3 + pkg/connector/handlematrix.go | 39 ++++++++++++ pkg/connector/handlemeta.go | 8 ++- pkg/connector/handlewhatsapp.go | 17 +++-- 7 files changed, 166 insertions(+), 21 deletions(-) diff --git a/go.mod b/go.mod index c8bfc53..b3cd967 100644 --- a/go.mod +++ b/go.mod @@ -13,7 +13,7 @@ require ( github.com/tidwall/gjson v1.17.3 github.com/zyedidia/clipboard v1.0.4 go.mau.fi/libsignal v0.1.1 - go.mau.fi/util v0.6.1-0.20240811184504-b00aa5c5af3a + go.mau.fi/util v0.6.1-0.20240813094622-892c5e0ea9b1 go.mau.fi/whatsmeow v0.0.0-20240811142232-82a29759f1fc golang.org/x/crypto v0.26.0 golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa @@ -22,7 +22,7 @@ require ( google.golang.org/protobuf v1.34.2 gopkg.in/yaml.v3 v3.0.1 maunium.net/go/mauflag v1.0.0 - maunium.net/go/mautrix v0.19.1-0.20240812160906-091a18d448de + maunium.net/go/mautrix v0.19.1-0.20240812193118-1e98cb6a2ec8 ) require ( diff --git a/go.sum b/go.sum index cab7c17..dbb1b86 100644 --- a/go.sum +++ b/go.sum @@ -57,8 +57,8 @@ github.com/zyedidia/clipboard v1.0.4 h1:r6GUQOyPtIaApRLeD56/U+2uJbXis6ANGbKWCljU github.com/zyedidia/clipboard v1.0.4/go.mod h1:zykFnZUXX0ErxqvYLUFEq7QDJKId8rmh2FgD0/Y8cjA= go.mau.fi/libsignal v0.1.1 h1:m/0PGBh4QKP/I1MQ44ti4C0fMbLMuHb95cmDw01FIpI= go.mau.fi/libsignal v0.1.1/go.mod h1:QLs89F/OA3ThdSL2Wz2p+o+fi8uuQUz0e1BRa6ExdBw= -go.mau.fi/util v0.6.1-0.20240811184504-b00aa5c5af3a h1:A6AeueGxoDjSSf2X8Tz8X9nQ2S65uYWGVwlvTZa7Bjs= -go.mau.fi/util v0.6.1-0.20240811184504-b00aa5c5af3a/go.mod h1:ZRiX8FK4CsqVINI+3YK50nHnc+dKhfTZNf38zI31S/0= +go.mau.fi/util v0.6.1-0.20240813094622-892c5e0ea9b1 h1:cnTb0c782DRD375gfsZqttKCvLPykEX3LNswJ6DwADg= +go.mau.fi/util v0.6.1-0.20240813094622-892c5e0ea9b1/go.mod h1:ZRiX8FK4CsqVINI+3YK50nHnc+dKhfTZNf38zI31S/0= go.mau.fi/whatsmeow v0.0.0-20240811142232-82a29759f1fc h1:LMKV0RwBZzTwUJ6BwXN3u0LVxNbhFbfuAfqT+nYKiIc= go.mau.fi/whatsmeow v0.0.0-20240811142232-82a29759f1fc/go.mod h1:BhHKalSq0qNtSCuGIUIvoJyU5KbT4a7k8DQ5yw1Ssk4= go.mau.fi/zeroconfig v0.1.3 h1:As9wYDKmktjmNZW5i1vn8zvJlmGKHeVxHVIBMXsm4kM= @@ -89,5 +89,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M= maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA= -maunium.net/go/mautrix v0.19.1-0.20240812160906-091a18d448de h1:eSzaQgyejWS4ok+l3DaXufqUXM1U/bA4LXfy1ei64CE= -maunium.net/go/mautrix v0.19.1-0.20240812160906-091a18d448de/go.mod h1:GtFIC8z1F1EmpwYIG2QOhhg7/TwxeTX82OBhegABr9s= +maunium.net/go/mautrix v0.19.1-0.20240812193118-1e98cb6a2ec8 h1:QAjuJRjpkVcql+Sauym+uUKDaq+P8SMvk62+TJjOr4c= +maunium.net/go/mautrix v0.19.1-0.20240812193118-1e98cb6a2ec8/go.mod h1:GtFIC8z1F1EmpwYIG2QOhhg7/TwxeTX82OBhegABr9s= diff --git a/pkg/connector/client.go b/pkg/connector/client.go index 47b1ea3..8ce2c1e 100644 --- a/pkg/connector/client.go +++ b/pkg/connector/client.go @@ -8,6 +8,8 @@ import ( "sync/atomic" "time" + "github.com/rs/zerolog" + "go.mau.fi/util/exsync" "go.mau.fi/whatsmeow" "go.mau.fi/whatsmeow/store" waTypes "go.mau.fi/whatsmeow/types" @@ -34,6 +36,11 @@ type MetaClient struct { backfillCollectors map[int64]*BackfillCollector backfillLock sync.Mutex + stopPeriodicReconnect atomic.Pointer[context.CancelFunc] + lastFullReconnect time.Time + connectWaiter *exsync.Event + e2eeConnectWaiter *exsync.Event + E2EEClient *whatsmeow.Client WADevice *store.Device e2eeConnectLock sync.Mutex @@ -53,6 +60,9 @@ func (m *MetaConnector) LoadUserLogin(ctx context.Context, login *bridgev2.UserL incomingTables: make(chan *table.LSTable, 16), backfillCollectors: make(map[int64]*BackfillCollector), + + connectWaiter: exsync.NewEvent(), + e2eeConnectWaiter: exsync.NewEvent(), } c.Client.SetEventHandler(c.handleMetaEvent) login.Client = c @@ -64,7 +74,51 @@ var _ bridgev2.NetworkAPI = (*MetaClient)(nil) func (m *MetaClient) Connect(ctx context.Context) error { currentUser, initialTable, err := m.Client.LoadMessagesPage() if err != nil { - return fmt.Errorf("failed to load messages page: %w", err) + if stopPeriodicReconnect := m.stopPeriodicReconnect.Swap(nil); stopPeriodicReconnect != nil { + (*stopPeriodicReconnect)() + } + if errors.Is(err, messagix.ErrTokenInvalidated) { + m.UserLogin.BridgeState.Send(status.BridgeState{ + StateEvent: status.StateBadCredentials, + Error: MetaCookieRemoved, + }) + // TODO clear cookies? + } else if errors.Is(err, messagix.ErrChallengeRequired) { + m.UserLogin.BridgeState.Send(status.BridgeState{ + StateEvent: status.StateBadCredentials, + Error: IGChallengeRequired, + }) + } else if errors.Is(err, messagix.ErrAccountSuspended) { + m.UserLogin.BridgeState.Send(status.BridgeState{ + StateEvent: status.StateBadCredentials, + Error: IGAccountSuspended, + }) + } else if errors.Is(err, messagix.ErrConsentRequired) { + code := IGConsentRequired + if m.LoginMeta.Platform.IsMessenger() { + code = FBConsentRequired + } + m.UserLogin.BridgeState.Send(status.BridgeState{ + StateEvent: status.StateBadCredentials, + Error: code, + }) + } else if lsErr := (&types.ErrorResponse{}); errors.As(err, &lsErr) { + stateEvt := status.StateUnknownError + if lsErr.ErrorCode == 1357053 { + stateEvt = status.StateBadCredentials + } + m.UserLogin.BridgeState.Send(status.BridgeState{ + StateEvent: stateEvt, + Error: status.BridgeStateErrorCode(fmt.Sprintf("meta-lserror-%d", lsErr.ErrorCode)), + Message: lsErr.Error(), + }) + } else { + m.UserLogin.BridgeState.Send(status.BridgeState{ + StateEvent: status.StateUnknownError, + Error: MetaConnectError, + }) + } + return nil } return m.connectWithTable(ctx, initialTable, currentUser) } @@ -91,9 +145,35 @@ func (m *MetaClient) connectWithTable(ctx context.Context, initialTable *table.L return err } + go m.periodicReconnect() + return nil } +func (m *MetaClient) periodicReconnect() { + if m.Main.Config.ForceRefreshIntervalSeconds <= 0 { + return + } + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + if oldCancel := m.stopPeriodicReconnect.Swap(&cancel); oldCancel != nil { + (*oldCancel)() + } + interval := time.Duration(m.Main.Config.ForceRefreshIntervalSeconds) * time.Second + timer := time.NewTimer(interval) + defer timer.Stop() + m.UserLogin.Log.Info().Stringer("interval", interval).Msg("Starting periodic reconnect loop") + for { + select { + case <-timer.C: + m.UserLogin.Log.Info().Msg("Doing periodic reconnect") + m.FullReconnect() + case <-ctx.Done(): + return + } + } +} + func (m *MetaClient) connectE2EE() error { m.e2eeConnectLock.Lock() defer m.e2eeConnectLock.Unlock() @@ -155,9 +235,12 @@ func (m *MetaClient) Disconnect() { } m.metaState = status.BridgeState{} m.waState = status.BridgeState{} - if stopTableLoop := m.stopHandlingTables.Load(); stopTableLoop != nil { + if stopTableLoop := m.stopHandlingTables.Swap(nil); stopTableLoop != nil { (*stopTableLoop)() } + if stopPeriodicReconnect := m.stopPeriodicReconnect.Swap(nil); stopPeriodicReconnect != nil { + (*stopPeriodicReconnect)() + } } var metaCaps = &bridgev2.NetworkRoomCapabilities{ @@ -177,8 +260,6 @@ func (m *MetaClient) GetCapabilities(ctx context.Context, portal *bridgev2.Porta return metaCaps } -var ErrServerRejectedMessage = bridgev2.WrapErrorInStatus(errors.New("server rejected message")).WithErrorAsMessage().WithSendNotice(true) - func (m *MetaClient) IsLoggedIn() bool { return m.Client != nil } @@ -188,16 +269,27 @@ func (m *MetaClient) IsThisUser(ctx context.Context, userID networkid.UserID) bo } func (m *MetaClient) LogoutRemote(ctx context.Context) { - panic("unimplemented") + m.Disconnect() + err := m.WADevice.Delete() + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to delete device from store") + } + m.resetWADevice() } func (m *MetaClient) canReconnect() bool { - //return time.Since(user.lastFullReconnect) > time.Duration(user.bridge.Config.Meta.MinFullReconnectIntervalSeconds)*time.Second - return false + return time.Since(m.lastFullReconnect) > time.Duration(m.Main.Config.MinFullReconnectIntervalSeconds)*time.Second } func (m *MetaClient) FullReconnect() { - panic("unimplemented") + ctx := m.UserLogin.Log.WithContext(context.TODO()) + m.connectWaiter.Clear() + m.e2eeConnectWaiter.Clear() + m.Disconnect() + err := m.Connect(ctx) + if err != nil { + zerolog.Ctx(ctx).Err(err).Msg("Failed to reconnect") + } } func (m *MetaClient) resetWADevice() { diff --git a/pkg/connector/config.go b/pkg/connector/config.go index 8891a35..0aba662 100644 --- a/pkg/connector/config.go +++ b/pkg/connector/config.go @@ -25,6 +25,9 @@ type Config struct { DisableXMABackfill bool `yaml:"disable_xma_backfill"` DisableXMAAlways bool `yaml:"disable_xma_always"` + MinFullReconnectIntervalSeconds int `yaml:"min_full_reconnect_interval_seconds"` + ForceRefreshIntervalSeconds int `yaml:"force_refresh_interval_seconds"` + DisplaynameTemplate string `yaml:"displayname_template"` displaynameTemplate *template.Template `yaml:"-"` } diff --git a/pkg/connector/handlematrix.go b/pkg/connector/handlematrix.go index fcbc7aa..7362e92 100644 --- a/pkg/connector/handlematrix.go +++ b/pkg/connector/handlematrix.go @@ -30,6 +30,13 @@ var ( _ bridgev2.ReadReceiptHandlingNetworkAPI = (*MetaClient)(nil) ) +var ( + ErrServerRejectedMessage = bridgev2.WrapErrorInStatus(errors.New("server rejected message")).WithErrorAsMessage().WithSendNotice(true) + ErrNotConnected = bridgev2.WrapErrorInStatus(errors.New("not connected")).WithErrorAsMessage().WithSendNotice(true) +) + +const ConnectWaitTimeout = 1 * time.Minute + func (m *MetaClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.MatrixMessage) (*bridgev2.MatrixMessageResponse, error) { log := zerolog.Ctx(ctx) @@ -37,6 +44,10 @@ func (m *MetaClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.Matr switch portalMeta.ThreadType { case table.ENCRYPTED_OVER_WA_ONE_TO_ONE, table.ENCRYPTED_OVER_WA_GROUP: + if !m.e2eeConnectWaiter.WaitTimeout(ConnectWaitTimeout) { + return nil, ErrNotConnected + } + waMsg, waMeta, err := m.Main.MsgConv.ToWhatsApp(ctx, msg.Event, msg.Content, msg.Portal, m.E2EEClient, msg.OrigSender != nil, msg.ReplyTo) if err != nil { return nil, fmt.Errorf("failed to convert message: %w", err) @@ -58,6 +69,10 @@ func (m *MetaClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.Matr }, }, nil default: + if !m.connectWaiter.WaitTimeout(ConnectWaitTimeout) { + return nil, ErrNotConnected + } + tasks, otid, err := m.Main.MsgConv.ToMeta(ctx, m.Client, msg.Event, msg.Content, msg.ReplyTo, msg.OrigSender != nil, msg.Portal) if errors.Is(err, types.ErrPleaseReloadPage) { // TODO handle properly @@ -180,6 +195,9 @@ func wrapRevoke(message *waConsumerApplication.ConsumerApplication_RevokeMessage func (m *MetaClient) HandleMatrixReaction(ctx context.Context, msg *bridgev2.MatrixReaction) (*database.Reaction, error) { switch messageID := metaid.ParseMessageID(msg.TargetMessage.ID).(type) { case metaid.ParsedFBMessageID: + if !m.connectWaiter.WaitTimeout(ConnectWaitTimeout) { + return nil, ErrNotConnected + } resp, err := m.Client.ExecuteTasks(&socket.SendReactionTask{ ThreadKey: metaid.ParseFBPortalID(msg.Portal.ID), TimestampMs: msg.Event.Timestamp, @@ -196,6 +214,9 @@ func (m *MetaClient) HandleMatrixReaction(ctx context.Context, msg *bridgev2.Mat zerolog.Ctx(ctx).Trace().Any("response", resp).Msg("Meta reaction response") return &database.Reaction{}, nil case metaid.ParsedWAMessageID: + if !m.e2eeConnectWaiter.WaitTimeout(ConnectWaitTimeout) { + return nil, ErrNotConnected + } consumerMsg := wrapReaction(&waConsumerApplication.ConsumerApplication_ReactionMessage{ Key: m.messageIDToWAKey(messageID), Text: ptr.Ptr(msg.PreHandleResp.Emoji), @@ -213,6 +234,9 @@ func (m *MetaClient) HandleMatrixReaction(ctx context.Context, msg *bridgev2.Mat func (m *MetaClient) HandleMatrixReactionRemove(ctx context.Context, msg *bridgev2.MatrixReactionRemove) error { switch messageID := metaid.ParseMessageID(msg.TargetReaction.MessageID).(type) { case metaid.ParsedFBMessageID: + if !m.connectWaiter.WaitTimeout(ConnectWaitTimeout) { + return ErrNotConnected + } resp, err := m.Client.ExecuteTasks(&socket.SendReactionTask{ ThreadKey: metaid.ParseFBPortalID(msg.Portal.ID), TimestampMs: msg.Event.Timestamp, @@ -228,6 +252,9 @@ func (m *MetaClient) HandleMatrixReactionRemove(ctx context.Context, msg *bridge zerolog.Ctx(ctx).Trace().Any("response", resp).Msg("Meta reaction remove response") return nil case metaid.ParsedWAMessageID: + if !m.e2eeConnectWaiter.WaitTimeout(ConnectWaitTimeout) { + return ErrNotConnected + } consumerMsg := wrapReaction(&waConsumerApplication.ConsumerApplication_ReactionMessage{ Key: m.messageIDToWAKey(messageID), Text: ptr.Ptr(""), @@ -246,6 +273,9 @@ func (m *MetaClient) HandleMatrixEdit(ctx context.Context, edit *bridgev2.Matrix log := zerolog.Ctx(ctx) switch messageID := metaid.ParseMessageID(edit.EditTarget.ID).(type) { case metaid.ParsedFBMessageID: + if !m.connectWaiter.WaitTimeout(ConnectWaitTimeout) { + return ErrNotConnected + } fakeSendTasks, _, err := m.Main.MsgConv.ToMeta(ctx, m.Client, edit.Event, edit.Content, nil, false, edit.Portal) if err != nil { return fmt.Errorf("failed to convert message: %w", err) @@ -287,6 +317,9 @@ func (m *MetaClient) HandleMatrixEdit(ctx context.Context, edit *bridgev2.Matrix return nil case metaid.ParsedWAMessageID: + if !m.e2eeConnectWaiter.WaitTimeout(ConnectWaitTimeout) { + return ErrNotConnected + } consumerMsg := wrapEdit(&waConsumerApplication.ConsumerApplication_EditMessage{ Key: m.messageIDToWAKey(messageID), Message: m.Main.MsgConv.TextToWhatsApp(edit.Content), @@ -305,11 +338,17 @@ func (m *MetaClient) HandleMatrixMessageRemove(ctx context.Context, msg *bridgev log := zerolog.Ctx(ctx) switch messageID := metaid.ParseMessageID(msg.TargetMessage.ID).(type) { case metaid.ParsedFBMessageID: + if !m.connectWaiter.WaitTimeout(ConnectWaitTimeout) { + return ErrNotConnected + } resp, err := m.Client.ExecuteTasks(&socket.DeleteMessageTask{MessageId: messageID.ID}) // TODO does the response data need to be checked? log.Trace().Any("response", resp).Msg("Meta delete response") return err case metaid.ParsedWAMessageID: + if !m.e2eeConnectWaiter.WaitTimeout(ConnectWaitTimeout) { + return ErrNotConnected + } consumerMsg := wrapRevoke(&waConsumerApplication.ConsumerApplication_RevokeMessage{ Key: m.messageIDToWAKey(messageID), }) diff --git a/pkg/connector/handlemeta.go b/pkg/connector/handlemeta.go index 1af845f..b62e299 100644 --- a/pkg/connector/handlemeta.go +++ b/pkg/connector/handlemeta.go @@ -68,6 +68,7 @@ func (m *MetaClient) handleMetaEvent(rawEvt any) { } case *messagix.Event_Ready: log.Debug().Msg("Initial connect to Meta socket completed") + m.connectWaiter.Set() if tbl := m.initialTable.Swap(nil); tbl != nil { log.Debug().Msg("Sending cached initial table to handler") m.incomingTables <- tbl @@ -91,6 +92,7 @@ func (m *MetaClient) handleMetaEvent(rawEvt any) { m.UserLogin.BridgeState.Send(m.metaState) case *messagix.Event_Reconnected: log.Debug().Msg("Reconnected to Meta socket") + m.connectWaiter.Set() m.metaState = status.BridgeState{StateEvent: status.StateConnected} m.UserLogin.BridgeState.Send(m.metaState) case *messagix.Event_PermanentError: @@ -123,9 +125,9 @@ func (m *MetaClient) handleMetaEvent(rawEvt any) { } } m.UserLogin.BridgeState.Send(m.metaState) - //if user.forceRefreshTimer != nil { - // user.forceRefreshTimer.Stop() - //} + if stopPeriodicReconnect := m.stopPeriodicReconnect.Swap(nil); stopPeriodicReconnect != nil { + (*stopPeriodicReconnect)() + } default: log.Warn().Type("event_type", evt).Msg("Unrecognized event type from messagix") } diff --git a/pkg/connector/handlewhatsapp.go b/pkg/connector/handlewhatsapp.go index 973ad9d..2228353 100644 --- a/pkg/connector/handlewhatsapp.go +++ b/pkg/connector/handlewhatsapp.go @@ -97,6 +97,7 @@ func (m *MetaClient) e2eeEventHandler(rawEvt any) { }) case *events.Connected: log.Debug().Msg("Connected to WhatsApp socket") + m.connectWaiter.Set() m.waState = status.BridgeState{StateEvent: status.StateConnected} m.UserLogin.BridgeState.Send(m.waState) case *events.Disconnected: @@ -129,14 +130,22 @@ func (m *MetaClient) e2eeEventHandler(rawEvt any) { } case *events.ConnectFailure: if e.Reason == events.ConnectFailureNotFound { - if m.E2EEClient != nil { - m.E2EEClient.Disconnect() - m.WADevice.Delete() + if cli := m.E2EEClient; cli != nil { + cli.Disconnect() + err := m.WADevice.Delete() + if err != nil { + log.Err(err).Msg("Failed to delete WhatsApp device after 415 error") + } m.resetWADevice() m.E2EEClient = nil } log.Debug().Msg("Reconnecting e2ee client after WhatsApp 415 error") - go m.connectE2EE() + go func() { + err := m.connectE2EE() + if err != nil { + log.Err(err).Msg("Error connecting to e2ee after 415 error") + } + }() } }