Skip to content

Commit

Permalink
Add basic metrics to vttablet transaction throttler (vitessio#12418)
Browse files Browse the repository at this point in the history
* Add basic stats to vttablet tx throttler

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* test new metrics

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* reorder

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* short names

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Add max rate

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Move NewGaugeFunc to under conditional

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Use env

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Remove env from TxThrottler struct

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Fix tests

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* PR suggestion

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* Fix unit test

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

* reorder test vars

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>

---------

Signed-off-by: Tim Vaillancourt <tim@timvaillancourt.com>
  • Loading branch information
timvaillancourt committed Apr 16, 2024
1 parent e5eb0d1 commit 22945c1
Show file tree
Hide file tree
Showing 3 changed files with 59 additions and 53 deletions.
2 changes: 1 addition & 1 deletion go/vt/vttablet/tabletserver/tabletserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func NewTabletServer(name string, config *tabletenv.TabletConfig, topoServer *to
tsv.tracker = schema.NewTracker(tsv, tsv.vstreamer, tsv.se)
tsv.watcher = NewBinlogWatcher(tsv, tsv.vstreamer, tsv.config)
tsv.qe = NewQueryEngine(tsv, tsv.se)
tsv.txThrottler = txthrottler.NewTxThrottler(tsv.config, topoServer)
tsv.txThrottler = txthrottler.NewTxThrottler(tsv, topoServer)
tsv.te = NewTxEngine(tsv)
tsv.messager = messager.NewEngine(tsv, tsv.se, tsv.vstreamer)

Expand Down
50 changes: 33 additions & 17 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (

"context"

"vitess.io/vitess/go/stats"
"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/log"
"vitess.io/vitess/go/vt/throttler"
Expand Down Expand Up @@ -77,19 +78,28 @@ type TxThrottler struct {
state *txThrottlerState

target *querypb.Target

// stats
throttlerRunning *stats.Gauge
requestsTotal *stats.Counter
requestsThrottled *stats.Counter
}

// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with
// go/vt/throttler.GlobalManager.
const TxThrottlerName = "TransactionThrottler"

// NewTxThrottler tries to construct a TxThrottler from the
// relevant fields in the tabletenv.Config object. It returns a disabled TxThrottler if
// any error occurs.
// This function calls tryCreateTxThrottler that does the actual creation work
// and returns an error if one occurred.
func NewTxThrottler(config *tabletenv.TabletConfig, topoServer *topo.Server) *TxThrottler {
txThrottler, err := tryCreateTxThrottler(config, topoServer)
func NewTxThrottler(env tabletenv.Env, topoServer *topo.Server) *TxThrottler {
txThrottler, err := tryCreateTxThrottler(env, topoServer)
if err != nil {
log.Errorf("Error creating transaction throttler. Transaction throttling will"+
" be disabled. Error: %v", err)
txThrottler, err = newTxThrottler(&txThrottlerConfig{enabled: false})
txThrottler, err = newTxThrottler(env, &txThrottlerConfig{enabled: false})
if err != nil {
panic("BUG: Can't create a disabled transaction throttler")
}
Expand All @@ -104,22 +114,22 @@ func (t *TxThrottler) InitDBConfig(target *querypb.Target) {
t.target = proto.Clone(target).(*querypb.Target)
}

func tryCreateTxThrottler(config *tabletenv.TabletConfig, topoServer *topo.Server) (*TxThrottler, error) {
if !config.EnableTxThrottler {
return newTxThrottler(&txThrottlerConfig{enabled: false})
func tryCreateTxThrottler(env tabletenv.Env, topoServer *topo.Server) (*TxThrottler, error) {
if !env.Config().EnableTxThrottler {
return newTxThrottler(env, &txThrottlerConfig{enabled: false})
}

var throttlerConfig throttlerdatapb.Configuration
if err := prototext.Unmarshal([]byte(config.TxThrottlerConfig), &throttlerConfig); err != nil {
if err := prototext.Unmarshal([]byte(env.Config().TxThrottlerConfig), &throttlerConfig); err != nil {
return nil, err
}

// Clone tsv.TxThrottlerHealthCheckCells so that we don't assume tsv.TxThrottlerHealthCheckCells
// is immutable.
healthCheckCells := make([]string, len(config.TxThrottlerHealthCheckCells))
copy(healthCheckCells, config.TxThrottlerHealthCheckCells)
healthCheckCells := make([]string, len(env.Config().TxThrottlerHealthCheckCells))
copy(healthCheckCells, env.Config().TxThrottlerHealthCheckCells)

return newTxThrottler(&txThrottlerConfig{
return newTxThrottler(env, &txThrottlerConfig{
enabled: true,
topoServer: topoServer,
throttlerConfig: &throttlerConfig,
Expand Down Expand Up @@ -205,11 +215,7 @@ func resetTxThrottlerFactories() {
}
}

// TxThrottlerName is the name the wrapped go/vt/throttler object will be registered with
// go/vt/throttler.GlobalManager.
const TxThrottlerName = "TransactionThrottler"

func newTxThrottler(config *txThrottlerConfig) (*TxThrottler, error) {
func newTxThrottler(env tabletenv.Env, config *txThrottlerConfig) (*TxThrottler, error) {
if config.enabled {
// Verify config.
err := throttler.MaxReplicationLagModuleConfig{Configuration: config.throttlerConfig}.Verify()
Expand All @@ -221,7 +227,10 @@ func newTxThrottler(config *txThrottlerConfig) (*TxThrottler, error) {
}
}
return &TxThrottler{
config: config,
config: config,
throttlerRunning: env.Exporter().NewGauge("TransactionThrottlerRunning", "transaction throttler running state"),
requestsTotal: env.Exporter().NewCounter("TransactionThrottlerRequests", "transaction throttler requests"),
requestsThrottled: env.Exporter().NewCounter("TransactionThrottlerThrottled", "transaction throttler requests throttled"),
}, nil
}

Expand All @@ -234,6 +243,7 @@ func (t *TxThrottler) Open() error {
return nil
}
log.Info("TxThrottler: opening")
t.throttlerRunning.Set(1)
var err error
t.state, err = newTxThrottlerState(t.config, t.target.Keyspace, t.target.Shard, t.target.Cell)
return err
Expand All @@ -251,6 +261,7 @@ func (t *TxThrottler) Close() {
}
t.state.deallocateResources()
t.state = nil
t.throttlerRunning.Set(0)
log.Info("TxThrottler: closed")
}

Expand All @@ -265,7 +276,12 @@ func (t *TxThrottler) Throttle() (result bool) {
if t.state == nil {
panic("BUG: Throttle() called on a closed TxThrottler")
}
return t.state.throttle()
result = t.state.throttle()
t.requestsTotal.Add(1)
if result {
t.requestsThrottled.Add(1)
}
return result
}

func newTxThrottlerState(config *txThrottlerConfig, keyspace, shard, cell string) (*txThrottlerState, error) {
Expand Down
60 changes: 25 additions & 35 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ import (
"time"

"github.com/golang/mock/gomock"
"github.com/stretchr/testify/assert"

"vitess.io/vitess/go/vt/discovery"
"vitess.io/vitess/go/vt/topo"
Expand All @@ -39,17 +40,15 @@ import (
func TestDisabledThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
config.EnableTxThrottler = false
throttler := NewTxThrottler(config, nil)
env := tabletenv.NewEnv(config, t.Name())
throttler := NewTxThrottler(env, nil)
throttler.InitDBConfig(&querypb.Target{
Keyspace: "keyspace",
Shard: "shard",
})
if err := throttler.Open(); err != nil {
t.Fatalf("want: nil, got: %v", err)
}
if result := throttler.Throttle(); result != false {
t.Errorf("want: false, got: %v", result)
}
assert.Nil(t, throttler.Open())
assert.False(t, throttler.Throttle())
assert.Zero(t, throttler.throttlerRunning.Get())
throttler.Close()
}

Expand All @@ -70,28 +69,18 @@ func TestEnabledThrottler(t *testing.T) {
}

topologyWatcherFactory = func(topoServer *topo.Server, hc discovery.HealthCheck, cell, keyspace, shard string, refreshInterval time.Duration, topoReadConcurrency int) TopologyWatcherInterface {
if ts != topoServer {
t.Errorf("want: %v, got: %v", ts, topoServer)
}
if cell != "cell1" && cell != "cell2" {
t.Errorf("want: cell1 or cell2, got: %v", cell)
}
if keyspace != "keyspace" {
t.Errorf("want: keyspace, got: %v", keyspace)
}
if shard != "shard" {
t.Errorf("want: shard, got: %v", shard)
}
assert.Equal(t, ts, topoServer)
assert.Contains(t, []string{"cell1", "cell2"}, cell)
assert.Equal(t, "keyspace", keyspace)
assert.Equal(t, "shard", shard)
result := NewMockTopologyWatcherInterface(mockCtrl)
result.EXPECT().Stop()
return result
}

mockThrottler := NewMockThrottlerInterface(mockCtrl)
throttlerFactory = func(name, unit string, threadCount int, maxRate, maxReplicationLag int64) (ThrottlerInterface, error) {
if threadCount != 1 {
t.Errorf("want: 1, got: %v", threadCount)
}
assert.Equal(t, 1, threadCount)
return mockThrottler, nil
}

Expand All @@ -115,21 +104,21 @@ func TestEnabledThrottler(t *testing.T) {
config := tabletenv.NewDefaultConfig()
config.EnableTxThrottler = true
config.TxThrottlerHealthCheckCells = []string{"cell1", "cell2"}
env := tabletenv.NewEnv(config, t.Name())

throttler, err := tryCreateTxThrottler(config, ts)
if err != nil {
t.Fatalf("want: nil, got: %v", err)
}
throttler, err := tryCreateTxThrottler(env, ts)
assert.Nil(t, err)
throttler.InitDBConfig(&querypb.Target{
Keyspace: "keyspace",
Shard: "shard",
})
if err := throttler.Open(); err != nil {
t.Fatalf("want: nil, got: %v", err)
}
if result := throttler.Throttle(); result != false {
t.Errorf("want: false, got: %v", result)
}
assert.Nil(t, throttler.Open())
assert.Equal(t, int64(1), throttler.throttlerRunning.Get())

assert.False(t, throttler.Throttle())
assert.Equal(t, int64(1), throttler.requestsTotal.Get())
assert.Zero(t, throttler.requestsThrottled.Get())

throttler.state.StatsUpdate(tabletStats)
rdonlyTabletStats := &discovery.TabletHealth{
Target: &querypb.Target{
Expand All @@ -139,8 +128,9 @@ func TestEnabledThrottler(t *testing.T) {
// This call should not be forwarded to the go/vt/throttler.Throttler object.
throttler.state.StatsUpdate(rdonlyTabletStats)
// The second throttle call should reject.
if result := throttler.Throttle(); result != true {
t.Errorf("want: true, got: %v", result)
}
assert.True(t, throttler.Throttle())
assert.Equal(t, int64(2), throttler.requestsTotal.Get())
assert.Equal(t, int64(1), throttler.requestsThrottled.Get())
throttler.Close()
assert.Zero(t, throttler.throttlerRunning.Get())
}

0 comments on commit 22945c1

Please sign in to comment.