From 432a167dc96d1c7a018e7ed10ae286bd26d02432 Mon Sep 17 00:00:00 2001 From: mikelsr Date: Sun, 6 Aug 2023 19:42:59 +0200 Subject: [PATCH] Move implementation of RPC methods to a separate file --- raft/capnp.go | 195 +++++++++++++++++++++++++++++++++++++++----- raft/capnp_utils.go | 38 +++++++++ raft/node.go | 185 ----------------------------------------- 3 files changed, 211 insertions(+), 207 deletions(-) create mode 100644 raft/capnp_utils.go diff --git a/raft/capnp.go b/raft/capnp.go index 9602c36..e248755 100644 --- a/raft/capnp.go +++ b/raft/capnp.go @@ -1,38 +1,189 @@ package raft import ( - "errors" + "context" + "encoding/json" - "capnproto.org/go/capnp/v3" "github.com/mikelsr/raft-capnp/proto/api" + "go.etcd.io/raft/v3/raftpb" ) -// Cap is a Capnp capability. -type Cap interface { - api.Raft +// Join a Raft cluster. +func (n *Node) Join(ctx context.Context, call api.Raft_join) error { + res, err := call.AllocResults() + if err != nil { + return err + } + node := call.Args().Node() + nodes, err := n.join(ctx, node) + if err != nil { + res.SetError(err.Error()) + return err + } + caps, err := res.NewNodes(int32(len(nodes))) + if err != nil { + res.SetError(err.Error()) + return err + } + for i, node := range nodes { + caps.Set(i, node) + } + if err = res.SetNodes(caps); err != nil { + res.SetError(err.Error()) + return err + } + return nil +} + +// join is the capnp-free logic of Join. +func (n *Node) join(ctx context.Context, node api.Raft) ([]api.Raft, error) { + + id, err := rpcGetId(ctx, node) + if err != nil { + return nil, err + } + + cc := raftpb.ConfChange{ + ID: id, + Type: raftpb.ConfChangeAddNode, + NodeID: id, + // TODO con can nodes be propagated? + // Context: marshaledCap, + } + + if err := n.raft.ProposeConfChange(ctx, cc); err != nil { + return nil, err + } + + // TODO is the map necessary? + peers := n.Cluster.Peers() + nodes := make([]api.Raft, len(peers)) + i := 0 + for _, node := range peers { + nodes[i] = node + i++ + } + + return nodes, nil +} + +// Leave a Raft cluster. +func (n *Node) Leave(ctx context.Context, call api.Raft_leave) error { + res, err := call.AllocResults() + if err != nil { + return err + } + node := call.Args().Node() + if err = n.leave(ctx, node); err != nil { + res.SetError(err.Error()) + return err + } + return nil +} + +// leave is the capnp-free logic of Leave. +func (n *Node) leave(ctx context.Context, node api.Raft) error { + id, err := rpcGetId(ctx, node) + if err != nil { + return err + } + + cc := raftpb.ConfChange{ + ID: id, + Type: raftpb.ConfChangeRemoveNode, + NodeID: id, + } + return n.raft.ProposeConfChange(ctx, cc) } -// PointerList creates a list of pointers. -func PointerList[C Cap, CS any](caps []CS, serverToClient func(CS) C) (capnp.PointerList, error) { - _, seg, err := capnp.NewMessage(capnp.SingleSegment(nil)) +// Send advances the raft state machine with the received message. +func (n *Node) Send(ctx context.Context, call api.Raft_send) error { + res, err := call.AllocResults() if err != nil { - return capnp.PointerList{}, err + return err } - l, err := capnp.NewPointerList(seg, int32(len(caps))) + msgData, err := call.Args().Msg() if err != nil { - return capnp.PointerList{}, err + res.SetError(err.Error()) + return err } + if err = n.send(ctx, msgData); err != nil { + res.SetError(err.Error()) + return err + } + return nil +} + +// send is the capnp-free logic of Send. +func (n *Node) send(ctx context.Context, msgData []byte) error { + var ( + msg *raftpb.Message + err error + ) + + if err = json.Unmarshal(msgData, msg); err != nil { + return err + } + + if n.IsPaused() { + n.pauseLock.Lock() + n.queue <- *msg + n.pauseLock.Unlock() + } else { + err = n.raft.Step(ctx, *msg) + } + return err +} - for i, cap := range caps { - client := capnp.Client(serverToClient(cap)) - if !client.IsValid() { - return capnp.PointerList{}, errors.New("invalid capability when converting to list") - } - _, iSeg, err := capnp.NewMessage(capnp.SingleSegment(nil)) - if err != nil { - return capnp.PointerList{}, err - } - l.Set(i, client.EncodeAsPtr(iSeg)) +// Put proposes a new value. +func (n *Node) Put(ctx context.Context, call api.Raft_put) error { + res, err := call.AllocResults() + if err != nil { + return err + } + + itemCap, err := call.Args().Item() + if err != nil { + res.SetError(err.Error()) + return err + } + + item, err := ItemFromApi(itemCap) + if err != nil { + return err + } + + if err = n.put(ctx, item); err != nil { + res.SetError(err.Error()) + return err + } + + return nil +} + +// put is the capnp-free logic of Put. +func (n *Node) put(ctx context.Context, item Item) error { + itemData, err := item.Marshal() + if err != nil { + return err + } + + return n.raft.Propose(ctx, itemData) +} + +func (n *Node) List(ctx context.Context, call api.Raft_list) error { + return nil +} + +func (n *Node) Members(ctx context.Context, call api.Raft_members) error { + return nil +} + +func (n *Node) Id(ctx context.Context, call api.Raft_id) error { + res, err := call.AllocResults() + if err != nil { + return err } - return l, nil + res.SetId(n.ID) + return nil } diff --git a/raft/capnp_utils.go b/raft/capnp_utils.go new file mode 100644 index 0000000..9602c36 --- /dev/null +++ b/raft/capnp_utils.go @@ -0,0 +1,38 @@ +package raft + +import ( + "errors" + + "capnproto.org/go/capnp/v3" + "github.com/mikelsr/raft-capnp/proto/api" +) + +// Cap is a Capnp capability. +type Cap interface { + api.Raft +} + +// PointerList creates a list of pointers. +func PointerList[C Cap, CS any](caps []CS, serverToClient func(CS) C) (capnp.PointerList, error) { + _, seg, err := capnp.NewMessage(capnp.SingleSegment(nil)) + if err != nil { + return capnp.PointerList{}, err + } + l, err := capnp.NewPointerList(seg, int32(len(caps))) + if err != nil { + return capnp.PointerList{}, err + } + + for i, cap := range caps { + client := capnp.Client(serverToClient(cap)) + if !client.IsValid() { + return capnp.PointerList{}, errors.New("invalid capability when converting to list") + } + _, iSeg, err := capnp.NewMessage(capnp.SingleSegment(nil)) + if err != nil { + return capnp.PointerList{}, err + } + l.Set(i, client.EncodeAsPtr(iSeg)) + } + return l, nil +} diff --git a/raft/node.go b/raft/node.go index 582eafd..f4dda52 100644 --- a/raft/node.go +++ b/raft/node.go @@ -2,7 +2,6 @@ package raft import ( "context" - "encoding/json" "errors" "fmt" "log" @@ -39,190 +38,6 @@ type Node struct { RaftStore } -// CAPNP START - -// Join a Raft cluster. -func (n *Node) Join(ctx context.Context, call api.Raft_join) error { - res, err := call.AllocResults() - if err != nil { - return err - } - node := call.Args().Node() - nodes, err := n.join(ctx, node) - if err != nil { - res.SetError(err.Error()) - return err - } - caps, err := res.NewNodes(int32(len(nodes))) - if err != nil { - res.SetError(err.Error()) - return err - } - for i, node := range nodes { - caps.Set(i, node) - } - if err = res.SetNodes(caps); err != nil { - res.SetError(err.Error()) - return err - } - return nil -} - -// join is the capnp-free logic of Join. -func (n *Node) join(ctx context.Context, node api.Raft) ([]api.Raft, error) { - - id, err := rpcGetId(ctx, node) - if err != nil { - return nil, err - } - - cc := raftpb.ConfChange{ - ID: id, - Type: raftpb.ConfChangeAddNode, - NodeID: id, - // TODO con can nodes be propagated? - // Context: marshaledCap, - } - - if err := n.raft.ProposeConfChange(ctx, cc); err != nil { - return nil, err - } - - // TODO is the map necessary? - peers := n.Cluster.Peers() - nodes := make([]api.Raft, len(peers)) - i := 0 - for _, node := range peers { - nodes[i] = node - i++ - } - - return nodes, nil -} - -// Leave a Raft cluster. -func (n *Node) Leave(ctx context.Context, call api.Raft_leave) error { - res, err := call.AllocResults() - if err != nil { - return err - } - node := call.Args().Node() - if err = n.leave(ctx, node); err != nil { - res.SetError(err.Error()) - return err - } - return nil -} - -// leave is the capnp-free logic of Leave. -func (n *Node) leave(ctx context.Context, node api.Raft) error { - id, err := rpcGetId(ctx, node) - if err != nil { - return err - } - - cc := raftpb.ConfChange{ - ID: id, - Type: raftpb.ConfChangeRemoveNode, - NodeID: id, - } - return n.raft.ProposeConfChange(ctx, cc) -} - -// Send advances the raft state machine with the received message. -func (n *Node) Send(ctx context.Context, call api.Raft_send) error { - res, err := call.AllocResults() - if err != nil { - return err - } - msgData, err := call.Args().Msg() - if err != nil { - res.SetError(err.Error()) - return err - } - if err = n.send(ctx, msgData); err != nil { - res.SetError(err.Error()) - return err - } - return nil -} - -// send is the capnp-free logic of Send. -func (n *Node) send(ctx context.Context, msgData []byte) error { - var ( - msg *raftpb.Message - err error - ) - - if err = json.Unmarshal(msgData, msg); err != nil { - return err - } - - if n.IsPaused() { - n.pauseLock.Lock() - n.queue <- *msg - n.pauseLock.Unlock() - } else { - err = n.raft.Step(ctx, *msg) - } - return err -} - -// Put proposes a new value. -func (n *Node) Put(ctx context.Context, call api.Raft_put) error { - res, err := call.AllocResults() - if err != nil { - return err - } - - itemCap, err := call.Args().Item() - if err != nil { - res.SetError(err.Error()) - return err - } - - item, err := ItemFromApi(itemCap) - if err != nil { - return err - } - - if err = n.put(ctx, item); err != nil { - res.SetError(err.Error()) - return err - } - - return nil -} - -// put is the capnp-free logic of Put. -func (n *Node) put(ctx context.Context, item Item) error { - itemData, err := item.Marshal() - if err != nil { - return err - } - - return n.raft.Propose(ctx, itemData) -} - -func (n *Node) List(ctx context.Context, call api.Raft_list) error { - return nil -} - -func (n *Node) Members(ctx context.Context, call api.Raft_members) error { - return nil -} - -func (n *Node) Id(ctx context.Context, call api.Raft_id) error { - res, err := call.AllocResults() - if err != nil { - return err - } - res.SetId(n.ID) - return nil -} - -// CAPNP END - func (n *Node) Start(ctx context.Context) { var err error for {