Skip to content

Commit

Permalink
added prepared bool to VTGate Execute request to use raw query for ge…
Browse files Browse the repository at this point in the history
…tting plan without parsing and normalizing

Signed-off-by: Harshit Gangal <harshit@planetscale.com>
  • Loading branch information
harshit-gangal committed Feb 25, 2025
1 parent d07da0a commit 530366f
Show file tree
Hide file tree
Showing 40 changed files with 687 additions and 573 deletions.
11 changes: 9 additions & 2 deletions go/cmd/vtgateclienttest/services/callerid.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,11 +75,18 @@ func (c *callerIDClient) checkCallerID(ctx context.Context, received string) (bo
return true, fmt.Errorf("SUCCESS: callerid matches")
}

func (c *callerIDClient) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, *sqltypes.Result, error) {
func (c *callerIDClient) Execute(
ctx context.Context,
mysqlCtx vtgateservice.MySQLConnection,
session *vtgatepb.Session,
sql string,
bindVariables map[string]*querypb.BindVariable,
prepared bool,
) (*vtgatepb.Session, *sqltypes.Result, error) {
if ok, err := c.checkCallerID(ctx, sql); ok {
return session, nil, err
}
return c.fallbackClient.Execute(ctx, mysqlCtx, session, sql, bindVariables)
return c.fallbackClient.Execute(ctx, mysqlCtx, session, sql, bindVariables, prepared)
}

func (c *callerIDClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Session, sqlList []string, bindVariablesList []map[string]*querypb.BindVariable) (*vtgatepb.Session, []sqltypes.QueryResponse, error) {
Expand Down
11 changes: 9 additions & 2 deletions go/cmd/vtgateclienttest/services/echo.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,7 +98,14 @@ func echoQueryResult(vals map[string]any) *sqltypes.Result {
return qr
}

func (c *echoClient) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, *sqltypes.Result, error) {
func (c *echoClient) Execute(
ctx context.Context,
mysqlCtx vtgateservice.MySQLConnection,
session *vtgatepb.Session,
sql string,
bindVariables map[string]*querypb.BindVariable,
prepared bool,
) (*vtgatepb.Session, *sqltypes.Result, error) {
if strings.HasPrefix(sql, EchoPrefix) {
return session, echoQueryResult(map[string]any{
"callerId": callerid.EffectiveCallerIDFromContext(ctx),
Expand All @@ -107,7 +114,7 @@ func (c *echoClient) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLCo
"session": session,
}), nil
}
return c.fallbackClient.Execute(ctx, mysqlCtx, session, sql, bindVariables)
return c.fallbackClient.Execute(ctx, mysqlCtx, session, sql, bindVariables, prepared)
}

func (c *echoClient) StreamExecute(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable, callback func(*sqltypes.Result) error) (*vtgatepb.Session, error) {
Expand Down
11 changes: 9 additions & 2 deletions go/cmd/vtgateclienttest/services/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,14 +110,21 @@ func trimmedRequestToError(received string) error {
}
}

func (c *errorClient) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, *sqltypes.Result, error) {
func (c *errorClient) Execute(
ctx context.Context,
mysqlCtx vtgateservice.MySQLConnection,
session *vtgatepb.Session,
sql string,
bindVariables map[string]*querypb.BindVariable,
prepared bool,
) (*vtgatepb.Session, *sqltypes.Result, error) {
if err := requestToPartialError(sql, session); err != nil {
return session, nil, err
}
if err := requestToError(sql); err != nil {
return session, nil, err
}
return c.fallbackClient.Execute(ctx, mysqlCtx, session, sql, bindVariables)
return c.fallbackClient.Execute(ctx, mysqlCtx, session, sql, bindVariables, prepared)
}

func (c *errorClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Session, sqlList []string, bindVariablesList []map[string]*querypb.BindVariable) (*vtgatepb.Session, []sqltypes.QueryResponse, error) {
Expand Down
11 changes: 9 additions & 2 deletions go/cmd/vtgateclienttest/services/fallback.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,15 @@ func newFallbackClient(fallback vtgateservice.VTGateService) fallbackClient {
return fallbackClient{fallback: fallback}
}

func (c fallbackClient) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, *sqltypes.Result, error) {
return c.fallback.Execute(ctx, mysqlCtx, session, sql, bindVariables)
func (c fallbackClient) Execute(
ctx context.Context,
mysqlCtx vtgateservice.MySQLConnection,
session *vtgatepb.Session,
sql string,
bindVariables map[string]*querypb.BindVariable,
prepared bool,
) (*vtgatepb.Session, *sqltypes.Result, error) {
return c.fallback.Execute(ctx, mysqlCtx, session, sql, bindVariables, prepared)
}

func (c fallbackClient) ExecuteBatch(ctx context.Context, session *vtgatepb.Session, sqlList []string, bindVariablesList []map[string]*querypb.BindVariable) (*vtgatepb.Session, []sqltypes.QueryResponse, error) {
Expand Down
9 changes: 8 additions & 1 deletion go/cmd/vtgateclienttest/services/terminal.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,14 @@ func newTerminalClient() *terminalClient {
return &terminalClient{}
}

func (c *terminalClient) Execute(ctx context.Context, mysqlCtx vtgateservice.MySQLConnection, session *vtgatepb.Session, sql string, bindVariables map[string]*querypb.BindVariable) (*vtgatepb.Session, *sqltypes.Result, error) {
func (c *terminalClient) Execute(
ctx context.Context,
mysqlCtx vtgateservice.MySQLConnection,
session *vtgatepb.Session,
sql string,
bindVariables map[string]*querypb.BindVariable,
prepared bool,
) (*vtgatepb.Session, *sqltypes.Result, error) {
if sql == "quit://" {
log.Fatal("Received quit:// query. Going down.")
}
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/cluster_process.go
Original file line number Diff line number Diff line change
Expand Up @@ -958,7 +958,7 @@ func (cluster *LocalProcessCluster) ExecOnVTGate(ctx context.Context, addr strin
session := conn.Session(target, opts)
defer conn.Close()

return session.Execute(ctx, sql, bindvars)
return session.Execute(ctx, sql, bindvars, false)
}

// StreamTabletHealth invokes a HealthStream on a local cluster Vttablet and
Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/cluster/cluster_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,7 +275,7 @@ func positionAtLeast(t *testing.T, tablet *Vttablet, a string, b string) bool {

// ExecuteQueriesUsingVtgate sends query to vtgate using vtgate session.
func ExecuteQueriesUsingVtgate(t *testing.T, session *vtgateconn.VTGateSession, query string) {
_, err := session.Execute(context.Background(), query, nil)
_, err := session.Execute(context.Background(), query, nil, false)
require.Nil(t, err)
}

Expand Down
2 changes: 1 addition & 1 deletion go/test/endtoend/recovery/recovery_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ var (

// VerifyQueriesUsingVtgate verifies queries using vtgate.
func VerifyQueriesUsingVtgate(t *testing.T, session *vtgateconn.VTGateSession, query string, value string) {
qr, err := session.Execute(context.Background(), query, nil)
qr, err := session.Execute(context.Background(), query, nil, false)
require.Nil(t, err)
assert.Equal(t, value, fmt.Sprintf("%v", qr.Rows[0][0]))
}
Expand Down
Loading

0 comments on commit 530366f

Please sign in to comment.