Skip to content

Commit

Permalink
replica and workload
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenMathew committed Jul 29, 2024
1 parent ec0d468 commit fddb3ab
Show file tree
Hide file tree
Showing 5 changed files with 91 additions and 34 deletions.
14 changes: 14 additions & 0 deletions toy-raft/Makefile
Original file line number Diff line number Diff line change
@@ -0,0 +1,14 @@

.PHONY: clean all

all: replica workload

clean:
rm -f replica
rm -f workload

replica:
go build toy-raft/cmd/replica

workload:
go build toy-raft/cmd/workload
22 changes: 21 additions & 1 deletion toy-raft/main.go → toy-raft/cmd/replica/main.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package main

import (
"errors"
"flag"
"fmt"
"log"
"strings"
"time"
"toy-raft/network"
"toy-raft/raft"
"toy-raft/server"
Expand Down Expand Up @@ -52,5 +55,22 @@ func main() {
network.RegisterNode(replicaId, raftNode)
server := server.NewServer(replicaId, raftNode, sm)
server.Start()
select {}

proposalCount := 0

for {
select {
case <-time.After(1 * time.Second):
if err := server.Propose([]byte{}); err != nil {
if errors.Is(err, raft.ErrNotLeader) {
continue
}
panic(err)
}
proposalCount++

case <-time.After(10 * time.Second):
fmt.Printf("proposalCount: %d\n", proposalCount)
}
}
}
5 changes: 5 additions & 0 deletions toy-raft/cmd/workload/main.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
package main

func main() {

}
10 changes: 0 additions & 10 deletions toy-raft/network/nats.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,16 +40,6 @@ func NewNatsNetwork(groupId, natsUrl string) Network {
}

func (net *NatsNetwork) RegisterNode(id string, networkDevice NetworkDevice) {
// subscribe to proposals
{
_, err := net.conn.Subscribe(net.proposalSubject, func(msg *nats.Msg) {
// FIX: wtf
networkDevice.Receive([]byte(fmt.Sprintf("{\"operationType\": 4, \"payload\": \"%v\"}", msg.Data)))
})
if err != nil {
panic(fmt.Sprintf("failed to subscribe to proposal subject: %s", err))
}
}
// subscribe to unicast messages
{
recipientSubj := fmt.Sprintf("%s.%s", net.unicastPrefix, id)
Expand Down
74 changes: 51 additions & 23 deletions toy-raft/raft/raft.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,13 +6,15 @@ import (
"fmt"
"log"
"math/rand"
"sync/atomic"
"time"
"toy-raft/network"
"toy-raft/state"
)

const (
updateChannelBufferSz int = 10000
proposalQueueBufferSz int = 100
heartbeatInterval time.Duration = 1 * time.Second
aeResponseTimeoutDuration time.Duration = 200 * time.Millisecond
voteResponseTimeoutDuration time.Duration = 3 * time.Second
Expand Down Expand Up @@ -42,6 +44,8 @@ type RaftNodeImpl struct {
// set of peers including self
peers map[string]bool

acceptingProposals atomic.Bool

// Follower
electionTimeoutTimer *time.Timer

Expand All @@ -52,6 +56,7 @@ type RaftNodeImpl struct {
// Leader only
followersStateMap map[string]*FollowerState
sendAppendEntriesTicker *time.Ticker
inboundProposals chan []byte

// All servers
// index of highest log entry known to be committed
Expand All @@ -66,16 +71,17 @@ func NewRaftNodeImpl(id string, sm state.StateMachine, storage Storage, network
peersMap[peer] = true
}
return &RaftNodeImpl{
id: id,
storage: storage,
stateMachine: sm,
network: network,
quitCh: make(chan bool),
inboundMessages: make(chan []byte, updateChannelBufferSz),
peers: peersMap,
state: Follower,
commitIndex: 0,
lastApplied: 0,
id: id,
storage: storage,
stateMachine: sm,
network: network,
quitCh: make(chan bool),
inboundMessages: make(chan []byte, updateChannelBufferSz),
inboundProposals: make(chan []byte, proposalQueueBufferSz),
peers: peersMap,
state: Follower,
commitIndex: 0,
lastApplied: 0,
}
}

Expand Down Expand Up @@ -138,6 +144,21 @@ func (rn *RaftNodeImpl) processOneTransistionInternal(inactivityTimeout time.Dur
currentTerm := rn.storage.GetCurrentTerm()

select {
case proposal := <-rn.inboundProposals:
// consume proposal
if rn.state == Leader {
msgCopy := make([]byte, len(proposal))
// encode
entry := Entry{
Term: rn.storage.GetCurrentTerm(),
Cmd: msgCopy,
}
if err := rn.storage.AppendEntry(entry); err != nil {
panic(err)
}
} else {
rn.Log("ignoring proposal since we are not leader")
}
case inboundMessage := <-rn.inboundMessages:
// handle the new message from network
opType, message, err := parseMessage(inboundMessage)
Expand Down Expand Up @@ -589,6 +610,14 @@ func (rn *RaftNodeImpl) ascendToLeader() {
}
}

swapped := rn.acceptingProposals.CompareAndSwap(false, true)
// guard:
if !swapped {
panic("unexpectedly was accepting proposals")
}

// leader state is initialized

// find term for last log entry, if no entries exist then 0
var prevLogTerm uint64 = 0
prevLogIdx := rn.storage.GetLastLogIndex()
Expand Down Expand Up @@ -684,6 +713,11 @@ func (rn *RaftNodeImpl) stepdown() {
stopAndDrainTicker(rn.sendAppendEntriesTicker)
rn.Log("leader stepped down, cleared followersStateMap, and stopped sendAppendEntriesTicker")
resetAndRestartTimer(rn.electionTimeoutTimer, randomTimerDuration(minElectionTimeout, maxElectionTimeout))
swapped := rn.acceptingProposals.CompareAndSwap(true, false)
// guard:
if !swapped {
panic("unexpectedly was not accepting proposals")
}
case Candidate:
rn.voteMap = nil
stopAndDrainTimer(rn.voteResponseTimeoutTimer)
Expand Down Expand Up @@ -755,20 +789,14 @@ func (rn *RaftNodeImpl) Log(format string, args ...any) {
var ErrNotLeader = fmt.Errorf("not leader")

func (rn *RaftNodeImpl) Propose(msg []byte) error {
if rn.state == Leader {
msgCopy := make([]byte, len(msg))
copy(msgCopy, msg)
// encode
entry := Entry{
Term: rn.storage.GetCurrentTerm(),
Cmd: msgCopy,
}
if err := rn.storage.AppendEntry(entry); err != nil {
panic(err)
// HACK:this way of accepting proposals might have false positives/negatives
if rn.acceptingProposals.Load() {
var proposal []byte
bytesCopied := copy(proposal, msg)
if len(msg) != bytesCopied {
panic("failed to copy buffer")
}
} else {
// TODO: if follower, route to leader
return ErrNotLeader
rn.inboundProposals <- msg
}
return nil
}
Expand Down

0 comments on commit fddb3ab

Please sign in to comment.