Skip to content

Commit

Permalink
Address review comments. Run parallel insert on extended TestFKExt an…
Browse files Browse the repository at this point in the history
…d regular on TestFKWorkflow

Signed-off-by: Rohit Nayak <rohit@planetscale.com>
  • Loading branch information
rohit-nayak-ps committed Feb 14, 2025
1 parent e6da4ae commit 0d24a94
Show file tree
Hide file tree
Showing 3 changed files with 6 additions and 5 deletions.
5 changes: 4 additions & 1 deletion go/test/endtoend/vreplication/fk_ext_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,10 @@ func TestFKExt(t *testing.T) {
setSidecarDBName("_vt")

// Ensure that there are multiple copy phase cycles per table.
extraVTTabletArgs = append(extraVTTabletArgs, "--vstream_packet_size=256", "--queryserver-config-schema-change-signal")
extraVTTabletArgs = append(extraVTTabletArgs,
"--vstream_packet_size=256",
"--queryserver-config-schema-change-signal",
parallelInsertWorkers)
extraVTGateArgs = append(extraVTGateArgs, "--schema_change_signal=true", "--planner-version", "Gen4")
defer func() { extraVTTabletArgs = nil }()
initFKExtConfig(t)
Expand Down
1 change: 0 additions & 1 deletion go/test/endtoend/vreplication/fk_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@ func TestFKWorkflow(t *testing.T) {
extraVTTabletArgs = []string{
// Ensure that there are multiple copy phase cycles per table.
"--vstream_packet_size=256",
parallelInsertWorkers,
}
defer func() { extraVTTabletArgs = nil }()

Expand Down
5 changes: 2 additions & 3 deletions go/vt/vttablet/tabletmanager/vreplication/vcopier_atomic.go
Original file line number Diff line number Diff line change
Expand Up @@ -201,11 +201,10 @@ func (vc *vcopier) copyAll(ctx context.Context, settings binlogplayer.VRSettings
// Prepare a vcopierCopyTask for the current batch of work.
currCh := make(chan *vcopierCopyTaskResult, 1)

resp2 := resp
if parallelism > 1 {
resp2 = resp.CloneVT()
resp = resp.CloneVT()
}
currT := newVCopierCopyTask(newVCopierCopyTaskArgs(resp2.Rows, resp2.Lastpk))
currT := newVCopierCopyTask(newVCopierCopyTaskArgs(resp.Rows, resp.Lastpk))

// Send result to the global resultCh and currCh. resultCh is used by
// the loop to return results to VStreamRows. currCh will be used to
Expand Down

0 comments on commit 0d24a94

Please sign in to comment.