Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

478 refactor v1 workflows activities endpoint #479

Merged
merged 9 commits into from
Feb 14, 2024
66 changes: 63 additions & 3 deletions nmdc_runtime/api/endpoints/workflows.py
Original file line number Diff line number Diff line change
@@ -1,13 +1,22 @@
from typing import List
import os
from typing import Any, List

import pymongo
from fastapi import APIRouter, Depends

from fastapi import APIRouter, Depends, HTTPException
from motor.motor_asyncio import AsyncIOMotorDatabase
from pymongo.database import Database as MongoDatabase
from pymongo.errors import BulkWriteError
from starlette import status

from nmdc_runtime.api.core.util import raise404_if_none
from nmdc_runtime.api.db.mongo import get_mongo_db
from nmdc_runtime.api.db.mongo import get_mongo_db, activity_collection_names
from nmdc_runtime.api.models.capability import Capability
from nmdc_runtime.api.models.object_type import ObjectType
from nmdc_runtime.api.models.site import Site, get_current_client_site
from nmdc_runtime.api.models.workflow import Workflow
from nmdc_runtime.site.resources import MongoDB
from nmdc_runtime.util import validate_json

router = APIRouter()

Expand Down Expand Up @@ -43,3 +52,54 @@ def list_workflow_capabilities(
):
doc = raise404_if_none(mdb.workflows.find_one({"id": workflow_id}))
return list(mdb.capabilities.find({"id": {"$in": doc.get("capability_ids", [])}}))


# TODO: Create activity.py in ../models
@router.post("/workflows/activities")
async def post_activity(
    activity_set: dict[str, Any],
    site: Site = Depends(get_current_client_site),
    mdb: MongoDatabase = Depends(get_mongo_db),
):
    """
    Please migrate all workflows from `v1/workflows/activities` to this endpoint.
    -------
    Post activity set to database and claim job.

    Parameters
    -------
    activity_set: dict[str,Any]
        Set of activities for specific workflows. Every top-level key must be
        one of the names returned by `activity_collection_names(mdb)`.

    Returns
    -------
    dict[str,str]
        `{"message": "jobs accepted"}` once the documents have been added.

    Raises
    -------
    HTTPException
        422 if `validate_json` reports errors for the payload;
        409 if a key is not an activity collection name (or any other
        `ValueError` is raised), or if the write raises `BulkWriteError`.
    """
    _ = site  # must be authenticated
    try:
        # Look up the permitted collection names once instead of once per key.
        allowed_collection_names = activity_collection_names(mdb)
        # verify activities in activity_set are nmdc-schema compliant
        for collection_name in activity_set:
            if collection_name not in allowed_collection_names:
                raise ValueError("keys must be nmdc-schema activity collection names")
        # validate request JSON
        rv = validate_json(activity_set, mdb)
        if rv["result"] == "errors":
            # HTTPException is not caught by the handler below, so the 422
            # propagates to the client unchanged.
            raise HTTPException(
                status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
                detail=str(rv),
            )
        # Build a MongoDB resource from the environment and reuse its
        # `add_docs` logic to write the documents.
        mongo_resource = MongoDB(
            host=os.getenv("MONGO_HOST"),
            dbname=os.getenv("MONGO_DBNAME"),
            username=os.getenv("MONGO_USERNAME"),
            password=os.getenv("MONGO_PASSWORD"),
        )
        # validate=False: the payload was already checked by validate_json above.
        mongo_resource.add_docs(activity_set, validate=False, replace=True)
        # TODO: Update return value to List[Activity]
        return {"message": "jobs accepted"}
    except (BulkWriteError, ValueError) as e:
        # Both failure kinds are reported as 409; keep the cause chained for logs.
        raise HTTPException(status_code=409, detail=str(e)) from e
56 changes: 35 additions & 21 deletions nmdc_runtime/api/v1/workflows/activities.py
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
"""Module."""
import os
from typing import Any

from fastapi import APIRouter, BackgroundTasks, Depends, HTTPException
from fastapi import APIRouter, Depends, HTTPException
from motor.motor_asyncio import AsyncIOMotorDatabase
from pymongo.database import Database as MongoDatabase
from pymongo.errors import BulkWriteError
from starlette import status

from components.nmdc_runtime.workflow_execution_activity import (
ActivityService,
Database,
from nmdc_runtime.api.db.mongo import (
get_mongo_db,
activity_collection_names,
)
from nmdc_runtime.api.db.mongo import get_async_mongo_db, get_mongo_db
from nmdc_runtime.api.models.site import Site, get_current_client_site
from nmdc_runtime.site.resources import MongoDB
from nmdc_runtime.util import validate_json

router = APIRouter(
prefix="/workflows/activities", tags=["workflow_execution_activities"]
Expand All @@ -26,31 +28,43 @@ async def job_to_db(job_spec: dict[str, Any], mdb: AsyncIOMotorDatabase) -> None
@router.post("", status_code=status.HTTP_201_CREATED)
async def post_activity(
activity_set: dict[str, Any],
background_tasks: BackgroundTasks,
site: Site = Depends(get_current_client_site),
mdb: MongoDatabase = Depends(get_mongo_db),
amdb: AsyncIOMotorDatabase = Depends(get_async_mongo_db),
) -> dict[str, str]:
"""Post activity set to database and claim job.

Parameters
"""
**NOTE: This endpoint is DEPRECATED. Please migrate to `~/workflows/activities`.**
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why the leading ~? I'll change to POST /workflows/activities.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not PR-blocking

----------
activity_set : dict[str,Any]
The `v1/workflows/activities` endpoint will be removed in an upcoming release.
--
Post activity set to database and claim job.

Parameters: activity_set: dict[str,Any]
Set of activities for specific workflows.

Returns
-------
dict[str,str]
Returns: dict[str,str]
"""
_ = site # must be authenticated
try:
activity_service = ActivityService()
nmdc_db = Database(**activity_set)
activities = await activity_service.add_activity_set(nmdc_db, mdb)
# background_tasks.add_task(
# activity_service.create_jobs, activities, nmdc_db.data_object_set, amdb
# )
# verify activities in activity_set are nmdc-schema compliant
for collection_name in activity_set:
if collection_name not in activity_collection_names(mdb):
raise ValueError("keys must be nmdc-schema activity collection names`")
# validate request JSON
rv = validate_json(activity_set, mdb)
if rv["result"] == "errors":
raise HTTPException(
status_code=status.HTTP_422_UNPROCESSABLE_ENTITY,
detail=str(rv),
)
# create mongodb instance for dagster
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

more accurately, use same logic as the dagster job for json:submit endpoint.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not PR-blocking

mongo_resource = MongoDB(
host=os.getenv("MONGO_HOST"),
dbname=os.getenv("MONGO_DBNAME"),
username=os.getenv("MONGO_USERNAME"),
password=os.getenv("MONGO_PASSWORD"),
)
mongo_resource.add_docs(activity_set, validate=False, replace=True)
return {"message": "jobs accepted"}

except BulkWriteError as e:
raise HTTPException(status_code=409, detail=str(e))
except ValueError as e:
Expand Down
43 changes: 43 additions & 0 deletions tests/test_api/test_endpoints.py
Original file line number Diff line number Diff line change
Expand Up @@ -228,3 +228,46 @@ def test_submit_changesheet():
)
mdb.objects.delete_one({"id": drs_obj_doc["id"]})
assert True


def test_submit_workflow_activities(api_site_client):
    """POST an activity set to `/v1/workflows/activities` and read it back.

    Any pre-existing document with the test id is removed before the request
    and re-inserted afterwards. The cleanup runs in a `finally` block so that
    a failing assertion or request cannot leave the test document behind or
    lose the pre-existing one.
    """
    test_collection, test_id = (
        "read_qc_analysis_activity_set",
        "nmdc:wfrqc-11-t0tvnp52.2",
    )
    test_payload = {
        test_collection: [
            {
                "id": test_id,
                "name": "Read QC Activity for nmdc:wfrqc-11-t0tvnp52.1",
                "started_at_time": "2024-01-11T20:48:30.718133+00:00",
                "ended_at_time": "2024-01-11T21:11:44.884260+00:00",
                "was_informed_by": "nmdc:omprc-11-9mvz7z22",
                "execution_resource": "NERSC-Perlmutter",
                "git_url": "https://github.com/microbiomedata/ReadsQC",
                "has_input": ["nmdc:dobj-11-gpthnj64"],
                "has_output": [
                    "nmdc:dobj-11-w5dak635",
                    "nmdc:dobj-11-g6d71n77",
                    "nmdc:dobj-11-bds7qq03",
                ],
                "type": "nmdc:ReadQcAnalysisActivity",
                "part_of": ["nmdc:omprc-11-9mvz7z22"],
                "version": "v1.0.8",
            }
        ]
    }
    mdb = get_mongo_db()
    # Set aside any existing document with this id so the POST starts clean.
    if doc_to_restore := mdb[test_collection].find_one({"id": test_id}):
        mdb[test_collection].delete_one({"id": test_id})
    try:
        rv = api_site_client.request(
            "POST",
            "/v1/workflows/activities",
            test_payload,
        )
        assert rv.json() == {"message": "jobs accepted"}
        rv = api_site_client.request("GET", f"/nmdcschema/ids/{test_id}")
    finally:
        # Always remove the test document and put the original back, even if
        # the request or the assertion above failed.
        mdb[test_collection].delete_one({"id": test_id})
        if doc_to_restore:
            mdb[test_collection].insert_one(doc_to_restore)
    assert "id" in rv.json() and "input_read_count" not in rv.json()
Loading