Skip to content

Commit

Permalink
wrap grpc errors in wrappedService
Browse files Browse the repository at this point in the history
Signed-off-by: Florent Poinsard <florent.poinsard@outlook.fr>
  • Loading branch information
frouioui committed Feb 20, 2025
1 parent a43a4d4 commit f5ee299
Show file tree
Hide file tree
Showing 5 changed files with 60 additions and 144 deletions.
3 changes: 3 additions & 0 deletions go/vt/vtgate/executor.go
Original file line number Diff line number Diff line change
Expand Up @@ -350,6 +350,9 @@ func (e *Executor) StreamExecute(

// Check if there was partial DML execution. If so, rollback the effect of the partially executed query.
if err != nil {
if safeSession.InTransaction() && e.rollbackOnFatalTxError(ctx, safeSession, err) {
return err
}
if !canReturnRows(plan.Type) {
return e.rollbackExecIfNeeded(ctx, safeSession, bindVars, logStats, err)
}
Expand Down
38 changes: 19 additions & 19 deletions go/vt/vttablet/grpctabletconn/conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,7 @@ func (conn *gRPCQueryClient) Execute(ctx context.Context, target *querypb.Target
}
er, err := conn.c.Execute(ctx, req)
if err != nil {
return nil, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0)
return nil, tabletconn.ErrorFromGRPC(err)
}
return sqltypes.Proto3ToResult(er.Result), nil
}
Expand Down Expand Up @@ -171,7 +171,7 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, target *querypb.
}
stream, err := conn.c.StreamExecute(ctx, req)
if err != nil {
return nil, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0)
return nil, tabletconn.ErrorFromGRPC(err)
}
return stream, nil
}()
Expand All @@ -182,7 +182,7 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, target *querypb.
for {
ser, err := stream.Recv()
if err != nil {
return tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0)
return tabletconn.ErrorFromGRPC(err)
}
if fields == nil {
fields = ser.Result.Fields
Expand Down Expand Up @@ -212,7 +212,7 @@ func (conn *gRPCQueryClient) Begin(ctx context.Context, target *querypb.Target,
}
br, err := conn.c.Begin(ctx, req)
if err != nil {
return state, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, true)
return state, tabletconn.ErrorFromGRPC(err)
}
state.TransactionID = br.TransactionId
state.TabletAlias = br.TabletAlias
Expand All @@ -236,7 +236,7 @@ func (conn *gRPCQueryClient) Commit(ctx context.Context, target *querypb.Target,
}
resp, err := conn.c.Commit(ctx, req)
if err != nil {
return 0, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0)
return 0, tabletconn.ErrorFromGRPC(err)
}
return resp.ReservedId, nil
}
Expand All @@ -257,7 +257,7 @@ func (conn *gRPCQueryClient) Rollback(ctx context.Context, target *querypb.Targe
}
resp, err := conn.c.Rollback(ctx, req)
if err != nil {
return 0, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0)
return 0, tabletconn.ErrorFromGRPC(err)
}
return resp.ReservedId, nil
}
Expand All @@ -279,7 +279,7 @@ func (conn *gRPCQueryClient) Prepare(ctx context.Context, target *querypb.Target
}
_, err := conn.c.Prepare(ctx, req)
if err != nil {
return tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0)
return tabletconn.ErrorFromGRPC(err)
}
return nil
}
Expand All @@ -300,7 +300,7 @@ func (conn *gRPCQueryClient) CommitPrepared(ctx context.Context, target *querypb
}
_, err := conn.c.CommitPrepared(ctx, req)
if err != nil {
return tabletconn.ErrorsFromGRPCWrapTransientTxError(err, dtid != "")
return tabletconn.ErrorFromGRPC(err)
}
return nil
}
Expand All @@ -322,7 +322,7 @@ func (conn *gRPCQueryClient) RollbackPrepared(ctx context.Context, target *query
}
_, err := conn.c.RollbackPrepared(ctx, req)
if err != nil {
return tabletconn.ErrorsFromGRPCWrapTransientTxError(err, dtid != "")
return tabletconn.ErrorFromGRPC(err)
}
return nil
}
Expand All @@ -344,7 +344,7 @@ func (conn *gRPCQueryClient) CreateTransaction(ctx context.Context, target *quer
}
_, err := conn.c.CreateTransaction(ctx, req)
if err != nil {
return tabletconn.ErrorsFromGRPCWrapTransientTxError(err, dtid != "")
return tabletconn.ErrorFromGRPC(err)
}
return nil
}
Expand All @@ -367,7 +367,7 @@ func (conn *gRPCQueryClient) StartCommit(ctx context.Context, target *querypb.Ta
Dtid: dtid,
}
resp, err := conn.c.StartCommit(ctx, req)
err = tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0)
err = tabletconn.ErrorFromGRPC(err)
if resp != nil {
return resp.State, err
}
Expand All @@ -392,7 +392,7 @@ func (conn *gRPCQueryClient) SetRollback(ctx context.Context, target *querypb.Ta
}
_, err := conn.c.SetRollback(ctx, req)
if err != nil {
return tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0)
return tabletconn.ErrorFromGRPC(err)
}
return nil
}
Expand All @@ -414,7 +414,7 @@ func (conn *gRPCQueryClient) ConcludeTransaction(ctx context.Context, target *qu
}
_, err := conn.c.ConcludeTransaction(ctx, req)
if err != nil {
return tabletconn.ErrorsFromGRPCWrapTransientTxError(err, dtid != "")
return tabletconn.ErrorFromGRPC(err)
}
return nil
}
Expand All @@ -435,7 +435,7 @@ func (conn *gRPCQueryClient) ReadTransaction(ctx context.Context, target *queryp
}
response, err := conn.c.ReadTransaction(ctx, req)
if err != nil {
return nil, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, dtid != "")
return nil, tabletconn.ErrorFromGRPC(err)
}
return response.Metadata, nil
}
Expand Down Expand Up @@ -483,13 +483,13 @@ func (conn *gRPCQueryClient) BeginExecute(ctx context.Context, target *querypb.T
}
reply, err := conn.c.BeginExecute(ctx, req)
if err != nil {
return state, nil, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, true)
return state, nil, tabletconn.ErrorFromGRPC(err)
}
state.TransactionID = reply.TransactionId
state.TabletAlias = conn.tablet.Alias
state.SessionStateChanges = reply.SessionStateChanges
if reply.Error != nil {
return state, nil, tabletconn.ErrorsFromVTRPCWrapTransientTxError(reply.Error, reply.TransactionId != 0)
return state, nil, tabletconn.ErrorFromVTRPC(reply.Error)
}
return state, sqltypes.Proto3ToResult(reply.Result), nil
}
Expand Down Expand Up @@ -527,7 +527,7 @@ func (conn *gRPCQueryClient) BeginStreamExecute(ctx context.Context, target *que
}
stream, err := conn.c.BeginStreamExecute(ctx, req)
if err != nil {
return nil, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, true)
return nil, tabletconn.ErrorFromGRPC(err)
}
return stream, nil
}()
Expand All @@ -548,11 +548,11 @@ func (conn *gRPCQueryClient) BeginStreamExecute(ctx context.Context, target *que
}

if err != nil {
return state, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, state.TransactionID != 0)
return state, tabletconn.ErrorFromGRPC(err)
}

if ser.Error != nil {
return state, tabletconn.ErrorsFromVTRPCWrapTransientTxError(ser.Error, state.TransactionID != 0)
return state, tabletconn.ErrorFromVTRPC(ser.Error)
}

// The last stream receive will not have a result, so callback will not be called for it.
Expand Down
55 changes: 38 additions & 17 deletions go/vt/vttablet/queryservice/wrapped.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,13 +18,13 @@ package queryservice

import (
"context"
"strings"

"vitess.io/vitess/go/sqltypes"
"vitess.io/vitess/go/vt/vterrors"

binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
querypb "vitess.io/vitess/go/vt/proto/query"
vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
"vitess.io/vitess/go/vt/vterrors"
)

var _ QueryService = &wrappedService{}
Expand Down Expand Up @@ -76,6 +76,20 @@ func canRetry(ctx context.Context, err error) bool {
return false
}

// wrapInVT15001 will wrap the given error if we are in a transaction and the connection was refused.
// This situation often means that there is a transient error where the client should re-try the entire transaction.
func wrapInVT15001(err error, inTx bool) error {
if err == nil || !inTx {
return err
}
c := vterrors.Code(err)
m := err.Error()
if c != vtrpcpb.Code_UNAVAILABLE && !strings.Contains(m, "connection refused") {
return err
}
return vterrors.VT15001(c, m)
}

// wrappedService wraps an existing QueryService with
// a decorator function.
type wrappedService struct {
Expand All @@ -89,7 +103,7 @@ func (ws *wrappedService) Begin(ctx context.Context, target *querypb.Target, opt
state, innerErr = conn.Begin(ctx, target, options)
return canRetry(ctx, innerErr), innerErr
})
return state, err
return state, wrapInVT15001(err, true)
}

func (ws *wrappedService) Commit(ctx context.Context, target *querypb.Target, transactionID int64) (int64, error) {
Expand All @@ -100,7 +114,7 @@ func (ws *wrappedService) Commit(ctx context.Context, target *querypb.Target, tr
return canRetry(ctx, innerErr), innerErr
})
if err != nil {
return 0, err
return 0, wrapInVT15001(err, transactionID != 0)
}
return rID, nil
}
Expand All @@ -113,37 +127,41 @@ func (ws *wrappedService) Rollback(ctx context.Context, target *querypb.Target,
return canRetry(ctx, innerErr), innerErr
})
if err != nil {
return 0, err
return 0, wrapInVT15001(err, transactionID != 0)
}
return rID, nil
}

func (ws *wrappedService) Prepare(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) error {
return ws.wrapper(ctx, target, ws.impl, "Prepare", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
err := ws.wrapper(ctx, target, ws.impl, "Prepare", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
innerErr := conn.Prepare(ctx, target, transactionID, dtid)
return canRetry(ctx, innerErr), innerErr
})
return wrapInVT15001(err, transactionID != 0)
}

func (ws *wrappedService) CommitPrepared(ctx context.Context, target *querypb.Target, dtid string) (err error) {
return ws.wrapper(ctx, target, ws.impl, "CommitPrepared", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
err = ws.wrapper(ctx, target, ws.impl, "CommitPrepared", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
innerErr := conn.CommitPrepared(ctx, target, dtid)
return canRetry(ctx, innerErr), innerErr
})
return wrapInVT15001(err, dtid != "")
}

func (ws *wrappedService) RollbackPrepared(ctx context.Context, target *querypb.Target, dtid string, originalID int64) (err error) {
return ws.wrapper(ctx, target, ws.impl, "RollbackPrepared", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
err = ws.wrapper(ctx, target, ws.impl, "RollbackPrepared", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
innerErr := conn.RollbackPrepared(ctx, target, dtid, originalID)
return canRetry(ctx, innerErr), innerErr
})
return wrapInVT15001(err, dtid != "")
}

func (ws *wrappedService) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error) {
return ws.wrapper(ctx, target, ws.impl, "CreateTransaction", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
err = ws.wrapper(ctx, target, ws.impl, "CreateTransaction", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
innerErr := conn.CreateTransaction(ctx, target, dtid, participants)
return canRetry(ctx, innerErr), innerErr
})
return wrapInVT15001(err, dtid != "")
}

func (ws *wrappedService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) {
Expand All @@ -152,21 +170,23 @@ func (ws *wrappedService) StartCommit(ctx context.Context, target *querypb.Targe
state, innerErr = conn.StartCommit(ctx, target, transactionID, dtid)
return canRetry(ctx, innerErr), innerErr
})
return state, err
return state, wrapInVT15001(err, transactionID != 0)
}

func (ws *wrappedService) SetRollback(ctx context.Context, target *querypb.Target, dtid string, transactionID int64) (err error) {
return ws.wrapper(ctx, target, ws.impl, "SetRollback", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
err = ws.wrapper(ctx, target, ws.impl, "SetRollback", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
innerErr := conn.SetRollback(ctx, target, dtid, transactionID)
return canRetry(ctx, innerErr), innerErr
})
return wrapInVT15001(err, transactionID != 0)
}

func (ws *wrappedService) ConcludeTransaction(ctx context.Context, target *querypb.Target, dtid string) (err error) {
return ws.wrapper(ctx, target, ws.impl, "ConcludeTransaction", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
err = ws.wrapper(ctx, target, ws.impl, "ConcludeTransaction", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
innerErr := conn.ConcludeTransaction(ctx, target, dtid)
return canRetry(ctx, innerErr), innerErr
})
return wrapInVT15001(err, dtid != "")
}

func (ws *wrappedService) ReadTransaction(ctx context.Context, target *querypb.Target, dtid string) (metadata *querypb.TransactionMetadata, err error) {
Expand All @@ -175,7 +195,7 @@ func (ws *wrappedService) ReadTransaction(ctx context.Context, target *querypb.T
metadata, innerErr = conn.ReadTransaction(ctx, target, dtid)
return canRetry(ctx, innerErr), innerErr
})
return metadata, err
return metadata, wrapInVT15001(err, dtid != "")
}

func (ws *wrappedService) UnresolvedTransactions(ctx context.Context, target *querypb.Target, abandonAgeSeconds int64) (transactions []*querypb.TransactionMetadata, err error) {
Expand All @@ -196,13 +216,13 @@ func (ws *wrappedService) Execute(ctx context.Context, target *querypb.Target, q
retryable := canRetry(ctx, innerErr) && (!inDedicatedConn)
return retryable, innerErr
})
return qr, err
return qr, wrapInVT15001(err, transactionID != 0)
}

// StreamExecute implements the QueryService interface
func (ws *wrappedService) StreamExecute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, transactionID int64, reservedID int64, options *querypb.ExecuteOptions, callback func(*sqltypes.Result) error) error {
inDedicatedConn := transactionID != 0 || reservedID != 0
return ws.wrapper(ctx, target, ws.impl, "StreamExecute", inDedicatedConn, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
err := ws.wrapper(ctx, target, ws.impl, "StreamExecute", inDedicatedConn, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) {
streamingStarted := false
innerErr := conn.StreamExecute(ctx, target, query, bindVars, transactionID, reservedID, options, func(qr *sqltypes.Result) error {
streamingStarted = true
Expand All @@ -212,6 +232,7 @@ func (ws *wrappedService) StreamExecute(ctx context.Context, target *querypb.Tar
retryable := canRetry(ctx, innerErr) && (!streamingStarted)
return retryable, innerErr
})
return wrapInVT15001(err, transactionID != 0)
}

func (ws *wrappedService) BeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, query string, bindVars map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (state TransactionState, qr *sqltypes.Result, err error) {
Expand All @@ -221,7 +242,7 @@ func (ws *wrappedService) BeginExecute(ctx context.Context, target *querypb.Targ
state, qr, innerErr = conn.BeginExecute(ctx, target, preQueries, query, bindVars, reservedID, options)
return canRetry(ctx, innerErr) && !inDedicatedConn, innerErr
})
return state, qr, err
return state, qr, wrapInVT15001(err, true)
}

// BeginStreamExecute implements the QueryService interface
Expand All @@ -232,7 +253,7 @@ func (ws *wrappedService) BeginStreamExecute(ctx context.Context, target *queryp
state, innerErr = conn.BeginStreamExecute(ctx, target, preQueries, query, bindVars, reservedID, options, callback)
return canRetry(ctx, innerErr) && !inDedicatedConn, innerErr
})
return state, err
return state, wrapInVT15001(err, true)
}

func (ws *wrappedService) MessageStream(ctx context.Context, target *querypb.Target, name string, callback func(*sqltypes.Result) error) error {
Expand Down
26 changes: 0 additions & 26 deletions go/vt/vttablet/tabletconn/grpc_error.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ package tabletconn

import (
"io"
"strings"

"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
Expand All @@ -42,15 +41,6 @@ func ErrorFromGRPC(err error) error {
return vterrors.Errorf(vtrpcpb.Code(code), "vttablet: %v", err)
}

func ErrorsFromGRPCWrapTransientTxError(err error, inTx bool) error {
err = ErrorFromGRPC(err)
if err == nil || !inTx {
return err
}

return ifConnectionRefusedWrapInVT15001(err)
}

// ErrorFromVTRPC converts a *vtrpcpb.RPCError to vtError for
// tabletserver calls.
func ErrorFromVTRPC(err *vtrpcpb.RPCError) error {
Expand All @@ -59,19 +49,3 @@ func ErrorFromVTRPC(err *vtrpcpb.RPCError) error {
}
return vterrors.Errorf(err.Code, "vttablet: %s", err.Message)
}

func ErrorsFromVTRPCWrapTransientTxError(err *vtrpcpb.RPCError, inTx bool) error {
nerr := ErrorFromVTRPC(err)
if nerr == nil || !inTx {
return nerr
}

return ifConnectionRefusedWrapInVT15001(nerr)
}

func ifConnectionRefusedWrapInVT15001(err error) error {
if vterrors.Code(err) == vtrpcpb.Code_UNAVAILABLE && strings.Contains(err.Error(), "connection refused") {
return vterrors.VT15001(vtrpcpb.Code_UNAVAILABLE, err.Error())
}
return err
}
Loading

0 comments on commit f5ee299

Please sign in to comment.