Skip to content

Commit

Permalink
Set completed at for backfill tasks properly and fix other things
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Jan 25, 2024
1 parent 8456cb4 commit ba730e9
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 2 deletions.
9 changes: 8 additions & 1 deletion backfill.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,9 +61,11 @@ func (user *User) handleBackfillTask(ctx context.Context, task *database.Backfil
log.Debug().Any("task", task).Msg("Got backfill task")
portal := user.bridge.GetExistingPortalByThreadID(task.Key)
task.DispatchedAt = time.Now()
task.CompletedAt = time.Time{}
if !portal.MoreToBackfill {
log.Debug().Int64("portal_id", task.Key.ThreadID).Msg("Nothing more to backfill in portal")
task.Finished = true
task.CompletedAt = time.Now()
if err := task.Upsert(ctx); err != nil {
log.Err(err).Msg("Failed to save backfill task")
}
Expand Down Expand Up @@ -100,6 +102,7 @@ func (user *User) handleBackfillTask(ctx context.Context, task *database.Backfil
},
Source: user.MXID,
MaxPages: user.bridge.Config.Bridge.Backfill.Queue.PagesAtOnce,
Forward: false,
Task: task,
Done: doneCallback,
}
Expand All @@ -111,6 +114,7 @@ func (user *User) handleBackfillTask(ctx context.Context, task *database.Backfil
if !portal.MoreToBackfill {
task.Finished = true
}
task.CompletedAt = time.Now()
if err := task.Upsert(ctx); err != nil {
log.Err(err).Msg("Failed to save backfill task")
}
Expand Down Expand Up @@ -425,6 +429,9 @@ func (portal *Portal) handleMessageBatch(ctx context.Context, source *User, upse
portal.convertAndSendBackfill(ctx, source, upsert.Messages, upsert.MarkRead, forward)
}()
} else {
if doneCallback != nil {
defer doneCallback()
}
portal.convertAndSendBackfill(ctx, source, upsert.Messages, upsert.MarkRead, forward)
queueConfig := portal.bridge.Config.Bridge.Backfill.Queue
if lastMessage == nil && queueConfig.MaxPages != 0 && portal.bridge.SpecVersions.Supports(mautrix.BeeperFeatureBatchSending) {
Expand Down Expand Up @@ -462,7 +469,7 @@ func (portal *Portal) convertAndSendBackfill(ctx context.Context, source *User,
}
if len(converted.Parts) == 0 {
log.Warn().Str("message_id", msg.MessageId).Msg("Message was empty after conversion")
return
continue
}
var reactionsToSendSeparately []*table.LSUpsertReaction
sendReactionsInBatch := portal.bridge.SpecVersions.Supports(mautrix.BeeperFeatureBatchSending)
Expand Down
2 changes: 1 addition & 1 deletion database/backfilltask.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,7 @@ func (btq *BackfillTaskQuery) NewWithValues(portalKey PortalKey, userID id.UserI
}

func (btq *BackfillTaskQuery) GetNext(ctx context.Context, userID id.UserID) (*BackfillTask, error) {
return btq.QueryOne(ctx, getNextBackfillTask, userID, time.Now().UnixMilli(), time.Now().Add(-15*time.Minute).UnixMilli())
return btq.QueryOne(ctx, getNextBackfillTask, userID, time.Now().UnixMilli(), time.Now().Add(-1*time.Hour).UnixMilli())
}

func (task *BackfillTask) Scan(row dbutil.Scannable) (*BackfillTask, error) {
Expand Down

0 comments on commit ba730e9

Please sign in to comment.