Skip to content

Commit

Permalink
sequentialize non-ROW events
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach committed Feb 12, 2025
1 parent b7dbd7a commit 68d6d2a
Showing 1 changed file with 24 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -278,7 +278,24 @@ func (p *parallelProducer) process(ctx context.Context, events chan *binlogdatap
// case t := <-ticker.C:
// lastGoodTime = t
case event := <-events:
// processEvent(ctx, event)
log.Errorf("========== QQQ process event type: %v", event.Type)
canApplyInParallel := false
switch event.Type {
case binlogdatapb.VEventType_BEGIN,
binlogdatapb.VEventType_FIELD,
binlogdatapb.VEventType_ROW,
binlogdatapb.VEventType_COMMIT,
binlogdatapb.VEventType_GTID:
// We can parallelize these events.
canApplyInParallel = true
}
if !canApplyInParallel {
// As an example, thus could be a DDL.
// Wait for all existing workers to complete
if err := p.commitAll(ctx, nil); err != nil {
return err
}
}
workerIndex := p.assignTransactionToWorker(event.SequenceNumber, event.CommitParent)
worker := p.workers[workerIndex]
// We know the worker has enough capacity and thus the following will not block.
Expand All @@ -288,6 +305,12 @@ func (p *parallelProducer) process(ctx context.Context, events chan *binlogdatap
return ctx.Err()
case worker.events <- event:
}
if !canApplyInParallel {
// Say this was a DDL. Then we need to wait until it is absolutely complete, before we allow the next event to be processed.
if err := <-worker.commitEvents(); err != nil {
return err
}
}
}
}
}
Expand Down

0 comments on commit 68d6d2a

Please sign in to comment.