From ba730e933dda0294d625ead2559465b607a4b760 Mon Sep 17 00:00:00 2001 From: Tulir Asokan Date: Thu, 25 Jan 2024 14:56:54 +0200 Subject: [PATCH] Set completed at for backfill tasks properly and fix other things --- backfill.go | 9 ++++++++- database/backfilltask.go | 2 +- 2 files changed, 9 insertions(+), 2 deletions(-) diff --git a/backfill.go b/backfill.go index dbfdc97..3aceda5 100644 --- a/backfill.go +++ b/backfill.go @@ -61,9 +61,11 @@ func (user *User) handleBackfillTask(ctx context.Context, task *database.Backfil log.Debug().Any("task", task).Msg("Got backfill task") portal := user.bridge.GetExistingPortalByThreadID(task.Key) task.DispatchedAt = time.Now() + task.CompletedAt = time.Time{} if !portal.MoreToBackfill { log.Debug().Int64("portal_id", task.Key.ThreadID).Msg("Nothing more to backfill in portal") task.Finished = true + task.CompletedAt = time.Now() if err := task.Upsert(ctx); err != nil { log.Err(err).Msg("Failed to save backfill task") } @@ -100,6 +102,7 @@ func (user *User) handleBackfillTask(ctx context.Context, task *database.Backfil }, Source: user.MXID, MaxPages: user.bridge.Config.Bridge.Backfill.Queue.PagesAtOnce, + Forward: false, Task: task, Done: doneCallback, } @@ -111,6 +114,7 @@ func (user *User) handleBackfillTask(ctx context.Context, task *database.Backfil if !portal.MoreToBackfill { task.Finished = true } + task.CompletedAt = time.Now() if err := task.Upsert(ctx); err != nil { log.Err(err).Msg("Failed to save backfill task") } @@ -425,6 +429,9 @@ func (portal *Portal) handleMessageBatch(ctx context.Context, source *User, upse portal.convertAndSendBackfill(ctx, source, upsert.Messages, upsert.MarkRead, forward) }() } else { + if doneCallback != nil { + defer doneCallback() + } portal.convertAndSendBackfill(ctx, source, upsert.Messages, upsert.MarkRead, forward) queueConfig := portal.bridge.Config.Bridge.Backfill.Queue if lastMessage == nil && queueConfig.MaxPages != 0 && portal.bridge.SpecVersions.Supports(mautrix.BeeperFeatureBatchSending) { @@ -462,7 +469,7 @@ func (portal *Portal) convertAndSendBackfill(ctx context.Context, source *User, } if len(converted.Parts) == 0 { log.Warn().Str("message_id", msg.MessageId).Msg("Message was empty after conversion") - return + continue } var reactionsToSendSeparately []*table.LSUpsertReaction sendReactionsInBatch := portal.bridge.SpecVersions.Supports(mautrix.BeeperFeatureBatchSending) diff --git a/database/backfilltask.go b/database/backfilltask.go index f9f268d..f13579f 100644 --- a/database/backfilltask.go +++ b/database/backfilltask.go @@ -82,7 +82,7 @@ func (btq *BackfillTaskQuery) NewWithValues(portalKey PortalKey, userID id.UserI } func (btq *BackfillTaskQuery) GetNext(ctx context.Context, userID id.UserID) (*BackfillTask, error) { - return btq.QueryOne(ctx, getNextBackfillTask, userID, time.Now().UnixMilli(), time.Now().Add(-15*time.Minute).UnixMilli()) + return btq.QueryOne(ctx, getNextBackfillTask, userID, time.Now().UnixMilli(), time.Now().Add(-1*time.Hour).UnixMilli()) } func (task *BackfillTask) Scan(row dbutil.Scannable) (*BackfillTask, error) {