From fa29368299b3c846ad5ff03cfba88cd35562365b Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Wed, 15 Jan 2025 13:41:19 +0530 Subject: [PATCH 01/18] wip --- yb-voyager/cmd/importDataFileBatchProducer.go | 154 ++++++++++++++++++ 1 file changed, 154 insertions(+) create mode 100644 yb-voyager/cmd/importDataFileBatchProducer.go diff --git a/yb-voyager/cmd/importDataFileBatchProducer.go b/yb-voyager/cmd/importDataFileBatchProducer.go new file mode 100644 index 0000000000..ca6335035a --- /dev/null +++ b/yb-voyager/cmd/importDataFileBatchProducer.go @@ -0,0 +1,154 @@ +/* +Copyright (c) YugabyteDB, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package cmd + +import ( + "fmt" + + log "github.com/sirupsen/logrus" + "github.com/yugabyte/yb-voyager/yb-voyager/src/datafile" + "github.com/yugabyte/yb-voyager/yb-voyager/src/utils" +) + +type FileBatchProducer struct { + task *ImportFileTask + state *ImportDataState + + pendingBatches []*Batch + lastBatchNumber int64 + lastOffset int64 + fileFullySplit bool + completed bool + + datafile *datafile.DataFile + header string +} + +func NewFileBatchProducer(task *ImportFileTask, state *ImportDataState) (*FileBatchProducer, error) { + err := state.PrepareForFileImport(task.FilePath, task.TableNameTup) + if err != nil { + return nil, fmt.Errorf("preparing for file import: %s", err) + } + pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup) + if err != nil { + return nil, fmt.Errorf("recovering state for table: %q: %s", task.TableNameTup, err) + } + var completed bool + if len(pendingBatches) == 0 && fileFullySplit { + completed = true + } + + return &FileBatchProducer{ + task: task, + state: state, + pendingBatches: pendingBatches, + lastBatchNumber: lastBatchNumber, + lastOffset: lastOffset, + fileFullySplit: fileFullySplit, + completed: completed, + }, nil +} + +func (p *FileBatchProducer) Done() bool { + return p.completed +} + +func (p *FileBatchProducer) NextBatch() (*Batch, error) { + if len(p.pendingBatches) > 0 { + batch := p.pendingBatches[0] + p.pendingBatches = p.pendingBatches[1:] + // file is fully split and returning the last batch, so mark the producer as completed + if len(p.pendingBatches) == 0 && p.fileFullySplit { + p.completed = true + } + return batch, nil + } + + return p.produceNextBatch() +} + +func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { + if p.datafile == nil { + err := p.openDataFile() + if err != nil { + return nil, err + } + } + + batchWriter, err := p.newBatchWriter() + if err != nil { + return nil, err + } + +} + +func (p *FileBatchProducer) openDataFile() error { + reader, err := dataStore.Open(p.task.FilePath) + if err != nil { + return fmt.Errorf("preparing reader for split generation on file: %q: %v", p.task.FilePath, err) + } + + dataFile, err := datafile.NewDataFile(p.task.FilePath, reader, dataFileDescriptor) + + if err != nil { + return fmt.Errorf("open datafile: %q: %v", p.task.FilePath, err) + } + p.datafile = &dataFile + + log.Infof("Skipping %d 
lines from %q", p.lastOffset, p.task.FilePath) + err = dataFile.SkipLines(p.lastOffset) + if err != nil { + return fmt.Errorf("skipping line for offset=%d: %v", p.lastOffset, err) + } + if dataFileDescriptor.HasHeader { + p.header = dataFile.GetHeader() + } + return nil +} + +func (p *FileBatchProducer) newBatchWriter() (*BatchWriter, error) { + batchNum := p.lastBatchNumber + 1 + batchWriter := p.state.NewBatchWriter(p.task.FilePath, p.task.TableNameTup, batchNum) + err := batchWriter.Init() + if err != nil { + return nil, fmt.Errorf("initializing batch writer for table: %q: %s", p.task.TableNameTup, err) + } + // Write the header if necessary + if p.header != "" && dataFileDescriptor.FileFormat == datafile.CSV { + err = batchWriter.WriteHeader(p.header) + if err != nil { + utils.ErrExit("writing header for table: %q: %s", p.task.TableNameTup, err) + } + } + return batchWriter, nil +} + +func (p *FileBatchProducer) finalizeBatch(batchWriter *BatchWriter, isLastBatch bool, offsetEnd int64, bytesInBatch int64) (*Batch, error) { + batchNum := p.lastBatchNumber + 1 + batch, err := batchWriter.Done(isLastBatch, offsetEnd, bytesInBatch) + if err != nil { + utils.ErrExit("finalizing batch %d: %s", batchNum, err) + } + batchWriter = nil + p.lastBatchNumber = batchNum + return batch, nil + // submitBatch(batch, updateProgressFn, importBatchArgsProto) + + // Increment batchNum only if this is not the last batch + // if !isLastBatch { + // batchNum++ + // } +} From ec54ea6c27ae2b6585c2861974d6f9283628c75f Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Wed, 15 Jan 2025 14:55:55 +0530 Subject: [PATCH 02/18] base logic for producing next batch --- yb-voyager/cmd/importDataFileBatchProducer.go | 74 ++++++++++++++++++- 1 file changed, 71 insertions(+), 3 deletions(-) diff --git a/yb-voyager/cmd/importDataFileBatchProducer.go b/yb-voyager/cmd/importDataFileBatchProducer.go index ca6335035a..db21968dbf 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer.go +++ b/yb-voyager/cmd/importDataFileBatchProducer.go @@ -17,6 +17,7 @@ package cmd import ( "fmt" + "io" log "github.com/sirupsen/logrus" "github.com/yugabyte/yb-voyager/yb-voyager/src/datafile" @@ -33,7 +34,7 @@ type FileBatchProducer struct { fileFullySplit bool completed bool - datafile *datafile.DataFile + dataFile datafile.DataFile header string } @@ -81,7 +82,7 @@ func (p *FileBatchProducer) NextBatch() (*Batch, error) { } func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { - if p.datafile == nil { + if p.dataFile == nil { err := p.openDataFile() if err != nil { return nil, err @@ -93,6 +94,73 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { return nil, err } + var readLineErr error + var line string + var currentBytesRead int64 + batchNum := p.lastBatchNumber + 1 + + for readLineErr == nil { + + line, currentBytesRead, readLineErr = p.dataFile.NextLine() + if readLineErr == nil || (readLineErr == io.EOF && line != "") { + // handling possible case: last dataline(i.e. 
EOF) but no newline char at the end + numLinesTaken += 1 + } + log.Debugf("Batch %d: totalBytesRead %d, currentBytes %d \n", batchNum, p.dataFile.GetBytesRead(), currentBytesRead) + if currentBytesRead > tdb.MaxBatchSizeInBytes() { + //If a row is itself larger than MaxBatchSizeInBytes erroring out + ybSpecificMsg := "" + if tconf.TargetDBType == YUGABYTEDB { + ybSpecificMsg = ", but should be strictly lower than the the rpc_max_message_size on YugabyteDB (default 267386880 bytes)" + } + utils.ErrExit("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, numLinesTaken, t.ForOutput(), filePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg) + } + if line != "" { + // can't use importBatchArgsProto.Columns as to use case insenstiive column names + columnNames, _ := TableToColumnNames.Get(t) + line, err = valueConverter.ConvertRow(t, columnNames, line) + if err != nil { + utils.ErrExit("transforming line number=%d for table: %q in file %s: %s", numLinesTaken, p.task.TableNameTup.ForOutput(), p.task.FilePath, err) + } + + // Check if adding this record exceeds the max batch size + if batchWriter.NumRecordsWritten == batchSizeInNumRows || + p.dataFile.GetBytesRead() > tdb.MaxBatchSizeInBytes() { // GetBytesRead - returns the total bytes read until now including the currentBytesRead + + // Finalize the current batch without adding the record + batch, err := p.finalizeBatch(batchWriter, false, numLinesTaken-1, p.dataFile.GetBytesRead()-currentBytesRead) + if err != nil { + return nil, err + } + + //carry forward the bytes to next batch + p.dataFile.ResetBytesRead(currentBytesRead) + + // Start a new batch by calling the initBatchWriter function + initBatchWriter() + } + + // Write the record to the new or current batch + err = batchWriter.WriteRecord(line) + if err != nil { + utils.ErrExit("Write to batch %d: %s", batchNum, err) + } + } + + // Finalize the batch if it's the last line or the end of the file and reset the bytes read to 0 + if readLineErr == io.EOF { + batch, err := p.finalizeBatch(batchWriter, true, numLinesTaken, p.dataFile.GetBytesRead()) + if err != nil { + return nil, err + } + + p.completed = true + p.dataFile.ResetBytesRead(0) + return batch, nil + } else if readLineErr != nil { + return nil, fmt.Errorf("read line from data file: %q: %s", p.task.FilePath, readLineErr) + } + } } func (p *FileBatchProducer) openDataFile() error { @@ -106,7 +174,7 @@ func (p *FileBatchProducer) openDataFile() error { if err != nil { return fmt.Errorf("open datafile: %q: %v", p.task.FilePath, err) } - p.datafile = &dataFile + p.dataFile = dataFile log.Infof("Skipping %d lines from %q", p.lastOffset, p.task.FilePath) err = dataFile.SkipLines(p.lastOffset) From a4aabc8be761f4ccdad169529a2f8f46e6f41a32 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Wed, 15 Jan 2025 16:30:53 +0530 Subject: [PATCH 03/18] store line from previous batch --- yb-voyager/cmd/importDataFileBatchProducer.go | 48 ++++++++++++------- 1 file changed, 32 insertions(+), 16 deletions(-) diff --git a/yb-voyager/cmd/importDataFileBatchProducer.go b/yb-voyager/cmd/importDataFileBatchProducer.go index db21968dbf..e9872341ab 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer.go +++ b/yb-voyager/cmd/importDataFileBatchProducer.go @@ -34,8 +34,10 @@ type FileBatchProducer struct { fileFullySplit bool completed bool - dataFile datafile.DataFile - header string + dataFile 
datafile.DataFile + header string + numLinesTaken int64 + lineFromPreviousBatch string } func NewFileBatchProducer(task *ImportFileTask, state *ImportDataState) (*FileBatchProducer, error) { @@ -60,6 +62,7 @@ func NewFileBatchProducer(task *ImportFileTask, state *ImportDataState) (*FileBa lastOffset: lastOffset, fileFullySplit: fileFullySplit, completed: completed, + numLinesTaken: lastOffset, }, nil } @@ -68,6 +71,9 @@ func (p *FileBatchProducer) Done() bool { } func (p *FileBatchProducer) NextBatch() (*Batch, error) { + if p.Done() { + return nil, fmt.Errorf("already done") + } if len(p.pendingBatches) > 0 { batch := p.pendingBatches[0] p.pendingBatches = p.pendingBatches[1:] @@ -89,22 +95,29 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { } } - batchWriter, err := p.newBatchWriter() - if err != nil { - return nil, err - } - var readLineErr error var line string var currentBytesRead int64 batchNum := p.lastBatchNumber + 1 + batchWriter, err := p.newBatchWriter() + if err != nil { + return nil, err + } + if p.lineFromPreviousBatch != "" { + err = batchWriter.WriteRecord(p.lineFromPreviousBatch) + if err != nil { + return nil, fmt.Errorf("Write to batch %d: %s", batchNum, err) + } + p.lineFromPreviousBatch = "" + } + for readLineErr == nil { line, currentBytesRead, readLineErr = p.dataFile.NextLine() if readLineErr == nil || (readLineErr == io.EOF && line != "") { // handling possible case: last dataline(i.e. EOF) but no newline char at the end - numLinesTaken += 1 + p.numLinesTaken += 1 } log.Debugf("Batch %d: totalBytesRead %d, currentBytes %d \n", batchNum, p.dataFile.GetBytesRead(), currentBytesRead) if currentBytesRead > tdb.MaxBatchSizeInBytes() { @@ -113,14 +126,14 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { if tconf.TargetDBType == YUGABYTEDB { ybSpecificMsg = ", but should be strictly lower than the the rpc_max_message_size on YugabyteDB (default 267386880 bytes)" } - utils.ErrExit("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, numLinesTaken, t.ForOutput(), filePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg) + return nil, fmt.Errorf("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. 
Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, p.numLinesTaken, p.task.TableNameTup.ForOutput(), p.task.FilePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg) } if line != "" { // can't use importBatchArgsProto.Columns as to use case insenstiive column names - columnNames, _ := TableToColumnNames.Get(t) - line, err = valueConverter.ConvertRow(t, columnNames, line) + columnNames, _ := TableToColumnNames.Get(p.task.TableNameTup) + line, err = valueConverter.ConvertRow(p.task.TableNameTup, columnNames, line) if err != nil { - utils.ErrExit("transforming line number=%d for table: %q in file %s: %s", numLinesTaken, p.task.TableNameTup.ForOutput(), p.task.FilePath, err) + return nil, fmt.Errorf("transforming line number=%d for table: %q in file %s: %s", p.numLinesTaken, p.task.TableNameTup.ForOutput(), p.task.FilePath, err) } // Check if adding this record exceeds the max batch size @@ -128,28 +141,30 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { p.dataFile.GetBytesRead() > tdb.MaxBatchSizeInBytes() { // GetBytesRead - returns the total bytes read until now including the currentBytesRead // Finalize the current batch without adding the record - batch, err := p.finalizeBatch(batchWriter, false, numLinesTaken-1, p.dataFile.GetBytesRead()-currentBytesRead) + batch, err := p.finalizeBatch(batchWriter, false, p.numLinesTaken-1, p.dataFile.GetBytesRead()-currentBytesRead) if err != nil { return nil, err } //carry forward the bytes to next batch p.dataFile.ResetBytesRead(currentBytesRead) + p.lineFromPreviousBatch = line // Start a new batch by calling the initBatchWriter function - initBatchWriter() + // initBatchWriter() + return batch, nil } // Write the record to the new or current batch err = batchWriter.WriteRecord(line) if err != nil { - utils.ErrExit("Write to batch %d: %s", batchNum, err) + return nil, fmt.Errorf("Write to batch %d: %s", batchNum, err) } } // Finalize the batch if it's the last line or the end of the file and reset the bytes read to 0 if readLineErr == io.EOF { - batch, err := p.finalizeBatch(batchWriter, true, numLinesTaken, p.dataFile.GetBytesRead()) + batch, err := p.finalizeBatch(batchWriter, true, p.numLinesTaken, p.dataFile.GetBytesRead()) if err != nil { return nil, err } @@ -161,6 +176,7 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { return nil, fmt.Errorf("read line from data file: %q: %s", p.task.FilePath, readLineErr) } } + return nil, fmt.Errorf("unexpected") } func (p *FileBatchProducer) openDataFile() error { From 440a5f3be9ca17374247f2c081ea972d6eacf6b7 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 11:45:52 +0530 Subject: [PATCH 04/18] test --- .../cmd/importDataFileBatchProducer_test.go | 116 ++++++++++++++++++ 1 file changed, 116 insertions(+) create mode 100644 yb-voyager/cmd/importDataFileBatchProducer_test.go diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go new file mode 100644 index 0000000000..76feebc972 --- /dev/null +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -0,0 +1,116 @@ +/* +Copyright (c) YugabyteDB, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. 
+You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package cmd + +import ( + "fmt" + "os" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/yugabyte/yb-voyager/yb-voyager/src/constants" + "github.com/yugabyte/yb-voyager/yb-voyager/src/datafile" + "github.com/yugabyte/yb-voyager/yb-voyager/src/datastore" + "github.com/yugabyte/yb-voyager/yb-voyager/src/dbzm" + "github.com/yugabyte/yb-voyager/yb-voyager/src/tgtdb" + "github.com/yugabyte/yb-voyager/yb-voyager/src/utils/sqlname" +) + +type DummyTdb struct { + tgtdb.TargetYugabyteDB +} + +func (t *DummyTdb) MaxBatchSizeInBytes() int64 { + return 1024 +} + +func createTempFile(dir string, fileContents string) (string, error) { + // Create a temporary file + file, err := os.CreateTemp(dir, "temp-*.txt") + if err != nil { + return "", err + } + defer file.Close() + + // Write some text to the file + _, err = file.WriteString(fileContents) + if err != nil { + return "", err + } + + return file.Name(), nil +} + +func TestBasicFileBatchProducer(t *testing.T) { + fileContents := `id,val +1, "hello"` + + exportDir, err := os.MkdirTemp("/tmp", "export-dir-*") + assert.NoError(t, err) + defer os.RemoveAll(fmt.Sprintf("%s/", exportDir)) + dataDir, err := os.MkdirTemp("/tmp", "data-dir-*") + assert.NoError(t, err) + defer os.RemoveAll(fmt.Sprintf("%s/", dataDir)) + tempFile, err := createTempFile(dataDir, fileContents) + assert.NoError(t, err) + + CreateMigrationProjectIfNotExists(constants.POSTGRESQL, exportDir) + tdb = &DummyTdb{} + valueConverter = &dbzm.NoOpValueConverter{} + dataStore = datastore.NewDataStore(dataDir) + batchSizeInNumRows = 2 + dataFileDescriptor = &datafile.Descriptor{ + FileFormat: "csv", + Delimiter: ",", + HasHeader: true, + ExportDir: exportDir, + QuoteChar: '"', + EscapeChar: '\\', + NullString: "NULL", + // DataFileList: []*FileEntry{ + // { + // FilePath: "file.csv", // Use relative path for testing absolute path handling. 
+ // TableName: "public.my_table", + // RowCount: 100, + // FileSize: 2048, + // }, + // }, + // TableNameToExportedColumns: map[string][]string{ + // "public.my_table": {"id", "name", "age"}, + // }, + } + + sourceName := sqlname.NewObjectName(constants.POSTGRESQL, "public", "public", "test_table") + tableNameTup := sqlname.NameTuple{SourceName: sourceName, CurrentName: sourceName} + task := &ImportFileTask{ + ID: 1, + FilePath: tempFile, + TableNameTup: tableNameTup, + RowCount: 1, + } + + state := NewImportDataState(exportDir) + + batchproducer, err := NewFileBatchProducer(task, state) + assert.NoError(t, err) + + assert.False(t, batchproducer.Done()) + + batch, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch) + assert.Equal(t, int64(1), batch.RecordCount) +} From c14e69ea6e5e4c449a63ff1c28d0cfdc1c2ed77a Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 12:54:30 +0530 Subject: [PATCH 05/18] rewrite test --- .../cmd/importDataFileBatchProducer_test.go | 76 +++++++++++-------- 1 file changed, 45 insertions(+), 31 deletions(-) diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go index 76feebc972..9ddc10a715 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer_test.go +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -29,11 +29,11 @@ import ( "github.com/yugabyte/yb-voyager/yb-voyager/src/utils/sqlname" ) -type DummyTdb struct { +type dummyTDB struct { tgtdb.TargetYugabyteDB } -func (t *DummyTdb) MaxBatchSizeInBytes() int64 { +func (t *dummyTDB) MaxBatchSizeInBytes() int64 { return 1024 } @@ -54,46 +54,44 @@ func createTempFile(dir string, fileContents string) (string, error) { return file.Name(), nil } -func TestBasicFileBatchProducer(t *testing.T) { - fileContents := `id,val -1, "hello"` +func setupDependenciesForTest(batchSize int64) (string, string, *ImportDataState, error) { + lexportDir, err := os.MkdirTemp("/tmp", "export-dir-*") + if err != nil { + return "", "", nil, err + } - exportDir, err := os.MkdirTemp("/tmp", "export-dir-*") - assert.NoError(t, err) - defer os.RemoveAll(fmt.Sprintf("%s/", exportDir)) - dataDir, err := os.MkdirTemp("/tmp", "data-dir-*") - assert.NoError(t, err) - defer os.RemoveAll(fmt.Sprintf("%s/", dataDir)) - tempFile, err := createTempFile(dataDir, fileContents) - assert.NoError(t, err) + ldataDir, err := os.MkdirTemp("/tmp", "data-dir-*") + if err != nil { + return "", "", nil, err + } - CreateMigrationProjectIfNotExists(constants.POSTGRESQL, exportDir) - tdb = &DummyTdb{} + CreateMigrationProjectIfNotExists(constants.POSTGRESQL, lexportDir) + tdb = &dummyTDB{} valueConverter = &dbzm.NoOpValueConverter{} - dataStore = datastore.NewDataStore(dataDir) - batchSizeInNumRows = 2 + dataStore = datastore.NewDataStore(ldataDir) + + batchSizeInNumRows = batchSize + + state := NewImportDataState(lexportDir) + return ldataDir, lexportDir, state, nil +} + +func setupFileForTest(lexportDir string, fileContents string, dir string, tableName string) (string, *ImportFileTask, error) { dataFileDescriptor = &datafile.Descriptor{ FileFormat: "csv", Delimiter: ",", HasHeader: true, - ExportDir: exportDir, + ExportDir: lexportDir, QuoteChar: '"', EscapeChar: '\\', NullString: "NULL", - // DataFileList: []*FileEntry{ - // { - // FilePath: "file.csv", // Use relative path for testing absolute path handling. 
- // TableName: "public.my_table", - // RowCount: 100, - // FileSize: 2048, - // }, - // }, - // TableNameToExportedColumns: map[string][]string{ - // "public.my_table": {"id", "name", "age"}, - // }, + } + tempFile, err := createTempFile(dir, fileContents) + if err != nil { + return "", nil, err } - sourceName := sqlname.NewObjectName(constants.POSTGRESQL, "public", "public", "test_table") + sourceName := sqlname.NewObjectName(constants.POSTGRESQL, "public", "public", tableName) tableNameTup := sqlname.NameTuple{SourceName: sourceName, CurrentName: sourceName} task := &ImportFileTask{ ID: 1, @@ -101,8 +99,24 @@ func TestBasicFileBatchProducer(t *testing.T) { TableNameTup: tableNameTup, RowCount: 1, } + return tempFile, task, nil +} + +func TestBasicFileBatchProducer(t *testing.T) { + ldataDir, lexportDir, state, err := setupDependenciesForTest(2) + assert.NoError(t, err) - state := NewImportDataState(exportDir) + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + fileContents := `id,val +1, "hello"` + _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + assert.NoError(t, err) batchproducer, err := NewFileBatchProducer(task, state) assert.NoError(t, err) From a056385e77f7daeb9f69a47ffec638ff566c7e23 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 12:57:23 +0530 Subject: [PATCH 06/18] minor fix --- yb-voyager/cmd/importDataFileBatchProducer_test.go | 14 ++++++++------ 1 file changed, 8 insertions(+), 6 deletions(-) diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go index 9ddc10a715..12d3a6678f 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer_test.go +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -30,11 +30,12 @@ import ( ) type dummyTDB struct { + maxSizeBytes int64 tgtdb.TargetYugabyteDB } -func (t *dummyTDB) MaxBatchSizeInBytes() int64 { - return 1024 +func (d *dummyTDB) MaxBatchSizeInBytes() int64 { + return d.maxSizeBytes } func createTempFile(dir string, fileContents string) (string, error) { @@ -54,7 +55,7 @@ func createTempFile(dir string, fileContents string) (string, error) { return file.Name(), nil } -func setupDependenciesForTest(batchSize int64) (string, string, *ImportDataState, error) { +func setupDependenciesForTest(batchSizeRows int64, batchSizeBytes int64) (string, string, *ImportDataState, error) { lexportDir, err := os.MkdirTemp("/tmp", "export-dir-*") if err != nil { return "", "", nil, err @@ -66,11 +67,11 @@ func setupDependenciesForTest(batchSize int64) (string, string, *ImportDataState } CreateMigrationProjectIfNotExists(constants.POSTGRESQL, lexportDir) - tdb = &dummyTDB{} + tdb = &dummyTDB{maxSizeBytes: batchSizeBytes} valueConverter = &dbzm.NoOpValueConverter{} dataStore = datastore.NewDataStore(ldataDir) - batchSizeInNumRows = batchSize + batchSizeInNumRows = batchSizeRows state := NewImportDataState(lexportDir) return ldataDir, lexportDir, state, nil @@ -103,7 +104,7 @@ func setupFileForTest(lexportDir string, fileContents string, dir string, tableN } func TestBasicFileBatchProducer(t *testing.T) { - ldataDir, lexportDir, state, err := setupDependenciesForTest(2) + ldataDir, lexportDir, state, err := setupDependenciesForTest(2, 1024) assert.NoError(t, err) if ldataDir != "" { @@ -127,4 +128,5 @@ func TestBasicFileBatchProducer(t *testing.T) { assert.NoError(t, err) assert.NotNil(t, batch) assert.Equal(t, int64(1), 
batch.RecordCount) + assert.True(t, batchproducer.Done()) } From c45b6a3213fcee700918285f3320f61bef92cf9e Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 13:11:31 +0530 Subject: [PATCH 07/18] more tests --- .../cmd/importDataFileBatchProducer_test.go | 120 ++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go index 12d3a6678f..be4c4f59d3 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer_test.go +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -130,3 +130,123 @@ func TestBasicFileBatchProducer(t *testing.T) { assert.Equal(t, int64(1), batch.RecordCount) assert.True(t, batchproducer.Done()) } + +func TestFileBatchProducerBasedOnRowsThreshold(t *testing.T) { + // max batch size in rows is 2 + ldataDir, lexportDir, state, err := setupDependenciesForTest(2, 1024) + assert.NoError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + fileContents := `id,val +1, "hello" +2, "world" +3, "foo" +4, "bar"` + _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + assert.NoError(t, err) + + batchproducer, err := NewFileBatchProducer(task, state) + assert.NoError(t, err) + + assert.False(t, batchproducer.Done()) + + var batches []*Batch + for !batchproducer.Done() { + batch, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch) + batches = append(batches, batch) + } + + // 2 batches should be produced + assert.Equal(t, 2, len(batches)) + // each of length 2 + assert.Equal(t, int64(2), batches[0].RecordCount) + assert.Equal(t, int64(2), batches[1].RecordCount) +} + +func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) { + // max batch size in size is 25 bytes + ldataDir, lexportDir, state, err := setupDependenciesForTest(1000, 25) + assert.NoError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + // each row exccept header is 10 bytes + fileContents := `id,val +1, "abcde" +2, "ghijk" +3, "mnopq" +4, "stuvw"` + _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + assert.NoError(t, err) + + batchproducer, err := NewFileBatchProducer(task, state) + assert.NoError(t, err) + + assert.False(t, batchproducer.Done()) + + var batches []*Batch + for !batchproducer.Done() { + batch, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch) + batches = append(batches, batch) + } + + // 3 batches should be produced + // while calculating for the first batch, the header is also considered + assert.Equal(t, 3, len(batches)) + // each of length 2 + assert.Equal(t, int64(1), batches[0].RecordCount) + assert.Equal(t, int64(2), batches[1].RecordCount) + assert.Equal(t, int64(1), batches[2].RecordCount) +} + +func TestFileBatchProducerThrowsErrorWhenSingleRowGreaterThanMaxBatchSize(t *testing.T) { + // max batch size in size is 25 bytes + ldataDir, lexportDir, state, err := setupDependenciesForTest(1000, 25) + assert.NoError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + // 3rd row is greater than max batch size + fileContents := `id,val +1, "abcdef" +2, "ghijk" +3, "mnopq1234567899876543" 
+4, "stuvw"` + _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + assert.NoError(t, err) + + batchproducer, err := NewFileBatchProducer(task, state) + assert.NoError(t, err) + + assert.False(t, batchproducer.Done()) + + // 1st batch is fine. + batch, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch) + assert.Equal(t, int64(1), batch.RecordCount) + + // 2nd batch should throw error + _, err = batchproducer.NextBatch() + assert.ErrorContains(t, err, "larger than max batch size") +} From a78387ef461f2e0aa3860ff515cec36e9d212441 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 13:17:50 +0530 Subject: [PATCH 08/18] batch value verify --- .../cmd/importDataFileBatchProducer_test.go | 18 ++++++++++++++++++ 1 file changed, 18 insertions(+) diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go index be4c4f59d3..d9ed8a6bcf 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer_test.go +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -168,7 +168,14 @@ func TestFileBatchProducerBasedOnRowsThreshold(t *testing.T) { assert.Equal(t, 2, len(batches)) // each of length 2 assert.Equal(t, int64(2), batches[0].RecordCount) + batchContents, err := os.ReadFile(batches[0].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n1, \"hello\"\n2, \"world\"", string(batchContents)) + assert.Equal(t, int64(2), batches[1].RecordCount) + batchContents, err = os.ReadFile(batches[1].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n3, \"foo\"\n4, \"bar\"", string(batchContents)) } func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) { @@ -210,8 +217,19 @@ func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) { assert.Equal(t, 3, len(batches)) // each of length 2 assert.Equal(t, int64(1), batches[0].RecordCount) + batchContents, err := os.ReadFile(batches[0].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n1, \"abcde\"", string(batchContents)) + assert.Equal(t, int64(2), batches[1].RecordCount) + batchContents, err = os.ReadFile(batches[1].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n2, \"ghijk\"\n3, \"mnopq\"", string(batchContents)) + assert.Equal(t, int64(1), batches[2].RecordCount) + batchContents, err = os.ReadFile(batches[2].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n4, \"stuvw\"", string(batchContents)) } func TestFileBatchProducerThrowsErrorWhenSingleRowGreaterThanMaxBatchSize(t *testing.T) { From 65c76456b5a57da556001a6c34ceeb0df4ca441f Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 13:19:52 +0530 Subject: [PATCH 09/18] assert less than --- yb-voyager/cmd/importDataFileBatchProducer_test.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go index d9ed8a6bcf..8fe9cb4b9b 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer_test.go +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -180,7 +180,8 @@ func TestFileBatchProducerBasedOnRowsThreshold(t *testing.T) { func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) { // max batch size in size is 25 bytes - ldataDir, lexportDir, state, err := setupDependenciesForTest(1000, 25) + maxBatchSizeBytes := int64(25) + ldataDir, lexportDir, state, err := setupDependenciesForTest(1000, maxBatchSizeBytes) assert.NoError(t, err) if ldataDir != "" { @@ 
-217,16 +218,19 @@ func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) { assert.Equal(t, 3, len(batches)) // each of length 2 assert.Equal(t, int64(1), batches[0].RecordCount) + assert.LessOrEqual(t, batches[0].ByteCount, maxBatchSizeBytes) batchContents, err := os.ReadFile(batches[0].GetFilePath()) assert.NoError(t, err) assert.Equal(t, "id,val\n1, \"abcde\"", string(batchContents)) assert.Equal(t, int64(2), batches[1].RecordCount) + assert.LessOrEqual(t, batches[1].ByteCount, maxBatchSizeBytes) batchContents, err = os.ReadFile(batches[1].GetFilePath()) assert.NoError(t, err) assert.Equal(t, "id,val\n2, \"ghijk\"\n3, \"mnopq\"", string(batchContents)) assert.Equal(t, int64(1), batches[2].RecordCount) + assert.LessOrEqual(t, batches[2].ByteCount, maxBatchSizeBytes) batchContents, err = os.ReadFile(batches[2].GetFilePath()) assert.NoError(t, err) assert.Equal(t, "id,val\n4, \"stuvw\"", string(batchContents)) From a9a294c832fad8feee0039ef081a8a966f0a741a Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 13:33:16 +0530 Subject: [PATCH 10/18] resumable test --- .../cmd/importDataFileBatchProducer_test.go | 57 +++++++++++++++++++ 1 file changed, 57 insertions(+) diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go index 8fe9cb4b9b..ed535c7497 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer_test.go +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -20,6 +20,7 @@ import ( "os" "testing" + "github.com/google/go-cmp/cmp" "github.com/stretchr/testify/assert" "github.com/yugabyte/yb-voyager/yb-voyager/src/constants" "github.com/yugabyte/yb-voyager/yb-voyager/src/datafile" @@ -272,3 +273,59 @@ func TestFileBatchProducerThrowsErrorWhenSingleRowGreaterThanMaxBatchSize(t *tes _, err = batchproducer.NextBatch() assert.ErrorContains(t, err, "larger than max batch size") } + +func TestFileBatchProducerResumable(t *testing.T) { + // max batch size in rows is 2 + ldataDir, lexportDir, state, err := setupDependenciesForTest(2, 1024) + assert.NoError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + fileContents := `id,val +1, "hello" +2, "world" +3, "foo" +4, "bar"` + _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + assert.NoError(t, err) + + batchproducer, err := NewFileBatchProducer(task, state) + assert.NoError(t, err) + assert.False(t, batchproducer.Done()) + + // generate one batch + batch1, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch1) + assert.Equal(t, int64(2), batch1.RecordCount) + + // simulate a crash and recover + batchproducer, err = NewFileBatchProducer(task, state) + assert.NoError(t, err) + assert.False(t, batchproducer.Done()) + + // state should have recovered that one batch + assert.Equal(t, 1, len(batchproducer.pendingBatches)) + assert.True(t, cmp.Equal(batch1, batchproducer.pendingBatches[0])) + + // verify that it picks up from pendingBatches + // instead of procing a new batch. 
+ batch1Recovered, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch1Recovered) + assert.True(t, cmp.Equal(batch1, batch1Recovered)) + assert.Equal(t, 0, len(batchproducer.pendingBatches)) + assert.False(t, batchproducer.Done()) + + // get final batch + batch2, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch2) + assert.Equal(t, int64(2), batch2.RecordCount) + assert.True(t, batchproducer.Done()) +} From 1f32f7a6938f1647803d3315115ffbdda818d406 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 13:44:53 +0530 Subject: [PATCH 11/18] import data change to use filebatchproducer --- yb-voyager/cmd/importData.go | 27 ++++++++++++++++++++------- 1 file changed, 20 insertions(+), 7 deletions(-) diff --git a/yb-voyager/cmd/importData.go b/yb-voyager/cmd/importData.go index 45050cf615..1d44f4d0f7 100644 --- a/yb-voyager/cmd/importData.go +++ b/yb-voyager/cmd/importData.go @@ -1010,17 +1010,30 @@ func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn f if err != nil { utils.ErrExit("preparing for file import: %s", err) } - log.Infof("Collect all interrupted/remaining splits.") - pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup) + + fileBatchProducer, err := NewFileBatchProducer(task, state) if err != nil { - utils.ErrExit("recovering state for table: %q: %s", task.TableNameTup, err) + utils.ErrExit("creating file batch producer: %s", err) } - for _, batch := range pendingBatches { + + for !fileBatchProducer.Done() { + batch, err := fileBatchProducer.NextBatch() + if err != nil { + utils.ErrExit("getting next batch: %s", err) + } submitBatch(batch, updateProgressFn, importBatchArgsProto) } - if !fileFullySplit { - splitFilesForTable(state, origDataFile, task.TableNameTup, lastBatchNumber, lastOffset, updateProgressFn, importBatchArgsProto) - } + // log.Infof("Collect all interrupted/remaining splits.") + // pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup) + // if err != nil { + // utils.ErrExit("recovering state for table: %q: %s", task.TableNameTup, err) + // } + // for _, batch := range pendingBatches { + // submitBatch(batch, updateProgressFn, importBatchArgsProto) + // } + // if !fileFullySplit { + // splitFilesForTable(state, origDataFile, task.TableNameTup, lastBatchNumber, lastOffset, updateProgressFn, importBatchArgsProto) + // } } func splitFilesForTable(state *ImportDataState, filePath string, t sqlname.NameTuple, From cbc1ede3c7fac201111b00fe752bb959748b934a Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Thu, 16 Jan 2025 13:54:10 +0530 Subject: [PATCH 12/18] unit tag --- yb-voyager/cmd/importDataFileBatchProducer_test.go | 2 ++ 1 file changed, 2 insertions(+) diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go index ed535c7497..852dccee60 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer_test.go +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -1,3 +1,5 @@ +//go:build unit + /* Copyright (c) YugabyteDB, Inc. 
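A minimal sketch of how the FileBatchProducer introduced in the patches above is meant to be driven (names taken from the patches themselves; error handling abbreviated, this is an illustration rather than additional patch content): any batches recovered from a previous interrupted run are replayed first by NextBatch(), and only then are fresh batches split from the data file until Done() reports true.

	// Illustrative consumer loop for FileBatchProducer (mirrors the importFile change above).
	fileBatchProducer, err := NewFileBatchProducer(task, state)
	if err != nil {
		utils.ErrExit("creating file batch producer: %s", err)
	}
	for !fileBatchProducer.Done() {
		// NextBatch returns recovered pending batches first, then newly produced ones.
		batch, err := fileBatchProducer.NextBatch()
		if err != nil {
			utils.ErrExit("getting next batch: %s", err)
		}
		submitBatch(batch, updateProgressFn, importBatchArgsProto)
	}
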
From 943bd22b6d946ddc219ffe7bb7014e3ab52dfb82 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Mon, 20 Jan 2025 11:13:32 +0530 Subject: [PATCH 13/18] cleanup --- yb-voyager/cmd/importData.go | 136 ------------------ yb-voyager/cmd/importDataFileBatchProducer.go | 11 +- 2 files changed, 3 insertions(+), 144 deletions(-) diff --git a/yb-voyager/cmd/importData.go b/yb-voyager/cmd/importData.go index 1d44f4d0f7..09a48e01d8 100644 --- a/yb-voyager/cmd/importData.go +++ b/yb-voyager/cmd/importData.go @@ -17,7 +17,6 @@ package cmd import ( "fmt" - "io" "os" "os/exec" "path/filepath" @@ -1023,141 +1022,6 @@ func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn f } submitBatch(batch, updateProgressFn, importBatchArgsProto) } - // log.Infof("Collect all interrupted/remaining splits.") - // pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup) - // if err != nil { - // utils.ErrExit("recovering state for table: %q: %s", task.TableNameTup, err) - // } - // for _, batch := range pendingBatches { - // submitBatch(batch, updateProgressFn, importBatchArgsProto) - // } - // if !fileFullySplit { - // splitFilesForTable(state, origDataFile, task.TableNameTup, lastBatchNumber, lastOffset, updateProgressFn, importBatchArgsProto) - // } -} - -func splitFilesForTable(state *ImportDataState, filePath string, t sqlname.NameTuple, - lastBatchNumber int64, lastOffset int64, updateProgressFn func(int64), importBatchArgsProto *tgtdb.ImportBatchArgs) { - log.Infof("Split data file %q: tableName=%q, largestSplit=%v, largestOffset=%v", filePath, t, lastBatchNumber, lastOffset) - batchNum := lastBatchNumber + 1 - numLinesTaken := lastOffset - - reader, err := dataStore.Open(filePath) - if err != nil { - utils.ErrExit("preparing reader for split generation on file: %q: %v", filePath, err) - } - - dataFile, err := datafile.NewDataFile(filePath, reader, dataFileDescriptor) - if err != nil { - utils.ErrExit("open datafile: %q: %v", filePath, err) - } - defer dataFile.Close() - - log.Infof("Skipping %d lines from %q", lastOffset, filePath) - err = dataFile.SkipLines(lastOffset) - if err != nil { - utils.ErrExit("skipping line for offset=%d: %v", lastOffset, err) - } - - var readLineErr error = nil - var line string - var currentBytesRead int64 - var batchWriter *BatchWriter - header := "" - if dataFileDescriptor.HasHeader { - header = dataFile.GetHeader() - } - - // Helper function to initialize a new batchWriter - initBatchWriter := func() { - batchWriter = state.NewBatchWriter(filePath, t, batchNum) - err := batchWriter.Init() - if err != nil { - utils.ErrExit("initializing batch writer for table: %q: %s", t, err) - } - // Write the header if necessary - if header != "" && dataFileDescriptor.FileFormat == datafile.CSV { - err = batchWriter.WriteHeader(header) - if err != nil { - utils.ErrExit("writing header for table: %q: %s", t, err) - } - } - } - - // Function to finalize and submit the current batch - finalizeBatch := func(isLastBatch bool, offsetEnd int64, bytesInBatch int64) { - batch, err := batchWriter.Done(isLastBatch, offsetEnd, bytesInBatch) - if err != nil { - utils.ErrExit("finalizing batch %d: %s", batchNum, err) - } - batchWriter = nil - submitBatch(batch, updateProgressFn, importBatchArgsProto) - - // Increment batchNum only if this is not the last batch - if !isLastBatch { - batchNum++ - } - } - - for readLineErr == nil { - - if batchWriter == nil { - initBatchWriter() // Create a new batchWriter - } - - line, 
currentBytesRead, readLineErr = dataFile.NextLine() - if readLineErr == nil || (readLineErr == io.EOF && line != "") { - // handling possible case: last dataline(i.e. EOF) but no newline char at the end - numLinesTaken += 1 - } - log.Debugf("Batch %d: totalBytesRead %d, currentBytes %d \n", batchNum, dataFile.GetBytesRead(), currentBytesRead) - if currentBytesRead > tdb.MaxBatchSizeInBytes() { - //If a row is itself larger than MaxBatchSizeInBytes erroring out - ybSpecificMsg := "" - if tconf.TargetDBType == YUGABYTEDB { - ybSpecificMsg = ", but should be strictly lower than the the rpc_max_message_size on YugabyteDB (default 267386880 bytes)" - } - utils.ErrExit("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, numLinesTaken, t.ForOutput(), filePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg) - } - if line != "" { - // can't use importBatchArgsProto.Columns as to use case insenstiive column names - columnNames, _ := TableToColumnNames.Get(t) - line, err = valueConverter.ConvertRow(t, columnNames, line) - if err != nil { - utils.ErrExit("transforming line number=%d for table: %q in file %s: %s", numLinesTaken, t.ForOutput(), filePath, err) - } - - // Check if adding this record exceeds the max batch size - if batchWriter.NumRecordsWritten == batchSizeInNumRows || - dataFile.GetBytesRead() > tdb.MaxBatchSizeInBytes() { // GetBytesRead - returns the total bytes read until now including the currentBytesRead - - // Finalize the current batch without adding the record - finalizeBatch(false, numLinesTaken-1, dataFile.GetBytesRead()-currentBytesRead) - - //carry forward the bytes to next batch - dataFile.ResetBytesRead(currentBytesRead) - - // Start a new batch by calling the initBatchWriter function - initBatchWriter() - } - - // Write the record to the new or current batch - err = batchWriter.WriteRecord(line) - if err != nil { - utils.ErrExit("Write to batch %d: %s", batchNum, err) - } - } - - // Finalize the batch if it's the last line or the end of the file and reset the bytes read to 0 - if readLineErr == io.EOF { - finalizeBatch(true, numLinesTaken, dataFile.GetBytesRead()) - dataFile.ResetBytesRead(0) - } else if readLineErr != nil { - utils.ErrExit("read line from data file: %q: %s", filePath, readLineErr) - } - } - - log.Infof("splitFilesForTable: done splitting data file %q for table %q", filePath, t) } func executePostSnapshotImportSqls() error { diff --git a/yb-voyager/cmd/importDataFileBatchProducer.go b/yb-voyager/cmd/importDataFileBatchProducer.go index e9872341ab..4bccbd53f2 100644 --- a/yb-voyager/cmd/importDataFileBatchProducer.go +++ b/yb-voyager/cmd/importDataFileBatchProducer.go @@ -72,7 +72,7 @@ func (p *FileBatchProducer) Done() bool { func (p *FileBatchProducer) NextBatch() (*Batch, error) { if p.Done() { - return nil, fmt.Errorf("already done") + return nil, fmt.Errorf("already completed producing all batches") } if len(p.pendingBatches) > 0 { batch := p.pendingBatches[0] @@ -176,7 +176,8 @@ func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { return nil, fmt.Errorf("read line from data file: %q: %s", p.task.FilePath, readLineErr) } } - return nil, fmt.Errorf("unexpected") + // ideally should not reach here + return nil, fmt.Errorf("could not produce next batch: err: %w", readLineErr) } func (p *FileBatchProducer) openDataFile() error { @@ -229,10 +230,4 @@ func (p *FileBatchProducer) 
finalizeBatch(batchWriter *BatchWriter, isLastBatch batchWriter = nil p.lastBatchNumber = batchNum return batch, nil - // submitBatch(batch, updateProgressFn, importBatchArgsProto) - - // Increment batchNum only if this is not the last batch - // if !isLastBatch { - // batchNum++ - // } } From c8390e3d2291f08d28776ac85318168a4f7b9bdf Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Tue, 21 Jan 2025 13:55:01 +0530 Subject: [PATCH 14/18] wip --- yb-voyager/cmd/importDataFileTaskImporter.go | 25 ++++++++++++++++++++ 1 file changed, 25 insertions(+) create mode 100644 yb-voyager/cmd/importDataFileTaskImporter.go diff --git a/yb-voyager/cmd/importDataFileTaskImporter.go b/yb-voyager/cmd/importDataFileTaskImporter.go new file mode 100644 index 0000000000..af9d1c3f97 --- /dev/null +++ b/yb-voyager/cmd/importDataFileTaskImporter.go @@ -0,0 +1,25 @@ +/* +Copyright (c) YugabyteDB, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package cmd + +import "github.com/sourcegraph/conc/pool" + +type FileTaskImporter struct { + task *ImportFileTask + state *ImportDataState + batchProducer *FileBatchProducer + workerPool *pool.Pool +} From 3571226561ba1609dee909f156dfd8d887a566c9 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Wed, 22 Jan 2025 10:38:35 +0530 Subject: [PATCH 15/18] filetaskimporter --- yb-voyager/cmd/importData.go | 113 --------- yb-voyager/cmd/importDataFileTaskImporter.go | 232 ++++++++++++++++++- 2 files changed, 227 insertions(+), 118 deletions(-) diff --git a/yb-voyager/cmd/importData.go b/yb-voyager/cmd/importData.go index 09a48e01d8..b58c616935 100644 --- a/yb-voyager/cmd/importData.go +++ b/yb-voyager/cmd/importData.go @@ -855,30 +855,6 @@ func getIdentityColumnsForTables(tables []sqlname.NameTuple, identityType string return result } -func getTotalProgressAmount(task *ImportFileTask) int64 { - if reportProgressInBytes { - return task.FileSize - } else { - return task.RowCount - } -} - -func getImportedProgressAmount(task *ImportFileTask, state *ImportDataState) int64 { - if reportProgressInBytes { - byteCount, err := state.GetImportedByteCount(task.FilePath, task.TableNameTup) - if err != nil { - utils.ErrExit("Failed to get imported byte count for table: %s: %s", task.TableNameTup, err) - } - return byteCount - } else { - rowCount, err := state.GetImportedRowCount(task.FilePath, task.TableNameTup) - if err != nil { - utils.ErrExit("Failed to get imported row count for table: %s: %s", task.TableNameTup, err) - } - return rowCount - } -} - func importFileTasksToTableNames(tasks []*ImportFileTask) []string { tableNames := []string{} for _, t := range tasks { @@ -974,31 +950,6 @@ func cleanImportState(state *ImportDataState, tasks []*ImportFileTask) { } } -func getImportBatchArgsProto(tableNameTup sqlname.NameTuple, filePath string) *tgtdb.ImportBatchArgs { - columns, _ := TableToColumnNames.Get(tableNameTup) - columns, err := tdb.QuoteAttributeNames(tableNameTup, columns) - if err != nil { - utils.ErrExit("if required quote column names: %s", err) - } - // If `columns` is unset at this point, no 
attribute list is passed in the COPY command. - fileFormat := dataFileDescriptor.FileFormat - if fileFormat == datafile.SQL { - fileFormat = datafile.TEXT - } - importBatchArgsProto := &tgtdb.ImportBatchArgs{ - TableNameTup: tableNameTup, - Columns: columns, - FileFormat: fileFormat, - Delimiter: dataFileDescriptor.Delimiter, - HasHeader: dataFileDescriptor.HasHeader && fileFormat == datafile.CSV, - QuoteChar: dataFileDescriptor.QuoteChar, - EscapeChar: dataFileDescriptor.EscapeChar, - NullString: dataFileDescriptor.NullString, - } - log.Infof("ImportBatchArgs: %v", spew.Sdump(importBatchArgsProto)) - return importBatchArgsProto -} - func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn func(int64)) { origDataFile := task.FilePath @@ -1051,44 +1002,6 @@ func submitBatch(batch *Batch, updateProgressFn func(int64), importBatchArgsProt log.Infof("Queued batch: %s", spew.Sdump(batch)) } -func importBatch(batch *Batch, importBatchArgsProto *tgtdb.ImportBatchArgs) { - err := batch.MarkPending() - if err != nil { - utils.ErrExit("marking batch as pending: %d: %s", batch.Number, err) - } - log.Infof("Importing %q", batch.FilePath) - - importBatchArgs := *importBatchArgsProto - importBatchArgs.FilePath = batch.FilePath - importBatchArgs.RowsPerTransaction = batch.OffsetEnd - batch.OffsetStart - - var rowsAffected int64 - sleepIntervalSec := 0 - for attempt := 0; attempt < COPY_MAX_RETRY_COUNT; attempt++ { - tableSchema, _ := TableNameToSchema.Get(batch.TableNameTup) - rowsAffected, err = tdb.ImportBatch(batch, &importBatchArgs, exportDir, tableSchema) - if err == nil || tdb.IsNonRetryableCopyError(err) { - break - } - log.Warnf("COPY FROM file %q: %s", batch.FilePath, err) - sleepIntervalSec += 10 - if sleepIntervalSec > MAX_SLEEP_SECOND { - sleepIntervalSec = MAX_SLEEP_SECOND - } - log.Infof("sleep for %d seconds before retrying the file %s (attempt %d)", - sleepIntervalSec, batch.FilePath, attempt) - time.Sleep(time.Duration(sleepIntervalSec) * time.Second) - } - log.Infof("%q => %d rows affected", batch.FilePath, rowsAffected) - if err != nil { - utils.ErrExit("import batch: %q into %s: %s", batch.FilePath, batch.TableNameTup, err) - } - err = batch.MarkDone() - if err != nil { - utils.ErrExit("marking batch as done: %q: %s", batch.FilePath, err) - } -} - func getIndexName(sqlQuery string, indexName string) (string, error) { // Return the index name itself if it is aleady qualified with schema name if len(strings.Split(indexName, ".")) == 2 { @@ -1237,29 +1150,3 @@ func createInitialImportDataTableMetrics(tasks []*ImportFileTask) []*cp.UpdateIm return result } - -func createImportDataTableMetrics(tableName string, countLiveRows int64, countTotalRows int64, - status int) cp.UpdateImportedRowCountEvent { - - var schemaName, tableName2 string - if strings.Count(tableName, ".") == 1 { - schemaName, tableName2 = cp.SplitTableNameForPG(tableName) - } else { - schemaName, tableName2 = tconf.Schema, tableName - } - result := cp.UpdateImportedRowCountEvent{ - BaseUpdateRowCountEvent: cp.BaseUpdateRowCountEvent{ - BaseEvent: cp.BaseEvent{ - EventType: "IMPORT DATA", - MigrationUUID: migrationUUID, - SchemaNames: []string{schemaName}, - }, - TableName: tableName2, - Status: cp.EXPORT_OR_IMPORT_DATA_STATUS_INT_TO_STR[status], - TotalRowCount: countTotalRows, - CompletedRowCount: countLiveRows, - }, - } - - return result -} diff --git a/yb-voyager/cmd/importDataFileTaskImporter.go b/yb-voyager/cmd/importDataFileTaskImporter.go index af9d1c3f97..2b5834e3b8 100644 --- 
a/yb-voyager/cmd/importDataFileTaskImporter.go +++ b/yb-voyager/cmd/importDataFileTaskImporter.go @@ -15,11 +15,233 @@ limitations under the License. */ package cmd -import "github.com/sourcegraph/conc/pool" +import ( + "fmt" + "strings" + "time" + + "github.com/davecgh/go-spew/spew" + log "github.com/sirupsen/logrus" + "github.com/sourcegraph/conc/pool" + "github.com/yugabyte/yb-voyager/yb-voyager/src/cp" + "github.com/yugabyte/yb-voyager/yb-voyager/src/datafile" + "github.com/yugabyte/yb-voyager/yb-voyager/src/tgtdb" + "github.com/yugabyte/yb-voyager/yb-voyager/src/utils" + "github.com/yugabyte/yb-voyager/yb-voyager/src/utils/sqlname" +) type FileTaskImporter struct { - task *ImportFileTask - state *ImportDataState - batchProducer *FileBatchProducer - workerPool *pool.Pool + task *ImportFileTask + state *ImportDataState + batchProducer *FileBatchProducer + importBatchArgsProto *tgtdb.ImportBatchArgs + workerPool *pool.Pool + + totalProgressAmount int64 + currentProgressAmount int64 + progressReporter *ImportDataProgressReporter +} + +func NewFileTaskImporter(task *ImportFileTask, state *ImportDataState, workerPool *pool.Pool, + progressReporter *ImportDataProgressReporter) (*FileTaskImporter, error) { + batchProducer, err := NewFileBatchProducer(task, state) + if err != nil { + return nil, fmt.Errorf("creating file batch producer: %s", err) + } + totalProgressAmount := getTotalProgressAmount(task) + progressReporter.ImportFileStarted(task, totalProgressAmount) + progressReporter.AddProgressAmount(task, getImportedProgressAmount(task, state)) + + fti := &FileTaskImporter{ + task: task, + state: state, + batchProducer: batchProducer, + workerPool: workerPool, + importBatchArgsProto: getImportBatchArgsProto(task.TableNameTup, task.FilePath), + progressReporter: progressReporter, + totalProgressAmount: getTotalProgressAmount(task), + } + return fti, nil +} + +// as of now, batch production and batch submission +// is done together in `SubmitNextBatch` method. +// In other words, as soon as a batch is produced, it is submitted. +// Therefore, to check whether all batches are submitted, we can check +// if the batch producer is done. +func (fti *FileTaskImporter) AllBatchesSubmitted() bool { + return fti.batchProducer.Done() +} + +func (fti *FileTaskImporter) AllBatchesImported() error { + // TODO: check importDataState for status. + panic("not implemented") +} + +func (fti *FileTaskImporter) SubmitNextBatch() error { + if fti.batchProducer.Done() { + return fmt.Errorf("no more batches to submit") + } + batch, err := fti.batchProducer.NextBatch() + if err != nil { + return fmt.Errorf("getting next batch: %w", err) + } + return fti.submitBatch(batch) +} + +func (fti *FileTaskImporter) submitBatch(batch *Batch) error { + fti.workerPool.Go(func() { + // There are `poolSize` number of competing go-routines trying to invoke COPY. + // But the `connPool` will allow only `parallelism` number of connections to be + // used at a time. Thus limiting the number of concurrent COPYs to `parallelism`. 
+ importBatch(batch, fti.importBatchArgsProto) + if reportProgressInBytes { + fti.updateProgress(batch.ByteCount) + } else { + fti.updateProgress(batch.RecordCount) + } + }) + log.Infof("Queued batch: %s", spew.Sdump(batch)) + return nil +} + +func (fti *FileTaskImporter) updateProgress(progressAmount int64) { + fti.currentProgressAmount += progressAmount + fti.progressReporter.AddProgressAmount(fti.task, progressAmount) + + if importerRole == TARGET_DB_IMPORTER_ROLE && fti.totalProgressAmount > fti.currentProgressAmount { + importDataTableMetrics := createImportDataTableMetrics(fti.task.TableNameTup.ForKey(), + fti.currentProgressAmount, fti.totalProgressAmount, ROW_UPDATE_STATUS_IN_PROGRESS) + // The metrics are sent after evry 5 secs in implementation of UpdateImportedRowCount + controlPlane.UpdateImportedRowCount( + []*cp.UpdateImportedRowCountEvent{&importDataTableMetrics}) + } +} + +func (fti *FileTaskImporter) PostProcess() { + // TODO: close fileBatchProducer properly + + if importerRole == TARGET_DB_IMPORTER_ROLE { + importDataTableMetrics := createImportDataTableMetrics(fti.task.TableNameTup.ForKey(), + fti.currentProgressAmount, fti.totalProgressAmount, ROW_UPDATE_STATUS_COMPLETED) + controlPlane.UpdateImportedRowCount( + []*cp.UpdateImportedRowCountEvent{&importDataTableMetrics}) + } + + fti.progressReporter.FileImportDone(fti.task) // Remove the progress-bar for the file.\ +} + +// ============================================================================= // + +func getTotalProgressAmount(task *ImportFileTask) int64 { + if reportProgressInBytes { + return task.FileSize + } else { + return task.RowCount + } +} + +func getImportedProgressAmount(task *ImportFileTask, state *ImportDataState) int64 { + if reportProgressInBytes { + byteCount, err := state.GetImportedByteCount(task.FilePath, task.TableNameTup) + if err != nil { + utils.ErrExit("Failed to get imported byte count for table: %s: %s", task.TableNameTup, err) + } + return byteCount + } else { + rowCount, err := state.GetImportedRowCount(task.FilePath, task.TableNameTup) + if err != nil { + utils.ErrExit("Failed to get imported row count for table: %s: %s", task.TableNameTup, err) + } + return rowCount + } +} + +func createImportDataTableMetrics(tableName string, countLiveRows int64, countTotalRows int64, + status int) cp.UpdateImportedRowCountEvent { + + var schemaName, tableName2 string + if strings.Count(tableName, ".") == 1 { + schemaName, tableName2 = cp.SplitTableNameForPG(tableName) + } else { + schemaName, tableName2 = tconf.Schema, tableName + } + result := cp.UpdateImportedRowCountEvent{ + BaseUpdateRowCountEvent: cp.BaseUpdateRowCountEvent{ + BaseEvent: cp.BaseEvent{ + EventType: "IMPORT DATA", + MigrationUUID: migrationUUID, + SchemaNames: []string{schemaName}, + }, + TableName: tableName2, + Status: cp.EXPORT_OR_IMPORT_DATA_STATUS_INT_TO_STR[status], + TotalRowCount: countTotalRows, + CompletedRowCount: countLiveRows, + }, + } + + return result +} + +func getImportBatchArgsProto(tableNameTup sqlname.NameTuple, filePath string) *tgtdb.ImportBatchArgs { + columns, _ := TableToColumnNames.Get(tableNameTup) + columns, err := tdb.QuoteAttributeNames(tableNameTup, columns) + if err != nil { + utils.ErrExit("if required quote column names: %s", err) + } + // If `columns` is unset at this point, no attribute list is passed in the COPY command. 
+ fileFormat := dataFileDescriptor.FileFormat + if fileFormat == datafile.SQL { + fileFormat = datafile.TEXT + } + importBatchArgsProto := &tgtdb.ImportBatchArgs{ + TableNameTup: tableNameTup, + Columns: columns, + FileFormat: fileFormat, + Delimiter: dataFileDescriptor.Delimiter, + HasHeader: dataFileDescriptor.HasHeader && fileFormat == datafile.CSV, + QuoteChar: dataFileDescriptor.QuoteChar, + EscapeChar: dataFileDescriptor.EscapeChar, + NullString: dataFileDescriptor.NullString, + } + log.Infof("ImportBatchArgs: %v", spew.Sdump(importBatchArgsProto)) + return importBatchArgsProto +} + +func importBatch(batch *Batch, importBatchArgsProto *tgtdb.ImportBatchArgs) { + err := batch.MarkPending() + if err != nil { + utils.ErrExit("marking batch as pending: %d: %s", batch.Number, err) + } + log.Infof("Importing %q", batch.FilePath) + + importBatchArgs := *importBatchArgsProto + importBatchArgs.FilePath = batch.FilePath + importBatchArgs.RowsPerTransaction = batch.OffsetEnd - batch.OffsetStart + + var rowsAffected int64 + sleepIntervalSec := 0 + for attempt := 0; attempt < COPY_MAX_RETRY_COUNT; attempt++ { + tableSchema, _ := TableNameToSchema.Get(batch.TableNameTup) + rowsAffected, err = tdb.ImportBatch(batch, &importBatchArgs, exportDir, tableSchema) + if err == nil || tdb.IsNonRetryableCopyError(err) { + break + } + log.Warnf("COPY FROM file %q: %s", batch.FilePath, err) + sleepIntervalSec += 10 + if sleepIntervalSec > MAX_SLEEP_SECOND { + sleepIntervalSec = MAX_SLEEP_SECOND + } + log.Infof("sleep for %d seconds before retrying the file %s (attempt %d)", + sleepIntervalSec, batch.FilePath, attempt) + time.Sleep(time.Duration(sleepIntervalSec) * time.Second) + } + log.Infof("%q => %d rows affected", batch.FilePath, rowsAffected) + if err != nil { + utils.ErrExit("import batch: %q into %s: %s", batch.FilePath, batch.TableNameTup, err) + } + err = batch.MarkDone() + if err != nil { + utils.ErrExit("marking batch as done: %q: %s", batch.FilePath, err) + } } From 1e4b1c8dcb3a94d353253aebd02b4bbc499af197 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Wed, 22 Jan 2025 10:39:48 +0530 Subject: [PATCH 16/18] run tests --- .github/workflows/go.yml | 2 +- .github/workflows/integration-tests.yml | 2 +- .github/workflows/issue-tests.yml | 2 +- .github/workflows/misc-migtests.yml | 2 +- .github/workflows/mysql-migtests.yml | 2 +- .github/workflows/pg-13-migtests.yml | 2 +- .github/workflows/pg-17-migtests.yml | 2 +- .github/workflows/pg-9-migtests.yml | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 88c20428d5..2d6a1c8098 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -4,7 +4,7 @@ on: push: branches: ['main', '*.*-dev', '*.*.*-dev'] pull_request: - branches: [main] + branches: ['*'] jobs: diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index d94a594cb1..4d71e66fd3 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -4,7 +4,7 @@ on: push: branches: ['main', '*.*-dev', '*.*.*-dev'] pull_request: - branches: [main] + branches: ['*'] env: ORACLE_INSTANT_CLIENT_VERSION: "21.5.0.0.0-1" diff --git a/.github/workflows/issue-tests.yml b/.github/workflows/issue-tests.yml index 550eef3c8a..3b72d282fd 100644 --- a/.github/workflows/issue-tests.yml +++ b/.github/workflows/issue-tests.yml @@ -4,7 +4,7 @@ on: push: branches: ['main', '*.*-dev', '*.*.*-dev'] pull_request: - branches: [main] + branches: ['*'] jobs: diff 
--git a/.github/workflows/misc-migtests.yml b/.github/workflows/misc-migtests.yml index 3362028481..fdb8fe0e1e 100644 --- a/.github/workflows/misc-migtests.yml +++ b/.github/workflows/misc-migtests.yml @@ -4,7 +4,7 @@ on: push: branches: ['main', '*.*-dev', '*.*.*-dev'] pull_request: - branches: ['main'] + branches: ['*'] jobs: run-misc-migration-tests: diff --git a/.github/workflows/mysql-migtests.yml b/.github/workflows/mysql-migtests.yml index 1dfda76a62..65e15e2431 100644 --- a/.github/workflows/mysql-migtests.yml +++ b/.github/workflows/mysql-migtests.yml @@ -4,7 +4,7 @@ on: push: branches: ['main', '*.*-dev', '*.*.*-dev'] pull_request: - branches: ['main'] + branches: ['*'] jobs: run-mysql-migration-tests: diff --git a/.github/workflows/pg-13-migtests.yml b/.github/workflows/pg-13-migtests.yml index d6b03ec165..43762a0fa6 100644 --- a/.github/workflows/pg-13-migtests.yml +++ b/.github/workflows/pg-13-migtests.yml @@ -4,7 +4,7 @@ on: push: branches: ['main', '*.*-dev', '*.*.*-dev'] pull_request: - branches: ['main'] + branches: ['*'] jobs: run-pg-13-migration-tests: diff --git a/.github/workflows/pg-17-migtests.yml b/.github/workflows/pg-17-migtests.yml index 27aacc0c68..231ec42282 100644 --- a/.github/workflows/pg-17-migtests.yml +++ b/.github/workflows/pg-17-migtests.yml @@ -4,7 +4,7 @@ on: push: branches: ['main', '*.*-dev', '*.*.*-dev'] pull_request: - branches: ['main'] + branches: ['*'] jobs: run-pg-17-migration-tests: diff --git a/.github/workflows/pg-9-migtests.yml b/.github/workflows/pg-9-migtests.yml index 8fa3251d1b..44c0e2a81f 100644 --- a/.github/workflows/pg-9-migtests.yml +++ b/.github/workflows/pg-9-migtests.yml @@ -4,7 +4,7 @@ on: push: branches: ['main', '*.*-dev', '*.*.*-dev'] pull_request: - branches: ['main'] + branches: ['*'] jobs: run-pg-9-migration-tests: From 0c2ec2476d228a5715693a624f44b68d132a362d Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Wed, 22 Jan 2025 10:45:35 +0530 Subject: [PATCH 17/18] use task importer --- yb-voyager/cmd/importData.go | 132 +++++++++++++++++++---------------- 1 file changed, 72 insertions(+), 60 deletions(-) diff --git a/yb-voyager/cmd/importData.go b/yb-voyager/cmd/importData.go index b58c616935..e15d650ebc 100644 --- a/yb-voyager/cmd/importData.go +++ b/yb-voyager/cmd/importData.go @@ -591,36 +591,48 @@ func importData(importFileTasks []*ImportFileTask) { batchImportPool = pool.New().WithMaxGoroutines(poolSize) log.Infof("created batch import pool of size: %d", poolSize) - totalProgressAmount := getTotalProgressAmount(task) - progressReporter.ImportFileStarted(task, totalProgressAmount) - importedProgressAmount := getImportedProgressAmount(task, state) - progressReporter.AddProgressAmount(task, importedProgressAmount) - - var currentProgress int64 - updateProgressFn := func(progressAmount int64) { - currentProgress += progressAmount - progressReporter.AddProgressAmount(task, progressAmount) - - if importerRole == TARGET_DB_IMPORTER_ROLE && totalProgressAmount > currentProgress { - importDataTableMetrics := createImportDataTableMetrics(task.TableNameTup.ForKey(), - currentProgress, totalProgressAmount, ROW_UPDATE_STATUS_IN_PROGRESS) - // The metrics are sent after evry 5 secs in implementation of UpdateImportedRowCount - controlPlane.UpdateImportedRowCount( - []*cp.UpdateImportedRowCountEvent{&importDataTableMetrics}) - } + taskImporter, err := NewFileTaskImporter(task, state, batchImportPool, progressReporter) + if err != nil { + utils.ErrExit("Failed to create file task importer: %s", err) } - importFile(state, task, 
updateProgressFn) - batchImportPool.Wait() // Wait for the file import to finish. - - if importerRole == TARGET_DB_IMPORTER_ROLE { - importDataTableMetrics := createImportDataTableMetrics(task.TableNameTup.ForKey(), - currentProgress, totalProgressAmount, ROW_UPDATE_STATUS_COMPLETED) - controlPlane.UpdateImportedRowCount( - []*cp.UpdateImportedRowCountEvent{&importDataTableMetrics}) + // totalProgressAmount := getTotalProgressAmount(task) + // progressReporter.ImportFileStarted(task, totalProgressAmount) + // importedProgressAmount := getImportedProgressAmount(task, state) + // progressReporter.AddProgressAmount(task, importedProgressAmount) + + // var currentProgress int64 + // updateProgressFn := func(progressAmount int64) { + // currentProgress += progressAmount + // progressReporter.AddProgressAmount(task, progressAmount) + + // if importerRole == TARGET_DB_IMPORTER_ROLE && totalProgressAmount > currentProgress { + // importDataTableMetrics := createImportDataTableMetrics(task.TableNameTup.ForKey(), + // currentProgress, totalProgressAmount, ROW_UPDATE_STATUS_IN_PROGRESS) + // // The metrics are sent after evry 5 secs in implementation of UpdateImportedRowCount + // controlPlane.UpdateImportedRowCount( + // []*cp.UpdateImportedRowCountEvent{&importDataTableMetrics}) + // } + // } + + // importFile(state, task, updateProgressFn) + for !taskImporter.AllBatchesSubmitted() { + err := taskImporter.SubmitNextBatch() + if err != nil { + utils.ErrExit("Failed to submit next batch: task:%v err: %s", task, err) + } } - progressReporter.FileImportDone(task) // Remove the progress-bar for the file.\ + batchImportPool.Wait() // Wait for the file import to finish. + taskImporter.PostProcess() + // if importerRole == TARGET_DB_IMPORTER_ROLE { + // importDataTableMetrics := createImportDataTableMetrics(task.TableNameTup.ForKey(), + // currentProgress, totalProgressAmount, ROW_UPDATE_STATUS_COMPLETED) + // controlPlane.UpdateImportedRowCount( + // []*cp.UpdateImportedRowCountEvent{&importDataTableMetrics}) + // } + + // progressReporter.FileImportDone(task) // Remove the progress-bar for the file.\ } time.Sleep(time.Second * 2) } @@ -950,30 +962,30 @@ func cleanImportState(state *ImportDataState, tasks []*ImportFileTask) { } } -func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn func(int64)) { +// func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn func(int64)) { - origDataFile := task.FilePath - importBatchArgsProto := getImportBatchArgsProto(task.TableNameTup, task.FilePath) - log.Infof("Start splitting table %q: data-file: %q", task.TableNameTup, origDataFile) +// origDataFile := task.FilePath +// importBatchArgsProto := getImportBatchArgsProto(task.TableNameTup, task.FilePath) +// log.Infof("Start splitting table %q: data-file: %q", task.TableNameTup, origDataFile) - err := state.PrepareForFileImport(task.FilePath, task.TableNameTup) - if err != nil { - utils.ErrExit("preparing for file import: %s", err) - } +// err := state.PrepareForFileImport(task.FilePath, task.TableNameTup) +// if err != nil { +// utils.ErrExit("preparing for file import: %s", err) +// } - fileBatchProducer, err := NewFileBatchProducer(task, state) - if err != nil { - utils.ErrExit("creating file batch producer: %s", err) - } +// fileBatchProducer, err := NewFileBatchProducer(task, state) +// if err != nil { +// utils.ErrExit("creating file batch producer: %s", err) +// } - for !fileBatchProducer.Done() { - batch, err := fileBatchProducer.NextBatch() - if err != nil { - 
utils.ErrExit("getting next batch: %s", err) - } - submitBatch(batch, updateProgressFn, importBatchArgsProto) - } -} +// for !fileBatchProducer.Done() { +// batch, err := fileBatchProducer.NextBatch() +// if err != nil { +// utils.ErrExit("getting next batch: %s", err) +// } +// submitBatch(batch, updateProgressFn, importBatchArgsProto) +// } +// } func executePostSnapshotImportSqls() error { sequenceFilePath := filepath.Join(exportDir, "data", "postdata.sql") @@ -987,20 +999,20 @@ func executePostSnapshotImportSqls() error { return nil } -func submitBatch(batch *Batch, updateProgressFn func(int64), importBatchArgsProto *tgtdb.ImportBatchArgs) { - batchImportPool.Go(func() { - // There are `poolSize` number of competing go-routines trying to invoke COPY. - // But the `connPool` will allow only `parallelism` number of connections to be - // used at a time. Thus limiting the number of concurrent COPYs to `parallelism`. - importBatch(batch, importBatchArgsProto) - if reportProgressInBytes { - updateProgressFn(batch.ByteCount) - } else { - updateProgressFn(batch.RecordCount) - } - }) - log.Infof("Queued batch: %s", spew.Sdump(batch)) -} +// func submitBatch(batch *Batch, updateProgressFn func(int64), importBatchArgsProto *tgtdb.ImportBatchArgs) { +// batchImportPool.Go(func() { +// // There are `poolSize` number of competing go-routines trying to invoke COPY. +// // But the `connPool` will allow only `parallelism` number of connections to be +// // used at a time. Thus limiting the number of concurrent COPYs to `parallelism`. +// importBatch(batch, importBatchArgsProto) +// if reportProgressInBytes { +// updateProgressFn(batch.ByteCount) +// } else { +// updateProgressFn(batch.RecordCount) +// } +// }) +// log.Infof("Queued batch: %s", spew.Sdump(batch)) +// } func getIndexName(sqlQuery string, indexName string) (string, error) { // Return the index name itself if it is aleady qualified with schema name From 5dbacbd9ffe41f117c834a6a04b27e8a66513577 Mon Sep 17 00:00:00 2001 From: Aneesh Makala Date: Wed, 22 Jan 2025 10:47:03 +0530 Subject: [PATCH 18/18] run tests 2 --- .github/workflows/go.yml | 2 +- .github/workflows/integration-tests.yml | 2 +- .github/workflows/issue-tests.yml | 2 +- .github/workflows/misc-migtests.yml | 2 +- .github/workflows/mysql-migtests.yml | 2 +- .github/workflows/pg-13-migtests.yml | 2 +- .github/workflows/pg-17-migtests.yml | 2 +- .github/workflows/pg-9-migtests.yml | 2 +- 8 files changed, 8 insertions(+), 8 deletions(-) diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml index 2d6a1c8098..e2794811e7 100644 --- a/.github/workflows/go.yml +++ b/.github/workflows/go.yml @@ -2,7 +2,7 @@ name: Go on: push: - branches: ['main', '*.*-dev', '*.*.*-dev'] + branches: ['*'] pull_request: branches: ['*'] diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml index 4d71e66fd3..af9410ed55 100644 --- a/.github/workflows/integration-tests.yml +++ b/.github/workflows/integration-tests.yml @@ -2,7 +2,7 @@ name: Go on: push: - branches: ['main', '*.*-dev', '*.*.*-dev'] + branches: ['*'] pull_request: branches: ['*'] diff --git a/.github/workflows/issue-tests.yml b/.github/workflows/issue-tests.yml index 3b72d282fd..40693caeb5 100644 --- a/.github/workflows/issue-tests.yml +++ b/.github/workflows/issue-tests.yml @@ -2,7 +2,7 @@ name: Go on: push: - branches: ['main', '*.*-dev', '*.*.*-dev'] + branches: ['*'] pull_request: branches: ['*'] diff --git a/.github/workflows/misc-migtests.yml b/.github/workflows/misc-migtests.yml index 
fdb8fe0e1e..3f7690e4ad 100644 --- a/.github/workflows/misc-migtests.yml +++ b/.github/workflows/misc-migtests.yml @@ -2,7 +2,7 @@ name: "Misc: Migration Tests" on: push: - branches: ['main', '*.*-dev', '*.*.*-dev'] + branches: ['*'] pull_request: branches: ['*'] diff --git a/.github/workflows/mysql-migtests.yml b/.github/workflows/mysql-migtests.yml index 65e15e2431..4f3b50ef0d 100644 --- a/.github/workflows/mysql-migtests.yml +++ b/.github/workflows/mysql-migtests.yml @@ -2,7 +2,7 @@ name: "MySQL: Migration Tests" on: push: - branches: ['main', '*.*-dev', '*.*.*-dev'] + branches: ['*'] pull_request: branches: ['*'] diff --git a/.github/workflows/pg-13-migtests.yml b/.github/workflows/pg-13-migtests.yml index 43762a0fa6..2b7b023d49 100644 --- a/.github/workflows/pg-13-migtests.yml +++ b/.github/workflows/pg-13-migtests.yml @@ -2,7 +2,7 @@ name: "PG 13: Migration Tests" on: push: - branches: ['main', '*.*-dev', '*.*.*-dev'] + branches: ['*'] pull_request: branches: ['*'] diff --git a/.github/workflows/pg-17-migtests.yml b/.github/workflows/pg-17-migtests.yml index 231ec42282..8016d72261 100644 --- a/.github/workflows/pg-17-migtests.yml +++ b/.github/workflows/pg-17-migtests.yml @@ -2,7 +2,7 @@ name: "PG 17: Migration Tests" on: push: - branches: ['main', '*.*-dev', '*.*.*-dev'] + branches: ['*'] pull_request: branches: ['*'] diff --git a/.github/workflows/pg-9-migtests.yml b/.github/workflows/pg-9-migtests.yml index 44c0e2a81f..7cca6ba363 100644 --- a/.github/workflows/pg-9-migtests.yml +++ b/.github/workflows/pg-9-migtests.yml @@ -2,7 +2,7 @@ name: "PG 9: Migration Tests" on: push: - branches: ['main', '*.*-dev', '*.*.*-dev'] + branches: ['*'] pull_request: branches: ['*']
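
A note on the throttling described in FileTaskImporter.submitBatch above: the worker pool bounds how many goroutines compete to run COPY, while the target-DB connection pool bounds how many of them actually hold a connection at once. The following is a minimal, self-contained sketch of that shape using the same github.com/sourcegraph/conc/pool package the patch imports; the poolSize and parallelism values and the buffered-channel semaphore are illustrative stand-ins, not voyager's actual configuration or connPool.

package main

import (
	"fmt"
	"time"

	"github.com/sourcegraph/conc/pool"
)

func main() {
	const poolSize = 8    // number of competing goroutines (assumed value)
	const parallelism = 2 // connections handed out at a time (assumed value)

	// Stand-in for the connection pool: a semaphore with `parallelism` slots.
	connSlots := make(chan struct{}, parallelism)

	p := pool.New().WithMaxGoroutines(poolSize)
	for batchNum := 1; batchNum <= 10; batchNum++ {
		batchNum := batchNum
		p.Go(func() {
			connSlots <- struct{}{}        // acquire a connection slot
			defer func() { <-connSlots }() // release it when the COPY finishes
			fmt.Printf("COPY for batch %d\n", batchNum)
			time.Sleep(100 * time.Millisecond) // simulate the COPY
		})
	}
	p.Wait()
}

In importData.go the same pattern is driven per task: SubmitNextBatch is called until AllBatchesSubmitted returns true, then batchImportPool.Wait() blocks until the queued COPYs finish, and finally PostProcess emits the completion metrics.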
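
Similarly, importBatch retries COPY with a sleep that grows by 10 seconds per attempt, capped at MAX_SLEEP_SECOND, for at most COPY_MAX_RETRY_COUNT attempts, and gives up early on non-retryable errors. Below is a generic sketch of that retry policy; the constant values and the errNonRetryable sentinel are assumptions standing in for the real constants and for tdb.IsNonRetryableCopyError.

package main

import (
	"errors"
	"fmt"
	"time"
)

// Illustrative stand-ins; the real values live in the voyager codebase.
const (
	maxRetryCount  = 10 // analogous to COPY_MAX_RETRY_COUNT (assumed)
	maxSleepSecond = 60 // analogous to MAX_SLEEP_SECOND (assumed)
)

var errNonRetryable = errors.New("non-retryable")

// withCappedBackoff retries fn with a sleep that grows by 10s per attempt,
// capped at maxSleepSecond, and stops early on success or non-retryable errors.
func withCappedBackoff(fn func() error) error {
	var err error
	sleepSec := 0
	for attempt := 0; attempt < maxRetryCount; attempt++ {
		err = fn()
		if err == nil || errors.Is(err, errNonRetryable) {
			return err
		}
		sleepSec += 10
		if sleepSec > maxSleepSecond {
			sleepSec = maxSleepSecond
		}
		fmt.Printf("attempt %d failed (%v); sleeping %ds before retrying\n", attempt, err, sleepSec)
		time.Sleep(time.Duration(sleepSec) * time.Second)
	}
	return err
}

func main() {
	calls := 0
	err := withCappedBackoff(func() error {
		calls++
		if calls < 3 {
			return fmt.Errorf("transient failure %d", calls)
		}
		return nil
	})
	fmt.Println("final error:", err)
}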