From b7812513ce21ba1baf7931adabe7d3cfeb585d24 Mon Sep 17 00:00:00 2001 From: Marek Siarkowicz Date: Thu, 23 Nov 2023 15:45:09 +0100 Subject: [PATCH] Translate v2 requests into v3 ClusterMemberAttrSetRequest and ClusterVersionSetRequest Signed-off-by: Marek Siarkowicz --- server/etcdserver/apply_v2.go | 38 ++++++++++++++++-------- server/etcdserver/server.go | 8 ++--- server/etcdserver/server_test.go | 50 +++++++++++++++++++++++++------- 3 files changed, 68 insertions(+), 28 deletions(-) diff --git a/server/etcdserver/apply_v2.go b/server/etcdserver/apply_v2.go index cb48330658a..60442fcdfde 100644 --- a/server/etcdserver/apply_v2.go +++ b/server/etcdserver/apply_v2.go @@ -19,32 +19,46 @@ import ( "net/http" "path" - "github.com/coreos/go-semver/semver" "go.uber.org/zap" - "go.etcd.io/etcd/server/v3/etcdserver/api" + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/api/v3/membershippb" "go.etcd.io/etcd/server/v3/etcdserver/api/membership" ) -func (s *EtcdServer) applyV2Request(r *RequestV2, shouldApplyV3 membership.ShouldApplyV3) { +func v2ToV3Request(lg *zap.Logger, r *RequestV2) pb.InternalRaftRequest { if r.Method != http.MethodPut || (!storeMemberAttributeRegexp.MatchString(r.Path) && r.Path != membership.StoreClusterVersionKey()) { - s.lg.Panic("detected disallowed v2 WAL for stage --v2-deprecation=write-only", zap.String("method", r.Method)) + lg.Panic("detected disallowed v2 WAL for stage --v2-deprecation=write-only", zap.String("method", r.Method)) } if storeMemberAttributeRegexp.MatchString(r.Path) { - id := membership.MustParseMemberIDFromKey(s.lg, path.Dir(r.Path)) + id := membership.MustParseMemberIDFromKey(lg, path.Dir(r.Path)) var attr membership.Attributes if err := json.Unmarshal([]byte(r.Val), &attr); err != nil { - s.lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) + lg.Panic("failed to unmarshal", zap.String("value", r.Val), zap.Error(err)) } - if s.cluster != nil { - s.cluster.UpdateAttributes(id, attr, shouldApplyV3) + return pb.InternalRaftRequest{ + Header: &pb.RequestHeader{ + ID: r.ID, + }, + ClusterMemberAttrSet: &membershippb.ClusterMemberAttrSetRequest{ + Member_ID: uint64(id), + MemberAttributes: &membershippb.Attributes{ + Name: attr.Name, + ClientUrls: attr.ClientURLs, + }, + }, } } - // TODO remove v2 version set to avoid the conflict between v2 and v3 in etcd 3.6 if r.Path == membership.StoreClusterVersionKey() { - if s.cluster != nil { - // persist to backend given v2store can be very stale - s.cluster.SetVersion(semver.Must(semver.NewVersion(r.Val)), api.UpdateCapability, shouldApplyV3) + return pb.InternalRaftRequest{ + Header: &pb.RequestHeader{ + ID: r.ID, + }, + ClusterVersionSet: &membershippb.ClusterVersionSetRequest{ + Ver: r.Val, + }, } } + lg.Panic("detected disallowed v2 WAL for stage --v2-deprecation=write-only", zap.String("method", r.Method)) + return pb.InternalRaftRequest{} } diff --git a/server/etcdserver/server.go b/server/etcdserver/server.go index 3e5c6b619d2..7e329381935 100644 --- a/server/etcdserver/server.go +++ b/server/etcdserver/server.go @@ -1945,17 +1945,13 @@ func (s *EtcdServer) applyEntryNormal(e *raftpb.Entry, shouldApplyV3 membership. rp := &r pbutil.MustUnmarshal(rp, e.Data) s.lg.Debug("applyEntryNormal", zap.Stringer("V2request", rp)) - s.applyV2Request((*RequestV2)(rp), shouldApplyV3) - s.w.Trigger(r.ID, Response{}) - return + raftReq = v2ToV3Request(s.lg, (*RequestV2)(rp)) } s.lg.Debug("applyEntryNormal", zap.Stringer("raftReq", &raftReq)) if raftReq.V2 != nil { req := (*RequestV2)(raftReq.V2) - s.applyV2Request(req, shouldApplyV3) - s.w.Trigger(req.ID, Response{}) - return + raftReq = v2ToV3Request(s.lg, req) } id := raftReq.ID diff --git a/server/etcdserver/server_test.go b/server/etcdserver/server_test.go index 688d71be264..2b24b12ad19 100644 --- a/server/etcdserver/server_test.go +++ b/server/etcdserver/server_test.go @@ -29,6 +29,7 @@ import ( "time" "github.com/coreos/go-semver/semver" + "github.com/golang/protobuf/proto" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "go.uber.org/zap" @@ -52,6 +53,7 @@ import ( "go.etcd.io/etcd/server/v3/etcdserver/api/rafthttp" "go.etcd.io/etcd/server/v3/etcdserver/api/snap" "go.etcd.io/etcd/server/v3/etcdserver/api/v2store" + "go.etcd.io/etcd/server/v3/etcdserver/api/v3alarm" apply2 "go.etcd.io/etcd/server/v3/etcdserver/apply" "go.etcd.io/etcd/server/v3/etcdserver/cindex" "go.etcd.io/etcd/server/v3/etcdserver/errors" @@ -156,11 +158,19 @@ func TestV2SetMemberAttributes(t *testing.T) { defer betesting.Close(t, be) cl := newTestClusterWithBackend(t, []*membership.Member{{ID: 1}}, be) srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - v2store: mockstore.NewRecorder(), - cluster: cl, + lgMu: new(sync.RWMutex), + lg: zaptest.NewLogger(t), + v2store: mockstore.NewRecorder(), + cluster: cl, + consistIndex: cindex.NewConsistentIndex(be), + w: wait.New(), } + as, err := v3alarm.NewAlarmStore(srv.lg, schema.NewAlarmBackend(srv.lg, be)) + if err != nil { + t.Fatal(err) + } + srv.alarmStore = as + srv.uberApply = srv.NewUberApplier() req := pb.Request{ Method: "PUT", @@ -168,7 +178,13 @@ func TestV2SetMemberAttributes(t *testing.T) { Path: membership.MemberAttributesStorePath(1), Val: `{"Name":"abc","ClientURLs":["http://127.0.0.1:2379"]}`, } - srv.applyV2Request((*RequestV2)(&req), membership.ApplyBoth) + data, err := proto.Marshal(&req) + if err != nil { + t.Fatal(err) + } + srv.applyEntryNormal(&raftpb.Entry{ + Data: data, + }, membership.ApplyV2storeOnly) w := membership.Attributes{Name: "abc", ClientURLs: []string{"http://127.0.0.1:2379"}} if g := cl.Member(1).Attributes; !reflect.DeepEqual(g, w) { t.Errorf("attributes = %v, want %v", g, w) @@ -183,11 +199,19 @@ func TestV2SetClusterVersion(t *testing.T) { cl := newTestClusterWithBackend(t, []*membership.Member{}, be) cl.SetVersion(semver.New("3.4.0"), api.UpdateCapability, membership.ApplyBoth) srv := &EtcdServer{ - lgMu: new(sync.RWMutex), - lg: zaptest.NewLogger(t), - v2store: mockstore.NewRecorder(), - cluster: cl, + lgMu: new(sync.RWMutex), + lg: zaptest.NewLogger(t), + v2store: mockstore.NewRecorder(), + cluster: cl, + consistIndex: cindex.NewConsistentIndex(be), + w: wait.New(), } + as, err := v3alarm.NewAlarmStore(srv.lg, schema.NewAlarmBackend(srv.lg, be)) + if err != nil { + t.Fatal(err) + } + srv.alarmStore = as + srv.uberApply = srv.NewUberApplier() req := pb.Request{ Method: "PUT", @@ -195,7 +219,13 @@ func TestV2SetClusterVersion(t *testing.T) { Path: membership.StoreClusterVersionKey(), Val: "3.5.0", } - srv.applyV2Request((*RequestV2)(&req), membership.ApplyBoth) + data, err := proto.Marshal(&req) + if err != nil { + t.Fatal(err) + } + srv.applyEntryNormal(&raftpb.Entry{ + Data: data, + }, membership.ApplyV2storeOnly) if g := cl.Version(); !reflect.DeepEqual(*g, version.V3_5) { t.Errorf("attributes = %v, want %v", *g, version.V3_5) }