Skip to content

Commit

Permalink
Add option to fetch more pages of inbox at login
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Jan 25, 2024
1 parent ba730e9 commit 9fadcd7
Show file tree
Hide file tree
Showing 7 changed files with 64 additions and 8 deletions.
1 change: 1 addition & 0 deletions config/bridge.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,7 @@ type BridgeConfig struct {

Backfill struct {
Enabled bool `yaml:"enabled"`
InboxFetchPages int `yaml:"inbox_fetch_pages"`
HistoryFetchPages int `yaml:"history_fetch_pages"`
CatchupFetchPages int `yaml:"catchup_fetch_pages"`
Queue struct {
Expand Down
1 change: 1 addition & 0 deletions config/upgrade.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,7 @@ func DoUpgrade(helper *up.Helper) {
helper.Copy(up.Map, "bridge", "login_shared_secret_map")
helper.Copy(up.Str, "bridge", "command_prefix")
helper.Copy(up.Bool, "bridge", "backfill", "enabled")
helper.Copy(up.Int, "bridge", "backfill", "inbox_fetch_pages")
helper.Copy(up.Int, "bridge", "backfill", "history_fetch_pages")
helper.Copy(up.Int, "bridge", "backfill", "catchup_fetch_pages")
helper.Copy(up.Int, "bridge", "backfill", "queue", "pages_at_once")
Expand Down
2 changes: 2 additions & 0 deletions database/upgrades/00-latest.sql
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ CREATE TABLE "user" (
meta_id BIGINT,
cookies jsonb,

inbox_fetched BOOLEAN NOT NULL,

management_room TEXT,
space_room TEXT,

Expand Down
4 changes: 4 additions & 0 deletions database/upgrades/03-backfill-queue.sql
Original file line number Diff line number Diff line change
Expand Up @@ -33,3 +33,7 @@ CREATE TABLE backfill_task (
CONSTRAINT backfill_task_portal_fkey FOREIGN KEY (portal_id, portal_receiver)
REFERENCES portal (thread_id, receiver) ON UPDATE CASCADE ON DELETE CASCADE
);

ALTER TABLE "user" ADD COLUMN inbox_fetched BOOLEAN NOT NULL DEFAULT false;
-- only: postgres
ALTER TABLE "user" ALTER COLUMN inbox_fetched DROP DEFAULT;
14 changes: 8 additions & 6 deletions database/user.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,11 +28,11 @@ import (
)

const (
getUserByMXIDQuery = `SELECT mxid, meta_id, cookies, management_room, space_room FROM "user" WHERE mxid=$1`
getUserByMetaIDQuery = `SELECT mxid, meta_id, cookies, management_room, space_room FROM "user" WHERE meta_id=$1`
getAllLoggedInUsersQuery = `SELECT mxid, meta_id, cookies, management_room, space_room FROM "user" WHERE cookies IS NOT NULL`
insertUserQuery = `INSERT INTO "user" (mxid, meta_id, cookies, management_room, space_room) VALUES ($1, $2, $3, $4, $5)`
updateUserQuery = `UPDATE "user" SET meta_id=$2, cookies=$3, management_room=$4, space_room=$5 WHERE mxid=$1`
getUserByMXIDQuery = `SELECT mxid, meta_id, cookies, inbox_fetched, management_room, space_room FROM "user" WHERE mxid=$1`
getUserByMetaIDQuery = `SELECT mxid, meta_id, cookies, inbox_fetched, management_room, space_room FROM "user" WHERE meta_id=$1`
getAllLoggedInUsersQuery = `SELECT mxid, meta_id, cookies, inbox_fetched, management_room, space_room FROM "user" WHERE cookies IS NOT NULL`
insertUserQuery = `INSERT INTO "user" (mxid, meta_id, cookies, inbox_fetched, management_room, space_room) VALUES ($1, $2, $3, $4, $5, $6)`
updateUserQuery = `UPDATE "user" SET meta_id=$2, cookies=$3, inbox_fetched=$4, management_room=$5, space_room=$6 WHERE mxid=$1`
)

type UserQuery struct {
Expand All @@ -45,6 +45,7 @@ type User struct {
MXID id.UserID
MetaID int64
Cookies cookies.Cookies
InboxFetched bool
ManagementRoom id.RoomID
SpaceRoom id.RoomID

Expand Down Expand Up @@ -73,7 +74,7 @@ func (uq *UserQuery) GetAllLoggedIn(ctx context.Context) ([]*User, error) {
}

func (u *User) sqlVariables() []any {
return []any{u.MXID, dbutil.NumPtr(u.MetaID), dbutil.JSON{Data: u.Cookies}, dbutil.StrPtr(u.ManagementRoom), dbutil.StrPtr(u.SpaceRoom)}
return []any{u.MXID, dbutil.NumPtr(u.MetaID), dbutil.JSON{Data: u.Cookies}, u.InboxFetched, dbutil.StrPtr(u.ManagementRoom), dbutil.StrPtr(u.SpaceRoom)}
}

func (u *User) Insert(ctx context.Context) error {
Expand All @@ -94,6 +95,7 @@ func (u *User) Scan(row dbutil.Scannable) (*User, error) {
&u.MXID,
&metaID,
&dbutil.JSON{Data: scannedCookies},
&u.InboxFetched,
&managementRoom,
&spaceRoom,
)
Expand Down
8 changes: 6 additions & 2 deletions example-config.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -150,8 +150,12 @@ bridge:
backfill:
# If disabled, old messages will never be bridged.
enabled: true
# By default, Meta only sends one old message. If this is set to a something else than 0, the bridge
# will delay handling the one automatically received message and request more messages to backfill.
# By default, Meta sends info about approximately 20 recent threads. If this is set to something else than 0,
# the bridge will request more threads on first login, until it reaches the specified number of pages
# or the end of the inbox.
inbox_fetch_pages: 0
# By default, Meta only sends one old message per thread. If this is set to a something else than 0,
# the bridge will delay handling the one automatically received message and request more messages to backfill.
# One page usually contains 20 messages. This can technically be set to -1 to fetch all messages,
# but that will block bridging messages until the entire backfill is completed.
history_fetch_pages: 0
Expand Down
42 changes: 42 additions & 0 deletions user.go
Original file line number Diff line number Diff line change
Expand Up @@ -175,6 +175,8 @@ type User struct {
spaceCreateLock sync.Mutex

stopBackfillTask atomic.Pointer[context.CancelFunc]

InboxPagesFetched int
}

var (
Expand Down Expand Up @@ -546,6 +548,46 @@ func (user *User) handleTable(tbl *table.LSTable) {
handlePortalEvents(user, tbl.LSDeleteThenInsertMessage)
handlePortalEvents(user, tbl.LSUpsertReaction)
handlePortalEvents(user, tbl.LSDeleteReaction)
user.requestMoreInbox(ctx, tbl.LSUpsertInboxThreadsRange)
}

func (user *User) requestMoreInbox(ctx context.Context, itrs []*table.LSUpsertInboxThreadsRange) {
maxInboxPages := user.bridge.Config.Bridge.Backfill.InboxFetchPages
if len(itrs) == 0 || user.InboxFetched || maxInboxPages == 0 {
return
}
log := zerolog.Ctx(ctx)
itr := itrs[0]
user.InboxPagesFetched++
reachedPageLimit := maxInboxPages > 0 && user.InboxPagesFetched > maxInboxPages
logEvt := log.Debug().
Int("fetched_pages", user.InboxPagesFetched).
Bool("has_more_before", itr.HasMoreBefore).
Bool("reached_page_limit", reachedPageLimit).
Int64("min_thread_key", itr.MinThreadKey).
Int64("min_last_activity_timestamp_ms", itr.MinLastActivityTimestampMs)
if !itr.HasMoreBefore || reachedPageLimit {
logEvt.Msg("Finished fetching threads")
user.InboxFetched = true
err := user.Update(ctx)
if err != nil {
log.Err(err).Msg("Failed to save user after marking inbox as fetched")
}
} else {
logEvt.Msg("Requesting more threads")
resp, err := user.Client.ExecuteTasks(&socket.FetchThreadsTask{
ReferenceThreadKey: itr.MinThreadKey,
ReferenceActivityTimestamp: itr.MinLastActivityTimestampMs,
Cursor: user.Client.SyncManager.GetCursor(1),
SyncGroup: 1,
})
log.Trace().Any("resp", resp).Msg("Fetch threads response data")
if err != nil {
log.Err(err).Msg("Failed to fetch more threads")
} else {
log.Debug().Msg("Sent more threads request")
}
}
}

type ThreadKeyable interface {
Expand Down

0 comments on commit 9fadcd7

Please sign in to comment.