From 35f01bd0989c8f3aee77d48f36bbb0faff42977e Mon Sep 17 00:00:00 2001 From: Benjamin Wang Date: Sat, 25 Jan 2025 22:23:32 +0000 Subject: [PATCH] Update etcdserver and client to support DowngradeVersionTest request Signed-off-by: Benjamin Wang --- client/v3/maintenance.go | 12 +++++++++++- client/v3/retry.go | 4 ++++ server/etcdserver/api/v3rpc/maintenance.go | 19 +++++++++++++++++++ server/etcdserver/server.go | 6 +++++- server/etcdserver/v3_server.go | 10 ++++++++++ .../adapter/maintenance_client_adapter.go | 4 ++++ server/proxy/grpcproxy/maintenance.go | 4 ++++ 7 files changed, 57 insertions(+), 2 deletions(-) diff --git a/client/v3/maintenance.go b/client/v3/maintenance.go index 00aaacd15fdc..0abb28956e1c 100644 --- a/client/v3/maintenance.go +++ b/client/v3/maintenance.go @@ -35,7 +35,8 @@ type ( MoveLeaderResponse pb.MoveLeaderResponse DowngradeResponse pb.DowngradeResponse - DowngradeAction pb.DowngradeRequest_DowngradeAction + DowngradeAction pb.DowngradeRequest_DowngradeAction + DowngradeVersionTestResponse pb.DowngradeVersionTestResponse ) const ( @@ -87,6 +88,10 @@ type Maintenance interface { // on the cluster version. // Supported since etcd 3.5. Downgrade(ctx context.Context, action DowngradeAction, version string) (*DowngradeResponse, error) + + // DowngradeVersionTest is for test only! It enables users (test cases) + // to send a DowngradeVersionTestRequest to etcdserver. + DowngradeVersionTest(ctx context.Context, version string) (*DowngradeVersionTestResponse, error) } // SnapshotResponse is aggregated response from the snapshot stream. @@ -348,3 +353,8 @@ func (m *maintenance) Downgrade(ctx context.Context, action DowngradeAction, ver resp, err := m.remote.Downgrade(ctx, &pb.DowngradeRequest{Action: actionType, Version: version}, m.callOpts...) return (*DowngradeResponse)(resp), ContextError(ctx, err) } + +func (m *maintenance) DowngradeVersionTest(ctx context.Context, version string) (*DowngradeVersionTestResponse, error) { + resp, err := m.remote.DowngradeVersionTest(ctx, &pb.DowngradeVersionTestRequest{Ver: version}) + return (*DowngradeVersionTestResponse)(resp), ContextError(ctx, err) +} diff --git a/client/v3/retry.go b/client/v3/retry.go index 9152c53a7d4c..7e90872cfa1e 100644 --- a/client/v3/retry.go +++ b/client/v3/retry.go @@ -233,6 +233,10 @@ type retryAuthClient struct { ac pb.AuthClient } +func (rmc *retryMaintenanceClient) DowngradeVersionTest(ctx context.Context, in *pb.DowngradeVersionTestRequest, opts ...grpc.CallOption) (resp *pb.DowngradeVersionTestResponse, err error) { + return rmc.mc.DowngradeVersionTest(ctx, in, opts...) +} + // RetryAuthClient implements a AuthClient. func RetryAuthClient(c *Client) pb.AuthClient { return &retryAuthClient{ diff --git a/server/etcdserver/api/v3rpc/maintenance.go b/server/etcdserver/api/v3rpc/maintenance.go index 92b41c31703b..891e98da7bf5 100644 --- a/server/etcdserver/api/v3rpc/maintenance.go +++ b/server/etcdserver/api/v3rpc/maintenance.go @@ -55,6 +55,7 @@ type Alarmer interface { type Downgrader interface { Downgrade(ctx context.Context, dr *pb.DowngradeRequest) (*pb.DowngradeResponse, error) + DowngradeVersionTest(ctx context.Context, r *pb.DowngradeVersionTestRequest) (*pb.DowngradeVersionTestResponse, error) } type LeaderTransferrer interface { @@ -296,6 +297,16 @@ func (ms *maintenanceServer) Downgrade(ctx context.Context, r *pb.DowngradeReque return resp, nil } +func (ms *maintenanceServer) DowngradeVersionTest(ctx context.Context, r *pb.DowngradeVersionTestRequest) (*pb.DowngradeVersionTestResponse, error) { + resp, err := ms.d.DowngradeVersionTest(ctx, r) + if err != nil { + return nil, togRPCError(err) + } + resp.Header = &pb.ResponseHeader{} + ms.hdr.fill(resp.Header) + return resp, nil +} + type authMaintenanceServer struct { *maintenanceServer *AuthAdmin @@ -355,3 +366,11 @@ func (ams *authMaintenanceServer) Downgrade(ctx context.Context, r *pb.Downgrade return ams.maintenanceServer.Downgrade(ctx, r) } + +func (ams *authMaintenanceServer) DowngradeVersionTest(ctx context.Context, r *pb.DowngradeVersionTestRequest) (*pb.DowngradeVersionTestResponse, error) { + if err := ams.isPermitted(ctx); err != nil { + return nil, togRPCError(err) + } + + return ams.maintenanceServer.DowngradeVersionTest(ctx, r) +} diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index e6cee5b2650d..8899c3511f01 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -2006,7 +2006,7 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership. } func (s *EtcdServer) applyInternalRaftRequest(r *pb.InternalRaftRequest, shouldApplyV3 membership.ShouldApplyV3) *apply.Result { - if r.ClusterVersionSet == nil && r.ClusterMemberAttrSet == nil && r.DowngradeInfoSet == nil { + if r.ClusterVersionSet == nil && r.ClusterMemberAttrSet == nil && r.DowngradeInfoSet == nil && r.DowngradeVersionTest == nil { if !shouldApplyV3 { return nil } @@ -2029,6 +2029,10 @@ func (s *EtcdServer) applyInternalRaftRequest(r *pb.InternalRaftRequest, shouldA case r.DowngradeInfoSet != nil: op = "DowngradeInfoSet" // Implemented in 3.5.x membershipApplier.DowngradeInfoSet(r.DowngradeInfoSet, shouldApplyV3) + case r.DowngradeVersionTest != nil: + op = "DowngradeVersionTest" + // do nothing, we are good as long as a WAL record + // has already been generated for this request. default: s.lg.Panic("not implemented apply", zap.Stringer("raft-request", r)) return nil diff --git a/server/etcdserver/v3_server.go b/server/etcdserver/v3_server.go index c6953604aa20..8afccea61b7d 100644 --- a/server/etcdserver/v3_server.go +++ b/server/etcdserver/v3_server.go @@ -1044,6 +1044,16 @@ func (s *EtcdServer) downgradeEnable(ctx context.Context, r *pb.DowngradeRequest return &resp, nil } +// DowngradeVersionTest is for test only! We intentionally send +// a raft request so that a related WAL record can be generated. +func (s *EtcdServer) DowngradeVersionTest(ctx context.Context, r *pb.DowngradeVersionTestRequest) (*pb.DowngradeVersionTestResponse, error) { + resp, err := s.raftRequest(ctx, pb.InternalRaftRequest{DowngradeVersionTest: r}) + if err != nil { + return nil, err + } + return resp.(*pb.DowngradeVersionTestResponse), nil +} + func (s *EtcdServer) downgradeCancel(ctx context.Context) (*pb.DowngradeResponse, error) { err := s.Version().DowngradeCancel(ctx) if err != nil { diff --git a/server/proxy/grpcproxy/adapter/maintenance_client_adapter.go b/server/proxy/grpcproxy/adapter/maintenance_client_adapter.go index 7b204451681b..1697b8e3f18e 100644 --- a/server/proxy/grpcproxy/adapter/maintenance_client_adapter.go +++ b/server/proxy/grpcproxy/adapter/maintenance_client_adapter.go @@ -56,6 +56,10 @@ func (s *mts2mtc) Downgrade(ctx context.Context, r *pb.DowngradeRequest, opts .. return s.mts.Downgrade(ctx, r) } +func (s *mts2mtc) DowngradeVersionTest(ctx context.Context, r *pb.DowngradeVersionTestRequest, opts ...grpc.CallOption) (*pb.DowngradeVersionTestResponse, error) { + return s.mts.DowngradeVersionTest(ctx, r) +} + func (s *mts2mtc) Snapshot(ctx context.Context, in *pb.SnapshotRequest, opts ...grpc.CallOption) (pb.Maintenance_SnapshotClient, error) { cs := newPipeStream(ctx, func(ss chanServerStream) error { return s.mts.Snapshot(in, &ss2scServerStream{ss}) diff --git a/server/proxy/grpcproxy/maintenance.go b/server/proxy/grpcproxy/maintenance.go index 553b66c4b73b..2dd736949884 100644 --- a/server/proxy/grpcproxy/maintenance.go +++ b/server/proxy/grpcproxy/maintenance.go @@ -86,3 +86,7 @@ func (mp *maintenanceProxy) MoveLeader(ctx context.Context, r *pb.MoveLeaderRequ func (mp *maintenanceProxy) Downgrade(ctx context.Context, r *pb.DowngradeRequest) (*pb.DowngradeResponse, error) { return mp.maintenanceClient.Downgrade(ctx, r) } + +func (mp *maintenanceProxy) DowngradeVersionTest(ctx context.Context, r *pb.DowngradeVersionTestRequest) (*pb.DowngradeVersionTestResponse, error) { + return mp.maintenanceClient.DowngradeVersionTest(ctx, r) +}