Skip to content

Commit

Permalink
Remove waitForLeaderUpdate and fix potential segfault
Browse files Browse the repository at this point in the history
  • Loading branch information
ploubser committed Dec 11, 2024
1 parent 545baf1 commit e379cb1
Showing 1 changed file with 5 additions and 44 deletions.
49 changes: 5 additions & 44 deletions balancer/balancer.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,9 @@
package balancer

import (
"context"
"fmt"
"math"
"slices"
"time"

"github.com/nats-io/jsm.go"
"github.com/nats-io/jsm.go/api"
Expand Down Expand Up @@ -159,13 +157,10 @@ func (b *Balancer) balance(servers map[string]*peer, evenDistribution int) (int,
for i := 0; i < s.rebalance; i++ {
randomIndex := rand.Intn(len(s.entities))
entity := s.entities[randomIndex]
b.log.Infof("Requesting leader (%s) step down for %s", s.hostname, entity.Name())
info, err := entity.ClusterInfo()
if err != nil {
return 0, err
if entity == nil {
return steppedDown, fmt.Errorf("no more valid entities to balance")
}

currentLeader := info.Leader
b.log.Infof("Requesting leader (%s) step down for %s", s.hostname, entity.Name())

err = entity.LeaderStepDown()
if err != nil {
Expand All @@ -175,23 +170,13 @@ func (b *Balancer) balance(servers map[string]*peer, evenDistribution int) (int,
if retries == 0 {
i--
retries++
s.entities = slices.Delete(s.entities, randomIndex, randomIndex+1)
continue
}
return 0, err
} else {
b.log.Infof("Successful step down %s ", entity.Name())
}

err = b.waitForLeaderUpdate(currentLeader, entity)
if err != nil {
// If leader election doesn't result in a new leader we will retry once before giving up
if retries == 0 {
i--
retries++
continue
}
return 0, err
}
b.log.Infof("Successful step down '%s'", entity.Name())
retries = 0
s.entities = slices.Delete(s.entities, randomIndex, randomIndex+1)
steppedDown += 1
Expand All @@ -211,30 +196,6 @@ func (b *Balancer) balance(servers map[string]*peer, evenDistribution int) (int,
return steppedDown, nil
}

func (b *Balancer) waitForLeaderUpdate(currentLeader string, entity balanceEntity) error {
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
ticker := time.NewTicker(250 * time.Millisecond)
defer ticker.Stop()

for {
select {
case <-ticker.C:
info, err := entity.ClusterInfo()
if err != nil {
continue
}
if currentLeader == info.Leader {
return nil
}

return nil
case <-ctx.Done():
return fmt.Errorf("leader did not change - %s", entity.Name())
}
}
}

// BalanceStreams finds the expected distribution of stream leaders over servers
// and forces leader election on any with an uneven distribution
func (b *Balancer) BalanceStreams(streams []*jsm.Stream) (int, error) {
Expand Down

0 comments on commit e379cb1

Please sign in to comment.