Skip to content

Commit

Permalink
client: add periodic reconnect and connection errors
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Aug 13, 2024
1 parent fea77e3 commit 97c08df
Show file tree
Hide file tree
Showing 7 changed files with 166 additions and 21 deletions.
4 changes: 2 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ require (
github.com/tidwall/gjson v1.17.3
github.com/zyedidia/clipboard v1.0.4
go.mau.fi/libsignal v0.1.1
go.mau.fi/util v0.6.1-0.20240811184504-b00aa5c5af3a
go.mau.fi/util v0.6.1-0.20240813094622-892c5e0ea9b1
go.mau.fi/whatsmeow v0.0.0-20240811142232-82a29759f1fc
golang.org/x/crypto v0.26.0
golang.org/x/exp v0.0.0-20240808152545-0cdaa3abc0fa
Expand All @@ -22,7 +22,7 @@ require (
google.golang.org/protobuf v1.34.2
gopkg.in/yaml.v3 v3.0.1
maunium.net/go/mauflag v1.0.0
maunium.net/go/mautrix v0.19.1-0.20240812160906-091a18d448de
maunium.net/go/mautrix v0.19.1-0.20240812193118-1e98cb6a2ec8
)

require (
Expand Down
8 changes: 4 additions & 4 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -57,8 +57,8 @@ github.com/zyedidia/clipboard v1.0.4 h1:r6GUQOyPtIaApRLeD56/U+2uJbXis6ANGbKWCljU
github.com/zyedidia/clipboard v1.0.4/go.mod h1:zykFnZUXX0ErxqvYLUFEq7QDJKId8rmh2FgD0/Y8cjA=
go.mau.fi/libsignal v0.1.1 h1:m/0PGBh4QKP/I1MQ44ti4C0fMbLMuHb95cmDw01FIpI=
go.mau.fi/libsignal v0.1.1/go.mod h1:QLs89F/OA3ThdSL2Wz2p+o+fi8uuQUz0e1BRa6ExdBw=
go.mau.fi/util v0.6.1-0.20240811184504-b00aa5c5af3a h1:A6AeueGxoDjSSf2X8Tz8X9nQ2S65uYWGVwlvTZa7Bjs=
go.mau.fi/util v0.6.1-0.20240811184504-b00aa5c5af3a/go.mod h1:ZRiX8FK4CsqVINI+3YK50nHnc+dKhfTZNf38zI31S/0=
go.mau.fi/util v0.6.1-0.20240813094622-892c5e0ea9b1 h1:cnTb0c782DRD375gfsZqttKCvLPykEX3LNswJ6DwADg=
go.mau.fi/util v0.6.1-0.20240813094622-892c5e0ea9b1/go.mod h1:ZRiX8FK4CsqVINI+3YK50nHnc+dKhfTZNf38zI31S/0=
go.mau.fi/whatsmeow v0.0.0-20240811142232-82a29759f1fc h1:LMKV0RwBZzTwUJ6BwXN3u0LVxNbhFbfuAfqT+nYKiIc=
go.mau.fi/whatsmeow v0.0.0-20240811142232-82a29759f1fc/go.mod h1:BhHKalSq0qNtSCuGIUIvoJyU5KbT4a7k8DQ5yw1Ssk4=
go.mau.fi/zeroconfig v0.1.3 h1:As9wYDKmktjmNZW5i1vn8zvJlmGKHeVxHVIBMXsm4kM=
Expand Down Expand Up @@ -89,5 +89,5 @@ gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA=
gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM=
maunium.net/go/mauflag v1.0.0 h1:YiaRc0tEI3toYtJMRIfjP+jklH45uDHtT80nUamyD4M=
maunium.net/go/mauflag v1.0.0/go.mod h1:nLivPOpTpHnpzEh8jEdSL9UqO9+/KBJFmNRlwKfkPeA=
maunium.net/go/mautrix v0.19.1-0.20240812160906-091a18d448de h1:eSzaQgyejWS4ok+l3DaXufqUXM1U/bA4LXfy1ei64CE=
maunium.net/go/mautrix v0.19.1-0.20240812160906-091a18d448de/go.mod h1:GtFIC8z1F1EmpwYIG2QOhhg7/TwxeTX82OBhegABr9s=
maunium.net/go/mautrix v0.19.1-0.20240812193118-1e98cb6a2ec8 h1:QAjuJRjpkVcql+Sauym+uUKDaq+P8SMvk62+TJjOr4c=
maunium.net/go/mautrix v0.19.1-0.20240812193118-1e98cb6a2ec8/go.mod h1:GtFIC8z1F1EmpwYIG2QOhhg7/TwxeTX82OBhegABr9s=
108 changes: 100 additions & 8 deletions pkg/connector/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"sync/atomic"
"time"

"github.com/rs/zerolog"
"go.mau.fi/util/exsync"
"go.mau.fi/whatsmeow"
"go.mau.fi/whatsmeow/store"
waTypes "go.mau.fi/whatsmeow/types"
Expand All @@ -34,6 +36,11 @@ type MetaClient struct {
backfillCollectors map[int64]*BackfillCollector
backfillLock sync.Mutex

stopPeriodicReconnect atomic.Pointer[context.CancelFunc]
lastFullReconnect time.Time
connectWaiter *exsync.Event
e2eeConnectWaiter *exsync.Event

E2EEClient *whatsmeow.Client
WADevice *store.Device
e2eeConnectLock sync.Mutex
Expand All @@ -53,6 +60,9 @@ func (m *MetaConnector) LoadUserLogin(ctx context.Context, login *bridgev2.UserL

incomingTables: make(chan *table.LSTable, 16),
backfillCollectors: make(map[int64]*BackfillCollector),

connectWaiter: exsync.NewEvent(),
e2eeConnectWaiter: exsync.NewEvent(),
}
c.Client.SetEventHandler(c.handleMetaEvent)
login.Client = c
Expand All @@ -64,7 +74,51 @@ var _ bridgev2.NetworkAPI = (*MetaClient)(nil)
func (m *MetaClient) Connect(ctx context.Context) error {
currentUser, initialTable, err := m.Client.LoadMessagesPage()
if err != nil {
return fmt.Errorf("failed to load messages page: %w", err)
if stopPeriodicReconnect := m.stopPeriodicReconnect.Swap(nil); stopPeriodicReconnect != nil {
(*stopPeriodicReconnect)()
}
if errors.Is(err, messagix.ErrTokenInvalidated) {
m.UserLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateBadCredentials,
Error: MetaCookieRemoved,
})
// TODO clear cookies?
} else if errors.Is(err, messagix.ErrChallengeRequired) {
m.UserLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateBadCredentials,
Error: IGChallengeRequired,
})
} else if errors.Is(err, messagix.ErrAccountSuspended) {
m.UserLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateBadCredentials,
Error: IGAccountSuspended,
})
} else if errors.Is(err, messagix.ErrConsentRequired) {
code := IGConsentRequired
if m.LoginMeta.Platform.IsMessenger() {
code = FBConsentRequired
}
m.UserLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateBadCredentials,
Error: code,
})
} else if lsErr := (&types.ErrorResponse{}); errors.As(err, &lsErr) {
stateEvt := status.StateUnknownError
if lsErr.ErrorCode == 1357053 {
stateEvt = status.StateBadCredentials
}
m.UserLogin.BridgeState.Send(status.BridgeState{
StateEvent: stateEvt,
Error: status.BridgeStateErrorCode(fmt.Sprintf("meta-lserror-%d", lsErr.ErrorCode)),
Message: lsErr.Error(),
})
} else {
m.UserLogin.BridgeState.Send(status.BridgeState{
StateEvent: status.StateUnknownError,
Error: MetaConnectError,
})
}
return nil
}
return m.connectWithTable(ctx, initialTable, currentUser)
}
Expand All @@ -91,9 +145,35 @@ func (m *MetaClient) connectWithTable(ctx context.Context, initialTable *table.L
return err
}

go m.periodicReconnect()

return nil
}

func (m *MetaClient) periodicReconnect() {
if m.Main.Config.ForceRefreshIntervalSeconds <= 0 {
return
}
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
if oldCancel := m.stopPeriodicReconnect.Swap(&cancel); oldCancel != nil {
(*oldCancel)()
}
interval := time.Duration(m.Main.Config.ForceRefreshIntervalSeconds) * time.Second
timer := time.NewTimer(interval)
defer timer.Stop()
m.UserLogin.Log.Info().Stringer("interval", interval).Msg("Starting periodic reconnect loop")
for {
select {
case <-timer.C:
m.UserLogin.Log.Info().Msg("Doing periodic reconnect")
m.FullReconnect()
case <-ctx.Done():
return
}
}
}

func (m *MetaClient) connectE2EE() error {
m.e2eeConnectLock.Lock()
defer m.e2eeConnectLock.Unlock()
Expand Down Expand Up @@ -155,9 +235,12 @@ func (m *MetaClient) Disconnect() {
}
m.metaState = status.BridgeState{}
m.waState = status.BridgeState{}
if stopTableLoop := m.stopHandlingTables.Load(); stopTableLoop != nil {
if stopTableLoop := m.stopHandlingTables.Swap(nil); stopTableLoop != nil {
(*stopTableLoop)()
}
if stopPeriodicReconnect := m.stopPeriodicReconnect.Swap(nil); stopPeriodicReconnect != nil {
(*stopPeriodicReconnect)()
}
}

var metaCaps = &bridgev2.NetworkRoomCapabilities{
Expand All @@ -177,8 +260,6 @@ func (m *MetaClient) GetCapabilities(ctx context.Context, portal *bridgev2.Porta
return metaCaps
}

var ErrServerRejectedMessage = bridgev2.WrapErrorInStatus(errors.New("server rejected message")).WithErrorAsMessage().WithSendNotice(true)

func (m *MetaClient) IsLoggedIn() bool {
return m.Client != nil
}
Expand All @@ -188,16 +269,27 @@ func (m *MetaClient) IsThisUser(ctx context.Context, userID networkid.UserID) bo
}

func (m *MetaClient) LogoutRemote(ctx context.Context) {
panic("unimplemented")
m.Disconnect()
err := m.WADevice.Delete()
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to delete device from store")
}
m.resetWADevice()
}

func (m *MetaClient) canReconnect() bool {
//return time.Since(user.lastFullReconnect) > time.Duration(user.bridge.Config.Meta.MinFullReconnectIntervalSeconds)*time.Second
return false
return time.Since(m.lastFullReconnect) > time.Duration(m.Main.Config.MinFullReconnectIntervalSeconds)*time.Second
}

func (m *MetaClient) FullReconnect() {
panic("unimplemented")
ctx := m.UserLogin.Log.WithContext(context.TODO())
m.connectWaiter.Clear()
m.e2eeConnectWaiter.Clear()
m.Disconnect()
err := m.Connect(ctx)
if err != nil {
zerolog.Ctx(ctx).Err(err).Msg("Failed to reconnect")
}
}

func (m *MetaClient) resetWADevice() {
Expand Down
3 changes: 3 additions & 0 deletions pkg/connector/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,9 @@ type Config struct {
DisableXMABackfill bool `yaml:"disable_xma_backfill"`
DisableXMAAlways bool `yaml:"disable_xma_always"`

MinFullReconnectIntervalSeconds int `yaml:"min_full_reconnect_interval_seconds"`
ForceRefreshIntervalSeconds int `yaml:"force_refresh_interval_seconds"`

DisplaynameTemplate string `yaml:"displayname_template"`
displaynameTemplate *template.Template `yaml:"-"`
}
Expand Down
39 changes: 39 additions & 0 deletions pkg/connector/handlematrix.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,13 +30,24 @@ var (
_ bridgev2.ReadReceiptHandlingNetworkAPI = (*MetaClient)(nil)
)

var (
ErrServerRejectedMessage = bridgev2.WrapErrorInStatus(errors.New("server rejected message")).WithErrorAsMessage().WithSendNotice(true)
ErrNotConnected = bridgev2.WrapErrorInStatus(errors.New("not connected")).WithErrorAsMessage().WithSendNotice(true)
)

const ConnectWaitTimeout = 1 * time.Minute

func (m *MetaClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.MatrixMessage) (*bridgev2.MatrixMessageResponse, error) {
log := zerolog.Ctx(ctx)

portalMeta := msg.Portal.Metadata.(*PortalMetadata)

switch portalMeta.ThreadType {
case table.ENCRYPTED_OVER_WA_ONE_TO_ONE, table.ENCRYPTED_OVER_WA_GROUP:
if !m.e2eeConnectWaiter.WaitTimeout(ConnectWaitTimeout) {
return nil, ErrNotConnected
}

waMsg, waMeta, err := m.Main.MsgConv.ToWhatsApp(ctx, msg.Event, msg.Content, msg.Portal, m.E2EEClient, msg.OrigSender != nil, msg.ReplyTo)
if err != nil {
return nil, fmt.Errorf("failed to convert message: %w", err)
Expand All @@ -58,6 +69,10 @@ func (m *MetaClient) HandleMatrixMessage(ctx context.Context, msg *bridgev2.Matr
},
}, nil
default:
if !m.connectWaiter.WaitTimeout(ConnectWaitTimeout) {
return nil, ErrNotConnected
}

tasks, otid, err := m.Main.MsgConv.ToMeta(ctx, m.Client, msg.Event, msg.Content, msg.ReplyTo, msg.OrigSender != nil, msg.Portal)
if errors.Is(err, types.ErrPleaseReloadPage) {
// TODO handle properly
Expand Down Expand Up @@ -180,6 +195,9 @@ func wrapRevoke(message *waConsumerApplication.ConsumerApplication_RevokeMessage
func (m *MetaClient) HandleMatrixReaction(ctx context.Context, msg *bridgev2.MatrixReaction) (*database.Reaction, error) {
switch messageID := metaid.ParseMessageID(msg.TargetMessage.ID).(type) {
case metaid.ParsedFBMessageID:
if !m.connectWaiter.WaitTimeout(ConnectWaitTimeout) {
return nil, ErrNotConnected
}
resp, err := m.Client.ExecuteTasks(&socket.SendReactionTask{
ThreadKey: metaid.ParseFBPortalID(msg.Portal.ID),
TimestampMs: msg.Event.Timestamp,
Expand All @@ -196,6 +214,9 @@ func (m *MetaClient) HandleMatrixReaction(ctx context.Context, msg *bridgev2.Mat
zerolog.Ctx(ctx).Trace().Any("response", resp).Msg("Meta reaction response")
return &database.Reaction{}, nil
case metaid.ParsedWAMessageID:
if !m.e2eeConnectWaiter.WaitTimeout(ConnectWaitTimeout) {
return nil, ErrNotConnected
}
consumerMsg := wrapReaction(&waConsumerApplication.ConsumerApplication_ReactionMessage{
Key: m.messageIDToWAKey(messageID),
Text: ptr.Ptr(msg.PreHandleResp.Emoji),
Expand All @@ -213,6 +234,9 @@ func (m *MetaClient) HandleMatrixReaction(ctx context.Context, msg *bridgev2.Mat
func (m *MetaClient) HandleMatrixReactionRemove(ctx context.Context, msg *bridgev2.MatrixReactionRemove) error {
switch messageID := metaid.ParseMessageID(msg.TargetReaction.MessageID).(type) {
case metaid.ParsedFBMessageID:
if !m.connectWaiter.WaitTimeout(ConnectWaitTimeout) {
return ErrNotConnected
}
resp, err := m.Client.ExecuteTasks(&socket.SendReactionTask{
ThreadKey: metaid.ParseFBPortalID(msg.Portal.ID),
TimestampMs: msg.Event.Timestamp,
Expand All @@ -228,6 +252,9 @@ func (m *MetaClient) HandleMatrixReactionRemove(ctx context.Context, msg *bridge
zerolog.Ctx(ctx).Trace().Any("response", resp).Msg("Meta reaction remove response")
return nil
case metaid.ParsedWAMessageID:
if !m.e2eeConnectWaiter.WaitTimeout(ConnectWaitTimeout) {
return ErrNotConnected
}
consumerMsg := wrapReaction(&waConsumerApplication.ConsumerApplication_ReactionMessage{
Key: m.messageIDToWAKey(messageID),
Text: ptr.Ptr(""),
Expand All @@ -246,6 +273,9 @@ func (m *MetaClient) HandleMatrixEdit(ctx context.Context, edit *bridgev2.Matrix
log := zerolog.Ctx(ctx)
switch messageID := metaid.ParseMessageID(edit.EditTarget.ID).(type) {
case metaid.ParsedFBMessageID:
if !m.connectWaiter.WaitTimeout(ConnectWaitTimeout) {
return ErrNotConnected
}
fakeSendTasks, _, err := m.Main.MsgConv.ToMeta(ctx, m.Client, edit.Event, edit.Content, nil, false, edit.Portal)
if err != nil {
return fmt.Errorf("failed to convert message: %w", err)
Expand Down Expand Up @@ -287,6 +317,9 @@ func (m *MetaClient) HandleMatrixEdit(ctx context.Context, edit *bridgev2.Matrix

return nil
case metaid.ParsedWAMessageID:
if !m.e2eeConnectWaiter.WaitTimeout(ConnectWaitTimeout) {
return ErrNotConnected
}
consumerMsg := wrapEdit(&waConsumerApplication.ConsumerApplication_EditMessage{
Key: m.messageIDToWAKey(messageID),
Message: m.Main.MsgConv.TextToWhatsApp(edit.Content),
Expand All @@ -305,11 +338,17 @@ func (m *MetaClient) HandleMatrixMessageRemove(ctx context.Context, msg *bridgev
log := zerolog.Ctx(ctx)
switch messageID := metaid.ParseMessageID(msg.TargetMessage.ID).(type) {
case metaid.ParsedFBMessageID:
if !m.connectWaiter.WaitTimeout(ConnectWaitTimeout) {
return ErrNotConnected
}
resp, err := m.Client.ExecuteTasks(&socket.DeleteMessageTask{MessageId: messageID.ID})
// TODO does the response data need to be checked?
log.Trace().Any("response", resp).Msg("Meta delete response")
return err
case metaid.ParsedWAMessageID:
if !m.e2eeConnectWaiter.WaitTimeout(ConnectWaitTimeout) {
return ErrNotConnected
}
consumerMsg := wrapRevoke(&waConsumerApplication.ConsumerApplication_RevokeMessage{
Key: m.messageIDToWAKey(messageID),
})
Expand Down
8 changes: 5 additions & 3 deletions pkg/connector/handlemeta.go
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ func (m *MetaClient) handleMetaEvent(rawEvt any) {
}
case *messagix.Event_Ready:
log.Debug().Msg("Initial connect to Meta socket completed")
m.connectWaiter.Set()
if tbl := m.initialTable.Swap(nil); tbl != nil {
log.Debug().Msg("Sending cached initial table to handler")
m.incomingTables <- tbl
Expand All @@ -91,6 +92,7 @@ func (m *MetaClient) handleMetaEvent(rawEvt any) {
m.UserLogin.BridgeState.Send(m.metaState)
case *messagix.Event_Reconnected:
log.Debug().Msg("Reconnected to Meta socket")
m.connectWaiter.Set()
m.metaState = status.BridgeState{StateEvent: status.StateConnected}
m.UserLogin.BridgeState.Send(m.metaState)
case *messagix.Event_PermanentError:
Expand Down Expand Up @@ -123,9 +125,9 @@ func (m *MetaClient) handleMetaEvent(rawEvt any) {
}
}
m.UserLogin.BridgeState.Send(m.metaState)
//if user.forceRefreshTimer != nil {
// user.forceRefreshTimer.Stop()
//}
if stopPeriodicReconnect := m.stopPeriodicReconnect.Swap(nil); stopPeriodicReconnect != nil {
(*stopPeriodicReconnect)()
}
default:
log.Warn().Type("event_type", evt).Msg("Unrecognized event type from messagix")
}
Expand Down
17 changes: 13 additions & 4 deletions pkg/connector/handlewhatsapp.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func (m *MetaClient) e2eeEventHandler(rawEvt any) {
})
case *events.Connected:
log.Debug().Msg("Connected to WhatsApp socket")
m.connectWaiter.Set()
m.waState = status.BridgeState{StateEvent: status.StateConnected}
m.UserLogin.BridgeState.Send(m.waState)
case *events.Disconnected:
Expand Down Expand Up @@ -129,14 +130,22 @@ func (m *MetaClient) e2eeEventHandler(rawEvt any) {
}
case *events.ConnectFailure:
if e.Reason == events.ConnectFailureNotFound {
if m.E2EEClient != nil {
m.E2EEClient.Disconnect()
m.WADevice.Delete()
if cli := m.E2EEClient; cli != nil {
cli.Disconnect()
err := m.WADevice.Delete()
if err != nil {
log.Err(err).Msg("Failed to delete WhatsApp device after 415 error")
}
m.resetWADevice()
m.E2EEClient = nil
}
log.Debug().Msg("Reconnecting e2ee client after WhatsApp 415 error")
go m.connectE2EE()
go func() {
err := m.connectE2EE()
if err != nil {
log.Err(err).Msg("Error connecting to e2ee after 415 error")
}
}()
}
}

Expand Down

0 comments on commit 97c08df

Please sign in to comment.