Skip to content

Commit

Permalink
TxThrottler support for transactions outside BEGIN/COMMIT (vitessio#1…
Browse files Browse the repository at this point in the history
…3040)

* TxThrottler support for transactions outside BEGIN/COMMIT

This change allows the TxThrottler to throttle requests sent outside of
explicit transactions (i.e. explicit BEGIN/COMMIT blocks) when configured to do
so via a new config flag. Otherwise, it preserves the current/default behavior
of only throttling transactions inside BEGIN/COMMIT.

In addition, when this flag is passed, and because the call to throttle is done
in a context in which the execution plan is already known, this change uses the
plan type to make sure that throttling is triggered only when the query being
executed is INSERT/UPDATE/DELETE/LOAD, so that SELECTs and others no longer get
throttled unnecessarily, as they do not contribute to increasing replication
lag, which is what the TxThrottler aims at controlling.

Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>

* Fix e2e flag tests & TxThrottler unit test

Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>

* Throttle auto-commit statements in QueryExecutor instead of TxPool

This changes where we call the transaction throttler:

1. Statements in `BEGIN/COMMIT` blocks keep being throttled in
   `TabletServer.begin()`.
2. Additionally, throttling is added in QueryExecutor.execAutocommit() and
   `QueryExecutor.execAsTransaction()`.

We also change things so that throttling in this new case is not opt-in via
configuration flag but instead is the new and only behavior.

Finally, we remove some previously added changes to example scripts that had
been added with the intention of testing and are not part of the PR.

Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>

* Adds test cases for QueryExecutor.Execute() with TxThrottle throttling

To make unit testing simple here, we separated the interface and implementation
of the TxThrottle, and simply used a mock implementation of the interface in
the tests.

Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>

* Add note on new TxThrottler behavior in v17 changelog

Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>

* Fix PR number in changelog entry for TxThrottler behavior change.

Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>

* Make linter happy

Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>

* Address PR comments

Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>

---------

Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>
  • Loading branch information
ejortegau authored and timvaillancourt committed Apr 17, 2024
1 parent fb4d7ea commit 28dbd9e
Show file tree
Hide file tree
Showing 5 changed files with 140 additions and 60 deletions.
9 changes: 9 additions & 0 deletions go/vt/vttablet/tabletserver/query_executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,8 @@ var streamResultPool = sync.Pool{New: func() any {
}
}}

var errTxThrottled = vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")

func returnStreamResult(result *sqltypes.Result) error {
// only return large results slices to the pool
if cap(result.Rows) >= streamRowsSize {
Expand Down Expand Up @@ -203,6 +205,10 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}
qre.options.TransactionIsolation = querypb.ExecuteOptions_AUTOCOMMIT

if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
return nil, errTxThrottled
}

conn, _, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting)

if err != nil {
Expand All @@ -214,6 +220,9 @@ func (qre *QueryExecutor) execAutocommit(f func(conn *StatefulConnection) (*sqlt
}

func (qre *QueryExecutor) execAsTransaction(f func(conn *StatefulConnection) (*sqltypes.Result, error)) (*sqltypes.Result, error) {
if qre.tsv.txThrottler.Throttle(qre.tsv.getPriorityFromOptions(qre.options)) {
return nil, errTxThrottled
}
conn, beginSQL, _, err := qre.tsv.te.txPool.Begin(qre.ctx, qre.options, false, 0, nil, qre.setting)
if err != nil {
return nil, err
Expand Down
98 changes: 74 additions & 24 deletions go/vt/vttablet/tabletserver/query_executor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,17 +17,14 @@ limitations under the License.
package tabletserver

import (
"context"
"fmt"
"io"
"math/rand"
"reflect"
"strings"
"testing"

"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"

"context"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

Expand All @@ -44,6 +41,8 @@ import (
"vitess.io/vitess/go/vt/vttablet/tabletserver/planbuilder"
"vitess.io/vitess/go/vt/vttablet/tabletserver/rules"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tabletenv"
"vitess.io/vitess/go/vt/vttablet/tabletserver/tx"
"vitess.io/vitess/go/vt/vttablet/tabletserver/txthrottler"

querypb "vitess.io/vitess/go/vt/proto/query"
tableaclpb "vitess.io/vitess/go/vt/proto/tableacl"
Expand Down Expand Up @@ -81,6 +80,10 @@ func TestQueryExecutorPlans(t *testing.T) {
// inTxWant is the query log we expect if we're in a transation.
// If empty, then we should expect the same as logWant.
inTxWant string
// errorWant is the error we expect to get, if any, and should be nil if no error should be returned
errorWant error
// TxThrottler allows the test case to override the transaction throttler
txThrottler txthrottler.TxThrottler
}{{
input: "select * from t",
dbResponses: []dbResponse{{
Expand Down Expand Up @@ -267,7 +270,25 @@ func TestQueryExecutorPlans(t *testing.T) {
resultWant: emptyResult,
planWant: "Show",
logWant: "show create table mysql.`user`",
}}
}, {
input: "update test_table set a=1",
dbResponses: []dbResponse{{
query: "update test_table set a = 1 limit 10001",
result: dmlResult,
}},
errorWant: errTxThrottled,
txThrottler: &mockTxThrottler{true},
}, {
input: "update test_table set a=1",
passThrough: true,
dbResponses: []dbResponse{{
query: "update test_table set a = 1 limit 10001",
result: dmlResult,
}},
errorWant: errTxThrottled,
txThrottler: &mockTxThrottler{true},
},
}
for _, tcase := range testcases {
t.Run(tcase.input, func(t *testing.T) {
db := setUpQueryExecutorTest(t)
Expand All @@ -277,6 +298,9 @@ func TestQueryExecutorPlans(t *testing.T) {
}
ctx := context.Background()
tsv := newTestTabletServer(ctx, noFlags, db)
if tcase.txThrottler != nil {
tsv.txThrottler = tcase.txThrottler
}
tsv.config.DB.DBName = "ks"
defer tsv.StopService()

Expand All @@ -285,32 +309,39 @@ func TestQueryExecutorPlans(t *testing.T) {
// Test outside a transaction.
qre := newTestQueryExecutor(ctx, tsv, tcase.input, 0)
got, err := qre.Execute()
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input)
assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input)

if tcase.errorWant == nil {
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, tcase.input)
assert.Equal(t, tcase.logWant, qre.logStats.RewrittenSQL(), tcase.input)
} else {
assert.True(t, vterrors.Equals(err, tcase.errorWant))
}
// Wait for the existing query to be processed by the cache
tsv.QueryPlanCacheWait()

// Test inside a transaction.
target := tsv.sm.Target()
state, err := tsv.Begin(ctx, target, nil)
require.NoError(t, err)
require.NotNil(t, state.TabletAlias, "alias should not be nil")
assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin")
defer tsv.Commit(ctx, target, state.TransactionID)

qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID)
got, err = qre.Execute()
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input)
want := tcase.logWant
if tcase.inTxWant != "" {
want = tcase.inTxWant
if tcase.errorWant == nil {
require.NoError(t, err)
require.NotNil(t, state.TabletAlias, "alias should not be nil")
assert.Equal(t, tsv.alias, state.TabletAlias, "Wrong alias returned by Begin")
defer tsv.Commit(ctx, target, state.TransactionID)

qre = newTestQueryExecutor(ctx, tsv, tcase.input, state.TransactionID)
got, err = qre.Execute()
require.NoError(t, err, tcase.input)
assert.Equal(t, tcase.resultWant, got, "in tx: %v", tcase.input)
assert.Equal(t, tcase.planWant, qre.logStats.PlanType, "in tx: %v", tcase.input)
want := tcase.logWant
if tcase.inTxWant != "" {
want = tcase.inTxWant
}
assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input)
} else {
assert.True(t, vterrors.Equals(err, tcase.errorWant))
}
assert.Equal(t, want, qre.logStats.RewrittenSQL(), "in tx: %v", tcase.input)
})
}
}
Expand Down Expand Up @@ -1540,3 +1571,22 @@ func addQueryExecutorSupportedQueries(db *fakesqldb.DB) {
}},
})
}

type mockTxThrottler struct {
throttle bool
}

func (m mockTxThrottler) InitDBConfig(target *querypb.Target) {
panic("implement me")
}

func (m mockTxThrottler) Open() (err error) {
return nil
}

func (m mockTxThrottler) Close() {
}

func (m mockTxThrottler) Throttle(priority int) (result bool) {
return m.throttle
}
44 changes: 27 additions & 17 deletions go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,7 +112,7 @@ type TabletServer struct {
tracker *schema.Tracker
watcher *BinlogWatcher
qe *QueryEngine
txThrottler *txthrottler.TxThrottler
txThrottler txthrottler.TxThrottler
te *TxEngine
messager *messager.Engine
hs *healthStreamer
Expand Down Expand Up @@ -488,22 +488,8 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
target, options, false, /* allowOnShutdown */
func(ctx context.Context, logStats *tabletenv.LogStats) error {
startTime := time.Now()
priority := tsv.config.TxThrottlerDefaultPriority
if options != nil && options.Priority != "" {
optionsPriority, err := strconv.Atoi(options.Priority)
// This should never error out, as the value for Priority has been validated in the vtgate already.
// Still, handle it just to make sure.
if err != nil {
log.Errorf(
"The value of the %s query directive could not be converted to integer, using the "+
"default value. Error was: %s",
sqlparser.DirectivePriority, priority, err)
} else {
priority = optionsPriority
}
}
if tsv.txThrottler.Throttle(priority) {
return vterrors.Errorf(vtrpcpb.Code_RESOURCE_EXHAUSTED, "Transaction throttled")
if tsv.txThrottler.Throttle(tsv.getPriorityFromOptions(options)) {
return errTxThrottled
}
var connSetting *pools.Setting
if len(settings) > 0 {
Expand Down Expand Up @@ -534,6 +520,30 @@ func (tsv *TabletServer) begin(ctx context.Context, target *querypb.Target, save
return state, err
}

func (tsv *TabletServer) getPriorityFromOptions(options *querypb.ExecuteOptions) int {
priority := tsv.config.TxThrottlerDefaultPriority
if options == nil {
return priority
}
if options.Priority == "" {
return priority
}

optionsPriority, err := strconv.Atoi(options.Priority)
// This should never error out, as the value for Priority has been validated in the vtgate already.
// Still, handle it just to make sure.
if err != nil {
log.Errorf(
"The value of the %s query directive could not be converted to integer, using the "+
"default value. Error was: %s",
sqlparser.DirectivePriority, priority, err)

return priority
}

return optionsPriority
}

// Commit commits the specified transaction.
func (tsv *TabletServer) Commit(ctx context.Context, target *querypb.Target, transactionID int64) (newReservedID int64, err error) {
err = tsv.execRequest(
Expand Down
41 changes: 25 additions & 16 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,14 @@ func resetTxThrottlerFactories() {
}
}

// TxThrottler defines the interface for the transaction throttler.
type TxThrottler interface {
InitDBConfig(target *querypb.Target)
Open() (err error)
Close()
Throttle(priority int) (result bool)
}

func init() {
resetTxThrottlerFactories()
}
Expand Down Expand Up @@ -95,7 +103,7 @@ type TopologyWatcherInterface interface {
// go/vt/throttler.GlobalManager.
const TxThrottlerName = "TransactionThrottler"

// TxThrottler throttles transactions based on replication lag.
// txThrottler implements TxThrottle for throttling transactions based on replication lag.
// It's a thin wrapper around the throttler found in vitess/go/vt/throttler.
// It uses a discovery.HealthCheck to send replication-lag updates to the wrapped throttler.
//
Expand All @@ -119,10 +127,10 @@ const TxThrottlerName = "TransactionThrottler"
// // To release the resources used by the throttler the caller should call Close().
// t.Close()
//
// A TxThrottler object is generally not thread-safe: at any given time at most one goroutine should
// A txThrottler object is generally not thread-safe: at any given time at most one goroutine should
// be executing a method. The only exception is the 'Throttle' method where multiple goroutines are
// allowed to execute it concurrently.
type TxThrottler struct {
type txThrottler struct {
// config stores the transaction throttler's configuration.
// It is populated in NewTxThrottler and is not modified
// since.
Expand Down Expand Up @@ -172,12 +180,12 @@ type txThrottlerState struct {
topologyWatchers []TopologyWatcherInterface
}

// NewTxThrottler tries to construct a TxThrottler from the
// NewTxThrottler tries to construct a txThrottler from the
// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if
// any error occurs.
// This function calls tryCreateTxThrottler that does the actual creation work
// and returns an error if one occurred.
func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler {
func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) TxThrottler {
txThrottler, err := tryCreateTxThrottler(env, topoServer)
if err != nil {
log.Errorf("Error creating transaction throttler. Transaction throttling will"+
Expand All @@ -191,11 +199,11 @@ func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler {
}

// InitDBConfig initializes the target parameters for the throttler.
func (t *TxThrottler) InitDBConfig(target *querypb.Target) {
func (t *txThrottler) InitDBConfig(target *querypb.Target) {
t.target = proto.Clone(target).(*querypb.Target)
}

func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*TxThrottler, error) {
func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*txThrottler, error) {
if !env.Config().EnableTxThrottler {
return newTxThrottler(env, topoServer, &txThrottlerConfig{enabled: false})
}
Expand All @@ -218,7 +226,7 @@ func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*TxThrott
})
}

func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrottlerConfig) (*TxThrottler, error) {
func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrottlerConfig) (*txThrottler, error) {
if config.enabled {
// Verify config.
err := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}.Verify()
Expand All @@ -229,7 +237,7 @@ func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrott
return nil, fmt.Errorf("empty healthCheckCells given. %+v", config)
}
}
return &TxThrottler{
return &txThrottler{
config: config,
topoServer: topoServer,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
Expand All @@ -239,23 +247,23 @@ func newTxThrottler(env tabletenv.Env, topoServer *topo.Server, config *txThrott
}

// Open opens the transaction throttler. It must be called prior to 'Throttle'.
func (t *TxThrottler) Open() (err error) {
func (t *txThrottler) Open() (err error) {
if !t.config.enabled {
return nil
}
if t.state != nil {
return nil
}
log.Info("TxThrottler: opening")
log.Info("txThrottler: opening")
t.throttlerRunning.Set(1)
t.state, err = newTxThrottlerState(t.topoServer, t.config, t.target)
return err
}

// Close closes the TxThrottler object and releases resources.
// Close closes the txThrottler object and releases resources.
// It should be called after the throttler is no longer needed.
// It's ok to call this method on a closed throttler--in which case the method does nothing.
func (t *TxThrottler) Close() {
func (t *txThrottler) Close() {
if !t.config.enabled {
return
}
Expand All @@ -265,24 +273,25 @@ func (t *TxThrottler) Close() {
t.state.deallocateResources()
t.state = nil
t.throttlerRunning.Set(0)
log.Info("TxThrottler: closed")
log.Info("txThrottler: closed")
}

// Throttle should be called before a new transaction is started.
// It returns true if the transaction should not proceed (the caller
// should back off). Throttle requires that Open() was previously called
// successfully.
func (t *TxThrottler) Throttle(priority int) (result bool) {
func (t *txThrottler) Throttle(priority int) (result bool) {
if !t.config.enabled {
return false
}
if t.state == nil {
return false
}

// Throttle according to both what the throttle state says, and the priority. Workloads with lower priority value
// Throttle according to both what the throttler state says and the priority. Workloads with lower priority value
// are less likely to be throttled.
result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority

t.requestsTotal.Add(1)
if result {
t.requestsThrottled.Add(1)
Expand Down
Loading

0 comments on commit 28dbd9e

Please sign in to comment.