import multiple tables at same time - 1 #2191

Merged · 18 commits · merged Jan 22, 2025
138 changes: 5 additions & 133 deletions — yb-voyager/cmd/importData.go

@@ -17,7 +17,6 @@ package cmd
 
 import (
     "fmt"
-    "io"
     "os"
     "os/exec"
     "path/filepath"
@@ -1005,145 +1004,18 @@ func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn func(int64)) {
     importBatchArgsProto := getImportBatchArgsProto(task.TableNameTup, task.FilePath)
     log.Infof("Start splitting table %q: data-file: %q", task.TableNameTup, origDataFile)
 
-    err := state.PrepareForFileImport(task.FilePath, task.TableNameTup)
+    fileBatchProducer, err := NewFileBatchProducer(task, state)
     if err != nil {
-        utils.ErrExit("preparing for file import: %s", err)
-    }
-    log.Infof("Collect all interrupted/remaining splits.")
-    pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup)
-    if err != nil {
-        utils.ErrExit("recovering state for table: %q: %s", task.TableNameTup, err)
-    }
-    for _, batch := range pendingBatches {
-        submitBatch(batch, updateProgressFn, importBatchArgsProto)
-    }
-    if !fileFullySplit {
-        splitFilesForTable(state, origDataFile, task.TableNameTup, lastBatchNumber, lastOffset, updateProgressFn, importBatchArgsProto)
-    }
-}
-
-func splitFilesForTable(state *ImportDataState, filePath string, t sqlname.NameTuple,
-    lastBatchNumber int64, lastOffset int64, updateProgressFn func(int64), importBatchArgsProto *tgtdb.ImportBatchArgs) {
-    log.Infof("Split data file %q: tableName=%q, largestSplit=%v, largestOffset=%v", filePath, t, lastBatchNumber, lastOffset)
-    batchNum := lastBatchNumber + 1
-    numLinesTaken := lastOffset
-
-    reader, err := dataStore.Open(filePath)
-    if err != nil {
-        utils.ErrExit("preparing reader for split generation on file: %q: %v", filePath, err)
-    }
-
-    dataFile, err := datafile.NewDataFile(filePath, reader, dataFileDescriptor)
-    if err != nil {
-        utils.ErrExit("open datafile: %q: %v", filePath, err)
-    }
-    defer dataFile.Close()
-
-    log.Infof("Skipping %d lines from %q", lastOffset, filePath)
-    err = dataFile.SkipLines(lastOffset)
-    if err != nil {
-        utils.ErrExit("skipping line for offset=%d: %v", lastOffset, err)
-    }
-
-    var readLineErr error = nil
-    var line string
-    var currentBytesRead int64
-    var batchWriter *BatchWriter
-    header := ""
-    if dataFileDescriptor.HasHeader {
-        header = dataFile.GetHeader()
-    }
-
-    // Helper function to initialize a new batchWriter
-    initBatchWriter := func() {
-        batchWriter = state.NewBatchWriter(filePath, t, batchNum)
-        err := batchWriter.Init()
-        if err != nil {
-            utils.ErrExit("initializing batch writer for table: %q: %s", t, err)
-        }
-        // Write the header if necessary
-        if header != "" && dataFileDescriptor.FileFormat == datafile.CSV {
-            err = batchWriter.WriteHeader(header)
-            if err != nil {
-                utils.ErrExit("writing header for table: %q: %s", t, err)
-            }
-        }
-    }
+        utils.ErrExit("creating file batch producer: %s", err)
     }
 
-    // Function to finalize and submit the current batch
-    finalizeBatch := func(isLastBatch bool, offsetEnd int64, bytesInBatch int64) {
-        batch, err := batchWriter.Done(isLastBatch, offsetEnd, bytesInBatch)
+    for !fileBatchProducer.Done() {
+        batch, err := fileBatchProducer.NextBatch()
         if err != nil {
-            utils.ErrExit("finalizing batch %d: %s", batchNum, err)
+            utils.ErrExit("getting next batch: %s", err)
         }
-        batchWriter = nil
         submitBatch(batch, updateProgressFn, importBatchArgsProto)
-
-        // Increment batchNum only if this is not the last batch
-        if !isLastBatch {
-            batchNum++
-        }
     }
-
-    for readLineErr == nil {
-
-        if batchWriter == nil {
-            initBatchWriter() // Create a new batchWriter
-        }
-
-        line, currentBytesRead, readLineErr = dataFile.NextLine()
-        if readLineErr == nil || (readLineErr == io.EOF && line != "") {
-            // handling possible case: last dataline(i.e. EOF) but no newline char at the end
-            numLinesTaken += 1
-        }
-        log.Debugf("Batch %d: totalBytesRead %d, currentBytes %d \n", batchNum, dataFile.GetBytesRead(), currentBytesRead)
-        if currentBytesRead > tdb.MaxBatchSizeInBytes() {
-            // If a row is itself larger than MaxBatchSizeInBytes, error out
-            ybSpecificMsg := ""
-            if tconf.TargetDBType == YUGABYTEDB {
-                ybSpecificMsg = ", but should be strictly lower than the rpc_max_message_size on YugabyteDB (default 267386880 bytes)"
-            }
-            utils.ErrExit("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, numLinesTaken, t.ForOutput(), filePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg)
-        }
-        if line != "" {
-            // can't use importBatchArgsProto.Columns, so as to use case-insensitive column names
-            columnNames, _ := TableToColumnNames.Get(t)
-            line, err = valueConverter.ConvertRow(t, columnNames, line)
-            if err != nil {
-                utils.ErrExit("transforming line number=%d for table: %q in file %s: %s", numLinesTaken, t.ForOutput(), filePath, err)
-            }
-
-            // Check if adding this record exceeds the max batch size
-            if batchWriter.NumRecordsWritten == batchSizeInNumRows ||
-                dataFile.GetBytesRead() > tdb.MaxBatchSizeInBytes() { // GetBytesRead returns the total bytes read so far, including currentBytesRead
-
-                // Finalize the current batch without adding the record
-                finalizeBatch(false, numLinesTaken-1, dataFile.GetBytesRead()-currentBytesRead)
-
-                // carry forward the bytes to the next batch
-                dataFile.ResetBytesRead(currentBytesRead)
-
-                // Start a new batch by calling the initBatchWriter function
-                initBatchWriter()
-            }
-
-            // Write the record to the new or current batch
-            err = batchWriter.WriteRecord(line)
-            if err != nil {
-                utils.ErrExit("Write to batch %d: %s", batchNum, err)
-            }
-        }
-
-        // Finalize the batch if it's the last line of the file and reset the bytes read to 0
-        if readLineErr == io.EOF {
-            finalizeBatch(true, numLinesTaken, dataFile.GetBytesRead())
-            dataFile.ResetBytesRead(0)
-        } else if readLineErr != nil {
-            utils.ErrExit("read line from data file: %q: %s", filePath, readLineErr)
-        }
-    }
-
-    log.Infof("splitFilesForTable: done splitting data file %q for table %q", filePath, t)
 }
 
 func executePostSnapshotImportSqls() error {
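With batch production encapsulated behind FileBatchProducer, importFile reduces to the produce/submit loop above, decoupling production from submission. That is the groundwork the PR title ("import multiple tables at same time - 1") points at. A minimal, hypothetical sketch of driving one producer per table from its own goroutine — not part of this PR, and assuming the package-level helpers used by importFile (getImportBatchArgsProto, submitBatch, updateProgressFn, state) are safe to use concurrently:

    // Hypothetical sketch only; error handling and concurrency limits elided.
    // Requires the "sync" import.
    var wg sync.WaitGroup
    for _, task := range tasks {
        wg.Add(1)
        go func(task *ImportFileTask) {
            defer wg.Done()
            // Per-table batch args, as importFile builds them today.
            importBatchArgsProto := getImportBatchArgsProto(task.TableNameTup, task.FilePath)
            producer, err := NewFileBatchProducer(task, state)
            if err != nil {
                utils.ErrExit("creating file batch producer: %s", err)
            }
            // Each producer splits its own file; batches from all tables
            // are submitted as they become available.
            for !producer.Done() {
                batch, err := producer.NextBatch()
                if err != nil {
                    utils.ErrExit("getting next batch: %s", err)
                }
                submitBatch(batch, updateProgressFn, importBatchArgsProto)
            }
        }(task)
    }
    wg.Wait()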
236 changes: 236 additions & 0 deletions — yb-voyager/cmd/importDataFileBatchProducer.go (new file)

@@ -0,0 +1,236 @@
/*
Copyright (c) YugabyteDB, Inc.

Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at

http://www.apache.org/licenses/LICENSE-2.0

Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
*/
package cmd

import (
    "fmt"
    "io"

    log "github.com/sirupsen/logrus"
    "github.com/yugabyte/yb-voyager/yb-voyager/src/datafile"
    "github.com/yugabyte/yb-voyager/yb-voyager/src/utils"
)

type FileBatchProducer struct {
    task  *ImportFileTask
    state *ImportDataState

    pendingBatches  []*Batch // pending batches after recovery
    lastBatchNumber int64    // batch number of the last batch that was produced
    lastOffset      int64    // file offset from where the last batch was produced; only used in recovery
    fileFullySplit  bool     // true if the file is fully split into batches
    completed       bool     // true if all batches have been produced

    dataFile      datafile.DataFile
    header        string
    numLinesTaken int64 // number of lines read from the file
    // line that was read from the file while producing the previous batch,
    // but not added to that batch because adding it would breach the
    // size/row-count thresholds
    lineFromPreviousBatch string
}
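To make the lineFromPreviousBatch hand-off concrete, here is a hypothetical trace with batchSizeInNumRows = 2 and a three-row file (r1, r2, r3):

    NextBatch() -> writes r1, r2; reads r3, but the batch already holds batchSizeInNumRows records
                -> finalizes batch 1 (r1, r2) and stashes r3 in lineFromPreviousBatch
    NextBatch() -> first writes r3 from lineFromPreviousBatch, then reads EOF
                -> finalizes batch 2 (r3) and marks the producer completed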

func NewFileBatchProducer(task *ImportFileTask, state *ImportDataState) (*FileBatchProducer, error) {
    err := state.PrepareForFileImport(task.FilePath, task.TableNameTup)
    if err != nil {
        return nil, fmt.Errorf("preparing for file import: %s", err)
    }
    pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup)
    if err != nil {
        return nil, fmt.Errorf("recovering state for table: %q: %s", task.TableNameTup, err)
    }
    completed := len(pendingBatches) == 0 && fileFullySplit

    return &FileBatchProducer{
        task:            task,
        state:           state,
        pendingBatches:  pendingBatches,
        lastBatchNumber: lastBatchNumber,
        lastOffset:      lastOffset,
        fileFullySplit:  fileFullySplit,
        completed:       completed,
        numLinesTaken:   lastOffset,
    }, nil
}

Review comment (Contributor), on the completed field:
nit: completed: len(pendingBatches) == 0 && fileFullySplit

func (p *FileBatchProducer) Done() bool {
    return p.completed
}

Review comment (Collaborator), on the receiver name:
nit: p --> producer ?

func (p *FileBatchProducer) NextBatch() (*Batch, error) {
    if p.Done() {
        return nil, fmt.Errorf("already completed producing all batches")
    }
    if len(p.pendingBatches) > 0 {
        batch := p.pendingBatches[0]
        p.pendingBatches = p.pendingBatches[1:]
        // file is fully split and we are returning the last pending batch,
        // so mark the producer as completed
        if len(p.pendingBatches) == 0 && p.fileFullySplit {
            p.completed = true
        }
        return batch, nil
    }

    return p.produceNextBatch()
}

Review thread on lines +77 to +82 (the pending-batches block above):

Collaborator: Here we are setting completed before the last batch is processed. Should we set it only when there is actually no batch available further? Suggesting something like:

    if len(p.pendingBatches) > 0 {
        batch := p.pendingBatches[0]
        p.pendingBatches = p.pendingBatches[1:]
        return batch, nil
    } else if len(p.pendingBatches) == 0 && p.fileFullySplit {
        p.completed = true
    }

Collaborator: @makalaaneesh this one might be important.

Collaborator (Author): @sanyamsinghal Since that is the last batch we are returning (the file is fully split and we are picking the last pending batch), no further batches are available, so it made sense to mark it as done.
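A minimal unit-test sketch of the contract the author describes — hypothetical, not part of the PR; it assumes a _test.go file in package cmd and that Batch is constructible as an empty struct literal:

    func TestNextBatchMarksCompletedOnLastPendingBatch(t *testing.T) {
        producer := &FileBatchProducer{
            pendingBatches: []*Batch{{}}, // one recovered batch left over
            fileFullySplit: true,         // nothing left to split
        }
        batch, err := producer.NextBatch()
        if err != nil || batch == nil {
            t.Fatalf("expected the last pending batch, got err=%v", err)
        }
        // Done() flips exactly when the final batch is handed out, so the
        // consumer loop `for !producer.Done() { ... }` terminates cleanly
        // without an extra NextBatch() call that would return an error.
        if !producer.Done() {
            t.Error("producer should report Done() after returning the final batch")
        }
    }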

func (p *FileBatchProducer) produceNextBatch() (*Batch, error) {
    if p.dataFile == nil {
        err := p.openDataFile()
        if err != nil {
            return nil, err
        }
    }

    var readLineErr error
    var line string
    var currentBytesRead int64
    batchNum := p.lastBatchNumber + 1

    batchWriter, err := p.newBatchWriter()
    if err != nil {
        return nil, err
    }
    // While producing the previous batch, a line was read from the file but
    // not added to that batch because adding it would breach the size/row-count
    // thresholds. Add that line to the current batch first.
    if p.lineFromPreviousBatch != "" {
        err = batchWriter.WriteRecord(p.lineFromPreviousBatch)
        if err != nil {
            return nil, fmt.Errorf("Write to batch %d: %s", batchNum, err)
        }
        p.lineFromPreviousBatch = ""
    }

    for readLineErr == nil {
        line, currentBytesRead, readLineErr = p.dataFile.NextLine()
        if readLineErr == nil || (readLineErr == io.EOF && line != "") {
            // handles the possible case of the last data line (i.e. EOF) with no trailing newline
            p.numLinesTaken += 1
        }
        log.Debugf("Batch %d: totalBytesRead %d, currentBytes %d \n", batchNum, p.dataFile.GetBytesRead(), currentBytesRead)
        if currentBytesRead > tdb.MaxBatchSizeInBytes() {
            // If a row is itself larger than MaxBatchSizeInBytes, error out
            ybSpecificMsg := ""
            if tconf.TargetDBType == YUGABYTEDB {
                ybSpecificMsg = ", but should be strictly lower than the rpc_max_message_size on YugabyteDB (default 267386880 bytes)"
            }
            return nil, fmt.Errorf("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. Max batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, p.numLinesTaken, p.task.TableNameTup.ForOutput(), p.task.FilePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg)
        }
        if line != "" {
            // can't use importBatchArgsProto.Columns, so as to use case-insensitive column names
            columnNames, _ := TableToColumnNames.Get(p.task.TableNameTup)
            line, err = valueConverter.ConvertRow(p.task.TableNameTup, columnNames, line)
            if err != nil {
                return nil, fmt.Errorf("transforming line number=%d for table: %q in file %s: %s", p.numLinesTaken, p.task.TableNameTup.ForOutput(), p.task.FilePath, err)
            }

            // Check if adding this record would exceed the max batch size
            if batchWriter.NumRecordsWritten == batchSizeInNumRows ||
                p.dataFile.GetBytesRead() > tdb.MaxBatchSizeInBytes() { // GetBytesRead returns the total bytes read so far, including currentBytesRead

                // Finalize the current batch without adding the record
                batch, err := p.finalizeBatch(batchWriter, false, p.numLinesTaken-1, p.dataFile.GetBytesRead()-currentBytesRead)
                if err != nil {
                    return nil, err
                }

                // carry forward the bytes to the next batch
                p.dataFile.ResetBytesRead(currentBytesRead)
                p.lineFromPreviousBatch = line

                return batch, nil
            }

            // Write the record to the current batch
            err = batchWriter.WriteRecord(line)
            if err != nil {
                return nil, fmt.Errorf("Write to batch %d: %s", batchNum, err)
            }
        }

        // Finalize the batch on the last line / end of the file and reset the bytes read to 0
        if readLineErr == io.EOF {
            batch, err := p.finalizeBatch(batchWriter, true, p.numLinesTaken, p.dataFile.GetBytesRead())
            if err != nil {
                return nil, err
            }

            p.completed = true
            // TODO: resetting bytes read to 0 is technically not correct if we are adding a header
            // to each batch file. Currently header bytes are only considered in the first batch.
            // For the rest of the batches, header bytes are ignored, since we are resetting it to 0.
            p.dataFile.ResetBytesRead(0)
            return batch, nil
        } else if readLineErr != nil {
            return nil, fmt.Errorf("read line from data file: %q: %s", p.task.FilePath, readLineErr)
        }
    }
    // ideally should not reach here
    return nil, fmt.Errorf("could not produce next batch: err: %w", readLineErr)
}

Review comment (Contributor), on the WriteRecord(p.lineFromPreviousBatch) call:
Add a comment explaining this lineFromPreviousBatch.

func (p *FileBatchProducer) openDataFile() error {
    reader, err := dataStore.Open(p.task.FilePath)
    if err != nil {
        return fmt.Errorf("preparing reader for split generation on file: %q: %v", p.task.FilePath, err)
    }

    dataFile, err := datafile.NewDataFile(p.task.FilePath, reader, dataFileDescriptor)
    if err != nil {
        return fmt.Errorf("open datafile: %q: %v", p.task.FilePath, err)
    }
    p.dataFile = dataFile

    log.Infof("Skipping %d lines from %q", p.lastOffset, p.task.FilePath)
    err = dataFile.SkipLines(p.lastOffset)
    if err != nil {
        return fmt.Errorf("skipping line for offset=%d: %v", p.lastOffset, err)
    }
    if dataFileDescriptor.HasHeader {
        p.header = dataFile.GetHeader()
    }
    return nil
}

func (p *FileBatchProducer) newBatchWriter() (*BatchWriter, error) {
    batchNum := p.lastBatchNumber + 1
    batchWriter := p.state.NewBatchWriter(p.task.FilePath, p.task.TableNameTup, batchNum)
    err := batchWriter.Init()
    if err != nil {
        return nil, fmt.Errorf("initializing batch writer for table: %q: %s", p.task.TableNameTup, err)
    }
    // Write the header if necessary
    if p.header != "" && dataFileDescriptor.FileFormat == datafile.CSV {
        err = batchWriter.WriteHeader(p.header)
        if err != nil {
            utils.ErrExit("writing header for table: %q: %s", p.task.TableNameTup, err)
        }
    }
    return batchWriter, nil
}

func (p *FileBatchProducer) finalizeBatch(batchWriter *BatchWriter, isLastBatch bool, offsetEnd int64, bytesInBatch int64) (*Batch, error) {
    batchNum := p.lastBatchNumber + 1
    batch, err := batchWriter.Done(isLastBatch, offsetEnd, bytesInBatch)
    if err != nil {
        utils.ErrExit("finalizing batch %d: %s", batchNum, err)
    }
    batchWriter = nil
    p.lastBatchNumber = batchNum
    return batch, nil
}