Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Dagster job to populate missing biosample records for a study from GOLD #859

Merged
merged 7 commits into from
Jan 23, 2025
13 changes: 13 additions & 0 deletions nmdc_runtime/site/graphs.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@

from nmdc_runtime.site.ops import (
build_merged_db,
missing_gold_biosample_repair,
nmdc_schema_database_export_filename,
nmdc_schema_database_from_gold_study,
nmdc_schema_object_to_dict,
Expand Down Expand Up @@ -483,3 +484,15 @@ def fill_missing_data_generation_data_object_records():
filename = nmdc_study_id_filename(study_id)
outputs = export_json_to_drs(database_dict, filename)
add_output_run_event(outputs)


@graph
def fill_missing_biosample_records_from_gold():
    """Wire together the ops that repair missing biosample records for a study.

    The study id and the URL of the GOLD-to-NMDC instrument mapping file come from
    `get_database_updater_inputs`. The mapping file is loaded with `get_df_from_url`,
    `missing_gold_biosample_repair` produces the database, and the database is
    converted to a dict and handed to `export_json_to_drs` under the filename given
    by `nmdc_study_id_filename`. The export outputs are passed to
    `add_output_run_event`.
    """
    (nmdc_study_id, instrument_mapping_url) = get_database_updater_inputs()
    instrument_map_df = get_df_from_url(instrument_mapping_url)

    repaired_database = missing_gold_biosample_repair(nmdc_study_id, instrument_map_df)

    export_filename = nmdc_study_id_filename(nmdc_study_id)
    drs_outputs = export_json_to_drs(
        nmdc_schema_object_to_dict(repaired_database),
        export_filename,
    )
    add_output_run_event(drs_outputs)
32 changes: 32 additions & 0 deletions nmdc_runtime/site/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -1301,3 +1301,35 @@ def missing_data_generation_repair(
database = database_updater.create_missing_dg_records()

return database


@op(
    required_resource_keys={
        "runtime_api_user_client",
        "runtime_api_site_client",
        "gold_api_client",
    }
)
def missing_gold_biosample_repair(
    context: OpExecutionContext,
    nmdc_study_id: str,
    gold_nmdc_instrument_map_df: pd.DataFrame,
) -> nmdc.Database:
    """Build a database of biosample records that a study is missing relative to GOLD.

    Takes the three API clients from the op's resources, hands them to a
    `DatabaseUpdater` together with the study id and the instrument mapping
    dataframe, and returns whatever
    `DatabaseUpdater.generate_biosample_set_from_gold_api_for_study` produces.

    :param context: Dagster execution context supplying the required resources.
    :param nmdc_study_id: Id of the NMDC study to repair.
    :param gold_nmdc_instrument_map_df: Instrument mapping dataframe forwarded to
        the `DatabaseUpdater`.
    :return: The `nmdc.Database` returned by the updater.
    """
    resources = context.resources
    user_client: RuntimeApiUserClient = resources.runtime_api_user_client
    site_client: RuntimeApiSiteClient = resources.runtime_api_site_client
    gold_client: GoldApiClient = resources.gold_api_client

    updater = DatabaseUpdater(
        user_client,
        site_client,
        gold_client,
        nmdc_study_id,
        gold_nmdc_instrument_map_df,
    )
    return updater.generate_biosample_set_from_gold_api_for_study()
80 changes: 79 additions & 1 deletion nmdc_runtime/site/repair/database_updater.py
Original file line number Diff line number Diff line change
Expand Up @@ -58,7 +58,7 @@ def _fetch_gold_projects(self, gold_biosample_id: str):
"""
return self.gold_api_client.fetch_projects_by_biosample(gold_biosample_id)

def create_missing_dg_records(self):
def create_missing_dg_records(self) -> nmdc.Database:
"""This method creates missing data generation records for a given study in the NMDC database using
metadata from GOLD. The way the logic works is, it first fetches all the biosamples associated
with the study from the NMDC database. Then, it fetches all the biosample and project data
Expand Down Expand Up @@ -148,3 +148,81 @@ class which is responsible for making data_generation_set records.
)

return database

def generate_biosample_set_from_gold_api_for_study(self) -> nmdc.Database:
    """Create biosample_set records for GOLD biosamples that an NMDC study is missing.

    The method looks up the NMDC study to find its GOLD study id, collects the GOLD
    biosample ids already recorded on the study's NMDC biosamples, and fetches all
    biosamples of the GOLD study through the GOLD API. GOLD biosamples whose
    `biosampleGoldId` is already known to NMDC are dropped (conceptually a set
    difference). The remainder is handed to `GoldStudyTranslator`, which has
    pre-processing logic to filter out biosamples based on `sequencingStrategy` and
    `projectStatus`. For every biosample that survives, a new NMDC biosample id is
    minted and a biosample_set record is created.

    :return: An instance of `nmdc:Database` object which is JSON-ified and rendered on the frontend.
        Its `biosample_set` is empty when no biosamples are missing.
    :raises ValueError: If no NMDC study matches `self.study_id`, or if the study
        has no `gold_study_identifiers` to look up in GOLD.
    """
    # Resolve the GOLD study id first so that a bad study id fails fast, with a
    # message that names the study, instead of a bare IndexError later on.
    matching_studies = self.runtime_api_user_client.get_study(self.study_id)
    if not matching_studies:
        raise ValueError(f"No NMDC study found with id '{self.study_id}'.")

    gold_study_identifiers = matching_studies[0].get("gold_study_identifiers") or []
    if not gold_study_identifiers:
        raise ValueError(
            f"NMDC study '{self.study_id}' has no gold_study_identifiers; "
            "cannot look up its biosamples in GOLD."
        )
    gold_study_id = gold_study_identifiers[0].replace("gold:", "")

    # get a list of all biosamples associated with a given NMDC study id
    biosample_set = self.runtime_api_user_client.get_biosamples_for_study(
        self.study_id
    )

    # collect the GOLD biosample ids (without the `gold:` prefix) that are
    # already attached to NMDC biosamples of this study
    nmdc_gold_ids = set()
    for biosample in biosample_set:
        for gold_id in biosample.get("gold_biosample_identifiers", []):
            nmdc_gold_ids.add(gold_id.replace("gold:", ""))

    # use the GOLD study id to fetch all biosample records associated with the study
    gold_biosamples_for_study = self.gold_api_client.fetch_biosamples_by_study(
        gold_study_id
    )

    # (conceptually) a set difference: keep only the GOLD biosamples that do not
    # have a record in the NMDC database yet
    missing_gold_biosamples = [
        gbs
        for gbs in gold_biosamples_for_study
        if gbs.get("biosampleGoldId") not in nmdc_gold_ids
    ]

    gold_study_translator = GoldStudyTranslator(
        biosamples=missing_gold_biosamples,
        gold_nmdc_instrument_map_df=self.gold_nmdc_instrument_map_df,
    )
    translated_biosamples = gold_study_translator.biosamples

    database = nmdc.Database()
    if not translated_biosamples:
        # Nothing is missing: skip the minting request rather than asking the
        # API for zero ids.
        database.biosample_set = []
        return database

    # mint new NMDC biosample IDs for the "missing" biosamples
    gold_biosample_ids = [
        biosample["biosampleGoldId"] for biosample in translated_biosamples
    ]
    nmdc_biosample_ids = self.runtime_api_site_client.mint_id(
        "nmdc:Biosample", len(translated_biosamples)
    ).json()
    gold_to_nmdc_biosample_ids = dict(zip(gold_biosample_ids, nmdc_biosample_ids))

    database.biosample_set = [
        gold_study_translator._translate_biosample(
            biosample,
            nmdc_biosample_id=gold_to_nmdc_biosample_ids[
                biosample["biosampleGoldId"]
            ],
            nmdc_study_id=self.study_id,
            nmdc_field_site_id=None,
        )
        for biosample in translated_biosamples
    ]

    return database
42 changes: 42 additions & 0 deletions nmdc_runtime/site/repository.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
from nmdc_runtime.api.models.trigger import Trigger
from nmdc_runtime.site.export.study_metadata import export_study_biosamples_metadata
from nmdc_runtime.site.graphs import (
fill_missing_biosample_records_from_gold,
translate_metadata_submission_to_nmdc_schema_database,
ingest_metadata_submission,
gold_study_to_database,
Expand Down Expand Up @@ -968,6 +969,47 @@ def database_record_repair():
},
},
),
# Job built from the `fill_missing_biosample_records_from_gold` graph. The resource
# config merges `normal_resources` with settings for the two runtime API clients and
# the GOLD API client, all of which read their values from environment variables.
fill_missing_biosample_records_from_gold.to_job(
resource_defs=resource_defs,
config={
"resources": merge(
unfreeze(normal_resources),
{
"runtime_api_user_client": {
"config": {
"base_url": {"env": "API_HOST"},
"username": {"env": "API_ADMIN_USER"},
"password": {"env": "API_ADMIN_PASS"},
},
},
"runtime_api_site_client": {
"config": {
"base_url": {"env": "API_HOST"},
"client_id": {"env": "API_SITE_CLIENT_ID"},
"client_secret": {"env": "API_SITE_CLIENT_SECRET"},
"site_id": {"env": "API_SITE_ID"},
},
},
"gold_api_client": {
"config": {
"base_url": {"env": "GOLD_API_BASE_URL"},
"username": {"env": "GOLD_API_USERNAME"},
"password": {"env": "GOLD_API_PASSWORD"},
},
},
},
),
"ops": {
"get_database_updater_inputs": {
"config": {
# NOTE(review): empty placeholder; presumably the study id is filled in
# when the job is launched. Confirm.
"nmdc_study_id": "",
"gold_nmdc_instrument_mapping_file_url": "https://raw.githubusercontent.com/microbiomedata/nmdc-schema/refs/heads/main/assets/misc/gold_seqMethod_to_nmdc_instrument_set.tsv",
}
},
# NOTE(review): empty placeholder; presumably supplied at launch time. Confirm.
"export_json_to_drs": {"config": {"username": ""}},
},
},
),
]


Expand Down
12 changes: 12 additions & 0 deletions nmdc_runtime/site/resources.py
Original file line number Diff line number Diff line change
Expand Up @@ -159,6 +159,18 @@ def get_omics_processing_by_name(self, name: str):
response.raise_for_status()
return response.json()["cursor"]["firstBatch"]

def get_study(self, study_id: str):
    """Query `study_set` for records whose `id` equals `study_id`.

    Posts a `find` query to the `/queries:run` endpoint and calls
    `raise_for_status()` on the response before reading it.

    :param study_id: Value matched against the `id` field of `study_set` records.
    :return: The `firstBatch` entry of the `cursor` object in the JSON response.
    """
    find_query = {
        "find": "study_set",
        "filter": {"id": study_id},
    }
    response = self.request("POST", "/queries:run", find_query)
    response.raise_for_status()
    payload = response.json()
    return payload["cursor"]["firstBatch"]


class RuntimeApiSiteClient(RuntimeApiClient):
def __init__(
Expand Down
Loading
Loading