Skip to content

Commit

Permalink
Tablet throttler: multi-metric support (#15988)
Browse files Browse the repository at this point in the history
Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach authored Jul 11, 2024
1 parent b8497b0 commit cd0c2b5
Show file tree
Hide file tree
Showing 65 changed files with 14,944 additions and 17,739 deletions.
3 changes: 3 additions & 0 deletions .github/workflows/upgrade_downgrade_test_onlineddl_flow.yml
Original file line number Diff line number Diff line change
Expand Up @@ -140,6 +140,7 @@ jobs:
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
timeout-minutes: 10
run: |
echo "building last release: $(git rev-parse HEAD)"
source build.env
make build
mkdir -p /tmp/vitess-build-last/
Expand All @@ -162,6 +163,7 @@ jobs:
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
timeout-minutes: 10
run: |
echo "building next release: $(git rev-parse HEAD)"
source build.env
NOVTADMINBUILD=1 make build
mkdir -p /tmp/vitess-build-next/
Expand All @@ -182,6 +184,7 @@ jobs:
if: steps.skip-workflow.outputs.skip-workflow == 'false' && steps.changes.outputs.end_to_end == 'true'
timeout-minutes: 10
run: |
echo "building this SHA: $(git rev-parse HEAD)"
source build.env
make build
mkdir -p /tmp/vitess-build-current/
Expand Down
22 changes: 21 additions & 1 deletion changelog/21.0/21.0.0/summary.md
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
- [VTTablet Flags](#vttablet-flags)
- **[Traffic Mirroring](#traffic-mirroring)**
- **[New VTGate Shutdown Behavior](#new-vtgate-shutdown-behavior)**
- **[Tablet Throttler: Multi-Metric support](#tablet-throttler)**

## <a id="major-changes"/>Major Changes

Expand Down Expand Up @@ -60,4 +61,23 @@ without getting a `Server shutdown in progress` error.

This new behavior can be enabled by specifying the new `--mysql-server-drain-onterm` flag to VTGate.

See more information about this change by [reading its RFC](https://github.com/vitessio/vitess/issues/15971).
See more information about this change by [reading its RFC](https://github.com/vitessio/vitess/issues/15971).

### <a id="tablet-throttler"/>Tablet Throttler: Multi-Metric support

Up till `v20`, the tablet throttler would only monitor and use a single metric. That would be replication lag, by default, or could be the result of a custom query. `v21` introduces a major redesign where the throttler monitors and uses multiple metrics at the same time, including the above two.

Backwards compatible with `v20`, the default behavior in `v21` is to monitor all metrics, but only use `lag` (if the cutsom query is undefined) or the `cutsom` metric (if the custom query is defined). A `v20` `PRIMARY` is compatible with a `v21` `REPLICA`, and a `v21` `PRIMARY` is compatible with a `v20` `REPLICA`.

However, with `v21` it is possible to assign any combination of metrics (one or more) for a given app. The throttler would then accept or reject the app's requests based on the health of _all_ assigned metrics. `v21` comes with a preset list metrics, expected to be expanded:

- `lag`: replication lag based on heartbeat injection.
- `threads_running`: concurrent active threads on the MySQL server.
- `loadavg`: per core load average measured on the tablet instance/pod.
- `custom`: the result of a custom query executed on the MySQL server.

Each metric has a factory threshold which can be overridden by the `UpdateThrottlerConfig` command.

The throttler also supports the catch-all `"all"` app name, and it is thus possible to assign metrics to _all_ apps. Explicit app to metric assignments will override the catch-all configuration.

Metrics are assigned a default _scope_, which could be `self` (isolated to the tablet) or `shard` (max, aka _worst_ value among shard tablets). It is further possible to require a different scope for each metric.
3 changes: 2 additions & 1 deletion go/cmd/vtctldclient/command/onlineddl.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@ import (

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
"vitess.io/vitess/go/vt/proto/vttime"
)

const (
Expand Down Expand Up @@ -307,7 +308,7 @@ func throttleCommandHelper(cmd *cobra.Command, throttleType bool) error {
rule.ExpiresAt = protoutil.TimeToProto(time.Now().Add(throttle.DefaultAppThrottleDuration))
} else {
rule.Ratio = 0
rule.ExpiresAt = protoutil.TimeToProto(time.Now())
rule.ExpiresAt = &vttime.Time{} // zero
}

if strings.ToLower(uuid) == AllMigrationsIndicator {
Expand Down
113 changes: 108 additions & 5 deletions go/cmd/vtctldclient/command/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,11 @@ import (

topodatapb "vitess.io/vitess/go/vt/proto/topodata"
vtctldatapb "vitess.io/vitess/go/vt/proto/vtctldata"
"vitess.io/vitess/go/vt/proto/vttime"
"vitess.io/vitess/go/vt/topo/topoproto"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"
)

var (
Expand All @@ -37,33 +41,61 @@ var (
Short: "Update the tablet throttler configuration for all tablets in the given keyspace (across all cells)",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(1),
PreRunE: validateUpdateThrottlerConfig,
RunE: commandUpdateThrottlerConfig,
}
CheckThrottler = &cobra.Command{
Use: "CheckThrottler [--app-name <name>] <tablet alias>",
Short: "Issue a throttler check on the given tablet.",
Example: "CheckThrottler --app-name online-ddl zone1-0000000101",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(1),
RunE: commandCheckThrottler,
}

GetThrottlerStatus = &cobra.Command{
Use: "GetThrottlerStatus <tablet alias>",
Short: "Get the throttler status for the given tablet.",
Example: "GetThrottlerStatus zone1-0000000101",
DisableFlagsInUseLine: true,
Args: cobra.ExactArgs(1),
RunE: commandGetThrottlerStatus,
}
)

var (
updateThrottlerConfigOptions vtctldatapb.UpdateThrottlerConfigRequest
throttledAppRule topodatapb.ThrottledAppRule
unthrottledAppRule topodatapb.ThrottledAppRule
throttledAppDuration time.Duration

checkThrottlerOptions vtctldatapb.CheckThrottlerRequest
requestHeartbeats bool
)

func validateUpdateThrottlerConfig(cmd *cobra.Command, args []string) error {
if updateThrottlerConfigOptions.MetricName != "" && !cmd.Flags().Changed("threshold") {
return fmt.Errorf("--metric-name flag requires --threshold flag. Set threshold to 0 to disable the metric threshold configuration")
}
if cmd.Flags().Changed("app-name") && updateThrottlerConfigOptions.AppName == "" {
return fmt.Errorf("--app-name must not be empty")
}

return nil
}

func commandUpdateThrottlerConfig(cmd *cobra.Command, args []string) error {
keyspace := cmd.Flags().Arg(0)
cli.FinishedParsing(cmd)

if throttledAppRule.Name != "" && unthrottledAppRule.Name != "" {
return fmt.Errorf("throttle-app and unthrottle-app are mutually exclusive")
}

updateThrottlerConfigOptions.CustomQuerySet = cmd.Flags().Changed("custom-query")
updateThrottlerConfigOptions.Keyspace = keyspace

if throttledAppRule.Name != "" {
throttledAppRule.ExpiresAt = protoutil.TimeToProto(time.Now().Add(throttledAppDuration))
updateThrottlerConfigOptions.ThrottledApp = &throttledAppRule
} else if unthrottledAppRule.Name != "" {
unthrottledAppRule.ExpiresAt = protoutil.TimeToProto(time.Now())
unthrottledAppRule.ExpiresAt = &vttime.Time{} // zero
updateThrottlerConfigOptions.ThrottledApp = &unthrottledAppRule
}

Expand All @@ -74,9 +106,67 @@ func commandUpdateThrottlerConfig(cmd *cobra.Command, args []string) error {
return nil
}

func commandCheckThrottler(cmd *cobra.Command, args []string) error {
alias, err := topoproto.ParseTabletAlias(cmd.Flags().Arg(0))
if err != nil {
return err
}

cli.FinishedParsing(cmd)
if _, err := base.ScopeFromString(checkThrottlerOptions.Scope); err != nil {
return err
}
resp, err := client.CheckThrottler(commandCtx, &vtctldatapb.CheckThrottlerRequest{
TabletAlias: alias,
AppName: checkThrottlerOptions.AppName,
Scope: checkThrottlerOptions.Scope,
SkipRequestHeartbeats: !requestHeartbeats,
OkIfNotExists: checkThrottlerOptions.OkIfNotExists,
})
if err != nil {
return err
}

data, err := cli.MarshalJSON(resp)
if err != nil {
return err
}

fmt.Printf("%s\n", data)

return nil
}

func commandGetThrottlerStatus(cmd *cobra.Command, args []string) error {
alias, err := topoproto.ParseTabletAlias(cmd.Flags().Arg(0))
if err != nil {
return err
}

cli.FinishedParsing(cmd)

resp, err := client.GetThrottlerStatus(commandCtx, &vtctldatapb.GetThrottlerStatusRequest{
TabletAlias: alias,
})
if err != nil {
return err
}

data, err := cli.MarshalJSON(resp)
if err != nil {
return err
}

fmt.Printf("%s\n", data)

return nil
}

func init() {
// UpdateThrottlerConfig
UpdateThrottlerConfig.Flags().BoolVar(&updateThrottlerConfigOptions.Enable, "enable", false, "Enable the throttler")
UpdateThrottlerConfig.Flags().BoolVar(&updateThrottlerConfigOptions.Disable, "disable", false, "Disable the throttler")
UpdateThrottlerConfig.Flags().StringVar(&updateThrottlerConfigOptions.MetricName, "metric-name", "", "name of the metric for which we apply a new threshold (requires --threshold). If empty, the default (either 'lag' or 'custom') metric is used.")
UpdateThrottlerConfig.Flags().Float64Var(&updateThrottlerConfigOptions.Threshold, "threshold", 0, "threshold for the either default check (replication lag seconds) or custom check")
UpdateThrottlerConfig.Flags().StringVar(&updateThrottlerConfigOptions.CustomQuery, "custom-query", "", "custom throttler check query")
UpdateThrottlerConfig.Flags().BoolVar(&updateThrottlerConfigOptions.CheckAsCheckSelf, "check-as-check-self", false, "/throttler/check requests behave as is /throttler/check-self was called")
Expand All @@ -87,6 +177,19 @@ func init() {
UpdateThrottlerConfig.Flags().Float64Var(&throttledAppRule.Ratio, "throttle-app-ratio", throttle.DefaultThrottleRatio, "ratio to throttle app (app specififed in --throttled-app)")
UpdateThrottlerConfig.Flags().DurationVar(&throttledAppDuration, "throttle-app-duration", throttle.DefaultAppThrottleDuration, "duration after which throttled app rule expires (app specififed in --throttled-app)")
UpdateThrottlerConfig.Flags().BoolVar(&throttledAppRule.Exempt, "throttle-app-exempt", throttledAppRule.Exempt, "exempt this app from being at all throttled. WARNING: use with extreme care, as this is likely to push metrics beyond the throttler's threshold, and starve other apps")
UpdateThrottlerConfig.Flags().StringVar(&updateThrottlerConfigOptions.AppName, "app-name", "", "app name for which to assign metrics (requires --app-metrics)")
UpdateThrottlerConfig.Flags().StringSliceVar(&updateThrottlerConfigOptions.AppCheckedMetrics, "app-metrics", nil, "metrics to be used when checking the throttler for the app (requires --app-name). Empty to restore to default metrics")
UpdateThrottlerConfig.MarkFlagsMutuallyExclusive("unthrottle-app", "throttle-app")
UpdateThrottlerConfig.MarkFlagsRequiredTogether("app-name", "app-metrics")

Root.AddCommand(UpdateThrottlerConfig)
// Check Throttler
CheckThrottler.Flags().StringVar(&checkThrottlerOptions.AppName, "app-name", throttlerapp.VitessName.String(), "app name to check")
CheckThrottler.Flags().StringVar(&checkThrottlerOptions.Scope, "scope", base.UndefinedScope.String(), "check scope ('shard', 'self' or leave empty for per-metric defaults)")
CheckThrottler.Flags().BoolVar(&requestHeartbeats, "request-heartbeats", false, "request heartbeat lease")
CheckThrottler.Flags().BoolVar(&checkThrottlerOptions.OkIfNotExists, "ok-if-not-exists", false, "return OK even if metric does not exist")
Root.AddCommand(CheckThrottler)

// GetThrottlerStatus
Root.AddCommand(GetThrottlerStatus)
}
2 changes: 2 additions & 0 deletions go/flags/endtoend/vtctldclient.txt
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ Available Commands:
Backup Uses the BackupStorage service on the given tablet to create and store a new backup.
BackupShard Finds the most up-to-date REPLICA, RDONLY, or SPARE tablet in the given shard and uses the BackupStorage service on that tablet to create and store a new backup.
ChangeTabletType Changes the db type for the specified tablet, if possible.
CheckThrottler Issue a throttler check on the given tablet.
CreateKeyspace Creates the specified keyspace in the topology.
CreateShard Creates the specified shard in the topology.
DeleteCellInfo Deletes the CellInfo for the provided cell.
Expand Down Expand Up @@ -56,6 +57,7 @@ Available Commands:
GetTablet Outputs a JSON structure that contains information about the tablet.
GetTabletVersion Print the version of a tablet from its debug vars.
GetTablets Looks up tablets according to filter criteria.
GetThrottlerStatus Get the throttler status for the given tablet.
GetTopologyPath Gets the value associated with the particular path (key) in the topology server.
GetVSchema Prints a JSON representation of a keyspace's topo record.
GetWorkflows Gets all vreplication workflows (Reshard, MoveTables, etc) in the given keyspace.
Expand Down
4 changes: 2 additions & 2 deletions go/test/endtoend/cluster/vtctldclient_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,13 +66,13 @@ func (vtctldclient *VtctldClientProcess) ExecuteCommandWithOutput(args ...string
pArgs = append(pArgs, "--test.coverprofile="+getCoveragePath("vtctldclient-"+args[0]+".out"), "--test.v")
}
pArgs = append(pArgs, args...)
for i := 1; i <= retries; i++ {
for i := range retries {
tmpProcess := exec.Command(
vtctldclient.Binary,
filterDoubleDashArgs(pArgs, vtctldclient.VtctldClientMajorVersion)...,
)
msg := binlogplayer.LimitString(strings.Join(tmpProcess.Args, " "), 256) // limit log line length
log.Infof("Executing vtctldclient with command: %v (attempt %d of %d)", msg, i, retries)
log.Infof("Executing vtctldclient with command: %v (attempt %d of %d)", msg, i+1, retries)
resultByte, err = tmpProcess.CombinedOutput()
resultStr = string(resultByte)
if err == nil || !shouldRetry(resultStr) {
Expand Down
30 changes: 12 additions & 18 deletions go/test/endtoend/onlineddl/flow/onlineddl_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,7 +222,7 @@ func TestSchemaChange(t *testing.T) {
shards = clusterInstance.Keyspaces[0].Shards
require.Equal(t, 1, len(shards))

throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance, time.Second)
throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance)

t.Run("flow", func(t *testing.T) {
t.Run("create schema", func(t *testing.T) {
Expand Down Expand Up @@ -283,24 +283,18 @@ func TestSchemaChange(t *testing.T) {
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusRunning)
})
t.Run("throttle online-ddl", func(t *testing.T) {
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false)
onlineddl.ThrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)

for _, tab := range tablets {
body, err := throttleApp(tab.VttabletProcess, throttlerapp.OnlineDDLName)
assert.NoError(t, err)
assert.Contains(t, body, throttlerapp.OnlineDDLName)
}
waitForThrottleCheckStatus(t, throttlerapp.OnlineDDLName, primaryTablet, http.StatusExpectationFailed)
})
t.Run("unthrottle online-ddl", func(t *testing.T) {
onlineddl.UnthrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false)

for _, tab := range tablets {
body, err := unthrottleApp(tab.VttabletProcess, throttlerapp.OnlineDDLName)
if !onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false) {
status, err := throttler.GetThrottlerStatus(&clusterInstance.VtctldClientProcess, primaryTablet)
assert.NoError(t, err)
assert.Contains(t, body, throttlerapp.OnlineDDLName)

t.Logf("Throttler status: %+v", status)
}
waitForThrottleCheckStatus(t, throttlerapp.OnlineDDLName, primaryTablet, http.StatusOK)
})
Expand Down Expand Up @@ -341,7 +335,7 @@ func TestSchemaChange(t *testing.T) {
t.Run("optimistic wait for migration completion", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, migrationWaitTimeout, schema.OnlineDDLStatusComplete)
isComplete = (status == schema.OnlineDDLStatusComplete)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
t.Logf("# Migration status (for debug purposes): <%s>", status)
})
if !isComplete {
t.Run("force complete cut-over", func(t *testing.T) {
Expand All @@ -350,7 +344,7 @@ func TestSchemaChange(t *testing.T) {
t.Run("another optimistic wait for migration completion", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, migrationWaitTimeout, schema.OnlineDDLStatusComplete)
isComplete = (status == schema.OnlineDDLStatusComplete)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
t.Logf("# Migration status (for debug purposes): <%s>", status)
})
}
if !isComplete {
Expand All @@ -364,7 +358,7 @@ func TestSchemaChange(t *testing.T) {
}
t.Run("wait for migration completion", func(t *testing.T) {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, migrationWaitTimeout, schema.OnlineDDLStatusComplete)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
t.Logf("# Migration status (for debug purposes): <%s>", status)
onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete)
})
t.Run("validate table schema", func(t *testing.T) {
Expand Down Expand Up @@ -394,15 +388,15 @@ func testOnlineDDLStatement(t *testing.T, alterStatement string, ddlStrategy str
uuid = row.AsString("uuid", "")
uuid = strings.TrimSpace(uuid)
require.NotEmpty(t, uuid)
fmt.Println("# Generated UUID (for debug purposes):")
fmt.Printf("<%s>\n", uuid)
t.Logf("# Generated UUID (for debug purposes):")
t.Logf("<%s>", uuid)

strategySetting, err := schema.ParseDDLStrategy(ddlStrategy)
assert.NoError(t, err)

if !strategySetting.Strategy.IsDirect() && !skipWait && uuid != "" {
status := onlineddl.WaitForMigrationStatus(t, &vtParams, shards, uuid, migrationWaitTimeout, schema.OnlineDDLStatusComplete, schema.OnlineDDLStatusFailed)
fmt.Printf("# Migration status (for debug purposes): <%s>\n", status)
t.Logf("# Migration status (for debug purposes): <%s>", status)
}

if expectHint != "" {
Expand Down
Loading

0 comments on commit cd0c2b5

Please sign in to comment.