Skip to content

Commit

Permalink
database: Merge Cosmos DB containers
Browse files Browse the repository at this point in the history
Discard the 'Operations' and 'Subscriptions' containers. This data
now lives in the 'Resources' container.

Doing this enables the use of transactional batch operations when
multiple Cosmos DB items of different types need to be created or
updated together.
  • Loading branch information
Matthew Barnes committed Feb 17, 2025
1 parent b61a9e0 commit cf8106f
Show file tree
Hide file tree
Showing 5 changed files with 39 additions and 73 deletions.
2 changes: 1 addition & 1 deletion backend/operations_scanner.go
Original file line number Diff line number Diff line change
Expand Up @@ -213,7 +213,7 @@ func (s *OperationsScanner) processOperations(ctx context.Context, subscriptionI

pk := database.NewPartitionKey(subscriptionID)

iterator := s.dbClient.ListOperationDocs(subscriptionID)
iterator := s.dbClient.ListOperationDocs(pk)

for operationID, operationDoc := range iterator.Items(ctx) {
if !operationDoc.Status.IsTerminal() {
Expand Down
9 changes: 1 addition & 8 deletions dev-infrastructure/modules/rp-cosmos.bicep
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,9 @@ param private bool

// Local Params
var containers = [
{
name: 'Subscriptions'
partitionKeyPaths: ['/id']
}
{
name: 'Operations'
defaultTtl: 604800 // 7 days
}
{
name: 'Resources'
defaultTtl: -1 // enable ttl on items
}
{
name: 'PartitionKeys'
Expand Down
34 changes: 16 additions & 18 deletions frontend/pkg/metrics/metrics_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package metrics

// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.

import (
"bytes"
"errors"
"io"
"log/slog"
"slices"
"maps"
"testing"
"time"

Expand All @@ -20,19 +23,14 @@ import (
"github.com/Azure/ARO-HCP/internal/mocks"
)

// Copyright (c) Microsoft Corporation.
// Licensed under the Apache License 2.0.

func TestSubscriptionCollector(t *testing.T) {
logger := slog.New(slog.NewTextHandler(io.Discard, nil))
nosubs := slices.Values([]*database.SubscriptionDocument{})
subs := slices.Values([]*database.SubscriptionDocument{
database.NewSubscriptionDocument(
"00000000-0000-0000-0000-000000000000",
&arm.Subscription{
State: arm.SubscriptionStateRegistered,
RegistrationDate: api.Ptr(time.Now().String()),
}),
nosubs := maps.All(map[string]*arm.Subscription{})
subs := maps.All(map[string]*arm.Subscription{
"00000000-0000-0000-0000-000000000000": &arm.Subscription{
State: arm.SubscriptionStateRegistered,
RegistrationDate: api.Ptr(time.Now().String()),
},
})
ctrl := gomock.NewController(t)

Expand All @@ -42,10 +40,10 @@ func TestSubscriptionCollector(t *testing.T) {
collector := NewSubscriptionCollector(r, mockDBClient, "test")

t.Run("no subscription", func(t *testing.T) {
mockIter := mocks.NewMockDBClientIterator[database.SubscriptionDocument](ctrl)
mockIter := mocks.NewMockDBClientIterator[arm.Subscription](ctrl)
mockIter.EXPECT().
Items(gomock.Any()).
Return(database.DBClientIteratorItem[database.SubscriptionDocument](nosubs))
Return(database.DBClientIteratorItem[arm.Subscription](nosubs))
mockIter.EXPECT().
GetError().
Return(nil)
Expand All @@ -69,10 +67,10 @@ frontend_subscription_collector_last_sync 1
})

t.Run("db error", func(t *testing.T) {
mockIter := mocks.NewMockDBClientIterator[database.SubscriptionDocument](ctrl)
mockIter := mocks.NewMockDBClientIterator[arm.Subscription](ctrl)
mockIter.EXPECT().
Items(gomock.Any()).
Return(database.DBClientIteratorItem[database.SubscriptionDocument](nosubs))
Return(database.DBClientIteratorItem[arm.Subscription](nosubs))
mockIter.EXPECT().
GetError().
Return(errors.New("db error"))
Expand All @@ -96,10 +94,10 @@ frontend_subscription_collector_last_sync 0
})

t.Run("refresh with 1 subscription", func(t *testing.T) {
mockIter := mocks.NewMockDBClientIterator[database.SubscriptionDocument](ctrl)
mockIter := mocks.NewMockDBClientIterator[arm.Subscription](ctrl)
mockIter.EXPECT().
Items(gomock.Any()).
Return(database.DBClientIteratorItem[database.SubscriptionDocument](subs))
Return(database.DBClientIteratorItem[arm.Subscription](subs))
mockIter.EXPECT().
GetError().
Return(nil)
Expand Down
59 changes: 17 additions & 42 deletions internal/database/database.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,25 +22,10 @@ import (
const (
billingContainer = "Billing"
locksContainer = "Locks"
operationsContainer = "Operations"
resourcesContainer = "Resources"
subscriptionsContainer = "Subscriptions"
partitionKeysContainer = "PartitionKeys"

// XXX The azcosmos SDK currently only supports single-partition queries,
// so there's no way to list all items in a container unless you know
// all the partition keys. The backend needs to list all items in the
// Operations container so to work around this limitation we keep all
// items in a single partition with a well-known name: "workaround".
//
// Once [1] is fixed we could transition the Operations container to
// using subscription IDs as the partition key like other containers.
// The items are transient thanks to the container's default TTL, so
// GetOperationDoc would just need temporary fallback logic to check
// the "workaround" partition.
//
// [1] https://github.com/Azure/azure-sdk-for-go/issues/18578
operationsPartitionKey = "workaround"
operationTimeToLive = 604800 // 7 days
)

var ErrNotFound = errors.New("not found")
Expand Down Expand Up @@ -85,7 +70,7 @@ type DBClient interface {
GetOperationDoc(ctx context.Context, pk azcosmos.PartitionKey, operationID string) (*OperationDocument, error)
CreateOperationDoc(ctx context.Context, doc *OperationDocument) (string, error)
UpdateOperationDoc(ctx context.Context, pk azcosmos.PartitionKey, operationID string, callback func(*OperationDocument) bool) (bool, error)
ListOperationDocs(subscriptionID string) DBClientIterator[OperationDocument]
ListOperationDocs(pk azcosmos.PartitionKey) DBClientIterator[OperationDocument]

// GetSubscriptionDoc retrieves a subscription from the database given the subscriptionID.
// ErrNotFound is returned if an associated subscription cannot be found.
Expand All @@ -101,8 +86,6 @@ var _ DBClient = &cosmosDBClient{}
type cosmosDBClient struct {
database *azcosmos.DatabaseClient
resources *azcosmos.ContainerClient
operations *azcosmos.ContainerClient
subscriptions *azcosmos.ContainerClient
partitionKeys *azcosmos.ContainerClient
lockClient *LockClient
}
Expand All @@ -113,8 +96,6 @@ func NewDBClient(ctx context.Context, database *azcosmos.DatabaseClient) (DBClie
// NewContainer only fails if the container ID argument is
// empty, so we can safely disregard the error return value.
resources, _ := database.NewContainer(resourcesContainer)
operations, _ := database.NewContainer(operationsContainer)
subscriptions, _ := database.NewContainer(subscriptionsContainer)
partitionKeys, _ := database.NewContainer(partitionKeysContainer)
locks, _ := database.NewContainer(locksContainer)

Expand All @@ -126,8 +107,6 @@ func NewDBClient(ctx context.Context, database *azcosmos.DatabaseClient) (DBClie
return &cosmosDBClient{
database: database,
resources: resources,
operations: operations,
subscriptions: subscriptions,
partitionKeys: partitionKeys,
lockClient: lockClient,
}, nil
Expand Down Expand Up @@ -327,13 +306,11 @@ func (d *cosmosDBClient) ListResourceDocs(prefix *azcorearm.ResourceID, maxItems
}
}

func (d *cosmosDBClient) getOperationDoc(ctx context.Context, pk azcosmos.PartitionKey, operationID string) (*typedDocument, *OperationDocument, error) { //nolint:staticcheck
func (d *cosmosDBClient) getOperationDoc(ctx context.Context, pk azcosmos.PartitionKey, operationID string) (*typedDocument, *OperationDocument, error) {
// Make sure lookup keys are lowercase.
operationID = strings.ToLower(operationID)

pk = NewPartitionKey(operationsPartitionKey) //nolint:staticcheck

response, err := d.operations.ReadItem(ctx, pk, operationID, nil)
response, err := d.resources.ReadItem(ctx, pk, operationID, nil)
if err != nil {
if isResponseError(err, http.StatusNotFound) {
err = ErrNotFound
Expand All @@ -359,14 +336,18 @@ func (d *cosmosDBClient) GetOperationDoc(ctx context.Context, pk azcosmos.Partit
// CreateOperationDoc writes an asynchronous operation document to the "operations"
// container
func (d *cosmosDBClient) CreateOperationDoc(ctx context.Context, doc *OperationDocument) (string, error) {
typedDoc := newTypedDocument(operationsPartitionKey, OperationResourceType)
// Make sure partition key is lowercase.
subscriptionID := strings.ToLower(doc.ExternalID.SubscriptionID)

typedDoc := newTypedDocument(subscriptionID, OperationResourceType)
typedDoc.TimeToLive = operationTimeToLive

data, err := typedDocumentMarshal(typedDoc, doc)
if err != nil {
return "", fmt.Errorf("failed to marshal Operations container item for '%s': %w", typedDoc.ID, err)
}

_, err = d.operations.CreateItem(ctx, typedDoc.getPartitionKey(), data, nil)
_, err = d.resources.CreateItem(ctx, typedDoc.getPartitionKey(), data, nil)
if err != nil {
return "", fmt.Errorf("failed to create Operations container item for '%s': %w", typedDoc.ID, err)
}
Expand Down Expand Up @@ -407,7 +388,7 @@ func (d *cosmosDBClient) UpdateOperationDoc(ctx context.Context, pk azcosmos.Par
}

options.IfMatchEtag = &typedDoc.CosmosETag
_, err = d.operations.ReplaceItem(ctx, typedDoc.getPartitionKey(), typedDoc.ID, data, options)
_, err = d.resources.ReplaceItem(ctx, pk, typedDoc.ID, data, options)
if err == nil {
return true, nil
}
Expand All @@ -422,24 +403,18 @@ func (d *cosmosDBClient) UpdateOperationDoc(ctx context.Context, pk azcosmos.Par
return false, err
}

func (d *cosmosDBClient) ListOperationDocs(subscriptionID string) DBClientIterator[OperationDocument] {
pk := azcosmos.NewPartitionKeyString(operationsPartitionKey)

query := "SELECT * FROM c WHERE STRINGEQUALS(c.resourceType, @resourceType, true) AND STARTSWITH(c.externalId, @prefix, true)"
func (d *cosmosDBClient) ListOperationDocs(pk azcosmos.PartitionKey) DBClientIterator[OperationDocument] {
query := "SELECT * FROM c WHERE STRINGEQUALS(c.resourceType, @resourceType, true)"
opt := azcosmos.QueryOptions{
QueryParameters: []azcosmos.QueryParameter{
{
Name: "@resourceType",
Value: OperationResourceType.String(),
},
{
Name: "@prefix",
Value: "/subscriptions/" + strings.ToLower(subscriptionID),
},
},
}

pager := d.operations.NewQueryItemsPager(query, pk, &opt)
pager := d.resources.NewQueryItemsPager(query, pk, &opt)

return newQueryItemsIterator[OperationDocument](pager)
}
Expand All @@ -450,7 +425,7 @@ func (d *cosmosDBClient) getSubscriptionDoc(ctx context.Context, subscriptionID

pk := NewPartitionKey(subscriptionID)

response, err := d.subscriptions.ReadItem(ctx, pk, subscriptionID, nil)
response, err := d.resources.ReadItem(ctx, pk, subscriptionID, nil)
if err != nil {
if isResponseError(err, http.StatusNotFound) {
err = ErrNotFound
Expand Down Expand Up @@ -485,7 +460,7 @@ func (d *cosmosDBClient) CreateSubscriptionDoc(ctx context.Context, subscription
return fmt.Errorf("failed to marshal Subscriptions container item for '%s': %w", subscriptionID, err)
}

_, err = d.subscriptions.CreateItem(ctx, typedDoc.getPartitionKey(), data, nil)
_, err = d.resources.CreateItem(ctx, typedDoc.getPartitionKey(), data, nil)
if err != nil {
return fmt.Errorf("failed to create Subscriptions container item for '%s': %w", subscriptionID, err)
}
Expand Down Expand Up @@ -533,7 +508,7 @@ func (d *cosmosDBClient) UpdateSubscriptionDoc(ctx context.Context, subscription
}

options.IfMatchEtag = &typedDoc.CosmosETag
_, err = d.subscriptions.ReplaceItem(ctx, typedDoc.getPartitionKey(), typedDoc.ID, data, options)
_, err = d.resources.ReplaceItem(ctx, typedDoc.getPartitionKey(), typedDoc.ID, data, options)
if err == nil {
return true, nil
}
Expand Down
8 changes: 4 additions & 4 deletions internal/mocks/dbclient.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

0 comments on commit cf8106f

Please sign in to comment.