diff --git a/nmdc_runtime/site/graphs.py b/nmdc_runtime/site/graphs.py index fbdd4549..f2e844e7 100644 --- a/nmdc_runtime/site/graphs.py +++ b/nmdc_runtime/site/graphs.py @@ -57,6 +57,9 @@ get_ncbi_export_pipeline_inputs, ncbi_submission_xml_from_nmdc_study, ncbi_submission_xml_asset, + get_database_updater_inputs, + nmdc_study_id_filename, + missing_data_generation_repair, ) from nmdc_runtime.site.export.study_metadata import get_biosamples_by_study_id @@ -467,3 +470,16 @@ def nmdc_study_to_ncbi_submission_export(): all_instruments, ) ncbi_submission_xml_asset(xml_data) + + +@graph +def fill_missing_data_generation_data_object_records(): + (study_id, gold_nmdc_instrument_mapping_file_url) = get_database_updater_inputs() + gold_nmdc_instrument_map_df = get_df_from_url(gold_nmdc_instrument_mapping_file_url) + + database = missing_data_generation_repair(study_id, gold_nmdc_instrument_map_df) + + database_dict = nmdc_schema_object_to_dict(database) + filename = nmdc_study_id_filename(study_id) + outputs = export_json_to_drs(database_dict, filename) + add_output_run_event(outputs) diff --git a/nmdc_runtime/site/ops.py b/nmdc_runtime/site/ops.py index a9516c3f..019a6b91 100644 --- a/nmdc_runtime/site/ops.py +++ b/nmdc_runtime/site/ops.py @@ -91,7 +91,12 @@ from nmdc_runtime.site.translation.submission_portal_translator import ( SubmissionPortalTranslator, ) -from nmdc_runtime.site.util import run_and_log, schema_collection_has_index_on_id +from nmdc_runtime.site.repair.database_updater import DatabaseUpdater +from nmdc_runtime.site.util import ( + run_and_log, + schema_collection_has_index_on_id, + nmdc_study_id_to_filename, +) from nmdc_runtime.util import ( drs_object_in_for, get_names_of_classes_in_effective_range_of_slot, @@ -1241,3 +1246,58 @@ def ncbi_submission_xml_from_nmdc_study( all_instruments, ) return ncbi_xml + + +@op +def nmdc_study_id_filename(nmdc_study_id: str) -> str: + filename = nmdc_study_id_to_filename(nmdc_study_id) + return f"missing_database_records_for_{filename}.json" + + +@op( + config_schema={ + "nmdc_study_id": str, + "gold_nmdc_instrument_mapping_file_url": str, + }, + out={ + "nmdc_study_id": Out(str), + "gold_nmdc_instrument_mapping_file_url": Out(str), + }, +) +def get_database_updater_inputs(context: OpExecutionContext) -> Tuple[str, str]: + return ( + context.op_config["nmdc_study_id"], + context.op_config["gold_nmdc_instrument_mapping_file_url"], + ) + + +@op( + required_resource_keys={ + "runtime_api_user_client", + "runtime_api_site_client", + "gold_api_client", + } +) +def missing_data_generation_repair( + context: OpExecutionContext, + nmdc_study_id: str, + gold_nmdc_instrument_map_df: pd.DataFrame, +) -> nmdc.Database: + runtime_api_user_client: RuntimeApiUserClient = ( + context.resources.runtime_api_user_client + ) + runtime_api_site_client: RuntimeApiSiteClient = ( + context.resources.runtime_api_site_client + ) + gold_api_client: GoldApiClient = context.resources.gold_api_client + + database_updater = DatabaseUpdater( + runtime_api_user_client, + runtime_api_site_client, + gold_api_client, + nmdc_study_id, + gold_nmdc_instrument_map_df, + ) + database = database_updater.create_missing_dg_records() + + return database diff --git a/nmdc_runtime/site/repair/__init__.py b/nmdc_runtime/site/repair/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/nmdc_runtime/site/repair/database_updater.py b/nmdc_runtime/site/repair/database_updater.py new file mode 100644 index 00000000..27d91ce7 --- /dev/null +++ b/nmdc_runtime/site/repair/database_updater.py @@ -0,0 +1,150 @@ +from functools import lru_cache +from typing import Any, Dict, List +import pandas as pd +from nmdc_runtime.site.resources import ( + RuntimeApiUserClient, + RuntimeApiSiteClient, + GoldApiClient, +) +from nmdc_runtime.site.translation.gold_translator import GoldStudyTranslator +from nmdc_schema import nmdc + + +class DatabaseUpdater: + def __init__( + self, + runtime_api_user_client: RuntimeApiUserClient, + runtime_api_site_client: RuntimeApiSiteClient, + gold_api_client: GoldApiClient, + study_id: str, + gold_nmdc_instrument_map_df: pd.DataFrame = pd.DataFrame(), + ): + """This class serves as an API for repairing connections in the database by + adding records that are essentially missing "links"/"connections". As we identify + common use cases for adding missing records to the database, we can + add helper methods to this class. + + :param runtime_api_user_client: An object of RuntimeApiUserClient which can be + used to retrieve instance records from the NMDC database. + :param runtime_api_site_client: An object of RuntimeApiSiteClient which can be + used to mint new IDs for the repaired records that need to be added into the NMDC database. + :param gold_api_client: An object of GoldApiClient which can be used to retrieve + records from GOLD via the GOLD API. + :param study_id: NMDC study ID for which the missing records need to be added. + :param gold_nmdc_instrument_map_df: A dataframe originally stored as a TSV mapping file in the + NMDC schema repo, which maps GOLD instrument IDs to IDs of NMDC instrument_set records. + """ + self.runtime_api_user_client = runtime_api_user_client + self.runtime_api_site_client = runtime_api_site_client + self.gold_api_client = gold_api_client + self.study_id = study_id + self.gold_nmdc_instrument_map_df = gold_nmdc_instrument_map_df + + @lru_cache + def _fetch_gold_biosample(self, gold_biosample_id: str) -> List[Dict[str, Any]]: + """Fetch response from GOLD /biosamples API for a given biosample id. + + :param gold_biosample_id: GOLD biosample ID. + :return: Dictionary containing the response from the GOLD /biosamples API. + """ + return self.gold_api_client.fetch_biosample_by_biosample_id(gold_biosample_id) + + @lru_cache + def _fetch_gold_projects(self, gold_biosample_id: str): + """Fetch response from GOLD /projects API for a given biosample id. + + :param gold_biosample_id: GOLD biosample ID + :return: Dictionary containing the response from the GOLD /projects API. + """ + return self.gold_api_client.fetch_projects_by_biosample(gold_biosample_id) + + def create_missing_dg_records(self): + """This method creates missing data generation records for a given study in the NMDC database using + metadata from GOLD. The way the logic works is, it first fetches all the biosamples associated + with the study from the NMDC database. Then, it fetches all the biosample and project data data + associated with the individual biosamples from the GOLD API using the NMDC-GOLD biosample id + mappings on the "gold_biosample_identifiers" key/slot. We use the GoldStudyTranslator class + to mint the required number of `nmdc:DataGeneration` (`nmdc:NucleotideSequencing`) records based + on the number of GOLD sequencing projects, and then reimplement only the part of logic from that + class which is responsible for making data_generation_set records. + + :return: An instance of `nmdc:Database` object which is JSON-ified and rendered on the frontend. + """ + database = nmdc.Database() + + biosample_set = self.runtime_api_user_client.get_biosamples_for_study( + self.study_id + ) + + all_gold_biosamples = [] + all_gold_projects = [] + for biosample in biosample_set: + gold_biosample_identifiers = biosample.get("gold_biosample_identifiers") + if gold_biosample_identifiers: + for gold_biosample_id in gold_biosample_identifiers: + gold_biosample = self._fetch_gold_biosample(gold_biosample_id)[0] + gold_projects = self._fetch_gold_projects(gold_biosample_id) + gold_biosample["projects"] = gold_projects + + all_gold_biosamples.append(gold_biosample) + all_gold_projects.extend(gold_projects) + + gold_study_translator = GoldStudyTranslator( + biosamples=all_gold_biosamples, + projects=all_gold_projects, + gold_nmdc_instrument_map_df=self.gold_nmdc_instrument_map_df, + ) + + # The GoldStudyTranslator class has some pre-processing logic which filters out + # invalid biosamples and projects (based on `sequencingStrategy`, `projectStatus`, etc.) + filtered_biosamples = gold_study_translator.biosamples + filtered_projects = gold_study_translator.projects + + gold_project_ids = [project["projectGoldId"] for project in filtered_projects] + nmdc_nucleotide_sequencing_ids = self.runtime_api_site_client.mint_id( + "nmdc:NucleotideSequencing", len(gold_project_ids) + ).json() + gold_project_to_nmdc_nucleotide_sequencing_ids = dict( + zip(gold_project_ids, nmdc_nucleotide_sequencing_ids) + ) + + gold_to_nmdc_biosample_ids = {} + + for biosample in biosample_set: + gold_ids = biosample.get("gold_biosample_identifiers", []) + for gold_id in gold_ids: + gold_id_stripped = gold_id.replace("gold:", "") + gold_to_nmdc_biosample_ids[gold_id_stripped] = biosample["id"] + + database.data_generation_set = [] + # Similar to the logic in GoldStudyTranslator, the number of nmdc:NucleotideSequencing records + # created is based on the number of GOLD sequencing projects + for project in filtered_projects: + # map the projectGoldId to the NMDC biosample ID + biosample_gold_id = next( + ( + biosample["biosampleGoldId"] + for biosample in filtered_biosamples + if any( + p["projectGoldId"] == project["projectGoldId"] + for p in biosample.get("projects", []) + ) + ), + None, + ) + + if biosample_gold_id: + nmdc_biosample_id = gold_to_nmdc_biosample_ids.get(biosample_gold_id) + if nmdc_biosample_id: + database.data_generation_set.append( + gold_study_translator._translate_nucleotide_sequencing( + project, + nmdc_nucleotide_sequencing_id=gold_project_to_nmdc_nucleotide_sequencing_ids[ + project["projectGoldId"] + ], + nmdc_biosample_id=nmdc_biosample_id, + nmdc_study_id=self.study_id, + ) + ) + + return database diff --git a/nmdc_runtime/site/repository.py b/nmdc_runtime/site/repository.py index a1477394..ee1bcdbb 100644 --- a/nmdc_runtime/site/repository.py +++ b/nmdc_runtime/site/repository.py @@ -44,6 +44,7 @@ ingest_neon_surface_water_metadata, ensure_alldocs, nmdc_study_to_ncbi_submission_export, + fill_missing_data_generation_data_object_records, ) from nmdc_runtime.site.resources import ( get_mongo, @@ -922,6 +923,54 @@ def biosample_export(): ] +@repository +def database_record_repair(): + normal_resources = run_config_frozen__normal_env["resources"] + return [ + fill_missing_data_generation_data_object_records.to_job( + resource_defs=resource_defs, + config={ + "resources": merge( + unfreeze(normal_resources), + { + "runtime_api_user_client": { + "config": { + "base_url": {"env": "API_HOST"}, + "username": {"env": "API_ADMIN_USER"}, + "password": {"env": "API_ADMIN_PASS"}, + }, + }, + "runtime_api_site_client": { + "config": { + "base_url": {"env": "API_HOST"}, + "client_id": {"env": "API_SITE_CLIENT_ID"}, + "client_secret": {"env": "API_SITE_CLIENT_SECRET"}, + "site_id": {"env": "API_SITE_ID"}, + }, + }, + "gold_api_client": { + "config": { + "base_url": {"env": "GOLD_API_BASE_URL"}, + "username": {"env": "GOLD_API_USERNAME"}, + "password": {"env": "GOLD_API_PASSWORD"}, + }, + }, + }, + ), + "ops": { + "get_database_updater_inputs": { + "config": { + "nmdc_study_id": "", + "gold_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/gold_seqMethod_to_nmdc_instrument_set.tsv", + } + }, + "export_json_to_drs": {"config": {"username": ""}}, + }, + }, + ), + ] + + # @repository # def validation(): # graph_jobs = [validate_jgi_job, validate_gold_job, validate_emsl_job] diff --git a/nmdc_runtime/site/resources.py b/nmdc_runtime/site/resources.py index 7ceb693d..c00b0900 100644 --- a/nmdc_runtime/site/resources.py +++ b/nmdc_runtime/site/resources.py @@ -129,16 +129,23 @@ def get_omics_processing_records_by_gold_project_id(self, gold_project_id: str): return response.json()["cursor"]["firstBatch"] def get_biosamples_for_study(self, study_id: str): + # TODO: 10000 is an arbitrarily large number that has been chosen for the max_page_size param. + # The /nmdcschema/{collection-name} endpoint implements pagination via the page_token mechanism, + # but the tradeoff there is that we would need to make multiple requests to step through the + # each of the pages. By picking a large number for max_page_size, we can get all the results + # in a single request. + # This method previously used the /queries:run endpoint but the problem with that was that + # it used to truncate the number of results returned to 100. response = self.request( - "POST", - f"/queries:run", + "GET", + f"/nmdcschema/biosample_set", { - "find": "biosample_set", - "filter": {"part_of": {"$elemMatch": {"$eq": study_id}}}, + "filter": json.dumps({"associated_studies": study_id}), + "max_page_size": 10000, }, ) response.raise_for_status() - return response.json()["cursor"]["firstBatch"] + return response.json()["resources"] def get_omics_processing_by_name(self, name: str): response = self.request( @@ -370,6 +377,18 @@ def fetch_study(self, id: str) -> Union[Dict[str, Any], None]: return None return results[0] + def fetch_projects_by_biosample(self, biosample_id: str) -> List[Dict[str, Any]]: + id = self._normalize_id(biosample_id) + results = self.request("/projects", params={"biosampleGoldId": id}) + return results + + def fetch_biosample_by_biosample_id( + self, biosample_id: str + ) -> List[Dict[str, Any]]: + id = self._normalize_id(biosample_id) + results = self.request("/biosamples", params={"biosampleGoldId": id}) + return results + @resource( config_schema={ diff --git a/nmdc_runtime/site/util.py b/nmdc_runtime/site/util.py index 4280fe65..1f09cb6d 100644 --- a/nmdc_runtime/site/util.py +++ b/nmdc_runtime/site/util.py @@ -1,8 +1,9 @@ import os -from functools import lru_cache -from subprocess import Popen, PIPE, STDOUT, CalledProcessError +from dagster import op +from functools import lru_cache from pymongo.database import Database as MongoDatabase +from subprocess import Popen, PIPE, STDOUT, CalledProcessError from nmdc_runtime.api.db.mongo import get_collection_names_from_schema from nmdc_runtime.site.resources import mongo_resource @@ -47,3 +48,7 @@ def schema_collection_has_index_on_id(mdb: MongoDatabase) -> dict: def get_basename(filename: str) -> str: return os.path.basename(filename) + + +def nmdc_study_id_to_filename(nmdc_study_id: str) -> str: + return nmdc_study_id.replace(":", "_").replace("-", "_") diff --git a/nmdc_runtime/site/workspace.yaml b/nmdc_runtime/site/workspace.yaml index 5da09ab9..531ad21e 100644 --- a/nmdc_runtime/site/workspace.yaml +++ b/nmdc_runtime/site/workspace.yaml @@ -13,7 +13,7 @@ load_from: attribute: biosample_submission_ingest - python_package: package_name: nmdc_runtime.site.repository - attribute: biosample_export + attribute: database_record_repair # - python_package: # package_name: nmdc_runtime.site.repository # attribute: validation diff --git a/tests/test_data/test_database_updater.py b/tests/test_data/test_database_updater.py new file mode 100644 index 00000000..a96f3efa --- /dev/null +++ b/tests/test_data/test_database_updater.py @@ -0,0 +1,114 @@ +import pytest + +import pandas as pd + +from unittest.mock import MagicMock, patch + +from nmdc_runtime.site.repair.database_updater import DatabaseUpdater + + +@pytest.fixture +def test_setup(test_minter): + mock_runtime_api_user_client = MagicMock() + mock_runtime_api_site_client = MagicMock() + mock_gold_api_client = MagicMock() + + study_id = "nmdc:sty-11-e4yb9z58" + mock_gold_nmdc_instrument_map_df = pd.DataFrame( + { + "GOLD SeqMethod": [ + "Illumina HiSeq", + "Illumina HiSeq 2500-1TB", + ], + "NMDC instrument_set id": [ + "nmdc:inst-14-79zxap02", + "nmdc:inst-14-nn4b6k72", + ], + } + ) + + mint_id_mock = MagicMock() + mint_id_mock.json.return_value = test_minter("nmdc:NucleotideSequencing", 1) + mock_runtime_api_site_client.mint_id.return_value = mint_id_mock + + database_updater = DatabaseUpdater( + runtime_api_user_client=mock_runtime_api_user_client, + runtime_api_site_client=mock_runtime_api_site_client, + gold_api_client=mock_gold_api_client, + study_id=study_id, + gold_nmdc_instrument_map_df=mock_gold_nmdc_instrument_map_df, + ) + + return { + "runtime_api_user_client": mock_runtime_api_user_client, + "runtime_api_site_client": mock_runtime_api_site_client, + "gold_api_client": mock_gold_api_client, + "database_updater": database_updater, + "study_id": study_id, + } + + +@patch("nmdc_runtime.site.repair.database_updater.GoldStudyTranslator") +def test_create_missing_dg_records(MockGoldStudyTranslator, test_setup): + mock_runtime_api_user_client = test_setup["runtime_api_user_client"] + mock_runtime_api_site_client = test_setup["runtime_api_site_client"] + mock_gold_api_client = test_setup["gold_api_client"] + database_updater = test_setup["database_updater"] + + mock_runtime_api_user_client.get_biosamples_for_study.return_value = [ + { + "id": "nmdc:bsm-11-q59jb831", + "gold_biosample_identifiers": ["gold:Gb0150488"], + } + ] + + mock_gold_api_client.fetch_biosample_by_biosample_id.return_value = [ + { + "biosampleGoldId": "Gb0150488", + "biosampleName": "Switchgrass phyllosphere microbial communities", + "projects": [ + { + "projectGoldId": "Gp0208640", + "biosampleGoldId": "Gb0150488", + "sequencingStrategy": "Metagenome", + } + ], + } + ] + + mock_gold_api_client.fetch_projects_by_biosample.return_value = [ + { + "projectGoldId": "Gp0208640", + "biosampleGoldId": "Gb0150488", + "sequencingStrategy": "Metagenome", + } + ] + + MockGoldStudyTranslator.return_value.biosamples = [ + {"biosampleGoldId": "Gb0150488", "projects": [{"projectGoldId": "Gp0208640"}]} + ] + MockGoldStudyTranslator.return_value.projects = [{"projectGoldId": "Gp0208640"}] + + MockGoldStudyTranslator.return_value._translate_nucleotide_sequencing.return_value = MagicMock( + id="nmdc:dgns-00-12345678", + biosample_id="nmdc:bsm-11-q59jb831", + ) + + database = database_updater.create_missing_dg_records() + + assert len(database.data_generation_set) > 0 + assert database.data_generation_set[0].id.startswith("nmdc:dgns-00-") + assert database.data_generation_set[0].biosample_id == "nmdc:bsm-11-q59jb831" + + mock_runtime_api_user_client.get_biosamples_for_study.assert_called_once_with( + test_setup["study_id"] + ) + mock_gold_api_client.fetch_biosample_by_biosample_id.assert_called_once_with( + "gold:Gb0150488" + ) + mock_gold_api_client.fetch_projects_by_biosample.assert_called_once_with( + "gold:Gb0150488" + ) + mock_runtime_api_site_client.mint_id.assert_called_once_with( + "nmdc:NucleotideSequencing", 1 + )