Register dagster scheduled job to re-materialize alldocs daily (#810)
* create materialize_alldocs_daily schedule

* style: reformat

* style: reformat

* snake-case job name for materialize_alldocs_daily

* fix: do not partially sync via selected endpoints

Want to use a Dagster sensor or a MongoDB change-stream listener to cover all updates,
e.g. via `queries:run` or via side-channel direct mongo updates (a sketch of such a listener follows this list).

* feat: atomic replace of `alldocs`

- restore `materialize_alldocs()` call for `apply_changesheet` and `apply_metadata_in` graphs
- leave TODO for ensuring re-materialization on update via `POST /queries:run` (want this to not block the API response)

closes #809

* fix: rename

* fix: pytest.mark.skip unrelated regression

* Undo skipping of test now that its dependency has been satisfied
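The change-stream listener mentioned above is not implemented by this commit. As a rough sketch of what it might look like with PyMongo (change streams require a replica set; the URI, database name, and run-trigger stub are assumptions):

```python
# Hypothetical sketch of the MongoDB change-stream listener floated in the
# commit message; nothing here is implemented by this commit.
from pymongo import MongoClient


def trigger_ensure_alldocs() -> None:
    # Stand-in for requesting a Dagster run of `ensure_alldocs`.
    print("would request an `ensure_alldocs` Dagster run")


client = MongoClient("mongodb://localhost:27017/?replicaSet=rs0")  # assumed URI
mdb = client["nmdc"]  # assumed database name

# Watch all collections in the database for writes that could invalidate `alldocs`.
pipeline = [
    {"$match": {"operationType": {"$in": ["insert", "update", "replace", "delete"]}}}
]
with mdb.watch(pipeline) as stream:
    for change in stream:
        trigger_ensure_alldocs()
```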

---------

Co-authored-by: Jing - Peters MBP <jingcao.yale@gmail.com>
Co-authored-by: github-actions <github-actions@github.com>
Co-authored-by: Mansi <mkshah605@gmail.com>
Co-authored-by: Donny Winston <donny@polyneme.xyz>
Co-authored-by: eecavanna <134325062+eecavanna@users.noreply.github.com>
6 people authored Dec 17, 2024
1 parent 2bd127e commit b328235
Showing 4 changed files with 49 additions and 37 deletions.
20 changes: 11 additions & 9 deletions nmdc_runtime/api/endpoints/queries.py
@@ -227,14 +227,16 @@ def _run_query(query, mdb) -> CommandResponse:
if cmd_response.ok
else QueryRun(qid=query.id, ran_at=ran_at, error=cmd_response)
)
if q_type in (DeleteCommand, UpdateCommand) and cmd_response.n == 0:
raise HTTPException(
status_code=status.HTTP_418_IM_A_TEAPOT,
detail=(
f"{'update' if q_type is UpdateCommand else 'delete'} command modified zero documents."
" I'm guessing that's not what you expected. Check the syntax of your request."
" But what do I know? I'm just a teapot.",
),
)
if q_type in (DeleteCommand, UpdateCommand):
# TODO `_request_dagster_run` of `ensure_alldocs`?
if cmd_response.n == 0:
raise HTTPException(
status_code=status.HTTP_418_IM_A_TEAPOT,
detail=(
f"{'update' if q_type is UpdateCommand else 'delete'} command modified zero documents."
" I'm guessing that's not what you expected. Check the syntax of your request."
" But what do I know? I'm just a teapot.",
),
)
mdb.query_runs.insert_one(query_run.model_dump(exclude_unset=True))
return cmd_response
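The TODO above contemplates requesting a Dagster run of `ensure_alldocs` after a successful update or delete command. One hedged sketch of how that might look using Dagster's GraphQL client; the hostname, port, and helper name are assumptions, not part of this commit:

```python
# Sketch only: one possible realization of the TODO's `_request_dagster_run`.
# Assumes a Dagster webserver at dagster:3000 exposing the `ensure_alldocs` job.
from dagster_graphql import DagsterGraphQLClient


def request_ensure_alldocs_run() -> str:
    client = DagsterGraphQLClient("dagster", port_number=3000)  # assumed host/port
    # submit_job_execution returns a run id without waiting for the run to
    # finish, so the API response would not block on re-materialization.
    return client.submit_job_execution("ensure_alldocs")
```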
2 changes: 2 additions & 0 deletions nmdc_runtime/site/graphs.py
@@ -117,12 +117,14 @@ def apply_changesheet():
sheet_in = get_changesheet_in()
outputs = perform_changesheet_updates(sheet_in)
add_output_run_event(outputs)
materialize_alldocs()


@graph
def apply_metadata_in():
outputs = perform_mongo_updates(get_json_in())
add_output_run_event(outputs)
materialize_alldocs()


@graph
55 changes: 28 additions & 27 deletions nmdc_runtime/site/ops.py
@@ -1054,10 +1054,7 @@ def materialize_alldocs(context) -> int:
# TODO include functional_annotation_agg for "real-time" ref integrity checking.
# For now, production use cases for materialized `alldocs` are limited to `id`-having collections.
collection_names = populated_schema_collection_names_with_id_field(mdb)
context.log.info(f"{collection_names=}")

# Build alldocs
context.log.info("constructing `alldocs` collection")
context.log.info(f"constructing `alldocs` collection using {collection_names=}")

document_class_names = set(
chain.from_iterable(collection_name_to_class_names.values())
@@ -1070,7 +1067,8 @@
for cls_name in document_class_names
}

# Any ancestor of a document class is a document-referenceable range, i.e., a valid range of a document-reference-ranged slot.
# Any ancestor of a document class is a document-referenceable range,
# i.e., a valid range of a document-reference-ranged slot.
document_referenceable_ranges = set(
chain.from_iterable(
schema_view.class_ancestors(cls_name) for cls_name in document_class_names
@@ -1086,17 +1084,15 @@
):
document_reference_ranged_slots[cls_name].append(slot_name)

# Drop any existing `alldocs` collection (e.g. from previous use of this op).
#
# FIXME: This "nuke and pave" approach introduces a race condition.
# For example, if someone were to visit an API endpoint that uses the "alldocs" collection,
# the endpoint would fail to perform its job since the "alldocs" collection is temporarily missing.
#
mdb.alldocs.drop()
# Build `alldocs` to a temporary collection for atomic replacement
# https://www.mongodb.com/docs/v6.0/reference/method/db.collection.renameCollection/#resource-locking-in-replica-sets
temp_alldocs_collection_name = f"tmp.alldocs.{ObjectId()}"
temp_alldocs_collection = mdb[temp_alldocs_collection_name]
context.log.info(f"constructing `{temp_alldocs_collection.name}` collection")

for coll_name in collection_names:
context.log.info(f"{coll_name=}")
requests = []
write_operations = []
documents_processed_counter = 0
for doc in mdb[coll_name].find():
doc_type = doc["type"][5:] # lop off "nmdc:" prefix
@@ -1105,30 +1101,35 @@
]
new_doc = keyfilter(lambda slot: slot in slots_to_include, doc)
new_doc["_type_and_ancestors"] = schema_view.class_ancestors(doc_type)
requests.append(InsertOne(new_doc))
if len(requests) == BULK_WRITE_BATCH_SIZE:
_ = mdb.alldocs.bulk_write(requests, ordered=False)
requests.clear()
write_operations.append(InsertOne(new_doc))
if len(write_operations) == BULK_WRITE_BATCH_SIZE:
_ = temp_alldocs_collection.bulk_write(write_operations, ordered=False)
write_operations.clear()
documents_processed_counter += BULK_WRITE_BATCH_SIZE
if len(requests) > 0:
_ = mdb.alldocs.bulk_write(requests, ordered=False)
documents_processed_counter += len(requests)
if len(write_operations) > 0:
_ = temp_alldocs_collection.bulk_write(write_operations, ordered=False)
documents_processed_counter += len(write_operations)
context.log.info(
f"Inserted {documents_processed_counter} documents from {coll_name=} "
)

context.log.info(
f"refreshed {mdb.alldocs} collection with {mdb.alldocs.estimated_document_count()} docs."
f"produced `{temp_alldocs_collection.name}` collection with"
f" {temp_alldocs_collection.estimated_document_count()} docs."
)

# Re-idx for `alldocs` collection
mdb.alldocs.create_index("id", unique=True)
# The indexes were added to improve the performance of the
# /data_objects/study/{study_id} endpoint
context.log.info(f"creating indexes on `{temp_alldocs_collection.name}` ...")
# Ensure unique index on "id". Index creation here is blocking (i.e. background=False),
# so that `temp_alldocs_collection` will be "good to go" on renaming.
temp_alldocs_collection.create_index("id", unique=True)
# Add indexes to improve performance of `GET /data_objects/study/{study_id}`:
slots_to_index = ["has_input", "has_output", "was_informed_by"]
[mdb.alldocs.create_index(slot) for slot in slots_to_index]

[temp_alldocs_collection.create_index(slot) for slot in slots_to_index]
context.log.info(f"created indexes on id, {slots_to_index}.")

context.log.info(f"renaming `{temp_alldocs_collection.name}` to `alldocs`...")
temp_alldocs_collection.rename("alldocs", dropTarget=True)

return mdb.alldocs.estimated_document_count()


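The op above builds `alldocs` in a uniquely named temporary collection and then renames it over the live one, so readers never observe a missing collection (fixing the "nuke and pave" race noted in the removed FIXME). A minimal, self-contained sketch of that pattern with plain PyMongo; the URI and document contents are placeholders:

```python
# Minimal sketch of the atomic-replace pattern used by materialize_alldocs.
from bson import ObjectId
from pymongo import InsertOne, MongoClient

client = MongoClient("mongodb://localhost:27017")  # placeholder URI
mdb = client["nmdc"]  # placeholder database

# 1. Write everything to a uniquely named temp collection.
temp = mdb[f"tmp.alldocs.{ObjectId()}"]  # ObjectId() keeps names unique across runs
temp.bulk_write([InsertOne({"id": f"doc-{i}"}) for i in range(3)], ordered=False)

# 2. Index it while it is still cold; create_index blocks until the index exists.
temp.create_index("id", unique=True)

# 3. Swap it into place server-side; dropTarget=True replaces any existing
#    `alldocs` in the same operation, so readers never see the collection missing.
temp.rename("alldocs", dropTarget=True)
print(mdb.alldocs.estimated_document_count())
```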
9 changes: 8 additions & 1 deletion nmdc_runtime/site/repository.py
@@ -113,6 +113,13 @@
job=housekeeping.to_job(**preset_normal),
)

ensure_alldocs_daily = ScheduleDefinition(
name="daily_ensure_alldocs",
cron_schedule="0 3 * * *",
execution_timezone="America/New_York",
job=ensure_alldocs.to_job(**preset_normal),
)


def asset_materialization_metadata(asset_event, key):
"""Get metadata from an asset materialization event.
@@ -453,7 +460,7 @@ def repo():
export_study_biosamples_metadata.to_job(**preset_normal),
ensure_alldocs.to_job(**preset_normal),
]
schedules = [housekeeping_weekly]
schedules = [housekeeping_weekly, ensure_alldocs_daily]
sensors = [
done_object_put_ops,
ensure_gold_translation_job,
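For reference, the scheduling pattern added in repository.py can be reproduced in isolation. A hedged sketch with a stand-in job (the op body below is a placeholder, not the real `ensure_alldocs` graph):

```python
# Self-contained sketch of the daily schedule wiring added in repository.py.
from dagster import ScheduleDefinition, job, op


@op
def rematerialize_alldocs_stub() -> int:
    # Placeholder for the real materialize_alldocs op.
    return 0


@job
def ensure_alldocs():
    rematerialize_alldocs_stub()


ensure_alldocs_daily = ScheduleDefinition(
    name="daily_ensure_alldocs",
    cron_schedule="0 3 * * *",  # minute 0, hour 3: every day at 03:00
    execution_timezone="America/New_York",
    job=ensure_alldocs,
)
```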
