Skip to content

Commit

Permalink
Clone response on concurrent inserts. Delete copy state for last tabl…
Browse files Browse the repository at this point in the history
…e after all copy is done. Updated test to run concurrent inserts

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Feb 12, 2025
1 parent 01b3e02 commit e6da4ae
Show file tree
Hide file tree
Showing 2 changed files with 20 additions and 15 deletions.
15 changes: 10 additions & 5 deletions go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,9 +39,11 @@ const testWorkflowFlavor = workflowFlavorVtctld
// It inserts initial data, then simulates load. We insert both child rows with foreign keys and those without,
// i.e. with foreign_key_checks=0.
func TestFKWorkflow(t *testing.T) {
setSidecarDBName("_vt")
extraVTTabletArgs = []string{
// Ensure that there are multiple copy phase cycles per table.
"--vstream_packet_size=256",
parallelInsertWorkers,
}
defer func() { extraVTTabletArgs = nil }()

Expand Down Expand Up @@ -128,11 +130,14 @@ func TestFKWorkflow(t *testing.T) {
vtgateConn, closeConn := getVTGateConn()
defer closeConn()

t11Count := getRowCount(t, vtgateConn, "t11")
t12Count := getRowCount(t, vtgateConn, "t12")
require.Greater(t, t11Count, 1)
require.Greater(t, t12Count, 1)
require.Equal(t, t11Count, t12Count)
if withLoad {
t11Count := getRowCount(t, vtgateConn, "t11")
t12Count := getRowCount(t, vtgateConn, "t12")
require.Greater(t, t11Count, 1)
require.Greater(t, t12Count, 1)
require.Equal(t, t11Count, t12Count)
}

}

func insertInitialFKData(t *testing.T) {
Expand Down
20 changes: 10 additions & 10 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,11 +84,7 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
defer rowsCopiedTicker.Stop()

parallelism := int(math.Max(1, float64(vc.vr.workflowConfig.ParallelInsertWorkers)))
// For now do not support concurrent inserts for atomic copies.
if parallelism > 1 {
parallelism = 1
log.Infof("Disabling concurrent inserts for atomic copies")
}

copyWorkerFactory := vc.newCopyWorkerFactory(parallelism)
var copyWorkQueue *vcopierCopyWorkQueue

Expand All @@ -115,7 +111,6 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
resp.TableName, len(resp.Fields), len(resp.Rows), resp.Gtid, resp.Lastpk)
tableName := resp.TableName
gtid = resp.Gtid

updateRowsCopied := func() error {
updateRowsQuery := binlogplayer.GenerateUpdateRowsCopied(vc.vr.id, vc.vr.stats.CopyRowCount.Get())
_, err := vc.vr.dbClient.Execute(updateRowsQuery)
Expand Down Expand Up @@ -205,7 +200,12 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
log.Infof("copying table %s with lastpk %v", tableName, lastpkbv)
// Prepare a vcopierCopyTask for the current batch of work.
currCh := make(chan *vcopierCopyTaskResult, 1)
currT := newVCopierCopyTask(newVCopierCopyTaskArgs(resp.Rows, resp.Lastpk))

resp2 := resp
if parallelism > 1 {
resp2 = resp.CloneVT()
}
currT := newVCopierCopyTask(newVCopierCopyTaskArgs(resp2.Rows, resp2.Lastpk))

// Send result to the global resultCh and currCh. resultCh is used by
// the loop to return results to VStreamRows. currCh will be used to
Expand Down Expand Up @@ -292,12 +292,12 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
log.Infof("Copy of %v stopped", state.currentTableName)
return fmt.Errorf("CopyAll was interrupted due to context expiration")
default:
if err := vc.deleteCopyState(state.currentTableName); err != nil {
return err
}
if copyWorkQueue != nil {
copyWorkQueue.close()
}
if err := vc.deleteCopyState(state.currentTableName); err != nil {
return err
}
if err := vc.updatePos(ctx, gtid); err != nil {
return err
}
Expand Down

0 comments on commit e6da4ae

Please sign in to comment.