Skip to content

Commit

Permalink
feat: test coverage for apply_metadata_in job (#635)
Browse files Browse the repository at this point in the history
closes #631
  • Loading branch information
dwinston authored Aug 13, 2024
1 parent f61f842 commit 3820750
Show file tree
Hide file tree
Showing 3 changed files with 75 additions and 34 deletions.
50 changes: 29 additions & 21 deletions nmdc_runtime/api/endpoints/metadata.py
Original file line number Diff line number Diff line change
Expand Up @@ -250,13 +250,39 @@ async def submit_json_nmdcdb(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=str(rv),
)

extra_run_config_data = _ensure_job__metadata_in(docs, user.username, mdb)

requested = _request_dagster_run(
nmdc_workflow_id="metadata-in-1.0.0",
nmdc_workflow_inputs=[], # handled by _request_dagster_run given extra_run_config_data
extra_run_config_data=extra_run_config_data,
mdb=mdb,
user=user,
)
if requested["type"] == "success":
return requested
else:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=(
f"Runtime failed to start metadata-in-1.0.0 job. "
f'Detail: {requested["detail"]}'
),
)


def _ensure_job__metadata_in(
docs, username, mdb, client_id=API_SITE_CLIENT_ID, drs_object_exists_ok=False
):
drs_obj_doc = persist_content_and_get_drs_object(
content=json.dumps(docs),
username=user.username,
username=username,
filename=None,
content_type="application/json",
description="JSON metadata in",
id_ns="json-metadata-in",
exists_ok=drs_object_exists_ok,
)
job_spec = {
"workflow": {"id": "metadata-in-1.0.0"},
Expand All @@ -276,9 +302,9 @@ async def submit_json_nmdcdb(
detail=f'failed to complete metadata-in-1.0.0/{drs_obj_doc["id"]} job',
)

site = get_site(mdb, client_id=API_SITE_CLIENT_ID)
site = get_site(mdb, client_id=client_id)
operation = _claim_job(job.id, mdb, site)
extra_run_config_data = {
return {
"ops": {
"get_json_in": {
"config": {
Expand All @@ -288,21 +314,3 @@ async def submit_json_nmdcdb(
"perform_mongo_updates": {"config": {"operation_id": operation["id"]}},
}
}

requested = _request_dagster_run(
nmdc_workflow_id="metadata-in-1.0.0",
nmdc_workflow_inputs=[], # handled by _request_dagster_run given extra_run_config_data
extra_run_config_data=extra_run_config_data,
mdb=mdb,
user=user,
)
if requested["type"] == "success":
return requested
else:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
detail=(
f"Runtime failed to start metadata-in-1.0.0 job. "
f'Detail: {requested["detail"]}'
),
)
35 changes: 29 additions & 6 deletions nmdc_runtime/api/endpoints/util.py
Original file line number Diff line number Diff line change
Expand Up @@ -415,6 +415,7 @@ def persist_content_and_get_drs_object(
filename=None,
content_type="application/json",
id_ns="json-metadata-in",
exists_ok=False,
):
mdb = get_mongo_db()
drs_id = local_part(generate_one_id(mdb, ns=id_ns, shoulder="gfs0"))
Expand Down Expand Up @@ -453,12 +454,22 @@ def persist_content_and_get_drs_object(
)
self_uri = f"drs://{HOSTNAME_EXTERNAL}/{drs_id}"
return _create_object(
mdb, object_in, mgr_site="nmdc-runtime", drs_id=drs_id, self_uri=self_uri
mdb,
object_in,
mgr_site="nmdc-runtime",
drs_id=drs_id,
self_uri=self_uri,
exists_ok=exists_ok,
)


def _create_object(
mdb: MongoDatabase, object_in: DrsObjectIn, mgr_site, drs_id, self_uri
mdb: MongoDatabase,
object_in: DrsObjectIn,
mgr_site,
drs_id,
self_uri,
exists_ok=False,
):
drs_obj = DrsObject(
**object_in.model_dump(exclude_unset=True),
Expand All @@ -471,10 +482,22 @@ def _create_object(
mdb.objects.insert_one(doc)
except DuplicateKeyError as e:
if e.details["keyPattern"] == {"checksums.type": 1, "checksums.checksum": 1}:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"provided checksum matches existing object: {e.details['keyValue']}",
)
if exists_ok:
return mdb.objects.find_one(
{
"checksums": {
"$elemMatch": {
"type": e.details["keyValue"]["checksums.type"],
"checksum": e.details["keyValue"]["checksums.checksum"],
}
}
}
)
else:
raise HTTPException(
status_code=status.HTTP_409_CONFLICT,
detail=f"provided checksum matches existing object: {e.details['keyValue']}",
)
else:
raise HTTPException(
status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
Expand Down
24 changes: 17 additions & 7 deletions tests/test_ops/test_ops.py
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,11 @@
import os

import pytest
from dagster import build_op_context
from dagster import build_op_context, RunConfig

from nmdc_runtime.api.endpoints.metadata import _ensure_job__metadata_in
from nmdc_runtime.api.endpoints.util import persist_content_and_get_drs_object
from nmdc_runtime.site.repository import preset_normal
from nmdc_runtime.site.resources import (
mongo_resource,
runtime_api_site_client_resource,
Expand All @@ -13,10 +15,7 @@
RuntimeApiUserClient,
)

from nmdc_runtime.site.ops import (
perform_mongo_updates,
_add_schema_docs_with_or_without_replacement,
)
from nmdc_runtime.site.graphs import apply_metadata_in


@pytest.fixture
Expand Down Expand Up @@ -50,7 +49,7 @@ def op_context():
)


def test_perform_mongo_updates_functional_annotation_agg(op_context):
def test_apply_metadata_in_functional_annotation_agg(op_context):
mongo = op_context.resources.mongo
docs = {
"functional_annotation_agg": [
Expand All @@ -70,7 +69,18 @@ def test_perform_mongo_updates_functional_annotation_agg(op_context):
for doc_spec in docs["functional_annotation_agg"]:
mongo.db.functional_annotation_agg.delete_many(doc_spec)

_add_schema_docs_with_or_without_replacement(mongo, docs)
extra_run_config_data = _ensure_job__metadata_in(
docs,
op_context.resources.runtime_api_user_client.username,
mongo.db,
op_context.resources.runtime_api_site_client.client_id,
drs_object_exists_ok=True, # If there exists a DRS object with a matching checksum, use it.
)

apply_metadata_in.to_job(**preset_normal).execute_in_process(
run_config=extra_run_config_data
)

assert (
mongo.db.functional_annotation_agg.count_documents(
{"$or": docs["functional_annotation_agg"]}
Expand Down

0 comments on commit 3820750

Please sign in to comment.