From 28dbd9ef12c7088f3d9041270a2b9a7b8154b897 Mon Sep 17 00:00:00 2001 From: "Eduardo J. Ortega U" <5791035+ejortegau@users.noreply.github.com> Date: Wed, 17 May 2023 15:48:45 +0200 Subject: [PATCH] TxThrottler support for transactions outside BEGIN/COMMIT (#13040) * TxThrottler support for transactions outside BEGIN/COMMIT This change allows the TxThrottler to throttle requests sent outside of explicit transactions (i.e. explicit BEGIN/COMMIT blocks) when configured to do so via a new config flag. Otherwise, it preserves the current/default behavior of only throttling transactions inside BEGIN/COMMIT. In addition, when this flag is passed, and because the call to throttle is done in a context in which the execution plan is already known, this change uses the plan type to make sure that throttling is triggered only when the query being executed is INSERT/UPDATE/DELETE/LOAD, so that SELECTs and others no longer get throttled unnecessarily, as they do not contribute to increasing replication lag, which is what the TxThrottler aims at controlling. Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> * Fix e2e flag tests & TxThrottler unit test Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> * Throttle auto-commit statements in QueryExecutor instead of TxPool This changes where we call the transaction throttler: 1. Statements in `BEGIN/COMMIT` blocks keep being throttled in `TabletServer.begin()`. 2. Additionally, throttling is added in QueryExecutor.execAutocommit() and `QueryExecutor.execAsTransaction()`. We also change things so that throttling in this new case is not opt-in via configuration flag but instead is the new and only behavior. Finally, we remove some previously added changes to example scripts that had been added with the intention of testing and are not part of the PR. Signed-off-by: Eduardo J. 
Ortega U <5791035+ejortegau@users.noreply.github.com> * Adds test cases for QueryExecutor.Execute() with TxThrottler throttling To make unit testing simple here, we separated the interface and implementation of the TxThrottler, and simply used a mock implementation of the interface in the tests. Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> * Add note on new TxThrottler behavior in v17 changelog Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> * Fix PR number in changelog entry for TxThrottler behavior change. Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> * Make linter happy Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> * Address PR comments Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> --------- Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> --- go/vt/vttablet/tabletserver/query_executor.go | 9 ++ .../tabletserver/query_executor_test.go | 98 ++++++++++++++----- go/vt/vttablet/tabletserver/tabletserver.go | 44 +++++---- .../tabletserver/txthrottler/tx_throttler.go | 41 +++++--- .../txthrottler/tx_throttler_test.go | 8 +- 5 files changed, 140 insertions(+), 60 deletions(-) diff --git a/go/vt/vttablet/tabletserver/query_executor.go b/go/vt/vttablet/tabletserver/query_executor.go index 0a9afe77a5e..2fc4b289828 100644 --- a/go/vt/vttablet/tabletserver/query_executor.go +++ b/go/vt/vttablet/tabletserver/query_executor.go @@ -76,6 +76,8 @@ var streamResultPool = sync.Pool{New: func() any { } }} +var errTxThrottled = vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled") + func returnStreamResult(result *sqltypes.Result) error { // only return large results slices to the pool if cap(result.Rows) >= streamRowsSize { @@ -203,6 +205,10 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt } qre.options.TransactionIsolation = 
querypb.ExecuteOptions_AUTOCOMMIT + if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) { + return nil, errTxThrottled + } + conn, _, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting) if err != nil { @@ -214,6 +220,9 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt } func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) { + if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) { + return nil, errTxThrottled + } conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting) if err != nil { return nil, err diff --git a/go/vt/vttablet/tabletserver/query_executor_test.go b/go/vt/vttablet/tabletserver/query_executor_test.go index 526419c4f49..4131a97b039 100644 --- a/go/vt/vttablet/tabletserver/query_executor_test.go +++ b/go/vt/vttablet/tabletserver/query_executor_test.go @@ -17,6 +17,7 @@ limitations under the License. package tabletserver import ( + "context" "fmt" "io" "math/rand" @@ -24,10 +25,6 @@ import ( "strings" "testing" - "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" - - "context" - "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -44,6 +41,8 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder" "vitess.io/vitess/go/vt/vttablet/tabletserver/rules" "vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv" + "vitess.io/vitess/go/vt/vttablet/tabletserver/tx" + "vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler" querypb "vitess.io/vitess/go/vt/proto/query" tableaclpb "vitess.io/vitess/go/vt/proto/tableacl" @@ -81,6 +80,10 @@ func TestQueryExecutorPlans(t *testing.T) { // inTxWant is the query log we expect if we're in a transation. // If empty, then we should expect the same as logWant. 
inTxWant string + // errorWant is the error we expect to get, if any, and should be nil if no error should be returned + errorWant error + // TxThrottler allows the test case to override the transaction throttler + txThrottler txthrottler.TxThrottler }{{ input: "select * from t", dbResponses: []dbResponse{{ @@ -267,7 +270,25 @@ func TestQueryExecutorPlans(t *testing.T) { resultWant: emptyResult, planWant: "Show", logWant: "show create table mysql.`user`", - }} + }, { + input: "update test_table set a=1", + dbResponses: []dbResponse{{ + query: "update test_table set a = 1 limit 10001", + result: dmlResult, + }}, + errorWant: errTxThrottled, + txThrottler: &mockTxThrottler{true}, + }, { + input: "update test_table set a=1", + passThrough: true, + dbResponses: []dbResponse{{ + query: "update test_table set a = 1 limit 10001", + result: dmlResult, + }}, + errorWant: errTxThrottled, + txThrottler: &mockTxThrottler{true}, + }, + } for _, tcase := range testcases { t.Run(tcase.input, func(t *testing.T) { db := setUpQueryExecutorTest(t) @@ -277,6 +298,9 @@ func TestQueryExecutorPlans(t *testing.T) { } ctx := context.Background() tsv := newTestTabletServer(ctx, noFlags, db) + if tcase.txThrottler != nil { + tsv.txThrottler = tcase.txThrottler + } tsv.config.DB.DBName = "ks" defer tsv.StopService() @@ -285,32 +309,39 @@ func TestQueryExecutorPlans(t *testing.T) { // Test outside a transaction. 
qre := newTestQueryExecutor(ctx, tsv, tcase.input, 0) got, err := qre.Execute() - require.NoError(t, err, tcase.input) - assert.Equal(t, tcase.resultWant, got, tcase.input) - assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input) - assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input) - + if tcase.errorWant == nil { + require.NoError(t, err, tcase.input) + assert.Equal(t, tcase.resultWant, got, tcase.input) + assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input) + assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input) + } else { + assert.True(t, vterrors.Equals(err, tcase.errorWant)) + } // Wait for the existing query to be processed by the cache tsv.QueryPlanCacheWait() // Test inside a transaction. target := tsv.sm.Target() state, err := tsv.Begin(ctx, target, nil) - require.NoError(t, err) - require.NotNil(t, state.TabletAlias, "alias should not be nil") - assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin") - defer tsv.Commit(ctx, target, state.TransactionID) - - qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID) - got, err = qre.Execute() - require.NoError(t, err, tcase.input) - assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input) - assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input) - want := tcase.logWant - if tcase.inTxWant != "" { - want = tcase.inTxWant + if tcase.errorWant == nil { + require.NoError(t, err) + require.NotNil(t, state.TabletAlias, "alias should not be nil") + assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin") + defer tsv.Commit(ctx, target, state.TransactionID) + + qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID) + got, err = qre.Execute() + require.NoError(t, err, tcase.input) + assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input) + assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input) + want := 
tcase.logWant + if tcase.inTxWant != "" { + want = tcase.inTxWant + } + assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input) + } else { + assert.True(t, vterrors.Equals(err, tcase.errorWant)) } - assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input) }) } } @@ -1540,3 +1571,22 @@ func addQueryExecutorSupportedQueries(db *fakesqldb.DB) { }}, }) } + +type mockTxThrottler struct { + throttle bool +} + +func (m mockTxThrottler) InitDBConfig(target *querypb.Target) { + panic("implement me") +} + +func (m mockTxThrottler) Open() (err error) { + return nil +} + +func (m mockTxThrottler) Close() { +} + +func (m mockTxThrottler) Throttle(priority int) (result bool) { + return m.throttle +} diff --git a/go/vt/vttablet/tabletserver/tabletserver.go b/go/vt/vttablet/tabletserver/tabletserver.go index 5a9de5be0dc..efb5fd5add4 100644 --- a/go/vt/vttablet/tabletserver/tabletserver.go +++ b/go/vt/vttablet/tabletserver/tabletserver.go @@ -112,7 +112,7 @@ type TabletServer struct { tracker *schema.Tracker watcher *BinlogWatcher qe *QueryEngine - txThrottler *txthrottler.TxThrottler + txThrottler txthrottler.TxThrottler te *TxEngine messager *messager.Engine hs *healthStreamer @@ -488,22 +488,8 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save target, options, false, /* allowOnShutdown */ func(ctx context.Context, logStats *tabletenv.LogStats) error { startTime := time.Now() - priority := tsv.config.TxThrottlerDefaultPriority - if options != nil && options.Priority != "" { - optionsPriority, err := strconv.Atoi(options.Priority) - // This should never error out, as the value for Priority has been validated in the vtgate already. - // Still, handle it just to make sure. - if err != nil { - log.Errorf( - "The value of the %s query directive could not be converted to integer, using the "+ - "default value. 
Error was: %s", - sqlparser.DirectivePriority, priority, err) - } else { - priority = optionsPriority - } - } - if tsv.txThrottler.Throttle(priority) { - return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled") + if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options)) { + return errTxThrottled } var connSetting *pools.Setting if len(settings) > 0 { @@ -534,6 +520,30 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save return state, err } +func (tsv *TabletServer) getPriorityFromOptions(options *querypb.ExecuteOptions) int { + priority := tsv.config.TxThrottlerDefaultPriority + if options == nil { + return priority + } + if options.Priority == "" { + return priority + } + + optionsPriority, err := strconv.Atoi(options.Priority) + // This should never error out, as the value for Priority has been validated in the vtgate already. + // Still, handle it just to make sure. + if err != nil { + log.Errorf( + "The value of the %s query directive could not be converted to integer, using the "+ + "default value. Error was: %s", + sqlparser.DirectivePriority, err) + + return priority + } + + return optionsPriority +} + // Commit commits the specified transaction. func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, transactionID int64) (newReservedID int64, err error) { err = tsv.execRequest( diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 64e7070a228..bc5235593ac 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -65,6 +65,14 @@ func resetTxThrottlerFactories() { } } +// TxThrottler defines the interface for the transaction throttler. 
+type TxThrottler interface { + InitDBConfig(target *querypb.Target) + Open() (err error) + Close() + Throttle(priority int) (result bool) +} + func init() { resetTxThrottlerFactories() } @@ -95,7 +103,7 @@ type TopologyWatcherInterface interface { // go/vt/throttler.GlobalManager. const TxThrottlerName = "TransactionThrottler" -// TxThrottler throttles transactions based on replication lag. +// txThrottler implements TxThrottler for throttling transactions based on replication lag. // It's a thin wrapper around the throttler found in vitess/go/vt/throttler. // It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler. // @@ -119,10 +127,10 @@ const TxThrottlerName = "TransactionThrottler" // // To release the resources used by the throttler the caller should call Close(). // t.Close() // -// A TxThrottler object is generally not thread-safe: at any given time at most one goroutine should +// A txThrottler object is generally not thread-safe: at any given time at most one goroutine should // be executing a method. The only exception is the 'Throttle' method where multiple goroutines are // allowed to execute it concurrently. -type TxThrottler struct { +type txThrottler struct { // config stores the transaction throttler's configuration. // It is populated in NewTxThrottler and is not modified // since. @@ -172,12 +180,12 @@ type txThrottlerState struct { topologyWatchers []TopologyWatcherInterface } -// NewTxThrottler tries to construct a TxThrottler from the +// NewTxThrottler tries to construct a txThrottler from the // relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if // any error occurs. // This function calls tryCreateTxThrottler that does the actual creation work // and returns an error if one occurred. 
-func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler { +func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler { txThrottler, err := tryCreateTxThrottler(env, topoServer) if err != nil { log.Errorf("Error creating transaction throttler. Transaction throttling will"+ @@ -191,11 +199,11 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler { } // InitDBConfig initializes the target parameters for the throttler. -func (t *TxThrottler) InitDBConfig(target *querypb.Target) { +func (t *txThrottler) InitDBConfig(target *querypb.Target) { t.target = proto.Clone(target).(*querypb.Target) } -func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*TxThrottler, error) { +func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*txThrottler, error) { if !env.Config().EnableTxThrottler { return newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false}) } @@ -218,7 +226,7 @@ func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*TxThrott }) } -func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrottlerConfig) (*TxThrottler, error) { +func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrottlerConfig) (*txThrottler, error) { if config.enabled { // Verify config. err := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}.Verify() @@ -229,7 +237,7 @@ func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrott return nil, fmt.Errorf("empty healthCheckCells given. %+v", config) } } - return &TxThrottler{ + return &txThrottler{ config: config, topoServer: topoServer, throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"), @@ -239,23 +247,23 @@ func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrott } // Open opens the transaction throttler. It must be called prior to 'Throttle'. 
-func (t *TxThrottler) Open() (err error) { +func (t *txThrottler) Open() (err error) { if !t.config.enabled { return nil } if t.state != nil { return nil } - log.Info("TxThrottler: opening") + log.Info("txThrottler: opening") t.throttlerRunning.Set(1) t.state, err = newTxThrottlerState(t.topoServer, t.config, t.target) return err } -// Close closes the TxThrottler object and releases resources. +// Close closes the txThrottler object and releases resources. // It should be called after the throttler is no longer needed. // It's ok to call this method on a closed throttler--in which case the method does nothing. -func (t *TxThrottler) Close() { +func (t *txThrottler) Close() { if !t.config.enabled { return } @@ -265,14 +273,14 @@ func (t *TxThrottler) Close() { t.state.deallocateResources() t.state = nil t.throttlerRunning.Set(0) - log.Info("TxThrottler: closed") + log.Info("txThrottler: closed") } // Throttle should be called before a new transaction is started. // It returns true if the transaction should not proceed (the caller // should back off). Throttle requires that Open() was previously called // successfully. -func (t *TxThrottler) Throttle(priority int) (result bool) { +func (t *txThrottler) Throttle(priority int) (result bool) { if !t.config.enabled { return false } @@ -280,9 +288,10 @@ func (t *TxThrottler) Throttle(priority int) (result bool) { return false } - // Throttle according to both what the throttle state says, and the priority. Workloads with lower priority value + // Throttle according to both what the throttler state says and the priority. Workloads with lower priority value // are less likely to be throttled. 
result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority + t.requestsTotal.Add(1) if result { t.requestsThrottled.Add(1) diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 523b45c6174..97138e3928c 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -51,7 +51,8 @@ func TestDisabledThrottler(t *testing.T) { }) assert.Nil(t, throttler.Open()) assert.False(t, throttler.Throttle(0)) - assert.Zero(t, throttler.throttlerRunning.Get()) + throttlerImpl, _ := throttler.(*txThrottler) + assert.Zero(t, throttlerImpl.throttlerRunning.Get()) throttler.Close() } @@ -101,12 +102,13 @@ func TestEnabledThrottler(t *testing.T) { call4 := mockThrottler.EXPECT().Throttle(0) call4.Return(1 * time.Second) - call6 := mockThrottler.EXPECT().Close() + calllast := mockThrottler.EXPECT().Close() + call1.After(call0) call2.After(call1) call3.After(call2) call4.After(call3) - call6.After(call4) + calllast.After(call4) config := tabletenv.NewDefaultConfig() config.EnableTxThrottler = true