From 39e46868fb89149b6405620db0cd03a634860f20 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Mon, 22 Jan 2024 15:02:33 +0200 Subject: [PATCH] Send create thread request when encountering unknown thread --- portal.go | 3 +++ user.go | 57 ++++++++++++++++++++++++++++++++++++++----------------- 2 files changed, 43 insertions(+), 17 deletions(-) diff --git a/portal.go b/portal.go index 223a6b8..109430d 100644 --- a/portal.go +++ b/portal.go @@ -23,6 +23,7 @@ import ( "reflect" "strconv" "sync" + "sync/atomic" "time" "github.com/rs/zerolog" @@ -198,6 +199,8 @@ type Portal struct { pendingMessages map[int64]id.EventID pendingMessagesLock sync.Mutex + fetchAttempted atomic.Bool + relayUser *User } diff --git a/user.go b/user.go index 6c5d274..f0ead2d 100644 --- a/user.go +++ b/user.go @@ -37,6 +37,7 @@ import ( "go.mau.fi/mautrix-meta/database" "go.mau.fi/mautrix-meta/messagix" "go.mau.fi/mautrix-meta/messagix/cookies" + "go.mau.fi/mautrix-meta/messagix/socket" "go.mau.fi/mautrix-meta/messagix/table" "go.mau.fi/mautrix-meta/messagix/types" ) @@ -449,12 +450,12 @@ func (br *MetaBridge) StartUsers() { } } -func (user *User) handleTable(table *table.LSTable) { +func (user *User) handleTable(tbl *table.LSTable) { ctx := user.log.With().Str("action", "handle table").Logger().WithContext(context.TODO()) - for _, contact := range table.LSVerifyContactRowExists { + for _, contact := range tbl.LSVerifyContactRowExists { user.bridge.GetPuppetByID(contact.ContactId).UpdateInfo(ctx, contact) } - for _, thread := range table.LSDeleteThenInsertThread { + for _, thread := range tbl.LSDeleteThenInsertThread { portal := user.GetPortalByThreadID(thread.ThreadKey, thread.ThreadType) portal.UpdateInfo(ctx, thread) if portal.MXID == "" { @@ -467,7 +468,7 @@ func (user *User) handleTable(table *table.LSTable) { go portal.addToPersonalSpace(portal.log.WithContext(context.TODO()), user) } } - for _, participant := range table.LSAddParticipantIdToGroupThread { + for _, participant := range tbl.LSAddParticipantIdToGroupThread { portal := user.GetExistingPortalByThreadID(participant.ThreadKey) if portal != nil && portal.MXID != "" && !portal.IsPrivateChat() { puppet := user.bridge.GetPuppetByID(participant.ContactId) @@ -480,7 +481,7 @@ func (user *User) handleTable(table *table.LSTable) { } } } - for _, participant := range table.LSRemoveParticipantFromThread { + for _, participant := range tbl.LSRemoveParticipantFromThread { portal := user.GetExistingPortalByThreadID(participant.ThreadKey) if portal != nil && portal.MXID != "" { puppet := user.bridge.GetPuppetByID(participant.ParticipantId) @@ -493,27 +494,49 @@ func (user *User) handleTable(table *table.LSTable) { } } } - for _, thread := range table.LSVerifyThreadExists { + for _, thread := range tbl.LSVerifyThreadExists { portal := user.GetPortalByThreadID(thread.ThreadKey, thread.ThreadType) // TODO if there's some way to fetch thread info, the portal could be created here if portal.MXID != "" { portal.ensureUserInvited(ctx, user) go portal.addToPersonalSpace(ctx, user) + } else if !portal.fetchAttempted.Swap(true) { + user.log.Debug().Int64("thread_id", thread.ThreadKey).Msg("Sending create thread request for unknown thread in verifyThreadExists") + go func(thread *table.LSVerifyThreadExists) { + resp, err := user.Client.ExecuteTasks([]socket.Task{ + &socket.CreateThreadTask{ + ThreadFBID: thread.ThreadKey, + ForceUpsert: 0, + UseOpenMessengerTransport: 0, + SyncGroup: 1, + MetadataOnly: 0, + PreviewOnly: 0, + }, + }) + if err != nil { + user.log.Err(err).Int64("thread_id", thread.ThreadKey).Msg("Failed to execute create thread task for verifyThreadExists of unknown thread") + } else { + user.log.Debug().Int64("thread_id", thread.ThreadKey).Msg("Sent create thread request for unknown thread in verifyThreadExists") + user.log.Trace().Any("resp_data", resp).Int64("thread_id", thread.ThreadKey).Msg("Create thread response") + } + }(thread) + } else { + user.log.Warn().Int64("thread_id", thread.ThreadKey).Msg("Portal doesn't exist in verifyThreadExists, but fetch was already attempted") } } - handlePortalEvents(user, table.WrapMessages()) - for _, msg := range table.LSEditMessage { + handlePortalEvents(user, tbl.WrapMessages()) + for _, msg := range tbl.LSEditMessage { user.handleEditEvent(ctx, msg) } - handlePortalEvents(user, table.LSSyncUpdateThreadName) - handlePortalEvents(user, table.LSSetThreadImageURL) - handlePortalEvents(user, table.LSUpdateReadReceipt) - handlePortalEvents(user, table.LSMarkThreadRead) - handlePortalEvents(user, table.LSUpdateTypingIndicator) - handlePortalEvents(user, table.LSDeleteMessage) - handlePortalEvents(user, table.LSDeleteThenInsertMessage) - handlePortalEvents(user, table.LSUpsertReaction) - handlePortalEvents(user, table.LSDeleteReaction) + handlePortalEvents(user, tbl.LSSyncUpdateThreadName) + handlePortalEvents(user, tbl.LSSetThreadImageURL) + handlePortalEvents(user, tbl.LSUpdateReadReceipt) + handlePortalEvents(user, tbl.LSMarkThreadRead) + handlePortalEvents(user, tbl.LSUpdateTypingIndicator) + handlePortalEvents(user, tbl.LSDeleteMessage) + handlePortalEvents(user, tbl.LSDeleteThenInsertMessage) + handlePortalEvents(user, tbl.LSUpsertReaction) + handlePortalEvents(user, tbl.LSDeleteReaction) } type ThreadKeyable interface {