diff --git a/yb-voyager/cmd/eventQueue.go b/yb-voyager/cmd/eventQueue.go index 05f4cfbf7..11225f894 100644 --- a/yb-voyager/cmd/eventQueue.go +++ b/yb-voyager/cmd/eventQueue.go @@ -54,26 +54,6 @@ func NewEventQueue(exportDir string) *EventQueue { } } -// GetNextSegment returns the next segment to process -func (eq *EventQueue) GetLastProcessedSegment() (*EventQueueSegment, error) { - var err error - segmentsExporterRole := "" - if importerRole == SOURCE_DB_IMPORTER_ROLE { - // in case of fall-back import, restrict to only segments exported from target db. - segmentsExporterRole = TARGET_DB_EXPORTER_FB_ROLE - - } - eq.SegmentNumToStream, err = metaDB.GetMaxSegmentExportedByAndImportedBy(importerRole, segmentsExporterRole) - segmentFileName := fmt.Sprintf("%s.%d.%s", QUEUE_SEGMENT_FILE_NAME, eq.SegmentNumToStream, QUEUE_SEGMENT_FILE_EXTENSION) - segmentFilePath := filepath.Join(eq.QueueDirPath, segmentFileName) - _, err = os.Stat(segmentFilePath) - if err != nil { - return nil, fmt.Errorf("failed to get next segment file path: %w", err) - } - segment := NewEventQueueSegment(segmentFilePath, eq.SegmentNumToStream) - return segment, nil -} - // GetNextSegment returns the next segment to process func (eq *EventQueue) GetNextSegment() (*EventQueueSegment, error) { var err error @@ -201,21 +181,3 @@ func (eqs *EventQueueSegment) IsProcessed() bool { func (eqs *EventQueueSegment) MarkProcessed() { eqs.processed = true } - -func (eqs *EventQueueSegment) getLastEvent() (*tgtdb.Event, error) { - var lastEvent *tgtdb.Event - for { - event, err := eqs.NextEvent() - if err != nil { - return nil, fmt.Errorf("error getting next event of last processed segment: %v", err) - } - if event == nil { - break - } - - lastEvent = event - } - - return lastEvent, nil - -} diff --git a/yb-voyager/cmd/live_migration.go b/yb-voyager/cmd/live_migration.go index 1b806f108..17827ef58 100644 --- a/yb-voyager/cmd/live_migration.go +++ b/yb-voyager/cmd/live_migration.go @@ -56,46 +56,20 @@ func init() { } func cutoverInitiatedAndCutoverEventProcessed() (bool, error) { - cutoverInitiated, err := cutoverInitiatedAlready(importerRole) + msr, err := metaDB.GetMigrationStatusRecord() if err != nil { - return false, err + return false, fmt.Errorf("getting migration status record: %v", err) } - if !cutoverInitiated { - return false, nil - } - eventQueue = NewEventQueue(exportDir) - - //Get the last Segement processed - lastSeg - // check the last event of the lastSeg file - // if its a cutover event then return true - lastProcessedSegment, err := eventQueue.GetLastProcessedSegment() - if err != nil { - return false, fmt.Errorf("error getting last segment proccessed: %v", err) - } - if lastProcessedSegment.SegmentNum == -1 { - //If there are no processed segment files - return false - return false, nil - } - - err = lastProcessedSegment.Open() - if err != nil { - return false, fmt.Errorf("error opening the last Processed segment: %v", err) - } - - lastEvent, err := lastProcessedSegment.getLastEvent() - if err != nil { - return false, fmt.Errorf("error getting the lastEvent of segment: %v", err) - } - - if lastEvent.IsCutoverToTarget() && importerRole == TARGET_DB_IMPORTER_ROLE || - lastEvent.IsCutoverToSourceReplica() && importerRole == SOURCE_REPLICA_DB_IMPORTER_ROLE || - lastEvent.IsCutoverToSource() && importerRole == SOURCE_DB_IMPORTER_ROLE { // cutover or fall-forward command - //If the last event is cutover one then return true meaning no need to continue stream changes - return true, nil + switch importerRole { + case TARGET_DB_IMPORTER_ROLE: + return msr.CutoverToTargetRequested && msr.CutoverDetectedByTargetImporter, nil + case SOURCE_REPLICA_DB_IMPORTER_ROLE: + return msr.CutoverToSourceReplicaRequested && msr.CutoverDetectedBySourceReplicaImporter, nil + case SOURCE_DB_IMPORTER_ROLE: + return msr.CutoverToSourceRequested && msr.CutoverDetectedBySourceImporter, nil } return false, nil - } func streamChanges(state *ImportDataState, tableNames []sqlname.NameTuple) error { @@ -228,6 +202,19 @@ func streamChangesFromSegment( event.IsCutoverToSourceReplica() && importerRole == SOURCE_REPLICA_DB_IMPORTER_ROLE || event.IsCutoverToSource() && importerRole == SOURCE_DB_IMPORTER_ROLE { // cutover or fall-forward command + err := metaDB.UpdateMigrationStatusRecord(func(record *metadb.MigrationStatusRecord) { + switch importerRole { + case TARGET_DB_IMPORTER_ROLE: + record.CutoverDetectedByTargetImporter = true + case SOURCE_REPLICA_DB_IMPORTER_ROLE: + record.CutoverDetectedBySourceReplicaImporter = true + case SOURCE_DB_IMPORTER_ROLE: + record.CutoverDetectedBySourceImporter = true + } + }) + if err != nil { + return fmt.Errorf("error updating the migration status record for cutover detected case: %", err) + } updateCallhomeImportPhase(event) eventQueue.EndOfQueue = true diff --git a/yb-voyager/src/metadb/metadataDB.go b/yb-voyager/src/metadb/metadataDB.go index 89b713cf2..32784bf2b 100644 --- a/yb-voyager/src/metadb/metadataDB.go +++ b/yb-voyager/src/metadb/metadataDB.go @@ -392,27 +392,6 @@ func (m *MetaDB) GetMinSegmentExportedByAndNotImportedBy(importerRole string, ex return segmentNum.Int64, nil } -//returning the max segment number that is processed by importer -func (m *MetaDB) GetMaxSegmentExportedByAndImportedBy(importerRole string, exporterRole string) (int64, error) { - query := fmt.Sprintf(`SELECT MAX(segment_no) FROM %s WHERE`, QUEUE_SEGMENT_META_TABLE_NAME) - query = fmt.Sprintf("%s imported_by_%s = 1", query, importerRole) - if exporterRole != "" { - query = fmt.Sprintf("%s AND exporter_role = '%s'", query, exporterRole) - } - query = fmt.Sprintf("%s;", query) - - row := m.db.QueryRow(query) - var segmentNum sql.NullInt64 - err := row.Scan(&segmentNum) - if err != nil { - return -1, fmt.Errorf("run query on meta db - %s : %w", query, err) - } - if !segmentNum.Valid { - return -1, ErrNoQueueSegmentsFound - } - return segmentNum.Int64, nil -} - func (m *MetaDB) GetExportedEventsStatsForTable(schemaName string, tableName string) (*tgtdb.EventCounter, error) { var totalCount int64 var inserts int64 diff --git a/yb-voyager/src/metadb/migrationStatus.go b/yb-voyager/src/metadb/migrationStatus.go index 04e94f9fd..4b196539c 100644 --- a/yb-voyager/src/metadb/migrationStatus.go +++ b/yb-voyager/src/metadb/migrationStatus.go @@ -24,18 +24,27 @@ type MigrationStatusRecord struct { TableListExportedFromSource []string `json:"TableListExportedFromSource"` SourceDBConf *srcdb.Source `json:"SourceDBConf"` - CutoverToTargetRequested bool `json:"CutoverToTargetRequested"` + //All the cutover requested flags by initiate cutover command + CutoverToTargetRequested bool `json:"CutoverToTargetRequested"` + CutoverToSourceRequested bool `json:"CutoverToSourceRequested"` + CutoverToSourceReplicaRequested bool `json:"CutoverToSourceReplicaRequested"` + + //All the cutover processed by importer/exporter flags CutoverProcessedBySourceExporter bool `json:"CutoverProcessedBySourceExporter"` - CutoverProcessedByTargetImporter bool `json:"CutoverProcessedByTargetImporter"` - ExportFromTargetFallForwardStarted bool `json:"ExportFromTargetFallForwardStarted"` - CutoverToSourceReplicaRequested bool `json:"CutoverToSourceReplicaRequested"` + CutoverToSourceProcessedByTargetExporter bool `json:"CutoverToSourceProcessedByTargetExporter"` CutoverToSourceReplicaProcessedByTargetExporter bool `json:"CutoverToSourceReplicaProcessedByTargetExporter"` + CutoverProcessedByTargetImporter bool `json:"CutoverProcessedByTargetImporter"` CutoverToSourceReplicaProcessedBySRImporter bool `json:"CutoverToSourceReplicaProcessedBySRImporter"` - ExportFromTargetFallBackStarted bool `json:"ExportFromTargetFallBackStarted"` - CutoverToSourceRequested bool `json:"CutoverToSourceRequested"` - CutoverToSourceProcessedByTargetExporter bool `json:"CutoverToSourceProcessedByTargetExporter"` CutoverToSourceProcessedBySourceImporter bool `json:"CutoverToSourceProcessedBySourceImporter"` + //All the cutover detectedc flags by importer when they processed the cutover event + CutoverDetectedByTargetImporter bool `json:"CutoverDetectedByTargetImporter"` + CutoverDetectedBySourceImporter bool `json:"CutoverDetectedBySourceImporter"` + CutoverDetectedBySourceReplicaImporter bool `json:"CutoverDetectedBySourceReplicaImporter"` + + ExportFromTargetFallForwardStarted bool `json:"ExportFromTargetFallForwardStarted"` + ExportFromTargetFallBackStarted bool `json:"ExportFromTargetFallBackStarted"` + ExportSchemaDone bool `json:"ExportSchemaDone"` ExportDataDone bool `json:"ExportDataDone"` // to be interpreted as export of snapshot data from source is complete ExportDataSourceDebeziumStarted bool `json:"ExportDataSourceDebeziumStarted"`