Skip to content

Commit

Permalink
Flesh out replica/workload to test with Antithesis
Browse files Browse the repository at this point in the history
  • Loading branch information
mprimi committed Jul 30, 2024
1 parent 159b5af commit 583df64
Show file tree
Hide file tree
Showing 7 changed files with 180 additions and 96 deletions.
2 changes: 2 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
@@ -0,0 +1,2 @@
toy-raft/cmd/replica/replica
toy-raft/cmd/workload/workload
67 changes: 28 additions & 39 deletions toy-raft/cmd/replica/main.go
Original file line number Diff line number Diff line change
@@ -1,76 +1,65 @@
package main

import (
"errors"
"flag"
"fmt"
"log"
"os"
"slices"
"strings"
"time"

"github.com/nats-io/nats.go"

"toy-raft/network"
"toy-raft/raft"
"toy-raft/server"
"toy-raft/state"

"github.com/nats-io/nats.go"
)

const n = 20

func main() {
var (
replicaId string
groupId string
natsUrl string
peerString string
)
flag.StringVar(&replicaId, "replicaId", "", "unique id of replica")
flag.StringVar(&groupId, "groupId", "", "raft group id")
flag.StringVar(&natsUrl, "natsUrl", "", "nats url")
flag.StringVar(&replicaId, "replica-id", "", "unique id of replica")
flag.StringVar(&groupId, "group-id", "", "raft group id")
flag.StringVar(&natsUrl, "nats-url", nats.DefaultURL, "nats url")
flag.StringVar(&peerString, "peers", "", "comma separated list of peer ids (including self)")
flag.Parse()

fatalErr := func(err error) {
fmt.Println(err)
os.Exit(1)
}
if replicaId == "" {
panic("requires a valid replica id")
fatalErr(fmt.Errorf("missing required argument: replica-id"))
}

if groupId == "" {
panic("requires a valid raft group id")
fatalErr(fmt.Errorf("missing required argument: group-id"))
}

if peerString == "" {
panic("requires a valid peer list")
fatalErr(fmt.Errorf("missing required argument: peers"))
}
peers := strings.Split(peerString, ",")

if natsUrl == "" {
log.Printf("no natsUrl was provided, using default: %s", nats.DefaultURL)
natsUrl = nats.DefaultURL
if !slices.Contains(peers, replicaId) {
fatalErr(fmt.Errorf("list of peers does not include this replica"))
}

network := network.NewNatsNetwork(groupId, natsUrl)

sm := state.NewKeepLastBlocksStateMachine(replicaId, n)
raftNode := raft.NewRaftNodeImpl(replicaId, sm, raft.NewInMemoryStorage(), network, peers)
network.RegisterNode(replicaId, raftNode)
server := server.NewServer(replicaId, raftNode, sm)
server.Start()

proposalCount := 0
natsNetwork, err := network.NewNatsNetwork(groupId, natsUrl)
if err != nil {
fatalErr(fmt.Errorf("failed to initialize network: %w", err))
}

for {
select {
case <-time.After(1 * time.Second):
if err := server.Propose([]byte{}); err != nil {
if errors.Is(err, raft.ErrNotLeader) {
continue
}
panic(err)
}
proposalCount++
sm := state.NewKeepLastBlocksStateMachine(replicaId, 10)
raftNode := raft.NewRaftNodeImpl(replicaId, sm, raft.NewInMemoryStorage(), natsNetwork, peers)
natsNetwork.RegisterNode(replicaId, raftNode)
srv := server.NewServer(replicaId, raftNode, sm)
srv.Start()

case <-time.After(10 * time.Second):
fmt.Printf("proposalCount: %d\n", proposalCount)
}
}
// Block forever
select {}
}
46 changes: 46 additions & 0 deletions toy-raft/cmd/workload/main.go
Original file line number Diff line number Diff line change
@@ -1,5 +1,51 @@
package main

import (
"flag"
"fmt"
"os"
"time"

"github.com/antithesishq/antithesis-sdk-go/lifecycle"
"github.com/nats-io/nats.go"
)

func main() {
var (
groupId string
natsUrl string
)
flag.StringVar(&groupId, "group-id", "", "raft group id")
flag.StringVar(&natsUrl, "nats-url", nats.DefaultURL, "nats url")
flag.Parse()

fatalErr := func(err error) {
fmt.Println(err)
os.Exit(1)
}

if groupId == "" {
fatalErr(fmt.Errorf("missing required argument: group-id"))
}

nc, err := nats.Connect(natsUrl)
if err != nil {
fatalErr(fmt.Errorf("failed to connect: %w", err))
}
defer nc.Close()

// TODO wait for RAFT group to be established -- not sure how.
// For now, do the dumb thing:
time.Sleep(3 * time.Second)

// If running in antithesis, signal setup is complete
lifecycle.SetupComplete(nil)

// Block forever
for {
select {
case <-time.After(5 * time.Second):
fmt.Printf("Idle...\n")
}
}
}
27 changes: 14 additions & 13 deletions toy-raft/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,27 +3,28 @@ module toy-raft
go 1.21.5

require (
github.com/antithesishq/antithesis-sdk-go v0.4.0
github.com/dgraph-io/badger/v4 v4.2.0
github.com/nats-io/nats.go v1.36.0
)

require (
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.3.0 // indirect
github.com/dgraph-io/ristretto v0.1.1 // indirect
github.com/dustin/go-humanize v1.0.0 // indirect
github.com/dustin/go-humanize v1.0.1 // indirect
github.com/gogo/protobuf v1.3.2 // indirect
github.com/golang/glog v1.0.0 // indirect
github.com/golang/groupcache v0.0.0-20190702054246-869f871628b6 // indirect
github.com/golang/protobuf v1.5.2 // indirect
github.com/golang/snappy v0.0.3 // indirect
github.com/google/flatbuffers v1.12.1 // indirect
github.com/klauspost/compress v1.17.2 // indirect
github.com/golang/glog v1.2.2 // indirect
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/google/flatbuffers v24.3.25+incompatible // indirect
github.com/klauspost/compress v1.17.9 // indirect
github.com/nats-io/nkeys v0.4.7 // indirect
github.com/nats-io/nuid v1.0.1 // indirect
github.com/pkg/errors v0.9.1 // indirect
go.opencensus.io v0.22.5 // indirect
golang.org/x/crypto v0.18.0 // indirect
golang.org/x/net v0.10.0 // indirect
golang.org/x/sys v0.16.0 // indirect
google.golang.org/protobuf v1.28.1 // indirect
go.opencensus.io v0.24.0 // indirect
golang.org/x/crypto v0.25.0 // indirect
golang.org/x/net v0.27.0 // indirect
golang.org/x/sys v0.22.0 // indirect
google.golang.org/protobuf v1.34.2 // indirect
)
Loading

0 comments on commit 583df64

Please sign in to comment.