diff --git a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py index a4608762bd6..a17be42dc08 100644 --- a/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py +++ b/backend/onyx/background/celery/tasks/doc_permission_syncing/tasks.py @@ -59,6 +59,7 @@ from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSync from onyx.redis.redis_connector_doc_perm_sync import RedisConnectorPermissionSyncPayload from onyx.redis.redis_pool import get_redis_client +from onyx.redis.redis_pool import get_redis_replica_client from onyx.redis.redis_pool import redis_lock_dump from onyx.server.utils import make_short_id from onyx.utils.logger import doc_permission_sync_ctx @@ -124,6 +125,7 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool # we need to use celery's redis client to access its redis data # (which lives on a different db number) r = get_redis_client(tenant_id=tenant_id) + r_replica = get_redis_replica_client(tenant_id=tenant_id) r_celery: Redis = self.app.broker_connection().channel().client # type: ignore lock_beat: RedisLock = r.lock( @@ -164,7 +166,9 @@ def check_for_doc_permissions_sync(self: Task, *, tenant_id: str | None) -> bool # tasks can be in the queue in redis, in reserved tasks (prefetched by the worker), # or be currently executing try: - validate_permission_sync_fences(tenant_id, r, r_celery, lock_beat) + validate_permission_sync_fences( + tenant_id, r, r_replica, r_celery, lock_beat + ) except Exception: task_logger.exception( "Exception while validating permission sync fences" @@ -487,6 +491,7 @@ def update_external_document_permissions_task( def validate_permission_sync_fences( tenant_id: str | None, r: Redis, + r_replica: Redis, r_celery: Redis, lock_beat: RedisLock, ) -> None: @@ -509,7 +514,7 @@ def validate_permission_sync_fences( # validate all existing permission sync jobs lock_beat.reacquire() - keys = cast(set[Any], r.smembers(OnyxRedisConstants.ACTIVE_FENCES)) + keys = cast(set[Any], r_replica.smembers(OnyxRedisConstants.ACTIVE_FENCES)) for key in keys: key_bytes = cast(bytes, key) key_str = key_bytes.decode("utf-8") diff --git a/backend/onyx/background/celery/tasks/vespa/tasks.py b/backend/onyx/background/celery/tasks/vespa/tasks.py index e50c0fdfc65..af2d2aad7a8 100644 --- a/backend/onyx/background/celery/tasks/vespa/tasks.py +++ b/backend/onyx/background/celery/tasks/vespa/tasks.py @@ -1025,15 +1025,6 @@ def vespa_metadata_sync_task( # the sync might repeat again later mark_document_as_synced(document_id, db_session) - # this code checks for and removes a per document sync key that is - # used to block out the same doc from continualy resyncing - # a quick hack that is only needed for production issues - # redis_syncing_key = RedisConnectorCredentialPair.make_redis_syncing_key( - # document_id - # ) - # r = get_redis_client(tenant_id=tenant_id) - # r.delete(redis_syncing_key) - elapsed = time.monotonic() - start task_logger.info( f"doc={document_id} " diff --git a/backend/onyx/redis/redis_connector_credential_pair.py b/backend/onyx/redis/redis_connector_credential_pair.py index 52beefa5b08..db8d526c0dc 100644 --- a/backend/onyx/redis/redis_connector_credential_pair.py +++ b/backend/onyx/redis/redis_connector_credential_pair.py @@ -32,8 +32,6 @@ class RedisConnectorCredentialPair(RedisObjectHelper): PREFIX = "connectorsync" TASKSET_PREFIX = PREFIX + "_taskset" - # SYNCING_PREFIX = PREFIX + ":vespa_syncing" - def __init__(self, tenant_id: str | None, id: int) -> None: super().__init__(tenant_id, str(id)) @@ -56,11 +54,6 @@ def set_skip_docs(self, skip_docs: set[str]) -> None: # the list on the fly self.skip_docs = skip_docs - # @staticmethod - # def make_redis_syncing_key(doc_id: str) -> str: - # """used to create a key in redis to block a doc from syncing""" - # return f"{RedisConnectorCredentialPair.SYNCING_PREFIX}:{doc_id}" - def generate_tasks( self, max_tasks: int, @@ -108,15 +101,6 @@ def generate_tasks( if doc.id in self.skip_docs: continue - # an arbitrary number in seconds to prevent the same doc from syncing repeatedly - # SYNC_EXPIRATION = 24 * 60 * 60 - - # a quick hack that can be uncommented to prevent a doc from resyncing over and over - # redis_syncing_key = self.make_redis_syncing_key(doc.id) - # if redis_client.exists(redis_syncing_key): - # continue - # redis_client.set(redis_syncing_key, custom_task_id, ex=SYNC_EXPIRATION) - # celery's default task id format is "dd32ded3-00aa-4884-8b21-42f8332e7fac" # the key for the result is "celery-task-meta-dd32ded3-00aa-4884-8b21-42f8332e7fac" # we prefix the task id so it's easier to keep track of who created the task