Skip to content

Commit

Permalink
[operator] run leader balance multiple times until balanced. (#532)
Browse files Browse the repository at this point in the history
[operator] run leader balance mutiple times until balanced.
  • Loading branch information
kevinliu24 authored Nov 9, 2024
1 parent 77f0684 commit 46c4040
Show file tree
Hide file tree
Showing 2 changed files with 30 additions and 1 deletion.
12 changes: 11 additions & 1 deletion pkg/controller/component/storaged_updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -467,10 +467,20 @@ func (s *storagedUpdater) balanceLeader(mc nebula.MetaInterface, nc *v1alpha1.Ne
if time.Now().Before(lastBalancedTime.Add(BalanceLeaderInterval * time.Second)) {
return utilerrors.ReconcileErrorf("partition leader is balancing")
}

balanced, err := mc.IsLeaderBalanced(space.Name)
if err != nil {
return utilerrors.ReconcileErrorf("failed to check if the leader is balanced for space %s: %v", space.Name, err)
}

if balanced {
nc.Status.Storaged.BalancedSpaces = append(nc.Status.Storaged.BalancedSpaces, *space.Id.SpaceID)
continue
}

if err := mc.BalanceLeader(*space.Id.SpaceID); err != nil {
return err
}
nc.Status.Storaged.BalancedSpaces = append(nc.Status.Storaged.BalancedSpaces, *space.Id.SpaceID)
nc.Status.Storaged.LastBalancedTime = &metav1.Time{Time: time.Now()}
return utilerrors.ReconcileErrorf("space %d need to be synced", *space.Id.SpaceID)
}
Expand Down
19 changes: 19 additions & 0 deletions pkg/nebula/meta_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@ type (
GetSpaceLeaderHosts(space []byte) ([]string, error)
GetLeaderCount(leaderHost string) (int, error)
BalanceStatus(jobID int32, spaceID nebula.GraphSpaceID) error
IsLeaderBalanced(spaceName []byte) (bool, error)
BalanceLeader(spaceID nebula.GraphSpaceID) error
BalanceData(spaceID nebula.GraphSpaceID) (int32, error)
BalanceDataInZone(spaceID nebula.GraphSpaceID) (int32, error)
Expand Down Expand Up @@ -356,6 +357,24 @@ func (m *metaClient) BalanceLeader(spaceID nebula.GraphSpaceID) error {
return nil
}

func (m *metaClient) IsLeaderBalanced(spaceName []byte) (bool, error) {
hosts, err := m.ListHosts(meta.ListHostType_ALLOC)
if err != nil {
return false, err
}

for _, host := range hosts {
if host.Status != meta.HostStatus_ONLINE {
continue
}
if host.LeaderParts[(string)(spaceName)] == nil {
return false, nil
}
}

return true, nil
}

func (m *metaClient) runAdminJob(req *meta.AdminJobReq) (int32, error) {
resp, err := m.retryOnError(req, func(req interface{}) (interface{}, error) {
resp, err := m.client.RunAdminJob(req.(*meta.AdminJobReq))
Expand Down

0 comments on commit 46c4040

Please sign in to comment.