diff --git a/nmdc_runtime/site/ops.py b/nmdc_runtime/site/ops.py
index 66d5d2d0..f819430d 100644
--- a/nmdc_runtime/site/ops.py
+++ b/nmdc_runtime/site/ops.py
@@ -7,6 +7,7 @@
 from collections import defaultdict
 from datetime import datetime, timezone
 from io import BytesIO, StringIO
+from toolz.dicttoolz import keyfilter
 from typing import Tuple
 from zipfile import ZipFile
 from itertools import chain
@@ -93,17 +94,20 @@
 from nmdc_runtime.site.util import run_and_log, schema_collection_has_index_on_id
 from nmdc_runtime.util import (
     drs_object_in_for,
+    get_names_of_classes_in_effective_range_of_slot,
     pluralize,
     put_object,
     validate_json,
     specialize_activity_set_docs,
     collection_name_to_class_names,
     class_hierarchy_as_list,
+    nmdc_schema_view,
     populated_schema_collection_names_with_id_field,
 )
 from nmdc_schema import nmdc
 from nmdc_schema.nmdc import Database as NMDCDatabase
 from pydantic import BaseModel
+from pymongo import InsertOne
 from pymongo.database import Database as MongoDatabase
 from starlette import status
 from toolz import assoc, dissoc, get_in, valfilter, identity
@@ -1037,23 +1041,51 @@ def site_code_mapping() -> dict:
 
 
 @op(required_resource_keys={"mongo"})
 def materialize_alldocs(context) -> int:
+    """
+    This function re-creates the `alldocs` collection to reflect the current state of the Mongo database.
+    See nmdc-runtime/docs/nb/bulk_validation_referential_integrity_check.ipynb for more details.
+    """
     mdb = context.resources.mongo.db
-    collection_names = populated_schema_collection_names_with_id_field(mdb)
+    schema_view = nmdc_schema_view()
 
-    # Insert a no-op as an anchor point for this comment.
-    #
-    # Note: There used to be code here that `assert`-ed that each collection could only contain documents of a single
-    #       type. With the legacy schema, that assertion was true. With the Berkeley schema, it is false. That code was
-    #       in place because subsequent code (further below) used a single document in a collection as the source of the
-    #       class ancestry information of _all_ documents in that collection; an optimization that spared us from
-    #       having to do the same for every single document in that collection. With the Berkeley schema, we have
-    #       eliminated that optimization (since it is inadequate; it would produce some incorrect class ancestries
-    #       for descendants of `PlannedProcess`, for example).
-    #
-    pass
+    # Batch size for writing documents to `alldocs`.
+    BULK_WRITE_BATCH_SIZE = 2000
 
+    # TODO: Include `functional_annotation_agg` for "real-time" referential integrity checking.
+    #       For now, production use cases for the materialized `alldocs` are limited to `id`-having collections.
+    collection_names = populated_schema_collection_names_with_id_field(mdb)
     context.log.info(f"{collection_names=}")
 
+    # Build alldocs
+    context.log.info("constructing `alldocs` collection")
+
+    document_class_names = set(
+        chain.from_iterable(collection_name_to_class_names.values())
+    )
+
+    cls_slot_map = {
+        cls_name: {
+            slot.name: slot for slot in schema_view.class_induced_slots(cls_name)
+        }
+        for cls_name in document_class_names
+    }
+
+    # Any ancestor of a document class is a document-referenceable range, i.e., a valid range of a document-reference-ranged slot.
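+    # For example (illustrative): `Study` is a document class, so a slot whose effective range is
+    # `Study`, or an ancestor of `Study` such as `NamedThing`, is a document-reference-ranged slot;
+    # such slots are collected, per class, in `document_reference_ranged_slots` below.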
+    document_referenceable_ranges = set(
+        chain.from_iterable(
+            schema_view.class_ancestors(cls_name) for cls_name in document_class_names
+        )
+    )
+
+    document_reference_ranged_slots = defaultdict(list)
+    for cls_name, slot_map in cls_slot_map.items():
+        for slot_name, slot in slot_map.items():
+            if (
+                set(get_names_of_classes_in_effective_range_of_slot(schema_view, slot))
+                & document_referenceable_ranges
+            ):
+                document_reference_ranged_slots[cls_name].append(slot_name)
+
     # Drop any existing `alldocs` collection (e.g. from previous use of this op).
     #
     # FIXME: This "nuke and pave" approach introduces a race condition.
     #
@@ -1062,90 +1094,41 @@ def materialize_alldocs(context) -> int:
     #
     mdb.alldocs.drop()
 
-    # Build alldocs
-    context.log.info("constructing `alldocs` collection")
-
-    # For each collection, group its documents by their `type` value, transform them, and load them into `alldocs`.
-    for collection_name in collection_names:
-        context.log.info(
-            f"Found {mdb[collection_name].estimated_document_count()} estimated documents for {collection_name=}."
-        )
-
-        # Process all the distinct `type` values (i.e. value in the `type` field) of the documents in this collection.
-        #
-        # References:
-        # - https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.distinct
-        #
-        distinct_type_values = mdb[collection_name].distinct(key="type")
+    for coll_name in collection_names:
+        context.log.info(f"{coll_name=}")
+        requests = []
+        documents_processed_counter = 0
+        for doc in mdb[coll_name].find():
+            doc_type = doc["type"][5:]  # lop off "nmdc:" prefix
+            slots_to_include = ["id", "type"] + document_reference_ranged_slots[
+                doc_type
+            ]
+            new_doc = keyfilter(lambda slot: slot in slots_to_include, doc)
+            new_doc["_type_and_ancestors"] = schema_view.class_ancestors(doc_type)
+            requests.append(InsertOne(new_doc))
+            if len(requests) == BULK_WRITE_BATCH_SIZE:
+                _ = mdb.alldocs.bulk_write(requests, ordered=False)
+                requests.clear()
+                documents_processed_counter += BULK_WRITE_BATCH_SIZE
+        if len(requests) > 0:
+            _ = mdb.alldocs.bulk_write(requests, ordered=False)
+            documents_processed_counter += len(requests)
         context.log.info(
-            f"Found {len(distinct_type_values)} distinct `type` values in {collection_name=}: {distinct_type_values=}"
+            f"Inserted {documents_processed_counter} documents from {coll_name=} "
         )
 
-        for type_value in distinct_type_values:
-
-            # Process all the documents in this collection that have this value in their `type` field.
-            #
-            # References:
-            # - https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.count_documents
-            # - https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.find
-            #
-            filter_ = {"type": type_value}
-            num_docs_having_type = mdb[collection_name].count_documents(filter=filter_)
-            docs_having_type = mdb[collection_name].find(filter=filter_)
-            context.log.info(
-                f"Found {num_docs_having_type} documents having {type_value=} in {collection_name=}."
-            )
-            # Get a "representative" document from the result.
-            #
-            # Note: Since all of the documents in this batch have the same class ancestry, we will save time by
-            #       determining the class ancestry of only _one_ of them (we call this the "representative") and then
-            #       (later) attributing that class ancestry to all of them.
-            #
-            representative_doc = next(docs_having_type)
-
-            # Instantiate the Python class represented by the "representative" document.
-            db_dict = {
-                # Shed the `_id` attribute, since the constructor doesn't allow it.
-                collection_name: [dissoc(representative_doc, "_id")]
-            }
-            nmdc_db = NMDCDatabase(**db_dict)
-            representative_instance = getattr(nmdc_db, collection_name)[0]
-
-            # Get the class ancestry of that instance, as a list of class names (including its own class name).
-            ancestor_class_names = class_hierarchy_as_list(representative_instance)
-
-            # Store the documents belonging to this group, in the `alldocs` collection, setting their `type` field
-            # to the list of class names obtained from the "representative" document above.
-            #
-            # TODO: Document why clobbering the existing contents of the `type` field is OK.
-            #
-            # Note: The reason we `chain()` our "representative" document (in an iterable) with the `docs_having_type`
-            #       iterator here is that, when we called `next(docs_having_type)` above, we "consumed" our
-            #       "representative" document from that iterator. We use `chain()` here so that that document gets
-            #       inserted alongside its cousins (i.e. the documents _still_ accessible via `docs_having_type`).
-            #       Reference: https://docs.python.org/3/library/itertools.html#itertools.chain
-            #
-            inserted_many_result = mdb.alldocs.insert_many(
-                [
-                    assoc(dissoc(doc, "type", "_id"), "type", ancestor_class_names)
-                    for doc in chain([representative_doc], docs_having_type)
-                ]
-            )
-            context.log.info(
-                f"Inserted {len(inserted_many_result.inserted_ids)} documents from {collection_name=} "
-                f"originally having {type_value=}."
-            )
+    context.log.info(
+        f"refreshed {mdb.alldocs} collection with {mdb.alldocs.estimated_document_count()} docs."
+    )
 
     # Re-idx for `alldocs` collection
     mdb.alldocs.create_index("id", unique=True)
     # The indexes were added to improve the performance of the
     # /data_objects/study/{study_id} endpoint
-    mdb.alldocs.create_index("has_input")
-    mdb.alldocs.create_index("has_output")
-    mdb.alldocs.create_index("was_informed_by")
-    context.log.info(
-        f"refreshed {mdb.alldocs} collection with {mdb.alldocs.estimated_document_count()} docs."
-    )
+    slots_to_index = ["has_input", "has_output", "was_informed_by"]
+    for slot in slots_to_index:
+        mdb.alldocs.create_index(slot)
+
+    context.log.info(f"created indexes on id, {slots_to_index}.")
 
     return mdb.alldocs.estimated_document_count()
diff --git a/nmdc_runtime/util.py b/nmdc_runtime/util.py
index eee078f9..d4ab65c2 100644
--- a/nmdc_runtime/util.py
+++ b/nmdc_runtime/util.py
@@ -17,6 +17,8 @@
 import requests
 from frozendict import frozendict
 from jsonschema.validators import Draft7Validator
+from linkml_runtime import linkml_model
+from linkml_runtime.utils.schemaview import SchemaView
 from nmdc_schema.nmdc import Database as NMDCDatabase
 from nmdc_schema.get_nmdc_view import ViewGetter
 from pydantic import Field, BaseModel
@@ -29,6 +31,48 @@
 from typing_extensions import Annotated
 
 
+def get_names_of_classes_in_effective_range_of_slot(
+    schema_view: SchemaView, slot_definition: linkml_model.SlotDefinition
+) -> List[str]:
+    r"""
+    Determine the slot's "effective" range by taking into account its `any_of` constraints (if defined).
+
+    Note: The `any_of` constraints constrain the slot's "effective" range beyond that described by the
+          induced slot definition's `range` attribute. `SchemaView` does not seem to provide the result
+          of applying those additional constraints, so we do it manually here (if any are defined).
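+
+    For example (hypothetical slot definition): a slot declared with
+    `any_of: [{range: Study}, {range: Biosample}]` has an effective range consisting of `Study`,
+    `Biosample`, and all of their descendants, regardless of the value of its `range` attribute.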
+
+    Reference: https://github.com/orgs/linkml/discussions/2101#discussion-6625646
+
+    Reference: https://linkml.io/linkml-model/latest/docs/any_of/
+    """
+
+    # Initialize the list to be empty.
+    names_of_eligible_target_classes = []
+
+    # If the `any_of` constraint is defined on this slot, use that instead of the `range`.
+    if "any_of" in slot_definition and len(slot_definition.any_of) > 0:
+        for slot_expression in slot_definition.any_of:
+            # Use the slot expression's `range` to get the specified eligible class name
+            # and the names of all classes that inherit from that eligible class.
+            if slot_expression.range in schema_view.all_classes():
+                own_and_descendant_class_names = schema_view.class_descendants(
+                    slot_expression.range
+                )
+                names_of_eligible_target_classes.extend(own_and_descendant_class_names)
+    else:
+        # Use the slot's `range` to get the specified eligible class name
+        # and the names of all classes that inherit from that eligible class.
+        if slot_definition.range in schema_view.all_classes():
+            own_and_descendant_class_names = schema_view.class_descendants(
+                slot_definition.range
+            )
+            names_of_eligible_target_classes.extend(own_and_descendant_class_names)
+
+    # Remove duplicate class names.
+    names_of_eligible_target_classes = list(set(names_of_eligible_target_classes))
+
+    return names_of_eligible_target_classes
+
+
 def get_class_names_from_collection_spec(
     spec: dict, prefix: Optional[str] = None
 ) -> List[str]:
diff --git a/tests/test_ops/test_materialize_alldocs.py b/tests/test_ops/test_materialize_alldocs.py
index 16295b5e..4c3edfa4 100644
--- a/tests/test_ops/test_materialize_alldocs.py
+++ b/tests/test_ops/test_materialize_alldocs.py
@@ -42,11 +42,28 @@ def test_materialize_alldocs(op_context):
     #
     # Reference: https://microbiomedata.github.io/berkeley-schema-fy24/FieldResearchSite/#direct
     #
-    field_research_site_class_ancestry_chain = ["FieldResearchSite", "Site", "MaterialEntity", "NamedThing"]
+    field_research_site_class_ancestry_chain = [
+        "FieldResearchSite",
+        "Site",
+        "MaterialEntity",
+        "NamedThing",
+    ]
     field_research_site_documents = [
-        {"id": "frsite-99-00000001", "type": "nmdc:FieldResearchSite", "name": "Site A"},
-        {"id": "frsite-99-00000002", "type": "nmdc:FieldResearchSite", "name": "Site B"},
-        {"id": "frsite-99-00000003", "type": "nmdc:FieldResearchSite", "name": "Site C"},
+        {
+            "id": "frsite-99-00000001",
+            "type": "nmdc:FieldResearchSite",
+            "name": "Site A",
+        },
+        {
+            "id": "frsite-99-00000002",
+            "type": "nmdc:FieldResearchSite",
+            "name": "Site B",
+        },
+        {
+            "id": "frsite-99-00000003",
+            "type": "nmdc:FieldResearchSite",
+            "name": "Site C",
+        },
     ]
     field_research_site_set_collection = mdb.get_collection("field_research_site_set")
     for document in field_research_site_documents:
@@ -70,7 +87,9 @@ def test_materialize_alldocs(op_context):
     # Get a reference to the newly-materialized `alldocs` collection.
     alldocs_collection = mdb.get_collection("alldocs")
 
-    num_alldocs_docs = alldocs_collection.count_documents({})  # here, we get an _exact_ count
+    num_alldocs_docs = alldocs_collection.count_documents(
+        {}
+    )  # here, we get an _exact_ count
 
     # Verify each upstream document is represented correctly—and only once—in the `alldocs` collection.
     #
@@ -86,14 +105,21 @@
         collection = mdb.get_collection(collection_name)
         for document in collection.find({}):
             num_upstream_docs += 1
-            document_lacking_type = dissoc(document, "_id", "type")
-            document_having_generic_type = assoc(document_lacking_type, "type", {"$type": "array"})
+            document_having_generic_type = assoc(
+                {"id": document["id"]},
+                "_type_and_ancestors",
+                {"$type": "array"},
+            )
             assert alldocs_collection.count_documents(document_having_generic_type) == 1
 
     # Verify each of the specific documents we created above appears in the `alldocs` collection once,
-    # and that its `type` value has been replaced with its class ancestry chain.
+    # and that `_type_and_ancestors` has been set to its class ancestry chain.
     for document in field_research_site_documents:
-        alldocs_document = assoc(dissoc(document, "type"), "type", field_research_site_class_ancestry_chain)
+        alldocs_document = {
+            "id": document["id"],
+            "type": document["type"],
+            "_type_and_ancestors": field_research_site_class_ancestry_chain,
+        }
         assert alldocs_collection.count_documents(alldocs_document) == 1
 
     # Verify the total number of documents in all the upstream collections, combined,