diff --git a/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go index 689491b3b1c..1e897e83cfb 100644 --- a/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go +++ b/go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go @@ -278,7 +278,24 @@ func (p *parallelProducer) process(ctx context.Context, events chan *binlogdatap // case t := <-ticker.C: // lastGoodTime = t case event := <-events: - // processEvent(ctx, event) + log.Errorf("========== QQQ process event type: %v", event.Type) + canApplyInParallel := false + switch event.Type { + case binlogdatapb.VEventType_BEGIN, + binlogdatapb.VEventType_FIELD, + binlogdatapb.VEventType_ROW, + binlogdatapb.VEventType_COMMIT, + binlogdatapb.VEventType_GTID: + // We can parallelize these events. + canApplyInParallel = true + } + if !canApplyInParallel { + // As an example, thus could be a DDL. + // Wait for all existing workers to complete + if err := p.commitAll(ctx, nil); err != nil { + return err + } + } workerIndex := p.assignTransactionToWorker(event.SequenceNumber, event.CommitParent) worker := p.workers[workerIndex] // We know the worker has enough capacity and thus the following will not block. @@ -288,6 +305,12 @@ func (p *parallelProducer) process(ctx context.Context, events chan *binlogdatap return ctx.Err() case worker.events <- event: } + if !canApplyInParallel { + // Say this was a DDL. Then we need to wait until it is absolutely complete, before we allow the next event to be processed. + if err := <-worker.commitEvents(); err != nil { + return err + } + } } } }