diff --git a/nmdc_runtime/site/graphs.py b/nmdc_runtime/site/graphs.py index 1cb1f1e3..3f06f6f4 100644 --- a/nmdc_runtime/site/graphs.py +++ b/nmdc_runtime/site/graphs.py @@ -56,6 +56,9 @@ get_ncbi_export_pipeline_inputs, ncbi_submission_xml_from_nmdc_study, ncbi_submission_xml_asset, + get_biosample_rollup_pipeline_input, + materialize_biosample_rollup, + biosample_rollup_filename, ) from nmdc_runtime.site.export.study_metadata import get_biosamples_by_study_id @@ -105,6 +108,16 @@ def ensure_alldocs(): materialize_alldocs() +@graph +def biosample_rollup_export(): + nmdc_study_id = get_biosample_rollup_pipeline_input() + biosample_rollup = materialize_biosample_rollup(nmdc_study_id) + + filename = biosample_rollup_filename() + outputs = export_json_to_drs(biosample_rollup, filename) + add_output_run_event(outputs) + + @graph def ensure_jobs(): jobs = construct_jobs() diff --git a/nmdc_runtime/site/ops.py b/nmdc_runtime/site/ops.py index c8317fae..b4baf39d 100644 --- a/nmdc_runtime/site/ops.py +++ b/nmdc_runtime/site/ops.py @@ -7,9 +7,9 @@ from collections import defaultdict from datetime import datetime, timezone from io import BytesIO, StringIO -from typing import Tuple from zipfile import ZipFile from itertools import chain +from typing import Dict, List, Tuple import pandas as pd import requests @@ -20,9 +20,7 @@ Any, AssetKey, AssetMaterialization, - Dict, Failure, - List, MetadataValue, OpExecutionContext, Out, @@ -34,12 +32,11 @@ Optional, Field, Permissive, - Bool, ) from gridfs import GridFS from linkml_runtime.dumpers import json_dumper from linkml_runtime.utils.yamlutils import YAMLRoot -from nmdc_runtime.api.db.mongo import get_mongo_db +from nmdc_runtime.api.db.mongo import get_collection_names_from_schema, get_mongo_db from nmdc_runtime.api.core.idgen import generate_one_id from nmdc_runtime.api.core.metadata import ( _validate_changesheet, @@ -47,9 +44,18 @@ get_collection_for_id, map_id_to_collection, ) -from nmdc_runtime.api.core.util import 
dotted_path_for, hash_from_str, json_clean, now +from nmdc_runtime.api.core.util import ( + dotted_path_for, + hash_from_str, + json_clean, + now, + raise404_if_none, +) from nmdc_runtime.api.endpoints.util import persist_content_and_get_drs_object -from nmdc_runtime.api.endpoints.find import find_study_by_id +from nmdc_runtime.api.endpoints.find import ( + find_study_by_id, + get_classname_from_typecode, +) from nmdc_runtime.api.models.job import Job, JobOperationMetadata from nmdc_runtime.api.models.metadata import ChangesheetIn from nmdc_runtime.api.models.operation import ( @@ -63,6 +69,7 @@ _add_run_complete_event, ) from nmdc_runtime.api.models.util import ResultT +from nmdc_runtime.minter.config import typecodes from nmdc_runtime.site.export.ncbi_xml import NCBISubmissionXML from nmdc_runtime.site.export.ncbi_xml_utils import ( fetch_data_objects_from_biosamples, @@ -89,7 +96,11 @@ from nmdc_runtime.site.translation.submission_portal_translator import ( SubmissionPortalTranslator, ) -from nmdc_runtime.site.util import run_and_log, schema_collection_has_index_on_id +from nmdc_runtime.site.util import ( + get_collection_from_typecode, + run_and_log, + schema_collection_has_index_on_id, +) from nmdc_runtime.util import ( drs_object_in_for, pluralize, @@ -102,6 +113,7 @@ ) from nmdc_schema import nmdc from nmdc_schema.nmdc import Database as NMDCDatabase +from nmdc_schema.get_nmdc_view import ViewGetter from pydantic import BaseModel from pymongo.database import Database as MongoDatabase from starlette import status @@ -1141,6 +1153,116 @@ def materialize_alldocs(context) -> int: return mdb.alldocs.estimated_document_count() +@op +def biosample_rollup_filename() -> str: + return "biosample_rollup.json" + + +@op( + config_schema={"nmdc_study_id": str}, + out={"nmdc_study_id": Out(str)}, +) +def get_biosample_rollup_pipeline_input(context: OpExecutionContext) -> str: + return context.op_config["nmdc_study_id"] + + +@op(required_resource_keys={"mongo"}) +def 
materialize_biosample_rollup(
+    context: OpExecutionContext, nmdc_study_id: str
+) -> Dict:
+    mdb = context.resources.mongo.db
+
+    study = raise404_if_none(
+        mdb.study_set.find_one({"id": nmdc_study_id}, projection={"id": 1}),
+        detail="Study not found",
+    )
+    # NOTE: raise404_if_none has already raised if no study was found, so
+    # `study` is guaranteed non-None here (a falsy-check would be dead code).
+
+    # Note: With nmdc-schema v10 (legacy schema), we used the field named `part_of` here.
+    # With nmdc-schema v11 (Berkeley schema), we use the field named `associated_studies` here.
+    biosamples = mdb.biosample_set.find({"associated_studies": study["id"]}, ["id"])
+    biosample_ids = [biosample["id"] for biosample in biosamples]
+    if not biosample_ids:
+        return {"biosample_rollup": []}
+
+    biosample_associated_ids = []
+
+    # SchemaView interface to NMDC Schema
+    nmdc_view = ViewGetter()
+    nmdc_sv = nmdc_view.get_view()
+    dg_descendants = nmdc_sv.class_descendants("DataGeneration")
+
+    for biosample_id in biosample_ids:
+        current_ids = [biosample_id]
+
+        # List to capture all document IDs that are related to a Biosample ID
+        all_collected_ids = []
+
+        while current_ids:
+            new_current_ids = []
+            for current_id in current_ids:
+                query = {"has_input": current_id}
+                documents = list(mdb.alldocs.find(query))
+                # NOTE: a pymongo Cursor is always truthy; materialize to a list so
+                if not documents:
+                    continue
+
+                for document in documents:
+                    doc_id = document.get("id")
+                    all_collected_ids.append(doc_id)
+                    has_output = document.get("has_output", [])
+
+                    if not has_output and any(
+                        doc_type in dg_descendants
+                        for doc_type in document.get("type", [])
+                    ):
+                        was_informed_by_query = {"was_informed_by": doc_id}
+                        informed_by_docs = mdb.alldocs.find(was_informed_by_query)
+
+                        for informed_by_doc in informed_by_docs:
+                            all_collected_ids.append(informed_by_doc.get("id"))
+                            all_collected_ids.extend(
+                                informed_by_doc.get("has_input", [])
+                            )
+                            all_collected_ids.extend(
+                                informed_by_doc.get("has_output", [])
+                            )
+                        continue
+
+                    new_current_ids.extend(
+                        op
+                        for op in has_output
+                        if get_classname_from_typecode(op) != "DataObject"
+                    )
+                    all_collected_ids.extend(has_output)
+
+                    if any(
+
doc_type in dg_descendants + for doc_type in document.get("type", []) + ): + was_informed_by_query = {"was_informed_by": doc_id} + informed_by_docs = mdb.alldocs.find(was_informed_by_query) + for informed_by_doc in informed_by_docs: + all_collected_ids.append(informed_by_doc.get("id")) + all_collected_ids.extend( + informed_by_doc.get("has_input", []) + ) + all_collected_ids.extend( + informed_by_doc.get("has_output", []) + ) + + current_ids = new_current_ids + + result = { + "biosample_id": biosample_id, + "associated_ids": all_collected_ids, + } + biosample_associated_ids.append(result) + + return {"biosample_rollup": biosample_associated_ids} + + @op(config_schema={"nmdc_study_id": str}, required_resource_keys={"mongo"}) def get_ncbi_export_pipeline_study(context: OpExecutionContext) -> Any: nmdc_study = find_study_by_id( diff --git a/nmdc_runtime/site/repository.py b/nmdc_runtime/site/repository.py index 5d7f1987..7c75f796 100644 --- a/nmdc_runtime/site/repository.py +++ b/nmdc_runtime/site/repository.py @@ -43,6 +43,7 @@ ingest_neon_benthic_metadata, ingest_neon_surface_water_metadata, ensure_alldocs, + biosample_rollup_export, nmdc_study_to_ncbi_submission_export, ) from nmdc_runtime.site.resources import ( @@ -911,6 +912,32 @@ def biosample_export(): }, }, ), + biosample_rollup_export.to_job( + resource_defs=resource_defs, + config={ + "resources": merge( + unfreeze(normal_resources), + { + "mongo": { + "config": { + "host": {"env": "MONGO_HOST"}, + "username": {"env": "MONGO_USERNAME"}, + "password": {"env": "MONGO_PASSWORD"}, + "dbname": {"env": "MONGO_DBNAME"}, + }, + }, + }, + ), + "ops": { + "get_biosample_rollup_pipeline_input": { + "config": { + "nmdc_study_id": "", + } + }, + "export_json_to_drs": {"config": {"username": ""}}, + }, + }, + ), ] diff --git a/nmdc_runtime/site/util.py b/nmdc_runtime/site/util.py index 4280fe65..5a805ae8 100644 --- a/nmdc_runtime/site/util.py +++ b/nmdc_runtime/site/util.py @@ -4,7 +4,10 @@ from pymongo.database import 
Database as MongoDatabase +from linkml_runtime.utils.schemaview import SchemaView +from nmdc_schema.get_nmdc_view import ViewGetter from nmdc_runtime.api.db.mongo import get_collection_names_from_schema +from nmdc_runtime.minter.config import typecodes from nmdc_runtime.site.resources import mongo_resource mode_test = { @@ -24,6 +27,8 @@ }, } +DATABASE_CLASS_NAME = "Database" + def run_and_log(shell_cmd, context): process = Popen(shell_cmd, shell=True, stdout=PIPE, stderr=STDOUT) @@ -47,3 +52,46 @@ def get_basename(filename: str) -> str: return os.path.basename(filename) + + +def get_name_of_class_objects_in_collection( +    schema_view: SchemaView, collection_name: str +) -> str: +    slot_definition = schema_view.induced_slot(collection_name, DATABASE_CLASS_NAME) +    return slot_definition.range + + +@lru_cache +def get_class_names_to_collection_names_map(): +    vg = ViewGetter() +    schema_view = vg.get_view() + +    collection_names = get_collection_names_from_schema() + +    class_names_to_collection_names = {} +    for collection_name in collection_names: +        class_name = get_name_of_class_objects_in_collection( +            schema_view, collection_name +        ) +        class_names_to_collection_names[class_name] = collection_name + +    return class_names_to_collection_names + + +def get_collection_from_typecode(doc_id: str): +    # Guard: ids are expected to be CURIEs ("nmdc:bsm-..."); a bare string would raise IndexError below +    if ":" not in doc_id: +        return None +    typecode = doc_id.split(":")[1].split("-")[0] +    class_map_data = typecodes() + +    class_map = { +        entry["name"]: entry["schema_class"].split(":")[1] for entry in class_map_data +    } +    class_name = class_map.get(typecode) +    if class_name: +        collection_dict = get_class_names_to_collection_names_map() +        collection_name = collection_dict.get(class_name) +        return collection_name + +    return None