Skip to content

Commit

Permalink
[release-19.0] VReplication Atomic Copy Workflows: fix bugs around co…
Browse files Browse the repository at this point in the history
…ncurrent inserts (#17772) (#17791)

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
Co-authored-by: vitess-bot[bot] <108069721+vitess-bot[bot]@users.noreply.github.com>
Co-authored-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
vitess-bot[bot] and rohit-nayak-ps authored Feb 17, 2025
1 parent 4def0fc commit 1ca8d53
Show file tree
Hide file tree
Showing 3 changed files with 13 additions and 10 deletions.
5 changes: 4 additions & 1 deletion go/test/endtoend/vreplication/fk_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,7 +90,10 @@ func TestFKExt(t *testing.T) {
setSidecarDBName("_vt")

// Ensure that there are multiple copy phase cycles per table.
extraVTTabletArgs = append(extraVTTabletArgs, "--vstream_packet_size=256", "--queryserver-config-schema-change-signal")
extraVTTabletArgs = append(extraVTTabletArgs,
"--vstream_packet_size=256",
"--queryserver-config-schema-change-signal",
parallelInsertWorkers)
extraVTGateArgs = append(extraVTGateArgs, "--schema_change_signal=true", "--planner-version", "Gen4")
defer func() { extraVTTabletArgs = nil }()
initFKExtConfig(t)
Expand Down
1 change: 1 addition & 0 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ const testWorkflowFlavor = workflowFlavorRandom
// It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without,
// i.e. with foreign_key_checks=0.
func TestFKWorkflow(t *testing.T) {
setSidecarDBName("_vt")
extraVTTabletArgs = []string{
// Ensure that there are multiple copy phase cycles per table.
"--vstream_packet_size=256",
Expand Down
17 changes: 8 additions & 9 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,11 +86,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
defer rowsCopiedTicker.Stop()

parallelism := getInsertParallelism()
// For now do not support concurrent inserts for atomic copies.
if parallelism > 1 {
parallelism = 1
log.Infof("Disabling concurrent inserts for atomic copies")
}

copyWorkerFactory := vc.newCopyWorkerFactory(parallelism)
var copyWorkQueue *vcopierCopyWorkQueue

Expand All @@ -114,7 +110,6 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
resp.TableName, len(resp.Fields), len(resp.Rows), resp.Gtid, resp.Lastpk)
tableName := resp.TableName
gtid = resp.Gtid

updateRowsCopied := func() error {
updateRowsQuery := binlogplayer.GenerateUpdateRowsCopied(vc.vr.id, vc.vr.stats.CopyRowCount.Get())
_, err := vc.vr.dbClient.Execute(updateRowsQuery)
Expand Down Expand Up @@ -204,6 +199,10 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
log.Infof("copying table %s with lastpk %v", tableName, lastpkbv)
// Prepare a vcopierCopyTask for the current batch of work.
currCh := make(chan *vcopierCopyTaskResult, 1)

if parallelism > 1 {
resp = resp.CloneVT()
}
currT := newVCopierCopyTask(newVCopierCopyTaskArgs(resp.Rows, resp.Lastpk))

// Send result to the global resultCh and currCh. resultCh is used by
Expand Down Expand Up @@ -291,12 +290,12 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
log.Infof("Copy of %v stopped", state.currentTableName)
return fmt.Errorf("CopyAll was interrupted due to context expiration")
default:
if err := vc.deleteCopyState(state.currentTableName); err != nil {
return err
}
if copyWorkQueue != nil {
copyWorkQueue.close()
}
if err := vc.deleteCopyState(state.currentTableName); err != nil {
return err
}
if err := vc.updatePos(ctx, gtid); err != nil {
return err
}
Expand Down

0 comments on commit 1ca8d53

Please sign in to comment.