Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fallback to poller replication lag if heartbeat lag fails #207

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
25 changes: 22 additions & 3 deletions go/vt/vttablet/tabletserver/repltracker/repltracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ limitations under the License.
package repltracker

import (
"errors"
"fmt"
"sync"
"time"

Expand Down Expand Up @@ -47,6 +49,8 @@ var (
heartbeatLagNsHistogram = stats.NewGenericHistogram("HeartbeatLagNsHistogram",
"Histogram of lag values in nanoseconds", []int64{0, 1e6, 1e7, 1e8, 1e9, 1e10, 1e11, 1e12},
[]string{"0", "1ms", "10ms", "100ms", "1s", "10s", "100s", "1000s", ">1000s"}, "Count", "Total")

errFallback = errors.New("failed to obtain replication lag from poller after attempting to use it as fall-back for heartbeat")
)

// ReplTracker tracks replication lag.
Expand Down Expand Up @@ -133,14 +137,29 @@ func (rt *ReplTracker) Status() (time.Duration, error) {
rt.mu.Lock()
defer rt.mu.Unlock()

fallbackToPoller := false
var heartbeatLag, mysqlLag time.Duration
var heartbeatErr, mysqlErr error

switch {
case rt.isPrimary || rt.mode == tabletenv.Disable:
return 0, nil
case rt.mode == tabletenv.Heartbeat:
return rt.hr.Status()
// This should allow us to migrate safely to using vttablet heartbeat. If using heartbeat fails (e.g. because
// the shard's primary does not yet have them and therefore, either the heartbeat table is missing or it's
// empty), fall back to the poller. Otherwise, use what the heartbeat says.
if heartbeatLag, heartbeatErr = rt.hr.Status(); heartbeatErr == nil {
return heartbeatLag, heartbeatErr
}
fallbackToPoller = true
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need this boolean? If either of the case statements are met, we would return out anyways

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

then the condition below on line 158 can just be

if mysqlErr != nil {...}

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think so. The intention is to raise different errors in case rt.poller.Status() failed depending on whether fallback was used or not.

}
// rt.mode == tabletenv.Poller
return rt.poller.Status()
// rt.mode == tabletenv.Poller or fallback after heartbeat error
mysqlLag, mysqlErr = rt.poller.Status()
if fallbackToPoller && mysqlErr != nil {
return 0, fmt.Errorf("%w: %s", errFallback, mysqlErr)
}

return mysqlLag, mysqlErr
}

// EnableHeartbeat enables or disables writes of heartbeat. This functionality
Expand Down
73 changes: 73 additions & 0 deletions go/vt/vttablet/tabletserver/repltracker/repltracker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,3 +95,76 @@ func TestReplTracker(t *testing.T) {
_, err = rt.Status()
assert.Equal(t, "err", err.Error())
}

func TestStatusHeartbeatFallBack(t *testing.T) {
t.Parallel()

heartbeatErr := errors.New("some error reading heartbeat")
mysqlErr := errors.New("some mysql error")
testCases := []struct {
name string
heartbeatLag tabletenv.Seconds
heartbeatError error
mysqldLag uint
mysqldErr error
expectedError error
expectedLag time.Duration
}{
{
name: "Heartbeat successful",
heartbeatLag: tabletenv.Seconds(5.0),
heartbeatError: nil,
expectedLag: 5 * time.Second,
},
{
name: "Heartbeat failed, mysqld lag successful",
heartbeatError: heartbeatErr,
mysqldLag: 8,
expectedLag: 8 * time.Second,
},
{
name: "Heartbeat & mysqld lag failed",
heartbeatError: heartbeatErr,
mysqldErr: mysqlErr,
expectedError: errFallback,
},
}

for _, testCase := range testCases {
theCase := testCase

t.Run(theCase.name, func(t *testing.T) {
t.Parallel()
config := tabletenv.NewDefaultConfig()
config.ReplicationTracker.Mode = tabletenv.Heartbeat
config.ReplicationTracker.HeartbeatIntervalSeconds = theCase.heartbeatLag
env := tabletenv.NewEnv(config, "ReplTrackerTest")
alias := &topodatapb.TabletAlias{
Cell: "cell",
Uid: 1,
}
mysqld := fakemysqldaemon.NewFakeMysqlDaemon(nil)
mysqld.ReplicationLagSeconds = theCase.mysqldLag
mysqld.Replicating = true
mysqld.ReplicationStatusError = theCase.mysqldErr
target := &querypb.Target{}

rt := NewReplTracker(env, alias)

rt.hr.lastKnownLag = time.Duration(theCase.heartbeatLag) * time.Second
rt.hr.lastKnownError = theCase.heartbeatError
rt.InitDBConfig(target, mysqld)

lag, err := rt.Status()

if theCase.expectedError == nil {
assert.NoError(t, err)
assert.Equal(t, theCase.expectedLag, lag)
} else {
assert.ErrorIs(t, err, theCase.expectedError)
}

})
}

}
Loading