Skip to content

Commit

Permalink
ensure rollback; no need to retry FIELD event as it's now sequentiali…
Browse files Browse the repository at this point in the history
…zed; async channel inserts as otherwise sync ones can cause deadlock

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach committed Feb 16, 2025
1 parent a480d1d commit 8aecc48
Showing 1 changed file with 39 additions and 39 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ package vreplication

import (
"context"
"errors"
"fmt"
"io"
"runtime/debug"
Expand All @@ -29,15 +28,11 @@ import (
"vitess.io/vitess/go/vt/binlog/binlogplayer"
"vitess.io/vitess/go/vt/log"
binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
"vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)

var (
errRetryEvent = errors.New("retry event")
maxEvents = 50
)

type parallelWorker struct {
index int
dbClient *vdbClient
Expand All @@ -61,6 +56,9 @@ type parallelWorker struct {
// foreignKeyChecksStateInitialized is set to true once we have initialized the foreignKeyChecksEnabled.
// The initialization is done on the first row event that this vplayer sees.
foreignKeyChecksStateInitialized bool

// TODO(shlomi): remove this
numCommits int
}

func newParallelWorker(index int, producer *parallelProducer, capacity int) *parallelWorker {
Expand All @@ -70,7 +68,7 @@ func newParallelWorker(index int, producer *parallelProducer, capacity int) *par
producer: producer,
events: make(chan *binlogdatapb.VEvent, capacity),
aggregatedPosChan: make(chan replication.Position),
sequenceNumbers: make(map[int64]bool, maxEvents),
sequenceNumbers: make(map[int64]bool, maxWorkerEvents),
commitSubscribers: make(map[int64]chan error),
vp: producer.vp,
}
Expand Down Expand Up @@ -163,6 +161,14 @@ func (w *parallelWorker) applyQueuedEvents(ctx context.Context) (err error) {
log.Errorf("========== QQQ applyQueuedEvents *********** DONE %v *********** err=%v", w.index, err)
}()

defer func() {
w.dbClient.Rollback()
}()

defer func() {
log.Errorf("========== QQQ applyQueuedEvents worker %v num commits=%v", w.index, w.numCommits)
}()

ticker := time.NewTicker(100 * time.Millisecond)
defer ticker.Stop()

Expand All @@ -174,12 +180,12 @@ func (w *parallelWorker) applyQueuedEvents(ctx context.Context) (err error) {
return ctx.Err()
case lastTickerTime = <-ticker.C:
case pos := <-w.aggregatedPosChan:
log.Errorf("========== QQQ applyQueuedEvents worker %v got aggregated pos %v", w.index, pos)
// log.Errorf("========== QQQ applyQueuedEvents worker %v got aggregated pos %v", w.index, pos)
if _, err := w.updatePos(ctx, pos, 0); err != nil {
debug.PrintStack() // TODO(shlomi) remove
// debug.PrintStack() // TODO(shlomi) remove
return err
}
log.Errorf("========== QQQ applyQueuedEvents worker %v updated aggregated pos", w.index)
// log.Errorf("========== QQQ applyQueuedEvents worker %v updated aggregated pos", w.index)
case event := <-w.events:
// log.Errorf("========== QQQ applyQueuedEvents, event=%v", event)
if event.SequenceNumber >= 0 {
Expand All @@ -191,7 +197,7 @@ func (w *parallelWorker) applyQueuedEvents(ctx context.Context) (err error) {
// We only skip commits, for performance reasons (aggregate more operations under same commit)
return false
}
if len(w.sequenceNumbers) >= maxEvents {
if len(w.sequenceNumbers) >= maxWorkerEvents {
// Too many events in the queue, commit now
return false
}
Expand All @@ -214,29 +220,10 @@ func (w *parallelWorker) applyQueuedEvents(ctx context.Context) (err error) {
}

func (w *parallelWorker) applyQueuedEvent(ctx context.Context, event *binlogdatapb.VEvent) error {
for {
err := w.applyApplicableQueuedEvent(ctx, event)
if errors.Is(vterrors.UnwrapAll(err), errRetryEvent) {
log.Errorf("========== QQQ worker %v error is errRetryEvent: %v", w.index, err)
// The error here is that we tried to apply a ROW event, but the table map for this row change
// we advertised in a FIELD event to a different worker. This happens because vstreamer optimizes
// table map events: it only sends the single first event for any table (until log is rotated or until
// table is changed). As we slice the relaylog events and distribute into different worker, it is possible
// that worker #3 will attempt to run a ROW event before worker #2 has applied the FIELD event for the same table.
// So what we do here is to commit all workers
if err := w.producer.commitAll(ctx, w); err != nil {
return err
}
continue
}
return err
}
}

func (w *parallelWorker) applyApplicableQueuedEvent(ctx context.Context, event *binlogdatapb.VEvent) error {
// log.Errorf("========== QQQ applyApplicableQueuedEvent, event.Type=%v", event.Type)
ctx, cancel := context.WithCancel(ctx)
defer cancel()
// log.Errorf("========== QQQ applyQueuedEvent, START worker=%v, event.Type=%v", w.index, event.Type)
// defer log.Errorf("========== QQQ applyQueuedEvent, DONE worker=%v, event.Type=%v", w.index, event.Type)
// ctx, cancel := context.WithCancel(ctx)
// defer cancel()

//
t := time.NewTimer(5 * time.Second)
Expand Down Expand Up @@ -522,8 +509,8 @@ func (w *parallelWorker) updateFKCheck(ctx context.Context, flags2 uint32) error
}

func (w *parallelWorker) applyQueuedRowEvent(ctx context.Context, vevent *binlogdatapb.VEvent) error {
log.Errorf("========== QQQ applyQueuedRowEvent worker %v", w.index)
defer log.Errorf("========== QQQ applyQueuedRowEvent worker %v DONE", w.index)
// log.Errorf("========== QQQ applyQueuedRowEvent worker %v", w.index)
// defer log.Errorf("========== QQQ applyQueuedRowEvent worker %v DONE", w.index)
if err := w.updateFKCheck(ctx, vevent.RowEvent.Flags); err != nil {
return err
}
Expand All @@ -534,7 +521,7 @@ func (w *parallelWorker) applyQueuedRowEvent(ctx context.Context, vevent *binlog
tplan = w.vp.tablePlans[vevent.RowEvent.TableName]
}()
if tplan == nil {
return vterrors.Wrapf(errRetryEvent, "unexpected event on table %s", vevent.RowEvent.TableName)
return vterrors.Errorf(vtrpc.Code_FAILED_PRECONDITION, "unexpected event on table %s that has no plan yet", vevent.RowEvent.TableName)
}
applyFunc := func(sql string) (*sqltypes.Result, error) {
stats := NewVrLogStats("ROWCHANGE")
Expand Down Expand Up @@ -579,6 +566,7 @@ func (w *parallelWorker) applyQueuedRowEvent(ctx context.Context, vevent *binlog
}

func (w *parallelWorker) applyQueuedCommit(event *binlogdatapb.VEvent) error {
// log.Errorf("========== QQQ applyQueuedCommit worker %v", w.index)
switch {
case event.Type == binlogdatapb.VEventType_COMMIT:
case event.Type == binlogdatapb.VEventType_UNKNOWN && event.SequenceNumber < 0:
Expand All @@ -589,7 +577,9 @@ func (w *parallelWorker) applyQueuedCommit(event *binlogdatapb.VEvent) error {
shouldActuallyCommit := len(w.sequenceNumbers) > 0
var err error
if shouldActuallyCommit {
// log.Errorf("========== QQQ applyQueuedCommit actual commit worker %v", w.index)
err = w.commit()
// log.Errorf("========== QQQ applyQueuedCommit actual commit worker %v DONE", w.index)
}
// log.Errorf("========== QQQ applyQueuedEvents, event is commit or commitWorkerEvent(), seq=%v", event.SequenceNumber)
// log.Errorf("========== QQQ applyQueuedEvents committed, err=%v", err)
Expand All @@ -610,11 +600,21 @@ func (w *parallelWorker) applyQueuedCommit(event *binlogdatapb.VEvent) error {
// Commit successful
if shouldActuallyCommit {
w.producer.numCommits.Add(1)
w.numCommits++
}
// log.Errorf("========== QQQ applyQueuedEvents pushing seqs")
w.producer.completedSequenceNumbers <- w.sequenceNumbers
// Now that we have committed, we want to report the committed sequence numbers back to the producer.
// We can't just run `w.producer.completedSequenceNumbers <- sequenceNumbers` because we will be deadlocking
// the producer: this very function runs off the producer's `case worker.events <- event`, which is on the same
// `select` statement as `case sequenceNumbers := <-p.completedSequenceNumbers`. So we need to run this in a goroutine.
// Which is fine, because while it is very nice to have, it's not mandatory that the producer updates the sequence
// numbers asap.
sequenceNumbers := w.sequenceNumbers
go func() {
w.producer.completedSequenceNumbers <- sequenceNumbers
}()
// log.Errorf("========== QQQ applyQueuedEvents pushed seqs")
w.sequenceNumbers = make(map[int64]bool, maxEvents)
w.sequenceNumbers = make(map[int64]bool, maxWorkerEvents)
if w.producer.posReached.Load() {
return io.EOF
}
Expand Down

0 comments on commit 8aecc48

Please sign in to comment.