diff --git a/.github/workflows/go.yml b/.github/workflows/go.yml
index 88c20428d5..e2794811e7 100644
--- a/.github/workflows/go.yml
+++ b/.github/workflows/go.yml
@@ -2,9 +2,9 @@ name: Go
 
 on:
   push:
-    branches: ['main', '*.*-dev', '*.*.*-dev']
+    branches: ['*']
   pull_request:
-    branches: [main]
+    branches: ['*']
 
 jobs:
diff --git a/.github/workflows/integration-tests.yml b/.github/workflows/integration-tests.yml
index d94a594cb1..af9410ed55 100644
--- a/.github/workflows/integration-tests.yml
+++ b/.github/workflows/integration-tests.yml
@@ -2,9 +2,9 @@ name: Go
 
 on:
   push:
-    branches: ['main', '*.*-dev', '*.*.*-dev']
+    branches: ['*']
   pull_request:
-    branches: [main]
+    branches: ['*']
 
 env:
   ORACLE_INSTANT_CLIENT_VERSION: "21.5.0.0.0-1"
diff --git a/.github/workflows/issue-tests.yml b/.github/workflows/issue-tests.yml
index 550eef3c8a..40693caeb5 100644
--- a/.github/workflows/issue-tests.yml
+++ b/.github/workflows/issue-tests.yml
@@ -2,9 +2,9 @@ name: Go
 
 on:
   push:
-    branches: ['main', '*.*-dev', '*.*.*-dev']
+    branches: ['*']
   pull_request:
-    branches: [main]
+    branches: ['*']
 
 jobs:
diff --git a/.github/workflows/misc-migtests.yml b/.github/workflows/misc-migtests.yml
index 3362028481..3f7690e4ad 100644
--- a/.github/workflows/misc-migtests.yml
+++ b/.github/workflows/misc-migtests.yml
@@ -2,9 +2,9 @@ name: "Misc: Migration Tests"
 
 on:
   push:
-    branches: ['main', '*.*-dev', '*.*.*-dev']
+    branches: ['*']
   pull_request:
-    branches: ['main']
+    branches: ['*']
 
 jobs:
   run-misc-migration-tests:
diff --git a/.github/workflows/mysql-migtests.yml b/.github/workflows/mysql-migtests.yml
index 1dfda76a62..4f3b50ef0d 100644
--- a/.github/workflows/mysql-migtests.yml
+++ b/.github/workflows/mysql-migtests.yml
@@ -2,9 +2,9 @@ name: "MySQL: Migration Tests"
 
 on:
   push:
-    branches: ['main', '*.*-dev', '*.*.*-dev']
+    branches: ['*']
   pull_request:
-    branches: ['main']
+    branches: ['*']
 
 jobs:
   run-mysql-migration-tests:
diff --git a/.github/workflows/pg-13-migtests.yml b/.github/workflows/pg-13-migtests.yml
index d6b03ec165..2b7b023d49 100644
--- a/.github/workflows/pg-13-migtests.yml
+++ b/.github/workflows/pg-13-migtests.yml
@@ -2,9 +2,9 @@ name: "PG 13: Migration Tests"
 
 on:
   push:
-    branches: ['main', '*.*-dev', '*.*.*-dev']
+    branches: ['*']
   pull_request:
-    branches: ['main']
+    branches: ['*']
 
 jobs:
   run-pg-13-migration-tests:
diff --git a/.github/workflows/pg-17-migtests.yml b/.github/workflows/pg-17-migtests.yml
index 27aacc0c68..8016d72261 100644
--- a/.github/workflows/pg-17-migtests.yml
+++ b/.github/workflows/pg-17-migtests.yml
@@ -2,9 +2,9 @@ name: "PG 17: Migration Tests"
 
 on:
   push:
-    branches: ['main', '*.*-dev', '*.*.*-dev']
+    branches: ['*']
   pull_request:
-    branches: ['main']
+    branches: ['*']
 
 jobs:
   run-pg-17-migration-tests:
diff --git a/.github/workflows/pg-9-migtests.yml b/.github/workflows/pg-9-migtests.yml
index 8fa3251d1b..7cca6ba363 100644
--- a/.github/workflows/pg-9-migtests.yml
+++ b/.github/workflows/pg-9-migtests.yml
@@ -2,9 +2,9 @@ name: "PG 9: Migration Tests"
 
 on:
   push:
-    branches: ['main', '*.*-dev', '*.*.*-dev']
+    branches: ['*']
   pull_request:
-    branches: ['main']
+    branches: ['*']
 
 jobs:
   run-pg-9-migration-tests:
diff --git a/yb-voyager/cmd/importData.go b/yb-voyager/cmd/importData.go
index 45050cf615..e15d650ebc 100644
--- a/yb-voyager/cmd/importData.go
+++ b/yb-voyager/cmd/importData.go
@@ -17,7 +17,6 @@ package cmd
 
 import (
 	"fmt"
-	"io"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -592,36 +591,48 @@ func importData(importFileTasks []*ImportFileTask) {
 			batchImportPool = pool.New().WithMaxGoroutines(poolSize)
 			log.Infof("created batch import pool of size: %d", poolSize)
-			totalProgressAmount := getTotalProgressAmount(task)
-			progressReporter.ImportFileStarted(task, totalProgressAmount)
-			importedProgressAmount := getImportedProgressAmount(task, state)
-			progressReporter.AddProgressAmount(task, importedProgressAmount)
-
-			var currentProgress int64
-			updateProgressFn := func(progressAmount int64) {
-				currentProgress += progressAmount
-				progressReporter.AddProgressAmount(task, progressAmount)
-
-				if importerRole == TARGET_DB_IMPORTER_ROLE && totalProgressAmount > currentProgress {
-					importDataTableMetrics := createImportDataTableMetrics(task.TableNameTup.ForKey(),
-						currentProgress, totalProgressAmount, ROW_UPDATE_STATUS_IN_PROGRESS)
-					// The metrics are sent after evry 5 secs in implementation of UpdateImportedRowCount
-					controlPlane.UpdateImportedRowCount(
-						[]*cp.UpdateImportedRowCountEvent{&importDataTableMetrics})
-				}
+			taskImporter, err := NewFileTaskImporter(task, state, batchImportPool, progressReporter)
+			if err != nil {
+				utils.ErrExit("Failed to create file task importer: %s", err)
 			}
-			importFile(state, task, updateProgressFn)
-			batchImportPool.Wait() // Wait for the file import to finish.
-
-			if importerRole == TARGET_DB_IMPORTER_ROLE {
-				importDataTableMetrics := createImportDataTableMetrics(task.TableNameTup.ForKey(),
-					currentProgress, totalProgressAmount, ROW_UPDATE_STATUS_COMPLETED)
-				controlPlane.UpdateImportedRowCount(
-					[]*cp.UpdateImportedRowCountEvent{&importDataTableMetrics})
+			// totalProgressAmount := getTotalProgressAmount(task)
+			// progressReporter.ImportFileStarted(task, totalProgressAmount)
+			// importedProgressAmount := getImportedProgressAmount(task, state)
+			// progressReporter.AddProgressAmount(task, importedProgressAmount)
+
+			// var currentProgress int64
+			// updateProgressFn := func(progressAmount int64) {
+			// 	currentProgress += progressAmount
+			// 	progressReporter.AddProgressAmount(task, progressAmount)
+
+			// 	if importerRole == TARGET_DB_IMPORTER_ROLE && totalProgressAmount > currentProgress {
+			// 		importDataTableMetrics := createImportDataTableMetrics(task.TableNameTup.ForKey(),
+			// 			currentProgress, totalProgressAmount, ROW_UPDATE_STATUS_IN_PROGRESS)
+			// 		// The metrics are sent after evry 5 secs in implementation of UpdateImportedRowCount
+			// 		controlPlane.UpdateImportedRowCount(
+			// 			[]*cp.UpdateImportedRowCountEvent{&importDataTableMetrics})
+			// 	}
+			// }
+
+			// importFile(state, task, updateProgressFn)
+			for !taskImporter.AllBatchesSubmitted() {
+				err := taskImporter.SubmitNextBatch()
+				if err != nil {
+					utils.ErrExit("Failed to submit next batch: task:%v err: %s", task, err)
+				}
 			}
-			progressReporter.FileImportDone(task) // Remove the progress-bar for the file.\
+			batchImportPool.Wait() // Wait for the file import to finish.
+			taskImporter.PostProcess()
+			// if importerRole == TARGET_DB_IMPORTER_ROLE {
+			// 	importDataTableMetrics := createImportDataTableMetrics(task.TableNameTup.ForKey(),
+			// 		currentProgress, totalProgressAmount, ROW_UPDATE_STATUS_COMPLETED)
+			// 	controlPlane.UpdateImportedRowCount(
+			// 		[]*cp.UpdateImportedRowCountEvent{&importDataTableMetrics})
+			// }
+
+			// progressReporter.FileImportDone(task) // Remove the progress-bar for the file.\
 		}
 		time.Sleep(time.Second * 2)
 	}
@@ -856,30 +867,6 @@ func getIdentityColumnsForTables(tables []sqlname.NameTuple, identityType string
 	return result
 }
 
-func getTotalProgressAmount(task *ImportFileTask) int64 {
-	if reportProgressInBytes {
-		return task.FileSize
-	} else {
-		return task.RowCount
-	}
-}
-
-func getImportedProgressAmount(task *ImportFileTask, state *ImportDataState) int64 {
-	if reportProgressInBytes {
-		byteCount, err := state.GetImportedByteCount(task.FilePath, task.TableNameTup)
-		if err != nil {
-			utils.ErrExit("Failed to get imported byte count for table: %s: %s", task.TableNameTup, err)
-		}
-		return byteCount
-	} else {
-		rowCount, err := state.GetImportedRowCount(task.FilePath, task.TableNameTup)
-		if err != nil {
-			utils.ErrExit("Failed to get imported row count for table: %s: %s", task.TableNameTup, err)
-		}
-		return rowCount
-	}
-}
-
 func importFileTasksToTableNames(tasks []*ImportFileTask) []string {
 	tableNames := []string{}
 	for _, t := range tasks {
@@ -975,177 +962,30 @@ func cleanImportState(state *ImportDataState, tasks []*ImportFileTask) {
 	}
 }
 
-func getImportBatchArgsProto(tableNameTup sqlname.NameTuple, filePath string) *tgtdb.ImportBatchArgs {
-	columns, _ := TableToColumnNames.Get(tableNameTup)
-	columns, err := tdb.QuoteAttributeNames(tableNameTup, columns)
-	if err != nil {
-		utils.ErrExit("if required quote column names: %s", err)
-	}
-	// If `columns` is unset at this point, no attribute list is passed in the COPY command.
- fileFormat := dataFileDescriptor.FileFormat - if fileFormat == datafile.SQL { - fileFormat = datafile.TEXT - } - importBatchArgsProto := &tgtdb.ImportBatchArgs{ - TableNameTup: tableNameTup, - Columns: columns, - FileFormat: fileFormat, - Delimiter: dataFileDescriptor.Delimiter, - HasHeader: dataFileDescriptor.HasHeader && fileFormat == datafile.CSV, - QuoteChar: dataFileDescriptor.QuoteChar, - EscapeChar: dataFileDescriptor.EscapeChar, - NullString: dataFileDescriptor.NullString, - } - log.Infof("ImportBatchArgs: %v", spew.Sdump(importBatchArgsProto)) - return importBatchArgsProto -} - -func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn func(int64)) { - - origDataFile := task.FilePath - importBatchArgsProto := getImportBatchArgsProto(task.TableNameTup, task.FilePath) - log.Infof("Start splitting table %q: data-file: %q", task.TableNameTup, origDataFile) - - err := state.PrepareForFileImport(task.FilePath, task.TableNameTup) - if err != nil { - utils.ErrExit("preparing for file import: %s", err) - } - log.Infof("Collect all interrupted/remaining splits.") - pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup) - if err != nil { - utils.ErrExit("recovering state for table: %q: %s", task.TableNameTup, err) - } - for _, batch := range pendingBatches { - submitBatch(batch, updateProgressFn, importBatchArgsProto) - } - if !fileFullySplit { - splitFilesForTable(state, origDataFile, task.TableNameTup, lastBatchNumber, lastOffset, updateProgressFn, importBatchArgsProto) - } -} - -func splitFilesForTable(state *ImportDataState, filePath string, t sqlname.NameTuple, - lastBatchNumber int64, lastOffset int64, updateProgressFn func(int64), importBatchArgsProto *tgtdb.ImportBatchArgs) { - log.Infof("Split data file %q: tableName=%q, largestSplit=%v, largestOffset=%v", filePath, t, lastBatchNumber, lastOffset) - batchNum := lastBatchNumber + 1 - numLinesTaken := lastOffset - - reader, err := dataStore.Open(filePath) - if err != nil { - utils.ErrExit("preparing reader for split generation on file: %q: %v", filePath, err) - } - - dataFile, err := datafile.NewDataFile(filePath, reader, dataFileDescriptor) - if err != nil { - utils.ErrExit("open datafile: %q: %v", filePath, err) - } - defer dataFile.Close() - - log.Infof("Skipping %d lines from %q", lastOffset, filePath) - err = dataFile.SkipLines(lastOffset) - if err != nil { - utils.ErrExit("skipping line for offset=%d: %v", lastOffset, err) - } - - var readLineErr error = nil - var line string - var currentBytesRead int64 - var batchWriter *BatchWriter - header := "" - if dataFileDescriptor.HasHeader { - header = dataFile.GetHeader() - } - - // Helper function to initialize a new batchWriter - initBatchWriter := func() { - batchWriter = state.NewBatchWriter(filePath, t, batchNum) - err := batchWriter.Init() - if err != nil { - utils.ErrExit("initializing batch writer for table: %q: %s", t, err) - } - // Write the header if necessary - if header != "" && dataFileDescriptor.FileFormat == datafile.CSV { - err = batchWriter.WriteHeader(header) - if err != nil { - utils.ErrExit("writing header for table: %q: %s", t, err) - } - } - } - - // Function to finalize and submit the current batch - finalizeBatch := func(isLastBatch bool, offsetEnd int64, bytesInBatch int64) { - batch, err := batchWriter.Done(isLastBatch, offsetEnd, bytesInBatch) - if err != nil { - utils.ErrExit("finalizing batch %d: %s", batchNum, err) - } - batchWriter = nil - submitBatch(batch, 
updateProgressFn, importBatchArgsProto) - - // Increment batchNum only if this is not the last batch - if !isLastBatch { - batchNum++ - } - } - - for readLineErr == nil { - - if batchWriter == nil { - initBatchWriter() // Create a new batchWriter - } - - line, currentBytesRead, readLineErr = dataFile.NextLine() - if readLineErr == nil || (readLineErr == io.EOF && line != "") { - // handling possible case: last dataline(i.e. EOF) but no newline char at the end - numLinesTaken += 1 - } - log.Debugf("Batch %d: totalBytesRead %d, currentBytes %d \n", batchNum, dataFile.GetBytesRead(), currentBytesRead) - if currentBytesRead > tdb.MaxBatchSizeInBytes() { - //If a row is itself larger than MaxBatchSizeInBytes erroring out - ybSpecificMsg := "" - if tconf.TargetDBType == YUGABYTEDB { - ybSpecificMsg = ", but should be strictly lower than the the rpc_max_message_size on YugabyteDB (default 267386880 bytes)" - } - utils.ErrExit("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, numLinesTaken, t.ForOutput(), filePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg) - } - if line != "" { - // can't use importBatchArgsProto.Columns as to use case insenstiive column names - columnNames, _ := TableToColumnNames.Get(t) - line, err = valueConverter.ConvertRow(t, columnNames, line) - if err != nil { - utils.ErrExit("transforming line number=%d for table: %q in file %s: %s", numLinesTaken, t.ForOutput(), filePath, err) - } - - // Check if adding this record exceeds the max batch size - if batchWriter.NumRecordsWritten == batchSizeInNumRows || - dataFile.GetBytesRead() > tdb.MaxBatchSizeInBytes() { // GetBytesRead - returns the total bytes read until now including the currentBytesRead +// func importFile(state *ImportDataState, task *ImportFileTask, updateProgressFn func(int64)) { - // Finalize the current batch without adding the record - finalizeBatch(false, numLinesTaken-1, dataFile.GetBytesRead()-currentBytesRead) - - //carry forward the bytes to next batch - dataFile.ResetBytesRead(currentBytesRead) - - // Start a new batch by calling the initBatchWriter function - initBatchWriter() - } +// origDataFile := task.FilePath +// importBatchArgsProto := getImportBatchArgsProto(task.TableNameTup, task.FilePath) +// log.Infof("Start splitting table %q: data-file: %q", task.TableNameTup, origDataFile) - // Write the record to the new or current batch - err = batchWriter.WriteRecord(line) - if err != nil { - utils.ErrExit("Write to batch %d: %s", batchNum, err) - } - } +// err := state.PrepareForFileImport(task.FilePath, task.TableNameTup) +// if err != nil { +// utils.ErrExit("preparing for file import: %s", err) +// } - // Finalize the batch if it's the last line or the end of the file and reset the bytes read to 0 - if readLineErr == io.EOF { - finalizeBatch(true, numLinesTaken, dataFile.GetBytesRead()) - dataFile.ResetBytesRead(0) - } else if readLineErr != nil { - utils.ErrExit("read line from data file: %q: %s", filePath, readLineErr) - } - } +// fileBatchProducer, err := NewFileBatchProducer(task, state) +// if err != nil { +// utils.ErrExit("creating file batch producer: %s", err) +// } - log.Infof("splitFilesForTable: done splitting data file %q for table %q", filePath, t) -} +// for !fileBatchProducer.Done() { +// batch, err := fileBatchProducer.NextBatch() +// if err != nil { +// utils.ErrExit("getting next batch: %s", err) +// } +// 
submitBatch(batch, updateProgressFn, importBatchArgsProto) +// } +// } func executePostSnapshotImportSqls() error { sequenceFilePath := filepath.Join(exportDir, "data", "postdata.sql") @@ -1159,58 +999,20 @@ func executePostSnapshotImportSqls() error { return nil } -func submitBatch(batch *Batch, updateProgressFn func(int64), importBatchArgsProto *tgtdb.ImportBatchArgs) { - batchImportPool.Go(func() { - // There are `poolSize` number of competing go-routines trying to invoke COPY. - // But the `connPool` will allow only `parallelism` number of connections to be - // used at a time. Thus limiting the number of concurrent COPYs to `parallelism`. - importBatch(batch, importBatchArgsProto) - if reportProgressInBytes { - updateProgressFn(batch.ByteCount) - } else { - updateProgressFn(batch.RecordCount) - } - }) - log.Infof("Queued batch: %s", spew.Sdump(batch)) -} - -func importBatch(batch *Batch, importBatchArgsProto *tgtdb.ImportBatchArgs) { - err := batch.MarkPending() - if err != nil { - utils.ErrExit("marking batch as pending: %d: %s", batch.Number, err) - } - log.Infof("Importing %q", batch.FilePath) - - importBatchArgs := *importBatchArgsProto - importBatchArgs.FilePath = batch.FilePath - importBatchArgs.RowsPerTransaction = batch.OffsetEnd - batch.OffsetStart - - var rowsAffected int64 - sleepIntervalSec := 0 - for attempt := 0; attempt < COPY_MAX_RETRY_COUNT; attempt++ { - tableSchema, _ := TableNameToSchema.Get(batch.TableNameTup) - rowsAffected, err = tdb.ImportBatch(batch, &importBatchArgs, exportDir, tableSchema) - if err == nil || tdb.IsNonRetryableCopyError(err) { - break - } - log.Warnf("COPY FROM file %q: %s", batch.FilePath, err) - sleepIntervalSec += 10 - if sleepIntervalSec > MAX_SLEEP_SECOND { - sleepIntervalSec = MAX_SLEEP_SECOND - } - log.Infof("sleep for %d seconds before retrying the file %s (attempt %d)", - sleepIntervalSec, batch.FilePath, attempt) - time.Sleep(time.Duration(sleepIntervalSec) * time.Second) - } - log.Infof("%q => %d rows affected", batch.FilePath, rowsAffected) - if err != nil { - utils.ErrExit("import batch: %q into %s: %s", batch.FilePath, batch.TableNameTup, err) - } - err = batch.MarkDone() - if err != nil { - utils.ErrExit("marking batch as done: %q: %s", batch.FilePath, err) - } -} +// func submitBatch(batch *Batch, updateProgressFn func(int64), importBatchArgsProto *tgtdb.ImportBatchArgs) { +// batchImportPool.Go(func() { +// // There are `poolSize` number of competing go-routines trying to invoke COPY. +// // But the `connPool` will allow only `parallelism` number of connections to be +// // used at a time. Thus limiting the number of concurrent COPYs to `parallelism`. 
+// importBatch(batch, importBatchArgsProto) +// if reportProgressInBytes { +// updateProgressFn(batch.ByteCount) +// } else { +// updateProgressFn(batch.RecordCount) +// } +// }) +// log.Infof("Queued batch: %s", spew.Sdump(batch)) +// } func getIndexName(sqlQuery string, indexName string) (string, error) { // Return the index name itself if it is aleady qualified with schema name @@ -1360,29 +1162,3 @@ func createInitialImportDataTableMetrics(tasks []*ImportFileTask) []*cp.UpdateIm return result } - -func createImportDataTableMetrics(tableName string, countLiveRows int64, countTotalRows int64, - status int) cp.UpdateImportedRowCountEvent { - - var schemaName, tableName2 string - if strings.Count(tableName, ".") == 1 { - schemaName, tableName2 = cp.SplitTableNameForPG(tableName) - } else { - schemaName, tableName2 = tconf.Schema, tableName - } - result := cp.UpdateImportedRowCountEvent{ - BaseUpdateRowCountEvent: cp.BaseUpdateRowCountEvent{ - BaseEvent: cp.BaseEvent{ - EventType: "IMPORT DATA", - MigrationUUID: migrationUUID, - SchemaNames: []string{schemaName}, - }, - TableName: tableName2, - Status: cp.EXPORT_OR_IMPORT_DATA_STATUS_INT_TO_STR[status], - TotalRowCount: countTotalRows, - CompletedRowCount: countLiveRows, - }, - } - - return result -} diff --git a/yb-voyager/cmd/importDataFileBatchProducer.go b/yb-voyager/cmd/importDataFileBatchProducer.go new file mode 100644 index 0000000000..4bccbd53f2 --- /dev/null +++ b/yb-voyager/cmd/importDataFileBatchProducer.go @@ -0,0 +1,233 @@ +/* +Copyright (c) YugabyteDB, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. 
+*/ +package cmd + +import ( + "fmt" + "io" + + log "github.com/sirupsen/logrus" + "github.com/yugabyte/yb-voyager/yb-voyager/src/datafile" + "github.com/yugabyte/yb-voyager/yb-voyager/src/utils" +) + +type FileBatchProducer struct { + task *ImportFileTask + state *ImportDataState + + pendingBatches []*Batch + lastBatchNumber int64 + lastOffset int64 + fileFullySplit bool + completed bool + + dataFile datafile.DataFile + header string + numLinesTaken int64 + lineFromPreviousBatch string +} + +func NewFileBatchProducer(task *ImportFileTask, state *ImportDataState) (*FileBatchProducer, error) { + err := state.PrepareForFileImport(task.FilePath, task.TableNameTup) + if err != nil { + return nil, fmt.Errorf("preparing for file import: %s", err) + } + pendingBatches, lastBatchNumber, lastOffset, fileFullySplit, err := state.Recover(task.FilePath, task.TableNameTup) + if err != nil { + return nil, fmt.Errorf("recovering state for table: %q: %s", task.TableNameTup, err) + } + var completed bool + if len(pendingBatches) == 0 && fileFullySplit { + completed = true + } + + return &FileBatchProducer{ + task: task, + state: state, + pendingBatches: pendingBatches, + lastBatchNumber: lastBatchNumber, + lastOffset: lastOffset, + fileFullySplit: fileFullySplit, + completed: completed, + numLinesTaken: lastOffset, + }, nil +} + +func (p *FileBatchProducer) Done() bool { + return p.completed +} + +func (p *FileBatchProducer) NextBatch() (*Batch, error) { + if p.Done() { + return nil, fmt.Errorf("already completed producing all batches") + } + if len(p.pendingBatches) > 0 { + batch := p.pendingBatches[0] + p.pendingBatches = p.pendingBatches[1:] + // file is fully split and returning the last batch, so mark the producer as completed + if len(p.pendingBatches) == 0 && p.fileFullySplit { + p.completed = true + } + return batch, nil + } + + return p.produceNextBatch() +} + +func (p *FileBatchProducer) produceNextBatch() (*Batch, error) { + if p.dataFile == nil { + err := p.openDataFile() + if err != nil { + return nil, err + } + } + + var readLineErr error + var line string + var currentBytesRead int64 + batchNum := p.lastBatchNumber + 1 + + batchWriter, err := p.newBatchWriter() + if err != nil { + return nil, err + } + if p.lineFromPreviousBatch != "" { + err = batchWriter.WriteRecord(p.lineFromPreviousBatch) + if err != nil { + return nil, fmt.Errorf("Write to batch %d: %s", batchNum, err) + } + p.lineFromPreviousBatch = "" + } + + for readLineErr == nil { + + line, currentBytesRead, readLineErr = p.dataFile.NextLine() + if readLineErr == nil || (readLineErr == io.EOF && line != "") { + // handling possible case: last dataline(i.e. EOF) but no newline char at the end + p.numLinesTaken += 1 + } + log.Debugf("Batch %d: totalBytesRead %d, currentBytes %d \n", batchNum, p.dataFile.GetBytesRead(), currentBytesRead) + if currentBytesRead > tdb.MaxBatchSizeInBytes() { + //If a row is itself larger than MaxBatchSizeInBytes erroring out + ybSpecificMsg := "" + if tconf.TargetDBType == YUGABYTEDB { + ybSpecificMsg = ", but should be strictly lower than the the rpc_max_message_size on YugabyteDB (default 267386880 bytes)" + } + return nil, fmt.Errorf("record of size %d larger than max batch size: record num=%d for table %q in file %s is larger than the max batch size %d bytes. 
Max Batch size can be changed using env var MAX_BATCH_SIZE_BYTES%s", currentBytesRead, p.numLinesTaken, p.task.TableNameTup.ForOutput(), p.task.FilePath, tdb.MaxBatchSizeInBytes(), ybSpecificMsg) + } + if line != "" { + // can't use importBatchArgsProto.Columns as to use case insenstiive column names + columnNames, _ := TableToColumnNames.Get(p.task.TableNameTup) + line, err = valueConverter.ConvertRow(p.task.TableNameTup, columnNames, line) + if err != nil { + return nil, fmt.Errorf("transforming line number=%d for table: %q in file %s: %s", p.numLinesTaken, p.task.TableNameTup.ForOutput(), p.task.FilePath, err) + } + + // Check if adding this record exceeds the max batch size + if batchWriter.NumRecordsWritten == batchSizeInNumRows || + p.dataFile.GetBytesRead() > tdb.MaxBatchSizeInBytes() { // GetBytesRead - returns the total bytes read until now including the currentBytesRead + + // Finalize the current batch without adding the record + batch, err := p.finalizeBatch(batchWriter, false, p.numLinesTaken-1, p.dataFile.GetBytesRead()-currentBytesRead) + if err != nil { + return nil, err + } + + //carry forward the bytes to next batch + p.dataFile.ResetBytesRead(currentBytesRead) + p.lineFromPreviousBatch = line + + // Start a new batch by calling the initBatchWriter function + // initBatchWriter() + return batch, nil + } + + // Write the record to the new or current batch + err = batchWriter.WriteRecord(line) + if err != nil { + return nil, fmt.Errorf("Write to batch %d: %s", batchNum, err) + } + } + + // Finalize the batch if it's the last line or the end of the file and reset the bytes read to 0 + if readLineErr == io.EOF { + batch, err := p.finalizeBatch(batchWriter, true, p.numLinesTaken, p.dataFile.GetBytesRead()) + if err != nil { + return nil, err + } + + p.completed = true + p.dataFile.ResetBytesRead(0) + return batch, nil + } else if readLineErr != nil { + return nil, fmt.Errorf("read line from data file: %q: %s", p.task.FilePath, readLineErr) + } + } + // ideally should not reach here + return nil, fmt.Errorf("could not produce next batch: err: %w", readLineErr) +} + +func (p *FileBatchProducer) openDataFile() error { + reader, err := dataStore.Open(p.task.FilePath) + if err != nil { + return fmt.Errorf("preparing reader for split generation on file: %q: %v", p.task.FilePath, err) + } + + dataFile, err := datafile.NewDataFile(p.task.FilePath, reader, dataFileDescriptor) + + if err != nil { + return fmt.Errorf("open datafile: %q: %v", p.task.FilePath, err) + } + p.dataFile = dataFile + + log.Infof("Skipping %d lines from %q", p.lastOffset, p.task.FilePath) + err = dataFile.SkipLines(p.lastOffset) + if err != nil { + return fmt.Errorf("skipping line for offset=%d: %v", p.lastOffset, err) + } + if dataFileDescriptor.HasHeader { + p.header = dataFile.GetHeader() + } + return nil +} + +func (p *FileBatchProducer) newBatchWriter() (*BatchWriter, error) { + batchNum := p.lastBatchNumber + 1 + batchWriter := p.state.NewBatchWriter(p.task.FilePath, p.task.TableNameTup, batchNum) + err := batchWriter.Init() + if err != nil { + return nil, fmt.Errorf("initializing batch writer for table: %q: %s", p.task.TableNameTup, err) + } + // Write the header if necessary + if p.header != "" && dataFileDescriptor.FileFormat == datafile.CSV { + err = batchWriter.WriteHeader(p.header) + if err != nil { + utils.ErrExit("writing header for table: %q: %s", p.task.TableNameTup, err) + } + } + return batchWriter, nil +} + +func (p *FileBatchProducer) finalizeBatch(batchWriter *BatchWriter, isLastBatch bool, 
offsetEnd int64, bytesInBatch int64) (*Batch, error) { + batchNum := p.lastBatchNumber + 1 + batch, err := batchWriter.Done(isLastBatch, offsetEnd, bytesInBatch) + if err != nil { + utils.ErrExit("finalizing batch %d: %s", batchNum, err) + } + batchWriter = nil + p.lastBatchNumber = batchNum + return batch, nil +} diff --git a/yb-voyager/cmd/importDataFileBatchProducer_test.go b/yb-voyager/cmd/importDataFileBatchProducer_test.go new file mode 100644 index 0000000000..852dccee60 --- /dev/null +++ b/yb-voyager/cmd/importDataFileBatchProducer_test.go @@ -0,0 +1,333 @@ +//go:build unit + +/* +Copyright (c) YugabyteDB, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package cmd + +import ( + "fmt" + "os" + "testing" + + "github.com/google/go-cmp/cmp" + "github.com/stretchr/testify/assert" + "github.com/yugabyte/yb-voyager/yb-voyager/src/constants" + "github.com/yugabyte/yb-voyager/yb-voyager/src/datafile" + "github.com/yugabyte/yb-voyager/yb-voyager/src/datastore" + "github.com/yugabyte/yb-voyager/yb-voyager/src/dbzm" + "github.com/yugabyte/yb-voyager/yb-voyager/src/tgtdb" + "github.com/yugabyte/yb-voyager/yb-voyager/src/utils/sqlname" +) + +type dummyTDB struct { + maxSizeBytes int64 + tgtdb.TargetYugabyteDB +} + +func (d *dummyTDB) MaxBatchSizeInBytes() int64 { + return d.maxSizeBytes +} + +func createTempFile(dir string, fileContents string) (string, error) { + // Create a temporary file + file, err := os.CreateTemp(dir, "temp-*.txt") + if err != nil { + return "", err + } + defer file.Close() + + // Write some text to the file + _, err = file.WriteString(fileContents) + if err != nil { + return "", err + } + + return file.Name(), nil +} + +func setupDependenciesForTest(batchSizeRows int64, batchSizeBytes int64) (string, string, *ImportDataState, error) { + lexportDir, err := os.MkdirTemp("/tmp", "export-dir-*") + if err != nil { + return "", "", nil, err + } + + ldataDir, err := os.MkdirTemp("/tmp", "data-dir-*") + if err != nil { + return "", "", nil, err + } + + CreateMigrationProjectIfNotExists(constants.POSTGRESQL, lexportDir) + tdb = &dummyTDB{maxSizeBytes: batchSizeBytes} + valueConverter = &dbzm.NoOpValueConverter{} + dataStore = datastore.NewDataStore(ldataDir) + + batchSizeInNumRows = batchSizeRows + + state := NewImportDataState(lexportDir) + return ldataDir, lexportDir, state, nil +} + +func setupFileForTest(lexportDir string, fileContents string, dir string, tableName string) (string, *ImportFileTask, error) { + dataFileDescriptor = &datafile.Descriptor{ + FileFormat: "csv", + Delimiter: ",", + HasHeader: true, + ExportDir: lexportDir, + QuoteChar: '"', + EscapeChar: '\\', + NullString: "NULL", + } + tempFile, err := createTempFile(dir, fileContents) + if err != nil { + return "", nil, err + } + + sourceName := sqlname.NewObjectName(constants.POSTGRESQL, "public", "public", tableName) + tableNameTup := sqlname.NameTuple{SourceName: sourceName, CurrentName: sourceName} + task := &ImportFileTask{ + ID: 1, + FilePath: tempFile, + TableNameTup: tableNameTup, + RowCount: 1, + } + return tempFile, 
task, nil +} + +func TestBasicFileBatchProducer(t *testing.T) { + ldataDir, lexportDir, state, err := setupDependenciesForTest(2, 1024) + assert.NoError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + fileContents := `id,val +1, "hello"` + _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + assert.NoError(t, err) + + batchproducer, err := NewFileBatchProducer(task, state) + assert.NoError(t, err) + + assert.False(t, batchproducer.Done()) + + batch, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch) + assert.Equal(t, int64(1), batch.RecordCount) + assert.True(t, batchproducer.Done()) +} + +func TestFileBatchProducerBasedOnRowsThreshold(t *testing.T) { + // max batch size in rows is 2 + ldataDir, lexportDir, state, err := setupDependenciesForTest(2, 1024) + assert.NoError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + fileContents := `id,val +1, "hello" +2, "world" +3, "foo" +4, "bar"` + _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + assert.NoError(t, err) + + batchproducer, err := NewFileBatchProducer(task, state) + assert.NoError(t, err) + + assert.False(t, batchproducer.Done()) + + var batches []*Batch + for !batchproducer.Done() { + batch, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch) + batches = append(batches, batch) + } + + // 2 batches should be produced + assert.Equal(t, 2, len(batches)) + // each of length 2 + assert.Equal(t, int64(2), batches[0].RecordCount) + batchContents, err := os.ReadFile(batches[0].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n1, \"hello\"\n2, \"world\"", string(batchContents)) + + assert.Equal(t, int64(2), batches[1].RecordCount) + batchContents, err = os.ReadFile(batches[1].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n3, \"foo\"\n4, \"bar\"", string(batchContents)) +} + +func TestFileBatchProducerBasedOnSizeThreshold(t *testing.T) { + // max batch size in size is 25 bytes + maxBatchSizeBytes := int64(25) + ldataDir, lexportDir, state, err := setupDependenciesForTest(1000, maxBatchSizeBytes) + assert.NoError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + // each row exccept header is 10 bytes + fileContents := `id,val +1, "abcde" +2, "ghijk" +3, "mnopq" +4, "stuvw"` + _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + assert.NoError(t, err) + + batchproducer, err := NewFileBatchProducer(task, state) + assert.NoError(t, err) + + assert.False(t, batchproducer.Done()) + + var batches []*Batch + for !batchproducer.Done() { + batch, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch) + batches = append(batches, batch) + } + + // 3 batches should be produced + // while calculating for the first batch, the header is also considered + assert.Equal(t, 3, len(batches)) + // each of length 2 + assert.Equal(t, int64(1), batches[0].RecordCount) + assert.LessOrEqual(t, batches[0].ByteCount, maxBatchSizeBytes) + batchContents, err := os.ReadFile(batches[0].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n1, \"abcde\"", string(batchContents)) 
+ + assert.Equal(t, int64(2), batches[1].RecordCount) + assert.LessOrEqual(t, batches[1].ByteCount, maxBatchSizeBytes) + batchContents, err = os.ReadFile(batches[1].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n2, \"ghijk\"\n3, \"mnopq\"", string(batchContents)) + + assert.Equal(t, int64(1), batches[2].RecordCount) + assert.LessOrEqual(t, batches[2].ByteCount, maxBatchSizeBytes) + batchContents, err = os.ReadFile(batches[2].GetFilePath()) + assert.NoError(t, err) + assert.Equal(t, "id,val\n4, \"stuvw\"", string(batchContents)) +} + +func TestFileBatchProducerThrowsErrorWhenSingleRowGreaterThanMaxBatchSize(t *testing.T) { + // max batch size in size is 25 bytes + ldataDir, lexportDir, state, err := setupDependenciesForTest(1000, 25) + assert.NoError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + // 3rd row is greater than max batch size + fileContents := `id,val +1, "abcdef" +2, "ghijk" +3, "mnopq1234567899876543" +4, "stuvw"` + _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + assert.NoError(t, err) + + batchproducer, err := NewFileBatchProducer(task, state) + assert.NoError(t, err) + + assert.False(t, batchproducer.Done()) + + // 1st batch is fine. + batch, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch) + assert.Equal(t, int64(1), batch.RecordCount) + + // 2nd batch should throw error + _, err = batchproducer.NextBatch() + assert.ErrorContains(t, err, "larger than max batch size") +} + +func TestFileBatchProducerResumable(t *testing.T) { + // max batch size in rows is 2 + ldataDir, lexportDir, state, err := setupDependenciesForTest(2, 1024) + assert.NoError(t, err) + + if ldataDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", ldataDir)) + } + if lexportDir != "" { + defer os.RemoveAll(fmt.Sprintf("%s/", lexportDir)) + } + + fileContents := `id,val +1, "hello" +2, "world" +3, "foo" +4, "bar"` + _, task, err := setupFileForTest(lexportDir, fileContents, ldataDir, "test_table") + assert.NoError(t, err) + + batchproducer, err := NewFileBatchProducer(task, state) + assert.NoError(t, err) + assert.False(t, batchproducer.Done()) + + // generate one batch + batch1, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch1) + assert.Equal(t, int64(2), batch1.RecordCount) + + // simulate a crash and recover + batchproducer, err = NewFileBatchProducer(task, state) + assert.NoError(t, err) + assert.False(t, batchproducer.Done()) + + // state should have recovered that one batch + assert.Equal(t, 1, len(batchproducer.pendingBatches)) + assert.True(t, cmp.Equal(batch1, batchproducer.pendingBatches[0])) + + // verify that it picks up from pendingBatches + // instead of procing a new batch. 
+ batch1Recovered, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch1Recovered) + assert.True(t, cmp.Equal(batch1, batch1Recovered)) + assert.Equal(t, 0, len(batchproducer.pendingBatches)) + assert.False(t, batchproducer.Done()) + + // get final batch + batch2, err := batchproducer.NextBatch() + assert.NoError(t, err) + assert.NotNil(t, batch2) + assert.Equal(t, int64(2), batch2.RecordCount) + assert.True(t, batchproducer.Done()) +} diff --git a/yb-voyager/cmd/importDataFileTaskImporter.go b/yb-voyager/cmd/importDataFileTaskImporter.go new file mode 100644 index 0000000000..2b5834e3b8 --- /dev/null +++ b/yb-voyager/cmd/importDataFileTaskImporter.go @@ -0,0 +1,247 @@ +/* +Copyright (c) YugabyteDB, Inc. + +Licensed under the Apache License, Version 2.0 (the "License"); +you may not use this file except in compliance with the License. +You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + +Unless required by applicable law or agreed to in writing, software +distributed under the License is distributed on an "AS IS" BASIS, +WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +See the License for the specific language governing permissions and +limitations under the License. +*/ +package cmd + +import ( + "fmt" + "strings" + "time" + + "github.com/davecgh/go-spew/spew" + log "github.com/sirupsen/logrus" + "github.com/sourcegraph/conc/pool" + "github.com/yugabyte/yb-voyager/yb-voyager/src/cp" + "github.com/yugabyte/yb-voyager/yb-voyager/src/datafile" + "github.com/yugabyte/yb-voyager/yb-voyager/src/tgtdb" + "github.com/yugabyte/yb-voyager/yb-voyager/src/utils" + "github.com/yugabyte/yb-voyager/yb-voyager/src/utils/sqlname" +) + +type FileTaskImporter struct { + task *ImportFileTask + state *ImportDataState + batchProducer *FileBatchProducer + importBatchArgsProto *tgtdb.ImportBatchArgs + workerPool *pool.Pool + + totalProgressAmount int64 + currentProgressAmount int64 + progressReporter *ImportDataProgressReporter +} + +func NewFileTaskImporter(task *ImportFileTask, state *ImportDataState, workerPool *pool.Pool, + progressReporter *ImportDataProgressReporter) (*FileTaskImporter, error) { + batchProducer, err := NewFileBatchProducer(task, state) + if err != nil { + return nil, fmt.Errorf("creating file batch producer: %s", err) + } + totalProgressAmount := getTotalProgressAmount(task) + progressReporter.ImportFileStarted(task, totalProgressAmount) + progressReporter.AddProgressAmount(task, getImportedProgressAmount(task, state)) + + fti := &FileTaskImporter{ + task: task, + state: state, + batchProducer: batchProducer, + workerPool: workerPool, + importBatchArgsProto: getImportBatchArgsProto(task.TableNameTup, task.FilePath), + progressReporter: progressReporter, + totalProgressAmount: getTotalProgressAmount(task), + } + return fti, nil +} + +// as of now, batch production and batch submission +// is done together in `SubmitNextBatch` method. +// In other words, as soon as a batch is produced, it is submitted. +// Therefore, to check whether all batches are submitted, we can check +// if the batch producer is done. +func (fti *FileTaskImporter) AllBatchesSubmitted() bool { + return fti.batchProducer.Done() +} + +func (fti *FileTaskImporter) AllBatchesImported() error { + // TODO: check importDataState for status. 
+ panic("not implemented") +} + +func (fti *FileTaskImporter) SubmitNextBatch() error { + if fti.batchProducer.Done() { + return fmt.Errorf("no more batches to submit") + } + batch, err := fti.batchProducer.NextBatch() + if err != nil { + return fmt.Errorf("getting next batch: %w", err) + } + return fti.submitBatch(batch) +} + +func (fti *FileTaskImporter) submitBatch(batch *Batch) error { + fti.workerPool.Go(func() { + // There are `poolSize` number of competing go-routines trying to invoke COPY. + // But the `connPool` will allow only `parallelism` number of connections to be + // used at a time. Thus limiting the number of concurrent COPYs to `parallelism`. + importBatch(batch, fti.importBatchArgsProto) + if reportProgressInBytes { + fti.updateProgress(batch.ByteCount) + } else { + fti.updateProgress(batch.RecordCount) + } + }) + log.Infof("Queued batch: %s", spew.Sdump(batch)) + return nil +} + +func (fti *FileTaskImporter) updateProgress(progressAmount int64) { + fti.currentProgressAmount += progressAmount + fti.progressReporter.AddProgressAmount(fti.task, progressAmount) + + if importerRole == TARGET_DB_IMPORTER_ROLE && fti.totalProgressAmount > fti.currentProgressAmount { + importDataTableMetrics := createImportDataTableMetrics(fti.task.TableNameTup.ForKey(), + fti.currentProgressAmount, fti.totalProgressAmount, ROW_UPDATE_STATUS_IN_PROGRESS) + // The metrics are sent after evry 5 secs in implementation of UpdateImportedRowCount + controlPlane.UpdateImportedRowCount( + []*cp.UpdateImportedRowCountEvent{&importDataTableMetrics}) + } +} + +func (fti *FileTaskImporter) PostProcess() { + // TODO: close fileBatchProducer properly + + if importerRole == TARGET_DB_IMPORTER_ROLE { + importDataTableMetrics := createImportDataTableMetrics(fti.task.TableNameTup.ForKey(), + fti.currentProgressAmount, fti.totalProgressAmount, ROW_UPDATE_STATUS_COMPLETED) + controlPlane.UpdateImportedRowCount( + []*cp.UpdateImportedRowCountEvent{&importDataTableMetrics}) + } + + fti.progressReporter.FileImportDone(fti.task) // Remove the progress-bar for the file.\ +} + +// ============================================================================= // + +func getTotalProgressAmount(task *ImportFileTask) int64 { + if reportProgressInBytes { + return task.FileSize + } else { + return task.RowCount + } +} + +func getImportedProgressAmount(task *ImportFileTask, state *ImportDataState) int64 { + if reportProgressInBytes { + byteCount, err := state.GetImportedByteCount(task.FilePath, task.TableNameTup) + if err != nil { + utils.ErrExit("Failed to get imported byte count for table: %s: %s", task.TableNameTup, err) + } + return byteCount + } else { + rowCount, err := state.GetImportedRowCount(task.FilePath, task.TableNameTup) + if err != nil { + utils.ErrExit("Failed to get imported row count for table: %s: %s", task.TableNameTup, err) + } + return rowCount + } +} + +func createImportDataTableMetrics(tableName string, countLiveRows int64, countTotalRows int64, + status int) cp.UpdateImportedRowCountEvent { + + var schemaName, tableName2 string + if strings.Count(tableName, ".") == 1 { + schemaName, tableName2 = cp.SplitTableNameForPG(tableName) + } else { + schemaName, tableName2 = tconf.Schema, tableName + } + result := cp.UpdateImportedRowCountEvent{ + BaseUpdateRowCountEvent: cp.BaseUpdateRowCountEvent{ + BaseEvent: cp.BaseEvent{ + EventType: "IMPORT DATA", + MigrationUUID: migrationUUID, + SchemaNames: []string{schemaName}, + }, + TableName: tableName2, + Status: cp.EXPORT_OR_IMPORT_DATA_STATUS_INT_TO_STR[status], 
+ TotalRowCount: countTotalRows, + CompletedRowCount: countLiveRows, + }, + } + + return result +} + +func getImportBatchArgsProto(tableNameTup sqlname.NameTuple, filePath string) *tgtdb.ImportBatchArgs { + columns, _ := TableToColumnNames.Get(tableNameTup) + columns, err := tdb.QuoteAttributeNames(tableNameTup, columns) + if err != nil { + utils.ErrExit("if required quote column names: %s", err) + } + // If `columns` is unset at this point, no attribute list is passed in the COPY command. + fileFormat := dataFileDescriptor.FileFormat + if fileFormat == datafile.SQL { + fileFormat = datafile.TEXT + } + importBatchArgsProto := &tgtdb.ImportBatchArgs{ + TableNameTup: tableNameTup, + Columns: columns, + FileFormat: fileFormat, + Delimiter: dataFileDescriptor.Delimiter, + HasHeader: dataFileDescriptor.HasHeader && fileFormat == datafile.CSV, + QuoteChar: dataFileDescriptor.QuoteChar, + EscapeChar: dataFileDescriptor.EscapeChar, + NullString: dataFileDescriptor.NullString, + } + log.Infof("ImportBatchArgs: %v", spew.Sdump(importBatchArgsProto)) + return importBatchArgsProto +} + +func importBatch(batch *Batch, importBatchArgsProto *tgtdb.ImportBatchArgs) { + err := batch.MarkPending() + if err != nil { + utils.ErrExit("marking batch as pending: %d: %s", batch.Number, err) + } + log.Infof("Importing %q", batch.FilePath) + + importBatchArgs := *importBatchArgsProto + importBatchArgs.FilePath = batch.FilePath + importBatchArgs.RowsPerTransaction = batch.OffsetEnd - batch.OffsetStart + + var rowsAffected int64 + sleepIntervalSec := 0 + for attempt := 0; attempt < COPY_MAX_RETRY_COUNT; attempt++ { + tableSchema, _ := TableNameToSchema.Get(batch.TableNameTup) + rowsAffected, err = tdb.ImportBatch(batch, &importBatchArgs, exportDir, tableSchema) + if err == nil || tdb.IsNonRetryableCopyError(err) { + break + } + log.Warnf("COPY FROM file %q: %s", batch.FilePath, err) + sleepIntervalSec += 10 + if sleepIntervalSec > MAX_SLEEP_SECOND { + sleepIntervalSec = MAX_SLEEP_SECOND + } + log.Infof("sleep for %d seconds before retrying the file %s (attempt %d)", + sleepIntervalSec, batch.FilePath, attempt) + time.Sleep(time.Duration(sleepIntervalSec) * time.Second) + } + log.Infof("%q => %d rows affected", batch.FilePath, rowsAffected) + if err != nil { + utils.ErrExit("import batch: %q into %s: %s", batch.FilePath, batch.TableNameTup, err) + } + err = batch.MarkDone() + if err != nil { + utils.ErrExit("marking batch as done: %q: %s", batch.FilePath, err) + } +}
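
For reviewers, a condensed sketch of the control flow these new files introduce, pieced together from the hunks above (error handling and helper wiring shortened; only names added in this diff are used):

	// Inside importData(), per pending task:
	// FileTaskImporter owns progress reporting and batch submission for one task;
	// internally it drives a FileBatchProducer, which turns the task's data file
	// into resumable batches (replaying pending batches recovered from ImportDataState).
	taskImporter, err := NewFileTaskImporter(task, state, batchImportPool, progressReporter)
	if err != nil {
		utils.ErrExit("Failed to create file task importer: %s", err)
	}
	for !taskImporter.AllBatchesSubmitted() {
		// Produces the next batch and queues it on the shared worker pool for COPY.
		if err := taskImporter.SubmitNextBatch(); err != nil {
			utils.ErrExit("Failed to submit next batch: task:%v err: %s", task, err)
		}
	}
	batchImportPool.Wait()     // wait for all queued batches to finish importing
	taskImporter.PostProcess() // emit the final row-count event and remove the progress bar

The producer can also be driven on its own, as the new unit tests do: NewFileBatchProducer(task, state), then NextBatch() until Done().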