Skip to content

Commit

Permalink
Address upstream PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com>
  • Loading branch information
ejortegau committed Feb 12, 2024
1 parent e994868 commit 7a086fc
Show file tree
Hide file tree
Showing 4 changed files with 35 additions and 37 deletions.
4 changes: 2 additions & 2 deletions go/vt/throttler/throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -226,9 +226,9 @@ func (t *Throttler) Throttle(threadID int) time.Duration {
return t.threadThrottlers[threadID].throttle(t.nowFunc())
}

// LastMaxLagNotIgnoredForTabletType returns the max of all the last replication lag values seen across all tablets of
// MaxLag returns the max of all the last replication lag values seen across all tablets of
// the provided type, excluding ignored tablets.
func (t *Throttler) LastMaxLagNotIgnoredForTabletType(tabletType topodata.TabletType) uint32 {
func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 {
cache := t.maxReplicationLagModule.lagCacheByType(tabletType)

var maxLag uint32
Expand Down
12 changes: 6 additions & 6 deletions go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

40 changes: 20 additions & 20 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,7 +87,7 @@ type ThrottlerInterface interface {
GetConfiguration() *throttlerdatapb.Configuration
UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error
ResetConfiguration()
LastMaxLagNotIgnoredForTabletType(tabletType topodatapb.TabletType) uint32
MaxLag(tabletType topodatapb.TabletType) uint32
}

// TopologyWatcherInterface defines the public interface that is implemented by
Expand Down Expand Up @@ -187,9 +187,9 @@ type txThrottlerStateImpl struct {
healthCheck discovery.LegacyHealthCheck
topologyWatchers []TopologyWatcherInterface

shardMaxLag int64
endChannel chan bool
endWaitGroup sync.WaitGroup
maxLag int64
done chan bool
waitForTermination sync.WaitGroup
}

// NewTxThrottler tries to construct a txThrottler from the
Expand Down Expand Up @@ -307,7 +307,7 @@ func (t *txThrottler) Throttle(priority int, workload string) (result bool) {

// Throttle according to both what the throttler state says and the priority. Workloads with lower priority value
// are less likely to be throttled.
result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority
result = rand.Intn(sqlparser.MaxPriorityValue) < priority && t.state.throttle()

t.requestsTotal.Add(workload, 1)
if result {
Expand Down Expand Up @@ -335,9 +335,9 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
return nil, err
}
result := &txThrottlerStateImpl{
config: config,
throttler: t,
endChannel: make(chan bool, 1),
config: config,
throttler: t,
done: make(chan bool, 1),
}
result.healthCheck = healthCheckFactory()
result.healthCheck.SetListener(result, false /* sendDownEvents */)
Expand All @@ -356,8 +356,8 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar
discovery.DefaultTopoReadConcurrency))
}

result.endWaitGroup.Add(1)
go result.updateMaxShardLag()
result.waitForTermination.Add(1)
go result.updateMaxLag()

return result, nil
}
Expand All @@ -371,14 +371,14 @@ func (ts *txThrottlerStateImpl) throttle() bool {
ts.throttleMu.Lock()
defer ts.throttleMu.Unlock()

maxLag := atomic.LoadInt64(&ts.shardMaxLag)
maxLag := atomic.LoadInt64(&ts.maxLag)

return ts.throttler.Throttle(0 /* threadId */) > 0 &&
maxLag > ts.config.throttlerConfig.TargetReplicationLagSec
return maxLag > ts.config.throttlerConfig.TargetReplicationLagSec &&
ts.throttler.Throttle(0 /* threadId */) > 0
}

func (ts *txThrottlerStateImpl) updateMaxShardLag() {
defer ts.endWaitGroup.Done()
func (ts *txThrottlerStateImpl) updateMaxLag() {
defer ts.waitForTermination.Done()
// We use half of the target lag to ensure we have enough resolution to see changes in lag below that value
ticker := time.NewTicker(time.Duration(ts.config.throttlerConfig.TargetReplicationLagSec/2) * time.Second)
outerloop:
Expand All @@ -388,13 +388,13 @@ outerloop:
var maxLag uint32

for _, tabletType := range ts.config.tabletTypes {
maxLagPerTabletType := ts.throttler.LastMaxLagNotIgnoredForTabletType(tabletType)
maxLagPerTabletType := ts.throttler.MaxLag(tabletType)
if maxLagPerTabletType > maxLag {
maxLag = maxLagPerTabletType
}
}
atomic.StoreInt64(&ts.shardMaxLag, int64(maxLag))
case <-ts.endChannel:
atomic.StoreInt64(&ts.maxLag, int64(maxLag))
case <-ts.done:
break outerloop
}
}
Expand All @@ -412,8 +412,8 @@ func (ts *txThrottlerStateImpl) deallocateResources() {
ts.healthCheck.Close()
ts.healthCheck = nil

ts.endChannel <- true
ts.endWaitGroup.Wait()
ts.done <- true
ts.waitForTermination.Wait()
// After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not
// to be executing, so we can safely close the throttler.
ts.throttler.Close()
Expand Down
16 changes: 7 additions & 9 deletions go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,14 +116,12 @@ func TestEnabledThrottler(t *testing.T) {
calls = append(calls, call)

// 3
call = mockThrottler.EXPECT().Throttle(0)
call.Return(1 * time.Second)
calls = append(calls, call)
// Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first
// whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle()

// 4
call = mockThrottler.EXPECT().Throttle(0)
call.Return(1 * time.Second)
calls = append(calls, call)
// Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first
// whether there is lag or not, so no call to the underlying mockThrottler is issued.

call = mockThrottler.EXPECT().Close()
calls = append(calls, call)
Expand Down Expand Up @@ -151,10 +149,10 @@ func TestEnabledThrottler(t *testing.T) {
assert.True(t, ok)
// Stop the go routine that keeps updating the cached shard's max lag to preventi it from changing the value in a
// way that will interfere with how we manipulate that value in our tests to evaluate different cases:
throttlerImpl.endChannel <- true
throttlerImpl.done <- true

// 1 should not throttle due to return value of underlying Throttle(), despite high lag
atomic.StoreInt64(&throttlerImpl.shardMaxLag, 20)
atomic.StoreInt64(&throttlerImpl.maxLag, 20)
assert.False(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(1), throttler.requestsTotal.Counts()["some-workload"])
assert.Zero(t, throttler.requestsThrottled.Counts()["some-workload"])
Expand All @@ -179,7 +177,7 @@ func TestEnabledThrottler(t *testing.T) {
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])

// 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag
atomic.StoreInt64(&throttlerImpl.shardMaxLag, 1)
atomic.StoreInt64(&throttlerImpl.maxLag, 1)
assert.False(t, throttler.Throttle(100, "some-workload"))
assert.Equal(t, int64(4), throttler.requestsTotal.Counts()["some-workload"])
assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])
Expand Down

0 comments on commit 7a086fc

Please sign in to comment.