Skip to content

Commit

Permalink
event driven indexing/docset/usergroup triggers (#3918)
Browse files Browse the repository at this point in the history
* WIP

* trigger indexing immediately when the ccpair is created

* add some logging and indexing trigger to the mock-credential endpoint

* better comments

* fix integration test

---------

Co-authored-by: Richard Kuo (Danswer) <rkuo@onyx.app>
  • Loading branch information
rkuo-danswer and Richard Kuo (Danswer) authored Feb 7, 2025
1 parent ef31e14 commit ae37f01
Show file tree
Hide file tree
Showing 6 changed files with 129 additions and 11 deletions.
5 changes: 5 additions & 0 deletions backend/onyx/configs/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -263,6 +263,11 @@ class PostgresAdvisoryLocks(Enum):


class OnyxCeleryQueues:
# "celery" is the default queue defined by celery and also the queue
# we are running in the primary worker to run system tasks
# Tasks running in this queue should be designed specifically to run quickly
PRIMARY = "celery"

# Light queue
VESPA_METADATA_SYNC = "vespa_metadata_sync"
DOC_PERMISSIONS_UPSERT = "doc_permissions_upsert"
Expand Down
28 changes: 28 additions & 0 deletions backend/onyx/server/documents/cc_pair.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,8 @@
try_creating_prune_generator_task,
)
from onyx.background.celery.versioned_apps.primary import app as primary_app
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryTask
from onyx.db.connector_credential_pair import add_credential_to_connector
from onyx.db.connector_credential_pair import (
get_connector_credential_pair_from_id_for_user,
Expand Down Expand Up @@ -228,6 +230,13 @@ def update_cc_pair_status(

db_session.commit()

# this speeds up the start of indexing by firing the check immediately
primary_app.send_task(
OnyxCeleryTask.CHECK_FOR_INDEXING,
kwargs=dict(tenant_id=tenant_id),
priority=OnyxCeleryPriority.HIGH,
)

return JSONResponse(
status_code=HTTPStatus.OK, content={"message": str(HTTPStatus.OK)}
)
Expand Down Expand Up @@ -540,7 +549,14 @@ def associate_credential_to_connector(
metadata: ConnectorCredentialPairMetadata,
user: User | None = Depends(current_curator_or_admin_user),
db_session: Session = Depends(get_session),
tenant_id: str = Depends(get_current_tenant_id),
) -> StatusResponse[int]:
"""NOTE(rkuo): internally discussed and the consensus is this endpoint
and create_connector_with_mock_credential should be combined.
The intent of this endpoint is to handle connectors that actually need credentials.
"""

fetch_ee_implementation_or_noop(
"onyx.db.user_group", "validate_object_creation_for_user", None
)(
Expand All @@ -563,6 +579,18 @@ def associate_credential_to_connector(
groups=metadata.groups,
)

# trigger indexing immediately
primary_app.send_task(
OnyxCeleryTask.CHECK_FOR_INDEXING,
priority=OnyxCeleryPriority.HIGH,
kwargs={"tenant_id": tenant_id},
)

logger.info(
f"associate_credential_to_connector - running check_for_indexing: "
f"cc_pair={response.data}"
)

return response
except IntegrityError as e:
logger.error(f"IntegrityError: {e}")
Expand Down
22 changes: 22 additions & 0 deletions backend/onyx/server/documents/connector.py
Original file line number Diff line number Diff line change
Expand Up @@ -804,6 +804,14 @@ def create_connector_with_mock_credential(
db_session: Session = Depends(get_session),
tenant_id: str = Depends(get_current_tenant_id),
) -> StatusResponse:
"""NOTE(rkuo): internally discussed and the consensus is this endpoint
and associate_credential_to_connector should be combined.
The intent of this endpoint is to handle connectors that don't need credentials,
AKA web, file, etc ... but there isn't any reason a single endpoint couldn't
server this purpose.
"""

fetch_ee_implementation_or_noop(
"onyx.db.user_group", "validate_object_creation_for_user", None
)(
Expand Down Expand Up @@ -841,6 +849,18 @@ def create_connector_with_mock_credential(
groups=connector_data.groups,
)

# trigger indexing immediately
primary_app.send_task(
OnyxCeleryTask.CHECK_FOR_INDEXING,
priority=OnyxCeleryPriority.HIGH,
kwargs={"tenant_id": tenant_id},
)

logger.info(
f"create_connector_with_mock_credential - running check_for_indexing: "
f"cc_pair={response.data}"
)

create_milestone_and_report(
user=user,
distinct_id=user.email if user else tenant_id or "N/A",
Expand Down Expand Up @@ -1005,6 +1025,8 @@ def connector_run_once(
kwargs={"tenant_id": tenant_id},
)

logger.info("connector_run_once - running check_for_indexing")

msg = f"Marked {num_triggers} index attempts with indexing triggers."
return StatusResponse(
success=True,
Expand Down
26 changes: 26 additions & 0 deletions backend/onyx/server/features/document_set/api.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,15 @@

from onyx.auth.users import current_curator_or_admin_user
from onyx.auth.users import current_user
from onyx.background.celery.versioned_apps.primary import app as primary_app
from onyx.configs.constants import OnyxCeleryPriority
from onyx.configs.constants import OnyxCeleryTask
from onyx.db.document_set import check_document_sets_are_public
from onyx.db.document_set import fetch_all_document_sets_for_user
from onyx.db.document_set import insert_document_set
from onyx.db.document_set import mark_document_set_as_to_be_deleted
from onyx.db.document_set import update_document_set
from onyx.db.engine import get_current_tenant_id
from onyx.db.engine import get_session
from onyx.db.models import User
from onyx.server.features.document_set.models import CheckDocSetPublicRequest
Expand All @@ -29,6 +33,7 @@ def create_document_set(
document_set_creation_request: DocumentSetCreationRequest,
user: User = Depends(current_curator_or_admin_user),
db_session: Session = Depends(get_session),
tenant_id: str = Depends(get_current_tenant_id),
) -> int:
fetch_ee_implementation_or_noop(
"onyx.db.user_group", "validate_object_creation_for_user", None
Expand All @@ -46,6 +51,13 @@ def create_document_set(
)
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))

primary_app.send_task(
OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK,
kwargs={"tenant_id": tenant_id},
priority=OnyxCeleryPriority.HIGH,
)

return document_set_db_model.id


Expand All @@ -54,6 +66,7 @@ def patch_document_set(
document_set_update_request: DocumentSetUpdateRequest,
user: User = Depends(current_curator_or_admin_user),
db_session: Session = Depends(get_session),
tenant_id: str = Depends(get_current_tenant_id),
) -> None:
fetch_ee_implementation_or_noop(
"onyx.db.user_group", "validate_object_creation_for_user", None
Expand All @@ -72,12 +85,19 @@ def patch_document_set(
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))

primary_app.send_task(
OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK,
kwargs={"tenant_id": tenant_id},
priority=OnyxCeleryPriority.HIGH,
)


@router.delete("/admin/document-set/{document_set_id}")
def delete_document_set(
document_set_id: int,
user: User = Depends(current_curator_or_admin_user),
db_session: Session = Depends(get_session),
tenant_id: str = Depends(get_current_tenant_id),
) -> None:
try:
mark_document_set_as_to_be_deleted(
Expand All @@ -88,6 +108,12 @@ def delete_document_set(
except Exception as e:
raise HTTPException(status_code=400, detail=str(e))

primary_app.send_task(
OnyxCeleryTask.CHECK_FOR_VESPA_SYNC_TASK,
kwargs={"tenant_id": tenant_id},
priority=OnyxCeleryPriority.HIGH,
)


"""Endpoints for non-admins"""

Expand Down
5 changes: 5 additions & 0 deletions backend/onyx/server/manage/administrative.py
Original file line number Diff line number Diff line change
Expand Up @@ -197,6 +197,11 @@ def create_deletion_attempt_for_connector_id(
kwargs={"tenant_id": tenant_id},
)

logger.info(
f"create_deletion_attempt_for_connector_id - running check_for_connector_deletion: "
f"cc_pair={cc_pair.id}"
)

if cc_pair.connector.source == DocumentSource.FILE:
connector = cc_pair.connector
file_store = get_default_file_store(db_session)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,23 +1,23 @@
import time
from datetime import datetime

from onyx.db.models import IndexingStatus
from tests.integration.common_utils.managers.cc_pair import CCPairManager
from tests.integration.common_utils.managers.index_attempt import IndexAttemptManager
from tests.integration.common_utils.managers.user import UserManager
from tests.integration.common_utils.test_models import DATestIndexAttempt
from tests.integration.common_utils.test_models import DATestUser


def _verify_index_attempt_pagination(
cc_pair_id: int,
index_attempts: list[DATestIndexAttempt],
index_attempt_ids: list[int],
page_size: int = 5,
user_performing_action: DATestUser | None = None,
) -> None:
retrieved_attempts: list[int] = []
last_time_started = None # Track the last time_started seen

for i in range(0, len(index_attempts), page_size):
for i in range(0, len(index_attempt_ids), page_size):
paginated_result = IndexAttemptManager.get_index_attempt_page(
cc_pair_id=cc_pair_id,
page=(i // page_size),
Expand All @@ -26,9 +26,9 @@ def _verify_index_attempt_pagination(
)

# Verify that the total items is equal to the length of the index attempts list
assert paginated_result.total_items == len(index_attempts)
assert paginated_result.total_items == len(index_attempt_ids)
# Verify that the number of items in the page is equal to the page size
assert len(paginated_result.items) == min(page_size, len(index_attempts) - i)
assert len(paginated_result.items) == min(page_size, len(index_attempt_ids) - i)

# Verify time ordering within the page (descending order)
for attempt in paginated_result.items:
Expand All @@ -42,7 +42,7 @@ def _verify_index_attempt_pagination(
retrieved_attempts.extend([attempt.id for attempt in paginated_result.items])

# Create a set of all the expected index attempt IDs
all_expected_attempts = set(attempt.id for attempt in index_attempts)
all_expected_attempts = set(index_attempt_ids)
# Create a set of all the retrieved index attempt IDs
all_retrieved_attempts = set(retrieved_attempts)

Expand All @@ -51,6 +51,9 @@ def _verify_index_attempt_pagination(


def test_index_attempt_pagination(reset: None) -> None:
MAX_WAIT = 60
all_attempt_ids: list[int] = []

# Create an admin user to perform actions
user_performing_action: DATestUser = UserManager.create(
name="admin_performing_action",
Expand All @@ -62,20 +65,49 @@ def test_index_attempt_pagination(reset: None) -> None:
user_performing_action=user_performing_action,
)

# Create 300 successful index attempts
# Creating a CC pair will create an index attempt as well. wait for it.
start = time.monotonic()
while True:
paginated_result = IndexAttemptManager.get_index_attempt_page(
cc_pair_id=cc_pair.id,
page=0,
page_size=5,
user_performing_action=user_performing_action,
)

if paginated_result.total_items == 1:
all_attempt_ids.append(paginated_result.items[0].id)
print("Initial index attempt from cc_pair creation detected. Continuing...")
break

elapsed = time.monotonic() - start
if elapsed > MAX_WAIT:
raise TimeoutError(
f"Initial index attempt: Not detected within {MAX_WAIT} seconds."
)

print(
f"Waiting for initial index attempt: elapsed={elapsed:.2f} timeout={MAX_WAIT}"
)
time.sleep(1)

# Create 299 successful index attempts (for 300 total)
base_time = datetime.now()
all_attempts = IndexAttemptManager.create_test_index_attempts(
num_attempts=300,
generated_attempts = IndexAttemptManager.create_test_index_attempts(
num_attempts=299,
cc_pair_id=cc_pair.id,
status=IndexingStatus.SUCCESS,
base_time=base_time,
)

for attempt in generated_attempts:
all_attempt_ids.append(attempt.id)

# Verify basic pagination with different page sizes
print("Verifying basic pagination with page size 5")
_verify_index_attempt_pagination(
cc_pair_id=cc_pair.id,
index_attempts=all_attempts,
index_attempt_ids=all_attempt_ids,
page_size=5,
user_performing_action=user_performing_action,
)
Expand All @@ -84,7 +116,7 @@ def test_index_attempt_pagination(reset: None) -> None:
print("Verifying pagination with page size 100")
_verify_index_attempt_pagination(
cc_pair_id=cc_pair.id,
index_attempts=all_attempts,
index_attempt_ids=all_attempt_ids,
page_size=100,
user_performing_action=user_performing_action,
)

0 comments on commit ae37f01

Please sign in to comment.