diff --git a/user.go b/user.go index 1daa10d..ecdd7d9 100644 --- a/user.go +++ b/user.go @@ -23,6 +23,7 @@ import ( "fmt" "net/http" "net/url" + "runtime/debug" "strconv" "strings" "sync" @@ -159,9 +160,12 @@ func (br *MetaBridge) NewUser(dbUser *database.User) *User { log: br.ZLog.With().Stringer("user_id", dbUser.MXID).Logger(), PermissionLevel: br.Config.Bridge.Permissions.Get(dbUser.MXID), + + incomingTables: make(chan *table.LSTable, 8), } user.Admin = user.PermissionLevel >= bridgeconfig.PermissionLevelAdmin user.BridgeState = br.NewBridgeStateQueue(user) + go user.handleTablesLoop() return user } @@ -227,7 +231,8 @@ type User struct { InboxPagesFetched int - initialTable atomic.Pointer[table.LSTable] + initialTable atomic.Pointer[table.LSTable] + incomingTables chan *table.LSTable lastFullReconnect time.Time } @@ -630,7 +635,26 @@ func (br *MetaBridge) StartUsers() { } } +func (user *User) handleTablesLoop() { + for tbl := range user.incomingTables { + if tbl == nil { + user.log.Debug().Msg("Received nil table, stopping table handling") + return + } + user.handleTable(tbl) + } +} + func (user *User) handleTable(tbl *table.LSTable) { + defer func() { + if err := recover(); err != nil { + user.log.Error(). + Str("action", "handle table"). + Bytes(zerolog.ErrorStackFieldName, debug.Stack()). + Any(zerolog.ErrorFieldName, err). + Msg("Panic in Meta table handler") + } + }() log := user.log.With().Str("action", "handle table").Logger() ctx := log.WithContext(context.TODO()) for _, contact := range tbl.LSDeleteThenInsertContact { @@ -1043,14 +1067,21 @@ func (user *User) eventHandler(rawEvt any) { switch evt := rawEvt.(type) { case *messagix.Event_PublishResponse: user.log.Trace().Any("table", &evt.Table).Msg("Got new event") - user.handleTable(evt.Table) + select { + case user.incomingTables <- evt.Table: + default: + user.log.Warn().Msg("Incoming tables channel full, event order not guaranteed") + go func() { + user.incomingTables <- evt.Table + }() + } case *messagix.Event_Ready: user.log.Debug().Msg("Initial connect to Meta socket completed") user.metaState = status.BridgeState{StateEvent: status.StateConnected} user.BridgeState.Send(user.metaState) if initTable := user.initialTable.Swap(nil); initTable != nil { - user.log.Debug().Msg("Handling cached initial table") - go user.handleTable(initTable) + user.log.Debug().Msg("Sending cached initial table to handler") + user.incomingTables <- initTable } if user.bridge.Config.Meta.Mode.IsMessenger() || user.bridge.Config.Meta.IGE2EE { go func() {