Skip to content

Commit

Permalink
Snapshotting, log trimming and recovery
Browse files Browse the repository at this point in the history
  • Loading branch information
ReubenMathew authored and mprimi committed Oct 25, 2024
1 parent 730b87f commit ccb4d9c
Show file tree
Hide file tree
Showing 8 changed files with 598 additions and 44 deletions.
164 changes: 153 additions & 11 deletions toy-raft/raft/badger_store.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,11 @@ import (
)

var (
TermKey = []byte("term")
VoteKey = []byte("vote")
LastLogIdxKey = []byte("lastLogIdx")
TermKey = []byte("term")
VoteKey = []byte("vote")
// TODO: load these once on startup, keep flushing values to disk on log writes
LastLogIdxKey = []byte("lastLogIdx")
FirstLogIdxKey = []byte("firstLogIdx")
)

type BadgerStorage struct {
Expand All @@ -25,7 +27,7 @@ type logEntry struct {
Term uint64
}

func NewDiskStorage(replicaId string, baseDir string) Storage {
func NewDiskStorage(replicaId string, baseDir string) *BadgerStorage {
dbPath := filepath.Join(baseDir, replicaId)
db, err := badger.Open(badger.DefaultOptions(dbPath))
if err != nil {
Expand Down Expand Up @@ -59,7 +61,7 @@ func NewDiskStorage(replicaId string, baseDir string) Storage {
return store
}

func NewInMemoryStorage() Storage {
func NewInMemoryStorage() *BadgerStorage {
db, err := badger.Open(badger.DefaultOptions("").WithInMemory(true))
if err != nil {
panic(fmt.Errorf("failed to init memory store: %w", err))
Expand All @@ -83,6 +85,7 @@ func (store *BadgerStorage) storageInit() {

// initialize log
store.setLastLogIdx(0)
store.setFirstLogIdx(1)
}

func (store *BadgerStorage) setLastLogIdx(newLastLogIdx uint64) {
Expand All @@ -101,6 +104,15 @@ func (store *BadgerStorage) setLastLogIdx(newLastLogIdx uint64) {
panic(fmt.Errorf("setting invalid last log index"))
}

if firstLogIdx := store.GetFirstLogIndex(); firstLogIdx > newLastLogIdx+1 {
assert.Unreachable("Setting lastLogIndex below trim threshold (firstLogIndex)", map[string]any{
"newLastLogIdx": newLastLogIdx,
"currentLastLogIdx": currentLastLogIdx,
"firstLogIndex": firstLogIdx,
})
panic(fmt.Errorf("setting invalid last log index"))
}

buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, newLastLogIdx)

Expand All @@ -114,8 +126,38 @@ func (store *BadgerStorage) setLastLogIdx(newLastLogIdx uint64) {
}
}

func (store *BadgerStorage) setFirstLogIdx(newFirstLogIdx uint64) {

currentFirstLogIdx := store.GetFirstLogIndex()
if currentFirstLogIdx == 0 && newFirstLogIdx == 0 {
// initial case, don't panic
} else if newFirstLogIdx == currentFirstLogIdx {
assert.Unreachable(
"Setting invalid firstLogIndex",
map[string]any{
"newFirstLogIdx": newFirstLogIdx,
"currentFirstLogIdx": currentFirstLogIdx,
},
)
panic(fmt.Errorf("setting invalid first log index"))
}

buf := make([]byte, 8)
binary.BigEndian.PutUint64(buf, newFirstLogIdx)

if err := store.db.Update(func(txn *badger.Txn) error {
if err := txn.Set(FirstLogIdxKey, buf); err != nil {
return fmt.Errorf("failed to set firstLogIdx: %s", err)
}
return nil
}); err != nil {
panic(fmt.Errorf("failed to commit first log index: %w", err))
}
}

func (store *BadgerStorage) GetLogEntry(idx uint64) (*Entry, bool) {
lastLogIdx := store.GetLastLogIndex()
firstLogIdx := store.GetFirstLogIndex()
if idx == 0 {
assert.Unreachable(
"Invalid entry lookup index",
Expand All @@ -129,6 +171,16 @@ func (store *BadgerStorage) GetLogEntry(idx uint64) (*Entry, bool) {

if idx > lastLogIdx {
return nil, false
} else if idx < firstLogIdx {
assert.Unreachable(
"Entry below trim threshold",
map[string]any{
"index": idx,
"lastLogIdx": lastLogIdx,
"firstLogIdx": firstLogIdx,
},
)
panic("attempted to look up entry below trim threshold")
}

var entry *Entry
Expand All @@ -151,8 +203,9 @@ func (store *BadgerStorage) GetLogEntry(idx uint64) (*Entry, bool) {
assert.Unreachable(
"Failed to load entry",
map[string]any{
"index": idx,
"lastLogIdx": lastLogIdx,
"index": idx,
"lastLogIdx": lastLogIdx,
"firstLogIdx": firstLogIdx,
},
)
panic(fmt.Errorf("failed to load entry: %w", err))
Expand All @@ -161,11 +214,12 @@ func (store *BadgerStorage) GetLogEntry(idx uint64) (*Entry, bool) {
return entry, true
}

func (store *BadgerStorage) TestGetLogEntries() []*Entry {
func (store *BadgerStorage) TestGetLogEntries() ([]*Entry, uint64) {
lastLogIdx := store.GetLastLogIndex()
firstLogIdx := store.GetFirstLogIndex()
entries := make([]*Entry, 0, lastLogIdx)
err := store.db.View(func(txn *badger.Txn) error {
for idx := uint64(1); idx <= lastLogIdx; idx++ {
for idx := firstLogIdx; idx <= lastLogIdx; idx++ {

item, err := txn.Get(store.idxToKey(idx))
if err != nil {
Expand All @@ -185,11 +239,52 @@ func (store *BadgerStorage) TestGetLogEntries() []*Entry {
if err != nil {
panic(err)
}
return entries
return entries, firstLogIdx - 1
}

func (store *BadgerStorage) DeleteEntriesUpTo(endingLogIndex uint64) {
firstLogIndex := store.GetFirstLogIndex()
lastLogIndex := store.GetLastLogIndex()

if endingLogIndex == 0 {
assert.Unreachable("Trimming up to 0", map[string]any{})
panic(fmt.Errorf("trimming log up to index 0"))
}

if endingLogIndex < firstLogIndex || endingLogIndex > lastLogIndex {
assert.Unreachable("attempted to trim outside log range", map[string]any{
"firstLogIndex": firstLogIndex,
"lastLogIndex": lastLogIndex,
"endingLogIndex": endingLogIndex,
})
panic(fmt.Errorf("attempted to trim outside log range"))
}

err := store.db.Update(func(txn *badger.Txn) error {
for idx := firstLogIndex; idx <= endingLogIndex; idx++ {
err := txn.Delete(store.idxToKey(idx))
if err != nil {
return fmt.Errorf("failed to delete key at index %d: %w", idx, err)
}
}
return nil
})
if err != nil {
assert.Unreachable("failed to trim log", map[string]any{
"firstLogIndex": firstLogIndex,
"lastLogIndex": lastLogIndex,
"endingLogIndex": endingLogIndex,
"error": err,
})
panic(fmt.Errorf("failed to trim log"))
}

store.setFirstLogIdx(endingLogIndex + 1)
}

func (store *BadgerStorage) DeleteEntriesFrom(startingLogIdx uint64) {
lastLogIdx := store.GetLastLogIndex()
firstLogIdx := store.GetFirstLogIndex()
if startingLogIdx == 0 {
assert.Unreachable(
"Invalid delete start index",
Expand All @@ -199,7 +294,18 @@ func (store *BadgerStorage) DeleteEntriesFrom(startingLogIdx uint64) {
},
)
panic(fmt.Errorf("invalid delete start index"))
} else if startingLogIdx < firstLogIdx {
assert.Unreachable(
"Delete below trim threshold",
map[string]any{
"startingLogIdx": startingLogIdx,
"lastLogIdx": lastLogIdx,
"firstLogIdx": firstLogIdx,
},
)
panic(fmt.Errorf("attempted to delete below trim threshold"))
}

err := store.db.Update(func(txn *badger.Txn) error {
for idx := startingLogIdx; idx <= lastLogIdx; idx++ {
err := txn.Delete(store.idxToKey(idx))
Expand All @@ -215,6 +321,7 @@ func (store *BadgerStorage) DeleteEntriesFrom(startingLogIdx uint64) {
map[string]any{
"startingLogIdx": startingLogIdx,
"lastLogIdx": lastLogIdx,
"firstLogIdx": firstLogIdx,
},
)
panic(fmt.Errorf("failed to delete: %w", err))
Expand Down Expand Up @@ -242,8 +349,32 @@ func (store *BadgerStorage) GetLastLogIndexAndTerm() (lastLogIndex uint64, term
return
}

func (store *BadgerStorage) GetLastLogIndex() uint64 {
func (store *BadgerStorage) GetFirstLogIndex() uint64 {
var firstLogIdx uint64
store.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(FirstLogIdxKey)
if errors.Is(err, badger.ErrKeyNotFound) {
// key doesn't exist yet
return nil
}

if err != nil {
return fmt.Errorf("failed to get firstLogIdx: %w", err)
}

err = item.Value(func(val []byte) error {
firstLogIdx = binary.BigEndian.Uint64(val)
return nil
})
if err != nil {
return fmt.Errorf("failed to read value of firstLogIdx item: %w", err)
}
return nil
})
return firstLogIdx
}

func (store *BadgerStorage) GetLastLogIndex() uint64 {
var lastLogIdx uint64
store.db.View(func(txn *badger.Txn) error {
item, err := txn.Get(LastLogIdxKey)
Expand Down Expand Up @@ -343,6 +474,17 @@ func (store *BadgerStorage) VoteFor(id string, currentTerm uint64) {

func (store *BadgerStorage) GetLogEntriesFrom(startingLogIdx uint64) []Entry {
lastLogIdx := store.GetLastLogIndex()
firstLogIdx := store.GetFirstLogIndex()

if startingLogIdx < firstLogIdx {
assert.Unreachable("entries below trim threshold", map[string]any{
"startingLogIdx": startingLogIdx,
"firstLogIdx": firstLogIdx,
"lastLogIdx": lastLogIdx,
})
panic(fmt.Errorf("attempted to load entries below trim threshold"))
}

entries := make([]Entry, 0, lastLogIdx-startingLogIdx+1)
err := store.db.View(func(txn *badger.Txn) error {
for idx := startingLogIdx; idx <= lastLogIdx; idx++ {
Expand Down
Loading

0 comments on commit ccb4d9c

Please sign in to comment.