Skip to content

Commit

Permalink
throttler: towards formal gRPC calls in endtoend tests, removing HTTP…
Browse files Browse the repository at this point in the history
… API calls

Signed-off-by: Shlomi Noach <2607934+shlomi-noach@users.noreply.github.com>
  • Loading branch information
shlomi-noach committed Aug 5, 2024
1 parent 33030b7 commit c16b81a
Show file tree
Hide file tree
Showing 7 changed files with 60 additions and 188 deletions.
67 changes: 5 additions & 62 deletions go/test/endtoend/onlineddl/flow/onlineddl_flow_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,7 @@ import (
"context"
"flag"
"fmt"
"io"
"math/rand/v2"
"net/http"
"os"
"path"
"runtime"
Expand All @@ -62,6 +60,7 @@ import (
"vitess.io/vitess/go/test/endtoend/onlineddl"
"vitess.io/vitess/go/test/endtoend/throttler"
"vitess.io/vitess/go/vt/log"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet"
throttlebase "vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/base"
Expand Down Expand Up @@ -244,9 +243,9 @@ func TestSchemaChange(t *testing.T) {
ticker := time.NewTicker(500 * time.Millisecond)
defer ticker.Stop()
for {
_, statusCode, err := throttlerCheck(primaryTablet.VttabletProcess, throttlerapp.OnlineDDLName)
resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.OnlineDDLName, nil)
assert.NoError(t, err)
throttleWorkload.Store(statusCode != http.StatusOK)
throttleWorkload.Store(resp.Check.ResponseCode != tabletmanagerdatapb.CheckThrottlerResponseCode_OK)
select {
case <-ticker.C:
case <-workloadCtx.Done():
Expand Down Expand Up @@ -286,7 +285,7 @@ func TestSchemaChange(t *testing.T) {
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, false)
onlineddl.ThrottleAllMigrations(t, &vtParams)
onlineddl.CheckThrottledApps(t, &vtParams, throttlerapp.OnlineDDLName, true)
waitForThrottleCheckStatus(t, throttlerapp.OnlineDDLName, primaryTablet, http.StatusExpectationFailed)
throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.OnlineDDLName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_THRESHOLD_EXCEEDED, migrationWaitTimeout)
})
t.Run("unthrottle online-ddl", func(t *testing.T) {
onlineddl.UnthrottleAllMigrations(t, &vtParams)
Expand All @@ -296,7 +295,7 @@ func TestSchemaChange(t *testing.T) {

t.Logf("Throttler status: %+v", status)
}
waitForThrottleCheckStatus(t, throttlerapp.OnlineDDLName, primaryTablet, http.StatusOK)
throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.OnlineDDLName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, migrationWaitTimeout)
})
t.Run("apply more DML", func(t *testing.T) {
// Looking to run a substantial amount of DML, giving vreplication
Expand Down Expand Up @@ -610,59 +609,3 @@ func initTable(t *testing.T) {
assert.Greater(t, appliedDMLEnd, appliedDMLStart)
assert.GreaterOrEqual(t, appliedDMLEnd-appliedDMLStart, int64(maxTableRows))
}

func throttleResponse(tablet *cluster.VttabletProcess, path string) (respBody string, err error) {
apiURL := fmt.Sprintf("http://%s:%d/%s", tablet.TabletHostname, tablet.Port, path)
resp, err := httpClient.Get(apiURL)
if err != nil {
return "", err
}
defer resp.Body.Close()
b, err := io.ReadAll(resp.Body)
respBody = string(b)
return respBody, err
}

func throttleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/throttle-app?app=%s&duration=1h", throttlerApp.String()))
}

func unthrottleApp(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (string, error) {
return throttleResponse(tablet, fmt.Sprintf("throttler/unthrottle-app?app=%s", throttlerApp.String()))
}

func throttlerCheck(tablet *cluster.VttabletProcess, throttlerApp throttlerapp.Name) (respBody string, statusCode int, err error) {
apiURL := fmt.Sprintf("http://%s:%d/throttler/check?app=%s", tablet.TabletHostname, tablet.Port, throttlerApp.String())
resp, err := httpClient.Get(apiURL)
if err != nil {
return "", 0, err
}
defer resp.Body.Close()
statusCode = resp.StatusCode
b, err := io.ReadAll(resp.Body)
respBody = string(b)
return respBody, statusCode, err
}

// waitForThrottleCheckStatus waits for the tablet to return the provided HTTP code in a throttle check
func waitForThrottleCheckStatus(t *testing.T, throttlerApp throttlerapp.Name, tablet *cluster.Vttablet, wantCode int) {
ctx, cancel := context.WithTimeout(context.Background(), migrationWaitTimeout)
defer cancel()
ticker := time.NewTicker(time.Second)
defer ticker.Stop()

for {
respBody, statusCode, err := throttlerCheck(tablet.VttabletProcess, throttlerApp)
require.NoError(t, err)

if wantCode == statusCode {
return
}
select {
case <-ctx.Done():
assert.Equalf(t, wantCode, statusCode, "body: %s", respBody)
return
case <-ticker.C:
}
}
}
6 changes: 3 additions & 3 deletions go/test/endtoend/onlineddl/revert/onlineddl_revert_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"flag"
"fmt"
"math/rand/v2"
"net/http"
"os"
"path"
"strings"
Expand All @@ -33,6 +32,7 @@ import (
"vitess.io/vitess/go/mysql"
"vitess.io/vitess/go/mysql/capabilities"
"vitess.io/vitess/go/vt/log"
tabletmanagerdatapb "vitess.io/vitess/go/vt/proto/tabletmanagerdata"
"vitess.io/vitess/go/vt/schema"
"vitess.io/vitess/go/vt/vttablet/tabletserver/throttle/throttlerapp"

Expand Down Expand Up @@ -209,7 +209,7 @@ func TestSchemaChange(t *testing.T) {
require.Equal(t, 1, len(shards))

throttler.EnableLagThrottlerAndWaitForStatus(t, clusterInstance)
throttler.WaitForCheckThrottlerResult(t, clusterInstance, primaryTablet, throttlerapp.TestingName, nil, http.StatusOK, time.Minute)
throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.TestingName, nil, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, time.Minute)

t.Run("revertible", testRevertible)
t.Run("revert", testRevert)
Expand Down Expand Up @@ -415,7 +415,7 @@ func testRevertible(t *testing.T) {
toStatement := fmt.Sprintf(createTableWrapper, testcase.toSchema)
uuid = testOnlineDDLStatement(t, toStatement, ddlStrategy, "vtgate", tableName, "")
if !onlineddl.CheckMigrationStatus(t, &vtParams, shards, uuid, schema.OnlineDDLStatusComplete) {
resp, err := throttler.CheckThrottler(clusterInstance, primaryTablet, throttlerapp.TestingName, nil)
resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.TestingName, nil)
assert.NoError(t, err)
fmt.Println("Throttler check response: ", resp)

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,7 @@ func waitForMessage(t *testing.T, uuid string, messageSubstring string) {
case <-ticker.C:
case <-ctx.Done():
{
resp, err := throttler.CheckThrottler(clusterInstance, primaryTablet, throttlerapp.TestingName, nil)
resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.TestingName, nil)
assert.NoError(t, err)
fmt.Println("Throttler check response: ", resp)

Expand Down Expand Up @@ -612,7 +612,7 @@ func testScheduler(t *testing.T) {
ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
throttler.CheckThrottler(clusterInstance, primaryTablet, throttlerapp.OnlineDDLName, nil)
throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, primaryTablet, throttlerapp.OnlineDDLName, nil)
select {
case <-ticker.C:
case <-ctx.Done():
Expand Down
88 changes: 23 additions & 65 deletions go/test/endtoend/tabletmanager/throttler_topo/throttler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,6 @@ import (
"context"
"flag"
"fmt"
"io"
"net/http"
"os"
"sync"
Expand Down Expand Up @@ -89,14 +88,6 @@ var (
}
}
}`

httpClient = base.SetupHTTPClient(time.Second)
throttledAppsAPIPath = "throttler/throttled-apps"
statusAPIPath = "throttler/status"
getResponseBody = func(resp *http.Response) string {
body, _ := io.ReadAll(resp.Body)
return string(body)
}
)

func TestMain(m *testing.M) {
Expand Down Expand Up @@ -162,26 +153,13 @@ func TestMain(m *testing.M) {
os.Exit(exitCode)
}

func throttledApps(tablet *cluster.Vttablet) (resp *http.Response, respBody string, err error) {
resp, err = httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, throttledAppsAPIPath))
if err != nil {
return resp, respBody, err
}
b, err := io.ReadAll(resp.Body)
if err != nil {
return resp, respBody, err
}
respBody = string(b)
return resp, respBody, err
}

func throttleCheck(tablet *cluster.Vttablet, skipRequestHeartbeats bool) (*vtctldatapb.CheckThrottlerResponse, error) {
flags := &throttle.CheckFlags{
Scope: base.ShardScope,
SkipRequestHeartbeats: skipRequestHeartbeats,
MultiMetricsEnabled: true,
}
resp, err := throttler.CheckThrottler(clusterInstance, tablet, testAppName, flags)
resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, tablet, testAppName, flags)
return resp, err
}

Expand All @@ -190,18 +168,14 @@ func throttleCheckSelf(tablet *cluster.Vttablet) (*vtctldatapb.CheckThrottlerRes
Scope: base.SelfScope,
MultiMetricsEnabled: true,
}
resp, err := throttler.CheckThrottler(clusterInstance, tablet, testAppName, flags)
resp, err := throttler.CheckThrottler(&clusterInstance.VtctldClientProcess, tablet, testAppName, flags)
return resp, err
}

func throttleStatus(t *testing.T, tablet *cluster.Vttablet) string {
resp, err := httpClient.Get(fmt.Sprintf("http://localhost:%d/%s", tablet.HTTPPort, statusAPIPath))
func throttleStatus(t *testing.T, tablet *cluster.Vttablet) *tabletmanagerdatapb.GetThrottlerStatusResponse {
status, err := throttler.GetThrottlerStatus(&clusterInstance.VtctldClientProcess, tablet)
require.NoError(t, err)
defer resp.Body.Close()

b, err := io.ReadAll(resp.Body)
require.NoError(t, err)
return string(b)
return status
}

func warmUpHeartbeat(t *testing.T) tabletmanagerdatapb.CheckThrottlerResponseCode {
Expand All @@ -211,32 +185,18 @@ func warmUpHeartbeat(t *testing.T) tabletmanagerdatapb.CheckThrottlerResponseCod
require.NoError(t, err)

time.Sleep(time.Second)
t.Logf("resp.StatusCode: %v", resp.Check.StatusCode)
t.Logf("resp.ResponseCode: %v", resp.Check.ResponseCode)
return throttle.ResponseCodeFromStatus(resp.Check.ResponseCode, int(resp.Check.StatusCode))
}

// waitForThrottleCheckStatus waits for the tablet to return the provided HTTP code in a throttle check
func waitForThrottleCheckStatus(t *testing.T, tablet *cluster.Vttablet, wantCode tabletmanagerdatapb.CheckThrottlerResponseCode) bool {
_ = warmUpHeartbeat(t)
ctx, cancel := context.WithTimeout(context.Background(), onDemandHeartbeatDuration*4)
defer cancel()

ticker := time.NewTicker(time.Second)
defer ticker.Stop()
for {
resp, err := throttleCheck(tablet, true)
require.NoError(t, err)

if wantCode == resp.Check.ResponseCode {
// Wait for any cached check values to be cleared and the new
// status value to be in effect everywhere before returning.
return true
}
select {
case <-ctx.Done():
return assert.EqualValues(t, wantCode, resp.Check.StatusCode, "response: %+v", resp)
case <-ticker.C:
}
}
flags := &throttle.CheckFlags{SkipRequestHeartbeats: true}
_, err := throttler.WaitForCheckThrottlerResult(t, &clusterInstance.VtctldClientProcess, tablet, throttlerapp.OnlineDDLName, flags, wantCode, onDemandHeartbeatDuration*4)
return err == nil
}

func vtgateExec(t *testing.T, query string, expectError string) *sqltypes.Result {
Expand Down Expand Up @@ -332,6 +292,7 @@ func TestInitialThrottler(t *testing.T) {
})
t.Run("requesting heartbeats", func(t *testing.T) {
respStatus := warmUpHeartbeat(t)
t.Logf("respStatus: %v", respStatus)
assert.NotEqual(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, respStatus)
})
t.Run("validating OK response from throttler with low threshold, heartbeats running", func(t *testing.T) {
Expand All @@ -344,7 +305,7 @@ func TestInitialThrottler(t *testing.T) {
assert.Equal(t, base.ShardScope.String(), metrics.Scope)
}

if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) {
if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString())
Expand All @@ -369,7 +330,7 @@ func TestInitialThrottler(t *testing.T) {
for _, metrics := range resp.Check.Metrics {
assert.Equal(t, base.ShardScope.String(), metrics.Scope)
}
if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) {
if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) {
rs, err := replicaTablet.VttabletProcess.QueryTablet("show replica status", keyspaceName, false)
assert.NoError(t, err)
t.Logf("Seconds_Behind_Source: %s", rs.Named().Row()["Seconds_Behind_Source"].ToString())
Expand Down Expand Up @@ -442,22 +403,19 @@ func TestThrottlerAfterMetricsCollected(t *testing.T) {
waitForThrottleCheckStatus(t, primaryTablet, tabletmanagerdatapb.CheckThrottlerResponseCode_OK)
})
t.Run("validating throttled apps", func(t *testing.T) {
resp, body, err := throttledApps(primaryTablet)
require.NoError(t, err)
defer resp.Body.Close()
assert.Equalf(t, http.StatusOK, resp.StatusCode, "Unexpected response from throttler: %s", getResponseBody(resp))
assert.Contains(t, body, throttlerapp.TestingAlwaysThrottlerName)
status := throttleStatus(t, primaryTablet)
assert.Contains(t, status.ThrottledApps, throttlerapp.TestingAlwaysThrottlerName.String())
})
t.Run("validating primary check self", func(t *testing.T) {
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("validating replica check self", func(t *testing.T) {
resp, err := throttleCheckSelf(replicaTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
}
Expand Down Expand Up @@ -496,7 +454,7 @@ func TestLag(t *testing.T) {
assert.Equal(t, base.SelfScope.String(), metrics.Scope)
}
// self (on primary) is unaffected by replication lag
if !assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp) {
if !assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp) {
t.Logf("throttler primary status: %+v", throttleStatus(t, primaryTablet))
t.Logf("throttler replica status: %+v", throttleStatus(t, replicaTablet))
}
Expand Down Expand Up @@ -590,13 +548,13 @@ func TestLag(t *testing.T) {
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
// self (on primary) is unaffected by replication lag
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("replica self-check should be fine", func(t *testing.T) {
resp, err := throttleCheckSelf(replicaTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
}
Expand Down Expand Up @@ -641,7 +599,7 @@ func TestCustomQuery(t *testing.T) {
throttler.WaitForValidData(t, primaryTablet, throttlerEnabledTimeout)
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("test threads running", func(t *testing.T) {
Expand Down Expand Up @@ -680,7 +638,7 @@ func TestCustomQuery(t *testing.T) {
{
resp, err := throttleCheckSelf(primaryTablet)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
}
})
Expand All @@ -704,7 +662,7 @@ func TestRestoreDefaultQuery(t *testing.T) {
t.Run("validating OK response from throttler with default threshold, heartbeats running", func(t *testing.T) {
resp, err := throttleCheck(primaryTablet, false)
require.NoError(t, err)
assert.EqualValues(t, http.StatusOK, resp.Check.StatusCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
assert.EqualValues(t, tabletmanagerdatapb.CheckThrottlerResponseCode_OK, resp.Check.ResponseCode, "Unexpected response from throttler: %+v", resp)
})
t.Run("validating pushback response from throttler on default threshold once heartbeats go stale", func(t *testing.T) {
Expand Down
Loading

0 comments on commit c16b81a

Please sign in to comment.