Skip to content

Commit

Permalink
Restructured the code a bit to be able to add more tests for guardrai…
Browse files Browse the repository at this point in the history
…ls and new leafs addition. Added tests for subsequent run validating the guardrails of missing and extra tables and new leaf tables addition
  • Loading branch information
priyanshi-yb committed Feb 28, 2025
1 parent e64f29f commit b842a60
Show file tree
Hide file tree
Showing 2 changed files with 220 additions and 58 deletions.
79 changes: 56 additions & 23 deletions yb-voyager/cmd/exportData.go
Original file line number Diff line number Diff line change
Expand Up @@ -559,10 +559,10 @@ func addLeafPartitionsInTableList(tableList []sqlname.NameTuple, addAllLeafParti
allLeafPartitions := GetAllLeafPartitions(table)
prevLengthOfList := len(modifiedTableList)
switch true {
case len(allLeafPartitions) == 0 && rootTable != table: //leaf partition
case len(allLeafPartitions) == 0 && !rootTable.Equals(table): //leaf partition
partitionsToRootTableMap[qualifiedCatalogName] = rootTable.AsQualifiedCatalogName() // Unquoted->Unquoted map as debezium uses Unquoted table name
modifiedTableList = append(modifiedTableList, table)
case len(allLeafPartitions) == 0 && rootTable == table: //normal table
case len(allLeafPartitions) == 0 && rootTable.Equals(table): //normal table
modifiedTableList = append(modifiedTableList, table)
case len(allLeafPartitions) > 0 && addAllLeafPartitions: // table with partitions in table list
for _, leafPartition := range allLeafPartitions {
Expand All @@ -583,7 +583,12 @@ func addLeafPartitionsInTableList(tableList []sqlname.NameTuple, addAllLeafParti
func GetRootTableOfPartition(table sqlname.NameTuple) (sqlname.NameTuple, error) {
parentTable := source.DB().ParentTableOfPartition(table)
if parentTable == "" {
return table, nil
//now we know its a root table so return the tuple from the nameregistry
tuple, err := namereg.NameReg.LookupTableName(table.ForKey())
if err != nil {
return sqlname.NameTuple{}, fmt.Errorf("lookup failed for the table: %v: %v", table.ForKey(), err)
}
return tuple, nil
}

// non-root table
Expand Down Expand Up @@ -810,6 +815,12 @@ func applyTableListFlagsOnFullListAndAddLeafPartitions(fullTableList []sqlname.N
if err != nil {
return nil, err
}
} else {
//this is only for filtering the non-leaf and non-root tables - the mid level partitioned table from fullTableList
_, includeTableList, err = addLeafPartitionsInTableList(includeTableList, false)
if err != nil {
return nil, fmt.Errorf("error keeping only leaf and root tables: %v", err)
}
}
//return the final table list generated from the command flags table-list (include list) / exclude-table-list in this run
return sqlname.SetDifferenceNameTuples(includeTableList, excludeTableList), nil
Expand Down Expand Up @@ -940,20 +951,9 @@ func getInitialTableList() (map[string]string, []sqlname.NameTuple, error) {

//guardrails around the table-list in case of re-run

//TODO: fix later this registered list also contains sequence names right now
//so in case some include/exclude table names are not proper we will display this registeredList as valid table names
registeredListObjNames, err := namereg.NameReg.GetRegisteredTableList()
registeredList, err := getRegisteredNameRegList()
if err != nil {
utils.ErrExit("error getting registered list in name registry: %v", err)
}

registeredList := make([]sqlname.NameTuple, 0)
for _, obj := range registeredListObjNames {
tuple, err := getNameTupleFromQualifiedObject(obj.Qualified.Quoted, obj, false)
if err != nil {
return nil, nil, fmt.Errorf("error in getting the name tuple for the qualified object: %s: %v", obj.Qualified.Quoted, obj)
}
registeredList = append(registeredList, tuple)
return nil, nil, fmt.Errorf("error getting name registry list: %v", err)
}

rootTables := make([]sqlname.NameTuple, 0)
Expand All @@ -964,6 +964,9 @@ func getInitialTableList() (map[string]string, []sqlname.NameTuple, error) {
}
rootTables = append(rootTables, tuple)
}
rootTables = lo.UniqBy(rootTables, func(t sqlname.NameTuple) string {
return t.ForKey()
})

// Finding all the partitions of all root tables part of migration, and report if there any new partitions added
rootToNewLeafTablesMap, err := detectReportNewLeafPartitionsOnPartitionedTables(rootTables, registeredList)
Expand All @@ -977,8 +980,20 @@ func getInitialTableList() (map[string]string, []sqlname.NameTuple, error) {
return partitionsToRootTableMap, firstRunTableWithLeafsAndRoots, nil
}

firstRunTableWithLeafParititons, currentRunTableListWithLeafPartitions, err := applyTableListFlagsOnCurrentAndRemoveRootsFromBothLists(registeredList, source.TableList, source.ExcludeTableList, rootToNewLeafTablesMap, rootTables, firstRunTableWithLeafsAndRoots)
if err != nil {
return nil, nil, fmt.Errorf("error applying table list flags for current list and remove roots: %v", err)
}
//Reporting the guardrail msgs only on leaf tables to be consistent so filtering the root table from both the list
guardrailsAroundFirstRunAndCurrentRunTableList(firstRunTableWithLeafParititons, currentRunTableListWithLeafPartitions)

return partitionsToRootTableMap, firstRunTableWithLeafsAndRoots, nil

}

func applyTableListFlagsOnCurrentAndRemoveRootsFromBothLists(registeredList []sqlname.NameTuple, tableListViaFlag string, excludeTableListViaFlag string, rootToNewLeafTablesMap map[string][]string, rootTables []sqlname.NameTuple, firstRunTableWithLeafsAndRoots []sqlname.NameTuple) ([]sqlname.NameTuple, []sqlname.NameTuple, error) {
//apply include/exclude flags and if a new table is passed (which is not present in name registry), then error out Unknown table
currentRunTableListFilteredViaFlags, err := applyTableListFlagsOnFullListAndAddLeafPartitions(registeredList, source.TableList, source.ExcludeTableList)
currentRunTableListFilteredViaFlags, err := applyTableListFlagsOnFullListAndAddLeafPartitions(registeredList, tableListViaFlag, excludeTableListViaFlag)
if err != nil {
return nil, nil, fmt.Errorf("error in apply table list filter on registered list for the flags in current run: %v", err)
}
Expand Down Expand Up @@ -1018,14 +1033,30 @@ func getInitialTableList() (map[string]string, []sqlname.NameTuple, error) {
})
})

//Reporting the guardrail msgs only on leaf tables to be consistent so filtering the root table from both the list
guardrailsAroundFirstRunAndCurrentRunTableList(firstRunTableWithLeafParititons, currentRunTableListWithLeafPartitions)
return firstRunTableWithLeafParititons, currentRunTableListWithLeafPartitions, nil
}

return partitionsToRootTableMap, firstRunTableWithLeafsAndRoots, nil
func getRegisteredNameRegList() ([]sqlname.NameTuple, error) {
//TODO: fix later this registered list also contains sequence names right now
//so in case some include/exclude table names are not proper we will display this registeredList as valid table names
//caveat for this is that in some cases while showing list on console we might list sequences as well, will fix this
registeredListObjNames, err := namereg.NameReg.GetRegisteredTableList()
if err != nil {
utils.ErrExit("error getting registered list in name registry: %v", err)
}

registeredList := make([]sqlname.NameTuple, 0)
for _, obj := range registeredListObjNames {
tuple, err := getNameTupleFromQualifiedObject(obj.Qualified.Quoted, obj, false)
if err != nil {
return nil, fmt.Errorf("error in getting the name tuple for the qualified object: %s: %v", obj.Qualified.Quoted, obj)
}
registeredList = append(registeredList, tuple)
}
return registeredList, nil
}

func guardrailsAroundFirstRunAndCurrentRunTableList(firstRunTableListWithLeafPartitions, currentRunTableListWithLeafPartitions []sqlname.NameTuple) {
func guardrailsAroundFirstRunAndCurrentRunTableList(firstRunTableListWithLeafPartitions, currentRunTableListWithLeafPartitions []sqlname.NameTuple) ([]sqlname.NameTuple, []sqlname.NameTuple) {
missingTables := sqlname.SetDifferenceNameTuples(firstRunTableListWithLeafPartitions, currentRunTableListWithLeafPartitions)
extraTables := sqlname.SetDifferenceNameTuples(currentRunTableListWithLeafPartitions, firstRunTableListWithLeafPartitions)

Expand All @@ -1041,14 +1072,16 @@ func guardrailsAroundFirstRunAndCurrentRunTableList(firstRunTableListWithLeafPar
return t.ForMinOutput()
}), ","))
}
msg := fmt.Sprintf("Using the table list passed in the initial phase of migration - %v. \nDo you want continue?", lo.Map(firstRunTableListWithLeafPartitions, func(t sqlname.NameTuple, _ int) string {
msg := fmt.Sprintf("Using the table list passed in the initial phase of migration - %v. \nDo you want continue?", strings.Join(lo.Map(firstRunTableListWithLeafPartitions, func(t sqlname.NameTuple, _ int) string {
return t.ForMinOutput()
}))
}), ","))
if !utils.AskPrompt(msg) {
utils.ErrExit("Aborting, Start a fresh migration...")
}
}

return missingTables, extraTables

}

func detectReportNewLeafPartitionsOnPartitionedTables(rootTables []sqlname.NameTuple, registeredList []sqlname.NameTuple) (map[string][]string, error) {
Expand Down
Loading

0 comments on commit b842a60

Please sign in to comment.