Skip to content

Commit

Permalink
rework readiness probe logic to be less confusing and word ongoing pr…
Browse files Browse the repository at this point in the history
…obes better
  • Loading branch information
rkuo-danswer committed Nov 7, 2024
1 parent 9974f19 commit 607bfa1
Showing 1 changed file with 34 additions and 16 deletions.
50 changes: 34 additions & 16 deletions backend/danswer/background/celery/apps/app_base.py
Original file line number Diff line number Diff line change
Expand Up @@ -142,67 +142,85 @@ def on_celeryd_init(sender: Any = None, conf: Any = None, **kwargs: Any) -> None


def wait_for_redis(sender: Any, **kwargs: Any) -> None:
"""Waits for redis to become ready subject to a hardcoded timeout.
Will raise WorkerShutdown to kill the celery worker if the timeout is reached."""

r = get_redis_client(tenant_id=None)

WAIT_INTERVAL = 5
WAIT_LIMIT = 60

ready = False
time_start = time.monotonic()
logger.info("Redis: Readiness probe starting.")
while True:
try:
if r.ping():
ready = True
break
except Exception:
pass

time_elapsed = time.monotonic() - time_start
if time_elapsed > WAIT_LIMIT:
break

logger.info(
f"Redis: Readiness probe failed. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
f"Redis: Readiness probe ongoing. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
)
if time_elapsed > WAIT_LIMIT:
msg = (
f"Redis: Readiness probe did not succeed within the timeout "
f"({WAIT_LIMIT} seconds). Exiting..."
)
logger.error(msg)
raise WorkerShutdown(msg)

time.sleep(WAIT_INTERVAL)

if not ready:
msg = (
f"Redis: Readiness probe did not succeed within the timeout "
f"({WAIT_LIMIT} seconds). Exiting..."
)
logger.error(msg)
raise WorkerShutdown(msg)

logger.info("Redis: Readiness probe succeeded. Continuing...")
return


def wait_for_db(sender: Any, **kwargs: Any) -> None:
"""Waits for the db to become ready subject to a hardcoded timeout.
Will raise WorkerShutdown to kill the celery worker if the timeout is reached."""

WAIT_INTERVAL = 5
WAIT_LIMIT = 60

ready = False
time_start = time.monotonic()
logger.info("Database: Readiness probe starting.")
while True:
try:
with Session(get_sqlalchemy_engine()) as db_session:
result = db_session.execute(text("SELECT NOW()")).scalar()
if result:
ready = True
break
except Exception:
pass

time_elapsed = time.monotonic() - time_start
if time_elapsed > WAIT_LIMIT:
break

logger.info(
f"Database: Readiness probe failed. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
f"Database: Readiness probe ongoing. elapsed={time_elapsed:.1f} timeout={WAIT_LIMIT:.1f}"
)
if time_elapsed > WAIT_LIMIT:
msg = (
f"Database: Readiness probe did not succeed within the timeout "
f"({WAIT_LIMIT} seconds). Exiting..."
)
logger.error(msg)
raise WorkerShutdown(msg)

time.sleep(WAIT_INTERVAL)

if not ready:
msg = (
f"Database: Readiness probe did not succeed within the timeout "
f"({WAIT_LIMIT} seconds). Exiting..."
)
logger.error(msg)
raise WorkerShutdown(msg)

logger.info("Database: Readiness probe succeeded. Continuing...")
return

Expand Down

0 comments on commit 607bfa1

Please sign in to comment.