From 314d425718be5213857af1276c055936d157fd8f Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 6 Oct 2014 14:15:50 -0700 Subject: [PATCH 1/3] main/raft: write addNode ConfChange entries in log when start raft --- etcdserver/server.go | 17 ++++++++++++++++- etcdserver/server_test.go | 12 ++++++------ raft/example_test.go | 2 +- raft/node.go | 14 +++++++++++++- raft/node_test.go | 28 ++++++++++++++++++++-------- 5 files changed, 56 insertions(+), 17 deletions(-) diff --git a/etcdserver/server.go b/etcdserver/server.go index 552e0caff05..401b0c01163 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -8,6 +8,7 @@ import ( "net/http" "os" "path" + "sort" "sync/atomic" "time" @@ -129,7 +130,14 @@ func NewServer(cfg *ServerConfig) *EtcdServer { if w, err = wal.Create(waldir); err != nil { log.Fatal(err) } - n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1) + ids := cfg.Cluster.IDs() + sort.Sort(int64Slice(ids)) + ccs := make([]raftpb.ConfChange, len(ids)) + for i, id := range ids { + // TODO: add context for PeerURLs + ccs[i] = raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: id} + } + n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1, ccs) } else { var index int64 snapshot, err := ss.Load() @@ -552,3 +560,10 @@ func getBool(v *bool) (vv bool, set bool) { } return *v, true } + +// int64Slice implements sort interface +type int64Slice []int64 + +func (p int64Slice) Len() int { return len(p) } +func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 5ff006332cd..165f43fe94e 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -391,7 +391,7 @@ func testServer(t *testing.T, ns int64) { for i := int64(0); i < ns; i++ { id := i + 1 - n := raft.StartNode(id, members, 10, 1) + n := raft.StartNode(id, members, 10, 1, nil) tk := time.NewTicker(10 * time.Millisecond) defer tk.Stop() srv := &EtcdServer{ @@ -458,7 +458,7 @@ func TestDoProposal(t *testing.T) { for i, tt := range tests { ctx, _ := context.WithCancel(context.Background()) - n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1) + n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) st := &storeRecorder{} tk := make(chan time.Time) // this makes <-tk always successful, which accelerates internal clock @@ -491,7 +491,7 @@ func TestDoProposal(t *testing.T) { func TestDoProposalCancelled(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // node cannot make any progress because there are two nodes - n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1) + n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1, nil) st := &storeRecorder{} wait := &waitRecorder{} srv := &EtcdServer{ @@ -527,7 +527,7 @@ func TestDoProposalStopped(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // node cannot make any progress because there are two nodes - n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1) + n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1, nil) st := &storeRecorder{} tk := make(chan time.Time) // this makes <-tk always successful, which accelarates internal clock @@ -668,7 +668,7 @@ func TestSyncTrigger(t *testing.T) { // snapshot should snapshot the store and cut the persistent // TODO: node.Compact is called... we need to make the node an interface func TestSnapshot(t *testing.T) { - n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1) + n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) defer n.Stop() st := &storeRecorder{} p := &storageRecorder{} @@ -699,7 +699,7 @@ func TestSnapshot(t *testing.T) { // Applied > SnapCount should trigger a SaveSnap event func TestTriggerSnap(t *testing.T) { ctx := context.Background() - n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1) + n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) n.Campaign(ctx) st := &storeRecorder{} p := &storageRecorder{} diff --git a/raft/example_test.go b/raft/example_test.go index 289b970fc54..c957068b4ba 100644 --- a/raft/example_test.go +++ b/raft/example_test.go @@ -10,7 +10,7 @@ func saveStateToDisk(st pb.HardState) {} func saveToDisk(ents []pb.Entry) {} func Example_Node() { - n := StartNode(0, nil, 0, 0) + n := StartNode(0, nil, 0, 0, nil) // stuff to n happens in other goroutines diff --git a/raft/node.go b/raft/node.go index e853758ffbd..f755e4bd568 100644 --- a/raft/node.go +++ b/raft/node.go @@ -101,9 +101,21 @@ type Node interface { // StartNode returns a new Node given a unique raft id, a list of raft peers, and // the election and heartbeat timeouts in units of ticks. -func StartNode(id int64, peers []int64, election, heartbeat int) Node { +// It also wraps ConfChanges in entry and puts them at the head of the log. +func StartNode(id int64, peers []int64, election, heartbeat int, ccs []pb.ConfChange) Node { n := newNode() r := newRaft(id, peers, election, heartbeat) + ents := make([]pb.Entry, len(ccs)) + for i, cc := range ccs { + data, err := cc.Marshal() + if err != nil { + panic("unexpected marshal error") + } + ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: int64(i + 1), Data: data} + } + if !r.raftLog.maybeAppend(0, 0, int64(len(ccs)), ents...) { + panic("unexpected append failure") + } go n.run(r) return &n } diff --git a/raft/node_test.go b/raft/node_test.go index 4843f1ccb31..9e93a7ef62e 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -149,21 +149,33 @@ func TestNode(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() + cc := raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: 1} + ccdata, err := cc.Marshal() + if err != nil { + t.Fatalf("unexpected marshal error: %v", err) + } wants := []Ready{ { - SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, - HardState: raftpb.HardState{Term: 1, Commit: 1}, - Entries: []raftpb.Entry{{}, {Term: 1, Index: 1}}, - CommittedEntries: []raftpb.Entry{{Term: 1, Index: 1}}, + SoftState: &SoftState{Lead: 1, RaftState: StateLeader}, + HardState: raftpb.HardState{Term: 1, Commit: 2}, + Entries: []raftpb.Entry{ + {}, + {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, + {Term: 1, Index: 2}, + }, + CommittedEntries: []raftpb.Entry{ + {Type: raftpb.EntryConfChange, Term: 1, Index: 1, Data: ccdata}, + {Term: 1, Index: 2}, + }, }, { - HardState: raftpb.HardState{Term: 1, Commit: 2}, - Entries: []raftpb.Entry{{Term: 1, Index: 2, Data: []byte("foo")}}, - CommittedEntries: []raftpb.Entry{{Term: 1, Index: 2, Data: []byte("foo")}}, + HardState: raftpb.HardState{Term: 1, Commit: 3}, + Entries: []raftpb.Entry{{Term: 1, Index: 3, Data: []byte("foo")}}, + CommittedEntries: []raftpb.Entry{{Term: 1, Index: 3, Data: []byte("foo")}}, }, } - n := StartNode(1, []int64{1}, 0, 0) + n := StartNode(1, []int64{1}, 0, 0, []raftpb.ConfChange{cc}) n.Campaign(ctx) if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) { t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0]) From c15c3eab4c08fe28b12969b14f2af716fd0954bb Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 6 Oct 2014 15:12:02 -0700 Subject: [PATCH 2/3] etcdserver: move int64Slice into pkg/types/ --- etcdserver/cluster_test.go | 12 ++++-------- etcdserver/server.go | 9 +-------- pkg/types/slice.go | 8 ++++++++ 3 files changed, 13 insertions(+), 16 deletions(-) create mode 100644 pkg/types/slice.go diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index 52550e2505f..ccdb6e2c01b 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -4,6 +4,8 @@ import ( "reflect" "sort" "testing" + + "github.com/coreos/etcd/pkg/types" ) func TestClusterAddSlice(t *testing.T) { @@ -201,12 +203,6 @@ func TestClusterSetBad(t *testing.T) { } } -type int64slice []int64 - -func (a int64slice) Len() int { return len(a) } -func (a int64slice) Swap(i, j int) { a[i], a[j] = a[j], a[i] } -func (a int64slice) Less(i, j int) bool { return a[i] < a[j] } - func TestClusterIDs(t *testing.T) { cs := Cluster{} cs.AddSlice([]Member{ @@ -214,8 +210,8 @@ func TestClusterIDs(t *testing.T) { {ID: 4}, {ID: 100}, }) - w := int64slice([]int64{1, 4, 100}) - g := int64slice(cs.IDs()) + w := types.Int64Slice([]int64{1, 4, 100}) + g := types.Int64Slice(cs.IDs()) sort.Sort(g) if !reflect.DeepEqual(w, g) { t.Errorf("IDs=%+v, want %+v", g, w) diff --git a/etcdserver/server.go b/etcdserver/server.go index 401b0c01163..4354d68a7b8 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -131,7 +131,7 @@ func NewServer(cfg *ServerConfig) *EtcdServer { log.Fatal(err) } ids := cfg.Cluster.IDs() - sort.Sort(int64Slice(ids)) + sort.Sort(types.Int64Slice(ids)) ccs := make([]raftpb.ConfChange, len(ids)) for i, id := range ids { // TODO: add context for PeerURLs @@ -560,10 +560,3 @@ func getBool(v *bool) (vv bool, set bool) { } return *v, true } - -// int64Slice implements sort interface -type int64Slice []int64 - -func (p int64Slice) Len() int { return len(p) } -func (p int64Slice) Less(i, j int) bool { return p[i] < p[j] } -func (p int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } diff --git a/pkg/types/slice.go b/pkg/types/slice.go new file mode 100644 index 00000000000..3fecc013d1b --- /dev/null +++ b/pkg/types/slice.go @@ -0,0 +1,8 @@ +package types + +// Int64Slice implements sort interface +type Int64Slice []int64 + +func (p Int64Slice) Len() int { return len(p) } +func (p Int64Slice) Less(i, j int) bool { return p[i] < p[j] } +func (p Int64Slice) Swap(i, j int) { p[i], p[j] = p[j], p[i] } From 45ebfb4217bdec5934004f55199adff81a2153ce Mon Sep 17 00:00:00 2001 From: Yicheng Qin Date: Mon, 6 Oct 2014 15:22:35 -0700 Subject: [PATCH 3/3] raft: refine initial entries logic in StartNode --- etcdserver/cluster.go | 1 + etcdserver/cluster_test.go | 8 ++------ etcdserver/server.go | 11 ++--------- etcdserver/server_test.go | 24 ++++++++++++------------ raft/example_test.go | 2 +- raft/node.go | 16 +++++++++------- raft/node_test.go | 2 +- 7 files changed, 28 insertions(+), 36 deletions(-) diff --git a/etcdserver/cluster.go b/etcdserver/cluster.go index eaace621767..c43d437e08c 100644 --- a/etcdserver/cluster.go +++ b/etcdserver/cluster.go @@ -100,6 +100,7 @@ func (c Cluster) IDs() []int64 { for _, m := range c { ids = append(ids, m.ID) } + sort.Sort(types.Int64Slice(ids)) return ids } diff --git a/etcdserver/cluster_test.go b/etcdserver/cluster_test.go index ccdb6e2c01b..89529c72ce7 100644 --- a/etcdserver/cluster_test.go +++ b/etcdserver/cluster_test.go @@ -2,10 +2,7 @@ package etcdserver import ( "reflect" - "sort" "testing" - - "github.com/coreos/etcd/pkg/types" ) func TestClusterAddSlice(t *testing.T) { @@ -210,9 +207,8 @@ func TestClusterIDs(t *testing.T) { {ID: 4}, {ID: 100}, }) - w := types.Int64Slice([]int64{1, 4, 100}) - g := types.Int64Slice(cs.IDs()) - sort.Sort(g) + w := []int64{1, 4, 100} + g := cs.IDs() if !reflect.DeepEqual(w, g) { t.Errorf("IDs=%+v, want %+v", g, w) } diff --git a/etcdserver/server.go b/etcdserver/server.go index 4354d68a7b8..8f8a6f3b963 100644 --- a/etcdserver/server.go +++ b/etcdserver/server.go @@ -8,7 +8,6 @@ import ( "net/http" "os" "path" - "sort" "sync/atomic" "time" @@ -130,14 +129,8 @@ func NewServer(cfg *ServerConfig) *EtcdServer { if w, err = wal.Create(waldir); err != nil { log.Fatal(err) } - ids := cfg.Cluster.IDs() - sort.Sort(types.Int64Slice(ids)) - ccs := make([]raftpb.ConfChange, len(ids)) - for i, id := range ids { - // TODO: add context for PeerURLs - ccs[i] = raftpb.ConfChange{Type: raftpb.ConfChangeAddNode, NodeID: id} - } - n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1, ccs) + // TODO: add context for PeerURLs + n = raft.StartNode(m.ID, cfg.Cluster.IDs(), 10, 1) } else { var index int64 snapshot, err := ss.Load() diff --git a/etcdserver/server_test.go b/etcdserver/server_test.go index 165f43fe94e..22dae63fe8f 100644 --- a/etcdserver/server_test.go +++ b/etcdserver/server_test.go @@ -391,7 +391,7 @@ func testServer(t *testing.T, ns int64) { for i := int64(0); i < ns; i++ { id := i + 1 - n := raft.StartNode(id, members, 10, 1, nil) + n := raft.StartNode(id, members, 10, 1) tk := time.NewTicker(10 * time.Millisecond) defer tk.Stop() srv := &EtcdServer{ @@ -458,7 +458,7 @@ func TestDoProposal(t *testing.T) { for i, tt := range tests { ctx, _ := context.WithCancel(context.Background()) - n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) + n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1) st := &storeRecorder{} tk := make(chan time.Time) // this makes <-tk always successful, which accelerates internal clock @@ -491,7 +491,7 @@ func TestDoProposal(t *testing.T) { func TestDoProposalCancelled(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) // node cannot make any progress because there are two nodes - n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1, nil) + n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1) st := &storeRecorder{} wait := &waitRecorder{} srv := &EtcdServer{ @@ -527,7 +527,7 @@ func TestDoProposalStopped(t *testing.T) { ctx, cancel := context.WithCancel(context.Background()) defer cancel() // node cannot make any progress because there are two nodes - n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1, nil) + n := raft.StartNode(0xBAD0, []int64{0xBAD0, 0xBAD1}, 10, 1) st := &storeRecorder{} tk := make(chan time.Time) // this makes <-tk always successful, which accelarates internal clock @@ -668,7 +668,7 @@ func TestSyncTrigger(t *testing.T) { // snapshot should snapshot the store and cut the persistent // TODO: node.Compact is called... we need to make the node an interface func TestSnapshot(t *testing.T) { - n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) + n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1) defer n.Stop() st := &storeRecorder{} p := &storageRecorder{} @@ -699,7 +699,7 @@ func TestSnapshot(t *testing.T) { // Applied > SnapCount should trigger a SaveSnap event func TestTriggerSnap(t *testing.T) { ctx := context.Background() - n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1, nil) + n := raft.StartNode(0xBAD0, []int64{0xBAD0}, 10, 1) n.Campaign(ctx) st := &storeRecorder{} p := &storageRecorder{} @@ -712,7 +712,7 @@ func TestTriggerSnap(t *testing.T) { } s.start() - for i := 0; int64(i) < s.snapCount; i++ { + for i := 0; int64(i) < s.snapCount-1; i++ { s.Do(ctx, pb.Request{Method: "PUT", ID: 1}) } time.Sleep(time.Millisecond) @@ -720,12 +720,12 @@ func TestTriggerSnap(t *testing.T) { gaction := p.Action() // each operation is recorded as a Save - // Nop + SnapCount * Puts + Cut + SaveSnap = Save + SnapCount * Save + Cut + SaveSnap - if len(gaction) != 3+int(s.snapCount) { - t.Fatalf("len(action) = %d, want %d", len(gaction), 3+int(s.snapCount)) + // BootstrapConfig/Nop + (SnapCount - 1) * Puts + Cut + SaveSnap = Save + (SnapCount - 1) * Save + Cut + SaveSnap + if len(gaction) != 2+int(s.snapCount) { + t.Fatalf("len(action) = %d, want %d", len(gaction), 2+int(s.snapCount)) } - if !reflect.DeepEqual(gaction[12], action{name: "SaveSnap"}) { - t.Errorf("action = %s, want SaveSnap", gaction[12]) + if !reflect.DeepEqual(gaction[11], action{name: "SaveSnap"}) { + t.Errorf("action = %s, want SaveSnap", gaction[11]) } } diff --git a/raft/example_test.go b/raft/example_test.go index c957068b4ba..289b970fc54 100644 --- a/raft/example_test.go +++ b/raft/example_test.go @@ -10,7 +10,7 @@ func saveStateToDisk(st pb.HardState) {} func saveToDisk(ents []pb.Entry) {} func Example_Node() { - n := StartNode(0, nil, 0, 0, nil) + n := StartNode(0, nil, 0, 0) // stuff to n happens in other goroutines diff --git a/raft/node.go b/raft/node.go index f755e4bd568..28c3e797cd8 100644 --- a/raft/node.go +++ b/raft/node.go @@ -101,21 +101,23 @@ type Node interface { // StartNode returns a new Node given a unique raft id, a list of raft peers, and // the election and heartbeat timeouts in units of ticks. -// It also wraps ConfChanges in entry and puts them at the head of the log. -func StartNode(id int64, peers []int64, election, heartbeat int, ccs []pb.ConfChange) Node { +// It also builds ConfChangeAddNode entry for each peer and puts them at the head of the log. +func StartNode(id int64, peers []int64, election, heartbeat int) Node { n := newNode() r := newRaft(id, peers, election, heartbeat) - ents := make([]pb.Entry, len(ccs)) - for i, cc := range ccs { + + ents := make([]pb.Entry, len(peers)) + for i, peer := range peers { + cc := pb.ConfChange{Type: pb.ConfChangeAddNode, NodeID: peer} data, err := cc.Marshal() if err != nil { panic("unexpected marshal error") } ents[i] = pb.Entry{Type: pb.EntryConfChange, Term: 1, Index: int64(i + 1), Data: data} } - if !r.raftLog.maybeAppend(0, 0, int64(len(ccs)), ents...) { - panic("unexpected append failure") - } + r.raftLog.append(0, ents...) + r.raftLog.committed = int64(len(ents)) + go n.run(r) return &n } diff --git a/raft/node_test.go b/raft/node_test.go index 9e93a7ef62e..0822c1e91c1 100644 --- a/raft/node_test.go +++ b/raft/node_test.go @@ -175,7 +175,7 @@ func TestNode(t *testing.T) { }, } - n := StartNode(1, []int64{1}, 0, 0, []raftpb.ConfChange{cc}) + n := StartNode(1, []int64{1}, 0, 0) n.Campaign(ctx) if g := <-n.Ready(); !reflect.DeepEqual(g, wants[0]) { t.Errorf("#%d: g = %+v,\n w %+v", 1, g, wants[0])