From e636adf330c673da77836c10c97cafdc8f5200c9 Mon Sep 17 00:00:00 2001 From: Matthew Barnes Date: Fri, 31 Jan 2025 11:35:19 -0500 Subject: [PATCH] database: Merge Cosmos DB containers Discard the 'Operations' and 'Subscriptions' containers. This data now lives in the 'Resources' container. Doing this enables the use of transactional batch operations when multiple Cosmos DB items of different types need to be created or updated together. --- backend/operations_scanner.go | 2 +- dev-infrastructure/modules/rp-cosmos.bicep | 9 +--- internal/database/database.go | 59 +++++++--------------- internal/mocks/dbclient.go | 8 +-- 4 files changed, 23 insertions(+), 55 deletions(-) diff --git a/backend/operations_scanner.go b/backend/operations_scanner.go index db7dddf2dd..0042e566de 100644 --- a/backend/operations_scanner.go +++ b/backend/operations_scanner.go @@ -211,7 +211,7 @@ func (s *OperationsScanner) processOperations(ctx context.Context, subscriptionI pk := database.NewPartitionKey(subscriptionID) - iterator := s.dbClient.ListOperationDocs(subscriptionID) + iterator := s.dbClient.ListOperationDocs(pk) for operationID, operationDoc := range iterator.Items(ctx) { if !operationDoc.Status.IsTerminal() { diff --git a/dev-infrastructure/modules/rp-cosmos.bicep b/dev-infrastructure/modules/rp-cosmos.bicep index cbe0ca69bb..c80d646760 100644 --- a/dev-infrastructure/modules/rp-cosmos.bicep +++ b/dev-infrastructure/modules/rp-cosmos.bicep @@ -12,16 +12,9 @@ param private bool // Local Params var containers = [ - { - name: 'Subscriptions' - partitionKeyPaths: ['/id'] - } - { - name: 'Operations' - defaultTtl: 604800 // 7 days - } { name: 'Resources' + defaultTtl: -1 // enable ttl on items } { name: 'PartitionKeys' diff --git a/internal/database/database.go b/internal/database/database.go index 0652fcf0ae..afbd992f81 100644 --- a/internal/database/database.go +++ b/internal/database/database.go @@ -22,25 +22,10 @@ import ( const ( billingContainer = "Billing" locksContainer = "Locks" - operationsContainer = "Operations" resourcesContainer = "Resources" - subscriptionsContainer = "Subscriptions" partitionKeysContainer = "PartitionKeys" - // XXX The azcosmos SDK currently only supports single-partition queries, - // so there's no way to list all items in a container unless you know - // all the partition keys. The backend needs to list all items in the - // Operations container so to work around this limitation we keep all - // items in a single partition with a well-known name: "workaround". - // - // Once [1] is fixed we could transition the Operations container to - // using subscription IDs as the partition key like other containers. - // The items are transient thanks to the container's default TTL, so - // GetOperationDoc would just need temporary fallback logic to check - // the "workaround" partition. - // - // [1] https://github.com/Azure/azure-sdk-for-go/issues/18578 - operationsPartitionKey = "workaround" + operationTimeToLive = 604800 // 7 days ) var ErrNotFound = errors.New("not found") @@ -85,7 +70,7 @@ type DBClient interface { GetOperationDoc(ctx context.Context, pk azcosmos.PartitionKey, operationID string) (*OperationDocument, error) CreateOperationDoc(ctx context.Context, doc *OperationDocument) (string, error) UpdateOperationDoc(ctx context.Context, pk azcosmos.PartitionKey, operationID string, callback func(*OperationDocument) bool) (bool, error) - ListOperationDocs(subscriptionID string) DBClientIterator[OperationDocument] + ListOperationDocs(pk azcosmos.PartitionKey) DBClientIterator[OperationDocument] // GetSubscriptionDoc retrieves a subscription from the database given the subscriptionID. // ErrNotFound is returned if an associated subscription cannot be found. @@ -101,8 +86,6 @@ var _ DBClient = &cosmosDBClient{} type cosmosDBClient struct { database *azcosmos.DatabaseClient resources *azcosmos.ContainerClient - operations *azcosmos.ContainerClient - subscriptions *azcosmos.ContainerClient partitionKeys *azcosmos.ContainerClient lockClient *LockClient } @@ -113,8 +96,6 @@ func NewDBClient(ctx context.Context, database *azcosmos.DatabaseClient) (DBClie // NewContainer only fails if the container ID argument is // empty, so we can safely disregard the error return value. resources, _ := database.NewContainer(resourcesContainer) - operations, _ := database.NewContainer(operationsContainer) - subscriptions, _ := database.NewContainer(subscriptionsContainer) partitionKeys, _ := database.NewContainer(partitionKeysContainer) locks, _ := database.NewContainer(locksContainer) @@ -126,8 +107,6 @@ func NewDBClient(ctx context.Context, database *azcosmos.DatabaseClient) (DBClie return &cosmosDBClient{ database: database, resources: resources, - operations: operations, - subscriptions: subscriptions, partitionKeys: partitionKeys, lockClient: lockClient, }, nil @@ -327,13 +306,11 @@ func (d *cosmosDBClient) ListResourceDocs(prefix *azcorearm.ResourceID, maxItems } } -func (d *cosmosDBClient) getOperationDoc(ctx context.Context, pk azcosmos.PartitionKey, operationID string) (*typedDocument[OperationDocument], *OperationDocument, error) { //nolint:staticcheck +func (d *cosmosDBClient) getOperationDoc(ctx context.Context, pk azcosmos.PartitionKey, operationID string) (*typedDocument[OperationDocument], *OperationDocument, error) { // Make sure lookup keys are lowercase. operationID = strings.ToLower(operationID) - pk = NewPartitionKey(operationsPartitionKey) //nolint:staticcheck - - response, err := d.operations.ReadItem(ctx, pk, operationID, nil) + response, err := d.resources.ReadItem(ctx, pk, operationID, nil) if err != nil { if isResponseError(err, http.StatusNotFound) { err = ErrNotFound @@ -359,14 +336,18 @@ func (d *cosmosDBClient) GetOperationDoc(ctx context.Context, pk azcosmos.Partit // CreateOperationDoc writes an asynchronous operation document to the "operations" // container func (d *cosmosDBClient) CreateOperationDoc(ctx context.Context, doc *OperationDocument) (string, error) { - typedDoc := newTypedDocument[OperationDocument](operationsPartitionKey, OperationResourceType) + // Make sure partition key is lowercase. + subscriptionID := strings.ToLower(doc.ExternalID.SubscriptionID) + + typedDoc := newTypedDocument[OperationDocument](subscriptionID, OperationResourceType) + typedDoc.TimeToLive = operationTimeToLive data, err := typedDocumentMarshal(typedDoc, doc) if err != nil { return "", fmt.Errorf("failed to marshal Operations container item for '%s': %w", typedDoc.ID, err) } - _, err = d.operations.CreateItem(ctx, typedDoc.getPartitionKey(), data, nil) + _, err = d.resources.CreateItem(ctx, typedDoc.getPartitionKey(), data, nil) if err != nil { return "", fmt.Errorf("failed to create Operations container item for '%s': %w", typedDoc.ID, err) } @@ -407,7 +388,7 @@ func (d *cosmosDBClient) UpdateOperationDoc(ctx context.Context, pk azcosmos.Par } options.IfMatchEtag = &typedDoc.CosmosETag - _, err = d.operations.ReplaceItem(ctx, typedDoc.getPartitionKey(), typedDoc.ID, data, options) + _, err = d.resources.ReplaceItem(ctx, pk, typedDoc.ID, data, options) if err == nil { return true, nil } @@ -422,24 +403,18 @@ func (d *cosmosDBClient) UpdateOperationDoc(ctx context.Context, pk azcosmos.Par return false, err } -func (d *cosmosDBClient) ListOperationDocs(subscriptionID string) DBClientIterator[OperationDocument] { - pk := NewPartitionKey(operationsPartitionKey) - - query := "SELECT * FROM c WHERE STRINGEQUALS(c.resourceType, @resourceType, true) AND STARTSWITH(c.externalId, @prefix, true)" +func (d *cosmosDBClient) ListOperationDocs(pk azcosmos.PartitionKey) DBClientIterator[OperationDocument] { + query := "SELECT * FROM c WHERE STRINGEQUALS(c.resourceType, @resourceType, true)" opt := azcosmos.QueryOptions{ QueryParameters: []azcosmos.QueryParameter{ { Name: "@resourceType", Value: OperationResourceType.String(), }, - { - Name: "@prefix", - Value: "/subscriptions/" + strings.ToLower(subscriptionID), - }, }, } - pager := d.operations.NewQueryItemsPager(query, pk, &opt) + pager := d.resources.NewQueryItemsPager(query, pk, &opt) return newQueryItemsIterator[OperationDocument](pager) } @@ -450,7 +425,7 @@ func (d *cosmosDBClient) getSubscriptionDoc(ctx context.Context, subscriptionID pk := NewPartitionKey(subscriptionID) - response, err := d.subscriptions.ReadItem(ctx, pk, subscriptionID, nil) + response, err := d.resources.ReadItem(ctx, pk, subscriptionID, nil) if err != nil { if isResponseError(err, http.StatusNotFound) { err = ErrNotFound @@ -482,7 +457,7 @@ func (d *cosmosDBClient) CreateSubscriptionDoc(ctx context.Context, subscription return fmt.Errorf("failed to marshal Subscriptions container item for '%s': %w", subscriptionID, err) } - _, err = d.subscriptions.CreateItem(ctx, typedDoc.getPartitionKey(), data, nil) + _, err = d.resources.CreateItem(ctx, typedDoc.getPartitionKey(), data, nil) if err != nil { return fmt.Errorf("failed to create Subscriptions container item for '%s': %w", subscriptionID, err) } @@ -530,7 +505,7 @@ func (d *cosmosDBClient) UpdateSubscriptionDoc(ctx context.Context, subscription } options.IfMatchEtag = &typedDoc.CosmosETag - _, err = d.subscriptions.ReplaceItem(ctx, typedDoc.getPartitionKey(), typedDoc.ID, data, options) + _, err = d.resources.ReplaceItem(ctx, typedDoc.getPartitionKey(), typedDoc.ID, data, options) if err == nil { return true, nil } diff --git a/internal/mocks/dbclient.go b/internal/mocks/dbclient.go index b234501d93..c0c0f640e7 100644 --- a/internal/mocks/dbclient.go +++ b/internal/mocks/dbclient.go @@ -255,17 +255,17 @@ func (mr *MockDBClientMockRecorder) ListAllSubscriptionDocs() *gomock.Call { } // ListOperationDocs mocks base method. -func (m *MockDBClient) ListOperationDocs(subscriptionID string) database.DBClientIterator[database.OperationDocument] { +func (m *MockDBClient) ListOperationDocs(pk azcosmos.PartitionKey) database.DBClientIterator[database.OperationDocument] { m.ctrl.T.Helper() - ret := m.ctrl.Call(m, "ListOperationDocs", subscriptionID) + ret := m.ctrl.Call(m, "ListOperationDocs", pk) ret0, _ := ret[0].(database.DBClientIterator[database.OperationDocument]) return ret0 } // ListOperationDocs indicates an expected call of ListOperationDocs. -func (mr *MockDBClientMockRecorder) ListOperationDocs(subscriptionID any) *gomock.Call { +func (mr *MockDBClientMockRecorder) ListOperationDocs(pk any) *gomock.Call { mr.mock.ctrl.T.Helper() - return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListOperationDocs", reflect.TypeOf((*MockDBClient)(nil).ListOperationDocs), subscriptionID) + return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "ListOperationDocs", reflect.TypeOf((*MockDBClient)(nil).ListOperationDocs), pk) } // ListResourceDocs mocks base method.