more progress
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
shlomi-noach committed Feb 12, 2025
1 parent 24551c7 commit b7dbd7a
Showing 2 changed files with 235 additions and 78 deletions.
126 changes: 102 additions & 24 deletions go/vt/vttablet/tabletmanager/vreplication/vplayer_parallel_producer.go
@@ -34,22 +34,22 @@ import (
)

const (
- countWorkers = 1
countWorkers = 4
)

type parallelProducer struct {
vp *vplayer

- workers    []*parallelWorker
- lastWorker int
workers []*parallelWorker

dbClient *vdbClient

posReached atomic.Bool
workerErrors chan error
sequenceToWorkersMap map[int64]int // sequence number => worker index
- completedSequenceNumbers chan []int64
completedSequenceNumbers chan map[int64]bool
commitWorkerEventSequence atomic.Int64
assignSequence int64

numCommits atomic.Int64 // temporary. TODO: remove
currentConcurrency atomic.Int64 // temporary. TODO: remove
@@ -59,10 +59,19 @@ type parallelProducer struct {
func newParallelProducer(ctx context.Context, dbClientGen dbClientGenerator, vp *vplayer) (*parallelProducer, error) {
p := &parallelProducer{
vp: vp,
- dbClient: vp.vr.dbClient,
workers: make([]*parallelWorker, countWorkers),
workerErrors: make(chan error, countWorkers),
- completedSequenceNumbers: make(chan []int64, countWorkers),
sequenceToWorkersMap: make(map[int64]int),
completedSequenceNumbers: make(chan map[int64]bool, countWorkers),
}
{
// TODO(shlomi): just use the dbClient from vp.vr.
dbClient, err := dbClientGen()
if err != nil {
return nil, err
}
p.dbClient = newVDBClient(dbClient, vp.vr.stats, 0)
}
for i := range p.workers {
w := newParallelWorker(i, p, vp.vr.workflowConfig.RelayLogMaxItems)
@@ -101,16 +110,34 @@ func (p *parallelProducer) commitWorkerEvent() *binlogdatapb.VEvent {
}

func (p *parallelProducer) assignTransactionToWorker(sequenceNumber int64, lastCommitted int64) (workerIndex int) {
if workerIndex, ok := p.sequenceToWorkersMap[sequenceNumber]; ok {
// Pin for the duration of the transaction
// log.Errorf("========== QQQ assignTransactionToWorker same trx sequenceNumber=%v, lastCommitted=%v, workerIndex=%v", sequenceNumber, lastCommitted, workerIndex)
return workerIndex
}
if workerIndex, ok := p.sequenceToWorkersMap[lastCommitted]; ok {
// Assign transaction to the same worker who owns the last committed transaction
// log.Errorf("========== QQQ assignTransactionToWorker dependent trx sequenceNumber=%v, lastCommitted=%v, workerIndex=%v", sequenceNumber, lastCommitted, workerIndex)
p.sequenceToWorkersMap[sequenceNumber] = workerIndex
return workerIndex
}
- p.lastWorker = (p.lastWorker + 1) % countWorkers
- return p.lastWorker
// workerIndex = int((p.assignSequence / 10) % countWorkers)
workerIndex = int(p.assignSequence % countWorkers)
// log.Errorf("========== QQQ assignTransactionToWorker free trx p.sequence=%v, sequenceNumber=%v, lastCommitted=%v, workerIndex=%v", p.assignSequence, sequenceNumber, lastCommitted, workerIndex)
p.assignSequence++
p.sequenceToWorkersMap[sequenceNumber] = workerIndex
return workerIndex
}
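
An aside for readers following this change: the policy above mirrors MySQL's logical-clock parallel replication. Events of an in-flight transaction stay pinned to their worker, a transaction whose commit parent (lastCommitted) is still owned by a worker follows that worker, and anything else is dealt out round-robin via assignSequence. Below is a minimal standalone sketch of that policy; the names (scheduler, ownerByTrx, assign) are illustrative, not Vitess API.

package main

import "fmt"

// scheduler is a single-goroutine sketch of the assignment policy:
// pin events of an in-flight transaction to their worker, route a dependent
// transaction to the worker that owns its commit parent, and deal
// independent transactions round-robin.
type scheduler struct {
	numWorkers     int
	assignSequence int64
	ownerByTrx     map[int64]int // sequence number => worker index
}

func (s *scheduler) assign(sequenceNumber, lastCommitted int64) int {
	if w, ok := s.ownerByTrx[sequenceNumber]; ok {
		return w // same transaction: stay pinned
	}
	if w, ok := s.ownerByTrx[lastCommitted]; ok {
		s.ownerByTrx[sequenceNumber] = w // follow the commit parent
		return w
	}
	w := int(s.assignSequence % int64(s.numWorkers)) // independent: round-robin
	s.assignSequence++
	s.ownerByTrx[sequenceNumber] = w
	return w
}

func main() {
	s := &scheduler{numWorkers: 4, ownerByTrx: map[int64]int{}}
	fmt.Println(s.assign(10, 9))  // independent => worker 0
	fmt.Println(s.assign(11, 10)) // parent 10 owned by 0 => worker 0
	fmt.Println(s.assign(12, 9))  // independent again => worker 1
}
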

func (p *parallelProducer) commitAll(ctx context.Context, except *parallelWorker) error {
// TODO(shlomi) remove
{
exceptString := ""
if except != nil {
exceptString = fmt.Sprintf(" except %v", except.index)
}
log.Errorf("========== QQQ commitAll%v", exceptString)
}
var eg errgroup.Group
for _, w := range p.workers {
w := w
@@ -123,41 +150,87 @@ func (p *parallelProducer) commitAll(ctx context.Context, except *parallelWorker
}
return eg.Wait()
}
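
The fan-out in commitAll is the standard golang.org/x/sync/errgroup pattern: one goroutine per worker, first error wins. A self-contained sketch under assumed toy types (worker and commit are placeholders for the real parallelWorker):

package main

import (
	"fmt"

	"golang.org/x/sync/errgroup"
)

type worker struct{ index int }

func (w *worker) commit() error {
	fmt.Println("committing worker", w.index)
	return nil
}

// commitAll fans out a commit to every worker except `except` and returns
// the first error, mirroring the errgroup usage in the diff above.
func commitAll(workers []*worker, except *worker) error {
	var eg errgroup.Group
	for _, w := range workers {
		if w == except {
			continue
		}
		w := w // capture loop variable (pre-Go 1.22 semantics)
		eg.Go(func() error { return w.commit() })
	}
	return eg.Wait()
}

func main() {
	ws := []*worker{{0}, {1}, {2}, {3}}
	fmt.Println(commitAll(ws, ws[2]))
}
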
func (p *parallelProducer) aggregateWorkersPos(ctx context.Context) (aggregatedWorkersPos replication.Position, combinedPos replication.Position, err error) {
// TODO(shlomi): this query can be computed once in the lifetime of the producer
query := binlogplayer.ReadVReplicationWorkersGTIDs(p.vp.vr.id)
qr, err := p.dbClient.ExecuteFetch(query, -1)
if err != nil {
log.Errorf("Error fetching vreplication worker positions: %v. isclosed? %v", err, p.dbClient.IsClosed())
return aggregatedWorkersPos, combinedPos, err
}
var lastEventTimestamp int64
for _, row := range qr.Rows {
current, err := binlogplayer.DecodeMySQL56Position(row[0].ToString())
if err != nil {
return aggregatedWorkersPos, combinedPos, err
}
eventTimestamp, err := row[1].ToInt64()
if err != nil {
return aggregatedWorkersPos, combinedPos, err
}
lastEventTimestamp = max(lastEventTimestamp, eventTimestamp)
aggregatedWorkersPos = replication.AppendGTIDSet(aggregatedWorkersPos, current.GTIDSet)
}
combinedPos = replication.AppendGTIDSet(aggregatedWorkersPos, p.vp.startPos.GTIDSet)
p.vp.pos = combinedPos // TODO(shlomi) potential for race condition

- func (p *parallelProducer) watchPos(ctx context.Context) error {
- if p.vp.stopPos.IsZero() {
- return nil
log.Errorf("========== QQQ aggregateWorkersPos updatePos ts=%v, pos=%v", lastEventTimestamp, combinedPos)
if _, err := p.vp.updatePos(ctx, lastEventTimestamp); err != nil {
return aggregatedWorkersPos, combinedPos, err
}
- query := binlogplayer.ReadVReplicationWorkersGTIDs(p.vp.vr.id)
if err := p.vp.commit(); err != nil {
return aggregatedWorkersPos, combinedPos, err
}
return aggregatedWorkersPos, combinedPos, nil
}
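
aggregateWorkersPos unions the per-worker GTID sets (each worker commits only its own slice of the stream, so each set is sparse) and folds in the producer's start position; that union is what gets persisted via updatePos. A toy illustration of why the union of sparse per-worker sets yields the contiguous combined position — gtidSet here is a deliberate simplification of real interval-based GTID sets:

package main

import "fmt"

// gtidSet is a toy stand-in for a MySQL GTID set: server UUID => executed
// transaction ids. Real GTID sets store intervals; only the union
// ("append") semantics matter for this illustration.
type gtidSet map[string]map[int64]bool

func union(dst, src gtidSet) gtidSet {
	if dst == nil {
		dst = gtidSet{}
	}
	for uuid, ids := range src {
		if dst[uuid] == nil {
			dst[uuid] = map[int64]bool{}
		}
		for id := range ids {
			dst[uuid][id] = true
		}
	}
	return dst
}

func main() {
	// Each worker commits a sparse slice of the stream...
	worker0 := gtidSet{"source-uuid": {1: true, 3: true}}
	worker1 := gtidSet{"source-uuid": {2: true, 4: true}}
	// ...but the union over all workers is the contiguous combined position.
	combined := union(union(nil, worker0), worker1)
	fmt.Println(len(combined["source-uuid"])) // 4
}
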

func (p *parallelProducer) watchPos(ctx context.Context) error {
// if p.vp.stopPos.IsZero() {
// return nil
// }
ticker := time.NewTicker(1 * time.Second)
defer ticker.Stop()

var lastCombinedPos replication.Position
for {
select {
case <-ctx.Done():
return ctx.Err()
case <-ticker.C:
- qr, err := p.dbClient.ExecuteFetch(query, -1)
log.Errorf("========== QQQ watchPos ticker")
aggregatedWorkersPos, combinedPos, err := p.aggregateWorkersPos(ctx)
if err != nil {
log.Errorf("Error fetching vreplication worker positions: %v", err)
log.Errorf("Error aggregating vreplication worker positions: %v. isclosed? %v", err, p.dbClient.IsClosed())
continue
}
- combinedPos := p.vp.startPos
- for _, row := range qr.Rows {
- current, err := binlogplayer.DecodeMySQL56Position(row[0].ToString())
- if err != nil {
log.Errorf("========== QQQ watchPos aggregatedWorkersPos: %v, combinedPos: %v, stop: %v", aggregatedWorkersPos, combinedPos, p.vp.stopPos)

// Write back this combined pos to all workers, so that we condense their otherwise sparse GTID sets.
log.Errorf("========== QQQ watchPos pushing combined pos %v", combinedPos)
for _, w := range p.workers {
log.Errorf("========== QQQ watchPos pushing combined pos worker %v", w.index)
w.aggregatedPosChan <- aggregatedWorkersPos
}
log.Errorf("========== QQQ watchPos pushed combined pos")
if combinedPos.GTIDSet.Equal(lastCombinedPos.GTIDSet) {
// no progress has been made
log.Errorf("========== QQQ watchPos no progress!! committing all")
if err := p.commitAll(ctx, nil); err != nil {
return err
}
- combinedPos = replication.AppendGTIDSet(combinedPos, current.GTIDSet)
log.Errorf("========== QQQ watchPos no progress!! committed all")
} else {
// progress has been made
lastCombinedPos = combinedPos
}

- if combinedPos.AtLeast(p.vp.stopPos) {
if !p.vp.stopPos.IsZero() && combinedPos.AtLeast(p.vp.stopPos) {
if err := p.commitAll(ctx, nil); err != nil {
return err
}
p.posReached.Store(true)
return io.EOF
}
log.Errorf("========== QQQ watchPos end loop cycle")
}
}
}
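
watchPos now doubles as a watchdog: each tick it aggregates positions, pushes the condensed GTID set back to the workers, and, if the combined position did not advance since the previous tick, forces a commitAll so that transactions buffered inside workers get flushed; the stop-position check also tolerates an empty stopPos. A compact sketch of that tick-and-flush-on-stall skeleton — progressed and flushAll are hypothetical stand-ins, not functions from this file:

package main

import (
	"context"
	"fmt"
	"time"
)

// watchdog sketches the watchPos control flow: every tick, check whether the
// combined position advanced; if not, force a flush so that buffered
// transactions get committed and progress can resume.
func watchdog(ctx context.Context, tick time.Duration, progressed func() bool, flushAll func() error) error {
	ticker := time.NewTicker(tick)
	defer ticker.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-ticker.C:
			if progressed() {
				continue
			}
			if err := flushAll(); err != nil {
				return err
			}
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 35*time.Millisecond)
	defer cancel()
	_ = watchdog(ctx, 10*time.Millisecond,
		func() bool { return false },                      // pretend we are stalled
		func() error { fmt.Println("flush"); return nil }) // forced commitAll
}
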
@@ -198,7 +271,8 @@ func (p *parallelProducer) process(ctx context.Context, events chan *binlogdatap
case <-ctx.Done():
return ctx.Err()
case sequenceNumbers := <-p.completedSequenceNumbers:
- for _, sequenceNumber := range sequenceNumbers {
// log.Errorf("========== QQQ process completedSequenceNumbers=%v", sequenceNumbers)
for sequenceNumber := range sequenceNumbers {
delete(p.sequenceToWorkersMap, sequenceNumber)
}
// case t := <-ticker.C:
@@ -208,6 +282,7 @@ func (p *parallelProducer) process(ctx context.Context, events chan *binlogdatap
workerIndex := p.assignTransactionToWorker(event.SequenceNumber, event.CommitParent)
worker := p.workers[workerIndex]
// We know the worker has enough capacity and thus the following will not block.
// log.Errorf("========== QQQ process: assigning event.Type %v seq=%v, parent=%v, to worker.Index %v at index %v", event.Type, event.SequenceNumber, event.CommitParent, worker.index, workerIndex)
select {
case <-ctx.Done():
return ctx.Err()
@@ -218,8 +293,11 @@ func (p *parallelProducer) process(ctx context.Context, events chan *binlogdatap
}
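
The dispatch loop in process drains completion notices first — freeing entries in sequenceToWorkersMap, so a later transaction whose commit parent is already durable is treated as independent — and then hands each event to its assigned worker over a buffered channel; the "enough capacity" comment holds because those channels are sized from RelayLogMaxItems. A reduced sketch of that select-driven dispatch, with illustrative types (event, the channel shapes, and the assign callback are assumptions, not the file's definitions):

package main

import (
	"context"
	"fmt"
)

type event struct{ seq, parent int64 }

// dispatch sketches the loop in process: drain completion notices first
// (dropping finished transactions from the owner map), then route the next
// event to its assigned worker over a buffered channel.
func dispatch(ctx context.Context, events <-chan event, completed <-chan map[int64]bool,
	workers []chan event, ownerByTrx map[int64]int, assign func(event) int) error {
	for {
		select {
		case done := <-completed:
			for seq := range done {
				delete(ownerByTrx, seq) // parent already durable => future trx independent
			}
			continue
		default:
		}
		select {
		case <-ctx.Done():
			return ctx.Err()
		case ev := <-events:
			// Channels are sized generously (cf. RelayLogMaxItems), so this
			// send does not block while workers keep up.
			workers[assign(ev)] <- ev
		}
	}
}

func main() {
	ctx, cancel := context.WithCancel(context.Background())
	events := make(chan event, 1)
	completed := make(chan map[int64]bool, 1)
	workers := []chan event{make(chan event, 8)}
	events <- event{seq: 1, parent: 0}
	go func() { fmt.Println("worker got", <-workers[0]); cancel() }()
	_ = dispatch(ctx, events, completed, workers, map[int64]int{}, func(event) int { return 0 })
}
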

func (p *parallelProducer) applyEvents(ctx context.Context, relay *relayLog) error {
- ctx, cancel := context.WithCancel(ctx)
- defer cancel()
// TODO(shlomi): do not cancel context, because if we do, that can terminate async queries still running.
// ctx, cancel := context.WithCancel(ctx)
// defer cancel()

log.Errorf("========== QQQ applyEvents defer")

go func() {
if err := p.watchPos(ctx); err != nil {