Skip to content

Commit

Permalink
broker/fragment: region support for s3 stores (#365)
Browse files Browse the repository at this point in the history
Previously s3 stores could only use the region available via a given profile or the AWS application
default credentials. This adds the ability to set an alternate region in the endpoint URL for an s3
fragment store for buckets that are in a different region.
  • Loading branch information
williamhbaker authored Mar 11, 2024
1 parent ede1dcc commit 188c28b
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 4 deletions.
13 changes: 10 additions & 3 deletions broker/fragment/store_s3.go
Original file line number Diff line number Diff line change
Expand Up @@ -44,16 +44,19 @@ type S3StoreConfig struct {
// SSEKMSKeyId specifies the ID for the AWS KMS symmetric customer managed key
// By default, not used.
SSEKMSKeyId string
// Region is the region for the bucket. If empty, the region is determined
// from `Profile` or the default credentials.
Region string
}

type s3Backend struct {
clients map[[2]string]*s3.S3
clients map[[3]string]*s3.S3
clientsMu sync.Mutex
}

func newS3Backend() *s3Backend {
return &s3Backend{
clients: make(map[[2]string]*s3.S3),
clients: make(map[[3]string]*s3.S3),
}
}

Expand Down Expand Up @@ -198,14 +201,18 @@ func (s *s3Backend) s3Client(ep *url.URL) (cfg S3StoreConfig, client *s3.S3, err
defer s.clientsMu.Unlock()
s.clientsMu.Lock()

var key = [2]string{cfg.Endpoint, cfg.Profile}
var key = [3]string{cfg.Endpoint, cfg.Profile, cfg.Region}
if client = s.clients[key]; client != nil {
return
}

var awsConfig = aws.NewConfig()
awsConfig.WithCredentialsChainVerboseErrors(true)

if cfg.Region != "" {
awsConfig.WithRegion(cfg.Region)
}

if cfg.Endpoint != "" {
awsConfig.WithEndpoint(cfg.Endpoint)
// We must force path style because bucket-named virtual hosts
Expand Down
3 changes: 2 additions & 1 deletion broker/fragment/stores_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,14 +126,15 @@ func TestStoreInteractions(t *testing.T) {
}

func TestParseStoreArgsS3(t *testing.T) {
storeURL, _ := url.Parse("s3://bucket/prefix/?endpoint=https://s3.region.amazonaws.com&SSE=kms&SSEKMSKeyId=123")
storeURL, _ := url.Parse("s3://bucket/prefix/?endpoint=https://s3.region.amazonaws.com&SSE=kms&SSEKMSKeyId=123&region=some-region")
var s3Cfg S3StoreConfig
parseStoreArgs(storeURL, &s3Cfg)
require.Equal(t, "bucket", storeURL.Host)
require.Equal(t, "prefix/", storeURL.Path[1:])
require.Equal(t, "https://s3.region.amazonaws.com", s3Cfg.Endpoint)
require.Equal(t, "kms", s3Cfg.SSE)
require.Equal(t, "123", s3Cfg.SSEKMSKeyId)
require.Equal(t, "some-region", s3Cfg.Region)
}

func readFrag(t *testing.T, f pb.Fragment) string {
Expand Down

0 comments on commit 188c28b

Please sign in to comment.