Skip to content

Commit

Permalink
Fix lakectl local commits remote changes outside synced prefix (#7796)
Browse files Browse the repository at this point in the history
* Fix lakectl local commits remote changes outside synced prefix
  • Loading branch information
eladlachmi authored May 26, 2024
1 parent 805d429 commit bf963e3
Show file tree
Hide file tree
Showing 5 changed files with 427 additions and 2 deletions.
59 changes: 59 additions & 0 deletions cmd/lakectl/cmd/local_commit.go
Original file line number Diff line number Diff line change
@@ -1,20 +1,28 @@
package cmd

import (
"context"
"fmt"
"net/http"
"path/filepath"
"slices"
"strings"

"github.com/go-openapi/swag"
"github.com/spf13/cobra"
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/api/apiutil"
"github.com/treeverse/lakefs/pkg/diff"
"github.com/treeverse/lakefs/pkg/git"
"github.com/treeverse/lakefs/pkg/local"
"github.com/treeverse/lakefs/pkg/uri"
"golang.org/x/sync/errgroup"
)

const (
asciiCharAfterSlash = "0"
)

func findConflicts(changes local.Changes) (conflicts []string) {
for _, c := range changes {
if c.Type == local.ChangeTypeConflict {
Expand All @@ -24,6 +32,50 @@ func findConflicts(changes local.Changes) (conflicts []string) {
return
}

func hasExternalChange(ctx context.Context, client *apigen.ClientWithResponses, remote *uri.URI, idx *local.Index) bool {
currentURI, err := idx.GetCurrentURI()
if err != nil {
DieErr(err)
}

// Get first uncommitted change. If it's outside the local prefix, we're done
dirtyResp, err := client.DiffBranchWithResponse(ctx, remote.Repository, remote.Ref, &apigen.DiffBranchParams{
Amount: apiutil.Ptr(apigen.PaginationAmount(1)),
})
DieOnErrorOrUnexpectedStatusCode(dirtyResp, err, http.StatusOK)

if len(dirtyResp.JSON200.Results) == 0 {
return false
}
if slices.ContainsFunc(dirtyResp.JSON200.Results, func(diff apigen.Diff) bool {
return diff.PathType == "object" && !strings.HasPrefix(diff.Path, *currentURI.Path)
}) {
return true
}

// Get the first uncommitted change after the prefix. If it exists, we're also done
nextPrefix := fmt.Sprintf("%s%s", filepath.Clean(*currentURI.Path), asciiCharAfterSlash)
dirtyResp, err = client.DiffBranchWithResponse(ctx, remote.Repository, remote.Ref, &apigen.DiffBranchParams{
Amount: apiutil.Ptr(apigen.PaginationAmount(1)),
After: apiutil.Ptr(apigen.PaginationAfter(nextPrefix)),
})
DieOnErrorOrUnexpectedStatusCode(dirtyResp, err, http.StatusOK)

// The above gives us SeekGT. Since we need SeekGE, we do another stat for exact match
statResp, err := client.StatObjectWithResponse(ctx, remote.Repository, remote.Ref, &apigen.StatObjectParams{
Path: nextPrefix,
})
if err != nil {
DieErr(err)
}

if len(dirtyResp.JSON200.Results) > 0 || statResp.StatusCode() == http.StatusOK {
return true
}

return false
}

var localCommitCmd = &cobra.Command{
Use: "commit [directory]",
Short: "Commit changes from local directory to the lakeFS branch it tracks.",
Expand All @@ -33,6 +85,7 @@ var localCommitCmd = &cobra.Command{
_, localPath := getSyncArgs(args, false, false)
syncFlags := getSyncFlags(cmd, client)
message, kvPairs := getCommitFlags(cmd)
force := Must(cmd.Flags().GetBool(localForceFlagName))

idx, err := local.ReadIndex(localPath)
if err != nil {
Expand All @@ -56,6 +109,11 @@ var localCommitCmd = &cobra.Command{
resp, err := client.GetBranchWithResponse(cmd.Context(), remote.Repository, remote.Ref)
DieOnErrorOrUnexpectedStatusCode(resp, err, http.StatusOK)

hasChangesOutsideSyncedPrefix := hasExternalChange(cmd.Context(), client, remote, idx)
if hasChangesOutsideSyncedPrefix && !force {
DieFmt("Branch %s contains uncommitted changes outside of local path '%s'.\nTo proceed, use the --force flag.", remote.Ref, localPath)
}

// Diff local with current head
baseRemote := remote.WithRef(idx.AtHead)
changes := localDiff(cmd.Context(), client, baseRemote, idx.LocalPath())
Expand Down Expand Up @@ -170,6 +228,7 @@ var localCommitCmd = &cobra.Command{

//nolint:gochecknoinits
func init() {
withForceFlag(localCommitCmd, "Commit changes even if remote branch includes uncommitted changes external to the synced path")
withCommitFlags(localCommitCmd, false)
withSyncFlags(localCommitCmd)
localCmd.AddCommand(localCommitCmd)
Expand Down
216 changes: 216 additions & 0 deletions cmd/lakectl/cmd/local_commit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,216 @@
package cmd

import (
"context"
"encoding/json"
"fmt"
"net/http"
"net/http/httptest"
"strings"
"testing"

"github.com/deepmap/oapi-codegen/pkg/securityprovider"
"github.com/stretchr/testify/require"
"github.com/treeverse/lakefs/pkg/api/apigen"
"github.com/treeverse/lakefs/pkg/api/apiutil"
"github.com/treeverse/lakefs/pkg/local"
"github.com/treeverse/lakefs/pkg/uri"
)

const (
defaultAdminAccessKeyID = "AKIAIOSFDNN7EXAMPLEQ"
defaultAdminSecretAccessKey = "wJalrXUtnFEMI/K7MDENG/bPxRfiCYEXAMPLEKEY"
)

func getTestClient(t *testing.T, endpoint string) *apigen.ClientWithResponses {
t.Helper()
transport := http.DefaultTransport.(*http.Transport).Clone()
transport.MaxIdleConnsPerHost = DefaultMaxIdleConnsPerHost
httpClient := &http.Client{
Transport: transport,
}
basicAuthProvider, err := securityprovider.NewSecurityProviderBasicAuth(string(defaultAdminAccessKeyID), string(defaultAdminSecretAccessKey))
require.NoError(t, err)

serverEndpoint, err := apiutil.NormalizeLakeFSEndpoint(endpoint)
require.NoError(t, err)

client, err := apigen.NewClientWithResponses(
serverEndpoint,
apigen.WithHTTPClient(httpClient),
apigen.WithRequestEditorFn(basicAuthProvider.Intercept),
)
require.NoError(t, err)

return client
}

func TestUncommittedOutsideOfPrefix(t *testing.T) {
prefix := "xyzzy/"
remote := &uri.URI{
Repository: "test",
Ref: "test",
}
idx := &local.Index{
PathURI: fmt.Sprintf("lakefs://test/test/%s", prefix),
ActiveOperation: "",
}

testCases := []struct {
name string
h http.HandlerFunc
expectedResult bool
}{
{
name: "Uncommitted changes - none",
h: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
res := &apigen.DiffList{
Results: []apigen.Diff{},
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(res)
}),
expectedResult: false,
},
{
name: "Uncommitted changes - outside",
h: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
res := &apigen.DiffList{
Results: []apigen.Diff{
{
PathType: "object",
Path: "otherPrefix/a",
},
},
}
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Content-Type-Options", "nosniff")
w.WriteHeader(http.StatusOK)
json.NewEncoder(w).Encode(res)
}),
expectedResult: true,
},
{
name: "Uncommitted changes - inside",
h: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var res *apigen.DiffList
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Content-Type-Options", "nosniff")
if strings.Contains(r.RequestURI, "/diff?amount") {
res = &apigen.DiffList{
Results: []apigen.Diff{
{
PathType: "object",
Path: fmt.Sprintf("%sa", prefix),
},
},
}
w.WriteHeader(http.StatusOK)
}

if strings.Contains(r.RequestURI, "/diff?after") {
res = &apigen.DiffList{
Results: []apigen.Diff{},
}
w.WriteHeader(http.StatusOK)
}

if strings.Contains(r.RequestURI, "/stat?path") {
res = &apigen.DiffList{}
w.WriteHeader(http.StatusNotFound)
}
require.NotNil(t, res, "Unexpected request")
json.NewEncoder(w).Encode(res)
}),
expectedResult: false,
},
{
name: "Uncommitted changes - inside before outside",
h: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var res *apigen.DiffList
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Content-Type-Options", "nosniff")
if strings.Contains(r.RequestURI, "/diff?amount") {
res = &apigen.DiffList{
Results: []apigen.Diff{
{
PathType: "object",
Path: fmt.Sprintf("%sa", prefix),
},
},
}
w.WriteHeader(http.StatusOK)
}

if strings.Contains(r.RequestURI, "/diff?after") {
res = &apigen.DiffList{
Results: []apigen.Diff{
{
PathType: "object",
Path: "zzz/a",
},
},
}
w.WriteHeader(http.StatusOK)
}

if strings.Contains(r.RequestURI, "/stat?path") {
res = &apigen.DiffList{}
w.WriteHeader(http.StatusNotFound)
}

require.NotNil(t, res, "Unexpected request")
json.NewEncoder(w).Encode(res)
}),
expectedResult: true,
},
{
name: "Uncommitted changes - on the boundry",
h: http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
var res *apigen.DiffList
w.Header().Set("Content-Type", "application/json")
w.Header().Set("X-Content-Type-Options", "nosniff")
if strings.Contains(r.RequestURI, "/diff?amount") {
res = &apigen.DiffList{
Results: []apigen.Diff{
{
PathType: "object",
Path: fmt.Sprintf("%sa", prefix),
},
},
}
w.WriteHeader(http.StatusOK)
}

if strings.Contains(r.RequestURI, "/diff?after") {
res = &apigen.DiffList{
Results: []apigen.Diff{},
}
w.WriteHeader(http.StatusOK)
}

if strings.Contains(r.RequestURI, "/stat?path") {
// we only look at the status, so we don't care about the body
res = &apigen.DiffList{}
w.WriteHeader(http.StatusOK)
}

require.NotNil(t, res, "Unexpected request")
json.NewEncoder(w).Encode(res)
}),
expectedResult: true,
},
}
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
server := httptest.NewServer(tc.h)
defer server.Close()

testClient := getTestClient(t, server.URL)
res := hasExternalChange(context.Background(), testClient, remote, idx)
require.Equal(t, tc.expectedResult, res)
})
}
}
1 change: 1 addition & 0 deletions docs/reference/cli.md
Original file line number Diff line number Diff line change
Expand Up @@ -2575,6 +2575,7 @@ lakectl local commit [directory] [flags]

```
--allow-empty-message allow an empty commit message
--force Commit changes even if remote branch includes uncommitted changes external to the synced path
-h, --help help for commit
-m, --message string commit message
--meta strings key value pair in the form of key=value
Expand Down
Loading

0 comments on commit bf963e3

Please sign in to comment.