Skip to content

Commit

Permalink
fix(defrag): handle no space left error
Browse files Browse the repository at this point in the history
Signed-off-by: Thomas Gosteli <thomas.gosteli@protonmail.ch>
  • Loading branch information
ghouscht committed Nov 5, 2024
1 parent f04d57c commit 28a2e22
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 70 deletions.
77 changes: 14 additions & 63 deletions server/storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@
package backend

import (
"errors"
"fmt"
"hash/crc32"
"io"
Expand All @@ -27,7 +26,6 @@ import (

humanize "github.com/dustin/go-humanize"
bolt "go.etcd.io/bbolt"
berrors "go.etcd.io/bbolt/errors"
"go.uber.org/zap"
)

Expand Down Expand Up @@ -456,61 +454,7 @@ func (b *backend) Commits() int64 {
}

func (b *backend) Defrag() error {
err := b.defrag()
if err != nil {
if b.lg != nil {
b.lg.Error("defrag failed, trying to reopen db",
zap.Error(err),
)
}

// TODO: verify that these are all the locks that must be acquired
// I copied this from the defrag() function, assuming this is everything that needs to be done
b.batchTx.LockOutsideApply()
defer b.batchTx.Unlock()

// lock database after lock tx to avoid deadlock.
b.mu.Lock()
defer b.mu.Unlock()

// block concurrent read requests while resetting tx
b.readTx.Lock()
defer b.readTx.Unlock()

// TODO: Is there a way to figure out if the bbolt db is closed or open?
// Haven't found one so far, thus I'm using b.db.Begin() and then checking the error it returns.

rwTx, err := b.db.Begin(true)
if err != nil {
// The only error we can probably handle here is if the database was closed during the defrag attempt.
if !errors.Is(err, berrors.ErrDatabaseNotOpen) {
panic("failed to begin transaction: " + err.Error())
}
// TODO: this has potential to block forever if lock on the db file can't be acquired and the bbolt options
// don't set a timeout for this operation.
b.db, err = bolt.Open(b.db.Path(), 0600, b.bopts)
if err != nil {
panic("failed to reopen database after failed defrag attempt: " + err.Error())
}
rwTx = b.unsafeBegin(true)
}
b.batchTx.tx = rwTx

b.readTx.reset()
b.readTx.tx = b.unsafeBegin(false) // TODO: This should work because now there should be a DB open

// TODO: Verify if this is needed?
size := b.readTx.tx.Size()
db := b.readTx.tx.DB()
atomic.StoreInt64(&b.size, size)
atomic.StoreInt64(&b.sizeInUse, size-(int64(db.Stats().FreePageN)*int64(db.Info().PageSize)))

if b.lg != nil {
b.lg.Info("reopened database after failed defrag attempt")
}
}

return err
return b.defrag()
}

func (b *backend) defrag() error {
Expand All @@ -532,31 +476,38 @@ func (b *backend) defrag() error {
b.readTx.Lock()
defer b.readTx.Unlock()

b.batchTx.unsafeCommit(true)

b.batchTx.tx = nil

// Create a temporary file to ensure we start with a clean slate.
// Snapshotter.cleanupSnapdir cleans up any of these that are found during startup.
// gofail: var defragNoSpace string
// return fmt.Errorf(defragNoSpace)
dir := filepath.Dir(b.db.Path())
temp, err := os.CreateTemp(dir, "db.tmp.*")
if err != nil {
return err
}
defer temp.Close()

// Commit/stop and then reset current transactions (including the readTx)
b.batchTx.unsafeCommit(true)
b.batchTx.tx = nil

options := bolt.Options{}
if boltOpenOptions != nil {
options = *boltOpenOptions
}
options.OpenFile = func(_ string, _ int, _ os.FileMode) (file *os.File, err error) {
// gofail: var defragNoSpace string
// return nil, fmt.Errorf(defragNoSpace)
return temp, nil
}
// Don't load tmp db into memory regardless of opening options
options.Mlock = false
tdbp := temp.Name()
tmpdb, err := bolt.Open(tdbp, 0600, &options)
if err != nil {
// Transactions were committed/stopped earlier, begin new transactions with old bolt db so etcd can continue to
// run.
b.batchTx.tx = b.unsafeBegin(true)
b.readTx.tx = b.unsafeBegin(false)

return err
}

Expand Down
14 changes: 7 additions & 7 deletions tests/e2e/defrag_no_space_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,6 @@ import (
"testing"
"time"

"github.com/davecgh/go-spew/spew"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"go.etcd.io/etcd/tests/v3/framework/config"
"go.etcd.io/etcd/tests/v3/framework/e2e"
Expand All @@ -39,10 +37,12 @@ func TestDefragNoSpace(t *testing.T) {
member := clus.Procs[0]

require.NoError(t, member.Failpoints().SetupHTTP(context.Background(), "defragNoSpace", `return("no space")`))
require.Error(t, member.Etcdctl().Defragment(context.Background(), config.DefragOption{Timeout: time.Minute}))
require.ErrorContains(t, member.Etcdctl().Defragment(context.Background(), config.DefragOption{Timeout: time.Minute}), "no space")

// This fails because etcd panics and we can no longer put a key.
assert.NoError(t, member.Etcdctl().Put(context.Background(), "foo", "bar", config.PutOptions{}))

spew.Dump(clus.Procs[0].Logs().Lines())
// Make sure etcd continues to run even after the failed defrag attempt
require.NoError(t, member.Etcdctl().Put(context.Background(), "foo", "bar", config.PutOptions{}))
value, err := member.Etcdctl().Get(context.Background(), "foo", config.GetOptions{})
require.NoError(t, err)
require.Len(t, value.Kvs, 1)
require.Equal(t, "bar", string(value.Kvs[0].Value))
}

0 comments on commit 28a2e22

Please sign in to comment.