
Commit

Merge pull request #608 from microbiomedata/issue-401-reimplement-data-objects-study-id-endpoint

reimplement `/data_objects/study/{study_id}` using alldocs
sujaypatil96 authored Aug 21, 2024
2 parents 3820750 + b90710b commit c996255
Showing 4 changed files with 204 additions and 38 deletions.
36 changes: 36 additions & 0 deletions nmdc_runtime/api/db/mongo.py
@@ -1,9 +1,12 @@
import gzip
import json
import os
from contextlib import AbstractContextManager
from functools import lru_cache
from typing import Set, Dict, Any, Iterable
from uuid import uuid4

import bson
from linkml_runtime import SchemaView
from nmdc_schema.get_nmdc_view import ViewGetter
from nmdc_schema.nmdc_data import get_nmdc_schema_definition
@@ -107,3 +110,36 @@ def mongodump_excluded_collections():
)
)
return excluded_collections


def mongorestore_collection(mdb, collection_name, bson_file_path):
"""
Replaces the specified collection with one that reflects the contents of the
specified BSON file.
"""
with gzip.open(bson_file_path, "rb") as bson_file:
data = bson.decode_all(bson_file.read())
if data:
mdb.drop_collection(collection_name)
mdb[collection_name].insert_many(data)
print(
f"mongorestore_collection: {len(data)} documents into {collection_name} after drop"
)


def mongorestore_from_dir(mdb, dump_directory, skip_collections=None):
"""
Effectively runs a `mongorestore` command in pure Python.
Helpful in a container context that does not have the `mongorestore` command available.
"""
skip_collections = skip_collections or []
for root, dirs, files in os.walk(dump_directory):
for file in files:
if file.endswith(".bson.gz"):
collection_name = file.replace(".bson.gz", "")
if collection_name in skip_collections:
continue
bson_file_path = os.path.join(root, file)
mongorestore_collection(mdb, collection_name, bson_file_path)

print("mongorestore_from_dir completed successfully.")
104 changes: 69 additions & 35 deletions nmdc_runtime/api/endpoints/find.py
@@ -3,7 +3,8 @@

from fastapi import APIRouter, Depends, Form
from jinja2 import Environment, PackageLoader, select_autoescape
from nmdc_runtime.util import get_nmdc_jsonschema_dict, nmdc_activity_collection_names
from nmdc_runtime.minter.config import typecodes
from nmdc_runtime.util import get_nmdc_jsonschema_dict
from pymongo.database import Database as MongoDatabase
from starlette.responses import HTMLResponse
from toolz import merge, assoc_in
@@ -110,6 +111,21 @@ def find_data_objects(
return find_resources(req, mdb, "data_object_set")


def get_classname_from_typecode(doc_id: str) -> str:
r"""
Returns the name of the schema class of which an instance could have the specified `id`.
>>> get_classname_from_typecode("nmdc:sty-11-r2h77870")
'Study'
"""
typecode = doc_id.split(":")[1].split("-")[0]
class_map_data = typecodes()
class_map = {
entry["name"]: entry["schema_class"].split(":")[1] for entry in class_map_data
}
return class_map.get(typecode)
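
For reference, the class map this helper builds from the minter's typecode configuration has roughly the following shape; only the "sty" entry is confirmed by the doctest above, and the "dobj" entry is an assumption for illustration:

# Illustrative sketch; at runtime the data comes from typecodes().
# Entries other than "sty" are assumed, not taken from this PR.
class_map_data = [
    {"name": "sty", "schema_class": "nmdc:Study"},
    {"name": "dobj", "schema_class": "nmdc:DataObject"},  # assumed typecode
]
class_map = {
    entry["name"]: entry["schema_class"].split(":")[1] for entry in class_map_data
}
assert class_map["sty"] == "Study"
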


@router.get(
"/data_objects/study/{study_id}",
response_model_exclude_unset=True,
@@ -118,43 +134,61 @@ def find_data_objects_for_study(
study_id: str,
mdb: MongoDatabase = Depends(get_mongo_db),
):
rv = {"biosample_set": {}, "data_object_set": []}
data_object_ids = set()
"""This API endpoint is used to retrieve data object ids associated with
all the biosamples that are part of a given study. This endpoint makes
use of the `alldocs` collection for its implementation.
:param study_id: NMDC study id for which data objects are to be retrieved
:param mdb: PyMongo connection, defaults to Depends(get_mongo_db)
:return: List of dictionaries where each dictionary contains biosample id as key,
and another dictionary with key 'data_object_set' containing list of data object ids as value
"""
biosample_data_objects = []
study = raise404_if_none(
mdb.study_set.find_one({"id": study_id}, ["id"]), detail="Study not found"
)
for biosample in mdb.biosample_set.find({"part_of": study["id"]}, ["id"]):
rv["biosample_set"][biosample["id"]] = {"omics_processing_set": {}}
for opa in mdb.omics_processing_set.find(
{"has_input": biosample["id"]}, ["id", "has_output"]
):
rv["biosample_set"][biosample["id"]]["omics_processing_set"][opa["id"]] = {
"has_output": {}
}
for do_id in opa.get("has_output", []):
data_object_ids.add(do_id)
rv["biosample_set"][biosample["id"]]["omics_processing_set"][opa["id"]][
"has_output"
][do_id] = {}
for coll_name in nmdc_activity_collection_names():
acts = list(
mdb[coll_name].find({"has_input": do_id}, ["id", "has_output"])
)
if acts:
data_object_ids |= {
do for act in acts for do in act.get("has_output", [])
}
rv["biosample_set"][biosample["id"]]["omics_processing_set"][
opa["id"]
]["has_output"][do_id][coll_name] = {
act["id"]: act.get("has_output", []) for act in acts
}

rv["data_object_set"] = [
strip_oid(d)
for d in mdb.data_object_set.find({"id": {"$in": list(data_object_ids)}})
]
return rv

biosamples = mdb.biosample_set.find({"part_of": study["id"]}, ["id"])
biosample_ids = [biosample["id"] for biosample in biosamples]

for biosample_id in biosample_ids:
current_ids = [biosample_id]
collected_data_objects = []

while current_ids:
new_current_ids = []
for current_id in current_ids:
query = {"has_input": current_id}
document = mdb.alldocs.find_one(query)

if not document:
continue

has_output = document.get("has_output")
if not has_output:
continue

for output_id in has_output:
if get_classname_from_typecode(output_id) == "DataObject":
data_object_doc = mdb.data_object_set.find_one(
{"id": output_id}
)
if data_object_doc:
collected_data_objects.append(strip_oid(data_object_doc))
else:
new_current_ids.append(output_id)

current_ids = new_current_ids

if collected_data_objects:
biosample_data_objects.append(
{
"biosample_id": biosample_id,
"data_object_set": collected_data_objects,
}
)

return biosample_data_objects
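
To illustrate the endpoint's response shape, a minimal client-side sketch; the base URL is a placeholder, and the study id is the one exercised by the test below:

# Hypothetical client call against a running Runtime API instance;
# the base URL is an assumption, not part of this PR.
import requests

base_url = "http://localhost:8000"  # assumed local API server
study_id = "nmdc:sty-11-hdd4bf83"

response = requests.get(f"{base_url}/data_objects/study/{study_id}")
response.raise_for_status()

# Each entry pairs a biosample id with the data objects reachable from it.
for entry in response.json():
    print(entry["biosample_id"], len(entry["data_object_set"]))
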


@router.get(
74 changes: 71 additions & 3 deletions tests/test_api/test_endpoints.py
@@ -1,25 +1,83 @@
import json
import os
import re
import subprocess
import sys

import bson
import pytest
import requests
from dagster import build_op_context
from starlette import status
from tenacity import wait_random_exponential, retry
from toolz import get_in

from nmdc_runtime.api.core.auth import get_password_hash
from nmdc_runtime.api.core.metadata import df_from_sheet_in, _validate_changesheet
from nmdc_runtime.api.core.util import generate_secret, dotted_path_for
from nmdc_runtime.api.db.mongo import get_mongo_db
from nmdc_runtime.api.db.mongo import get_mongo_db, mongorestore_from_dir
from nmdc_runtime.api.endpoints.util import persist_content_and_get_drs_object
from nmdc_runtime.api.models.job import Job, JobOperationMetadata
from nmdc_runtime.api.models.metadata import ChangesheetIn
from nmdc_runtime.api.models.site import SiteInDB, SiteClientInDB
from nmdc_runtime.api.models.user import UserInDB, UserIn, User
from nmdc_runtime.site.ops import materialize_alldocs
from nmdc_runtime.site.repository import run_config_frozen__normal_env
from nmdc_runtime.site.resources import get_mongo, RuntimeApiSiteClient
from nmdc_runtime.util import REPO_ROOT_DIR
from nmdc_runtime.site.resources import get_mongo, RuntimeApiSiteClient, mongo_resource
from nmdc_runtime.util import REPO_ROOT_DIR, ensure_unique_id_indexes
from tests.test_util import download_and_extract_tar
from tests.test_ops.test_ops import op_context as test_op_context

TEST_MONGODUMPS_DIR = REPO_ROOT_DIR.joinpath("tests", "nmdcdb")
SCHEMA_COLLECTIONS_MONGODUMP_ARCHIVE_BASENAME = (
"nmdc-prod-schema-collections__2024-07-29_20-12-07"
)
SCHEMA_COLLECTIONS_MONGODUMP_ARCHIVE_URL = (
"https://portal.nersc.gov/cfs/m3408/meta/mongodumps/"
f"{SCHEMA_COLLECTIONS_MONGODUMP_ARCHIVE_BASENAME}.tar"
) # 84MB. Should be < 100MB.


def ensure_local_mongodump_exists():
dump_dir = TEST_MONGODUMPS_DIR.joinpath(
SCHEMA_COLLECTIONS_MONGODUMP_ARCHIVE_BASENAME
)
if not os.path.exists(dump_dir):
download_and_extract_tar(
url=SCHEMA_COLLECTIONS_MONGODUMP_ARCHIVE_URL, extract_to=TEST_MONGODUMPS_DIR
)
else:
print(f"local mongodump already exists at {TEST_MONGODUMPS_DIR}")
return dump_dir


def ensure_schema_collections_and_alldocs():
# Return if `alldocs` collection has already been materialized.
mdb = get_mongo_db()
if mdb.alldocs.estimated_document_count() > 0:
print(
"ensure_schema_collections_and_alldocs: `alldocs` collection already materialized"
)
return

dump_dir = ensure_local_mongodump_exists()
mongorestore_from_dir(mdb, dump_dir, skip_collections=["functional_annotation_agg"])
ensure_unique_id_indexes(mdb)
print("materializing alldocs...")
materialize_alldocs(
build_op_context(
resources={
"mongo": mongo_resource.configured(
{
"dbname": os.getenv("MONGO_DBNAME"),
"host": os.getenv("MONGO_HOST"),
"password": os.getenv("MONGO_PASSWORD"),
"username": os.getenv("MONGO_USERNAME"),
}
)
}
)
)


def ensure_test_resources(mdb):
@@ -60,6 +118,7 @@ def ensure_test_resources(mdb):
{"id": job_id}, job.model_dump(exclude_unset=True), upsert=True
)
mdb["minter.requesters"].replace_one({"id": site_id}, {"id": site_id}, upsert=True)
ensure_schema_collections_and_alldocs()
return {
"site_client": {
"site_id": site_id,
@@ -313,3 +372,12 @@ def test_get_class_name_and_collection_names_by_doc_id():
"GET", f"{base_url}/nmdcschema/ids/{id_}/collection-name"
)
assert response.status_code == 404


def test_find_data_objects_for_study(api_site_client):
ensure_schema_collections_and_alldocs()
rv = api_site_client.request(
"GET",
"/data_objects/study/nmdc:sty-11-hdd4bf83",
)
assert len(rv.json()) >= 60
28 changes: 28 additions & 0 deletions tests/test_util.py
@@ -1,3 +1,5 @@
import os
import tarfile
from copy import deepcopy

import json
@@ -13,6 +15,7 @@
from nmdc_runtime.util import get_nmdc_jsonschema_dict
from pymongo.database import Database as MongoDatabase
from pymongo.write_concern import WriteConcern
import requests

from nmdc_runtime.site.repository import run_config_frozen__normal_env
from nmdc_runtime.site.resources import get_mongo
@@ -133,6 +136,31 @@ def test_multiple_errors():
print(validation_errors)


def download_and_extract_tar(url, extract_to="."):
# Download the file
response = requests.get(url, stream=True)
if response.status_code == 200:
tar_path = os.path.join(extract_to, "downloaded_file.tar")
os.makedirs(extract_to, exist_ok=True)
with open(tar_path, "wb") as file:
chunk_size = 8192
print(f"Downloading tar file using stream {chunk_size=}")
for chunk in response.iter_content(chunk_size=chunk_size):
file.write(chunk)
print(f"Downloaded tar file to {tar_path}")

# Extract the tar file
with tarfile.open(tar_path, "r") as tar:
tar.extractall(path=extract_to)
print(f"Extracted tar file to {extract_to}")

# Optionally, remove the tar file after extraction
os.remove(tar_path)
print(f"Removed tar file {tar_path}")
else:
print(f"Failed to download file. Status code: {response.status_code}")


if __name__ == "__main__":
if len(sys.argv) > 1:
eval(f"{sys.argv[1]}()")
