From 68d6d2ae8779248dae1e34aca6c584bd41e67b93 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Wed, 12 Feb 2025 14:52:33 +0200 Subject: [PATCH] sequentialize non-ROW events Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../vreplication/vplayer_parallel_producer.go | 25 ++++++++++++++++++- 1 file changed, 24 insertions(+), 1 deletion(-) diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go index 689491b3b1c..1e897e83cfb 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go @@ -278,7 +278,24 @@ func (p *parallelProducer) process(ctx context.Context, events chan *binlogdatap // case t := <-ticker.C: // lastGoodTime = t case event := <-events: - // processEvent(ctx, event) + log.Errorf("========== QQQ process event type: %v", event.Type) + canApplyInParallel := false + switch event.Type { + case binlogdatapb.VEventType_BEGIN, + binlogdatapb.VEventType_FIELD, + binlogdatapb.VEventType_ROW, + binlogdatapb.VEventType_COMMIT, + binlogdatapb.VEventType_GTID: + // We can parallelize these events. + canApplyInParallel = true + } + if !canApplyInParallel { + // As an example, thus could be a DDL. + // Wait for all existing workers to complete + if err := p.commitAll(ctx, nil); err != nil { + return err + } + } workerIndex := p.assignTransactionToWorker(event.SequenceNumber, event.CommitParent) worker := p.workers[workerIndex] // We know the worker has enough capacity and thus the following will not block. @@ -288,6 +305,12 @@ func (p *parallelProducer) process(ctx context.Context, events chan *binlogdatap return ctx.Err() case worker.events <- event: } + if !canApplyInParallel { + // Say this was a DDL. Then we need to wait until it is absolutely complete, before we allow the next event to be processed. + if err := <-worker.commitEvents(); err != nil { + return err + } + } } } }