add cross-rack placement strategy & assigner (#52)
* add cross-rack assigner

* make it clear that the comparison is the distinct racks in *this* partition's assignment

* fix formatting of Infof, don't believe your lying eyes when in GoLand

* document cross-rack placement strategy in README
erikdw authored Nov 17, 2021
1 parent e59ce91 commit 17ef91f
Showing 6 changed files with 406 additions and 5 deletions.
1 change: 1 addition & 0 deletions README.md
@@ -331,6 +331,7 @@ The tool supports the following per-partition replica placement strategies:
| `any` | Allow any replica placement |
| `balanced-leaders` | Ensure that the leaders of each partition are evenly distributed across the broker racks |
| `in-rack` | Ensure that the followers for each partition are in the same rack as the leader; generally this is done when the leaders are already balanced, but this isn't required |
+| `cross-rack` | Ensure that the replicas for each partition are all in different racks; generally this is done when the leaders are already balanced, but this isn't required |
| `static` | Specify the placement manually, via an extra `staticAssignments` field |
| `static-in-rack` | Specify the rack placement per partition manually, via an extra `staticRackAssignments` field |
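
For context, a topic selects one of these strategies through the `placement` section of its config. Below is a minimal sketch in topicctl's usual meta/spec layout; the names and counts are hypothetical, not taken from this commit:

```yaml
# Illustrative topic config; names and sizes are made up.
meta:
  name: events-clicks
  cluster: local-cluster
  environment: local-env
  region: local-region

spec:
  partitions: 9
  replicationFactor: 3
  placement:
    # cross-rack needs at least as many broker racks as replicas (3 here).
    strategy: cross-rack
```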

13 changes: 8 additions & 5 deletions pkg/apply/apply.go
@@ -656,8 +656,8 @@ func (t *TopicApplier) updatePlacement(
batchSize,
newTopic,
)
-case config.PlacementStrategyInRack:
-// If we want in-rack, first check that the leaders are balanced; we don't
+case config.PlacementStrategyInRack, config.PlacementStrategyCrossRack:
+// If we want in-rack or cross-rack, first check that the leaders are balanced; we don't
// block this, but we should at least warn the user before continuing.
result, err = assigners.EvaluateAssignments(
currAssignments,
@@ -670,12 +670,13 @@
return err
}
if !result {
-log.Info(
-"Desired strategy is in-rack, but leaders aren't balanced. It is strongly suggested to do the latter first.",
+log.Infof(
+"Desired strategy is %s, but leaders aren't balanced. It is strongly suggested to do the latter first.",
+desiredPlacement,
)

ok, _ := Confirm(
"OK to apply in-rack despite having unbalanced leaders?",
fmt.Sprintf("OK to apply %s despite having unbalanced leaders?", desiredPlacement),
t.config.SkipConfirm || t.config.DryRun,
)
if !ok {
@@ -764,6 +765,8 @@ func (t *TopicApplier) updatePlacementHelper(
assigner = assigners.NewBalancedLeaderAssigner(t.brokers, picker)
case config.PlacementStrategyInRack:
assigner = assigners.NewSingleRackAssigner(t.brokers, picker)
+case config.PlacementStrategyCrossRack:
+assigner = assigners.NewCrossRackAssigner(t.brokers, picker)
case config.PlacementStrategyStatic:
assigner = &assigners.StaticAssigner{
Assignments: admin.ReplicasToAssignments(
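Not shown in this excerpt is the new strategy constant in the config package. Judging from the `config.PlacementStrategyCrossRack` references above and the `cross-rack` value documented in the README, it presumably slots in next to the existing constants, roughly as follows; the exact type name and neighboring values are assumptions inferred from the switch statements, not verbatim source:

```go
// Sketch of the pkg/config placement constants; details are assumed.
const (
    PlacementStrategyAny             PlacementStrategy = "any"
    PlacementStrategyBalancedLeaders PlacementStrategy = "balanced-leaders"
    PlacementStrategyInRack          PlacementStrategy = "in-rack"
    PlacementStrategyCrossRack       PlacementStrategy = "cross-rack" // new in this commit
    PlacementStrategyStatic          PlacementStrategy = "static"
    PlacementStrategyStaticInRack    PlacementStrategy = "static-in-rack"
)
```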
125 changes: 125 additions & 0 deletions pkg/apply/assigners/cross_rack.go
@@ -0,0 +1,125 @@
package assigners

import (
    "fmt"
    "sort"

    "github.com/segmentio/topicctl/pkg/admin"
    "github.com/segmentio/topicctl/pkg/apply/pickers"
)

// CrossRackAssigner is an assigner that ensures that the replicas of each
// partition are on different racks than each other. The algorithm is:
//
// https://segment.atlassian.net/browse/DRES-922?focusedCommentId=237288
//
// for each partition:
// for each non-leader replica:
// if replica is in same rack as leader:
// change replica to a placeholder (-1)
//
// then:
//
// for each partition:
// for each non-leader replica:
// if replica is set to placeholder:
// use picker to replace it with a broker in a different rack than the leader and any other replicas
//
// Note that this assigner doesn't make any leader changes. Thus, the assignments
// need to already be leader balanced before we make the changes with this assigner.
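//
// As a concrete illustration with hypothetical brokers: if brokers 1 and 4
// are in rack1, 2 and 5 in rack2, and 3 and 6 in rack3, then a partition
// assigned [1, 4, 2] keeps broker 1 (the leader, rack1) and broker 2 (rack2),
// nulls out broker 4 (a second rack1 replica), and the picker fills the
// placeholder from the only unused rack, rack3, yielding e.g. [1, 3, 2].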
type CrossRackAssigner struct {
brokers []admin.BrokerInfo
brokerRacks map[int]string
brokersPerRack map[string][]int
picker pickers.Picker
}

var _ Assigner = (*CrossRackAssigner)(nil)

// NewCrossRackAssigner creates and returns a CrossRackAssigner instance.
func NewCrossRackAssigner(
brokers []admin.BrokerInfo,
picker pickers.Picker,
) *CrossRackAssigner {
return &CrossRackAssigner{
brokers: brokers,
brokerRacks: admin.BrokerRacks(brokers),
brokersPerRack: admin.BrokersPerRack(brokers),
picker: picker,
}
}

// Assign returns a new partition assignment according to the assigner-specific logic.
func (s *CrossRackAssigner) Assign(
topic string,
curr []admin.PartitionAssignment,
) ([]admin.PartitionAssignment, error) {
if err := admin.CheckAssignments(curr); err != nil {
return nil, err
}

// Check to make sure that the number of racks is >= number of replicas.
// Otherwise, we won't be able to find a feasible assignment.
if len(s.brokersPerRack) < len(curr[0].Replicas) {
return nil, fmt.Errorf("Do not have enough racks for cross-rack placement")
}

desired := admin.CopyAssignments(curr)

// First, null-out any replicas that are in the wrong rack, and record the racks we keep
usedRacksPerPartition := make([]map[string]bool, len(curr))
for index, assignment := range desired {
usedRacks := make(map[string]bool, len(curr))
for r, replica := range assignment.Replicas {
replicaRack := s.brokerRacks[replica]
if _, used := usedRacks[replicaRack]; used {
// Rack has already been seen, null it out since we need to replace it
desired[index].Replicas[r] = -1
} else {
// First time using this rack for this partition
usedRacks[replicaRack] = true
}
}
usedRacksPerPartition[index] = usedRacks
}
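// At this point each partition keeps at most one replica per rack, and every
// replica that duplicated an earlier rack is marked with the -1 placeholder.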

// Which racks did we not use yet?
availableRacksPerPartition := make([][]string, 0, len(curr))
for _, usedRacks := range usedRacksPerPartition {
availableRacks := make(map[string]bool)
for _, rack := range s.brokerRacks {
if _, used := usedRacks[rack]; !used {
availableRacks[rack] = true
}
}
sortedRacks := make([]string, 0, len(availableRacks))
for r := range availableRacks {
sortedRacks = append(sortedRacks, r)
}
sort.Strings(sortedRacks)
availableRacksPerPartition = append(availableRacksPerPartition, sortedRacks)
}
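// availableRacksPerPartition[i] now holds, in sorted (deterministic) order,
// the racks that partition i isn't using yet; they seed the replacements below.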

// Then, go back and replace all of the marked replicas with replicas from available racks
for index, assignment := range desired {
for r, replica := range assignment.Replicas {
if replica == -1 {
// Pop the 1st rack off and pick one of the brokers in that rack
targetRack, remainingRacks := availableRacksPerPartition[index][0], availableRacksPerPartition[index][1:]
availableRacksPerPartition[index] = remainingRacks
targetBrokers := s.brokersPerRack[targetRack]
err := s.picker.PickNew(
topic,
targetBrokers,
desired,
index,
r,
)
if err != nil {
return nil, err
}
}
}
}

return desired, nil
}
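
To see the new assigner end to end, here is a hedged usage sketch. The broker fixtures, the `BrokerInfo`/`PartitionAssignment` field names, and `pickers.NewRandomizedPicker` are assumptions modeled on the surrounding packages rather than confirmed API:

```go
package main

import (
    "log"

    "github.com/segmentio/topicctl/pkg/admin"
    "github.com/segmentio/topicctl/pkg/apply/assigners"
    "github.com/segmentio/topicctl/pkg/apply/pickers"
)

func main() {
    // Hypothetical fixtures: six brokers spread evenly across three racks.
    brokers := []admin.BrokerInfo{
        {ID: 1, Rack: "rack1"}, {ID: 2, Rack: "rack2"}, {ID: 3, Rack: "rack3"},
        {ID: 4, Rack: "rack1"}, {ID: 5, Rack: "rack2"}, {ID: 6, Rack: "rack3"},
    }
    assigner := assigners.NewCrossRackAssigner(brokers, pickers.NewRandomizedPicker())

    // One partition whose first two replicas share rack1; cross-rack must fix it.
    curr := []admin.PartitionAssignment{
        {ID: 0, Replicas: []int{1, 4, 2}},
    }
    desired, err := assigner.Assign("my-topic", curr)
    if err != nil {
        log.Fatal(err)
    }
    // The replicas now span three distinct racks, e.g. [1 3 2] or [1 6 2].
    log.Printf("desired: %+v", desired)
}
```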