From 46ef1c74b4431f848c1cf660848b469b92b4c7bb Mon Sep 17 00:00:00 2001 From: James Cotter Date: Thu, 7 Nov 2024 15:31:44 +0000 Subject: [PATCH 1/8] clients/v1: add RolesService client --- clients/v1/clients.pb.go | 2 + clientsv1_roles.go | 92 ++++++++++++++++++++++++++++++++++++++++ clientv1.go | 4 ++ go.mod | 2 +- 4 files changed, 99 insertions(+), 1 deletion(-) create mode 100644 clientsv1_roles.go diff --git a/clients/v1/clients.pb.go b/clients/v1/clients.pb.go index 6953b0b..370a3e9 100644 --- a/clients/v1/clients.pb.go +++ b/clients/v1/clients.pb.go @@ -849,6 +849,8 @@ type RegisterRoleResourcesRequestMetadata struct { // Upon completing the streaming request, any `resource_type` resources with a different revision will be removed. Revision string `protobuf:"bytes,1,opt,name=revision,proto3" json:"revision,omitempty"` // The type of resources being registered. + // Should be a valid resource type as defined in the `roles` package: + // https://github.com/sourcegraph/sourcegraph-accounts-sdk-go/blob/main/roles/ ResourceType string `protobuf:"bytes,2,opt,name=resource_type,json=resourceType,proto3" json:"resource_type,omitempty"` } diff --git a/clientsv1_roles.go b/clientsv1_roles.go new file mode 100644 index 0000000..6b97653 --- /dev/null +++ b/clientsv1_roles.go @@ -0,0 +1,92 @@ +package sams + +import ( + "context" + "slices" + + "connectrpc.com/connect" + "golang.org/x/oauth2" + + "github.com/google/uuid" + clientsv1 "github.com/sourcegraph/sourcegraph-accounts-sdk-go/clients/v1" + "github.com/sourcegraph/sourcegraph-accounts-sdk-go/clients/v1/clientsv1connect" + "github.com/sourcegraph/sourcegraph-accounts-sdk-go/roles" + "github.com/sourcegraph/sourcegraph/lib/errors" +) + +// RolesServiceV1 provides client methods to interact with the +// RolesService API v1. +type RolesServiceV1 struct { + client *ClientV1 +} + +func (s *RolesServiceV1) newClient(ctx context.Context) clientsv1connect.RolesServiceClient { + return clientsv1connect.NewRolesServiceClient( + oauth2.NewClient(ctx, s.client.tokenSource), + s.client.gRPCURL(), + connect.WithInterceptors(s.client.defaultInterceptors...), + ) +} + +// RegisterResourcesMetadata is the metadata for registering resources. +type RegisterResourcesMetadata struct { + ResourceType roles.ResourceType + Revision uuid.UUID +} + +func (r RegisterResourcesMetadata) validate() error { + if !slices.Contains(roles.ResourceTypes(), r.ResourceType) { + return errors.New("invalid resource type") + } + return nil +} + +// RegisterRoleResources registers the resources for a given resource type. +// `resourcesIterator` is a function that returns a list of resources to register. +// The function is invoked repeatedly until it produces an empty slice or an error. +// +// Required scope: sams::roles.resources::write +func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata RegisterResourcesMetadata, resourcesIterator func() ([]*clientsv1.RoleResource, error)) (uint64, error) { + err := metadata.validate() + if err != nil { + return 0, errors.Wrap(err, "invalid metadata") + } + + client := s.newClient(ctx) + stream := client.RegisterRoleResources(ctx) + err = stream.Send(&clientsv1.RegisterRoleResourcesRequest{ + Payload: &clientsv1.RegisterRoleResourcesRequest_Metadata{ + Metadata: &clientsv1.RegisterRoleResourcesRequestMetadata{ + ResourceType: string(metadata.ResourceType), + Revision: metadata.Revision.String(), + }, + }, + }) + if err != nil { + return 0, errors.Wrap(err, "failed to send metadata") + } + for { + resources, err := resourcesIterator() + if err != nil { + return 0, errors.Wrap(err, "failed to get resources") + } + if len(resources) == 0 { + break + } + err = stream.Send(&clientsv1.RegisterRoleResourcesRequest{ + Payload: &clientsv1.RegisterRoleResourcesRequest_Resources_{ + Resources: &clientsv1.RegisterRoleResourcesRequest_Resources{ + Resources: resources, + }, + }, + }) + if err != nil { + return 0, errors.Wrap(err, "failed to send resources") + } + } + resp, err := stream.CloseAndReceive() + if err != nil { + return 0, errors.Wrap(err, "failed to close stream") + } + return resp.Msg.GetResourceCount(), nil +} diff --git a/clientv1.go b/clientv1.go index cbfbafe..5465cf7 100644 --- a/clientv1.go +++ b/clientv1.go @@ -119,6 +119,10 @@ func (c *ClientV1) Tokens() *TokensServiceV1 { return &TokensServiceV1{client: c} } +func (c *ClientV1) Roles() *RolesServiceV1 { + return &RolesServiceV1{client: c} +} + var ( ErrNotFound = errors.New("not found") ErrRecordMismatch = errors.New("record mismatch") diff --git a/go.mod b/go.mod index f00eaa6..1f46374 100644 --- a/go.mod +++ b/go.mod @@ -8,6 +8,7 @@ require ( connectrpc.com/otelconnect v0.7.1 github.com/coreos/go-oidc/v3 v3.11.0 github.com/golang-jwt/jwt/v5 v5.2.1 + github.com/google/uuid v1.6.0 github.com/hexops/autogold/v2 v2.2.1 github.com/hexops/valast v1.4.4 github.com/lestrrat-go/jwx/v2 v2.1.1 @@ -45,7 +46,6 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/google/go-cmp v0.6.0 // indirect github.com/google/s2a-go v0.1.8 // indirect - github.com/google/uuid v1.6.0 // indirect github.com/googleapis/enterprise-certificate-proxy v0.3.2 // indirect github.com/googleapis/gax-go/v2 v2.13.0 // indirect github.com/hexops/gotextdiff v1.0.3 // indirect From 16c54a5d3875746cc93ebc8974a9e1edc1d18fc4 Mon Sep 17 00:00:00 2001 From: James Cotter Date: Fri, 8 Nov 2024 18:54:03 +0000 Subject: [PATCH 2/8] handle server abort when another replica is registering --- clientsv1_roles.go | 27 +++++++++++++++++++++------ clientv1.go | 5 +++++ 2 files changed, 26 insertions(+), 6 deletions(-) diff --git a/clientsv1_roles.go b/clientsv1_roles.go index 6b97653..0f9be0b 100644 --- a/clientsv1_roles.go +++ b/clientsv1_roles.go @@ -2,6 +2,7 @@ package sams import ( "context" + "io" "slices" "connectrpc.com/connect" @@ -28,7 +29,7 @@ func (s *RolesServiceV1) newClient(ctx context.Context) clientsv1connect.RolesSe ) } -// RegisterResourcesMetadata is the metadata for registering resources. +// RegisterResourcesMetadata is the metadata for a set of resources to be registered. type RegisterResourcesMetadata struct { ResourceType roles.ResourceType Revision uuid.UUID @@ -42,8 +43,9 @@ func (r RegisterResourcesMetadata) validate() error { } // RegisterRoleResources registers the resources for a given resource type. -// `resourcesIterator` is a function that returns a list of resources to register. +// `resourcesIterator` is a function that returns a page of resources to register. // The function is invoked repeatedly until it produces an empty slice or an error. +// If another replica is already registering the same resources, the function will return 0 without an error. // // Required scope: sams::roles.resources::write func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata RegisterResourcesMetadata, resourcesIterator func() ([]*clientsv1.RoleResource, error)) (uint64, error) { @@ -54,6 +56,7 @@ func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata Reg client := s.newClient(ctx) stream := client.RegisterRoleResources(ctx) + // Metadata must be submitted first in the stream. err = stream.Send(&clientsv1.RegisterRoleResourcesRequest{ Payload: &clientsv1.RegisterRoleResourcesRequest_Metadata{ Metadata: &clientsv1.RegisterRoleResourcesRequestMetadata{ @@ -62,10 +65,16 @@ func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata Reg }, }, }) + + sendResources := true if err != nil { + // The stream has been closed; skip sending resources. + if errors.Is(err, io.EOF) { + sendResources = false + } return 0, errors.Wrap(err, "failed to send metadata") } - for { + for sendResources { resources, err := resourcesIterator() if err != nil { return 0, errors.Wrap(err, "failed to get resources") @@ -81,12 +90,18 @@ func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata Reg }, }) if err != nil { + // The stream has been closed, so we stop sending resources. + if errors.Is(err, io.EOF) { + break + } return 0, errors.Wrap(err, "failed to send resources") } } - resp, err := stream.CloseAndReceive() - if err != nil { - return 0, errors.Wrap(err, "failed to close stream") + + resp, err := parseResponseAndError(stream.CloseAndReceive()) + // Stream closed due to another replica registering the same resources. + if errors.Is(err, ErrAborted) { + return 0, nil } return resp.Msg.GetResourceCount(), nil } diff --git a/clientv1.go b/clientv1.go index 5465cf7..a5fa115 100644 --- a/clientv1.go +++ b/clientv1.go @@ -82,6 +82,10 @@ func parseResponseAndError[T any](resp *connect.Response[T], err error) (*connec return nil, ErrNotFound } + if connectErr.Code() == connect.CodeAborted { + return nil, ErrAborted + } + // Cannot determine action solely based on status code, let's look at the error // details. for _, detail := range connectErr.Details() { @@ -126,6 +130,7 @@ func (c *ClientV1) Roles() *RolesServiceV1 { var ( ErrNotFound = errors.New("not found") ErrRecordMismatch = errors.New("record mismatch") + ErrAborted = errors.New("aborted") ) // ClientCredentialsTokenSource returns a TokenSource that generates an access From a0dcb1e07da34e1973d20c8dfa12225b3905c363 Mon Sep 17 00:00:00 2001 From: James Cotter Date: Fri, 8 Nov 2024 18:56:10 +0000 Subject: [PATCH 3/8] fix --- clientsv1_roles.go | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/clientsv1_roles.go b/clientsv1_roles.go index 0f9be0b..d6b7c3f 100644 --- a/clientsv1_roles.go +++ b/clientsv1_roles.go @@ -71,8 +71,9 @@ func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata Reg // The stream has been closed; skip sending resources. if errors.Is(err, io.EOF) { sendResources = false + } else { + return 0, errors.Wrap(err, "failed to send metadata") } - return 0, errors.Wrap(err, "failed to send metadata") } for sendResources { resources, err := resourcesIterator() From bd792cf16a80cacf8d673fdb0735c9400a6a39a9 Mon Sep 17 00:00:00 2001 From: James Cotter Date: Sat, 9 Nov 2024 18:03:56 +0000 Subject: [PATCH 4/8] add documentation around abort behaviour --- clients/v1/clients.proto | 4 ++++ clients/v1/clientsv1connect/clients.connect.go | 8 ++++++++ clientsv1_roles.go | 9 ++++++--- clientv1.go | 8 +++++++- 4 files changed, 25 insertions(+), 4 deletions(-) diff --git a/clients/v1/clients.proto b/clients/v1/clients.proto index 2159502..addc72a 100644 --- a/clients/v1/clients.proto +++ b/clients/v1/clients.proto @@ -168,6 +168,10 @@ message IntrospectTokenResponse { } service RolesService { + // RegisterRoleResources registers resources with SAMS. + // + // Only one client can register resources at a time. + // If another client is currently registering resources, this request will return an Aborted error. rpc RegisterRoleResources (stream RegisterRoleResourcesRequest) returns (RegisterRoleResourcesResponse) { option (sams_required_scopes) = "sams::roles.resources::write"; }; diff --git a/clients/v1/clientsv1connect/clients.connect.go b/clients/v1/clientsv1connect/clients.connect.go index 7e34c37..02c2c2d 100644 --- a/clients/v1/clientsv1connect/clients.connect.go +++ b/clients/v1/clientsv1connect/clients.connect.go @@ -421,6 +421,10 @@ func (UnimplementedTokensServiceHandler) IntrospectToken(context.Context, *conne // RolesServiceClient is a client for the clients.v1.RolesService service. type RolesServiceClient interface { + // RegisterRoleResources registers resources with SAMS. + // + // Only one client can register resources at a time. + // If another client is currently registering resources, this request will return an Aborted error. RegisterRoleResources(context.Context) *connect.ClientStreamForClient[v1.RegisterRoleResourcesRequest, v1.RegisterRoleResourcesResponse] } @@ -455,6 +459,10 @@ func (c *rolesServiceClient) RegisterRoleResources(ctx context.Context) *connect // RolesServiceHandler is an implementation of the clients.v1.RolesService service. type RolesServiceHandler interface { + // RegisterRoleResources registers resources with SAMS. + // + // Only one client can register resources at a time. + // If another client is currently registering resources, this request will return an Aborted error. RegisterRoleResources(context.Context, *connect.ClientStream[v1.RegisterRoleResourcesRequest]) (*connect.Response[v1.RegisterRoleResourcesResponse], error) } diff --git a/clientsv1_roles.go b/clientsv1_roles.go index d6b7c3f..8c1f8a6 100644 --- a/clientsv1_roles.go +++ b/clientsv1_roles.go @@ -100,9 +100,12 @@ func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata Reg } resp, err := parseResponseAndError(stream.CloseAndReceive()) - // Stream closed due to another replica registering the same resources. - if errors.Is(err, ErrAborted) { - return 0, nil + if err != nil { + // Stream closed due to another replica registering the same resources. + if errors.Is(err, ErrAborted) { + return 0, nil + } + return 0, err } return resp.Msg.GetResourceCount(), nil } diff --git a/clientv1.go b/clientv1.go index a5fa115..eaf825e 100644 --- a/clientv1.go +++ b/clientv1.go @@ -82,6 +82,9 @@ func parseResponseAndError[T any](resp *connect.Response[T], err error) (*connec return nil, ErrNotFound } + // connect.CodeAborted is returned due to a concurrency conflict. + // e.g. Two clients trying to register role resources at the same time for the same resource type. + // It is safe to retry the request at a later time. if connectErr.Code() == connect.CodeAborted { return nil, ErrAborted } @@ -130,7 +133,10 @@ func (c *ClientV1) Roles() *RolesServiceV1 { var ( ErrNotFound = errors.New("not found") ErrRecordMismatch = errors.New("record mismatch") - ErrAborted = errors.New("aborted") + // ErrAborted is returned due to a concurrency conflict. + // e.g. Two clients trying to perform an operation at the same time for the same resource. + // It is safe to retry the request at a later time. + ErrAborted = errors.New("aborted") ) // ClientCredentialsTokenSource returns a TokenSource that generates an access From 8f90a9f22ad91841bff078b2e9ad7287038caa8a Mon Sep 17 00:00:00 2001 From: James Cotter Date: Mon, 11 Nov 2024 17:54:05 +0000 Subject: [PATCH 5/8] feedback --- clients/v1/clients.proto | 5 +++-- clients/v1/clientsv1connect/clients.connect.go | 10 ++++++---- clientsv1_roles.go | 17 +++++++++++++---- roles/roles.go | 12 ++++++++++++ 4 files changed, 34 insertions(+), 10 deletions(-) diff --git a/clients/v1/clients.proto b/clients/v1/clients.proto index addc72a..4a3f2e4 100644 --- a/clients/v1/clients.proto +++ b/clients/v1/clients.proto @@ -170,8 +170,9 @@ message IntrospectTokenResponse { service RolesService { // RegisterRoleResources registers resources with SAMS. // - // Only one client can register resources at a time. - // If another client is currently registering resources, this request will return an Aborted error. + // Only one client can register resources at a time for a particular resource type. + // If another client is currently registering resources for the same resource type + // this request will return an Aborted error. rpc RegisterRoleResources (stream RegisterRoleResourcesRequest) returns (RegisterRoleResourcesResponse) { option (sams_required_scopes) = "sams::roles.resources::write"; }; diff --git a/clients/v1/clientsv1connect/clients.connect.go b/clients/v1/clientsv1connect/clients.connect.go index 02c2c2d..ccc62ab 100644 --- a/clients/v1/clientsv1connect/clients.connect.go +++ b/clients/v1/clientsv1connect/clients.connect.go @@ -423,8 +423,9 @@ func (UnimplementedTokensServiceHandler) IntrospectToken(context.Context, *conne type RolesServiceClient interface { // RegisterRoleResources registers resources with SAMS. // - // Only one client can register resources at a time. - // If another client is currently registering resources, this request will return an Aborted error. + // Only one client can register resources at a time for a particular resource type. + // If another client is currently registering resources for the same resource type + // this request will return an Aborted error. RegisterRoleResources(context.Context) *connect.ClientStreamForClient[v1.RegisterRoleResourcesRequest, v1.RegisterRoleResourcesResponse] } @@ -461,8 +462,9 @@ func (c *rolesServiceClient) RegisterRoleResources(ctx context.Context) *connect type RolesServiceHandler interface { // RegisterRoleResources registers resources with SAMS. // - // Only one client can register resources at a time. - // If another client is currently registering resources, this request will return an Aborted error. + // Only one client can register resources at a time for a particular resource type. + // If another client is currently registering resources for the same resource type + // this request will return an Aborted error. RegisterRoleResources(context.Context, *connect.ClientStream[v1.RegisterRoleResourcesRequest]) (*connect.Response[v1.RegisterRoleResourcesResponse], error) } diff --git a/clientsv1_roles.go b/clientsv1_roles.go index 8c1f8a6..fe167f2 100644 --- a/clientsv1_roles.go +++ b/clientsv1_roles.go @@ -32,12 +32,11 @@ func (s *RolesServiceV1) newClient(ctx context.Context) clientsv1connect.RolesSe // RegisterResourcesMetadata is the metadata for a set of resources to be registered. type RegisterResourcesMetadata struct { ResourceType roles.ResourceType - Revision uuid.UUID } func (r RegisterResourcesMetadata) validate() error { if !slices.Contains(roles.ResourceTypes(), r.ResourceType) { - return errors.New("invalid resource type") + return errors.Newf("invalid resource type: %q", r.ResourceType) } return nil } @@ -45,7 +44,9 @@ func (r RegisterResourcesMetadata) validate() error { // RegisterRoleResources registers the resources for a given resource type. // `resourcesIterator` is a function that returns a page of resources to register. // The function is invoked repeatedly until it produces an empty slice or an error. -// If another replica is already registering the same resources, the function will return 0 without an error. +// If another replica is already registering resources for the same resource type +// the function will return 0 with ErrAborted. +// ErrAborted means the request is safe to retry at a later time. // // Required scope: sams::roles.resources::write func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata RegisterResourcesMetadata, resourcesIterator func() ([]*clientsv1.RoleResource, error)) (uint64, error) { @@ -54,6 +55,12 @@ func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata Reg return 0, errors.Wrap(err, "invalid metadata") } + /// Generate a new revision for the request metadata. + revision, err := uuid.NewV7() + if err != nil { + return 0, errors.Wrap(err, "failed to generate revision for request metadata") + } + client := s.newClient(ctx) stream := client.RegisterRoleResources(ctx) // Metadata must be submitted first in the stream. @@ -61,7 +68,7 @@ func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata Reg Payload: &clientsv1.RegisterRoleResourcesRequest_Metadata{ Metadata: &clientsv1.RegisterRoleResourcesRequestMetadata{ ResourceType: string(metadata.ResourceType), - Revision: metadata.Revision.String(), + Revision: revision.String(), }, }, }) @@ -81,6 +88,7 @@ func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata Reg return 0, errors.Wrap(err, "failed to get resources") } if len(resources) == 0 { + sendResources = false break } err = stream.Send(&clientsv1.RegisterRoleResourcesRequest{ @@ -93,6 +101,7 @@ func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata Reg if err != nil { // The stream has been closed, so we stop sending resources. if errors.Is(err, io.EOF) { + sendResources = false break } return 0, errors.Wrap(err, "failed to send resources") diff --git a/roles/roles.go b/roles/roles.go index 7329bbc..7985502 100644 --- a/roles/roles.go +++ b/roles/roles.go @@ -23,6 +23,18 @@ func (r Role) Service() services.Service { return services.Service(r[:strings.Index(string(r), "::")]) } +// ResourceType returns the resource type that the role is associated with. +// If the role is not registered, it returns "unknown". +func (r Role) ResourceType() ResourceType { + for _, role := range registeredRoles { + if role.id == r { + return role.resourceType + } + } + + return ResourceType("unknown") +} + // ToStrings converts a list of roles to a list of strings. func ToStrings(roles []Role) []string { ss := make([]string, len(roles)) From 5778544ac976d002bdda7feb4f37c5edeefe827f Mon Sep 17 00:00:00 2001 From: James Cotter Date: Mon, 11 Nov 2024 17:59:10 +0000 Subject: [PATCH 6/8] fix lint --- clientsv1_roles.go | 2 -- 1 file changed, 2 deletions(-) diff --git a/clientsv1_roles.go b/clientsv1_roles.go index fe167f2..6603d1b 100644 --- a/clientsv1_roles.go +++ b/clientsv1_roles.go @@ -88,7 +88,6 @@ func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata Reg return 0, errors.Wrap(err, "failed to get resources") } if len(resources) == 0 { - sendResources = false break } err = stream.Send(&clientsv1.RegisterRoleResourcesRequest{ @@ -101,7 +100,6 @@ func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata Reg if err != nil { // The stream has been closed, so we stop sending resources. if errors.Is(err, io.EOF) { - sendResources = false break } return 0, errors.Wrap(err, "failed to send resources") From 62b925fe96ad089819c02fb75dff7fcdd7428ead Mon Sep 17 00:00:00 2001 From: James Cotter Date: Mon, 11 Nov 2024 18:53:36 +0000 Subject: [PATCH 7/8] rename clientv1 role file --- clientsv1_roles.go => clientv1_roles.go | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename clientsv1_roles.go => clientv1_roles.go (100%) diff --git a/clientsv1_roles.go b/clientv1_roles.go similarity index 100% rename from clientsv1_roles.go rename to clientv1_roles.go From a81c292f7e02a57ec628399d5d2308889411e5b9 Mon Sep 17 00:00:00 2001 From: James Cotter Date: Tue, 12 Nov 2024 11:24:07 +0000 Subject: [PATCH 8/8] invalidate loop condition --- clientv1_roles.go | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/clientv1_roles.go b/clientv1_roles.go index 6603d1b..d485a0d 100644 --- a/clientv1_roles.go +++ b/clientv1_roles.go @@ -88,7 +88,8 @@ func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata Reg return 0, errors.Wrap(err, "failed to get resources") } if len(resources) == 0 { - break + sendResources = false + continue } err = stream.Send(&clientsv1.RegisterRoleResourcesRequest{ Payload: &clientsv1.RegisterRoleResourcesRequest_Resources_{ @@ -100,7 +101,8 @@ func (s *RolesServiceV1) RegisterRoleResources(ctx context.Context, metadata Reg if err != nil { // The stream has been closed, so we stop sending resources. if errors.Is(err, io.EOF) { - break + sendResources = false + continue } return 0, errors.Wrap(err, "failed to send resources") }