Skip to content

Commit

Permalink
feat(store): correct corrupted files on write (#3859)
Browse files Browse the repository at this point in the history
  • Loading branch information
walldiss authored Oct 22, 2024
1 parent 78b0ff3 commit 4f1f14a
Show file tree
Hide file tree
Showing 7 changed files with 414 additions and 0 deletions.
36 changes: 36 additions & 0 deletions store/file/ods.go
Original file line number Diff line number Diff line change
Expand Up @@ -131,6 +131,27 @@ func writeAxisRoots(w io.Writer, roots *share.AxisRoots) error {
return nil
}

// ValidateODSSize checks if the file under given FS path has the expected size.
func ValidateODSSize(path string, eds *rsmt2d.ExtendedDataSquare) error {
ods, err := OpenODS(path)
if err != nil {
return fmt.Errorf("opening file: %w", err)
}

shares := filledSharesAmount(eds)
shareSize := len(eds.GetCell(0, 0))
expectedSize := ods.hdr.OffsetWithRoots() + shares*shareSize

info, err := ods.fl.Stat()
if err != nil {
return fmt.Errorf("getting file info: %w", err)
}
if info.Size() != int64(expectedSize) {
return fmt.Errorf("file size mismatch: expected %d, got %d", expectedSize, info.Size())
}
return nil
}

// OpenODS opens an existing ODS file under given FS path.
// It only reads the header with metadata. The other content
// of the File is read lazily.
Expand Down Expand Up @@ -414,3 +435,18 @@ func readColHalf(r io.ReaderAt, colIdx int, hdr *headerV0, offset int) ([]share.
}
return shares, nil
}

// filledSharesAmount returns the amount of shares in the ODS that are not tail padding.
func filledSharesAmount(eds *rsmt2d.ExtendedDataSquare) int {
var amount int
for i := range eds.Width() / 2 {
for j := range eds.Width() / 2 {
shr := eds.GetCell(i, j)
if share.GetNamespace(shr).Equals(share.TailPaddingNamespace) {
break
}
amount++
}
}
return amount
}
13 changes: 13 additions & 0 deletions store/file/ods_q4.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,19 @@ func CreateODSQ4(
return nil
}

// ValidateODSQ4Size checks the size of the ODS and Q4 files under the given FS paths.
func ValidateODSQ4Size(pathODS, pathQ4 string, eds *rsmt2d.ExtendedDataSquare) error {
err := ValidateODSSize(pathODS, eds)
if err != nil {
return fmt.Errorf("validating ODS file size: %w", err)
}
err = validateQ4Size(pathQ4, eds)
if err != nil {
return fmt.Errorf("validating Q4 file size: %w", err)
}
return nil
}

// ODSWithQ4 returns ODSQ4 instance over ODS. It opens Q4 file lazily under the given path.
func ODSWithQ4(ods *ODS, pathQ4 string) *ODSQ4 {
return &ODSQ4{
Expand Down
92 changes: 92 additions & 0 deletions store/file/ods_q4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package file

import (
"context"
"fmt"
"io"
"os"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -39,6 +42,95 @@ func TestODSQ4File(t *testing.T) {
eds.TestStreamer(ctx, t, createODSQ4AccessorStreamer, ODSSize)
}

func TestValidateODSQ4FileSize(t *testing.T) {
edses := []struct {
name string
eds *rsmt2d.ExtendedDataSquare
}{
{
name: "no padding",
eds: edstest.RandEDS(t, 8),
},
{
name: "with padding",
eds: edstest.RandEDSWithTailPadding(t, 8, 11),
},
{
name: "empty", eds: share.EmptyEDS(),
},
}

tests := []struct {
name string
createFile func(pathODS, pathQ4 string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare) error
valid bool
}{
{
name: "valid",
createFile: CreateODSQ4,
valid: true,
},
{
name: "shorter q4",
createFile: func(pathODS, pathQ4 string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare) error {
err := CreateODSQ4(pathODS, pathQ4, roots, eds)
if err != nil {
return err
}
file, err := os.OpenFile(pathQ4, os.O_RDWR, 0)
if err != nil {
return err
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return err
}
return file.Truncate(info.Size() - 1)
},
valid: false,
},
{
name: "longer q4",
createFile: func(pathODS, pathQ4 string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare) error {
err := CreateODSQ4(pathODS, pathQ4, roots, eds)
if err != nil {
return err
}
file, err := os.OpenFile(pathQ4, os.O_RDWR, 0)
if err != nil {
return err
}
defer file.Close()
// append 1 byte to the file
_, err = file.Seek(0, io.SeekEnd)
if err != nil {
return err
}
_, err = file.Write([]byte{0})
return err
},
valid: false,
},
}

for _, tt := range tests {
for _, eds := range edses {
t.Run(fmt.Sprintf("%s/%s", tt.name, eds.name), func(t *testing.T) {
pathODS := t.TempDir() + tt.name + eds.name
pathQ4 := pathODS + ".q4"
roots, err := share.NewAxisRoots(eds.eds)
require.NoError(t, err)
err = tt.createFile(pathODS, pathQ4, roots, eds.eds)
require.NoError(t, err)

err = ValidateODSQ4Size(pathODS, pathQ4, eds.eds)
require.Equal(t, tt.valid, err == nil)
})
}
}
}

// BenchmarkAxisFromODSQ4File/Size:32/ProofType:row/squareHalf:0-16 354836 3345 ns/op
// BenchmarkAxisFromODSQ4File/Size:32/ProofType:row/squareHalf:1-16 339547 3187 ns/op
// BenchmarkAxisFromODSQ4File/Size:32/ProofType:col/squareHalf:0-16 69364 16440 ns/op
Expand Down
91 changes: 91 additions & 0 deletions store/file/ods_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,9 @@ package file

import (
"context"
"fmt"
"io"
"os"
"strconv"
"testing"
"time"
Expand Down Expand Up @@ -61,6 +64,94 @@ func TestODSFile(t *testing.T) {
eds.TestStreamer(ctx, t, createODSAccessorStreamer, ODSSize)
}

func TestValidateODSSize(t *testing.T) {
edses := []struct {
name string
eds *rsmt2d.ExtendedDataSquare
}{
{
name: "no padding",
eds: edstest.RandEDS(t, 8),
},
{
name: "with padding",
eds: edstest.RandEDSWithTailPadding(t, 8, 11),
},
{
name: "empty", eds: share.EmptyEDS(),
},
}

tests := []struct {
name string
createFile func(path string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare) error
valid bool
}{
{
name: "valid",
createFile: CreateODS,
valid: true,
},
{
name: "shorter",
createFile: func(path string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare) error {
err := CreateODS(path, roots, eds)
if err != nil {
return err
}
file, err := os.OpenFile(path, os.O_RDWR, 0)
if err != nil {
return err
}
defer file.Close()
info, err := file.Stat()
if err != nil {
return err
}
return file.Truncate(info.Size() - 1)
},
valid: false,
},
{
name: "longer",
createFile: func(path string, roots *share.AxisRoots, eds *rsmt2d.ExtendedDataSquare) error {
err := CreateODS(path, roots, eds)
if err != nil {
return err
}
file, err := os.OpenFile(path, os.O_RDWR, 0)
if err != nil {
return err
}
defer file.Close()
// append 1 byte to the file
_, err = file.Seek(0, io.SeekEnd)
if err != nil {
return err
}
_, err = file.Write([]byte{0})
return err
},
valid: false,
},
}

for _, tt := range tests {
for _, eds := range edses {
t.Run(fmt.Sprintf("%s/%s", tt.name, eds.name), func(t *testing.T) {
path := t.TempDir() + tt.name + eds.name
roots, err := share.NewAxisRoots(eds.eds)
require.NoError(t, err)
err = tt.createFile(path, roots, eds.eds)
require.NoError(t, err)

err = ValidateODSSize(path, eds.eds)
require.Equal(t, tt.valid, err == nil)
})
}
}
}

// BenchmarkAxisFromODSFile/Size:32/ProofType:row/squareHalf:0-16 382011 3104 ns/op
// BenchmarkAxisFromODSFile/Size:32/ProofType:row/squareHalf:1-16 9320 122408 ns/op
// BenchmarkAxisFromODSFile/Size:32/ProofType:col/squareHalf:0-16 4408911 266.5 ns/op
Expand Down
21 changes: 21 additions & 0 deletions store/file/q4.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,27 @@ func writeQ4(w io.Writer, eds *rsmt2d.ExtendedDataSquare) error {
return nil
}

func validateQ4Size(path string, eds *rsmt2d.ExtendedDataSquare) error {
f, err := os.Open(path)
if err != nil {
return fmt.Errorf("opening file: %w", err)
}
defer f.Close()

odsSize := int(eds.Width() / 2)
shareSize := len(eds.GetCell(0, 0))
expectedSize := shareSize * odsSize * odsSize

info, err := f.Stat()
if err != nil {
return fmt.Errorf("getting file info: %w", err)
}
if info.Size() != int64(expectedSize) {
return fmt.Errorf("file size mismatch: expected %d, got %d", expectedSize, info.Size())
}
return nil
}

// openQ4 opens an existing Q4 file under given FS path.
func openQ4(path string, hdr *headerV0) (*q4, error) {
f, err := os.Open(path)
Expand Down
63 changes: 63 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,6 +181,14 @@ func (s *Store) createODSQ4File(
)
}

// if file already exists, check if it's corrupted
if errors.Is(err, os.ErrExist) {
err = s.validateAndRecoverODSQ4(square, roots, height, pathODS, pathQ4)
if err != nil {
return false, err
}
}

// create hard link with height as name
err = s.linkHeight(roots.Hash(), height)
// if both file and link exist, we consider it as success
Expand All @@ -198,6 +206,29 @@ func (s *Store) createODSQ4File(
return false, nil
}

func (s *Store) validateAndRecoverODSQ4(
square *rsmt2d.ExtendedDataSquare,
roots *share.AxisRoots,
height uint64,
pathODS, pathQ4 string,
) error {
// Validate the size of the file to ensure it's not corrupted
err := file.ValidateODSQ4Size(pathODS, pathQ4, square)
if err == nil {
return nil
}
log.Warnf("ODSQ4 file with height %d is corrupted, recovering", height)
err = s.removeODSQ4(height, roots.Hash())
if err != nil {
return fmt.Errorf("removing corrupted ODSQ4 file: %w", err)
}
err = file.CreateODSQ4(pathODS, pathQ4, roots, square)
if err != nil {
return fmt.Errorf("recreating ODSQ4 file: %w", err)
}
return nil
}

func (s *Store) createODSFile(
square *rsmt2d.ExtendedDataSquare,
roots *share.AxisRoots,
Expand All @@ -214,6 +245,15 @@ func (s *Store) createODSFile(
)
}

// if file already exists, check if it's corrupted
if errors.Is(err, os.ErrExist) {
// Validate the size of the file to ensure it's not corrupted
err = s.validateAndRecoverODS(square, roots, height, pathODS)
if err != nil {
return false, err
}
}

// create hard link with height as name
err = s.linkHeight(roots.Hash(), height)
// if both file and link exist, we consider it as success
Expand All @@ -231,6 +271,29 @@ func (s *Store) createODSFile(
return false, nil
}

func (s *Store) validateAndRecoverODS(
square *rsmt2d.ExtendedDataSquare,
roots *share.AxisRoots,
height uint64,
pathODS string,
) error {
// Validate the size of the file to ensure it's not corrupted
err := file.ValidateODSSize(pathODS, square)
if err == nil {
return nil
}
log.Warnf("ODS file with height %d is corrupted, recovering", height)
err = s.removeODS(height, roots.Hash())
if err != nil {
return fmt.Errorf("removing corrupted ODS file: %w", err)
}
err = file.CreateODS(pathODS, roots, square)
if err != nil {
return fmt.Errorf("recreating ODS file: %w", err)
}
return nil
}

func (s *Store) linkHeight(datahash share.DataHash, height uint64) error {
linktoOds := s.heightToPath(height, odsFileExt)
if datahash.IsEmptyEDS() {
Expand Down
Loading

0 comments on commit 4f1f14a

Please sign in to comment.