diff --git a/config/bridge.go b/config/bridge.go index 5f8644d..b97e2d9 100644 --- a/config/bridge.go +++ b/config/bridge.go @@ -59,6 +59,7 @@ type BridgeConfig struct { Backfill struct { Enabled bool `yaml:"enabled"` + InboxFetchPages int `yaml:"inbox_fetch_pages"` HistoryFetchPages int `yaml:"history_fetch_pages"` CatchupFetchPages int `yaml:"catchup_fetch_pages"` Queue struct { diff --git a/config/upgrade.go b/config/upgrade.go index 11007fa..058f8cd 100644 --- a/config/upgrade.go +++ b/config/upgrade.go @@ -97,6 +97,7 @@ func DoUpgrade(helper *up.Helper) { helper.Copy(up.Map, "bridge", "login_shared_secret_map") helper.Copy(up.Str, "bridge", "command_prefix") helper.Copy(up.Bool, "bridge", "backfill", "enabled") + helper.Copy(up.Int, "bridge", "backfill", "inbox_fetch_pages") helper.Copy(up.Int, "bridge", "backfill", "history_fetch_pages") helper.Copy(up.Int, "bridge", "backfill", "catchup_fetch_pages") helper.Copy(up.Int, "bridge", "backfill", "queue", "pages_at_once") diff --git a/database/upgrades/00-latest.sql b/database/upgrades/00-latest.sql index bad4be2..4fa6057 100644 --- a/database/upgrades/00-latest.sql +++ b/database/upgrades/00-latest.sql @@ -45,6 +45,8 @@ CREATE TABLE "user" ( meta_id BIGINT, cookies jsonb, + inbox_fetched BOOLEAN NOT NULL, + management_room TEXT, space_room TEXT, diff --git a/database/upgrades/03-backfill-queue.sql b/database/upgrades/03-backfill-queue.sql index 26d4fb1..f053c03 100644 --- a/database/upgrades/03-backfill-queue.sql +++ b/database/upgrades/03-backfill-queue.sql @@ -33,3 +33,7 @@ CREATE TABLE backfill_task ( CONSTRAINT backfill_task_portal_fkey FOREIGN KEY (portal_id, portal_receiver) REFERENCES portal (thread_id, receiver) ON UPDATE CASCADE ON DELETE CASCADE ); + +ALTER TABLE "user" ADD COLUMN inbox_fetched BOOLEAN NOT NULL DEFAULT false; +-- only: postgres +ALTER TABLE "user" ALTER COLUMN inbox_fetched DROP DEFAULT; diff --git a/database/user.go b/database/user.go index 2dc970b..08bb88c 100644 --- a/database/user.go +++ b/database/user.go @@ -28,11 +28,11 @@ import ( ) const ( - getUserByMXIDQuery = `SELECT mxid, meta_id, cookies, management_room, space_room FROM "user" WHERE mxid=$1` - getUserByMetaIDQuery = `SELECT mxid, meta_id, cookies, management_room, space_room FROM "user" WHERE meta_id=$1` - getAllLoggedInUsersQuery = `SELECT mxid, meta_id, cookies, management_room, space_room FROM "user" WHERE cookies IS NOT NULL` - insertUserQuery = `INSERT INTO "user" (mxid, meta_id, cookies, management_room, space_room) VALUES ($1, $2, $3, $4, $5)` - updateUserQuery = `UPDATE "user" SET meta_id=$2, cookies=$3, management_room=$4, space_room=$5 WHERE mxid=$1` + getUserByMXIDQuery = `SELECT mxid, meta_id, cookies, inbox_fetched, management_room, space_room FROM "user" WHERE mxid=$1` + getUserByMetaIDQuery = `SELECT mxid, meta_id, cookies, inbox_fetched, management_room, space_room FROM "user" WHERE meta_id=$1` + getAllLoggedInUsersQuery = `SELECT mxid, meta_id, cookies, inbox_fetched, management_room, space_room FROM "user" WHERE cookies IS NOT NULL` + insertUserQuery = `INSERT INTO "user" (mxid, meta_id, cookies, inbox_fetched, management_room, space_room) VALUES ($1, $2, $3, $4, $5, $6)` + updateUserQuery = `UPDATE "user" SET meta_id=$2, cookies=$3, inbox_fetched=$4, management_room=$5, space_room=$6 WHERE mxid=$1` ) type UserQuery struct { @@ -45,6 +45,7 @@ type User struct { MXID id.UserID MetaID int64 Cookies cookies.Cookies + InboxFetched bool ManagementRoom id.RoomID SpaceRoom id.RoomID @@ -73,7 +74,7 @@ func (uq *UserQuery) GetAllLoggedIn(ctx context.Context) ([]*User, error) { } func (u *User) sqlVariables() []any { - return []any{u.MXID, dbutil.NumPtr(u.MetaID), dbutil.JSON{Data: u.Cookies}, dbutil.StrPtr(u.ManagementRoom), dbutil.StrPtr(u.SpaceRoom)} + return []any{u.MXID, dbutil.NumPtr(u.MetaID), dbutil.JSON{Data: u.Cookies}, u.InboxFetched, dbutil.StrPtr(u.ManagementRoom), dbutil.StrPtr(u.SpaceRoom)} } func (u *User) Insert(ctx context.Context) error { @@ -94,6 +95,7 @@ func (u *User) Scan(row dbutil.Scannable) (*User, error) { &u.MXID, &metaID, &dbutil.JSON{Data: scannedCookies}, + &u.InboxFetched, &managementRoom, &spaceRoom, ) diff --git a/example-config.yaml b/example-config.yaml index 6efb01d..1200d49 100644 --- a/example-config.yaml +++ b/example-config.yaml @@ -150,8 +150,12 @@ bridge: backfill: # If disabled, old messages will never be bridged. enabled: true - # By default, Meta only sends one old message. If this is set to a something else than 0, the bridge - # will delay handling the one automatically received message and request more messages to backfill. + # By default, Meta sends info about approximately 20 recent threads. If this is set to something else than 0, + # the bridge will request more threads on first login, until it reaches the specified number of pages + # or the end of the inbox. + inbox_fetch_pages: 0 + # By default, Meta only sends one old message per thread. If this is set to a something else than 0, + # the bridge will delay handling the one automatically received message and request more messages to backfill. # One page usually contains 20 messages. This can technically be set to -1 to fetch all messages, # but that will block bridging messages until the entire backfill is completed. history_fetch_pages: 0 diff --git a/user.go b/user.go index 1358604..5153edc 100644 --- a/user.go +++ b/user.go @@ -175,6 +175,8 @@ type User struct { spaceCreateLock sync.Mutex stopBackfillTask atomic.Pointer[context.CancelFunc] + + InboxPagesFetched int } var ( @@ -546,6 +548,46 @@ func (user *User) handleTable(tbl *table.LSTable) { handlePortalEvents(user, tbl.LSDeleteThenInsertMessage) handlePortalEvents(user, tbl.LSUpsertReaction) handlePortalEvents(user, tbl.LSDeleteReaction) + user.requestMoreInbox(ctx, tbl.LSUpsertInboxThreadsRange) +} + +func (user *User) requestMoreInbox(ctx context.Context, itrs []*table.LSUpsertInboxThreadsRange) { + maxInboxPages := user.bridge.Config.Bridge.Backfill.InboxFetchPages + if len(itrs) == 0 || user.InboxFetched || maxInboxPages == 0 { + return + } + log := zerolog.Ctx(ctx) + itr := itrs[0] + user.InboxPagesFetched++ + reachedPageLimit := maxInboxPages > 0 && user.InboxPagesFetched > maxInboxPages + logEvt := log.Debug(). + Int("fetched_pages", user.InboxPagesFetched). + Bool("has_more_before", itr.HasMoreBefore). + Bool("reached_page_limit", reachedPageLimit). + Int64("min_thread_key", itr.MinThreadKey). + Int64("min_last_activity_timestamp_ms", itr.MinLastActivityTimestampMs) + if !itr.HasMoreBefore || reachedPageLimit { + logEvt.Msg("Finished fetching threads") + user.InboxFetched = true + err := user.Update(ctx) + if err != nil { + log.Err(err).Msg("Failed to save user after marking inbox as fetched") + } + } else { + logEvt.Msg("Requesting more threads") + resp, err := user.Client.ExecuteTasks(&socket.FetchThreadsTask{ + ReferenceThreadKey: itr.MinThreadKey, + ReferenceActivityTimestamp: itr.MinLastActivityTimestampMs, + Cursor: user.Client.SyncManager.GetCursor(1), + SyncGroup: 1, + }) + log.Trace().Any("resp", resp).Msg("Fetch threads response data") + if err != nil { + log.Err(err).Msg("Failed to fetch more threads") + } else { + log.Debug().Msg("Sent more threads request") + } + } } type ThreadKeyable interface {