Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Snapshotting and log trimming #2

Merged
merged 2 commits into from
Oct 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion toy-raft/cmd/replica/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,7 @@ func main() {
// Block forever
for {
select {
case <-time.After(3 * time.Second):
case <-time.After(1 * time.Second):
rng.Read(buffer)
if err := srv.Propose(buffer); err != nil {
if errors.Is(err, raft.ErrNotLeader) {
Expand Down
164 changes: 153 additions & 11 deletions toy-raft/raft/badger_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
)

var (
TermKey = []byte("term")
VoteKey = []byte("vote")
LastLogIdxKey = []byte("lastLogIdx")
TermKey = []byte("term")
VoteKey = []byte("vote")
// TODO: load these once on startup, keep flushing values to disk on log writes
LastLogIdxKey = []byte("lastLogIdx")
FirstLogIdxKey = []byte("firstLogIdx")
)

type BadgerStorage struct {
Expand All @@ -25,7 +27,7 @@ type logEntry struct {
Term uint64
}

func NewDiskStorage(replicaId string, baseDir string) Storage {
func NewDiskStorage(replicaId string, baseDir string) *BadgerStorage {
dbPath := filepath.Join(baseDir, replicaId)
db, err := badger.Open(badger.DefaultOptions(dbPath))
if err != nil {
Expand Down Expand Up @@ -59,7 +61,7 @@ func NewDiskStorage(replicaId string, baseDir string) Storage {
return store
}

func NewInMemoryStorage() Storage {
func NewInMemoryStorage() *BadgerStorage {
db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
if err != nil {
panic(fmt.Errorf("failed to init memory store: %w", err))
Expand All @@ -83,6 +85,7 @@ func (store *BadgerStorage) storageInit() {

// initialize log
store.setLastLogIdx(0)
store.setFirstLogIdx(1)
}

func (store *BadgerStorage) setLastLogIdx(newLastLogIdx uint64) {
Expand All @@ -101,6 +104,15 @@ func (store *BadgerStorage) setLastLogIdx(newLastLogIdx uint64) {
panic(fmt.Errorf("setting invalid last log index"))
}

if firstLogIdx := store.GetFirstLogIndex(); firstLogIdx > newLastLogIdx+1 {
assert.Unreachable("Setting lastLogIndex below trim threshold (firstLogIndex)", map[string]any{
"newLastLogIdx": newLastLogIdx,
"currentLastLogIdx": currentLastLogIdx,
"firstLogIndex": firstLogIdx,
})
panic(fmt.Errorf("setting invalid last log index"))
}

buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, newLastLogIdx)

Expand All @@ -114,8 +126,38 @@ func (store *BadgerStorage) setLastLogIdx(newLastLogIdx uint64) {
}
}

func (store *BadgerStorage) setFirstLogIdx(newFirstLogIdx uint64) {

currentFirstLogIdx := store.GetFirstLogIndex()
if currentFirstLogIdx == 0 && newFirstLogIdx == 0 {
// initial case, don't panic
} else if newFirstLogIdx == currentFirstLogIdx {
assert.Unreachable(
"Setting invalid firstLogIndex",
map[string]any{
"newFirstLogIdx": newFirstLogIdx,
"currentFirstLogIdx": currentFirstLogIdx,
},
)
panic(fmt.Errorf("setting invalid first log index"))
}

buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, newFirstLogIdx)

if err := store.db.Update(func(txn *badger.Txn) error {
if err := txn.Set(FirstLogIdxKey, buf); err != nil {
return fmt.Errorf("failed to set firstLogIdx: %s", err)
}
return nil
}); err != nil {
panic(fmt.Errorf("failed to commit first log index: %w", err))
}
}

func (store *BadgerStorage) GetLogEntry(idx uint64) (*Entry, bool) {
lastLogIdx := store.GetLastLogIndex()
firstLogIdx := store.GetFirstLogIndex()
if idx == 0 {
assert.Unreachable(
"Invalid entry lookup index",
Expand All @@ -129,6 +171,16 @@ func (store *BadgerStorage) GetLogEntry(idx uint64) (*Entry, bool) {

if idx > lastLogIdx {
return nil, false
} else if idx < firstLogIdx {
assert.Unreachable(
"Entry below trim threshold",
map[string]any{
"index": idx,
"lastLogIdx": lastLogIdx,
"firstLogIdx": firstLogIdx,
},
)
panic("attempted to look up entry below trim threshold")
}

var entry *Entry
Expand All @@ -151,8 +203,9 @@ func (store *BadgerStorage) GetLogEntry(idx uint64) (*Entry, bool) {
assert.Unreachable(
"Failed to load entry",
map[string]any{
"index": idx,
"lastLogIdx": lastLogIdx,
"index": idx,
"lastLogIdx": lastLogIdx,
"firstLogIdx": firstLogIdx,
},
)
panic(fmt.Errorf("failed to load entry: %w", err))
Expand All @@ -161,11 +214,12 @@ func (store *BadgerStorage) GetLogEntry(idx uint64) (*Entry, bool) {
return entry, true
}

func (store *BadgerStorage) TestGetLogEntries() []*Entry {
func (store *BadgerStorage) TestGetLogEntries() ([]*Entry, uint64) {
lastLogIdx := store.GetLastLogIndex()
firstLogIdx := store.GetFirstLogIndex()
entries := make([]*Entry, 0, lastLogIdx)
err := store.db.View(func(txn *badger.Txn) error {
for idx := uint64(1); idx <= lastLogIdx; idx++ {
for idx := firstLogIdx; idx <= lastLogIdx; idx++ {

item, err := txn.Get(store.idxToKey(idx))
if err != nil {
Expand All @@ -185,11 +239,52 @@ func (store *BadgerStorage) TestGetLogEntries() []*Entry {
if err != nil {
panic(err)
}
return entries
return entries, firstLogIdx - 1
}

func (store *BadgerStorage) DeleteEntriesUpTo(endingLogIndex uint64) {
firstLogIndex := store.GetFirstLogIndex()
lastLogIndex := store.GetLastLogIndex()

if endingLogIndex == 0 {
assert.Unreachable("Trimming up to 0", map[string]any{})
panic(fmt.Errorf("trimming log up to index 0"))
}

if endingLogIndex < firstLogIndex || endingLogIndex > lastLogIndex {
assert.Unreachable("attempted to trim outside log range", map[string]any{
"firstLogIndex": firstLogIndex,
"lastLogIndex": lastLogIndex,
"endingLogIndex": endingLogIndex,
})
panic(fmt.Errorf("attempted to trim outside log range"))
}

err := store.db.Update(func(txn *badger.Txn) error {
for idx := firstLogIndex; idx <= endingLogIndex; idx++ {
err := txn.Delete(store.idxToKey(idx))
if err != nil {
return fmt.Errorf("failed to delete key at index %d: %w", idx, err)
}
}
return nil
})
if err != nil {
assert.Unreachable("failed to trim log", map[string]any{
"firstLogIndex": firstLogIndex,
"lastLogIndex": lastLogIndex,
"endingLogIndex": endingLogIndex,
"error": err,
})
panic(fmt.Errorf("failed to trim log"))
}

store.setFirstLogIdx(endingLogIndex + 1)
}

func (store *BadgerStorage) DeleteEntriesFrom(startingLogIdx uint64) {
lastLogIdx := store.GetLastLogIndex()
firstLogIdx := store.GetFirstLogIndex()
if startingLogIdx == 0 {
assert.Unreachable(
"Invalid delete start index",
Expand All @@ -199,7 +294,18 @@ func (store *BadgerStorage) DeleteEntriesFrom(startingLogIdx uint64) {
},
)
panic(fmt.Errorf("invalid delete start index"))
} else if startingLogIdx < firstLogIdx {
assert.Unreachable(
"Delete below trim threshold",
map[string]any{
"startingLogIdx": startingLogIdx,
"lastLogIdx": lastLogIdx,
"firstLogIdx": firstLogIdx,
},
)
panic(fmt.Errorf("attempted to delete below trim threshold"))
}

err := store.db.Update(func(txn *badger.Txn) error {
for idx := startingLogIdx; idx <= lastLogIdx; idx++ {
err := txn.Delete(store.idxToKey(idx))
Expand All @@ -215,6 +321,7 @@ func (store *BadgerStorage) DeleteEntriesFrom(startingLogIdx uint64) {
map[string]any{
"startingLogIdx": startingLogIdx,
"lastLogIdx": lastLogIdx,
"firstLogIdx": firstLogIdx,
},
)
panic(fmt.Errorf("failed to delete: %w", err))
Expand Down Expand Up @@ -242,8 +349,32 @@ func (store *BadgerStorage) GetLastLogIndexAndTerm() (lastLogIndex uint64, term
return
}

func (store *BadgerStorage) GetLastLogIndex() uint64 {
func (store *BadgerStorage) GetFirstLogIndex() uint64 {
var firstLogIdx uint64
store.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(FirstLogIdxKey)
if errors.Is(err, badger.ErrKeyNotFound) {
// key doesn't exist yet
return nil
}

if err != nil {
return fmt.Errorf("failed to get firstLogIdx: %w", err)
}

err = item.Value(func(val []byte) error {
firstLogIdx = binary.BigEndian.Uint64(val)
return nil
})
if err != nil {
return fmt.Errorf("failed to read value of firstLogIdx item: %w", err)
}
return nil
})
return firstLogIdx
}

func (store *BadgerStorage) GetLastLogIndex() uint64 {
var lastLogIdx uint64
store.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(LastLogIdxKey)
Expand Down Expand Up @@ -343,6 +474,17 @@ func (store *BadgerStorage) VoteFor(id string, currentTerm uint64) {

func (store *BadgerStorage) GetLogEntriesFrom(startingLogIdx uint64) []Entry {
lastLogIdx := store.GetLastLogIndex()
firstLogIdx := store.GetFirstLogIndex()

if startingLogIdx < firstLogIdx {
assert.Unreachable("entries below trim threshold", map[string]any{
"startingLogIdx": startingLogIdx,
"firstLogIdx": firstLogIdx,
"lastLogIdx": lastLogIdx,
})
panic(fmt.Errorf("attempted to load entries below trim threshold"))
}

entries := make([]Entry, 0, lastLogIdx-startingLogIdx+1)
err := store.db.View(func(txn *badger.Txn) error {
for idx := startingLogIdx; idx <= lastLogIdx; idx++ {
Expand Down
Loading