From b7dbd7a55c1d3fb17b6b310bb46a075dd70e5e7c Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 12 Feb 2025 11:32:12 +0200 Subject: [PATCH] more progress Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vreplication/vplayer_parallel_producer.go | 126 +++++++++--- .../vreplication/vplayer_parallel_worker.go | 187 +++++++++++++----- 2 files changed, 235 insertions(+), 78 deletions(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go index e5e0dcd5c3c..689491b3b1c 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go @@ -34,22 +34,22 @@ import ( ) const ( - countWorkers = 1 + countWorkers = 4 ) type parallelProducer struct { vp *vplayer - workers []*parallelWorker - lastWorker int + workers []*parallelWorker dbClient *vdbClient posReached atomic.Bool workerErrors chan error sequenceToWorkersMap map[int64]int // sequence number => worker index - completedSequenceNumbers chan []int64 + completedSequenceNumbers chan map[int64]bool commitWorkerEventSequence atomic.Int64 + assignSequence int64 numCommits atomic.Int64 // temporary. TODO: remove currentConcurrency atomic.Int64 // temporary. TODO: remove @@ -59,10 +59,19 @@ type parallelProducer struct { func newParallelProducer(ctx context.Context, dbClientGen dbClientGenerator, vp *vplayer) (*parallelProducer, error) { p := ¶llelProducer{ vp: vp, + dbClient: vp.vr.dbClient, workers: make([]*parallelWorker, countWorkers), workerErrors: make(chan error, countWorkers), - dbClient: vp.vr.dbClient, - completedSequenceNumbers: make(chan []int64, countWorkers), + sequenceToWorkersMap: make(map[int64]int), + completedSequenceNumbers: make(chan map[int64]bool, countWorkers), + } + { + // TODO(shlomi): just use the dbClient from vp.vr. 
+		dbClient, err := dbClientGen()
+		if err != nil {
+			return nil, err
+		}
+		p.dbClient = newVDBClient(dbClient, vp.vr.stats, 0)
 	}
 	for i := range p.workers {
 		w := newParallelWorker(i, p, vp.vr.workflowConfig.RelayLogMaxItems)
@@ -101,16 +110,34 @@ func (p *parallelProducer) commitWorkerEvent() *binlogdatapb.VEvent {
 }
 
 func (p *parallelProducer) assignTransactionToWorker(sequenceNumber int64, lastCommitted int64) (workerIndex int) {
+	if workerIndex, ok := p.sequenceToWorkersMap[sequenceNumber]; ok {
+		// Pin for the duration of the transaction
+		// log.Errorf("========== QQQ assignTransactionToWorker same trx sequenceNumber=%v, lastCommitted=%v, workerIndex=%v", sequenceNumber, lastCommitted, workerIndex)
+		return workerIndex
+	}
 	if workerIndex, ok := p.sequenceToWorkersMap[lastCommitted]; ok {
 		// Assign transaction to the same worker who owns the last committed transaction
+		// log.Errorf("========== QQQ assignTransactionToWorker dependent trx sequenceNumber=%v, lastCommitted=%v, workerIndex=%v", sequenceNumber, lastCommitted, workerIndex)
 		p.sequenceToWorkersMap[sequenceNumber] = workerIndex
 		return workerIndex
 	}
-	p.lastWorker = (p.lastWorker + 1) % countWorkers
-	return p.lastWorker
+	// workerIndex = int((p.assignSequence / 10) % countWorkers)
+	workerIndex = int(p.assignSequence % countWorkers)
+	// log.Errorf("========== QQQ assignTransactionToWorker free trx p.sequence=%v, sequenceNumber=%v, lastCommitted=%v, workerIndex=%v", p.assignSequence, sequenceNumber, lastCommitted, workerIndex)
+	p.assignSequence++
+	p.sequenceToWorkersMap[sequenceNumber] = workerIndex
+	return workerIndex
 }
 
 func (p *parallelProducer) commitAll(ctx context.Context, except *parallelWorker) error {
+	// TODO(shlomi) remove
+	{
+		exceptString := ""
+		if except != nil {
+			exceptString = fmt.Sprintf(" except %v", except.index)
+		}
+		log.Errorf("========== QQQ commitAll%v", exceptString)
+	}
 	var eg errgroup.Group
 	for _, w := range p.workers {
 		w := w
@@ -123,41 +150,87 @@ func (p *parallelProducer) commitAll(ctx context.Context, except *parallelWorker
 	}
 	return eg.Wait()
 }
 
+func (p *parallelProducer) aggregateWorkersPos(ctx context.Context) (aggregatedWorkersPos replication.Position, combinedPos replication.Position, err error) {
+	// TODO(shlomi): this query can be computed once in the lifetime of the producer
+	query := binlogplayer.ReadVReplicationWorkersGTIDs(p.vp.vr.id)
+	qr, err := p.dbClient.ExecuteFetch(query, -1)
+	if err != nil {
+		log.Errorf("Error fetching vreplication worker positions: %v. isclosed? %v", err, p.dbClient.IsClosed())
+		return aggregatedWorkersPos, combinedPos, err
+	}
+	var lastEventTimestamp int64
+	for _, row := range qr.Rows {
+		current, err := binlogplayer.DecodeMySQL56Position(row[0].ToString())
+		if err != nil {
+			return aggregatedWorkersPos, combinedPos, err
+		}
+		eventTimestamp, err := row[1].ToInt64()
+		if err != nil {
+			return aggregatedWorkersPos, combinedPos, err
+		}
+		lastEventTimestamp = max(lastEventTimestamp, eventTimestamp)
+		aggregatedWorkersPos = replication.AppendGTIDSet(aggregatedWorkersPos, current.GTIDSet)
+	}
+	combinedPos = replication.AppendGTIDSet(aggregatedWorkersPos, p.vp.startPos.GTIDSet)
+	p.vp.pos = combinedPos // TODO(shlomi) potential for race condition
 
-func (p *parallelProducer) watchPos(ctx context.Context) error {
-	if p.vp.stopPos.IsZero() {
-		return nil
+	log.Errorf("========== QQQ aggregateWorkersPos updatePos ts=%v, pos=%v", lastEventTimestamp, combinedPos)
+	if _, err := p.vp.updatePos(ctx, lastEventTimestamp); err != nil {
+		return aggregatedWorkersPos, combinedPos, err
 	}
-	query := binlogplayer.ReadVReplicationWorkersGTIDs(p.vp.vr.id)
+	if err := p.vp.commit(); err != nil {
+		return aggregatedWorkersPos, combinedPos, err
+	}
+	return aggregatedWorkersPos, combinedPos, nil
+}
 
+func (p *parallelProducer) watchPos(ctx context.Context) error {
+	// if p.vp.stopPos.IsZero() {
+	// 	return nil
+	// }
 	ticker := time.NewTicker(1 * time.Second)
 	defer ticker.Stop()
+	var lastCombinedPos replication.Position
 	for {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
 		case <-ticker.C:
-			qr, err := p.dbClient.ExecuteFetch(query, -1)
+			log.Errorf("========== QQQ watchPos ticker")
+			aggregatedWorkersPos, combinedPos, err := p.aggregateWorkersPos(ctx)
 			if err != nil {
-				log.Errorf("Error fetching vreplication worker positions: %v", err)
+				log.Errorf("Error aggregating vreplication worker positions: %v. isclosed? %v", err, p.dbClient.IsClosed())
+				continue
 			}
-			combinedPos := p.vp.startPos
-			for _, row := range qr.Rows {
-				current, err := binlogplayer.DecodeMySQL56Position(row[0].ToString())
-				if err != nil {
+			log.Errorf("========== QQQ watchPos aggregatedWorkersPos: %v, combinedPos: %v, stop: %v", aggregatedWorkersPos, combinedPos, p.vp.stopPos)
+
+			// Write back this combined pos to all workers, so that we condense their otherwise sparse GTID sets.
+			log.Errorf("========== QQQ watchPos pushing combined pos %v", combinedPos)
+			for _, w := range p.workers {
+				log.Errorf("========== QQQ watchPos pushing combined pos worker %v", w.index)
+				w.aggregatedPosChan <- aggregatedWorkersPos
+			}
+			log.Errorf("========== QQQ watchPos pushed combined pos")
+			if combinedPos.GTIDSet.Equal(lastCombinedPos.GTIDSet) {
+				// no progress has been made
+				log.Errorf("========== QQQ watchPos no progress!! committing all")
+				if err := p.commitAll(ctx, nil); err != nil {
 					return err
 				}
-				combinedPos = replication.AppendGTIDSet(combinedPos, current.GTIDSet)
+				log.Errorf("========== QQQ watchPos no progress!! committed all")
+			} else {
+				// progress has been made
+				lastCombinedPos = combinedPos
 			}
-
-			if combinedPos.AtLeast(p.vp.stopPos) {
+			if !p.vp.stopPos.IsZero() && combinedPos.AtLeast(p.vp.stopPos) {
 				if err := p.commitAll(ctx, nil); err != nil {
 					return err
 				}
 				p.posReached.Store(true)
 				return io.EOF
 			}
+			log.Errorf("========== QQQ watchPos end loop cycle")
 		}
 	}
 }
@@ -198,7 +271,8 @@ func (p *parallelProducer) process(ctx context.Context, events chan *binlogdatap
 		case <-ctx.Done():
 			return ctx.Err()
 		case sequenceNumbers := <-p.completedSequenceNumbers:
-			for _, sequenceNumber := range sequenceNumbers {
+			// log.Errorf("========== QQQ process completedSequenceNumbers=%v", sequenceNumbers)
+			for sequenceNumber := range sequenceNumbers {
 				delete(p.sequenceToWorkersMap, sequenceNumber)
 			}
 		// case t := <-ticker.C:
@@ -208,6 +282,7 @@ func (p *parallelProducer) process(ctx context.Context, events chan *binlogdatap
 			workerIndex := p.assignTransactionToWorker(event.SequenceNumber, event.CommitParent)
 			worker := p.workers[workerIndex]
 			// We know the worker has enough capacity and thus the following will not block.
+			// log.Errorf("========== QQQ process: assigning event.Type %v seq=%v, parent=%v, to worker.Index %v at index %v", event.Type, event.SequenceNumber, event.CommitParent, worker.index, workerIndex)
 			select {
 			case <-ctx.Done():
 				return ctx.Err()
@@ -218,8 +293,11 @@ func (p *parallelProducer) process(ctx context.Context, events chan *binlogdatap
 }
 
 func (p *parallelProducer) applyEvents(ctx context.Context, relay *relayLog) error {
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
+	// TODO(shlomi): do not cancel context, because if we do, that can terminate async queries still running.
+	// ctx, cancel := context.WithCancel(ctx)
+	// defer cancel()
+
+	log.Errorf("========== QQQ applyEvents defer")
 
 	go func() {
 		if err := p.watchPos(ctx); err != nil {
diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_worker.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_worker.go
index d54c96c788d..d4e593fbdc8 100644
--- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_worker.go
+++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_worker.go
@@ -18,6 +18,7 @@ import (
 	"errors"
 	"fmt"
 	"io"
+	"runtime/debug"
 	"strconv"
 	"strings"
 	"sync"
@@ -34,20 +35,22 @@ import (
 
 var (
 	errRetryEvent = errors.New("retry event")
+	maxEvents     = 50
 )
 
 type parallelWorker struct {
-	index     int
-	dbClient  *vdbClient
-	queryFunc func(ctx context.Context, sql string) (*sqltypes.Result, error)
-	vp        *vplayer
-	lastPos   replication.Position
+	index             int
+	dbClient          *vdbClient
+	queryFunc         func(ctx context.Context, sql string) (*sqltypes.Result, error)
+	vp                *vplayer
+	lastPos           replication.Position
+	aggregatedPosChan chan replication.Position
 
 	producer *parallelProducer
 	events   chan *binlogdatapb.VEvent
 	stats    *VrLogStats
 
-	sequenceNumbers []int64
+	sequenceNumbers map[int64]bool
 
 	commitSubscribers   map[int64]chan error // subscribing to commit events
 	commitSubscribersMu sync.Mutex
@@ -61,13 +64,15 @@ type parallelWorker struct {
 }
 
 func newParallelWorker(index int, producer *parallelProducer, capacity int) *parallelWorker {
-	log.Infof("======= QQQ newParallelWorker index: %v", index)
+	log.Errorf("======= QQQ newParallelWorker index: %v", index)
 	return &parallelWorker{
 		index:             index,
 		producer:          producer,
 		events:            make(chan *binlogdatapb.VEvent, capacity),
-		sequenceNumbers:   make([]int64, 0, capacity),
+		aggregatedPosChan: make(chan replication.Position, 1),
+		sequenceNumbers:   make(map[int64]bool, maxEvents),
 		commitSubscribers: make(map[int64]chan error),
+		vp:                producer.vp,
 	}
 }
@@ -81,10 +86,23 @@ func (w *parallelWorker) subscribeCommitWorkerEvent(sequenceNumber int64) chan e
 }
 
 // updatePos should get called at a minimum of vreplicationMinimumHeartbeatUpdateInterval.
-func (w *parallelWorker) updatePos(ctx context.Context, pos replication.Position) (posReached bool, err error) {
-	update := binlogplayer.GenerateUpdateWorkerPos(w.vp.vr.id, w.index, pos)
+func (w *parallelWorker) updatePos(ctx context.Context, pos replication.Position, transactionTimestamp int64) (posReached bool, err error) {
+	update := binlogplayer.GenerateUpdateWorkerPos(w.vp.vr.id, w.index, pos, transactionTimestamp)
 	if _, err := w.queryFunc(ctx, update); err != nil {
-		return false, fmt.Errorf("error %v updating position", err)
+		// TODO(remove) this is just debug info
+		{
+			query := binlogplayer.ReadVReplicationWorkersGTIDs(w.vp.vr.id)
+			qr, err := w.dbClient.ExecuteFetch(query, -1)
+			if err != nil {
+				log.Errorf("Error fetching vreplication worker positions: %v", err)
+			} else {
+				for _, row := range qr.Rows {
+					log.Errorf("====== QQQ updatePos gtid= %v", row[0].ToString())
+				}
+			}
+		}
+		// end TODO
+		return false, fmt.Errorf("error updating position: %v", err)
 	}
 	// TODO (shlomi): handle these
 	// vp.numAccumulatedHeartbeats = 0
@@ -100,7 +118,21 @@ func (w *parallelWorker) updatePosByEvent(ctx context.Context, event *binlogdata
 	if err != nil {
 		return err
 	}
-	if _, err := w.updatePos(ctx, pos); err != nil {
+	if _, err := w.updatePos(ctx, pos, event.Timestamp); err != nil {
+		debug.PrintStack() // TODO(shlomi) remove
+		// TODO(remove) this is just debug info
+		{
+			query := binlogplayer.ReadVReplicationWorkersGTIDs(w.vp.vr.id)
+			qr, err := w.dbClient.ExecuteFetch(query, -1)
+			if err != nil {
+				log.Errorf("Error fetching vreplication worker positions: %v", err)
+			} else {
+				for _, row := range qr.Rows {
+					log.Errorf("====== QQQ updatePos gtid= %v", row[0].ToString())
+				}
+			}
+		}
+		// end TODO
 		return err
 	}
 	w.lastPos = replication.AppendGTIDSet(w.lastPos, pos.GTIDSet)
@@ -117,53 +149,63 @@ func (w *parallelWorker) commit() error {
 
 func (w *parallelWorker) commitEvents() chan error {
 	event := w.producer.commitWorkerEvent()
+	log.Errorf("========== QQQ commitEvents: %v", event)
 	c := w.subscribeCommitWorkerEvent(event.SequenceNumber)
+	log.Errorf("========== QQQ commitEvents: subscribed to %v in worker %v", event.SequenceNumber, w.index)
 	w.events <- event
+	log.Errorf("========== QQQ commitEvents: pushed event")
 	return c
 }
 
-func (w *parallelWorker) applyEvent(ctx context.Context, event *binlogdatapb.VEvent) error {
-	select {
-	case w.events <- event:
-		return nil
-	case <-ctx.Done():
-		return ctx.Err()
-	}
-}
-
 func (w *parallelWorker) applyQueuedEvents(ctx context.Context) (err error) {
+	log.Errorf("========== QQQ applyQueuedEvents")
+
+	ticker := time.NewTicker(100 * time.Millisecond)
+	defer ticker.Stop()
+
+	var lastTickerTime time.Time
+	var lastAppliedTickerTime time.Time
 	for {
 		select {
 		case <-ctx.Done():
 			return ctx.Err()
+		case lastTickerTime = <-ticker.C:
+		case pos := <-w.aggregatedPosChan:
+			log.Errorf("========== QQQ applyQueuedEvents worker %v got aggregated pos %v", w.index, pos)
+			if _, err := w.updatePos(ctx, pos, 0); err != nil {
+				debug.PrintStack() // TODO(shlomi) remove
+				return err
+			}
+			log.Errorf("========== QQQ applyQueuedEvents worker %v updated aggregated pos", w.index)
 		case event := <-w.events:
+			// log.Errorf("========== QQQ applyQueuedEvents, event=%v", event)
 			if event.SequenceNumber >= 0 {
-				w.sequenceNumbers = append(w.sequenceNumbers, event.SequenceNumber)
-			}
-			if err := w.applyQueuedEvent(ctx, event); err != nil {
-				return err
+				// Negative values come from commitWorkerEvent(). These are not real events.
+				w.sequenceNumbers[event.SequenceNumber] = true
 			}
-			if event.Type == binlogdatapb.VEventType_UNKNOWN && event.SequenceNumber < 0 {
-				// This is a commit-hint event, produced by commitWorkerEvent()
-				err := w.commit()
-				func() {
-					w.commitSubscribersMu.Lock()
-					defer w.commitSubscribersMu.Unlock()
-					if subs, ok := w.commitSubscribers[event.SequenceNumber]; ok {
-						subs <- err
-						delete(w.commitSubscribers, event.SequenceNumber)
-					}
-				}()
-				if err != nil {
-					return err
+			skipApplyEvent := func() bool {
+				if event.Type != binlogdatapb.VEventType_COMMIT {
+					// We only skip commits, for performance reasons (aggregating more operations under the same commit).
+					return false
 				}
-
-				w.producer.completedSequenceNumbers <- w.sequenceNumbers
-				w.sequenceNumbers = w.sequenceNumbers[0:0]
-				if w.producer.posReached.Load() {
-					return io.EOF
+				if len(w.sequenceNumbers) >= maxEvents {
+					// Too many events in the queue; commit now.
+					return false
+				}
+				if lastTickerTime != lastAppliedTickerTime {
+					// Too much time has passed since the last applied event, meaning we're sitting idle. Better to commit now,
+					// utilize some IO, and make some progress.
+					return false
 				}
+				return true
 			}
+			if skipApplyEvent() {
+				continue
+			}
+			if err := w.applyQueuedEvent(ctx, event); err != nil {
+				return err
+			}
+			lastAppliedTickerTime = lastTickerTime
 		}
 	}
 }
@@ -189,12 +231,7 @@ func (w *parallelWorker) applyQueuedEvent(ctx context.Context, event *binlogdata
 }
 
 func (w *parallelWorker) applyApplicableQueuedEvent(ctx context.Context, event *binlogdatapb.VEvent) error {
-	switch event.Type {
-	case binlogdatapb.VEventType_UNKNOWN:
-		// An indication that there are no more events for this worker
-		return nil
-	}
-
+	// log.Errorf("========== QQQ applyApplicableQueuedEvent, event.Type=%v", event.Type)
 	ctx, cancel := context.WithCancel(ctx)
 	defer cancel()
@@ -222,11 +259,8 @@ func (w *parallelWorker) applyApplicableQueuedEvent(ctx context.Context, event *
 		return nil
 	case binlogdatapb.VEventType_BEGIN:
 		// No-op: begin is called as needed.
- case binlogdatapb.VEventType_COMMIT: - if err := <-w.commitEvents(); err != nil { - return err - } - w.producer.numCommits.Add(1) + case binlogdatapb.VEventType_COMMIT, binlogdatapb.VEventType_UNKNOWN: + return w.applyQueuedCommit(event) case binlogdatapb.VEventType_FIELD: if err := w.dbClient.Begin(); err != nil { return err @@ -485,6 +519,8 @@ func (w *parallelWorker) updateFKCheck(ctx context.Context, flags2 uint32) error } func (w *parallelWorker) applyQueuedRowEvent(ctx context.Context, vevent *binlogdatapb.VEvent) error { + log.Errorf("========== QQQ applyQueuedRowEvent worker %v", w.index) + defer log.Errorf("========== QQQ applyQueuedRowEvent worker %v DONE", w.index) if err := w.updateFKCheck(ctx, vevent.RowEvent.Flags); err != nil { return err } @@ -538,3 +574,46 @@ func (w *parallelWorker) applyQueuedRowEvent(ctx context.Context, vevent *binlog } return nil } + +func (w *parallelWorker) applyQueuedCommit(event *binlogdatapb.VEvent) error { + switch { + case event.Type == binlogdatapb.VEventType_COMMIT: + case event.Type == binlogdatapb.VEventType_UNKNOWN && event.SequenceNumber < 0: + default: + // Not a commit + return nil + } + shouldActuallyCommit := len(w.sequenceNumbers) > 0 + var err error + if shouldActuallyCommit { + err = w.commit() + } + // log.Errorf("========== QQQ applyQueuedEvents, event is commit or commitWorkerEvent(), seq=%v", event.SequenceNumber) + // log.Errorf("========== QQQ applyQueuedEvents committed, err=%v", err) + func() { + // log.Errorf("========== QQQ applyQueuedEvents finding subscribers") + w.commitSubscribersMu.Lock() + defer w.commitSubscribersMu.Unlock() + if subs, ok := w.commitSubscribers[event.SequenceNumber]; ok { + // log.Errorf("========== QQQ applyQueuedEvents found subscriber") + subs <- err + // log.Errorf("========== QQQ applyQueuedEvents notified subscriber") + delete(w.commitSubscribers, event.SequenceNumber) + } + }() + if err != nil { + return err + } + // Commit successful + if shouldActuallyCommit { + w.producer.numCommits.Add(1) + } + // log.Errorf("========== QQQ applyQueuedEvents pushing seqs") + w.producer.completedSequenceNumbers <- w.sequenceNumbers + // log.Errorf("========== QQQ applyQueuedEvents pushed seqs") + w.sequenceNumbers = make(map[int64]bool, maxEvents) + if w.producer.posReached.Load() { + return io.EOF + } + return nil +}
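
For review context, the scheduling rule in assignTransactionToWorker above is: (1) an already-seen sequence number stays pinned to its worker, (2) a transaction whose last-committed parent is still tracked follows that parent's worker, and (3) anything else is assigned round-robin via assignSequence. Below is a minimal standalone Go sketch of that policy; the assigner type and the main harness are illustrative stand-ins, not the actual parallelProducer wiring:

package main

import "fmt"

const countWorkers = 4

// assigner is an illustrative stand-in for parallelProducer's
// sequenceToWorkersMap and assignSequence fields.
type assigner struct {
	sequenceToWorkers map[int64]int // sequence number => worker index
	assignSequence    int64
}

// assign mirrors assignTransactionToWorker: pin an already-seen sequence
// number, follow the worker of the last-committed parent, else round-robin.
func (a *assigner) assign(sequenceNumber, lastCommitted int64) int {
	if w, ok := a.sequenceToWorkers[sequenceNumber]; ok {
		return w // pinned for the duration of the transaction
	}
	if w, ok := a.sequenceToWorkers[lastCommitted]; ok {
		a.sequenceToWorkers[sequenceNumber] = w // keep dependent transactions on one worker
		return w
	}
	w := int(a.assignSequence % countWorkers)
	a.assignSequence++
	a.sequenceToWorkers[sequenceNumber] = w
	return w
}

func main() {
	a := &assigner{sequenceToWorkers: map[int64]int{}}
	fmt.Println(a.assign(10, 9))  // independent transaction: worker 0
	fmt.Println(a.assign(11, 10)) // depends on sequence 10: also worker 0
	fmt.Println(a.assign(12, 9))  // independent transaction: worker 1
}

In the patch itself, once a worker reports a batch through completedSequenceNumbers, process() deletes those entries from sequenceToWorkersMap, so transactions that depend only on already-applied parents are free to go to any worker again.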
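The worker-side batching in applyQueuedEvents is the other half of the change: a COMMIT is skipped (folded into a later target commit) while the current batch is small and the worker has not sat idle across a 100ms ticker interval. A self-contained sketch of that decision, with hypothetical parameters (isCommit, batched, idleSinceLastApply) standing in for the worker's state:

package main

import "fmt"

const maxEvents = 50

// shouldSkipCommit mirrors the skipApplyEvent closure: only COMMIT events are
// ever skipped, and only while the batch is small and the worker is not idle.
func shouldSkipCommit(isCommit bool, batched int, idleSinceLastApply bool) bool {
	if !isCommit {
		return false // non-commit events are always applied
	}
	if batched >= maxEvents {
		return false // batch is large enough; commit now
	}
	if idleSinceLastApply {
		return false // the worker has been idle; commit to make progress
	}
	return true // keep accumulating under the same target commit
}

func main() {
	fmt.Println(shouldSkipCommit(true, 3, false))  // true: fold into a later commit
	fmt.Println(shouldSkipCommit(true, 60, false)) // false: flush the batch
	fmt.Println(shouldSkipCommit(false, 3, false)) // false: apply row events immediately
}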