From 7a086fc0267feda1e95f4e0fbf1397588c32c928 Mon Sep 17 00:00:00 2001 From: "Eduardo J. Ortega U" <5791035+ejortegau@users.noreply.github.com> Date: Mon, 12 Feb 2024 11:36:36 +0100 Subject: [PATCH] Address upstream PR comments Signed-off-by: Eduardo J. Ortega U <5791035+ejortegau@users.noreply.github.com> --- go/vt/throttler/throttler.go | 4 +- .../txthrottler/mock_throttler_test.go | 12 +++--- .../tabletserver/txthrottler/tx_throttler.go | 40 +++++++++---------- .../txthrottler/tx_throttler_test.go | 16 ++++---- 4 files changed, 35 insertions(+), 37 deletions(-) diff --git a/go/vt/throttler/throttler.go b/go/vt/throttler/throttler.go index 73ad66fe92d..c071f0694a6 100644 --- a/go/vt/throttler/throttler.go +++ b/go/vt/throttler/throttler.go @@ -226,9 +226,9 @@ func (t *Throttler) Throttle(threadID int) time.Duration { return t.threadThrottlers[threadID].throttle(t.nowFunc()) } -// LastMaxLagNotIgnoredForTabletType returns the max of all the last replication lag values seen across all tablets of +// MaxLag returns the max of all the last replication lag values seen across all tablets of // the provided type, excluding ignored tablets. -func (t *Throttler) LastMaxLagNotIgnoredForTabletType(tabletType topodata.TabletType) uint32 { +func (t *Throttler) MaxLag(tabletType topodata.TabletType) uint32 { cache := t.maxReplicationLagModule.lagCacheByType(tabletType) var maxLag uint32 diff --git a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go index 75daf37c5a0..76a02c121b7 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/mock_throttler_test.go @@ -64,18 +64,18 @@ func (mr *MockThrottlerInterfaceMockRecorder) GetConfiguration() *gomock.Call { return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "GetConfiguration", reflect.TypeOf((*MockThrottlerInterface)(nil).GetConfiguration)) } -// LastMaxLagNotIgnoredForTabletType mocks base method. -func (m *MockThrottlerInterface) LastMaxLagNotIgnoredForTabletType(tabletType topodata.TabletType) uint32 { +// MaxLag mocks base method. +func (m *MockThrottlerInterface) MaxLag(tabletType topodata.TabletType) uint32 { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "LastMaxLagNotIgnoredForTabletType", tabletType) + ret := m.ctrl.Call(m, "MaxLag", tabletType) ret0, _ := ret[0].(uint32) return ret0 } -// LastMaxLagNotIgnoredForTabletType indicates an expected call of LastMaxLagNotIgnoredForTabletType. -func (mr *MockThrottlerInterfaceMockRecorder) LastMaxLagNotIgnoredForTabletType(tabletType interface{}) *gomock.Call { +// MaxLag indicates an expected call of MaxLag. +func (mr *MockThrottlerInterfaceMockRecorder) MaxLag(tabletType interface{}) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "LastMaxLagNotIgnoredForTabletType", reflect.TypeOf((*MockThrottlerInterface)(nil).LastMaxLagNotIgnoredForTabletType), tabletType) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "MaxLag", reflect.TypeOf((*MockThrottlerInterface)(nil).MaxLag), tabletType) } // MaxRate mocks base method. diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go index 2ecc0b8fa45..18593bc73cb 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler.go @@ -87,7 +87,7 @@ type ThrottlerInterface interface { GetConfiguration() *throttlerdatapb.Configuration UpdateConfiguration(configuration *throttlerdatapb.Configuration, copyZeroValues bool) error ResetConfiguration() - LastMaxLagNotIgnoredForTabletType(tabletType topodatapb.TabletType) uint32 + MaxLag(tabletType topodatapb.TabletType) uint32 } // TopologyWatcherInterface defines the public interface that is implemented by @@ -187,9 +187,9 @@ type txThrottlerStateImpl struct { healthCheck discovery.LegacyHealthCheck topologyWatchers []TopologyWatcherInterface - shardMaxLag int64 - endChannel chan bool - endWaitGroup sync.WaitGroup + maxLag int64 + done chan bool + waitForTermination sync.WaitGroup } // NewTxThrottler tries to construct a txThrottler from the @@ -307,7 +307,7 @@ func (t *txThrottler) Throttle(priority int, workload string) (result bool) { // Throttle according to both what the throttler state says and the priority. Workloads with lower priority value // are less likely to be throttled. - result = t.state.throttle() && rand.Intn(sqlparser.MaxPriorityValue) < priority + result = rand.Intn(sqlparser.MaxPriorityValue) < priority && t.state.throttle() t.requestsTotal.Add(workload, 1) if result { @@ -335,9 +335,9 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar return nil, err } result := &txThrottlerStateImpl{ - config: config, - throttler: t, - endChannel: make(chan bool, 1), + config: config, + throttler: t, + done: make(chan bool, 1), } result.healthCheck = healthCheckFactory() result.healthCheck.SetListener(result, false /* sendDownEvents */) @@ -356,8 +356,8 @@ func newTxThrottlerState(topoServer *topo.Server, config *txThrottlerConfig, tar discovery.DefaultTopoReadConcurrency)) } - result.endWaitGroup.Add(1) - go result.updateMaxShardLag() + result.waitForTermination.Add(1) + go result.updateMaxLag() return result, nil } @@ -371,14 +371,14 @@ func (ts *txThrottlerStateImpl) throttle() bool { ts.throttleMu.Lock() defer ts.throttleMu.Unlock() - maxLag := atomic.LoadInt64(&ts.shardMaxLag) + maxLag := atomic.LoadInt64(&ts.maxLag) - return ts.throttler.Throttle(0 /* threadId */) > 0 && - maxLag > ts.config.throttlerConfig.TargetReplicationLagSec + return maxLag > ts.config.throttlerConfig.TargetReplicationLagSec && + ts.throttler.Throttle(0 /* threadId */) > 0 } -func (ts *txThrottlerStateImpl) updateMaxShardLag() { - defer ts.endWaitGroup.Done() +func (ts *txThrottlerStateImpl) updateMaxLag() { + defer ts.waitForTermination.Done() // We use half of the target lag to ensure we have enough resolution to see changes in lag below that value ticker := time.NewTicker(time.Duration(ts.config.throttlerConfig.TargetReplicationLagSec/2) * time.Second) outerloop: @@ -388,13 +388,13 @@ outerloop: var maxLag uint32 for _, tabletType := range ts.config.tabletTypes { - maxLagPerTabletType := ts.throttler.LastMaxLagNotIgnoredForTabletType(tabletType) + maxLagPerTabletType := ts.throttler.MaxLag(tabletType) if maxLagPerTabletType > maxLag { maxLag = maxLagPerTabletType } } - atomic.StoreInt64(&ts.shardMaxLag, int64(maxLag)) - case <-ts.endChannel: + atomic.StoreInt64(&ts.maxLag, int64(maxLag)) + case <-ts.done: break outerloop } } @@ -412,8 +412,8 @@ func (ts *txThrottlerStateImpl) deallocateResources() { ts.healthCheck.Close() ts.healthCheck = nil - ts.endChannel <- true - ts.endWaitGroup.Wait() + ts.done <- true + ts.waitForTermination.Wait() // After ts.healthCheck is closed txThrottlerStateImpl.StatsUpdate() is guaranteed not // to be executing, so we can safely close the throttler. ts.throttler.Close() diff --git a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go index 8311638e017..a3e4d219da3 100644 --- a/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go +++ b/go/vt/vttablet/tabletserver/txthrottler/tx_throttler_test.go @@ -116,14 +116,12 @@ func TestEnabledThrottler(t *testing.T) { calls = append(calls, call) // 3 - call = mockThrottler.EXPECT().Throttle(0) - call.Return(1 * time.Second) - calls = append(calls, call) + // Nothing gets mocked here because the order of evaluation in txThrottler.Throttle() evaluates first + // whether the priority allows for throttling or not, so no need to mock calls in mockThrottler.Throttle() // 4 - call = mockThrottler.EXPECT().Throttle(0) - call.Return(1 * time.Second) - calls = append(calls, call) + // Nothing gets mocked here because the order of evaluation in txThrottlerStateImpl.Throttle() evaluates first + // whether there is lag or not, so no call to the underlying mockThrottler is issued. call = mockThrottler.EXPECT().Close() calls = append(calls, call) @@ -151,10 +149,10 @@ func TestEnabledThrottler(t *testing.T) { assert.True(t, ok) // Stop the go routine that keeps updating the cached shard's max lag to preventi it from changing the value in a // way that will interfere with how we manipulate that value in our tests to evaluate different cases: - throttlerImpl.endChannel <- true + throttlerImpl.done <- true // 1 should not throttle due to return value of underlying Throttle(), despite high lag - atomic.StoreInt64(&throttlerImpl.shardMaxLag, 20) + atomic.StoreInt64(&throttlerImpl.maxLag, 20) assert.False(t, throttler.Throttle(100, "some-workload")) assert.Equal(t, int64(1), throttler.requestsTotal.Counts()["some-workload"]) assert.Zero(t, throttler.requestsThrottled.Counts()["some-workload"]) @@ -179,7 +177,7 @@ func TestEnabledThrottler(t *testing.T) { assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"]) // 4 should not throttle despite return value of underlying Throttle() and priority = 100, due to low lag - atomic.StoreInt64(&throttlerImpl.shardMaxLag, 1) + atomic.StoreInt64(&throttlerImpl.maxLag, 1) assert.False(t, throttler.Throttle(100, "some-workload")) assert.Equal(t, int64(4), throttler.requestsTotal.Counts()["some-workload"]) assert.Equal(t, int64(1), throttler.requestsThrottled.Counts()["some-workload"])