From bf963e3258bb563fa66c9ca2f12a78e696b4b4bc Mon Sep 17 00:00:00 2001 From: eladlachmi <110764839+eladlachmi@users.noreply.github.com> Date: Sun, 26 May 2024 11:51:17 +0300 Subject: [PATCH] Fix lakectl local commits remote changes outside synced prefix (#7796) * Fix lakectl local commits remote changes outside synced prefix --- cmd/lakectl/cmd/local_commit.go | 59 ++++++++ cmd/lakectl/cmd/local_commit_test.go | 216 +++++++++++++++++++++++++++ docs/reference/cli.md | 1 + esti/lakectl_local_test.go | 139 +++++++++++++++++ esti/lakectl_util.go | 14 +- 5 files changed, 427 insertions(+), 2 deletions(-) create mode 100644 cmd/lakectl/cmd/local_commit_test.go diff --git a/cmd/lakectl/cmd/local_commit.go b/cmd/lakectl/cmd/local_commit.go index 0bcd4f627fb..b6cd79b5c50 100644 --- a/cmd/lakectl/cmd/local_commit.go +++ b/cmd/lakectl/cmd/local_commit.go @@ -1,13 +1,17 @@ package cmd import ( + "context" "fmt" "net/http" + "path/filepath" + "slices" "strings" "github.com/go-openapi/swag" "github.com/spf13/cobra" "github.com/treeverse/lakefs/pkg/api/apigen" + "github.com/treeverse/lakefs/pkg/api/apiutil" "github.com/treeverse/lakefs/pkg/diff" "github.com/treeverse/lakefs/pkg/git" "github.com/treeverse/lakefs/pkg/local" @@ -15,6 +19,10 @@ import ( "golang.org/x/sync/errgroup" ) +const ( + asciiCharAfterSlash = "0" +) + func findConflicts(changes local.Changes) (conflicts []string) { for _, c := range changes { if c.Type == local.ChangeTypeConflict { @@ -24,6 +32,50 @@ func findConflicts(changes local.Changes) (conflicts []string) { return } +func hasExternalChange(ctx context.Context, client *apigen.ClientWithResponses, remote *uri.URI, idx *local.Index) bool { + currentURI, err := idx.GetCurrentURI() + if err != nil { + DieErr(err) + } + + // Get first uncommitted change. If it's outside the local prefix, we're done + dirtyResp, err := client.DiffBranchWithResponse(ctx, remote.Repository, remote.Ref, &apigen.DiffBranchParams{ + Amount: apiutil.Ptr(apigen.PaginationAmount(1)), + }) + DieOnErrorOrUnexpectedStatusCode(dirtyResp, err, http.StatusOK) + + if len(dirtyResp.JSON200.Results) == 0 { + return false + } + if slices.ContainsFunc(dirtyResp.JSON200.Results, func(diff apigen.Diff) bool { + return diff.PathType == "object" && !strings.HasPrefix(diff.Path, *currentURI.Path) + }) { + return true + } + + // Get the first uncommitted change after the prefix. If it exists, we're also done + nextPrefix := fmt.Sprintf("%s%s", filepath.Clean(*currentURI.Path), asciiCharAfterSlash) + dirtyResp, err = client.DiffBranchWithResponse(ctx, remote.Repository, remote.Ref, &apigen.DiffBranchParams{ + Amount: apiutil.Ptr(apigen.PaginationAmount(1)), + After: apiutil.Ptr(apigen.PaginationAfter(nextPrefix)), + }) + DieOnErrorOrUnexpectedStatusCode(dirtyResp, err, http.StatusOK) + + // The above gives us SeekGT. Since we need SeekGE, we do another stat for exact match + statResp, err := client.StatObjectWithResponse(ctx, remote.Repository, remote.Ref, &apigen.StatObjectParams{ + Path: nextPrefix, + }) + if err != nil { + DieErr(err) + } + + if len(dirtyResp.JSON200.Results) > 0 || statResp.StatusCode() == http.StatusOK { + return true + } + + return false +} + var localCommitCmd = &cobra.Command{ Use: "commit [directory]", Short: "Commit changes from local directory to the lakeFS branch it tracks.", @@ -33,6 +85,7 @@ var localCommitCmd = &cobra.Command{ _, localPath := getSyncArgs(args, false, false) syncFlags := getSyncFlags(cmd, client) message, kvPairs := getCommitFlags(cmd) + force := Must(cmd.Flags().GetBool(localForceFlagName)) idx, err := local.ReadIndex(localPath) if err != nil { @@ -56,6 +109,11 @@ var localCommitCmd = &cobra.Command{ resp, err := client.GetBranchWithResponse(cmd.Context(), remote.Repository, remote.Ref) DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK) + hasChangesOutsideSyncedPrefix := hasExternalChange(cmd.Context(), client, remote, idx) + if hasChangesOutsideSyncedPrefix && !force { + DieFmt("Branch %s contains uncommitted changes outside of local path '%s'.\nTo proceed, use the --force flag.", remote.Ref, localPath) + } + // Diff local with current head baseRemote := remote.WithRef(idx.AtHead) changes := localDiff(cmd.Context(), client, baseRemote, idx.LocalPath()) @@ -170,6 +228,7 @@ var localCommitCmd = &cobra.Command{ //nolint:gochecknoinits func init() { + withForceFlag(localCommitCmd, "Commit changes even if remote branch includes uncommitted changes external to the synced path") withCommitFlags(localCommitCmd, false) withSyncFlags(localCommitCmd) localCmd.AddCommand(localCommitCmd) diff --git a/cmd/lakectl/cmd/local_commit_test.go b/cmd/lakectl/cmd/local_commit_test.go new file mode 100644 index 00000000000..c01a47912db --- /dev/null +++ b/cmd/lakectl/cmd/local_commit_test.go @@ -0,0 +1,216 @@ +package cmd + +import ( + "context" + "encoding/json" + "fmt" + "net/http" + "net/http/httptest" + "strings" + "testing" + + "github.com/deepmap/oapi-codegen/pkg/securityprovider" + "github.com/stretchr/testify/require" + "github.com/treeverse/lakefs/pkg/api/apigen" + "github.com/treeverse/lakefs/pkg/api/apiutil" + "github.com/treeverse/lakefs/pkg/local" + "github.com/treeverse/lakefs/pkg/uri" +) + +const ( + defaultAdminAccessKeyID = "AKIAIOSFDNN7EXAMPLEQ" + defaultAdminSecretAccessKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY" +) + +func getTestClient(t *testing.T, endpoint string) *apigen.ClientWithResponses { + t.Helper() + transport := http.DefaultTransport.(*http.Transport).Clone() + transport.MaxIdleConnsPerHost = DefaultMaxIdleConnsPerHost + httpClient := &http.Client{ + Transport: transport, + } + basicAuthProvider, err := securityprovider.NewSecurityProviderBasicAuth(string(defaultAdminAccessKeyID), string(defaultAdminSecretAccessKey)) + require.NoError(t, err) + + serverEndpoint, err := apiutil.NormalizeLakeFSEndpoint(endpoint) + require.NoError(t, err) + + client, err := apigen.NewClientWithResponses( + serverEndpoint, + apigen.WithHTTPClient(httpClient), + apigen.WithRequestEditorFn(basicAuthProvider.Intercept), + ) + require.NoError(t, err) + + return client +} + +func TestUncommittedOutsideOfPrefix(t *testing.T) { + prefix := "xyzzy/" + remote := &uri.URI{ + Repository: "test", + Ref: "test", + } + idx := &local.Index{ + PathURI: fmt.Sprintf("lakefs://test/test/%s", prefix), + ActiveOperation: "", + } + + testCases := []struct { + name string + h http.HandlerFunc + expectedResult bool + }{ + { + name: "Uncommitted changes - none", + h: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + res := &apigen.DiffList{ + Results: []apigen.Diff{}, + } + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Content-Type-Options", "nosniff") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(res) + }), + expectedResult: false, + }, + { + name: "Uncommitted changes - outside", + h: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + res := &apigen.DiffList{ + Results: []apigen.Diff{ + { + PathType: "object", + Path: "otherPrefix/a", + }, + }, + } + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Content-Type-Options", "nosniff") + w.WriteHeader(http.StatusOK) + json.NewEncoder(w).Encode(res) + }), + expectedResult: true, + }, + { + name: "Uncommitted changes - inside", + h: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var res *apigen.DiffList + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Content-Type-Options", "nosniff") + if strings.Contains(r.RequestURI, "/diff?amount") { + res = &apigen.DiffList{ + Results: []apigen.Diff{ + { + PathType: "object", + Path: fmt.Sprintf("%sa", prefix), + }, + }, + } + w.WriteHeader(http.StatusOK) + } + + if strings.Contains(r.RequestURI, "/diff?after") { + res = &apigen.DiffList{ + Results: []apigen.Diff{}, + } + w.WriteHeader(http.StatusOK) + } + + if strings.Contains(r.RequestURI, "/stat?path") { + res = &apigen.DiffList{} + w.WriteHeader(http.StatusNotFound) + } + require.NotNil(t, res, "Unexpected request") + json.NewEncoder(w).Encode(res) + }), + expectedResult: false, + }, + { + name: "Uncommitted changes - inside before outside", + h: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var res *apigen.DiffList + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Content-Type-Options", "nosniff") + if strings.Contains(r.RequestURI, "/diff?amount") { + res = &apigen.DiffList{ + Results: []apigen.Diff{ + { + PathType: "object", + Path: fmt.Sprintf("%sa", prefix), + }, + }, + } + w.WriteHeader(http.StatusOK) + } + + if strings.Contains(r.RequestURI, "/diff?after") { + res = &apigen.DiffList{ + Results: []apigen.Diff{ + { + PathType: "object", + Path: "zzz/a", + }, + }, + } + w.WriteHeader(http.StatusOK) + } + + if strings.Contains(r.RequestURI, "/stat?path") { + res = &apigen.DiffList{} + w.WriteHeader(http.StatusNotFound) + } + + require.NotNil(t, res, "Unexpected request") + json.NewEncoder(w).Encode(res) + }), + expectedResult: true, + }, + { + name: "Uncommitted changes - on the boundry", + h: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + var res *apigen.DiffList + w.Header().Set("Content-Type", "application/json") + w.Header().Set("X-Content-Type-Options", "nosniff") + if strings.Contains(r.RequestURI, "/diff?amount") { + res = &apigen.DiffList{ + Results: []apigen.Diff{ + { + PathType: "object", + Path: fmt.Sprintf("%sa", prefix), + }, + }, + } + w.WriteHeader(http.StatusOK) + } + + if strings.Contains(r.RequestURI, "/diff?after") { + res = &apigen.DiffList{ + Results: []apigen.Diff{}, + } + w.WriteHeader(http.StatusOK) + } + + if strings.Contains(r.RequestURI, "/stat?path") { + // we only look at the status, so we don't care about the body + res = &apigen.DiffList{} + w.WriteHeader(http.StatusOK) + } + + require.NotNil(t, res, "Unexpected request") + json.NewEncoder(w).Encode(res) + }), + expectedResult: true, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + server := httptest.NewServer(tc.h) + defer server.Close() + + testClient := getTestClient(t, server.URL) + res := hasExternalChange(context.Background(), testClient, remote, idx) + require.Equal(t, tc.expectedResult, res) + }) + } +} diff --git a/docs/reference/cli.md b/docs/reference/cli.md index 9b4a4d5d25b..dedfc80d257 100644 --- a/docs/reference/cli.md +++ b/docs/reference/cli.md @@ -2575,6 +2575,7 @@ lakectl local commit [directory] [flags] ``` --allow-empty-message allow an empty commit message + --force Commit changes even if remote branch includes uncommitted changes external to the synced path -h, --help help for commit -m, --message string commit message --meta strings key value pair in the form of key=value diff --git a/esti/lakectl_local_test.go b/esti/lakectl_local_test.go index 64abd8efe60..7c732155a1b 100644 --- a/esti/lakectl_local_test.go +++ b/esti/lakectl_local_test.go @@ -464,6 +464,145 @@ func TestLakectlLocal_commit(t *testing.T) { } } +func TestLakectlLocal_commit_remote_uncommitted(t *testing.T) { + tmpDir := t.TempDir() + fd, err := os.CreateTemp(tmpDir, "") + require.NoError(t, err) + require.NoError(t, fd.Close()) + repoName := generateUniqueRepositoryName() + storage := generateUniqueStorageNamespace(repoName) + vars := map[string]string{ + "REPO": repoName, + "STORAGE": storage, + "BRANCH": mainBranch, + "REF": mainBranch, + "PREFIX": "test-data", + } + + runCmd(t, fmt.Sprintf("%s repo create lakefs://%s %s", Lakectl(), repoName, storage), false, false, vars) + + testCases := []struct { + name string + uncommittedRemote []string + uncommittedLocal []string + expectFailure bool + expectedMessage string + withForceFlag bool + }{ + { + name: "uncommitted_changes_-_none", + uncommittedRemote: []string{}, + uncommittedLocal: []string{ + "test.data", + }, + expectFailure: false, + expectedMessage: "Commit for branch \"${BRANCH}\" completed", + withForceFlag: false, + }, + { + name: "uncommitted_changes_-_outside", + uncommittedRemote: []string{ + "otherPrefix/a", + }, + uncommittedLocal: []string{ + "test.data", + }, + expectFailure: true, + expectedMessage: "Branch ${BRANCH} contains uncommitted changes outside of local path '${LOCAL_DIR}'.\nTo proceed, use the --force flag.", + withForceFlag: false, + }, + { + name: "uncommitted_changes_-_inside", + uncommittedRemote: []string{ + fmt.Sprintf("%s/a", vars["PREFIX"]), + }, + uncommittedLocal: []string{ + "test.data", + }, + expectFailure: false, + expectedMessage: "Commit for branch \"${BRANCH}\" completed", + withForceFlag: false, + }, + { + name: "uncommitted_changes_-_inside_before_outside", + uncommittedRemote: []string{ + "zzz/a", + }, + uncommittedLocal: []string{ + "test.data", + }, + expectFailure: true, + expectedMessage: "Branch ${BRANCH} contains uncommitted changes outside of local path '${LOCAL_DIR}'.\nTo proceed, use the --force flag.", + withForceFlag: false, + }, + { + name: "uncommitted_changes_-_on_boundry", + uncommittedRemote: []string{ + fmt.Sprintf("%s0", vars["PREFIX"]), + }, + uncommittedLocal: []string{ + "test.data", + }, + expectFailure: true, + expectedMessage: "Branch ${BRANCH} contains uncommitted changes outside of local path '${LOCAL_DIR}'.\nTo proceed, use the --force flag.", + withForceFlag: false, + }, + { + name: "uncommitted_changes_-_outside_force", + uncommittedRemote: []string{ + "otherPrefix/a", + }, + uncommittedLocal: []string{ + "test.data", + }, + expectFailure: false, + expectedMessage: "Commit for branch \"${BRANCH}\" completed", + withForceFlag: true, + }, + } + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + dataDir, err := os.MkdirTemp(tmpDir, "") + require.NoError(t, err) + + runCmd(t, fmt.Sprintf("%s branch create lakefs://%s/%s --source lakefs://%s/%s", Lakectl(), repoName, tc.name, repoName, mainBranch), false, false, vars) + vars["LOCAL_DIR"] = dataDir + vars["BRANCH"] = tc.name + vars["REF"] = tc.name + RunCmdAndVerifyContainsText(t, fmt.Sprintf("%s local clone lakefs://%s/%s/%s %s", Lakectl(), repoName, vars["BRANCH"], vars["PREFIX"], dataDir), false, "Successfully cloned lakefs://${REPO}/${REF}/${PREFIX}/ to ${LOCAL_DIR}.", vars) + + // add remote files + if len(tc.uncommittedRemote) > 0 { + for _, f := range tc.uncommittedRemote { + vars["FILE_PATH"] = f + runCmd(t, fmt.Sprintf("%s fs upload -s files/ro_1k lakefs://%s/%s/%s", Lakectl(), vars["REPO"], vars["BRANCH"], vars["FILE_PATH"]), false, false, vars) + + } + } + + // add local files + if len(tc.uncommittedLocal) > 0 { + for _, f := range tc.uncommittedLocal { + fd, err = os.Create(filepath.Join(dataDir, f)) + require.NoError(t, err) + require.NoError(t, fd.Close()) + } + } + + force := "" + if tc.withForceFlag { + force = "--force" + } + cmd := fmt.Sprintf("%s local commit %s -m test %s", Lakectl(), force, dataDir) + if tc.expectFailure { + RunCmdAndVerifyFailureContainsText(t, cmd, false, tc.expectedMessage, vars) + } else { + RunCmdAndVerifyContainsText(t, cmd, false, tc.expectedMessage, vars) + } + }) + } +} + func TestLakectlLocal_interrupted(t *testing.T) { tmpDir := t.TempDir() repoName := generateUniqueRepositoryName() diff --git a/esti/lakectl_util.go b/esti/lakectl_util.go index 6fbc937630b..c199c392b6c 100644 --- a/esti/lakectl_util.go +++ b/esti/lakectl_util.go @@ -145,11 +145,21 @@ func RunCmdAndVerifySuccessWithFile(t *testing.T, cmd string, isTerminal bool, g } func RunCmdAndVerifyContainsText(t *testing.T, cmd string, isTerminal bool, expectedRaw string, vars map[string]string) { + t.Helper() + runCmdAndVerifyContainsText(t, cmd, false, isTerminal, expectedRaw, vars) +} + +func RunCmdAndVerifyFailureContainsText(t *testing.T, cmd string, isTerminal bool, expectedRaw string, vars map[string]string) { + t.Helper() + runCmdAndVerifyContainsText(t, cmd, true, isTerminal, expectedRaw, vars) +} + +func runCmdAndVerifyContainsText(t *testing.T, cmd string, expectFail, isTerminal bool, expectedRaw string, vars map[string]string) { t.Helper() s := sanitize(expectedRaw, vars) expected, err := expandVariables(s, vars) - require.NoErrorf(t, err, "Variable embed failed - %s", err) - sanitizedResult := runCmd(t, cmd, false, isTerminal, vars) + require.NoError(t, err, "Variable embed failed - %s", err) + sanitizedResult := runCmd(t, cmd, expectFail, isTerminal, vars) require.Contains(t, sanitizedResult, expected) }