Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

New colocated picker strategy #2368

Open
wants to merge 17 commits into
base: main
Choose a base branch
from
51 changes: 41 additions & 10 deletions yb-voyager/cmd/importData.go
Original file line number Diff line number Diff line change
Expand Up @@ -576,6 +576,7 @@ func importData(importFileTasks []*ImportFileTask) {
utils.PrintAndLog("Tables to import: %v", importFileTasksToTableNames(pendingTasks))
prepareTableToColumns(pendingTasks) //prepare the tableToColumns map
poolSize := tconf.Parallelism * 2
maxParallelConns := tconf.Parallelism
maxTasksInProgress := tconf.Parallelism
if tconf.EnableYBAdaptiveParallelism {
// in case of adaptive parallelism, we need to use maxParallelism * 2
Expand All @@ -584,6 +585,7 @@ func importData(importFileTasks []*ImportFileTask) {
utils.ErrExit("adaptive parallelism is only supported if target DB is YugabyteDB")
}
poolSize = yb.GetNumMaxConnectionsInPool() * 2
maxParallelConns = yb.GetNumMaxConnectionsInPool()
}
progressReporter := NewImportDataProgressReporter(bool(disablePb))

Expand All @@ -594,7 +596,8 @@ func importData(importFileTasks []*ImportFileTask) {

useTaskPicker := utils.GetEnvAsBool("USE_TASK_PICKER_FOR_IMPORT", true)
if useTaskPicker {
err := importTasksViaTaskPicker(pendingTasks, state, progressReporter, poolSize, maxTasksInProgress)
maxColocatedBatchesInProgress := utils.GetEnvAsInt("MAX_COLOCATED_BATCHES_IN_PROGRESS", 3)
err := importTasksViaTaskPicker(pendingTasks, state, progressReporter, maxParallelConns, maxTasksInProgress, maxColocatedBatchesInProgress)
if err != nil {
utils.ErrExit("Failed to import tasks via task picker: %s", err)
}
Expand All @@ -605,7 +608,7 @@ func importData(importFileTasks []*ImportFileTask) {
batchImportPool = pool.New().WithMaxGoroutines(poolSize)
log.Infof("created batch import pool of size: %d", poolSize)

taskImporter, err := NewFileTaskImporter(task, state, batchImportPool, progressReporter)
taskImporter, err := NewFileTaskImporter(task, state, batchImportPool, progressReporter, nil, false)
if err != nil {
utils.ErrExit("Failed to create file task importer: %s", err)
}
Expand Down Expand Up @@ -709,21 +712,42 @@ func importData(importFileTasks []*ImportFileTask) {
- For the task that is picked, produce the next batch and submit it to the worker pool. Worker will asynchronously import the batch.
- If task is done, mark it as done in the task picker.
*/
func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataState, progressReporter *ImportDataProgressReporter, poolSize int, maxTasksInProgress int) error {
func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataState, progressReporter *ImportDataProgressReporter, maxParallelConns int,
maxShardedTasksInProgress int, maxColocatedBatchesInProgress int) error {

// setup worker pools

// The code can produce `poolSize` number of batches at a time. But, it can consume only
// `parallelism` number of batches at a time.
batchImportPool = pool.New().WithMaxGoroutines(poolSize)
log.Infof("created batch import pool of size: %d", poolSize)
shardedPoolSize := maxParallelConns * 2
batchImportPool = pool.New().WithMaxGoroutines(shardedPoolSize)
log.Infof("created batch import pool of size: %d", shardedPoolSize)

colocatedBatchImportPool := pool.New().WithMaxGoroutines(maxColocatedBatchesInProgress)
log.Infof("created colocated batch import pool of size: %d", maxColocatedBatchesInProgress)

colocatedBatchImportQueue := make(chan func(), maxColocatedBatchesInProgress*2)
go func() {
for {
select {
case f := <-colocatedBatchImportQueue:
colocatedBatchImportPool.Go(f)
}
}
}()

taskImporters := map[int]*FileTaskImporter{}

var taskPicker FileTaskPicker
var err error
var yb *tgtdb.TargetYugabyteDB
var ok bool
if importerRole == TARGET_DB_IMPORTER_ROLE || importerRole == IMPORT_FILE_ROLE {
yb, ok := tdb.(*tgtdb.TargetYugabyteDB)
yb, ok = tdb.(*tgtdb.TargetYugabyteDB)
if !ok {
return fmt.Errorf("expected tdb to be of type TargetYugabyteDB, got: %T", tdb)
}
taskPicker, err = NewColocatedAwareRandomTaskPicker(maxTasksInProgress, pendingTasks, state, yb)
taskPicker, err = NewColocatedCappedRandomTaskPicker(maxShardedTasksInProgress, maxColocatedBatchesInProgress, pendingTasks, state, yb, colocatedBatchImportQueue)
if err != nil {
return fmt.Errorf("create colocated aware random task picker: %w", err)
}
Expand All @@ -744,9 +768,16 @@ func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataS
var ok bool
taskImporter, ok = taskImporters[task.ID]
if !ok {
taskImporter, err = NewFileTaskImporter(task, state, batchImportPool, progressReporter)
if err != nil {
return fmt.Errorf("create file task importer: %s", err)
if importerRole == TARGET_DB_IMPORTER_ROLE || importerRole == IMPORT_FILE_ROLE {
taskImporter, err = NewFileTaskImporter(task, state, batchImportPool, progressReporter, colocatedBatchImportQueue, true)
if err != nil {
return fmt.Errorf("create file task importer: %w", err)
}
} else {
taskImporter, err = NewFileTaskImporter(task, state, batchImportPool, progressReporter, nil, false)
if err != nil {
return fmt.Errorf("create file task importer: %w", err)
}
}
log.Infof("created file task importer for table: %s, task: %v", task.TableNameTup.ForOutput(), task)
taskImporters[task.ID] = taskImporter
Expand Down
42 changes: 32 additions & 10 deletions yb-voyager/cmd/importDataFileTaskImporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,13 +41,17 @@ type FileTaskImporter struct {
importBatchArgsProto *tgtdb.ImportBatchArgs
workerPool *pool.Pool

isTableColocated bool
colocatedImportBatchQueue chan func()
useColocatedImportBatchQueue bool

totalProgressAmount int64
currentProgressAmount int64
progressReporter *ImportDataProgressReporter
}

func NewFileTaskImporter(task *ImportFileTask, state *ImportDataState, workerPool *pool.Pool,
progressReporter *ImportDataProgressReporter) (*FileTaskImporter, error) {
progressReporter *ImportDataProgressReporter, colocatedImportBatchQueue chan func(), useColocatedImportBatchQueue bool) (*FileTaskImporter, error) {
batchProducer, err := NewFileBatchProducer(task, state)
if err != nil {
return nil, fmt.Errorf("creating file batch producer: %s", err)
Expand All @@ -56,15 +60,27 @@ func NewFileTaskImporter(task *ImportFileTask, state *ImportDataState, workerPoo
progressReporter.ImportFileStarted(task, totalProgressAmount)
currentProgressAmount := getImportedProgressAmount(task, state)
progressReporter.AddProgressAmount(task, currentProgressAmount)
isTableColocated := false

if useColocatedImportBatchQueue {
yb, ok := tdb.(*tgtdb.TargetYugabyteDB)
if !ok {
return nil, fmt.Errorf("tdb is not of type TargetYugabyteDB. Cannot use colocated import batch queue")
}
isTableColocated, err = yb.IsTableColocated(task.TableNameTup)
}

fti := &FileTaskImporter{
task: task,
batchProducer: batchProducer,
workerPool: workerPool,
importBatchArgsProto: getImportBatchArgsProto(task.TableNameTup, task.FilePath),
progressReporter: progressReporter,
totalProgressAmount: totalProgressAmount,
currentProgressAmount: currentProgressAmount,
task: task,
batchProducer: batchProducer,
workerPool: workerPool,
colocatedImportBatchQueue: colocatedImportBatchQueue,
isTableColocated: isTableColocated,
useColocatedImportBatchQueue: useColocatedImportBatchQueue,
importBatchArgsProto: getImportBatchArgsProto(task.TableNameTup, task.FilePath),
progressReporter: progressReporter,
totalProgressAmount: totalProgressAmount,
currentProgressAmount: currentProgressAmount,
}
state.RegisterFileTaskImporter(fti)
return fti, nil
Expand Down Expand Up @@ -133,7 +149,7 @@ func (fti *FileTaskImporter) importBatch(batch *Batch) {
}

func (fti *FileTaskImporter) submitBatch(batch *Batch) error {
fti.workerPool.Go(func() {
importBatchFunc := func() {
// There are `poolSize` number of competing go-routines trying to invoke COPY.
// But the `connPool` will allow only `parallelism` number of connections to be
// used at a time. Thus limiting the number of concurrent COPYs to `parallelism`.
Expand All @@ -143,7 +159,13 @@ func (fti *FileTaskImporter) submitBatch(batch *Batch) error {
} else {
fti.updateProgress(batch.RecordCount)
}
})
}
if fti.useColocatedImportBatchQueue && fti.isTableColocated {
fti.colocatedImportBatchQueue <- importBatchFunc
} else {
fti.workerPool.Go(importBatchFunc)
}

log.Infof("Queued batch: %s", spew.Sdump(batch))
return nil
}
Expand Down
14 changes: 7 additions & 7 deletions yb-voyager/cmd/importDataFileTaskImporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestBasicTaskImport(t *testing.T) {

progressReporter := NewImportDataProgressReporter(true)
workerPool := pool.New().WithMaxGoroutines(2)
taskImporter, err := NewFileTaskImporter(task, state, workerPool, progressReporter)
taskImporter, err := NewFileTaskImporter(task, state, workerPool, progressReporter, nil, false)
testutils.FatalIfError(t, err)

for !taskImporter.AllBatchesSubmitted() {
Expand Down Expand Up @@ -93,7 +93,7 @@ func TestImportAllBatchesAndResume(t *testing.T) {

progressReporter := NewImportDataProgressReporter(true)
workerPool := pool.New().WithMaxGoroutines(2)
taskImporter, err := NewFileTaskImporter(task, state, workerPool, progressReporter)
taskImporter, err := NewFileTaskImporter(task, state, workerPool, progressReporter, nil, false)

for !taskImporter.AllBatchesSubmitted() {
err := taskImporter.ProduceAndSubmitNextBatchToWorkerPool()
Expand All @@ -109,7 +109,7 @@ func TestImportAllBatchesAndResume(t *testing.T) {
// simulate restart
progressReporter = NewImportDataProgressReporter(true)
workerPool = pool.New().WithMaxGoroutines(2)
taskImporter, err = NewFileTaskImporter(task, state, workerPool, progressReporter)
taskImporter, err = NewFileTaskImporter(task, state, workerPool, progressReporter, nil, false)
testutils.FatalIfError(t, err)

assert.Equal(t, true, taskImporter.AllBatchesSubmitted())
Expand Down Expand Up @@ -144,7 +144,7 @@ func TestTaskImportResumable(t *testing.T) {

progressReporter := NewImportDataProgressReporter(true)
workerPool := pool.New().WithMaxGoroutines(2)
taskImporter, err := NewFileTaskImporter(task, state, workerPool, progressReporter)
taskImporter, err := NewFileTaskImporter(task, state, workerPool, progressReporter, nil, false)
testutils.FatalIfError(t, err)

// submit 1 batch
Expand All @@ -161,7 +161,7 @@ func TestTaskImportResumable(t *testing.T) {
// simulate restart
progressReporter = NewImportDataProgressReporter(true)
workerPool = pool.New().WithMaxGoroutines(2)
taskImporter, err = NewFileTaskImporter(task, state, workerPool, progressReporter)
taskImporter, err = NewFileTaskImporter(task, state, workerPool, progressReporter, nil, false)
testutils.FatalIfError(t, err)

// submit second batch, not first batch again as it was already imported
Expand Down Expand Up @@ -203,7 +203,7 @@ func TestTaskImportResumableNoPK(t *testing.T) {

progressReporter := NewImportDataProgressReporter(true)
workerPool := pool.New().WithMaxGoroutines(2)
taskImporter, err := NewFileTaskImporter(task, state, workerPool, progressReporter)
taskImporter, err := NewFileTaskImporter(task, state, workerPool, progressReporter, nil, false)
testutils.FatalIfError(t, err)

// submit 1 batch
Expand All @@ -220,7 +220,7 @@ func TestTaskImportResumableNoPK(t *testing.T) {
// simulate restart
progressReporter = NewImportDataProgressReporter(true)
workerPool = pool.New().WithMaxGoroutines(2)
taskImporter, err = NewFileTaskImporter(task, state, workerPool, progressReporter)
taskImporter, err = NewFileTaskImporter(task, state, workerPool, progressReporter, nil, false)
testutils.FatalIfError(t, err)

// submit second batch, not first batch again as it was already imported
Expand Down
Loading