From c16b81a84f3ca9d1c540c4c54d437916556a5ca0 Mon Sep 17 00:00:00 2001 From: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> Date: Mon, 5 Aug 2024 09:13:58 +0300 Subject: [PATCH] throttler: towards formal gRPC calls in endtoend tests, removing HTTP API calls Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com> --- .../onlineddl/flow/onlineddl_flow_test.go | 67 ++------------ .../onlineddl/revert/onlineddl_revert_test.go | 6 +- .../scheduler/onlineddl_scheduler_test.go | 4 +- .../throttler_topo/throttler_test.go | 88 +++++-------------- go/test/endtoend/throttler/util.go | 30 +++---- go/test/endtoend/vreplication/helper_test.go | 29 ++---- .../vreplication/vreplication_test.go | 24 ++--- 7 files changed, 60 insertions(+), 188 deletions(-) diff --git a/go/test/endtoend/onlineddl/flow/onlineddl_flow_test.go b/go/test/endtoend/onlineddl/flow/onlineddl_flow_test.go index 58e985296b5..ae8d7d8073b 100644 --- a/go/test/endtoend/onlineddl/flow/onlineddl_flow_test.go +++ b/go/test/endtoend/onlineddl/flow/onlineddl_flow_test.go @@ -42,9 +42,7 @@ import ( "context" "flag" "fmt" - "io" "math/rand/v2" - "net/http" "os" "path" "runtime" @@ -62,6 +60,7 @@ import ( "vitess.io/vitess/go/test/endtoend/onlineddl" "vitess.io/vitess/go/test/endtoend/throttler" "vitess.io/vitess/go/vt/log" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/vttablet" throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" @@ -244,9 +243,9 @@ func TestSchemaChange(t *testing.T) { ticker := time.NewTicker(500 * time.Millisecond) defer ticker.Stop() for { - _, statusCode, err := throttlerCheck(primaryTablet.VttabletProcess, throttlerapp.OnlineDDLName) + resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.OnlineDDLName, nil) assert.NoError(t, err) - throttleWorkload.Store(statusCode != http.StatusOK) + throttleWorkload.Store(resp.Check.ResponseCode != tabletmanagerdatapb.CheckThrottlerResponseCode_OK) select { case <-ticker.C: case <-workloadCtx.Done(): @@ -286,7 +285,7 @@ func TestSchemaChange(t *testing.T) { onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false) onlineddl.ThrottleAllMigrations(t, &vtParams) onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true) - waitForThrottleCheckStatus(t, throttlerapp.OnlineDDLName, primaryTablet, http.StatusExpectationFailed) + throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.OnlineDDLName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED, migrationWaitTimeout) }) t.Run("unthrottle online-ddl", func(t *testing.T) { onlineddl.UnthrottleAllMigrations(t, &vtParams) @@ -296,7 +295,7 @@ func TestSchemaChange(t *testing.T) { t.Logf("Throttler status: %+v", status) } - waitForThrottleCheckStatus(t, throttlerapp.OnlineDDLName, primaryTablet, http.StatusOK) + throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.OnlineDDLName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, migrationWaitTimeout) }) t.Run("apply more DML", func(t *testing.T) { // Looking to run a substantial amount of DML, giving vreplication @@ -610,59 +609,3 @@ func initTable(t *testing.T) { assert.Greater(t, appliedDMLEnd, appliedDMLStart) assert.GreaterOrEqual(t, appliedDMLEnd-appliedDMLStart, int64(maxTableRows)) } - -func throttleResponse(tablet *cluster.VttabletProcess, path string) (respBody string, err error) { - apiURL := fmt.Sprintf("http://%s:%d/%s", tablet.TabletHostname, tablet.Port, path) - resp, err := httpClient.Get(apiURL) - if err != nil { - return "", err - } - defer resp.Body.Close() - b, err := io.ReadAll(resp.Body) - respBody = string(b) - return respBody, err -} - -func throttleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) { - return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerApp.String())) -} - -func unthrottleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) { - return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp.String())) -} - -func throttlerCheck(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (respBody string, statusCode int, err error) { - apiURL := fmt.Sprintf("http://%s:%d/throttler/check?app=%s", tablet.TabletHostname, tablet.Port, throttlerApp.String()) - resp, err := httpClient.Get(apiURL) - if err != nil { - return "", 0, err - } - defer resp.Body.Close() - statusCode = resp.StatusCode - b, err := io.ReadAll(resp.Body) - respBody = string(b) - return respBody, statusCode, err -} - -// waitForThrottleCheckStatus waits for the tablet to return the provided HTTP code in a throttle check -func waitForThrottleCheckStatus(t *testing.T, throttlerApp throttlerapp.Name, tablet *cluster.Vttablet, wantCode int) { - ctx, cancel := context.WithTimeout(context.Background(), migrationWaitTimeout) - defer cancel() - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - - for { - respBody, statusCode, err := throttlerCheck(tablet.VttabletProcess, throttlerApp) - require.NoError(t, err) - - if wantCode == statusCode { - return - } - select { - case <-ctx.Done(): - assert.Equalf(t, wantCode, statusCode, "body: %s", respBody) - return - case <-ticker.C: - } - } -} diff --git a/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go b/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go index 3a963c85ce2..446259e51ac 100644 --- a/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go +++ b/go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go @@ -21,7 +21,6 @@ import ( "flag" "fmt" "math/rand/v2" - "net/http" "os" "path" "strings" @@ -33,6 +32,7 @@ import ( "vitess.io/vitess/go/mysql" "vitess.io/vitess/go/mysql/capabilities" "vitess.io/vitess/go/vt/log" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" @@ -209,7 +209,7 @@ func TestSchemaChange(t *testing.T) { require.Equal(t, 1, len(shards)) throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance) - throttler.WaitForCheckThrottlerResult(t, clusterInstance, primaryTablet, throttlerapp.TestingName, nil, http.StatusOK, time.Minute) + throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.TestingName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, time.Minute) t.Run("revertible", testRevertible) t.Run("revert", testRevert) @@ -415,7 +415,7 @@ func testRevertible(t *testing.T) { toStatement := fmt.Sprintf(createTableWrapper, testcase.toSchema) uuid = testOnlineDDLStatement(t, toStatement, ddlStrategy, "vtgate", tableName, "") if !onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) { - resp, err := throttler.CheckThrottler(clusterInstance, primaryTablet, throttlerapp.TestingName, nil) + resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.TestingName, nil) assert.NoError(t, err) fmt.Println("Throttler check response: ", resp) diff --git a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go index a034aa6d65a..689fdbc9a9f 100644 --- a/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go +++ b/go/test/endtoend/onlineddl/scheduler/onlineddl_scheduler_test.go @@ -224,7 +224,7 @@ func waitForMessage(t *testing.T, uuid string, messageSubstring string) { case <-ticker.C: case <-ctx.Done(): { - resp, err := throttler.CheckThrottler(clusterInstance, primaryTablet, throttlerapp.TestingName, nil) + resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.TestingName, nil) assert.NoError(t, err) fmt.Println("Throttler check response: ", resp) @@ -612,7 +612,7 @@ func testScheduler(t *testing.T) { ticker := time.NewTicker(time.Second) defer ticker.Stop() for { - throttler.CheckThrottler(clusterInstance, primaryTablet, throttlerapp.OnlineDDLName, nil) + throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.OnlineDDLName, nil) select { case <-ticker.C: case <-ctx.Done(): diff --git a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go index 08cea643940..154024462fc 100644 --- a/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go +++ b/go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go @@ -19,7 +19,6 @@ import ( "context" "flag" "fmt" - "io" "net/http" "os" "sync" @@ -89,14 +88,6 @@ var ( } } }` - - httpClient = base.SetupHTTPClient(time.Second) - throttledAppsAPIPath = "throttler/throttled-apps" - statusAPIPath = "throttler/status" - getResponseBody = func(resp *http.Response) string { - body, _ := io.ReadAll(resp.Body) - return string(body) - } ) func TestMain(m *testing.M) { @@ -162,26 +153,13 @@ func TestMain(m *testing.M) { os.Exit(exitCode) } -func throttledApps(tablet *cluster.Vttablet) (resp *http.Response, respBody string, err error) { - resp, err = httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, throttledAppsAPIPath)) - if err != nil { - return resp, respBody, err - } - b, err := io.ReadAll(resp.Body) - if err != nil { - return resp, respBody, err - } - respBody = string(b) - return resp, respBody, err -} - func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*vtctldatapb.CheckThrottlerResponse, error) { flags := &throttle.CheckFlags{ Scope: base.ShardScope, SkipRequestHeartbeats: skipRequestHeartbeats, MultiMetricsEnabled: true, } - resp, err := throttler.CheckThrottler(clusterInstance, tablet, testAppName, flags) + resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, tablet, testAppName, flags) return resp, err } @@ -190,18 +168,14 @@ func throttleCheckSelf(tablet *cluster.Vttablet) (*vtctldatapb.CheckThrottlerRes Scope: base.SelfScope, MultiMetricsEnabled: true, } - resp, err := throttler.CheckThrottler(clusterInstance, tablet, testAppName, flags) + resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, tablet, testAppName, flags) return resp, err } -func throttleStatus(t *testing.T, tablet *cluster.Vttablet) string { - resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, statusAPIPath)) +func throttleStatus(t *testing.T, tablet *cluster.Vttablet) *tabletmanagerdatapb.GetThrottlerStatusResponse { + status, err := throttler.GetThrottlerStatus(&clusterInstance.VtctldClientProcess, tablet) require.NoError(t, err) - defer resp.Body.Close() - - b, err := io.ReadAll(resp.Body) - require.NoError(t, err) - return string(b) + return status } func warmUpHeartbeat(t *testing.T) tabletmanagerdatapb.CheckThrottlerResponseCode { @@ -211,32 +185,18 @@ func warmUpHeartbeat(t *testing.T) tabletmanagerdatapb.CheckThrottlerResponseCod require.NoError(t, err) time.Sleep(time.Second) + t.Logf("resp.StatusCode: %v", resp.Check.StatusCode) + t.Logf("resp.ResponseCode: %v", resp.Check.ResponseCode) return throttle.ResponseCodeFromStatus(resp.Check.ResponseCode, int(resp.Check.StatusCode)) } // waitForThrottleCheckStatus waits for the tablet to return the provided HTTP code in a throttle check func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode tabletmanagerdatapb.CheckThrottlerResponseCode) bool { _ = warmUpHeartbeat(t) - ctx, cancel := context.WithTimeout(context.Background(), onDemandHeartbeatDuration*4) - defer cancel() - - ticker := time.NewTicker(time.Second) - defer ticker.Stop() - for { - resp, err := throttleCheck(tablet, true) - require.NoError(t, err) - if wantCode == resp.Check.ResponseCode { - // Wait for any cached check values to be cleared and the new - // status value to be in effect everywhere before returning. - return true - } - select { - case <-ctx.Done(): - return assert.EqualValues(t, wantCode, resp.Check.StatusCode, "response: %+v", resp) - case <-ticker.C: - } - } + flags := &throttle.CheckFlags{SkipRequestHeartbeats: true} + _, err := throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, tablet, throttlerapp.OnlineDDLName, flags, wantCode, onDemandHeartbeatDuration*4) + return err == nil } func vtgateExec(t *testing.T, query string, expectError string) *sqltypes.Result { @@ -332,6 +292,7 @@ func TestInitialThrottler(t *testing.T) { }) t.Run("requesting heartbeats", func(t *testing.T) { respStatus := warmUpHeartbeat(t) + t.Logf("respStatus: %v", respStatus) assert.NotEqual(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, respStatus) }) t.Run("validating OK response from throttler with low threshold, heartbeats running", func(t *testing.T) { @@ -344,7 +305,7 @@ func TestInitialThrottler(t *testing.T) { assert.Equal(t, base.ShardScope.String(), metrics.Scope) } - if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) { + if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) { rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false) assert.NoError(t, err) t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString()) @@ -369,7 +330,7 @@ func TestInitialThrottler(t *testing.T) { for _, metrics := range resp.Check.Metrics { assert.Equal(t, base.ShardScope.String(), metrics.Scope) } - if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) { + if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) { rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false) assert.NoError(t, err) t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString()) @@ -442,22 +403,19 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) { waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_OK) }) t.Run("validating throttled apps", func(t *testing.T) { - resp, body, err := throttledApps(primaryTablet) - require.NoError(t, err) - defer resp.Body.Close() - assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp)) - assert.Contains(t, body, throttlerapp.TestingAlwaysThrottlerName) + status := throttleStatus(t, primaryTablet) + assert.Contains(t, status.ThrottledApps, throttlerapp.TestingAlwaysThrottlerName.String()) }) t.Run("validating primary check self", func(t *testing.T) { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) - assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) + assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) }) t.Run("validating replica check self", func(t *testing.T) { resp, err := throttleCheckSelf(replicaTablet) require.NoError(t, err) - assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) + assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) }) } @@ -496,7 +454,7 @@ func TestLag(t *testing.T) { assert.Equal(t, base.SelfScope.String(), metrics.Scope) } // self (on primary) is unaffected by replication lag - if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) { + if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) { t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet)) t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet)) } @@ -590,13 +548,13 @@ func TestLag(t *testing.T) { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) // self (on primary) is unaffected by replication lag - assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) + assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) }) t.Run("replica self-check should be fine", func(t *testing.T) { resp, err := throttleCheckSelf(replicaTablet) require.NoError(t, err) - assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) + assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) }) } @@ -641,7 +599,7 @@ func TestCustomQuery(t *testing.T) { throttler.WaitForValidData(t, primaryTablet, throttlerEnabledTimeout) resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) - assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) + assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) }) t.Run("test threads running", func(t *testing.T) { @@ -680,7 +638,7 @@ func TestCustomQuery(t *testing.T) { { resp, err := throttleCheckSelf(primaryTablet) require.NoError(t, err) - assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) + assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) } }) @@ -704,7 +662,7 @@ func TestRestoreDefaultQuery(t *testing.T) { t.Run("validating OK response from throttler with default threshold, heartbeats running", func(t *testing.T) { resp, err := throttleCheck(primaryTablet, false) require.NoError(t, err) - assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) + assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) }) t.Run("validating pushback response from throttler on default threshold once heartbeats go stale", func(t *testing.T) { diff --git a/go/test/endtoend/throttler/util.go b/go/test/endtoend/throttler/util.go index fccad19c324..8ac4143cd2b 100644 --- a/go/test/endtoend/throttler/util.go +++ b/go/test/endtoend/throttler/util.go @@ -18,7 +18,6 @@ package throttler import ( "context" - "encoding/json" "fmt" "io" "net/http" @@ -168,8 +167,8 @@ func UpdateThrottlerTopoConfigRaw( } // CheckThrottler runs vtctldclient CheckThrottler. -func CheckThrottler(clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, appName throttlerapp.Name, flags *throttle.CheckFlags) (*vtctldatapb.CheckThrottlerResponse, error) { - output, err := CheckThrottlerRaw(&clusterInstance.VtctldClientProcess, tablet, appName, flags) +func CheckThrottler(vtctldProcess *cluster.VtctldClientProcess, tablet *cluster.Vttablet, appName throttlerapp.Name, flags *throttle.CheckFlags) (*vtctldatapb.CheckThrottlerResponse, error) { + output, err := CheckThrottlerRaw(vtctldProcess, tablet, appName, flags) if err != nil { return nil, err } @@ -340,7 +339,7 @@ func WaitUntilTabletsConfirmThrottledApp(t *testing.T, clusterInstance *cluster. for _, ks := range clusterInstance.Keyspaces { for _, shard := range ks.Shards { for _, tablet := range shard.Vttablets { - WaitForThrottledApp(t, tablet, throttlerApp, expectThrottled, ConfigTimeout) + WaitForThrottledApp(t, &clusterInstance.VtctldClientProcess, tablet, throttlerApp, expectThrottled, ConfigTimeout) } } } @@ -424,8 +423,7 @@ func WaitForThrottlerStatusEnabled(t *testing.T, vtctldProcess *cluster.VtctldCl // WaitForThrottlerStatusEnabled waits for a tablet to report its throttler status as // enabled/disabled and have the provided config (if any) until the specified timeout. -func WaitForThrottledApp(t *testing.T, tablet *cluster.Vttablet, throttlerApp throttlerapp.Name, expectThrottled bool, timeout time.Duration) { - throttledAppsURL := fmt.Sprintf("http://localhost:%d/throttler/throttled-apps", tablet.HTTPPort) +func WaitForThrottledApp(t *testing.T, vtctldProcess *cluster.VtctldClientProcess, tablet *cluster.Vttablet, throttlerApp throttlerapp.Name, expectThrottled bool, timeout time.Duration) { tabletURL := fmt.Sprintf("http://localhost:%d/debug/status_details", tablet.HTTPPort) ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() @@ -433,14 +431,14 @@ func WaitForThrottledApp(t *testing.T, tablet *cluster.Vttablet, throttlerApp th defer ticker.Stop() for { - throttledAppsBody := getHTTPBody(throttledAppsURL) - var throttledApps []base.AppThrottle - err := json.Unmarshal([]byte(throttledAppsBody), &throttledApps) - assert.NoError(t, err) + status, err := GetThrottlerStatus(vtctldProcess, tablet) + require.NoError(t, err) + throttledApps := status.ThrottledApps require.NotEmpty(t, throttledApps) // "always-throttled-app" is always there. appFoundThrottled := false for _, throttledApp := range throttledApps { - if throttledApp.AppName == throttlerApp.String() && throttledApp.ExpireAt.After(time.Now()) { + expiresAt := protoutil.TimeFromProto(throttledApp.ExpiresAt) + if throttledApp.Name == throttlerApp.String() && expiresAt.After(time.Now()) { appFoundThrottled = true break } @@ -460,8 +458,8 @@ func WaitForThrottledApp(t *testing.T, tablet *cluster.Vttablet, throttlerApp th } select { case <-ctx.Done(): - assert.Fail(t, "timeout", "waiting for the %s tablet's throttled apps with the correct config (expecting %s to be %v) after %v; last seen value: %s", - tablet.Alias, throttlerApp.String(), expectThrottled, timeout, throttledAppsBody) + assert.Fail(t, "timeout", "waiting for the %s tablet's throttled apps with the correct config (expecting %s to be %v) after %v; last seen throttled apps: %+v", + tablet.Alias, throttlerApp.String(), expectThrottled, timeout, throttledApps) return case <-ticker.C: } @@ -485,15 +483,15 @@ func EnableLagThrottlerAndWaitForStatus(t *testing.T, clusterInstance *cluster.L } } -func WaitForCheckThrottlerResult(t *testing.T, clusterInstance *cluster.LocalProcessCluster, tablet *cluster.Vttablet, appName throttlerapp.Name, flags *throttle.CheckFlags, expect int32, timeout time.Duration) (*vtctldatapb.CheckThrottlerResponse, error) { +func WaitForCheckThrottlerResult(t *testing.T, vtctldProcess *cluster.VtctldClientProcess, tablet *cluster.Vttablet, appName throttlerapp.Name, flags *throttle.CheckFlags, wantCode tabletmanagerdatapb.CheckThrottlerResponseCode, timeout time.Duration) (*vtctldatapb.CheckThrottlerResponse, error) { ctx, cancel := context.WithTimeout(context.Background(), timeout) defer cancel() ticker := time.NewTicker(time.Second) defer ticker.Stop() for { - resp, err := CheckThrottler(clusterInstance, tablet, appName, flags) + resp, err := CheckThrottler(vtctldProcess, tablet, appName, flags) require.NoError(t, err) - if resp.Check.StatusCode == expect { + if resp.Check.ResponseCode == wantCode { return resp, nil } select { diff --git a/go/test/endtoend/vreplication/helper_test.go b/go/test/endtoend/vreplication/helper_test.go index b45c09837c9..fb9e7403ede 100644 --- a/go/test/endtoend/vreplication/helper_test.go +++ b/go/test/endtoend/vreplication/helper_test.go @@ -46,6 +46,7 @@ import ( "vitess.io/vitess/go/sqlescape" "vitess.io/vitess/go/sqltypes" "vitess.io/vitess/go/test/endtoend/cluster" + "vitess.io/vitess/go/test/endtoend/throttler" "vitess.io/vitess/go/vt/log" "vitess.io/vitess/go/vt/schema" "vitess.io/vitess/go/vt/sqlparser" @@ -53,6 +54,7 @@ import ( "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp" binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" ) @@ -176,30 +178,9 @@ func waitForQueryResult(t *testing.T, conn *mysql.Conn, database string, query s // waitForTabletThrottlingStatus waits for the tablet to return the provided HTTP code for // the provided app name in its self check. -func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name, wantCode int64) { - var gotCode int64 - timer := time.NewTimer(defaultTimeout) - defer timer.Stop() - for { - output, err := throttlerCheckSelf(tablet, throttlerApp) - require.NoError(t, err) - - gotCode, err = jsonparser.GetInt([]byte(output), "StatusCode") - require.NoError(t, err) - if wantCode == gotCode { - // Wait for any cached check values to be cleared and the new - // status value to be in effect everywhere before returning. - time.Sleep(500 * time.Millisecond) - return - } - select { - case <-timer.C: - require.FailNow(t, fmt.Sprintf("tablet %q did not return expected status of %d for application %q before the timeout of %s; last seen status: %d", - tablet.Name, wantCode, throttlerApp, defaultTimeout, gotCode)) - default: - time.Sleep(defaultTick) - } - } +func waitForTabletThrottlingStatus(t *testing.T, tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name, wantCode tabletmanagerdatapb.CheckThrottlerResponseCode) { + _, err := throttler.WaitForCheckThrottlerResult(t, vc.VtctldClient, &cluster.Vttablet{Alias: tablet.Name}, throttlerApp, nil, wantCode, defaultTimeout) + require.NoError(t, err) } // waitForNoWorkflowLag waits for the VReplication workflow's MaxVReplicationTransactionLag diff --git a/go/test/endtoend/vreplication/vreplication_test.go b/go/test/endtoend/vreplication/vreplication_test.go index 52874b5839c..6dd86b8a5f9 100644 --- a/go/test/endtoend/vreplication/vreplication_test.go +++ b/go/test/endtoend/vreplication/vreplication_test.go @@ -20,7 +20,6 @@ import ( "context" "fmt" "io" - "net/http" "runtime" "strconv" "strings" @@ -45,6 +44,7 @@ import ( binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata" querypb "vitess.io/vitess/go/vt/proto/query" + tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata" topodatapb "vitess.io/vitess/go/vt/proto/topodata" vtgatepb "vitess.io/vitess/go/vt/proto/vtgate" throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base" @@ -71,9 +71,9 @@ const ( merchantKeyspace = "merchant-type" maxWait = 60 * time.Second - BypassLagCheck = true // temporary fix for flakiness seen only in CI when lag check is introduced - throttlerStatusThrottled = http.StatusExpectationFailed // 417 - throttlerStatusNotThrottled = http.StatusOK // 200 + BypassLagCheck = true // temporary fix for flakiness seen only in CI when lag check is introduced + throttlerStatusThrottled = tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED + throttlerStatusNotThrottled = tabletmanagerdatapb.CheckThrottlerResponseCode_OK ) func init() { @@ -98,21 +98,13 @@ func throttleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name } func unthrottleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) { + // clusterInstance :=&cluster.LocalProcessCluster{ + // Keyspaces: []*cluster.Keyspace{}, + // VtctldProcess: , + // } return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp.String())) } -func throttlerCheckSelf(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (respBody string, err error) { - apiURL := fmt.Sprintf("http://%s:%d/throttler/check-self?app=%s", tablet.TabletHostname, tablet.Port, throttlerApp.String()) - resp, err := httpClient.Get(apiURL) - if err != nil { - return "", err - } - defer resp.Body.Close() - b, err := io.ReadAll(resp.Body) - respBody = string(b) - return respBody, err -} - // TestVReplicationDDLHandling tests the DDL handling in // VReplication for the values of IGNORE, STOP, and EXEC. // NOTE: this is a manual test. It is not executed in the