Skip to content

Commit

Permalink
Changed solution to use msr to store the state of previous run for wh…
Browse files Browse the repository at this point in the history
…ether cutover detected by importer and use it
  • Loading branch information
priyanshi-yb committed Mar 4, 2025
1 parent 643df4e commit f5431b3
Show file tree
Hide file tree
Showing 4 changed files with 38 additions and 101 deletions.
38 changes: 0 additions & 38 deletions yb-voyager/cmd/eventQueue.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,26 +54,6 @@ func NewEventQueue(exportDir string) *EventQueue {
}
}

// GetNextSegment returns the next segment to process
func (eq *EventQueue) GetLastProcessedSegment() (*EventQueueSegment, error) {
var err error
segmentsExporterRole := ""
if importerRole == SOURCE_DB_IMPORTER_ROLE {
// in case of fall-back import, restrict to only segments exported from target db.
segmentsExporterRole = TARGET_DB_EXPORTER_FB_ROLE

}
eq.SegmentNumToStream, err = metaDB.GetMaxSegmentExportedByAndImportedBy(importerRole, segmentsExporterRole)
segmentFileName := fmt.Sprintf("%s.%d.%s", QUEUE_SEGMENT_FILE_NAME, eq.SegmentNumToStream, QUEUE_SEGMENT_FILE_EXTENSION)
segmentFilePath := filepath.Join(eq.QueueDirPath, segmentFileName)
_, err = os.Stat(segmentFilePath)
if err != nil {
return nil, fmt.Errorf("failed to get next segment file path: %w", err)
}
segment := NewEventQueueSegment(segmentFilePath, eq.SegmentNumToStream)
return segment, nil
}

// GetNextSegment returns the next segment to process
func (eq *EventQueue) GetNextSegment() (*EventQueueSegment, error) {
var err error
Expand Down Expand Up @@ -201,21 +181,3 @@ func (eqs *EventQueueSegment) IsProcessed() bool {
func (eqs *EventQueueSegment) MarkProcessed() {
eqs.processed = true
}

func (eqs *EventQueueSegment) getLastEvent() (*tgtdb.Event, error) {
var lastEvent *tgtdb.Event
for {
event, err := eqs.NextEvent()
if err != nil {
return nil, fmt.Errorf("error getting next event of last processed segment: %v", err)
}
if event == nil {
break
}

lastEvent = event
}

return lastEvent, nil

}
57 changes: 22 additions & 35 deletions yb-voyager/cmd/live_migration.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,46 +56,20 @@ func init() {
}

func cutoverInitiatedAndCutoverEventProcessed() (bool, error) {
cutoverInitiated, err := cutoverInitiatedAlready(importerRole)
msr, err := metaDB.GetMigrationStatusRecord()
if err != nil {
return false, err
return false, fmt.Errorf("getting migration status record: %v", err)
}
if !cutoverInitiated {
return false, nil
}
eventQueue = NewEventQueue(exportDir)

//Get the last Segement processed - lastSeg
// check the last event of the lastSeg file
// if its a cutover event then return true
lastProcessedSegment, err := eventQueue.GetLastProcessedSegment()
if err != nil {
return false, fmt.Errorf("error getting last segment proccessed: %v", err)
}
if lastProcessedSegment.SegmentNum == -1 {
//If there are no processed segment files - return false
return false, nil
}

err = lastProcessedSegment.Open()
if err != nil {
return false, fmt.Errorf("error opening the last Processed segment: %v", err)
}

lastEvent, err := lastProcessedSegment.getLastEvent()
if err != nil {
return false, fmt.Errorf("error getting the lastEvent of segment: %v", err)
}

if lastEvent.IsCutoverToTarget() && importerRole == TARGET_DB_IMPORTER_ROLE ||
lastEvent.IsCutoverToSourceReplica() && importerRole == SOURCE_REPLICA_DB_IMPORTER_ROLE ||
lastEvent.IsCutoverToSource() && importerRole == SOURCE_DB_IMPORTER_ROLE { // cutover or fall-forward command
//If the last event is cutover one then return true meaning no need to continue stream changes
return true, nil
switch importerRole {
case TARGET_DB_IMPORTER_ROLE:
return msr.CutoverToTargetRequested && msr.CutoverDetectedByTargetImporter, nil
case SOURCE_REPLICA_DB_IMPORTER_ROLE:
return msr.CutoverToSourceReplicaRequested && msr.CutoverDetectedBySourceReplicaImporter, nil
case SOURCE_DB_IMPORTER_ROLE:
return msr.CutoverToSourceRequested && msr.CutoverDetectedBySourceImporter, nil
}

return false, nil

}

func streamChanges(state *ImportDataState, tableNames []sqlname.NameTuple) error {
Expand Down Expand Up @@ -228,6 +202,19 @@ func streamChangesFromSegment(
event.IsCutoverToSourceReplica() && importerRole == SOURCE_REPLICA_DB_IMPORTER_ROLE ||
event.IsCutoverToSource() && importerRole == SOURCE_DB_IMPORTER_ROLE { // cutover or fall-forward command

err := metaDB.UpdateMigrationStatusRecord(func(record *metadb.MigrationStatusRecord) {
switch importerRole {
case TARGET_DB_IMPORTER_ROLE:
record.CutoverDetectedByTargetImporter = true
case SOURCE_REPLICA_DB_IMPORTER_ROLE:
record.CutoverDetectedBySourceReplicaImporter = true
case SOURCE_DB_IMPORTER_ROLE:
record.CutoverDetectedBySourceImporter = true
}
})
if err != nil {
return fmt.Errorf("error updating the migration status record for cutover detected case: %", err)
}
updateCallhomeImportPhase(event)

eventQueue.EndOfQueue = true
Expand Down
21 changes: 0 additions & 21 deletions yb-voyager/src/metadb/metadataDB.go
Original file line number Diff line number Diff line change
Expand Up @@ -392,27 +392,6 @@ func (m *MetaDB) GetMinSegmentExportedByAndNotImportedBy(importerRole string, ex
return segmentNum.Int64, nil
}

//returning the max segment number that is processed by importer
func (m *MetaDB) GetMaxSegmentExportedByAndImportedBy(importerRole string, exporterRole string) (int64, error) {
query := fmt.Sprintf(`SELECT MAX(segment_no) FROM %s WHERE`, QUEUE_SEGMENT_META_TABLE_NAME)
query = fmt.Sprintf("%s imported_by_%s = 1", query, importerRole)
if exporterRole != "" {
query = fmt.Sprintf("%s AND exporter_role = '%s'", query, exporterRole)
}
query = fmt.Sprintf("%s;", query)

row := m.db.QueryRow(query)
var segmentNum sql.NullInt64
err := row.Scan(&segmentNum)
if err != nil {
return -1, fmt.Errorf("run query on meta db - %s : %w", query, err)
}
if !segmentNum.Valid {
return -1, ErrNoQueueSegmentsFound
}
return segmentNum.Int64, nil
}

func (m *MetaDB) GetExportedEventsStatsForTable(schemaName string, tableName string) (*tgtdb.EventCounter, error) {
var totalCount int64
var inserts int64
Expand Down
23 changes: 16 additions & 7 deletions yb-voyager/src/metadb/migrationStatus.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,18 +24,27 @@ type MigrationStatusRecord struct {
TableListExportedFromSource []string `json:"TableListExportedFromSource"`
SourceDBConf *srcdb.Source `json:"SourceDBConf"`

CutoverToTargetRequested bool `json:"CutoverToTargetRequested"`
//All the cutover requested flags by initiate cutover command
CutoverToTargetRequested bool `json:"CutoverToTargetRequested"`
CutoverToSourceRequested bool `json:"CutoverToSourceRequested"`
CutoverToSourceReplicaRequested bool `json:"CutoverToSourceReplicaRequested"`

//All the cutover processed by importer/exporter flags
CutoverProcessedBySourceExporter bool `json:"CutoverProcessedBySourceExporter"`
CutoverProcessedByTargetImporter bool `json:"CutoverProcessedByTargetImporter"`
ExportFromTargetFallForwardStarted bool `json:"ExportFromTargetFallForwardStarted"`
CutoverToSourceReplicaRequested bool `json:"CutoverToSourceReplicaRequested"`
CutoverToSourceProcessedByTargetExporter bool `json:"CutoverToSourceProcessedByTargetExporter"`
CutoverToSourceReplicaProcessedByTargetExporter bool `json:"CutoverToSourceReplicaProcessedByTargetExporter"`
CutoverProcessedByTargetImporter bool `json:"CutoverProcessedByTargetImporter"`
CutoverToSourceReplicaProcessedBySRImporter bool `json:"CutoverToSourceReplicaProcessedBySRImporter"`
ExportFromTargetFallBackStarted bool `json:"ExportFromTargetFallBackStarted"`
CutoverToSourceRequested bool `json:"CutoverToSourceRequested"`
CutoverToSourceProcessedByTargetExporter bool `json:"CutoverToSourceProcessedByTargetExporter"`
CutoverToSourceProcessedBySourceImporter bool `json:"CutoverToSourceProcessedBySourceImporter"`

//All the cutover detectedc flags by importer when they processed the cutover event
CutoverDetectedByTargetImporter bool `json:"CutoverDetectedByTargetImporter"`
CutoverDetectedBySourceImporter bool `json:"CutoverDetectedBySourceImporter"`
CutoverDetectedBySourceReplicaImporter bool `json:"CutoverDetectedBySourceReplicaImporter"`

ExportFromTargetFallForwardStarted bool `json:"ExportFromTargetFallForwardStarted"`
ExportFromTargetFallBackStarted bool `json:"ExportFromTargetFallBackStarted"`

ExportSchemaDone bool `json:"ExportSchemaDone"`
ExportDataDone bool `json:"ExportDataDone"` // to be interpreted as export of snapshot data from source is complete
ExportDataSourceDebeziumStarted bool `json:"ExportDataSourceDebeziumStarted"`
Expand Down

0 comments on commit f5431b3

Please sign in to comment.