Skip to content

Commit

Permalink
Merge pull request #907 from microbiomedata/1063-bug-update-ncbi-xml-…
Browse files Browse the repository at this point in the history
…export-pipeline-to-use-respective-collections-for-data-fetching-rather-than-alldocs

Replace the use of `alldocs` with respective collections for record retrieval in NCBI exporter
  • Loading branch information
sujaypatil96 authored Feb 20, 2025
2 parents 1ec03eb + b4feee9 commit c7f1331
Show file tree
Hide file tree
Showing 3 changed files with 110 additions and 35 deletions.
25 changes: 23 additions & 2 deletions nmdc_runtime/site/export/ncbi_xml.py
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import xml.etree.ElementTree as ET
import xml.dom.minidom

from typing import Any
from typing import Any, List, Union
from urllib.parse import urlparse
from nmdc_runtime.site.export.ncbi_xml_utils import (
get_instruments,
Expand Down Expand Up @@ -366,7 +366,14 @@ def set_fastq(
)
# Currently, we are making the assumption that only one instrument
# is used to sequence a Biosample
instrument_id = ntseq.get("instrument_used", "")[0]
instrument_used: List[str] = ntseq.get(
"instrument_used", []
)
if not instrument_used:
instrument_id = None
else:
instrument_id = instrument_used[0]

instrument = all_instruments.get(instrument_id, {})
instrument_vendor = instrument.get("vendor", "")
instrument_model = instrument.get("model", "")
Expand Down Expand Up @@ -448,6 +455,20 @@ def set_fastq(
"Attribute", "NextSeq 550", {"name": "instrument_model"}
)
)
elif instrument_model == "novaseq_6000":
sra_attributes.append(
self.set_element(
"Attribute",
"NovaSeq 6000",
{"name": "instrument_model"},
)
)
elif instrument_model == "hiseq":
sra_attributes.append(
self.set_element(
"Attribute", "HiSeq", {"name": "instrument_model"}
)
)

if analyte_category == "metagenome":
sra_attributes.append(
Expand Down
111 changes: 81 additions & 30 deletions nmdc_runtime/site/export/ncbi_xml_utils.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
from io import BytesIO, StringIO
from typing import Any, Dict, List, Union

from nmdc_runtime.api.endpoints.util import strip_oid
from nmdc_runtime.minter.config import typecodes
from lxml import etree
from pymongo.collection import Collection

import csv
import requests
Expand Down Expand Up @@ -45,35 +49,53 @@ def get_instruments(instrument_set_collection):
raise RuntimeError(f"An error occurred while fetching instrument data: {e}")


def fetch_data_objects_from_biosamples(all_docs_collection, biosamples_list):
def fetch_data_objects_from_biosamples(
all_docs_collection: Collection,
data_object_set: Collection,
biosamples_list: List[Dict[str, Any]],
) -> List[Dict[str, Dict[str, Any]]]:
"""This method fetches the data objects that are "associated" (derived from/products of)
with their respective biosamples by iterating over the alldocs collection recursively.
The method returns a list of dictionaries, each with a biosample id as the key and the
associated list of data objects as the value.
:param all_docs_collection: reference to the alldocs collection
:param data_object_set: reference to the data_object_set collection
:param biosamples_list: list of biosamples as JSON documents
:return: list of dictionaries with biosample ids as keys and associated data objects as values
"""
biosample_data_objects = []

def collect_data_objects(doc_ids, collected_objects, unique_ids):
for doc_id in doc_ids:
if (
get_classname_from_typecode(doc_id) == "DataObject"
and doc_id not in unique_ids
):
data_obj = data_object_set.find_one({"id": doc_id})
if data_obj:
collected_objects.append(strip_oid(data_obj))
unique_ids.add(doc_id)

biosample_data_objects = []

for biosample in biosamples_list:
current_ids = [biosample["id"]]
collected_data_objects = []
unique_ids = set()

while current_ids:
new_current_ids = []
for current_id in current_ids:
query = {"has_input": current_id}
document = all_docs_collection.find_one(query)
for doc in all_docs_collection.find({"has_input": current_id}):
has_output = doc.get("has_output", [])

if not document:
continue

has_output = document.get("has_output")
if not has_output:
continue

for output_id in has_output:
if get_classname_from_typecode(output_id) == "DataObject":
data_object_doc = all_docs_collection.find_one(
{"id": output_id}
)
if data_object_doc:
collected_data_objects.append(data_object_doc)
else:
new_current_ids.append(output_id)
collect_data_objects(has_output, collected_data_objects, unique_ids)
new_current_ids.extend(
op
for op in has_output
if get_classname_from_typecode(op) != "DataObject"
)

current_ids = new_current_ids

Expand All @@ -83,12 +105,25 @@ def fetch_data_objects_from_biosamples(all_docs_collection, biosamples_list):
return biosample_data_objects


def fetch_nucleotide_sequencing_from_biosamples(all_docs_collection, biosamples_list):
biosample_data_objects = []
def fetch_nucleotide_sequencing_from_biosamples(
all_docs_collection: Collection,
data_generation_set: Collection,
biosamples_list: List[Dict[str, Any]],
) -> List[Dict[str, Dict[str, Any]]]:
"""This method fetches the nucleotide sequencing process records that create data objects
for biosamples by iterating over the alldocs collection recursively.
:param all_docs_collection: reference to the alldocs collection
:param data_generation_set: reference to the data_generation_set collection
:param biosamples_list: list of biosamples as JSON documents
:return: list of dictionaries with biosample ids as keys and associated nucleotide sequencing
process objects as values
"""
biosample_ntseq_objects = []

for biosample in biosamples_list:
current_ids = [biosample["id"]]
collected_data_objects = []
collected_ntseq_objects = []

while current_ids:
new_current_ids = []
Expand All @@ -105,23 +140,39 @@ def fetch_nucleotide_sequencing_from_biosamples(all_docs_collection, biosamples_

for output_id in has_output:
if get_classname_from_typecode(output_id) == "DataObject":
nucleotide_sequencing_doc = all_docs_collection.find_one(
nucleotide_sequencing_doc = data_generation_set.find_one(
{"id": document["id"]}
)
if nucleotide_sequencing_doc:
collected_data_objects.append(nucleotide_sequencing_doc)
collected_ntseq_objects.append(
strip_oid(nucleotide_sequencing_doc)
)
else:
new_current_ids.append(output_id)

current_ids = new_current_ids

if collected_data_objects:
biosample_data_objects.append({biosample["id"]: collected_data_objects})
if collected_ntseq_objects:
biosample_ntseq_objects.append({biosample["id"]: collected_ntseq_objects})

return biosample_ntseq_objects

return biosample_data_objects

def fetch_library_preparation_from_biosamples(
all_docs_collection: Collection,
material_processing_set: Collection,
biosamples_list: List[Dict[str, Any]],
) -> List[Dict[str, Dict[str, Any]]]:
"""This method fetches the library preparation process records that create processed samples,
which are then fed as input (via the `has_input` slot) into a nucleotide sequencing process
for biosamples by iterating over the alldocs collection recursively.
def fetch_library_preparation_from_biosamples(all_docs_collection, biosamples_list):
:param all_docs_collection: reference to the alldocs collection
:param material_processing_set: reference to the material_processing_set collection
:param biosamples_list: list of biosamples as JSON documents
:return: list of dictionaries with biosample ids as keys and associated library preparation process
objects as values
"""
biosample_lib_prep = []

for biosample in biosamples_list:
Expand All @@ -144,10 +195,10 @@ def fetch_library_preparation_from_biosamples(all_docs_collection, biosamples_li
"has_input": output_id,
"type": {"$in": ["LibraryPreparation"]},
}
lib_prep_doc = all_docs_collection.find_one(lib_prep_query)
lib_prep_doc = material_processing_set.find_one(lib_prep_query)

if lib_prep_doc:
biosample_lib_prep.append({biosample_id: lib_prep_doc})
biosample_lib_prep.append({biosample_id: strip_oid(lib_prep_doc)})
break # Stop at the first document that meets the criteria

return biosample_lib_prep
Expand Down
9 changes: 6 additions & 3 deletions nmdc_runtime/site/ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -1188,8 +1188,9 @@ def get_ncbi_export_pipeline_inputs(context: OpExecutionContext) -> str:
def get_data_objects_from_biosamples(context: OpExecutionContext, biosamples: list):
mdb = context.resources.mongo.db
alldocs_collection = mdb["alldocs"]
data_object_set = mdb["data_object_set"]
biosample_data_objects = fetch_data_objects_from_biosamples(
alldocs_collection, biosamples
alldocs_collection, data_object_set, biosamples
)
return biosample_data_objects

Expand All @@ -1200,8 +1201,9 @@ def get_nucleotide_sequencing_from_biosamples(
):
mdb = context.resources.mongo.db
alldocs_collection = mdb["alldocs"]
data_generation_set = mdb["data_generation_set"]
biosample_omics_processing = fetch_nucleotide_sequencing_from_biosamples(
alldocs_collection, biosamples
alldocs_collection, data_generation_set, biosamples
)
return biosample_omics_processing

Expand All @@ -1212,8 +1214,9 @@ def get_library_preparation_from_biosamples(
):
mdb = context.resources.mongo.db
alldocs_collection = mdb["alldocs"]
material_processing_set = mdb["material_processing_set"]
biosample_lib_prep = fetch_library_preparation_from_biosamples(
alldocs_collection, biosamples
alldocs_collection, material_processing_set, biosamples
)
return biosample_lib_prep

Expand Down

0 comments on commit c7f1331

Please sign in to comment.