From 46c40405914bd72c518f0aa3177c9f73b5640b80 Mon Sep 17 00:00:00 2001 From: "Kaiyuan (Kevin) Liu" <20115614+kevinliu24@users.noreply.github.com> Date: Sat, 9 Nov 2024 04:23:58 -0800 Subject: [PATCH] [operator] run leader balance multiple times until balanced. (#532) [operator] run leader balance multiple times until balanced. --- pkg/controller/component/storaged_updater.go | 12 +++++++++++- pkg/nebula/meta_client.go | 19 +++++++++++++++++++ 2 files changed, 30 insertions(+), 1 deletion(-) diff --git a/pkg/controller/component/storaged_updater.go b/pkg/controller/component/storaged_updater.go index 72e227bb..cc55e6ad 100644 --- a/pkg/controller/component/storaged_updater.go +++ b/pkg/controller/component/storaged_updater.go @@ -467,10 +467,20 @@ func (s *storagedUpdater) balanceLeader(mc nebula.MetaInterface, nc *v1alpha1.Ne if time.Now().Before(lastBalancedTime.Add(BalanceLeaderInterval * time.Second)) { return utilerrors.ReconcileErrorf("partition leader is balancing") } + + balanced, err := mc.IsLeaderBalanced(space.Name) + if err != nil { + return utilerrors.ReconcileErrorf("failed to check if the leader is balanced for space %s: %v", space.Name, err) + } + + if balanced { + nc.Status.Storaged.BalancedSpaces = append(nc.Status.Storaged.BalancedSpaces, *space.Id.SpaceID) + continue + } + if err := mc.BalanceLeader(*space.Id.SpaceID); err != nil { return err } - nc.Status.Storaged.BalancedSpaces = append(nc.Status.Storaged.BalancedSpaces, *space.Id.SpaceID) nc.Status.Storaged.LastBalancedTime = &metav1.Time{Time: time.Now()} return utilerrors.ReconcileErrorf("space %d need to be synced", *space.Id.SpaceID) } diff --git a/pkg/nebula/meta_client.go b/pkg/nebula/meta_client.go index e1bf9c36..d7bac272 100644 --- a/pkg/nebula/meta_client.go +++ b/pkg/nebula/meta_client.go @@ -53,6 +53,7 @@ type ( GetSpaceLeaderHosts(space []byte) ([]string, error) GetLeaderCount(leaderHost string) (int, error) BalanceStatus(jobID int32, spaceID nebula.GraphSpaceID) error + 
IsLeaderBalanced(spaceName []byte) (bool, error) BalanceLeader(spaceID nebula.GraphSpaceID) error BalanceData(spaceID nebula.GraphSpaceID) (int32, error) BalanceDataInZone(spaceID nebula.GraphSpaceID) (int32, error) @@ -356,6 +357,24 @@ func (m *metaClient) BalanceLeader(spaceID nebula.GraphSpaceID) error { return nil } +func (m *metaClient) IsLeaderBalanced(spaceName []byte) (bool, error) { + hosts, err := m.ListHosts(meta.ListHostType_ALLOC) + if err != nil { + return false, err + } + + for _, host := range hosts { + if host.Status != meta.HostStatus_ONLINE { + continue + } + if host.LeaderParts[(string)(spaceName)] == nil { + return false, nil + } + } + + return true, nil +} + func (m *metaClient) runAdminJob(req *meta.AdminJobReq) (int32, error) { resp, err := m.retryOnError(req, func(req interface{}) (interface{}, error) { resp, err := m.client.RunAdminJob(req.(*meta.AdminJobReq))