Skip to content

Commit

Permalink
Merge branch 'main' into aneesh/colocated-picker-new
Browse files Browse the repository at this point in the history
  • Loading branch information
makalaaneesh committed Mar 3, 2025
2 parents 191c85d + 60435d2 commit dfc66ee
Show file tree
Hide file tree
Showing 4 changed files with 73 additions and 23 deletions.
11 changes: 7 additions & 4 deletions yb-voyager/cmd/importData.go
Original file line number Diff line number Diff line change
Expand Up @@ -715,6 +715,7 @@ func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataS
// `parallelism` number of batches at a time.
batchImportPool = pool.New().WithMaxGoroutines(poolSize)
log.Infof("created batch import pool of size: %d", poolSize)
taskImporters := map[int]*FileTaskImporter{}

colocatedBatchImportPool := pool.New().WithMaxGoroutines(maxColocatedTasksInProgress)
log.Infof("created colocated batch import pool of size: %d", maxColocatedTasksInProgress)
Expand Down Expand Up @@ -750,8 +751,6 @@ func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataS
}
}

taskImporters := map[int]*FileTaskImporter{}

for taskPicker.HasMoreTasks() {
task, err := taskPicker.Pick()
if err != nil {
Expand All @@ -775,7 +774,7 @@ func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataS
// task could have been completed (all batches imported) OR still in progress
// in case task is done, we should inform task picker so that we stop picking that task.
log.Infof("All batches submitted for task: %s", task)
taskDone, err := taskImporter.AllBatchesImported()
taskDone, err := state.AllBatchesImported(task.FilePath, task.TableNameTup)
if err != nil {
return fmt.Errorf("check if all batches are imported: task: %v err :%w", task, err)
}
Expand All @@ -785,12 +784,16 @@ func importTasksViaTaskPicker(pendingTasks []*ImportFileTask, state *ImportDataS
if err != nil {
return fmt.Errorf("mark task as done: task: %v, err: %w", task, err)
}
state.UnregisterFileTaskImporter(taskImporter)
log.Infof("Import of task done: %s", task)
continue
} else {
// some batches are still in progress, wait for them to complete as decided by the picker.
// don't want to busy-wait, so in case of sequentialTaskPicker, we sleep.
taskPicker.WaitForTasksBatchesTobeImported()
err := taskPicker.WaitForTasksBatchesTobeImported()
if err != nil {
return fmt.Errorf("wait for tasks batches to be imported: %w", err)
}
continue
}

Expand Down
15 changes: 5 additions & 10 deletions yb-voyager/cmd/importDataFileTaskImporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,6 @@ worker pool for processing. It also maintains and updates the progress of the ta
*/
type FileTaskImporter struct {
task *ImportFileTask
state *ImportDataState
batchProducer *FileBatchProducer
importBatchArgsProto *tgtdb.ImportBatchArgs
workerPool *pool.Pool
Expand Down Expand Up @@ -69,7 +68,6 @@ func NewFileTaskImporter(task *ImportFileTask, state *ImportDataState, workerPoo

fti := &FileTaskImporter{
task: task,
state: state,
batchProducer: batchProducer,
workerPool: workerPool,
colocatedImportBatchQueue: colocatedImportBatchQueue,
Expand All @@ -80,9 +78,14 @@ func NewFileTaskImporter(task *ImportFileTask, state *ImportDataState, workerPoo
totalProgressAmount: totalProgressAmount,
currentProgressAmount: currentProgressAmount,
}
state.RegisterFileTaskImporter(fti)
return fti, nil
}

func (fti *FileTaskImporter) GetTaskID() int {
return fti.task.ID
}

// as of now, batch production and batch submission
// is done together in `SubmitNextBatch` method.
// In other words, as soon as a batch is produced, it is submitted.
Expand All @@ -92,14 +95,6 @@ func (fti *FileTaskImporter) AllBatchesSubmitted() bool {
return fti.batchProducer.Done()
}

func (fti *FileTaskImporter) AllBatchesImported() (bool, error) {
taskStatus, err := fti.state.GetFileImportState(fti.task.FilePath, fti.task.TableNameTup)
if err != nil {
return false, fmt.Errorf("getting file import state: %s", err)
}
return taskStatus == FILE_IMPORT_COMPLETED, nil
}

func (fti *FileTaskImporter) ProduceAndSubmitNextBatchToWorkerPool() error {
if fti.AllBatchesSubmitted() {
return fmt.Errorf("no more batches to submit")
Expand Down
31 changes: 26 additions & 5 deletions yb-voyager/cmd/importDataFileTaskPicker.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ type FileTaskPicker interface {
Pick() (*ImportFileTask, error)
MarkTaskAsDone(task *ImportFileTask) error
HasMoreTasks() bool
WaitForTasksBatchesTobeImported()
WaitForTasksBatchesTobeImported() error
}

/*
Expand Down Expand Up @@ -90,6 +90,7 @@ func (s *SequentialTaskPicker) Pick() (*ImportFileTask, error) {
s.inProgressTask = s.pendingTasks[0]
s.pendingTasks = s.pendingTasks[1:]
}

return s.inProgressTask, nil
}

Expand All @@ -114,11 +115,12 @@ func (s *SequentialTaskPicker) HasMoreTasks() bool {
return len(s.pendingTasks) > 0
}

func (s *SequentialTaskPicker) WaitForTasksBatchesTobeImported() {
func (s *SequentialTaskPicker) WaitForTasksBatchesTobeImported() error {
// Consider the scenario where we have a single task in progress and all batches are submitted, but not yet ingested.
// In this case as per SequentialTaskPicker's implementation, it will wait for the task to be marked as done.
// Instead of having a busy-loop where we keep checking if the task is done, we can wait for a second and then check again.
time.Sleep(time.Second * 1)
return nil
}

/*
Expand Down Expand Up @@ -155,6 +157,7 @@ type ColocatedAwareRandomTaskPicker struct {

tableTypes *utils.StructMap[sqlname.NameTuple, string] //colocated or sharded

state *ImportDataState
}

type YbTargetDBColocatedChecker interface {
Expand Down Expand Up @@ -230,6 +233,7 @@ func NewColocatedAwareRandomTaskPicker(maxTasksInProgress int, tasks []*ImportFi
maxTasksInProgress: maxTasksInProgress,
tableWisePendingTasks: tableWisePendingTasks,
tableTypes: tableTypes,
state: state,
}
if len(picker.tableWisePendingTasks.Keys()) > 0 {
err = picker.initializeChooser()
Expand Down Expand Up @@ -384,9 +388,26 @@ func (c *ColocatedAwareRandomTaskPicker) HasMoreTasks() bool {
return pendingTasks
}

func (c *ColocatedAwareRandomTaskPicker) WaitForTasksBatchesTobeImported() {
// no wait
return
func (c *ColocatedAwareRandomTaskPicker) WaitForTasksBatchesTobeImported() error {
// if for all in-progress tasks, all batches are submitted, then sleep for a bit
allTasksAllBatchesSubmitted := true

for _, task := range c.inProgressTasks {
taskAllBatchesSubmitted, err := c.state.AllBatchesSubmittedForTask(task.ID)
if err != nil {
return fmt.Errorf("checking if all batches are submitted for task: %v: %w", task, err)
}
if !taskAllBatchesSubmitted {
allTasksAllBatchesSubmitted = false
break
}
}

if allTasksAllBatchesSubmitted {
log.Infof("All batches submitted for all in-progress tasks. Sleeping")
time.Sleep(time.Millisecond * 100)
}
return nil
}

type ColocatedCappedRandomTaskPicker struct {
Expand Down
39 changes: 35 additions & 4 deletions yb-voyager/cmd/importDataState.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,17 +55,24 @@ metainfo/import_data_state/table::<table_name>/file::<base_name>:<path_hash>/
batch::<batch_num>.<offset_end>.<record_count>.<byte_count>.<state>
*/
type ImportDataState struct {
exportDir string
stateDir string
exportDir string
stateDir string
inProgressTaskImporters map[int]fileTaskImportStatusChecker // used to fetch in-memory status from FileTaskImporter
}

func NewImportDataState(exportDir string) *ImportDataState {
return &ImportDataState{
exportDir: exportDir,
stateDir: filepath.Join(exportDir, "metainfo", "import_data_state", importerRole),
exportDir: exportDir,
stateDir: filepath.Join(exportDir, "metainfo", "import_data_state", importerRole),
inProgressTaskImporters: make(map[int]fileTaskImportStatusChecker),
}
}

type fileTaskImportStatusChecker interface {
GetTaskID() int
AllBatchesSubmitted() bool
}

func (s *ImportDataState) PrepareForFileImport(filePath string, tableNameTup sqlname.NameTuple) error {
fileStateDir := s.getFileStateDir(filePath, tableNameTup)
log.Infof("Creating %q.", fileStateDir)
Expand Down Expand Up @@ -647,6 +654,30 @@ func (s *ImportDataState) GetImportedEventsStatsForTableList(tableNameTupList []
return tablesToEventCounter, nil
}

func (s *ImportDataState) RegisterFileTaskImporter(importer fileTaskImportStatusChecker) {
s.inProgressTaskImporters[importer.GetTaskID()] = importer
}

func (s *ImportDataState) UnregisterFileTaskImporter(importer fileTaskImportStatusChecker) {
delete(s.inProgressTaskImporters, importer.GetTaskID())
}

func (s *ImportDataState) AllBatchesSubmittedForTask(taskId int) (bool, error) {
taskImporter, ok := s.inProgressTaskImporters[taskId]
if !ok {
return false, fmt.Errorf("task importer with id %d not registered", taskId)
}
return taskImporter.AllBatchesSubmitted(), nil
}

func (s *ImportDataState) AllBatchesImported(filepath string, tableNameTup sqlname.NameTuple) (bool, error) {
taskStatus, err := s.GetFileImportState(filepath, tableNameTup)
if err != nil {
return false, fmt.Errorf("getting file import state: %s", err)
}
return taskStatus == FILE_IMPORT_COMPLETED, nil
}

//============================================================================

type BatchWriter struct {
Expand Down

0 comments on commit dfc66ee

Please sign in to comment.