Skip to content

Commit

Permalink
Add log statements, fix references, add way too many AddRefs
Browse files Browse the repository at this point in the history
  • Loading branch information
mikelsr committed Sep 9, 2023
1 parent 8a1de29 commit 3cb1b2a
Show file tree
Hide file tree
Showing 3 changed files with 73 additions and 63 deletions.
34 changes: 21 additions & 13 deletions raft/capnp.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ func (n *Node) Join(ctx context.Context, call api.Raft_join) error {
return err
}
for i, node := range nodes {
caps.Set(i, node)
caps.Set(i, node.AddRef())
}
if err = res.SetNodes(caps); err != nil {
res.SetError(err.Error())
Expand All @@ -37,8 +37,13 @@ func (n *Node) Join(ctx context.Context, call api.Raft_join) error {

// join is the capnp-free logic of Join.
func (n *Node) join(ctx context.Context, node api.Raft) ([]api.Raft, error) {
node = node.AddRef()

id, err := rpcGetId(ctx, node)
if err := node.Resolve(ctx); err != nil {
return nil, err
}

id, err := rpcGetId(ctx, node.AddRef())
if err != nil {
return nil, err
}
Expand All @@ -47,24 +52,24 @@ func (n *Node) join(ctx context.Context, node api.Raft) ([]api.Raft, error) {
ID: id,
Type: raftpb.ConfChangeAddNode,
NodeID: id,
// TODO con can nodes be propagated?
// Context: marshaledCap,
}

n.Logger.Debugf("[%x] joined by peer %x\n", n.ID, id)

if err := n.Raft.ProposeConfChange(ctx, cc); err != nil {
return nil, err
}

// TODO is the map necessary?
peers := n.Cluster.Peers()
nodes := make([]api.Raft, len(peers))
// Return known peers
pm := n.View.Peers()
peers := make([]api.Raft, len(pm))
i := 0
for _, node := range peers {
nodes[i] = node
for _, peer := range pm {
peers[i] = peer.AddRef()
i++
}

return nodes, nil
return peers, nil
}

// Leave a Raft cluster.
Expand All @@ -74,7 +79,7 @@ func (n *Node) Leave(ctx context.Context, call api.Raft_leave) error {
return err
}
node := call.Args().Node()
if err = n.leave(ctx, node); err != nil {
if err = n.leave(ctx, node.AddRef()); err != nil {
res.SetError(err.Error())
return err
}
Expand All @@ -83,7 +88,7 @@ func (n *Node) Leave(ctx context.Context, call api.Raft_leave) error {

// leave is the capnp-free logic of Leave.
func (n *Node) leave(ctx context.Context, node api.Raft) error {
id, err := rpcGetId(ctx, node)
id, err := rpcGetId(ctx, node.AddRef())
if err != nil {
return err
}
Expand All @@ -93,6 +98,9 @@ func (n *Node) leave(ctx context.Context, node api.Raft) error {
Type: raftpb.ConfChangeRemoveNode,
NodeID: id,
}

n.Logger.Debugf("[%x] left by peer %x\n", id)

return n.Raft.ProposeConfChange(ctx, cc)
}

Expand All @@ -117,7 +125,7 @@ func (n *Node) Send(ctx context.Context, call api.Raft_send) error {
// send is the capnp-free logic of Send.
func (n *Node) send(ctx context.Context, msgData []byte) error {
var (
msg *raftpb.Message
msg = &raftpb.Message{}
err error
)

Expand Down
5 changes: 3 additions & 2 deletions raft/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,8 +2,9 @@ package raft

import "go.etcd.io/raft/v3"

// Specify ID. Call AFTER WithConfig.
func (n *Node) WithID(id uint64) *Node {
n.ID = id
n.Config.ID = id
return n
}

Expand Down Expand Up @@ -33,6 +34,6 @@ func (n *Node) WithRaftConfig(config *raft.Config) *Node {
}

func (n *Node) WithLogger(logger raft.Logger) *Node {
n.logger = logger
n.Logger = logger
return n
}
97 changes: 49 additions & 48 deletions raft/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ import (
"context"
"errors"
"fmt"
"log"
"runtime"
"sync"
"time"
Expand All @@ -16,11 +15,10 @@ import (

// Node implements api.Raft_Server.
type Node struct {
ID uint64
*Cluster
*View
items ItemMap
queue MessageQueue
logger raft.Logger
Logger raft.Logger

// Raft specifics
Raft raft.Node
Expand All @@ -38,15 +36,16 @@ type Node struct {
OnNewValue
RaftNodeRetrieval
RaftStore

init bool
}

func New() *Node {
return &Node{
ID: DefaultID(),
Cluster: NewCluster(),
items: ItemMap{},
queue: make(MessageQueue),
logger: DefaultLogger,
View: NewView(),
items: ItemMap{},
queue: make(MessageQueue),
Logger: DefaultLogger(false),

pauseChan: make(chan bool),
ticker: *time.NewTicker(time.Second),
Expand All @@ -56,7 +55,7 @@ func New() *Node {

// Cap instantiates a new capability from n.
func (n *Node) Cap() api.Raft {
return api.Raft_ServerToClient(n)
return api.Raft_ServerToClient(n).AddRef()
}

// Stop a node in a non-forcing, non way (pending operations will complete).
Expand All @@ -77,36 +76,33 @@ func (n *Node) Stop(cause error) {
// Start the raft node.
func (n *Node) Start(ctx context.Context) {

n.init()
n.logger.Info("init done")
n.Init()

var err error
for {
select {
case <-n.ticker.C:
n.logger.Info("tick")
n.Raft.Tick()

case ready := <-n.Raft.Ready():
n.logger.Info("ready")
err = n.doReady(ctx, ready)

case pause := <-n.pauseChan:
n.logger.Info("pause")
err = n.doPause(ctx, pause)

case <-ctx.Done():
n.logger.Info("ctx done")
n.Logger.Debug("ctx done")
err = ctx.Err()

case err := <-n.stopChan:
n.logger.Infof("stop with error: %s", err.Error())
n.Logger.Debugf("stop with error: %s", err.Error())
defer close(n.stopChan)
defer n.doStop(ctx, err)
return
}

if err != nil {
n.Logger.Error(err)
go func() {
n.stopChan <- err
}()
Expand All @@ -119,21 +115,25 @@ func (n *Node) Start(ctx context.Context) {
func (n *Node) lateConfig() {
n.Config.ID = n.ID
n.Config.Storage = n.Storage
n.Config.Logger = n.logger
n.Config.Logger = n.Logger
}

// init the underlying Raft node and register self in cluster.
func (n *Node) init() {
n.lateConfig()
peers := []raft.Peer{{ID: n.ID}}
n.Cluster.addPeer(n.ID, api.Raft_ServerToClient(n))
for k := range n.Cluster.Peers() {
if k == n.ID {
continue
}
peers = append(peers, raft.Peer{ID: k})
// Init the underlying Raft node and register self in cluster.
func (n *Node) Init() {
if !n.init {
n.init = true
n.lateConfig()
peerList := []raft.Peer{{ID: n.ID}}
// n.Cluster.addPeer(n.ID, n.Cap())
// for k := range n.Cluster.Peers() {
// if k == n.ID {
// continue
// }
// peerList = append(peerList, raft.Peer{ID: k})
// }
n.Raft = raft.StartNode(n.Config, peerList)
n.Logger.Debug("init done")
}
n.Raft = raft.StartNode(n.Config, peers)
}

func (n *Node) doReady(ctx context.Context, ready raft.Ready) error {
Expand All @@ -142,21 +142,19 @@ func (n *Node) doReady(ctx context.Context, ready raft.Ready) error {
return err
}

n.logger.Info("send messages")
n.sendMessages(ctx, ready.Messages)

if !raft.IsEmptySnap(ready.Snapshot) {
return errors.New("snapshotting is not yet implemented")
}

n.logger.Info("process entries")
for _, entry := range ready.CommittedEntries {
switch entry.Type {
case raftpb.EntryNormal:
n.logger.Info("normal entry")
n.Logger.Debug("process normal entry")
err = n.addEntry(entry)
case raftpb.EntryConfChange:
n.logger.Info("conf change")
n.Logger.Debug("process conf change")
err = n.addConfChange(ctx, entry)
default:
err = fmt.Errorf(
Expand All @@ -165,16 +163,16 @@ func (n *Node) doReady(ctx context.Context, ready raft.Ready) error {
if err != nil {
return err
}
n.Raft.Advance()
}
n.Raft.Advance()
return err
}

func (n *Node) doStop(ctx context.Context, err error) {
if err != nil {
log.Fatalf("Stop server with error `%s`.\n", err.Error())
n.Logger.Fatalf("Stop server with error `%s`.\n", err.Error())
} else {
log.Println("Stop server with no errors.")
n.Logger.Debug("Stop server with no errors.")
}
n.Raft.Stop()
}
Expand Down Expand Up @@ -261,7 +259,7 @@ func (n *Node) addEntry(entry raftpb.Entry) error {
}

func (n *Node) addConfChange(ctx context.Context, entry raftpb.Entry) error {
if entry.Type != raftpb.EntryConfChange || entry.Data == nil {
if entry.Data == nil {
return nil
}

Expand All @@ -276,10 +274,10 @@ func (n *Node) addConfChange(ctx context.Context, entry raftpb.Entry) error {

switch cc.Type {
case raftpb.ConfChangeAddNode:
n.logger.Info("add node")
n.Logger.Debugf("[%x] add node %x\n", n.ID, cc.NodeID)
err = n.addNode(ctx, cc)
case raftpb.ConfChangeRemoveNode:
n.logger.Info("remove node")
n.Logger.Debug("remove node")
err = n.removeNode(ctx, cc)
default:
err = fmt.Errorf(
Expand Down Expand Up @@ -312,18 +310,21 @@ func (n *Node) removeNode(ctx context.Context, cc raftpb.ConfChange) error {

// TODO find a more appropiate name
func (n *Node) sendMessages(ctx context.Context, messages []raftpb.Message) {
peers := n.Cluster.Peers()
peers := n.View.Peers()

for _, msg := range messages {
// Recipient is send.

// Recipient is sender.
if msg.To == n.ID {
n.Raft.Step(ctx, msg)
continue
}

// Recipient is potentially a peer.
if peer, found := peers[msg.To]; found {
if err := rpcSend(ctx, peer, msg); err != nil {
n.Logger.Debugf("[%x] send message %v to peer %x\n", n.ID, msg, msg.To)
if err := rpcSend(ctx, peer.AddRef(), msg); err != nil {
n.Logger.Error(err)
n.Raft.ReportUnreachable(msg.To)
}
}
Expand All @@ -335,7 +336,7 @@ func (n *Node) retrieve(ctx context.Context, id uint64, nodeC chan api.Raft, err
if err != nil {
errC <- err
} else {
nodeC <- node
nodeC <- node.AddRef()
}
}

Expand Down Expand Up @@ -363,7 +364,7 @@ func (n *Node) retrieveWithTimeout(ctx context.Context, id uint64, timeout time.
err = ctx.Err()
}

return node, err
return node.AddRef(), err
}

// Register a new node in the cluster.
Expand All @@ -373,7 +374,6 @@ func (n *Node) Register(ctx context.Context, id uint64) error {
node api.Raft
)

n.logger.Info("retrieve with timeout")
for i := 0; i < RetrievalRetries; i++ {
node, err = n.retrieveWithTimeout(ctx, id, RetrievalTimeout)
if err == nil {
Expand All @@ -387,17 +387,18 @@ func (n *Node) Register(ctx context.Context, id uint64) error {
if err != nil {
return err
}
n.logger.Info("add peer")
n.Cluster.AddPeer(ctx, node)
n.Logger.Debugf("[%x] add peer %x\n", n.ID, id)
n.View.AddPeer(ctx, node.AddRef())
return nil
}

// Unregister a node.
func (n *Node) Unregister(ctx context.Context, id uint64) {
n.Logger.Debugf("[%x] unregister node %x\n", n.ID, id)
if n.ID == id {
return
}

peer := n.Cluster.PopPeer(id)
peer := n.View.PopPeer(id)
peer.Release()
}

0 comments on commit 3cb1b2a

Please sign in to comment.