Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

client: expose callback function #9032

Merged
merged 2 commits into from
Feb 5, 2025
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
11 changes: 6 additions & 5 deletions client/servicediscovery/callbacks.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,23 +20,24 @@ import (
"github.com/pingcap/kvproto/pkg/pdpb"
)

type leaderSwitchedCallbackFunc func(string) error
// LeaderSwitchedCallbackFunc is the callback function for leader switched event
type LeaderSwitchedCallbackFunc func(string) error

// serviceCallbacks contains all the callback functions for service discovery events
type serviceCallbacks struct {
sync.RWMutex
// serviceModeUpdateCb will be called when the service mode gets updated
serviceModeUpdateCb func(pdpb.ServiceMode)
// leaderSwitchedCbs will be called after the leader switched
leaderSwitchedCbs []leaderSwitchedCallbackFunc
leaderSwitchedCbs []LeaderSwitchedCallbackFunc
// membersChangedCbs will be called after there is any membership change in the
// leader and followers
membersChangedCbs []func()
}

func newServiceCallbacks() *serviceCallbacks {
return &serviceCallbacks{
leaderSwitchedCbs: make([]leaderSwitchedCallbackFunc, 0),
leaderSwitchedCbs: make([]LeaderSwitchedCallbackFunc, 0),
membersChangedCbs: make([]func(), 0),
}
}
Expand All @@ -47,7 +48,7 @@ func (c *serviceCallbacks) setServiceModeUpdateCallback(cb func(pdpb.ServiceMode
c.serviceModeUpdateCb = cb
}

func (c *serviceCallbacks) addLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc) {
func (c *serviceCallbacks) addLeaderSwitchedCallback(cb LeaderSwitchedCallbackFunc) {
c.Lock()
defer c.Unlock()
c.leaderSwitchedCbs = append(c.leaderSwitchedCbs, cb)
Expand All @@ -72,7 +73,7 @@ func (c *serviceCallbacks) onServiceModeUpdate(mode pdpb.ServiceMode) {

func (c *serviceCallbacks) onLeaderSwitched(leader string) error {
c.RLock()
cbs := make([]leaderSwitchedCallbackFunc, len(c.leaderSwitchedCbs))
cbs := make([]LeaderSwitchedCallbackFunc, len(c.leaderSwitchedCbs))
copy(cbs, c.leaderSwitchedCbs)
c.RUnlock()

Expand Down
4 changes: 2 additions & 2 deletions client/servicediscovery/mock_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,10 +101,10 @@
func (*mockServiceDiscovery) CheckMemberChanged() error { return nil }

// ExecAndAddLeaderSwitchedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) ExecAndAddLeaderSwitchedCallback(leaderSwitchedCallbackFunc) {}
func (*mockServiceDiscovery) ExecAndAddLeaderSwitchedCallback(LeaderSwitchedCallbackFunc) {}

Check warning on line 104 in client/servicediscovery/mock_service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/mock_service_discovery.go#L104

Added line #L104 was not covered by tests

// AddLeaderSwitchedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) AddLeaderSwitchedCallback(leaderSwitchedCallbackFunc) {}
func (*mockServiceDiscovery) AddLeaderSwitchedCallback(LeaderSwitchedCallbackFunc) {}

Check warning on line 107 in client/servicediscovery/mock_service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/mock_service_discovery.go#L107

Added line #L107 was not covered by tests

// AddMembersChangedCallback implements the ServiceDiscovery interface.
func (*mockServiceDiscovery) AddMembersChangedCallback(func()) {}
8 changes: 4 additions & 4 deletions client/servicediscovery/service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,11 +130,11 @@ type ServiceDiscovery interface {
// in a quorum-based cluster or among the primary/secondaries in a primary/secondary configured cluster.
CheckMemberChanged() error
// ExecAndAddLeaderSwitchedCallback executes the callback once and adds it to the callback list then.
ExecAndAddLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc)
ExecAndAddLeaderSwitchedCallback(cb LeaderSwitchedCallbackFunc)
// AddLeaderSwitchedCallback adds callbacks which will be called when the leader
// in a quorum-based cluster or the primary in a primary/secondary configured cluster
// is switched.
AddLeaderSwitchedCallback(cb leaderSwitchedCallbackFunc)
AddLeaderSwitchedCallback(cb LeaderSwitchedCallbackFunc)
// AddMembersChangedCallback adds callbacks which will be called when any leader/follower
// in a quorum-based cluster or any primary/secondary in a primary/secondary configured cluster
// is changed.
Expand Down Expand Up @@ -780,7 +780,7 @@ func (c *serviceDiscovery) CheckMemberChanged() error {
}

// ExecAndAddLeaderSwitchedCallback executes the callback once and adds it to the callback list then.
func (c *serviceDiscovery) ExecAndAddLeaderSwitchedCallback(callback leaderSwitchedCallbackFunc) {
func (c *serviceDiscovery) ExecAndAddLeaderSwitchedCallback(callback LeaderSwitchedCallbackFunc) {
url := c.getLeaderURL()
if len(url) > 0 {
if err := callback(url); err != nil {
Expand All @@ -794,7 +794,7 @@ func (c *serviceDiscovery) ExecAndAddLeaderSwitchedCallback(callback leaderSwitc
// AddLeaderSwitchedCallback adds callbacks which will be called when the leader
// in a quorum-based cluster or the primary in a primary/secondary configured cluster
// is switched.
func (c *serviceDiscovery) AddLeaderSwitchedCallback(callback leaderSwitchedCallbackFunc) {
func (c *serviceDiscovery) AddLeaderSwitchedCallback(callback LeaderSwitchedCallbackFunc) {
c.callbacks.addLeaderSwitchedCallback(callback)
}

Expand Down
6 changes: 3 additions & 3 deletions client/servicediscovery/tso_service_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,7 +141,7 @@
clientConns sync.Map // Store as map[string]*grpc.ClientConn

// tsoLeaderUpdatedCb will be called when the TSO leader is updated.
tsoLeaderUpdatedCb leaderSwitchedCallbackFunc
tsoLeaderUpdatedCb LeaderSwitchedCallbackFunc

checkMembershipCh chan struct{}

Expand Down Expand Up @@ -359,7 +359,7 @@
}

// ExecAndAddLeaderSwitchedCallback executes the callback once and adds it to the callback list then.
func (c *tsoServiceDiscovery) ExecAndAddLeaderSwitchedCallback(callback leaderSwitchedCallbackFunc) {
func (c *tsoServiceDiscovery) ExecAndAddLeaderSwitchedCallback(callback LeaderSwitchedCallbackFunc) {
url := c.getPrimaryURL()
if len(url) > 0 {
if err := callback(url); err != nil {
Expand All @@ -371,7 +371,7 @@

// AddLeaderSwitchedCallback adds callbacks which will be called when the primary in
// a primary/secondary configured cluster is switched.
func (*tsoServiceDiscovery) AddLeaderSwitchedCallback(leaderSwitchedCallbackFunc) {}
func (*tsoServiceDiscovery) AddLeaderSwitchedCallback(LeaderSwitchedCallbackFunc) {}

Check warning on line 374 in client/servicediscovery/tso_service_discovery.go

View check run for this annotation

Codecov / codecov/patch

client/servicediscovery/tso_service_discovery.go#L374

Added line #L374 was not covered by tests

// AddMembersChangedCallback adds callbacks which will be called when any primary/secondary
// in a primary/secondary configured cluster is changed.
Expand Down