696 update dagster op for materialize alldocs (#817)
* updated materialize_alldocs to reflect the functionality in the bulk_validation_referential_integrity notebook; added the necessary utils to util.py to support this

* replace 'pick' with keyfilter (a minimal sketch of this swap follows the changed-files summary below)

* add documentation about BULK_WRITE_BATCH_SIZE

* add docstring to 'materialize_alldocs' function, and run 'black' to format files

* fix: update test

---------

Co-authored-by: Donny Winston <donny@polyneme.xyz>
mkshah605 and dwinston authored Dec 13, 2024
1 parent 22a9a58 commit 1b1a25c
Showing 3 changed files with 150 additions and 97 deletions.
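
Before the file diffs, a minimal sketch of the 'pick' → keyfilter swap called out in the commit message. This is illustrative only: trim_doc and the sample document are hypothetical, not part of the changeset. The point is that toolz's keyfilter selects dictionary entries by a key predicate, which covers what a pick-style helper (explicit key list) did before.

from toolz.dicttoolz import keyfilter

def trim_doc(doc: dict, slots_to_include: list) -> dict:
    # Keep only the keys named in slots_to_include; keyfilter applies the
    # predicate to each key and drops entries for which it returns False.
    return keyfilter(lambda slot: slot in slots_to_include, doc)

sample = {"_id": "...", "id": "nmdc:bsm-99-000001", "type": "nmdc:Biosample", "name": "Example"}
print(trim_doc(sample, ["id", "type"]))
# {'id': 'nmdc:bsm-99-000001', 'type': 'nmdc:Biosample'}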
159 changes: 71 additions & 88 deletions nmdc_runtime/site/ops.py
@@ -7,6 +7,7 @@
from collections import defaultdict
from datetime import datetime, timezone
from io import BytesIO, StringIO
from toolz.dicttoolz import keyfilter
from typing import Tuple
from zipfile import ZipFile
from itertools import chain
@@ -93,17 +94,20 @@
from nmdc_runtime.site.util import run_and_log, schema_collection_has_index_on_id
from nmdc_runtime.util import (
drs_object_in_for,
get_names_of_classes_in_effective_range_of_slot,
pluralize,
put_object,
validate_json,
specialize_activity_set_docs,
collection_name_to_class_names,
class_hierarchy_as_list,
nmdc_schema_view,
populated_schema_collection_names_with_id_field,
)
from nmdc_schema import nmdc
from nmdc_schema.nmdc import Database as NMDCDatabase
from pydantic import BaseModel
from pymongo import InsertOne
from pymongo.database import Database as MongoDatabase
from starlette import status
from toolz import assoc, dissoc, get_in, valfilter, identity
@@ -1037,23 +1041,51 @@ def site_code_mapping() -> dict:

@op(required_resource_keys={"mongo"})
def materialize_alldocs(context) -> int:
"""
This function re-creates the alldocs collection to reflect the current state of the Mongo database.
See nmdc-runtime/docs/nb/bulk_validation_referential_integrity_check.ipynb for more details.
"""
mdb = context.resources.mongo.db
collection_names = populated_schema_collection_names_with_id_field(mdb)
schema_view = nmdc_schema_view()

# Insert a no-op as an anchor point for this comment.
#
# Note: There used to be code here that `assert`-ed that each collection could only contain documents of a single
# type. With the legacy schema, that assertion was true. With the Berkeley schema, it is false. That code was
# in place because subsequent code (further below) used a single document in a collection as the source of the
# class ancestry information of _all_ documents in that collection; an optimization that spared us from
# having to do the same for every single document in that collection. With the Berkeley schema, we have
# eliminated that optimization (since it is inadequate; it would produce some incorrect class ancestries
# for descendants of `PlannedProcess`, for example).
#
pass
# batch size for writing documents to alldocs
BULK_WRITE_BATCH_SIZE = 2000

# TODO include functional_annotation_agg for "real-time" ref integrity checking.
# For now, production use cases for materialized `alldocs` are limited to `id`-having collections.
collection_names = populated_schema_collection_names_with_id_field(mdb)
context.log.info(f"{collection_names=}")

# Build alldocs
context.log.info("constructing `alldocs` collection")

document_class_names = set(
chain.from_iterable(collection_name_to_class_names.values())
)

cls_slot_map = {
cls_name: {
slot.name: slot for slot in schema_view.class_induced_slots(cls_name)
}
for cls_name in document_class_names
}

# Any ancestor of a document class is a document-referenceable range, i.e., a valid range of a document-reference-ranged slot.
document_referenceable_ranges = set(
chain.from_iterable(
schema_view.class_ancestors(cls_name) for cls_name in document_class_names
)
)

document_reference_ranged_slots = defaultdict(list)
for cls_name, slot_map in cls_slot_map.items():
for slot_name, slot in slot_map.items():
if (
set(get_names_of_classes_in_effective_range_of_slot(schema_view, slot))
& document_referenceable_ranges
):
document_reference_ranged_slots[cls_name].append(slot_name)

# Drop any existing `alldocs` collection (e.g. from previous use of this op).
#
# FIXME: This "nuke and pave" approach introduces a race condition.
@@ -1062,90 +1094,41 @@ def materialize_alldocs(context):
#
mdb.alldocs.drop()

# Build alldocs
context.log.info("constructing `alldocs` collection")

# For each collection, group its documents by their `type` value, transform them, and load them into `alldocs`.
for collection_name in collection_names:
context.log.info(
f"Found {mdb[collection_name].estimated_document_count()} estimated documents for {collection_name=}."
)

# Process all the distinct `type` values (i.e. value in the `type` field) of the documents in this collection.
#
# References:
# - https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.distinct
#
distinct_type_values = mdb[collection_name].distinct(key="type")
for coll_name in collection_names:
context.log.info(f"{coll_name=}")
requests = []
documents_processed_counter = 0
for doc in mdb[coll_name].find():
doc_type = doc["type"][5:] # lop off "nmdc:" prefix
slots_to_include = ["id", "type"] + document_reference_ranged_slots[
doc_type
]
new_doc = keyfilter(lambda slot: slot in slots_to_include, doc)
new_doc["_type_and_ancestors"] = schema_view.class_ancestors(doc_type)
requests.append(InsertOne(new_doc))
if len(requests) == BULK_WRITE_BATCH_SIZE:
_ = mdb.alldocs.bulk_write(requests, ordered=False)
requests.clear()
documents_processed_counter += BULK_WRITE_BATCH_SIZE
if len(requests) > 0:
_ = mdb.alldocs.bulk_write(requests, ordered=False)
documents_processed_counter += len(requests)
context.log.info(
f"Found {len(distinct_type_values)} distinct `type` values in {collection_name=}: {distinct_type_values=}"
f"Inserted {documents_processed_counter} documents from {coll_name=} "
)
for type_value in distinct_type_values:

# Process all the documents in this collection that have this value in their `type` field.
#
# References:
# - https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.count_documents
# - https://pymongo.readthedocs.io/en/stable/api/pymongo/collection.html#pymongo.collection.Collection.find
#
filter_ = {"type": type_value}
num_docs_having_type = mdb[collection_name].count_documents(filter=filter_)
docs_having_type = mdb[collection_name].find(filter=filter_)
context.log.info(
f"Found {num_docs_having_type} documents having {type_value=} in {collection_name=}."
)

# Get a "representative" document from the result.
#
# Note: Since all of the documents in this batch have the same class ancestry, we will save time by
# determining the class ancestry of only _one_ of them (we call this the "representative") and then
# (later) attributing that class ancestry to all of them.
#
representative_doc = next(docs_having_type)

# Instantiate the Python class represented by the "representative" document.
db_dict = {
# Shed the `_id` attribute, since the constructor doesn't allow it.
collection_name: [dissoc(representative_doc, "_id")]
}
nmdc_db = NMDCDatabase(**db_dict)
representative_instance = getattr(nmdc_db, collection_name)[0]

# Get the class ancestry of that instance, as a list of class names (including its own class name).
ancestor_class_names = class_hierarchy_as_list(representative_instance)

# Store the documents belonging to this group, in the `alldocs` collection, setting their `type` field
# to the list of class names obtained from the "representative" document above.
#
# TODO: Document why clobbering the existing contents of the `type` field is OK.
#
# Note: The reason we `chain()` our "representative" document (in an iterable) with the `docs_having_type`
# iterator here is that, when we called `next(docs_having_type)` above, we "consumed" our
# "representative" document from that iterator. We use `chain()` here so that that document gets
# inserted alongside its cousins (i.e. the documents _still_ accessible via `docs_having_type`).
# Reference: https://docs.python.org/3/library/itertools.html#itertools.chain
#
inserted_many_result = mdb.alldocs.insert_many(
[
assoc(dissoc(doc, "type", "_id"), "type", ancestor_class_names)
for doc in chain([representative_doc], docs_having_type)
]
)
context.log.info(
f"Inserted {len(inserted_many_result.inserted_ids)} documents from {collection_name=} "
f"originally having {type_value=}."
)
context.log.info(
f"refreshed {mdb.alldocs} collection with {mdb.alldocs.estimated_document_count()} docs."
)

# Re-idx for `alldocs` collection
mdb.alldocs.create_index("id", unique=True)
# The indexes were added to improve the performance of the
# /data_objects/study/{study_id} endpoint
mdb.alldocs.create_index("has_input")
mdb.alldocs.create_index("has_output")
mdb.alldocs.create_index("was_informed_by")
context.log.info(
f"refreshed {mdb.alldocs} collection with {mdb.alldocs.estimated_document_count()} docs."
)
slots_to_index = ["has_input", "has_output", "was_informed_by"]
[mdb.alldocs.create_index(slot) for slot in slots_to_index]

context.log.info(f"created indexes on id, {slots_to_index}.")
return mdb.alldocs.estimated_document_count()
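
Pulled out of the Dagster op above for illustration, the new write path amounts to the batched-insert pattern below. This is a minimal sketch, not the op itself: mdb is assumed to be a pymongo Database, and reference_slots_for stands in for the document_reference_ranged_slots mapping built from the schema view earlier in the op.

from pymongo import InsertOne
from pymongo.database import Database
from toolz.dicttoolz import keyfilter

BULK_WRITE_BATCH_SIZE = 2000  # documents per bulk_write request

def materialize_collection(mdb: Database, coll_name: str, schema_view, reference_slots_for: dict) -> int:
    """Copy trimmed documents from one collection into `alldocs`, writing in batches."""
    requests = []
    documents_processed = 0
    for doc in mdb[coll_name].find():
        doc_type = doc["type"][len("nmdc:"):]  # lop off the "nmdc:" prefix
        slots_to_include = ["id", "type"] + reference_slots_for.get(doc_type, [])
        new_doc = keyfilter(lambda slot: slot in slots_to_include, doc)
        new_doc["_type_and_ancestors"] = schema_view.class_ancestors(doc_type)
        requests.append(InsertOne(new_doc))
        if len(requests) == BULK_WRITE_BATCH_SIZE:
            # ordered=False lets the server apply the inserts independently; order is irrelevant here.
            mdb.alldocs.bulk_write(requests, ordered=False)
            documents_processed += len(requests)
            requests.clear()
    if requests:  # flush the final, partially filled batch
        mdb.alldocs.bulk_write(requests, ordered=False)
        documents_processed += len(requests)
    return documents_processed

Keeping batches at 2000 documents bounds client-side memory while still amortizing round trips to Mongo; the drop and re-indexing of `alldocs` stays in the op itself.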


44 changes: 44 additions & 0 deletions nmdc_runtime/util.py
@@ -17,6 +17,8 @@
import requests
from frozendict import frozendict
from jsonschema.validators import Draft7Validator
from linkml_runtime import linkml_model
from linkml_runtime.utils.schemaview import SchemaView
from nmdc_schema.nmdc import Database as NMDCDatabase
from nmdc_schema.get_nmdc_view import ViewGetter
from pydantic import Field, BaseModel
@@ -29,6 +31,48 @@
from typing_extensions import Annotated


def get_names_of_classes_in_effective_range_of_slot(
schema_view: SchemaView, slot_definition: linkml_model.SlotDefinition
) -> List[str]:
r"""
Determine the slot's "effective" range, by taking into account its `any_of` constraints (if defined).
Note: The `any_of` constraints constrain the slot's "effective" range beyond that described by the
induced slot definition's `range` attribute. `SchemaView` does not seem to provide the result
of applying those additional constraints, so we do it manually here (if any are defined).
Reference: https://github.com/orgs/linkml/discussions/2101#discussion-6625646
Reference: https://linkml.io/linkml-model/latest/docs/any_of/
"""

# Initialize the list to be empty.
names_of_eligible_target_classes = []

# If the `any_of` constraint is defined on this slot, use that instead of the `range`.
if "any_of" in slot_definition and len(slot_definition.any_of) > 0:
for slot_expression in slot_definition.any_of:
# Use the slot expression's `range` to get the specified eligible class name
# and the names of all classes that inherit from that eligible class.
if slot_expression.range in schema_view.all_classes():
own_and_descendant_class_names = schema_view.class_descendants(
slot_expression.range
)
names_of_eligible_target_classes.extend(own_and_descendant_class_names)
else:
# Use the slot's `range` to get the specified eligible class name
# and the names of all classes that inherit from that eligible class.
if slot_definition.range in schema_view.all_classes():
own_and_descendant_class_names = schema_view.class_descendants(
slot_definition.range
)
names_of_eligible_target_classes.extend(own_and_descendant_class_names)

# Remove duplicate class names.
names_of_eligible_target_classes = list(set(names_of_eligible_target_classes))

return names_of_eligible_target_classes


def get_class_names_from_collection_spec(
spec: dict, prefix: Optional[str] = None
) -> List[str]:
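
A small usage sketch for the new helper in nmdc_runtime/util.py. It assumes the schema view obtained via nmdc_schema_view() (exported from the same module, per the import block in ops.py above); the 'Biosample' class name is just an example.

from nmdc_runtime.util import (
    get_names_of_classes_in_effective_range_of_slot,
    nmdc_schema_view,
)

schema_view = nmdc_schema_view()

# Induced slots fold in slot_usage and any_of constraints for the given class,
# which is what the helper inspects to compute the "effective" range.
for slot in schema_view.class_induced_slots("Biosample"):
    target_classes = get_names_of_classes_in_effective_range_of_slot(schema_view, slot)
    if target_classes:
        print(f"{slot.name}: {sorted(target_classes)}")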
44 changes: 35 additions & 9 deletions tests/test_ops/test_materialize_alldocs.py
@@ -42,11 +42,28 @@ def test_materialize_alldocs(op_context):
#
# Reference: https://microbiomedata.github.io/berkeley-schema-fy24/FieldResearchSite/#direct
#
field_research_site_class_ancestry_chain = ["FieldResearchSite", "Site", "MaterialEntity", "NamedThing"]
field_research_site_class_ancestry_chain = [
"FieldResearchSite",
"Site",
"MaterialEntity",
"NamedThing",
]
field_research_site_documents = [
{"id": "frsite-99-00000001", "type": "nmdc:FieldResearchSite", "name": "Site A"},
{"id": "frsite-99-00000002", "type": "nmdc:FieldResearchSite", "name": "Site B"},
{"id": "frsite-99-00000003", "type": "nmdc:FieldResearchSite", "name": "Site C"},
{
"id": "frsite-99-00000001",
"type": "nmdc:FieldResearchSite",
"name": "Site A",
},
{
"id": "frsite-99-00000002",
"type": "nmdc:FieldResearchSite",
"name": "Site B",
},
{
"id": "frsite-99-00000003",
"type": "nmdc:FieldResearchSite",
"name": "Site C",
},
]
field_research_site_set_collection = mdb.get_collection("field_research_site_set")
for document in field_research_site_documents:
@@ -70,7 +87,9 @@

# Get a reference to the newly-materialized `alldocs` collection.
alldocs_collection = mdb.get_collection("alldocs")
num_alldocs_docs = alldocs_collection.count_documents({}) # here, we get an _exact_ count
num_alldocs_docs = alldocs_collection.count_documents(
{}
) # here, we get an _exact_ count

# Verify each upstream document is represented correctly—and only once—in the `alldocs` collection.
#
@@ -86,14 +105,21 @@
collection = mdb.get_collection(collection_name)
for document in collection.find({}):
num_upstream_docs += 1
document_lacking_type = dissoc(document, "_id", "type")
document_having_generic_type = assoc(document_lacking_type, "type", {"$type": "array"})
document_having_generic_type = assoc(
{"id": document["id"]},
"_type_and_ancestors",
{"$type": "array"},
)
assert alldocs_collection.count_documents(document_having_generic_type) == 1

# Verify each of the specific documents we created above appears in the `alldocs` collection once,
# and that its `type` value has been replaced with its class ancestry chain.
# and that `_type_and_ancestors` has been set to its class ancestry chain.
for document in field_research_site_documents:
alldocs_document = assoc(dissoc(document, "type"), "type", field_research_site_class_ancestry_chain)
alldocs_document = {
"id": document["id"],
"type": document["type"],
"_type_and_ancestors": field_research_site_class_ancestry_chain,
}
assert alldocs_collection.count_documents(alldocs_document) == 1

# Verify the total number of documents in all the upstream collections, combined,
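
Finally, a hedged sketch of the kind of referential-integrity query the materialized collection (and its new indexes) is meant to serve, in the spirit of the bulk_validation_referential_integrity notebook. The connection details, class name, and slot are examples only.

from pymongo import MongoClient

mdb = MongoClient("mongodb://localhost:27017").nmdc  # assumption: local Mongo with an `nmdc` database

# `_type_and_ancestors` holds the document's class plus all of its ancestors, so a
# single equality match finds every document that is, or descends from, PlannedProcess.
for doc in mdb.alldocs.find({"_type_and_ancestors": "PlannedProcess"}):
    for ref_id in doc.get("has_input", []):
        # Each reference should resolve via the unique index on `id`.
        if mdb.alldocs.find_one({"id": ref_id}) is None:
            print(f"dangling has_input reference in {doc['id']}: {ref_id}")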
