From fa29368299b3c846ad5ff03cfba88cd35562365b Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Wed, 15 Jan 2025 13:41:19 +0530 Subject: [PATCH 01/17] wip --- yb-voyager/cmd/importDataFileBatchProducer.go | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 yb-voyager/cmd/importDataFileBatchProducer.go diff --git a/yb-voyager/cmd/importDataFileBatchProducer.go b/yb-voyager/cmd/importDataFileBatchProducer.go new file mode 100644 index 0000000000..ca6335035a --- /dev/null +++ b/yb-voyager/cmd/importDataFileBatchProducer.go @@ -0,0 +1,154 @@ +/* +Copyright (c) YugabyteDB, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package cmd + +import ( + "fmt" + + log "github.com/sirupsen/logrus" + "github.com/yugabyte/yb-voyager/yb-voyager/src/datafile" + "github.com/yugabyte/yb-voyager/yb-voyager/src/utils" +) + +type FileBatchProducer struct { + task *ImportFileTask + state *ImportDataState + + pendingBatches []*Batch + lastBatchNumber int64 + lastOffset int64 + fileFullySplit bool + completed bool + + datafile *datafile.DataFile + header string +} + +func NewFileBatchProducer(task *ImportFileTask, state *ImportDataState) (*FileBatchProducer, error) { + err := state.PrepareForFileImport(task.FilePath, task.TableNameTup) + if err != nil { + return nil, fmt.Errorf("preparing for file import: %s", err) + } + pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup) + if err != nil { + return nil, fmt.Errorf("recovering state for table: %q: %s", task.TableNameTup, err) + } + var completed bool + if len(pendingBatches) == 0 && fileFullySplit { + completed = true + } + + return &FileBatchProducer{ + task: task, + state: state, + pendingBatches: pendingBatches, + lastBatchNumber: lastBatchNumber, + lastOffset: lastOffset, + fileFullySplit: fileFullySplit, + completed: completed, + }, nil +} + +func (p *FileBatchProducer) Done() bool { + return p.completed +} + +func (p *FileBatchProducer) NextBatch() (*Batch, error) { + if len(p.pendingBatches) > 0 { + batch := p.pendingBatches[0] + p.pendingBatches = p.pendingBatches[1:] + // file is fully split and returning the last batch, so mark the producer as completed + if len(p.pendingBatches) == 0 && p.fileFullySplit { + p.completed = true + } + return batch, nil + } + + return p.produceNextBatch() +} + +func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { + if p.datafile == nil { + err := p.openDataFile() + if err != nil { + return nil, err + } + } + + batchWriter, err := p.newBatchWriter() + if err != nil { + return nil, err + } + +} + +func (p *FileBatchProducer) openDataFile() error { + reader, err := dataStore.Open(p.task.FilePath) + if err != nil { + return fmt.Errorf("preparing reader for split generation on file: %q: %v", p.task.FilePath, err) + } + + dataFile, err := datafile.NewDataFile(p.task.FilePath, reader, dataFileDescriptor) + + if err != nil { + return fmt.Errorf("open datafile: %q: %v", p.task.FilePath, err) + } + p.datafile = &dataFile + + log.Infof("Skipping %d 
lines from %q", p.lastOffset, p.task.FilePath) + err = dataFile.SkipLines(p.lastOffset) + if err != nil { + return fmt.Errorf("skipping line for offset=%d: %v", p.lastOffset, err) + } + if dataFileDescriptor.HasHeader { + p.header = dataFile.GetHeader() + } + return nil +} + +func (p *FileBatchProducer) newBatchWriter() (*BatchWriter, error) { + batchNum := p.lastBatchNumber + 1 + batchWriter := p.state.NewBatchWriter(p.task.FilePath, p.task.TableNameTup, batchNum) + err := batchWriter.Init() + if err != nil { + return nil, fmt.Errorf("initializing batch writer for table: %q: %s", p.task.TableNameTup, err) + } + // Write the header if necessary + if p.header != "" && dataFileDescriptor.FileFormat == datafile.CSV { + err = batchWriter.WriteHeader(p.header) + if err != nil { + utils.ErrExit("writing header for table: %q: %s", p.task.TableNameTup, err) + } + } + return batchWriter, nil +} + +func (p *FileBatchProducer) finalizeBatch(batchWriter *BatchWriter, isLastBatch bool, offsetEnd int64, bytesInBatch int64) (*Batch, error) { + batchNum := p.lastBatchNumber + 1 + batch, err := batchWriter.Done(isLastBatch, offsetEnd, bytesInBatch) + if err != nil { + utils.ErrExit("finalizing batch %d: %s", batchNum, err) + } + batchWriter = nil + p.lastBatchNumber = batchNum + return batch, nil + // submitBatch(batch, updateProgressFn, importBatchArgsProto) + + // Increment batchNum only if this is not the last batch + // if !isLastBatch { + // batchNum++ + // } +} From ec54ea6c27ae2b6585c2861974d6f9283628c75f Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Wed, 15 Jan 2025 14:55:55 +0530 Subject: [PATCH 02/17] base logic for producing next batch --- yb-voyager/cmd/importDataFileBatchProducer.go | 74 ++++++++++++++++++- 1 file changed, 71 insertions(+), 3 deletions(-) diff --git a/yb-voyager/cmd/importDataFileBatchProducer.go b/yb-voyager/cmd/importDataFileBatchProducer.go index ca6335035a..db21968dbf 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer.go +++ b/yb-voyager/cmd/importDataFileBatchProducer.go @@ -17,6 +17,7 @@ package cmd import ( "fmt" + "io" log "github.com/sirupsen/logrus" "github.com/yugabyte/yb-voyager/yb-voyager/src/datafile" @@ -33,7 +34,7 @@ type FileBatchProducer struct { fileFullySplit bool completed bool - datafile *datafile.DataFile + dataFile datafile.DataFile header string } @@ -81,7 +82,7 @@ func (p *FileBatchProducer) NextBatch() (*Batch, error) { } func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { - if p.datafile == nil { + if p.dataFile == nil { err := p.openDataFile() if err != nil { return nil, err @@ -93,6 +94,73 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { return nil, err } + var readLineErr error + var line string + var currentBytesRead int64 + batchNum := p.lastBatchNumber + 1 + + for readLineErr == nil { + + line, currentBytesRead, readLineErr = p.dataFile.NextLine() + if readLineErr == nil || (readLineErr == io.EOF && line != "") { + // handling possible case: last dataline(i.e. 
EOF) but no newline char at the end
+			numLinesTaken += 1
+		}
+		log.Debugf("Batch %d: totalBytesRead %d, currentBytes %d \n", batchNum, p.dataFile.GetBytesRead(), currentBytesRead)
+		if currentBytesRead > tdb.MaxBatchSizeInBytes() {
+			//If a row is itself larger than MaxBatchSizeInBytes erroring out
+			ybSpecificMsg := ""
+			if tconf.TargetDBType == YUGABYTEDB {
+				ybSpecificMsg = ", but should be strictly lower than the rpc_max_message_size on YugabyteDB (default 267386880 bytes)"
+			}
+			utils.ErrExit("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, numLinesTaken, t.ForOutput(), filePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg)
+		}
+		if line != "" {
+			// can't use importBatchArgsProto.Columns as to use case insensitive column names
+			columnNames, _ := TableToColumnNames.Get(t)
+			line, err = valueConverter.ConvertRow(t, columnNames, line)
+			if err != nil {
+				utils.ErrExit("transforming line number=%d for table: %q in file %s: %s", numLinesTaken, p.task.TableNameTup.ForOutput(), p.task.FilePath, err)
+			}
+
+			// Check if adding this record exceeds the max batch size
+			if batchWriter.NumRecordsWritten == batchSizeInNumRows ||
+				p.dataFile.GetBytesRead() > tdb.MaxBatchSizeInBytes() { // GetBytesRead - returns the total bytes read until now including the currentBytesRead
+
+				// Finalize the current batch without adding the record
+				batch, err := p.finalizeBatch(batchWriter, false, numLinesTaken-1, p.dataFile.GetBytesRead()-currentBytesRead)
+				if err != nil {
+					return nil, err
+				}
+
+				//carry forward the bytes to next batch
+				p.dataFile.ResetBytesRead(currentBytesRead)
+
+				// Start a new batch by calling the initBatchWriter function
+				initBatchWriter()
+			}
+
+			// Write the record to the new or current batch
+			err = batchWriter.WriteRecord(line)
+			if err != nil {
+				utils.ErrExit("Write to batch %d: %s", batchNum, err)
+			}
+		}
+
+		// Finalize the batch if it's the last line or the end of the file and reset the bytes read to 0
+		if readLineErr == io.EOF {
+			batch, err := p.finalizeBatch(batchWriter, true, numLinesTaken, p.dataFile.GetBytesRead())
+			if err != nil {
+				return nil, err
+			}
+
+			p.completed = true
+			p.dataFile.ResetBytesRead(0)
+			return batch, nil
+		} else if readLineErr != nil {
+			return nil, fmt.Errorf("read line from data file: %q: %s", p.task.FilePath, readLineErr)
+		}
+	}
 }
 
 func (p *FileBatchProducer) openDataFile() error {
@@ -106,7 +174,7 @@ func (p *FileBatchProducer) openDataFile() error {
 	if err != nil {
 		return fmt.Errorf("open datafile: %q: %v", p.task.FilePath, err)
 	}
-	p.datafile = &dataFile
+	p.dataFile = dataFile
 
 	log.Infof("Skipping %d lines from %q", p.lastOffset, p.task.FilePath)
 	err = dataFile.SkipLines(p.lastOffset)

From a4aabc8be761f4ccdad169529a2f8f46e6f41a32 Mon Sep 17 00:00:00 2001
From: Aneesh Makala
Date: Wed, 15 Jan 2025 16:30:53 +0530
Subject: [PATCH 03/17] store line from previous batch

---
 yb-voyager/cmd/importDataFileBatchProducer.go | 48 ++++++++++++-------
 1 file changed, 32 insertions(+), 16 deletions(-)

diff --git a/yb-voyager/cmd/importDataFileBatchProducer.go b/yb-voyager/cmd/importDataFileBatchProducer.go
index db21968dbf..e9872341ab 100644
--- a/yb-voyager/cmd/importDataFileBatchProducer.go
+++ b/yb-voyager/cmd/importDataFileBatchProducer.go
@@ -34,8 +34,10 @@ type FileBatchProducer struct {
 	fileFullySplit bool
 	completed      bool
 
-	dataFile datafile.DataFile
-	header   string
+	dataFile              datafile.DataFile
+	header                string
+	numLinesTaken         int64
+	lineFromPreviousBatch string
 }
 
 func NewFileBatchProducer(task *ImportFileTask, state *ImportDataState) (*FileBatchProducer, error) {
@@ -60,6 +62,7 @@ func NewFileBatchProducer(task *ImportFileTask, state *ImportDataState) (*FileBa
 		lastOffset:      lastOffset,
 		fileFullySplit:  fileFullySplit,
 		completed:       completed,
+		numLinesTaken:   lastOffset,
 	}, nil
 }
 
@@ -68,6 +71,9 @@ func (p *FileBatchProducer) Done() bool {
 }
 
 func (p *FileBatchProducer) NextBatch() (*Batch, error) {
+	if p.Done() {
+		return nil, fmt.Errorf("already done")
+	}
 	if len(p.pendingBatches) > 0 {
 		batch := p.pendingBatches[0]
 		p.pendingBatches = p.pendingBatches[1:]
@@ -89,22 +95,29 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) {
 		}
 	}
 
-	batchWriter, err := p.newBatchWriter()
-	if err != nil {
-		return nil, err
-	}
-
 	var readLineErr error
 	var line string
 	var currentBytesRead int64
 	batchNum := p.lastBatchNumber + 1
 
+	batchWriter, err := p.newBatchWriter()
+	if err != nil {
+		return nil, err
+	}
+	if p.lineFromPreviousBatch != "" {
+		err = batchWriter.WriteRecord(p.lineFromPreviousBatch)
+		if err != nil {
+			return nil, fmt.Errorf("Write to batch %d: %s", batchNum, err)
+		}
+		p.lineFromPreviousBatch = ""
+	}
+
 	for readLineErr == nil {
 
 		line, currentBytesRead, readLineErr = p.dataFile.NextLine()
 		if readLineErr == nil || (readLineErr == io.EOF && line != "") {
 			// handling possible case: last dataline(i.e. EOF) but no newline char at the end
-			numLinesTaken += 1
+			p.numLinesTaken += 1
 		}
 		log.Debugf("Batch %d: totalBytesRead %d, currentBytes %d \n", batchNum, p.dataFile.GetBytesRead(), currentBytesRead)
 		if currentBytesRead > tdb.MaxBatchSizeInBytes() {
@@ -113,14 +126,14 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) {
 			if tconf.TargetDBType == YUGABYTEDB {
 				ybSpecificMsg = ", but should be strictly lower than the rpc_max_message_size on YugabyteDB (default 267386880 bytes)"
 			}
-			utils.ErrExit("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, numLinesTaken, t.ForOutput(), filePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg)
+			return nil, fmt.Errorf("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, p.numLinesTaken, p.task.TableNameTup.ForOutput(), p.task.FilePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg)
 		}
 		if line != "" {
 			// can't use importBatchArgsProto.Columns as to use case insensitive column names
-			columnNames, _ := TableToColumnNames.Get(t)
-			line, err = valueConverter.ConvertRow(t, columnNames, line)
+			columnNames, _ := TableToColumnNames.Get(p.task.TableNameTup)
+			line, err = valueConverter.ConvertRow(p.task.TableNameTup, columnNames, line)
 			if err != nil {
-				utils.ErrExit("transforming line number=%d for table: %q in file %s: %s", numLinesTaken, p.task.TableNameTup.ForOutput(), p.task.FilePath, err)
+				return nil, fmt.Errorf("transforming line number=%d for table: %q in file %s: %s", p.numLinesTaken, p.task.TableNameTup.ForOutput(), p.task.FilePath, err)
 			}
 
 			// Check if adding this record exceeds the max batch size
@@ -128,28 +141,30 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) {
 				p.dataFile.GetBytesRead() > tdb.MaxBatchSizeInBytes() { // GetBytesRead - returns the total bytes read until now including the currentBytesRead
 
 				// Finalize the current batch without adding the record
-				batch, err := p.finalizeBatch(batchWriter, false, numLinesTaken-1, p.dataFile.GetBytesRead()-currentBytesRead)
+				batch, err := p.finalizeBatch(batchWriter, false, p.numLinesTaken-1, p.dataFile.GetBytesRead()-currentBytesRead)
 				if err != nil {
 					return nil, err
 				}
 
 				//carry forward the bytes to next batch
 				p.dataFile.ResetBytesRead(currentBytesRead)
+				p.lineFromPreviousBatch = line
 
 				// Start a new batch by calling the initBatchWriter function
-				initBatchWriter()
+				// initBatchWriter()
+				return batch, nil
 			}
 
 			// Write the record to the new or current batch
 			err = batchWriter.WriteRecord(line)
 			if err != nil {
-				utils.ErrExit("Write to batch %d: %s", batchNum, err)
+				return nil, fmt.Errorf("Write to batch %d: %s", batchNum, err)
 			}
 		}
 
 		// Finalize the batch if it's the last line or the end of the file and reset the bytes read to 0
 		if readLineErr == io.EOF {
-			batch, err := p.finalizeBatch(batchWriter, true, numLinesTaken, p.dataFile.GetBytesRead())
+			batch, err := p.finalizeBatch(batchWriter, true, p.numLinesTaken, p.dataFile.GetBytesRead())
 			if err != nil {
 				return nil, err
 			}
 
@@ -161,6 +176,7 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) {
 			return nil, fmt.Errorf("read line from data file: %q: %s", p.task.FilePath, readLineErr)
 		}
 	}
+	return nil, fmt.Errorf("unexpected")
 }
 
 func (p *FileBatchProducer) openDataFile() error {

From 440a5f3be9ca17374247f2c081ea972d6eacf6b7 Mon Sep 17 00:00:00 2001
From: Aneesh Makala
Date: Thu, 16 Jan 2025 11:45:52 +0530
Subject: [PATCH 04/17] test

---
 yb-voyager/cmd/importDataFileBatchProducer_test.go | 116 ++++++++++++++++++
 1 file changed, 116 insertions(+)
 create mode 100644 yb-voyager/cmd/importDataFileBatchProducer_test.go

diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go
new file mode 100644
index 0000000000..76feebc972
--- /dev/null
+++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go
@@ -0,0 +1,116 @@
+/*
+Copyright (c) YugabyteDB, Inc.
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package cmd + +import ( + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/yugabyte/yb-voyager/yb-voyager/src/constants" + "github.com/yugabyte/yb-voyager/yb-voyager/src/datafile" + "github.com/yugabyte/yb-voyager/yb-voyager/src/datastore" + "github.com/yugabyte/yb-voyager/yb-voyager/src/dbzm" + "github.com/yugabyte/yb-voyager/yb-voyager/src/tgtdb" + "github.com/yugabyte/yb-voyager/yb-voyager/src/utils/sqlname" +) + +type DummyTdb struct { + tgtdb.TargetYugabyteDB +} + +func (t *DummyTdb) MaxBatchSizeInBytes() int64 { + return 1024 +} + +func createTempFile(dir string, fileContents string) (string, error) { + // Create a temporary file + file, err := os.CreateTemp(dir, "temp-*.txt") + if err != nil { + return "", err + } + defer file.Close() + + // Write some text to the file + _, err = file.WriteString(fileContents) + if err != nil { + return "", err + } + + return file.Name(), nil +} + +func TestBasicFileBatchProducer(t *testing.T) { + fileContents := `id,val +1, "hello"` + + exportDir, err := os.MkdirTemp("/tmp", "export-dir-*") + assert.NoError(t, err) + defer os.RemoveAll(fmt.Sprintf("%s/", exportDir)) + dataDir, err := os.MkdirTemp("/tmp", "data-dir-*") + assert.NoError(t, err) + defer os.RemoveAll(fmt.Sprintf("%s/", dataDir)) + tempFile, err := createTempFile(dataDir, fileContents) + assert.NoError(t, err) + + CreateMigrationProjectIfNotExists(constants.POSTGRESQL, exportDir) + tdb = &DummyTdb{} + valueConverter = &dbzm.NoOpValueConverter{} + dataStore = datastore.NewDataStore(dataDir) + batchSizeInNumRows = 2 + dataFileDescriptor = &datafile.Descriptor{ + FileFormat: "csv", + Delimiter: ",", + HasHeader: true, + ExportDir: exportDir, + QuoteChar: '"', + EscapeChar: '\\', + NullString: "NULL", + // DataFileList: []*FileEntry{ + // { + // FilePath: "file.csv", // Use relative path for testing absolute path handling. 
+ // TableName: "public.my_table", + // RowCount: 100, + // FileSize: 2048, + // }, + // }, + // TableNameToExportedColumns: map[string][]string{ + // "public.my_table": {"id", "name", "age"}, + // }, + } + + sourceName := sqlname.NewObjectName(constants.POSTGRESQL, "public", "public", "test_table") + tableNameTup := sqlname.NameTuple{SourceName: sourceName, CurrentName: sourceName} + task := &ImportFileTask{ + ID: 1, + FilePath: tempFile, + TableNameTup: tableNameTup, + RowCount: 1, + } + + state := NewImportDataState(exportDir) + + batchproducer, err := NewFileBatchProducer(task, state) + assert.NoError(t, err) + + assert.False(t, batchproducer.Done()) + + batch, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch) + assert.Equal(t, int64(1), batch.RecordCount) +} From c14e69ea6e5e4c449a63ff1c28d0cfdc1c2ed77a Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 12:54:30 +0530 Subject: [PATCH 05/17] rewrite test --- .../cmd/importDataFileBatchProducer_test.go | 76 +++++++++++-------- 1 file changed, 45 insertions(+), 31 deletions(-) diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go index 76feebc972..9ddc10a715 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer_test.go +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -29,11 +29,11 @@ import ( "github.com/yugabyte/yb-voyager/yb-voyager/src/utils/sqlname" ) -type DummyTdb struct { +type dummyTDB struct { tgtdb.TargetYugabyteDB } -func (t *DummyTdb) MaxBatchSizeInBytes() int64 { +func (t *dummyTDB) MaxBatchSizeInBytes() int64 { return 1024 } @@ -54,46 +54,44 @@ func createTempFile(dir string, fileContents string) (string, error) { return file.Name(), nil } -func TestBasicFileBatchProducer(t *testing.T) { - fileContents := `id,val -1, "hello"` +func setupDependenciesForTest(batchSize int64) (string, string, *ImportDataState, error) { + lexportDir, err := os.MkdirTemp("/tmp", "export-dir-*") + if err != nil { + return "", "", nil, err + } - exportDir, err := os.MkdirTemp("/tmp", "export-dir-*") - assert.NoError(t, err) - defer os.RemoveAll(fmt.Sprintf("%s/", exportDir)) - dataDir, err := os.MkdirTemp("/tmp", "data-dir-*") - assert.NoError(t, err) - defer os.RemoveAll(fmt.Sprintf("%s/", dataDir)) - tempFile, err := createTempFile(dataDir, fileContents) - assert.NoError(t, err) + ldataDir, err := os.MkdirTemp("/tmp", "data-dir-*") + if err != nil { + return "", "", nil, err + } - CreateMigrationProjectIfNotExists(constants.POSTGRESQL, exportDir) - tdb = &DummyTdb{} + CreateMigrationProjectIfNotExists(constants.POSTGRESQL, lexportDir) + tdb = &dummyTDB{} valueConverter = &dbzm.NoOpValueConverter{} - dataStore = datastore.NewDataStore(dataDir) - batchSizeInNumRows = 2 + dataStore = datastore.NewDataStore(ldataDir) + + batchSizeInNumRows = batchSize + + state := NewImportDataState(lexportDir) + return ldataDir, lexportDir, state, nil +} + +func setupFileForTest(lexportDir string, fileContents string, dir string, tableName string) (string, *ImportFileTask, error) { dataFileDescriptor = &datafile.Descriptor{ FileFormat: "csv", Delimiter: ",", HasHeader: true, - ExportDir: exportDir, + ExportDir: lexportDir, QuoteChar: '"', EscapeChar: '\\', NullString: "NULL", - // DataFileList: []*FileEntry{ - // { - // FilePath: "file.csv", // Use relative path for testing absolute path handling. 
- // TableName: "public.my_table", - // RowCount: 100, - // FileSize: 2048, - // }, - // }, - // TableNameToExportedColumns: map[string][]string{ - // "public.my_table": {"id", "name", "age"}, - // }, + } + tempFile, err := createTempFile(dir, fileContents) + if err != nil { + return "", nil, err } - sourceName := sqlname.NewObjectName(constants.POSTGRESQL, "public", "public", "test_table") + sourceName := sqlname.NewObjectName(constants.POSTGRESQL, "public", "public", tableName) tableNameTup := sqlname.NameTuple{SourceName: sourceName, CurrentName: sourceName} task := &ImportFileTask{ ID: 1, @@ -101,8 +99,24 @@ func TestBasicFileBatchProducer(t *testing.T) { TableNameTup: tableNameTup, RowCount: 1, } + return tempFile, task, nil +} + +func TestBasicFileBatchProducer(t *testing.T) { + ldataDir, lexportDir, state, err := setupDependenciesForTest(2) + assert.NoError(t, err) - state := NewImportDataState(exportDir) + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + fileContents := `id,val +1, "hello"` + _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + assert.NoError(t, err) batchproducer, err := NewFileBatchProducer(task, state) assert.NoError(t, err) From a056385e77f7daeb9f69a47ffec638ff566c7e23 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 12:57:23 +0530 Subject: [PATCH 06/17] minor fix --- yb-voyager/cmd/importDataFileBatchProducer_test.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go index 9ddc10a715..12d3a6678f 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer_test.go +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -30,11 +30,12 @@ import ( ) type dummyTDB struct { + maxSizeBytes int64 tgtdb.TargetYugabyteDB } -func (t *dummyTDB) MaxBatchSizeInBytes() int64 { - return 1024 +func (d *dummyTDB) MaxBatchSizeInBytes() int64 { + return d.maxSizeBytes } func createTempFile(dir string, fileContents string) (string, error) { @@ -54,7 +55,7 @@ func createTempFile(dir string, fileContents string) (string, error) { return file.Name(), nil } -func setupDependenciesForTest(batchSize int64) (string, string, *ImportDataState, error) { +func setupDependenciesForTest(batchSizeRows int64, batchSizeBytes int64) (string, string, *ImportDataState, error) { lexportDir, err := os.MkdirTemp("/tmp", "export-dir-*") if err != nil { return "", "", nil, err @@ -66,11 +67,11 @@ func setupDependenciesForTest(batchSize int64) (string, string, *ImportDataState } CreateMigrationProjectIfNotExists(constants.POSTGRESQL, lexportDir) - tdb = &dummyTDB{} + tdb = &dummyTDB{maxSizeBytes: batchSizeBytes} valueConverter = &dbzm.NoOpValueConverter{} dataStore = datastore.NewDataStore(ldataDir) - batchSizeInNumRows = batchSize + batchSizeInNumRows = batchSizeRows state := NewImportDataState(lexportDir) return ldataDir, lexportDir, state, nil @@ -103,7 +104,7 @@ func setupFileForTest(lexportDir string, fileContents string, dir string, tableN } func TestBasicFileBatchProducer(t *testing.T) { - ldataDir, lexportDir, state, err := setupDependenciesForTest(2) + ldataDir, lexportDir, state, err := setupDependenciesForTest(2, 1024) assert.NoError(t, err) if ldataDir != "" { @@ -127,4 +128,5 @@ func TestBasicFileBatchProducer(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, batch) assert.Equal(t, int64(1), 
batch.RecordCount)
+	assert.True(t, batchproducer.Done())
 }

From c45b6a3213fcee700918285f3320f61bef92cf9e Mon Sep 17 00:00:00 2001
From: Aneesh Makala
Date: Thu, 16 Jan 2025 13:11:31 +0530
Subject: [PATCH 07/17] more tests

---
 .../cmd/importDataFileBatchProducer_test.go   | 120 ++++++++++++++++++
 1 file changed, 120 insertions(+)

diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go
index 12d3a6678f..be4c4f59d3 100644
--- a/yb-voyager/cmd/importDataFileBatchProducer_test.go
+++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go
@@ -130,3 +130,123 @@ func TestBasicFileBatchProducer(t *testing.T) {
 	assert.Equal(t, int64(1), batch.RecordCount)
 	assert.True(t, batchproducer.Done())
 }
+
+func TestFileBatchProducerBasedOnRowsThreshold(t *testing.T) {
+	// max batch size in rows is 2
+	ldataDir, lexportDir, state, err := setupDependenciesForTest(2, 1024)
+	assert.NoError(t, err)
+
+	if ldataDir != "" {
+		defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir))
+	}
+	if lexportDir != "" {
+		defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir))
+	}
+
+	fileContents := `id,val
+1, "hello"
+2, "world"
+3, "foo"
+4, "bar"`
+	_, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table")
+	assert.NoError(t, err)
+
+	batchproducer, err := NewFileBatchProducer(task, state)
+	assert.NoError(t, err)
+
+	assert.False(t, batchproducer.Done())
+
+	var batches []*Batch
+	for !batchproducer.Done() {
+		batch, err := batchproducer.NextBatch()
+		assert.NoError(t, err)
+		assert.NotNil(t, batch)
+		batches = append(batches, batch)
+	}
+
+	// 2 batches should be produced
+	assert.Equal(t, 2, len(batches))
+	// each of length 2
+	assert.Equal(t, int64(2), batches[0].RecordCount)
+	assert.Equal(t, int64(2), batches[1].RecordCount)
+}
+
+func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) {
+	// max batch size in size is 25 bytes
+	ldataDir, lexportDir, state, err := setupDependenciesForTest(1000, 25)
+	assert.NoError(t, err)
+
+	if ldataDir != "" {
+		defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir))
+	}
+	if lexportDir != "" {
+		defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir))
+	}
+
+	// each row except header is 10 bytes
+	fileContents := `id,val
+1, "abcde"
+2, "ghijk"
+3, "mnopq"
+4, "stuvw"`
+	_, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table")
+	assert.NoError(t, err)
+
+	batchproducer, err := NewFileBatchProducer(task, state)
+	assert.NoError(t, err)
+
+	assert.False(t, batchproducer.Done())
+
+	var batches []*Batch
+	for !batchproducer.Done() {
+		batch, err := batchproducer.NextBatch()
+		assert.NoError(t, err)
+		assert.NotNil(t, batch)
+		batches = append(batches, batch)
+	}
+
+	// 3 batches should be produced
+	// while calculating for the first batch, the header is also considered
+	assert.Equal(t, 3, len(batches))
+	// each of length 2
+	assert.Equal(t, int64(1), batches[0].RecordCount)
+	assert.Equal(t, int64(2), batches[1].RecordCount)
+	assert.Equal(t, int64(1), batches[2].RecordCount)
+}
+
+func TestFileBatchProducerThrowsErrorWhenSingleRowGreaterThanMaxBatchSize(t *testing.T) {
+	// max batch size in size is 25 bytes
+	ldataDir, lexportDir, state, err := setupDependenciesForTest(1000, 25)
+	assert.NoError(t, err)
+
+	if ldataDir != "" {
+		defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir))
+	}
+	if lexportDir != "" {
+		defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir))
+	}
+
+	// 3rd row is greater than max batch size
+	fileContents := `id,val
+1, "abcdef"
+2, "ghijk"
+3, "mnopq1234567899876543"
+4, "stuvw"` + _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + assert.NoError(t, err) + + batchproducer, err := NewFileBatchProducer(task, state) + assert.NoError(t, err) + + assert.False(t, batchproducer.Done()) + + // 1st batch is fine. + batch, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch) + assert.Equal(t, int64(1), batch.RecordCount) + + // 2nd batch should throw error + _, err = batchproducer.NextBatch() + assert.ErrorContains(t, err, "larger than max batch size") +} From a78387ef461f2e0aa3860ff515cec36e9d212441 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 13:17:50 +0530 Subject: [PATCH 08/17] batch value verify --- .../cmd/importDataFileBatchProducer_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go index be4c4f59d3..d9ed8a6bcf 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer_test.go +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -168,7 +168,14 @@ func TestFileBatchProducerBasedOnRowsThreshold(t *testing.T) { assert.Equal(t, 2, len(batches)) // each of length 2 assert.Equal(t, int64(2), batches[0].RecordCount) + batchContents, err := os.ReadFile(batches[0].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n1, \"hello\"\n2, \"world\"", string(batchContents)) + assert.Equal(t, int64(2), batches[1].RecordCount) + batchContents, err = os.ReadFile(batches[1].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n3, \"foo\"\n4, \"bar\"", string(batchContents)) } func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) { @@ -210,8 +217,19 @@ func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) { assert.Equal(t, 3, len(batches)) // each of length 2 assert.Equal(t, int64(1), batches[0].RecordCount) + batchContents, err := os.ReadFile(batches[0].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n1, \"abcde\"", string(batchContents)) + assert.Equal(t, int64(2), batches[1].RecordCount) + batchContents, err = os.ReadFile(batches[1].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n2, \"ghijk\"\n3, \"mnopq\"", string(batchContents)) + assert.Equal(t, int64(1), batches[2].RecordCount) + batchContents, err = os.ReadFile(batches[2].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n4, \"stuvw\"", string(batchContents)) } func TestFileBatchProducerThrowsErrorWhenSingleRowGreaterThanMaxBatchSize(t *testing.T) { From 65c76456b5a57da556001a6c34ceeb0df4ca441f Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 13:19:52 +0530 Subject: [PATCH 09/17] assert less than --- yb-voyager/cmd/importDataFileBatchProducer_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go index d9ed8a6bcf..8fe9cb4b9b 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer_test.go +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -180,7 +180,8 @@ func TestFileBatchProducerBasedOnRowsThreshold(t *testing.T) { func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) { // max batch size in size is 25 bytes - ldataDir, lexportDir, state, err := setupDependenciesForTest(1000, 25) + maxBatchSizeBytes := int64(25) + ldataDir, lexportDir, state, err := setupDependenciesForTest(1000, maxBatchSizeBytes) assert.NoError(t, err) if ldataDir != "" { @@ 
-217,16 +218,19 @@ func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) {
 	assert.Equal(t, 3, len(batches))
 	// each of length 2
 	assert.Equal(t, int64(1), batches[0].RecordCount)
+	assert.LessOrEqual(t, batches[0].ByteCount, maxBatchSizeBytes)
 	batchContents, err := os.ReadFile(batches[0].GetFilePath())
 	assert.NoError(t, err)
 	assert.Equal(t, "id,val\n1, \"abcde\"", string(batchContents))
 
 	assert.Equal(t, int64(2), batches[1].RecordCount)
+	assert.LessOrEqual(t, batches[1].ByteCount, maxBatchSizeBytes)
 	batchContents, err = os.ReadFile(batches[1].GetFilePath())
 	assert.NoError(t, err)
 	assert.Equal(t, "id,val\n2, \"ghijk\"\n3, \"mnopq\"", string(batchContents))
 
 	assert.Equal(t, int64(1), batches[2].RecordCount)
+	assert.LessOrEqual(t, batches[2].ByteCount, maxBatchSizeBytes)
 	batchContents, err = os.ReadFile(batches[2].GetFilePath())
 	assert.NoError(t, err)
 	assert.Equal(t, "id,val\n4, \"stuvw\"", string(batchContents))

From a9a294c832fad8feee0039ef081a8a966f0a741a Mon Sep 17 00:00:00 2001
From: Aneesh Makala
Date: Thu, 16 Jan 2025 13:33:16 +0530
Subject: [PATCH 10/17] resumable test

---
 .../cmd/importDataFileBatchProducer_test.go   | 57 +++++++++++++++++++
 1 file changed, 57 insertions(+)

diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go
index 8fe9cb4b9b..ed535c7497 100644
--- a/yb-voyager/cmd/importDataFileBatchProducer_test.go
+++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go
@@ -20,6 +20,7 @@ import (
 	"os"
 	"testing"
 
+	"github.com/google/go-cmp/cmp"
 	"github.com/stretchr/testify/assert"
 	"github.com/yugabyte/yb-voyager/yb-voyager/src/constants"
 	"github.com/yugabyte/yb-voyager/yb-voyager/src/datafile"
@@ -272,3 +273,59 @@ func TestFileBatchProducerThrowsErrorWhenSingleRowGreaterThanMaxBatchSize(t *tes
 	_, err = batchproducer.NextBatch()
 	assert.ErrorContains(t, err, "larger than max batch size")
 }
+
+func TestFileBatchProducerResumable(t *testing.T) {
+	// max batch size in rows is 2
+	ldataDir, lexportDir, state, err := setupDependenciesForTest(2, 1024)
+	assert.NoError(t, err)
+
+	if ldataDir != "" {
+		defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir))
+	}
+	if lexportDir != "" {
+		defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir))
+	}
+
+	fileContents := `id,val
+1, "hello"
+2, "world"
+3, "foo"
+4, "bar"`
+	_, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table")
+	assert.NoError(t, err)
+
+	batchproducer, err := NewFileBatchProducer(task, state)
+	assert.NoError(t, err)
+	assert.False(t, batchproducer.Done())
+
+	// generate one batch
+	batch1, err := batchproducer.NextBatch()
+	assert.NoError(t, err)
+	assert.NotNil(t, batch1)
+	assert.Equal(t, int64(2), batch1.RecordCount)
+
+	// simulate a crash and recover
+	batchproducer, err = NewFileBatchProducer(task, state)
+	assert.NoError(t, err)
+	assert.False(t, batchproducer.Done())
+
+	// state should have recovered that one batch
+	assert.Equal(t, 1, len(batchproducer.pendingBatches))
+	assert.True(t, cmp.Equal(batch1, batchproducer.pendingBatches[0]))
+
+	// verify that it picks up from pendingBatches
+	// instead of producing a new batch.
+ batch1Recovered, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch1Recovered) + assert.True(t, cmp.Equal(batch1, batch1Recovered)) + assert.Equal(t, 0, len(batchproducer.pendingBatches)) + assert.False(t, batchproducer.Done()) + + // get final batch + batch2, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch2) + assert.Equal(t, int64(2), batch2.RecordCount) + assert.True(t, batchproducer.Done()) +} From 1f32f7a6938f1647803d3315115ffbdda818d406 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 13:44:53 +0530 Subject: [PATCH 11/17] import data change to use filebatchproducer --- yb-voyager/cmd/importData.go | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/yb-voyager/cmd/importData.go b/yb-voyager/cmd/importData.go index 45050cf615..1d44f4d0f7 100644 --- a/yb-voyager/cmd/importData.go +++ b/yb-voyager/cmd/importData.go @@ -1010,17 +1010,30 @@ func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn f if err != nil { utils.ErrExit("preparing for file import: %s", err) } - log.Infof("Collect all interrupted/remaining splits.") - pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup) + + fileBatchProducer, err := NewFileBatchProducer(task, state) if err != nil { - utils.ErrExit("recovering state for table: %q: %s", task.TableNameTup, err) + utils.ErrExit("creating file batch producer: %s", err) } - for _, batch := range pendingBatches { + + for !fileBatchProducer.Done() { + batch, err := fileBatchProducer.NextBatch() + if err != nil { + utils.ErrExit("getting next batch: %s", err) + } submitBatch(batch, updateProgressFn, importBatchArgsProto) } - if !fileFullySplit { - splitFilesForTable(state, origDataFile, task.TableNameTup, lastBatchNumber, lastOffset, updateProgressFn, importBatchArgsProto) - } + // log.Infof("Collect all interrupted/remaining splits.") + // pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup) + // if err != nil { + // utils.ErrExit("recovering state for table: %q: %s", task.TableNameTup, err) + // } + // for _, batch := range pendingBatches { + // submitBatch(batch, updateProgressFn, importBatchArgsProto) + // } + // if !fileFullySplit { + // splitFilesForTable(state, origDataFile, task.TableNameTup, lastBatchNumber, lastOffset, updateProgressFn, importBatchArgsProto) + // } } func splitFilesForTable(state *ImportDataState, filePath string, t sqlname.NameTuple, From cbc1ede3c7fac201111b00fe752bb959748b934a Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 13:54:10 +0530 Subject: [PATCH 12/17] unit tag --- yb-voyager/cmd/importDataFileBatchProducer_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go index ed535c7497..852dccee60 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer_test.go +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -1,3 +1,5 @@ +//go:build unit + /* Copyright (c) YugabyteDB, Inc. 
From 943bd22b6d946ddc219ffe7bb7014e3ab52dfb82 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Mon, 20 Jan 2025 11:13:32 +0530 Subject: [PATCH 13/17] cleanup --- yb-voyager/cmd/importData.go | 136 ------------------ yb-voyager/cmd/importDataFileBatchProducer.go | 11 +- 2 files changed, 3 insertions(+), 144 deletions(-) diff --git a/yb-voyager/cmd/importData.go b/yb-voyager/cmd/importData.go index 1d44f4d0f7..09a48e01d8 100644 --- a/yb-voyager/cmd/importData.go +++ b/yb-voyager/cmd/importData.go @@ -17,7 +17,6 @@ package cmd import ( "fmt" - "io" "os" "os/exec" "path/filepath" @@ -1023,141 +1022,6 @@ func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn f } submitBatch(batch, updateProgressFn, importBatchArgsProto) } - // log.Infof("Collect all interrupted/remaining splits.") - // pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup) - // if err != nil { - // utils.ErrExit("recovering state for table: %q: %s", task.TableNameTup, err) - // } - // for _, batch := range pendingBatches { - // submitBatch(batch, updateProgressFn, importBatchArgsProto) - // } - // if !fileFullySplit { - // splitFilesForTable(state, origDataFile, task.TableNameTup, lastBatchNumber, lastOffset, updateProgressFn, importBatchArgsProto) - // } -} - -func splitFilesForTable(state *ImportDataState, filePath string, t sqlname.NameTuple, - lastBatchNumber int64, lastOffset int64, updateProgressFn func(int64), importBatchArgsProto *tgtdb.ImportBatchArgs) { - log.Infof("Split data file %q: tableName=%q, largestSplit=%v, largestOffset=%v", filePath, t, lastBatchNumber, lastOffset) - batchNum := lastBatchNumber + 1 - numLinesTaken := lastOffset - - reader, err := dataStore.Open(filePath) - if err != nil { - utils.ErrExit("preparing reader for split generation on file: %q: %v", filePath, err) - } - - dataFile, err := datafile.NewDataFile(filePath, reader, dataFileDescriptor) - if err != nil { - utils.ErrExit("open datafile: %q: %v", filePath, err) - } - defer dataFile.Close() - - log.Infof("Skipping %d lines from %q", lastOffset, filePath) - err = dataFile.SkipLines(lastOffset) - if err != nil { - utils.ErrExit("skipping line for offset=%d: %v", lastOffset, err) - } - - var readLineErr error = nil - var line string - var currentBytesRead int64 - var batchWriter *BatchWriter - header := "" - if dataFileDescriptor.HasHeader { - header = dataFile.GetHeader() - } - - // Helper function to initialize a new batchWriter - initBatchWriter := func() { - batchWriter = state.NewBatchWriter(filePath, t, batchNum) - err := batchWriter.Init() - if err != nil { - utils.ErrExit("initializing batch writer for table: %q: %s", t, err) - } - // Write the header if necessary - if header != "" && dataFileDescriptor.FileFormat == datafile.CSV { - err = batchWriter.WriteHeader(header) - if err != nil { - utils.ErrExit("writing header for table: %q: %s", t, err) - } - } - } - - // Function to finalize and submit the current batch - finalizeBatch := func(isLastBatch bool, offsetEnd int64, bytesInBatch int64) { - batch, err := batchWriter.Done(isLastBatch, offsetEnd, bytesInBatch) - if err != nil { - utils.ErrExit("finalizing batch %d: %s", batchNum, err) - } - batchWriter = nil - submitBatch(batch, updateProgressFn, importBatchArgsProto) - - // Increment batchNum only if this is not the last batch - if !isLastBatch { - batchNum++ - } - } - - for readLineErr == nil { - - if batchWriter == nil { - initBatchWriter() // Create a new batchWriter - } - - line, 
currentBytesRead, readLineErr = dataFile.NextLine() - if readLineErr == nil || (readLineErr == io.EOF && line != "") { - // handling possible case: last dataline(i.e. EOF) but no newline char at the end - numLinesTaken += 1 - } - log.Debugf("Batch %d: totalBytesRead %d, currentBytes %d \n", batchNum, dataFile.GetBytesRead(), currentBytesRead) - if currentBytesRead > tdb.MaxBatchSizeInBytes() { - //If a row is itself larger than MaxBatchSizeInBytes erroring out - ybSpecificMsg := "" - if tconf.TargetDBType == YUGABYTEDB { - ybSpecificMsg = ", but should be strictly lower than the the rpc_max_message_size on YugabyteDB (default 267386880 bytes)" - } - utils.ErrExit("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, numLinesTaken, t.ForOutput(), filePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg) - } - if line != "" { - // can't use importBatchArgsProto.Columns as to use case insenstiive column names - columnNames, _ := TableToColumnNames.Get(t) - line, err = valueConverter.ConvertRow(t, columnNames, line) - if err != nil { - utils.ErrExit("transforming line number=%d for table: %q in file %s: %s", numLinesTaken, t.ForOutput(), filePath, err) - } - - // Check if adding this record exceeds the max batch size - if batchWriter.NumRecordsWritten == batchSizeInNumRows || - dataFile.GetBytesRead() > tdb.MaxBatchSizeInBytes() { // GetBytesRead - returns the total bytes read until now including the currentBytesRead - - // Finalize the current batch without adding the record - finalizeBatch(false, numLinesTaken-1, dataFile.GetBytesRead()-currentBytesRead) - - //carry forward the bytes to next batch - dataFile.ResetBytesRead(currentBytesRead) - - // Start a new batch by calling the initBatchWriter function - initBatchWriter() - } - - // Write the record to the new or current batch - err = batchWriter.WriteRecord(line) - if err != nil { - utils.ErrExit("Write to batch %d: %s", batchNum, err) - } - } - - // Finalize the batch if it's the last line or the end of the file and reset the bytes read to 0 - if readLineErr == io.EOF { - finalizeBatch(true, numLinesTaken, dataFile.GetBytesRead()) - dataFile.ResetBytesRead(0) - } else if readLineErr != nil { - utils.ErrExit("read line from data file: %q: %s", filePath, readLineErr) - } - } - - log.Infof("splitFilesForTable: done splitting data file %q for table %q", filePath, t) } func executePostSnapshotImportSqls() error { diff --git a/yb-voyager/cmd/importDataFileBatchProducer.go b/yb-voyager/cmd/importDataFileBatchProducer.go index e9872341ab..4bccbd53f2 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer.go +++ b/yb-voyager/cmd/importDataFileBatchProducer.go @@ -72,7 +72,7 @@ func (p *FileBatchProducer) Done() bool { func (p *FileBatchProducer) NextBatch() (*Batch, error) { if p.Done() { - return nil, fmt.Errorf("already done") + return nil, fmt.Errorf("already completed producing all batches") } if len(p.pendingBatches) > 0 { batch := p.pendingBatches[0] @@ -176,7 +176,8 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { return nil, fmt.Errorf("read line from data file: %q: %s", p.task.FilePath, readLineErr) } } - return nil, fmt.Errorf("unexpected") + // ideally should not reach here + return nil, fmt.Errorf("could not produce next batch: err: %w", readLineErr) } func (p *FileBatchProducer) openDataFile() error { @@ -229,10 +230,4 @@ func (p *FileBatchProducer) 
finalizeBatch(batchWriter *BatchWriter, isLastBatch batchWriter = nil p.lastBatchNumber = batchNum return batch, nil - // submitBatch(batch, updateProgressFn, importBatchArgsProto) - - // Increment batchNum only if this is not the last batch - // if !isLastBatch { - // batchNum++ - // } } From ae0f5f39da84a08a0aca7ea78d8499181a16c342 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Wed, 22 Jan 2025 13:00:08 +0530 Subject: [PATCH 14/17] comments --- yb-voyager/cmd/importDataFileBatchProducer.go | 24 +++++++++---------- 1 file changed, 12 insertions(+), 12 deletions(-) diff --git a/yb-voyager/cmd/importDataFileBatchProducer.go b/yb-voyager/cmd/importDataFileBatchProducer.go index 4bccbd53f2..037aa11e34 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer.go +++ b/yb-voyager/cmd/importDataFileBatchProducer.go @@ -28,15 +28,17 @@ type FileBatchProducer struct { task *ImportFileTask state *ImportDataState - pendingBatches []*Batch - lastBatchNumber int64 - lastOffset int64 - fileFullySplit bool - completed bool - - dataFile datafile.DataFile - header string - numLinesTaken int64 + pendingBatches []*Batch //pending batches after recovery + lastBatchNumber int64 // batch number of the last batch that was produced + lastOffset int64 // file offset from where the last batch was produced, only used in recovery + fileFullySplit bool // if the file is fully split into batches + completed bool // if all batches have been produced + + dataFile datafile.DataFile + header string + numLinesTaken int64 // number of lines read from the file + // line that was read from file while producing the previous batch + // but not added to the batch because adding it would breach size/row based thresholds. lineFromPreviousBatch string } @@ -150,12 +152,10 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { p.dataFile.ResetBytesRead(currentBytesRead) p.lineFromPreviousBatch = line - // Start a new batch by calling the initBatchWriter function - // initBatchWriter() return batch, nil } - // Write the record to the new or current batch + // Write the record to the current batch err = batchWriter.WriteRecord(line) if err != nil { return nil, fmt.Errorf("Write to batch %d: %s", batchNum, err) From 634f29c4090749eeb0e6b853b259689d4a241848 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Wed, 22 Jan 2025 13:06:30 +0530 Subject: [PATCH 15/17] review comments --- yb-voyager/cmd/importData.go | 5 ----- yb-voyager/cmd/importDataFileBatchProducer.go | 11 +++++++---- 2 files changed, 7 insertions(+), 9 deletions(-) diff --git a/yb-voyager/cmd/importData.go b/yb-voyager/cmd/importData.go index 09a48e01d8..52d55c83b6 100644 --- a/yb-voyager/cmd/importData.go +++ b/yb-voyager/cmd/importData.go @@ -1005,11 +1005,6 @@ func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn f importBatchArgsProto := getImportBatchArgsProto(task.TableNameTup, task.FilePath) log.Infof("Start splitting table %q: data-file: %q", task.TableNameTup, origDataFile) - err := state.PrepareForFileImport(task.FilePath, task.TableNameTup) - if err != nil { - utils.ErrExit("preparing for file import: %s", err) - } - fileBatchProducer, err := NewFileBatchProducer(task, state) if err != nil { utils.ErrExit("creating file batch producer: %s", err) diff --git a/yb-voyager/cmd/importDataFileBatchProducer.go b/yb-voyager/cmd/importDataFileBatchProducer.go index 037aa11e34..8551089cd6 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer.go +++ b/yb-voyager/cmd/importDataFileBatchProducer.go @@ -51,10 +51,7 @@ func 
NewFileBatchProducer(task *ImportFileTask, state *ImportDataState) (*FileBa
 	if err != nil {
 		return nil, fmt.Errorf("recovering state for table: %q: %s", task.TableNameTup, err)
 	}
-	var completed bool
-	if len(pendingBatches) == 0 && fileFullySplit {
-		completed = true
-	}
+	completed := len(pendingBatches) == 0 && fileFullySplit
 
 	return &FileBatchProducer{
 		task:            task,
@@ -106,6 +103,9 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) {
 	if err != nil {
 		return nil, err
 	}
+	// in the previous batch, a line was read from file but not added to the batch
+	// because adding it would breach size/row based thresholds.
+	// Add that line to the current batch.
 	if p.lineFromPreviousBatch != "" {
 		err = batchWriter.WriteRecord(p.lineFromPreviousBatch)
 		if err != nil {
@@ -170,6 +170,9 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) {
 			}
 
 			p.completed = true
+			// TODO: resetting bytes read to 0 is technically not correct if we are adding a header
+			// to each batch file. Currently header bytes are only considered in the first batch.
+			// For the rest of the batches, header bytes are ignored, since we are resetting it to 0.
 			p.dataFile.ResetBytesRead(0)
 			return batch, nil
 		} else if readLineErr != nil {

From b2da3ffde76b571d935edd24532f33cb0f5558e7 Mon Sep 17 00:00:00 2001
From: Aneesh Makala
Date: Wed, 22 Jan 2025 13:50:21 +0530
Subject: [PATCH 16/17] test for when all batches are produced and we resume

---
 .../cmd/importDataFileBatchProducer_test.go   | 54 +++++++++++++++++++
 1 file changed, 54 insertions(+)

diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go
index 852dccee60..6d3b528510 100644
--- a/yb-voyager/cmd/importDataFileBatchProducer_test.go
+++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go
@@ -331,3 +331,57 @@ func TestFileBatchProducerResumable(t *testing.T) {
 	assert.Equal(t, int64(2), batch2.RecordCount)
 	assert.True(t, batchproducer.Done())
 }
+
+func TestFileBatchProducerResumeAfterAllBatchesProduced(t *testing.T) {
+	// max batch size in rows is 2
+	ldataDir, lexportDir, state, err := setupDependenciesForTest(2, 1024)
+	assert.NoError(t, err)
+
+	if ldataDir != "" {
+		defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir))
+	}
+	if lexportDir != "" {
+		defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir))
+	}
+
+	fileContents := `id,val
+1, "hello"
+2, "world"
+3, "foo"
+4, "bar"`
+	_, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table")
+	assert.NoError(t, err)
+
+	batchproducer, err := NewFileBatchProducer(task, state)
+	assert.NoError(t, err)
+	assert.False(t, batchproducer.Done())
+
+	// generate all batches
+	batches := []*Batch{}
+	for !batchproducer.Done() {
+		batch, err := batchproducer.NextBatch()
+		assert.NoError(t, err)
+		assert.NotNil(t, batch)
+		batches = append(batches, batch)
+	}
+
+	// simulate a crash and recover
+	batchproducer, err = NewFileBatchProducer(task, state)
+	assert.NoError(t, err)
+	assert.False(t, batchproducer.Done())
+
+	// state should have recovered two batches
+	assert.Equal(t, 2, len(batchproducer.pendingBatches))
+
+	// verify that it picks up from pendingBatches
+	// instead of producing a new batch.
+ recoveredBatches := []*Batch{} + for !batchproducer.Done() { + batch, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch) + recoveredBatches = append(recoveredBatches, batch) + } + assert.Equal(t, len(batches), len(recoveredBatches)) + assert.ElementsMatch(t, batches, recoveredBatches) +} From c65f1126e4c2e44d6035d6e605ad8b7c50831605 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Wed, 22 Jan 2025 14:10:37 +0530 Subject: [PATCH 17/17] review comments --- .../cmd/importDataFileBatchProducer_test.go | 67 ++++++++----------- yb-voyager/test/utils/testutils.go | 17 +++++ 2 files changed, 45 insertions(+), 39 deletions(-) diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go index 6d3b528510..c33ca04750 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer_test.go +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -30,6 +30,7 @@ import ( "github.com/yugabyte/yb-voyager/yb-voyager/src/dbzm" "github.com/yugabyte/yb-voyager/yb-voyager/src/tgtdb" "github.com/yugabyte/yb-voyager/yb-voyager/src/utils/sqlname" + testutils "github.com/yugabyte/yb-voyager/yb-voyager/test/utils" ) type dummyTDB struct { @@ -41,24 +42,7 @@ func (d *dummyTDB) MaxBatchSizeInBytes() int64 { return d.maxSizeBytes } -func createTempFile(dir string, fileContents string) (string, error) { - // Create a temporary file - file, err := os.CreateTemp(dir, "temp-*.txt") - if err != nil { - return "", err - } - defer file.Close() - - // Write some text to the file - _, err = file.WriteString(fileContents) - if err != nil { - return "", err - } - - return file.Name(), nil -} - -func setupDependenciesForTest(batchSizeRows int64, batchSizeBytes int64) (string, string, *ImportDataState, error) { +func setupExportDirAndImportDependencies(batchSizeRows int64, batchSizeBytes int64) (string, string, *ImportDataState, error) { lexportDir, err := os.MkdirTemp("/tmp", "export-dir-*") if err != nil { return "", "", nil, err @@ -80,7 +64,7 @@ func setupDependenciesForTest(batchSizeRows int64, batchSizeBytes int64) (string return ldataDir, lexportDir, state, nil } -func setupFileForTest(lexportDir string, fileContents string, dir string, tableName string) (string, *ImportFileTask, error) { +func createFileAndTask(lexportDir string, fileContents string, ldataDir string, tableName string) (string, *ImportFileTask, error) { dataFileDescriptor = &datafile.Descriptor{ FileFormat: "csv", Delimiter: ",", @@ -90,7 +74,7 @@ func setupFileForTest(lexportDir string, fileContents string, dir string, tableN EscapeChar: '\\', NullString: "NULL", } - tempFile, err := createTempFile(dir, fileContents) + tempFile, err := testutils.CreateTempFile(ldataDir, fileContents) if err != nil { return "", nil, err } @@ -107,7 +91,7 @@ func setupFileForTest(lexportDir string, fileContents string, dir string, tableN } func TestBasicFileBatchProducer(t *testing.T) { - ldataDir, lexportDir, state, err := setupDependenciesForTest(2, 1024) + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) assert.NoError(t, err) if ldataDir != "" { @@ -119,7 +103,7 @@ func TestBasicFileBatchProducer(t *testing.T) { fileContents := `id,val 1, "hello"` - _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + _, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table") assert.NoError(t, err) batchproducer, err := NewFileBatchProducer(task, state) @@ -136,7 +120,7 @@ func TestBasicFileBatchProducer(t 
*testing.T) { func TestFileBatchProducerBasedOnRowsThreshold(t *testing.T) { // max batch size in rows is 2 - ldataDir, lexportDir, state, err := setupDependenciesForTest(2, 1024) + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) assert.NoError(t, err) if ldataDir != "" { @@ -151,7 +135,7 @@ func TestFileBatchProducerBasedOnRowsThreshold(t *testing.T) { 2, "world" 3, "foo" 4, "bar"` - _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + _, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table") assert.NoError(t, err) batchproducer, err := NewFileBatchProducer(task, state) @@ -169,22 +153,24 @@ func TestFileBatchProducerBasedOnRowsThreshold(t *testing.T) { // 2 batches should be produced assert.Equal(t, 2, len(batches)) - // each of length 2 + + batch1ExpectedContents := "id,val\n1, \"hello\"\n2, \"world\"" assert.Equal(t, int64(2), batches[0].RecordCount) batchContents, err := os.ReadFile(batches[0].GetFilePath()) assert.NoError(t, err) - assert.Equal(t, "id,val\n1, \"hello\"\n2, \"world\"", string(batchContents)) + assert.Equal(t, batch1ExpectedContents, string(batchContents)) + batch2ExpectedContents := "id,val\n3, \"foo\"\n4, \"bar\"" assert.Equal(t, int64(2), batches[1].RecordCount) batchContents, err = os.ReadFile(batches[1].GetFilePath()) assert.NoError(t, err) - assert.Equal(t, "id,val\n3, \"foo\"\n4, \"bar\"", string(batchContents)) + assert.Equal(t, batch2ExpectedContents, string(batchContents)) } func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) { // max batch size in size is 25 bytes maxBatchSizeBytes := int64(25) - ldataDir, lexportDir, state, err := setupDependenciesForTest(1000, maxBatchSizeBytes) + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(1000, maxBatchSizeBytes) assert.NoError(t, err) if ldataDir != "" { @@ -200,7 +186,7 @@ func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) { 2, "ghijk" 3, "mnopq" 4, "stuvw"` - _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + _, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table") assert.NoError(t, err) batchproducer, err := NewFileBatchProducer(task, state) @@ -219,29 +205,32 @@ func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) { // 3 batches should be produced // while calculating for the first batch, the header is also considered assert.Equal(t, 3, len(batches)) - // each of length 2 + + batch1ExpectedContents := "id,val\n1, \"abcde\"" assert.Equal(t, int64(1), batches[0].RecordCount) assert.LessOrEqual(t, batches[0].ByteCount, maxBatchSizeBytes) batchContents, err := os.ReadFile(batches[0].GetFilePath()) assert.NoError(t, err) - assert.Equal(t, "id,val\n1, \"abcde\"", string(batchContents)) + assert.Equal(t, batch1ExpectedContents, string(batchContents)) + batch2ExpectedContents := "id,val\n2, \"ghijk\"\n3, \"mnopq\"" assert.Equal(t, int64(2), batches[1].RecordCount) assert.LessOrEqual(t, batches[1].ByteCount, maxBatchSizeBytes) batchContents, err = os.ReadFile(batches[1].GetFilePath()) assert.NoError(t, err) - assert.Equal(t, "id,val\n2, \"ghijk\"\n3, \"mnopq\"", string(batchContents)) + assert.Equal(t, batch2ExpectedContents, string(batchContents)) + batch3ExpectedContents := "id,val\n4, \"stuvw\"" assert.Equal(t, int64(1), batches[2].RecordCount) assert.LessOrEqual(t, batches[2].ByteCount, maxBatchSizeBytes) batchContents, err = os.ReadFile(batches[2].GetFilePath()) assert.NoError(t, err) - assert.Equal(t, "id,val\n4, 
\"stuvw\"", string(batchContents)) + assert.Equal(t, batch3ExpectedContents, string(batchContents)) } func TestFileBatchProducerThrowsErrorWhenSingleRowGreaterThanMaxBatchSize(t *testing.T) { // max batch size in size is 25 bytes - ldataDir, lexportDir, state, err := setupDependenciesForTest(1000, 25) + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(1000, 25) assert.NoError(t, err) if ldataDir != "" { @@ -257,7 +246,7 @@ func TestFileBatchProducerThrowsErrorWhenSingleRowGreaterThanMaxBatchSize(t *tes 2, "ghijk" 3, "mnopq1234567899876543" 4, "stuvw"` - _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + _, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table") assert.NoError(t, err) batchproducer, err := NewFileBatchProducer(task, state) @@ -278,7 +267,7 @@ func TestFileBatchProducerThrowsErrorWhenSingleRowGreaterThanMaxBatchSize(t *tes func TestFileBatchProducerResumable(t *testing.T) { // max batch size in rows is 2 - ldataDir, lexportDir, state, err := setupDependenciesForTest(2, 1024) + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) assert.NoError(t, err) if ldataDir != "" { @@ -293,7 +282,7 @@ func TestFileBatchProducerResumable(t *testing.T) { 2, "world" 3, "foo" 4, "bar"` - _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + _, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table") assert.NoError(t, err) batchproducer, err := NewFileBatchProducer(task, state) @@ -334,7 +323,7 @@ func TestFileBatchProducerResumable(t *testing.T) { func TestFileBatchProducerResumeAfterAllBatchesProduced(t *testing.T) { // max batch size in rows is 2 - ldataDir, lexportDir, state, err := setupDependenciesForTest(2, 1024) + ldataDir, lexportDir, state, err := setupExportDirAndImportDependencies(2, 1024) assert.NoError(t, err) if ldataDir != "" { @@ -349,7 +338,7 @@ func TestFileBatchProducerResumeAfterAllBatchesProduced(t *testing.T) { 2, "world" 3, "foo" 4, "bar"` - _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + _, task, err := createFileAndTask(lexportDir, fileContents, ldataDir, "test_table") assert.NoError(t, err) batchproducer, err := NewFileBatchProducer(task, state) diff --git a/yb-voyager/test/utils/testutils.go b/yb-voyager/test/utils/testutils.go index 942e00fd67..f8a3418ab0 100644 --- a/yb-voyager/test/utils/testutils.go +++ b/yb-voyager/test/utils/testutils.go @@ -406,3 +406,20 @@ func FatalIfError(t *testing.T, err error) { t.Fatalf("error: %v", err) } } + +func CreateTempFile(dir string, fileContents string) (string, error) { + // Create a temporary file + file, err := os.CreateTemp(dir, "temp-*.txt") + if err != nil { + return "", err + } + defer file.Close() + + // Write some text to the file + _, err = file.WriteString(fileContents) + if err != nil { + return "", err + } + + return file.Name(), nil +}