feat: atomic replace of alldocs
- restore `materialize_alldocs()` call for `apply_changesheet` and `apply_metadata_in` graphs
- leave TODO for ensuring re-materialization on update via `POST /queries:run` (want this to not block the API response)

closes #809
dwinston committed Dec 17, 2024
1 parent e2863b4 commit fd3f789
Showing 4 changed files with 43 additions and 38 deletions.
20 changes: 11 additions & 9 deletions nmdc_runtime/api/endpoints/queries.py
@@ -227,14 +227,16 @@ def _run_query(query, mdb) -> CommandResponse:
if cmd_response.ok
else QueryRun(qid=query.id, ran_at=ran_at, error=cmd_response)
)
if q_type in (DeleteCommand, UpdateCommand) and cmd_response.n == 0:
raise HTTPException(
status_code=status.HTTP_418_IM_A_TEAPOT,
detail=(
f"{'update' if q_type is UpdateCommand else 'delete'} command modified zero documents."
" I'm guessing that's not what you expected. Check the syntax of your request."
" But what do I know? I'm just a teapot.",
),
)
if q_type in (DeleteCommand, UpdateCommand):
# TODO `_request_dagster_run` of `ensure_alldocs`?
if cmd_response.n == 0:
raise HTTPException(
status_code=status.HTTP_418_IM_A_TEAPOT,
detail=(
f"{'update' if q_type is UpdateCommand else 'delete'} command modified zero documents."
" I'm guessing that's not what you expected. Check the syntax of your request."
" But what do I know? I'm just a teapot.",
),
)
mdb.query_runs.insert_one(query_run.model_dump(exclude_unset=True))
return cmd_response
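
The TODO above concerns re-materializing `alldocs` after a successful update or delete without blocking the `POST /queries:run` response. A minimal sketch of one way to do that, assuming a FastAPI endpoint and the `dagster-graphql` client; the endpoint body, webserver hostname, and helper name are hypothetical, not the repo's actual wiring:

```python
# Hypothetical sketch: fire-and-forget `alldocs` re-materialization.
from dagster_graphql import DagsterGraphQLClient
from fastapi import BackgroundTasks, FastAPI

app = FastAPI()


def _request_ensure_alldocs_run() -> None:
    # Submit the `ensure_alldocs` job to the Dagster webserver. As a background
    # task, this runs after the HTTP response is sent, so it cannot delay it.
    client = DagsterGraphQLClient("dagster-webserver", port_number=3000)
    client.submit_job_execution("ensure_alldocs")


@app.post("/queries:run")
def run_query(background_tasks: BackgroundTasks):
    cmd_response = {"ok": 1, "n": 1}  # stand-in for the real update/delete work
    background_tasks.add_task(_request_ensure_alldocs_run)  # non-blocking
    return cmd_response
```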
2 changes: 2 additions & 0 deletions nmdc_runtime/site/graphs.py
@@ -117,12 +117,14 @@ def apply_changesheet():
sheet_in = get_changesheet_in()
outputs = perform_changesheet_updates(sheet_in)
add_output_run_event(outputs)
materialize_alldocs()


@graph
def apply_metadata_in():
outputs = perform_mongo_updates(get_json_in())
add_output_run_event(outputs)
materialize_alldocs()


@graph
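A note on the two graph changes above: Dagster orders ops by data dependencies, not by call order inside the `@graph` body, so `materialize_alldocs()` invoked with no inputs appears to carry no ordering constraint relative to the update ops. A self-contained sketch (hypothetical op bodies) of how a `Nothing` dependency can make re-materialization wait until the updates commit:

```python
from dagster import In, Nothing, graph, op


@op
def perform_updates() -> Nothing:
    # Stand-in for perform_mongo_updates / perform_changesheet_updates.
    print("applying metadata updates")


@op(ins={"start": In(Nothing)})
def materialize_alldocs():
    # Runs only after `perform_updates` completes, despite taking no data from it.
    print("rebuilding alldocs")


@graph
def apply_metadata_in():
    materialize_alldocs(start=perform_updates())


if __name__ == "__main__":
    apply_metadata_in.to_job().execute_in_process()
```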
55 changes: 28 additions & 27 deletions nmdc_runtime/site/ops.py
@@ -1054,10 +1054,7 @@ def materialize_alldocs(context) -> int:
# TODO include functional_annotation_agg for "real-time" ref integrity checking.
# For now, production use cases for materialized `alldocs` are limited to `id`-having collections.
collection_names = populated_schema_collection_names_with_id_field(mdb)
context.log.info(f"{collection_names=}")

# Build alldocs
context.log.info("constructing `alldocs` collection")
context.log.info(f"constructing `alldocs` collection using {collection_names=}")

document_class_names = set(
chain.from_iterable(collection_name_to_class_names.values())
@@ -1070,7 +1067,8 @@
for cls_name in document_class_names
}

# Any ancestor of a document class is a document-referenceable range, i.e., a valid range of a document-reference-ranged slot.
# Any ancestor of a document class is a document-referenceable range,
# i.e., a valid range of a document-reference-ranged slot.
document_referenceable_ranges = set(
chain.from_iterable(
schema_view.class_ancestors(cls_name) for cls_name in document_class_names
@@ -1086,17 +1084,15 @@
):
document_reference_ranged_slots[cls_name].append(slot_name)

# Drop any existing `alldocs` collection (e.g. from previous use of this op).
#
# FIXME: This "nuke and pave" approach introduces a race condition.
# For example, if someone were to visit an API endpoint that uses the "alldocs" collection,
# the endpoint would fail to perform its job since the "alldocs" collection is temporarily missing.
#
mdb.alldocs.drop()
# Build `alldocs` to a temporary collection for atomic replacement
# https://www.mongodb.com/docs/v6.0/reference/method/db.collection.renameCollection/#resource-locking-in-replica-sets
temp_alldocs_collection_name = f"tmp.alldocs.{ObjectId()}"
temp_alldocs_collection = mdb[temp_alldocs_collection_name]
context.log.info(f"constructing `{temp_alldocs_collection.name}` collection")

for coll_name in collection_names:
context.log.info(f"{coll_name=}")
requests = []
write_operations = []
documents_processed_counter = 0
for doc in mdb[coll_name].find():
doc_type = doc["type"][5:] # lop off "nmdc:" prefix
@@ -1105,30 +1101,35 @@
]
new_doc = keyfilter(lambda slot: slot in slots_to_include, doc)
new_doc["_type_and_ancestors"] = schema_view.class_ancestors(doc_type)
requests.append(InsertOne(new_doc))
if len(requests) == BULK_WRITE_BATCH_SIZE:
_ = mdb.alldocs.bulk_write(requests, ordered=False)
requests.clear()
write_operations.append(InsertOne(new_doc))
if len(write_operations) == BULK_WRITE_BATCH_SIZE:
_ = temp_alldocs_collection.bulk_write(write_operations, ordered=False)
write_operations.clear()
documents_processed_counter += BULK_WRITE_BATCH_SIZE
if len(requests) > 0:
_ = mdb.alldocs.bulk_write(requests, ordered=False)
documents_processed_counter += len(requests)
if len(write_operations) > 0:
_ = temp_alldocs_collection.bulk_write(write_operations, ordered=False)
documents_processed_counter += len(write_operations)
context.log.info(
f"Inserted {documents_processed_counter} documents from {coll_name=} "
)

context.log.info(
f"refreshed {mdb.alldocs} collection with {mdb.alldocs.estimated_document_count()} docs."
f"produced `{temp_alldocs_collection.name}` collection with"
f" {temp_alldocs_collection.estimated_document_count()} docs."
)

# Re-idx for `alldocs` collection
mdb.alldocs.create_index("id", unique=True)
# The indexes were added to improve the performance of the
# /data_objects/study/{study_id} endpoint
context.log.info(f"creating indexes on `{temp_alldocs_collection.name}` ...")
# Ensure unique index on "id". Index creation here is blocking (i.e. background=False),
# so that `temp_alldocs_collection` will be "good to go" on renaming.
temp_alldocs_collection.create_index("id", unique=True)
# Add indexes to improve performance of `GET /data_objects/study/{study_id}`:
slots_to_index = ["has_input", "has_output", "was_informed_by"]
[mdb.alldocs.create_index(slot) for slot in slots_to_index]

[temp_alldocs_collection.create_index(slot) for slot in slots_to_index]
context.log.info(f"created indexes on id, {slots_to_index}.")

context.log.info(f"renaming `{temp_alldocs_collection.name}` to `alldocs`...")
temp_alldocs_collection.rename("alldocs", dropTarget=True)

return mdb.alldocs.estimated_document_count()
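
Stripped of the Dagster plumbing, the pattern this op now implements — build into a uniquely named temp collection, index it while it is still offline, then atomically swap it in — can be sketched with plain `pymongo` (hypothetical database, source collection, and batch size):

```python
# Minimal sketch of the build-then-rename pattern (hypothetical names).
from bson import ObjectId
from pymongo import InsertOne, MongoClient

BATCH_SIZE = 1000  # assumption; the repo defines its own BULK_WRITE_BATCH_SIZE

mdb = MongoClient()["nmdc"]
temp = mdb[f"tmp.alldocs.{ObjectId()}"]  # unique name avoids collisions between runs

batch = []
for doc in mdb["study_set"].find():
    batch.append(InsertOne({"id": doc["id"], "type": doc.get("type")}))
    if len(batch) == BATCH_SIZE:
        temp.bulk_write(batch, ordered=False)  # unordered: server may parallelize
        batch.clear()
if batch:  # bulk_write raises on an empty request list, so guard the tail batch
    temp.bulk_write(batch, ordered=False)

# Index before the swap (blocking), so `alldocs` is never live while unindexed.
temp.create_index("id", unique=True)

# renameCollection with dropTarget: readers see either the old `alldocs` or the
# new one, never an intermediate state where the collection is missing.
temp.rename("alldocs", dropTarget=True)
```

This closes the race the removed FIXME described: there is no longer a window in which `alldocs` has been dropped but not yet rebuilt.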


4 changes: 2 additions & 2 deletions nmdc_runtime/site/repository.py
@@ -113,8 +113,8 @@
job=housekeeping.to_job(**preset_normal),
)

materialize_alldocs_daily = ScheduleDefinition(
name="daily_materialize_alldocs",
ensure_alldocs_daily = ScheduleDefinition(
name="daily_ensure_alldocs",
cron_schedule="0 3 * * *",
execution_timezone="America/New_York",
job=ensure_alldocs.to_job(**preset_normal),
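
For context, a `ScheduleDefinition` only takes effect once it is registered with the code location — presumably via the `@repository` definitions in this `repository.py`. A minimal self-contained sketch of that wiring using the newer `Definitions` API (hypothetical job body):

```python
from dagster import Definitions, ScheduleDefinition, job, op


@op
def ensure_alldocs_op():
    print("ensuring `alldocs` is materialized")


@job
def ensure_alldocs():
    ensure_alldocs_op()


ensure_alldocs_daily = ScheduleDefinition(
    name="daily_ensure_alldocs",
    cron_schedule="0 3 * * *",  # 03:00 daily
    execution_timezone="America/New_York",
    job=ensure_alldocs,
)

defs = Definitions(jobs=[ensure_alldocs], schedules=[ensure_alldocs_daily])
```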