diff --git a/README.md b/README.md index 507460da..3745ff43 100644 --- a/README.md +++ b/README.md @@ -331,6 +331,7 @@ The tool supports the following per-partition, replica placement strategies: | `any` | Allow any replica placement | | `balanced-leaders` | Ensure that the leaders of each partition are evenly distributed across the broker racks | | `in-rack` | Ensure that the followers for each partition are in the same rack as the leader; generally this is done when the leaders are already balanced, but this isn't required | +| `cross-rack` | Ensure that the replicas for each partition are all in different racks; generally this is done when the leaders are already balanced, but this isn't required | | `static` | Specify the placement manually, via an extra `staticAssignments` field | | `static-in-rack` | Specify the rack placement per partition manually, via an extra `staticRackAssignments` field | diff --git a/pkg/apply/apply.go b/pkg/apply/apply.go index a9eaed4c..4f04a9cc 100644 --- a/pkg/apply/apply.go +++ b/pkg/apply/apply.go @@ -656,8 +656,8 @@ func (t *TopicApplier) updatePlacement( batchSize, newTopic, ) - case config.PlacementStrategyInRack: - // If we want in-rack, first check that the leaders are balanced; we don't + case config.PlacementStrategyInRack, config.PlacementStrategyCrossRack: + // If we want in-rack or cross-rack, first check that the leaders are balanced; we don't // block this, but we should at least warn the user before continuing. result, err = assigners.EvaluateAssignments( currAssignments, @@ -670,12 +670,13 @@ func (t *TopicApplier) updatePlacement( return err } if !result { - log.Info( - "Desired strategy is in-rack, but leaders aren't balanced. It is strongly suggested to do the latter first.", + log.Infof( + "Desired strategy is %s, but leaders aren't balanced. It is strongly suggested to do the latter first.", + desiredPlacement, ) ok, _ := Confirm( - "OK to apply in-rack despite having unbalanced leaders?", + fmt.Sprintf("OK to apply %s despite having unbalanced leaders?", desiredPlacement), t.config.SkipConfirm || t.config.DryRun, ) if !ok { @@ -764,6 +765,8 @@ func (t *TopicApplier) updatePlacementHelper( assigner = assigners.NewBalancedLeaderAssigner(t.brokers, picker) case config.PlacementStrategyInRack: assigner = assigners.NewSingleRackAssigner(t.brokers, picker) + case config.PlacementStrategyCrossRack: + assigner = assigners.NewCrossRackAssigner(t.brokers, picker) case config.PlacementStrategyStatic: assigner = &assigners.StaticAssigner{ Assignments: admin.ReplicasToAssignments( diff --git a/pkg/apply/assigners/cross_rack.go b/pkg/apply/assigners/cross_rack.go new file mode 100644 index 00000000..73066350 --- /dev/null +++ b/pkg/apply/assigners/cross_rack.go @@ -0,0 +1,125 @@ +package assigners + +import ( + "fmt" + "github.com/segmentio/topicctl/pkg/admin" + "github.com/segmentio/topicctl/pkg/apply/pickers" + "sort" +) + +// CrossRackAssigner is an assigner that ensures that the replicas of each +// partition are on different racks than each other. The algorithm is: +// +// https://segment.atlassian.net/browse/DRES-922?focusedCommentId=237288 +// +// for each partition: +// for each non-leader replica: +// if replica is in same rack as leader: +// change replica to a placeholder (-1) +// +// then: +// +// for each partition: +// for each non-leader replica: +// if replica is set to placeholder: +// use picker to replace it with a broker in a different rack than the leader and any other replicas +// +// Note that this assigner doesn't make any leader changes. Thus, the assignments +// need to already be leader balanced before we make the changes with this assigner. +type CrossRackAssigner struct { + brokers []admin.BrokerInfo + brokerRacks map[int]string + brokersPerRack map[string][]int + picker pickers.Picker +} + +var _ Assigner = (*CrossRackAssigner)(nil) + +// NewCrossRackAssigner creates and returns a CrossRackAssigner instance. +func NewCrossRackAssigner( + brokers []admin.BrokerInfo, + picker pickers.Picker, +) *CrossRackAssigner { + return &CrossRackAssigner{ + brokers: brokers, + brokerRacks: admin.BrokerRacks(brokers), + brokersPerRack: admin.BrokersPerRack(brokers), + picker: picker, + } +} + +// Assign returns a new partition assignment according to the assigner-specific logic. +func (s *CrossRackAssigner) Assign( + topic string, + curr []admin.PartitionAssignment, +) ([]admin.PartitionAssignment, error) { + if err := admin.CheckAssignments(curr); err != nil { + return nil, err + } + + // Check to make sure that the number of racks is >= number of replicas. + // Otherwise, we won't be able to find a feasible assignment. + if len(s.brokersPerRack) < len(curr[0].Replicas) { + return nil, fmt.Errorf("Do not have enough racks for cross-rack placement") + } + + desired := admin.CopyAssignments(curr) + + // First, null-out any replicas that are in the wrong rack, and record the racks we keep + usedRacksPerPartition := make([]map[string]bool, len(curr)) + for index, assignment := range desired { + usedRacks := make(map[string]bool, len(curr)) + for r, replica := range assignment.Replicas { + replicaRack := s.brokerRacks[replica] + if _, used := usedRacks[replicaRack]; used { + // Rack has already been seen, null it out since we need to replace it + desired[index].Replicas[r] = -1 + } else { + // First time using this rack for this partition + usedRacks[replicaRack] = true + } + } + usedRacksPerPartition[index] = usedRacks + } + + // Which racks did we not use yet? + availableRacksPerPartition := make([][]string, 0, len(curr)) + for _, usedRacks := range usedRacksPerPartition { + availableRacks := make(map[string]bool) + for _, rack := range s.brokerRacks { + if _, used := usedRacks[rack]; !used { + availableRacks[rack] = true + } + } + sortedRacks := make([]string, 0, len(availableRacks)) + for r := range availableRacks { + sortedRacks = append(sortedRacks, r) + } + sort.Strings(sortedRacks) + availableRacksPerPartition = append(availableRacksPerPartition, sortedRacks) + } + + // Then, go back and replace all of the marked replicas with replicas from available racks + for index, assignment := range desired { + for r, replica := range assignment.Replicas { + if replica == -1 { + // Pop the 1st rack off and pick one of the brokers in that rack + targetRack, remainingRacks := availableRacksPerPartition[index][0], availableRacksPerPartition[index][1:] + availableRacksPerPartition[index] = remainingRacks + targetBrokers := s.brokersPerRack[targetRack] + err := s.picker.PickNew( + topic, + targetBrokers, + desired, + index, + r, + ) + if err != nil { + return nil, err + } + } + } + } + + return desired, nil +} diff --git a/pkg/apply/assigners/cross_rack_test.go b/pkg/apply/assigners/cross_rack_test.go new file mode 100644 index 00000000..f4c65865 --- /dev/null +++ b/pkg/apply/assigners/cross_rack_test.go @@ -0,0 +1,248 @@ +package assigners + +import ( + "errors" + "testing" + + "github.com/segmentio/topicctl/pkg/admin" + "github.com/segmentio/topicctl/pkg/apply/pickers" + "github.com/segmentio/topicctl/pkg/config" +) + +func TestCrossRackAssignerThreeReplicas(t *testing.T) { + brokers := testBrokers(12, 3) + assigner := NewCrossRackAssigner(brokers, pickers.NewLowestIndexPicker()) + checker := func(result []admin.PartitionAssignment) bool { + ok, _ := EvaluateAssignments( + result, + brokers, + config.TopicPlacementConfig{ + Strategy: config.PlacementStrategyCrossRack, + }, + ) + return ok + } + + testCases := []assignerTestCase{ + { + description: "Already cross rack", + curr: [][]int{ + {1, 2, 3}, + {4, 5, 6}, + {7, 8, 9}, + }, + expected: [][]int{ + {1, 2, 3}, + {4, 5, 6}, + {7, 8, 9}, + }, + checker: checker, + }, + { + description: "Single change", + curr: [][]int{ + {1, 2, 3}, + {4, 5, 6}, + {7, 4, 9}, + }, + expected: [][]int{ + {1, 2, 3}, + {4, 5, 6}, + {7, 8, 9}, + }, + checker: checker, + }, + { + description: "Multiple changes", + curr: [][]int{ + {1, 4, 7}, + {2, 5, 8}, + {9, 3, 11}, + {8, 4, 11}, + {10, 4, 11}, + {5, 4, 11}, + {7, 4, 11}, + {3, 4, 11}, + {12, 4, 11}, + }, + expected: [][]int{ + {1, 2, 3}, + {2, 1, 6}, + // 2nd position should be replaced with different rack than other 2 replicas + // 3rd position is a valid rack already, should be left alone + {9, 7, 11}, + // 2nd & 3rd both were invalid racks + {8, 4, 9}, + // 3rd was a valid rack + {10, 3, 11}, + {5, 4, 12}, + {7, 6, 11}, + {3, 4, 11}, + {12, 4, 11}, + }, + checker: checker, + }, + { + description: "Changes with all replicas", + curr: [][]int{ + {1, 4, 7}, + {2, 5, 8}, + {3, 6, 9}, + {4, 7, 10}, + {5, 8, 11}, + {6, 9, 12}, + {7, 10, 1}, + {8, 11, 2}, + {9, 12, 3}, + {10, 1, 4}, + {11, 2, 5}, + {12, 3, 6}, + }, + expected: [][]int{ + {1, 2, 3}, + {2, 1, 6}, + {3, 4, 2}, + {4, 5, 9}, + {5, 7, 12}, + {6, 10, 5}, + {7, 8, 3}, + {8, 1, 6}, + {9, 4, 8}, + {10, 11, 9}, + {11, 7, 12}, + {12, 10, 11}, + }, + checker: checker, + }, + } + + for _, testCase := range testCases { + testCase.evaluate(t, assigner) + } +} + +func TestCrossRackAssignerThreeReplicasRandomized(t *testing.T) { + brokers := testBrokers(12, 3) + assigner := NewCrossRackAssigner(brokers, pickers.NewRandomizedPicker()) + checker := func(result []admin.PartitionAssignment) bool { + ok, _ := EvaluateAssignments( + result, + brokers, + config.TopicPlacementConfig{ + Strategy: config.PlacementStrategyCrossRack, + }, + ) + return ok + } + + testCases := []assignerTestCase{ + { + description: "Already cross rack", + curr: [][]int{ + {1, 2, 3}, + {4, 5, 6}, + {7, 8, 9}, + }, + expected: [][]int{ + {1, 2, 3}, + {4, 5, 6}, + {7, 8, 9}, + }, + checker: checker, + }, + { + description: "Single change", + curr: [][]int{ + {1, 2, 3}, + {4, 10, 6}, + {7, 10, 9}, + }, + expected: [][]int{ + {1, 2, 3}, + {4, 11, 6}, + {7, 5, 9}, + }, + checker: checker, + }, + { + description: "Changes with all replicas", + curr: [][]int{ + {1, 4, 7}, + {2, 5, 8}, + {3, 6, 9}, + {4, 7, 10}, + {5, 8, 11}, + {6, 9, 12}, + {7, 10, 1}, + {8, 11, 2}, + {9, 12, 3}, + {10, 1, 4}, + {11, 2, 5}, + {12, 3, 6}, + }, + expected: [][]int{ + {1, 2, 6}, + {2, 10, 9}, + {3, 1, 8}, + {4, 5, 3}, + {5, 4, 12}, + {6, 7, 11}, + {7, 8, 9}, + {8, 10, 12}, + {9, 1, 5}, + {10, 11, 3}, + {11, 4, 6}, + {12, 7, 2}, + }, + checker: checker, + }, + } + + for _, testCase := range testCases { + testCase.evaluate(t, assigner) + } +} + +func TestCrossRackAssignerTwoReplicas(t *testing.T) { + brokers := testBrokers(6, 3) + assigner := NewCrossRackAssigner(brokers, pickers.NewLowestIndexPicker()) + checker := func(result []admin.PartitionAssignment) bool { + ok, _ := EvaluateAssignments( + result, + brokers, + config.TopicPlacementConfig{ + Strategy: config.PlacementStrategyCrossRack, + }, + ) + return ok + } + + testCases := []assignerTestCase{ + { + description: "Already cross rack", + curr: [][]int{ + {1, 2}, + {3, 4}, + {5, 6}, + }, + expected: [][]int{ + {1, 2}, + {3, 4}, + {5, 6}, + }, + checker: checker, + }, + { + description: "Error due to more replicas than racks", + curr: [][]int{ + {1, 2, 3, 4}, + {5, 6, 1, 2}, + }, + err: errors.New("more replicas than racks"), + }, + } + + for _, testCase := range testCases { + testCase.evaluate(t, assigner) + } +} \ No newline at end of file diff --git a/pkg/apply/assigners/evaluate.go b/pkg/apply/assigners/evaluate.go index 9e359c7d..36f90e74 100644 --- a/pkg/apply/assigners/evaluate.go +++ b/pkg/apply/assigners/evaluate.go @@ -57,6 +57,14 @@ func EvaluateAssignments( return balanced, nil case config.PlacementStrategyInRack: return minRacks == 1 && maxRacks == 1, nil + case config.PlacementStrategyCrossRack: + brokerRacks := admin.BrokerRacks(brokers) + for _, assignment := range assignments { + if len(assignment.Replicas) != len(assignment.DistinctRacks(brokerRacks)) { + return false, nil + } + } + return true, nil default: return false, fmt.Errorf( "Unrecognized placementStrategy: %s", diff --git a/pkg/config/topic.go b/pkg/config/topic.go index 08d5ae83..23d2caaf 100644 --- a/pkg/config/topic.go +++ b/pkg/config/topic.go @@ -27,6 +27,10 @@ const ( // and the replicas for each partition are in the same rack as the leader. PlacementStrategyInRack PlacementStrategy = "in-rack" + // PlacementStrategyCrossRack is a strategy in which the leaders are balanced + // and the replicas in each partition are spread to separate racks. + PlacementStrategyCrossRack PlacementStrategy = "cross-rack" + // PlacementStrategyStatic uses a static placement defined in the config. This is for // testing only and should generally not be used in production. PlacementStrategyStatic PlacementStrategy = "static" @@ -41,6 +45,7 @@ var allPlacementStrategies = []PlacementStrategy{ PlacementStrategyAny, PlacementStrategyBalancedLeaders, PlacementStrategyInRack, + PlacementStrategyCrossRack, PlacementStrategyStatic, PlacementStrategyStaticInRack, } @@ -261,6 +266,17 @@ func (t TopicConfig) Validate(numRacks int) error { ), ) } + case PlacementStrategyCrossRack: + if t.Spec.ReplicationFactor > numRacks { + err = multierror.Append( + err, + fmt.Errorf( + "Replication factor (%d) cannot be larger than the number of racks (%d)", + t.Spec.ReplicationFactor, + numRacks, + ), + ) + } case PlacementStrategyInRack: case PlacementStrategyStatic: if len(placement.StaticAssignments) != t.Spec.Partitions {