Skip to content

Commit

Permalink
Pull Requests: List pull requests (#8184)
Browse files Browse the repository at this point in the history
  • Loading branch information
N-o-Z authored Sep 19, 2024
1 parent 992c446 commit 7d52a45
Show file tree
Hide file tree
Showing 9 changed files with 485 additions and 30 deletions.
10 changes: 10 additions & 0 deletions pkg/graveler/graveler.go
Original file line number Diff line number Diff line change
Expand Up @@ -811,6 +811,14 @@ type LinkAddressIterator interface {
Close()
}

type PullsIterator interface {
Next() bool
SeekGE(id PullRequestID)
Value() *PullRequestRecord
Err() error
Close()
}

// These are the more complex internal components that compose the functionality of the Graveler

// RefManager handles references: branches, commits, probably tags in the future
Expand Down Expand Up @@ -913,6 +921,8 @@ type RefManager interface {

GetPullRequest(ctx context.Context, repository *RepositoryRecord, pullID PullRequestID) (*PullRequest, error)

ListPullRequests(ctx context.Context, repository *RepositoryRecord) (PullsIterator, error)

CreatePullRequest(ctx context.Context, repository *RepositoryRecord, pullRequestID PullRequestID, pullRequest *PullRequest) error

DeletePullRequest(ctx context.Context, repository *RepositoryRecord, pullRequestID PullRequestID) error
Expand Down
104 changes: 104 additions & 0 deletions pkg/graveler/mock/graveler.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

27 changes: 13 additions & 14 deletions pkg/graveler/model.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ const (
settingsPrefix = "settings"
importsPrefix = "imports"
repoMetadataPrefix = "repo-metadata"
pullRequestsPrefix = "pulls"
)

//nolint:gochecknoinits
Expand Down Expand Up @@ -77,10 +76,6 @@ func RepoMetadataPath() string {
return repoMetadataPrefix
}

func PullRequestPath(pullID PullRequestID) string {
return kv.FormatPath(pullRequestsPrefix, pullID.String())
}

func CommitFromProto(pb *CommitData) *Commit {
parents := make([]CommitID, 0)
for _, parent := range pb.Parents {
Expand Down Expand Up @@ -222,15 +217,19 @@ func ProtoFromRepositoryMetadata(metadata RepositoryMetadata) *RepoMetadata {
}
}

func PullRequestFromProto(pb *PullRequestData) *PullRequest {
return &PullRequest{
CreationDate: pb.CreatedAt.AsTime(),
Status: pb.Status,
Title: pb.Title,
Author: pb.Author,
Description: pb.Description,
Source: pb.SourceBranch,
Destination: pb.DestinationBranch,
func PullRequestFromProto(pb *PullRequestData) *PullRequestRecord {
return &PullRequestRecord{
ID: PullRequestID(pb.Id),
PullRequest: PullRequest{
CreationDate: pb.CreatedAt.AsTime(),
Status: pb.Status,
Title: pb.Title,
Author: pb.Author,
Description: pb.Description,
Source: pb.SourceBranch,
Destination: pb.DestinationBranch,
MergedCommitID: pb.CommitId,
},
}
}

Expand Down
61 changes: 55 additions & 6 deletions pkg/graveler/ref/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -646,15 +646,37 @@ func (m *Manager) DeleteExpiredImports(ctx context.Context, repository *graveler
}

// Pull Requests logic
// TODO (niro): In the future we would probably like to move all the PR logic into a dedicated service similar to actions.
// TODO (niro): For now we put all the logic here under a single block

const (
// pullRequestsPrefix used for repo context listing
pullRequestsPrefix = "pulls"
// PullsPartitionKey used for lookup per source-dest (future)
PullsPartitionKey = "pulls"
reposPrefix = "repos"
)

func PullRequestPath(pullID graveler.PullRequestID) string {
return kv.FormatPath(pullRequestsPrefix, pullID.String())
}

func basePullsPath(repoID string) string {
return kv.FormatPath(reposPrefix, repoID)
}

func PullBySrcDstPath(repository *graveler.RepositoryRecord, srcBranch, dstBranch string) string {
return kv.FormatPath(basePullsPath(graveler.RepoPartition(repository)), srcBranch, dstBranch)
}

func (m *Manager) getPullWithPredicate(ctx context.Context, repository *graveler.RepositoryRecord, pullID graveler.PullRequestID) (*graveler.PullRequest, kv.Predicate, error) {
type pullWithPred struct {
*graveler.PullRequest
*graveler.PullRequestRecord
kv.Predicate
}
key := fmt.Sprintf("GetPullRequest:%s:%s", repository.RepositoryID, pullID)
result, err := m.batchExecutor.BatchFor(ctx, key, m.maxBatchDelay, batch.ExecuterFunc(func() (interface{}, error) {
pullKey := graveler.PullRequestPath(pullID)
pullKey := PullRequestPath(pullID)
data := graveler.PullRequestData{}
pred, err := kv.GetMsg(context.Background(), m.kvStore, graveler.RepoPartition(repository), []byte(pullKey), &data)
if err != nil {
Expand All @@ -669,24 +691,51 @@ func (m *Manager) getPullWithPredicate(ctx context.Context, repository *graveler
return nil, nil, err
}
p := result.(*pullWithPred)
return p.PullRequest, p.Predicate, nil
return &p.PullRequest, p.Predicate, nil
}

func (m *Manager) GetPullRequest(ctx context.Context, repository *graveler.RepositoryRecord, pullID graveler.PullRequestID) (*graveler.PullRequest, error) {
pull, _, err := m.getPullWithPredicate(ctx, repository, pullID)
return pull, err
}

func (m *Manager) ListPullRequests(ctx context.Context, repository *graveler.RepositoryRecord) (graveler.PullsIterator, error) {
return NewPullsIterator(ctx, m.kvStore, repository)
}

func (m *Manager) CreatePullRequest(ctx context.Context, repository *graveler.RepositoryRecord, pullRequestID graveler.PullRequestID, pullRequest *graveler.PullRequest) error {
err := kv.SetMsgIf(ctx, m.kvStore, graveler.RepoPartition(repository), []byte(graveler.PullRequestPath(pullRequestID)), graveler.ProtoFromPullRequest(pullRequestID, pullRequest), nil)
// Save secondary index by source - dest. For now, we override the value. In the future we should allow only single src-dest to exist
secondaryKey := []byte(PullBySrcDstPath(repository, pullRequest.Source, pullRequest.Destination))
err := kv.SetMsg(ctx, m.kvStore, PullsPartitionKey, secondaryKey, &kv.SecondaryIndex{PrimaryKey: []byte(pullRequestID.String())})
if err != nil {
return fmt.Errorf("save secondary index by src-dest (key %s): %w", secondaryKey, err)
}

// Save primary
err = kv.SetMsgIf(ctx, m.kvStore, graveler.RepoPartition(repository), []byte(PullRequestPath(pullRequestID)), graveler.ProtoFromPullRequest(pullRequestID, pullRequest), nil)
if errors.Is(err, kv.ErrPredicateFailed) {
err = graveler.ErrPullRequestExists
}
return err
}

func (m *Manager) DeletePullRequest(ctx context.Context, repository *graveler.RepositoryRecord, pullRequestID graveler.PullRequestID) error {
pullKey := graveler.PullRequestPath(pullRequestID)
pr, _, err := m.getPullWithPredicate(ctx, repository, pullRequestID)
if err != nil {
if errors.Is(err, graveler.ErrPullRequestNotFound) { // Ignore if not exists
return nil
}
return err
}

// Delete secondary key
secondaryKey := []byte(PullBySrcDstPath(repository, pr.Source, pr.Destination))
if err = m.kvStore.Delete(ctx, []byte(PullsPartitionKey), secondaryKey); err != nil {
return fmt.Errorf("delete secondary index by src-dest (key %s): %w", secondaryKey, err)
}

// Delete primary key
pullKey := PullRequestPath(pullRequestID)
return m.kvStore.Delete(ctx, []byte(graveler.RepoPartition(repository)), []byte(pullKey))
}

Expand All @@ -700,5 +749,5 @@ func (m *Manager) UpdatePullRequest(ctx context.Context, repository *graveler.Re
if err != nil || newPull == nil {
return err
}
return kv.SetMsgIf(ctx, m.kvStore, graveler.RepoPartition(repository), []byte(graveler.PullRequestPath(pullRequestID)), graveler.ProtoFromPullRequest(pullRequestID, newPull), pred)
return kv.SetMsgIf(ctx, m.kvStore, graveler.RepoPartition(repository), []byte(PullRequestPath(pullRequestID)), graveler.ProtoFromPullRequest(pullRequestID, newPull), pred)
}
25 changes: 20 additions & 5 deletions pkg/graveler/ref/manager_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1228,7 +1228,7 @@ func TestManager_SetRepositoryMetadata(t *testing.T) {
}

func TestManager_GetPullRequest(t *testing.T) {
r, _ := testRefManager(t)
r, store := testRefManager(t)
repository, err := r.CreateRepository(context.Background(), "repo1", graveler.Repository{
StorageNamespace: "s3://",
CreationDate: time.Now(),
Expand All @@ -1239,7 +1239,7 @@ func TestManager_GetPullRequest(t *testing.T) {

t.Run("get_pull_request_exists", func(t *testing.T) {
expected := graveler.PullRequestRecord{
ID: "",
ID: "pullID",
PullRequest: graveler.PullRequest{
CreationDate: time.Now().UTC(),
Status: graveler.PullRequestStatus_CLOSED,
Expand All @@ -1248,11 +1248,20 @@ func TestManager_GetPullRequest(t *testing.T) {
Description: "some description",
Source: "dev",
Destination: "main",
MergedCommitID: "",
MergedCommitID: "abc",
},
}
require.NoError(t, r.CreatePullRequest(ctx, repository, expected.ID, &expected.PullRequest))
pull, err := r.GetPullRequest(ctx, repository, expected.ID)

// Verify secondary index
data, err := store.Get(ctx, []byte(ref.PullsPartitionKey), []byte(ref.PullBySrcDstPath(repository, expected.Source, expected.Destination)))
require.NoError(t, err)
sec := kv.SecondaryIndex{}
require.NoError(t, proto.Unmarshal(data.Value, &sec))
require.Equal(t, expected.ID.String(), string(sec.PrimaryKey))

// Verify we can get pull from secondary index
pull, err := r.GetPullRequest(ctx, repository, graveler.PullRequestID(sec.PrimaryKey))
require.NoError(t, err)
require.Equal(t, expected.PullRequest, *pull)
})
Expand All @@ -1264,7 +1273,7 @@ func TestManager_GetPullRequest(t *testing.T) {
}

func TestManager_DeletePullRequest(t *testing.T) {
r, _ := testRefManager(t)
r, store := testRefManager(t)
repository, err := r.CreateRepository(context.Background(), "repo1", graveler.Repository{
StorageNamespace: "s3://",
CreationDate: time.Now(),
Expand All @@ -1288,8 +1297,14 @@ func TestManager_DeletePullRequest(t *testing.T) {
},
}
require.NoError(t, r.CreatePullRequest(ctx, repository, rec.ID, &rec.PullRequest))

// Delete Pull request
err := r.DeletePullRequest(ctx, repository, rec.ID)
require.NoError(t, err)

// Verify secondary index deleted
_, err = store.Get(ctx, []byte(ref.PullsPartitionKey), []byte(ref.PullBySrcDstPath(repository, rec.Source, rec.Destination)))
require.ErrorIs(t, err, kv.ErrNotFound)
})

t.Run("delete_pull_request_doesnt_exists", func(t *testing.T) {
Expand Down
Loading

0 comments on commit 7d52a45

Please sign in to comment.