Skip to content

Commit

Permalink
Add queue for incoming tables
Browse files Browse the repository at this point in the history
  • Loading branch information
tulir committed Mar 4, 2024
1 parent d219495 commit b7230c1
Showing 1 changed file with 35 additions and 4 deletions.
39 changes: 35 additions & 4 deletions user.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import (
"fmt"
"net/http"
"net/url"
"runtime/debug"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -159,9 +160,12 @@ func (br *MetaBridge) NewUser(dbUser *database.User) *User {
log: br.ZLog.With().Stringer("user_id", dbUser.MXID).Logger(),

PermissionLevel: br.Config.Bridge.Permissions.Get(dbUser.MXID),

incomingTables: make(chan *table.LSTable, 8),
}
user.Admin = user.PermissionLevel >= bridgeconfig.PermissionLevelAdmin
user.BridgeState = br.NewBridgeStateQueue(user)
go user.handleTablesLoop()
return user
}

Expand Down Expand Up @@ -227,7 +231,8 @@ type User struct {

InboxPagesFetched int

initialTable atomic.Pointer[table.LSTable]
initialTable atomic.Pointer[table.LSTable]
incomingTables chan *table.LSTable

lastFullReconnect time.Time
}
Expand Down Expand Up @@ -630,7 +635,26 @@ func (br *MetaBridge) StartUsers() {
}
}

func (user *User) handleTablesLoop() {
for tbl := range user.incomingTables {
if tbl == nil {
user.log.Debug().Msg("Received nil table, stopping table handling")
return
}
user.handleTable(tbl)
}
}

func (user *User) handleTable(tbl *table.LSTable) {
defer func() {
if err := recover(); err != nil {
user.log.Error().
Str("action", "handle table").
Bytes(zerolog.ErrorStackFieldName, debug.Stack()).
Any(zerolog.ErrorFieldName, err).
Msg("Panic in Meta table handler")
}
}()
log := user.log.With().Str("action", "handle table").Logger()
ctx := log.WithContext(context.TODO())
for _, contact := range tbl.LSDeleteThenInsertContact {
Expand Down Expand Up @@ -1043,14 +1067,21 @@ func (user *User) eventHandler(rawEvt any) {
switch evt := rawEvt.(type) {
case *messagix.Event_PublishResponse:
user.log.Trace().Any("table", &evt.Table).Msg("Got new event")
user.handleTable(evt.Table)
select {
case user.incomingTables <- evt.Table:
default:
user.log.Warn().Msg("Incoming tables channel full, event order not guaranteed")
go func() {
user.incomingTables <- evt.Table
}()
}
case *messagix.Event_Ready:
user.log.Debug().Msg("Initial connect to Meta socket completed")
user.metaState = status.BridgeState{StateEvent: status.StateConnected}
user.BridgeState.Send(user.metaState)
if initTable := user.initialTable.Swap(nil); initTable != nil {
user.log.Debug().Msg("Handling cached initial table")
go user.handleTable(initTable)
user.log.Debug().Msg("Sending cached initial table to handler")
user.incomingTables <- initTable
}
if user.bridge.Config.Meta.Mode.IsMessenger() || user.bridge.Config.Meta.IGE2EE {
go func() {
Expand Down

0 comments on commit b7230c1

Please sign in to comment.