From f5ee299c19fd51e26436110c35178a7c7ce38af1 Mon Sep 17 00:00:00 2001 From: Florent Poinsard Date: Thu, 20 Feb 2025 14:21:13 -0600 Subject: [PATCH] wrap grpc errors in wrappedService Signed-off-by: Florent Poinsard --- go/vt/vtgate/executor.go | 3 + go/vt/vttablet/grpctabletconn/conn.go | 38 ++++----- go/vt/vttablet/queryservice/wrapped.go | 55 +++++++++---- go/vt/vttablet/tabletconn/grpc_error.go | 26 ------- go/vt/vttablet/tabletconn/grpc_error_test.go | 82 -------------------- 5 files changed, 60 insertions(+), 144 deletions(-) diff --git a/go/vt/vtgate/executor.go b/go/vt/vtgate/executor.go index 69913b0ec31..a998b19f36a 100644 --- a/go/vt/vtgate/executor.go +++ b/go/vt/vtgate/executor.go @@ -350,6 +350,9 @@ func (e *Executor) StreamExecute( // Check if there was partial DML execution. If so, rollback the effect of the partially executed query. if err != nil { + if safeSession.InTransaction() && e.rollbackOnFatalTxError(ctx, safeSession, err) { + return err + } if !canReturnRows(plan.Type) { return e.rollbackExecIfNeeded(ctx, safeSession, bindVars, logStats, err) } diff --git a/go/vt/vttablet/grpctabletconn/conn.go b/go/vt/vttablet/grpctabletconn/conn.go index 1348098eb26..d2d5604d808 100644 --- a/go/vt/vttablet/grpctabletconn/conn.go +++ b/go/vt/vttablet/grpctabletconn/conn.go @@ -132,7 +132,7 @@ func (conn *gRPCQueryClient) Execute(ctx context.Context, target *querypb.Target } er, err := conn.c.Execute(ctx, req) if err != nil { - return nil, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0) + return nil, tabletconn.ErrorFromGRPC(err) } return sqltypes.Proto3ToResult(er.Result), nil } @@ -171,7 +171,7 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, target *querypb. 
} stream, err := conn.c.StreamExecute(ctx, req) if err != nil { - return nil, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0) + return nil, tabletconn.ErrorFromGRPC(err) } return stream, nil }() @@ -182,7 +182,7 @@ func (conn *gRPCQueryClient) StreamExecute(ctx context.Context, target *querypb. for { ser, err := stream.Recv() if err != nil { - return tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0) + return tabletconn.ErrorFromGRPC(err) } if fields == nil { fields = ser.Result.Fields @@ -212,7 +212,7 @@ func (conn *gRPCQueryClient) Begin(ctx context.Context, target *querypb.Target, } br, err := conn.c.Begin(ctx, req) if err != nil { - return state, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, true) + return state, tabletconn.ErrorFromGRPC(err) } state.TransactionID = br.TransactionId state.TabletAlias = br.TabletAlias @@ -236,7 +236,7 @@ func (conn *gRPCQueryClient) Commit(ctx context.Context, target *querypb.Target, } resp, err := conn.c.Commit(ctx, req) if err != nil { - return 0, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0) + return 0, tabletconn.ErrorFromGRPC(err) } return resp.ReservedId, nil } @@ -257,7 +257,7 @@ func (conn *gRPCQueryClient) Rollback(ctx context.Context, target *querypb.Targe } resp, err := conn.c.Rollback(ctx, req) if err != nil { - return 0, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0) + return 0, tabletconn.ErrorFromGRPC(err) } return resp.ReservedId, nil } @@ -279,7 +279,7 @@ func (conn *gRPCQueryClient) Prepare(ctx context.Context, target *querypb.Target } _, err := conn.c.Prepare(ctx, req) if err != nil { - return tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0) + return tabletconn.ErrorFromGRPC(err) } return nil } @@ -300,7 +300,7 @@ func (conn *gRPCQueryClient) CommitPrepared(ctx context.Context, target *querypb } _, err := conn.c.CommitPrepared(ctx, req) if err != nil { - return 
tabletconn.ErrorsFromGRPCWrapTransientTxError(err, dtid != "") + return tabletconn.ErrorFromGRPC(err) } return nil } @@ -322,7 +322,7 @@ func (conn *gRPCQueryClient) RollbackPrepared(ctx context.Context, target *query } _, err := conn.c.RollbackPrepared(ctx, req) if err != nil { - return tabletconn.ErrorsFromGRPCWrapTransientTxError(err, dtid != "") + return tabletconn.ErrorFromGRPC(err) } return nil } @@ -344,7 +344,7 @@ func (conn *gRPCQueryClient) CreateTransaction(ctx context.Context, target *quer } _, err := conn.c.CreateTransaction(ctx, req) if err != nil { - return tabletconn.ErrorsFromGRPCWrapTransientTxError(err, dtid != "") + return tabletconn.ErrorFromGRPC(err) } return nil } @@ -367,7 +367,7 @@ func (conn *gRPCQueryClient) StartCommit(ctx context.Context, target *querypb.Ta Dtid: dtid, } resp, err := conn.c.StartCommit(ctx, req) - err = tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0) + err = tabletconn.ErrorFromGRPC(err) if resp != nil { return resp.State, err } @@ -392,7 +392,7 @@ func (conn *gRPCQueryClient) SetRollback(ctx context.Context, target *querypb.Ta } _, err := conn.c.SetRollback(ctx, req) if err != nil { - return tabletconn.ErrorsFromGRPCWrapTransientTxError(err, transactionID != 0) + return tabletconn.ErrorFromGRPC(err) } return nil } @@ -414,7 +414,7 @@ func (conn *gRPCQueryClient) ConcludeTransaction(ctx context.Context, target *qu } _, err := conn.c.ConcludeTransaction(ctx, req) if err != nil { - return tabletconn.ErrorsFromGRPCWrapTransientTxError(err, dtid != "") + return tabletconn.ErrorFromGRPC(err) } return nil } @@ -435,7 +435,7 @@ func (conn *gRPCQueryClient) ReadTransaction(ctx context.Context, target *queryp } response, err := conn.c.ReadTransaction(ctx, req) if err != nil { - return nil, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, dtid != "") + return nil, tabletconn.ErrorFromGRPC(err) } return response.Metadata, nil } @@ -483,13 +483,13 @@ func (conn *gRPCQueryClient) BeginExecute(ctx 
context.Context, target *querypb.T } reply, err := conn.c.BeginExecute(ctx, req) if err != nil { - return state, nil, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, true) + return state, nil, tabletconn.ErrorFromGRPC(err) } state.TransactionID = reply.TransactionId state.TabletAlias = conn.tablet.Alias state.SessionStateChanges = reply.SessionStateChanges if reply.Error != nil { - return state, nil, tabletconn.ErrorsFromVTRPCWrapTransientTxError(reply.Error, reply.TransactionId != 0) + return state, nil, tabletconn.ErrorFromVTRPC(reply.Error) } return state, sqltypes.Proto3ToResult(reply.Result), nil } @@ -527,7 +527,7 @@ func (conn *gRPCQueryClient) BeginStreamExecute(ctx context.Context, target *que } stream, err := conn.c.BeginStreamExecute(ctx, req) if err != nil { - return nil, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, true) + return nil, tabletconn.ErrorFromGRPC(err) } return stream, nil }() @@ -548,11 +548,11 @@ func (conn *gRPCQueryClient) BeginStreamExecute(ctx context.Context, target *que } if err != nil { - return state, tabletconn.ErrorsFromGRPCWrapTransientTxError(err, state.TransactionID != 0) + return state, tabletconn.ErrorFromGRPC(err) } if ser.Error != nil { - return state, tabletconn.ErrorsFromVTRPCWrapTransientTxError(ser.Error, state.TransactionID != 0) + return state, tabletconn.ErrorFromVTRPC(ser.Error) } // The last stream receive will not have a result, so callback will not be called for it. 
diff --git a/go/vt/vttablet/queryservice/wrapped.go b/go/vt/vttablet/queryservice/wrapped.go
index c72a472a5cb..aa19f911870 100644
--- a/go/vt/vttablet/queryservice/wrapped.go
+++ b/go/vt/vttablet/queryservice/wrapped.go
@@ -18,13 +18,13 @@ package queryservice
 
 import (
 	"context"
+	"strings"
 
 	"vitess.io/vitess/go/sqltypes"
-	"vitess.io/vitess/go/vt/vterrors"
-
 	binlogdatapb "vitess.io/vitess/go/vt/proto/binlogdata"
 	querypb "vitess.io/vitess/go/vt/proto/query"
 	vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc"
+	"vitess.io/vitess/go/vt/vterrors"
 )
 
 var _ QueryService = &wrappedService{}
@@ -76,6 +76,20 @@ func canRetry(ctx context.Context, err error) bool {
 	return false
 }
 
+// wrapInVT15001 returns err wrapped in a VT15001 error when inTx is true, err's code is UNAVAILABLE and its message contains "connection refused"; in every other case (including a nil err) it returns err unchanged.
+// Both conditions must hold: an UNAVAILABLE error without that text, or that text under a different code, is returned as-is.
+// A refused connection inside a transaction often means a transient failure where the client should re-try the entire transaction.
+func wrapInVT15001(err error, inTx bool) error {
+	if err == nil || !inTx {
+		return err
+	}
+	// Bail out unless BOTH the code and the message identify a refused connection.
+	if vterrors.Code(err) != vtrpcpb.Code_UNAVAILABLE || !strings.Contains(err.Error(), "connection refused") {
+		return err
+	}
+	return vterrors.VT15001(vtrpcpb.Code_UNAVAILABLE, err.Error())
+}
+
 // wrappedService wraps an existing QueryService with
 // a decorator function.
type wrappedService struct { @@ -89,7 +103,7 @@ func (ws *wrappedService) Begin(ctx context.Context, target *querypb.Target, opt state, innerErr = conn.Begin(ctx, target, options) return canRetry(ctx, innerErr), innerErr }) - return state, err + return state, wrapInVT15001(err, true) } func (ws *wrappedService) Commit(ctx context.Context, target *querypb.Target, transactionID int64) (int64, error) { @@ -100,7 +114,7 @@ func (ws *wrappedService) Commit(ctx context.Context, target *querypb.Target, tr return canRetry(ctx, innerErr), innerErr }) if err != nil { - return 0, err + return 0, wrapInVT15001(err, transactionID != 0) } return rID, nil } @@ -113,37 +127,41 @@ func (ws *wrappedService) Rollback(ctx context.Context, target *querypb.Target, return canRetry(ctx, innerErr), innerErr }) if err != nil { - return 0, err + return 0, wrapInVT15001(err, transactionID != 0) } return rID, nil } func (ws *wrappedService) Prepare(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) error { - return ws.wrapper(ctx, target, ws.impl, "Prepare", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { + err := ws.wrapper(ctx, target, ws.impl, "Prepare", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { innerErr := conn.Prepare(ctx, target, transactionID, dtid) return canRetry(ctx, innerErr), innerErr }) + return wrapInVT15001(err, transactionID != 0) } func (ws *wrappedService) CommitPrepared(ctx context.Context, target *querypb.Target, dtid string) (err error) { - return ws.wrapper(ctx, target, ws.impl, "CommitPrepared", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { + err = ws.wrapper(ctx, target, ws.impl, "CommitPrepared", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { innerErr := conn.CommitPrepared(ctx, target, dtid) return canRetry(ctx, innerErr), innerErr }) + return wrapInVT15001(err, 
dtid != "") } func (ws *wrappedService) RollbackPrepared(ctx context.Context, target *querypb.Target, dtid string, originalID int64) (err error) { - return ws.wrapper(ctx, target, ws.impl, "RollbackPrepared", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { + err = ws.wrapper(ctx, target, ws.impl, "RollbackPrepared", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { innerErr := conn.RollbackPrepared(ctx, target, dtid, originalID) return canRetry(ctx, innerErr), innerErr }) + return wrapInVT15001(err, dtid != "") } func (ws *wrappedService) CreateTransaction(ctx context.Context, target *querypb.Target, dtid string, participants []*querypb.Target) (err error) { - return ws.wrapper(ctx, target, ws.impl, "CreateTransaction", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { + err = ws.wrapper(ctx, target, ws.impl, "CreateTransaction", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { innerErr := conn.CreateTransaction(ctx, target, dtid, participants) return canRetry(ctx, innerErr), innerErr }) + return wrapInVT15001(err, dtid != "") } func (ws *wrappedService) StartCommit(ctx context.Context, target *querypb.Target, transactionID int64, dtid string) (state querypb.StartCommitState, err error) { @@ -152,21 +170,23 @@ func (ws *wrappedService) StartCommit(ctx context.Context, target *querypb.Targe state, innerErr = conn.StartCommit(ctx, target, transactionID, dtid) return canRetry(ctx, innerErr), innerErr }) - return state, err + return state, wrapInVT15001(err, transactionID != 0) } func (ws *wrappedService) SetRollback(ctx context.Context, target *querypb.Target, dtid string, transactionID int64) (err error) { - return ws.wrapper(ctx, target, ws.impl, "SetRollback", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { + err = ws.wrapper(ctx, target, ws.impl, 
"SetRollback", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { innerErr := conn.SetRollback(ctx, target, dtid, transactionID) return canRetry(ctx, innerErr), innerErr }) + return wrapInVT15001(err, transactionID != 0) } func (ws *wrappedService) ConcludeTransaction(ctx context.Context, target *querypb.Target, dtid string) (err error) { - return ws.wrapper(ctx, target, ws.impl, "ConcludeTransaction", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { + err = ws.wrapper(ctx, target, ws.impl, "ConcludeTransaction", true, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { innerErr := conn.ConcludeTransaction(ctx, target, dtid) return canRetry(ctx, innerErr), innerErr }) + return wrapInVT15001(err, dtid != "") } func (ws *wrappedService) ReadTransaction(ctx context.Context, target *querypb.Target, dtid string) (metadata *querypb.TransactionMetadata, err error) { @@ -175,7 +195,7 @@ func (ws *wrappedService) ReadTransaction(ctx context.Context, target *querypb.T metadata, innerErr = conn.ReadTransaction(ctx, target, dtid) return canRetry(ctx, innerErr), innerErr }) - return metadata, err + return metadata, wrapInVT15001(err, dtid != "") } func (ws *wrappedService) UnresolvedTransactions(ctx context.Context, target *querypb.Target, abandonAgeSeconds int64) (transactions []*querypb.TransactionMetadata, err error) { @@ -196,13 +216,13 @@ func (ws *wrappedService) Execute(ctx context.Context, target *querypb.Target, q retryable := canRetry(ctx, innerErr) && (!inDedicatedConn) return retryable, innerErr }) - return qr, err + return qr, wrapInVT15001(err, transactionID != 0) } // StreamExecute implements the QueryService interface func (ws *wrappedService) StreamExecute(ctx context.Context, target *querypb.Target, query string, bindVars map[string]*querypb.BindVariable, transactionID int64, reservedID int64, options *querypb.ExecuteOptions, callback 
func(*sqltypes.Result) error) error { inDedicatedConn := transactionID != 0 || reservedID != 0 - return ws.wrapper(ctx, target, ws.impl, "StreamExecute", inDedicatedConn, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { + err := ws.wrapper(ctx, target, ws.impl, "StreamExecute", inDedicatedConn, func(ctx context.Context, target *querypb.Target, conn QueryService) (bool, error) { streamingStarted := false innerErr := conn.StreamExecute(ctx, target, query, bindVars, transactionID, reservedID, options, func(qr *sqltypes.Result) error { streamingStarted = true @@ -212,6 +232,7 @@ func (ws *wrappedService) StreamExecute(ctx context.Context, target *querypb.Tar retryable := canRetry(ctx, innerErr) && (!streamingStarted) return retryable, innerErr }) + return wrapInVT15001(err, transactionID != 0) } func (ws *wrappedService) BeginExecute(ctx context.Context, target *querypb.Target, preQueries []string, query string, bindVars map[string]*querypb.BindVariable, reservedID int64, options *querypb.ExecuteOptions) (state TransactionState, qr *sqltypes.Result, err error) { @@ -221,7 +242,7 @@ func (ws *wrappedService) BeginExecute(ctx context.Context, target *querypb.Targ state, qr, innerErr = conn.BeginExecute(ctx, target, preQueries, query, bindVars, reservedID, options) return canRetry(ctx, innerErr) && !inDedicatedConn, innerErr }) - return state, qr, err + return state, qr, wrapInVT15001(err, true) } // BeginStreamExecute implements the QueryService interface @@ -232,7 +253,7 @@ func (ws *wrappedService) BeginStreamExecute(ctx context.Context, target *queryp state, innerErr = conn.BeginStreamExecute(ctx, target, preQueries, query, bindVars, reservedID, options, callback) return canRetry(ctx, innerErr) && !inDedicatedConn, innerErr }) - return state, err + return state, wrapInVT15001(err, true) } func (ws *wrappedService) MessageStream(ctx context.Context, target *querypb.Target, name string, callback func(*sqltypes.Result) error) error { 
diff --git a/go/vt/vttablet/tabletconn/grpc_error.go b/go/vt/vttablet/tabletconn/grpc_error.go index 6309857d709..98e10948f6b 100644 --- a/go/vt/vttablet/tabletconn/grpc_error.go +++ b/go/vt/vttablet/tabletconn/grpc_error.go @@ -18,7 +18,6 @@ package tabletconn import ( "io" - "strings" "google.golang.org/grpc/codes" "google.golang.org/grpc/status" @@ -42,15 +41,6 @@ func ErrorFromGRPC(err error) error { return vterrors.Errorf(vtrpcpb.Code(code), "vttablet: %v", err) } -func ErrorsFromGRPCWrapTransientTxError(err error, inTx bool) error { - err = ErrorFromGRPC(err) - if err == nil || !inTx { - return err - } - - return ifConnectionRefusedWrapInVT15001(err) -} - // ErrorFromVTRPC converts a *vtrpcpb.RPCError to vtError for // tabletserver calls. func ErrorFromVTRPC(err *vtrpcpb.RPCError) error { @@ -59,19 +49,3 @@ func ErrorFromVTRPC(err *vtrpcpb.RPCError) error { } return vterrors.Errorf(err.Code, "vttablet: %s", err.Message) } - -func ErrorsFromVTRPCWrapTransientTxError(err *vtrpcpb.RPCError, inTx bool) error { - nerr := ErrorFromVTRPC(err) - if nerr == nil || !inTx { - return nerr - } - - return ifConnectionRefusedWrapInVT15001(nerr) -} - -func ifConnectionRefusedWrapInVT15001(err error) error { - if vterrors.Code(err) == vtrpcpb.Code_UNAVAILABLE && strings.Contains(err.Error(), "connection refused") { - return vterrors.VT15001(vtrpcpb.Code_UNAVAILABLE, err.Error()) - } - return err -} diff --git a/go/vt/vttablet/tabletconn/grpc_error_test.go b/go/vt/vttablet/tabletconn/grpc_error_test.go index 8bdc09d0e2d..82fe4c8cf1d 100644 --- a/go/vt/vttablet/tabletconn/grpc_error_test.go +++ b/go/vt/vttablet/tabletconn/grpc_error_test.go @@ -20,8 +20,6 @@ import ( "testing" "github.com/stretchr/testify/require" - "google.golang.org/grpc/codes" - "google.golang.org/grpc/status" vtrpcpb "vitess.io/vitess/go/vt/proto/vtrpc" "vitess.io/vitess/go/vt/vterrors" @@ -55,83 +53,3 @@ func TestTabletErrorFromRPCError(t *testing.T) { require.Equal(t, tcase.want, got) } } - -func 
TestGRPCErrorToVT15001(t *testing.T) { - msg := "connection error: desc = \"transport: Error while dialing: dial tcp 127.0.0.1:7108: connect: connection refused\"" - testcases := []struct { - in error - inTx bool - wrap bool - }{{ - in: status.Error(codes.Unavailable, msg), - inTx: true, - wrap: true, - }, { - in: status.Error(codes.Unavailable, msg), - inTx: false, - wrap: false, - }, { - in: status.Error(codes.Unavailable, "unavailable"), - inTx: true, - wrap: false, - }, { - in: status.Error(codes.FailedPrecondition, msg), - inTx: true, - wrap: false, - }} - for _, tcase := range testcases { - got := ErrorsFromGRPCWrapTransientTxError(tcase.in, tcase.inTx) - require.Error(t, got) - if tcase.wrap { - require.ErrorContains(t, got, "VT15001") - } else { - require.NotContains(t, got.Error(), "VT15001") - } - } -} - -func TestVTRPCErrorToVT15001(t *testing.T) { - msg := "connection error: desc = \"transport: Error while dialing: dial tcp 127.0.0.1:7108: connect: connection refused\"" - testcases := []struct { - in *vtrpcpb.RPCError - inTx bool - wrap bool - }{{ - in: &vtrpcpb.RPCError{ - Code: vtrpcpb.Code_UNAVAILABLE, - Message: msg, - }, - inTx: true, - wrap: true, - }, { - in: &vtrpcpb.RPCError{ - Code: vtrpcpb.Code_UNAVAILABLE, - Message: msg, - }, - inTx: false, - wrap: false, - }, { - in: &vtrpcpb.RPCError{ - Code: vtrpcpb.Code_UNAVAILABLE, - Message: "unavailable", - }, - inTx: true, - wrap: false, - }, { - in: &vtrpcpb.RPCError{ - Code: vtrpcpb.Code_FAILED_PRECONDITION, - Message: msg, - }, - inTx: true, - wrap: false, - }} - for _, tcase := range testcases { - got := ErrorsFromVTRPCWrapTransientTxError(tcase.in, tcase.inTx) - require.Error(t, got) - if tcase.wrap { - require.ErrorContains(t, got, "VT15001") - } else { - require.NotContains(t, got.Error(), "VT15001") - } - } -}