Skip to content

Commit

Permalink
Partialy implement unregister
Browse files Browse the repository at this point in the history
  • Loading branch information
mikelsr committed Aug 6, 2023
1 parent 576b6b8 commit 54761fd
Showing 1 changed file with 35 additions and 5 deletions.
40 changes: 35 additions & 5 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -268,7 +268,7 @@ func (n *Node) doReady(ctx context.Context, ready raft.Ready) error {
case raftpb.EntryNormal:
err = n.addEntry(entry)
case raftpb.EntryConfChange:
err = n.addConfChange(entry)
err = n.addConfChange(ctx, entry)
default:
err = fmt.Errorf(
"unrecognized entry type: %s", raftpb.EntryType_name[int32(entry.Type)])
Expand Down Expand Up @@ -370,7 +370,7 @@ func (n *Node) addEntry(entry raftpb.Entry) error {
return nil
}

func (n *Node) addConfChange(entry raftpb.Entry) error {
func (n *Node) addConfChange(ctx context.Context, entry raftpb.Entry) error {
if entry.Type != raftpb.EntryConfChange || entry.Data == nil {
return nil
}
Expand All @@ -386,9 +386,9 @@ func (n *Node) addConfChange(entry raftpb.Entry) error {

switch cc.Type {
case raftpb.ConfChangeAddNode:

err = n.addNode(ctx, cc)
case raftpb.ConfChangeRemoveNode:

err = n.removeNode(ctx, cc)
default:
err = fmt.Errorf(
"unrecognized conf change type: %s",
Expand All @@ -397,6 +397,26 @@ func (n *Node) addConfChange(entry raftpb.Entry) error {
return err
}

func (n *Node) addNode(ctx context.Context, cc raftpb.ConfChange) error {
return n.Register(ctx, cc.NodeID)
}

func (n *Node) removeNode(ctx context.Context, cc raftpb.ConfChange) error {
// Unregister the node.
defer n.Unregister(ctx, cc.NodeID)

// Leader, self, steps down.
if n.ID == cc.NodeID && n.ID == n.raft.Status().Lead {
n.raft.Stop()
return nil
}
// Other leader steps down.
if cc.NodeID == n.raft.Status().Lead {
return n.raft.Campaign(ctx)
}
return nil
}

// TODO find a more appropiate name
func (n *Node) broadcast(ctx context.Context, messages []raftpb.Message) {
peers := n.Cluster.Peers()
Expand Down Expand Up @@ -452,7 +472,7 @@ func (n *Node) retrieveWithTimeout(ctx context.Context, id uint64, timeout time.
return node, err
}

// Register a new Node in the cluster.
// Register a new node in the cluster.
func (n *Node) Register(ctx context.Context, id uint64) error {
var (
err error
Expand All @@ -473,3 +493,13 @@ func (n *Node) Register(ctx context.Context, id uint64) error {
n.Cluster.AddPeer(ctx, node)
return nil
}

// Unregister a node.
func (n *Node) Unregister(ctx context.Context, id uint64) {
if n.ID == id {
return
}

// TODO release capability
n.Cluster.RemovePeer(id)
}

0 comments on commit 54761fd

Please sign in to comment.