Skip to content

Commit

Permalink
Tests for non-empty string storageID tierFS (#8616)
Browse files Browse the repository at this point in the history
* start working on a test

* basic test

* basic test

* revert mem adapter change

* reapply mem adapter change

* test with two writes

* review fixes

* review fixes
  • Loading branch information
nadavsteindler authored Feb 10, 2025
1 parent 83bb605 commit ac82f88
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 17 deletions.
7 changes: 6 additions & 1 deletion pkg/block/mem/adapter.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (

"github.com/google/uuid"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/config"
)

var (
Expand Down Expand Up @@ -74,7 +75,11 @@ func getKey(obj block.ObjectPointer) string {
if obj.IdentifierType == block.IdentifierTypeFull {
return obj.Identifier
}
return fmt.Sprintf("%s:%s", obj.StorageNamespace, obj.Identifier)
if obj.StorageID == config.SingleBlockstoreID {
return fmt.Sprintf("%s:%s", obj.StorageNamespace, obj.Identifier)
} else {
return fmt.Sprintf("%s:%s:%s", obj.StorageID, obj.StorageNamespace, obj.Identifier)
}
}

func (a *Adapter) Put(_ context.Context, obj block.ObjectPointer, _ int64, reader io.Reader, opts block.PutOpts) (*block.PutResponse, error) {
Expand Down
16 changes: 12 additions & 4 deletions pkg/pyramid/tier_fs.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@ import (
"github.com/google/uuid"
"github.com/treeverse/lakefs/pkg/block"
"github.com/treeverse/lakefs/pkg/cache"
"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/pyramid/params"
)
Expand Down Expand Up @@ -186,7 +187,7 @@ func (tfs *TierFS) GetRemoteURI(_ context.Context, _, filename string) (string,
// operation. Open(namespace, filename) calls will return an error before the close was
// called. Create only performs local operations so it ignores the context.
func (tfs *TierFS) Create(_ context.Context, storageID, namespace string) (StoredFile, error) {
nsPath, err := parseNamespacePath(namespace)
nsPath, err := parseNamespacePath(storageID, namespace)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -215,7 +216,7 @@ func (tfs *TierFS) Create(_ context.Context, storageID, namespace string) (Store
// Open returns a file descriptor to the local file.
// If the file is missing from the local disk, it will try to fetch it from the block storage.
func (tfs *TierFS) Open(ctx context.Context, storageID, namespace, filename string) (File, error) {
nsPath, err := parseNamespacePath(namespace)
nsPath, err := parseNamespacePath(storageID, namespace)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -449,7 +450,8 @@ func (tfs *TierFS) workspaceTempFilePath(namespace string) string {
return path.Join(tfs.workspaceDirPath(namespace), uuid.Must(uuid.NewRandom()).String())
}

func parseNamespacePath(namespace string) (string, error) {
// Convert the storageID and namespace to a filepath to be used for storage
func parseNamespacePath(storageID, namespace string) (string, error) {
u, err := url.Parse(namespace)
if err != nil {
return "", fmt.Errorf("parse namespace: %w", err)
Expand All @@ -467,5 +469,11 @@ func parseNamespacePath(namespace string) (string, error) {
} else {
nsPath = h + "/" + u.Path
}
return nsPath, nil

// If there is a non-empty storageID, we need to add another level to the path
if storageID == config.SingleBlockstoreID {
return nsPath, nil
} else {
return storageID + ":" + nsPath, nil
}
}
63 changes: 51 additions & 12 deletions pkg/pyramid/tier_fs_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/treeverse/lakefs/pkg/block/mem"
"github.com/treeverse/lakefs/pkg/config"
"github.com/treeverse/lakefs/pkg/logging"
"github.com/treeverse/lakefs/pkg/pyramid/params"
)

const (
blockStoragePrefix = "prefix"
allocatedDiskBytes = 4 * 1024 * 1024
secondaryStorageID = "another_one"
)

func TestSimpleWriteRead(t *testing.T) {
Expand All @@ -32,8 +34,10 @@ func TestSimpleWriteRead(t *testing.T) {
filename := "1/2/file1.txt"

content := []byte("hello world!")
writeToFile(t, ctx, namespace, filename, content)
checkContent(t, ctx, namespace, filename, content)
writeToFile(t, ctx, config.SingleBlockstoreID, namespace, filename, content)
err := checkContent(t, ctx, config.SingleBlockstoreID, namespace, filename, content)
require.NoError(t, err)

}

func TestReadFailDuringWrite(t *testing.T) {
Expand All @@ -53,7 +57,40 @@ func TestReadFailDuringWrite(t *testing.T) {
require.Error(t, err)
require.NoError(t, f.Close())
require.NoError(t, f.Store(ctx, filename))
checkContent(t, ctx, namespace, filename, content)
err = checkContent(t, ctx, config.SingleBlockstoreID, namespace, filename, content)
require.NoError(t, err)
}

func TestOneWriteTwoStorageIDs(t *testing.T) {
ctx := context.Background()
namespace := uniqueNamespace()
filename := "1/2/file1.txt"
content := []byte("hello world!")

// Write content to default SID
writeToFile(t, ctx, config.SingleBlockstoreID, namespace, filename, content)

// Read it from a different SID: should fail!
_, err := fs.Open(ctx, secondaryStorageID, namespace, filename)
require.ErrorContains(t, err, "not found")
}

func TestTwoWritesTwoStorageIDs(t *testing.T) {
ctx := context.Background()
namespace := uniqueNamespace()
filename := "1/2/file1.txt"
content1 := []byte("hello world!")
content2 := []byte("goodbye world!")

// Write content to two
writeToFile(t, ctx, config.SingleBlockstoreID, namespace, filename, content1)
writeToFile(t, ctx, secondaryStorageID, namespace, filename, content2)

// Check that both writes succeed
err := checkContent(t, ctx, config.SingleBlockstoreID, namespace, filename, content1)
require.NoError(t, err)
err = checkContent(t, ctx, secondaryStorageID, namespace, filename, content2)
require.NoError(t, err)
}

func TestEvictionSingleNamespace(t *testing.T) {
Expand Down Expand Up @@ -149,7 +186,7 @@ func testEviction(t *testing.T, namespaces ...string) {
if err != nil {
t.Fatal("rand.Read", err)
}
writeToFile(t, ctx, namespaces[i%len(namespaces)], filename, content)
writeToFile(t, ctx, config.SingleBlockstoreID, namespaces[i%len(namespaces)], filename, content)
}

// read
Expand All @@ -176,7 +213,7 @@ func TestMultipleConcurrentReads(t *testing.T) {
namespace := uniqueNamespace()
filename := "1/2/file1.txt"
content := []byte("hello world!")
writeToFile(t, ctx, namespace, filename, content)
writeToFile(t, ctx, config.SingleBlockstoreID, namespace, filename, content)

// remove the file
err := filepath.Walk(baseDir, func(path string, info os.FileInfo, err error) error {
Expand All @@ -194,7 +231,7 @@ func TestMultipleConcurrentReads(t *testing.T) {
for i := 0; i < concurrencyLevel; i++ {
go func() {
defer wg.Done()
checkContent(t, ctx, namespace, filename, content)
_ = checkContent(t, ctx, config.SingleBlockstoreID, namespace, filename, content)
}()
}

Expand All @@ -204,9 +241,9 @@ func TestMultipleConcurrentReads(t *testing.T) {
require.Equal(t, int64(1), adapter.GetCount())
}

func writeToFile(t *testing.T, ctx context.Context, namespace, filename string, content []byte) {
func writeToFile(t *testing.T, ctx context.Context, storageID, namespace, filename string, content []byte) {
t.Helper()
f, err := fs.Create(ctx, "", namespace)
f, err := fs.Create(ctx, storageID, namespace)
require.NoError(t, err)

n, err := f.Write(content)
Expand All @@ -217,23 +254,25 @@ func writeToFile(t *testing.T, ctx context.Context, namespace, filename string,
require.NoError(t, f.Store(ctx, filename))
}

func checkContent(t *testing.T, ctx context.Context, namespace string, filename string, content []byte) {
func checkContent(t *testing.T, ctx context.Context, storageID, namespace string, filename string, content []byte) error {
t.Helper()
f, err := fs.Open(ctx, "", namespace, filename)
f, err := fs.Open(ctx, storageID, namespace, filename)
if err != nil {
t.Errorf("Failed to open namespace:%s filename:%s - %s", namespace, filename, err)
return
return err
}
defer func() { _ = f.Close() }()

data, err := io.ReadAll(f)
if err != nil {
t.Errorf("Failed to read all namespace:%s filename:%s - %s", namespace, filename, err)
return
return err
}
if !bytes.Equal(content, data) {
t.Errorf("Content mismatch reading namespace:%s filename:%s", namespace, filename)
return err
}
return nil
}

type mockEv struct{}
Expand Down

0 comments on commit ac82f88

Please sign in to comment.